Skip to main content

datafusion_execution/
config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{collections::HashMap, sync::Arc};
19
20use datafusion_common::{
21    Result, ScalarValue,
22    config::{ConfigExtension, ConfigOptions, SpillCompression},
23    extensions::Extensions,
24};
25
26/// Configuration options for [`SessionContext`].
27///
28/// Can be passed to [`SessionContext::new_with_config`] to customize the configuration of DataFusion.
29///
30/// Options can be set using namespaces keys with `.` as the separator, where the
31/// namespace determines which configuration struct the value to routed to. All
32/// built-in options are under the `datafusion` namespace.
33///
34/// For example, the key `datafusion.execution.batch_size` will set [ExecutionOptions::batch_size][datafusion_common::config::ExecutionOptions::batch_size],
35/// because [ConfigOptions::execution] is [ExecutionOptions][datafusion_common::config::ExecutionOptions]. Similarly, the key
36/// `datafusion.execution.parquet.pushdown_filters` will set [ParquetOptions::pushdown_filters][datafusion_common::config::ParquetOptions::pushdown_filters],
37/// since [ExecutionOptions::parquet][datafusion_common::config::ExecutionOptions::parquet] is [ParquetOptions][datafusion_common::config::ParquetOptions].
38///
39/// Some options have convenience methods. For example [SessionConfig::with_batch_size] is
40/// shorthand for setting `datafusion.execution.batch_size`.
41///
42/// ```
43/// use datafusion_common::ScalarValue;
44/// use datafusion_execution::config::SessionConfig;
45///
46/// let config = SessionConfig::new()
47///     .set(
48///         "datafusion.execution.batch_size",
49///         &ScalarValue::UInt64(Some(1234)),
50///     )
51///     .set_bool("datafusion.execution.parquet.pushdown_filters", true);
52///
53/// assert_eq!(config.batch_size(), 1234);
54/// assert_eq!(config.options().execution.batch_size, 1234);
55/// assert_eq!(config.options().execution.parquet.pushdown_filters, true);
56/// ```
57///
58/// You can also directly mutate the options via [SessionConfig::options_mut].
59/// So the following is equivalent to the above:
60///
61/// ```
62/// # use datafusion_execution::config::SessionConfig;
63/// # use datafusion_common::ScalarValue;
64/// #
65/// let mut config = SessionConfig::new();
66/// config.options_mut().execution.batch_size = 1234;
67/// config.options_mut().execution.parquet.pushdown_filters = true;
68/// #
69/// # assert_eq!(config.batch_size(), 1234);
70/// # assert_eq!(config.options().execution.batch_size, 1234);
71/// # assert_eq!(config.options().execution.parquet.pushdown_filters, true);
72/// ```
73///
74/// ## Built-in options
75///
76/// | Namespace | Config struct |
77/// | --------- | ------------- |
78/// | `datafusion.catalog` | [CatalogOptions][datafusion_common::config::CatalogOptions] |
79/// | `datafusion.execution` | [ExecutionOptions][datafusion_common::config::ExecutionOptions] |
80/// | `datafusion.execution.parquet` | [ParquetOptions][datafusion_common::config::ParquetOptions] |
81/// | `datafusion.optimizer` | [OptimizerOptions][datafusion_common::config::OptimizerOptions] |
82/// | `datafusion.sql_parser` | [SqlParserOptions][datafusion_common::config::SqlParserOptions] |
83/// | `datafusion.explain` | [ExplainOptions][datafusion_common::config::ExplainOptions] |
84///
85/// ## Custom configuration
86///
87/// Configuration options can be extended. See [SessionConfig::with_extension] for details.
88///
89/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
90/// [`SessionContext::new_with_config`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.new_with_config
91#[derive(Clone, Debug)]
92pub struct SessionConfig {
93    /// Configuration options for the current session.
94    ///
95    /// A new copy is created on write, if there are other outstanding
96    /// references to the same options.
97    options: Arc<ConfigOptions>,
98    /// Opaque extensions, keyed by concrete Rust type. See
99    /// [`with_extension`](Self::with_extension) and
100    /// [`get_extension`](Self::get_extension).
101    extensions: Extensions,
102}
103
104impl Default for SessionConfig {
105    fn default() -> Self {
106        Self {
107            options: Arc::new(ConfigOptions::new()),
108            extensions: Extensions::new(),
109        }
110    }
111}
112
113impl SessionConfig {
114    /// Create an execution config with default setting
115    pub fn new() -> Self {
116        Default::default()
117    }
118
119    /// Create an execution config with config options read from the environment
120    ///
121    /// See [`ConfigOptions::from_env`] for details on how environment variables
122    /// are mapped to config options.
123    pub fn from_env() -> Result<Self> {
124        Ok(ConfigOptions::from_env()?.into())
125    }
126
127    /// Create new ConfigOptions struct, taking values from a string hash map.
128    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
129        Ok(ConfigOptions::from_string_hash_map(settings)?.into())
130    }
131
132    /// Return a handle to the configuration options.
133    ///
134    /// Can be used to read the current configuration.
135    ///
136    /// ```
137    /// use datafusion_execution::config::SessionConfig;
138    ///
139    /// let config = SessionConfig::new();
140    /// assert!(config.options().execution.batch_size > 0);
141    /// ```
142    pub fn options(&self) -> &Arc<ConfigOptions> {
143        &self.options
144    }
145
146    /// Return a mutable handle to the configuration options.
147    ///
148    /// Can be used to set configuration options.
149    ///
150    /// ```
151    /// use datafusion_execution::config::SessionConfig;
152    ///
153    /// let mut config = SessionConfig::new();
154    /// config.options_mut().execution.batch_size = 1024;
155    /// assert_eq!(config.options().execution.batch_size, 1024);
156    /// ```
157    pub fn options_mut(&mut self) -> &mut ConfigOptions {
158        Arc::make_mut(&mut self.options)
159    }
160
161    /// Set a configuration option
162    pub fn set(self, key: &str, value: &ScalarValue) -> Self {
163        self.set_str(key, &value.to_string())
164    }
165
166    /// Set a boolean configuration option
167    pub fn set_bool(self, key: &str, value: bool) -> Self {
168        self.set_str(key, &value.to_string())
169    }
170
171    /// Set a generic `u64` configuration option
172    pub fn set_u64(self, key: &str, value: u64) -> Self {
173        self.set_str(key, &value.to_string())
174    }
175
176    /// Set a generic `usize` configuration option
177    pub fn set_usize(self, key: &str, value: usize) -> Self {
178        self.set_str(key, &value.to_string())
179    }
180
181    /// Set a generic `str` configuration option
182    pub fn set_str(mut self, key: &str, value: &str) -> Self {
183        self.options_mut().set(key, value).unwrap();
184        self
185    }
186
187    /// Customize batch size
188    pub fn with_batch_size(mut self, n: usize) -> Self {
189        // batch size must be greater than zero
190        assert!(n > 0);
191        self.options_mut().execution.batch_size = n;
192        self
193    }
194
195    /// Customize [`target_partitions`]
196    ///
197    /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
198    pub fn with_target_partitions(mut self, n: usize) -> Self {
199        self.options_mut().execution.target_partitions = if n == 0 {
200            datafusion_common::config::ExecutionOptions::default().target_partitions
201        } else {
202            n
203        };
204        self
205    }
206
207    /// Insert new [ConfigExtension]
208    pub fn with_option_extension<T: ConfigExtension>(mut self, extension: T) -> Self {
209        self.options_mut().extensions.insert(extension);
210        self
211    }
212
213    /// Get [`target_partitions`]
214    ///
215    /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
216    pub fn target_partitions(&self) -> usize {
217        self.options.execution.target_partitions
218    }
219
220    /// Is the information schema enabled?
221    pub fn information_schema(&self) -> bool {
222        self.options.catalog.information_schema
223    }
224
225    /// Should the context create the default catalog and schema?
226    pub fn create_default_catalog_and_schema(&self) -> bool {
227        self.options.catalog.create_default_catalog_and_schema
228    }
229
230    /// Are joins repartitioned during execution?
231    pub fn repartition_joins(&self) -> bool {
232        self.options.optimizer.repartition_joins
233    }
234
235    /// Are aggregates repartitioned during execution?
236    pub fn repartition_aggregations(&self) -> bool {
237        self.options.optimizer.repartition_aggregations
238    }
239
240    /// Are window functions repartitioned during execution?
241    pub fn repartition_window_functions(&self) -> bool {
242        self.options.optimizer.repartition_windows
243    }
244
245    /// Do we execute sorts in a per-partition fashion and merge afterwards,
246    /// or do we coalesce partitions first and sort globally?
247    pub fn repartition_sorts(&self) -> bool {
248        self.options.optimizer.repartition_sorts
249    }
250
251    /// Prefer existing sort (true) or maximize parallelism (false). See
252    /// [prefer_existing_sort] for more details
253    ///
254    /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
255    pub fn prefer_existing_sort(&self) -> bool {
256        self.options.optimizer.prefer_existing_sort
257    }
258
259    /// Are statistics collected during execution?
260    pub fn collect_statistics(&self) -> bool {
261        self.options.execution.collect_statistics
262    }
263
264    /// Compression codec for spill file
265    pub fn spill_compression(&self) -> SpillCompression {
266        self.options.execution.spill_compression
267    }
268
269    /// Selects a name for the default catalog and schema
270    pub fn with_default_catalog_and_schema(
271        mut self,
272        catalog: impl Into<String>,
273        schema: impl Into<String>,
274    ) -> Self {
275        self.options_mut().catalog.default_catalog = catalog.into();
276        self.options_mut().catalog.default_schema = schema.into();
277        self
278    }
279
280    /// Controls whether the default catalog and schema will be automatically created
281    pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self {
282        self.options_mut().catalog.create_default_catalog_and_schema = create;
283        self
284    }
285
286    /// Enables or disables the inclusion of `information_schema` virtual tables
287    pub fn with_information_schema(mut self, enabled: bool) -> Self {
288        self.options_mut().catalog.information_schema = enabled;
289        self
290    }
291
292    /// Enables or disables the use of repartitioning for joins to improve parallelism
293    pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
294        self.options_mut().optimizer.repartition_joins = enabled;
295        self
296    }
297
298    /// Enables or disables the use of repartitioning for aggregations to improve parallelism
299    pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
300        self.options_mut().optimizer.repartition_aggregations = enabled;
301        self
302    }
303
304    /// Sets minimum file range size for repartitioning scans
305    pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
306        self.options_mut().optimizer.repartition_file_min_size = size;
307        self
308    }
309
310    /// Enables or disables the allowing unordered symmetric hash join
311    pub fn with_allow_symmetric_joins_without_pruning(mut self, enabled: bool) -> Self {
312        self.options_mut()
313            .optimizer
314            .allow_symmetric_joins_without_pruning = enabled;
315        self
316    }
317
318    /// Enables or disables the use of repartitioning for file scans
319    pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
320        self.options_mut().optimizer.repartition_file_scans = enabled;
321        self
322    }
323
324    /// Enables or disables the use of repartitioning for window functions to improve parallelism
325    pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
326        self.options_mut().optimizer.repartition_windows = enabled;
327        self
328    }
329
330    /// Enables or disables the use of per-partition sorting to improve parallelism
331    pub fn with_repartition_sorts(mut self, enabled: bool) -> Self {
332        self.options_mut().optimizer.repartition_sorts = enabled;
333        self
334    }
335
336    /// Prefer existing sort (true) or maximize parallelism (false). See
337    /// [prefer_existing_sort] for more details
338    ///
339    /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
340    pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
341        self.options_mut().optimizer.prefer_existing_sort = enabled;
342        self
343    }
344
345    /// Prefer existing union (true). See [prefer_existing_union] for more details
346    ///
347    /// [prefer_existing_union]: datafusion_common::config::OptimizerOptions::prefer_existing_union
348    pub fn with_prefer_existing_union(mut self, enabled: bool) -> Self {
349        self.options_mut().optimizer.prefer_existing_union = enabled;
350        self
351    }
352
353    /// Enables or disables the use of pruning predicate for parquet readers to skip row groups
354    pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
355        self.options_mut().execution.parquet.pruning = enabled;
356        self
357    }
358
359    /// Returns true if pruning predicate should be used to skip parquet row groups
360    pub fn parquet_pruning(&self) -> bool {
361        self.options.execution.parquet.pruning
362    }
363
364    /// Returns true if bloom filter should be used to skip parquet row groups
365    pub fn parquet_bloom_filter_pruning(&self) -> bool {
366        self.options.execution.parquet.bloom_filter_on_read
367    }
368
369    /// Enables or disables the use of bloom filter for parquet readers to skip row groups
370    pub fn with_parquet_bloom_filter_pruning(mut self, enabled: bool) -> Self {
371        self.options_mut().execution.parquet.bloom_filter_on_read = enabled;
372        self
373    }
374
375    /// Returns true if page index should be used to skip parquet data pages
376    pub fn parquet_page_index_pruning(&self) -> bool {
377        self.options.execution.parquet.enable_page_index
378    }
379
380    /// Enables or disables the use of page index for parquet readers to skip parquet data pages
381    pub fn with_parquet_page_index_pruning(mut self, enabled: bool) -> Self {
382        self.options_mut().execution.parquet.enable_page_index = enabled;
383        self
384    }
385
386    /// Enables or disables the collection of statistics after listing files
387    pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
388        self.options_mut().execution.collect_statistics = enabled;
389        self
390    }
391
392    /// Get the currently configured batch size
393    pub fn batch_size(&self) -> usize {
394        self.options.execution.batch_size
395    }
396
397    /// Enables or disables the coalescence of small batches into larger batches
398    pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
399        self.options_mut().execution.coalesce_batches = enabled;
400        self
401    }
402
403    /// Returns true if record batches will be examined between each operator
404    /// and small batches will be coalesced into larger batches.
405    pub fn coalesce_batches(&self) -> bool {
406        self.options.execution.coalesce_batches
407    }
408
409    /// Enables or disables the round robin repartition for increasing parallelism
410    pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
411        self.options_mut().optimizer.enable_round_robin_repartition = enabled;
412        self
413    }
414
415    /// Returns true if the physical plan optimizer will try to
416    /// add round robin repartition to increase parallelism to leverage more CPU cores.
417    pub fn round_robin_repartition(&self) -> bool {
418        self.options.optimizer.enable_round_robin_repartition
419    }
420
421    /// Enables or disables sort pushdown optimization, and currently only
422    /// applies to Parquet data source.
423    pub fn with_enable_sort_pushdown(mut self, enabled: bool) -> Self {
424        self.options_mut().optimizer.enable_sort_pushdown = enabled;
425        self
426    }
427
428    /// Enables or disables elimination of `ORDER BY` clauses in subqueries
429    /// when they are not required by order-sensitive operators.
430    pub fn with_enable_subquery_sort_elimination(mut self, enabled: bool) -> Self {
431        self.options_mut()
432            .sql_parser
433            .enable_subquery_sort_elimination = enabled;
434        self
435    }
436
437    /// Set the size of [`sort_spill_reservation_bytes`] to control
438    /// memory pre-reservation
439    ///
440    /// [`sort_spill_reservation_bytes`]: datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes
441    pub fn with_sort_spill_reservation_bytes(
442        mut self,
443        sort_spill_reservation_bytes: usize,
444    ) -> Self {
445        self.options_mut().execution.sort_spill_reservation_bytes =
446            sort_spill_reservation_bytes;
447        self
448    }
449
450    /// Set the compression codec [`spill_compression`] used when spilling data to disk.
451    ///
452    /// [`spill_compression`]: datafusion_common::config::ExecutionOptions::spill_compression
453    pub fn with_spill_compression(mut self, spill_compression: SpillCompression) -> Self {
454        self.options_mut().execution.spill_compression = spill_compression;
455        self
456    }
457
458    /// Set the size of [`sort_in_place_threshold_bytes`] to control
459    /// how sort does things.
460    ///
461    /// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
462    pub fn with_sort_in_place_threshold_bytes(
463        mut self,
464        sort_in_place_threshold_bytes: usize,
465    ) -> Self {
466        self.options_mut().execution.sort_in_place_threshold_bytes =
467            sort_in_place_threshold_bytes;
468        self
469    }
470
471    /// Enables or disables the enforcement of batch size in joins
472    pub fn with_enforce_batch_size_in_joins(
473        mut self,
474        enforce_batch_size_in_joins: bool,
475    ) -> Self {
476        self.options_mut().execution.enforce_batch_size_in_joins =
477            enforce_batch_size_in_joins;
478        self
479    }
480
481    /// Returns true if the joins will be enforced to output batches of the configured size
482    pub fn enforce_batch_size_in_joins(&self) -> bool {
483        self.options.execution.enforce_batch_size_in_joins
484    }
485
486    /// Toggle SQL ANSI mode for expressions, casting, and error handling
487    pub fn with_enable_ansi_mode(mut self, enable_ansi_mode: bool) -> Self {
488        self.options_mut().execution.enable_ansi_mode = enable_ansi_mode;
489        self
490    }
491
492    /// Convert configuration options to name-value pairs with values
493    /// converted to strings.
494    ///
495    /// Note that this method will eventually be deprecated and
496    /// replaced by [`options`].
497    ///
498    /// [`options`]: Self::options
499    pub fn to_props(&self) -> HashMap<String, String> {
500        let mut map = HashMap::new();
501        // copy configs from config_options
502        for entry in self.options.entries() {
503            map.insert(entry.key, entry.value.unwrap_or_default());
504        }
505
506        map
507    }
508
509    /// Add extensions.
510    ///
511    /// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.
512    /// Extensions are opaque and the types are unknown to DataFusion itself, which makes them extremely flexible. [^1]
513    ///
514    /// Extensions are stored within an [`Arc`] so they do NOT require [`Clone`]. The are immutable. If you need to
515    /// modify their state over their lifetime -- e.g. for caches -- you need to establish some for of interior mutability.
516    ///
517    /// Extensions are indexed by their type `T`. If multiple values of the same type are provided, only the last one
518    /// will be kept.
519    ///
520    /// You may use [`get_extension`](Self::get_extension) to retrieve extensions.
521    ///
522    /// # Example
523    /// ```
524    /// use datafusion_execution::config::SessionConfig;
525    /// use std::sync::Arc;
526    ///
527    /// // application-specific extension types
528    /// struct Ext1(u8);
529    /// struct Ext2(u8);
530    /// struct Ext3(u8);
531    ///
532    /// let ext1a = Arc::new(Ext1(10));
533    /// let ext1b = Arc::new(Ext1(11));
534    /// let ext2 = Arc::new(Ext2(2));
535    ///
536    /// let cfg = SessionConfig::default()
537    ///     // will only remember the last Ext1
538    ///     .with_extension(Arc::clone(&ext1a))
539    ///     .with_extension(Arc::clone(&ext1b))
540    ///     .with_extension(Arc::clone(&ext2));
541    ///
542    /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
543    /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
544    /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
545    ///
546    /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
547    /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
548    ///
549    /// assert!(cfg.get_extension::<Ext3>().is_none());
550    /// ```
551    ///
552    /// [^1]: Compare that to [`ConfigOptions`] which only supports [`ScalarValue`] payloads.
553    pub fn with_extension<T>(mut self, ext: Arc<T>) -> Self
554    where
555        T: Send + Sync + 'static,
556    {
557        self.set_extension(ext);
558        self
559    }
560
561    /// Set extension. Pretty much the same as [`with_extension`](Self::with_extension), but take
562    /// mutable reference instead of owning it. Useful if you want to add another extension after
563    /// the [`SessionConfig`] is created.
564    ///
565    /// # Example
566    /// ```
567    /// use datafusion_execution::config::SessionConfig;
568    /// use std::sync::Arc;
569    ///
570    /// // application-specific extension types
571    /// struct Ext1(u8);
572    /// struct Ext2(u8);
573    /// struct Ext3(u8);
574    ///
575    /// let ext1a = Arc::new(Ext1(10));
576    /// let ext1b = Arc::new(Ext1(11));
577    /// let ext2 = Arc::new(Ext2(2));
578    ///
579    /// let mut cfg = SessionConfig::default();
580    ///
581    /// // will only remember the last Ext1
582    /// cfg.set_extension(Arc::clone(&ext1a));
583    /// cfg.set_extension(Arc::clone(&ext1b));
584    /// cfg.set_extension(Arc::clone(&ext2));
585    ///
586    /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
587    /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
588    /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
589    ///
590    /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
591    /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
592    ///
593    /// assert!(cfg.get_extension::<Ext3>().is_none());
594    /// ```
595    pub fn set_extension<T>(&mut self, ext: Arc<T>)
596    where
597        T: Send + Sync + 'static,
598    {
599        self.extensions.insert_arc(ext);
600    }
601
602    /// Get extension, if any for the specified type `T` exists.
603    ///
604    /// See [`with_extension`](Self::with_extension) on how to add attach extensions.
605    pub fn get_extension<T>(&self) -> Option<Arc<T>>
606    where
607        T: Send + Sync + 'static,
608    {
609        self.extensions.get_arc::<T>()
610    }
611}
612
613impl From<ConfigOptions> for SessionConfig {
614    fn from(options: ConfigOptions) -> Self {
615        let options = Arc::new(options);
616        Self {
617            options,
618            ..Default::default()
619        }
620    }
621}