1mod binary;
16mod binview;
17mod boolean;
18mod dictionary;
19mod file;
20mod fixed_size_binary;
21mod nested;
22mod pages;
23mod primitive;
24mod row_group;
25mod schema;
26mod utils;
27
28use arrow::array::*;
29use arrow::bitmap::Bitmap;
30use arrow::datatypes::*;
31use arrow::types::{NativeType, days_ms, i256};
32pub use nested::{num_values, write_rep_and_def};
33pub use pages::{to_leaves, to_nested, to_parquet_leaves};
34use polars_utils::pl_str::PlSmallStr;
35pub use utils::write_def_levels;
36
37pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
38pub use crate::parquet::encoding::Encoding;
39pub use crate::parquet::metadata::{
40 Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
41};
42pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
43use crate::parquet::schema::Repetition;
44use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
45pub use crate::parquet::schema::types::{
46 FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
47};
48pub use crate::parquet::write::{
49 Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
50 write_metadata_sidecar,
51};
52pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
53
/// Selects which column statistics are computed and written to the parquet file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct StatisticsOptions {
    /// Record the minimum value of the column.
    pub min_value: bool,
    /// Record the maximum value of the column.
    pub max_value: bool,
    /// Record the number of distinct values in the column.
    pub distinct_count: bool,
    /// Record the number of nulls in the column.
    pub null_count: bool,
}

impl Default for StatisticsOptions {
    /// Min, max and null-count statistics are enabled; the distinct count is not.
    fn default() -> Self {
        // The distinct count is the only statistic left out by default.
        let distinct_count = false;
        Self {
            null_count: true,
            min_value: true,
            max_value: true,
            distinct_count,
        }
    }
}
75
/// Nullability with which values are encoded: `Required` or `Optional`.
#[derive(Clone, Copy)]
pub enum EncodeNullability {
    /// Values are encoded as required.
    Required,
    /// Values are encoded as optional.
    Optional,
}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85pub struct WriteOptions {
86 pub statistics: StatisticsOptions,
88 pub version: Version,
90 pub compression: CompressionOptions,
92 pub data_page_size: Option<usize>,
94}
95
96#[derive(Clone)]
97pub struct ColumnWriteOptions {
98 pub field_id: Option<i32>,
99 pub metadata: Vec<KeyValue>,
100 pub required: Option<bool>,
101 pub children: ChildWriteOptions,
102}
103
104#[derive(Clone)]
105pub enum ChildWriteOptions {
106 Leaf(FieldWriteOptions),
107 ListLike(Box<ListLikeFieldWriteOptions>),
108 Struct(Box<StructFieldWriteOptions>),
109}
110
111impl ColumnWriteOptions {
112 pub fn to_leaves<'a>(&'a self, out: &mut Vec<&'a FieldWriteOptions>) {
113 match &self.children {
114 ChildWriteOptions::Leaf(o) => out.push(o),
115 ChildWriteOptions::ListLike(o) => o.child.to_leaves(out),
116 ChildWriteOptions::Struct(o) => {
117 for o in &o.children {
118 o.to_leaves(out);
119 }
120 },
121 }
122 }
123}
124
125#[derive(Clone)]
126pub struct FieldWriteOptions {
127 pub encoding: Encoding,
128}
129
130impl ColumnWriteOptions {
131 pub fn default_with(children: ChildWriteOptions) -> Self {
132 Self {
133 field_id: None,
134 metadata: Vec::new(),
135 required: None,
136 children,
137 }
138 }
139}
140
141impl FieldWriteOptions {
142 pub fn default_with_encoding(encoding: Encoding) -> Self {
143 Self { encoding }
144 }
145
146 pub fn into_default_column_write_options(self) -> ColumnWriteOptions {
147 ColumnWriteOptions::default_with(ChildWriteOptions::Leaf(self))
148 }
149}
150
151#[derive(Clone)]
152pub struct ListLikeFieldWriteOptions {
153 pub child: ColumnWriteOptions,
154}
155
156#[derive(Clone)]
157pub struct StructFieldWriteOptions {
158 pub children: Vec<ColumnWriteOptions>,
159}
160
161use arrow::compute::aggregate::estimated_bytes_size;
162use arrow::match_integer_type;
163pub use file::FileWriter;
164pub use pages::{Nested, array_to_columns, arrays_to_columns};
165use polars_error::{PolarsResult, polars_bail};
166pub use row_group::{RowGroupIterator, row_group_iter};
167pub use schema::{schema_to_metadata_key, to_parquet_type};
168
169use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
170use crate::write::dictionary::encode_as_dictionary_optional;
171
172impl StatisticsOptions {
173 pub fn empty() -> Self {
174 Self {
175 min_value: false,
176 max_value: false,
177 distinct_count: false,
178 null_count: false,
179 }
180 }
181
182 pub fn full() -> Self {
183 Self {
184 min_value: true,
185 max_value: true,
186 distinct_count: true,
187 null_count: true,
188 }
189 }
190
191 pub fn is_empty(&self) -> bool {
192 !(self.min_value || self.max_value || self.distinct_count || self.null_count)
193 }
194
195 pub fn is_full(&self) -> bool {
196 self.min_value && self.max_value && self.distinct_count && self.null_count
197 }
198}
199
200impl WriteOptions {
201 pub fn has_statistics(&self) -> bool {
202 !self.statistics.is_empty()
203 }
204}
205
206impl EncodeNullability {
207 const fn new(is_optional: bool) -> Self {
208 if is_optional {
209 Self::Optional
210 } else {
211 Self::Required
212 }
213 }
214
215 fn is_optional(self) -> bool {
216 matches!(self, Self::Optional)
217 }
218}
219
220pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
222 let mut out = (0, 0);
225 for nested in nested.iter().rev() {
226 match nested {
227 Nested::LargeList(l_nested) => {
228 let start = *l_nested.offsets.first();
229 let end = *l_nested.offsets.last();
230 return (start as usize, (end - start) as usize);
231 },
232 Nested::List(l_nested) => {
233 let start = *l_nested.offsets.first();
234 let end = *l_nested.offsets.last();
235 return (start as usize, (end - start) as usize);
236 },
237 Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
238 Nested::Primitive(nested) => out = (0, nested.length),
239 Nested::Struct(_) => {},
240 }
241 }
242 out
243}
244
/// Number of bytes a signed (two's-complement) integer needs to hold any
/// unscaled decimal with `precision` digits.
///
/// It solves `10^precision + 1 = 2^(8 * n - 1)` for `n` and rounds up, i.e.
/// `n = ceil((log2(10^precision + 1) + 1) / 8)`.
fn decimal_length_from_precision(precision: usize) -> usize {
    let digits_bound = 10.0_f64.powi(precision as i32);
    // One extra bit for the sign.
    let signed_bits = (digits_bound + 1.0).log2() + 1.0;
    (signed_bits / 8.0).ceil() as usize
}
255
256pub fn to_parquet_schema(
258 schema: &ArrowSchema,
259 column_options: &[ColumnWriteOptions],
260) -> PolarsResult<SchemaDescriptor> {
261 let parquet_types = schema
262 .iter_values()
263 .zip(column_options)
264 .map(|(field, options)| to_parquet_type(field, options))
265 .collect::<PolarsResult<Vec<_>>>()?;
266 Ok(SchemaDescriptor::new(
267 PlSmallStr::from_static("root"),
268 parquet_types,
269 ))
270}
271
272pub fn slice_parquet_array(
274 primitive_array: &mut dyn Array,
275 nested: &mut [Nested],
276 mut current_offset: usize,
277 mut current_length: usize,
278) {
279 for nested in nested.iter_mut() {
280 match nested {
281 Nested::LargeList(l_nested) => {
282 l_nested.offsets.slice(current_offset, current_length + 1);
283 if let Some(validity) = l_nested.validity.as_mut() {
284 validity.slice(current_offset, current_length)
285 };
286
287 current_length = l_nested.offsets.range() as usize;
289 current_offset = *l_nested.offsets.first() as usize;
290 },
291 Nested::List(l_nested) => {
292 l_nested.offsets.slice(current_offset, current_length + 1);
293 if let Some(validity) = l_nested.validity.as_mut() {
294 validity.slice(current_offset, current_length)
295 };
296
297 current_length = l_nested.offsets.range() as usize;
299 current_offset = *l_nested.offsets.first() as usize;
300 },
301 Nested::Struct(StructNested {
302 validity, length, ..
303 }) => {
304 *length = current_length;
305 if let Some(validity) = validity.as_mut() {
306 validity.slice(current_offset, current_length)
307 };
308 },
309 Nested::Primitive(PrimitiveNested {
310 validity, length, ..
311 }) => {
312 *length = current_length;
313 if let Some(validity) = validity.as_mut() {
314 validity.slice(current_offset, current_length)
315 };
316 primitive_array.slice(current_offset, current_length);
317 },
318 Nested::FixedSizeList(FixedSizeListNested {
319 validity,
320 length,
321 width,
322 ..
323 }) => {
324 if let Some(validity) = validity.as_mut() {
325 validity.slice(current_offset, current_length)
326 };
327 *length = current_length;
328 current_length *= *width;
330 current_offset *= *width;
331 },
332 }
333 }
334}
335
336pub fn get_max_length(nested: &[Nested]) -> usize {
338 let mut length = 0;
339 for nested in nested.iter() {
340 match nested {
341 Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
342 Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
343 Nested::FixedSizeList(nested) => length += nested.length * nested.width,
344 _ => {},
345 }
346 }
347 length
348}
349
350pub fn array_to_pages(
352 primitive_array: &dyn Array,
353 type_: ParquetPrimitiveType,
354 nested: &[Nested],
355 options: WriteOptions,
356 field_options: &FieldWriteOptions,
357) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
358 let mut encoding = field_options.encoding;
359 if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_logical_type() {
360 return match_integer_type!(key_type, |$T| {
361 dictionary::array_to_pages::<$T>(
362 primitive_array.as_any().downcast_ref().unwrap(),
363 type_,
364 &nested,
365 options,
366 encoding,
367 )
368 });
369 };
370 if let Encoding::RleDictionary = encoding {
371 if matches!(nested.first(), Some(Nested::Primitive(_))) {
373 if let Some(result) =
374 encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
375 {
376 return result;
377 }
378 }
379
380 encoding = Encoding::Plain;
382 }
383
384 let nested = nested.to_vec();
385
386 let number_of_rows = nested[0].len();
387
388 let byte_size = estimated_bytes_size(primitive_array);
391
392 const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
393 let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
394 let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); let bytes_per_row = if number_of_rows == 0 {
396 0
397 } else {
398 ((byte_size as f64) / (number_of_rows as f64)) as usize
399 };
400 let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
401
402 let row_iter = (0..number_of_rows)
403 .step_by(rows_per_page)
404 .map(move |offset| {
405 let length = if offset + rows_per_page > number_of_rows {
406 number_of_rows - offset
407 } else {
408 rows_per_page
409 };
410 (offset, length)
411 });
412
413 let primitive_array = primitive_array.to_boxed();
414
415 let pages = row_iter.map(move |(offset, length)| {
416 let mut right_array = primitive_array.clone();
417 let mut right_nested = nested.clone();
418 slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
419
420 array_to_page(
421 right_array.as_ref(),
422 type_.clone(),
423 &right_nested,
424 options,
425 encoding,
426 )
427 });
428 Ok(DynIter::new(pages))
429}
430
431pub fn array_to_page(
433 array: &dyn Array,
434 type_: ParquetPrimitiveType,
435 nested: &[Nested],
436 options: WriteOptions,
437 encoding: Encoding,
438) -> PolarsResult<Page> {
439 if nested.len() == 1 {
440 return array_to_page_simple(array, type_, options, encoding);
442 }
443 array_to_page_nested(array, type_, nested, options, encoding)
444}
445
446pub fn array_to_page_simple(
448 array: &dyn Array,
449 type_: ParquetPrimitiveType,
450 options: WriteOptions,
451 encoding: Encoding,
452) -> PolarsResult<Page> {
453 let dtype = array.dtype();
454
455 if type_.field_info.repetition == Repetition::Required && array.null_count() > 0 {
456 polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
457 }
458
459 match dtype.to_logical_type() {
460 ArrowDataType::Struct(fs) if fs.is_empty() => boolean::array_to_page(
462 &BooleanArray::new(
463 ArrowDataType::Boolean,
464 Bitmap::new_zeroed(array.len()),
465 array.validity().cloned(),
466 ),
467 options,
468 type_,
469 encoding,
470 ),
471
472 ArrowDataType::Boolean => boolean::array_to_page(
473 array.as_any().downcast_ref().unwrap(),
474 options,
475 type_,
476 encoding,
477 ),
478 ArrowDataType::UInt8 => {
480 return primitive::array_to_page_integer::<u8, i32>(
481 array.as_any().downcast_ref().unwrap(),
482 options,
483 type_,
484 encoding,
485 );
486 },
487 ArrowDataType::UInt16 => {
488 return primitive::array_to_page_integer::<u16, i32>(
489 array.as_any().downcast_ref().unwrap(),
490 options,
491 type_,
492 encoding,
493 );
494 },
495 ArrowDataType::UInt32 => {
496 return primitive::array_to_page_integer::<u32, i32>(
497 array.as_any().downcast_ref().unwrap(),
498 options,
499 type_,
500 encoding,
501 );
502 },
503 ArrowDataType::UInt64 => {
504 return primitive::array_to_page_integer::<u64, i64>(
505 array.as_any().downcast_ref().unwrap(),
506 options,
507 type_,
508 encoding,
509 );
510 },
511 ArrowDataType::Int8 => {
512 return primitive::array_to_page_integer::<i8, i32>(
513 array.as_any().downcast_ref().unwrap(),
514 options,
515 type_,
516 encoding,
517 );
518 },
519 ArrowDataType::Int16 => {
520 return primitive::array_to_page_integer::<i16, i32>(
521 array.as_any().downcast_ref().unwrap(),
522 options,
523 type_,
524 encoding,
525 );
526 },
527 ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
528 return primitive::array_to_page_integer::<i32, i32>(
529 array.as_any().downcast_ref().unwrap(),
530 options,
531 type_,
532 encoding,
533 );
534 },
535 ArrowDataType::Int64
536 | ArrowDataType::Date64
537 | ArrowDataType::Time64(_)
538 | ArrowDataType::Timestamp(_, _)
539 | ArrowDataType::Duration(_) => {
540 return primitive::array_to_page_integer::<i64, i64>(
541 array.as_any().downcast_ref().unwrap(),
542 options,
543 type_,
544 encoding,
545 );
546 },
547 ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
548 array.as_any().downcast_ref().unwrap(),
549 options,
550 type_,
551 ),
552 ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
553 array.as_any().downcast_ref().unwrap(),
554 options,
555 type_,
556 ),
557 ArrowDataType::LargeUtf8 => {
558 let array =
559 polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
560 .unwrap();
561 return binary::array_to_page::<i64>(
562 array.as_any().downcast_ref().unwrap(),
563 options,
564 type_,
565 encoding,
566 );
567 },
568 ArrowDataType::LargeBinary => {
569 return binary::array_to_page::<i64>(
570 array.as_any().downcast_ref().unwrap(),
571 options,
572 type_,
573 encoding,
574 );
575 },
576 ArrowDataType::BinaryView => {
577 return binview::array_to_page(
578 array.as_any().downcast_ref().unwrap(),
579 options,
580 type_,
581 encoding,
582 );
583 },
584 ArrowDataType::Utf8View => {
585 let array =
586 polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
587 .unwrap();
588 return binview::array_to_page(
589 array.as_any().downcast_ref().unwrap(),
590 options,
591 type_,
592 encoding,
593 );
594 },
595 ArrowDataType::Null => {
596 let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
597 primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
598 },
599 ArrowDataType::Interval(IntervalUnit::YearMonth) => {
600 let array = array
601 .as_any()
602 .downcast_ref::<PrimitiveArray<i32>>()
603 .unwrap();
604 let mut values = Vec::<u8>::with_capacity(12 * array.len());
605 array.values().iter().for_each(|x| {
606 let bytes = &x.to_le_bytes();
607 values.extend_from_slice(bytes);
608 values.extend_from_slice(&[0; 8]);
609 });
610 let array = FixedSizeBinaryArray::new(
611 ArrowDataType::FixedSizeBinary(12),
612 values.into(),
613 array.validity().cloned(),
614 );
615 let statistics = if options.has_statistics() {
616 Some(fixed_size_binary::build_statistics(
617 &array,
618 type_.clone(),
619 &options.statistics,
620 ))
621 } else {
622 None
623 };
624 fixed_size_binary::array_to_page(&array, options, type_, statistics)
625 },
626 ArrowDataType::Interval(IntervalUnit::DayTime) => {
627 let array = array
628 .as_any()
629 .downcast_ref::<PrimitiveArray<days_ms>>()
630 .unwrap();
631 let mut values = Vec::<u8>::with_capacity(12 * array.len());
632 array.values().iter().for_each(|x| {
633 let bytes = &x.to_le_bytes();
634 values.extend_from_slice(&[0; 4]); values.extend_from_slice(bytes); });
637 let array = FixedSizeBinaryArray::new(
638 ArrowDataType::FixedSizeBinary(12),
639 values.into(),
640 array.validity().cloned(),
641 );
642 let statistics = if options.has_statistics() {
643 Some(fixed_size_binary::build_statistics(
644 &array,
645 type_.clone(),
646 &options.statistics,
647 ))
648 } else {
649 None
650 };
651 fixed_size_binary::array_to_page(&array, options, type_, statistics)
652 },
653 ArrowDataType::FixedSizeBinary(_) => {
654 let array = array.as_any().downcast_ref().unwrap();
655 let statistics = if options.has_statistics() {
656 Some(fixed_size_binary::build_statistics(
657 array,
658 type_.clone(),
659 &options.statistics,
660 ))
661 } else {
662 None
663 };
664
665 fixed_size_binary::array_to_page(array, options, type_, statistics)
666 },
667 ArrowDataType::Decimal256(precision, _) => {
668 let precision = *precision;
669 let array = array
670 .as_any()
671 .downcast_ref::<PrimitiveArray<i256>>()
672 .unwrap();
673 if precision <= 9 {
674 let values = array
675 .values()
676 .iter()
677 .map(|x| x.0.as_i32())
678 .collect::<Vec<_>>()
679 .into();
680
681 let array = PrimitiveArray::<i32>::new(
682 ArrowDataType::Int32,
683 values,
684 array.validity().cloned(),
685 );
686 return primitive::array_to_page_integer::<i32, i32>(
687 &array, options, type_, encoding,
688 );
689 } else if precision <= 18 {
690 let values = array
691 .values()
692 .iter()
693 .map(|x| x.0.as_i64())
694 .collect::<Vec<_>>()
695 .into();
696
697 let array = PrimitiveArray::<i64>::new(
698 ArrowDataType::Int64,
699 values,
700 array.validity().cloned(),
701 );
702 return primitive::array_to_page_integer::<i64, i64>(
703 &array, options, type_, encoding,
704 );
705 } else if precision <= 38 {
706 let size = decimal_length_from_precision(precision);
707 let statistics = if options.has_statistics() {
708 let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
709 array,
710 type_.clone(),
711 size,
712 &options.statistics,
713 );
714 Some(stats)
715 } else {
716 None
717 };
718
719 let mut values = Vec::<u8>::with_capacity(size * array.len());
720 array.values().iter().for_each(|x| {
721 let bytes = &x.0.low().to_be_bytes()[16 - size..];
722 values.extend_from_slice(bytes)
723 });
724 let array = FixedSizeBinaryArray::new(
725 ArrowDataType::FixedSizeBinary(size),
726 values.into(),
727 array.validity().cloned(),
728 );
729 fixed_size_binary::array_to_page(&array, options, type_, statistics)
730 } else {
731 let size = 32;
732 let array = array
733 .as_any()
734 .downcast_ref::<PrimitiveArray<i256>>()
735 .unwrap();
736 let statistics = if options.has_statistics() {
737 let stats = fixed_size_binary::build_statistics_decimal256(
738 array,
739 type_.clone(),
740 size,
741 &options.statistics,
742 );
743 Some(stats)
744 } else {
745 None
746 };
747 let mut values = Vec::<u8>::with_capacity(size * array.len());
748 array.values().iter().for_each(|x| {
749 let bytes = &x.to_be_bytes();
750 values.extend_from_slice(bytes)
751 });
752 let array = FixedSizeBinaryArray::new(
753 ArrowDataType::FixedSizeBinary(size),
754 values.into(),
755 array.validity().cloned(),
756 );
757
758 fixed_size_binary::array_to_page(&array, options, type_, statistics)
759 }
760 },
761 ArrowDataType::Decimal(precision, _) => {
762 let precision = *precision;
763 let array = array
764 .as_any()
765 .downcast_ref::<PrimitiveArray<i128>>()
766 .unwrap();
767 if precision <= 9 {
768 let values = array
769 .values()
770 .iter()
771 .map(|x| *x as i32)
772 .collect::<Vec<_>>()
773 .into();
774
775 let array = PrimitiveArray::<i32>::new(
776 ArrowDataType::Int32,
777 values,
778 array.validity().cloned(),
779 );
780 return primitive::array_to_page_integer::<i32, i32>(
781 &array, options, type_, encoding,
782 );
783 } else if precision <= 18 {
784 let values = array
785 .values()
786 .iter()
787 .map(|x| *x as i64)
788 .collect::<Vec<_>>()
789 .into();
790
791 let array = PrimitiveArray::<i64>::new(
792 ArrowDataType::Int64,
793 values,
794 array.validity().cloned(),
795 );
796 return primitive::array_to_page_integer::<i64, i64>(
797 &array, options, type_, encoding,
798 );
799 } else {
800 let size = decimal_length_from_precision(precision);
801
802 let statistics = if options.has_statistics() {
803 let stats = fixed_size_binary::build_statistics_decimal(
804 array,
805 type_.clone(),
806 size,
807 &options.statistics,
808 );
809 Some(stats)
810 } else {
811 None
812 };
813
814 let mut values = Vec::<u8>::with_capacity(size * array.len());
815 array.values().iter().for_each(|x| {
816 let bytes = &x.to_be_bytes()[16 - size..];
817 values.extend_from_slice(bytes)
818 });
819 let array = FixedSizeBinaryArray::new(
820 ArrowDataType::FixedSizeBinary(size),
821 values.into(),
822 array.validity().cloned(),
823 );
824 fixed_size_binary::array_to_page(&array, options, type_, statistics)
825 }
826 },
827 ArrowDataType::UInt128 => {
828 let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();
829 let statistics = if options.has_statistics() {
830 let stats = fixed_size_binary::build_statistics_decimal(
831 array,
832 type_.clone(),
833 16,
834 &options.statistics,
835 );
836 Some(stats)
837 } else {
838 None
839 };
840 let array = FixedSizeBinaryArray::new(
841 ArrowDataType::FixedSizeBinary(16),
842 array.values().clone().try_transmute().unwrap(),
843 array.validity().cloned(),
844 );
845 fixed_size_binary::array_to_page(&array, options, type_, statistics)
846 },
847 ArrowDataType::Int128 => {
848 let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
849 let statistics = if options.has_statistics() {
850 let stats = fixed_size_binary::build_statistics_decimal(
851 array,
852 type_.clone(),
853 16,
854 &options.statistics,
855 );
856 Some(stats)
857 } else {
858 None
859 };
860 let array = FixedSizeBinaryArray::new(
861 ArrowDataType::FixedSizeBinary(16),
862 array.values().clone().try_transmute().unwrap(),
863 array.validity().cloned(),
864 );
865 fixed_size_binary::array_to_page(&array, options, type_, statistics)
866 },
867 other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
868 }
869 .map(Page::Data)
870}
871
872fn array_to_page_nested(
873 array: &dyn Array,
874 type_: ParquetPrimitiveType,
875 nested: &[Nested],
876 options: WriteOptions,
877 _encoding: Encoding,
878) -> PolarsResult<Page> {
879 if type_.field_info.repetition == Repetition::Required
880 && array.validity().is_some_and(|v| v.unset_bits() > 0)
881 {
882 polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
883 }
884
885 use ArrowDataType::*;
886 match array.dtype().to_logical_type() {
887 Null => {
888 let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
889 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
890 },
891 Struct(fs) if fs.is_empty() => {
893 let array = BooleanArray::new(
894 ArrowDataType::Boolean,
895 Bitmap::new_zeroed(array.len()),
896 array.validity().cloned(),
897 );
898 boolean::nested_array_to_page(&array, options, type_, nested)
899 },
900 Boolean => {
901 let array = array.as_any().downcast_ref().unwrap();
902 boolean::nested_array_to_page(array, options, type_, nested)
903 },
904 LargeUtf8 => {
905 let array =
906 polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
907 let array = array.as_any().downcast_ref().unwrap();
908 binary::nested_array_to_page::<i64>(array, options, type_, nested)
909 },
910 LargeBinary => {
911 let array = array.as_any().downcast_ref().unwrap();
912 binary::nested_array_to_page::<i64>(array, options, type_, nested)
913 },
914 BinaryView => {
915 let array = array.as_any().downcast_ref().unwrap();
916 binview::nested_array_to_page(array, options, type_, nested)
917 },
918 Utf8View => {
919 let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
920 let array = array.as_any().downcast_ref().unwrap();
921 binview::nested_array_to_page(array, options, type_, nested)
922 },
923 UInt8 => {
924 let array = array.as_any().downcast_ref().unwrap();
925 primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
926 },
927 UInt16 => {
928 let array = array.as_any().downcast_ref().unwrap();
929 primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
930 },
931 UInt32 => {
932 let array = array.as_any().downcast_ref().unwrap();
933 primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
934 },
935 UInt64 => {
936 let array = array.as_any().downcast_ref().unwrap();
937 primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
938 },
939 Int8 => {
940 let array = array.as_any().downcast_ref().unwrap();
941 primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
942 },
943 Int16 => {
944 let array = array.as_any().downcast_ref().unwrap();
945 primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
946 },
947 Int32 | Date32 | Time32(_) => {
948 let array = array.as_any().downcast_ref().unwrap();
949 primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
950 },
951 Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
952 let array = array.as_any().downcast_ref().unwrap();
953 primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
954 },
955 Float32 => {
956 let array = array.as_any().downcast_ref().unwrap();
957 primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
958 },
959 Float64 => {
960 let array = array.as_any().downcast_ref().unwrap();
961 primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
962 },
963 Decimal(precision, _) => {
964 let precision = *precision;
965 let array = array
966 .as_any()
967 .downcast_ref::<PrimitiveArray<i128>>()
968 .unwrap();
969 if precision <= 9 {
970 let values = array
971 .values()
972 .iter()
973 .map(|x| *x as i32)
974 .collect::<Vec<_>>()
975 .into();
976
977 let array = PrimitiveArray::<i32>::new(
978 ArrowDataType::Int32,
979 values,
980 array.validity().cloned(),
981 );
982 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
983 } else if precision <= 18 {
984 let values = array
985 .values()
986 .iter()
987 .map(|x| *x as i64)
988 .collect::<Vec<_>>()
989 .into();
990
991 let array = PrimitiveArray::<i64>::new(
992 ArrowDataType::Int64,
993 values,
994 array.validity().cloned(),
995 );
996 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
997 } else {
998 let size = decimal_length_from_precision(precision);
999
1000 let statistics = if options.has_statistics() {
1001 let stats = fixed_size_binary::build_statistics_decimal(
1002 array,
1003 type_.clone(),
1004 size,
1005 &options.statistics,
1006 );
1007 Some(stats)
1008 } else {
1009 None
1010 };
1011
1012 let mut values = Vec::<u8>::with_capacity(size * array.len());
1013 array.values().iter().for_each(|x| {
1014 let bytes = &x.to_be_bytes()[16 - size..];
1015 values.extend_from_slice(bytes)
1016 });
1017 let array = FixedSizeBinaryArray::new(
1018 ArrowDataType::FixedSizeBinary(size),
1019 values.into(),
1020 array.validity().cloned(),
1021 );
1022 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1023 }
1024 },
1025 Decimal256(precision, _) => {
1026 let precision = *precision;
1027 let array = array
1028 .as_any()
1029 .downcast_ref::<PrimitiveArray<i256>>()
1030 .unwrap();
1031 if precision <= 9 {
1032 let values = array
1033 .values()
1034 .iter()
1035 .map(|x| x.0.as_i32())
1036 .collect::<Vec<_>>()
1037 .into();
1038
1039 let array = PrimitiveArray::<i32>::new(
1040 ArrowDataType::Int32,
1041 values,
1042 array.validity().cloned(),
1043 );
1044 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
1045 } else if precision <= 18 {
1046 let values = array
1047 .values()
1048 .iter()
1049 .map(|x| x.0.as_i64())
1050 .collect::<Vec<_>>()
1051 .into();
1052
1053 let array = PrimitiveArray::<i64>::new(
1054 ArrowDataType::Int64,
1055 values,
1056 array.validity().cloned(),
1057 );
1058 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
1059 } else if precision <= 38 {
1060 let size = decimal_length_from_precision(precision);
1061 let statistics = if options.has_statistics() {
1062 let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
1063 array,
1064 type_.clone(),
1065 size,
1066 &options.statistics,
1067 );
1068 Some(stats)
1069 } else {
1070 None
1071 };
1072
1073 let mut values = Vec::<u8>::with_capacity(size * array.len());
1074 array.values().iter().for_each(|x| {
1075 let bytes = &x.0.low().to_be_bytes()[16 - size..];
1076 values.extend_from_slice(bytes)
1077 });
1078 let array = FixedSizeBinaryArray::new(
1079 ArrowDataType::FixedSizeBinary(size),
1080 values.into(),
1081 array.validity().cloned(),
1082 );
1083 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1084 } else {
1085 let size = 32;
1086 let array = array
1087 .as_any()
1088 .downcast_ref::<PrimitiveArray<i256>>()
1089 .unwrap();
1090 let statistics = if options.has_statistics() {
1091 let stats = fixed_size_binary::build_statistics_decimal256(
1092 array,
1093 type_.clone(),
1094 size,
1095 &options.statistics,
1096 );
1097 Some(stats)
1098 } else {
1099 None
1100 };
1101 let mut values = Vec::<u8>::with_capacity(size * array.len());
1102 array.values().iter().for_each(|x| {
1103 let bytes = &x.to_be_bytes();
1104 values.extend_from_slice(bytes)
1105 });
1106 let array = FixedSizeBinaryArray::new(
1107 ArrowDataType::FixedSizeBinary(size),
1108 values.into(),
1109 array.validity().cloned(),
1110 );
1111
1112 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1113 }
1114 },
1115 Int128 => {
1116 let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
1117 let statistics = if options.has_statistics() {
1118 let stats = fixed_size_binary::build_statistics_decimal(
1119 array,
1120 type_.clone(),
1121 16,
1122 &options.statistics,
1123 );
1124 Some(stats)
1125 } else {
1126 None
1127 };
1128 let array = FixedSizeBinaryArray::new(
1129 ArrowDataType::FixedSizeBinary(16),
1130 array.values().clone().try_transmute().unwrap(),
1131 array.validity().cloned(),
1132 );
1133 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1134 },
1135 UInt128 => {
1136 let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();
1137 let statistics = if options.has_statistics() {
1138 let stats = fixed_size_binary::build_statistics_decimal(
1139 array,
1140 type_.clone(),
1141 16,
1142 &options.statistics,
1143 );
1144 Some(stats)
1145 } else {
1146 None
1147 };
1148 let array = FixedSizeBinaryArray::new(
1149 ArrowDataType::FixedSizeBinary(16),
1150 array.values().clone().try_transmute().unwrap(),
1151 array.validity().cloned(),
1152 );
1153 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1154 },
1155 other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
1156 }
1157 .map(Page::Data)
1158}
1159
1160fn transverse_recursive<T, F: Fn(&ArrowDataType) -> T + Clone>(
1161 dtype: &ArrowDataType,
1162 map: F,
1163 encodings: &mut Vec<T>,
1164) {
1165 use arrow::datatypes::PhysicalType::*;
1166 match dtype.to_physical_type() {
1167 Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
1168 | Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => encodings.push(map(dtype)),
1169 List | FixedSizeList | LargeList => {
1170 let a = dtype.to_logical_type();
1171 if let ArrowDataType::List(inner) = a {
1172 transverse_recursive(&inner.dtype, map, encodings)
1173 } else if let ArrowDataType::LargeList(inner) = a {
1174 transverse_recursive(&inner.dtype, map, encodings)
1175 } else if let ArrowDataType::FixedSizeList(inner, _) = a {
1176 transverse_recursive(&inner.dtype, map, encodings)
1177 } else {
1178 unreachable!()
1179 }
1180 },
1181 Struct => {
1182 if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {
1183 for field in fields {
1184 transverse_recursive(&field.dtype, map.clone(), encodings)
1185 }
1186 } else {
1187 unreachable!()
1188 }
1189 },
1190 Map => {
1191 if let ArrowDataType::Map(field, _) = dtype.to_logical_type() {
1192 if let ArrowDataType::Struct(fields) = field.dtype.to_logical_type() {
1193 for field in fields {
1194 transverse_recursive(&field.dtype, map.clone(), encodings)
1195 }
1196 } else {
1197 unreachable!()
1198 }
1199 } else {
1200 unreachable!()
1201 }
1202 },
1203 Union => todo!(),
1204 }
1205}
1206
1207pub fn transverse<T, F: Fn(&ArrowDataType) -> T + Clone>(dtype: &ArrowDataType, map: F) -> Vec<T> {
1212 let mut encodings = vec![];
1213 transverse_recursive(dtype, map, &mut encodings);
1214 encodings
1215}