datafusion_common/file_options/parquet_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Options related to how parquet files should be written

use std::sync::Arc;

use crate::{
    config::{ParquetOptions, TableParquetOptions},
    DataFusionError, Result, _internal_datafusion_err,
};

use arrow::datatypes::Schema;
use parquet::arrow::encode_arrow_schema;
// TODO: handle once deprecated
#[allow(deprecated)]
use parquet::{
    arrow::ARROW_SCHEMA_META_KEY,
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{
            EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
            DEFAULT_STATISTICS_ENABLED,
        },
    },
    schema::types::ColumnPath,
};

/// Options for writing parquet files
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    /// parquet-rs writer properties
    pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
    pub fn new(writer_options: WriterProperties) -> Self {
        Self { writer_options }
    }

    pub fn writer_options(&self) -> &WriterProperties {
        &self.writer_options
    }
}

impl TableParquetOptions {
    /// Add the arrow schema to the parquet kv_metadata.
    /// If a schema entry already exists, it is overwritten.
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}
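
// Editor's note: a minimal usage sketch (illustrative only, not part of the
// original module). Unless `skip_arrow_metadata` is set, the encoded arrow
// schema must be present in the kv_metadata before building writer properties:
//
//     let mut opts = TableParquetOptions::default();
//     opts.arrow_schema(&Arc::new(Schema::empty()));
//     assert!(opts.key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY));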

impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
    type Error = DataFusionError;

    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
        // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
        Ok(ParquetWriterOptions {
            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
                .build(),
        })
    }
}
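
// Editor's note: an end-to-end sketch of the conversion above (illustrative
// only; assumes a `Result`-returning context for the `?` operator):
//
//     let mut opts = TableParquetOptions::default();
//     opts.arrow_schema(&Arc::new(Schema::empty()));
//     let writer_opts = ParquetWriterOptions::try_from(&opts)?;
//     let props: &WriterProperties = writer_opts.writer_options();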

impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
    type Error = DataFusionError;

    /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
    /// Note that any encryption options are ignored as building the `FileEncryptionProperties`
    /// might require other inputs besides the [`TableParquetOptions`].
    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
        // Table options include kv_metadata and col-specific options
        let TableParquetOptions {
            global,
            column_specific_options,
            key_value_metadata,
            crypto: _,
        } = table_parquet_options;

        let mut builder = global.into_writer_properties_builder()?;

        // check that the arrow schema is present in the kv_metadata, if configured to do so
        if !global.skip_arrow_metadata
            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
        {
            return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings"));
        }

        // add kv_meta, if any
        if !key_value_metadata.is_empty() {
            builder = builder.set_key_value_metadata(Some(
                key_value_metadata
                    .to_owned()
                    .drain()
                    .map(|(key, value)| KeyValue { key, value })
                    .collect(),
            ));
        }

        // Apply column-specific options:
        for (column, options) in column_specific_options {
            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
                builder = builder
                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
            }

            if let Some(encoding) = &options.encoding {
                let parsed_encoding = parse_encoding_string(encoding)?;
                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
            }

            if let Some(dictionary_enabled) = options.dictionary_enabled {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
            }

            if let Some(compression) = &options.compression {
                let parsed_compression = parse_compression_string(compression)?;
                builder =
                    builder.set_column_compression(path.clone(), parsed_compression);
            }

            if let Some(statistics_enabled) = &options.statistics_enabled {
                let parsed_value = parse_statistics_string(statistics_enabled)?;
                builder =
                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
            }

            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
                builder =
                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
            }

            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
                builder =
                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
            }
        }

        Ok(builder)
    }
}
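
// Editor's note: a sketch of how per-column overrides flow through the
// conversion above (values are hypothetical; assumes `ParquetColumnOptions`
// implements `Default`):
//
//     let mut opts = TableParquetOptions::default();
//     opts.global.skip_arrow_metadata = true; // waive the schema requirement
//     let mut col = ParquetColumnOptions::default();
//     col.compression = Some("zstd(5)".into());
//     opts.column_specific_options.insert("a.b".into(), col);
//     let props = WriterPropertiesBuilder::try_from(&opts)?.build();
//     // the "a.b" key becomes ColumnPath ["a", "b"], compressed with zstd(5)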

impl ParquetOptions {
    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
    /// applied per column; a customization which is not applicable for [`ParquetOptions`].
    ///
    /// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
        #[allow(deprecated)]
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

            // not in WriterProperties
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _, // reads not used for writer props
            schema_force_view_types: _,
            binary_as_string: _, // not used for writer props
            coerce_int96: _,     // not used for writer props
            skip_arrow_metadata: _,
            max_predicate_cache_size: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version(parse_version_string(writer_version.as_str())?)
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_size(*max_row_group_size)
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        }
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        }
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        }

        // We do not have access to default ColumnProperties set in Arrow.
        // Therefore, only overwrite if these settings exist.
        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}
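
// Editor's note: a short sketch (illustrative only) of building writer
// properties from the global options alone, without kv_metadata or
// per-column overrides:
//
//     let global = ParquetOptions::default();
//     let props = global.into_writer_properties_builder()?.build();
//     assert!(props.created_by().starts_with("datafusion version"));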

/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding
pub(crate) fn parse_encoding_string(
    str_setting: &str,
) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
        "rle" => Ok(parquet::basic::Encoding::RLE),
        #[allow(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
        "delta_length_byte_array" => {
            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
        }
        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet encoding: \
        {str_setting}. Valid values are: plain, plain_dictionary, rle, \
        bit_packed, delta_binary_packed, delta_length_byte_array, \
        delta_byte_array, rle_dictionary, and byte_stream_split."
        ))),
    }
}
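
// Editor's note: matching is case-insensitive since the input is lowercased
// first, e.g. (sketch):
//
//     assert_eq!(
//         parse_encoding_string("BYTE_STREAM_SPLIT")?,
//         parquet::basic::Encoding::BYTE_STREAM_SPLIT
//     );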

/// Splits a compression string into the codec and an optional compression level,
/// e.g. "gzip(2)" -> ("gzip", Some(2))
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
    // ignore string literal chars passed from sqlparser i.e. remove single quotes
    let str_setting = str_setting.replace('\'', "");
    let split_setting = str_setting.split_once('(');

    match split_setting {
        Some((codec, rh)) => {
            let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
                DataFusionError::Configuration(format!(
                    "Could not parse compression string. \
                    Got codec: {codec} and unknown level from {str_setting}"
                ))
            })?;
            Ok((codec.to_owned(), Some(*level)))
        }
        None => Ok((str_setting.to_owned(), None)),
    }
}
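
// Editor's note: behavior sketch. Single quotes are stripped first, so the
// following hold (assuming a `Result`-returning context):
//
//     assert_eq!(split_compression_string("gzip(2)")?, ("gzip".into(), Some(2)));
//     assert_eq!(split_compression_string("'snappy'")?, ("snappy".into(), None));
//     assert!(split_compression_string("gzip(two)").is_err());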

/// Helper to ensure compression codecs which don't support levels
/// don't have one set. E.g. snappy(2) is invalid.
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
    if level.is_some() {
        return Err(DataFusionError::Configuration(format!(
            "Compression {codec} does not support specifying a level"
        )));
    }
    Ok(())
}

/// Helper to ensure compression codecs which require a level
/// do have one set. E.g. zstd is invalid, zstd(3) is valid
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
    level.ok_or(DataFusionError::Configuration(format!(
        "{codec} compression requires specifying a level such as {codec}(4)"
    )))
}

/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression
pub fn parse_compression_string(
    str_setting: &str,
) -> Result<parquet::basic::Compression> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    let (codec, level) = split_compression_string(str_setting_lower)?;
    let codec = codec.as_str();
    match codec {
        "uncompressed" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::UNCOMPRESSED)
        }
        "snappy" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::SNAPPY)
        }
        "gzip" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
                level,
            )?))
        }
        "lzo" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZO)
        }
        "brotli" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
                level,
            )?))
        }
        "lz4" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4)
        }
        "zstd" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
                level as i32,
            )?))
        }
        "lz4_raw" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4_RAW)
        }
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet compression: \
        {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
        lzo, brotli(level), lz4, zstd(level), and lz4_raw."
        ))),
    }
}
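
// Editor's note: illustrative parses (sketch):
//
//     parse_compression_string("snappy")?;  // ok, no level allowed
//     parse_compression_string("zstd(3)")?; // ok, level required
//     assert!(parse_compression_string("zstd").is_err());      // missing level
//     assert!(parse_compression_string("snappy(2)").is_err()); // level unsupported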

pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "1.0" => Ok(WriterVersion::PARQUET_1_0),
        "2.0" => Ok(WriterVersion::PARQUET_2_0),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet writer version {str_setting}. \
            Valid values are 1.0 and 2.0"
        ))),
    }
}

pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "none" => Ok(EnabledStatistics::None),
        "chunk" => Ok(EnabledStatistics::Chunk),
        "page" => Ok(EnabledStatistics::Page),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet statistics setting {str_setting}. \
            Valid values are none, page, and chunk"
        ))),
    }
}
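
// Editor's note: both parsers above lowercase their input, so e.g. (sketch):
//
//     assert_eq!(parse_version_string("2.0")?, WriterVersion::PARQUET_2_0);
//     assert_eq!(parse_statistics_string("PAGE")?, EnabledStatistics::Page);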

#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        ConfigFileEncryptionProperties, ParquetColumnOptions, ParquetEncryptionOptions,
        ParquetOptions,
    };
    use parquet::basic::Compression;
    use parquet::file::properties::{
        BloomFilterProperties, EnabledStatistics, DEFAULT_BLOOM_FILTER_FPP,
        DEFAULT_BLOOM_FILTER_NDV,
    };
    use std::collections::HashMap;

    const COL_NAME: &str = "configured";

    /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version.eq("1.0") {
            "2.0"
        } else {
            "1.0"
        };

        #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version: writer_version.into(),
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

            // not in WriterProperties, but itemizing here to not skip newly added props
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
            max_predicate_cache_size: defaults.max_predicate_cache_size,
        }
    }

    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        #[allow(deprecated)] // max_statistics_size
        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
        }
    }

    /// For testing only: take a single write's props and convert back into the session config.
    /// (Round-tripping this way confirms the conversion is correct.)
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[cfg(feature = "parquet_encryption")]
        let fep = props
            .file_encryption_properties()
            .map(ConfigFileEncryptionProperties::from);

        #[cfg(not(feature = "parquet_encryption"))]
        let fep = None;

        #[allow(deprecated)] // max_statistics_size
        TableParquetOptions {
            global: ParquetOptions {
                // global options
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: format!("{}.0", props.writer_version().as_num()),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props.max_row_group_size(),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                // global options which set the default column props
                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

                // not in WriterProperties
                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                max_predicate_cache_size: global_options_defaults
                    .max_predicate_cache_size,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
            },
            column_specific_options,
            key_value_metadata,
            crypto: ParquetEncryptionOptions {
                file_encryption: fep,
                file_decryption: None,
                factory_id: None,
                factory_options: Default::default(),
            },
        }
    }

    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        // TableParquetOptions, all props set to default
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "defaults to false, meaning the arrow schema is required"
        );

        // see errors without the schema added, using default settings
        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        // succeeds if we permit skipping the arrow schema
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        // Set the arrow schema back to required
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        // add the arrow schema to the kv_meta
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    #[test]
    fn table_parquet_opts_to_writer_props() {
        // ParquetOptions, all props set to non-default
        let parquet_options = parquet_options_with_non_defaults();

        // TableParquetOptions, using ParquetOptions for global settings
        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
            crypto: Default::default(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

    /// Ensure that the configuration defaults for writing parquet files are
    /// consistent with the options in arrow-rs
    #[test]
    fn test_defaults_match() {
        // ensure the global settings are the same
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global,
            default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        // selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        // WriterProperties::default, a.k.a. using extern parquet's defaults
        let default_writer_props = WriterProperties::new();

        // WriterProperties::try_from(TableParquetOptions::default), a.k.a. using datafusion's defaults
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // Expected: ways in which the defaults should not match
        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props.created_by().starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        // Expected: the datafusion default compression differs from arrow-rs's parquet default
        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is uncompressed"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        // Expected: the remaining should match
        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts,
            from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    #[test]
    fn test_bloom_filter_defaults() {
        // the TableParquetOptions::default, with only the bloom filter turned on
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only the bloom filter turned on
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }

    #[test]
    fn test_bloom_filter_set_fpp_only() {
        // the TableParquetOptions::default, with only fpp set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only fpp set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    #[test]
    fn test_bloom_filter_set_ndv_only() {
        // the TableParquetOptions::default, with only ndv set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only ndv set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
}