datafusion_common/config.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Runtime configuration, via [`ConfigOptions`]

use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///    /// Amazing config
///    pub struct MyConfig {
///        /// Field 1 doc
///        field1: String, transform = str::to_lowercase, default = "".to_string()
///
///        /// Field 2 doc
///        field2: usize, default = 232
///
///        /// Field 3 doc
///        field3: Option<usize>, default = None
///    }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone)]
/// #[non_exhaustive]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _internal_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                #[allow(deprecated)]
                                let ret = self.$field_name.set(rem, value.as_ref());

                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    #[allow(deprecated)]
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })? // Log warning if specified, and the value is not the default
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}

config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}

config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: String, default = "generic".to_string()
        // no need to lowercase because `sqlparser::dialect_from_str` is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, `VARCHAR` is mapped to `Utf8View` during SQL planning.
        /// If false, `VARCHAR` is mapped to `Utf8` during SQL planning.
        /// Default is false.
        pub map_varchar_to_utf8view: bool, default = false

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL queries
        pub recursion_limit: usize, default = 50
    }
}

config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        /// Default batch size while creating new batches, it's especially useful for
        /// buffer-in-memory batches since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the `batch_size` configuration setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics after listing files
        pub collect_statistics: bool, default = false

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
        /// according to this time zone, and then extract the hour
        pub time_zone: Option<String>, default = Some("+00:00".into())

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly used to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to workaround bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, the size in bytes below which data is concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum level of output files running in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple files.
        /// This is a soft max, so it can be exceeded slightly. There will also
        /// be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked on. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the measured ratio exceeds
        /// this threshold, partial aggregation will skip aggregation for further input rows
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows a partial aggregation partition should process before
        /// checking the aggregation ratio and attempting to switch to skipping-aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false
    }
}

config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try to fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// One read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer
        pub metadata_size_hint: Option<usize>, default = None

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` as `Utf8View`,
        /// and `Binary/LargeBinary` as `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` as `Utf8`, and `BinaryView` as `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If true, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark
        /// which stores microsecond resolution timestamps in an int96 allowing it
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in bytes
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: String, default = "1.0".to_string()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Sets max statistics size for any column. If NULL, uses
        /// default parquet writer setting
        /// max_statistics_size is deprecated, currently it is not being used
        // TODO: remove once deprecated
        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
        pub max_statistics_size: Option<usize>, default = Some(4096)

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default = 1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = None

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file is serialized in parallel
        /// leveraging a maximum possible core count of n_files * n_row_groups * n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}

config_namespace! {
    /// Options related to query optimization
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct OptimizerOptions {
        /// When set to true, the optimizer will push a limit operation into
        /// grouped aggregations which have no aggregate expressions, as a soft limit,
        /// emitting groups once the limit is reached, before all rows in the group are read.
        pub enable_distinct_aggregation_soft_limit: bool, default = true

        /// When set to true, the physical plan optimizer will try to add round robin
        /// repartitioning to increase parallelism to leverage more CPU cores
        pub enable_round_robin_repartition: bool, default = true

        /// When set to true, the optimizer will attempt to perform limit operations
        /// during aggregations, if possible
        pub enable_topk_aggregation: bool, default = true

        /// When set to true, the optimizer will insert filters before a join between
        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
        /// filter can add additional overhead when the file format does not fully support
        /// predicate push down.
        pub filter_null_join_keys: bool, default = false

        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
        /// in parallel using the provided `target_partitions` level
        pub repartition_aggregations: bool, default = true

        /// Minimum total files size in bytes to perform file scan repartitioning.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

        /// Should DataFusion repartition data using the join keys to execute joins in parallel
        /// using the provided `target_partitions` level
        pub repartition_joins: bool, default = true

        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
        /// its inputs do not have any ordering or filtering. If the flag is not enabled,
        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
        /// RightAnti, and RightSemi - being produced only at the end of the execution.
        /// This is not typical in stream processing. Additionally, without proper design for
        /// long-running execution, all types of joins may encounter out-of-memory errors.
        pub allow_symmetric_joins_without_pruning: bool, default = true

        /// When set to `true`, file groups will be repartitioned to achieve maximum parallelism.
        /// Currently Parquet and CSV formats are supported.
        ///
        /// If set to `true`, all files will be repartitioned evenly (i.e., a single large file
        /// might be partitioned into smaller chunks) for parallel scanning.
        /// If set to `false`, different files will be read in parallel, but repartitioning won't
        /// happen within a single file.
        pub repartition_file_scans: bool, default = true

        /// Should DataFusion repartition data using the partitions keys to execute window
        /// functions in parallel using the provided `target_partitions` level
        pub repartition_windows: bool, default = true

        /// Should DataFusion execute sorts in a per-partition fashion and merge
        /// afterwards instead of coalescing first and sorting globally.
        /// When this flag is enabled, plans of the form below
        ///
        /// ```text
        ///      "SortExec: [a@0 ASC]",
        ///      "  CoalescePartitionsExec",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        /// would turn into the plan below, which performs better in multithreaded environments
        ///
        /// ```text
        ///      "SortPreservingMergeExec: [a@0 ASC]",
        ///      "  SortExec: [a@0 ASC]",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        pub repartition_sorts: bool, default = true

        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted
        /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
        /// using `SortPreservingMergeExec`)
        ///
        /// When false, DataFusion will maximize plan parallelism using
        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
        pub prefer_existing_sort: bool, default = false

        /// When set to true, the logical plan optimizer will produce warning
        /// messages if any optimization rules produce errors and then proceed to the next
        /// rule. When set to false, any rules that produce errors will cause the query to fail
        pub skip_failed_rules: bool, default = false

        /// Number of times that the optimizer will attempt to optimize the plan
        pub max_passes: usize, default = 3

        /// When set to true, the physical plan optimizer will run a top down
        /// process to reorder the join keys
        pub top_down_join_key_reordering: bool, default = true

        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
        pub prefer_hash_join: bool, default = true

        /// The maximum estimated size in bytes for one input side of a HashJoin
        /// to be collected into a single partition
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

        /// The maximum estimated size in rows for one input side of a HashJoin
        /// to be collected into a single partition
        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

        /// The default filter selectivity used by Filter Statistics
        /// when an exact selectivity cannot be determined. Valid values are
        /// between 0 (no selectivity) and 100 (all rows are selected).
        pub default_filter_selectivity: u8, default = 20

        /// When set to true, the optimizer will not attempt to convert Union to Interleave
        pub prefer_existing_union: bool, default = false

        /// When set to true, if the returned type is a view type
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false
    }
}

config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: String, default = "indent".to_string()
    }
}

/// A key value pair, with a corresponding description
#[derive(Debug)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value if any
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}

/// Configuration options struct, able to store both built-in configuration and custom options
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
}

impl ConfigField for ConfigOptions {
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `ConfigOptions::set`
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        match key {
            "catalog" => self.catalog.set(rem, value),
            "execution" => self.execution.set(rem, value),
            "optimizer" => self.optimizer.set(rem, value),
            "explain" => self.explain.set(rem, value),
            "sql_parser" => self.sql_parser.set(rem, value),
            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
        }
    }

    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        self.catalog.visit(v, "datafusion.catalog", "");
        self.execution.visit(v, "datafusion.execution", "");
        self.optimizer.visit(v, "datafusion.optimizer", "");
        self.explain.visit(v, "datafusion.explain", "");
        self.sql_parser.visit(v, "datafusion.sql_parser", "");
    }
}

impl ConfigOptions {
    /// Creates a new [`ConfigOptions`] with default values
    pub fn new() -> Self {
        Self::default()
    }

    /// Set extensions to provided value
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }

    /// Set a configuration option
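    ///
    /// # Example
    /// A minimal sketch: keys take the form `<namespace>.<rest of key>`, with
    /// built-in options living under the `datafusion` namespace.
    /// ```
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let mut config = ConfigOptions::new();
    /// config.set("datafusion.execution.batch_size", "1024").unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```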
    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let Some((prefix, key)) = key.split_once('.') else {
            return _config_err!("could not find config namespace for key \"{key}\"");
        };

        if prefix == "datafusion" {
            return ConfigField::set(self, key, value);
        }

        let Some(e) = self.extensions.0.get_mut(prefix) else {
            return _config_err!("Could not find config namespace \"{prefix}\"");
        };
        e.0.set(key, value)
    }

    /// Create new ConfigOptions struct, taking values from
    /// environment variables where possible.
    ///
    /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
    /// control `datafusion.execution.batch_size`.
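    ///
    /// # Example
    /// A sketch (marked `ignore` since it mutates the process environment,
    /// which is unreliable to do inside tests):
    /// ```ignore
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// std::env::set_var("DATAFUSION_EXECUTION_BATCH_SIZE", "1024");
    /// let config = ConfigOptions::from_env().unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```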
    pub fn from_env() -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        // Extract the names of all fields and then look up the corresponding
        // environment variables. This isn't hugely efficient but avoids
        // ambiguity between `a.b` and `a_b` which would both correspond
        // to an environment variable of `A_B`

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            let env = key.to_uppercase().replace('.', "_");
            if let Some(var) = std::env::var_os(env) {
                ret.set(&key, var.to_string_lossy().as_ref())?;
            }
        }

        Ok(ret)
    }

    /// Create new ConfigOptions struct, taking values from a string hash map.
    ///
    /// Only the built-in configurations will be extracted from the hash map
    /// and other key value pairs will be ignored.
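    ///
    /// # Example
    /// A minimal sketch:
    /// ```
    /// use std::collections::HashMap;
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let mut settings = HashMap::new();
    /// settings.insert(
    ///     "datafusion.execution.batch_size".to_string(),
    ///     "1024".to_string(),
    /// );
    /// let config = ConfigOptions::from_string_hash_map(&settings).unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```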
    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            if let Some(var) = settings.get(&key) {
                ret.set(&key, var)?;
            }
        }

        Ok(ret)
    }

    /// Returns the [`ConfigEntry`] values stored within this [`ConfigOptions`]
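    ///
    /// # Example
    /// A minimal sketch:
    /// ```
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let entries = ConfigOptions::new().entries();
    /// assert!(entries
    ///     .iter()
    ///     .any(|e| e.key == "datafusion.execution.batch_size"));
    /// ```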
    pub fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.visit(&mut v, "datafusion", "");

        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
        v.0
    }

    /// Generate documentation that can be included in the user guide
    pub fn generate_config_markdown() -> String {
        use std::fmt::Write as _;

        let mut s = Self::default();

        // Normalize for display
        s.execution.target_partitions = 0;
        s.execution.planning_concurrency = 0;

        let mut docs = "| key | default | description |\n".to_string();
        docs += "|-----|---------|-------------|\n";
        let mut entries = s.entries();
        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));

        // Iterate the sorted entries (calling `s.entries()` again here would
        // discard the sort above)
        for entry in entries {
            let _ = writeln!(
                &mut docs,
                "| {} | {} | {} |",
                entry.key,
                entry.value.as_deref().unwrap_or("NULL"),
                entry.description
            );
        }
        docs
    }
}

/// [`ConfigExtension`] provides a mechanism to store third-party configuration
/// within DataFusion [`ConfigOptions`]
///
/// This mechanism can be used to pass configuration to user defined functions
/// or optimizer passes
///
/// # Example
/// ```
/// use datafusion_common::{
///     config::ConfigExtension, extensions_options,
///     config::ConfigOptions,
/// };
/// // Define a new configuration struct using the `extensions_options` macro
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
///
/// impl ConfigExtension for MyConfig {
///     const PREFIX: &'static str = "my_config";
/// }
///
/// // set up config struct and register extension
/// let mut config = ConfigOptions::default();
/// config.extensions.insert(MyConfig::default());
///
/// // overwrite config default
/// config.set("my_config.baz_count", "42").unwrap();
///
/// // check config state
/// let my_config = config.extensions.get::<MyConfig>().unwrap();
/// assert!(my_config.foo_to_bar);
/// assert_eq!(my_config.baz_count, 42);
/// ```
///
/// # Note:
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
    /// Configuration namespace prefix to use
    ///
    /// All values under this will be prefixed with `$PREFIX + "."`
    const PREFIX: &'static str;
}

/// An object-safe API for storing arbitrary configuration.
///
/// See [`ConfigExtension`] for user defined configuration
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any(&self) -> &dyn Any;

    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Return a deep clone of this [`ExtensionOptions`]
    ///
    /// It is important this does not share mutable state to avoid consistency issues
    /// with configuration changing whilst queries are executing
    fn cloned(&self) -> Box<dyn ExtensionOptions>;

    /// Set the given `key`, `value` pair
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
    fn entries(&self) -> Vec<ConfigEntry>;
}

/// A type-safe container for [`ConfigExtension`]
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);

impl Extensions {
    /// Create a new, empty [`Extensions`]
    pub fn new() -> Self {
        Self(BTreeMap::new())
    }

    /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
        assert_ne!(T::PREFIX, "datafusion");
        let e = ExtensionBox(Box::new(extension));
        self.0.insert(T::PREFIX, e);
    }

    /// Retrieves the extension of the given type if any
    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
    }

    /// Retrieves the extension of the given type if any
    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
        let e = self.0.get_mut(T::PREFIX)?;
        e.0.as_any_mut().downcast_mut()
    }
}

#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);

impl Clone for ExtensionBox {
    fn clone(&self) -> Self {
        Self(self.0.cloned())
    }
}

/// A trait implemented by `config_namespace` and for field types that provides
/// the ability to walk and mutate the configuration tree
pub trait ConfigField {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);

    fn set(&mut self, key: &str, value: &str) -> Result<()>;
}

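// Setting a value on an unset `Option<F>` first materializes the default `F`
// and then delegates to it; `visit` reports `none` while the value is unset.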
impl<F: ConfigField + Default> ConfigField for Option<F> {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        match self {
            Some(s) => s.visit(v, key, description),
            None => v.none(key, description),
        }
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        self.get_or_insert_with(Default::default).set(key, value)
    }
}

fn default_transform<T>(input: &str) -> Result<T>
where
    T: FromStr,
    <T as FromStr>::Err: Sync + Send + Error + 'static,
{
    input.parse().map_err(|e| {
        DataFusionError::Context(
            format!(
                "Error parsing '{}' as {}",
                input,
                std::any::type_name::<T>()
            ),
            Box::new(DataFusionError::External(Box::new(e))),
        )
    })
}

#[macro_export]
macro_rules! config_field {
    ($t:ty) => {
        config_field!($t, value => default_transform(value)?);
    };

    ($t:ty, $arg:ident => $transform:expr) => {
        impl ConfigField for $t {
            fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, $arg: &str) -> Result<()> {
                *self = $transform;
                Ok(())
            }
        }
    };
}

config_field!(String);
config_field!(bool, value => default_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);

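// `u8` gets a hand-written impl rather than `config_field!` because it accepts
// either a numeric string (e.g. "42") or a single ASCII character (e.g. ","),
// which is convenient for single-byte options such as CSV delimiters.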
impl ConfigField for u8 {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        if value.is_empty() {
            return Err(DataFusionError::Configuration(format!(
                "Input string for {} key is empty",
                key
            )));
        }
        // Check if the string is a valid number
        if let Ok(num) = value.parse::<u8>() {
            // TODO: Let's decide how we treat the numerical strings.
            *self = num;
        } else {
            let bytes = value.as_bytes();
            // Check if the first character is ASCII (single byte)
            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
                return Err(DataFusionError::Configuration(format!(
                    "Error parsing {} as u8. Non-ASCII string provided",
                    value
                )));
            }
            *self = bytes[0];
        }
        Ok(())
    }
}

impl ConfigField for CompressionTypeVariant {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = CompressionTypeVariant::from_str(value)?;
        Ok(())
    }
}

/// An implementation trait used to recursively walk configuration
pub trait Visit {
    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);

    fn none(&mut self, key: &str, description: &'static str);
}

/// Convenience macro to create [`ExtensionOptions`].
///
/// The created structure implements the following traits:
///
/// - [`Clone`]
/// - [`Debug`]
/// - [`Default`]
/// - [`ExtensionOptions`]
///
/// # Usage
/// The syntax is:
///
/// ```text
/// extensions_options! {
///     /// Struct docs (optional).
///     [<vis>] struct <StructName> {
///         /// Field docs (optional)
///         [<vis>] <field_name>: <field_type>, default = <default_value>
///
///         ... more fields
///     }
/// }
/// ```
///
/// The placeholders are:
/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
/// - `<StructName>`: Struct name like `MyStruct`.
/// - `<field_name>`: Field name like `my_field`.
/// - `<field_type>`: Field type like `u8`.
/// - `<default_value>`: Default value matching the field type like `42`.
///
/// # Example
/// See also a full example on the [`ConfigExtension`] documentation
///
/// ```
/// use datafusion_common::extensions_options;
///
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
/// ```
///
/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
1234#[macro_export]
1235macro_rules! extensions_options {
1236    (
1237     $(#[doc = $struct_d:tt])*
1238     $vis:vis struct $struct_name:ident {
1239        $(
1240        $(#[doc = $d:tt])*
1241        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
1242        )*$(,)*
1243    }
1244    ) => {
1245        $(#[doc = $struct_d])*
1246        #[derive(Debug, Clone)]
1247        #[non_exhaustive]
1248        $vis struct $struct_name{
1249            $(
1250            $(#[doc = $d])*
1251            $field_vis $field_name : $field_type,
1252            )*
1253        }
1254
1255        impl Default for $struct_name {
1256            fn default() -> Self {
1257                Self {
1258                    $($field_name: $default),*
1259                }
1260            }
1261        }
1262
1263        impl $crate::config::ExtensionOptions for $struct_name {
1264            fn as_any(&self) -> &dyn ::std::any::Any {
1265                self
1266            }
1267
1268            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
1269                self
1270            }
1271
1272            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
1273                Box::new(self.clone())
1274            }
1275
1276            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1277                $crate::config::ConfigField::set(self, key, value)
1278            }
1279
1280            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
1281                struct Visitor(Vec<$crate::config::ConfigEntry>);
1282
1283                impl $crate::config::Visit for Visitor {
1284                    fn some<V: std::fmt::Display>(
1285                        &mut self,
1286                        key: &str,
1287                        value: V,
1288                        description: &'static str,
1289                    ) {
1290                        self.0.push($crate::config::ConfigEntry {
1291                            key: key.to_string(),
1292                            value: Some(value.to_string()),
1293                            description,
1294                        })
1295                    }
1296
1297                    fn none(&mut self, key: &str, description: &'static str) {
1298                        self.0.push($crate::config::ConfigEntry {
1299                            key: key.to_string(),
1300                            value: None,
1301                            description,
1302                        })
1303                    }
1304                }
1305
1306                let mut v = Visitor(vec![]);
1307                // The prefix is not used for extensions.
1308                // The description is generated in ConfigField::visit.
1309                // We can just pass empty strings here.
1310                $crate::config::ConfigField::visit(self, &mut v, "", "");
1311                v.0
1312            }
1313        }
1314
1315        impl $crate::config::ConfigField for $struct_name {
1316            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1317                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1318                match key {
1319                    $(
1320                        stringify!($field_name) => {
                            // Allow deprecated fields to be set without
                            // emitting deprecation warnings
1323                            {
1324                                #[allow(deprecated)]
1325                                self.$field_name.set(rem, value.as_ref())
1326                            }
1327                        },
1328                    )*
1329                    _ => return $crate::error::_config_err!(
1330                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1331                    )
1332                }
1333            }
1334
1335            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1336                $(
1337                    let key = stringify!($field_name).to_string();
1338                    let desc = concat!($($d),*).trim();
1339                    #[allow(deprecated)]
1340                    self.$field_name.visit(v, key.as_str(), desc);
1341                )*
1342            }
1343        }
1344    }
1345}
1346
/// These file types have special built-in behavior for configuration.
/// Use [`TableOptions::extensions`] for configuring other file types.
1349#[derive(Debug, Clone)]
1350pub enum ConfigFileType {
1351    CSV,
1352    #[cfg(feature = "parquet")]
1353    PARQUET,
1354    JSON,
1355}
1356
/// Configuration options for handling different table formats (CSV, Parquet, and
/// JSON), covering parsing and writing behavior specific to each format. Support
/// for further formats can be added through custom [`Extensions`].
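///
/// # Example
///
/// A minimal sketch mirroring this module's tests: select a file format, then
/// set format-prefixed keys:
///
/// ```
/// use datafusion_common::config::{ConfigFileType, TableOptions};
///
/// let mut options = TableOptions::new();
/// options.set_config_format(ConfigFileType::CSV);
/// // Keys are prefixed with "format" once a format has been selected
/// options.set("format.delimiter", ";").unwrap();
/// assert_eq!(options.csv.delimiter, b';');
/// ```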
1360#[derive(Debug, Clone, Default)]
1361pub struct TableOptions {
1362    /// Configuration options for CSV file handling. This includes settings like the delimiter,
1363    /// quote character, and whether the first row is considered as headers.
1364    pub csv: CsvOptions,
1365
1366    /// Configuration options for Parquet file handling. This includes settings for compression,
1367    /// encoding, and other Parquet-specific file characteristics.
1368    pub parquet: TableParquetOptions,
1369
1370    /// Configuration options for JSON file handling.
1371    pub json: JsonOptions,
1372
1373    /// The current file format that the table operations should assume. This option allows
1374    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1375    pub current_format: Option<ConfigFileType>,
1376
1377    /// Optional extensions that can be used to extend or customize the behavior of the table
1378    /// options. Extensions can be registered using `Extensions::insert` and might include
1379    /// custom file handling logic, additional configuration parameters, or other enhancements.
1380    pub extensions: Extensions,
1381}
1382
1383impl ConfigField for TableOptions {
1384    /// Visits configuration settings for the current file format, or all formats if none is selected.
1385    ///
1386    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1387    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1388    /// it visits all available format settings.
1389    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1390        if let Some(file_type) = &self.current_format {
1391            match file_type {
1392                #[cfg(feature = "parquet")]
1393                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1394                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1395                ConfigFileType::JSON => self.json.visit(v, "format", ""),
1396            }
1397        } else {
1398            self.csv.visit(v, "csv", "");
1399            self.parquet.visit(v, "parquet", "");
1400            self.json.visit(v, "json", "");
1401        }
1402    }
1403
1404    /// Sets a configuration value for a specific key within `TableOptions`.
1405    ///
1406    /// This method delegates setting configuration values to the specific file format configurations,
1407    /// based on the current format selected. If no format is selected, it returns an error.
1408    ///
1409    /// # Parameters
1410    ///
1411    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1412    ///   for CSV format.
1413    /// * `value`: The value to set for the specified configuration key.
1414    ///
1415    /// # Returns
1416    ///
1417    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1418    /// or if setting the configuration value fails for the specific format.
1419    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `TableOptions::set`
1421        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1422        match key {
1423            "format" => {
1424                let Some(format) = &self.current_format else {
1425                    return _config_err!("Specify a format for TableOptions");
1426                };
1427                match format {
1428                    #[cfg(feature = "parquet")]
1429                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
1430                    ConfigFileType::CSV => self.csv.set(rem, value),
1431                    ConfigFileType::JSON => self.json.set(rem, value),
1432                }
1433            }
1434            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1435        }
1436    }
1437}
1438
1439impl TableOptions {
1440    /// Constructs a new instance of `TableOptions` with default settings.
1441    ///
1442    /// # Returns
1443    ///
1444    /// A new `TableOptions` instance with default configuration values.
1445    pub fn new() -> Self {
1446        Self::default()
1447    }
1448
1449    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1450    ///
1451    /// # Parameters
1452    ///
1453    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1454    ///
1455    /// # Returns
1456    ///
1457    /// A new `TableOptions` instance with settings applied from the session config.
1458    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1459        let initial = TableOptions::default();
1460        initial.combine_with_session_config(config)
1461    }
1462
    /// Returns a new `TableOptions` whose global parquet options are replaced
    /// with those from a given session config.
1464    ///
1465    /// # Parameters
1466    ///
1467    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1468    ///
1469    /// # Returns
1470    ///
1471    /// A new `TableOptions` instance with updated settings from the session config.
1472    #[must_use = "this method returns a new instance"]
1473    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1474        let mut clone = self.clone();
1475        clone.parquet.global = config.execution.parquet.clone();
1476        clone
1477    }
1478
1479    /// Sets the file format for the table.
1480    ///
1481    /// # Parameters
1482    ///
1483    /// * `format`: The file format to use (e.g., CSV, Parquet).
1484    pub fn set_config_format(&mut self, format: ConfigFileType) {
1485        self.current_format = Some(format);
1486    }
1487
1488    /// Sets the extensions for this `TableOptions` instance.
1489    ///
1490    /// # Parameters
1491    ///
1492    /// * `extensions`: The `Extensions` instance to set.
1493    ///
1494    /// # Returns
1495    ///
1496    /// A new `TableOptions` instance with the specified extensions applied.
1497    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1498        self.extensions = extensions;
1499        self
1500    }
1501
1502    /// Sets a specific configuration option.
1503    ///
1504    /// # Parameters
1505    ///
1506    /// * `key`: The configuration key (e.g., "format.delimiter").
1507    /// * `value`: The value to set for the specified key.
1508    ///
1509    /// # Returns
1510    ///
1511    /// A result indicating success or failure in setting the configuration option.
1512    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1513        let Some((prefix, _)) = key.split_once('.') else {
            return _config_err!("Could not find config namespace for key \"{key}\"");
1515        };
1516
1517        if prefix == "format" {
1518            return ConfigField::set(self, key, value);
1519        }
1520
        if prefix == "execution" {
            // Execution options are managed by the session `ConfigOptions`,
            // so they are accepted and ignored here.
            return Ok(());
        }
1524
1525        let Some(e) = self.extensions.0.get_mut(prefix) else {
1526            return _config_err!("Could not find config namespace \"{prefix}\"");
1527        };
1528        e.0.set(key, value)
1529    }
1530
1531    /// Initializes a new `TableOptions` from a hash map of string settings.
1532    ///
1533    /// # Parameters
1534    ///
1535    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1536    ///
1537    /// # Returns
1538    ///
1539    /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1540    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1541        let mut ret = Self::default();
1542        for (k, v) in settings {
1543            ret.set(k, v)?;
1544        }
1545
1546        Ok(ret)
1547    }
1548
1549    /// Modifies the current `TableOptions` instance with settings from a hash map.
1550    ///
1551    /// # Parameters
1552    ///
1553    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1554    ///
1555    /// # Returns
1556    ///
1557    /// A result indicating success or failure in applying the settings.
1558    pub fn alter_with_string_hash_map(
1559        &mut self,
1560        settings: &HashMap<String, String>,
1561    ) -> Result<()> {
1562        for (k, v) in settings {
1563            self.set(k, v)?;
1564        }
1565        Ok(())
1566    }
1567
1568    /// Retrieves all configuration entries from this `TableOptions`.
1569    ///
1570    /// # Returns
1571    ///
1572    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1573    pub fn entries(&self) -> Vec<ConfigEntry> {
1574        struct Visitor(Vec<ConfigEntry>);
1575
1576        impl Visit for Visitor {
1577            fn some<V: Display>(
1578                &mut self,
1579                key: &str,
1580                value: V,
1581                description: &'static str,
1582            ) {
1583                self.0.push(ConfigEntry {
1584                    key: key.to_string(),
1585                    value: Some(value.to_string()),
1586                    description,
1587                })
1588            }
1589
1590            fn none(&mut self, key: &str, description: &'static str) {
1591                self.0.push(ConfigEntry {
1592                    key: key.to_string(),
1593                    value: None,
1594                    description,
1595                })
1596            }
1597        }
1598
1599        let mut v = Visitor(vec![]);
1600        self.visit(&mut v, "format", "");
1601
1602        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1603        v.0
1604    }
1605}
1606
1607/// Options that control how Parquet files are read, including global options
1608/// that apply to all columns and optional column-specific overrides
1609///
1610/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
1611/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
1612/// (e.g. sorting_columns).
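///
/// Column-specific settings and extra file metadata are addressed through
/// `::`-separated keys. A minimal sketch mirroring this module's tests
/// (requires the `parquet` feature to be enabled):
///
/// ```ignore
/// use datafusion_common::config::{ConfigFileType, TableOptions};
///
/// let mut options = TableOptions::new();
/// options.set_config_format(ConfigFileType::PARQUET);
/// // Per-column override for the bloom filter setting
/// options.set("format.bloom_filter_enabled::col1", "true").unwrap();
/// // File-level key/value metadata
/// options.set("format.metadata::key1", "value1").unwrap();
/// ```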
1613#[derive(Clone, Default, Debug, PartialEq)]
1614pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
1616    pub global: ParquetOptions,
    /// Column-specific options, applied on top of the global options. Keys take
    /// the form `parquet.<option>::<column>`.
1618    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
1619    /// Additional file-level metadata to include. Inserted into the key_value_metadata
1620    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
1621    ///
1622    /// Multiple entries are permitted
1623    /// ```sql
1624    /// OPTIONS (
1625    ///    'format.metadata::key1' '',
1626    ///    'format.metadata::key2' 'value',
1627    ///    'format.metadata::key3' 'value has spaces',
1628    ///    'format.metadata::key4' 'value has special chars :: :',
1629    ///    'format.metadata::key_dupe' 'original will be overwritten',
1630    ///    'format.metadata::key_dupe' 'final'
1631    /// )
1632    /// ```
1633    pub key_value_metadata: HashMap<String, Option<String>>,
1634}
1635
1636impl TableParquetOptions {
1637    /// Return new default TableParquetOptions
1638    pub fn new() -> Self {
1639        Self::default()
1640    }
1641
    /// Set whether to skip encoding the arrow schema in the parquet
    /// file's key/value metadata while writing.
    ///
    /// By default the arrow schema is encoded into the file's kv_metadata.
1646    pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1647        Self {
1648            global: ParquetOptions {
1649                skip_arrow_metadata: skip,
1650                ..self.global
1651            },
1652            ..self
1653        }
1654    }
1655}
1656
1657impl ConfigField for TableParquetOptions {
1658    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
1659        self.global.visit(v, key_prefix, description);
1660        self.column_specific_options
1661            .visit(v, key_prefix, description)
1662    }
1663
1664    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1665        // Determine if the key is a global, metadata, or column-specific setting
1666        if key.starts_with("metadata::") {
1667            let k = match key.split("::").collect::<Vec<_>>()[..] {
1668                [_meta] | [_meta, ""] => {
1669                    return _config_err!(
1670                        "Invalid metadata key provided, missing key in metadata::<key>"
1671                    )
1672                }
1673                [_meta, k] => k.into(),
1674                _ => {
1675                    return _config_err!(
1676                        "Invalid metadata key provided, found too many '::' in \"{key}\""
1677                    )
1678                }
1679            };
1680            self.key_value_metadata.insert(k, Some(value.into()));
1681            Ok(())
1682        } else if key.contains("::") {
1683            self.column_specific_options.set(key, value)
1684        } else {
1685            self.global.set(key, value)
1686        }
1687    }
1688}
1689
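/// Generates a config struct in the same way as `config_namespace!`, and
/// additionally implements [`ConfigField`] for `HashMap<String, _>` of the
/// generated struct, so per-entry overrides can be addressed with keys of the
/// form `<field_name>::<map_key>` (e.g. `bloom_filter_enabled::col1` for
/// column-specific parquet options).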
1690macro_rules! config_namespace_with_hashmap {
1691    (
1692     $(#[doc = $struct_d:tt])*
1693     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
1694     $vis:vis struct $struct_name:ident {
1695        $(
1696        $(#[doc = $d:tt])*
1697        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
1698        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
1699        )*$(,)*
1700    }
1701    ) => {
1702
1703        $(#[doc = $struct_d])*
1704        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
1705        #[derive(Debug, Clone, PartialEq)]
1706        $vis struct $struct_name{
1707            $(
1708            $(#[doc = $d])*
1709            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
1710            $field_vis $field_name : $field_type,
1711            )*
1712        }
1713
1714        impl ConfigField for $struct_name {
1715            fn set(&mut self, key: &str, value: &str) -> Result<()> {
1716                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1717                match key {
1718                    $(
1719                       stringify!($field_name) => {
1720                           // Handle deprecated fields
1721                           #[allow(deprecated)] // Allow deprecated fields
1722                           $(let value = $transform(value);)?
1723                           self.$field_name.set(rem, value.as_ref())
1724                       },
1725                    )*
1726                    _ => _config_err!(
1727                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1728                    )
1729                }
1730            }
1731
1732            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
1733                $(
1734                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
1735                let desc = concat!($($d),*).trim();
1736                // Handle deprecated fields
1737                #[allow(deprecated)]
1738                self.$field_name.visit(v, key.as_str(), desc);
1739                )*
1740            }
1741        }
1742
1743        impl Default for $struct_name {
1744            fn default() -> Self {
1745                #[allow(deprecated)]
1746                Self {
1747                    $($field_name: $default),*
1748                }
1749            }
1750        }
1751
1752        impl ConfigField for HashMap<String,$struct_name> {
1753            fn set(&mut self, key: &str, value: &str) -> Result<()> {
1754                let parts: Vec<&str> = key.splitn(2, "::").collect();
1755                match parts.as_slice() {
1756                    [inner_key, hashmap_key] => {
1757                        // Get or create the struct for the specified key
1758                        let inner_value = self
1759                            .entry((*hashmap_key).to_owned())
1760                            .or_insert_with($struct_name::default);
1761
1762                        inner_value.set(inner_key, value)
1763                    }
1764                    _ => _config_err!("Unrecognized key '{key}'."),
1765                }
1766            }
1767
1768            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
1769                for (column_name, col_options) in self {
1770                    $(
1771                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
1772                    let desc = concat!($($d),*).trim();
1773                    #[allow(deprecated)]
1774                    col_options.$field_name.visit(v, key.as_str(), desc);
1775                    )*
1776                }
1777            }
1778        }
1779    }
1780}
1781
1782config_namespace_with_hashmap! {
1783    /// Options controlling parquet format for individual columns.
1784    ///
1785    /// See [`ParquetOptions`] for more details
1786    pub struct ParquetColumnOptions {
1787        /// Sets if bloom filter is enabled for the column path.
1788        pub bloom_filter_enabled: Option<bool>, default = None
1789
1790        /// Sets encoding for the column path.
1791        /// Valid values are: plain, plain_dictionary, rle,
1792        /// bit_packed, delta_binary_packed, delta_length_byte_array,
1793        /// delta_byte_array, rle_dictionary, and byte_stream_split.
1794        /// These values are not case-sensitive. If NULL, uses
1795        /// default parquet options
1796        pub encoding: Option<String>, default = None
1797
1798        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
1799        /// default parquet options
1800        pub dictionary_enabled: Option<bool>, default = None
1801
1802        /// Sets default parquet compression codec for the column path.
1803        /// Valid values are: uncompressed, snappy, gzip(level),
1804        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
1805        /// These values are not case-sensitive. If NULL, uses
1806        /// default parquet options
1807        pub compression: Option<String>, transform = str::to_lowercase, default = None
1808
1809        /// Sets if statistics are enabled for the column
1810        /// Valid values are: "none", "chunk", and "page"
1811        /// These values are not case sensitive. If NULL, uses
1812        /// default parquet options
1813        pub statistics_enabled: Option<String>, default = None
1814
1815        /// Sets bloom filter false positive probability for the column path. If NULL, uses
1816        /// default parquet options
1817        pub bloom_filter_fpp: Option<f64>, default = None
1818
1819        /// Sets bloom filter number of distinct values. If NULL, uses
1820        /// default parquet options
1821        pub bloom_filter_ndv: Option<u64>, default = None
1822
1823        /// Sets max statistics size for the column path. If NULL, uses
1824        /// default parquet options
1825        /// max_statistics_size is deprecated, currently it is not being used
1826        // TODO: remove once deprecated
1827        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
1828        pub max_statistics_size: Option<usize>, default = None
1829    }
1830}
1831
1832config_namespace! {
1833    /// Options controlling CSV format
1834    pub struct CsvOptions {
        /// Specifies whether there is a CSV header (i.e. the first line
        /// consists of column names). The value `None` indicates that
        /// the session configuration (`datafusion.catalog.has_header`)
        /// should be consulted.
1838        pub has_header: Option<bool>, default = None
1839        pub delimiter: u8, default = b','
1840        pub quote: u8, default = b'"'
1841        pub terminator: Option<u8>, default = None
1842        pub escape: Option<u8>, default = None
1843        pub double_quote: Option<bool>, default = None
1844        /// Specifies whether newlines in (quoted) values are supported.
1845        ///
1846        /// Parsing newlines in quoted values may be affected by execution behaviour such as
1847        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
1848        /// parsed successfully, which may reduce performance.
1849        ///
1850        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
1851        pub newlines_in_values: Option<bool>, default = None
1852        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
1853        pub schema_infer_max_rec: Option<usize>, default = None
1854        pub date_format: Option<String>, default = None
1855        pub datetime_format: Option<String>, default = None
1856        pub timestamp_format: Option<String>, default = None
1857        pub timestamp_tz_format: Option<String>, default = None
1858        pub time_format: Option<String>, default = None
        /// The output format for Nulls in the CSV writer.
        pub null_value: Option<String>, default = None
        /// The input regex for Nulls when loading CSVs.
        pub null_regex: Option<String>, default = None
1863        pub comment: Option<u8>, default = None
1864    }
1865}
1866
1867impl CsvOptions {
    /// Set the compression type for the CSV file
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
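    ///
    /// A minimal builder sketch (assuming the `GZIP` variant of
    /// `CompressionTypeVariant` is available):
    ///
    /// ```
    /// use datafusion_common::config::CsvOptions;
    /// use datafusion_common::parsers::CompressionTypeVariant;
    ///
    /// let options = CsvOptions::default()
    ///     .with_compression(CompressionTypeVariant::GZIP)
    ///     .with_delimiter(b'|')
    ///     .with_has_header(true);
    /// assert_eq!(options.delimiter(), b'|');
    /// ```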
1870    pub fn with_compression(
1871        mut self,
1872        compression_type_variant: CompressionTypeVariant,
1873    ) -> Self {
1874        self.compression = compression_type_variant;
1875        self
1876    }
1877
    /// Set a limit in terms of records to scan to infer the schema
    /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
1880    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
1881        self.schema_infer_max_rec = Some(max_rec);
1882        self
1883    }
1884
1885    /// Set true to indicate that the first line is a header.
    /// - defaults to `true`
1887    pub fn with_has_header(mut self, has_header: bool) -> Self {
1888        self.has_header = Some(has_header);
1889        self
1890    }
1891
1892    /// Returns true if the first line is a header. If format options does not
1893    /// specify whether there is a header, returns `None` (indicating that the
1894    /// configuration should be consulted).
1895    pub fn has_header(&self) -> Option<bool> {
1896        self.has_header
1897    }
1898
1899    /// The character separating values within a row.
    /// - defaults to `','`
1901    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
1902        self.delimiter = delimiter;
1903        self
1904    }
1905
1906    /// The quote character in a row.
    /// - defaults to `'"'`
1908    pub fn with_quote(mut self, quote: u8) -> Self {
1909        self.quote = quote;
1910        self
1911    }
1912
1913    /// The character that terminates a row.
    /// - defaults to `None` (CRLF)
1915    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
1916        self.terminator = terminator;
1917        self
1918    }
1919
1920    /// The escape character in a row.
    /// - defaults to `None`
1922    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
1923        self.escape = escape;
1924        self
1925    }
1926
1927    /// Set true to indicate that the CSV quotes should be doubled.
    /// - defaults to `true`
1929    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
1930        self.double_quote = Some(double_quote);
1931        self
1932    }
1933
1934    /// Specifies whether newlines in (quoted) values are supported.
1935    ///
1936    /// Parsing newlines in quoted values may be affected by execution behaviour such as
1937    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
1938    /// parsed successfully, which may reduce performance.
1939    ///
1940    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
1941    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
1942        self.newlines_in_values = Some(newlines_in_values);
1943        self
1944    }
1945
    /// Set the `CompressionTypeVariant` for the CSV file
1947    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
1948    pub fn with_file_compression_type(
1949        mut self,
1950        compression: CompressionTypeVariant,
1951    ) -> Self {
1952        self.compression = compression;
1953        self
1954    }
1955
1956    /// The delimiter character.
1957    pub fn delimiter(&self) -> u8 {
1958        self.delimiter
1959    }
1960
1961    /// The quote character.
1962    pub fn quote(&self) -> u8 {
1963        self.quote
1964    }
1965
1966    /// The terminator character.
1967    pub fn terminator(&self) -> Option<u8> {
1968        self.terminator
1969    }
1970
1971    /// The escape character.
1972    pub fn escape(&self) -> Option<u8> {
1973        self.escape
1974    }
1975}
1976
1977config_namespace! {
1978    /// Options controlling JSON format
1979    pub struct JsonOptions {
1980        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
1981        pub schema_infer_max_rec: Option<usize>, default = None
1982    }
1983}
1984
1985pub trait FormatOptionsExt: Display {}
1986
1987#[derive(Debug, Clone, PartialEq)]
1988#[allow(clippy::large_enum_variant)]
1989pub enum FormatOptions {
1990    CSV(CsvOptions),
1991    JSON(JsonOptions),
1992    #[cfg(feature = "parquet")]
1993    PARQUET(TableParquetOptions),
1994    AVRO,
1995    ARROW,
1996}
1997
1998impl Display for FormatOptions {
1999    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2000        let out = match self {
2001            FormatOptions::CSV(_) => "csv",
2002            FormatOptions::JSON(_) => "json",
2003            #[cfg(feature = "parquet")]
2004            FormatOptions::PARQUET(_) => "parquet",
2005            FormatOptions::AVRO => "avro",
2006            FormatOptions::ARROW => "arrow",
2007        };
2008        write!(f, "{}", out)
2009    }
2010}
2011
2012#[cfg(test)]
2013mod tests {
2014    use std::any::Any;
2015    use std::collections::HashMap;
2016
2017    use crate::config::{
2018        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
2019        Extensions, TableOptions,
2020    };
2021
2022    #[derive(Default, Debug, Clone)]
2023    pub struct TestExtensionConfig {
        /// Key/value properties set via keys of the form `test.<key>`
2025        pub properties: HashMap<String, String>,
2026    }
2027
2028    impl ExtensionOptions for TestExtensionConfig {
2029        fn as_any(&self) -> &dyn Any {
2030            self
2031        }
2032
2033        fn as_any_mut(&mut self) -> &mut dyn Any {
2034            self
2035        }
2036
2037        fn cloned(&self) -> Box<dyn ExtensionOptions> {
2038            Box::new(self.clone())
2039        }
2040
2041        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
2042            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2043            assert_eq!(key, "test");
2044            self.properties.insert(rem.to_owned(), value.to_owned());
2045            Ok(())
2046        }
2047
2048        fn entries(&self) -> Vec<ConfigEntry> {
2049            self.properties
2050                .iter()
2051                .map(|(k, v)| ConfigEntry {
2052                    key: k.into(),
2053                    value: Some(v.into()),
2054                    description: "",
2055                })
2056                .collect()
2057        }
2058    }
2059
2060    impl ConfigExtension for TestExtensionConfig {
2061        const PREFIX: &'static str = "test";
2062    }
2063
2064    #[test]
2065    fn create_table_config() {
2066        let mut extension = Extensions::new();
2067        extension.insert(TestExtensionConfig::default());
2068        let table_config = TableOptions::new().with_extensions(extension);
        let test_config = table_config.extensions.get::<TestExtensionConfig>();
        assert!(test_config.is_some())
2071    }
2072
2073    #[test]
2074    fn alter_test_extension_config() {
2075        let mut extension = Extensions::new();
2076        extension.insert(TestExtensionConfig::default());
2077        let mut table_config = TableOptions::new().with_extensions(extension);
2078        table_config.set_config_format(ConfigFileType::CSV);
2079        table_config.set("format.delimiter", ";").unwrap();
2080        assert_eq!(table_config.csv.delimiter, b';');
2081        table_config.set("test.bootstrap.servers", "asd").unwrap();
        let test_config = table_config
            .extensions
            .get::<TestExtensionConfig>()
            .unwrap();
        assert_eq!(
            test_config.properties.get("bootstrap.servers").unwrap(),
            "asd"
        );
2090    }
2091
2092    #[test]
2093    fn csv_u8_table_options() {
2094        let mut table_config = TableOptions::new();
2095        table_config.set_config_format(ConfigFileType::CSV);
2096        table_config.set("format.delimiter", ";").unwrap();
2097        assert_eq!(table_config.csv.delimiter as char, ';');
2098        table_config.set("format.escape", "\"").unwrap();
2099        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
2100        table_config.set("format.escape", "\'").unwrap();
2101        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
2102    }
2103
2104    #[test]
2105    fn warning_only_not_default() {
2106        use std::sync::atomic::AtomicUsize;
2107        static COUNT: AtomicUsize = AtomicUsize::new(0);
2108        use log::{Level, LevelFilter, Metadata, Record};
2109        struct SimpleLogger;
2110        impl log::Log for SimpleLogger {
2111            fn enabled(&self, metadata: &Metadata) -> bool {
2112                metadata.level() <= Level::Info
2113            }
2114
2115            fn log(&self, record: &Record) {
2116                if self.enabled(record.metadata()) {
2117                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2118                }
2119            }
2120            fn flush(&self) {}
2121        }
2122        log::set_logger(&SimpleLogger).unwrap();
2123        log::set_max_level(LevelFilter::Info);
2124        let mut sql_parser_options = crate::config::SqlParserOptions::default();
2125        sql_parser_options
2126            .set("enable_options_value_normalization", "false")
2127            .unwrap();
2128        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
2129        sql_parser_options
2130            .set("enable_options_value_normalization", "true")
2131            .unwrap();
2132        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
2133    }
2134
2135    #[cfg(feature = "parquet")]
2136    #[test]
2137    fn parquet_table_options() {
2138        let mut table_config = TableOptions::new();
2139        table_config.set_config_format(ConfigFileType::PARQUET);
2140        table_config
2141            .set("format.bloom_filter_enabled::col1", "true")
2142            .unwrap();
2143        assert_eq!(
2144            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
2145            Some(true)
2146        );
2147    }
2148
2149    #[cfg(feature = "parquet")]
2150    #[test]
2151    fn parquet_table_options_config_entry() {
2152        let mut table_config = TableOptions::new();
2153        table_config.set_config_format(ConfigFileType::PARQUET);
2154        table_config
2155            .set("format.bloom_filter_enabled::col1", "true")
2156            .unwrap();
2157        let entries = table_config.entries();
2158        assert!(entries
2159            .iter()
2160            .any(|item| item.key == "format.bloom_filter_enabled::col1"))
2161    }
2162
2163    #[cfg(feature = "parquet")]
2164    #[test]
2165    fn parquet_table_options_config_metadata_entry() {
2166        let mut table_config = TableOptions::new();
2167        table_config.set_config_format(ConfigFileType::PARQUET);
2168        table_config.set("format.metadata::key1", "").unwrap();
2169        table_config.set("format.metadata::key2", "value2").unwrap();
2170        table_config
2171            .set("format.metadata::key3", "value with spaces ")
2172            .unwrap();
2173        table_config
2174            .set("format.metadata::key4", "value with special chars :: :")
2175            .unwrap();
2176
2177        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
2178        assert_eq!(parsed_metadata.get("should not exist1"), None);
2179        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
2180        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
2181        assert_eq!(
2182            parsed_metadata.get("key3"),
2183            Some(&Some("value with spaces ".into()))
2184        );
2185        assert_eq!(
2186            parsed_metadata.get("key4"),
2187            Some(&Some("value with special chars :: :".into()))
2188        );
2189
2190        // duplicate keys are overwritten
2191        table_config.set("format.metadata::key_dupe", "A").unwrap();
2192        table_config.set("format.metadata::key_dupe", "B").unwrap();
2193        let parsed_metadata = table_config.parquet.key_value_metadata;
2194        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
2195    }
2196}