Skip to main content

datafusion_common/file_options/
parquet_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Options related to how parquet files should be written
19
20use std::sync::Arc;
21
22use crate::{
23    _internal_datafusion_err, DataFusionError, Result,
24    config::{ParquetCdcOptions, ParquetOptions, TableParquetOptions},
25};
26
27use arrow::datatypes::Schema;
28use parquet::arrow::encode_arrow_schema;
29use parquet::{
30    arrow::ARROW_SCHEMA_META_KEY,
31    basic::{BrotliLevel, GzipLevel, ZstdLevel},
32    file::{
33        metadata::KeyValue,
34        properties::{
35            DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties,
36            WriterPropertiesBuilder,
37        },
38    },
39    schema::types::ColumnPath,
40};
41
42/// Options for writing parquet files
43#[derive(Clone, Debug)]
44pub struct ParquetWriterOptions {
45    /// parquet-rs writer properties
46    pub writer_options: WriterProperties,
47}
48
49impl ParquetWriterOptions {
50    pub fn new(writer_options: WriterProperties) -> Self {
51        Self { writer_options }
52    }
53}
54
55impl ParquetWriterOptions {
56    pub fn writer_options(&self) -> &WriterProperties {
57        &self.writer_options
58    }
59}
60
61impl TableParquetOptions {
62    /// Add the arrow schema to the parquet kv_metadata.
63    /// If already exists, then overwrites.
64    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
65        self.key_value_metadata.insert(
66            ARROW_SCHEMA_META_KEY.into(),
67            Some(encode_arrow_schema(schema)),
68        );
69    }
70}
71
72impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
73    type Error = DataFusionError;
74
75    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
76        // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
77        Ok(ParquetWriterOptions {
78            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
79                .build(),
80        })
81    }
82}
83
84impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
85    type Error = DataFusionError;
86
87    /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
88    ///
89    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
90    /// Note that any encryption options are ignored as building the `FileEncryptionProperties`
91    /// might require other inputs besides the [`TableParquetOptions`].
92    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
93        // Table options include kv_metadata and col-specific options
94        let TableParquetOptions {
95            global,
96            column_specific_options,
97            key_value_metadata,
98            ..
99        } = table_parquet_options;
100
101        let mut builder = global.into_writer_properties_builder()?;
102
103        // check that the arrow schema is present in the kv_metadata, if configured to do so
104        if !global.skip_arrow_metadata
105            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
106        {
107            return Err(_internal_datafusion_err!(
108                "arrow schema was not added to the kv_metadata, even though it is required by configuration settings"
109            ));
110        }
111
112        // add kv_meta, if any
113        if !key_value_metadata.is_empty() {
114            builder = builder.set_key_value_metadata(Some(
115                key_value_metadata
116                    .to_owned()
117                    .drain()
118                    .map(|(key, value)| KeyValue { key, value })
119                    .collect(),
120            ));
121        }
122
123        // Apply column-specific options:
124        for (column, options) in column_specific_options {
125            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());
126
127            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
128                builder = builder
129                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
130            }
131
132            if let Some(encoding) = &options.encoding {
133                let parsed_encoding = parse_encoding_string(encoding)?;
134                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
135            }
136
137            if let Some(dictionary_enabled) = options.dictionary_enabled {
138                builder = builder
139                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
140            }
141
142            if let Some(compression) = &options.compression {
143                let parsed_compression = parse_compression_string(compression)?;
144                builder =
145                    builder.set_column_compression(path.clone(), parsed_compression);
146            }
147
148            if let Some(statistics_enabled) = &options.statistics_enabled {
149                let parsed_value = parse_statistics_string(statistics_enabled)?;
150                builder =
151                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
152            }
153
154            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
155                builder =
156                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
157            }
158
159            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
160                builder =
161                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
162            }
163        }
164
165        Ok(builder)
166    }
167}
168
169/// Convert DataFusion's [`ParquetCdcOptions`] into parquet-rs's `Option<CdcOptions>`.
170///
171/// parquet-rs has no `enabled` flag; CDC is on when the option is `Some`. So a
172/// disabled [`ParquetCdcOptions`] maps to `None`, and an enabled one to `Some`
173/// with the chunking parameters.
174impl From<&ParquetCdcOptions> for Option<parquet::file::properties::CdcOptions> {
175    fn from(value: &ParquetCdcOptions) -> Self {
176        value
177            .enabled
178            .then_some(parquet::file::properties::CdcOptions {
179                min_chunk_size: value.min_chunk_size,
180                max_chunk_size: value.max_chunk_size,
181                norm_level: value.norm_level,
182            })
183    }
184}
185
186/// Convert parquet-rs's `Option<&CdcOptions>` back into DataFusion's
187/// [`ParquetCdcOptions`].
188///
189/// The presence of parquet-rs options means CDC was enabled, so `Some` maps to
190/// `enabled: true`; `None` yields the disabled default.
191impl From<Option<&parquet::file::properties::CdcOptions>> for ParquetCdcOptions {
192    fn from(value: Option<&parquet::file::properties::CdcOptions>) -> Self {
193        match value {
194            Some(cdc) => ParquetCdcOptions {
195                enabled: true,
196                min_chunk_size: cdc.min_chunk_size,
197                max_chunk_size: cdc.max_chunk_size,
198                norm_level: cdc.norm_level,
199            },
200            None => ParquetCdcOptions::default(),
201        }
202    }
203}
204
205impl ParquetOptions {
206    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
207    ///
208    /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
209    /// applied per column; a customization which is not applicable for [`ParquetOptions`].
210    ///
211    /// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
212    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
213        let ParquetOptions {
214            data_pagesize_limit,
215            write_batch_size,
216            writer_version,
217            compression,
218            dictionary_enabled,
219            dictionary_page_size_limit,
220            statistics_enabled,
221            max_row_group_size,
222            created_by,
223            column_index_truncate_length,
224            statistics_truncate_length,
225            data_page_row_count_limit,
226            encoding,
227            bloom_filter_on_write,
228            bloom_filter_fpp,
229            bloom_filter_ndv,
230            content_defined_chunking,
231
232            // not in WriterProperties
233            enable_page_index: _,
234            pruning: _,
235            skip_metadata: _,
236            metadata_size_hint: _,
237            pushdown_filters: _,
238            reorder_filters: _,
239            force_filter_selections: _, // not used for writer props
240            allow_single_file_parallelism: _,
241            maximum_parallel_row_group_writers: _,
242            maximum_buffered_record_batches_per_stream: _,
243            bloom_filter_on_read: _, // reads not used for writer props
244            schema_force_view_types: _,
245            binary_as_string: _, // not used for writer props
246            coerce_int96: _,     // not used for writer props
247            coerce_int96_tz: _,  // not used for writer props
248            skip_arrow_metadata: _,
249            max_predicate_cache_size: _,
250        } = self;
251
252        let mut builder = WriterProperties::builder()
253            .set_data_page_size_limit(*data_pagesize_limit)
254            .set_write_batch_size(*write_batch_size)
255            .set_writer_version((*writer_version).into())
256            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
257            .set_statistics_enabled(
258                statistics_enabled
259                    .as_ref()
260                    .and_then(|s| parse_statistics_string(s).ok())
261                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
262            )
263            .set_max_row_group_row_count(Some(*max_row_group_size))
264            .set_created_by(created_by.clone())
265            .set_column_index_truncate_length(*column_index_truncate_length)
266            .set_statistics_truncate_length(*statistics_truncate_length)
267            .set_data_page_row_count_limit(*data_page_row_count_limit)
268            .set_bloom_filter_enabled(*bloom_filter_on_write);
269
270        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
271            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
272        };
273        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
274            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
275        };
276        if let Some(dictionary_enabled) = dictionary_enabled {
277            builder = builder.set_dictionary_enabled(*dictionary_enabled);
278        };
279
280        // We do not have access to default ColumnProperties set in Arrow.
281        // Therefore, only overwrite if these settings exist.
282        if let Some(compression) = compression {
283            builder = builder.set_compression(parse_compression_string(compression)?);
284        }
285        if let Some(encoding) = encoding {
286            builder = builder.set_encoding(parse_encoding_string(encoding)?);
287        }
288        builder = builder.set_content_defined_chunking(content_defined_chunking.into());
289
290        Ok(builder)
291    }
292}
293
294/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding
295pub(crate) fn parse_encoding_string(
296    str_setting: &str,
297) -> Result<parquet::basic::Encoding> {
298    let str_setting_lower: &str = &str_setting.to_lowercase();
299    match str_setting_lower {
300        "plain" => Ok(parquet::basic::Encoding::PLAIN),
301        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
302        "rle" => Ok(parquet::basic::Encoding::RLE),
303        #[expect(deprecated)]
304        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
305        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
306        "delta_length_byte_array" => {
307            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
308        }
309        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
310        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
311        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
312        _ => Err(DataFusionError::Configuration(format!(
313            "Unknown or unsupported parquet encoding: \
314        {str_setting}. Valid values are: plain, plain_dictionary, rle, \
315        bit_packed, delta_binary_packed, delta_length_byte_array, \
316        delta_byte_array, rle_dictionary, and byte_stream_split."
317        ))),
318    }
319}
320
321/// Splits compression string into compression codec and optional compression_level
322/// I.e. gzip(2) -> gzip, 2
323fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
324    // ignore string literal chars passed from sqlparser i.e. remove single quotes
325    let str_setting = str_setting.replace('\'', "");
326    let split_setting = str_setting.split_once('(');
327
328    match split_setting {
329        Some((codec, rh)) => {
330            let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
331                DataFusionError::Configuration(format!(
332                    "Could not parse compression string. \
333                    Got codec: {codec} and unknown level from {str_setting}"
334                ))
335            })?;
336            Ok((codec.to_owned(), Some(*level)))
337        }
338        None => Ok((str_setting.to_owned(), None)),
339    }
340}
341
342/// Helper to ensure compression codecs which don't support levels
343/// don't have one set. E.g. snappy(2) is invalid.
344fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
345    if level.is_some() {
346        return Err(DataFusionError::Configuration(format!(
347            "Compression {codec} does not support specifying a level"
348        )));
349    }
350    Ok(())
351}
352
353/// Helper to ensure compression codecs which require a level
354/// do have one set. E.g. zstd is invalid, zstd(3) is valid
355fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
356    level.ok_or(DataFusionError::Configuration(format!(
357        "{codec} compression requires specifying a level such as {codec}(4)"
358    )))
359}
360
361/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression
362pub fn parse_compression_string(
363    str_setting: &str,
364) -> Result<parquet::basic::Compression> {
365    let str_setting_lower: &str = &str_setting.to_lowercase();
366    let (codec, level) = split_compression_string(str_setting_lower)?;
367    let codec = codec.as_str();
368    match codec {
369        "uncompressed" => {
370            check_level_is_none(codec, &level)?;
371            Ok(parquet::basic::Compression::UNCOMPRESSED)
372        }
373        "snappy" => {
374            check_level_is_none(codec, &level)?;
375            Ok(parquet::basic::Compression::SNAPPY)
376        }
377        "gzip" => {
378            let level = require_level(codec, level)?;
379            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
380                level,
381            )?))
382        }
383        "brotli" => {
384            let level = require_level(codec, level)?;
385            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
386                level,
387            )?))
388        }
389        "lz4" => {
390            check_level_is_none(codec, &level)?;
391            Ok(parquet::basic::Compression::LZ4)
392        }
393        "zstd" => {
394            let level = require_level(codec, level)?;
395            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
396                level as i32,
397            )?))
398        }
399        "lz4_raw" => {
400            check_level_is_none(codec, &level)?;
401            Ok(parquet::basic::Compression::LZ4_RAW)
402        }
403        _ => Err(DataFusionError::Configuration(format!(
404            "Unknown or unsupported parquet compression: \
405        {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
406        brotli(level), lz4, zstd(level), and lz4_raw."
407        ))),
408    }
409}
410
411pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
412    let str_setting_lower: &str = &str_setting.to_lowercase();
413    match str_setting_lower {
414        "none" => Ok(EnabledStatistics::None),
415        "chunk" => Ok(EnabledStatistics::Chunk),
416        "page" => Ok(EnabledStatistics::Page),
417        _ => Err(DataFusionError::Configuration(format!(
418            "Unknown or unsupported parquet statistics setting {str_setting} \
419            valid options are none, page, and chunk"
420        ))),
421    }
422}
423
424#[cfg(feature = "parquet")]
425#[cfg(test)]
426mod tests {
427    use super::*;
428    #[cfg(feature = "parquet_encryption")]
429    use crate::config::ConfigFileEncryptionProperties;
430    use crate::config::{
431        ParquetCdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
432    };
433    use crate::parquet_config::DFParquetWriterVersion;
434    use parquet::basic::Compression;
435    use parquet::file::properties::{
436        BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV,
437        DEFAULT_MAX_ROW_GROUP_ROW_COUNT, EnabledStatistics,
438    };
439    use std::collections::HashMap;
440
441    const COL_NAME: &str = "configured";
442
443    /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
444    fn column_options_with_non_defaults(
445        src_col_defaults: &ParquetOptions,
446    ) -> ParquetColumnOptions {
447        ParquetColumnOptions {
448            compression: Some("zstd(22)".into()),
449            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
450            statistics_enabled: Some("none".into()),
451            encoding: Some("RLE".into()),
452            bloom_filter_enabled: Some(true),
453            bloom_filter_fpp: Some(0.72),
454            bloom_filter_ndv: Some(72),
455        }
456    }
457
458    fn parquet_options_with_non_defaults() -> ParquetOptions {
459        let defaults = ParquetOptions::default();
460        let writer_version = if defaults.writer_version.eq(&DFParquetWriterVersion::V1_0)
461        {
462            DFParquetWriterVersion::V2_0
463        } else {
464            DFParquetWriterVersion::V1_0
465        };
466
467        ParquetOptions {
468            data_pagesize_limit: 42,
469            write_batch_size: 42,
470            writer_version,
471            compression: Some("zstd(22)".into()),
472            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
473            dictionary_page_size_limit: 42,
474            statistics_enabled: Some("chunk".into()),
475            max_row_group_size: 42,
476            created_by: "wordy".into(),
477            column_index_truncate_length: Some(42),
478            statistics_truncate_length: Some(42),
479            data_page_row_count_limit: 42,
480            encoding: Some("BYTE_STREAM_SPLIT".into()),
481            bloom_filter_on_write: !defaults.bloom_filter_on_write,
482            bloom_filter_fpp: Some(0.42),
483            bloom_filter_ndv: Some(42),
484
485            // not in WriterProperties, but itemizing here to not skip newly added props
486            enable_page_index: defaults.enable_page_index,
487            pruning: defaults.pruning,
488            skip_metadata: defaults.skip_metadata,
489            metadata_size_hint: defaults.metadata_size_hint,
490            pushdown_filters: defaults.pushdown_filters,
491            reorder_filters: defaults.reorder_filters,
492            force_filter_selections: defaults.force_filter_selections,
493            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
494            maximum_parallel_row_group_writers: defaults
495                .maximum_parallel_row_group_writers,
496            maximum_buffered_record_batches_per_stream: defaults
497                .maximum_buffered_record_batches_per_stream,
498            bloom_filter_on_read: defaults.bloom_filter_on_read,
499            schema_force_view_types: defaults.schema_force_view_types,
500            binary_as_string: defaults.binary_as_string,
501            skip_arrow_metadata: defaults.skip_arrow_metadata,
502            coerce_int96: None,
503            coerce_int96_tz: None,
504            max_predicate_cache_size: defaults.max_predicate_cache_size,
505            content_defined_chunking: defaults.content_defined_chunking.clone(),
506        }
507    }
508
509    fn extract_column_options(
510        props: &WriterProperties,
511        col: ColumnPath,
512    ) -> ParquetColumnOptions {
513        let bloom_filter_default_props = props.bloom_filter_properties(&col);
514
515        ParquetColumnOptions {
516            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
517            encoding: props.encoding(&col).map(|s| s.to_string()),
518            dictionary_enabled: Some(props.dictionary_enabled(&col)),
519            compression: match props.compression(&col) {
520                Compression::ZSTD(lvl) => {
521                    Some(format!("zstd({})", lvl.compression_level()))
522                }
523                _ => None,
524            },
525            statistics_enabled: Some(
526                match props.statistics_enabled(&col) {
527                    EnabledStatistics::None => "none",
528                    EnabledStatistics::Chunk => "chunk",
529                    EnabledStatistics::Page => "page",
530                }
531                .into(),
532            ),
533            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
534            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
535        }
536    }
537
538    /// For testing only, take a single write's props and convert back into the session config.
539    /// (use identity to confirm correct.)
540    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
541        let default_col = ColumnPath::from("col doesn't have specific config");
542        let default_col_props = extract_column_options(props, default_col);
543
544        let configured_col = ColumnPath::from(COL_NAME);
545        let configured_col_props = extract_column_options(props, configured_col);
546
547        let key_value_metadata = props
548            .key_value_metadata()
549            .map(|pairs| {
550                HashMap::from_iter(
551                    pairs
552                        .iter()
553                        .cloned()
554                        .map(|KeyValue { key, value }| (key, value)),
555                )
556            })
557            .unwrap_or_default();
558
559        let global_options_defaults = ParquetOptions::default();
560
561        let column_specific_options = if configured_col_props.eq(&default_col_props) {
562            HashMap::default()
563        } else {
564            HashMap::from([(COL_NAME.into(), configured_col_props)])
565        };
566
567        #[cfg(feature = "parquet_encryption")]
568        let fep = props
569            .file_encryption_properties()
570            .map(ConfigFileEncryptionProperties::from);
571
572        #[cfg(not(feature = "parquet_encryption"))]
573        let fep = None;
574
575        TableParquetOptions {
576            global: ParquetOptions {
577                // global options
578                data_pagesize_limit: props.dictionary_page_size_limit(),
579                write_batch_size: props.write_batch_size(),
580                writer_version: props.writer_version().into(),
581                dictionary_page_size_limit: props.dictionary_page_size_limit(),
582                max_row_group_size: props
583                    .max_row_group_row_count()
584                    .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT),
585                created_by: props.created_by().to_string(),
586                column_index_truncate_length: props.column_index_truncate_length(),
587                statistics_truncate_length: props.statistics_truncate_length(),
588                data_page_row_count_limit: props.data_page_row_count_limit(),
589
590                // global options which set the default column props
591                encoding: default_col_props.encoding,
592                compression: default_col_props.compression,
593                dictionary_enabled: default_col_props.dictionary_enabled,
594                statistics_enabled: default_col_props.statistics_enabled,
595                bloom_filter_on_write: default_col_props
596                    .bloom_filter_enabled
597                    .unwrap_or_default(),
598                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
599                bloom_filter_ndv: default_col_props.bloom_filter_ndv,
600
601                // not in WriterProperties
602                enable_page_index: global_options_defaults.enable_page_index,
603                pruning: global_options_defaults.pruning,
604                skip_metadata: global_options_defaults.skip_metadata,
605                metadata_size_hint: global_options_defaults.metadata_size_hint,
606                pushdown_filters: global_options_defaults.pushdown_filters,
607                reorder_filters: global_options_defaults.reorder_filters,
608                force_filter_selections: global_options_defaults.force_filter_selections,
609                allow_single_file_parallelism: global_options_defaults
610                    .allow_single_file_parallelism,
611                maximum_parallel_row_group_writers: global_options_defaults
612                    .maximum_parallel_row_group_writers,
613                maximum_buffered_record_batches_per_stream: global_options_defaults
614                    .maximum_buffered_record_batches_per_stream,
615                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
616                max_predicate_cache_size: global_options_defaults
617                    .max_predicate_cache_size,
618                schema_force_view_types: global_options_defaults.schema_force_view_types,
619                binary_as_string: global_options_defaults.binary_as_string,
620                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
621                coerce_int96: None,
622                coerce_int96_tz: None,
623                content_defined_chunking: props.content_defined_chunking().into(),
624            },
625            column_specific_options,
626            key_value_metadata,
627            crypto: ParquetEncryptionOptions {
628                file_encryption: fep,
629                file_decryption: None,
630                factory_id: None,
631                factory_options: Default::default(),
632            },
633        }
634    }
635
636    #[test]
637    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
638        // TableParquetOptions, all props set to default
639        let mut table_parquet_opts = TableParquetOptions::default();
640        assert!(
641            !table_parquet_opts.global.skip_arrow_metadata,
642            "default false, to not skip the arrow schema requirement"
643        );
644
645        // see errors without the schema added, using default settings
646        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
647        assert!(
648            should_error.is_err(),
649            "should error without the required arrow schema in kv_metadata",
650        );
651
652        // succeeds if we permit skipping the arrow schema
653        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
654        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
655        assert!(
656            should_succeed.is_ok(),
657            "should work with the arrow schema skipped by config",
658        );
659
660        // Set the arrow schema back to required
661        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
662        // add the arrow schema to the kv_meta
663        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
664        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
665        assert!(
666            should_succeed.is_ok(),
667            "should work with the arrow schema included in TableParquetOptions",
668        );
669    }
670
671    #[test]
672    fn table_parquet_opts_to_writer_props() {
673        // ParquetOptions, all props set to non-default
674        let parquet_options = parquet_options_with_non_defaults();
675
676        // TableParquetOptions, using ParquetOptions for global settings
677        let key = ARROW_SCHEMA_META_KEY.to_string();
678        let value = Some("bar".into());
679        let table_parquet_opts = TableParquetOptions {
680            global: parquet_options.clone(),
681            column_specific_options: [(
682                COL_NAME.into(),
683                column_options_with_non_defaults(&parquet_options),
684            )]
685            .into(),
686            key_value_metadata: [(key, value)].into(),
687            crypto: Default::default(),
688        };
689
690        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
691            .unwrap()
692            .build();
693        assert_eq!(
694            table_parquet_opts,
695            session_config_from_writer_props(&writer_props),
696            "the writer_props should have the same configuration as the session's TableParquetOptions",
697        );
698    }
699
700    /// Ensure that the configuration defaults for writing parquet files are
701    /// consistent with the options in arrow-rs
702    #[test]
703    fn test_defaults_match() {
704        // ensure the global settings are the same
705        let mut default_table_writer_opts = TableParquetOptions::default();
706        let default_parquet_opts = ParquetOptions::default();
707        assert_eq!(
708            default_table_writer_opts.global, default_parquet_opts,
709            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
710        );
711
712        // selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
713        default_table_writer_opts =
714            default_table_writer_opts.with_skip_arrow_metadata(true);
715
716        // WriterProperties::default, a.k.a. using extern parquet's defaults
717        let default_writer_props = WriterProperties::new();
718
719        // WriterProperties::try_from(TableParquetOptions::default), a.k.a. using datafusion's defaults
720        let from_datafusion_defaults =
721            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
722                .unwrap()
723                .build();
724
725        // Expected: how the defaults should not match
726        assert_ne!(
727            default_writer_props.created_by(),
728            from_datafusion_defaults.created_by(),
729            "should have different created_by sources",
730        );
731        assert!(
732            default_writer_props
733                .created_by()
734                .starts_with("parquet-rs version"),
735            "should indicate that writer_props defaults came from the extern parquet crate",
736        );
737        assert!(
738            default_table_writer_opts
739                .global
740                .created_by
741                .starts_with("datafusion version"),
742            "should indicate that table_parquet_opts defaults came from datafusion",
743        );
744
745        // Expected: the datafusion default compression is different from arrow-rs's parquet
746        assert_eq!(
747            default_writer_props.compression(&"default".into()),
748            Compression::UNCOMPRESSED,
749            "extern parquet's default is None"
750        );
751        assert!(
752            matches!(
753                from_datafusion_defaults.compression(&"default".into()),
754                Compression::ZSTD(_)
755            ),
756            "datafusion's default is zstd"
757        );
758
759        // Expected: the remaining should match
760        let same_created_by = default_table_writer_opts.global.created_by.clone();
761        let mut from_extern_parquet =
762            session_config_from_writer_props(&default_writer_props);
763        from_extern_parquet.global.created_by = same_created_by;
764        from_extern_parquet.global.compression = Some("zstd(3)".into());
765        from_extern_parquet.global.skip_arrow_metadata = true;
766
767        assert_eq!(
768            default_table_writer_opts, from_extern_parquet,
769            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
770        );
771    }
772
773    #[test]
774    fn test_bloom_filter_defaults() {
775        // the TableParquetOptions::default, with only the bloom filter turned on
776        let mut default_table_writer_opts = TableParquetOptions::default();
777        default_table_writer_opts.global.bloom_filter_on_write = true;
778        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
779        let from_datafusion_defaults =
780            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
781                .unwrap()
782                .build();
783
784        // the WriterProperties::default, with only the bloom filter turned on
785        let default_writer_props = WriterProperties::builder()
786            .set_bloom_filter_enabled(true)
787            .build();
788
789        assert_eq!(
790            default_writer_props.bloom_filter_properties(&"default".into()),
791            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
792            "parquet and datafusion props, should have the same bloom filter props",
793        );
794        assert_eq!(
795            default_writer_props.bloom_filter_properties(&"default".into()),
796            Some(&BloomFilterProperties::default()),
797            "should use the default bloom filter props"
798        );
799    }
800
801    #[test]
802    fn test_bloom_filter_set_fpp_only() {
803        // the TableParquetOptions::default, with only fpp set
804        let mut default_table_writer_opts = TableParquetOptions::default();
805        default_table_writer_opts.global.bloom_filter_on_write = true;
806        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
807        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
808        let from_datafusion_defaults =
809            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
810                .unwrap()
811                .build();
812
813        // the WriterProperties::default, with only fpp set
814        let default_writer_props = WriterProperties::builder()
815            .set_bloom_filter_enabled(true)
816            .set_bloom_filter_fpp(0.42)
817            .build();
818
819        assert_eq!(
820            default_writer_props.bloom_filter_properties(&"default".into()),
821            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
822            "parquet and datafusion props, should have the same bloom filter props",
823        );
824        assert_eq!(
825            default_writer_props.bloom_filter_properties(&"default".into()),
826            Some(&BloomFilterProperties {
827                fpp: 0.42,
828                ndv: DEFAULT_BLOOM_FILTER_NDV
829            }),
830            "should have only the fpp set, and the ndv at default",
831        );
832    }
833
834    #[test]
835    fn test_cdc_enabled_with_custom_options() {
836        let mut opts = TableParquetOptions::default();
837        opts.global.content_defined_chunking = ParquetCdcOptions {
838            enabled: true,
839            min_chunk_size: 128 * 1024,
840            max_chunk_size: 512 * 1024,
841            norm_level: 2,
842        };
843        opts.arrow_schema(&Arc::new(Schema::empty()));
844
845        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
846        let cdc = props.content_defined_chunking().expect("CDC should be set");
847        assert_eq!(cdc.min_chunk_size, 128 * 1024);
848        assert_eq!(cdc.max_chunk_size, 512 * 1024);
849        assert_eq!(cdc.norm_level, 2);
850    }
851
852    #[test]
853    fn test_cdc_disabled_by_default() {
854        let mut opts = TableParquetOptions::default();
855        opts.arrow_schema(&Arc::new(Schema::empty()));
856
857        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
858        assert!(props.content_defined_chunking().is_none());
859    }
860
861    #[test]
862    fn test_cdc_params_ignored_when_disabled() {
863        // Parameters are customized but `enabled` is false, so CDC stays off.
864        let mut opts = TableParquetOptions::default();
865        opts.global.content_defined_chunking = ParquetCdcOptions {
866            enabled: false,
867            min_chunk_size: 128 * 1024,
868            max_chunk_size: 512 * 1024,
869            norm_level: 2,
870        };
871        opts.arrow_schema(&Arc::new(Schema::empty()));
872
873        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
874        assert!(props.content_defined_chunking().is_none());
875    }
876
877    #[test]
878    fn test_cdc_round_trip_through_writer_props() {
879        let mut opts = TableParquetOptions::default();
880        opts.global.content_defined_chunking = ParquetCdcOptions {
881            enabled: true,
882            min_chunk_size: 64 * 1024,
883            max_chunk_size: 2 * 1024 * 1024,
884            norm_level: -1,
885        };
886        opts.arrow_schema(&Arc::new(Schema::empty()));
887
888        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
889        let recovered = session_config_from_writer_props(&props);
890
891        let cdc = recovered.global.content_defined_chunking;
892        assert!(cdc.enabled);
893        assert_eq!(cdc.min_chunk_size, 64 * 1024);
894        assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024);
895        assert_eq!(cdc.norm_level, -1);
896    }
897
898    #[test]
899    fn test_bloom_filter_set_ndv_only() {
900        // the TableParquetOptions::default, with only ndv set
901        let mut default_table_writer_opts = TableParquetOptions::default();
902        default_table_writer_opts.global.bloom_filter_on_write = true;
903        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
904        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
905        let from_datafusion_defaults =
906            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
907                .unwrap()
908                .build();
909
910        // the WriterProperties::default, with only ndv set
911        let default_writer_props = WriterProperties::builder()
912            .set_bloom_filter_enabled(true)
913            .set_bloom_filter_ndv(42)
914            .build();
915
916        assert_eq!(
917            default_writer_props.bloom_filter_properties(&"default".into()),
918            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
919            "parquet and datafusion props, should have the same bloom filter props",
920        );
921        assert_eq!(
922            default_writer_props.bloom_filter_properties(&"default".into()),
923            Some(&BloomFilterProperties {
924                fpp: DEFAULT_BLOOM_FILTER_FPP,
925                ndv: 42
926            }),
927            "should have only the ndv set, and the fpp at default",
928        );
929    }
930}