arrow2/io/parquet/write/
mod.rs

1//! APIs to write to Parquet format.
2//!
3//! # Arrow/Parquet Interoperability
//! As of [parquet-format v2.9](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md)
//! there are Arrow [DataTypes](crate::datatypes::DataType) which have no corresponding parquet
//! logical type (their values are still written using a physical type). These include, but are
//! not limited to:
7//! * `DataType::Timestamp(TimeUnit::Second, _)`
8//! * `DataType::Int64`
9//! * `DataType::Duration`
10//! * `DataType::Date64`
11//! * `DataType::Time32(TimeUnit::Second)`
12//!
13//! The use of these arrow types will result in no logical type being stored within a parquet file.
14
15mod binary;
16mod boolean;
17mod dictionary;
18mod file;
19mod fixed_len_bytes;
20mod nested;
21mod pages;
22mod primitive;
23mod row_group;
24mod schema;
25mod sink;
26mod utf8;
27mod utils;
28
29use crate::array::*;
30use crate::datatypes::*;
31use crate::error::{Error, Result};
32use crate::types::days_ms;
33use crate::types::i256;
34use crate::types::NativeType;
35
36pub use nested::{num_values, write_rep_and_def};
37pub use pages::{to_leaves, to_nested, to_parquet_leaves};
38use parquet2::schema::types::PrimitiveType as ParquetPrimitiveType;
39pub use parquet2::{
40    compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel},
41    encoding::Encoding,
42    fallible_streaming_iterator,
43    metadata::{Descriptor, FileMetaData, KeyValue, SchemaDescriptor, ThriftFileMetaData},
44    page::{CompressedDataPage, CompressedPage, Page},
45    schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType},
46    write::{
47        compress, write_metadata_sidecar, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
48        Version,
49    },
50    FallibleStreamingIterator,
51};
52pub use utils::write_def_levels;
53
54/// Currently supported options to write to parquet
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct WriteOptions {
57    /// Whether to write statistics
58    pub write_statistics: bool,
59    /// The page and file version to use
60    pub version: Version,
61    /// The compression to apply to every page
62    pub compression: CompressionOptions,
63    /// The size to flush a page, defaults to 1024 * 1024 if None
64    pub data_pagesize_limit: Option<usize>,
65}
66
67use crate::compute::aggregate::estimated_bytes_size;
68pub use file::FileWriter;
69pub use row_group::{row_group_iter, RowGroupIterator};
70pub use schema::to_parquet_type;
71pub use sink::FileSink;
72
73pub use pages::array_to_columns;
74pub use pages::Nested;
75
76/// returns offset and length to slice the leaf values
77pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
78    // find the deepest recursive dremel structure as that one determines how many values we must
79    // take
80    let mut out = (0, 0);
81    for nested in nested.iter().rev() {
82        match nested {
83            Nested::LargeList(l_nested) => {
84                let start = *l_nested.offsets.first();
85                let end = *l_nested.offsets.last();
86                return (start as usize, (end - start) as usize);
87            }
88            Nested::List(l_nested) => {
89                let start = *l_nested.offsets.first();
90                let end = *l_nested.offsets.last();
91                return (start as usize, (end - start) as usize);
92            }
93            Nested::Primitive(_, _, len) => out = (0, *len),
94            _ => {}
95        }
96    }
97    out
98}
99
/// Returns the smallest number of bytes `n` whose signed (two's-complement) integer range
/// holds every unscaled decimal value with `precision` digits. This is the width of the
/// parquet `FIXED_LEN_BYTE_ARRAY` used for wide decimals.
pub(self) fn decimal_length_from_precision(precision: usize) -> usize {
    // A signed n-byte integer holds at most 2^(8n - 1) - 1, and we need it to hold
    // 10^precision - 1. Solving 10^precision <= 2^(8n - 1) for n gives
    //     n = ceil((log2(10^precision) + 1) / 8).
    // Using 10^precision + 1 inside the logarithm yields the same ceiling: for precision >= 1
    // no power of two lies in (10^p, 10^p + 1] (10^p + 1 is odd), and for precision == 0 both
    // forms give 1.
    let bits = (10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0;
    (bits / 8.0).ceil() as usize
}
110
111/// Creates a parquet [`SchemaDescriptor`] from a [`Schema`].
112pub fn to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
113    let parquet_types = schema
114        .fields
115        .iter()
116        .map(to_parquet_type)
117        .collect::<Result<Vec<_>>>()?;
118    Ok(SchemaDescriptor::new("root".to_string(), parquet_types))
119}
120
121/// Checks whether the `data_type` can be encoded as `encoding`.
122/// Note that this is whether this implementation supports it, which is a subset of
123/// what the parquet spec allows.
124pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
125    if let (Encoding::DeltaBinaryPacked, DataType::Decimal(p, _)) =
126        (encoding, data_type.to_logical_type())
127    {
128        return *p <= 18;
129    };
130
131    matches!(
132        (encoding, data_type.to_logical_type()),
133        (Encoding::Plain, _)
134            | (
135                Encoding::DeltaLengthByteArray,
136                DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8,
137            )
138            | (Encoding::RleDictionary, DataType::Dictionary(_, _, _))
139            | (Encoding::PlainDictionary, DataType::Dictionary(_, _, _))
140            | (
141                Encoding::DeltaBinaryPacked,
142                DataType::Null
143                    | DataType::UInt8
144                    | DataType::UInt16
145                    | DataType::UInt32
146                    | DataType::UInt64
147                    | DataType::Int8
148                    | DataType::Int16
149                    | DataType::Int32
150                    | DataType::Date32
151                    | DataType::Time32(_)
152                    | DataType::Int64
153                    | DataType::Date64
154                    | DataType::Time64(_)
155                    | DataType::Timestamp(_, _)
156                    | DataType::Duration(_)
157            )
158    )
159}
160
161/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.
162pub fn slice_parquet_array(
163    primitive_array: &mut dyn Array,
164    nested: &mut [Nested],
165    mut current_offset: usize,
166    mut current_length: usize,
167) {
168    for nested in nested.iter_mut() {
169        match nested {
170            Nested::LargeList(l_nested) => {
171                l_nested.offsets.slice(current_offset, current_length + 1);
172                if let Some(validity) = l_nested.validity.as_mut() {
173                    validity.slice(current_offset, current_length)
174                };
175
176                current_length = l_nested.offsets.range() as usize;
177                current_offset = *l_nested.offsets.first() as usize;
178            }
179            Nested::List(l_nested) => {
180                l_nested.offsets.slice(current_offset, current_length + 1);
181                if let Some(validity) = l_nested.validity.as_mut() {
182                    validity.slice(current_offset, current_length)
183                };
184
185                current_length = l_nested.offsets.range() as usize;
186                current_offset = *l_nested.offsets.first() as usize;
187            }
188            Nested::Struct(validity, _, length) => {
189                *length = current_length;
190                if let Some(validity) = validity.as_mut() {
191                    validity.slice(current_offset, current_length)
192                };
193            }
194            Nested::Primitive(validity, _, length) => {
195                *length = current_length;
196                if let Some(validity) = validity.as_mut() {
197                    validity.slice(current_offset, current_length)
198                };
199                primitive_array.slice(current_offset, current_length);
200            }
201        }
202    }
203}
204
205/// Get the length of [`Array`] that should be sliced.
206pub fn get_max_length(nested: &[Nested]) -> usize {
207    let mut length = 0;
208    for nested in nested.iter() {
209        match nested {
210            Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
211            Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
212            _ => {}
213        }
214    }
215    length
216}
217
218/// Returns an iterator of [`Page`].
219pub fn array_to_pages(
220    primitive_array: &dyn Array,
221    type_: ParquetPrimitiveType,
222    nested: &[Nested],
223    options: WriteOptions,
224    encoding: Encoding,
225) -> Result<DynIter<'static, Result<Page>>> {
226    if let DataType::Dictionary(key_type, _, _) = primitive_array.data_type().to_logical_type() {
227        return match_integer_type!(key_type, |$T| {
228            dictionary::array_to_pages::<$T>(
229                primitive_array.as_any().downcast_ref().unwrap(),
230                type_,
231                &nested,
232                options,
233                encoding,
234            )
235        });
236    };
237
238    let nested = nested.to_vec();
239    let primitive_array = primitive_array.to_boxed();
240
241    let number_of_rows = nested[0].len();
242
243    // note: this is not correct if the array is sliced - the estimation should happen on the
244    // primitive after sliced for parquet
245    let byte_size = estimated_bytes_size(primitive_array.as_ref());
246
247    const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
248    let max_page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
249    let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
250    let bytes_per_row = if number_of_rows == 0 {
251        0
252    } else {
253        ((byte_size as f64) / (number_of_rows as f64)) as usize
254    };
255    let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
256
257    let pages = (0..number_of_rows)
258        .step_by(rows_per_page)
259        .map(move |offset| {
260            let length = if offset + rows_per_page > number_of_rows {
261                number_of_rows - offset
262            } else {
263                rows_per_page
264            };
265
266            let mut right_array = primitive_array.clone();
267            let mut right_nested = nested.clone();
268            slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
269
270            array_to_page(
271                right_array.as_ref(),
272                type_.clone(),
273                &right_nested,
274                options,
275                encoding,
276            )
277        });
278
279    Ok(DynIter::new(pages))
280}
281
282/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
283pub fn array_to_page(
284    array: &dyn Array,
285    type_: ParquetPrimitiveType,
286    nested: &[Nested],
287    options: WriteOptions,
288    encoding: Encoding,
289) -> Result<Page> {
290    if nested.len() == 1 {
291        // special case where validity == def levels
292        return array_to_page_simple(array, type_, options, encoding);
293    }
294    array_to_page_nested(array, type_, nested, options, encoding)
295}
296
297/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
298pub fn array_to_page_simple(
299    array: &dyn Array,
300    type_: ParquetPrimitiveType,
301    options: WriteOptions,
302    encoding: Encoding,
303) -> Result<Page> {
304    let data_type = array.data_type();
305    if !can_encode(data_type, encoding) {
306        return Err(Error::InvalidArgumentError(format!(
307            "The datatype {data_type:?} cannot be encoded by {encoding:?}"
308        )));
309    }
310
311    match data_type.to_logical_type() {
312        DataType::Boolean => {
313            boolean::array_to_page(array.as_any().downcast_ref().unwrap(), options, type_)
314        }
315        // casts below MUST match the casts done at the metadata (field -> parquet type).
316        DataType::UInt8 => primitive::array_to_page_integer::<u8, i32>(
317            array.as_any().downcast_ref().unwrap(),
318            options,
319            type_,
320            encoding,
321        ),
322        DataType::UInt16 => primitive::array_to_page_integer::<u16, i32>(
323            array.as_any().downcast_ref().unwrap(),
324            options,
325            type_,
326            encoding,
327        ),
328        DataType::UInt32 => primitive::array_to_page_integer::<u32, i32>(
329            array.as_any().downcast_ref().unwrap(),
330            options,
331            type_,
332            encoding,
333        ),
334        DataType::UInt64 => primitive::array_to_page_integer::<u64, i64>(
335            array.as_any().downcast_ref().unwrap(),
336            options,
337            type_,
338            encoding,
339        ),
340        DataType::Int8 => primitive::array_to_page_integer::<i8, i32>(
341            array.as_any().downcast_ref().unwrap(),
342            options,
343            type_,
344            encoding,
345        ),
346        DataType::Int16 => primitive::array_to_page_integer::<i16, i32>(
347            array.as_any().downcast_ref().unwrap(),
348            options,
349            type_,
350            encoding,
351        ),
352        DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
353            primitive::array_to_page_integer::<i32, i32>(
354                array.as_any().downcast_ref().unwrap(),
355                options,
356                type_,
357                encoding,
358            )
359        }
360        DataType::Int64
361        | DataType::Date64
362        | DataType::Time64(_)
363        | DataType::Timestamp(_, _)
364        | DataType::Duration(_) => primitive::array_to_page_integer::<i64, i64>(
365            array.as_any().downcast_ref().unwrap(),
366            options,
367            type_,
368            encoding,
369        ),
370        DataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
371            array.as_any().downcast_ref().unwrap(),
372            options,
373            type_,
374        ),
375        DataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
376            array.as_any().downcast_ref().unwrap(),
377            options,
378            type_,
379        ),
380        DataType::Utf8 => utf8::array_to_page::<i32>(
381            array.as_any().downcast_ref().unwrap(),
382            options,
383            type_,
384            encoding,
385        ),
386        DataType::LargeUtf8 => utf8::array_to_page::<i64>(
387            array.as_any().downcast_ref().unwrap(),
388            options,
389            type_,
390            encoding,
391        ),
392        DataType::Binary => binary::array_to_page::<i32>(
393            array.as_any().downcast_ref().unwrap(),
394            options,
395            type_,
396            encoding,
397        ),
398        DataType::LargeBinary => binary::array_to_page::<i64>(
399            array.as_any().downcast_ref().unwrap(),
400            options,
401            type_,
402            encoding,
403        ),
404        DataType::Null => {
405            let array = Int32Array::new_null(DataType::Int32, array.len());
406            primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
407        }
408        DataType::Interval(IntervalUnit::YearMonth) => {
409            let type_ = type_;
410            let array = array
411                .as_any()
412                .downcast_ref::<PrimitiveArray<i32>>()
413                .unwrap();
414            let mut values = Vec::<u8>::with_capacity(12 * array.len());
415            array.values().iter().for_each(|x| {
416                let bytes = &x.to_le_bytes();
417                values.extend_from_slice(bytes);
418                values.extend_from_slice(&[0; 8]);
419            });
420            let array = FixedSizeBinaryArray::new(
421                DataType::FixedSizeBinary(12),
422                values.into(),
423                array.validity().cloned(),
424            );
425            let statistics = if options.write_statistics {
426                Some(fixed_len_bytes::build_statistics(&array, type_.clone()))
427            } else {
428                None
429            };
430            fixed_len_bytes::array_to_page(&array, options, type_, statistics)
431        }
432        DataType::Interval(IntervalUnit::DayTime) => {
433            let type_ = type_;
434            let array = array
435                .as_any()
436                .downcast_ref::<PrimitiveArray<days_ms>>()
437                .unwrap();
438            let mut values = Vec::<u8>::with_capacity(12 * array.len());
439            array.values().iter().for_each(|x| {
440                let bytes = &x.to_le_bytes();
441                values.extend_from_slice(&[0; 4]); // months
442                values.extend_from_slice(bytes); // days and seconds
443            });
444            let array = FixedSizeBinaryArray::new(
445                DataType::FixedSizeBinary(12),
446                values.into(),
447                array.validity().cloned(),
448            );
449            let statistics = if options.write_statistics {
450                Some(fixed_len_bytes::build_statistics(&array, type_.clone()))
451            } else {
452                None
453            };
454            fixed_len_bytes::array_to_page(&array, options, type_, statistics)
455        }
456        DataType::FixedSizeBinary(_) => {
457            let type_ = type_;
458            let array = array.as_any().downcast_ref().unwrap();
459            let statistics = if options.write_statistics {
460                Some(fixed_len_bytes::build_statistics(array, type_.clone()))
461            } else {
462                None
463            };
464
465            fixed_len_bytes::array_to_page(array, options, type_, statistics)
466        }
467        DataType::Decimal256(precision, _) => {
468            let type_ = type_;
469            let precision = *precision;
470            let array = array
471                .as_any()
472                .downcast_ref::<PrimitiveArray<i256>>()
473                .unwrap();
474            if precision <= 9 {
475                let values = array
476                    .values()
477                    .iter()
478                    .map(|x| x.0.as_i32())
479                    .collect::<Vec<_>>()
480                    .into();
481
482                let array =
483                    PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
484                primitive::array_to_page_integer::<i32, i32>(&array, options, type_, encoding)
485            } else if precision <= 18 {
486                let values = array
487                    .values()
488                    .iter()
489                    .map(|x| x.0.as_i64())
490                    .collect::<Vec<_>>()
491                    .into();
492
493                let array =
494                    PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
495                primitive::array_to_page_integer::<i64, i64>(&array, options, type_, encoding)
496            } else if precision <= 38 {
497                let size = decimal_length_from_precision(precision);
498                let statistics = if options.write_statistics {
499                    let stats = fixed_len_bytes::build_statistics_decimal256_with_i128(
500                        array,
501                        type_.clone(),
502                        size,
503                    );
504                    Some(stats)
505                } else {
506                    None
507                };
508
509                let mut values = Vec::<u8>::with_capacity(size * array.len());
510                array.values().iter().for_each(|x| {
511                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
512                    values.extend_from_slice(bytes)
513                });
514                let array = FixedSizeBinaryArray::new(
515                    DataType::FixedSizeBinary(size),
516                    values.into(),
517                    array.validity().cloned(),
518                );
519                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
520            } else {
521                let size = 32;
522                let array = array
523                    .as_any()
524                    .downcast_ref::<PrimitiveArray<i256>>()
525                    .unwrap();
526                let statistics = if options.write_statistics {
527                    let stats =
528                        fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size);
529                    Some(stats)
530                } else {
531                    None
532                };
533                let mut values = Vec::<u8>::with_capacity(size * array.len());
534                array.values().iter().for_each(|x| {
535                    let bytes = &x.to_be_bytes();
536                    values.extend_from_slice(bytes)
537                });
538                let array = FixedSizeBinaryArray::new(
539                    DataType::FixedSizeBinary(size),
540                    values.into(),
541                    array.validity().cloned(),
542                );
543
544                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
545            }
546        }
547        DataType::Decimal(precision, _) => {
548            let type_ = type_;
549            let precision = *precision;
550            let array = array
551                .as_any()
552                .downcast_ref::<PrimitiveArray<i128>>()
553                .unwrap();
554            if precision <= 9 {
555                let values = array
556                    .values()
557                    .iter()
558                    .map(|x| *x as i32)
559                    .collect::<Vec<_>>()
560                    .into();
561
562                let array =
563                    PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
564                primitive::array_to_page_integer::<i32, i32>(&array, options, type_, encoding)
565            } else if precision <= 18 {
566                let values = array
567                    .values()
568                    .iter()
569                    .map(|x| *x as i64)
570                    .collect::<Vec<_>>()
571                    .into();
572
573                let array =
574                    PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
575                primitive::array_to_page_integer::<i64, i64>(&array, options, type_, encoding)
576            } else {
577                let size = decimal_length_from_precision(precision);
578
579                let statistics = if options.write_statistics {
580                    let stats =
581                        fixed_len_bytes::build_statistics_decimal(array, type_.clone(), size);
582                    Some(stats)
583                } else {
584                    None
585                };
586
587                let mut values = Vec::<u8>::with_capacity(size * array.len());
588                array.values().iter().for_each(|x| {
589                    let bytes = &x.to_be_bytes()[16 - size..];
590                    values.extend_from_slice(bytes)
591                });
592                let array = FixedSizeBinaryArray::new(
593                    DataType::FixedSizeBinary(size),
594                    values.into(),
595                    array.validity().cloned(),
596                );
597                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
598            }
599        }
600        other => Err(Error::NotYetImplemented(format!(
601            "Writing parquet pages for data type {other:?}"
602        ))),
603    }
604    .map(Page::Data)
605}
606
607fn array_to_page_nested(
608    array: &dyn Array,
609    type_: ParquetPrimitiveType,
610    nested: &[Nested],
611    options: WriteOptions,
612    _encoding: Encoding,
613) -> Result<Page> {
614    use DataType::*;
615    match array.data_type().to_logical_type() {
616        Null => {
617            let array = Int32Array::new_null(DataType::Int32, array.len());
618            primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
619        }
620        Boolean => {
621            let array = array.as_any().downcast_ref().unwrap();
622            boolean::nested_array_to_page(array, options, type_, nested)
623        }
624        Utf8 => {
625            let array = array.as_any().downcast_ref().unwrap();
626            utf8::nested_array_to_page::<i32>(array, options, type_, nested)
627        }
628        LargeUtf8 => {
629            let array = array.as_any().downcast_ref().unwrap();
630            utf8::nested_array_to_page::<i64>(array, options, type_, nested)
631        }
632        Binary => {
633            let array = array.as_any().downcast_ref().unwrap();
634            binary::nested_array_to_page::<i32>(array, options, type_, nested)
635        }
636        LargeBinary => {
637            let array = array.as_any().downcast_ref().unwrap();
638            binary::nested_array_to_page::<i64>(array, options, type_, nested)
639        }
640        UInt8 => {
641            let array = array.as_any().downcast_ref().unwrap();
642            primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
643        }
644        UInt16 => {
645            let array = array.as_any().downcast_ref().unwrap();
646            primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
647        }
648        UInt32 => {
649            let array = array.as_any().downcast_ref().unwrap();
650            primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
651        }
652        UInt64 => {
653            let array = array.as_any().downcast_ref().unwrap();
654            primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
655        }
656        Int8 => {
657            let array = array.as_any().downcast_ref().unwrap();
658            primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
659        }
660        Int16 => {
661            let array = array.as_any().downcast_ref().unwrap();
662            primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
663        }
664        Int32 | Date32 | Time32(_) => {
665            let array = array.as_any().downcast_ref().unwrap();
666            primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
667        }
668        Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
669            let array = array.as_any().downcast_ref().unwrap();
670            primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
671        }
672        Float32 => {
673            let array = array.as_any().downcast_ref().unwrap();
674            primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
675        }
676        Float64 => {
677            let array = array.as_any().downcast_ref().unwrap();
678            primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
679        }
680        Decimal(precision, _) => {
681            let type_ = type_;
682            let precision = *precision;
683            let array = array
684                .as_any()
685                .downcast_ref::<PrimitiveArray<i128>>()
686                .unwrap();
687            if precision <= 9 {
688                let values = array
689                    .values()
690                    .iter()
691                    .map(|x| *x as i32)
692                    .collect::<Vec<_>>()
693                    .into();
694
695                let array =
696                    PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
697                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
698            } else if precision <= 18 {
699                let values = array
700                    .values()
701                    .iter()
702                    .map(|x| *x as i64)
703                    .collect::<Vec<_>>()
704                    .into();
705
706                let array =
707                    PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
708                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
709            } else {
710                let size = decimal_length_from_precision(precision);
711
712                let statistics = if options.write_statistics {
713                    let stats =
714                        fixed_len_bytes::build_statistics_decimal(array, type_.clone(), size);
715                    Some(stats)
716                } else {
717                    None
718                };
719
720                let mut values = Vec::<u8>::with_capacity(size * array.len());
721                array.values().iter().for_each(|x| {
722                    let bytes = &x.to_be_bytes()[16 - size..];
723                    values.extend_from_slice(bytes)
724                });
725                let array = FixedSizeBinaryArray::new(
726                    DataType::FixedSizeBinary(size),
727                    values.into(),
728                    array.validity().cloned(),
729                );
730                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
731            }
732        }
733        Decimal256(precision, _) => {
734            let type_ = type_;
735            let precision = *precision;
736            let array = array
737                .as_any()
738                .downcast_ref::<PrimitiveArray<i256>>()
739                .unwrap();
740            if precision <= 9 {
741                let values = array
742                    .values()
743                    .iter()
744                    .map(|x| x.0.as_i32())
745                    .collect::<Vec<_>>()
746                    .into();
747
748                let array =
749                    PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
750                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
751            } else if precision <= 18 {
752                let values = array
753                    .values()
754                    .iter()
755                    .map(|x| x.0.as_i64())
756                    .collect::<Vec<_>>()
757                    .into();
758
759                let array =
760                    PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
761                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
762            } else if precision <= 38 {
763                let size = decimal_length_from_precision(precision);
764                let statistics = if options.write_statistics {
765                    let stats = fixed_len_bytes::build_statistics_decimal256_with_i128(
766                        array,
767                        type_.clone(),
768                        size,
769                    );
770                    Some(stats)
771                } else {
772                    None
773                };
774
775                let mut values = Vec::<u8>::with_capacity(size * array.len());
776                array.values().iter().for_each(|x| {
777                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
778                    values.extend_from_slice(bytes)
779                });
780                let array = FixedSizeBinaryArray::new(
781                    DataType::FixedSizeBinary(size),
782                    values.into(),
783                    array.validity().cloned(),
784                );
785                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
786            } else {
787                let size = 32;
788                let array = array
789                    .as_any()
790                    .downcast_ref::<PrimitiveArray<i256>>()
791                    .unwrap();
792                let statistics = if options.write_statistics {
793                    let stats =
794                        fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size);
795                    Some(stats)
796                } else {
797                    None
798                };
799                let mut values = Vec::<u8>::with_capacity(size * array.len());
800                array.values().iter().for_each(|x| {
801                    let bytes = &x.to_be_bytes();
802                    values.extend_from_slice(bytes)
803                });
804                let array = FixedSizeBinaryArray::new(
805                    DataType::FixedSizeBinary(size),
806                    values.into(),
807                    array.validity().cloned(),
808                );
809
810                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
811            }
812        }
813        other => Err(Error::NotYetImplemented(format!(
814            "Writing nested parquet pages for data type {other:?}"
815        ))),
816    }
817    .map(Page::Data)
818}
819
820fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(
821    data_type: &DataType,
822    map: F,
823    encodings: &mut Vec<T>,
824) {
825    use crate::datatypes::PhysicalType::*;
826    match data_type.to_physical_type() {
827        Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
828        | Dictionary(_) | LargeUtf8 => encodings.push(map(data_type)),
829        List | FixedSizeList | LargeList => {
830            let a = data_type.to_logical_type();
831            if let DataType::List(inner) = a {
832                transverse_recursive(&inner.data_type, map, encodings)
833            } else if let DataType::LargeList(inner) = a {
834                transverse_recursive(&inner.data_type, map, encodings)
835            } else if let DataType::FixedSizeList(inner, _) = a {
836                transverse_recursive(&inner.data_type, map, encodings)
837            } else {
838                unreachable!()
839            }
840        }
841        Struct => {
842            if let DataType::Struct(fields) = data_type.to_logical_type() {
843                for field in fields {
844                    transverse_recursive(&field.data_type, map.clone(), encodings)
845                }
846            } else {
847                unreachable!()
848            }
849        }
850        Map => {
851            if let DataType::Map(field, _) = data_type.to_logical_type() {
852                if let DataType::Struct(fields) = field.data_type.to_logical_type() {
853                    for field in fields {
854                        transverse_recursive(&field.data_type, map.clone(), encodings)
855                    }
856                } else {
857                    unreachable!()
858                }
859            } else {
860                unreachable!()
861            }
862        }
863        Union => todo!(),
864    }
865}
866
867/// Transverses the `data_type` up to its (parquet) columns and returns a vector of
868/// items based on `map`.
869/// This is used to assign an [`Encoding`] to every parquet column based on the columns' type (see example)
870/// # Example
871/// ```
872/// use arrow2::io::parquet::write::{transverse, Encoding};
873/// use arrow2::datatypes::{DataType, Field};
874///
875/// let dt = DataType::Struct(vec![
876///     Field::new("a", DataType::Int64, true),
877///     Field::new("b", DataType::List(Box::new(Field::new("item", DataType::Int32, true))), true),
878/// ]);
879///
880/// let encodings = transverse(&dt, |dt| Encoding::Plain);
881/// assert_eq!(encodings, vec![Encoding::Plain, Encoding::Plain]);
882/// ```
883pub fn transverse<T, F: Fn(&DataType) -> T + Clone>(data_type: &DataType, map: F) -> Vec<T> {
884    let mut encodings = vec![];
885    transverse_recursive(data_type, map, &mut encodings);
886    encodings
887}