datafusion_execution/
config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    any::{Any, TypeId},
20    collections::HashMap,
21    hash::{BuildHasherDefault, Hasher},
22    sync::Arc,
23};
24
25use datafusion_common::{
26    config::{ConfigExtension, ConfigOptions, SpillCompression},
27    Result, ScalarValue,
28};
29
/// Configuration options for [`SessionContext`].
///
/// Can be passed to [`SessionContext::new_with_config`] to customize the configuration of DataFusion.
///
/// Options can be set using namespaces keys with `.` as the separator, where the
/// namespace determines which configuration struct the value is routed to. All
/// built-in options are under the `datafusion` namespace.
///
/// For example, the key `datafusion.execution.batch_size` will set [ExecutionOptions::batch_size][datafusion_common::config::ExecutionOptions::batch_size],
/// because [ConfigOptions::execution] is [ExecutionOptions][datafusion_common::config::ExecutionOptions]. Similarly, the key
/// `datafusion.execution.parquet.pushdown_filters` will set [ParquetOptions::pushdown_filters][datafusion_common::config::ParquetOptions::pushdown_filters],
/// since [ExecutionOptions::parquet][datafusion_common::config::ExecutionOptions::parquet] is [ParquetOptions][datafusion_common::config::ParquetOptions].
///
/// Some options have convenience methods. For example [SessionConfig::with_batch_size] is
/// shorthand for setting `datafusion.execution.batch_size`.
///
/// ```
/// use datafusion_execution::config::SessionConfig;
/// use datafusion_common::ScalarValue;
///
/// let config = SessionConfig::new()
///    .set("datafusion.execution.batch_size", &ScalarValue::UInt64(Some(1234)))
///    .set_bool("datafusion.execution.parquet.pushdown_filters", true);
///
/// assert_eq!(config.batch_size(), 1234);
/// assert_eq!(config.options().execution.batch_size, 1234);
/// assert_eq!(config.options().execution.parquet.pushdown_filters, true);
/// ```
///
/// You can also directly mutate the options via [SessionConfig::options_mut].
/// So the following is equivalent to the above:
///
/// ```
/// # use datafusion_execution::config::SessionConfig;
/// # use datafusion_common::ScalarValue;
/// #
/// let mut config = SessionConfig::new();
/// config.options_mut().execution.batch_size = 1234;
/// config.options_mut().execution.parquet.pushdown_filters = true;
/// #
/// # assert_eq!(config.batch_size(), 1234);
/// # assert_eq!(config.options().execution.batch_size, 1234);
/// # assert_eq!(config.options().execution.parquet.pushdown_filters, true);
/// ```
///
/// ## Built-in options
///
/// | Namespace | Config struct |
/// | --------- | ------------- |
/// | `datafusion.catalog` | [CatalogOptions][datafusion_common::config::CatalogOptions] |
/// | `datafusion.execution` | [ExecutionOptions][datafusion_common::config::ExecutionOptions] |
/// | `datafusion.execution.parquet` | [ParquetOptions][datafusion_common::config::ParquetOptions] |
/// | `datafusion.optimizer` | [OptimizerOptions][datafusion_common::config::OptimizerOptions] |
/// | `datafusion.sql_parser` | [SqlParserOptions][datafusion_common::config::SqlParserOptions] |
/// | `datafusion.explain` | [ExplainOptions][datafusion_common::config::ExplainOptions] |
///
/// ## Custom configuration
///
/// Configuration options can be extended. See [SessionConfig::with_extension] for details.
///
/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
/// [`SessionContext::new_with_config`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.new_with_config
#[derive(Clone, Debug)]
pub struct SessionConfig {
    /// Configuration options for the current session.
    ///
    /// The options are shared behind an [`Arc`], so cloning a `SessionConfig`
    /// does not copy them. A new copy is created on write, if there are other
    /// outstanding references to the same options.
    options: Arc<ConfigOptions>,
    /// Opaque extensions, keyed by the [`TypeId`] of the stored value.
    ///
    /// At most one value is kept per type.
    extensions: AnyMap,
}
102
103impl Default for SessionConfig {
104    fn default() -> Self {
105        Self {
106            options: Arc::new(ConfigOptions::new()),
107            // Assume no extensions by default.
108            extensions: HashMap::with_capacity_and_hasher(
109                0,
110                BuildHasherDefault::default(),
111            ),
112        }
113    }
114}
115
116impl SessionConfig {
117    /// Create an execution config with default setting
118    pub fn new() -> Self {
119        Default::default()
120    }
121
122    /// Create an execution config with config options read from the environment
123    ///
124    /// See [`ConfigOptions::from_env`] for details on how environment variables
125    /// are mapped to config options.
126    pub fn from_env() -> Result<Self> {
127        Ok(ConfigOptions::from_env()?.into())
128    }
129
130    /// Create new ConfigOptions struct, taking values from a string hash map.
131    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
132        Ok(ConfigOptions::from_string_hash_map(settings)?.into())
133    }
134
135    /// Return a handle to the configuration options.
136    ///
137    /// Can be used to read the current configuration.
138    ///
139    /// ```
140    /// use datafusion_execution::config::SessionConfig;
141    ///
142    /// let config = SessionConfig::new();
143    /// assert!(config.options().execution.batch_size > 0);
144    /// ```
145    pub fn options(&self) -> &Arc<ConfigOptions> {
146        &self.options
147    }
148
149    /// Return a mutable handle to the configuration options.
150    ///
151    /// Can be used to set configuration options.
152    ///
153    /// ```
154    /// use datafusion_execution::config::SessionConfig;
155    ///
156    /// let mut config = SessionConfig::new();
157    /// config.options_mut().execution.batch_size = 1024;
158    /// assert_eq!(config.options().execution.batch_size, 1024);
159    /// ```
160    pub fn options_mut(&mut self) -> &mut ConfigOptions {
161        Arc::make_mut(&mut self.options)
162    }
163
164    /// Set a configuration option
165    pub fn set(self, key: &str, value: &ScalarValue) -> Self {
166        self.set_str(key, &value.to_string())
167    }
168
169    /// Set a boolean configuration option
170    pub fn set_bool(self, key: &str, value: bool) -> Self {
171        self.set_str(key, &value.to_string())
172    }
173
174    /// Set a generic `u64` configuration option
175    pub fn set_u64(self, key: &str, value: u64) -> Self {
176        self.set_str(key, &value.to_string())
177    }
178
179    /// Set a generic `usize` configuration option
180    pub fn set_usize(self, key: &str, value: usize) -> Self {
181        self.set_str(key, &value.to_string())
182    }
183
184    /// Set a generic `str` configuration option
185    pub fn set_str(mut self, key: &str, value: &str) -> Self {
186        self.options_mut().set(key, value).unwrap();
187        self
188    }
189
190    /// Customize batch size
191    pub fn with_batch_size(mut self, n: usize) -> Self {
192        // batch size must be greater than zero
193        assert!(n > 0);
194        self.options_mut().execution.batch_size = n;
195        self
196    }
197
198    /// Customize [`target_partitions`]
199    ///
200    /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
201    pub fn with_target_partitions(mut self, n: usize) -> Self {
202        self.options_mut().execution.target_partitions = if n == 0 {
203            datafusion_common::config::ExecutionOptions::default().target_partitions
204        } else {
205            n
206        };
207        self
208    }
209
210    /// Insert new [ConfigExtension]
211    pub fn with_option_extension<T: ConfigExtension>(mut self, extension: T) -> Self {
212        self.options_mut().extensions.insert(extension);
213        self
214    }
215
216    /// Get [`target_partitions`]
217    ///
218    /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
219    pub fn target_partitions(&self) -> usize {
220        self.options.execution.target_partitions
221    }
222
223    /// Is the information schema enabled?
224    pub fn information_schema(&self) -> bool {
225        self.options.catalog.information_schema
226    }
227
228    /// Should the context create the default catalog and schema?
229    pub fn create_default_catalog_and_schema(&self) -> bool {
230        self.options.catalog.create_default_catalog_and_schema
231    }
232
233    /// Are joins repartitioned during execution?
234    pub fn repartition_joins(&self) -> bool {
235        self.options.optimizer.repartition_joins
236    }
237
238    /// Are aggregates repartitioned during execution?
239    pub fn repartition_aggregations(&self) -> bool {
240        self.options.optimizer.repartition_aggregations
241    }
242
243    /// Are window functions repartitioned during execution?
244    pub fn repartition_window_functions(&self) -> bool {
245        self.options.optimizer.repartition_windows
246    }
247
248    /// Do we execute sorts in a per-partition fashion and merge afterwards,
249    /// or do we coalesce partitions first and sort globally?
250    pub fn repartition_sorts(&self) -> bool {
251        self.options.optimizer.repartition_sorts
252    }
253
254    /// Prefer existing sort (true) or maximize parallelism (false). See
255    /// [prefer_existing_sort] for more details
256    ///
257    /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
258    pub fn prefer_existing_sort(&self) -> bool {
259        self.options.optimizer.prefer_existing_sort
260    }
261
262    /// Are statistics collected during execution?
263    pub fn collect_statistics(&self) -> bool {
264        self.options.execution.collect_statistics
265    }
266
267    /// Compression codec for spill file
268    pub fn spill_compression(&self) -> SpillCompression {
269        self.options.execution.spill_compression
270    }
271
272    /// Selects a name for the default catalog and schema
273    pub fn with_default_catalog_and_schema(
274        mut self,
275        catalog: impl Into<String>,
276        schema: impl Into<String>,
277    ) -> Self {
278        self.options_mut().catalog.default_catalog = catalog.into();
279        self.options_mut().catalog.default_schema = schema.into();
280        self
281    }
282
283    /// Controls whether the default catalog and schema will be automatically created
284    pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self {
285        self.options_mut().catalog.create_default_catalog_and_schema = create;
286        self
287    }
288
289    /// Enables or disables the inclusion of `information_schema` virtual tables
290    pub fn with_information_schema(mut self, enabled: bool) -> Self {
291        self.options_mut().catalog.information_schema = enabled;
292        self
293    }
294
295    /// Enables or disables the use of repartitioning for joins to improve parallelism
296    pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
297        self.options_mut().optimizer.repartition_joins = enabled;
298        self
299    }
300
301    /// Enables or disables the use of repartitioning for aggregations to improve parallelism
302    pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
303        self.options_mut().optimizer.repartition_aggregations = enabled;
304        self
305    }
306
307    /// Sets minimum file range size for repartitioning scans
308    pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
309        self.options_mut().optimizer.repartition_file_min_size = size;
310        self
311    }
312
313    /// Enables or disables the allowing unordered symmetric hash join
314    pub fn with_allow_symmetric_joins_without_pruning(mut self, enabled: bool) -> Self {
315        self.options_mut()
316            .optimizer
317            .allow_symmetric_joins_without_pruning = enabled;
318        self
319    }
320
321    /// Enables or disables the use of repartitioning for file scans
322    pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
323        self.options_mut().optimizer.repartition_file_scans = enabled;
324        self
325    }
326
327    /// Enables or disables the use of repartitioning for window functions to improve parallelism
328    pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
329        self.options_mut().optimizer.repartition_windows = enabled;
330        self
331    }
332
333    /// Enables or disables the use of per-partition sorting to improve parallelism
334    pub fn with_repartition_sorts(mut self, enabled: bool) -> Self {
335        self.options_mut().optimizer.repartition_sorts = enabled;
336        self
337    }
338
339    /// Prefer existing sort (true) or maximize parallelism (false). See
340    /// [prefer_existing_sort] for more details
341    ///
342    /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
343    pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
344        self.options_mut().optimizer.prefer_existing_sort = enabled;
345        self
346    }
347
348    /// Prefer existing union (true). See [prefer_existing_union] for more details
349    ///
350    /// [prefer_existing_union]: datafusion_common::config::OptimizerOptions::prefer_existing_union
351    pub fn with_prefer_existing_union(mut self, enabled: bool) -> Self {
352        self.options_mut().optimizer.prefer_existing_union = enabled;
353        self
354    }
355
356    /// Enables or disables the use of pruning predicate for parquet readers to skip row groups
357    pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
358        self.options_mut().execution.parquet.pruning = enabled;
359        self
360    }
361
362    /// Returns true if pruning predicate should be used to skip parquet row groups
363    pub fn parquet_pruning(&self) -> bool {
364        self.options.execution.parquet.pruning
365    }
366
367    /// Returns true if bloom filter should be used to skip parquet row groups
368    pub fn parquet_bloom_filter_pruning(&self) -> bool {
369        self.options.execution.parquet.bloom_filter_on_read
370    }
371
372    /// Enables or disables the use of bloom filter for parquet readers to skip row groups
373    pub fn with_parquet_bloom_filter_pruning(mut self, enabled: bool) -> Self {
374        self.options_mut().execution.parquet.bloom_filter_on_read = enabled;
375        self
376    }
377
378    /// Returns true if page index should be used to skip parquet data pages
379    pub fn parquet_page_index_pruning(&self) -> bool {
380        self.options.execution.parquet.enable_page_index
381    }
382
383    /// Enables or disables the use of page index for parquet readers to skip parquet data pages
384    pub fn with_parquet_page_index_pruning(mut self, enabled: bool) -> Self {
385        self.options_mut().execution.parquet.enable_page_index = enabled;
386        self
387    }
388
389    /// Enables or disables the collection of statistics after listing files
390    pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
391        self.options_mut().execution.collect_statistics = enabled;
392        self
393    }
394
395    /// Get the currently configured batch size
396    pub fn batch_size(&self) -> usize {
397        self.options.execution.batch_size
398    }
399
400    /// Enables or disables the coalescence of small batches into larger batches
401    pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
402        self.options_mut().execution.coalesce_batches = enabled;
403        self
404    }
405
406    /// Returns true if record batches will be examined between each operator
407    /// and small batches will be coalesced into larger batches.
408    pub fn coalesce_batches(&self) -> bool {
409        self.options.execution.coalesce_batches
410    }
411
412    /// Enables or disables the round robin repartition for increasing parallelism
413    pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
414        self.options_mut().optimizer.enable_round_robin_repartition = enabled;
415        self
416    }
417
418    /// Returns true if the physical plan optimizer will try to
419    /// add round robin repartition to increase parallelism to leverage more CPU cores.
420    pub fn round_robin_repartition(&self) -> bool {
421        self.options.optimizer.enable_round_robin_repartition
422    }
423
424    /// Set the size of [`sort_spill_reservation_bytes`] to control
425    /// memory pre-reservation
426    ///
427    /// [`sort_spill_reservation_bytes`]: datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes
428    pub fn with_sort_spill_reservation_bytes(
429        mut self,
430        sort_spill_reservation_bytes: usize,
431    ) -> Self {
432        self.options_mut().execution.sort_spill_reservation_bytes =
433            sort_spill_reservation_bytes;
434        self
435    }
436
437    /// Set the compression codec [`spill_compression`] used when spilling data to disk.
438    ///
439    /// [`spill_compression`]: datafusion_common::config::ExecutionOptions::spill_compression
440    pub fn with_spill_compression(mut self, spill_compression: SpillCompression) -> Self {
441        self.options_mut().execution.spill_compression = spill_compression;
442        self
443    }
444
445    /// Set the size of [`sort_in_place_threshold_bytes`] to control
446    /// how sort does things.
447    ///
448    /// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
449    pub fn with_sort_in_place_threshold_bytes(
450        mut self,
451        sort_in_place_threshold_bytes: usize,
452    ) -> Self {
453        self.options_mut().execution.sort_in_place_threshold_bytes =
454            sort_in_place_threshold_bytes;
455        self
456    }
457
458    /// Enables or disables the enforcement of batch size in joins
459    pub fn with_enforce_batch_size_in_joins(
460        mut self,
461        enforce_batch_size_in_joins: bool,
462    ) -> Self {
463        self.options_mut().execution.enforce_batch_size_in_joins =
464            enforce_batch_size_in_joins;
465        self
466    }
467
468    /// Returns true if the joins will be enforced to output batches of the configured size
469    pub fn enforce_batch_size_in_joins(&self) -> bool {
470        self.options.execution.enforce_batch_size_in_joins
471    }
472
473    /// Convert configuration options to name-value pairs with values
474    /// converted to strings.
475    ///
476    /// Note that this method will eventually be deprecated and
477    /// replaced by [`options`].
478    ///
479    /// [`options`]: Self::options
480    pub fn to_props(&self) -> HashMap<String, String> {
481        let mut map = HashMap::new();
482        // copy configs from config_options
483        for entry in self.options.entries() {
484            map.insert(entry.key, entry.value.unwrap_or_default());
485        }
486
487        map
488    }
489
490    /// Add extensions.
491    ///
492    /// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.
493    /// Extensions are opaque and the types are unknown to DataFusion itself, which makes them extremely flexible. [^1]
494    ///
495    /// Extensions are stored within an [`Arc`] so they do NOT require [`Clone`]. The are immutable. If you need to
496    /// modify their state over their lifetime -- e.g. for caches -- you need to establish some for of interior mutability.
497    ///
498    /// Extensions are indexed by their type `T`. If multiple values of the same type are provided, only the last one
499    /// will be kept.
500    ///
501    /// You may use [`get_extension`](Self::get_extension) to retrieve extensions.
502    ///
503    /// # Example
504    /// ```
505    /// use std::sync::Arc;
506    /// use datafusion_execution::config::SessionConfig;
507    ///
508    /// // application-specific extension types
509    /// struct Ext1(u8);
510    /// struct Ext2(u8);
511    /// struct Ext3(u8);
512    ///
513    /// let ext1a = Arc::new(Ext1(10));
514    /// let ext1b = Arc::new(Ext1(11));
515    /// let ext2 = Arc::new(Ext2(2));
516    ///
517    /// let cfg = SessionConfig::default()
518    ///     // will only remember the last Ext1
519    ///     .with_extension(Arc::clone(&ext1a))
520    ///     .with_extension(Arc::clone(&ext1b))
521    ///     .with_extension(Arc::clone(&ext2));
522    ///
523    /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
524    /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
525    /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
526    ///
527    /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
528    /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
529    ///
530    /// assert!(cfg.get_extension::<Ext3>().is_none());
531    /// ```
532    ///
533    /// [^1]: Compare that to [`ConfigOptions`] which only supports [`ScalarValue`] payloads.
534    pub fn with_extension<T>(mut self, ext: Arc<T>) -> Self
535    where
536        T: Send + Sync + 'static,
537    {
538        self.set_extension(ext);
539        self
540    }
541
542    /// Set extension. Pretty much the same as [`with_extension`](Self::with_extension), but take
543    /// mutable reference instead of owning it. Useful if you want to add another extension after
544    /// the [`SessionConfig`] is created.
545    ///
546    /// # Example
547    /// ```
548    /// use std::sync::Arc;
549    /// use datafusion_execution::config::SessionConfig;
550    ///
551    /// // application-specific extension types
552    /// struct Ext1(u8);
553    /// struct Ext2(u8);
554    /// struct Ext3(u8);
555    ///
556    /// let ext1a = Arc::new(Ext1(10));
557    /// let ext1b = Arc::new(Ext1(11));
558    /// let ext2 = Arc::new(Ext2(2));
559    ///
560    /// let mut cfg = SessionConfig::default();
561    ///
562    /// // will only remember the last Ext1
563    /// cfg.set_extension(Arc::clone(&ext1a));
564    /// cfg.set_extension(Arc::clone(&ext1b));
565    /// cfg.set_extension(Arc::clone(&ext2));
566    ///
567    /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
568    /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
569    /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
570    ///
571    /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
572    /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
573    ///
574    /// assert!(cfg.get_extension::<Ext3>().is_none());
575    /// ```
576    pub fn set_extension<T>(&mut self, ext: Arc<T>)
577    where
578        T: Send + Sync + 'static,
579    {
580        let ext = ext as Arc<dyn Any + Send + Sync + 'static>;
581        let id = TypeId::of::<T>();
582        self.extensions.insert(id, ext);
583    }
584
585    /// Get extension, if any for the specified type `T` exists.
586    ///
587    /// See [`with_extension`](Self::with_extension) on how to add attach extensions.
588    pub fn get_extension<T>(&self) -> Option<Arc<T>>
589    where
590        T: Send + Sync + 'static,
591    {
592        let id = TypeId::of::<T>();
593        self.extensions
594            .get(&id)
595            .cloned()
596            .map(|ext| Arc::downcast(ext).expect("TypeId unique"))
597    }
598}
599
600impl From<ConfigOptions> for SessionConfig {
601    fn from(options: ConfigOptions) -> Self {
602        let options = Arc::new(options);
603        Self {
604            options,
605            ..Default::default()
606        }
607    }
608}
609
/// Map that holds opaque objects indexed by their type.
///
/// Keys are [`TypeId`]s and the map is hashed with [`IdHasher`] rather than
/// the standard library's default hasher.
///
/// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe].
///
/// [object safe]: https://doc.rust-lang.org/reference/items/traits.html#object-safety
type AnyMap =
    HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>, BuildHasherDefault<IdHasher>>;
617
/// Hasher for [`AnyMap`].
///
/// With [`TypeId`]s as keys, there's no need to hash them. They are already hashes themselves, coming from the compiler.
/// The [`IdHasher`] just holds the [`u64`] of the [`TypeId`], and then returns it, instead of doing any bit fiddling.
///
/// Which [`Hasher`] method `TypeId`'s `Hash` implementation calls is an
/// implementation detail of the standard library. `write_u64` is the fast
/// path; any other input reaches [`Hasher::write`], which folds the bytes
/// into the state instead of panicking.
#[derive(Default)]
struct IdHasher(u64);

impl Hasher for IdHasher {
    /// Fallback for input that does not arrive through `write_u64`.
    ///
    /// Folds each byte into the current state (FNV-1a style) so the result is
    /// deterministic and depends on every byte written.
    fn write(&mut self, bytes: &[u8]) {
        // 64-bit FNV prime.
        const PRIME: u64 = 0x0000_0100_0000_01B3;
        for &byte in bytes {
            self.0 = (self.0 ^ u64::from(byte)).wrapping_mul(PRIME);
        }
    }

    /// Stores `id` verbatim as the hash value.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.0 = id;
    }

    /// Returns the stored state unchanged.
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }
}