// datafusion_common/config.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Runtime configuration, via [`ConfigOptions`]

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;

30/// A macro that wraps a configuration struct and automatically derives
31/// [`Default`] and [`ConfigField`] for it, allowing it to be used
32/// in the [`ConfigOptions`] configuration tree.
33///
34/// `transform` is used to normalize values before parsing.
35///
36/// For example,
37///
38/// ```ignore
39/// config_namespace! {
40/// /// Amazing config
41/// pub struct MyConfig {
42/// /// Field 1 doc
43/// field1: String, transform = str::to_lowercase, default = "".to_string()
44///
45/// /// Field 2 doc
46/// field2: usize, default = 232
47///
48/// /// Field 3 doc
49/// field3: Option<usize>, default = None
50/// }
51///}
52/// ```
53///
54/// Will generate
55///
56/// ```ignore
57/// /// Amazing config
58/// #[derive(Debug, Clone)]
59/// #[non_exhaustive]
60/// pub struct MyConfig {
61/// /// Field 1 doc
62/// field1: String,
63/// /// Field 2 doc
64/// field2: usize,
65/// /// Field 3 doc
66/// field3: Option<usize>,
67/// }
68/// impl ConfigField for MyConfig {
69/// fn set(&mut self, key: &str, value: &str) -> Result<()> {
70/// let (key, rem) = key.split_once('.').unwrap_or((key, ""));
71/// match key {
72/// "field1" => {
73/// let value = str::to_lowercase(value);
74/// self.field1.set(rem, value.as_ref())
75/// },
76/// "field2" => self.field2.set(rem, value.as_ref()),
77/// "field3" => self.field3.set(rem, value.as_ref()),
78/// _ => _internal_err!(
79/// "Config value \"{}\" not found on MyConfig",
80/// key
81/// ),
82/// }
83/// }
84///
85/// fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
86/// let key = format!("{}.field1", key_prefix);
87/// let desc = "Field 1 doc";
88/// self.field1.visit(v, key.as_str(), desc);
89/// let key = format!("{}.field2", key_prefix);
90/// let desc = "Field 2 doc";
91/// self.field2.visit(v, key.as_str(), desc);
92/// let key = format!("{}.field3", key_prefix);
93/// let desc = "Field 3 doc";
94/// self.field3.visit(v, key.as_str(), desc);
95/// }
96/// }
97///
98/// impl Default for MyConfig {
99/// fn default() -> Self {
100/// Self {
101/// field1: "".to_string(),
102/// field2: 232,
103/// field3: None,
104/// }
105/// }
106/// }
107/// ```
108///
109/// NB: Misplaced commas may result in nonsensical errors
110#[macro_export]
111macro_rules! config_namespace {
112 (
113 $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
114 $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
115 $(#[allow($($struct_de:tt)*)])?
116 $vis:vis struct $struct_name:ident {
117 $(
118 $(#[doc = $d:tt])* // Field-level documentation attributes
119 $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
120 $(#[allow($($field_de:tt)*)])?
121 $field_vis:vis $field_name:ident : $field_type:ty,
122 $(warn = $warn:expr,)?
123 $(transform = $transform:expr,)?
124 default = $default:expr
125 )*$(,)*
126 }
127 ) => {
128 $(#[doc = $struct_d])* // Apply struct documentation
129 $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
130 $(#[allow($($struct_de)*)])?
131 #[derive(Debug, Clone, PartialEq)]
132 $vis struct $struct_name {
133 $(
134 $(#[doc = $d])* // Apply field documentation
135 $(#[deprecated($($field_depr)*)])? // Apply field deprecation
136 $(#[allow($($field_de)*)])?
137 $field_vis $field_name: $field_type,
138 )*
139 }
140
141 impl $crate::config::ConfigField for $struct_name {
142 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
143 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
144 match key {
145 $(
146 stringify!($field_name) => {
147 // Safely apply deprecated attribute if present
148 // $(#[allow(deprecated)])?
149 {
150 $(let value = $transform(value);)? // Apply transformation if specified
151 #[allow(deprecated)]
152 let ret = self.$field_name.set(rem, value.as_ref());
153
154 $(if !$warn.is_empty() {
155 let default: $field_type = $default;
156 #[allow(deprecated)]
157 if default != self.$field_name {
158 log::warn!($warn);
159 }
160 })? // Log warning if specified, and the value is not the default
161 ret
162 }
163 },
164 )*
165 _ => return $crate::error::_config_err!(
166 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
167 )
168 }
169 }
170
171 fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
172 $(
173 let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
174 let desc = concat!($($d),*).trim();
175 #[allow(deprecated)]
176 self.$field_name.visit(v, key.as_str(), desc);
177 )*
178 }
179 }
180 impl Default for $struct_name {
181 fn default() -> Self {
182 #[allow(deprecated)]
183 Self {
184 $($field_name: $default),*
185 }
186 }
187 }
188 }
189}
190
191config_namespace! {
192 /// Options related to catalog and directory scanning
193 ///
194 /// See also: [`SessionConfig`]
195 ///
196 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
197 pub struct CatalogOptions {
198 /// Whether the default catalog and schema should be created automatically.
199 pub create_default_catalog_and_schema: bool, default = true
200
201 /// The default catalog name - this impacts what SQL queries use if not specified
202 pub default_catalog: String, default = "datafusion".to_string()
203
204 /// The default schema name - this impacts what SQL queries use if not specified
205 pub default_schema: String, default = "public".to_string()
206
207 /// Should DataFusion provide access to `information_schema`
208 /// virtual tables for displaying schema information
209 pub information_schema: bool, default = false
210
211 /// Location scanned to load tables for `default` schema
212 pub location: Option<String>, default = None
213
214 /// Type of `TableProvider` to use when loading `default` schema
215 pub format: Option<String>, default = None
216
217 /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
218 /// if not specified explicitly in the statement.
219 pub has_header: bool, default = true
220
221 /// Specifies whether newlines in (quoted) CSV values are supported.
222 ///
223 /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
224 /// if not specified explicitly in the statement.
225 ///
226 /// Parsing newlines in quoted values may be affected by execution behaviour such as
227 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
228 /// parsed successfully, which may reduce performance.
229 pub newlines_in_values: bool, default = false
230 }
231}
232
233config_namespace! {
234 /// Options related to SQL parser
235 ///
236 /// See also: [`SessionConfig`]
237 ///
238 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
239 pub struct SqlParserOptions {
240 /// When set to true, SQL parser will parse float as decimal type
241 pub parse_float_as_decimal: bool, default = false
242
243 /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
244 pub enable_ident_normalization: bool, default = true
245
246 /// When set to true, SQL parser will normalize options value (convert value to lowercase).
247 /// Note that this option is ignored and will be removed in the future. All case-insensitive values
248 /// are normalized automatically.
249 pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false
250
251 /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
252 /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
253 pub dialect: String, default = "generic".to_string()
254 // no need to lowercase because `sqlparser::dialect_from_str`] is case-insensitive
255
256 /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
257 /// ignore the length. If false, error if a `VARCHAR` with a length is
258 /// specified. The Arrow type system does not have a notion of maximum
259 /// string length and thus DataFusion can not enforce such limits.
260 pub support_varchar_with_length: bool, default = true
261
262 /// If true, `VARCHAR` is mapped to `Utf8View` during SQL planning.
263 /// If false, `VARCHAR` is mapped to `Utf8` during SQL planning.
264 /// Default is false.
265 pub map_varchar_to_utf8view: bool, default = true
266
267 /// When set to true, the source locations relative to the original SQL
268 /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
269 /// and recorded in the logical plan nodes.
270 pub collect_spans: bool, default = false
271
272 /// Specifies the recursion depth limit when parsing complex SQL Queries
273 pub recursion_limit: usize, default = 50
274 }
275}
276
277config_namespace! {
278 /// Options related to query execution
279 ///
280 /// See also: [`SessionConfig`]
281 ///
282 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
283 pub struct ExecutionOptions {
284 /// Default batch size while creating new batches, it's especially useful for
285 /// buffer-in-memory batches since creating tiny batches would result in too much
286 /// metadata memory consumption
287 pub batch_size: usize, default = 8192
288
289 /// When set to true, record batches will be examined between each operator and
290 /// small batches will be coalesced into larger batches. This is helpful when there
291 /// are highly selective filters or joins that could produce tiny output batches. The
292 /// target batch size is determined by the configuration setting
293 pub coalesce_batches: bool, default = true
294
295 /// Should DataFusion collect statistics when first creating a table.
296 /// Has no effect after the table is created. Applies to the default
297 /// `ListingTableProvider` in DataFusion. Defaults to true.
298 pub collect_statistics: bool, default = true
299
300 /// Number of partitions for query execution. Increasing partitions can increase
301 /// concurrency.
302 ///
303 /// Defaults to the number of CPU cores on the system
304 pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()
305
306 /// The default time zone
307 ///
308 /// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
309 /// according to this time zone, and then extract the hour
310 pub time_zone: Option<String>, default = Some("+00:00".into())
311
312 /// Parquet options
313 pub parquet: ParquetOptions, default = Default::default()
314
315 /// Fan-out during initial physical planning.
316 ///
317 /// This is mostly use to plan `UNION` children in parallel.
318 ///
319 /// Defaults to the number of CPU cores on the system
320 pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()
321
322 /// When set to true, skips verifying that the schema produced by
323 /// planning the input of `LogicalPlan::Aggregate` exactly matches the
324 /// schema of the input plan.
325 ///
326 /// When set to false, if the schema does not match exactly
327 /// (including nullability and metadata), a planning error will be raised.
328 ///
329 /// This is used to workaround bugs in the planner that are now caught by
330 /// the new schema verification step.
331 pub skip_physical_aggregate_schema_check: bool, default = false
332
333 /// Specifies the reserved memory for each spillable sort operation to
334 /// facilitate an in-memory merge.
335 ///
336 /// When a sort operation spills to disk, the in-memory data must be
337 /// sorted and merged before being written to a file. This setting reserves
338 /// a specific amount of memory for that in-memory sort/merge process.
339 ///
340 /// Note: This setting is irrelevant if the sort operation cannot spill
341 /// (i.e., if there's no `DiskManager` configured).
342 pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
343
344 /// When sorting, below what size should data be concatenated
345 /// and sorted in a single RecordBatch rather than sorted in
346 /// batches and merged.
347 pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
348
349 /// Number of files to read in parallel when inferring schema and statistics
350 pub meta_fetch_concurrency: usize, default = 32
351
352 /// Guarantees a minimum level of output files running in parallel.
353 /// RecordBatches will be distributed in round robin fashion to each
354 /// parallel writer. Each writer is closed and a new file opened once
355 /// soft_max_rows_per_output_file is reached.
356 pub minimum_parallel_output_files: usize, default = 4
357
358 /// Target number of rows in output files when writing multiple.
359 /// This is a soft max, so it can be exceeded slightly. There also
360 /// will be one file smaller than the limit if the total
361 /// number of rows written is not roughly divisible by the soft max
362 pub soft_max_rows_per_output_file: usize, default = 50000000
363
364 /// This is the maximum number of RecordBatches buffered
365 /// for each output file being worked. Higher values can potentially
366 /// give faster write performance at the cost of higher peak
367 /// memory consumption
368 pub max_buffered_batches_per_output_file: usize, default = 2
369
370 /// Should sub directories be ignored when scanning directories for data
371 /// files. Defaults to true (ignores subdirectories), consistent with
372 /// Hive. Note that this setting does not affect reading partitioned
373 /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
374 pub listing_table_ignore_subdirectory: bool, default = true
375
376 /// Should DataFusion support recursive CTEs
377 pub enable_recursive_ctes: bool, default = true
378
379 /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
380 /// statistics into the same file groups.
381 /// Currently experimental
382 pub split_file_groups_by_statistics: bool, default = false
383
384 /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
385 pub keep_partition_by_columns: bool, default = false
386
387 /// Aggregation ratio (number of distinct groups / number of input rows)
388 /// threshold for skipping partial aggregation. If the value is greater
389 /// then partial aggregation will skip aggregation for further input
390 pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
391
392 /// Number of input rows partial aggregation partition should process, before
393 /// aggregation ratio check and trying to switch to skipping aggregation mode
394 pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
395
396 /// Should DataFusion use row number estimates at the input to decide
397 /// whether increasing parallelism is beneficial or not. By default,
398 /// only exact row numbers (not estimates) are used for this decision.
399 /// Setting this flag to `true` will likely produce better plans.
400 /// if the source of statistics is accurate.
401 /// We plan to make this the default in the future.
402 pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
403
404 /// Should DataFusion enforce batch size in joins or not. By default,
405 /// DataFusion will not enforce batch size in joins. Enforcing batch size
406 /// in joins can reduce memory usage when joining large
407 /// tables with a highly-selective join filter, but is also slightly slower.
408 pub enforce_batch_size_in_joins: bool, default = false
409
410 /// Size (bytes) of data buffer DataFusion uses when writing output files.
411 /// This affects the size of the data chunks that are uploaded to remote
412 /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
413 /// written, it may be necessary to increase this size to avoid errors from
414 /// the remote end point.
415 pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
416 }
417}
418
419config_namespace! {
420 /// Options for reading and writing parquet files
421 ///
422 /// See also: [`SessionConfig`]
423 ///
424 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
425 pub struct ParquetOptions {
426 // The following options affect reading parquet files
427
428 /// (reading) If true, reads the Parquet data page level metadata (the
429 /// Page Index), if present, to reduce the I/O and number of
430 /// rows decoded.
431 pub enable_page_index: bool, default = true
432
433 /// (reading) If true, the parquet reader attempts to skip entire row groups based
434 /// on the predicate in the query and the metadata (min/max values) stored in
435 /// the parquet file
436 pub pruning: bool, default = true
437
438 /// (reading) If true, the parquet reader skip the optional embedded metadata that may be in
439 /// the file Schema. This setting can help avoid schema conflicts when querying
440 /// multiple parquet files with schemas containing compatible types but different metadata
441 pub skip_metadata: bool, default = true
442
443 /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
444 /// bytes of the parquet file optimistically. If not specified, two reads are required:
445 /// One read to fetch the 8-byte parquet footer and
446 /// another to fetch the metadata length encoded in the footer
447 pub metadata_size_hint: Option<usize>, default = None
448
449 /// (reading) If true, filter expressions are be applied during the parquet decoding operation to
450 /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
451 pub pushdown_filters: bool, default = false
452
453 /// (reading) If true, filter expressions evaluated during the parquet decoding operation
454 /// will be reordered heuristically to minimize the cost of evaluation. If false,
455 /// the filters are applied in the same order as written in the query
456 pub reorder_filters: bool, default = false
457
458 /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
459 /// and `Binary/BinaryLarge` with `BinaryView`.
460 pub schema_force_view_types: bool, default = true
461
462 /// (reading) If true, parquet reader will read columns of
463 /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
464 ///
465 /// Parquet files generated by some legacy writers do not correctly set
466 /// the UTF8 flag for strings, causing string columns to be loaded as
467 /// BLOB instead.
468 pub binary_as_string: bool, default = false
469
470 /// (reading) If true, parquet reader will read columns of
471 /// physical type int96 as originating from a different resolution
472 /// than nanosecond. This is useful for reading data from systems like Spark
473 /// which stores microsecond resolution timestamps in an int96 allowing it
474 /// to write values with a larger date range than 64-bit timestamps with
475 /// nanosecond resolution.
476 pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
477
478 /// (reading) Use any available bloom filters when reading parquet files
479 pub bloom_filter_on_read: bool, default = true
480
481 // The following options affect writing to parquet files
482 // and map to parquet::file::properties::WriterProperties
483
484 /// (writing) Sets best effort maximum size of data page in bytes
485 pub data_pagesize_limit: usize, default = 1024 * 1024
486
487 /// (writing) Sets write_batch_size in bytes
488 pub write_batch_size: usize, default = 1024
489
490 /// (writing) Sets parquet writer version
491 /// valid values are "1.0" and "2.0"
492 pub writer_version: String, default = "1.0".to_string()
493
494 /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
495 ///
496 /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
497 /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
498 pub skip_arrow_metadata: bool, default = false
499
500 /// (writing) Sets default parquet compression codec.
501 /// Valid values are: uncompressed, snappy, gzip(level),
502 /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
503 /// These values are not case sensitive. If NULL, uses
504 /// default parquet writer setting
505 ///
506 /// Note that this default setting is not the same as
507 /// the default parquet writer setting.
508 pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())
509
510 /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
511 /// default parquet writer setting
512 pub dictionary_enabled: Option<bool>, default = Some(true)
513
514 /// (writing) Sets best effort maximum dictionary page size, in bytes
515 pub dictionary_page_size_limit: usize, default = 1024 * 1024
516
517 /// (writing) Sets if statistics are enabled for any column
518 /// Valid values are: "none", "chunk", and "page"
519 /// These values are not case sensitive. If NULL, uses
520 /// default parquet writer setting
521 pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())
522
523 /// (writing) Sets max statistics size for any column. If NULL, uses
524 /// default parquet writer setting
525 /// max_statistics_size is deprecated, currently it is not being used
526 // TODO: remove once deprecated
527 #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
528 pub max_statistics_size: Option<usize>, default = Some(4096)
529
530 /// (writing) Target maximum number of rows in each row group (defaults to 1M
531 /// rows). Writing larger row groups requires more memory to write, but
532 /// can get better compression and be faster to read.
533 pub max_row_group_size: usize, default = 1024 * 1024
534
535 /// (writing) Sets "created by" property
536 pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()
537
538 /// (writing) Sets column index truncate length
539 pub column_index_truncate_length: Option<usize>, default = Some(64)
540
541 /// (writing) Sets statictics truncate length. If NULL, uses
542 /// default parquet writer setting
543 pub statistics_truncate_length: Option<usize>, default = None
544
545 /// (writing) Sets best effort maximum number of rows in data page
546 pub data_page_row_count_limit: usize, default = 20_000
547
548 /// (writing) Sets default encoding for any column.
549 /// Valid values are: plain, plain_dictionary, rle,
550 /// bit_packed, delta_binary_packed, delta_length_byte_array,
551 /// delta_byte_array, rle_dictionary, and byte_stream_split.
552 /// These values are not case sensitive. If NULL, uses
553 /// default parquet writer setting
554 pub encoding: Option<String>, transform = str::to_lowercase, default = None
555
556 /// (writing) Write bloom filters for all columns when creating parquet files
557 pub bloom_filter_on_write: bool, default = false
558
559 /// (writing) Sets bloom filter false positive probability. If NULL, uses
560 /// default parquet writer setting
561 pub bloom_filter_fpp: Option<f64>, default = None
562
563 /// (writing) Sets bloom filter number of distinct values. If NULL, uses
564 /// default parquet writer setting
565 pub bloom_filter_ndv: Option<u64>, default = None
566
567 /// (writing) Controls whether DataFusion will attempt to speed up writing
568 /// parquet files by serializing them in parallel. Each column
569 /// in each row group in each output file are serialized in parallel
570 /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
571 pub allow_single_file_parallelism: bool, default = true
572
573 /// (writing) By default parallel parquet writer is tuned for minimum
574 /// memory usage in a streaming execution plan. You may see
575 /// a performance benefit when writing large parquet files
576 /// by increasing maximum_parallel_row_group_writers and
577 /// maximum_buffered_record_batches_per_stream if your system
578 /// has idle cores and can tolerate additional memory usage.
579 /// Boosting these values is likely worthwhile when
580 /// writing out already in-memory data, such as from a cached
581 /// data frame.
582 pub maximum_parallel_row_group_writers: usize, default = 1
583
584 /// (writing) By default parallel parquet writer is tuned for minimum
585 /// memory usage in a streaming execution plan. You may see
586 /// a performance benefit when writing large parquet files
587 /// by increasing maximum_parallel_row_group_writers and
588 /// maximum_buffered_record_batches_per_stream if your system
589 /// has idle cores and can tolerate additional memory usage.
590 /// Boosting these values is likely worthwhile when
591 /// writing out already in-memory data, such as from a cached
592 /// data frame.
593 pub maximum_buffered_record_batches_per_stream: usize, default = 2
594 }
595}
596
597config_namespace! {
598 /// Options related to query optimization
599 ///
600 /// See also: [`SessionConfig`]
601 ///
602 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
603 pub struct OptimizerOptions {
604 /// When set to true, the optimizer will push a limit operation into
605 /// grouped aggregations which have no aggregate expressions, as a soft limit,
606 /// emitting groups once the limit is reached, before all rows in the group are read.
607 pub enable_distinct_aggregation_soft_limit: bool, default = true
608
609 /// When set to true, the physical plan optimizer will try to add round robin
610 /// repartitioning to increase parallelism to leverage more CPU cores
611 pub enable_round_robin_repartition: bool, default = true
612
613 /// When set to true, the optimizer will attempt to perform limit operations
614 /// during aggregations, if possible
615 pub enable_topk_aggregation: bool, default = true
616
617 /// When set to true, the optimizer will insert filters before a join between
618 /// a nullable and non-nullable column to filter out nulls on the nullable side. This
619 /// filter can add additional overhead when the file format does not fully support
620 /// predicate push down.
621 pub filter_null_join_keys: bool, default = false
622
623 /// Should DataFusion repartition data using the aggregate keys to execute aggregates
624 /// in parallel using the provided `target_partitions` level
625 pub repartition_aggregations: bool, default = true
626
627 /// Minimum total files size in bytes to perform file scan repartitioning.
628 pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
629
630 /// Should DataFusion repartition data using the join keys to execute joins in parallel
631 /// using the provided `target_partitions` level
632 pub repartition_joins: bool, default = true
633
634 /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
635 /// its inputs do not have any ordering or filtering If the flag is not enabled,
636 /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
637 /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
638 /// RightAnti, and RightSemi - being produced only at the end of the execution.
639 /// This is not typical in stream processing. Additionally, without proper design for
640 /// long runner execution, all types of joins may encounter out-of-memory errors.
641 pub allow_symmetric_joins_without_pruning: bool, default = true
642
643 /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
644 /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
645 ///
646 /// For FileSources, only Parquet and CSV formats are currently supported.
647 ///
648 /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
649 /// might be partitioned into smaller chunks) for parallel scanning.
650 /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
651 /// happen within a single file.
652 ///
653 /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
654 /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
655 /// the total number of partitions and batches per partition, but does not slice the initial
656 /// record tables provided to the MemTable on creation.
657 pub repartition_file_scans: bool, default = true
658
659 /// Should DataFusion repartition data using the partitions keys to execute window
660 /// functions in parallel using the provided `target_partitions` level
661 pub repartition_windows: bool, default = true
662
663 /// Should DataFusion execute sorts in a per-partition fashion and merge
664 /// afterwards instead of coalescing first and sorting globally.
665 /// With this flag is enabled, plans in the form below
666 ///
667 /// ```text
668 /// "SortExec: [a@0 ASC]",
669 /// " CoalescePartitionsExec",
670 /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
671 /// ```
672 /// would turn into the plan below which performs better in multithreaded environments
673 ///
674 /// ```text
675 /// "SortPreservingMergeExec: [a@0 ASC]",
676 /// " SortExec: [a@0 ASC]",
677 /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
678 /// ```
679 pub repartition_sorts: bool, default = true
680
681 /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
682 /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
683 /// using `SortPreservingMergeExec`)
684 ///
685 /// When false, DataFusion will maximize plan parallelism using
686 /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
687 pub prefer_existing_sort: bool, default = false
688
689 /// When set to true, the logical plan optimizer will produce warning
690 /// messages if any optimization rules produce errors and then proceed to the next
691 /// rule. When set to false, any rules that produce errors will cause the query to fail
692 pub skip_failed_rules: bool, default = false
693
694 /// Number of times that the optimizer will attempt to optimize the plan
695 pub max_passes: usize, default = 3
696
697 /// When set to true, the physical plan optimizer will run a top down
698 /// process to reorder the join keys
699 pub top_down_join_key_reordering: bool, default = true
700
701 /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
702 /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
703 pub prefer_hash_join: bool, default = true
704
705 /// The maximum estimated size in bytes for one input side of a HashJoin
706 /// will be collected into a single partition
707 pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
708
709 /// The maximum estimated size in rows for one input side of a HashJoin
710 /// will be collected into a single partition
711 pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
712
713 /// The default filter selectivity used by Filter Statistics
714 /// when an exact selectivity cannot be determined. Valid values are
715 /// between 0 (no selectivity) and 100 (all rows are selected).
716 pub default_filter_selectivity: u8, default = 20
717
718 /// When set to true, the optimizer will not attempt to convert Union to Interleave
719 pub prefer_existing_union: bool, default = false
720
721 /// When set to true, if the returned type is a view type
722 /// then the output will be coerced to a non-view.
723 /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
724 pub expand_views_at_output: bool, default = false
725 }
726}
727
config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    //
    // NOTE(review): the `///` field docs below are captured by
    // `config_namespace!` and surface at runtime as each field's
    // `ConfigEntry::description`, so they are user-visible strings as well
    // as source documentation — edit with that in mind.
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: String, default = "indent".to_string()
    }
}
756
757impl ExecutionOptions {
758 /// Returns the correct parallelism based on the provided `value`.
759 /// If `value` is `"0"`, returns the default available parallelism, computed with
760 /// `get_available_parallelism`. Otherwise, returns `value`.
761 fn normalized_parallelism(value: &str) -> String {
762 if value.parse::<usize>() == Ok(0) {
763 get_available_parallelism().to_string()
764 } else {
765 value.to_owned()
766 }
767 }
768}
769
config_namespace! {
    /// Options controlling the format of output when printing record batches
    /// Copies [`arrow::util::display::FormatOptions`]
    //
    // NOTE(review): the `///` field docs double as runtime
    // `ConfigEntry::description` strings (see `config_namespace!`).
    pub struct FormatOptions {
        /// If set to `true` any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
        // The transform lower-cases input, so matching downstream is
        // effectively case-insensitive.
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show types in visual representation batches
        pub types_info: bool, default = false
    }
}
795
796impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
797 type Error = DataFusionError;
798 fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
799 let duration_format = match self.duration_format.as_str() {
800 "pretty" => arrow::util::display::DurationFormat::Pretty,
801 "iso8601" => arrow::util::display::DurationFormat::ISO8601,
802 _ => {
803 return _config_err!(
804 "Invalid duration format: {}. Valid values are pretty or iso8601",
805 self.duration_format
806 )
807 }
808 };
809
810 Ok(arrow::util::display::FormatOptions::new()
811 .with_display_error(self.safe)
812 .with_null(&self.null)
813 .with_date_format(self.date_format.as_deref())
814 .with_datetime_format(self.datetime_format.as_deref())
815 .with_timestamp_format(self.timestamp_format.as_deref())
816 .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
817 .with_time_format(self.time_format.as_deref())
818 .with_duration_format(duration_format)
819 .with_types_info(self.types_info))
820 }
821}
822
/// A key value pair, with a corresponding description
///
/// Produced by [`ConfigOptions::entries`] (and the `entries` methods of
/// extensions and table options) to describe a single configuration field.
#[derive(Debug)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value if any
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}
835
/// Configuration options struct, able to store both built-in configuration and custom options
///
/// Built-in options are addressed with keys of the form
/// `datafusion.<namespace>.<field>` (see [`ConfigOptions::set`]); registered
/// extensions are addressed by their own prefix instead of `datafusion`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
    /// Formatting options when printing batches
    pub format: FormatOptions,
}
855
856impl ConfigField for ConfigOptions {
857 fn set(&mut self, key: &str, value: &str) -> Result<()> {
858 // Extensions are handled in the public `ConfigOptions::set`
859 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
860 match key {
861 "catalog" => self.catalog.set(rem, value),
862 "execution" => self.execution.set(rem, value),
863 "optimizer" => self.optimizer.set(rem, value),
864 "explain" => self.explain.set(rem, value),
865 "sql_parser" => self.sql_parser.set(rem, value),
866 "format" => self.format.set(rem, value),
867 _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
868 }
869 }
870
871 fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
872 self.catalog.visit(v, "datafusion.catalog", "");
873 self.execution.visit(v, "datafusion.execution", "");
874 self.optimizer.visit(v, "datafusion.optimizer", "");
875 self.explain.visit(v, "datafusion.explain", "");
876 self.sql_parser.visit(v, "datafusion.sql_parser", "");
877 self.format.visit(v, "datafusion.format", "");
878 }
879}
880
881impl ConfigOptions {
882 /// Creates a new [`ConfigOptions`] with default values
883 pub fn new() -> Self {
884 Self::default()
885 }
886
887 /// Set extensions to provided value
888 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
889 self.extensions = extensions;
890 self
891 }
892
893 /// Set a configuration option
894 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
895 let Some((prefix, key)) = key.split_once('.') else {
896 return _config_err!("could not find config namespace for key \"{key}\"");
897 };
898
899 if prefix == "datafusion" {
900 return ConfigField::set(self, key, value);
901 }
902
903 let Some(e) = self.extensions.0.get_mut(prefix) else {
904 return _config_err!("Could not find config namespace \"{prefix}\"");
905 };
906 e.0.set(key, value)
907 }
908
909 /// Create new ConfigOptions struct, taking values from
910 /// environment variables where possible.
911 ///
912 /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
913 /// control `datafusion.execution.batch_size`.
914 pub fn from_env() -> Result<Self> {
915 struct Visitor(Vec<String>);
916
917 impl Visit for Visitor {
918 fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
919 self.0.push(key.to_string())
920 }
921
922 fn none(&mut self, key: &str, _: &'static str) {
923 self.0.push(key.to_string())
924 }
925 }
926
927 // Extract the names of all fields and then look up the corresponding
928 // environment variables. This isn't hugely efficient but avoids
929 // ambiguity between `a.b` and `a_b` which would both correspond
930 // to an environment variable of `A_B`
931
932 let mut keys = Visitor(vec![]);
933 let mut ret = Self::default();
934 ret.visit(&mut keys, "datafusion", "");
935
936 for key in keys.0 {
937 let env = key.to_uppercase().replace('.', "_");
938 if let Some(var) = std::env::var_os(env) {
939 let value = var.to_string_lossy();
940 log::info!("Set {key} to {value} from the environment variable");
941 ret.set(&key, value.as_ref())?;
942 }
943 }
944
945 Ok(ret)
946 }
947
948 /// Create new ConfigOptions struct, taking values from a string hash map.
949 ///
950 /// Only the built-in configurations will be extracted from the hash map
951 /// and other key value pairs will be ignored.
952 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
953 struct Visitor(Vec<String>);
954
955 impl Visit for Visitor {
956 fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
957 self.0.push(key.to_string())
958 }
959
960 fn none(&mut self, key: &str, _: &'static str) {
961 self.0.push(key.to_string())
962 }
963 }
964
965 let mut keys = Visitor(vec![]);
966 let mut ret = Self::default();
967 ret.visit(&mut keys, "datafusion", "");
968
969 for key in keys.0 {
970 if let Some(var) = settings.get(&key) {
971 ret.set(&key, var)?;
972 }
973 }
974
975 Ok(ret)
976 }
977
978 /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
979 pub fn entries(&self) -> Vec<ConfigEntry> {
980 struct Visitor(Vec<ConfigEntry>);
981
982 impl Visit for Visitor {
983 fn some<V: Display>(
984 &mut self,
985 key: &str,
986 value: V,
987 description: &'static str,
988 ) {
989 self.0.push(ConfigEntry {
990 key: key.to_string(),
991 value: Some(value.to_string()),
992 description,
993 })
994 }
995
996 fn none(&mut self, key: &str, description: &'static str) {
997 self.0.push(ConfigEntry {
998 key: key.to_string(),
999 value: None,
1000 description,
1001 })
1002 }
1003 }
1004
1005 let mut v = Visitor(vec![]);
1006 self.visit(&mut v, "datafusion", "");
1007
1008 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1009 v.0
1010 }
1011
1012 /// Generate documentation that can be included in the user guide
1013 pub fn generate_config_markdown() -> String {
1014 use std::fmt::Write as _;
1015
1016 let mut s = Self::default();
1017
1018 // Normalize for display
1019 s.execution.target_partitions = 0;
1020 s.execution.planning_concurrency = 0;
1021
1022 let mut docs = "| key | default | description |\n".to_string();
1023 docs += "|-----|---------|-------------|\n";
1024 let mut entries = s.entries();
1025 entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1026
1027 for entry in s.entries() {
1028 let _ = writeln!(
1029 &mut docs,
1030 "| {} | {} | {} |",
1031 entry.key,
1032 entry.value.as_deref().unwrap_or("NULL"),
1033 entry.description
1034 );
1035 }
1036 docs
1037 }
1038}
1039
/// [`ConfigExtension`] provides a mechanism to store third-party configuration
/// within DataFusion [`ConfigOptions`]
///
/// This mechanism can be used to pass configuration to user defined functions
/// or optimizer passes
///
/// # Example
/// ```
/// use datafusion_common::{
///     config::ConfigExtension, extensions_options,
///     config::ConfigOptions,
/// };
///  // Define a new configuration struct using the `extensions_options` macro
/// extensions_options! {
///    /// My own config options.
///    pub struct MyConfig {
///        /// Should "foo" be replaced by "bar"?
///        pub foo_to_bar: bool, default = true
///
///        /// How many "baz" should be created?
///        pub baz_count: usize, default = 1337
///    }
/// }
///
/// impl ConfigExtension for MyConfig {
///    const PREFIX: &'static str = "my_config";
/// }
///
/// // set up config struct and register extension
/// let mut config = ConfigOptions::default();
/// config.extensions.insert(MyConfig::default());
///
/// // overwrite config default
/// config.set("my_config.baz_count", "42").unwrap();
///
/// // check config state
/// let my_config = config.extensions.get::<MyConfig>().unwrap();
/// assert!(my_config.foo_to_bar,);
/// assert_eq!(my_config.baz_count, 42,);
/// ```
///
/// # Note:
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
    /// Configuration namespace prefix to use
    ///
    /// All values under this will be prefixed with `$PREFIX + "."`
    ///
    /// Must not be `"datafusion"`, which is reserved for built-in options
    /// (enforced by an assertion in [`Extensions::insert`]).
    const PREFIX: &'static str;
}
1090
/// An object-safe API for storing arbitrary configuration.
///
/// See [`ConfigExtension`] for user defined configuration
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any(&self) -> &dyn Any;

    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Return a deep clone of this [`ExtensionOptions`]
    ///
    /// It is important this does not share mutable state to avoid consistency issues
    /// with configuration changing whilst queries are executing
    fn cloned(&self) -> Box<dyn ExtensionOptions>;

    /// Set the given `key`, `value` pair
    ///
    /// Note: `key` arrives with the extension's registration prefix already
    /// stripped (see [`ConfigOptions::set`]).
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
    fn entries(&self) -> Vec<ConfigEntry>;
}
1117
/// A type-safe container for [`ConfigExtension`]
///
/// Extensions are keyed by their [`ConfigExtension::PREFIX`]; the `BTreeMap`
/// keeps iteration (and thus entry listing) order deterministic.
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1121
1122impl Extensions {
1123 /// Create a new, empty [`Extensions`]
1124 pub fn new() -> Self {
1125 Self(BTreeMap::new())
1126 }
1127
1128 /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
1129 pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1130 assert_ne!(T::PREFIX, "datafusion");
1131 let e = ExtensionBox(Box::new(extension));
1132 self.0.insert(T::PREFIX, e);
1133 }
1134
1135 /// Retrieves the extension of the given type if any
1136 pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1137 self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1138 }
1139
1140 /// Retrieves the extension of the given type if any
1141 pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1142 let e = self.0.get_mut(T::PREFIX)?;
1143 e.0.as_any_mut().downcast_mut()
1144 }
1145}
1146
/// Owning wrapper around a boxed [`ExtensionOptions`]; exists so the map in
/// [`Extensions`] can implement [`Clone`] via [`ExtensionOptions::cloned`].
#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);
1149
impl Clone for ExtensionBox {
    // `Clone` cannot be derived for a `Box<dyn Trait>`, so delegate to the
    // object-safe `cloned` method, which produces a deep copy.
    fn clone(&self) -> Self {
        Self(self.0.cloned())
    }
}
1155
/// A trait implemented by `config_namespace` and for field types that provides
/// the ability to walk and mutate the configuration tree
pub trait ConfigField {
    /// Walk this node (and any children), reporting each leaf value to `v`.
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);

    /// Set the field addressed by `key` (relative to this node) to `value`.
    fn set(&mut self, key: &str, value: &str) -> Result<()>;
}
1163
1164impl<F: ConfigField + Default> ConfigField for Option<F> {
1165 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1166 match self {
1167 Some(s) => s.visit(v, key, description),
1168 None => v.none(key, description),
1169 }
1170 }
1171
1172 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1173 self.get_or_insert_with(Default::default).set(key, value)
1174 }
1175}
1176
1177fn default_transform<T>(input: &str) -> Result<T>
1178where
1179 T: FromStr,
1180 <T as FromStr>::Err: Sync + Send + Error + 'static,
1181{
1182 input.parse().map_err(|e| {
1183 DataFusionError::Context(
1184 format!(
1185 "Error parsing '{}' as {}",
1186 input,
1187 std::any::type_name::<T>()
1188 ),
1189 Box::new(DataFusionError::External(Box::new(e))),
1190 )
1191 })
1192}
1193
/// Implements [`ConfigField`] for a scalar leaf type.
///
/// Two forms are accepted:
/// - `config_field!(T)` — parses the incoming string with [`FromStr`] via
///   `default_transform`
/// - `config_field!(T, value => expr)` — applies a custom transform
///   expression, where the identifier (`value`) names the incoming `&str`
#[macro_export]
macro_rules! config_field {
    ($t:ty) => {
        config_field!($t, value => default_transform(value)?);
    };

    ($t:ty, $arg:ident => $transform:expr) => {
        impl ConfigField for $t {
            fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, $arg: &str) -> Result<()> {
                *self = $transform;
                Ok(())
            }
        }
    };
}
1213
// Wire up `ConfigField` for the primitive scalar config types. `bool` input
// is lower-cased first so values such as "TRUE" or "True" parse successfully.
config_field!(String);
config_field!(bool, value => default_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);
1219
1220impl ConfigField for u8 {
1221 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1222 v.some(key, self, description)
1223 }
1224
1225 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1226 if value.is_empty() {
1227 return Err(DataFusionError::Configuration(format!(
1228 "Input string for {key} key is empty"
1229 )));
1230 }
1231 // Check if the string is a valid number
1232 if let Ok(num) = value.parse::<u8>() {
1233 // TODO: Let's decide how we treat the numerical strings.
1234 *self = num;
1235 } else {
1236 let bytes = value.as_bytes();
1237 // Check if the first character is ASCII (single byte)
1238 if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1239 return Err(DataFusionError::Configuration(format!(
1240 "Error parsing {value} as u8. Non-ASCII string provided"
1241 )));
1242 }
1243 *self = bytes[0];
1244 }
1245 Ok(())
1246 }
1247}
1248
1249impl ConfigField for CompressionTypeVariant {
1250 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1251 v.some(key, self, description)
1252 }
1253
1254 fn set(&mut self, _: &str, value: &str) -> Result<()> {
1255 *self = CompressionTypeVariant::from_str(value)?;
1256 Ok(())
1257 }
1258}
1259
/// An implementation trait used to recursively walk configuration
pub trait Visit {
    /// Called for each leaf field that has a value; `description` carries the
    /// field's documentation string.
    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);

    /// Called for each leaf field whose value is unset (`None`).
    fn none(&mut self, key: &str, description: &'static str);
}
1266
/// Convenience macro to create [`ExtensionsOptions`].
///
/// The created structure implements the following traits:
///
/// - [`Clone`]
/// - [`Debug`]
/// - [`Default`]
/// - [`ExtensionOptions`]
///
/// # Usage
/// The syntax is:
///
/// ```text
/// extensions_options! {
///     /// Struct docs (optional).
///     [<vis>] struct <StructName> {
///         /// Field docs (optional)
///         [<vis>] <field_name>: <field_type>, default = <default_value>
///
///         ... more fields
///     }
/// }
/// ```
///
/// The placeholders are:
/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
/// - `<StructName>`: Struct name like `MyStruct`.
/// - `<field_name>`: Field name like `my_field`.
/// - `<field_type>`: Field type like `u8`.
/// - `<default_value>`: Default value matching the field type like `42`.
///
/// # Example
/// See also a full example on the [`ConfigExtension`] documentation
///
/// ```
/// use datafusion_common::extensions_options;
///
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
/// ```
///
///
/// [`Debug`]: std::fmt::Debug
/// [`ExtensionsOptions`]: crate::config::ExtensionOptions
#[macro_export]
macro_rules! extensions_options {
    (
        $(#[doc = $struct_d:tt])*
        $vis:vis struct $struct_name:ident {
            $(
            $(#[doc = $d:tt])*
            $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name{
            $(
                $(#[doc = $d])*
                $field_vis $field_name : $field_type,
            )*
        }

        // Defaults come from the per-field `default = ...` expressions.
        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl $crate::config::ExtensionOptions for $struct_name {
            fn as_any(&self) -> &dyn ::std::any::Any {
                self
            }

            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
                self
            }

            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
                Box::new(self.clone())
            }

            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                $crate::config::ConfigField::set(self, key, value)
            }

            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
                struct Visitor(Vec<$crate::config::ConfigEntry>);

                impl $crate::config::Visit for Visitor {
                    fn some<V: std::fmt::Display>(
                        &mut self,
                        key: &str,
                        value: V,
                        description: &'static str,
                    ) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: Some(value.to_string()),
                            description,
                        })
                    }

                    fn none(&mut self, key: &str, description: &'static str) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: None,
                            description,
                        })
                    }
                }

                let mut v = Visitor(vec![]);
                // The prefix is not used for extensions.
                // The description is generated in ConfigField::visit.
                // We can just pass empty strings here.
                $crate::config::ConfigField::visit(self, &mut v, "", "");
                v.0
            }
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                // Route on the first `.`-separated segment; any remainder is
                // forwarded to the field's own `set`.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                #[allow(deprecated)]
                                self.$field_name.set(rem, value.as_ref())
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
                $(
                    let key = stringify!($field_name).to_string();
                    // The field's doc comment lines are concatenated into the
                    // runtime description string.
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
    }
}
1431
/// These file types have special built in behavior for configuration.
/// Use TableOptions::Extensions for configuring other file types.
#[derive(Debug, Clone)]
pub enum ConfigFileType {
    /// Character separated values; configured via [`CsvOptions`]
    CSV,
    /// Apache Parquet; only available with the `parquet` feature
    #[cfg(feature = "parquet")]
    PARQUET,
    /// JSON; configured via [`JsonOptions`]
    JSON,
}
1441
/// Represents the configuration options available for handling different table formats within a data processing application.
/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
    /// Configuration options for CSV file handling. This includes settings like the delimiter,
    /// quote character, and whether the first row is considered as headers.
    pub csv: CsvOptions,

    /// Configuration options for Parquet file handling. This includes settings for compression,
    /// encoding, and other Parquet-specific file characteristics.
    pub parquet: TableParquetOptions,

    /// Configuration options for JSON file handling.
    pub json: JsonOptions,

    /// The current file format that the table operations should assume. This option allows
    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
    ///
    /// When `None`, visiting exposes every format under its own prefix and
    /// `format.*` keys cannot be `set` (see the [`ConfigField`] impl).
    pub current_format: Option<ConfigFileType>,

    /// Optional extensions that can be used to extend or customize the behavior of the table
    /// options. Extensions can be registered using `Extensions::insert` and might include
    /// custom file handling logic, additional configuration parameters, or other enhancements.
    pub extensions: Extensions,
}
1467
impl ConfigField for TableOptions {
    /// Visits configuration settings for the current file format, or all formats if none is selected.
    ///
    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
    /// it visits all available format settings.
    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        if let Some(file_type) = &self.current_format {
            // A format is selected: expose only its options, under the
            // generic "format" prefix.
            match file_type {
                #[cfg(feature = "parquet")]
                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
                ConfigFileType::JSON => self.json.visit(v, "format", ""),
            }
        } else {
            // No format selected: expose every format under its own prefix.
            self.csv.visit(v, "csv", "");
            self.parquet.visit(v, "parquet", "");
            self.json.visit(v, "json", "");
        }
    }

    /// Sets a configuration value for a specific key within `TableOptions`.
    ///
    /// This method delegates setting configuration values to the specific file format configurations,
    /// based on the current format selected. If no format is selected, it returns an error.
    ///
    /// # Parameters
    ///
    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
    ///   for CSV format.
    /// * `value`: The value to set for the specified configuration key.
    ///
    /// # Returns
    ///
    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
    /// or if setting the configuration value fails for the specific format.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `ConfigOptions::set`
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        match key {
            "format" => {
                let Some(format) = &self.current_format else {
                    return _config_err!("Specify a format for TableOptions");
                };
                match format {
                    #[cfg(feature = "parquet")]
                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
                    ConfigFileType::CSV => self.csv.set(rem, value),
                    ConfigFileType::JSON => self.json.set(rem, value),
                }
            }
            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
        }
    }
}
1523
1524impl TableOptions {
1525 /// Constructs a new instance of `TableOptions` with default settings.
1526 ///
1527 /// # Returns
1528 ///
1529 /// A new `TableOptions` instance with default configuration values.
1530 pub fn new() -> Self {
1531 Self::default()
1532 }
1533
1534 /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1535 ///
1536 /// # Parameters
1537 ///
1538 /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1539 ///
1540 /// # Returns
1541 ///
1542 /// A new `TableOptions` instance with settings applied from the session config.
1543 pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1544 let initial = TableOptions::default();
1545 initial.combine_with_session_config(config)
1546 }
1547
1548 /// Updates the current `TableOptions` with settings from a given session config.
1549 ///
1550 /// # Parameters
1551 ///
1552 /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1553 ///
1554 /// # Returns
1555 ///
1556 /// A new `TableOptions` instance with updated settings from the session config.
1557 #[must_use = "this method returns a new instance"]
1558 pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1559 let mut clone = self.clone();
1560 clone.parquet.global = config.execution.parquet.clone();
1561 clone
1562 }
1563
1564 /// Sets the file format for the table.
1565 ///
1566 /// # Parameters
1567 ///
1568 /// * `format`: The file format to use (e.g., CSV, Parquet).
1569 pub fn set_config_format(&mut self, format: ConfigFileType) {
1570 self.current_format = Some(format);
1571 }
1572
1573 /// Sets the extensions for this `TableOptions` instance.
1574 ///
1575 /// # Parameters
1576 ///
1577 /// * `extensions`: The `Extensions` instance to set.
1578 ///
1579 /// # Returns
1580 ///
1581 /// A new `TableOptions` instance with the specified extensions applied.
1582 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1583 self.extensions = extensions;
1584 self
1585 }
1586
1587 /// Sets a specific configuration option.
1588 ///
1589 /// # Parameters
1590 ///
1591 /// * `key`: The configuration key (e.g., "format.delimiter").
1592 /// * `value`: The value to set for the specified key.
1593 ///
1594 /// # Returns
1595 ///
1596 /// A result indicating success or failure in setting the configuration option.
1597 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1598 let Some((prefix, _)) = key.split_once('.') else {
1599 return _config_err!("could not find config namespace for key \"{key}\"");
1600 };
1601
1602 if prefix == "format" {
1603 return ConfigField::set(self, key, value);
1604 }
1605
1606 if prefix == "execution" {
1607 return Ok(());
1608 }
1609
1610 let Some(e) = self.extensions.0.get_mut(prefix) else {
1611 return _config_err!("Could not find config namespace \"{prefix}\"");
1612 };
1613 e.0.set(key, value)
1614 }
1615
1616 /// Initializes a new `TableOptions` from a hash map of string settings.
1617 ///
1618 /// # Parameters
1619 ///
1620 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1621 ///
1622 /// # Returns
1623 ///
1624 /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1625 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1626 let mut ret = Self::default();
1627 for (k, v) in settings {
1628 ret.set(k, v)?;
1629 }
1630
1631 Ok(ret)
1632 }
1633
1634 /// Modifies the current `TableOptions` instance with settings from a hash map.
1635 ///
1636 /// # Parameters
1637 ///
1638 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1639 ///
1640 /// # Returns
1641 ///
1642 /// A result indicating success or failure in applying the settings.
1643 pub fn alter_with_string_hash_map(
1644 &mut self,
1645 settings: &HashMap<String, String>,
1646 ) -> Result<()> {
1647 for (k, v) in settings {
1648 self.set(k, v)?;
1649 }
1650 Ok(())
1651 }
1652
1653 /// Retrieves all configuration entries from this `TableOptions`.
1654 ///
1655 /// # Returns
1656 ///
1657 /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1658 pub fn entries(&self) -> Vec<ConfigEntry> {
1659 struct Visitor(Vec<ConfigEntry>);
1660
1661 impl Visit for Visitor {
1662 fn some<V: Display>(
1663 &mut self,
1664 key: &str,
1665 value: V,
1666 description: &'static str,
1667 ) {
1668 self.0.push(ConfigEntry {
1669 key: key.to_string(),
1670 value: Some(value.to_string()),
1671 description,
1672 })
1673 }
1674
1675 fn none(&mut self, key: &str, description: &'static str) {
1676 self.0.push(ConfigEntry {
1677 key: key.to_string(),
1678 value: None,
1679 description,
1680 })
1681 }
1682 }
1683
1684 let mut v = Visitor(vec![]);
1685 self.visit(&mut v, "format", "");
1686
1687 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1688 v.0
1689 }
1690}
1691
/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
///
/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
/// (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
    pub global: ParquetOptions,
    /// Column specific options. Default usage is parquet.XX::column.
    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
    /// Additional file-level metadata to include. Inserted into the key_value_metadata
    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
    ///
    /// Multiple entries are permitted
    /// ```sql
    /// OPTIONS (
    ///    'format.metadata::key1' '',
    ///    'format.metadata::key2' 'value',
    ///    'format.metadata::key3' 'value has spaces',
    ///    'format.metadata::key4' 'value has special chars :: :',
    ///    'format.metadata::key_dupe' 'original will be overwritten',
    ///    'format.metadata::key_dupe' 'final'
    /// )
    /// ```
    // A value of `None` writes the metadata key with no value (see the
    // `'format.metadata::key1' ''` example above).
    pub key_value_metadata: HashMap<String, Option<String>>,
}
1720
1721impl TableParquetOptions {
1722 /// Return new default TableParquetOptions
1723 pub fn new() -> Self {
1724 Self::default()
1725 }
1726
1727 /// Set whether the encoding of the arrow metadata should occur
1728 /// during the writing of parquet.
1729 ///
1730 /// Default is to encode the arrow schema in the file kv_metadata.
1731 pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1732 Self {
1733 global: ParquetOptions {
1734 skip_arrow_metadata: skip,
1735 ..self.global
1736 },
1737 ..self
1738 }
1739 }
1740}
1741
1742impl ConfigField for TableParquetOptions {
1743 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
1744 self.global.visit(v, key_prefix, description);
1745 self.column_specific_options
1746 .visit(v, key_prefix, description)
1747 }
1748
1749 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1750 // Determine if the key is a global, metadata, or column-specific setting
1751 if key.starts_with("metadata::") {
1752 let k = match key.split("::").collect::<Vec<_>>()[..] {
1753 [_meta] | [_meta, ""] => {
1754 return _config_err!(
1755 "Invalid metadata key provided, missing key in metadata::<key>"
1756 )
1757 }
1758 [_meta, k] => k.into(),
1759 _ => {
1760 return _config_err!(
1761 "Invalid metadata key provided, found too many '::' in \"{key}\""
1762 )
1763 }
1764 };
1765 self.key_value_metadata.insert(k, Some(value.into()));
1766 Ok(())
1767 } else if key.contains("::") {
1768 self.column_specific_options.set(key, value)
1769 } else {
1770 self.global.set(key, value)
1771 }
1772 }
1773}
1774
// Generates a config struct (similar to `config_namespace!`) together with a
// `ConfigField` impl for `HashMap<String, TheStruct>`, so that options can be
// addressed per map entry with keys of the form "<field>::<map_key>"
// (e.g. per-column parquet settings such as "compression::col1").
macro_rules! config_namespace_with_hashmap {
    (
        $(#[doc = $struct_d:tt])*
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])*
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
            )*$(,)*
        }
    ) => {

        $(#[doc = $struct_d])*
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name{
            $(
                $(#[doc = $d])*
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $field_vis $field_name : $field_type,
            )*
        }

        // `ConfigField` for the struct itself: keys address fields directly
        // (no hashmap key involved at this level).
        impl ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                // Split off any remaining nested path after the field name.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                       stringify!($field_name) => {
                           // Handle deprecated fields
                           #[allow(deprecated)] // Allow deprecated fields
                           // Apply the optional normalization transform
                           // (e.g. lowercasing) before parsing the value.
                           $(let value = $transform(value);)?
                           self.$field_name.set(rem, value.as_ref())
                       },
                    )*
                    _ => _config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    // The field's `///` doc lines are concatenated into the
                    // runtime description string reported to the visitor.
                    let desc = concat!($($d),*).trim();
                    // Handle deprecated fields
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }

        // `ConfigField` for a keyed map of structs: "<field>::<map_key>"
        // addresses field `<field>` on the entry stored under `<map_key>`.
        impl ConfigField for HashMap<String,$struct_name> {
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                // Only the first "::" splits; everything after it is the map key.
                let parts: Vec<&str> = key.splitn(2, "::").collect();
                match parts.as_slice() {
                    [inner_key, hashmap_key] => {
                        // Get or create the struct for the specified key
                        let inner_value = self
                            .entry((*hashmap_key).to_owned())
                            .or_insert_with($struct_name::default);

                        inner_value.set(inner_key, value)
                    }
                    _ => _config_err!("Unrecognized key '{key}'."),
                }
            }

            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                // Emit every field of every entry as "{prefix}.{field}::{map_key}".
                for (column_name, col_options) in self {
                    $(
                        let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
                        let desc = concat!($($d),*).trim();
                        #[allow(deprecated)]
                        col_options.$field_name.visit(v, key.as_str(), desc);
                    )*
                }
            }
        }
    }
}
1866
// NOTE: the `///` doc comments inside this invocation are not only rustdoc —
// the macro concatenates them into the runtime `description` strings surfaced
// through `ConfigEntry`, so editing them changes user-visible output.
config_namespace_with_hashmap! {
    /// Options controlling parquet format for individual columns.
    ///
    /// See [`ParquetOptions`] for more details
    pub struct ParquetColumnOptions {
        /// Sets if bloom filter is enabled for the column path.
        pub bloom_filter_enabled: Option<bool>, default = None

        /// Sets encoding for the column path.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        pub encoding: Option<String>, default = None

        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
        /// default parquet options
        pub dictionary_enabled: Option<bool>, default = None

        /// Sets default parquet compression codec for the column path.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        // `transform` lowercases the value before parsing, so mixed-case
        // codec names are accepted.
        pub compression: Option<String>, transform = str::to_lowercase, default = None

        /// Sets if statistics are enabled for the column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet options
        pub statistics_enabled: Option<String>, default = None

        /// Sets bloom filter false positive probability for the column path. If NULL, uses
        /// default parquet options
        pub bloom_filter_fpp: Option<f64>, default = None

        /// Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet options
        pub bloom_filter_ndv: Option<u64>, default = None

        /// Sets max statistics size for the column path. If NULL, uses
        /// default parquet options
        /// max_statistics_size is deprecated, currently it is not being used
        // TODO: remove once deprecated
        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
        pub max_statistics_size: Option<usize>, default = None
    }
}
1916
// NOTE: the `///` doc comments inside this invocation also become the runtime
// `description` strings surfaced through `ConfigEntry`, so they are left
// unchanged; the plain `//` comments below are invisible to the macro.
config_namespace! {
    /// Options controlling CSV format
    pub struct CsvOptions {
        /// Specifies whether there is a CSV header (i.e. the first line
        /// consists of is column names). The value `None` indicates that
        /// the configuration should be consulted.
        pub has_header: Option<bool>, default = None
        // Field separator byte (default b',').
        pub delimiter: u8, default = b','
        // Quote byte (default b'"').
        pub quote: u8, default = b'"'
        // Optional row terminator byte; None means the default (CRLF, per
        // `with_terminator` below).
        pub terminator: Option<u8>, default = None
        // Optional escape byte (default None).
        pub escape: Option<u8>, default = None
        // Whether quotes inside quoted fields are doubled; None defers to the
        // format default.
        pub double_quote: Option<bool>, default = None
        /// Specifies whether newlines in (quoted) values are supported.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        ///
        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
        pub newlines_in_values: Option<bool>, default = None
        // File compression (default: uncompressed).
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        // Max records to scan when inferring the schema; None defers to the
        // global default — TODO confirm which default applies.
        pub schema_infer_max_rec: Option<usize>, default = None
        // Format strings for temporal columns; None uses the writer/reader
        // defaults — presumably strftime-style, confirm against consumers.
        pub date_format: Option<String>, default = None
        pub datetime_format: Option<String>, default = None
        pub timestamp_format: Option<String>, default = None
        pub timestamp_tz_format: Option<String>, default = None
        pub time_format: Option<String>, default = None
        // The output format for Nulls in the CSV writer.
        pub null_value: Option<String>, default = None
        // The input regex for Nulls when loading CSVs.
        pub null_regex: Option<String>, default = None
        // Byte marking a comment line when reading; None means no comment
        // handling — TODO confirm reader behavior.
        pub comment: Option<u8>, default = None
    }
}
1951
1952impl CsvOptions {
1953 /// Set a limit in terms of records to scan to infer the schema
1954 /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
1955 pub fn with_compression(
1956 mut self,
1957 compression_type_variant: CompressionTypeVariant,
1958 ) -> Self {
1959 self.compression = compression_type_variant;
1960 self
1961 }
1962
1963 /// Set a limit in terms of records to scan to infer the schema
1964 /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
1965 pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
1966 self.schema_infer_max_rec = Some(max_rec);
1967 self
1968 }
1969
1970 /// Set true to indicate that the first line is a header.
1971 /// - default to true
1972 pub fn with_has_header(mut self, has_header: bool) -> Self {
1973 self.has_header = Some(has_header);
1974 self
1975 }
1976
1977 /// Returns true if the first line is a header. If format options does not
1978 /// specify whether there is a header, returns `None` (indicating that the
1979 /// configuration should be consulted).
1980 pub fn has_header(&self) -> Option<bool> {
1981 self.has_header
1982 }
1983
1984 /// The character separating values within a row.
1985 /// - default to ','
1986 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
1987 self.delimiter = delimiter;
1988 self
1989 }
1990
1991 /// The quote character in a row.
1992 /// - default to '"'
1993 pub fn with_quote(mut self, quote: u8) -> Self {
1994 self.quote = quote;
1995 self
1996 }
1997
1998 /// The character that terminates a row.
1999 /// - default to None (CRLF)
2000 pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
2001 self.terminator = terminator;
2002 self
2003 }
2004
2005 /// The escape character in a row.
2006 /// - default is None
2007 pub fn with_escape(mut self, escape: Option<u8>) -> Self {
2008 self.escape = escape;
2009 self
2010 }
2011
2012 /// Set true to indicate that the CSV quotes should be doubled.
2013 /// - default to true
2014 pub fn with_double_quote(mut self, double_quote: bool) -> Self {
2015 self.double_quote = Some(double_quote);
2016 self
2017 }
2018
2019 /// Specifies whether newlines in (quoted) values are supported.
2020 ///
2021 /// Parsing newlines in quoted values may be affected by execution behaviour such as
2022 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2023 /// parsed successfully, which may reduce performance.
2024 ///
2025 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2026 pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
2027 self.newlines_in_values = Some(newlines_in_values);
2028 self
2029 }
2030
2031 /// Set a `CompressionTypeVariant` of CSV
2032 /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2033 pub fn with_file_compression_type(
2034 mut self,
2035 compression: CompressionTypeVariant,
2036 ) -> Self {
2037 self.compression = compression;
2038 self
2039 }
2040
2041 /// The delimiter character.
2042 pub fn delimiter(&self) -> u8 {
2043 self.delimiter
2044 }
2045
2046 /// The quote character.
2047 pub fn quote(&self) -> u8 {
2048 self.quote
2049 }
2050
2051 /// The terminator character.
2052 pub fn terminator(&self) -> Option<u8> {
2053 self.terminator
2054 }
2055
2056 /// The escape character.
2057 pub fn escape(&self) -> Option<u8> {
2058 self.escape
2059 }
2060}
2061
// NOTE: any `///` doc comments inside this invocation become runtime
// `description` strings via the macro; the `//` comments below do not.
config_namespace! {
    /// Options controlling JSON format
    pub struct JsonOptions {
        // File compression (default: uncompressed).
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        // Max records to scan when inferring the schema; None defers to the
        // global default — TODO confirm which default applies.
        pub schema_infer_max_rec: Option<usize>, default = None
    }
}
2069
/// Marker trait for output formats; requires [`Display`] and currently
/// declares no methods of its own.
pub trait OutputFormatExt: Display {}
2071
/// The file format to write output in, carrying the format-specific
/// options where the format has any.
#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum OutputFormat {
    /// CSV output, configured via [`CsvOptions`].
    CSV(CsvOptions),
    /// JSON output, configured via [`JsonOptions`].
    JSON(JsonOptions),
    /// Parquet output, configured via [`TableParquetOptions`].
    #[cfg(feature = "parquet")]
    PARQUET(TableParquetOptions),
    /// Avro output (no options carried here).
    AVRO,
    /// Arrow output (no options carried here).
    ARROW,
}
2082
2083impl Display for OutputFormat {
2084 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2085 let out = match self {
2086 OutputFormat::CSV(_) => "csv",
2087 OutputFormat::JSON(_) => "json",
2088 #[cfg(feature = "parquet")]
2089 OutputFormat::PARQUET(_) => "parquet",
2090 OutputFormat::AVRO => "avro",
2091 OutputFormat::ARROW => "arrow",
2092 };
2093 write!(f, "{out}")
2094 }
2095}
2096
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::collections::HashMap;

    use crate::config::{
        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
        Extensions, TableOptions,
    };

    /// Minimal [`ExtensionOptions`] implementation used to exercise the
    /// extension plumbing of `TableOptions` in the tests below.
    #[derive(Default, Debug, Clone)]
    pub struct TestExtensionConfig {
        // Key/value pairs captured verbatim from `set` calls.
        pub properties: HashMap<String, String>,
    }

    impl ExtensionOptions for TestExtensionConfig {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }

        fn cloned(&self) -> Box<dyn ExtensionOptions> {
            Box::new(self.clone())
        }

        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
            // Keys arrive as "test.<rest>"; verify the prefix and store the rest.
            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
            assert_eq!(key, "test");
            self.properties.insert(rem.to_owned(), value.to_owned());
            Ok(())
        }

        fn entries(&self) -> Vec<ConfigEntry> {
            self.properties
                .iter()
                .map(|(k, v)| ConfigEntry {
                    key: k.into(),
                    value: Some(v.into()),
                    description: "",
                })
                .collect()
        }
    }

    impl ConfigExtension for TestExtensionConfig {
        const PREFIX: &'static str = "test";
    }

    #[test]
    fn create_table_config() {
        // A registered extension must be retrievable by its concrete type.
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let table_config = TableOptions::new().with_extensions(extension);
        let test_config = table_config.extensions.get::<TestExtensionConfig>();
        assert!(test_config.is_some())
    }

    #[test]
    fn alter_test_extension_config() {
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let mut table_config = TableOptions::new().with_extensions(extension);
        table_config.set_config_format(ConfigFileType::CSV);
        // "format." keys go to the active format options...
        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter, b';');
        // ...while "test." keys are routed to the registered extension.
        table_config.set("test.bootstrap.servers", "asd").unwrap();
        let test_config = table_config
            .extensions
            .get::<TestExtensionConfig>()
            .unwrap();
        assert_eq!(
            test_config.properties.get("bootstrap.servers").unwrap(),
            "asd"
        );
    }

    #[test]
    fn csv_u8_table_options() {
        // Single-character CSV options are parsed into their u8 byte value.
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::CSV);
        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter as char, ';');
        table_config.set("format.escape", "\"").unwrap();
        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
        table_config.set("format.escape", "\'").unwrap();
        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
    }

    #[test]
    fn warning_only_not_default() {
        // Counts log records emitted while toggling
        // `enable_options_value_normalization`: setting the non-default value
        // should warn once, restoring the default should not.
        use std::sync::atomic::AtomicUsize;
        static COUNT: AtomicUsize = AtomicUsize::new(0);
        use log::{Level, LevelFilter, Metadata, Record};
        struct SimpleLogger;
        impl log::Log for SimpleLogger {
            fn enabled(&self, metadata: &Metadata) -> bool {
                metadata.level() <= Level::Info
            }

            fn log(&self, record: &Record) {
                if self.enabled(record.metadata()) {
                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
            }
            fn flush(&self) {}
        }
        log::set_logger(&SimpleLogger).unwrap();
        log::set_max_level(LevelFilter::Info);
        let mut sql_parser_options = crate::config::SqlParserOptions::default();
        sql_parser_options
            .set("enable_options_value_normalization", "false")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
        sql_parser_options
            .set("enable_options_value_normalization", "true")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options() {
        // "<opt>::<col>" keys create/populate the column-specific entry.
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        assert_eq!(
            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
            Some(true)
        );
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_entry() {
        // Column-specific options must be reported back by `entries()`.
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        let entries = table_config.entries();
        assert!(entries
            .iter()
            .any(|item| item.key == "format.bloom_filter_enabled::col1"))
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_metadata_entry() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.set("format.metadata::key1", "").unwrap();
        table_config.set("format.metadata::key2", "value2").unwrap();
        table_config
            .set("format.metadata::key3", "value with spaces ")
            .unwrap();
        table_config
            .set("format.metadata::key4", "value with special chars :: :")
            .unwrap();

        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
        assert_eq!(parsed_metadata.get("should not exist1"), None);
        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
        assert_eq!(
            parsed_metadata.get("key3"),
            Some(&Some("value with spaces ".into()))
        );
        assert_eq!(
            parsed_metadata.get("key4"),
            Some(&Some("value with special chars :: :".into()))
        );

        // duplicate keys are overwritten
        table_config.set("format.metadata::key_dupe", "A").unwrap();
        table_config.set("format.metadata::key_dupe", "B").unwrap();
        let parsed_metadata = table_config.parquet.key_value_metadata;
        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
    }
}
2281}