datafusion_common/file_options/parquet_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Options related to how parquet files should be written

use base64::Engine;
use std::sync::Arc;

use crate::{
    config::{ParquetOptions, TableParquetOptions},
    DataFusionError, Result, _internal_datafusion_err,
};

use arrow::datatypes::Schema;
use crate::encryption::add_crypto_to_writer_properties;
// TODO: handle once deprecated
#[allow(deprecated)]
use parquet::{
    arrow::ARROW_SCHEMA_META_KEY,
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{
            EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
            DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
        },
    },
    schema::types::ColumnPath,
};

/// Options for writing parquet files
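///
/// A minimal construction sketch (assumes default table options, with the
/// arrow-schema requirement skipped via `skip_arrow_metadata`):
///
/// ```ignore
/// let table_opts = TableParquetOptions::default().with_skip_arrow_metadata(true);
/// let writer_opts = ParquetWriterOptions::try_from(&table_opts)?;
/// ```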
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    /// parquet-rs writer properties
    pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
    pub fn new(writer_options: WriterProperties) -> Self {
        Self { writer_options }
    }

    pub fn writer_options(&self) -> &WriterProperties {
        &self.writer_options
    }
}

impl TableParquetOptions {
    /// Add the arrow schema to the parquet kv_metadata.
    /// If an entry already exists, it is overwritten.
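    ///
    /// A minimal usage sketch (mirrors this crate's own tests):
    ///
    /// ```ignore
    /// let mut opts = TableParquetOptions::default();
    /// opts.arrow_schema(&Arc::new(Schema::empty()));
    /// ```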
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}

impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
    type Error = DataFusionError;

    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
        // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
        Ok(ParquetWriterOptions {
            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
                .build(),
        })
    }
}

impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
    type Error = DataFusionError;

    /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
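    ///
    /// A minimal sketch (assumes the arrow schema was added via `arrow_schema`,
    /// or the requirement skipped with `skip_arrow_metadata`):
    ///
    /// ```ignore
    /// let builder = WriterPropertiesBuilder::try_from(&table_parquet_options)?;
    /// let props = builder.build();
    /// ```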
    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
        // Table options include kv_metadata and col-specific options
        let TableParquetOptions {
            global,
            column_specific_options,
            key_value_metadata,
            crypto,
        } = table_parquet_options;

        let mut builder = global.into_writer_properties_builder()?;

        builder = add_crypto_to_writer_properties(crypto, builder);

        // check that the arrow schema is present in the kv_metadata, if configured to do so
        if !global.skip_arrow_metadata
            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
        {
            return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings"));
        }

        // add kv_meta, if any
        if !key_value_metadata.is_empty() {
            builder = builder.set_key_value_metadata(Some(
                key_value_metadata
                    .to_owned()
                    .drain()
                    .map(|(key, value)| KeyValue { key, value })
                    .collect(),
            ));
        }

        // Apply column-specific options:
        for (column, options) in column_specific_options {
            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
                builder = builder
                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
            }

            if let Some(encoding) = &options.encoding {
                let parsed_encoding = parse_encoding_string(encoding)?;
                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
            }

            if let Some(dictionary_enabled) = options.dictionary_enabled {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
            }

            if let Some(compression) = &options.compression {
                let parsed_compression = parse_compression_string(compression)?;
                builder =
                    builder.set_column_compression(path.clone(), parsed_compression);
            }

            if let Some(statistics_enabled) = &options.statistics_enabled {
                let parsed_value = parse_statistics_string(statistics_enabled)?;
                builder =
                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
            }

            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
                builder =
                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
            }

            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
                builder =
                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
            }
            // max_statistics_size is deprecated and currently unused
            // TODO: remove once deprecated
            #[allow(deprecated)]
            if let Some(max_statistics_size) = options.max_statistics_size {
                builder = {
                    #[allow(deprecated)]
                    builder.set_column_max_statistics_size(path, max_statistics_size)
                }
            }
        }

        Ok(builder)
    }
}

/// Encodes the Arrow schema into the IPC format, and base64 encodes it
///
/// TODO: use extern parquet's private method, once publicly available.
/// Refer to <https://github.com/apache/arrow-rs/pull/6916>
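///
/// The resulting base64 payload wraps the IPC-serialized schema with the
/// continuation marker `0xFFFFFFFF` followed by the message length as a
/// little-endian `u32`, as done manually in the function body below.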
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
    let options = arrow_ipc::writer::IpcWriteOptions::default();
    let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
    let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
    let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
        schema,
        &mut dictionary_tracker,
        &options,
    );

    // manually prepending the length to the schema as arrow uses the legacy IPC format
    // TODO: change after addressing ARROW-9777
    let schema_len = serialized_schema.ipc_message.len();
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);

    base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
}

impl ParquetOptions {
    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned [`WriterPropertiesBuilder`] can then be further modified with options
    /// applied per column, a customization which is not available on [`ParquetOptions`] itself.
    ///
    /// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
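    ///
    /// A minimal usage sketch (error handling elided):
    ///
    /// ```ignore
    /// let global = ParquetOptions::default();
    /// let props = global.into_writer_properties_builder()?.build();
    /// ```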
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
        #[allow(deprecated)]
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_statistics_size,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

            // not in WriterProperties
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _, // reads not used for writer props
            schema_force_view_types: _,
            binary_as_string: _, // not used for writer props
            coerce_int96: _,     // not used for writer props
            skip_arrow_metadata: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version(parse_version_string(writer_version.as_str())?)
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_size(*max_row_group_size)
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        builder = {
            #[allow(deprecated)]
            builder.set_max_statistics_size(
                max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
            )
        };

        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        }
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        }
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        }

        // We do not have access to default ColumnProperties set in Arrow.
        // Therefore, only overwrite if these settings exist.
        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}

/// Parses a datafusion.execution.parquet.encoding string into a parquet::basic::Encoding
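///
/// A minimal sketch (error handling elided):
///
/// ```ignore
/// let encoding = parse_encoding_string("plain")?;
/// assert_eq!(encoding, parquet::basic::Encoding::PLAIN);
/// ```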
pub(crate) fn parse_encoding_string(
    str_setting: &str,
) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
        "rle" => Ok(parquet::basic::Encoding::RLE),
        #[allow(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
        "delta_length_byte_array" => {
            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
        }
        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet encoding: \
        {str_setting}. Valid values are: plain, plain_dictionary, rle, \
        bit_packed, delta_binary_packed, delta_length_byte_array, \
        delta_byte_array, rle_dictionary, and byte_stream_split."
        ))),
    }
}

/// Splits compression string into compression codec and optional compression_level
/// I.e. gzip(2) -> gzip, 2
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
    // ignore string literal chars passed from sqlparser, i.e. remove single quotes
    let str_setting = str_setting.replace('\'', "");
    let split_setting = str_setting.split_once('(');

    match split_setting {
        Some((codec, rest)) => {
            let level = &rest[..rest.len() - 1].parse::<u32>().map_err(|_| {
                DataFusionError::Configuration(format!(
                    "Could not parse compression string. \
                    Got codec: {codec} and unknown level from {str_setting}"
                ))
            })?;
            Ok((codec.to_owned(), Some(*level)))
        }
        None => Ok((str_setting.to_owned(), None)),
    }
}

345
346/// Helper to ensure compression codecs which don't support levels
347/// don't have one set. E.g. snappy(2) is invalid.
348fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
349    if level.is_some() {
350        return Err(DataFusionError::Configuration(format!(
351            "Compression {codec} does not support specifying a level"
352        )));
353    }
354    Ok(())
355}
356
357/// Helper to ensure compression codecs which require a level
358/// do have one set. E.g. zstd is invalid, zstd(3) is valid
359fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
360    level.ok_or(DataFusionError::Configuration(format!(
361        "{codec} compression requires specifying a level such as {codec}(4)"
362    )))
363}
364
/// Parses a datafusion.execution.parquet.compression string into a parquet::basic::Compression
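///
/// A minimal sketch of accepted inputs:
///
/// ```ignore
/// assert!(parse_compression_string("snappy").is_ok());
/// assert!(parse_compression_string("zstd(5)").is_ok());
/// assert!(parse_compression_string("zstd").is_err()); // zstd requires a level
/// ```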
pub fn parse_compression_string(
    str_setting: &str,
) -> Result<parquet::basic::Compression> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    let (codec, level) = split_compression_string(str_setting_lower)?;
    let codec = codec.as_str();
    match codec {
        "uncompressed" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::UNCOMPRESSED)
        }
        "snappy" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::SNAPPY)
        }
        "gzip" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
                level,
            )?))
        }
        "lzo" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZO)
        }
        "brotli" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
                level,
            )?))
        }
        "lz4" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4)
        }
        "zstd" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
                level as i32,
            )?))
        }
        "lz4_raw" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4_RAW)
        }
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet compression: \
        {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
        lzo, brotli(level), lz4, zstd(level), and lz4_raw."
        ))),
    }
}

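/// Parses a parquet writer version string ("1.0" or "2.0") into a [`WriterVersion`].
///
/// A minimal sketch (error handling elided):
///
/// ```ignore
/// assert_eq!(parse_version_string("2.0")?, WriterVersion::PARQUET_2_0);
/// ```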
pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "1.0" => Ok(WriterVersion::PARQUET_1_0),
        "2.0" => Ok(WriterVersion::PARQUET_2_0),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet writer version {str_setting}; \
            valid options are 1.0 and 2.0"
        ))),
    }
}

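/// Parses a statistics-level string into an [`EnabledStatistics`].
///
/// A minimal sketch (error handling elided):
///
/// ```ignore
/// assert_eq!(parse_statistics_string("page")?, EnabledStatistics::Page);
/// ```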
pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "none" => Ok(EnabledStatistics::None),
        "chunk" => Ok(EnabledStatistics::Chunk),
        "page" => Ok(EnabledStatistics::Page),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet statistics setting {str_setting}; \
            valid options are none, page, and chunk"
        ))),
    }
}

#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use parquet::{
        basic::Compression,
        file::properties::{
            BloomFilterProperties, EnabledStatistics, DEFAULT_BLOOM_FILTER_FPP,
            DEFAULT_BLOOM_FILTER_NDV,
        },
    };
    use std::collections::HashMap;

    use super::*;
    use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
    #[cfg(feature = "parquet_encryption")]
    use crate::encryption::map_encryption_to_config_encryption;

    const COL_NAME: &str = "configured";

    /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        #[allow(deprecated)] // max_statistics_size
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            max_statistics_size: Some(72),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version.eq("1.0") {
            "2.0"
        } else {
            "1.0"
        };

        #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version: writer_version.into(),
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_statistics_size: Some(42),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

            // not in WriterProperties, but itemizing here to not skip newly added props
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
        }
    }

    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        #[allow(deprecated)] // max_statistics_size
        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
            max_statistics_size: Some(props.max_statistics_size(&col)),
        }
    }

    /// For testing only: take a single write's props and convert back into the session config.
    /// (Round-tripping should be the identity, which confirms correctness.)
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[cfg(feature = "parquet_encryption")]
        let fep = map_encryption_to_config_encryption(props.file_encryption_properties());
        #[cfg(not(feature = "parquet_encryption"))]
        let fep = None;

        #[allow(deprecated)] // max_statistics_size
        TableParquetOptions {
            global: ParquetOptions {
                // global options
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: format!("{}.0", props.writer_version().as_num()),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props.max_row_group_size(),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                // global options which set the default column props
                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                max_statistics_size: default_col_props.max_statistics_size,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

                // not in WriterProperties
                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
            },
            column_specific_options,
            key_value_metadata,
            crypto: ParquetEncryptionOptions {
                file_encryption: fep,
                file_decryption: None,
            },
        }
    }

    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        // TableParquetOptions, all props set to default
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default false, to not skip the arrow schema requirement"
        );

        // see errors without the schema added, using default settings
        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        // succeeds if we permit skipping the arrow schema
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        // Set the arrow schema back to required
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        // add the arrow schema to the kv_meta
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    #[test]
    fn table_parquet_opts_to_writer_props() {
        // ParquetOptions, all props set to non-default
        let parquet_options = parquet_options_with_non_defaults();

        // TableParquetOptions, using ParquetOptions for global settings
        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
            crypto: Default::default(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

    /// Ensure that the configuration defaults for writing parquet files are
    /// consistent with the options in arrow-rs
    #[test]
    fn test_defaults_match() {
        // ensure the global settings are the same
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global,
            default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        // selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        // WriterProperties::default, a.k.a. using extern parquet's defaults
        let default_writer_props = WriterProperties::new();

        // WriterProperties::try_from(TableParquetOptions::default), a.k.a. using datafusion's defaults
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();
        // Expected: ways in which the defaults should not match
        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props.created_by().starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        // Expected: the datafusion default compression differs from arrow-rs parquet's
        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is uncompressed"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        // Expected: the remaining should match
        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts,
            from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    #[test]
    fn test_bloom_filter_defaults() {
        // the TableParquetOptions::default, with only the bloom filter turned on
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only the bloom filter turned on
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }

    #[test]
    fn test_bloom_filter_set_fpp_only() {
        // the TableParquetOptions::default, with only fpp set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only fpp set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    #[test]
    fn test_bloom_filter_set_ndv_only() {
        // the TableParquetOptions::default, with only ndv set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only ndv set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
}
877}