use base64::Engine;
use std::sync::Arc;

use crate::{
    config::{ParquetOptions, TableParquetOptions},
    DataFusionError, Result, _internal_datafusion_err,
};

use arrow::datatypes::Schema;
#[allow(deprecated)]
use parquet::{
    arrow::ARROW_SCHEMA_META_KEY,
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{
            EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
            DEFAULT_STATISTICS_ENABLED,
        },
    },
    schema::types::ColumnPath,
};

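/// Options for writing parquet files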
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
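    /// parquet-rs writer properties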
    pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
    pub fn new(writer_options: WriterProperties) -> Self {
        Self { writer_options }
    }
}

impl ParquetWriterOptions {
    pub fn writer_options(&self) -> &WriterProperties {
        &self.writer_options
    }
}

impl TableParquetOptions {
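    /// Add the arrow schema, serialized into the IPC format and base64 encoded,
    /// to the parquet (key, value) metadata.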
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}

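/// Convert the session's [`TableParquetOptions`] into the write-path
/// [`ParquetWriterOptions`], building the underlying [`WriterProperties`].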
impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
    type Error = DataFusionError;

    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
        Ok(ParquetWriterOptions {
            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
                .build(),
        })
    }
}

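/// Convert the session's [`TableParquetOptions`] into a [`WriterPropertiesBuilder`],
/// including the key-value metadata and any column-specific options.
///
/// Errors if the arrow schema is required in the kv_metadata (the default) but has
/// not been added; see [`TableParquetOptions::arrow_schema`].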
impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
    type Error = DataFusionError;

    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
        let TableParquetOptions {
            global,
            column_specific_options,
            key_value_metadata,
            crypto: _,
        } = table_parquet_options;

        let mut builder = global.into_writer_properties_builder()?;

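        // ensure the arrow schema was added to the kv_metadata, if required by configuration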
        if !global.skip_arrow_metadata
            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
        {
            return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings"));
        }

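        // add any key-value metadata provided on the table options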
        if !key_value_metadata.is_empty() {
            builder = builder.set_key_value_metadata(Some(
                key_value_metadata
                    .to_owned()
                    .drain()
                    .map(|(key, value)| KeyValue { key, value })
                    .collect(),
            ));
        }

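        // apply any column-specific options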
        for (column, options) in column_specific_options {
            let path =
                ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
                builder = builder
                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
            }

            if let Some(encoding) = &options.encoding {
                let parsed_encoding = parse_encoding_string(encoding)?;
                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
            }

            if let Some(dictionary_enabled) = options.dictionary_enabled {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
            }

            if let Some(compression) = &options.compression {
                let parsed_compression = parse_compression_string(compression)?;
                builder =
                    builder.set_column_compression(path.clone(), parsed_compression);
            }

            if let Some(statistics_enabled) = &options.statistics_enabled {
                let parsed_value = parse_statistics_string(statistics_enabled)?;
                builder =
                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
            }

            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
                builder =
                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
            }

            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
                builder =
                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
            }
        }

        Ok(builder)
    }
}

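/// Encode the Arrow schema into the IPC format, and base64 encode it.
///
/// The schema is framed the same way arrow-rs frames it when writing parquet
/// metadata, so readers can reconstruct the original Arrow types.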
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
    let options = arrow_ipc::writer::IpcWriteOptions::default();
    let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
    let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
    let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
        schema,
        &mut dictionary_tracker,
        &options,
    );

    let schema_len = serialized_schema.ipc_message.len();
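    // Length-prefixed IPC framing: a 4-byte continuation marker (0xFFFFFFFF),
    // the message length as a little-endian u32, then the message body itself.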
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    len_prefix_schema.extend_from_slice(&[255u8, 255, 255, 255]);
    len_prefix_schema.extend_from_slice(&(schema_len as u32).to_le_bytes());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);

    base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
}

impl ParquetOptions {
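    /// Convert the global session options, [`ParquetOptions`], into a single
    /// write action's [`WriterPropertiesBuilder`].
    ///
    /// Note: this does not include the key-value metadata or the column-specific
    /// options; see the `TryFrom<&TableParquetOptions>` impl for those.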
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
        #[allow(deprecated)]
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

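            // these options are only used on the read path, or at the session
            // level, and do not map onto parquet's WriterProperties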
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _,
            schema_force_view_types: _,
            binary_as_string: _,
            coerce_int96: _,
            skip_arrow_metadata: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version(parse_version_string(writer_version.as_str())?)
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_size(*max_row_group_size)
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        }
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        }
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        }

        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}

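/// Parse a case-insensitive string into a parquet [`Encoding`](parquet::basic::Encoding).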
pub(crate) fn parse_encoding_string(
    str_setting: &str,
) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
        "rle" => Ok(parquet::basic::Encoding::RLE),
        #[allow(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
        "delta_length_byte_array" => {
            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
        }
        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet encoding: \
        {str_setting}. Valid values are: plain, plain_dictionary, rle, \
        bit_packed, delta_binary_packed, delta_length_byte_array, \
        delta_byte_array, rle_dictionary, and byte_stream_split."
        ))),
    }
}

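/// Split a compression setting such as `zstd(4)` into the codec name and an
/// optional level, stripping any single quotes around the setting.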
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
    let str_setting = str_setting.replace('\'', "");
    let split_setting = str_setting.split_once('(');

    match split_setting {
        Some((codec, rh)) => {
            let level = rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
                DataFusionError::Configuration(format!(
                    "Could not parse compression string. \
                Got codec: {codec} and unknown level from {str_setting}"
                ))
            })?;
            Ok((codec.to_owned(), Some(level)))
        }
        None => Ok((str_setting.to_owned(), None)),
    }
}

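/// Return an error if a level was specified for a codec that does not accept one.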
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
    if level.is_some() {
        return Err(DataFusionError::Configuration(format!(
            "Compression {codec} does not support specifying a level"
        )));
    }
    Ok(())
}

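/// Return the compression level, or an error if the codec requires a level and none was given.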
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
    level.ok_or(DataFusionError::Configuration(format!(
        "{codec} compression requires specifying a level such as {codec}(4)"
    )))
}

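/// Parse a user-supplied compression setting into a parquet
/// [`Compression`](parquet::basic::Compression).
///
/// Codec names are case-insensitive; codecs that take a level expect it in
/// parentheses. An illustrative sketch of the accepted forms:
///
/// ```ignore
/// // codecs with a required level
/// let zstd = parse_compression_string("zstd(4)")?;
/// // codecs without a level
/// let snappy = parse_compression_string("snappy")?;
/// ```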
pub fn parse_compression_string(
    str_setting: &str,
) -> Result<parquet::basic::Compression> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    let (codec, level) = split_compression_string(str_setting_lower)?;
    let codec = codec.as_str();
    match codec {
        "uncompressed" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::UNCOMPRESSED)
        }
        "snappy" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::SNAPPY)
        }
        "gzip" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
                level,
            )?))
        }
        "lzo" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZO)
        }
        "brotli" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
                level,
            )?))
        }
        "lz4" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4)
        }
        "zstd" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
                level as i32,
            )?))
        }
        "lz4_raw" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4_RAW)
        }
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet compression: \
        {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
        lzo, brotli(level), lz4, zstd(level), and lz4_raw."
        ))),
    }
}

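/// Parse a writer version setting into a [`WriterVersion`]; only `1.0` and `2.0` are valid.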
pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "1.0" => Ok(WriterVersion::PARQUET_1_0),
        "2.0" => Ok(WriterVersion::PARQUET_2_0),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet writer version {str_setting} \
            valid options are 1.0 and 2.0"
        ))),
    }
}

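/// Parse a statistics setting into an [`EnabledStatistics`] level (`none`, `chunk`, or `page`).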
pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "none" => Ok(EnabledStatistics::None),
        "chunk" => Ok(EnabledStatistics::Chunk),
        "page" => Ok(EnabledStatistics::Page),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet statistics setting {str_setting} \
            valid options are none, page, and chunk"
        ))),
    }
}

#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use parquet::{
        basic::Compression,
        file::properties::{
            BloomFilterProperties, EnabledStatistics, DEFAULT_BLOOM_FILTER_FPP,
            DEFAULT_BLOOM_FILTER_NDV,
        },
    };
    use std::collections::HashMap;

    use super::*;
    use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
    #[cfg(feature = "parquet_encryption")]
    use crate::encryption::map_encryption_to_config_encryption;

    const COL_NAME: &str = "configured";

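    // build a column-specific config in which every field differs from the given defaults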
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version.eq("1.0") {
            "2.0"
        } else {
            "1.0"
        };

        #[allow(deprecated)]
        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version: writer_version.into(),
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

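            // the remaining fields are not written to parquet; keep the defaults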
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
        }
    }

    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        #[allow(deprecated)]
        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
        }
    }

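    /// Rebuild a [`TableParquetOptions`] from the given [`WriterProperties`],
    /// so tests can round-trip session config -> writer props -> session config.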
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[cfg(feature = "parquet_encryption")]
        let fep = map_encryption_to_config_encryption(props.file_encryption_properties());
        #[cfg(not(feature = "parquet_encryption"))]
        let fep = None;

        #[allow(deprecated)]
        TableParquetOptions {
            global: ParquetOptions {
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: format!("{}.0", props.writer_version().as_num()),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props.max_row_group_size(),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
            },
            column_specific_options,
            key_value_metadata,
            crypto: ParquetEncryptionOptions {
                file_encryption: fep,
                file_decryption: None,
                factory_id: None,
                factory_options: Default::default(),
            },
        }
    }

    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default false, to not skip the arrow schema requirement"
        );

        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

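        // the schema requirement can be explicitly skipped via config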
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

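        // or satisfied by adding the arrow schema to the kv metadata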
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    #[test]
    fn table_parquet_opts_to_writer_props() {
        let parquet_options = parquet_options_with_non_defaults();

        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
            crypto: Default::default(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

    #[test]
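    // Ensure that the TableParquetOptions defaults and the extern parquet crate's
    // writer defaults stay in sync, documenting where they intentionally differ
    // (created_by and default compression).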
    fn test_defaults_match() {
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global,
            default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        let default_writer_props = WriterProperties::new();

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props
                .created_by()
                .starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is uncompressed"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts,
            from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    #[test]
    fn test_bloom_filter_defaults() {
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }

    #[test]
    fn test_bloom_filter_set_fpp_only() {
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    #[test]
    fn test_bloom_filter_set_ndv_only() {
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));

        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
}