// datafusion_common/config.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Runtime configuration, via [`ConfigOptions`]

use arrow_ipc::CompressionType;

#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
use crate::parquet_config::DFParquetWriterVersion;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
#[cfg(feature = "parquet_encryption")]
use hex;
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;
#[cfg(feature = "parquet_encryption")]
use std::sync::Arc;

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///    /// Amazing config
///    pub struct MyConfig {
///        /// Field 1 doc
///        field1: String, transform = str::to_lowercase, default = "".to_string()
///
///        /// Field 2 doc
///        field2: usize, default = 232
///
///        /// Field 3 doc
///        field3: Option<usize>, default = None
///    }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone)]
/// #[non_exhaustive]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _internal_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                let ret = self.$field_name.set(rem, value.as_ref());

                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })? // Log warning if specified, and the value is not the default
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }

            fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            if rem.is_empty() {
                                let default_value: $field_type = $default;
                                self.$field_name = default_value;
                                Ok(())
                            } else {
                                self.$field_name.reset(rem)
                            }
                        },
                    )*
                    _ => $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}",
                        key,
                        stringify!($struct_name)
                    ),
                }
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}

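// Example (illustrative sketch, not part of the library): for the
// hypothetical `MyConfig` from the macro documentation above, the generated
// `ConfigField` impl dispatches on dotted keys, while `reset` restores a
// field's declared default:
//
//     let mut config = MyConfig::default();
//     config.set("field2", "42")?;  // parses "42" as usize
//     config.reset("field2")?;      // restores the default (232)
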
config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}

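// Example (illustrative sketch): these fields are normally set through the
// string-keyed `ConfigOptions` root defined later in this file, under the
// `datafusion.catalog.*` prefix:
//
//     let mut config = ConfigOptions::new();
//     config.set("datafusion.catalog.information_schema", "true")?;
//     assert!(config.catalog.information_schema);
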
config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize identifiers (convert them to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options values (convert them to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: Dialect, default = Dialect::Generic
        // no need to lowercase because `sqlparser::dialect_from_str` is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
        /// If false, they are mapped to `Utf8`.
        /// Default is true.
        pub map_string_types_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL Queries
        pub recursion_limit: usize, default = 50

        /// Specifies the default null ordering for query results. There are 4 options:
        /// - `nulls_max`: Nulls appear last in ascending order.
        /// - `nulls_min`: Nulls appear first in ascending order.
        /// - `nulls_first`: Nulls always appear first, in any order.
        /// - `nulls_last`: Nulls always appear last, in any order.
        ///
        /// By default, `nulls_max` is used to follow Postgres's behavior:
        /// <https://www.postgresql.org/docs/current/queries-order.html>
        pub default_null_ordering: String, default = "nulls_max".to_string()
    }
}

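// Example (illustrative sketch): parser options map to `datafusion.sql_parser.*`
// keys on the `ConfigOptions` root defined later in this file:
//
//     let mut config = ConfigOptions::new();
//     config.set("datafusion.sql_parser.dialect", "postgres")?;
//     config.set("datafusion.sql_parser.default_null_ordering", "nulls_first")?;
//     assert_eq!(config.sql_parser.dialect, Dialect::PostgreSQL);
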
/// This is the SQL dialect used by DataFusion's parser.
/// This mirrors the [sqlparser::dialect::Dialect](https://docs.rs/sqlparser/latest/sqlparser/dialect/trait.Dialect.html)
/// trait in order to offer an easier API and avoid adding the `sqlparser` dependency
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Dialect {
    #[default]
    Generic,
    MySQL,
    PostgreSQL,
    Hive,
    SQLite,
    Snowflake,
    Redshift,
    MsSQL,
    ClickHouse,
    BigQuery,
    Ansi,
    DuckDB,
    Databricks,
}

impl AsRef<str> for Dialect {
    fn as_ref(&self) -> &str {
        match self {
            Self::Generic => "generic",
            Self::MySQL => "mysql",
            Self::PostgreSQL => "postgresql",
            Self::Hive => "hive",
            Self::SQLite => "sqlite",
            Self::Snowflake => "snowflake",
            Self::Redshift => "redshift",
            Self::MsSQL => "mssql",
            Self::ClickHouse => "clickhouse",
            Self::BigQuery => "bigquery",
            Self::Ansi => "ansi",
            Self::DuckDB => "duckdb",
            Self::Databricks => "databricks",
        }
    }
}

impl FromStr for Dialect {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let value = match s.to_ascii_lowercase().as_str() {
            "generic" => Self::Generic,
            "mysql" => Self::MySQL,
            "postgresql" | "postgres" => Self::PostgreSQL,
            "hive" => Self::Hive,
            "sqlite" => Self::SQLite,
            "snowflake" => Self::Snowflake,
            "redshift" => Self::Redshift,
            "mssql" => Self::MsSQL,
            "clickhouse" => Self::ClickHouse,
            "bigquery" => Self::BigQuery,
            "ansi" => Self::Ansi,
            "duckdb" => Self::DuckDB,
            "databricks" => Self::Databricks,
            other => {
                let error_message = format!(
                    "Invalid Dialect: {other}. Expected one of: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks"
                );
                return Err(DataFusionError::Configuration(error_message));
            }
        };
        Ok(value)
    }
}

impl ConfigField for Dialect {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = Self::from_str(value)?;
        Ok(())
    }
}

impl Display for Dialect {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = self.as_ref();
        write!(f, "{str}")
    }
}

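// Example (illustrative sketch): parsing is case-insensitive and accepts the
// "postgres" alias, while `Display` uses the canonical lowercase name:
//
//     let dialect = Dialect::from_str("Postgres")?;
//     assert_eq!(dialect, Dialect::PostgreSQL);
//     assert_eq!(dialect.to_string(), "postgresql");
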
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    Zstd,
    Lz4Frame,
    #[default]
    Uncompressed,
}

impl FromStr for SpillCompression {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "zstd" => Ok(Self::Zstd),
            "lz4_frame" => Ok(Self::Lz4Frame),
            "uncompressed" | "" => Ok(Self::Uncompressed),
            other => Err(DataFusionError::Configuration(format!(
                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
            ))),
        }
    }
}

impl ConfigField for SpillCompression {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = SpillCompression::from_str(value)?;
        Ok(())
    }
}

impl Display for SpillCompression {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = match self {
            Self::Zstd => "zstd",
            Self::Lz4Frame => "lz4_frame",
            Self::Uncompressed => "uncompressed",
        };
        write!(f, "{str}")
    }
}

impl From<SpillCompression> for Option<CompressionType> {
    fn from(c: SpillCompression) -> Self {
        match c {
            SpillCompression::Zstd => Some(CompressionType::ZSTD),
            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
            SpillCompression::Uncompressed => None,
        }
    }
}

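// Example (illustrative sketch): the conversion above feeds the Arrow IPC
// stream writer used for spill files; `Uncompressed` maps to `None`:
//
//     let codec: Option<CompressionType> = SpillCompression::Zstd.into();
//     assert_eq!(codec, Some(CompressionType::ZSTD));
//     let codec: Option<CompressionType> = SpillCompression::Uncompressed.into();
//     assert!(codec.is_none());
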
config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        /// Default batch size while creating new batches, it's especially useful for
        /// buffer-in-memory batches since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the configuration setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `now` return timestamps in this time zone
        pub time_zone: Option<String>, default = None

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly used to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to workaround bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Sets the compression codec used when spilling data to disk.
        ///
        /// Since DataFusion writes spill files using the Arrow IPC Stream format,
        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
        /// Valid values are: uncompressed, lz4_frame, zstd.
        /// Note: lz4_frame offers faster (de)compression, but typically results in
        /// larger spill files. In contrast, zstd achieves
        /// higher compression ratios at the cost of slower (de)compression speed.
        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Maximum size in bytes for individual spill files before rotating to a new file.
        ///
        /// When operators spill data to disk (e.g., RepartitionExec), they write
        /// multiple batches to the same file until this size limit is reached, then rotate
        /// to a new file. This reduces syscall overhead compared to one-file-per-batch
        /// while preventing files from growing too large.
        ///
        /// A larger value reduces file creation overhead but may hold more disk space.
        /// A smaller value creates more files but allows finer-grained space reclamation
        /// as files can be deleted once fully consumed.
        ///
        /// Currently only `RepartitionExec` supports this spill file rotation feature; other
        /// spilling operators may create spill files larger than the limit.
        ///
        /// Default: 128 MB
        pub max_spill_file_size_bytes: usize, default = 128 * 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum level of output files running in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple files.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should a `ListingTable` created through the `ListingTableFactory` infer table
        /// partitions from Hive compliant directories. Defaults to true (partition columns are
        /// inferred and will be represented in the table schema).
        pub listing_table_factory_infer_partitions: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the observed ratio is
        /// greater than this threshold, partial aggregation will skip aggregation
        /// for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows partial aggregation partition should process, before
        /// aggregation ratio check and trying to switch to skipping aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote end point.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024

        /// Whether to enable ANSI SQL mode.
        ///
        /// The flag is experimental and relevant only for DataFusion Spark built-in functions
        ///
        /// When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL
        /// semantics for expressions, casting, and error handling. This means:
        /// - **Strict type coercion rules:** implicit casts between incompatible types are disallowed.
        /// - **Standard SQL arithmetic behavior:** operations such as division by zero,
        ///   numeric overflow, or invalid casts raise runtime errors rather than returning
        ///   `NULL` or adjusted values.
        /// - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling.
        ///
        /// When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive,
        /// non-ANSI mode designed for user convenience and backward compatibility. In this mode:
        /// - Implicit casts between types are allowed (e.g., string to integer when possible).
        /// - Arithmetic operations are more lenient — for example, `abs()` on the minimum
        ///   representable integer value returns the input value instead of raising an overflow error.
        /// - Division by zero or invalid casts may return `NULL` instead of failing.
        ///
        /// # Default
        /// `false` — ANSI SQL mode is disabled by default.
        pub enable_ansi_mode: bool, default = false
    }
}

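// Example (illustrative sketch): execution options live under the
// `datafusion.execution.*` prefix. Note that "0" for `target_partitions` is
// normalized to the number of available cores (see `normalized_parallelism`
// below):
//
//     let mut config = ConfigOptions::new();
//     config.set("datafusion.execution.batch_size", "4096")?;
//     config.set("datafusion.execution.target_partitions", "0")?;
//     assert!(config.execution.target_partitions >= 1);
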
config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try to fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// one read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer.
        /// Defaults to 512 KiB, which should be sufficient for most parquet files;
        /// this can save one I/O operation per parquet file. If the metadata is larger than
        /// the hint, two reads will still be performed.
        pub metadata_size_hint: Option<usize>, default = Some(512 * 1024)

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) Force the use of RowSelections for filter results, when
        /// pushdown_filters is enabled. If false, the reader will automatically
        /// choose between a RowSelection and a Bitmap based on the number and
        /// pattern of selected rows.
        pub force_filter_selections: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` as `Utf8View`,
        /// and `Binary/LargeBinary` as `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If true, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark
        /// which stores microsecond resolution timestamps in an int96 allowing it
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        /// (reading) The maximum predicate cache size, in bytes. When
        /// `pushdown_filters` is enabled, sets the maximum memory used to cache
        /// the results of predicate evaluation between filter evaluation and
        /// output generation. Decreasing this value will reduce memory usage,
        /// but may increase IO and CPU usage. None means use the default
        /// parquet reader setting. 0 means no caching.
        pub max_predicate_cache_size: Option<usize>, default = None

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in bytes
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: DFParquetWriterVersion, default = DFParquetWriterVersion::default()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default = 1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file is serialized in parallel,
        /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}

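// Example (illustrative sketch): parquet options are nested under the
// execution namespace, so keys use the `datafusion.execution.parquet.*`
// prefix:
//
//     let mut config = ConfigOptions::new();
//     config.set("datafusion.execution.parquet.compression", "zstd(1)")?;
//     config.set("datafusion.execution.parquet.pushdown_filters", "true")?;
//     assert!(config.execution.parquet.pushdown_filters);
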
config_namespace! {
    /// Options for configuring Parquet Modular Encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    pub struct ParquetEncryptionOptions {
        /// Optional file decryption properties
        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None

        /// Optional file encryption properties
        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None

        /// Identifier for the encryption factory to use to create file encryption and decryption properties.
        /// Encryption factories can be registered in the runtime environment with
        /// `RuntimeEnv::register_parquet_encryption_factory`.
        pub factory_id: Option<String>, default = None

        /// Any encryption factory specific options
        pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
    }
}

impl ParquetEncryptionOptions {
    /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
    pub fn configure_factory(
        &mut self,
        factory_id: &str,
        config: &impl ExtensionOptions,
    ) {
        self.factory_id = Some(factory_id.to_owned());
        self.factory_options.options.clear();
        for entry in config.entries() {
            if let Some(value) = entry.value {
                self.factory_options.options.insert(entry.key, value);
            }
        }
    }
}

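// Example (illustrative sketch, assuming a hypothetical `MyFactoryConfig`
// type implementing `ExtensionOptions`, e.g. declared with DataFusion's
// `extensions_options!` macro): `configure_factory` records the factory id
// and copies the config's entries into `factory_options`:
//
//     let mut encryption = ParquetEncryptionOptions::default();
//     encryption.configure_factory("my_encryption_factory", &MyFactoryConfig::default());
//     assert_eq!(encryption.factory_id.as_deref(), Some("my_encryption_factory"));
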
882config_namespace! {
883    /// Options related to query optimization
884    ///
885    /// See also: [`SessionConfig`]
886    ///
887    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
888    pub struct OptimizerOptions {
889        /// When set to true, the optimizer will push a limit operation into
890        /// grouped aggregations which have no aggregate expressions, as a soft limit,
891        /// emitting groups once the limit is reached, before all rows in the group are read.
892        pub enable_distinct_aggregation_soft_limit: bool, default = true
893
894        /// When set to true, the physical plan optimizer will try to add round robin
895        /// repartitioning to increase parallelism to leverage more CPU cores
896        pub enable_round_robin_repartition: bool, default = true
897
898        /// When set to true, the optimizer will attempt to perform limit operations
899        /// during aggregations, if possible
900        pub enable_topk_aggregation: bool, default = true
901
902        /// When set to true, the optimizer will attempt to push limit operations
903        /// past window functions, if possible
904        pub enable_window_limits: bool, default = true
905
906        /// When set to true, the optimizer will attempt to push down TopK dynamic filters
907        /// into the file scan phase.
908        pub enable_topk_dynamic_filter_pushdown: bool, default = true
909
910        /// When set to true, the optimizer will attempt to push down Join dynamic filters
911        /// into the file scan phase.
912        pub enable_join_dynamic_filter_pushdown: bool, default = true
913
914        /// When set to true, the optimizer will attempt to push down Aggregate dynamic filters
915        /// into the file scan phase.
916        pub enable_aggregate_dynamic_filter_pushdown: bool, default = true
917
918        /// When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase.
919        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
920        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
921        /// This means that if we already have 10 timestamps in the year 2025
922        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
923        /// The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown`
924        /// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
925        pub enable_dynamic_filter_pushdown: bool, default = true
926
927        /// When set to true, the optimizer will insert filters before a join between
928        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
929        /// filter can add additional overhead when the file format does not fully support
930        /// predicate push down.
931        pub filter_null_join_keys: bool, default = false
932
933        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
934        /// in parallel using the provided `target_partitions` level
935        pub repartition_aggregations: bool, default = true
936
937        /// Minimum total files size in bytes to perform file scan repartitioning.
938        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
939
940        /// Should DataFusion repartition data using the join keys to execute joins in parallel
941        /// using the provided `target_partitions` level
942        pub repartition_joins: bool, default = true
943
944        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
945        /// its inputs do not have any ordering or filtering If the flag is not enabled,
946        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
947        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
948        /// RightAnti, and RightSemi - being produced only at the end of the execution.
949        /// This is not typical in stream processing. Additionally, without proper design for
950        /// long runner execution, all types of joins may encounter out-of-memory errors.
951        pub allow_symmetric_joins_without_pruning: bool, default = true
952
953        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
954        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
955        ///
956        /// For FileSources, only Parquet and CSV formats are currently supported.
957        ///
958        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
959        /// might be partitioned into smaller chunks) for parallel scanning.
960        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
961        /// happen within a single file.
962        ///
963        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
964        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
965        /// the total number of partitions and batches per partition, but does not slice the initial
966        /// record tables provided to the MemTable on creation.
967        pub repartition_file_scans: bool, default = true
968
969        /// Minimum number of distinct partition values required to group files by their
970        /// Hive partition column values (enabling Hash partitioning declaration).
971        ///
972        /// How the option is used:
973        ///     - preserve_file_partitions=0: Disable it.
974        ///     - preserve_file_partitions=1: Always enable it.
975        ///     - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N.
976        ///     This threshold preserves I/O parallelism when file partitioning is below it.
977        ///
978        /// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
979        /// partitions is less than the target_partitions.
980        pub preserve_file_partitions: usize, default = 0
981
982        /// Should DataFusion repartition data using the partitions keys to execute window
983        /// functions in parallel using the provided `target_partitions` level
984        pub repartition_windows: bool, default = true
985
986        /// Should DataFusion execute sorts in a per-partition fashion and merge
987        /// afterwards instead of coalescing first and sorting globally.
988        /// With this flag is enabled, plans in the form below
989        ///
990        /// ```text
991        ///      "SortExec: [a@0 ASC]",
992        ///      "  CoalescePartitionsExec",
993        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
994        /// ```
995        /// would turn into the plan below which performs better in multithreaded environments
996        ///
997        /// ```text
998        ///      "SortPreservingMergeExec: [a@0 ASC]",
999        ///      "  SortExec: [a@0 ASC]",
1000        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
1001        /// ```
1002        pub repartition_sorts: bool, default = true
1003
1004        /// Partition count threshold for subset satisfaction optimization.
1005        ///
1006        /// When the current partition count is >= this threshold, DataFusion will
1007        /// skip repartitioning if the required partitioning expression is a subset
1008        /// of the current partition expression such as Hash(a) satisfies Hash(a, b).
1009        ///
1010        /// When the current partition count is < this threshold, DataFusion will
1011        /// repartition to increase parallelism even when subset satisfaction applies.
1012        ///
1013        /// Set to 0 to always repartition (disable subset satisfaction optimization).
1014        /// Set to a high value to always use subset satisfaction.
1015        ///
1016        /// Example (subset_repartition_threshold = 4):
1017        /// ```text
1018        ///     Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a])
1019        ///
1020        ///     If current partitions (3) < threshold (4), repartition:
1021        ///     AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)]
1022        ///       RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3
1023        ///         AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)]
1024        ///           DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3)
1025        ///
1026        ///     If current partitions (8) >= threshold (4), use subset satisfaction:
1027        ///     AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)]
1028        ///       DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8)
1029        /// ```
1030        pub subset_repartition_threshold: usize, default = 4
1031
1032        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
1033        /// (i.e. setting `preserve_order` to true on `RepartitionExec`  and
1034        /// using `SortPreservingMergeExec`)
1035        ///
1036        /// When false, DataFusion will maximize plan parallelism using
1037        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
1038        pub prefer_existing_sort: bool, default = false
1039
1040        /// When set to true, the logical plan optimizer will produce warning
1041        /// messages if any optimization rules produce errors and then proceed to the next
1042        /// rule. When set to false, any rules that produce errors will cause the query to fail
1043        pub skip_failed_rules: bool, default = false
1044
1045        /// Number of times that the optimizer will attempt to optimize the plan
1046        pub max_passes: usize, default = 3
1047
1048        /// When set to true, the physical plan optimizer will run a top down
1049        /// process to reorder the join keys
1050        pub top_down_join_key_reordering: bool, default = true
1051
1052        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
1053        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
1054        pub prefer_hash_join: bool, default = true
1055
1056        /// When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently
1057        /// experimental. Physical planner will opt for PiecewiseMergeJoin when there is only
1058        /// one range filter.
1059        pub enable_piecewise_merge_join: bool, default = false
1060
1061        /// The maximum estimated size in bytes for one input side of a HashJoin
1062        /// will be collected into a single partition
1063        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
1064
1065        /// The maximum estimated size in rows for one input side of a HashJoin
1066        /// will be collected into a single partition
1067        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
1068
1069        /// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
1070        /// Build sides larger than this will use hash table lookups instead.
1071        /// Set to 0 to always use hash table lookups.
1072        ///
1073        /// InList pushdown can be more efficient for small build sides because it can result in better
1074        /// statistics pruning as well as use any bloom filters present on the scan side.
1075        /// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
1076        /// On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory.
1077        ///
1078        /// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
1079        ///
1080        /// The default is 128kB per partition.
1081        /// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
1082        /// but avoids excessive memory usage or overhead for larger joins.
1083        pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024
1084
1085        /// Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
1086        /// Build sides with more rows than this will use hash table lookups instead.
1087        /// Set to 0 to always use hash table lookups.
1088        ///
1089        /// This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent
1090        /// very large IN lists that might not provide much benefit over hash table lookups.
1091        ///
1092        /// This uses the deduplicated row count once the build side has been evaluated.
1093        ///
1094        /// The default is 150 values per partition.
1095        /// This is inspired by Trino's `max-filter-keys-per-column` setting.
1096        /// See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>
1097        pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150
1098
1099        /// The default filter selectivity used by Filter Statistics
1100        /// when an exact selectivity cannot be determined. Valid values are
1101        /// between 0 (no selectivity) and 100 (all rows are selected).
1102        pub default_filter_selectivity: u8, default = 20
1103
1104        /// When set to true, the optimizer will not attempt to convert Union to Interleave
1105        pub prefer_existing_union: bool, default = false
1106
1107        /// When set to true, if the returned type is a view type
1108        /// then the output will be coerced to a non-view.
1109        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
1110        pub expand_views_at_output: bool, default = false
1111
1112        /// Enable sort pushdown optimization.
1113        /// When enabled, attempts to push sort requirements down to data sources
1114        /// that can natively handle them (e.g., by reversing file/row group read order).
1115        ///
        /// Returns **inexact ordering**: the Sort operator is kept for correctness,
        /// but the optimized input enables early termination for TopK queries
        /// (ORDER BY ... LIMIT N), providing significant speedup.
1119        ///
1120        /// Memory: No additional overhead (only changes read order).
1121        ///
1122        /// Future: Will add option to detect perfectly sorted data and eliminate Sort completely.
1123        ///
1124        /// Default: true
1125        pub enable_sort_pushdown: bool, default = true
1126    }
1127}
1128
1129config_namespace! {
1130    /// Options controlling explain output
1131    ///
1132    /// See also: [`SessionConfig`]
1133    ///
1134    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
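    ///
    /// These options can also be set per-session via SQL where the frontend
    /// supports `SET`, e.g. `SET datafusion.explain.format = 'tree'`.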
1135    pub struct ExplainOptions {
1136        /// When set to true, the explain statement will only print logical plans
1137        pub logical_plan_only: bool, default = false
1138
1139        /// When set to true, the explain statement will only print physical plans
1140        pub physical_plan_only: bool, default = false
1141
1142        /// When set to true, the explain statement will print operator statistics
1143        /// for physical plans
1144        pub show_statistics: bool, default = false
1145
1146        /// When set to true, the explain statement will print the partition sizes
1147        pub show_sizes: bool, default = true
1148
1149        /// When set to true, the explain statement will print schema information
1150        pub show_schema: bool, default = false
1151
1152        /// Display format of explain. Default is "indent".
1153        /// When set to "tree", it will print the plan in a tree-rendered format.
1154        pub format: ExplainFormat, default = ExplainFormat::Indent
1155
1156        /// (format=tree only) Maximum total width of the rendered tree.
1157        /// When set to 0, the tree will have no width limit.
1158        pub tree_maximum_render_width: usize, default = 240
1159
        /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev".
1161        /// "summary" shows common metrics for high-level insights.
1162        /// "dev" provides deep operator-level introspection for developers.
1163        pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev
1164    }
1165}
1166
1167impl ExecutionOptions {
    /// Returns the correct parallelism based on the provided `value`.
    /// If `value` parses to `0`, returns the available parallelism, computed with
    /// `get_available_parallelism`. Otherwise, returns `value` unchanged.
1171    fn normalized_parallelism(value: &str) -> String {
1172        if value.parse::<usize>() == Ok(0) {
1173            get_available_parallelism().to_string()
1174        } else {
1175            value.to_owned()
1176        }
1177    }
1178}
1179
1180config_namespace! {
1181    /// Options controlling the format of output when printing record batches
1182    /// Copies [`arrow::util::display::FormatOptions`]
1183    pub struct FormatOptions {
1184        /// If set to `true` any formatting errors will be written to the output
1185        /// instead of being converted into a [`std::fmt::Error`]
1186        pub safe: bool, default = true
1187        /// Format string for nulls
1188        pub null: String, default = "".into()
1189        /// Date format for date arrays
1190        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
1191        /// Format for DateTime arrays
1192        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
1193        /// Timestamp format for timestamp arrays
1194        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
1195        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
1196        pub timestamp_tz_format: Option<String>, default = None
1197        /// Time format for time arrays
1198        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
1199        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
1200        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show data types in the visual representation of batches
1202        pub types_info: bool, default = false
1203    }
1204}
1205
1206impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
1207    type Error = DataFusionError;
1208    fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
1209        let duration_format = match self.duration_format.as_str() {
1210            "pretty" => arrow::util::display::DurationFormat::Pretty,
1211            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
1212            _ => {
1213                return _config_err!(
1214                    "Invalid duration format: {}. Valid values are pretty or iso8601",
1215                    self.duration_format
1216                );
1217            }
1218        };
1219
1220        Ok(arrow::util::display::FormatOptions::new()
1221            .with_display_error(self.safe)
1222            .with_null(&self.null)
1223            .with_date_format(self.date_format.as_deref())
1224            .with_datetime_format(self.datetime_format.as_deref())
1225            .with_timestamp_format(self.timestamp_format.as_deref())
1226            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
1227            .with_time_format(self.time_format.as_deref())
1228            .with_duration_format(duration_format)
1229            .with_types_info(self.types_info))
1230    }
1231}
1232
1233/// A key value pair, with a corresponding description
1234#[derive(Debug, Hash, PartialEq, Eq)]
1235pub struct ConfigEntry {
1236    /// A unique string to identify this config value
1237    pub key: String,
1238
1239    /// The value if any
1240    pub value: Option<String>,
1241
1242    /// A description of this configuration entry
1243    pub description: &'static str,
1244}
1245
1246/// Configuration options struct, able to store both built-in configuration and custom options
1247#[derive(Debug, Clone, Default)]
1248#[non_exhaustive]
1249pub struct ConfigOptions {
1250    /// Catalog options
1251    pub catalog: CatalogOptions,
1252    /// Execution options
1253    pub execution: ExecutionOptions,
1254    /// Optimizer options
1255    pub optimizer: OptimizerOptions,
1256    /// SQL parser options
1257    pub sql_parser: SqlParserOptions,
1258    /// Explain options
1259    pub explain: ExplainOptions,
1260    /// Optional extensions registered using [`Extensions::insert`]
1261    pub extensions: Extensions,
1262    /// Formatting options when printing batches
1263    pub format: FormatOptions,
1264}
1265
1266impl ConfigField for ConfigOptions {
1267    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1268        self.catalog.visit(v, "datafusion.catalog", "");
1269        self.execution.visit(v, "datafusion.execution", "");
1270        self.optimizer.visit(v, "datafusion.optimizer", "");
1271        self.explain.visit(v, "datafusion.explain", "");
1272        self.sql_parser.visit(v, "datafusion.sql_parser", "");
1273        self.format.visit(v, "datafusion.format", "");
1274    }
1275
1276    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1277        // Extensions are handled in the public `ConfigOptions::set`
1278        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1279        match key {
1280            "catalog" => self.catalog.set(rem, value),
1281            "execution" => self.execution.set(rem, value),
1282            "optimizer" => self.optimizer.set(rem, value),
1283            "explain" => self.explain.set(rem, value),
1284            "sql_parser" => self.sql_parser.set(rem, value),
1285            "format" => self.format.set(rem, value),
1286            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
1287        }
1288    }
1289
1290    /// Reset a configuration option back to its default value
1291    fn reset(&mut self, key: &str) -> Result<()> {
1292        let Some((prefix, rest)) = key.split_once('.') else {
1293            return _config_err!("could not find config namespace for key \"{key}\"");
1294        };
1295
1296        if prefix != "datafusion" {
1297            return _config_err!("Could not find config namespace \"{prefix}\"");
1298        }
1299
1300        let (section, rem) = rest.split_once('.').unwrap_or((rest, ""));
1301        if rem.is_empty() {
1302            return _config_err!("could not find config field for key \"{key}\"");
1303        }
1304
1305        match section {
1306            "catalog" => self.catalog.reset(rem),
1307            "execution" => self.execution.reset(rem),
1308            "optimizer" => {
1309                if rem == "enable_dynamic_filter_pushdown" {
1310                    let defaults = OptimizerOptions::default();
1311                    self.optimizer.enable_dynamic_filter_pushdown =
1312                        defaults.enable_dynamic_filter_pushdown;
1313                    self.optimizer.enable_topk_dynamic_filter_pushdown =
1314                        defaults.enable_topk_dynamic_filter_pushdown;
                    self.optimizer.enable_join_dynamic_filter_pushdown =
                        defaults.enable_join_dynamic_filter_pushdown;
                    self.optimizer.enable_aggregate_dynamic_filter_pushdown =
                        defaults.enable_aggregate_dynamic_filter_pushdown;
1317                    Ok(())
1318                } else {
1319                    self.optimizer.reset(rem)
1320                }
1321            }
1322            "explain" => self.explain.reset(rem),
1323            "sql_parser" => self.sql_parser.reset(rem),
1324            "format" => self.format.reset(rem),
1325            other => _config_err!("Config value \"{other}\" not found on ConfigOptions"),
1326        }
1327    }
1328}
1329
1330impl ConfigOptions {
1331    /// Creates a new [`ConfigOptions`] with default values
1332    pub fn new() -> Self {
1333        Self::default()
1334    }
1335
1336    /// Set extensions to provided value
1337    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1338        self.extensions = extensions;
1339        self
1340    }
1341
1342    /// Set a configuration option
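    ///
    /// For example, a minimal sketch setting a built-in option by its
    /// fully-qualified key (error handling elided):
    ///
    /// ```ignore
    /// let mut config = ConfigOptions::new();
    /// config.set("datafusion.execution.batch_size", "1024")?;
    /// ```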
1343    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1344        let Some((prefix, key)) = key.split_once('.') else {
1345            return _config_err!("could not find config namespace for key \"{key}\"");
1346        };
1347
1348        if prefix == "datafusion" {
1349            if key == "optimizer.enable_dynamic_filter_pushdown" {
1350                let bool_value = value.parse::<bool>().map_err(|e| {
1351                    DataFusionError::Configuration(format!(
1352                        "Failed to parse '{value}' as bool: {e}",
1353                    ))
1354                })?;

                // This umbrella option fans out to all dynamic filter
                // pushdown variants
                self.optimizer.enable_dynamic_filter_pushdown = bool_value;
                self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
                self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
                self.optimizer.enable_aggregate_dynamic_filter_pushdown = bool_value;
1362                return Ok(());
1363            }
1364            return ConfigField::set(self, key, value);
1365        }
1366
1367        let Some(e) = self.extensions.0.get_mut(prefix) else {
1368            return _config_err!("Could not find config namespace \"{prefix}\"");
1369        };
1370        e.0.set(key, value)
1371    }
1372
1373    /// Create new [`ConfigOptions`], taking values from environment variables
1374    /// where possible.
1375    ///
1376    /// For example, to configure `datafusion.execution.batch_size`
1377    /// ([`ExecutionOptions::batch_size`]) you would set the
1378    /// `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable.
1379    ///
1380    /// The name of the environment variable is the option's key, transformed to
1381    /// uppercase and with periods replaced with underscores.
1382    ///
1383    /// Values are parsed according to the [same rules used in casts from
1384    /// Utf8](https://docs.rs/arrow/latest/arrow/compute/kernels/cast/fn.cast.html).
1385    ///
1386    /// If the value in the environment variable cannot be cast to the type of
1387    /// the configuration option, the default value will be used instead and a
1388    /// warning emitted. Environment variables are read when this method is
1389    /// called, and are not re-read later.
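    ///
    /// A minimal sketch (marked `ignore` since it mutates the process
    /// environment; the variable name follows the rule described above):
    ///
    /// ```ignore
    /// std::env::set_var("DATAFUSION_EXECUTION_BATCH_SIZE", "1024");
    /// let config = ConfigOptions::from_env()?;
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```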
1390    pub fn from_env() -> Result<Self> {
1391        struct Visitor(Vec<String>);
1392
1393        impl Visit for Visitor {
1394            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1395                self.0.push(key.to_string())
1396            }
1397
1398            fn none(&mut self, key: &str, _: &'static str) {
1399                self.0.push(key.to_string())
1400            }
1401        }
1402
1403        // Extract the names of all fields and then look up the corresponding
1404        // environment variables. This isn't hugely efficient but avoids
1405        // ambiguity between `a.b` and `a_b` which would both correspond
1406        // to an environment variable of `A_B`
1407
1408        let mut keys = Visitor(vec![]);
1409        let mut ret = Self::default();
1410        ret.visit(&mut keys, "datafusion", "");
1411
1412        for key in keys.0 {
1413            let env = key.to_uppercase().replace('.', "_");
1414            if let Some(var) = std::env::var_os(env) {
1415                let value = var.to_string_lossy();
1416                log::info!("Set {key} to {value} from the environment variable");
1417                ret.set(&key, value.as_ref())?;
1418            }
1419        }
1420
1421        Ok(ret)
1422    }
1423
    /// Create a new [`ConfigOptions`] struct, taking values from a string hash map.
1425    ///
1426    /// Only the built-in configurations will be extracted from the hash map
1427    /// and other key value pairs will be ignored.
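    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let settings = HashMap::from([(
    ///     "datafusion.execution.batch_size".to_string(),
    ///     "1024".to_string(),
    /// )]);
    /// let config = ConfigOptions::from_string_hash_map(&settings)?;
    /// ```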
1428    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1429        struct Visitor(Vec<String>);
1430
1431        impl Visit for Visitor {
1432            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1433                self.0.push(key.to_string())
1434            }
1435
1436            fn none(&mut self, key: &str, _: &'static str) {
1437                self.0.push(key.to_string())
1438            }
1439        }
1440
1441        let mut keys = Visitor(vec![]);
1442        let mut ret = Self::default();
1443        ret.visit(&mut keys, "datafusion", "");
1444
1445        for key in keys.0 {
1446            if let Some(var) = settings.get(&key) {
1447                ret.set(&key, var)?;
1448            }
1449        }
1450
1451        Ok(ret)
1452    }
1453
    /// Returns the [`ConfigEntry`] values stored within this [`ConfigOptions`]
1455    pub fn entries(&self) -> Vec<ConfigEntry> {
1456        struct Visitor(Vec<ConfigEntry>);
1457
1458        impl Visit for Visitor {
1459            fn some<V: Display>(
1460                &mut self,
1461                key: &str,
1462                value: V,
1463                description: &'static str,
1464            ) {
1465                self.0.push(ConfigEntry {
1466                    key: key.to_string(),
1467                    value: Some(value.to_string()),
1468                    description,
1469                })
1470            }
1471
1472            fn none(&mut self, key: &str, description: &'static str) {
1473                self.0.push(ConfigEntry {
1474                    key: key.to_string(),
1475                    value: None,
1476                    description,
1477                })
1478            }
1479        }
1480
1481        let mut v = Visitor(vec![]);
1482        self.visit(&mut v, "datafusion", "");
1483
1484        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1485        v.0
1486    }
1487
1488    /// Generate documentation that can be included in the user guide
1489    pub fn generate_config_markdown() -> String {
1490        use std::fmt::Write as _;
1491
1492        let mut s = Self::default();
1493
1494        // Normalize for display
1495        s.execution.target_partitions = 0;
1496        s.execution.planning_concurrency = 0;
1497
1498        let mut docs = "| key | default | description |\n".to_string();
1499        docs += "|-----|---------|-------------|\n";
1500        let mut entries = s.entries();
1501        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1502
        for entry in entries {
1504            let _ = writeln!(
1505                &mut docs,
1506                "| {} | {} | {} |",
1507                entry.key,
1508                entry.value.as_deref().unwrap_or("NULL"),
1509                entry.description
1510            );
1511        }
1512        docs
1513    }
1514}
1515
1516/// [`ConfigExtension`] provides a mechanism to store third-party configuration
1517/// within DataFusion [`ConfigOptions`]
1518///
1519/// This mechanism can be used to pass configuration to user defined functions
1520/// or optimizer passes
1521///
1522/// # Example
1523/// ```
1524/// use datafusion_common::{
1525///     config::ConfigExtension, config::ConfigOptions, extensions_options,
1526/// };
1527/// // Define a new configuration struct using the `extensions_options` macro
1528/// extensions_options! {
1529///    /// My own config options.
1530///    pub struct MyConfig {
1531///        /// Should "foo" be replaced by "bar"?
1532///        pub foo_to_bar: bool, default = true
1533///
1534///        /// How many "baz" should be created?
1535///        pub baz_count: usize, default = 1337
1536///    }
1537/// }
1538///
1539/// impl ConfigExtension for MyConfig {
1540///     const PREFIX: &'static str = "my_config";
1541/// }
1542///
1543/// // set up config struct and register extension
1544/// let mut config = ConfigOptions::default();
1545/// config.extensions.insert(MyConfig::default());
1546///
1547/// // overwrite config default
1548/// config.set("my_config.baz_count", "42").unwrap();
1549///
1550/// // check config state
1551/// let my_config = config.extensions.get::<MyConfig>().unwrap();
/// assert!(my_config.foo_to_bar);
/// assert_eq!(my_config.baz_count, 42);
1554/// ```
1555///
/// # Note:
/// Unfortunately associated constants make a trait non-object-safe, so this
/// trait extends the object-safe [`ExtensionOptions`]
1559pub trait ConfigExtension: ExtensionOptions {
1560    /// Configuration namespace prefix to use
1561    ///
1562    /// All values under this will be prefixed with `$PREFIX + "."`
1563    const PREFIX: &'static str;
1564}
1565
1566/// An object-safe API for storing arbitrary configuration.
1567///
1568/// See [`ConfigExtension`] for user defined configuration
1569pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
1570    /// Return `self` as [`Any`]
1571    ///
1572    /// This is needed until trait upcasting is stabilized
1573    fn as_any(&self) -> &dyn Any;
1574
1575    /// Return `self` as [`Any`]
1576    ///
1577    /// This is needed until trait upcasting is stabilized
1578    fn as_any_mut(&mut self) -> &mut dyn Any;
1579
1580    /// Return a deep clone of this [`ExtensionOptions`]
1581    ///
1582    /// It is important this does not share mutable state to avoid consistency issues
1583    /// with configuration changing whilst queries are executing
1584    fn cloned(&self) -> Box<dyn ExtensionOptions>;
1585
1586    /// Set the given `key`, `value` pair
1587    fn set(&mut self, key: &str, value: &str) -> Result<()>;
1588
1589    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
1590    fn entries(&self) -> Vec<ConfigEntry>;
1591}
1592
1593/// A type-safe container for [`ConfigExtension`]
1594#[derive(Debug, Default, Clone)]
1595pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1596
1597impl Extensions {
1598    /// Create a new, empty [`Extensions`]
1599    pub fn new() -> Self {
1600        Self(BTreeMap::new())
1601    }
1602
1603    /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
1604    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
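        // The "datafusion" prefix is reserved for built-in options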
1605        assert_ne!(T::PREFIX, "datafusion");
1606        let e = ExtensionBox(Box::new(extension));
1607        self.0.insert(T::PREFIX, e);
1608    }
1609
1610    /// Retrieves the extension of the given type if any
1611    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1612        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1613    }
1614
1615    /// Retrieves the extension of the given type if any
1616    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1617        let e = self.0.get_mut(T::PREFIX)?;
1618        e.0.as_any_mut().downcast_mut()
1619    }
1620
    /// Iterates over all the config extension entries, yielding their prefix and their
    /// [`ExtensionOptions`] implementation.
1623    pub fn iter(
1624        &self,
1625    ) -> impl Iterator<Item = (&'static str, &Box<dyn ExtensionOptions>)> {
1626        self.0.iter().map(|(k, v)| (*k, &v.0))
1627    }
1628}
1629
1630#[derive(Debug)]
1631struct ExtensionBox(Box<dyn ExtensionOptions>);
1632
1633impl Clone for ExtensionBox {
1634    fn clone(&self) -> Self {
1635        Self(self.0.cloned())
1636    }
1637}
1638
/// A trait implemented by `config_namespace` structs and by field types, providing
/// the ability to walk and mutate the configuration tree
1641pub trait ConfigField {
1642    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
1643
1644    fn set(&mut self, key: &str, value: &str) -> Result<()>;
1645
1646    fn reset(&mut self, key: &str) -> Result<()> {
1647        _config_err!("Reset is not supported for this config field, key: {}", key)
1648    }
1649}
1650
1651impl<F: ConfigField + Default> ConfigField for Option<F> {
1652    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1653        match self {
1654            Some(s) => s.visit(v, key, description),
1655            None => v.none(key, description),
1656        }
1657    }
1658
1659    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1660        self.get_or_insert_with(Default::default).set(key, value)
1661    }
1662
1663    fn reset(&mut self, key: &str) -> Result<()> {
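        // An empty key resets this entire `Option` back to its default, `None`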
1664        if key.is_empty() {
1665            *self = Default::default();
1666            Ok(())
1667        } else {
1668            self.get_or_insert_with(Default::default).reset(key)
1669        }
1670    }
1671}
1672
/// Default transformation to parse a [`ConfigField`] from a string.
1674///
1675/// This uses [`FromStr`] to parse the data.
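///
/// For example (a sketch; works for any [`FromStr`] type whose error
/// implements [`std::error::Error`]):
///
/// ```ignore
/// let parsed: usize = default_config_transform("42")?;
/// assert_eq!(parsed, 42);
/// ```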
1676pub fn default_config_transform<T>(input: &str) -> Result<T>
1677where
1678    T: FromStr,
1679    <T as FromStr>::Err: Sync + Send + Error + 'static,
1680{
1681    input.parse().map_err(|e| {
1682        DataFusionError::Context(
1683            format!(
1684                "Error parsing '{}' as {}",
1685                input,
1686                std::any::type_name::<T>()
1687            ),
1688            Box::new(DataFusionError::External(Box::new(e))),
1689        )
1690    })
1691}
1692
1693/// Macro that generates [`ConfigField`] for a given type.
1694///
1695/// # Usage
1696/// This always requires [`Display`] to be implemented for the given type.
1697///
1698/// There are two ways to invoke this macro. The first one uses
1699/// [`default_config_transform`]/[`FromStr`] to parse the data:
1700///
1701/// ```ignore
/// config_field!(MyType);
1703/// ```
1704///
1705/// Note that the parsing error MUST implement [`std::error::Error`]!
1706///
1707/// Or you can specify how you want to parse an [`str`] into the type:
1708///
1709/// ```ignore
1710/// fn parse_it(s: &str) -> Result<MyType> {
1711///     ...
1712/// }
1713///
/// config_field!(
1715///     MyType,
1716///     value => parse_it(value)
1717/// );
1718/// ```
1719#[macro_export]
1720macro_rules! config_field {
1721    ($t:ty) => {
1722        config_field!($t, value => $crate::config::default_config_transform(value)?);
1723    };
1724
1725    ($t:ty, $arg:ident => $transform:expr) => {
1726        impl $crate::config::ConfigField for $t {
1727            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1728                v.some(key, self, description)
1729            }
1730
1731            fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
1732                *self = $transform;
1733                Ok(())
1734            }
1735
1736            fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
1737                if key.is_empty() {
1738                    *self = <$t as Default>::default();
1739                    Ok(())
1740                } else {
1741                    $crate::error::_config_err!(
1742                        "Config field is a scalar {} and does not have nested field \"{}\"",
1743                        stringify!($t),
1744                        key
1745                    )
1746                }
1747            }
1748        }
1749    };
1750}
1751
1752config_field!(String);
1753config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
1754config_field!(usize);
1755config_field!(f64);
1756config_field!(u64);
1757config_field!(u32);
1758
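/// A `u8` option accepts either a numeric string (e.g. `"20"`) or a single
/// ASCII character (e.g. `","`), which is stored as its byte value.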
1759impl ConfigField for u8 {
1760    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1761        v.some(key, self, description)
1762    }
1763
1764    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1765        if value.is_empty() {
1766            return Err(DataFusionError::Configuration(format!(
1767                "Input string for {key} key is empty"
1768            )));
1769        }
1770        // Check if the string is a valid number
1771        if let Ok(num) = value.parse::<u8>() {
1772            // TODO: Let's decide how we treat the numerical strings.
1773            *self = num;
1774        } else {
1775            let bytes = value.as_bytes();
            // Otherwise, accept only a single ASCII character (one byte)
1777            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1778                return Err(DataFusionError::Configuration(format!(
1779                    "Error parsing {value} as u8. Non-ASCII string provided"
1780                )));
1781            }
1782            *self = bytes[0];
1783        }
1784        Ok(())
1785    }
1786}
1787
1788impl ConfigField for CompressionTypeVariant {
1789    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1790        v.some(key, self, description)
1791    }
1792
1793    fn set(&mut self, _: &str, value: &str) -> Result<()> {
1794        *self = CompressionTypeVariant::from_str(value)?;
1795        Ok(())
1796    }
1797}
1798
1799/// An implementation trait used to recursively walk configuration
1800pub trait Visit {
1801    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
1802
1803    fn none(&mut self, key: &str, description: &'static str);
1804}
1805
/// Convenience macro to create [`ExtensionOptions`].
1807///
1808/// The created structure implements the following traits:
1809///
1810/// - [`Clone`]
1811/// - [`Debug`]
1812/// - [`Default`]
1813/// - [`ExtensionOptions`]
1814///
1815/// # Usage
1816/// The syntax is:
1817///
1818/// ```text
1819/// extensions_options! {
1820///      /// Struct docs (optional).
1821///     [<vis>] struct <StructName> {
1822///         /// Field docs (optional)
1823///         [<vis>] <field_name>: <field_type>, default = <default_value>
1824///
1825///         ... more fields
1826///     }
1827/// }
1828/// ```
1829///
1830/// The placeholders are:
1831/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
1832/// - `<StructName>`: Struct name like `MyStruct`.
1833/// - `<field_name>`: Field name like `my_field`.
1834/// - `<field_type>`: Field type like `u8`.
1835/// - `<default_value>`: Default value matching the field type like `42`.
1836///
1837/// # Example
1838/// See also a full example on the [`ConfigExtension`] documentation
1839///
1840/// ```
1841/// use datafusion_common::extensions_options;
1842///
1843/// extensions_options! {
1844///     /// My own config options.
1845///     pub struct MyConfig {
1846///         /// Should "foo" be replaced by "bar"?
1847///         pub foo_to_bar: bool, default = true
1848///
1849///         /// How many "baz" should be created?
1850///         pub baz_count: usize, default = 1337
1851///     }
1852/// }
1853/// ```
///
1856/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
1858#[macro_export]
1859macro_rules! extensions_options {
1860    (
1861     $(#[doc = $struct_d:tt])*
1862     $vis:vis struct $struct_name:ident {
1863        $(
1864        $(#[doc = $d:tt])*
1865        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
1866        )*$(,)*
1867    }
1868    ) => {
1869        $(#[doc = $struct_d])*
1870        #[derive(Debug, Clone)]
1871        #[non_exhaustive]
1872        $vis struct $struct_name{
1873            $(
1874            $(#[doc = $d])*
1875            $field_vis $field_name : $field_type,
1876            )*
1877        }
1878
1879        impl Default for $struct_name {
1880            fn default() -> Self {
1881                Self {
1882                    $($field_name: $default),*
1883                }
1884            }
1885        }
1886
1887        impl $crate::config::ExtensionOptions for $struct_name {
1888            fn as_any(&self) -> &dyn ::std::any::Any {
1889                self
1890            }
1891
1892            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
1893                self
1894            }
1895
1896            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
1897                Box::new(self.clone())
1898            }
1899
1900            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1901                $crate::config::ConfigField::set(self, key, value)
1902            }
1903
1904            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
1905                struct Visitor(Vec<$crate::config::ConfigEntry>);
1906
1907                impl $crate::config::Visit for Visitor {
1908                    fn some<V: std::fmt::Display>(
1909                        &mut self,
1910                        key: &str,
1911                        value: V,
1912                        description: &'static str,
1913                    ) {
1914                        self.0.push($crate::config::ConfigEntry {
1915                            key: key.to_string(),
1916                            value: Some(value.to_string()),
1917                            description,
1918                        })
1919                    }
1920
1921                    fn none(&mut self, key: &str, description: &'static str) {
1922                        self.0.push($crate::config::ConfigEntry {
1923                            key: key.to_string(),
1924                            value: None,
1925                            description,
1926                        })
1927                    }
1928                }
1929
1930                let mut v = Visitor(vec![]);
1931                // The prefix is not used for extensions.
1932                // The description is generated in ConfigField::visit.
1933                // We can just pass empty strings here.
1934                $crate::config::ConfigField::visit(self, &mut v, "", "");
1935                v.0
1936            }
1937        }
1938
1939        impl $crate::config::ConfigField for $struct_name {
1940            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1941                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1942                match key {
1943                    $(
1944                        stringify!($field_name) => {
                            self.$field_name.set(rem, value.as_ref())
1950                        },
1951                    )*
1952                    _ => return $crate::error::_config_err!(
1953                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1954                    )
1955                }
1956            }
1957
1958            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1959                $(
1960                    let key = stringify!($field_name).to_string();
1961                    let desc = concat!($($d),*).trim();
1962                    self.$field_name.visit(v, key.as_str(), desc);
1963                )*
1964            }
1965        }
1966    }
1967}
1968
/// These file types have special built-in behavior for configuration.
/// Use [`TableOptions::extensions`] for configuring other file types.
1971#[derive(Debug, Clone)]
1972pub enum ConfigFileType {
1973    CSV,
1974    #[cfg(feature = "parquet")]
1975    PARQUET,
1976    JSON,
1977}
1978
1979/// Represents the configuration options available for handling different table formats within a data processing application.
1980/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
1981/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
1982#[derive(Debug, Clone, Default)]
1983pub struct TableOptions {
1984    /// Configuration options for CSV file handling. This includes settings like the delimiter,
1985    /// quote character, and whether the first row is considered as headers.
1986    pub csv: CsvOptions,
1987
1988    /// Configuration options for Parquet file handling. This includes settings for compression,
1989    /// encoding, and other Parquet-specific file characteristics.
1990    pub parquet: TableParquetOptions,
1991
1992    /// Configuration options for JSON file handling.
1993    pub json: JsonOptions,
1994
1995    /// The current file format that the table operations should assume. This option allows
1996    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1997    pub current_format: Option<ConfigFileType>,
1998
1999    /// Optional extensions that can be used to extend or customize the behavior of the table
2000    /// options. Extensions can be registered using `Extensions::insert` and might include
2001    /// custom file handling logic, additional configuration parameters, or other enhancements.
2002    pub extensions: Extensions,
2003}
2004
2005impl ConfigField for TableOptions {
2006    /// Visits configuration settings for the current file format, or all formats if none is selected.
2007    ///
2008    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
2009    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
2010    /// it visits all available format settings.
2011    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
2012        if let Some(file_type) = &self.current_format {
2013            match file_type {
2014                #[cfg(feature = "parquet")]
2015                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
2016                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
2017                ConfigFileType::JSON => self.json.visit(v, "format", ""),
2018            }
2019        } else {
2020            self.csv.visit(v, "csv", "");
2021            self.parquet.visit(v, "parquet", "");
2022            self.json.visit(v, "json", "");
2023        }
2024    }
2025
2026    /// Sets a configuration value for a specific key within `TableOptions`.
2027    ///
2028    /// This method delegates setting configuration values to the specific file format configurations,
2029    /// based on the current format selected. If no format is selected, it returns an error.
2030    ///
2031    /// # Parameters
2032    ///
2033    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
2034    ///   for CSV format.
2035    /// * `value`: The value to set for the specified configuration key.
2036    ///
2037    /// # Returns
2038    ///
2039    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
2040    /// or if setting the configuration value fails for the specific format.
2041    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2042        // Extensions are handled in the public `ConfigOptions::set`
2043        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2044        match key {
2045            "format" => {
2046                let Some(format) = &self.current_format else {
2047                    return _config_err!("Specify a format for TableOptions");
2048                };
2049                match format {
2050                    #[cfg(feature = "parquet")]
2051                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
2052                    ConfigFileType::CSV => self.csv.set(rem, value),
2053                    ConfigFileType::JSON => self.json.set(rem, value),
2054                }
2055            }
2056            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
2057        }
2058    }
2059}
2060
2061impl TableOptions {
2062    /// Constructs a new instance of `TableOptions` with default settings.
2063    ///
2064    /// # Returns
2065    ///
2066    /// A new `TableOptions` instance with default configuration values.
2067    pub fn new() -> Self {
2068        Self::default()
2069    }
2070
2071    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
2072    ///
2073    /// # Parameters
2074    ///
2075    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
2076    ///
2077    /// # Returns
2078    ///
2079    /// A new `TableOptions` instance with settings applied from the session config.
2080    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
2081        let initial = TableOptions::default();
2082        initial.combine_with_session_config(config)
2083    }
2084
2085    /// Updates the current `TableOptions` with settings from a given session config.
2086    ///
2087    /// # Parameters
2088    ///
2089    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
2090    ///
2091    /// # Returns
2092    ///
2093    /// A new `TableOptions` instance with updated settings from the session config.
2094    #[must_use = "this method returns a new instance"]
2095    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
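        // Currently only the parquet options are carried over from the session config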
2096        let mut clone = self.clone();
2097        clone.parquet.global = config.execution.parquet.clone();
2098        clone
2099    }
2100
2101    /// Sets the file format for the table.
2102    ///
2103    /// # Parameters
2104    ///
2105    /// * `format`: The file format to use (e.g., CSV, Parquet).
2106    pub fn set_config_format(&mut self, format: ConfigFileType) {
2107        self.current_format = Some(format);
2108    }
2109
2110    /// Sets the extensions for this `TableOptions` instance.
2111    ///
2112    /// # Parameters
2113    ///
2114    /// * `extensions`: The `Extensions` instance to set.
2115    ///
2116    /// # Returns
2117    ///
2118    /// A new `TableOptions` instance with the specified extensions applied.
2119    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
2120        self.extensions = extensions;
2121        self
2122    }
2123
2124    /// Sets a specific configuration option.
2125    ///
2126    /// # Parameters
2127    ///
2128    /// * `key`: The configuration key (e.g., "format.delimiter").
2129    /// * `value`: The value to set for the specified key.
2130    ///
2131    /// # Returns
2132    ///
2133    /// A result indicating success or failure in setting the configuration option.
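    ///
    /// A minimal sketch (a format must be selected before `format.` keys
    /// can be set):
    ///
    /// ```ignore
    /// let mut options = TableOptions::new();
    /// options.set_config_format(ConfigFileType::CSV);
    /// options.set("format.delimiter", ";")?;
    /// ```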
2134    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
2135        let Some((prefix, _)) = key.split_once('.') else {
2136            return _config_err!("could not find config namespace for key \"{key}\"");
2137        };
2138
2139        if prefix == "format" {
2140            return ConfigField::set(self, key, value);
2141        }
2142
2143        if prefix == "execution" {
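            // Session-level execution keys may accompany table options;
            // accept and ignore them here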
2144            return Ok(());
2145        }
2146
2147        let Some(e) = self.extensions.0.get_mut(prefix) else {
2148            return _config_err!("Could not find config namespace \"{prefix}\"");
2149        };
2150        e.0.set(key, value)
2151    }
2152
2153    /// Initializes a new `TableOptions` from a hash map of string settings.
2154    ///
2155    /// # Parameters
2156    ///
2157    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
2158    ///
2159    /// # Returns
2160    ///
2161    /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
2162    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
2163        let mut ret = Self::default();
2164        for (k, v) in settings {
2165            ret.set(k, v)?;
2166        }
2167
2168        Ok(ret)
2169    }
2170
2171    /// Modifies the current `TableOptions` instance with settings from a hash map.
2172    ///
2173    /// # Parameters
2174    ///
2175    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
2176    ///
2177    /// # Returns
2178    ///
2179    /// A result indicating success or failure in applying the settings.
2180    pub fn alter_with_string_hash_map(
2181        &mut self,
2182        settings: &HashMap<String, String>,
2183    ) -> Result<()> {
2184        for (k, v) in settings {
2185            self.set(k, v)?;
2186        }
2187        Ok(())
2188    }
2189
2190    /// Retrieves all configuration entries from this `TableOptions`.
2191    ///
2192    /// # Returns
2193    ///
2194    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
2195    pub fn entries(&self) -> Vec<ConfigEntry> {
2196        struct Visitor(Vec<ConfigEntry>);
2197
2198        impl Visit for Visitor {
2199            fn some<V: Display>(
2200                &mut self,
2201                key: &str,
2202                value: V,
2203                description: &'static str,
2204            ) {
2205                self.0.push(ConfigEntry {
2206                    key: key.to_string(),
2207                    value: Some(value.to_string()),
2208                    description,
2209                })
2210            }
2211
2212            fn none(&mut self, key: &str, description: &'static str) {
2213                self.0.push(ConfigEntry {
2214                    key: key.to_string(),
2215                    value: None,
2216                    description,
2217                })
2218            }
2219        }
2220
2221        let mut v = Visitor(vec![]);
2222        self.visit(&mut v, "format", "");
2223
2224        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
2225        v.0
2226    }
2227}
2228
2229/// Options that control how Parquet files are read, including global options
2230/// that apply to all columns and optional column-specific overrides
2231///
2232/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
2233/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
2234/// (e.g. sorting_columns).
2235#[derive(Clone, Default, Debug, PartialEq)]
2236pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
2238    pub global: ParquetOptions,
    /// Column specific options. Default usage is `parquet.<option>::<column>`.
2240    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
2241    /// Additional file-level metadata to include. Inserted into the key_value_metadata
2242    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
2243    ///
2244    /// Multiple entries are permitted
2245    /// ```sql
2246    /// OPTIONS (
2247    ///    'format.metadata::key1' '',
2248    ///    'format.metadata::key2' 'value',
2249    ///    'format.metadata::key3' 'value has spaces',
2250    ///    'format.metadata::key4' 'value has special chars :: :',
2251    ///    'format.metadata::key_dupe' 'original will be overwritten',
2252    ///    'format.metadata::key_dupe' 'final'
2253    /// )
2254    /// ```
2255    pub key_value_metadata: HashMap<String, Option<String>>,
2256    /// Options for configuring Parquet modular encryption
2257    ///
2258    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
2259    /// See ConfigFileEncryptionProperties and ConfigFileDecryptionProperties in datafusion/common/src/config.rs
2260    /// These can be set via 'format.crypto', for example:
2261    /// ```sql
2262    /// OPTIONS (
2263    ///    'format.crypto.file_encryption.encrypt_footer' 'true',
    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435',  -- b"0123456789012345"
2265    ///    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
2266    ///    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
2267    ///     -- Same for decryption
2268    ///    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
2269    ///    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
2270    ///    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
2271    /// )
2272    /// ```
2273    /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
    /// Note that keys must be provided in hex format since they are binary strings.
2275    pub crypto: ParquetEncryptionOptions,
2276}
2277
2278impl TableParquetOptions {
2279    /// Return new default TableParquetOptions
2280    pub fn new() -> Self {
2281        Self::default()
2282    }
2283
2284    /// Set whether the encoding of the arrow metadata should occur
2285    /// during the writing of parquet.
2286    ///
2287    /// Default is to encode the arrow schema in the file kv_metadata.
2288    pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
2289        Self {
2290            global: ParquetOptions {
2291                skip_arrow_metadata: skip,
2292                ..self.global
2293            },
2294            ..self
2295        }
2296    }
2297
2298    /// Retrieves all configuration entries from this `TableParquetOptions`.
2299    ///
2300    /// # Returns
2301    ///
2302    /// A vector of `ConfigEntry` instances, representing all the configuration options within this
2303    pub fn entries(self: &TableParquetOptions) -> Vec<ConfigEntry> {
2304        struct Visitor(Vec<ConfigEntry>);
2305
2306        impl Visit for Visitor {
2307            fn some<V: Display>(
2308                &mut self,
2309                key: &str,
2310                value: V,
2311                description: &'static str,
2312            ) {
2313                self.0.push(ConfigEntry {
2314                    key: key[1..].to_string(),
2315                    value: Some(value.to_string()),
2316                    description,
2317                })
2318            }
2319
2320            fn none(&mut self, key: &str, description: &'static str) {
2321                self.0.push(ConfigEntry {
2322                    key: key[1..].to_string(),
2323                    value: None,
2324                    description,
2325                })
2326            }
2327        }
2328
2329        let mut v = Visitor(vec![]);
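        // Visit with an empty prefix; the Visitor above strips the resulting
        // leading '.' from each key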
2330        self.visit(&mut v, "", "");
2331
2332        v.0
2333    }
2334}
2335
2336impl ConfigField for TableParquetOptions {
2337    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
2338        self.global.visit(v, key_prefix, description);
2339        self.column_specific_options
2340            .visit(v, key_prefix, description);
2341        self.crypto
2342            .visit(v, &format!("{key_prefix}.crypto"), description);
2343    }
2344
2345    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Determine if the key is a global, metadata, crypto, or column-specific setting
2347        if key.starts_with("metadata::") {
2348            let k = match key.split("::").collect::<Vec<_>>()[..] {
2349                [_meta] | [_meta, ""] => {
2350                    return _config_err!(
2351                        "Invalid metadata key provided, missing key in metadata::<key>"
2352                    );
2353                }
2354                [_meta, k] => k.into(),
2355                _ => {
2356                    return _config_err!(
2357                        "Invalid metadata key provided, found too many '::' in \"{key}\""
2358                    );
2359                }
2360            };
2361            self.key_value_metadata.insert(k, Some(value.into()));
2362            Ok(())
2363        } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
2364            self.crypto.set(crypto_feature, value)
2365        } else if key.contains("::") {
2366            self.column_specific_options.set(key, value)
2367        } else {
2368            self.global.set(key, value)
2369        }
2370    }
2371}
2372
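/// Generates [`ConfigField`] for a struct and for `HashMap<String, Struct>`,
/// where map entries are addressed with keys of the form `<field>::<key>`
/// (e.g. `compression::col1` sets `compression` for the entry `col1`).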
2373macro_rules! config_namespace_with_hashmap {
2374    (
2375     $(#[doc = $struct_d:tt])*
2376     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
2377     $vis:vis struct $struct_name:ident {
2378        $(
2379        $(#[doc = $d:tt])*
2380        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
2381        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
2382        )*$(,)*
2383    }
2384    ) => {
2385
2386        $(#[doc = $struct_d])*
2387        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
2388        #[derive(Debug, Clone, PartialEq)]
2389        $vis struct $struct_name{
2390            $(
2391            $(#[doc = $d])*
2392            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
2393            $field_vis $field_name : $field_type,
2394            )*
2395        }
2396
2397        impl ConfigField for $struct_name {
2398            fn set(&mut self, key: &str, value: &str) -> Result<()> {
2399                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2400                match key {
2401                    $(
2402                       stringify!($field_name) => {
                           // Apply the optional value transform before setting
2404                           $(let value = $transform(value);)?
2405                           self.$field_name.set(rem, value.as_ref())
2406                       },
2407                    )*
2408                    _ => _config_err!(
2409                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
2410                    )
2411                }
2412            }
2413
2414            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2415                $(
2416                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
2417                let desc = concat!($($d),*).trim();
2419                self.$field_name.visit(v, key.as_str(), desc);
2420                )*
2421            }
2422        }
2423
2424        impl Default for $struct_name {
2425            fn default() -> Self {
2426                Self {
2427                    $($field_name: $default),*
2428                }
2429            }
2430        }
2431
2432        impl ConfigField for HashMap<String,$struct_name> {
2433            fn set(&mut self, key: &str, value: &str) -> Result<()> {
2434                let parts: Vec<&str> = key.splitn(2, "::").collect();
2435                match parts.as_slice() {
2436                    [inner_key, hashmap_key] => {
2437                        // Get or create the struct for the specified key
2438                        let inner_value = self
2439                            .entry((*hashmap_key).to_owned())
2440                            .or_insert_with($struct_name::default);
2441
2442                        inner_value.set(inner_key, value)
2443                    }
2444                    _ => _config_err!("Unrecognized key '{key}'."),
2445                }
2446            }
2447
2448            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2449                for (column_name, col_options) in self {
2450                    $(
2451                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
2452                    let desc = concat!($($d),*).trim();
2453                    col_options.$field_name.visit(v, key.as_str(), desc);
2454                    )*
2455                }
2456            }
2457        }
2458    }
2459}
2460
2461config_namespace_with_hashmap! {
2462    /// Options controlling parquet format for individual columns.
2463    ///
2464    /// See [`ParquetOptions`] for more details
2465    pub struct ParquetColumnOptions {
2466        /// Sets if bloom filter is enabled for the column path.
2467        pub bloom_filter_enabled: Option<bool>, default = None
2468
2469        /// Sets encoding for the column path.
2470        /// Valid values are: plain, plain_dictionary, rle,
2471        /// bit_packed, delta_binary_packed, delta_length_byte_array,
2472        /// delta_byte_array, rle_dictionary, and byte_stream_split.
2473        /// These values are not case-sensitive. If NULL, uses
2474        /// default parquet options
2475        pub encoding: Option<String>, default = None
2476
2477        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
2478        /// default parquet options
2479        pub dictionary_enabled: Option<bool>, default = None
2480
2481        /// Sets default parquet compression codec for the column path.
2482        /// Valid values are: uncompressed, snappy, gzip(level),
2483        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
2484        /// These values are not case-sensitive. If NULL, uses
2485        /// default parquet options
2486        pub compression: Option<String>, transform = str::to_lowercase, default = None
2487
2488        /// Sets if statistics are enabled for the column
2489        /// Valid values are: "none", "chunk", and "page"
2490        /// These values are not case sensitive. If NULL, uses
2491        /// default parquet options
2492        pub statistics_enabled: Option<String>, default = None
2493
2494        /// Sets bloom filter false positive probability for the column path. If NULL, uses
2495        /// default parquet options
2496        pub bloom_filter_fpp: Option<f64>, default = None
2497
2498        /// Sets bloom filter number of distinct values. If NULL, uses
2499        /// default parquet options
2500        pub bloom_filter_ndv: Option<u64>, default = None
2501    }
2502}
2503
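/// Parquet file encryption configuration, with all binary values (keys,
/// key metadata, and the AAD prefix) represented as hex-encoded strings.
///
/// A minimal sketch of configuring encryption through string keys, as
/// exercised by the tests in this file:
///
/// ```ignore
/// let mut table_config = TableOptions::new();
/// table_config.set_config_format(ConfigFileType::PARQUET);
/// // Binary values must be hex-encoded (e.g. via hex::encode) before set
/// table_config.parquet.set(
///     "crypto.file_encryption.footer_key_as_hex",
///     &hex::encode(b"0123456789012345"),
/// )?;
/// table_config.parquet.set(
///     "crypto.file_encryption.column_key_as_hex::col1",
///     &hex::encode(b"1234567890123450"),
/// )?;
/// ```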
2504#[derive(Clone, Debug, PartialEq)]
2505pub struct ConfigFileEncryptionProperties {
2506    /// Whether the parquet footer should be encrypted.
2507    /// Defaults to true
2508    pub encrypt_footer: bool,
2509    /// Key to use for the parquet footer encoded in hex format
2510    pub footer_key_as_hex: String,
2511    /// Metadata information for footer key
2512    pub footer_key_metadata_as_hex: String,
2513    /// HashMap of column names --> (key in hex format, metadata)
2514    pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
2515    /// AAD prefix string that uniquely identifies the file and prevents file swapping
2516    pub aad_prefix_as_hex: String,
2517    /// If true, store the AAD prefix in the file
2518    /// default is false
2519    pub store_aad_prefix: bool,
2520}
2521
2522// Set up to match EncryptionPropertiesBuilder::new()
2523impl Default for ConfigFileEncryptionProperties {
2524    fn default() -> Self {
2525        ConfigFileEncryptionProperties {
2526            encrypt_footer: true,
2527            footer_key_as_hex: String::new(),
2528            footer_key_metadata_as_hex: String::new(),
2529            column_encryption_properties: Default::default(),
2530            aad_prefix_as_hex: String::new(),
2531            store_aad_prefix: false,
2532        }
2533    }
2534}
2535
2536config_namespace_with_hashmap! {
2537    pub struct ColumnEncryptionProperties {
2538        /// Per-column encryption key, encoded in hex format
2539        pub column_key_as_hex: String, default = "".to_string()
2540        /// Per-column encryption key metadata, encoded in hex format
2541        pub column_metadata_as_hex: Option<String>, default = None
2542    }
2543}
2544
2545impl ConfigField for ConfigFileEncryptionProperties {
2546    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2547        let key = format!("{key_prefix}.encrypt_footer");
2548        let desc = "Encrypt the footer";
2549        self.encrypt_footer.visit(v, key.as_str(), desc);
2550
2551        let key = format!("{key_prefix}.footer_key_as_hex");
2552        let desc = "Key to use for the parquet footer";
2553        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2554
2555        let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2556        let desc = "Metadata to use for the parquet footer";
2557        self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2558
2559        self.column_encryption_properties.visit(v, key_prefix, desc);
2560
2561        let key = format!("{key_prefix}.aad_prefix_as_hex");
2562        let desc = "AAD prefix to use";
2563        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2564
2565        let key = format!("{key_prefix}.store_aad_prefix");
2566        let desc = "If true, store the AAD prefix";
2567        self.store_aad_prefix.visit(v, key.as_str(), desc);
2570    }
2571
2572    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2573        // Any hex encoded values must be pre-encoded using
2574        // hex::encode() before calling set.
2575
2576        if key.contains("::") {
2577            // Handle any column specific properties
2578            return self.column_encryption_properties.set(key, value);
2579        };
2580
2581        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2582        match key {
2583            "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2584            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2585            "footer_key_metadata_as_hex" => {
2586                self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2587            }
2588            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2589            "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2590            _ => _config_err!(
2591                "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2592                key
2593            ),
2594        }
2595    }
2596}
2597
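// Decode the hex-encoded configuration strings and build the parquet
// `FileEncryptionProperties`; invalid hex input panics via `expect`.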
2598#[cfg(feature = "parquet_encryption")]
2599impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
2600    fn from(val: ConfigFileEncryptionProperties) -> Self {
2601        let mut fep = FileEncryptionProperties::builder(
2602            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
2603        )
2604        .with_plaintext_footer(!val.encrypt_footer)
2605        .with_aad_prefix_storage(val.store_aad_prefix);
2606
2607        if !val.footer_key_metadata_as_hex.is_empty() {
2608            fep = fep.with_footer_key_metadata(
2609                hex::decode(&val.footer_key_metadata_as_hex)
2610                    .expect("Invalid footer key metadata"),
2611            );
2612        }
2613
2614        for (column_name, encryption_props) in val.column_encryption_properties.iter() {
2615            let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
2616                .expect("Invalid column encryption key");
2617            let key_metadata = encryption_props
2618                .column_metadata_as_hex
2619                .as_ref()
2620                .map(|x| hex::decode(x).expect("Invalid column metadata"));
2621            match key_metadata {
2622                Some(key_metadata) => {
2623                    fep = fep.with_column_key_and_metadata(
2624                        column_name,
2625                        encryption_key,
2626                        key_metadata,
2627                    );
2628                }
2629                None => {
2630                    fep = fep.with_column_key(column_name, encryption_key);
2631                }
2632            }
2633        }
2634
2635        if !val.aad_prefix_as_hex.is_empty() {
2636            let aad_prefix: Vec<u8> =
2637                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2638            fep = fep.with_aad_prefix(aad_prefix);
2639        }
2640        Arc::unwrap_or_clone(fep.build().expect("Failed to build file encryption properties"))
2641    }
2642}
2643
2644#[cfg(feature = "parquet_encryption")]
2645impl From<&Arc<FileEncryptionProperties>> for ConfigFileEncryptionProperties {
2646    fn from(f: &Arc<FileEncryptionProperties>) -> Self {
2647        let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
2648
2649        let mut column_encryption_properties: HashMap<
2650            String,
2651            ColumnEncryptionProperties,
2652        > = HashMap::new();
2653
2654        for (i, column_name) in column_names_vec.iter().enumerate() {
2655            let column_key_as_hex = hex::encode(&column_keys_vec[i]);
2656            let column_metadata_as_hex: Option<String> =
2657                column_metas_vec.get(i).map(hex::encode);
2658            column_encryption_properties.insert(
2659                column_name.clone(),
2660                ColumnEncryptionProperties {
2661                    column_key_as_hex,
2662                    column_metadata_as_hex,
2663                },
2664            );
2665        }
2666        let aad_prefix: Vec<u8> = f.aad_prefix().cloned().unwrap_or_default();
2670        ConfigFileEncryptionProperties {
2671            encrypt_footer: f.encrypt_footer(),
2672            footer_key_as_hex: hex::encode(f.footer_key()),
2673            footer_key_metadata_as_hex: f
2674                .footer_key_metadata()
2675                .map(hex::encode)
2676                .unwrap_or_default(),
2677            column_encryption_properties,
2678            aad_prefix_as_hex: hex::encode(aad_prefix),
2679            store_aad_prefix: f.store_aad_prefix(),
2680        }
2681    }
2682}
2683
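/// Parquet file decryption configuration, mirroring
/// [`ConfigFileEncryptionProperties`] with hex-encoded string values.
///
/// A minimal sketch of configuring decryption through string keys:
///
/// ```ignore
/// let mut table_config = TableOptions::new();
/// table_config.set_config_format(ConfigFileType::PARQUET);
/// table_config.parquet.set(
///     "crypto.file_decryption.footer_key_as_hex",
///     &hex::encode(b"0123456789012345"),
/// )?;
/// ```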
2684#[derive(Clone, Debug, PartialEq)]
2685pub struct ConfigFileDecryptionProperties {
2686    /// Key to use for the parquet footer, encoded in hex format
2687    pub footer_key_as_hex: String,
2688    /// HashMap of column names --> key in hex format
2689    pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
2690    /// AAD prefix string uniquely identifies the file and prevents file swapping
2691    pub aad_prefix_as_hex: String,
2692    /// If true, verify the signature for files with plaintext footers.
2693    /// Defaults to true
2694    pub footer_signature_verification: bool,
2695}
2696
2697config_namespace_with_hashmap! {
2698    pub struct ColumnDecryptionProperties {
2699        /// Per-column decryption key, encoded in hex format
2700        pub column_key_as_hex: String, default = "".to_string()
2701    }
2702}
2703
2704// Set up to match DecryptionPropertiesBuilder::new()
2705impl Default for ConfigFileDecryptionProperties {
2706    fn default() -> Self {
2707        ConfigFileDecryptionProperties {
2708            footer_key_as_hex: String::new(),
2709            column_decryption_properties: Default::default(),
2710            aad_prefix_as_hex: String::new(),
2711            footer_signature_verification: true,
2712        }
2713    }
2714}
2715
2716impl ConfigField for ConfigFileDecryptionProperties {
2717    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2718        let key = format!("{key_prefix}.footer_key_as_hex");
2719        let desc = "Key to use for the parquet footer";
2720        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2721
2722        let key = format!("{key_prefix}.aad_prefix_as_hex");
2723        let desc = "AAD prefix to use";
2724        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2725
2726        let key = format!("{key_prefix}.footer_signature_verification");
2727        let desc = "If true, verify the footer signature";
2728        self.footer_signature_verification
2729            .visit(v, key.as_str(), desc);
2730
2731        self.column_decryption_properties.visit(v, key_prefix, desc);
2732    }
2733
2734    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2735        // Any hex encoded values must be pre-encoded using
2736        // hex::encode() before calling set.
2737
2738        if key.contains("::") {
2739            // Handle any column specific properties
2740            return self.column_decryption_properties.set(key, value);
2741        };
2742
2743        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2744        match key {
2745            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2746            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2747            "footer_signature_verification" => {
2748                self.footer_signature_verification.set(rem, value.as_ref())
2749            }
2750            _ => _config_err!(
2751                "Config value \"{}\" not found on ConfigFileDecryptionProperties",
2752                key
2753            ),
2754        }
2755    }
2756}
2757
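// Decode the hex-encoded configuration strings and build the parquet
// `FileDecryptionProperties`; invalid hex input panics via `expect`.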
2758#[cfg(feature = "parquet_encryption")]
2759impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
2760    fn from(val: ConfigFileDecryptionProperties) -> Self {
2761        let mut column_names: Vec<&str> = Vec::new();
2762        let mut column_keys: Vec<Vec<u8>> = Vec::new();
2763
2764        for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
2765            column_names.push(col_name.as_str());
2766            column_keys.push(
2767                hex::decode(&decryption_properties.column_key_as_hex)
2768                    .expect("Invalid column decryption key"),
2769            );
2770        }
2771
2772        let mut fep = FileDecryptionProperties::builder(
2773            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
2774        )
2775        .with_column_keys(column_names, column_keys)
2776        .unwrap();
2777
2778        if !val.footer_signature_verification {
2779            fep = fep.disable_footer_signature_verification();
2780        }
2781
2782        if !val.aad_prefix_as_hex.is_empty() {
2783            let aad_prefix =
2784                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2785            fep = fep.with_aad_prefix(aad_prefix);
2786        }
2787
2788        Arc::unwrap_or_clone(fep.build().expect("Failed to build file decryption properties"))
2789    }
2790}
2791
2792#[cfg(feature = "parquet_encryption")]
2793impl From<&Arc<FileDecryptionProperties>> for ConfigFileDecryptionProperties {
2794    fn from(f: &Arc<FileDecryptionProperties>) -> Self {
2795        let (column_names_vec, column_keys_vec) = f.column_keys();
2796        let mut column_decryption_properties: HashMap<
2797            String,
2798            ColumnDecryptionProperties,
2799        > = HashMap::new();
2800        for (i, column_name) in column_names_vec.iter().enumerate() {
2801            let props = ColumnDecryptionProperties {
2802                column_key_as_hex: hex::encode(&column_keys_vec[i]),
2803            };
2804            column_decryption_properties.insert(column_name.clone(), props);
2805        }
2806
2807        let aad_prefix: Vec<u8> = f.aad_prefix().cloned().unwrap_or_default();
2811        ConfigFileDecryptionProperties {
2812            footer_key_as_hex: hex::encode(
2813                f.footer_key(None).unwrap_or_default().as_ref(),
2814            ),
2815            column_decryption_properties,
2816            aad_prefix_as_hex: hex::encode(aad_prefix),
2817            footer_signature_verification: f.check_plaintext_footer_integrity(),
2818        }
2819    }
2820}
2821
2822/// Holds implementation-specific options for an encryption factory
2823#[derive(Clone, Debug, Default, PartialEq)]
2824pub struct EncryptionFactoryOptions {
2825    pub options: HashMap<String, String>,
2826}
2827
2828impl ConfigField for EncryptionFactoryOptions {
2829    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
2830        for (option_key, option_value) in &self.options {
2831            v.some(
2832                &format!("{key}.{option_key}"),
2833                option_value,
2834                "Encryption factory specific option",
2835            );
2836        }
2837    }
2838
2839    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2840        self.options.insert(key.to_owned(), value.to_owned());
2841        Ok(())
2842    }
2843}
2844
2845impl EncryptionFactoryOptions {
2846    /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
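    ///
    /// A minimal sketch, assuming a user-defined `MyFactoryOptions` type
    /// (hypothetical) that implements [`ExtensionOptions`] and `Default`:
    ///
    /// ```ignore
    /// let factory_options = EncryptionFactoryOptions::default();
    /// let my_options: MyFactoryOptions = factory_options.to_extension_options()?;
    /// ```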
2847    pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
2848        let mut options = T::default();
2849        for (key, value) in &self.options {
2850            options.set(key, value)?;
2851        }
2852        Ok(options)
2853    }
2854}
2855
2856config_namespace! {
2857    /// Options controlling CSV format
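    ///
    /// Options can also be set with the builder-style methods defined on
    /// [`CsvOptions`]; a minimal sketch:
    ///
    /// ```ignore
    /// let options = CsvOptions::default()
    ///     .with_has_header(true)
    ///     .with_delimiter(b';')
    ///     .with_file_compression_type(CompressionTypeVariant::GZIP);
    /// ```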
2858    pub struct CsvOptions {
2859        /// Specifies whether there is a CSV header (i.e. the first line
2860        /// consists of column names). The value `None` indicates that
2861        /// the configuration should be consulted.
2862        pub has_header: Option<bool>, default = None
2863        pub delimiter: u8, default = b','
2864        pub quote: u8, default = b'"'
2865        pub terminator: Option<u8>, default = None
2866        pub escape: Option<u8>, default = None
2867        pub double_quote: Option<bool>, default = None
2868        /// Specifies whether newlines in (quoted) values are supported.
2869        ///
2870        /// Parsing newlines in quoted values may be affected by execution behaviour such as
2871        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2872        /// parsed successfully, which may reduce performance.
2873        ///
2874        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2875        pub newlines_in_values: Option<bool>, default = None
2876        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2877        /// Compression level for the output file. The valid range depends on the
2878        /// compression algorithm:
2879        /// - ZSTD: 1 to 22 (default: 3)
2880        /// - GZIP: 0 to 9 (default: 6)
2881        /// - BZIP2: 0 to 9 (default: 6)
2882        /// - XZ: 0 to 9 (default: 6)
2883        /// If not specified, the default level for the compression algorithm is used.
2884        pub compression_level: Option<u32>, default = None
2885        pub schema_infer_max_rec: Option<usize>, default = None
2886        pub date_format: Option<String>, default = None
2887        pub datetime_format: Option<String>, default = None
2888        pub timestamp_format: Option<String>, default = None
2889        pub timestamp_tz_format: Option<String>, default = None
2890        pub time_format: Option<String>, default = None
2891        /// The output format for Nulls in the CSV writer.
2892        pub null_value: Option<String>, default = None
2893        /// The input regex for Nulls when loading CSVs.
2894        pub null_regex: Option<String>, default = None
2895        pub comment: Option<u8>, default = None
2896        /// Whether to allow truncated rows when parsing, both within a single file and across files.
2897        ///
2898        /// When set to false (the default), reading a single CSV file whose rows have different lengths will
2899        /// error; reading multiple CSV files with different numbers of columns will also fail.
2900        ///
2901        /// When set to true, reading a single CSV file with rows of different lengths will pad the truncated
2902        /// rows with null values for the missing columns; when reading multiple CSV files with different numbers
2903        /// of columns, a union schema containing all columns found across the files is created, and
2904        /// files missing columns are padded with null values for their rows.
2905        pub truncated_rows: Option<bool>, default = None
2906    }
2907}
2908
2909impl CsvOptions {
2910    /// Set the compression type for the CSV file
2911    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2912    pub fn with_compression(
2913        mut self,
2914        compression_type_variant: CompressionTypeVariant,
2915    ) -> Self {
2916        self.compression = compression_type_variant;
2917        self
2918    }
2919
2920    /// Set a limit in terms of records to scan to infer the schema
2921    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
2922    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
2923        self.schema_infer_max_rec = Some(max_rec);
2924        self
2925    }
2926
2927    /// Set to true to indicate that the first line is a header.
2928    /// - defaults to true
2929    pub fn with_has_header(mut self, has_header: bool) -> Self {
2930        self.has_header = Some(has_header);
2931        self
2932    }
2933
2934    /// Returns whether the first line is a header. If the format options do
2935    /// not specify whether there is a header, returns `None` (indicating that
2936    /// the configuration should be consulted).
2937    pub fn has_header(&self) -> Option<bool> {
2938        self.has_header
2939    }
2940
2941    /// The character separating values within a row.
2942    /// - defaults to ','
2943    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
2944        self.delimiter = delimiter;
2945        self
2946    }
2947
2948    /// The quote character in a row.
2949    /// - defaults to '"'
2950    pub fn with_quote(mut self, quote: u8) -> Self {
2951        self.quote = quote;
2952        self
2953    }
2954
2955    /// The character that terminates a row.
2956    /// - defaults to None (CRLF)
2957    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
2958        self.terminator = terminator;
2959        self
2960    }
2961
2962    /// The escape character in a row.
2963    /// - default is None
2964    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
2965        self.escape = escape;
2966        self
2967    }
2968
2969    /// Set to true to indicate that CSV quotes should be doubled.
2970    /// - defaults to true
2971    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
2972        self.double_quote = Some(double_quote);
2973        self
2974    }
2975
2976    /// Specifies whether newlines in (quoted) values are supported.
2977    ///
2978    /// Parsing newlines in quoted values may be affected by execution behaviour such as
2979    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2980    /// parsed successfully, which may reduce performance.
2981    ///
2982    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2983    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
2984        self.newlines_in_values = Some(newlines_in_values);
2985        self
2986    }
2987
2988    /// Set the `CompressionTypeVariant` for the CSV file
2989    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2990    pub fn with_file_compression_type(
2991        mut self,
2992        compression: CompressionTypeVariant,
2993    ) -> Self {
2994        self.compression = compression;
2995        self
2996    }
2997
2998    /// Whether to allow truncated rows when parsing.
2999    /// By default this is set to false, and parsing will error if the CSV rows have different lengths.
3000    /// When set to true, records with fewer than the expected number of columns are accepted,
3001    /// and the missing columns are filled with nulls. If the schema is not nullable, an error is still returned.
3002    pub fn with_truncated_rows(mut self, allow: bool) -> Self {
3003        self.truncated_rows = Some(allow);
3004        self
3005    }
3006
3007    /// Set the compression level for the output file.
3008    /// The valid range depends on the compression algorithm.
3009    /// If not specified, the default level for the algorithm is used.
3010    pub fn with_compression_level(mut self, level: u32) -> Self {
3011        self.compression_level = Some(level);
3012        self
3013    }
3014
3015    /// The delimiter character.
3016    pub fn delimiter(&self) -> u8 {
3017        self.delimiter
3018    }
3019
3020    /// The quote character.
3021    pub fn quote(&self) -> u8 {
3022        self.quote
3023    }
3024
3025    /// The terminator character.
3026    pub fn terminator(&self) -> Option<u8> {
3027        self.terminator
3028    }
3029
3030    /// The escape character.
3031    pub fn escape(&self) -> Option<u8> {
3032        self.escape
3033    }
3034}
3035
3036config_namespace! {
3037    /// Options controlling JSON format
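    ///
    /// A minimal sketch of setting JSON options through string keys:
    ///
    /// ```ignore
    /// let mut table_config = TableOptions::new();
    /// table_config.set_config_format(ConfigFileType::JSON);
    /// table_config.set("format.compression", "gzip")?;
    /// ```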
3038    pub struct JsonOptions {
3039        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
3040        /// Compression level for the output file. The valid range depends on the
3041        /// compression algorithm:
3042        /// - ZSTD: 1 to 22 (default: 3)
3043        /// - GZIP: 0 to 9 (default: 6)
3044        /// - BZIP2: 0 to 9 (default: 6)
3045        /// - XZ: 0 to 9 (default: 6)
3046        /// If not specified, the default level for the compression algorithm is used.
3047        pub compression_level: Option<u32>, default = None
3048        pub schema_infer_max_rec: Option<usize>, default = None
3049    }
3050}
3051
3052pub trait OutputFormatExt: Display {}
3053
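/// Output file format for write operations, with format-specific options
/// where applicable.
///
/// `Display` renders the lowercase format name; a minimal sketch:
///
/// ```ignore
/// assert_eq!(OutputFormat::AVRO.to_string(), "avro");
/// ```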
3054#[derive(Debug, Clone, PartialEq)]
3055#[cfg_attr(feature = "parquet", expect(clippy::large_enum_variant))]
3056pub enum OutputFormat {
3057    CSV(CsvOptions),
3058    JSON(JsonOptions),
3059    #[cfg(feature = "parquet")]
3060    PARQUET(TableParquetOptions),
3061    AVRO,
3062    ARROW,
3063}
3064
3065impl Display for OutputFormat {
3066    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3067        let out = match self {
3068            OutputFormat::CSV(_) => "csv",
3069            OutputFormat::JSON(_) => "json",
3070            #[cfg(feature = "parquet")]
3071            OutputFormat::PARQUET(_) => "parquet",
3072            OutputFormat::AVRO => "avro",
3073            OutputFormat::ARROW => "arrow",
3074        };
3075        write!(f, "{out}")
3076    }
3077}
3078
3079#[cfg(test)]
3080mod tests {
3081    #[cfg(feature = "parquet")]
3082    use crate::config::TableParquetOptions;
3083    use crate::config::{
3084        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
3085        Extensions, TableOptions,
3086    };
3087    use std::any::Any;
3088    use std::collections::HashMap;
3089
3090    #[derive(Default, Debug, Clone)]
3091    pub struct TestExtensionConfig {
3092        /// Properties recorded by `set`, keyed by the config key suffix
3093        pub properties: HashMap<String, String>,
3094    }
3095
3096    impl ExtensionOptions for TestExtensionConfig {
3097        fn as_any(&self) -> &dyn Any {
3098            self
3099        }
3100
3101        fn as_any_mut(&mut self) -> &mut dyn Any {
3102            self
3103        }
3104
3105        fn cloned(&self) -> Box<dyn ExtensionOptions> {
3106            Box::new(self.clone())
3107        }
3108
3109        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
3110            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
3111            assert_eq!(key, "test");
3112            self.properties.insert(rem.to_owned(), value.to_owned());
3113            Ok(())
3114        }
3115
3116        fn entries(&self) -> Vec<ConfigEntry> {
3117            self.properties
3118                .iter()
3119                .map(|(k, v)| ConfigEntry {
3120                    key: k.into(),
3121                    value: Some(v.into()),
3122                    description: "",
3123                })
3124                .collect()
3125        }
3126    }
3127
3128    impl ConfigExtension for TestExtensionConfig {
3129        const PREFIX: &'static str = "test";
3130    }
3131
3132    #[test]
3133    fn create_table_config() {
3134        let mut extension = Extensions::new();
3135        extension.insert(TestExtensionConfig::default());
3136        let table_config = TableOptions::new().with_extensions(extension);
3137        let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
3138        assert!(kafka_config.is_some())
3139    }
3140
3141    #[test]
3142    fn alter_test_extension_config() {
3143        let mut extension = Extensions::new();
3144        extension.insert(TestExtensionConfig::default());
3145        let mut table_config = TableOptions::new().with_extensions(extension);
3146        table_config.set_config_format(ConfigFileType::CSV);
3147        table_config.set("format.delimiter", ";").unwrap();
3148        assert_eq!(table_config.csv.delimiter, b';');
3149        table_config.set("test.bootstrap.servers", "asd").unwrap();
3150        let kafka_config = table_config
3151            .extensions
3152            .get::<TestExtensionConfig>()
3153            .unwrap();
3154        assert_eq!(
3155            kafka_config.properties.get("bootstrap.servers").unwrap(),
3156            "asd"
3157        );
3158    }
3159
3160    #[test]
3161    fn iter_test_extension_config() {
3162        let mut extension = Extensions::new();
3163        extension.insert(TestExtensionConfig::default());
3164        let table_config = TableOptions::new().with_extensions(extension);
3165        let extensions = table_config.extensions.iter().collect::<Vec<_>>();
3166        assert_eq!(extensions.len(), 1);
3167        assert_eq!(extensions[0].0, TestExtensionConfig::PREFIX);
3168    }
3169
3170    #[test]
3171    fn csv_u8_table_options() {
3172        let mut table_config = TableOptions::new();
3173        table_config.set_config_format(ConfigFileType::CSV);
3174        table_config.set("format.delimiter", ";").unwrap();
3175        assert_eq!(table_config.csv.delimiter as char, ';');
3176        table_config.set("format.escape", "\"").unwrap();
3177        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
3178        table_config.set("format.escape", "\'").unwrap();
3179        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
3180    }
3181
3182    #[test]
3183    fn warning_only_not_default() {
3184        use std::sync::atomic::AtomicUsize;
3185        static COUNT: AtomicUsize = AtomicUsize::new(0);
3186        use log::{Level, LevelFilter, Metadata, Record};
3187        struct SimpleLogger;
3188        impl log::Log for SimpleLogger {
3189            fn enabled(&self, metadata: &Metadata) -> bool {
3190                metadata.level() <= Level::Info
3191            }
3192
3193            fn log(&self, record: &Record) {
3194                if self.enabled(record.metadata()) {
3195                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3196                }
3197            }
3198            fn flush(&self) {}
3199        }
3200        log::set_logger(&SimpleLogger).unwrap();
3201        log::set_max_level(LevelFilter::Info);
3202        let mut sql_parser_options = crate::config::SqlParserOptions::default();
3203        sql_parser_options
3204            .set("enable_options_value_normalization", "false")
3205            .unwrap();
3206        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
3207        sql_parser_options
3208            .set("enable_options_value_normalization", "true")
3209            .unwrap();
3210        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
3211    }
3212
3213    #[test]
3214    fn reset_nested_scalar_reports_helpful_error() {
3215        let mut value = true;
3216        let err = <bool as ConfigField>::reset(&mut value, "nested").unwrap_err();
3217        let message = err.to_string();
3218        assert!(
3219            message.starts_with(
3220                "Invalid or Unsupported Configuration: Config field is a scalar bool and does not have nested field \"nested\""
3221            ),
3222            "unexpected error message: {message}"
3223        );
3224    }
3225
3226    #[cfg(feature = "parquet")]
3227    #[test]
3228    fn parquet_table_options() {
3229        let mut table_config = TableOptions::new();
3230        table_config.set_config_format(ConfigFileType::PARQUET);
3231        table_config
3232            .set("format.bloom_filter_enabled::col1", "true")
3233            .unwrap();
3234        assert_eq!(
3235            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
3236            Some(true)
3237        );
3238    }
3239
3240    #[cfg(feature = "parquet_encryption")]
3241    #[test]
3242    fn parquet_table_encryption() {
3243        use crate::config::{
3244            ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
3245        };
3246        use parquet::encryption::decrypt::FileDecryptionProperties;
3247        use parquet::encryption::encrypt::FileEncryptionProperties;
3248        use std::sync::Arc;
3249
3250        let footer_key = b"0123456789012345".to_vec(); // 128-bit key (16 bytes)
3251        let column_names = vec!["double_field", "float_field"];
3252        let column_keys =
3253            vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
3254
3255        let file_encryption_properties =
3256            FileEncryptionProperties::builder(footer_key.clone())
3257                .with_column_keys(column_names.clone(), column_keys.clone())
3258                .unwrap()
3259                .build()
3260                .unwrap();
3261
3262        let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
3263            .with_column_keys(column_names.clone(), column_keys.clone())
3264            .unwrap()
3265            .build()
3266            .unwrap();
3267
3268        // Test round-trip
3269        let config_encrypt =
3270            ConfigFileEncryptionProperties::from(&file_encryption_properties);
3271        let encryption_properties_built =
3272            Arc::new(FileEncryptionProperties::from(config_encrypt.clone()));
3273        assert_eq!(file_encryption_properties, encryption_properties_built);
3274
3275        let config_decrypt = ConfigFileDecryptionProperties::from(&decryption_properties);
3276        let decryption_properties_built =
3277            Arc::new(FileDecryptionProperties::from(config_decrypt.clone()));
3278        assert_eq!(decryption_properties, decryption_properties_built);
3279
3280        ///////////////////////////////////////////////////////////////////////////////////
3281        // Test encryption config
3282
3283        // Display original encryption config
3284        // println!("{:#?}", config_encrypt);
3285
3286        let mut table_config = TableOptions::new();
3287        table_config.set_config_format(ConfigFileType::PARQUET);
3288        table_config
3289            .parquet
3290            .set(
3291                "crypto.file_encryption.encrypt_footer",
3292                config_encrypt.encrypt_footer.to_string().as_str(),
3293            )
3294            .unwrap();
3295        table_config
3296            .parquet
3297            .set(
3298                "crypto.file_encryption.footer_key_as_hex",
3299                config_encrypt.footer_key_as_hex.as_str(),
3300            )
3301            .unwrap();
3302
3303        for (i, col_name) in column_names.iter().enumerate() {
3304            let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
3305            let value = hex::encode(column_keys[i].clone());
3306            table_config
3307                .parquet
3308                .set(key.as_str(), value.as_str())
3309                .unwrap();
3310        }
3311
3312        // Print matching final encryption config
3313        // println!("{:#?}", table_config.parquet.crypto.file_encryption);
3314
3315        assert_eq!(
3316            table_config.parquet.crypto.file_encryption,
3317            Some(config_encrypt)
3318        );
3319
3320        ///////////////////////////////////////////////////////////////////////////////////
3321        // Test decryption config
3322
3323        // Display original decryption config
3324        // println!("{:#?}", config_decrypt);
3325
3326        let mut table_config = TableOptions::new();
3327        table_config.set_config_format(ConfigFileType::PARQUET);
3328        table_config
3329            .parquet
3330            .set(
3331                "crypto.file_decryption.footer_key_as_hex",
3332                config_decrypt.footer_key_as_hex.as_str(),
3333            )
3334            .unwrap();
3335
3336        for (i, col_name) in column_names.iter().enumerate() {
3337            let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
3338            let value = hex::encode(column_keys[i].clone());
3339            table_config
3340                .parquet
3341                .set(key.as_str(), value.as_str())
3342                .unwrap();
3343        }
3344
3345        // Print matching final decryption config
3346        // println!("{:#?}", table_config.parquet.crypto.file_decryption);
3347
3348        assert_eq!(
3349            table_config.parquet.crypto.file_decryption,
3350            Some(config_decrypt.clone())
3351        );
3352
3353        // Set config directly
3354        let mut table_config = TableOptions::new();
3355        table_config.set_config_format(ConfigFileType::PARQUET);
3356        table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
3357        assert_eq!(
3358            table_config.parquet.crypto.file_decryption,
3359            Some(config_decrypt.clone())
3360        );
3361    }
3362
3363    #[cfg(feature = "parquet_encryption")]
3364    #[test]
3365    fn parquet_encryption_factory_config() {
3366        let mut parquet_options = TableParquetOptions::default();
3367
3368        assert_eq!(parquet_options.crypto.factory_id, None);
3369        assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);
3370
3371        let mut input_config = TestExtensionConfig::default();
3372        input_config
3373            .properties
3374            .insert("key1".to_string(), "value 1".to_string());
3375        input_config
3376            .properties
3377            .insert("key2".to_string(), "value 2".to_string());
3378
3379        parquet_options
3380            .crypto
3381            .configure_factory("example_factory", &input_config);
3382
3383        assert_eq!(
3384            parquet_options.crypto.factory_id,
3385            Some("example_factory".to_string())
3386        );
3387        let factory_options = &parquet_options.crypto.factory_options.options;
3388        assert_eq!(factory_options.len(), 2);
3389        assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
3390        assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
3391    }
3392
3393    #[cfg(feature = "parquet")]
3394    #[test]
3395    fn parquet_table_options_config_entry() {
3396        let mut table_config = TableOptions::new();
3397        table_config.set_config_format(ConfigFileType::PARQUET);
3398        table_config
3399            .set("format.bloom_filter_enabled::col1", "true")
3400            .unwrap();
3401        let entries = table_config.entries();
3402        assert!(
3403            entries
3404                .iter()
3405                .any(|item| item.key == "format.bloom_filter_enabled::col1")
3406        )
3407    }
3408
3409    #[cfg(feature = "parquet")]
3410    #[test]
3411    fn parquet_table_parquet_options_config_entry() {
3412        let mut table_parquet_options = TableParquetOptions::new();
3413        table_parquet_options
3414            .set(
3415                "crypto.file_encryption.column_key_as_hex::double_field",
3416                "31323334353637383930313233343530",
3417            )
3418            .unwrap();
3419        let entries = table_parquet_options.entries();
3420        assert!(
3421            entries.iter().any(|item| item.key
3422                == "crypto.file_encryption.column_key_as_hex::double_field")
3423        )
3424    }
3425
3426    #[cfg(feature = "parquet")]
3427    #[test]
3428    fn parquet_table_options_config_metadata_entry() {
3429        let mut table_config = TableOptions::new();
3430        table_config.set_config_format(ConfigFileType::PARQUET);
3431        table_config.set("format.metadata::key1", "").unwrap();
3432        table_config.set("format.metadata::key2", "value2").unwrap();
3433        table_config
3434            .set("format.metadata::key3", "value with spaces ")
3435            .unwrap();
3436        table_config
3437            .set("format.metadata::key4", "value with special chars :: :")
3438            .unwrap();
3439
3440        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
3441        assert_eq!(parsed_metadata.get("should not exist1"), None);
3442        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
3443        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
3444        assert_eq!(
3445            parsed_metadata.get("key3"),
3446            Some(&Some("value with spaces ".into()))
3447        );
3448        assert_eq!(
3449            parsed_metadata.get("key4"),
3450            Some(&Some("value with special chars :: :".into()))
3451        );
3452
3453        // duplicate keys are overwritten
3454        table_config.set("format.metadata::key_dupe", "A").unwrap();
3455        table_config.set("format.metadata::key_dupe", "B").unwrap();
3456        let parsed_metadata = table_config.parquet.key_value_metadata;
3457        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
3458    }

3459    #[cfg(feature = "parquet")]
3460    #[test]
3461    fn test_parquet_writer_version_validation() {
3462        use crate::{config::ConfigOptions, parquet_config::DFParquetWriterVersion};
3463
3464        let mut config = ConfigOptions::default();
3465
3466        // Valid values should work
3467        config
3468            .set("datafusion.execution.parquet.writer_version", "1.0")
3469            .unwrap();
3470        assert_eq!(
3471            config.execution.parquet.writer_version,
3472            DFParquetWriterVersion::V1_0
3473        );
3474
3475        config
3476            .set("datafusion.execution.parquet.writer_version", "2.0")
3477            .unwrap();
3478        assert_eq!(
3479            config.execution.parquet.writer_version,
3480            DFParquetWriterVersion::V2_0
3481        );
3482
3483        // Invalid value should error immediately at SET time
3484        let err = config
3485            .set("datafusion.execution.parquet.writer_version", "3.0")
3486            .unwrap_err();
3487        assert_eq!(
3488            err.to_string(),
3489            "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
3490        );
3491    }
3492}