datafusion_execution/config.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19 any::{Any, TypeId},
20 collections::HashMap,
21 hash::{BuildHasherDefault, Hasher},
22 sync::Arc,
23};
24
25use datafusion_common::{
26 config::{ConfigExtension, ConfigOptions},
27 Result, ScalarValue,
28};
29
30/// Configuration options for [`SessionContext`].
31///
32/// Can be passed to [`SessionContext::new_with_config`] to customize the configuration of DataFusion.
33///
34/// Options can be set using namespaces keys with `.` as the separator, where the
35/// namespace determines which configuration struct the value to routed to. All
36/// built-in options are under the `datafusion` namespace.
37///
38/// For example, the key `datafusion.execution.batch_size` will set [ExecutionOptions::batch_size][datafusion_common::config::ExecutionOptions::batch_size],
39/// because [ConfigOptions::execution] is [ExecutionOptions][datafusion_common::config::ExecutionOptions]. Similarly, the key
40/// `datafusion.execution.parquet.pushdown_filters` will set [ParquetOptions::pushdown_filters][datafusion_common::config::ParquetOptions::pushdown_filters],
41/// since [ExecutionOptions::parquet][datafusion_common::config::ExecutionOptions::parquet] is [ParquetOptions][datafusion_common::config::ParquetOptions].
42///
43/// Some options have convenience methods. For example [SessionConfig::with_batch_size] is
44/// shorthand for setting `datafusion.execution.batch_size`.
45///
46/// ```
47/// use datafusion_execution::config::SessionConfig;
48/// use datafusion_common::ScalarValue;
49///
50/// let config = SessionConfig::new()
51/// .set("datafusion.execution.batch_size", &ScalarValue::UInt64(Some(1234)))
52/// .set_bool("datafusion.execution.parquet.pushdown_filters", true);
53///
54/// assert_eq!(config.batch_size(), 1234);
55/// assert_eq!(config.options().execution.batch_size, 1234);
56/// assert_eq!(config.options().execution.parquet.pushdown_filters, true);
57/// ```
58///
59/// You can also directly mutate the options via [SessionConfig::options_mut].
60/// So the following is equivalent to the above:
61///
62/// ```
63/// # use datafusion_execution::config::SessionConfig;
64/// # use datafusion_common::ScalarValue;
65/// #
66/// let mut config = SessionConfig::new();
67/// config.options_mut().execution.batch_size = 1234;
68/// config.options_mut().execution.parquet.pushdown_filters = true;
69/// #
70/// # assert_eq!(config.batch_size(), 1234);
71/// # assert_eq!(config.options().execution.batch_size, 1234);
72/// # assert_eq!(config.options().execution.parquet.pushdown_filters, true);
73/// ```
74///
75/// ## Built-in options
76///
77/// | Namespace | Config struct |
78/// | --------- | ------------- |
79/// | `datafusion.catalog` | [CatalogOptions][datafusion_common::config::CatalogOptions] |
80/// | `datafusion.execution` | [ExecutionOptions][datafusion_common::config::ExecutionOptions] |
81/// | `datafusion.execution.parquet` | [ParquetOptions][datafusion_common::config::ParquetOptions] |
82/// | `datafusion.optimizer` | [OptimizerOptions][datafusion_common::config::OptimizerOptions] |
83/// | `datafusion.sql_parser` | [SqlParserOptions][datafusion_common::config::SqlParserOptions] |
84/// | `datafusion.explain` | [ExplainOptions][datafusion_common::config::ExplainOptions] |
85///
86/// ## Custom configuration
87///
88/// Configuration options can be extended. See [SessionConfig::with_extension] for details.
89///
90/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
91/// [`SessionContext::new_with_config`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.new_with_config
92#[derive(Clone, Debug)]
93pub struct SessionConfig {
94 /// Configuration options
95 options: ConfigOptions,
96 /// Opaque extensions.
97 extensions: AnyMap,
98}
99
100impl Default for SessionConfig {
101 fn default() -> Self {
102 Self {
103 options: ConfigOptions::new(),
104 // Assume no extensions by default.
105 extensions: HashMap::with_capacity_and_hasher(
106 0,
107 BuildHasherDefault::default(),
108 ),
109 }
110 }
111}
112
113impl SessionConfig {
114 /// Create an execution config with default setting
115 pub fn new() -> Self {
116 Default::default()
117 }
118
119 /// Create an execution config with config options read from the environment
120 pub fn from_env() -> Result<Self> {
121 Ok(ConfigOptions::from_env()?.into())
122 }
123
124 /// Create new ConfigOptions struct, taking values from a string hash map.
125 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
126 Ok(ConfigOptions::from_string_hash_map(settings)?.into())
127 }
128
129 /// Return a handle to the configuration options.
130 ///
131 /// Can be used to read the current configuration.
132 ///
133 /// ```
134 /// use datafusion_execution::config::SessionConfig;
135 ///
136 /// let config = SessionConfig::new();
137 /// assert!(config.options().execution.batch_size > 0);
138 /// ```
139 pub fn options(&self) -> &ConfigOptions {
140 &self.options
141 }
142
143 /// Return a mutable handle to the configuration options.
144 ///
145 /// Can be used to set configuration options.
146 ///
147 /// ```
148 /// use datafusion_execution::config::SessionConfig;
149 ///
150 /// let mut config = SessionConfig::new();
151 /// config.options_mut().execution.batch_size = 1024;
152 /// assert_eq!(config.options().execution.batch_size, 1024);
153 /// ```
154 pub fn options_mut(&mut self) -> &mut ConfigOptions {
155 &mut self.options
156 }
157
158 /// Set a configuration option
159 pub fn set(self, key: &str, value: &ScalarValue) -> Self {
160 self.set_str(key, &value.to_string())
161 }
162
163 /// Set a boolean configuration option
164 pub fn set_bool(self, key: &str, value: bool) -> Self {
165 self.set_str(key, &value.to_string())
166 }
167
168 /// Set a generic `u64` configuration option
169 pub fn set_u64(self, key: &str, value: u64) -> Self {
170 self.set_str(key, &value.to_string())
171 }
172
173 /// Set a generic `usize` configuration option
174 pub fn set_usize(self, key: &str, value: usize) -> Self {
175 self.set_str(key, &value.to_string())
176 }
177
178 /// Set a generic `str` configuration option
179 pub fn set_str(mut self, key: &str, value: &str) -> Self {
180 self.options.set(key, value).unwrap();
181 self
182 }
183
184 /// Customize batch size
185 pub fn with_batch_size(mut self, n: usize) -> Self {
186 // batch size must be greater than zero
187 assert!(n > 0);
188 self.options.execution.batch_size = n;
189 self
190 }
191
192 /// Customize [`target_partitions`]
193 ///
194 /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
195 pub fn with_target_partitions(mut self, n: usize) -> Self {
196 // partition count must be greater than zero
197 assert!(n > 0);
198 self.options.execution.target_partitions = n;
199 self
200 }
201
202 /// Insert new [ConfigExtension]
203 pub fn with_option_extension<T: ConfigExtension>(mut self, extension: T) -> Self {
204 self.options_mut().extensions.insert(extension);
205 self
206 }
207
208 /// Get [`target_partitions`]
209 ///
210 /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
211 pub fn target_partitions(&self) -> usize {
212 self.options.execution.target_partitions
213 }
214
215 /// Is the information schema enabled?
216 pub fn information_schema(&self) -> bool {
217 self.options.catalog.information_schema
218 }
219
220 /// Should the context create the default catalog and schema?
221 pub fn create_default_catalog_and_schema(&self) -> bool {
222 self.options.catalog.create_default_catalog_and_schema
223 }
224
225 /// Are joins repartitioned during execution?
226 pub fn repartition_joins(&self) -> bool {
227 self.options.optimizer.repartition_joins
228 }
229
230 /// Are aggregates repartitioned during execution?
231 pub fn repartition_aggregations(&self) -> bool {
232 self.options.optimizer.repartition_aggregations
233 }
234
235 /// Are window functions repartitioned during execution?
236 pub fn repartition_window_functions(&self) -> bool {
237 self.options.optimizer.repartition_windows
238 }
239
240 /// Do we execute sorts in a per-partition fashion and merge afterwards,
241 /// or do we coalesce partitions first and sort globally?
242 pub fn repartition_sorts(&self) -> bool {
243 self.options.optimizer.repartition_sorts
244 }
245
246 /// Prefer existing sort (true) or maximize parallelism (false). See
247 /// [prefer_existing_sort] for more details
248 ///
249 /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
250 pub fn prefer_existing_sort(&self) -> bool {
251 self.options.optimizer.prefer_existing_sort
252 }
253
254 /// Are statistics collected during execution?
255 pub fn collect_statistics(&self) -> bool {
256 self.options.execution.collect_statistics
257 }
258
259 /// Selects a name for the default catalog and schema
260 pub fn with_default_catalog_and_schema(
261 mut self,
262 catalog: impl Into<String>,
263 schema: impl Into<String>,
264 ) -> Self {
265 self.options.catalog.default_catalog = catalog.into();
266 self.options.catalog.default_schema = schema.into();
267 self
268 }
269
270 /// Controls whether the default catalog and schema will be automatically created
271 pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self {
272 self.options.catalog.create_default_catalog_and_schema = create;
273 self
274 }
275
276 /// Enables or disables the inclusion of `information_schema` virtual tables
277 pub fn with_information_schema(mut self, enabled: bool) -> Self {
278 self.options.catalog.information_schema = enabled;
279 self
280 }
281
282 /// Enables or disables the use of repartitioning for joins to improve parallelism
283 pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
284 self.options.optimizer.repartition_joins = enabled;
285 self
286 }
287
288 /// Enables or disables the use of repartitioning for aggregations to improve parallelism
289 pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
290 self.options.optimizer.repartition_aggregations = enabled;
291 self
292 }
293
294 /// Sets minimum file range size for repartitioning scans
295 pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
296 self.options.optimizer.repartition_file_min_size = size;
297 self
298 }
299
300 /// Enables or disables the allowing unordered symmetric hash join
301 pub fn with_allow_symmetric_joins_without_pruning(mut self, enabled: bool) -> Self {
302 self.options.optimizer.allow_symmetric_joins_without_pruning = enabled;
303 self
304 }
305
306 /// Enables or disables the use of repartitioning for file scans
307 pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
308 self.options.optimizer.repartition_file_scans = enabled;
309 self
310 }
311
312 /// Enables or disables the use of repartitioning for window functions to improve parallelism
313 pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
314 self.options.optimizer.repartition_windows = enabled;
315 self
316 }
317
318 /// Enables or disables the use of per-partition sorting to improve parallelism
319 pub fn with_repartition_sorts(mut self, enabled: bool) -> Self {
320 self.options.optimizer.repartition_sorts = enabled;
321 self
322 }
323
324 /// Prefer existing sort (true) or maximize parallelism (false). See
325 /// [prefer_existing_sort] for more details
326 ///
327 /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
328 pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
329 self.options.optimizer.prefer_existing_sort = enabled;
330 self
331 }
332
333 /// Prefer existing union (true). See [prefer_existing_union] for more details
334 ///
335 /// [prefer_existing_union]: datafusion_common::config::OptimizerOptions::prefer_existing_union
336 pub fn with_prefer_existing_union(mut self, enabled: bool) -> Self {
337 self.options.optimizer.prefer_existing_union = enabled;
338 self
339 }
340
341 /// Enables or disables the use of pruning predicate for parquet readers to skip row groups
342 pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
343 self.options.execution.parquet.pruning = enabled;
344 self
345 }
346
347 /// Returns true if pruning predicate should be used to skip parquet row groups
348 pub fn parquet_pruning(&self) -> bool {
349 self.options.execution.parquet.pruning
350 }
351
352 /// Returns true if bloom filter should be used to skip parquet row groups
353 pub fn parquet_bloom_filter_pruning(&self) -> bool {
354 self.options.execution.parquet.bloom_filter_on_read
355 }
356
357 /// Enables or disables the use of bloom filter for parquet readers to skip row groups
358 pub fn with_parquet_bloom_filter_pruning(mut self, enabled: bool) -> Self {
359 self.options.execution.parquet.bloom_filter_on_read = enabled;
360 self
361 }
362
363 /// Returns true if page index should be used to skip parquet data pages
364 pub fn parquet_page_index_pruning(&self) -> bool {
365 self.options.execution.parquet.enable_page_index
366 }
367
368 /// Enables or disables the use of page index for parquet readers to skip parquet data pages
369 pub fn with_parquet_page_index_pruning(mut self, enabled: bool) -> Self {
370 self.options.execution.parquet.enable_page_index = enabled;
371 self
372 }
373
374 /// Enables or disables the collection of statistics after listing files
375 pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
376 self.options.execution.collect_statistics = enabled;
377 self
378 }
379
380 /// Get the currently configured batch size
381 pub fn batch_size(&self) -> usize {
382 self.options.execution.batch_size
383 }
384
385 /// Enables or disables the coalescence of small batches into larger batches
386 pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
387 self.options.execution.coalesce_batches = enabled;
388 self
389 }
390
391 /// Returns true if record batches will be examined between each operator
392 /// and small batches will be coalesced into larger batches.
393 pub fn coalesce_batches(&self) -> bool {
394 self.options.execution.coalesce_batches
395 }
396
397 /// Enables or disables the round robin repartition for increasing parallelism
398 pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
399 self.options.optimizer.enable_round_robin_repartition = enabled;
400 self
401 }
402
403 /// Returns true if the physical plan optimizer will try to
404 /// add round robin repartition to increase parallelism to leverage more CPU cores.
405 pub fn round_robin_repartition(&self) -> bool {
406 self.options.optimizer.enable_round_robin_repartition
407 }
408
409 /// Set the size of [`sort_spill_reservation_bytes`] to control
410 /// memory pre-reservation
411 ///
412 /// [`sort_spill_reservation_bytes`]: datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes
413 pub fn with_sort_spill_reservation_bytes(
414 mut self,
415 sort_spill_reservation_bytes: usize,
416 ) -> Self {
417 self.options.execution.sort_spill_reservation_bytes =
418 sort_spill_reservation_bytes;
419 self
420 }
421
422 /// Set the size of [`sort_in_place_threshold_bytes`] to control
423 /// how sort does things.
424 ///
425 /// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
426 pub fn with_sort_in_place_threshold_bytes(
427 mut self,
428 sort_in_place_threshold_bytes: usize,
429 ) -> Self {
430 self.options.execution.sort_in_place_threshold_bytes =
431 sort_in_place_threshold_bytes;
432 self
433 }
434
435 /// Enables or disables the enforcement of batch size in joins
436 pub fn with_enforce_batch_size_in_joins(
437 mut self,
438 enforce_batch_size_in_joins: bool,
439 ) -> Self {
440 self.options.execution.enforce_batch_size_in_joins = enforce_batch_size_in_joins;
441 self
442 }
443
444 /// Returns true if the joins will be enforced to output batches of the configured size
445 pub fn enforce_batch_size_in_joins(&self) -> bool {
446 self.options.execution.enforce_batch_size_in_joins
447 }
448
449 /// Convert configuration options to name-value pairs with values
450 /// converted to strings.
451 ///
452 /// Note that this method will eventually be deprecated and
453 /// replaced by [`options`].
454 ///
455 /// [`options`]: Self::options
456 pub fn to_props(&self) -> HashMap<String, String> {
457 let mut map = HashMap::new();
458 // copy configs from config_options
459 for entry in self.options.entries() {
460 map.insert(entry.key, entry.value.unwrap_or_default());
461 }
462
463 map
464 }
465
466 /// Add extensions.
467 ///
468 /// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.
469 /// Extensions are opaque and the types are unknown to DataFusion itself, which makes them extremely flexible. [^1]
470 ///
471 /// Extensions are stored within an [`Arc`] so they do NOT require [`Clone`]. The are immutable. If you need to
472 /// modify their state over their lifetime -- e.g. for caches -- you need to establish some for of interior mutability.
473 ///
474 /// Extensions are indexed by their type `T`. If multiple values of the same type are provided, only the last one
475 /// will be kept.
476 ///
477 /// You may use [`get_extension`](Self::get_extension) to retrieve extensions.
478 ///
479 /// # Example
480 /// ```
481 /// use std::sync::Arc;
482 /// use datafusion_execution::config::SessionConfig;
483 ///
484 /// // application-specific extension types
485 /// struct Ext1(u8);
486 /// struct Ext2(u8);
487 /// struct Ext3(u8);
488 ///
489 /// let ext1a = Arc::new(Ext1(10));
490 /// let ext1b = Arc::new(Ext1(11));
491 /// let ext2 = Arc::new(Ext2(2));
492 ///
493 /// let cfg = SessionConfig::default()
494 /// // will only remember the last Ext1
495 /// .with_extension(Arc::clone(&ext1a))
496 /// .with_extension(Arc::clone(&ext1b))
497 /// .with_extension(Arc::clone(&ext2));
498 ///
499 /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
500 /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
501 /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
502 ///
503 /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
504 /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
505 ///
506 /// assert!(cfg.get_extension::<Ext3>().is_none());
507 /// ```
508 ///
509 /// [^1]: Compare that to [`ConfigOptions`] which only supports [`ScalarValue`] payloads.
510 pub fn with_extension<T>(mut self, ext: Arc<T>) -> Self
511 where
512 T: Send + Sync + 'static,
513 {
514 self.set_extension(ext);
515 self
516 }
517
518 /// Set extension. Pretty much the same as [`with_extension`](Self::with_extension), but take
519 /// mutable reference instead of owning it. Useful if you want to add another extension after
520 /// the [`SessionConfig`] is created.
521 ///
522 /// # Example
523 /// ```
524 /// use std::sync::Arc;
525 /// use datafusion_execution::config::SessionConfig;
526 ///
527 /// // application-specific extension types
528 /// struct Ext1(u8);
529 /// struct Ext2(u8);
530 /// struct Ext3(u8);
531 ///
532 /// let ext1a = Arc::new(Ext1(10));
533 /// let ext1b = Arc::new(Ext1(11));
534 /// let ext2 = Arc::new(Ext2(2));
535 ///
536 /// let mut cfg = SessionConfig::default();
537 ///
538 /// // will only remember the last Ext1
539 /// cfg.set_extension(Arc::clone(&ext1a));
540 /// cfg.set_extension(Arc::clone(&ext1b));
541 /// cfg.set_extension(Arc::clone(&ext2));
542 ///
543 /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
544 /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
545 /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
546 ///
547 /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
548 /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
549 ///
550 /// assert!(cfg.get_extension::<Ext3>().is_none());
551 /// ```
552 pub fn set_extension<T>(&mut self, ext: Arc<T>)
553 where
554 T: Send + Sync + 'static,
555 {
556 let ext = ext as Arc<dyn Any + Send + Sync + 'static>;
557 let id = TypeId::of::<T>();
558 self.extensions.insert(id, ext);
559 }
560
561 /// Get extension, if any for the specified type `T` exists.
562 ///
563 /// See [`with_extension`](Self::with_extension) on how to add attach extensions.
564 pub fn get_extension<T>(&self) -> Option<Arc<T>>
565 where
566 T: Send + Sync + 'static,
567 {
568 let id = TypeId::of::<T>();
569 self.extensions
570 .get(&id)
571 .cloned()
572 .map(|ext| Arc::downcast(ext).expect("TypeId unique"))
573 }
574}
575
576impl From<ConfigOptions> for SessionConfig {
577 fn from(options: ConfigOptions) -> Self {
578 Self {
579 options,
580 ..Default::default()
581 }
582 }
583}
584
585/// Map that holds opaque objects indexed by their type.
586///
587/// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe].
588///
589/// [object safe]: https://doc.rust-lang.org/reference/items/traits.html#object-safety
590type AnyMap =
591 HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>, BuildHasherDefault<IdHasher>>;
592
/// Hasher for [`AnyMap`].
///
/// With [`TypeId`]s as keys, there's no need to hash them again: they are
/// already hashes produced by the compiler. When the key is fed in through
/// [`Hasher::write_u64`], the [`IdHasher`] simply stores that value and returns
/// it from [`Hasher::finish`].
///
/// If the key is instead fed as raw bytes (e.g. should std change how `TypeId`
/// is hashed, or via the default `write_u128`, which forwards to `write`), the
/// bytes are folded into the state deterministically rather than panicking.
#[derive(Default)]
struct IdHasher(u64);

impl Hasher for IdHasher {
    fn write(&mut self, bytes: &[u8]) {
        // FNV-1a style fold: cheap and deterministic. It is only a fallback
        // path; `TypeId` normally goes through `write_u64` below.
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
        for &byte in bytes {
            self.0 = (self.0 ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
        }
    }

    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.0 = id;
    }

    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }
}