datafusion_execution/
config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    any::{Any, TypeId},
20    collections::HashMap,
21    hash::{BuildHasherDefault, Hasher},
22    sync::Arc,
23};
24
25use datafusion_common::{
26    config::{ConfigExtension, ConfigOptions},
27    Result, ScalarValue,
28};
29
30/// Configuration options for [`SessionContext`].
31///
32/// Can be passed to [`SessionContext::new_with_config`] to customize the configuration of DataFusion.
33///
34/// Options can be set using namespaces keys with `.` as the separator, where the
35/// namespace determines which configuration struct the value to routed to. All
36/// built-in options are under the `datafusion` namespace.
37///
38/// For example, the key `datafusion.execution.batch_size` will set [ExecutionOptions::batch_size][datafusion_common::config::ExecutionOptions::batch_size],
39/// because [ConfigOptions::execution] is [ExecutionOptions][datafusion_common::config::ExecutionOptions]. Similarly, the key
40/// `datafusion.execution.parquet.pushdown_filters` will set [ParquetOptions::pushdown_filters][datafusion_common::config::ParquetOptions::pushdown_filters],
41/// since [ExecutionOptions::parquet][datafusion_common::config::ExecutionOptions::parquet] is [ParquetOptions][datafusion_common::config::ParquetOptions].
42///
43/// Some options have convenience methods. For example [SessionConfig::with_batch_size] is
44/// shorthand for setting `datafusion.execution.batch_size`.
45///
46/// ```
47/// use datafusion_execution::config::SessionConfig;
48/// use datafusion_common::ScalarValue;
49///
50/// let config = SessionConfig::new()
51///    .set("datafusion.execution.batch_size", &ScalarValue::UInt64(Some(1234)))
52///    .set_bool("datafusion.execution.parquet.pushdown_filters", true);
53///
54/// assert_eq!(config.batch_size(), 1234);
55/// assert_eq!(config.options().execution.batch_size, 1234);
56/// assert_eq!(config.options().execution.parquet.pushdown_filters, true);
57/// ```
58///
59/// You can also directly mutate the options via [SessionConfig::options_mut].
60/// So the following is equivalent to the above:
61///
62/// ```
63/// # use datafusion_execution::config::SessionConfig;
64/// # use datafusion_common::ScalarValue;
65/// #
66/// let mut config = SessionConfig::new();
67/// config.options_mut().execution.batch_size = 1234;
68/// config.options_mut().execution.parquet.pushdown_filters = true;
69/// #
70/// # assert_eq!(config.batch_size(), 1234);
71/// # assert_eq!(config.options().execution.batch_size, 1234);
72/// # assert_eq!(config.options().execution.parquet.pushdown_filters, true);
73/// ```
74///
75/// ## Built-in options
76///
77/// | Namespace | Config struct |
78/// | --------- | ------------- |
79/// | `datafusion.catalog` | [CatalogOptions][datafusion_common::config::CatalogOptions] |
80/// | `datafusion.execution` | [ExecutionOptions][datafusion_common::config::ExecutionOptions] |
81/// | `datafusion.execution.parquet` | [ParquetOptions][datafusion_common::config::ParquetOptions] |
82/// | `datafusion.optimizer` | [OptimizerOptions][datafusion_common::config::OptimizerOptions] |
83/// | `datafusion.sql_parser` | [SqlParserOptions][datafusion_common::config::SqlParserOptions] |
84/// | `datafusion.explain` | [ExplainOptions][datafusion_common::config::ExplainOptions] |
85///
86/// ## Custom configuration
87///
88/// Configuration options can be extended. See [SessionConfig::with_extension] for details.
89///
90/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
91/// [`SessionContext::new_with_config`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.new_with_config
92#[derive(Clone, Debug)]
93pub struct SessionConfig {
94    /// Configuration options
95    options: ConfigOptions,
96    /// Opaque extensions.
97    extensions: AnyMap,
98}
99
100impl Default for SessionConfig {
101    fn default() -> Self {
102        Self {
103            options: ConfigOptions::new(),
104            // Assume no extensions by default.
105            extensions: HashMap::with_capacity_and_hasher(
106                0,
107                BuildHasherDefault::default(),
108            ),
109        }
110    }
111}
112
113impl SessionConfig {
114    /// Create an execution config with default setting
115    pub fn new() -> Self {
116        Default::default()
117    }
118
119    /// Create an execution config with config options read from the environment
120    pub fn from_env() -> Result<Self> {
121        Ok(ConfigOptions::from_env()?.into())
122    }
123
124    /// Create new ConfigOptions struct, taking values from a string hash map.
125    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
126        Ok(ConfigOptions::from_string_hash_map(settings)?.into())
127    }
128
129    /// Return a handle to the configuration options.
130    ///
131    /// Can be used to read the current configuration.
132    ///
133    /// ```
134    /// use datafusion_execution::config::SessionConfig;
135    ///
136    /// let config = SessionConfig::new();
137    /// assert!(config.options().execution.batch_size > 0);
138    /// ```
139    pub fn options(&self) -> &ConfigOptions {
140        &self.options
141    }
142
143    /// Return a mutable handle to the configuration options.
144    ///
145    /// Can be used to set configuration options.
146    ///
147    /// ```
148    /// use datafusion_execution::config::SessionConfig;
149    ///
150    /// let mut config = SessionConfig::new();
151    /// config.options_mut().execution.batch_size = 1024;
152    /// assert_eq!(config.options().execution.batch_size, 1024);
153    /// ```
154    pub fn options_mut(&mut self) -> &mut ConfigOptions {
155        &mut self.options
156    }
157
158    /// Set a configuration option
159    pub fn set(self, key: &str, value: &ScalarValue) -> Self {
160        self.set_str(key, &value.to_string())
161    }
162
163    /// Set a boolean configuration option
164    pub fn set_bool(self, key: &str, value: bool) -> Self {
165        self.set_str(key, &value.to_string())
166    }
167
168    /// Set a generic `u64` configuration option
169    pub fn set_u64(self, key: &str, value: u64) -> Self {
170        self.set_str(key, &value.to_string())
171    }
172
173    /// Set a generic `usize` configuration option
174    pub fn set_usize(self, key: &str, value: usize) -> Self {
175        self.set_str(key, &value.to_string())
176    }
177
178    /// Set a generic `str` configuration option
179    pub fn set_str(mut self, key: &str, value: &str) -> Self {
180        self.options.set(key, value).unwrap();
181        self
182    }
183
184    /// Customize batch size
185    pub fn with_batch_size(mut self, n: usize) -> Self {
186        // batch size must be greater than zero
187        assert!(n > 0);
188        self.options.execution.batch_size = n;
189        self
190    }
191
192    /// Customize [`target_partitions`]
193    ///
194    /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
195    pub fn with_target_partitions(mut self, n: usize) -> Self {
196        self.options.execution.target_partitions = if n == 0 {
197            datafusion_common::config::ExecutionOptions::default().target_partitions
198        } else {
199            n
200        };
201        self
202    }
203
204    /// Insert new [ConfigExtension]
205    pub fn with_option_extension<T: ConfigExtension>(mut self, extension: T) -> Self {
206        self.options_mut().extensions.insert(extension);
207        self
208    }
209
210    /// Get [`target_partitions`]
211    ///
212    /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
213    pub fn target_partitions(&self) -> usize {
214        self.options.execution.target_partitions
215    }
216
217    /// Is the information schema enabled?
218    pub fn information_schema(&self) -> bool {
219        self.options.catalog.information_schema
220    }
221
222    /// Should the context create the default catalog and schema?
223    pub fn create_default_catalog_and_schema(&self) -> bool {
224        self.options.catalog.create_default_catalog_and_schema
225    }
226
227    /// Are joins repartitioned during execution?
228    pub fn repartition_joins(&self) -> bool {
229        self.options.optimizer.repartition_joins
230    }
231
232    /// Are aggregates repartitioned during execution?
233    pub fn repartition_aggregations(&self) -> bool {
234        self.options.optimizer.repartition_aggregations
235    }
236
237    /// Are window functions repartitioned during execution?
238    pub fn repartition_window_functions(&self) -> bool {
239        self.options.optimizer.repartition_windows
240    }
241
242    /// Do we execute sorts in a per-partition fashion and merge afterwards,
243    /// or do we coalesce partitions first and sort globally?
244    pub fn repartition_sorts(&self) -> bool {
245        self.options.optimizer.repartition_sorts
246    }
247
248    /// Prefer existing sort (true) or maximize parallelism (false). See
249    /// [prefer_existing_sort] for more details
250    ///
251    /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
252    pub fn prefer_existing_sort(&self) -> bool {
253        self.options.optimizer.prefer_existing_sort
254    }
255
256    /// Are statistics collected during execution?
257    pub fn collect_statistics(&self) -> bool {
258        self.options.execution.collect_statistics
259    }
260
261    /// Selects a name for the default catalog and schema
262    pub fn with_default_catalog_and_schema(
263        mut self,
264        catalog: impl Into<String>,
265        schema: impl Into<String>,
266    ) -> Self {
267        self.options.catalog.default_catalog = catalog.into();
268        self.options.catalog.default_schema = schema.into();
269        self
270    }
271
272    /// Controls whether the default catalog and schema will be automatically created
273    pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self {
274        self.options.catalog.create_default_catalog_and_schema = create;
275        self
276    }
277
278    /// Enables or disables the inclusion of `information_schema` virtual tables
279    pub fn with_information_schema(mut self, enabled: bool) -> Self {
280        self.options.catalog.information_schema = enabled;
281        self
282    }
283
284    /// Enables or disables the use of repartitioning for joins to improve parallelism
285    pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
286        self.options.optimizer.repartition_joins = enabled;
287        self
288    }
289
290    /// Enables or disables the use of repartitioning for aggregations to improve parallelism
291    pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
292        self.options.optimizer.repartition_aggregations = enabled;
293        self
294    }
295
296    /// Sets minimum file range size for repartitioning scans
297    pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
298        self.options.optimizer.repartition_file_min_size = size;
299        self
300    }
301
302    /// Enables or disables the allowing unordered symmetric hash join
303    pub fn with_allow_symmetric_joins_without_pruning(mut self, enabled: bool) -> Self {
304        self.options.optimizer.allow_symmetric_joins_without_pruning = enabled;
305        self
306    }
307
308    /// Enables or disables the use of repartitioning for file scans
309    pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
310        self.options.optimizer.repartition_file_scans = enabled;
311        self
312    }
313
314    /// Enables or disables the use of repartitioning for window functions to improve parallelism
315    pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
316        self.options.optimizer.repartition_windows = enabled;
317        self
318    }
319
320    /// Enables or disables the use of per-partition sorting to improve parallelism
321    pub fn with_repartition_sorts(mut self, enabled: bool) -> Self {
322        self.options.optimizer.repartition_sorts = enabled;
323        self
324    }
325
326    /// Prefer existing sort (true) or maximize parallelism (false). See
327    /// [prefer_existing_sort] for more details
328    ///
329    /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
330    pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
331        self.options.optimizer.prefer_existing_sort = enabled;
332        self
333    }
334
335    /// Prefer existing union (true). See [prefer_existing_union] for more details
336    ///
337    /// [prefer_existing_union]: datafusion_common::config::OptimizerOptions::prefer_existing_union
338    pub fn with_prefer_existing_union(mut self, enabled: bool) -> Self {
339        self.options.optimizer.prefer_existing_union = enabled;
340        self
341    }
342
343    /// Enables or disables the use of pruning predicate for parquet readers to skip row groups
344    pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
345        self.options.execution.parquet.pruning = enabled;
346        self
347    }
348
349    /// Returns true if pruning predicate should be used to skip parquet row groups
350    pub fn parquet_pruning(&self) -> bool {
351        self.options.execution.parquet.pruning
352    }
353
354    /// Returns true if bloom filter should be used to skip parquet row groups
355    pub fn parquet_bloom_filter_pruning(&self) -> bool {
356        self.options.execution.parquet.bloom_filter_on_read
357    }
358
359    /// Enables or disables the use of bloom filter for parquet readers to skip row groups
360    pub fn with_parquet_bloom_filter_pruning(mut self, enabled: bool) -> Self {
361        self.options.execution.parquet.bloom_filter_on_read = enabled;
362        self
363    }
364
365    /// Returns true if page index should be used to skip parquet data pages
366    pub fn parquet_page_index_pruning(&self) -> bool {
367        self.options.execution.parquet.enable_page_index
368    }
369
370    /// Enables or disables the use of page index for parquet readers to skip parquet data pages
371    pub fn with_parquet_page_index_pruning(mut self, enabled: bool) -> Self {
372        self.options.execution.parquet.enable_page_index = enabled;
373        self
374    }
375
376    /// Enables or disables the collection of statistics after listing files
377    pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
378        self.options.execution.collect_statistics = enabled;
379        self
380    }
381
382    /// Get the currently configured batch size
383    pub fn batch_size(&self) -> usize {
384        self.options.execution.batch_size
385    }
386
387    /// Enables or disables the coalescence of small batches into larger batches
388    pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
389        self.options.execution.coalesce_batches = enabled;
390        self
391    }
392
393    /// Returns true if record batches will be examined between each operator
394    /// and small batches will be coalesced into larger batches.
395    pub fn coalesce_batches(&self) -> bool {
396        self.options.execution.coalesce_batches
397    }
398
399    /// Enables or disables the round robin repartition for increasing parallelism
400    pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
401        self.options.optimizer.enable_round_robin_repartition = enabled;
402        self
403    }
404
405    /// Returns true if the physical plan optimizer will try to
406    /// add round robin repartition to increase parallelism to leverage more CPU cores.
407    pub fn round_robin_repartition(&self) -> bool {
408        self.options.optimizer.enable_round_robin_repartition
409    }
410
411    /// Set the size of [`sort_spill_reservation_bytes`] to control
412    /// memory pre-reservation
413    ///
414    /// [`sort_spill_reservation_bytes`]: datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes
415    pub fn with_sort_spill_reservation_bytes(
416        mut self,
417        sort_spill_reservation_bytes: usize,
418    ) -> Self {
419        self.options.execution.sort_spill_reservation_bytes =
420            sort_spill_reservation_bytes;
421        self
422    }
423
424    /// Set the size of [`sort_in_place_threshold_bytes`] to control
425    /// how sort does things.
426    ///
427    /// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
428    pub fn with_sort_in_place_threshold_bytes(
429        mut self,
430        sort_in_place_threshold_bytes: usize,
431    ) -> Self {
432        self.options.execution.sort_in_place_threshold_bytes =
433            sort_in_place_threshold_bytes;
434        self
435    }
436
437    /// Enables or disables the enforcement of batch size in joins
438    pub fn with_enforce_batch_size_in_joins(
439        mut self,
440        enforce_batch_size_in_joins: bool,
441    ) -> Self {
442        self.options.execution.enforce_batch_size_in_joins = enforce_batch_size_in_joins;
443        self
444    }
445
446    /// Returns true if the joins will be enforced to output batches of the configured size
447    pub fn enforce_batch_size_in_joins(&self) -> bool {
448        self.options.execution.enforce_batch_size_in_joins
449    }
450
451    /// Convert configuration options to name-value pairs with values
452    /// converted to strings.
453    ///
454    /// Note that this method will eventually be deprecated and
455    /// replaced by [`options`].
456    ///
457    /// [`options`]: Self::options
458    pub fn to_props(&self) -> HashMap<String, String> {
459        let mut map = HashMap::new();
460        // copy configs from config_options
461        for entry in self.options.entries() {
462            map.insert(entry.key, entry.value.unwrap_or_default());
463        }
464
465        map
466    }
467
468    /// Add extensions.
469    ///
470    /// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.
471    /// Extensions are opaque and the types are unknown to DataFusion itself, which makes them extremely flexible. [^1]
472    ///
473    /// Extensions are stored within an [`Arc`] so they do NOT require [`Clone`]. The are immutable. If you need to
474    /// modify their state over their lifetime -- e.g. for caches -- you need to establish some for of interior mutability.
475    ///
476    /// Extensions are indexed by their type `T`. If multiple values of the same type are provided, only the last one
477    /// will be kept.
478    ///
479    /// You may use [`get_extension`](Self::get_extension) to retrieve extensions.
480    ///
481    /// # Example
482    /// ```
483    /// use std::sync::Arc;
484    /// use datafusion_execution::config::SessionConfig;
485    ///
486    /// // application-specific extension types
487    /// struct Ext1(u8);
488    /// struct Ext2(u8);
489    /// struct Ext3(u8);
490    ///
491    /// let ext1a = Arc::new(Ext1(10));
492    /// let ext1b = Arc::new(Ext1(11));
493    /// let ext2 = Arc::new(Ext2(2));
494    ///
495    /// let cfg = SessionConfig::default()
496    ///     // will only remember the last Ext1
497    ///     .with_extension(Arc::clone(&ext1a))
498    ///     .with_extension(Arc::clone(&ext1b))
499    ///     .with_extension(Arc::clone(&ext2));
500    ///
501    /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
502    /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
503    /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
504    ///
505    /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
506    /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
507    ///
508    /// assert!(cfg.get_extension::<Ext3>().is_none());
509    /// ```
510    ///
511    /// [^1]: Compare that to [`ConfigOptions`] which only supports [`ScalarValue`] payloads.
512    pub fn with_extension<T>(mut self, ext: Arc<T>) -> Self
513    where
514        T: Send + Sync + 'static,
515    {
516        self.set_extension(ext);
517        self
518    }
519
520    /// Set extension. Pretty much the same as [`with_extension`](Self::with_extension), but take
521    /// mutable reference instead of owning it. Useful if you want to add another extension after
522    /// the [`SessionConfig`] is created.
523    ///
524    /// # Example
525    /// ```
526    /// use std::sync::Arc;
527    /// use datafusion_execution::config::SessionConfig;
528    ///
529    /// // application-specific extension types
530    /// struct Ext1(u8);
531    /// struct Ext2(u8);
532    /// struct Ext3(u8);
533    ///
534    /// let ext1a = Arc::new(Ext1(10));
535    /// let ext1b = Arc::new(Ext1(11));
536    /// let ext2 = Arc::new(Ext2(2));
537    ///
538    /// let mut cfg = SessionConfig::default();
539    ///
540    /// // will only remember the last Ext1
541    /// cfg.set_extension(Arc::clone(&ext1a));
542    /// cfg.set_extension(Arc::clone(&ext1b));
543    /// cfg.set_extension(Arc::clone(&ext2));
544    ///
545    /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
546    /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
547    /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
548    ///
549    /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
550    /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
551    ///
552    /// assert!(cfg.get_extension::<Ext3>().is_none());
553    /// ```
554    pub fn set_extension<T>(&mut self, ext: Arc<T>)
555    where
556        T: Send + Sync + 'static,
557    {
558        let ext = ext as Arc<dyn Any + Send + Sync + 'static>;
559        let id = TypeId::of::<T>();
560        self.extensions.insert(id, ext);
561    }
562
563    /// Get extension, if any for the specified type `T` exists.
564    ///
565    /// See [`with_extension`](Self::with_extension) on how to add attach extensions.
566    pub fn get_extension<T>(&self) -> Option<Arc<T>>
567    where
568        T: Send + Sync + 'static,
569    {
570        let id = TypeId::of::<T>();
571        self.extensions
572            .get(&id)
573            .cloned()
574            .map(|ext| Arc::downcast(ext).expect("TypeId unique"))
575    }
576}
577
578impl From<ConfigOptions> for SessionConfig {
579    fn from(options: ConfigOptions) -> Self {
580        Self {
581            options,
582            ..Default::default()
583        }
584    }
585}
586
587/// Map that holds opaque objects indexed by their type.
588///
589/// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe].
590///
591/// [object safe]: https://doc.rust-lang.org/reference/items/traits.html#object-safety
592type AnyMap =
593    HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>, BuildHasherDefault<IdHasher>>;
594
/// Pass-through [`Hasher`] used as the hasher for [`AnyMap`].
///
/// A [`TypeId`] already is a compiler-generated hash, so hashing it again would
/// only waste time. This hasher remembers the last `u64` written into it and
/// returns that value unchanged from [`Hasher::finish`].
#[derive(Default)]
struct IdHasher(u64);

impl Hasher for IdHasher {
    fn write(&mut self, _bytes: &[u8]) {
        // `AnyMap` is keyed by `TypeId`, whose `Hash` impl only feeds `write_u64`.
        unreachable!("TypeId calls write_u64");
    }

    #[inline]
    fn write_u64(&mut self, value: u64) {
        let Self(slot) = self;
        *slot = value;
    }

    #[inline]
    fn finish(&self) -> u64 {
        let Self(value) = *self;
        value
    }
}