datafusion_common/config.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Runtime configuration, via [`ConfigOptions`]
19
20use arrow_ipc::CompressionType;
21
22#[cfg(feature = "parquet_encryption")]
23use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
24use crate::error::_config_err;
25use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
26use crate::parquet_config::DFParquetWriterVersion;
27use crate::parsers::CompressionTypeVariant;
28use crate::utils::get_available_parallelism;
29use crate::{DataFusionError, Result};
30#[cfg(feature = "parquet_encryption")]
31use hex;
32use std::any::Any;
33use std::collections::{BTreeMap, HashMap};
34use std::error::Error;
35use std::fmt::{self, Display};
36use std::str::FromStr;
37#[cfg(feature = "parquet_encryption")]
38use std::sync::Arc;
39
40/// A macro that wraps a configuration struct and automatically derives
41/// [`Default`] and [`ConfigField`] for it, allowing it to be used
42/// in the [`ConfigOptions`] configuration tree.
43///
44/// `transform` is used to normalize values before parsing.
45///
46/// For example,
47///
48/// ```ignore
49/// config_namespace! {
50/// /// Amazing config
51/// pub struct MyConfig {
52/// /// Field 1 doc
53/// field1: String, transform = str::to_lowercase, default = "".to_string()
54///
55/// /// Field 2 doc
56/// field2: usize, default = 232
57///
58/// /// Field 3 doc
59/// field3: Option<usize>, default = None
60/// }
61/// }
62/// ```
63///
64/// Will generate
65///
66/// ```ignore
67/// /// Amazing config
68/// #[derive(Debug, Clone)]
69/// #[non_exhaustive]
70/// pub struct MyConfig {
71/// /// Field 1 doc
72/// field1: String,
73/// /// Field 2 doc
74/// field2: usize,
75/// /// Field 3 doc
76/// field3: Option<usize>,
77/// }
78/// impl ConfigField for MyConfig {
79/// fn set(&mut self, key: &str, value: &str) -> Result<()> {
80/// let (key, rem) = key.split_once('.').unwrap_or((key, ""));
81/// match key {
82/// "field1" => {
83/// let value = str::to_lowercase(value);
84/// self.field1.set(rem, value.as_ref())
85/// },
86/// "field2" => self.field2.set(rem, value.as_ref()),
87/// "field3" => self.field3.set(rem, value.as_ref()),
88/// _ => _internal_err!(
89/// "Config value \"{}\" not found on MyConfig",
90/// key
91/// ),
92/// }
93/// }
94///
95/// fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
96/// let key = format!("{}.field1", key_prefix);
97/// let desc = "Field 1 doc";
98/// self.field1.visit(v, key.as_str(), desc);
99/// let key = format!("{}.field2", key_prefix);
100/// let desc = "Field 2 doc";
101/// self.field2.visit(v, key.as_str(), desc);
102/// let key = format!("{}.field3", key_prefix);
103/// let desc = "Field 3 doc";
104/// self.field3.visit(v, key.as_str(), desc);
105/// }
106/// }
107///
108/// impl Default for MyConfig {
109/// fn default() -> Self {
110/// Self {
111/// field1: "".to_string(),
112/// field2: 232,
113/// field3: None,
114/// }
115/// }
116/// }
117/// ```
118///
119/// NB: Misplaced commas may result in nonsensical errors
120#[macro_export]
121macro_rules! config_namespace {
122 (
123 $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
124 $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
125 $(#[allow($($struct_de:tt)*)])?
126 $vis:vis struct $struct_name:ident {
127 $(
128 $(#[doc = $d:tt])* // Field-level documentation attributes
129 $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
130 $(#[allow($($field_de:tt)*)])?
131 $field_vis:vis $field_name:ident : $field_type:ty,
132 $(warn = $warn:expr,)?
133 $(transform = $transform:expr,)?
134 default = $default:expr
135 )*$(,)*
136 }
137 ) => {
138 $(#[doc = $struct_d])* // Apply struct documentation
139 $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
140 $(#[allow($($struct_de)*)])?
141 #[derive(Debug, Clone, PartialEq)]
142 $vis struct $struct_name {
143 $(
144 $(#[doc = $d])* // Apply field documentation
145 $(#[deprecated($($field_depr)*)])? // Apply field deprecation
146 $(#[allow($($field_de)*)])?
147 $field_vis $field_name: $field_type,
148 )*
149 }
150
151 impl $crate::config::ConfigField for $struct_name {
152 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
153 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
154 match key {
155 $(
156 stringify!($field_name) => {
157 // Safely apply deprecated attribute if present
158 // $(#[allow(deprecated)])?
159 {
160 $(let value = $transform(value);)? // Apply transformation if specified
161 let ret = self.$field_name.set(rem, value.as_ref());
162
163 $(if !$warn.is_empty() {
164 let default: $field_type = $default;
165 if default != self.$field_name {
166 log::warn!($warn);
167 }
168 })? // Log warning if specified, and the value is not the default
169 ret
170 }
171 },
172 )*
173 _ => return $crate::error::_config_err!(
174 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
175 )
176 }
177 }
178
179 fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
180 $(
181 let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
182 let desc = concat!($($d),*).trim();
183 self.$field_name.visit(v, key.as_str(), desc);
184 )*
185 }
186
187 fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
188 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
189 match key {
190 $(
191 stringify!($field_name) => {
192 {
193 if rem.is_empty() {
194 let default_value: $field_type = $default;
195 self.$field_name = default_value;
196 Ok(())
197 } else {
198 self.$field_name.reset(rem)
199 }
200 }
201 },
202 )*
203 _ => $crate::error::_config_err!(
204 "Config value \"{}\" not found on {}",
205 key,
206 stringify!($struct_name)
207 ),
208 }
209 }
210 }
211 impl Default for $struct_name {
212 fn default() -> Self {
213 Self {
214 $($field_name: $default),*
215 }
216 }
217 }
218 }
219}
220
221config_namespace! {
222 /// Options related to catalog and directory scanning
223 ///
224 /// See also: [`SessionConfig`]
225 ///
226 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
227 pub struct CatalogOptions {
228 /// Whether the default catalog and schema should be created automatically.
229 pub create_default_catalog_and_schema: bool, default = true
230
231 /// The default catalog name - this impacts what SQL queries use if not specified
232 pub default_catalog: String, default = "datafusion".to_string()
233
234 /// The default schema name - this impacts what SQL queries use if not specified
235 pub default_schema: String, default = "public".to_string()
236
237 /// Should DataFusion provide access to `information_schema`
238 /// virtual tables for displaying schema information
239 pub information_schema: bool, default = false
240
241 /// Location scanned to load tables for `default` schema
242 pub location: Option<String>, default = None
243
244 /// Type of `TableProvider` to use when loading `default` schema
245 pub format: Option<String>, default = None
246
247 /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
248 /// if not specified explicitly in the statement.
249 pub has_header: bool, default = true
250
251 /// Specifies whether newlines in (quoted) CSV values are supported.
252 ///
253 /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
254 /// if not specified explicitly in the statement.
255 ///
256 /// Parsing newlines in quoted values may be affected by execution behaviour such as
257 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
258 /// parsed successfully, which may reduce performance.
259 pub newlines_in_values: bool, default = false
260 }
261}
262
263config_namespace! {
264 /// Options related to SQL parser
265 ///
266 /// See also: [`SessionConfig`]
267 ///
268 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
269 pub struct SqlParserOptions {
270 /// When set to true, SQL parser will parse float as decimal type
271 pub parse_float_as_decimal: bool, default = false
272
273 /// When set to true, the SQL parser will normalize identifiers (convert them to lowercase when not quoted)
274 pub enable_ident_normalization: bool, default = true
275
276 /// When set to true, the SQL parser will normalize option values (convert them to lowercase).
277 /// Note that this option is ignored and will be removed in the future. All case-insensitive values
278 /// are normalized automatically.
279 pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false
280
281 /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
282 /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
283 pub dialect: Dialect, default = Dialect::Generic
284 // no need to lowercase because `sqlparser::dialect_from_str` is case-insensitive
285
286 /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
287 /// ignore the length. If false, error if a `VARCHAR` with a length is
288 /// specified. The Arrow type system does not have a notion of maximum
289 /// string length and thus DataFusion can not enforce such limits.
290 pub support_varchar_with_length: bool, default = true
291
292 /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
293 /// If false, they are mapped to `Utf8`.
294 /// Default is true.
295 pub map_string_types_to_utf8view: bool, default = true
296
297 /// When set to true, the source locations relative to the original SQL
298 /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
299 /// and recorded in the logical plan nodes.
300 pub collect_spans: bool, default = false
301
302 /// Specifies the recursion depth limit when parsing complex SQL Queries
303 pub recursion_limit: usize, default = 50
304
305 /// Specifies the default null ordering for query results. There are 4 options:
306 /// - `nulls_max`: Nulls appear last in ascending order.
307 /// - `nulls_min`: Nulls appear first in ascending order.
308 /// - `nulls_first`: Nulls always appear first, in any order.
309 /// - `nulls_last`: Nulls always appear last, in any order.
310 ///
311 /// By default, `nulls_max` is used, following Postgres's behavior.
312 /// Postgres rule: <https://www.postgresql.org/docs/current/queries-order.html>
313 pub default_null_ordering: String, default = "nulls_max".to_string()
314 }
315}
316
317/// This is the SQL dialect used by DataFusion's parser.
318/// This mirrors [sqlparser::dialect::Dialect](https://docs.rs/sqlparser/latest/sqlparser/dialect/trait.Dialect.html)
319/// trait in order to offer an easier API and avoid adding the `sqlparser` dependency
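///
/// A minimal sketch of parsing a dialect name; matching is case-insensitive and
/// `postgres` is accepted as an alias for `postgresql`:
/// ```ignore
/// use std::str::FromStr;
/// let dialect = Dialect::from_str("Postgres").unwrap();
/// assert_eq!(dialect, Dialect::PostgreSQL);
/// ```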
320#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
321pub enum Dialect {
322 #[default]
323 Generic,
324 MySQL,
325 PostgreSQL,
326 Hive,
327 SQLite,
328 Snowflake,
329 Redshift,
330 MsSQL,
331 ClickHouse,
332 BigQuery,
333 Ansi,
334 DuckDB,
335 Databricks,
336}
337
338impl AsRef<str> for Dialect {
339 fn as_ref(&self) -> &str {
340 match self {
341 Self::Generic => "generic",
342 Self::MySQL => "mysql",
343 Self::PostgreSQL => "postgresql",
344 Self::Hive => "hive",
345 Self::SQLite => "sqlite",
346 Self::Snowflake => "snowflake",
347 Self::Redshift => "redshift",
348 Self::MsSQL => "mssql",
349 Self::ClickHouse => "clickhouse",
350 Self::BigQuery => "bigquery",
351 Self::Ansi => "ansi",
352 Self::DuckDB => "duckdb",
353 Self::Databricks => "databricks",
354 }
355 }
356}
357
358impl FromStr for Dialect {
359 type Err = DataFusionError;
360
361 fn from_str(s: &str) -> Result<Self, Self::Err> {
362 let value = match s.to_ascii_lowercase().as_str() {
363 "generic" => Self::Generic,
364 "mysql" => Self::MySQL,
365 "postgresql" | "postgres" => Self::PostgreSQL,
366 "hive" => Self::Hive,
367 "sqlite" => Self::SQLite,
368 "snowflake" => Self::Snowflake,
369 "redshift" => Self::Redshift,
370 "mssql" => Self::MsSQL,
371 "clickhouse" => Self::ClickHouse,
372 "bigquery" => Self::BigQuery,
373 "ansi" => Self::Ansi,
374 "duckdb" => Self::DuckDB,
375 "databricks" => Self::Databricks,
376 other => {
377 let error_message = format!(
378 "Invalid Dialect: {other}. Expected one of: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks"
379 );
380 return Err(DataFusionError::Configuration(error_message));
381 }
382 };
383 Ok(value)
384 }
385}
386
387impl ConfigField for Dialect {
388 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
389 v.some(key, self, description)
390 }
391
392 fn set(&mut self, _: &str, value: &str) -> Result<()> {
393 *self = Self::from_str(value)?;
394 Ok(())
395 }
396}
397
398impl Display for Dialect {
399 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
400 let str = self.as_ref();
401 write!(f, "{str}")
402 }
403}
404
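/// Compression codec applied to spill files, which are written in the Arrow IPC Stream format.
///
/// A minimal sketch of parsing a value; matching is case-insensitive and an empty
/// string is treated as `Uncompressed`:
/// ```ignore
/// use std::str::FromStr;
/// assert_eq!(SpillCompression::from_str("ZSTD").unwrap(), SpillCompression::Zstd);
/// assert_eq!(SpillCompression::from_str("").unwrap(), SpillCompression::Uncompressed);
/// ```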
405#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
406pub enum SpillCompression {
407 Zstd,
408 Lz4Frame,
409 #[default]
410 Uncompressed,
411}
412
413impl FromStr for SpillCompression {
414 type Err = DataFusionError;
415
416 fn from_str(s: &str) -> Result<Self, Self::Err> {
417 match s.to_ascii_lowercase().as_str() {
418 "zstd" => Ok(Self::Zstd),
419 "lz4_frame" => Ok(Self::Lz4Frame),
420 "uncompressed" | "" => Ok(Self::Uncompressed),
421 other => Err(DataFusionError::Configuration(format!(
422 "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
423 ))),
424 }
425 }
426}
427
428impl ConfigField for SpillCompression {
429 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
430 v.some(key, self, description)
431 }
432
433 fn set(&mut self, _: &str, value: &str) -> Result<()> {
434 *self = SpillCompression::from_str(value)?;
435 Ok(())
436 }
437}
438
439impl Display for SpillCompression {
440 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
441 let str = match self {
442 Self::Zstd => "zstd",
443 Self::Lz4Frame => "lz4_frame",
444 Self::Uncompressed => "uncompressed",
445 };
446 write!(f, "{str}")
447 }
448}
449
450impl From<SpillCompression> for Option<CompressionType> {
451 fn from(c: SpillCompression) -> Self {
452 match c {
453 SpillCompression::Zstd => Some(CompressionType::ZSTD),
454 SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
455 SpillCompression::Uncompressed => None,
456 }
457 }
458}
459
460config_namespace! {
461 /// Options related to query execution
462 ///
463 /// See also: [`SessionConfig`]
464 ///
465 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
466 pub struct ExecutionOptions {
467 /// Default batch size while creating new batches. It's especially useful for
468 /// buffering in-memory batches, since creating tiny batches would result in too much
469 /// metadata memory consumption
470 pub batch_size: usize, default = 8192
471
472 /// When set to true, record batches will be examined between each operator and
473 /// small batches will be coalesced into larger batches. This is helpful when there
474 /// are highly selective filters or joins that could produce tiny output batches. The
475 /// target batch size is determined by the `batch_size` configuration setting
476 pub coalesce_batches: bool, default = true
477
478 /// Should DataFusion collect statistics when first creating a table.
479 /// Has no effect after the table is created. Applies to the default
480 /// `ListingTableProvider` in DataFusion. Defaults to true.
481 pub collect_statistics: bool, default = true
482
483 /// Number of partitions for query execution. Increasing partitions can increase
484 /// concurrency.
485 ///
486 /// Defaults to the number of CPU cores on the system
487 pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()
488
489 /// The default time zone
490 ///
491 /// Some functions, e.g. `now` return timestamps in this time zone
492 pub time_zone: Option<String>, default = None
493
494 /// Parquet options
495 pub parquet: ParquetOptions, default = Default::default()
496
497 /// Fan-out during initial physical planning.
498 ///
499 /// This is mostly used to plan `UNION` children in parallel.
500 ///
501 /// Defaults to the number of CPU cores on the system
502 pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()
503
504 /// When set to true, skips verifying that the schema produced by
505 /// planning the input of `LogicalPlan::Aggregate` exactly matches the
506 /// schema of the input plan.
507 ///
508 /// When set to false, if the schema does not match exactly
509 /// (including nullability and metadata), a planning error will be raised.
510 ///
511 /// This is used to work around bugs in the planner that are now caught by
512 /// the new schema verification step.
513 pub skip_physical_aggregate_schema_check: bool, default = false
514
515 /// Sets the compression codec used when spilling data to disk.
516 ///
517 /// Since DataFusion writes spill files using the Arrow IPC Stream format,
518 /// only codecs supported by the Arrow IPC Stream Writer are allowed.
519 /// Valid values are: uncompressed, lz4_frame, zstd.
520 /// Note: lz4_frame offers faster (de)compression, but typically results in
521 /// larger spill files. In contrast, zstd achieves
522 /// higher compression ratios at the cost of slower (de)compression speed.
523 pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed
524
525 /// Specifies the reserved memory for each spillable sort operation to
526 /// facilitate an in-memory merge.
527 ///
528 /// When a sort operation spills to disk, the in-memory data must be
529 /// sorted and merged before being written to a file. This setting reserves
530 /// a specific amount of memory for that in-memory sort/merge process.
531 ///
532 /// Note: This setting is irrelevant if the sort operation cannot spill
533 /// (i.e., if there's no `DiskManager` configured).
534 pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
535
536 /// When sorting, the size (in bytes) below which data is concatenated
537 /// and sorted in a single RecordBatch rather than sorted in
538 /// batches and merged.
539 pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
540
541 /// Maximum size in bytes for individual spill files before rotating to a new file.
542 ///
543 /// When operators spill data to disk (e.g., RepartitionExec), they write
544 /// multiple batches to the same file until this size limit is reached, then rotate
545 /// to a new file. This reduces syscall overhead compared to one-file-per-batch
546 /// while preventing files from growing too large.
547 ///
548 /// A larger value reduces file creation overhead but may hold more disk space.
549 /// A smaller value creates more files but allows finer-grained space reclamation
550 /// as files can be deleted once fully consumed.
551 ///
552 /// Currently only `RepartitionExec` supports this spill file rotation feature; other spilling operators
553 /// may create spill files larger than the limit.
554 ///
555 /// Default: 128 MB
556 pub max_spill_file_size_bytes: usize, default = 128 * 1024 * 1024
557
558 /// Number of files to read in parallel when inferring schema and statistics
559 pub meta_fetch_concurrency: usize, default = 32
560
561 /// Guarantees a minimum number of output files being written in parallel.
562 /// RecordBatches will be distributed in round robin fashion to each
563 /// parallel writer. Each writer is closed and a new file opened once
564 /// soft_max_rows_per_output_file is reached.
565 pub minimum_parallel_output_files: usize, default = 4
566
567 /// Target number of rows in output files when writing multiple.
568 /// This is a soft max, so it can be exceeded slightly. There also
569 /// will be one file smaller than the limit if the total
570 /// number of rows written is not roughly divisible by the soft max
571 pub soft_max_rows_per_output_file: usize, default = 50000000
572
573 /// This is the maximum number of RecordBatches buffered
574 /// for each output file being worked. Higher values can potentially
575 /// give faster write performance at the cost of higher peak
576 /// memory consumption
577 pub max_buffered_batches_per_output_file: usize, default = 2
578
579 /// Should sub directories be ignored when scanning directories for data
580 /// files. Defaults to true (ignores subdirectories), consistent with
581 /// Hive. Note that this setting does not affect reading partitioned
582 /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
583 pub listing_table_ignore_subdirectory: bool, default = true
584
585 /// Should a `ListingTable` created through the `ListingTableFactory` infer table
586 /// partitions from Hive compliant directories. Defaults to true (partition columns are
587 /// inferred and will be represented in the table schema).
588 pub listing_table_factory_infer_partitions: bool, default = true
589
590 /// Should DataFusion support recursive CTEs
591 pub enable_recursive_ctes: bool, default = true
592
593 /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
594 /// statistics into the same file groups.
595 /// Currently experimental
596 pub split_file_groups_by_statistics: bool, default = false
597
598 /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
599 pub keep_partition_by_columns: bool, default = false
600
601 /// Aggregation ratio (number of distinct groups / number of input rows)
602 /// threshold for skipping partial aggregation. If the ratio is greater than
603 /// this threshold, partial aggregation will skip aggregation for further input
604 pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
605
606 /// Number of input rows partial aggregation partition should process, before
607 /// aggregation ratio check and trying to switch to skipping aggregation mode
608 pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
609
610 /// Should DataFusion use row number estimates at the input to decide
611 /// whether increasing parallelism is beneficial or not. By default,
612 /// only exact row numbers (not estimates) are used for this decision.
613 /// Setting this flag to `true` will likely produce better plans
614 /// if the source of statistics is accurate.
615 /// We plan to make this the default in the future.
616 pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
617
618 /// Should DataFusion enforce batch size in joins or not. By default,
619 /// DataFusion will not enforce batch size in joins. Enforcing batch size
620 /// in joins can reduce memory usage when joining large
621 /// tables with a highly-selective join filter, but is also slightly slower.
622 pub enforce_batch_size_in_joins: bool, default = false
623
624 /// Size (bytes) of data buffer DataFusion uses when writing output files.
625 /// This affects the size of the data chunks that are uploaded to remote
626 /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
627 /// written, it may be necessary to increase this size to avoid errors from
628 /// the remote end point.
629 pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
630
631 /// Whether to enable ANSI SQL mode.
632 ///
633 /// The flag is experimental and relevant only for DataFusion Spark built-in functions
634 ///
635 /// When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL
636 /// semantics for expressions, casting, and error handling. This means:
637 /// - **Strict type coercion rules:** implicit casts between incompatible types are disallowed.
638 /// - **Standard SQL arithmetic behavior:** operations such as division by zero,
639 /// numeric overflow, or invalid casts raise runtime errors rather than returning
640 /// `NULL` or adjusted values.
641 /// - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling.
642 ///
643 /// When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive,
644 /// non-ANSI mode designed for user convenience and backward compatibility. In this mode:
645 /// - Implicit casts between types are allowed (e.g., string to integer when possible).
646 /// - Arithmetic operations are more lenient — for example, `abs()` on the minimum
647 /// representable integer value returns the input value instead of raising overflow.
648 /// - Division by zero or invalid casts may return `NULL` instead of failing.
649 ///
650 /// # Default
651 /// `false` — ANSI SQL mode is disabled by default.
652 pub enable_ansi_mode: bool, default = false
653 }
654}
655
656config_namespace! {
657 /// Options for reading and writing parquet files
658 ///
659 /// See also: [`SessionConfig`]
660 ///
661 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
662 pub struct ParquetOptions {
663 // The following options affect reading parquet files
664
665 /// (reading) If true, reads the Parquet data page level metadata (the
666 /// Page Index), if present, to reduce the I/O and number of
667 /// rows decoded.
668 pub enable_page_index: bool, default = true
669
670 /// (reading) If true, the parquet reader attempts to skip entire row groups based
671 /// on the predicate in the query and the metadata (min/max values) stored in
672 /// the parquet file
673 pub pruning: bool, default = true
674
675 /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
676 /// the file Schema. This setting can help avoid schema conflicts when querying
677 /// multiple parquet files with schemas containing compatible types but different metadata
678 pub skip_metadata: bool, default = true
679
680 /// (reading) If specified, the parquet reader will optimistically try to fetch the last `size_hint`
681 /// bytes of the parquet file. If not specified, two reads are required:
682 /// one read to fetch the 8-byte parquet footer and
683 /// another to fetch the metadata length encoded in the footer.
684 /// The default is 512 KiB, which should be sufficient for most parquet files
685 /// and saves one I/O operation per parquet file. If the metadata is larger than
686 /// the hint, two reads will still be performed.
687 pub metadata_size_hint: Option<usize>, default = Some(512 * 1024)
688
689 /// (reading) If true, filter expressions are applied during the parquet decoding operation to
690 /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
691 pub pushdown_filters: bool, default = false
692
693 /// (reading) If true, filter expressions evaluated during the parquet decoding operation
694 /// will be reordered heuristically to minimize the cost of evaluation. If false,
695 /// the filters are applied in the same order as written in the query
696 pub reorder_filters: bool, default = false
697
698 /// (reading) Force the use of RowSelections for filter results, when
699 /// pushdown_filters is enabled. If false, the reader will automatically
700 /// choose between a RowSelection and a Bitmap based on the number and
701 /// pattern of selected rows.
702 pub force_filter_selections: bool, default = false
703
704 /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` as `Utf8View`,
705 /// and `Binary/LargeBinary` as `BinaryView`.
706 pub schema_force_view_types: bool, default = true
707
708 /// (reading) If true, parquet reader will read columns of
709 /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
710 ///
711 /// Parquet files generated by some legacy writers do not correctly set
712 /// the UTF8 flag for strings, causing string columns to be loaded as
713 /// BLOB instead.
714 pub binary_as_string: bool, default = false
715
716 /// (reading) If true, parquet reader will read columns of
717 /// physical type int96 as originating from a different resolution
718 /// than nanosecond. This is useful for reading data from systems like Spark
719 /// which stores microsecond resolution timestamps in an int96 allowing it
720 /// to write values with a larger date range than 64-bit timestamps with
721 /// nanosecond resolution.
722 pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
723
724 /// (reading) Use any available bloom filters when reading parquet files
725 pub bloom_filter_on_read: bool, default = true
726
727 /// (reading) The maximum predicate cache size, in bytes. When
728 /// `pushdown_filters` is enabled, sets the maximum memory used to cache
729 /// the results of predicate evaluation between filter evaluation and
730 /// output generation. Decreasing this value will reduce memory usage,
731 /// but may increase IO and CPU usage. None means use the default
732 /// parquet reader setting. 0 means no caching.
733 pub max_predicate_cache_size: Option<usize>, default = None
734
735 // The following options affect writing to parquet files
736 // and map to parquet::file::properties::WriterProperties
737
738 /// (writing) Sets best effort maximum size of data page in bytes
739 pub data_pagesize_limit: usize, default = 1024 * 1024
740
741 /// (writing) Sets write_batch_size in bytes
742 pub write_batch_size: usize, default = 1024
743
744 /// (writing) Sets parquet writer version
745 /// valid values are "1.0" and "2.0"
746 pub writer_version: DFParquetWriterVersion, default = DFParquetWriterVersion::default()
747
748 /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
749 ///
750 /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
751 /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
752 pub skip_arrow_metadata: bool, default = false
753
754 /// (writing) Sets default parquet compression codec.
755 /// Valid values are: uncompressed, snappy, gzip(level),
756 /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
757 /// These values are not case sensitive. If NULL, uses
758 /// default parquet writer setting
759 ///
760 /// Note that this default setting is not the same as
761 /// the default parquet writer setting.
762 pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())
763
764 /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
765 /// default parquet writer setting
766 pub dictionary_enabled: Option<bool>, default = Some(true)
767
768 /// (writing) Sets best effort maximum dictionary page size, in bytes
769 pub dictionary_page_size_limit: usize, default = 1024 * 1024
770
771 /// (writing) Sets if statistics are enabled for any column
772 /// Valid values are: "none", "chunk", and "page"
773 /// These values are not case sensitive. If NULL, uses
774 /// default parquet writer setting
775 pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())
776
777 /// (writing) Target maximum number of rows in each row group (defaults to 1M
778 /// rows). Writing larger row groups requires more memory to write, but
779 /// can get better compression and be faster to read.
780 pub max_row_group_size: usize, default = 1024 * 1024
781
782 /// (writing) Sets "created by" property
783 pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()
784
785 /// (writing) Sets column index truncate length
786 pub column_index_truncate_length: Option<usize>, default = Some(64)
787
788 /// (writing) Sets statistics truncate length. If NULL, uses
789 /// default parquet writer setting
790 pub statistics_truncate_length: Option<usize>, default = Some(64)
791
792 /// (writing) Sets best effort maximum number of rows in data page
793 pub data_page_row_count_limit: usize, default = 20_000
794
795 /// (writing) Sets default encoding for any column.
796 /// Valid values are: plain, plain_dictionary, rle,
797 /// bit_packed, delta_binary_packed, delta_length_byte_array,
798 /// delta_byte_array, rle_dictionary, and byte_stream_split.
799 /// These values are not case sensitive. If NULL, uses
800 /// default parquet writer setting
801 pub encoding: Option<String>, transform = str::to_lowercase, default = None
802
803 /// (writing) Write bloom filters for all columns when creating parquet files
804 pub bloom_filter_on_write: bool, default = false
805
806 /// (writing) Sets bloom filter false positive probability. If NULL, uses
807 /// default parquet writer setting
808 pub bloom_filter_fpp: Option<f64>, default = None
809
810 /// (writing) Sets bloom filter number of distinct values. If NULL, uses
811 /// default parquet writer setting
812 pub bloom_filter_ndv: Option<u64>, default = None
813
814 /// (writing) Controls whether DataFusion will attempt to speed up writing
815 /// parquet files by serializing them in parallel. Each column
816 /// in each row group in each output file is serialized in parallel,
817 /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
818 pub allow_single_file_parallelism: bool, default = true
819
820 /// (writing) By default parallel parquet writer is tuned for minimum
821 /// memory usage in a streaming execution plan. You may see
822 /// a performance benefit when writing large parquet files
823 /// by increasing maximum_parallel_row_group_writers and
824 /// maximum_buffered_record_batches_per_stream if your system
825 /// has idle cores and can tolerate additional memory usage.
826 /// Boosting these values is likely worthwhile when
827 /// writing out already in-memory data, such as from a cached
828 /// data frame.
829 pub maximum_parallel_row_group_writers: usize, default = 1
830
831 /// (writing) By default parallel parquet writer is tuned for minimum
832 /// memory usage in a streaming execution plan. You may see
833 /// a performance benefit when writing large parquet files
834 /// by increasing maximum_parallel_row_group_writers and
835 /// maximum_buffered_record_batches_per_stream if your system
836 /// has idle cores and can tolerate additional memory usage.
837 /// Boosting these values is likely worthwhile when
838 /// writing out already in-memory data, such as from a cached
839 /// data frame.
840 pub maximum_buffered_record_batches_per_stream: usize, default = 2
841 }
842}
843
844config_namespace! {
845 /// Options for configuring Parquet Modular Encryption
846 ///
847 /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
848 pub struct ParquetEncryptionOptions {
849 /// Optional file decryption properties
850 pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None
851
852 /// Optional file encryption properties
853 pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None
854
855 /// Identifier for the encryption factory to use to create file encryption and decryption properties.
856 /// Encryption factories can be registered in the runtime environment with
857 /// `RuntimeEnv::register_parquet_encryption_factory`.
858 pub factory_id: Option<String>, default = None
859
860 /// Any encryption factory specific options
861 pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
862 }
863}
864
865impl ParquetEncryptionOptions {
866 /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
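    ///
    /// A minimal sketch, assuming a hypothetical `MyEncryptionConfig` type that
    /// implements [`ExtensionOptions`]:
    /// ```ignore
    /// let my_config = MyEncryptionConfig::default(); // hypothetical encryption factory options
    /// let mut parquet_encryption = ParquetEncryptionOptions::default();
    /// // records the factory id and copies every entry from `my_config` into `factory_options`
    /// parquet_encryption.configure_factory("my_encryption_factory", &my_config);
    /// ```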
867 pub fn configure_factory(
868 &mut self,
869 factory_id: &str,
870 config: &impl ExtensionOptions,
871 ) {
872 self.factory_id = Some(factory_id.to_owned());
873 self.factory_options.options.clear();
874 for entry in config.entries() {
875 if let Some(value) = entry.value {
876 self.factory_options.options.insert(entry.key, value);
877 }
878 }
879 }
880}
881
882config_namespace! {
883 /// Options related to query optimization
884 ///
885 /// See also: [`SessionConfig`]
886 ///
887 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
888 pub struct OptimizerOptions {
889 /// When set to true, the optimizer will push a limit operation into
890 /// grouped aggregations which have no aggregate expressions, as a soft limit,
891 /// emitting groups once the limit is reached, before all rows in the group are read.
892 pub enable_distinct_aggregation_soft_limit: bool, default = true
893
894 /// When set to true, the physical plan optimizer will try to add round robin
895 /// repartitioning to increase parallelism to leverage more CPU cores
896 pub enable_round_robin_repartition: bool, default = true
897
898 /// When set to true, the optimizer will attempt to perform limit operations
899 /// during aggregations, if possible
900 pub enable_topk_aggregation: bool, default = true
901
902 /// When set to true, the optimizer will attempt to push limit operations
903 /// past window functions, if possible
904 pub enable_window_limits: bool, default = true
905
906 /// When set to true, the optimizer will attempt to push down TopK dynamic filters
907 /// into the file scan phase.
908 pub enable_topk_dynamic_filter_pushdown: bool, default = true
909
910 /// When set to true, the optimizer will attempt to push down Join dynamic filters
911 /// into the file scan phase.
912 pub enable_join_dynamic_filter_pushdown: bool, default = true
913
914 /// When set to true, the optimizer will attempt to push down Aggregate dynamic filters
915 /// into the file scan phase.
916 pub enable_aggregate_dynamic_filter_pushdown: bool, default = true
917
918 /// When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase.
919 /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
920 /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
921 /// This means that if we already have 10 timestamps in the year 2025,
922 /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
923 /// This config overrides `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` and `enable_aggregate_dynamic_filter_pushdown`:
924 /// for example, if you disable `enable_topk_dynamic_filter_pushdown` and then enable `enable_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` will be overridden.
925 pub enable_dynamic_filter_pushdown: bool, default = true
926
927 /// When set to true, the optimizer will insert filters before a join between
928 /// a nullable and non-nullable column to filter out nulls on the nullable side. This
929 /// filter can add additional overhead when the file format does not fully support
930 /// predicate push down.
931 pub filter_null_join_keys: bool, default = false
932
933 /// Should DataFusion repartition data using the aggregate keys to execute aggregates
934 /// in parallel using the provided `target_partitions` level
935 pub repartition_aggregations: bool, default = true
936
937 /// Minimum total files size in bytes to perform file scan repartitioning.
938 pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
939
940 /// Should DataFusion repartition data using the join keys to execute joins in parallel
941 /// using the provided `target_partitions` level
942 pub repartition_joins: bool, default = true
943
944 /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
945 /// their inputs do not have any ordering or filtering. If the flag is not enabled,
946 /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
947 /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
948 /// RightAnti, and RightSemi - being produced only at the end of the execution.
949 /// This is not typical in stream processing. Additionally, without proper design for
950 /// long-running execution, all types of joins may encounter out-of-memory errors.
951 pub allow_symmetric_joins_without_pruning: bool, default = true
952
953 /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
954 /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
955 ///
956 /// For FileSources, only Parquet and CSV formats are currently supported.
957 ///
958 /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
959 /// might be partitioned into smaller chunks) for parallel scanning.
960 /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
961 /// happen within a single file.
962 ///
963 /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
964 /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
965 /// the total number of partitions and batches per partition, but does not slice the initial
966 /// record batches provided to the MemTable on creation.
967 pub repartition_file_scans: bool, default = true
968
969 /// Minimum number of distinct partition values required to group files by their
970 /// Hive partition column values (enabling Hash partitioning declaration).
971 ///
972 /// How the option is used:
973 /// - preserve_file_partitions=0: Disable it.
974 /// - preserve_file_partitions=1: Always enable it.
975 /// - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N.
976 /// This threshold preserves I/O parallelism when file partitioning is below it.
977 ///
978 /// Note: This may reduce parallelism, starting at the I/O level, if the number of distinct
979 /// partitions is less than the target_partitions.
980 pub preserve_file_partitions: usize, default = 0
981
982 /// Should DataFusion repartition data using the partitions keys to execute window
983 /// functions in parallel using the provided `target_partitions` level
984 pub repartition_windows: bool, default = true
985
986 /// Should DataFusion execute sorts in a per-partition fashion and merge
987 /// afterwards instead of coalescing first and sorting globally.
988 /// When this flag is enabled, plans in the form below
989 ///
990 /// ```text
991 /// "SortExec: [a@0 ASC]",
992 /// " CoalescePartitionsExec",
993 /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
994 /// ```
995 /// would turn into the plan below which performs better in multithreaded environments
996 ///
997 /// ```text
998 /// "SortPreservingMergeExec: [a@0 ASC]",
999 /// " SortExec: [a@0 ASC]",
1000 /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
1001 /// ```
1002 pub repartition_sorts: bool, default = true
1003
1004 /// Partition count threshold for subset satisfaction optimization.
1005 ///
1006 /// When the current partition count is >= this threshold, DataFusion will
1007 /// skip repartitioning if the required partitioning expression is a subset
1008 /// of the current partition expression such as Hash(a) satisfies Hash(a, b).
1009 ///
1010 /// When the current partition count is < this threshold, DataFusion will
1011 /// repartition to increase parallelism even when subset satisfaction applies.
1012 ///
1013 /// Set to 0 to always repartition (disable subset satisfaction optimization).
1014 /// Set to a high value to always use subset satisfaction.
1015 ///
1016 /// Example (subset_repartition_threshold = 4):
1017 /// ```text
1018 /// Hash([a]) satisfies Hash([a, b]) because [a] is a subset of [a, b]
1019 ///
1020 /// If current partitions (3) < threshold (4), repartition:
1021 /// AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)]
1022 /// RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3
1023 /// AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)]
1024 /// DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3)
1025 ///
1026 /// If current partitions (8) >= threshold (4), use subset satisfaction:
1027 /// AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)]
1028 /// DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8)
1029 /// ```
1030 pub subset_repartition_threshold: usize, default = 4
1031
1032 /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
1033 /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
1034 /// using `SortPreservingMergeExec`)
1035 ///
1036 /// When false, DataFusion will maximize plan parallelism using
1037 /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
1038 pub prefer_existing_sort: bool, default = false
1039
1040 /// When set to true, the logical plan optimizer will produce warning
1041 /// messages if any optimization rules produce errors and then proceed to the next
1042 /// rule. When set to false, any rules that produce errors will cause the query to fail
1043 pub skip_failed_rules: bool, default = false
1044
1045 /// Number of times that the optimizer will attempt to optimize the plan
1046 pub max_passes: usize, default = 3
1047
1048 /// When set to true, the physical plan optimizer will run a top down
1049 /// process to reorder the join keys
1050 pub top_down_join_key_reordering: bool, default = true
1051
1052 /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
1053 /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
1054 pub prefer_hash_join: bool, default = true
1055
1056 /// When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently
1057 /// experimental. Physical planner will opt for PiecewiseMergeJoin when there is only
1058 /// one range filter.
1059 pub enable_piecewise_merge_join: bool, default = false
1060
1061 /// The maximum estimated size in bytes for one input side of a HashJoin
1062 /// to be collected into a single partition
1063 pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
1064
1065 /// The maximum estimated size in rows for one input side of a HashJoin
1066 /// to be collected into a single partition
1067 pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
1068
1069 /// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
1070 /// Build sides larger than this will use hash table lookups instead.
1071 /// Set to 0 to always use hash table lookups.
1072 ///
1073 /// InList pushdown can be more efficient for small build sides because it can result in better
1074 /// statistics pruning as well as use any bloom filters present on the scan side.
1075 /// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
1076 /// On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory.
1077 ///
1078 /// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
1079 ///
1080 /// The default is 128kB per partition.
1081 /// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
1082 /// but avoids excessive memory usage or overhead for larger joins.
1083 pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024
1084
1085 /// Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
1086 /// Build sides with more rows than this will use hash table lookups instead.
1087 /// Set to 0 to always use hash table lookups.
1088 ///
1089 /// This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent
1090 /// very large IN lists that might not provide much benefit over hash table lookups.
1091 ///
1092 /// This uses the deduplicated row count once the build side has been evaluated.
1093 ///
1094 /// The default is 150 values per partition.
1095 /// This is inspired by Trino's `max-filter-keys-per-column` setting.
1096 /// See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>
1097 pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150
1098
1099 /// The default filter selectivity used by Filter Statistics
1100 /// when an exact selectivity cannot be determined. Valid values are
1101 /// between 0 (no selectivity) and 100 (all rows are selected).
1102 pub default_filter_selectivity: u8, default = 20
1103
1104 /// When set to true, the optimizer will not attempt to convert Union to Interleave
1105 pub prefer_existing_union: bool, default = false
1106
1107 /// When set to true, if the returned type is a view type
1108 /// then the output will be coerced to a non-view.
1109 /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
1110 pub expand_views_at_output: bool, default = false
1111
1112 /// Enable sort pushdown optimization.
1113 /// When enabled, attempts to push sort requirements down to data sources
1114 /// that can natively handle them (e.g., by reversing file/row group read order).
1115 ///
1116 /// Returns **inexact ordering**: Sort operator is kept for correctness,
1117 /// but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N),
1118 /// providing significant speedup.
1119 ///
1120 /// Memory: No additional overhead (only changes read order).
1121 ///
1122 /// Future: Will add option to detect perfectly sorted data and eliminate Sort completely.
1123 ///
1124 /// Default: true
1125 pub enable_sort_pushdown: bool, default = true
1126 }
1127}
1128
1129config_namespace! {
1130 /// Options controlling explain output
1131 ///
1132 /// See also: [`SessionConfig`]
1133 ///
1134 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
1135 pub struct ExplainOptions {
1136 /// When set to true, the explain statement will only print logical plans
1137 pub logical_plan_only: bool, default = false
1138
1139 /// When set to true, the explain statement will only print physical plans
1140 pub physical_plan_only: bool, default = false
1141
1142 /// When set to true, the explain statement will print operator statistics
1143 /// for physical plans
1144 pub show_statistics: bool, default = false
1145
1146 /// When set to true, the explain statement will print the partition sizes
1147 pub show_sizes: bool, default = true
1148
1149 /// When set to true, the explain statement will print schema information
1150 pub show_schema: bool, default = false
1151
1152 /// Display format of explain. Default is "indent".
1153 /// When set to "tree", it will print the plan in a tree-rendered format.
1154 pub format: ExplainFormat, default = ExplainFormat::Indent
1155
1156 /// (format=tree only) Maximum total width of the rendered tree.
1157 /// When set to 0, the tree will have no width limit.
1158 pub tree_maximum_render_width: usize, default = 240
1159
1160 /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev"
1161 /// "summary" shows common metrics for high-level insights.
1162 /// "dev" provides deep operator-level introspection for developers.
1163 pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev
1164 }
1165}
1166
1167impl ExecutionOptions {
1168 /// Returns the correct parallelism based on the provided `value`.
1169 /// If `value` is `"0"`, returns the default available parallelism, computed with
1170 /// `get_available_parallelism`. Otherwise, returns `value`.
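    ///
    /// A minimal sketch of the behaviour:
    /// ```ignore
    /// // non-zero (and non-numeric) values pass through unchanged
    /// assert_eq!(ExecutionOptions::normalized_parallelism("4"), "4");
    /// // "0" is replaced with the detected parallelism of the machine
    /// assert_ne!(ExecutionOptions::normalized_parallelism("0"), "0");
    /// ```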
1171 fn normalized_parallelism(value: &str) -> String {
1172 if value.parse::<usize>() == Ok(0) {
1173 get_available_parallelism().to_string()
1174 } else {
1175 value.to_owned()
1176 }
1177 }
1178}
1179
1180config_namespace! {
1181 /// Options controlling the format of output when printing record batches
1182 /// Copies [`arrow::util::display::FormatOptions`]
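    ///
    /// A minimal sketch of converting to Arrow's display options via the `TryInto`
    /// impl defined below (fails if `duration_format` is neither `pretty` nor `iso8601`):
    /// ```ignore
    /// let format = FormatOptions::default();
    /// let arrow_format: arrow::util::display::FormatOptions<'_> = (&format).try_into()?;
    /// ```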
1183 pub struct FormatOptions {
1184 /// If set to `true` any formatting errors will be written to the output
1185 /// instead of being converted into a [`std::fmt::Error`]
1186 pub safe: bool, default = true
1187 /// Format string for nulls
1188 pub null: String, default = "".into()
1189 /// Date format for date arrays
1190 pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
1191 /// Format for DateTime arrays
1192 pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
1193 /// Timestamp format for timestamp arrays
1194 pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
1195 /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
1196 pub timestamp_tz_format: Option<String>, default = None
1197 /// Time format for time arrays
1198 pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
1199 /// Duration format. Can be either `"pretty"` or `"ISO8601"`
1200 pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
1201 /// Show types in the visual representation of batches
1202 pub types_info: bool, default = false
1203 }
1204}
1205
1206impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
1207 type Error = DataFusionError;
1208 fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
1209 let duration_format = match self.duration_format.as_str() {
1210 "pretty" => arrow::util::display::DurationFormat::Pretty,
1211 "iso8601" => arrow::util::display::DurationFormat::ISO8601,
1212 _ => {
1213 return _config_err!(
1214 "Invalid duration format: {}. Valid values are pretty or iso8601",
1215 self.duration_format
1216 );
1217 }
1218 };
1219
1220 Ok(arrow::util::display::FormatOptions::new()
1221 .with_display_error(self.safe)
1222 .with_null(&self.null)
1223 .with_date_format(self.date_format.as_deref())
1224 .with_datetime_format(self.datetime_format.as_deref())
1225 .with_timestamp_format(self.timestamp_format.as_deref())
1226 .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
1227 .with_time_format(self.time_format.as_deref())
1228 .with_duration_format(duration_format)
1229 .with_types_info(self.types_info))
1230 }
1231}
1232
1233/// A key value pair, with a corresponding description
1234#[derive(Debug, Hash, PartialEq, Eq)]
1235pub struct ConfigEntry {
1236 /// A unique string to identify this config value
1237 pub key: String,
1238
1239 /// The value if any
1240 pub value: Option<String>,
1241
1242 /// A description of this configuration entry
1243 pub description: &'static str,
1244}
1245
1246/// Configuration options struct, able to store both built-in configuration and custom options
1247#[derive(Debug, Clone, Default)]
1248#[non_exhaustive]
1249pub struct ConfigOptions {
1250 /// Catalog options
1251 pub catalog: CatalogOptions,
1252 /// Execution options
1253 pub execution: ExecutionOptions,
1254 /// Optimizer options
1255 pub optimizer: OptimizerOptions,
1256 /// SQL parser options
1257 pub sql_parser: SqlParserOptions,
1258 /// Explain options
1259 pub explain: ExplainOptions,
1260 /// Optional extensions registered using [`Extensions::insert`]
1261 pub extensions: Extensions,
1262 /// Formatting options when printing batches
1263 pub format: FormatOptions,
1264}
1265
1266impl ConfigField for ConfigOptions {
1267 fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1268 self.catalog.visit(v, "datafusion.catalog", "");
1269 self.execution.visit(v, "datafusion.execution", "");
1270 self.optimizer.visit(v, "datafusion.optimizer", "");
1271 self.explain.visit(v, "datafusion.explain", "");
1272 self.sql_parser.visit(v, "datafusion.sql_parser", "");
1273 self.format.visit(v, "datafusion.format", "");
1274 }
1275
1276 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1277 // Extensions are handled in the public `ConfigOptions::set`
1278 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1279 match key {
1280 "catalog" => self.catalog.set(rem, value),
1281 "execution" => self.execution.set(rem, value),
1282 "optimizer" => self.optimizer.set(rem, value),
1283 "explain" => self.explain.set(rem, value),
1284 "sql_parser" => self.sql_parser.set(rem, value),
1285 "format" => self.format.set(rem, value),
1286 _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
1287 }
1288 }
1289
1290 /// Reset a configuration option back to its default value
1291 fn reset(&mut self, key: &str) -> Result<()> {
1292 let Some((prefix, rest)) = key.split_once('.') else {
1293 return _config_err!("could not find config namespace for key \"{key}\"");
1294 };
1295
1296 if prefix != "datafusion" {
1297 return _config_err!("Could not find config namespace \"{prefix}\"");
1298 }
1299
1300 let (section, rem) = rest.split_once('.').unwrap_or((rest, ""));
1301 if rem.is_empty() {
1302 return _config_err!("could not find config field for key \"{key}\"");
1303 }
1304
1305 match section {
1306 "catalog" => self.catalog.reset(rem),
1307 "execution" => self.execution.reset(rem),
1308 "optimizer" => {
1309 if rem == "enable_dynamic_filter_pushdown" {
1310 let defaults = OptimizerOptions::default();
1311 self.optimizer.enable_dynamic_filter_pushdown =
1312 defaults.enable_dynamic_filter_pushdown;
1313 self.optimizer.enable_topk_dynamic_filter_pushdown =
1314 defaults.enable_topk_dynamic_filter_pushdown;
1315                     self.optimizer.enable_join_dynamic_filter_pushdown =
1316                         defaults.enable_join_dynamic_filter_pushdown;
                    // Mirror `set`: the umbrella key also resets the aggregate variant
                    self.optimizer.enable_aggregate_dynamic_filter_pushdown =
                        defaults.enable_aggregate_dynamic_filter_pushdown;
1317                     Ok(())
1318 } else {
1319 self.optimizer.reset(rem)
1320 }
1321 }
1322 "explain" => self.explain.reset(rem),
1323 "sql_parser" => self.sql_parser.reset(rem),
1324 "format" => self.format.reset(rem),
1325 other => _config_err!("Config value \"{other}\" not found on ConfigOptions"),
1326 }
1327 }
1328}
1329
1330impl ConfigOptions {
1331 /// Creates a new [`ConfigOptions`] with default values
1332 pub fn new() -> Self {
1333 Self::default()
1334 }
1335
1336 /// Set extensions to provided value
1337 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1338 self.extensions = extensions;
1339 self
1340 }
1341
1342 /// Set a configuration option
1343 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1344 let Some((prefix, key)) = key.split_once('.') else {
1345 return _config_err!("could not find config namespace for key \"{key}\"");
1346 };
1347
1348 if prefix == "datafusion" {
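            // `enable_dynamic_filter_pushdown` acts as an umbrella flag: setting it
            // also updates each operator-specific variant (TopK, join, aggregate) below.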
1349 if key == "optimizer.enable_dynamic_filter_pushdown" {
1350 let bool_value = value.parse::<bool>().map_err(|e| {
1351 DataFusionError::Configuration(format!(
1352 "Failed to parse '{value}' as bool: {e}",
1353 ))
1354 })?;
1355
1357                 self.optimizer.enable_dynamic_filter_pushdown = bool_value;
1358                 self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
1359                 self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
1360                 self.optimizer.enable_aggregate_dynamic_filter_pushdown = bool_value;
1362 return Ok(());
1363 }
1364 return ConfigField::set(self, key, value);
1365 }
1366
1367 let Some(e) = self.extensions.0.get_mut(prefix) else {
1368 return _config_err!("Could not find config namespace \"{prefix}\"");
1369 };
1370 e.0.set(key, value)
1371 }
1372
1373 /// Create new [`ConfigOptions`], taking values from environment variables
1374 /// where possible.
1375 ///
1376 /// For example, to configure `datafusion.execution.batch_size`
1377 /// ([`ExecutionOptions::batch_size`]) you would set the
1378 /// `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable.
1379 ///
1380 /// The name of the environment variable is the option's key, transformed to
1381 /// uppercase and with periods replaced with underscores.
1382 ///
1383 /// Values are parsed according to the [same rules used in casts from
1384 /// Utf8](https://docs.rs/arrow/latest/arrow/compute/kernels/cast/fn.cast.html).
1385 ///
1386 /// If the value in the environment variable cannot be cast to the type of
1387 /// the configuration option, the default value will be used instead and a
1388 /// warning emitted. Environment variables are read when this method is
1389 /// called, and are not re-read later.
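    ///
    /// For example, a brief sketch (assuming the variable is set before this
    /// method is called):
    ///
    /// ```ignore
    /// // DATAFUSION_EXECUTION_BATCH_SIZE=1024 ./my_app
    /// let config = ConfigOptions::from_env()?;
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```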
1390 pub fn from_env() -> Result<Self> {
1391 struct Visitor(Vec<String>);
1392
1393 impl Visit for Visitor {
1394 fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1395 self.0.push(key.to_string())
1396 }
1397
1398 fn none(&mut self, key: &str, _: &'static str) {
1399 self.0.push(key.to_string())
1400 }
1401 }
1402
1403 // Extract the names of all fields and then look up the corresponding
1404 // environment variables. This isn't hugely efficient but avoids
1405 // ambiguity between `a.b` and `a_b` which would both correspond
1406 // to an environment variable of `A_B`
1407
1408 let mut keys = Visitor(vec![]);
1409 let mut ret = Self::default();
1410 ret.visit(&mut keys, "datafusion", "");
1411
1412 for key in keys.0 {
1413 let env = key.to_uppercase().replace('.', "_");
1414 if let Some(var) = std::env::var_os(env) {
1415 let value = var.to_string_lossy();
1416 log::info!("Set {key} to {value} from the environment variable");
1417 ret.set(&key, value.as_ref())?;
1418 }
1419 }
1420
1421 Ok(ret)
1422 }
1423
1424    /// Create a new [`ConfigOptions`] struct, taking values from a string hash map.
1425    ///
1426    /// Only the built-in configuration keys are extracted from the hash map;
1427    /// other key-value pairs are ignored.
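    ///
    /// For example, a brief sketch:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let settings = HashMap::from([
    ///     ("datafusion.execution.batch_size".to_string(), "1024".to_string()),
    ///     ("some.unknown.key".to_string(), "ignored".to_string()),
    /// ]);
    /// let config = ConfigOptions::from_string_hash_map(&settings)?;
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```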
1428 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1429 struct Visitor(Vec<String>);
1430
1431 impl Visit for Visitor {
1432 fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1433 self.0.push(key.to_string())
1434 }
1435
1436 fn none(&mut self, key: &str, _: &'static str) {
1437 self.0.push(key.to_string())
1438 }
1439 }
1440
1441 let mut keys = Visitor(vec![]);
1442 let mut ret = Self::default();
1443 ret.visit(&mut keys, "datafusion", "");
1444
1445 for key in keys.0 {
1446 if let Some(var) = settings.get(&key) {
1447 ret.set(&key, var)?;
1448 }
1449 }
1450
1451 Ok(ret)
1452 }
1453
1454 /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
1455 pub fn entries(&self) -> Vec<ConfigEntry> {
1456 struct Visitor(Vec<ConfigEntry>);
1457
1458 impl Visit for Visitor {
1459 fn some<V: Display>(
1460 &mut self,
1461 key: &str,
1462 value: V,
1463 description: &'static str,
1464 ) {
1465 self.0.push(ConfigEntry {
1466 key: key.to_string(),
1467 value: Some(value.to_string()),
1468 description,
1469 })
1470 }
1471
1472 fn none(&mut self, key: &str, description: &'static str) {
1473 self.0.push(ConfigEntry {
1474 key: key.to_string(),
1475 value: None,
1476 description,
1477 })
1478 }
1479 }
1480
1481 let mut v = Visitor(vec![]);
1482 self.visit(&mut v, "datafusion", "");
1483
1484 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1485 v.0
1486 }
1487
1488 /// Generate documentation that can be included in the user guide
1489 pub fn generate_config_markdown() -> String {
1490 use std::fmt::Write as _;
1491
1492 let mut s = Self::default();
1493
1494        // Normalize host-dependent defaults (e.g. core count) for stable display
1495 s.execution.target_partitions = 0;
1496 s.execution.planning_concurrency = 0;
1497
1498 let mut docs = "| key | default | description |\n".to_string();
1499 docs += "|-----|---------|-------------|\n";
1500 let mut entries = s.entries();
1501 entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1502
1503        for entry in entries {
1504 let _ = writeln!(
1505 &mut docs,
1506 "| {} | {} | {} |",
1507 entry.key,
1508 entry.value.as_deref().unwrap_or("NULL"),
1509 entry.description
1510 );
1511 }
1512 docs
1513 }
1514}
1515
1516/// [`ConfigExtension`] provides a mechanism to store third-party configuration
1517/// within DataFusion [`ConfigOptions`]
1518///
1519/// This mechanism can be used to pass configuration to user defined functions
1520/// or optimizer passes
1521///
1522/// # Example
1523/// ```
1524/// use datafusion_common::{
1525/// config::ConfigExtension, config::ConfigOptions, extensions_options,
1526/// };
1527/// // Define a new configuration struct using the `extensions_options` macro
1528/// extensions_options! {
1529/// /// My own config options.
1530/// pub struct MyConfig {
1531/// /// Should "foo" be replaced by "bar"?
1532/// pub foo_to_bar: bool, default = true
1533///
1534/// /// How many "baz" should be created?
1535/// pub baz_count: usize, default = 1337
1536/// }
1537/// }
1538///
1539/// impl ConfigExtension for MyConfig {
1540/// const PREFIX: &'static str = "my_config";
1541/// }
1542///
1543/// // set up config struct and register extension
1544/// let mut config = ConfigOptions::default();
1545/// config.extensions.insert(MyConfig::default());
1546///
1547/// // overwrite config default
1548/// config.set("my_config.baz_count", "42").unwrap();
1549///
1550/// // check config state
1551/// let my_config = config.extensions.get::<MyConfig>().unwrap();
1552/// assert!(my_config.foo_to_bar);
1553/// assert_eq!(my_config.baz_count, 42);
1554/// ```
1555///
1556/// # Note:
1557/// Unfortunately associated constants are not currently object-safe, and so this
1558/// extends the object-safe [`ExtensionOptions`]
1559pub trait ConfigExtension: ExtensionOptions {
1560 /// Configuration namespace prefix to use
1561 ///
1562 /// All values under this will be prefixed with `$PREFIX + "."`
1563 const PREFIX: &'static str;
1564}
1565
1566/// An object-safe API for storing arbitrary configuration.
1567///
1568/// See [`ConfigExtension`] for user defined configuration
1569pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
1570 /// Return `self` as [`Any`]
1571 ///
1572 /// This is needed until trait upcasting is stabilized
1573 fn as_any(&self) -> &dyn Any;
1574
1575 /// Return `self` as [`Any`]
1576 ///
1577 /// This is needed until trait upcasting is stabilized
1578 fn as_any_mut(&mut self) -> &mut dyn Any;
1579
1580 /// Return a deep clone of this [`ExtensionOptions`]
1581 ///
1582 /// It is important this does not share mutable state to avoid consistency issues
1583    /// It is important that this does not share mutable state, to avoid consistency issues
1584 fn cloned(&self) -> Box<dyn ExtensionOptions>;
1585
1586 /// Set the given `key`, `value` pair
1587 fn set(&mut self, key: &str, value: &str) -> Result<()>;
1588
1589 /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
1590 fn entries(&self) -> Vec<ConfigEntry>;
1591}
1592
1593/// A type-safe container for [`ConfigExtension`]
1594#[derive(Debug, Default, Clone)]
1595pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1596
1597impl Extensions {
1598 /// Create a new, empty [`Extensions`]
1599 pub fn new() -> Self {
1600 Self(BTreeMap::new())
1601 }
1602
1603    /// Registers a [`ConfigExtension`] with these [`Extensions`]
1604 pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
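        // The "datafusion" prefix is reserved for the built-in options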
1605 assert_ne!(T::PREFIX, "datafusion");
1606 let e = ExtensionBox(Box::new(extension));
1607 self.0.insert(T::PREFIX, e);
1608 }
1609
1610 /// Retrieves the extension of the given type if any
1611 pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1612 self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1613 }
1614
1615    /// Retrieves a mutable reference to the extension of the given type if any
1616 pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1617 let e = self.0.get_mut(T::PREFIX)?;
1618 e.0.as_any_mut().downcast_mut()
1619 }
1620
1621    /// Iterates over all registered config extensions, yielding their prefix and
1622    /// their [`ExtensionOptions`] implementation.
1623 pub fn iter(
1624 &self,
1625 ) -> impl Iterator<Item = (&'static str, &Box<dyn ExtensionOptions>)> {
1626 self.0.iter().map(|(k, v)| (*k, &v.0))
1627 }
1628}
1629
1630#[derive(Debug)]
1631struct ExtensionBox(Box<dyn ExtensionOptions>);
1632
1633impl Clone for ExtensionBox {
1634 fn clone(&self) -> Self {
1635 Self(self.0.cloned())
1636 }
1637}
1638
1639/// A trait implemented by `config_namespace` structs and by field types, providing
1640/// the ability to walk and mutate the configuration tree
1641pub trait ConfigField {
1642 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
1643
1644 fn set(&mut self, key: &str, value: &str) -> Result<()>;
1645
1646 fn reset(&mut self, key: &str) -> Result<()> {
1647 _config_err!("Reset is not supported for this config field, key: {}", key)
1648 }
1649}
1650
1651impl<F: ConfigField + Default> ConfigField for Option<F> {
1652 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1653 match self {
1654 Some(s) => s.visit(v, key, description),
1655 None => v.none(key, description),
1656 }
1657 }
1658
1659 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1660 self.get_or_insert_with(Default::default).set(key, value)
1661 }
1662
1663 fn reset(&mut self, key: &str) -> Result<()> {
1664 if key.is_empty() {
1665 *self = Default::default();
1666 Ok(())
1667 } else {
1668 self.get_or_insert_with(Default::default).reset(key)
1669 }
1670 }
1671}
1672
1673/// Default transformation used to parse a [`ConfigField`] value from a string.
1674///
1675/// This uses [`FromStr`] to parse the data.
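///
/// For example, a brief sketch:
///
/// ```ignore
/// let parsed: usize = default_config_transform("42")?;
/// assert_eq!(parsed, 42);
/// ```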
1676pub fn default_config_transform<T>(input: &str) -> Result<T>
1677where
1678 T: FromStr,
1679 <T as FromStr>::Err: Sync + Send + Error + 'static,
1680{
1681 input.parse().map_err(|e| {
1682 DataFusionError::Context(
1683 format!(
1684 "Error parsing '{}' as {}",
1685 input,
1686 std::any::type_name::<T>()
1687 ),
1688 Box::new(DataFusionError::External(Box::new(e))),
1689 )
1690 })
1691}
1692
1693/// Macro that generates [`ConfigField`] for a given type.
1694///
1695/// # Usage
1696/// This always requires [`Display`] to be implemented for the given type.
1697///
1698/// There are two ways to invoke this macro. The first one uses
1699/// [`default_config_transform`]/[`FromStr`] to parse the data:
1700///
1701/// ```ignore
1702/// config_field!(MyType);
1703/// ```
1704///
1705/// Note that the parsing error MUST implement [`std::error::Error`]!
1706///
1707/// Or you can specify how you want to parse an [`str`] into the type:
1708///
1709/// ```ignore
1710/// fn parse_it(s: &str) -> Result<MyType> {
1711/// ...
1712/// }
1713///
1714/// config_field!(
1715/// MyType,
1716/// value => parse_it(value)
1717/// );
1718/// ```
1719#[macro_export]
1720macro_rules! config_field {
1721 ($t:ty) => {
1722 config_field!($t, value => $crate::config::default_config_transform(value)?);
1723 };
1724
1725 ($t:ty, $arg:ident => $transform:expr) => {
1726 impl $crate::config::ConfigField for $t {
1727 fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1728 v.some(key, self, description)
1729 }
1730
1731 fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
1732 *self = $transform;
1733 Ok(())
1734 }
1735
1736 fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
1737 if key.is_empty() {
1738 *self = <$t as Default>::default();
1739 Ok(())
1740 } else {
1741 $crate::error::_config_err!(
1742 "Config field is a scalar {} and does not have nested field \"{}\"",
1743 stringify!($t),
1744 key
1745 )
1746 }
1747 }
1748 }
1749 };
1750}
1751
1752config_field!(String);
1753config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
1754config_field!(usize);
1755config_field!(f64);
1756config_field!(u64);
1757config_field!(u32);
1758
1759impl ConfigField for u8 {
1760 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1761 v.some(key, self, description)
1762 }
1763
1764 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1765 if value.is_empty() {
1766 return Err(DataFusionError::Configuration(format!(
1767 "Input string for {key} key is empty"
1768 )));
1769 }
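        // Accept either a numeric string (e.g. "44") or a single ASCII character
        // (e.g. ","); both resolve to the same byte value (44 == b',').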
1770 // Check if the string is a valid number
1771 if let Ok(num) = value.parse::<u8>() {
1772 // TODO: Let's decide how we treat the numerical strings.
1773 *self = num;
1774 } else {
1775 let bytes = value.as_bytes();
1776 // Check if the first character is ASCII (single byte)
1777 if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1778 return Err(DataFusionError::Configuration(format!(
1779 "Error parsing {value} as u8. Non-ASCII string provided"
1780 )));
1781 }
1782 *self = bytes[0];
1783 }
1784 Ok(())
1785 }
1786}
1787
1788impl ConfigField for CompressionTypeVariant {
1789 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1790 v.some(key, self, description)
1791 }
1792
1793 fn set(&mut self, _: &str, value: &str) -> Result<()> {
1794 *self = CompressionTypeVariant::from_str(value)?;
1795 Ok(())
1796 }
1797}
1798
1799/// A visitor trait used to recursively walk the configuration tree
1800pub trait Visit {
1801 fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
1802
1803 fn none(&mut self, key: &str, description: &'static str);
1804}
1805
1806/// Convenience macro to create [`ExtensionOptions`].
1807///
1808/// The created structure implements the following traits:
1809///
1810/// - [`Clone`]
1811/// - [`Debug`]
1812/// - [`Default`]
1813/// - [`ExtensionOptions`]
1814///
1815/// # Usage
1816/// The syntax is:
1817///
1818/// ```text
1819/// extensions_options! {
1820/// /// Struct docs (optional).
1821/// [<vis>] struct <StructName> {
1822/// /// Field docs (optional)
1823/// [<vis>] <field_name>: <field_type>, default = <default_value>
1824///
1825/// ... more fields
1826/// }
1827/// }
1828/// ```
1829///
1830/// The placeholders are:
1831/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
1832/// - `<StructName>`: Struct name like `MyStruct`.
1833/// - `<field_name>`: Field name like `my_field`.
1834/// - `<field_type>`: Field type like `u8`.
1835/// - `<default_value>`: Default value matching the field type like `42`.
1836///
1837/// # Example
1838/// See also a full example on the [`ConfigExtension`] documentation
1839///
1840/// ```
1841/// use datafusion_common::extensions_options;
1842///
1843/// extensions_options! {
1844/// /// My own config options.
1845/// pub struct MyConfig {
1846/// /// Should "foo" be replaced by "bar"?
1847/// pub foo_to_bar: bool, default = true
1848///
1849/// /// How many "baz" should be created?
1850/// pub baz_count: usize, default = 1337
1851/// }
1852/// }
1853/// ```
1854///
1855///
1856/// [`Debug`]: std::fmt::Debug
1857/// [`ExtensionOptions`]: crate::config::ExtensionOptions
1858#[macro_export]
1859macro_rules! extensions_options {
1860 (
1861 $(#[doc = $struct_d:tt])*
1862 $vis:vis struct $struct_name:ident {
1863 $(
1864 $(#[doc = $d:tt])*
1865 $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
1866 )*$(,)*
1867 }
1868 ) => {
1869 $(#[doc = $struct_d])*
1870 #[derive(Debug, Clone)]
1871 #[non_exhaustive]
1872 $vis struct $struct_name{
1873 $(
1874 $(#[doc = $d])*
1875 $field_vis $field_name : $field_type,
1876 )*
1877 }
1878
1879 impl Default for $struct_name {
1880 fn default() -> Self {
1881 Self {
1882 $($field_name: $default),*
1883 }
1884 }
1885 }
1886
1887 impl $crate::config::ExtensionOptions for $struct_name {
1888 fn as_any(&self) -> &dyn ::std::any::Any {
1889 self
1890 }
1891
1892 fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
1893 self
1894 }
1895
1896 fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
1897 Box::new(self.clone())
1898 }
1899
1900 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1901 $crate::config::ConfigField::set(self, key, value)
1902 }
1903
1904 fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
1905 struct Visitor(Vec<$crate::config::ConfigEntry>);
1906
1907 impl $crate::config::Visit for Visitor {
1908 fn some<V: std::fmt::Display>(
1909 &mut self,
1910 key: &str,
1911 value: V,
1912 description: &'static str,
1913 ) {
1914 self.0.push($crate::config::ConfigEntry {
1915 key: key.to_string(),
1916 value: Some(value.to_string()),
1917 description,
1918 })
1919 }
1920
1921 fn none(&mut self, key: &str, description: &'static str) {
1922 self.0.push($crate::config::ConfigEntry {
1923 key: key.to_string(),
1924 value: None,
1925 description,
1926 })
1927 }
1928 }
1929
1930 let mut v = Visitor(vec![]);
1931 // The prefix is not used for extensions.
1932 // The description is generated in ConfigField::visit.
1933 // We can just pass empty strings here.
1934 $crate::config::ConfigField::visit(self, &mut v, "", "");
1935 v.0
1936 }
1937 }
1938
1939 impl $crate::config::ConfigField for $struct_name {
1940 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1941 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1942 match key {
1943 $(
1944 stringify!($field_name) => {
1945 // Safely apply deprecated attribute if present
1946 // $(#[allow(deprecated)])?
1947 {
1948 self.$field_name.set(rem, value.as_ref())
1949 }
1950 },
1951 )*
1952 _ => return $crate::error::_config_err!(
1953 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1954 )
1955 }
1956 }
1957
1958 fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1959 $(
1960 let key = stringify!($field_name).to_string();
1961 let desc = concat!($($d),*).trim();
1962 self.$field_name.visit(v, key.as_str(), desc);
1963 )*
1964 }
1965 }
1966 }
1967}
1968
1969/// These file types have special built-in behavior for configuration.
1970/// Use `TableOptions::extensions` for configuring other file types.
1971#[derive(Debug, Clone)]
1972pub enum ConfigFileType {
1973 CSV,
1974 #[cfg(feature = "parquet")]
1975 PARQUET,
1976 JSON,
1977}
1978
1979/// Represents the configuration options available for handling different table formats within a data processing application.
1980/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
1981/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
1982#[derive(Debug, Clone, Default)]
1983pub struct TableOptions {
1984 /// Configuration options for CSV file handling. This includes settings like the delimiter,
1985 /// quote character, and whether the first row is considered as headers.
1986 pub csv: CsvOptions,
1987
1988 /// Configuration options for Parquet file handling. This includes settings for compression,
1989 /// encoding, and other Parquet-specific file characteristics.
1990 pub parquet: TableParquetOptions,
1991
1992 /// Configuration options for JSON file handling.
1993 pub json: JsonOptions,
1994
1995 /// The current file format that the table operations should assume. This option allows
1996 /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1997 pub current_format: Option<ConfigFileType>,
1998
1999 /// Optional extensions that can be used to extend or customize the behavior of the table
2000 /// options. Extensions can be registered using `Extensions::insert` and might include
2001 /// custom file handling logic, additional configuration parameters, or other enhancements.
2002 pub extensions: Extensions,
2003}
2004
2005impl ConfigField for TableOptions {
2006 /// Visits configuration settings for the current file format, or all formats if none is selected.
2007 ///
2008 /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
2009 /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
2010 /// it visits all available format settings.
2011 fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
2012 if let Some(file_type) = &self.current_format {
2013 match file_type {
2014 #[cfg(feature = "parquet")]
2015 ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
2016 ConfigFileType::CSV => self.csv.visit(v, "format", ""),
2017 ConfigFileType::JSON => self.json.visit(v, "format", ""),
2018 }
2019 } else {
2020 self.csv.visit(v, "csv", "");
2021 self.parquet.visit(v, "parquet", "");
2022 self.json.visit(v, "json", "");
2023 }
2024 }
2025
2026 /// Sets a configuration value for a specific key within `TableOptions`.
2027 ///
2028 /// This method delegates setting configuration values to the specific file format configurations,
2029 /// based on the current format selected. If no format is selected, it returns an error.
2030 ///
2031 /// # Parameters
2032 ///
2033 /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
2034 /// for CSV format.
2035 /// * `value`: The value to set for the specified configuration key.
2036 ///
2037 /// # Returns
2038 ///
2039 /// A result indicating success or an error if the key is not recognized, if a format is not specified,
2040 /// or if setting the configuration value fails for the specific format.
2041 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2042 // Extensions are handled in the public `ConfigOptions::set`
2043 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2044 match key {
2045 "format" => {
2046 let Some(format) = &self.current_format else {
2047 return _config_err!("Specify a format for TableOptions");
2048 };
2049 match format {
2050 #[cfg(feature = "parquet")]
2051 ConfigFileType::PARQUET => self.parquet.set(rem, value),
2052 ConfigFileType::CSV => self.csv.set(rem, value),
2053 ConfigFileType::JSON => self.json.set(rem, value),
2054 }
2055 }
2056 _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
2057 }
2058 }
2059}
2060
2061impl TableOptions {
2062 /// Constructs a new instance of `TableOptions` with default settings.
2063 ///
2064 /// # Returns
2065 ///
2066 /// A new `TableOptions` instance with default configuration values.
2067 pub fn new() -> Self {
2068 Self::default()
2069 }
2070
2071 /// Creates a new `TableOptions` instance initialized with settings from a given session config.
2072 ///
2073 /// # Parameters
2074 ///
2075 /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
2076 ///
2077 /// # Returns
2078 ///
2079 /// A new `TableOptions` instance with settings applied from the session config.
2080 pub fn default_from_session_config(config: &ConfigOptions) -> Self {
2081 let initial = TableOptions::default();
2082 initial.combine_with_session_config(config)
2083 }
2084
2085 /// Updates the current `TableOptions` with settings from a given session config.
2086 ///
2087 /// # Parameters
2088 ///
2089 /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
2090 ///
2091 /// # Returns
2092 ///
2093 /// A new `TableOptions` instance with updated settings from the session config.
2094 #[must_use = "this method returns a new instance"]
2095 pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
2096 let mut clone = self.clone();
2097 clone.parquet.global = config.execution.parquet.clone();
2098 clone
2099 }
2100
2101 /// Sets the file format for the table.
2102 ///
2103 /// # Parameters
2104 ///
2105 /// * `format`: The file format to use (e.g., CSV, Parquet).
2106 pub fn set_config_format(&mut self, format: ConfigFileType) {
2107 self.current_format = Some(format);
2108 }
2109
2110 /// Sets the extensions for this `TableOptions` instance.
2111 ///
2112 /// # Parameters
2113 ///
2114 /// * `extensions`: The `Extensions` instance to set.
2115 ///
2116 /// # Returns
2117 ///
2118 /// A new `TableOptions` instance with the specified extensions applied.
2119 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
2120 self.extensions = extensions;
2121 self
2122 }
2123
2124 /// Sets a specific configuration option.
2125 ///
2126 /// # Parameters
2127 ///
2128 /// * `key`: The configuration key (e.g., "format.delimiter").
2129 /// * `value`: The value to set for the specified key.
2130 ///
2131 /// # Returns
2132 ///
2133 /// A result indicating success or failure in setting the configuration option.
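    ///
    /// # Example
    ///
    /// A brief sketch of selecting a format and then setting a format-scoped key
    /// (format-scoped keys use the `format.` prefix and require a format to have
    /// been selected first):
    ///
    /// ```ignore
    /// use datafusion_common::config::{ConfigFileType, TableOptions};
    ///
    /// let mut options = TableOptions::new();
    /// options.set_config_format(ConfigFileType::CSV);
    /// options.set("format.delimiter", ";")?;
    /// assert_eq!(options.csv.delimiter, b';');
    /// ```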
2134 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
2135 let Some((prefix, _)) = key.split_once('.') else {
2136 return _config_err!("could not find config namespace for key \"{key}\"");
2137 };
2138
2139 if prefix == "format" {
2140 return ConfigField::set(self, key, value);
2141 }
2142
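        // Session-level "execution" options are not table options; they are
        // accepted and ignored here rather than treated as errors.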
2143 if prefix == "execution" {
2144 return Ok(());
2145 }
2146
2147 let Some(e) = self.extensions.0.get_mut(prefix) else {
2148 return _config_err!("Could not find config namespace \"{prefix}\"");
2149 };
2150 e.0.set(key, value)
2151 }
2152
2153 /// Initializes a new `TableOptions` from a hash map of string settings.
2154 ///
2155 /// # Parameters
2156 ///
2157 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
2158 ///
2159 /// # Returns
2160 ///
2161 /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
2162 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
2163 let mut ret = Self::default();
2164 for (k, v) in settings {
2165 ret.set(k, v)?;
2166 }
2167
2168 Ok(ret)
2169 }
2170
2171 /// Modifies the current `TableOptions` instance with settings from a hash map.
2172 ///
2173 /// # Parameters
2174 ///
2175 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
2176 ///
2177 /// # Returns
2178 ///
2179 /// A result indicating success or failure in applying the settings.
2180 pub fn alter_with_string_hash_map(
2181 &mut self,
2182 settings: &HashMap<String, String>,
2183 ) -> Result<()> {
2184 for (k, v) in settings {
2185 self.set(k, v)?;
2186 }
2187 Ok(())
2188 }
2189
2190 /// Retrieves all configuration entries from this `TableOptions`.
2191 ///
2192 /// # Returns
2193 ///
2194 /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
2195 pub fn entries(&self) -> Vec<ConfigEntry> {
2196 struct Visitor(Vec<ConfigEntry>);
2197
2198 impl Visit for Visitor {
2199 fn some<V: Display>(
2200 &mut self,
2201 key: &str,
2202 value: V,
2203 description: &'static str,
2204 ) {
2205 self.0.push(ConfigEntry {
2206 key: key.to_string(),
2207 value: Some(value.to_string()),
2208 description,
2209 })
2210 }
2211
2212 fn none(&mut self, key: &str, description: &'static str) {
2213 self.0.push(ConfigEntry {
2214 key: key.to_string(),
2215 value: None,
2216 description,
2217 })
2218 }
2219 }
2220
2221 let mut v = Visitor(vec![]);
2222 self.visit(&mut v, "format", "");
2223
2224 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
2225 v.0
2226 }
2227}
2228
2229/// Options that control how Parquet files are read, including global options
2230/// that apply to all columns and optional column-specific overrides
2231///
2232/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
2233/// Properties not included in [`TableParquetOptions`] may not be configurable via the external API
2234/// (e.g. sorting_columns).
2235#[derive(Clone, Default, Debug, PartialEq)]
2236pub struct TableParquetOptions {
2237    /// Global Parquet options that propagate to all columns.
2238 pub global: ParquetOptions,
2239    /// Column-specific options. Keys follow the pattern `parquet.<option>::<column>`.
2240 pub column_specific_options: HashMap<String, ParquetColumnOptions>,
2241 /// Additional file-level metadata to include. Inserted into the key_value_metadata
2242 /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
2243 ///
2244 /// Multiple entries are permitted
2245 /// ```sql
2246 /// OPTIONS (
2247 /// 'format.metadata::key1' '',
2248 /// 'format.metadata::key2' 'value',
2249 /// 'format.metadata::key3' 'value has spaces',
2250 /// 'format.metadata::key4' 'value has special chars :: :',
2251 /// 'format.metadata::key_dupe' 'original will be overwritten',
2252 /// 'format.metadata::key_dupe' 'final'
2253 /// )
2254 /// ```
2255 pub key_value_metadata: HashMap<String, Option<String>>,
2256 /// Options for configuring Parquet modular encryption
2257 ///
2258 /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
2259 /// See ConfigFileEncryptionProperties and ConfigFileDecryptionProperties in datafusion/common/src/config.rs
2260 /// These can be set via 'format.crypto', for example:
2261 /// ```sql
2262 /// OPTIONS (
2263 /// 'format.crypto.file_encryption.encrypt_footer' 'true',
2264    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
2265 /// 'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
2266 /// 'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
2267 /// -- Same for decryption
2268 /// 'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
2269 /// 'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
2270 /// 'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
2271 /// )
2272 /// ```
2273 /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
2274    /// Note that keys must be provided in hex format since these are binary strings.
2275 pub crypto: ParquetEncryptionOptions,
2276}
2277
2278impl TableParquetOptions {
2279 /// Return new default TableParquetOptions
2280 pub fn new() -> Self {
2281 Self::default()
2282 }
2283
2284 /// Set whether the encoding of the arrow metadata should occur
2285 /// during the writing of parquet.
2286 ///
2287 /// Default is to encode the arrow schema in the file kv_metadata.
2288 pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
2289 Self {
2290 global: ParquetOptions {
2291 skip_arrow_metadata: skip,
2292 ..self.global
2293 },
2294 ..self
2295 }
2296 }
2297
2298 /// Retrieves all configuration entries from this `TableParquetOptions`.
2299 ///
2300 /// # Returns
2301 ///
2302    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableParquetOptions`.
2303    pub fn entries(&self) -> Vec<ConfigEntry> {
2304 struct Visitor(Vec<ConfigEntry>);
2305
2306 impl Visit for Visitor {
2307 fn some<V: Display>(
2308 &mut self,
2309 key: &str,
2310 value: V,
2311 description: &'static str,
2312 ) {
2313 self.0.push(ConfigEntry {
2314 key: key[1..].to_string(),
2315 value: Some(value.to_string()),
2316 description,
2317 })
2318 }
2319
2320 fn none(&mut self, key: &str, description: &'static str) {
2321 self.0.push(ConfigEntry {
2322 key: key[1..].to_string(),
2323 value: None,
2324 description,
2325 })
2326 }
2327 }
2328
2329 let mut v = Visitor(vec![]);
2330 self.visit(&mut v, "", "");
2331
2332 v.0
2333 }
2334}
2335
2336impl ConfigField for TableParquetOptions {
2337 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
2338 self.global.visit(v, key_prefix, description);
2339 self.column_specific_options
2340 .visit(v, key_prefix, description);
2341 self.crypto
2342 .visit(v, &format!("{key_prefix}.crypto"), description);
2343 }
2344
2345 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2346 // Determine if the key is a global, metadata, or column-specific setting
2347 if key.starts_with("metadata::") {
2348 let k = match key.split("::").collect::<Vec<_>>()[..] {
2349 [_meta] | [_meta, ""] => {
2350 return _config_err!(
2351 "Invalid metadata key provided, missing key in metadata::<key>"
2352 );
2353 }
2354 [_meta, k] => k.into(),
2355 _ => {
2356 return _config_err!(
2357 "Invalid metadata key provided, found too many '::' in \"{key}\""
2358 );
2359 }
2360 };
2361 self.key_value_metadata.insert(k, Some(value.into()));
2362 Ok(())
2363 } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
2364 self.crypto.set(crypto_feature, value)
2365 } else if key.contains("::") {
2366 self.column_specific_options.set(key, value)
2367 } else {
2368 self.global.set(key, value)
2369 }
2370 }
2371}
2372
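/// Generates a config struct (like the `config_namespace` macro) and additionally
/// implements [`ConfigField`] for `HashMap<String, TheStruct>`, so that per-entry
/// (e.g. per-column) options can be addressed with keys of the form
/// `<field_name>::<entry_name>`.
///
/// For example, a brief sketch of how the generated `HashMap` impl is keyed:
///
/// ```ignore
/// // enables the bloom filter for the column named "col1"
/// column_specific_options.set("bloom_filter_enabled::col1", "true")?;
/// ```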
2373macro_rules! config_namespace_with_hashmap {
2374 (
2375 $(#[doc = $struct_d:tt])*
2376 $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
2377 $vis:vis struct $struct_name:ident {
2378 $(
2379 $(#[doc = $d:tt])*
2380 $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
2381 $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
2382 )*$(,)*
2383 }
2384 ) => {
2385
2386 $(#[doc = $struct_d])*
2387 $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
2388 #[derive(Debug, Clone, PartialEq)]
2389 $vis struct $struct_name{
2390 $(
2391 $(#[doc = $d])*
2392 $(#[deprecated($($field_depr)*)])? // Apply field deprecation
2393 $field_vis $field_name : $field_type,
2394 )*
2395 }
2396
2397 impl ConfigField for $struct_name {
2398 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2399 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2400 match key {
2401 $(
2402 stringify!($field_name) => {
2403 // Handle deprecated fields
2404 $(let value = $transform(value);)?
2405 self.$field_name.set(rem, value.as_ref())
2406 },
2407 )*
2408 _ => _config_err!(
2409 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
2410 )
2411 }
2412 }
2413
2414 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2415 $(
2416 let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
2417 let desc = concat!($($d),*).trim();
2418 // Handle deprecated fields
2419 self.$field_name.visit(v, key.as_str(), desc);
2420 )*
2421 }
2422 }
2423
2424 impl Default for $struct_name {
2425 fn default() -> Self {
2426 Self {
2427 $($field_name: $default),*
2428 }
2429 }
2430 }
2431
2432 impl ConfigField for HashMap<String,$struct_name> {
2433 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2434 let parts: Vec<&str> = key.splitn(2, "::").collect();
2435 match parts.as_slice() {
2436 [inner_key, hashmap_key] => {
2437 // Get or create the struct for the specified key
2438 let inner_value = self
2439 .entry((*hashmap_key).to_owned())
2440 .or_insert_with($struct_name::default);
2441
2442 inner_value.set(inner_key, value)
2443 }
2444 _ => _config_err!("Unrecognized key '{key}'."),
2445 }
2446 }
2447
2448 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2449 for (column_name, col_options) in self {
2450 $(
2451 let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
2452 let desc = concat!($($d),*).trim();
2453 col_options.$field_name.visit(v, key.as_str(), desc);
2454 )*
2455 }
2456 }
2457 }
2458 }
2459}
2460
2461config_namespace_with_hashmap! {
2462 /// Options controlling parquet format for individual columns.
2463 ///
2464 /// See [`ParquetOptions`] for more details
2465 pub struct ParquetColumnOptions {
2466 /// Sets if bloom filter is enabled for the column path.
2467 pub bloom_filter_enabled: Option<bool>, default = None
2468
2469 /// Sets encoding for the column path.
2470 /// Valid values are: plain, plain_dictionary, rle,
2471 /// bit_packed, delta_binary_packed, delta_length_byte_array,
2472 /// delta_byte_array, rle_dictionary, and byte_stream_split.
2473 /// These values are not case-sensitive. If NULL, uses
2474 /// default parquet options
2475 pub encoding: Option<String>, default = None
2476
2477 /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
2478 /// default parquet options
2479 pub dictionary_enabled: Option<bool>, default = None
2480
2481 /// Sets default parquet compression codec for the column path.
2482 /// Valid values are: uncompressed, snappy, gzip(level),
2483 /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
2484 /// These values are not case-sensitive. If NULL, uses
2485 /// default parquet options
2486 pub compression: Option<String>, transform = str::to_lowercase, default = None
2487
2488 /// Sets if statistics are enabled for the column
2489 /// Valid values are: "none", "chunk", and "page"
2490 /// These values are not case sensitive. If NULL, uses
2491 /// default parquet options
2492 pub statistics_enabled: Option<String>, default = None
2493
2494 /// Sets bloom filter false positive probability for the column path. If NULL, uses
2495 /// default parquet options
2496 pub bloom_filter_fpp: Option<f64>, default = None
2497
2498 /// Sets bloom filter number of distinct values. If NULL, uses
2499 /// default parquet options
2500 pub bloom_filter_ndv: Option<u64>, default = None
2501 }
2502}
2503
2504#[derive(Clone, Debug, PartialEq)]
2505pub struct ConfigFileEncryptionProperties {
2506 /// Should the parquet footer be encrypted
2507 /// default is true
2508 pub encrypt_footer: bool,
2509 /// Key to use for the parquet footer encoded in hex format
2510 pub footer_key_as_hex: String,
2511 /// Metadata information for footer key
2512 pub footer_key_metadata_as_hex: String,
2513 /// HashMap of column names --> (key in hex format, metadata)
2514 pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
2515 /// AAD prefix string uniquely identifies the file and prevents file swapping
2516 pub aad_prefix_as_hex: String,
2517 /// If true, store the AAD prefix in the file
2518 /// default is false
2519 pub store_aad_prefix: bool,
2520}
2521
2522// Setup to match EncryptionPropertiesBuilder::new()
2523impl Default for ConfigFileEncryptionProperties {
2524 fn default() -> Self {
2525 ConfigFileEncryptionProperties {
2526 encrypt_footer: true,
2527 footer_key_as_hex: String::new(),
2528 footer_key_metadata_as_hex: String::new(),
2529 column_encryption_properties: Default::default(),
2530 aad_prefix_as_hex: String::new(),
2531 store_aad_prefix: false,
2532 }
2533 }
2534}
2535
2536config_namespace_with_hashmap! {
2537 pub struct ColumnEncryptionProperties {
2538 /// Per column encryption key
2539 pub column_key_as_hex: String, default = "".to_string()
2540 /// Per column encryption key metadata
2541 pub column_metadata_as_hex: Option<String>, default = None
2542 }
2543}
2544
2545impl ConfigField for ConfigFileEncryptionProperties {
2546 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2547 let key = format!("{key_prefix}.encrypt_footer");
2548 let desc = "Encrypt the footer";
2549 self.encrypt_footer.visit(v, key.as_str(), desc);
2550
2551 let key = format!("{key_prefix}.footer_key_as_hex");
2552 let desc = "Key to use for the parquet footer";
2553 self.footer_key_as_hex.visit(v, key.as_str(), desc);
2554
2555 let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2556 let desc = "Metadata to use for the parquet footer";
2557 self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2558
2559 self.column_encryption_properties.visit(v, key_prefix, desc);
2560
2561 let key = format!("{key_prefix}.aad_prefix_as_hex");
2562 let desc = "AAD prefix to use";
2563 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2564
2565 let key = format!("{key_prefix}.store_aad_prefix");
2566 let desc = "If true, store the AAD prefix";
2567 self.store_aad_prefix.visit(v, key.as_str(), desc);
2570 }
2571
2572 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2573 // Any hex encoded values must be pre-encoded using
2574 // hex::encode() before calling set.
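        // e.g. set("footer_key_as_hex", &hex::encode(b"0123456789012345"))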
2575
2576 if key.contains("::") {
2577 // Handle any column specific properties
2578 return self.column_encryption_properties.set(key, value);
2579 };
2580
2581 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2582 match key {
2583 "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2584 "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2585 "footer_key_metadata_as_hex" => {
2586 self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2587 }
2588 "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2589 "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2590 _ => _config_err!(
2591 "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2592 key
2593 ),
2594 }
2595 }
2596}
2597
2598#[cfg(feature = "parquet_encryption")]
2599impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
2600 fn from(val: ConfigFileEncryptionProperties) -> Self {
2601 let mut fep = FileEncryptionProperties::builder(
2602 hex::decode(val.footer_key_as_hex).unwrap(),
2603 )
2604 .with_plaintext_footer(!val.encrypt_footer)
2605 .with_aad_prefix_storage(val.store_aad_prefix);
2606
2607 if !val.footer_key_metadata_as_hex.is_empty() {
2608 fep = fep.with_footer_key_metadata(
2609 hex::decode(&val.footer_key_metadata_as_hex)
2610 .expect("Invalid footer key metadata"),
2611 );
2612 }
2613
2614 for (column_name, encryption_props) in val.column_encryption_properties.iter() {
2615 let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
2616 .expect("Invalid column encryption key");
2617 let key_metadata = encryption_props
2618 .column_metadata_as_hex
2619 .as_ref()
2620 .map(|x| hex::decode(x).expect("Invalid column metadata"));
2621 match key_metadata {
2622 Some(key_metadata) => {
2623 fep = fep.with_column_key_and_metadata(
2624 column_name,
2625 encryption_key,
2626 key_metadata,
2627 );
2628 }
2629 None => {
2630 fep = fep.with_column_key(column_name, encryption_key);
2631 }
2632 }
2633 }
2634
2635 if !val.aad_prefix_as_hex.is_empty() {
2636 let aad_prefix: Vec<u8> =
2637 hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2638 fep = fep.with_aad_prefix(aad_prefix);
2639 }
2640 Arc::unwrap_or_clone(fep.build().unwrap())
2641 }
2642}
2643
2644#[cfg(feature = "parquet_encryption")]
2645impl From<&Arc<FileEncryptionProperties>> for ConfigFileEncryptionProperties {
2646 fn from(f: &Arc<FileEncryptionProperties>) -> Self {
2647 let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
2648
2649 let mut column_encryption_properties: HashMap<
2650 String,
2651 ColumnEncryptionProperties,
2652 > = HashMap::new();
2653
2654 for (i, column_name) in column_names_vec.iter().enumerate() {
2655 let column_key_as_hex = hex::encode(&column_keys_vec[i]);
2656 let column_metadata_as_hex: Option<String> =
2657 column_metas_vec.get(i).map(hex::encode);
2658 column_encryption_properties.insert(
2659 column_name.clone(),
2660 ColumnEncryptionProperties {
2661 column_key_as_hex,
2662 column_metadata_as_hex,
2663 },
2664 );
2665 }
2666 let mut aad_prefix: Vec<u8> = Vec::new();
2667 if let Some(prefix) = f.aad_prefix() {
2668 aad_prefix = prefix.clone();
2669 }
2670 ConfigFileEncryptionProperties {
2671 encrypt_footer: f.encrypt_footer(),
2672 footer_key_as_hex: hex::encode(f.footer_key()),
2673 footer_key_metadata_as_hex: f
2674 .footer_key_metadata()
2675 .map(hex::encode)
2676 .unwrap_or_default(),
2677 column_encryption_properties,
2678 aad_prefix_as_hex: hex::encode(aad_prefix),
2679 store_aad_prefix: f.store_aad_prefix(),
2680 }
2681 }
2682}
2683
2684#[derive(Clone, Debug, PartialEq)]
2685pub struct ConfigFileDecryptionProperties {
2686 /// Binary string to use for the parquet footer encoded in hex format
2687 pub footer_key_as_hex: String,
2688 /// HashMap of column names --> key in hex format
2689 pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
2690 /// AAD prefix string uniquely identifies the file and prevents file swapping
2691 pub aad_prefix_as_hex: String,
2692 /// If true, then verify signature for files with plaintext footers.
2693 /// default = true
2694 pub footer_signature_verification: bool,
2695}
2696
2697config_namespace_with_hashmap! {
2698 pub struct ColumnDecryptionProperties {
2699 /// Per column encryption key
2700 pub column_key_as_hex: String, default = "".to_string()
2701 }
2702}
2703
2704// Setup to match DecryptionPropertiesBuilder::new()
2705impl Default for ConfigFileDecryptionProperties {
2706 fn default() -> Self {
2707 ConfigFileDecryptionProperties {
2708 footer_key_as_hex: String::new(),
2709 column_decryption_properties: Default::default(),
2710 aad_prefix_as_hex: String::new(),
2711 footer_signature_verification: true,
2712 }
2713 }
2714}
2715
2716impl ConfigField for ConfigFileDecryptionProperties {
2717 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2718 let key = format!("{key_prefix}.footer_key_as_hex");
2719 let desc = "Key to use for the parquet footer";
2720 self.footer_key_as_hex.visit(v, key.as_str(), desc);
2721
2722 let key = format!("{key_prefix}.aad_prefix_as_hex");
2723 let desc = "AAD prefix to use";
2724 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2725
2726 let key = format!("{key_prefix}.footer_signature_verification");
2727 let desc = "If true, verify the footer signature";
2728 self.footer_signature_verification
2729 .visit(v, key.as_str(), desc);
2730
2731 self.column_decryption_properties.visit(v, key_prefix, desc);
2732 }
2733
2734 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2735 // Any hex encoded values must be pre-encoded using
2736 // hex::encode() before calling set.
2737
2738 if key.contains("::") {
2739 // Handle any column specific properties
2740 return self.column_decryption_properties.set(key, value);
2741 };
2742
2743 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2744 match key {
2745 "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2746 "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2747 "footer_signature_verification" => {
2748 self.footer_signature_verification.set(rem, value.as_ref())
2749 }
2750 _ => _config_err!(
2751 "Config value \"{}\" not found on ConfigFileDecryptionProperties",
2752 key
2753 ),
2754 }
2755 }
2756}
2757
2758#[cfg(feature = "parquet_encryption")]
2759impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
2760 fn from(val: ConfigFileDecryptionProperties) -> Self {
2761 let mut column_names: Vec<&str> = Vec::new();
2762 let mut column_keys: Vec<Vec<u8>> = Vec::new();
2763
2764 for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
2765 column_names.push(col_name.as_str());
2766 column_keys.push(
2767 hex::decode(&decryption_properties.column_key_as_hex)
2768 .expect("Invalid column decryption key"),
2769 );
2770 }
2771
2772 let mut fep = FileDecryptionProperties::builder(
2773 hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
2774 )
2775 .with_column_keys(column_names, column_keys)
2776 .unwrap();
2777
2778 if !val.footer_signature_verification {
2779 fep = fep.disable_footer_signature_verification();
2780 }
2781
2782 if !val.aad_prefix_as_hex.is_empty() {
2783 let aad_prefix =
2784 hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2785 fep = fep.with_aad_prefix(aad_prefix);
2786 }
2787
2788 Arc::unwrap_or_clone(fep.build().unwrap())
2789 }
2790}
2791
2792#[cfg(feature = "parquet_encryption")]
2793impl From<&Arc<FileDecryptionProperties>> for ConfigFileDecryptionProperties {
2794 fn from(f: &Arc<FileDecryptionProperties>) -> Self {
2795 let (column_names_vec, column_keys_vec) = f.column_keys();
2796 let mut column_decryption_properties: HashMap<
2797 String,
2798 ColumnDecryptionProperties,
2799 > = HashMap::new();
2800 for (i, column_name) in column_names_vec.iter().enumerate() {
2801 let props = ColumnDecryptionProperties {
2802 column_key_as_hex: hex::encode(column_keys_vec[i].clone()),
2803 };
2804 column_decryption_properties.insert(column_name.clone(), props);
2805 }
2806
2807 let mut aad_prefix: Vec<u8> = Vec::new();
2808 if let Some(prefix) = f.aad_prefix() {
2809 aad_prefix = prefix.clone();
2810 }
2811 ConfigFileDecryptionProperties {
2812 footer_key_as_hex: hex::encode(
2813 f.footer_key(None).unwrap_or_default().as_ref(),
2814 ),
2815 column_decryption_properties,
2816 aad_prefix_as_hex: hex::encode(aad_prefix),
2817 footer_signature_verification: f.check_plaintext_footer_integrity(),
2818 }
2819 }
2820}
2821
2822/// Holds implementation-specific options for an encryption factory
2823#[derive(Clone, Debug, Default, PartialEq)]
2824pub struct EncryptionFactoryOptions {
2825 pub options: HashMap<String, String>,
2826}
2827
2828impl ConfigField for EncryptionFactoryOptions {
2829 fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
2830 for (option_key, option_value) in &self.options {
2831 v.some(
2832 &format!("{key}.{option_key}"),
2833 option_value,
2834 "Encryption factory specific option",
2835 );
2836 }
2837 }
2838
2839 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2840 self.options.insert(key.to_owned(), value.to_owned());
2841 Ok(())
2842 }
2843}
2844
2845impl EncryptionFactoryOptions {
2846 /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
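    ///
    /// A brief sketch, assuming `MyFactoryConfig` is an options type defined with
    /// the `extensions_options!` macro (a hypothetical name, for illustration):
    ///
    /// ```ignore
    /// let typed: MyFactoryConfig = encryption_factory_options.to_extension_options()?;
    /// ```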
2847 pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
2848 let mut options = T::default();
2849 for (key, value) in &self.options {
2850 options.set(key, value)?;
2851 }
2852 Ok(options)
2853 }
2854}
2855
2856config_namespace! {
2857 /// Options controlling CSV format
2858 pub struct CsvOptions {
2859 /// Specifies whether there is a CSV header (i.e. the first line
2860        /// consists of column names). The value `None` indicates that
2861 /// the configuration should be consulted.
2862 pub has_header: Option<bool>, default = None
2863 pub delimiter: u8, default = b','
2864 pub quote: u8, default = b'"'
2865 pub terminator: Option<u8>, default = None
2866 pub escape: Option<u8>, default = None
2867 pub double_quote: Option<bool>, default = None
2868 /// Specifies whether newlines in (quoted) values are supported.
2869 ///
2870 /// Parsing newlines in quoted values may be affected by execution behaviour such as
2871 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2872 /// parsed successfully, which may reduce performance.
2873 ///
2874 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2875 pub newlines_in_values: Option<bool>, default = None
2876 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2877 /// Compression level for the output file. The valid range depends on the
2878 /// compression algorithm:
2879 /// - ZSTD: 1 to 22 (default: 3)
2880 /// - GZIP: 0 to 9 (default: 6)
2881 /// - BZIP2: 0 to 9 (default: 6)
2882 /// - XZ: 0 to 9 (default: 6)
2883 /// If not specified, the default level for the compression algorithm is used.
2884 pub compression_level: Option<u32>, default = None
2885 pub schema_infer_max_rec: Option<usize>, default = None
2886 pub date_format: Option<String>, default = None
2887 pub datetime_format: Option<String>, default = None
2888 pub timestamp_format: Option<String>, default = None
2889 pub timestamp_tz_format: Option<String>, default = None
2890 pub time_format: Option<String>, default = None
2891        /// The output format for nulls in the CSV writer.
2892        pub null_value: Option<String>, default = None
2893        /// The input regex for nulls when loading CSVs.
2894 pub null_regex: Option<String>, default = None
2895 pub comment: Option<u8>, default = None
2896 /// Whether to allow truncated rows when parsing, both within a single file and across files.
2897 ///
2898 /// When set to false (default), reading a single CSV file which has rows of different lengths will
2899 /// error; if reading multiple CSV files with different number of columns, it will also fail.
2900 ///
2901 /// When set to true, reading a single CSV file with rows of different lengths will pad the truncated
2902 /// rows with null values for the missing columns; if reading multiple CSV files with different number
2903 /// of columns, it creates a union schema containing all columns found across the files, and will
2904 /// pad any files missing columns with null values for their rows.
2905 pub truncated_rows: Option<bool>, default = None
2906 }
2907}
2908
2909impl CsvOptions {
2910    /// Set the file compression type
2911    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2912 pub fn with_compression(
2913 mut self,
2914 compression_type_variant: CompressionTypeVariant,
2915 ) -> Self {
2916 self.compression = compression_type_variant;
2917 self
2918 }
2919
2920 /// Set a limit in terms of records to scan to infer the schema
2921 /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
2922 pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
2923 self.schema_infer_max_rec = Some(max_rec);
2924 self
2925 }
2926
2927 /// Set true to indicate that the first line is a header.
2928 /// - default to true
2929 pub fn with_has_header(mut self, has_header: bool) -> Self {
2930 self.has_header = Some(has_header);
2931 self
2932 }
2933
2934    /// Returns whether the first line is a header. If the format options do not
2935 /// specify whether there is a header, returns `None` (indicating that the
2936 /// configuration should be consulted).
2937 pub fn has_header(&self) -> Option<bool> {
2938 self.has_header
2939 }
2940
2941 /// The character separating values within a row.
2942 /// - default to ','
2943 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
2944 self.delimiter = delimiter;
2945 self
2946 }
2947
2948 /// The quote character in a row.
2949 /// - default to '"'
2950 pub fn with_quote(mut self, quote: u8) -> Self {
2951 self.quote = quote;
2952 self
2953 }
2954
2955 /// The character that terminates a row.
2956 /// - default to None (CRLF)
2957 pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
2958 self.terminator = terminator;
2959 self
2960 }
2961
2962 /// The escape character in a row.
2963 /// - default is None
2964 pub fn with_escape(mut self, escape: Option<u8>) -> Self {
2965 self.escape = escape;
2966 self
2967 }
2968
2969 /// Set true to indicate that the CSV quotes should be doubled.
2970 /// - default to true
2971 pub fn with_double_quote(mut self, double_quote: bool) -> Self {
2972 self.double_quote = Some(double_quote);
2973 self
2974 }
2975
2976 /// Specifies whether newlines in (quoted) values are supported.
2977 ///
2978 /// Parsing newlines in quoted values may be affected by execution behaviour such as
2979 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2980 /// parsed successfully, which may reduce performance.
2981 ///
2982 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2983 pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
2984 self.newlines_in_values = Some(newlines_in_values);
2985 self
2986 }
2987
2988    /// Set the `CompressionTypeVariant` for the CSV file.
2989 /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2990 pub fn with_file_compression_type(
2991 mut self,
2992 compression: CompressionTypeVariant,
2993 ) -> Self {
2994 self.compression = compression;
2995 self
2996 }
2997
2998 /// Whether to allow truncated rows when parsing.
2999    /// By default this is false, and reading CSV rows of different lengths returns an error.
3000    /// When set to true, records with fewer than the expected number of columns are accepted and the missing columns are filled with nulls.
3001    /// If the record's schema is not nullable, an error is still returned.
3002 pub fn with_truncated_rows(mut self, allow: bool) -> Self {
3003 self.truncated_rows = Some(allow);
3004 self
3005 }
3006
3007 /// Set the compression level for the output file.
3008 /// The valid range depends on the compression algorithm.
3009 /// If not specified, the default level for the algorithm is used.
3010 pub fn with_compression_level(mut self, level: u32) -> Self {
3011 self.compression_level = Some(level);
3012 self
3013 }
3014
3015 /// The delimiter character.
3016 pub fn delimiter(&self) -> u8 {
3017 self.delimiter
3018 }
3019
3020 /// The quote character.
3021 pub fn quote(&self) -> u8 {
3022 self.quote
3023 }
3024
3025 /// The terminator character.
3026 pub fn terminator(&self) -> Option<u8> {
3027 self.terminator
3028 }
3029
3030 /// The escape character.
3031 pub fn escape(&self) -> Option<u8> {
3032 self.escape
3033 }
3034}
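// Illustrative builder sketch using the methods above (the values are arbitrary):
//
//     let csv_opts = CsvOptions::default()
//         .with_has_header(true)
//         .with_delimiter(b'|')
//         .with_quote(b'"')
//         .with_file_compression_type(CompressionTypeVariant::GZIP)
//         .with_compression_level(6);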
3035
3036config_namespace! {
3037 /// Options controlling JSON format
3038 pub struct JsonOptions {
3039 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
3040 /// Compression level for the output file. The valid range depends on the
3041 /// compression algorithm:
3042 /// - ZSTD: 1 to 22 (default: 3)
3043 /// - GZIP: 0 to 9 (default: 6)
3044 /// - BZIP2: 0 to 9 (default: 6)
3045 /// - XZ: 0 to 9 (default: 6)
3046 /// If not specified, the default level for the compression algorithm is used.
3047 pub compression_level: Option<u32>, default = None
3048 pub schema_infer_max_rec: Option<usize>, default = None
3049 }
3050}
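// Illustrative sketch: like `CsvOptions`, JSON options can be set through string
// keys once the config format is JSON (assuming a `ConfigFileType::JSON` variant);
// accepted compression values follow `CompressionTypeVariant`'s string parsing.
//
//     table_config.set_config_format(ConfigFileType::JSON);
//     table_config.set("format.compression", "gzip")?;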
3051
3052pub trait OutputFormatExt: Display {}
3053
3054#[derive(Debug, Clone, PartialEq)]
3055#[cfg_attr(feature = "parquet", expect(clippy::large_enum_variant))]
3056pub enum OutputFormat {
3057 CSV(CsvOptions),
3058 JSON(JsonOptions),
3059 #[cfg(feature = "parquet")]
3060 PARQUET(TableParquetOptions),
3061 AVRO,
3062 ARROW,
3063}
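// Constructing a variant carries its per-format options, e.g.
// `OutputFormat::CSV(CsvOptions::default())`; `AVRO` and `ARROW` take no options.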
3064
3065impl Display for OutputFormat {
3066 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3067 let out = match self {
3068 OutputFormat::CSV(_) => "csv",
3069 OutputFormat::JSON(_) => "json",
3070 #[cfg(feature = "parquet")]
3071 OutputFormat::PARQUET(_) => "parquet",
3072 OutputFormat::AVRO => "avro",
3073 OutputFormat::ARROW => "arrow",
3074 };
3075 write!(f, "{out}")
3076 }
3077}
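// The `Display` impl yields the lowercase format name, e.g.
//
//     assert_eq!(OutputFormat::ARROW.to_string(), "arrow");
//     assert_eq!(OutputFormat::CSV(CsvOptions::default()).to_string(), "csv");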
3078
3079#[cfg(test)]
3080mod tests {
3081 #[cfg(feature = "parquet")]
3082 use crate::config::TableParquetOptions;
3083 use crate::config::{
3084 ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
3085 Extensions, TableOptions,
3086 };
3087 use std::any::Any;
3088 use std::collections::HashMap;
3089
3090 #[derive(Default, Debug, Clone)]
3091 pub struct TestExtensionConfig {
3092        /// Key/value properties collected from `set` calls
3093 pub properties: HashMap<String, String>,
3094 }
3095
3096 impl ExtensionOptions for TestExtensionConfig {
3097 fn as_any(&self) -> &dyn Any {
3098 self
3099 }
3100
3101 fn as_any_mut(&mut self) -> &mut dyn Any {
3102 self
3103 }
3104
3105 fn cloned(&self) -> Box<dyn ExtensionOptions> {
3106 Box::new(self.clone())
3107 }
3108
3109 fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
3110 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
3111 assert_eq!(key, "test");
3112 self.properties.insert(rem.to_owned(), value.to_owned());
3113 Ok(())
3114 }
3115
3116 fn entries(&self) -> Vec<ConfigEntry> {
3117 self.properties
3118 .iter()
3119 .map(|(k, v)| ConfigEntry {
3120 key: k.into(),
3121 value: Some(v.into()),
3122 description: "",
3123 })
3124 .collect()
3125 }
3126 }
3127
3128 impl ConfigExtension for TestExtensionConfig {
3129 const PREFIX: &'static str = "test";
3130 }
3131
3132 #[test]
3133 fn create_table_config() {
3134 let mut extension = Extensions::new();
3135 extension.insert(TestExtensionConfig::default());
3136 let table_config = TableOptions::new().with_extensions(extension);
3137 let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
3138 assert!(kafka_config.is_some())
3139 }
3140
3141 #[test]
3142 fn alter_test_extension_config() {
3143 let mut extension = Extensions::new();
3144 extension.insert(TestExtensionConfig::default());
3145 let mut table_config = TableOptions::new().with_extensions(extension);
3146 table_config.set_config_format(ConfigFileType::CSV);
3147 table_config.set("format.delimiter", ";").unwrap();
3148 assert_eq!(table_config.csv.delimiter, b';');
3149 table_config.set("test.bootstrap.servers", "asd").unwrap();
3150 let kafka_config = table_config
3151 .extensions
3152 .get::<TestExtensionConfig>()
3153 .unwrap();
3154 assert_eq!(
3155 kafka_config.properties.get("bootstrap.servers").unwrap(),
3156 "asd"
3157 );
3158 }
3159
3160 #[test]
3161 fn iter_test_extension_config() {
3162 let mut extension = Extensions::new();
3163 extension.insert(TestExtensionConfig::default());
3164 let table_config = TableOptions::new().with_extensions(extension);
3165 let extensions = table_config.extensions.iter().collect::<Vec<_>>();
3166 assert_eq!(extensions.len(), 1);
3167 assert_eq!(extensions[0].0, TestExtensionConfig::PREFIX);
3168 }
3169
3170 #[test]
3171 fn csv_u8_table_options() {
3172 let mut table_config = TableOptions::new();
3173 table_config.set_config_format(ConfigFileType::CSV);
3174 table_config.set("format.delimiter", ";").unwrap();
3175 assert_eq!(table_config.csv.delimiter as char, ';');
3176 table_config.set("format.escape", "\"").unwrap();
3177 assert_eq!(table_config.csv.escape.unwrap() as char, '"');
3178 table_config.set("format.escape", "\'").unwrap();
3179 assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
3180 }
3181
3182 #[test]
3183 fn warning_only_not_default() {
3184 use std::sync::atomic::AtomicUsize;
3185 static COUNT: AtomicUsize = AtomicUsize::new(0);
3186 use log::{Level, LevelFilter, Metadata, Record};
3187 struct SimpleLogger;
3188 impl log::Log for SimpleLogger {
3189 fn enabled(&self, metadata: &Metadata) -> bool {
3190 metadata.level() <= Level::Info
3191 }
3192
3193 fn log(&self, record: &Record) {
3194 if self.enabled(record.metadata()) {
3195 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3196 }
3197 }
3198 fn flush(&self) {}
3199 }
3200 log::set_logger(&SimpleLogger).unwrap();
3201 log::set_max_level(LevelFilter::Info);
3202 let mut sql_parser_options = crate::config::SqlParserOptions::default();
3203 sql_parser_options
3204 .set("enable_options_value_normalization", "false")
3205 .unwrap();
3206 assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
3207 sql_parser_options
3208 .set("enable_options_value_normalization", "true")
3209 .unwrap();
3210 assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
3211 }
3212
3213 #[test]
3214 fn reset_nested_scalar_reports_helpful_error() {
3215 let mut value = true;
3216 let err = <bool as ConfigField>::reset(&mut value, "nested").unwrap_err();
3217 let message = err.to_string();
3218 assert!(
3219 message.starts_with(
3220 "Invalid or Unsupported Configuration: Config field is a scalar bool and does not have nested field \"nested\""
3221 ),
3222 "unexpected error message: {message}"
3223 );
3224 }
3225
3226 #[cfg(feature = "parquet")]
3227 #[test]
3228 fn parquet_table_options() {
3229 let mut table_config = TableOptions::new();
3230 table_config.set_config_format(ConfigFileType::PARQUET);
3231 table_config
3232 .set("format.bloom_filter_enabled::col1", "true")
3233 .unwrap();
3234 assert_eq!(
3235 table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
3236 Some(true)
3237 );
3238 }
3239
3240 #[cfg(feature = "parquet_encryption")]
3241 #[test]
3242 fn parquet_table_encryption() {
3243 use crate::config::{
3244 ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
3245 };
3246 use parquet::encryption::decrypt::FileDecryptionProperties;
3247 use parquet::encryption::encrypt::FileEncryptionProperties;
3248 use std::sync::Arc;
3249
3250        let footer_key = b"0123456789012345".to_vec(); // 128-bit key (16 bytes)
3251 let column_names = vec!["double_field", "float_field"];
3252 let column_keys =
3253 vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
3254
3255 let file_encryption_properties =
3256 FileEncryptionProperties::builder(footer_key.clone())
3257 .with_column_keys(column_names.clone(), column_keys.clone())
3258 .unwrap()
3259 .build()
3260 .unwrap();
3261
3262 let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
3263 .with_column_keys(column_names.clone(), column_keys.clone())
3264 .unwrap()
3265 .build()
3266 .unwrap();
3267
3268 // Test round-trip
3269 let config_encrypt =
3270 ConfigFileEncryptionProperties::from(&file_encryption_properties);
3271 let encryption_properties_built =
3272 Arc::new(FileEncryptionProperties::from(config_encrypt.clone()));
3273 assert_eq!(file_encryption_properties, encryption_properties_built);
3274
3275 let config_decrypt = ConfigFileDecryptionProperties::from(&decryption_properties);
3276 let decryption_properties_built =
3277 Arc::new(FileDecryptionProperties::from(config_decrypt.clone()));
3278 assert_eq!(decryption_properties, decryption_properties_built);
3279
3280 ///////////////////////////////////////////////////////////////////////////////////
3281 // Test encryption config
3282
3283 // Display original encryption config
3284 // println!("{:#?}", config_encrypt);
3285
3286 let mut table_config = TableOptions::new();
3287 table_config.set_config_format(ConfigFileType::PARQUET);
3288 table_config
3289 .parquet
3290 .set(
3291 "crypto.file_encryption.encrypt_footer",
3292 config_encrypt.encrypt_footer.to_string().as_str(),
3293 )
3294 .unwrap();
3295 table_config
3296 .parquet
3297 .set(
3298 "crypto.file_encryption.footer_key_as_hex",
3299 config_encrypt.footer_key_as_hex.as_str(),
3300 )
3301 .unwrap();
3302
3303 for (i, col_name) in column_names.iter().enumerate() {
3304 let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
3305 let value = hex::encode(column_keys[i].clone());
3306 table_config
3307 .parquet
3308 .set(key.as_str(), value.as_str())
3309 .unwrap();
3310 }
3311
3312 // Print matching final encryption config
3313 // println!("{:#?}", table_config.parquet.crypto.file_encryption);
3314
3315 assert_eq!(
3316 table_config.parquet.crypto.file_encryption,
3317 Some(config_encrypt)
3318 );
3319
3320 ///////////////////////////////////////////////////////////////////////////////////
3321 // Test decryption config
3322
3323 // Display original decryption config
3324 // println!("{:#?}", config_decrypt);
3325
3326 let mut table_config = TableOptions::new();
3327 table_config.set_config_format(ConfigFileType::PARQUET);
3328 table_config
3329 .parquet
3330 .set(
3331 "crypto.file_decryption.footer_key_as_hex",
3332 config_decrypt.footer_key_as_hex.as_str(),
3333 )
3334 .unwrap();
3335
3336 for (i, col_name) in column_names.iter().enumerate() {
3337 let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
3338 let value = hex::encode(column_keys[i].clone());
3339 table_config
3340 .parquet
3341 .set(key.as_str(), value.as_str())
3342 .unwrap();
3343 }
3344
3345 // Print matching final decryption config
3346 // println!("{:#?}", table_config.parquet.crypto.file_decryption);
3347
3348 assert_eq!(
3349 table_config.parquet.crypto.file_decryption,
3350 Some(config_decrypt.clone())
3351 );
3352
3353 // Set config directly
3354 let mut table_config = TableOptions::new();
3355 table_config.set_config_format(ConfigFileType::PARQUET);
3356 table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
3357 assert_eq!(
3358 table_config.parquet.crypto.file_decryption,
3359 Some(config_decrypt.clone())
3360 );
3361 }
3362
3363 #[cfg(feature = "parquet_encryption")]
3364 #[test]
3365 fn parquet_encryption_factory_config() {
3366 let mut parquet_options = TableParquetOptions::default();
3367
3368 assert_eq!(parquet_options.crypto.factory_id, None);
3369 assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);
3370
3371 let mut input_config = TestExtensionConfig::default();
3372 input_config
3373 .properties
3374 .insert("key1".to_string(), "value 1".to_string());
3375 input_config
3376 .properties
3377 .insert("key2".to_string(), "value 2".to_string());
3378
3379 parquet_options
3380 .crypto
3381 .configure_factory("example_factory", &input_config);
3382
3383 assert_eq!(
3384 parquet_options.crypto.factory_id,
3385 Some("example_factory".to_string())
3386 );
3387 let factory_options = &parquet_options.crypto.factory_options.options;
3388 assert_eq!(factory_options.len(), 2);
3389 assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
3390 assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
3391 }
3392
3393 #[cfg(feature = "parquet")]
3394 #[test]
3395 fn parquet_table_options_config_entry() {
3396 let mut table_config = TableOptions::new();
3397 table_config.set_config_format(ConfigFileType::PARQUET);
3398 table_config
3399 .set("format.bloom_filter_enabled::col1", "true")
3400 .unwrap();
3401 let entries = table_config.entries();
3402 assert!(
3403 entries
3404 .iter()
3405 .any(|item| item.key == "format.bloom_filter_enabled::col1")
3406 )
3407 }
3408
3409 #[cfg(feature = "parquet")]
3410 #[test]
3411 fn parquet_table_parquet_options_config_entry() {
3412 let mut table_parquet_options = TableParquetOptions::new();
3413 table_parquet_options
3414 .set(
3415 "crypto.file_encryption.column_key_as_hex::double_field",
3416 "31323334353637383930313233343530",
3417 )
3418 .unwrap();
3419 let entries = table_parquet_options.entries();
3420 assert!(
3421 entries.iter().any(|item| item.key
3422 == "crypto.file_encryption.column_key_as_hex::double_field")
3423 )
3424 }
3425
3426 #[cfg(feature = "parquet")]
3427 #[test]
3428 fn parquet_table_options_config_metadata_entry() {
3429 let mut table_config = TableOptions::new();
3430 table_config.set_config_format(ConfigFileType::PARQUET);
3431 table_config.set("format.metadata::key1", "").unwrap();
3432 table_config.set("format.metadata::key2", "value2").unwrap();
3433 table_config
3434 .set("format.metadata::key3", "value with spaces ")
3435 .unwrap();
3436 table_config
3437 .set("format.metadata::key4", "value with special chars :: :")
3438 .unwrap();
3439
3440 let parsed_metadata = table_config.parquet.key_value_metadata.clone();
3441 assert_eq!(parsed_metadata.get("should not exist1"), None);
3442 assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
3443 assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
3444 assert_eq!(
3445 parsed_metadata.get("key3"),
3446 Some(&Some("value with spaces ".into()))
3447 );
3448 assert_eq!(
3449 parsed_metadata.get("key4"),
3450 Some(&Some("value with special chars :: :".into()))
3451 );
3452
3453 // duplicate keys are overwritten
3454 table_config.set("format.metadata::key_dupe", "A").unwrap();
3455 table_config.set("format.metadata::key_dupe", "B").unwrap();
3456 let parsed_metadata = table_config.parquet.key_value_metadata;
3457 assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
3458 }
3459 #[cfg(feature = "parquet")]
3460 #[test]
3461 fn test_parquet_writer_version_validation() {
3462 use crate::{config::ConfigOptions, parquet_config::DFParquetWriterVersion};
3463
3464 let mut config = ConfigOptions::default();
3465
3466 // Valid values should work
3467 config
3468 .set("datafusion.execution.parquet.writer_version", "1.0")
3469 .unwrap();
3470 assert_eq!(
3471 config.execution.parquet.writer_version,
3472 DFParquetWriterVersion::V1_0
3473 );
3474
3475 config
3476 .set("datafusion.execution.parquet.writer_version", "2.0")
3477 .unwrap();
3478 assert_eq!(
3479 config.execution.parquet.writer_version,
3480 DFParquetWriterVersion::V2_0
3481 );
3482
3483 // Invalid value should error immediately at SET time
3484 let err = config
3485 .set("datafusion.execution.parquet.writer_version", "3.0")
3486 .unwrap_err();
3487 assert_eq!(
3488 err.to_string(),
3489 "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
3490 );
3491 }
3492}