// datafusion_common/config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Runtime configuration, via [`ConfigOptions`]
19
20use crate::error::_config_err;
21use crate::parsers::CompressionTypeVariant;
22use crate::utils::get_available_parallelism;
23use crate::{DataFusionError, Result};
24use std::any::Any;
25use std::collections::{BTreeMap, HashMap};
26use std::error::Error;
27use std::fmt::{self, Display};
28use std::str::FromStr;
29
30/// A macro that wraps a configuration struct and automatically derives
31/// [`Default`] and [`ConfigField`] for it, allowing it to be used
32/// in the [`ConfigOptions`] configuration tree.
33///
34/// `transform` is used to normalize values before parsing.
35///
36/// For example,
37///
38/// ```ignore
39/// config_namespace! {
40///    /// Amazing config
41///    pub struct MyConfig {
42///        /// Field 1 doc
43///        field1: String, transform = str::to_lowercase, default = "".to_string()
44///
45///        /// Field 2 doc
46///        field2: usize, default = 232
47///
48///        /// Field 3 doc
49///        field3: Option<usize>, default = None
50///    }
51///}
52/// ```
53///
54/// Will generate
55///
56/// ```ignore
57/// /// Amazing config
58/// #[derive(Debug, Clone)]
59/// #[non_exhaustive]
60/// pub struct MyConfig {
61///     /// Field 1 doc
62///     field1: String,
63///     /// Field 2 doc
64///     field2: usize,
65///     /// Field 3 doc
66///     field3: Option<usize>,
67/// }
68/// impl ConfigField for MyConfig {
69///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
70///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
71///         match key {
72///             "field1" => {
73///                 let value = str::to_lowercase(value);
74///                 self.field1.set(rem, value.as_ref())
75///             },
76///             "field2" => self.field2.set(rem, value.as_ref()),
77///             "field3" => self.field3.set(rem, value.as_ref()),
78///             _ => _internal_err!(
79///                 "Config value \"{}\" not found on MyConfig",
80///                 key
81///             ),
82///         }
83///     }
84///
85///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
86///         let key = format!("{}.field1", key_prefix);
87///         let desc = "Field 1 doc";
88///         self.field1.visit(v, key.as_str(), desc);
89///         let key = format!("{}.field2", key_prefix);
90///         let desc = "Field 2 doc";
91///         self.field2.visit(v, key.as_str(), desc);
92///         let key = format!("{}.field3", key_prefix);
93///         let desc = "Field 3 doc";
94///         self.field3.visit(v, key.as_str(), desc);
95///     }
96/// }
97///
98/// impl Default for MyConfig {
99///     fn default() -> Self {
100///         Self {
101///             field1: "".to_string(),
102///             field2: 232,
103///             field3: None,
104///         }
105///     }
106/// }
107/// ```
108///
109/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?      // Optional: warning logged when set to a non-default value
                $(transform = $transform:expr,)? // Optional: normalizes the raw string before parsing
                default = $default:expr
            )*$(,)*
        }
    ) => {
        // Emit the plain struct, forwarding the captured docs and attributes.
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            // Route a dotted key ("field" or "field.nested...") to the matching
            // field and delegate the remainder of the path to that field's own
            // `ConfigField::set` implementation.
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                // Split off the first path segment; `rem` is "" for leaf keys.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                #[allow(deprecated)]
                                let ret = self.$field_name.set(rem, value.as_ref());

                                // Log warning if specified, and the value is not the default.
                                // The comparison happens after `set`, so the warning fires only
                                // when the stored value actually differs from the default.
                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    #[allow(deprecated)]
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })?
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            // Walk every field, passing the fully-qualified key
            // ("<prefix>.<field>") and the field's concatenated doc lines as
            // its user-visible description.
            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    // The `///` doc lines on the field become its description string.
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
        // `Default` is derived from each field's `default = ...` expression.
        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}
190
config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    // NOTE: the `///` text on each field doubles as the runtime description
    // string surfaced through `ConfigField::visit` (see `config_namespace!`),
    // so edits here change user-visible output, not just rustdoc.
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}
232
config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: String, default = "generic".to_string()
        // no need to lowercase because `sqlparser::dialect_from_str` is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, `VARCHAR` is mapped to `Utf8View` during SQL planning.
        /// If false, `VARCHAR` is mapped to `Utf8` during SQL planning.
        /// Default is true.
        pub map_varchar_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL Queries
        pub recursion_limit: usize, default = 50
    }
}
276
config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        /// Default batch size while creating new batches, it's especially useful for
        /// buffer-in-memory batches since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the configuration setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        // NOTE(review): `normalized_parallelism` is defined outside this chunk;
        // presumably it maps a sentinel value (e.g. "0") to the available
        // parallelism — confirm at its definition.
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
        /// according to this time zone, and then extract the hour
        pub time_zone: Option<String>, default = Some("+00:00".into())

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly used to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to workaround bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum level of output files running in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the value is greater
        /// then partial aggregation will skip aggregation for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows partial aggregation partition should process, before
        /// aggregation ratio check and trying to switch to skipping aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote end point.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
    }
}
418
config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skip the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// One read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer
        pub metadata_size_hint: Option<usize>, default = None

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` with `Utf8View`,
        /// and `Binary/LargeBinary` with `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If true, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark
        /// which stores microsecond resolution timestamps in an int96 allowing it
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in bytes
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: String, default = "1.0".to_string()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Sets max statistics size for any column. If NULL, uses
        /// default parquet writer setting
        /// max_statistics_size is deprecated, currently it is not being used
        // TODO: remove once deprecated
        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
        pub max_statistics_size: Option<usize>, default = Some(4096)

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default =  1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = None

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file are serialized in parallel
        /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}
596
config_namespace! {
    /// Options related to query optimization
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct OptimizerOptions {
        /// When set to true, the optimizer will push a limit operation into
        /// grouped aggregations which have no aggregate expressions, as a soft limit,
        /// emitting groups once the limit is reached, before all rows in the group are read.
        pub enable_distinct_aggregation_soft_limit: bool, default = true

        /// When set to true, the physical plan optimizer will try to add round robin
        /// repartitioning to increase parallelism to leverage more CPU cores
        pub enable_round_robin_repartition: bool, default = true

        /// When set to true, the optimizer will attempt to perform limit operations
        /// during aggregations, if possible
        pub enable_topk_aggregation: bool, default = true

        /// When set to true, the optimizer will insert filters before a join between
        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
        /// filter can add additional overhead when the file format does not fully support
        /// predicate push down.
        pub filter_null_join_keys: bool, default = false

        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
        /// in parallel using the provided `target_partitions` level
        pub repartition_aggregations: bool, default = true

        /// Minimum total file size in bytes to perform file scan repartitioning.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

        /// Should DataFusion repartition data using the join keys to execute joins in parallel
        /// using the provided `target_partitions` level
        pub repartition_joins: bool, default = true

        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
        /// its inputs do not have any ordering or filtering. If the flag is not enabled,
        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
        /// RightAnti, and RightSemi - being produced only at the end of the execution.
        /// This is not typical in stream processing. Additionally, without proper design for
        /// long runner execution, all types of joins may encounter out-of-memory errors.
        pub allow_symmetric_joins_without_pruning: bool, default = true

        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
        ///
        /// For FileSources, only Parquet and CSV formats are currently supported.
        ///
        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
        /// might be partitioned into smaller chunks) for parallel scanning.
        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
        /// happen within a single file.
        ///
        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
        /// the total number of partitions and batches per partition, but does not slice the initial
        /// record tables provided to the MemTable on creation.
        pub repartition_file_scans: bool, default = true

        /// Should DataFusion repartition data using the partitions keys to execute window
        /// functions in parallel using the provided `target_partitions` level
        pub repartition_windows: bool, default = true

        /// Should DataFusion execute sorts in a per-partition fashion and merge
        /// afterwards instead of coalescing first and sorting globally.
        /// When this flag is enabled, plans in the form below
        ///
        /// ```text
        ///      "SortExec: [a@0 ASC]",
        ///      "  CoalescePartitionsExec",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        /// would turn into the plan below which performs better in multithreaded environments
        ///
        /// ```text
        ///      "SortPreservingMergeExec: [a@0 ASC]",
        ///      "  SortExec: [a@0 ASC]",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        pub repartition_sorts: bool, default = true

        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
        /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
        /// using `SortPreservingMergeExec`)
        ///
        /// When false, DataFusion will maximize plan parallelism using
        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
        pub prefer_existing_sort: bool, default = false

        /// When set to true, the logical plan optimizer will produce warning
        /// messages if any optimization rules produce errors and then proceed to the next
        /// rule. When set to false, any rules that produce errors will cause the query to fail
        pub skip_failed_rules: bool, default = false

        /// Number of times that the optimizer will attempt to optimize the plan
        pub max_passes: usize, default = 3

        /// When set to true, the physical plan optimizer will run a top down
        /// process to reorder the join keys
        pub top_down_join_key_reordering: bool, default = true

        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
        pub prefer_hash_join: bool, default = true

        /// The maximum estimated size in bytes for one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

        /// The maximum estimated size in rows for one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

        /// The default filter selectivity used by Filter Statistics
        /// when an exact selectivity cannot be determined. Valid values are
        /// between 0 (no selectivity) and 100 (all rows are selected).
        pub default_filter_selectivity: u8, default = 20

        /// When set to true, the optimizer will not attempt to convert Union to Interleave
        pub prefer_existing_union: bool, default = false

        /// When set to true, if the returned type is a view type
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false
    }
}
727
config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        /// (Values are not validated here; unsupported formats are rejected where
        /// the option is consumed.)
        pub format: String, default = "indent".to_string()
    }
}
756
757impl ExecutionOptions {
758    /// Returns the correct parallelism based on the provided `value`.
759    /// If `value` is `"0"`, returns the default available parallelism, computed with
760    /// `get_available_parallelism`. Otherwise, returns `value`.
761    fn normalized_parallelism(value: &str) -> String {
762        if value.parse::<usize>() == Ok(0) {
763            get_available_parallelism().to_string()
764        } else {
765            value.to_owned()
766        }
767    }
768}
769
config_namespace! {
    /// Options controlling the format of output when printing record batches
    /// Copies [`arrow::util::display::FormatOptions`]
    pub struct FormatOptions {
        /// If set to `true` any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls (default: empty string)
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`.
        /// The value is lowercased on assignment, so the comparison downstream
        /// is effectively case-insensitive.
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show types in visual representation batches
        pub types_info: bool, default = false
    }
}
795
796impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
797    type Error = DataFusionError;
798    fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
799        let duration_format = match self.duration_format.as_str() {
800            "pretty" => arrow::util::display::DurationFormat::Pretty,
801            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
802            _ => {
803                return _config_err!(
804                    "Invalid duration format: {}. Valid values are pretty or iso8601",
805                    self.duration_format
806                )
807            }
808        };
809
810        Ok(arrow::util::display::FormatOptions::new()
811            .with_display_error(self.safe)
812            .with_null(&self.null)
813            .with_date_format(self.date_format.as_deref())
814            .with_datetime_format(self.datetime_format.as_deref())
815            .with_timestamp_format(self.timestamp_format.as_deref())
816            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
817            .with_time_format(self.time_format.as_deref())
818            .with_duration_format(duration_format)
819            .with_types_info(self.types_info))
820    }
821}
822
/// A key value pair, with a corresponding description
#[derive(Debug)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    /// (e.g. `datafusion.execution.batch_size`)
    pub key: String,

    /// The current value, if any (`None` when the option is unset)
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}
835
/// Configuration options struct, able to store both built-in configuration and custom options
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
    /// Formatting options when printing batches
    pub format: FormatOptions,
}
855
856impl ConfigField for ConfigOptions {
857    fn set(&mut self, key: &str, value: &str) -> Result<()> {
858        // Extensions are handled in the public `ConfigOptions::set`
859        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
860        match key {
861            "catalog" => self.catalog.set(rem, value),
862            "execution" => self.execution.set(rem, value),
863            "optimizer" => self.optimizer.set(rem, value),
864            "explain" => self.explain.set(rem, value),
865            "sql_parser" => self.sql_parser.set(rem, value),
866            "format" => self.format.set(rem, value),
867            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
868        }
869    }
870
871    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
872        self.catalog.visit(v, "datafusion.catalog", "");
873        self.execution.visit(v, "datafusion.execution", "");
874        self.optimizer.visit(v, "datafusion.optimizer", "");
875        self.explain.visit(v, "datafusion.explain", "");
876        self.sql_parser.visit(v, "datafusion.sql_parser", "");
877        self.format.visit(v, "datafusion.format", "");
878    }
879}
880
881impl ConfigOptions {
882    /// Creates a new [`ConfigOptions`] with default values
883    pub fn new() -> Self {
884        Self::default()
885    }
886
887    /// Set extensions to provided value
888    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
889        self.extensions = extensions;
890        self
891    }
892
893    /// Set a configuration option
894    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
895        let Some((prefix, key)) = key.split_once('.') else {
896            return _config_err!("could not find config namespace for key \"{key}\"");
897        };
898
899        if prefix == "datafusion" {
900            return ConfigField::set(self, key, value);
901        }
902
903        let Some(e) = self.extensions.0.get_mut(prefix) else {
904            return _config_err!("Could not find config namespace \"{prefix}\"");
905        };
906        e.0.set(key, value)
907    }
908
909    /// Create new ConfigOptions struct, taking values from
910    /// environment variables where possible.
911    ///
912    /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
913    /// control `datafusion.execution.batch_size`.
914    pub fn from_env() -> Result<Self> {
915        struct Visitor(Vec<String>);
916
917        impl Visit for Visitor {
918            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
919                self.0.push(key.to_string())
920            }
921
922            fn none(&mut self, key: &str, _: &'static str) {
923                self.0.push(key.to_string())
924            }
925        }
926
927        // Extract the names of all fields and then look up the corresponding
928        // environment variables. This isn't hugely efficient but avoids
929        // ambiguity between `a.b` and `a_b` which would both correspond
930        // to an environment variable of `A_B`
931
932        let mut keys = Visitor(vec![]);
933        let mut ret = Self::default();
934        ret.visit(&mut keys, "datafusion", "");
935
936        for key in keys.0 {
937            let env = key.to_uppercase().replace('.', "_");
938            if let Some(var) = std::env::var_os(env) {
939                let value = var.to_string_lossy();
940                log::info!("Set {key} to {value} from the environment variable");
941                ret.set(&key, value.as_ref())?;
942            }
943        }
944
945        Ok(ret)
946    }
947
948    /// Create new ConfigOptions struct, taking values from a string hash map.
949    ///
950    /// Only the built-in configurations will be extracted from the hash map
951    /// and other key value pairs will be ignored.
952    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
953        struct Visitor(Vec<String>);
954
955        impl Visit for Visitor {
956            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
957                self.0.push(key.to_string())
958            }
959
960            fn none(&mut self, key: &str, _: &'static str) {
961                self.0.push(key.to_string())
962            }
963        }
964
965        let mut keys = Visitor(vec![]);
966        let mut ret = Self::default();
967        ret.visit(&mut keys, "datafusion", "");
968
969        for key in keys.0 {
970            if let Some(var) = settings.get(&key) {
971                ret.set(&key, var)?;
972            }
973        }
974
975        Ok(ret)
976    }
977
978    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
979    pub fn entries(&self) -> Vec<ConfigEntry> {
980        struct Visitor(Vec<ConfigEntry>);
981
982        impl Visit for Visitor {
983            fn some<V: Display>(
984                &mut self,
985                key: &str,
986                value: V,
987                description: &'static str,
988            ) {
989                self.0.push(ConfigEntry {
990                    key: key.to_string(),
991                    value: Some(value.to_string()),
992                    description,
993                })
994            }
995
996            fn none(&mut self, key: &str, description: &'static str) {
997                self.0.push(ConfigEntry {
998                    key: key.to_string(),
999                    value: None,
1000                    description,
1001                })
1002            }
1003        }
1004
1005        let mut v = Visitor(vec![]);
1006        self.visit(&mut v, "datafusion", "");
1007
1008        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1009        v.0
1010    }
1011
1012    /// Generate documentation that can be included in the user guide
1013    pub fn generate_config_markdown() -> String {
1014        use std::fmt::Write as _;
1015
1016        let mut s = Self::default();
1017
1018        // Normalize for display
1019        s.execution.target_partitions = 0;
1020        s.execution.planning_concurrency = 0;
1021
1022        let mut docs = "| key | default | description |\n".to_string();
1023        docs += "|-----|---------|-------------|\n";
1024        let mut entries = s.entries();
1025        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1026
1027        for entry in s.entries() {
1028            let _ = writeln!(
1029                &mut docs,
1030                "| {} | {} | {} |",
1031                entry.key,
1032                entry.value.as_deref().unwrap_or("NULL"),
1033                entry.description
1034            );
1035        }
1036        docs
1037    }
1038}
1039
/// [`ConfigExtension`] provides a mechanism to store third-party configuration
/// within DataFusion [`ConfigOptions`]
///
/// This mechanism can be used to pass configuration to user defined functions
/// or optimizer passes
///
/// # Example
/// ```
/// use datafusion_common::{
///     config::ConfigExtension, extensions_options,
///     config::ConfigOptions,
/// };
///  // Define a new configuration struct using the `extensions_options` macro
///  extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
///  }
///
///  impl ConfigExtension for MyConfig {
///     const PREFIX: &'static str = "my_config";
///  }
///
///  // set up config struct and register extension
///  let mut config = ConfigOptions::default();
///  config.extensions.insert(MyConfig::default());
///
///  // overwrite config default
///  config.set("my_config.baz_count", "42").unwrap();
///
///  // check config state
///  let my_config = config.extensions.get::<MyConfig>().unwrap();
///  assert!(my_config.foo_to_bar);
///  assert_eq!(my_config.baz_count, 42);
/// ```
///
/// # Note:
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
    /// Configuration namespace prefix to use
    ///
    /// All values under this will be prefixed with `$PREFIX + "."`
    const PREFIX: &'static str;
}
1090
/// An object-safe API for storing arbitrary configuration.
///
/// See [`ConfigExtension`] for user defined configuration
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any(&self) -> &dyn Any;

    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Return a deep clone of this [`ExtensionOptions`]
    ///
    /// It is important this does not share mutable state to avoid consistency issues
    /// with configuration changing whilst queries are executing
    fn cloned(&self) -> Box<dyn ExtensionOptions>;

    /// Set the given `key`, `value` pair
    ///
    /// `key` is relative to this extension's prefix: `ConfigOptions::set`
    /// strips the prefix before delegating here.
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
    fn entries(&self) -> Vec<ConfigEntry>;
}
1117
/// A type-safe container for [`ConfigExtension`]
///
/// Entries are keyed by each extension's [`ConfigExtension::PREFIX`].
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1121
1122impl Extensions {
1123    /// Create a new, empty [`Extensions`]
1124    pub fn new() -> Self {
1125        Self(BTreeMap::new())
1126    }
1127
1128    /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
1129    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1130        assert_ne!(T::PREFIX, "datafusion");
1131        let e = ExtensionBox(Box::new(extension));
1132        self.0.insert(T::PREFIX, e);
1133    }
1134
1135    /// Retrieves the extension of the given type if any
1136    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1137        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1138    }
1139
1140    /// Retrieves the extension of the given type if any
1141    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1142        let e = self.0.get_mut(T::PREFIX)?;
1143        e.0.as_any_mut().downcast_mut()
1144    }
1145}
1146
// Newtype around a boxed `ExtensionOptions` trait object; exists so `Clone`
// can be implemented via the object-safe `ExtensionOptions::cloned`.
#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);

impl Clone for ExtensionBox {
    fn clone(&self) -> Self {
        // `Box<dyn Trait>` cannot derive Clone; deep-clone through the trait
        Self(self.0.cloned())
    }
}
1155
/// A trait implemented by `config_namespace` and for field types that provides
/// the ability to walk and mutate the configuration tree
pub trait ConfigField {
    /// Visit this field (and any nested fields), reporting each leaf value
    /// with its key and description to `v`
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);

    /// Set this field (or the nested field addressed by `key`) from the
    /// string representation in `value`
    fn set(&mut self, key: &str, value: &str) -> Result<()>;
}
1163
1164impl<F: ConfigField + Default> ConfigField for Option<F> {
1165    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1166        match self {
1167            Some(s) => s.visit(v, key, description),
1168            None => v.none(key, description),
1169        }
1170    }
1171
1172    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1173        self.get_or_insert_with(Default::default).set(key, value)
1174    }
1175}
1176
1177fn default_transform<T>(input: &str) -> Result<T>
1178where
1179    T: FromStr,
1180    <T as FromStr>::Err: Sync + Send + Error + 'static,
1181{
1182    input.parse().map_err(|e| {
1183        DataFusionError::Context(
1184            format!(
1185                "Error parsing '{}' as {}",
1186                input,
1187                std::any::type_name::<T>()
1188            ),
1189            Box::new(DataFusionError::External(Box::new(e))),
1190        )
1191    })
1192}
1193
/// Implements [`ConfigField`] for a scalar type.
///
/// The single-argument form parses values via [`FromStr`] through
/// `default_transform`; the two-argument form lets the caller supply a
/// custom parsing expression over the raw `&str` value (bound to `$arg`).
#[macro_export]
macro_rules! config_field {
    ($t:ty) => {
        config_field!($t, value => default_transform(value)?);
    };

    ($t:ty, $arg:ident => $transform:expr) => {
        impl ConfigField for $t {
            fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, $arg: &str) -> Result<()> {
                *self = $transform;
                Ok(())
            }
        }
    };
}
1213
// Scalar option types parsed via `FromStr`. `bool` lowercases its input
// first so values such as "TRUE" or "False" are accepted.
config_field!(String);
config_field!(bool, value => default_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);
1219
1220impl ConfigField for u8 {
1221    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1222        v.some(key, self, description)
1223    }
1224
1225    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1226        if value.is_empty() {
1227            return Err(DataFusionError::Configuration(format!(
1228                "Input string for {key} key is empty"
1229            )));
1230        }
1231        // Check if the string is a valid number
1232        if let Ok(num) = value.parse::<u8>() {
1233            // TODO: Let's decide how we treat the numerical strings.
1234            *self = num;
1235        } else {
1236            let bytes = value.as_bytes();
1237            // Check if the first character is ASCII (single byte)
1238            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1239                return Err(DataFusionError::Configuration(format!(
1240                    "Error parsing {value} as u8. Non-ASCII string provided"
1241                )));
1242            }
1243            *self = bytes[0];
1244        }
1245        Ok(())
1246    }
1247}
1248
1249impl ConfigField for CompressionTypeVariant {
1250    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1251        v.some(key, self, description)
1252    }
1253
1254    fn set(&mut self, _: &str, value: &str) -> Result<()> {
1255        *self = CompressionTypeVariant::from_str(value)?;
1256        Ok(())
1257    }
1258}
1259
/// An implementation trait used to recursively walk configuration
pub trait Visit {
    /// Called for each leaf configuration field that currently has a value
    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);

    /// Called for each leaf configuration field that is currently unset
    fn none(&mut self, key: &str, description: &'static str);
}
1266
/// Convenience macro to create [`ExtensionOptions`].
///
/// The created structure implements the following traits:
///
/// - [`Clone`]
/// - [`Debug`]
/// - [`Default`]
/// - [`ExtensionOptions`]
///
/// # Usage
/// The syntax is:
///
/// ```text
/// extensions_options! {
///      /// Struct docs (optional).
///     [<vis>] struct <StructName> {
///         /// Field docs (optional)
///         [<vis>] <field_name>: <field_type>, default = <default_value>
///
///         ... more fields
///     }
/// }
/// ```
///
/// The placeholders are:
/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
/// - `<StructName>`: Struct name like `MyStruct`.
/// - `<field_name>`: Field name like `my_field`.
/// - `<field_type>`: Field type like `u8`.
/// - `<default_value>`: Default value matching the field type like `42`.
///
/// # Example
/// See also a full example on the [`ConfigExtension`] documentation
///
/// ```
/// use datafusion_common::extensions_options;
///
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
/// ```
///
///
/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
#[macro_export]
macro_rules! extensions_options {
    (
     $(#[doc = $struct_d:tt])*
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
        )*$(,)*
    }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $field_vis $field_name : $field_type,
            )*
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl $crate::config::ExtensionOptions for $struct_name {
            fn as_any(&self) -> &dyn ::std::any::Any {
                self
            }

            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
                self
            }

            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
                Box::new(self.clone())
            }

            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                $crate::config::ConfigField::set(self, key, value)
            }

            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
                struct Visitor(Vec<$crate::config::ConfigEntry>);

                impl $crate::config::Visit for Visitor {
                    fn some<V: std::fmt::Display>(
                        &mut self,
                        key: &str,
                        value: V,
                        description: &'static str,
                    ) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: Some(value.to_string()),
                            description,
                        })
                    }

                    fn none(&mut self, key: &str, description: &'static str) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: None,
                            description,
                        })
                    }
                }

                let mut v = Visitor(vec![]);
                // The prefix is not used for extensions.
                // The description is generated in ConfigField::visit.
                // We can just pass empty strings here.
                $crate::config::ConfigField::visit(self, &mut v, "", "");
                v.0
            }
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Suppress deprecation warnings when assigning to
                            // fields the user has marked `#[deprecated]`
                            {
                                #[allow(deprecated)]
                                self.$field_name.set(rem, value.as_ref())
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
                $(
                    let key = stringify!($field_name).to_string();
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
    }
}
1431
/// These file types have special built in behavior for configuration.
/// Use TableOptions::Extensions for configuring other file types.
#[derive(Debug, Clone)]
pub enum ConfigFileType {
    /// Comma (or other delimiter) separated values, configured via [`CsvOptions`]
    CSV,
    /// Apache Parquet, configured via [`TableParquetOptions`].
    /// Only available when the `parquet` feature is enabled.
    #[cfg(feature = "parquet")]
    PARQUET,
    /// Newline-delimited JSON, configured via [`JsonOptions`]
    JSON,
}
1441
/// Represents the configuration options available for handling different table formats within a data processing application.
/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
    /// Configuration options for CSV file handling. This includes settings like the delimiter,
    /// quote character, and whether the first row is considered as headers.
    pub csv: CsvOptions,

    /// Configuration options for Parquet file handling. This includes settings for compression,
    /// encoding, and other Parquet-specific file characteristics.
    pub parquet: TableParquetOptions,

    /// Configuration options for JSON file handling.
    pub json: JsonOptions,

    /// The current file format that the table operations should assume. This option allows
    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
    ///
    /// When set, `format.*` keys passed to [`TableOptions::set`] are routed to the
    /// options of this format; when unset, setting a `format.*` key is an error.
    pub current_format: Option<ConfigFileType>,

    /// Optional extensions that can be used to extend or customize the behavior of the table
    /// options. Extensions can be registered using `Extensions::insert` and might include
    /// custom file handling logic, additional configuration parameters, or other enhancements.
    pub extensions: Extensions,
}
1467
1468impl ConfigField for TableOptions {
1469    /// Visits configuration settings for the current file format, or all formats if none is selected.
1470    ///
1471    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1472    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1473    /// it visits all available format settings.
1474    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1475        if let Some(file_type) = &self.current_format {
1476            match file_type {
1477                #[cfg(feature = "parquet")]
1478                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1479                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1480                ConfigFileType::JSON => self.json.visit(v, "format", ""),
1481            }
1482        } else {
1483            self.csv.visit(v, "csv", "");
1484            self.parquet.visit(v, "parquet", "");
1485            self.json.visit(v, "json", "");
1486        }
1487    }
1488
1489    /// Sets a configuration value for a specific key within `TableOptions`.
1490    ///
1491    /// This method delegates setting configuration values to the specific file format configurations,
1492    /// based on the current format selected. If no format is selected, it returns an error.
1493    ///
1494    /// # Parameters
1495    ///
1496    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1497    ///   for CSV format.
1498    /// * `value`: The value to set for the specified configuration key.
1499    ///
1500    /// # Returns
1501    ///
1502    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1503    /// or if setting the configuration value fails for the specific format.
1504    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1505        // Extensions are handled in the public `ConfigOptions::set`
1506        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1507        match key {
1508            "format" => {
1509                let Some(format) = &self.current_format else {
1510                    return _config_err!("Specify a format for TableOptions");
1511                };
1512                match format {
1513                    #[cfg(feature = "parquet")]
1514                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
1515                    ConfigFileType::CSV => self.csv.set(rem, value),
1516                    ConfigFileType::JSON => self.json.set(rem, value),
1517                }
1518            }
1519            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1520        }
1521    }
1522}
1523
1524impl TableOptions {
1525    /// Constructs a new instance of `TableOptions` with default settings.
1526    ///
1527    /// # Returns
1528    ///
1529    /// A new `TableOptions` instance with default configuration values.
1530    pub fn new() -> Self {
1531        Self::default()
1532    }
1533
1534    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1535    ///
1536    /// # Parameters
1537    ///
1538    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1539    ///
1540    /// # Returns
1541    ///
1542    /// A new `TableOptions` instance with settings applied from the session config.
1543    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1544        let initial = TableOptions::default();
1545        initial.combine_with_session_config(config)
1546    }
1547
1548    /// Updates the current `TableOptions` with settings from a given session config.
1549    ///
1550    /// # Parameters
1551    ///
1552    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1553    ///
1554    /// # Returns
1555    ///
1556    /// A new `TableOptions` instance with updated settings from the session config.
1557    #[must_use = "this method returns a new instance"]
1558    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1559        let mut clone = self.clone();
1560        clone.parquet.global = config.execution.parquet.clone();
1561        clone
1562    }
1563
1564    /// Sets the file format for the table.
1565    ///
1566    /// # Parameters
1567    ///
1568    /// * `format`: The file format to use (e.g., CSV, Parquet).
1569    pub fn set_config_format(&mut self, format: ConfigFileType) {
1570        self.current_format = Some(format);
1571    }
1572
1573    /// Sets the extensions for this `TableOptions` instance.
1574    ///
1575    /// # Parameters
1576    ///
1577    /// * `extensions`: The `Extensions` instance to set.
1578    ///
1579    /// # Returns
1580    ///
1581    /// A new `TableOptions` instance with the specified extensions applied.
1582    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1583        self.extensions = extensions;
1584        self
1585    }
1586
1587    /// Sets a specific configuration option.
1588    ///
1589    /// # Parameters
1590    ///
1591    /// * `key`: The configuration key (e.g., "format.delimiter").
1592    /// * `value`: The value to set for the specified key.
1593    ///
1594    /// # Returns
1595    ///
1596    /// A result indicating success or failure in setting the configuration option.
1597    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1598        let Some((prefix, _)) = key.split_once('.') else {
1599            return _config_err!("could not find config namespace for key \"{key}\"");
1600        };
1601
1602        if prefix == "format" {
1603            return ConfigField::set(self, key, value);
1604        }
1605
1606        if prefix == "execution" {
1607            return Ok(());
1608        }
1609
1610        let Some(e) = self.extensions.0.get_mut(prefix) else {
1611            return _config_err!("Could not find config namespace \"{prefix}\"");
1612        };
1613        e.0.set(key, value)
1614    }
1615
1616    /// Initializes a new `TableOptions` from a hash map of string settings.
1617    ///
1618    /// # Parameters
1619    ///
1620    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1621    ///
1622    /// # Returns
1623    ///
1624    /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1625    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1626        let mut ret = Self::default();
1627        for (k, v) in settings {
1628            ret.set(k, v)?;
1629        }
1630
1631        Ok(ret)
1632    }
1633
1634    /// Modifies the current `TableOptions` instance with settings from a hash map.
1635    ///
1636    /// # Parameters
1637    ///
1638    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1639    ///
1640    /// # Returns
1641    ///
1642    /// A result indicating success or failure in applying the settings.
1643    pub fn alter_with_string_hash_map(
1644        &mut self,
1645        settings: &HashMap<String, String>,
1646    ) -> Result<()> {
1647        for (k, v) in settings {
1648            self.set(k, v)?;
1649        }
1650        Ok(())
1651    }
1652
1653    /// Retrieves all configuration entries from this `TableOptions`.
1654    ///
1655    /// # Returns
1656    ///
1657    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1658    pub fn entries(&self) -> Vec<ConfigEntry> {
1659        struct Visitor(Vec<ConfigEntry>);
1660
1661        impl Visit for Visitor {
1662            fn some<V: Display>(
1663                &mut self,
1664                key: &str,
1665                value: V,
1666                description: &'static str,
1667            ) {
1668                self.0.push(ConfigEntry {
1669                    key: key.to_string(),
1670                    value: Some(value.to_string()),
1671                    description,
1672                })
1673            }
1674
1675            fn none(&mut self, key: &str, description: &'static str) {
1676                self.0.push(ConfigEntry {
1677                    key: key.to_string(),
1678                    value: None,
1679                    description,
1680                })
1681            }
1682        }
1683
1684        let mut v = Visitor(vec![]);
1685        self.visit(&mut v, "format", "");
1686
1687        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1688        v.0
1689    }
1690}
1691
/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
///
/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
/// (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
    /// Global Parquet options that propagates to all columns.
    pub global: ParquetOptions,
    /// Column specific options. Default usage is parquet.XX::column.
    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
    /// Additional file-level metadata to include. Inserted into the key_value_metadata
    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
    ///
    /// Entries are set via `format.metadata::<key>` configuration keys.
    /// NOTE(review): values set through `ConfigField::set` are always `Some`;
    /// confirm what producer relies on `None` values before removing the `Option`.
    ///
    /// Multiple entries are permitted
    /// ```sql
    /// OPTIONS (
    ///    'format.metadata::key1' '',
    ///    'format.metadata::key2' 'value',
    ///    'format.metadata::key3' 'value has spaces',
    ///    'format.metadata::key4' 'value has special chars :: :',
    ///    'format.metadata::key_dupe' 'original will be overwritten',
    ///    'format.metadata::key_dupe' 'final'
    /// )
    /// ```
    pub key_value_metadata: HashMap<String, Option<String>>,
}
1720
1721impl TableParquetOptions {
1722    /// Return new default TableParquetOptions
1723    pub fn new() -> Self {
1724        Self::default()
1725    }
1726
1727    /// Set whether the encoding of the arrow metadata should occur
1728    /// during the writing of parquet.
1729    ///
1730    /// Default is to encode the arrow schema in the file kv_metadata.
1731    pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1732        Self {
1733            global: ParquetOptions {
1734                skip_arrow_metadata: skip,
1735                ..self.global
1736            },
1737            ..self
1738        }
1739    }
1740}
1741
1742impl ConfigField for TableParquetOptions {
1743    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
1744        self.global.visit(v, key_prefix, description);
1745        self.column_specific_options
1746            .visit(v, key_prefix, description)
1747    }
1748
1749    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1750        // Determine if the key is a global, metadata, or column-specific setting
1751        if key.starts_with("metadata::") {
1752            let k = match key.split("::").collect::<Vec<_>>()[..] {
1753                [_meta] | [_meta, ""] => {
1754                    return _config_err!(
1755                        "Invalid metadata key provided, missing key in metadata::<key>"
1756                    )
1757                }
1758                [_meta, k] => k.into(),
1759                _ => {
1760                    return _config_err!(
1761                        "Invalid metadata key provided, found too many '::' in \"{key}\""
1762                    )
1763                }
1764            };
1765            self.key_value_metadata.insert(k, Some(value.into()));
1766            Ok(())
1767        } else if key.contains("::") {
1768            self.column_specific_options.set(key, value)
1769        } else {
1770            self.global.set(key, value)
1771        }
1772    }
1773}
1774
/// Generates a configuration struct plus, in addition to the usual
/// [`ConfigField`] impl, a `ConfigField` impl for `HashMap<String, TheStruct>`
/// so individual entries can be addressed with `field::map_key` syntax
/// (e.g. `compression::col1`).
macro_rules! config_namespace_with_hashmap {
    (
     $(#[doc = $struct_d:tt])*
     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
        )*$(,)*
    }
    ) => {

        $(#[doc = $struct_d])*
        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
            $field_vis $field_name : $field_type,
            )*
        }

        impl ConfigField for $struct_name {
            // Dispatch `key` to the matching field; the optional `transform`
            // normalizes the value string first.
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                       stringify!($field_name) => {
                           // Handle deprecated fields
                           #[allow(deprecated)] // Allow deprecated fields
                           $(let value = $transform(value);)?
                           self.$field_name.set(rem, value.as_ref())
                       },
                    )*
                    _ => _config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            // The field's `///` docs become the visited entry's description.
            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                let desc = concat!($($d),*).trim();
                // Handle deprecated fields
                #[allow(deprecated)]
                self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl ConfigField for HashMap<String,$struct_name> {
            // Keys look like `<field>::<map_key>`; a default struct is created
            // on first use of a new map key.
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                let parts: Vec<&str> = key.splitn(2, "::").collect();
                match parts.as_slice() {
                    [inner_key, hashmap_key] => {
                        // Get or create the struct for the specified key
                        let inner_value = self
                            .entry((*hashmap_key).to_owned())
                            .or_insert_with($struct_name::default);

                        inner_value.set(inner_key, value)
                    }
                    _ => _config_err!("Unrecognized key '{key}'."),
                }
            }

            // Emits one entry per (map key, field) pair as `prefix.field::map_key`.
            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                for (column_name, col_options) in self {
                    $(
                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    col_options.$field_name.visit(v, key.as_str(), desc);
                    )*
                }
            }
        }
    }
}
1866
// NOTE: the `///` doc comments inside this invocation double as the
// user-visible `ConfigEntry::description` strings emitted by
// `ConfigField::visit`, so editing them changes runtime output.
config_namespace_with_hashmap! {
    /// Options controlling parquet format for individual columns.
    ///
    /// See [`ParquetOptions`] for more details
    pub struct ParquetColumnOptions {
        /// Sets if bloom filter is enabled for the column path.
        pub bloom_filter_enabled: Option<bool>, default = None

        /// Sets encoding for the column path.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        pub encoding: Option<String>, default = None

        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
        /// default parquet options
        pub dictionary_enabled: Option<bool>, default = None

        /// Sets default parquet compression codec for the column path.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        pub compression: Option<String>, transform = str::to_lowercase, default = None

        /// Sets if statistics are enabled for the column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet options
        pub statistics_enabled: Option<String>, default = None

        /// Sets bloom filter false positive probability for the column path. If NULL, uses
        /// default parquet options
        pub bloom_filter_fpp: Option<f64>, default = None

        /// Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet options
        pub bloom_filter_ndv: Option<u64>, default = None

        /// Sets max statistics size for the column path. If NULL, uses
        /// default parquet options
        /// max_statistics_size is deprecated, currently it is not being used
        // TODO: remove once deprecated
        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
        pub max_statistics_size: Option<usize>, default = None
    }
}
1916
// NOTE: the `///` doc comments inside this invocation double as the
// user-visible `ConfigEntry::description` strings emitted by
// `ConfigField::visit`, so editing them changes runtime output.
config_namespace! {
    /// Options controlling CSV format
    pub struct CsvOptions {
        /// Specifies whether there is a CSV header (i.e. the first line
        /// consists of is column names). The value `None` indicates that
        /// the configuration should be consulted.
        pub has_header: Option<bool>, default = None
        pub delimiter: u8, default = b','
        pub quote: u8, default = b'"'
        pub terminator: Option<u8>, default = None
        pub escape: Option<u8>, default = None
        pub double_quote: Option<bool>, default = None
        /// Specifies whether newlines in (quoted) values are supported.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        ///
        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
        pub newlines_in_values: Option<bool>, default = None
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        pub schema_infer_max_rec: Option<usize>, default = None
        pub date_format: Option<String>, default = None
        pub datetime_format: Option<String>, default = None
        pub timestamp_format: Option<String>, default = None
        pub timestamp_tz_format: Option<String>, default = None
        pub time_format: Option<String>, default = None
        // NOTE(review): the two comments below use `//` rather than `///`, so
        // these fields get an empty description in config entries — confirm
        // this is intentional before "fixing" it (it would change output).
        // The output format for Nulls in the CSV writer.
        pub null_value: Option<String>, default = None
        // The input regex for Nulls when loading CSVs.
        pub null_regex: Option<String>, default = None
        pub comment: Option<u8>, default = None
    }
}
1951
impl CsvOptions {
    /// Set the compression type used for the CSV file
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
    pub fn with_compression(
        mut self,
        compression_type_variant: CompressionTypeVariant,
    ) -> Self {
        self.compression = compression_type_variant;
        self
    }

    /// Set a limit in terms of records to scan to infer the schema
    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
        self.schema_infer_max_rec = Some(max_rec);
        self
    }

    /// Set true to indicate that the first line is a header.
    /// - if never set, the field defaults to `None`, meaning the session-level
    ///   configuration is consulted
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.has_header = Some(has_header);
        self
    }

    /// Returns true if the first line is a header. If format options does not
    /// specify whether there is a header, returns `None` (indicating that the
    /// configuration should be consulted).
    pub fn has_header(&self) -> Option<bool> {
        self.has_header
    }

    /// The character separating values within a row.
    /// - default to ','
    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }

    /// The quote character in a row.
    /// - default to '"'
    pub fn with_quote(mut self, quote: u8) -> Self {
        self.quote = quote;
        self
    }

    /// The character that terminates a row.
    /// - default to None (CRLF)
    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
        self.terminator = terminator;
        self
    }

    /// The escape character in a row.
    /// - default is None
    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
        self.escape = escape;
        self
    }

    /// Set true to indicate that the CSV quotes should be doubled.
    /// - if never set, the field defaults to `None`
    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
        self.double_quote = Some(double_quote);
        self
    }

    /// Specifies whether newlines in (quoted) values are supported.
    ///
    /// Parsing newlines in quoted values may be affected by execution behaviour such as
    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
    /// parsed successfully, which may reduce performance.
    ///
    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
        self.newlines_in_values = Some(newlines_in_values);
        self
    }

    /// Set a `CompressionTypeVariant` of CSV
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
    pub fn with_file_compression_type(
        mut self,
        compression: CompressionTypeVariant,
    ) -> Self {
        self.compression = compression;
        self
    }

    /// The delimiter character.
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// The quote character.
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// The terminator character.
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

    /// The escape character.
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }
}
2061
// NOTE: fields here have no `///` docs, so their config entries carry an
// empty description; adding `///` comments would change runtime output.
config_namespace! {
    /// Options controlling JSON format
    pub struct JsonOptions {
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        pub schema_infer_max_rec: Option<usize>, default = None
    }
}
2069
2070pub trait OutputFormatExt: Display {}
2071
/// An output file format together with its format-specific options, if any.
#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum OutputFormat {
    /// CSV output with its writer options
    CSV(CsvOptions),
    /// JSON output with its writer options
    JSON(JsonOptions),
    /// Parquet output with its writer options (requires the `parquet` feature)
    #[cfg(feature = "parquet")]
    PARQUET(TableParquetOptions),
    /// Avro output (no configurable options here)
    AVRO,
    /// Arrow IPC output (no configurable options here)
    ARROW,
}
2082
2083impl Display for OutputFormat {
2084    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2085        let out = match self {
2086            OutputFormat::CSV(_) => "csv",
2087            OutputFormat::JSON(_) => "json",
2088            #[cfg(feature = "parquet")]
2089            OutputFormat::PARQUET(_) => "parquet",
2090            OutputFormat::AVRO => "avro",
2091            OutputFormat::ARROW => "arrow",
2092        };
2093        write!(f, "{out}")
2094    }
2095}
2096
2097#[cfg(test)]
2098mod tests {
2099    use std::any::Any;
2100    use std::collections::HashMap;
2101
2102    use crate::config::{
2103        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
2104        Extensions, TableOptions,
2105    };
2106
    /// Minimal [`ExtensionOptions`] implementation used by the tests below.
    #[derive(Default, Debug, Clone)]
    pub struct TestExtensionConfig {
        /// Key/value pairs recorded by `set`, keyed by the part of the config
        /// key that follows the `test.` prefix.
        pub properties: HashMap<String, String>,
    }
2112
2113    impl ExtensionOptions for TestExtensionConfig {
2114        fn as_any(&self) -> &dyn Any {
2115            self
2116        }
2117
2118        fn as_any_mut(&mut self) -> &mut dyn Any {
2119            self
2120        }
2121
2122        fn cloned(&self) -> Box<dyn ExtensionOptions> {
2123            Box::new(self.clone())
2124        }
2125
2126        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
2127            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2128            assert_eq!(key, "test");
2129            self.properties.insert(rem.to_owned(), value.to_owned());
2130            Ok(())
2131        }
2132
2133        fn entries(&self) -> Vec<ConfigEntry> {
2134            self.properties
2135                .iter()
2136                .map(|(k, v)| ConfigEntry {
2137                    key: k.into(),
2138                    value: Some(v.into()),
2139                    description: "",
2140                })
2141                .collect()
2142        }
2143    }
2144
    impl ConfigExtension for TestExtensionConfig {
        // All keys for this extension are namespaced under `test.`.
        const PREFIX: &'static str = "test";
    }
2148
2149    #[test]
2150    fn create_table_config() {
2151        let mut extension = Extensions::new();
2152        extension.insert(TestExtensionConfig::default());
2153        let table_config = TableOptions::new().with_extensions(extension);
2154        let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
2155        assert!(kafka_config.is_some())
2156    }
2157
2158    #[test]
2159    fn alter_test_extension_config() {
2160        let mut extension = Extensions::new();
2161        extension.insert(TestExtensionConfig::default());
2162        let mut table_config = TableOptions::new().with_extensions(extension);
2163        table_config.set_config_format(ConfigFileType::CSV);
2164        table_config.set("format.delimiter", ";").unwrap();
2165        assert_eq!(table_config.csv.delimiter, b';');
2166        table_config.set("test.bootstrap.servers", "asd").unwrap();
2167        let kafka_config = table_config
2168            .extensions
2169            .get::<TestExtensionConfig>()
2170            .unwrap();
2171        assert_eq!(
2172            kafka_config.properties.get("bootstrap.servers").unwrap(),
2173            "asd"
2174        );
2175    }
2176
2177    #[test]
2178    fn csv_u8_table_options() {
2179        let mut table_config = TableOptions::new();
2180        table_config.set_config_format(ConfigFileType::CSV);
2181        table_config.set("format.delimiter", ";").unwrap();
2182        assert_eq!(table_config.csv.delimiter as char, ';');
2183        table_config.set("format.escape", "\"").unwrap();
2184        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
2185        table_config.set("format.escape", "\'").unwrap();
2186        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
2187    }
2188
    #[test]
    fn warning_only_not_default() {
        use std::sync::atomic::AtomicUsize;
        // Counts every log record accepted by `SimpleLogger::enabled`.
        static COUNT: AtomicUsize = AtomicUsize::new(0);
        use log::{Level, LevelFilter, Metadata, Record};
        struct SimpleLogger;
        impl log::Log for SimpleLogger {
            fn enabled(&self, metadata: &Metadata) -> bool {
                metadata.level() <= Level::Info
            }

            fn log(&self, record: &Record) {
                if self.enabled(record.metadata()) {
                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
            }
            fn flush(&self) {}
        }
        // NOTE(review): `log::set_logger` succeeds at most once per process, so
        // this test assumes no other test installs a global logger — confirm.
        log::set_logger(&SimpleLogger).unwrap();
        log::set_max_level(LevelFilter::Info);
        let mut sql_parser_options = crate::config::SqlParserOptions::default();
        // Setting the option to "false" must emit no log record...
        sql_parser_options
            .set("enable_options_value_normalization", "false")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
        // ...while "true" must emit exactly one (presumably a warning for the
        // non-default value — see `SqlParserOptions` to confirm the trigger).
        sql_parser_options
            .set("enable_options_value_normalization", "true")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
    }
2219
2220    #[cfg(feature = "parquet")]
2221    #[test]
2222    fn parquet_table_options() {
2223        let mut table_config = TableOptions::new();
2224        table_config.set_config_format(ConfigFileType::PARQUET);
2225        table_config
2226            .set("format.bloom_filter_enabled::col1", "true")
2227            .unwrap();
2228        assert_eq!(
2229            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
2230            Some(true)
2231        );
2232    }
2233
2234    #[cfg(feature = "parquet")]
2235    #[test]
2236    fn parquet_table_options_config_entry() {
2237        let mut table_config = TableOptions::new();
2238        table_config.set_config_format(ConfigFileType::PARQUET);
2239        table_config
2240            .set("format.bloom_filter_enabled::col1", "true")
2241            .unwrap();
2242        let entries = table_config.entries();
2243        assert!(entries
2244            .iter()
2245            .any(|item| item.key == "format.bloom_filter_enabled::col1"))
2246    }
2247
2248    #[cfg(feature = "parquet")]
2249    #[test]
2250    fn parquet_table_options_config_metadata_entry() {
2251        let mut table_config = TableOptions::new();
2252        table_config.set_config_format(ConfigFileType::PARQUET);
2253        table_config.set("format.metadata::key1", "").unwrap();
2254        table_config.set("format.metadata::key2", "value2").unwrap();
2255        table_config
2256            .set("format.metadata::key3", "value with spaces ")
2257            .unwrap();
2258        table_config
2259            .set("format.metadata::key4", "value with special chars :: :")
2260            .unwrap();
2261
2262        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
2263        assert_eq!(parsed_metadata.get("should not exist1"), None);
2264        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
2265        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
2266        assert_eq!(
2267            parsed_metadata.get("key3"),
2268            Some(&Some("value with spaces ".into()))
2269        );
2270        assert_eq!(
2271            parsed_metadata.get("key4"),
2272            Some(&Some("value with special chars :: :".into()))
2273        );
2274
2275        // duplicate keys are overwritten
2276        table_config.set("format.metadata::key_dupe", "A").unwrap();
2277        table_config.set("format.metadata::key_dupe", "B").unwrap();
2278        let parsed_metadata = table_config.parquet.key_value_metadata;
2279        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
2280    }
2281}