datafusion_common/config.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Runtime configuration, via [`ConfigOptions`]

use arrow_ipc::CompressionType;

#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;

#[cfg(feature = "parquet_encryption")]
use hex;

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///    /// Amazing config
///    pub struct MyConfig {
///        /// Field 1 doc
///        field1: String, transform = str::to_lowercase, default = "".to_string()
///
///        /// Field 2 doc
///        field2: usize, default = 232
///
///        /// Field 3 doc
///        field3: Option<usize>, default = None
///    }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone)]
/// #[non_exhaustive]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _config_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                #[allow(deprecated)]
                                let ret = self.$field_name.set(rem, value.as_ref());

                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    #[allow(deprecated)]
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })? // Log warning if specified, and the value is not the default
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}

config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}

config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize identifiers (convert them to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: String, default = "generic".to_string()
        // no need to lowercase because `sqlparser::dialect_from_str` is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion cannot enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
        /// If false, they are mapped to `Utf8`.
        /// Default is true.
        pub map_string_types_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL queries
        pub recursion_limit: usize, default = 50

        /// Specifies the default null ordering for query results. There are 4 options:
        /// - `nulls_max`: Nulls appear last in ascending order.
        /// - `nulls_min`: Nulls appear first in ascending order.
        /// - `nulls_first`: Nulls always appear first, in any order.
        /// - `nulls_last`: Nulls always appear last, in any order.
        ///
        /// By default, `nulls_max` is used to follow Postgres's behavior.
        /// Postgres rule: <https://www.postgresql.org/docs/current/queries-order.html>
        pub default_null_ordering: String, default = "nulls_max".to_string()
    }
}

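/// Compression codec applied to spill files, which are written in the Arrow
/// IPC Stream format. See [`ExecutionOptions::spill_compression`].
///
/// Parsing via [`FromStr`] is case-insensitive; a minimal sketch, relying on
/// the defaults defined in this file:
///
/// ```
/// use std::str::FromStr;
/// use datafusion_common::config::SpillCompression;
///
/// assert_eq!(SpillCompression::from_str("ZSTD").unwrap(), SpillCompression::Zstd);
/// // An empty string falls back to no compression
/// assert_eq!(SpillCompression::from_str("").unwrap(), SpillCompression::Uncompressed);
/// ```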
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    Zstd,
    Lz4Frame,
    #[default]
    Uncompressed,
}

impl FromStr for SpillCompression {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "zstd" => Ok(Self::Zstd),
            "lz4_frame" => Ok(Self::Lz4Frame),
            "uncompressed" | "" => Ok(Self::Uncompressed),
            other => Err(DataFusionError::Configuration(format!(
                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
            ))),
        }
    }
}

impl ConfigField for SpillCompression {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = SpillCompression::from_str(value)?;
        Ok(())
    }
}

impl Display for SpillCompression {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = match self {
            Self::Zstd => "zstd",
            Self::Lz4Frame => "lz4_frame",
            Self::Uncompressed => "uncompressed",
        };
        write!(f, "{str}")
    }
}

impl From<SpillCompression> for Option<CompressionType> {
    fn from(c: SpillCompression) -> Self {
        match c {
            SpillCompression::Zstd => Some(CompressionType::ZSTD),
            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
            SpillCompression::Uncompressed => None,
        }
    }
}

config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        /// Default batch size while creating new batches. This is especially useful for
        /// buffered in-memory batches, since creating many tiny batches results in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the `datafusion.execution.batch_size` setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
        /// according to this time zone, and then extract the hour
        pub time_zone: String, default = "+00:00".into()

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly used to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to work around bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Sets the compression codec used when spilling data to disk.
        ///
        /// Since DataFusion writes spill files using the Arrow IPC Stream format,
        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
        /// Valid values are: uncompressed, lz4_frame, zstd.
        /// Note: lz4_frame offers faster (de)compression, but typically results in
        /// larger spill files. In contrast, zstd achieves
        /// higher compression ratios at the cost of slower (de)compression speed.
        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, the size in bytes below which data is concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum number of output files written in parallel.
        /// RecordBatches will be distributed in a round-robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// `soft_max_rows_per_output_file` is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple files.
        /// This is a soft max, so it can be exceeded slightly. There will also
        /// be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked on. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the observed ratio is
        /// greater than this threshold, partial aggregation will skip aggregation
        /// for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows a partial aggregation partition should process before
        /// checking the aggregation ratio and attempting to switch to skipping-aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote endpoint.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
    }
}

config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// One read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer
        pub metadata_size_hint: Option<usize>, default = None

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` as `Utf8View`,
        /// and `Binary/LargeBinary` as `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` as `Utf8`, and `BinaryView` as `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If true, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark
        /// which store microsecond-resolution timestamps in an int96, allowing them
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size, the number of rows written in each batch
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: String, default = "1.0".to_string()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column.
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default = 1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file is serialized in parallel,
        /// leveraging a maximum possible core count of `n_files * n_row_groups * n_columns`.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing `maximum_parallel_row_group_writers` and
        /// `maximum_buffered_record_batches_per_stream` if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing `maximum_parallel_row_group_writers` and
        /// `maximum_buffered_record_batches_per_stream` if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}

config_namespace! {
    /// Options for configuring Parquet Modular Encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    pub struct ParquetEncryptionOptions {
        /// Optional file decryption properties
        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None

        /// Optional file encryption properties
        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None

        /// Identifier for the encryption factory to use to create file encryption and decryption properties.
        /// Encryption factories can be registered in the runtime environment with
        /// `RuntimeEnv::register_parquet_encryption_factory`.
        pub factory_id: Option<String>, default = None

        /// Any encryption factory specific options
        pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
    }
}

impl ParquetEncryptionOptions {
    /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
    pub fn configure_factory(
        &mut self,
        factory_id: &str,
        config: &impl ExtensionOptions,
    ) {
        self.factory_id = Some(factory_id.to_owned());
        self.factory_options.options.clear();
        for entry in config.entries() {
            if let Some(value) = entry.value {
                self.factory_options.options.insert(entry.key, value);
            }
        }
    }
}

config_namespace! {
    /// Options related to query optimization
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct OptimizerOptions {
        /// When set to true, the optimizer will push a limit operation into
        /// grouped aggregations which have no aggregate expressions, as a soft limit,
        /// emitting groups once the limit is reached, before all rows in the group are read.
        pub enable_distinct_aggregation_soft_limit: bool, default = true

        /// When set to true, the physical plan optimizer will try to add round robin
        /// repartitioning to increase parallelism to leverage more CPU cores
        pub enable_round_robin_repartition: bool, default = true

        /// When set to true, the optimizer will attempt to perform limit operations
        /// during aggregations, if possible
        pub enable_topk_aggregation: bool, default = true

        /// When set to true, the optimizer will attempt to push limit operations
        /// past window functions, if possible
        pub enable_window_limits: bool, default = true

        /// When set to true, attempts to push down dynamic filters generated by operators into the file scan phase.
        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
        /// This means that if we already have 10 timestamps in the year 2025,
        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
        pub enable_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will insert filters before a join between
        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
        /// filter can add additional overhead when the file format does not fully support
        /// predicate push down.
        pub filter_null_join_keys: bool, default = false

        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
        /// in parallel using the provided `target_partitions` level
        pub repartition_aggregations: bool, default = true

        /// Minimum total file size in bytes required to perform file scan repartitioning.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

        /// Should DataFusion repartition data using the join keys to execute joins in parallel
        /// using the provided `target_partitions` level
        pub repartition_joins: bool, default = true

        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
        /// its inputs do not have any ordering or filtering. If the flag is not enabled,
        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
        /// RightAnti, and RightSemi - being produced only at the end of the execution.
        /// This is not typical in stream processing. Additionally, without proper design for
        /// long-running execution, all types of joins may encounter out-of-memory errors.
        pub allow_symmetric_joins_without_pruning: bool, default = true

        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
        ///
        /// For FileSources, only Parquet and CSV formats are currently supported.
        ///
        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
        /// might be partitioned into smaller chunks) for parallel scanning.
        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
        /// happen within a single file.
        ///
        /// If set to `true` for an in-memory source, all of the MemTable's partitions will have their batches
        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
        /// the total number of partitions and batches per partition, but does not slice the initial
        /// record batches provided to the MemTable on creation.
        pub repartition_file_scans: bool, default = true

        /// Should DataFusion repartition data using the partition keys to execute window
        /// functions in parallel using the provided `target_partitions` level
        pub repartition_windows: bool, default = true

        /// Should DataFusion execute sorts in a per-partition fashion and merge
        /// afterwards instead of coalescing first and sorting globally.
        /// When this flag is enabled, plans of the form below
        ///
        /// ```text
        ///      "SortExec: [a@0 ASC]",
        ///      "  CoalescePartitionsExec",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        /// would turn into the plan below, which performs better in multithreaded environments
        ///
        /// ```text
        ///      "SortPreservingMergeExec: [a@0 ASC]",
        ///      "  SortExec: [a@0 ASC]",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        pub repartition_sorts: bool, default = true

        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted
        /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
        /// using `SortPreservingMergeExec`)
        ///
        /// When false, DataFusion will maximize plan parallelism using
        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
        pub prefer_existing_sort: bool, default = false

        /// When set to true, the logical plan optimizer will produce warning
        /// messages if any optimization rules produce errors and then proceed to the next
        /// rule. When set to false, any rules that produce errors will cause the query to fail
        pub skip_failed_rules: bool, default = false

        /// Number of times that the optimizer will attempt to optimize the plan
        pub max_passes: usize, default = 3

        /// When set to true, the physical plan optimizer will run a top down
        /// process to reorder the join keys
        pub top_down_join_key_reordering: bool, default = true

        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
        pub prefer_hash_join: bool, default = true

        /// The maximum estimated size in bytes below which one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

        /// The maximum estimated size in rows below which one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

        /// The default filter selectivity used by Filter Statistics
        /// when an exact selectivity cannot be determined. Valid values are
        /// between 0 (no selectivity) and 100 (all rows are selected).
        pub default_filter_selectivity: u8, default = 20

        /// When set to true, the optimizer will not attempt to convert Union to Interleave
        pub prefer_existing_union: bool, default = false

        /// When set to true, if the returned type is a view type
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false
    }
}

config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: String, default = "indent".to_string()

        /// (format=tree only) Maximum total width of the rendered tree.
        /// When set to 0, the tree will have no width limit.
        pub tree_maximum_render_width: usize, default = 240
    }
}

impl ExecutionOptions {
    /// Returns the correct parallelism based on the provided `value`.
    /// If `value` is `"0"`, returns the default available parallelism, computed with
    /// `get_available_parallelism`. Otherwise, returns `value`.
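    /// For example, on a 16-core machine `"0"` normalizes to `"16"`, while any
    /// other string (e.g. `"4"`, or a non-numeric value) is returned unchanged.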
    fn normalized_parallelism(value: &str) -> String {
        if value.parse::<usize>() == Ok(0) {
            get_available_parallelism().to_string()
        } else {
            value.to_owned()
        }
    }
}

config_namespace! {
    /// Options controlling the format of output when printing record batches.
    /// Mirrors [`arrow::util::display::FormatOptions`]
    pub struct FormatOptions {
        /// If set to `true` any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show types in the visual representation of batches
        pub types_info: bool, default = false
    }
}

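// Conversion into Arrow's display options. A minimal usage sketch, assuming
// the `arrow` crate is in scope (the `fmt` binding below is illustrative):
//
// ```ignore
// let options = FormatOptions::default();
// let fmt: arrow::util::display::FormatOptions<'_> = (&options).try_into()?;
// ```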
impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
    type Error = DataFusionError;
    fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
        let duration_format = match self.duration_format.as_str() {
            "pretty" => arrow::util::display::DurationFormat::Pretty,
            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
            _ => {
                return _config_err!(
                    "Invalid duration format: {}. Valid values are pretty or iso8601",
                    self.duration_format
                )
            }
        };

        Ok(arrow::util::display::FormatOptions::new()
            .with_display_error(self.safe)
            .with_null(&self.null)
            .with_date_format(self.date_format.as_deref())
            .with_datetime_format(self.datetime_format.as_deref())
            .with_timestamp_format(self.timestamp_format.as_deref())
            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
            .with_time_format(self.time_format.as_deref())
            .with_duration_format(duration_format)
            .with_types_info(self.types_info))
    }
}

/// A key value pair, with a corresponding description
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value if any
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}

/// Configuration options struct, able to store both built-in configuration and custom options
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
    /// Formatting options when printing batches
    pub format: FormatOptions,
}

impl ConfigField for ConfigOptions {
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `ConfigOptions::set`
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        match key {
            "catalog" => self.catalog.set(rem, value),
            "execution" => self.execution.set(rem, value),
            "optimizer" => self.optimizer.set(rem, value),
            "explain" => self.explain.set(rem, value),
            "sql_parser" => self.sql_parser.set(rem, value),
            "format" => self.format.set(rem, value),
            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
        }
    }

    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        self.catalog.visit(v, "datafusion.catalog", "");
        self.execution.visit(v, "datafusion.execution", "");
        self.optimizer.visit(v, "datafusion.optimizer", "");
        self.explain.visit(v, "datafusion.explain", "");
        self.sql_parser.visit(v, "datafusion.sql_parser", "");
        self.format.visit(v, "datafusion.format", "");
    }
}

impl ConfigOptions {
    /// Creates a new [`ConfigOptions`] with default values
    pub fn new() -> Self {
        Self::default()
    }

    /// Set extensions to provided value
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }

    /// Set a configuration option
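    ///
    /// Keys for the built-in options use the `datafusion.` prefix; a minimal
    /// sketch, relying on the defaults defined in this file:
    ///
    /// ```
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let mut config = ConfigOptions::new();
    /// config.set("datafusion.execution.batch_size", "1024").unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```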
    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let Some((prefix, key)) = key.split_once('.') else {
            return _config_err!("could not find config namespace for key \"{key}\"");
        };

        if prefix == "datafusion" {
            return ConfigField::set(self, key, value);
        }

        let Some(e) = self.extensions.0.get_mut(prefix) else {
            return _config_err!("Could not find config namespace \"{prefix}\"");
        };
        e.0.set(key, value)
    }

    /// Create new [`ConfigOptions`], taking values from environment variables
    /// where possible.
    ///
    /// For example, to configure `datafusion.execution.batch_size`
    /// ([`ExecutionOptions::batch_size`]) you would set the
    /// `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable.
    ///
    /// The name of the environment variable is the option's key, transformed to
    /// uppercase and with periods replaced with underscores.
    ///
    /// Values are parsed according to the [same rules used in casts from
    /// Utf8](https://docs.rs/arrow/latest/arrow/compute/kernels/cast/fn.cast.html).
    ///
    /// If the value in the environment variable cannot be cast to the type of
    /// the configuration option, an error is returned. Environment variables
    /// are read when this method is called, and are not re-read later.
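    ///
    /// A minimal sketch, assuming the variable is set before the process
    /// starts:
    ///
    /// ```no_run
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// // e.g. with DATAFUSION_EXECUTION_BATCH_SIZE=1024 in the environment
    /// let config = ConfigOptions::from_env().unwrap();
    /// ```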
    pub fn from_env() -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        // Extract the names of all fields and then look up the corresponding
        // environment variables. This isn't hugely efficient but avoids
        // ambiguity between `a.b` and `a_b` which would both correspond
        // to an environment variable of `A_B`

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            let env = key.to_uppercase().replace('.', "_");
            if let Some(var) = std::env::var_os(env) {
                let value = var.to_string_lossy();
                log::info!("Set {key} to {value} from the environment variable");
                ret.set(&key, value.as_ref())?;
            }
        }

        Ok(ret)
    }

    /// Create a new [`ConfigOptions`] struct, taking values from a string hash map.
    ///
    /// Only the built-in configurations will be extracted from the hash map;
    /// other key-value pairs will be ignored.
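    ///
    /// For example (a minimal sketch):
    ///
    /// ```
    /// use std::collections::HashMap;
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let settings = HashMap::from([(
    ///     "datafusion.execution.batch_size".to_string(),
    ///     "1024".to_string(),
    /// )]);
    /// let config = ConfigOptions::from_string_hash_map(&settings).unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```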
    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            if let Some(var) = settings.get(&key) {
                ret.set(&key, var)?;
            }
        }

        Ok(ret)
    }

    /// Returns the [`ConfigEntry`] values stored within this [`ConfigOptions`]
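    ///
    /// For example, looking up a single entry by key (a minimal sketch,
    /// relying on the defaults defined in this file):
    ///
    /// ```
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let config = ConfigOptions::new();
    /// let entry = config
    ///     .entries()
    ///     .into_iter()
    ///     .find(|e| e.key == "datafusion.catalog.default_schema")
    ///     .unwrap();
    /// assert_eq!(entry.value.as_deref(), Some("public"));
    /// ```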
    pub fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.visit(&mut v, "datafusion", "");

        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
        v.0
    }

    /// Generate documentation that can be included in the user guide
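    ///
    /// A minimal sketch of the output shape:
    ///
    /// ```
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let docs = ConfigOptions::generate_config_markdown();
    /// assert!(docs.starts_with("| key | default | description |"));
    /// ```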
    pub fn generate_config_markdown() -> String {
        use std::fmt::Write as _;

        let mut s = Self::default();

        // Normalize for display
        s.execution.target_partitions = 0;
        s.execution.planning_concurrency = 0;

        let mut docs = "| key | default | description |\n".to_string();
        docs += "|-----|---------|-------------|\n";
        let mut entries = s.entries();
        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));

        // Iterate the sorted entries (calling `s.entries()` again here would
        // discard the sort applied above)
        for entry in entries {
            let _ = writeln!(
                &mut docs,
                "| {} | {} | {} |",
                entry.key,
                entry.value.as_deref().unwrap_or("NULL"),
                entry.description
            );
        }
        docs
    }
}
1179
1180/// [`ConfigExtension`] provides a mechanism to store third-party configuration
1181/// within DataFusion [`ConfigOptions`]
1182///
1183/// This mechanism can be used to pass configuration to user defined functions
1184/// or optimizer passes
1185///
1186/// # Example
1187/// ```
1188/// use datafusion_common::{
1189///     config::ConfigExtension, extensions_options,
1190///     config::ConfigOptions,
1191/// };
1192///  // Define a new configuration struct using the `extensions_options` macro
1193///  extensions_options! {
1194///     /// My own config options.
1195///     pub struct MyConfig {
1196///         /// Should "foo" be replaced by "bar"?
1197///         pub foo_to_bar: bool, default = true
1198///
1199///         /// How many "baz" should be created?
1200///         pub baz_count: usize, default = 1337
1201///     }
1202///  }
1203///
1204///  impl ConfigExtension for MyConfig {
1205///     const PREFIX: &'static str = "my_config";
1206///  }
1207///
1208///  // set up config struct and register extension
1209///  let mut config = ConfigOptions::default();
1210///  config.extensions.insert(MyConfig::default());
1211///
1212///  // overwrite config default
1213///  config.set("my_config.baz_count", "42").unwrap();
1214///
1215///  // check config state
1216///  let my_config = config.extensions.get::<MyConfig>().unwrap();
///  assert!(my_config.foo_to_bar);
///  assert_eq!(my_config.baz_count, 42);
1219/// ```
1220///
1221/// # Note:
1222/// Unfortunately associated constants are not currently object-safe, and so this
1223/// extends the object-safe [`ExtensionOptions`]
1224pub trait ConfigExtension: ExtensionOptions {
1225    /// Configuration namespace prefix to use
1226    ///
1227    /// All values under this will be prefixed with `$PREFIX + "."`
1228    const PREFIX: &'static str;
1229}
1230
1231/// An object-safe API for storing arbitrary configuration.
1232///
1233/// See [`ConfigExtension`] for user defined configuration
1234pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
1235    /// Return `self` as [`Any`]
1236    ///
1237    /// This is needed until trait upcasting is stabilized
1238    fn as_any(&self) -> &dyn Any;
1239
1240    /// Return `self` as [`Any`]
1241    ///
1242    /// This is needed until trait upcasting is stabilized
1243    fn as_any_mut(&mut self) -> &mut dyn Any;
1244
1245    /// Return a deep clone of this [`ExtensionOptions`]
1246    ///
1247    /// It is important this does not share mutable state to avoid consistency issues
1248    /// with configuration changing whilst queries are executing
1249    fn cloned(&self) -> Box<dyn ExtensionOptions>;
1250
1251    /// Set the given `key`, `value` pair
1252    fn set(&mut self, key: &str, value: &str) -> Result<()>;
1253
1254    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
1255    fn entries(&self) -> Vec<ConfigEntry>;
1256}
1257
1258/// A type-safe container for [`ConfigExtension`]
1259#[derive(Debug, Default, Clone)]
1260pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1261
1262impl Extensions {
1263    /// Create a new, empty [`Extensions`]
1264    pub fn new() -> Self {
1265        Self(BTreeMap::new())
1266    }
1267
1268    /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
1269    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1270        assert_ne!(T::PREFIX, "datafusion");
1271        let e = ExtensionBox(Box::new(extension));
1272        self.0.insert(T::PREFIX, e);
1273    }
1274
1275    /// Retrieves the extension of the given type if any
1276    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1277        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1278    }
1279
1280    /// Retrieves the extension of the given type if any
1281    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1282        let e = self.0.get_mut(T::PREFIX)?;
1283        e.0.as_any_mut().downcast_mut()
1284    }
1285}
1286
1287#[derive(Debug)]
1288struct ExtensionBox(Box<dyn ExtensionOptions>);
1289
1290impl Clone for ExtensionBox {
1291    fn clone(&self) -> Self {
1292        Self(self.0.cloned())
1293    }
1294}
1295
/// A trait implemented by the types generated by `config_namespace`, and by
/// individual field types, that provides the ability to walk and mutate the
/// configuration tree
1298pub trait ConfigField {
1299    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
1300
1301    fn set(&mut self, key: &str, value: &str) -> Result<()>;
1302}
1303
1304impl<F: ConfigField + Default> ConfigField for Option<F> {
1305    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1306        match self {
1307            Some(s) => s.visit(v, key, description),
1308            None => v.none(key, description),
1309        }
1310    }
1311
1312    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1313        self.get_or_insert_with(Default::default).set(key, value)
1314    }
1315}
1316
1317/// Default transformation to parse a [`ConfigField`] for a string.
1318///
1319/// This uses [`FromStr`] to parse the data.
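///
/// # Example
///
/// A minimal sketch: parsing a `usize` from its string form.
///
/// ```
/// use datafusion_common::config::default_config_transform;
///
/// let parsed: usize = default_config_transform("42").unwrap();
/// assert_eq!(parsed, 42);
/// assert!(default_config_transform::<usize>("not a number").is_err());
/// ```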
1320pub fn default_config_transform<T>(input: &str) -> Result<T>
1321where
1322    T: FromStr,
1323    <T as FromStr>::Err: Sync + Send + Error + 'static,
1324{
1325    input.parse().map_err(|e| {
1326        DataFusionError::Context(
1327            format!(
1328                "Error parsing '{}' as {}",
1329                input,
1330                std::any::type_name::<T>()
1331            ),
1332            Box::new(DataFusionError::External(Box::new(e))),
1333        )
1334    })
1335}
1336
1337/// Macro that generates [`ConfigField`] for a given type.
1338///
1339/// # Usage
1340/// This always requires [`Display`] to be implemented for the given type.
1341///
1342/// There are two ways to invoke this macro. The first one uses
1343/// [`default_config_transform`]/[`FromStr`] to parse the data:
1344///
1345/// ```ignore
/// config_field!(MyType);
1347/// ```
1348///
1349/// Note that the parsing error MUST implement [`std::error::Error`]!
1350///
1351/// Or you can specify how you want to parse an [`str`] into the type:
1352///
1353/// ```ignore
1354/// fn parse_it(s: &str) -> Result<MyType> {
1355///     ...
1356/// }
1357///
/// config_field!(
1359///     MyType,
1360///     value => parse_it(value)
1361/// );
1362/// ```
1363#[macro_export]
1364macro_rules! config_field {
1365    ($t:ty) => {
1366        config_field!($t, value => $crate::config::default_config_transform(value)?);
1367    };
1368
1369    ($t:ty, $arg:ident => $transform:expr) => {
1370        impl $crate::config::ConfigField for $t {
1371            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1372                v.some(key, self, description)
1373            }
1374
1375            fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
1376                *self = $transform;
1377                Ok(())
1378            }
1379        }
1380    };
1381}
1382
1383config_field!(String);
1384config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
1385config_field!(usize);
1386config_field!(f64);
1387config_field!(u64);
1388
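/// `u8` values can be set either from a numeric string (e.g. `"44"`) or from
/// a single ASCII character (e.g. `","`, which is stored as its byte value).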
1389impl ConfigField for u8 {
1390    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1391        v.some(key, self, description)
1392    }
1393
1394    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1395        if value.is_empty() {
1396            return Err(DataFusionError::Configuration(format!(
1397                "Input string for {key} key is empty"
1398            )));
1399        }
1400        // Check if the string is a valid number
1401        if let Ok(num) = value.parse::<u8>() {
1402            // TODO: Let's decide how we treat the numerical strings.
1403            *self = num;
1404        } else {
1405            let bytes = value.as_bytes();
1406            // Check if the first character is ASCII (single byte)
1407            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1408                return Err(DataFusionError::Configuration(format!(
1409                    "Error parsing {value} as u8. Non-ASCII string provided"
1410                )));
1411            }
1412            *self = bytes[0];
1413        }
1414        Ok(())
1415    }
1416}
1417
1418impl ConfigField for CompressionTypeVariant {
1419    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1420        v.some(key, self, description)
1421    }
1422
1423    fn set(&mut self, _: &str, value: &str) -> Result<()> {
1424        *self = CompressionTypeVariant::from_str(value)?;
1425        Ok(())
1426    }
1427}
1428
/// A visitor trait used to recursively walk the configuration tree
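///
/// A minimal sketch of a visitor that collects every config key, mirroring
/// the internal `Visitor` structs used throughout this file:
///
/// ```ignore
/// struct KeyCollector(Vec<String>);
///
/// impl Visit for KeyCollector {
///     fn some<V: std::fmt::Display>(&mut self, key: &str, _: V, _: &'static str) {
///         self.0.push(key.to_string());
///     }
///     fn none(&mut self, key: &str, _: &'static str) {
///         self.0.push(key.to_string());
///     }
/// }
/// ```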
1430pub trait Visit {
1431    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
1432
1433    fn none(&mut self, key: &str, description: &'static str);
1434}
1435
/// Convenience macro to create [`ExtensionOptions`].
1437///
1438/// The created structure implements the following traits:
1439///
1440/// - [`Clone`]
1441/// - [`Debug`]
1442/// - [`Default`]
1443/// - [`ExtensionOptions`]
1444///
1445/// # Usage
1446/// The syntax is:
1447///
1448/// ```text
1449/// extensions_options! {
1450///      /// Struct docs (optional).
1451///     [<vis>] struct <StructName> {
1452///         /// Field docs (optional)
1453///         [<vis>] <field_name>: <field_type>, default = <default_value>
1454///
1455///         ... more fields
1456///     }
1457/// }
1458/// ```
1459///
1460/// The placeholders are:
1461/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
1462/// - `<StructName>`: Struct name like `MyStruct`.
1463/// - `<field_name>`: Field name like `my_field`.
1464/// - `<field_type>`: Field type like `u8`.
1465/// - `<default_value>`: Default value matching the field type like `42`.
1466///
1467/// # Example
1468/// See also a full example on the [`ConfigExtension`] documentation
1469///
1470/// ```
1471/// use datafusion_common::extensions_options;
1472///
1473/// extensions_options! {
1474///     /// My own config options.
1475///     pub struct MyConfig {
1476///         /// Should "foo" be replaced by "bar"?
1477///         pub foo_to_bar: bool, default = true
1478///
1479///         /// How many "baz" should be created?
1480///         pub baz_count: usize, default = 1337
1481///     }
1482/// }
1483/// ```
///
/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
1488#[macro_export]
1489macro_rules! extensions_options {
1490    (
1491     $(#[doc = $struct_d:tt])*
1492     $vis:vis struct $struct_name:ident {
1493        $(
1494        $(#[doc = $d:tt])*
1495        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
1496        )*$(,)*
1497    }
1498    ) => {
1499        $(#[doc = $struct_d])*
1500        #[derive(Debug, Clone)]
1501        #[non_exhaustive]
1502        $vis struct $struct_name{
1503            $(
1504            $(#[doc = $d])*
1505            $field_vis $field_name : $field_type,
1506            )*
1507        }
1508
1509        impl Default for $struct_name {
1510            fn default() -> Self {
1511                Self {
1512                    $($field_name: $default),*
1513                }
1514            }
1515        }
1516
1517        impl $crate::config::ExtensionOptions for $struct_name {
1518            fn as_any(&self) -> &dyn ::std::any::Any {
1519                self
1520            }
1521
1522            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
1523                self
1524            }
1525
1526            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
1527                Box::new(self.clone())
1528            }
1529
1530            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1531                $crate::config::ConfigField::set(self, key, value)
1532            }
1533
1534            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
1535                struct Visitor(Vec<$crate::config::ConfigEntry>);
1536
1537                impl $crate::config::Visit for Visitor {
1538                    fn some<V: std::fmt::Display>(
1539                        &mut self,
1540                        key: &str,
1541                        value: V,
1542                        description: &'static str,
1543                    ) {
1544                        self.0.push($crate::config::ConfigEntry {
1545                            key: key.to_string(),
1546                            value: Some(value.to_string()),
1547                            description,
1548                        })
1549                    }
1550
1551                    fn none(&mut self, key: &str, description: &'static str) {
1552                        self.0.push($crate::config::ConfigEntry {
1553                            key: key.to_string(),
1554                            value: None,
1555                            description,
1556                        })
1557                    }
1558                }
1559
1560                let mut v = Visitor(vec![]);
1561                // The prefix is not used for extensions.
1562                // The description is generated in ConfigField::visit.
1563                // We can just pass empty strings here.
1564                $crate::config::ConfigField::visit(self, &mut v, "", "");
1565                v.0
1566            }
1567        }
1568
1569        impl $crate::config::ConfigField for $struct_name {
1570            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1571                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1572                match key {
1573                    $(
1574                        stringify!($field_name) => {
                            // Allow deprecated fields to be set without warnings
1577                            {
1578                                #[allow(deprecated)]
1579                                self.$field_name.set(rem, value.as_ref())
1580                            }
1581                        },
1582                    )*
1583                    _ => return $crate::error::_config_err!(
1584                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1585                    )
1586                }
1587            }
1588
1589            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1590                $(
1591                    let key = stringify!($field_name).to_string();
1592                    let desc = concat!($($d),*).trim();
1593                    #[allow(deprecated)]
1594                    self.$field_name.visit(v, key.as_str(), desc);
1595                )*
1596            }
1597        }
1598    }
1599}
1600
/// These file types have special built-in behavior for configuration.
/// Use `TableOptions::extensions` for configuring other file types.
1603#[derive(Debug, Clone)]
1604pub enum ConfigFileType {
1605    CSV,
1606    #[cfg(feature = "parquet")]
1607    PARQUET,
1608    JSON,
1609}
1610
/// Configuration options for handling the table formats supported by DataFusion.
/// This struct holds format-specific options for CSV, Parquet, and JSON, allowing
/// parsing and writing behavior to be configured per format. It also supports
/// extending functionality through custom extensions.
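///
/// # Example
///
/// A minimal sketch: selecting the CSV format and setting a format option
/// (keys use the `format.` prefix once a format is selected):
///
/// ```
/// use datafusion_common::config::{ConfigFileType, TableOptions};
///
/// let mut opts = TableOptions::new();
/// opts.set_config_format(ConfigFileType::CSV);
/// opts.set("format.delimiter", ";").unwrap();
/// assert_eq!(opts.csv.delimiter, b';');
/// ```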
1614#[derive(Debug, Clone, Default)]
1615pub struct TableOptions {
1616    /// Configuration options for CSV file handling. This includes settings like the delimiter,
1617    /// quote character, and whether the first row is considered as headers.
1618    pub csv: CsvOptions,
1619
1620    /// Configuration options for Parquet file handling. This includes settings for compression,
1621    /// encoding, and other Parquet-specific file characteristics.
1622    pub parquet: TableParquetOptions,
1623
1624    /// Configuration options for JSON file handling.
1625    pub json: JsonOptions,
1626
1627    /// The current file format that the table operations should assume. This option allows
1628    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1629    pub current_format: Option<ConfigFileType>,
1630
1631    /// Optional extensions that can be used to extend or customize the behavior of the table
1632    /// options. Extensions can be registered using `Extensions::insert` and might include
1633    /// custom file handling logic, additional configuration parameters, or other enhancements.
1634    pub extensions: Extensions,
1635}
1636
1637impl ConfigField for TableOptions {
1638    /// Visits configuration settings for the current file format, or all formats if none is selected.
1639    ///
1640    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1641    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1642    /// it visits all available format settings.
1643    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1644        if let Some(file_type) = &self.current_format {
1645            match file_type {
1646                #[cfg(feature = "parquet")]
1647                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1648                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1649                ConfigFileType::JSON => self.json.visit(v, "format", ""),
1650            }
1651        } else {
1652            self.csv.visit(v, "csv", "");
1653            self.parquet.visit(v, "parquet", "");
1654            self.json.visit(v, "json", "");
1655        }
1656    }
1657
1658    /// Sets a configuration value for a specific key within `TableOptions`.
1659    ///
1660    /// This method delegates setting configuration values to the specific file format configurations,
1661    /// based on the current format selected. If no format is selected, it returns an error.
1662    ///
1663    /// # Parameters
1664    ///
1665    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1666    ///   for CSV format.
1667    /// * `value`: The value to set for the specified configuration key.
1668    ///
1669    /// # Returns
1670    ///
1671    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1672    /// or if setting the configuration value fails for the specific format.
1673    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1674        // Extensions are handled in the public `ConfigOptions::set`
1675        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1676        match key {
1677            "format" => {
1678                let Some(format) = &self.current_format else {
1679                    return _config_err!("Specify a format for TableOptions");
1680                };
1681                match format {
1682                    #[cfg(feature = "parquet")]
1683                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
1684                    ConfigFileType::CSV => self.csv.set(rem, value),
1685                    ConfigFileType::JSON => self.json.set(rem, value),
1686                }
1687            }
1688            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1689        }
1690    }
1691}
1692
1693impl TableOptions {
1694    /// Constructs a new instance of `TableOptions` with default settings.
1695    ///
1696    /// # Returns
1697    ///
1698    /// A new `TableOptions` instance with default configuration values.
1699    pub fn new() -> Self {
1700        Self::default()
1701    }
1702
1703    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1704    ///
1705    /// # Parameters
1706    ///
1707    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1708    ///
1709    /// # Returns
1710    ///
1711    /// A new `TableOptions` instance with settings applied from the session config.
1712    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1713        let initial = TableOptions::default();
1714        initial.combine_with_session_config(config)
1715    }
1716
1717    /// Updates the current `TableOptions` with settings from a given session config.
1718    ///
1719    /// # Parameters
1720    ///
1721    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1722    ///
1723    /// # Returns
1724    ///
1725    /// A new `TableOptions` instance with updated settings from the session config.
1726    #[must_use = "this method returns a new instance"]
1727    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1728        let mut clone = self.clone();
1729        clone.parquet.global = config.execution.parquet.clone();
1730        clone
1731    }
1732
1733    /// Sets the file format for the table.
1734    ///
1735    /// # Parameters
1736    ///
1737    /// * `format`: The file format to use (e.g., CSV, Parquet).
1738    pub fn set_config_format(&mut self, format: ConfigFileType) {
1739        self.current_format = Some(format);
1740    }
1741
1742    /// Sets the extensions for this `TableOptions` instance.
1743    ///
1744    /// # Parameters
1745    ///
1746    /// * `extensions`: The `Extensions` instance to set.
1747    ///
1748    /// # Returns
1749    ///
1750    /// A new `TableOptions` instance with the specified extensions applied.
1751    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1752        self.extensions = extensions;
1753        self
1754    }
1755
1756    /// Sets a specific configuration option.
1757    ///
1758    /// # Parameters
1759    ///
1760    /// * `key`: The configuration key (e.g., "format.delimiter").
1761    /// * `value`: The value to set for the specified key.
1762    ///
1763    /// # Returns
1764    ///
1765    /// A result indicating success or failure in setting the configuration option.
1766    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1767        let Some((prefix, _)) = key.split_once('.') else {
1768            return _config_err!("could not find config namespace for key \"{key}\"");
1769        };
1770
1771        if prefix == "format" {
1772            return ConfigField::set(self, key, value);
1773        }
1774
1775        if prefix == "execution" {
1776            return Ok(());
1777        }
1778
1779        let Some(e) = self.extensions.0.get_mut(prefix) else {
1780            return _config_err!("Could not find config namespace \"{prefix}\"");
1781        };
1782        e.0.set(key, value)
1783    }
1784
1785    /// Initializes a new `TableOptions` from a hash map of string settings.
1786    ///
1787    /// # Parameters
1788    ///
1789    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1790    ///
1791    /// # Returns
1792    ///
1793    /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1794    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1795        let mut ret = Self::default();
1796        for (k, v) in settings {
1797            ret.set(k, v)?;
1798        }
1799
1800        Ok(ret)
1801    }
1802
1803    /// Modifies the current `TableOptions` instance with settings from a hash map.
1804    ///
1805    /// # Parameters
1806    ///
1807    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1808    ///
1809    /// # Returns
1810    ///
1811    /// A result indicating success or failure in applying the settings.
1812    pub fn alter_with_string_hash_map(
1813        &mut self,
1814        settings: &HashMap<String, String>,
1815    ) -> Result<()> {
1816        for (k, v) in settings {
1817            self.set(k, v)?;
1818        }
1819        Ok(())
1820    }
1821
1822    /// Retrieves all configuration entries from this `TableOptions`.
1823    ///
1824    /// # Returns
1825    ///
1826    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1827    pub fn entries(&self) -> Vec<ConfigEntry> {
1828        struct Visitor(Vec<ConfigEntry>);
1829
1830        impl Visit for Visitor {
1831            fn some<V: Display>(
1832                &mut self,
1833                key: &str,
1834                value: V,
1835                description: &'static str,
1836            ) {
1837                self.0.push(ConfigEntry {
1838                    key: key.to_string(),
1839                    value: Some(value.to_string()),
1840                    description,
1841                })
1842            }
1843
1844            fn none(&mut self, key: &str, description: &'static str) {
1845                self.0.push(ConfigEntry {
1846                    key: key.to_string(),
1847                    value: None,
1848                    description,
1849                })
1850            }
1851        }
1852
1853        let mut v = Visitor(vec![]);
1854        self.visit(&mut v, "format", "");
1855
1856        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1857        v.0
1858    }
1859}
1860
1861/// Options that control how Parquet files are read, including global options
1862/// that apply to all columns and optional column-specific overrides
1863///
1864/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
1865/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
1866/// (e.g. sorting_columns).
1867#[derive(Clone, Default, Debug, PartialEq)]
1868pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
1870    pub global: ParquetOptions,
    /// Column-specific options, applied with keys of the form `parquet.<option>::<column>`.
1872    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
1873    /// Additional file-level metadata to include. Inserted into the key_value_metadata
1874    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
1875    ///
1876    /// Multiple entries are permitted
1877    /// ```sql
1878    /// OPTIONS (
1879    ///    'format.metadata::key1' '',
1880    ///    'format.metadata::key2' 'value',
1881    ///    'format.metadata::key3' 'value has spaces',
1882    ///    'format.metadata::key4' 'value has special chars :: :',
1883    ///    'format.metadata::key_dupe' 'original will be overwritten',
1884    ///    'format.metadata::key_dupe' 'final'
1885    /// )
1886    /// ```
1887    pub key_value_metadata: HashMap<String, Option<String>>,
1888    /// Options for configuring Parquet modular encryption
1889    ///
1890    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
1891    /// See ConfigFileEncryptionProperties and ConfigFileDecryptionProperties in datafusion/common/src/config.rs
1892    /// These can be set via 'format.crypto', for example:
1893    /// ```sql
1894    /// OPTIONS (
1895    ///    'format.crypto.file_encryption.encrypt_footer' 'true',
    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435',  -- b"0123456789012345"
1897    ///    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
1898    ///    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
1899    ///     -- Same for decryption
1900    ///    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
1901    ///    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
1902    ///    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
1903    /// )
1904    /// ```
1905    /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
    /// Note that keys must be provided in hex format since they are binary strings.
1907    pub crypto: ParquetEncryptionOptions,
1908}
1909
1910impl TableParquetOptions {
1911    /// Return new default TableParquetOptions
1912    pub fn new() -> Self {
1913        Self::default()
1914    }
1915
    /// Set whether the arrow schema should be encoded into the parquet
    /// file's kv_metadata during writing.
1918    ///
1919    /// Default is to encode the arrow schema in the file kv_metadata.
1920    pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1921        Self {
1922            global: ParquetOptions {
1923                skip_arrow_metadata: skip,
1924                ..self.global
1925            },
1926            ..self
1927        }
1928    }
1929
1930    /// Retrieves all configuration entries from this `TableParquetOptions`.
1931    ///
1932    /// # Returns
1933    ///
    /// A vector of `ConfigEntry` instances, representing all the configuration
    /// options within this `TableParquetOptions`.
    pub fn entries(&self) -> Vec<ConfigEntry> {
1936        struct Visitor(Vec<ConfigEntry>);
1937
1938        impl Visit for Visitor {
1939            fn some<V: Display>(
1940                &mut self,
1941                key: &str,
1942                value: V,
1943                description: &'static str,
1944            ) {
1945                self.0.push(ConfigEntry {
1946                    key: key[1..].to_string(),
1947                    value: Some(value.to_string()),
1948                    description,
1949                })
1950            }
1951
1952            fn none(&mut self, key: &str, description: &'static str) {
1953                self.0.push(ConfigEntry {
1954                    key: key[1..].to_string(),
1955                    value: None,
1956                    description,
1957                })
1958            }
1959        }
1960
1961        let mut v = Visitor(vec![]);
1962        self.visit(&mut v, "", "");
1963
1964        v.0
1965    }
1966}
1967
1968impl ConfigField for TableParquetOptions {
1969    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
1970        self.global.visit(v, key_prefix, description);
1971        self.column_specific_options
1972            .visit(v, key_prefix, description);
1973        self.crypto
1974            .visit(v, &format!("{key_prefix}.crypto"), description);
1975    }
1976
1977    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1978        // Determine if the key is a global, metadata, or column-specific setting
1979        if key.starts_with("metadata::") {
1980            let k = match key.split("::").collect::<Vec<_>>()[..] {
1981                [_meta] | [_meta, ""] => {
1982                    return _config_err!(
1983                        "Invalid metadata key provided, missing key in metadata::<key>"
1984                    )
1985                }
1986                [_meta, k] => k.into(),
1987                _ => {
1988                    return _config_err!(
1989                        "Invalid metadata key provided, found too many '::' in \"{key}\""
1990                    )
1991                }
1992            };
1993            self.key_value_metadata.insert(k, Some(value.into()));
1994            Ok(())
1995        } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
1996            self.crypto.set(crypto_feature, value)
1997        } else if key.contains("::") {
1998            self.column_specific_options.set(key, value)
1999        } else {
2000            self.global.set(key, value)
2001        }
2002    }
2003}
2004
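/// Generates a [`ConfigField`] implementation for a struct and for
/// `HashMap<String, Struct>`, where per-entry values are addressed with
/// `<option>::<key>` syntax (e.g. `compression::col1`).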
2005macro_rules! config_namespace_with_hashmap {
2006    (
2007     $(#[doc = $struct_d:tt])*
2008     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
2009     $vis:vis struct $struct_name:ident {
2010        $(
2011        $(#[doc = $d:tt])*
2012        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
2013        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
2014        )*$(,)*
2015    }
2016    ) => {
2017
2018        $(#[doc = $struct_d])*
2019        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
2020        #[derive(Debug, Clone, PartialEq)]
2021        $vis struct $struct_name{
2022            $(
2023            $(#[doc = $d])*
2024            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
2025            $field_vis $field_name : $field_type,
2026            )*
2027        }
2028
2029        impl ConfigField for $struct_name {
2030            fn set(&mut self, key: &str, value: &str) -> Result<()> {
2031                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2032                match key {
2033                    $(
2034                       stringify!($field_name) => {
2035                           // Handle deprecated fields
2036                           #[allow(deprecated)] // Allow deprecated fields
2037                           $(let value = $transform(value);)?
2038                           self.$field_name.set(rem, value.as_ref())
2039                       },
2040                    )*
2041                    _ => _config_err!(
2042                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
2043                    )
2044                }
2045            }
2046
2047            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2048                $(
2049                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
2050                let desc = concat!($($d),*).trim();
2051                // Handle deprecated fields
2052                #[allow(deprecated)]
2053                self.$field_name.visit(v, key.as_str(), desc);
2054                )*
2055            }
2056        }
2057
2058        impl Default for $struct_name {
2059            fn default() -> Self {
2060                #[allow(deprecated)]
2061                Self {
2062                    $($field_name: $default),*
2063                }
2064            }
2065        }
2066
2067        impl ConfigField for HashMap<String,$struct_name> {
2068            fn set(&mut self, key: &str, value: &str) -> Result<()> {
2069                let parts: Vec<&str> = key.splitn(2, "::").collect();
2070                match parts.as_slice() {
2071                    [inner_key, hashmap_key] => {
2072                        // Get or create the struct for the specified key
2073                        let inner_value = self
2074                            .entry((*hashmap_key).to_owned())
2075                            .or_insert_with($struct_name::default);
2076
2077                        inner_value.set(inner_key, value)
2078                    }
2079                    _ => _config_err!("Unrecognized key '{key}'."),
2080                }
2081            }
2082
2083            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2084                for (column_name, col_options) in self {
2085                    $(
2086                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
2087                    let desc = concat!($($d),*).trim();
2088                    #[allow(deprecated)]
2089                    col_options.$field_name.visit(v, key.as_str(), desc);
2090                    )*
2091                }
2092            }
2093        }
2094    }
2095}
2096
2097config_namespace_with_hashmap! {
2098    /// Options controlling parquet format for individual columns.
2099    ///
2100    /// See [`ParquetOptions`] for more details
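    ///
    /// A minimal sketch (assuming the `ConfigField` trait is in scope):
    /// per-column options are set through [`TableParquetOptions`] using
    /// `<option>::<column>` keys:
    ///
    /// ```ignore
    /// let mut opts = TableParquetOptions::default();
    /// opts.set("compression::col1", "zstd(5)")?;
    /// ```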
2101    pub struct ParquetColumnOptions {
2102        /// Sets if bloom filter is enabled for the column path.
2103        pub bloom_filter_enabled: Option<bool>, default = None
2104
2105        /// Sets encoding for the column path.
2106        /// Valid values are: plain, plain_dictionary, rle,
2107        /// bit_packed, delta_binary_packed, delta_length_byte_array,
2108        /// delta_byte_array, rle_dictionary, and byte_stream_split.
2109        /// These values are not case-sensitive. If NULL, uses
2110        /// default parquet options
2111        pub encoding: Option<String>, default = None
2112
2113        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
2114        /// default parquet options
2115        pub dictionary_enabled: Option<bool>, default = None
2116
2117        /// Sets default parquet compression codec for the column path.
2118        /// Valid values are: uncompressed, snappy, gzip(level),
2119        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
2120        /// These values are not case-sensitive. If NULL, uses
2121        /// default parquet options
2122        pub compression: Option<String>, transform = str::to_lowercase, default = None
2123
2124        /// Sets if statistics are enabled for the column
2125        /// Valid values are: "none", "chunk", and "page"
2126        /// These values are not case sensitive. If NULL, uses
2127        /// default parquet options
2128        pub statistics_enabled: Option<String>, default = None
2129
2130        /// Sets bloom filter false positive probability for the column path. If NULL, uses
2131        /// default parquet options
2132        pub bloom_filter_fpp: Option<f64>, default = None
2133
2134        /// Sets bloom filter number of distinct values. If NULL, uses
2135        /// default parquet options
2136        pub bloom_filter_ndv: Option<u64>, default = None
2137    }
2138}
2139
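/// Configuration for Parquet modular encryption when writing files.
///
/// A minimal sketch of configuring a footer key (assuming the `ConfigField`
/// trait is in scope; values must be hex-encoded, e.g. via `hex::encode`,
/// before calling `set`):
///
/// ```ignore
/// let mut props = ConfigFileEncryptionProperties::default();
/// props.set("encrypt_footer", "true")?;
/// props.set("footer_key_as_hex", &hex::encode(b"0123456789012345"))?;
/// ```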
2140#[derive(Clone, Debug, PartialEq)]
2141pub struct ConfigFileEncryptionProperties {
2142    /// Should the parquet footer be encrypted
2143    /// default is true
2144    pub encrypt_footer: bool,
2145    /// Key to use for the parquet footer encoded in hex format
2146    pub footer_key_as_hex: String,
2147    /// Metadata information for footer key
2148    pub footer_key_metadata_as_hex: String,
2149    /// HashMap of column names --> (key in hex format, metadata)
2150    pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
2151    /// AAD prefix string uniquely identifies the file and prevents file swapping
2152    pub aad_prefix_as_hex: String,
2153    /// If true, store the AAD prefix in the file
2154    /// default is false
2155    pub store_aad_prefix: bool,
2156}
2157
2158// Setup to match EncryptionPropertiesBuilder::new()
2159impl Default for ConfigFileEncryptionProperties {
2160    fn default() -> Self {
2161        ConfigFileEncryptionProperties {
2162            encrypt_footer: true,
2163            footer_key_as_hex: String::new(),
2164            footer_key_metadata_as_hex: String::new(),
2165            column_encryption_properties: Default::default(),
2166            aad_prefix_as_hex: String::new(),
2167            store_aad_prefix: false,
2168        }
2169    }
2170}
2171
2172config_namespace_with_hashmap! {
2173    pub struct ColumnEncryptionProperties {
2174        /// Per column encryption key
2175        pub column_key_as_hex: String, default = "".to_string()
2176        /// Per column encryption key metadata
2177        pub column_metadata_as_hex: Option<String>, default = None
2178    }
2179}
2180
2181impl ConfigField for ConfigFileEncryptionProperties {
2182    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2183        let key = format!("{key_prefix}.encrypt_footer");
2184        let desc = "Encrypt the footer";
2185        self.encrypt_footer.visit(v, key.as_str(), desc);
2186
2187        let key = format!("{key_prefix}.footer_key_as_hex");
2188        let desc = "Key to use for the parquet footer";
2189        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2190
2191        let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2192        let desc = "Metadata to use for the parquet footer";
2193        self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2194
2195        self.column_encryption_properties.visit(v, key_prefix, desc);
2196
2197        let key = format!("{key_prefix}.aad_prefix_as_hex");
2198        let desc = "AAD prefix to use";
2199        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2200
2201        let key = format!("{key_prefix}.store_aad_prefix");
2202        let desc = "If true, store the AAD prefix";
2203        self.store_aad_prefix.visit(v, key.as_str(), desc);
2206    }
2207
2208    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2209        // Any hex encoded values must be pre-encoded using
2210        // hex::encode() before calling set.
2211
2212        if key.contains("::") {
2213            // Handle any column specific properties
2214            return self.column_encryption_properties.set(key, value);
2215        };
2216
2217        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2218        match key {
2219            "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2220            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2221            "footer_key_metadata_as_hex" => {
2222                self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2223            }
2224            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2225            "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2226            _ => _config_err!(
2227                "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2228                key
2229            ),
2230        }
2231    }
2232}
2233
2234#[cfg(feature = "parquet_encryption")]
2235impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
2236    fn from(val: ConfigFileEncryptionProperties) -> Self {
2237        let mut fep = FileEncryptionProperties::builder(
2238            hex::decode(val.footer_key_as_hex).unwrap(),
2239        )
2240        .with_plaintext_footer(!val.encrypt_footer)
2241        .with_aad_prefix_storage(val.store_aad_prefix);
2242
2243        if !val.footer_key_metadata_as_hex.is_empty() {
2244            fep = fep.with_footer_key_metadata(
2245                hex::decode(&val.footer_key_metadata_as_hex)
2246                    .expect("Invalid footer key metadata"),
2247            );
2248        }
2249
2250        for (column_name, encryption_props) in val.column_encryption_properties.iter() {
2251            let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
2252                .expect("Invalid column encryption key");
2253            let key_metadata = encryption_props
2254                .column_metadata_as_hex
2255                .as_ref()
2256                .map(|x| hex::decode(x).expect("Invalid column metadata"));
2257            match key_metadata {
2258                Some(key_metadata) => {
2259                    fep = fep.with_column_key_and_metadata(
2260                        column_name,
2261                        encryption_key,
2262                        key_metadata,
2263                    );
2264                }
2265                None => {
2266                    fep = fep.with_column_key(column_name, encryption_key);
2267                }
2268            }
2269        }
2270
2271        if !val.aad_prefix_as_hex.is_empty() {
2272            let aad_prefix: Vec<u8> =
2273                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2274            fep = fep.with_aad_prefix(aad_prefix);
2275        }
2276        fep.build().unwrap()
2277    }
2278}
2279
2280#[cfg(feature = "parquet_encryption")]
2281impl From<&FileEncryptionProperties> for ConfigFileEncryptionProperties {
2282    fn from(f: &FileEncryptionProperties) -> Self {
2283        let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
2284
2285        let mut column_encryption_properties: HashMap<
2286            String,
2287            ColumnEncryptionProperties,
2288        > = HashMap::new();
2289
2290        for (i, column_name) in column_names_vec.iter().enumerate() {
2291            let column_key_as_hex = hex::encode(&column_keys_vec[i]);
2292            let column_metadata_as_hex: Option<String> =
2293                column_metas_vec.get(i).map(hex::encode);
2294            column_encryption_properties.insert(
2295                column_name.clone(),
2296                ColumnEncryptionProperties {
2297                    column_key_as_hex,
2298                    column_metadata_as_hex,
2299                },
2300            );
2301        }
2302        let mut aad_prefix: Vec<u8> = Vec::new();
2303        if let Some(prefix) = f.aad_prefix() {
2304            aad_prefix = prefix.clone();
2305        }
2306        ConfigFileEncryptionProperties {
2307            encrypt_footer: f.encrypt_footer(),
2308            footer_key_as_hex: hex::encode(f.footer_key()),
2309            footer_key_metadata_as_hex: f
2310                .footer_key_metadata()
2311                .map(hex::encode)
2312                .unwrap_or_default(),
2313            column_encryption_properties,
2314            aad_prefix_as_hex: hex::encode(aad_prefix),
2315            store_aad_prefix: f.store_aad_prefix(),
2316        }
2317    }
2318}
2319
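/// Configuration for decrypting Parquet files that use modular encryption.
///
/// As with [`ConfigFileEncryptionProperties`], key values must be hex-encoded
/// before being passed to `set`.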
2320#[derive(Clone, Debug, PartialEq)]
2321pub struct ConfigFileDecryptionProperties {
2322    /// Binary string to use for the parquet footer encoded in hex format
2323    pub footer_key_as_hex: String,
2324    /// HashMap of column names --> key in hex format
2325    pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
2326    /// AAD prefix string uniquely identifies the file and prevents file swapping
2327    pub aad_prefix_as_hex: String,
2328    /// If true, then verify signature for files with plaintext footers.
2329    /// default = true
2330    pub footer_signature_verification: bool,
2331}
2332
2333config_namespace_with_hashmap! {
2334    pub struct ColumnDecryptionProperties {
2335        /// Per column encryption key
2336        pub column_key_as_hex: String, default = "".to_string()
2337    }
2338}
2339
2340// Setup to match DecryptionPropertiesBuilder::new()
2341impl Default for ConfigFileDecryptionProperties {
2342    fn default() -> Self {
2343        ConfigFileDecryptionProperties {
2344            footer_key_as_hex: String::new(),
2345            column_decryption_properties: Default::default(),
2346            aad_prefix_as_hex: String::new(),
2347            footer_signature_verification: true,
2348        }
2349    }
2350}
2351
2352impl ConfigField for ConfigFileDecryptionProperties {
2353    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2354        let key = format!("{key_prefix}.footer_key_as_hex");
2355        let desc = "Key to use for the parquet footer";
2356        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2357
2358        let key = format!("{key_prefix}.aad_prefix_as_hex");
2359        let desc = "AAD prefix to use";
2360        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2361
2362        let key = format!("{key_prefix}.footer_signature_verification");
2363        let desc = "If true, verify the footer signature";
2364        self.footer_signature_verification
2365            .visit(v, key.as_str(), desc);
2366
2367        self.column_decryption_properties.visit(v, key_prefix, desc);
2368    }
2369
2370    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2371        // Any hex encoded values must be pre-encoded using
2372        // hex::encode() before calling set.
2373
2374        if key.contains("::") {
2375            // Handle any column specific properties
2376            return self.column_decryption_properties.set(key, value);
2377        };
2378
2379        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2380        match key {
2381            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2382            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2383            "footer_signature_verification" => {
2384                self.footer_signature_verification.set(rem, value.as_ref())
2385            }
2386            _ => _config_err!(
                "Config value \"{}\" not found on ConfigFileDecryptionProperties",
2388                key
2389            ),
2390        }
2391    }
2392}
2393
2394#[cfg(feature = "parquet_encryption")]
2395impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
2396    fn from(val: ConfigFileDecryptionProperties) -> Self {
2397        let mut column_names: Vec<&str> = Vec::new();
2398        let mut column_keys: Vec<Vec<u8>> = Vec::new();
2399
2400        for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
2401            column_names.push(col_name.as_str());
2402            column_keys.push(
2403                hex::decode(&decryption_properties.column_key_as_hex)
2404                    .expect("Invalid column decryption key"),
2405            );
2406        }
2407
2408        let mut fep = FileDecryptionProperties::builder(
2409            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
2410        )
2411        .with_column_keys(column_names, column_keys)
2412        .unwrap();
2413
2414        if !val.footer_signature_verification {
2415            fep = fep.disable_footer_signature_verification();
2416        }
2417
2418        if !val.aad_prefix_as_hex.is_empty() {
2419            let aad_prefix =
2420                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2421            fep = fep.with_aad_prefix(aad_prefix);
2422        }
2423
2424        fep.build().unwrap()
2425    }
2426}
2427
2428#[cfg(feature = "parquet_encryption")]
2429impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties {
2430    fn from(f: &FileDecryptionProperties) -> Self {
2431        let (column_names_vec, column_keys_vec) = f.column_keys();
2432        let mut column_decryption_properties: HashMap<
2433            String,
2434            ColumnDecryptionProperties,
2435        > = HashMap::new();
2436        for (i, column_name) in column_names_vec.iter().enumerate() {
2437            let props = ColumnDecryptionProperties {
2438                column_key_as_hex: hex::encode(column_keys_vec[i].clone()),
2439            };
2440            column_decryption_properties.insert(column_name.clone(), props);
2441        }
2442
2443        let mut aad_prefix: Vec<u8> = Vec::new();
2444        if let Some(prefix) = f.aad_prefix() {
2445            aad_prefix = prefix.clone();
2446        }
2447        ConfigFileDecryptionProperties {
2448            footer_key_as_hex: hex::encode(
2449                f.footer_key(None).unwrap_or_default().as_ref(),
2450            ),
2451            column_decryption_properties,
2452            aad_prefix_as_hex: hex::encode(aad_prefix),
2453            footer_signature_verification: f.check_plaintext_footer_integrity(),
2454        }
2455    }
2456}
2457
2458/// Holds implementation-specific options for an encryption factory
2459#[derive(Clone, Debug, Default, PartialEq)]
2460pub struct EncryptionFactoryOptions {
2461    pub options: HashMap<String, String>,
2462}
2463
2464impl ConfigField for EncryptionFactoryOptions {
2465    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
2466        for (option_key, option_value) in &self.options {
2467            v.some(
2468                &format!("{key}.{option_key}"),
2469                option_value,
2470                "Encryption factory specific option",
2471            );
2472        }
2473    }
2474
2475    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2476        self.options.insert(key.to_owned(), value.to_owned());
2477        Ok(())
2478    }
2479}
2480
2481impl EncryptionFactoryOptions {
2482    /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
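    ///
    /// A minimal sketch, assuming `MyFactoryOptions` is a hypothetical type
    /// defined via the `extensions_options!` macro:
    ///
    /// ```ignore
    /// let factory_options = EncryptionFactoryOptions::default();
    /// let typed: MyFactoryOptions = factory_options.to_extension_options()?;
    /// ```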
2483    pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
2484        let mut options = T::default();
2485        for (key, value) in &self.options {
2486            options.set(key, value)?;
2487        }
2488        Ok(options)
2489    }
2490}
2491
2492config_namespace! {
2493    /// Options controlling CSV format
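    ///
    /// A small sketch of the builder-style API:
    ///
    /// ```
    /// use datafusion_common::config::CsvOptions;
    ///
    /// let options = CsvOptions::default()
    ///     .with_delimiter(b';')
    ///     .with_has_header(true);
    /// assert_eq!(options.delimiter(), b';');
    /// ```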
2494    pub struct CsvOptions {
        /// Specifies whether there is a CSV header (i.e. whether the first line
        /// contains column names). The value `None` indicates that
2497        /// the configuration should be consulted.
2498        pub has_header: Option<bool>, default = None
2499        pub delimiter: u8, default = b','
2500        pub quote: u8, default = b'"'
2501        pub terminator: Option<u8>, default = None
2502        pub escape: Option<u8>, default = None
2503        pub double_quote: Option<bool>, default = None
2504        /// Specifies whether newlines in (quoted) values are supported.
2505        ///
2506        /// Parsing newlines in quoted values may be affected by execution behaviour such as
2507        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2508        /// parsed successfully, which may reduce performance.
2509        ///
2510        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2511        pub newlines_in_values: Option<bool>, default = None
2512        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2513        pub schema_infer_max_rec: Option<usize>, default = None
2514        pub date_format: Option<String>, default = None
2515        pub datetime_format: Option<String>, default = None
2516        pub timestamp_format: Option<String>, default = None
2517        pub timestamp_tz_format: Option<String>, default = None
2518        pub time_format: Option<String>, default = None
        /// The output format for Nulls in the CSV writer.
        pub null_value: Option<String>, default = None
        /// The input regex for Nulls when loading CSVs.
        pub null_regex: Option<String>, default = None
2523        pub comment: Option<u8>, default = None
2524    }
2525}
2526
2527impl CsvOptions {
    /// Set the file compression type
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2530    pub fn with_compression(
2531        mut self,
2532        compression_type_variant: CompressionTypeVariant,
2533    ) -> Self {
2534        self.compression = compression_type_variant;
2535        self
2536    }
2537
2538    /// Set a limit in terms of records to scan to infer the schema
2539    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
2540    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
2541        self.schema_infer_max_rec = Some(max_rec);
2542        self
2543    }
2544
    /// Set true to indicate that the first line is a header.
    /// - default is `None`, in which case the session configuration is consulted
2547    pub fn with_has_header(mut self, has_header: bool) -> Self {
2548        self.has_header = Some(has_header);
2549        self
2550    }
2551
    /// Returns whether the first line is treated as a header. If the format
    /// options do not specify whether there is a header, returns `None`
    /// (indicating that the configuration should be consulted).
    pub fn has_header(&self) -> Option<bool> {
        self.has_header
    }

    /// The character separating values within a row.
    /// - defaults to ','
    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }

    /// The quote character in a row.
    /// - defaults to '"'
    pub fn with_quote(mut self, quote: u8) -> Self {
        self.quote = quote;
        self
    }

    /// The character that terminates a row.
    /// - defaults to `None` (CRLF)
    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
        self.terminator = terminator;
        self
    }

    /// The escape character in a row.
    /// - defaults to `None`
    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
        self.escape = escape;
        self
    }

    /// Set to true to indicate that quotes within quoted CSV fields are
    /// escaped by doubling them.
    /// - defaults to true
    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
        self.double_quote = Some(double_quote);
        self
    }

    /// Specifies whether newlines in (quoted) values are supported.
    ///
    /// Parsing newlines in quoted values may be affected by execution behaviour such as
    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
    /// parsed successfully, which may reduce performance.
    ///
    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
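    ///
    /// A minimal sketch (an `ignore`d doc example):
    ///
    /// ```ignore
    /// let opts = CsvOptions::default().with_newlines_in_values(true);
    /// assert_eq!(opts.newlines_in_values, Some(true));
    /// ```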
    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
        self.newlines_in_values = Some(newlines_in_values);
        self
    }

    /// Set the `CompressionTypeVariant` for the CSV file
    /// (equivalent to [`Self::with_compression`]).
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
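    ///
    /// A minimal sketch chaining the fluent setters (an `ignore`d doc example;
    /// the option values are illustrative only):
    ///
    /// ```ignore
    /// let opts = CsvOptions::default()
    ///     .with_has_header(true)
    ///     .with_delimiter(b'|')
    ///     .with_file_compression_type(CompressionTypeVariant::GZIP);
    /// assert_eq!(opts.delimiter(), b'|');
    /// ```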
    pub fn with_file_compression_type(
        mut self,
        compression: CompressionTypeVariant,
    ) -> Self {
        self.compression = compression;
        self
    }

    /// The delimiter character.
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// The quote character.
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// The terminator character.
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

    /// The escape character.
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }
}

config_namespace! {
    /// Options controlling JSON format
    pub struct JsonOptions {
        /// Compression type applied to the JSON file
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        /// Maximum number of records to scan when inferring the schema
        pub schema_infer_max_rec: Option<usize>, default = None
    }
}
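
// A minimal sketch (not part of this file) of driving `JsonOptions` through
// the `ConfigField` impl generated by `config_namespace!`; the key names come
// from the struct fields and the values are illustrative only:
//
//     let mut opts = JsonOptions::default();
//     opts.set("compression", "gzip")?;
//     opts.set("schema_infer_max_rec", "1000")?;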

/// Marker trait for output formats, requiring only [`Display`].
pub trait OutputFormatExt: Display {}

/// A supported output file format, carrying its format-specific options
/// where applicable.
#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum OutputFormat {
    CSV(CsvOptions),
    JSON(JsonOptions),
    #[cfg(feature = "parquet")]
    PARQUET(TableParquetOptions),
    AVRO,
    ARROW,
}

impl Display for OutputFormat {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let out = match self {
            OutputFormat::CSV(_) => "csv",
            OutputFormat::JSON(_) => "json",
            #[cfg(feature = "parquet")]
            OutputFormat::PARQUET(_) => "parquet",
            OutputFormat::AVRO => "avro",
            OutputFormat::ARROW => "arrow",
        };
        write!(f, "{out}")
    }
}
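
// A minimal sketch of the `Display` impl above (illustrative):
//
//     let format = OutputFormat::CSV(CsvOptions::default());
//     assert_eq!(format.to_string(), "csv");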

#[cfg(test)]
mod tests {
    use crate::config::{
        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
        Extensions, TableOptions, TableParquetOptions,
    };
    use std::any::Any;
    use std::collections::HashMap;

    #[derive(Default, Debug, Clone)]
    pub struct TestExtensionConfig {
        /// Arbitrary key/value pairs recorded by `ExtensionOptions::set`
        pub properties: HashMap<String, String>,
    }

    impl ExtensionOptions for TestExtensionConfig {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }

        fn cloned(&self) -> Box<dyn ExtensionOptions> {
            Box::new(self.clone())
        }

        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
            assert_eq!(key, "test");
            self.properties.insert(rem.to_owned(), value.to_owned());
            Ok(())
        }

        fn entries(&self) -> Vec<ConfigEntry> {
            self.properties
                .iter()
                .map(|(k, v)| ConfigEntry {
                    key: k.into(),
                    value: Some(v.into()),
                    description: "",
                })
                .collect()
        }
    }

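    // With `PREFIX = "test"`, keys such as "test.bootstrap.servers" are routed
    // to `TestExtensionConfig::set` (exercised in `alter_test_extension_config`
    // below).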
    impl ConfigExtension for TestExtensionConfig {
        const PREFIX: &'static str = "test";
    }

    #[test]
    fn create_table_config() {
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let table_config = TableOptions::new().with_extensions(extension);
        let test_config = table_config.extensions.get::<TestExtensionConfig>();
        assert!(test_config.is_some())
    }

    #[test]
    fn alter_test_extension_config() {
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let mut table_config = TableOptions::new().with_extensions(extension);
        table_config.set_config_format(ConfigFileType::CSV);
        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter, b';');
        table_config.set("test.bootstrap.servers", "asd").unwrap();
        let kafka_config = table_config
            .extensions
            .get::<TestExtensionConfig>()
            .unwrap();
        assert_eq!(
            kafka_config.properties.get("bootstrap.servers").unwrap(),
            "asd"
        );
    }

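    // `u8`-valued CSV options (delimiter, escape) are parsed from
    // single-character strings.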
    #[test]
    fn csv_u8_table_options() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::CSV);
        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter as char, ';');
        table_config.set("format.escape", "\"").unwrap();
        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
        table_config.set("format.escape", "\'").unwrap();
        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
    }

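    // Setting `enable_options_value_normalization` to its default value should
    // not log a warning, while setting a non-default value should log exactly
    // once (intent inferred from the assertions below).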
    #[test]
    fn warning_only_not_default() {
        use std::sync::atomic::AtomicUsize;
        static COUNT: AtomicUsize = AtomicUsize::new(0);
        use log::{Level, LevelFilter, Metadata, Record};
        struct SimpleLogger;
        impl log::Log for SimpleLogger {
            fn enabled(&self, metadata: &Metadata) -> bool {
                metadata.level() <= Level::Info
            }

            fn log(&self, record: &Record) {
                if self.enabled(record.metadata()) {
                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
            }
            fn flush(&self) {}
        }
        log::set_logger(&SimpleLogger).unwrap();
        log::set_max_level(LevelFilter::Info);
        let mut sql_parser_options = crate::config::SqlParserOptions::default();
        sql_parser_options
            .set("enable_options_value_normalization", "false")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
        sql_parser_options
            .set("enable_options_value_normalization", "true")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options() {
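        // Column-specific options are addressed with the `option::column`
        // key syntax.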
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        assert_eq!(
            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
            Some(true)
        );
    }

    #[cfg(feature = "parquet_encryption")]
    #[test]
    fn parquet_table_encryption() {
        use crate::config::{
            ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
        };
        use parquet::encryption::decrypt::FileDecryptionProperties;
        use parquet::encryption::encrypt::FileEncryptionProperties;

        let footer_key = b"0123456789012345".to_vec(); // 16-byte (128-bit) key
        let column_names = vec!["double_field", "float_field"];
        let column_keys =
            vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];

        let file_encryption_properties =
            FileEncryptionProperties::builder(footer_key.clone())
                .with_column_keys(column_names.clone(), column_keys.clone())
                .unwrap()
                .build()
                .unwrap();

        let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
            .with_column_keys(column_names.clone(), column_keys.clone())
            .unwrap()
            .build()
            .unwrap();

        // Test round-trip
        let config_encrypt: ConfigFileEncryptionProperties =
            (&file_encryption_properties).into();
        let encryption_properties_built: FileEncryptionProperties =
            config_encrypt.clone().into();
        assert_eq!(file_encryption_properties, encryption_properties_built);

        let config_decrypt: ConfigFileDecryptionProperties =
            (&decryption_properties).into();
        let decryption_properties_built: FileDecryptionProperties =
            config_decrypt.clone().into();
        assert_eq!(decryption_properties, decryption_properties_built);

        ///////////////////////////////////////////////////////////////////////////////////
        // Test encryption config

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .parquet
            .set(
                "crypto.file_encryption.encrypt_footer",
                config_encrypt.encrypt_footer.to_string().as_str(),
            )
            .unwrap();
        table_config
            .parquet
            .set(
                "crypto.file_encryption.footer_key_as_hex",
                config_encrypt.footer_key_as_hex.as_str(),
            )
            .unwrap();

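        // Column keys are supplied hex-encoded, one entry per column name.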
        for (i, col_name) in column_names.iter().enumerate() {
            let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
            let value = hex::encode(column_keys[i].clone());
            table_config
                .parquet
                .set(key.as_str(), value.as_str())
                .unwrap();
        }

        assert_eq!(
            table_config.parquet.crypto.file_encryption,
            Some(config_encrypt)
        );

        ///////////////////////////////////////////////////////////////////////////////////
        // Test decryption config

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .parquet
            .set(
                "crypto.file_decryption.footer_key_as_hex",
                config_decrypt.footer_key_as_hex.as_str(),
            )
            .unwrap();

        for (i, col_name) in column_names.iter().enumerate() {
            let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
            let value = hex::encode(column_keys[i].clone());
            table_config
                .parquet
                .set(key.as_str(), value.as_str())
                .unwrap();
        }

        assert_eq!(
            table_config.parquet.crypto.file_decryption,
            Some(config_decrypt.clone())
        );

        // Set config directly
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
        assert_eq!(
            table_config.parquet.crypto.file_decryption,
            Some(config_decrypt.clone())
        );
    }

    #[cfg(feature = "parquet_encryption")]
    #[test]
    fn parquet_encryption_factory_config() {
        let mut parquet_options = TableParquetOptions::default();

        assert_eq!(parquet_options.crypto.factory_id, None);
        assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);

        let mut input_config = TestExtensionConfig::default();
        input_config
            .properties
            .insert("key1".to_string(), "value 1".to_string());
        input_config
            .properties
            .insert("key2".to_string(), "value 2".to_string());

        parquet_options
            .crypto
            .configure_factory("example_factory", &input_config);

        assert_eq!(
            parquet_options.crypto.factory_id,
            Some("example_factory".to_string())
        );
        let factory_options = &parquet_options.crypto.factory_options.options;
        assert_eq!(factory_options.len(), 2);
        assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
        assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_entry() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        let entries = table_config.entries();
        assert!(entries
            .iter()
            .any(|item| item.key == "format.bloom_filter_enabled::col1"))
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_parquet_options_config_entry() {
        let mut table_parquet_options = TableParquetOptions::new();
        table_parquet_options
            .set(
                "crypto.file_encryption.column_key_as_hex::double_field",
                "31323334353637383930313233343530",
            )
            .unwrap();
        let entries = table_parquet_options.entries();
        assert!(entries
            .iter()
            .any(|item| item.key
                == "crypto.file_encryption.column_key_as_hex::double_field"))
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_metadata_entry() {
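        // `format.metadata::<key>` entries populate `key_value_metadata`; the
        // values may be empty, contain spaces, or contain `::` themselves.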
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.set("format.metadata::key1", "").unwrap();
        table_config.set("format.metadata::key2", "value2").unwrap();
        table_config
            .set("format.metadata::key3", "value with spaces ")
            .unwrap();
        table_config
            .set("format.metadata::key4", "value with special chars :: :")
            .unwrap();

        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
        assert_eq!(parsed_metadata.get("should not exist1"), None);
        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
        assert_eq!(
            parsed_metadata.get("key3"),
            Some(&Some("value with spaces ".into()))
        );
        assert_eq!(
            parsed_metadata.get("key4"),
            Some(&Some("value with special chars :: :".into()))
        );

        // duplicate keys are overwritten
        table_config.set("format.metadata::key_dupe", "A").unwrap();
        table_config.set("format.metadata::key_dupe", "B").unwrap();
        let parsed_metadata = table_config.parquet.key_value_metadata;
        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
    }
}