datafusion_common/config.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Runtime configuration, via [`ConfigOptions`]
19
20use arrow_ipc::CompressionType;
21
22#[cfg(feature = "parquet_encryption")]
23use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
24use crate::error::_config_err;
25use crate::format::{ExplainAnalyzeCategories, ExplainFormat, MetricType};
26use crate::parquet_config::DFParquetWriterVersion;
27use crate::parsers::{CompressionTypeVariant, CsvQuoteStyle};
28use crate::utils::get_available_parallelism;
29use crate::{DataFusionError, Result};
30#[cfg(feature = "parquet_encryption")]
31use hex;
32use std::any::Any;
33use std::collections::{BTreeMap, HashMap};
34use std::error::Error;
35use std::fmt::{self, Display};
36use std::str::FromStr;
37#[cfg(feature = "parquet_encryption")]
38use std::sync::Arc;
39
/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// Each field is declared as
/// `name: Type, [warn = "message",] [transform = expr,] default = expr`
/// (the optional parts, when present, must appear in this order):
///
/// - `default` is the field's value in the generated [`Default`] impl, and
///   the value restored by `reset` when the key names exactly this field.
/// - `transform` is used to normalize values before parsing: it is applied to
///   the raw string passed to `set` before the field's own `ConfigField::set`
///   is called.
/// - `warn`, when given and non-empty, is logged with `log::warn!` after every
///   `set` of that field (whether or not the set succeeded) if the field's
///   value then differs from `default`. The expansion refers to `log::warn!`
///   unqualified, so crates using `warn = ...` need the `log` crate available.
///
/// Field-level `///` doc comments are not only documentation: the generated
/// `visit` concatenates and trims them to form the description handed to the
/// `Visit` implementation, so editing them changes runtime output.
///
/// Keys are dotted paths. `set` and `reset` split off the first segment,
/// match it against a field name and forward the remainder to that field;
/// an unknown first segment produces a configuration error (`_config_err!`).
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///     /// Amazing config
///     pub struct MyConfig {
///         /// Field 1 doc
///         field1: String, transform = str::to_lowercase, default = "".to_string()
///
///         /// Field 2 doc
///         field2: usize, default = 232
///
///         /// Field 3 doc
///         field3: Option<usize>, default = None
///     }
/// }
/// ```
///
/// Will generate (approximately)
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone, PartialEq)]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _config_err!(
///                 "Config value \"{}\" not found on {}", key, "MyConfig"
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
///
///     fn reset(&mut self, key: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 if rem.is_empty() {
///                     self.field1 = "".to_string();
///                     Ok(())
///                 } else {
///                     self.field1.reset(rem)
///                 }
///             },
///             // ... one arm of the same shape for field2 and field3 ...
///             _ => _config_err!(
///                 "Config value \"{}\" not found on {}", key, "MyConfig"
///             ),
///         }
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])? // Optional struct-level allow attribute
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes (also the runtime description)
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])? // Optional field-level allow attribute
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)? // Optional message logged when set to a non-default value
                $(transform = $transform:expr,)? // Optional normalization of the raw string
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                // Split "field.rest" into the field name and the remainder that
                // is forwarded to the (possibly nested) field.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Inactive: no `#[allow(deprecated)]` is applied to this arm.
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                let ret = self.$field_name.set(rem, value.as_ref());

                                // Evaluated even when `ret` is an error; compares the
                                // field's current value against the declared default.
                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })? // Log warning if specified, and the value is not the default
                                ret
                            }
                        },
                    )*
                    // Unknown field name for this namespace.
                    // NOTE(review): `return` is redundant here (this is the tail
                    // expression); `reset` below omits it.
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    // Full key is "<key_prefix>.<field_name>"; the description is the
                    // field's doc-comment text concatenated and trimmed.
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }

            fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            {
                                if rem.is_empty() {
                                    // Exact match: restore the declared default.
                                    let default_value: $field_type = $default;
                                    self.$field_name = default_value;
                                    Ok(())
                                } else {
                                    // Dotted suffix: let the nested field reset itself.
                                    self.$field_name.reset(rem)
                                }
                            }
                        },
                    )*
                    _ => $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}",
                        key,
                        stringify!($struct_name)
                    ),
                }
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                // Every field starts at its declared `default = ...` expression.
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}
220
config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        // The `///` comment on each field below is also emitted at runtime as that
        // setting's description (`config_namespace!` passes it to `Visit`), so
        // wording changes there are user-visible.

        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}
262
config_namespace! {
    /// Options related to the SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        // The `///` comment on each field below is also emitted at runtime as that
        // setting's description (`config_namespace!` passes it to `Visit`), so
        // wording changes there are user-visible.

        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        // Deprecated: the `warn` message is logged after a `set` whenever the
        // resulting value differs from the default (`false`).
        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: Dialect, default = Dialect::Generic
        // No `transform` is needed: `Dialect::from_str` lowercases its input,
        // so dialect names are matched case-insensitively.

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
        /// If false, they are mapped to `Utf8`.
        /// Default is true.
        pub map_string_types_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL Queries
        pub recursion_limit: usize, default = 50

        /// Specifies the default null ordering for query results. There are 4 options:
        /// - `nulls_max`: Nulls appear last in ascending order.
        /// - `nulls_min`: Nulls appear first in ascending order.
        /// - `nulls_first`: Nulls always be first in any order.
        /// - `nulls_last`: Nulls always be last in any order.
        ///
        /// By default, `nulls_max` is used to follow Postgres's behavior.
        /// postgres rule: <https://www.postgresql.org/docs/current/queries-order.html>
        pub default_null_ordering: String, default = "nulls_max".to_string()
        // NOTE(review): stored as a free-form `String`; nothing in this declaration
        // validates it against the four listed options — confirm where it is parsed.

        /// When set to true, DataFusion may remove `ORDER BY` clauses from
        /// subqueries or CTEs during SQL planning when their ordering cannot
        /// affect the result, such as when no `LIMIT` or other
        /// order-sensitive operator depends on them.
        ///
        /// Disable this option to preserve explicit subquery ordering in the
        /// planned query.
        pub enable_subquery_sort_elimination: bool, default = true
    }
}
325
/// SQL dialect selected for DataFusion's SQL parser.
///
/// A plain-enum counterpart of the
/// [sqlparser::dialect::Dialect](https://docs.rs/sqlparser/latest/sqlparser/dialect/trait.Dialect.html)
/// trait, so configuration code can name a dialect without depending on
/// `sqlparser` directly. [`Dialect::Generic`] is the default.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Dialect {
    /// The generic dialect (default).
    #[default]
    Generic,
    MySQL,
    PostgreSQL,
    Hive,
    SQLite,
    Snowflake,
    Redshift,
    MsSQL,
    ClickHouse,
    BigQuery,
    Ansi,
    DuckDB,
    Databricks,
}
346
347impl AsRef<str> for Dialect {
348 fn as_ref(&self) -> &str {
349 match self {
350 Self::Generic => "generic",
351 Self::MySQL => "mysql",
352 Self::PostgreSQL => "postgresql",
353 Self::Hive => "hive",
354 Self::SQLite => "sqlite",
355 Self::Snowflake => "snowflake",
356 Self::Redshift => "redshift",
357 Self::MsSQL => "mssql",
358 Self::ClickHouse => "clickhouse",
359 Self::BigQuery => "bigquery",
360 Self::Ansi => "ansi",
361 Self::DuckDB => "duckdb",
362 Self::Databricks => "databricks",
363 }
364 }
365}
366
367impl FromStr for Dialect {
368 type Err = DataFusionError;
369
370 fn from_str(s: &str) -> Result<Self, Self::Err> {
371 let value = match s.to_ascii_lowercase().as_str() {
372 "generic" => Self::Generic,
373 "mysql" => Self::MySQL,
374 "postgresql" | "postgres" => Self::PostgreSQL,
375 "hive" => Self::Hive,
376 "sqlite" => Self::SQLite,
377 "snowflake" => Self::Snowflake,
378 "redshift" => Self::Redshift,
379 "mssql" => Self::MsSQL,
380 "clickhouse" => Self::ClickHouse,
381 "bigquery" => Self::BigQuery,
382 "ansi" => Self::Ansi,
383 "duckdb" => Self::DuckDB,
384 "databricks" => Self::Databricks,
385 other => {
386 let error_message = format!(
387 "Invalid Dialect: {other}. Expected one of: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks"
388 );
389 return Err(DataFusionError::Configuration(error_message));
390 }
391 };
392 Ok(value)
393 }
394}
395
396impl ConfigField for Dialect {
397 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
398 v.some(key, self, description)
399 }
400
401 fn set(&mut self, _: &str, value: &str) -> Result<()> {
402 *self = Self::from_str(value)?;
403 Ok(())
404 }
405}
406
407impl Display for Dialect {
408 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
409 let str = self.as_ref();
410 write!(f, "{str}")
411 }
412}
413
/// Compression codec used when writing spill files to disk.
///
/// Parsed from and displayed as `zstd`, `lz4_frame` or `uncompressed`.
/// It converts into an Arrow IPC compression type, with `None` for
/// [`SpillCompression::Uncompressed`], which is also the default.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SpillCompression {
    /// Zstandard compression.
    Zstd,
    /// LZ4 frame compression.
    Lz4Frame,
    /// No compression (default).
    #[default]
    Uncompressed,
}
421
422impl FromStr for SpillCompression {
423 type Err = DataFusionError;
424
425 fn from_str(s: &str) -> Result<Self, Self::Err> {
426 match s.to_ascii_lowercase().as_str() {
427 "zstd" => Ok(Self::Zstd),
428 "lz4_frame" => Ok(Self::Lz4Frame),
429 "uncompressed" | "" => Ok(Self::Uncompressed),
430 other => Err(DataFusionError::Configuration(format!(
431 "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
432 ))),
433 }
434 }
435}
436
437impl ConfigField for SpillCompression {
438 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
439 v.some(key, self, description)
440 }
441
442 fn set(&mut self, _: &str, value: &str) -> Result<()> {
443 *self = SpillCompression::from_str(value)?;
444 Ok(())
445 }
446}
447
448impl Display for SpillCompression {
449 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450 let str = match self {
451 Self::Zstd => "zstd",
452 Self::Lz4Frame => "lz4_frame",
453 Self::Uncompressed => "uncompressed",
454 };
455 write!(f, "{str}")
456 }
457}
458
459impl From<SpillCompression> for Option<CompressionType> {
460 fn from(c: SpillCompression) -> Self {
461 match c {
462 SpillCompression::Zstd => Some(CompressionType::ZSTD),
463 SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
464 SpillCompression::Uncompressed => None,
465 }
466 }
467}
468
config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        // The `///` comment on each field below is also emitted at runtime as that
        // setting's description (`config_namespace!` passes it to `Visit`), so any
        // wording fix there changes user-visible output.

        /// Default batch size while creating new batches, it's especially useful for
        /// buffer-in-memory batches since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// A perfect hash join (see `HashJoinExec` for more details) will be considered
        /// if the range of keys (max - min) on the build side is < this threshold.
        /// This provides a fast path for joins with very small key ranges,
        /// bypassing the density check.
        ///
        /// Currently only supports cases where build_side.num_rows() < u32::MAX.
        /// Support for build_side.num_rows() >= u32::MAX will be added in the future.
        pub perfect_hash_join_small_build_threshold: usize, default = 1024

        /// The minimum required density of join keys on the build side to consider a
        /// perfect hash join (see `HashJoinExec` for more details). Density is calculated as:
        /// `(number of rows) / (max_key - min_key + 1)`.
        /// A perfect hash join may be used if the actual key density > this
        /// value.
        ///
        /// Currently only supports cases where build_side.num_rows() < u32::MAX.
        /// Support for build_side.num_rows() >= u32::MAX will be added in the future.
        pub perfect_hash_join_min_key_density: f64, default = 0.15

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the configuration setting
        pub coalesce_batches: bool, default = true
        // NOTE(review): the description above ends mid-sentence ("...determined by
        // the configuration setting"); presumably it should name `batch_size` —
        // confirm before changing, since it alters the runtime description.

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        // `transform` passes the raw string through
        // `ExecutionOptions::normalized_parallelism` (defined outside this view)
        // before it is parsed as `usize` — presumably mapping 0 to the available
        // parallelism; TODO confirm.
        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `now` return timestamps in this time zone
        pub time_zone: Option<String>, default = None

        // Nested namespace: `parquet.<option>` keys within this namespace are
        // forwarded to `ParquetOptions`.
        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly use to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()
        // NOTE(review): typo in the runtime description above: "mostly use" -> "mostly used".

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to workaround bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Sets the compression codec used when spilling data to disk.
        ///
        /// Since datafusion writes spill files using the Arrow IPC Stream format,
        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
        /// Valid values are: uncompressed, lz4_frame, zstd.
        /// Note: lz4_frame offers faster (de)compression, but typically results in
        /// larger spill files. In contrast, zstd achieves
        /// higher compression ratios at the cost of slower (de)compression speed.
        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Maximum buffer capacity (in bytes) per partition for BufferExec
        /// inserted during sort pushdown optimization.
        ///
        /// When PushdownSort eliminates a SortExec under SortPreservingMergeExec,
        /// a BufferExec is inserted to replace SortExec's buffering role. This
        /// prevents I/O stalls by allowing the scan to run ahead of the merge.
        ///
        /// This uses strictly less memory than the SortExec it replaces (which
        /// buffers the entire partition). The buffer respects the global memory
        /// pool limit. Setting this to a large value is safe — actual memory
        /// usage is bounded by partition size and global memory limits.
        pub sort_pushdown_buffer_capacity: usize, default = 1024 * 1024 * 1024

        /// Maximum size in bytes for individual spill files before rotating to a new file.
        ///
        /// When operators spill data to disk (e.g., RepartitionExec), they write
        /// multiple batches to the same file until this size limit is reached, then rotate
        /// to a new file. This reduces syscall overhead compared to one-file-per-batch
        /// while preventing files from growing too large.
        ///
        /// A larger value reduces file creation overhead but may hold more disk space.
        /// A smaller value creates more files but allows finer-grained space reclamation
        /// as files can be deleted once fully consumed.
        ///
        /// Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators
        /// may create spill files larger than the limit.
        ///
        /// Default: 128 MB
        pub max_spill_file_size_bytes: usize, default = 128 * 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum level of output files running in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should a `ListingTable` created through the `ListingTableFactory` infer table
        /// partitions from Hive compliant directories. Defaults to true (partition columns are
        /// inferred and will be represented in the table schema).
        pub listing_table_factory_infer_partitions: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the value is greater
        /// then partial aggregation will skip aggregation for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
        // NOTE(review): typo in the runtime description above: "greater then" -> "greater than".

        /// Number of input rows partial aggregation partition should process, before
        /// aggregation ratio check and trying to switch to skipping aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans.
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
        // NOTE(review): stray period in the runtime description above
        // ("better plans. if the source...").

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote end point.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024

        /// Whether to enable ANSI SQL mode.
        ///
        /// The flag is experimental and relevant only for DataFusion Spark built-in functions
        ///
        /// When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL
        /// semantics for expressions, casting, and error handling. This means:
        /// - **Strict type coercion rules:** implicit casts between incompatible types are disallowed.
        /// - **Standard SQL arithmetic behavior:** operations such as division by zero,
        /// numeric overflow, or invalid casts raise runtime errors rather than returning
        /// `NULL` or adjusted values.
        /// - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling.
        ///
        /// When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive,
        /// non-ANSI mode designed for user convenience and backward compatibility. In this mode:
        /// - Implicit casts between types are allowed (e.g., string to integer when possible).
        /// - Arithmetic operations are more lenient — for example, `abs()` on the minimum
        /// representable integer value returns the input value instead of raising overflow.
        /// - Division by zero or invalid casts may return `NULL` instead of failing.
        ///
        /// # Default
        /// `false` — ANSI SQL mode is disabled by default.
        pub enable_ansi_mode: bool, default = false

        /// How many bytes to buffer in the probe side of hash joins while the build side is
        /// concurrently being built.
        ///
        /// Without this, hash joins will wait until the full materialization of the build side
        /// before polling the probe side. This is useful in scenarios where the query is not
        /// completely CPU bounded, allowing to do some early work concurrently and reducing the
        /// latency of the query.
        ///
        /// Note that when hash join buffering is enabled, the probe side will start eagerly
        /// polling data, not giving time for the producer side of dynamic filters to produce any
        /// meaningful predicate. Queries with dynamic filters might see performance degradation.
        ///
        /// Disabled by default, set to a number greater than 0 for enabling it.
        pub hash_join_buffering_capacity: usize, default = 0
    }
}
711
config_namespace! {
    /// Options for content-defined chunking (CDC) when writing parquet files.
    /// Mirrors `parquet::file::properties::CdcOptions`.
    ///
    /// Held in [`ParquetOptions::content_defined_chunking`] with an explicit
    /// `enabled` flag, so CDC can be toggled with dotted config keys
    /// (`content_defined_chunking.enabled = true|false`) and the result is
    /// independent of the order in which the keys are set.
    pub struct ParquetCdcOptions {
        // The `///` comment on each field below is also emitted at runtime as that
        // setting's description (`config_namespace!` passes it to `Visit`).

        /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
        /// parquet files. When enabled, parallel writing is automatically disabled
        /// since the chunker state must persist across row groups.
        pub enabled: bool, default = false

        /// Minimum chunk size in bytes. The rolling hash will not trigger a split
        /// until this many bytes have been accumulated. Default is 256 KiB.
        pub min_chunk_size: usize, default = 256 * 1024

        // NOTE(review): nothing in this declaration checks that
        // `min_chunk_size <= max_chunk_size`; confirm that validation happens where
        // these options are converted for the parquet writer.

        /// Maximum chunk size in bytes. A split is forced when the accumulated
        /// size exceeds this value. Default is 1 MiB.
        pub max_chunk_size: usize, default = 1024 * 1024

        /// Normalization level. Increasing this improves deduplication ratio
        /// but increases fragmentation. Recommended range is [-3, 3], default is 0.
        pub norm_level: i32, default = 0
    }
}
739
740impl ParquetCdcOptions {
741 /// Returns enabled CDC options with the default chunking parameters.
742 ///
743 /// Shorthand for `ParquetCdcOptions { enabled: true, ..Default::default() }`;
744 /// combine with struct-update syntax to override parameters, e.g.
745 /// `ParquetCdcOptions { min_chunk_size: 4096, ..ParquetCdcOptions::enabled() }`.
746 pub fn enabled() -> Self {
747 Self {
748 enabled: true,
749 ..Default::default()
750 }
751 }
752
753 /// Returns disabled CDC options (equivalent to [`ParquetCdcOptions::default`]).
754 pub fn disabled() -> Self {
755 Self::default()
756 }
757}
758
759config_namespace! {
760 /// Options for reading and writing parquet files
761 ///
762 /// See also: [`SessionConfig`]
763 ///
764 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
765 pub struct ParquetOptions {
766 // The following options affect reading parquet files
767
768 /// (reading) If true, reads the Parquet data page level metadata (the
769 /// Page Index), if present, to reduce the I/O and number of
770 /// rows decoded.
771 pub enable_page_index: bool, default = true
772
773 /// (reading) If true, the parquet reader attempts to skip entire row groups based
774 /// on the predicate in the query and the metadata (min/max values) stored in
775 /// the parquet file
776 pub pruning: bool, default = true
777
778 /// (reading) If true, the parquet reader skip the optional embedded metadata that may be in
779 /// the file Schema. This setting can help avoid schema conflicts when querying
780 /// multiple parquet files with schemas containing compatible types but different metadata
781 pub skip_metadata: bool, default = true
782
783 /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
784 /// bytes of the parquet file optimistically. If not specified, two reads are required:
785 /// One read to fetch the 8-byte parquet footer and
786 /// another to fetch the metadata length encoded in the footer
787 /// Default setting to 512 KiB, which should be sufficient for most parquet files,
788 /// it can reduce one I/O operation per parquet file. If the metadata is larger than
789 /// the hint, two reads will still be performed.
790 pub metadata_size_hint: Option<usize>, default = Some(512 * 1024)
791
792 /// (reading) If true, filter expressions are be applied during the parquet decoding operation to
793 /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
794 pub pushdown_filters: bool, default = false
795
796 /// (reading) If true, filter expressions evaluated during the parquet decoding operation
797 /// will be reordered heuristically to minimize the cost of evaluation. If false,
798 /// the filters are applied in the same order as written in the query
799 pub reorder_filters: bool, default = false
800
801 /// (reading) Force the use of RowSelections for filter results, when
802 /// pushdown_filters is enabled. If false, the reader will automatically
803 /// choose between a RowSelection and a Bitmap based on the number and
804 /// pattern of selected rows.
805 pub force_filter_selections: bool, default = false
806
807 /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
808 /// and `Binary/BinaryLarge` with `BinaryView`.
809 pub schema_force_view_types: bool, default = true
810
811 /// (reading) If true, parquet reader will read columns of
812 /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
813 ///
814 /// Parquet files generated by some legacy writers do not correctly set
815 /// the UTF8 flag for strings, causing string columns to be loaded as
816 /// BLOB instead.
817 pub binary_as_string: bool, default = false
818
819 /// (reading) If true, parquet reader will read columns of
820 /// physical type int96 as originating from a different resolution
821 /// than nanosecond. This is useful for reading data from systems like Spark
822 /// which stores microsecond resolution timestamps in an int96 allowing it
823 /// to write values with a larger date range than 64-bit timestamps with
824 /// nanosecond resolution.
825 pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
826
827 /// (reading) Optional timezone applied to INT96 columns when `coerce_int96`
828 /// is set. When `Some`, INT96 columns coerce to
829 /// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default
830 /// `Timestamp(<coerce_int96>, None)`. Spark and other systems write INT96
831 /// values as UTC-adjusted instants, so callers that need the resulting
832 /// Arrow type to be timezone-aware (e.g. for Spark `TimestampType`
833 /// semantics) should set this to `"UTC"`. No effect when `coerce_int96`
834 /// is `None`.
835 pub coerce_int96_tz: Option<String>, default = None
836
837 /// (reading) Use any available bloom filters when reading parquet files
838 pub bloom_filter_on_read: bool, default = true
839
840 /// (reading) The maximum predicate cache size, in bytes. When
841 /// `pushdown_filters` is enabled, sets the maximum memory used to cache
842 /// the results of predicate evaluation between filter evaluation and
843 /// output generation. Decreasing this value will reduce memory usage,
844 /// but may increase IO and CPU usage. None means use the default
845 /// parquet reader setting. 0 means no caching.
846 pub max_predicate_cache_size: Option<usize>, default = None
847
848 // The following options affect writing to parquet files
849 // and map to parquet::file::properties::WriterProperties
850
851 /// (writing) Sets best effort maximum size of data page in bytes
852 pub data_pagesize_limit: usize, default = 1024 * 1024
853
854 /// (writing) Sets write_batch_size in rows
855 pub write_batch_size: usize, default = 1024
856
857 /// (writing) Sets parquet writer version
858 /// valid values are "1.0" and "2.0"
859 pub writer_version: DFParquetWriterVersion, default = DFParquetWriterVersion::default()
860
861 /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
862 ///
863 /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
864 /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
865 pub skip_arrow_metadata: bool, default = false
866
867 /// (writing) Sets default parquet compression codec.
868 /// Valid values are: uncompressed, snappy, gzip(level),
869 /// brotli(level), lz4, zstd(level), and lz4_raw.
870 /// These values are not case sensitive. If NULL, uses
871 /// default parquet writer setting
872 ///
873 /// Note that this default setting is not the same as
874 /// the default parquet writer setting.
875 pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())
876
877 /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
878 /// default parquet writer setting
879 pub dictionary_enabled: Option<bool>, default = Some(true)
880
881 /// (writing) Sets best effort maximum dictionary page size, in bytes
882 pub dictionary_page_size_limit: usize, default = 1024 * 1024
883
884 /// (writing) Sets if statistics are enabled for any column
885 /// Valid values are: "none", "chunk", and "page"
886 /// These values are not case sensitive. If NULL, uses
887 /// default parquet writer setting
888 pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())
889
890 /// (writing) Target maximum number of rows in each row group (defaults to 1M
891 /// rows). Writing larger row groups requires more memory to write, but
892 /// can get better compression and be faster to read.
893 pub max_row_group_size: usize, default = 1024 * 1024
894
895 /// (writing) Sets "created by" property
896 pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()
897
898 /// (writing) Sets column index truncate length
899 pub column_index_truncate_length: Option<usize>, default = Some(64)
900
901 /// (writing) Sets statistics truncate length. If NULL, uses
902 /// default parquet writer setting
903 pub statistics_truncate_length: Option<usize>, default = Some(64)
904
905 /// (writing) Sets best effort maximum number of rows in data page
906 pub data_page_row_count_limit: usize, default = 20_000
907
908 /// (writing) Sets default encoding for any column.
909 /// Valid values are: plain, plain_dictionary, rle,
910 /// bit_packed, delta_binary_packed, delta_length_byte_array,
911 /// delta_byte_array, rle_dictionary, and byte_stream_split.
912 /// These values are not case sensitive. If NULL, uses
913 /// default parquet writer setting
914 pub encoding: Option<String>, transform = str::to_lowercase, default = None
915
916 /// (writing) Write bloom filters for all columns when creating parquet files
917 pub bloom_filter_on_write: bool, default = false
918
919 /// (writing) Sets bloom filter false positive probability. If NULL, uses
920 /// default parquet writer setting
921 pub bloom_filter_fpp: Option<f64>, default = None
922
923 /// (writing) Sets bloom filter number of distinct values. If NULL, uses
924 /// default parquet writer setting
925 pub bloom_filter_ndv: Option<u64>, default = None
926
927 /// (writing) Controls whether DataFusion will attempt to speed up writing
928 /// parquet files by serializing them in parallel. Each column
929 /// in each row group in each output file are serialized in parallel
930 /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
931 pub allow_single_file_parallelism: bool, default = true
932
933 /// (writing) By default parallel parquet writer is tuned for minimum
934 /// memory usage in a streaming execution plan. You may see
935 /// a performance benefit when writing large parquet files
936 /// by increasing maximum_parallel_row_group_writers and
937 /// maximum_buffered_record_batches_per_stream if your system
938 /// has idle cores and can tolerate additional memory usage.
939 /// Boosting these values is likely worthwhile when
940 /// writing out already in-memory data, such as from a cached
941 /// data frame.
942 pub maximum_parallel_row_group_writers: usize, default = 1
943
944 /// (writing) By default parallel parquet writer is tuned for minimum
945 /// memory usage in a streaming execution plan. You may see
946 /// a performance benefit when writing large parquet files
947 /// by increasing maximum_parallel_row_group_writers and
948 /// maximum_buffered_record_batches_per_stream if your system
949 /// has idle cores and can tolerate additional memory usage.
950 /// Boosting these values is likely worthwhile when
951 /// writing out already in-memory data, such as from a cached
952 /// data frame.
953 pub maximum_buffered_record_batches_per_stream: usize, default = 2
954
955 /// (writing) EXPERIMENTAL: Content-defined chunking (CDC) options when writing
956 /// parquet files. Disabled by default; toggle with
957 /// `content_defined_chunking.enabled = true|false`. The chunking parameters live
958 /// under the same prefix (e.g. `content_defined_chunking.min_chunk_size`). When
959 /// enabled, parallel writing is automatically disabled since the chunker state
960 /// must persist across row groups. Mirrors
961 /// `parquet::file::properties::WriterProperties::content_defined_chunking`.
962 pub content_defined_chunking: ParquetCdcOptions, default = Default::default()
963 }
964}
965
config_namespace! {
    /// Options for configuring Parquet Modular Encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    pub struct ParquetEncryptionOptions {
        // NOTE(review): this struct carries no `#[cfg(feature = "parquet_encryption")]`
        // attribute, so these fields are declared in every build; only the
        // encryption imports at the top of the file are feature-gated. Confirm how
        // the `ConfigFile*Properties` types behave when the feature is disabled.

        /// Optional file decryption properties
        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None

        /// Optional file encryption properties
        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None

        // `factory_id` and `factory_options` can be set together through
        // `ParquetEncryptionOptions::configure_factory`, which discards any
        // previously stored `factory_options` entries before copying new ones.

        /// Identifier for the encryption factory to use to create file encryption and decryption properties.
        /// Encryption factories can be registered in the runtime environment with
        /// `RuntimeEnv::register_parquet_encryption_factory`.
        pub factory_id: Option<String>, default = None

        /// Any encryption factory specific options
        pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
    }
}
986
987impl ParquetEncryptionOptions {
988 /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
989 pub fn configure_factory(
990 &mut self,
991 factory_id: &str,
992 config: &impl ExtensionOptions,
993 ) {
994 self.factory_id = Some(factory_id.to_owned());
995 self.factory_options.options.clear();
996 for entry in config.entries() {
997 if let Some(value) = entry.value {
998 self.factory_options.options.insert(entry.key, value);
999 }
1000 }
1001 }
1002}
1003
1004config_namespace! {
1005 /// Options related to query optimization
1006 ///
1007 /// See also: [`SessionConfig`]
1008 ///
1009 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
1010 pub struct OptimizerOptions {
1011 /// When set to true, the optimizer will push a limit operation into
1012 /// grouped aggregations which have no aggregate expressions, as a soft limit,
1013 /// emitting groups once the limit is reached, before all rows in the group are read.
1014 pub enable_distinct_aggregation_soft_limit: bool, default = true
1015
1016 /// When set to true, the physical plan optimizer will try to add round robin
1017 /// repartitioning to increase parallelism to leverage more CPU cores
1018 pub enable_round_robin_repartition: bool, default = true
1019
1020 /// When set to true, the optimizer will attempt to perform limit operations
1021 /// during aggregations, if possible
1022 pub enable_topk_aggregation: bool, default = true
1023
1024 /// When set to true, the optimizer will attempt to push limit operations
1025 /// past window functions, if possible
1026 pub enable_window_limits: bool, default = true
1027
1028 /// When set to true, the optimizer will replace
1029 /// Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a
1030 /// PartitionedTopKExec that maintains per-partition heaps, avoiding
1031 /// a full sort of the input.
1032 /// When the window partition key has low cardinality, enabling this optimization
1033 /// can improve performance. However, for high cardinality keys, it may
1034 /// cause regressions in both memory usage and runtime.
1035 pub enable_window_topn: bool, default = false
1036
1037 /// When set to true, the optimizer will push TopK (Sort with fetch)
1038 /// below hash repartition when the partition key is a prefix of the
1039 /// sort key, reducing data volume before the shuffle.
1040 pub enable_topk_repartition: bool, default = true
1041
1042 /// When set to true, the optimizer will attempt to push down TopK dynamic filters
1043 /// into the file scan phase.
1044 pub enable_topk_dynamic_filter_pushdown: bool, default = true
1045
1046 /// When set to true, uncorrelated scalar subqueries are
1047 /// left in the logical plan and executed by `ScalarSubqueryExec` during
1048 /// physical execution. When set to false, all scalar subqueries
1049 /// (including uncorrelated ones) are rewritten to left joins by the
1050 /// `ScalarSubqueryToJoin` optimizer rule.
1051 ///
1052 /// Note disabling this option is not recommended. It restores
1053 /// pre <https://github.com/apache/datafusion/pull/21240>
1054 /// behavior, which silently produces incorrect results for
1055 /// multi-row subqueries and does not support scalar subqueries in
1056 /// ORDER BY / JOIN ON / aggregate-function arguments. This option is
1057 /// intended as a temporary escape hatch for distributed execution
1058 /// frameworks and is planned to be removed in a future DataFusion
1059 /// release.
1060 pub enable_physical_uncorrelated_scalar_subquery: bool, default = true
1061
1062 /// When set to true, the optimizer will attempt to push down Join dynamic filters
1063 /// into the file scan phase.
1064 pub enable_join_dynamic_filter_pushdown: bool, default = true
1065
1066 /// When set to true, the optimizer will attempt to push down Aggregate dynamic filters
1067 /// into the file scan phase.
1068 pub enable_aggregate_dynamic_filter_pushdown: bool, default = true
1069
1070 /// When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase.
1071 /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
1072 /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
1073 /// This means that if we already have 10 timestamps in the year 2025
1074 /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
1075 /// The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown`
1076 /// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
1077 pub enable_dynamic_filter_pushdown: bool, default = true
1078
1079 /// When set to true, the optimizer will insert filters before a join between
1080 /// a nullable and non-nullable column to filter out nulls on the nullable side. This
1081 /// filter can add additional overhead when the file format does not fully support
1082 /// predicate push down.
1083 pub filter_null_join_keys: bool, default = false
1084
1085 /// Should DataFusion repartition data using the aggregate keys to execute aggregates
1086 /// in parallel using the provided `target_partitions` level
1087 pub repartition_aggregations: bool, default = true
1088
1089 /// Minimum total files size in bytes to perform file scan repartitioning.
1090 pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
1091
1092 /// Should DataFusion repartition data using the join keys to execute joins in parallel
1093 /// using the provided `target_partitions` level
1094 pub repartition_joins: bool, default = true
1095
1096 /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
1097 /// its inputs do not have any ordering or filtering If the flag is not enabled,
1098 /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
1099 /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
1100 /// RightAnti, and RightSemi - being produced only at the end of the execution.
1101 /// This is not typical in stream processing. Additionally, without proper design for
1102 /// long runner execution, all types of joins may encounter out-of-memory errors.
1103 pub allow_symmetric_joins_without_pruning: bool, default = true
1104
1105 /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
1106 /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
1107 ///
1108 /// For FileSources, only Parquet and CSV formats are currently supported.
1109 ///
1110 /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
1111 /// might be partitioned into smaller chunks) for parallel scanning.
1112 /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
1113 /// happen within a single file.
1114 ///
1115 /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
1116 /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
1117 /// the total number of partitions and batches per partition, but does not slice the initial
1118 /// record tables provided to the MemTable on creation.
1119 pub repartition_file_scans: bool, default = true
1120
1121 /// Minimum number of distinct partition values required to group files by their
1122 /// Hive partition column values (enabling Hash partitioning declaration).
1123 ///
1124 /// How the option is used:
1125 /// - preserve_file_partitions=0: Disable it.
1126 /// - preserve_file_partitions=1: Always enable it.
1127 /// - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N.
1128 /// This threshold preserves I/O parallelism when file partitioning is below it.
1129 ///
1130 /// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
1131 /// partitions is less than the target_partitions.
1132 pub preserve_file_partitions: usize, default = 0
1133
1134 /// Should DataFusion repartition data using the partitions keys to execute window
1135 /// functions in parallel using the provided `target_partitions` level
1136 pub repartition_windows: bool, default = true
1137
1138 /// Should DataFusion execute sorts in a per-partition fashion and merge
1139 /// afterwards instead of coalescing first and sorting globally.
1140 /// With this flag is enabled, plans in the form below
1141 ///
1142 /// ```text
1143 /// "SortExec: [a@0 ASC]",
1144 /// " CoalescePartitionsExec",
1145 /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
1146 /// ```
1147 /// would turn into the plan below which performs better in multithreaded environments
1148 ///
1149 /// ```text
1150 /// "SortPreservingMergeExec: [a@0 ASC]",
1151 /// " SortExec: [a@0 ASC]",
1152 /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
1153 /// ```
1154 pub repartition_sorts: bool, default = true
1155
1156 /// Partition count threshold for subset satisfaction optimization.
1157 ///
1158 /// When the current partition count is >= this threshold, DataFusion will
1159 /// skip repartitioning if the required partitioning expression is a subset
1160 /// of the current partition expression such as Hash(a) satisfies Hash(a, b).
1161 ///
1162 /// When the current partition count is < this threshold, DataFusion will
1163 /// repartition to increase parallelism even when subset satisfaction applies.
1164 ///
1165 /// Set to 0 to always repartition (disable subset satisfaction optimization).
1166 /// Set to a high value to always use subset satisfaction.
1167 ///
1168 /// Example (subset_repartition_threshold = 4):
1169 /// ```text
1170 /// Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a])
1171 ///
1172 /// If current partitions (3) < threshold (4), repartition:
1173 /// AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)]
1174 /// RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3
1175 /// AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)]
1176 /// DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3)
1177 ///
1178 /// If current partitions (8) >= threshold (4), use subset satisfaction:
1179 /// AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)]
1180 /// DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8)
1181 /// ```
1182 pub subset_repartition_threshold: usize, default = 4
1183
1184 /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
1185 /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
1186 /// using `SortPreservingMergeExec`)
1187 ///
1188 /// When false, DataFusion will maximize plan parallelism using
1189 /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
1190 pub prefer_existing_sort: bool, default = false
1191
1192 /// When set to true, the logical plan optimizer will produce warning
1193 /// messages if any optimization rules produce errors and then proceed to the next
1194 /// rule. When set to false, any rules that produce errors will cause the query to fail
1195 pub skip_failed_rules: bool, default = false
1196
1197 /// Number of times that the optimizer will attempt to optimize the plan
1198 pub max_passes: usize, default = 3
1199
1200 /// When set to true, the physical plan optimizer will run a top down
1201 /// process to reorder the join keys
1202 pub top_down_join_key_reordering: bool, default = true
1203
1204 /// When set to true, the physical plan optimizer may swap join inputs
1205 /// based on statistics. When set to false, statistics-driven join
1206 /// input reordering is disabled and the original join order in the
1207 /// query is used.
1208 pub join_reordering: bool, default = true
1209
1210 /// When set to true, the physical plan optimizer uses the pluggable
1211 /// `StatisticsRegistry` for statistics propagation across operators.
1212 /// This enables more accurate cardinality estimates compared to each
1213 /// operator's built-in `partition_statistics`.
1214 pub use_statistics_registry: bool, default = false
1215
1216 /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
1217 /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
1218 pub prefer_hash_join: bool, default = true
1219
1220 /// When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently
1221 /// experimental. Physical planner will opt for PiecewiseMergeJoin when there is only
1222 /// one range filter.
1223 pub enable_piecewise_merge_join: bool, default = false
1224
1225 /// The maximum estimated size in bytes for one input side of a HashJoin
1226 /// will be collected into a single partition
1227 pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
1228
1229 /// The maximum estimated size in rows for one input side of a HashJoin
1230 /// will be collected into a single partition
1231 pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
1232
1233 /// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
1234 /// Build sides larger than this will use hash table lookups instead.
1235 /// Set to 0 to always use hash table lookups.
1236 ///
1237 /// InList pushdown can be more efficient for small build sides because it can result in better
1238 /// statistics pruning as well as use any bloom filters present on the scan side.
1239 /// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
1240 /// On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory.
1241 ///
1242 /// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
1243 ///
1244 /// The default is 128kB per partition.
1245 /// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
1246 /// but avoids excessive memory usage or overhead for larger joins.
1247 pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024
1248
1249 /// Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
1250 /// Build sides with more rows than this will use hash table lookups instead.
1251 /// Set to 0 to always use hash table lookups.
1252 ///
1253 /// This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent
1254 /// very large IN lists that might not provide much benefit over hash table lookups.
1255 ///
1256 /// This uses the deduplicated row count once the build side has been evaluated.
1257 ///
1258 /// The default is 150 values per partition.
1259 /// This is inspired by Trino's `max-filter-keys-per-column` setting.
1260 /// See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>
1261 pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150
1262
1263 /// The default filter selectivity used by Filter Statistics
1264 /// when an exact selectivity cannot be determined. Valid values are
1265 /// between 0 (no selectivity) and 100 (all rows are selected).
1266 pub default_filter_selectivity: u8, default = 20
1267
1268 /// When set to true, the optimizer will not attempt to convert Union to Interleave
1269 pub prefer_existing_union: bool, default = false
1270
1271 /// When set to true, if the returned type is a view type
1272 /// then the output will be coerced to a non-view.
1273 /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
1274 pub expand_views_at_output: bool, default = false
1275
1276 /// Enable sort pushdown optimization.
1277 /// When enabled, attempts to push sort requirements down to data sources
1278 /// that can natively handle them (e.g., by reversing file/row group read order).
1279 ///
1280 /// Returns **inexact ordering**: Sort operator is kept for correctness,
1281 /// but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N),
1282 /// providing significant speedup.
1283 ///
1284 /// Memory: No additional overhead (only changes read order).
1285 ///
1286 /// Future: Will add option to detect perfectly sorted data and eliminate Sort completely.
1287 ///
1288 /// Default: true
1289 pub enable_sort_pushdown: bool, default = true
1290
1291 /// When set to true, the optimizer will extract leaf expressions
1292 /// (such as `get_field`) from filter/sort/join nodes into projections
1293 /// closer to the leaf table scans, and push those projections down
1294 /// towards the leaf nodes.
1295 pub enable_leaf_expression_pushdown: bool, default = true
1296
1297 /// When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that
1298 /// read from the same source and differ only by filter predicates into a single branch
1299 /// with a combined filter. This optimization is conservative and only applies when the
1300 /// branches share the same source and compatible wrapper nodes such as identical
1301 /// projections or aliases.
1302 pub enable_unions_to_filter: bool, default = false
1303 }
1304}
1305
config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        // Plan selection: each flag restricts EXPLAIN to a single plan kind.

        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        // Extra per-plan detail toggles.

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        // Rendering: output format, plus a width limit that is only used by the
        // tree format.

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: ExplainFormat, default = ExplainFormat::Indent

        /// (format=tree only) Maximum total width of the rendered tree.
        /// When set to 0, the tree will have no width limit.
        pub tree_maximum_render_width: usize, default = 240

        // The remaining options only affect `EXPLAIN ANALYZE` output.

        /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev"
        /// "summary" shows common metrics for high-level insights.
        /// "dev" provides deep operator-level introspection for developers.
        pub analyze_level: MetricType, default = MetricType::Dev

        /// Which metric categories to include in "EXPLAIN ANALYZE" output.
        /// Comma-separated list of: "rows", "bytes", "timing", "uncategorized".
        /// Use "none" to show plan structure only, or "all" (default) to show everything.
        /// Metrics without a declared category are treated as "uncategorized".
        pub analyze_categories: ExplainAnalyzeCategories, default = ExplainAnalyzeCategories::All
    }
}
1349
1350impl ExecutionOptions {
1351 /// Returns the correct parallelism based on the provided `value`.
1352 /// If `value` is `"0"`, returns the default available parallelism, computed with
1353 /// `get_available_parallelism`. Otherwise, returns `value`.
1354 fn normalized_parallelism(value: &str) -> String {
1355 if value.parse::<usize>() == Ok(0) {
1356 get_available_parallelism().to_string()
1357 } else {
1358 value.to_owned()
1359 }
1360 }
1361}
1362
config_namespace! {
    /// Options controlling the format of output when printing record batches
    /// Copies [`arrow::util::display::FormatOptions`]
    pub struct FormatOptions {
        // These fields are passed to arrow's `FormatOptions` by the
        // `TryFrom<&FormatOptions>` impl that follows this block; the string
        // fields are borrowed rather than copied.

        /// If set to `true` any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        // `transform` normalizes values before parsing, so "ISO8601" set through
        // the config key is stored as "iso8601", the spelling the `TryFrom`
        // conversion matches. That match is case-sensitive, so a value assigned
        // directly to the field must already be lowercase (TODO confirm that the
        // macro applies `transform` only on the parse path).
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show types in visual representation batches
        pub types_info: bool, default = false
    }
}
1388
1389impl<'a> TryFrom<&'a FormatOptions> for arrow::util::display::FormatOptions<'a> {
1390 type Error = DataFusionError;
1391 fn try_from(options: &'a FormatOptions) -> Result<Self> {
1392 let duration_format = match options.duration_format.as_str() {
1393 "pretty" => arrow::util::display::DurationFormat::Pretty,
1394 "iso8601" => arrow::util::display::DurationFormat::ISO8601,
1395 _ => {
1396 return _config_err!(
1397 "Invalid duration format: {}. Valid values are pretty or iso8601",
1398 options.duration_format
1399 );
1400 }
1401 };
1402
1403 Ok(Self::new()
1404 .with_display_error(options.safe)
1405 .with_null(&options.null)
1406 .with_date_format(options.date_format.as_deref())
1407 .with_datetime_format(options.datetime_format.as_deref())
1408 .with_timestamp_format(options.timestamp_format.as_deref())
1409 .with_timestamp_tz_format(options.timestamp_tz_format.as_deref())
1410 .with_time_format(options.time_format.as_deref())
1411 .with_duration_format(duration_format)
1412 .with_types_info(options.types_info))
1413 }
1414}
1415
1416/// A key value pair, with a corresponding description
1417#[derive(Debug, Clone, Hash, PartialEq, Eq)]
1418pub struct ConfigEntry {
1419 /// A unique string to identify this config value
1420 pub key: String,
1421
1422 /// The value if any
1423 pub value: Option<String>,
1424
1425 /// A description of this configuration entry
1426 pub description: &'static str,
1427}
1428
1429/// Configuration options struct, able to store both built-in configuration and custom options
1430#[derive(Debug, Clone, Default)]
1431#[non_exhaustive]
1432pub struct ConfigOptions {
1433 /// Catalog options
1434 pub catalog: CatalogOptions,
1435 /// Execution options
1436 pub execution: ExecutionOptions,
1437 /// Optimizer options
1438 pub optimizer: OptimizerOptions,
1439 /// SQL parser options
1440 pub sql_parser: SqlParserOptions,
1441 /// Explain options
1442 pub explain: ExplainOptions,
1443 /// Optional extensions registered using [`Extensions::insert`]
1444 pub extensions: Extensions,
1445 /// Formatting options when printing batches
1446 pub format: FormatOptions,
1447}
1448
1449impl ConfigField for ConfigOptions {
1450 fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1451 self.catalog.visit(v, "datafusion.catalog", "");
1452 self.execution.visit(v, "datafusion.execution", "");
1453 self.optimizer.visit(v, "datafusion.optimizer", "");
1454 self.explain.visit(v, "datafusion.explain", "");
1455 self.sql_parser.visit(v, "datafusion.sql_parser", "");
1456 self.format.visit(v, "datafusion.format", "");
1457 }
1458
1459 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1460 // Extensions are handled in the public `ConfigOptions::set`
1461 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1462 match key {
1463 "catalog" => self.catalog.set(rem, value),
1464 "execution" => self.execution.set(rem, value),
1465 "optimizer" => self.optimizer.set(rem, value),
1466 "explain" => self.explain.set(rem, value),
1467 "sql_parser" => self.sql_parser.set(rem, value),
1468 "format" => self.format.set(rem, value),
1469 _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
1470 }
1471 }
1472
1473 /// Reset a configuration option back to its default value
1474 fn reset(&mut self, key: &str) -> Result<()> {
1475 let Some((prefix, rest)) = key.split_once('.') else {
1476 return _config_err!("could not find config namespace for key \"{key}\"");
1477 };
1478
1479 if prefix != "datafusion" {
1480 return _config_err!("Could not find config namespace \"{prefix}\"");
1481 }
1482
1483 let (section, rem) = rest.split_once('.').unwrap_or((rest, ""));
1484 if rem.is_empty() {
1485 return _config_err!("could not find config field for key \"{key}\"");
1486 }
1487
1488 match section {
1489 "catalog" => self.catalog.reset(rem),
1490 "execution" => self.execution.reset(rem),
1491 "optimizer" => {
1492 if rem == "enable_dynamic_filter_pushdown" {
1493 let defaults = OptimizerOptions::default();
1494 self.optimizer.enable_dynamic_filter_pushdown =
1495 defaults.enable_dynamic_filter_pushdown;
1496 self.optimizer.enable_topk_dynamic_filter_pushdown =
1497 defaults.enable_topk_dynamic_filter_pushdown;
1498 self.optimizer.enable_join_dynamic_filter_pushdown =
1499 defaults.enable_join_dynamic_filter_pushdown;
1500 Ok(())
1501 } else {
1502 self.optimizer.reset(rem)
1503 }
1504 }
1505 "explain" => self.explain.reset(rem),
1506 "sql_parser" => self.sql_parser.reset(rem),
1507 "format" => self.format.reset(rem),
1508 other => _config_err!("Config value \"{other}\" not found on ConfigOptions"),
1509 }
1510 }
1511}
1512
1513/// This namespace is reserved for interacting with Foreign Function Interface
1514/// (FFI) based configuration extensions.
1515pub const DATAFUSION_FFI_CONFIG_NAMESPACE: &str = "datafusion_ffi";
1516
1517impl ConfigOptions {
1518 /// Creates a new [`ConfigOptions`] with default values
1519 pub fn new() -> Self {
1520 Self::default()
1521 }
1522
1523 /// Set extensions to provided value
1524 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1525 self.extensions = extensions;
1526 self
1527 }
1528
1529 /// Set a configuration option
1530 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1531 let Some((mut prefix, mut inner_key)) = key.split_once('.') else {
1532 return _config_err!("could not find config namespace for key \"{key}\"");
1533 };
1534
1535 if prefix == "datafusion" {
1536 if inner_key == "optimizer.enable_dynamic_filter_pushdown" {
1537 let bool_value = value.parse::<bool>().map_err(|e| {
1538 DataFusionError::Configuration(format!(
1539 "Failed to parse '{value}' as bool: {e}",
1540 ))
1541 })?;
1542
1543 {
1544 self.optimizer.enable_dynamic_filter_pushdown = bool_value;
1545 self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
1546 self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
1547 self.optimizer.enable_aggregate_dynamic_filter_pushdown = bool_value;
1548 }
1549 return Ok(());
1550 }
1551 return ConfigField::set(self, inner_key, value);
1552 }
1553
1554 if !self.extensions.0.contains_key(prefix)
1555 && self
1556 .extensions
1557 .0
1558 .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
1559 {
1560 inner_key = key;
1561 prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
1562 }
1563
1564 let Some(e) = self.extensions.0.get_mut(prefix) else {
1565 return _config_err!("Could not find config namespace \"{prefix}\"");
1566 };
1567 e.0.set(inner_key, value)
1568 }
1569
1570 /// Create new [`ConfigOptions`], taking values from environment variables
1571 /// where possible.
1572 ///
1573 /// For example, to configure `datafusion.execution.batch_size`
1574 /// ([`ExecutionOptions::batch_size`]) you would set the
1575 /// `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable.
1576 ///
1577 /// The name of the environment variable is the option's key, transformed to
1578 /// uppercase and with periods replaced with underscores.
1579 ///
1580 /// Values are parsed according to the [same rules used in casts from
1581 /// Utf8](https://docs.rs/arrow/latest/arrow/compute/kernels/cast/fn.cast.html).
1582 ///
1583 /// If the value in the environment variable cannot be cast to the type of
1584 /// the configuration option, the default value will be used instead and a
1585 /// warning emitted. Environment variables are read when this method is
1586 /// called, and are not re-read later.
1587 pub fn from_env() -> Result<Self> {
1588 struct Visitor(Vec<String>);
1589
1590 impl Visit for Visitor {
1591 fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1592 self.0.push(key.to_string())
1593 }
1594
1595 fn none(&mut self, key: &str, _: &'static str) {
1596 self.0.push(key.to_string())
1597 }
1598 }
1599
1600 // Extract the names of all fields and then look up the corresponding
1601 // environment variables. This isn't hugely efficient but avoids
1602 // ambiguity between `a.b` and `a_b` which would both correspond
1603 // to an environment variable of `A_B`
1604
1605 let mut keys = Visitor(vec![]);
1606 let mut ret = Self::default();
1607 ret.visit(&mut keys, "datafusion", "");
1608
1609 for key in keys.0 {
1610 let env = key.to_uppercase().replace('.', "_");
1611 if let Some(var) = std::env::var_os(env) {
1612 let value = var.to_string_lossy();
1613 log::info!("Set {key} to {value} from the environment variable");
1614 ret.set(&key, value.as_ref())?;
1615 }
1616 }
1617
1618 Ok(ret)
1619 }
1620
1621 /// Create new ConfigOptions struct, taking values from a string hash map.
1622 ///
1623 /// Only the built-in configurations will be extracted from the hash map
1624 /// and other key value pairs will be ignored.
1625 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1626 struct Visitor(Vec<String>);
1627
1628 impl Visit for Visitor {
1629 fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1630 self.0.push(key.to_string())
1631 }
1632
1633 fn none(&mut self, key: &str, _: &'static str) {
1634 self.0.push(key.to_string())
1635 }
1636 }
1637
1638 let mut keys = Visitor(vec![]);
1639 let mut ret = Self::default();
1640 ret.visit(&mut keys, "datafusion", "");
1641
1642 for key in keys.0 {
1643 if let Some(var) = settings.get(&key) {
1644 ret.set(&key, var)?;
1645 }
1646 }
1647
1648 Ok(ret)
1649 }
1650
1651 /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
1652 pub fn entries(&self) -> Vec<ConfigEntry> {
1653 struct Visitor(Vec<ConfigEntry>);
1654
1655 impl Visit for Visitor {
1656 fn some<V: Display>(
1657 &mut self,
1658 key: &str,
1659 value: V,
1660 description: &'static str,
1661 ) {
1662 self.0.push(ConfigEntry {
1663 key: key.to_string(),
1664 value: Some(value.to_string()),
1665 description,
1666 })
1667 }
1668
1669 fn none(&mut self, key: &str, description: &'static str) {
1670 self.0.push(ConfigEntry {
1671 key: key.to_string(),
1672 value: None,
1673 description,
1674 })
1675 }
1676 }
1677
1678 let mut v = Visitor(vec![]);
1679 self.visit(&mut v, "datafusion", "");
1680
1681 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1682 v.0
1683 }
1684
1685 /// Generate documentation that can be included in the user guide
1686 pub fn generate_config_markdown() -> String {
1687 use std::fmt::Write as _;
1688
1689 let mut s = Self::default();
1690
1691 // Normalize for display
1692 s.execution.target_partitions = 0;
1693 s.execution.planning_concurrency = 0;
1694
1695 let mut docs = "| key | default | description |\n".to_string();
1696 docs += "|-----|---------|-------------|\n";
1697 let mut entries = s.entries();
1698 entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1699
1700 for entry in s.entries() {
1701 let _ = writeln!(
1702 &mut docs,
1703 "| {} | {} | {} |",
1704 entry.key,
1705 entry.value.as_deref().unwrap_or("NULL"),
1706 entry.description
1707 );
1708 }
1709 docs
1710 }
1711}
1712
1713/// [`ConfigExtension`] provides a mechanism to store third-party configuration
1714/// within DataFusion [`ConfigOptions`]
1715///
1716/// This mechanism can be used to pass configuration to user defined functions
1717/// or optimizer passes
1718///
1719/// # Example
1720/// ```
1721/// use datafusion_common::{
1722/// config::ConfigExtension, config::ConfigOptions, extensions_options,
1723/// };
1724/// // Define a new configuration struct using the `extensions_options` macro
1725/// extensions_options! {
1726/// /// My own config options.
1727/// pub struct MyConfig {
1728/// /// Should "foo" be replaced by "bar"?
1729/// pub foo_to_bar: bool, default = true
1730///
1731/// /// How many "baz" should be created?
1732/// pub baz_count: usize, default = 1337
1733/// }
1734/// }
1735///
1736/// impl ConfigExtension for MyConfig {
1737/// const PREFIX: &'static str = "my_config";
1738/// }
1739///
1740/// // set up config struct and register extension
1741/// let mut config = ConfigOptions::default();
1742/// config.extensions.insert(MyConfig::default());
1743///
1744/// // overwrite config default
1745/// config.set("my_config.baz_count", "42").unwrap();
1746///
1747/// // check config state
1748/// let my_config = config.extensions.get::<MyConfig>().unwrap();
1749/// assert!(my_config.foo_to_bar,);
1750/// assert_eq!(my_config.baz_count, 42,);
1751/// ```
1752///
1753/// # Note:
1754/// Unfortunately associated constants are not currently object-safe, and so this
1755/// extends the object-safe [`ExtensionOptions`]
1756pub trait ConfigExtension: ExtensionOptions {
1757 /// Configuration namespace prefix to use
1758 ///
1759 /// All values under this will be prefixed with `$PREFIX + "."`
1760 const PREFIX: &'static str;
1761}
1762
1763/// An object-safe API for storing arbitrary configuration.
1764///
1765/// See [`ConfigExtension`] for user defined configuration
1766pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
1767 /// Return `self` as [`Any`]
1768 ///
1769 /// This is needed until trait upcasting is stabilized
1770 fn as_any(&self) -> &dyn Any;
1771
1772 /// Return `self` as [`Any`]
1773 ///
1774 /// This is needed until trait upcasting is stabilized
1775 fn as_any_mut(&mut self) -> &mut dyn Any;
1776
1777 /// Return a deep clone of this [`ExtensionOptions`]
1778 ///
1779 /// It is important this does not share mutable state to avoid consistency issues
1780 /// with configuration changing whilst queries are executing
1781 fn cloned(&self) -> Box<dyn ExtensionOptions>;
1782
1783 /// Set the given `key`, `value` pair
1784 fn set(&mut self, key: &str, value: &str) -> Result<()>;
1785
1786 /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
1787 fn entries(&self) -> Vec<ConfigEntry>;
1788}
1789
1790/// A type-safe container for [`ConfigExtension`]
1791#[derive(Debug, Default, Clone)]
1792pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1793
1794impl Extensions {
1795 /// Create a new, empty [`Extensions`]
1796 pub fn new() -> Self {
1797 Self(BTreeMap::new())
1798 }
1799
1800 /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
1801 pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1802 assert_ne!(T::PREFIX, "datafusion");
1803 let e = ExtensionBox(Box::new(extension));
1804 self.0.insert(T::PREFIX, e);
1805 }
1806
1807 /// Retrieves the extension of the given type if any
1808 pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1809 self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1810 }
1811
1812 /// Retrieves the extension of the given type if any
1813 pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1814 let e = self.0.get_mut(T::PREFIX)?;
1815 e.0.as_any_mut().downcast_mut()
1816 }
1817
1818 /// Iterates all the config extension entries yielding their prefix and their
1819 /// [ExtensionOptions] implementation.
1820 pub fn iter(
1821 &self,
1822 ) -> impl Iterator<Item = (&'static str, &Box<dyn ExtensionOptions>)> {
1823 self.0.iter().map(|(k, v)| (*k, &v.0))
1824 }
1825}
1826
1827#[derive(Debug)]
1828struct ExtensionBox(Box<dyn ExtensionOptions>);
1829
1830impl Clone for ExtensionBox {
1831 fn clone(&self) -> Self {
1832 Self(self.0.cloned())
1833 }
1834}
1835
1836/// A trait implemented by `config_namespace` and for field types that provides
1837/// the ability to walk and mutate the configuration tree
1838pub trait ConfigField {
1839 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
1840
1841 fn set(&mut self, key: &str, value: &str) -> Result<()>;
1842
1843 fn reset(&mut self, key: &str) -> Result<()> {
1844 _config_err!("Reset is not supported for this config field, key: {}", key)
1845 }
1846}
1847
1848impl<F: ConfigField + Default> ConfigField for Option<F> {
1849 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1850 match self {
1851 Some(s) => s.visit(v, key, description),
1852 None => v.none(key, description),
1853 }
1854 }
1855
1856 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1857 self.get_or_insert_with(Default::default).set(key, value)
1858 }
1859
1860 fn reset(&mut self, key: &str) -> Result<()> {
1861 if key.is_empty() {
1862 *self = Default::default();
1863 Ok(())
1864 } else {
1865 self.get_or_insert_with(Default::default).reset(key)
1866 }
1867 }
1868}
1869
1870/// Default transformation to parse a [`ConfigField`] for a string.
1871///
1872/// This uses [`FromStr`] to parse the data.
1873pub fn default_config_transform<T>(input: &str) -> Result<T>
1874where
1875 T: FromStr,
1876 <T as FromStr>::Err: Sync + Send + Error + 'static,
1877{
1878 input.parse().map_err(|e| {
1879 DataFusionError::Context(
1880 format!(
1881 "Error parsing '{}' as {}",
1882 input,
1883 std::any::type_name::<T>()
1884 ),
1885 Box::new(DataFusionError::External(Box::new(e))),
1886 )
1887 })
1888}
1889
1890/// Macro that generates [`ConfigField`] for a given type.
1891///
1892/// # Usage
1893/// This always requires [`Display`] to be implemented for the given type.
1894///
1895/// There are two ways to invoke this macro. The first one uses
1896/// [`default_config_transform`]/[`FromStr`] to parse the data:
1897///
1898/// ```ignore
1899/// config_field(MyType);
1900/// ```
1901///
1902/// Note that the parsing error MUST implement [`std::error::Error`]!
1903///
1904/// Or you can specify how you want to parse an [`str`] into the type:
1905///
1906/// ```ignore
1907/// fn parse_it(s: &str) -> Result<MyType> {
1908/// ...
1909/// }
1910///
1911/// config_field(
1912/// MyType,
1913/// value => parse_it(value)
1914/// );
1915/// ```
1916#[macro_export]
1917macro_rules! config_field {
1918 ($t:ty) => {
1919 config_field!($t, value => $crate::config::default_config_transform(value)?);
1920 };
1921
1922 ($t:ty, $arg:ident => $transform:expr) => {
1923 impl $crate::config::ConfigField for $t {
1924 fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1925 v.some(key, self, description)
1926 }
1927
1928 fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
1929 *self = $transform;
1930 Ok(())
1931 }
1932
1933 fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
1934 if key.is_empty() {
1935 *self = <$t as Default>::default();
1936 Ok(())
1937 } else {
1938 $crate::error::_config_err!(
1939 "Config field is a scalar {} and does not have nested field \"{}\"",
1940 stringify!($t),
1941 key
1942 )
1943 }
1944 }
1945 }
1946 };
1947}
1948
1949config_field!(String);
1950config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
1951config_field!(usize);
1952config_field!(f64);
1953config_field!(u64);
1954config_field!(u32);
1955config_field!(i32);
1956
1957impl ConfigField for u8 {
1958 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1959 v.some(key, self, description)
1960 }
1961
1962 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1963 if value.is_empty() {
1964 return Err(DataFusionError::Configuration(format!(
1965 "Input string for {key} key is empty"
1966 )));
1967 }
1968 // Check if the string is a valid number
1969 if let Ok(num) = value.parse::<u8>() {
1970 // TODO: Let's decide how we treat the numerical strings.
1971 *self = num;
1972 } else {
1973 let bytes = value.as_bytes();
1974 // Check if the first character is ASCII (single byte)
1975 if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1976 return Err(DataFusionError::Configuration(format!(
1977 "Error parsing {value} as u8. Non-ASCII string provided"
1978 )));
1979 }
1980 *self = bytes[0];
1981 }
1982 Ok(())
1983 }
1984}
1985
1986impl ConfigField for CompressionTypeVariant {
1987 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1988 v.some(key, self, description)
1989 }
1990
1991 fn set(&mut self, _: &str, value: &str) -> Result<()> {
1992 *self = CompressionTypeVariant::from_str(value)?;
1993 Ok(())
1994 }
1995}
1996
1997impl ConfigField for CsvQuoteStyle {
1998 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1999 v.some(key, self, description)
2000 }
2001
2002 fn set(&mut self, _: &str, value: &str) -> Result<()> {
2003 *self = CsvQuoteStyle::from_str(value)?;
2004 Ok(())
2005 }
2006}
2007
2008/// An implementation trait used to recursively walk configuration
2009pub trait Visit {
2010 fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
2011
2012 fn none(&mut self, key: &str, description: &'static str);
2013}
2014
2015/// Convenience macro to create [`ExtensionsOptions`].
2016///
2017/// The created structure implements the following traits:
2018///
2019/// - [`Clone`]
2020/// - [`Debug`]
2021/// - [`Default`]
2022/// - [`ExtensionOptions`]
2023///
2024/// # Usage
2025/// The syntax is:
2026///
2027/// ```text
2028/// extensions_options! {
2029/// /// Struct docs (optional).
2030/// [<vis>] struct <StructName> {
2031/// /// Field docs (optional)
2032/// [<vis>] <field_name>: <field_type>, default = <default_value>
2033///
2034/// ... more fields
2035/// }
2036/// }
2037/// ```
2038///
2039/// The placeholders are:
2040/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
2041/// - `<StructName>`: Struct name like `MyStruct`.
2042/// - `<field_name>`: Field name like `my_field`.
2043/// - `<field_type>`: Field type like `u8`.
2044/// - `<default_value>`: Default value matching the field type like `42`.
2045///
2046/// # Example
2047/// See also a full example on the [`ConfigExtension`] documentation
2048///
2049/// ```
2050/// use datafusion_common::extensions_options;
2051///
2052/// extensions_options! {
2053/// /// My own config options.
2054/// pub struct MyConfig {
2055/// /// Should "foo" be replaced by "bar"?
2056/// pub foo_to_bar: bool, default = true
2057///
2058/// /// How many "baz" should be created?
2059/// pub baz_count: usize, default = 1337
2060/// }
2061/// }
2062/// ```
2063///
2064///
2065/// [`Debug`]: std::fmt::Debug
2066/// [`ExtensionsOptions`]: crate::config::ExtensionOptions
2067#[macro_export]
2068macro_rules! extensions_options {
2069 (
2070 $(#[doc = $struct_d:tt])*
2071 $vis:vis struct $struct_name:ident {
2072 $(
2073 $(#[doc = $d:tt])*
2074 $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
2075 )*$(,)*
2076 }
2077 ) => {
2078 $(#[doc = $struct_d])*
2079 #[derive(Debug, Clone)]
2080 #[non_exhaustive]
2081 $vis struct $struct_name{
2082 $(
2083 $(#[doc = $d])*
2084 $field_vis $field_name : $field_type,
2085 )*
2086 }
2087
2088 impl Default for $struct_name {
2089 fn default() -> Self {
2090 Self {
2091 $($field_name: $default),*
2092 }
2093 }
2094 }
2095
2096 impl $crate::config::ExtensionOptions for $struct_name {
2097 fn as_any(&self) -> &dyn ::std::any::Any {
2098 self
2099 }
2100
2101 fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
2102 self
2103 }
2104
2105 fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
2106 Box::new(self.clone())
2107 }
2108
2109 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
2110 $crate::config::ConfigField::set(self, key, value)
2111 }
2112
2113 fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
2114 struct Visitor(Vec<$crate::config::ConfigEntry>);
2115
2116 impl $crate::config::Visit for Visitor {
2117 fn some<V: std::fmt::Display>(
2118 &mut self,
2119 key: &str,
2120 value: V,
2121 description: &'static str,
2122 ) {
2123 self.0.push($crate::config::ConfigEntry {
2124 key: key.to_string(),
2125 value: Some(value.to_string()),
2126 description,
2127 })
2128 }
2129
2130 fn none(&mut self, key: &str, description: &'static str) {
2131 self.0.push($crate::config::ConfigEntry {
2132 key: key.to_string(),
2133 value: None,
2134 description,
2135 })
2136 }
2137 }
2138
2139 let mut v = Visitor(vec![]);
2140 // The prefix is not used for extensions.
2141 // The description is generated in ConfigField::visit.
2142 // We can just pass empty strings here.
2143 $crate::config::ConfigField::visit(self, &mut v, "", "");
2144 v.0
2145 }
2146 }
2147
2148 impl $crate::config::ConfigField for $struct_name {
2149 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
2150 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2151 match key {
2152 $(
2153 stringify!($field_name) => {
2154 // Safely apply deprecated attribute if present
2155 // $(#[allow(deprecated)])?
2156 {
2157 self.$field_name.set(rem, value.as_ref())
2158 }
2159 },
2160 )*
2161 _ => return $crate::error::_config_err!(
2162 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
2163 )
2164 }
2165 }
2166
2167 fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
2168 $(
2169 let key = stringify!($field_name).to_string();
2170 let desc = concat!($($d),*).trim();
2171 self.$field_name.visit(v, key.as_str(), desc);
2172 )*
2173 }
2174 }
2175 }
2176}
2177
2178/// These file types have special built in behavior for configuration.
2179/// Use TableOptions::Extensions for configuring other file types.
2180#[derive(Debug, Clone)]
2181pub enum ConfigFileType {
2182 CSV,
2183 #[cfg(feature = "parquet")]
2184 PARQUET,
2185 JSON,
2186}
2187
2188/// Represents the configuration options available for handling different table formats within a data processing application.
2189/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
2190/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
2191#[derive(Debug, Clone, Default)]
2192pub struct TableOptions {
2193 /// Configuration options for CSV file handling. This includes settings like the delimiter,
2194 /// quote character, and whether the first row is considered as headers.
2195 pub csv: CsvOptions,
2196
2197 /// Configuration options for Parquet file handling. This includes settings for compression,
2198 /// encoding, and other Parquet-specific file characteristics.
2199 pub parquet: TableParquetOptions,
2200
2201 /// Configuration options for JSON file handling.
2202 pub json: JsonOptions,
2203
2204 /// The current file format that the table operations should assume. This option allows
2205 /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
2206 pub current_format: Option<ConfigFileType>,
2207
2208 /// Optional extensions that can be used to extend or customize the behavior of the table
2209 /// options. Extensions can be registered using `Extensions::insert` and might include
2210 /// custom file handling logic, additional configuration parameters, or other enhancements.
2211 pub extensions: Extensions,
2212}
2213
2214impl ConfigField for TableOptions {
2215 /// Visits configuration settings for the current file format, or all formats if none is selected.
2216 ///
2217 /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
2218 /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
2219 /// it visits all available format settings.
2220 fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
2221 if let Some(file_type) = &self.current_format {
2222 match file_type {
2223 #[cfg(feature = "parquet")]
2224 ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
2225 ConfigFileType::CSV => self.csv.visit(v, "format", ""),
2226 ConfigFileType::JSON => self.json.visit(v, "format", ""),
2227 }
2228 } else {
2229 self.csv.visit(v, "csv", "");
2230 self.parquet.visit(v, "parquet", "");
2231 self.json.visit(v, "json", "");
2232 }
2233 }
2234
2235 /// Sets a configuration value for a specific key within `TableOptions`.
2236 ///
2237 /// This method delegates setting configuration values to the specific file format configurations,
2238 /// based on the current format selected. If no format is selected, it returns an error.
2239 ///
2240 /// # Parameters
2241 ///
2242 /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
2243 /// for CSV format.
2244 /// * `value`: The value to set for the specified configuration key.
2245 ///
2246 /// # Returns
2247 ///
2248 /// A result indicating success or an error if the key is not recognized, if a format is not specified,
2249 /// or if setting the configuration value fails for the specific format.
2250 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2251 // Extensions are handled in the public `ConfigOptions::set`
2252 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2253 match key {
2254 "format" => {
2255 let Some(format) = &self.current_format else {
2256 return _config_err!("Specify a format for TableOptions");
2257 };
2258 match format {
2259 #[cfg(feature = "parquet")]
2260 ConfigFileType::PARQUET => self.parquet.set(rem, value),
2261 ConfigFileType::CSV => self.csv.set(rem, value),
2262 ConfigFileType::JSON => self.json.set(rem, value),
2263 }
2264 }
2265 _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
2266 }
2267 }
2268}
2269
2270impl TableOptions {
2271 /// Constructs a new instance of `TableOptions` with default settings.
2272 ///
2273 /// # Returns
2274 ///
2275 /// A new `TableOptions` instance with default configuration values.
2276 pub fn new() -> Self {
2277 Self::default()
2278 }
2279
2280 /// Creates a new `TableOptions` instance initialized with settings from a given session config.
2281 ///
2282 /// # Parameters
2283 ///
2284 /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
2285 ///
2286 /// # Returns
2287 ///
2288 /// A new `TableOptions` instance with settings applied from the session config.
2289 pub fn default_from_session_config(config: &ConfigOptions) -> Self {
2290 let initial = TableOptions::default();
2291 initial.combine_with_session_config(config)
2292 }
2293
2294 /// Updates the current `TableOptions` with settings from a given session config.
2295 ///
2296 /// # Parameters
2297 ///
2298 /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
2299 ///
2300 /// # Returns
2301 ///
2302 /// A new `TableOptions` instance with updated settings from the session config.
2303 #[must_use = "this method returns a new instance"]
2304 pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
2305 let mut clone = self.clone();
2306 clone.parquet.global = config.execution.parquet.clone();
2307 clone
2308 }
2309
2310 /// Sets the file format for the table.
2311 ///
2312 /// # Parameters
2313 ///
2314 /// * `format`: The file format to use (e.g., CSV, Parquet).
2315 pub fn set_config_format(&mut self, format: ConfigFileType) {
2316 self.current_format = Some(format);
2317 }
2318
2319 /// Sets the extensions for this `TableOptions` instance.
2320 ///
2321 /// # Parameters
2322 ///
2323 /// * `extensions`: The `Extensions` instance to set.
2324 ///
2325 /// # Returns
2326 ///
2327 /// A new `TableOptions` instance with the specified extensions applied.
2328 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
2329 self.extensions = extensions;
2330 self
2331 }
2332
2333 /// Sets a specific configuration option.
2334 ///
2335 /// # Parameters
2336 ///
2337 /// * `key`: The configuration key (e.g., "format.delimiter").
2338 /// * `value`: The value to set for the specified key.
2339 ///
2340 /// # Returns
2341 ///
2342 /// A result indicating success or failure in setting the configuration option.
2343 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
2344 let Some((mut prefix, _)) = key.split_once('.') else {
2345 return _config_err!("could not find config namespace for key \"{key}\"");
2346 };
2347
2348 if prefix == "format" {
2349 return ConfigField::set(self, key, value);
2350 }
2351
2352 if prefix == "execution" {
2353 return Ok(());
2354 }
2355
2356 if !self.extensions.0.contains_key(prefix)
2357 && self
2358 .extensions
2359 .0
2360 .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
2361 {
2362 prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
2363 }
2364
2365 let Some(e) = self.extensions.0.get_mut(prefix) else {
2366 return _config_err!("Could not find config namespace \"{prefix}\"");
2367 };
2368 e.0.set(key, value)
2369 }
2370
2371 /// Initializes a new `TableOptions` from a hash map of string settings.
2372 ///
2373 /// # Parameters
2374 ///
2375 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
2376 ///
2377 /// # Returns
2378 ///
2379 /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
2380 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
2381 let mut ret = Self::default();
2382 for (k, v) in settings {
2383 ret.set(k, v)?;
2384 }
2385
2386 Ok(ret)
2387 }
2388
2389 /// Modifies the current `TableOptions` instance with settings from a hash map.
2390 ///
2391 /// # Parameters
2392 ///
2393 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
2394 ///
2395 /// # Returns
2396 ///
2397 /// A result indicating success or failure in applying the settings.
2398 pub fn alter_with_string_hash_map(
2399 &mut self,
2400 settings: &HashMap<String, String>,
2401 ) -> Result<()> {
2402 for (k, v) in settings {
2403 self.set(k, v)?;
2404 }
2405 Ok(())
2406 }
2407
2408 /// Retrieves all configuration entries from this `TableOptions`.
2409 ///
2410 /// # Returns
2411 ///
2412 /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
2413 pub fn entries(&self) -> Vec<ConfigEntry> {
2414 struct Visitor(Vec<ConfigEntry>);
2415
2416 impl Visit for Visitor {
2417 fn some<V: Display>(
2418 &mut self,
2419 key: &str,
2420 value: V,
2421 description: &'static str,
2422 ) {
2423 self.0.push(ConfigEntry {
2424 key: key.to_string(),
2425 value: Some(value.to_string()),
2426 description,
2427 })
2428 }
2429
2430 fn none(&mut self, key: &str, description: &'static str) {
2431 self.0.push(ConfigEntry {
2432 key: key.to_string(),
2433 value: None,
2434 description,
2435 })
2436 }
2437 }
2438
2439 let mut v = Visitor(vec![]);
2440 self.visit(&mut v, "format", "");
2441
2442 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
2443 v.0
2444 }
2445}
2446
/// Options that control how Parquet files are read and written, including global
/// options that apply to all columns and optional column-specific overrides.
///
/// Closely tied to `ParquetWriterOptions` (see `crate::file_options::parquet_writer::ParquetWriterOptions` when the "parquet" feature is enabled).
/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
/// (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
    pub global: ParquetOptions,
    /// Column specific options, keyed by column name. They are set with keys of the
    /// form `<option>::<column>`, e.g. `'format.bloom_filter_enabled::col1' 'true'`.
    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
    /// Additional file-level metadata to include. Inserted into the key_value_metadata
    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
    ///
    /// Multiple entries are permitted; setting the same key again overwrites the
    /// previous value, and an empty value is stored as `Some("")`.
    /// ```sql
    /// OPTIONS (
    ///    'format.metadata::key1' '',
    ///    'format.metadata::key2' 'value',
    ///    'format.metadata::key3' 'value has spaces',
    ///    'format.metadata::key4' 'value has special chars :: :',
    ///    'format.metadata::key_dupe' 'original will be overwritten',
    ///    'format.metadata::key_dupe' 'final'
    /// )
    /// ```
    pub key_value_metadata: HashMap<String, Option<String>>,
    /// Options for configuring Parquet modular encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    /// See ConfigFileEncryptionProperties and ConfigFileDecryptionProperties in datafusion/common/src/config.rs
    /// These can be set via 'format.crypto', for example:
    /// ```sql
    /// OPTIONS (
    ///    'format.crypto.file_encryption.encrypt_footer' 'true',
    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435',  -- b"0123456789012345"
    ///    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
    ///    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
    ///     -- Same for decryption
    ///    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
    ///    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
    ///    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
    /// )
    /// ```
    /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
    /// Note that keys must be provided in hex format since they are binary strings.
    pub crypto: ParquetEncryptionOptions,
}
2495
2496impl TableParquetOptions {
2497 /// Return new default TableParquetOptions
2498 pub fn new() -> Self {
2499 Self::default()
2500 }
2501
2502 /// Set whether the encoding of the arrow metadata should occur
2503 /// during the writing of parquet.
2504 ///
2505 /// Default is to encode the arrow schema in the file kv_metadata.
2506 pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
2507 Self {
2508 global: ParquetOptions {
2509 skip_arrow_metadata: skip,
2510 ..self.global
2511 },
2512 ..self
2513 }
2514 }
2515
2516 /// Retrieves all configuration entries from this `TableParquetOptions`.
2517 ///
2518 /// # Returns
2519 ///
2520 /// A vector of `ConfigEntry` instances, representing all the configuration options within this
2521 pub fn entries(self: &TableParquetOptions) -> Vec<ConfigEntry> {
2522 struct Visitor(Vec<ConfigEntry>);
2523
2524 impl Visit for Visitor {
2525 fn some<V: Display>(
2526 &mut self,
2527 key: &str,
2528 value: V,
2529 description: &'static str,
2530 ) {
2531 self.0.push(ConfigEntry {
2532 key: key[1..].to_string(),
2533 value: Some(value.to_string()),
2534 description,
2535 })
2536 }
2537
2538 fn none(&mut self, key: &str, description: &'static str) {
2539 self.0.push(ConfigEntry {
2540 key: key[1..].to_string(),
2541 value: None,
2542 description,
2543 })
2544 }
2545 }
2546
2547 let mut v = Visitor(vec![]);
2548 self.visit(&mut v, "", "");
2549
2550 v.0
2551 }
2552}
2553
2554impl ConfigField for TableParquetOptions {
2555 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
2556 self.global.visit(v, key_prefix, description);
2557 self.column_specific_options
2558 .visit(v, key_prefix, description);
2559 self.crypto
2560 .visit(v, &format!("{key_prefix}.crypto"), description);
2561 }
2562
2563 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2564 // Determine if the key is a global, metadata, or column-specific setting
2565 if key.starts_with("metadata::") {
2566 let k = match key.split("::").collect::<Vec<_>>()[..] {
2567 [_meta] | [_meta, ""] => {
2568 return _config_err!(
2569 "Invalid metadata key provided, missing key in metadata::<key>"
2570 );
2571 }
2572 [_meta, k] => k.into(),
2573 _ => {
2574 return _config_err!(
2575 "Invalid metadata key provided, found too many '::' in \"{key}\""
2576 );
2577 }
2578 };
2579 self.key_value_metadata.insert(k, Some(value.into()));
2580 Ok(())
2581 } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
2582 self.crypto.set(crypto_feature, value)
2583 } else if key.contains("::") {
2584 self.column_specific_options.set(key, value)
2585 } else {
2586 self.global.set(key, value)
2587 }
2588 }
2589}
2590
/// Defines a configuration struct and implements `ConfigField` for it, plus for
/// `HashMap<String, $struct_name>`. Per-key overrides (e.g. per column) can then
/// be set with keys of the form `<field>::<map key>`.
///
/// For each field:
/// * its `///` doc lines are concatenated, trimmed and reported as the option
///   description when the struct is visited;
/// * an optional `transform = <fn>` is applied to the raw string value before
///   it is passed to the field's own `set`;
/// * `default = <expr>` is the value used by the generated `Default` impl.
macro_rules! config_namespace_with_hashmap {
    (
     $(#[doc = $struct_d:tt])*
     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
        )*$(,)*
    }
    ) => {

        $(#[doc = $struct_d])*
        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
            $field_vis $field_name : $field_type,
            )*
        }

        impl ConfigField for $struct_name {
            // Keys look like `<field>[.<rest>]`; `<rest>` is forwarded to the field's `set`.
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                       stringify!($field_name) => {
                           // Apply the optional per-field `transform` (e.g. lowercasing)
                           // to the raw value before the field parses it.
                           $(let value = $transform(value);)?
                           self.$field_name.set(rem, value.as_ref())
                       },
                    )*
                    _ => _config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                // The field's doc comment doubles as its runtime description.
                let desc = concat!($($d),*).trim();
                self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl ConfigField for HashMap<String,$struct_name> {
            // Expects `<field>::<map key>`. The entry for `<map key>` is created with
            // default values if it does not exist yet.
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                // `splitn(2, ..)` keeps any further `::` as part of the map key.
                let parts: Vec<&str> = key.splitn(2, "::").collect();
                match parts.as_slice() {
                    [inner_key, hashmap_key] => {
                        // Get or create the struct for the specified key
                        let inner_value = self
                            .entry((*hashmap_key).to_owned())
                            .or_insert_with($struct_name::default);

                        inner_value.set(inner_key, value)
                    }
                    _ => _config_err!("Unrecognized key '{key}'."),
                }
            }

            // Emits one entry per (map key, field) as `{key_prefix}.<field>::<map key>`.
            // The `_description` argument is ignored; each field's own doc is used.
            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                for (column_name, col_options) in self {
                    $(
                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
                    let desc = concat!($($d),*).trim();
                    col_options.$field_name.visit(v, key.as_str(), desc);
                    )*
                }
            }
        }
    }
}
2678
// Each field's `///` text below is also returned at runtime as the option's
// description (via `config_namespace_with_hashmap!`), so edit it with care.
config_namespace_with_hashmap! {
    /// Options controlling parquet format for individual columns.
    ///
    /// See [`ParquetOptions`] for more details
    pub struct ParquetColumnOptions {
        /// Sets if bloom filter is enabled for the column path.
        pub bloom_filter_enabled: Option<bool>, default = None

        /// Sets encoding for the column path.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        // NOTE(review): no lowercasing `transform` is applied here (unlike
        // `compression`); case-insensitivity presumably happens where the string
        // is parsed later — confirm.
        pub encoding: Option<String>, default = None

        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
        /// default parquet options
        pub dictionary_enabled: Option<bool>, default = None

        /// Sets default parquet compression codec for the column path.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        // The raw value is lowercased by `transform` before it is stored.
        pub compression: Option<String>, transform = str::to_lowercase, default = None

        /// Sets if statistics are enabled for the column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet options
        // NOTE(review): as with `encoding`, no `transform` lowercases this value
        // here — confirm the consumer handles case-insensitivity.
        pub statistics_enabled: Option<String>, default = None

        /// Sets bloom filter false positive probability for the column path. If NULL, uses
        /// default parquet options
        pub bloom_filter_fpp: Option<f64>, default = None

        /// Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet options
        pub bloom_filter_ndv: Option<u64>, default = None
    }
}
2721
/// Parquet modular encryption settings, with every binary value (footer key,
/// key metadata, column keys, AAD prefix) held as a hex-encoded string.
///
/// With the `parquet_encryption` feature enabled, this converts to and from
/// parquet's `FileEncryptionProperties`.
#[derive(Clone, Debug, PartialEq)]
pub struct ConfigFileEncryptionProperties {
    /// Should the parquet footer be encrypted
    /// default is true
    pub encrypt_footer: bool,
    /// Key to use for the parquet footer encoded in hex format
    pub footer_key_as_hex: String,
    /// Metadata information for footer key (hex encoded; empty means none)
    pub footer_key_metadata_as_hex: String,
    /// HashMap of column names --> (key in hex format, metadata)
    pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
    /// AAD prefix string uniquely identifies the file and prevents file swapping
    /// (hex encoded; empty means none)
    pub aad_prefix_as_hex: String,
    /// If true, store the AAD prefix in the file
    /// default is false
    pub store_aad_prefix: bool,
}
2739
2740// Setup to match EncryptionPropertiesBuilder::new()
2741impl Default for ConfigFileEncryptionProperties {
2742 fn default() -> Self {
2743 ConfigFileEncryptionProperties {
2744 encrypt_footer: true,
2745 footer_key_as_hex: String::new(),
2746 footer_key_metadata_as_hex: String::new(),
2747 column_encryption_properties: Default::default(),
2748 aad_prefix_as_hex: String::new(),
2749 store_aad_prefix: false,
2750 }
2751 }
2752}
2753
config_namespace_with_hashmap! {
    /// Per-column encryption settings, stored in
    /// `ConfigFileEncryptionProperties::column_encryption_properties` keyed by
    /// column name and set via `<field>::<column>` keys.
    pub struct ColumnEncryptionProperties {
        /// Per column encryption key
        pub column_key_as_hex: String, default = "".to_string()
        /// Per column encryption key metadata
        pub column_metadata_as_hex: Option<String>, default = None
    }
}
2762
2763impl ConfigField for ConfigFileEncryptionProperties {
2764 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2765 let key = format!("{key_prefix}.encrypt_footer");
2766 let desc = "Encrypt the footer";
2767 self.encrypt_footer.visit(v, key.as_str(), desc);
2768
2769 let key = format!("{key_prefix}.footer_key_as_hex");
2770 let desc = "Key to use for the parquet footer";
2771 self.footer_key_as_hex.visit(v, key.as_str(), desc);
2772
2773 let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2774 let desc = "Metadata to use for the parquet footer";
2775 self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2776
2777 self.column_encryption_properties.visit(v, key_prefix, desc);
2778
2779 let key = format!("{key_prefix}.aad_prefix_as_hex");
2780 let desc = "AAD prefix to use";
2781 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2782
2783 let key = format!("{key_prefix}.store_aad_prefix");
2784 let desc = "If true, store the AAD prefix";
2785 self.store_aad_prefix.visit(v, key.as_str(), desc);
2786
2787 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2788 }
2789
2790 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2791 // Any hex encoded values must be pre-encoded using
2792 // hex::encode() before calling set.
2793
2794 if key.contains("::") {
2795 // Handle any column specific properties
2796 return self.column_encryption_properties.set(key, value);
2797 };
2798
2799 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2800 match key {
2801 "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2802 "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2803 "footer_key_metadata_as_hex" => {
2804 self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2805 }
2806 "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2807 "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2808 _ => _config_err!(
2809 "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2810 key
2811 ),
2812 }
2813 }
2814}
2815
#[cfg(feature = "parquet_encryption")]
impl TryFrom<ConfigFileEncryptionProperties> for FileEncryptionProperties {
    type Error = DataFusionError;

    /// Builds parquet `FileEncryptionProperties` by hex-decoding every key,
    /// metadata and AAD prefix held in the config.
    ///
    /// Empty `footer_key_metadata_as_hex` / `aad_prefix_as_hex` values are
    /// treated as "not set".
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::Configuration` if any hex value fails to decode
    /// or the parquet builder rejects the resulting properties.
    fn try_from(val: ConfigFileEncryptionProperties) -> Result<Self> {
        let footer_key = hex::decode(&val.footer_key_as_hex).map_err(|e| {
            DataFusionError::Configuration(format!("Unable to decode hex footer key from ConfigFileEncryptionProperties: {e}"))
        })?;

        let mut builder = FileEncryptionProperties::builder(footer_key)
            .with_plaintext_footer(!val.encrypt_footer)
            .with_aad_prefix_storage(val.store_aad_prefix);

        if !val.footer_key_metadata_as_hex.is_empty() {
            let footer_key_metadata = hex::decode(&val.footer_key_metadata_as_hex)
                .map_err(|e| {
                    DataFusionError::Configuration(format!("Unable to decode hex footer key metadata from ConfigFileEncryptionProperties: {e}"))
                })?;
            builder = builder.with_footer_key_metadata(footer_key_metadata);
        }

        for (column_name, column_props) in &val.column_encryption_properties {
            let column_key = hex::decode(&column_props.column_key_as_hex).map_err(|e| {
                DataFusionError::Configuration(format!("Unable to decode hex encryption key for column {column_name}: {e}"))
            })?;

            builder = match &column_props.column_metadata_as_hex {
                Some(metadata_hex) => {
                    let column_metadata = hex::decode(metadata_hex).map_err(|e| {
                        DataFusionError::Configuration(format!("Unable to decode hex column metadata for column {column_name}: {e}"))
                    })?;
                    builder.with_column_key_and_metadata(
                        column_name,
                        column_key,
                        column_metadata,
                    )
                }
                None => builder.with_column_key(column_name, column_key),
            };
        }

        if !val.aad_prefix_as_hex.is_empty() {
            let aad_prefix = hex::decode(&val.aad_prefix_as_hex).map_err(|e| {
                DataFusionError::Configuration(format!(
                    "Unable to decode hex AAD prefix from ConfigFileEncryptionProperties: {e}"
                ))
            })?;
            builder = builder.with_aad_prefix(aad_prefix);
        }

        let built = builder.build().map_err(|e| {
            DataFusionError::Configuration(format!(
                "Could not build FileEncryptionProperties: {e}"
            ))
        })?;
        Ok(Arc::unwrap_or_clone(built))
    }
}
2882
#[cfg(feature = "parquet_encryption")]
impl From<&Arc<FileEncryptionProperties>> for ConfigFileEncryptionProperties {
    /// Converts parquet `FileEncryptionProperties` back into their hex-encoded
    /// configuration form. An absent footer key metadata or AAD prefix becomes
    /// an empty string.
    fn from(f: &Arc<FileEncryptionProperties>) -> Self {
        let (column_names, column_keys, column_metadata) = f.column_keys();

        // The three vectors are matched up by position. A missing metadata entry
        // at index `i` yields `None`.
        // NOTE(review): if parquet omits metadata entries for columns without
        // metadata (rather than keeping positions aligned), metadata could be
        // attributed to the wrong column — confirm against `column_keys()`.
        let column_encryption_properties: HashMap<String, ColumnEncryptionProperties> =
            column_names
                .iter()
                .enumerate()
                .map(|(i, name)| {
                    let props = ColumnEncryptionProperties {
                        column_key_as_hex: hex::encode(&column_keys[i]),
                        column_metadata_as_hex: column_metadata.get(i).map(hex::encode),
                    };
                    (name.clone(), props)
                })
                .collect();

        Self {
            encrypt_footer: f.encrypt_footer(),
            footer_key_as_hex: hex::encode(f.footer_key()),
            footer_key_metadata_as_hex: f
                .footer_key_metadata()
                .map(hex::encode)
                .unwrap_or_default(),
            column_encryption_properties,
            aad_prefix_as_hex: f.aad_prefix().map(hex::encode).unwrap_or_default(),
            store_aad_prefix: f.store_aad_prefix(),
        }
    }
}
2919
/// Parquet modular decryption settings, with every binary value (footer key,
/// column keys, AAD prefix) held as a hex-encoded string.
///
/// With the `parquet_encryption` feature enabled, this converts to and from
/// parquet's `FileDecryptionProperties`. The reverse conversion is not
/// supported when the properties use a key retriever.
#[derive(Clone, Debug, PartialEq)]
pub struct ConfigFileDecryptionProperties {
    /// Binary string to use for the parquet footer encoded in hex format
    pub footer_key_as_hex: String,
    /// HashMap of column names --> key in hex format
    pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
    /// AAD prefix string uniquely identifies the file and prevents file swapping
    /// (hex encoded; empty means none)
    pub aad_prefix_as_hex: String,
    /// If true, then verify signature for files with plaintext footers.
    /// default = true
    pub footer_signature_verification: bool,
}
2932
config_namespace_with_hashmap! {
    /// Per-column decryption settings, stored in
    /// `ConfigFileDecryptionProperties::column_decryption_properties` keyed by
    /// column name and set via `<field>::<column>` keys.
    pub struct ColumnDecryptionProperties {
        /// Per column encryption key
        pub column_key_as_hex: String, default = "".to_string()
    }
}
2939
2940// Setup to match DecryptionPropertiesBuilder::new()
2941impl Default for ConfigFileDecryptionProperties {
2942 fn default() -> Self {
2943 ConfigFileDecryptionProperties {
2944 footer_key_as_hex: String::new(),
2945 column_decryption_properties: Default::default(),
2946 aad_prefix_as_hex: String::new(),
2947 footer_signature_verification: true,
2948 }
2949 }
2950}
2951
2952impl ConfigField for ConfigFileDecryptionProperties {
2953 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2954 let key = format!("{key_prefix}.footer_key_as_hex");
2955 let desc = "Key to use for the parquet footer";
2956 self.footer_key_as_hex.visit(v, key.as_str(), desc);
2957
2958 let key = format!("{key_prefix}.aad_prefix_as_hex");
2959 let desc = "AAD prefix to use";
2960 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2961
2962 let key = format!("{key_prefix}.footer_signature_verification");
2963 let desc = "If true, verify the footer signature";
2964 self.footer_signature_verification
2965 .visit(v, key.as_str(), desc);
2966
2967 self.column_decryption_properties.visit(v, key_prefix, desc);
2968 }
2969
2970 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2971 // Any hex encoded values must be pre-encoded using
2972 // hex::encode() before calling set.
2973
2974 if key.contains("::") {
2975 // Handle any column specific properties
2976 return self.column_decryption_properties.set(key, value);
2977 };
2978
2979 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2980 match key {
2981 "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2982 "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2983 "footer_signature_verification" => {
2984 self.footer_signature_verification.set(rem, value.as_ref())
2985 }
2986 _ => _config_err!(
2987 "Config value \"{}\" not found on ConfigFileDecryptionProperties",
2988 key
2989 ),
2990 }
2991 }
2992}
2993
#[cfg(feature = "parquet_encryption")]
impl TryFrom<ConfigFileDecryptionProperties> for FileDecryptionProperties {
    type Error = DataFusionError;

    /// Builds parquet `FileDecryptionProperties` by hex-decoding the footer key,
    /// every column key and (if non-empty) the AAD prefix.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::Configuration` on the first hex value that fails
    /// to decode, or if the parquet builder rejects the column keys or the
    /// final properties.
    fn try_from(val: ConfigFileDecryptionProperties) -> Result<Self> {
        let decoded_columns: Vec<(&str, Vec<u8>)> = val
            .column_decryption_properties
            .iter()
            .map(|(col_name, props)| {
                hex::decode(&props.column_key_as_hex)
                    .map(|key| (col_name.as_str(), key))
                    .map_err(|e| {
                        DataFusionError::Configuration(format!
                            ("Could not decode hex column key from ConfigFileDecryptionProperties for column name {col_name}: {e}."))
                    })
            })
            .collect::<Result<_>>()?;
        let (column_names, column_keys): (Vec<&str>, Vec<Vec<u8>>) =
            decoded_columns.into_iter().unzip();

        let footer_key = hex::decode(&val.footer_key_as_hex).map_err(|e| {
            DataFusionError::Configuration(format!(
                "Could not decode hex footer key from ConfigFileDecryptionProperties: {e}."
            ))
        })?;

        let mut builder = FileDecryptionProperties::builder(footer_key)
            .with_column_keys(column_names, column_keys)
            .map_err(|e| {
                DataFusionError::Configuration(format!(
                    "Could not set column keys on FileDecryptionPropertiesBuilder: {e}."
                ))
            })?;

        if !val.footer_signature_verification {
            builder = builder.disable_footer_signature_verification();
        }

        if !val.aad_prefix_as_hex.is_empty() {
            let aad_prefix = hex::decode(&val.aad_prefix_as_hex).map_err(|e| {
                DataFusionError::Configuration(format!(
                    "Could not decode hex AAD prefix from ConfigFileDecryptionProperties: {e}."
                ))
            })?;
            builder = builder.with_aad_prefix(aad_prefix);
        }

        let built = builder.build().map_err(|e| {
            DataFusionError::Configuration(format!(
                "Could not build FileDecryptionProperties: {e}."
            ))
        })?;
        Ok(Arc::unwrap_or_clone(built))
    }
}
3045
#[cfg(feature = "parquet_encryption")]
impl TryFrom<&Arc<FileDecryptionProperties>> for ConfigFileDecryptionProperties {
    type Error = DataFusionError;

    /// Converts parquet `FileDecryptionProperties` back into their hex-encoded
    /// configuration form.
    ///
    /// # Errors
    ///
    /// Fails when the footer key cannot be retrieved, which is the case when
    /// the properties were built with a key retriever.
    fn try_from(f: &Arc<FileDecryptionProperties>) -> Result<Self> {
        let footer_key = f.footer_key(None).map_err(|e| {
            DataFusionError::Configuration(format!(
                "Could not retrieve footer key from FileDecryptionProperties. \
                Note that conversion to ConfigFileDecryptionProperties is not supported \
                when using a key retriever: {e}"
            ))
        })?;

        let (column_names, column_keys) = f.column_keys();
        // Names and keys are paired by position.
        let column_decryption_properties: HashMap<String, ColumnDecryptionProperties> =
            column_names
                .iter()
                .enumerate()
                .map(|(i, name)| {
                    let props = ColumnDecryptionProperties {
                        column_key_as_hex: hex::encode(&column_keys[i]),
                    };
                    (name.clone(), props)
                })
                .collect();

        Ok(Self {
            footer_key_as_hex: hex::encode(footer_key.as_ref()),
            column_decryption_properties,
            aad_prefix_as_hex: f.aad_prefix().map(hex::encode).unwrap_or_default(),
            footer_signature_verification: f.check_plaintext_footer_integrity(),
        })
    }
}
3080
/// Holds implementation-specific options for an encryption factory
#[derive(Clone, Debug, Default, PartialEq)]
pub struct EncryptionFactoryOptions {
    /// Raw option key/value pairs, stored exactly as passed to `set`.
    /// `to_extension_options` applies each pair to an `ExtensionOptions` value.
    pub options: HashMap<String, String>,
}
3086
3087impl ConfigField for EncryptionFactoryOptions {
3088 fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
3089 for (option_key, option_value) in &self.options {
3090 v.some(
3091 &format!("{key}.{option_key}"),
3092 option_value,
3093 "Encryption factory specific option",
3094 );
3095 }
3096 }
3097
3098 fn set(&mut self, key: &str, value: &str) -> Result<()> {
3099 self.options.insert(key.to_owned(), value.to_owned());
3100 Ok(())
3101 }
3102}
3103
3104impl EncryptionFactoryOptions {
3105 /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
3106 pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
3107 let mut options = T::default();
3108 for (key, value) in &self.options {
3109 options.set(key, value)?;
3110 }
3111 Ok(options)
3112 }
3113}
3114
3115config_namespace! {
3116 /// Options controlling CSV format
3117 pub struct CsvOptions {
3118 /// Specifies whether there is a CSV header (i.e. the first line
3119 /// consists of is column names). The value `None` indicates that
3120 /// the configuration should be consulted.
3121 pub has_header: Option<bool>, default = None
3122 pub delimiter: u8, default = b','
3123 pub quote: u8, default = b'"'
3124 pub terminator: Option<u8>, default = None
3125 pub escape: Option<u8>, default = None
3126 pub double_quote: Option<bool>, default = None
3127 /// Quote style for CSV writing.
3128 /// One of: "Always", "Necessary", "NonNumeric", "Never"
3129 pub quote_style: CsvQuoteStyle, default = CsvQuoteStyle::Necessary
3130 /// Whether to ignore leading whitespace in string values when writing CSV.
3131 /// Defaults to `false` when `None`.
3132 pub ignore_leading_whitespace: Option<bool>, default = None
3133 /// Whether to ignore trailing whitespace in string values when writing CSV.
3134 /// Defaults to `false` when `None`.
3135 pub ignore_trailing_whitespace: Option<bool>, default = None
3136 /// Specifies whether newlines in (quoted) values are supported.
3137 ///
3138 /// Parsing newlines in quoted values may be affected by execution behaviour such as
3139 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
3140 /// parsed successfully, which may reduce performance.
3141 ///
3142 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
3143 pub newlines_in_values: Option<bool>, default = None
3144 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
3145 /// Compression level for the output file. The valid range depends on the
3146 /// compression algorithm:
3147 /// - ZSTD: 1 to 22 (default: 3)
3148 /// - GZIP: 0 to 9 (default: 6)
3149 /// - BZIP2: 0 to 9 (default: 6)
3150 /// - XZ: 0 to 9 (default: 6)
3151 /// If not specified, the default level for the compression algorithm is used.
3152 pub compression_level: Option<u32>, default = None
3153 pub schema_infer_max_rec: Option<usize>, default = None
3154 pub date_format: Option<String>, default = None
3155 pub datetime_format: Option<String>, default = None
3156 pub timestamp_format: Option<String>, default = None
3157 pub timestamp_tz_format: Option<String>, default = None
3158 pub time_format: Option<String>, default = None
3159 // The output format for Nulls in the CSV writer.
3160 pub null_value: Option<String>, default = None
3161 // The input regex for Nulls when loading CSVs.
3162 pub null_regex: Option<String>, default = None
3163 pub comment: Option<u8>, default = None
3164 /// Whether to allow truncated rows when parsing, both within a single file and across files.
3165 ///
3166 /// When set to false (default), reading a single CSV file which has rows of different lengths will
3167 /// error; if reading multiple CSV files with different number of columns, it will also fail.
3168 ///
3169 /// When set to true, reading a single CSV file with rows of different lengths will pad the truncated
3170 /// rows with null values for the missing columns; if reading multiple CSV files with different number
3171 /// of columns, it creates a union schema containing all columns found across the files, and will
3172 /// pad any files missing columns with null values for their rows.
3173 pub truncated_rows: Option<bool>, default = None
3174 }
3175}
3176
3177impl CsvOptions {
3178 /// Set a limit in terms of records to scan to infer the schema
3179 /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
3180 pub fn with_compression(
3181 mut self,
3182 compression_type_variant: CompressionTypeVariant,
3183 ) -> Self {
3184 self.compression = compression_type_variant;
3185 self
3186 }
3187
3188 /// Set a limit in terms of records to scan to infer the schema
3189 /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
3190 pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
3191 self.schema_infer_max_rec = Some(max_rec);
3192 self
3193 }
3194
3195 /// Set true to indicate that the first line is a header.
3196 /// - default to true
3197 pub fn with_has_header(mut self, has_header: bool) -> Self {
3198 self.has_header = Some(has_header);
3199 self
3200 }
3201
3202 /// Returns true if the first line is a header. If format options does not
3203 /// specify whether there is a header, returns `None` (indicating that the
3204 /// configuration should be consulted).
3205 pub fn has_header(&self) -> Option<bool> {
3206 self.has_header
3207 }
3208
3209 /// The character separating values within a row.
3210 /// - default to ','
3211 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
3212 self.delimiter = delimiter;
3213 self
3214 }
3215
3216 /// The quote character in a row.
3217 /// - default to '"'
3218 pub fn with_quote(mut self, quote: u8) -> Self {
3219 self.quote = quote;
3220 self
3221 }
3222
3223 /// The character that terminates a row.
3224 /// - default to None (CRLF)
3225 pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
3226 self.terminator = terminator;
3227 self
3228 }
3229
3230 /// The escape character in a row.
3231 /// - default is None
3232 pub fn with_escape(mut self, escape: Option<u8>) -> Self {
3233 self.escape = escape;
3234 self
3235 }
3236
3237 /// Set true to indicate that the CSV quotes should be doubled.
3238 /// - default to true
3239 pub fn with_double_quote(mut self, double_quote: bool) -> Self {
3240 self.double_quote = Some(double_quote);
3241 self
3242 }
3243
3244 /// Set the quote style for CSV writing.
3245 pub fn with_quote_style(mut self, quote_style: CsvQuoteStyle) -> Self {
3246 self.quote_style = quote_style;
3247 self
3248 }
3249
3250 /// Set whether to ignore leading whitespace in string values when writing CSV.
3251 pub fn with_ignore_leading_whitespace(
3252 mut self,
3253 ignore_leading_whitespace: bool,
3254 ) -> Self {
3255 self.ignore_leading_whitespace = Some(ignore_leading_whitespace);
3256 self
3257 }
3258
3259 /// Set whether to ignore trailing whitespace in string values when writing CSV.
3260 pub fn with_ignore_trailing_whitespace(
3261 mut self,
3262 ignore_trailing_whitespace: bool,
3263 ) -> Self {
3264 self.ignore_trailing_whitespace = Some(ignore_trailing_whitespace);
3265 self
3266 }
3267
3268 /// Specifies whether newlines in (quoted) values are supported.
3269 ///
3270 /// Parsing newlines in quoted values may be affected by execution behaviour such as
3271 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
3272 /// parsed successfully, which may reduce performance.
3273 ///
3274 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
3275 pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
3276 self.newlines_in_values = Some(newlines_in_values);
3277 self
3278 }
3279
3280 /// Set a `CompressionTypeVariant` of CSV
3281 /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
3282 pub fn with_file_compression_type(
3283 mut self,
3284 compression: CompressionTypeVariant,
3285 ) -> Self {
3286 self.compression = compression;
3287 self
3288 }
3289
3290 /// Whether to allow truncated rows when parsing.
3291 /// By default this is set to false and will error if the CSV rows have different lengths.
3292 /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
3293 /// If the record’s schema is not nullable, then it will still return an error.
3294 pub fn with_truncated_rows(mut self, allow: bool) -> Self {
3295 self.truncated_rows = Some(allow);
3296 self
3297 }
3298
3299 /// Set the compression level for the output file.
3300 /// The valid range depends on the compression algorithm.
3301 /// If not specified, the default level for the algorithm is used.
3302 pub fn with_compression_level(mut self, level: u32) -> Self {
3303 self.compression_level = Some(level);
3304 self
3305 }
3306
3307 /// The delimiter character.
3308 pub fn delimiter(&self) -> u8 {
3309 self.delimiter
3310 }
3311
3312 /// The quote character.
3313 pub fn quote(&self) -> u8 {
3314 self.quote
3315 }
3316
3317 /// The terminator character.
3318 pub fn terminator(&self) -> Option<u8> {
3319 self.terminator
3320 }
3321
3322 /// The escape character.
3323 pub fn escape(&self) -> Option<u8> {
3324 self.escape
3325 }
3326}
3327
3328config_namespace! {
3329 /// Options controlling JSON format
3330 pub struct JsonOptions {
3331 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
3332 /// Compression level for the output file. The valid range depends on the
3333 /// compression algorithm:
3334 /// - ZSTD: 1 to 22 (default: 3)
3335 /// - GZIP: 0 to 9 (default: 6)
3336 /// - BZIP2: 0 to 9 (default: 6)
3337 /// - XZ: 0 to 9 (default: 6)
3338 /// If not specified, the default level for the compression algorithm is used.
3339 pub compression_level: Option<u32>, default = None
3340 pub schema_infer_max_rec: Option<usize>, default = None
3341 /// The JSON format to use when reading files.
3342 ///
3343 /// When `true` (default), expects newline-delimited JSON (NDJSON):
3344 /// ```text
3345 /// {"key1": 1, "key2": "val"}
3346 /// {"key1": 2, "key2": "vals"}
3347 /// ```
3348 ///
3349 /// When `false`, expects JSON array format:
3350 /// ```text
3351 /// [
3352 /// {"key1": 1, "key2": "val"},
3353 /// {"key1": 2, "key2": "vals"}
3354 /// ]
3355 /// ```
3356 pub newline_delimited: bool, default = true
3357 }
3358}
3359
3360pub trait OutputFormatExt: Display {}
3361
3362#[derive(Debug, Clone, PartialEq)]
3363#[cfg_attr(feature = "parquet", expect(clippy::large_enum_variant))]
3364pub enum OutputFormat {
3365 CSV(CsvOptions),
3366 JSON(JsonOptions),
3367 #[cfg(feature = "parquet")]
3368 PARQUET(TableParquetOptions),
3369 AVRO,
3370 ARROW,
3371}
3372
3373impl Display for OutputFormat {
3374 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3375 let out = match self {
3376 OutputFormat::CSV(_) => "csv",
3377 OutputFormat::JSON(_) => "json",
3378 #[cfg(feature = "parquet")]
3379 OutputFormat::PARQUET(_) => "parquet",
3380 OutputFormat::AVRO => "avro",
3381 OutputFormat::ARROW => "arrow",
3382 };
3383 write!(f, "{out}")
3384 }
3385}
3386
3387#[cfg(test)]
3388mod tests {
3389 #[cfg(feature = "parquet")]
3390 use crate::config::TableParquetOptions;
3391 use crate::config::{
3392 ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
3393 Extensions, TableOptions,
3394 };
3395 use std::any::Any;
3396 use std::collections::HashMap;
3397
3398 #[derive(Default, Debug, Clone)]
3399 pub struct TestExtensionConfig {
3400 /// Should "foo" be replaced by "bar"?
3401 pub properties: HashMap<String, String>,
3402 }
3403
3404 impl ExtensionOptions for TestExtensionConfig {
3405 fn as_any(&self) -> &dyn Any {
3406 self
3407 }
3408
3409 fn as_any_mut(&mut self) -> &mut dyn Any {
3410 self
3411 }
3412
3413 fn cloned(&self) -> Box<dyn ExtensionOptions> {
3414 Box::new(self.clone())
3415 }
3416
3417 fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
3418 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
3419 assert_eq!(key, "test");
3420 self.properties.insert(rem.to_owned(), value.to_owned());
3421 Ok(())
3422 }
3423
3424 fn entries(&self) -> Vec<ConfigEntry> {
3425 self.properties
3426 .iter()
3427 .map(|(k, v)| ConfigEntry {
3428 key: k.into(),
3429 value: Some(v.into()),
3430 description: "",
3431 })
3432 .collect()
3433 }
3434 }
3435
3436 impl ConfigExtension for TestExtensionConfig {
3437 const PREFIX: &'static str = "test";
3438 }
3439
3440 #[test]
3441 fn create_table_config() {
3442 let mut extension = Extensions::new();
3443 extension.insert(TestExtensionConfig::default());
3444 let table_config = TableOptions::new().with_extensions(extension);
3445 let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
3446 assert!(kafka_config.is_some())
3447 }
3448
3449 #[test]
3450 fn alter_test_extension_config() {
3451 let mut extension = Extensions::new();
3452 extension.insert(TestExtensionConfig::default());
3453 let mut table_config = TableOptions::new().with_extensions(extension);
3454 table_config.set_config_format(ConfigFileType::CSV);
3455 table_config.set("format.delimiter", ";").unwrap();
3456 assert_eq!(table_config.csv.delimiter, b';');
3457 table_config.set("test.bootstrap.servers", "asd").unwrap();
3458 let kafka_config = table_config
3459 .extensions
3460 .get::<TestExtensionConfig>()
3461 .unwrap();
3462 assert_eq!(
3463 kafka_config.properties.get("bootstrap.servers").unwrap(),
3464 "asd"
3465 );
3466 }
3467
3468 #[test]
3469 fn iter_test_extension_config() {
3470 let mut extension = Extensions::new();
3471 extension.insert(TestExtensionConfig::default());
3472 let table_config = TableOptions::new().with_extensions(extension);
3473 let extensions = table_config.extensions.iter().collect::<Vec<_>>();
3474 assert_eq!(extensions.len(), 1);
3475 assert_eq!(extensions[0].0, TestExtensionConfig::PREFIX);
3476 }
3477
3478 #[test]
3479 fn csv_u8_table_options() {
3480 let mut table_config = TableOptions::new();
3481 table_config.set_config_format(ConfigFileType::CSV);
3482 table_config.set("format.delimiter", ";").unwrap();
3483 assert_eq!(table_config.csv.delimiter as char, ';');
3484 table_config.set("format.escape", "\"").unwrap();
3485 assert_eq!(table_config.csv.escape.unwrap() as char, '"');
3486 table_config.set("format.escape", "\'").unwrap();
3487 assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
3488 }
3489
3490 #[test]
3491 fn warning_only_not_default() {
3492 use std::sync::atomic::AtomicUsize;
3493 static COUNT: AtomicUsize = AtomicUsize::new(0);
3494 use log::{Level, LevelFilter, Metadata, Record};
3495 struct SimpleLogger;
3496 impl log::Log for SimpleLogger {
3497 fn enabled(&self, metadata: &Metadata) -> bool {
3498 metadata.level() <= Level::Info
3499 }
3500
3501 fn log(&self, record: &Record) {
3502 if self.enabled(record.metadata()) {
3503 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3504 }
3505 }
3506 fn flush(&self) {}
3507 }
3508 log::set_logger(&SimpleLogger).unwrap();
3509 log::set_max_level(LevelFilter::Info);
3510 let mut sql_parser_options = crate::config::SqlParserOptions::default();
3511 sql_parser_options
3512 .set("enable_options_value_normalization", "false")
3513 .unwrap();
3514 assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
3515 sql_parser_options
3516 .set("enable_options_value_normalization", "true")
3517 .unwrap();
3518 assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
3519 }
3520
3521 #[test]
3522 fn reset_nested_scalar_reports_helpful_error() {
3523 let mut value = true;
3524 let err = <bool as ConfigField>::reset(&mut value, "nested").unwrap_err();
3525 let message = err.to_string();
3526 assert!(
3527 message.starts_with(
3528 "Invalid or Unsupported Configuration: Config field is a scalar bool and does not have nested field \"nested\""
3529 ),
3530 "unexpected error message: {message}"
3531 );
3532 }
3533
3534 #[cfg(feature = "parquet")]
3535 #[test]
3536 fn parquet_table_options() {
3537 let mut table_config = TableOptions::new();
3538 table_config.set_config_format(ConfigFileType::PARQUET);
3539 table_config
3540 .set("format.bloom_filter_enabled::col1", "true")
3541 .unwrap();
3542 assert_eq!(
3543 table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
3544 Some(true)
3545 );
3546 }
3547
3548 #[cfg(feature = "parquet_encryption")]
3549 #[test]
3550 fn parquet_table_encryption() {
3551 use crate::config::{
3552 ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
3553 };
3554 use parquet::encryption::decrypt::FileDecryptionProperties;
3555 use parquet::encryption::encrypt::FileEncryptionProperties;
3556 use std::sync::Arc;
3557
3558 let footer_key = b"0123456789012345".to_vec(); // 128bit/16
3559 let column_names = vec!["double_field", "float_field"];
3560 let column_keys =
3561 vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
3562
3563 let file_encryption_properties =
3564 FileEncryptionProperties::builder(footer_key.clone())
3565 .with_column_keys(column_names.clone(), column_keys.clone())
3566 .unwrap()
3567 .build()
3568 .unwrap();
3569
3570 let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
3571 .with_column_keys(column_names.clone(), column_keys.clone())
3572 .unwrap()
3573 .build()
3574 .unwrap();
3575
3576 // Test round-trip
3577 let config_encrypt =
3578 ConfigFileEncryptionProperties::from(&file_encryption_properties);
3579 let encryption_properties_built =
3580 Arc::new(FileEncryptionProperties::try_from(config_encrypt.clone()).unwrap());
3581 assert_eq!(file_encryption_properties, encryption_properties_built);
3582
3583 let config_decrypt =
3584 ConfigFileDecryptionProperties::try_from(&decryption_properties).unwrap();
3585 let decryption_properties_built =
3586 Arc::new(FileDecryptionProperties::try_from(config_decrypt.clone()).unwrap());
3587 assert_eq!(decryption_properties, decryption_properties_built);
3588
3589 ///////////////////////////////////////////////////////////////////////////////////
3590 // Test encryption config
3591
3592 // Display original encryption config
3593 // println!("{:#?}", config_encrypt);
3594
3595 let mut table_config = TableOptions::new();
3596 table_config.set_config_format(ConfigFileType::PARQUET);
3597 table_config
3598 .parquet
3599 .set(
3600 "crypto.file_encryption.encrypt_footer",
3601 config_encrypt.encrypt_footer.to_string().as_str(),
3602 )
3603 .unwrap();
3604 table_config
3605 .parquet
3606 .set(
3607 "crypto.file_encryption.footer_key_as_hex",
3608 config_encrypt.footer_key_as_hex.as_str(),
3609 )
3610 .unwrap();
3611
3612 for (i, col_name) in column_names.iter().enumerate() {
3613 let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
3614 let value = hex::encode(column_keys[i].clone());
3615 table_config
3616 .parquet
3617 .set(key.as_str(), value.as_str())
3618 .unwrap();
3619 }
3620
3621 // Print matching final encryption config
3622 // println!("{:#?}", table_config.parquet.crypto.file_encryption);
3623
3624 assert_eq!(
3625 table_config.parquet.crypto.file_encryption,
3626 Some(config_encrypt)
3627 );
3628
3629 ///////////////////////////////////////////////////////////////////////////////////
3630 // Test decryption config
3631
3632 // Display original decryption config
3633 // println!("{:#?}", config_decrypt);
3634
3635 let mut table_config = TableOptions::new();
3636 table_config.set_config_format(ConfigFileType::PARQUET);
3637 table_config
3638 .parquet
3639 .set(
3640 "crypto.file_decryption.footer_key_as_hex",
3641 config_decrypt.footer_key_as_hex.as_str(),
3642 )
3643 .unwrap();
3644
3645 for (i, col_name) in column_names.iter().enumerate() {
3646 let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
3647 let value = hex::encode(column_keys[i].clone());
3648 table_config
3649 .parquet
3650 .set(key.as_str(), value.as_str())
3651 .unwrap();
3652 }
3653
3654 // Print matching final decryption config
3655 // println!("{:#?}", table_config.parquet.crypto.file_decryption);
3656
3657 assert_eq!(
3658 table_config.parquet.crypto.file_decryption,
3659 Some(config_decrypt.clone())
3660 );
3661
3662 // Set config directly
3663 let mut table_config = TableOptions::new();
3664 table_config.set_config_format(ConfigFileType::PARQUET);
3665 table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
3666 assert_eq!(
3667 table_config.parquet.crypto.file_decryption,
3668 Some(config_decrypt.clone())
3669 );
3670 }
3671
3672 #[cfg(feature = "parquet_encryption")]
3673 #[test]
3674 fn parquet_encryption_invalid_hex_errors_encryption() {
3675 use crate::config::ColumnEncryptionProperties;
3676 use crate::config::ConfigFileEncryptionProperties;
3677 use parquet::encryption::encrypt::FileEncryptionProperties;
3678 use std::collections::HashMap;
3679
3680 let valid_footer_key_as_hex = hex::encode(b"0123456789012345");
3681
3682 let mut enc = ConfigFileEncryptionProperties {
3683 encrypt_footer: true,
3684 footer_key_as_hex: valid_footer_key_as_hex.clone(),
3685 footer_key_metadata_as_hex: String::new(),
3686 column_encryption_properties: HashMap::new(),
3687 aad_prefix_as_hex: String::new(),
3688 store_aad_prefix: false,
3689 };
3690
3691 // Encryption: invalid footer key hex
3692 enc.footer_key_as_hex = "not_hex".to_string();
3693 let err = FileEncryptionProperties::try_from(enc.clone())
3694 .unwrap_err()
3695 .to_string();
3696 assert!(err.contains("Unable to decode hex footer key"));
3697 enc.footer_key_as_hex = valid_footer_key_as_hex.clone();
3698
3699 // Encryption: invalid footer key metadata hex
3700 enc.footer_key_metadata_as_hex = "zz".to_string();
3701 let err = FileEncryptionProperties::try_from(enc.clone())
3702 .unwrap_err()
3703 .to_string();
3704 assert!(err.contains("Unable to decode hex footer key metadata"));
3705 enc.footer_key_metadata_as_hex = String::new();
3706
3707 // Encryption: invalid column key hex
3708 enc.column_encryption_properties.insert(
3709 "col1".to_string(),
3710 ColumnEncryptionProperties {
3711 column_key_as_hex: "bad".to_string(),
3712 column_metadata_as_hex: None,
3713 },
3714 );
3715 let err = FileEncryptionProperties::try_from(enc.clone())
3716 .unwrap_err()
3717 .to_string();
3718 assert!(err.contains("Unable to decode hex encryption key for column col1"));
3719 enc.column_encryption_properties.clear();
3720
3721 // Encryption: invalid column metadata hex
3722 enc.column_encryption_properties.insert(
3723 "col1".to_string(),
3724 ColumnEncryptionProperties {
3725 column_key_as_hex: hex::encode(b"1234567890123450"),
3726 column_metadata_as_hex: Some("zz".to_string()),
3727 },
3728 );
3729 let err = FileEncryptionProperties::try_from(enc.clone())
3730 .unwrap_err()
3731 .to_string();
3732 assert!(err.contains("Unable to decode hex column metadata for column col1"));
3733 enc.column_encryption_properties.clear();
3734
3735 // Encryption: invalid AAD prefix hex
3736 enc.aad_prefix_as_hex = "zz".to_string();
3737 let err = FileEncryptionProperties::try_from(enc.clone())
3738 .unwrap_err()
3739 .to_string();
3740 assert!(err.contains("Unable to decode hex AAD prefix"));
3741 }
3742
3743 #[cfg(feature = "parquet_encryption")]
3744 #[test]
3745 fn parquet_encryption_invalid_hex_errors_decryption() {
3746 use crate::config::ColumnDecryptionProperties;
3747 use crate::config::ConfigFileDecryptionProperties;
3748 use parquet::encryption::decrypt::FileDecryptionProperties;
3749 use std::collections::HashMap;
3750
3751 let valid_footer_key_as_hex = hex::encode(b"0123456789012345");
3752
3753 let mut dec = ConfigFileDecryptionProperties {
3754 footer_key_as_hex: valid_footer_key_as_hex.clone(),
3755 column_decryption_properties: HashMap::new(),
3756 aad_prefix_as_hex: String::new(),
3757 footer_signature_verification: true,
3758 };
3759
3760 // Decryption: invalid column key hex
3761 dec.column_decryption_properties.insert(
3762 "col1".to_string(),
3763 ColumnDecryptionProperties {
3764 column_key_as_hex: "bad".to_string(),
3765 },
3766 );
3767 let err = FileDecryptionProperties::try_from(dec.clone())
3768 .unwrap_err()
3769 .to_string();
3770 assert!(err.contains("Could not decode hex column key"));
3771 assert!(err.contains("col1"));
3772 dec.column_decryption_properties.clear();
3773
3774 // Decryption: invalid footer key hex
3775 dec.footer_key_as_hex = "bad".to_string();
3776 let err = FileDecryptionProperties::try_from(dec.clone())
3777 .unwrap_err()
3778 .to_string();
3779 assert!(err.contains("Could not decode hex footer key"));
3780 dec.footer_key_as_hex = valid_footer_key_as_hex;
3781
3782 // Decryption: invalid AAD prefix hex
3783 dec.aad_prefix_as_hex = "zz".to_string();
3784 let err = FileDecryptionProperties::try_from(dec.clone())
3785 .unwrap_err()
3786 .to_string();
3787 assert!(err.contains("Could not decode hex AAD prefix"));
3788 }
3789
3790 #[cfg(feature = "parquet_encryption")]
3791 #[test]
3792 fn parquet_encryption_factory_config() {
3793 let mut parquet_options = TableParquetOptions::default();
3794
3795 assert_eq!(parquet_options.crypto.factory_id, None);
3796 assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);
3797
3798 let mut input_config = TestExtensionConfig::default();
3799 input_config
3800 .properties
3801 .insert("key1".to_string(), "value 1".to_string());
3802 input_config
3803 .properties
3804 .insert("key2".to_string(), "value 2".to_string());
3805
3806 parquet_options
3807 .crypto
3808 .configure_factory("example_factory", &input_config);
3809
3810 assert_eq!(
3811 parquet_options.crypto.factory_id,
3812 Some("example_factory".to_string())
3813 );
3814 let factory_options = &parquet_options.crypto.factory_options.options;
3815 assert_eq!(factory_options.len(), 2);
3816 assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
3817 assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
3818 }
3819
3820 #[cfg(feature = "parquet_encryption")]
3821 struct ParquetEncryptionKeyRetriever {}
3822
3823 #[cfg(feature = "parquet_encryption")]
3824 impl parquet::encryption::decrypt::KeyRetriever for ParquetEncryptionKeyRetriever {
3825 fn retrieve_key(&self, key_metadata: &[u8]) -> parquet::errors::Result<Vec<u8>> {
3826 if !key_metadata.is_empty() {
3827 Ok(b"1234567890123450".to_vec())
3828 } else {
3829 Err(parquet::errors::ParquetError::General(
3830 "Key metadata not provided".to_string(),
3831 ))
3832 }
3833 }
3834 }
3835
3836 #[cfg(feature = "parquet_encryption")]
3837 #[test]
3838 fn conversion_from_key_retriever_to_config_file_decryption_properties() {
3839 use crate::Result;
3840 use crate::config::ConfigFileDecryptionProperties;
3841 use crate::encryption::FileDecryptionProperties;
3842
3843 let retriever = std::sync::Arc::new(ParquetEncryptionKeyRetriever {});
3844 let decryption_properties =
3845 FileDecryptionProperties::with_key_retriever(retriever)
3846 .build()
3847 .unwrap();
3848 let config_file_decryption_properties: Result<ConfigFileDecryptionProperties> =
3849 (&decryption_properties).try_into();
3850 assert!(config_file_decryption_properties.is_err());
3851 let err = config_file_decryption_properties.unwrap_err().to_string();
3852 assert!(err.contains("key retriever"));
3853 assert!(err.contains("Key metadata not provided"));
3854 }
3855
3856 #[cfg(feature = "parquet")]
3857 #[test]
3858 fn parquet_table_options_config_entry() {
3859 let mut table_config = TableOptions::new();
3860 table_config.set_config_format(ConfigFileType::PARQUET);
3861 table_config
3862 .set("format.bloom_filter_enabled::col1", "true")
3863 .unwrap();
3864 let entries = table_config.entries();
3865 assert!(
3866 entries
3867 .iter()
3868 .any(|item| item.key == "format.bloom_filter_enabled::col1")
3869 )
3870 }
3871
3872 #[cfg(feature = "parquet")]
3873 #[test]
3874 fn parquet_table_parquet_options_config_entry() {
3875 let mut table_parquet_options = TableParquetOptions::new();
3876 table_parquet_options
3877 .set(
3878 "crypto.file_encryption.column_key_as_hex::double_field",
3879 "31323334353637383930313233343530",
3880 )
3881 .unwrap();
3882 let entries = table_parquet_options.entries();
3883 assert!(
3884 entries.iter().any(|item| item.key
3885 == "crypto.file_encryption.column_key_as_hex::double_field")
3886 )
3887 }
3888
3889 #[cfg(feature = "parquet")]
3890 #[test]
3891 fn parquet_table_options_config_metadata_entry() {
3892 let mut table_config = TableOptions::new();
3893 table_config.set_config_format(ConfigFileType::PARQUET);
3894 table_config.set("format.metadata::key1", "").unwrap();
3895 table_config.set("format.metadata::key2", "value2").unwrap();
3896 table_config
3897 .set("format.metadata::key3", "value with spaces ")
3898 .unwrap();
3899 table_config
3900 .set("format.metadata::key4", "value with special chars :: :")
3901 .unwrap();
3902
3903 let parsed_metadata = table_config.parquet.key_value_metadata.clone();
3904 assert_eq!(parsed_metadata.get("should not exist1"), None);
3905 assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
3906 assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
3907 assert_eq!(
3908 parsed_metadata.get("key3"),
3909 Some(&Some("value with spaces ".into()))
3910 );
3911 assert_eq!(
3912 parsed_metadata.get("key4"),
3913 Some(&Some("value with special chars :: :".into()))
3914 );
3915
3916 // duplicate keys are overwritten
3917 table_config.set("format.metadata::key_dupe", "A").unwrap();
3918 table_config.set("format.metadata::key_dupe", "B").unwrap();
3919 let parsed_metadata = table_config.parquet.key_value_metadata;
3920 assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
3921 }
3922 #[cfg(feature = "parquet")]
3923 #[test]
3924 fn test_parquet_writer_version_validation() {
3925 use crate::{config::ConfigOptions, parquet_config::DFParquetWriterVersion};
3926
3927 let mut config = ConfigOptions::default();
3928
3929 // Valid values should work
3930 config
3931 .set("datafusion.execution.parquet.writer_version", "1.0")
3932 .unwrap();
3933 assert_eq!(
3934 config.execution.parquet.writer_version,
3935 DFParquetWriterVersion::V1_0
3936 );
3937
3938 config
3939 .set("datafusion.execution.parquet.writer_version", "2.0")
3940 .unwrap();
3941 assert_eq!(
3942 config.execution.parquet.writer_version,
3943 DFParquetWriterVersion::V2_0
3944 );
3945
3946 // Invalid value should error immediately at SET time
3947 let err = config
3948 .set("datafusion.execution.parquet.writer_version", "3.0")
3949 .unwrap_err();
3950 assert_eq!(
3951 err.to_string(),
3952 "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
3953 );
3954 }
3955
3956 #[cfg(feature = "parquet")]
3957 #[test]
3958 fn set_cdc_enabled_flag() {
3959 use crate::config::ConfigOptions;
3960
3961 let mut config = ConfigOptions::default();
3962 // CDC is disabled by default.
3963 assert!(!config.execution.parquet.content_defined_chunking.enabled);
3964
3965 // `.enabled = true` enables CDC; parameters keep their defaults.
3966 config
3967 .set(
3968 "datafusion.execution.parquet.content_defined_chunking.enabled",
3969 "true",
3970 )
3971 .unwrap();
3972 let cdc = &config.execution.parquet.content_defined_chunking;
3973 assert!(cdc.enabled);
3974 assert_eq!(cdc.min_chunk_size, 256 * 1024);
3975 assert_eq!(cdc.max_chunk_size, 1024 * 1024);
3976 assert_eq!(cdc.norm_level, 0);
3977
3978 // `.enabled = false` disables CDC.
3979 config
3980 .set(
3981 "datafusion.execution.parquet.content_defined_chunking.enabled",
3982 "false",
3983 )
3984 .unwrap();
3985 assert!(!config.execution.parquet.content_defined_chunking.enabled);
3986 }
3987
3988 #[cfg(feature = "parquet")]
3989 #[test]
3990 fn set_cdc_param_does_not_enable() {
3991 use crate::config::ConfigOptions;
3992
3993 let mut config = ConfigOptions::default();
3994
3995 // Setting a parameter does NOT enable CDC (`enabled` is a distinct field,
3996 // defaulting to false), and the result is independent of key order.
3997 config
3998 .set(
3999 "datafusion.execution.parquet.content_defined_chunking.min_chunk_size",
4000 "1024",
4001 )
4002 .unwrap();
4003 let cdc = &config.execution.parquet.content_defined_chunking;
4004 assert!(!cdc.enabled);
4005 assert_eq!(cdc.min_chunk_size, 1024);
4006 assert_eq!(cdc.max_chunk_size, 1024 * 1024);
4007 assert_eq!(cdc.norm_level, 0);
4008 }
4009}