1mod binary;
16mod binview;
17mod boolean;
18mod dictionary;
19mod file;
20mod fixed_size_binary;
21mod nested;
22mod pages;
23mod primitive;
24mod row_group;
25mod schema;
26mod utils;
27
28use arrow::array::*;
29use arrow::datatypes::*;
30use arrow::types::{NativeType, days_ms, i256};
31pub use nested::{num_values, write_rep_and_def};
32pub use pages::{to_leaves, to_nested, to_parquet_leaves};
33use polars_utils::pl_str::PlSmallStr;
34pub use utils::write_def_levels;
35
36pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
37pub use crate::parquet::encoding::Encoding;
38pub use crate::parquet::metadata::{
39 Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
40};
41pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
42use crate::parquet::schema::Repetition;
43use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
44pub use crate::parquet::schema::types::{
45 FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
46};
47pub use crate::parquet::write::{
48 Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
49 write_metadata_sidecar,
50};
51pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
52
/// Selects which column statistics are computed when writing parquet.
///
/// The `Default` implementation enables min/max values and null counts, but not distinct counts.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct StatisticsOptions {
    /// Request minimum-value statistics.
    pub min_value: bool,
    /// Request maximum-value statistics.
    pub max_value: bool,
    /// Request distinct-count statistics.
    pub distinct_count: bool,
    /// Request null-count statistics.
    pub null_count: bool,
}
63
64impl Default for StatisticsOptions {
65 fn default() -> Self {
66 Self {
67 min_value: true,
68 max_value: true,
69 distinct_count: false,
70 null_count: true,
71 }
72 }
73}
74
/// Whether the values of a column are encoded as required or as optional (nullable).
#[derive(Clone, Copy)]
pub enum EncodeNullability {
    /// The column is encoded without nullability.
    Required,
    /// The column is encoded as nullable.
    Optional,
}
81
/// Options that apply to every page written for a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WriteOptions {
    /// Which statistics to compute; see `WriteOptions::has_statistics`.
    pub statistics: StatisticsOptions,
    /// Parquet format version to write with.
    pub version: Version,
    /// Compression settings for written pages.
    pub compression: CompressionOptions,
    /// Target page size in bytes used by `array_to_pages` to split a column into pages.
    /// `None` means 1 MiB; the value is always capped at `2^31 - 2^25` bytes.
    pub data_page_size: Option<usize>,
}
94
/// Per-column write options, mirroring the (possibly nested) structure of an arrow field.
#[derive(Clone)]
pub struct ColumnWriteOptions {
    /// Optional field id for this column (presumably emitted as the parquet `field_id`;
    /// TODO confirm in `schema::to_parquet_type`).
    pub field_id: Option<i32>,
    /// Key-value metadata attached to this column (NOTE(review): where it is written is decided
    /// outside this file).
    pub metadata: Vec<KeyValue>,
    /// Explicit nullability override; `None` presumably means "derive from the arrow field" —
    /// TODO confirm.
    pub required: Option<bool>,
    /// Options for the child columns, or the leaf options for a primitive column.
    pub children: ChildWriteOptions,
}
102
/// Shape-specific part of [`ColumnWriteOptions`].
#[derive(Clone)]
pub enum ChildWriteOptions {
    /// A leaf column with its own encoding options.
    Leaf(FieldWriteOptions),
    /// A list-like column with options for its single child.
    ListLike(Box<ListLikeFieldWriteOptions>),
    /// A struct column with options for each of its fields, in field order.
    Struct(Box<StructFieldWriteOptions>),
}
109
110impl ColumnWriteOptions {
111 pub fn to_leaves<'a>(&'a self, out: &mut Vec<&'a FieldWriteOptions>) {
112 match &self.children {
113 ChildWriteOptions::Leaf(o) => out.push(o),
114 ChildWriteOptions::ListLike(o) => o.child.to_leaves(out),
115 ChildWriteOptions::Struct(o) => {
116 for o in &o.children {
117 o.to_leaves(out);
118 }
119 },
120 }
121 }
122}
123
/// Write options for a single leaf (primitive) parquet column.
#[derive(Clone)]
pub struct FieldWriteOptions {
    /// Requested encoding for the leaf; `array_to_pages` falls back to `Encoding::Plain` when
    /// `Encoding::RleDictionary` cannot be applied.
    pub encoding: Encoding,
}
128
129impl ColumnWriteOptions {
130 pub fn default_with(children: ChildWriteOptions) -> Self {
131 Self {
132 field_id: None,
133 metadata: Vec::new(),
134 required: None,
135 children,
136 }
137 }
138}
139
140impl FieldWriteOptions {
141 pub fn default_with_encoding(encoding: Encoding) -> Self {
142 Self { encoding }
143 }
144
145 pub fn into_default_column_write_options(self) -> ColumnWriteOptions {
146 ColumnWriteOptions::default_with(ChildWriteOptions::Leaf(self))
147 }
148}
149
/// Write options for a list-like column (list, large list or fixed-size list).
#[derive(Clone)]
pub struct ListLikeFieldWriteOptions {
    /// Options for the list's single child column.
    pub child: ColumnWriteOptions,
}
154
/// Write options for a struct column.
#[derive(Clone)]
pub struct StructFieldWriteOptions {
    /// Options for each struct field, in field order.
    pub children: Vec<ColumnWriteOptions>,
}
159
160use arrow::compute::aggregate::estimated_bytes_size;
161use arrow::match_integer_type;
162pub use file::FileWriter;
163pub use pages::{Nested, array_to_columns, arrays_to_columns};
164use polars_error::{PolarsResult, polars_bail};
165pub use row_group::{RowGroupIterator, row_group_iter};
166pub use schema::{schema_to_metadata_key, to_parquet_type};
167
168use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
169use crate::write::dictionary::encode_as_dictionary_optional;
170
171impl StatisticsOptions {
172 pub fn empty() -> Self {
173 Self {
174 min_value: false,
175 max_value: false,
176 distinct_count: false,
177 null_count: false,
178 }
179 }
180
181 pub fn full() -> Self {
182 Self {
183 min_value: true,
184 max_value: true,
185 distinct_count: true,
186 null_count: true,
187 }
188 }
189
190 pub fn is_empty(&self) -> bool {
191 !(self.min_value || self.max_value || self.distinct_count || self.null_count)
192 }
193
194 pub fn is_full(&self) -> bool {
195 self.min_value && self.max_value && self.distinct_count && self.null_count
196 }
197}
198
199impl WriteOptions {
200 pub fn has_statistics(&self) -> bool {
201 !self.statistics.is_empty()
202 }
203}
204
205impl EncodeNullability {
206 const fn new(is_optional: bool) -> Self {
207 if is_optional {
208 Self::Optional
209 } else {
210 Self::Required
211 }
212 }
213
214 fn is_optional(self) -> bool {
215 matches!(self, Self::Optional)
216 }
217}
218
219pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
221 let mut out = (0, 0);
224 for nested in nested.iter().rev() {
225 match nested {
226 Nested::LargeList(l_nested) => {
227 let start = *l_nested.offsets.first();
228 let end = *l_nested.offsets.last();
229 return (start as usize, (end - start) as usize);
230 },
231 Nested::List(l_nested) => {
232 let start = *l_nested.offsets.first();
233 let end = *l_nested.offsets.last();
234 return (start as usize, (end - start) as usize);
235 },
236 Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
237 Nested::Primitive(nested) => out = (0, nested.length),
238 Nested::Struct(_) => {},
239 }
240 }
241 out
242}
243
/// Minimal number of bytes of a two's-complement integer able to hold `precision` decimal digits.
///
/// An `n`-byte signed integer holds values up to `2^(8n - 1) - 1`, so we need the smallest `n`
/// with `2^(8n - 1) >= 10^precision`, i.e. `n = ceil((log2(10^precision + 1) + 1) / 8)`.
fn decimal_length_from_precision(precision: usize) -> usize {
    let largest_magnitude = 10.0_f64.powi(precision as i32) + 1.0;
    let bits_with_sign = largest_magnitude.log2() + 1.0;
    (bits_with_sign / 8.0).ceil() as usize
}
254
255pub fn to_parquet_schema(
257 schema: &ArrowSchema,
258 column_options: &[ColumnWriteOptions],
259) -> PolarsResult<SchemaDescriptor> {
260 let parquet_types = schema
261 .iter_values()
262 .zip(column_options)
263 .map(|(field, options)| to_parquet_type(field, options))
264 .collect::<PolarsResult<Vec<_>>>()?;
265 Ok(SchemaDescriptor::new(
266 PlSmallStr::from_static("root"),
267 parquet_types,
268 ))
269}
270
/// Slices a leaf column and its nesting information in place, keeping `current_length` rows
/// starting at `current_offset` of the first level in `nested`.
///
/// Levels are processed in order (`nested[0]` first, presumably the outermost level).
/// - Each offsets-based list level slices its offsets and validity, then replaces the row range
///   with the range of child values its sliced offsets span, so the next level is sliced
///   consistently.
/// - Fixed-size list levels scale the range by their `width`.
/// - Struct levels only update their length and validity.
/// - The primitive level also slices `primitive_array` itself.
pub fn slice_parquet_array(
    primitive_array: &mut dyn Array,
    nested: &mut [Nested],
    mut current_offset: usize,
    mut current_length: usize,
) {
    for nested in nested.iter_mut() {
        match nested {
            Nested::LargeList(l_nested) => {
                // `current_length` rows are delimited by `current_length + 1` offsets.
                l_nested.offsets.slice(current_offset, current_length + 1);
                if let Some(validity) = l_nested.validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };

                // Move to the child value range covered by the sliced offsets so that the next
                // level (and ultimately the primitive array) is sliced to match.
                current_length = l_nested.offsets.range() as usize;
                current_offset = *l_nested.offsets.first() as usize;
            },
            Nested::List(l_nested) => {
                // `current_length` rows are delimited by `current_length + 1` offsets.
                l_nested.offsets.slice(current_offset, current_length + 1);
                if let Some(validity) = l_nested.validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };

                // Move to the child value range covered by the sliced offsets so that the next
                // level (and ultimately the primitive array) is sliced to match.
                current_length = l_nested.offsets.range() as usize;
                current_offset = *l_nested.offsets.first() as usize;
            },
            Nested::Struct(StructNested {
                validity, length, ..
            }) => {
                // Struct levels do not change the row range of their children.
                *length = current_length;
                if let Some(validity) = validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };
            },
            Nested::Primitive(PrimitiveNested {
                validity, length, ..
            }) => {
                *length = current_length;
                if let Some(validity) = validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };
                primitive_array.slice(current_offset, current_length);
            },
            Nested::FixedSizeList(FixedSizeListNested {
                validity,
                length,
                width,
                ..
            }) => {
                if let Some(validity) = validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };
                *length = current_length;
                // Every list holds exactly `width` child values, so the child range is the row
                // range scaled by `width`.
                current_length *= *width;
                current_offset *= *width;
            },
        }
    }
}
334
335pub fn get_max_length(nested: &[Nested]) -> usize {
337 let mut length = 0;
338 for nested in nested.iter() {
339 match nested {
340 Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
341 Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
342 Nested::FixedSizeList(nested) => length += nested.length * nested.width,
343 _ => {},
344 }
345 }
346 length
347}
348
349pub fn array_to_pages(
351 primitive_array: &dyn Array,
352 type_: ParquetPrimitiveType,
353 nested: &[Nested],
354 options: WriteOptions,
355 field_options: &FieldWriteOptions,
356) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
357 let mut encoding = field_options.encoding;
358 if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_logical_type() {
359 return match_integer_type!(key_type, |$T| {
360 dictionary::array_to_pages::<$T>(
361 primitive_array.as_any().downcast_ref().unwrap(),
362 type_,
363 &nested,
364 options,
365 encoding,
366 )
367 });
368 };
369 if let Encoding::RleDictionary = encoding {
370 if matches!(nested.first(), Some(Nested::Primitive(_))) {
372 if let Some(result) =
373 encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
374 {
375 return result;
376 }
377 }
378
379 encoding = Encoding::Plain;
381 }
382
383 let nested = nested.to_vec();
384
385 let number_of_rows = nested[0].len();
386
387 let byte_size = estimated_bytes_size(primitive_array);
390
391 const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
392 let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
393 let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); let bytes_per_row = if number_of_rows == 0 {
395 0
396 } else {
397 ((byte_size as f64) / (number_of_rows as f64)) as usize
398 };
399 let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
400
401 let row_iter = (0..number_of_rows)
402 .step_by(rows_per_page)
403 .map(move |offset| {
404 let length = if offset + rows_per_page > number_of_rows {
405 number_of_rows - offset
406 } else {
407 rows_per_page
408 };
409 (offset, length)
410 });
411
412 let primitive_array = primitive_array.to_boxed();
413
414 let pages = row_iter.map(move |(offset, length)| {
415 let mut right_array = primitive_array.clone();
416 let mut right_nested = nested.clone();
417 slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
418
419 array_to_page(
420 right_array.as_ref(),
421 type_.clone(),
422 &right_nested,
423 options,
424 encoding,
425 )
426 });
427 Ok(DynIter::new(pages))
428}
429
430pub fn array_to_page(
432 array: &dyn Array,
433 type_: ParquetPrimitiveType,
434 nested: &[Nested],
435 options: WriteOptions,
436 encoding: Encoding,
437) -> PolarsResult<Page> {
438 if nested.len() == 1 {
439 return array_to_page_simple(array, type_, options, encoding);
441 }
442 array_to_page_nested(array, type_, nested, options, encoding)
443}
444
445pub fn array_to_page_simple(
447 array: &dyn Array,
448 type_: ParquetPrimitiveType,
449 options: WriteOptions,
450 encoding: Encoding,
451) -> PolarsResult<Page> {
452 let dtype = array.dtype();
453
454 if type_.field_info.repetition == Repetition::Required && array.null_count() > 0 {
455 polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
456 }
457
458 match dtype.to_logical_type() {
459 ArrowDataType::Boolean => boolean::array_to_page(
460 array.as_any().downcast_ref().unwrap(),
461 options,
462 type_,
463 encoding,
464 ),
465 ArrowDataType::UInt8 => {
467 return primitive::array_to_page_integer::<u8, i32>(
468 array.as_any().downcast_ref().unwrap(),
469 options,
470 type_,
471 encoding,
472 );
473 },
474 ArrowDataType::UInt16 => {
475 return primitive::array_to_page_integer::<u16, i32>(
476 array.as_any().downcast_ref().unwrap(),
477 options,
478 type_,
479 encoding,
480 );
481 },
482 ArrowDataType::UInt32 => {
483 return primitive::array_to_page_integer::<u32, i32>(
484 array.as_any().downcast_ref().unwrap(),
485 options,
486 type_,
487 encoding,
488 );
489 },
490 ArrowDataType::UInt64 => {
491 return primitive::array_to_page_integer::<u64, i64>(
492 array.as_any().downcast_ref().unwrap(),
493 options,
494 type_,
495 encoding,
496 );
497 },
498 ArrowDataType::Int8 => {
499 return primitive::array_to_page_integer::<i8, i32>(
500 array.as_any().downcast_ref().unwrap(),
501 options,
502 type_,
503 encoding,
504 );
505 },
506 ArrowDataType::Int16 => {
507 return primitive::array_to_page_integer::<i16, i32>(
508 array.as_any().downcast_ref().unwrap(),
509 options,
510 type_,
511 encoding,
512 );
513 },
514 ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
515 return primitive::array_to_page_integer::<i32, i32>(
516 array.as_any().downcast_ref().unwrap(),
517 options,
518 type_,
519 encoding,
520 );
521 },
522 ArrowDataType::Int64
523 | ArrowDataType::Date64
524 | ArrowDataType::Time64(_)
525 | ArrowDataType::Timestamp(_, _)
526 | ArrowDataType::Duration(_) => {
527 return primitive::array_to_page_integer::<i64, i64>(
528 array.as_any().downcast_ref().unwrap(),
529 options,
530 type_,
531 encoding,
532 );
533 },
534 ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
535 array.as_any().downcast_ref().unwrap(),
536 options,
537 type_,
538 ),
539 ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
540 array.as_any().downcast_ref().unwrap(),
541 options,
542 type_,
543 ),
544 ArrowDataType::LargeUtf8 => {
545 let array =
546 polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
547 .unwrap();
548 return binary::array_to_page::<i64>(
549 array.as_any().downcast_ref().unwrap(),
550 options,
551 type_,
552 encoding,
553 );
554 },
555 ArrowDataType::LargeBinary => {
556 return binary::array_to_page::<i64>(
557 array.as_any().downcast_ref().unwrap(),
558 options,
559 type_,
560 encoding,
561 );
562 },
563 ArrowDataType::BinaryView => {
564 return binview::array_to_page(
565 array.as_any().downcast_ref().unwrap(),
566 options,
567 type_,
568 encoding,
569 );
570 },
571 ArrowDataType::Utf8View => {
572 let array =
573 polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
574 .unwrap();
575 return binview::array_to_page(
576 array.as_any().downcast_ref().unwrap(),
577 options,
578 type_,
579 encoding,
580 );
581 },
582 ArrowDataType::Null => {
583 let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
584 primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
585 },
586 ArrowDataType::Interval(IntervalUnit::YearMonth) => {
587 let array = array
588 .as_any()
589 .downcast_ref::<PrimitiveArray<i32>>()
590 .unwrap();
591 let mut values = Vec::<u8>::with_capacity(12 * array.len());
592 array.values().iter().for_each(|x| {
593 let bytes = &x.to_le_bytes();
594 values.extend_from_slice(bytes);
595 values.extend_from_slice(&[0; 8]);
596 });
597 let array = FixedSizeBinaryArray::new(
598 ArrowDataType::FixedSizeBinary(12),
599 values.into(),
600 array.validity().cloned(),
601 );
602 let statistics = if options.has_statistics() {
603 Some(fixed_size_binary::build_statistics(
604 &array,
605 type_.clone(),
606 &options.statistics,
607 ))
608 } else {
609 None
610 };
611 fixed_size_binary::array_to_page(&array, options, type_, statistics)
612 },
613 ArrowDataType::Interval(IntervalUnit::DayTime) => {
614 let array = array
615 .as_any()
616 .downcast_ref::<PrimitiveArray<days_ms>>()
617 .unwrap();
618 let mut values = Vec::<u8>::with_capacity(12 * array.len());
619 array.values().iter().for_each(|x| {
620 let bytes = &x.to_le_bytes();
621 values.extend_from_slice(&[0; 4]); values.extend_from_slice(bytes); });
624 let array = FixedSizeBinaryArray::new(
625 ArrowDataType::FixedSizeBinary(12),
626 values.into(),
627 array.validity().cloned(),
628 );
629 let statistics = if options.has_statistics() {
630 Some(fixed_size_binary::build_statistics(
631 &array,
632 type_.clone(),
633 &options.statistics,
634 ))
635 } else {
636 None
637 };
638 fixed_size_binary::array_to_page(&array, options, type_, statistics)
639 },
640 ArrowDataType::FixedSizeBinary(_) => {
641 let array = array.as_any().downcast_ref().unwrap();
642 let statistics = if options.has_statistics() {
643 Some(fixed_size_binary::build_statistics(
644 array,
645 type_.clone(),
646 &options.statistics,
647 ))
648 } else {
649 None
650 };
651
652 fixed_size_binary::array_to_page(array, options, type_, statistics)
653 },
654 ArrowDataType::Decimal256(precision, _) => {
655 let precision = *precision;
656 let array = array
657 .as_any()
658 .downcast_ref::<PrimitiveArray<i256>>()
659 .unwrap();
660 if precision <= 9 {
661 let values = array
662 .values()
663 .iter()
664 .map(|x| x.0.as_i32())
665 .collect::<Vec<_>>()
666 .into();
667
668 let array = PrimitiveArray::<i32>::new(
669 ArrowDataType::Int32,
670 values,
671 array.validity().cloned(),
672 );
673 return primitive::array_to_page_integer::<i32, i32>(
674 &array, options, type_, encoding,
675 );
676 } else if precision <= 18 {
677 let values = array
678 .values()
679 .iter()
680 .map(|x| x.0.as_i64())
681 .collect::<Vec<_>>()
682 .into();
683
684 let array = PrimitiveArray::<i64>::new(
685 ArrowDataType::Int64,
686 values,
687 array.validity().cloned(),
688 );
689 return primitive::array_to_page_integer::<i64, i64>(
690 &array, options, type_, encoding,
691 );
692 } else if precision <= 38 {
693 let size = decimal_length_from_precision(precision);
694 let statistics = if options.has_statistics() {
695 let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
696 array,
697 type_.clone(),
698 size,
699 &options.statistics,
700 );
701 Some(stats)
702 } else {
703 None
704 };
705
706 let mut values = Vec::<u8>::with_capacity(size * array.len());
707 array.values().iter().for_each(|x| {
708 let bytes = &x.0.low().to_be_bytes()[16 - size..];
709 values.extend_from_slice(bytes)
710 });
711 let array = FixedSizeBinaryArray::new(
712 ArrowDataType::FixedSizeBinary(size),
713 values.into(),
714 array.validity().cloned(),
715 );
716 fixed_size_binary::array_to_page(&array, options, type_, statistics)
717 } else {
718 let size = 32;
719 let array = array
720 .as_any()
721 .downcast_ref::<PrimitiveArray<i256>>()
722 .unwrap();
723 let statistics = if options.has_statistics() {
724 let stats = fixed_size_binary::build_statistics_decimal256(
725 array,
726 type_.clone(),
727 size,
728 &options.statistics,
729 );
730 Some(stats)
731 } else {
732 None
733 };
734 let mut values = Vec::<u8>::with_capacity(size * array.len());
735 array.values().iter().for_each(|x| {
736 let bytes = &x.to_be_bytes();
737 values.extend_from_slice(bytes)
738 });
739 let array = FixedSizeBinaryArray::new(
740 ArrowDataType::FixedSizeBinary(size),
741 values.into(),
742 array.validity().cloned(),
743 );
744
745 fixed_size_binary::array_to_page(&array, options, type_, statistics)
746 }
747 },
748 ArrowDataType::Decimal(precision, _) => {
749 let precision = *precision;
750 let array = array
751 .as_any()
752 .downcast_ref::<PrimitiveArray<i128>>()
753 .unwrap();
754 if precision <= 9 {
755 let values = array
756 .values()
757 .iter()
758 .map(|x| *x as i32)
759 .collect::<Vec<_>>()
760 .into();
761
762 let array = PrimitiveArray::<i32>::new(
763 ArrowDataType::Int32,
764 values,
765 array.validity().cloned(),
766 );
767 return primitive::array_to_page_integer::<i32, i32>(
768 &array, options, type_, encoding,
769 );
770 } else if precision <= 18 {
771 let values = array
772 .values()
773 .iter()
774 .map(|x| *x as i64)
775 .collect::<Vec<_>>()
776 .into();
777
778 let array = PrimitiveArray::<i64>::new(
779 ArrowDataType::Int64,
780 values,
781 array.validity().cloned(),
782 );
783 return primitive::array_to_page_integer::<i64, i64>(
784 &array, options, type_, encoding,
785 );
786 } else {
787 let size = decimal_length_from_precision(precision);
788
789 let statistics = if options.has_statistics() {
790 let stats = fixed_size_binary::build_statistics_decimal(
791 array,
792 type_.clone(),
793 size,
794 &options.statistics,
795 );
796 Some(stats)
797 } else {
798 None
799 };
800
801 let mut values = Vec::<u8>::with_capacity(size * array.len());
802 array.values().iter().for_each(|x| {
803 let bytes = &x.to_be_bytes()[16 - size..];
804 values.extend_from_slice(bytes)
805 });
806 let array = FixedSizeBinaryArray::new(
807 ArrowDataType::FixedSizeBinary(size),
808 values.into(),
809 array.validity().cloned(),
810 );
811 fixed_size_binary::array_to_page(&array, options, type_, statistics)
812 }
813 },
814 ArrowDataType::Int128 => {
815 let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
816 let statistics = if options.has_statistics() {
817 let stats = fixed_size_binary::build_statistics_decimal(
818 array,
819 type_.clone(),
820 16,
821 &options.statistics,
822 );
823 Some(stats)
824 } else {
825 None
826 };
827 let array = FixedSizeBinaryArray::new(
828 ArrowDataType::FixedSizeBinary(16),
829 array.values().clone().try_transmute().unwrap(),
830 array.validity().cloned(),
831 );
832 fixed_size_binary::array_to_page(&array, options, type_, statistics)
833 },
834 other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
835 }
836 .map(Page::Data)
837}
838
839fn array_to_page_nested(
840 array: &dyn Array,
841 type_: ParquetPrimitiveType,
842 nested: &[Nested],
843 options: WriteOptions,
844 _encoding: Encoding,
845) -> PolarsResult<Page> {
846 if type_.field_info.repetition == Repetition::Required
847 && array.validity().is_some_and(|v| v.unset_bits() > 0)
848 {
849 polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
850 }
851
852 use ArrowDataType::*;
853 match array.dtype().to_logical_type() {
854 Null => {
855 let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
856 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
857 },
858 Boolean => {
859 let array = array.as_any().downcast_ref().unwrap();
860 boolean::nested_array_to_page(array, options, type_, nested)
861 },
862 LargeUtf8 => {
863 let array =
864 polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
865 let array = array.as_any().downcast_ref().unwrap();
866 binary::nested_array_to_page::<i64>(array, options, type_, nested)
867 },
868 LargeBinary => {
869 let array = array.as_any().downcast_ref().unwrap();
870 binary::nested_array_to_page::<i64>(array, options, type_, nested)
871 },
872 BinaryView => {
873 let array = array.as_any().downcast_ref().unwrap();
874 binview::nested_array_to_page(array, options, type_, nested)
875 },
876 Utf8View => {
877 let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
878 let array = array.as_any().downcast_ref().unwrap();
879 binview::nested_array_to_page(array, options, type_, nested)
880 },
881 UInt8 => {
882 let array = array.as_any().downcast_ref().unwrap();
883 primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
884 },
885 UInt16 => {
886 let array = array.as_any().downcast_ref().unwrap();
887 primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
888 },
889 UInt32 => {
890 let array = array.as_any().downcast_ref().unwrap();
891 primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
892 },
893 UInt64 => {
894 let array = array.as_any().downcast_ref().unwrap();
895 primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
896 },
897 Int8 => {
898 let array = array.as_any().downcast_ref().unwrap();
899 primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
900 },
901 Int16 => {
902 let array = array.as_any().downcast_ref().unwrap();
903 primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
904 },
905 Int32 | Date32 | Time32(_) => {
906 let array = array.as_any().downcast_ref().unwrap();
907 primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
908 },
909 Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
910 let array = array.as_any().downcast_ref().unwrap();
911 primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
912 },
913 Float32 => {
914 let array = array.as_any().downcast_ref().unwrap();
915 primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
916 },
917 Float64 => {
918 let array = array.as_any().downcast_ref().unwrap();
919 primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
920 },
921 Decimal(precision, _) => {
922 let precision = *precision;
923 let array = array
924 .as_any()
925 .downcast_ref::<PrimitiveArray<i128>>()
926 .unwrap();
927 if precision <= 9 {
928 let values = array
929 .values()
930 .iter()
931 .map(|x| *x as i32)
932 .collect::<Vec<_>>()
933 .into();
934
935 let array = PrimitiveArray::<i32>::new(
936 ArrowDataType::Int32,
937 values,
938 array.validity().cloned(),
939 );
940 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
941 } else if precision <= 18 {
942 let values = array
943 .values()
944 .iter()
945 .map(|x| *x as i64)
946 .collect::<Vec<_>>()
947 .into();
948
949 let array = PrimitiveArray::<i64>::new(
950 ArrowDataType::Int64,
951 values,
952 array.validity().cloned(),
953 );
954 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
955 } else {
956 let size = decimal_length_from_precision(precision);
957
958 let statistics = if options.has_statistics() {
959 let stats = fixed_size_binary::build_statistics_decimal(
960 array,
961 type_.clone(),
962 size,
963 &options.statistics,
964 );
965 Some(stats)
966 } else {
967 None
968 };
969
970 let mut values = Vec::<u8>::with_capacity(size * array.len());
971 array.values().iter().for_each(|x| {
972 let bytes = &x.to_be_bytes()[16 - size..];
973 values.extend_from_slice(bytes)
974 });
975 let array = FixedSizeBinaryArray::new(
976 ArrowDataType::FixedSizeBinary(size),
977 values.into(),
978 array.validity().cloned(),
979 );
980 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
981 }
982 },
983 Decimal256(precision, _) => {
984 let precision = *precision;
985 let array = array
986 .as_any()
987 .downcast_ref::<PrimitiveArray<i256>>()
988 .unwrap();
989 if precision <= 9 {
990 let values = array
991 .values()
992 .iter()
993 .map(|x| x.0.as_i32())
994 .collect::<Vec<_>>()
995 .into();
996
997 let array = PrimitiveArray::<i32>::new(
998 ArrowDataType::Int32,
999 values,
1000 array.validity().cloned(),
1001 );
1002 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
1003 } else if precision <= 18 {
1004 let values = array
1005 .values()
1006 .iter()
1007 .map(|x| x.0.as_i64())
1008 .collect::<Vec<_>>()
1009 .into();
1010
1011 let array = PrimitiveArray::<i64>::new(
1012 ArrowDataType::Int64,
1013 values,
1014 array.validity().cloned(),
1015 );
1016 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
1017 } else if precision <= 38 {
1018 let size = decimal_length_from_precision(precision);
1019 let statistics = if options.has_statistics() {
1020 let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
1021 array,
1022 type_.clone(),
1023 size,
1024 &options.statistics,
1025 );
1026 Some(stats)
1027 } else {
1028 None
1029 };
1030
1031 let mut values = Vec::<u8>::with_capacity(size * array.len());
1032 array.values().iter().for_each(|x| {
1033 let bytes = &x.0.low().to_be_bytes()[16 - size..];
1034 values.extend_from_slice(bytes)
1035 });
1036 let array = FixedSizeBinaryArray::new(
1037 ArrowDataType::FixedSizeBinary(size),
1038 values.into(),
1039 array.validity().cloned(),
1040 );
1041 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1042 } else {
1043 let size = 32;
1044 let array = array
1045 .as_any()
1046 .downcast_ref::<PrimitiveArray<i256>>()
1047 .unwrap();
1048 let statistics = if options.has_statistics() {
1049 let stats = fixed_size_binary::build_statistics_decimal256(
1050 array,
1051 type_.clone(),
1052 size,
1053 &options.statistics,
1054 );
1055 Some(stats)
1056 } else {
1057 None
1058 };
1059 let mut values = Vec::<u8>::with_capacity(size * array.len());
1060 array.values().iter().for_each(|x| {
1061 let bytes = &x.to_be_bytes();
1062 values.extend_from_slice(bytes)
1063 });
1064 let array = FixedSizeBinaryArray::new(
1065 ArrowDataType::FixedSizeBinary(size),
1066 values.into(),
1067 array.validity().cloned(),
1068 );
1069
1070 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1071 }
1072 },
1073 Int128 => {
1074 let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
1075 let statistics = if options.has_statistics() {
1076 let stats = fixed_size_binary::build_statistics_decimal(
1077 array,
1078 type_.clone(),
1079 16,
1080 &options.statistics,
1081 );
1082 Some(stats)
1083 } else {
1084 None
1085 };
1086 let array = FixedSizeBinaryArray::new(
1087 ArrowDataType::FixedSizeBinary(16),
1088 array.values().clone().try_transmute().unwrap(),
1089 array.validity().cloned(),
1090 );
1091 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1092 },
1093 other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
1094 }
1095 .map(Page::Data)
1096}
1097
1098fn transverse_recursive<T, F: Fn(&ArrowDataType) -> T + Clone>(
1099 dtype: &ArrowDataType,
1100 map: F,
1101 encodings: &mut Vec<T>,
1102) {
1103 use arrow::datatypes::PhysicalType::*;
1104 match dtype.to_physical_type() {
1105 Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
1106 | Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => encodings.push(map(dtype)),
1107 List | FixedSizeList | LargeList => {
1108 let a = dtype.to_logical_type();
1109 if let ArrowDataType::List(inner) = a {
1110 transverse_recursive(&inner.dtype, map, encodings)
1111 } else if let ArrowDataType::LargeList(inner) = a {
1112 transverse_recursive(&inner.dtype, map, encodings)
1113 } else if let ArrowDataType::FixedSizeList(inner, _) = a {
1114 transverse_recursive(&inner.dtype, map, encodings)
1115 } else {
1116 unreachable!()
1117 }
1118 },
1119 Struct => {
1120 if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {
1121 for field in fields {
1122 transverse_recursive(&field.dtype, map.clone(), encodings)
1123 }
1124 } else {
1125 unreachable!()
1126 }
1127 },
1128 Map => {
1129 if let ArrowDataType::Map(field, _) = dtype.to_logical_type() {
1130 if let ArrowDataType::Struct(fields) = field.dtype.to_logical_type() {
1131 for field in fields {
1132 transverse_recursive(&field.dtype, map.clone(), encodings)
1133 }
1134 } else {
1135 unreachable!()
1136 }
1137 } else {
1138 unreachable!()
1139 }
1140 },
1141 Union => todo!(),
1142 }
1143}
1144
1145pub fn transverse<T, F: Fn(&ArrowDataType) -> T + Clone>(dtype: &ArrowDataType, map: F) -> Vec<T> {
1150 let mut encodings = vec![];
1151 transverse_recursive(dtype, map, &mut encodings);
1152 encodings
1153}