use base64::Engine;
use std::sync::Arc;

use crate::{
    config::{ParquetOptions, TableParquetOptions},
    DataFusionError, Result, _internal_datafusion_err,
};

use arrow::datatypes::Schema;
use crate::encryption::add_crypto_to_writer_properties;
#[allow(deprecated)]
use parquet::{
    arrow::ARROW_SCHEMA_META_KEY,
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{
            EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
            DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
        },
    },
    schema::types::ColumnPath,
};

/// Options for writing parquet files
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    /// The underlying parquet-rs writer properties
    pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
    pub fn new(writer_options: WriterProperties) -> Self {
        Self { writer_options }
    }

    pub fn writer_options(&self) -> &WriterProperties {
        &self.writer_options
    }
}

impl TableParquetOptions {
    /// Add the arrow schema to the parquet kv_metadata.
    /// If it already exists, then overwrites.
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}

impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
    type Error = DataFusionError;

    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
        Ok(ParquetWriterOptions {
            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
                .build(),
        })
    }
}
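
// A minimal usage sketch of the conversion path above: embed the arrow schema
// into the kv_metadata, then derive the write-ready properties. Kept as a
// self-contained test module; it only exercises APIs defined in this file.
#[cfg(test)]
mod parquet_writer_options_sketch {
    use super::*;

    #[test]
    fn try_from_table_options() {
        let mut opts = TableParquetOptions::default();
        // required unless `skip_arrow_metadata` is set
        opts.arrow_schema(&Arc::new(Schema::empty()));
        let writer_opts = ParquetWriterOptions::try_from(&opts).unwrap();
        // the arrow schema travels along in the kv_metadata
        assert!(writer_opts
            .writer_options()
            .key_value_metadata()
            .unwrap()
            .iter()
            .any(|kv| kv.key == ARROW_SCHEMA_META_KEY));
    }
}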

impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
    type Error = DataFusionError;

    /// Convert the session's [`TableParquetOptions`] into a single write action's
    /// [`WriterPropertiesBuilder`], applying any column-specific overrides.
    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
        // Table options include kv_metadata and col-specific options
        let TableParquetOptions {
            global,
            column_specific_options,
            key_value_metadata,
            crypto,
        } = table_parquet_options;

        let mut builder = global.into_writer_properties_builder()?;

        builder = add_crypto_to_writer_properties(crypto, builder);

        // check that the arrow schema is present in the kv_metadata, if configured
        // to do so
        if !global.skip_arrow_metadata
            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
        {
            return Err(_internal_datafusion_err!(
                "arrow schema was not added to the kv_metadata, even though it is required by configuration settings"
            ));
        }

        // add kv_metadata, if any
        if !key_value_metadata.is_empty() {
            builder = builder.set_key_value_metadata(Some(
                key_value_metadata
                    .to_owned()
                    .drain()
                    .map(|(key, value)| KeyValue { key, value })
                    .collect(),
            ));
        }

        // Apply any column-specific options:
        for (column, options) in column_specific_options {
            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
                builder = builder
                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
            }

            if let Some(encoding) = &options.encoding {
                let parsed_encoding = parse_encoding_string(encoding)?;
                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
            }

            if let Some(dictionary_enabled) = options.dictionary_enabled {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
            }

            if let Some(compression) = &options.compression {
                let parsed_compression = parse_compression_string(compression)?;
                builder =
                    builder.set_column_compression(path.clone(), parsed_compression);
            }

            if let Some(statistics_enabled) = &options.statistics_enabled {
                let parsed_value = parse_statistics_string(statistics_enabled)?;
                builder =
                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
            }

            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
                builder =
                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
            }

            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
                builder =
                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
            }

            // the outer allow covers reading the deprecated option field,
            // the inner one covers the deprecated setter
            #[allow(deprecated)]
            if let Some(max_statistics_size) = options.max_statistics_size {
                builder = {
                    #[allow(deprecated)]
                    builder.set_column_max_statistics_size(path, max_statistics_size)
                }
            }
        }

        Ok(builder)
    }
}
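
// A minimal sketch of the per-column override path in the conversion above: a
// compression set in `column_specific_options` lands on that column in the
// built properties. Assumes `ParquetColumnOptions` implements `Default` (as
// the config macro provides).
#[cfg(test)]
mod column_override_sketch {
    use super::*;
    use crate::config::ParquetColumnOptions;

    #[test]
    fn column_compression_override() {
        let mut opts = TableParquetOptions::default().with_skip_arrow_metadata(true);
        opts.column_specific_options.insert(
            "a.b".into(),
            ParquetColumnOptions {
                compression: Some("snappy".into()),
                ..Default::default()
            },
        );
        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
        // the dotted key becomes a nested column path
        let col = ColumnPath::new(vec!["a".into(), "b".into()]);
        assert_eq!(props.compression(&col), parquet::basic::Compression::SNAPPY);
    }
}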

/// Encodes the Arrow schema into the IPC format, and base64 encodes it,
/// to be placed in the parquet kv_metadata.
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
    let options = arrow_ipc::writer::IpcWriteOptions::default();
    let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
    let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
    let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
        schema,
        &mut dictionary_tracker,
        &options,
    );

    // Manually prepend the Arrow IPC continuation marker (0xFFFFFFFF) and the
    // little-endian message length, producing an encapsulated IPC message.
    let schema_len = serialized_schema.ipc_message.len();
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);

    base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
}
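
// A sanity sketch of the framing produced above:
// base64( [0xFF,0xFF,0xFF,0xFF][u32 LE length][IPC schema message] ).
#[cfg(test)]
mod encode_arrow_schema_sketch {
    use super::*;

    #[test]
    fn length_prefixed_ipc_framing() {
        let encoded = encode_arrow_schema(&Arc::new(Schema::empty()));
        let bytes = base64::prelude::BASE64_STANDARD.decode(encoded).unwrap();
        // continuation marker
        assert_eq!(bytes[..4], [255u8, 255, 255, 255]);
        // little-endian length of the trailing IPC message
        let len = u32::from_le_bytes(bytes[4..8].try_into().unwrap()) as usize;
        assert_eq!(len, bytes.len() - 8);
    }
}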

impl ParquetOptions {
    /// Convert the global session options, [`ParquetOptions`], into a single
    /// write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned builder includes only the global options; column-specific
    /// overrides and kv_metadata are applied separately.
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
        #[allow(deprecated)]
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_statistics_size,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

            // reader-only options, not in WriterProperties
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _,
            schema_force_view_types: _,
            binary_as_string: _,
            coerce_int96: _,
            skip_arrow_metadata: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version(parse_version_string(writer_version.as_str())?)
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_size(*max_row_group_size)
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        builder = {
            #[allow(deprecated)]
            builder.set_max_statistics_size(
                max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
            )
        };

        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        };
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        };
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        };

        // these options, if set, overwrite the parquet-rs defaults
        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}
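
// A minimal sketch showing the global options flowing into parquet-rs
// `WriterProperties` via the builder above.
#[cfg(test)]
mod parquet_options_builder_sketch {
    use super::*;

    #[test]
    fn global_options_reach_writer_properties() {
        let mut opts = ParquetOptions::default();
        opts.max_row_group_size = 4242;
        opts.created_by = "sketch".into();
        let props = opts.into_writer_properties_builder().unwrap().build();
        assert_eq!(props.max_row_group_size(), 4242);
        assert_eq!(props.created_by(), "sketch");
    }
}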

/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding
pub(crate) fn parse_encoding_string(
    str_setting: &str,
) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
        "rle" => Ok(parquet::basic::Encoding::RLE),
        #[allow(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
        "delta_length_byte_array" => {
            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
        }
        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet encoding: \
            {str_setting}. Valid values are: plain, plain_dictionary, rle, \
            bit_packed, delta_binary_packed, delta_length_byte_array, \
            delta_byte_array, rle_dictionary, and byte_stream_split."
        ))),
    }
}
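
// Usage sketch: encodings parse case-insensitively, unknown names error.
#[cfg(test)]
mod parse_encoding_sketch {
    use super::*;

    #[test]
    fn parses_known_encodings() {
        assert_eq!(
            parse_encoding_string("DELTA_BINARY_PACKED").unwrap(),
            parquet::basic::Encoding::DELTA_BINARY_PACKED
        );
        assert!(parse_encoding_string("not_an_encoding").is_err());
    }
}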

/// Splits a compression string into the codec and an optional level,
/// i.e. "gzip(2)" -> ("gzip", Some(2)).
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
    // ignore string literal quotes passed through from sqlparser
    let str_setting = str_setting.replace('\'', "");
    let split_setting = str_setting.split_once('(');

    match split_setting {
        Some((codec, rh)) => {
            let level = rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
                DataFusionError::Configuration(format!(
                    "Could not parse compression string. \
                    Got codec: {codec} and unknown level from {str_setting}"
                ))
            })?;
            Ok((codec.to_owned(), Some(level)))
        }
        None => Ok((str_setting.to_owned(), None)),
    }
}
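
// Usage sketch: codec/level splitting, including the error path.
#[cfg(test)]
mod split_compression_sketch {
    use super::*;

    #[test]
    fn splits_codec_and_level() {
        assert_eq!(
            split_compression_string("zstd(4)").unwrap(),
            ("zstd".to_owned(), Some(4))
        );
        assert_eq!(
            split_compression_string("snappy").unwrap(),
            ("snappy".to_owned(), None)
        );
        assert!(split_compression_string("gzip(high)").is_err());
    }
}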

/// Helper to ensure that codecs which do not support a compression level
/// are not given one.
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
    if level.is_some() {
        return Err(DataFusionError::Configuration(format!(
            "Compression {codec} does not support specifying a level"
        )));
    }
    Ok(())
}

/// Helper to ensure that codecs which require a compression level are given one.
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
    level.ok_or_else(|| {
        DataFusionError::Configuration(format!(
            "{codec} compression requires specifying a level such as {codec}(4)"
        ))
    })
}

/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression
pub fn parse_compression_string(
    str_setting: &str,
) -> Result<parquet::basic::Compression> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    let (codec, level) = split_compression_string(str_setting_lower)?;
    let codec = codec.as_str();
    match codec {
        "uncompressed" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::UNCOMPRESSED)
        }
        "snappy" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::SNAPPY)
        }
        "gzip" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
                level,
            )?))
        }
        "lzo" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZO)
        }
        "brotli" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
                level,
            )?))
        }
        "lz4" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4)
        }
        "zstd" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
                level as i32,
            )?))
        }
        "lz4_raw" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4_RAW)
        }
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet compression: \
            {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
            lzo, brotli(level), lz4, zstd(level), and lz4_raw."
        ))),
    }
}
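
// Usage sketch: leveled codecs require a level, unleveled codecs reject one.
#[cfg(test)]
mod parse_compression_sketch {
    use super::*;

    #[test]
    fn levels_required_or_rejected_per_codec() {
        assert_eq!(
            parse_compression_string("zstd(4)").unwrap(),
            parquet::basic::Compression::ZSTD(ZstdLevel::try_new(4).unwrap())
        );
        assert!(parse_compression_string("gzip").is_err());
        assert!(parse_compression_string("snappy(1)").is_err());
    }
}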

/// Parses datafusion.execution.parquet.writer_version String to a parquet WriterVersion
pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "1.0" => Ok(WriterVersion::PARQUET_1_0),
        "2.0" => Ok(WriterVersion::PARQUET_2_0),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet writer version {str_setting}; \
            valid options are 1.0 and 2.0"
        ))),
    }
}
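
// Usage sketch: only the two parquet format versions are accepted.
#[cfg(test)]
mod parse_version_sketch {
    use super::*;

    #[test]
    fn parses_writer_versions() {
        assert_eq!(parse_version_string("1.0").unwrap(), WriterVersion::PARQUET_1_0);
        assert_eq!(parse_version_string("2.0").unwrap(), WriterVersion::PARQUET_2_0);
        assert!(parse_version_string("3.0").is_err());
    }
}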

/// Parses datafusion.execution.parquet.statistics_enabled String to a parquet EnabledStatistics
pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "none" => Ok(EnabledStatistics::None),
        "chunk" => Ok(EnabledStatistics::Chunk),
        "page" => Ok(EnabledStatistics::Page),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet statistics setting {str_setting}; \
            valid options are none, page, and chunk"
        ))),
    }
}
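
// Usage sketch: statistics levels parse case-insensitively.
#[cfg(test)]
mod parse_statistics_sketch {
    use super::*;

    #[test]
    fn parses_statistics_levels() {
        assert_eq!(parse_statistics_string("page").unwrap(), EnabledStatistics::Page);
        assert_eq!(parse_statistics_string("NONE").unwrap(), EnabledStatistics::None);
        assert!(parse_statistics_string("row_group").is_err());
    }
}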

#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use parquet::{
        basic::Compression,
        file::properties::{
            BloomFilterProperties, EnabledStatistics, DEFAULT_BLOOM_FILTER_FPP,
            DEFAULT_BLOOM_FILTER_NDV,
        },
    };
    use std::collections::HashMap;

    use super::*;
    use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
    #[cfg(feature = "parquet_encryption")]
    use crate::encryption::map_encryption_to_config_encryption;

    const COL_NAME: &str = "configured";

    /// Take the column defaults provided in [`ParquetOptions`], and generate a
    /// non-default col config.
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        #[allow(deprecated)] // max_statistics_size
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            max_statistics_size: Some(72),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version.eq("1.0") {
            "2.0"
        } else {
            "1.0"
        };

        #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version: writer_version.into(),
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_statistics_size: Some(42),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

            // reader-only options, not in WriterProperties
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
        }
    }

    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        #[allow(deprecated)] // max_statistics_size
        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
            max_statistics_size: Some(props.max_statistics_size(&col)),
        }
    }

    /// Creates a [`TableParquetOptions`] from the given [`WriterProperties`],
    /// using the session defaults for reader-only options.
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[cfg(feature = "parquet_encryption")]
        let fep = map_encryption_to_config_encryption(props.file_encryption_properties());
        #[cfg(not(feature = "parquet_encryption"))]
        let fep = None;

        #[allow(deprecated)] // max_statistics_size
        TableParquetOptions {
            global: ParquetOptions {
                // global options
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: format!("{}.0", props.writer_version().as_num()),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props.max_row_group_size(),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                // global options which set the default column props
                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                max_statistics_size: default_col_props.max_statistics_size,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

                // reader-only options, not in WriterProperties
                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
            },
            column_specific_options,
            key_value_metadata,
            crypto: ParquetEncryptionOptions {
                file_encryption: fep,
                file_decryption: None,
            },
        }
    }

    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        // TableParquetOptions, all props set to default
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default false, to not skip the arrow schema requirement"
        );

        // errors without the schema added, using default settings
        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        // succeeds if we permit skipping the arrow schema
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        // set the arrow schema back to required, and add it to the kv_metadata
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    #[test]
    fn table_parquet_opts_to_writer_props() {
        let parquet_options = parquet_options_with_non_defaults();

        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
            crypto: Default::default(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

    /// Ensure the datafusion [`TableParquetOptions`] defaults and the parquet-rs
    /// [`WriterProperties`] defaults agree, modulo the intentional differences
    /// (created_by and compression).
    #[test]
    fn test_defaults_match() {
        // ensure the global defaults are the same
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global,
            default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        // skip the arrow schema requirement, since this test covers the writer props
        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        // WriterProperties::default, a.k.a. the extern parquet crate's defaults
        let default_writer_props = WriterProperties::new();

        // WriterProperties built from TableParquetOptions::default, a.k.a. datafusion's defaults
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // expected: the created_by sources differ
        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props.created_by().starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        // expected: the compression defaults differ
        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is uncompressed",
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd",
        );

        // expected: all other defaults match, once the known differences are aligned
        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts,
            from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    #[test]
    fn test_bloom_filter_defaults() {
        // TableParquetOptions::default, with only the bloom filter turned on
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // WriterProperties::default, with only the bloom filter turned on
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props",
        );
    }

    #[test]
    fn test_bloom_filter_set_fpp_only() {
        // TableParquetOptions::default, with only fpp set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // WriterProperties::default, with only fpp set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    #[test]
    fn test_bloom_filter_set_ndv_only() {
        // TableParquetOptions::default, with only ndv set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // WriterProperties::default, with only ndv set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
}