Skip to main content

datafusion_common/
config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Runtime configuration, via [`ConfigOptions`]
19
20use arrow_ipc::CompressionType;
21
22#[cfg(feature = "parquet_encryption")]
23use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
24use crate::error::_config_err;
25use crate::format::{ExplainAnalyzeCategories, ExplainFormat, MetricType};
26use crate::parquet_config::DFParquetWriterVersion;
27use crate::parsers::{CompressionTypeVariant, CsvQuoteStyle};
28use crate::utils::get_available_parallelism;
29use crate::{DataFusionError, Result};
30#[cfg(feature = "parquet_encryption")]
31use hex;
32use std::any::Any;
33use std::collections::{BTreeMap, HashMap};
34use std::error::Error;
35use std::fmt::{self, Display};
36use std::str::FromStr;
37#[cfg(feature = "parquet_encryption")]
38use std::sync::Arc;
39
/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// Every field is declared as `name: Type,` followed by the clauses below,
/// which must appear in exactly this order:
///
/// - `warn = <expr>,` (optional): message passed to `log::warn!` after the
///   field is `set`, if the message is non-empty and the field's value then
///   differs from its default.
/// - `transform = <expr>,` (optional): function used to normalize values
///   before parsing.
/// - `default = <expr>` (required): the default value, also restored by
///   `reset`.
///
/// The `///` documentation of each field is captured and handed to
/// `visit` as that field's description.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///    /// Amazing config
///    pub struct MyConfig {
///        /// Field 1 doc
///        field1: String, transform = str::to_lowercase, default = "".to_string()
///
///        /// Field 2 doc
///        field2: usize, default = 232
///
///        /// Field 3 doc
///        field3: Option<usize>, default = None
///    }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone, PartialEq)]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _config_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
///
///     fn reset(&mut self, key: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 if rem.is_empty() {
///                     self.field1 = "".to_string();
///                     Ok(())
///                 } else {
///                     self.field1.reset(rem)
///                 }
///             },
///             // ... and likewise for `field2` and `field3`
///             _ => _config_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            // Routes `key` to the field named by its first `.`-separated segment and
            // forwards the rest of the key (`rem`) and the value to that field's `set`.
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // The extra block scopes the optional `transform` shadowing of
                            // `value` and the optional `warn` check to this arm.
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                let ret = self.$field_name.set(rem, value.as_ref());

                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })? // Log warning if specified, and the value is not the default
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            // Visits every field under the key `"{key_prefix}.{field_name}"`, using the
            // field's concatenated and trimmed doc strings as its description.
            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }

            // Restores a field to its `default` expression when `key` names the field
            // itself; otherwise forwards the remaining key to the field's own `reset`.
            fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                                    {
                                if rem.is_empty() {
                                    let default_value: $field_type = $default;
                                    self.$field_name = default_value;
                                    Ok(())
                                } else {
                                    self.$field_name.reset(rem)
                                }
                            }
                        },
                    )*
                    _ => $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}",
                        key,
                        stringify!($struct_name)
                    ),
                }
            }
        }
        // `Default` evaluates each field's `default` expression anew.
        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}
220
// Declares `CatalogOptions` through `config_namespace!`, which also generates
// its `ConfigField` (`set` / `visit` / `reset`) and `Default` implementations.
//
// The `///` text on each field is captured by the macro and handed to `visit`
// as that option's description, so it is runtime-visible output and not only
// API documentation.
config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}
262
// Declares `SqlParserOptions` through `config_namespace!`, which also generates
// its `ConfigField` (`set` / `visit` / `reset`) and `Default` implementations.
//
// The `///` text on each field is captured by the macro and handed to `visit`
// as that option's description, so it is runtime-visible output and not only
// API documentation.
//
// NOTE(review): the `default_null_ordering` description reads "Nulls always be
// first/last"; it is left untouched here because it is a runtime string.
config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        // `warn =` makes the generated `set` log this message once the value is non-default.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: Dialect, default = Dialect::Generic
        // No lowercasing `transform` is needed here: `Dialect::from_str` lowercases
        // its input before matching, so parsing is already case-insensitive.

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
        /// If false, they are mapped to `Utf8`.
        /// Default is true.
        pub map_string_types_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL Queries
        pub recursion_limit: usize, default = 50

        /// Specifies the default null ordering for query results. There are 4 options:
        /// - `nulls_max`: Nulls appear last in ascending order.
        /// - `nulls_min`: Nulls appear first in ascending order.
        /// - `nulls_first`: Nulls always be first in any order.
        /// - `nulls_last`: Nulls always be last in any order.
        ///
        /// By default, `nulls_max` is used to follow Postgres's behavior.
        /// postgres rule: <https://www.postgresql.org/docs/current/queries-order.html>
        pub default_null_ordering: String, default = "nulls_max".to_string()

        /// When set to true, DataFusion may remove `ORDER BY` clauses from
        /// subqueries or CTEs during SQL planning when their ordering cannot
        /// affect the result, such as when no `LIMIT` or other
        /// order-sensitive operator depends on them.
        ///
        /// Disable this option to preserve explicit subquery ordering in the
        /// planned query.
        pub enable_subquery_sort_elimination: bool, default = true
    }
}
325
/// The SQL dialect used by DataFusion's parser.
///
/// Mirrors the [sqlparser::dialect::Dialect](https://docs.rs/sqlparser/latest/sqlparser/dialect/trait.Dialect.html)
/// trait so that callers get an easier API and do not need to depend on
/// `sqlparser` themselves.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Dialect {
    /// The default dialect.
    #[default]
    Generic,
    MySQL,
    PostgreSQL,
    Hive,
    SQLite,
    Snowflake,
    Redshift,
    MsSQL,
    ClickHouse,
    BigQuery,
    Ansi,
    DuckDB,
    Databricks,
}

impl AsRef<str> for Dialect {
    /// Returns the lowercase name of the dialect (for example `"mysql"`).
    fn as_ref(&self) -> &str {
        let name = match *self {
            Dialect::Generic => "generic",
            Dialect::MySQL => "mysql",
            Dialect::PostgreSQL => "postgresql",
            Dialect::Hive => "hive",
            Dialect::SQLite => "sqlite",
            Dialect::Snowflake => "snowflake",
            Dialect::Redshift => "redshift",
            Dialect::MsSQL => "mssql",
            Dialect::ClickHouse => "clickhouse",
            Dialect::BigQuery => "bigquery",
            Dialect::Ansi => "ansi",
            Dialect::DuckDB => "duckdb",
            Dialect::Databricks => "databricks",
        };
        name
    }
}
366
367impl FromStr for Dialect {
368    type Err = DataFusionError;
369
370    fn from_str(s: &str) -> Result<Self, Self::Err> {
371        let value = match s.to_ascii_lowercase().as_str() {
372            "generic" => Self::Generic,
373            "mysql" => Self::MySQL,
374            "postgresql" | "postgres" => Self::PostgreSQL,
375            "hive" => Self::Hive,
376            "sqlite" => Self::SQLite,
377            "snowflake" => Self::Snowflake,
378            "redshift" => Self::Redshift,
379            "mssql" => Self::MsSQL,
380            "clickhouse" => Self::ClickHouse,
381            "bigquery" => Self::BigQuery,
382            "ansi" => Self::Ansi,
383            "duckdb" => Self::DuckDB,
384            "databricks" => Self::Databricks,
385            other => {
386                let error_message = format!(
387                    "Invalid Dialect: {other}. Expected one of: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks"
388                );
389                return Err(DataFusionError::Configuration(error_message));
390            }
391        };
392        Ok(value)
393    }
394}
395
396impl ConfigField for Dialect {
397    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
398        v.some(key, self, description)
399    }
400
401    fn set(&mut self, _: &str, value: &str) -> Result<()> {
402        *self = Self::from_str(value)?;
403        Ok(())
404    }
405}
406
407impl Display for Dialect {
408    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
409        let str = self.as_ref();
410        write!(f, "{str}")
411    }
412}
413
/// Compression codec applied to spill files.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SpillCompression {
    /// Zstd compression.
    Zstd,
    /// LZ4 frame compression.
    Lz4Frame,
    /// No compression; this is the default.
    #[default]
    Uncompressed,
}
421
422impl FromStr for SpillCompression {
423    type Err = DataFusionError;
424
425    fn from_str(s: &str) -> Result<Self, Self::Err> {
426        match s.to_ascii_lowercase().as_str() {
427            "zstd" => Ok(Self::Zstd),
428            "lz4_frame" => Ok(Self::Lz4Frame),
429            "uncompressed" | "" => Ok(Self::Uncompressed),
430            other => Err(DataFusionError::Configuration(format!(
431                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
432            ))),
433        }
434    }
435}
436
437impl ConfigField for SpillCompression {
438    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
439        v.some(key, self, description)
440    }
441
442    fn set(&mut self, _: &str, value: &str) -> Result<()> {
443        *self = SpillCompression::from_str(value)?;
444        Ok(())
445    }
446}
447
448impl Display for SpillCompression {
449    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450        let str = match self {
451            Self::Zstd => "zstd",
452            Self::Lz4Frame => "lz4_frame",
453            Self::Uncompressed => "uncompressed",
454        };
455        write!(f, "{str}")
456    }
457}
458
459impl From<SpillCompression> for Option<CompressionType> {
460    fn from(c: SpillCompression) -> Self {
461        match c {
462            SpillCompression::Zstd => Some(CompressionType::ZSTD),
463            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
464            SpillCompression::Uncompressed => None,
465        }
466    }
467}
468
// Declares `ExecutionOptions` through `config_namespace!`, which also generates
// its `ConfigField` (`set` / `visit` / `reset`) and `Default` implementations.
//
// The `///` text on each field is captured by the macro and handed to `visit`
// as that option's description, so it is runtime-visible output and not only
// API documentation. For that reason the wording issues below are flagged
// rather than edited in place:
//
// NOTE(review): `coalesce_batches` - the description stops mid-sentence
//   ("determined by the configuration setting"); the setting name is missing.
// NOTE(review): `planning_concurrency` - "mostly use to plan" should read "used".
// NOTE(review): `skip_partial_aggregation_probe_ratio_threshold` - "greater then".
// NOTE(review): `use_row_number_estimates_to_optimize_partitioning` - stray
//   period in "better plans. if the source of statistics is accurate".
//
// `target_partitions` and `planning_concurrency` pass incoming values through
// `ExecutionOptions::normalized_parallelism` (defined outside this block) before
// they are parsed.
config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        /// Default batch size while creating new batches, it's especially useful for
        /// buffer-in-memory batches since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// A perfect hash join (see `HashJoinExec` for more details) will be considered
        /// if the range of keys (max - min) on the build side is < this threshold.
        /// This provides a fast path for joins with very small key ranges,
        /// bypassing the density check.
        ///
        /// Currently only supports cases where build_side.num_rows() < u32::MAX.
        /// Support for build_side.num_rows() >= u32::MAX will be added in the future.
        pub perfect_hash_join_small_build_threshold: usize, default = 1024

        /// The minimum required density of join keys on the build side to consider a
        /// perfect hash join (see `HashJoinExec` for more details). Density is calculated as:
        /// `(number of rows) / (max_key - min_key + 1)`.
        /// A perfect hash join may be used if the actual key density > this
        /// value.
        ///
        /// Currently only supports cases where build_side.num_rows() < u32::MAX.
        /// Support for build_side.num_rows() >= u32::MAX will be added in the future.
        pub perfect_hash_join_min_key_density: f64, default = 0.15

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the configuration setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `now` return timestamps in this time zone
        pub time_zone: Option<String>, default = None

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly use to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to workaround bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Sets the compression codec used when spilling data to disk.
        ///
        /// Since datafusion writes spill files using the Arrow IPC Stream format,
        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
        /// Valid values are: uncompressed, lz4_frame, zstd.
        /// Note: lz4_frame offers faster (de)compression, but typically results in
        /// larger spill files. In contrast, zstd achieves
        /// higher compression ratios at the cost of slower (de)compression speed.
        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Maximum buffer capacity (in bytes) per partition for BufferExec
        /// inserted during sort pushdown optimization.
        ///
        /// When PushdownSort eliminates a SortExec under SortPreservingMergeExec,
        /// a BufferExec is inserted to replace SortExec's buffering role. This
        /// prevents I/O stalls by allowing the scan to run ahead of the merge.
        ///
        /// This uses strictly less memory than the SortExec it replaces (which
        /// buffers the entire partition). The buffer respects the global memory
        /// pool limit. Setting this to a large value is safe — actual memory
        /// usage is bounded by partition size and global memory limits.
        pub sort_pushdown_buffer_capacity: usize, default = 1024 * 1024 * 1024

        /// Maximum size in bytes for individual spill files before rotating to a new file.
        ///
        /// When operators spill data to disk (e.g., RepartitionExec), they write
        /// multiple batches to the same file until this size limit is reached, then rotate
        /// to a new file. This reduces syscall overhead compared to one-file-per-batch
        /// while preventing files from growing too large.
        ///
        /// A larger value reduces file creation overhead but may hold more disk space.
        /// A smaller value creates more files but allows finer-grained space reclamation
        /// as files can be deleted once fully consumed.
        ///
        /// Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators
        /// may create spill files larger than the limit.
        ///
        /// Default: 128 MB
        pub max_spill_file_size_bytes: usize, default = 128 * 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum level of output files running in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should a `ListingTable` created through the `ListingTableFactory` infer table
        /// partitions from Hive compliant directories. Defaults to true (partition columns are
        /// inferred and will be represented in the table schema).
        pub listing_table_factory_infer_partitions: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the value is greater
        /// then partial aggregation will skip aggregation for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows partial aggregation partition should process, before
        /// aggregation ratio check and trying to switch to skipping aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans.
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote end point.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024

        /// Whether to enable ANSI SQL mode.
        ///
        /// The flag is experimental and relevant only for DataFusion Spark built-in functions
        ///
        /// When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL
        /// semantics for expressions, casting, and error handling. This means:
        /// - **Strict type coercion rules:** implicit casts between incompatible types are disallowed.
        /// - **Standard SQL arithmetic behavior:** operations such as division by zero,
        ///   numeric overflow, or invalid casts raise runtime errors rather than returning
        ///   `NULL` or adjusted values.
        /// - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling.
        ///
        /// When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive,
        /// non-ANSI mode designed for user convenience and backward compatibility. In this mode:
        /// - Implicit casts between types are allowed (e.g., string to integer when possible).
        /// - Arithmetic operations are more lenient — for example, `abs()` on the minimum
        ///   representable integer value returns the input value instead of raising overflow.
        /// - Division by zero or invalid casts may return `NULL` instead of failing.
        ///
        /// # Default
        /// `false` — ANSI SQL mode is disabled by default.
        pub enable_ansi_mode: bool, default = false

        /// How many bytes to buffer in the probe side of hash joins while the build side is
        /// concurrently being built.
        ///
        /// Without this, hash joins will wait until the full materialization of the build side
        /// before polling the probe side. This is useful in scenarios where the query is not
        /// completely CPU bounded, allowing to do some early work concurrently and reducing the
        /// latency of the query.
        ///
        /// Note that when hash join buffering is enabled, the probe side will start eagerly
        /// polling data, not giving time for the producer side of dynamic filters to produce any
        /// meaningful predicate. Queries with dynamic filters might see performance degradation.
        ///
        /// Disabled by default, set to a number greater than 0 for enabling it.
        pub hash_join_buffering_capacity: usize, default = 0
    }
}
711
config_namespace! {
    /// Options for content-defined chunking (CDC) when writing parquet files.
    /// Mirrors `parquet::file::properties::CdcOptions`.
    ///
    /// Stored in [`ParquetOptions::content_defined_chunking`]. CDC is switched
    /// on and off through the explicit `enabled` flag, so it can be toggled with
    /// dotted config keys (`content_defined_chunking.enabled = true|false`) and
    /// the result is independent of the order in which the keys are set.
    pub struct ParquetCdcOptions {
        /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
        /// parquet files. When enabled, parallel writing is automatically disabled
        /// since the chunker state must persist across row groups.
        pub enabled: bool, default = false

        // NOTE(review): nothing in this block enforces
        // `min_chunk_size <= max_chunk_size`; presumably this is validated where
        // the options are converted for the parquet writer — confirm.

        /// Minimum chunk size in bytes. The rolling hash will not trigger a split
        /// until this many bytes have been accumulated. Default is 256 KiB.
        pub min_chunk_size: usize, default = 256 * 1024

        /// Maximum chunk size in bytes. A split is forced when the accumulated
        /// size exceeds this value. Default is 1 MiB.
        pub max_chunk_size: usize, default = 1024 * 1024

        /// Normalization level. Increasing this improves deduplication ratio
        /// but increases fragmentation. Recommended range is [-3, 3], default is 0.
        pub norm_level: i32, default = 0
    }
}
739
740impl ParquetCdcOptions {
741    /// Returns enabled CDC options with the default chunking parameters.
742    ///
743    /// Shorthand for `ParquetCdcOptions { enabled: true, ..Default::default() }`;
744    /// combine with struct-update syntax to override parameters, e.g.
745    /// `ParquetCdcOptions { min_chunk_size: 4096, ..ParquetCdcOptions::enabled() }`.
746    pub fn enabled() -> Self {
747        Self {
748            enabled: true,
749            ..Default::default()
750        }
751    }
752
753    /// Returns disabled CDC options (equivalent to [`ParquetCdcOptions::default`]).
754    pub fn disabled() -> Self {
755        Self::default()
756    }
757}
758
config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// Each option's description starts with `(reading)` or `(writing)` to show
    /// which code path it affects.
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// One read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata, whose length is encoded in the footer.
        /// Defaults to 512 KiB, which should be sufficient for most parquet files and
        /// saves one I/O operation per parquet file. If the metadata is larger than
        /// the hint, two reads will still be performed.
        pub metadata_size_hint: Option<usize>, default = Some(512 * 1024)

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) Force the use of RowSelections for filter results, when
        /// pushdown_filters is enabled. If false, the reader will automatically
        /// choose between a RowSelection and a Bitmap based on the number and
        /// pattern of selected rows.
        pub force_filter_selections: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` with `Utf8View`,
        /// and `Binary/LargeBinary` with `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If set, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark
        /// which stores microsecond resolution timestamps in an int96 allowing it
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Optional timezone applied to INT96 columns when `coerce_int96`
        /// is set. When `Some`, INT96 columns coerce to
        /// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default
        /// `Timestamp(<coerce_int96>, None)`. Spark and other systems write INT96
        /// values as UTC-adjusted instants, so callers that need the resulting
        /// Arrow type to be timezone-aware (e.g. for Spark `TimestampType`
        /// semantics) should set this to `"UTC"`. No effect when `coerce_int96`
        /// is `None`.
        pub coerce_int96_tz: Option<String>, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        /// (reading) The maximum predicate cache size, in bytes. When
        /// `pushdown_filters` is enabled, sets the maximum memory used to cache
        /// the results of predicate evaluation between filter evaluation and
        /// output generation. Decreasing this value will reduce memory usage,
        /// but may increase IO and CPU usage. None means use the default
        /// parquet reader setting. 0 means no caching.
        pub max_predicate_cache_size: Option<usize>, default = None

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in rows
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: DFParquetWriterVersion, default = DFParquetWriterVersion::default()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default =  1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file is serialized in parallel
        /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2

        /// (writing) EXPERIMENTAL: Content-defined chunking (CDC) options when writing
        /// parquet files. Disabled by default; toggle with
        /// `content_defined_chunking.enabled = true|false`. The chunking parameters live
        /// under the same prefix (e.g. `content_defined_chunking.min_chunk_size`). When
        /// enabled, parallel writing is automatically disabled since the chunker state
        /// must persist across row groups. Mirrors
        /// `parquet::file::properties::WriterProperties::content_defined_chunking`.
        pub content_defined_chunking: ParquetCdcOptions, default = Default::default()
    }
}
965
config_namespace! {
    /// Options for configuring Parquet Modular Encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    ///
    /// Use [`ParquetEncryptionOptions::configure_factory`] to set `factory_id`
    /// and `factory_options` together.
    pub struct ParquetEncryptionOptions {
        /// Optional file decryption properties
        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None

        /// Optional file encryption properties
        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None

        /// Identifier for the encryption factory to use to create file encryption and decryption properties.
        /// Encryption factories can be registered in the runtime environment with
        /// `RuntimeEnv::register_parquet_encryption_factory`.
        pub factory_id: Option<String>, default = None

        /// Any encryption factory specific options
        pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
    }
}
986
987impl ParquetEncryptionOptions {
988    /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
989    pub fn configure_factory(
990        &mut self,
991        factory_id: &str,
992        config: &impl ExtensionOptions,
993    ) {
994        self.factory_id = Some(factory_id.to_owned());
995        self.factory_options.options.clear();
996        for entry in config.entries() {
997            if let Some(value) = entry.value {
998                self.factory_options.options.insert(entry.key, value);
999            }
1000        }
1001    }
1002}
1003
1004config_namespace! {
1005    /// Options related to query optimization
1006    ///
1007    /// See also: [`SessionConfig`]
1008    ///
1009    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
1010    pub struct OptimizerOptions {
1011        /// When set to true, the optimizer will push a limit operation into
1012        /// grouped aggregations which have no aggregate expressions, as a soft limit,
1013        /// emitting groups once the limit is reached, before all rows in the group are read.
1014        pub enable_distinct_aggregation_soft_limit: bool, default = true
1015
1016        /// When set to true, the physical plan optimizer will try to add round robin
1017        /// repartitioning to increase parallelism to leverage more CPU cores
1018        pub enable_round_robin_repartition: bool, default = true
1019
1020        /// When set to true, the optimizer will attempt to perform limit operations
1021        /// during aggregations, if possible
1022        pub enable_topk_aggregation: bool, default = true
1023
1024        /// When set to true, the optimizer will attempt to push limit operations
1025        /// past window functions, if possible
1026        pub enable_window_limits: bool, default = true
1027
1028        /// When set to true, the optimizer will replace
1029        /// Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a
1030        /// PartitionedTopKExec that maintains per-partition heaps, avoiding
1031        /// a full sort of the input.
1032        /// When the window partition key has low cardinality, enabling this optimization
1033        /// can improve performance. However, for high cardinality keys, it may
1034        /// cause regressions in both memory usage and runtime.
1035        pub enable_window_topn: bool, default = false
1036
1037        /// When set to true, the optimizer will push TopK (Sort with fetch)
1038        /// below hash repartition when the partition key is a prefix of the
1039        /// sort key, reducing data volume before the shuffle.
1040        pub enable_topk_repartition: bool, default = true
1041
1042        /// When set to true, the optimizer will attempt to push down TopK dynamic filters
1043        /// into the file scan phase.
1044        pub enable_topk_dynamic_filter_pushdown: bool, default = true
1045
1046        /// When set to true, uncorrelated scalar subqueries are
1047        /// left in the logical plan and executed by `ScalarSubqueryExec` during
1048        /// physical execution. When set to false, all scalar subqueries
1049        /// (including uncorrelated ones) are rewritten to left joins by the
1050        /// `ScalarSubqueryToJoin` optimizer rule.
1051        ///
1052        /// Note disabling this option is not recommended. It restores
1053        /// pre <https://github.com/apache/datafusion/pull/21240>
1054        /// behavior, which silently produces incorrect results for
1055        /// multi-row subqueries and does not support scalar subqueries in
1056        /// ORDER BY / JOIN ON / aggregate-function arguments. This option is
1057        /// intended as a temporary escape hatch for distributed execution
1058        /// frameworks and is planned to be removed in a future DataFusion
1059        /// release.
1060        pub enable_physical_uncorrelated_scalar_subquery: bool, default = true
1061
1062        /// When set to true, the optimizer will attempt to push down Join dynamic filters
1063        /// into the file scan phase.
1064        pub enable_join_dynamic_filter_pushdown: bool, default = true
1065
1066        /// When set to true, the optimizer will attempt to push down Aggregate dynamic filters
1067        /// into the file scan phase.
1068        pub enable_aggregate_dynamic_filter_pushdown: bool, default = true
1069
1070        /// When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase.
1071        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
1072        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
1073        /// This means that if we already have 10 timestamps in the year 2025
1074        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
1075        /// The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown`
1076        /// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
1077        pub enable_dynamic_filter_pushdown: bool, default = true
1078
1079        /// When set to true, the optimizer will insert filters before a join between
1080        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
1081        /// filter can add additional overhead when the file format does not fully support
1082        /// predicate push down.
1083        pub filter_null_join_keys: bool, default = false
1084
1085        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
1086        /// in parallel using the provided `target_partitions` level
1087        pub repartition_aggregations: bool, default = true
1088
1089        /// Minimum total files size in bytes to perform file scan repartitioning.
1090        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
1091
1092        /// Should DataFusion repartition data using the join keys to execute joins in parallel
1093        /// using the provided `target_partitions` level
1094        pub repartition_joins: bool, default = true
1095
1096        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
1097        /// its inputs do not have any ordering or filtering If the flag is not enabled,
1098        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
1099        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
1100        /// RightAnti, and RightSemi - being produced only at the end of the execution.
1101        /// This is not typical in stream processing. Additionally, without proper design for
1102        /// long runner execution, all types of joins may encounter out-of-memory errors.
1103        pub allow_symmetric_joins_without_pruning: bool, default = true
1104
1105        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
1106        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
1107        ///
1108        /// For FileSources, only Parquet and CSV formats are currently supported.
1109        ///
1110        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
1111        /// might be partitioned into smaller chunks) for parallel scanning.
1112        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
1113        /// happen within a single file.
1114        ///
1115        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
1116        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
1117        /// the total number of partitions and batches per partition, but does not slice the initial
1118        /// record tables provided to the MemTable on creation.
1119        pub repartition_file_scans: bool, default = true
1120
1121        /// Minimum number of distinct partition values required to group files by their
1122        /// Hive partition column values (enabling Hash partitioning declaration).
1123        ///
1124        /// How the option is used:
1125        ///     - preserve_file_partitions=0: Disable it.
1126        ///     - preserve_file_partitions=1: Always enable it.
1127        ///     - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N.
1128        ///     This threshold preserves I/O parallelism when file partitioning is below it.
1129        ///
1130        /// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
1131        /// partitions is less than the target_partitions.
1132        pub preserve_file_partitions: usize, default = 0
1133
1134        /// Should DataFusion repartition data using the partitions keys to execute window
1135        /// functions in parallel using the provided `target_partitions` level
1136        pub repartition_windows: bool, default = true
1137
1138        /// Should DataFusion execute sorts in a per-partition fashion and merge
1139        /// afterwards instead of coalescing first and sorting globally.
1140        /// With this flag is enabled, plans in the form below
1141        ///
1142        /// ```text
1143        ///      "SortExec: [a@0 ASC]",
1144        ///      "  CoalescePartitionsExec",
1145        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
1146        /// ```
1147        /// would turn into the plan below which performs better in multithreaded environments
1148        ///
1149        /// ```text
1150        ///      "SortPreservingMergeExec: [a@0 ASC]",
1151        ///      "  SortExec: [a@0 ASC]",
1152        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
1153        /// ```
1154        pub repartition_sorts: bool, default = true
1155
1156        /// Partition count threshold for subset satisfaction optimization.
1157        ///
1158        /// When the current partition count is >= this threshold, DataFusion will
1159        /// skip repartitioning if the required partitioning expression is a subset
1160        /// of the current partition expression such as Hash(a) satisfies Hash(a, b).
1161        ///
1162        /// When the current partition count is < this threshold, DataFusion will
1163        /// repartition to increase parallelism even when subset satisfaction applies.
1164        ///
1165        /// Set to 0 to always repartition (disable subset satisfaction optimization).
1166        /// Set to a high value to always use subset satisfaction.
1167        ///
1168        /// Example (subset_repartition_threshold = 4):
1169        /// ```text
1170        ///     Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a])
1171        ///
1172        ///     If current partitions (3) < threshold (4), repartition:
1173        ///     AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)]
1174        ///       RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3
1175        ///         AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)]
1176        ///           DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3)
1177        ///
1178        ///     If current partitions (8) >= threshold (4), use subset satisfaction:
1179        ///     AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)]
1180        ///       DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8)
1181        /// ```
1182        pub subset_repartition_threshold: usize, default = 4
1183
1184        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
1185        /// (i.e. setting `preserve_order` to true on `RepartitionExec`  and
1186        /// using `SortPreservingMergeExec`)
1187        ///
1188        /// When false, DataFusion will maximize plan parallelism using
1189        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
1190        pub prefer_existing_sort: bool, default = false
1191
1192        /// When set to true, the logical plan optimizer will produce warning
1193        /// messages if any optimization rules produce errors and then proceed to the next
1194        /// rule. When set to false, any rules that produce errors will cause the query to fail
1195        pub skip_failed_rules: bool, default = false
1196
1197        /// Number of times that the optimizer will attempt to optimize the plan
1198        pub max_passes: usize, default = 3
1199
1200        /// When set to true, the physical plan optimizer will run a top down
1201        /// process to reorder the join keys
1202        pub top_down_join_key_reordering: bool, default = true
1203
1204        /// When set to true, the physical plan optimizer may swap join inputs
1205        /// based on statistics. When set to false, statistics-driven join
1206        /// input reordering is disabled and the original join order in the
1207        /// query is used.
1208        pub join_reordering: bool, default = true
1209
1210        /// When set to true, the physical plan optimizer uses the pluggable
1211        /// `StatisticsRegistry` for statistics propagation across operators.
1212        /// This enables more accurate cardinality estimates compared to each
1213        /// operator's built-in `partition_statistics`.
1214        pub use_statistics_registry: bool, default = false
1215
1216        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
1217        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
1218        pub prefer_hash_join: bool, default = true
1219
1220        /// When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently
1221        /// experimental. Physical planner will opt for PiecewiseMergeJoin when there is only
1222        /// one range filter.
1223        pub enable_piecewise_merge_join: bool, default = false
1224
1225        /// The maximum estimated size in bytes for one input side of a HashJoin
1226        /// will be collected into a single partition
1227        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
1228
1229        /// The maximum estimated size in rows for one input side of a HashJoin
1230        /// will be collected into a single partition
1231        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
1232
1233        /// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
1234        /// Build sides larger than this will use hash table lookups instead.
1235        /// Set to 0 to always use hash table lookups.
1236        ///
1237        /// InList pushdown can be more efficient for small build sides because it can result in better
1238        /// statistics pruning as well as use any bloom filters present on the scan side.
1239        /// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
1240        /// On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory.
1241        ///
1242        /// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
1243        ///
1244        /// The default is 128kB per partition.
1245        /// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
1246        /// but avoids excessive memory usage or overhead for larger joins.
1247        pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024
1248
1249        /// Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
1250        /// Build sides with more rows than this will use hash table lookups instead.
1251        /// Set to 0 to always use hash table lookups.
1252        ///
1253        /// This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent
1254        /// very large IN lists that might not provide much benefit over hash table lookups.
1255        ///
1256        /// This uses the deduplicated row count once the build side has been evaluated.
1257        ///
1258        /// The default is 150 values per partition.
1259        /// This is inspired by Trino's `max-filter-keys-per-column` setting.
1260        /// See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>
1261        pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150
1262
1263        /// The default filter selectivity used by Filter Statistics
1264        /// when an exact selectivity cannot be determined. Valid values are
1265        /// between 0 (no selectivity) and 100 (all rows are selected).
1266        pub default_filter_selectivity: u8, default = 20
1267
1268        /// When set to true, the optimizer will not attempt to convert Union to Interleave
1269        pub prefer_existing_union: bool, default = false
1270
1271        /// When set to true, if the returned type is a view type
1272        /// then the output will be coerced to a non-view.
1273        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
1274        pub expand_views_at_output: bool, default = false
1275
1276        /// Enable sort pushdown optimization.
1277        /// When enabled, attempts to push sort requirements down to data sources
1278        /// that can natively handle them (e.g., by reversing file/row group read order).
1279        ///
1280        /// Returns **inexact ordering**: Sort operator is kept for correctness,
1281        /// but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N),
1282        /// providing significant speedup.
1283        ///
1284        /// Memory: No additional overhead (only changes read order).
1285        ///
1286        /// Future: Will add option to detect perfectly sorted data and eliminate Sort completely.
1287        ///
1288        /// Default: true
1289        pub enable_sort_pushdown: bool, default = true
1290
1291        /// When set to true, the optimizer will extract leaf expressions
1292        /// (such as `get_field`) from filter/sort/join nodes into projections
1293        /// closer to the leaf table scans, and push those projections down
1294        /// towards the leaf nodes.
1295        pub enable_leaf_expression_pushdown: bool, default = true
1296
1297        /// When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that
1298        /// read from the same source and differ only by filter predicates into a single branch
1299        /// with a combined filter. This optimization is conservative and only applies when the
1300        /// branches share the same source and compatible wrapper nodes such as identical
1301        /// projections or aliases.
1302        pub enable_unions_to_filter: bool, default = false
1303    }
1304}
1305
// NOTE(review): the `///` comments on the fields below are handed to
// `config_namespace!` together with the field definitions. They presumably
// become the user-visible option descriptions (see `ConfigEntry::description`),
// so rewording them should be treated as a behavior change — confirm against
// the macro definition.
config_namespace! {
    /// Options controlling explain output
    ///
    /// These options are reachable under the `datafusion.explain` key prefix
    /// of [`ConfigOptions`].
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: ExplainFormat, default = ExplainFormat::Indent

        /// (format=tree only) Maximum total width of the rendered tree.
        /// When set to 0, the tree will have no width limit.
        pub tree_maximum_render_width: usize, default = 240

        /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev"
        /// "summary" shows common metrics for high-level insights.
        /// "dev" provides deep operator-level introspection for developers.
        pub analyze_level: MetricType, default = MetricType::Dev

        /// Which metric categories to include in "EXPLAIN ANALYZE" output.
        /// Comma-separated list of: "rows", "bytes", "timing", "uncategorized".
        /// Use "none" to show plan structure only, or "all" (default) to show everything.
        /// Metrics without a declared category are treated as "uncategorized".
        pub analyze_categories: ExplainAnalyzeCategories, default = ExplainAnalyzeCategories::All
    }
}
1349
1350impl ExecutionOptions {
1351    /// Returns the correct parallelism based on the provided `value`.
1352    /// If `value` is `"0"`, returns the default available parallelism, computed with
1353    /// `get_available_parallelism`. Otherwise, returns `value`.
1354    fn normalized_parallelism(value: &str) -> String {
1355        if value.parse::<usize>() == Ok(0) {
1356            get_available_parallelism().to_string()
1357        } else {
1358            value.to_owned()
1359        }
1360    }
1361}
1362
// NOTE(review): the `///` comments on the fields below are handed to
// `config_namespace!` together with the field definitions. They presumably
// become the user-visible option descriptions (see `ConfigEntry::description`),
// so rewording them should be treated as a behavior change — confirm against
// the macro definition.
config_namespace! {
    /// Options controlling the format of output when printing record batches
    /// Copies [`arrow::util::display::FormatOptions`]
    ///
    /// These options are reachable under the `datafusion.format` key prefix
    /// of [`ConfigOptions`], and can be converted into the Arrow type with
    /// `TryFrom<&FormatOptions>`.
    pub struct FormatOptions {
        /// If set to `true` any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show types in visual representation batches
        pub types_info: bool, default = false
    }
}
1388
1389impl<'a> TryFrom<&'a FormatOptions> for arrow::util::display::FormatOptions<'a> {
1390    type Error = DataFusionError;
1391    fn try_from(options: &'a FormatOptions) -> Result<Self> {
1392        let duration_format = match options.duration_format.as_str() {
1393            "pretty" => arrow::util::display::DurationFormat::Pretty,
1394            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
1395            _ => {
1396                return _config_err!(
1397                    "Invalid duration format: {}. Valid values are pretty or iso8601",
1398                    options.duration_format
1399                );
1400            }
1401        };
1402
1403        Ok(Self::new()
1404            .with_display_error(options.safe)
1405            .with_null(&options.null)
1406            .with_date_format(options.date_format.as_deref())
1407            .with_datetime_format(options.datetime_format.as_deref())
1408            .with_timestamp_format(options.timestamp_format.as_deref())
1409            .with_timestamp_tz_format(options.timestamp_tz_format.as_deref())
1410            .with_time_format(options.time_format.as_deref())
1411            .with_duration_format(duration_format)
1412            .with_types_info(options.types_info))
1413    }
1414}
1415
/// A key value pair, with a corresponding description
///
/// Instances are produced by [`ConfigOptions::entries`] and
/// [`ExtensionOptions::entries`].
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value rendered as a string, or `None` when the option has no value
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}
1428
/// Configuration options struct, able to store both built-in configuration and custom options
///
/// Built-in options live in the typed namespace fields below and are
/// addressed with keys of the form `datafusion.<namespace>.<field>`. Custom
/// options are stored in [`Extensions`] and addressed by their own prefix.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options (`datafusion.catalog.*`)
    pub catalog: CatalogOptions,
    /// Execution options (`datafusion.execution.*`)
    pub execution: ExecutionOptions,
    /// Optimizer options (`datafusion.optimizer.*`)
    pub optimizer: OptimizerOptions,
    /// SQL parser options (`datafusion.sql_parser.*`)
    pub sql_parser: SqlParserOptions,
    /// Explain options (`datafusion.explain.*`)
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
    /// Formatting options when printing batches (`datafusion.format.*`)
    pub format: FormatOptions,
}
1448
1449impl ConfigField for ConfigOptions {
1450    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1451        self.catalog.visit(v, "datafusion.catalog", "");
1452        self.execution.visit(v, "datafusion.execution", "");
1453        self.optimizer.visit(v, "datafusion.optimizer", "");
1454        self.explain.visit(v, "datafusion.explain", "");
1455        self.sql_parser.visit(v, "datafusion.sql_parser", "");
1456        self.format.visit(v, "datafusion.format", "");
1457    }
1458
1459    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1460        // Extensions are handled in the public `ConfigOptions::set`
1461        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1462        match key {
1463            "catalog" => self.catalog.set(rem, value),
1464            "execution" => self.execution.set(rem, value),
1465            "optimizer" => self.optimizer.set(rem, value),
1466            "explain" => self.explain.set(rem, value),
1467            "sql_parser" => self.sql_parser.set(rem, value),
1468            "format" => self.format.set(rem, value),
1469            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
1470        }
1471    }
1472
1473    /// Reset a configuration option back to its default value
1474    fn reset(&mut self, key: &str) -> Result<()> {
1475        let Some((prefix, rest)) = key.split_once('.') else {
1476            return _config_err!("could not find config namespace for key \"{key}\"");
1477        };
1478
1479        if prefix != "datafusion" {
1480            return _config_err!("Could not find config namespace \"{prefix}\"");
1481        }
1482
1483        let (section, rem) = rest.split_once('.').unwrap_or((rest, ""));
1484        if rem.is_empty() {
1485            return _config_err!("could not find config field for key \"{key}\"");
1486        }
1487
1488        match section {
1489            "catalog" => self.catalog.reset(rem),
1490            "execution" => self.execution.reset(rem),
1491            "optimizer" => {
1492                if rem == "enable_dynamic_filter_pushdown" {
1493                    let defaults = OptimizerOptions::default();
1494                    self.optimizer.enable_dynamic_filter_pushdown =
1495                        defaults.enable_dynamic_filter_pushdown;
1496                    self.optimizer.enable_topk_dynamic_filter_pushdown =
1497                        defaults.enable_topk_dynamic_filter_pushdown;
1498                    self.optimizer.enable_join_dynamic_filter_pushdown =
1499                        defaults.enable_join_dynamic_filter_pushdown;
1500                    Ok(())
1501                } else {
1502                    self.optimizer.reset(rem)
1503                }
1504            }
1505            "explain" => self.explain.reset(rem),
1506            "sql_parser" => self.sql_parser.reset(rem),
1507            "format" => self.format.reset(rem),
1508            other => _config_err!("Config value \"{other}\" not found on ConfigOptions"),
1509        }
1510    }
1511}
1512
/// This namespace is reserved for interacting with Foreign Function Interface
/// (FFI) based configuration extensions.
///
/// When an extension is registered under this prefix, [`ConfigOptions::set`]
/// routes any non-`datafusion` key whose own prefix matches no registered
/// extension to it, passing the complete, unsplit key.
pub const DATAFUSION_FFI_CONFIG_NAMESPACE: &str = "datafusion_ffi";
1516
1517impl ConfigOptions {
1518    /// Creates a new [`ConfigOptions`] with default values
1519    pub fn new() -> Self {
1520        Self::default()
1521    }
1522
1523    /// Set extensions to provided value
1524    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1525        self.extensions = extensions;
1526        self
1527    }
1528
1529    /// Set a configuration option
1530    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1531        let Some((mut prefix, mut inner_key)) = key.split_once('.') else {
1532            return _config_err!("could not find config namespace for key \"{key}\"");
1533        };
1534
1535        if prefix == "datafusion" {
1536            if inner_key == "optimizer.enable_dynamic_filter_pushdown" {
1537                let bool_value = value.parse::<bool>().map_err(|e| {
1538                    DataFusionError::Configuration(format!(
1539                        "Failed to parse '{value}' as bool: {e}",
1540                    ))
1541                })?;
1542
1543                {
1544                    self.optimizer.enable_dynamic_filter_pushdown = bool_value;
1545                    self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
1546                    self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
1547                    self.optimizer.enable_aggregate_dynamic_filter_pushdown = bool_value;
1548                }
1549                return Ok(());
1550            }
1551            return ConfigField::set(self, inner_key, value);
1552        }
1553
1554        if !self.extensions.0.contains_key(prefix)
1555            && self
1556                .extensions
1557                .0
1558                .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
1559        {
1560            inner_key = key;
1561            prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
1562        }
1563
1564        let Some(e) = self.extensions.0.get_mut(prefix) else {
1565            return _config_err!("Could not find config namespace \"{prefix}\"");
1566        };
1567        e.0.set(inner_key, value)
1568    }
1569
1570    /// Create new [`ConfigOptions`], taking values from environment variables
1571    /// where possible.
1572    ///
1573    /// For example, to configure `datafusion.execution.batch_size`
1574    /// ([`ExecutionOptions::batch_size`]) you would set the
1575    /// `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable.
1576    ///
1577    /// The name of the environment variable is the option's key, transformed to
1578    /// uppercase and with periods replaced with underscores.
1579    ///
1580    /// Values are parsed according to the [same rules used in casts from
1581    /// Utf8](https://docs.rs/arrow/latest/arrow/compute/kernels/cast/fn.cast.html).
1582    ///
1583    /// If the value in the environment variable cannot be cast to the type of
1584    /// the configuration option, the default value will be used instead and a
1585    /// warning emitted. Environment variables are read when this method is
1586    /// called, and are not re-read later.
1587    pub fn from_env() -> Result<Self> {
1588        struct Visitor(Vec<String>);
1589
1590        impl Visit for Visitor {
1591            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1592                self.0.push(key.to_string())
1593            }
1594
1595            fn none(&mut self, key: &str, _: &'static str) {
1596                self.0.push(key.to_string())
1597            }
1598        }
1599
1600        // Extract the names of all fields and then look up the corresponding
1601        // environment variables. This isn't hugely efficient but avoids
1602        // ambiguity between `a.b` and `a_b` which would both correspond
1603        // to an environment variable of `A_B`
1604
1605        let mut keys = Visitor(vec![]);
1606        let mut ret = Self::default();
1607        ret.visit(&mut keys, "datafusion", "");
1608
1609        for key in keys.0 {
1610            let env = key.to_uppercase().replace('.', "_");
1611            if let Some(var) = std::env::var_os(env) {
1612                let value = var.to_string_lossy();
1613                log::info!("Set {key} to {value} from the environment variable");
1614                ret.set(&key, value.as_ref())?;
1615            }
1616        }
1617
1618        Ok(ret)
1619    }
1620
1621    /// Create new ConfigOptions struct, taking values from a string hash map.
1622    ///
1623    /// Only the built-in configurations will be extracted from the hash map
1624    /// and other key value pairs will be ignored.
1625    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1626        struct Visitor(Vec<String>);
1627
1628        impl Visit for Visitor {
1629            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1630                self.0.push(key.to_string())
1631            }
1632
1633            fn none(&mut self, key: &str, _: &'static str) {
1634                self.0.push(key.to_string())
1635            }
1636        }
1637
1638        let mut keys = Visitor(vec![]);
1639        let mut ret = Self::default();
1640        ret.visit(&mut keys, "datafusion", "");
1641
1642        for key in keys.0 {
1643            if let Some(var) = settings.get(&key) {
1644                ret.set(&key, var)?;
1645            }
1646        }
1647
1648        Ok(ret)
1649    }
1650
1651    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
1652    pub fn entries(&self) -> Vec<ConfigEntry> {
1653        struct Visitor(Vec<ConfigEntry>);
1654
1655        impl Visit for Visitor {
1656            fn some<V: Display>(
1657                &mut self,
1658                key: &str,
1659                value: V,
1660                description: &'static str,
1661            ) {
1662                self.0.push(ConfigEntry {
1663                    key: key.to_string(),
1664                    value: Some(value.to_string()),
1665                    description,
1666                })
1667            }
1668
1669            fn none(&mut self, key: &str, description: &'static str) {
1670                self.0.push(ConfigEntry {
1671                    key: key.to_string(),
1672                    value: None,
1673                    description,
1674                })
1675            }
1676        }
1677
1678        let mut v = Visitor(vec![]);
1679        self.visit(&mut v, "datafusion", "");
1680
1681        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1682        v.0
1683    }
1684
1685    /// Generate documentation that can be included in the user guide
1686    pub fn generate_config_markdown() -> String {
1687        use std::fmt::Write as _;
1688
1689        let mut s = Self::default();
1690
1691        // Normalize for display
1692        s.execution.target_partitions = 0;
1693        s.execution.planning_concurrency = 0;
1694
1695        let mut docs = "| key | default | description |\n".to_string();
1696        docs += "|-----|---------|-------------|\n";
1697        let mut entries = s.entries();
1698        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1699
1700        for entry in s.entries() {
1701            let _ = writeln!(
1702                &mut docs,
1703                "| {} | {} | {} |",
1704                entry.key,
1705                entry.value.as_deref().unwrap_or("NULL"),
1706                entry.description
1707            );
1708        }
1709        docs
1710    }
1711}
1712
/// [`ConfigExtension`] provides a mechanism to store third-party configuration
/// within DataFusion [`ConfigOptions`]
///
/// This mechanism can be used to pass configuration to user defined functions
/// or optimizer passes
///
/// # Example
/// ```
/// use datafusion_common::{
///     config::ConfigExtension, config::ConfigOptions, extensions_options,
/// };
/// // Define a new configuration struct using the `extensions_options` macro
/// extensions_options! {
///    /// My own config options.
///    pub struct MyConfig {
///        /// Should "foo" be replaced by "bar"?
///        pub foo_to_bar: bool, default = true
///
///        /// How many "baz" should be created?
///        pub baz_count: usize, default = 1337
///    }
/// }
///
/// impl ConfigExtension for MyConfig {
///     const PREFIX: &'static str = "my_config";
/// }
///
/// // set up config struct and register extension
/// let mut config = ConfigOptions::default();
/// config.extensions.insert(MyConfig::default());
///
/// // overwrite config default
/// config.set("my_config.baz_count", "42").unwrap();
///
/// // check config state
/// let my_config = config.extensions.get::<MyConfig>().unwrap();
/// assert!(my_config.foo_to_bar,);
/// assert_eq!(my_config.baz_count, 42,);
/// ```
///
/// # Note:
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
    /// Configuration namespace prefix to use
    ///
    /// All values under this will be prefixed with `$PREFIX + "."`
    ///
    /// The prefix must not be `"datafusion"`: [`Extensions::insert`] asserts
    /// this and panics otherwise.
    const PREFIX: &'static str;
}
1762
/// An object-safe API for storing arbitrary configuration.
///
/// See [`ConfigExtension`] for user defined configuration
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any(&self) -> &dyn Any;

    /// Return `self` as a mutable [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Return a deep clone of this [`ExtensionOptions`]
    ///
    /// It is important this does not share mutable state to avoid consistency issues
    /// with configuration changing whilst queries are executing
    fn cloned(&self) -> Box<dyn ExtensionOptions>;

    /// Set the given `key`, `value` pair
    ///
    /// When invoked through [`ConfigOptions::set`], `key` is the part of the
    /// option key after the extension's prefix and the first `.`; the one
    /// exception is the [`DATAFUSION_FFI_CONFIG_NAMESPACE`] fallback, which
    /// receives the complete key.
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
    fn entries(&self) -> Vec<ConfigEntry>;
}
1789
/// A type-safe container for [`ConfigExtension`]
///
/// Extensions are stored keyed by their [`ConfigExtension::PREFIX`], so at
/// most one extension per prefix is kept.
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1793
1794impl Extensions {
1795    /// Create a new, empty [`Extensions`]
1796    pub fn new() -> Self {
1797        Self(BTreeMap::new())
1798    }
1799
1800    /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
1801    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1802        assert_ne!(T::PREFIX, "datafusion");
1803        let e = ExtensionBox(Box::new(extension));
1804        self.0.insert(T::PREFIX, e);
1805    }
1806
1807    /// Retrieves the extension of the given type if any
1808    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1809        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1810    }
1811
1812    /// Retrieves the extension of the given type if any
1813    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1814        let e = self.0.get_mut(T::PREFIX)?;
1815        e.0.as_any_mut().downcast_mut()
1816    }
1817
1818    /// Iterates all the config extension entries yielding their prefix and their
1819    /// [ExtensionOptions] implementation.
1820    pub fn iter(
1821        &self,
1822    ) -> impl Iterator<Item = (&'static str, &Box<dyn ExtensionOptions>)> {
1823        self.0.iter().map(|(k, v)| (*k, &v.0))
1824    }
1825}
1826
1827#[derive(Debug)]
1828struct ExtensionBox(Box<dyn ExtensionOptions>);
1829
1830impl Clone for ExtensionBox {
1831    fn clone(&self) -> Self {
1832        Self(self.0.cloned())
1833    }
1834}
1835
/// A trait implemented by `config_namespace` and for field types that provides
/// the ability to walk and mutate the configuration tree
pub trait ConfigField {
    /// Reports this field (and any nested fields) to the visitor `v`, using
    /// `key` as the field's key and `description` as its description.
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);

    /// Updates the field addressed by `key` from the textual `value`.
    ///
    /// Scalar implementations ignore `key`; container implementations use it
    /// to select a nested field.
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Resets the field addressed by `key`.
    ///
    /// The default implementation does not reset anything and always returns
    /// a configuration error; implementors override it to support resetting.
    fn reset(&mut self, key: &str) -> Result<()> {
        _config_err!("Reset is not supported for this config field, key: {}", key)
    }
}
1847
1848impl<F: ConfigField + Default> ConfigField for Option<F> {
1849    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1850        match self {
1851            Some(s) => s.visit(v, key, description),
1852            None => v.none(key, description),
1853        }
1854    }
1855
1856    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1857        self.get_or_insert_with(Default::default).set(key, value)
1858    }
1859
1860    fn reset(&mut self, key: &str) -> Result<()> {
1861        if key.is_empty() {
1862            *self = Default::default();
1863            Ok(())
1864        } else {
1865            self.get_or_insert_with(Default::default).reset(key)
1866        }
1867    }
1868}
1869
1870/// Default transformation to parse a [`ConfigField`] for a string.
1871///
1872/// This uses [`FromStr`] to parse the data.
1873pub fn default_config_transform<T>(input: &str) -> Result<T>
1874where
1875    T: FromStr,
1876    <T as FromStr>::Err: Sync + Send + Error + 'static,
1877{
1878    input.parse().map_err(|e| {
1879        DataFusionError::Context(
1880            format!(
1881                "Error parsing '{}' as {}",
1882                input,
1883                std::any::type_name::<T>()
1884            ),
1885            Box::new(DataFusionError::External(Box::new(e))),
1886        )
1887    })
1888}
1889
/// Macro that generates [`ConfigField`] for a given type.
///
/// # Usage
/// This always requires [`Display`] to be implemented for the given type.
/// The generated `reset` additionally requires [`Default`].
///
/// There are two ways to invoke this macro. The first one uses
/// [`default_config_transform`]/[`FromStr`] to parse the data:
///
/// ```ignore
/// config_field!(MyType);
/// ```
///
/// Note that the parsing error MUST implement [`std::error::Error`]!
///
/// Or you can specify how you want to parse an [`str`] into the type:
///
/// ```ignore
/// fn parse_it(s: &str) -> Result<MyType> {
///     ...
/// }
///
/// config_field!(
///     MyType,
///     value => parse_it(value)?
/// );
/// ```
///
/// The transform expression is assigned directly to the field inside a
/// function returning `Result`, so it must evaluate to the field type itself
/// (hence the `?` in the example above).
#[macro_export]
macro_rules! config_field {
    // Short form: parse with `default_config_transform`.
    ($t:ty) => {
        config_field!($t, value => $crate::config::default_config_transform(value)?);
    };

    // Full form: `$arg` names the input string inside `$transform`.
    ($t:ty, $arg:ident => $transform:expr) => {
        impl $crate::config::ConfigField for $t {
            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
                *self = $transform;
                Ok(())
            }

            fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
                // A scalar has no nested fields, so only the empty key is valid.
                if key.is_empty() {
                    *self = <$t as Default>::default();
                    Ok(())
                } else {
                    $crate::error::_config_err!(
                        "Config field is a scalar {} and does not have nested field \"{}\"",
                        stringify!($t),
                        key
                    )
                }
            }
        }
    };
}
1948
// Scalar types whose `ConfigField` impl parses values with
// `default_config_transform`.
config_field!(String);
// `bool` lowercases the input before parsing, so spellings such as `TRUE`
// or `False` are accepted as well.
config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);
config_field!(u32);
config_field!(i32);
1956
1957impl ConfigField for u8 {
1958    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1959        v.some(key, self, description)
1960    }
1961
1962    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1963        if value.is_empty() {
1964            return Err(DataFusionError::Configuration(format!(
1965                "Input string for {key} key is empty"
1966            )));
1967        }
1968        // Check if the string is a valid number
1969        if let Ok(num) = value.parse::<u8>() {
1970            // TODO: Let's decide how we treat the numerical strings.
1971            *self = num;
1972        } else {
1973            let bytes = value.as_bytes();
1974            // Check if the first character is ASCII (single byte)
1975            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1976                return Err(DataFusionError::Configuration(format!(
1977                    "Error parsing {value} as u8. Non-ASCII string provided"
1978                )));
1979            }
1980            *self = bytes[0];
1981        }
1982        Ok(())
1983    }
1984}
1985
1986impl ConfigField for CompressionTypeVariant {
1987    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1988        v.some(key, self, description)
1989    }
1990
1991    fn set(&mut self, _: &str, value: &str) -> Result<()> {
1992        *self = CompressionTypeVariant::from_str(value)?;
1993        Ok(())
1994    }
1995}
1996
1997impl ConfigField for CsvQuoteStyle {
1998    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1999        v.some(key, self, description)
2000    }
2001
2002    fn set(&mut self, _: &str, value: &str) -> Result<()> {
2003        *self = CsvQuoteStyle::from_str(value)?;
2004        Ok(())
2005    }
2006}
2007
/// An implementation trait used to recursively walk configuration
///
/// `ConfigField::visit` implementations call back into the visitor once per
/// leaf setting, passing the full key and the setting's description.
pub trait Visit {
    /// Called for a setting that currently has a value; `value` is rendered
    /// through its [`Display`] implementation by the visitors in this file.
    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);

    /// Called for a setting that currently has no value.
    fn none(&mut self, key: &str, description: &'static str);
}
2014
2015/// Convenience macro to create [`ExtensionsOptions`].
2016///
2017/// The created structure implements the following traits:
2018///
2019/// - [`Clone`]
2020/// - [`Debug`]
2021/// - [`Default`]
2022/// - [`ExtensionOptions`]
2023///
2024/// # Usage
2025/// The syntax is:
2026///
2027/// ```text
2028/// extensions_options! {
2029///      /// Struct docs (optional).
2030///     [<vis>] struct <StructName> {
2031///         /// Field docs (optional)
2032///         [<vis>] <field_name>: <field_type>, default = <default_value>
2033///
2034///         ... more fields
2035///     }
2036/// }
2037/// ```
2038///
2039/// The placeholders are:
2040/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
2041/// - `<StructName>`: Struct name like `MyStruct`.
2042/// - `<field_name>`: Field name like `my_field`.
2043/// - `<field_type>`: Field type like `u8`.
2044/// - `<default_value>`: Default value matching the field type like `42`.
2045///
2046/// # Example
2047/// See also a full example on the [`ConfigExtension`] documentation
2048///
2049/// ```
2050/// use datafusion_common::extensions_options;
2051///
2052/// extensions_options! {
2053///     /// My own config options.
2054///     pub struct MyConfig {
2055///         /// Should "foo" be replaced by "bar"?
2056///         pub foo_to_bar: bool, default = true
2057///
2058///         /// How many "baz" should be created?
2059///         pub baz_count: usize, default = 1337
2060///     }
2061/// }
2062/// ```
2063///
2064///
2065/// [`Debug`]: std::fmt::Debug
2066/// [`ExtensionsOptions`]: crate::config::ExtensionOptions
2067#[macro_export]
macro_rules! extensions_options {
    // Input: optional struct docs, a visibility, the struct name, and a list of
    // `name: Type, default = expr` fields, each with optional doc comments.
    (
     $(#[doc = $struct_d:tt])*
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
        )*$(,)*
    }
    ) => {
        // The struct itself, with the docs and visibilities passed in.
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $field_vis $field_name : $field_type,
            )*
        }

        // `Default` is built from the per-field `default = ...` expressions.
        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl $crate::config::ExtensionOptions for $struct_name {
            fn as_any(&self) -> &dyn ::std::any::Any {
                self
            }

            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
                self
            }

            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
                Box::new(self.clone())
            }

            // Delegates to the `ConfigField` implementation generated below.
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                $crate::config::ConfigField::set(self, key, value)
            }

            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
                // Local visitor that records every visited setting as a `ConfigEntry`.
                struct Visitor(Vec<$crate::config::ConfigEntry>);

                impl $crate::config::Visit for Visitor {
                    fn some<V: std::fmt::Display>(
                        &mut self,
                        key: &str,
                        value: V,
                        description: &'static str,
                    ) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: Some(value.to_string()),
                            description,
                        })
                    }

                    fn none(&mut self, key: &str, description: &'static str) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: None,
                            description,
                        })
                    }
                }

                let mut v = Visitor(vec![]);
                // The prefix is not used for extensions.
                // The description is generated in ConfigField::visit.
                // We can just pass empty strings here.
                $crate::config::ConfigField::visit(self, &mut v, "", "");
                v.0
            }
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                // The first dot-separated segment selects the field; the
                // remainder (possibly empty) is forwarded to that field.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // This macro does not accept `#[deprecated]` on fields,
                            // so no `#[allow(deprecated)]` is emitted here.
                            {
                                            self.$field_name.set(rem, value.as_ref())
                            }
                        },
                    )*
                    // No field with that name: report the unknown key.
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
                $(
                    // Keys are the bare field names (no prefix); the description
                    // is the field's doc comment, concatenated and trimmed.
                    let key = stringify!($field_name).to_string();
                    let desc = concat!($($d),*).trim();
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
    }
}
2177
/// These file types have special built in behavior for configuration.
/// Use `TableOptions::extensions` for configuring other file types.
#[derive(Debug, Clone)]
pub enum ConfigFileType {
    /// CSV; `format.*` keys are routed to `TableOptions::csv`.
    CSV,
    /// Parquet; `format.*` keys are routed to `TableOptions::parquet`.
    /// Only present when the `parquet` feature is enabled.
    #[cfg(feature = "parquet")]
    PARQUET,
    /// JSON; `format.*` keys are routed to `TableOptions::json`.
    JSON,
}
2187
/// Per-table configuration for the built-in file formats (CSV, Parquet and JSON),
/// together with an extension point for options that belong to other formats.
///
/// Keys of the form `format.<option>` are applied to whichever built-in format is
/// selected in [`TableOptions::current_format`]; keys in other namespaces are looked up
/// in [`TableOptions::extensions`].
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
    /// Configuration options for CSV file handling. This includes settings like the delimiter,
    /// quote character, and whether the first row is considered as headers.
    pub csv: CsvOptions,

    /// Configuration options for Parquet file handling. This includes settings for compression,
    /// encoding, and other Parquet-specific file characteristics.
    pub parquet: TableParquetOptions,

    /// Configuration options for JSON file handling.
    pub json: JsonOptions,

    /// The file format that `format.*` keys currently refer to. `None` means no format has
    /// been selected yet, in which case setting a `format.*` key is an error.
    pub current_format: Option<ConfigFileType>,

    /// Optional extensions that can be used to extend or customize the behavior of the table
    /// options. Extensions can be registered using `Extensions::insert` and might include
    /// custom file handling logic, additional configuration parameters, or other enhancements.
    pub extensions: Extensions,
}
2213
2214impl ConfigField for TableOptions {
2215    /// Visits configuration settings for the current file format, or all formats if none is selected.
2216    ///
2217    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
2218    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
2219    /// it visits all available format settings.
2220    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
2221        if let Some(file_type) = &self.current_format {
2222            match file_type {
2223                #[cfg(feature = "parquet")]
2224                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
2225                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
2226                ConfigFileType::JSON => self.json.visit(v, "format", ""),
2227            }
2228        } else {
2229            self.csv.visit(v, "csv", "");
2230            self.parquet.visit(v, "parquet", "");
2231            self.json.visit(v, "json", "");
2232        }
2233    }
2234
2235    /// Sets a configuration value for a specific key within `TableOptions`.
2236    ///
2237    /// This method delegates setting configuration values to the specific file format configurations,
2238    /// based on the current format selected. If no format is selected, it returns an error.
2239    ///
2240    /// # Parameters
2241    ///
2242    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
2243    ///   for CSV format.
2244    /// * `value`: The value to set for the specified configuration key.
2245    ///
2246    /// # Returns
2247    ///
2248    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
2249    /// or if setting the configuration value fails for the specific format.
2250    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2251        // Extensions are handled in the public `ConfigOptions::set`
2252        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2253        match key {
2254            "format" => {
2255                let Some(format) = &self.current_format else {
2256                    return _config_err!("Specify a format for TableOptions");
2257                };
2258                match format {
2259                    #[cfg(feature = "parquet")]
2260                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
2261                    ConfigFileType::CSV => self.csv.set(rem, value),
2262                    ConfigFileType::JSON => self.json.set(rem, value),
2263                }
2264            }
2265            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
2266        }
2267    }
2268}
2269
2270impl TableOptions {
2271    /// Constructs a new instance of `TableOptions` with default settings.
2272    ///
2273    /// # Returns
2274    ///
2275    /// A new `TableOptions` instance with default configuration values.
2276    pub fn new() -> Self {
2277        Self::default()
2278    }
2279
2280    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
2281    ///
2282    /// # Parameters
2283    ///
2284    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
2285    ///
2286    /// # Returns
2287    ///
2288    /// A new `TableOptions` instance with settings applied from the session config.
2289    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
2290        let initial = TableOptions::default();
2291        initial.combine_with_session_config(config)
2292    }
2293
2294    /// Updates the current `TableOptions` with settings from a given session config.
2295    ///
2296    /// # Parameters
2297    ///
2298    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
2299    ///
2300    /// # Returns
2301    ///
2302    /// A new `TableOptions` instance with updated settings from the session config.
2303    #[must_use = "this method returns a new instance"]
2304    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
2305        let mut clone = self.clone();
2306        clone.parquet.global = config.execution.parquet.clone();
2307        clone
2308    }
2309
2310    /// Sets the file format for the table.
2311    ///
2312    /// # Parameters
2313    ///
2314    /// * `format`: The file format to use (e.g., CSV, Parquet).
2315    pub fn set_config_format(&mut self, format: ConfigFileType) {
2316        self.current_format = Some(format);
2317    }
2318
2319    /// Sets the extensions for this `TableOptions` instance.
2320    ///
2321    /// # Parameters
2322    ///
2323    /// * `extensions`: The `Extensions` instance to set.
2324    ///
2325    /// # Returns
2326    ///
2327    /// A new `TableOptions` instance with the specified extensions applied.
2328    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
2329        self.extensions = extensions;
2330        self
2331    }
2332
2333    /// Sets a specific configuration option.
2334    ///
2335    /// # Parameters
2336    ///
2337    /// * `key`: The configuration key (e.g., "format.delimiter").
2338    /// * `value`: The value to set for the specified key.
2339    ///
2340    /// # Returns
2341    ///
2342    /// A result indicating success or failure in setting the configuration option.
2343    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
2344        let Some((mut prefix, _)) = key.split_once('.') else {
2345            return _config_err!("could not find config namespace for key \"{key}\"");
2346        };
2347
2348        if prefix == "format" {
2349            return ConfigField::set(self, key, value);
2350        }
2351
2352        if prefix == "execution" {
2353            return Ok(());
2354        }
2355
2356        if !self.extensions.0.contains_key(prefix)
2357            && self
2358                .extensions
2359                .0
2360                .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
2361        {
2362            prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
2363        }
2364
2365        let Some(e) = self.extensions.0.get_mut(prefix) else {
2366            return _config_err!("Could not find config namespace \"{prefix}\"");
2367        };
2368        e.0.set(key, value)
2369    }
2370
2371    /// Initializes a new `TableOptions` from a hash map of string settings.
2372    ///
2373    /// # Parameters
2374    ///
2375    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
2376    ///
2377    /// # Returns
2378    ///
2379    /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
2380    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
2381        let mut ret = Self::default();
2382        for (k, v) in settings {
2383            ret.set(k, v)?;
2384        }
2385
2386        Ok(ret)
2387    }
2388
2389    /// Modifies the current `TableOptions` instance with settings from a hash map.
2390    ///
2391    /// # Parameters
2392    ///
2393    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
2394    ///
2395    /// # Returns
2396    ///
2397    /// A result indicating success or failure in applying the settings.
2398    pub fn alter_with_string_hash_map(
2399        &mut self,
2400        settings: &HashMap<String, String>,
2401    ) -> Result<()> {
2402        for (k, v) in settings {
2403            self.set(k, v)?;
2404        }
2405        Ok(())
2406    }
2407
2408    /// Retrieves all configuration entries from this `TableOptions`.
2409    ///
2410    /// # Returns
2411    ///
2412    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
2413    pub fn entries(&self) -> Vec<ConfigEntry> {
2414        struct Visitor(Vec<ConfigEntry>);
2415
2416        impl Visit for Visitor {
2417            fn some<V: Display>(
2418                &mut self,
2419                key: &str,
2420                value: V,
2421                description: &'static str,
2422            ) {
2423                self.0.push(ConfigEntry {
2424                    key: key.to_string(),
2425                    value: Some(value.to_string()),
2426                    description,
2427                })
2428            }
2429
2430            fn none(&mut self, key: &str, description: &'static str) {
2431                self.0.push(ConfigEntry {
2432                    key: key.to_string(),
2433                    value: None,
2434                    description,
2435                })
2436            }
2437        }
2438
2439        let mut v = Visitor(vec![]);
2440        self.visit(&mut v, "format", "");
2441
2442        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
2443        v.0
2444    }
2445}
2446
/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
///
/// Closely tied to `ParquetWriterOptions` (see `crate::file_options::parquet_writer::ParquetWriterOptions` when the "parquet" feature is enabled).
/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
/// (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
    pub global: ParquetOptions,
    /// Column specific options. Default usage is parquet.XX::column.
    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
    /// Additional file-level metadata to include. Inserted into the key_value_metadata
    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
    ///
    /// Multiple entries are permitted
    /// ```sql
    /// OPTIONS (
    ///    'format.metadata::key1' '',
    ///    'format.metadata::key2' 'value',
    ///    'format.metadata::key3' 'value has spaces',
    ///    'format.metadata::key4' 'value has special chars :: :',
    ///    'format.metadata::key_dupe' 'original will be overwritten',
    ///    'format.metadata::key_dupe' 'final'
    /// )
    /// ```
    pub key_value_metadata: HashMap<String, Option<String>>,
    /// Options for configuring Parquet modular encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    /// See ConfigFileEncryptionProperties and ConfigFileDecryptionProperties in datafusion/common/src/config.rs
    /// These can be set via 'format.crypto', for example:
    /// ```sql
    /// OPTIONS (
    ///    'format.crypto.file_encryption.encrypt_footer' 'true',
    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435',  -- b"0123456789012345"
    ///    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
    ///    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
    ///     -- Same for decryption
    ///    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
    ///    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
    ///    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
    /// )
    /// ```
    /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
    /// Note that keys must be provided in hex format since they are binary strings.
    pub crypto: ParquetEncryptionOptions,
}
2495
2496impl TableParquetOptions {
2497    /// Return new default TableParquetOptions
2498    pub fn new() -> Self {
2499        Self::default()
2500    }
2501
2502    /// Set whether the encoding of the arrow metadata should occur
2503    /// during the writing of parquet.
2504    ///
2505    /// Default is to encode the arrow schema in the file kv_metadata.
2506    pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
2507        Self {
2508            global: ParquetOptions {
2509                skip_arrow_metadata: skip,
2510                ..self.global
2511            },
2512            ..self
2513        }
2514    }
2515
2516    /// Retrieves all configuration entries from this `TableParquetOptions`.
2517    ///
2518    /// # Returns
2519    ///
2520    /// A vector of `ConfigEntry` instances, representing all the configuration options within this
2521    pub fn entries(self: &TableParquetOptions) -> Vec<ConfigEntry> {
2522        struct Visitor(Vec<ConfigEntry>);
2523
2524        impl Visit for Visitor {
2525            fn some<V: Display>(
2526                &mut self,
2527                key: &str,
2528                value: V,
2529                description: &'static str,
2530            ) {
2531                self.0.push(ConfigEntry {
2532                    key: key[1..].to_string(),
2533                    value: Some(value.to_string()),
2534                    description,
2535                })
2536            }
2537
2538            fn none(&mut self, key: &str, description: &'static str) {
2539                self.0.push(ConfigEntry {
2540                    key: key[1..].to_string(),
2541                    value: None,
2542                    description,
2543                })
2544            }
2545        }
2546
2547        let mut v = Visitor(vec![]);
2548        self.visit(&mut v, "", "");
2549
2550        v.0
2551    }
2552}
2553
2554impl ConfigField for TableParquetOptions {
2555    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
2556        self.global.visit(v, key_prefix, description);
2557        self.column_specific_options
2558            .visit(v, key_prefix, description);
2559        self.crypto
2560            .visit(v, &format!("{key_prefix}.crypto"), description);
2561    }
2562
2563    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2564        // Determine if the key is a global, metadata, or column-specific setting
2565        if key.starts_with("metadata::") {
2566            let k = match key.split("::").collect::<Vec<_>>()[..] {
2567                [_meta] | [_meta, ""] => {
2568                    return _config_err!(
2569                        "Invalid metadata key provided, missing key in metadata::<key>"
2570                    );
2571                }
2572                [_meta, k] => k.into(),
2573                _ => {
2574                    return _config_err!(
2575                        "Invalid metadata key provided, found too many '::' in \"{key}\""
2576                    );
2577                }
2578            };
2579            self.key_value_metadata.insert(k, Some(value.into()));
2580            Ok(())
2581        } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
2582            self.crypto.set(crypto_feature, value)
2583        } else if key.contains("::") {
2584            self.column_specific_options.set(key, value)
2585        } else {
2586            self.global.set(key, value)
2587        }
2588    }
2589}
2590
// Declares a configuration struct and implements `ConfigField` both for the
// struct and for `HashMap<String, Struct>`, where map keys are addressed as
// `<field>::<map key>`.
macro_rules! config_namespace_with_hashmap {
    (
     $(#[doc = $struct_d:tt])*
     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
        )*$(,)*
    }
    ) => {

        $(#[doc = $struct_d])*
        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
            $field_vis $field_name : $field_type,
            )*
        }

        impl ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                // The first dot-separated segment selects the field; the
                // remainder (possibly empty) is forwarded to that field.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                       stringify!($field_name) => {
                           // Apply the field's optional `transform` to the raw value first
                           $(let value = $transform(value);)?
                           self.$field_name.set(rem, value.as_ref())
                       },
                    )*
                    _ => _config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                // Key is `<key_prefix>.<field>`; the description is the
                // field's doc comment, concatenated and trimmed.
                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                let desc = concat!($($d),*).trim();
                self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }

        // `Default` is built from the per-field `default = ...` expressions.
        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl ConfigField for HashMap<String,$struct_name> {
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                // Split once on the first `::`: left side is the field name,
                // right side (which may itself contain `::`) is the map key.
                let parts: Vec<&str> = key.splitn(2, "::").collect();
                match parts.as_slice() {
                    [inner_key, hashmap_key] => {
                        // Get or create the struct for the specified key
                        let inner_value = self
                            .entry((*hashmap_key).to_owned())
                            .or_insert_with($struct_name::default);

                        inner_value.set(inner_key, value)
                    }
                    // No `::` in the key, so there is no map key to address.
                    _ => _config_err!("Unrecognized key '{key}'."),
                }
            }

            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                for (column_name, col_options) in self {
                    $(
                    // Key is `<key_prefix>.<field>::<map key>`.
                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
                    let desc = concat!($($d),*).trim();
                    col_options.$field_name.visit(v, key.as_str(), desc);
                    )*
                }
            }
        }
    }
}
2678
// NOTE: the field doc comments below are captured by
// `config_namespace_with_hashmap!` and passed to visitors as each option's
// description, so editing them changes what is reported at runtime.
config_namespace_with_hashmap! {
    /// Options controlling parquet format for individual columns.
    ///
    /// See [`ParquetOptions`] for more details
    pub struct ParquetColumnOptions {
        /// Sets if bloom filter is enabled for the column path.
        pub bloom_filter_enabled: Option<bool>, default = None

        /// Sets encoding for the column path.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        pub encoding: Option<String>, default = None

        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
        /// default parquet options
        pub dictionary_enabled: Option<bool>, default = None

        /// Sets default parquet compression codec for the column path.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        pub compression: Option<String>, transform = str::to_lowercase, default = None

        /// Sets if statistics are enabled for the column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet options
        pub statistics_enabled: Option<String>, default = None

        /// Sets bloom filter false positive probability for the column path. If NULL, uses
        /// default parquet options
        pub bloom_filter_fpp: Option<f64>, default = None

        /// Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet options
        pub bloom_filter_ndv: Option<u64>, default = None
    }
}
2721
/// Configuration-level settings for encrypting a Parquet file.
///
/// Keys, key metadata and the AAD prefix are held as hex-encoded strings;
/// per-column settings live in `column_encryption_properties`, keyed by
/// column name.
#[derive(Clone, Debug, PartialEq)]
pub struct ConfigFileEncryptionProperties {
    /// Should the parquet footer be encrypted
    /// default is true
    pub encrypt_footer: bool,
    /// Key to use for the parquet footer encoded in hex format
    pub footer_key_as_hex: String,
    /// Metadata information for footer key
    pub footer_key_metadata_as_hex: String,
    /// HashMap of column names --> (key in hex format, metadata)
    pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
    /// AAD prefix string uniquely identifies the file and prevents file swapping
    pub aad_prefix_as_hex: String,
    /// If true, store the AAD prefix in the file
    /// default is false
    pub store_aad_prefix: bool,
}
2739
2740// Setup to match EncryptionPropertiesBuilder::new()
2741impl Default for ConfigFileEncryptionProperties {
2742    fn default() -> Self {
2743        ConfigFileEncryptionProperties {
2744            encrypt_footer: true,
2745            footer_key_as_hex: String::new(),
2746            footer_key_metadata_as_hex: String::new(),
2747            column_encryption_properties: Default::default(),
2748            aad_prefix_as_hex: String::new(),
2749            store_aad_prefix: false,
2750        }
2751    }
2752}
2753
// NOTE: the field doc comments below are captured by
// `config_namespace_with_hashmap!` and passed to visitors as each option's
// description, so editing them changes what is reported at runtime.
config_namespace_with_hashmap! {
    pub struct ColumnEncryptionProperties {
        /// Per column encryption key
        pub column_key_as_hex: String, default = "".to_string()
        /// Per column encryption key metadata
        pub column_metadata_as_hex: Option<String>, default = None
    }
}
2762
2763impl ConfigField for ConfigFileEncryptionProperties {
2764    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2765        let key = format!("{key_prefix}.encrypt_footer");
2766        let desc = "Encrypt the footer";
2767        self.encrypt_footer.visit(v, key.as_str(), desc);
2768
2769        let key = format!("{key_prefix}.footer_key_as_hex");
2770        let desc = "Key to use for the parquet footer";
2771        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2772
2773        let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2774        let desc = "Metadata to use for the parquet footer";
2775        self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2776
2777        self.column_encryption_properties.visit(v, key_prefix, desc);
2778
2779        let key = format!("{key_prefix}.aad_prefix_as_hex");
2780        let desc = "AAD prefix to use";
2781        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2782
2783        let key = format!("{key_prefix}.store_aad_prefix");
2784        let desc = "If true, store the AAD prefix";
2785        self.store_aad_prefix.visit(v, key.as_str(), desc);
2786
2787        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2788    }
2789
2790    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2791        // Any hex encoded values must be pre-encoded using
2792        // hex::encode() before calling set.
2793
2794        if key.contains("::") {
2795            // Handle any column specific properties
2796            return self.column_encryption_properties.set(key, value);
2797        };
2798
2799        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2800        match key {
2801            "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2802            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2803            "footer_key_metadata_as_hex" => {
2804                self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2805            }
2806            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2807            "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2808            _ => _config_err!(
2809                "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2810                key
2811            ),
2812        }
2813    }
2814}
2815
#[cfg(feature = "parquet_encryption")]
impl TryFrom<ConfigFileEncryptionProperties> for FileEncryptionProperties {
    type Error = DataFusionError;

    /// Builds parquet [`FileEncryptionProperties`] from the hex-encoded
    /// configuration values.
    ///
    /// Empty footer key metadata and an empty AAD prefix are treated as
    /// "not set" and are not passed to the builder.
    ///
    /// # Errors
    ///
    /// Returns a configuration error when any hex value cannot be decoded or
    /// when the builder rejects the resulting properties.
    fn try_from(val: ConfigFileEncryptionProperties) -> Result<Self> {
        let footer_key = hex::decode(val.footer_key_as_hex).map_err(|e| {
            DataFusionError::Configuration(format!("Unable to decode hex footer key from ConfigFileEncryptionProperties: {e}"))
        })?;

        // The builder speaks in terms of a *plaintext* footer, hence the negation.
        let mut builder = FileEncryptionProperties::builder(footer_key)
            .with_plaintext_footer(!val.encrypt_footer)
            .with_aad_prefix_storage(val.store_aad_prefix);

        if !val.footer_key_metadata_as_hex.is_empty() {
            let footer_key_metadata = hex::decode(&val.footer_key_metadata_as_hex).map_err(|e| {
                DataFusionError::Configuration(format!("Unable to decode hex footer key metadata from ConfigFileEncryptionProperties: {e}"))
            })?;
            builder = builder.with_footer_key_metadata(footer_key_metadata);
        }

        for (column_name, column_props) in &val.column_encryption_properties {
            let column_key = hex::decode(&column_props.column_key_as_hex).map_err(|e| {
                DataFusionError::Configuration(format!("Unable to decode hex encryption key for column {column_name}: {e}"))
            })?;

            let column_metadata = match &column_props.column_metadata_as_hex {
                Some(encoded) => Some(hex::decode(encoded).map_err(|e| {
                    DataFusionError::Configuration(format!("Unable to decode hex column metadata for column {column_name}: {e}"))
                })?),
                None => None,
            };

            builder = match column_metadata {
                Some(metadata) => {
                    builder.with_column_key_and_metadata(column_name, column_key, metadata)
                }
                None => builder.with_column_key(column_name, column_key),
            };
        }

        if !val.aad_prefix_as_hex.is_empty() {
            let aad_prefix: Vec<u8> = hex::decode(&val.aad_prefix_as_hex).map_err(|e| {
                DataFusionError::Configuration(format!(
                    "Unable to decode hex AAD prefix from ConfigFileEncryptionProperties: {e}"
                ))
            })?;
            builder = builder.with_aad_prefix(aad_prefix);
        }

        let properties = builder.build().map_err(|e| {
            DataFusionError::Configuration(format!(
                "Could not build FileEncryptionProperties: {e}"
            ))
        })?;
        // `build` yields an `Arc`; take the value out of it (cloning if shared).
        Ok(Arc::unwrap_or_clone(properties))
    }
}
2882
#[cfg(feature = "parquet_encryption")]
impl From<&Arc<FileEncryptionProperties>> for ConfigFileEncryptionProperties {
    /// Captures parquet [`FileEncryptionProperties`] as hex-encoded
    /// configuration values.
    ///
    /// Missing footer key metadata and a missing AAD prefix become empty strings.
    fn from(f: &Arc<FileEncryptionProperties>) -> Self {
        let (names, keys, metas) = f.column_keys();

        // NOTE(review): keys and metadata are looked up by the position of the
        // column name, which assumes `column_keys()` returns index-aligned
        // vectors — confirm against the parquet crate.
        let column_encryption_properties: HashMap<String, ColumnEncryptionProperties> =
            names
                .iter()
                .enumerate()
                .map(|(idx, name)| {
                    let props = ColumnEncryptionProperties {
                        column_key_as_hex: hex::encode(&keys[idx]),
                        column_metadata_as_hex: metas.get(idx).map(hex::encode),
                    };
                    (name.clone(), props)
                })
                .collect();

        let footer_key_metadata_as_hex = match f.footer_key_metadata() {
            Some(metadata) => hex::encode(metadata),
            None => String::new(),
        };
        let aad_prefix_as_hex = f.aad_prefix().map(hex::encode).unwrap_or_default();

        Self {
            encrypt_footer: f.encrypt_footer(),
            footer_key_as_hex: hex::encode(f.footer_key()),
            footer_key_metadata_as_hex,
            column_encryption_properties,
            aad_prefix_as_hex,
            store_aad_prefix: f.store_aad_prefix(),
        }
    }
}
2919
/// Parquet file decryption settings in a configuration-friendly form.
///
/// Binary values (keys and the AAD prefix) are stored as hex-encoded strings.
#[derive(Clone, Debug, PartialEq)]
pub struct ConfigFileDecryptionProperties {
    /// Binary string to use for the parquet footer encoded in hex format
    pub footer_key_as_hex: String,
    /// HashMap of column names --> key in hex format
    pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
    /// AAD prefix string uniquely identifies the file and prevents file swapping
    pub aad_prefix_as_hex: String,
    /// If true, then verify signature for files with plaintext footers.
    /// default = true
    pub footer_signature_verification: bool,
}
2932
// Per-column decryption settings. Instances are stored in
// `ConfigFileDecryptionProperties::column_decryption_properties`, keyed by
// column name. The key is hex encoded.
config_namespace_with_hashmap! {
    pub struct ColumnDecryptionProperties {
        /// Per column encryption key
        pub column_key_as_hex: String, default = "".to_string()
    }
}
2939
2940// Setup to match DecryptionPropertiesBuilder::new()
2941impl Default for ConfigFileDecryptionProperties {
2942    fn default() -> Self {
2943        ConfigFileDecryptionProperties {
2944            footer_key_as_hex: String::new(),
2945            column_decryption_properties: Default::default(),
2946            aad_prefix_as_hex: String::new(),
2947            footer_signature_verification: true,
2948        }
2949    }
2950}
2951
2952impl ConfigField for ConfigFileDecryptionProperties {
2953    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2954        let key = format!("{key_prefix}.footer_key_as_hex");
2955        let desc = "Key to use for the parquet footer";
2956        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2957
2958        let key = format!("{key_prefix}.aad_prefix_as_hex");
2959        let desc = "AAD prefix to use";
2960        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2961
2962        let key = format!("{key_prefix}.footer_signature_verification");
2963        let desc = "If true, verify the footer signature";
2964        self.footer_signature_verification
2965            .visit(v, key.as_str(), desc);
2966
2967        self.column_decryption_properties.visit(v, key_prefix, desc);
2968    }
2969
2970    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2971        // Any hex encoded values must be pre-encoded using
2972        // hex::encode() before calling set.
2973
2974        if key.contains("::") {
2975            // Handle any column specific properties
2976            return self.column_decryption_properties.set(key, value);
2977        };
2978
2979        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2980        match key {
2981            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2982            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2983            "footer_signature_verification" => {
2984                self.footer_signature_verification.set(rem, value.as_ref())
2985            }
2986            _ => _config_err!(
2987                "Config value \"{}\" not found on ConfigFileDecryptionProperties",
2988                key
2989            ),
2990        }
2991    }
2992}
2993
#[cfg(feature = "parquet_encryption")]
impl TryFrom<ConfigFileDecryptionProperties> for FileDecryptionProperties {
    type Error = DataFusionError;

    /// Builds parquet [`FileDecryptionProperties`] from the hex-encoded
    /// configuration values.
    ///
    /// Column keys are decoded first, then the footer key. An empty AAD
    /// prefix is treated as "not set".
    ///
    /// # Errors
    ///
    /// Returns a configuration error when any hex value cannot be decoded or
    /// when the builder rejects the column keys or the final properties.
    fn try_from(val: ConfigFileDecryptionProperties) -> Result<Self> {
        // Decode every column key, stopping at the first failure.
        let decoded_columns = val
            .column_decryption_properties
            .iter()
            .map(|(col_name, decryption_properties)| {
                hex::decode(&decryption_properties.column_key_as_hex)
                    .map(|column_key| (col_name.as_str(), column_key))
                    .map_err(|e| {
                        DataFusionError::Configuration(format!
                            ("Could not decode hex column key from ConfigFileDecryptionProperties for column name {col_name}: {e}."))
                    })
            })
            .collect::<Result<Vec<_>>>()?;
        let (column_names, column_keys): (Vec<&str>, Vec<Vec<u8>>) =
            decoded_columns.into_iter().unzip();

        let footer_key = hex::decode(val.footer_key_as_hex).map_err(|e| {
            DataFusionError::Configuration(format!(
                "Could not decode hex footer key from ConfigFileDecryptionProperties: {e}."
            ))
        })?;

        let builder = FileDecryptionProperties::builder(footer_key)
            .with_column_keys(column_names, column_keys)
            .map_err(|e| {
                DataFusionError::Configuration(format!(
                    "Could not set column keys on FileDecryptionPropertiesBuilder: {e}."
                ))
            })?;

        // Verification is the builder's default; only opt out when asked to.
        let builder = if val.footer_signature_verification {
            builder
        } else {
            builder.disable_footer_signature_verification()
        };

        let builder = if val.aad_prefix_as_hex.is_empty() {
            builder
        } else {
            let aad_prefix = hex::decode(&val.aad_prefix_as_hex).map_err(|e| {
                DataFusionError::Configuration(format!(
                    "Could not decode hex AAD prefix from ConfigFileDecryptionProperties: {e}."
                ))
            })?;
            builder.with_aad_prefix(aad_prefix)
        };

        let properties = builder.build().map_err(|e| {
            DataFusionError::Configuration(format!(
                "Could not build FileDecryptionProperties: {e}."
            ))
        })?;
        // `build` yields an `Arc`; take the value out of it (cloning if shared).
        Ok(Arc::unwrap_or_clone(properties))
    }
}
3045
#[cfg(feature = "parquet_encryption")]
impl TryFrom<&Arc<FileDecryptionProperties>> for ConfigFileDecryptionProperties {
    type Error = DataFusionError;

    /// Captures parquet [`FileDecryptionProperties`] as hex-encoded
    /// configuration values. A missing AAD prefix becomes an empty string.
    ///
    /// # Errors
    ///
    /// Returns a configuration error when the footer key cannot be retrieved
    /// from `f`.
    fn try_from(f: &Arc<FileDecryptionProperties>) -> Result<Self> {
        let footer_key = f.footer_key(None).map_err(|e| {
            DataFusionError::Configuration(format!(
                "Could not retrieve footer key from FileDecryptionProperties. \
                Note that conversion to ConfigFileDecryptionProperties is not supported \
                when using a key retriever: {e}"
            ))
        })?;

        let (names, keys) = f.column_keys();
        // Each key is looked up by the position of its column name.
        let column_decryption_properties: HashMap<String, ColumnDecryptionProperties> =
            names
                .iter()
                .enumerate()
                .map(|(idx, name)| {
                    let props = ColumnDecryptionProperties {
                        column_key_as_hex: hex::encode(&keys[idx]),
                    };
                    (name.clone(), props)
                })
                .collect();

        let aad_prefix_as_hex = f.aad_prefix().map(hex::encode).unwrap_or_default();

        Ok(Self {
            footer_key_as_hex: hex::encode(footer_key.as_ref()),
            column_decryption_properties,
            aad_prefix_as_hex,
            footer_signature_verification: f.check_plaintext_footer_integrity(),
        })
    }
}
3080
/// Holds implementation-specific options for an encryption factory
///
/// Options are kept as raw string key/value pairs; use
/// [`EncryptionFactoryOptions::to_extension_options`] to load them into a
/// typed [`ExtensionOptions`] value.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct EncryptionFactoryOptions {
    /// Option name --> option value, both exactly as they were set
    pub options: HashMap<String, String>,
}
3086
3087impl ConfigField for EncryptionFactoryOptions {
3088    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
3089        for (option_key, option_value) in &self.options {
3090            v.some(
3091                &format!("{key}.{option_key}"),
3092                option_value,
3093                "Encryption factory specific option",
3094            );
3095        }
3096    }
3097
3098    fn set(&mut self, key: &str, value: &str) -> Result<()> {
3099        self.options.insert(key.to_owned(), value.to_owned());
3100        Ok(())
3101    }
3102}
3103
3104impl EncryptionFactoryOptions {
3105    /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
3106    pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
3107        let mut options = T::default();
3108        for (key, value) in &self.options {
3109            options.set(key, value)?;
3110        }
3111        Ok(options)
3112    }
3113}
3114
3115config_namespace! {
3116    /// Options controlling CSV format
3117    pub struct CsvOptions {
3118        /// Specifies whether there is a CSV header (i.e. the first line
3119        /// consists of is column names). The value `None` indicates that
3120        /// the configuration should be consulted.
3121        pub has_header: Option<bool>, default = None
3122        pub delimiter: u8, default = b','
3123        pub quote: u8, default = b'"'
3124        pub terminator: Option<u8>, default = None
3125        pub escape: Option<u8>, default = None
3126        pub double_quote: Option<bool>, default = None
3127        /// Quote style for CSV writing.
3128        /// One of: "Always", "Necessary", "NonNumeric", "Never"
3129        pub quote_style: CsvQuoteStyle, default = CsvQuoteStyle::Necessary
3130        /// Whether to ignore leading whitespace in string values when writing CSV.
3131        /// Defaults to `false` when `None`.
3132        pub ignore_leading_whitespace: Option<bool>, default = None
3133        /// Whether to ignore trailing whitespace in string values when writing CSV.
3134        /// Defaults to `false` when `None`.
3135        pub ignore_trailing_whitespace: Option<bool>, default = None
3136        /// Specifies whether newlines in (quoted) values are supported.
3137        ///
3138        /// Parsing newlines in quoted values may be affected by execution behaviour such as
3139        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
3140        /// parsed successfully, which may reduce performance.
3141        ///
3142        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
3143        pub newlines_in_values: Option<bool>, default = None
3144        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
3145        /// Compression level for the output file. The valid range depends on the
3146        /// compression algorithm:
3147        /// - ZSTD: 1 to 22 (default: 3)
3148        /// - GZIP: 0 to 9 (default: 6)
3149        /// - BZIP2: 0 to 9 (default: 6)
3150        /// - XZ: 0 to 9 (default: 6)
3151        /// If not specified, the default level for the compression algorithm is used.
3152        pub compression_level: Option<u32>, default = None
3153        pub schema_infer_max_rec: Option<usize>, default = None
3154        pub date_format: Option<String>, default = None
3155        pub datetime_format: Option<String>, default = None
3156        pub timestamp_format: Option<String>, default = None
3157        pub timestamp_tz_format: Option<String>, default = None
3158        pub time_format: Option<String>, default = None
3159        // The output format for Nulls in the CSV writer.
3160        pub null_value: Option<String>, default = None
3161        // The input regex for Nulls when loading CSVs.
3162        pub null_regex: Option<String>, default = None
3163        pub comment: Option<u8>, default = None
3164        /// Whether to allow truncated rows when parsing, both within a single file and across files.
3165        ///
3166        /// When set to false (default), reading a single CSV file which has rows of different lengths will
3167        /// error; if reading multiple CSV files with different number of columns, it will also fail.
3168        ///
3169        /// When set to true, reading a single CSV file with rows of different lengths will pad the truncated
3170        /// rows with null values for the missing columns; if reading multiple CSV files with different number
3171        /// of columns, it creates a union schema containing all columns found across the files, and will
3172        /// pad any files missing columns with null values for their rows.
3173        pub truncated_rows: Option<bool>, default = None
3174    }
3175}
3176
3177impl CsvOptions {
3178    /// Set a limit in terms of records to scan to infer the schema
3179    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
3180    pub fn with_compression(
3181        mut self,
3182        compression_type_variant: CompressionTypeVariant,
3183    ) -> Self {
3184        self.compression = compression_type_variant;
3185        self
3186    }
3187
3188    /// Set a limit in terms of records to scan to infer the schema
3189    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
3190    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
3191        self.schema_infer_max_rec = Some(max_rec);
3192        self
3193    }
3194
3195    /// Set true to indicate that the first line is a header.
3196    /// - default to true
3197    pub fn with_has_header(mut self, has_header: bool) -> Self {
3198        self.has_header = Some(has_header);
3199        self
3200    }
3201
3202    /// Returns true if the first line is a header. If format options does not
3203    /// specify whether there is a header, returns `None` (indicating that the
3204    /// configuration should be consulted).
3205    pub fn has_header(&self) -> Option<bool> {
3206        self.has_header
3207    }
3208
3209    /// The character separating values within a row.
3210    /// - default to ','
3211    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
3212        self.delimiter = delimiter;
3213        self
3214    }
3215
3216    /// The quote character in a row.
3217    /// - default to '"'
3218    pub fn with_quote(mut self, quote: u8) -> Self {
3219        self.quote = quote;
3220        self
3221    }
3222
3223    /// The character that terminates a row.
3224    /// - default to None (CRLF)
3225    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
3226        self.terminator = terminator;
3227        self
3228    }
3229
3230    /// The escape character in a row.
3231    /// - default is None
3232    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
3233        self.escape = escape;
3234        self
3235    }
3236
3237    /// Set true to indicate that the CSV quotes should be doubled.
3238    /// - default to true
3239    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
3240        self.double_quote = Some(double_quote);
3241        self
3242    }
3243
3244    /// Set the quote style for CSV writing.
3245    pub fn with_quote_style(mut self, quote_style: CsvQuoteStyle) -> Self {
3246        self.quote_style = quote_style;
3247        self
3248    }
3249
3250    /// Set whether to ignore leading whitespace in string values when writing CSV.
3251    pub fn with_ignore_leading_whitespace(
3252        mut self,
3253        ignore_leading_whitespace: bool,
3254    ) -> Self {
3255        self.ignore_leading_whitespace = Some(ignore_leading_whitespace);
3256        self
3257    }
3258
3259    /// Set whether to ignore trailing whitespace in string values when writing CSV.
3260    pub fn with_ignore_trailing_whitespace(
3261        mut self,
3262        ignore_trailing_whitespace: bool,
3263    ) -> Self {
3264        self.ignore_trailing_whitespace = Some(ignore_trailing_whitespace);
3265        self
3266    }
3267
3268    /// Specifies whether newlines in (quoted) values are supported.
3269    ///
3270    /// Parsing newlines in quoted values may be affected by execution behaviour such as
3271    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
3272    /// parsed successfully, which may reduce performance.
3273    ///
3274    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
3275    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
3276        self.newlines_in_values = Some(newlines_in_values);
3277        self
3278    }
3279
3280    /// Set a `CompressionTypeVariant` of CSV
3281    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
3282    pub fn with_file_compression_type(
3283        mut self,
3284        compression: CompressionTypeVariant,
3285    ) -> Self {
3286        self.compression = compression;
3287        self
3288    }
3289
3290    /// Whether to allow truncated rows when parsing.
3291    /// By default this is set to false and will error if the CSV rows have different lengths.
3292    /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
3293    /// If the record’s schema is not nullable, then it will still return an error.
3294    pub fn with_truncated_rows(mut self, allow: bool) -> Self {
3295        self.truncated_rows = Some(allow);
3296        self
3297    }
3298
3299    /// Set the compression level for the output file.
3300    /// The valid range depends on the compression algorithm.
3301    /// If not specified, the default level for the algorithm is used.
3302    pub fn with_compression_level(mut self, level: u32) -> Self {
3303        self.compression_level = Some(level);
3304        self
3305    }
3306
3307    /// The delimiter character.
3308    pub fn delimiter(&self) -> u8 {
3309        self.delimiter
3310    }
3311
3312    /// The quote character.
3313    pub fn quote(&self) -> u8 {
3314        self.quote
3315    }
3316
3317    /// The terminator character.
3318    pub fn terminator(&self) -> Option<u8> {
3319        self.terminator
3320    }
3321
3322    /// The escape character.
3323    pub fn escape(&self) -> Option<u8> {
3324        self.escape
3325    }
3326}
3327
3328config_namespace! {
3329    /// Options controlling JSON format
3330    pub struct JsonOptions {
3331        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
3332        /// Compression level for the output file. The valid range depends on the
3333        /// compression algorithm:
3334        /// - ZSTD: 1 to 22 (default: 3)
3335        /// - GZIP: 0 to 9 (default: 6)
3336        /// - BZIP2: 0 to 9 (default: 6)
3337        /// - XZ: 0 to 9 (default: 6)
3338        /// If not specified, the default level for the compression algorithm is used.
3339        pub compression_level: Option<u32>, default = None
3340        pub schema_infer_max_rec: Option<usize>, default = None
3341       /// The JSON format to use when reading files.
3342       ///
3343       /// When `true` (default), expects newline-delimited JSON (NDJSON):
3344       /// ```text
3345       /// {"key1": 1, "key2": "val"}
3346       /// {"key1": 2, "key2": "vals"}
3347       /// ```
3348       ///
3349       /// When `false`, expects JSON array format:
3350       /// ```text
3351       /// [
3352       ///   {"key1": 1, "key2": "val"},
3353       ///   {"key1": 2, "key2": "vals"}
3354       /// ]
3355       /// ```
3356       pub newline_delimited: bool, default = true
3357    }
3358}
3359
/// Extension trait for output formats. It declares no methods of its own and
/// only requires that implementors also implement [`Display`].
pub trait OutputFormatExt: Display {}
3361
/// An output format, together with its format-specific options where the
/// format has any.
///
/// The [`Display`] implementation renders the lowercase format name.
#[derive(Debug, Clone, PartialEq)]
// The size lint is only expected when the `parquet` feature adds its variant.
#[cfg_attr(feature = "parquet", expect(clippy::large_enum_variant))]
pub enum OutputFormat {
    /// CSV, configured by [`CsvOptions`]
    CSV(CsvOptions),
    /// JSON, configured by [`JsonOptions`]
    JSON(JsonOptions),
    /// Parquet, configured by [`TableParquetOptions`]; only available with
    /// the `parquet` feature
    #[cfg(feature = "parquet")]
    PARQUET(TableParquetOptions),
    /// Avro; carries no options
    AVRO,
    /// Arrow; carries no options
    ARROW,
}
3372
3373impl Display for OutputFormat {
3374    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3375        let out = match self {
3376            OutputFormat::CSV(_) => "csv",
3377            OutputFormat::JSON(_) => "json",
3378            #[cfg(feature = "parquet")]
3379            OutputFormat::PARQUET(_) => "parquet",
3380            OutputFormat::AVRO => "avro",
3381            OutputFormat::ARROW => "arrow",
3382        };
3383        write!(f, "{out}")
3384    }
3385}
3386
3387#[cfg(test)]
3388mod tests {
3389    #[cfg(feature = "parquet")]
3390    use crate::config::TableParquetOptions;
3391    use crate::config::{
3392        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
3393        Extensions, TableOptions,
3394    };
3395    use std::any::Any;
3396    use std::collections::HashMap;
3397
    /// Extension config used by the tests in this module; it simply records
    /// the key/value pairs that are set on it.
    #[derive(Default, Debug, Clone)]
    pub struct TestExtensionConfig {
        /// Values recorded by `set`, keyed by the part of the option key
        /// that follows its first `.`
        pub properties: HashMap<String, String>,
    }
3403
3404    impl ExtensionOptions for TestExtensionConfig {
3405        fn as_any(&self) -> &dyn Any {
3406            self
3407        }
3408
3409        fn as_any_mut(&mut self) -> &mut dyn Any {
3410            self
3411        }
3412
3413        fn cloned(&self) -> Box<dyn ExtensionOptions> {
3414            Box::new(self.clone())
3415        }
3416
3417        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
3418            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
3419            assert_eq!(key, "test");
3420            self.properties.insert(rem.to_owned(), value.to_owned());
3421            Ok(())
3422        }
3423
3424        fn entries(&self) -> Vec<ConfigEntry> {
3425            self.properties
3426                .iter()
3427                .map(|(k, v)| ConfigEntry {
3428                    key: k.into(),
3429                    value: Some(v.into()),
3430                    description: "",
3431                })
3432                .collect()
3433        }
3434    }
3435
    impl ConfigExtension for TestExtensionConfig {
        // Prefix under which these options are addressed in the tests below,
        // e.g. `test.bootstrap.servers`.
        const PREFIX: &'static str = "test";
    }
3439
3440    #[test]
3441    fn create_table_config() {
3442        let mut extension = Extensions::new();
3443        extension.insert(TestExtensionConfig::default());
3444        let table_config = TableOptions::new().with_extensions(extension);
3445        let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
3446        assert!(kafka_config.is_some())
3447    }
3448
3449    #[test]
3450    fn alter_test_extension_config() {
3451        let mut extension = Extensions::new();
3452        extension.insert(TestExtensionConfig::default());
3453        let mut table_config = TableOptions::new().with_extensions(extension);
3454        table_config.set_config_format(ConfigFileType::CSV);
3455        table_config.set("format.delimiter", ";").unwrap();
3456        assert_eq!(table_config.csv.delimiter, b';');
3457        table_config.set("test.bootstrap.servers", "asd").unwrap();
3458        let kafka_config = table_config
3459            .extensions
3460            .get::<TestExtensionConfig>()
3461            .unwrap();
3462        assert_eq!(
3463            kafka_config.properties.get("bootstrap.servers").unwrap(),
3464            "asd"
3465        );
3466    }
3467
3468    #[test]
3469    fn iter_test_extension_config() {
3470        let mut extension = Extensions::new();
3471        extension.insert(TestExtensionConfig::default());
3472        let table_config = TableOptions::new().with_extensions(extension);
3473        let extensions = table_config.extensions.iter().collect::<Vec<_>>();
3474        assert_eq!(extensions.len(), 1);
3475        assert_eq!(extensions[0].0, TestExtensionConfig::PREFIX);
3476    }
3477
3478    #[test]
3479    fn csv_u8_table_options() {
3480        let mut table_config = TableOptions::new();
3481        table_config.set_config_format(ConfigFileType::CSV);
3482        table_config.set("format.delimiter", ";").unwrap();
3483        assert_eq!(table_config.csv.delimiter as char, ';');
3484        table_config.set("format.escape", "\"").unwrap();
3485        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
3486        table_config.set("format.escape", "\'").unwrap();
3487        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
3488    }
3489
    /// Setting `enable_options_value_normalization` to `false` must not log
    /// anything, while setting it to `true` must log exactly one record.
    #[test]
    fn warning_only_not_default() {
        use std::sync::atomic::AtomicUsize;
        // Number of log records seen by `SimpleLogger`.
        static COUNT: AtomicUsize = AtomicUsize::new(0);
        use log::{Level, LevelFilter, Metadata, Record};
        // Logger that only counts the records it is enabled for.
        struct SimpleLogger;
        impl log::Log for SimpleLogger {
            fn enabled(&self, metadata: &Metadata) -> bool {
                metadata.level() <= Level::Info
            }

            fn log(&self, record: &Record) {
                if self.enabled(record.metadata()) {
                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
            }
            fn flush(&self) {}
        }
        // Installs the process-wide logger; the `unwrap` fails if one is
        // already installed.
        log::set_logger(&SimpleLogger).unwrap();
        log::set_max_level(LevelFilter::Info);
        let mut sql_parser_options = crate::config::SqlParserOptions::default();
        sql_parser_options
            .set("enable_options_value_normalization", "false")
            .unwrap();
        // Nothing logged for `false`.
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
        sql_parser_options
            .set("enable_options_value_normalization", "true")
            .unwrap();
        // Exactly one record logged for `true`.
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
    }
3520
#[test]
fn reset_nested_scalar_reports_helpful_error() {
    // Resetting a nested key on a scalar `bool` must fail with a message that
    // names both the scalar type and the offending nested field.
    let expected_prefix = "Invalid or Unsupported Configuration: Config field is a scalar bool and does not have nested field \"nested\"";

    let mut flag = true;
    let message = <bool as ConfigField>::reset(&mut flag, "nested")
        .unwrap_err()
        .to_string();

    assert!(
        message.starts_with(expected_prefix),
        "unexpected error message: {message}"
    );
}
3533
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options() {
    // A `::col1` suffix on the key targets the per-column options of `col1`.
    let mut options = TableOptions::new();
    options.set_config_format(ConfigFileType::PARQUET);
    options
        .set("format.bloom_filter_enabled::col1", "true")
        .unwrap();

    let col1_options = &options.parquet.column_specific_options["col1"];
    assert_eq!(col1_options.bloom_filter_enabled, Some(true));
}
3547
#[cfg(feature = "parquet_encryption")]
#[test]
fn parquet_table_encryption() {
    use crate::config::{
        ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
    };
    use parquet::encryption::decrypt::FileDecryptionProperties;
    use parquet::encryption::encrypt::FileEncryptionProperties;
    use std::sync::Arc;

    // 16-byte (128 bit) keys: one for the footer and one per encrypted column.
    let footer_key = b"0123456789012345".to_vec();
    let column_names = vec!["double_field", "float_field"];
    let column_keys =
        vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];

    let file_encryption_properties =
        FileEncryptionProperties::builder(footer_key.clone())
            .with_column_keys(column_names.clone(), column_keys.clone())
            .unwrap()
            .build()
            .unwrap();

    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
        .with_column_keys(column_names.clone(), column_keys.clone())
        .unwrap()
        .build()
        .unwrap();

    // Round trip: parquet properties -> config representation -> parquet
    // properties must give back an equal value.
    let config_encrypt =
        ConfigFileEncryptionProperties::from(&file_encryption_properties);
    let encryption_properties_built =
        Arc::new(FileEncryptionProperties::try_from(config_encrypt.clone()).unwrap());
    assert_eq!(file_encryption_properties, encryption_properties_built);

    let config_decrypt =
        ConfigFileDecryptionProperties::try_from(&decryption_properties).unwrap();
    let decryption_properties_built =
        Arc::new(FileDecryptionProperties::try_from(config_decrypt.clone()).unwrap());
    assert_eq!(decryption_properties, decryption_properties_built);

    // Encryption config assembled key by key through `set` must equal the
    // config derived from the parquet properties above.
    let mut encrypt_options = TableOptions::new();
    encrypt_options.set_config_format(ConfigFileType::PARQUET);

    let encrypt_footer = config_encrypt.encrypt_footer.to_string();
    encrypt_options
        .parquet
        .set("crypto.file_encryption.encrypt_footer", &encrypt_footer)
        .unwrap();
    encrypt_options
        .parquet
        .set(
            "crypto.file_encryption.footer_key_as_hex",
            &config_encrypt.footer_key_as_hex,
        )
        .unwrap();

    for (col_name, col_key) in column_names.iter().zip(&column_keys) {
        let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
        let value = hex::encode(col_key);
        encrypt_options.parquet.set(&key, &value).unwrap();
    }

    assert_eq!(
        encrypt_options.parquet.crypto.file_encryption,
        Some(config_encrypt)
    );

    // Same check for the decryption config.
    let mut decrypt_options = TableOptions::new();
    decrypt_options.set_config_format(ConfigFileType::PARQUET);
    decrypt_options
        .parquet
        .set(
            "crypto.file_decryption.footer_key_as_hex",
            &config_decrypt.footer_key_as_hex,
        )
        .unwrap();

    for (col_name, col_key) in column_names.iter().zip(&column_keys) {
        let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
        let value = hex::encode(col_key);
        decrypt_options.parquet.set(&key, &value).unwrap();
    }

    assert_eq!(
        decrypt_options.parquet.crypto.file_decryption,
        Some(config_decrypt.clone())
    );

    // The decryption config can also be assigned directly to the field.
    let mut direct_options = TableOptions::new();
    direct_options.set_config_format(ConfigFileType::PARQUET);
    direct_options.parquet.crypto.file_decryption = Some(config_decrypt.clone());
    assert_eq!(
        direct_options.parquet.crypto.file_decryption,
        Some(config_decrypt)
    );
}
3671
#[cfg(feature = "parquet_encryption")]
#[test]
fn parquet_encryption_invalid_hex_errors_encryption() {
    use crate::config::ColumnEncryptionProperties;
    use crate::config::ConfigFileEncryptionProperties;
    use parquet::encryption::encrypt::FileEncryptionProperties;
    use std::collections::HashMap;

    // Converts the config and returns the text of the error it must produce.
    let conversion_error = |props: ConfigFileEncryptionProperties| {
        FileEncryptionProperties::try_from(props)
            .unwrap_err()
            .to_string()
    };

    // Baseline with a valid footer key; each case below overrides exactly one
    // field with a value that is not valid hex.
    let valid = ConfigFileEncryptionProperties {
        encrypt_footer: true,
        footer_key_as_hex: hex::encode(b"0123456789012345"),
        footer_key_metadata_as_hex: String::new(),
        column_encryption_properties: HashMap::new(),
        aad_prefix_as_hex: String::new(),
        store_aad_prefix: false,
    };

    // Encryption: invalid footer key hex
    let err = conversion_error(ConfigFileEncryptionProperties {
        footer_key_as_hex: "not_hex".to_string(),
        ..valid.clone()
    });
    assert!(err.contains("Unable to decode hex footer key"));

    // Encryption: invalid footer key metadata hex
    let err = conversion_error(ConfigFileEncryptionProperties {
        footer_key_metadata_as_hex: "zz".to_string(),
        ..valid.clone()
    });
    assert!(err.contains("Unable to decode hex footer key metadata"));

    // Encryption: invalid column key hex
    let err = conversion_error(ConfigFileEncryptionProperties {
        column_encryption_properties: HashMap::from([(
            "col1".to_string(),
            ColumnEncryptionProperties {
                column_key_as_hex: "bad".to_string(),
                column_metadata_as_hex: None,
            },
        )]),
        ..valid.clone()
    });
    assert!(err.contains("Unable to decode hex encryption key for column col1"));

    // Encryption: invalid column metadata hex
    let err = conversion_error(ConfigFileEncryptionProperties {
        column_encryption_properties: HashMap::from([(
            "col1".to_string(),
            ColumnEncryptionProperties {
                column_key_as_hex: hex::encode(b"1234567890123450"),
                column_metadata_as_hex: Some("zz".to_string()),
            },
        )]),
        ..valid.clone()
    });
    assert!(err.contains("Unable to decode hex column metadata for column col1"));

    // Encryption: invalid AAD prefix hex
    let err = conversion_error(ConfigFileEncryptionProperties {
        aad_prefix_as_hex: "zz".to_string(),
        ..valid
    });
    assert!(err.contains("Unable to decode hex AAD prefix"));
}
3742
#[cfg(feature = "parquet_encryption")]
#[test]
fn parquet_encryption_invalid_hex_errors_decryption() {
    use crate::config::ColumnDecryptionProperties;
    use crate::config::ConfigFileDecryptionProperties;
    use parquet::encryption::decrypt::FileDecryptionProperties;
    use std::collections::HashMap;

    // Converts the config and returns the text of the error it must produce.
    let conversion_error = |props: ConfigFileDecryptionProperties| {
        FileDecryptionProperties::try_from(props)
            .unwrap_err()
            .to_string()
    };

    // Baseline with a valid footer key; each case below overrides exactly one
    // field with a value that is not valid hex.
    let valid = ConfigFileDecryptionProperties {
        footer_key_as_hex: hex::encode(b"0123456789012345"),
        column_decryption_properties: HashMap::new(),
        aad_prefix_as_hex: String::new(),
        footer_signature_verification: true,
    };

    // Decryption: invalid column key hex
    let err = conversion_error(ConfigFileDecryptionProperties {
        column_decryption_properties: HashMap::from([(
            "col1".to_string(),
            ColumnDecryptionProperties {
                column_key_as_hex: "bad".to_string(),
            },
        )]),
        ..valid.clone()
    });
    assert!(err.contains("Could not decode hex column key"));
    assert!(err.contains("col1"));

    // Decryption: invalid footer key hex
    let err = conversion_error(ConfigFileDecryptionProperties {
        footer_key_as_hex: "bad".to_string(),
        ..valid.clone()
    });
    assert!(err.contains("Could not decode hex footer key"));

    // Decryption: invalid AAD prefix hex
    let err = conversion_error(ConfigFileDecryptionProperties {
        aad_prefix_as_hex: "zz".to_string(),
        ..valid
    });
    assert!(err.contains("Could not decode hex AAD prefix"));
}
3789
#[cfg(feature = "parquet_encryption")]
#[test]
fn parquet_encryption_factory_config() {
    let mut parquet_options = TableParquetOptions::default();

    // By default no factory is configured and there are no factory options.
    assert!(parquet_options.crypto.factory_id.is_none());
    assert!(parquet_options.crypto.factory_options.options.is_empty());

    let mut input_config = TestExtensionConfig::default();
    for (key, value) in [("key1", "value 1"), ("key2", "value 2")] {
        input_config
            .properties
            .insert(key.to_string(), value.to_string());
    }

    parquet_options
        .crypto
        .configure_factory("example_factory", &input_config);

    // The factory id is recorded and both properties show up as factory options.
    assert_eq!(
        parquet_options.crypto.factory_id.as_deref(),
        Some("example_factory")
    );
    let factory_options = &parquet_options.crypto.factory_options.options;
    assert_eq!(factory_options.len(), 2);
    assert_eq!(
        factory_options.get("key1").map(String::as_str),
        Some("value 1")
    );
    assert_eq!(
        factory_options.get("key2").map(String::as_str),
        Some("value 2")
    );
}
3819
/// Test-only key retriever: returns a fixed key whenever key metadata is
/// supplied, and an error when the metadata is empty.
#[cfg(feature = "parquet_encryption")]
struct ParquetEncryptionKeyRetriever {}

#[cfg(feature = "parquet_encryption")]
impl parquet::encryption::decrypt::KeyRetriever for ParquetEncryptionKeyRetriever {
    fn retrieve_key(&self, key_metadata: &[u8]) -> parquet::errors::Result<Vec<u8>> {
        // Empty metadata is rejected up front.
        if key_metadata.is_empty() {
            return Err(parquet::errors::ParquetError::General(
                "Key metadata not provided".to_string(),
            ));
        }
        Ok(b"1234567890123450".to_vec())
    }
}
3835
#[cfg(feature = "parquet_encryption")]
#[test]
fn conversion_from_key_retriever_to_config_file_decryption_properties() {
    use crate::config::ConfigFileDecryptionProperties;
    use crate::encryption::FileDecryptionProperties;

    // Decryption properties built around a key retriever rather than
    // explicit keys.
    let decryption_properties = FileDecryptionProperties::with_key_retriever(
        std::sync::Arc::new(ParquetEncryptionKeyRetriever {}),
    )
    .build()
    .unwrap();

    // Converting them into the config representation must fail, and the error
    // must mention the key retriever as well as the retriever's own message.
    let conversion = ConfigFileDecryptionProperties::try_from(&decryption_properties);
    assert!(conversion.is_err());

    let message = conversion.unwrap_err().to_string();
    for fragment in ["key retriever", "Key metadata not provided"] {
        assert!(message.contains(fragment));
    }
}
3855
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_entry() {
    // A column-specific option set through `set` must be listed by `entries`
    // under the same key.
    let column_key = "format.bloom_filter_enabled::col1";

    let mut options = TableOptions::new();
    options.set_config_format(ConfigFileType::PARQUET);
    options.set(column_key, "true").unwrap();

    let listed = options.entries();
    assert!(listed.iter().any(|entry| entry.key == column_key));
}
3871
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_parquet_options_config_entry() {
    // A per-column encryption key set through `set` must be listed by
    // `entries` under the same key.
    let column_key = "crypto.file_encryption.column_key_as_hex::double_field";

    let mut parquet_options = TableParquetOptions::new();
    parquet_options
        .set(column_key, "31323334353637383930313233343530")
        .unwrap();

    let listed = parquet_options.entries();
    assert!(listed.iter().any(|entry| entry.key == column_key));
}
3888
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_metadata_entry() {
    // Metadata keys and the values they are set to, including an empty value,
    // trailing whitespace and `::` / `:` inside the value.
    let cases = [
        ("key1", ""),
        ("key2", "value2"),
        ("key3", "value with spaces "),
        ("key4", "value with special chars :: :"),
    ];

    let mut options = TableOptions::new();
    options.set_config_format(ConfigFileType::PARQUET);
    for (name, value) in cases {
        options
            .set(&format!("format.metadata::{name}"), value)
            .unwrap();
    }

    // Every value is stored verbatim; a key that was never set is absent.
    let stored = options.parquet.key_value_metadata.clone();
    assert_eq!(stored.get("should not exist1"), None);
    for (name, value) in cases {
        assert_eq!(stored.get(name), Some(&Some(value.to_string())));
    }

    // duplicate keys are overwritten
    for value in ["A", "B"] {
        options.set("format.metadata::key_dupe", value).unwrap();
    }
    let stored = &options.parquet.key_value_metadata;
    assert_eq!(stored.get("key_dupe"), Some(&Some("B".to_string())));
}
#[cfg(feature = "parquet")]
#[test]
fn test_parquet_writer_version_validation() {
    use crate::{config::ConfigOptions, parquet_config::DFParquetWriterVersion};

    const WRITER_VERSION_KEY: &str = "datafusion.execution.parquet.writer_version";

    let mut config = ConfigOptions::default();

    // Valid values should work
    let accepted = [
        ("1.0", DFParquetWriterVersion::V1_0),
        ("2.0", DFParquetWriterVersion::V2_0),
    ];
    for (raw, expected) in accepted {
        config.set(WRITER_VERSION_KEY, raw).unwrap();
        assert_eq!(config.execution.parquet.writer_version, expected);
    }

    // Invalid value should error immediately at SET time
    let message = config
        .set(WRITER_VERSION_KEY, "3.0")
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
    );
}
3955
#[cfg(feature = "parquet")]
#[test]
fn set_cdc_enabled_flag() {
    use crate::config::ConfigOptions;

    const ENABLED_KEY: &str =
        "datafusion.execution.parquet.content_defined_chunking.enabled";

    let mut config = ConfigOptions::default();

    // CDC is disabled by default.
    assert!(!config.execution.parquet.content_defined_chunking.enabled);

    // Turning the flag on enables CDC and leaves the parameters at their
    // default values.
    config.set(ENABLED_KEY, "true").unwrap();
    let chunking = &config.execution.parquet.content_defined_chunking;
    assert!(chunking.enabled);
    assert_eq!(chunking.min_chunk_size, 256 * 1024);
    assert_eq!(chunking.max_chunk_size, 1024 * 1024);
    assert_eq!(chunking.norm_level, 0);

    // Turning the flag off disables CDC again.
    config.set(ENABLED_KEY, "false").unwrap();
    assert!(!config.execution.parquet.content_defined_chunking.enabled);
}
3987
#[cfg(feature = "parquet")]
#[test]
fn set_cdc_param_does_not_enable() {
    use crate::config::ConfigOptions;

    const MIN_CHUNK_SIZE_KEY: &str =
        "datafusion.execution.parquet.content_defined_chunking.min_chunk_size";

    let mut config = ConfigOptions::default();

    // Setting a parameter does NOT enable CDC (`enabled` is a distinct field,
    // defaulting to false), and the result is independent of key order.
    config.set(MIN_CHUNK_SIZE_KEY, "1024").unwrap();

    let chunking = &config.execution.parquet.content_defined_chunking;
    assert!(!chunking.enabled);
    // Only the parameter that was set changes; the others keep their defaults.
    assert_eq!(chunking.min_chunk_size, 1024);
    assert_eq!(chunking.max_chunk_size, 1024 * 1024);
    assert_eq!(chunking.norm_level, 0);
}
4009}