use std::sync::Arc;

use crate::{
    config::{ParquetOptions, TableParquetOptions},
    DataFusionError, Result, _internal_datafusion_err,
};

use arrow::datatypes::Schema;
use parquet::arrow::encode_arrow_schema;
#[allow(deprecated)]
use parquet::{
    arrow::ARROW_SCHEMA_META_KEY,
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{
            EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
            DEFAULT_STATISTICS_ENABLED,
        },
    },
    schema::types::ColumnPath,
};
/// Options for writing parquet files, wrapping the parquet-rs
/// [`WriterProperties`].
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    /// The underlying parquet-rs writer properties
    pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
    pub fn new(writer_options: WriterProperties) -> Self {
        Self { writer_options }
    }
}

impl ParquetWriterOptions {
    pub fn writer_options(&self) -> &WriterProperties {
        &self.writer_options
    }
}

impl TableParquetOptions {
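    /// Add the encoded arrow schema to the parquet `key_value_metadata`,
    /// stored under [`ARROW_SCHEMA_META_KEY`]. Overwrites any existing entry.
    ///
    /// A minimal usage sketch (the empty schema stands in for a real one):
    ///
    /// ```ignore
    /// let mut opts = TableParquetOptions::default();
    /// opts.arrow_schema(&Arc::new(Schema::empty()));
    /// assert!(opts.key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY));
    /// ```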
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}

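/// Convert the session's [`TableParquetOptions`] into the per-write
/// [`ParquetWriterOptions`], building the final [`WriterProperties`].
///
/// Illustrative sketch, assuming the arrow schema was embedded first via
/// [`TableParquetOptions::arrow_schema`]:
///
/// ```ignore
/// let mut table_opts = TableParquetOptions::default();
/// table_opts.arrow_schema(&Arc::new(Schema::empty()));
/// let writer_opts = ParquetWriterOptions::try_from(&table_opts)?;
/// let props: &WriterProperties = writer_opts.writer_options();
/// ```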
impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
    type Error = DataFusionError;

    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
        // Delegate to the WriterPropertiesBuilder conversion, then build.
        Ok(ParquetWriterOptions {
            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
                .build(),
        })
    }
}

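/// Convert the session's [`TableParquetOptions`] into a single write action's
/// [`WriterPropertiesBuilder`], including any column-specific overrides.
///
/// Sketch of a per-column override; the column name `"col_a"` is illustrative,
/// and this assumes `ParquetColumnOptions` implements `Default`:
///
/// ```ignore
/// let mut table_opts = TableParquetOptions::default();
/// table_opts.global.skip_arrow_metadata = true; // or embed the schema instead
/// table_opts
///     .column_specific_options
///     .entry("col_a".into())
///     .or_default()
///     .bloom_filter_enabled = Some(true);
/// let props = WriterPropertiesBuilder::try_from(&table_opts)?.build();
/// ```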
impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
    type Error = DataFusionError;

    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
        // Table options include the global defaults, any column-specific
        // overrides, and the kv_metadata (which may hold the arrow schema).
        let TableParquetOptions {
            global,
            column_specific_options,
            key_value_metadata,
            crypto: _,
        } = table_parquet_options;

        let mut builder = global.into_writer_properties_builder()?;

        // Check that the arrow schema is present in the kv_metadata, if
        // configured to require it.
        if !global.skip_arrow_metadata
            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
        {
            return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings"));
        }

        // Add the kv_metadata, if any.
        if !key_value_metadata.is_empty() {
            builder = builder.set_key_value_metadata(Some(
                key_value_metadata
                    .to_owned()
                    .drain()
                    .map(|(key, value)| KeyValue { key, value })
                    .collect(),
            ));
        }

        // Apply any column-specific overrides on top of the global defaults.
        for (column, options) in column_specific_options {
            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
                builder = builder
                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
            }

            if let Some(encoding) = &options.encoding {
                let parsed_encoding = parse_encoding_string(encoding)?;
                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
            }

            if let Some(dictionary_enabled) = options.dictionary_enabled {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
            }

            if let Some(compression) = &options.compression {
                let parsed_compression = parse_compression_string(compression)?;
                builder =
                    builder.set_column_compression(path.clone(), parsed_compression);
            }

            if let Some(statistics_enabled) = &options.statistics_enabled {
                let parsed_value = parse_statistics_string(statistics_enabled)?;
                builder =
                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
            }

            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
                builder =
                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
            }

            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
                builder =
                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
            }
        }

        Ok(builder)
    }
}

impl ParquetOptions {
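    /// Convert the global session [`ParquetOptions`] into a
    /// [`WriterPropertiesBuilder`], which may be further customized per column.
    ///
    /// Note this does not carry over `key_value_metadata`; that is handled by
    /// the [`TableParquetOptions`] conversion above. A minimal sketch:
    ///
    /// ```ignore
    /// let builder = ParquetOptions::default().into_writer_properties_builder()?;
    /// let props = builder.build();
    /// ```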
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
        // Destructure so that adding a new field to `ParquetOptions` forces a
        // decision on whether it affects the writer properties.
        #[allow(deprecated)]
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

            // not part of WriterProperties
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _,
            schema_force_view_types: _,
            binary_as_string: _,
            coerce_int96: _,
            skip_arrow_metadata: _,
            max_predicate_cache_size: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version(parse_version_string(writer_version.as_str())?)
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                // fall back to the parquet default if the setting fails to parse
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_size(*max_row_group_size)
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        }
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        }
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        }

        // `None` leaves the parquet-rs defaults in place rather than overriding them.
        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}

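/// Coerce the user-supplied string setting into a parquet-rs
/// [`parquet::basic::Encoding`]; matching is case-insensitive. For example:
///
/// ```ignore
/// assert_eq!(parse_encoding_string("RLE")?, parquet::basic::Encoding::RLE);
/// assert!(parse_encoding_string("unknown").is_err());
/// ```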
pub(crate) fn parse_encoding_string(
    str_setting: &str,
) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
        "rle" => Ok(parquet::basic::Encoding::RLE),
        #[allow(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
        "delta_length_byte_array" => {
            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
        }
        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet encoding: \
            {str_setting}. Valid values are: plain, plain_dictionary, rle, \
            bit_packed, delta_binary_packed, delta_length_byte_array, \
            delta_byte_array, rle_dictionary, and byte_stream_split."
        ))),
    }
}

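/// Split a compression setting into the codec name and optional level, after
/// stripping any single quotes passed through from SQL literals:
/// `"zstd(22)"` -> `("zstd", Some(22))`, `"snappy"` -> `("snappy", None)`.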
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
    // Remove string literal chars passed from sqlparser, i.e. single quotes.
    let str_setting = str_setting.replace('\'', "");
    let split_setting = str_setting.split_once('(');

    match split_setting {
        Some((codec, rh)) => {
            // `rh` holds the level plus the trailing ')', e.g. "22)".
            let level = rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
                DataFusionError::Configuration(format!(
                    "Could not parse compression string. \
                    Got codec: {codec} and unknown level from {str_setting}"
                ))
            })?;
            Ok((codec.to_owned(), Some(level)))
        }
        None => Ok((str_setting.to_owned(), None)),
    }
}

/// Error if a level was specified for a codec that does not accept one.
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
    if level.is_some() {
        return Err(DataFusionError::Configuration(format!(
            "Compression {codec} does not support specifying a level"
        )));
    }
    Ok(())
}

/// Error if no level was specified for a codec that requires one.
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
    level.ok_or_else(|| {
        DataFusionError::Configuration(format!(
            "{codec} compression requires specifying a level such as {codec}(4)"
        ))
    })
}

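/// Coerce the user-supplied string setting into a parquet-rs
/// [`parquet::basic::Compression`]. Codecs that take a level (gzip, brotli,
/// zstd) require one; the other codecs reject a level. For example:
///
/// ```ignore
/// let zstd = parse_compression_string("zstd(22)")?;   // ZSTD with level 22
/// let snappy = parse_compression_string("snappy")?;   // no level permitted
/// assert!(parse_compression_string("gzip").is_err()); // gzip requires a level
/// ```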
pub fn parse_compression_string(
    str_setting: &str,
) -> Result<parquet::basic::Compression> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    let (codec, level) = split_compression_string(str_setting_lower)?;
    let codec = codec.as_str();
    match codec {
        "uncompressed" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::UNCOMPRESSED)
        }
        "snappy" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::SNAPPY)
        }
        "gzip" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
                level,
            )?))
        }
        "lzo" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZO)
        }
        "brotli" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
                level,
            )?))
        }
        "lz4" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4)
        }
        "zstd" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
                level as i32,
            )?))
        }
        "lz4_raw" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4_RAW)
        }
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet compression: \
            {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
            lzo, brotli(level), lz4, zstd(level), and lz4_raw."
        ))),
    }
}

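/// Coerce the user-supplied string setting into a parquet-rs [`WriterVersion`];
/// only `"1.0"` and `"2.0"` are accepted. For example:
///
/// ```ignore
/// assert_eq!(parse_version_string("2.0")?, WriterVersion::PARQUET_2_0);
/// ```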
pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "1.0" => Ok(WriterVersion::PARQUET_1_0),
        "2.0" => Ok(WriterVersion::PARQUET_2_0),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet writer version {str_setting}. \
            Valid values are: 1.0 and 2.0."
        ))),
    }
}

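/// Coerce the user-supplied string setting into an [`EnabledStatistics`]
/// level; matching is case-insensitive. For example:
///
/// ```ignore
/// assert_eq!(parse_statistics_string("PAGE")?, EnabledStatistics::Page);
/// ```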
pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "none" => Ok(EnabledStatistics::None),
        "chunk" => Ok(EnabledStatistics::Chunk),
        "page" => Ok(EnabledStatistics::Page),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet statistics setting {str_setting}. \
            Valid values are: none, page, and chunk."
        ))),
    }
}

#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        ConfigFileEncryptionProperties, ParquetColumnOptions, ParquetEncryptionOptions,
        ParquetOptions,
    };
    use parquet::basic::Compression;
    use parquet::file::properties::{
        BloomFilterProperties, EnabledStatistics, DEFAULT_BLOOM_FILTER_FPP,
        DEFAULT_BLOOM_FILTER_NDV,
    };
    use std::collections::HashMap;

    const COL_NAME: &str = "configured";

    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version.eq("1.0") {
            "2.0"
        } else {
            "1.0"
        };

        #[allow(deprecated)]
        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version: writer_version.into(),
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

            // remaining options are not written to WriterProperties
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
            max_predicate_cache_size: defaults.max_predicate_cache_size,
        }
    }

    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        #[allow(deprecated)]
        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
        }
    }

    /// Rebuild a [`TableParquetOptions`] from the given [`WriterProperties`],
    /// to compare round-trips against the session config.
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[cfg(feature = "parquet_encryption")]
        let fep = props
            .file_encryption_properties()
            .map(ConfigFileEncryptionProperties::from);

        #[cfg(not(feature = "parquet_encryption"))]
        let fep = None;

        #[allow(deprecated)]
        TableParquetOptions {
            global: ParquetOptions {
                // global writer options
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: format!("{}.0", props.writer_version().as_num()),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props.max_row_group_size(),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                // global options which also set the default column properties
                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

                // remaining options are not written to WriterProperties
                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                max_predicate_cache_size: global_options_defaults
                    .max_predicate_cache_size,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
            },
            column_specific_options,
            key_value_metadata,
            crypto: ParquetEncryptionOptions {
                file_encryption: fep,
                file_decryption: None,
                factory_id: None,
                factory_options: Default::default(),
            },
        }
    }

    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        // TableParquetOptions, with all props set to default
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default false, to not skip the arrow schema requirement"
        );

        // errors when using the default settings, without the arrow schema
        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        // succeeds when explicitly skipping the arrow schema
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        // succeeds when the arrow schema is provided in the kv_metadata
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    #[test]
    fn table_parquet_opts_to_writer_props() {
        // ParquetOptions, all props set to non-default
        let parquet_options = parquet_options_with_non_defaults();

        // add an entry under the arrow schema key, to satisfy the schema requirement
        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
            crypto: Default::default(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

    /// Ensure the default ParquetOptions produce the same WriterProperties as
    /// the extern parquet crate, modulo the documented differences.
    #[test]
    fn test_defaults_match() {
        // ensure the global settings are the same
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global,
            default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        // selectively skip the arrow schema requirement for this comparison
        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        // WriterProperties::default, a.k.a. the extern parquet crate's defaults
        let default_writer_props = WriterProperties::new();

        // WriterProperties built from datafusion's default TableParquetOptions
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // expected: how the defaults should differ
        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props.created_by().starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is uncompressed"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        // the remaining configuration should match
        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts,
            from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    #[test]
    fn test_bloom_filter_defaults() {
        // the TableParquetOptions::default, with only the bloom filter turned on
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only the bloom filter turned on
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }

    #[test]
    fn test_bloom_filter_set_fpp_only() {
        // the TableParquetOptions::default, with only the fpp set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only the fpp set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    #[test]
    fn test_bloom_filter_set_ndv_only() {
        // the TableParquetOptions::default, with only the ndv set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only the ndv set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
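
    // Additional spot-checks for the string parsers above; the expected
    // values follow directly from the match arms in this file.
    #[test]
    fn test_parse_option_strings() {
        // codecs with a required level keep the parsed level
        assert_eq!(
            parse_compression_string("zstd(22)").unwrap(),
            Compression::ZSTD(ZstdLevel::try_new(22).unwrap())
        );
        // single quotes from SQL literals are stripped before parsing
        assert_eq!(
            parse_compression_string("'gzip(6)'").unwrap(),
            Compression::GZIP(GzipLevel::try_new(6).unwrap())
        );
        // a level is rejected for codecs that do not take one...
        assert!(parse_compression_string("snappy(3)").is_err());
        // ...and required for codecs that do
        assert!(parse_compression_string("gzip").is_err());

        // only versions 1.0 and 2.0 are accepted
        assert_eq!(
            parse_version_string("2.0").unwrap(),
            WriterVersion::PARQUET_2_0
        );
        // statistics and encoding settings are matched case-insensitively
        assert_eq!(
            parse_statistics_string("PAGE").unwrap(),
            EnabledStatistics::Page
        );
        assert_eq!(
            parse_encoding_string("rle").unwrap(),
            parquet::basic::Encoding::RLE
        );
    }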
}