Skip to main content

polars_parquet/arrow/write/
mod.rs

1//! APIs to write to Parquet format.
2//!
3//! # Arrow/Parquet Interoperability
4//! As of [parquet-format v2.9](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md)
5//! there are Arrow [DataTypes](arrow::datatypes::ArrowDataType) which do not have a parquet
6//! representation. These include but are not limited to:
7//! * `ArrowDataType::Timestamp(TimeUnit::Second, _)`
8//! * `ArrowDataType::Int64`
9//! * `ArrowDataType::Duration`
10//! * `ArrowDataType::Date64`
11//! * `ArrowDataType::Time32(TimeUnit::Second)`
12//!
13//! The use of these arrow types will result in no logical type being stored within a parquet file.
14
15mod binary;
16mod binview;
17mod boolean;
18mod dictionary;
19mod file;
20mod fixed_size_binary;
21mod nested;
22mod pages;
23mod primitive;
24mod row_group;
25mod schema;
26mod utils;
27
28use arrow::array::*;
29use arrow::bitmap::Bitmap;
30use arrow::datatypes::*;
31use arrow::types::{NativeType, days_ms, i256};
32pub use nested::{num_values, write_rep_and_def};
33pub use pages::{to_leaves, to_nested, to_parquet_leaves};
34use polars_utils::float16::pf16;
35use polars_utils::pl_str::PlSmallStr;
36pub use utils::write_def_levels;
37
38pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
39pub use crate::parquet::encoding::Encoding;
40pub use crate::parquet::metadata::{
41    Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
42};
43pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
44use crate::parquet::schema::Repetition;
45use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
46pub use crate::parquet::schema::types::{
47    FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
48};
49pub use crate::parquet::write::{
50    Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
51    write_metadata_sidecar,
52};
53pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
54use crate::write::fixed_size_binary::build_statistics_float16;
55
/// The statistics to write
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct StatisticsOptions {
    // Field notes are plain `//` comments on purpose: `///` doc comments would become
    // descriptions in the derived JSON schema (feature "dsl-schema") and alter it.

    // Whether to write the minimum-value statistic.
    pub min_value: bool,
    // Whether to write the maximum-value statistic.
    pub max_value: bool,
    // Whether to write the distinct-count statistic (disabled by `Default`).
    pub distinct_count: bool,
    // Whether to write the null-count statistic.
    pub null_count: bool,
}
66
67impl Default for StatisticsOptions {
68    fn default() -> Self {
69        Self {
70            min_value: true,
71            max_value: true,
72            distinct_count: false,
73            null_count: true,
74        }
75    }
76}
77
/// Whether an array is encoded as a required (non-nullable) or an optional (nullable) column.
///
/// Derives `Debug`/`PartialEq`/`Eq` for consistency with the other public write options
/// (`StatisticsOptions`, `WriteOptions`), so it can be printed and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeNullability {
    /// Encode as a required column.
    Required,
    /// Encode as an optional column.
    Optional,
}
84
/// Currently supported options to write to parquet
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WriteOptions {
    /// Which statistics to write
    pub statistics: StatisticsOptions,
    /// The page and file version to use
    pub version: Version,
    /// The compression to apply to every page
    pub compression: CompressionOptions,
    /// The target size (in bytes) at which rows are split into a new data page; defaults to
    /// 1024 * 1024 if None. Values above 2^31 - 2^25 bytes are capped to that limit when pages
    /// are sliced.
    pub data_page_size: Option<usize>,
}
97
98use arrow::compute::aggregate::estimated_bytes_size;
99use arrow::match_integer_type;
100pub use file::FileWriter;
101pub use pages::{Nested, array_to_columns, arrays_to_columns};
102use polars_error::{PolarsResult, polars_bail};
103pub use row_group::{RowGroupIterator, row_group_iter};
104pub use schema::{schema_to_metadata_key, to_parquet_type};
105
106use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
107use crate::write::dictionary::encode_as_dictionary_optional;
108
109impl StatisticsOptions {
110    pub fn empty() -> Self {
111        Self {
112            min_value: false,
113            max_value: false,
114            distinct_count: false,
115            null_count: false,
116        }
117    }
118
119    pub fn full() -> Self {
120        Self {
121            min_value: true,
122            max_value: true,
123            distinct_count: true,
124            null_count: true,
125        }
126    }
127
128    pub fn is_empty(&self) -> bool {
129        !(self.min_value || self.max_value || self.distinct_count || self.null_count)
130    }
131
132    pub fn is_full(&self) -> bool {
133        self.min_value && self.max_value && self.distinct_count && self.null_count
134    }
135}
136
137impl WriteOptions {
138    pub fn has_statistics(&self) -> bool {
139        !self.statistics.is_empty()
140    }
141}
142
143impl EncodeNullability {
144    const fn new(is_optional: bool) -> Self {
145        if is_optional {
146            Self::Optional
147        } else {
148            Self::Required
149        }
150    }
151
152    fn is_optional(self) -> bool {
153        matches!(self, Self::Optional)
154    }
155}
156
157/// `data_page_size`: Set a target threshold for the approximate encoded size of data
158/// pages within a column chunk (in bytes). If None, use the default data page size of 1MByte.
159/// See: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html
160pub(crate) fn row_slice_ranges(
161    number_of_rows: usize,
162    byte_size: usize,
163    options: WriteOptions,
164) -> impl Iterator<Item = (usize, usize)> {
165    const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; // 1 MB
166    let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
167    let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
168
169    let bytes_per_row = if number_of_rows == 0 {
170        0
171    } else {
172        ((byte_size as f64) / (number_of_rows as f64)) as usize
173    };
174    let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
175
176    (0..number_of_rows)
177        .step_by(rows_per_page)
178        .map(move |offset| {
179            let length = (offset + rows_per_page).min(number_of_rows) - offset;
180            (offset, length)
181        })
182}
183
184/// returns offset and length to slice the leaf values
185pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
186    // find the deepest recursive dremel structure as that one determines how many values we must
187    // take
188    let mut out = (0, 0);
189    for nested in nested.iter().rev() {
190        match nested {
191            Nested::LargeList(l_nested) => {
192                let start = *l_nested.offsets.first();
193                let end = *l_nested.offsets.last();
194                return (start as usize, (end - start) as usize);
195            },
196            Nested::List(l_nested) => {
197                let start = *l_nested.offsets.first();
198                let end = *l_nested.offsets.last();
199                return (start as usize, (end - start) as usize);
200            },
201            Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
202            Nested::Primitive(nested) => out = (0, nested.length),
203            Nested::Struct(_) => {},
204        }
205    }
206    out
207}
208
/// Returns the byte width `n` used to store a decimal with `precision` digits as fixed-size
/// binary.
///
/// A signed `n`-byte two's-complement integer holds values up to `2^(8n - 1) - 1`. The formula
/// picks the smallest `n` with
///
/// ```text
/// 2^(8n - 1) - 1 >= 10^precision
///   <=> 8n - 1 >= log2(10^precision + 1)
///   <=> n >= (log2(10^precision + 1) + 1) / 8
/// ```
///
/// This is more than enough for any unscaled value, whose magnitude is at most
/// `10^precision - 1`. For example, precision 9 gives 4 bytes, 18 gives 8 and 38 gives 16.
fn decimal_length_from_precision(precision: usize) -> usize {
    let bound = 10.0_f64.powi(precision as i32) + 1.0;
    let bits = bound.log2() + 1.0;
    (bits / 8.0).ceil() as usize
}
219
220/// Creates a parquet [`SchemaDescriptor`] from a [`ArrowSchema`].
221pub fn to_parquet_schema(schema: &ArrowSchema) -> PolarsResult<SchemaDescriptor> {
222    let parquet_types = schema
223        .iter_values()
224        .map(to_parquet_type)
225        .collect::<PolarsResult<Vec<_>>>()?;
226    Ok(SchemaDescriptor::new(
227        PlSmallStr::from_static("root"),
228        parquet_types,
229    ))
230}
231
232/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.
233pub fn slice_parquet_array(
234    primitive_array: &mut dyn Array,
235    nested: &mut [Nested],
236    mut current_offset: usize,
237    mut current_length: usize,
238) {
239    for nested in nested.iter_mut() {
240        match nested {
241            Nested::LargeList(l_nested) => {
242                l_nested.offsets.slice(current_offset, current_length + 1);
243                if let Some(validity) = l_nested.validity.as_mut() {
244                    validity.slice(current_offset, current_length)
245                };
246
247                // Update the offset/ length so that the Primitive is sliced properly.
248                current_length = l_nested.offsets.range() as usize;
249                current_offset = *l_nested.offsets.first() as usize;
250            },
251            Nested::List(l_nested) => {
252                l_nested.offsets.slice(current_offset, current_length + 1);
253                if let Some(validity) = l_nested.validity.as_mut() {
254                    validity.slice(current_offset, current_length)
255                };
256
257                // Update the offset/ length so that the Primitive is sliced properly.
258                current_length = l_nested.offsets.range() as usize;
259                current_offset = *l_nested.offsets.first() as usize;
260            },
261            Nested::Struct(StructNested {
262                validity, length, ..
263            }) => {
264                *length = current_length;
265                if let Some(validity) = validity.as_mut() {
266                    validity.slice(current_offset, current_length)
267                };
268            },
269            Nested::Primitive(PrimitiveNested {
270                validity, length, ..
271            }) => {
272                *length = current_length;
273                if let Some(validity) = validity.as_mut() {
274                    validity.slice(current_offset, current_length)
275                };
276                primitive_array.slice(current_offset, current_length);
277            },
278            Nested::FixedSizeList(FixedSizeListNested {
279                validity,
280                length,
281                width,
282                ..
283            }) => {
284                if let Some(validity) = validity.as_mut() {
285                    validity.slice(current_offset, current_length)
286                };
287                *length = current_length;
288                // Update the offset/ length so that the Primitive is sliced properly.
289                current_length *= *width;
290                current_offset *= *width;
291            },
292        }
293    }
294}
295
296/// Get the length of [`Array`] that should be sliced.
297pub fn get_max_length(nested: &[Nested]) -> usize {
298    let mut length = 0;
299    for nested in nested.iter() {
300        match nested {
301            Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
302            Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
303            Nested::FixedSizeList(nested) => length += nested.length * nested.width,
304            _ => {},
305        }
306    }
307    length
308}
309
310/// Returns an iterator of [`Page`].
311pub fn array_to_pages(
312    primitive_array: &dyn Array,
313    type_: ParquetPrimitiveType,
314    nested: &[Nested],
315    options: WriteOptions,
316    mut encoding: Encoding,
317) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
318    if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_storage() {
319        return match_integer_type!(key_type, |$T| {
320            dictionary::array_to_pages::<$T>(
321                primitive_array.as_any().downcast_ref().unwrap(),
322                type_,
323                &nested,
324                options,
325                encoding,
326            )
327        });
328    };
329    if let Encoding::RleDictionary = encoding {
330        // Only take this path for primitive columns
331        if matches!(nested.first(), Some(Nested::Primitive(_))) {
332            if let Some(result) =
333                encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
334            {
335                return result;
336            }
337        }
338
339        // We didn't succeed, fallback to plain
340        encoding = Encoding::Plain;
341    }
342
343    let nested = nested.to_vec();
344    let number_of_rows = nested[0].len();
345    // note: this is not correct if the array is sliced - the estimation should happen on the
346    // primitive after sliced for parquet
347    let byte_size = estimated_bytes_size(primitive_array);
348    let primitive_array = primitive_array.to_boxed();
349
350    let pages =
351        row_slice_ranges(number_of_rows, byte_size, options).map(move |(offset, length)| {
352            let mut right_array = primitive_array.clone();
353            let mut right_nested = nested.clone();
354            slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
355
356            array_to_page(
357                right_array.as_ref(),
358                type_.clone(),
359                &right_nested,
360                options,
361                encoding,
362            )
363        });
364    Ok(DynIter::new(pages))
365}
366
367/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
368pub fn array_to_page(
369    array: &dyn Array,
370    type_: ParquetPrimitiveType,
371    nested: &[Nested],
372    options: WriteOptions,
373    encoding: Encoding,
374) -> PolarsResult<Page> {
375    if nested.len() == 1 {
376        // special case where validity == def levels
377        return array_to_page_simple(array, type_, options, encoding);
378    }
379    array_to_page_nested(array, type_, nested, options, encoding)
380}
381
382/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
383pub fn array_to_page_simple(
384    array: &dyn Array,
385    type_: ParquetPrimitiveType,
386    options: WriteOptions,
387    encoding: Encoding,
388) -> PolarsResult<Page> {
389    let dtype = array.dtype();
390
391    if type_.field_info.repetition == Repetition::Required && array.null_count() > 0 {
392        polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
393    }
394
395    match dtype {
396        // Map empty struct to boolean array with same validity.
397        ArrowDataType::Struct(fs) if fs.is_empty() => boolean::array_to_page(
398            &BooleanArray::new(
399                ArrowDataType::Boolean,
400                Bitmap::new_zeroed(array.len()),
401                array.validity().cloned(),
402            ),
403            options,
404            type_,
405            encoding,
406        ),
407
408        ArrowDataType::Boolean => boolean::array_to_page(
409            array.as_any().downcast_ref().unwrap(),
410            options,
411            type_,
412            encoding,
413        ),
414        // casts below MUST match the casts done at the metadata (field -> parquet type).
415        ArrowDataType::UInt8 => {
416            return primitive::array_to_page_integer::<u8, i32>(
417                array.as_any().downcast_ref().unwrap(),
418                options,
419                type_,
420                encoding,
421            );
422        },
423        ArrowDataType::UInt16 => {
424            return primitive::array_to_page_integer::<u16, i32>(
425                array.as_any().downcast_ref().unwrap(),
426                options,
427                type_,
428                encoding,
429            );
430        },
431        ArrowDataType::UInt32 => {
432            return primitive::array_to_page_integer::<u32, i32>(
433                array.as_any().downcast_ref().unwrap(),
434                options,
435                type_,
436                encoding,
437            );
438        },
439        ArrowDataType::UInt64 => {
440            return primitive::array_to_page_integer::<u64, i64>(
441                array.as_any().downcast_ref().unwrap(),
442                options,
443                type_,
444                encoding,
445            );
446        },
447        ArrowDataType::Int8 => {
448            return primitive::array_to_page_integer::<i8, i32>(
449                array.as_any().downcast_ref().unwrap(),
450                options,
451                type_,
452                encoding,
453            );
454        },
455        ArrowDataType::Int16 => {
456            return primitive::array_to_page_integer::<i16, i32>(
457                array.as_any().downcast_ref().unwrap(),
458                options,
459                type_,
460                encoding,
461            );
462        },
463        ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
464            return primitive::array_to_page_integer::<i32, i32>(
465                array.as_any().downcast_ref().unwrap(),
466                options,
467                type_,
468                encoding,
469            );
470        },
471        ArrowDataType::Int64
472        | ArrowDataType::Date64
473        | ArrowDataType::Time64(_)
474        | ArrowDataType::Timestamp(_, _)
475        | ArrowDataType::Duration(_) => {
476            return primitive::array_to_page_integer::<i64, i64>(
477                array.as_any().downcast_ref().unwrap(),
478                options,
479                type_,
480                encoding,
481            );
482        },
483        ArrowDataType::Float16 => {
484            let array: &PrimitiveArray<pf16> = array.as_any().downcast_ref().unwrap();
485            let statistics = options
486                .has_statistics()
487                .then(|| build_statistics_float16(array, type_.clone(), &options.statistics));
488            let array = FixedSizeBinaryArray::new(
489                ArrowDataType::FixedSizeBinary(2),
490                array.values().clone().try_transmute().unwrap(),
491                array.validity().cloned(),
492            );
493            fixed_size_binary::array_to_page(&array, options, type_, statistics)
494        },
495        ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
496            array.as_any().downcast_ref().unwrap(),
497            options,
498            type_,
499        ),
500        ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
501            array.as_any().downcast_ref().unwrap(),
502            options,
503            type_,
504        ),
505        ArrowDataType::LargeUtf8 => {
506            let array =
507                polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
508                    .unwrap();
509            return binary::array_to_page::<i64>(
510                array.as_any().downcast_ref().unwrap(),
511                options,
512                type_,
513                encoding,
514            );
515        },
516        ArrowDataType::LargeBinary => {
517            return binary::array_to_page::<i64>(
518                array.as_any().downcast_ref().unwrap(),
519                options,
520                type_,
521                encoding,
522            );
523        },
524        ArrowDataType::BinaryView => {
525            return binview::array_to_page(
526                array.as_any().downcast_ref().unwrap(),
527                options,
528                type_,
529                encoding,
530            );
531        },
532        ArrowDataType::Utf8View => {
533            let array =
534                polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
535                    .unwrap();
536            return binview::array_to_page(
537                array.as_any().downcast_ref().unwrap(),
538                options,
539                type_,
540                encoding,
541            );
542        },
543        ArrowDataType::Null => {
544            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
545            primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
546        },
547        ArrowDataType::Interval(IntervalUnit::YearMonth) => {
548            let array = array
549                .as_any()
550                .downcast_ref::<PrimitiveArray<i32>>()
551                .unwrap();
552            let mut values = Vec::<u8>::with_capacity(12 * array.len());
553            array.values().iter().for_each(|x| {
554                let bytes = &x.to_le_bytes();
555                values.extend_from_slice(bytes);
556                values.extend_from_slice(&[0; 8]);
557            });
558            let array = FixedSizeBinaryArray::new(
559                ArrowDataType::FixedSizeBinary(12),
560                values.into(),
561                array.validity().cloned(),
562            );
563            let statistics = if options.has_statistics() {
564                Some(fixed_size_binary::build_statistics(
565                    &array,
566                    type_.clone(),
567                    &options.statistics,
568                ))
569            } else {
570                None
571            };
572            fixed_size_binary::array_to_page(&array, options, type_, statistics)
573        },
574        ArrowDataType::Interval(IntervalUnit::DayTime) => {
575            let array = array
576                .as_any()
577                .downcast_ref::<PrimitiveArray<days_ms>>()
578                .unwrap();
579            let mut values = Vec::<u8>::with_capacity(12 * array.len());
580            array.values().iter().for_each(|x| {
581                let bytes = &x.to_le_bytes();
582                values.extend_from_slice(&[0; 4]); // months
583                values.extend_from_slice(bytes); // days and seconds
584            });
585            let array = FixedSizeBinaryArray::new(
586                ArrowDataType::FixedSizeBinary(12),
587                values.into(),
588                array.validity().cloned(),
589            );
590            let statistics = if options.has_statistics() {
591                Some(fixed_size_binary::build_statistics(
592                    &array,
593                    type_.clone(),
594                    &options.statistics,
595                ))
596            } else {
597                None
598            };
599            fixed_size_binary::array_to_page(&array, options, type_, statistics)
600        },
601        ArrowDataType::FixedSizeBinary(_) => {
602            let array = array.as_any().downcast_ref().unwrap();
603            let statistics = if options.has_statistics() {
604                Some(fixed_size_binary::build_statistics(
605                    array,
606                    type_.clone(),
607                    &options.statistics,
608                ))
609            } else {
610                None
611            };
612
613            fixed_size_binary::array_to_page(array, options, type_, statistics)
614        },
615        ArrowDataType::Decimal256(precision, _) => {
616            let precision = *precision;
617            let array = array
618                .as_any()
619                .downcast_ref::<PrimitiveArray<i256>>()
620                .unwrap();
621            if precision <= 9 {
622                let values = array
623                    .values()
624                    .iter()
625                    .map(|x| x.0.as_i32())
626                    .collect::<Vec<_>>()
627                    .into();
628
629                let array = PrimitiveArray::<i32>::new(
630                    ArrowDataType::Int32,
631                    values,
632                    array.validity().cloned(),
633                );
634                return primitive::array_to_page_integer::<i32, i32>(
635                    &array, options, type_, encoding,
636                );
637            } else if precision <= 18 {
638                let values = array
639                    .values()
640                    .iter()
641                    .map(|x| x.0.as_i64())
642                    .collect::<Vec<_>>()
643                    .into();
644
645                let array = PrimitiveArray::<i64>::new(
646                    ArrowDataType::Int64,
647                    values,
648                    array.validity().cloned(),
649                );
650                return primitive::array_to_page_integer::<i64, i64>(
651                    &array, options, type_, encoding,
652                );
653            } else if precision <= 38 {
654                let size = decimal_length_from_precision(precision);
655                let statistics = if options.has_statistics() {
656                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
657                        array,
658                        type_.clone(),
659                        size,
660                        &options.statistics,
661                    );
662                    Some(stats)
663                } else {
664                    None
665                };
666
667                let mut values = Vec::<u8>::with_capacity(size * array.len());
668                array.values().iter().for_each(|x| {
669                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
670                    values.extend_from_slice(bytes)
671                });
672                let array = FixedSizeBinaryArray::new(
673                    ArrowDataType::FixedSizeBinary(size),
674                    values.into(),
675                    array.validity().cloned(),
676                );
677                fixed_size_binary::array_to_page(&array, options, type_, statistics)
678            } else {
679                let size = 32;
680                let array = array
681                    .as_any()
682                    .downcast_ref::<PrimitiveArray<i256>>()
683                    .unwrap();
684                let statistics = if options.has_statistics() {
685                    let stats = fixed_size_binary::build_statistics_decimal256(
686                        array,
687                        type_.clone(),
688                        size,
689                        &options.statistics,
690                    );
691                    Some(stats)
692                } else {
693                    None
694                };
695                let mut values = Vec::<u8>::with_capacity(size * array.len());
696                array.values().iter().for_each(|x| {
697                    let bytes = &x.to_be_bytes();
698                    values.extend_from_slice(bytes)
699                });
700                let array = FixedSizeBinaryArray::new(
701                    ArrowDataType::FixedSizeBinary(size),
702                    values.into(),
703                    array.validity().cloned(),
704                );
705
706                fixed_size_binary::array_to_page(&array, options, type_, statistics)
707            }
708        },
709        ArrowDataType::Decimal(precision, _) => {
710            let precision = *precision;
711            let array = array
712                .as_any()
713                .downcast_ref::<PrimitiveArray<i128>>()
714                .unwrap();
715            if precision <= 9 {
716                let values = array
717                    .values()
718                    .iter()
719                    .map(|x| *x as i32)
720                    .collect::<Vec<_>>()
721                    .into();
722
723                let array = PrimitiveArray::<i32>::new(
724                    ArrowDataType::Int32,
725                    values,
726                    array.validity().cloned(),
727                );
728                return primitive::array_to_page_integer::<i32, i32>(
729                    &array, options, type_, encoding,
730                );
731            } else if precision <= 18 {
732                let values = array
733                    .values()
734                    .iter()
735                    .map(|x| *x as i64)
736                    .collect::<Vec<_>>()
737                    .into();
738
739                let array = PrimitiveArray::<i64>::new(
740                    ArrowDataType::Int64,
741                    values,
742                    array.validity().cloned(),
743                );
744                return primitive::array_to_page_integer::<i64, i64>(
745                    &array, options, type_, encoding,
746                );
747            } else {
748                let size = decimal_length_from_precision(precision);
749
750                let statistics = if options.has_statistics() {
751                    let stats = fixed_size_binary::build_statistics_decimal(
752                        array,
753                        type_.clone(),
754                        size,
755                        &options.statistics,
756                    );
757                    Some(stats)
758                } else {
759                    None
760                };
761
762                let mut values = Vec::<u8>::with_capacity(size * array.len());
763                array.values().iter().for_each(|x| {
764                    let bytes = &x.to_be_bytes()[16 - size..];
765                    values.extend_from_slice(bytes)
766                });
767                let array = FixedSizeBinaryArray::new(
768                    ArrowDataType::FixedSizeBinary(size),
769                    values.into(),
770                    array.validity().cloned(),
771                );
772                fixed_size_binary::array_to_page(&array, options, type_, statistics)
773            }
774        },
775        ArrowDataType::UInt128 => {
776            let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();
777            let statistics = if options.has_statistics() {
778                let stats = fixed_size_binary::build_statistics_decimal(
779                    array,
780                    type_.clone(),
781                    16,
782                    &options.statistics,
783                );
784                Some(stats)
785            } else {
786                None
787            };
788            let array = FixedSizeBinaryArray::new(
789                ArrowDataType::FixedSizeBinary(16),
790                array.values().clone().try_transmute().unwrap(),
791                array.validity().cloned(),
792            );
793            fixed_size_binary::array_to_page(&array, options, type_, statistics)
794        },
795        ArrowDataType::Int128 => {
796            let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
797            let statistics = if options.has_statistics() {
798                let stats = fixed_size_binary::build_statistics_decimal(
799                    array,
800                    type_.clone(),
801                    16,
802                    &options.statistics,
803                );
804                Some(stats)
805            } else {
806                None
807            };
808            let array = FixedSizeBinaryArray::new(
809                ArrowDataType::FixedSizeBinary(16),
810                array.values().clone().try_transmute().unwrap(),
811                array.validity().cloned(),
812            );
813            fixed_size_binary::array_to_page(&array, options, type_, statistics)
814        },
815        ArrowDataType::Extension(ext) => {
816            let mut boxed = array.to_boxed();
817            assert!(matches!(boxed.dtype(), ArrowDataType::Extension(ext2) if ext2 == ext));
818            *boxed.dtype_mut() = ext.inner.clone();
819            return array_to_page_simple(boxed.as_ref(), type_, options, encoding);
820        },
821        other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
822    }
823    .map(Page::Data)
824}
825
826fn array_to_page_nested(
827    array: &dyn Array,
828    type_: ParquetPrimitiveType,
829    nested: &[Nested],
830    options: WriteOptions,
831    _encoding: Encoding,
832) -> PolarsResult<Page> {
833    if type_.field_info.repetition == Repetition::Required
834        && array.validity().is_some_and(|v| v.unset_bits() > 0)
835    {
836        polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
837    }
838
839    use ArrowDataType::*;
840    match array.dtype().to_storage() {
841        Null => {
842            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
843            primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
844        },
845        // Map empty struct to boolean array with same validity.
846        Struct(fs) if fs.is_empty() => {
847            let array = BooleanArray::new(
848                ArrowDataType::Boolean,
849                Bitmap::new_zeroed(array.len()),
850                array.validity().cloned(),
851            );
852            boolean::nested_array_to_page(&array, options, type_, nested)
853        },
854        Boolean => {
855            let array = array.as_any().downcast_ref().unwrap();
856            boolean::nested_array_to_page(array, options, type_, nested)
857        },
858        LargeUtf8 => {
859            let array =
860                polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
861            let array = array.as_any().downcast_ref().unwrap();
862            binary::nested_array_to_page::<i64>(array, options, type_, nested)
863        },
864        LargeBinary => {
865            let array = array.as_any().downcast_ref().unwrap();
866            binary::nested_array_to_page::<i64>(array, options, type_, nested)
867        },
868        BinaryView => {
869            let array = array.as_any().downcast_ref().unwrap();
870            binview::nested_array_to_page(array, options, type_, nested)
871        },
872        Utf8View => {
873            let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
874            let array = array.as_any().downcast_ref().unwrap();
875            binview::nested_array_to_page(array, options, type_, nested)
876        },
877        UInt8 => {
878            let array = array.as_any().downcast_ref().unwrap();
879            primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
880        },
881        UInt16 => {
882            let array = array.as_any().downcast_ref().unwrap();
883            primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
884        },
885        UInt32 => {
886            let array = array.as_any().downcast_ref().unwrap();
887            primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
888        },
889        UInt64 => {
890            let array = array.as_any().downcast_ref().unwrap();
891            primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
892        },
893        Int8 => {
894            let array = array.as_any().downcast_ref().unwrap();
895            primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
896        },
897        Int16 => {
898            let array = array.as_any().downcast_ref().unwrap();
899            primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
900        },
901        Int32 | Date32 | Time32(_) => {
902            let array = array.as_any().downcast_ref().unwrap();
903            primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
904        },
905        Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
906            let array = array.as_any().downcast_ref().unwrap();
907            primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
908        },
909        Float16 => {
910            let array: &PrimitiveArray<pf16> = array.as_any().downcast_ref().unwrap();
911            let statistics = options
912                .has_statistics()
913                .then(|| build_statistics_float16(array, type_.clone(), &options.statistics));
914            let array = FixedSizeBinaryArray::new(
915                ArrowDataType::FixedSizeBinary(2),
916                array.values().clone().try_transmute().unwrap(),
917                array.validity().cloned(),
918            );
919            fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
920        },
921        Float32 => {
922            let array = array.as_any().downcast_ref().unwrap();
923            primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
924        },
925        Float64 => {
926            let array = array.as_any().downcast_ref().unwrap();
927            primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
928        },
929        Decimal(precision, _) => {
930            let precision = *precision;
931            let array = array
932                .as_any()
933                .downcast_ref::<PrimitiveArray<i128>>()
934                .unwrap();
935            if precision <= 9 {
936                let values = array
937                    .values()
938                    .iter()
939                    .map(|x| *x as i32)
940                    .collect::<Vec<_>>()
941                    .into();
942
943                let array = PrimitiveArray::<i32>::new(
944                    ArrowDataType::Int32,
945                    values,
946                    array.validity().cloned(),
947                );
948                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
949            } else if precision <= 18 {
950                let values = array
951                    .values()
952                    .iter()
953                    .map(|x| *x as i64)
954                    .collect::<Vec<_>>()
955                    .into();
956
957                let array = PrimitiveArray::<i64>::new(
958                    ArrowDataType::Int64,
959                    values,
960                    array.validity().cloned(),
961                );
962                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
963            } else {
964                let size = decimal_length_from_precision(precision);
965
966                let statistics = if options.has_statistics() {
967                    let stats = fixed_size_binary::build_statistics_decimal(
968                        array,
969                        type_.clone(),
970                        size,
971                        &options.statistics,
972                    );
973                    Some(stats)
974                } else {
975                    None
976                };
977
978                let mut values = Vec::<u8>::with_capacity(size * array.len());
979                array.values().iter().for_each(|x| {
980                    let bytes = &x.to_be_bytes()[16 - size..];
981                    values.extend_from_slice(bytes)
982                });
983                let array = FixedSizeBinaryArray::new(
984                    ArrowDataType::FixedSizeBinary(size),
985                    values.into(),
986                    array.validity().cloned(),
987                );
988                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
989            }
990        },
991        Decimal256(precision, _) => {
992            let precision = *precision;
993            let array = array
994                .as_any()
995                .downcast_ref::<PrimitiveArray<i256>>()
996                .unwrap();
997            if precision <= 9 {
998                let values = array
999                    .values()
1000                    .iter()
1001                    .map(|x| x.0.as_i32())
1002                    .collect::<Vec<_>>()
1003                    .into();
1004
1005                let array = PrimitiveArray::<i32>::new(
1006                    ArrowDataType::Int32,
1007                    values,
1008                    array.validity().cloned(),
1009                );
1010                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
1011            } else if precision <= 18 {
1012                let values = array
1013                    .values()
1014                    .iter()
1015                    .map(|x| x.0.as_i64())
1016                    .collect::<Vec<_>>()
1017                    .into();
1018
1019                let array = PrimitiveArray::<i64>::new(
1020                    ArrowDataType::Int64,
1021                    values,
1022                    array.validity().cloned(),
1023                );
1024                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
1025            } else if precision <= 38 {
1026                let size = decimal_length_from_precision(precision);
1027                let statistics = if options.has_statistics() {
1028                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
1029                        array,
1030                        type_.clone(),
1031                        size,
1032                        &options.statistics,
1033                    );
1034                    Some(stats)
1035                } else {
1036                    None
1037                };
1038
1039                let mut values = Vec::<u8>::with_capacity(size * array.len());
1040                array.values().iter().for_each(|x| {
1041                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
1042                    values.extend_from_slice(bytes)
1043                });
1044                let array = FixedSizeBinaryArray::new(
1045                    ArrowDataType::FixedSizeBinary(size),
1046                    values.into(),
1047                    array.validity().cloned(),
1048                );
1049                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1050            } else {
1051                let size = 32;
1052                let array = array
1053                    .as_any()
1054                    .downcast_ref::<PrimitiveArray<i256>>()
1055                    .unwrap();
1056                let statistics = if options.has_statistics() {
1057                    let stats = fixed_size_binary::build_statistics_decimal256(
1058                        array,
1059                        type_.clone(),
1060                        size,
1061                        &options.statistics,
1062                    );
1063                    Some(stats)
1064                } else {
1065                    None
1066                };
1067                let mut values = Vec::<u8>::with_capacity(size * array.len());
1068                array.values().iter().for_each(|x| {
1069                    let bytes = &x.to_be_bytes();
1070                    values.extend_from_slice(bytes)
1071                });
1072                let array = FixedSizeBinaryArray::new(
1073                    ArrowDataType::FixedSizeBinary(size),
1074                    values.into(),
1075                    array.validity().cloned(),
1076                );
1077
1078                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1079            }
1080        },
1081        Int128 => {
1082            let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
1083            // Can't write min/max statistics for signed 128-bit integer, see #25965.
1084            let mut no_mm_options = options;
1085            no_mm_options.statistics.min_value = false;
1086            no_mm_options.statistics.max_value = false;
1087            let statistics = if no_mm_options.has_statistics() {
1088                let stats = fixed_size_binary::build_statistics_decimal(
1089                    array,
1090                    type_.clone(),
1091                    16,
1092                    &no_mm_options.statistics,
1093                );
1094                Some(stats)
1095            } else {
1096                None
1097            };
1098            let array = FixedSizeBinaryArray::new(
1099                ArrowDataType::FixedSizeBinary(16),
1100                array.values().clone().try_transmute().unwrap(),
1101                array.validity().cloned(),
1102            );
1103            fixed_size_binary::nested_array_to_page(
1104                &array,
1105                no_mm_options,
1106                type_,
1107                nested,
1108                statistics,
1109            )
1110        },
1111        UInt128 => {
1112            let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();
1113            let statistics = if options.has_statistics() {
1114                let stats = fixed_size_binary::build_statistics_decimal(
1115                    array,
1116                    type_.clone(),
1117                    16,
1118                    &options.statistics,
1119                );
1120                Some(stats)
1121            } else {
1122                None
1123            };
1124            let array = FixedSizeBinaryArray::new(
1125                ArrowDataType::FixedSizeBinary(16),
1126                array.values().clone().try_transmute().unwrap(),
1127                array.validity().cloned(),
1128            );
1129            fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1130        },
1131        other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
1132    }
1133    .map(Page::Data)
1134}
1135
1136fn get_encodings_recursive(dtype: &ArrowDataType, encodings: &mut Vec<Encoding>) {
1137    use arrow::datatypes::PhysicalType::*;
1138    match dtype.to_physical_type() {
1139        Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
1140        | Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => {
1141            encodings.push(get_primitive_dtype_encoding(dtype))
1142        },
1143        List | FixedSizeList | LargeList => {
1144            let a = dtype.to_storage();
1145            if let ArrowDataType::List(inner) = a {
1146                get_encodings_recursive(&inner.dtype, encodings)
1147            } else if let ArrowDataType::LargeList(inner) = a {
1148                get_encodings_recursive(&inner.dtype, encodings)
1149            } else if let ArrowDataType::FixedSizeList(inner, _) = a {
1150                get_encodings_recursive(&inner.dtype, encodings)
1151            } else {
1152                unreachable!()
1153            }
1154        },
1155        Struct => {
1156            if let ArrowDataType::Struct(fields) = dtype.to_storage() {
1157                if fields.is_empty() {
1158                    // 0-field struct writes as a boolean column representing outer validity.
1159                    encodings.push(Encoding::Rle)
1160                }
1161
1162                for field in fields {
1163                    get_encodings_recursive(&field.dtype, encodings)
1164                }
1165            } else {
1166                unreachable!()
1167            }
1168        },
1169        Map => {
1170            if let ArrowDataType::Map(field, _) = dtype.to_storage() {
1171                if let ArrowDataType::Struct(fields) = field.dtype.to_storage() {
1172                    for field in fields {
1173                        get_encodings_recursive(&field.dtype, encodings)
1174                    }
1175                } else {
1176                    unreachable!()
1177                }
1178            } else {
1179                unreachable!()
1180            }
1181        },
1182        Union => todo!(),
1183    }
1184}
1185
1186/// Transverses the `dtype` up to its (parquet) columns and returns a vector of
1187/// items based on `map`.
1188///
1189/// This is used to assign an [`Encoding`] to every parquet column based on the columns' type (see example)
1190pub fn get_dtype_encoding(dtype: &ArrowDataType) -> Vec<Encoding> {
1191    let mut encodings = vec![];
1192    get_encodings_recursive(dtype, &mut encodings);
1193    encodings
1194}
1195
1196fn get_primitive_dtype_encoding(dtype: &ArrowDataType) -> Encoding {
1197    match dtype.to_physical_type() {
1198        PhysicalType::Dictionary(_)
1199        | PhysicalType::LargeBinary
1200        | PhysicalType::LargeUtf8
1201        | PhysicalType::Utf8View
1202        | PhysicalType::BinaryView
1203        | PhysicalType::Primitive(_) => Encoding::RleDictionary,
1204        // remaining is plain
1205        _ => Encoding::Plain,
1206    }
1207}