1use std::sync::LazyLock;
2use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
3
4mod engine;
5mod parse;
6mod resolve_mode;
7mod spill_format;
8pub mod spill_path;
9mod spill_policy;
10
11pub use engine::Engine;
12use polars_error::polars_warn;
13pub use resolve_mode::ResolveMode;
14pub use spill_format::SpillFormat;
15pub use spill_policy::SpillPolicy;
16
// Environment variable read into `Config::verbose`.
const VERBOSE: &str = "POLARS_VERBOSE";
// Used when the variable is unset or `parse::parse_bool` yields `None`.
const DEFAULT_VERBOSE: bool = false;

// Environment variable controlling whether `Config::apply_env_var` warns about
// `POLARS_`-prefixed names it does not recognize.
const WARN_UNKNOWN_CONFIG: &str = "POLARS_WARN_UNKNOWN_CONFIG";
// NOTE(review): the warning text tells users to set the variable to 0 to
// silence it, yet the default here is already `false` — confirm the intended
// default.
const DEFAULT_WARN_UNKNOWN_CONFIG: bool = false;
// Environment variable read into `Config::warn_unstable`.
const WARN_UNSTABLE: &str = "POLARS_WARN_UNSTABLE";
// Used when the variable is unset or `parse::parse_bool` yields `None`.
const DEFAULT_WARN_UNSTABLE: bool = true;

// Environment variable read into `Config::max_threads`; its fallback is
// computed at runtime by `default_max_threads` rather than being a constant.
const MAX_THREADS: &str = "POLARS_MAX_THREADS";
/// Returns the thread count used when `POLARS_MAX_THREADS` supplies no value.
///
/// This is the parallelism reported by the standard library, or `4` when that
/// query fails.
fn default_max_threads() -> u64 {
    // Fallback used when the platform cannot report its parallelism.
    const FALLBACK_THREADS: usize = 4;

    let detected = match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => FALLBACK_THREADS,
    };
    detected as u64
}
34
// `POLARS_IDEAL_MORSEL_SIZE` and `POLARS_STREAMING_CHUNK_SIZE` are handled by
// the same arm of `Config::apply_env_var`; both write `ideal_morsel_size` and
// share `DEFAULT_IDEAL_MORSEL_SIZE` as the fallback.
const IDEAL_MORSEL_SIZE: &str = "POLARS_IDEAL_MORSEL_SIZE";
const STREAMING_CHUNK_SIZE: &str = "POLARS_STREAMING_CHUNK_SIZE"; const DEFAULT_IDEAL_MORSEL_SIZE: u64 = 100_000;

// Environment variable parsed by `parse::parse_engine` into
// `Config::engine_affinity`.
const ENGINE_AFFINITY: &str = "POLARS_ENGINE_AFFINITY";
const DEFAULT_ENGINE_AFFINITY: Engine = Engine::Auto;

// Environment variable read into
// `Config::parquet_binary_statistics_truncate_length`. Note that the variable
// name ends in `_LEN` while the Rust identifiers spell out `LENGTH`.
const PARQUET_BINARY_STATISTICS_TRUNCATE_LENGTH: &str =
    "POLARS_PARQUET_BINARY_STATISTICS_TRUNCATE_LEN";
const DEFAULT_PARQUET_BINARY_STATISTICS_TRUNCATE_LENGTH: u64 = 64;

// Environment variable read into `Config::prune_parquet_metadata`.
const PRUNE_PARQUET_METADATA: &str = "POLARS_PRUNE_PARQUET_METADATA";
const DEFAULT_PRUNE_PARQUET_METADATA: bool = false;

// Environment variable parsed by `parse::parse_resolve_mode` into
// `Config::resolve_metadata_level`.
const RESOLVE_METADATA_LEVEL: &str = "POLARS_RESOLVE_METADATA_LEVEL";
const DEFAULT_RESOLVE_METADATA_LEVEL: ResolveMode = ResolveMode::RowCounts;

// Environment variable read into `Config::verbose_sensitive`.
const VERBOSE_SENSITIVE: &str = "POLARS_VERBOSE_SENSITIVE";
const DEFAULT_VERBOSE_SENSITIVE: bool = false;

// Environment variable read into `Config::force_async`.
const FORCE_ASYNC: &str = "POLARS_FORCE_ASYNC";
const DEFAULT_FORCE_ASYNC: bool = false;

// Environment variable read into `Config::import_interval_as_struct`.
const IMPORT_INTERVAL_AS_STRUCT: &str = "POLARS_IMPORT_INTERVAL_AS_STRUCT";
const DEFAULT_IMPORT_INTERVAL_AS_STRUCT: bool = false;

// Environment variable stored in the module-level `OOC_DRIFT_THRESHOLD_ATOMIC`
// (not in a `Config` field) and read back through `get_ooc_drift_threshold`.
// The default evaluates to 4_194_304; presumably a byte count — TODO confirm.
const OOC_DRIFT_THRESHOLD: &str = "POLARS_OOC_DRIFT_THRESHOLD";
const DEFAULT_OOC_DRIFT_THRESHOLD: u64 = 4 * 1024 * 1024;

// Environment variable parsed by `parse::parse_spill_policy` into
// `Config::ooc_spill_policy`.
const OOC_SPILL_POLICY: &str = "POLARS_OOC_SPILL_POLICY";
const DEFAULT_OOC_SPILL_POLICY: SpillPolicy = SpillPolicy::NoSpill;

// Environment variable parsed by `parse::parse_spill_format` into
// `Config::ooc_spill_format`.
const OOC_SPILL_FORMAT: &str = "POLARS_OOC_SPILL_FORMAT";
const DEFAULT_OOC_SPILL_FORMAT: SpillFormat = SpillFormat::Ipc;

// Environment variable parsed by `parse::parse_f64` into
// `Config::ooc_memory_budget_fraction` (kept as the raw bits of the `f64`).
const OOC_MEMORY_BUDGET_FRACTION: &str = "POLARS_OOC_MEMORY_BUDGET_FRACTION";
const DEFAULT_OOC_MEMORY_BUDGET_FRACTION: f64 = 0.8;

// `POLARS_OOC_SPILL_MIN_BYTES` feeds `Config::ooc_spill_min_bytes` (default
// 102_400). The line holding its default also declares the name of the
// variable behind `Config::join_sample_limit`, whose default follows.
const OOC_SPILL_MIN_BYTES: &str = "POLARS_OOC_SPILL_MIN_BYTES";
const DEFAULT_OOC_SPILL_MIN_BYTES: u64 = 100 * 1024; const JOIN_SAMPLE_LIMIT: &str = "POLARS_JOIN_SAMPLE_LIMIT";
const DEFAULT_JOIN_SAMPLE_LIMIT: u64 = 10_000_000;

// Environment variable read into
// `Config::projection_pushdown_prune_strict_hconcat_inputs`.
const PROJECTION_PUSHDOWN_PRUNE_STRICT_HCONCAT_INPUTS: &str =
    "POLARS_PROJECTION_PUSHDOWN_PRUNE_STRICT_HCONCAT_INPUTS";
const DEFAULT_PROJECTION_PUSHDOWN_PRUNE_STRICT_HCONCAT_INPUTS: bool = false;

// Environment variable read into `Config::allow_nested_cspe`.
const ALLOW_NESTED_CSPE: &str = "POLARS_ALLOW_NESTED_CSPE";
const DEFAULT_ALLOW_NESTED_CSPE: bool = false;

// Every variable name that `Config::reload_env_vars` re-reads, in the order it
// is applied. `POLARS_OOC_SPILL_DIR` is absent: `Config::ooc_spill_dir` reads
// it from the environment on each call instead of caching it.
static KNOWN_OPTIONS: &[&str] = &[
    VERBOSE,
    WARN_UNKNOWN_CONFIG,
    WARN_UNSTABLE,
    MAX_THREADS,
    IDEAL_MORSEL_SIZE,
    STREAMING_CHUNK_SIZE,
    ENGINE_AFFINITY,
    PARQUET_BINARY_STATISTICS_TRUNCATE_LENGTH,
    PRUNE_PARQUET_METADATA,
    ALLOW_NESTED_CSPE,
    RESOLVE_METADATA_LEVEL,
    VERBOSE_SENSITIVE,
    FORCE_ASYNC,
    IMPORT_INTERVAL_AS_STRUCT,
    OOC_DRIFT_THRESHOLD,
    OOC_SPILL_POLICY,
    OOC_SPILL_FORMAT,
    OOC_MEMORY_BUDGET_FRACTION,
    OOC_SPILL_MIN_BYTES,
    JOIN_SAMPLE_LIMIT,
    PROJECTION_PUSHDOWN_PRUNE_STRICT_HCONCAT_INPUTS,
];
136
/// Runtime configuration whose options are populated from `POLARS_*`
/// environment variables.
///
/// Every option lives in an atomic so it can be updated through a shared
/// reference. Enum-valued options are stored as their `u8` discriminant and
/// the floating-point option as the raw bits of an `f64`.
pub struct Config {
    // --- Logging and warnings ---
    /// Value of `POLARS_VERBOSE`.
    verbose: AtomicBool,
    /// Value of `POLARS_VERBOSE_SENSITIVE`.
    verbose_sensitive: AtomicBool,
    /// Value of `POLARS_WARN_UNKNOWN_CONFIG`.
    warn_unknown_config: AtomicBool,
    /// Value of `POLARS_WARN_UNSTABLE`.
    warn_unstable: AtomicBool,

    // --- Execution ---
    /// Value of `POLARS_MAX_THREADS`.
    max_threads: AtomicU64,
    /// Value of `POLARS_IDEAL_MORSEL_SIZE` / `POLARS_STREAMING_CHUNK_SIZE`.
    ideal_morsel_size: AtomicU64,
    /// `Engine` discriminant from `POLARS_ENGINE_AFFINITY`.
    engine_affinity: AtomicU8,
    /// Value of `POLARS_FORCE_ASYNC`.
    force_async: AtomicBool,
    /// Value of `POLARS_JOIN_SAMPLE_LIMIT`.
    join_sample_limit: AtomicU64,

    // --- Query optimization ---
    /// Value of `POLARS_ALLOW_NESTED_CSPE`.
    allow_nested_cspe: AtomicBool,
    /// Value of `POLARS_PROJECTION_PUSHDOWN_PRUNE_STRICT_HCONCAT_INPUTS`.
    projection_pushdown_prune_strict_hconcat_inputs: AtomicBool,

    // --- Parquet and metadata ---
    /// Value of `POLARS_PARQUET_BINARY_STATISTICS_TRUNCATE_LEN`.
    parquet_binary_statistics_truncate_length: AtomicU64,
    /// Value of `POLARS_PRUNE_PARQUET_METADATA`.
    prune_parquet_metadata: AtomicBool,
    /// `ResolveMode` discriminant from `POLARS_RESOLVE_METADATA_LEVEL`.
    resolve_metadata_level: AtomicU8,
    /// Value of `POLARS_IMPORT_INTERVAL_AS_STRUCT`.
    import_interval_as_struct: AtomicBool,

    // --- Out-of-core spilling ---
    /// `SpillPolicy` discriminant from `POLARS_OOC_SPILL_POLICY`.
    ooc_spill_policy: AtomicU8,
    /// `SpillFormat` discriminant from `POLARS_OOC_SPILL_FORMAT`.
    ooc_spill_format: AtomicU8,
    /// Bit pattern of the `f64` from `POLARS_OOC_MEMORY_BUDGET_FRACTION`.
    ooc_memory_budget_fraction: AtomicU64,
    /// Value of `POLARS_OOC_SPILL_MIN_BYTES`.
    ooc_spill_min_bytes: AtomicU64,
}
161
162impl Config {
163 fn new() -> Self {
164 let cfg = Self {
165 verbose: AtomicBool::new(DEFAULT_VERBOSE),
167 warn_unknown_config: AtomicBool::new(DEFAULT_WARN_UNKNOWN_CONFIG),
168 warn_unstable: AtomicBool::new(DEFAULT_WARN_UNSTABLE),
169 max_threads: AtomicU64::new(default_max_threads()),
170 ideal_morsel_size: AtomicU64::new(DEFAULT_IDEAL_MORSEL_SIZE),
171 engine_affinity: AtomicU8::new(DEFAULT_ENGINE_AFFINITY as u8),
172 parquet_binary_statistics_truncate_length: AtomicU64::new(
173 DEFAULT_PARQUET_BINARY_STATISTICS_TRUNCATE_LENGTH,
174 ),
175 prune_parquet_metadata: AtomicBool::new(DEFAULT_PRUNE_PARQUET_METADATA),
176 resolve_metadata_level: AtomicU8::new(DEFAULT_RESOLVE_METADATA_LEVEL as u8),
177
178 verbose_sensitive: AtomicBool::new(DEFAULT_VERBOSE_SENSITIVE),
180 force_async: AtomicBool::new(DEFAULT_FORCE_ASYNC),
181 import_interval_as_struct: AtomicBool::new(DEFAULT_IMPORT_INTERVAL_AS_STRUCT),
182 ooc_spill_policy: AtomicU8::new(DEFAULT_OOC_SPILL_POLICY as u8),
183 ooc_spill_format: AtomicU8::new(DEFAULT_OOC_SPILL_FORMAT as u8),
184 ooc_memory_budget_fraction: AtomicU64::new(
185 DEFAULT_OOC_MEMORY_BUDGET_FRACTION.to_bits(),
186 ),
187 ooc_spill_min_bytes: AtomicU64::new(DEFAULT_OOC_SPILL_MIN_BYTES),
188 join_sample_limit: AtomicU64::new(DEFAULT_JOIN_SAMPLE_LIMIT),
189 projection_pushdown_prune_strict_hconcat_inputs: AtomicBool::new(
190 DEFAULT_PROJECTION_PUSHDOWN_PRUNE_STRICT_HCONCAT_INPUTS,
191 ),
192 allow_nested_cspe: AtomicBool::new(DEFAULT_ALLOW_NESTED_CSPE),
193 };
194 cfg.reload_env_vars();
195 cfg
196 }
197
198 pub fn reload_env_vars(&self) {
200 self.reload_env_var("POLARS_WARN_UNKNOWN_CONFIG");
202
203 for var in KNOWN_OPTIONS {
204 self.reload_env_var(var);
205 }
206 }
207
208 pub fn reload_env_var(&self, var: &str) {
210 self.apply_env_var(var, std::env::var(var).ok().as_deref());
211 }
212
213 fn apply_env_var(&self, var: &str, val: Option<&str>) {
214 match var {
215 WARN_UNKNOWN_CONFIG => self.warn_unknown_config.store(
217 val.and_then(|x| parse::parse_bool(var, x))
218 .unwrap_or(DEFAULT_WARN_UNKNOWN_CONFIG),
219 Ordering::Relaxed,
220 ),
221 WARN_UNSTABLE => self.warn_unstable.store(
222 val.and_then(|x| parse::parse_bool(var, x))
223 .unwrap_or(DEFAULT_WARN_UNSTABLE),
224 Ordering::Relaxed,
225 ),
226 VERBOSE => self.verbose.store(
227 val.and_then(|x| parse::parse_bool(var, x))
228 .unwrap_or(DEFAULT_VERBOSE),
229 Ordering::Relaxed,
230 ),
231 MAX_THREADS => self.max_threads.store(
232 val.and_then(|x| parse::parse_u64(var, x))
233 .unwrap_or(default_max_threads()),
234 Ordering::Relaxed,
235 ),
236 IDEAL_MORSEL_SIZE | STREAMING_CHUNK_SIZE => self.ideal_morsel_size.store(
237 val.and_then(|x| parse::parse_u64(var, x))
238 .unwrap_or(DEFAULT_IDEAL_MORSEL_SIZE),
239 Ordering::Relaxed,
240 ),
241 ENGINE_AFFINITY => self.engine_affinity.store(
242 val.and_then(|x| parse::parse_engine(var, x))
243 .unwrap_or(DEFAULT_ENGINE_AFFINITY) as u8,
244 Ordering::Relaxed,
245 ),
246 PARQUET_BINARY_STATISTICS_TRUNCATE_LENGTH => {
247 self.parquet_binary_statistics_truncate_length.store(
248 val.and_then(|x| parse::parse_u64(var, x))
249 .unwrap_or(DEFAULT_PARQUET_BINARY_STATISTICS_TRUNCATE_LENGTH),
250 Ordering::Relaxed,
251 )
252 },
253 PRUNE_PARQUET_METADATA => self.prune_parquet_metadata.store(
254 val.and_then(|x| parse::parse_bool(var, x))
255 .unwrap_or(DEFAULT_PRUNE_PARQUET_METADATA),
256 Ordering::Relaxed,
257 ),
258 ALLOW_NESTED_CSPE => self.allow_nested_cspe.store(
259 val.and_then(|x| parse::parse_bool(var, x))
260 .unwrap_or(DEFAULT_ALLOW_NESTED_CSPE),
261 Ordering::Relaxed,
262 ),
263 RESOLVE_METADATA_LEVEL => self.resolve_metadata_level.store(
264 val.and_then(|x| parse::parse_resolve_mode(var, x))
265 .unwrap_or(DEFAULT_RESOLVE_METADATA_LEVEL) as u8,
266 Ordering::Relaxed,
267 ),
268
269 VERBOSE_SENSITIVE => self.verbose_sensitive.store(
271 val.and_then(|x| parse::parse_bool(var, x))
272 .unwrap_or(DEFAULT_VERBOSE_SENSITIVE),
273 Ordering::Relaxed,
274 ),
275 FORCE_ASYNC => self.force_async.store(
276 val.and_then(|x| parse::parse_bool(var, x))
277 .unwrap_or(DEFAULT_FORCE_ASYNC),
278 Ordering::Relaxed,
279 ),
280 IMPORT_INTERVAL_AS_STRUCT => self.import_interval_as_struct.store(
281 val.and_then(|x| parse::parse_bool(var, x))
282 .unwrap_or(DEFAULT_IMPORT_INTERVAL_AS_STRUCT),
283 Ordering::Relaxed,
284 ),
285 OOC_DRIFT_THRESHOLD => OOC_DRIFT_THRESHOLD_ATOMIC.store(
286 val.and_then(|x| parse::parse_u64(var, x))
287 .unwrap_or(DEFAULT_OOC_DRIFT_THRESHOLD),
288 Ordering::Relaxed,
289 ),
290 OOC_SPILL_POLICY => self.ooc_spill_policy.store(
291 val.and_then(|x| parse::parse_spill_policy(var, x))
292 .unwrap_or(DEFAULT_OOC_SPILL_POLICY) as u8,
293 Ordering::Relaxed,
294 ),
295 OOC_SPILL_FORMAT => self.ooc_spill_format.store(
296 val.and_then(|x| parse::parse_spill_format(var, x))
297 .unwrap_or(DEFAULT_OOC_SPILL_FORMAT) as u8,
298 Ordering::Relaxed,
299 ),
300 OOC_MEMORY_BUDGET_FRACTION => self.ooc_memory_budget_fraction.store(
301 val.and_then(|x| parse::parse_f64(var, x))
302 .unwrap_or(DEFAULT_OOC_MEMORY_BUDGET_FRACTION)
303 .to_bits(),
304 Ordering::Relaxed,
305 ),
306 OOC_SPILL_MIN_BYTES => self.ooc_spill_min_bytes.store(
307 val.and_then(|x| parse::parse_u64(var, x))
308 .unwrap_or(DEFAULT_OOC_SPILL_MIN_BYTES),
309 Ordering::Relaxed,
310 ),
311 JOIN_SAMPLE_LIMIT => self.join_sample_limit.store(
312 val.and_then(|x| parse::parse_u64(var, x))
313 .unwrap_or(DEFAULT_JOIN_SAMPLE_LIMIT),
314 Ordering::Relaxed,
315 ),
316 PROJECTION_PUSHDOWN_PRUNE_STRICT_HCONCAT_INPUTS => {
317 self.projection_pushdown_prune_strict_hconcat_inputs.store(
318 val.and_then(|x| parse::parse_bool(var, x))
319 .unwrap_or(DEFAULT_PROJECTION_PUSHDOWN_PRUNE_STRICT_HCONCAT_INPUTS),
320 Ordering::Relaxed,
321 )
322 },
323 _ => {
324 if var.starts_with("POLARS_") {
325 if self.warn_unknown_config.load(Ordering::Relaxed) {
326 polars_warn!(
327 "unknown config option '{var}' found in environment variables.\n\nYou can silence this warning by specifying POLARS_WARN_UNKNOWN_CONFIG=0."
328 )
329 }
330 }
331 },
332 }
333 }
334
335 pub fn verbose(&self) -> bool {
337 self.verbose.load(Ordering::Relaxed)
338 }
339
340 pub fn warn_unstable(&self) -> bool {
342 self.warn_unstable.load(Ordering::Relaxed)
343 }
344
345 pub fn max_threads(&self) -> usize {
347 self.max_threads.load(Ordering::Relaxed).try_into().unwrap()
348 }
349
350 pub fn ideal_morsel_size(&self) -> u64 {
352 self.ideal_morsel_size.load(Ordering::Relaxed)
353 }
354
355 pub fn engine_affinity(&self) -> Engine {
357 Engine::from_discriminant(self.engine_affinity.load(Ordering::Relaxed))
358 }
359
360 pub fn parquet_binary_statistics_truncate_length(&self) -> u64 {
362 self.parquet_binary_statistics_truncate_length
363 .load(Ordering::Relaxed)
364 }
365
366 pub fn prune_parquet_metadata(&self) -> bool {
369 self.prune_parquet_metadata.load(Ordering::Relaxed)
370 }
371
372 pub fn allow_nested_cspe(&self) -> bool {
374 self.allow_nested_cspe.load(Ordering::Relaxed)
375 }
376
377 pub fn resolve_metadata_level(&self) -> ResolveMode {
381 ResolveMode::from_discriminant(self.resolve_metadata_level.load(Ordering::Relaxed))
382 }
383
384 pub fn verbose_sensitive(&self) -> bool {
386 self.verbose_sensitive.load(Ordering::Relaxed)
387 }
388
389 pub fn force_async(&self) -> bool {
390 self.force_async.load(Ordering::Relaxed)
391 }
392
393 pub fn import_interval_as_struct(&self) -> bool {
394 self.import_interval_as_struct.load(Ordering::Relaxed)
395 }
396
397 pub fn ooc_drift_threshold(&self) -> u64 {
398 get_ooc_drift_threshold()
399 }
400
401 pub fn ooc_spill_policy(&self) -> SpillPolicy {
402 SpillPolicy::from_discriminant(self.ooc_spill_policy.load(Ordering::Relaxed))
403 }
404
405 pub fn ooc_spill_format(&self) -> SpillFormat {
406 SpillFormat::from_discriminant(self.ooc_spill_format.load(Ordering::Relaxed))
407 }
408
409 pub fn ooc_memory_budget_fraction(&self) -> f64 {
410 f64::from_bits(self.ooc_memory_budget_fraction.load(Ordering::Relaxed))
411 }
412
413 pub fn ooc_spill_min_bytes(&self) -> u64 {
414 self.ooc_spill_min_bytes.load(Ordering::Relaxed)
415 }
416
417 pub fn ooc_spill_dir(&self) -> std::path::PathBuf {
418 if let Ok(dir) = std::env::var("POLARS_OOC_SPILL_DIR") {
419 std::path::PathBuf::from(dir)
420 } else {
421 spill_path::default_ooc_spill_dir()
422 }
423 }
424
425 pub fn join_sample_limit(&self) -> u64 {
426 self.join_sample_limit.load(Ordering::Relaxed)
427 }
428
429 pub fn projection_pushdown_prune_strict_hconcat_inputs(&self) -> bool {
430 self.projection_pushdown_prune_strict_hconcat_inputs
431 .load(Ordering::Relaxed)
432 }
433}
434
435pub fn config() -> &'static Config {
436 static CONFIG: LazyLock<Config> = LazyLock::new(Config::new);
437 &CONFIG
438}
439
440static OOC_DRIFT_THRESHOLD_ATOMIC: AtomicU64 = AtomicU64::new(DEFAULT_OOC_DRIFT_THRESHOLD);
443
444#[inline(always)]
445pub fn get_ooc_drift_threshold() -> u64 {
446 OOC_DRIFT_THRESHOLD_ATOMIC.load(Ordering::Relaxed)
447}