datafusion_execution/
config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    any::{Any, TypeId},
20    collections::HashMap,
21    hash::{BuildHasherDefault, Hasher},
22    sync::Arc,
23};
24
25use datafusion_common::{
26    config::{ConfigExtension, ConfigOptions, SpillCompression},
27    Result, ScalarValue,
28};
29
30/// Configuration options for [`SessionContext`].
31///
32/// Can be passed to [`SessionContext::new_with_config`] to customize the configuration of DataFusion.
33///
34/// Options can be set using namespaces keys with `.` as the separator, where the
35/// namespace determines which configuration struct the value to routed to. All
36/// built-in options are under the `datafusion` namespace.
37///
38/// For example, the key `datafusion.execution.batch_size` will set [ExecutionOptions::batch_size][datafusion_common::config::ExecutionOptions::batch_size],
39/// because [ConfigOptions::execution] is [ExecutionOptions][datafusion_common::config::ExecutionOptions]. Similarly, the key
40/// `datafusion.execution.parquet.pushdown_filters` will set [ParquetOptions::pushdown_filters][datafusion_common::config::ParquetOptions::pushdown_filters],
41/// since [ExecutionOptions::parquet][datafusion_common::config::ExecutionOptions::parquet] is [ParquetOptions][datafusion_common::config::ParquetOptions].
42///
43/// Some options have convenience methods. For example [SessionConfig::with_batch_size] is
44/// shorthand for setting `datafusion.execution.batch_size`.
45///
46/// ```
47/// use datafusion_common::ScalarValue;
48/// use datafusion_execution::config::SessionConfig;
49///
50/// let config = SessionConfig::new()
51///     .set(
52///         "datafusion.execution.batch_size",
53///         &ScalarValue::UInt64(Some(1234)),
54///     )
55///     .set_bool("datafusion.execution.parquet.pushdown_filters", true);
56///
57/// assert_eq!(config.batch_size(), 1234);
58/// assert_eq!(config.options().execution.batch_size, 1234);
59/// assert_eq!(config.options().execution.parquet.pushdown_filters, true);
60/// ```
61///
62/// You can also directly mutate the options via [SessionConfig::options_mut].
63/// So the following is equivalent to the above:
64///
65/// ```
66/// # use datafusion_execution::config::SessionConfig;
67/// # use datafusion_common::ScalarValue;
68/// #
69/// let mut config = SessionConfig::new();
70/// config.options_mut().execution.batch_size = 1234;
71/// config.options_mut().execution.parquet.pushdown_filters = true;
72/// #
73/// # assert_eq!(config.batch_size(), 1234);
74/// # assert_eq!(config.options().execution.batch_size, 1234);
75/// # assert_eq!(config.options().execution.parquet.pushdown_filters, true);
76/// ```
77///
78/// ## Built-in options
79///
80/// | Namespace | Config struct |
81/// | --------- | ------------- |
82/// | `datafusion.catalog` | [CatalogOptions][datafusion_common::config::CatalogOptions] |
83/// | `datafusion.execution` | [ExecutionOptions][datafusion_common::config::ExecutionOptions] |
84/// | `datafusion.execution.parquet` | [ParquetOptions][datafusion_common::config::ParquetOptions] |
85/// | `datafusion.optimizer` | [OptimizerOptions][datafusion_common::config::OptimizerOptions] |
86/// | `datafusion.sql_parser` | [SqlParserOptions][datafusion_common::config::SqlParserOptions] |
87/// | `datafusion.explain` | [ExplainOptions][datafusion_common::config::ExplainOptions] |
88///
89/// ## Custom configuration
90///
91/// Configuration options can be extended. See [SessionConfig::with_extension] for details.
92///
93/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
94/// [`SessionContext::new_with_config`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.new_with_config
95#[derive(Clone, Debug)]
96pub struct SessionConfig {
97    /// Configuration options for the current session.
98    ///
99    /// A new copy is created on write, if there are other outstanding
100    /// references to the same options.
101    options: Arc<ConfigOptions>,
102    /// Opaque extensions.
103    extensions: AnyMap,
104}
105
106impl Default for SessionConfig {
107    fn default() -> Self {
108        Self {
109            options: Arc::new(ConfigOptions::new()),
110            // Assume no extensions by default.
111            extensions: HashMap::with_capacity_and_hasher(
112                0,
113                BuildHasherDefault::default(),
114            ),
115        }
116    }
117}
118
119impl SessionConfig {
120    /// Create an execution config with default setting
121    pub fn new() -> Self {
122        Default::default()
123    }
124
125    /// Create an execution config with config options read from the environment
126    ///
127    /// See [`ConfigOptions::from_env`] for details on how environment variables
128    /// are mapped to config options.
129    pub fn from_env() -> Result<Self> {
130        Ok(ConfigOptions::from_env()?.into())
131    }
132
133    /// Create new ConfigOptions struct, taking values from a string hash map.
134    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
135        Ok(ConfigOptions::from_string_hash_map(settings)?.into())
136    }
137
138    /// Return a handle to the configuration options.
139    ///
140    /// Can be used to read the current configuration.
141    ///
142    /// ```
143    /// use datafusion_execution::config::SessionConfig;
144    ///
145    /// let config = SessionConfig::new();
146    /// assert!(config.options().execution.batch_size > 0);
147    /// ```
148    pub fn options(&self) -> &Arc<ConfigOptions> {
149        &self.options
150    }
151
152    /// Return a mutable handle to the configuration options.
153    ///
154    /// Can be used to set configuration options.
155    ///
156    /// ```
157    /// use datafusion_execution::config::SessionConfig;
158    ///
159    /// let mut config = SessionConfig::new();
160    /// config.options_mut().execution.batch_size = 1024;
161    /// assert_eq!(config.options().execution.batch_size, 1024);
162    /// ```
163    pub fn options_mut(&mut self) -> &mut ConfigOptions {
164        Arc::make_mut(&mut self.options)
165    }
166
167    /// Set a configuration option
168    pub fn set(self, key: &str, value: &ScalarValue) -> Self {
169        self.set_str(key, &value.to_string())
170    }
171
172    /// Set a boolean configuration option
173    pub fn set_bool(self, key: &str, value: bool) -> Self {
174        self.set_str(key, &value.to_string())
175    }
176
177    /// Set a generic `u64` configuration option
178    pub fn set_u64(self, key: &str, value: u64) -> Self {
179        self.set_str(key, &value.to_string())
180    }
181
182    /// Set a generic `usize` configuration option
183    pub fn set_usize(self, key: &str, value: usize) -> Self {
184        self.set_str(key, &value.to_string())
185    }
186
187    /// Set a generic `str` configuration option
188    pub fn set_str(mut self, key: &str, value: &str) -> Self {
189        self.options_mut().set(key, value).unwrap();
190        self
191    }
192
193    /// Customize batch size
194    pub fn with_batch_size(mut self, n: usize) -> Self {
195        // batch size must be greater than zero
196        assert!(n > 0);
197        self.options_mut().execution.batch_size = n;
198        self
199    }
200
201    /// Customize [`target_partitions`]
202    ///
203    /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
204    pub fn with_target_partitions(mut self, n: usize) -> Self {
205        self.options_mut().execution.target_partitions = if n == 0 {
206            datafusion_common::config::ExecutionOptions::default().target_partitions
207        } else {
208            n
209        };
210        self
211    }
212
213    /// Insert new [ConfigExtension]
214    pub fn with_option_extension<T: ConfigExtension>(mut self, extension: T) -> Self {
215        self.options_mut().extensions.insert(extension);
216        self
217    }
218
219    /// Get [`target_partitions`]
220    ///
221    /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
222    pub fn target_partitions(&self) -> usize {
223        self.options.execution.target_partitions
224    }
225
226    /// Is the information schema enabled?
227    pub fn information_schema(&self) -> bool {
228        self.options.catalog.information_schema
229    }
230
231    /// Should the context create the default catalog and schema?
232    pub fn create_default_catalog_and_schema(&self) -> bool {
233        self.options.catalog.create_default_catalog_and_schema
234    }
235
236    /// Are joins repartitioned during execution?
237    pub fn repartition_joins(&self) -> bool {
238        self.options.optimizer.repartition_joins
239    }
240
241    /// Are aggregates repartitioned during execution?
242    pub fn repartition_aggregations(&self) -> bool {
243        self.options.optimizer.repartition_aggregations
244    }
245
246    /// Are window functions repartitioned during execution?
247    pub fn repartition_window_functions(&self) -> bool {
248        self.options.optimizer.repartition_windows
249    }
250
251    /// Do we execute sorts in a per-partition fashion and merge afterwards,
252    /// or do we coalesce partitions first and sort globally?
253    pub fn repartition_sorts(&self) -> bool {
254        self.options.optimizer.repartition_sorts
255    }
256
257    /// Prefer existing sort (true) or maximize parallelism (false). See
258    /// [prefer_existing_sort] for more details
259    ///
260    /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
261    pub fn prefer_existing_sort(&self) -> bool {
262        self.options.optimizer.prefer_existing_sort
263    }
264
265    /// Are statistics collected during execution?
266    pub fn collect_statistics(&self) -> bool {
267        self.options.execution.collect_statistics
268    }
269
270    /// Compression codec for spill file
271    pub fn spill_compression(&self) -> SpillCompression {
272        self.options.execution.spill_compression
273    }
274
275    /// Selects a name for the default catalog and schema
276    pub fn with_default_catalog_and_schema(
277        mut self,
278        catalog: impl Into<String>,
279        schema: impl Into<String>,
280    ) -> Self {
281        self.options_mut().catalog.default_catalog = catalog.into();
282        self.options_mut().catalog.default_schema = schema.into();
283        self
284    }
285
286    /// Controls whether the default catalog and schema will be automatically created
287    pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self {
288        self.options_mut().catalog.create_default_catalog_and_schema = create;
289        self
290    }
291
292    /// Enables or disables the inclusion of `information_schema` virtual tables
293    pub fn with_information_schema(mut self, enabled: bool) -> Self {
294        self.options_mut().catalog.information_schema = enabled;
295        self
296    }
297
298    /// Enables or disables the use of repartitioning for joins to improve parallelism
299    pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
300        self.options_mut().optimizer.repartition_joins = enabled;
301        self
302    }
303
304    /// Enables or disables the use of repartitioning for aggregations to improve parallelism
305    pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
306        self.options_mut().optimizer.repartition_aggregations = enabled;
307        self
308    }
309
310    /// Sets minimum file range size for repartitioning scans
311    pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
312        self.options_mut().optimizer.repartition_file_min_size = size;
313        self
314    }
315
316    /// Enables or disables the allowing unordered symmetric hash join
317    pub fn with_allow_symmetric_joins_without_pruning(mut self, enabled: bool) -> Self {
318        self.options_mut()
319            .optimizer
320            .allow_symmetric_joins_without_pruning = enabled;
321        self
322    }
323
324    /// Enables or disables the use of repartitioning for file scans
325    pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
326        self.options_mut().optimizer.repartition_file_scans = enabled;
327        self
328    }
329
330    /// Enables or disables the use of repartitioning for window functions to improve parallelism
331    pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
332        self.options_mut().optimizer.repartition_windows = enabled;
333        self
334    }
335
336    /// Enables or disables the use of per-partition sorting to improve parallelism
337    pub fn with_repartition_sorts(mut self, enabled: bool) -> Self {
338        self.options_mut().optimizer.repartition_sorts = enabled;
339        self
340    }
341
342    /// Prefer existing sort (true) or maximize parallelism (false). See
343    /// [prefer_existing_sort] for more details
344    ///
345    /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
346    pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
347        self.options_mut().optimizer.prefer_existing_sort = enabled;
348        self
349    }
350
351    /// Prefer existing union (true). See [prefer_existing_union] for more details
352    ///
353    /// [prefer_existing_union]: datafusion_common::config::OptimizerOptions::prefer_existing_union
354    pub fn with_prefer_existing_union(mut self, enabled: bool) -> Self {
355        self.options_mut().optimizer.prefer_existing_union = enabled;
356        self
357    }
358
359    /// Enables or disables the use of pruning predicate for parquet readers to skip row groups
360    pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
361        self.options_mut().execution.parquet.pruning = enabled;
362        self
363    }
364
365    /// Returns true if pruning predicate should be used to skip parquet row groups
366    pub fn parquet_pruning(&self) -> bool {
367        self.options.execution.parquet.pruning
368    }
369
370    /// Returns true if bloom filter should be used to skip parquet row groups
371    pub fn parquet_bloom_filter_pruning(&self) -> bool {
372        self.options.execution.parquet.bloom_filter_on_read
373    }
374
375    /// Enables or disables the use of bloom filter for parquet readers to skip row groups
376    pub fn with_parquet_bloom_filter_pruning(mut self, enabled: bool) -> Self {
377        self.options_mut().execution.parquet.bloom_filter_on_read = enabled;
378        self
379    }
380
381    /// Returns true if page index should be used to skip parquet data pages
382    pub fn parquet_page_index_pruning(&self) -> bool {
383        self.options.execution.parquet.enable_page_index
384    }
385
386    /// Enables or disables the use of page index for parquet readers to skip parquet data pages
387    pub fn with_parquet_page_index_pruning(mut self, enabled: bool) -> Self {
388        self.options_mut().execution.parquet.enable_page_index = enabled;
389        self
390    }
391
392    /// Enables or disables the collection of statistics after listing files
393    pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
394        self.options_mut().execution.collect_statistics = enabled;
395        self
396    }
397
398    /// Get the currently configured batch size
399    pub fn batch_size(&self) -> usize {
400        self.options.execution.batch_size
401    }
402
403    /// Enables or disables the coalescence of small batches into larger batches
404    pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
405        self.options_mut().execution.coalesce_batches = enabled;
406        self
407    }
408
409    /// Returns true if record batches will be examined between each operator
410    /// and small batches will be coalesced into larger batches.
411    pub fn coalesce_batches(&self) -> bool {
412        self.options.execution.coalesce_batches
413    }
414
415    /// Enables or disables the round robin repartition for increasing parallelism
416    pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
417        self.options_mut().optimizer.enable_round_robin_repartition = enabled;
418        self
419    }
420
421    /// Returns true if the physical plan optimizer will try to
422    /// add round robin repartition to increase parallelism to leverage more CPU cores.
423    pub fn round_robin_repartition(&self) -> bool {
424        self.options.optimizer.enable_round_robin_repartition
425    }
426
427    /// Set the size of [`sort_spill_reservation_bytes`] to control
428    /// memory pre-reservation
429    ///
430    /// [`sort_spill_reservation_bytes`]: datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes
431    pub fn with_sort_spill_reservation_bytes(
432        mut self,
433        sort_spill_reservation_bytes: usize,
434    ) -> Self {
435        self.options_mut().execution.sort_spill_reservation_bytes =
436            sort_spill_reservation_bytes;
437        self
438    }
439
440    /// Set the compression codec [`spill_compression`] used when spilling data to disk.
441    ///
442    /// [`spill_compression`]: datafusion_common::config::ExecutionOptions::spill_compression
443    pub fn with_spill_compression(mut self, spill_compression: SpillCompression) -> Self {
444        self.options_mut().execution.spill_compression = spill_compression;
445        self
446    }
447
448    /// Set the size of [`sort_in_place_threshold_bytes`] to control
449    /// how sort does things.
450    ///
451    /// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
452    pub fn with_sort_in_place_threshold_bytes(
453        mut self,
454        sort_in_place_threshold_bytes: usize,
455    ) -> Self {
456        self.options_mut().execution.sort_in_place_threshold_bytes =
457            sort_in_place_threshold_bytes;
458        self
459    }
460
461    /// Enables or disables the enforcement of batch size in joins
462    pub fn with_enforce_batch_size_in_joins(
463        mut self,
464        enforce_batch_size_in_joins: bool,
465    ) -> Self {
466        self.options_mut().execution.enforce_batch_size_in_joins =
467            enforce_batch_size_in_joins;
468        self
469    }
470
471    /// Returns true if the joins will be enforced to output batches of the configured size
472    pub fn enforce_batch_size_in_joins(&self) -> bool {
473        self.options.execution.enforce_batch_size_in_joins
474    }
475
476    /// Convert configuration options to name-value pairs with values
477    /// converted to strings.
478    ///
479    /// Note that this method will eventually be deprecated and
480    /// replaced by [`options`].
481    ///
482    /// [`options`]: Self::options
483    pub fn to_props(&self) -> HashMap<String, String> {
484        let mut map = HashMap::new();
485        // copy configs from config_options
486        for entry in self.options.entries() {
487            map.insert(entry.key, entry.value.unwrap_or_default());
488        }
489
490        map
491    }
492
493    /// Add extensions.
494    ///
495    /// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.
496    /// Extensions are opaque and the types are unknown to DataFusion itself, which makes them extremely flexible. [^1]
497    ///
498    /// Extensions are stored within an [`Arc`] so they do NOT require [`Clone`]. The are immutable. If you need to
499    /// modify their state over their lifetime -- e.g. for caches -- you need to establish some for of interior mutability.
500    ///
501    /// Extensions are indexed by their type `T`. If multiple values of the same type are provided, only the last one
502    /// will be kept.
503    ///
504    /// You may use [`get_extension`](Self::get_extension) to retrieve extensions.
505    ///
506    /// # Example
507    /// ```
508    /// use datafusion_execution::config::SessionConfig;
509    /// use std::sync::Arc;
510    ///
511    /// // application-specific extension types
512    /// struct Ext1(u8);
513    /// struct Ext2(u8);
514    /// struct Ext3(u8);
515    ///
516    /// let ext1a = Arc::new(Ext1(10));
517    /// let ext1b = Arc::new(Ext1(11));
518    /// let ext2 = Arc::new(Ext2(2));
519    ///
520    /// let cfg = SessionConfig::default()
521    ///     // will only remember the last Ext1
522    ///     .with_extension(Arc::clone(&ext1a))
523    ///     .with_extension(Arc::clone(&ext1b))
524    ///     .with_extension(Arc::clone(&ext2));
525    ///
526    /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
527    /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
528    /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
529    ///
530    /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
531    /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
532    ///
533    /// assert!(cfg.get_extension::<Ext3>().is_none());
534    /// ```
535    ///
536    /// [^1]: Compare that to [`ConfigOptions`] which only supports [`ScalarValue`] payloads.
537    pub fn with_extension<T>(mut self, ext: Arc<T>) -> Self
538    where
539        T: Send + Sync + 'static,
540    {
541        self.set_extension(ext);
542        self
543    }
544
545    /// Set extension. Pretty much the same as [`with_extension`](Self::with_extension), but take
546    /// mutable reference instead of owning it. Useful if you want to add another extension after
547    /// the [`SessionConfig`] is created.
548    ///
549    /// # Example
550    /// ```
551    /// use datafusion_execution::config::SessionConfig;
552    /// use std::sync::Arc;
553    ///
554    /// // application-specific extension types
555    /// struct Ext1(u8);
556    /// struct Ext2(u8);
557    /// struct Ext3(u8);
558    ///
559    /// let ext1a = Arc::new(Ext1(10));
560    /// let ext1b = Arc::new(Ext1(11));
561    /// let ext2 = Arc::new(Ext2(2));
562    ///
563    /// let mut cfg = SessionConfig::default();
564    ///
565    /// // will only remember the last Ext1
566    /// cfg.set_extension(Arc::clone(&ext1a));
567    /// cfg.set_extension(Arc::clone(&ext1b));
568    /// cfg.set_extension(Arc::clone(&ext2));
569    ///
570    /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
571    /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
572    /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
573    ///
574    /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
575    /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
576    ///
577    /// assert!(cfg.get_extension::<Ext3>().is_none());
578    /// ```
579    pub fn set_extension<T>(&mut self, ext: Arc<T>)
580    where
581        T: Send + Sync + 'static,
582    {
583        let ext = ext as Arc<dyn Any + Send + Sync + 'static>;
584        let id = TypeId::of::<T>();
585        self.extensions.insert(id, ext);
586    }
587
588    /// Get extension, if any for the specified type `T` exists.
589    ///
590    /// See [`with_extension`](Self::with_extension) on how to add attach extensions.
591    pub fn get_extension<T>(&self) -> Option<Arc<T>>
592    where
593        T: Send + Sync + 'static,
594    {
595        let id = TypeId::of::<T>();
596        self.extensions
597            .get(&id)
598            .cloned()
599            .map(|ext| Arc::downcast(ext).expect("TypeId unique"))
600    }
601}
602
603impl From<ConfigOptions> for SessionConfig {
604    fn from(options: ConfigOptions) -> Self {
605        let options = Arc::new(options);
606        Self {
607            options,
608            ..Default::default()
609        }
610    }
611}
612
613/// Map that holds opaque objects indexed by their type.
614///
615/// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe].
616///
617/// [object safe]: https://doc.rust-lang.org/reference/items/traits.html#object-safety
618type AnyMap =
619    HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>, BuildHasherDefault<IdHasher>>;
620
/// Pass-through [`Hasher`] used as the hasher of [`AnyMap`].
///
/// `TypeId` values are already compiler-generated hashes, so re-hashing them is
/// unnecessary: this hasher simply remembers the last `u64` written to it and
/// returns it from `finish`. Only `write_u64` is supported; any other write
/// path ends up in `write` and panics.
#[derive(Default)]
struct IdHasher(u64);

impl Hasher for IdHasher {
    fn write(&mut self, _bytes: &[u8]) {
        // Hashing a `TypeId` only ever goes through `write_u64`.
        unreachable!("TypeId calls write_u64");
    }

    #[inline]
    fn write_u64(&mut self, value: u64) {
        *self = Self(value);
    }

    #[inline]
    fn finish(&self) -> u64 {
        let Self(stored) = self;
        *stored
    }
}