1use std::sync::Arc;
21
22use crate::{
23 _internal_datafusion_err, DataFusionError, Result,
24 config::{ParquetOptions, TableParquetOptions},
25};
26
27use arrow::datatypes::Schema;
28use parquet::arrow::encode_arrow_schema;
29use parquet::{
30 arrow::ARROW_SCHEMA_META_KEY,
31 basic::{BrotliLevel, GzipLevel, ZstdLevel},
32 file::{
33 metadata::KeyValue,
34 properties::{
35 DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties,
36 WriterPropertiesBuilder,
37 },
38 },
39 schema::types::ColumnPath,
40};
41
/// Options for writing parquet files: a thin wrapper around the parquet
/// crate's [`WriterProperties`].
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    /// The underlying parquet [`WriterProperties`] applied when writing.
    pub writer_options: WriterProperties,
}
48
49impl ParquetWriterOptions {
50 pub fn new(writer_options: WriterProperties) -> Self {
51 Self { writer_options }
52 }
53}
54
55impl ParquetWriterOptions {
56 pub fn writer_options(&self) -> &WriterProperties {
57 &self.writer_options
58 }
59}
60
impl TableParquetOptions {
    /// Store the encoded arrow schema in the key-value metadata under
    /// [`ARROW_SCHEMA_META_KEY`], so readers can recover the original arrow
    /// schema from the written parquet file.
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        // Overwrites any previously stored schema entry.
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}
71
72impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
73 type Error = DataFusionError;
74
75 fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
76 Ok(ParquetWriterOptions {
78 writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
79 .build(),
80 })
81 }
82}
83
84impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
85 type Error = DataFusionError;
86
87 fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
93 let TableParquetOptions {
95 global,
96 column_specific_options,
97 key_value_metadata,
98 crypto: _,
99 } = table_parquet_options;
100
101 let mut builder = global.into_writer_properties_builder()?;
102
103 if !global.skip_arrow_metadata
105 && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
106 {
107 return Err(_internal_datafusion_err!(
108 "arrow schema was not added to the kv_metadata, even though it is required by configuration settings"
109 ));
110 }
111
112 if !key_value_metadata.is_empty() {
114 builder = builder.set_key_value_metadata(Some(
115 key_value_metadata
116 .to_owned()
117 .drain()
118 .map(|(key, value)| KeyValue { key, value })
119 .collect(),
120 ));
121 }
122
123 for (column, options) in column_specific_options {
125 let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());
126
127 if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
128 builder = builder
129 .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
130 }
131
132 if let Some(encoding) = &options.encoding {
133 let parsed_encoding = parse_encoding_string(encoding)?;
134 builder = builder.set_column_encoding(path.clone(), parsed_encoding);
135 }
136
137 if let Some(dictionary_enabled) = options.dictionary_enabled {
138 builder = builder
139 .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
140 }
141
142 if let Some(compression) = &options.compression {
143 let parsed_compression = parse_compression_string(compression)?;
144 builder =
145 builder.set_column_compression(path.clone(), parsed_compression);
146 }
147
148 if let Some(statistics_enabled) = &options.statistics_enabled {
149 let parsed_value = parse_statistics_string(statistics_enabled)?;
150 builder =
151 builder.set_column_statistics_enabled(path.clone(), parsed_value);
152 }
153
154 if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
155 builder =
156 builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
157 }
158
159 if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
160 builder =
161 builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
162 }
163 }
164
165 Ok(builder)
166 }
167}
168
impl ParquetOptions {
    /// Convert the write-path settings of the global [`ParquetOptions`] into a
    /// parquet [`WriterPropertiesBuilder`].
    ///
    /// Read-path options (pruning, pushdown, page index, etc.) are
    /// intentionally ignored here.
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
        // Destructure every field so that adding a new option forces a
        // compile error at this site, keeping the conversion exhaustive.
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

            // reader-only options: not used when building writer properties
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            force_filter_selections: _,
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _,
            schema_force_view_types: _,
            binary_as_string: _,
            coerce_int96: _,
            skip_arrow_metadata: _,
            max_predicate_cache_size: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version((*writer_version).into())
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                // NOTE(review): an invalid statistics string silently falls
                // back to the default here, while the per-column path in
                // `TryFrom<&TableParquetOptions>` propagates the parse error
                // -- confirm this asymmetry is intentional.
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_row_count(Some(*max_row_group_size))
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        // Optional settings only override the parquet defaults when
        // explicitly configured.
        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        };
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        };
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        };

        // Compression and encoding strings are validated; parse errors propagate.
        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}
254
255pub(crate) fn parse_encoding_string(
257 str_setting: &str,
258) -> Result<parquet::basic::Encoding> {
259 let str_setting_lower: &str = &str_setting.to_lowercase();
260 match str_setting_lower {
261 "plain" => Ok(parquet::basic::Encoding::PLAIN),
262 "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
263 "rle" => Ok(parquet::basic::Encoding::RLE),
264 #[expect(deprecated)]
265 "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
266 "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
267 "delta_length_byte_array" => {
268 Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
269 }
270 "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
271 "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
272 "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
273 _ => Err(DataFusionError::Configuration(format!(
274 "Unknown or unsupported parquet encoding: \
275 {str_setting}. Valid values are: plain, plain_dictionary, rle, \
276 bit_packed, delta_binary_packed, delta_length_byte_array, \
277 delta_byte_array, rle_dictionary, and byte_stream_split."
278 ))),
279 }
280}
281
282fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
285 let str_setting = str_setting.replace('\'', "");
287 let split_setting = str_setting.split_once('(');
288
289 match split_setting {
290 Some((codec, rh)) => {
291 let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
292 DataFusionError::Configuration(format!(
293 "Could not parse compression string. \
294 Got codec: {codec} and unknown level from {str_setting}"
295 ))
296 })?;
297 Ok((codec.to_owned(), Some(*level)))
298 }
299 None => Ok((str_setting.to_owned(), None)),
300 }
301}
302
303fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
306 if level.is_some() {
307 return Err(DataFusionError::Configuration(format!(
308 "Compression {codec} does not support specifying a level"
309 )));
310 }
311 Ok(())
312}
313
314fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
317 level.ok_or(DataFusionError::Configuration(format!(
318 "{codec} compression requires specifying a level such as {codec}(4)"
319 )))
320}
321
322pub fn parse_compression_string(
324 str_setting: &str,
325) -> Result<parquet::basic::Compression> {
326 let str_setting_lower: &str = &str_setting.to_lowercase();
327 let (codec, level) = split_compression_string(str_setting_lower)?;
328 let codec = codec.as_str();
329 match codec {
330 "uncompressed" => {
331 check_level_is_none(codec, &level)?;
332 Ok(parquet::basic::Compression::UNCOMPRESSED)
333 }
334 "snappy" => {
335 check_level_is_none(codec, &level)?;
336 Ok(parquet::basic::Compression::SNAPPY)
337 }
338 "gzip" => {
339 let level = require_level(codec, level)?;
340 Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
341 level,
342 )?))
343 }
344 "brotli" => {
345 let level = require_level(codec, level)?;
346 Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
347 level,
348 )?))
349 }
350 "lz4" => {
351 check_level_is_none(codec, &level)?;
352 Ok(parquet::basic::Compression::LZ4)
353 }
354 "zstd" => {
355 let level = require_level(codec, level)?;
356 Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
357 level as i32,
358 )?))
359 }
360 "lz4_raw" => {
361 check_level_is_none(codec, &level)?;
362 Ok(parquet::basic::Compression::LZ4_RAW)
363 }
364 _ => Err(DataFusionError::Configuration(format!(
365 "Unknown or unsupported parquet compression: \
366 {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
367 brotli(level), lz4, zstd(level), and lz4_raw."
368 ))),
369 }
370}
371
372pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
373 let str_setting_lower: &str = &str_setting.to_lowercase();
374 match str_setting_lower {
375 "none" => Ok(EnabledStatistics::None),
376 "chunk" => Ok(EnabledStatistics::Chunk),
377 "page" => Ok(EnabledStatistics::Page),
378 _ => Err(DataFusionError::Configuration(format!(
379 "Unknown or unsupported parquet statistics setting {str_setting} \
380 valid options are none, page, and chunk"
381 ))),
382 }
383}
384
385#[cfg(feature = "parquet")]
386#[cfg(test)]
387mod tests {
388 use super::*;
389 #[cfg(feature = "parquet_encryption")]
390 use crate::config::ConfigFileEncryptionProperties;
391 use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
392 use crate::parquet_config::DFParquetWriterVersion;
393 use parquet::basic::Compression;
394 use parquet::file::properties::{
395 BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV,
396 DEFAULT_MAX_ROW_GROUP_ROW_COUNT, EnabledStatistics,
397 };
398 use std::collections::HashMap;
399
400 const COL_NAME: &str = "configured";
401
402 fn column_options_with_non_defaults(
404 src_col_defaults: &ParquetOptions,
405 ) -> ParquetColumnOptions {
406 ParquetColumnOptions {
407 compression: Some("zstd(22)".into()),
408 dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
409 statistics_enabled: Some("none".into()),
410 encoding: Some("RLE".into()),
411 bloom_filter_enabled: Some(true),
412 bloom_filter_fpp: Some(0.72),
413 bloom_filter_ndv: Some(72),
414 }
415 }
416
417 fn parquet_options_with_non_defaults() -> ParquetOptions {
418 let defaults = ParquetOptions::default();
419 let writer_version = if defaults.writer_version.eq(&DFParquetWriterVersion::V1_0)
420 {
421 DFParquetWriterVersion::V2_0
422 } else {
423 DFParquetWriterVersion::V1_0
424 };
425
426 ParquetOptions {
427 data_pagesize_limit: 42,
428 write_batch_size: 42,
429 writer_version,
430 compression: Some("zstd(22)".into()),
431 dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
432 dictionary_page_size_limit: 42,
433 statistics_enabled: Some("chunk".into()),
434 max_row_group_size: 42,
435 created_by: "wordy".into(),
436 column_index_truncate_length: Some(42),
437 statistics_truncate_length: Some(42),
438 data_page_row_count_limit: 42,
439 encoding: Some("BYTE_STREAM_SPLIT".into()),
440 bloom_filter_on_write: !defaults.bloom_filter_on_write,
441 bloom_filter_fpp: Some(0.42),
442 bloom_filter_ndv: Some(42),
443
444 enable_page_index: defaults.enable_page_index,
446 pruning: defaults.pruning,
447 skip_metadata: defaults.skip_metadata,
448 metadata_size_hint: defaults.metadata_size_hint,
449 pushdown_filters: defaults.pushdown_filters,
450 reorder_filters: defaults.reorder_filters,
451 force_filter_selections: defaults.force_filter_selections,
452 allow_single_file_parallelism: defaults.allow_single_file_parallelism,
453 maximum_parallel_row_group_writers: defaults
454 .maximum_parallel_row_group_writers,
455 maximum_buffered_record_batches_per_stream: defaults
456 .maximum_buffered_record_batches_per_stream,
457 bloom_filter_on_read: defaults.bloom_filter_on_read,
458 schema_force_view_types: defaults.schema_force_view_types,
459 binary_as_string: defaults.binary_as_string,
460 skip_arrow_metadata: defaults.skip_arrow_metadata,
461 coerce_int96: None,
462 max_predicate_cache_size: defaults.max_predicate_cache_size,
463 }
464 }
465
466 fn extract_column_options(
467 props: &WriterProperties,
468 col: ColumnPath,
469 ) -> ParquetColumnOptions {
470 let bloom_filter_default_props = props.bloom_filter_properties(&col);
471
472 ParquetColumnOptions {
473 bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
474 encoding: props.encoding(&col).map(|s| s.to_string()),
475 dictionary_enabled: Some(props.dictionary_enabled(&col)),
476 compression: match props.compression(&col) {
477 Compression::ZSTD(lvl) => {
478 Some(format!("zstd({})", lvl.compression_level()))
479 }
480 _ => None,
481 },
482 statistics_enabled: Some(
483 match props.statistics_enabled(&col) {
484 EnabledStatistics::None => "none",
485 EnabledStatistics::Chunk => "chunk",
486 EnabledStatistics::Page => "page",
487 }
488 .into(),
489 ),
490 bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
491 bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
492 }
493 }
494
495 fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
498 let default_col = ColumnPath::from("col doesn't have specific config");
499 let default_col_props = extract_column_options(props, default_col);
500
501 let configured_col = ColumnPath::from(COL_NAME);
502 let configured_col_props = extract_column_options(props, configured_col);
503
504 let key_value_metadata = props
505 .key_value_metadata()
506 .map(|pairs| {
507 HashMap::from_iter(
508 pairs
509 .iter()
510 .cloned()
511 .map(|KeyValue { key, value }| (key, value)),
512 )
513 })
514 .unwrap_or_default();
515
516 let global_options_defaults = ParquetOptions::default();
517
518 let column_specific_options = if configured_col_props.eq(&default_col_props) {
519 HashMap::default()
520 } else {
521 HashMap::from([(COL_NAME.into(), configured_col_props)])
522 };
523
524 #[cfg(feature = "parquet_encryption")]
525 let fep = props
526 .file_encryption_properties()
527 .map(ConfigFileEncryptionProperties::from);
528
529 #[cfg(not(feature = "parquet_encryption"))]
530 let fep = None;
531
532 TableParquetOptions {
533 global: ParquetOptions {
534 data_pagesize_limit: props.dictionary_page_size_limit(),
536 write_batch_size: props.write_batch_size(),
537 writer_version: props.writer_version().into(),
538 dictionary_page_size_limit: props.dictionary_page_size_limit(),
539 max_row_group_size: props
540 .max_row_group_row_count()
541 .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT),
542 created_by: props.created_by().to_string(),
543 column_index_truncate_length: props.column_index_truncate_length(),
544 statistics_truncate_length: props.statistics_truncate_length(),
545 data_page_row_count_limit: props.data_page_row_count_limit(),
546
547 encoding: default_col_props.encoding,
549 compression: default_col_props.compression,
550 dictionary_enabled: default_col_props.dictionary_enabled,
551 statistics_enabled: default_col_props.statistics_enabled,
552 bloom_filter_on_write: default_col_props
553 .bloom_filter_enabled
554 .unwrap_or_default(),
555 bloom_filter_fpp: default_col_props.bloom_filter_fpp,
556 bloom_filter_ndv: default_col_props.bloom_filter_ndv,
557
558 enable_page_index: global_options_defaults.enable_page_index,
560 pruning: global_options_defaults.pruning,
561 skip_metadata: global_options_defaults.skip_metadata,
562 metadata_size_hint: global_options_defaults.metadata_size_hint,
563 pushdown_filters: global_options_defaults.pushdown_filters,
564 reorder_filters: global_options_defaults.reorder_filters,
565 force_filter_selections: global_options_defaults.force_filter_selections,
566 allow_single_file_parallelism: global_options_defaults
567 .allow_single_file_parallelism,
568 maximum_parallel_row_group_writers: global_options_defaults
569 .maximum_parallel_row_group_writers,
570 maximum_buffered_record_batches_per_stream: global_options_defaults
571 .maximum_buffered_record_batches_per_stream,
572 bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
573 max_predicate_cache_size: global_options_defaults
574 .max_predicate_cache_size,
575 schema_force_view_types: global_options_defaults.schema_force_view_types,
576 binary_as_string: global_options_defaults.binary_as_string,
577 skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
578 coerce_int96: None,
579 },
580 column_specific_options,
581 key_value_metadata,
582 crypto: ParquetEncryptionOptions {
583 file_encryption: fep,
584 file_decryption: None,
585 factory_id: None,
586 factory_options: Default::default(),
587 },
588 }
589 }
590
    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        // default: the arrow schema is required in the kv_metadata
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default false, to not skip the arrow schema requirement"
        );

        // conversion without the arrow schema must fail
        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        // conversion succeeds when the requirement is explicitly skipped
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        // conversion also succeeds when the arrow schema is provided
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }
625
    #[test]
    fn table_parquet_opts_to_writer_props() {
        // global options, all set to non-default values
        let parquet_options = parquet_options_with_non_defaults();

        // kv_metadata must contain the arrow schema key (value is arbitrary here)
        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
            crypto: Default::default(),
        };

        // round-trip: options -> writer props -> options must be lossless
        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }
654
    #[test]
    fn test_defaults_match() {
        // the global defaults of TableParquetOptions and ParquetOptions agree
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global, default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        // skip the arrow schema requirement, since plain WriterProperties carry none
        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        // the extern parquet crate's defaults
        let default_writer_props = WriterProperties::new();

        // datafusion's defaults, converted into writer properties
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // known difference: created_by identifies a different producer
        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props
                .created_by()
                .starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        // known difference: default compression (uncompressed vs zstd)
        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is None"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        // after normalizing the known differences, everything else must match
        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts, from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }
727
    #[test]
    fn test_bloom_filter_defaults() {
        // enable bloom filters without setting fpp/ndv explicitly
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // equivalent configuration built directly via the parquet crate
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props, should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }
755
    #[test]
    fn test_bloom_filter_set_fpp_only() {
        // enable bloom filters and set only the false-positive probability
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // equivalent configuration built directly via the parquet crate
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props, should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }
788
    #[test]
    fn test_bloom_filter_set_ndv_only() {
        // enable bloom filters and set only the number of distinct values
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // equivalent configuration built directly via the parquet crate
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props, should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
821}