datafusion_common/
config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Runtime configuration, via [`ConfigOptions`]
19
20use arrow_ipc::CompressionType;
21
22#[cfg(feature = "parquet_encryption")]
23use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
24use crate::error::_config_err;
25use crate::parsers::CompressionTypeVariant;
26use crate::utils::get_available_parallelism;
27use crate::{DataFusionError, Result};
28use std::any::Any;
29use std::collections::{BTreeMap, HashMap};
30use std::error::Error;
31use std::fmt::{self, Display};
32use std::str::FromStr;
33
34#[cfg(feature = "parquet_encryption")]
35use hex;
36
37/// A macro that wraps a configuration struct and automatically derives
38/// [`Default`] and [`ConfigField`] for it, allowing it to be used
39/// in the [`ConfigOptions`] configuration tree.
40///
41/// `transform` is used to normalize values before parsing.
42///
43/// For example,
44///
45/// ```ignore
46/// config_namespace! {
47///    /// Amazing config
48///    pub struct MyConfig {
49///        /// Field 1 doc
50///        field1: String, transform = str::to_lowercase, default = "".to_string()
51///
52///        /// Field 2 doc
53///        field2: usize, default = 232
54///
55///        /// Field 3 doc
56///        field3: Option<usize>, default = None
57///    }
58///}
59/// ```
60///
61/// Will generate
62///
63/// ```ignore
64/// /// Amazing config
65/// #[derive(Debug, Clone)]
66/// #[non_exhaustive]
67/// pub struct MyConfig {
68///     /// Field 1 doc
69///     field1: String,
70///     /// Field 2 doc
71///     field2: usize,
72///     /// Field 3 doc
73///     field3: Option<usize>,
74/// }
75/// impl ConfigField for MyConfig {
76///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
77///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
78///         match key {
79///             "field1" => {
80///                 let value = str::to_lowercase(value);
81///                 self.field1.set(rem, value.as_ref())
82///             },
83///             "field2" => self.field2.set(rem, value.as_ref()),
84///             "field3" => self.field3.set(rem, value.as_ref()),
85///             _ => _internal_err!(
86///                 "Config value \"{}\" not found on MyConfig",
87///                 key
88///             ),
89///         }
90///     }
91///
92///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
93///         let key = format!("{}.field1", key_prefix);
94///         let desc = "Field 1 doc";
95///         self.field1.visit(v, key.as_str(), desc);
96///         let key = format!("{}.field2", key_prefix);
97///         let desc = "Field 2 doc";
98///         self.field2.visit(v, key.as_str(), desc);
99///         let key = format!("{}.field3", key_prefix);
100///         let desc = "Field 3 doc";
101///         self.field3.visit(v, key.as_str(), desc);
102///     }
103/// }
104///
105/// impl Default for MyConfig {
106///     fn default() -> Self {
107///         Self {
108///             field1: "".to_string(),
109///             field2: 232,
110///             field3: None,
111///         }
112///     }
113/// }
114/// ```
115///
116/// NB: Misplaced commas may result in nonsensical errors
117#[macro_export]
118macro_rules! config_namespace {
119    (
120        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
121        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
122        $(#[allow($($struct_de:tt)*)])?
123        $vis:vis struct $struct_name:ident {
124            $(
125                $(#[doc = $d:tt])* // Field-level documentation attributes
126                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
127                $(#[allow($($field_de:tt)*)])?
128                $field_vis:vis $field_name:ident : $field_type:ty,
129                $(warn = $warn:expr,)?
130                $(transform = $transform:expr,)?
131                default = $default:expr
132            )*$(,)*
133        }
134    ) => {
135        $(#[doc = $struct_d])* // Apply struct documentation
136        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
137        $(#[allow($($struct_de)*)])?
138        #[derive(Debug, Clone, PartialEq)]
139        $vis struct $struct_name {
140            $(
141                $(#[doc = $d])* // Apply field documentation
142                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
143                $(#[allow($($field_de)*)])?
144                $field_vis $field_name: $field_type,
145            )*
146        }
147
148        impl $crate::config::ConfigField for $struct_name {
149            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
150                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
151                match key {
152                    $(
153                        stringify!($field_name) => {
154                            // Safely apply deprecated attribute if present
155                            // $(#[allow(deprecated)])?
156                            {
157                                $(let value = $transform(value);)? // Apply transformation if specified
158                                #[allow(deprecated)]
159                                let ret = self.$field_name.set(rem, value.as_ref());
160
161                                $(if !$warn.is_empty() {
162                                    let default: $field_type = $default;
163                                    #[allow(deprecated)]
164                                    if default != self.$field_name {
165                                        log::warn!($warn);
166                                    }
167                                })? // Log warning if specified, and the value is not the default
168                                ret
169                            }
170                        },
171                    )*
172                    _ => return $crate::error::_config_err!(
173                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
174                    )
175                }
176            }
177
178            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
179                $(
180                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
181                    let desc = concat!($($d),*).trim();
182                    #[allow(deprecated)]
183                    self.$field_name.visit(v, key.as_str(), desc);
184                )*
185            }
186        }
187        impl Default for $struct_name {
188            fn default() -> Self {
189                #[allow(deprecated)]
190                Self {
191                    $($field_name: $default),*
192                }
193            }
194        }
195    }
196}
197config_namespace! {
198    /// Options related to catalog and directory scanning
199    ///
200    /// See also: [`SessionConfig`]
201    ///
202    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
203    pub struct CatalogOptions {
204        /// Whether the default catalog and schema should be created automatically.
205        pub create_default_catalog_and_schema: bool, default = true
206
207        /// The default catalog name - this impacts what SQL queries use if not specified
208        pub default_catalog: String, default = "datafusion".to_string()
209
210        /// The default schema name - this impacts what SQL queries use if not specified
211        pub default_schema: String, default = "public".to_string()
212
213        /// Should DataFusion provide access to `information_schema`
214        /// virtual tables for displaying schema information
215        pub information_schema: bool, default = false
216
217        /// Location scanned to load tables for `default` schema
218        pub location: Option<String>, default = None
219
220        /// Type of `TableProvider` to use when loading `default` schema
221        pub format: Option<String>, default = None
222
223        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
224        /// if not specified explicitly in the statement.
225        pub has_header: bool, default = true
226
227        /// Specifies whether newlines in (quoted) CSV values are supported.
228        ///
229        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
230        /// if not specified explicitly in the statement.
231        ///
232        /// Parsing newlines in quoted values may be affected by execution behaviour such as
233        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
234        /// parsed successfully, which may reduce performance.
235        pub newlines_in_values: bool, default = false
236    }
237}
238
239config_namespace! {
240    /// Options related to SQL parser
241    ///
242    /// See also: [`SessionConfig`]
243    ///
244    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
245    pub struct SqlParserOptions {
246        /// When set to true, SQL parser will parse float as decimal type
247        pub parse_float_as_decimal: bool, default = false
248
249        /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
250        pub enable_ident_normalization: bool, default = true
251
252        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
253        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
254        /// are normalized automatically.
255        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false
256
257        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
258        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
259        pub dialect: String, default = "generic".to_string()
260        // no need to lowercase because `sqlparser::dialect_from_str`] is case-insensitive
261
262        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
263        /// ignore the length. If false, error if a `VARCHAR` with a length is
264        /// specified. The Arrow type system does not have a notion of maximum
265        /// string length and thus DataFusion can not enforce such limits.
266        pub support_varchar_with_length: bool, default = true
267
268        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
269        /// If false, they are mapped to `Utf8`.
270        /// Default is true.
271        pub map_string_types_to_utf8view: bool, default = true
272
273        /// When set to true, the source locations relative to the original SQL
274        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
275        /// and recorded in the logical plan nodes.
276        pub collect_spans: bool, default = false
277
278        /// Specifies the recursion depth limit when parsing complex SQL Queries
279        pub recursion_limit: usize, default = 50
280    }
281}
282
283#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
284pub enum SpillCompression {
285    Zstd,
286    Lz4Frame,
287    #[default]
288    Uncompressed,
289}
290
291impl FromStr for SpillCompression {
292    type Err = DataFusionError;
293
294    fn from_str(s: &str) -> Result<Self, Self::Err> {
295        match s.to_ascii_lowercase().as_str() {
296            "zstd" => Ok(Self::Zstd),
297            "lz4_frame" => Ok(Self::Lz4Frame),
298            "uncompressed" | "" => Ok(Self::Uncompressed),
299            other => Err(DataFusionError::Configuration(format!(
300                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
301            ))),
302        }
303    }
304}
305
306impl ConfigField for SpillCompression {
307    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
308        v.some(key, self, description)
309    }
310
311    fn set(&mut self, _: &str, value: &str) -> Result<()> {
312        *self = SpillCompression::from_str(value)?;
313        Ok(())
314    }
315}
316
317impl Display for SpillCompression {
318    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
319        let str = match self {
320            Self::Zstd => "zstd",
321            Self::Lz4Frame => "lz4_frame",
322            Self::Uncompressed => "uncompressed",
323        };
324        write!(f, "{str}")
325    }
326}
327
328impl From<SpillCompression> for Option<CompressionType> {
329    fn from(c: SpillCompression) -> Self {
330        match c {
331            SpillCompression::Zstd => Some(CompressionType::ZSTD),
332            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
333            SpillCompression::Uncompressed => None,
334        }
335    }
336}
337
338config_namespace! {
339    /// Options related to query execution
340    ///
341    /// See also: [`SessionConfig`]
342    ///
343    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
344    pub struct ExecutionOptions {
345        /// Default batch size while creating new batches, it's especially useful for
346        /// buffer-in-memory batches since creating tiny batches would result in too much
347        /// metadata memory consumption
348        pub batch_size: usize, default = 8192
349
350        /// When set to true, record batches will be examined between each operator and
351        /// small batches will be coalesced into larger batches. This is helpful when there
352        /// are highly selective filters or joins that could produce tiny output batches. The
353        /// target batch size is determined by the configuration setting
354        pub coalesce_batches: bool, default = true
355
356        /// Should DataFusion collect statistics when first creating a table.
357        /// Has no effect after the table is created. Applies to the default
358        /// `ListingTableProvider` in DataFusion. Defaults to true.
359        pub collect_statistics: bool, default = true
360
361        /// Number of partitions for query execution. Increasing partitions can increase
362        /// concurrency.
363        ///
364        /// Defaults to the number of CPU cores on the system
365        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()
366
367        /// The default time zone
368        ///
369        /// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
370        /// according to this time zone, and then extract the hour
371        pub time_zone: String, default = "+00:00".into()
372
373        /// Parquet options
374        pub parquet: ParquetOptions, default = Default::default()
375
376        /// Fan-out during initial physical planning.
377        ///
378        /// This is mostly use to plan `UNION` children in parallel.
379        ///
380        /// Defaults to the number of CPU cores on the system
381        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()
382
383        /// When set to true, skips verifying that the schema produced by
384        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
385        /// schema of the input plan.
386        ///
387        /// When set to false, if the schema does not match exactly
388        /// (including nullability and metadata), a planning error will be raised.
389        ///
390        /// This is used to workaround bugs in the planner that are now caught by
391        /// the new schema verification step.
392        pub skip_physical_aggregate_schema_check: bool, default = false
393
394        /// Sets the compression codec used when spilling data to disk.
395        ///
396        /// Since datafusion writes spill files using the Arrow IPC Stream format,
397        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
398        /// Valid values are: uncompressed, lz4_frame, zstd.
399        /// Note: lz4_frame offers faster (de)compression, but typically results in
400        /// larger spill files. In contrast, zstd achieves
401        /// higher compression ratios at the cost of slower (de)compression speed.
402        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed
403
404        /// Specifies the reserved memory for each spillable sort operation to
405        /// facilitate an in-memory merge.
406        ///
407        /// When a sort operation spills to disk, the in-memory data must be
408        /// sorted and merged before being written to a file. This setting reserves
409        /// a specific amount of memory for that in-memory sort/merge process.
410        ///
411        /// Note: This setting is irrelevant if the sort operation cannot spill
412        /// (i.e., if there's no `DiskManager` configured).
413        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
414
415        /// When sorting, below what size should data be concatenated
416        /// and sorted in a single RecordBatch rather than sorted in
417        /// batches and merged.
418        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
419
420        /// Number of files to read in parallel when inferring schema and statistics
421        pub meta_fetch_concurrency: usize, default = 32
422
423        /// Guarantees a minimum level of output files running in parallel.
424        /// RecordBatches will be distributed in round robin fashion to each
425        /// parallel writer. Each writer is closed and a new file opened once
426        /// soft_max_rows_per_output_file is reached.
427        pub minimum_parallel_output_files: usize, default = 4
428
429        /// Target number of rows in output files when writing multiple.
430        /// This is a soft max, so it can be exceeded slightly. There also
431        /// will be one file smaller than the limit if the total
432        /// number of rows written is not roughly divisible by the soft max
433        pub soft_max_rows_per_output_file: usize, default = 50000000
434
435        /// This is the maximum number of RecordBatches buffered
436        /// for each output file being worked. Higher values can potentially
437        /// give faster write performance at the cost of higher peak
438        /// memory consumption
439        pub max_buffered_batches_per_output_file: usize, default = 2
440
441        /// Should sub directories be ignored when scanning directories for data
442        /// files. Defaults to true (ignores subdirectories), consistent with
443        /// Hive. Note that this setting does not affect reading partitioned
444        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
445        pub listing_table_ignore_subdirectory: bool, default = true
446
447        /// Should DataFusion support recursive CTEs
448        pub enable_recursive_ctes: bool, default = true
449
450        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
451        /// statistics into the same file groups.
452        /// Currently experimental
453        pub split_file_groups_by_statistics: bool, default = false
454
455        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
456        pub keep_partition_by_columns: bool, default = false
457
458        /// Aggregation ratio (number of distinct groups / number of input rows)
459        /// threshold for skipping partial aggregation. If the value is greater
460        /// then partial aggregation will skip aggregation for further input
461        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
462
463        /// Number of input rows partial aggregation partition should process, before
464        /// aggregation ratio check and trying to switch to skipping aggregation mode
465        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
466
467        /// Should DataFusion use row number estimates at the input to decide
468        /// whether increasing parallelism is beneficial or not. By default,
469        /// only exact row numbers (not estimates) are used for this decision.
470        /// Setting this flag to `true` will likely produce better plans.
471        /// if the source of statistics is accurate.
472        /// We plan to make this the default in the future.
473        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
474
475        /// Should DataFusion enforce batch size in joins or not. By default,
476        /// DataFusion will not enforce batch size in joins. Enforcing batch size
477        /// in joins can reduce memory usage when joining large
478        /// tables with a highly-selective join filter, but is also slightly slower.
479        pub enforce_batch_size_in_joins: bool, default = false
480
481        /// Size (bytes) of data buffer DataFusion uses when writing output files.
482        /// This affects the size of the data chunks that are uploaded to remote
483        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
484        /// written, it may be necessary to increase this size to avoid errors from
485        /// the remote end point.
486        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
487    }
488}
489
490config_namespace! {
491    /// Options for reading and writing parquet files
492    ///
493    /// See also: [`SessionConfig`]
494    ///
495    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
496    pub struct ParquetOptions {
497        // The following options affect reading parquet files
498
499        /// (reading) If true, reads the Parquet data page level metadata (the
500        /// Page Index), if present, to reduce the I/O and number of
501        /// rows decoded.
502        pub enable_page_index: bool, default = true
503
504        /// (reading) If true, the parquet reader attempts to skip entire row groups based
505        /// on the predicate in the query and the metadata (min/max values) stored in
506        /// the parquet file
507        pub pruning: bool, default = true
508
509        /// (reading) If true, the parquet reader skip the optional embedded metadata that may be in
510        /// the file Schema. This setting can help avoid schema conflicts when querying
511        /// multiple parquet files with schemas containing compatible types but different metadata
512        pub skip_metadata: bool, default = true
513
514        /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
515        /// bytes of the parquet file optimistically. If not specified, two reads are required:
516        /// One read to fetch the 8-byte parquet footer and
517        /// another to fetch the metadata length encoded in the footer
518        pub metadata_size_hint: Option<usize>, default = None
519
520        /// (reading) If true, filter expressions are be applied during the parquet decoding operation to
521        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
522        pub pushdown_filters: bool, default = false
523
524        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
525        /// will be reordered heuristically to minimize the cost of evaluation. If false,
526        /// the filters are applied in the same order as written in the query
527        pub reorder_filters: bool, default = false
528
529        /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
530        /// and `Binary/BinaryLarge` with `BinaryView`.
531        pub schema_force_view_types: bool, default = true
532
533        /// (reading) If true, parquet reader will read columns of
534        /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
535        ///
536        /// Parquet files generated by some legacy writers do not correctly set
537        /// the UTF8 flag for strings, causing string columns to be loaded as
538        /// BLOB instead.
539        pub binary_as_string: bool, default = false
540
541        /// (reading) If true, parquet reader will read columns of
542        /// physical type int96 as originating from a different resolution
543        /// than nanosecond. This is useful for reading data from systems like Spark
544        /// which stores microsecond resolution timestamps in an int96 allowing it
545        /// to write values with a larger date range than 64-bit timestamps with
546        /// nanosecond resolution.
547        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
548
549        /// (reading) Use any available bloom filters when reading parquet files
550        pub bloom_filter_on_read: bool, default = true
551
552        // The following options affect writing to parquet files
553        // and map to parquet::file::properties::WriterProperties
554
555        /// (writing) Sets best effort maximum size of data page in bytes
556        pub data_pagesize_limit: usize, default = 1024 * 1024
557
558        /// (writing) Sets write_batch_size in bytes
559        pub write_batch_size: usize, default = 1024
560
561        /// (writing) Sets parquet writer version
562        /// valid values are "1.0" and "2.0"
563        pub writer_version: String, default = "1.0".to_string()
564
565        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
566        ///
567        /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
568        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
569        pub skip_arrow_metadata: bool, default = false
570
571        /// (writing) Sets default parquet compression codec.
572        /// Valid values are: uncompressed, snappy, gzip(level),
573        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
574        /// These values are not case sensitive. If NULL, uses
575        /// default parquet writer setting
576        ///
577        /// Note that this default setting is not the same as
578        /// the default parquet writer setting.
579        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())
580
581        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
582        /// default parquet writer setting
583        pub dictionary_enabled: Option<bool>, default = Some(true)
584
585        /// (writing) Sets best effort maximum dictionary page size, in bytes
586        pub dictionary_page_size_limit: usize, default = 1024 * 1024
587
588        /// (writing) Sets if statistics are enabled for any column
589        /// Valid values are: "none", "chunk", and "page"
590        /// These values are not case sensitive. If NULL, uses
591        /// default parquet writer setting
592        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())
593
594        /// (writing) Sets max statistics size for any column. If NULL, uses
595        /// default parquet writer setting
596        /// max_statistics_size is deprecated, currently it is not being used
597        // TODO: remove once deprecated
598        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
599        pub max_statistics_size: Option<usize>, default = Some(4096)
600
601        /// (writing) Target maximum number of rows in each row group (defaults to 1M
602        /// rows). Writing larger row groups requires more memory to write, but
603        /// can get better compression and be faster to read.
604        pub max_row_group_size: usize, default =  1024 * 1024
605
606        /// (writing) Sets "created by" property
607        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()
608
609        /// (writing) Sets column index truncate length
610        pub column_index_truncate_length: Option<usize>, default = Some(64)
611
612        /// (writing) Sets statictics truncate length. If NULL, uses
613        /// default parquet writer setting
614        pub statistics_truncate_length: Option<usize>, default = None
615
616        /// (writing) Sets best effort maximum number of rows in data page
617        pub data_page_row_count_limit: usize, default = 20_000
618
619        /// (writing)  Sets default encoding for any column.
620        /// Valid values are: plain, plain_dictionary, rle,
621        /// bit_packed, delta_binary_packed, delta_length_byte_array,
622        /// delta_byte_array, rle_dictionary, and byte_stream_split.
623        /// These values are not case sensitive. If NULL, uses
624        /// default parquet writer setting
625        pub encoding: Option<String>, transform = str::to_lowercase, default = None
626
627        /// (writing) Write bloom filters for all columns when creating parquet files
628        pub bloom_filter_on_write: bool, default = false
629
630        /// (writing) Sets bloom filter false positive probability. If NULL, uses
631        /// default parquet writer setting
632        pub bloom_filter_fpp: Option<f64>, default = None
633
634        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
635        /// default parquet writer setting
636        pub bloom_filter_ndv: Option<u64>, default = None
637
638        /// (writing) Controls whether DataFusion will attempt to speed up writing
639        /// parquet files by serializing them in parallel. Each column
640        /// in each row group in each output file are serialized in parallel
641        /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
642        pub allow_single_file_parallelism: bool, default = true
643
644        /// (writing) By default parallel parquet writer is tuned for minimum
645        /// memory usage in a streaming execution plan. You may see
646        /// a performance benefit when writing large parquet files
647        /// by increasing maximum_parallel_row_group_writers and
648        /// maximum_buffered_record_batches_per_stream if your system
649        /// has idle cores and can tolerate additional memory usage.
650        /// Boosting these values is likely worthwhile when
651        /// writing out already in-memory data, such as from a cached
652        /// data frame.
653        pub maximum_parallel_row_group_writers: usize, default = 1
654
655        /// (writing) By default parallel parquet writer is tuned for minimum
656        /// memory usage in a streaming execution plan. You may see
657        /// a performance benefit when writing large parquet files
658        /// by increasing maximum_parallel_row_group_writers and
659        /// maximum_buffered_record_batches_per_stream if your system
660        /// has idle cores and can tolerate additional memory usage.
661        /// Boosting these values is likely worthwhile when
662        /// writing out already in-memory data, such as from a cached
663        /// data frame.
664        pub maximum_buffered_record_batches_per_stream: usize, default = 2
665    }
666}
667
668config_namespace! {
669    /// Options for configuring Parquet Modular Encryption
670    pub struct ParquetEncryptionOptions {
671        /// Optional file decryption properties
672        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None
673
674        /// Optional file encryption properties
675        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None
676    }
677}
678
679config_namespace! {
680    /// Options related to query optimization
681    ///
682    /// See also: [`SessionConfig`]
683    ///
684    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
685    pub struct OptimizerOptions {
686        /// When set to true, the optimizer will push a limit operation into
687        /// grouped aggregations which have no aggregate expressions, as a soft limit,
688        /// emitting groups once the limit is reached, before all rows in the group are read.
689        pub enable_distinct_aggregation_soft_limit: bool, default = true
690
691        /// When set to true, the physical plan optimizer will try to add round robin
692        /// repartitioning to increase parallelism to leverage more CPU cores
693        pub enable_round_robin_repartition: bool, default = true
694
695        /// When set to true, the optimizer will attempt to perform limit operations
696        /// during aggregations, if possible
697        pub enable_topk_aggregation: bool, default = true
698
699        /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.
700        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
701        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
702        /// This means that if we already have 10 timestamps in the year 2025
703        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
704        pub enable_dynamic_filter_pushdown: bool, default = true
705
706        /// When set to true, the optimizer will insert filters before a join between
707        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
708        /// filter can add additional overhead when the file format does not fully support
709        /// predicate push down.
710        pub filter_null_join_keys: bool, default = false
711
712        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
713        /// in parallel using the provided `target_partitions` level
714        pub repartition_aggregations: bool, default = true
715
716        /// Minimum total files size in bytes to perform file scan repartitioning.
717        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
718
719        /// Should DataFusion repartition data using the join keys to execute joins in parallel
720        /// using the provided `target_partitions` level
721        pub repartition_joins: bool, default = true
722
723        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
724        /// its inputs do not have any ordering or filtering If the flag is not enabled,
725        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
726        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
727        /// RightAnti, and RightSemi - being produced only at the end of the execution.
728        /// This is not typical in stream processing. Additionally, without proper design for
729        /// long runner execution, all types of joins may encounter out-of-memory errors.
730        pub allow_symmetric_joins_without_pruning: bool, default = true
731
732        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
733        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
734        ///
735        /// For FileSources, only Parquet and CSV formats are currently supported.
736        ///
737        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
738        /// might be partitioned into smaller chunks) for parallel scanning.
739        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
740        /// happen within a single file.
741        ///
742        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
743        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
744        /// the total number of partitions and batches per partition, but does not slice the initial
745        /// record tables provided to the MemTable on creation.
746        pub repartition_file_scans: bool, default = true
747
748        /// Should DataFusion repartition data using the partitions keys to execute window
749        /// functions in parallel using the provided `target_partitions` level
750        pub repartition_windows: bool, default = true
751
752        /// Should DataFusion execute sorts in a per-partition fashion and merge
753        /// afterwards instead of coalescing first and sorting globally.
754        /// With this flag is enabled, plans in the form below
755        ///
756        /// ```text
757        ///      "SortExec: [a@0 ASC]",
758        ///      "  CoalescePartitionsExec",
759        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
760        /// ```
761        /// would turn into the plan below which performs better in multithreaded environments
762        ///
763        /// ```text
764        ///      "SortPreservingMergeExec: [a@0 ASC]",
765        ///      "  SortExec: [a@0 ASC]",
766        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
767        /// ```
768        pub repartition_sorts: bool, default = true
769
770        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
771        /// (i.e. setting `preserve_order` to true on `RepartitionExec`  and
772        /// using `SortPreservingMergeExec`)
773        ///
774        /// When false, DataFusion will maximize plan parallelism using
775        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
776        pub prefer_existing_sort: bool, default = false
777
778        /// When set to true, the logical plan optimizer will produce warning
779        /// messages if any optimization rules produce errors and then proceed to the next
780        /// rule. When set to false, any rules that produce errors will cause the query to fail
781        pub skip_failed_rules: bool, default = false
782
783        /// Number of times that the optimizer will attempt to optimize the plan
784        pub max_passes: usize, default = 3
785
786        /// When set to true, the physical plan optimizer will run a top down
787        /// process to reorder the join keys
788        pub top_down_join_key_reordering: bool, default = true
789
790        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
791        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
792        pub prefer_hash_join: bool, default = true
793
794        /// The maximum estimated size in bytes for one input side of a HashJoin
795        /// will be collected into a single partition
796        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
797
798        /// The maximum estimated size in rows for one input side of a HashJoin
799        /// will be collected into a single partition
800        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
801
802        /// The default filter selectivity used by Filter Statistics
803        /// when an exact selectivity cannot be determined. Valid values are
804        /// between 0 (no selectivity) and 100 (all rows are selected).
805        pub default_filter_selectivity: u8, default = 20
806
807        /// When set to true, the optimizer will not attempt to convert Union to Interleave
808        pub prefer_existing_union: bool, default = false
809
810        /// When set to true, if the returned type is a view type
811        /// then the output will be coerced to a non-view.
812        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
813        pub expand_views_at_output: bool, default = false
814    }
815}
816
817config_namespace! {
818    /// Options controlling explain output
819    ///
820    /// See also: [`SessionConfig`]
821    ///
822    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
823    pub struct ExplainOptions {
824        /// When set to true, the explain statement will only print logical plans
825        pub logical_plan_only: bool, default = false
826
827        /// When set to true, the explain statement will only print physical plans
828        pub physical_plan_only: bool, default = false
829
830        /// When set to true, the explain statement will print operator statistics
831        /// for physical plans
832        pub show_statistics: bool, default = false
833
834        /// When set to true, the explain statement will print the partition sizes
835        pub show_sizes: bool, default = true
836
837        /// When set to true, the explain statement will print schema information
838        pub show_schema: bool, default = false
839
840        /// Display format of explain. Default is "indent".
841        /// When set to "tree", it will print the plan in a tree-rendered format.
842        pub format: String, default = "indent".to_string()
843    }
844}
845
846impl ExecutionOptions {
847    /// Returns the correct parallelism based on the provided `value`.
848    /// If `value` is `"0"`, returns the default available parallelism, computed with
849    /// `get_available_parallelism`. Otherwise, returns `value`.
850    fn normalized_parallelism(value: &str) -> String {
851        if value.parse::<usize>() == Ok(0) {
852            get_available_parallelism().to_string()
853        } else {
854            value.to_owned()
855        }
856    }
857}
858
859config_namespace! {
860    /// Options controlling the format of output when printing record batches
861    /// Copies [`arrow::util::display::FormatOptions`]
862    pub struct FormatOptions {
863        /// If set to `true` any formatting errors will be written to the output
864        /// instead of being converted into a [`std::fmt::Error`]
865        pub safe: bool, default = true
866        /// Format string for nulls
867        pub null: String, default = "".into()
868        /// Date format for date arrays
869        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
870        /// Format for DateTime arrays
871        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
872        /// Timestamp format for timestamp arrays
873        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
874        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
875        pub timestamp_tz_format: Option<String>, default = None
876        /// Time format for time arrays
877        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
878        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
879        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
880        /// Show types in visual representation batches
881        pub types_info: bool, default = false
882    }
883}
884
885impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
886    type Error = DataFusionError;
887    fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
888        let duration_format = match self.duration_format.as_str() {
889            "pretty" => arrow::util::display::DurationFormat::Pretty,
890            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
891            _ => {
892                return _config_err!(
893                    "Invalid duration format: {}. Valid values are pretty or iso8601",
894                    self.duration_format
895                )
896            }
897        };
898
899        Ok(arrow::util::display::FormatOptions::new()
900            .with_display_error(self.safe)
901            .with_null(&self.null)
902            .with_date_format(self.date_format.as_deref())
903            .with_datetime_format(self.datetime_format.as_deref())
904            .with_timestamp_format(self.timestamp_format.as_deref())
905            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
906            .with_time_format(self.time_format.as_deref())
907            .with_duration_format(duration_format)
908            .with_types_info(self.types_info))
909    }
910}
911
912/// A key value pair, with a corresponding description
913#[derive(Debug)]
914pub struct ConfigEntry {
915    /// A unique string to identify this config value
916    pub key: String,
917
918    /// The value if any
919    pub value: Option<String>,
920
921    /// A description of this configuration entry
922    pub description: &'static str,
923}
924
925/// Configuration options struct, able to store both built-in configuration and custom options
926#[derive(Debug, Clone, Default)]
927#[non_exhaustive]
928pub struct ConfigOptions {
929    /// Catalog options
930    pub catalog: CatalogOptions,
931    /// Execution options
932    pub execution: ExecutionOptions,
933    /// Optimizer options
934    pub optimizer: OptimizerOptions,
935    /// SQL parser options
936    pub sql_parser: SqlParserOptions,
937    /// Explain options
938    pub explain: ExplainOptions,
939    /// Optional extensions registered using [`Extensions::insert`]
940    pub extensions: Extensions,
941    /// Formatting options when printing batches
942    pub format: FormatOptions,
943}
944
945impl ConfigField for ConfigOptions {
946    fn set(&mut self, key: &str, value: &str) -> Result<()> {
947        // Extensions are handled in the public `ConfigOptions::set`
948        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
949        match key {
950            "catalog" => self.catalog.set(rem, value),
951            "execution" => self.execution.set(rem, value),
952            "optimizer" => self.optimizer.set(rem, value),
953            "explain" => self.explain.set(rem, value),
954            "sql_parser" => self.sql_parser.set(rem, value),
955            "format" => self.format.set(rem, value),
956            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
957        }
958    }
959
960    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
961        self.catalog.visit(v, "datafusion.catalog", "");
962        self.execution.visit(v, "datafusion.execution", "");
963        self.optimizer.visit(v, "datafusion.optimizer", "");
964        self.explain.visit(v, "datafusion.explain", "");
965        self.sql_parser.visit(v, "datafusion.sql_parser", "");
966        self.format.visit(v, "datafusion.format", "");
967    }
968}
969
970impl ConfigOptions {
971    /// Creates a new [`ConfigOptions`] with default values
972    pub fn new() -> Self {
973        Self::default()
974    }
975
976    /// Set extensions to provided value
977    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
978        self.extensions = extensions;
979        self
980    }
981
982    /// Set a configuration option
983    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
984        let Some((prefix, key)) = key.split_once('.') else {
985            return _config_err!("could not find config namespace for key \"{key}\"");
986        };
987
988        if prefix == "datafusion" {
989            return ConfigField::set(self, key, value);
990        }
991
992        let Some(e) = self.extensions.0.get_mut(prefix) else {
993            return _config_err!("Could not find config namespace \"{prefix}\"");
994        };
995        e.0.set(key, value)
996    }
997
998    /// Create new ConfigOptions struct, taking values from
999    /// environment variables where possible.
1000    ///
1001    /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
1002    /// control `datafusion.execution.batch_size`.
1003    pub fn from_env() -> Result<Self> {
1004        struct Visitor(Vec<String>);
1005
1006        impl Visit for Visitor {
1007            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1008                self.0.push(key.to_string())
1009            }
1010
1011            fn none(&mut self, key: &str, _: &'static str) {
1012                self.0.push(key.to_string())
1013            }
1014        }
1015
1016        // Extract the names of all fields and then look up the corresponding
1017        // environment variables. This isn't hugely efficient but avoids
1018        // ambiguity between `a.b` and `a_b` which would both correspond
1019        // to an environment variable of `A_B`
1020
1021        let mut keys = Visitor(vec![]);
1022        let mut ret = Self::default();
1023        ret.visit(&mut keys, "datafusion", "");
1024
1025        for key in keys.0 {
1026            let env = key.to_uppercase().replace('.', "_");
1027            if let Some(var) = std::env::var_os(env) {
1028                let value = var.to_string_lossy();
1029                log::info!("Set {key} to {value} from the environment variable");
1030                ret.set(&key, value.as_ref())?;
1031            }
1032        }
1033
1034        Ok(ret)
1035    }
1036
1037    /// Create new ConfigOptions struct, taking values from a string hash map.
1038    ///
1039    /// Only the built-in configurations will be extracted from the hash map
1040    /// and other key value pairs will be ignored.
1041    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1042        struct Visitor(Vec<String>);
1043
1044        impl Visit for Visitor {
1045            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1046                self.0.push(key.to_string())
1047            }
1048
1049            fn none(&mut self, key: &str, _: &'static str) {
1050                self.0.push(key.to_string())
1051            }
1052        }
1053
1054        let mut keys = Visitor(vec![]);
1055        let mut ret = Self::default();
1056        ret.visit(&mut keys, "datafusion", "");
1057
1058        for key in keys.0 {
1059            if let Some(var) = settings.get(&key) {
1060                ret.set(&key, var)?;
1061            }
1062        }
1063
1064        Ok(ret)
1065    }
1066
1067    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
1068    pub fn entries(&self) -> Vec<ConfigEntry> {
1069        struct Visitor(Vec<ConfigEntry>);
1070
1071        impl Visit for Visitor {
1072            fn some<V: Display>(
1073                &mut self,
1074                key: &str,
1075                value: V,
1076                description: &'static str,
1077            ) {
1078                self.0.push(ConfigEntry {
1079                    key: key.to_string(),
1080                    value: Some(value.to_string()),
1081                    description,
1082                })
1083            }
1084
1085            fn none(&mut self, key: &str, description: &'static str) {
1086                self.0.push(ConfigEntry {
1087                    key: key.to_string(),
1088                    value: None,
1089                    description,
1090                })
1091            }
1092        }
1093
1094        let mut v = Visitor(vec![]);
1095        self.visit(&mut v, "datafusion", "");
1096
1097        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1098        v.0
1099    }
1100
1101    /// Generate documentation that can be included in the user guide
1102    pub fn generate_config_markdown() -> String {
1103        use std::fmt::Write as _;
1104
1105        let mut s = Self::default();
1106
1107        // Normalize for display
1108        s.execution.target_partitions = 0;
1109        s.execution.planning_concurrency = 0;
1110
1111        let mut docs = "| key | default | description |\n".to_string();
1112        docs += "|-----|---------|-------------|\n";
1113        let mut entries = s.entries();
1114        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1115
1116        for entry in s.entries() {
1117            let _ = writeln!(
1118                &mut docs,
1119                "| {} | {} | {} |",
1120                entry.key,
1121                entry.value.as_deref().unwrap_or("NULL"),
1122                entry.description
1123            );
1124        }
1125        docs
1126    }
1127}
1128
1129/// [`ConfigExtension`] provides a mechanism to store third-party configuration
1130/// within DataFusion [`ConfigOptions`]
1131///
1132/// This mechanism can be used to pass configuration to user defined functions
1133/// or optimizer passes
1134///
1135/// # Example
1136/// ```
1137/// use datafusion_common::{
1138///     config::ConfigExtension, extensions_options,
1139///     config::ConfigOptions,
1140/// };
1141///  // Define a new configuration struct using the `extensions_options` macro
1142///  extensions_options! {
1143///     /// My own config options.
1144///     pub struct MyConfig {
1145///         /// Should "foo" be replaced by "bar"?
1146///         pub foo_to_bar: bool, default = true
1147///
1148///         /// How many "baz" should be created?
1149///         pub baz_count: usize, default = 1337
1150///     }
1151///  }
1152///
1153///  impl ConfigExtension for MyConfig {
1154///     const PREFIX: &'static str = "my_config";
1155///  }
1156///
1157///  // set up config struct and register extension
1158///  let mut config = ConfigOptions::default();
1159///  config.extensions.insert(MyConfig::default());
1160///
1161///  // overwrite config default
1162///  config.set("my_config.baz_count", "42").unwrap();
1163///
1164///  // check config state
1165///  let my_config = config.extensions.get::<MyConfig>().unwrap();
1166///  assert!(my_config.foo_to_bar,);
1167///  assert_eq!(my_config.baz_count, 42,);
1168/// ```
1169///
1170/// # Note:
1171/// Unfortunately associated constants are not currently object-safe, and so this
1172/// extends the object-safe [`ExtensionOptions`]
1173pub trait ConfigExtension: ExtensionOptions {
1174    /// Configuration namespace prefix to use
1175    ///
1176    /// All values under this will be prefixed with `$PREFIX + "."`
1177    const PREFIX: &'static str;
1178}
1179
1180/// An object-safe API for storing arbitrary configuration.
1181///
1182/// See [`ConfigExtension`] for user defined configuration
1183pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
1184    /// Return `self` as [`Any`]
1185    ///
1186    /// This is needed until trait upcasting is stabilized
1187    fn as_any(&self) -> &dyn Any;
1188
1189    /// Return `self` as [`Any`]
1190    ///
1191    /// This is needed until trait upcasting is stabilized
1192    fn as_any_mut(&mut self) -> &mut dyn Any;
1193
1194    /// Return a deep clone of this [`ExtensionOptions`]
1195    ///
1196    /// It is important this does not share mutable state to avoid consistency issues
1197    /// with configuration changing whilst queries are executing
1198    fn cloned(&self) -> Box<dyn ExtensionOptions>;
1199
1200    /// Set the given `key`, `value` pair
1201    fn set(&mut self, key: &str, value: &str) -> Result<()>;
1202
1203    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
1204    fn entries(&self) -> Vec<ConfigEntry>;
1205}
1206
1207/// A type-safe container for [`ConfigExtension`]
1208#[derive(Debug, Default, Clone)]
1209pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1210
1211impl Extensions {
1212    /// Create a new, empty [`Extensions`]
1213    pub fn new() -> Self {
1214        Self(BTreeMap::new())
1215    }
1216
1217    /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
1218    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1219        assert_ne!(T::PREFIX, "datafusion");
1220        let e = ExtensionBox(Box::new(extension));
1221        self.0.insert(T::PREFIX, e);
1222    }
1223
1224    /// Retrieves the extension of the given type if any
1225    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1226        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1227    }
1228
1229    /// Retrieves the extension of the given type if any
1230    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1231        let e = self.0.get_mut(T::PREFIX)?;
1232        e.0.as_any_mut().downcast_mut()
1233    }
1234}
1235
1236#[derive(Debug)]
1237struct ExtensionBox(Box<dyn ExtensionOptions>);
1238
1239impl Clone for ExtensionBox {
1240    fn clone(&self) -> Self {
1241        Self(self.0.cloned())
1242    }
1243}
1244
1245/// A trait implemented by `config_namespace` and for field types that provides
1246/// the ability to walk and mutate the configuration tree
1247pub trait ConfigField {
1248    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
1249
1250    fn set(&mut self, key: &str, value: &str) -> Result<()>;
1251}
1252
1253impl<F: ConfigField + Default> ConfigField for Option<F> {
1254    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1255        match self {
1256            Some(s) => s.visit(v, key, description),
1257            None => v.none(key, description),
1258        }
1259    }
1260
1261    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1262        self.get_or_insert_with(Default::default).set(key, value)
1263    }
1264}
1265
1266/// Default transformation to parse a [`ConfigField`] for a string.
1267///
1268/// This uses [`FromStr`] to parse the data.
1269pub fn default_config_transform<T>(input: &str) -> Result<T>
1270where
1271    T: FromStr,
1272    <T as FromStr>::Err: Sync + Send + Error + 'static,
1273{
1274    input.parse().map_err(|e| {
1275        DataFusionError::Context(
1276            format!(
1277                "Error parsing '{}' as {}",
1278                input,
1279                std::any::type_name::<T>()
1280            ),
1281            Box::new(DataFusionError::External(Box::new(e))),
1282        )
1283    })
1284}
1285
1286/// Macro that generates [`ConfigField`] for a given type.
1287///
1288/// # Usage
1289/// This always requires [`Display`] to be implemented for the given type.
1290///
1291/// There are two ways to invoke this macro. The first one uses
1292/// [`default_config_transform`]/[`FromStr`] to parse the data:
1293///
1294/// ```ignore
1295/// config_field(MyType);
1296/// ```
1297///
1298/// Note that the parsing error MUST implement [`std::error::Error`]!
1299///
1300/// Or you can specify how you want to parse an [`str`] into the type:
1301///
1302/// ```ignore
1303/// fn parse_it(s: &str) -> Result<MyType> {
1304///     ...
1305/// }
1306///
1307/// config_field(
1308///     MyType,
1309///     value => parse_it(value)
1310/// );
1311/// ```
1312#[macro_export]
1313macro_rules! config_field {
1314    ($t:ty) => {
1315        config_field!($t, value => $crate::config::default_config_transform(value)?);
1316    };
1317
1318    ($t:ty, $arg:ident => $transform:expr) => {
1319        impl $crate::config::ConfigField for $t {
1320            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1321                v.some(key, self, description)
1322            }
1323
1324            fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
1325                *self = $transform;
1326                Ok(())
1327            }
1328        }
1329    };
1330}
1331
1332config_field!(String);
1333config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
1334config_field!(usize);
1335config_field!(f64);
1336config_field!(u64);
1337
1338impl ConfigField for u8 {
1339    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1340        v.some(key, self, description)
1341    }
1342
1343    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1344        if value.is_empty() {
1345            return Err(DataFusionError::Configuration(format!(
1346                "Input string for {key} key is empty"
1347            )));
1348        }
1349        // Check if the string is a valid number
1350        if let Ok(num) = value.parse::<u8>() {
1351            // TODO: Let's decide how we treat the numerical strings.
1352            *self = num;
1353        } else {
1354            let bytes = value.as_bytes();
1355            // Check if the first character is ASCII (single byte)
1356            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1357                return Err(DataFusionError::Configuration(format!(
1358                    "Error parsing {value} as u8. Non-ASCII string provided"
1359                )));
1360            }
1361            *self = bytes[0];
1362        }
1363        Ok(())
1364    }
1365}
1366
1367impl ConfigField for CompressionTypeVariant {
1368    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1369        v.some(key, self, description)
1370    }
1371
1372    fn set(&mut self, _: &str, value: &str) -> Result<()> {
1373        *self = CompressionTypeVariant::from_str(value)?;
1374        Ok(())
1375    }
1376}
1377
1378/// An implementation trait used to recursively walk configuration
1379pub trait Visit {
1380    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
1381
1382    fn none(&mut self, key: &str, description: &'static str);
1383}
1384
1385/// Convenience macro to create [`ExtensionsOptions`].
1386///
1387/// The created structure implements the following traits:
1388///
1389/// - [`Clone`]
1390/// - [`Debug`]
1391/// - [`Default`]
1392/// - [`ExtensionOptions`]
1393///
1394/// # Usage
1395/// The syntax is:
1396///
1397/// ```text
1398/// extensions_options! {
1399///      /// Struct docs (optional).
1400///     [<vis>] struct <StructName> {
1401///         /// Field docs (optional)
1402///         [<vis>] <field_name>: <field_type>, default = <default_value>
1403///
1404///         ... more fields
1405///     }
1406/// }
1407/// ```
1408///
1409/// The placeholders are:
1410/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
1411/// - `<StructName>`: Struct name like `MyStruct`.
1412/// - `<field_name>`: Field name like `my_field`.
1413/// - `<field_type>`: Field type like `u8`.
1414/// - `<default_value>`: Default value matching the field type like `42`.
1415///
1416/// # Example
1417/// See also a full example on the [`ConfigExtension`] documentation
1418///
1419/// ```
1420/// use datafusion_common::extensions_options;
1421///
1422/// extensions_options! {
1423///     /// My own config options.
1424///     pub struct MyConfig {
1425///         /// Should "foo" be replaced by "bar"?
1426///         pub foo_to_bar: bool, default = true
1427///
1428///         /// How many "baz" should be created?
1429///         pub baz_count: usize, default = 1337
1430///     }
1431/// }
1432/// ```
1433///
1434///
1435/// [`Debug`]: std::fmt::Debug
1436/// [`ExtensionsOptions`]: crate::config::ExtensionOptions
1437#[macro_export]
1438macro_rules! extensions_options {
1439    (
1440     $(#[doc = $struct_d:tt])*
1441     $vis:vis struct $struct_name:ident {
1442        $(
1443        $(#[doc = $d:tt])*
1444        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
1445        )*$(,)*
1446    }
1447    ) => {
1448        $(#[doc = $struct_d])*
1449        #[derive(Debug, Clone)]
1450        #[non_exhaustive]
1451        $vis struct $struct_name{
1452            $(
1453            $(#[doc = $d])*
1454            $field_vis $field_name : $field_type,
1455            )*
1456        }
1457
1458        impl Default for $struct_name {
1459            fn default() -> Self {
1460                Self {
1461                    $($field_name: $default),*
1462                }
1463            }
1464        }
1465
1466        impl $crate::config::ExtensionOptions for $struct_name {
1467            fn as_any(&self) -> &dyn ::std::any::Any {
1468                self
1469            }
1470
1471            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
1472                self
1473            }
1474
1475            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
1476                Box::new(self.clone())
1477            }
1478
1479            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1480                $crate::config::ConfigField::set(self, key, value)
1481            }
1482
1483            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
1484                struct Visitor(Vec<$crate::config::ConfigEntry>);
1485
1486                impl $crate::config::Visit for Visitor {
1487                    fn some<V: std::fmt::Display>(
1488                        &mut self,
1489                        key: &str,
1490                        value: V,
1491                        description: &'static str,
1492                    ) {
1493                        self.0.push($crate::config::ConfigEntry {
1494                            key: key.to_string(),
1495                            value: Some(value.to_string()),
1496                            description,
1497                        })
1498                    }
1499
1500                    fn none(&mut self, key: &str, description: &'static str) {
1501                        self.0.push($crate::config::ConfigEntry {
1502                            key: key.to_string(),
1503                            value: None,
1504                            description,
1505                        })
1506                    }
1507                }
1508
1509                let mut v = Visitor(vec![]);
1510                // The prefix is not used for extensions.
1511                // The description is generated in ConfigField::visit.
1512                // We can just pass empty strings here.
1513                $crate::config::ConfigField::visit(self, &mut v, "", "");
1514                v.0
1515            }
1516        }
1517
1518        impl $crate::config::ConfigField for $struct_name {
1519            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1520                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1521                match key {
1522                    $(
1523                        stringify!($field_name) => {
1524                            // Safely apply deprecated attribute if present
1525                            // $(#[allow(deprecated)])?
1526                            {
1527                                #[allow(deprecated)]
1528                                self.$field_name.set(rem, value.as_ref())
1529                            }
1530                        },
1531                    )*
1532                    _ => return $crate::error::_config_err!(
1533                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1534                    )
1535                }
1536            }
1537
1538            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1539                $(
1540                    let key = stringify!($field_name).to_string();
1541                    let desc = concat!($($d),*).trim();
1542                    #[allow(deprecated)]
1543                    self.$field_name.visit(v, key.as_str(), desc);
1544                )*
1545            }
1546        }
1547    }
1548}
1549
1550/// These file types have special built in behavior for configuration.
1551/// Use TableOptions::Extensions for configuring other file types.
1552#[derive(Debug, Clone)]
1553pub enum ConfigFileType {
1554    CSV,
1555    #[cfg(feature = "parquet")]
1556    PARQUET,
1557    JSON,
1558}
1559
1560/// Represents the configuration options available for handling different table formats within a data processing application.
1561/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
1562/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
1563#[derive(Debug, Clone, Default)]
1564pub struct TableOptions {
1565    /// Configuration options for CSV file handling. This includes settings like the delimiter,
1566    /// quote character, and whether the first row is considered as headers.
1567    pub csv: CsvOptions,
1568
1569    /// Configuration options for Parquet file handling. This includes settings for compression,
1570    /// encoding, and other Parquet-specific file characteristics.
1571    pub parquet: TableParquetOptions,
1572
1573    /// Configuration options for JSON file handling.
1574    pub json: JsonOptions,
1575
1576    /// The current file format that the table operations should assume. This option allows
1577    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1578    pub current_format: Option<ConfigFileType>,
1579
1580    /// Optional extensions that can be used to extend or customize the behavior of the table
1581    /// options. Extensions can be registered using `Extensions::insert` and might include
1582    /// custom file handling logic, additional configuration parameters, or other enhancements.
1583    pub extensions: Extensions,
1584}
1585
1586impl ConfigField for TableOptions {
1587    /// Visits configuration settings for the current file format, or all formats if none is selected.
1588    ///
1589    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1590    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1591    /// it visits all available format settings.
1592    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1593        if let Some(file_type) = &self.current_format {
1594            match file_type {
1595                #[cfg(feature = "parquet")]
1596                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1597                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1598                ConfigFileType::JSON => self.json.visit(v, "format", ""),
1599            }
1600        } else {
1601            self.csv.visit(v, "csv", "");
1602            self.parquet.visit(v, "parquet", "");
1603            self.json.visit(v, "json", "");
1604        }
1605    }
1606
1607    /// Sets a configuration value for a specific key within `TableOptions`.
1608    ///
1609    /// This method delegates setting configuration values to the specific file format configurations,
1610    /// based on the current format selected. If no format is selected, it returns an error.
1611    ///
1612    /// # Parameters
1613    ///
1614    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1615    ///   for CSV format.
1616    /// * `value`: The value to set for the specified configuration key.
1617    ///
1618    /// # Returns
1619    ///
1620    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1621    /// or if setting the configuration value fails for the specific format.
1622    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1623        // Extensions are handled in the public `ConfigOptions::set`
1624        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1625        match key {
1626            "format" => {
1627                let Some(format) = &self.current_format else {
1628                    return _config_err!("Specify a format for TableOptions");
1629                };
1630                match format {
1631                    #[cfg(feature = "parquet")]
1632                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
1633                    ConfigFileType::CSV => self.csv.set(rem, value),
1634                    ConfigFileType::JSON => self.json.set(rem, value),
1635                }
1636            }
1637            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1638        }
1639    }
1640}
1641
1642impl TableOptions {
1643    /// Constructs a new instance of `TableOptions` with default settings.
1644    ///
1645    /// # Returns
1646    ///
1647    /// A new `TableOptions` instance with default configuration values.
1648    pub fn new() -> Self {
1649        Self::default()
1650    }
1651
1652    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1653    ///
1654    /// # Parameters
1655    ///
1656    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1657    ///
1658    /// # Returns
1659    ///
1660    /// A new `TableOptions` instance with settings applied from the session config.
1661    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1662        let initial = TableOptions::default();
1663        initial.combine_with_session_config(config)
1664    }
1665
1666    /// Updates the current `TableOptions` with settings from a given session config.
1667    ///
1668    /// # Parameters
1669    ///
1670    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1671    ///
1672    /// # Returns
1673    ///
1674    /// A new `TableOptions` instance with updated settings from the session config.
1675    #[must_use = "this method returns a new instance"]
1676    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1677        let mut clone = self.clone();
1678        clone.parquet.global = config.execution.parquet.clone();
1679        clone
1680    }
1681
1682    /// Sets the file format for the table.
1683    ///
1684    /// # Parameters
1685    ///
1686    /// * `format`: The file format to use (e.g., CSV, Parquet).
1687    pub fn set_config_format(&mut self, format: ConfigFileType) {
1688        self.current_format = Some(format);
1689    }
1690
1691    /// Sets the extensions for this `TableOptions` instance.
1692    ///
1693    /// # Parameters
1694    ///
1695    /// * `extensions`: The `Extensions` instance to set.
1696    ///
1697    /// # Returns
1698    ///
1699    /// A new `TableOptions` instance with the specified extensions applied.
1700    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1701        self.extensions = extensions;
1702        self
1703    }
1704
1705    /// Sets a specific configuration option.
1706    ///
1707    /// # Parameters
1708    ///
1709    /// * `key`: The configuration key (e.g., "format.delimiter").
1710    /// * `value`: The value to set for the specified key.
1711    ///
1712    /// # Returns
1713    ///
1714    /// A result indicating success or failure in setting the configuration option.
1715    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1716        let Some((prefix, _)) = key.split_once('.') else {
1717            return _config_err!("could not find config namespace for key \"{key}\"");
1718        };
1719
1720        if prefix == "format" {
1721            return ConfigField::set(self, key, value);
1722        }
1723
1724        if prefix == "execution" {
1725            return Ok(());
1726        }
1727
1728        let Some(e) = self.extensions.0.get_mut(prefix) else {
1729            return _config_err!("Could not find config namespace \"{prefix}\"");
1730        };
1731        e.0.set(key, value)
1732    }
1733
1734    /// Initializes a new `TableOptions` from a hash map of string settings.
1735    ///
1736    /// # Parameters
1737    ///
1738    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1739    ///
1740    /// # Returns
1741    ///
1742    /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1743    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1744        let mut ret = Self::default();
1745        for (k, v) in settings {
1746            ret.set(k, v)?;
1747        }
1748
1749        Ok(ret)
1750    }
1751
1752    /// Modifies the current `TableOptions` instance with settings from a hash map.
1753    ///
1754    /// # Parameters
1755    ///
1756    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1757    ///
1758    /// # Returns
1759    ///
1760    /// A result indicating success or failure in applying the settings.
1761    pub fn alter_with_string_hash_map(
1762        &mut self,
1763        settings: &HashMap<String, String>,
1764    ) -> Result<()> {
1765        for (k, v) in settings {
1766            self.set(k, v)?;
1767        }
1768        Ok(())
1769    }
1770
1771    /// Retrieves all configuration entries from this `TableOptions`.
1772    ///
1773    /// # Returns
1774    ///
1775    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1776    pub fn entries(&self) -> Vec<ConfigEntry> {
1777        struct Visitor(Vec<ConfigEntry>);
1778
1779        impl Visit for Visitor {
1780            fn some<V: Display>(
1781                &mut self,
1782                key: &str,
1783                value: V,
1784                description: &'static str,
1785            ) {
1786                self.0.push(ConfigEntry {
1787                    key: key.to_string(),
1788                    value: Some(value.to_string()),
1789                    description,
1790                })
1791            }
1792
1793            fn none(&mut self, key: &str, description: &'static str) {
1794                self.0.push(ConfigEntry {
1795                    key: key.to_string(),
1796                    value: None,
1797                    description,
1798                })
1799            }
1800        }
1801
1802        let mut v = Visitor(vec![]);
1803        self.visit(&mut v, "format", "");
1804
1805        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1806        v.0
1807    }
1808}
1809
1810/// Options that control how Parquet files are read, including global options
1811/// that apply to all columns and optional column-specific overrides
1812///
1813/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
1814/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
1815/// (e.g. sorting_columns).
1816#[derive(Clone, Default, Debug, PartialEq)]
1817pub struct TableParquetOptions {
1818    /// Global Parquet options that propagates to all columns.
1819    pub global: ParquetOptions,
1820    /// Column specific options. Default usage is parquet.XX::column.
1821    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
1822    /// Additional file-level metadata to include. Inserted into the key_value_metadata
1823    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
1824    ///
1825    /// Multiple entries are permitted
1826    /// ```sql
1827    /// OPTIONS (
1828    ///    'format.metadata::key1' '',
1829    ///    'format.metadata::key2' 'value',
1830    ///    'format.metadata::key3' 'value has spaces',
1831    ///    'format.metadata::key4' 'value has special chars :: :',
1832    ///    'format.metadata::key_dupe' 'original will be overwritten',
1833    ///    'format.metadata::key_dupe' 'final'
1834    /// )
1835    /// ```
1836    pub key_value_metadata: HashMap<String, Option<String>>,
1837    /// Options for configuring Parquet modular encryption
1838    /// See ConfigFileEncryptionProperties and ConfigFileDecryptionProperties in datafusion/common/src/config.rs
1839    /// These can be set via 'format.crypto', for example:
1840    /// ```sql
1841    /// OPTIONS (
1842    ///    'format.crypto.file_encryption.encrypt_footer' 'true',
1843    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435',  -- b"0123456789012345" */
1844    ///    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
1845    ///    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
1846    ///     -- Same for decryption
1847    ///    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
1848    ///    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
1849    ///    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
1850    /// )
1851    /// ```
1852    /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
1853    /// Note that keys must be provided as in hex format since these are binary strings.
1854    pub crypto: ParquetEncryptionOptions,
1855}
1856
1857impl TableParquetOptions {
1858    /// Return new default TableParquetOptions
1859    pub fn new() -> Self {
1860        Self::default()
1861    }
1862
1863    /// Set whether the encoding of the arrow metadata should occur
1864    /// during the writing of parquet.
1865    ///
1866    /// Default is to encode the arrow schema in the file kv_metadata.
1867    pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1868        Self {
1869            global: ParquetOptions {
1870                skip_arrow_metadata: skip,
1871                ..self.global
1872            },
1873            ..self
1874        }
1875    }
1876}
1877
1878impl ConfigField for TableParquetOptions {
1879    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
1880        self.global.visit(v, key_prefix, description);
1881        self.column_specific_options
1882            .visit(v, key_prefix, description);
1883        self.crypto
1884            .visit(v, &format!("{key_prefix}.crypto"), description);
1885    }
1886
1887    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1888        // Determine if the key is a global, metadata, or column-specific setting
1889        if key.starts_with("metadata::") {
1890            let k = match key.split("::").collect::<Vec<_>>()[..] {
1891                [_meta] | [_meta, ""] => {
1892                    return _config_err!(
1893                        "Invalid metadata key provided, missing key in metadata::<key>"
1894                    )
1895                }
1896                [_meta, k] => k.into(),
1897                _ => {
1898                    return _config_err!(
1899                        "Invalid metadata key provided, found too many '::' in \"{key}\""
1900                    )
1901                }
1902            };
1903            self.key_value_metadata.insert(k, Some(value.into()));
1904            Ok(())
1905        } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
1906            self.crypto.set(crypto_feature, value)
1907        } else if key.contains("::") {
1908            self.column_specific_options.set(key, value)
1909        } else {
1910            self.global.set(key, value)
1911        }
1912    }
1913}
1914
1915macro_rules! config_namespace_with_hashmap {
1916    (
1917     $(#[doc = $struct_d:tt])*
1918     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
1919     $vis:vis struct $struct_name:ident {
1920        $(
1921        $(#[doc = $d:tt])*
1922        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
1923        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
1924        )*$(,)*
1925    }
1926    ) => {
1927
1928        $(#[doc = $struct_d])*
1929        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
1930        #[derive(Debug, Clone, PartialEq)]
1931        $vis struct $struct_name{
1932            $(
1933            $(#[doc = $d])*
1934            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
1935            $field_vis $field_name : $field_type,
1936            )*
1937        }
1938
1939        impl ConfigField for $struct_name {
1940            fn set(&mut self, key: &str, value: &str) -> Result<()> {
1941                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1942                match key {
1943                    $(
1944                       stringify!($field_name) => {
1945                           // Handle deprecated fields
1946                           #[allow(deprecated)] // Allow deprecated fields
1947                           $(let value = $transform(value);)?
1948                           self.$field_name.set(rem, value.as_ref())
1949                       },
1950                    )*
1951                    _ => _config_err!(
1952                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1953                    )
1954                }
1955            }
1956
1957            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
1958                $(
1959                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
1960                let desc = concat!($($d),*).trim();
1961                // Handle deprecated fields
1962                #[allow(deprecated)]
1963                self.$field_name.visit(v, key.as_str(), desc);
1964                )*
1965            }
1966        }
1967
1968        impl Default for $struct_name {
1969            fn default() -> Self {
1970                #[allow(deprecated)]
1971                Self {
1972                    $($field_name: $default),*
1973                }
1974            }
1975        }
1976
1977        impl ConfigField for HashMap<String,$struct_name> {
1978            fn set(&mut self, key: &str, value: &str) -> Result<()> {
1979                let parts: Vec<&str> = key.splitn(2, "::").collect();
1980                match parts.as_slice() {
1981                    [inner_key, hashmap_key] => {
1982                        // Get or create the struct for the specified key
1983                        let inner_value = self
1984                            .entry((*hashmap_key).to_owned())
1985                            .or_insert_with($struct_name::default);
1986
1987                        inner_value.set(inner_key, value)
1988                    }
1989                    _ => _config_err!("Unrecognized key '{key}'."),
1990                }
1991            }
1992
1993            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
1994                for (column_name, col_options) in self {
1995                    $(
1996                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
1997                    let desc = concat!($($d),*).trim();
1998                    #[allow(deprecated)]
1999                    col_options.$field_name.visit(v, key.as_str(), desc);
2000                    )*
2001                }
2002            }
2003        }
2004    }
2005}
2006
2007config_namespace_with_hashmap! {
2008    /// Options controlling parquet format for individual columns.
2009    ///
2010    /// See [`ParquetOptions`] for more details
2011    pub struct ParquetColumnOptions {
2012        /// Sets if bloom filter is enabled for the column path.
2013        pub bloom_filter_enabled: Option<bool>, default = None
2014
2015        /// Sets encoding for the column path.
2016        /// Valid values are: plain, plain_dictionary, rle,
2017        /// bit_packed, delta_binary_packed, delta_length_byte_array,
2018        /// delta_byte_array, rle_dictionary, and byte_stream_split.
2019        /// These values are not case-sensitive. If NULL, uses
2020        /// default parquet options
2021        pub encoding: Option<String>, default = None
2022
2023        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
2024        /// default parquet options
2025        pub dictionary_enabled: Option<bool>, default = None
2026
2027        /// Sets default parquet compression codec for the column path.
2028        /// Valid values are: uncompressed, snappy, gzip(level),
2029        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
2030        /// These values are not case-sensitive. If NULL, uses
2031        /// default parquet options
2032        pub compression: Option<String>, transform = str::to_lowercase, default = None
2033
2034        /// Sets if statistics are enabled for the column
2035        /// Valid values are: "none", "chunk", and "page"
2036        /// These values are not case sensitive. If NULL, uses
2037        /// default parquet options
2038        pub statistics_enabled: Option<String>, default = None
2039
2040        /// Sets bloom filter false positive probability for the column path. If NULL, uses
2041        /// default parquet options
2042        pub bloom_filter_fpp: Option<f64>, default = None
2043
2044        /// Sets bloom filter number of distinct values. If NULL, uses
2045        /// default parquet options
2046        pub bloom_filter_ndv: Option<u64>, default = None
2047
2048        /// Sets max statistics size for the column path. If NULL, uses
2049        /// default parquet options
2050        /// max_statistics_size is deprecated, currently it is not being used
2051        // TODO: remove once deprecated
2052        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
2053        pub max_statistics_size: Option<usize>, default = None
2054    }
2055}
2056
2057#[derive(Clone, Debug, PartialEq)]
2058pub struct ConfigFileEncryptionProperties {
2059    /// Should the parquet footer be encrypted
2060    /// default is true
2061    pub encrypt_footer: bool,
2062    /// Key to use for the parquet footer encoded in hex format
2063    pub footer_key_as_hex: String,
2064    /// Metadata information for footer key
2065    pub footer_key_metadata_as_hex: String,
2066    /// HashMap of column names --> (key in hex format, metadata)
2067    pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
2068    /// AAD prefix string uniquely identifies the file and prevents file swapping
2069    pub aad_prefix_as_hex: String,
2070    /// If true, store the AAD prefix in the file
2071    /// default is false
2072    pub store_aad_prefix: bool,
2073}
2074
2075// Setup to match EncryptionPropertiesBuilder::new()
2076impl Default for ConfigFileEncryptionProperties {
2077    fn default() -> Self {
2078        ConfigFileEncryptionProperties {
2079            encrypt_footer: true,
2080            footer_key_as_hex: String::new(),
2081            footer_key_metadata_as_hex: String::new(),
2082            column_encryption_properties: Default::default(),
2083            aad_prefix_as_hex: String::new(),
2084            store_aad_prefix: false,
2085        }
2086    }
2087}
2088
2089config_namespace_with_hashmap! {
2090    pub struct ColumnEncryptionProperties {
2091        /// Per column encryption key
2092        pub column_key_as_hex: String, default = "".to_string()
2093        /// Per column encryption key metadata
2094        pub column_metadata_as_hex: Option<String>, default = None
2095    }
2096}
2097
2098impl ConfigField for ConfigFileEncryptionProperties {
2099    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2100        let key = format!("{key_prefix}.encrypt_footer");
2101        let desc = "Encrypt the footer";
2102        self.encrypt_footer.visit(v, key.as_str(), desc);
2103
2104        let key = format!("{key_prefix}.footer_key_as_hex");
2105        let desc = "Key to use for the parquet footer";
2106        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2107
2108        let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2109        let desc = "Metadata to use for the parquet footer";
2110        self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2111
2112        let key = format!("{key_prefix}.aad_prefix_as_hex");
2113        let desc = "AAD prefix to use";
2114        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2115
2116        let key = format!("{key_prefix}.store_aad_prefix");
2117        let desc = "If true, store the AAD prefix";
2118        self.store_aad_prefix.visit(v, key.as_str(), desc);
2119
2120        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2121    }
2122
2123    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2124        // Any hex encoded values must be pre-encoded using
2125        // hex::encode() before calling set.
2126
2127        if key.contains("::") {
2128            // Handle any column specific properties
2129            return self.column_encryption_properties.set(key, value);
2130        };
2131
2132        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2133        match key {
2134            "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2135            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2136            "footer_key_metadata_as_hex" => {
2137                self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2138            }
2139            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2140            "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2141            _ => _config_err!(
2142                "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2143                key
2144            ),
2145        }
2146    }
2147}
2148
2149#[cfg(feature = "parquet_encryption")]
2150impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
2151    fn from(val: ConfigFileEncryptionProperties) -> Self {
2152        let mut fep = FileEncryptionProperties::builder(
2153            hex::decode(val.footer_key_as_hex).unwrap(),
2154        )
2155        .with_plaintext_footer(!val.encrypt_footer)
2156        .with_aad_prefix_storage(val.store_aad_prefix);
2157
2158        if !val.footer_key_metadata_as_hex.is_empty() {
2159            fep = fep.with_footer_key_metadata(
2160                hex::decode(&val.footer_key_metadata_as_hex)
2161                    .expect("Invalid footer key metadata"),
2162            );
2163        }
2164
2165        for (column_name, encryption_props) in val.column_encryption_properties.iter() {
2166            let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
2167                .expect("Invalid column encryption key");
2168            let key_metadata = encryption_props
2169                .column_metadata_as_hex
2170                .as_ref()
2171                .map(|x| hex::decode(x).expect("Invalid column metadata"));
2172            match key_metadata {
2173                Some(key_metadata) => {
2174                    fep = fep.with_column_key_and_metadata(
2175                        column_name,
2176                        encryption_key,
2177                        key_metadata,
2178                    );
2179                }
2180                None => {
2181                    fep = fep.with_column_key(column_name, encryption_key);
2182                }
2183            }
2184        }
2185
2186        if !val.aad_prefix_as_hex.is_empty() {
2187            let aad_prefix: Vec<u8> =
2188                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2189            fep = fep.with_aad_prefix(aad_prefix);
2190        }
2191        fep.build().unwrap()
2192    }
2193}
2194
2195#[cfg(feature = "parquet_encryption")]
2196impl From<&FileEncryptionProperties> for ConfigFileEncryptionProperties {
2197    fn from(f: &FileEncryptionProperties) -> Self {
2198        let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
2199
2200        let mut column_encryption_properties: HashMap<
2201            String,
2202            ColumnEncryptionProperties,
2203        > = HashMap::new();
2204
2205        for (i, column_name) in column_names_vec.iter().enumerate() {
2206            let column_key_as_hex = hex::encode(&column_keys_vec[i]);
2207            let column_metadata_as_hex: Option<String> =
2208                column_metas_vec.get(i).map(hex::encode);
2209            column_encryption_properties.insert(
2210                column_name.clone(),
2211                ColumnEncryptionProperties {
2212                    column_key_as_hex,
2213                    column_metadata_as_hex,
2214                },
2215            );
2216        }
2217        let mut aad_prefix: Vec<u8> = Vec::new();
2218        if let Some(prefix) = f.aad_prefix() {
2219            aad_prefix = prefix.clone();
2220        }
2221        ConfigFileEncryptionProperties {
2222            encrypt_footer: f.encrypt_footer(),
2223            footer_key_as_hex: hex::encode(f.footer_key()),
2224            footer_key_metadata_as_hex: f
2225                .footer_key_metadata()
2226                .map(hex::encode)
2227                .unwrap_or_default(),
2228            column_encryption_properties,
2229            aad_prefix_as_hex: hex::encode(aad_prefix),
2230            store_aad_prefix: f.store_aad_prefix(),
2231        }
2232    }
2233}
2234
2235#[derive(Clone, Debug, PartialEq)]
2236pub struct ConfigFileDecryptionProperties {
2237    /// Binary string to use for the parquet footer encoded in hex format
2238    pub footer_key_as_hex: String,
2239    /// HashMap of column names --> key in hex format
2240    pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
2241    /// AAD prefix string uniquely identifies the file and prevents file swapping
2242    pub aad_prefix_as_hex: String,
2243    /// If true, then verify signature for files with plaintext footers.
2244    /// default = true
2245    pub footer_signature_verification: bool,
2246}
2247
2248config_namespace_with_hashmap! {
2249    pub struct ColumnDecryptionProperties {
2250        /// Per column encryption key
2251        pub column_key_as_hex: String, default = "".to_string()
2252    }
2253}
2254
2255// Setup to match DecryptionPropertiesBuilder::new()
2256impl Default for ConfigFileDecryptionProperties {
2257    fn default() -> Self {
2258        ConfigFileDecryptionProperties {
2259            footer_key_as_hex: String::new(),
2260            column_decryption_properties: Default::default(),
2261            aad_prefix_as_hex: String::new(),
2262            footer_signature_verification: true,
2263        }
2264    }
2265}
2266
2267impl ConfigField for ConfigFileDecryptionProperties {
2268    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2269        let key = format!("{key_prefix}.footer_key_as_hex");
2270        let desc = "Key to use for the parquet footer";
2271        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2272
2273        let key = format!("{key_prefix}.aad_prefix_as_hex");
2274        let desc = "AAD prefix to use";
2275        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2276
2277        let key = format!("{key_prefix}.footer_signature_verification");
2278        let desc = "If true, verify the footer signature";
2279        self.footer_signature_verification
2280            .visit(v, key.as_str(), desc);
2281
2282        self.column_decryption_properties.visit(v, key_prefix, desc);
2283    }
2284
2285    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2286        // Any hex encoded values must be pre-encoded using
2287        // hex::encode() before calling set.
2288
2289        if key.contains("::") {
2290            // Handle any column specific properties
2291            return self.column_decryption_properties.set(key, value);
2292        };
2293
2294        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2295        match key {
2296            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2297            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2298            "footer_signature_verification" => {
2299                self.footer_signature_verification.set(rem, value.as_ref())
2300            }
2301            _ => _config_err!(
2302                "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2303                key
2304            ),
2305        }
2306    }
2307}
2308
2309#[cfg(feature = "parquet_encryption")]
2310impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
2311    fn from(val: ConfigFileDecryptionProperties) -> Self {
2312        let mut column_names: Vec<&str> = Vec::new();
2313        let mut column_keys: Vec<Vec<u8>> = Vec::new();
2314
2315        for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
2316            column_names.push(col_name.as_str());
2317            column_keys.push(
2318                hex::decode(&decryption_properties.column_key_as_hex)
2319                    .expect("Invalid column decryption key"),
2320            );
2321        }
2322
2323        let mut fep = FileDecryptionProperties::builder(
2324            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
2325        )
2326        .with_column_keys(column_names, column_keys)
2327        .unwrap();
2328
2329        if !val.footer_signature_verification {
2330            fep = fep.disable_footer_signature_verification();
2331        }
2332
2333        if !val.aad_prefix_as_hex.is_empty() {
2334            let aad_prefix =
2335                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2336            fep = fep.with_aad_prefix(aad_prefix);
2337        }
2338
2339        fep.build().unwrap()
2340    }
2341}
2342
2343#[cfg(feature = "parquet_encryption")]
2344impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties {
2345    fn from(f: &FileDecryptionProperties) -> Self {
2346        let (column_names_vec, column_keys_vec) = f.column_keys();
2347        let mut column_decryption_properties: HashMap<
2348            String,
2349            ColumnDecryptionProperties,
2350        > = HashMap::new();
2351        for (i, column_name) in column_names_vec.iter().enumerate() {
2352            let props = ColumnDecryptionProperties {
2353                column_key_as_hex: hex::encode(column_keys_vec[i].clone()),
2354            };
2355            column_decryption_properties.insert(column_name.clone(), props);
2356        }
2357
2358        let mut aad_prefix: Vec<u8> = Vec::new();
2359        if let Some(prefix) = f.aad_prefix() {
2360            aad_prefix = prefix.clone();
2361        }
2362        ConfigFileDecryptionProperties {
2363            footer_key_as_hex: hex::encode(
2364                f.footer_key(None).unwrap_or_default().as_ref(),
2365            ),
2366            column_decryption_properties,
2367            aad_prefix_as_hex: hex::encode(aad_prefix),
2368            footer_signature_verification: f.check_plaintext_footer_integrity(),
2369        }
2370    }
2371}
2372
2373config_namespace! {
2374    /// Options controlling CSV format
2375    pub struct CsvOptions {
2376        /// Specifies whether there is a CSV header (i.e. the first line
2377        /// consists of is column names). The value `None` indicates that
2378        /// the configuration should be consulted.
2379        pub has_header: Option<bool>, default = None
2380        pub delimiter: u8, default = b','
2381        pub quote: u8, default = b'"'
2382        pub terminator: Option<u8>, default = None
2383        pub escape: Option<u8>, default = None
2384        pub double_quote: Option<bool>, default = None
2385        /// Specifies whether newlines in (quoted) values are supported.
2386        ///
2387        /// Parsing newlines in quoted values may be affected by execution behaviour such as
2388        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2389        /// parsed successfully, which may reduce performance.
2390        ///
2391        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2392        pub newlines_in_values: Option<bool>, default = None
2393        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2394        pub schema_infer_max_rec: Option<usize>, default = None
2395        pub date_format: Option<String>, default = None
2396        pub datetime_format: Option<String>, default = None
2397        pub timestamp_format: Option<String>, default = None
2398        pub timestamp_tz_format: Option<String>, default = None
2399        pub time_format: Option<String>, default = None
2400        // The output format for Nulls in the CSV writer.
2401        pub null_value: Option<String>, default = None
2402        // The input regex for Nulls when loading CSVs.
2403        pub null_regex: Option<String>, default = None
2404        pub comment: Option<u8>, default = None
2405    }
2406}
2407
2408impl CsvOptions {
2409    /// Set a limit in terms of records to scan to infer the schema
2410    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
2411    pub fn with_compression(
2412        mut self,
2413        compression_type_variant: CompressionTypeVariant,
2414    ) -> Self {
2415        self.compression = compression_type_variant;
2416        self
2417    }
2418
2419    /// Set a limit in terms of records to scan to infer the schema
2420    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
2421    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
2422        self.schema_infer_max_rec = Some(max_rec);
2423        self
2424    }
2425
2426    /// Set true to indicate that the first line is a header.
2427    /// - default to true
2428    pub fn with_has_header(mut self, has_header: bool) -> Self {
2429        self.has_header = Some(has_header);
2430        self
2431    }
2432
2433    /// Returns true if the first line is a header. If format options does not
2434    /// specify whether there is a header, returns `None` (indicating that the
2435    /// configuration should be consulted).
2436    pub fn has_header(&self) -> Option<bool> {
2437        self.has_header
2438    }
2439
2440    /// The character separating values within a row.
2441    /// - default to ','
2442    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
2443        self.delimiter = delimiter;
2444        self
2445    }
2446
2447    /// The quote character in a row.
2448    /// - default to '"'
2449    pub fn with_quote(mut self, quote: u8) -> Self {
2450        self.quote = quote;
2451        self
2452    }
2453
2454    /// The character that terminates a row.
2455    /// - default to None (CRLF)
2456    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
2457        self.terminator = terminator;
2458        self
2459    }
2460
2461    /// The escape character in a row.
2462    /// - default is None
2463    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
2464        self.escape = escape;
2465        self
2466    }
2467
2468    /// Set true to indicate that the CSV quotes should be doubled.
2469    /// - default to true
2470    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
2471        self.double_quote = Some(double_quote);
2472        self
2473    }
2474
2475    /// Specifies whether newlines in (quoted) values are supported.
2476    ///
2477    /// Parsing newlines in quoted values may be affected by execution behaviour such as
2478    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2479    /// parsed successfully, which may reduce performance.
2480    ///
2481    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2482    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
2483        self.newlines_in_values = Some(newlines_in_values);
2484        self
2485    }
2486
2487    /// Set a `CompressionTypeVariant` of CSV
2488    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2489    pub fn with_file_compression_type(
2490        mut self,
2491        compression: CompressionTypeVariant,
2492    ) -> Self {
2493        self.compression = compression;
2494        self
2495    }
2496
2497    /// The delimiter character.
2498    pub fn delimiter(&self) -> u8 {
2499        self.delimiter
2500    }
2501
2502    /// The quote character.
2503    pub fn quote(&self) -> u8 {
2504        self.quote
2505    }
2506
2507    /// The terminator character.
2508    pub fn terminator(&self) -> Option<u8> {
2509        self.terminator
2510    }
2511
2512    /// The escape character.
2513    pub fn escape(&self) -> Option<u8> {
2514        self.escape
2515    }
2516}
2517
2518config_namespace! {
2519    /// Options controlling JSON format
2520    pub struct JsonOptions {
2521        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2522        pub schema_infer_max_rec: Option<usize>, default = None
2523    }
2524}
2525
2526pub trait OutputFormatExt: Display {}
2527
2528#[derive(Debug, Clone, PartialEq)]
2529#[allow(clippy::large_enum_variant)]
2530pub enum OutputFormat {
2531    CSV(CsvOptions),
2532    JSON(JsonOptions),
2533    #[cfg(feature = "parquet")]
2534    PARQUET(TableParquetOptions),
2535    AVRO,
2536    ARROW,
2537}
2538
2539impl Display for OutputFormat {
2540    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2541        let out = match self {
2542            OutputFormat::CSV(_) => "csv",
2543            OutputFormat::JSON(_) => "json",
2544            #[cfg(feature = "parquet")]
2545            OutputFormat::PARQUET(_) => "parquet",
2546            OutputFormat::AVRO => "avro",
2547            OutputFormat::ARROW => "arrow",
2548        };
2549        write!(f, "{out}")
2550    }
2551}
2552
2553#[cfg(test)]
2554mod tests {
2555    use crate::config::{
2556        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
2557        Extensions, TableOptions,
2558    };
2559    use std::any::Any;
2560    use std::collections::HashMap;
2561
2562    #[derive(Default, Debug, Clone)]
2563    pub struct TestExtensionConfig {
2564        /// Should "foo" be replaced by "bar"?
2565        pub properties: HashMap<String, String>,
2566    }
2567
2568    impl ExtensionOptions for TestExtensionConfig {
2569        fn as_any(&self) -> &dyn Any {
2570            self
2571        }
2572
2573        fn as_any_mut(&mut self) -> &mut dyn Any {
2574            self
2575        }
2576
2577        fn cloned(&self) -> Box<dyn ExtensionOptions> {
2578            Box::new(self.clone())
2579        }
2580
2581        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
2582            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2583            assert_eq!(key, "test");
2584            self.properties.insert(rem.to_owned(), value.to_owned());
2585            Ok(())
2586        }
2587
2588        fn entries(&self) -> Vec<ConfigEntry> {
2589            self.properties
2590                .iter()
2591                .map(|(k, v)| ConfigEntry {
2592                    key: k.into(),
2593                    value: Some(v.into()),
2594                    description: "",
2595                })
2596                .collect()
2597        }
2598    }
2599
2600    impl ConfigExtension for TestExtensionConfig {
2601        const PREFIX: &'static str = "test";
2602    }
2603
2604    #[test]
2605    fn create_table_config() {
2606        let mut extension = Extensions::new();
2607        extension.insert(TestExtensionConfig::default());
2608        let table_config = TableOptions::new().with_extensions(extension);
2609        let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
2610        assert!(kafka_config.is_some())
2611    }
2612
2613    #[test]
2614    fn alter_test_extension_config() {
2615        let mut extension = Extensions::new();
2616        extension.insert(TestExtensionConfig::default());
2617        let mut table_config = TableOptions::new().with_extensions(extension);
2618        table_config.set_config_format(ConfigFileType::CSV);
2619        table_config.set("format.delimiter", ";").unwrap();
2620        assert_eq!(table_config.csv.delimiter, b';');
2621        table_config.set("test.bootstrap.servers", "asd").unwrap();
2622        let kafka_config = table_config
2623            .extensions
2624            .get::<TestExtensionConfig>()
2625            .unwrap();
2626        assert_eq!(
2627            kafka_config.properties.get("bootstrap.servers").unwrap(),
2628            "asd"
2629        );
2630    }
2631
2632    #[test]
2633    fn csv_u8_table_options() {
2634        let mut table_config = TableOptions::new();
2635        table_config.set_config_format(ConfigFileType::CSV);
2636        table_config.set("format.delimiter", ";").unwrap();
2637        assert_eq!(table_config.csv.delimiter as char, ';');
2638        table_config.set("format.escape", "\"").unwrap();
2639        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
2640        table_config.set("format.escape", "\'").unwrap();
2641        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
2642    }
2643
2644    #[test]
2645    fn warning_only_not_default() {
2646        use std::sync::atomic::AtomicUsize;
2647        static COUNT: AtomicUsize = AtomicUsize::new(0);
2648        use log::{Level, LevelFilter, Metadata, Record};
2649        struct SimpleLogger;
2650        impl log::Log for SimpleLogger {
2651            fn enabled(&self, metadata: &Metadata) -> bool {
2652                metadata.level() <= Level::Info
2653            }
2654
2655            fn log(&self, record: &Record) {
2656                if self.enabled(record.metadata()) {
2657                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2658                }
2659            }
2660            fn flush(&self) {}
2661        }
2662        log::set_logger(&SimpleLogger).unwrap();
2663        log::set_max_level(LevelFilter::Info);
2664        let mut sql_parser_options = crate::config::SqlParserOptions::default();
2665        sql_parser_options
2666            .set("enable_options_value_normalization", "false")
2667            .unwrap();
2668        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
2669        sql_parser_options
2670            .set("enable_options_value_normalization", "true")
2671            .unwrap();
2672        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
2673    }
2674
2675    #[cfg(feature = "parquet")]
2676    #[test]
2677    fn parquet_table_options() {
2678        let mut table_config = TableOptions::new();
2679        table_config.set_config_format(ConfigFileType::PARQUET);
2680        table_config
2681            .set("format.bloom_filter_enabled::col1", "true")
2682            .unwrap();
2683        assert_eq!(
2684            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
2685            Some(true)
2686        );
2687    }
2688
2689    #[cfg(feature = "parquet_encryption")]
2690    #[test]
2691    fn parquet_table_encryption() {
2692        use crate::config::{
2693            ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
2694        };
2695        use parquet::encryption::decrypt::FileDecryptionProperties;
2696        use parquet::encryption::encrypt::FileEncryptionProperties;
2697
2698        let footer_key = b"0123456789012345".to_vec(); // 128bit/16
2699        let column_names = vec!["double_field", "float_field"];
2700        let column_keys =
2701            vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
2702
2703        let file_encryption_properties =
2704            FileEncryptionProperties::builder(footer_key.clone())
2705                .with_column_keys(column_names.clone(), column_keys.clone())
2706                .unwrap()
2707                .build()
2708                .unwrap();
2709
2710        let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
2711            .with_column_keys(column_names.clone(), column_keys.clone())
2712            .unwrap()
2713            .build()
2714            .unwrap();
2715
2716        // Test round-trip
2717        let config_encrypt: ConfigFileEncryptionProperties =
2718            (&file_encryption_properties).into();
2719        let encryption_properties_built: FileEncryptionProperties =
2720            config_encrypt.clone().into();
2721        assert_eq!(file_encryption_properties, encryption_properties_built);
2722
2723        let config_decrypt: ConfigFileDecryptionProperties =
2724            (&decryption_properties).into();
2725        let decryption_properties_built: FileDecryptionProperties =
2726            config_decrypt.clone().into();
2727        assert_eq!(decryption_properties, decryption_properties_built);
2728
2729        ///////////////////////////////////////////////////////////////////////////////////
2730        // Test encryption config
2731
2732        // Display original encryption config
2733        // println!("{:#?}", config_encrypt);
2734
2735        let mut table_config = TableOptions::new();
2736        table_config.set_config_format(ConfigFileType::PARQUET);
2737        table_config
2738            .parquet
2739            .set(
2740                "crypto.file_encryption.encrypt_footer",
2741                config_encrypt.encrypt_footer.to_string().as_str(),
2742            )
2743            .unwrap();
2744        table_config
2745            .parquet
2746            .set(
2747                "crypto.file_encryption.footer_key_as_hex",
2748                config_encrypt.footer_key_as_hex.as_str(),
2749            )
2750            .unwrap();
2751
2752        for (i, col_name) in column_names.iter().enumerate() {
2753            let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
2754            let value = hex::encode(column_keys[i].clone());
2755            table_config
2756                .parquet
2757                .set(key.as_str(), value.as_str())
2758                .unwrap();
2759        }
2760
2761        // Print matching final encryption config
2762        // println!("{:#?}", table_config.parquet.crypto.file_encryption);
2763
2764        assert_eq!(
2765            table_config.parquet.crypto.file_encryption,
2766            Some(config_encrypt)
2767        );
2768
2769        ///////////////////////////////////////////////////////////////////////////////////
2770        // Test decryption config
2771
2772        // Display original decryption config
2773        // println!("{:#?}", config_decrypt);
2774
2775        let mut table_config = TableOptions::new();
2776        table_config.set_config_format(ConfigFileType::PARQUET);
2777        table_config
2778            .parquet
2779            .set(
2780                "crypto.file_decryption.footer_key_as_hex",
2781                config_decrypt.footer_key_as_hex.as_str(),
2782            )
2783            .unwrap();
2784
2785        for (i, col_name) in column_names.iter().enumerate() {
2786            let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
2787            let value = hex::encode(column_keys[i].clone());
2788            table_config
2789                .parquet
2790                .set(key.as_str(), value.as_str())
2791                .unwrap();
2792        }
2793
2794        // Print matching final decryption config
2795        // println!("{:#?}", table_config.parquet.crypto.file_decryption);
2796
2797        assert_eq!(
2798            table_config.parquet.crypto.file_decryption,
2799            Some(config_decrypt.clone())
2800        );
2801
2802        // Set config directly
2803        let mut table_config = TableOptions::new();
2804        table_config.set_config_format(ConfigFileType::PARQUET);
2805        table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
2806        assert_eq!(
2807            table_config.parquet.crypto.file_decryption,
2808            Some(config_decrypt.clone())
2809        );
2810    }
2811
2812    #[cfg(feature = "parquet")]
2813    #[test]
2814    fn parquet_table_options_config_entry() {
2815        let mut table_config = TableOptions::new();
2816        table_config.set_config_format(ConfigFileType::PARQUET);
2817        table_config
2818            .set("format.bloom_filter_enabled::col1", "true")
2819            .unwrap();
2820        let entries = table_config.entries();
2821        assert!(entries
2822            .iter()
2823            .any(|item| item.key == "format.bloom_filter_enabled::col1"))
2824    }
2825
2826    #[cfg(feature = "parquet")]
2827    #[test]
2828    fn parquet_table_options_config_metadata_entry() {
2829        let mut table_config = TableOptions::new();
2830        table_config.set_config_format(ConfigFileType::PARQUET);
2831        table_config.set("format.metadata::key1", "").unwrap();
2832        table_config.set("format.metadata::key2", "value2").unwrap();
2833        table_config
2834            .set("format.metadata::key3", "value with spaces ")
2835            .unwrap();
2836        table_config
2837            .set("format.metadata::key4", "value with special chars :: :")
2838            .unwrap();
2839
2840        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
2841        assert_eq!(parsed_metadata.get("should not exist1"), None);
2842        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
2843        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
2844        assert_eq!(
2845            parsed_metadata.get("key3"),
2846            Some(&Some("value with spaces ".into()))
2847        );
2848        assert_eq!(
2849            parsed_metadata.get("key4"),
2850            Some(&Some("value with special chars :: :".into()))
2851        );
2852
2853        // duplicate keys are overwritten
2854        table_config.set("format.metadata::key_dupe", "A").unwrap();
2855        table_config.set("format.metadata::key_dupe", "B").unwrap();
2856        let parsed_metadata = table_config.parquet.key_value_metadata;
2857        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
2858    }
2859}