polars_parquet/arrow/write/
mod.rs

//! APIs to write to Parquet format.
//!
//! # Arrow/Parquet Interoperability
//! As of [parquet-format v2.9](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md)
//! there are Arrow [DataTypes](arrow::datatypes::ArrowDataType) which do not have a Parquet
//! logical-type representation. These include, but are not limited to:
//! * `ArrowDataType::Timestamp(TimeUnit::Second, _)`
//! * `ArrowDataType::Int64`
//! * `ArrowDataType::Duration`
//! * `ArrowDataType::Date64`
//! * `ArrowDataType::Time32(TimeUnit::Second)`
//!
//! Columns of these Arrow types are written without any logical type stored in the Parquet file.
14
15mod binary;
16mod binview;
17mod boolean;
18mod dictionary;
19mod file;
20mod fixed_size_binary;
21mod nested;
22mod pages;
23mod primitive;
24mod row_group;
25mod schema;
26mod utils;
27
28use arrow::array::*;
29use arrow::bitmap::Bitmap;
30use arrow::datatypes::*;
31use arrow::types::{NativeType, days_ms, i256};
32pub use nested::{num_values, write_rep_and_def};
33pub use pages::{to_leaves, to_nested, to_parquet_leaves};
34use polars_utils::pl_str::PlSmallStr;
35pub use utils::write_def_levels;
36
37pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
38pub use crate::parquet::encoding::Encoding;
39pub use crate::parquet::metadata::{
40    Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
41};
42pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
43use crate::parquet::schema::Repetition;
44use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
45pub use crate::parquet::schema::types::{
46    FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
47};
48pub use crate::parquet::write::{
49    Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
50    write_metadata_sidecar,
51};
52pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
53
/// The statistics to write
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct StatisticsOptions {
    pub min_value: bool,
    pub max_value: bool,
    pub distinct_count: bool,
    pub null_count: bool,
}

impl Default for StatisticsOptions {
    /// Min/max values and null counts are written by default; distinct counts are opt-in.
    fn default() -> Self {
        StatisticsOptions {
            null_count: true,
            min_value: true,
            max_value: true,
            distinct_count: false,
        }
    }
}
75
/// Whether an array is encoded as a non-nullable (`Required`) or nullable (`Optional`) column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeNullability {
    /// The column must not contain nulls.
    Required,
    /// The column may contain nulls.
    Optional,
}
82
/// Currently supported options to write to parquet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WriteOptions {
    /// Which statistics to compute and write; see `WriteOptions::has_statistics`.
    pub statistics: StatisticsOptions,
    /// The page and file version to use
    pub version: Version,
    /// The compression to apply to every page
    pub compression: CompressionOptions,
    /// Target size, in bytes, at which a data page is flushed. Defaults to `1024 * 1024` if
    /// `None`; `array_to_pages` additionally caps it at `2^31 - 2^25`.
    pub data_page_size: Option<usize>,
}
95
/// Per-column write options: an optional field id, key/value metadata, an optional
/// `required` override and the options of the column's children (or of the leaf itself).
#[derive(Clone)]
pub struct ColumnWriteOptions {
    /// Optional field id for this column.
    pub field_id: Option<i32>,
    /// Key/value metadata attached to this column.
    pub metadata: Vec<KeyValue>,
    /// Optional override of whether the column is required. `None` presumably means "derive it
    /// from the Arrow field" — TODO confirm in `to_parquet_type`.
    pub required: Option<bool>,
    /// Options below this column, shaped like the column's data type.
    pub children: ChildWriteOptions,
}
103
/// Write options below a column: a leaf, a single list-like child or a set of struct children.
/// `ColumnWriteOptions::to_leaves` walks this tree to collect the leaf options.
#[derive(Clone)]
pub enum ChildWriteOptions {
    /// A leaf (primitive) column.
    Leaf(FieldWriteOptions),
    /// A list-like column with a single child column.
    ListLike(Box<ListLikeFieldWriteOptions>),
    /// A struct column holding options for each of its children.
    Struct(Box<StructFieldWriteOptions>),
}
110
111impl ColumnWriteOptions {
112    pub fn to_leaves<'a>(&'a self, out: &mut Vec<&'a FieldWriteOptions>) {
113        match &self.children {
114            ChildWriteOptions::Leaf(o) => out.push(o),
115            ChildWriteOptions::ListLike(o) => o.child.to_leaves(out),
116            ChildWriteOptions::Struct(o) => {
117                for o in &o.children {
118                    o.to_leaves(out);
119                }
120            },
121        }
122    }
123}
124
/// Write options for a leaf (primitive) column.
#[derive(Clone)]
pub struct FieldWriteOptions {
    /// Requested page encoding. `array_to_pages` falls back to `Encoding::Plain` when
    /// `Encoding::RleDictionary` cannot be used for the column.
    pub encoding: Encoding,
}
129
130impl ColumnWriteOptions {
131    pub fn default_with(children: ChildWriteOptions) -> Self {
132        Self {
133            field_id: None,
134            metadata: Vec::new(),
135            required: None,
136            children,
137        }
138    }
139}
140
141impl FieldWriteOptions {
142    pub fn default_with_encoding(encoding: Encoding) -> Self {
143        Self { encoding }
144    }
145
146    pub fn into_default_column_write_options(self) -> ColumnWriteOptions {
147        ColumnWriteOptions::default_with(ChildWriteOptions::Leaf(self))
148    }
149}
150
/// Write options for a list-like column, holding the options of its single child column.
#[derive(Clone)]
pub struct ListLikeFieldWriteOptions {
    /// Options of the list's child (element) column.
    pub child: ColumnWriteOptions,
}
155
/// Write options for a struct column.
#[derive(Clone)]
pub struct StructFieldWriteOptions {
    /// One entry per child column. They are presumably in struct field order (TODO confirm);
    /// `ColumnWriteOptions::to_leaves` visits them in this order.
    pub children: Vec<ColumnWriteOptions>,
}
160
161use arrow::compute::aggregate::estimated_bytes_size;
162use arrow::match_integer_type;
163pub use file::FileWriter;
164pub use pages::{Nested, array_to_columns, arrays_to_columns};
165use polars_error::{PolarsResult, polars_bail};
166pub use row_group::{RowGroupIterator, row_group_iter};
167pub use schema::{schema_to_metadata_key, to_parquet_type};
168
169use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
170use crate::write::dictionary::encode_as_dictionary_optional;
171
172impl StatisticsOptions {
173    pub fn empty() -> Self {
174        Self {
175            min_value: false,
176            max_value: false,
177            distinct_count: false,
178            null_count: false,
179        }
180    }
181
182    pub fn full() -> Self {
183        Self {
184            min_value: true,
185            max_value: true,
186            distinct_count: true,
187            null_count: true,
188        }
189    }
190
191    pub fn is_empty(&self) -> bool {
192        !(self.min_value || self.max_value || self.distinct_count || self.null_count)
193    }
194
195    pub fn is_full(&self) -> bool {
196        self.min_value && self.max_value && self.distinct_count && self.null_count
197    }
198}
199
200impl WriteOptions {
201    pub fn has_statistics(&self) -> bool {
202        !self.statistics.is_empty()
203    }
204}
205
206impl EncodeNullability {
207    const fn new(is_optional: bool) -> Self {
208        if is_optional {
209            Self::Optional
210        } else {
211            Self::Required
212        }
213    }
214
215    fn is_optional(self) -> bool {
216        matches!(self, Self::Optional)
217    }
218}
219
220/// returns offset and length to slice the leaf values
221pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
222    // find the deepest recursive dremel structure as that one determines how many values we must
223    // take
224    let mut out = (0, 0);
225    for nested in nested.iter().rev() {
226        match nested {
227            Nested::LargeList(l_nested) => {
228                let start = *l_nested.offsets.first();
229                let end = *l_nested.offsets.last();
230                return (start as usize, (end - start) as usize);
231            },
232            Nested::List(l_nested) => {
233                let start = *l_nested.offsets.first();
234                let end = *l_nested.offsets.last();
235                return (start as usize, (end - start) as usize);
236            },
237            Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
238            Nested::Primitive(nested) => out = (0, nested.length),
239            Nested::Struct(_) => {},
240        }
241    }
242    out
243}
244
/// Number of bytes `n` used to store a decimal with `precision` digits as a signed,
/// two's-complement fixed-length value. It is the smallest `n` with
/// `2^(8n - 1) - 1 >= 10^precision - 1`.
///
/// Computed as `ceil((log2(10^precision + 1) + 1) / 8)`, where the outer `+ 1` accounts for the
/// sign bit.
fn decimal_length_from_precision(precision: usize) -> usize {
    let max_magnitude = 10.0_f64.powi(precision as i32) + 1.0;
    let bits_with_sign = max_magnitude.log2() + 1.0;
    (bits_with_sign / 8.0).ceil() as usize
}
255
256/// Creates a parquet [`SchemaDescriptor`] from a [`ArrowSchema`].
257pub fn to_parquet_schema(
258    schema: &ArrowSchema,
259    column_options: &[ColumnWriteOptions],
260) -> PolarsResult<SchemaDescriptor> {
261    let parquet_types = schema
262        .iter_values()
263        .zip(column_options)
264        .map(|(field, options)| to_parquet_type(field, options))
265        .collect::<PolarsResult<Vec<_>>>()?;
266    Ok(SchemaDescriptor::new(
267        PlSmallStr::from_static("root"),
268        parquet_types,
269    ))
270}
271
272/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.
273pub fn slice_parquet_array(
274    primitive_array: &mut dyn Array,
275    nested: &mut [Nested],
276    mut current_offset: usize,
277    mut current_length: usize,
278) {
279    for nested in nested.iter_mut() {
280        match nested {
281            Nested::LargeList(l_nested) => {
282                l_nested.offsets.slice(current_offset, current_length + 1);
283                if let Some(validity) = l_nested.validity.as_mut() {
284                    validity.slice(current_offset, current_length)
285                };
286
287                // Update the offset/ length so that the Primitive is sliced properly.
288                current_length = l_nested.offsets.range() as usize;
289                current_offset = *l_nested.offsets.first() as usize;
290            },
291            Nested::List(l_nested) => {
292                l_nested.offsets.slice(current_offset, current_length + 1);
293                if let Some(validity) = l_nested.validity.as_mut() {
294                    validity.slice(current_offset, current_length)
295                };
296
297                // Update the offset/ length so that the Primitive is sliced properly.
298                current_length = l_nested.offsets.range() as usize;
299                current_offset = *l_nested.offsets.first() as usize;
300            },
301            Nested::Struct(StructNested {
302                validity, length, ..
303            }) => {
304                *length = current_length;
305                if let Some(validity) = validity.as_mut() {
306                    validity.slice(current_offset, current_length)
307                };
308            },
309            Nested::Primitive(PrimitiveNested {
310                validity, length, ..
311            }) => {
312                *length = current_length;
313                if let Some(validity) = validity.as_mut() {
314                    validity.slice(current_offset, current_length)
315                };
316                primitive_array.slice(current_offset, current_length);
317            },
318            Nested::FixedSizeList(FixedSizeListNested {
319                validity,
320                length,
321                width,
322                ..
323            }) => {
324                if let Some(validity) = validity.as_mut() {
325                    validity.slice(current_offset, current_length)
326                };
327                *length = current_length;
328                // Update the offset/ length so that the Primitive is sliced properly.
329                current_length *= *width;
330                current_offset *= *width;
331            },
332        }
333    }
334}
335
336/// Get the length of [`Array`] that should be sliced.
337pub fn get_max_length(nested: &[Nested]) -> usize {
338    let mut length = 0;
339    for nested in nested.iter() {
340        match nested {
341            Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
342            Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
343            Nested::FixedSizeList(nested) => length += nested.length * nested.width,
344            _ => {},
345        }
346    }
347    length
348}
349
350/// Returns an iterator of [`Page`].
351pub fn array_to_pages(
352    primitive_array: &dyn Array,
353    type_: ParquetPrimitiveType,
354    nested: &[Nested],
355    options: WriteOptions,
356    field_options: &FieldWriteOptions,
357) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
358    let mut encoding = field_options.encoding;
359    if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_logical_type() {
360        return match_integer_type!(key_type, |$T| {
361            dictionary::array_to_pages::<$T>(
362                primitive_array.as_any().downcast_ref().unwrap(),
363                type_,
364                &nested,
365                options,
366                encoding,
367            )
368        });
369    };
370    if let Encoding::RleDictionary = encoding {
371        // Only take this path for primitive columns
372        if matches!(nested.first(), Some(Nested::Primitive(_))) {
373            if let Some(result) =
374                encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
375            {
376                return result;
377            }
378        }
379
380        // We didn't succeed, fallback to plain
381        encoding = Encoding::Plain;
382    }
383
384    let nested = nested.to_vec();
385
386    let number_of_rows = nested[0].len();
387
388    // note: this is not correct if the array is sliced - the estimation should happen on the
389    // primitive after sliced for parquet
390    let byte_size = estimated_bytes_size(primitive_array);
391
392    const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
393    let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
394    let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
395    let bytes_per_row = if number_of_rows == 0 {
396        0
397    } else {
398        ((byte_size as f64) / (number_of_rows as f64)) as usize
399    };
400    let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
401
402    let row_iter = (0..number_of_rows)
403        .step_by(rows_per_page)
404        .map(move |offset| {
405            let length = if offset + rows_per_page > number_of_rows {
406                number_of_rows - offset
407            } else {
408                rows_per_page
409            };
410            (offset, length)
411        });
412
413    let primitive_array = primitive_array.to_boxed();
414
415    let pages = row_iter.map(move |(offset, length)| {
416        let mut right_array = primitive_array.clone();
417        let mut right_nested = nested.clone();
418        slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
419
420        array_to_page(
421            right_array.as_ref(),
422            type_.clone(),
423            &right_nested,
424            options,
425            encoding,
426        )
427    });
428    Ok(DynIter::new(pages))
429}
430
431/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
432pub fn array_to_page(
433    array: &dyn Array,
434    type_: ParquetPrimitiveType,
435    nested: &[Nested],
436    options: WriteOptions,
437    encoding: Encoding,
438) -> PolarsResult<Page> {
439    if nested.len() == 1 {
440        // special case where validity == def levels
441        return array_to_page_simple(array, type_, options, encoding);
442    }
443    array_to_page_nested(array, type_, nested, options, encoding)
444}
445
446/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
447pub fn array_to_page_simple(
448    array: &dyn Array,
449    type_: ParquetPrimitiveType,
450    options: WriteOptions,
451    encoding: Encoding,
452) -> PolarsResult<Page> {
453    let dtype = array.dtype();
454
455    if type_.field_info.repetition == Repetition::Required && array.null_count() > 0 {
456        polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
457    }
458
459    match dtype.to_logical_type() {
460        // Map empty struct to boolean array with same validity.
461        ArrowDataType::Struct(fs) if fs.is_empty() => boolean::array_to_page(
462            &BooleanArray::new(
463                ArrowDataType::Boolean,
464                Bitmap::new_zeroed(array.len()),
465                array.validity().cloned(),
466            ),
467            options,
468            type_,
469            encoding,
470        ),
471
472        ArrowDataType::Boolean => boolean::array_to_page(
473            array.as_any().downcast_ref().unwrap(),
474            options,
475            type_,
476            encoding,
477        ),
478        // casts below MUST match the casts done at the metadata (field -> parquet type).
479        ArrowDataType::UInt8 => {
480            return primitive::array_to_page_integer::<u8, i32>(
481                array.as_any().downcast_ref().unwrap(),
482                options,
483                type_,
484                encoding,
485            );
486        },
487        ArrowDataType::UInt16 => {
488            return primitive::array_to_page_integer::<u16, i32>(
489                array.as_any().downcast_ref().unwrap(),
490                options,
491                type_,
492                encoding,
493            );
494        },
495        ArrowDataType::UInt32 => {
496            return primitive::array_to_page_integer::<u32, i32>(
497                array.as_any().downcast_ref().unwrap(),
498                options,
499                type_,
500                encoding,
501            );
502        },
503        ArrowDataType::UInt64 => {
504            return primitive::array_to_page_integer::<u64, i64>(
505                array.as_any().downcast_ref().unwrap(),
506                options,
507                type_,
508                encoding,
509            );
510        },
511        ArrowDataType::Int8 => {
512            return primitive::array_to_page_integer::<i8, i32>(
513                array.as_any().downcast_ref().unwrap(),
514                options,
515                type_,
516                encoding,
517            );
518        },
519        ArrowDataType::Int16 => {
520            return primitive::array_to_page_integer::<i16, i32>(
521                array.as_any().downcast_ref().unwrap(),
522                options,
523                type_,
524                encoding,
525            );
526        },
527        ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
528            return primitive::array_to_page_integer::<i32, i32>(
529                array.as_any().downcast_ref().unwrap(),
530                options,
531                type_,
532                encoding,
533            );
534        },
535        ArrowDataType::Int64
536        | ArrowDataType::Date64
537        | ArrowDataType::Time64(_)
538        | ArrowDataType::Timestamp(_, _)
539        | ArrowDataType::Duration(_) => {
540            return primitive::array_to_page_integer::<i64, i64>(
541                array.as_any().downcast_ref().unwrap(),
542                options,
543                type_,
544                encoding,
545            );
546        },
547        ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
548            array.as_any().downcast_ref().unwrap(),
549            options,
550            type_,
551        ),
552        ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
553            array.as_any().downcast_ref().unwrap(),
554            options,
555            type_,
556        ),
557        ArrowDataType::LargeUtf8 => {
558            let array =
559                polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
560                    .unwrap();
561            return binary::array_to_page::<i64>(
562                array.as_any().downcast_ref().unwrap(),
563                options,
564                type_,
565                encoding,
566            );
567        },
568        ArrowDataType::LargeBinary => {
569            return binary::array_to_page::<i64>(
570                array.as_any().downcast_ref().unwrap(),
571                options,
572                type_,
573                encoding,
574            );
575        },
576        ArrowDataType::BinaryView => {
577            return binview::array_to_page(
578                array.as_any().downcast_ref().unwrap(),
579                options,
580                type_,
581                encoding,
582            );
583        },
584        ArrowDataType::Utf8View => {
585            let array =
586                polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
587                    .unwrap();
588            return binview::array_to_page(
589                array.as_any().downcast_ref().unwrap(),
590                options,
591                type_,
592                encoding,
593            );
594        },
595        ArrowDataType::Null => {
596            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
597            primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
598        },
599        ArrowDataType::Interval(IntervalUnit::YearMonth) => {
600            let array = array
601                .as_any()
602                .downcast_ref::<PrimitiveArray<i32>>()
603                .unwrap();
604            let mut values = Vec::<u8>::with_capacity(12 * array.len());
605            array.values().iter().for_each(|x| {
606                let bytes = &x.to_le_bytes();
607                values.extend_from_slice(bytes);
608                values.extend_from_slice(&[0; 8]);
609            });
610            let array = FixedSizeBinaryArray::new(
611                ArrowDataType::FixedSizeBinary(12),
612                values.into(),
613                array.validity().cloned(),
614            );
615            let statistics = if options.has_statistics() {
616                Some(fixed_size_binary::build_statistics(
617                    &array,
618                    type_.clone(),
619                    &options.statistics,
620                ))
621            } else {
622                None
623            };
624            fixed_size_binary::array_to_page(&array, options, type_, statistics)
625        },
626        ArrowDataType::Interval(IntervalUnit::DayTime) => {
627            let array = array
628                .as_any()
629                .downcast_ref::<PrimitiveArray<days_ms>>()
630                .unwrap();
631            let mut values = Vec::<u8>::with_capacity(12 * array.len());
632            array.values().iter().for_each(|x| {
633                let bytes = &x.to_le_bytes();
634                values.extend_from_slice(&[0; 4]); // months
635                values.extend_from_slice(bytes); // days and seconds
636            });
637            let array = FixedSizeBinaryArray::new(
638                ArrowDataType::FixedSizeBinary(12),
639                values.into(),
640                array.validity().cloned(),
641            );
642            let statistics = if options.has_statistics() {
643                Some(fixed_size_binary::build_statistics(
644                    &array,
645                    type_.clone(),
646                    &options.statistics,
647                ))
648            } else {
649                None
650            };
651            fixed_size_binary::array_to_page(&array, options, type_, statistics)
652        },
653        ArrowDataType::FixedSizeBinary(_) => {
654            let array = array.as_any().downcast_ref().unwrap();
655            let statistics = if options.has_statistics() {
656                Some(fixed_size_binary::build_statistics(
657                    array,
658                    type_.clone(),
659                    &options.statistics,
660                ))
661            } else {
662                None
663            };
664
665            fixed_size_binary::array_to_page(array, options, type_, statistics)
666        },
667        ArrowDataType::Decimal256(precision, _) => {
668            let precision = *precision;
669            let array = array
670                .as_any()
671                .downcast_ref::<PrimitiveArray<i256>>()
672                .unwrap();
673            if precision <= 9 {
674                let values = array
675                    .values()
676                    .iter()
677                    .map(|x| x.0.as_i32())
678                    .collect::<Vec<_>>()
679                    .into();
680
681                let array = PrimitiveArray::<i32>::new(
682                    ArrowDataType::Int32,
683                    values,
684                    array.validity().cloned(),
685                );
686                return primitive::array_to_page_integer::<i32, i32>(
687                    &array, options, type_, encoding,
688                );
689            } else if precision <= 18 {
690                let values = array
691                    .values()
692                    .iter()
693                    .map(|x| x.0.as_i64())
694                    .collect::<Vec<_>>()
695                    .into();
696
697                let array = PrimitiveArray::<i64>::new(
698                    ArrowDataType::Int64,
699                    values,
700                    array.validity().cloned(),
701                );
702                return primitive::array_to_page_integer::<i64, i64>(
703                    &array, options, type_, encoding,
704                );
705            } else if precision <= 38 {
706                let size = decimal_length_from_precision(precision);
707                let statistics = if options.has_statistics() {
708                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
709                        array,
710                        type_.clone(),
711                        size,
712                        &options.statistics,
713                    );
714                    Some(stats)
715                } else {
716                    None
717                };
718
719                let mut values = Vec::<u8>::with_capacity(size * array.len());
720                array.values().iter().for_each(|x| {
721                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
722                    values.extend_from_slice(bytes)
723                });
724                let array = FixedSizeBinaryArray::new(
725                    ArrowDataType::FixedSizeBinary(size),
726                    values.into(),
727                    array.validity().cloned(),
728                );
729                fixed_size_binary::array_to_page(&array, options, type_, statistics)
730            } else {
731                let size = 32;
732                let array = array
733                    .as_any()
734                    .downcast_ref::<PrimitiveArray<i256>>()
735                    .unwrap();
736                let statistics = if options.has_statistics() {
737                    let stats = fixed_size_binary::build_statistics_decimal256(
738                        array,
739                        type_.clone(),
740                        size,
741                        &options.statistics,
742                    );
743                    Some(stats)
744                } else {
745                    None
746                };
747                let mut values = Vec::<u8>::with_capacity(size * array.len());
748                array.values().iter().for_each(|x| {
749                    let bytes = &x.to_be_bytes();
750                    values.extend_from_slice(bytes)
751                });
752                let array = FixedSizeBinaryArray::new(
753                    ArrowDataType::FixedSizeBinary(size),
754                    values.into(),
755                    array.validity().cloned(),
756                );
757
758                fixed_size_binary::array_to_page(&array, options, type_, statistics)
759            }
760        },
761        ArrowDataType::Decimal(precision, _) => {
762            let precision = *precision;
763            let array = array
764                .as_any()
765                .downcast_ref::<PrimitiveArray<i128>>()
766                .unwrap();
767            if precision <= 9 {
768                let values = array
769                    .values()
770                    .iter()
771                    .map(|x| *x as i32)
772                    .collect::<Vec<_>>()
773                    .into();
774
775                let array = PrimitiveArray::<i32>::new(
776                    ArrowDataType::Int32,
777                    values,
778                    array.validity().cloned(),
779                );
780                return primitive::array_to_page_integer::<i32, i32>(
781                    &array, options, type_, encoding,
782                );
783            } else if precision <= 18 {
784                let values = array
785                    .values()
786                    .iter()
787                    .map(|x| *x as i64)
788                    .collect::<Vec<_>>()
789                    .into();
790
791                let array = PrimitiveArray::<i64>::new(
792                    ArrowDataType::Int64,
793                    values,
794                    array.validity().cloned(),
795                );
796                return primitive::array_to_page_integer::<i64, i64>(
797                    &array, options, type_, encoding,
798                );
799            } else {
800                let size = decimal_length_from_precision(precision);
801
802                let statistics = if options.has_statistics() {
803                    let stats = fixed_size_binary::build_statistics_decimal(
804                        array,
805                        type_.clone(),
806                        size,
807                        &options.statistics,
808                    );
809                    Some(stats)
810                } else {
811                    None
812                };
813
814                let mut values = Vec::<u8>::with_capacity(size * array.len());
815                array.values().iter().for_each(|x| {
816                    let bytes = &x.to_be_bytes()[16 - size..];
817                    values.extend_from_slice(bytes)
818                });
819                let array = FixedSizeBinaryArray::new(
820                    ArrowDataType::FixedSizeBinary(size),
821                    values.into(),
822                    array.validity().cloned(),
823                );
824                fixed_size_binary::array_to_page(&array, options, type_, statistics)
825            }
826        },
827        ArrowDataType::UInt128 => {
828            let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();
829            let statistics = if options.has_statistics() {
830                let stats = fixed_size_binary::build_statistics_decimal(
831                    array,
832                    type_.clone(),
833                    16,
834                    &options.statistics,
835                );
836                Some(stats)
837            } else {
838                None
839            };
840            let array = FixedSizeBinaryArray::new(
841                ArrowDataType::FixedSizeBinary(16),
842                array.values().clone().try_transmute().unwrap(),
843                array.validity().cloned(),
844            );
845            fixed_size_binary::array_to_page(&array, options, type_, statistics)
846        },
847        ArrowDataType::Int128 => {
848            let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
849            let statistics = if options.has_statistics() {
850                let stats = fixed_size_binary::build_statistics_decimal(
851                    array,
852                    type_.clone(),
853                    16,
854                    &options.statistics,
855                );
856                Some(stats)
857            } else {
858                None
859            };
860            let array = FixedSizeBinaryArray::new(
861                ArrowDataType::FixedSizeBinary(16),
862                array.values().clone().try_transmute().unwrap(),
863                array.validity().cloned(),
864            );
865            fixed_size_binary::array_to_page(&array, options, type_, statistics)
866        },
867        other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
868    }
869    .map(Page::Data)
870}
871
872fn array_to_page_nested(
873    array: &dyn Array,
874    type_: ParquetPrimitiveType,
875    nested: &[Nested],
876    options: WriteOptions,
877    _encoding: Encoding,
878) -> PolarsResult<Page> {
879    if type_.field_info.repetition == Repetition::Required
880        && array.validity().is_some_and(|v| v.unset_bits() > 0)
881    {
882        polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
883    }
884
885    use ArrowDataType::*;
886    match array.dtype().to_logical_type() {
887        Null => {
888            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
889            primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
890        },
891        // Map empty struct to boolean array with same validity.
892        Struct(fs) if fs.is_empty() => {
893            let array = BooleanArray::new(
894                ArrowDataType::Boolean,
895                Bitmap::new_zeroed(array.len()),
896                array.validity().cloned(),
897            );
898            boolean::nested_array_to_page(&array, options, type_, nested)
899        },
900        Boolean => {
901            let array = array.as_any().downcast_ref().unwrap();
902            boolean::nested_array_to_page(array, options, type_, nested)
903        },
904        LargeUtf8 => {
905            let array =
906                polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
907            let array = array.as_any().downcast_ref().unwrap();
908            binary::nested_array_to_page::<i64>(array, options, type_, nested)
909        },
910        LargeBinary => {
911            let array = array.as_any().downcast_ref().unwrap();
912            binary::nested_array_to_page::<i64>(array, options, type_, nested)
913        },
914        BinaryView => {
915            let array = array.as_any().downcast_ref().unwrap();
916            binview::nested_array_to_page(array, options, type_, nested)
917        },
918        Utf8View => {
919            let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
920            let array = array.as_any().downcast_ref().unwrap();
921            binview::nested_array_to_page(array, options, type_, nested)
922        },
923        UInt8 => {
924            let array = array.as_any().downcast_ref().unwrap();
925            primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
926        },
927        UInt16 => {
928            let array = array.as_any().downcast_ref().unwrap();
929            primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
930        },
931        UInt32 => {
932            let array = array.as_any().downcast_ref().unwrap();
933            primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
934        },
935        UInt64 => {
936            let array = array.as_any().downcast_ref().unwrap();
937            primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
938        },
939        Int8 => {
940            let array = array.as_any().downcast_ref().unwrap();
941            primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
942        },
943        Int16 => {
944            let array = array.as_any().downcast_ref().unwrap();
945            primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
946        },
947        Int32 | Date32 | Time32(_) => {
948            let array = array.as_any().downcast_ref().unwrap();
949            primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
950        },
951        Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
952            let array = array.as_any().downcast_ref().unwrap();
953            primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
954        },
955        Float32 => {
956            let array = array.as_any().downcast_ref().unwrap();
957            primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
958        },
959        Float64 => {
960            let array = array.as_any().downcast_ref().unwrap();
961            primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
962        },
963        Decimal(precision, _) => {
964            let precision = *precision;
965            let array = array
966                .as_any()
967                .downcast_ref::<PrimitiveArray<i128>>()
968                .unwrap();
969            if precision <= 9 {
970                let values = array
971                    .values()
972                    .iter()
973                    .map(|x| *x as i32)
974                    .collect::<Vec<_>>()
975                    .into();
976
977                let array = PrimitiveArray::<i32>::new(
978                    ArrowDataType::Int32,
979                    values,
980                    array.validity().cloned(),
981                );
982                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
983            } else if precision <= 18 {
984                let values = array
985                    .values()
986                    .iter()
987                    .map(|x| *x as i64)
988                    .collect::<Vec<_>>()
989                    .into();
990
991                let array = PrimitiveArray::<i64>::new(
992                    ArrowDataType::Int64,
993                    values,
994                    array.validity().cloned(),
995                );
996                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
997            } else {
998                let size = decimal_length_from_precision(precision);
999
1000                let statistics = if options.has_statistics() {
1001                    let stats = fixed_size_binary::build_statistics_decimal(
1002                        array,
1003                        type_.clone(),
1004                        size,
1005                        &options.statistics,
1006                    );
1007                    Some(stats)
1008                } else {
1009                    None
1010                };
1011
1012                let mut values = Vec::<u8>::with_capacity(size * array.len());
1013                array.values().iter().for_each(|x| {
1014                    let bytes = &x.to_be_bytes()[16 - size..];
1015                    values.extend_from_slice(bytes)
1016                });
1017                let array = FixedSizeBinaryArray::new(
1018                    ArrowDataType::FixedSizeBinary(size),
1019                    values.into(),
1020                    array.validity().cloned(),
1021                );
1022                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1023            }
1024        },
1025        Decimal256(precision, _) => {
1026            let precision = *precision;
1027            let array = array
1028                .as_any()
1029                .downcast_ref::<PrimitiveArray<i256>>()
1030                .unwrap();
1031            if precision <= 9 {
1032                let values = array
1033                    .values()
1034                    .iter()
1035                    .map(|x| x.0.as_i32())
1036                    .collect::<Vec<_>>()
1037                    .into();
1038
1039                let array = PrimitiveArray::<i32>::new(
1040                    ArrowDataType::Int32,
1041                    values,
1042                    array.validity().cloned(),
1043                );
1044                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
1045            } else if precision <= 18 {
1046                let values = array
1047                    .values()
1048                    .iter()
1049                    .map(|x| x.0.as_i64())
1050                    .collect::<Vec<_>>()
1051                    .into();
1052
1053                let array = PrimitiveArray::<i64>::new(
1054                    ArrowDataType::Int64,
1055                    values,
1056                    array.validity().cloned(),
1057                );
1058                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
1059            } else if precision <= 38 {
1060                let size = decimal_length_from_precision(precision);
1061                let statistics = if options.has_statistics() {
1062                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
1063                        array,
1064                        type_.clone(),
1065                        size,
1066                        &options.statistics,
1067                    );
1068                    Some(stats)
1069                } else {
1070                    None
1071                };
1072
1073                let mut values = Vec::<u8>::with_capacity(size * array.len());
1074                array.values().iter().for_each(|x| {
1075                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
1076                    values.extend_from_slice(bytes)
1077                });
1078                let array = FixedSizeBinaryArray::new(
1079                    ArrowDataType::FixedSizeBinary(size),
1080                    values.into(),
1081                    array.validity().cloned(),
1082                );
1083                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1084            } else {
1085                let size = 32;
1086                let array = array
1087                    .as_any()
1088                    .downcast_ref::<PrimitiveArray<i256>>()
1089                    .unwrap();
1090                let statistics = if options.has_statistics() {
1091                    let stats = fixed_size_binary::build_statistics_decimal256(
1092                        array,
1093                        type_.clone(),
1094                        size,
1095                        &options.statistics,
1096                    );
1097                    Some(stats)
1098                } else {
1099                    None
1100                };
1101                let mut values = Vec::<u8>::with_capacity(size * array.len());
1102                array.values().iter().for_each(|x| {
1103                    let bytes = &x.to_be_bytes();
1104                    values.extend_from_slice(bytes)
1105                });
1106                let array = FixedSizeBinaryArray::new(
1107                    ArrowDataType::FixedSizeBinary(size),
1108                    values.into(),
1109                    array.validity().cloned(),
1110                );
1111
1112                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1113            }
1114        },
1115        Int128 => {
1116            let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
1117            let statistics = if options.has_statistics() {
1118                let stats = fixed_size_binary::build_statistics_decimal(
1119                    array,
1120                    type_.clone(),
1121                    16,
1122                    &options.statistics,
1123                );
1124                Some(stats)
1125            } else {
1126                None
1127            };
1128            let array = FixedSizeBinaryArray::new(
1129                ArrowDataType::FixedSizeBinary(16),
1130                array.values().clone().try_transmute().unwrap(),
1131                array.validity().cloned(),
1132            );
1133            fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1134        },
1135        UInt128 => {
1136            let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();
1137            let statistics = if options.has_statistics() {
1138                let stats = fixed_size_binary::build_statistics_decimal(
1139                    array,
1140                    type_.clone(),
1141                    16,
1142                    &options.statistics,
1143                );
1144                Some(stats)
1145            } else {
1146                None
1147            };
1148            let array = FixedSizeBinaryArray::new(
1149                ArrowDataType::FixedSizeBinary(16),
1150                array.values().clone().try_transmute().unwrap(),
1151                array.validity().cloned(),
1152            );
1153            fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1154        },
1155        other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
1156    }
1157    .map(Page::Data)
1158}
1159
1160fn transverse_recursive<T, F: Fn(&ArrowDataType) -> T + Clone>(
1161    dtype: &ArrowDataType,
1162    map: F,
1163    encodings: &mut Vec<T>,
1164) {
1165    use arrow::datatypes::PhysicalType::*;
1166    match dtype.to_physical_type() {
1167        Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
1168        | Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => encodings.push(map(dtype)),
1169        List | FixedSizeList | LargeList => {
1170            let a = dtype.to_logical_type();
1171            if let ArrowDataType::List(inner) = a {
1172                transverse_recursive(&inner.dtype, map, encodings)
1173            } else if let ArrowDataType::LargeList(inner) = a {
1174                transverse_recursive(&inner.dtype, map, encodings)
1175            } else if let ArrowDataType::FixedSizeList(inner, _) = a {
1176                transverse_recursive(&inner.dtype, map, encodings)
1177            } else {
1178                unreachable!()
1179            }
1180        },
1181        Struct => {
1182            if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {
1183                for field in fields {
1184                    transverse_recursive(&field.dtype, map.clone(), encodings)
1185                }
1186            } else {
1187                unreachable!()
1188            }
1189        },
1190        Map => {
1191            if let ArrowDataType::Map(field, _) = dtype.to_logical_type() {
1192                if let ArrowDataType::Struct(fields) = field.dtype.to_logical_type() {
1193                    for field in fields {
1194                        transverse_recursive(&field.dtype, map.clone(), encodings)
1195                    }
1196                } else {
1197                    unreachable!()
1198                }
1199            } else {
1200                unreachable!()
1201            }
1202        },
1203        Union => todo!(),
1204    }
1205}
1206
1207/// Transverses the `dtype` up to its (parquet) columns and returns a vector of
1208/// items based on `map`.
1209///
1210/// This is used to assign an [`Encoding`] to every parquet column based on the columns' type (see example)
1211pub fn transverse<T, F: Fn(&ArrowDataType) -> T + Clone>(dtype: &ArrowDataType, map: F) -> Vec<T> {
1212    let mut encodings = vec![];
1213    transverse_recursive(dtype, map, &mut encodings);
1214    encodings
1215}