datafusion_common/file_options/parquet_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Options related to how parquet files should be written

use std::sync::Arc;

use crate::{
    _internal_datafusion_err, DataFusionError, Result,
    config::{ParquetOptions, TableParquetOptions},
};

use arrow::datatypes::Schema;
use parquet::arrow::encode_arrow_schema;
use parquet::{
    arrow::ARROW_SCHEMA_META_KEY,
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{
            DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties,
            WriterPropertiesBuilder,
        },
    },
    schema::types::ColumnPath,
};

/// Options for writing parquet files
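///
/// A thin wrapper over the `parquet` crate's [`WriterProperties`]. A minimal
/// construction sketch (marked ignore; assumes `datafusion_common` as a
/// dependency and its public module paths):
/// ```ignore
/// use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
/// use parquet::file::properties::WriterProperties;
///
/// let opts = ParquetWriterOptions::new(WriterProperties::builder().build());
/// let props: &WriterProperties = opts.writer_options();
/// ```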
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    /// parquet-rs writer properties
    pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
    pub fn new(writer_options: WriterProperties) -> Self {
        Self { writer_options }
    }

    pub fn writer_options(&self) -> &WriterProperties {
        &self.writer_options
    }
}

impl TableParquetOptions {
    /// Add the arrow schema to the parquet kv_metadata.
    /// If the schema entry already exists, it is overwritten.
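    ///
    /// A minimal usage sketch (marked ignore; assumes the `datafusion_common`
    /// crate paths):
    /// ```ignore
    /// use std::sync::Arc;
    /// use arrow::datatypes::Schema;
    /// use datafusion_common::config::TableParquetOptions;
    ///
    /// let mut opts = TableParquetOptions::default();
    /// opts.arrow_schema(&Arc::new(Schema::empty()));
    /// assert!(opts
    ///     .key_value_metadata
    ///     .contains_key(parquet::arrow::ARROW_SCHEMA_META_KEY));
    /// ```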
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}

impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
    type Error = DataFusionError;

    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
        // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
        Ok(ParquetWriterOptions {
            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
                .build(),
        })
    }
}

impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
    type Error = DataFusionError;

    /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
    /// Note that any encryption options are ignored, as building the `FileEncryptionProperties`
    /// may require inputs beyond the [`TableParquetOptions`].
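    ///
    /// A usage sketch (marked ignore; assumes an options value that either
    /// carries the arrow schema in its kv_metadata or opts out of it):
    /// ```ignore
    /// let mut opts = TableParquetOptions::default();
    /// opts.global.skip_arrow_metadata = true; // or call opts.arrow_schema(&schema)
    /// let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
    /// ```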
    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
        // Table options include kv_metadata and col-specific options
        let TableParquetOptions {
            global,
            column_specific_options,
            key_value_metadata,
            crypto: _,
        } = table_parquet_options;

        let mut builder = global.into_writer_properties_builder()?;

        // check that the arrow schema is present in the kv_metadata, if configured to do so
        if !global.skip_arrow_metadata
            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
        {
            return Err(_internal_datafusion_err!(
                "arrow schema was not added to the kv_metadata, even though it is required by configuration settings"
            ));
        }

        // add kv_meta, if any
        if !key_value_metadata.is_empty() {
            builder = builder.set_key_value_metadata(Some(
                key_value_metadata
                    .to_owned()
                    .drain()
                    .map(|(key, value)| KeyValue { key, value })
                    .collect(),
            ));
        }

        // Apply column-specific options:
        for (column, options) in column_specific_options {
            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
                builder = builder
                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
            }

            if let Some(encoding) = &options.encoding {
                let parsed_encoding = parse_encoding_string(encoding)?;
                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
            }

            if let Some(dictionary_enabled) = options.dictionary_enabled {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
            }

            if let Some(compression) = &options.compression {
                let parsed_compression = parse_compression_string(compression)?;
                builder =
                    builder.set_column_compression(path.clone(), parsed_compression);
            }

            if let Some(statistics_enabled) = &options.statistics_enabled {
                let parsed_value = parse_statistics_string(statistics_enabled)?;
                builder =
                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
            }

            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
                builder =
                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
            }

            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
                builder =
                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
            }
        }

        Ok(builder)
    }
}

impl ParquetOptions {
    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
    /// applied per column; a customization which is not applicable for [`ParquetOptions`].
    ///
    /// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
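    ///
    /// A minimal sketch (marked ignore; assumes the `datafusion_common` crate paths):
    /// ```ignore
    /// let global = ParquetOptions::default();
    /// let props = global.into_writer_properties_builder().unwrap().build();
    /// assert_eq!(props.max_row_group_size(), global.max_row_group_size);
    /// ```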
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

            // not in WriterProperties
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            force_filter_selections: _, // not used for writer props
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _, // reads not used for writer props
            schema_force_view_types: _,
            binary_as_string: _, // not used for writer props
            coerce_int96: _,     // not used for writer props
            skip_arrow_metadata: _,
            max_predicate_cache_size: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version((*writer_version).into())
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_size(*max_row_group_size)
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        }
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        }
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        }

        // We do not have access to default ColumnProperties set in Arrow.
        // Therefore, only overwrite if these settings exist.
        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}

/// Parses the `datafusion.execution.parquet.encoding` string setting into a
/// [`parquet::basic::Encoding`].
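///
/// A sketch of accepted values (marked ignore: crate-private helper):
/// ```ignore
/// use parquet::basic::Encoding;
/// assert_eq!(parse_encoding_string("plain").unwrap(), Encoding::PLAIN);
/// assert_eq!(parse_encoding_string("RLE").unwrap(), Encoding::RLE); // case-insensitive
/// assert!(parse_encoding_string("no_such_encoding").is_err());
/// ```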
pub(crate) fn parse_encoding_string(
    str_setting: &str,
) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
        "rle" => Ok(parquet::basic::Encoding::RLE),
        #[expect(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
        "delta_length_byte_array" => {
            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
        }
        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet encoding: \
        {str_setting}. Valid values are: plain, plain_dictionary, rle, \
        bit_packed, delta_binary_packed, delta_length_byte_array, \
        delta_byte_array, rle_dictionary, and byte_stream_split."
        ))),
    }
}

/// Splits a compression string into the compression codec and an optional
/// compression level, e.g. "gzip(2)" -> ("gzip", Some(2))
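///
/// A sketch of both arms (marked ignore: private helper):
/// ```ignore
/// assert_eq!(
///     split_compression_string("zstd(5)").unwrap(),
///     ("zstd".to_string(), Some(5))
/// );
/// assert_eq!(
///     split_compression_string("snappy").unwrap(),
///     ("snappy".to_string(), None)
/// );
/// ```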
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
    // ignore string literal chars passed from sqlparser i.e. remove single quotes
    let str_setting = str_setting.replace('\'', "");
    let split_setting = str_setting.split_once('(');

    match split_setting {
        Some((codec, rest)) => {
            // `rest` is everything after the '('; strip the trailing ')' and
            // parse what remains as the numeric level (avoids a slice panic on
            // malformed input such as "gzip(")
            let level = rest
                .strip_suffix(')')
                .and_then(|lvl| lvl.parse::<u32>().ok())
                .ok_or_else(|| {
                    DataFusionError::Configuration(format!(
                        "Could not parse compression string. \
                        Got codec: {codec} and unknown level from {str_setting}"
                    ))
                })?;
            Ok((codec.to_owned(), Some(level)))
        }
        None => Ok((str_setting.to_owned(), None)),
    }
}

/// Helper to ensure compression codecs which don't support levels
/// don't have one set. E.g. snappy(2) is invalid.
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
    if level.is_some() {
        return Err(DataFusionError::Configuration(format!(
            "Compression {codec} does not support specifying a level"
        )));
    }
    Ok(())
}

/// Helper to ensure compression codecs which require a level
/// do have one set. E.g. zstd is invalid, zstd(3) is valid
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
    level.ok_or_else(|| {
        DataFusionError::Configuration(format!(
            "{codec} compression requires specifying a level such as {codec}(4)"
        ))
    })
}

/// Parses the `datafusion.execution.parquet.compression` string setting into a
/// [`parquet::basic::Compression`].
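///
/// A sketch of level-less vs. leveled codecs (marked ignore; assumes the public
/// `datafusion_common::file_options::parquet_writer` path):
/// ```ignore
/// use parquet::basic::Compression;
/// assert_eq!(parse_compression_string("snappy").unwrap(), Compression::SNAPPY);
/// assert!(matches!(
///     parse_compression_string("zstd(3)").unwrap(),
///     Compression::ZSTD(_)
/// ));
/// assert!(parse_compression_string("zstd").is_err()); // level is required
/// ```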
pub fn parse_compression_string(
    str_setting: &str,
) -> Result<parquet::basic::Compression> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    let (codec, level) = split_compression_string(str_setting_lower)?;
    let codec = codec.as_str();
    match codec {
        "uncompressed" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::UNCOMPRESSED)
        }
        "snappy" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::SNAPPY)
        }
        "gzip" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
                level,
            )?))
        }
        "lzo" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZO)
        }
        "brotli" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
                level,
            )?))
        }
        "lz4" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4)
        }
        "zstd" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
                level as i32,
            )?))
        }
        "lz4_raw" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4_RAW)
        }
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet compression: \
        {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
        lzo, brotli(level), lz4, zstd(level), and lz4_raw."
        ))),
    }
}
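
/// Parses the `datafusion.execution.parquet.statistics_enabled` string setting
/// into an [`EnabledStatistics`] level.
///
/// A sketch of the accepted values (marked ignore: crate-private helper):
/// ```ignore
/// assert_eq!(parse_statistics_string("page").unwrap(), EnabledStatistics::Page);
/// assert!(parse_statistics_string("full").is_err());
/// ```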
pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "none" => Ok(EnabledStatistics::None),
        "chunk" => Ok(EnabledStatistics::Chunk),
        "page" => Ok(EnabledStatistics::Page),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet statistics setting: {str_setting}. \
            Valid options are none, page, and chunk."
        ))),
    }
}

#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "parquet_encryption")]
    use crate::config::ConfigFileEncryptionProperties;
    use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
    use crate::parquet_config::DFParquetWriterVersion;
    use parquet::basic::Compression;
    use parquet::file::properties::{
        BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV,
        EnabledStatistics,
    };
    use std::collections::HashMap;

    const COL_NAME: &str = "configured";

    /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version == DFParquetWriterVersion::V1_0 {
            DFParquetWriterVersion::V2_0
        } else {
            DFParquetWriterVersion::V1_0
        };

        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version,
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

            // not in WriterProperties, but itemizing here to not skip newly added props
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            force_filter_selections: defaults.force_filter_selections,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
            max_predicate_cache_size: defaults.max_predicate_cache_size,
        }
    }

    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
        }
    }

    /// For testing only: take a single write's props and convert back into the
    /// session config, in order to confirm round-trip identity.
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[cfg(feature = "parquet_encryption")]
        let fep = props
            .file_encryption_properties()
            .map(ConfigFileEncryptionProperties::from);

        #[cfg(not(feature = "parquet_encryption"))]
        let fep = None;

        TableParquetOptions {
            global: ParquetOptions {
                // global options
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: props.writer_version().into(),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props.max_row_group_size(),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                // global options which set the default column props
                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

                // not in WriterProperties
                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                force_filter_selections: global_options_defaults.force_filter_selections,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                max_predicate_cache_size: global_options_defaults
                    .max_predicate_cache_size,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
            },
            column_specific_options,
            key_value_metadata,
            crypto: ParquetEncryptionOptions {
                file_encryption: fep,
                file_decryption: None,
                factory_id: None,
                factory_options: Default::default(),
            },
        }
    }

    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        // TableParquetOptions, all props set to default
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default false, to not skip the arrow schema requirement"
        );

        // see errors without the schema added, using default settings
        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        // succeeds if we permit skipping the arrow schema
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        // Set the arrow schema back to required
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        // add the arrow schema to the kv_meta
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    #[test]
    fn table_parquet_opts_to_writer_props() {
        // ParquetOptions, all props set to non-default
        let parquet_options = parquet_options_with_non_defaults();

        // TableParquetOptions, using ParquetOptions for global settings
        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
            crypto: Default::default(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

    /// Ensure that the configuration defaults for writing parquet files are
    /// consistent with the options in arrow-rs
    #[test]
    fn test_defaults_match() {
        // ensure the global settings are the same
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global, default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        // selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        // WriterProperties::default, a.k.a. using extern parquet's defaults
        let default_writer_props = WriterProperties::new();

        // WriterProperties::try_from(TableParquetOptions::default), a.k.a. using datafusion's defaults
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // Expected: ways in which the defaults should not match
        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props
                .created_by()
                .starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        // Expected: the datafusion default compression is different from arrow-rs's parquet
        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is uncompressed"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        // Expected: the remaining should match
        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts, from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    #[test]
    fn test_bloom_filter_defaults() {
        // the TableParquetOptions::default, with only the bloom filter turned on
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only the bloom filter turned on
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }

    #[test]
    fn test_bloom_filter_set_fpp_only() {
        // the TableParquetOptions::default, with only fpp set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only fpp set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    #[test]
    fn test_bloom_filter_set_ndv_only() {
        // the TableParquetOptions::default, with only ndv set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only ndv set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
}