datafusion_common/config.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Runtime configuration, via [`ConfigOptions`]

use arrow_ipc::CompressionType;

#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;

#[cfg(feature = "parquet_encryption")]
use hex;

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///     /// Amazing config
///     pub struct MyConfig {
///         /// Field 1 doc
///         field1: String, transform = str::to_lowercase, default = "".to_string()
///
///         /// Field 2 doc
///         field2: usize, default = 232
///
///         /// Field 3 doc
///         field3: Option<usize>, default = None
///     }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone)]
/// #[non_exhaustive]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _internal_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                #[allow(deprecated)]
                                let ret = self.$field_name.set(rem, value.as_ref());

                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    #[allow(deprecated)]
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })? // Log warning if specified, and the value is not the default
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}

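// The test module below is an illustrative sketch of using `config_namespace!`
// (not part of the upstream file): `DemoOptions` and its fields are
// hypothetical names, used only to show how the generated `ConfigField` and
// `Default` impls behave, including the optional `transform`.
#[cfg(test)]
mod config_namespace_example {
    use crate::config::ConfigField;

    config_namespace! {
        /// Demo options (hypothetical)
        pub struct DemoOptions {
            /// A label, normalized to lowercase by the transform
            pub label: String, transform = str::to_lowercase, default = "none".to_string()

            /// A numeric knob
            pub knob: usize, default = 42
        }
    }

    #[test]
    fn set_and_read_fields() {
        let mut opts = DemoOptions::default();
        assert_eq!(opts.knob, 42);
        opts.set("label", "MiXeD").unwrap();
        opts.set("knob", "7").unwrap();
        assert_eq!(opts.label, "mixed");
        assert_eq!(opts.knob, 7);
    }
}
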
config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}

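// An illustrative sketch (not upstream code): catalog options are reachable
// through `ConfigOptions` under the `datafusion.catalog` prefix. The schema
// name "analytics" is a made-up example value.
#[cfg(test)]
mod catalog_options_example {
    use super::*;

    #[test]
    fn set_catalog_options_by_key() {
        let mut config = ConfigOptions::new();
        config
            .set("datafusion.catalog.information_schema", "true")
            .unwrap();
        config
            .set("datafusion.catalog.default_schema", "analytics")
            .unwrap();
        assert!(config.catalog.information_schema);
        assert_eq!(config.catalog.default_schema, "analytics");
    }
}
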
config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: String, default = "generic".to_string()
        // no need to lowercase because `sqlparser::dialect_from_str` is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion cannot enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
        /// If false, they are mapped to `Utf8`.
        /// Default is true.
        pub map_string_types_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL Queries
        pub recursion_limit: usize, default = 50

        /// Specifies the default null ordering for query results. There are 4 options:
        /// - `nulls_max`: Nulls appear last in ascending order.
        /// - `nulls_min`: Nulls appear first in ascending order.
        /// - `nulls_first`: Nulls always appear first, in any order.
        /// - `nulls_last`: Nulls always appear last, in any order.
        ///
        /// By default, `nulls_max` is used to follow Postgres's behavior.
        /// Postgres rule: <https://www.postgresql.org/docs/current/queries-order.html>
        pub default_null_ordering: String, default = "nulls_max".to_string()
    }
}

/// Compression codec applied to spill files (see
/// [`ExecutionOptions::spill_compression`]).
///
/// Spill files are written in the Arrow IPC Stream format, so only codecs
/// supported by the Arrow IPC writer are available here.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    /// Zstandard: higher compression ratio, slower (de)compression
    Zstd,
    /// LZ4 frame: faster (de)compression, typically larger spill files
    Lz4Frame,
    /// No compression (the default)
    #[default]
    Uncompressed,
}

impl FromStr for SpillCompression {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "zstd" => Ok(Self::Zstd),
            "lz4_frame" => Ok(Self::Lz4Frame),
            "uncompressed" | "" => Ok(Self::Uncompressed),
            other => Err(DataFusionError::Configuration(format!(
                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
            ))),
        }
    }
}

impl ConfigField for SpillCompression {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = SpillCompression::from_str(value)?;
        Ok(())
    }
}

impl Display for SpillCompression {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = match self {
            Self::Zstd => "zstd",
            Self::Lz4Frame => "lz4_frame",
            Self::Uncompressed => "uncompressed",
        };
        write!(f, "{str}")
    }
}

impl From<SpillCompression> for Option<CompressionType> {
    fn from(c: SpillCompression) -> Self {
        match c {
            SpillCompression::Zstd => Some(CompressionType::ZSTD),
            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
            SpillCompression::Uncompressed => None,
        }
    }
}

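// An illustrative sketch (not upstream code) of the `SpillCompression`
// parsing and display behavior defined above: parsing is ASCII
// case-insensitive and the empty string maps to `Uncompressed`.
#[cfg(test)]
mod spill_compression_example {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn parse_and_display_round_trip() {
        assert_eq!(
            SpillCompression::from_str("ZSTD").unwrap(),
            SpillCompression::Zstd
        );
        assert_eq!(
            SpillCompression::from_str("").unwrap(),
            SpillCompression::Uncompressed
        );
        assert_eq!(SpillCompression::Lz4Frame.to_string(), "lz4_frame");
        // Unsupported codecs are rejected with a configuration error
        assert!(SpillCompression::from_str("gzip").is_err());
    }
}
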
config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        /// Default batch size when creating new batches. This is especially useful for
        /// batches buffered in memory, since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the `batch_size` configuration setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
        /// according to this time zone, and then extract the hour
        pub time_zone: String, default = "+00:00".into()

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly used to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to workaround bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Sets the compression codec used when spilling data to disk.
        ///
        /// Since DataFusion writes spill files using the Arrow IPC Stream format,
        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
        /// Valid values are: uncompressed, lz4_frame, zstd.
        /// Note: lz4_frame offers faster (de)compression, but typically results in
        /// larger spill files. In contrast, zstd achieves
        /// higher compression ratios at the cost of slower (de)compression speed.
        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum level of output files running in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the ratio exceeds this
        /// threshold, partial aggregation will skip aggregation for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows partial aggregation partition should process, before
        /// aggregation ratio check and trying to switch to skipping aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote end point.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
    }
}

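// An illustrative sketch (not upstream code): execution options, including
// the `SpillCompression` field, can be set through the dotted key paths
// handled by the `ConfigField` impls generated above.
#[cfg(test)]
mod execution_options_example {
    use super::*;

    #[test]
    fn set_execution_options_by_key() {
        let mut config = ConfigOptions::new();
        config.set("datafusion.execution.batch_size", "1024").unwrap();
        config
            .set("datafusion.execution.spill_compression", "zstd")
            .unwrap();
        assert_eq!(config.execution.batch_size, 1024);
        assert_eq!(config.execution.spill_compression, SpillCompression::Zstd);
    }
}
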
config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// One read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer
        pub metadata_size_hint: Option<usize>, default = None

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
        /// and `Binary/BinaryLarge` with `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If true, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark
        /// which stores microsecond resolution timestamps in an int96 allowing it
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in bytes
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: String, default = "1.0".to_string()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default = 1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file is serialized in parallel
        /// leveraging a maximum possible core count of n_files * n_row_groups * n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}

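// An illustrative sketch (not upstream code): parquet options live one level
// deeper under `datafusion.execution.parquet`, and fields declared with
// `transform = str::to_lowercase` normalize their values on `set`.
#[cfg(test)]
mod parquet_options_example {
    use super::*;

    #[test]
    fn set_parquet_compression_by_key() {
        let mut config = ConfigOptions::new();
        config
            .set("datafusion.execution.parquet.compression", "ZSTD(3)")
            .unwrap();
        assert_eq!(
            config.execution.parquet.compression.as_deref(),
            Some("zstd(3)")
        );
    }
}
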
config_namespace! {
    /// Options for configuring Parquet Modular Encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    pub struct ParquetEncryptionOptions {
        /// Optional file decryption properties
        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None

        /// Optional file encryption properties
        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None

        /// Identifier for the encryption factory to use to create file encryption and decryption properties.
        /// Encryption factories can be registered in the runtime environment with
        /// `RuntimeEnv::register_parquet_encryption_factory`.
        pub factory_id: Option<String>, default = None

        /// Any encryption factory specific options
        pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
    }
}

impl ParquetEncryptionOptions {
    /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
    pub fn configure_factory(
        &mut self,
        factory_id: &str,
        config: &impl ExtensionOptions,
    ) {
        self.factory_id = Some(factory_id.to_owned());
        self.factory_options.options.clear();
        for entry in config.entries() {
            if let Some(value) = entry.value {
                self.factory_options.options.insert(entry.key, value);
            }
        }
    }
}

config_namespace! {
    /// Options related to query optimization
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct OptimizerOptions {
        /// When set to true, the optimizer will push a limit operation into
        /// grouped aggregations which have no aggregate expressions, as a soft limit,
        /// emitting groups once the limit is reached, before all rows in the group are read.
        pub enable_distinct_aggregation_soft_limit: bool, default = true

        /// When set to true, the physical plan optimizer will try to add round robin
        /// repartitioning to increase parallelism to leverage more CPU cores
        pub enable_round_robin_repartition: bool, default = true

        /// When set to true, the optimizer will attempt to perform limit operations
        /// during aggregations, if possible
        pub enable_topk_aggregation: bool, default = true

        /// When set to true, the optimizer will attempt to push limit operations
        /// past window functions, if possible
        pub enable_window_limits: bool, default = true

        /// When set to true, attempts to push down dynamic filters generated by operators into the file scan phase.
        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
        /// This means that if we already have 10 timestamps in the year 2025,
        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
        pub enable_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will insert filters before a join between
        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
        /// filter can add additional overhead when the file format does not fully support
        /// predicate push down.
        pub filter_null_join_keys: bool, default = false

        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
        /// in parallel using the provided `target_partitions` level
        pub repartition_aggregations: bool, default = true

        /// Minimum total files size in bytes to perform file scan repartitioning.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

        /// Should DataFusion repartition data using the join keys to execute joins in parallel
        /// using the provided `target_partitions` level
        pub repartition_joins: bool, default = true

        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
        /// its inputs do not have any ordering or filtering. If the flag is not enabled,
        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
        /// RightAnti, and RightSemi - being produced only at the end of the execution.
        /// This is not typical in stream processing. Additionally, without proper design for
        /// long runner execution, all types of joins may encounter out-of-memory errors.
        pub allow_symmetric_joins_without_pruning: bool, default = true

        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
        ///
        /// For FileSources, only Parquet and CSV formats are currently supported.
        ///
        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
        /// might be partitioned into smaller chunks) for parallel scanning.
        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
        /// happen within a single file.
        ///
        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
        /// the total number of partitions and batches per partition, but does not slice the initial
        /// record tables provided to the MemTable on creation.
        pub repartition_file_scans: bool, default = true

        /// Should DataFusion repartition data using the partitions keys to execute window
        /// functions in parallel using the provided `target_partitions` level
        pub repartition_windows: bool, default = true

        /// Should DataFusion execute sorts in a per-partition fashion and merge
        /// afterwards instead of coalescing first and sorting globally.
        /// When this flag is enabled, plans in the form below
        ///
        /// ```text
        /// "SortExec: [a@0 ASC]",
        /// "  CoalescePartitionsExec",
        /// "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        ///
        /// would turn into the plan below, which performs better in multithreaded environments
        ///
        /// ```text
        /// "SortPreservingMergeExec: [a@0 ASC]",
        /// "  SortExec: [a@0 ASC]",
        /// "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        pub repartition_sorts: bool, default = true

        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
        /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
        /// using `SortPreservingMergeExec`)
        ///
        /// When false, DataFusion will maximize plan parallelism using
        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
        pub prefer_existing_sort: bool, default = false

        /// When set to true, the logical plan optimizer will produce warning
        /// messages if any optimization rules produce errors and then proceed to the next
        /// rule. When set to false, any rules that produce errors will cause the query to fail
        pub skip_failed_rules: bool, default = false

        /// Number of times that the optimizer will attempt to optimize the plan
        pub max_passes: usize, default = 3

        /// When set to true, the physical plan optimizer will run a top down
        /// process to reorder the join keys
        pub top_down_join_key_reordering: bool, default = true

        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
        pub prefer_hash_join: bool, default = true

        /// The maximum estimated size in bytes for one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

        /// The maximum estimated size in rows for one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

        /// The default filter selectivity used by Filter Statistics
        /// when an exact selectivity cannot be determined. Valid values are
        /// between 0 (no selectivity) and 100 (all rows are selected).
        pub default_filter_selectivity: u8, default = 20

        /// When set to true, the optimizer will not attempt to convert Union to Interleave
        pub prefer_existing_union: bool, default = false

        /// When set to true, if the returned type is a view type
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false
    }
}

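// An illustrative sketch (not upstream code): optimizer options follow the
// same dotted-key pattern, including the `u8` field
// `default_filter_selectivity`.
#[cfg(test)]
mod optimizer_options_example {
    use super::*;

    #[test]
    fn set_optimizer_options_by_key() {
        let mut config = ConfigOptions::new();
        config
            .set("datafusion.optimizer.repartition_joins", "false")
            .unwrap();
        config
            .set("datafusion.optimizer.default_filter_selectivity", "30")
            .unwrap();
        assert!(!config.optimizer.repartition_joins);
        assert_eq!(config.optimizer.default_filter_selectivity, 30);
    }
}
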
config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: String, default = "indent".to_string()

        /// (format=tree only) Maximum total width of the rendered tree.
        /// When set to 0, the tree will have no width limit.
        pub tree_maximum_render_width: usize, default = 240
    }
}

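// An illustrative sketch (not upstream code): switching the explain output
// to the tree format and enabling schema display.
#[cfg(test)]
mod explain_options_example {
    use super::*;

    #[test]
    fn set_explain_options_by_key() {
        let mut config = ConfigOptions::new();
        config.set("datafusion.explain.format", "tree").unwrap();
        config.set("datafusion.explain.show_schema", "true").unwrap();
        assert_eq!(config.explain.format, "tree");
        assert!(config.explain.show_schema);
    }
}
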
impl ExecutionOptions {
    /// Returns the correct parallelism based on the provided `value`.
    /// If `value` is `"0"`, returns the default available parallelism, computed with
    /// `get_available_parallelism`. Otherwise, returns `value`.
    fn normalized_parallelism(value: &str) -> String {
        if value.parse::<usize>() == Ok(0) {
            get_available_parallelism().to_string()
        } else {
            value.to_owned()
        }
    }
}

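// An illustrative sketch (not upstream code): because `target_partitions`
// uses the `normalized_parallelism` transform above, setting it to "0"
// resolves to the number of available CPU cores.
#[cfg(test)]
mod normalized_parallelism_example {
    use super::*;

    #[test]
    fn zero_resolves_to_available_parallelism() {
        let mut config = ConfigOptions::new();
        config
            .set("datafusion.execution.target_partitions", "0")
            .unwrap();
        assert_eq!(
            config.execution.target_partitions,
            get_available_parallelism()
        );
    }
}
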
config_namespace! {
    /// Options controlling the format of output when printing record batches
    /// Copies [`arrow::util::display::FormatOptions`]
    pub struct FormatOptions {
        /// If set to `true` any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show types in visual representation batches
        pub types_info: bool, default = false
    }
}

impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
    type Error = DataFusionError;
    fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
        let duration_format = match self.duration_format.as_str() {
            "pretty" => arrow::util::display::DurationFormat::Pretty,
            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
            _ => {
                return _config_err!(
                    "Invalid duration format: {}. Valid values are pretty or iso8601",
                    self.duration_format
                )
            }
        };

        Ok(arrow::util::display::FormatOptions::new()
            .with_display_error(self.safe)
            .with_null(&self.null)
            .with_date_format(self.date_format.as_deref())
            .with_datetime_format(self.datetime_format.as_deref())
            .with_timestamp_format(self.timestamp_format.as_deref())
            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
            .with_time_format(self.time_format.as_deref())
            .with_duration_format(duration_format)
            .with_types_info(self.types_info))
    }
}

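// An illustrative sketch (not upstream code): converting the config-level
// `FormatOptions` into arrow's display options succeeds for supported
// duration formats and surfaces a configuration error otherwise. The value
// "verbose" is a made-up, unsupported example.
#[cfg(test)]
mod format_options_conversion_example {
    use super::*;

    #[test]
    fn convert_to_arrow_format_options() {
        let options = FormatOptions::default();
        let converted: Result<arrow::util::display::FormatOptions> = (&options).try_into();
        assert!(converted.is_ok());

        let mut bad = FormatOptions::default();
        bad.set("duration_format", "verbose").unwrap();
        let converted: Result<arrow::util::display::FormatOptions> = (&bad).try_into();
        assert!(converted.is_err());
    }
}
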
/// A key value pair, with a corresponding description
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value if any
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}

/// Configuration options struct, able to store both built-in configuration and custom options
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
    /// Formatting options when printing batches
    pub format: FormatOptions,
}

impl ConfigField for ConfigOptions {
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `ConfigOptions::set`
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        match key {
            "catalog" => self.catalog.set(rem, value),
            "execution" => self.execution.set(rem, value),
            "optimizer" => self.optimizer.set(rem, value),
            "explain" => self.explain.set(rem, value),
            "sql_parser" => self.sql_parser.set(rem, value),
            "format" => self.format.set(rem, value),
            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
        }
    }

    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        self.catalog.visit(v, "datafusion.catalog", "");
        self.execution.visit(v, "datafusion.execution", "");
        self.optimizer.visit(v, "datafusion.optimizer", "");
        self.explain.visit(v, "datafusion.explain", "");
        self.sql_parser.visit(v, "datafusion.sql_parser", "");
        self.format.visit(v, "datafusion.format", "");
    }
}

impl ConfigOptions {
    /// Creates a new [`ConfigOptions`] with default values
    pub fn new() -> Self {
        Self::default()
    }

    /// Set extensions to provided value
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }

    /// Set a configuration option
    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let Some((prefix, key)) = key.split_once('.') else {
            return _config_err!("could not find config namespace for key \"{key}\"");
        };

        if prefix == "datafusion" {
            return ConfigField::set(self, key, value);
        }

        let Some(e) = self.extensions.0.get_mut(prefix) else {
            return _config_err!("Could not find config namespace \"{prefix}\"");
        };
        e.0.set(key, value)
    }

    /// Create new [`ConfigOptions`], taking values from environment variables
    /// where possible.
    ///
    /// For example, to configure `datafusion.execution.batch_size`
    /// ([`ExecutionOptions::batch_size`]) you would set the
    /// `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable.
    ///
    /// The name of the environment variable is the option's key, transformed to
    /// uppercase and with periods replaced with underscores.
    ///
    /// Values are parsed according to the [same rules used in casts from
    /// Utf8](https://docs.rs/arrow/latest/arrow/compute/kernels/cast/fn.cast.html).
    ///
    /// If the value in the environment variable cannot be cast to the type of
    /// the configuration option, the default value will be used instead and a
    /// warning emitted. Environment variables are read when this method is
    /// called, and are not re-read later.
    pub fn from_env() -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        // Extract the names of all fields and then look up the corresponding
        // environment variables. This isn't hugely efficient but avoids
        // ambiguity between `a.b` and `a_b` which would both correspond
        // to an environment variable of `A_B`

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            let env = key.to_uppercase().replace('.', "_");
            if let Some(var) = std::env::var_os(env) {
                let value = var.to_string_lossy();
                log::info!("Set {key} to {value} from the environment variable");
                ret.set(&key, value.as_ref())?;
            }
        }

        Ok(ret)
    }

    /// Create new ConfigOptions struct, taking values from a string hash map.
    ///
    /// Only the built-in configurations will be extracted from the hash map
    /// and other key value pairs will be ignored.
    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            if let Some(var) = settings.get(&key) {
                ret.set(&key, var)?;
            }
        }

        Ok(ret)
    }

    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
    pub fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.visit(&mut v, "datafusion", "");

        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
        v.0
    }

    /// Generate documentation that can be included in the user guide
    pub fn generate_config_markdown() -> String {
        use std::fmt::Write as _;

        let mut s = Self::default();

        // Normalize for display
        s.execution.target_partitions = 0;
        s.execution.planning_concurrency = 0;

        let mut docs = "| key | default | description |\n".to_string();
        docs += "|-----|---------|-------------|\n";
        let mut entries = s.entries();
        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));

        // Iterate the sorted entries (re-calling `s.entries()` here would
        // discard the sort order)
        for entry in entries {
            let _ = writeln!(
                &mut docs,
                "| {} | {} | {} |",
                entry.key,
                entry.value.as_deref().unwrap_or("NULL"),
                entry.description
            );
        }
        docs
    }
}

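// An illustrative sketch (not upstream code) of the constructors above:
// `from_string_hash_map` reads only built-in keys, and `entries` flattens
// the whole configuration tree into key/value/description triples.
#[cfg(test)]
mod config_options_construction_example {
    use super::*;

    #[test]
    fn from_string_hash_map_and_entries() {
        let mut settings = HashMap::new();
        settings.insert(
            "datafusion.sql_parser.dialect".to_string(),
            "postgresql".to_string(),
        );
        // unknown keys are ignored rather than rejected
        settings.insert("not.a.real.key".to_string(), "whatever".to_string());

        let config = ConfigOptions::from_string_hash_map(&settings).unwrap();
        assert_eq!(config.sql_parser.dialect, "postgresql");

        let entries = config.entries();
        assert!(entries
            .iter()
            .any(|e| e.key == "datafusion.execution.batch_size"));
    }
}
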
/// [`ConfigExtension`] provides a mechanism to store third-party configuration
/// within DataFusion [`ConfigOptions`]
///
/// This mechanism can be used to pass configuration to user defined functions
/// or optimizer passes
///
/// # Example
/// ```
/// use datafusion_common::{
///     config::ConfigExtension, extensions_options,
///     config::ConfigOptions,
/// };
/// // Define a new configuration struct using the `extensions_options` macro
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
///
/// impl ConfigExtension for MyConfig {
///     const PREFIX: &'static str = "my_config";
/// }
///
/// // set up config struct and register extension
/// let mut config = ConfigOptions::default();
/// config.extensions.insert(MyConfig::default());
///
/// // overwrite config default
/// config.set("my_config.baz_count", "42").unwrap();
///
/// // check config state
/// let my_config = config.extensions.get::<MyConfig>().unwrap();
/// assert!(my_config.foo_to_bar);
/// assert_eq!(my_config.baz_count, 42);
/// ```
///
/// # Note:
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
    /// Configuration namespace prefix to use
    ///
    /// All values under this will be prefixed with `$PREFIX + "."`
    const PREFIX: &'static str;
}

/// An object-safe API for storing arbitrary configuration.
///
/// See [`ConfigExtension`] for user defined configuration
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any(&self) -> &dyn Any;

    /// Return `self` as mutable [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Return a deep clone of this [`ExtensionOptions`]
    ///
    /// It is important this does not share mutable state to avoid consistency issues
    /// with configuration changing whilst queries are executing
    fn cloned(&self) -> Box<dyn ExtensionOptions>;

    /// Set the given `key`, `value` pair
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
    fn entries(&self) -> Vec<ConfigEntry>;
}

/// A type-safe container for [`ConfigExtension`]
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);

impl Extensions {
    /// Create a new, empty [`Extensions`]
    pub fn new() -> Self {
        Self(BTreeMap::new())
    }

    /// Registers a [`ConfigExtension`] with this [`Extensions`] container
    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
        assert_ne!(T::PREFIX, "datafusion");
        let e = ExtensionBox(Box::new(extension));
        self.0.insert(T::PREFIX, e);
    }

    /// Retrieves the extension of the given type if any
    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
    }

    /// Retrieves a mutable reference to the extension of the given type if any
    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
        let e = self.0.get_mut(T::PREFIX)?;
        e.0.as_any_mut().downcast_mut()
    }
}

#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);

impl Clone for ExtensionBox {
    fn clone(&self) -> Self {
        Self(self.0.cloned())
    }
}

/// A trait implemented by `config_namespace` and for field types that provides
/// the ability to walk and mutate the configuration tree
pub trait ConfigField {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);

    fn set(&mut self, key: &str, value: &str) -> Result<()>;
}

impl<F: ConfigField + Default> ConfigField for Option<F> {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        match self {
            Some(s) => s.visit(v, key, description),
            None => v.none(key, description),
        }
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        self.get_or_insert_with(Default::default).set(key, value)
    }
}

/// Default transformation to parse a [`ConfigField`] from a string.
///
/// This uses [`FromStr`] to parse the data.
pub fn default_config_transform<T>(input: &str) -> Result<T>
where
    T: FromStr,
    <T as FromStr>::Err: Sync + Send + Error + 'static,
{
    input.parse().map_err(|e| {
        DataFusionError::Context(
            format!(
                "Error parsing '{}' as {}",
                input,
                std::any::type_name::<T>()
            ),
            Box::new(DataFusionError::External(Box::new(e))),
        )
    })
}

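// An illustrative sketch (not upstream code): `default_config_transform`
// wraps `FromStr` parse failures with the offending input and target type.
#[cfg(test)]
mod default_config_transform_example {
    use super::*;

    #[test]
    fn parse_ok_and_err() {
        let parsed: usize = default_config_transform("123").unwrap();
        assert_eq!(parsed, 123);
        assert!(default_config_transform::<usize>("not-a-number").is_err());
    }
}
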
/// Macro that generates [`ConfigField`] for a given type.
///
/// # Usage
/// This always requires [`Display`] to be implemented for the given type.
///
/// There are two ways to invoke this macro. The first one uses
/// [`default_config_transform`]/[`FromStr`] to parse the data:
///
/// ```ignore
/// config_field!(MyType);
/// ```
///
/// Note that the parsing error MUST implement [`std::error::Error`]!
///
/// Or you can specify how you want to parse an [`str`] into the type:
///
/// ```ignore
/// fn parse_it(s: &str) -> Result<MyType> {
///     ...
/// }
///
/// config_field!(
///     MyType,
///     value => parse_it(value)
/// );
/// ```
#[macro_export]
macro_rules! config_field {
    ($t:ty) => {
        config_field!($t, value => $crate::config::default_config_transform(value)?);
    };

    ($t:ty, $arg:ident => $transform:expr) => {
        impl $crate::config::ConfigField for $t {
            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
                *self = $transform;
                Ok(())
            }
        }
    };
}

config_field!(String);
config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);

impl ConfigField for u8 {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        if value.is_empty() {
            return Err(DataFusionError::Configuration(format!(
                "Input string for {key} key is empty"
            )));
        }
        // Check if the string is a valid number
        if let Ok(num) = value.parse::<u8>() {
            // TODO: Let's decide how we treat the numerical strings.
            *self = num;
        } else {
            let bytes = value.as_bytes();
            // Check if the first character is ASCII (single byte)
            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
                return Err(DataFusionError::Configuration(format!(
                    "Error parsing {value} as u8. Non-ASCII string provided"
                )));
            }
            *self = bytes[0];
        }
        Ok(())
    }
}

1418impl ConfigField for CompressionTypeVariant {
1419 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1420 v.some(key, self, description)
1421 }
1422
1423 fn set(&mut self, _: &str, value: &str) -> Result<()> {
1424 *self = CompressionTypeVariant::from_str(value)?;
1425 Ok(())
1426 }
1427}
1428
1429/// An implementation trait used to recursively walk configuration
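///
/// For example, a minimal sketch of a visitor that collects every key,
/// mirroring the `Visitor` helpers used by the various `entries()` methods
/// below:
///
/// ```ignore
/// struct KeyCollector(Vec<String>);
///
/// impl Visit for KeyCollector {
///     fn some<V: Display>(&mut self, key: &str, _value: V, _description: &'static str) {
///         self.0.push(key.to_string());
///     }
///
///     fn none(&mut self, key: &str, _description: &'static str) {
///         self.0.push(key.to_string());
///     }
/// }
/// ```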
1430pub trait Visit {
1431 fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
1432
1433 fn none(&mut self, key: &str, description: &'static str);
1434}
1435
1436/// Convenience macro to create [`ExtensionOptions`].
1437///
1438/// The created structure implements the following traits:
1439///
1440/// - [`Clone`]
1441/// - [`Debug`]
1442/// - [`Default`]
1443/// - [`ExtensionOptions`]
1444///
1445/// # Usage
1446/// The syntax is:
1447///
1448/// ```text
1449/// extensions_options! {
1450/// /// Struct docs (optional).
1451/// [<vis>] struct <StructName> {
1452/// /// Field docs (optional)
1453/// [<vis>] <field_name>: <field_type>, default = <default_value>
1454///
1455/// ... more fields
1456/// }
1457/// }
1458/// ```
1459///
1460/// The placeholders are:
1461/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
1462/// - `<StructName>`: Struct name like `MyStruct`.
1463/// - `<field_name>`: Field name like `my_field`.
1464/// - `<field_type>`: Field type like `u8`.
1465/// - `<default_value>`: Default value matching the field type like `42`.
1466///
1467/// # Example
1468/// See also a full example on the [`ConfigExtension`] documentation
1469///
1470/// ```
1471/// use datafusion_common::extensions_options;
1472///
1473/// extensions_options! {
1474/// /// My own config options.
1475/// pub struct MyConfig {
1476/// /// Should "foo" be replaced by "bar"?
1477/// pub foo_to_bar: bool, default = true
1478///
1479/// /// How many "baz" should be created?
1480/// pub baz_count: usize, default = 1337
1481/// }
1482/// }
1483/// ```
1484///
1486/// [`Debug`]: std::fmt::Debug
1488#[macro_export]
1489macro_rules! extensions_options {
1490 (
1491 $(#[doc = $struct_d:tt])*
1492 $vis:vis struct $struct_name:ident {
1493 $(
1494 $(#[doc = $d:tt])*
1495 $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
1496 )*$(,)*
1497 }
1498 ) => {
1499 $(#[doc = $struct_d])*
1500 #[derive(Debug, Clone)]
1501 #[non_exhaustive]
1502 $vis struct $struct_name{
1503 $(
1504 $(#[doc = $d])*
1505 $field_vis $field_name : $field_type,
1506 )*
1507 }
1508
1509 impl Default for $struct_name {
1510 fn default() -> Self {
1511 Self {
1512 $($field_name: $default),*
1513 }
1514 }
1515 }
1516
1517 impl $crate::config::ExtensionOptions for $struct_name {
1518 fn as_any(&self) -> &dyn ::std::any::Any {
1519 self
1520 }
1521
1522 fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
1523 self
1524 }
1525
1526 fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
1527 Box::new(self.clone())
1528 }
1529
1530 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1531 $crate::config::ConfigField::set(self, key, value)
1532 }
1533
1534 fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
1535 struct Visitor(Vec<$crate::config::ConfigEntry>);
1536
1537 impl $crate::config::Visit for Visitor {
1538 fn some<V: std::fmt::Display>(
1539 &mut self,
1540 key: &str,
1541 value: V,
1542 description: &'static str,
1543 ) {
1544 self.0.push($crate::config::ConfigEntry {
1545 key: key.to_string(),
1546 value: Some(value.to_string()),
1547 description,
1548 })
1549 }
1550
1551 fn none(&mut self, key: &str, description: &'static str) {
1552 self.0.push($crate::config::ConfigEntry {
1553 key: key.to_string(),
1554 value: None,
1555 description,
1556 })
1557 }
1558 }
1559
1560 let mut v = Visitor(vec![]);
1561 // The prefix is not used for extensions.
1562 // The description is generated in ConfigField::visit.
1563 // We can just pass empty strings here.
1564 $crate::config::ConfigField::visit(self, &mut v, "", "");
1565 v.0
1566 }
1567 }
1568
1569 impl $crate::config::ConfigField for $struct_name {
1570 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1571 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1572 match key {
1573 $(
1574 stringify!($field_name) => {
1575                        // Allow setting deprecated fields without warnings
1577 {
1578 #[allow(deprecated)]
1579 self.$field_name.set(rem, value.as_ref())
1580 }
1581 },
1582 )*
1583 _ => return $crate::error::_config_err!(
1584 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1585 )
1586 }
1587 }
1588
1589 fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1590 $(
1591 let key = stringify!($field_name).to_string();
1592 let desc = concat!($($d),*).trim();
1593 #[allow(deprecated)]
1594 self.$field_name.visit(v, key.as_str(), desc);
1595 )*
1596 }
1597 }
1598 }
1599}
1600
1601/// These file types have special built-in behavior for configuration.
1602/// Use [`TableOptions::extensions`] for configuring other file types.
1603#[derive(Debug, Clone)]
1604pub enum ConfigFileType {
1605 CSV,
1606 #[cfg(feature = "parquet")]
1607 PARQUET,
1608 JSON,
1609}
1610
1611/// Configuration options for handling different table formats (CSV, Parquet, and JSON).
1612/// Each format has its own options controlling parsing and writing behavior, and
1613/// functionality can be extended further through custom extensions.
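///
/// # Example
///
/// A minimal sketch, mirroring the unit tests in this module: select a file
/// format first, then address format-specific options via the `format.`
/// prefix:
///
/// ```ignore
/// let mut table_config = TableOptions::new();
/// table_config.set_config_format(ConfigFileType::CSV);
/// table_config.set("format.delimiter", ";").unwrap();
/// assert_eq!(table_config.csv.delimiter, b';');
/// ```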
1614#[derive(Debug, Clone, Default)]
1615pub struct TableOptions {
1616 /// Configuration options for CSV file handling. This includes settings like the delimiter,
1617 /// quote character, and whether the first row is considered as headers.
1618 pub csv: CsvOptions,
1619
1620 /// Configuration options for Parquet file handling. This includes settings for compression,
1621 /// encoding, and other Parquet-specific file characteristics.
1622 pub parquet: TableParquetOptions,
1623
1624 /// Configuration options for JSON file handling.
1625 pub json: JsonOptions,
1626
1627 /// The current file format that the table operations should assume. This option allows
1628 /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1629 pub current_format: Option<ConfigFileType>,
1630
1631 /// Optional extensions that can be used to extend or customize the behavior of the table
1632 /// options. Extensions can be registered using `Extensions::insert` and might include
1633 /// custom file handling logic, additional configuration parameters, or other enhancements.
1634 pub extensions: Extensions,
1635}
1636
1637impl ConfigField for TableOptions {
1638 /// Visits configuration settings for the current file format, or all formats if none is selected.
1639 ///
1640 /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1641 /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1642 /// it visits all available format settings.
1643 fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1644 if let Some(file_type) = &self.current_format {
1645 match file_type {
1646 #[cfg(feature = "parquet")]
1647 ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1648 ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1649 ConfigFileType::JSON => self.json.visit(v, "format", ""),
1650 }
1651 } else {
1652 self.csv.visit(v, "csv", "");
1653 self.parquet.visit(v, "parquet", "");
1654 self.json.visit(v, "json", "");
1655 }
1656 }
1657
1658 /// Sets a configuration value for a specific key within `TableOptions`.
1659 ///
1660 /// This method delegates setting configuration values to the specific file format configurations,
1661 /// based on the current format selected. If no format is selected, it returns an error.
1662 ///
1663 /// # Parameters
1664 ///
1665 /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1666 /// for CSV format.
1667 /// * `value`: The value to set for the specified configuration key.
1668 ///
1669 /// # Returns
1670 ///
1671 /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1672 /// or if setting the configuration value fails for the specific format.
1673 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1674 // Extensions are handled in the public `ConfigOptions::set`
1675 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1676 match key {
1677 "format" => {
1678 let Some(format) = &self.current_format else {
1679 return _config_err!("Specify a format for TableOptions");
1680 };
1681 match format {
1682 #[cfg(feature = "parquet")]
1683 ConfigFileType::PARQUET => self.parquet.set(rem, value),
1684 ConfigFileType::CSV => self.csv.set(rem, value),
1685 ConfigFileType::JSON => self.json.set(rem, value),
1686 }
1687 }
1688 _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1689 }
1690 }
1691}
1692
1693impl TableOptions {
1694 /// Constructs a new instance of `TableOptions` with default settings.
1695 ///
1696 /// # Returns
1697 ///
1698 /// A new `TableOptions` instance with default configuration values.
1699 pub fn new() -> Self {
1700 Self::default()
1701 }
1702
1703 /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1704 ///
1705 /// # Parameters
1706 ///
1707 /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1708 ///
1709 /// # Returns
1710 ///
1711 /// A new `TableOptions` instance with settings applied from the session config.
1712 pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1713 let initial = TableOptions::default();
1714 initial.combine_with_session_config(config)
1715 }
1716
1717 /// Updates the current `TableOptions` with settings from a given session config.
1718 ///
1719 /// # Parameters
1720 ///
1721 /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1722 ///
1723 /// # Returns
1724 ///
1725 /// A new `TableOptions` instance with updated settings from the session config.
1726 #[must_use = "this method returns a new instance"]
1727 pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1728 let mut clone = self.clone();
1729 clone.parquet.global = config.execution.parquet.clone();
1730 clone
1731 }
1732
1733 /// Sets the file format for the table.
1734 ///
1735 /// # Parameters
1736 ///
1737 /// * `format`: The file format to use (e.g., CSV, Parquet).
1738 pub fn set_config_format(&mut self, format: ConfigFileType) {
1739 self.current_format = Some(format);
1740 }
1741
1742 /// Sets the extensions for this `TableOptions` instance.
1743 ///
1744 /// # Parameters
1745 ///
1746 /// * `extensions`: The `Extensions` instance to set.
1747 ///
1748 /// # Returns
1749 ///
1750 /// A new `TableOptions` instance with the specified extensions applied.
1751 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1752 self.extensions = extensions;
1753 self
1754 }
1755
1756 /// Sets a specific configuration option.
1757 ///
1758 /// # Parameters
1759 ///
1760 /// * `key`: The configuration key (e.g., "format.delimiter").
1761 /// * `value`: The value to set for the specified key.
1762 ///
1763 /// # Returns
1764 ///
1765 /// A result indicating success or failure in setting the configuration option.
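    ///
    /// For example (a minimal sketch; keys with a registered extension prefix
    /// such as `test.` are routed to that extension instead of `format.`):
    ///
    /// ```ignore
    /// table_config.set_config_format(ConfigFileType::CSV);
    /// table_config.set("format.has_header", "true")?;
    /// ```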
1766 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1767 let Some((prefix, _)) = key.split_once('.') else {
1768 return _config_err!("could not find config namespace for key \"{key}\"");
1769 };
1770
1771 if prefix == "format" {
1772 return ConfigField::set(self, key, value);
1773 }
1774
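        // "execution." keys are accepted but ignored here; execution options
        // are applied through the session config (see
        // `combine_with_session_config`) rather than stored on `TableOptions`.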
1775 if prefix == "execution" {
1776 return Ok(());
1777 }
1778
1779 let Some(e) = self.extensions.0.get_mut(prefix) else {
1780 return _config_err!("Could not find config namespace \"{prefix}\"");
1781 };
1782 e.0.set(key, value)
1783 }
1784
1785 /// Initializes a new `TableOptions` from a hash map of string settings.
1786 ///
1787 /// # Parameters
1788 ///
1789 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1790 ///
1791 /// # Returns
1792 ///
1793 /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1794 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1795 let mut ret = Self::default();
1796 for (k, v) in settings {
1797 ret.set(k, v)?;
1798 }
1799
1800 Ok(ret)
1801 }
1802
1803 /// Modifies the current `TableOptions` instance with settings from a hash map.
1804 ///
1805 /// # Parameters
1806 ///
1807 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1808 ///
1809 /// # Returns
1810 ///
1811 /// A result indicating success or failure in applying the settings.
1812 pub fn alter_with_string_hash_map(
1813 &mut self,
1814 settings: &HashMap<String, String>,
1815 ) -> Result<()> {
1816 for (k, v) in settings {
1817 self.set(k, v)?;
1818 }
1819 Ok(())
1820 }
1821
1822 /// Retrieves all configuration entries from this `TableOptions`.
1823 ///
1824 /// # Returns
1825 ///
1826 /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1827 pub fn entries(&self) -> Vec<ConfigEntry> {
1828 struct Visitor(Vec<ConfigEntry>);
1829
1830 impl Visit for Visitor {
1831 fn some<V: Display>(
1832 &mut self,
1833 key: &str,
1834 value: V,
1835 description: &'static str,
1836 ) {
1837 self.0.push(ConfigEntry {
1838 key: key.to_string(),
1839 value: Some(value.to_string()),
1840 description,
1841 })
1842 }
1843
1844 fn none(&mut self, key: &str, description: &'static str) {
1845 self.0.push(ConfigEntry {
1846 key: key.to_string(),
1847 value: None,
1848 description,
1849 })
1850 }
1851 }
1852
1853 let mut v = Visitor(vec![]);
1854 self.visit(&mut v, "format", "");
1855
1856 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1857 v.0
1858 }
1859}
1860
1861/// Options that control how Parquet files are read, including global options
1862/// that apply to all columns and optional column-specific overrides
1863///
1864/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
1865/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
1866/// (e.g. sorting_columns).
1867#[derive(Clone, Default, Debug, PartialEq)]
1868pub struct TableParquetOptions {
1869    /// Global Parquet options that propagate to all columns.
1870 pub global: ParquetOptions,
1871    /// Column-specific options, addressed as `<option>::<column>` (e.g. `parquet.compression::col1`).
1872 pub column_specific_options: HashMap<String, ParquetColumnOptions>,
1873 /// Additional file-level metadata to include. Inserted into the key_value_metadata
1874 /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
1875 ///
1876 /// Multiple entries are permitted
1877 /// ```sql
1878 /// OPTIONS (
1879 /// 'format.metadata::key1' '',
1880 /// 'format.metadata::key2' 'value',
1881 /// 'format.metadata::key3' 'value has spaces',
1882 /// 'format.metadata::key4' 'value has special chars :: :',
1883 /// 'format.metadata::key_dupe' 'original will be overwritten',
1884 /// 'format.metadata::key_dupe' 'final'
1885 /// )
1886 /// ```
1887 pub key_value_metadata: HashMap<String, Option<String>>,
1888 /// Options for configuring Parquet modular encryption
1889 ///
1890 /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
1891 /// See ConfigFileEncryptionProperties and ConfigFileDecryptionProperties in datafusion/common/src/config.rs
1892 /// These can be set via 'format.crypto', for example:
1893 /// ```sql
1894 /// OPTIONS (
1895 /// 'format.crypto.file_encryption.encrypt_footer' 'true',
1896    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
1897 /// 'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
1898 /// 'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
1899 /// -- Same for decryption
1900 /// 'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
1901 /// 'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
1902 /// 'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
1903 /// )
1904 /// ```
1905 /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
1906    /// Note that keys must be provided in hex format since they are binary strings.
1907 pub crypto: ParquetEncryptionOptions,
1908}
1909
1910impl TableParquetOptions {
1911 /// Return new default TableParquetOptions
1912 pub fn new() -> Self {
1913 Self::default()
1914 }
1915
1916    /// Set whether the Arrow schema should be encoded into the parquet file
1917    /// metadata when writing.
1918 ///
1919 /// Default is to encode the arrow schema in the file kv_metadata.
1920 pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1921 Self {
1922 global: ParquetOptions {
1923 skip_arrow_metadata: skip,
1924 ..self.global
1925 },
1926 ..self
1927 }
1928 }
1929
1930 /// Retrieves all configuration entries from this `TableParquetOptions`.
1931 ///
1932 /// # Returns
1933 ///
1934    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableParquetOptions`.
1935    pub fn entries(&self) -> Vec<ConfigEntry> {
1936 struct Visitor(Vec<ConfigEntry>);
1937
1938 impl Visit for Visitor {
1939 fn some<V: Display>(
1940 &mut self,
1941 key: &str,
1942 value: V,
1943 description: &'static str,
1944 ) {
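                // `visit` below is called with an empty prefix, so every key
                // arrives with a leading '.'; `key[1..]` strips it.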
1945 self.0.push(ConfigEntry {
1946 key: key[1..].to_string(),
1947 value: Some(value.to_string()),
1948 description,
1949 })
1950 }
1951
1952 fn none(&mut self, key: &str, description: &'static str) {
1953 self.0.push(ConfigEntry {
1954 key: key[1..].to_string(),
1955 value: None,
1956 description,
1957 })
1958 }
1959 }
1960
1961 let mut v = Visitor(vec![]);
1962 self.visit(&mut v, "", "");
1963
1964 v.0
1965 }
1966}
1967
1968impl ConfigField for TableParquetOptions {
1969 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
1970 self.global.visit(v, key_prefix, description);
1971 self.column_specific_options
1972 .visit(v, key_prefix, description);
1973 self.crypto
1974 .visit(v, &format!("{key_prefix}.crypto"), description);
1975 }
1976
1977 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1978 // Determine if the key is a global, metadata, or column-specific setting
1979 if key.starts_with("metadata::") {
1980 let k = match key.split("::").collect::<Vec<_>>()[..] {
1981 [_meta] | [_meta, ""] => {
1982 return _config_err!(
1983 "Invalid metadata key provided, missing key in metadata::<key>"
1984 )
1985 }
1986 [_meta, k] => k.into(),
1987 _ => {
1988 return _config_err!(
1989 "Invalid metadata key provided, found too many '::' in \"{key}\""
1990 )
1991 }
1992 };
1993 self.key_value_metadata.insert(k, Some(value.into()));
1994 Ok(())
1995 } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
1996 self.crypto.set(crypto_feature, value)
1997 } else if key.contains("::") {
1998 self.column_specific_options.set(key, value)
1999 } else {
2000 self.global.set(key, value)
2001 }
2002 }
2003}
2004
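/// Like `config_namespace`, but additionally implements [`ConfigField`] for
/// `HashMap<String, TheStruct>`, so per-entry overrides can be addressed as
/// `<field_name>::<map_key>`.
///
/// For example, a minimal sketch using the generated [`ParquetColumnOptions`]
/// map (any `format.` prefix is stripped before this `set` is reached):
///
/// ```ignore
/// let mut map: HashMap<String, ParquetColumnOptions> = HashMap::new();
/// map.set("bloom_filter_enabled::col1", "true")?;
/// ```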
2005macro_rules! config_namespace_with_hashmap {
2006 (
2007 $(#[doc = $struct_d:tt])*
2008 $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
2009 $vis:vis struct $struct_name:ident {
2010 $(
2011 $(#[doc = $d:tt])*
2012 $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
2013 $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
2014 )*$(,)*
2015 }
2016 ) => {
2017
2018 $(#[doc = $struct_d])*
2019 $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
2020 #[derive(Debug, Clone, PartialEq)]
2021 $vis struct $struct_name{
2022 $(
2023 $(#[doc = $d])*
2024 $(#[deprecated($($field_depr)*)])? // Apply field deprecation
2025 $field_vis $field_name : $field_type,
2026 )*
2027 }
2028
2029 impl ConfigField for $struct_name {
2030 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2031 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2032 match key {
2033 $(
2034 stringify!($field_name) => {
2036                        #[allow(deprecated)] // allow setting deprecated fields
2037 $(let value = $transform(value);)?
2038 self.$field_name.set(rem, value.as_ref())
2039 },
2040 )*
2041 _ => _config_err!(
2042 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
2043 )
2044 }
2045 }
2046
2047 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2048 $(
2049 let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
2050 let desc = concat!($($d),*).trim();
2051 // Handle deprecated fields
2052 #[allow(deprecated)]
2053 self.$field_name.visit(v, key.as_str(), desc);
2054 )*
2055 }
2056 }
2057
2058 impl Default for $struct_name {
2059 fn default() -> Self {
2060 #[allow(deprecated)]
2061 Self {
2062 $($field_name: $default),*
2063 }
2064 }
2065 }
2066
2067 impl ConfigField for HashMap<String,$struct_name> {
2068 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2069 let parts: Vec<&str> = key.splitn(2, "::").collect();
2070 match parts.as_slice() {
2071 [inner_key, hashmap_key] => {
2072 // Get or create the struct for the specified key
2073 let inner_value = self
2074 .entry((*hashmap_key).to_owned())
2075 .or_insert_with($struct_name::default);
2076
2077 inner_value.set(inner_key, value)
2078 }
2079 _ => _config_err!("Unrecognized key '{key}'."),
2080 }
2081 }
2082
2083 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2084 for (column_name, col_options) in self {
2085 $(
2086 let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
2087 let desc = concat!($($d),*).trim();
2088 #[allow(deprecated)]
2089 col_options.$field_name.visit(v, key.as_str(), desc);
2090 )*
2091 }
2092 }
2093 }
2094 }
2095}
2096
2097config_namespace_with_hashmap! {
2098 /// Options controlling parquet format for individual columns.
2099 ///
2100 /// See [`ParquetOptions`] for more details
2101 pub struct ParquetColumnOptions {
2102 /// Sets if bloom filter is enabled for the column path.
2103 pub bloom_filter_enabled: Option<bool>, default = None
2104
2105 /// Sets encoding for the column path.
2106 /// Valid values are: plain, plain_dictionary, rle,
2107 /// bit_packed, delta_binary_packed, delta_length_byte_array,
2108 /// delta_byte_array, rle_dictionary, and byte_stream_split.
2109 /// These values are not case-sensitive. If NULL, uses
2110 /// default parquet options
2111 pub encoding: Option<String>, default = None
2112
2113 /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
2114 /// default parquet options
2115 pub dictionary_enabled: Option<bool>, default = None
2116
2117 /// Sets default parquet compression codec for the column path.
2118 /// Valid values are: uncompressed, snappy, gzip(level),
2119 /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
2120 /// These values are not case-sensitive. If NULL, uses
2121 /// default parquet options
2122 pub compression: Option<String>, transform = str::to_lowercase, default = None
2123
2124 /// Sets if statistics are enabled for the column
2125 /// Valid values are: "none", "chunk", and "page"
2126 /// These values are not case sensitive. If NULL, uses
2127 /// default parquet options
2128 pub statistics_enabled: Option<String>, default = None
2129
2130 /// Sets bloom filter false positive probability for the column path. If NULL, uses
2131 /// default parquet options
2132 pub bloom_filter_fpp: Option<f64>, default = None
2133
2134 /// Sets bloom filter number of distinct values. If NULL, uses
2135 /// default parquet options
2136 pub bloom_filter_ndv: Option<u64>, default = None
2137 }
2138}
2139
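/// Parquet file encryption configuration, with all binary values hex-encoded.
///
/// A minimal sketch of populating it through [`TableParquetOptions`]
/// (mirroring the encryption tests below); keys are binary strings, so
/// `hex::encode` is used:
///
/// ```ignore
/// let mut parquet_options = TableParquetOptions::new();
/// parquet_options.set(
///     "crypto.file_encryption.footer_key_as_hex",
///     &hex::encode(b"0123456789012345"),
/// )?;
/// ```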
2140#[derive(Clone, Debug, PartialEq)]
2141pub struct ConfigFileEncryptionProperties {
2142    /// Whether the parquet footer should be encrypted.
2143    /// Defaults to true.
2144 pub encrypt_footer: bool,
2145 /// Key to use for the parquet footer encoded in hex format
2146 pub footer_key_as_hex: String,
2147 /// Metadata information for footer key
2148 pub footer_key_metadata_as_hex: String,
2149 /// HashMap of column names --> (key in hex format, metadata)
2150 pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
2151    /// AAD prefix string that uniquely identifies the file and prevents file swapping
2152    pub aad_prefix_as_hex: String,
2153    /// If true, store the AAD prefix in the file.
2154    /// Defaults to false.
2155    pub store_aad_prefix: bool,
2156}
2157
2158// Setup to match EncryptionPropertiesBuilder::new()
2159impl Default for ConfigFileEncryptionProperties {
2160 fn default() -> Self {
2161 ConfigFileEncryptionProperties {
2162 encrypt_footer: true,
2163 footer_key_as_hex: String::new(),
2164 footer_key_metadata_as_hex: String::new(),
2165 column_encryption_properties: Default::default(),
2166 aad_prefix_as_hex: String::new(),
2167 store_aad_prefix: false,
2168 }
2169 }
2170}
2171
2172config_namespace_with_hashmap! {
2173 pub struct ColumnEncryptionProperties {
2174 /// Per column encryption key
2175 pub column_key_as_hex: String, default = "".to_string()
2176 /// Per column encryption key metadata
2177 pub column_metadata_as_hex: Option<String>, default = None
2178 }
2179}
2180
2181impl ConfigField for ConfigFileEncryptionProperties {
2182 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2183 let key = format!("{key_prefix}.encrypt_footer");
2184 let desc = "Encrypt the footer";
2185 self.encrypt_footer.visit(v, key.as_str(), desc);
2186
2187 let key = format!("{key_prefix}.footer_key_as_hex");
2188 let desc = "Key to use for the parquet footer";
2189 self.footer_key_as_hex.visit(v, key.as_str(), desc);
2190
2191 let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2192 let desc = "Metadata to use for the parquet footer";
2193 self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2194
2195 self.column_encryption_properties.visit(v, key_prefix, desc);
2196
2197 let key = format!("{key_prefix}.aad_prefix_as_hex");
2198 let desc = "AAD prefix to use";
2199 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2200
2201 let key = format!("{key_prefix}.store_aad_prefix");
2202 let desc = "If true, store the AAD prefix";
2203        self.store_aad_prefix.visit(v, key.as_str(), desc);
2206 }
2207
2208 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2209 // Any hex encoded values must be pre-encoded using
2210 // hex::encode() before calling set.
2211
2212 if key.contains("::") {
2213 // Handle any column specific properties
2214 return self.column_encryption_properties.set(key, value);
2215 };
2216
2217 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2218 match key {
2219 "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2220 "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2221 "footer_key_metadata_as_hex" => {
2222 self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2223 }
2224 "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2225 "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2226 _ => _config_err!(
2227 "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2228 key
2229 ),
2230 }
2231 }
2232}
2233
2234#[cfg(feature = "parquet_encryption")]
2235impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
2236 fn from(val: ConfigFileEncryptionProperties) -> Self {
2237 let mut fep = FileEncryptionProperties::builder(
2238 hex::decode(val.footer_key_as_hex).unwrap(),
2239 )
2240 .with_plaintext_footer(!val.encrypt_footer)
2241 .with_aad_prefix_storage(val.store_aad_prefix);
2242
2243 if !val.footer_key_metadata_as_hex.is_empty() {
2244 fep = fep.with_footer_key_metadata(
2245 hex::decode(&val.footer_key_metadata_as_hex)
2246 .expect("Invalid footer key metadata"),
2247 );
2248 }
2249
2250 for (column_name, encryption_props) in val.column_encryption_properties.iter() {
2251 let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
2252 .expect("Invalid column encryption key");
2253 let key_metadata = encryption_props
2254 .column_metadata_as_hex
2255 .as_ref()
2256 .map(|x| hex::decode(x).expect("Invalid column metadata"));
2257 match key_metadata {
2258 Some(key_metadata) => {
2259 fep = fep.with_column_key_and_metadata(
2260 column_name,
2261 encryption_key,
2262 key_metadata,
2263 );
2264 }
2265 None => {
2266 fep = fep.with_column_key(column_name, encryption_key);
2267 }
2268 }
2269 }
2270
2271 if !val.aad_prefix_as_hex.is_empty() {
2272 let aad_prefix: Vec<u8> =
2273 hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2274 fep = fep.with_aad_prefix(aad_prefix);
2275 }
2276 fep.build().unwrap()
2277 }
2278}
2279
2280#[cfg(feature = "parquet_encryption")]
2281impl From<&FileEncryptionProperties> for ConfigFileEncryptionProperties {
2282 fn from(f: &FileEncryptionProperties) -> Self {
2283 let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
2284
2285 let mut column_encryption_properties: HashMap<
2286 String,
2287 ColumnEncryptionProperties,
2288 > = HashMap::new();
2289
2290 for (i, column_name) in column_names_vec.iter().enumerate() {
2291 let column_key_as_hex = hex::encode(&column_keys_vec[i]);
2292 let column_metadata_as_hex: Option<String> =
2293 column_metas_vec.get(i).map(hex::encode);
2294 column_encryption_properties.insert(
2295 column_name.clone(),
2296 ColumnEncryptionProperties {
2297 column_key_as_hex,
2298 column_metadata_as_hex,
2299 },
2300 );
2301 }
2302 let mut aad_prefix: Vec<u8> = Vec::new();
2303 if let Some(prefix) = f.aad_prefix() {
2304 aad_prefix = prefix.clone();
2305 }
2306 ConfigFileEncryptionProperties {
2307 encrypt_footer: f.encrypt_footer(),
2308 footer_key_as_hex: hex::encode(f.footer_key()),
2309 footer_key_metadata_as_hex: f
2310 .footer_key_metadata()
2311 .map(hex::encode)
2312 .unwrap_or_default(),
2313 column_encryption_properties,
2314 aad_prefix_as_hex: hex::encode(aad_prefix),
2315 store_aad_prefix: f.store_aad_prefix(),
2316 }
2317 }
2318}
2319
2320#[derive(Clone, Debug, PartialEq)]
2321pub struct ConfigFileDecryptionProperties {
2322    /// Key to use for the parquet footer, encoded in hex format
2323 pub footer_key_as_hex: String,
2324 /// HashMap of column names --> key in hex format
2325 pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
2326    /// AAD prefix string that uniquely identifies the file and prevents file swapping
2327    pub aad_prefix_as_hex: String,
2328    /// If true, verify the signature for files with plaintext footers.
2329    /// Defaults to true.
2330    pub footer_signature_verification: bool,
2331}
2332
2333config_namespace_with_hashmap! {
2334 pub struct ColumnDecryptionProperties {
2335 /// Per column encryption key
2336 pub column_key_as_hex: String, default = "".to_string()
2337 }
2338}
2339
2340// Setup to match DecryptionPropertiesBuilder::new()
2341impl Default for ConfigFileDecryptionProperties {
2342 fn default() -> Self {
2343 ConfigFileDecryptionProperties {
2344 footer_key_as_hex: String::new(),
2345 column_decryption_properties: Default::default(),
2346 aad_prefix_as_hex: String::new(),
2347 footer_signature_verification: true,
2348 }
2349 }
2350}
2351
2352impl ConfigField for ConfigFileDecryptionProperties {
2353 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2354 let key = format!("{key_prefix}.footer_key_as_hex");
2355 let desc = "Key to use for the parquet footer";
2356 self.footer_key_as_hex.visit(v, key.as_str(), desc);
2357
2358 let key = format!("{key_prefix}.aad_prefix_as_hex");
2359 let desc = "AAD prefix to use";
2360 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2361
2362 let key = format!("{key_prefix}.footer_signature_verification");
2363 let desc = "If true, verify the footer signature";
2364 self.footer_signature_verification
2365 .visit(v, key.as_str(), desc);
2366
2367 self.column_decryption_properties.visit(v, key_prefix, desc);
2368 }
2369
2370 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2371 // Any hex encoded values must be pre-encoded using
2372 // hex::encode() before calling set.
2373
2374 if key.contains("::") {
2375 // Handle any column specific properties
2376 return self.column_decryption_properties.set(key, value);
2377 };
2378
2379 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2380 match key {
2381 "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2382 "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2383 "footer_signature_verification" => {
2384 self.footer_signature_verification.set(rem, value.as_ref())
2385 }
2386 _ => _config_err!(
2387                "Config value \"{}\" not found on ConfigFileDecryptionProperties",
2388 key
2389 ),
2390 }
2391 }
2392}
2393
2394#[cfg(feature = "parquet_encryption")]
2395impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
2396 fn from(val: ConfigFileDecryptionProperties) -> Self {
2397 let mut column_names: Vec<&str> = Vec::new();
2398 let mut column_keys: Vec<Vec<u8>> = Vec::new();
2399
2400 for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
2401 column_names.push(col_name.as_str());
2402 column_keys.push(
2403 hex::decode(&decryption_properties.column_key_as_hex)
2404 .expect("Invalid column decryption key"),
2405 );
2406 }
2407
2408 let mut fep = FileDecryptionProperties::builder(
2409 hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
2410 )
2411 .with_column_keys(column_names, column_keys)
2412 .unwrap();
2413
2414 if !val.footer_signature_verification {
2415 fep = fep.disable_footer_signature_verification();
2416 }
2417
2418 if !val.aad_prefix_as_hex.is_empty() {
2419 let aad_prefix =
2420 hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2421 fep = fep.with_aad_prefix(aad_prefix);
2422 }
2423
2424 fep.build().unwrap()
2425 }
2426}
2427
2428#[cfg(feature = "parquet_encryption")]
2429impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties {
2430 fn from(f: &FileDecryptionProperties) -> Self {
2431 let (column_names_vec, column_keys_vec) = f.column_keys();
2432 let mut column_decryption_properties: HashMap<
2433 String,
2434 ColumnDecryptionProperties,
2435 > = HashMap::new();
2436 for (i, column_name) in column_names_vec.iter().enumerate() {
2437 let props = ColumnDecryptionProperties {
2438 column_key_as_hex: hex::encode(column_keys_vec[i].clone()),
2439 };
2440 column_decryption_properties.insert(column_name.clone(), props);
2441 }
2442
2443 let mut aad_prefix: Vec<u8> = Vec::new();
2444 if let Some(prefix) = f.aad_prefix() {
2445 aad_prefix = prefix.clone();
2446 }
2447 ConfigFileDecryptionProperties {
2448 footer_key_as_hex: hex::encode(
2449 f.footer_key(None).unwrap_or_default().as_ref(),
2450 ),
2451 column_decryption_properties,
2452 aad_prefix_as_hex: hex::encode(aad_prefix),
2453 footer_signature_verification: f.check_plaintext_footer_integrity(),
2454 }
2455 }
2456}
2457
2458/// Holds implementation-specific options for an encryption factory
2459#[derive(Clone, Debug, Default, PartialEq)]
2460pub struct EncryptionFactoryOptions {
2461 pub options: HashMap<String, String>,
2462}
2463
2464impl ConfigField for EncryptionFactoryOptions {
2465 fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
2466 for (option_key, option_value) in &self.options {
2467 v.some(
2468 &format!("{key}.{option_key}"),
2469 option_value,
2470 "Encryption factory specific option",
2471 );
2472 }
2473 }
2474
2475 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2476 self.options.insert(key.to_owned(), value.to_owned());
2477 Ok(())
2478 }
2479}
2480
2481impl EncryptionFactoryOptions {
2482 /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
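    ///
    /// For example, a minimal sketch assuming `MyFactoryConfig` was declared
    /// with the `extensions_options!` macro:
    ///
    /// ```ignore
    /// let config: MyFactoryConfig = factory_options.to_extension_options()?;
    /// ```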
2483 pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
2484 let mut options = T::default();
2485 for (key, value) in &self.options {
2486 options.set(key, value)?;
2487 }
2488 Ok(options)
2489 }
2490}
2491
2492config_namespace! {
2493 /// Options controlling CSV format
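    ///
    /// A minimal sketch of the builder-style helpers implemented further below:
    ///
    /// ```ignore
    /// let options = CsvOptions::default()
    ///     .with_delimiter(b'|')
    ///     .with_has_header(true);
    /// ```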
2494 pub struct CsvOptions {
2495 /// Specifies whether there is a CSV header (i.e. the first line
2496        /// consists of column names). The value `None` indicates that
2497 /// the configuration should be consulted.
2498 pub has_header: Option<bool>, default = None
2499 pub delimiter: u8, default = b','
2500 pub quote: u8, default = b'"'
2501 pub terminator: Option<u8>, default = None
2502 pub escape: Option<u8>, default = None
2503 pub double_quote: Option<bool>, default = None
2504 /// Specifies whether newlines in (quoted) values are supported.
2505 ///
2506 /// Parsing newlines in quoted values may be affected by execution behaviour such as
2507 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2508 /// parsed successfully, which may reduce performance.
2509 ///
2510 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2511 pub newlines_in_values: Option<bool>, default = None
2512 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2513 pub schema_infer_max_rec: Option<usize>, default = None
2514 pub date_format: Option<String>, default = None
2515 pub datetime_format: Option<String>, default = None
2516 pub timestamp_format: Option<String>, default = None
2517 pub timestamp_tz_format: Option<String>, default = None
2518 pub time_format: Option<String>, default = None
2519        /// The output format for Nulls in the CSV writer.
2520 pub null_value: Option<String>, default = None
2521        /// The input regex for Nulls when loading CSVs.
2522 pub null_regex: Option<String>, default = None
2523 pub comment: Option<u8>, default = None
2524 }
2525}
2526
2527impl CsvOptions {
2528    /// Set the file compression type
2529    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2530 pub fn with_compression(
2531 mut self,
2532 compression_type_variant: CompressionTypeVariant,
2533 ) -> Self {
2534 self.compression = compression_type_variant;
2535 self
2536 }
2537
2538 /// Set a limit in terms of records to scan to infer the schema
2539 /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
2540 pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
2541 self.schema_infer_max_rec = Some(max_rec);
2542 self
2543 }
2544
2545 /// Set true to indicate that the first line is a header.
2546 /// - default to true
2547 pub fn with_has_header(mut self, has_header: bool) -> Self {
2548 self.has_header = Some(has_header);
2549 self
2550 }
2551
2552    /// Returns whether the first line is a header. If the format options do
2553    /// not specify whether there is a header, returns `None` (indicating that
2554    /// the configuration should be consulted).
2555 pub fn has_header(&self) -> Option<bool> {
2556 self.has_header
2557 }
2558
2559 /// The character separating values within a row.
2560 /// - default to ','
2561 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
2562 self.delimiter = delimiter;
2563 self
2564 }
2565
2566 /// The quote character in a row.
2567 /// - default to '"'
2568 pub fn with_quote(mut self, quote: u8) -> Self {
2569 self.quote = quote;
2570 self
2571 }
2572
2573 /// The character that terminates a row.
2574 /// - default to None (CRLF)
2575 pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
2576 self.terminator = terminator;
2577 self
2578 }
2579
2580 /// The escape character in a row.
2581 /// - default is None
2582 pub fn with_escape(mut self, escape: Option<u8>) -> Self {
2583 self.escape = escape;
2584 self
2585 }
2586
2587 /// Set true to indicate that the CSV quotes should be doubled.
2588 /// - default to true
2589 pub fn with_double_quote(mut self, double_quote: bool) -> Self {
2590 self.double_quote = Some(double_quote);
2591 self
2592 }
2593
2594 /// Specifies whether newlines in (quoted) values are supported.
2595 ///
2596 /// Parsing newlines in quoted values may be affected by execution behaviour such as
2597 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2598 /// parsed successfully, which may reduce performance.
2599 ///
2600 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2601 pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
2602 self.newlines_in_values = Some(newlines_in_values);
2603 self
2604 }
2605
2606 /// Set a `CompressionTypeVariant` of CSV
2607 /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2608 pub fn with_file_compression_type(
2609 mut self,
2610 compression: CompressionTypeVariant,
2611 ) -> Self {
2612 self.compression = compression;
2613 self
2614 }
2615
2616 /// The delimiter character.
2617 pub fn delimiter(&self) -> u8 {
2618 self.delimiter
2619 }
2620
2621 /// The quote character.
2622 pub fn quote(&self) -> u8 {
2623 self.quote
2624 }
2625
2626 /// The terminator character.
2627 pub fn terminator(&self) -> Option<u8> {
2628 self.terminator
2629 }
2630
2631 /// The escape character.
2632 pub fn escape(&self) -> Option<u8> {
2633 self.escape
2634 }
2635}
2636
2637config_namespace! {
2638 /// Options controlling JSON format
2639 pub struct JsonOptions {
2640 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2641 pub schema_infer_max_rec: Option<usize>, default = None
2642 }
2643}
2644
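/// Trait for types that represent an output format; requires [`Display`].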
2645pub trait OutputFormatExt: Display {}
2646
2647#[derive(Debug, Clone, PartialEq)]
2648#[allow(clippy::large_enum_variant)]
2649pub enum OutputFormat {
2650 CSV(CsvOptions),
2651 JSON(JsonOptions),
2652 #[cfg(feature = "parquet")]
2653 PARQUET(TableParquetOptions),
2654 AVRO,
2655 ARROW,
2656}
2657
2658impl Display for OutputFormat {
2659 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2660 let out = match self {
2661 OutputFormat::CSV(_) => "csv",
2662 OutputFormat::JSON(_) => "json",
2663 #[cfg(feature = "parquet")]
2664 OutputFormat::PARQUET(_) => "parquet",
2665 OutputFormat::AVRO => "avro",
2666 OutputFormat::ARROW => "arrow",
2667 };
2668 write!(f, "{out}")
2669 }
2670}
2671
2672#[cfg(test)]
2673mod tests {
2674 use crate::config::{
2675 ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
2676 Extensions, TableOptions, TableParquetOptions,
2677 };
2678 use std::any::Any;
2679 use std::collections::HashMap;
2680
2681 #[derive(Default, Debug, Clone)]
2682 pub struct TestExtensionConfig {
2683        /// Arbitrary key/value properties collected by this test extension
2684 pub properties: HashMap<String, String>,
2685 }
2686
2687 impl ExtensionOptions for TestExtensionConfig {
2688 fn as_any(&self) -> &dyn Any {
2689 self
2690 }
2691
2692 fn as_any_mut(&mut self) -> &mut dyn Any {
2693 self
2694 }
2695
2696 fn cloned(&self) -> Box<dyn ExtensionOptions> {
2697 Box::new(self.clone())
2698 }
2699
2700 fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
2701 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2702 assert_eq!(key, "test");
2703 self.properties.insert(rem.to_owned(), value.to_owned());
2704 Ok(())
2705 }
2706
2707 fn entries(&self) -> Vec<ConfigEntry> {
2708 self.properties
2709 .iter()
2710 .map(|(k, v)| ConfigEntry {
2711 key: k.into(),
2712 value: Some(v.into()),
2713 description: "",
2714 })
2715 .collect()
2716 }
2717 }
2718
2719 impl ConfigExtension for TestExtensionConfig {
2720 const PREFIX: &'static str = "test";
2721 }
2722
2723 #[test]
2724 fn create_table_config() {
2725 let mut extension = Extensions::new();
2726 extension.insert(TestExtensionConfig::default());
2727 let table_config = TableOptions::new().with_extensions(extension);
2728 let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
2729 assert!(kafka_config.is_some())
2730 }
2731
2732 #[test]
2733 fn alter_test_extension_config() {
2734 let mut extension = Extensions::new();
2735 extension.insert(TestExtensionConfig::default());
2736 let mut table_config = TableOptions::new().with_extensions(extension);
2737 table_config.set_config_format(ConfigFileType::CSV);
2738 table_config.set("format.delimiter", ";").unwrap();
2739 assert_eq!(table_config.csv.delimiter, b';');
2740 table_config.set("test.bootstrap.servers", "asd").unwrap();
2741 let kafka_config = table_config
2742 .extensions
2743 .get::<TestExtensionConfig>()
2744 .unwrap();
2745 assert_eq!(
2746 kafka_config.properties.get("bootstrap.servers").unwrap(),
2747 "asd"
2748 );
2749 }
2750
2751 #[test]
2752 fn csv_u8_table_options() {
2753 let mut table_config = TableOptions::new();
2754 table_config.set_config_format(ConfigFileType::CSV);
2755 table_config.set("format.delimiter", ";").unwrap();
2756 assert_eq!(table_config.csv.delimiter as char, ';');
2757 table_config.set("format.escape", "\"").unwrap();
2758 assert_eq!(table_config.csv.escape.unwrap() as char, '"');
2759 table_config.set("format.escape", "\'").unwrap();
2760 assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
2761 }
2762
2763 #[test]
2764 fn warning_only_not_default() {
2765 use std::sync::atomic::AtomicUsize;
2766 static COUNT: AtomicUsize = AtomicUsize::new(0);
2767 use log::{Level, LevelFilter, Metadata, Record};
2768 struct SimpleLogger;
2769 impl log::Log for SimpleLogger {
2770 fn enabled(&self, metadata: &Metadata) -> bool {
2771 metadata.level() <= Level::Info
2772 }
2773
2774 fn log(&self, record: &Record) {
2775 if self.enabled(record.metadata()) {
2776 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2777 }
2778 }
2779 fn flush(&self) {}
2780 }
2781 log::set_logger(&SimpleLogger).unwrap();
2782 log::set_max_level(LevelFilter::Info);
2783 let mut sql_parser_options = crate::config::SqlParserOptions::default();
2784 sql_parser_options
2785 .set("enable_options_value_normalization", "false")
2786 .unwrap();
2787 assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
2788 sql_parser_options
2789 .set("enable_options_value_normalization", "true")
2790 .unwrap();
2791 assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
2792 }
2793
2794 #[cfg(feature = "parquet")]
2795 #[test]
2796 fn parquet_table_options() {
2797 let mut table_config = TableOptions::new();
2798 table_config.set_config_format(ConfigFileType::PARQUET);
2799 table_config
2800 .set("format.bloom_filter_enabled::col1", "true")
2801 .unwrap();
2802 assert_eq!(
2803 table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
2804 Some(true)
2805 );
2806 }
2807
2808 #[cfg(feature = "parquet_encryption")]
2809 #[test]
2810 fn parquet_table_encryption() {
2811 use crate::config::{
2812 ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
2813 };
2814 use parquet::encryption::decrypt::FileDecryptionProperties;
2815 use parquet::encryption::encrypt::FileEncryptionProperties;
2816
2817        let footer_key = b"0123456789012345".to_vec(); // 128-bit (16 byte) key
2818 let column_names = vec!["double_field", "float_field"];
2819 let column_keys =
2820 vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
2821
2822 let file_encryption_properties =
2823 FileEncryptionProperties::builder(footer_key.clone())
2824 .with_column_keys(column_names.clone(), column_keys.clone())
2825 .unwrap()
2826 .build()
2827 .unwrap();
2828
2829 let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
2830 .with_column_keys(column_names.clone(), column_keys.clone())
2831 .unwrap()
2832 .build()
2833 .unwrap();
2834
2835 // Test round-trip
2836 let config_encrypt: ConfigFileEncryptionProperties =
2837 (&file_encryption_properties).into();
2838 let encryption_properties_built: FileEncryptionProperties =
2839 config_encrypt.clone().into();
2840 assert_eq!(file_encryption_properties, encryption_properties_built);
2841
2842 let config_decrypt: ConfigFileDecryptionProperties =
2843 (&decryption_properties).into();
2844 let decryption_properties_built: FileDecryptionProperties =
2845 config_decrypt.clone().into();
2846 assert_eq!(decryption_properties, decryption_properties_built);
2847
2848 ///////////////////////////////////////////////////////////////////////////////////
2849 // Test encryption config
2850
2851 // Display original encryption config
2852 // println!("{:#?}", config_encrypt);
2853
2854 let mut table_config = TableOptions::new();
2855 table_config.set_config_format(ConfigFileType::PARQUET);
2856 table_config
2857 .parquet
2858 .set(
2859 "crypto.file_encryption.encrypt_footer",
2860 config_encrypt.encrypt_footer.to_string().as_str(),
2861 )
2862 .unwrap();
2863 table_config
2864 .parquet
2865 .set(
2866 "crypto.file_encryption.footer_key_as_hex",
2867 config_encrypt.footer_key_as_hex.as_str(),
2868 )
2869 .unwrap();
2870
2871 for (i, col_name) in column_names.iter().enumerate() {
2872 let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
2873 let value = hex::encode(column_keys[i].clone());
2874 table_config
2875 .parquet
2876 .set(key.as_str(), value.as_str())
2877 .unwrap();
2878 }
2879
2880 // Print matching final encryption config
2881 // println!("{:#?}", table_config.parquet.crypto.file_encryption);
2882
2883 assert_eq!(
2884 table_config.parquet.crypto.file_encryption,
2885 Some(config_encrypt)
2886 );
2887
2888 ///////////////////////////////////////////////////////////////////////////////////
2889 // Test decryption config
2890
2891 // Display original decryption config
2892 // println!("{:#?}", config_decrypt);
2893
2894 let mut table_config = TableOptions::new();
2895 table_config.set_config_format(ConfigFileType::PARQUET);
2896 table_config
2897 .parquet
2898 .set(
2899 "crypto.file_decryption.footer_key_as_hex",
2900 config_decrypt.footer_key_as_hex.as_str(),
2901 )
2902 .unwrap();
2903
2904 for (i, col_name) in column_names.iter().enumerate() {
2905 let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
2906 let value = hex::encode(column_keys[i].clone());
2907 table_config
2908 .parquet
2909 .set(key.as_str(), value.as_str())
2910 .unwrap();
2911 }
2912
2913 // Print matching final decryption config
2914 // println!("{:#?}", table_config.parquet.crypto.file_decryption);
2915
2916 assert_eq!(
2917 table_config.parquet.crypto.file_decryption,
2918 Some(config_decrypt.clone())
2919 );
2920
2921 // Set config directly
2922 let mut table_config = TableOptions::new();
2923 table_config.set_config_format(ConfigFileType::PARQUET);
2924 table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
2925 assert_eq!(
2926 table_config.parquet.crypto.file_decryption,
2927 Some(config_decrypt.clone())
2928 );
2929 }
2930
2931 #[cfg(feature = "parquet_encryption")]
2932 #[test]
2933 fn parquet_encryption_factory_config() {
2934 let mut parquet_options = TableParquetOptions::default();
2935
2936 assert_eq!(parquet_options.crypto.factory_id, None);
2937 assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);
2938
2939 let mut input_config = TestExtensionConfig::default();
2940 input_config
2941 .properties
2942 .insert("key1".to_string(), "value 1".to_string());
2943 input_config
2944 .properties
2945 .insert("key2".to_string(), "value 2".to_string());
2946
2947 parquet_options
2948 .crypto
2949 .configure_factory("example_factory", &input_config);
2950
2951 assert_eq!(
2952 parquet_options.crypto.factory_id,
2953 Some("example_factory".to_string())
2954 );
2955 let factory_options = &parquet_options.crypto.factory_options.options;
2956 assert_eq!(factory_options.len(), 2);
2957 assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
2958 assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
2959 }
2960
2961 #[cfg(feature = "parquet")]
2962 #[test]
2963 fn parquet_table_options_config_entry() {
2964 let mut table_config = TableOptions::new();
2965 table_config.set_config_format(ConfigFileType::PARQUET);
2966 table_config
2967 .set("format.bloom_filter_enabled::col1", "true")
2968 .unwrap();
2969 let entries = table_config.entries();
2970 assert!(entries
2971 .iter()
2972 .any(|item| item.key == "format.bloom_filter_enabled::col1"))
2973 }
2974
2975 #[cfg(feature = "parquet")]
2976 #[test]
2977 fn parquet_table_parquet_options_config_entry() {
2978 let mut table_parquet_options = TableParquetOptions::new();
2979 table_parquet_options
2980 .set(
2981 "crypto.file_encryption.column_key_as_hex::double_field",
2982 "31323334353637383930313233343530",
2983 )
2984 .unwrap();
2985 let entries = table_parquet_options.entries();
2986 assert!(entries
2987 .iter()
2988 .any(|item| item.key
2989 == "crypto.file_encryption.column_key_as_hex::double_field"))
2990 }
2991
2992 #[cfg(feature = "parquet")]
2993 #[test]
2994 fn parquet_table_options_config_metadata_entry() {
2995 let mut table_config = TableOptions::new();
2996 table_config.set_config_format(ConfigFileType::PARQUET);
2997 table_config.set("format.metadata::key1", "").unwrap();
2998 table_config.set("format.metadata::key2", "value2").unwrap();
2999 table_config
3000 .set("format.metadata::key3", "value with spaces ")
3001 .unwrap();
3002 table_config
3003 .set("format.metadata::key4", "value with special chars :: :")
3004 .unwrap();
3005
3006 let parsed_metadata = table_config.parquet.key_value_metadata.clone();
3007 assert_eq!(parsed_metadata.get("should not exist1"), None);
3008 assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
3009 assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
3010 assert_eq!(
3011 parsed_metadata.get("key3"),
3012 Some(&Some("value with spaces ".into()))
3013 );
3014 assert_eq!(
3015 parsed_metadata.get("key4"),
3016 Some(&Some("value with special chars :: :".into()))
3017 );
3018
3019 // duplicate keys are overwritten
3020 table_config.set("format.metadata::key_dupe", "A").unwrap();
3021 table_config.set("format.metadata::key_dupe", "B").unwrap();
3022 let parsed_metadata = table_config.parquet.key_value_metadata;
3023 assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
3024 }
3025}