datafusion_common/config.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Runtime configuration, via [`ConfigOptions`]

use arrow_ipc::CompressionType;

#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;

#[cfg(feature = "parquet_encryption")]
use hex;

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///     /// Amazing config
///     pub struct MyConfig {
///         /// Field 1 doc
///         field1: String, transform = str::to_lowercase, default = "".to_string()
///
///         /// Field 2 doc
///         field2: usize, default = 232
///
///         /// Field 3 doc
///         field3: Option<usize>, default = None
///     }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone)]
/// #[non_exhaustive]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _internal_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                #[allow(deprecated)]
                                let ret = self.$field_name.set(rem, value.as_ref());

                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    #[allow(deprecated)]
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })? // Log warning if specified, and the value is not the default
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}

config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}

config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize identifiers (convert them to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: String, default = "generic".to_string()
        // no need to lowercase because `sqlparser::dialect_from_str` is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
        /// If false, they are mapped to `Utf8`.
        /// Default is true.
        pub map_string_types_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL Queries
        pub recursion_limit: usize, default = 50
    }
}

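/// Compression codec applied when spilling data to disk, configured via
/// `datafusion.execution.spill_compression`.
///
/// Values parse case-insensitively, and an empty string maps to
/// [`SpillCompression::Uncompressed`]. For example (illustrative):
///
/// ```ignore
/// use std::str::FromStr;
/// assert_eq!(
///     SpillCompression::from_str("ZSTD").unwrap(),
///     SpillCompression::Zstd
/// );
/// assert_eq!(SpillCompression::Zstd.to_string(), "zstd");
/// ```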
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    Zstd,
    Lz4Frame,
    #[default]
    Uncompressed,
}

impl FromStr for SpillCompression {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "zstd" => Ok(Self::Zstd),
            "lz4_frame" => Ok(Self::Lz4Frame),
            "uncompressed" | "" => Ok(Self::Uncompressed),
            other => Err(DataFusionError::Configuration(format!(
                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
            ))),
        }
    }
}

impl ConfigField for SpillCompression {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = SpillCompression::from_str(value)?;
        Ok(())
    }
}

impl Display for SpillCompression {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = match self {
            Self::Zstd => "zstd",
            Self::Lz4Frame => "lz4_frame",
            Self::Uncompressed => "uncompressed",
        };
        write!(f, "{str}")
    }
}

impl From<SpillCompression> for Option<CompressionType> {
    fn from(c: SpillCompression) -> Self {
        match c {
            SpillCompression::Zstd => Some(CompressionType::ZSTD),
            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
            SpillCompression::Uncompressed => None,
        }
    }
}

config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        /// Default batch size while creating new batches. This is especially useful for
        /// buffered in-memory batches, since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the `batch_size` configuration setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
        /// according to this time zone, and then extract the hour
        pub time_zone: String, default = "+00:00".into()

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly used to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to workaround bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Sets the compression codec used when spilling data to disk.
        ///
        /// Since datafusion writes spill files using the Arrow IPC Stream format,
        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
        /// Valid values are: uncompressed, lz4_frame, zstd.
        /// Note: lz4_frame offers faster (de)compression, but typically results in
        /// larger spill files. In contrast, zstd achieves
        /// higher compression ratios at the cost of slower (de)compression speed.
        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum level of output files running in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the observed ratio is
        /// greater than this threshold, partial aggregation will skip aggregation
        /// for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows partial aggregation partition should process, before
        /// aggregation ratio check and trying to switch to skipping aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote end point.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
    }
}

config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try to fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// One read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer
        pub metadata_size_hint: Option<usize>, default = None

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` as `Utf8View`,
        /// and `Binary/LargeBinary` as `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` as `Utf8`, and `BinaryView` as `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If true, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark
        /// which stores microsecond resolution timestamps in an int96 allowing it
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in bytes
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: String, default = "1.0".to_string()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Sets max statistics size for any column. If NULL, uses
        /// default parquet writer setting
        /// max_statistics_size is deprecated, currently it is not being used
        // TODO: remove once deprecated
        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
        pub max_statistics_size: Option<usize>, default = Some(4096)

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default = 1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = None

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file is serialized in parallel,
        /// leveraging a maximum possible core count of n_files * n_row_groups * n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}

config_namespace! {
    /// Options for configuring Parquet Modular Encryption
    pub struct ParquetEncryptionOptions {
        /// Optional file decryption properties
        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None

        /// Optional file encryption properties
        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None
    }
}

config_namespace! {
    /// Options related to query optimization
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct OptimizerOptions {
        /// When set to true, the optimizer will push a limit operation into
        /// grouped aggregations which have no aggregate expressions, as a soft limit,
        /// emitting groups once the limit is reached, before all rows in the group are read.
        pub enable_distinct_aggregation_soft_limit: bool, default = true

        /// When set to true, the physical plan optimizer will try to add round robin
        /// repartitioning to increase parallelism to leverage more CPU cores
        pub enable_round_robin_repartition: bool, default = true

        /// When set to true, the optimizer will attempt to perform limit operations
        /// during aggregations, if possible
        pub enable_topk_aggregation: bool, default = true

        /// When set to true, attempts to push down dynamic filters generated by operators into the file scan phase.
        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
        /// This means that if we already have 10 timestamps in the year 2025,
        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
        pub enable_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will insert filters before a join between
        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
        /// filter can add additional overhead when the file format does not fully support
        /// predicate push down.
        pub filter_null_join_keys: bool, default = false

        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
        /// in parallel using the provided `target_partitions` level
        pub repartition_aggregations: bool, default = true

        /// Minimum total files size in bytes to perform file scan repartitioning.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

        /// Should DataFusion repartition data using the join keys to execute joins in parallel
        /// using the provided `target_partitions` level
        pub repartition_joins: bool, default = true

        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
        /// its inputs do not have any ordering or filtering. If the flag is not enabled,
        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
        /// RightAnti, and RightSemi - being produced only at the end of the execution.
        /// This is not typical in stream processing. Additionally, without proper design for
        /// long runner execution, all types of joins may encounter out-of-memory errors.
        pub allow_symmetric_joins_without_pruning: bool, default = true

        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
        ///
        /// For FileSources, only Parquet and CSV formats are currently supported.
        ///
        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
        /// might be partitioned into smaller chunks) for parallel scanning.
        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
        /// happen within a single file.
        ///
        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
        /// the total number of partitions and batches per partition, but does not slice the initial
        /// record tables provided to the MemTable on creation.
        pub repartition_file_scans: bool, default = true

        /// Should DataFusion repartition data using the partitions keys to execute window
        /// functions in parallel using the provided `target_partitions` level
        pub repartition_windows: bool, default = true

        /// Should DataFusion execute sorts in a per-partition fashion and merge
        /// afterwards instead of coalescing first and sorting globally.
        /// When this flag is enabled, plans in the form below
        ///
        /// ```text
        /// "SortExec: [a@0 ASC]",
        /// "  CoalescePartitionsExec",
        /// "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        /// would turn into the plan below, which performs better in multithreaded environments
        ///
        /// ```text
        /// "SortPreservingMergeExec: [a@0 ASC]",
        /// "  SortExec: [a@0 ASC]",
        /// "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        pub repartition_sorts: bool, default = true

        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
        /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
        /// using `SortPreservingMergeExec`)
        ///
        /// When false, DataFusion will maximize plan parallelism using
        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
        pub prefer_existing_sort: bool, default = false

        /// When set to true, the logical plan optimizer will produce warning
        /// messages if any optimization rules produce errors and then proceed to the next
        /// rule. When set to false, any rules that produce errors will cause the query to fail
        pub skip_failed_rules: bool, default = false

        /// Number of times that the optimizer will attempt to optimize the plan
        pub max_passes: usize, default = 3

        /// When set to true, the physical plan optimizer will run a top down
        /// process to reorder the join keys
        pub top_down_join_key_reordering: bool, default = true

        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
        pub prefer_hash_join: bool, default = true

        /// The maximum estimated size in bytes for one input side of a HashJoin
        /// that will be collected into a single partition
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

        /// The maximum estimated size in rows for one input side of a HashJoin
        /// that will be collected into a single partition
        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

        /// The default filter selectivity used by Filter Statistics
        /// when an exact selectivity cannot be determined. Valid values are
        /// between 0 (no selectivity) and 100 (all rows are selected).
        pub default_filter_selectivity: u8, default = 20

        /// When set to true, the optimizer will not attempt to convert Union to Interleave
        pub prefer_existing_union: bool, default = false

        /// When set to true, if the returned type is a view type
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false
    }
}

config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: String, default = "indent".to_string()
    }
}

impl ExecutionOptions {
    /// Returns the correct parallelism based on the provided `value`.
    /// If `value` is `"0"`, returns the default available parallelism, computed with
    /// `get_available_parallelism`. Otherwise, returns `value`.
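    ///
    /// For example (illustrative):
    ///
    /// ```ignore
    /// // "0" expands to the detected core count (e.g. "8")
    /// assert_eq!(ExecutionOptions::normalized_parallelism("4"), "4");
    /// // non-numeric input passes through unchanged and fails later,
    /// // when the target usize field parses it
    /// assert_eq!(ExecutionOptions::normalized_parallelism("abc"), "abc");
    /// ```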
    fn normalized_parallelism(value: &str) -> String {
        if value.parse::<usize>() == Ok(0) {
            get_available_parallelism().to_string()
        } else {
            value.to_owned()
        }
    }
}

config_namespace! {
    /// Options controlling the format of output when printing record batches
    ///
    /// Copies [`arrow::util::display::FormatOptions`]
    pub struct FormatOptions {
        /// If set to `true`, any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show types in the visual representation of batches
        pub types_info: bool, default = false
    }
}

impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
    type Error = DataFusionError;
    fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
        let duration_format = match self.duration_format.as_str() {
            "pretty" => arrow::util::display::DurationFormat::Pretty,
            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
            _ => {
                return _config_err!(
                    "Invalid duration format: {}. Valid values are pretty or iso8601",
                    self.duration_format
                )
            }
        };

        Ok(arrow::util::display::FormatOptions::new()
            .with_display_error(self.safe)
            .with_null(&self.null)
            .with_date_format(self.date_format.as_deref())
            .with_datetime_format(self.datetime_format.as_deref())
            .with_timestamp_format(self.timestamp_format.as_deref())
            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
            .with_time_format(self.time_format.as_deref())
            .with_duration_format(duration_format)
            .with_types_info(self.types_info))
    }
}
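
// Example (editor's sketch, not part of the upstream API surface): converting
// the DataFusion-level `FormatOptions` into Arrow's display options, e.g.
// before rendering record batches:
//
//     let options = FormatOptions::default();
//     let arrow_options: arrow::util::display::FormatOptions =
//         (&options).try_into()?;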

/// A key value pair, with a corresponding description
#[derive(Debug)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value if any
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}

/// Configuration options struct, able to store both built-in configuration and custom options
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
    /// Formatting options when printing batches
    pub format: FormatOptions,
}

impl ConfigField for ConfigOptions {
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `ConfigOptions::set`
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        match key {
            "catalog" => self.catalog.set(rem, value),
            "execution" => self.execution.set(rem, value),
            "optimizer" => self.optimizer.set(rem, value),
            "explain" => self.explain.set(rem, value),
            "sql_parser" => self.sql_parser.set(rem, value),
            "format" => self.format.set(rem, value),
            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
        }
    }

    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        self.catalog.visit(v, "datafusion.catalog", "");
        self.execution.visit(v, "datafusion.execution", "");
        self.optimizer.visit(v, "datafusion.optimizer", "");
        self.explain.visit(v, "datafusion.explain", "");
        self.sql_parser.visit(v, "datafusion.sql_parser", "");
        self.format.visit(v, "datafusion.format", "");
    }
}

impl ConfigOptions {
    /// Creates a new [`ConfigOptions`] with default values
    pub fn new() -> Self {
        Self::default()
    }

    /// Set extensions to provided value
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }

    /// Set a configuration option
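    ///
    /// The key is `.`-separated: the first segment selects the namespace
    /// (`datafusion` for the built-in options, or a registered extension's
    /// [`ConfigExtension::PREFIX`]) and the rest addresses a field within it.
    /// For example (illustrative):
    ///
    /// ```ignore
    /// let mut config = ConfigOptions::new();
    /// config.set("datafusion.execution.batch_size", "1024")?;
    /// ```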
    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let Some((prefix, key)) = key.split_once('.') else {
            return _config_err!("could not find config namespace for key \"{key}\"");
        };

        if prefix == "datafusion" {
            return ConfigField::set(self, key, value);
        }

        let Some(e) = self.extensions.0.get_mut(prefix) else {
            return _config_err!("Could not find config namespace \"{prefix}\"");
        };
        e.0.set(key, value)
    }

    /// Create new ConfigOptions struct, taking values from
    /// environment variables where possible.
    ///
    /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
    /// control `datafusion.execution.batch_size`.
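    ///
    /// For example (illustrative; note `set_var` affects the whole process):
    ///
    /// ```ignore
    /// std::env::set_var("DATAFUSION_EXECUTION_BATCH_SIZE", "1024");
    /// let config = ConfigOptions::from_env()?;
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```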
    pub fn from_env() -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        // Extract the names of all fields and then look up the corresponding
        // environment variables. This isn't hugely efficient but avoids
        // ambiguity between `a.b` and `a_b` which would both correspond
        // to an environment variable of `A_B`

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            let env = key.to_uppercase().replace('.', "_");
            if let Some(var) = std::env::var_os(env) {
                let value = var.to_string_lossy();
                log::info!("Set {key} to {value} from the environment variable");
                ret.set(&key, value.as_ref())?;
            }
        }

        Ok(ret)
    }

    /// Create new ConfigOptions struct, taking values from a string hash map.
    ///
    /// Only the built-in configurations will be extracted from the hash map
    /// and other key value pairs will be ignored.
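    ///
    /// For example (illustrative):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// let settings = HashMap::from([(
    ///     "datafusion.execution.batch_size".to_string(),
    ///     "1024".to_string(),
    /// )]);
    /// let config = ConfigOptions::from_string_hash_map(&settings)?;
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```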
    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            if let Some(var) = settings.get(&key) {
                ret.set(&key, var)?;
            }
        }

        Ok(ret)
    }

    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
    pub fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.visit(&mut v, "datafusion", "");

        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
        v.0
    }

    /// Generate documentation that can be included in the user guide
    pub fn generate_config_markdown() -> String {
        use std::fmt::Write as _;

        let mut s = Self::default();

        // Normalize for display
        s.execution.target_partitions = 0;
        s.execution.planning_concurrency = 0;

        let mut docs = "| key | default | description |\n".to_string();
        docs += "|-----|---------|-------------|\n";
        let mut entries = s.entries();
        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));

        // Iterate over the sorted entries (not a fresh `s.entries()` call,
        // which would discard the sort) so the table is ordered by key
        for entry in entries {
            let _ = writeln!(
                &mut docs,
                "| {} | {} | {} |",
                entry.key,
                entry.value.as_deref().unwrap_or("NULL"),
                entry.description
            );
        }
        docs
    }
}

/// [`ConfigExtension`] provides a mechanism to store third-party configuration
/// within DataFusion [`ConfigOptions`]
///
/// This mechanism can be used to pass configuration to user defined functions
/// or optimizer passes
///
/// # Example
/// ```
/// use datafusion_common::{
///     config::ConfigExtension, extensions_options,
///     config::ConfigOptions,
/// };
/// // Define a new configuration struct using the `extensions_options` macro
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
///
/// impl ConfigExtension for MyConfig {
///     const PREFIX: &'static str = "my_config";
/// }
///
/// // set up config struct and register extension
/// let mut config = ConfigOptions::default();
/// config.extensions.insert(MyConfig::default());
///
/// // overwrite config default
/// config.set("my_config.baz_count", "42").unwrap();
///
/// // check config state
/// let my_config = config.extensions.get::<MyConfig>().unwrap();
/// assert!(my_config.foo_to_bar);
/// assert_eq!(my_config.baz_count, 42);
/// ```
///
/// # Note:
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
    /// Configuration namespace prefix to use
    ///
    /// All values under this will be prefixed with `$PREFIX + "."`
    const PREFIX: &'static str;
}

/// An object-safe API for storing arbitrary configuration.
///
/// See [`ConfigExtension`] for user defined configuration
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any(&self) -> &dyn Any;

    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Return a deep clone of this [`ExtensionOptions`]
    ///
    /// It is important this does not share mutable state to avoid consistency issues
    /// with configuration changing whilst queries are executing
    fn cloned(&self) -> Box<dyn ExtensionOptions>;

    /// Set the given `key`, `value` pair
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
    fn entries(&self) -> Vec<ConfigEntry>;
}

/// A type-safe container for [`ConfigExtension`]
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);

impl Extensions {
    /// Create a new, empty [`Extensions`]
    pub fn new() -> Self {
        Self(BTreeMap::new())
    }

    /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
        assert_ne!(T::PREFIX, "datafusion");
        let e = ExtensionBox(Box::new(extension));
        self.0.insert(T::PREFIX, e);
    }

    /// Retrieves the extension of the given type if any
    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
    }

    /// Retrieves the extension of the given type if any
    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
        let e = self.0.get_mut(T::PREFIX)?;
        e.0.as_any_mut().downcast_mut()
    }
}

#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);

impl Clone for ExtensionBox {
    fn clone(&self) -> Self {
        Self(self.0.cloned())
    }
}

/// A trait implemented by `config_namespace` and for field types that provides
/// the ability to walk and mutate the configuration tree
pub trait ConfigField {
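    /// Report this value (and any nested fields) to the visitor `v`,
    /// under the given `key` and `description`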
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);

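    /// Set this value, or the nested field addressed by a `.`-separated
    /// `key`, from the string `value`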
    fn set(&mut self, key: &str, value: &str) -> Result<()>;
}

impl<F: ConfigField + Default> ConfigField for Option<F> {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        match self {
            Some(s) => s.visit(v, key, description),
            None => v.none(key, description),
        }
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        self.get_or_insert_with(Default::default).set(key, value)
    }
}

/// Default transformation to parse a [`ConfigField`] from a string.
///
/// This uses [`FromStr`] to parse the data.
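///
/// For example (illustrative):
///
/// ```ignore
/// let n: usize = default_config_transform("42")?;
/// assert_eq!(n, 42);
/// ```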
pub fn default_config_transform<T>(input: &str) -> Result<T>
where
    T: FromStr,
    <T as FromStr>::Err: Sync + Send + Error + 'static,
{
    input.parse().map_err(|e| {
        DataFusionError::Context(
            format!(
                "Error parsing '{}' as {}",
                input,
                std::any::type_name::<T>()
            ),
            Box::new(DataFusionError::External(Box::new(e))),
        )
    })
}

/// Macro that generates [`ConfigField`] for a given type.
///
/// # Usage
/// This always requires [`Display`] to be implemented for the given type.
///
/// There are two ways to invoke this macro. The first one uses
/// [`default_config_transform`]/[`FromStr`] to parse the data:
///
/// ```ignore
/// config_field!(MyType);
/// ```
///
/// Note that the parsing error MUST implement [`std::error::Error`]!
///
/// Or you can specify how you want to parse an [`str`] into the type:
///
/// ```ignore
/// fn parse_it(s: &str) -> Result<MyType> {
///     ...
/// }
///
/// config_field!(
///     MyType,
///     value => parse_it(value)
/// );
/// ```
#[macro_export]
macro_rules! config_field {
    ($t:ty) => {
        config_field!($t, value => $crate::config::default_config_transform(value)?);
    };

    ($t:ty, $arg:ident => $transform:expr) => {
        impl $crate::config::ConfigField for $t {
            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
                *self = $transform;
                Ok(())
            }
        }
    };
}

config_field!(String);
config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);

impl ConfigField for u8 {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        if value.is_empty() {
            return Err(DataFusionError::Configuration(format!(
                "Input string for {key} key is empty"
            )));
        }
        // Check if the string is a valid number
        if let Ok(num) = value.parse::<u8>() {
            // TODO: Let's decide how we treat the numerical strings.
            *self = num;
        } else {
            let bytes = value.as_bytes();
            // Check if the first character is ASCII (single byte)
            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
                return Err(DataFusionError::Configuration(format!(
                    "Error parsing {value} as u8. Non-ASCII string provided"
                )));
            }
            *self = bytes[0];
        }
        Ok(())
    }
}
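
// Editor's note (illustrative sketch, not upstream documentation): `u8`
// options accept either a numeric string or a single ASCII character, which
// is how single-byte settings such as CSV delimiters are expressed:
//
//     let mut delimiter: u8 = b',';
//     delimiter.set("", "|")?; // a single ASCII character => b'|'
//     delimiter.set("", "9")?; // a numeric string parses first => 9u8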

impl ConfigField for CompressionTypeVariant {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = CompressionTypeVariant::from_str(value)?;
        Ok(())
    }
}

/// An implementation trait used to recursively walk configuration
pub trait Visit {
    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);

    fn none(&mut self, key: &str, description: &'static str);
}

/// Convenience macro to create [`ExtensionOptions`].
///
/// The created structure implements the following traits:
///
/// - [`Clone`]
/// - [`Debug`]
/// - [`Default`]
/// - [`ExtensionOptions`]
///
/// # Usage
/// The syntax is:
///
/// ```text
/// extensions_options! {
///     /// Struct docs (optional).
///     [<vis>] struct <StructName> {
///         /// Field docs (optional)
///         [<vis>] <field_name>: <field_type>, default = <default_value>
///
///         ... more fields
///     }
/// }
/// ```
///
/// The placeholders are:
/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
/// - `<StructName>`: Struct name like `MyStruct`.
/// - `<field_name>`: Field name like `my_field`.
/// - `<field_type>`: Field type like `u8`.
/// - `<default_value>`: Default value matching the field type like `42`.
///
/// # Example
/// See also a full example on the [`ConfigExtension`] documentation
///
/// ```
/// use datafusion_common::extensions_options;
///
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
/// ```
///
/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
1437#[macro_export]
1438macro_rules! extensions_options {
1439 (
1440 $(#[doc = $struct_d:tt])*
1441 $vis:vis struct $struct_name:ident {
1442 $(
1443 $(#[doc = $d:tt])*
1444 $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
1445 )*$(,)*
1446 }
1447 ) => {
1448 $(#[doc = $struct_d])*
1449 #[derive(Debug, Clone)]
1450 #[non_exhaustive]
1451 $vis struct $struct_name{
1452 $(
1453 $(#[doc = $d])*
1454 $field_vis $field_name : $field_type,
1455 )*
1456 }
1457
1458 impl Default for $struct_name {
1459 fn default() -> Self {
1460 Self {
1461 $($field_name: $default),*
1462 }
1463 }
1464 }
1465
1466 impl $crate::config::ExtensionOptions for $struct_name {
1467 fn as_any(&self) -> &dyn ::std::any::Any {
1468 self
1469 }
1470
1471 fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
1472 self
1473 }
1474
1475 fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
1476 Box::new(self.clone())
1477 }
1478
1479 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1480 $crate::config::ConfigField::set(self, key, value)
1481 }
1482
1483 fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
1484 struct Visitor(Vec<$crate::config::ConfigEntry>);
1485
1486 impl $crate::config::Visit for Visitor {
1487 fn some<V: std::fmt::Display>(
1488 &mut self,
1489 key: &str,
1490 value: V,
1491 description: &'static str,
1492 ) {
1493 self.0.push($crate::config::ConfigEntry {
1494 key: key.to_string(),
1495 value: Some(value.to_string()),
1496 description,
1497 })
1498 }
1499
1500 fn none(&mut self, key: &str, description: &'static str) {
1501 self.0.push($crate::config::ConfigEntry {
1502 key: key.to_string(),
1503 value: None,
1504 description,
1505 })
1506 }
1507 }
1508
1509 let mut v = Visitor(vec![]);
1510 // The prefix is not used for extensions.
1511 // The description is generated in ConfigField::visit.
1512 // We can just pass empty strings here.
1513 $crate::config::ConfigField::visit(self, &mut v, "", "");
1514 v.0
1515 }
1516 }
1517
1518 impl $crate::config::ConfigField for $struct_name {
1519 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1520 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1521 match key {
1522 $(
1523 stringify!($field_name) => {
1524                        // Allow deprecated fields to be set without triggering warnings
1526 {
1527 #[allow(deprecated)]
1528 self.$field_name.set(rem, value.as_ref())
1529 }
1530 },
1531 )*
1532 _ => return $crate::error::_config_err!(
1533 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1534 )
1535 }
1536 }
1537
1538 fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1539 $(
1540 let key = stringify!($field_name).to_string();
1541 let desc = concat!($($d),*).trim();
1542 #[allow(deprecated)]
1543 self.$field_name.visit(v, key.as_str(), desc);
1544 )*
1545 }
1546 }
1547 }
1548}
1549
1550/// These file types have special built-in behavior for configuration.
1551/// Use [`TableOptions::extensions`] for configuring other file types.
1552#[derive(Debug, Clone)]
1553pub enum ConfigFileType {
1554 CSV,
1555 #[cfg(feature = "parquet")]
1556 PARQUET,
1557 JSON,
1558}
1559
1560/// Represents the configuration options available for handling different table formats within a data processing application.
1561/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
1562/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
1563#[derive(Debug, Clone, Default)]
1564pub struct TableOptions {
1565 /// Configuration options for CSV file handling. This includes settings like the delimiter,
1566 /// quote character, and whether the first row is considered as headers.
1567 pub csv: CsvOptions,
1568
1569 /// Configuration options for Parquet file handling. This includes settings for compression,
1570 /// encoding, and other Parquet-specific file characteristics.
1571 pub parquet: TableParquetOptions,
1572
1573 /// Configuration options for JSON file handling.
1574 pub json: JsonOptions,
1575
1576 /// The current file format that the table operations should assume. This option allows
1577 /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1578 pub current_format: Option<ConfigFileType>,
1579
1580 /// Optional extensions that can be used to extend or customize the behavior of the table
1581 /// options. Extensions can be registered using `Extensions::insert` and might include
1582 /// custom file handling logic, additional configuration parameters, or other enhancements.
1583 pub extensions: Extensions,
1584}
1585
1586impl ConfigField for TableOptions {
1587 /// Visits configuration settings for the current file format, or all formats if none is selected.
1588 ///
1589 /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1590 /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1591 /// it visits all available format settings.
1592 fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1593 if let Some(file_type) = &self.current_format {
1594 match file_type {
1595 #[cfg(feature = "parquet")]
1596 ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1597 ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1598 ConfigFileType::JSON => self.json.visit(v, "format", ""),
1599 }
1600 } else {
1601 self.csv.visit(v, "csv", "");
1602 self.parquet.visit(v, "parquet", "");
1603 self.json.visit(v, "json", "");
1604 }
1605 }
1606
1607 /// Sets a configuration value for a specific key within `TableOptions`.
1608 ///
1609 /// This method delegates setting configuration values to the specific file format configurations,
1610 /// based on the current format selected. If no format is selected, it returns an error.
1611 ///
1612 /// # Parameters
1613 ///
1614 /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1615 /// for CSV format.
1616 /// * `value`: The value to set for the specified configuration key.
1617 ///
1618 /// # Returns
1619 ///
1620 /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1621 /// or if setting the configuration value fails for the specific format.
1622 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1623 // Extensions are handled in the public `ConfigOptions::set`
1624 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1625 match key {
1626 "format" => {
1627 let Some(format) = &self.current_format else {
1628 return _config_err!("Specify a format for TableOptions");
1629 };
1630 match format {
1631 #[cfg(feature = "parquet")]
1632 ConfigFileType::PARQUET => self.parquet.set(rem, value),
1633 ConfigFileType::CSV => self.csv.set(rem, value),
1634 ConfigFileType::JSON => self.json.set(rem, value),
1635 }
1636 }
1637 _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1638 }
1639 }
1640}
1641
1642impl TableOptions {
1643 /// Constructs a new instance of `TableOptions` with default settings.
1644 ///
1645 /// # Returns
1646 ///
1647 /// A new `TableOptions` instance with default configuration values.
1648 pub fn new() -> Self {
1649 Self::default()
1650 }
1651
1652 /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1653 ///
1654 /// # Parameters
1655 ///
1656 /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1657 ///
1658 /// # Returns
1659 ///
1660 /// A new `TableOptions` instance with settings applied from the session config.
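    ///
    /// # Example
    ///
    /// Only the parquet options are inherited from the session config
    /// (see [`Self::combine_with_session_config`]):
    ///
    /// ```
    /// use datafusion_common::config::{ConfigOptions, TableOptions};
    ///
    /// let session_config = ConfigOptions::new();
    /// let table_options = TableOptions::default_from_session_config(&session_config);
    /// ```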
1661 pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1662 let initial = TableOptions::default();
1663 initial.combine_with_session_config(config)
1664 }
1665
1666    /// Returns a copy of this `TableOptions`, updated with settings from a given session config.
1667 ///
1668 /// # Parameters
1669 ///
1670 /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1671 ///
1672 /// # Returns
1673 ///
1674 /// A new `TableOptions` instance with updated settings from the session config.
1675 #[must_use = "this method returns a new instance"]
1676 pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1677 let mut clone = self.clone();
1678 clone.parquet.global = config.execution.parquet.clone();
1679 clone
1680 }
1681
1682 /// Sets the file format for the table.
1683 ///
1684 /// # Parameters
1685 ///
1686 /// * `format`: The file format to use (e.g., CSV, Parquet).
1687 pub fn set_config_format(&mut self, format: ConfigFileType) {
1688 self.current_format = Some(format);
1689 }
1690
1691 /// Sets the extensions for this `TableOptions` instance.
1692 ///
1693 /// # Parameters
1694 ///
1695 /// * `extensions`: The `Extensions` instance to set.
1696 ///
1697 /// # Returns
1698 ///
1699 /// A new `TableOptions` instance with the specified extensions applied.
1700 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1701 self.extensions = extensions;
1702 self
1703 }
1704
1705 /// Sets a specific configuration option.
1706 ///
1707 /// # Parameters
1708 ///
1709 /// * `key`: The configuration key (e.g., "format.delimiter").
1710 /// * `value`: The value to set for the specified key.
1711 ///
1712 /// # Returns
1713 ///
1714 /// A result indicating success or failure in setting the configuration option.
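    ///
    /// # Example
    ///
    /// A format must be selected before `format.*` keys can be set
    /// (mirroring this module's tests):
    ///
    /// ```
    /// use datafusion_common::config::{ConfigFileType, TableOptions};
    ///
    /// let mut options = TableOptions::new();
    /// options.set_config_format(ConfigFileType::CSV);
    /// options.set("format.delimiter", ";").unwrap();
    /// assert_eq!(options.csv.delimiter, b';');
    /// ```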
1715 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1716 let Some((prefix, _)) = key.split_once('.') else {
1717 return _config_err!("could not find config namespace for key \"{key}\"");
1718 };
1719
1720 if prefix == "format" {
1721 return ConfigField::set(self, key, value);
1722 }
1723
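        // "execution.*" keys are accepted but silently ignored here;
        // execution options live on the session `ConfigOptions` instead.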
1724 if prefix == "execution" {
1725 return Ok(());
1726 }
1727
1728 let Some(e) = self.extensions.0.get_mut(prefix) else {
1729 return _config_err!("Could not find config namespace \"{prefix}\"");
1730 };
1731 e.0.set(key, value)
1732 }
1733
1734 /// Initializes a new `TableOptions` from a hash map of string settings.
1735 ///
1736 /// # Parameters
1737 ///
1738 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1739 ///
1740 /// # Returns
1741 ///
1742 /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1743 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1744 let mut ret = Self::default();
1745 for (k, v) in settings {
1746 ret.set(k, v)?;
1747 }
1748
1749 Ok(ret)
1750 }
1751
1752 /// Modifies the current `TableOptions` instance with settings from a hash map.
1753 ///
1754 /// # Parameters
1755 ///
1756 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1757 ///
1758 /// # Returns
1759 ///
1760 /// A result indicating success or failure in applying the settings.
1761 pub fn alter_with_string_hash_map(
1762 &mut self,
1763 settings: &HashMap<String, String>,
1764 ) -> Result<()> {
1765 for (k, v) in settings {
1766 self.set(k, v)?;
1767 }
1768 Ok(())
1769 }
1770
1771 /// Retrieves all configuration entries from this `TableOptions`.
1772 ///
1773 /// # Returns
1774 ///
1775 /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
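    ///
    /// # Example
    ///
    /// A minimal sketch; with a format selected, entries are reported under
    /// the `format.` prefix:
    ///
    /// ```
    /// use datafusion_common::config::{ConfigFileType, TableOptions};
    ///
    /// let mut options = TableOptions::new();
    /// options.set_config_format(ConfigFileType::CSV);
    /// let entries = options.entries();
    /// assert!(entries.iter().any(|e| e.key == "format.delimiter"));
    /// ```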
1776 pub fn entries(&self) -> Vec<ConfigEntry> {
1777 struct Visitor(Vec<ConfigEntry>);
1778
1779 impl Visit for Visitor {
1780 fn some<V: Display>(
1781 &mut self,
1782 key: &str,
1783 value: V,
1784 description: &'static str,
1785 ) {
1786 self.0.push(ConfigEntry {
1787 key: key.to_string(),
1788 value: Some(value.to_string()),
1789 description,
1790 })
1791 }
1792
1793 fn none(&mut self, key: &str, description: &'static str) {
1794 self.0.push(ConfigEntry {
1795 key: key.to_string(),
1796 value: None,
1797 description,
1798 })
1799 }
1800 }
1801
1802 let mut v = Visitor(vec![]);
1803 self.visit(&mut v, "format", "");
1804
1805 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1806 v.0
1807 }
1808}
1809
1810/// Options that control how Parquet files are read, including global options
1811/// that apply to all columns and optional column-specific overrides
1812///
1813/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
1814/// Properties not included in [`TableParquetOptions`] may not be configurable via the external API
1815/// (e.g. sorting_columns).
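///
/// # Example
///
/// A sketch of the supported key shapes (assumes the `parquet` feature; the
/// keys mirror this module's tests):
///
/// ```ignore
/// use datafusion_common::config::{ConfigField, TableParquetOptions};
///
/// let mut options = TableParquetOptions::new();
/// // Global option
/// options.set("compression", "zstd(3)").unwrap();
/// // Column-specific option, `<option>::<column>`
/// options.set("bloom_filter_enabled::col1", "true").unwrap();
/// // File-level metadata, `metadata::<key>`
/// options.set("metadata::key1", "value1").unwrap();
/// ```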
1816#[derive(Clone, Default, Debug, PartialEq)]
1817pub struct TableParquetOptions {
1818    /// Global Parquet options that propagate to all columns.
1819 pub global: ParquetOptions,
1820 /// Column specific options. Default usage is parquet.XX::column.
1821    /// Column-specific options, set using keys of the form `<option>::<column>` (e.g. `compression::col1`).
1822 /// Additional file-level metadata to include. Inserted into the key_value_metadata
1823 /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
1824 ///
1825 /// Multiple entries are permitted
1826 /// ```sql
1827 /// OPTIONS (
1828 /// 'format.metadata::key1' '',
1829 /// 'format.metadata::key2' 'value',
1830 /// 'format.metadata::key3' 'value has spaces',
1831 /// 'format.metadata::key4' 'value has special chars :: :',
1832 /// 'format.metadata::key_dupe' 'original will be overwritten',
1833 /// 'format.metadata::key_dupe' 'final'
1834 /// )
1835 /// ```
1836 pub key_value_metadata: HashMap<String, Option<String>>,
1837 /// Options for configuring Parquet modular encryption
1838    /// Options for configuring Parquet modular encryption.
1839    /// See [`ConfigFileEncryptionProperties`] and [`ConfigFileDecryptionProperties`] in this module.
1840 /// ```sql
1841 /// OPTIONS (
1842 /// 'format.crypto.file_encryption.encrypt_footer' 'true',
1843    ///   'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
1844 /// 'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
1845 /// 'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
1846 /// -- Same for decryption
1847 /// 'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
1848 /// 'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
1849 /// 'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
1850 /// )
1851 /// ```
1852 /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
1853    /// Note that keys must be provided in hex format since these are binary strings.
1854 pub crypto: ParquetEncryptionOptions,
1855}
1856
1857impl TableParquetOptions {
1858 /// Return new default TableParquetOptions
1859 pub fn new() -> Self {
1860 Self::default()
1861 }
1862
1863 /// Set whether the encoding of the arrow metadata should occur
1864 /// during the writing of parquet.
1865 ///
1866 /// Default is to encode the arrow schema in the file kv_metadata.
1867 pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1868 Self {
1869 global: ParquetOptions {
1870 skip_arrow_metadata: skip,
1871 ..self.global
1872 },
1873 ..self
1874 }
1875 }
1876}
1877
1878impl ConfigField for TableParquetOptions {
1879 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
1880 self.global.visit(v, key_prefix, description);
1881 self.column_specific_options
1882 .visit(v, key_prefix, description);
1883 self.crypto
1884 .visit(v, &format!("{key_prefix}.crypto"), description);
1885 }
1886
1887 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1888 // Determine if the key is a global, metadata, or column-specific setting
1889 if key.starts_with("metadata::") {
1890 let k = match key.split("::").collect::<Vec<_>>()[..] {
1891 [_meta] | [_meta, ""] => {
1892 return _config_err!(
1893 "Invalid metadata key provided, missing key in metadata::<key>"
1894 )
1895 }
1896 [_meta, k] => k.into(),
1897 _ => {
1898 return _config_err!(
1899 "Invalid metadata key provided, found too many '::' in \"{key}\""
1900 )
1901 }
1902 };
1903 self.key_value_metadata.insert(k, Some(value.into()));
1904 Ok(())
1905 } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
1906 self.crypto.set(crypto_feature, value)
1907 } else if key.contains("::") {
1908 self.column_specific_options.set(key, value)
1909 } else {
1910 self.global.set(key, value)
1911 }
1912 }
1913}
1914
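/// Wraps a configuration struct, deriving [`Default`] and implementing
/// [`ConfigField`] both for the struct itself and for
/// `HashMap<String, TheStruct>`, so per-key overrides can be set with the
/// `<option>::<key>` syntax (e.g. `bloom_filter_enabled::col1`).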
1915macro_rules! config_namespace_with_hashmap {
1916 (
1917 $(#[doc = $struct_d:tt])*
1918 $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
1919 $vis:vis struct $struct_name:ident {
1920 $(
1921 $(#[doc = $d:tt])*
1922 $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
1923 $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
1924 )*$(,)*
1925 }
1926 ) => {
1927
1928 $(#[doc = $struct_d])*
1929 $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
1930 #[derive(Debug, Clone, PartialEq)]
1931 $vis struct $struct_name{
1932 $(
1933 $(#[doc = $d])*
1934 $(#[deprecated($($field_depr)*)])? // Apply field deprecation
1935 $field_vis $field_name : $field_type,
1936 )*
1937 }
1938
1939 impl ConfigField for $struct_name {
1940 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1941 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1942 match key {
1943 $(
1944 stringify!($field_name) => {
1945 // Handle deprecated fields
1946 #[allow(deprecated)] // Allow deprecated fields
1947 $(let value = $transform(value);)?
1948 self.$field_name.set(rem, value.as_ref())
1949 },
1950 )*
1951 _ => _config_err!(
1952 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1953 )
1954 }
1955 }
1956
1957 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
1958 $(
1959 let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
1960 let desc = concat!($($d),*).trim();
1961 // Handle deprecated fields
1962 #[allow(deprecated)]
1963 self.$field_name.visit(v, key.as_str(), desc);
1964 )*
1965 }
1966 }
1967
1968 impl Default for $struct_name {
1969 fn default() -> Self {
1970 #[allow(deprecated)]
1971 Self {
1972 $($field_name: $default),*
1973 }
1974 }
1975 }
1976
1977 impl ConfigField for HashMap<String,$struct_name> {
1978 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1979 let parts: Vec<&str> = key.splitn(2, "::").collect();
1980 match parts.as_slice() {
1981 [inner_key, hashmap_key] => {
1982 // Get or create the struct for the specified key
1983 let inner_value = self
1984 .entry((*hashmap_key).to_owned())
1985 .or_insert_with($struct_name::default);
1986
1987 inner_value.set(inner_key, value)
1988 }
1989 _ => _config_err!("Unrecognized key '{key}'."),
1990 }
1991 }
1992
1993 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
1994 for (column_name, col_options) in self {
1995 $(
1996 let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
1997 let desc = concat!($($d),*).trim();
1998 #[allow(deprecated)]
1999 col_options.$field_name.visit(v, key.as_str(), desc);
2000 )*
2001 }
2002 }
2003 }
2004 }
2005}
2006
2007config_namespace_with_hashmap! {
2008 /// Options controlling parquet format for individual columns.
2009 ///
2010 /// See [`ParquetOptions`] for more details
2011 pub struct ParquetColumnOptions {
2012 /// Sets if bloom filter is enabled for the column path.
2013 pub bloom_filter_enabled: Option<bool>, default = None
2014
2015 /// Sets encoding for the column path.
2016 /// Valid values are: plain, plain_dictionary, rle,
2017 /// bit_packed, delta_binary_packed, delta_length_byte_array,
2018 /// delta_byte_array, rle_dictionary, and byte_stream_split.
2019 /// These values are not case-sensitive. If NULL, uses
2020 /// default parquet options
2021 pub encoding: Option<String>, default = None
2022
2023 /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
2024 /// default parquet options
2025 pub dictionary_enabled: Option<bool>, default = None
2026
2027 /// Sets default parquet compression codec for the column path.
2028 /// Valid values are: uncompressed, snappy, gzip(level),
2029 /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
2030 /// These values are not case-sensitive. If NULL, uses
2031 /// default parquet options
2032 pub compression: Option<String>, transform = str::to_lowercase, default = None
2033
2034 /// Sets if statistics are enabled for the column
2035 /// Valid values are: "none", "chunk", and "page"
2036 /// These values are not case sensitive. If NULL, uses
2037 /// default parquet options
2038 pub statistics_enabled: Option<String>, default = None
2039
2040 /// Sets bloom filter false positive probability for the column path. If NULL, uses
2041 /// default parquet options
2042 pub bloom_filter_fpp: Option<f64>, default = None
2043
2044 /// Sets bloom filter number of distinct values. If NULL, uses
2045 /// default parquet options
2046 pub bloom_filter_ndv: Option<u64>, default = None
2047
2048 /// Sets max statistics size for the column path. If NULL, uses
2049 /// default parquet options
2050 /// max_statistics_size is deprecated, currently it is not being used
2051 // TODO: remove once deprecated
2052 #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
2053 pub max_statistics_size: Option<usize>, default = None
2054 }
2055}
2056
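/// Parquet file encryption configuration, with all binary key material passed
/// as hex-encoded strings so it can travel through string-based options.
///
/// A minimal conversion sketch (requires the `parquet_encryption` feature):
///
/// ```ignore
/// let mut config = ConfigFileEncryptionProperties::default();
/// config.footer_key_as_hex = hex::encode(b"0123456789012345");
/// let props: FileEncryptionProperties = config.into();
/// ```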
2057#[derive(Clone, Debug, PartialEq)]
2058pub struct ConfigFileEncryptionProperties {
2059 /// Should the parquet footer be encrypted
2060 /// default is true
2061 pub encrypt_footer: bool,
2062 /// Key to use for the parquet footer encoded in hex format
2063 pub footer_key_as_hex: String,
2064 /// Metadata information for footer key
2065 pub footer_key_metadata_as_hex: String,
2066 /// HashMap of column names --> (key in hex format, metadata)
2067 pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
2068 /// AAD prefix string uniquely identifies the file and prevents file swapping
2069 pub aad_prefix_as_hex: String,
2070 /// If true, store the AAD prefix in the file
2071 /// default is false
2072 pub store_aad_prefix: bool,
2073}
2074
2075// Setup to match EncryptionPropertiesBuilder::new()
2076impl Default for ConfigFileEncryptionProperties {
2077 fn default() -> Self {
2078 ConfigFileEncryptionProperties {
2079 encrypt_footer: true,
2080 footer_key_as_hex: String::new(),
2081 footer_key_metadata_as_hex: String::new(),
2082 column_encryption_properties: Default::default(),
2083 aad_prefix_as_hex: String::new(),
2084 store_aad_prefix: false,
2085 }
2086 }
2087}
2088
2089config_namespace_with_hashmap! {
2090 pub struct ColumnEncryptionProperties {
2091 /// Per column encryption key
2092 pub column_key_as_hex: String, default = "".to_string()
2093 /// Per column encryption key metadata
2094 pub column_metadata_as_hex: Option<String>, default = None
2095 }
2096}
2097
2098impl ConfigField for ConfigFileEncryptionProperties {
2099 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2100 let key = format!("{key_prefix}.encrypt_footer");
2101 let desc = "Encrypt the footer";
2102 self.encrypt_footer.visit(v, key.as_str(), desc);
2103
2104 let key = format!("{key_prefix}.footer_key_as_hex");
2105 let desc = "Key to use for the parquet footer";
2106 self.footer_key_as_hex.visit(v, key.as_str(), desc);
2107
2108 let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2109 let desc = "Metadata to use for the parquet footer";
2110 self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2111
2112 let key = format!("{key_prefix}.aad_prefix_as_hex");
2113 let desc = "AAD prefix to use";
2114 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2115
2116 let key = format!("{key_prefix}.store_aad_prefix");
2117 let desc = "If true, store the AAD prefix";
2118 self.store_aad_prefix.visit(v, key.as_str(), desc);
2119
2120        self.column_encryption_properties.visit(v, key_prefix, desc);
2121 }
2122
2123 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2124 // Any hex encoded values must be pre-encoded using
2125 // hex::encode() before calling set.
2126
2127 if key.contains("::") {
2128 // Handle any column specific properties
2129 return self.column_encryption_properties.set(key, value);
2130 };
2131
2132 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2133 match key {
2134 "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2135 "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2136 "footer_key_metadata_as_hex" => {
2137 self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2138 }
2139 "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2140 "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2141 _ => _config_err!(
2142 "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2143 key
2144 ),
2145 }
2146 }
2147}
2148
2149#[cfg(feature = "parquet_encryption")]
2150impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
2151 fn from(val: ConfigFileEncryptionProperties) -> Self {
2152 let mut fep = FileEncryptionProperties::builder(
2153            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
2154 )
2155 .with_plaintext_footer(!val.encrypt_footer)
2156 .with_aad_prefix_storage(val.store_aad_prefix);
2157
2158 if !val.footer_key_metadata_as_hex.is_empty() {
2159 fep = fep.with_footer_key_metadata(
2160 hex::decode(&val.footer_key_metadata_as_hex)
2161 .expect("Invalid footer key metadata"),
2162 );
2163 }
2164
2165 for (column_name, encryption_props) in val.column_encryption_properties.iter() {
2166 let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
2167 .expect("Invalid column encryption key");
2168 let key_metadata = encryption_props
2169 .column_metadata_as_hex
2170 .as_ref()
2171 .map(|x| hex::decode(x).expect("Invalid column metadata"));
2172 match key_metadata {
2173 Some(key_metadata) => {
2174 fep = fep.with_column_key_and_metadata(
2175 column_name,
2176 encryption_key,
2177 key_metadata,
2178 );
2179 }
2180 None => {
2181 fep = fep.with_column_key(column_name, encryption_key);
2182 }
2183 }
2184 }
2185
2186 if !val.aad_prefix_as_hex.is_empty() {
2187 let aad_prefix: Vec<u8> =
2188 hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2189 fep = fep.with_aad_prefix(aad_prefix);
2190 }
2191 fep.build().unwrap()
2192 }
2193}
2194
2195#[cfg(feature = "parquet_encryption")]
2196impl From<&FileEncryptionProperties> for ConfigFileEncryptionProperties {
2197 fn from(f: &FileEncryptionProperties) -> Self {
2198 let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
2199
2200 let mut column_encryption_properties: HashMap<
2201 String,
2202 ColumnEncryptionProperties,
2203 > = HashMap::new();
2204
2205 for (i, column_name) in column_names_vec.iter().enumerate() {
2206 let column_key_as_hex = hex::encode(&column_keys_vec[i]);
2207 let column_metadata_as_hex: Option<String> =
2208 column_metas_vec.get(i).map(hex::encode);
2209 column_encryption_properties.insert(
2210 column_name.clone(),
2211 ColumnEncryptionProperties {
2212 column_key_as_hex,
2213 column_metadata_as_hex,
2214 },
2215 );
2216 }
2217 let mut aad_prefix: Vec<u8> = Vec::new();
2218 if let Some(prefix) = f.aad_prefix() {
2219 aad_prefix = prefix.clone();
2220 }
2221 ConfigFileEncryptionProperties {
2222 encrypt_footer: f.encrypt_footer(),
2223 footer_key_as_hex: hex::encode(f.footer_key()),
2224 footer_key_metadata_as_hex: f
2225 .footer_key_metadata()
2226 .map(hex::encode)
2227 .unwrap_or_default(),
2228 column_encryption_properties,
2229 aad_prefix_as_hex: hex::encode(aad_prefix),
2230 store_aad_prefix: f.store_aad_prefix(),
2231 }
2232 }
2233}
2234
2235#[derive(Clone, Debug, PartialEq)]
2236pub struct ConfigFileDecryptionProperties {
2237    /// Key to use for decrypting the parquet footer, encoded in hex format
2238 pub footer_key_as_hex: String,
2239 /// HashMap of column names --> key in hex format
2240 pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
2241 /// AAD prefix string uniquely identifies the file and prevents file swapping
2242 pub aad_prefix_as_hex: String,
2243 /// If true, then verify signature for files with plaintext footers.
2244 /// default = true
2245 pub footer_signature_verification: bool,
2246}
2247
2248config_namespace_with_hashmap! {
2249 pub struct ColumnDecryptionProperties {
2250 /// Per column encryption key
2251 pub column_key_as_hex: String, default = "".to_string()
2252 }
2253}
2254
2255// Setup to match DecryptionPropertiesBuilder::new()
2256impl Default for ConfigFileDecryptionProperties {
2257 fn default() -> Self {
2258 ConfigFileDecryptionProperties {
2259 footer_key_as_hex: String::new(),
2260 column_decryption_properties: Default::default(),
2261 aad_prefix_as_hex: String::new(),
2262 footer_signature_verification: true,
2263 }
2264 }
2265}
2266
2267impl ConfigField for ConfigFileDecryptionProperties {
2268 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2269 let key = format!("{key_prefix}.footer_key_as_hex");
2270 let desc = "Key to use for the parquet footer";
2271 self.footer_key_as_hex.visit(v, key.as_str(), desc);
2272
2273 let key = format!("{key_prefix}.aad_prefix_as_hex");
2274 let desc = "AAD prefix to use";
2275 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2276
2277 let key = format!("{key_prefix}.footer_signature_verification");
2278 let desc = "If true, verify the footer signature";
2279 self.footer_signature_verification
2280 .visit(v, key.as_str(), desc);
2281
2282 self.column_decryption_properties.visit(v, key_prefix, desc);
2283 }
2284
2285 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2286 // Any hex encoded values must be pre-encoded using
2287 // hex::encode() before calling set.
2288
2289 if key.contains("::") {
2290 // Handle any column specific properties
2291 return self.column_decryption_properties.set(key, value);
2292 };
2293
2294 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2295 match key {
2296 "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2297 "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2298 "footer_signature_verification" => {
2299 self.footer_signature_verification.set(rem, value.as_ref())
2300 }
2301 _ => _config_err!(
2302                "Config value \"{}\" not found on ConfigFileDecryptionProperties",
2303 key
2304 ),
2305 }
2306 }
2307}
2308
2309#[cfg(feature = "parquet_encryption")]
2310impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
2311 fn from(val: ConfigFileDecryptionProperties) -> Self {
2312 let mut column_names: Vec<&str> = Vec::new();
2313 let mut column_keys: Vec<Vec<u8>> = Vec::new();
2314
2315 for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
2316 column_names.push(col_name.as_str());
2317 column_keys.push(
2318 hex::decode(&decryption_properties.column_key_as_hex)
2319 .expect("Invalid column decryption key"),
2320 );
2321 }
2322
2323 let mut fep = FileDecryptionProperties::builder(
2324 hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
2325 )
2326 .with_column_keys(column_names, column_keys)
2327 .unwrap();
2328
2329 if !val.footer_signature_verification {
2330 fep = fep.disable_footer_signature_verification();
2331 }
2332
2333 if !val.aad_prefix_as_hex.is_empty() {
2334 let aad_prefix =
2335 hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2336 fep = fep.with_aad_prefix(aad_prefix);
2337 }
2338
2339 fep.build().unwrap()
2340 }
2341}
2342
2343#[cfg(feature = "parquet_encryption")]
2344impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties {
2345 fn from(f: &FileDecryptionProperties) -> Self {
2346 let (column_names_vec, column_keys_vec) = f.column_keys();
2347 let mut column_decryption_properties: HashMap<
2348 String,
2349 ColumnDecryptionProperties,
2350 > = HashMap::new();
2351 for (i, column_name) in column_names_vec.iter().enumerate() {
2352 let props = ColumnDecryptionProperties {
2353 column_key_as_hex: hex::encode(column_keys_vec[i].clone()),
2354 };
2355 column_decryption_properties.insert(column_name.clone(), props);
2356 }
2357
2358 let mut aad_prefix: Vec<u8> = Vec::new();
2359 if let Some(prefix) = f.aad_prefix() {
2360 aad_prefix = prefix.clone();
2361 }
2362 ConfigFileDecryptionProperties {
2363 footer_key_as_hex: hex::encode(
2364 f.footer_key(None).unwrap_or_default().as_ref(),
2365 ),
2366 column_decryption_properties,
2367 aad_prefix_as_hex: hex::encode(aad_prefix),
2368 footer_signature_verification: f.check_plaintext_footer_integrity(),
2369 }
2370 }
2371}
2372
2373config_namespace! {
2374 /// Options controlling CSV format
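    ///
    /// A minimal builder-style sketch using the `with_*` methods below:
    ///
    /// ```ignore
    /// let options = CsvOptions::default()
    ///     .with_has_header(true)
    ///     .with_delimiter(b';')
    ///     .with_quote(b'"');
    /// ```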
2375 pub struct CsvOptions {
2376 /// Specifies whether there is a CSV header (i.e. the first line
2377        /// consists of column names). The value `None` indicates that
2378 /// the configuration should be consulted.
2379 pub has_header: Option<bool>, default = None
2380 pub delimiter: u8, default = b','
2381 pub quote: u8, default = b'"'
2382 pub terminator: Option<u8>, default = None
2383 pub escape: Option<u8>, default = None
2384 pub double_quote: Option<bool>, default = None
2385 /// Specifies whether newlines in (quoted) values are supported.
2386 ///
2387 /// Parsing newlines in quoted values may be affected by execution behaviour such as
2388 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2389 /// parsed successfully, which may reduce performance.
2390 ///
2391 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2392 pub newlines_in_values: Option<bool>, default = None
2393 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2394 pub schema_infer_max_rec: Option<usize>, default = None
2395 pub date_format: Option<String>, default = None
2396 pub datetime_format: Option<String>, default = None
2397 pub timestamp_format: Option<String>, default = None
2398 pub timestamp_tz_format: Option<String>, default = None
2399 pub time_format: Option<String>, default = None
2400        /// The output format for Nulls in the CSV writer.
2401 pub null_value: Option<String>, default = None
2402        /// The input regex for Nulls when loading CSVs.
2403 pub null_regex: Option<String>, default = None
2404 pub comment: Option<u8>, default = None
2405 }
2406}
2407
2408impl CsvOptions {
2409    /// Set the `CompressionTypeVariant` for CSV
2410    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2411 pub fn with_compression(
2412 mut self,
2413 compression_type_variant: CompressionTypeVariant,
2414 ) -> Self {
2415 self.compression = compression_type_variant;
2416 self
2417 }
2418
2419 /// Set a limit in terms of records to scan to infer the schema
2420 /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
2421 pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
2422 self.schema_infer_max_rec = Some(max_rec);
2423 self
2424 }
2425
2426    /// Set to true to indicate that the first line is a header.
2427 /// - default to true
2428 pub fn with_has_header(mut self, has_header: bool) -> Self {
2429 self.has_header = Some(has_header);
2430 self
2431 }
2432
2433    /// Returns true if the first line is a header. If the format options do not
2434 /// specify whether there is a header, returns `None` (indicating that the
2435 /// configuration should be consulted).
2436 pub fn has_header(&self) -> Option<bool> {
2437 self.has_header
2438 }
2439
2440 /// The character separating values within a row.
2441 /// - default to ','
2442 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
2443 self.delimiter = delimiter;
2444 self
2445 }
2446
2447 /// The quote character in a row.
2448 /// - default to '"'
2449 pub fn with_quote(mut self, quote: u8) -> Self {
2450 self.quote = quote;
2451 self
2452 }
2453
2454 /// The character that terminates a row.
2455 /// - default to None (CRLF)
2456 pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
2457 self.terminator = terminator;
2458 self
2459 }
2460
2461 /// The escape character in a row.
2462 /// - default is None
2463 pub fn with_escape(mut self, escape: Option<u8>) -> Self {
2464 self.escape = escape;
2465 self
2466 }
2467
2468    /// Set to true to indicate that the CSV quotes should be doubled.
2469 /// - default to true
2470 pub fn with_double_quote(mut self, double_quote: bool) -> Self {
2471 self.double_quote = Some(double_quote);
2472 self
2473 }
2474
2475 /// Specifies whether newlines in (quoted) values are supported.
2476 ///
2477 /// Parsing newlines in quoted values may be affected by execution behaviour such as
2478 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2479 /// parsed successfully, which may reduce performance.
2480 ///
2481 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2482 pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
2483 self.newlines_in_values = Some(newlines_in_values);
2484 self
2485 }
2486
2487 /// Set a `CompressionTypeVariant` of CSV
2488 /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2489 pub fn with_file_compression_type(
2490 mut self,
2491 compression: CompressionTypeVariant,
2492 ) -> Self {
2493 self.compression = compression;
2494 self
2495 }
2496
2497 /// The delimiter character.
2498 pub fn delimiter(&self) -> u8 {
2499 self.delimiter
2500 }
2501
2502 /// The quote character.
2503 pub fn quote(&self) -> u8 {
2504 self.quote
2505 }
2506
2507 /// The terminator character.
2508 pub fn terminator(&self) -> Option<u8> {
2509 self.terminator
2510 }
2511
2512 /// The escape character.
2513 pub fn escape(&self) -> Option<u8> {
2514 self.escape
2515 }
2516}
2517
2518config_namespace! {
2519 /// Options controlling JSON format
2520 pub struct JsonOptions {
2521 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2522 pub schema_infer_max_rec: Option<usize>, default = None
2523 }
2524}
2525
2526pub trait OutputFormatExt: Display {}
2527
2528#[derive(Debug, Clone, PartialEq)]
2529#[allow(clippy::large_enum_variant)]
2530pub enum OutputFormat {
2531 CSV(CsvOptions),
2532 JSON(JsonOptions),
2533 #[cfg(feature = "parquet")]
2534 PARQUET(TableParquetOptions),
2535 AVRO,
2536 ARROW,
2537}
2538
2539impl Display for OutputFormat {
2540 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2541 let out = match self {
2542 OutputFormat::CSV(_) => "csv",
2543 OutputFormat::JSON(_) => "json",
2544 #[cfg(feature = "parquet")]
2545 OutputFormat::PARQUET(_) => "parquet",
2546 OutputFormat::AVRO => "avro",
2547 OutputFormat::ARROW => "arrow",
2548 };
2549 write!(f, "{out}")
2550 }
2551}
2552
2553#[cfg(test)]
2554mod tests {
2555 use crate::config::{
2556 ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
2557 Extensions, TableOptions,
2558 };
2559 use std::any::Any;
2560 use std::collections::HashMap;
2561
2562 #[derive(Default, Debug, Clone)]
2563 pub struct TestExtensionConfig {
2564 /// Should "foo" be replaced by "bar"?
2565 pub properties: HashMap<String, String>,
2566 }
2567
2568 impl ExtensionOptions for TestExtensionConfig {
2569 fn as_any(&self) -> &dyn Any {
2570 self
2571 }
2572
2573 fn as_any_mut(&mut self) -> &mut dyn Any {
2574 self
2575 }
2576
2577 fn cloned(&self) -> Box<dyn ExtensionOptions> {
2578 Box::new(self.clone())
2579 }
2580
2581 fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
2582 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2583 assert_eq!(key, "test");
2584 self.properties.insert(rem.to_owned(), value.to_owned());
2585 Ok(())
2586 }
2587
2588 fn entries(&self) -> Vec<ConfigEntry> {
2589 self.properties
2590 .iter()
2591 .map(|(k, v)| ConfigEntry {
2592 key: k.into(),
2593 value: Some(v.into()),
2594 description: "",
2595 })
2596 .collect()
2597 }
2598 }
2599
2600 impl ConfigExtension for TestExtensionConfig {
2601 const PREFIX: &'static str = "test";
2602 }
2603
2604 #[test]
2605 fn create_table_config() {
2606 let mut extension = Extensions::new();
2607 extension.insert(TestExtensionConfig::default());
2608 let table_config = TableOptions::new().with_extensions(extension);
2609 let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
2610 assert!(kafka_config.is_some())
2611 }
2612
2613 #[test]
2614 fn alter_test_extension_config() {
2615 let mut extension = Extensions::new();
2616 extension.insert(TestExtensionConfig::default());
2617 let mut table_config = TableOptions::new().with_extensions(extension);
2618 table_config.set_config_format(ConfigFileType::CSV);
2619 table_config.set("format.delimiter", ";").unwrap();
2620 assert_eq!(table_config.csv.delimiter, b';');
2621 table_config.set("test.bootstrap.servers", "asd").unwrap();
2622 let kafka_config = table_config
2623 .extensions
2624 .get::<TestExtensionConfig>()
2625 .unwrap();
2626 assert_eq!(
2627 kafka_config.properties.get("bootstrap.servers").unwrap(),
2628 "asd"
2629 );
2630 }
2631
2632 #[test]
2633 fn csv_u8_table_options() {
2634 let mut table_config = TableOptions::new();
2635 table_config.set_config_format(ConfigFileType::CSV);
2636 table_config.set("format.delimiter", ";").unwrap();
2637 assert_eq!(table_config.csv.delimiter as char, ';');
2638 table_config.set("format.escape", "\"").unwrap();
2639 assert_eq!(table_config.csv.escape.unwrap() as char, '"');
2640 table_config.set("format.escape", "\'").unwrap();
2641 assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
2642 }
2643
2644 #[test]
2645 fn warning_only_not_default() {
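        // Expect no log output when setting the option to its default value,
        // and exactly one message when setting it to a non-default value.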
2646 use std::sync::atomic::AtomicUsize;
2647 static COUNT: AtomicUsize = AtomicUsize::new(0);
2648 use log::{Level, LevelFilter, Metadata, Record};
2649 struct SimpleLogger;
2650 impl log::Log for SimpleLogger {
2651 fn enabled(&self, metadata: &Metadata) -> bool {
2652 metadata.level() <= Level::Info
2653 }
2654
2655 fn log(&self, record: &Record) {
2656 if self.enabled(record.metadata()) {
2657 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2658 }
2659 }
2660 fn flush(&self) {}
2661 }
2662 log::set_logger(&SimpleLogger).unwrap();
2663 log::set_max_level(LevelFilter::Info);
2664 let mut sql_parser_options = crate::config::SqlParserOptions::default();
2665 sql_parser_options
2666 .set("enable_options_value_normalization", "false")
2667 .unwrap();
2668 assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
2669 sql_parser_options
2670 .set("enable_options_value_normalization", "true")
2671 .unwrap();
2672 assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
2673 }
2674
2675 #[cfg(feature = "parquet")]
2676 #[test]
2677 fn parquet_table_options() {
2678 let mut table_config = TableOptions::new();
2679 table_config.set_config_format(ConfigFileType::PARQUET);
2680 table_config
2681 .set("format.bloom_filter_enabled::col1", "true")
2682 .unwrap();
2683 assert_eq!(
2684 table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
2685 Some(true)
2686 );
2687 }
2688
2689 #[cfg(feature = "parquet_encryption")]
2690 #[test]
2691 fn parquet_table_encryption() {
2692 use crate::config::{
2693 ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
2694 };
2695 use parquet::encryption::decrypt::FileDecryptionProperties;
2696 use parquet::encryption::encrypt::FileEncryptionProperties;
2697
2698        let footer_key = b"0123456789012345".to_vec(); // 128-bit key (16 bytes)
2699 let column_names = vec!["double_field", "float_field"];
2700 let column_keys =
2701 vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
2702
2703 let file_encryption_properties =
2704 FileEncryptionProperties::builder(footer_key.clone())
2705 .with_column_keys(column_names.clone(), column_keys.clone())
2706 .unwrap()
2707 .build()
2708 .unwrap();
2709
2710 let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
2711 .with_column_keys(column_names.clone(), column_keys.clone())
2712 .unwrap()
2713 .build()
2714 .unwrap();
2715
2716 // Test round-trip
2717 let config_encrypt: ConfigFileEncryptionProperties =
2718 (&file_encryption_properties).into();
2719 let encryption_properties_built: FileEncryptionProperties =
2720 config_encrypt.clone().into();
2721 assert_eq!(file_encryption_properties, encryption_properties_built);
2722
2723 let config_decrypt: ConfigFileDecryptionProperties =
2724 (&decryption_properties).into();
2725 let decryption_properties_built: FileDecryptionProperties =
2726 config_decrypt.clone().into();
2727 assert_eq!(decryption_properties, decryption_properties_built);
2728
2729 ///////////////////////////////////////////////////////////////////////////////////
2730 // Test encryption config
2731
2732 // Display original encryption config
2733 // println!("{:#?}", config_encrypt);
2734
2735 let mut table_config = TableOptions::new();
2736 table_config.set_config_format(ConfigFileType::PARQUET);
2737 table_config
2738 .parquet
2739 .set(
2740 "crypto.file_encryption.encrypt_footer",
2741 config_encrypt.encrypt_footer.to_string().as_str(),
2742 )
2743 .unwrap();
2744 table_config
2745 .parquet
2746 .set(
2747 "crypto.file_encryption.footer_key_as_hex",
2748 config_encrypt.footer_key_as_hex.as_str(),
2749 )
2750 .unwrap();
2751
2752 for (i, col_name) in column_names.iter().enumerate() {
2753 let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
2754 let value = hex::encode(column_keys[i].clone());
2755 table_config
2756 .parquet
2757 .set(key.as_str(), value.as_str())
2758 .unwrap();
2759 }
2760
2761 // Print matching final encryption config
2762 // println!("{:#?}", table_config.parquet.crypto.file_encryption);
2763
2764 assert_eq!(
2765 table_config.parquet.crypto.file_encryption,
2766 Some(config_encrypt)
2767 );
2768
2769 ///////////////////////////////////////////////////////////////////////////////////
2770 // Test decryption config
2771
2772 // Display original decryption config
2773 // println!("{:#?}", config_decrypt);
2774
2775 let mut table_config = TableOptions::new();
2776 table_config.set_config_format(ConfigFileType::PARQUET);
2777 table_config
2778 .parquet
2779 .set(
2780 "crypto.file_decryption.footer_key_as_hex",
2781 config_decrypt.footer_key_as_hex.as_str(),
2782 )
2783 .unwrap();
2784
2785 for (i, col_name) in column_names.iter().enumerate() {
2786 let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
2787 let value = hex::encode(column_keys[i].clone());
2788 table_config
2789 .parquet
2790 .set(key.as_str(), value.as_str())
2791 .unwrap();
2792 }
2793
2794 // Print matching final decryption config
2795 // println!("{:#?}", table_config.parquet.crypto.file_decryption);
2796
2797 assert_eq!(
2798 table_config.parquet.crypto.file_decryption,
2799 Some(config_decrypt.clone())
2800 );
2801
2802 // Set config directly
2803 let mut table_config = TableOptions::new();
2804 table_config.set_config_format(ConfigFileType::PARQUET);
2805 table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
2806 assert_eq!(
2807 table_config.parquet.crypto.file_decryption,
2808 Some(config_decrypt.clone())
2809 );
2810 }
2811
2812 #[cfg(feature = "parquet")]
2813 #[test]
2814 fn parquet_table_options_config_entry() {
2815 let mut table_config = TableOptions::new();
2816 table_config.set_config_format(ConfigFileType::PARQUET);
2817 table_config
2818 .set("format.bloom_filter_enabled::col1", "true")
2819 .unwrap();
2820 let entries = table_config.entries();
2821 assert!(entries
2822 .iter()
2823 .any(|item| item.key == "format.bloom_filter_enabled::col1"))
2824 }
2825
2826 #[cfg(feature = "parquet")]
2827 #[test]
2828 fn parquet_table_options_config_metadata_entry() {
2829 let mut table_config = TableOptions::new();
2830 table_config.set_config_format(ConfigFileType::PARQUET);
2831 table_config.set("format.metadata::key1", "").unwrap();
2832 table_config.set("format.metadata::key2", "value2").unwrap();
2833 table_config
2834 .set("format.metadata::key3", "value with spaces ")
2835 .unwrap();
2836 table_config
2837 .set("format.metadata::key4", "value with special chars :: :")
2838 .unwrap();
2839
2840 let parsed_metadata = table_config.parquet.key_value_metadata.clone();
2841 assert_eq!(parsed_metadata.get("should not exist1"), None);
2842 assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
2843 assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
2844 assert_eq!(
2845 parsed_metadata.get("key3"),
2846 Some(&Some("value with spaces ".into()))
2847 );
2848 assert_eq!(
2849 parsed_metadata.get("key4"),
2850 Some(&Some("value with special chars :: :".into()))
2851 );
2852
2853 // duplicate keys are overwritten
2854 table_config.set("format.metadata::key_dupe", "A").unwrap();
2855 table_config.set("format.metadata::key_dupe", "B").unwrap();
2856 let parsed_metadata = table_config.parquet.key_value_metadata;
2857 assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
2858 }
2859}