// datafusion_common/config.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Runtime configuration, via [`ConfigOptions`]
19
20use arrow_ipc::CompressionType;
21
22#[cfg(feature = "parquet_encryption")]
23use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
24use crate::error::_config_err;
25use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
26use crate::parquet_config::DFParquetWriterVersion;
27use crate::parsers::CompressionTypeVariant;
28use crate::utils::get_available_parallelism;
29use crate::{DataFusionError, Result};
30#[cfg(feature = "parquet_encryption")]
31use hex;
32use std::any::Any;
33use std::collections::{BTreeMap, HashMap};
34use std::error::Error;
35use std::fmt::{self, Display};
36use std::str::FromStr;
37#[cfg(feature = "parquet_encryption")]
38use std::sync::Arc;
39
40/// A macro that wraps a configuration struct and automatically derives
41/// [`Default`] and [`ConfigField`] for it, allowing it to be used
42/// in the [`ConfigOptions`] configuration tree.
43///
44/// `transform` is used to normalize values before parsing.
45///
46/// For example,
47///
48/// ```ignore
49/// config_namespace! {
50///    /// Amazing config
51///    pub struct MyConfig {
52///        /// Field 1 doc
53///        field1: String, transform = str::to_lowercase, default = "".to_string()
54///
55///        /// Field 2 doc
56///        field2: usize, default = 232
57///
58///        /// Field 3 doc
59///        field3: Option<usize>, default = None
60///    }
61/// }
62/// ```
63///
64/// Will generate
65///
66/// ```ignore
67/// /// Amazing config
68/// #[derive(Debug, Clone)]
69/// #[non_exhaustive]
70/// pub struct MyConfig {
71///     /// Field 1 doc
72///     field1: String,
73///     /// Field 2 doc
74///     field2: usize,
75///     /// Field 3 doc
76///     field3: Option<usize>,
77/// }
78/// impl ConfigField for MyConfig {
79///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
80///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
81///         match key {
82///             "field1" => {
83///                 let value = str::to_lowercase(value);
84///                 self.field1.set(rem, value.as_ref())
85///             },
86///             "field2" => self.field2.set(rem, value.as_ref()),
87///             "field3" => self.field3.set(rem, value.as_ref()),
88///             _ => _internal_err!(
89///                 "Config value \"{}\" not found on MyConfig",
90///                 key
91///             ),
92///         }
93///     }
94///
95///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
96///         let key = format!("{}.field1", key_prefix);
97///         let desc = "Field 1 doc";
98///         self.field1.visit(v, key.as_str(), desc);
99///         let key = format!("{}.field2", key_prefix);
100///         let desc = "Field 2 doc";
101///         self.field2.visit(v, key.as_str(), desc);
102///         let key = format!("{}.field3", key_prefix);
103///         let desc = "Field 3 doc";
104///         self.field3.visit(v, key.as_str(), desc);
105///     }
106/// }
107///
108/// impl Default for MyConfig {
109///     fn default() -> Self {
110///         Self {
111///             field1: "".to_string(),
112///             field2: 232,
113///             field3: None,
114///         }
115///     }
116/// }
117/// ```
118///
119/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])? // Optional struct-level allow attribute
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])? // Optional field-level allow attribute
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)? // Optional warning logged when a non-default value is set
                $(transform = $transform:expr,)? // Optional normalization applied to the value before parsing
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            // Sets `key` (a possibly dotted path such as "parquet.pruning") to
            // `value`, delegating the remainder of the path to the matching
            // field's own `ConfigField::set`.
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                // Split off the first path segment; `rem` is "" for leaf keys.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                let ret = self.$field_name.set(rem, value.as_ref());

                                // If a `warn` message was declared, log it only
                                // when the stored value now differs from the
                                // declared default (i.e. the user actually
                                // changed a deprecated/ignored option).
                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })?
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            // Visits every field, passing the full dotted key and the field's
            // doc comment text (concatenated and trimmed) as its description.
            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    // The user-visible description is the field's doc text.
                    let desc = concat!($($d),*).trim();
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }

            // Resets `key` to its declared default; dotted paths recurse into
            // the nested namespace's own `reset`.
            fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            {
                                if rem.is_empty() {
                                    // Leaf key: restore the declared default.
                                    let default_value: $field_type = $default;
                                    self.$field_name = default_value;
                                    Ok(())
                                } else {
                                    // Nested key: delegate to the child namespace.
                                    self.$field_name.reset(rem)
                                }
                            }
                        },
                    )*
                    _ => $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}",
                        key,
                        stringify!($struct_name)
                    ),
                }
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}
220
config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        // NOTE: the `///` texts below are captured by `config_namespace!` and
        // surfaced verbatim as user-visible option descriptions via
        // `ConfigField::visit`; edit them with care.

        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}
262
config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        // NOTE: the `///` texts below are captured by `config_namespace!` and
        // surfaced verbatim as user-visible option descriptions; edit with care.

        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: Dialect, default = Dialect::Generic
        // no need to lowercase because `sqlparser::dialect_from_str`] is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
        /// If false, they are mapped to `Utf8`.
        /// Default is true.
        pub map_string_types_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL Queries
        pub recursion_limit: usize, default = 50

        /// Specifies the default null ordering for query results. There are 4 options:
        /// - `nulls_max`: Nulls appear last in ascending order.
        /// - `nulls_min`: Nulls appear first in ascending order.
        /// - `nulls_first`: Nulls always be first in any order.
        /// - `nulls_last`: Nulls always be last in any order.
        ///
        /// By default, `nulls_max` is used to follow Postgres's behavior.
        /// postgres rule: <https://www.postgresql.org/docs/current/queries-order.html>
        pub default_null_ordering: String, default = "nulls_max".to_string()
    }
}
316
/// This is the SQL dialect used by DataFusion's parser.
/// This mirrors [sqlparser::dialect::Dialect](https://docs.rs/sqlparser/latest/sqlparser/dialect/trait.Dialect.html)
/// trait in order to offer an easier API and avoid adding the `sqlparser` dependency
// Parsing from / rendering to strings is handled by the `FromStr`, `AsRef<str>`
// and `Display` impls below; `FromStr` is case-insensitive.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Dialect {
    /// The default dialect used when none is configured.
    #[default]
    Generic,
    MySQL,
    PostgreSQL,
    Hive,
    SQLite,
    Snowflake,
    Redshift,
    MsSQL,
    ClickHouse,
    BigQuery,
    Ansi,
    DuckDB,
    Databricks,
}
337
338impl AsRef<str> for Dialect {
339    fn as_ref(&self) -> &str {
340        match self {
341            Self::Generic => "generic",
342            Self::MySQL => "mysql",
343            Self::PostgreSQL => "postgresql",
344            Self::Hive => "hive",
345            Self::SQLite => "sqlite",
346            Self::Snowflake => "snowflake",
347            Self::Redshift => "redshift",
348            Self::MsSQL => "mssql",
349            Self::ClickHouse => "clickhouse",
350            Self::BigQuery => "bigquery",
351            Self::Ansi => "ansi",
352            Self::DuckDB => "duckdb",
353            Self::Databricks => "databricks",
354        }
355    }
356}
357
358impl FromStr for Dialect {
359    type Err = DataFusionError;
360
361    fn from_str(s: &str) -> Result<Self, Self::Err> {
362        let value = match s.to_ascii_lowercase().as_str() {
363            "generic" => Self::Generic,
364            "mysql" => Self::MySQL,
365            "postgresql" | "postgres" => Self::PostgreSQL,
366            "hive" => Self::Hive,
367            "sqlite" => Self::SQLite,
368            "snowflake" => Self::Snowflake,
369            "redshift" => Self::Redshift,
370            "mssql" => Self::MsSQL,
371            "clickhouse" => Self::ClickHouse,
372            "bigquery" => Self::BigQuery,
373            "ansi" => Self::Ansi,
374            "duckdb" => Self::DuckDB,
375            "databricks" => Self::Databricks,
376            other => {
377                let error_message = format!(
378                    "Invalid Dialect: {other}. Expected one of: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks"
379                );
380                return Err(DataFusionError::Configuration(error_message));
381            }
382        };
383        Ok(value)
384    }
385}
386
387impl ConfigField for Dialect {
388    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
389        v.some(key, self, description)
390    }
391
392    fn set(&mut self, _: &str, value: &str) -> Result<()> {
393        *self = Self::from_str(value)?;
394        Ok(())
395    }
396}
397
398impl Display for Dialect {
399    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
400        let str = self.as_ref();
401        write!(f, "{str}")
402    }
403}
404
/// Compression codec applied to spill files, which are written in the Arrow
/// IPC Stream format (see `ExecutionOptions::spill_compression`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    /// Zstandard: higher compression ratio at the cost of slower
    /// (de)compression.
    Zstd,
    /// LZ4 frame format: faster (de)compression but typically larger files.
    Lz4Frame,
    /// No compression (the default).
    #[default]
    Uncompressed,
}
412
413impl FromStr for SpillCompression {
414    type Err = DataFusionError;
415
416    fn from_str(s: &str) -> Result<Self, Self::Err> {
417        match s.to_ascii_lowercase().as_str() {
418            "zstd" => Ok(Self::Zstd),
419            "lz4_frame" => Ok(Self::Lz4Frame),
420            "uncompressed" | "" => Ok(Self::Uncompressed),
421            other => Err(DataFusionError::Configuration(format!(
422                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
423            ))),
424        }
425    }
426}
427
428impl ConfigField for SpillCompression {
429    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
430        v.some(key, self, description)
431    }
432
433    fn set(&mut self, _: &str, value: &str) -> Result<()> {
434        *self = SpillCompression::from_str(value)?;
435        Ok(())
436    }
437}
438
439impl Display for SpillCompression {
440    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
441        let str = match self {
442            Self::Zstd => "zstd",
443            Self::Lz4Frame => "lz4_frame",
444            Self::Uncompressed => "uncompressed",
445        };
446        write!(f, "{str}")
447    }
448}
449
450impl From<SpillCompression> for Option<CompressionType> {
451    fn from(c: SpillCompression) -> Self {
452        match c {
453            SpillCompression::Zstd => Some(CompressionType::ZSTD),
454            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
455            SpillCompression::Uncompressed => None,
456        }
457    }
458}
459
config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        // NOTE: the `///` texts below are captured by `config_namespace!` and
        // surfaced verbatim as user-visible option descriptions via
        // `ConfigField::visit`; edit them with care.

        /// Default batch size while creating new batches, it's especially useful for
        /// buffer-in-memory batches since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// A perfect hash join (see `HashJoinExec` for more details) will be considered
        /// if the range of keys (max - min) on the build side is < this threshold.
        /// This provides a fast path for joins with very small key ranges,
        /// bypassing the density check.
        ///
        /// Currently only supports cases where build_side.num_rows() < u32::MAX.
        /// Support for build_side.num_rows() >= u32::MAX will be added in the future.
        pub perfect_hash_join_small_build_threshold: usize, default = 1024

        /// The minimum required density of join keys on the build side to consider a
        /// perfect hash join (see `HashJoinExec` for more details). Density is calculated as:
        /// `(number of rows) / (max_key - min_key + 1)`.
        /// A perfect hash join may be used if the actual key density > this
        /// value.
        ///
        /// Currently only supports cases where build_side.num_rows() < u32::MAX.
        /// Support for build_side.num_rows() >= u32::MAX will be added in the future.
        pub perfect_hash_join_min_key_density: f64, default = 0.15

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the configuration setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `now` return timestamps in this time zone
        pub time_zone: Option<String>, default = None

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly use to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to workaround bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Sets the compression codec used when spilling data to disk.
        ///
        /// Since datafusion writes spill files using the Arrow IPC Stream format,
        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
        /// Valid values are: uncompressed, lz4_frame, zstd.
        /// Note: lz4_frame offers faster (de)compression, but typically results in
        /// larger spill files. In contrast, zstd achieves
        /// higher compression ratios at the cost of slower (de)compression speed.
        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Maximum size in bytes for individual spill files before rotating to a new file.
        ///
        /// When operators spill data to disk (e.g., RepartitionExec), they write
        /// multiple batches to the same file until this size limit is reached, then rotate
        /// to a new file. This reduces syscall overhead compared to one-file-per-batch
        /// while preventing files from growing too large.
        ///
        /// A larger value reduces file creation overhead but may hold more disk space.
        /// A smaller value creates more files but allows finer-grained space reclamation
        /// as files can be deleted once fully consumed.
        ///
        /// Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators
        /// may create spill files larger than the limit.
        ///
        /// Default: 128 MB
        pub max_spill_file_size_bytes: usize, default = 128 * 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum level of output files running in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should a `ListingTable` created through the `ListingTableFactory` infer table
        /// partitions from Hive compliant directories. Defaults to true (partition columns are
        /// inferred and will be represented in the table schema).
        pub listing_table_factory_infer_partitions: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the value is greater
        /// then partial aggregation will skip aggregation for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows partial aggregation partition should process, before
        /// aggregation ratio check and trying to switch to skipping aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans.
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote end point.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024

        /// Whether to enable ANSI SQL mode.
        ///
        /// The flag is experimental and relevant only for DataFusion Spark built-in functions
        ///
        /// When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL
        /// semantics for expressions, casting, and error handling. This means:
        /// - **Strict type coercion rules:** implicit casts between incompatible types are disallowed.
        /// - **Standard SQL arithmetic behavior:** operations such as division by zero,
        ///   numeric overflow, or invalid casts raise runtime errors rather than returning
        ///   `NULL` or adjusted values.
        /// - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling.
        ///
        /// When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive,
        /// non-ANSI mode designed for user convenience and backward compatibility. In this mode:
        /// - Implicit casts between types are allowed (e.g., string to integer when possible).
        /// - Arithmetic operations are more lenient — for example, `abs()` on the minimum
        ///   representable integer value returns the input value instead of raising overflow.
        /// - Division by zero or invalid casts may return `NULL` instead of failing.
        ///
        /// # Default
        /// `false` — ANSI SQL mode is disabled by default.
        pub enable_ansi_mode: bool, default = false
    }
}
674
config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// One read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer.
        /// Default setting is 512 KiB, which should be sufficient for most parquet files;
        /// it can reduce one I/O operation per parquet file. If the metadata is larger than
        /// the hint, two reads will still be performed.
        pub metadata_size_hint: Option<usize>, default = Some(512 * 1024)

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) Force the use of RowSelections for filter results, when
        /// pushdown_filters is enabled. If false, the reader will automatically
        /// choose between a RowSelection and a Bitmap based on the number and
        /// pattern of selected rows.
        pub force_filter_selections: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` with `Utf8View`,
        /// and `Binary/LargeBinary` with `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If true, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark
        /// which stores microsecond resolution timestamps in an int96 allowing it
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        /// (reading) The maximum predicate cache size, in bytes. When
        /// `pushdown_filters` is enabled, sets the maximum memory used to cache
        /// the results of predicate evaluation between filter evaluation and
        /// output generation. Decreasing this value will reduce memory usage,
        /// but may increase IO and CPU usage. None means use the default
        /// parquet reader setting. 0 means no caching.
        pub max_predicate_cache_size: Option<usize>, default = None

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in rows
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version.
        /// Valid values are "1.0" and "2.0"
        pub writer_version: DFParquetWriterVersion, default = DFParquetWriterVersion::default()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default =  1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file are serialized in parallel
        /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}
862
config_namespace! {
    /// Options for configuring Parquet Modular Encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    pub struct ParquetEncryptionOptions {
        /// Optional file decryption properties, used when reading encrypted parquet files
        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None

        /// Optional file encryption properties, used when writing encrypted parquet files
        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None

        /// Identifier for the encryption factory to use to create file encryption and decryption properties.
        /// Encryption factories can be registered in the runtime environment with
        /// `RuntimeEnv::register_parquet_encryption_factory`.
        pub factory_id: Option<String>, default = None

        /// Any encryption factory specific options, passed through to the factory as key/value pairs
        pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
    }
}
883
884impl ParquetEncryptionOptions {
885    /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
886    pub fn configure_factory(
887        &mut self,
888        factory_id: &str,
889        config: &impl ExtensionOptions,
890    ) {
891        self.factory_id = Some(factory_id.to_owned());
892        self.factory_options.options.clear();
893        for entry in config.entries() {
894            if let Some(value) = entry.value {
895                self.factory_options.options.insert(entry.key, value);
896            }
897        }
898    }
899}
900
config_namespace! {
    /// Options related to query optimization
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct OptimizerOptions {
        /// When set to true, the optimizer will push a limit operation into
        /// grouped aggregations which have no aggregate expressions, as a soft limit,
        /// emitting groups once the limit is reached, before all rows in the group are read.
        pub enable_distinct_aggregation_soft_limit: bool, default = true

        /// When set to true, the physical plan optimizer will try to add round robin
        /// repartitioning to increase parallelism to leverage more CPU cores
        pub enable_round_robin_repartition: bool, default = true

        /// When set to true, the optimizer will attempt to perform limit operations
        /// during aggregations, if possible
        pub enable_topk_aggregation: bool, default = true

        /// When set to true, the optimizer will attempt to push limit operations
        /// past window functions, if possible
        pub enable_window_limits: bool, default = true

        /// When set to true, the optimizer will attempt to push down TopK dynamic filters
        /// into the file scan phase.
        pub enable_topk_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will attempt to push down Join dynamic filters
        /// into the file scan phase.
        pub enable_join_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will attempt to push down Aggregate dynamic filters
        /// into the file scan phase.
        pub enable_aggregate_dynamic_filter_pushdown: bool, default = true

        /// When set to true, attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase.
        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
        /// This means that if we already have 10 timestamps in the year 2025,
        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
        /// This config takes precedence over `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown`.
        /// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
        pub enable_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will insert filters before a join between
        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
        /// filter can add additional overhead when the file format does not fully support
        /// predicate push down.
        pub filter_null_join_keys: bool, default = false

        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
        /// in parallel using the provided `target_partitions` level
        pub repartition_aggregations: bool, default = true

        /// Minimum total files size in bytes to perform file scan repartitioning.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

        /// Should DataFusion repartition data using the join keys to execute joins in parallel
        /// using the provided `target_partitions` level
        pub repartition_joins: bool, default = true

        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
        /// its inputs do not have any ordering or filtering. If the flag is not enabled,
        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
        /// RightAnti, and RightSemi - being produced only at the end of the execution.
        /// This is not typical in stream processing. Additionally, without proper design for
        /// long runner execution, all types of joins may encounter out-of-memory errors.
        pub allow_symmetric_joins_without_pruning: bool, default = true

        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
        ///
        /// For FileSources, only Parquet and CSV formats are currently supported.
        ///
        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
        /// might be partitioned into smaller chunks) for parallel scanning.
        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
        /// happen within a single file.
        ///
        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
        /// the total number of partitions and batches per partition, but does not slice the initial
        /// record tables provided to the MemTable on creation.
        pub repartition_file_scans: bool, default = true

        /// Minimum number of distinct partition values required to group files by their
        /// Hive partition column values (enabling Hash partitioning declaration).
        ///
        /// How the option is used:
        ///     - preserve_file_partitions=0: Disable it.
        ///     - preserve_file_partitions=1: Always enable it.
        ///     - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N.
        ///     This threshold preserves I/O parallelism when file partitioning is below it.
        ///
        /// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
        /// partitions is less than the target_partitions.
        pub preserve_file_partitions: usize, default = 0

        /// Should DataFusion repartition data using the partitions keys to execute window
        /// functions in parallel using the provided `target_partitions` level
        pub repartition_windows: bool, default = true

        /// Should DataFusion execute sorts in a per-partition fashion and merge
        /// afterwards instead of coalescing first and sorting globally.
        /// When this flag is enabled, plans in the form below
        ///
        /// ```text
        ///      "SortExec: [a@0 ASC]",
        ///      "  CoalescePartitionsExec",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        /// would turn into the plan below which performs better in multithreaded environments
        ///
        /// ```text
        ///      "SortPreservingMergeExec: [a@0 ASC]",
        ///      "  SortExec: [a@0 ASC]",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        pub repartition_sorts: bool, default = true

        /// Partition count threshold for subset satisfaction optimization.
        ///
        /// When the current partition count is >= this threshold, DataFusion will
        /// skip repartitioning if the required partitioning expression is a subset
        /// of the current partition expression such as Hash(a) satisfies Hash(a, b).
        ///
        /// When the current partition count is < this threshold, DataFusion will
        /// repartition to increase parallelism even when subset satisfaction applies.
        ///
        /// Set to 0 to always repartition (disable subset satisfaction optimization).
        /// Set to a high value to always use subset satisfaction.
        ///
        /// Example (subset_repartition_threshold = 4):
        /// ```text
        ///     Hash([a]) satisfies Hash([a, b]) because Hash([a, b]) is a subset of Hash([a])
        ///
        ///     If current partitions (3) < threshold (4), repartition:
        ///     AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)]
        ///       RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3
        ///         AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)]
        ///           DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3)
        ///
        ///     If current partitions (8) >= threshold (4), use subset satisfaction:
        ///     AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)]
        ///       DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8)
        /// ```
        pub subset_repartition_threshold: usize, default = 4

        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
        /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
        /// using `SortPreservingMergeExec`)
        ///
        /// When false, DataFusion will maximize plan parallelism using
        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
        pub prefer_existing_sort: bool, default = false

        /// When set to true, the logical plan optimizer will produce warning
        /// messages if any optimization rules produce errors and then proceed to the next
        /// rule. When set to false, any rules that produce errors will cause the query to fail
        pub skip_failed_rules: bool, default = false

        /// Number of times that the optimizer will attempt to optimize the plan
        pub max_passes: usize, default = 3

        /// When set to true, the physical plan optimizer will run a top down
        /// process to reorder the join keys
        pub top_down_join_key_reordering: bool, default = true

        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
        pub prefer_hash_join: bool, default = true

        /// When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently
        /// experimental. Physical planner will opt for PiecewiseMergeJoin when there is only
        /// one range filter.
        pub enable_piecewise_merge_join: bool, default = false

        /// The maximum estimated size in bytes for one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

        /// The maximum estimated size in rows for one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

        /// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
        /// Build sides larger than this will use hash table lookups instead.
        /// Set to 0 to always use hash table lookups.
        ///
        /// InList pushdown can be more efficient for small build sides because it can result in better
        /// statistics pruning as well as use any bloom filters present on the scan side.
        /// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
        /// On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory.
        ///
        /// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
        ///
        /// The default is 128kB per partition.
        /// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
        /// but avoids excessive memory usage or overhead for larger joins.
        pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024

        /// Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
        /// Build sides with more rows than this will use hash table lookups instead.
        /// Set to 0 to always use hash table lookups.
        ///
        /// This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent
        /// very large IN lists that might not provide much benefit over hash table lookups.
        ///
        /// This uses the deduplicated row count once the build side has been evaluated.
        ///
        /// The default is 150 values per partition.
        /// This is inspired by Trino's `max-filter-keys-per-column` setting.
        /// See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>
        pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150

        /// The default filter selectivity used by Filter Statistics
        /// when an exact selectivity cannot be determined. Valid values are
        /// between 0 (no selectivity) and 100 (all rows are selected).
        pub default_filter_selectivity: u8, default = 20

        /// When set to true, the optimizer will not attempt to convert Union to Interleave
        pub prefer_existing_union: bool, default = false

        /// When set to true, if the returned type is a view type
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false

        /// Enable sort pushdown optimization.
        /// When enabled, attempts to push sort requirements down to data sources
        /// that can natively handle them (e.g., by reversing file/row group read order).
        ///
        /// Returns **inexact ordering**: Sort operator is kept for correctness,
        /// but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N),
        /// providing significant speedup.
        ///
        /// Memory: No additional overhead (only changes read order).
        ///
        /// Future: Will add option to detect perfectly sorted data and eliminate Sort completely.
        ///
        /// Default: true
        pub enable_sort_pushdown: bool, default = true

        /// When set to true, the optimizer will extract leaf expressions
        /// (such as `get_field`) from filter/sort/join nodes into projections
        /// closer to the leaf table scans, and push those projections down
        /// towards the leaf nodes.
        pub enable_leaf_expression_pushdown: bool, default = true
    }
}
1153
config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: ExplainFormat, default = ExplainFormat::Indent

        /// (format=tree only) Maximum total width of the rendered tree.
        /// When set to 0, the tree will have no width limit.
        pub tree_maximum_render_width: usize, default = 240

        /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev".
        /// "summary" shows common metrics for high-level insights.
        /// "dev" provides deep operator-level introspection for developers.
        pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev
    }
}
1191
1192impl ExecutionOptions {
1193    /// Returns the correct parallelism based on the provided `value`.
1194    /// If `value` is `"0"`, returns the default available parallelism, computed with
1195    /// `get_available_parallelism`. Otherwise, returns `value`.
1196    fn normalized_parallelism(value: &str) -> String {
1197        if value.parse::<usize>() == Ok(0) {
1198            get_available_parallelism().to_string()
1199        } else {
1200            value.to_owned()
1201        }
1202    }
1203}
1204
config_namespace! {
    /// Options controlling the format of output when printing record batches
    /// Copies [`arrow::util::display::FormatOptions`]
    pub struct FormatOptions {
        /// If set to `true` any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show data types in the visual representation of batches
        pub types_info: bool, default = false
    }
}
1230
1231impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
1232    type Error = DataFusionError;
1233    fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
1234        let duration_format = match self.duration_format.as_str() {
1235            "pretty" => arrow::util::display::DurationFormat::Pretty,
1236            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
1237            _ => {
1238                return _config_err!(
1239                    "Invalid duration format: {}. Valid values are pretty or iso8601",
1240                    self.duration_format
1241                );
1242            }
1243        };
1244
1245        Ok(arrow::util::display::FormatOptions::new()
1246            .with_display_error(self.safe)
1247            .with_null(&self.null)
1248            .with_date_format(self.date_format.as_deref())
1249            .with_datetime_format(self.datetime_format.as_deref())
1250            .with_timestamp_format(self.timestamp_format.as_deref())
1251            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
1252            .with_time_format(self.time_format.as_deref())
1253            .with_duration_format(duration_format)
1254            .with_types_info(self.types_info))
1255    }
1256}
1257
/// A key value pair, with a corresponding description
///
/// Produced by [`ConfigOptions::entries`] and [`ExtensionOptions::entries`].
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    /// (e.g. `datafusion.execution.batch_size`)
    pub key: String,

    /// The value if any; `None` for unset optional fields
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}
1270
/// Configuration options struct, able to store both built-in configuration and custom options
///
/// Built-in options live under the `datafusion.` key prefix; custom options
/// are stored in [`Extensions`] under their own prefix.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
    /// Formatting options when printing batches
    pub format: FormatOptions,
}
1290
1291impl ConfigField for ConfigOptions {
1292    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1293        self.catalog.visit(v, "datafusion.catalog", "");
1294        self.execution.visit(v, "datafusion.execution", "");
1295        self.optimizer.visit(v, "datafusion.optimizer", "");
1296        self.explain.visit(v, "datafusion.explain", "");
1297        self.sql_parser.visit(v, "datafusion.sql_parser", "");
1298        self.format.visit(v, "datafusion.format", "");
1299    }
1300
1301    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1302        // Extensions are handled in the public `ConfigOptions::set`
1303        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1304        match key {
1305            "catalog" => self.catalog.set(rem, value),
1306            "execution" => self.execution.set(rem, value),
1307            "optimizer" => self.optimizer.set(rem, value),
1308            "explain" => self.explain.set(rem, value),
1309            "sql_parser" => self.sql_parser.set(rem, value),
1310            "format" => self.format.set(rem, value),
1311            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
1312        }
1313    }
1314
1315    /// Reset a configuration option back to its default value
1316    fn reset(&mut self, key: &str) -> Result<()> {
1317        let Some((prefix, rest)) = key.split_once('.') else {
1318            return _config_err!("could not find config namespace for key \"{key}\"");
1319        };
1320
1321        if prefix != "datafusion" {
1322            return _config_err!("Could not find config namespace \"{prefix}\"");
1323        }
1324
1325        let (section, rem) = rest.split_once('.').unwrap_or((rest, ""));
1326        if rem.is_empty() {
1327            return _config_err!("could not find config field for key \"{key}\"");
1328        }
1329
1330        match section {
1331            "catalog" => self.catalog.reset(rem),
1332            "execution" => self.execution.reset(rem),
1333            "optimizer" => {
1334                if rem == "enable_dynamic_filter_pushdown" {
1335                    let defaults = OptimizerOptions::default();
1336                    self.optimizer.enable_dynamic_filter_pushdown =
1337                        defaults.enable_dynamic_filter_pushdown;
1338                    self.optimizer.enable_topk_dynamic_filter_pushdown =
1339                        defaults.enable_topk_dynamic_filter_pushdown;
1340                    self.optimizer.enable_join_dynamic_filter_pushdown =
1341                        defaults.enable_join_dynamic_filter_pushdown;
1342                    Ok(())
1343                } else {
1344                    self.optimizer.reset(rem)
1345                }
1346            }
1347            "explain" => self.explain.reset(rem),
1348            "sql_parser" => self.sql_parser.reset(rem),
1349            "format" => self.format.reset(rem),
1350            other => _config_err!("Config value \"{other}\" not found on ConfigOptions"),
1351        }
1352    }
1353}
1354
/// This namespace is reserved for interacting with Foreign Function Interface
/// (FFI) based configuration extensions.
///
/// When an unknown prefix is set and an extension is registered under this
/// namespace, [`ConfigOptions::set`] routes the full key to that extension.
pub const DATAFUSION_FFI_CONFIG_NAMESPACE: &str = "datafusion_ffi";
1358
1359impl ConfigOptions {
1360    /// Creates a new [`ConfigOptions`] with default values
1361    pub fn new() -> Self {
1362        Self::default()
1363    }
1364
1365    /// Set extensions to provided value
1366    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1367        self.extensions = extensions;
1368        self
1369    }
1370
1371    /// Set a configuration option
1372    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1373        let Some((mut prefix, mut inner_key)) = key.split_once('.') else {
1374            return _config_err!("could not find config namespace for key \"{key}\"");
1375        };
1376
1377        if prefix == "datafusion" {
1378            if inner_key == "optimizer.enable_dynamic_filter_pushdown" {
1379                let bool_value = value.parse::<bool>().map_err(|e| {
1380                    DataFusionError::Configuration(format!(
1381                        "Failed to parse '{value}' as bool: {e}",
1382                    ))
1383                })?;
1384
1385                {
1386                    self.optimizer.enable_dynamic_filter_pushdown = bool_value;
1387                    self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
1388                    self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
1389                    self.optimizer.enable_aggregate_dynamic_filter_pushdown = bool_value;
1390                }
1391                return Ok(());
1392            }
1393            return ConfigField::set(self, inner_key, value);
1394        }
1395
1396        if !self.extensions.0.contains_key(prefix)
1397            && self
1398                .extensions
1399                .0
1400                .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
1401        {
1402            inner_key = key;
1403            prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
1404        }
1405
1406        let Some(e) = self.extensions.0.get_mut(prefix) else {
1407            return _config_err!("Could not find config namespace \"{prefix}\"");
1408        };
1409        e.0.set(inner_key, value)
1410    }
1411
1412    /// Create new [`ConfigOptions`], taking values from environment variables
1413    /// where possible.
1414    ///
1415    /// For example, to configure `datafusion.execution.batch_size`
1416    /// ([`ExecutionOptions::batch_size`]) you would set the
1417    /// `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable.
1418    ///
1419    /// The name of the environment variable is the option's key, transformed to
1420    /// uppercase and with periods replaced with underscores.
1421    ///
1422    /// Values are parsed according to the [same rules used in casts from
1423    /// Utf8](https://docs.rs/arrow/latest/arrow/compute/kernels/cast/fn.cast.html).
1424    ///
1425    /// If the value in the environment variable cannot be cast to the type of
1426    /// the configuration option, the default value will be used instead and a
1427    /// warning emitted. Environment variables are read when this method is
1428    /// called, and are not re-read later.
1429    pub fn from_env() -> Result<Self> {
1430        struct Visitor(Vec<String>);
1431
1432        impl Visit for Visitor {
1433            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1434                self.0.push(key.to_string())
1435            }
1436
1437            fn none(&mut self, key: &str, _: &'static str) {
1438                self.0.push(key.to_string())
1439            }
1440        }
1441
1442        // Extract the names of all fields and then look up the corresponding
1443        // environment variables. This isn't hugely efficient but avoids
1444        // ambiguity between `a.b` and `a_b` which would both correspond
1445        // to an environment variable of `A_B`
1446
1447        let mut keys = Visitor(vec![]);
1448        let mut ret = Self::default();
1449        ret.visit(&mut keys, "datafusion", "");
1450
1451        for key in keys.0 {
1452            let env = key.to_uppercase().replace('.', "_");
1453            if let Some(var) = std::env::var_os(env) {
1454                let value = var.to_string_lossy();
1455                log::info!("Set {key} to {value} from the environment variable");
1456                ret.set(&key, value.as_ref())?;
1457            }
1458        }
1459
1460        Ok(ret)
1461    }
1462
1463    /// Create new ConfigOptions struct, taking values from a string hash map.
1464    ///
1465    /// Only the built-in configurations will be extracted from the hash map
1466    /// and other key value pairs will be ignored.
1467    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1468        struct Visitor(Vec<String>);
1469
1470        impl Visit for Visitor {
1471            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1472                self.0.push(key.to_string())
1473            }
1474
1475            fn none(&mut self, key: &str, _: &'static str) {
1476                self.0.push(key.to_string())
1477            }
1478        }
1479
1480        let mut keys = Visitor(vec![]);
1481        let mut ret = Self::default();
1482        ret.visit(&mut keys, "datafusion", "");
1483
1484        for key in keys.0 {
1485            if let Some(var) = settings.get(&key) {
1486                ret.set(&key, var)?;
1487            }
1488        }
1489
1490        Ok(ret)
1491    }
1492
1493    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
1494    pub fn entries(&self) -> Vec<ConfigEntry> {
1495        struct Visitor(Vec<ConfigEntry>);
1496
1497        impl Visit for Visitor {
1498            fn some<V: Display>(
1499                &mut self,
1500                key: &str,
1501                value: V,
1502                description: &'static str,
1503            ) {
1504                self.0.push(ConfigEntry {
1505                    key: key.to_string(),
1506                    value: Some(value.to_string()),
1507                    description,
1508                })
1509            }
1510
1511            fn none(&mut self, key: &str, description: &'static str) {
1512                self.0.push(ConfigEntry {
1513                    key: key.to_string(),
1514                    value: None,
1515                    description,
1516                })
1517            }
1518        }
1519
1520        let mut v = Visitor(vec![]);
1521        self.visit(&mut v, "datafusion", "");
1522
1523        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1524        v.0
1525    }
1526
1527    /// Generate documentation that can be included in the user guide
1528    pub fn generate_config_markdown() -> String {
1529        use std::fmt::Write as _;
1530
1531        let mut s = Self::default();
1532
1533        // Normalize for display
1534        s.execution.target_partitions = 0;
1535        s.execution.planning_concurrency = 0;
1536
1537        let mut docs = "| key | default | description |\n".to_string();
1538        docs += "|-----|---------|-------------|\n";
1539        let mut entries = s.entries();
1540        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1541
1542        for entry in s.entries() {
1543            let _ = writeln!(
1544                &mut docs,
1545                "| {} | {} | {} |",
1546                entry.key,
1547                entry.value.as_deref().unwrap_or("NULL"),
1548                entry.description
1549            );
1550        }
1551        docs
1552    }
1553}
1554
/// [`ConfigExtension`] provides a mechanism to store third-party configuration
/// within DataFusion [`ConfigOptions`]
///
/// This mechanism can be used to pass configuration to user defined functions
/// or optimizer passes
///
/// # Example
/// ```
/// use datafusion_common::{
///     config::ConfigExtension, config::ConfigOptions, extensions_options,
/// };
/// // Define a new configuration struct using the `extensions_options` macro
/// extensions_options! {
///    /// My own config options.
///    pub struct MyConfig {
///        /// Should "foo" be replaced by "bar"?
///        pub foo_to_bar: bool, default = true
///
///        /// How many "baz" should be created?
///        pub baz_count: usize, default = 1337
///    }
/// }
///
/// impl ConfigExtension for MyConfig {
///     const PREFIX: &'static str = "my_config";
/// }
///
/// // set up config struct and register extension
/// let mut config = ConfigOptions::default();
/// config.extensions.insert(MyConfig::default());
///
/// // overwrite config default
/// config.set("my_config.baz_count", "42").unwrap();
///
/// // check config state
/// let my_config = config.extensions.get::<MyConfig>().unwrap();
/// assert!(my_config.foo_to_bar,);
/// assert_eq!(my_config.baz_count, 42,);
/// ```
///
/// # Note:
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
    /// Configuration namespace prefix to use
    ///
    /// All values under this will be prefixed with `$PREFIX + "."`
    ///
    /// Must not be `"datafusion"` — [`Extensions::insert`] asserts this,
    /// as that prefix is reserved for built-in options.
    const PREFIX: &'static str;
}
1604
/// An object-safe API for storing arbitrary configuration.
///
/// See [`ConfigExtension`] for user defined configuration
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any(&self) -> &dyn Any;

    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Return a deep clone of this [`ExtensionOptions`]
    ///
    /// It is important this does not share mutable state to avoid consistency issues
    /// with configuration changing whilst queries are executing
    fn cloned(&self) -> Box<dyn ExtensionOptions>;

    /// Set the given `key`, `value` pair
    ///
    /// `key` is typically relative to the extension's prefix; see
    /// [`ConfigOptions::set`] for how keys are routed here.
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
    fn entries(&self) -> Vec<ConfigEntry>;
}
1631
/// A type-safe container for [`ConfigExtension`]
///
/// Extensions are keyed by their [`ConfigExtension::PREFIX`]; the `BTreeMap`
/// keeps iteration order deterministic.
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1635
1636impl Extensions {
1637    /// Create a new, empty [`Extensions`]
1638    pub fn new() -> Self {
1639        Self(BTreeMap::new())
1640    }
1641
1642    /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
1643    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1644        assert_ne!(T::PREFIX, "datafusion");
1645        let e = ExtensionBox(Box::new(extension));
1646        self.0.insert(T::PREFIX, e);
1647    }
1648
1649    /// Retrieves the extension of the given type if any
1650    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1651        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1652    }
1653
1654    /// Retrieves the extension of the given type if any
1655    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1656        let e = self.0.get_mut(T::PREFIX)?;
1657        e.0.as_any_mut().downcast_mut()
1658    }
1659
1660    /// Iterates all the config extension entries yielding their prefix and their
1661    /// [ExtensionOptions] implementation.
1662    pub fn iter(
1663        &self,
1664    ) -> impl Iterator<Item = (&'static str, &Box<dyn ExtensionOptions>)> {
1665        self.0.iter().map(|(k, v)| (*k, &v.0))
1666    }
1667}
1668
// Type-erased, clonable wrapper around a boxed `ExtensionOptions`.
#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);

impl Clone for ExtensionBox {
    fn clone(&self) -> Self {
        // `Box<dyn ExtensionOptions>` cannot derive `Clone`; delegate to the
        // object-safe `cloned` method, which returns a deep copy.
        Self(self.0.cloned())
    }
}
1677
/// A trait implemented by `config_namespace` and for field types that provides
/// the ability to walk and mutate the configuration tree
pub trait ConfigField {
    /// Walk this field (and any nested fields), reporting each entry to `v`
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);

    /// Set the field addressed by `key` (possibly nested, `.`-separated) to `value`
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Reset the field addressed by `key` back to its default value
    ///
    /// The default implementation returns an error; implementors opt in to
    /// reset support by overriding this.
    fn reset(&mut self, key: &str) -> Result<()> {
        _config_err!("Reset is not supported for this config field, key: {}", key)
    }
}
1689
1690impl<F: ConfigField + Default> ConfigField for Option<F> {
1691    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1692        match self {
1693            Some(s) => s.visit(v, key, description),
1694            None => v.none(key, description),
1695        }
1696    }
1697
1698    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1699        self.get_or_insert_with(Default::default).set(key, value)
1700    }
1701
1702    fn reset(&mut self, key: &str) -> Result<()> {
1703        if key.is_empty() {
1704            *self = Default::default();
1705            Ok(())
1706        } else {
1707            self.get_or_insert_with(Default::default).reset(key)
1708        }
1709    }
1710}
1711
1712/// Default transformation to parse a [`ConfigField`] for a string.
1713///
1714/// This uses [`FromStr`] to parse the data.
1715pub fn default_config_transform<T>(input: &str) -> Result<T>
1716where
1717    T: FromStr,
1718    <T as FromStr>::Err: Sync + Send + Error + 'static,
1719{
1720    input.parse().map_err(|e| {
1721        DataFusionError::Context(
1722            format!(
1723                "Error parsing '{}' as {}",
1724                input,
1725                std::any::type_name::<T>()
1726            ),
1727            Box::new(DataFusionError::External(Box::new(e))),
1728        )
1729    })
1730}
1731
/// Macro that generates [`ConfigField`] for a given type.
///
/// # Usage
/// This always requires [`Display`] to be implemented for the given type.
///
/// There are two ways to invoke this macro. The first one uses
/// [`default_config_transform`]/[`FromStr`] to parse the data:
///
/// ```ignore
/// config_field(MyType);
/// ```
///
/// Note that the parsing error MUST implement [`std::error::Error`]!
///
/// Or you can specify how you want to parse an [`str`] into the type:
///
/// ```ignore
/// fn parse_it(s: &str) -> Result<MyType> {
///     ...
/// }
///
/// config_field(
///     MyType,
///     value => parse_it(value)
/// );
/// ```
#[macro_export]
macro_rules! config_field {
    // No-transform form: delegate to the FromStr-based default transform.
    ($t:ty) => {
        config_field!($t, value => $crate::config::default_config_transform(value)?);
    };

    // Custom-transform form: `$arg` names the raw &str inside `$transform`.
    ($t:ty, $arg:ident => $transform:expr) => {
        impl $crate::config::ConfigField for $t {
            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
                *self = $transform;
                Ok(())
            }

            fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
                // An empty key means "reset this scalar itself"; scalars have
                // no nested fields, so any non-empty key is an error.
                if key.is_empty() {
                    *self = <$t as Default>::default();
                    Ok(())
                } else {
                    $crate::error::_config_err!(
                        "Config field is a scalar {} and does not have nested field \"{}\"",
                        stringify!($t),
                        key
                    )
                }
            }
        }
    };
}
1790
// Scalar config field implementations for the primitive types used by the
// various option structs.
config_field!(String);
// bools are lowercased first so "TRUE"/"True" parse successfully via FromStr
config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);
config_field!(u32);
1797
1798impl ConfigField for u8 {
1799    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1800        v.some(key, self, description)
1801    }
1802
1803    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1804        if value.is_empty() {
1805            return Err(DataFusionError::Configuration(format!(
1806                "Input string for {key} key is empty"
1807            )));
1808        }
1809        // Check if the string is a valid number
1810        if let Ok(num) = value.parse::<u8>() {
1811            // TODO: Let's decide how we treat the numerical strings.
1812            *self = num;
1813        } else {
1814            let bytes = value.as_bytes();
1815            // Check if the first character is ASCII (single byte)
1816            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1817                return Err(DataFusionError::Configuration(format!(
1818                    "Error parsing {value} as u8. Non-ASCII string provided"
1819                )));
1820            }
1821            *self = bytes[0];
1822        }
1823        Ok(())
1824    }
1825}
1826
1827impl ConfigField for CompressionTypeVariant {
1828    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1829        v.some(key, self, description)
1830    }
1831
1832    fn set(&mut self, _: &str, value: &str) -> Result<()> {
1833        *self = CompressionTypeVariant::from_str(value)?;
1834        Ok(())
1835    }
1836}
1837
/// An implementation trait used to recursively walk configuration
pub trait Visit {
    /// Called for each configuration key that currently has a value
    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);

    /// Called for each configuration key whose value is unset (`None`)
    fn none(&mut self, key: &str, description: &'static str);
}
1844
/// Convenience macro to create [`ExtensionOptions`].
///
/// The created structure implements the following traits:
///
/// - [`Clone`]
/// - [`Debug`]
/// - [`Default`]
/// - [`ExtensionOptions`]
///
/// # Usage
/// The syntax is:
///
/// ```text
/// extensions_options! {
///      /// Struct docs (optional).
///     [<vis>] struct <StructName> {
///         /// Field docs (optional)
///         [<vis>] <field_name>: <field_type>, default = <default_value>
///
///         ... more fields
///     }
/// }
/// ```
///
/// The placeholders are:
/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
/// - `<StructName>`: Struct name like `MyStruct`.
/// - `<field_name>`: Field name like `my_field`.
/// - `<field_type>`: Field type like `u8`.
/// - `<default_value>`: Default value matching the field type like `42`.
///
/// # Example
/// See also a full example on the [`ConfigExtension`] documentation
///
/// ```
/// use datafusion_common::extensions_options;
///
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
/// ```
///
///
/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
#[macro_export]
macro_rules! extensions_options {
    (
     $(#[doc = $struct_d:tt])*
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
        )*$(,)*
    }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $field_vis $field_name : $field_type,
            )*
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl $crate::config::ExtensionOptions for $struct_name {
            fn as_any(&self) -> &dyn ::std::any::Any {
                self
            }

            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
                self
            }

            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
                Box::new(self.clone())
            }

            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                $crate::config::ConfigField::set(self, key, value)
            }

            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
                struct Visitor(Vec<$crate::config::ConfigEntry>);

                impl $crate::config::Visit for Visitor {
                    fn some<V: std::fmt::Display>(
                        &mut self,
                        key: &str,
                        value: V,
                        description: &'static str,
                    ) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: Some(value.to_string()),
                            description,
                        })
                    }

                    fn none(&mut self, key: &str, description: &'static str) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: None,
                            description,
                        })
                    }
                }

                let mut v = Visitor(vec![]);
                // The prefix is not used for extensions.
                // The description is generated in ConfigField::visit.
                // We can just pass empty strings here.
                $crate::config::ConfigField::visit(self, &mut v, "", "");
                v.0
            }
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // NOTE(review): the inner block is a leftover from
                            // a planned `#[allow(deprecated)]` attribute on
                            // this expression; it has no effect.
                            {
                                            self.$field_name.set(rem, value.as_ref())
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
                $(
                    let key = stringify!($field_name).to_string();
                    // The field's `///` docs become the entry description.
                    let desc = concat!($($d),*).trim();
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
    }
}
2007
/// These file types have special built in behavior for configuration.
/// Use TableOptions::Extensions for configuring other file types.
#[derive(Debug, Clone)]
pub enum ConfigFileType {
    /// Character-separated values (delimiter is configurable via [`CsvOptions`])
    CSV,
    /// Apache Parquet columnar storage (requires the `parquet` feature)
    #[cfg(feature = "parquet")]
    PARQUET,
    /// JSON file format
    JSON,
}
2017
/// Represents the configuration options available for handling different table formats within a data processing application.
/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
    /// Configuration options for CSV file handling. This includes settings like the delimiter,
    /// quote character, and whether the first row is considered as headers.
    pub csv: CsvOptions,

    /// Configuration options for Parquet file handling. This includes settings for compression,
    /// encoding, and other Parquet-specific file characteristics.
    pub parquet: TableParquetOptions,

    /// Configuration options for JSON file handling.
    pub json: JsonOptions,

    /// The current file format that the table operations should assume. This option allows
    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
    ///
    /// Governs how `format.*` keys are routed in [`ConfigField::set`] and
    /// which options [`ConfigField::visit`] reports; `set` errors when this
    /// is `None`.
    pub current_format: Option<ConfigFileType>,

    /// Optional extensions that can be used to extend or customize the behavior of the table
    /// options. Extensions can be registered using `Extensions::insert` and might include
    /// custom file handling logic, additional configuration parameters, or other enhancements.
    pub extensions: Extensions,
}
2043
2044impl ConfigField for TableOptions {
2045    /// Visits configuration settings for the current file format, or all formats if none is selected.
2046    ///
2047    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
2048    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
2049    /// it visits all available format settings.
2050    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
2051        if let Some(file_type) = &self.current_format {
2052            match file_type {
2053                #[cfg(feature = "parquet")]
2054                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
2055                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
2056                ConfigFileType::JSON => self.json.visit(v, "format", ""),
2057            }
2058        } else {
2059            self.csv.visit(v, "csv", "");
2060            self.parquet.visit(v, "parquet", "");
2061            self.json.visit(v, "json", "");
2062        }
2063    }
2064
2065    /// Sets a configuration value for a specific key within `TableOptions`.
2066    ///
2067    /// This method delegates setting configuration values to the specific file format configurations,
2068    /// based on the current format selected. If no format is selected, it returns an error.
2069    ///
2070    /// # Parameters
2071    ///
2072    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
2073    ///   for CSV format.
2074    /// * `value`: The value to set for the specified configuration key.
2075    ///
2076    /// # Returns
2077    ///
2078    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
2079    /// or if setting the configuration value fails for the specific format.
2080    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2081        // Extensions are handled in the public `ConfigOptions::set`
2082        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2083        match key {
2084            "format" => {
2085                let Some(format) = &self.current_format else {
2086                    return _config_err!("Specify a format for TableOptions");
2087                };
2088                match format {
2089                    #[cfg(feature = "parquet")]
2090                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
2091                    ConfigFileType::CSV => self.csv.set(rem, value),
2092                    ConfigFileType::JSON => self.json.set(rem, value),
2093                }
2094            }
2095            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
2096        }
2097    }
2098}
2099
2100impl TableOptions {
2101    /// Constructs a new instance of `TableOptions` with default settings.
2102    ///
2103    /// # Returns
2104    ///
2105    /// A new `TableOptions` instance with default configuration values.
2106    pub fn new() -> Self {
2107        Self::default()
2108    }
2109
2110    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
2111    ///
2112    /// # Parameters
2113    ///
2114    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
2115    ///
2116    /// # Returns
2117    ///
2118    /// A new `TableOptions` instance with settings applied from the session config.
2119    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
2120        let initial = TableOptions::default();
2121        initial.combine_with_session_config(config)
2122    }
2123
2124    /// Updates the current `TableOptions` with settings from a given session config.
2125    ///
2126    /// # Parameters
2127    ///
2128    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
2129    ///
2130    /// # Returns
2131    ///
2132    /// A new `TableOptions` instance with updated settings from the session config.
2133    #[must_use = "this method returns a new instance"]
2134    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
2135        let mut clone = self.clone();
2136        clone.parquet.global = config.execution.parquet.clone();
2137        clone
2138    }
2139
2140    /// Sets the file format for the table.
2141    ///
2142    /// # Parameters
2143    ///
2144    /// * `format`: The file format to use (e.g., CSV, Parquet).
2145    pub fn set_config_format(&mut self, format: ConfigFileType) {
2146        self.current_format = Some(format);
2147    }
2148
2149    /// Sets the extensions for this `TableOptions` instance.
2150    ///
2151    /// # Parameters
2152    ///
2153    /// * `extensions`: The `Extensions` instance to set.
2154    ///
2155    /// # Returns
2156    ///
2157    /// A new `TableOptions` instance with the specified extensions applied.
2158    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
2159        self.extensions = extensions;
2160        self
2161    }
2162
2163    /// Sets a specific configuration option.
2164    ///
2165    /// # Parameters
2166    ///
2167    /// * `key`: The configuration key (e.g., "format.delimiter").
2168    /// * `value`: The value to set for the specified key.
2169    ///
2170    /// # Returns
2171    ///
2172    /// A result indicating success or failure in setting the configuration option.
2173    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
2174        let Some((mut prefix, _)) = key.split_once('.') else {
2175            return _config_err!("could not find config namespace for key \"{key}\"");
2176        };
2177
2178        if prefix == "format" {
2179            return ConfigField::set(self, key, value);
2180        }
2181
2182        if prefix == "execution" {
2183            return Ok(());
2184        }
2185
2186        if !self.extensions.0.contains_key(prefix)
2187            && self
2188                .extensions
2189                .0
2190                .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
2191        {
2192            prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
2193        }
2194
2195        let Some(e) = self.extensions.0.get_mut(prefix) else {
2196            return _config_err!("Could not find config namespace \"{prefix}\"");
2197        };
2198        e.0.set(key, value)
2199    }
2200
2201    /// Initializes a new `TableOptions` from a hash map of string settings.
2202    ///
2203    /// # Parameters
2204    ///
2205    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
2206    ///
2207    /// # Returns
2208    ///
2209    /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
2210    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
2211        let mut ret = Self::default();
2212        for (k, v) in settings {
2213            ret.set(k, v)?;
2214        }
2215
2216        Ok(ret)
2217    }
2218
2219    /// Modifies the current `TableOptions` instance with settings from a hash map.
2220    ///
2221    /// # Parameters
2222    ///
2223    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
2224    ///
2225    /// # Returns
2226    ///
2227    /// A result indicating success or failure in applying the settings.
2228    pub fn alter_with_string_hash_map(
2229        &mut self,
2230        settings: &HashMap<String, String>,
2231    ) -> Result<()> {
2232        for (k, v) in settings {
2233            self.set(k, v)?;
2234        }
2235        Ok(())
2236    }
2237
2238    /// Retrieves all configuration entries from this `TableOptions`.
2239    ///
2240    /// # Returns
2241    ///
2242    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
2243    pub fn entries(&self) -> Vec<ConfigEntry> {
2244        struct Visitor(Vec<ConfigEntry>);
2245
2246        impl Visit for Visitor {
2247            fn some<V: Display>(
2248                &mut self,
2249                key: &str,
2250                value: V,
2251                description: &'static str,
2252            ) {
2253                self.0.push(ConfigEntry {
2254                    key: key.to_string(),
2255                    value: Some(value.to_string()),
2256                    description,
2257                })
2258            }
2259
2260            fn none(&mut self, key: &str, description: &'static str) {
2261                self.0.push(ConfigEntry {
2262                    key: key.to_string(),
2263                    value: None,
2264                    description,
2265                })
2266            }
2267        }
2268
2269        let mut v = Visitor(vec![]);
2270        self.visit(&mut v, "format", "");
2271
2272        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
2273        v.0
2274    }
2275}
2276
/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
///
/// Closely tied to `ParquetWriterOptions` (see `crate::file_options::parquet_writer::ParquetWriterOptions` when the "parquet" feature is enabled).
/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
/// (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
    pub global: ParquetOptions,
    /// Column-specific option overrides. Keys take the form `parquet.XX::column`.
    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
    /// Additional file-level metadata to include. Inserted into the key_value_metadata
    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
    ///
    /// Multiple entries are permitted; a duplicated key keeps its last value.
    /// ```sql
    /// OPTIONS (
    ///    'format.metadata::key1' '',
    ///    'format.metadata::key2' 'value',
    ///    'format.metadata::key3' 'value has spaces',
    ///    'format.metadata::key4' 'value has special chars :: :',
    ///    'format.metadata::key_dupe' 'original will be overwritten',
    ///    'format.metadata::key_dupe' 'final'
    /// )
    /// ```
    pub key_value_metadata: HashMap<String, Option<String>>,
    /// Options for configuring Parquet modular encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    /// See ConfigFileEncryptionProperties and ConfigFileDecryptionProperties in datafusion/common/src/config.rs
    /// These can be set via 'format.crypto', for example:
    /// ```sql
    /// OPTIONS (
    ///    'format.crypto.file_encryption.encrypt_footer' 'true',
    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435',  -- b"0123456789012345"
    ///    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
    ///    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
    ///     -- Same for decryption
    ///    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
    ///    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
    ///    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
    /// )
    /// ```
    /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
    /// Note that keys must be provided in hex format since these are binary strings.
    pub crypto: ParquetEncryptionOptions,
}
2325
2326impl TableParquetOptions {
2327    /// Return new default TableParquetOptions
2328    pub fn new() -> Self {
2329        Self::default()
2330    }
2331
2332    /// Set whether the encoding of the arrow metadata should occur
2333    /// during the writing of parquet.
2334    ///
2335    /// Default is to encode the arrow schema in the file kv_metadata.
2336    pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
2337        Self {
2338            global: ParquetOptions {
2339                skip_arrow_metadata: skip,
2340                ..self.global
2341            },
2342            ..self
2343        }
2344    }
2345
2346    /// Retrieves all configuration entries from this `TableParquetOptions`.
2347    ///
2348    /// # Returns
2349    ///
2350    /// A vector of `ConfigEntry` instances, representing all the configuration options within this
2351    pub fn entries(self: &TableParquetOptions) -> Vec<ConfigEntry> {
2352        struct Visitor(Vec<ConfigEntry>);
2353
2354        impl Visit for Visitor {
2355            fn some<V: Display>(
2356                &mut self,
2357                key: &str,
2358                value: V,
2359                description: &'static str,
2360            ) {
2361                self.0.push(ConfigEntry {
2362                    key: key[1..].to_string(),
2363                    value: Some(value.to_string()),
2364                    description,
2365                })
2366            }
2367
2368            fn none(&mut self, key: &str, description: &'static str) {
2369                self.0.push(ConfigEntry {
2370                    key: key[1..].to_string(),
2371                    value: None,
2372                    description,
2373                })
2374            }
2375        }
2376
2377        let mut v = Visitor(vec![]);
2378        self.visit(&mut v, "", "");
2379
2380        v.0
2381    }
2382}
2383
2384impl ConfigField for TableParquetOptions {
2385    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
2386        self.global.visit(v, key_prefix, description);
2387        self.column_specific_options
2388            .visit(v, key_prefix, description);
2389        self.crypto
2390            .visit(v, &format!("{key_prefix}.crypto"), description);
2391    }
2392
2393    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2394        // Determine if the key is a global, metadata, or column-specific setting
2395        if key.starts_with("metadata::") {
2396            let k = match key.split("::").collect::<Vec<_>>()[..] {
2397                [_meta] | [_meta, ""] => {
2398                    return _config_err!(
2399                        "Invalid metadata key provided, missing key in metadata::<key>"
2400                    );
2401                }
2402                [_meta, k] => k.into(),
2403                _ => {
2404                    return _config_err!(
2405                        "Invalid metadata key provided, found too many '::' in \"{key}\""
2406                    );
2407                }
2408            };
2409            self.key_value_metadata.insert(k, Some(value.into()));
2410            Ok(())
2411        } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
2412            self.crypto.set(crypto_feature, value)
2413        } else if key.contains("::") {
2414            self.column_specific_options.set(key, value)
2415        } else {
2416            self.global.set(key, value)
2417        }
2418    }
2419}
2420
/// Generates a configuration struct plus the plumbing needed to use it both
/// directly and keyed by name in a `HashMap` (e.g. per-column options).
///
/// For the struct itself this derives [`Default`] and implements
/// `ConfigField`; it additionally implements `ConfigField` for
/// `HashMap<String, Struct>`, where keys take the form `<field>::<map_key>`.
///
/// The per-field `transform` (optional) normalizes the raw string value
/// before it is parsed (e.g. `str::to_lowercase`).
///
/// The field doc comments are captured and exposed at runtime as the
/// option descriptions (via `concat!` in the `visit` impls).
macro_rules! config_namespace_with_hashmap {
    (
     $(#[doc = $struct_d:tt])*
     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
        )*$(,)*
    }
    ) => {

        $(#[doc = $struct_d])*
        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
            $field_vis $field_name : $field_type,
            )*
        }

        impl ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                       stringify!($field_name) => {
                           // Apply the optional value transform (if declared)
                           // before delegating to the field's own `set`.
                           $(let value = $transform(value);)?
                           self.$field_name.set(rem, value.as_ref())
                       },
                    )*
                    _ => _config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                let desc = concat!($($d),*).trim();
                // Visit each field under "<prefix>.<field>" with its doc
                // comment as the description.
                self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl ConfigField for HashMap<String,$struct_name> {
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                // Keys look like "<field>::<map_key>"; split on the first "::".
                let parts: Vec<&str> = key.splitn(2, "::").collect();
                match parts.as_slice() {
                    [inner_key, hashmap_key] => {
                        // Get or create the struct for the specified key
                        let inner_value = self
                            .entry((*hashmap_key).to_owned())
                            .or_insert_with($struct_name::default);

                        inner_value.set(inner_key, value)
                    }
                    _ => _config_err!("Unrecognized key '{key}'."),
                }
            }

            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                for (column_name, col_options) in self {
                    $(
                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
                    let desc = concat!($($d),*).trim();
                    col_options.$field_name.visit(v, key.as_str(), desc);
                    )*
                }
            }
        }
    }
}
2508
// NOTE: the `///` doc comments on the fields below are captured by the macro
// and surfaced at runtime as the option descriptions, so they are part of
// observable behavior and are reproduced unchanged.
config_namespace_with_hashmap! {
    /// Options controlling parquet format for individual columns.
    ///
    /// See [`ParquetOptions`] for more details
    pub struct ParquetColumnOptions {
        /// Sets if bloom filter is enabled for the column path.
        pub bloom_filter_enabled: Option<bool>, default = None

        /// Sets encoding for the column path.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        pub encoding: Option<String>, default = None

        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
        /// default parquet options
        pub dictionary_enabled: Option<bool>, default = None

        /// Sets default parquet compression codec for the column path.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        pub compression: Option<String>, transform = str::to_lowercase, default = None

        /// Sets if statistics are enabled for the column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet options
        pub statistics_enabled: Option<String>, default = None

        /// Sets bloom filter false positive probability for the column path. If NULL, uses
        /// default parquet options
        pub bloom_filter_fpp: Option<f64>, default = None

        /// Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet options
        pub bloom_filter_ndv: Option<u64>, default = None
    }
}
2551
/// Hex-encoded Parquet modular-encryption settings used when writing files.
///
/// Converted to/from the parquet crate's `FileEncryptionProperties` via the
/// `From` impls below (requires the `parquet_encryption` feature).
#[derive(Clone, Debug, PartialEq)]
pub struct ConfigFileEncryptionProperties {
    /// Should the parquet footer be encrypted.
    /// Default is true (see the `Default` impl below).
    pub encrypt_footer: bool,
    /// Key to use for the parquet footer, encoded in hex format
    pub footer_key_as_hex: String,
    /// Metadata information for the footer key, encoded in hex format
    pub footer_key_metadata_as_hex: String,
    /// HashMap of column names --> (key in hex format, metadata)
    pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
    /// AAD prefix (hex encoded); uniquely identifies the file and prevents file swapping
    pub aad_prefix_as_hex: String,
    /// If true, store the AAD prefix in the file.
    /// Default is false.
    pub store_aad_prefix: bool,
}
2569
2570// Setup to match EncryptionPropertiesBuilder::new()
2571impl Default for ConfigFileEncryptionProperties {
2572    fn default() -> Self {
2573        ConfigFileEncryptionProperties {
2574            encrypt_footer: true,
2575            footer_key_as_hex: String::new(),
2576            footer_key_metadata_as_hex: String::new(),
2577            column_encryption_properties: Default::default(),
2578            aad_prefix_as_hex: String::new(),
2579            store_aad_prefix: false,
2580        }
2581    }
2582}
2583
// Per-column encryption settings, addressed as `column_key_as_hex::<column>`.
// NOTE: the `///` doc comments below double as the runtime option
// descriptions, so they are reproduced unchanged.
config_namespace_with_hashmap! {
    pub struct ColumnEncryptionProperties {
        /// Per column encryption key
        pub column_key_as_hex: String, default = "".to_string()
        /// Per column encryption key metadata
        pub column_metadata_as_hex: Option<String>, default = None
    }
}
2592
2593impl ConfigField for ConfigFileEncryptionProperties {
2594    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2595        let key = format!("{key_prefix}.encrypt_footer");
2596        let desc = "Encrypt the footer";
2597        self.encrypt_footer.visit(v, key.as_str(), desc);
2598
2599        let key = format!("{key_prefix}.footer_key_as_hex");
2600        let desc = "Key to use for the parquet footer";
2601        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2602
2603        let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2604        let desc = "Metadata to use for the parquet footer";
2605        self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2606
2607        self.column_encryption_properties.visit(v, key_prefix, desc);
2608
2609        let key = format!("{key_prefix}.aad_prefix_as_hex");
2610        let desc = "AAD prefix to use";
2611        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2612
2613        let key = format!("{key_prefix}.store_aad_prefix");
2614        let desc = "If true, store the AAD prefix";
2615        self.store_aad_prefix.visit(v, key.as_str(), desc);
2616
2617        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2618    }
2619
2620    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2621        // Any hex encoded values must be pre-encoded using
2622        // hex::encode() before calling set.
2623
2624        if key.contains("::") {
2625            // Handle any column specific properties
2626            return self.column_encryption_properties.set(key, value);
2627        };
2628
2629        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2630        match key {
2631            "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2632            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2633            "footer_key_metadata_as_hex" => {
2634                self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2635            }
2636            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2637            "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2638            _ => _config_err!(
2639                "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2640                key
2641            ),
2642        }
2643    }
2644}
2645
#[cfg(feature = "parquet_encryption")]
impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
    /// Decodes the hex-encoded configuration into concrete
    /// `FileEncryptionProperties` for the parquet writer.
    ///
    /// # Panics
    ///
    /// Panics if the footer key, footer key metadata, any column key or
    /// metadata, or the AAD prefix is not valid hex, or if the builder
    /// rejects the resulting combination.
    fn from(val: ConfigFileEncryptionProperties) -> Self {
        // Use `expect` with a message here for consistency with the
        // ConfigFileDecryptionProperties conversion (was a bare `unwrap()`).
        let mut fep = FileEncryptionProperties::builder(
            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
        )
        .with_plaintext_footer(!val.encrypt_footer)
        .with_aad_prefix_storage(val.store_aad_prefix);

        if !val.footer_key_metadata_as_hex.is_empty() {
            fep = fep.with_footer_key_metadata(
                hex::decode(&val.footer_key_metadata_as_hex)
                    .expect("Invalid footer key metadata"),
            );
        }

        for (column_name, encryption_props) in val.column_encryption_properties.iter() {
            let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
                .expect("Invalid column encryption key");
            let key_metadata = encryption_props
                .column_metadata_as_hex
                .as_ref()
                .map(|x| hex::decode(x).expect("Invalid column metadata"));
            match key_metadata {
                Some(key_metadata) => {
                    fep = fep.with_column_key_and_metadata(
                        column_name,
                        encryption_key,
                        key_metadata,
                    );
                }
                None => {
                    fep = fep.with_column_key(column_name, encryption_key);
                }
            }
        }

        if !val.aad_prefix_as_hex.is_empty() {
            let aad_prefix: Vec<u8> =
                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
            fep = fep.with_aad_prefix(aad_prefix);
        }
        Arc::unwrap_or_clone(fep.build().expect("Invalid encryption properties"))
    }
}
2691
#[cfg(feature = "parquet_encryption")]
impl From<&Arc<FileEncryptionProperties>> for ConfigFileEncryptionProperties {
    /// Hex-encodes existing `FileEncryptionProperties` into their
    /// configuration representation.
    fn from(f: &Arc<FileEncryptionProperties>) -> Self {
        let (names, keys, metas) = f.column_keys();

        // Re-encode each per-column key (and optional metadata) as hex.
        let column_encryption_properties: HashMap<String, ColumnEncryptionProperties> =
            names
                .iter()
                .enumerate()
                .map(|(i, name)| {
                    (
                        name.clone(),
                        ColumnEncryptionProperties {
                            column_key_as_hex: hex::encode(&keys[i]),
                            column_metadata_as_hex: metas.get(i).map(hex::encode),
                        },
                    )
                })
                .collect();

        ConfigFileEncryptionProperties {
            encrypt_footer: f.encrypt_footer(),
            footer_key_as_hex: hex::encode(f.footer_key()),
            footer_key_metadata_as_hex: f
                .footer_key_metadata()
                .map(hex::encode)
                .unwrap_or_default(),
            column_encryption_properties,
            // A missing AAD prefix encodes to the empty string.
            aad_prefix_as_hex: f.aad_prefix().map(hex::encode).unwrap_or_default(),
            store_aad_prefix: f.store_aad_prefix(),
        }
    }
}
2731
/// Hex-encoded Parquet modular-encryption settings used when reading files.
///
/// Converted to/from the parquet crate's `FileDecryptionProperties` via the
/// `From` impls below (requires the `parquet_encryption` feature).
#[derive(Clone, Debug, PartialEq)]
pub struct ConfigFileDecryptionProperties {
    /// Binary string to use for the parquet footer, encoded in hex format
    pub footer_key_as_hex: String,
    /// HashMap of column names --> key in hex format
    pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
    /// AAD prefix (hex encoded); uniquely identifies the file and prevents file swapping
    pub aad_prefix_as_hex: String,
    /// If true, then verify signature for files with plaintext footers.
    /// Default is true (see the `Default` impl below).
    pub footer_signature_verification: bool,
}
2744
// Per-column decryption settings, addressed as `column_key_as_hex::<column>`.
// NOTE: the `///` doc comment below doubles as the runtime option
// description, so it is reproduced unchanged.
config_namespace_with_hashmap! {
    pub struct ColumnDecryptionProperties {
        /// Per column encryption key
        pub column_key_as_hex: String, default = "".to_string()
    }
}
2751
2752// Setup to match DecryptionPropertiesBuilder::new()
2753impl Default for ConfigFileDecryptionProperties {
2754    fn default() -> Self {
2755        ConfigFileDecryptionProperties {
2756            footer_key_as_hex: String::new(),
2757            column_decryption_properties: Default::default(),
2758            aad_prefix_as_hex: String::new(),
2759            footer_signature_verification: true,
2760        }
2761    }
2762}
2763
2764impl ConfigField for ConfigFileDecryptionProperties {
2765    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2766        let key = format!("{key_prefix}.footer_key_as_hex");
2767        let desc = "Key to use for the parquet footer";
2768        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2769
2770        let key = format!("{key_prefix}.aad_prefix_as_hex");
2771        let desc = "AAD prefix to use";
2772        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2773
2774        let key = format!("{key_prefix}.footer_signature_verification");
2775        let desc = "If true, verify the footer signature";
2776        self.footer_signature_verification
2777            .visit(v, key.as_str(), desc);
2778
2779        self.column_decryption_properties.visit(v, key_prefix, desc);
2780    }
2781
2782    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2783        // Any hex encoded values must be pre-encoded using
2784        // hex::encode() before calling set.
2785
2786        if key.contains("::") {
2787            // Handle any column specific properties
2788            return self.column_decryption_properties.set(key, value);
2789        };
2790
2791        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2792        match key {
2793            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2794            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2795            "footer_signature_verification" => {
2796                self.footer_signature_verification.set(rem, value.as_ref())
2797            }
2798            _ => _config_err!(
2799                "Config value \"{}\" not found on ConfigFileDecryptionProperties",
2800                key
2801            ),
2802        }
2803    }
2804}
2805
#[cfg(feature = "parquet_encryption")]
impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
    /// Decodes the hex-encoded configuration into concrete
    /// `FileDecryptionProperties` for the parquet reader.
    ///
    /// # Panics
    ///
    /// Panics when any hex value fails to decode or the builder rejects the
    /// column keys.
    fn from(val: ConfigFileDecryptionProperties) -> Self {
        // Split the column map into parallel name/key vectors for the builder.
        let (column_names, column_keys): (Vec<&str>, Vec<Vec<u8>>) = val
            .column_decryption_properties
            .iter()
            .map(|(name, props)| {
                (
                    name.as_str(),
                    hex::decode(&props.column_key_as_hex)
                        .expect("Invalid column decryption key"),
                )
            })
            .unzip();

        let mut builder = FileDecryptionProperties::builder(
            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
        )
        .with_column_keys(column_names, column_keys)
        .unwrap();

        if !val.footer_signature_verification {
            builder = builder.disable_footer_signature_verification();
        }

        if !val.aad_prefix_as_hex.is_empty() {
            builder = builder.with_aad_prefix(
                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix"),
            );
        }

        Arc::unwrap_or_clone(builder.build().unwrap())
    }
}
2839
#[cfg(feature = "parquet_encryption")]
impl From<&Arc<FileDecryptionProperties>> for ConfigFileDecryptionProperties {
    /// Converts parquet decryption properties into the hex-encoded
    /// configuration representation.
    fn from(f: &Arc<FileDecryptionProperties>) -> Self {
        // Hex-encode each per-column key, keyed by column name.
        let (names, keys) = f.column_keys();
        let column_decryption_properties = names
            .iter()
            .zip(keys.iter())
            .map(|(name, key)| {
                let props = ColumnDecryptionProperties {
                    column_key_as_hex: hex::encode(key),
                };
                (name.clone(), props)
            })
            .collect();

        // A missing AAD prefix is represented as an empty hex string.
        let aad_prefix: Vec<u8> = f.aad_prefix().cloned().unwrap_or_default();

        ConfigFileDecryptionProperties {
            footer_key_as_hex: hex::encode(
                f.footer_key(None).unwrap_or_default().as_ref(),
            ),
            column_decryption_properties,
            aad_prefix_as_hex: hex::encode(aad_prefix),
            footer_signature_verification: f.check_plaintext_footer_integrity(),
        }
    }
}
2869
/// Holds implementation-specific options for an encryption factory
#[derive(Clone, Debug, Default, PartialEq)]
pub struct EncryptionFactoryOptions {
    /// Raw key-value options, stored verbatim and interpreted by the
    /// specific encryption factory implementation
    pub options: HashMap<String, String>,
}
2875
2876impl ConfigField for EncryptionFactoryOptions {
2877    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
2878        for (option_key, option_value) in &self.options {
2879            v.some(
2880                &format!("{key}.{option_key}"),
2881                option_value,
2882                "Encryption factory specific option",
2883            );
2884        }
2885    }
2886
2887    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2888        self.options.insert(key.to_owned(), value.to_owned());
2889        Ok(())
2890    }
2891}
2892
2893impl EncryptionFactoryOptions {
2894    /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
2895    pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
2896        let mut options = T::default();
2897        for (key, value) in &self.options {
2898            options.set(key, value)?;
2899        }
2900        Ok(options)
2901    }
2902}
2903
config_namespace! {
    /// Options controlling CSV format
    pub struct CsvOptions {
        /// Specifies whether there is a CSV header (i.e. the first line
        /// consists of is column names). The value `None` indicates that
        /// the configuration should be consulted.
        pub has_header: Option<bool>, default = None
        // Field separator between values in a row
        pub delimiter: u8, default = b','
        // Character used to quote values
        pub quote: u8, default = b'"'
        // Row terminator; `None` means the default (CRLF) — see `with_terminator`
        pub terminator: Option<u8>, default = None
        // Escape character inside quoted values; `None` means no escape character
        pub escape: Option<u8>, default = None
        // Whether quotes inside quoted values are escaped by doubling them
        pub double_quote: Option<bool>, default = None
        /// Specifies whether newlines in (quoted) values are supported.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        ///
        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
        pub newlines_in_values: Option<bool>, default = None
        // File-level compression applied when reading/writing
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        /// Compression level for the output file. The valid range depends on the
        /// compression algorithm:
        /// - ZSTD: 1 to 22 (default: 3)
        /// - GZIP: 0 to 9 (default: 6)
        /// - BZIP2: 0 to 9 (default: 6)
        /// - XZ: 0 to 9 (default: 6)
        /// If not specified, the default level for the compression algorithm is used.
        pub compression_level: Option<u32>, default = None
        // Maximum number of records to scan when inferring the schema
        pub schema_infer_max_rec: Option<usize>, default = None
        // Optional format strings for temporal types
        // NOTE(review): presumably arrow/chrono-style format strings — confirm against the CSV reader/writer
        pub date_format: Option<String>, default = None
        pub datetime_format: Option<String>, default = None
        pub timestamp_format: Option<String>, default = None
        pub timestamp_tz_format: Option<String>, default = None
        pub time_format: Option<String>, default = None
        // The output format for Nulls in the CSV writer.
        pub null_value: Option<String>, default = None
        // The input regex for Nulls when loading CSVs.
        pub null_regex: Option<String>, default = None
        // Optional character that marks comment lines when reading
        pub comment: Option<u8>, default = None
        /// Whether to allow truncated rows when parsing, both within a single file and across files.
        ///
        /// When set to false (default), reading a single CSV file which has rows of different lengths will
        /// error; if reading multiple CSV files with different number of columns, it will also fail.
        ///
        /// When set to true, reading a single CSV file with rows of different lengths will pad the truncated
        /// rows with null values for the missing columns; if reading multiple CSV files with different number
        /// of columns, it creates a union schema containing all columns found across the files, and will
        /// pad any files missing columns with null values for their rows.
        pub truncated_rows: Option<bool>, default = None
    }
}
2956
impl CsvOptions {
    /// Set the `CompressionTypeVariant` for the file
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
    pub fn with_compression(
        mut self,
        compression_type_variant: CompressionTypeVariant,
    ) -> Self {
        self.compression = compression_type_variant;
        self
    }

    /// Set a limit in terms of records to scan to infer the schema
    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
        self.schema_infer_max_rec = Some(max_rec);
        self
    }

    /// Set true to indicate that the first line is a header.
    /// - default is `None` (the configuration should be consulted)
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.has_header = Some(has_header);
        self
    }

    /// Returns true if the first line is a header. If format options does not
    /// specify whether there is a header, returns `None` (indicating that the
    /// configuration should be consulted).
    pub fn has_header(&self) -> Option<bool> {
        self.has_header
    }

    /// The character separating values within a row.
    /// - default to ','
    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }

    /// The quote character in a row.
    /// - default to '"'
    pub fn with_quote(mut self, quote: u8) -> Self {
        self.quote = quote;
        self
    }

    /// The character that terminates a row.
    /// - default to None (CRLF)
    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
        self.terminator = terminator;
        self
    }

    /// The escape character in a row.
    /// - default is None
    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
        self.escape = escape;
        self
    }

    /// Set true to indicate that the CSV quotes should be doubled.
    /// - default is `None`
    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
        self.double_quote = Some(double_quote);
        self
    }

    /// Specifies whether newlines in (quoted) values are supported.
    ///
    /// Parsing newlines in quoted values may be affected by execution behaviour such as
    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
    /// parsed successfully, which may reduce performance.
    ///
    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
        self.newlines_in_values = Some(newlines_in_values);
        self
    }

    /// Set a `CompressionTypeVariant` of CSV
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
    ///
    /// NOTE(review): behaves identically to [`Self::with_compression`];
    /// kept for backward compatibility.
    pub fn with_file_compression_type(
        mut self,
        compression: CompressionTypeVariant,
    ) -> Self {
        self.compression = compression;
        self
    }

    /// Whether to allow truncated rows when parsing.
    /// By default this is set to false and will error if the CSV rows have different lengths.
    /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
    /// If the record’s schema is not nullable, then it will still return an error.
    pub fn with_truncated_rows(mut self, allow: bool) -> Self {
        self.truncated_rows = Some(allow);
        self
    }

    /// Set the compression level for the output file.
    /// The valid range depends on the compression algorithm.
    /// If not specified, the default level for the algorithm is used.
    pub fn with_compression_level(mut self, level: u32) -> Self {
        self.compression_level = Some(level);
        self
    }

    /// The delimiter character.
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// The quote character.
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// The terminator character.
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

    /// The escape character.
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }
}
3083
config_namespace! {
    /// Options controlling JSON format
    pub struct JsonOptions {
        // File-level compression applied when reading/writing
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        /// Compression level for the output file. The valid range depends on the
        /// compression algorithm:
        /// - ZSTD: 1 to 22 (default: 3)
        /// - GZIP: 0 to 9 (default: 6)
        /// - BZIP2: 0 to 9 (default: 6)
        /// - XZ: 0 to 9 (default: 6)
        /// If not specified, the default level for the compression algorithm is used.
        pub compression_level: Option<u32>, default = None
        // Maximum number of records to scan when inferring the schema
        pub schema_infer_max_rec: Option<usize>, default = None
        /// The JSON format to use when reading files.
        ///
        /// When `true` (default), expects newline-delimited JSON (NDJSON):
        /// ```text
        /// {"key1": 1, "key2": "val"}
        /// {"key1": 2, "key2": "vals"}
        /// ```
        ///
        /// When `false`, expects JSON array format:
        /// ```text
        /// [
        ///   {"key1": 1, "key2": "val"},
        ///   {"key1": 2, "key2": "vals"}
        /// ]
        /// ```
        pub newline_delimited: bool, default = true
    }
}
3115
/// Marker trait for output-format types; requires [`Display`] so the
/// format can be rendered as text.
pub trait OutputFormatExt: Display {}
3117
/// Supported output file formats, each carrying its format-specific
/// writer options where applicable.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "parquet", expect(clippy::large_enum_variant))]
pub enum OutputFormat {
    /// CSV output with [`CsvOptions`]
    CSV(CsvOptions),
    /// JSON output with [`JsonOptions`]
    JSON(JsonOptions),
    /// Parquet output with [`TableParquetOptions`] (requires the `parquet` feature)
    #[cfg(feature = "parquet")]
    PARQUET(TableParquetOptions),
    /// Avro output (no configurable options)
    AVRO,
    /// Arrow IPC output (no configurable options)
    ARROW,
}
3128
3129impl Display for OutputFormat {
3130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3131        let out = match self {
3132            OutputFormat::CSV(_) => "csv",
3133            OutputFormat::JSON(_) => "json",
3134            #[cfg(feature = "parquet")]
3135            OutputFormat::PARQUET(_) => "parquet",
3136            OutputFormat::AVRO => "avro",
3137            OutputFormat::ARROW => "arrow",
3138        };
3139        write!(f, "{out}")
3140    }
3141}
3142
#[cfg(test)]
mod tests {
    #[cfg(feature = "parquet")]
    use crate::config::TableParquetOptions;
    use crate::config::{
        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
        Extensions, TableOptions,
    };
    use std::any::Any;
    use std::collections::HashMap;

    /// Minimal [`ConfigExtension`] used to exercise the extension plumbing.
    #[derive(Default, Debug, Clone)]
    pub struct TestExtensionConfig {
        /// Properties recorded by `set`, keyed by the suffix after the "test." prefix
        pub properties: HashMap<String, String>,
    }

    impl ExtensionOptions for TestExtensionConfig {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }

        fn cloned(&self) -> Box<dyn ExtensionOptions> {
            Box::new(self.clone())
        }

        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
            // Keys arrive as "test.<rest>"; strip the prefix and store the rest.
            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
            assert_eq!(key, "test");
            self.properties.insert(rem.to_owned(), value.to_owned());
            Ok(())
        }

        fn entries(&self) -> Vec<ConfigEntry> {
            self.properties
                .iter()
                .map(|(k, v)| ConfigEntry {
                    key: k.into(),
                    value: Some(v.into()),
                    description: "",
                })
                .collect()
        }
    }

    impl ConfigExtension for TestExtensionConfig {
        const PREFIX: &'static str = "test";
    }

    /// Registered extensions are retrievable from `TableOptions` by type.
    #[test]
    fn create_table_config() {
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let table_config = TableOptions::new().with_extensions(extension);
        let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
        assert!(kafka_config.is_some())
    }

    /// "format." keys route to the active file format while extension-prefixed
    /// keys route to the matching registered extension.
    #[test]
    fn alter_test_extension_config() {
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let mut table_config = TableOptions::new().with_extensions(extension);
        table_config.set_config_format(ConfigFileType::CSV);
        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter, b';');
        table_config.set("test.bootstrap.servers", "asd").unwrap();
        let kafka_config = table_config
            .extensions
            .get::<TestExtensionConfig>()
            .unwrap();
        assert_eq!(
            kafka_config.properties.get("bootstrap.servers").unwrap(),
            "asd"
        );
    }

    /// Extensions can be enumerated together with their prefixes.
    #[test]
    fn iter_test_extension_config() {
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let table_config = TableOptions::new().with_extensions(extension);
        let extensions = table_config.extensions.iter().collect::<Vec<_>>();
        assert_eq!(extensions.len(), 1);
        assert_eq!(extensions[0].0, TestExtensionConfig::PREFIX);
    }

    /// Single-character CSV options are parsed from string values into `u8`.
    #[test]
    fn csv_u8_table_options() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::CSV);
        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter as char, ';');
        table_config.set("format.escape", "\"").unwrap();
        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
        table_config.set("format.escape", "\'").unwrap();
        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
    }

    /// Setting `enable_options_value_normalization` emits a log record only
    /// for the "true" value, not for "false" (verified by counting records).
    #[test]
    fn warning_only_not_default() {
        use std::sync::atomic::AtomicUsize;
        static COUNT: AtomicUsize = AtomicUsize::new(0);
        use log::{Level, LevelFilter, Metadata, Record};
        // Logger that just counts every enabled record.
        struct SimpleLogger;
        impl log::Log for SimpleLogger {
            fn enabled(&self, metadata: &Metadata) -> bool {
                metadata.level() <= Level::Info
            }

            fn log(&self, record: &Record) {
                if self.enabled(record.metadata()) {
                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
            }
            fn flush(&self) {}
        }
        log::set_logger(&SimpleLogger).unwrap();
        log::set_max_level(LevelFilter::Info);
        let mut sql_parser_options = crate::config::SqlParserOptions::default();
        sql_parser_options
            .set("enable_options_value_normalization", "false")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
        sql_parser_options
            .set("enable_options_value_normalization", "true")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
    }

    /// Resetting a nested key on a scalar field produces a descriptive error.
    #[test]
    fn reset_nested_scalar_reports_helpful_error() {
        let mut value = true;
        let err = <bool as ConfigField>::reset(&mut value, "nested").unwrap_err();
        let message = err.to_string();
        assert!(
            message.starts_with(
                "Invalid or Unsupported Configuration: Config field is a scalar bool and does not have nested field \"nested\""
            ),
            "unexpected error message: {message}"
        );
    }

    /// Column-specific parquet options use the "key::column" syntax.
    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        assert_eq!(
            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
            Some(true)
        );
    }

    /// Round-trips parquet encryption/decryption properties through their
    /// config representations, and checks that setting the equivalent string
    /// keys reproduces the same configs.
    #[cfg(feature = "parquet_encryption")]
    #[test]
    fn parquet_table_encryption() {
        use crate::config::{
            ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
        };
        use parquet::encryption::decrypt::FileDecryptionProperties;
        use parquet::encryption::encrypt::FileEncryptionProperties;
        use std::sync::Arc;

        let footer_key = b"0123456789012345".to_vec(); // 128bit/16
        let column_names = vec!["double_field", "float_field"];
        let column_keys =
            vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];

        let file_encryption_properties =
            FileEncryptionProperties::builder(footer_key.clone())
                .with_column_keys(column_names.clone(), column_keys.clone())
                .unwrap()
                .build()
                .unwrap();

        let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
            .with_column_keys(column_names.clone(), column_keys.clone())
            .unwrap()
            .build()
            .unwrap();

        // Test round-trip
        let config_encrypt =
            ConfigFileEncryptionProperties::from(&file_encryption_properties);
        let encryption_properties_built =
            Arc::new(FileEncryptionProperties::from(config_encrypt.clone()));
        assert_eq!(file_encryption_properties, encryption_properties_built);

        let config_decrypt = ConfigFileDecryptionProperties::from(&decryption_properties);
        let decryption_properties_built =
            Arc::new(FileDecryptionProperties::from(config_decrypt.clone()));
        assert_eq!(decryption_properties, decryption_properties_built);

        ///////////////////////////////////////////////////////////////////////////////////
        // Test encryption config

        // Display original encryption config
        // println!("{:#?}", config_encrypt);

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .parquet
            .set(
                "crypto.file_encryption.encrypt_footer",
                config_encrypt.encrypt_footer.to_string().as_str(),
            )
            .unwrap();
        table_config
            .parquet
            .set(
                "crypto.file_encryption.footer_key_as_hex",
                config_encrypt.footer_key_as_hex.as_str(),
            )
            .unwrap();

        for (i, col_name) in column_names.iter().enumerate() {
            let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
            let value = hex::encode(column_keys[i].clone());
            table_config
                .parquet
                .set(key.as_str(), value.as_str())
                .unwrap();
        }

        // Print matching final encryption config
        // println!("{:#?}", table_config.parquet.crypto.file_encryption);

        assert_eq!(
            table_config.parquet.crypto.file_encryption,
            Some(config_encrypt)
        );

        ///////////////////////////////////////////////////////////////////////////////////
        // Test decryption config

        // Display original decryption config
        // println!("{:#?}", config_decrypt);

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .parquet
            .set(
                "crypto.file_decryption.footer_key_as_hex",
                config_decrypt.footer_key_as_hex.as_str(),
            )
            .unwrap();

        for (i, col_name) in column_names.iter().enumerate() {
            let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
            let value = hex::encode(column_keys[i].clone());
            table_config
                .parquet
                .set(key.as_str(), value.as_str())
                .unwrap();
        }

        // Print matching final decryption config
        // println!("{:#?}", table_config.parquet.crypto.file_decryption);

        assert_eq!(
            table_config.parquet.crypto.file_decryption,
            Some(config_decrypt.clone())
        );

        // Set config directly
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
        assert_eq!(
            table_config.parquet.crypto.file_decryption,
            Some(config_decrypt.clone())
        );
    }

    /// `configure_factory` records the factory id and copies extension
    /// entries into the factory options map.
    #[cfg(feature = "parquet_encryption")]
    #[test]
    fn parquet_encryption_factory_config() {
        let mut parquet_options = TableParquetOptions::default();

        assert_eq!(parquet_options.crypto.factory_id, None);
        assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);

        let mut input_config = TestExtensionConfig::default();
        input_config
            .properties
            .insert("key1".to_string(), "value 1".to_string());
        input_config
            .properties
            .insert("key2".to_string(), "value 2".to_string());

        parquet_options
            .crypto
            .configure_factory("example_factory", &input_config);

        assert_eq!(
            parquet_options.crypto.factory_id,
            Some("example_factory".to_string())
        );
        let factory_options = &parquet_options.crypto.factory_options.options;
        assert_eq!(factory_options.len(), 2);
        assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
        assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
    }

    /// Column-specific options appear in `entries()` under their full key.
    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_entry() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        let entries = table_config.entries();
        assert!(
            entries
                .iter()
                .any(|item| item.key == "format.bloom_filter_enabled::col1")
        )
    }

    /// Encryption column keys appear in `TableParquetOptions::entries()`.
    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_parquet_options_config_entry() {
        let mut table_parquet_options = TableParquetOptions::new();
        table_parquet_options
            .set(
                "crypto.file_encryption.column_key_as_hex::double_field",
                "31323334353637383930313233343530",
            )
            .unwrap();
        let entries = table_parquet_options.entries();
        assert!(
            entries.iter().any(|item| item.key
                == "crypto.file_encryption.column_key_as_hex::double_field")
        )
    }

    /// Parquet key/value metadata entries preserve empty values, spaces and
    /// embedded "::"; duplicate keys are overwritten by the latest value.
    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_metadata_entry() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.set("format.metadata::key1", "").unwrap();
        table_config.set("format.metadata::key2", "value2").unwrap();
        table_config
            .set("format.metadata::key3", "value with spaces ")
            .unwrap();
        table_config
            .set("format.metadata::key4", "value with special chars :: :")
            .unwrap();

        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
        assert_eq!(parsed_metadata.get("should not exist1"), None);
        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
        assert_eq!(
            parsed_metadata.get("key3"),
            Some(&Some("value with spaces ".into()))
        );
        assert_eq!(
            parsed_metadata.get("key4"),
            Some(&Some("value with special chars :: :".into()))
        );

        // duplicate keys are overwritten
        table_config.set("format.metadata::key_dupe", "A").unwrap();
        table_config.set("format.metadata::key_dupe", "B").unwrap();
        let parsed_metadata = table_config.parquet.key_value_metadata;
        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
    }
    /// Parquet writer version accepts only "1.0"/"2.0" and errors at set time.
    #[cfg(feature = "parquet")]
    #[test]
    fn test_parquet_writer_version_validation() {
        use crate::{config::ConfigOptions, parquet_config::DFParquetWriterVersion};

        let mut config = ConfigOptions::default();

        // Valid values should work
        config
            .set("datafusion.execution.parquet.writer_version", "1.0")
            .unwrap();
        assert_eq!(
            config.execution.parquet.writer_version,
            DFParquetWriterVersion::V1_0
        );

        config
            .set("datafusion.execution.parquet.writer_version", "2.0")
            .unwrap();
        assert_eq!(
            config.execution.parquet.writer_version,
            DFParquetWriterVersion::V2_0
        );

        // Invalid value should error immediately at SET time
        let err = config
            .set("datafusion.execution.parquet.writer_version", "3.0")
            .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
        );
    }
}