datafusion_common/file_options/parquet_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Options related to how parquet files should be written
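//!
//! A minimal sketch of intended usage (illustrative only; assumes a
//! `Result`-returning context):
//!
//! ```ignore
//! use std::sync::Arc;
//! use arrow::datatypes::Schema;
//! use datafusion_common::config::TableParquetOptions;
//! use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
//!
//! let mut opts = TableParquetOptions::default();
//! // embed the arrow schema in the kv_metadata (required unless
//! // `skip_arrow_metadata` is set)
//! opts.arrow_schema(&Arc::new(Schema::empty()));
//! let writer_opts = ParquetWriterOptions::try_from(&opts)?;
//! let props = writer_opts.writer_options(); // parquet-rs WriterProperties
//! ```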

use base64::Engine;
use std::sync::Arc;

use crate::{
    config::{ParquetOptions, TableParquetOptions},
    DataFusionError, Result, _internal_datafusion_err,
};

use arrow::datatypes::Schema;
// TODO: handle once deprecated
#[allow(deprecated)]
use parquet::{
    arrow::ARROW_SCHEMA_META_KEY,
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{
            EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
            DEFAULT_STATISTICS_ENABLED,
        },
    },
    schema::types::ColumnPath,
};

/// Options for writing parquet files
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    /// parquet-rs writer properties
    pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
    pub fn new(writer_options: WriterProperties) -> Self {
        Self { writer_options }
    }

    pub fn writer_options(&self) -> &WriterProperties {
        &self.writer_options
    }
}

impl TableParquetOptions {
    /// Add the arrow schema to the parquet kv_metadata.
    /// If the schema is already present, it is overwritten.
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}

impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
    type Error = DataFusionError;

    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
        // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
        Ok(ParquetWriterOptions {
            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
                .build(),
        })
    }
}

impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
    type Error = DataFusionError;

    /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
    /// Note that any encryption options are ignored, as building the `FileEncryptionProperties`
    /// may require inputs beyond the [`TableParquetOptions`].
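    ///
    /// A minimal sketch (illustrative only):
    ///
    /// ```ignore
    /// let mut opts = TableParquetOptions::default();
    /// opts.arrow_schema(&Arc::new(Schema::empty())); // required by default settings
    /// let props = WriterPropertiesBuilder::try_from(&opts)?.build();
    /// ```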
    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
        // Table options include kv_metadata and col-specific options
        let TableParquetOptions {
            global,
            column_specific_options,
            key_value_metadata,
            crypto: _,
        } = table_parquet_options;

        let mut builder = global.into_writer_properties_builder()?;

        // check that the arrow schema is present in the kv_metadata, if configured to do so
        if !global.skip_arrow_metadata
            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
        {
            return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings"));
        }

        // add kv_meta, if any
        if !key_value_metadata.is_empty() {
            builder = builder.set_key_value_metadata(Some(
                key_value_metadata
                    .to_owned()
                    .drain()
                    .map(|(key, value)| KeyValue { key, value })
                    .collect(),
            ));
        }

        // Apply column-specific options:
        for (column, options) in column_specific_options {
            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
                builder = builder
                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
            }

            if let Some(encoding) = &options.encoding {
                let parsed_encoding = parse_encoding_string(encoding)?;
                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
            }

            if let Some(dictionary_enabled) = options.dictionary_enabled {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
            }

            if let Some(compression) = &options.compression {
                let parsed_compression = parse_compression_string(compression)?;
                builder =
                    builder.set_column_compression(path.clone(), parsed_compression);
            }

            if let Some(statistics_enabled) = &options.statistics_enabled {
                let parsed_value = parse_statistics_string(statistics_enabled)?;
                builder =
                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
            }

            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
                builder =
                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
            }

            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
                builder =
                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
            }
        }

        Ok(builder)
    }
}

/// Encodes the Arrow schema into the IPC format, and base64 encodes it
///
/// TODO: use extern parquet's private method, once publicly available.
/// Refer to <https://github.com/apache/arrow-rs/pull/6916>
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
    let options = arrow_ipc::writer::IpcWriteOptions::default();
    let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
    let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
    let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
        schema,
        &mut dictionary_tracker,
        &options,
    );

    // manually prepend the length to the schema, since arrow uses the legacy IPC format
    // TODO: change after addressing ARROW-9777
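    // Resulting buffer layout (a descriptive note on the code below):
    //   [0xFF, 0xFF, 0xFF, 0xFF]   continuation marker
    //   [u32, little-endian]       length of the IPC schema message
    //   [flatbuffer bytes]         the IPC schema message itself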
    let schema_len = serialized_schema.ipc_message.len();
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);

    base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
}

impl ParquetOptions {
    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned [`WriterPropertiesBuilder`] can then be further modified with
    /// per-column options; a customization which is not applicable for [`ParquetOptions`].
    ///
    /// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
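    ///
    /// A minimal sketch (illustrative only):
    ///
    /// ```ignore
    /// let global = ParquetOptions::default();
    /// // column-agnostic settings only; per-column overrides are applied by the
    /// // TableParquetOptions -> WriterPropertiesBuilder conversion instead
    /// let props = global.into_writer_properties_builder()?.build();
    /// ```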
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
        #[allow(deprecated)]
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

            // not in WriterProperties
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _, // reads not used for writer props
            schema_force_view_types: _,
            binary_as_string: _, // not used for writer props
            coerce_int96: _,     // not used for writer props
            skip_arrow_metadata: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version(parse_version_string(writer_version.as_str())?)
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_size(*max_row_group_size)
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        }
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        }
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        }

        // We do not have access to default ColumnProperties set in Arrow.
        // Therefore, only overwrite if these settings exist.
        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}

/// Parses a `datafusion.execution.parquet.encoding` setting string into a [`parquet::basic::Encoding`]
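///
/// Matching is case-insensitive. A minimal sketch (illustrative only):
///
/// ```ignore
/// assert_eq!(parse_encoding_string("PLAIN")?, parquet::basic::Encoding::PLAIN);
/// assert!(parse_encoding_string("no_such_encoding").is_err());
/// ```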
pub(crate) fn parse_encoding_string(
    str_setting: &str,
) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
        "rle" => Ok(parquet::basic::Encoding::RLE),
        #[allow(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
        "delta_length_byte_array" => {
            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
        }
        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet encoding: \
        {str_setting}. Valid values are: plain, plain_dictionary, rle, \
        bit_packed, delta_binary_packed, delta_length_byte_array, \
        delta_byte_array, rle_dictionary, and byte_stream_split."
        ))),
    }
}

/// Splits a compression string into the compression codec and an optional
/// compression level, e.g. `gzip(2)` -> (`gzip`, `Some(2)`)
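///
/// A minimal sketch (illustrative only):
///
/// ```ignore
/// assert_eq!(split_compression_string("gzip(2)")?, ("gzip".to_owned(), Some(2)));
/// assert_eq!(split_compression_string("snappy")?, ("snappy".to_owned(), None));
/// ```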
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
    // ignore string literal chars passed from sqlparser i.e. remove single quotes
    let str_setting = str_setting.replace('\'', "");
    let split_setting = str_setting.split_once('(');

    match split_setting {
        Some((codec, rh)) => {
            // `rh` holds the remainder after '(', e.g. "2)"; strip the trailing ')'
            let level = rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
                DataFusionError::Configuration(format!(
                    "Could not parse compression string. \
                    Got codec: {codec} and unknown level from {str_setting}"
                ))
            })?;
            Ok((codec.to_owned(), Some(level)))
        }
        None => Ok((str_setting.to_owned(), None)),
    }
}

/// Helper to ensure compression codecs which don't support levels
/// don't have one set. E.g. snappy(2) is invalid.
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
    if level.is_some() {
        return Err(DataFusionError::Configuration(format!(
            "Compression {codec} does not support specifying a level"
        )));
    }
    Ok(())
}

/// Helper to ensure compression codecs which require a level
/// do have one set. E.g. zstd is invalid, zstd(3) is valid
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
    level.ok_or(DataFusionError::Configuration(format!(
        "{codec} compression requires specifying a level such as {codec}(4)"
    )))
}

/// Parses a `datafusion.execution.parquet.compression` setting string into a [`parquet::basic::Compression`]
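///
/// A minimal sketch (illustrative only):
///
/// ```ignore
/// use parquet::basic::{Compression, ZstdLevel};
///
/// let zstd = parse_compression_string("zstd(3)")?;
/// assert_eq!(zstd, Compression::ZSTD(ZstdLevel::try_new(3)?));
/// // levels are rejected for codecs that do not support them
/// assert!(parse_compression_string("snappy(2)").is_err());
/// ```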
pub fn parse_compression_string(
    str_setting: &str,
) -> Result<parquet::basic::Compression> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    let (codec, level) = split_compression_string(str_setting_lower)?;
    let codec = codec.as_str();
    match codec {
        "uncompressed" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::UNCOMPRESSED)
        }
        "snappy" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::SNAPPY)
        }
        "gzip" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
                level,
            )?))
        }
        "lzo" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZO)
        }
        "brotli" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
                level,
            )?))
        }
        "lz4" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4)
        }
        "zstd" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
                level as i32,
            )?))
        }
        "lz4_raw" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4_RAW)
        }
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet compression: \
        {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
        lzo, brotli(level), lz4, zstd(level), and lz4_raw."
        ))),
    }
}

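/// Parses a datafusion.execution.parquet.writer_version setting ("1.0" or "2.0")
/// into a parquet [`WriterVersion`].
///
/// A minimal sketch (illustrative only):
///
/// ```ignore
/// assert_eq!(parse_version_string("2.0")?, WriterVersion::PARQUET_2_0);
/// assert!(parse_version_string("3.0").is_err());
/// ```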
pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "1.0" => Ok(WriterVersion::PARQUET_1_0),
        "2.0" => Ok(WriterVersion::PARQUET_2_0),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet writer version: {str_setting}. \
            Valid options are 1.0 and 2.0."
        ))),
    }
}

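/// Parses a datafusion.execution.parquet.statistics_enabled setting ("none",
/// "chunk", or "page") into an [`EnabledStatistics`] level.
///
/// A minimal sketch (illustrative only):
///
/// ```ignore
/// assert_eq!(parse_statistics_string("page")?, EnabledStatistics::Page);
/// assert!(parse_statistics_string("row_group").is_err());
/// ```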
pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "none" => Ok(EnabledStatistics::None),
        "chunk" => Ok(EnabledStatistics::Chunk),
        "page" => Ok(EnabledStatistics::Page),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet statistics setting: {str_setting}. \
            Valid options are none, page, and chunk."
        ))),
    }
}

#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use parquet::{
        basic::Compression,
        file::properties::{
            BloomFilterProperties, EnabledStatistics, DEFAULT_BLOOM_FILTER_FPP,
            DEFAULT_BLOOM_FILTER_NDV,
        },
    };
    use std::collections::HashMap;

    use super::*;
    use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
    #[cfg(feature = "parquet_encryption")]
    use crate::encryption::map_encryption_to_config_encryption;

    const COL_NAME: &str = "configured";

    /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version.eq("1.0") {
            "2.0"
        } else {
            "1.0"
        };

        #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version: writer_version.into(),
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

            // not in WriterProperties, but itemizing here to not skip newly added props
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
        }
    }

    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        #[allow(deprecated)] // max_statistics_size
        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
        }
    }

    /// For testing only: take a single write's props and convert back into the session config.
    /// (Round-trips the values to confirm the conversion is correct.)
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[cfg(feature = "parquet_encryption")]
        let fep = map_encryption_to_config_encryption(props.file_encryption_properties());
        #[cfg(not(feature = "parquet_encryption"))]
        let fep = None;

        #[allow(deprecated)] // max_statistics_size
        TableParquetOptions {
            global: ParquetOptions {
                // global options
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: format!("{}.0", props.writer_version().as_num()),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props.max_row_group_size(),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                // global options which set the default column props
                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

                // not in WriterProperties
                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
            },
            column_specific_options,
            key_value_metadata,
            crypto: ParquetEncryptionOptions {
                file_encryption: fep,
                file_decryption: None,
                factory_id: None,
                factory_options: Default::default(),
            },
        }
    }

    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        // TableParquetOptions, all props set to default
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default false, to not skip the arrow schema requirement"
        );

        // see errors without the schema added, using default settings
        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        // succeeds if we permit skipping the arrow schema
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        // Set the arrow schema back to required
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        // add the arrow schema to the kv_meta
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    #[test]
    fn table_parquet_opts_to_writer_props() {
        // ParquetOptions, all props set to non-default
        let parquet_options = parquet_options_with_non_defaults();

        // TableParquetOptions, using ParquetOptions for global settings
        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
            crypto: Default::default(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

    /// Ensure that the configuration defaults for writing parquet files are
    /// consistent with the options in arrow-rs
    #[test]
    fn test_defaults_match() {
        // ensure the global settings are the same
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global,
            default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        // selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        // WriterProperties::default, a.k.a. using extern parquet's defaults
        let default_writer_props = WriterProperties::new();

        // WriterProperties::try_from(TableParquetOptions::default), a.k.a. using datafusion's defaults
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // Expected: the ways in which the defaults should differ
        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props.created_by().starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        // Expected: the datafusion default compression differs from arrow-rs parquet's default
        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is uncompressed"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        // Expected: the remaining settings should match
        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts,
            from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    #[test]
    fn test_bloom_filter_defaults() {
        // the TableParquetOptions::default, with only the bloom filter turned on
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only the bloom filter turned on
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }

    #[test]
    fn test_bloom_filter_set_fpp_only() {
        // the TableParquetOptions::default, with only fpp set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only fpp set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    #[test]
    fn test_bloom_filter_set_ndv_only() {
        // the TableParquetOptions::default, with only ndv set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only ndv set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
}