polars_parquet/arrow/write/
mod.rs

1//! APIs to write to Parquet format.
2//!
3//! # Arrow/Parquet Interoperability
4//! As of [parquet-format v2.9](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md)
5//! there are Arrow [DataTypes](arrow::datatypes::ArrowDataType) which do not have a parquet
6//! representation. These include but are not limited to:
7//! * `ArrowDataType::Timestamp(TimeUnit::Second, _)`
8//! * `ArrowDataType::Int64`
9//! * `ArrowDataType::Duration`
10//! * `ArrowDataType::Date64`
11//! * `ArrowDataType::Time32(TimeUnit::Second)`
12//!
13//! The use of these arrow types will result in no logical type being stored within a parquet file.
14
15mod binary;
16mod binview;
17mod boolean;
18mod dictionary;
19mod file;
20mod fixed_size_binary;
21mod nested;
22mod pages;
23mod primitive;
24mod row_group;
25mod schema;
26mod utils;
27
28use arrow::array::*;
29use arrow::datatypes::*;
30use arrow::types::{NativeType, days_ms, i256};
31pub use nested::{num_values, write_rep_and_def};
32pub use pages::{to_leaves, to_nested, to_parquet_leaves};
33use polars_utils::pl_str::PlSmallStr;
34pub use utils::write_def_levels;
35
36pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
37pub use crate::parquet::encoding::Encoding;
38pub use crate::parquet::metadata::{
39    Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
40};
41pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
42use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
43pub use crate::parquet::schema::types::{
44    FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
45};
46pub use crate::parquet::write::{
47    Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
48    write_metadata_sidecar,
49};
50pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
51
/// Selects which statistics are written.
///
/// Each flag toggles one kind of statistic independently of the others.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct StatisticsOptions {
    /// Write the minimum value.
    pub min_value: bool,
    /// Write the maximum value.
    pub max_value: bool,
    /// Write the distinct count.
    pub distinct_count: bool,
    /// Write the null count.
    pub null_count: bool,
}

impl Default for StatisticsOptions {
    /// Enables every statistic except the distinct count.
    fn default() -> Self {
        const ON: bool = true;
        const OFF: bool = false;

        Self {
            null_count: ON,
            distinct_count: OFF,
            max_value: ON,
            min_value: ON,
        }
    }
}
72
/// Whether an array is encoded as a required or an optional column.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone`/`Copy` so the value can be
/// logged and compared like the other public option types of this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeNullability {
    /// The column is encoded as required.
    Required,
    /// The column is encoded as optional.
    Optional,
}
79
/// Currently supported options to write to parquet.
///
/// The struct is `Copy`, so it is passed by value to the page writers in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WriteOptions {
    /// Which statistics to write; see [`StatisticsOptions`] for the individual flags.
    pub statistics: StatisticsOptions,
    /// The page and file version to use
    pub version: Version,
    /// The compression to apply to every page
    pub compression: CompressionOptions,
    /// The size (in bytes) at which a page is flushed; `None` selects 1024 * 1024.
    /// [`array_to_pages`] additionally caps the value at `2^31 - 2^25`.
    pub data_page_size: Option<usize>,
}
92
93use arrow::compute::aggregate::estimated_bytes_size;
94use arrow::match_integer_type;
95pub use file::FileWriter;
96pub use pages::{Nested, array_to_columns, arrays_to_columns};
97use polars_error::{PolarsResult, polars_bail};
98pub use row_group::{RowGroupIterator, row_group_iter};
99pub use schema::to_parquet_type;
100
101use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
102use crate::write::dictionary::encode_as_dictionary_optional;
103
104impl StatisticsOptions {
105    pub fn empty() -> Self {
106        Self {
107            min_value: false,
108            max_value: false,
109            distinct_count: false,
110            null_count: false,
111        }
112    }
113
114    pub fn full() -> Self {
115        Self {
116            min_value: true,
117            max_value: true,
118            distinct_count: true,
119            null_count: true,
120        }
121    }
122
123    pub fn is_empty(&self) -> bool {
124        !(self.min_value || self.max_value || self.distinct_count || self.null_count)
125    }
126
127    pub fn is_full(&self) -> bool {
128        self.min_value && self.max_value && self.distinct_count && self.null_count
129    }
130}
131
132impl WriteOptions {
133    pub fn has_statistics(&self) -> bool {
134        !self.statistics.is_empty()
135    }
136}
137
138impl EncodeNullability {
139    const fn new(is_optional: bool) -> Self {
140        if is_optional {
141            Self::Optional
142        } else {
143            Self::Required
144        }
145    }
146
147    fn is_optional(self) -> bool {
148        matches!(self, Self::Optional)
149    }
150}
151
152/// returns offset and length to slice the leaf values
153pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
154    // find the deepest recursive dremel structure as that one determines how many values we must
155    // take
156    let mut out = (0, 0);
157    for nested in nested.iter().rev() {
158        match nested {
159            Nested::LargeList(l_nested) => {
160                let start = *l_nested.offsets.first();
161                let end = *l_nested.offsets.last();
162                return (start as usize, (end - start) as usize);
163            },
164            Nested::List(l_nested) => {
165                let start = *l_nested.offsets.first();
166                let end = *l_nested.offsets.last();
167                return (start as usize, (end - start) as usize);
168            },
169            Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
170            Nested::Primitive(nested) => out = (0, nested.length),
171            Nested::Struct(_) => {},
172        }
173    }
174    out
175}
176
/// Returns the number of bytes of a two's complement integer that can hold every decimal
/// value with `precision` digits.
///
/// An `n`-byte signed integer stores magnitudes up to `2^(8*n - 1) - 1`, so `n` must satisfy
/// `2^(8*n - 1) >= 10^precision + 1`, i.e. `n >= (log2(10^precision + 1) + 1) / 8`.
/// The smallest such `n` is returned.
fn decimal_length_from_precision(precision: usize) -> usize {
    let largest_magnitude = 10.0_f64.powi(precision as i32);
    // Bits for the magnitude plus one bit for the sign.
    let required_bits = (largest_magnitude + 1.0).log2() + 1.0;
    let required_bytes = required_bits / 8.0;
    required_bytes.ceil() as usize
}
187
188/// Creates a parquet [`SchemaDescriptor`] from a [`ArrowSchema`].
189pub fn to_parquet_schema(schema: &ArrowSchema) -> PolarsResult<SchemaDescriptor> {
190    let parquet_types = schema
191        .iter_values()
192        .map(to_parquet_type)
193        .collect::<PolarsResult<Vec<_>>>()?;
194    Ok(SchemaDescriptor::new(
195        PlSmallStr::from_static("root"),
196        parquet_types,
197    ))
198}
199
200/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.
201pub fn slice_parquet_array(
202    primitive_array: &mut dyn Array,
203    nested: &mut [Nested],
204    mut current_offset: usize,
205    mut current_length: usize,
206) {
207    for nested in nested.iter_mut() {
208        match nested {
209            Nested::LargeList(l_nested) => {
210                l_nested.offsets.slice(current_offset, current_length + 1);
211                if let Some(validity) = l_nested.validity.as_mut() {
212                    validity.slice(current_offset, current_length)
213                };
214
215                // Update the offset/ length so that the Primitive is sliced properly.
216                current_length = l_nested.offsets.range() as usize;
217                current_offset = *l_nested.offsets.first() as usize;
218            },
219            Nested::List(l_nested) => {
220                l_nested.offsets.slice(current_offset, current_length + 1);
221                if let Some(validity) = l_nested.validity.as_mut() {
222                    validity.slice(current_offset, current_length)
223                };
224
225                // Update the offset/ length so that the Primitive is sliced properly.
226                current_length = l_nested.offsets.range() as usize;
227                current_offset = *l_nested.offsets.first() as usize;
228            },
229            Nested::Struct(StructNested {
230                validity, length, ..
231            }) => {
232                *length = current_length;
233                if let Some(validity) = validity.as_mut() {
234                    validity.slice(current_offset, current_length)
235                };
236            },
237            Nested::Primitive(PrimitiveNested {
238                validity, length, ..
239            }) => {
240                *length = current_length;
241                if let Some(validity) = validity.as_mut() {
242                    validity.slice(current_offset, current_length)
243                };
244                primitive_array.slice(current_offset, current_length);
245            },
246            Nested::FixedSizeList(FixedSizeListNested {
247                validity,
248                length,
249                width,
250                ..
251            }) => {
252                if let Some(validity) = validity.as_mut() {
253                    validity.slice(current_offset, current_length)
254                };
255                *length = current_length;
256                // Update the offset/ length so that the Primitive is sliced properly.
257                current_length *= *width;
258                current_offset *= *width;
259            },
260        }
261    }
262}
263
264/// Get the length of [`Array`] that should be sliced.
265pub fn get_max_length(nested: &[Nested]) -> usize {
266    let mut length = 0;
267    for nested in nested.iter() {
268        match nested {
269            Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
270            Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
271            Nested::FixedSizeList(nested) => length += nested.length * nested.width,
272            _ => {},
273        }
274    }
275    length
276}
277
278/// Returns an iterator of [`Page`].
279pub fn array_to_pages(
280    primitive_array: &dyn Array,
281    type_: ParquetPrimitiveType,
282    nested: &[Nested],
283    options: WriteOptions,
284    mut encoding: Encoding,
285) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
286    if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_logical_type() {
287        return match_integer_type!(key_type, |$T| {
288            dictionary::array_to_pages::<$T>(
289                primitive_array.as_any().downcast_ref().unwrap(),
290                type_,
291                &nested,
292                options,
293                encoding,
294            )
295        });
296    };
297    if let Encoding::RleDictionary = encoding {
298        // Only take this path for primitive columns
299        if matches!(nested.first(), Some(Nested::Primitive(_))) {
300            if let Some(result) =
301                encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
302            {
303                return result;
304            }
305        }
306
307        // We didn't succeed, fallback to plain
308        encoding = Encoding::Plain;
309    }
310
311    let nested = nested.to_vec();
312
313    let number_of_rows = nested[0].len();
314
315    // note: this is not correct if the array is sliced - the estimation should happen on the
316    // primitive after sliced for parquet
317    let byte_size = estimated_bytes_size(primitive_array);
318
319    const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
320    let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
321    let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
322    let bytes_per_row = if number_of_rows == 0 {
323        0
324    } else {
325        ((byte_size as f64) / (number_of_rows as f64)) as usize
326    };
327    let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
328
329    let row_iter = (0..number_of_rows)
330        .step_by(rows_per_page)
331        .map(move |offset| {
332            let length = if offset + rows_per_page > number_of_rows {
333                number_of_rows - offset
334            } else {
335                rows_per_page
336            };
337            (offset, length)
338        });
339
340    let primitive_array = primitive_array.to_boxed();
341
342    let pages = row_iter.map(move |(offset, length)| {
343        let mut right_array = primitive_array.clone();
344        let mut right_nested = nested.clone();
345        slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
346
347        array_to_page(
348            right_array.as_ref(),
349            type_.clone(),
350            &right_nested,
351            options,
352            encoding,
353        )
354    });
355    Ok(DynIter::new(pages))
356}
357
358/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
359pub fn array_to_page(
360    array: &dyn Array,
361    type_: ParquetPrimitiveType,
362    nested: &[Nested],
363    options: WriteOptions,
364    encoding: Encoding,
365) -> PolarsResult<Page> {
366    if nested.len() == 1 {
367        // special case where validity == def levels
368        return array_to_page_simple(array, type_, options, encoding);
369    }
370    array_to_page_nested(array, type_, nested, options, encoding)
371}
372
373/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
374pub fn array_to_page_simple(
375    array: &dyn Array,
376    type_: ParquetPrimitiveType,
377    options: WriteOptions,
378    encoding: Encoding,
379) -> PolarsResult<Page> {
380    let dtype = array.dtype();
381
382    match dtype.to_logical_type() {
383        ArrowDataType::Boolean => boolean::array_to_page(
384            array.as_any().downcast_ref().unwrap(),
385            options,
386            type_,
387            encoding,
388        ),
389        // casts below MUST match the casts done at the metadata (field -> parquet type).
390        ArrowDataType::UInt8 => {
391            return primitive::array_to_page_integer::<u8, i32>(
392                array.as_any().downcast_ref().unwrap(),
393                options,
394                type_,
395                encoding,
396            );
397        },
398        ArrowDataType::UInt16 => {
399            return primitive::array_to_page_integer::<u16, i32>(
400                array.as_any().downcast_ref().unwrap(),
401                options,
402                type_,
403                encoding,
404            );
405        },
406        ArrowDataType::UInt32 => {
407            return primitive::array_to_page_integer::<u32, i32>(
408                array.as_any().downcast_ref().unwrap(),
409                options,
410                type_,
411                encoding,
412            );
413        },
414        ArrowDataType::UInt64 => {
415            return primitive::array_to_page_integer::<u64, i64>(
416                array.as_any().downcast_ref().unwrap(),
417                options,
418                type_,
419                encoding,
420            );
421        },
422        ArrowDataType::Int8 => {
423            return primitive::array_to_page_integer::<i8, i32>(
424                array.as_any().downcast_ref().unwrap(),
425                options,
426                type_,
427                encoding,
428            );
429        },
430        ArrowDataType::Int16 => {
431            return primitive::array_to_page_integer::<i16, i32>(
432                array.as_any().downcast_ref().unwrap(),
433                options,
434                type_,
435                encoding,
436            );
437        },
438        ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
439            return primitive::array_to_page_integer::<i32, i32>(
440                array.as_any().downcast_ref().unwrap(),
441                options,
442                type_,
443                encoding,
444            );
445        },
446        ArrowDataType::Int64
447        | ArrowDataType::Date64
448        | ArrowDataType::Time64(_)
449        | ArrowDataType::Timestamp(_, _)
450        | ArrowDataType::Duration(_) => {
451            return primitive::array_to_page_integer::<i64, i64>(
452                array.as_any().downcast_ref().unwrap(),
453                options,
454                type_,
455                encoding,
456            );
457        },
458        ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
459            array.as_any().downcast_ref().unwrap(),
460            options,
461            type_,
462        ),
463        ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
464            array.as_any().downcast_ref().unwrap(),
465            options,
466            type_,
467        ),
468        ArrowDataType::LargeUtf8 => {
469            let array =
470                polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
471                    .unwrap();
472            return binary::array_to_page::<i64>(
473                array.as_any().downcast_ref().unwrap(),
474                options,
475                type_,
476                encoding,
477            );
478        },
479        ArrowDataType::LargeBinary => {
480            return binary::array_to_page::<i64>(
481                array.as_any().downcast_ref().unwrap(),
482                options,
483                type_,
484                encoding,
485            );
486        },
487        ArrowDataType::BinaryView => {
488            return binview::array_to_page(
489                array.as_any().downcast_ref().unwrap(),
490                options,
491                type_,
492                encoding,
493            );
494        },
495        ArrowDataType::Utf8View => {
496            let array =
497                polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
498                    .unwrap();
499            return binview::array_to_page(
500                array.as_any().downcast_ref().unwrap(),
501                options,
502                type_,
503                encoding,
504            );
505        },
506        ArrowDataType::Null => {
507            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
508            primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
509        },
510        ArrowDataType::Interval(IntervalUnit::YearMonth) => {
511            let array = array
512                .as_any()
513                .downcast_ref::<PrimitiveArray<i32>>()
514                .unwrap();
515            let mut values = Vec::<u8>::with_capacity(12 * array.len());
516            array.values().iter().for_each(|x| {
517                let bytes = &x.to_le_bytes();
518                values.extend_from_slice(bytes);
519                values.extend_from_slice(&[0; 8]);
520            });
521            let array = FixedSizeBinaryArray::new(
522                ArrowDataType::FixedSizeBinary(12),
523                values.into(),
524                array.validity().cloned(),
525            );
526            let statistics = if options.has_statistics() {
527                Some(fixed_size_binary::build_statistics(
528                    &array,
529                    type_.clone(),
530                    &options.statistics,
531                ))
532            } else {
533                None
534            };
535            fixed_size_binary::array_to_page(&array, options, type_, statistics)
536        },
537        ArrowDataType::Interval(IntervalUnit::DayTime) => {
538            let array = array
539                .as_any()
540                .downcast_ref::<PrimitiveArray<days_ms>>()
541                .unwrap();
542            let mut values = Vec::<u8>::with_capacity(12 * array.len());
543            array.values().iter().for_each(|x| {
544                let bytes = &x.to_le_bytes();
545                values.extend_from_slice(&[0; 4]); // months
546                values.extend_from_slice(bytes); // days and seconds
547            });
548            let array = FixedSizeBinaryArray::new(
549                ArrowDataType::FixedSizeBinary(12),
550                values.into(),
551                array.validity().cloned(),
552            );
553            let statistics = if options.has_statistics() {
554                Some(fixed_size_binary::build_statistics(
555                    &array,
556                    type_.clone(),
557                    &options.statistics,
558                ))
559            } else {
560                None
561            };
562            fixed_size_binary::array_to_page(&array, options, type_, statistics)
563        },
564        ArrowDataType::FixedSizeBinary(_) => {
565            let array = array.as_any().downcast_ref().unwrap();
566            let statistics = if options.has_statistics() {
567                Some(fixed_size_binary::build_statistics(
568                    array,
569                    type_.clone(),
570                    &options.statistics,
571                ))
572            } else {
573                None
574            };
575
576            fixed_size_binary::array_to_page(array, options, type_, statistics)
577        },
578        ArrowDataType::Decimal256(precision, _) => {
579            let precision = *precision;
580            let array = array
581                .as_any()
582                .downcast_ref::<PrimitiveArray<i256>>()
583                .unwrap();
584            if precision <= 9 {
585                let values = array
586                    .values()
587                    .iter()
588                    .map(|x| x.0.as_i32())
589                    .collect::<Vec<_>>()
590                    .into();
591
592                let array = PrimitiveArray::<i32>::new(
593                    ArrowDataType::Int32,
594                    values,
595                    array.validity().cloned(),
596                );
597                return primitive::array_to_page_integer::<i32, i32>(
598                    &array, options, type_, encoding,
599                );
600            } else if precision <= 18 {
601                let values = array
602                    .values()
603                    .iter()
604                    .map(|x| x.0.as_i64())
605                    .collect::<Vec<_>>()
606                    .into();
607
608                let array = PrimitiveArray::<i64>::new(
609                    ArrowDataType::Int64,
610                    values,
611                    array.validity().cloned(),
612                );
613                return primitive::array_to_page_integer::<i64, i64>(
614                    &array, options, type_, encoding,
615                );
616            } else if precision <= 38 {
617                let size = decimal_length_from_precision(precision);
618                let statistics = if options.has_statistics() {
619                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
620                        array,
621                        type_.clone(),
622                        size,
623                        &options.statistics,
624                    );
625                    Some(stats)
626                } else {
627                    None
628                };
629
630                let mut values = Vec::<u8>::with_capacity(size * array.len());
631                array.values().iter().for_each(|x| {
632                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
633                    values.extend_from_slice(bytes)
634                });
635                let array = FixedSizeBinaryArray::new(
636                    ArrowDataType::FixedSizeBinary(size),
637                    values.into(),
638                    array.validity().cloned(),
639                );
640                fixed_size_binary::array_to_page(&array, options, type_, statistics)
641            } else {
642                let size = 32;
643                let array = array
644                    .as_any()
645                    .downcast_ref::<PrimitiveArray<i256>>()
646                    .unwrap();
647                let statistics = if options.has_statistics() {
648                    let stats = fixed_size_binary::build_statistics_decimal256(
649                        array,
650                        type_.clone(),
651                        size,
652                        &options.statistics,
653                    );
654                    Some(stats)
655                } else {
656                    None
657                };
658                let mut values = Vec::<u8>::with_capacity(size * array.len());
659                array.values().iter().for_each(|x| {
660                    let bytes = &x.to_be_bytes();
661                    values.extend_from_slice(bytes)
662                });
663                let array = FixedSizeBinaryArray::new(
664                    ArrowDataType::FixedSizeBinary(size),
665                    values.into(),
666                    array.validity().cloned(),
667                );
668
669                fixed_size_binary::array_to_page(&array, options, type_, statistics)
670            }
671        },
672        ArrowDataType::Decimal(precision, _) => {
673            let precision = *precision;
674            let array = array
675                .as_any()
676                .downcast_ref::<PrimitiveArray<i128>>()
677                .unwrap();
678            if precision <= 9 {
679                let values = array
680                    .values()
681                    .iter()
682                    .map(|x| *x as i32)
683                    .collect::<Vec<_>>()
684                    .into();
685
686                let array = PrimitiveArray::<i32>::new(
687                    ArrowDataType::Int32,
688                    values,
689                    array.validity().cloned(),
690                );
691                return primitive::array_to_page_integer::<i32, i32>(
692                    &array, options, type_, encoding,
693                );
694            } else if precision <= 18 {
695                let values = array
696                    .values()
697                    .iter()
698                    .map(|x| *x as i64)
699                    .collect::<Vec<_>>()
700                    .into();
701
702                let array = PrimitiveArray::<i64>::new(
703                    ArrowDataType::Int64,
704                    values,
705                    array.validity().cloned(),
706                );
707                return primitive::array_to_page_integer::<i64, i64>(
708                    &array, options, type_, encoding,
709                );
710            } else {
711                let size = decimal_length_from_precision(precision);
712
713                let statistics = if options.has_statistics() {
714                    let stats = fixed_size_binary::build_statistics_decimal(
715                        array,
716                        type_.clone(),
717                        size,
718                        &options.statistics,
719                    );
720                    Some(stats)
721                } else {
722                    None
723                };
724
725                let mut values = Vec::<u8>::with_capacity(size * array.len());
726                array.values().iter().for_each(|x| {
727                    let bytes = &x.to_be_bytes()[16 - size..];
728                    values.extend_from_slice(bytes)
729                });
730                let array = FixedSizeBinaryArray::new(
731                    ArrowDataType::FixedSizeBinary(size),
732                    values.into(),
733                    array.validity().cloned(),
734                );
735                fixed_size_binary::array_to_page(&array, options, type_, statistics)
736            }
737        },
738        other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
739    }
740    .map(Page::Data)
741}
742
743fn array_to_page_nested(
744    array: &dyn Array,
745    type_: ParquetPrimitiveType,
746    nested: &[Nested],
747    options: WriteOptions,
748    _encoding: Encoding,
749) -> PolarsResult<Page> {
750    use ArrowDataType::*;
751    match array.dtype().to_logical_type() {
752        Null => {
753            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
754            primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
755        },
756        Boolean => {
757            let array = array.as_any().downcast_ref().unwrap();
758            boolean::nested_array_to_page(array, options, type_, nested)
759        },
760        LargeUtf8 => {
761            let array =
762                polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
763            let array = array.as_any().downcast_ref().unwrap();
764            binary::nested_array_to_page::<i64>(array, options, type_, nested)
765        },
766        LargeBinary => {
767            let array = array.as_any().downcast_ref().unwrap();
768            binary::nested_array_to_page::<i64>(array, options, type_, nested)
769        },
770        BinaryView => {
771            let array = array.as_any().downcast_ref().unwrap();
772            binview::nested_array_to_page(array, options, type_, nested)
773        },
774        Utf8View => {
775            let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
776            let array = array.as_any().downcast_ref().unwrap();
777            binview::nested_array_to_page(array, options, type_, nested)
778        },
779        UInt8 => {
780            let array = array.as_any().downcast_ref().unwrap();
781            primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
782        },
783        UInt16 => {
784            let array = array.as_any().downcast_ref().unwrap();
785            primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
786        },
787        UInt32 => {
788            let array = array.as_any().downcast_ref().unwrap();
789            primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
790        },
791        UInt64 => {
792            let array = array.as_any().downcast_ref().unwrap();
793            primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
794        },
795        Int8 => {
796            let array = array.as_any().downcast_ref().unwrap();
797            primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
798        },
799        Int16 => {
800            let array = array.as_any().downcast_ref().unwrap();
801            primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
802        },
803        Int32 | Date32 | Time32(_) => {
804            let array = array.as_any().downcast_ref().unwrap();
805            primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
806        },
807        Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
808            let array = array.as_any().downcast_ref().unwrap();
809            primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
810        },
811        Float32 => {
812            let array = array.as_any().downcast_ref().unwrap();
813            primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
814        },
815        Float64 => {
816            let array = array.as_any().downcast_ref().unwrap();
817            primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
818        },
819        Decimal(precision, _) => {
820            let precision = *precision;
821            let array = array
822                .as_any()
823                .downcast_ref::<PrimitiveArray<i128>>()
824                .unwrap();
825            if precision <= 9 {
826                let values = array
827                    .values()
828                    .iter()
829                    .map(|x| *x as i32)
830                    .collect::<Vec<_>>()
831                    .into();
832
833                let array = PrimitiveArray::<i32>::new(
834                    ArrowDataType::Int32,
835                    values,
836                    array.validity().cloned(),
837                );
838                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
839            } else if precision <= 18 {
840                let values = array
841                    .values()
842                    .iter()
843                    .map(|x| *x as i64)
844                    .collect::<Vec<_>>()
845                    .into();
846
847                let array = PrimitiveArray::<i64>::new(
848                    ArrowDataType::Int64,
849                    values,
850                    array.validity().cloned(),
851                );
852                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
853            } else {
854                let size = decimal_length_from_precision(precision);
855
856                let statistics = if options.has_statistics() {
857                    let stats = fixed_size_binary::build_statistics_decimal(
858                        array,
859                        type_.clone(),
860                        size,
861                        &options.statistics,
862                    );
863                    Some(stats)
864                } else {
865                    None
866                };
867
868                let mut values = Vec::<u8>::with_capacity(size * array.len());
869                array.values().iter().for_each(|x| {
870                    let bytes = &x.to_be_bytes()[16 - size..];
871                    values.extend_from_slice(bytes)
872                });
873                let array = FixedSizeBinaryArray::new(
874                    ArrowDataType::FixedSizeBinary(size),
875                    values.into(),
876                    array.validity().cloned(),
877                );
878                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
879            }
880        },
881        Decimal256(precision, _) => {
882            let precision = *precision;
883            let array = array
884                .as_any()
885                .downcast_ref::<PrimitiveArray<i256>>()
886                .unwrap();
887            if precision <= 9 {
888                let values = array
889                    .values()
890                    .iter()
891                    .map(|x| x.0.as_i32())
892                    .collect::<Vec<_>>()
893                    .into();
894
895                let array = PrimitiveArray::<i32>::new(
896                    ArrowDataType::Int32,
897                    values,
898                    array.validity().cloned(),
899                );
900                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
901            } else if precision <= 18 {
902                let values = array
903                    .values()
904                    .iter()
905                    .map(|x| x.0.as_i64())
906                    .collect::<Vec<_>>()
907                    .into();
908
909                let array = PrimitiveArray::<i64>::new(
910                    ArrowDataType::Int64,
911                    values,
912                    array.validity().cloned(),
913                );
914                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
915            } else if precision <= 38 {
916                let size = decimal_length_from_precision(precision);
917                let statistics = if options.has_statistics() {
918                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
919                        array,
920                        type_.clone(),
921                        size,
922                        &options.statistics,
923                    );
924                    Some(stats)
925                } else {
926                    None
927                };
928
929                let mut values = Vec::<u8>::with_capacity(size * array.len());
930                array.values().iter().for_each(|x| {
931                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
932                    values.extend_from_slice(bytes)
933                });
934                let array = FixedSizeBinaryArray::new(
935                    ArrowDataType::FixedSizeBinary(size),
936                    values.into(),
937                    array.validity().cloned(),
938                );
939                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
940            } else {
941                let size = 32;
942                let array = array
943                    .as_any()
944                    .downcast_ref::<PrimitiveArray<i256>>()
945                    .unwrap();
946                let statistics = if options.has_statistics() {
947                    let stats = fixed_size_binary::build_statistics_decimal256(
948                        array,
949                        type_.clone(),
950                        size,
951                        &options.statistics,
952                    );
953                    Some(stats)
954                } else {
955                    None
956                };
957                let mut values = Vec::<u8>::with_capacity(size * array.len());
958                array.values().iter().for_each(|x| {
959                    let bytes = &x.to_be_bytes();
960                    values.extend_from_slice(bytes)
961                });
962                let array = FixedSizeBinaryArray::new(
963                    ArrowDataType::FixedSizeBinary(size),
964                    values.into(),
965                    array.validity().cloned(),
966                );
967
968                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
969            }
970        },
971        other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
972    }
973    .map(Page::Data)
974}
975
976fn transverse_recursive<T, F: Fn(&ArrowDataType) -> T + Clone>(
977    dtype: &ArrowDataType,
978    map: F,
979    encodings: &mut Vec<T>,
980) {
981    use arrow::datatypes::PhysicalType::*;
982    match dtype.to_physical_type() {
983        Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
984        | Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => encodings.push(map(dtype)),
985        List | FixedSizeList | LargeList => {
986            let a = dtype.to_logical_type();
987            if let ArrowDataType::List(inner) = a {
988                transverse_recursive(&inner.dtype, map, encodings)
989            } else if let ArrowDataType::LargeList(inner) = a {
990                transverse_recursive(&inner.dtype, map, encodings)
991            } else if let ArrowDataType::FixedSizeList(inner, _) = a {
992                transverse_recursive(&inner.dtype, map, encodings)
993            } else {
994                unreachable!()
995            }
996        },
997        Struct => {
998            if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {
999                for field in fields {
1000                    transverse_recursive(&field.dtype, map.clone(), encodings)
1001                }
1002            } else {
1003                unreachable!()
1004            }
1005        },
1006        Map => {
1007            if let ArrowDataType::Map(field, _) = dtype.to_logical_type() {
1008                if let ArrowDataType::Struct(fields) = field.dtype.to_logical_type() {
1009                    for field in fields {
1010                        transverse_recursive(&field.dtype, map.clone(), encodings)
1011                    }
1012                } else {
1013                    unreachable!()
1014                }
1015            } else {
1016                unreachable!()
1017            }
1018        },
1019        Union => todo!(),
1020    }
1021}
1022
1023/// Transverses the `dtype` up to its (parquet) columns and returns a vector of
1024/// items based on `map`.
1025///
1026/// This is used to assign an [`Encoding`] to every parquet column based on the columns' type (see example)
1027pub fn transverse<T, F: Fn(&ArrowDataType) -> T + Clone>(dtype: &ArrowDataType, map: F) -> Vec<T> {
1028    let mut encodings = vec![];
1029    transverse_recursive(dtype, map, &mut encodings);
1030    encodings
1031}