polars_parquet/arrow/write/
mod.rs

1//! APIs to write to Parquet format.
2//!
3//! # Arrow/Parquet Interoperability
4//! As of [parquet-format v2.9](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md)
5//! there are Arrow [DataTypes](arrow::datatypes::ArrowDataType) which do not have a parquet
6//! representation. These include but are not limited to:
7//! * `ArrowDataType::Timestamp(TimeUnit::Second, _)`
8//! * `ArrowDataType::Int64`
9//! * `ArrowDataType::Duration`
10//! * `ArrowDataType::Date64`
11//! * `ArrowDataType::Time32(TimeUnit::Second)`
12//!
13//! The use of these arrow types will result in no logical type being stored within a parquet file.
14
15mod binary;
16mod binview;
17mod boolean;
18mod dictionary;
19mod file;
20mod fixed_size_binary;
21mod nested;
22mod pages;
23mod primitive;
24mod row_group;
25mod schema;
26mod utils;
27
28use arrow::array::*;
29use arrow::datatypes::*;
30use arrow::types::{NativeType, days_ms, i256};
31pub use nested::{num_values, write_rep_and_def};
32pub use pages::{to_leaves, to_nested, to_parquet_leaves};
33use polars_utils::pl_str::PlSmallStr;
34pub use utils::write_def_levels;
35
36pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
37pub use crate::parquet::encoding::Encoding;
38pub use crate::parquet::metadata::{
39    Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
40};
41pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
42use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
43pub use crate::parquet::schema::types::{
44    FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
45};
46pub use crate::parquet::write::{
47    Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
48    write_metadata_sidecar,
49};
50pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
51
/// The statistics to write
///
/// Every flag selects one kind of statistic: `true` requests it, `false` leaves it out.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct StatisticsOptions {
    /// Whether to write the minimum value.
    pub min_value: bool,
    /// Whether to write the maximum value.
    pub max_value: bool,
    /// Whether to write the distinct count.
    pub distinct_count: bool,
    /// Whether to write the null count.
    pub null_count: bool,
}
61
62impl Default for StatisticsOptions {
63    fn default() -> Self {
64        Self {
65            min_value: true,
66            max_value: true,
67            distinct_count: false,
68            null_count: true,
69        }
70    }
71}
72
/// Nullability with which an array is encoded.
///
/// `Debug`, `PartialEq` and `Eq` are derived so that values can be logged and compared
/// directly (for example in assertions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeNullability {
    /// The array is encoded as required.
    Required,
    /// The array is encoded as optional.
    Optional,
}
79
/// Currently supported options to write to parquet
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WriteOptions {
    /// Which statistics to write; see [`StatisticsOptions`].
    pub statistics: StatisticsOptions,
    /// The page and file version to use
    pub version: Version,
    /// The compression to apply to every page
    pub compression: CompressionOptions,
    /// The size to flush a page, defaults to 1024 * 1024 if None.
    ///
    /// `array_to_pages` additionally caps the effective value at `2^31 - 2^25`.
    pub data_page_size: Option<usize>,
}
92
/// Write options of a single column, including the options of its (possibly nested) children.
#[derive(Clone)]
pub struct ColumnWriteOptions {
    /// Optional field id of the column.
    // NOTE(review): presumably the Parquet schema `field_id` - confirm against `to_parquet_type`.
    pub field_id: Option<i32>,
    /// Key-value metadata attached to the column.
    pub metadata: Vec<KeyValue>,
    /// Options of the column's children; a [`ChildWriteOptions::Leaf`] for a non-nested column.
    pub children: ChildWriteOptions,
}
99
/// Write options of the children of a column, mirroring the column's nesting.
#[derive(Clone)]
pub enum ChildWriteOptions {
    /// A leaf: carries the options of the field itself.
    Leaf(FieldWriteOptions),
    /// A list-like column: carries the options of its single child column.
    ListLike(Box<ListLikeFieldWriteOptions>),
    /// A struct column: carries the options of each of its child columns.
    Struct(Box<StructFieldWriteOptions>),
}
106
107impl ColumnWriteOptions {
108    pub fn to_leaves<'a>(&'a self, out: &mut Vec<&'a FieldWriteOptions>) {
109        match &self.children {
110            ChildWriteOptions::Leaf(o) => out.push(o),
111            ChildWriteOptions::ListLike(o) => o.child.to_leaves(out),
112            ChildWriteOptions::Struct(o) => {
113                for o in &o.children {
114                    o.to_leaves(out);
115                }
116            },
117        }
118    }
119}
120
/// Write options of a single leaf field.
#[derive(Clone)]
pub struct FieldWriteOptions {
    /// The encoding requested for the field; `array_to_pages` reads it as its starting encoding.
    pub encoding: Encoding,
}
125
126impl ColumnWriteOptions {
127    pub fn default_with(children: ChildWriteOptions) -> Self {
128        Self {
129            field_id: None,
130            metadata: Vec::new(),
131            children,
132        }
133    }
134}
135
136impl FieldWriteOptions {
137    pub fn default_with_encoding(encoding: Encoding) -> Self {
138        Self { encoding }
139    }
140
141    pub fn into_default_column_write_options(self) -> ColumnWriteOptions {
142        ColumnWriteOptions::default_with(ChildWriteOptions::Leaf(self))
143    }
144}
145
/// Write options of a list-like column.
#[derive(Clone)]
pub struct ListLikeFieldWriteOptions {
    /// Options of the single child column.
    pub child: ColumnWriteOptions,
}
150
/// Write options of a struct column.
#[derive(Clone)]
pub struct StructFieldWriteOptions {
    /// Options of the child columns.
    // NOTE(review): presumably one entry per struct field, in field order - confirm with callers.
    pub children: Vec<ColumnWriteOptions>,
}
155
156use arrow::compute::aggregate::estimated_bytes_size;
157use arrow::match_integer_type;
158pub use file::FileWriter;
159pub use pages::{Nested, array_to_columns, arrays_to_columns};
160use polars_error::{PolarsResult, polars_bail};
161pub use row_group::{RowGroupIterator, row_group_iter};
162pub use schema::{schema_to_metadata_key, to_parquet_type};
163
164use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
165use crate::write::dictionary::encode_as_dictionary_optional;
166
167impl StatisticsOptions {
168    pub fn empty() -> Self {
169        Self {
170            min_value: false,
171            max_value: false,
172            distinct_count: false,
173            null_count: false,
174        }
175    }
176
177    pub fn full() -> Self {
178        Self {
179            min_value: true,
180            max_value: true,
181            distinct_count: true,
182            null_count: true,
183        }
184    }
185
186    pub fn is_empty(&self) -> bool {
187        !(self.min_value || self.max_value || self.distinct_count || self.null_count)
188    }
189
190    pub fn is_full(&self) -> bool {
191        self.min_value && self.max_value && self.distinct_count && self.null_count
192    }
193}
194
195impl WriteOptions {
196    pub fn has_statistics(&self) -> bool {
197        !self.statistics.is_empty()
198    }
199}
200
201impl EncodeNullability {
202    const fn new(is_optional: bool) -> Self {
203        if is_optional {
204            Self::Optional
205        } else {
206            Self::Required
207        }
208    }
209
210    fn is_optional(self) -> bool {
211        matches!(self, Self::Optional)
212    }
213}
214
215/// returns offset and length to slice the leaf values
216pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
217    // find the deepest recursive dremel structure as that one determines how many values we must
218    // take
219    let mut out = (0, 0);
220    for nested in nested.iter().rev() {
221        match nested {
222            Nested::LargeList(l_nested) => {
223                let start = *l_nested.offsets.first();
224                let end = *l_nested.offsets.last();
225                return (start as usize, (end - start) as usize);
226            },
227            Nested::List(l_nested) => {
228                let start = *l_nested.offsets.first();
229                let end = *l_nested.offsets.last();
230                return (start as usize, (end - start) as usize);
231            },
232            Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
233            Nested::Primitive(nested) => out = (0, nested.length),
234            Nested::Struct(_) => {},
235        }
236    }
237    out
238}
239
/// Returns the number of bytes `n` of the smallest big-endian two's-complement integer that can
/// hold every decimal with `precision` digits.
fn decimal_length_from_precision(precision: usize) -> usize {
    // An `n`-byte signed integer holds magnitudes up to `2^(8*n - 1) - 1`, so we need
    //   10^precision <= 2^(8*n - 1) - 1
    //   10^precision + 1 <= 2^(8*n - 1)
    //   log2(10^precision + 1) + 1 <= 8*n
    //   n >= (log2(10^precision + 1) + 1) / 8
    // and take the smallest such integer `n`.
    let magnitude = 10.0_f64.powi(precision as i32) + 1.0;
    let required_bits = magnitude.log2() + 1.0;
    (required_bits / 8.0).ceil() as usize
}
250
251/// Creates a parquet [`SchemaDescriptor`] from a [`ArrowSchema`].
252pub fn to_parquet_schema(
253    schema: &ArrowSchema,
254    column_options: &[ColumnWriteOptions],
255) -> PolarsResult<SchemaDescriptor> {
256    let parquet_types = schema
257        .iter_values()
258        .zip(column_options)
259        .map(|(field, options)| to_parquet_type(field, options))
260        .collect::<PolarsResult<Vec<_>>>()?;
261    Ok(SchemaDescriptor::new(
262        PlSmallStr::from_static("root"),
263        parquet_types,
264    ))
265}
266
267/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.
268pub fn slice_parquet_array(
269    primitive_array: &mut dyn Array,
270    nested: &mut [Nested],
271    mut current_offset: usize,
272    mut current_length: usize,
273) {
274    for nested in nested.iter_mut() {
275        match nested {
276            Nested::LargeList(l_nested) => {
277                l_nested.offsets.slice(current_offset, current_length + 1);
278                if let Some(validity) = l_nested.validity.as_mut() {
279                    validity.slice(current_offset, current_length)
280                };
281
282                // Update the offset/ length so that the Primitive is sliced properly.
283                current_length = l_nested.offsets.range() as usize;
284                current_offset = *l_nested.offsets.first() as usize;
285            },
286            Nested::List(l_nested) => {
287                l_nested.offsets.slice(current_offset, current_length + 1);
288                if let Some(validity) = l_nested.validity.as_mut() {
289                    validity.slice(current_offset, current_length)
290                };
291
292                // Update the offset/ length so that the Primitive is sliced properly.
293                current_length = l_nested.offsets.range() as usize;
294                current_offset = *l_nested.offsets.first() as usize;
295            },
296            Nested::Struct(StructNested {
297                validity, length, ..
298            }) => {
299                *length = current_length;
300                if let Some(validity) = validity.as_mut() {
301                    validity.slice(current_offset, current_length)
302                };
303            },
304            Nested::Primitive(PrimitiveNested {
305                validity, length, ..
306            }) => {
307                *length = current_length;
308                if let Some(validity) = validity.as_mut() {
309                    validity.slice(current_offset, current_length)
310                };
311                primitive_array.slice(current_offset, current_length);
312            },
313            Nested::FixedSizeList(FixedSizeListNested {
314                validity,
315                length,
316                width,
317                ..
318            }) => {
319                if let Some(validity) = validity.as_mut() {
320                    validity.slice(current_offset, current_length)
321                };
322                *length = current_length;
323                // Update the offset/ length so that the Primitive is sliced properly.
324                current_length *= *width;
325                current_offset *= *width;
326            },
327        }
328    }
329}
330
331/// Get the length of [`Array`] that should be sliced.
332pub fn get_max_length(nested: &[Nested]) -> usize {
333    let mut length = 0;
334    for nested in nested.iter() {
335        match nested {
336            Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
337            Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
338            Nested::FixedSizeList(nested) => length += nested.length * nested.width,
339            _ => {},
340        }
341    }
342    length
343}
344
345/// Returns an iterator of [`Page`].
346pub fn array_to_pages(
347    primitive_array: &dyn Array,
348    type_: ParquetPrimitiveType,
349    nested: &[Nested],
350    options: WriteOptions,
351    field_options: &FieldWriteOptions,
352) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
353    let mut encoding = field_options.encoding;
354    if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_logical_type() {
355        return match_integer_type!(key_type, |$T| {
356            dictionary::array_to_pages::<$T>(
357                primitive_array.as_any().downcast_ref().unwrap(),
358                type_,
359                &nested,
360                options,
361                encoding,
362            )
363        });
364    };
365    if let Encoding::RleDictionary = encoding {
366        // Only take this path for primitive columns
367        if matches!(nested.first(), Some(Nested::Primitive(_))) {
368            if let Some(result) =
369                encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
370            {
371                return result;
372            }
373        }
374
375        // We didn't succeed, fallback to plain
376        encoding = Encoding::Plain;
377    }
378
379    let nested = nested.to_vec();
380
381    let number_of_rows = nested[0].len();
382
383    // note: this is not correct if the array is sliced - the estimation should happen on the
384    // primitive after sliced for parquet
385    let byte_size = estimated_bytes_size(primitive_array);
386
387    const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
388    let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
389    let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
390    let bytes_per_row = if number_of_rows == 0 {
391        0
392    } else {
393        ((byte_size as f64) / (number_of_rows as f64)) as usize
394    };
395    let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
396
397    let row_iter = (0..number_of_rows)
398        .step_by(rows_per_page)
399        .map(move |offset| {
400            let length = if offset + rows_per_page > number_of_rows {
401                number_of_rows - offset
402            } else {
403                rows_per_page
404            };
405            (offset, length)
406        });
407
408    let primitive_array = primitive_array.to_boxed();
409
410    let pages = row_iter.map(move |(offset, length)| {
411        let mut right_array = primitive_array.clone();
412        let mut right_nested = nested.clone();
413        slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
414
415        array_to_page(
416            right_array.as_ref(),
417            type_.clone(),
418            &right_nested,
419            options,
420            encoding,
421        )
422    });
423    Ok(DynIter::new(pages))
424}
425
426/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
427pub fn array_to_page(
428    array: &dyn Array,
429    type_: ParquetPrimitiveType,
430    nested: &[Nested],
431    options: WriteOptions,
432    encoding: Encoding,
433) -> PolarsResult<Page> {
434    if nested.len() == 1 {
435        // special case where validity == def levels
436        return array_to_page_simple(array, type_, options, encoding);
437    }
438    array_to_page_nested(array, type_, nested, options, encoding)
439}
440
441/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
442pub fn array_to_page_simple(
443    array: &dyn Array,
444    type_: ParquetPrimitiveType,
445    options: WriteOptions,
446    encoding: Encoding,
447) -> PolarsResult<Page> {
448    let dtype = array.dtype();
449
450    match dtype.to_logical_type() {
451        ArrowDataType::Boolean => boolean::array_to_page(
452            array.as_any().downcast_ref().unwrap(),
453            options,
454            type_,
455            encoding,
456        ),
457        // casts below MUST match the casts done at the metadata (field -> parquet type).
458        ArrowDataType::UInt8 => {
459            return primitive::array_to_page_integer::<u8, i32>(
460                array.as_any().downcast_ref().unwrap(),
461                options,
462                type_,
463                encoding,
464            );
465        },
466        ArrowDataType::UInt16 => {
467            return primitive::array_to_page_integer::<u16, i32>(
468                array.as_any().downcast_ref().unwrap(),
469                options,
470                type_,
471                encoding,
472            );
473        },
474        ArrowDataType::UInt32 => {
475            return primitive::array_to_page_integer::<u32, i32>(
476                array.as_any().downcast_ref().unwrap(),
477                options,
478                type_,
479                encoding,
480            );
481        },
482        ArrowDataType::UInt64 => {
483            return primitive::array_to_page_integer::<u64, i64>(
484                array.as_any().downcast_ref().unwrap(),
485                options,
486                type_,
487                encoding,
488            );
489        },
490        ArrowDataType::Int8 => {
491            return primitive::array_to_page_integer::<i8, i32>(
492                array.as_any().downcast_ref().unwrap(),
493                options,
494                type_,
495                encoding,
496            );
497        },
498        ArrowDataType::Int16 => {
499            return primitive::array_to_page_integer::<i16, i32>(
500                array.as_any().downcast_ref().unwrap(),
501                options,
502                type_,
503                encoding,
504            );
505        },
506        ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
507            return primitive::array_to_page_integer::<i32, i32>(
508                array.as_any().downcast_ref().unwrap(),
509                options,
510                type_,
511                encoding,
512            );
513        },
514        ArrowDataType::Int64
515        | ArrowDataType::Date64
516        | ArrowDataType::Time64(_)
517        | ArrowDataType::Timestamp(_, _)
518        | ArrowDataType::Duration(_) => {
519            return primitive::array_to_page_integer::<i64, i64>(
520                array.as_any().downcast_ref().unwrap(),
521                options,
522                type_,
523                encoding,
524            );
525        },
526        ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
527            array.as_any().downcast_ref().unwrap(),
528            options,
529            type_,
530        ),
531        ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
532            array.as_any().downcast_ref().unwrap(),
533            options,
534            type_,
535        ),
536        ArrowDataType::LargeUtf8 => {
537            let array =
538                polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
539                    .unwrap();
540            return binary::array_to_page::<i64>(
541                array.as_any().downcast_ref().unwrap(),
542                options,
543                type_,
544                encoding,
545            );
546        },
547        ArrowDataType::LargeBinary => {
548            return binary::array_to_page::<i64>(
549                array.as_any().downcast_ref().unwrap(),
550                options,
551                type_,
552                encoding,
553            );
554        },
555        ArrowDataType::BinaryView => {
556            return binview::array_to_page(
557                array.as_any().downcast_ref().unwrap(),
558                options,
559                type_,
560                encoding,
561            );
562        },
563        ArrowDataType::Utf8View => {
564            let array =
565                polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
566                    .unwrap();
567            return binview::array_to_page(
568                array.as_any().downcast_ref().unwrap(),
569                options,
570                type_,
571                encoding,
572            );
573        },
574        ArrowDataType::Null => {
575            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
576            primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
577        },
578        ArrowDataType::Interval(IntervalUnit::YearMonth) => {
579            let array = array
580                .as_any()
581                .downcast_ref::<PrimitiveArray<i32>>()
582                .unwrap();
583            let mut values = Vec::<u8>::with_capacity(12 * array.len());
584            array.values().iter().for_each(|x| {
585                let bytes = &x.to_le_bytes();
586                values.extend_from_slice(bytes);
587                values.extend_from_slice(&[0; 8]);
588            });
589            let array = FixedSizeBinaryArray::new(
590                ArrowDataType::FixedSizeBinary(12),
591                values.into(),
592                array.validity().cloned(),
593            );
594            let statistics = if options.has_statistics() {
595                Some(fixed_size_binary::build_statistics(
596                    &array,
597                    type_.clone(),
598                    &options.statistics,
599                ))
600            } else {
601                None
602            };
603            fixed_size_binary::array_to_page(&array, options, type_, statistics)
604        },
605        ArrowDataType::Interval(IntervalUnit::DayTime) => {
606            let array = array
607                .as_any()
608                .downcast_ref::<PrimitiveArray<days_ms>>()
609                .unwrap();
610            let mut values = Vec::<u8>::with_capacity(12 * array.len());
611            array.values().iter().for_each(|x| {
612                let bytes = &x.to_le_bytes();
613                values.extend_from_slice(&[0; 4]); // months
614                values.extend_from_slice(bytes); // days and seconds
615            });
616            let array = FixedSizeBinaryArray::new(
617                ArrowDataType::FixedSizeBinary(12),
618                values.into(),
619                array.validity().cloned(),
620            );
621            let statistics = if options.has_statistics() {
622                Some(fixed_size_binary::build_statistics(
623                    &array,
624                    type_.clone(),
625                    &options.statistics,
626                ))
627            } else {
628                None
629            };
630            fixed_size_binary::array_to_page(&array, options, type_, statistics)
631        },
632        ArrowDataType::FixedSizeBinary(_) => {
633            let array = array.as_any().downcast_ref().unwrap();
634            let statistics = if options.has_statistics() {
635                Some(fixed_size_binary::build_statistics(
636                    array,
637                    type_.clone(),
638                    &options.statistics,
639                ))
640            } else {
641                None
642            };
643
644            fixed_size_binary::array_to_page(array, options, type_, statistics)
645        },
646        ArrowDataType::Decimal256(precision, _) => {
647            let precision = *precision;
648            let array = array
649                .as_any()
650                .downcast_ref::<PrimitiveArray<i256>>()
651                .unwrap();
652            if precision <= 9 {
653                let values = array
654                    .values()
655                    .iter()
656                    .map(|x| x.0.as_i32())
657                    .collect::<Vec<_>>()
658                    .into();
659
660                let array = PrimitiveArray::<i32>::new(
661                    ArrowDataType::Int32,
662                    values,
663                    array.validity().cloned(),
664                );
665                return primitive::array_to_page_integer::<i32, i32>(
666                    &array, options, type_, encoding,
667                );
668            } else if precision <= 18 {
669                let values = array
670                    .values()
671                    .iter()
672                    .map(|x| x.0.as_i64())
673                    .collect::<Vec<_>>()
674                    .into();
675
676                let array = PrimitiveArray::<i64>::new(
677                    ArrowDataType::Int64,
678                    values,
679                    array.validity().cloned(),
680                );
681                return primitive::array_to_page_integer::<i64, i64>(
682                    &array, options, type_, encoding,
683                );
684            } else if precision <= 38 {
685                let size = decimal_length_from_precision(precision);
686                let statistics = if options.has_statistics() {
687                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
688                        array,
689                        type_.clone(),
690                        size,
691                        &options.statistics,
692                    );
693                    Some(stats)
694                } else {
695                    None
696                };
697
698                let mut values = Vec::<u8>::with_capacity(size * array.len());
699                array.values().iter().for_each(|x| {
700                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
701                    values.extend_from_slice(bytes)
702                });
703                let array = FixedSizeBinaryArray::new(
704                    ArrowDataType::FixedSizeBinary(size),
705                    values.into(),
706                    array.validity().cloned(),
707                );
708                fixed_size_binary::array_to_page(&array, options, type_, statistics)
709            } else {
710                let size = 32;
711                let array = array
712                    .as_any()
713                    .downcast_ref::<PrimitiveArray<i256>>()
714                    .unwrap();
715                let statistics = if options.has_statistics() {
716                    let stats = fixed_size_binary::build_statistics_decimal256(
717                        array,
718                        type_.clone(),
719                        size,
720                        &options.statistics,
721                    );
722                    Some(stats)
723                } else {
724                    None
725                };
726                let mut values = Vec::<u8>::with_capacity(size * array.len());
727                array.values().iter().for_each(|x| {
728                    let bytes = &x.to_be_bytes();
729                    values.extend_from_slice(bytes)
730                });
731                let array = FixedSizeBinaryArray::new(
732                    ArrowDataType::FixedSizeBinary(size),
733                    values.into(),
734                    array.validity().cloned(),
735                );
736
737                fixed_size_binary::array_to_page(&array, options, type_, statistics)
738            }
739        },
740        ArrowDataType::Decimal(precision, _) => {
741            let precision = *precision;
742            let array = array
743                .as_any()
744                .downcast_ref::<PrimitiveArray<i128>>()
745                .unwrap();
746            if precision <= 9 {
747                let values = array
748                    .values()
749                    .iter()
750                    .map(|x| *x as i32)
751                    .collect::<Vec<_>>()
752                    .into();
753
754                let array = PrimitiveArray::<i32>::new(
755                    ArrowDataType::Int32,
756                    values,
757                    array.validity().cloned(),
758                );
759                return primitive::array_to_page_integer::<i32, i32>(
760                    &array, options, type_, encoding,
761                );
762            } else if precision <= 18 {
763                let values = array
764                    .values()
765                    .iter()
766                    .map(|x| *x as i64)
767                    .collect::<Vec<_>>()
768                    .into();
769
770                let array = PrimitiveArray::<i64>::new(
771                    ArrowDataType::Int64,
772                    values,
773                    array.validity().cloned(),
774                );
775                return primitive::array_to_page_integer::<i64, i64>(
776                    &array, options, type_, encoding,
777                );
778            } else {
779                let size = decimal_length_from_precision(precision);
780
781                let statistics = if options.has_statistics() {
782                    let stats = fixed_size_binary::build_statistics_decimal(
783                        array,
784                        type_.clone(),
785                        size,
786                        &options.statistics,
787                    );
788                    Some(stats)
789                } else {
790                    None
791                };
792
793                let mut values = Vec::<u8>::with_capacity(size * array.len());
794                array.values().iter().for_each(|x| {
795                    let bytes = &x.to_be_bytes()[16 - size..];
796                    values.extend_from_slice(bytes)
797                });
798                let array = FixedSizeBinaryArray::new(
799                    ArrowDataType::FixedSizeBinary(size),
800                    values.into(),
801                    array.validity().cloned(),
802                );
803                fixed_size_binary::array_to_page(&array, options, type_, statistics)
804            }
805        },
806        other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
807    }
808    .map(Page::Data)
809}
810
811fn array_to_page_nested(
812    array: &dyn Array,
813    type_: ParquetPrimitiveType,
814    nested: &[Nested],
815    options: WriteOptions,
816    _encoding: Encoding,
817) -> PolarsResult<Page> {
818    use ArrowDataType::*;
819    match array.dtype().to_logical_type() {
820        Null => {
821            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
822            primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
823        },
824        Boolean => {
825            let array = array.as_any().downcast_ref().unwrap();
826            boolean::nested_array_to_page(array, options, type_, nested)
827        },
828        LargeUtf8 => {
829            let array =
830                polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
831            let array = array.as_any().downcast_ref().unwrap();
832            binary::nested_array_to_page::<i64>(array, options, type_, nested)
833        },
834        LargeBinary => {
835            let array = array.as_any().downcast_ref().unwrap();
836            binary::nested_array_to_page::<i64>(array, options, type_, nested)
837        },
838        BinaryView => {
839            let array = array.as_any().downcast_ref().unwrap();
840            binview::nested_array_to_page(array, options, type_, nested)
841        },
842        Utf8View => {
843            let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
844            let array = array.as_any().downcast_ref().unwrap();
845            binview::nested_array_to_page(array, options, type_, nested)
846        },
847        UInt8 => {
848            let array = array.as_any().downcast_ref().unwrap();
849            primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
850        },
851        UInt16 => {
852            let array = array.as_any().downcast_ref().unwrap();
853            primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
854        },
855        UInt32 => {
856            let array = array.as_any().downcast_ref().unwrap();
857            primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
858        },
859        UInt64 => {
860            let array = array.as_any().downcast_ref().unwrap();
861            primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
862        },
863        Int8 => {
864            let array = array.as_any().downcast_ref().unwrap();
865            primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
866        },
867        Int16 => {
868            let array = array.as_any().downcast_ref().unwrap();
869            primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
870        },
871        Int32 | Date32 | Time32(_) => {
872            let array = array.as_any().downcast_ref().unwrap();
873            primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
874        },
875        Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
876            let array = array.as_any().downcast_ref().unwrap();
877            primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
878        },
879        Float32 => {
880            let array = array.as_any().downcast_ref().unwrap();
881            primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
882        },
883        Float64 => {
884            let array = array.as_any().downcast_ref().unwrap();
885            primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
886        },
887        Decimal(precision, _) => {
888            let precision = *precision;
889            let array = array
890                .as_any()
891                .downcast_ref::<PrimitiveArray<i128>>()
892                .unwrap();
893            if precision <= 9 {
894                let values = array
895                    .values()
896                    .iter()
897                    .map(|x| *x as i32)
898                    .collect::<Vec<_>>()
899                    .into();
900
901                let array = PrimitiveArray::<i32>::new(
902                    ArrowDataType::Int32,
903                    values,
904                    array.validity().cloned(),
905                );
906                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
907            } else if precision <= 18 {
908                let values = array
909                    .values()
910                    .iter()
911                    .map(|x| *x as i64)
912                    .collect::<Vec<_>>()
913                    .into();
914
915                let array = PrimitiveArray::<i64>::new(
916                    ArrowDataType::Int64,
917                    values,
918                    array.validity().cloned(),
919                );
920                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
921            } else {
922                let size = decimal_length_from_precision(precision);
923
924                let statistics = if options.has_statistics() {
925                    let stats = fixed_size_binary::build_statistics_decimal(
926                        array,
927                        type_.clone(),
928                        size,
929                        &options.statistics,
930                    );
931                    Some(stats)
932                } else {
933                    None
934                };
935
936                let mut values = Vec::<u8>::with_capacity(size * array.len());
937                array.values().iter().for_each(|x| {
938                    let bytes = &x.to_be_bytes()[16 - size..];
939                    values.extend_from_slice(bytes)
940                });
941                let array = FixedSizeBinaryArray::new(
942                    ArrowDataType::FixedSizeBinary(size),
943                    values.into(),
944                    array.validity().cloned(),
945                );
946                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
947            }
948        },
949        Decimal256(precision, _) => {
950            let precision = *precision;
951            let array = array
952                .as_any()
953                .downcast_ref::<PrimitiveArray<i256>>()
954                .unwrap();
955            if precision <= 9 {
956                let values = array
957                    .values()
958                    .iter()
959                    .map(|x| x.0.as_i32())
960                    .collect::<Vec<_>>()
961                    .into();
962
963                let array = PrimitiveArray::<i32>::new(
964                    ArrowDataType::Int32,
965                    values,
966                    array.validity().cloned(),
967                );
968                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
969            } else if precision <= 18 {
970                let values = array
971                    .values()
972                    .iter()
973                    .map(|x| x.0.as_i64())
974                    .collect::<Vec<_>>()
975                    .into();
976
977                let array = PrimitiveArray::<i64>::new(
978                    ArrowDataType::Int64,
979                    values,
980                    array.validity().cloned(),
981                );
982                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
983            } else if precision <= 38 {
984                let size = decimal_length_from_precision(precision);
985                let statistics = if options.has_statistics() {
986                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
987                        array,
988                        type_.clone(),
989                        size,
990                        &options.statistics,
991                    );
992                    Some(stats)
993                } else {
994                    None
995                };
996
997                let mut values = Vec::<u8>::with_capacity(size * array.len());
998                array.values().iter().for_each(|x| {
999                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
1000                    values.extend_from_slice(bytes)
1001                });
1002                let array = FixedSizeBinaryArray::new(
1003                    ArrowDataType::FixedSizeBinary(size),
1004                    values.into(),
1005                    array.validity().cloned(),
1006                );
1007                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1008            } else {
1009                let size = 32;
1010                let array = array
1011                    .as_any()
1012                    .downcast_ref::<PrimitiveArray<i256>>()
1013                    .unwrap();
1014                let statistics = if options.has_statistics() {
1015                    let stats = fixed_size_binary::build_statistics_decimal256(
1016                        array,
1017                        type_.clone(),
1018                        size,
1019                        &options.statistics,
1020                    );
1021                    Some(stats)
1022                } else {
1023                    None
1024                };
1025                let mut values = Vec::<u8>::with_capacity(size * array.len());
1026                array.values().iter().for_each(|x| {
1027                    let bytes = &x.to_be_bytes();
1028                    values.extend_from_slice(bytes)
1029                });
1030                let array = FixedSizeBinaryArray::new(
1031                    ArrowDataType::FixedSizeBinary(size),
1032                    values.into(),
1033                    array.validity().cloned(),
1034                );
1035
1036                fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1037            }
1038        },
1039        other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
1040    }
1041    .map(Page::Data)
1042}
1043
1044fn transverse_recursive<T, F: Fn(&ArrowDataType) -> T + Clone>(
1045    dtype: &ArrowDataType,
1046    map: F,
1047    encodings: &mut Vec<T>,
1048) {
1049    use arrow::datatypes::PhysicalType::*;
1050    match dtype.to_physical_type() {
1051        Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
1052        | Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => encodings.push(map(dtype)),
1053        List | FixedSizeList | LargeList => {
1054            let a = dtype.to_logical_type();
1055            if let ArrowDataType::List(inner) = a {
1056                transverse_recursive(&inner.dtype, map, encodings)
1057            } else if let ArrowDataType::LargeList(inner) = a {
1058                transverse_recursive(&inner.dtype, map, encodings)
1059            } else if let ArrowDataType::FixedSizeList(inner, _) = a {
1060                transverse_recursive(&inner.dtype, map, encodings)
1061            } else {
1062                unreachable!()
1063            }
1064        },
1065        Struct => {
1066            if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {
1067                for field in fields {
1068                    transverse_recursive(&field.dtype, map.clone(), encodings)
1069                }
1070            } else {
1071                unreachable!()
1072            }
1073        },
1074        Map => {
1075            if let ArrowDataType::Map(field, _) = dtype.to_logical_type() {
1076                if let ArrowDataType::Struct(fields) = field.dtype.to_logical_type() {
1077                    for field in fields {
1078                        transverse_recursive(&field.dtype, map.clone(), encodings)
1079                    }
1080                } else {
1081                    unreachable!()
1082                }
1083            } else {
1084                unreachable!()
1085            }
1086        },
1087        Union => todo!(),
1088    }
1089}
1090
1091/// Transverses the `dtype` up to its (parquet) columns and returns a vector of
1092/// items based on `map`.
1093///
1094/// This is used to assign an [`Encoding`] to every parquet column based on the columns' type (see example)
1095pub fn transverse<T, F: Fn(&ArrowDataType) -> T + Clone>(dtype: &ArrowDataType, map: F) -> Vec<T> {
1096    let mut encodings = vec![];
1097    transverse_recursive(dtype, map, &mut encodings);
1098    encodings
1099}