use std::num::NonZeroUsize;
use std::sync::Arc;
use polars_core::config;
use polars_error::PolarsResult;
use polars_io::cloud::CloudOptions;
use polars_io::utils::byte_source::{ByteSource, DynByteSourceBuilder};
use polars_io::utils::compression::SupportedCompression;
use polars_plan::dsl::ScanSource;
use polars_utils::slice_enum::Slice;
pub mod resolve_projections;
pub mod resolve_slice;
/// Calculates how many readers to pre-initialize for a scan.
///
/// If the `POLARS_NUM_READERS_PRE_INIT` environment variable is set, its value
/// is returned as-is (it is not capped by any of the limits below).
///
/// Otherwise the result is `num_pipelines + 3` (saturating), limited by:
/// * the end position of `pre_slice` when it is a positive slice (treated as
///   at least 1); negative slices and `None` impose no limit,
/// * `num_sources`,
///
/// and finally forced into the range `1..=128`.
///
/// # Panics
///
/// Panics if `POLARS_NUM_READERS_PRE_INIT` is set but does not parse as a
/// non-zero `usize`.
pub fn calc_n_readers_pre_init(
    num_pipelines: usize,
    num_sources: usize,
    pre_slice: Option<&Slice>,
) -> usize {
    // An explicit user override takes precedence over the heuristic.
    if let Ok(x) = std::env::var("POLARS_NUM_READERS_PRE_INIT") {
        return match x.parse::<NonZeroUsize>() {
            Ok(n_readers) => n_readers.get(),
            Err(_) => panic!("invalid value for POLARS_NUM_READERS_PRE_INIT: {x}"),
        };
    }

    // NOTE(review): presumably a positive slice ending at row N never needs
    // more than N files to be opened up front -- confirm against callers.
    let slice_file_limit = match pre_slice {
        None | Some(Slice::Negative { .. }) => usize::MAX,
        Some(positive @ Slice::Positive { .. }) => positive.end_position().max(1),
    };

    let wanted = num_pipelines.saturating_add(3);
    let limited = wanted.min(slice_file_limit).min(num_sources);

    // Always at least one reader, never more than 128.
    limited.clamp(1, 128)
}
/// Calculates the maximum number of scans to run concurrently.
///
/// If the `POLARS_MAX_CONCURRENT_SCANS` environment variable is set, its value
/// is returned as-is (it is not capped). Otherwise the result is the smaller
/// of `num_pipelines` and `num_sources`, forced into the range `1..=128`.
///
/// # Panics
///
/// Panics if `POLARS_MAX_CONCURRENT_SCANS` is set but does not parse as a
/// non-zero `usize`.
pub fn calc_max_concurrent_scans(num_pipelines: usize, num_sources: usize) -> usize {
    // Without a readable override, fall back to the heuristic.
    let Ok(x) = std::env::var("POLARS_MAX_CONCURRENT_SCANS") else {
        return num_pipelines.min(num_sources).clamp(1, 128);
    };

    match x.parse::<NonZeroUsize>() {
        Ok(max_scans) => max_scans.get(),
        Err(_) => panic!("invalid value for POLARS_MAX_CONCURRENT_SCANS: {x}"),
    }
}
/// Checks whether `scan_source` starts with the signature of a supported
/// compression format.
///
/// The source is opened through the object-store byte source when it is a
/// cloud URL or when `config::force_async()` is enabled, and through the mmap
/// byte source otherwise. The first 4 bytes are then handed to
/// [`SupportedCompression::check`].
///
/// # Errors
///
/// Returns an error if the byte source cannot be created. A failure to read
/// the leading bytes is *not* an error: the source is reported as not
/// compressed (`Ok(false)`).
pub async fn is_compressed_source(
    scan_source: ScanSource,
    cloud_options: Option<Arc<CloudOptions>>,
) -> PolarsResult<bool> {
    let use_object_store = scan_source.is_cloud_url() || config::force_async();
    let builder = if use_object_store {
        DynByteSourceBuilder::ObjectStore
    } else {
        DynByteSourceBuilder::Mmap
    };

    let source = scan_source
        .as_scan_source_ref()
        .to_dyn_byte_source(&builder, cloud_options.as_deref())
        .await?;

    match source.get_range(0..4).await {
        Ok(leading_bytes) => Ok(SupportedCompression::check(&leading_bytes).is_some()),
        // NOTE(review): presumably covers sources shorter than 4 bytes, but
        // any read failure ends up here -- confirm this is intended.
        Err(_) => Ok(false),
    }
}