// datafusion_common/file_options/parquet_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Options related to how parquet files should be written
19
20use std::sync::Arc;
21
22use crate::{
23    _internal_datafusion_err, DataFusionError, Result,
24    config::{ParquetOptions, TableParquetOptions},
25};
26
27use arrow::datatypes::Schema;
28use parquet::arrow::encode_arrow_schema;
29use parquet::{
30    arrow::ARROW_SCHEMA_META_KEY,
31    basic::{BrotliLevel, GzipLevel, ZstdLevel},
32    file::{
33        metadata::KeyValue,
34        properties::{
35            DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties,
36            WriterPropertiesBuilder,
37        },
38    },
39    schema::types::ColumnPath,
40};
41
/// Options for writing parquet files
///
/// Thin wrapper around the parquet-rs [`WriterProperties`] so that a single,
/// fully-resolved set of writer settings can be passed to a write action.
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    /// parquet-rs writer properties
    pub writer_options: WriterProperties,
}
48
49impl ParquetWriterOptions {
50    pub fn new(writer_options: WriterProperties) -> Self {
51        Self { writer_options }
52    }
53}
54
55impl ParquetWriterOptions {
56    pub fn writer_options(&self) -> &WriterProperties {
57        &self.writer_options
58    }
59}
60
impl TableParquetOptions {
    /// Add the arrow schema to the parquet kv_metadata.
    /// If already exists, then overwrites.
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        // Serialize the Arrow schema and store it under the well-known
        // metadata key; `insert` replaces any previous entry for that key.
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}
71
72impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
73    type Error = DataFusionError;
74
75    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
76        // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
77        Ok(ParquetWriterOptions {
78            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
79                .build(),
80        })
81    }
82}
83
84impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
85    type Error = DataFusionError;
86
87    /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
88    ///
89    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
90    /// Note that any encryption options are ignored as building the `FileEncryptionProperties`
91    /// might require other inputs besides the [`TableParquetOptions`].
92    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
93        // Table options include kv_metadata and col-specific options
94        let TableParquetOptions {
95            global,
96            column_specific_options,
97            key_value_metadata,
98            crypto: _,
99        } = table_parquet_options;
100
101        let mut builder = global.into_writer_properties_builder()?;
102
103        // check that the arrow schema is present in the kv_metadata, if configured to do so
104        if !global.skip_arrow_metadata
105            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
106        {
107            return Err(_internal_datafusion_err!(
108                "arrow schema was not added to the kv_metadata, even though it is required by configuration settings"
109            ));
110        }
111
112        // add kv_meta, if any
113        if !key_value_metadata.is_empty() {
114            builder = builder.set_key_value_metadata(Some(
115                key_value_metadata
116                    .to_owned()
117                    .drain()
118                    .map(|(key, value)| KeyValue { key, value })
119                    .collect(),
120            ));
121        }
122
123        // Apply column-specific options:
124        for (column, options) in column_specific_options {
125            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());
126
127            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
128                builder = builder
129                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
130            }
131
132            if let Some(encoding) = &options.encoding {
133                let parsed_encoding = parse_encoding_string(encoding)?;
134                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
135            }
136
137            if let Some(dictionary_enabled) = options.dictionary_enabled {
138                builder = builder
139                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
140            }
141
142            if let Some(compression) = &options.compression {
143                let parsed_compression = parse_compression_string(compression)?;
144                builder =
145                    builder.set_column_compression(path.clone(), parsed_compression);
146            }
147
148            if let Some(statistics_enabled) = &options.statistics_enabled {
149                let parsed_value = parse_statistics_string(statistics_enabled)?;
150                builder =
151                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
152            }
153
154            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
155                builder =
156                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
157            }
158
159            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
160                builder =
161                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
162            }
163        }
164
165        Ok(builder)
166    }
167}
168
169impl ParquetOptions {
170    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
171    ///
172    /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
173    /// applied per column; a customization which is not applicable for [`ParquetOptions`].
174    ///
175    /// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
176    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
177        let ParquetOptions {
178            data_pagesize_limit,
179            write_batch_size,
180            writer_version,
181            compression,
182            dictionary_enabled,
183            dictionary_page_size_limit,
184            statistics_enabled,
185            max_row_group_size,
186            created_by,
187            column_index_truncate_length,
188            statistics_truncate_length,
189            data_page_row_count_limit,
190            encoding,
191            bloom_filter_on_write,
192            bloom_filter_fpp,
193            bloom_filter_ndv,
194
195            // not in WriterProperties
196            enable_page_index: _,
197            pruning: _,
198            skip_metadata: _,
199            metadata_size_hint: _,
200            pushdown_filters: _,
201            reorder_filters: _,
202            force_filter_selections: _, // not used for writer props
203            allow_single_file_parallelism: _,
204            maximum_parallel_row_group_writers: _,
205            maximum_buffered_record_batches_per_stream: _,
206            bloom_filter_on_read: _, // reads not used for writer props
207            schema_force_view_types: _,
208            binary_as_string: _, // not used for writer props
209            coerce_int96: _,     // not used for writer props
210            skip_arrow_metadata: _,
211            max_predicate_cache_size: _,
212        } = self;
213
214        let mut builder = WriterProperties::builder()
215            .set_data_page_size_limit(*data_pagesize_limit)
216            .set_write_batch_size(*write_batch_size)
217            .set_writer_version((*writer_version).into())
218            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
219            .set_statistics_enabled(
220                statistics_enabled
221                    .as_ref()
222                    .and_then(|s| parse_statistics_string(s).ok())
223                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
224            )
225            .set_max_row_group_row_count(Some(*max_row_group_size))
226            .set_created_by(created_by.clone())
227            .set_column_index_truncate_length(*column_index_truncate_length)
228            .set_statistics_truncate_length(*statistics_truncate_length)
229            .set_data_page_row_count_limit(*data_page_row_count_limit)
230            .set_bloom_filter_enabled(*bloom_filter_on_write);
231
232        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
233            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
234        };
235        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
236            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
237        };
238        if let Some(dictionary_enabled) = dictionary_enabled {
239            builder = builder.set_dictionary_enabled(*dictionary_enabled);
240        };
241
242        // We do not have access to default ColumnProperties set in Arrow.
243        // Therefore, only overwrite if these settings exist.
244        if let Some(compression) = compression {
245            builder = builder.set_compression(parse_compression_string(compression)?);
246        }
247        if let Some(encoding) = encoding {
248            builder = builder.set_encoding(parse_encoding_string(encoding)?);
249        }
250
251        Ok(builder)
252    }
253}
254
255/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding
256pub(crate) fn parse_encoding_string(
257    str_setting: &str,
258) -> Result<parquet::basic::Encoding> {
259    let str_setting_lower: &str = &str_setting.to_lowercase();
260    match str_setting_lower {
261        "plain" => Ok(parquet::basic::Encoding::PLAIN),
262        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
263        "rle" => Ok(parquet::basic::Encoding::RLE),
264        #[expect(deprecated)]
265        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
266        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
267        "delta_length_byte_array" => {
268            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
269        }
270        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
271        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
272        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
273        _ => Err(DataFusionError::Configuration(format!(
274            "Unknown or unsupported parquet encoding: \
275        {str_setting}. Valid values are: plain, plain_dictionary, rle, \
276        bit_packed, delta_binary_packed, delta_length_byte_array, \
277        delta_byte_array, rle_dictionary, and byte_stream_split."
278        ))),
279    }
280}
281
282/// Splits compression string into compression codec and optional compression_level
283/// I.e. gzip(2) -> gzip, 2
284fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
285    // ignore string literal chars passed from sqlparser i.e. remove single quotes
286    let str_setting = str_setting.replace('\'', "");
287    let split_setting = str_setting.split_once('(');
288
289    match split_setting {
290        Some((codec, rh)) => {
291            let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
292                DataFusionError::Configuration(format!(
293                    "Could not parse compression string. \
294                    Got codec: {codec} and unknown level from {str_setting}"
295                ))
296            })?;
297            Ok((codec.to_owned(), Some(*level)))
298        }
299        None => Ok((str_setting.to_owned(), None)),
300    }
301}
302
303/// Helper to ensure compression codecs which don't support levels
304/// don't have one set. E.g. snappy(2) is invalid.
305fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
306    if level.is_some() {
307        return Err(DataFusionError::Configuration(format!(
308            "Compression {codec} does not support specifying a level"
309        )));
310    }
311    Ok(())
312}
313
314/// Helper to ensure compression codecs which require a level
315/// do have one set. E.g. zstd is invalid, zstd(3) is valid
316fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
317    level.ok_or(DataFusionError::Configuration(format!(
318        "{codec} compression requires specifying a level such as {codec}(4)"
319    )))
320}
321
322/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression
323pub fn parse_compression_string(
324    str_setting: &str,
325) -> Result<parquet::basic::Compression> {
326    let str_setting_lower: &str = &str_setting.to_lowercase();
327    let (codec, level) = split_compression_string(str_setting_lower)?;
328    let codec = codec.as_str();
329    match codec {
330        "uncompressed" => {
331            check_level_is_none(codec, &level)?;
332            Ok(parquet::basic::Compression::UNCOMPRESSED)
333        }
334        "snappy" => {
335            check_level_is_none(codec, &level)?;
336            Ok(parquet::basic::Compression::SNAPPY)
337        }
338        "gzip" => {
339            let level = require_level(codec, level)?;
340            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
341                level,
342            )?))
343        }
344        "brotli" => {
345            let level = require_level(codec, level)?;
346            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
347                level,
348            )?))
349        }
350        "lz4" => {
351            check_level_is_none(codec, &level)?;
352            Ok(parquet::basic::Compression::LZ4)
353        }
354        "zstd" => {
355            let level = require_level(codec, level)?;
356            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
357                level as i32,
358            )?))
359        }
360        "lz4_raw" => {
361            check_level_is_none(codec, &level)?;
362            Ok(parquet::basic::Compression::LZ4_RAW)
363        }
364        _ => Err(DataFusionError::Configuration(format!(
365            "Unknown or unsupported parquet compression: \
366        {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
367        brotli(level), lz4, zstd(level), and lz4_raw."
368        ))),
369    }
370}
371
372pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
373    let str_setting_lower: &str = &str_setting.to_lowercase();
374    match str_setting_lower {
375        "none" => Ok(EnabledStatistics::None),
376        "chunk" => Ok(EnabledStatistics::Chunk),
377        "page" => Ok(EnabledStatistics::Page),
378        _ => Err(DataFusionError::Configuration(format!(
379            "Unknown or unsupported parquet statistics setting {str_setting} \
380            valid options are none, page, and chunk"
381        ))),
382    }
383}
384
385#[cfg(feature = "parquet")]
386#[cfg(test)]
387mod tests {
388    use super::*;
389    #[cfg(feature = "parquet_encryption")]
390    use crate::config::ConfigFileEncryptionProperties;
391    use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
392    use crate::parquet_config::DFParquetWriterVersion;
393    use parquet::basic::Compression;
394    use parquet::file::properties::{
395        BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV,
396        DEFAULT_MAX_ROW_GROUP_ROW_COUNT, EnabledStatistics,
397    };
398    use std::collections::HashMap;
399
400    const COL_NAME: &str = "configured";
401
    /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            // flip the source default so the column value is guaranteed to differ
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }
416
    /// Build a [`ParquetOptions`] whose writer-relevant fields are all set to
    /// non-default values, so round-trip tests can detect dropped settings.
    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        // pick whichever writer version is NOT the default
        let writer_version = if defaults.writer_version.eq(&DFParquetWriterVersion::V1_0)
        {
            DFParquetWriterVersion::V2_0
        } else {
            DFParquetWriterVersion::V1_0
        };

        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version,
            compression: Some("zstd(22)".into()),
            // flip the default (treating None as false)
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

            // not in WriterProperties, but itemizing here to not skip newly added props
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            force_filter_selections: defaults.force_filter_selections,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
            max_predicate_cache_size: defaults.max_predicate_cache_size,
        }
    }
465
466    fn extract_column_options(
467        props: &WriterProperties,
468        col: ColumnPath,
469    ) -> ParquetColumnOptions {
470        let bloom_filter_default_props = props.bloom_filter_properties(&col);
471
472        ParquetColumnOptions {
473            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
474            encoding: props.encoding(&col).map(|s| s.to_string()),
475            dictionary_enabled: Some(props.dictionary_enabled(&col)),
476            compression: match props.compression(&col) {
477                Compression::ZSTD(lvl) => {
478                    Some(format!("zstd({})", lvl.compression_level()))
479                }
480                _ => None,
481            },
482            statistics_enabled: Some(
483                match props.statistics_enabled(&col) {
484                    EnabledStatistics::None => "none",
485                    EnabledStatistics::Chunk => "chunk",
486                    EnabledStatistics::Page => "page",
487                }
488                .into(),
489            ),
490            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
491            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
492        }
493    }
494
495    /// For testing only, take a single write's props and convert back into the session config.
496    /// (use identity to confirm correct.)
497    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
498        let default_col = ColumnPath::from("col doesn't have specific config");
499        let default_col_props = extract_column_options(props, default_col);
500
501        let configured_col = ColumnPath::from(COL_NAME);
502        let configured_col_props = extract_column_options(props, configured_col);
503
504        let key_value_metadata = props
505            .key_value_metadata()
506            .map(|pairs| {
507                HashMap::from_iter(
508                    pairs
509                        .iter()
510                        .cloned()
511                        .map(|KeyValue { key, value }| (key, value)),
512                )
513            })
514            .unwrap_or_default();
515
516        let global_options_defaults = ParquetOptions::default();
517
518        let column_specific_options = if configured_col_props.eq(&default_col_props) {
519            HashMap::default()
520        } else {
521            HashMap::from([(COL_NAME.into(), configured_col_props)])
522        };
523
524        #[cfg(feature = "parquet_encryption")]
525        let fep = props
526            .file_encryption_properties()
527            .map(ConfigFileEncryptionProperties::from);
528
529        #[cfg(not(feature = "parquet_encryption"))]
530        let fep = None;
531
532        TableParquetOptions {
533            global: ParquetOptions {
534                // global options
535                data_pagesize_limit: props.dictionary_page_size_limit(),
536                write_batch_size: props.write_batch_size(),
537                writer_version: props.writer_version().into(),
538                dictionary_page_size_limit: props.dictionary_page_size_limit(),
539                max_row_group_size: props
540                    .max_row_group_row_count()
541                    .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT),
542                created_by: props.created_by().to_string(),
543                column_index_truncate_length: props.column_index_truncate_length(),
544                statistics_truncate_length: props.statistics_truncate_length(),
545                data_page_row_count_limit: props.data_page_row_count_limit(),
546
547                // global options which set the default column props
548                encoding: default_col_props.encoding,
549                compression: default_col_props.compression,
550                dictionary_enabled: default_col_props.dictionary_enabled,
551                statistics_enabled: default_col_props.statistics_enabled,
552                bloom_filter_on_write: default_col_props
553                    .bloom_filter_enabled
554                    .unwrap_or_default(),
555                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
556                bloom_filter_ndv: default_col_props.bloom_filter_ndv,
557
558                // not in WriterProperties
559                enable_page_index: global_options_defaults.enable_page_index,
560                pruning: global_options_defaults.pruning,
561                skip_metadata: global_options_defaults.skip_metadata,
562                metadata_size_hint: global_options_defaults.metadata_size_hint,
563                pushdown_filters: global_options_defaults.pushdown_filters,
564                reorder_filters: global_options_defaults.reorder_filters,
565                force_filter_selections: global_options_defaults.force_filter_selections,
566                allow_single_file_parallelism: global_options_defaults
567                    .allow_single_file_parallelism,
568                maximum_parallel_row_group_writers: global_options_defaults
569                    .maximum_parallel_row_group_writers,
570                maximum_buffered_record_batches_per_stream: global_options_defaults
571                    .maximum_buffered_record_batches_per_stream,
572                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
573                max_predicate_cache_size: global_options_defaults
574                    .max_predicate_cache_size,
575                schema_force_view_types: global_options_defaults.schema_force_view_types,
576                binary_as_string: global_options_defaults.binary_as_string,
577                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
578                coerce_int96: None,
579            },
580            column_specific_options,
581            key_value_metadata,
582            crypto: ParquetEncryptionOptions {
583                file_encryption: fep,
584                file_decryption: None,
585                factory_id: None,
586                factory_options: Default::default(),
587            },
588        }
589    }
590
591    #[test]
592    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
593        // TableParquetOptions, all props set to default
594        let mut table_parquet_opts = TableParquetOptions::default();
595        assert!(
596            !table_parquet_opts.global.skip_arrow_metadata,
597            "default false, to not skip the arrow schema requirement"
598        );
599
600        // see errors without the schema added, using default settings
601        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
602        assert!(
603            should_error.is_err(),
604            "should error without the required arrow schema in kv_metadata",
605        );
606
607        // succeeds if we permit skipping the arrow schema
608        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
609        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
610        assert!(
611            should_succeed.is_ok(),
612            "should work with the arrow schema skipped by config",
613        );
614
615        // Set the arrow schema back to required
616        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
617        // add the arrow schema to the kv_meta
618        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
619        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
620        assert!(
621            should_succeed.is_ok(),
622            "should work with the arrow schema included in TableParquetOptions",
623        );
624    }
625
626    #[test]
627    fn table_parquet_opts_to_writer_props() {
628        // ParquetOptions, all props set to non-default
629        let parquet_options = parquet_options_with_non_defaults();
630
631        // TableParquetOptions, using ParquetOptions for global settings
632        let key = ARROW_SCHEMA_META_KEY.to_string();
633        let value = Some("bar".into());
634        let table_parquet_opts = TableParquetOptions {
635            global: parquet_options.clone(),
636            column_specific_options: [(
637                COL_NAME.into(),
638                column_options_with_non_defaults(&parquet_options),
639            )]
640            .into(),
641            key_value_metadata: [(key, value)].into(),
642            crypto: Default::default(),
643        };
644
645        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
646            .unwrap()
647            .build();
648        assert_eq!(
649            table_parquet_opts,
650            session_config_from_writer_props(&writer_props),
651            "the writer_props should have the same configuration as the session's TableParquetOptions",
652        );
653    }
654
655    /// Ensure that the configuration defaults for writing parquet files are
656    /// consistent with the options in arrow-rs
657    #[test]
658    fn test_defaults_match() {
659        // ensure the global settings are the same
660        let mut default_table_writer_opts = TableParquetOptions::default();
661        let default_parquet_opts = ParquetOptions::default();
662        assert_eq!(
663            default_table_writer_opts.global, default_parquet_opts,
664            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
665        );
666
667        // selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
668        default_table_writer_opts =
669            default_table_writer_opts.with_skip_arrow_metadata(true);
670
671        // WriterProperties::default, a.k.a. using extern parquet's defaults
672        let default_writer_props = WriterProperties::new();
673
674        // WriterProperties::try_from(TableParquetOptions::default), a.k.a. using datafusion's defaults
675        let from_datafusion_defaults =
676            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
677                .unwrap()
678                .build();
679
680        // Expected: how the defaults should not match
681        assert_ne!(
682            default_writer_props.created_by(),
683            from_datafusion_defaults.created_by(),
684            "should have different created_by sources",
685        );
686        assert!(
687            default_writer_props
688                .created_by()
689                .starts_with("parquet-rs version"),
690            "should indicate that writer_props defaults came from the extern parquet crate",
691        );
692        assert!(
693            default_table_writer_opts
694                .global
695                .created_by
696                .starts_with("datafusion version"),
697            "should indicate that table_parquet_opts defaults came from datafusion",
698        );
699
700        // Expected: the datafusion default compression is different from arrow-rs's parquet
701        assert_eq!(
702            default_writer_props.compression(&"default".into()),
703            Compression::UNCOMPRESSED,
704            "extern parquet's default is None"
705        );
706        assert!(
707            matches!(
708                from_datafusion_defaults.compression(&"default".into()),
709                Compression::ZSTD(_)
710            ),
711            "datafusion's default is zstd"
712        );
713
714        // Expected: the remaining should match
715        let same_created_by = default_table_writer_opts.global.created_by.clone();
716        let mut from_extern_parquet =
717            session_config_from_writer_props(&default_writer_props);
718        from_extern_parquet.global.created_by = same_created_by;
719        from_extern_parquet.global.compression = Some("zstd(3)".into());
720        from_extern_parquet.global.skip_arrow_metadata = true;
721
722        assert_eq!(
723            default_table_writer_opts, from_extern_parquet,
724            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
725        );
726    }
727
728    #[test]
729    fn test_bloom_filter_defaults() {
730        // the TableParquetOptions::default, with only the bloom filter turned on
731        let mut default_table_writer_opts = TableParquetOptions::default();
732        default_table_writer_opts.global.bloom_filter_on_write = true;
733        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
734        let from_datafusion_defaults =
735            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
736                .unwrap()
737                .build();
738
739        // the WriterProperties::default, with only the bloom filter turned on
740        let default_writer_props = WriterProperties::builder()
741            .set_bloom_filter_enabled(true)
742            .build();
743
744        assert_eq!(
745            default_writer_props.bloom_filter_properties(&"default".into()),
746            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
747            "parquet and datafusion props, should have the same bloom filter props",
748        );
749        assert_eq!(
750            default_writer_props.bloom_filter_properties(&"default".into()),
751            Some(&BloomFilterProperties::default()),
752            "should use the default bloom filter props"
753        );
754    }
755
756    #[test]
757    fn test_bloom_filter_set_fpp_only() {
758        // the TableParquetOptions::default, with only fpp set
759        let mut default_table_writer_opts = TableParquetOptions::default();
760        default_table_writer_opts.global.bloom_filter_on_write = true;
761        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
762        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
763        let from_datafusion_defaults =
764            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
765                .unwrap()
766                .build();
767
768        // the WriterProperties::default, with only fpp set
769        let default_writer_props = WriterProperties::builder()
770            .set_bloom_filter_enabled(true)
771            .set_bloom_filter_fpp(0.42)
772            .build();
773
774        assert_eq!(
775            default_writer_props.bloom_filter_properties(&"default".into()),
776            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
777            "parquet and datafusion props, should have the same bloom filter props",
778        );
779        assert_eq!(
780            default_writer_props.bloom_filter_properties(&"default".into()),
781            Some(&BloomFilterProperties {
782                fpp: 0.42,
783                ndv: DEFAULT_BLOOM_FILTER_NDV
784            }),
785            "should have only the fpp set, and the ndv at default",
786        );
787    }
788
789    #[test]
790    fn test_bloom_filter_set_ndv_only() {
791        // the TableParquetOptions::default, with only ndv set
792        let mut default_table_writer_opts = TableParquetOptions::default();
793        default_table_writer_opts.global.bloom_filter_on_write = true;
794        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
795        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
796        let from_datafusion_defaults =
797            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
798                .unwrap()
799                .build();
800
801        // the WriterProperties::default, with only ndv set
802        let default_writer_props = WriterProperties::builder()
803            .set_bloom_filter_enabled(true)
804            .set_bloom_filter_ndv(42)
805            .build();
806
807        assert_eq!(
808            default_writer_props.bloom_filter_properties(&"default".into()),
809            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
810            "parquet and datafusion props, should have the same bloom filter props",
811        );
812        assert_eq!(
813            default_writer_props.bloom_filter_properties(&"default".into()),
814            Some(&BloomFilterProperties {
815                fpp: DEFAULT_BLOOM_FILTER_FPP,
816                ndv: 42
817            }),
818            "should have only the ndv set, and the fpp at default",
819        );
820    }
821}