datafusion_execution/config.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{collections::HashMap, sync::Arc};
19
20use datafusion_common::{
21 Result, ScalarValue,
22 config::{ConfigExtension, ConfigOptions, SpillCompression},
23 extensions::Extensions,
24};
25
26/// Configuration options for [`SessionContext`].
27///
28/// Can be passed to [`SessionContext::new_with_config`] to customize the configuration of DataFusion.
29///
/// Options can be set using namespaced keys with `.` as the separator, where the
/// namespace determines which configuration struct the value is routed to. All
/// built-in options are under the `datafusion` namespace.
33///
34/// For example, the key `datafusion.execution.batch_size` will set [ExecutionOptions::batch_size][datafusion_common::config::ExecutionOptions::batch_size],
35/// because [ConfigOptions::execution] is [ExecutionOptions][datafusion_common::config::ExecutionOptions]. Similarly, the key
36/// `datafusion.execution.parquet.pushdown_filters` will set [ParquetOptions::pushdown_filters][datafusion_common::config::ParquetOptions::pushdown_filters],
37/// since [ExecutionOptions::parquet][datafusion_common::config::ExecutionOptions::parquet] is [ParquetOptions][datafusion_common::config::ParquetOptions].
38///
39/// Some options have convenience methods. For example [SessionConfig::with_batch_size] is
40/// shorthand for setting `datafusion.execution.batch_size`.
41///
42/// ```
43/// use datafusion_common::ScalarValue;
44/// use datafusion_execution::config::SessionConfig;
45///
46/// let config = SessionConfig::new()
47/// .set(
48/// "datafusion.execution.batch_size",
49/// &ScalarValue::UInt64(Some(1234)),
50/// )
51/// .set_bool("datafusion.execution.parquet.pushdown_filters", true);
52///
53/// assert_eq!(config.batch_size(), 1234);
54/// assert_eq!(config.options().execution.batch_size, 1234);
55/// assert_eq!(config.options().execution.parquet.pushdown_filters, true);
56/// ```
57///
58/// You can also directly mutate the options via [SessionConfig::options_mut].
59/// So the following is equivalent to the above:
60///
61/// ```
62/// # use datafusion_execution::config::SessionConfig;
63/// # use datafusion_common::ScalarValue;
64/// #
65/// let mut config = SessionConfig::new();
66/// config.options_mut().execution.batch_size = 1234;
67/// config.options_mut().execution.parquet.pushdown_filters = true;
68/// #
69/// # assert_eq!(config.batch_size(), 1234);
70/// # assert_eq!(config.options().execution.batch_size, 1234);
71/// # assert_eq!(config.options().execution.parquet.pushdown_filters, true);
72/// ```
73///
74/// ## Built-in options
75///
76/// | Namespace | Config struct |
77/// | --------- | ------------- |
78/// | `datafusion.catalog` | [CatalogOptions][datafusion_common::config::CatalogOptions] |
79/// | `datafusion.execution` | [ExecutionOptions][datafusion_common::config::ExecutionOptions] |
80/// | `datafusion.execution.parquet` | [ParquetOptions][datafusion_common::config::ParquetOptions] |
81/// | `datafusion.optimizer` | [OptimizerOptions][datafusion_common::config::OptimizerOptions] |
82/// | `datafusion.sql_parser` | [SqlParserOptions][datafusion_common::config::SqlParserOptions] |
83/// | `datafusion.explain` | [ExplainOptions][datafusion_common::config::ExplainOptions] |
84///
85/// ## Custom configuration
86///
87/// Configuration options can be extended. See [SessionConfig::with_extension] for details.
88///
89/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
90/// [`SessionContext::new_with_config`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.new_with_config
91#[derive(Clone, Debug)]
92pub struct SessionConfig {
93 /// Configuration options for the current session.
94 ///
95 /// A new copy is created on write, if there are other outstanding
96 /// references to the same options.
97 options: Arc<ConfigOptions>,
98 /// Opaque extensions, keyed by concrete Rust type. See
99 /// [`with_extension`](Self::with_extension) and
100 /// [`get_extension`](Self::get_extension).
101 extensions: Extensions,
102}
103
104impl Default for SessionConfig {
105 fn default() -> Self {
106 Self {
107 options: Arc::new(ConfigOptions::new()),
108 extensions: Extensions::new(),
109 }
110 }
111}
112
113impl SessionConfig {
114 /// Create an execution config with default setting
115 pub fn new() -> Self {
116 Default::default()
117 }
118
119 /// Create an execution config with config options read from the environment
120 ///
121 /// See [`ConfigOptions::from_env`] for details on how environment variables
122 /// are mapped to config options.
123 pub fn from_env() -> Result<Self> {
124 Ok(ConfigOptions::from_env()?.into())
125 }
126
127 /// Create new ConfigOptions struct, taking values from a string hash map.
128 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
129 Ok(ConfigOptions::from_string_hash_map(settings)?.into())
130 }
131
132 /// Return a handle to the configuration options.
133 ///
134 /// Can be used to read the current configuration.
135 ///
136 /// ```
137 /// use datafusion_execution::config::SessionConfig;
138 ///
139 /// let config = SessionConfig::new();
140 /// assert!(config.options().execution.batch_size > 0);
141 /// ```
142 pub fn options(&self) -> &Arc<ConfigOptions> {
143 &self.options
144 }
145
146 /// Return a mutable handle to the configuration options.
147 ///
148 /// Can be used to set configuration options.
149 ///
150 /// ```
151 /// use datafusion_execution::config::SessionConfig;
152 ///
153 /// let mut config = SessionConfig::new();
154 /// config.options_mut().execution.batch_size = 1024;
155 /// assert_eq!(config.options().execution.batch_size, 1024);
156 /// ```
157 pub fn options_mut(&mut self) -> &mut ConfigOptions {
158 Arc::make_mut(&mut self.options)
159 }
160
161 /// Set a configuration option
162 pub fn set(self, key: &str, value: &ScalarValue) -> Self {
163 self.set_str(key, &value.to_string())
164 }
165
166 /// Set a boolean configuration option
167 pub fn set_bool(self, key: &str, value: bool) -> Self {
168 self.set_str(key, &value.to_string())
169 }
170
171 /// Set a generic `u64` configuration option
172 pub fn set_u64(self, key: &str, value: u64) -> Self {
173 self.set_str(key, &value.to_string())
174 }
175
176 /// Set a generic `usize` configuration option
177 pub fn set_usize(self, key: &str, value: usize) -> Self {
178 self.set_str(key, &value.to_string())
179 }
180
181 /// Set a generic `str` configuration option
182 pub fn set_str(mut self, key: &str, value: &str) -> Self {
183 self.options_mut().set(key, value).unwrap();
184 self
185 }
186
187 /// Customize batch size
188 pub fn with_batch_size(mut self, n: usize) -> Self {
189 // batch size must be greater than zero
190 assert!(n > 0);
191 self.options_mut().execution.batch_size = n;
192 self
193 }
194
195 /// Customize [`target_partitions`]
196 ///
197 /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
198 pub fn with_target_partitions(mut self, n: usize) -> Self {
199 self.options_mut().execution.target_partitions = if n == 0 {
200 datafusion_common::config::ExecutionOptions::default().target_partitions
201 } else {
202 n
203 };
204 self
205 }
206
207 /// Insert new [ConfigExtension]
208 pub fn with_option_extension<T: ConfigExtension>(mut self, extension: T) -> Self {
209 self.options_mut().extensions.insert(extension);
210 self
211 }
212
213 /// Get [`target_partitions`]
214 ///
215 /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
216 pub fn target_partitions(&self) -> usize {
217 self.options.execution.target_partitions
218 }
219
220 /// Is the information schema enabled?
221 pub fn information_schema(&self) -> bool {
222 self.options.catalog.information_schema
223 }
224
225 /// Should the context create the default catalog and schema?
226 pub fn create_default_catalog_and_schema(&self) -> bool {
227 self.options.catalog.create_default_catalog_and_schema
228 }
229
230 /// Are joins repartitioned during execution?
231 pub fn repartition_joins(&self) -> bool {
232 self.options.optimizer.repartition_joins
233 }
234
235 /// Are aggregates repartitioned during execution?
236 pub fn repartition_aggregations(&self) -> bool {
237 self.options.optimizer.repartition_aggregations
238 }
239
240 /// Are window functions repartitioned during execution?
241 pub fn repartition_window_functions(&self) -> bool {
242 self.options.optimizer.repartition_windows
243 }
244
245 /// Do we execute sorts in a per-partition fashion and merge afterwards,
246 /// or do we coalesce partitions first and sort globally?
247 pub fn repartition_sorts(&self) -> bool {
248 self.options.optimizer.repartition_sorts
249 }
250
251 /// Prefer existing sort (true) or maximize parallelism (false). See
252 /// [prefer_existing_sort] for more details
253 ///
254 /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
255 pub fn prefer_existing_sort(&self) -> bool {
256 self.options.optimizer.prefer_existing_sort
257 }
258
259 /// Are statistics collected during execution?
260 pub fn collect_statistics(&self) -> bool {
261 self.options.execution.collect_statistics
262 }
263
264 /// Compression codec for spill file
265 pub fn spill_compression(&self) -> SpillCompression {
266 self.options.execution.spill_compression
267 }
268
269 /// Selects a name for the default catalog and schema
270 pub fn with_default_catalog_and_schema(
271 mut self,
272 catalog: impl Into<String>,
273 schema: impl Into<String>,
274 ) -> Self {
275 self.options_mut().catalog.default_catalog = catalog.into();
276 self.options_mut().catalog.default_schema = schema.into();
277 self
278 }
279
280 /// Controls whether the default catalog and schema will be automatically created
281 pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self {
282 self.options_mut().catalog.create_default_catalog_and_schema = create;
283 self
284 }
285
286 /// Enables or disables the inclusion of `information_schema` virtual tables
287 pub fn with_information_schema(mut self, enabled: bool) -> Self {
288 self.options_mut().catalog.information_schema = enabled;
289 self
290 }
291
292 /// Enables or disables the use of repartitioning for joins to improve parallelism
293 pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
294 self.options_mut().optimizer.repartition_joins = enabled;
295 self
296 }
297
298 /// Enables or disables the use of repartitioning for aggregations to improve parallelism
299 pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
300 self.options_mut().optimizer.repartition_aggregations = enabled;
301 self
302 }
303
304 /// Sets minimum file range size for repartitioning scans
305 pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
306 self.options_mut().optimizer.repartition_file_min_size = size;
307 self
308 }
309
310 /// Enables or disables the allowing unordered symmetric hash join
311 pub fn with_allow_symmetric_joins_without_pruning(mut self, enabled: bool) -> Self {
312 self.options_mut()
313 .optimizer
314 .allow_symmetric_joins_without_pruning = enabled;
315 self
316 }
317
318 /// Enables or disables the use of repartitioning for file scans
319 pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
320 self.options_mut().optimizer.repartition_file_scans = enabled;
321 self
322 }
323
324 /// Enables or disables the use of repartitioning for window functions to improve parallelism
325 pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
326 self.options_mut().optimizer.repartition_windows = enabled;
327 self
328 }
329
330 /// Enables or disables the use of per-partition sorting to improve parallelism
331 pub fn with_repartition_sorts(mut self, enabled: bool) -> Self {
332 self.options_mut().optimizer.repartition_sorts = enabled;
333 self
334 }
335
336 /// Prefer existing sort (true) or maximize parallelism (false). See
337 /// [prefer_existing_sort] for more details
338 ///
339 /// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
340 pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
341 self.options_mut().optimizer.prefer_existing_sort = enabled;
342 self
343 }
344
345 /// Prefer existing union (true). See [prefer_existing_union] for more details
346 ///
347 /// [prefer_existing_union]: datafusion_common::config::OptimizerOptions::prefer_existing_union
348 pub fn with_prefer_existing_union(mut self, enabled: bool) -> Self {
349 self.options_mut().optimizer.prefer_existing_union = enabled;
350 self
351 }
352
353 /// Enables or disables the use of pruning predicate for parquet readers to skip row groups
354 pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
355 self.options_mut().execution.parquet.pruning = enabled;
356 self
357 }
358
359 /// Returns true if pruning predicate should be used to skip parquet row groups
360 pub fn parquet_pruning(&self) -> bool {
361 self.options.execution.parquet.pruning
362 }
363
364 /// Returns true if bloom filter should be used to skip parquet row groups
365 pub fn parquet_bloom_filter_pruning(&self) -> bool {
366 self.options.execution.parquet.bloom_filter_on_read
367 }
368
369 /// Enables or disables the use of bloom filter for parquet readers to skip row groups
370 pub fn with_parquet_bloom_filter_pruning(mut self, enabled: bool) -> Self {
371 self.options_mut().execution.parquet.bloom_filter_on_read = enabled;
372 self
373 }
374
375 /// Returns true if page index should be used to skip parquet data pages
376 pub fn parquet_page_index_pruning(&self) -> bool {
377 self.options.execution.parquet.enable_page_index
378 }
379
380 /// Enables or disables the use of page index for parquet readers to skip parquet data pages
381 pub fn with_parquet_page_index_pruning(mut self, enabled: bool) -> Self {
382 self.options_mut().execution.parquet.enable_page_index = enabled;
383 self
384 }
385
386 /// Enables or disables the collection of statistics after listing files
387 pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
388 self.options_mut().execution.collect_statistics = enabled;
389 self
390 }
391
392 /// Get the currently configured batch size
393 pub fn batch_size(&self) -> usize {
394 self.options.execution.batch_size
395 }
396
397 /// Enables or disables the coalescence of small batches into larger batches
398 pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
399 self.options_mut().execution.coalesce_batches = enabled;
400 self
401 }
402
403 /// Returns true if record batches will be examined between each operator
404 /// and small batches will be coalesced into larger batches.
405 pub fn coalesce_batches(&self) -> bool {
406 self.options.execution.coalesce_batches
407 }
408
409 /// Enables or disables the round robin repartition for increasing parallelism
410 pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
411 self.options_mut().optimizer.enable_round_robin_repartition = enabled;
412 self
413 }
414
415 /// Returns true if the physical plan optimizer will try to
416 /// add round robin repartition to increase parallelism to leverage more CPU cores.
417 pub fn round_robin_repartition(&self) -> bool {
418 self.options.optimizer.enable_round_robin_repartition
419 }
420
421 /// Enables or disables sort pushdown optimization, and currently only
422 /// applies to Parquet data source.
423 pub fn with_enable_sort_pushdown(mut self, enabled: bool) -> Self {
424 self.options_mut().optimizer.enable_sort_pushdown = enabled;
425 self
426 }
427
428 /// Enables or disables elimination of `ORDER BY` clauses in subqueries
429 /// when they are not required by order-sensitive operators.
430 pub fn with_enable_subquery_sort_elimination(mut self, enabled: bool) -> Self {
431 self.options_mut()
432 .sql_parser
433 .enable_subquery_sort_elimination = enabled;
434 self
435 }
436
437 /// Set the size of [`sort_spill_reservation_bytes`] to control
438 /// memory pre-reservation
439 ///
440 /// [`sort_spill_reservation_bytes`]: datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes
441 pub fn with_sort_spill_reservation_bytes(
442 mut self,
443 sort_spill_reservation_bytes: usize,
444 ) -> Self {
445 self.options_mut().execution.sort_spill_reservation_bytes =
446 sort_spill_reservation_bytes;
447 self
448 }
449
450 /// Set the compression codec [`spill_compression`] used when spilling data to disk.
451 ///
452 /// [`spill_compression`]: datafusion_common::config::ExecutionOptions::spill_compression
453 pub fn with_spill_compression(mut self, spill_compression: SpillCompression) -> Self {
454 self.options_mut().execution.spill_compression = spill_compression;
455 self
456 }
457
458 /// Set the size of [`sort_in_place_threshold_bytes`] to control
459 /// how sort does things.
460 ///
461 /// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
462 pub fn with_sort_in_place_threshold_bytes(
463 mut self,
464 sort_in_place_threshold_bytes: usize,
465 ) -> Self {
466 self.options_mut().execution.sort_in_place_threshold_bytes =
467 sort_in_place_threshold_bytes;
468 self
469 }
470
471 /// Enables or disables the enforcement of batch size in joins
472 pub fn with_enforce_batch_size_in_joins(
473 mut self,
474 enforce_batch_size_in_joins: bool,
475 ) -> Self {
476 self.options_mut().execution.enforce_batch_size_in_joins =
477 enforce_batch_size_in_joins;
478 self
479 }
480
481 /// Returns true if the joins will be enforced to output batches of the configured size
482 pub fn enforce_batch_size_in_joins(&self) -> bool {
483 self.options.execution.enforce_batch_size_in_joins
484 }
485
486 /// Toggle SQL ANSI mode for expressions, casting, and error handling
487 pub fn with_enable_ansi_mode(mut self, enable_ansi_mode: bool) -> Self {
488 self.options_mut().execution.enable_ansi_mode = enable_ansi_mode;
489 self
490 }
491
492 /// Convert configuration options to name-value pairs with values
493 /// converted to strings.
494 ///
495 /// Note that this method will eventually be deprecated and
496 /// replaced by [`options`].
497 ///
498 /// [`options`]: Self::options
499 pub fn to_props(&self) -> HashMap<String, String> {
500 let mut map = HashMap::new();
501 // copy configs from config_options
502 for entry in self.options.entries() {
503 map.insert(entry.key, entry.value.unwrap_or_default());
504 }
505
506 map
507 }
508
509 /// Add extensions.
510 ///
511 /// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.
512 /// Extensions are opaque and the types are unknown to DataFusion itself, which makes them extremely flexible. [^1]
513 ///
514 /// Extensions are stored within an [`Arc`] so they do NOT require [`Clone`]. The are immutable. If you need to
515 /// modify their state over their lifetime -- e.g. for caches -- you need to establish some for of interior mutability.
516 ///
517 /// Extensions are indexed by their type `T`. If multiple values of the same type are provided, only the last one
518 /// will be kept.
519 ///
520 /// You may use [`get_extension`](Self::get_extension) to retrieve extensions.
521 ///
522 /// # Example
523 /// ```
524 /// use datafusion_execution::config::SessionConfig;
525 /// use std::sync::Arc;
526 ///
527 /// // application-specific extension types
528 /// struct Ext1(u8);
529 /// struct Ext2(u8);
530 /// struct Ext3(u8);
531 ///
532 /// let ext1a = Arc::new(Ext1(10));
533 /// let ext1b = Arc::new(Ext1(11));
534 /// let ext2 = Arc::new(Ext2(2));
535 ///
536 /// let cfg = SessionConfig::default()
537 /// // will only remember the last Ext1
538 /// .with_extension(Arc::clone(&ext1a))
539 /// .with_extension(Arc::clone(&ext1b))
540 /// .with_extension(Arc::clone(&ext2));
541 ///
542 /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
543 /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
544 /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
545 ///
546 /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
547 /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
548 ///
549 /// assert!(cfg.get_extension::<Ext3>().is_none());
550 /// ```
551 ///
552 /// [^1]: Compare that to [`ConfigOptions`] which only supports [`ScalarValue`] payloads.
553 pub fn with_extension<T>(mut self, ext: Arc<T>) -> Self
554 where
555 T: Send + Sync + 'static,
556 {
557 self.set_extension(ext);
558 self
559 }
560
561 /// Set extension. Pretty much the same as [`with_extension`](Self::with_extension), but take
562 /// mutable reference instead of owning it. Useful if you want to add another extension after
563 /// the [`SessionConfig`] is created.
564 ///
565 /// # Example
566 /// ```
567 /// use datafusion_execution::config::SessionConfig;
568 /// use std::sync::Arc;
569 ///
570 /// // application-specific extension types
571 /// struct Ext1(u8);
572 /// struct Ext2(u8);
573 /// struct Ext3(u8);
574 ///
575 /// let ext1a = Arc::new(Ext1(10));
576 /// let ext1b = Arc::new(Ext1(11));
577 /// let ext2 = Arc::new(Ext2(2));
578 ///
579 /// let mut cfg = SessionConfig::default();
580 ///
581 /// // will only remember the last Ext1
582 /// cfg.set_extension(Arc::clone(&ext1a));
583 /// cfg.set_extension(Arc::clone(&ext1b));
584 /// cfg.set_extension(Arc::clone(&ext2));
585 ///
586 /// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
587 /// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
588 /// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
589 ///
590 /// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
591 /// assert!(Arc::ptr_eq(&ext2_received, &ext2));
592 ///
593 /// assert!(cfg.get_extension::<Ext3>().is_none());
594 /// ```
595 pub fn set_extension<T>(&mut self, ext: Arc<T>)
596 where
597 T: Send + Sync + 'static,
598 {
599 self.extensions.insert_arc(ext);
600 }
601
602 /// Get extension, if any for the specified type `T` exists.
603 ///
604 /// See [`with_extension`](Self::with_extension) on how to add attach extensions.
605 pub fn get_extension<T>(&self) -> Option<Arc<T>>
606 where
607 T: Send + Sync + 'static,
608 {
609 self.extensions.get_arc::<T>()
610 }
611}
612
613impl From<ConfigOptions> for SessionConfig {
614 fn from(options: ConfigOptions) -> Self {
615 let options = Arc::new(options);
616 Self {
617 options,
618 ..Default::default()
619 }
620 }
621}