datafusion_common/config.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Runtime configuration, via [`ConfigOptions`]

use arrow_ipc::CompressionType;

#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
#[cfg(feature = "parquet_encryption")]
use hex;
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;
#[cfg(feature = "parquet_encryption")]
use std::sync::Arc;

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///    /// Amazing config
///    pub struct MyConfig {
///        /// Field 1 doc
///        field1: String, transform = str::to_lowercase, default = "".to_string()
///
///        /// Field 2 doc
///        field2: usize, default = 232
///
///        /// Field 3 doc
///        field3: Option<usize>, default = None
///    }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone, PartialEq)]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _config_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                #[allow(deprecated)]
                                let ret = self.$field_name.set(rem, value.as_ref());

                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    #[allow(deprecated)]
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })? // Log warning if specified, and the value is not the default
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}

config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}
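
// A minimal illustrative sketch (not part of the upstream file): exercising the
// `ConfigField` impl generated by `config_namespace!` for `CatalogOptions`
// through the public `ConfigOptions::set` entry point defined later in this module.
#[cfg(test)]
mod catalog_options_example {
    use super::*;

    #[test]
    fn set_catalog_options_via_dotted_keys() -> Result<()> {
        let mut options = ConfigOptions::new();
        // Keys are dotted paths rooted at the `datafusion` namespace.
        options.set("datafusion.catalog.information_schema", "true")?;
        options.set("datafusion.catalog.default_schema", "my_schema")?;
        assert!(options.catalog.information_schema);
        assert_eq!(options.catalog.default_schema, "my_schema");
        Ok(())
    }
}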

config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize identifiers (convert them to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options values (convert them to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: Dialect, default = Dialect::Generic
        // no need to lowercase because `sqlparser::dialect_from_str` is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
        /// If false, they are mapped to `Utf8`.
        /// Default is true.
        pub map_string_types_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL Queries
        pub recursion_limit: usize, default = 50

        /// Specifies the default null ordering for query results. There are 4 options:
        /// - `nulls_max`: Nulls appear last in ascending order.
        /// - `nulls_min`: Nulls appear first in ascending order.
        /// - `nulls_first`: Nulls are always first, regardless of sort order.
        /// - `nulls_last`: Nulls are always last, regardless of sort order.
        ///
        /// By default, `nulls_max` is used to follow Postgres's behavior.
        /// Postgres rule: <https://www.postgresql.org/docs/current/queries-order.html>
        pub default_null_ordering: String, default = "nulls_max".to_string()
    }
}

/// This is the SQL dialect used by DataFusion's parser.
/// This mirrors the [sqlparser::dialect::Dialect](https://docs.rs/sqlparser/latest/sqlparser/dialect/trait.Dialect.html)
/// trait in order to offer an easier API and avoid adding the `sqlparser` dependency
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Dialect {
    #[default]
    Generic,
    MySQL,
    PostgreSQL,
    Hive,
    SQLite,
    Snowflake,
    Redshift,
    MsSQL,
    ClickHouse,
    BigQuery,
    Ansi,
    DuckDB,
    Databricks,
}

impl AsRef<str> for Dialect {
    fn as_ref(&self) -> &str {
        match self {
            Self::Generic => "generic",
            Self::MySQL => "mysql",
            Self::PostgreSQL => "postgresql",
            Self::Hive => "hive",
            Self::SQLite => "sqlite",
            Self::Snowflake => "snowflake",
            Self::Redshift => "redshift",
            Self::MsSQL => "mssql",
            Self::ClickHouse => "clickhouse",
            Self::BigQuery => "bigquery",
            Self::Ansi => "ansi",
            Self::DuckDB => "duckdb",
            Self::Databricks => "databricks",
        }
    }
}

impl FromStr for Dialect {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let value = match s.to_ascii_lowercase().as_str() {
            "generic" => Self::Generic,
            "mysql" => Self::MySQL,
            "postgresql" | "postgres" => Self::PostgreSQL,
            "hive" => Self::Hive,
            "sqlite" => Self::SQLite,
            "snowflake" => Self::Snowflake,
            "redshift" => Self::Redshift,
            "mssql" => Self::MsSQL,
            "clickhouse" => Self::ClickHouse,
            "bigquery" => Self::BigQuery,
            "ansi" => Self::Ansi,
            "duckdb" => Self::DuckDB,
            "databricks" => Self::Databricks,
            other => {
                let error_message = format!(
                    "Invalid Dialect: {other}. Expected one of: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks"
                );
                return Err(DataFusionError::Configuration(error_message));
            }
        };
        Ok(value)
    }
}

impl ConfigField for Dialect {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = Self::from_str(value)?;
        Ok(())
    }
}

impl Display for Dialect {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = self.as_ref();
        write!(f, "{str}")
    }
}
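
// An illustrative sketch (not part of the upstream file): `Dialect` parses
// case-insensitively, accepts the `postgres` alias, and round-trips through
// its `Display`/`AsRef<str>` representation.
#[cfg(test)]
mod dialect_example {
    use super::*;

    #[test]
    fn parse_and_display_dialect() -> Result<()> {
        let dialect = Dialect::from_str("Postgres")?;
        assert_eq!(dialect, Dialect::PostgreSQL);
        assert_eq!(dialect.to_string(), "postgresql");
        assert!(Dialect::from_str("not-a-dialect").is_err());
        Ok(())
    }
}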

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    Zstd,
    Lz4Frame,
    #[default]
    Uncompressed,
}

impl FromStr for SpillCompression {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "zstd" => Ok(Self::Zstd),
            "lz4_frame" => Ok(Self::Lz4Frame),
            "uncompressed" | "" => Ok(Self::Uncompressed),
            other => Err(DataFusionError::Configuration(format!(
                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
            ))),
        }
    }
}

impl ConfigField for SpillCompression {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = SpillCompression::from_str(value)?;
        Ok(())
    }
}

impl Display for SpillCompression {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = match self {
            Self::Zstd => "zstd",
            Self::Lz4Frame => "lz4_frame",
            Self::Uncompressed => "uncompressed",
        };
        write!(f, "{str}")
    }
}

impl From<SpillCompression> for Option<CompressionType> {
    fn from(c: SpillCompression) -> Self {
        match c {
            SpillCompression::Zstd => Some(CompressionType::ZSTD),
            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
            SpillCompression::Uncompressed => None,
        }
    }
}
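
// An illustrative sketch (not part of the upstream file): parsing a spill
// compression codec from its config string and converting it to the Arrow IPC
// `CompressionType` used when writing spill files.
#[cfg(test)]
mod spill_compression_example {
    use super::*;

    #[test]
    fn parse_and_convert_spill_compression() -> Result<()> {
        let codec = SpillCompression::from_str("lz4_frame")?;
        let ipc: Option<CompressionType> = codec.into();
        assert_eq!(ipc, Some(CompressionType::LZ4_FRAME));
        // `uncompressed` (or an empty string) maps to no IPC compression.
        let none: Option<CompressionType> = SpillCompression::from_str("")?.into();
        assert!(none.is_none());
        Ok(())
    }
}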

config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        /// Default batch size while creating new batches. This is especially useful for
        /// buffered in-memory batches, since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the `batch_size` configuration setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `now`, return timestamps in this time zone
        pub time_zone: Option<String>, default = None

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly used to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to work around bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Sets the compression codec used when spilling data to disk.
        ///
        /// Since DataFusion writes spill files using the Arrow IPC Stream format,
        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
        /// Valid values are: uncompressed, lz4_frame, zstd.
        /// Note: lz4_frame offers faster (de)compression, but typically results in
        /// larger spill files. In contrast, zstd achieves
        /// higher compression ratios at the cost of slower (de)compression speed.
        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Maximum size in bytes for individual spill files before rotating to a new file.
        ///
        /// When operators spill data to disk (e.g., RepartitionExec), they write
        /// multiple batches to the same file until this size limit is reached, then rotate
        /// to a new file. This reduces syscall overhead compared to one-file-per-batch
        /// while preventing files from growing too large.
        ///
        /// A larger value reduces file creation overhead but may hold more disk space.
        /// A smaller value creates more files but allows finer-grained space reclamation,
        /// as files can be deleted once fully consumed.
        ///
        /// Currently only `RepartitionExec` supports this spill file rotation feature; other
        /// spilling operators may create spill files larger than the limit.
        ///
        /// Default: 128 MB
        pub max_spill_file_size_bytes: usize, default = 128 * 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum level of output files running in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple files.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked on. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should subdirectories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should a `ListingTable` created through the `ListingTableFactory` infer table
        /// partitions from Hive-compliant directories. Defaults to true (partition columns are
        /// inferred and will be represented in the table schema).
        pub listing_table_factory_infer_partitions: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the ratio exceeds this
        /// threshold, partial aggregation will skip aggregation for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows a partial aggregation partition should process before
        /// checking the aggregation ratio and trying to switch to skipping-aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote end point.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
    }
}

config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// one read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer.
        /// The default of 512 KiB should be sufficient for most parquet files;
        /// it saves one I/O operation per parquet file. If the metadata is larger than
        /// the hint, two reads will still be performed.
        pub metadata_size_hint: Option<usize>, default = Some(512 * 1024)

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` as `Utf8View`,
        /// and `Binary/LargeBinary` as `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` as `Utf8`, and `BinaryView` as `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If true, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark,
        /// which stores microsecond resolution timestamps in an int96, allowing it
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        /// (reading) The maximum predicate cache size, in bytes. When
        /// `pushdown_filters` is enabled, sets the maximum memory used to cache
        /// the results of predicate evaluation between filter evaluation and
        /// output generation. Decreasing this value will reduce memory usage,
        /// but may increase IO and CPU usage. None means use the default
        /// parquet reader setting. 0 means no caching.
        pub max_predicate_cache_size: Option<usize>, default = None

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in bytes
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: String, default = "1.0".to_string()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default = 1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file is serialized in parallel,
        /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}
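
// An illustrative sketch (not part of the upstream file): `ParquetOptions` is
// nested under `ExecutionOptions`, so its fields are addressed via the
// `datafusion.execution.parquet.*` key prefix. The `transform = str::to_lowercase`
// on `compression` should normalize the value before it is stored.
#[cfg(test)]
mod parquet_options_example {
    use super::*;

    #[test]
    fn set_nested_parquet_options() -> Result<()> {
        let mut options = ConfigOptions::new();
        options.set("datafusion.execution.parquet.compression", "SNAPPY")?;
        assert_eq!(options.execution.parquet.compression.as_deref(), Some("snappy"));
        Ok(())
    }
}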

config_namespace! {
    /// Options for configuring Parquet Modular Encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    pub struct ParquetEncryptionOptions {
        /// Optional file decryption properties
        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None

        /// Optional file encryption properties
        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None

        /// Identifier for the encryption factory to use to create file encryption and decryption properties.
        /// Encryption factories can be registered in the runtime environment with
        /// `RuntimeEnv::register_parquet_encryption_factory`.
        pub factory_id: Option<String>, default = None

        /// Any encryption factory specific options
        pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
    }
}

impl ParquetEncryptionOptions {
    /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
    pub fn configure_factory(
        &mut self,
        factory_id: &str,
        config: &impl ExtensionOptions,
    ) {
        self.factory_id = Some(factory_id.to_owned());
        self.factory_options.options.clear();
        for entry in config.entries() {
            if let Some(value) = entry.value {
                self.factory_options.options.insert(entry.key, value);
            }
        }
    }
}

config_namespace! {
    /// Options related to query optimization
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct OptimizerOptions {
        /// When set to true, the optimizer will push a limit operation into
        /// grouped aggregations which have no aggregate expressions, as a soft limit,
        /// emitting groups once the limit is reached, before all rows in the group are read.
        pub enable_distinct_aggregation_soft_limit: bool, default = true

        /// When set to true, the physical plan optimizer will try to add round robin
        /// repartitioning to increase parallelism to leverage more CPU cores
        pub enable_round_robin_repartition: bool, default = true

        /// When set to true, the optimizer will attempt to perform limit operations
        /// during aggregations, if possible
        pub enable_topk_aggregation: bool, default = true

        /// When set to true, the optimizer will attempt to push limit operations
        /// past window functions, if possible
        pub enable_window_limits: bool, default = true

        /// When set to true, the optimizer will attempt to push down TopK dynamic filters
        /// into the file scan phase.
        pub enable_topk_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will attempt to push down Join dynamic filters
        /// into the file scan phase.
        pub enable_join_dynamic_filter_pushdown: bool, default = true

        /// When set to true, attempts to push down dynamic filters generated by operators (TopK & Join) into the file scan phase.
        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
        /// This means that if we already have 10 timestamps in the year 2025,
        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
        /// This config overrides `enable_join_dynamic_filter_pushdown` and `enable_topk_dynamic_filter_pushdown`:
        /// if you disable `enable_topk_dynamic_filter_pushdown` and then set `enable_dynamic_filter_pushdown`,
        /// the earlier `enable_topk_dynamic_filter_pushdown` setting will be overridden.
        pub enable_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will insert filters before a join between
        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
        /// filter can add additional overhead when the file format does not fully support
        /// predicate push down.
        pub filter_null_join_keys: bool, default = false

        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
        /// in parallel using the provided `target_partitions` level
        pub repartition_aggregations: bool, default = true

        /// Minimum total files size in bytes to perform file scan repartitioning.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

        /// Should DataFusion repartition data using the join keys to execute joins in parallel
        /// using the provided `target_partitions` level
        pub repartition_joins: bool, default = true

        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
        /// its inputs do not have any ordering or filtering. If the flag is not enabled,
        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
        /// RightAnti, and RightSemi - being produced only at the end of the execution.
        /// This is not typical in stream processing. Additionally, without proper design for
        /// long-running execution, all types of joins may encounter out-of-memory errors.
        pub allow_symmetric_joins_without_pruning: bool, default = true

        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
        ///
        /// For FileSources, only Parquet and CSV formats are currently supported.
        ///
        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
        /// might be partitioned into smaller chunks) for parallel scanning.
        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
        /// happen within a single file.
        ///
        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
        /// the total number of partitions and batches per partition, but does not slice the initial
        /// record tables provided to the MemTable on creation.
        pub repartition_file_scans: bool, default = true

        /// Should DataFusion repartition data using the partitions keys to execute window
        /// functions in parallel using the provided `target_partitions` level
        pub repartition_windows: bool, default = true

        /// Should DataFusion execute sorts in a per-partition fashion and merge
        /// afterwards instead of coalescing first and sorting globally.
        /// When this flag is enabled, plans in the form below
        ///
        /// ```text
        ///      "SortExec: [a@0 ASC]",
        ///      "  CoalescePartitionsExec",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        /// would turn into the plan below, which performs better in multithreaded environments
        ///
        /// ```text
        ///      "SortPreservingMergeExec: [a@0 ASC]",
        ///      "  SortExec: [a@0 ASC]",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        pub repartition_sorts: bool, default = true

        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
        /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
        /// using `SortPreservingMergeExec`)
        ///
        /// When false, DataFusion will maximize plan parallelism using
        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
        pub prefer_existing_sort: bool, default = false

        /// When set to true, the logical plan optimizer will produce warning
        /// messages if any optimization rules produce errors and then proceed to the next
        /// rule. When set to false, any rules that produce errors will cause the query to fail
        pub skip_failed_rules: bool, default = false

        /// Number of times that the optimizer will attempt to optimize the plan
        pub max_passes: usize, default = 3

        /// When set to true, the physical plan optimizer will run a top down
        /// process to reorder the join keys
        pub top_down_join_key_reordering: bool, default = true

        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
        pub prefer_hash_join: bool, default = true

        /// When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently
        /// experimental. The physical planner will opt for PiecewiseMergeJoin when there is only
        /// one range filter.
        pub enable_piecewise_merge_join: bool, default = false

        /// The maximum estimated size in bytes at which one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

        /// The maximum estimated size in rows at which one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

        /// The default filter selectivity used by Filter Statistics
        /// when an exact selectivity cannot be determined. Valid values are
        /// between 0 (no selectivity) and 100 (all rows are selected).
        pub default_filter_selectivity: u8, default = 20

        /// When set to true, the optimizer will not attempt to convert Union to Interleave
        pub prefer_existing_union: bool, default = false

        /// When set to true, if the returned type is a view type
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false
    }
}

config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: ExplainFormat, default = ExplainFormat::Indent

        /// (format=tree only) Maximum total width of the rendered tree.
        /// When set to 0, the tree will have no width limit.
        pub tree_maximum_render_width: usize, default = 240

        /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev".
        /// "summary" shows common metrics for high-level insights.
        /// "dev" provides deep operator-level introspection for developers.
        pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev
    }
}

impl ExecutionOptions {
    /// Returns the correct parallelism based on the provided `value`.
    /// If `value` is `"0"`, returns the default available parallelism, computed with
    /// `get_available_parallelism`. Otherwise, returns `value`.
    fn normalized_parallelism(value: &str) -> String {
        if value.parse::<usize>() == Ok(0) {
            get_available_parallelism().to_string()
        } else {
            value.to_owned()
        }
    }
}
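
// An illustrative sketch (not part of the upstream file): setting
// `target_partitions` to "0" routes through `normalized_parallelism`, which
// substitutes the number of available CPU cores before the value is parsed.
#[cfg(test)]
mod normalized_parallelism_example {
    use super::*;

    #[test]
    fn zero_target_partitions_uses_available_parallelism() -> Result<()> {
        let mut options = ConfigOptions::new();
        options.set("datafusion.execution.target_partitions", "0")?;
        assert_eq!(options.execution.target_partitions, get_available_parallelism());
        Ok(())
    }
}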

config_namespace! {
    /// Options controlling the format of output when printing record batches
    /// Copies [`arrow::util::display::FormatOptions`]
    pub struct FormatOptions {
        /// If set to `true` any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show types in the visual representation of batches
        pub types_info: bool, default = false
    }
}

impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
    type Error = DataFusionError;
    fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
        let duration_format = match self.duration_format.as_str() {
            "pretty" => arrow::util::display::DurationFormat::Pretty,
            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
            _ => {
                return _config_err!(
                    "Invalid duration format: {}. Valid values are pretty or iso8601",
                    self.duration_format
                )
            }
        };

        Ok(arrow::util::display::FormatOptions::new()
            .with_display_error(self.safe)
            .with_null(&self.null)
            .with_date_format(self.date_format.as_deref())
            .with_datetime_format(self.datetime_format.as_deref())
            .with_timestamp_format(self.timestamp_format.as_deref())
            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
            .with_time_format(self.time_format.as_deref())
            .with_duration_format(duration_format)
            .with_types_info(self.types_info))
    }
}
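
// An illustrative sketch (not part of the upstream file): converting the
// DataFusion-side `FormatOptions` into the Arrow display options. The conversion
// borrows from `options`, so the borrow must end before mutating it again, and
// an unrecognized `duration_format` surfaces as a configuration error.
#[cfg(test)]
mod format_options_example {
    use super::*;

    #[test]
    fn convert_to_arrow_format_options() -> Result<()> {
        let mut options = FormatOptions::default();
        options.duration_format = "iso8601".to_string();
        {
            // Scope the borrow so `options` can be mutated afterwards.
            let _arrow_options: arrow::util::display::FormatOptions<'_> =
                (&options).try_into()?;
        }

        options.duration_format = "bogus".to_string();
        let result: Result<arrow::util::display::FormatOptions<'_>> = (&options).try_into();
        assert!(result.is_err());
        Ok(())
    }
}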

/// A key value pair, with a corresponding description
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value if any
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}

/// Configuration options struct, able to store both built-in configuration and custom options
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
    /// Formatting options when printing batches
    pub format: FormatOptions,
}

impl ConfigField for ConfigOptions {
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `ConfigOptions::set`
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        match key {
            "catalog" => self.catalog.set(rem, value),
            "execution" => self.execution.set(rem, value),
            "optimizer" => self.optimizer.set(rem, value),
            "explain" => self.explain.set(rem, value),
            "sql_parser" => self.sql_parser.set(rem, value),
            "format" => self.format.set(rem, value),
            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
        }
    }

    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        self.catalog.visit(v, "datafusion.catalog", "");
        self.execution.visit(v, "datafusion.execution", "");
        self.optimizer.visit(v, "datafusion.optimizer", "");
        self.explain.visit(v, "datafusion.explain", "");
        self.sql_parser.visit(v, "datafusion.sql_parser", "");
        self.format.visit(v, "datafusion.format", "");
    }
}
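
// An illustrative sketch (not part of the upstream file): the public
// `ConfigOptions::set` below special-cases the umbrella key
// `optimizer.enable_dynamic_filter_pushdown`, fanning the value out to both
// the TopK- and Join-specific flags.
#[cfg(test)]
mod dynamic_filter_pushdown_example {
    use super::*;

    #[test]
    fn umbrella_flag_overrides_specific_flags() -> Result<()> {
        let mut options = ConfigOptions::new();
        options.set("datafusion.optimizer.enable_topk_dynamic_filter_pushdown", "false")?;
        // Setting the umbrella key overrides both specific flags.
        options.set("datafusion.optimizer.enable_dynamic_filter_pushdown", "true")?;
        assert!(options.optimizer.enable_topk_dynamic_filter_pushdown);
        assert!(options.optimizer.enable_join_dynamic_filter_pushdown);
        Ok(())
    }
}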
1150
1151impl ConfigOptions {
1152    /// Creates a new [`ConfigOptions`] with default values
1153    pub fn new() -> Self {
1154        Self::default()
1155    }
1156
1157    /// Set extensions to provided value
1158    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1159        self.extensions = extensions;
1160        self
1161    }
1162
1163    /// Set a configuration option
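    ///
    /// For example (illustrative), to override the default batch size:
    ///
    /// ```
    /// # use datafusion_common::config::ConfigOptions;
    /// let mut config = ConfigOptions::new();
    /// config.set("datafusion.execution.batch_size", "1024").unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```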
1164    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1165        let Some((prefix, key)) = key.split_once('.') else {
1166            return _config_err!("could not find config namespace for key \"{key}\"");
1167        };
1168
1169        if prefix == "datafusion" {
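            // `enable_dynamic_filter_pushdown` acts as an umbrella flag:
            // setting it also updates the TopK and join specific flags below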
1170            if key == "optimizer.enable_dynamic_filter_pushdown" {
1171                let bool_value = value.parse::<bool>().map_err(|e| {
1172                    DataFusionError::Configuration(format!(
1173                        "Failed to parse '{value}' as bool: {e}",
1174                    ))
1175                })?;
1176
                self.optimizer.enable_dynamic_filter_pushdown = bool_value;
                self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
                self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
1182                return Ok(());
1183            }
1184            return ConfigField::set(self, key, value);
1185        }
1186
1187        let Some(e) = self.extensions.0.get_mut(prefix) else {
1188            return _config_err!("Could not find config namespace \"{prefix}\"");
1189        };
1190        e.0.set(key, value)
1191    }
1192
1193    /// Create new [`ConfigOptions`], taking values from environment variables
1194    /// where possible.
1195    ///
1196    /// For example, to configure `datafusion.execution.batch_size`
1197    /// ([`ExecutionOptions::batch_size`]) you would set the
1198    /// `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable.
1199    ///
1200    /// The name of the environment variable is the option's key, transformed to
1201    /// uppercase and with periods replaced with underscores.
1202    ///
1203    /// Values are parsed according to the [same rules used in casts from
1204    /// Utf8](https://docs.rs/arrow/latest/arrow/compute/kernels/cast/fn.cast.html).
1205    ///
1206    /// If the value in the environment variable cannot be cast to the type of
1207    /// the configuration option, the default value will be used instead and a
1208    /// warning emitted. Environment variables are read when this method is
1209    /// called, and are not re-read later.
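    ///
    /// # Example
    /// ```ignore
    /// // An illustrative sketch; assumes the variable is set in the
    /// // environment before this call:
    /// //   DATAFUSION_EXECUTION_BATCH_SIZE=1024
    /// let config = datafusion_common::config::ConfigOptions::from_env().unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```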
1210    pub fn from_env() -> Result<Self> {
1211        struct Visitor(Vec<String>);
1212
1213        impl Visit for Visitor {
1214            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1215                self.0.push(key.to_string())
1216            }
1217
1218            fn none(&mut self, key: &str, _: &'static str) {
1219                self.0.push(key.to_string())
1220            }
1221        }
1222
1223        // Extract the names of all fields and then look up the corresponding
1224        // environment variables. This isn't hugely efficient but avoids
1225        // ambiguity between `a.b` and `a_b` which would both correspond
1226        // to an environment variable of `A_B`
1227
1228        let mut keys = Visitor(vec![]);
1229        let mut ret = Self::default();
1230        ret.visit(&mut keys, "datafusion", "");
1231
1232        for key in keys.0 {
1233            let env = key.to_uppercase().replace('.', "_");
            if let Some(var) = std::env::var_os(&env) {
                let value = var.to_string_lossy();
                log::info!("Set {key} to {value} from environment variable {env}");
                ret.set(&key, value.as_ref())?;
1238            }
1239        }
1240
1241        Ok(ret)
1242    }
1243
    /// Create a new [`ConfigOptions`] struct, taking values from a string hash map.
1245    ///
1246    /// Only the built-in configurations will be extracted from the hash map
1247    /// and other key value pairs will be ignored.
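    ///
    /// For example (illustrative):
    ///
    /// ```
    /// use std::collections::HashMap;
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let settings = HashMap::from([(
    ///     "datafusion.execution.batch_size".to_string(),
    ///     "1024".to_string(),
    /// )]);
    /// let config = ConfigOptions::from_string_hash_map(&settings).unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```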
1248    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1249        struct Visitor(Vec<String>);
1250
1251        impl Visit for Visitor {
1252            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1253                self.0.push(key.to_string())
1254            }
1255
1256            fn none(&mut self, key: &str, _: &'static str) {
1257                self.0.push(key.to_string())
1258            }
1259        }
1260
1261        let mut keys = Visitor(vec![]);
1262        let mut ret = Self::default();
1263        ret.visit(&mut keys, "datafusion", "");
1264
1265        for key in keys.0 {
1266            if let Some(var) = settings.get(&key) {
1267                ret.set(&key, var)?;
1268            }
1269        }
1270
1271        Ok(ret)
1272    }
1273
    /// Returns all [`ConfigEntry`]s stored within this [`ConfigOptions`]
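    ///
    /// For example (illustrative):
    ///
    /// ```
    /// # use datafusion_common::config::ConfigOptions;
    /// let config = ConfigOptions::new();
    /// for entry in config.entries() {
    ///     println!("{} = {:?}", entry.key, entry.value);
    /// }
    /// ```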
1275    pub fn entries(&self) -> Vec<ConfigEntry> {
1276        struct Visitor(Vec<ConfigEntry>);
1277
1278        impl Visit for Visitor {
1279            fn some<V: Display>(
1280                &mut self,
1281                key: &str,
1282                value: V,
1283                description: &'static str,
1284            ) {
1285                self.0.push(ConfigEntry {
1286                    key: key.to_string(),
1287                    value: Some(value.to_string()),
1288                    description,
1289                })
1290            }
1291
1292            fn none(&mut self, key: &str, description: &'static str) {
1293                self.0.push(ConfigEntry {
1294                    key: key.to_string(),
1295                    value: None,
1296                    description,
1297                })
1298            }
1299        }
1300
1301        let mut v = Visitor(vec![]);
1302        self.visit(&mut v, "datafusion", "");
1303
1304        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1305        v.0
1306    }
1307
1308    /// Generate documentation that can be included in the user guide
1309    pub fn generate_config_markdown() -> String {
1310        use std::fmt::Write as _;
1311
1312        let mut s = Self::default();
1313
        // Normalize host-dependent values (e.g. core counts) for stable display
1315        s.execution.target_partitions = 0;
1316        s.execution.planning_concurrency = 0;
1317
1318        let mut docs = "| key | default | description |\n".to_string();
1319        docs += "|-----|---------|-------------|\n";
1320        let mut entries = s.entries();
1321        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1322
        for entry in entries {
1324            let _ = writeln!(
1325                &mut docs,
1326                "| {} | {} | {} |",
1327                entry.key,
1328                entry.value.as_deref().unwrap_or("NULL"),
1329                entry.description
1330            );
1331        }
1332        docs
1333    }
1334}
1335
1336/// [`ConfigExtension`] provides a mechanism to store third-party configuration
1337/// within DataFusion [`ConfigOptions`]
1338///
1339/// This mechanism can be used to pass configuration to user defined functions
1340/// or optimizer passes
1341///
1342/// # Example
1343/// ```
1344/// use datafusion_common::{
1345///     config::ConfigExtension, config::ConfigOptions, extensions_options,
1346/// };
1347/// // Define a new configuration struct using the `extensions_options` macro
1348/// extensions_options! {
1349///    /// My own config options.
1350///    pub struct MyConfig {
1351///        /// Should "foo" be replaced by "bar"?
1352///        pub foo_to_bar: bool, default = true
1353///
1354///        /// How many "baz" should be created?
1355///        pub baz_count: usize, default = 1337
1356///    }
1357/// }
1358///
1359/// impl ConfigExtension for MyConfig {
1360///     const PREFIX: &'static str = "my_config";
1361/// }
1362///
1363/// // set up config struct and register extension
1364/// let mut config = ConfigOptions::default();
1365/// config.extensions.insert(MyConfig::default());
1366///
1367/// // overwrite config default
1368/// config.set("my_config.baz_count", "42").unwrap();
1369///
1370/// // check config state
1371/// let my_config = config.extensions.get::<MyConfig>().unwrap();
/// assert!(my_config.foo_to_bar);
/// assert_eq!(my_config.baz_count, 42);
1374/// ```
1375///
/// # Note:
/// Traits with associated constants are not currently object-safe, so this
/// trait extends the object-safe [`ExtensionOptions`]
1379pub trait ConfigExtension: ExtensionOptions {
1380    /// Configuration namespace prefix to use
1381    ///
1382    /// All values under this will be prefixed with `$PREFIX + "."`
1383    const PREFIX: &'static str;
1384}
1385
1386/// An object-safe API for storing arbitrary configuration.
1387///
1388/// See [`ConfigExtension`] for user defined configuration
1389pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
1390    /// Return `self` as [`Any`]
1391    ///
1392    /// This is needed until trait upcasting is stabilized
1393    fn as_any(&self) -> &dyn Any;
1394
1395    /// Return `self` as [`Any`]
1396    ///
1397    /// This is needed until trait upcasting is stabilized
1398    fn as_any_mut(&mut self) -> &mut dyn Any;
1399
1400    /// Return a deep clone of this [`ExtensionOptions`]
1401    ///
1402    /// It is important this does not share mutable state to avoid consistency issues
1403    /// with configuration changing whilst queries are executing
1404    fn cloned(&self) -> Box<dyn ExtensionOptions>;
1405
1406    /// Set the given `key`, `value` pair
1407    fn set(&mut self, key: &str, value: &str) -> Result<()>;
1408
    /// Returns all [`ConfigEntry`]s stored in this [`ExtensionOptions`]
1410    fn entries(&self) -> Vec<ConfigEntry>;
1411}
1412
1413/// A type-safe container for [`ConfigExtension`]
1414#[derive(Debug, Default, Clone)]
1415pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1416
1417impl Extensions {
1418    /// Create a new, empty [`Extensions`]
1419    pub fn new() -> Self {
1420        Self(BTreeMap::new())
1421    }
1422
    /// Registers a [`ConfigExtension`] with these [`Extensions`]
1424    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
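        // The "datafusion" prefix is reserved for built-in options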
1425        assert_ne!(T::PREFIX, "datafusion");
1426        let e = ExtensionBox(Box::new(extension));
1427        self.0.insert(T::PREFIX, e);
1428    }
1429
1430    /// Retrieves the extension of the given type if any
1431    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1432        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1433    }
1434
1435    /// Retrieves the extension of the given type if any
1436    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1437        let e = self.0.get_mut(T::PREFIX)?;
1438        e.0.as_any_mut().downcast_mut()
1439    }
1440}
1441
1442#[derive(Debug)]
1443struct ExtensionBox(Box<dyn ExtensionOptions>);
1444
1445impl Clone for ExtensionBox {
1446    fn clone(&self) -> Self {
1447        Self(self.0.cloned())
1448    }
1449}
1450
/// A trait implemented by types generated by `config_namespace`, and by
/// individual field types, that provides the ability to walk and mutate the
/// configuration tree
1453pub trait ConfigField {
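    /// Visit this field, passing its key, value, and description to `v`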
1454    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
1455
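    /// Set the field addressed by `key` to the given string `value`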
1456    fn set(&mut self, key: &str, value: &str) -> Result<()>;
1457}
1458
1459impl<F: ConfigField + Default> ConfigField for Option<F> {
1460    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1461        match self {
1462            Some(s) => s.visit(v, key, description),
1463            None => v.none(key, description),
1464        }
1465    }
1466
1467    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1468        self.get_or_insert_with(Default::default).set(key, value)
1469    }
1470}
1471
/// Default transformation to parse a [`ConfigField`] from a string.
1473///
1474/// This uses [`FromStr`] to parse the data.
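///
/// For example (illustrative):
///
/// ```
/// # use datafusion_common::config::default_config_transform;
/// let parsed: usize = default_config_transform("42").unwrap();
/// assert_eq!(parsed, 42);
/// ```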
1475pub fn default_config_transform<T>(input: &str) -> Result<T>
1476where
1477    T: FromStr,
1478    <T as FromStr>::Err: Sync + Send + Error + 'static,
1479{
1480    input.parse().map_err(|e| {
1481        DataFusionError::Context(
1482            format!(
1483                "Error parsing '{}' as {}",
1484                input,
1485                std::any::type_name::<T>()
1486            ),
1487            Box::new(DataFusionError::External(Box::new(e))),
1488        )
1489    })
1490}
1491
1492/// Macro that generates [`ConfigField`] for a given type.
1493///
1494/// # Usage
1495/// This always requires [`Display`] to be implemented for the given type.
1496///
1497/// There are two ways to invoke this macro. The first one uses
1498/// [`default_config_transform`]/[`FromStr`] to parse the data:
1499///
1500/// ```ignore
/// config_field!(MyType);
1502/// ```
1503///
1504/// Note that the parsing error MUST implement [`std::error::Error`]!
1505///
1506/// Or you can specify how you want to parse an [`str`] into the type:
1507///
1508/// ```ignore
1509/// fn parse_it(s: &str) -> Result<MyType> {
1510///     ...
1511/// }
1512///
/// config_field!(
1514///     MyType,
1515///     value => parse_it(value)
1516/// );
1517/// ```
1518#[macro_export]
1519macro_rules! config_field {
1520    ($t:ty) => {
1521        config_field!($t, value => $crate::config::default_config_transform(value)?);
1522    };
1523
1524    ($t:ty, $arg:ident => $transform:expr) => {
1525        impl $crate::config::ConfigField for $t {
1526            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1527                v.some(key, self, description)
1528            }
1529
1530            fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
1531                *self = $transform;
1532                Ok(())
1533            }
1534        }
1535    };
1536}
1537
1538config_field!(String);
1539config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
1540config_field!(usize);
1541config_field!(f64);
1542config_field!(u64);
1543
1544impl ConfigField for u8 {
1545    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1546        v.some(key, self, description)
1547    }
1548
1549    fn set(&mut self, key: &str, value: &str) -> Result<()> {
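        // Accept either a numeric string (e.g. "44") or a single ASCII
        // character (e.g. ","), as used for options such as CSV delimiters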
1550        if value.is_empty() {
1551            return Err(DataFusionError::Configuration(format!(
1552                "Input string for {key} key is empty"
1553            )));
1554        }
1555        // Check if the string is a valid number
1556        if let Ok(num) = value.parse::<u8>() {
1557            // TODO: Let's decide how we treat the numerical strings.
1558            *self = num;
1559        } else {
1560            let bytes = value.as_bytes();
1561            // Check if the first character is ASCII (single byte)
1562            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1563                return Err(DataFusionError::Configuration(format!(
1564                    "Error parsing {value} as u8. Non-ASCII string provided"
1565                )));
1566            }
1567            *self = bytes[0];
1568        }
1569        Ok(())
1570    }
1571}
1572
1573impl ConfigField for CompressionTypeVariant {
1574    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1575        v.some(key, self, description)
1576    }
1577
1578    fn set(&mut self, _: &str, value: &str) -> Result<()> {
1579        *self = CompressionTypeVariant::from_str(value)?;
1580        Ok(())
1581    }
1582}
1583
1584/// An implementation trait used to recursively walk configuration
1585pub trait Visit {
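    /// Called for fields that have a value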
1586    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
1587
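    /// Called for fields that have no value set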
1588    fn none(&mut self, key: &str, description: &'static str);
1589}
1590
/// Convenience macro to create [`ExtensionOptions`].
1592///
1593/// The created structure implements the following traits:
1594///
1595/// - [`Clone`]
1596/// - [`Debug`]
1597/// - [`Default`]
1598/// - [`ExtensionOptions`]
1599///
1600/// # Usage
1601/// The syntax is:
1602///
1603/// ```text
1604/// extensions_options! {
1605///      /// Struct docs (optional).
1606///     [<vis>] struct <StructName> {
1607///         /// Field docs (optional)
1608///         [<vis>] <field_name>: <field_type>, default = <default_value>
1609///
1610///         ... more fields
1611///     }
1612/// }
1613/// ```
1614///
1615/// The placeholders are:
1616/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
1617/// - `<StructName>`: Struct name like `MyStruct`.
1618/// - `<field_name>`: Field name like `my_field`.
1619/// - `<field_type>`: Field type like `u8`.
1620/// - `<default_value>`: Default value matching the field type like `42`.
1621///
1622/// # Example
1623/// See also a full example on the [`ConfigExtension`] documentation
1624///
1625/// ```
1626/// use datafusion_common::extensions_options;
1627///
1628/// extensions_options! {
1629///     /// My own config options.
1630///     pub struct MyConfig {
1631///         /// Should "foo" be replaced by "bar"?
1632///         pub foo_to_bar: bool, default = true
1633///
1634///         /// How many "baz" should be created?
1635///         pub baz_count: usize, default = 1337
1636///     }
1637/// }
1638/// ```
1639///
1640///
1641/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
1643#[macro_export]
1644macro_rules! extensions_options {
1645    (
1646     $(#[doc = $struct_d:tt])*
1647     $vis:vis struct $struct_name:ident {
1648        $(
1649        $(#[doc = $d:tt])*
1650        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
1651        )*$(,)*
1652    }
1653    ) => {
1654        $(#[doc = $struct_d])*
1655        #[derive(Debug, Clone)]
1656        #[non_exhaustive]
1657        $vis struct $struct_name{
1658            $(
1659            $(#[doc = $d])*
1660            $field_vis $field_name : $field_type,
1661            )*
1662        }
1663
1664        impl Default for $struct_name {
1665            fn default() -> Self {
1666                Self {
1667                    $($field_name: $default),*
1668                }
1669            }
1670        }
1671
1672        impl $crate::config::ExtensionOptions for $struct_name {
1673            fn as_any(&self) -> &dyn ::std::any::Any {
1674                self
1675            }
1676
1677            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
1678                self
1679            }
1680
1681            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
1682                Box::new(self.clone())
1683            }
1684
1685            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1686                $crate::config::ConfigField::set(self, key, value)
1687            }
1688
1689            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
1690                struct Visitor(Vec<$crate::config::ConfigEntry>);
1691
1692                impl $crate::config::Visit for Visitor {
1693                    fn some<V: std::fmt::Display>(
1694                        &mut self,
1695                        key: &str,
1696                        value: V,
1697                        description: &'static str,
1698                    ) {
1699                        self.0.push($crate::config::ConfigEntry {
1700                            key: key.to_string(),
1701                            value: Some(value.to_string()),
1702                            description,
1703                        })
1704                    }
1705
1706                    fn none(&mut self, key: &str, description: &'static str) {
1707                        self.0.push($crate::config::ConfigEntry {
1708                            key: key.to_string(),
1709                            value: None,
1710                            description,
1711                        })
1712                    }
1713                }
1714
1715                let mut v = Visitor(vec![]);
1716                // The prefix is not used for extensions.
1717                // The description is generated in ConfigField::visit.
1718                // We can just pass empty strings here.
1719                $crate::config::ConfigField::visit(self, &mut v, "", "");
1720                v.0
1721            }
1722        }
1723
1724        impl $crate::config::ConfigField for $struct_name {
1725            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1726                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1727                match key {
1728                    $(
1729                        stringify!($field_name) => {
                            // Allow setting deprecated fields without emitting warnings
1732                            {
1733                                #[allow(deprecated)]
1734                                self.$field_name.set(rem, value.as_ref())
1735                            }
1736                        },
1737                    )*
1738                    _ => return $crate::error::_config_err!(
1739                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1740                    )
1741                }
1742            }
1743
1744            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1745                $(
1746                    let key = stringify!($field_name).to_string();
1747                    let desc = concat!($($d),*).trim();
1748                    #[allow(deprecated)]
1749                    self.$field_name.visit(v, key.as_str(), desc);
1750                )*
1751            }
1752        }
1753    }
1754}
1755
/// These file types have special built-in behavior for configuration.
/// Use [`TableOptions::extensions`] for configuring other file types.
1758#[derive(Debug, Clone)]
1759pub enum ConfigFileType {
1760    CSV,
1761    #[cfg(feature = "parquet")]
1762    PARQUET,
1763    JSON,
1764}
1765
1766/// Represents the configuration options available for handling different table formats within a data processing application.
1767/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
1768/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
1769#[derive(Debug, Clone, Default)]
1770pub struct TableOptions {
1771    /// Configuration options for CSV file handling. This includes settings like the delimiter,
1772    /// quote character, and whether the first row is considered as headers.
1773    pub csv: CsvOptions,
1774
1775    /// Configuration options for Parquet file handling. This includes settings for compression,
1776    /// encoding, and other Parquet-specific file characteristics.
1777    pub parquet: TableParquetOptions,
1778
1779    /// Configuration options for JSON file handling.
1780    pub json: JsonOptions,
1781
1782    /// The current file format that the table operations should assume. This option allows
1783    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1784    pub current_format: Option<ConfigFileType>,
1785
1786    /// Optional extensions that can be used to extend or customize the behavior of the table
1787    /// options. Extensions can be registered using `Extensions::insert` and might include
1788    /// custom file handling logic, additional configuration parameters, or other enhancements.
1789    pub extensions: Extensions,
1790}
1791
1792impl ConfigField for TableOptions {
1793    /// Visits configuration settings for the current file format, or all formats if none is selected.
1794    ///
1795    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1796    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1797    /// it visits all available format settings.
1798    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1799        if let Some(file_type) = &self.current_format {
1800            match file_type {
1801                #[cfg(feature = "parquet")]
1802                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1803                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1804                ConfigFileType::JSON => self.json.visit(v, "format", ""),
1805            }
1806        } else {
1807            self.csv.visit(v, "csv", "");
1808            self.parquet.visit(v, "parquet", "");
1809            self.json.visit(v, "json", "");
1810        }
1811    }
1812
1813    /// Sets a configuration value for a specific key within `TableOptions`.
1814    ///
1815    /// This method delegates setting configuration values to the specific file format configurations,
1816    /// based on the current format selected. If no format is selected, it returns an error.
1817    ///
1818    /// # Parameters
1819    ///
1820    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1821    ///   for CSV format.
1822    /// * `value`: The value to set for the specified configuration key.
1823    ///
1824    /// # Returns
1825    ///
1826    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1827    /// or if setting the configuration value fails for the specific format.
1828    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1829        // Extensions are handled in the public `ConfigOptions::set`
1830        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1831        match key {
1832            "format" => {
1833                let Some(format) = &self.current_format else {
1834                    return _config_err!("Specify a format for TableOptions");
1835                };
1836                match format {
1837                    #[cfg(feature = "parquet")]
1838                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
1839                    ConfigFileType::CSV => self.csv.set(rem, value),
1840                    ConfigFileType::JSON => self.json.set(rem, value),
1841                }
1842            }
1843            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1844        }
1845    }
1846}
1847
1848impl TableOptions {
1849    /// Constructs a new instance of `TableOptions` with default settings.
1850    ///
1851    /// # Returns
1852    ///
1853    /// A new `TableOptions` instance with default configuration values.
1854    pub fn new() -> Self {
1855        Self::default()
1856    }
1857
1858    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1859    ///
1860    /// # Parameters
1861    ///
1862    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1863    ///
1864    /// # Returns
1865    ///
1866    /// A new `TableOptions` instance with settings applied from the session config.
1867    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1868        let initial = TableOptions::default();
1869        initial.combine_with_session_config(config)
1870    }
1871
    /// Returns a new `TableOptions` with settings from the given session config applied.
1873    ///
1874    /// # Parameters
1875    ///
1876    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1877    ///
1878    /// # Returns
1879    ///
1880    /// A new `TableOptions` instance with updated settings from the session config.
1881    #[must_use = "this method returns a new instance"]
1882    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1883        let mut clone = self.clone();
1884        clone.parquet.global = config.execution.parquet.clone();
1885        clone
1886    }
1887
1888    /// Sets the file format for the table.
1889    ///
1890    /// # Parameters
1891    ///
1892    /// * `format`: The file format to use (e.g., CSV, Parquet).
1893    pub fn set_config_format(&mut self, format: ConfigFileType) {
1894        self.current_format = Some(format);
1895    }
1896
1897    /// Sets the extensions for this `TableOptions` instance.
1898    ///
1899    /// # Parameters
1900    ///
1901    /// * `extensions`: The `Extensions` instance to set.
1902    ///
1903    /// # Returns
1904    ///
1905    /// A new `TableOptions` instance with the specified extensions applied.
1906    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1907        self.extensions = extensions;
1908        self
1909    }
1910
1911    /// Sets a specific configuration option.
1912    ///
1913    /// # Parameters
1914    ///
1915    /// * `key`: The configuration key (e.g., "format.delimiter").
1916    /// * `value`: The value to set for the specified key.
1917    ///
1918    /// # Returns
1919    ///
1920    /// A result indicating success or failure in setting the configuration option.
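    ///
    /// For example (illustrative; assumes the CSV `delimiter` option):
    ///
    /// ```
    /// # use datafusion_common::config::{ConfigFileType, TableOptions};
    /// let mut options = TableOptions::new();
    /// options.set_config_format(ConfigFileType::CSV);
    /// options.set("format.delimiter", ";").unwrap();
    /// ```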
1921    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1922        let Some((prefix, _)) = key.split_once('.') else {
1923            return _config_err!("could not find config namespace for key \"{key}\"");
1924        };
1925
1926        if prefix == "format" {
1927            return ConfigField::set(self, key, value);
1928        }
1929
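        // Session-level "execution" options are not stored on `TableOptions`,
        // so they are accepted and ignored here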
1930        if prefix == "execution" {
1931            return Ok(());
1932        }
1933
1934        let Some(e) = self.extensions.0.get_mut(prefix) else {
1935            return _config_err!("Could not find config namespace \"{prefix}\"");
1936        };
1937        e.0.set(key, value)
1938    }
1939
1940    /// Initializes a new `TableOptions` from a hash map of string settings.
1941    ///
1942    /// # Parameters
1943    ///
1944    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1945    ///
1946    /// # Returns
1947    ///
1948    /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1949    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1950        let mut ret = Self::default();
1951        for (k, v) in settings {
1952            ret.set(k, v)?;
1953        }
1954
1955        Ok(ret)
1956    }
1957
1958    /// Modifies the current `TableOptions` instance with settings from a hash map.
1959    ///
1960    /// # Parameters
1961    ///
1962    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1963    ///
1964    /// # Returns
1965    ///
1966    /// A result indicating success or failure in applying the settings.
1967    pub fn alter_with_string_hash_map(
1968        &mut self,
1969        settings: &HashMap<String, String>,
1970    ) -> Result<()> {
1971        for (k, v) in settings {
1972            self.set(k, v)?;
1973        }
1974        Ok(())
1975    }
1976
1977    /// Retrieves all configuration entries from this `TableOptions`.
1978    ///
1979    /// # Returns
1980    ///
1981    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1982    pub fn entries(&self) -> Vec<ConfigEntry> {
1983        struct Visitor(Vec<ConfigEntry>);
1984
1985        impl Visit for Visitor {
1986            fn some<V: Display>(
1987                &mut self,
1988                key: &str,
1989                value: V,
1990                description: &'static str,
1991            ) {
1992                self.0.push(ConfigEntry {
1993                    key: key.to_string(),
1994                    value: Some(value.to_string()),
1995                    description,
1996                })
1997            }
1998
1999            fn none(&mut self, key: &str, description: &'static str) {
2000                self.0.push(ConfigEntry {
2001                    key: key.to_string(),
2002                    value: None,
2003                    description,
2004                })
2005            }
2006        }
2007
2008        let mut v = Visitor(vec![]);
2009        self.visit(&mut v, "format", "");
2010
2011        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
2012        v.0
2013    }
2014}
2015
2016/// Options that control how Parquet files are read, including global options
2017/// that apply to all columns and optional column-specific overrides
2018///
2019/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
2020/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
2021/// (e.g. sorting_columns).
2022#[derive(Clone, Default, Debug, PartialEq)]
2023pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
2025    pub global: ParquetOptions,
    /// Column specific options. Keys take the form `<option>::<column>`,
    /// e.g. `parquet.compression::col1`.
2027    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
2028    /// Additional file-level metadata to include. Inserted into the key_value_metadata
2029    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
2030    ///
2031    /// Multiple entries are permitted
2032    /// ```sql
2033    /// OPTIONS (
2034    ///    'format.metadata::key1' '',
2035    ///    'format.metadata::key2' 'value',
2036    ///    'format.metadata::key3' 'value has spaces',
2037    ///    'format.metadata::key4' 'value has special chars :: :',
2038    ///    'format.metadata::key_dupe' 'original will be overwritten',
2039    ///    'format.metadata::key_dupe' 'final'
2040    /// )
2041    /// ```
2042    pub key_value_metadata: HashMap<String, Option<String>>,
2043    /// Options for configuring Parquet modular encryption
2044    ///
2045    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    /// See [`ConfigFileEncryptionProperties`] and [`ConfigFileDecryptionProperties`].
2047    /// These can be set via 'format.crypto', for example:
2048    /// ```sql
2049    /// OPTIONS (
2050    ///    'format.crypto.file_encryption.encrypt_footer' 'true',
    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435',  -- b"0123456789012345"
2052    ///    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
2053    ///    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
2054    ///     -- Same for decryption
2055    ///    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
2056    ///    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
2057    ///    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
2058    /// )
2059    /// ```
2060    /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
    /// Note that keys must be provided in hex format since they are binary strings.
2062    pub crypto: ParquetEncryptionOptions,
2063}
2064
2065impl TableParquetOptions {
2066    /// Return new default TableParquetOptions
2067    pub fn new() -> Self {
2068        Self::default()
2069    }
2070
    /// Set whether the arrow schema should be encoded in the parquet
    /// file metadata when writing.
2073    ///
2074    /// Default is to encode the arrow schema in the file kv_metadata.
2075    pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
2076        Self {
2077            global: ParquetOptions {
2078                skip_arrow_metadata: skip,
2079                ..self.global
2080            },
2081            ..self
2082        }
2083    }
2084
2085    /// Retrieves all configuration entries from this `TableParquetOptions`.
2086    ///
2087    /// # Returns
2088    ///
    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableParquetOptions`
    pub fn entries(&self) -> Vec<ConfigEntry> {
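        // `visit` below is called with an empty key prefix, so each key
        // arrives with a leading '.' that the visitor strips off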
2091        struct Visitor(Vec<ConfigEntry>);
2092
2093        impl Visit for Visitor {
2094            fn some<V: Display>(
2095                &mut self,
2096                key: &str,
2097                value: V,
2098                description: &'static str,
2099            ) {
2100                self.0.push(ConfigEntry {
2101                    key: key[1..].to_string(),
2102                    value: Some(value.to_string()),
2103                    description,
2104                })
2105            }
2106
2107            fn none(&mut self, key: &str, description: &'static str) {
2108                self.0.push(ConfigEntry {
2109                    key: key[1..].to_string(),
2110                    value: None,
2111                    description,
2112                })
2113            }
2114        }
2115
2116        let mut v = Visitor(vec![]);
2117        self.visit(&mut v, "", "");
2118
2119        v.0
2120    }
2121}
2122
2123impl ConfigField for TableParquetOptions {
2124    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
2125        self.global.visit(v, key_prefix, description);
2126        self.column_specific_options
2127            .visit(v, key_prefix, description);
2128        self.crypto
2129            .visit(v, &format!("{key_prefix}.crypto"), description);
2130    }
2131
2132    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2133        // Determine if the key is a global, metadata, or column-specific setting
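        // Key shapes handled below, for example:
        //   "metadata::owner"                       -> key_value_metadata["owner"]
        //   "crypto.file_encryption.encrypt_footer" -> crypto options
        //   "compression::col_a"                    -> column_specific_options for "col_a"
        //   "compression"                           -> global options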
2134        if key.starts_with("metadata::") {
2135            let k = match key.split("::").collect::<Vec<_>>()[..] {
2136                [_meta] | [_meta, ""] => {
2137                    return _config_err!(
2138                        "Invalid metadata key provided, missing key in metadata::<key>"
2139                    )
2140                }
2141                [_meta, k] => k.into(),
2142                _ => {
2143                    return _config_err!(
2144                        "Invalid metadata key provided, found too many '::' in \"{key}\""
2145                    )
2146                }
2147            };
2148            self.key_value_metadata.insert(k, Some(value.into()));
2149            Ok(())
2150        } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
2151            self.crypto.set(crypto_feature, value)
2152        } else if key.contains("::") {
2153            self.column_specific_options.set(key, value)
2154        } else {
2155            self.global.set(key, value)
2156        }
2157    }
2158}
2159
2160macro_rules! config_namespace_with_hashmap {
2161    (
2162     $(#[doc = $struct_d:tt])*
2163     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
2164     $vis:vis struct $struct_name:ident {
2165        $(
2166        $(#[doc = $d:tt])*
2167        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
2168        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
2169        )*$(,)*
2170    }
2171    ) => {
2172
2173        $(#[doc = $struct_d])*
2174        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
2175        #[derive(Debug, Clone, PartialEq)]
2176        $vis struct $struct_name{
2177            $(
2178            $(#[doc = $d])*
2179            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
2180            $field_vis $field_name : $field_type,
2181            )*
2182        }
2183
2184        impl ConfigField for $struct_name {
2185            fn set(&mut self, key: &str, value: &str) -> Result<()> {
2186                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2187                match key {
2188                    $(
2189                       stringify!($field_name) => {
2190                           // Handle deprecated fields
2191                           #[allow(deprecated)] // Allow deprecated fields
2192                           $(let value = $transform(value);)?
2193                           self.$field_name.set(rem, value.as_ref())
2194                       },
2195                    )*
2196                    _ => _config_err!(
2197                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
2198                    )
2199                }
2200            }
2201
2202            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2203                $(
2204                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
2205                let desc = concat!($($d),*).trim();
2206                // Handle deprecated fields
2207                #[allow(deprecated)]
2208                self.$field_name.visit(v, key.as_str(), desc);
2209                )*
2210            }
2211        }
2212
2213        impl Default for $struct_name {
2214            fn default() -> Self {
2215                #[allow(deprecated)]
2216                Self {
2217                    $($field_name: $default),*
2218                }
2219            }
2220        }
2221
2222        impl ConfigField for HashMap<String,$struct_name> {
2223            fn set(&mut self, key: &str, value: &str) -> Result<()> {
2224                let parts: Vec<&str> = key.splitn(2, "::").collect();
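                // e.g. "compression::col_a" splits into
                // inner_key = "compression" and hashmap_key = "col_a"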
2225                match parts.as_slice() {
2226                    [inner_key, hashmap_key] => {
2227                        // Get or create the struct for the specified key
2228                        let inner_value = self
2229                            .entry((*hashmap_key).to_owned())
2230                            .or_insert_with($struct_name::default);
2231
2232                        inner_value.set(inner_key, value)
2233                    }
2234                    _ => _config_err!("Unrecognized key '{key}'."),
2235                }
2236            }
2237
2238            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2239                for (column_name, col_options) in self {
2240                    $(
2241                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
2242                    let desc = concat!($($d),*).trim();
2243                    #[allow(deprecated)]
2244                    col_options.$field_name.visit(v, key.as_str(), desc);
2245                    )*
2246                }
2247            }
2248        }
2249    }
2250}
2251
2252config_namespace_with_hashmap! {
2253    /// Options controlling parquet format for individual columns.
2254    ///
2255    /// See [`ParquetOptions`] for more details
2256    pub struct ParquetColumnOptions {
2257        /// Sets if bloom filter is enabled for the column path.
2258        pub bloom_filter_enabled: Option<bool>, default = None
2259
2260        /// Sets encoding for the column path.
2261        /// Valid values are: plain, plain_dictionary, rle,
2262        /// bit_packed, delta_binary_packed, delta_length_byte_array,
2263        /// delta_byte_array, rle_dictionary, and byte_stream_split.
2264        /// These values are not case-sensitive. If NULL, uses
2265        /// default parquet options
2266        pub encoding: Option<String>, default = None
2267
2268        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
2269        /// default parquet options
2270        pub dictionary_enabled: Option<bool>, default = None
2271
2272        /// Sets default parquet compression codec for the column path.
2273        /// Valid values are: uncompressed, snappy, gzip(level),
2274        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
2275        /// These values are not case-sensitive. If NULL, uses
2276        /// default parquet options
2277        pub compression: Option<String>, transform = str::to_lowercase, default = None
2278
2279        /// Sets if statistics are enabled for the column
2280        /// Valid values are: "none", "chunk", and "page"
2281        /// These values are not case sensitive. If NULL, uses
2282        /// default parquet options
2283        pub statistics_enabled: Option<String>, default = None
2284
2285        /// Sets bloom filter false positive probability for the column path. If NULL, uses
2286        /// default parquet options
2287        pub bloom_filter_fpp: Option<f64>, default = None
2288
2289        /// Sets bloom filter number of distinct values. If NULL, uses
2290        /// default parquet options
2291        pub bloom_filter_ndv: Option<u64>, default = None
2292    }
2293}
2294
2295#[derive(Clone, Debug, PartialEq)]
2296pub struct ConfigFileEncryptionProperties {
2297    /// Should the parquet footer be encrypted
2298    /// default is true
2299    pub encrypt_footer: bool,
2300    /// Key to use for the parquet footer encoded in hex format
2301    pub footer_key_as_hex: String,
2302    /// Metadata information for footer key
2303    pub footer_key_metadata_as_hex: String,
2304    /// HashMap of column names --> (key in hex format, metadata)
2305    pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
2306    /// AAD prefix string uniquely identifies the file and prevents file swapping
2307    pub aad_prefix_as_hex: String,
2308    /// If true, store the AAD prefix in the file
2309    /// default is false
2310    pub store_aad_prefix: bool,
2311}
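
// A minimal sketch of configuring file encryption through the
// `ConfigField::set` impl on [`TableParquetOptions`] (assumes the
// `parquet_encryption` feature and the `format.crypto` keys documented above;
// uses a 16-byte key, hex-encoded):
//
// ```ignore
// let mut options = TableParquetOptions::new();
// options.set("crypto.file_encryption.encrypt_footer", "true")?;
// options.set(
//     "crypto.file_encryption.footer_key_as_hex",
//     &hex::encode(b"0123456789012345"),
// )?;
// ```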
2312
// Set up to match EncryptionPropertiesBuilder::new()
2314impl Default for ConfigFileEncryptionProperties {
2315    fn default() -> Self {
2316        ConfigFileEncryptionProperties {
2317            encrypt_footer: true,
2318            footer_key_as_hex: String::new(),
2319            footer_key_metadata_as_hex: String::new(),
2320            column_encryption_properties: Default::default(),
2321            aad_prefix_as_hex: String::new(),
2322            store_aad_prefix: false,
2323        }
2324    }
2325}
2326
2327config_namespace_with_hashmap! {
2328    pub struct ColumnEncryptionProperties {
2329        /// Per column encryption key
2330        pub column_key_as_hex: String, default = "".to_string()
2331        /// Per column encryption key metadata
2332        pub column_metadata_as_hex: Option<String>, default = None
2333    }
2334}
2335
2336impl ConfigField for ConfigFileEncryptionProperties {
2337    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2338        let key = format!("{key_prefix}.encrypt_footer");
2339        let desc = "Encrypt the footer";
2340        self.encrypt_footer.visit(v, key.as_str(), desc);
2341
2342        let key = format!("{key_prefix}.footer_key_as_hex");
2343        let desc = "Key to use for the parquet footer";
2344        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2345
2346        let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2347        let desc = "Metadata to use for the parquet footer";
2348        self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2349
2350        self.column_encryption_properties.visit(v, key_prefix, desc);
2351
2352        let key = format!("{key_prefix}.aad_prefix_as_hex");
2353        let desc = "AAD prefix to use";
2354        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2355
        let key = format!("{key_prefix}.store_aad_prefix");
        let desc = "If true, store the AAD prefix";
        self.store_aad_prefix.visit(v, key.as_str(), desc);
2361    }
2362
2363    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2364        // Any hex encoded values must be pre-encoded using
2365        // hex::encode() before calling set.
2366
2367        if key.contains("::") {
2368            // Handle any column specific properties
2369            return self.column_encryption_properties.set(key, value);
2370        };
2371
2372        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2373        match key {
2374            "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2375            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2376            "footer_key_metadata_as_hex" => {
2377                self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2378            }
2379            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2380            "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2381            _ => _config_err!(
2382                "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2383                key
2384            ),
2385        }
2386    }
2387}
2388
2389#[cfg(feature = "parquet_encryption")]
2390impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
2391    fn from(val: ConfigFileEncryptionProperties) -> Self {
        let mut fep = FileEncryptionProperties::builder(
            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
        )
2395        .with_plaintext_footer(!val.encrypt_footer)
2396        .with_aad_prefix_storage(val.store_aad_prefix);
2397
2398        if !val.footer_key_metadata_as_hex.is_empty() {
2399            fep = fep.with_footer_key_metadata(
2400                hex::decode(&val.footer_key_metadata_as_hex)
2401                    .expect("Invalid footer key metadata"),
2402            );
2403        }
2404
2405        for (column_name, encryption_props) in val.column_encryption_properties.iter() {
2406            let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
2407                .expect("Invalid column encryption key");
2408            let key_metadata = encryption_props
2409                .column_metadata_as_hex
2410                .as_ref()
2411                .map(|x| hex::decode(x).expect("Invalid column metadata"));
2412            match key_metadata {
2413                Some(key_metadata) => {
2414                    fep = fep.with_column_key_and_metadata(
2415                        column_name,
2416                        encryption_key,
2417                        key_metadata,
2418                    );
2419                }
2420                None => {
2421                    fep = fep.with_column_key(column_name, encryption_key);
2422                }
2423            }
2424        }
2425
2426        if !val.aad_prefix_as_hex.is_empty() {
2427            let aad_prefix: Vec<u8> =
2428                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2429            fep = fep.with_aad_prefix(aad_prefix);
2430        }
2431        Arc::unwrap_or_clone(fep.build().unwrap())
2432    }
2433}
2434
2435#[cfg(feature = "parquet_encryption")]
2436impl From<&Arc<FileEncryptionProperties>> for ConfigFileEncryptionProperties {
2437    fn from(f: &Arc<FileEncryptionProperties>) -> Self {
2438        let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
2439
2440        let mut column_encryption_properties: HashMap<
2441            String,
2442            ColumnEncryptionProperties,
2443        > = HashMap::new();
2444
2445        for (i, column_name) in column_names_vec.iter().enumerate() {
2446            let column_key_as_hex = hex::encode(&column_keys_vec[i]);
2447            let column_metadata_as_hex: Option<String> =
2448                column_metas_vec.get(i).map(hex::encode);
2449            column_encryption_properties.insert(
2450                column_name.clone(),
2451                ColumnEncryptionProperties {
2452                    column_key_as_hex,
2453                    column_metadata_as_hex,
2454                },
2455            );
2456        }
        let aad_prefix: Vec<u8> = f.aad_prefix().cloned().unwrap_or_default();
2461        ConfigFileEncryptionProperties {
2462            encrypt_footer: f.encrypt_footer(),
2463            footer_key_as_hex: hex::encode(f.footer_key()),
2464            footer_key_metadata_as_hex: f
2465                .footer_key_metadata()
2466                .map(hex::encode)
2467                .unwrap_or_default(),
2468            column_encryption_properties,
2469            aad_prefix_as_hex: hex::encode(aad_prefix),
2470            store_aad_prefix: f.store_aad_prefix(),
2471        }
2472    }
2473}
2474
2475#[derive(Clone, Debug, PartialEq)]
2476pub struct ConfigFileDecryptionProperties {
    /// Key to use for decrypting the parquet footer, encoded in hex format
2478    pub footer_key_as_hex: String,
2479    /// HashMap of column names --> key in hex format
2480    pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
2481    /// AAD prefix string uniquely identifies the file and prevents file swapping
2482    pub aad_prefix_as_hex: String,
2483    /// If true, then verify signature for files with plaintext footers.
2484    /// default = true
2485    pub footer_signature_verification: bool,
2486}
2487
2488config_namespace_with_hashmap! {
2489    pub struct ColumnDecryptionProperties {
2490        /// Per column encryption key
2491        pub column_key_as_hex: String, default = "".to_string()
2492    }
2493}
2494
// Set up to match DecryptionPropertiesBuilder::new()
2496impl Default for ConfigFileDecryptionProperties {
2497    fn default() -> Self {
2498        ConfigFileDecryptionProperties {
2499            footer_key_as_hex: String::new(),
2500            column_decryption_properties: Default::default(),
2501            aad_prefix_as_hex: String::new(),
2502            footer_signature_verification: true,
2503        }
2504    }
2505}
2506
2507impl ConfigField for ConfigFileDecryptionProperties {
2508    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2509        let key = format!("{key_prefix}.footer_key_as_hex");
2510        let desc = "Key to use for the parquet footer";
2511        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2512
2513        let key = format!("{key_prefix}.aad_prefix_as_hex");
2514        let desc = "AAD prefix to use";
2515        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2516
2517        let key = format!("{key_prefix}.footer_signature_verification");
2518        let desc = "If true, verify the footer signature";
2519        self.footer_signature_verification
2520            .visit(v, key.as_str(), desc);
2521
2522        self.column_decryption_properties.visit(v, key_prefix, desc);
2523    }
2524
2525    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2526        // Any hex encoded values must be pre-encoded using
2527        // hex::encode() before calling set.
2528
2529        if key.contains("::") {
2530            // Handle any column specific properties
2531            return self.column_decryption_properties.set(key, value);
2532        };
2533
2534        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2535        match key {
2536            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2537            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2538            "footer_signature_verification" => {
2539                self.footer_signature_verification.set(rem, value.as_ref())
2540            }
2541            _ => _config_err!(
2542                "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2543                key
2544            ),
2545        }
2546    }
2547}
2548
2549#[cfg(feature = "parquet_encryption")]
2550impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
2551    fn from(val: ConfigFileDecryptionProperties) -> Self {
2552        let mut column_names: Vec<&str> = Vec::new();
2553        let mut column_keys: Vec<Vec<u8>> = Vec::new();
2554
2555        for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
2556            column_names.push(col_name.as_str());
2557            column_keys.push(
2558                hex::decode(&decryption_properties.column_key_as_hex)
2559                    .expect("Invalid column decryption key"),
2560            );
2561        }
2562
        let mut builder = FileDecryptionProperties::builder(
            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
        )
        .with_column_keys(column_names, column_keys)
        .unwrap();

        if !val.footer_signature_verification {
            builder = builder.disable_footer_signature_verification();
        }

        if !val.aad_prefix_as_hex.is_empty() {
            let aad_prefix =
                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
            builder = builder.with_aad_prefix(aad_prefix);
        }

        Arc::unwrap_or_clone(builder.build().unwrap())
2580    }
2581}
2582
2583#[cfg(feature = "parquet_encryption")]
2584impl From<&Arc<FileDecryptionProperties>> for ConfigFileDecryptionProperties {
2585    fn from(f: &Arc<FileDecryptionProperties>) -> Self {
2586        let (column_names_vec, column_keys_vec) = f.column_keys();
2587        let mut column_decryption_properties: HashMap<
2588            String,
2589            ColumnDecryptionProperties,
2590        > = HashMap::new();
2591        for (i, column_name) in column_names_vec.iter().enumerate() {
2592            let props = ColumnDecryptionProperties {
2593                column_key_as_hex: hex::encode(column_keys_vec[i].clone()),
2594            };
2595            column_decryption_properties.insert(column_name.clone(), props);
2596        }
2597
        let aad_prefix: Vec<u8> = f.aad_prefix().cloned().unwrap_or_default();
2602        ConfigFileDecryptionProperties {
2603            footer_key_as_hex: hex::encode(
2604                f.footer_key(None).unwrap_or_default().as_ref(),
2605            ),
2606            column_decryption_properties,
2607            aad_prefix_as_hex: hex::encode(aad_prefix),
2608            footer_signature_verification: f.check_plaintext_footer_integrity(),
2609        }
2610    }
2611}
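
// A minimal round-trip sketch of the two conversions above (requires the
// `parquet_encryption` feature; the 16-byte key is illustrative):
//
// ```ignore
// let props = FileDecryptionProperties::builder(b"0123456789012345".to_vec())
//     .build()
//     .unwrap(); // Arc<FileDecryptionProperties>
// let config = ConfigFileDecryptionProperties::from(&props);
// let rebuilt = FileDecryptionProperties::from(config);
// assert_eq!(Arc::unwrap_or_clone(props), rebuilt);
// ```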
2612
2613/// Holds implementation-specific options for an encryption factory
2614#[derive(Clone, Debug, Default, PartialEq)]
2615pub struct EncryptionFactoryOptions {
    /// Implementation-specific options, as key-value pairs
    pub options: HashMap<String, String>,
2617}
2618
2619impl ConfigField for EncryptionFactoryOptions {
2620    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
2621        for (option_key, option_value) in &self.options {
2622            v.some(
2623                &format!("{key}.{option_key}"),
2624                option_value,
2625                "Encryption factory specific option",
2626            );
2627        }
2628    }
2629
2630    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2631        self.options.insert(key.to_owned(), value.to_owned());
2632        Ok(())
2633    }
2634}
2635
2636impl EncryptionFactoryOptions {
2637    /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
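    ///
    /// A minimal sketch (`MyFactoryOptions` is a hypothetical type
    /// implementing [`ExtensionOptions`] and [`Default`]):
    ///
    /// ```ignore
    /// let mut factory_options = EncryptionFactoryOptions::default();
    /// factory_options.set("key1", "value 1")?;
    /// let typed: MyFactoryOptions = factory_options.to_extension_options()?;
    /// ```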
2638    pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
2639        let mut options = T::default();
2640        for (key, value) in &self.options {
2641            options.set(key, value)?;
2642        }
2643        Ok(options)
2644    }
2645}
2646
2647config_namespace! {
2648    /// Options controlling CSV format
2649    pub struct CsvOptions {
        /// Specifies whether there is a CSV header (i.e. whether the first
        /// line consists of column names). The value `None` indicates that
        /// the configuration should be consulted.
2653        pub has_header: Option<bool>, default = None
        /// The character separating values within a row
        pub delimiter: u8, default = b','
        /// The quote character
        pub quote: u8, default = b'"'
        /// The character that terminates a row; `None` means CRLF
        pub terminator: Option<u8>, default = None
        /// The escape character
        pub escape: Option<u8>, default = None
        /// Whether quotes within a quoted value are escaped by doubling them
        pub double_quote: Option<bool>, default = None
2659        /// Specifies whether newlines in (quoted) values are supported.
2660        ///
2661        /// Parsing newlines in quoted values may be affected by execution behaviour such as
2662        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2663        /// parsed successfully, which may reduce performance.
2664        ///
2665        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2666        pub newlines_in_values: Option<bool>, default = None
        /// The file compression type
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        /// Maximum number of records to scan to infer the schema
        pub schema_infer_max_rec: Option<usize>, default = None
        /// Format string for dates
        pub date_format: Option<String>, default = None
        /// Format string for datetimes
        pub datetime_format: Option<String>, default = None
        /// Format string for timestamps
        pub timestamp_format: Option<String>, default = None
        /// Format string for timestamps with time zone
        pub timestamp_tz_format: Option<String>, default = None
        /// Format string for times
        pub time_format: Option<String>, default = None
        /// The string used to represent nulls in the CSV writer output
        pub null_value: Option<String>, default = None
        /// Regex used to match nulls when loading CSVs
        pub null_regex: Option<String>, default = None
        /// The comment character; lines beginning with it are ignored
        pub comment: Option<u8>, default = None
        /// Whether to allow truncated rows when parsing, both within a single file and across files.
        ///
        /// When set to false (default), reading a single CSV file whose rows have different lengths
        /// will error, as will reading multiple CSV files with different numbers of columns.
        ///
        /// When set to true, reading a single CSV file with rows of different lengths will pad the
        /// truncated rows with null values for the missing columns. When reading multiple CSV files
        /// with different numbers of columns, a union schema containing all columns found across the
        /// files is created, and rows from files missing columns are padded with null values.
        pub truncated_rows: Option<bool>, default = None
2689    }
2690}
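
// A minimal sketch of setting these options through `TableOptions` string
// keys, as exercised in the tests at the bottom of this file (the values
// shown are illustrative):
//
// ```ignore
// let mut table_config = TableOptions::new();
// table_config.set_config_format(ConfigFileType::CSV);
// table_config.set("format.delimiter", ";")?;
// table_config.set("format.has_header", "true")?;
// ```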
2691
2692impl CsvOptions {
    /// Set the `CompressionTypeVariant` of the CSV file
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2695    pub fn with_compression(
2696        mut self,
2697        compression_type_variant: CompressionTypeVariant,
2698    ) -> Self {
2699        self.compression = compression_type_variant;
2700        self
2701    }
2702
2703    /// Set a limit in terms of records to scan to infer the schema
2704    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
2705    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
2706        self.schema_infer_max_rec = Some(max_rec);
2707        self
2708    }
2709
2710    /// Set true to indicate that the first line is a header.
2711    /// - default to true
2712    pub fn with_has_header(mut self, has_header: bool) -> Self {
2713        self.has_header = Some(has_header);
2714        self
2715    }
2716
    /// Returns whether the first line is treated as a header. If the format
    /// options do not specify this, returns `None` (indicating that the
    /// configuration should be consulted).
2720    pub fn has_header(&self) -> Option<bool> {
2721        self.has_header
2722    }
2723
2724    /// The character separating values within a row.
2725    /// - default to ','
2726    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
2727        self.delimiter = delimiter;
2728        self
2729    }
2730
2731    /// The quote character in a row.
2732    /// - default to '"'
2733    pub fn with_quote(mut self, quote: u8) -> Self {
2734        self.quote = quote;
2735        self
2736    }
2737
2738    /// The character that terminates a row.
2739    /// - default to None (CRLF)
2740    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
2741        self.terminator = terminator;
2742        self
2743    }
2744
2745    /// The escape character in a row.
2746    /// - default is None
2747    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
2748        self.escape = escape;
2749        self
2750    }
2751
2752    /// Set true to indicate that the CSV quotes should be doubled.
2753    /// - default to true
2754    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
2755        self.double_quote = Some(double_quote);
2756        self
2757    }
2758
2759    /// Specifies whether newlines in (quoted) values are supported.
2760    ///
2761    /// Parsing newlines in quoted values may be affected by execution behaviour such as
2762    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2763    /// parsed successfully, which may reduce performance.
2764    ///
2765    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2766    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
2767        self.newlines_in_values = Some(newlines_in_values);
2768        self
2769    }
2770
    /// Set the `CompressionTypeVariant` of the CSV file
2772    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2773    pub fn with_file_compression_type(
2774        mut self,
2775        compression: CompressionTypeVariant,
2776    ) -> Self {
2777        self.compression = compression;
2778        self
2779    }
2780
    /// Whether to allow truncated rows when parsing.
    /// By default this is set to false, and rows of different lengths will error.
    /// When set to true, records with fewer than the expected number of columns are allowed,
    /// and the missing columns are filled with nulls. If the missing columns are not nullable
    /// in the schema, an error is still returned.
2785    pub fn with_truncated_rows(mut self, allow: bool) -> Self {
2786        self.truncated_rows = Some(allow);
2787        self
2788    }
2789
2790    /// The delimiter character.
2791    pub fn delimiter(&self) -> u8 {
2792        self.delimiter
2793    }
2794
2795    /// The quote character.
2796    pub fn quote(&self) -> u8 {
2797        self.quote
2798    }
2799
2800    /// The terminator character.
2801    pub fn terminator(&self) -> Option<u8> {
2802        self.terminator
2803    }
2804
2805    /// The escape character.
2806    pub fn escape(&self) -> Option<u8> {
2807        self.escape
2808    }
2809}
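
// A minimal builder-style sketch of the `with_*` methods above (the values
// are illustrative):
//
// ```ignore
// let csv_options = CsvOptions::default()
//     .with_has_header(true)
//     .with_delimiter(b'|')
//     .with_truncated_rows(true);
// assert_eq!(csv_options.delimiter(), b'|');
// assert_eq!(csv_options.has_header(), Some(true));
// ```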
2810
2811config_namespace! {
2812    /// Options controlling JSON format
2813    pub struct JsonOptions {
        /// The file compression type
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        /// Maximum number of records to scan to infer the schema
        pub schema_infer_max_rec: Option<usize>, default = None
2816    }
2817}
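
// A minimal sketch, mirroring the CSV example above (this assumes the
// `CompressionTypeVariant` parser accepts "gzip"):
//
// ```ignore
// let mut table_config = TableOptions::new();
// table_config.set_config_format(ConfigFileType::JSON);
// table_config.set("format.compression", "gzip")?;
// table_config.set("format.schema_infer_max_rec", "100")?;
// ```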
2818
/// Extension trait for output formats; currently only requires [`Display`]
pub trait OutputFormatExt: Display {}
2820
/// Output file format, with format-specific options where applicable
#[derive(Debug, Clone, PartialEq)]
2822#[allow(clippy::large_enum_variant)]
2823pub enum OutputFormat {
2824    CSV(CsvOptions),
2825    JSON(JsonOptions),
2826    #[cfg(feature = "parquet")]
2827    PARQUET(TableParquetOptions),
2828    AVRO,
2829    ARROW,
2830}
2831
2832impl Display for OutputFormat {
2833    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2834        let out = match self {
2835            OutputFormat::CSV(_) => "csv",
2836            OutputFormat::JSON(_) => "json",
2837            #[cfg(feature = "parquet")]
2838            OutputFormat::PARQUET(_) => "parquet",
2839            OutputFormat::AVRO => "avro",
2840            OutputFormat::ARROW => "arrow",
2841        };
2842        write!(f, "{out}")
2843    }
2844}
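
// A small sketch of the `Display` impl above (the CSV variant is shown; the
// other variants format analogously):
//
// ```ignore
// let format = OutputFormat::CSV(CsvOptions::default());
// assert_eq!(format.to_string(), "csv");
// ```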
2845
2846#[cfg(test)]
2847mod tests {
2848    #[cfg(feature = "parquet")]
2849    use crate::config::TableParquetOptions;
2850    use crate::config::{
2851        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
2852        Extensions, TableOptions,
2853    };
2854    use std::any::Any;
2855    use std::collections::HashMap;
2856    use std::sync::Arc;
2857
2858    #[derive(Default, Debug, Clone)]
2859    pub struct TestExtensionConfig {
        /// Key-value properties collected via `set`
2861        pub properties: HashMap<String, String>,
2862    }
2863
2864    impl ExtensionOptions for TestExtensionConfig {
2865        fn as_any(&self) -> &dyn Any {
2866            self
2867        }
2868
2869        fn as_any_mut(&mut self) -> &mut dyn Any {
2870            self
2871        }
2872
2873        fn cloned(&self) -> Box<dyn ExtensionOptions> {
2874            Box::new(self.clone())
2875        }
2876
2877        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
2878            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2879            assert_eq!(key, "test");
2880            self.properties.insert(rem.to_owned(), value.to_owned());
2881            Ok(())
2882        }
2883
2884        fn entries(&self) -> Vec<ConfigEntry> {
2885            self.properties
2886                .iter()
2887                .map(|(k, v)| ConfigEntry {
2888                    key: k.into(),
2889                    value: Some(v.into()),
2890                    description: "",
2891                })
2892                .collect()
2893        }
2894    }
2895
2896    impl ConfigExtension for TestExtensionConfig {
2897        const PREFIX: &'static str = "test";
2898    }
2899
2900    #[test]
2901    fn create_table_config() {
2902        let mut extension = Extensions::new();
2903        extension.insert(TestExtensionConfig::default());
2904        let table_config = TableOptions::new().with_extensions(extension);
        let test_config = table_config.extensions.get::<TestExtensionConfig>();
        assert!(test_config.is_some())
2907    }
2908
2909    #[test]
2910    fn alter_test_extension_config() {
2911        let mut extension = Extensions::new();
2912        extension.insert(TestExtensionConfig::default());
2913        let mut table_config = TableOptions::new().with_extensions(extension);
2914        table_config.set_config_format(ConfigFileType::CSV);
2915        table_config.set("format.delimiter", ";").unwrap();
2916        assert_eq!(table_config.csv.delimiter, b';');
2917        table_config.set("test.bootstrap.servers", "asd").unwrap();
2918        let kafka_config = table_config
2919            .extensions
2920            .get::<TestExtensionConfig>()
2921            .unwrap();
2922        assert_eq!(
2923            kafka_config.properties.get("bootstrap.servers").unwrap(),
2924            "asd"
2925        );
2926    }
2927
2928    #[test]
2929    fn csv_u8_table_options() {
2930        let mut table_config = TableOptions::new();
2931        table_config.set_config_format(ConfigFileType::CSV);
2932        table_config.set("format.delimiter", ";").unwrap();
2933        assert_eq!(table_config.csv.delimiter as char, ';');
2934        table_config.set("format.escape", "\"").unwrap();
2935        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
2936        table_config.set("format.escape", "\'").unwrap();
2937        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
2938    }
2939
2940    #[test]
2941    fn warning_only_not_default() {
2942        use std::sync::atomic::AtomicUsize;
2943        static COUNT: AtomicUsize = AtomicUsize::new(0);
2944        use log::{Level, LevelFilter, Metadata, Record};
2945        struct SimpleLogger;
2946        impl log::Log for SimpleLogger {
2947            fn enabled(&self, metadata: &Metadata) -> bool {
2948                metadata.level() <= Level::Info
2949            }
2950
2951            fn log(&self, record: &Record) {
2952                if self.enabled(record.metadata()) {
2953                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2954                }
2955            }
2956            fn flush(&self) {}
2957        }
2958        log::set_logger(&SimpleLogger).unwrap();
2959        log::set_max_level(LevelFilter::Info);
2960        let mut sql_parser_options = crate::config::SqlParserOptions::default();
2961        sql_parser_options
2962            .set("enable_options_value_normalization", "false")
2963            .unwrap();
2964        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
2965        sql_parser_options
2966            .set("enable_options_value_normalization", "true")
2967            .unwrap();
2968        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
2969    }
2970
2971    #[cfg(feature = "parquet")]
2972    #[test]
2973    fn parquet_table_options() {
2974        let mut table_config = TableOptions::new();
2975        table_config.set_config_format(ConfigFileType::PARQUET);
2976        table_config
2977            .set("format.bloom_filter_enabled::col1", "true")
2978            .unwrap();
2979        assert_eq!(
2980            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
2981            Some(true)
2982        );
2983    }
2984
2985    #[cfg(feature = "parquet_encryption")]
2986    #[test]
2987    fn parquet_table_encryption() {
2988        use crate::config::{
2989            ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
2990        };
2991        use parquet::encryption::decrypt::FileDecryptionProperties;
2992        use parquet::encryption::encrypt::FileEncryptionProperties;
2993
        let footer_key = b"0123456789012345".to_vec(); // 128-bit (16-byte) key
2995        let column_names = vec!["double_field", "float_field"];
2996        let column_keys =
2997            vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
2998
2999        let file_encryption_properties =
3000            FileEncryptionProperties::builder(footer_key.clone())
3001                .with_column_keys(column_names.clone(), column_keys.clone())
3002                .unwrap()
3003                .build()
3004                .unwrap();
3005
3006        let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
3007            .with_column_keys(column_names.clone(), column_keys.clone())
3008            .unwrap()
3009            .build()
3010            .unwrap();
3011
3012        // Test round-trip
3013        let config_encrypt =
3014            ConfigFileEncryptionProperties::from(&file_encryption_properties);
3015        let encryption_properties_built =
3016            Arc::new(FileEncryptionProperties::from(config_encrypt.clone()));
3017        assert_eq!(file_encryption_properties, encryption_properties_built);
3018
3019        let config_decrypt = ConfigFileDecryptionProperties::from(&decryption_properties);
3020        let decryption_properties_built =
3021            Arc::new(FileDecryptionProperties::from(config_decrypt.clone()));
3022        assert_eq!(decryption_properties, decryption_properties_built);
3023
3024        ///////////////////////////////////////////////////////////////////////////////////
3025        // Test encryption config
3026
3030        let mut table_config = TableOptions::new();
3031        table_config.set_config_format(ConfigFileType::PARQUET);
3032        table_config
3033            .parquet
3034            .set(
3035                "crypto.file_encryption.encrypt_footer",
3036                config_encrypt.encrypt_footer.to_string().as_str(),
3037            )
3038            .unwrap();
3039        table_config
3040            .parquet
3041            .set(
3042                "crypto.file_encryption.footer_key_as_hex",
3043                config_encrypt.footer_key_as_hex.as_str(),
3044            )
3045            .unwrap();
3046
3047        for (i, col_name) in column_names.iter().enumerate() {
3048            let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
3049            let value = hex::encode(column_keys[i].clone());
3050            table_config
3051                .parquet
3052                .set(key.as_str(), value.as_str())
3053                .unwrap();
3054        }
3055
3059        assert_eq!(
3060            table_config.parquet.crypto.file_encryption,
3061            Some(config_encrypt)
3062        );
3063
3064        ///////////////////////////////////////////////////////////////////////////////////
3065        // Test decryption config
3066
3070        let mut table_config = TableOptions::new();
3071        table_config.set_config_format(ConfigFileType::PARQUET);
3072        table_config
3073            .parquet
3074            .set(
3075                "crypto.file_decryption.footer_key_as_hex",
3076                config_decrypt.footer_key_as_hex.as_str(),
3077            )
3078            .unwrap();
3079
3080        for (i, col_name) in column_names.iter().enumerate() {
3081            let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
3082            let value = hex::encode(column_keys[i].clone());
3083            table_config
3084                .parquet
3085                .set(key.as_str(), value.as_str())
3086                .unwrap();
3087        }
3088
3092        assert_eq!(
3093            table_config.parquet.crypto.file_decryption,
3094            Some(config_decrypt.clone())
3095        );
3096
3097        // Set config directly
3098        let mut table_config = TableOptions::new();
3099        table_config.set_config_format(ConfigFileType::PARQUET);
3100        table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
3101        assert_eq!(
3102            table_config.parquet.crypto.file_decryption,
3103            Some(config_decrypt.clone())
3104        );
3105    }
3106
3107    #[cfg(feature = "parquet_encryption")]
3108    #[test]
3109    fn parquet_encryption_factory_config() {
3110        let mut parquet_options = TableParquetOptions::default();
3111
3112        assert_eq!(parquet_options.crypto.factory_id, None);
3113        assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);
3114
3115        let mut input_config = TestExtensionConfig::default();
3116        input_config
3117            .properties
3118            .insert("key1".to_string(), "value 1".to_string());
3119        input_config
3120            .properties
3121            .insert("key2".to_string(), "value 2".to_string());
3122
3123        parquet_options
3124            .crypto
3125            .configure_factory("example_factory", &input_config);
3126
3127        assert_eq!(
3128            parquet_options.crypto.factory_id,
3129            Some("example_factory".to_string())
3130        );
3131        let factory_options = &parquet_options.crypto.factory_options.options;
3132        assert_eq!(factory_options.len(), 2);
3133        assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
3134        assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
3135    }
3136
3137    #[cfg(feature = "parquet")]
3138    #[test]
3139    fn parquet_table_options_config_entry() {
3140        let mut table_config = TableOptions::new();
3141        table_config.set_config_format(ConfigFileType::PARQUET);
3142        table_config
3143            .set("format.bloom_filter_enabled::col1", "true")
3144            .unwrap();
3145        let entries = table_config.entries();
3146        assert!(entries
3147            .iter()
3148            .any(|item| item.key == "format.bloom_filter_enabled::col1"))
3149    }
3150
3151    #[cfg(feature = "parquet")]
3152    #[test]
3153    fn parquet_table_parquet_options_config_entry() {
3154        let mut table_parquet_options = TableParquetOptions::new();
3155        table_parquet_options
3156            .set(
3157                "crypto.file_encryption.column_key_as_hex::double_field",
3158                "31323334353637383930313233343530",
3159            )
3160            .unwrap();
3161        let entries = table_parquet_options.entries();
3162        assert!(entries
3163            .iter()
3164            .any(|item| item.key
3165                == "crypto.file_encryption.column_key_as_hex::double_field"))
3166    }
3167
3168    #[cfg(feature = "parquet")]
3169    #[test]
3170    fn parquet_table_options_config_metadata_entry() {
3171        let mut table_config = TableOptions::new();
3172        table_config.set_config_format(ConfigFileType::PARQUET);
3173        table_config.set("format.metadata::key1", "").unwrap();
3174        table_config.set("format.metadata::key2", "value2").unwrap();
3175        table_config
3176            .set("format.metadata::key3", "value with spaces ")
3177            .unwrap();
3178        table_config
3179            .set("format.metadata::key4", "value with special chars :: :")
3180            .unwrap();
3181
3182        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
3183        assert_eq!(parsed_metadata.get("should not exist1"), None);
3184        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
3185        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
3186        assert_eq!(
3187            parsed_metadata.get("key3"),
3188            Some(&Some("value with spaces ".into()))
3189        );
3190        assert_eq!(
3191            parsed_metadata.get("key4"),
3192            Some(&Some("value with special chars :: :".into()))
3193        );
3194
3195        // duplicate keys are overwritten
3196        table_config.set("format.metadata::key_dupe", "A").unwrap();
3197        table_config.set("format.metadata::key_dupe", "B").unwrap();
3198        let parsed_metadata = table_config.parquet.key_value_metadata;
3199        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
3200    }
3201}