1mod binary;
16mod binview;
17mod boolean;
18mod dictionary;
19mod file;
20mod fixed_size_binary;
21mod nested;
22mod pages;
23mod primitive;
24mod row_group;
25mod schema;
26mod utils;
27
28use arrow::array::*;
29use arrow::datatypes::*;
30use arrow::types::{NativeType, days_ms, i256};
31pub use nested::{num_values, write_rep_and_def};
32pub use pages::{to_leaves, to_nested, to_parquet_leaves};
33use polars_utils::pl_str::PlSmallStr;
34pub use utils::write_def_levels;
35
36pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
37pub use crate::parquet::encoding::Encoding;
38pub use crate::parquet::metadata::{
39 Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
40};
41pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
42use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
43pub use crate::parquet::schema::types::{
44 FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
45};
46pub use crate::parquet::write::{
47 Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
48 write_metadata_sidecar,
49};
50pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
51
/// Flags selecting which column statistics are produced when writing.
///
/// Each field independently enables one kind of statistic.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct StatisticsOptions {
    /// Whether the minimum value is recorded.
    pub min_value: bool,
    /// Whether the maximum value is recorded.
    pub max_value: bool,
    /// Whether the number of distinct values is recorded.
    pub distinct_count: bool,
    /// Whether the number of null values is recorded.
    pub null_count: bool,
}
61
62impl Default for StatisticsOptions {
63 fn default() -> Self {
64 Self {
65 min_value: true,
66 max_value: true,
67 distinct_count: false,
68 null_count: true,
69 }
70 }
71}
72
/// Two-state marker telling an encoder whether a column is required or optional.
// NOTE(review): presumably `Optional` means definition levels / validity are encoded — confirm
// against the encoders that consume this type.
#[derive(Clone, Copy)]
pub enum EncodeNullability {
    /// The values are treated as required (non-optional).
    Required,
    /// The values are treated as optional.
    Optional,
}
79
/// Options controlling how arrays are written to parquet pages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WriteOptions {
    /// Which statistics to write.
    pub statistics: StatisticsOptions,
    /// The page and file version to use.
    pub version: Version,
    /// The compression to apply.
    pub compression: CompressionOptions,
    /// Target size in bytes of a data page; `array_to_pages` falls back to `1024 * 1024` when
    /// this is `None`.
    pub data_page_size: Option<usize>,
}
92
/// Per-column write options, forming a tree that follows the column's nesting
/// (see [`ChildWriteOptions`]).
#[derive(Clone)]
pub struct ColumnWriteOptions {
    /// Optional numeric field id for this column.
    // NOTE(review): presumably the parquet schema `field_id` — confirm in `to_parquet_type`.
    pub field_id: Option<i32>,
    /// Key-value metadata attached to this column.
    pub metadata: Vec<KeyValue>,
    /// Options of the leaf or of the nested children.
    pub children: ChildWriteOptions,
}
99
/// The shape-specific part of [`ColumnWriteOptions`].
#[derive(Clone)]
pub enum ChildWriteOptions {
    /// A leaf column carrying its own [`FieldWriteOptions`].
    Leaf(FieldWriteOptions),
    /// A list-like column with a single child column.
    ListLike(Box<ListLikeFieldWriteOptions>),
    /// A struct column with one child column per field.
    Struct(Box<StructFieldWriteOptions>),
}
106
107impl ColumnWriteOptions {
108 pub fn to_leaves<'a>(&'a self, out: &mut Vec<&'a FieldWriteOptions>) {
109 match &self.children {
110 ChildWriteOptions::Leaf(o) => out.push(o),
111 ChildWriteOptions::ListLike(o) => o.child.to_leaves(out),
112 ChildWriteOptions::Struct(o) => {
113 for o in &o.children {
114 o.to_leaves(out);
115 }
116 },
117 }
118 }
119}
120
/// Write options of a single leaf column.
#[derive(Clone)]
pub struct FieldWriteOptions {
    /// The encoding requested for the leaf's values.
    pub encoding: Encoding,
}
125
126impl ColumnWriteOptions {
127 pub fn default_with(children: ChildWriteOptions) -> Self {
128 Self {
129 field_id: None,
130 metadata: Vec::new(),
131 children,
132 }
133 }
134}
135
136impl FieldWriteOptions {
137 pub fn default_with_encoding(encoding: Encoding) -> Self {
138 Self { encoding }
139 }
140
141 pub fn into_default_column_write_options(self) -> ColumnWriteOptions {
142 ColumnWriteOptions::default_with(ChildWriteOptions::Leaf(self))
143 }
144}
145
/// Write options of a list-like column.
#[derive(Clone)]
pub struct ListLikeFieldWriteOptions {
    /// Options of the list's single child column.
    pub child: ColumnWriteOptions,
}
150
/// Write options of a struct column.
#[derive(Clone)]
pub struct StructFieldWriteOptions {
    /// Options of the struct's child columns, one entry per child.
    pub children: Vec<ColumnWriteOptions>,
}
155
156use arrow::compute::aggregate::estimated_bytes_size;
157use arrow::match_integer_type;
158pub use file::FileWriter;
159pub use pages::{Nested, array_to_columns, arrays_to_columns};
160use polars_error::{PolarsResult, polars_bail};
161pub use row_group::{RowGroupIterator, row_group_iter};
162pub use schema::{schema_to_metadata_key, to_parquet_type};
163
164use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
165use crate::write::dictionary::encode_as_dictionary_optional;
166
167impl StatisticsOptions {
168 pub fn empty() -> Self {
169 Self {
170 min_value: false,
171 max_value: false,
172 distinct_count: false,
173 null_count: false,
174 }
175 }
176
177 pub fn full() -> Self {
178 Self {
179 min_value: true,
180 max_value: true,
181 distinct_count: true,
182 null_count: true,
183 }
184 }
185
186 pub fn is_empty(&self) -> bool {
187 !(self.min_value || self.max_value || self.distinct_count || self.null_count)
188 }
189
190 pub fn is_full(&self) -> bool {
191 self.min_value && self.max_value && self.distinct_count && self.null_count
192 }
193}
194
195impl WriteOptions {
196 pub fn has_statistics(&self) -> bool {
197 !self.statistics.is_empty()
198 }
199}
200
201impl EncodeNullability {
202 const fn new(is_optional: bool) -> Self {
203 if is_optional {
204 Self::Optional
205 } else {
206 Self::Required
207 }
208 }
209
210 fn is_optional(self) -> bool {
211 matches!(self, Self::Optional)
212 }
213}
214
215pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
217 let mut out = (0, 0);
220 for nested in nested.iter().rev() {
221 match nested {
222 Nested::LargeList(l_nested) => {
223 let start = *l_nested.offsets.first();
224 let end = *l_nested.offsets.last();
225 return (start as usize, (end - start) as usize);
226 },
227 Nested::List(l_nested) => {
228 let start = *l_nested.offsets.first();
229 let end = *l_nested.offsets.last();
230 return (start as usize, (end - start) as usize);
231 },
232 Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
233 Nested::Primitive(nested) => out = (0, nested.length),
234 Nested::Struct(_) => {},
235 }
236 }
237 out
238}
239
/// Returns the number of bytes used to store a decimal with `precision` digits.
///
/// Computed as `ceil((log2(10^precision + 1) + 1) / 8)`: the bits needed for the magnitude
/// `10^precision`, plus one more bit, rounded up to whole bytes.
fn decimal_length_from_precision(precision: usize) -> usize {
    let magnitude = 10.0_f64.powi(precision as i32);
    let bits = (magnitude + 1.0).log2() + 1.0;
    (bits / 8.0).ceil() as usize
}
250
251pub fn to_parquet_schema(
253 schema: &ArrowSchema,
254 column_options: &[ColumnWriteOptions],
255) -> PolarsResult<SchemaDescriptor> {
256 let parquet_types = schema
257 .iter_values()
258 .zip(column_options)
259 .map(|(field, options)| to_parquet_type(field, options))
260 .collect::<PolarsResult<Vec<_>>>()?;
261 Ok(SchemaDescriptor::new(
262 PlSmallStr::from_static("root"),
263 parquet_types,
264 ))
265}
266
267pub fn slice_parquet_array(
269 primitive_array: &mut dyn Array,
270 nested: &mut [Nested],
271 mut current_offset: usize,
272 mut current_length: usize,
273) {
274 for nested in nested.iter_mut() {
275 match nested {
276 Nested::LargeList(l_nested) => {
277 l_nested.offsets.slice(current_offset, current_length + 1);
278 if let Some(validity) = l_nested.validity.as_mut() {
279 validity.slice(current_offset, current_length)
280 };
281
282 current_length = l_nested.offsets.range() as usize;
284 current_offset = *l_nested.offsets.first() as usize;
285 },
286 Nested::List(l_nested) => {
287 l_nested.offsets.slice(current_offset, current_length + 1);
288 if let Some(validity) = l_nested.validity.as_mut() {
289 validity.slice(current_offset, current_length)
290 };
291
292 current_length = l_nested.offsets.range() as usize;
294 current_offset = *l_nested.offsets.first() as usize;
295 },
296 Nested::Struct(StructNested {
297 validity, length, ..
298 }) => {
299 *length = current_length;
300 if let Some(validity) = validity.as_mut() {
301 validity.slice(current_offset, current_length)
302 };
303 },
304 Nested::Primitive(PrimitiveNested {
305 validity, length, ..
306 }) => {
307 *length = current_length;
308 if let Some(validity) = validity.as_mut() {
309 validity.slice(current_offset, current_length)
310 };
311 primitive_array.slice(current_offset, current_length);
312 },
313 Nested::FixedSizeList(FixedSizeListNested {
314 validity,
315 length,
316 width,
317 ..
318 }) => {
319 if let Some(validity) = validity.as_mut() {
320 validity.slice(current_offset, current_length)
321 };
322 *length = current_length;
323 current_length *= *width;
325 current_offset *= *width;
326 },
327 }
328 }
329}
330
331pub fn get_max_length(nested: &[Nested]) -> usize {
333 let mut length = 0;
334 for nested in nested.iter() {
335 match nested {
336 Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
337 Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
338 Nested::FixedSizeList(nested) => length += nested.length * nested.width,
339 _ => {},
340 }
341 }
342 length
343}
344
345pub fn array_to_pages(
347 primitive_array: &dyn Array,
348 type_: ParquetPrimitiveType,
349 nested: &[Nested],
350 options: WriteOptions,
351 field_options: &FieldWriteOptions,
352) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
353 let mut encoding = field_options.encoding;
354 if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_logical_type() {
355 return match_integer_type!(key_type, |$T| {
356 dictionary::array_to_pages::<$T>(
357 primitive_array.as_any().downcast_ref().unwrap(),
358 type_,
359 &nested,
360 options,
361 encoding,
362 )
363 });
364 };
365 if let Encoding::RleDictionary = encoding {
366 if matches!(nested.first(), Some(Nested::Primitive(_))) {
368 if let Some(result) =
369 encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
370 {
371 return result;
372 }
373 }
374
375 encoding = Encoding::Plain;
377 }
378
379 let nested = nested.to_vec();
380
381 let number_of_rows = nested[0].len();
382
383 let byte_size = estimated_bytes_size(primitive_array);
386
387 const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
388 let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
389 let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); let bytes_per_row = if number_of_rows == 0 {
391 0
392 } else {
393 ((byte_size as f64) / (number_of_rows as f64)) as usize
394 };
395 let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
396
397 let row_iter = (0..number_of_rows)
398 .step_by(rows_per_page)
399 .map(move |offset| {
400 let length = if offset + rows_per_page > number_of_rows {
401 number_of_rows - offset
402 } else {
403 rows_per_page
404 };
405 (offset, length)
406 });
407
408 let primitive_array = primitive_array.to_boxed();
409
410 let pages = row_iter.map(move |(offset, length)| {
411 let mut right_array = primitive_array.clone();
412 let mut right_nested = nested.clone();
413 slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
414
415 array_to_page(
416 right_array.as_ref(),
417 type_.clone(),
418 &right_nested,
419 options,
420 encoding,
421 )
422 });
423 Ok(DynIter::new(pages))
424}
425
426pub fn array_to_page(
428 array: &dyn Array,
429 type_: ParquetPrimitiveType,
430 nested: &[Nested],
431 options: WriteOptions,
432 encoding: Encoding,
433) -> PolarsResult<Page> {
434 if nested.len() == 1 {
435 return array_to_page_simple(array, type_, options, encoding);
437 }
438 array_to_page_nested(array, type_, nested, options, encoding)
439}
440
441pub fn array_to_page_simple(
443 array: &dyn Array,
444 type_: ParquetPrimitiveType,
445 options: WriteOptions,
446 encoding: Encoding,
447) -> PolarsResult<Page> {
448 let dtype = array.dtype();
449
450 match dtype.to_logical_type() {
451 ArrowDataType::Boolean => boolean::array_to_page(
452 array.as_any().downcast_ref().unwrap(),
453 options,
454 type_,
455 encoding,
456 ),
457 ArrowDataType::UInt8 => {
459 return primitive::array_to_page_integer::<u8, i32>(
460 array.as_any().downcast_ref().unwrap(),
461 options,
462 type_,
463 encoding,
464 );
465 },
466 ArrowDataType::UInt16 => {
467 return primitive::array_to_page_integer::<u16, i32>(
468 array.as_any().downcast_ref().unwrap(),
469 options,
470 type_,
471 encoding,
472 );
473 },
474 ArrowDataType::UInt32 => {
475 return primitive::array_to_page_integer::<u32, i32>(
476 array.as_any().downcast_ref().unwrap(),
477 options,
478 type_,
479 encoding,
480 );
481 },
482 ArrowDataType::UInt64 => {
483 return primitive::array_to_page_integer::<u64, i64>(
484 array.as_any().downcast_ref().unwrap(),
485 options,
486 type_,
487 encoding,
488 );
489 },
490 ArrowDataType::Int8 => {
491 return primitive::array_to_page_integer::<i8, i32>(
492 array.as_any().downcast_ref().unwrap(),
493 options,
494 type_,
495 encoding,
496 );
497 },
498 ArrowDataType::Int16 => {
499 return primitive::array_to_page_integer::<i16, i32>(
500 array.as_any().downcast_ref().unwrap(),
501 options,
502 type_,
503 encoding,
504 );
505 },
506 ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
507 return primitive::array_to_page_integer::<i32, i32>(
508 array.as_any().downcast_ref().unwrap(),
509 options,
510 type_,
511 encoding,
512 );
513 },
514 ArrowDataType::Int64
515 | ArrowDataType::Date64
516 | ArrowDataType::Time64(_)
517 | ArrowDataType::Timestamp(_, _)
518 | ArrowDataType::Duration(_) => {
519 return primitive::array_to_page_integer::<i64, i64>(
520 array.as_any().downcast_ref().unwrap(),
521 options,
522 type_,
523 encoding,
524 );
525 },
526 ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
527 array.as_any().downcast_ref().unwrap(),
528 options,
529 type_,
530 ),
531 ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
532 array.as_any().downcast_ref().unwrap(),
533 options,
534 type_,
535 ),
536 ArrowDataType::LargeUtf8 => {
537 let array =
538 polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
539 .unwrap();
540 return binary::array_to_page::<i64>(
541 array.as_any().downcast_ref().unwrap(),
542 options,
543 type_,
544 encoding,
545 );
546 },
547 ArrowDataType::LargeBinary => {
548 return binary::array_to_page::<i64>(
549 array.as_any().downcast_ref().unwrap(),
550 options,
551 type_,
552 encoding,
553 );
554 },
555 ArrowDataType::BinaryView => {
556 return binview::array_to_page(
557 array.as_any().downcast_ref().unwrap(),
558 options,
559 type_,
560 encoding,
561 );
562 },
563 ArrowDataType::Utf8View => {
564 let array =
565 polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
566 .unwrap();
567 return binview::array_to_page(
568 array.as_any().downcast_ref().unwrap(),
569 options,
570 type_,
571 encoding,
572 );
573 },
574 ArrowDataType::Null => {
575 let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
576 primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
577 },
578 ArrowDataType::Interval(IntervalUnit::YearMonth) => {
579 let array = array
580 .as_any()
581 .downcast_ref::<PrimitiveArray<i32>>()
582 .unwrap();
583 let mut values = Vec::<u8>::with_capacity(12 * array.len());
584 array.values().iter().for_each(|x| {
585 let bytes = &x.to_le_bytes();
586 values.extend_from_slice(bytes);
587 values.extend_from_slice(&[0; 8]);
588 });
589 let array = FixedSizeBinaryArray::new(
590 ArrowDataType::FixedSizeBinary(12),
591 values.into(),
592 array.validity().cloned(),
593 );
594 let statistics = if options.has_statistics() {
595 Some(fixed_size_binary::build_statistics(
596 &array,
597 type_.clone(),
598 &options.statistics,
599 ))
600 } else {
601 None
602 };
603 fixed_size_binary::array_to_page(&array, options, type_, statistics)
604 },
605 ArrowDataType::Interval(IntervalUnit::DayTime) => {
606 let array = array
607 .as_any()
608 .downcast_ref::<PrimitiveArray<days_ms>>()
609 .unwrap();
610 let mut values = Vec::<u8>::with_capacity(12 * array.len());
611 array.values().iter().for_each(|x| {
612 let bytes = &x.to_le_bytes();
613 values.extend_from_slice(&[0; 4]); values.extend_from_slice(bytes); });
616 let array = FixedSizeBinaryArray::new(
617 ArrowDataType::FixedSizeBinary(12),
618 values.into(),
619 array.validity().cloned(),
620 );
621 let statistics = if options.has_statistics() {
622 Some(fixed_size_binary::build_statistics(
623 &array,
624 type_.clone(),
625 &options.statistics,
626 ))
627 } else {
628 None
629 };
630 fixed_size_binary::array_to_page(&array, options, type_, statistics)
631 },
632 ArrowDataType::FixedSizeBinary(_) => {
633 let array = array.as_any().downcast_ref().unwrap();
634 let statistics = if options.has_statistics() {
635 Some(fixed_size_binary::build_statistics(
636 array,
637 type_.clone(),
638 &options.statistics,
639 ))
640 } else {
641 None
642 };
643
644 fixed_size_binary::array_to_page(array, options, type_, statistics)
645 },
646 ArrowDataType::Decimal256(precision, _) => {
647 let precision = *precision;
648 let array = array
649 .as_any()
650 .downcast_ref::<PrimitiveArray<i256>>()
651 .unwrap();
652 if precision <= 9 {
653 let values = array
654 .values()
655 .iter()
656 .map(|x| x.0.as_i32())
657 .collect::<Vec<_>>()
658 .into();
659
660 let array = PrimitiveArray::<i32>::new(
661 ArrowDataType::Int32,
662 values,
663 array.validity().cloned(),
664 );
665 return primitive::array_to_page_integer::<i32, i32>(
666 &array, options, type_, encoding,
667 );
668 } else if precision <= 18 {
669 let values = array
670 .values()
671 .iter()
672 .map(|x| x.0.as_i64())
673 .collect::<Vec<_>>()
674 .into();
675
676 let array = PrimitiveArray::<i64>::new(
677 ArrowDataType::Int64,
678 values,
679 array.validity().cloned(),
680 );
681 return primitive::array_to_page_integer::<i64, i64>(
682 &array, options, type_, encoding,
683 );
684 } else if precision <= 38 {
685 let size = decimal_length_from_precision(precision);
686 let statistics = if options.has_statistics() {
687 let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
688 array,
689 type_.clone(),
690 size,
691 &options.statistics,
692 );
693 Some(stats)
694 } else {
695 None
696 };
697
698 let mut values = Vec::<u8>::with_capacity(size * array.len());
699 array.values().iter().for_each(|x| {
700 let bytes = &x.0.low().to_be_bytes()[16 - size..];
701 values.extend_from_slice(bytes)
702 });
703 let array = FixedSizeBinaryArray::new(
704 ArrowDataType::FixedSizeBinary(size),
705 values.into(),
706 array.validity().cloned(),
707 );
708 fixed_size_binary::array_to_page(&array, options, type_, statistics)
709 } else {
710 let size = 32;
711 let array = array
712 .as_any()
713 .downcast_ref::<PrimitiveArray<i256>>()
714 .unwrap();
715 let statistics = if options.has_statistics() {
716 let stats = fixed_size_binary::build_statistics_decimal256(
717 array,
718 type_.clone(),
719 size,
720 &options.statistics,
721 );
722 Some(stats)
723 } else {
724 None
725 };
726 let mut values = Vec::<u8>::with_capacity(size * array.len());
727 array.values().iter().for_each(|x| {
728 let bytes = &x.to_be_bytes();
729 values.extend_from_slice(bytes)
730 });
731 let array = FixedSizeBinaryArray::new(
732 ArrowDataType::FixedSizeBinary(size),
733 values.into(),
734 array.validity().cloned(),
735 );
736
737 fixed_size_binary::array_to_page(&array, options, type_, statistics)
738 }
739 },
740 ArrowDataType::Decimal(precision, _) => {
741 let precision = *precision;
742 let array = array
743 .as_any()
744 .downcast_ref::<PrimitiveArray<i128>>()
745 .unwrap();
746 if precision <= 9 {
747 let values = array
748 .values()
749 .iter()
750 .map(|x| *x as i32)
751 .collect::<Vec<_>>()
752 .into();
753
754 let array = PrimitiveArray::<i32>::new(
755 ArrowDataType::Int32,
756 values,
757 array.validity().cloned(),
758 );
759 return primitive::array_to_page_integer::<i32, i32>(
760 &array, options, type_, encoding,
761 );
762 } else if precision <= 18 {
763 let values = array
764 .values()
765 .iter()
766 .map(|x| *x as i64)
767 .collect::<Vec<_>>()
768 .into();
769
770 let array = PrimitiveArray::<i64>::new(
771 ArrowDataType::Int64,
772 values,
773 array.validity().cloned(),
774 );
775 return primitive::array_to_page_integer::<i64, i64>(
776 &array, options, type_, encoding,
777 );
778 } else {
779 let size = decimal_length_from_precision(precision);
780
781 let statistics = if options.has_statistics() {
782 let stats = fixed_size_binary::build_statistics_decimal(
783 array,
784 type_.clone(),
785 size,
786 &options.statistics,
787 );
788 Some(stats)
789 } else {
790 None
791 };
792
793 let mut values = Vec::<u8>::with_capacity(size * array.len());
794 array.values().iter().for_each(|x| {
795 let bytes = &x.to_be_bytes()[16 - size..];
796 values.extend_from_slice(bytes)
797 });
798 let array = FixedSizeBinaryArray::new(
799 ArrowDataType::FixedSizeBinary(size),
800 values.into(),
801 array.validity().cloned(),
802 );
803 fixed_size_binary::array_to_page(&array, options, type_, statistics)
804 }
805 },
806 other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
807 }
808 .map(Page::Data)
809}
810
811fn array_to_page_nested(
812 array: &dyn Array,
813 type_: ParquetPrimitiveType,
814 nested: &[Nested],
815 options: WriteOptions,
816 _encoding: Encoding,
817) -> PolarsResult<Page> {
818 use ArrowDataType::*;
819 match array.dtype().to_logical_type() {
820 Null => {
821 let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
822 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
823 },
824 Boolean => {
825 let array = array.as_any().downcast_ref().unwrap();
826 boolean::nested_array_to_page(array, options, type_, nested)
827 },
828 LargeUtf8 => {
829 let array =
830 polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
831 let array = array.as_any().downcast_ref().unwrap();
832 binary::nested_array_to_page::<i64>(array, options, type_, nested)
833 },
834 LargeBinary => {
835 let array = array.as_any().downcast_ref().unwrap();
836 binary::nested_array_to_page::<i64>(array, options, type_, nested)
837 },
838 BinaryView => {
839 let array = array.as_any().downcast_ref().unwrap();
840 binview::nested_array_to_page(array, options, type_, nested)
841 },
842 Utf8View => {
843 let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
844 let array = array.as_any().downcast_ref().unwrap();
845 binview::nested_array_to_page(array, options, type_, nested)
846 },
847 UInt8 => {
848 let array = array.as_any().downcast_ref().unwrap();
849 primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
850 },
851 UInt16 => {
852 let array = array.as_any().downcast_ref().unwrap();
853 primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
854 },
855 UInt32 => {
856 let array = array.as_any().downcast_ref().unwrap();
857 primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
858 },
859 UInt64 => {
860 let array = array.as_any().downcast_ref().unwrap();
861 primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
862 },
863 Int8 => {
864 let array = array.as_any().downcast_ref().unwrap();
865 primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
866 },
867 Int16 => {
868 let array = array.as_any().downcast_ref().unwrap();
869 primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
870 },
871 Int32 | Date32 | Time32(_) => {
872 let array = array.as_any().downcast_ref().unwrap();
873 primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
874 },
875 Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
876 let array = array.as_any().downcast_ref().unwrap();
877 primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
878 },
879 Float32 => {
880 let array = array.as_any().downcast_ref().unwrap();
881 primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
882 },
883 Float64 => {
884 let array = array.as_any().downcast_ref().unwrap();
885 primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
886 },
887 Decimal(precision, _) => {
888 let precision = *precision;
889 let array = array
890 .as_any()
891 .downcast_ref::<PrimitiveArray<i128>>()
892 .unwrap();
893 if precision <= 9 {
894 let values = array
895 .values()
896 .iter()
897 .map(|x| *x as i32)
898 .collect::<Vec<_>>()
899 .into();
900
901 let array = PrimitiveArray::<i32>::new(
902 ArrowDataType::Int32,
903 values,
904 array.validity().cloned(),
905 );
906 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
907 } else if precision <= 18 {
908 let values = array
909 .values()
910 .iter()
911 .map(|x| *x as i64)
912 .collect::<Vec<_>>()
913 .into();
914
915 let array = PrimitiveArray::<i64>::new(
916 ArrowDataType::Int64,
917 values,
918 array.validity().cloned(),
919 );
920 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
921 } else {
922 let size = decimal_length_from_precision(precision);
923
924 let statistics = if options.has_statistics() {
925 let stats = fixed_size_binary::build_statistics_decimal(
926 array,
927 type_.clone(),
928 size,
929 &options.statistics,
930 );
931 Some(stats)
932 } else {
933 None
934 };
935
936 let mut values = Vec::<u8>::with_capacity(size * array.len());
937 array.values().iter().for_each(|x| {
938 let bytes = &x.to_be_bytes()[16 - size..];
939 values.extend_from_slice(bytes)
940 });
941 let array = FixedSizeBinaryArray::new(
942 ArrowDataType::FixedSizeBinary(size),
943 values.into(),
944 array.validity().cloned(),
945 );
946 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
947 }
948 },
949 Decimal256(precision, _) => {
950 let precision = *precision;
951 let array = array
952 .as_any()
953 .downcast_ref::<PrimitiveArray<i256>>()
954 .unwrap();
955 if precision <= 9 {
956 let values = array
957 .values()
958 .iter()
959 .map(|x| x.0.as_i32())
960 .collect::<Vec<_>>()
961 .into();
962
963 let array = PrimitiveArray::<i32>::new(
964 ArrowDataType::Int32,
965 values,
966 array.validity().cloned(),
967 );
968 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
969 } else if precision <= 18 {
970 let values = array
971 .values()
972 .iter()
973 .map(|x| x.0.as_i64())
974 .collect::<Vec<_>>()
975 .into();
976
977 let array = PrimitiveArray::<i64>::new(
978 ArrowDataType::Int64,
979 values,
980 array.validity().cloned(),
981 );
982 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
983 } else if precision <= 38 {
984 let size = decimal_length_from_precision(precision);
985 let statistics = if options.has_statistics() {
986 let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
987 array,
988 type_.clone(),
989 size,
990 &options.statistics,
991 );
992 Some(stats)
993 } else {
994 None
995 };
996
997 let mut values = Vec::<u8>::with_capacity(size * array.len());
998 array.values().iter().for_each(|x| {
999 let bytes = &x.0.low().to_be_bytes()[16 - size..];
1000 values.extend_from_slice(bytes)
1001 });
1002 let array = FixedSizeBinaryArray::new(
1003 ArrowDataType::FixedSizeBinary(size),
1004 values.into(),
1005 array.validity().cloned(),
1006 );
1007 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1008 } else {
1009 let size = 32;
1010 let array = array
1011 .as_any()
1012 .downcast_ref::<PrimitiveArray<i256>>()
1013 .unwrap();
1014 let statistics = if options.has_statistics() {
1015 let stats = fixed_size_binary::build_statistics_decimal256(
1016 array,
1017 type_.clone(),
1018 size,
1019 &options.statistics,
1020 );
1021 Some(stats)
1022 } else {
1023 None
1024 };
1025 let mut values = Vec::<u8>::with_capacity(size * array.len());
1026 array.values().iter().for_each(|x| {
1027 let bytes = &x.to_be_bytes();
1028 values.extend_from_slice(bytes)
1029 });
1030 let array = FixedSizeBinaryArray::new(
1031 ArrowDataType::FixedSizeBinary(size),
1032 values.into(),
1033 array.validity().cloned(),
1034 );
1035
1036 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1037 }
1038 },
1039 other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
1040 }
1041 .map(Page::Data)
1042}
1043
1044fn transverse_recursive<T, F: Fn(&ArrowDataType) -> T + Clone>(
1045 dtype: &ArrowDataType,
1046 map: F,
1047 encodings: &mut Vec<T>,
1048) {
1049 use arrow::datatypes::PhysicalType::*;
1050 match dtype.to_physical_type() {
1051 Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
1052 | Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => encodings.push(map(dtype)),
1053 List | FixedSizeList | LargeList => {
1054 let a = dtype.to_logical_type();
1055 if let ArrowDataType::List(inner) = a {
1056 transverse_recursive(&inner.dtype, map, encodings)
1057 } else if let ArrowDataType::LargeList(inner) = a {
1058 transverse_recursive(&inner.dtype, map, encodings)
1059 } else if let ArrowDataType::FixedSizeList(inner, _) = a {
1060 transverse_recursive(&inner.dtype, map, encodings)
1061 } else {
1062 unreachable!()
1063 }
1064 },
1065 Struct => {
1066 if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {
1067 for field in fields {
1068 transverse_recursive(&field.dtype, map.clone(), encodings)
1069 }
1070 } else {
1071 unreachable!()
1072 }
1073 },
1074 Map => {
1075 if let ArrowDataType::Map(field, _) = dtype.to_logical_type() {
1076 if let ArrowDataType::Struct(fields) = field.dtype.to_logical_type() {
1077 for field in fields {
1078 transverse_recursive(&field.dtype, map.clone(), encodings)
1079 }
1080 } else {
1081 unreachable!()
1082 }
1083 } else {
1084 unreachable!()
1085 }
1086 },
1087 Union => todo!(),
1088 }
1089}
1090
1091pub fn transverse<T, F: Fn(&ArrowDataType) -> T + Clone>(dtype: &ArrowDataType, map: F) -> Vec<T> {
1096 let mut encodings = vec![];
1097 transverse_recursive(dtype, map, &mut encodings);
1098 encodings
1099}