polars_parquet/arrow/write/
mod.rs

1//! APIs to write to Parquet format.
2//!
3//! # Arrow/Parquet Interoperability
4//! As of [parquet-format v2.9](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md)
//! there are Arrow [DataTypes](arrow::datatypes::ArrowDataType) which have no dedicated Parquet
//! logical type and are therefore written using only their physical type. These include but are
//! not limited to:
7//! * `ArrowDataType::Timestamp(TimeUnit::Second, _)`
8//! * `ArrowDataType::Int64`
9//! * `ArrowDataType::Duration`
10//! * `ArrowDataType::Date64`
11//! * `ArrowDataType::Time32(TimeUnit::Second)`
12//!
13//! The use of these arrow types will result in no logical type being stored within a parquet file.
14
15mod binary;
16mod binview;
17mod boolean;
18mod dictionary;
19mod file;
20mod fixed_size_binary;
21mod nested;
22mod pages;
23mod primitive;
24mod row_group;
25mod schema;
26mod utils;
27
28use arrow::array::*;
29use arrow::datatypes::*;
30use arrow::types::{NativeType, days_ms, i256};
31pub use nested::{num_values, write_rep_and_def};
32pub use pages::{to_leaves, to_nested, to_parquet_leaves};
33use polars_utils::pl_str::PlSmallStr;
34pub use utils::write_def_levels;
35
36pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
37pub use crate::parquet::encoding::Encoding;
38pub use crate::parquet::metadata::{
39    Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
40};
41pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
42use crate::parquet::schema::Repetition;
43use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
44pub use crate::parquet::schema::types::{
45    FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
46};
47pub use crate::parquet::write::{
48    Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
49    write_metadata_sidecar,
50};
51pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
52
/// Selects which statistics are computed and written to the parquet file.
///
/// [`Default`] enables every statistic except `distinct_count`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct StatisticsOptions {
    /// Write the minimum value statistic.
    pub min_value: bool,
    /// Write the maximum value statistic.
    pub max_value: bool,
    /// Write the distinct count statistic.
    pub distinct_count: bool,
    /// Write the null count statistic.
    pub null_count: bool,
}

impl Default for StatisticsOptions {
    /// Min, max and null count are written; distinct count is not.
    fn default() -> Self {
        Self {
            distinct_count: false,
            null_count: true,
            max_value: true,
            min_value: true,
        }
    }
}
74
/// Whether an array is encoded as a required (no nulls) or an optional (nullable) column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeNullability {
    /// The column cannot contain nulls.
    Required,
    /// The column may contain nulls.
    Optional,
}
81
/// Currently supported options to write to parquet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WriteOptions {
    /// Which statistics to compute and write.
    pub statistics: StatisticsOptions,
    /// The page and file version to use.
    pub version: Version,
    /// The compression to apply to every page.
    pub compression: CompressionOptions,
    /// Target size, in bytes, at which a data page is flushed. Defaults to `1024 * 1024` when
    /// `None`; when pages are split it is additionally capped at `2^31 - 2^25` bytes.
    pub data_page_size: Option<usize>,
}
94
/// Per-column write options, mirroring the (possibly nested) shape of an Arrow field.
#[derive(Clone)]
pub struct ColumnWriteOptions {
    /// Optional parquet field id for this column.
    pub field_id: Option<i32>,
    /// Key-value metadata for this column (presumably stored with the field — TODO confirm).
    pub metadata: Vec<KeyValue>,
    /// Optional override of whether the column is required; `None` keeps the default.
    /// NOTE(review): presumably the default follows the Arrow field's nullability — confirm.
    pub required: Option<bool>,
    /// Leaf encoding options, or the options of this column's nested children.
    pub children: ChildWriteOptions,
}
102
/// The shape-specific part of [`ColumnWriteOptions`].
#[derive(Clone)]
pub enum ChildWriteOptions {
    /// A leaf (primitive) column with its encoding options.
    Leaf(FieldWriteOptions),
    /// A list-like column; holds the options of its single child column.
    ListLike(Box<ListLikeFieldWriteOptions>),
    /// A struct column; holds the options of each of its field columns.
    Struct(Box<StructFieldWriteOptions>),
}
109
110impl ColumnWriteOptions {
111    pub fn to_leaves<'a>(&'a self, out: &mut Vec<&'a FieldWriteOptions>) {
112        match &self.children {
113            ChildWriteOptions::Leaf(o) => out.push(o),
114            ChildWriteOptions::ListLike(o) => o.child.to_leaves(out),
115            ChildWriteOptions::Struct(o) => {
116                for o in &o.children {
117                    o.to_leaves(out);
118                }
119            },
120        }
121    }
122}
123
/// Write options for a leaf (primitive) parquet column.
#[derive(Clone)]
pub struct FieldWriteOptions {
    /// Requested encoding for the column's data pages. `Encoding::RleDictionary` falls back to
    /// `Encoding::Plain` when dictionary encoding cannot be applied.
    pub encoding: Encoding,
}
128
129impl ColumnWriteOptions {
130    pub fn default_with(children: ChildWriteOptions) -> Self {
131        Self {
132            field_id: None,
133            metadata: Vec::new(),
134            required: None,
135            children,
136        }
137    }
138}
139
140impl FieldWriteOptions {
141    pub fn default_with_encoding(encoding: Encoding) -> Self {
142        Self { encoding }
143    }
144
145    pub fn into_default_column_write_options(self) -> ColumnWriteOptions {
146        ColumnWriteOptions::default_with(ChildWriteOptions::Leaf(self))
147    }
148}
149
/// Write options for a list-like column.
/// NOTE(review): presumably covers list, large list and fixed-size list — confirm.
#[derive(Clone)]
pub struct ListLikeFieldWriteOptions {
    /// Options of the list's element column.
    pub child: ColumnWriteOptions,
}
154
/// Write options for a struct column.
#[derive(Clone)]
pub struct StructFieldWriteOptions {
    /// Options of each struct field column; `ColumnWriteOptions::to_leaves` visits them in
    /// this order (presumably the struct's field order — TODO confirm).
    pub children: Vec<ColumnWriteOptions>,
}
159
160use arrow::compute::aggregate::estimated_bytes_size;
161use arrow::match_integer_type;
162pub use file::FileWriter;
163pub use pages::{Nested, array_to_columns, arrays_to_columns};
164use polars_error::{PolarsResult, polars_bail};
165pub use row_group::{RowGroupIterator, row_group_iter};
166pub use schema::{schema_to_metadata_key, to_parquet_type};
167
168use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
169use crate::write::dictionary::encode_as_dictionary_optional;
170
171impl StatisticsOptions {
172    pub fn empty() -> Self {
173        Self {
174            min_value: false,
175            max_value: false,
176            distinct_count: false,
177            null_count: false,
178        }
179    }
180
181    pub fn full() -> Self {
182        Self {
183            min_value: true,
184            max_value: true,
185            distinct_count: true,
186            null_count: true,
187        }
188    }
189
190    pub fn is_empty(&self) -> bool {
191        !(self.min_value || self.max_value || self.distinct_count || self.null_count)
192    }
193
194    pub fn is_full(&self) -> bool {
195        self.min_value && self.max_value && self.distinct_count && self.null_count
196    }
197}
198
199impl WriteOptions {
200    pub fn has_statistics(&self) -> bool {
201        !self.statistics.is_empty()
202    }
203}
204
205impl EncodeNullability {
206    const fn new(is_optional: bool) -> Self {
207        if is_optional {
208            Self::Optional
209        } else {
210            Self::Required
211        }
212    }
213
214    fn is_optional(self) -> bool {
215        matches!(self, Self::Optional)
216    }
217}
218
219/// returns offset and length to slice the leaf values
220pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
221    // find the deepest recursive dremel structure as that one determines how many values we must
222    // take
223    let mut out = (0, 0);
224    for nested in nested.iter().rev() {
225        match nested {
226            Nested::LargeList(l_nested) => {
227                let start = *l_nested.offsets.first();
228                let end = *l_nested.offsets.last();
229                return (start as usize, (end - start) as usize);
230            },
231            Nested::List(l_nested) => {
232                let start = *l_nested.offsets.first();
233                let end = *l_nested.offsets.last();
234                return (start as usize, (end - start) as usize);
235            },
236            Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
237            Nested::Primitive(nested) => out = (0, nested.length),
238            Nested::Struct(_) => {},
239        }
240    }
241    out
242}
243
/// Returns the smallest byte width `n` of a signed (two's-complement) fixed-length byte array
/// that can hold every decimal with `precision` digits.
///
/// An `n`-byte signed integer holds values up to `2^(8n - 1) - 1`. The largest unscaled value
/// with `precision` digits is `10^precision - 1`, so we need
/// `2^(8n - 1) - 1 >= 10^precision - 1`, i.e. `2^(8n - 1) >= 10^precision`, i.e.
/// `n >= (log2(10^precision) + 1) / 8`.
///
/// The `+ 1.0` inside the logarithm is only a small safety margin. It does not change the
/// result for any `precision >= 1`, because `10^precision` is never a power of two.
fn decimal_length_from_precision(precision: usize) -> usize {
    (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize
}
254
255/// Creates a parquet [`SchemaDescriptor`] from a [`ArrowSchema`].
256pub fn to_parquet_schema(
257    schema: &ArrowSchema,
258    column_options: &[ColumnWriteOptions],
259) -> PolarsResult<SchemaDescriptor> {
260    let parquet_types = schema
261        .iter_values()
262        .zip(column_options)
263        .map(|(field, options)| to_parquet_type(field, options))
264        .collect::<PolarsResult<Vec<_>>>()?;
265    Ok(SchemaDescriptor::new(
266        PlSmallStr::from_static("root"),
267        parquet_types,
268    ))
269}
270
271/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.
272pub fn slice_parquet_array(
273    primitive_array: &mut dyn Array,
274    nested: &mut [Nested],
275    mut current_offset: usize,
276    mut current_length: usize,
277) {
278    for nested in nested.iter_mut() {
279        match nested {
280            Nested::LargeList(l_nested) => {
281                l_nested.offsets.slice(current_offset, current_length + 1);
282                if let Some(validity) = l_nested.validity.as_mut() {
283                    validity.slice(current_offset, current_length)
284                };
285
286                // Update the offset/ length so that the Primitive is sliced properly.
287                current_length = l_nested.offsets.range() as usize;
288                current_offset = *l_nested.offsets.first() as usize;
289            },
290            Nested::List(l_nested) => {
291                l_nested.offsets.slice(current_offset, current_length + 1);
292                if let Some(validity) = l_nested.validity.as_mut() {
293                    validity.slice(current_offset, current_length)
294                };
295
296                // Update the offset/ length so that the Primitive is sliced properly.
297                current_length = l_nested.offsets.range() as usize;
298                current_offset = *l_nested.offsets.first() as usize;
299            },
300            Nested::Struct(StructNested {
301                validity, length, ..
302            }) => {
303                *length = current_length;
304                if let Some(validity) = validity.as_mut() {
305                    validity.slice(current_offset, current_length)
306                };
307            },
308            Nested::Primitive(PrimitiveNested {
309                validity, length, ..
310            }) => {
311                *length = current_length;
312                if let Some(validity) = validity.as_mut() {
313                    validity.slice(current_offset, current_length)
314                };
315                primitive_array.slice(current_offset, current_length);
316            },
317            Nested::FixedSizeList(FixedSizeListNested {
318                validity,
319                length,
320                width,
321                ..
322            }) => {
323                if let Some(validity) = validity.as_mut() {
324                    validity.slice(current_offset, current_length)
325                };
326                *length = current_length;
327                // Update the offset/ length so that the Primitive is sliced properly.
328                current_length *= *width;
329                current_offset *= *width;
330            },
331        }
332    }
333}
334
335/// Get the length of [`Array`] that should be sliced.
336pub fn get_max_length(nested: &[Nested]) -> usize {
337    let mut length = 0;
338    for nested in nested.iter() {
339        match nested {
340            Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
341            Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
342            Nested::FixedSizeList(nested) => length += nested.length * nested.width,
343            _ => {},
344        }
345    }
346    length
347}
348
349/// Returns an iterator of [`Page`].
350pub fn array_to_pages(
351    primitive_array: &dyn Array,
352    type_: ParquetPrimitiveType,
353    nested: &[Nested],
354    options: WriteOptions,
355    field_options: &FieldWriteOptions,
356) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
357    let mut encoding = field_options.encoding;
358    if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_logical_type() {
359        return match_integer_type!(key_type, |$T| {
360            dictionary::array_to_pages::<$T>(
361                primitive_array.as_any().downcast_ref().unwrap(),
362                type_,
363                &nested,
364                options,
365                encoding,
366            )
367        });
368    };
369    if let Encoding::RleDictionary = encoding {
370        // Only take this path for primitive columns
371        if matches!(nested.first(), Some(Nested::Primitive(_))) {
372            if let Some(result) =
373                encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
374            {
375                return result;
376            }
377        }
378
379        // We didn't succeed, fallback to plain
380        encoding = Encoding::Plain;
381    }
382
383    let nested = nested.to_vec();
384
385    let number_of_rows = nested[0].len();
386
387    // note: this is not correct if the array is sliced - the estimation should happen on the
388    // primitive after sliced for parquet
389    let byte_size = estimated_bytes_size(primitive_array);
390
391    const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
392    let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
393    let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
394    let bytes_per_row = if number_of_rows == 0 {
395        0
396    } else {
397        ((byte_size as f64) / (number_of_rows as f64)) as usize
398    };
399    let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
400
401    let row_iter = (0..number_of_rows)
402        .step_by(rows_per_page)
403        .map(move |offset| {
404            let length = if offset + rows_per_page > number_of_rows {
405                number_of_rows - offset
406            } else {
407                rows_per_page
408            };
409            (offset, length)
410        });
411
412    let primitive_array = primitive_array.to_boxed();
413
414    let pages = row_iter.map(move |(offset, length)| {
415        let mut right_array = primitive_array.clone();
416        let mut right_nested = nested.clone();
417        slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
418
419        array_to_page(
420            right_array.as_ref(),
421            type_.clone(),
422            &right_nested,
423            options,
424            encoding,
425        )
426    });
427    Ok(DynIter::new(pages))
428}
429
430/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
431pub fn array_to_page(
432    array: &dyn Array,
433    type_: ParquetPrimitiveType,
434    nested: &[Nested],
435    options: WriteOptions,
436    encoding: Encoding,
437) -> PolarsResult<Page> {
438    if nested.len() == 1 {
439        // special case where validity == def levels
440        return array_to_page_simple(array, type_, options, encoding);
441    }
442    array_to_page_nested(array, type_, nested, options, encoding)
443}
444
445/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
446pub fn array_to_page_simple(
447    array: &dyn Array,
448    type_: ParquetPrimitiveType,
449    options: WriteOptions,
450    encoding: Encoding,
451) -> PolarsResult<Page> {
452    let dtype = array.dtype();
453
454    if type_.field_info.repetition == Repetition::Required && array.null_count() > 0 {
455        polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
456    }
457
458    match dtype.to_logical_type() {
459        ArrowDataType::Boolean => boolean::array_to_page(
460            array.as_any().downcast_ref().unwrap(),
461            options,
462            type_,
463            encoding,
464        ),
465        // casts below MUST match the casts done at the metadata (field -> parquet type).
466        ArrowDataType::UInt8 => {
467            return primitive::array_to_page_integer::<u8, i32>(
468                array.as_any().downcast_ref().unwrap(),
469                options,
470                type_,
471                encoding,
472            );
473        },
474        ArrowDataType::UInt16 => {
475            return primitive::array_to_page_integer::<u16, i32>(
476                array.as_any().downcast_ref().unwrap(),
477                options,
478                type_,
479                encoding,
480            );
481        },
482        ArrowDataType::UInt32 => {
483            return primitive::array_to_page_integer::<u32, i32>(
484                array.as_any().downcast_ref().unwrap(),
485                options,
486                type_,
487                encoding,
488            );
489        },
490        ArrowDataType::UInt64 => {
491            return primitive::array_to_page_integer::<u64, i64>(
492                array.as_any().downcast_ref().unwrap(),
493                options,
494                type_,
495                encoding,
496            );
497        },
498        ArrowDataType::Int8 => {
499            return primitive::array_to_page_integer::<i8, i32>(
500                array.as_any().downcast_ref().unwrap(),
501                options,
502                type_,
503                encoding,
504            );
505        },
506        ArrowDataType::Int16 => {
507            return primitive::array_to_page_integer::<i16, i32>(
508                array.as_any().downcast_ref().unwrap(),
509                options,
510                type_,
511                encoding,
512            );
513        },
514        ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
515            return primitive::array_to_page_integer::<i32, i32>(
516                array.as_any().downcast_ref().unwrap(),
517                options,
518                type_,
519                encoding,
520            );
521        },
522        ArrowDataType::Int64
523        | ArrowDataType::Date64
524        | ArrowDataType::Time64(_)
525        | ArrowDataType::Timestamp(_, _)
526        | ArrowDataType::Duration(_) => {
527            return primitive::array_to_page_integer::<i64, i64>(
528                array.as_any().downcast_ref().unwrap(),
529                options,
530                type_,
531                encoding,
532            );
533        },
534        ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
535            array.as_any().downcast_ref().unwrap(),
536            options,
537            type_,
538        ),
539        ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
540            array.as_any().downcast_ref().unwrap(),
541            options,
542            type_,
543        ),
544        ArrowDataType::LargeUtf8 => {
545            let array =
546                polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
547                    .unwrap();
548            return binary::array_to_page::<i64>(
549                array.as_any().downcast_ref().unwrap(),
550                options,
551                type_,
552                encoding,
553            );
554        },
555        ArrowDataType::LargeBinary => {
556            return binary::array_to_page::<i64>(
557                array.as_any().downcast_ref().unwrap(),
558                options,
559                type_,
560                encoding,
561            );
562        },
563        ArrowDataType::BinaryView => {
564            return binview::array_to_page(
565                array.as_any().downcast_ref().unwrap(),
566                options,
567                type_,
568                encoding,
569            );
570        },
571        ArrowDataType::Utf8View => {
572            let array =
573                polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
574                    .unwrap();
575            return binview::array_to_page(
576                array.as_any().downcast_ref().unwrap(),
577                options,
578                type_,
579                encoding,
580            );
581        },
582        ArrowDataType::Null => {
583            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
584            primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
585        },
586        ArrowDataType::Interval(IntervalUnit::YearMonth) => {
587            let array = array
588                .as_any()
589                .downcast_ref::<PrimitiveArray<i32>>()
590                .unwrap();
591            let mut values = Vec::<u8>::with_capacity(12 * array.len());
592            array.values().iter().for_each(|x| {
593                let bytes = &x.to_le_bytes();
594                values.extend_from_slice(bytes);
595                values.extend_from_slice(&[0; 8]);
596            });
597            let array = FixedSizeBinaryArray::new(
598                ArrowDataType::FixedSizeBinary(12),
599                values.into(),
600                array.validity().cloned(),
601            );
602            let statistics = if options.has_statistics() {
603                Some(fixed_size_binary::build_statistics(
604                    &array,
605                    type_.clone(),
606                    &options.statistics,
607                ))
608            } else {
609                None
610            };
611            fixed_size_binary::array_to_page(&array, options, type_, statistics)
612        },
613        ArrowDataType::Interval(IntervalUnit::DayTime) => {
614            let array = array
615                .as_any()
616                .downcast_ref::<PrimitiveArray<days_ms>>()
617                .unwrap();
618            let mut values = Vec::<u8>::with_capacity(12 * array.len());
619            array.values().iter().for_each(|x| {
620                let bytes = &x.to_le_bytes();
621                values.extend_from_slice(&[0; 4]); // months
622                values.extend_from_slice(bytes); // days and seconds
623            });
624            let array = FixedSizeBinaryArray::new(
625                ArrowDataType::FixedSizeBinary(12),
626                values.into(),
627                array.validity().cloned(),
628            );
629            let statistics = if options.has_statistics() {
630                Some(fixed_size_binary::build_statistics(
631                    &array,
632                    type_.clone(),
633                    &options.statistics,
634                ))
635            } else {
636                None
637            };
638            fixed_size_binary::array_to_page(&array, options, type_, statistics)
639        },
640        ArrowDataType::FixedSizeBinary(_) => {
641            let array = array.as_any().downcast_ref().unwrap();
642            let statistics = if options.has_statistics() {
643                Some(fixed_size_binary::build_statistics(
644                    array,
645                    type_.clone(),
646                    &options.statistics,
647                ))
648            } else {
649                None
650            };
651
652            fixed_size_binary::array_to_page(array, options, type_, statistics)
653        },
654        ArrowDataType::Decimal256(precision, _) => {
655            let precision = *precision;
656            let array = array
657                .as_any()
658                .downcast_ref::<PrimitiveArray<i256>>()
659                .unwrap();
660            if precision <= 9 {
661                let values = array
662                    .values()
663                    .iter()
664                    .map(|x| x.0.as_i32())
665                    .collect::<Vec<_>>()
666                    .into();
667
668                let array = PrimitiveArray::<i32>::new(
669                    ArrowDataType::Int32,
670                    values,
671                    array.validity().cloned(),
672                );
673                return primitive::array_to_page_integer::<i32, i32>(
674                    &array, options, type_, encoding,
675                );
676            } else if precision <= 18 {
677                let values = array
678                    .values()
679                    .iter()
680                    .map(|x| x.0.as_i64())
681                    .collect::<Vec<_>>()
682                    .into();
683
684                let array = PrimitiveArray::<i64>::new(
685                    ArrowDataType::Int64,
686                    values,
687                    array.validity().cloned(),
688                );
689                return primitive::array_to_page_integer::<i64, i64>(
690                    &array, options, type_, encoding,
691                );
692            } else if precision <= 38 {
693                let size = decimal_length_from_precision(precision);
694                let statistics = if options.has_statistics() {
695                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
696                        array,
697                        type_.clone(),
698                        size,
699                        &options.statistics,
700                    );
701                    Some(stats)
702                } else {
703                    None
704                };
705
706                let mut values = Vec::<u8>::with_capacity(size * array.len());
707                array.values().iter().for_each(|x| {
708                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
709                    values.extend_from_slice(bytes)
710                });
711                let array = FixedSizeBinaryArray::new(
712                    ArrowDataType::FixedSizeBinary(size),
713                    values.into(),
714                    array.validity().cloned(),
715                );
716                fixed_size_binary::array_to_page(&array, options, type_, statistics)
717            } else {
718                let size = 32;
719                let array = array
720                    .as_any()
721                    .downcast_ref::<PrimitiveArray<i256>>()
722                    .unwrap();
723                let statistics = if options.has_statistics() {
724                    let stats = fixed_size_binary::build_statistics_decimal256(
725                        array,
726                        type_.clone(),
727                        size,
728                        &options.statistics,
729                    );
730                    Some(stats)
731                } else {
732                    None
733                };
734                let mut values = Vec::<u8>::with_capacity(size * array.len());
735                array.values().iter().for_each(|x| {
736                    let bytes = &x.to_be_bytes();
737                    values.extend_from_slice(bytes)
738                });
739                let array = FixedSizeBinaryArray::new(
740                    ArrowDataType::FixedSizeBinary(size),
741                    values.into(),
742                    array.validity().cloned(),
743                );
744
745                fixed_size_binary::array_to_page(&array, options, type_, statistics)
746            }
747        },
748        ArrowDataType::Decimal(precision, _) => {
749            let precision = *precision;
750            let array = array
751                .as_any()
752                .downcast_ref::<PrimitiveArray<i128>>()
753                .unwrap();
754            if precision <= 9 {
755                let values = array
756                    .values()
757                    .iter()
758                    .map(|x| *x as i32)
759                    .collect::<Vec<_>>()
760                    .into();
761
762                let array = PrimitiveArray::<i32>::new(
763                    ArrowDataType::Int32,
764                    values,
765                    array.validity().cloned(),
766                );
767                return primitive::array_to_page_integer::<i32, i32>(
768                    &array, options, type_, encoding,
769                );
770            } else if precision <= 18 {
771                let values = array
772                    .values()
773                    .iter()
774                    .map(|x| *x as i64)
775                    .collect::<Vec<_>>()
776                    .into();
777
778                let array = PrimitiveArray::<i64>::new(
779                    ArrowDataType::Int64,
780                    values,
781                    array.validity().cloned(),
782                );
783                return primitive::array_to_page_integer::<i64, i64>(
784                    &array, options, type_, encoding,
785                );
786            } else {
787                let size = decimal_length_from_precision(precision);
788
789                let statistics = if options.has_statistics() {
790                    let stats = fixed_size_binary::build_statistics_decimal(
791                        array,
792                        type_.clone(),
793                        size,
794                        &options.statistics,
795                    );
796                    Some(stats)
797                } else {
798                    None
799                };
800
801                let mut values = Vec::<u8>::with_capacity(size * array.len());
802                array.values().iter().for_each(|x| {
803                    let bytes = &x.to_be_bytes()[16 - size..];
804                    values.extend_from_slice(bytes)
805                });
806                let array = FixedSizeBinaryArray::new(
807                    ArrowDataType::FixedSizeBinary(size),
808                    values.into(),
809                    array.validity().cloned(),
810                );
811                fixed_size_binary::array_to_page(&array, options, type_, statistics)
812            }
813        },
814        ArrowDataType::Int128 => {
815            let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
816            let statistics = if options.has_statistics() {
817                let stats = fixed_size_binary::build_statistics_decimal(
818                    array,
819                    type_.clone(),
820                    16,
821                    &options.statistics,
822                );
823                Some(stats)
824            } else {
825                None
826            };
827            let array = FixedSizeBinaryArray::new(
828                ArrowDataType::FixedSizeBinary(16),
829                array.values().clone().try_transmute().unwrap(),
830                array.validity().cloned(),
831            );
832            fixed_size_binary::array_to_page(&array, options, type_, statistics)
833        },
834        other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
835    }
836    .map(Page::Data)
837}
838
839fn array_to_page_nested(
840    array: &dyn Array,
841    type_: ParquetPrimitiveType,
842    nested: &[Nested],
843    options: WriteOptions,
844    _encoding: Encoding,
845) -> PolarsResult<Page> {
846    if type_.field_info.repetition == Repetition::Required
847        && array.validity().is_some_and(|v| v.unset_bits() > 0)
848    {
849        polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
850    }
851
852    use ArrowDataType::*;
853    match array.dtype().to_logical_type() {
854        Null => {
855            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
856            primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
857        },
858        Boolean => {
859            let array = array.as_any().downcast_ref().unwrap();
860            boolean::nested_array_to_page(array, options, type_, nested)
861        },
862        LargeUtf8 => {
863            let array =
864                polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
865            let array = array.as_any().downcast_ref().unwrap();
866            binary::nested_array_to_page::<i64>(array, options, type_, nested)
867        },
868        LargeBinary => {
869            let array = array.as_any().downcast_ref().unwrap();
870            binary::nested_array_to_page::<i64>(array, options, type_, nested)
871        },
872        BinaryView => {
873            let array = array.as_any().downcast_ref().unwrap();
874            binview::nested_array_to_page(array, options, type_, nested)
875        },
876        Utf8View => {
877            let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
878            let array = array.as_any().downcast_ref().unwrap();
879            binview::nested_array_to_page(array, options, type_, nested)
880        },
881        UInt8 => {
882            let array = array.as_any().downcast_ref().unwrap();
883            primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
884        },
885        UInt16 => {
886            let array = array.as_any().downcast_ref().unwrap();
887            primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
888        },
889        UInt32 => {
890            let array = array.as_any().downcast_ref().unwrap();
891            primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
892        },
893        UInt64 => {
894            let array = array.as_any().downcast_ref().unwrap();
895            primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
896        },
897        Int8 => {
898            let array = array.as_any().downcast_ref().unwrap();
899            primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
900        },
901        Int16 => {
902            let array = array.as_any().downcast_ref().unwrap();
903            primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
904        },
905        Int32 | Date32 | Time32(_) => {
906            let array = array.as_any().downcast_ref().unwrap();
907            primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
908        },
909        Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
910            let array = array.as_any().downcast_ref().unwrap();
911            primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
912        },
913        Float32 => {
914            let array = array.as_any().downcast_ref().unwrap();
915            primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
916        },
917        Float64 => {
918            let array = array.as_any().downcast_ref().unwrap();
919            primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
920        },
921        Decimal(precision, _) => {
922            let precision = *precision;
923            let array = array
924                .as_any()
925                .downcast_ref::<PrimitiveArray<i128>>()
926                .unwrap();
927            if precision <= 9 {
928                let values = array
929                    .values()
930                    .iter()
931                    .map(|x| *x as i32)
932                    .collect::<Vec<_>>()
933                    .into();
934
935                let array = PrimitiveArray::<i32>::new(
936                    ArrowDataType::Int32,
937                    values,
938                    array.validity().cloned(),
939                );
940                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
941            } else if precision <= 18 {
942                let values = array
943                    .values()
944                    .iter()
945                    .map(|x| *x as i64)
946                    .collect::<Vec<_>>()
947                    .into();
948
949                let array = PrimitiveArray::<i64>::new(
950                    ArrowDataType::Int64,
951                    values,
952                    array.validity().cloned(),
953                );
954                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
955            } else {
956                let size = decimal_length_from_precision(precision);
957
958                let statistics = if options.has_statistics() {
959                    let stats = fixed_size_binary::build_statistics_decimal(
960                        array,
961                        type_.clone(),
962                        size,
963                        &options.statistics,
964                    );
965                    Some(stats)
966                } else {
967                    None
968                };
969
970                let mut values = Vec::<u8>::with_capacity(size * array.len());
971                array.values().iter().for_each(|x| {
972                    let bytes = &x.to_be_bytes()[16 - size..];
973                    values.extend_from_slice(bytes)
974                });
975                let array = FixedSizeBinaryArray::new(
976                    ArrowDataType::FixedSizeBinary(size),
977                    values.into(),
978                    array.validity().cloned(),
979                );
980                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
981            }
982        },
983        Decimal256(precision, _) => {
984            let precision = *precision;
985            let array = array
986                .as_any()
987                .downcast_ref::<PrimitiveArray<i256>>()
988                .unwrap();
989            if precision <= 9 {
990                let values = array
991                    .values()
992                    .iter()
993                    .map(|x| x.0.as_i32())
994                    .collect::<Vec<_>>()
995                    .into();
996
997                let array = PrimitiveArray::<i32>::new(
998                    ArrowDataType::Int32,
999                    values,
1000                    array.validity().cloned(),
1001                );
1002                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
1003            } else if precision <= 18 {
1004                let values = array
1005                    .values()
1006                    .iter()
1007                    .map(|x| x.0.as_i64())
1008                    .collect::<Vec<_>>()
1009                    .into();
1010
1011                let array = PrimitiveArray::<i64>::new(
1012                    ArrowDataType::Int64,
1013                    values,
1014                    array.validity().cloned(),
1015                );
1016                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
1017            } else if precision <= 38 {
1018                let size = decimal_length_from_precision(precision);
1019                let statistics = if options.has_statistics() {
1020                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
1021                        array,
1022                        type_.clone(),
1023                        size,
1024                        &options.statistics,
1025                    );
1026                    Some(stats)
1027                } else {
1028                    None
1029                };
1030
1031                let mut values = Vec::<u8>::with_capacity(size * array.len());
1032                array.values().iter().for_each(|x| {
1033                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
1034                    values.extend_from_slice(bytes)
1035                });
1036                let array = FixedSizeBinaryArray::new(
1037                    ArrowDataType::FixedSizeBinary(size),
1038                    values.into(),
1039                    array.validity().cloned(),
1040                );
1041                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1042            } else {
1043                let size = 32;
1044                let array = array
1045                    .as_any()
1046                    .downcast_ref::<PrimitiveArray<i256>>()
1047                    .unwrap();
1048                let statistics = if options.has_statistics() {
1049                    let stats = fixed_size_binary::build_statistics_decimal256(
1050                        array,
1051                        type_.clone(),
1052                        size,
1053                        &options.statistics,
1054                    );
1055                    Some(stats)
1056                } else {
1057                    None
1058                };
1059                let mut values = Vec::<u8>::with_capacity(size * array.len());
1060                array.values().iter().for_each(|x| {
1061                    let bytes = &x.to_be_bytes();
1062                    values.extend_from_slice(bytes)
1063                });
1064                let array = FixedSizeBinaryArray::new(
1065                    ArrowDataType::FixedSizeBinary(size),
1066                    values.into(),
1067                    array.validity().cloned(),
1068                );
1069
1070                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1071            }
1072        },
1073        Int128 => {
1074            let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
1075            let statistics = if options.has_statistics() {
1076                let stats = fixed_size_binary::build_statistics_decimal(
1077                    array,
1078                    type_.clone(),
1079                    16,
1080                    &options.statistics,
1081                );
1082                Some(stats)
1083            } else {
1084                None
1085            };
1086            let array = FixedSizeBinaryArray::new(
1087                ArrowDataType::FixedSizeBinary(16),
1088                array.values().clone().try_transmute().unwrap(),
1089                array.validity().cloned(),
1090            );
1091            fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1092        },
1093        other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
1094    }
1095    .map(Page::Data)
1096}
1097
1098fn transverse_recursive<T, F: Fn(&ArrowDataType) -> T + Clone>(
1099    dtype: &ArrowDataType,
1100    map: F,
1101    encodings: &mut Vec<T>,
1102) {
1103    use arrow::datatypes::PhysicalType::*;
1104    match dtype.to_physical_type() {
1105        Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
1106        | Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => encodings.push(map(dtype)),
1107        List | FixedSizeList | LargeList => {
1108            let a = dtype.to_logical_type();
1109            if let ArrowDataType::List(inner) = a {
1110                transverse_recursive(&inner.dtype, map, encodings)
1111            } else if let ArrowDataType::LargeList(inner) = a {
1112                transverse_recursive(&inner.dtype, map, encodings)
1113            } else if let ArrowDataType::FixedSizeList(inner, _) = a {
1114                transverse_recursive(&inner.dtype, map, encodings)
1115            } else {
1116                unreachable!()
1117            }
1118        },
1119        Struct => {
1120            if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {
1121                for field in fields {
1122                    transverse_recursive(&field.dtype, map.clone(), encodings)
1123                }
1124            } else {
1125                unreachable!()
1126            }
1127        },
1128        Map => {
1129            if let ArrowDataType::Map(field, _) = dtype.to_logical_type() {
1130                if let ArrowDataType::Struct(fields) = field.dtype.to_logical_type() {
1131                    for field in fields {
1132                        transverse_recursive(&field.dtype, map.clone(), encodings)
1133                    }
1134                } else {
1135                    unreachable!()
1136                }
1137            } else {
1138                unreachable!()
1139            }
1140        },
1141        Union => todo!(),
1142    }
1143}
1144
1145/// Transverses the `dtype` up to its (parquet) columns and returns a vector of
1146/// items based on `map`.
1147///
1148/// This is used to assign an [`Encoding`] to every parquet column based on the columns' type (see example)
1149pub fn transverse<T, F: Fn(&ArrowDataType) -> T + Clone>(dtype: &ArrowDataType, map: F) -> Vec<T> {
1150    let mut encodings = vec![];
1151    transverse_recursive(dtype, map, &mut encodings);
1152    encodings
1153}