use std::sync::Arc;

use crate::{
    _internal_datafusion_err, DataFusionError, Result,
    config::{ParquetOptions, TableParquetOptions},
};

use arrow::datatypes::Schema;
use parquet::arrow::encode_arrow_schema;
use parquet::{
    arrow::ARROW_SCHEMA_META_KEY,
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{
            DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties,
            WriterPropertiesBuilder,
        },
    },
    schema::types::ColumnPath,
};

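/// Options for writing parquet files.
///
/// A thin wrapper over the parquet-rs [`WriterProperties`]. A minimal
/// construction sketch (illustrative, not compiled as a doc-test):
///
/// ```ignore
/// use parquet::file::properties::WriterProperties;
///
/// let props = WriterProperties::builder()
///     .set_max_row_group_size(1024 * 1024)
///     .build();
/// let options = ParquetWriterOptions::new(props);
/// assert_eq!(options.writer_options().max_row_group_size(), 1024 * 1024);
/// ```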
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
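    /// The underlying parquet-rs writer properties.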
    pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
    pub fn new(writer_options: WriterProperties) -> Self {
        Self { writer_options }
    }

    pub fn writer_options(&self) -> &WriterProperties {
        &self.writer_options
    }
}

impl TableParquetOptions {
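    /// Encode the Arrow schema and store it in the kv_metadata under
    /// [`ARROW_SCHEMA_META_KEY`], overwriting any existing entry.
    ///
    /// A usage sketch (illustrative, not compiled as a doc-test):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use arrow::datatypes::Schema;
    ///
    /// let mut opts = TableParquetOptions::default();
    /// opts.arrow_schema(&Arc::new(Schema::empty()));
    /// assert!(opts.key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY));
    /// ```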
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}

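/// Convert the session's [`TableParquetOptions`] into [`ParquetWriterOptions`],
/// erroring if the options are inconsistent (e.g. the arrow schema is required
/// in the kv_metadata but missing). A sketch (illustrative):
///
/// ```ignore
/// let mut table_opts = TableParquetOptions::default();
/// table_opts.arrow_schema(&Arc::new(Schema::empty()));
/// let writer_opts = ParquetWriterOptions::try_from(&table_opts).unwrap();
/// ```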
impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
    type Error = DataFusionError;

    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
        Ok(ParquetWriterOptions {
            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
                .build(),
        })
    }
}

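/// Build a parquet [`WriterPropertiesBuilder`] from the session's
/// [`TableParquetOptions`], applying the global options, per-column overrides,
/// and kv_metadata.
///
/// Expected behavior, mirroring the tests below (illustrative):
///
/// ```ignore
/// // Fails without the arrow schema in kv_metadata, unless skipping is opted into:
/// let opts = TableParquetOptions::default();
/// assert!(WriterPropertiesBuilder::try_from(&opts).is_err());
///
/// let opts = TableParquetOptions::default().with_skip_arrow_metadata(true);
/// assert!(WriterPropertiesBuilder::try_from(&opts).is_ok());
/// ```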
impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
    type Error = DataFusionError;

    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
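        // Destructure exhaustively so that adding a field to TableParquetOptions
        // forces this conversion to be revisited.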
        let TableParquetOptions {
            global,
            column_specific_options,
            key_value_metadata,
            crypto: _,
        } = table_parquet_options;

        let mut builder = global.into_writer_properties_builder()?;

        if !global.skip_arrow_metadata
            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
        {
            return Err(_internal_datafusion_err!(
                "arrow schema was not added to the kv_metadata, even though it is required by configuration settings"
            ));
        }

        if !key_value_metadata.is_empty() {
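            // Convert the kv_metadata map into parquet KeyValue pairs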
            builder = builder.set_key_value_metadata(Some(
                key_value_metadata
                    .to_owned()
                    .drain()
                    .map(|(key, value)| KeyValue { key, value })
                    .collect(),
            ));
        }

        for (column, options) in column_specific_options {
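            // A dotted column name addresses a nested field, e.g. "a.b"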
            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
                builder = builder
                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
            }

            if let Some(encoding) = &options.encoding {
                let parsed_encoding = parse_encoding_string(encoding)?;
                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
            }

            if let Some(dictionary_enabled) = options.dictionary_enabled {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
            }

            if let Some(compression) = &options.compression {
                let parsed_compression = parse_compression_string(compression)?;
                builder =
                    builder.set_column_compression(path.clone(), parsed_compression);
            }

            if let Some(statistics_enabled) = &options.statistics_enabled {
                let parsed_value = parse_statistics_string(statistics_enabled)?;
                builder =
                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
            }

            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
                builder =
                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
            }

            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
                builder =
                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
            }
        }

        Ok(builder)
    }
}

impl ParquetOptions {
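    /// Map the writer-relevant session-level [`ParquetOptions`] onto a
    /// [`WriterPropertiesBuilder`].
    ///
    /// A usage sketch (illustrative, not compiled as a doc-test):
    ///
    /// ```ignore
    /// let props = ParquetOptions::default()
    ///     .into_writer_properties_builder()
    ///     .unwrap()
    ///     .build();
    /// assert!(props.created_by().starts_with("datafusion version"));
    /// ```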
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
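        // Destructure exhaustively; options with no WriterProperties
        // counterpart are matched with `_` below.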
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

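            // Reader and execution options with no WriterProperties counterpart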
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            force_filter_selections: _,
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _,
            schema_force_view_types: _,
            binary_as_string: _,
            coerce_int96: _,
            skip_arrow_metadata: _,
            max_predicate_cache_size: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version((*writer_version).into())
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_size(*max_row_group_size)
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        }
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        }
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        }

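        // Compression and encoding carry no fallback here, so only override the
        // builder's defaults when they are explicitly configured.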
        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}

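/// Parse a case-insensitive encoding name into a [`parquet::basic::Encoding`].
///
/// For example (illustrative, not compiled as a doc-test):
///
/// ```ignore
/// assert_eq!(
///     parse_encoding_string("plain").unwrap(),
///     parquet::basic::Encoding::PLAIN
/// );
/// assert!(parse_encoding_string("no_such_encoding").is_err());
/// ```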
pub(crate) fn parse_encoding_string(
    str_setting: &str,
) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
        "rle" => Ok(parquet::basic::Encoding::RLE),
        #[expect(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
        "delta_length_byte_array" => {
            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
        }
        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet encoding: \
            {str_setting}. Valid values are: plain, plain_dictionary, rle, \
            bit_packed, delta_binary_packed, delta_length_byte_array, \
            delta_byte_array, rle_dictionary, and byte_stream_split."
        ))),
    }
}

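/// Split a compression setting into a codec name and an optional level,
/// e.g. `"zstd(4)"` -> `("zstd", Some(4))`.
///
/// A behavior sketch (illustrative):
///
/// ```ignore
/// assert_eq!(
///     split_compression_string("zstd(4)").unwrap(),
///     ("zstd".to_owned(), Some(4))
/// );
/// assert_eq!(
///     split_compression_string("snappy").unwrap(),
///     ("snappy".to_owned(), None)
/// );
/// assert!(split_compression_string("zstd(high)").is_err());
/// ```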
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
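    // Strip any single quotes (e.g. from SQL string literals)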
    let str_setting = str_setting.replace('\'', "");
    let split_setting = str_setting.split_once('(');

    match split_setting {
        Some((codec, rh)) => {
            let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
                DataFusionError::Configuration(format!(
                    "Could not parse compression string. \
                    Got codec: {codec} and unknown level from {str_setting}"
                ))
            })?;
            Ok((codec.to_owned(), Some(*level)))
        }
        None => Ok((str_setting.to_owned(), None)),
    }
}

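/// Error if a compression level was provided for a codec that does not accept one.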
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
    if level.is_some() {
        return Err(DataFusionError::Configuration(format!(
            "Compression {codec} does not support specifying a level"
        )));
    }
    Ok(())
}

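/// Error if a codec that requires a level (e.g. `zstd(4)`) was given none.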
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
    level.ok_or(DataFusionError::Configuration(format!(
        "{codec} compression requires specifying a level such as {codec}(4)"
    )))
}

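/// Parse a case-insensitive compression setting, with an optional level in
/// parentheses, into a [`parquet::basic::Compression`].
///
/// A behavior sketch (illustrative, not compiled as a doc-test):
///
/// ```ignore
/// use parquet::basic::{Compression, ZstdLevel};
///
/// assert_eq!(
///     parse_compression_string("zstd(4)").unwrap(),
///     Compression::ZSTD(ZstdLevel::try_new(4).unwrap())
/// );
/// assert_eq!(parse_compression_string("snappy").unwrap(), Compression::SNAPPY);
/// // a level is rejected where unsupported, and required where mandatory
/// assert!(parse_compression_string("snappy(4)").is_err());
/// assert!(parse_compression_string("gzip").is_err());
/// ```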
pub fn parse_compression_string(
    str_setting: &str,
) -> Result<parquet::basic::Compression> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    let (codec, level) = split_compression_string(str_setting_lower)?;
    let codec = codec.as_str();
    match codec {
        "uncompressed" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::UNCOMPRESSED)
        }
        "snappy" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::SNAPPY)
        }
        "gzip" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
                level,
            )?))
        }
        "lzo" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZO)
        }
        "brotli" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
                level,
            )?))
        }
        "lz4" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4)
        }
        "zstd" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
                level as i32,
            )?))
        }
        "lz4_raw" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4_RAW)
        }
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet compression: \
            {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
            lzo, brotli(level), lz4, zstd(level), and lz4_raw."
        ))),
    }
}

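/// Parse a case-insensitive statistics level into an [`EnabledStatistics`].
///
/// For example (illustrative):
///
/// ```ignore
/// assert_eq!(parse_statistics_string("page").unwrap(), EnabledStatistics::Page);
/// assert!(parse_statistics_string("row_group").is_err());
/// ```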
pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "none" => Ok(EnabledStatistics::None),
        "chunk" => Ok(EnabledStatistics::Chunk),
        "page" => Ok(EnabledStatistics::Page),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet statistics setting: {str_setting}. \
            Valid values are: none, page, and chunk."
        ))),
    }
}

#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "parquet_encryption")]
    use crate::config::ConfigFileEncryptionProperties;
    use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
    use crate::parquet_config::DFParquetWriterVersion;
    use parquet::basic::Compression;
    use parquet::file::properties::{
        BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV,
        EnabledStatistics,
    };
    use std::collections::HashMap;

    const COL_NAME: &str = "configured";

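    /// Build per-column options whose values all differ from the given defaults.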
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

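    /// Build global options whose writer-relevant values all differ from the defaults.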
    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version.eq(&DFParquetWriterVersion::V1_0)
        {
            DFParquetWriterVersion::V2_0
        } else {
            DFParquetWriterVersion::V1_0
        };

        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version,
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

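            // Options with no WriterProperties counterpart keep their defaults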
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            force_filter_selections: defaults.force_filter_selections,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
            max_predicate_cache_size: defaults.max_predicate_cache_size,
        }
    }

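    /// Recover the per-column options implied by `props` for the column `col`.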
    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
        }
    }

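    /// Reconstruct a TableParquetOptions from WriterProperties, so tests can
    /// round-trip options -> props -> options.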
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[cfg(feature = "parquet_encryption")]
        let fep = props
            .file_encryption_properties()
            .map(ConfigFileEncryptionProperties::from);

        #[cfg(not(feature = "parquet_encryption"))]
        let fep = None;

        TableParquetOptions {
            global: ParquetOptions {
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: props.writer_version().into(),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props.max_row_group_size(),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

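                // Options not represented in WriterProperties use session defaults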
                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                force_filter_selections: global_options_defaults.force_filter_selections,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                max_predicate_cache_size: global_options_defaults
                    .max_predicate_cache_size,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
            },
            column_specific_options,
            key_value_metadata,
            crypto: ParquetEncryptionOptions {
                file_encryption: fep,
                file_decryption: None,
                factory_id: None,
                factory_options: Default::default(),
            },
        }
    }

    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default false, to not skip the arrow schema requirement"
        );

        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    #[test]
    fn table_parquet_opts_to_writer_props() {
        let parquet_options = parquet_options_with_non_defaults();

        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
            crypto: Default::default(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

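    /// DataFusion's default TableParquetOptions should round-trip through
    /// parquet's WriterProperties, modulo the intentionally different
    /// created_by and compression defaults checked below.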
    #[test]
    fn test_defaults_match() {
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global, default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        let default_writer_props = WriterProperties::new();

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props
                .created_by()
                .starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is uncompressed"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts, from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    #[test]
    fn test_bloom_filter_defaults() {
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }

    #[test]
    fn test_bloom_filter_set_fpp_only() {
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    #[test]
    fn test_bloom_filter_set_ndv_only() {
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
}