1use std::sync::Arc;
21
22use crate::{
23 _internal_datafusion_err, DataFusionError, Result,
24 config::{ParquetCdcOptions, ParquetOptions, TableParquetOptions},
25};
26
27use arrow::datatypes::Schema;
28use parquet::arrow::encode_arrow_schema;
29use parquet::{
30 arrow::ARROW_SCHEMA_META_KEY,
31 basic::{BrotliLevel, GzipLevel, ZstdLevel},
32 file::{
33 metadata::KeyValue,
34 properties::{
35 DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties,
36 WriterPropertiesBuilder,
37 },
38 },
39 schema::types::ColumnPath,
40};
41
/// Options for writing parquet files.
///
/// A thin wrapper that owns the parquet crate's [`WriterProperties`].
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    /// The parquet writer properties carried by these options.
    pub writer_options: WriterProperties,
}
48
49impl ParquetWriterOptions {
50 pub fn new(writer_options: WriterProperties) -> Self {
51 Self { writer_options }
52 }
53}
54
55impl ParquetWriterOptions {
56 pub fn writer_options(&self) -> &WriterProperties {
57 &self.writer_options
58 }
59}
60
61impl TableParquetOptions {
62 pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
65 self.key_value_metadata.insert(
66 ARROW_SCHEMA_META_KEY.into(),
67 Some(encode_arrow_schema(schema)),
68 );
69 }
70}
71
72impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
73 type Error = DataFusionError;
74
75 fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
76 Ok(ParquetWriterOptions {
78 writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
79 .build(),
80 })
81 }
82}
83
84impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
85 type Error = DataFusionError;
86
87 fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
93 let TableParquetOptions {
95 global,
96 column_specific_options,
97 key_value_metadata,
98 ..
99 } = table_parquet_options;
100
101 let mut builder = global.into_writer_properties_builder()?;
102
103 if !global.skip_arrow_metadata
105 && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
106 {
107 return Err(_internal_datafusion_err!(
108 "arrow schema was not added to the kv_metadata, even though it is required by configuration settings"
109 ));
110 }
111
112 if !key_value_metadata.is_empty() {
114 builder = builder.set_key_value_metadata(Some(
115 key_value_metadata
116 .to_owned()
117 .drain()
118 .map(|(key, value)| KeyValue { key, value })
119 .collect(),
120 ));
121 }
122
123 for (column, options) in column_specific_options {
125 let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());
126
127 if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
128 builder = builder
129 .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
130 }
131
132 if let Some(encoding) = &options.encoding {
133 let parsed_encoding = parse_encoding_string(encoding)?;
134 builder = builder.set_column_encoding(path.clone(), parsed_encoding);
135 }
136
137 if let Some(dictionary_enabled) = options.dictionary_enabled {
138 builder = builder
139 .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
140 }
141
142 if let Some(compression) = &options.compression {
143 let parsed_compression = parse_compression_string(compression)?;
144 builder =
145 builder.set_column_compression(path.clone(), parsed_compression);
146 }
147
148 if let Some(statistics_enabled) = &options.statistics_enabled {
149 let parsed_value = parse_statistics_string(statistics_enabled)?;
150 builder =
151 builder.set_column_statistics_enabled(path.clone(), parsed_value);
152 }
153
154 if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
155 builder =
156 builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
157 }
158
159 if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
160 builder =
161 builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
162 }
163 }
164
165 Ok(builder)
166 }
167}
168
169impl From<&ParquetCdcOptions> for Option<parquet::file::properties::CdcOptions> {
175 fn from(value: &ParquetCdcOptions) -> Self {
176 value
177 .enabled
178 .then_some(parquet::file::properties::CdcOptions {
179 min_chunk_size: value.min_chunk_size,
180 max_chunk_size: value.max_chunk_size,
181 norm_level: value.norm_level,
182 })
183 }
184}
185
186impl From<Option<&parquet::file::properties::CdcOptions>> for ParquetCdcOptions {
192 fn from(value: Option<&parquet::file::properties::CdcOptions>) -> Self {
193 match value {
194 Some(cdc) => ParquetCdcOptions {
195 enabled: true,
196 min_chunk_size: cdc.min_chunk_size,
197 max_chunk_size: cdc.max_chunk_size,
198 norm_level: cdc.norm_level,
199 },
200 None => ParquetCdcOptions::default(),
201 }
202 }
203}
204
205impl ParquetOptions {
206 pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
213 let ParquetOptions {
214 data_pagesize_limit,
215 write_batch_size,
216 writer_version,
217 compression,
218 dictionary_enabled,
219 dictionary_page_size_limit,
220 statistics_enabled,
221 max_row_group_size,
222 created_by,
223 column_index_truncate_length,
224 statistics_truncate_length,
225 data_page_row_count_limit,
226 encoding,
227 bloom_filter_on_write,
228 bloom_filter_fpp,
229 bloom_filter_ndv,
230 content_defined_chunking,
231
232 enable_page_index: _,
234 pruning: _,
235 skip_metadata: _,
236 metadata_size_hint: _,
237 pushdown_filters: _,
238 reorder_filters: _,
239 force_filter_selections: _, allow_single_file_parallelism: _,
241 maximum_parallel_row_group_writers: _,
242 maximum_buffered_record_batches_per_stream: _,
243 bloom_filter_on_read: _, schema_force_view_types: _,
245 binary_as_string: _, coerce_int96: _, coerce_int96_tz: _, skip_arrow_metadata: _,
249 max_predicate_cache_size: _,
250 } = self;
251
252 let mut builder = WriterProperties::builder()
253 .set_data_page_size_limit(*data_pagesize_limit)
254 .set_write_batch_size(*write_batch_size)
255 .set_writer_version((*writer_version).into())
256 .set_dictionary_page_size_limit(*dictionary_page_size_limit)
257 .set_statistics_enabled(
258 statistics_enabled
259 .as_ref()
260 .and_then(|s| parse_statistics_string(s).ok())
261 .unwrap_or(DEFAULT_STATISTICS_ENABLED),
262 )
263 .set_max_row_group_row_count(Some(*max_row_group_size))
264 .set_created_by(created_by.clone())
265 .set_column_index_truncate_length(*column_index_truncate_length)
266 .set_statistics_truncate_length(*statistics_truncate_length)
267 .set_data_page_row_count_limit(*data_page_row_count_limit)
268 .set_bloom_filter_enabled(*bloom_filter_on_write);
269
270 if let Some(bloom_filter_fpp) = bloom_filter_fpp {
271 builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
272 };
273 if let Some(bloom_filter_ndv) = bloom_filter_ndv {
274 builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
275 };
276 if let Some(dictionary_enabled) = dictionary_enabled {
277 builder = builder.set_dictionary_enabled(*dictionary_enabled);
278 };
279
280 if let Some(compression) = compression {
283 builder = builder.set_compression(parse_compression_string(compression)?);
284 }
285 if let Some(encoding) = encoding {
286 builder = builder.set_encoding(parse_encoding_string(encoding)?);
287 }
288 builder = builder.set_content_defined_chunking(content_defined_chunking.into());
289
290 Ok(builder)
291 }
292}
293
294pub(crate) fn parse_encoding_string(
296 str_setting: &str,
297) -> Result<parquet::basic::Encoding> {
298 let str_setting_lower: &str = &str_setting.to_lowercase();
299 match str_setting_lower {
300 "plain" => Ok(parquet::basic::Encoding::PLAIN),
301 "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
302 "rle" => Ok(parquet::basic::Encoding::RLE),
303 #[expect(deprecated)]
304 "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
305 "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
306 "delta_length_byte_array" => {
307 Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
308 }
309 "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
310 "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
311 "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
312 _ => Err(DataFusionError::Configuration(format!(
313 "Unknown or unsupported parquet encoding: \
314 {str_setting}. Valid values are: plain, plain_dictionary, rle, \
315 bit_packed, delta_binary_packed, delta_length_byte_array, \
316 delta_byte_array, rle_dictionary, and byte_stream_split."
317 ))),
318 }
319}
320
321fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
324 let str_setting = str_setting.replace('\'', "");
326 let split_setting = str_setting.split_once('(');
327
328 match split_setting {
329 Some((codec, rh)) => {
330 let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
331 DataFusionError::Configuration(format!(
332 "Could not parse compression string. \
333 Got codec: {codec} and unknown level from {str_setting}"
334 ))
335 })?;
336 Ok((codec.to_owned(), Some(*level)))
337 }
338 None => Ok((str_setting.to_owned(), None)),
339 }
340}
341
342fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
345 if level.is_some() {
346 return Err(DataFusionError::Configuration(format!(
347 "Compression {codec} does not support specifying a level"
348 )));
349 }
350 Ok(())
351}
352
353fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
356 level.ok_or(DataFusionError::Configuration(format!(
357 "{codec} compression requires specifying a level such as {codec}(4)"
358 )))
359}
360
361pub fn parse_compression_string(
363 str_setting: &str,
364) -> Result<parquet::basic::Compression> {
365 let str_setting_lower: &str = &str_setting.to_lowercase();
366 let (codec, level) = split_compression_string(str_setting_lower)?;
367 let codec = codec.as_str();
368 match codec {
369 "uncompressed" => {
370 check_level_is_none(codec, &level)?;
371 Ok(parquet::basic::Compression::UNCOMPRESSED)
372 }
373 "snappy" => {
374 check_level_is_none(codec, &level)?;
375 Ok(parquet::basic::Compression::SNAPPY)
376 }
377 "gzip" => {
378 let level = require_level(codec, level)?;
379 Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
380 level,
381 )?))
382 }
383 "brotli" => {
384 let level = require_level(codec, level)?;
385 Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
386 level,
387 )?))
388 }
389 "lz4" => {
390 check_level_is_none(codec, &level)?;
391 Ok(parquet::basic::Compression::LZ4)
392 }
393 "zstd" => {
394 let level = require_level(codec, level)?;
395 Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
396 level as i32,
397 )?))
398 }
399 "lz4_raw" => {
400 check_level_is_none(codec, &level)?;
401 Ok(parquet::basic::Compression::LZ4_RAW)
402 }
403 _ => Err(DataFusionError::Configuration(format!(
404 "Unknown or unsupported parquet compression: \
405 {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
406 brotli(level), lz4, zstd(level), and lz4_raw."
407 ))),
408 }
409}
410
411pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
412 let str_setting_lower: &str = &str_setting.to_lowercase();
413 match str_setting_lower {
414 "none" => Ok(EnabledStatistics::None),
415 "chunk" => Ok(EnabledStatistics::Chunk),
416 "page" => Ok(EnabledStatistics::Page),
417 _ => Err(DataFusionError::Configuration(format!(
418 "Unknown or unsupported parquet statistics setting {str_setting} \
419 valid options are none, page, and chunk"
420 ))),
421 }
422}
423
/// Tests for converting between DataFusion's parquet configuration
/// ([`TableParquetOptions`] / [`ParquetOptions`]) and the parquet crate's
/// [`WriterProperties`].
#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "parquet_encryption")]
    use crate::config::ConfigFileEncryptionProperties;
    use crate::config::{
        ParquetCdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
    };
    use crate::parquet_config::DFParquetWriterVersion;
    use parquet::basic::Compression;
    use parquet::file::properties::{
        BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV,
        DEFAULT_MAX_ROW_GROUP_ROW_COUNT, EnabledStatistics,
    };
    use std::collections::HashMap;

    /// Name of the column that receives column-specific options in these tests.
    const COL_NAME: &str = "configured";

    /// Builds column-specific options; `dictionary_enabled` is the negation
    /// of the value in `src_col_defaults` (when that value is set).
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

    /// Builds global options whose writer-related settings are fixed test
    /// values (or the negation of the default for boolean settings), while
    /// settings not used for writer properties keep their defaults.
    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version.eq(&DFParquetWriterVersion::V1_0)
        {
            DFParquetWriterVersion::V2_0
        } else {
            DFParquetWriterVersion::V1_0
        };

        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version,
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

            // Not used when building writer properties.
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            force_filter_selections: defaults.force_filter_selections,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
            coerce_int96_tz: None,
            max_predicate_cache_size: defaults.max_predicate_cache_size,
            content_defined_chunking: defaults.content_defined_chunking.clone(),
        }
    }

    /// Reads the settings that apply to `col` out of `props` and expresses
    /// them as [`ParquetColumnOptions`].
    ///
    /// Only zstd compression is mapped back to a string; any other
    /// compression yields `None`.
    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
        }
    }

    /// Rebuilds a [`TableParquetOptions`] from `props`, so a test can compare
    /// the options that produced the writer properties with the options
    /// recovered from them.
    ///
    /// Settings that are not represented in [`WriterProperties`] are filled
    /// from `ParquetOptions::default()`.
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        // A column without specific configuration reports the global settings.
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        // Report column-specific options only when they differ from the
        // settings of an unconfigured column.
        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[cfg(feature = "parquet_encryption")]
        let fep = props
            .file_encryption_properties()
            .map(ConfigFileEncryptionProperties::from);

        #[cfg(not(feature = "parquet_encryption"))]
        let fep = None;

        TableParquetOptions {
            global: ParquetOptions {
                // Global settings read directly from the writer properties.
                // The data page limit is read from its own getter, not from
                // the dictionary page limit, so a wrong value is detectable.
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: props.writer_version().into(),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props
                    .max_row_group_row_count()
                    .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                // Global settings taken from the unconfigured column.
                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

                // Settings not represented in the writer properties.
                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                force_filter_selections: global_options_defaults.force_filter_selections,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                max_predicate_cache_size: global_options_defaults
                    .max_predicate_cache_size,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
                coerce_int96_tz: None,
                content_defined_chunking: props.content_defined_chunking().into(),
            },
            column_specific_options,
            key_value_metadata,
            crypto: ParquetEncryptionOptions {
                file_encryption: fep,
                file_decryption: None,
                factory_id: None,
                factory_options: Default::default(),
            },
        }
    }

    /// The conversion fails without the arrow schema in the key/value
    /// metadata unless `skip_arrow_metadata` is enabled.
    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        // Defaults: the arrow schema is required.
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default false, to not skip the arrow schema requirement"
        );

        // Required but missing: error.
        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        // Requirement skipped by configuration: success.
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        // Required and present: success.
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    /// Non-default table options survive the round trip through
    /// [`WriterProperties`].
    #[test]
    fn table_parquet_opts_to_writer_props() {
        let parquet_options = parquet_options_with_non_defaults();

        // The arrow schema key must be present; its value is arbitrary here.
        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
            crypto: Default::default(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

    /// DataFusion's defaults match the parquet crate's defaults apart from
    /// `created_by` and compression, which are checked separately and then
    /// aligned before the final comparison.
    #[test]
    fn test_defaults_match() {
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global, default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        // Skip the arrow schema requirement so the conversion can succeed
        // without key/value metadata.
        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        // Defaults from the parquet crate.
        let default_writer_props = WriterProperties::new();

        // Defaults from DataFusion.
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // Expected difference: created_by.
        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props
                .created_by()
                .starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        // Expected difference: compression.
        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is None"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        // Align the expected differences, then everything else must match.
        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts, from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    /// Enabling bloom filters without fpp/ndv yields the default bloom
    /// filter properties.
    #[test]
    fn test_bloom_filter_defaults() {
        // Bloom filters enabled through DataFusion's options.
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // Bloom filters enabled directly on the parquet builder.
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props, should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }

    /// Setting only the fpp leaves the ndv at its default.
    #[test]
    fn test_bloom_filter_set_fpp_only() {
        // Bloom filters with a custom fpp through DataFusion's options.
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // The same settings applied directly on the parquet builder.
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props, should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    /// Enabled CDC options are carried into the writer properties.
    #[test]
    fn test_cdc_enabled_with_custom_options() {
        let mut opts = TableParquetOptions::default();
        opts.global.content_defined_chunking = ParquetCdcOptions {
            enabled: true,
            min_chunk_size: 128 * 1024,
            max_chunk_size: 512 * 1024,
            norm_level: 2,
        };
        opts.arrow_schema(&Arc::new(Schema::empty()));

        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
        let cdc = props.content_defined_chunking().expect("CDC should be set");
        assert_eq!(cdc.min_chunk_size, 128 * 1024);
        assert_eq!(cdc.max_chunk_size, 512 * 1024);
        assert_eq!(cdc.norm_level, 2);
    }

    /// With default options, no CDC settings reach the writer properties.
    #[test]
    fn test_cdc_disabled_by_default() {
        let mut opts = TableParquetOptions::default();
        opts.arrow_schema(&Arc::new(Schema::empty()));

        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
        assert!(props.content_defined_chunking().is_none());
    }

    /// CDC parameters are dropped when `enabled` is `false`.
    #[test]
    fn test_cdc_params_ignored_when_disabled() {
        let mut opts = TableParquetOptions::default();
        opts.global.content_defined_chunking = ParquetCdcOptions {
            enabled: false,
            min_chunk_size: 128 * 1024,
            max_chunk_size: 512 * 1024,
            norm_level: 2,
        };
        opts.arrow_schema(&Arc::new(Schema::empty()));

        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
        assert!(props.content_defined_chunking().is_none());
    }

    /// Enabled CDC options survive the round trip through
    /// [`WriterProperties`].
    #[test]
    fn test_cdc_round_trip_through_writer_props() {
        let mut opts = TableParquetOptions::default();
        opts.global.content_defined_chunking = ParquetCdcOptions {
            enabled: true,
            min_chunk_size: 64 * 1024,
            max_chunk_size: 2 * 1024 * 1024,
            norm_level: -1,
        };
        opts.arrow_schema(&Arc::new(Schema::empty()));

        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
        let recovered = session_config_from_writer_props(&props);

        let cdc = recovered.global.content_defined_chunking;
        assert!(cdc.enabled);
        assert_eq!(cdc.min_chunk_size, 64 * 1024);
        assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024);
        assert_eq!(cdc.norm_level, -1);
    }

    /// Setting only the ndv leaves the fpp at its default.
    #[test]
    fn test_bloom_filter_set_ndv_only() {
        // Bloom filters with a custom ndv through DataFusion's options.
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // The same settings applied directly on the parquet builder.
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props, should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
}