datafusion_common/config.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Runtime configuration, via [`ConfigOptions`]
19
20use std::any::Any;
21use std::collections::{BTreeMap, HashMap};
22use std::error::Error;
23use std::fmt::{self, Display};
24use std::str::FromStr;
25
26use crate::error::_config_err;
27use crate::parsers::CompressionTypeVariant;
28use crate::utils::get_available_parallelism;
29use crate::{DataFusionError, Result};
30
/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// Each field is written as `name: Type,` followed by two optional clauses
/// (in this order) and a mandatory `default` clause:
///
/// * `warn = <expr>,` — a message logged with `log::warn!` whenever `set` is
///   called on the field and the field afterwards holds a value different from
///   its default. An empty message disables the check.
/// * `transform = <expr>,` — used to normalize the raw string value before parsing.
/// * `default = <expr>` — the value used by the generated [`Default`] impl.
///
/// Struct- and field-level `#[deprecated(...)]` / `#[allow(...)]` attributes are
/// forwarded to the generated struct. The generated code accesses fields under
/// `#[allow(deprecated)]`, so deprecated fields do not warn inside the macro output.
///
/// The doc comments of each field are concatenated and trimmed to form the
/// description passed to `ConfigField::visit`. Editing a field's doc comment
/// therefore changes user-visible output.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///     /// Amazing config
///     pub struct MyConfig {
///         /// Field 1 doc
///         field1: String, transform = str::to_lowercase, default = "".to_string()
///
///         /// Field 2 doc
///         field2: usize, default = 232
///
///         /// Field 3 doc
///         field3: Option<usize>, default = None
///     }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone, PartialEq)]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _config_err!(
///                 "Config value \"{}\" not found on {}", key, "MyConfig"
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                // Peel off the first path segment; the remainder (possibly empty)
                // is forwarded to the field's own `set` for nested namespaces.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Normalize the raw value first, if a transform was given.
                            $(let value = $transform(value);)?
                            // Fields may be deprecated; touching them here is intentional.
                            #[allow(deprecated)]
                            let ret = self.$field_name.set(rem, value.as_ref());

                            // Log the configured warning if the field now holds a
                            // non-default value (empty warning messages are skipped).
                            $(if !$warn.is_empty() {
                                let default: $field_type = $default;
                                #[allow(deprecated)]
                                if default != self.$field_name {
                                    log::warn!($warn);
                                }
                            })?
                            ret
                        },
                    )*
                    _ => $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    ),
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    // The description is the field's doc lines concatenated with no
                    // separator (`/// text` lowers to `#[doc = " text"]`, so words
                    // stay separated), then trimmed.
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}
191
192config_namespace! {
193 /// Options related to catalog and directory scanning
194 ///
195 /// See also: [`SessionConfig`]
196 ///
197 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
198 pub struct CatalogOptions {
199 /// Whether the default catalog and schema should be created automatically.
200 pub create_default_catalog_and_schema: bool, default = true
201
202 /// The default catalog name - this impacts what SQL queries use if not specified
203 pub default_catalog: String, default = "datafusion".to_string()
204
205 /// The default schema name - this impacts what SQL queries use if not specified
206 pub default_schema: String, default = "public".to_string()
207
208 /// Should DataFusion provide access to `information_schema`
209 /// virtual tables for displaying schema information
210 pub information_schema: bool, default = false
211
212 /// Location scanned to load tables for `default` schema
213 pub location: Option<String>, default = None
214
215 /// Type of `TableProvider` to use when loading `default` schema
216 pub format: Option<String>, default = None
217
218 /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
219 /// if not specified explicitly in the statement.
220 pub has_header: bool, default = true
221
222 /// Specifies whether newlines in (quoted) CSV values are supported.
223 ///
224 /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
225 /// if not specified explicitly in the statement.
226 ///
227 /// Parsing newlines in quoted values may be affected by execution behaviour such as
228 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
229 /// parsed successfully, which may reduce performance.
230 pub newlines_in_values: bool, default = false
231 }
232}
233
234config_namespace! {
235 /// Options related to SQL parser
236 ///
237 /// See also: [`SessionConfig`]
238 ///
239 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
240 pub struct SqlParserOptions {
241 /// When set to true, SQL parser will parse float as decimal type
242 pub parse_float_as_decimal: bool, default = false
243
244 /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
245 pub enable_ident_normalization: bool, default = true
246
247 /// When set to true, SQL parser will normalize options value (convert value to lowercase).
248 /// Note that this option is ignored and will be removed in the future. All case-insensitive values
249 /// are normalized automatically.
250 pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false
251
252 /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
253 /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
254 pub dialect: String, default = "generic".to_string()
255 // no need to lowercase because `sqlparser::dialect_from_str`] is case-insensitive
256
257 /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
258 /// ignore the length. If false, error if a `VARCHAR` with a length is
259 /// specified. The Arrow type system does not have a notion of maximum
260 /// string length and thus DataFusion can not enforce such limits.
261 pub support_varchar_with_length: bool, default = true
262
263 /// If true, `VARCHAR` is mapped to `Utf8View` during SQL planning.
264 /// If false, `VARCHAR` is mapped to `Utf8` during SQL planning.
265 /// Default is false.
266 pub map_varchar_to_utf8view: bool, default = false
267
268 /// When set to true, the source locations relative to the original SQL
269 /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
270 /// and recorded in the logical plan nodes.
271 pub collect_spans: bool, default = false
272
273 /// Specifies the recursion depth limit when parsing complex SQL Queries
274 pub recursion_limit: usize, default = 50
275 }
276}
277
278config_namespace! {
279 /// Options related to query execution
280 ///
281 /// See also: [`SessionConfig`]
282 ///
283 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
284 pub struct ExecutionOptions {
285 /// Default batch size while creating new batches, it's especially useful for
286 /// buffer-in-memory batches since creating tiny batches would result in too much
287 /// metadata memory consumption
288 pub batch_size: usize, default = 8192
289
290 /// When set to true, record batches will be examined between each operator and
291 /// small batches will be coalesced into larger batches. This is helpful when there
292 /// are highly selective filters or joins that could produce tiny output batches. The
293 /// target batch size is determined by the configuration setting
294 pub coalesce_batches: bool, default = true
295
296 /// Should DataFusion collect statistics after listing files
297 pub collect_statistics: bool, default = false
298
299 /// Number of partitions for query execution. Increasing partitions can increase
300 /// concurrency.
301 ///
302 /// Defaults to the number of CPU cores on the system
303 pub target_partitions: usize, default = get_available_parallelism()
304
305 /// The default time zone
306 ///
307 /// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
308 /// according to this time zone, and then extract the hour
309 pub time_zone: Option<String>, default = Some("+00:00".into())
310
311 /// Parquet options
312 pub parquet: ParquetOptions, default = Default::default()
313
314 /// Fan-out during initial physical planning.
315 ///
316 /// This is mostly use to plan `UNION` children in parallel.
317 ///
318 /// Defaults to the number of CPU cores on the system
319 pub planning_concurrency: usize, default = get_available_parallelism()
320
321 /// When set to true, skips verifying that the schema produced by
322 /// planning the input of `LogicalPlan::Aggregate` exactly matches the
323 /// schema of the input plan.
324 ///
325 /// When set to false, if the schema does not match exactly
326 /// (including nullability and metadata), a planning error will be raised.
327 ///
328 /// This is used to workaround bugs in the planner that are now caught by
329 /// the new schema verification step.
330 pub skip_physical_aggregate_schema_check: bool, default = false
331
332 /// Specifies the reserved memory for each spillable sort operation to
333 /// facilitate an in-memory merge.
334 ///
335 /// When a sort operation spills to disk, the in-memory data must be
336 /// sorted and merged before being written to a file. This setting reserves
337 /// a specific amount of memory for that in-memory sort/merge process.
338 ///
339 /// Note: This setting is irrelevant if the sort operation cannot spill
340 /// (i.e., if there's no `DiskManager` configured).
341 pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
342
343 /// When sorting, below what size should data be concatenated
344 /// and sorted in a single RecordBatch rather than sorted in
345 /// batches and merged.
346 pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
347
348 /// Number of files to read in parallel when inferring schema and statistics
349 pub meta_fetch_concurrency: usize, default = 32
350
351 /// Guarantees a minimum level of output files running in parallel.
352 /// RecordBatches will be distributed in round robin fashion to each
353 /// parallel writer. Each writer is closed and a new file opened once
354 /// soft_max_rows_per_output_file is reached.
355 pub minimum_parallel_output_files: usize, default = 4
356
357 /// Target number of rows in output files when writing multiple.
358 /// This is a soft max, so it can be exceeded slightly. There also
359 /// will be one file smaller than the limit if the total
360 /// number of rows written is not roughly divisible by the soft max
361 pub soft_max_rows_per_output_file: usize, default = 50000000
362
363 /// This is the maximum number of RecordBatches buffered
364 /// for each output file being worked. Higher values can potentially
365 /// give faster write performance at the cost of higher peak
366 /// memory consumption
367 pub max_buffered_batches_per_output_file: usize, default = 2
368
369 /// Should sub directories be ignored when scanning directories for data
370 /// files. Defaults to true (ignores subdirectories), consistent with
371 /// Hive. Note that this setting does not affect reading partitioned
372 /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
373 pub listing_table_ignore_subdirectory: bool, default = true
374
375 /// Should DataFusion support recursive CTEs
376 pub enable_recursive_ctes: bool, default = true
377
378 /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
379 /// statistics into the same file groups.
380 /// Currently experimental
381 pub split_file_groups_by_statistics: bool, default = false
382
383 /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
384 pub keep_partition_by_columns: bool, default = false
385
386 /// Aggregation ratio (number of distinct groups / number of input rows)
387 /// threshold for skipping partial aggregation. If the value is greater
388 /// then partial aggregation will skip aggregation for further input
389 pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
390
391 /// Number of input rows partial aggregation partition should process, before
392 /// aggregation ratio check and trying to switch to skipping aggregation mode
393 pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
394
395 /// Should DataFusion use row number estimates at the input to decide
396 /// whether increasing parallelism is beneficial or not. By default,
397 /// only exact row numbers (not estimates) are used for this decision.
398 /// Setting this flag to `true` will likely produce better plans.
399 /// if the source of statistics is accurate.
400 /// We plan to make this the default in the future.
401 pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
402
403 /// Should DataFusion enforce batch size in joins or not. By default,
404 /// DataFusion will not enforce batch size in joins. Enforcing batch size
405 /// in joins can reduce memory usage when joining large
406 /// tables with a highly-selective join filter, but is also slightly slower.
407 pub enforce_batch_size_in_joins: bool, default = false
408 }
409}
410
411config_namespace! {
412 /// Options for reading and writing parquet files
413 ///
414 /// See also: [`SessionConfig`]
415 ///
416 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
417 pub struct ParquetOptions {
418 // The following options affect reading parquet files
419
420 /// (reading) If true, reads the Parquet data page level metadata (the
421 /// Page Index), if present, to reduce the I/O and number of
422 /// rows decoded.
423 pub enable_page_index: bool, default = true
424
425 /// (reading) If true, the parquet reader attempts to skip entire row groups based
426 /// on the predicate in the query and the metadata (min/max values) stored in
427 /// the parquet file
428 pub pruning: bool, default = true
429
430 /// (reading) If true, the parquet reader skip the optional embedded metadata that may be in
431 /// the file Schema. This setting can help avoid schema conflicts when querying
432 /// multiple parquet files with schemas containing compatible types but different metadata
433 pub skip_metadata: bool, default = true
434
435 /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
436 /// bytes of the parquet file optimistically. If not specified, two reads are required:
437 /// One read to fetch the 8-byte parquet footer and
438 /// another to fetch the metadata length encoded in the footer
439 pub metadata_size_hint: Option<usize>, default = None
440
441 /// (reading) If true, filter expressions are be applied during the parquet decoding operation to
442 /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
443 pub pushdown_filters: bool, default = false
444
445 /// (reading) If true, filter expressions evaluated during the parquet decoding operation
446 /// will be reordered heuristically to minimize the cost of evaluation. If false,
447 /// the filters are applied in the same order as written in the query
448 pub reorder_filters: bool, default = false
449
450 /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
451 /// and `Binary/BinaryLarge` with `BinaryView`.
452 pub schema_force_view_types: bool, default = true
453
454 /// (reading) If true, parquet reader will read columns of
455 /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
456 ///
457 /// Parquet files generated by some legacy writers do not correctly set
458 /// the UTF8 flag for strings, causing string columns to be loaded as
459 /// BLOB instead.
460 pub binary_as_string: bool, default = false
461
462 /// (reading) If true, parquet reader will read columns of
463 /// physical type int96 as originating from a different resolution
464 /// than nanosecond. This is useful for reading data from systems like Spark
465 /// which stores microsecond resolution timestamps in an int96 allowing it
466 /// to write values with a larger date range than 64-bit timestamps with
467 /// nanosecond resolution.
468 pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
469
470 // The following options affect writing to parquet files
471 // and map to parquet::file::properties::WriterProperties
472
473 /// (writing) Sets best effort maximum size of data page in bytes
474 pub data_pagesize_limit: usize, default = 1024 * 1024
475
476 /// (writing) Sets write_batch_size in bytes
477 pub write_batch_size: usize, default = 1024
478
479 /// (writing) Sets parquet writer version
480 /// valid values are "1.0" and "2.0"
481 pub writer_version: String, default = "1.0".to_string()
482
483 /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
484 ///
485 /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
486 /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
487 pub skip_arrow_metadata: bool, default = false
488
489 /// (writing) Sets default parquet compression codec.
490 /// Valid values are: uncompressed, snappy, gzip(level),
491 /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
492 /// These values are not case sensitive. If NULL, uses
493 /// default parquet writer setting
494 ///
495 /// Note that this default setting is not the same as
496 /// the default parquet writer setting.
497 pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())
498
499 /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
500 /// default parquet writer setting
501 pub dictionary_enabled: Option<bool>, default = Some(true)
502
503 /// (writing) Sets best effort maximum dictionary page size, in bytes
504 pub dictionary_page_size_limit: usize, default = 1024 * 1024
505
506 /// (writing) Sets if statistics are enabled for any column
507 /// Valid values are: "none", "chunk", and "page"
508 /// These values are not case sensitive. If NULL, uses
509 /// default parquet writer setting
510 pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())
511
512 /// (writing) Sets max statistics size for any column. If NULL, uses
513 /// default parquet writer setting
514 /// max_statistics_size is deprecated, currently it is not being used
515 // TODO: remove once deprecated
516 #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
517 pub max_statistics_size: Option<usize>, default = Some(4096)
518
519 /// (writing) Target maximum number of rows in each row group (defaults to 1M
520 /// rows). Writing larger row groups requires more memory to write, but
521 /// can get better compression and be faster to read.
522 pub max_row_group_size: usize, default = 1024 * 1024
523
524 /// (writing) Sets "created by" property
525 pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()
526
527 /// (writing) Sets column index truncate length
528 pub column_index_truncate_length: Option<usize>, default = Some(64)
529
530 /// (writing) Sets statictics truncate length. If NULL, uses
531 /// default parquet writer setting
532 pub statistics_truncate_length: Option<usize>, default = None
533
534 /// (writing) Sets best effort maximum number of rows in data page
535 pub data_page_row_count_limit: usize, default = 20_000
536
537 /// (writing) Sets default encoding for any column.
538 /// Valid values are: plain, plain_dictionary, rle,
539 /// bit_packed, delta_binary_packed, delta_length_byte_array,
540 /// delta_byte_array, rle_dictionary, and byte_stream_split.
541 /// These values are not case sensitive. If NULL, uses
542 /// default parquet writer setting
543 pub encoding: Option<String>, transform = str::to_lowercase, default = None
544
545 /// (writing) Use any available bloom filters when reading parquet files
546 pub bloom_filter_on_read: bool, default = true
547
548 /// (writing) Write bloom filters for all columns when creating parquet files
549 pub bloom_filter_on_write: bool, default = false
550
551 /// (writing) Sets bloom filter false positive probability. If NULL, uses
552 /// default parquet writer setting
553 pub bloom_filter_fpp: Option<f64>, default = None
554
555 /// (writing) Sets bloom filter number of distinct values. If NULL, uses
556 /// default parquet writer setting
557 pub bloom_filter_ndv: Option<u64>, default = None
558
559 /// (writing) Controls whether DataFusion will attempt to speed up writing
560 /// parquet files by serializing them in parallel. Each column
561 /// in each row group in each output file are serialized in parallel
562 /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
563 pub allow_single_file_parallelism: bool, default = true
564
565 /// (writing) By default parallel parquet writer is tuned for minimum
566 /// memory usage in a streaming execution plan. You may see
567 /// a performance benefit when writing large parquet files
568 /// by increasing maximum_parallel_row_group_writers and
569 /// maximum_buffered_record_batches_per_stream if your system
570 /// has idle cores and can tolerate additional memory usage.
571 /// Boosting these values is likely worthwhile when
572 /// writing out already in-memory data, such as from a cached
573 /// data frame.
574 pub maximum_parallel_row_group_writers: usize, default = 1
575
576 /// (writing) By default parallel parquet writer is tuned for minimum
577 /// memory usage in a streaming execution plan. You may see
578 /// a performance benefit when writing large parquet files
579 /// by increasing maximum_parallel_row_group_writers and
580 /// maximum_buffered_record_batches_per_stream if your system
581 /// has idle cores and can tolerate additional memory usage.
582 /// Boosting these values is likely worthwhile when
583 /// writing out already in-memory data, such as from a cached
584 /// data frame.
585 pub maximum_buffered_record_batches_per_stream: usize, default = 2
586 }
587}
588
589config_namespace! {
590 /// Options related to query optimization
591 ///
592 /// See also: [`SessionConfig`]
593 ///
594 /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
595 pub struct OptimizerOptions {
596 /// When set to true, the optimizer will push a limit operation into
597 /// grouped aggregations which have no aggregate expressions, as a soft limit,
598 /// emitting groups once the limit is reached, before all rows in the group are read.
599 pub enable_distinct_aggregation_soft_limit: bool, default = true
600
601 /// When set to true, the physical plan optimizer will try to add round robin
602 /// repartitioning to increase parallelism to leverage more CPU cores
603 pub enable_round_robin_repartition: bool, default = true
604
605 /// When set to true, the optimizer will attempt to perform limit operations
606 /// during aggregations, if possible
607 pub enable_topk_aggregation: bool, default = true
608
609 /// When set to true, the optimizer will insert filters before a join between
610 /// a nullable and non-nullable column to filter out nulls on the nullable side. This
611 /// filter can add additional overhead when the file format does not fully support
612 /// predicate push down.
613 pub filter_null_join_keys: bool, default = false
614
615 /// Should DataFusion repartition data using the aggregate keys to execute aggregates
616 /// in parallel using the provided `target_partitions` level
617 pub repartition_aggregations: bool, default = true
618
619 /// Minimum total files size in bytes to perform file scan repartitioning.
620 pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
621
622 /// Should DataFusion repartition data using the join keys to execute joins in parallel
623 /// using the provided `target_partitions` level
624 pub repartition_joins: bool, default = true
625
626 /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
627 /// its inputs do not have any ordering or filtering If the flag is not enabled,
628 /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
629 /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
630 /// RightAnti, and RightSemi - being produced only at the end of the execution.
631 /// This is not typical in stream processing. Additionally, without proper design for
632 /// long runner execution, all types of joins may encounter out-of-memory errors.
633 pub allow_symmetric_joins_without_pruning: bool, default = true
634
635 /// When set to `true`, file groups will be repartitioned to achieve maximum parallelism.
636 /// Currently Parquet and CSV formats are supported.
637 ///
638 /// If set to `true`, all files will be repartitioned evenly (i.e., a single large file
639 /// might be partitioned into smaller chunks) for parallel scanning.
640 /// If set to `false`, different files will be read in parallel, but repartitioning won't
641 /// happen within a single file.
642 pub repartition_file_scans: bool, default = true
643
644 /// Should DataFusion repartition data using the partitions keys to execute window
645 /// functions in parallel using the provided `target_partitions` level
646 pub repartition_windows: bool, default = true
647
648 /// Should DataFusion execute sorts in a per-partition fashion and merge
649 /// afterwards instead of coalescing first and sorting globally.
650 /// With this flag is enabled, plans in the form below
651 ///
652 /// ```text
653 /// "SortExec: [a@0 ASC]",
654 /// " CoalescePartitionsExec",
655 /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
656 /// ```
657 /// would turn into the plan below which performs better in multithreaded environments
658 ///
659 /// ```text
660 /// "SortPreservingMergeExec: [a@0 ASC]",
661 /// " SortExec: [a@0 ASC]",
662 /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
663 /// ```
664 pub repartition_sorts: bool, default = true
665
666 /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
667 /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
668 /// using `SortPreservingMergeExec`)
669 ///
670 /// When false, DataFusion will maximize plan parallelism using
671 /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
672 pub prefer_existing_sort: bool, default = false
673
674 /// When set to true, the logical plan optimizer will produce warning
675 /// messages if any optimization rules produce errors and then proceed to the next
676 /// rule. When set to false, any rules that produce errors will cause the query to fail
677 pub skip_failed_rules: bool, default = false
678
679 /// Number of times that the optimizer will attempt to optimize the plan
680 pub max_passes: usize, default = 3
681
682 /// When set to true, the physical plan optimizer will run a top down
683 /// process to reorder the join keys
684 pub top_down_join_key_reordering: bool, default = true
685
686 /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
687 /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
688 pub prefer_hash_join: bool, default = true
689
690 /// The maximum estimated size in bytes for one input side of a HashJoin
691 /// will be collected into a single partition
692 pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
693
694 /// The maximum estimated size in rows for one input side of a HashJoin
695 /// will be collected into a single partition
696 pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
697
698 /// The default filter selectivity used by Filter Statistics
699 /// when an exact selectivity cannot be determined. Valid values are
700 /// between 0 (no selectivity) and 100 (all rows are selected).
701 pub default_filter_selectivity: u8, default = 20
702
703 /// When set to true, the optimizer will not attempt to convert Union to Interleave
704 pub prefer_existing_union: bool, default = false
705
706 /// When set to true, if the returned type is a view type
707 /// then the output will be coerced to a non-view.
708 /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
709 pub expand_views_at_output: bool, default = false
710 }
711}
712
// NOTE(review): `config_namespace!` presumably turns each `///` field doc below
// into the description reported for that option (as `extensions_options!` does
// in this file), so editing them changes user-visible output. Plain `//`
// comments such as this one are stripped by the lexer and never reach the macro.
config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        // NOTE(review): stored as a free-form `String`; nothing here restricts it
        // to "indent"/"tree" -- presumably validated where explain output is
        // rendered. Confirm.
        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: String, default = "indent".to_string()
    }
}
741
/// One configuration option as reported by a config visitor: its key, its
/// current value rendered as a string (if it has one), and its description.
#[derive(Debug)]
pub struct ConfigEntry {
    /// Key uniquely identifying the option
    pub key: String,

    /// Current value formatted via `Display`, or `None` when the option has no value
    pub value: Option<String>,

    /// Human-readable description of the option
    pub description: &'static str,
}
754
755/// Configuration options struct, able to store both built-in configuration and custom options
756#[derive(Debug, Clone, Default)]
757#[non_exhaustive]
758pub struct ConfigOptions {
759 /// Catalog options
760 pub catalog: CatalogOptions,
761 /// Execution options
762 pub execution: ExecutionOptions,
763 /// Optimizer options
764 pub optimizer: OptimizerOptions,
765 /// SQL parser options
766 pub sql_parser: SqlParserOptions,
767 /// Explain options
768 pub explain: ExplainOptions,
769 /// Optional extensions registered using [`Extensions::insert`]
770 pub extensions: Extensions,
771}
772
773impl ConfigField for ConfigOptions {
774 fn set(&mut self, key: &str, value: &str) -> Result<()> {
775 // Extensions are handled in the public `ConfigOptions::set`
776 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
777 match key {
778 "catalog" => self.catalog.set(rem, value),
779 "execution" => self.execution.set(rem, value),
780 "optimizer" => self.optimizer.set(rem, value),
781 "explain" => self.explain.set(rem, value),
782 "sql_parser" => self.sql_parser.set(rem, value),
783 _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
784 }
785 }
786
787 fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
788 self.catalog.visit(v, "datafusion.catalog", "");
789 self.execution.visit(v, "datafusion.execution", "");
790 self.optimizer.visit(v, "datafusion.optimizer", "");
791 self.explain.visit(v, "datafusion.explain", "");
792 self.sql_parser.visit(v, "datafusion.sql_parser", "");
793 }
794}
795
796impl ConfigOptions {
797 /// Creates a new [`ConfigOptions`] with default values
798 pub fn new() -> Self {
799 Self::default()
800 }
801
802 /// Set extensions to provided value
803 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
804 self.extensions = extensions;
805 self
806 }
807
808 /// Set a configuration option
809 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
810 let Some((prefix, key)) = key.split_once('.') else {
811 return _config_err!("could not find config namespace for key \"{key}\"");
812 };
813
814 if prefix == "datafusion" {
815 return ConfigField::set(self, key, value);
816 }
817
818 let Some(e) = self.extensions.0.get_mut(prefix) else {
819 return _config_err!("Could not find config namespace \"{prefix}\"");
820 };
821 e.0.set(key, value)
822 }
823
824 /// Create new ConfigOptions struct, taking values from
825 /// environment variables where possible.
826 ///
827 /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
828 /// control `datafusion.execution.batch_size`.
829 pub fn from_env() -> Result<Self> {
830 struct Visitor(Vec<String>);
831
832 impl Visit for Visitor {
833 fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
834 self.0.push(key.to_string())
835 }
836
837 fn none(&mut self, key: &str, _: &'static str) {
838 self.0.push(key.to_string())
839 }
840 }
841
842 // Extract the names of all fields and then look up the corresponding
843 // environment variables. This isn't hugely efficient but avoids
844 // ambiguity between `a.b` and `a_b` which would both correspond
845 // to an environment variable of `A_B`
846
847 let mut keys = Visitor(vec![]);
848 let mut ret = Self::default();
849 ret.visit(&mut keys, "datafusion", "");
850
851 for key in keys.0 {
852 let env = key.to_uppercase().replace('.', "_");
853 if let Some(var) = std::env::var_os(env) {
854 ret.set(&key, var.to_string_lossy().as_ref())?;
855 }
856 }
857
858 Ok(ret)
859 }
860
861 /// Create new ConfigOptions struct, taking values from a string hash map.
862 ///
863 /// Only the built-in configurations will be extracted from the hash map
864 /// and other key value pairs will be ignored.
865 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
866 struct Visitor(Vec<String>);
867
868 impl Visit for Visitor {
869 fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
870 self.0.push(key.to_string())
871 }
872
873 fn none(&mut self, key: &str, _: &'static str) {
874 self.0.push(key.to_string())
875 }
876 }
877
878 let mut keys = Visitor(vec![]);
879 let mut ret = Self::default();
880 ret.visit(&mut keys, "datafusion", "");
881
882 for key in keys.0 {
883 if let Some(var) = settings.get(&key) {
884 ret.set(&key, var)?;
885 }
886 }
887
888 Ok(ret)
889 }
890
891 /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
892 pub fn entries(&self) -> Vec<ConfigEntry> {
893 struct Visitor(Vec<ConfigEntry>);
894
895 impl Visit for Visitor {
896 fn some<V: Display>(
897 &mut self,
898 key: &str,
899 value: V,
900 description: &'static str,
901 ) {
902 self.0.push(ConfigEntry {
903 key: key.to_string(),
904 value: Some(value.to_string()),
905 description,
906 })
907 }
908
909 fn none(&mut self, key: &str, description: &'static str) {
910 self.0.push(ConfigEntry {
911 key: key.to_string(),
912 value: None,
913 description,
914 })
915 }
916 }
917
918 let mut v = Visitor(vec![]);
919 self.visit(&mut v, "datafusion", "");
920
921 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
922 v.0
923 }
924
925 /// Generate documentation that can be included in the user guide
926 pub fn generate_config_markdown() -> String {
927 use std::fmt::Write as _;
928
929 let mut s = Self::default();
930
931 // Normalize for display
932 s.execution.target_partitions = 0;
933 s.execution.planning_concurrency = 0;
934
935 let mut docs = "| key | default | description |\n".to_string();
936 docs += "|-----|---------|-------------|\n";
937 let mut entries = s.entries();
938 entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
939
940 for entry in s.entries() {
941 let _ = writeln!(
942 &mut docs,
943 "| {} | {} | {} |",
944 entry.key,
945 entry.value.as_deref().unwrap_or("NULL"),
946 entry.description
947 );
948 }
949 docs
950 }
951}
952
953/// [`ConfigExtension`] provides a mechanism to store third-party configuration
954/// within DataFusion [`ConfigOptions`]
955///
956/// This mechanism can be used to pass configuration to user defined functions
957/// or optimizer passes
958///
959/// # Example
960/// ```
961/// use datafusion_common::{
962/// config::ConfigExtension, extensions_options,
963/// config::ConfigOptions,
964/// };
965/// // Define a new configuration struct using the `extensions_options` macro
966/// extensions_options! {
967/// /// My own config options.
968/// pub struct MyConfig {
969/// /// Should "foo" be replaced by "bar"?
970/// pub foo_to_bar: bool, default = true
971///
972/// /// How many "baz" should be created?
973/// pub baz_count: usize, default = 1337
974/// }
975/// }
976///
977/// impl ConfigExtension for MyConfig {
978/// const PREFIX: &'static str = "my_config";
979/// }
980///
981/// // set up config struct and register extension
982/// let mut config = ConfigOptions::default();
983/// config.extensions.insert(MyConfig::default());
984///
985/// // overwrite config default
986/// config.set("my_config.baz_count", "42").unwrap();
987///
988/// // check config state
989/// let my_config = config.extensions.get::<MyConfig>().unwrap();
990/// assert!(my_config.foo_to_bar,);
991/// assert_eq!(my_config.baz_count, 42,);
992/// ```
993///
994/// # Note:
995/// Unfortunately associated constants are not currently object-safe, and so this
996/// extends the object-safe [`ExtensionOptions`]
997pub trait ConfigExtension: ExtensionOptions {
998 /// Configuration namespace prefix to use
999 ///
1000 /// All values under this will be prefixed with `$PREFIX + "."`
1001 const PREFIX: &'static str;
1002}
1003
1004/// An object-safe API for storing arbitrary configuration.
1005///
1006/// See [`ConfigExtension`] for user defined configuration
1007pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
1008 /// Return `self` as [`Any`]
1009 ///
1010 /// This is needed until trait upcasting is stabilized
1011 fn as_any(&self) -> &dyn Any;
1012
1013 /// Return `self` as [`Any`]
1014 ///
1015 /// This is needed until trait upcasting is stabilized
1016 fn as_any_mut(&mut self) -> &mut dyn Any;
1017
1018 /// Return a deep clone of this [`ExtensionOptions`]
1019 ///
1020 /// It is important this does not share mutable state to avoid consistency issues
1021 /// with configuration changing whilst queries are executing
1022 fn cloned(&self) -> Box<dyn ExtensionOptions>;
1023
1024 /// Set the given `key`, `value` pair
1025 fn set(&mut self, key: &str, value: &str) -> Result<()>;
1026
1027 /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
1028 fn entries(&self) -> Vec<ConfigEntry>;
1029}
1030
1031/// A type-safe container for [`ConfigExtension`]
1032#[derive(Debug, Default, Clone)]
1033pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1034
1035impl Extensions {
1036 /// Create a new, empty [`Extensions`]
1037 pub fn new() -> Self {
1038 Self(BTreeMap::new())
1039 }
1040
1041 /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
1042 pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1043 assert_ne!(T::PREFIX, "datafusion");
1044 let e = ExtensionBox(Box::new(extension));
1045 self.0.insert(T::PREFIX, e);
1046 }
1047
1048 /// Retrieves the extension of the given type if any
1049 pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1050 self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1051 }
1052
1053 /// Retrieves the extension of the given type if any
1054 pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1055 let e = self.0.get_mut(T::PREFIX)?;
1056 e.0.as_any_mut().downcast_mut()
1057 }
1058}
1059
1060#[derive(Debug)]
1061struct ExtensionBox(Box<dyn ExtensionOptions>);
1062
1063impl Clone for ExtensionBox {
1064 fn clone(&self) -> Self {
1065 Self(self.0.cloned())
1066 }
1067}
1068
1069/// A trait implemented by `config_namespace` and for field types that provides
1070/// the ability to walk and mutate the configuration tree
1071pub trait ConfigField {
1072 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
1073
1074 fn set(&mut self, key: &str, value: &str) -> Result<()>;
1075}
1076
1077impl<F: ConfigField + Default> ConfigField for Option<F> {
1078 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1079 match self {
1080 Some(s) => s.visit(v, key, description),
1081 None => v.none(key, description),
1082 }
1083 }
1084
1085 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1086 self.get_or_insert_with(Default::default).set(key, value)
1087 }
1088}
1089
1090fn default_transform<T>(input: &str) -> Result<T>
1091where
1092 T: FromStr,
1093 <T as FromStr>::Err: Sync + Send + Error + 'static,
1094{
1095 input.parse().map_err(|e| {
1096 DataFusionError::Context(
1097 format!(
1098 "Error parsing '{}' as {}",
1099 input,
1100 std::any::type_name::<T>()
1101 ),
1102 Box::new(DataFusionError::External(Box::new(e))),
1103 )
1104 })
1105}
1106
/// Implements `ConfigField` for a leaf type whose value is set from a string.
///
/// * `config_field!(T)` parses the input with `default_transform`, i.e. via
///   `T`'s `FromStr` implementation.
/// * `config_field!(T, arg => expr)` binds the raw input string to `arg` and
///   assigns the result of `expr` (which may use `?`) to the field.
///
/// The generated `set` ignores its key argument. The generated `visit` always
/// reports the current value through `Visit::some`.
///
/// NOTE(review): although `#[macro_export]`ed, the expansion names
/// `ConfigField`, `Visit`, `Result` and `default_transform` unqualified. It
/// presumably only works where those names are in scope (this module). Confirm.
#[macro_export]
macro_rules! config_field {
    ($t:ty) => {
        // Delegate to the second arm, using the default `FromStr`-based parser.
        config_field!($t, value => default_transform(value)?);
    };

    ($t:ty, $arg:ident => $transform:expr) => {
        impl ConfigField for $t {
            fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, $arg: &str) -> Result<()> {
                *self = $transform;
                Ok(())
            }
        }
    };
}
1126
1127config_field!(String);
1128config_field!(bool, value => default_transform(value.to_lowercase().as_str())?);
1129config_field!(usize);
1130config_field!(f64);
1131config_field!(u64);
1132
1133impl ConfigField for u8 {
1134 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1135 v.some(key, self, description)
1136 }
1137
1138 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1139 if value.is_empty() {
1140 return Err(DataFusionError::Configuration(format!(
1141 "Input string for {} key is empty",
1142 key
1143 )));
1144 }
1145 // Check if the string is a valid number
1146 if let Ok(num) = value.parse::<u8>() {
1147 // TODO: Let's decide how we treat the numerical strings.
1148 *self = num;
1149 } else {
1150 let bytes = value.as_bytes();
1151 // Check if the first character is ASCII (single byte)
1152 if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1153 return Err(DataFusionError::Configuration(format!(
1154 "Error parsing {} as u8. Non-ASCII string provided",
1155 value
1156 )));
1157 }
1158 *self = bytes[0];
1159 }
1160 Ok(())
1161 }
1162}
1163
1164impl ConfigField for CompressionTypeVariant {
1165 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1166 v.some(key, self, description)
1167 }
1168
1169 fn set(&mut self, _: &str, value: &str) -> Result<()> {
1170 *self = CompressionTypeVariant::from_str(value)?;
1171 Ok(())
1172 }
1173}
1174
/// Callback interface used when recursively walking the configuration tree.
pub trait Visit {
    /// Called for an option that currently holds `value`.
    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);

    /// Called for an option that currently has no value (e.g. an unset
    /// `Option` field).
    fn none(&mut self, key: &str, description: &'static str);
}
1181
/// Convenience macro to create [`ExtensionOptions`].
///
/// The created structure is `#[non_exhaustive]` and implements the following traits:
///
/// - [`Clone`]
/// - [`Debug`]
/// - [`Default`]
/// - [`ExtensionOptions`]
/// - [`ConfigField`]
///
/// # Usage
/// The syntax is:
///
/// ```text
/// extensions_options! {
///      /// Struct docs (optional).
///     [<vis>] struct <StructName> {
///         /// Field docs (optional)
///         [<vis>] <field_name>: <field_type>, default = <default_value>
///
///         ... more fields
///     }
/// }
/// ```
///
/// The placeholders are:
/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
/// - `<StructName>`: Struct name like `MyStruct`.
/// - `<field_name>`: Field name like `my_field`.
/// - `<field_type>`: Field type like `u8`; it must implement [`ConfigField`].
/// - `<default_value>`: Default value matching the field type like `42`.
///
/// Field doc comments double as the descriptions reported by
/// [`ExtensionOptions::entries`]. Entry keys are the bare field names, without
/// the extension's prefix.
///
/// # Example
/// See also a full example on the [`ConfigExtension`] documentation
///
/// ```
/// use datafusion_common::extensions_options;
///
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
/// ```
///
///
/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
/// [`ExtensionOptions::entries`]: crate::config::ExtensionOptions::entries
/// [`ConfigField`]: crate::config::ConfigField
/// [`ConfigExtension`]: crate::config::ConfigExtension
#[macro_export]
macro_rules! extensions_options {
    (
     $(#[doc = $struct_d:tt])*
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
        )*$(,)*
    }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $field_vis $field_name : $field_type,
            )*
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl $crate::config::ExtensionOptions for $struct_name {
            fn as_any(&self) -> &dyn ::std::any::Any {
                self
            }

            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
                self
            }

            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
                Box::new(self.clone())
            }

            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                $crate::config::ConfigField::set(self, key, value)
            }

            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
                // Turns every visited field into a `ConfigEntry`.
                struct Visitor(Vec<$crate::config::ConfigEntry>);

                impl $crate::config::Visit for Visitor {
                    fn some<V: std::fmt::Display>(
                        &mut self,
                        key: &str,
                        value: V,
                        description: &'static str,
                    ) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: Some(value.to_string()),
                            description,
                        })
                    }

                    fn none(&mut self, key: &str, description: &'static str) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: None,
                            description,
                        })
                    }
                }

                let mut v = Visitor(vec![]);
                // The prefix is not used for extensions.
                // The description is generated in ConfigField::visit.
                // We can just pass empty strings here.
                $crate::config::ConfigField::visit(self, &mut v, "", "");
                v.0
            }
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                // The first `.`-separated component selects the field; the
                // remainder is forwarded to that field's own `set`.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            #[allow(deprecated)]
                            self.$field_name.set(rem, value)
                        },
                    )*
                    _ => $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
                // Keys are the bare field names; descriptions come from the
                // field doc comments.
                $(
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, stringify!($field_name), desc);
                )*
            }
        }
    }
}
1346
/// File types with built-in configuration support in [`TableOptions`].
///
/// Other file types can be configured through the `extensions` field of
/// [`TableOptions`].
#[derive(Debug, Clone)]
pub enum ConfigFileType {
    /// CSV files; options live in [`TableOptions::csv`].
    CSV,
    /// Parquet files; options live in [`TableOptions::parquet`].
    #[cfg(feature = "parquet")]
    PARQUET,
    /// JSON files; options live in [`TableOptions::json`].
    JSON,
}
1356
1357/// Represents the configuration options available for handling different table formats within a data processing application.
1358/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
1359/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
1360#[derive(Debug, Clone, Default)]
1361pub struct TableOptions {
1362 /// Configuration options for CSV file handling. This includes settings like the delimiter,
1363 /// quote character, and whether the first row is considered as headers.
1364 pub csv: CsvOptions,
1365
1366 /// Configuration options for Parquet file handling. This includes settings for compression,
1367 /// encoding, and other Parquet-specific file characteristics.
1368 pub parquet: TableParquetOptions,
1369
1370 /// Configuration options for JSON file handling.
1371 pub json: JsonOptions,
1372
1373 /// The current file format that the table operations should assume. This option allows
1374 /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1375 pub current_format: Option<ConfigFileType>,
1376
1377 /// Optional extensions that can be used to extend or customize the behavior of the table
1378 /// options. Extensions can be registered using `Extensions::insert` and might include
1379 /// custom file handling logic, additional configuration parameters, or other enhancements.
1380 pub extensions: Extensions,
1381}
1382
1383impl ConfigField for TableOptions {
1384 /// Visits configuration settings for the current file format, or all formats if none is selected.
1385 ///
1386 /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1387 /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1388 /// it visits all available format settings.
1389 fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1390 if let Some(file_type) = &self.current_format {
1391 match file_type {
1392 #[cfg(feature = "parquet")]
1393 ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1394 ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1395 ConfigFileType::JSON => self.json.visit(v, "format", ""),
1396 }
1397 } else {
1398 self.csv.visit(v, "csv", "");
1399 self.parquet.visit(v, "parquet", "");
1400 self.json.visit(v, "json", "");
1401 }
1402 }
1403
1404 /// Sets a configuration value for a specific key within `TableOptions`.
1405 ///
1406 /// This method delegates setting configuration values to the specific file format configurations,
1407 /// based on the current format selected. If no format is selected, it returns an error.
1408 ///
1409 /// # Parameters
1410 ///
1411 /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1412 /// for CSV format.
1413 /// * `value`: The value to set for the specified configuration key.
1414 ///
1415 /// # Returns
1416 ///
1417 /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1418 /// or if setting the configuration value fails for the specific format.
1419 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1420 // Extensions are handled in the public `ConfigOptions::set`
1421 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1422 match key {
1423 "format" => {
1424 let Some(format) = &self.current_format else {
1425 return _config_err!("Specify a format for TableOptions");
1426 };
1427 match format {
1428 #[cfg(feature = "parquet")]
1429 ConfigFileType::PARQUET => self.parquet.set(rem, value),
1430 ConfigFileType::CSV => self.csv.set(rem, value),
1431 ConfigFileType::JSON => self.json.set(rem, value),
1432 }
1433 }
1434 _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1435 }
1436 }
1437}
1438
1439impl TableOptions {
1440 /// Constructs a new instance of `TableOptions` with default settings.
1441 ///
1442 /// # Returns
1443 ///
1444 /// A new `TableOptions` instance with default configuration values.
1445 pub fn new() -> Self {
1446 Self::default()
1447 }
1448
1449 /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1450 ///
1451 /// # Parameters
1452 ///
1453 /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1454 ///
1455 /// # Returns
1456 ///
1457 /// A new `TableOptions` instance with settings applied from the session config.
1458 pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1459 let initial = TableOptions::default();
1460 initial.combine_with_session_config(config)
1461 }
1462
1463 /// Updates the current `TableOptions` with settings from a given session config.
1464 ///
1465 /// # Parameters
1466 ///
1467 /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1468 ///
1469 /// # Returns
1470 ///
1471 /// A new `TableOptions` instance with updated settings from the session config.
1472 #[must_use = "this method returns a new instance"]
1473 pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1474 let mut clone = self.clone();
1475 clone.parquet.global = config.execution.parquet.clone();
1476 clone
1477 }
1478
1479 /// Sets the file format for the table.
1480 ///
1481 /// # Parameters
1482 ///
1483 /// * `format`: The file format to use (e.g., CSV, Parquet).
1484 pub fn set_config_format(&mut self, format: ConfigFileType) {
1485 self.current_format = Some(format);
1486 }
1487
1488 /// Sets the extensions for this `TableOptions` instance.
1489 ///
1490 /// # Parameters
1491 ///
1492 /// * `extensions`: The `Extensions` instance to set.
1493 ///
1494 /// # Returns
1495 ///
1496 /// A new `TableOptions` instance with the specified extensions applied.
1497 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1498 self.extensions = extensions;
1499 self
1500 }
1501
1502 /// Sets a specific configuration option.
1503 ///
1504 /// # Parameters
1505 ///
1506 /// * `key`: The configuration key (e.g., "format.delimiter").
1507 /// * `value`: The value to set for the specified key.
1508 ///
1509 /// # Returns
1510 ///
1511 /// A result indicating success or failure in setting the configuration option.
1512 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1513 let Some((prefix, _)) = key.split_once('.') else {
1514 return _config_err!("could not find config namespace for key \"{key}\"");
1515 };
1516
1517 if prefix == "format" {
1518 return ConfigField::set(self, key, value);
1519 }
1520
1521 if prefix == "execution" {
1522 return Ok(());
1523 }
1524
1525 let Some(e) = self.extensions.0.get_mut(prefix) else {
1526 return _config_err!("Could not find config namespace \"{prefix}\"");
1527 };
1528 e.0.set(key, value)
1529 }
1530
1531 /// Initializes a new `TableOptions` from a hash map of string settings.
1532 ///
1533 /// # Parameters
1534 ///
1535 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1536 ///
1537 /// # Returns
1538 ///
1539 /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1540 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1541 let mut ret = Self::default();
1542 for (k, v) in settings {
1543 ret.set(k, v)?;
1544 }
1545
1546 Ok(ret)
1547 }
1548
1549 /// Modifies the current `TableOptions` instance with settings from a hash map.
1550 ///
1551 /// # Parameters
1552 ///
1553 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1554 ///
1555 /// # Returns
1556 ///
1557 /// A result indicating success or failure in applying the settings.
1558 pub fn alter_with_string_hash_map(
1559 &mut self,
1560 settings: &HashMap<String, String>,
1561 ) -> Result<()> {
1562 for (k, v) in settings {
1563 self.set(k, v)?;
1564 }
1565 Ok(())
1566 }
1567
1568 /// Retrieves all configuration entries from this `TableOptions`.
1569 ///
1570 /// # Returns
1571 ///
1572 /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1573 pub fn entries(&self) -> Vec<ConfigEntry> {
1574 struct Visitor(Vec<ConfigEntry>);
1575
1576 impl Visit for Visitor {
1577 fn some<V: Display>(
1578 &mut self,
1579 key: &str,
1580 value: V,
1581 description: &'static str,
1582 ) {
1583 self.0.push(ConfigEntry {
1584 key: key.to_string(),
1585 value: Some(value.to_string()),
1586 description,
1587 })
1588 }
1589
1590 fn none(&mut self, key: &str, description: &'static str) {
1591 self.0.push(ConfigEntry {
1592 key: key.to_string(),
1593 value: None,
1594 description,
1595 })
1596 }
1597 }
1598
1599 let mut v = Visitor(vec![]);
1600 self.visit(&mut v, "format", "");
1601
1602 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1603 v.0
1604 }
1605}
1606
1607/// Options that control how Parquet files are read, including global options
1608/// that apply to all columns and optional column-specific overrides
1609///
1610/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
1611/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
1612/// (e.g. sorting_columns).
1613#[derive(Clone, Default, Debug, PartialEq)]
1614pub struct TableParquetOptions {
1615 /// Global Parquet options that propagates to all columns.
1616 pub global: ParquetOptions,
1617 /// Column specific options. Default usage is parquet.XX::column.
1618 pub column_specific_options: HashMap<String, ParquetColumnOptions>,
1619 /// Additional file-level metadata to include. Inserted into the key_value_metadata
1620 /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
1621 ///
1622 /// Multiple entries are permitted
1623 /// ```sql
1624 /// OPTIONS (
1625 /// 'format.metadata::key1' '',
1626 /// 'format.metadata::key2' 'value',
1627 /// 'format.metadata::key3' 'value has spaces',
1628 /// 'format.metadata::key4' 'value has special chars :: :',
1629 /// 'format.metadata::key_dupe' 'original will be overwritten',
1630 /// 'format.metadata::key_dupe' 'final'
1631 /// )
1632 /// ```
1633 pub key_value_metadata: HashMap<String, Option<String>>,
1634}
1635
1636impl TableParquetOptions {
1637 /// Return new default TableParquetOptions
1638 pub fn new() -> Self {
1639 Self::default()
1640 }
1641
1642 /// Set whether the encoding of the arrow metadata should occur
1643 /// during the writing of parquet.
1644 ///
1645 /// Default is to encode the arrow schema in the file kv_metadata.
1646 pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1647 Self {
1648 global: ParquetOptions {
1649 skip_arrow_metadata: skip,
1650 ..self.global
1651 },
1652 ..self
1653 }
1654 }
1655}
1656
1657impl ConfigField for TableParquetOptions {
1658 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
1659 self.global.visit(v, key_prefix, description);
1660 self.column_specific_options
1661 .visit(v, key_prefix, description)
1662 }
1663
1664 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1665 // Determine if the key is a global, metadata, or column-specific setting
1666 if key.starts_with("metadata::") {
1667 let k = match key.split("::").collect::<Vec<_>>()[..] {
1668 [_meta] | [_meta, ""] => {
1669 return _config_err!(
1670 "Invalid metadata key provided, missing key in metadata::<key>"
1671 )
1672 }
1673 [_meta, k] => k.into(),
1674 _ => {
1675 return _config_err!(
1676 "Invalid metadata key provided, found too many '::' in \"{key}\""
1677 )
1678 }
1679 };
1680 self.key_value_metadata.insert(k, Some(value.into()));
1681 Ok(())
1682 } else if key.contains("::") {
1683 self.column_specific_options.set(key, value)
1684 } else {
1685 self.global.set(key, value)
1686 }
1687 }
1688}
1689
/// Like `config_namespace!`, but additionally implements [`ConfigField`] for
/// `HashMap<String, $struct_name>` so a set of options can be stored per
/// key (e.g. per column).
///
/// For each invocation it generates:
/// - the struct itself (deriving `Debug`, `Clone`, `PartialEq`)
/// - `ConfigField` for the struct, addressed as `<field>[.<rest>]`
/// - `Default` for the struct, built from each field's `default = ...`
/// - `ConfigField` for `HashMap<String, $struct_name>`, addressed as
///   `<field>::<map key>` when setting and visited as
///   `<prefix>.<field>::<map key>`
///
/// The `///` lines on each field are captured as `#[doc = ...]` and reused as
/// that option's runtime description when visiting.
macro_rules! config_namespace_with_hashmap {
    (
     $(#[doc = $struct_d:tt])*
     $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
        // `transform` (optional) is applied to the raw string value before it is
        // passed to the field's own `ConfigField::set`.
        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
        )*$(,)*
    }
    ) => {

        $(#[doc = $struct_d])*
        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
            $field_vis $field_name : $field_type,
            )*
        }

        impl ConfigField for $struct_name {
            // `key` is `<field>` or `<field>.<rest>`; `<rest>` is forwarded to
            // the field's own `set`.
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                       stringify!($field_name) => {
                           // Handle deprecated fields
                           // NOTE(review): when a `transform` is present this
                           // attribute attaches to the `let` below, not to the
                           // `self.$field_name.set(...)` call.
                           #[allow(deprecated)] // Allow deprecated fields
                           $(let value = $transform(value);)?
                           self.$field_name.set(rem, value.as_ref())
                       },
                    )*
                    _ => _config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            // Emits one entry per field, keyed `<key_prefix>.<field>`, whose
            // description is the field's concatenated, trimmed doc text.
            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                let desc = concat!($($d),*).trim();
                // Handle deprecated fields
                #[allow(deprecated)]
                self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl ConfigField for HashMap<String,$struct_name> {
            // `key` must be `<field>::<map key>`. It is split on the first `::`
            // only, so the map key itself may contain further `::`.
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                let parts: Vec<&str> = key.splitn(2, "::").collect();
                match parts.as_slice() {
                    [inner_key, hashmap_key] => {
                        // Get or create the struct for the specified key
                        // NOTE(review): the entry is inserted before `inner_key`
                        // is validated, so an unknown field name for a new map
                        // key still leaves a default entry behind.
                        let inner_value = self
                            .entry((*hashmap_key).to_owned())
                            .or_insert_with($struct_name::default);

                        inner_value.set(inner_key, value)
                    }
                    _ => _config_err!("Unrecognized key '{key}'."),
                }
            }

            // Emits `<key_prefix>.<field>::<map key>` for every field of every
            // map entry (the same shape `set` accepts after the prefix).
            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                for (column_name, col_options) in self {
                    $(
                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    col_options.$field_name.visit(v, key.as_str(), desc);
                    )*
                }
            }
        }
    }
}
1781
// Per-column parquet options, stored in a `HashMap<String, ParquetColumnOptions>`
// keyed by column name. Individual values are set with keys of the form
// `<field>::<column>` (e.g. `bloom_filter_enabled::col1`).
//
// Each `///` line on a field below is captured by the macro as a
// `#[doc = ...]` attribute and reused as that option's runtime description,
// so editing those lines changes user-visible config descriptions.
config_namespace_with_hashmap! {
    /// Options controlling parquet format for individual columns.
    ///
    /// See [`ParquetOptions`] for more details
    pub struct ParquetColumnOptions {
        /// Sets if bloom filter is enabled for the column path.
        pub bloom_filter_enabled: Option<bool>, default = None

        /// Sets encoding for the column path.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        pub encoding: Option<String>, default = None

        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
        /// default parquet options
        pub dictionary_enabled: Option<bool>, default = None

        /// Sets default parquet compression codec for the column path.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        // The raw value is lowercased (`transform`) before it is stored.
        pub compression: Option<String>, transform = str::to_lowercase, default = None

        /// Sets if statistics are enabled for the column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet options
        pub statistics_enabled: Option<String>, default = None

        /// Sets bloom filter false positive probability for the column path. If NULL, uses
        /// default parquet options
        pub bloom_filter_fpp: Option<f64>, default = None

        /// Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet options
        pub bloom_filter_ndv: Option<u64>, default = None

        /// Sets max statistics size for the column path. If NULL, uses
        /// default parquet options
        /// max_statistics_size is deprecated, currently it is not being used
        // TODO: remove once deprecated
        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
        pub max_statistics_size: Option<usize>, default = None
    }
}
1831
1832config_namespace! {
1833 /// Options controlling CSV format
1834 pub struct CsvOptions {
1835 /// Specifies whether there is a CSV header (i.e. the first line
1836 /// consists of is column names). The value `None` indicates that
1837 /// the configuration should be consulted.
1838 pub has_header: Option<bool>, default = None
1839 pub delimiter: u8, default = b','
1840 pub quote: u8, default = b'"'
1841 pub terminator: Option<u8>, default = None
1842 pub escape: Option<u8>, default = None
1843 pub double_quote: Option<bool>, default = None
1844 /// Specifies whether newlines in (quoted) values are supported.
1845 ///
1846 /// Parsing newlines in quoted values may be affected by execution behaviour such as
1847 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
1848 /// parsed successfully, which may reduce performance.
1849 ///
1850 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
1851 pub newlines_in_values: Option<bool>, default = None
1852 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
1853 pub schema_infer_max_rec: Option<usize>, default = None
1854 pub date_format: Option<String>, default = None
1855 pub datetime_format: Option<String>, default = None
1856 pub timestamp_format: Option<String>, default = None
1857 pub timestamp_tz_format: Option<String>, default = None
1858 pub time_format: Option<String>, default = None
1859 // The output format for Nulls in the CSV writer.
1860 pub null_value: Option<String>, default = None
1861 // The input regex for Nulls when loading CSVs.
1862 pub null_regex: Option<String>, default = None
1863 pub comment: Option<u8>, default = None
1864 }
1865}
1866
1867impl CsvOptions {
1868 /// Set a limit in terms of records to scan to infer the schema
1869 /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
1870 pub fn with_compression(
1871 mut self,
1872 compression_type_variant: CompressionTypeVariant,
1873 ) -> Self {
1874 self.compression = compression_type_variant;
1875 self
1876 }
1877
1878 /// Set a limit in terms of records to scan to infer the schema
1879 /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
1880 pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
1881 self.schema_infer_max_rec = Some(max_rec);
1882 self
1883 }
1884
1885 /// Set true to indicate that the first line is a header.
1886 /// - default to true
1887 pub fn with_has_header(mut self, has_header: bool) -> Self {
1888 self.has_header = Some(has_header);
1889 self
1890 }
1891
1892 /// Returns true if the first line is a header. If format options does not
1893 /// specify whether there is a header, returns `None` (indicating that the
1894 /// configuration should be consulted).
1895 pub fn has_header(&self) -> Option<bool> {
1896 self.has_header
1897 }
1898
1899 /// The character separating values within a row.
1900 /// - default to ','
1901 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
1902 self.delimiter = delimiter;
1903 self
1904 }
1905
1906 /// The quote character in a row.
1907 /// - default to '"'
1908 pub fn with_quote(mut self, quote: u8) -> Self {
1909 self.quote = quote;
1910 self
1911 }
1912
1913 /// The character that terminates a row.
1914 /// - default to None (CRLF)
1915 pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
1916 self.terminator = terminator;
1917 self
1918 }
1919
1920 /// The escape character in a row.
1921 /// - default is None
1922 pub fn with_escape(mut self, escape: Option<u8>) -> Self {
1923 self.escape = escape;
1924 self
1925 }
1926
1927 /// Set true to indicate that the CSV quotes should be doubled.
1928 /// - default to true
1929 pub fn with_double_quote(mut self, double_quote: bool) -> Self {
1930 self.double_quote = Some(double_quote);
1931 self
1932 }
1933
1934 /// Specifies whether newlines in (quoted) values are supported.
1935 ///
1936 /// Parsing newlines in quoted values may be affected by execution behaviour such as
1937 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
1938 /// parsed successfully, which may reduce performance.
1939 ///
1940 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
1941 pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
1942 self.newlines_in_values = Some(newlines_in_values);
1943 self
1944 }
1945
1946 /// Set a `CompressionTypeVariant` of CSV
1947 /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
1948 pub fn with_file_compression_type(
1949 mut self,
1950 compression: CompressionTypeVariant,
1951 ) -> Self {
1952 self.compression = compression;
1953 self
1954 }
1955
1956 /// The delimiter character.
1957 pub fn delimiter(&self) -> u8 {
1958 self.delimiter
1959 }
1960
1961 /// The quote character.
1962 pub fn quote(&self) -> u8 {
1963 self.quote
1964 }
1965
1966 /// The terminator character.
1967 pub fn terminator(&self) -> Option<u8> {
1968 self.terminator
1969 }
1970
1971 /// The escape character.
1972 pub fn escape(&self) -> Option<u8> {
1973 self.escape
1974 }
1975}
1976
1977config_namespace! {
1978 /// Options controlling JSON format
1979 pub struct JsonOptions {
1980 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
1981 pub schema_infer_max_rec: Option<usize>, default = None
1982 }
1983}
1984
/// Marker trait for format option types.
///
/// It declares no methods of its own; implementors only need to provide
/// [`Display`].
pub trait FormatOptionsExt: Display {}
1986
/// A table's file format, together with that format's options where it has any.
#[derive(Debug, Clone, PartialEq)]
// NOTE(review): presumably `TableParquetOptions` is much larger than the other
// payloads; the lint is silenced rather than boxing that variant — confirm.
#[allow(clippy::large_enum_variant)]
pub enum FormatOptions {
    /// CSV format, configured by [`CsvOptions`].
    CSV(CsvOptions),
    /// JSON format, configured by [`JsonOptions`].
    JSON(JsonOptions),
    /// Parquet format, configured by [`TableParquetOptions`].
    /// Only present when the `parquet` feature is enabled.
    #[cfg(feature = "parquet")]
    PARQUET(TableParquetOptions),
    /// Avro format; carries no format-specific options.
    AVRO,
    /// Arrow format; carries no format-specific options.
    ARROW,
}
1997
1998impl Display for FormatOptions {
1999 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2000 let out = match self {
2001 FormatOptions::CSV(_) => "csv",
2002 FormatOptions::JSON(_) => "json",
2003 #[cfg(feature = "parquet")]
2004 FormatOptions::PARQUET(_) => "parquet",
2005 FormatOptions::AVRO => "avro",
2006 FormatOptions::ARROW => "arrow",
2007 };
2008 write!(f, "{}", out)
2009 }
2010}
2011
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::collections::HashMap;

    use crate::config::{
        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
        Extensions, TableOptions,
    };

    /// Minimal extension registered under the `test` prefix, used to check
    /// that `TableOptions` routes prefixed keys to extensions.
    #[derive(Default, Debug, Clone)]
    pub struct TestExtensionConfig {
        /// Every value set on this extension, keyed by the remainder of the
        /// key after its leading `test.` segment.
        pub properties: HashMap<String, String>,
    }

    impl ExtensionOptions for TestExtensionConfig {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }

        fn cloned(&self) -> Box<dyn ExtensionOptions> {
            Box::new(self.clone())
        }

        // Expects keys of the form `test.<rest>` and stores `<rest>` -> value.
        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
            assert_eq!(key, "test");
            self.properties.insert(rem.to_owned(), value.to_owned());
            Ok(())
        }

        // Reports each stored property with an empty description.
        fn entries(&self) -> Vec<ConfigEntry> {
            self.properties
                .iter()
                .map(|(k, v)| ConfigEntry {
                    key: k.into(),
                    value: Some(v.into()),
                    description: "",
                })
                .collect()
        }
    }

    impl ConfigExtension for TestExtensionConfig {
        const PREFIX: &'static str = "test";
    }

    // An extension inserted before `with_extensions` can be retrieved by type.
    #[test]
    fn create_table_config() {
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let table_config = TableOptions::new().with_extensions(extension);
        let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
        assert!(kafka_config.is_some())
    }

    // `format.*` keys go to the CSV options, while `test.*` keys go to the
    // registered extension, with the `test.` segment stripped.
    #[test]
    fn alter_test_extension_config() {
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let mut table_config = TableOptions::new().with_extensions(extension);
        table_config.set_config_format(ConfigFileType::CSV);
        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter, b';');
        table_config.set("test.bootstrap.servers", "asd").unwrap();
        let kafka_config = table_config
            .extensions
            .get::<TestExtensionConfig>()
            .unwrap();
        assert_eq!(
            kafka_config.properties.get("bootstrap.servers").unwrap(),
            "asd"
        );
    }

    // Single-character string values are stored as `u8` CSV options.
    #[test]
    fn csv_u8_table_options() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::CSV);
        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter as char, ';');
        table_config.set("format.escape", "\"").unwrap();
        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
        table_config.set("format.escape", "\'").unwrap();
        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
    }

    // Setting `enable_options_value_normalization` to "false" logs nothing at
    // Info-or-higher, while setting it to "true" logs exactly one such record.
    #[test]
    fn warning_only_not_default() {
        use std::sync::atomic::AtomicUsize;
        static COUNT: AtomicUsize = AtomicUsize::new(0);
        use log::{Level, LevelFilter, Metadata, Record};
        // Counts every record at Info level or more severe.
        struct SimpleLogger;
        impl log::Log for SimpleLogger {
            fn enabled(&self, metadata: &Metadata) -> bool {
                metadata.level() <= Level::Info
            }

            fn log(&self, record: &Record) {
                if self.enabled(record.metadata()) {
                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
            }
            fn flush(&self) {}
        }
        // NOTE(review): the logger is process-global. This `unwrap` panics if any
        // other test in the same binary already installed one, and records
        // logged by concurrently running tests would also be counted.
        log::set_logger(&SimpleLogger).unwrap();
        log::set_max_level(LevelFilter::Info);
        let mut sql_parser_options = crate::config::SqlParserOptions::default();
        sql_parser_options
            .set("enable_options_value_normalization", "false")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
        sql_parser_options
            .set("enable_options_value_normalization", "true")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
    }

    // `<option>::<column>` keys populate the column-specific options map.
    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        assert_eq!(
            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
            Some(true)
        );
    }

    // Column-specific options appear in `entries()` under the same key shape
    // that was used to set them.
    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_entry() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        let entries = table_config.entries();
        assert!(entries
            .iter()
            .any(|item| item.key == "format.bloom_filter_enabled::col1"))
    }

    // `metadata::<key>` values are stored verbatim (including empty strings,
    // trailing spaces and `::` inside the value), and later writes overwrite
    // earlier ones.
    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_metadata_entry() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.set("format.metadata::key1", "").unwrap();
        table_config.set("format.metadata::key2", "value2").unwrap();
        table_config
            .set("format.metadata::key3", "value with spaces ")
            .unwrap();
        table_config
            .set("format.metadata::key4", "value with special chars :: :")
            .unwrap();

        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
        assert_eq!(parsed_metadata.get("should not exist1"), None);
        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
        assert_eq!(
            parsed_metadata.get("key3"),
            Some(&Some("value with spaces ".into()))
        );
        assert_eq!(
            parsed_metadata.get("key4"),
            Some(&Some("value with special chars :: :".into()))
        );

        // duplicate keys are overwritten
        table_config.set("format.metadata::key_dupe", "A").unwrap();
        table_config.set("format.metadata::key_dupe", "B").unwrap();
        let parsed_metadata = table_config.parquet.key_value_metadata;
        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
    }
}