1mod binary;
16mod binview;
17mod boolean;
18mod dictionary;
19mod file;
20mod fixed_size_binary;
21mod nested;
22mod pages;
23mod primitive;
24mod row_group;
25mod schema;
26mod utils;
27
28use arrow::array::*;
29use arrow::datatypes::*;
30use arrow::types::{NativeType, days_ms, i256};
31pub use nested::{num_values, write_rep_and_def};
32pub use pages::{to_leaves, to_nested, to_parquet_leaves};
33use polars_utils::pl_str::PlSmallStr;
34pub use utils::write_def_levels;
35
36pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
37pub use crate::parquet::encoding::Encoding;
38pub use crate::parquet::metadata::{
39 Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
40};
41pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
42use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
43pub use crate::parquet::schema::types::{
44 FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
45};
46pub use crate::parquet::write::{
47 Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
48 write_metadata_sidecar,
49};
50pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
51
/// Selects which column statistics are computed when writing Parquet pages.
///
/// Each field is an on/off switch for one statistic. See the `Default` impl below for
/// which ones are enabled out of the box.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct StatisticsOptions {
    /// Switch for the minimum-value statistic.
    pub min_value: bool,
    /// Switch for the maximum-value statistic.
    pub max_value: bool,
    /// Switch for the distinct-count statistic.
    pub distinct_count: bool,
    /// Switch for the null-count statistic.
    pub null_count: bool,
}

impl Default for StatisticsOptions {
    /// Enables min, max and null-count statistics; distinct count stays disabled.
    fn default() -> Self {
        // Distinct count is the only statistic that is off by default.
        Self {
            distinct_count: false,
            min_value: true,
            max_value: true,
            null_count: true,
        }
    }
}
72
/// Nullability mode used when encoding a column: `Required` or `Optional`.
///
/// Built from an `is_optional` flag via `EncodeNullability::new`.
#[derive(Clone, Copy)]
pub enum EncodeNullability {
    /// The column is encoded as non-nullable.
    Required,
    /// The column is encoded as nullable.
    Optional,
}
79
/// Options controlling how Arrow arrays are written as Parquet pages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WriteOptions {
    /// Which column statistics to compute.
    pub statistics: StatisticsOptions,
    /// Parquet writer `Version` forwarded to the page writers (NOTE(review): presumably the
    /// data page format version — confirm in `crate::parquet::write`).
    pub version: Version,
    /// Compression codec (and level) for the written pages.
    pub compression: CompressionOptions,
    /// Upper bound on a data page's size in bytes. `None` means `array_to_pages` falls back to
    /// 1 MiB. That function also caps any value at `2^31 - 2^25` bytes.
    pub data_page_size: Option<usize>,
}
92
93use arrow::compute::aggregate::estimated_bytes_size;
94use arrow::match_integer_type;
95pub use file::FileWriter;
96pub use pages::{Nested, array_to_columns, arrays_to_columns};
97use polars_error::{PolarsResult, polars_bail};
98pub use row_group::{RowGroupIterator, row_group_iter};
99pub use schema::to_parquet_type;
100
101use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
102use crate::write::dictionary::encode_as_dictionary_optional;
103
104impl StatisticsOptions {
105 pub fn empty() -> Self {
106 Self {
107 min_value: false,
108 max_value: false,
109 distinct_count: false,
110 null_count: false,
111 }
112 }
113
114 pub fn full() -> Self {
115 Self {
116 min_value: true,
117 max_value: true,
118 distinct_count: true,
119 null_count: true,
120 }
121 }
122
123 pub fn is_empty(&self) -> bool {
124 !(self.min_value || self.max_value || self.distinct_count || self.null_count)
125 }
126
127 pub fn is_full(&self) -> bool {
128 self.min_value && self.max_value && self.distinct_count && self.null_count
129 }
130}
131
132impl WriteOptions {
133 pub fn has_statistics(&self) -> bool {
134 !self.statistics.is_empty()
135 }
136}
137
138impl EncodeNullability {
139 const fn new(is_optional: bool) -> Self {
140 if is_optional {
141 Self::Optional
142 } else {
143 Self::Required
144 }
145 }
146
147 fn is_optional(self) -> bool {
148 matches!(self, Self::Optional)
149 }
150}
151
152pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
154 let mut out = (0, 0);
157 for nested in nested.iter().rev() {
158 match nested {
159 Nested::LargeList(l_nested) => {
160 let start = *l_nested.offsets.first();
161 let end = *l_nested.offsets.last();
162 return (start as usize, (end - start) as usize);
163 },
164 Nested::List(l_nested) => {
165 let start = *l_nested.offsets.first();
166 let end = *l_nested.offsets.last();
167 return (start as usize, (end - start) as usize);
168 },
169 Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
170 Nested::Primitive(nested) => out = (0, nested.length),
171 Nested::Struct(_) => {},
172 }
173 }
174 out
175}
176
/// Number of bytes of a signed (two's complement) fixed-size integer needed to hold any
/// decimal with `precision` digits.
///
/// This is the smallest `n` with `2^(8n - 1) >= 10^precision`, i.e.
/// `n = ceil((log2(10^precision) + 1) / 8)`. The extra `+ 1.0` inside the logarithm matches
/// the long-standing formulation and does not change the result.
fn decimal_length_from_precision(precision: usize) -> usize {
    let largest_magnitude = 10.0_f64.powi(precision as i32) + 1.0;
    // One extra bit for the sign.
    let bits_needed = largest_magnitude.log2() + 1.0;
    (bits_needed / 8.0).ceil() as usize
}
187
188pub fn to_parquet_schema(schema: &ArrowSchema) -> PolarsResult<SchemaDescriptor> {
190 let parquet_types = schema
191 .iter_values()
192 .map(to_parquet_type)
193 .collect::<PolarsResult<Vec<_>>>()?;
194 Ok(SchemaDescriptor::new(
195 PlSmallStr::from_static("root"),
196 parquet_types,
197 ))
198}
199
/// Slices `primitive_array` and every level of `nested` in place, so that together they
/// describe only `current_length` rows starting at `current_offset` of the first level.
///
/// Levels are visited in order (index 0 first, presumably the outermost level). After a list
/// level's offsets are sliced, the child range they reference becomes the offset/length for
/// the next level. A fixed-size-list level multiplies both by its `width`. `Struct` and
/// `Primitive` levels take the current offset/length unchanged, and the `Primitive` level
/// also slices `primitive_array` to that range.
pub fn slice_parquet_array(
    primitive_array: &mut dyn Array,
    nested: &mut [Nested],
    mut current_offset: usize,
    mut current_length: usize,
) {
    for nested in nested.iter_mut() {
        match nested {
            Nested::LargeList(l_nested) => {
                // `current_length` rows are delimited by `current_length + 1` offsets.
                l_nested.offsets.slice(current_offset, current_length + 1);
                if let Some(validity) = l_nested.validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };

                // Update the offset/length so that inner levels (and finally the primitive)
                // are sliced to the child range these offsets reference. `range()` is
                // presumably `last - first`; it must be read after slicing above.
                current_length = l_nested.offsets.range() as usize;
                current_offset = *l_nested.offsets.first() as usize;
            },
            Nested::List(l_nested) => {
                // Same as `LargeList`, with 32-bit offsets.
                l_nested.offsets.slice(current_offset, current_length + 1);
                if let Some(validity) = l_nested.validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };

                current_length = l_nested.offsets.range() as usize;
                current_offset = *l_nested.offsets.first() as usize;
            },
            Nested::Struct(StructNested {
                validity, length, ..
            }) => {
                // Structs do not change the row range of their children.
                *length = current_length;
                if let Some(validity) = validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };
            },
            Nested::Primitive(PrimitiveNested {
                validity, length, ..
            }) => {
                *length = current_length;
                if let Some(validity) = validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };
                // The leaf values themselves are sliced here.
                primitive_array.slice(current_offset, current_length);
            },
            Nested::FixedSizeList(FixedSizeListNested {
                validity,
                length,
                width,
                ..
            }) => {
                if let Some(validity) = validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };
                *length = current_length;
                // Each fixed-size list row spans exactly `width` child values.
                current_length *= *width;
                current_offset *= *width;
            },
        }
    }
}
263
264pub fn get_max_length(nested: &[Nested]) -> usize {
266 let mut length = 0;
267 for nested in nested.iter() {
268 match nested {
269 Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
270 Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
271 Nested::FixedSizeList(nested) => length += nested.length * nested.width,
272 _ => {},
273 }
274 }
275 length
276}
277
278pub fn array_to_pages(
280 primitive_array: &dyn Array,
281 type_: ParquetPrimitiveType,
282 nested: &[Nested],
283 options: WriteOptions,
284 mut encoding: Encoding,
285) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
286 if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_logical_type() {
287 return match_integer_type!(key_type, |$T| {
288 dictionary::array_to_pages::<$T>(
289 primitive_array.as_any().downcast_ref().unwrap(),
290 type_,
291 &nested,
292 options,
293 encoding,
294 )
295 });
296 };
297 if let Encoding::RleDictionary = encoding {
298 if matches!(nested.first(), Some(Nested::Primitive(_))) {
300 if let Some(result) =
301 encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
302 {
303 return result;
304 }
305 }
306
307 encoding = Encoding::Plain;
309 }
310
311 let nested = nested.to_vec();
312
313 let number_of_rows = nested[0].len();
314
315 let byte_size = estimated_bytes_size(primitive_array);
318
319 const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
320 let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
321 let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); let bytes_per_row = if number_of_rows == 0 {
323 0
324 } else {
325 ((byte_size as f64) / (number_of_rows as f64)) as usize
326 };
327 let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
328
329 let row_iter = (0..number_of_rows)
330 .step_by(rows_per_page)
331 .map(move |offset| {
332 let length = if offset + rows_per_page > number_of_rows {
333 number_of_rows - offset
334 } else {
335 rows_per_page
336 };
337 (offset, length)
338 });
339
340 let primitive_array = primitive_array.to_boxed();
341
342 let pages = row_iter.map(move |(offset, length)| {
343 let mut right_array = primitive_array.clone();
344 let mut right_nested = nested.clone();
345 slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
346
347 array_to_page(
348 right_array.as_ref(),
349 type_.clone(),
350 &right_nested,
351 options,
352 encoding,
353 )
354 });
355 Ok(DynIter::new(pages))
356}
357
358pub fn array_to_page(
360 array: &dyn Array,
361 type_: ParquetPrimitiveType,
362 nested: &[Nested],
363 options: WriteOptions,
364 encoding: Encoding,
365) -> PolarsResult<Page> {
366 if nested.len() == 1 {
367 return array_to_page_simple(array, type_, options, encoding);
369 }
370 array_to_page_nested(array, type_, nested, options, encoding)
371}
372
/// Encodes a non-nested (single-level) leaf array as one Parquet page.
///
/// Dispatches on the logical Arrow type of `array`:
/// - Booleans, integers and temporal types go to the `boolean`/`primitive` writers.
///   Unsigned and narrower integers are written with `i32`/`i64` physical types.
/// - `Float32`/`Float64` always use the plain writer; `encoding` is not forwarded there.
/// - `LargeUtf8`/`Utf8View` are cast to `LargeBinary`/`BinaryView` and written as binary.
/// - `Null` is written as an all-null `Int32` array.
/// - Year-month and day-time intervals are expanded to 12-byte fixed-size binary values.
/// - `Decimal`/`Decimal256` use `i32` for precision <= 9 and `i64` for precision <= 18.
///   Above that they use big-endian fixed-size binary, 32 bytes wide for `Decimal256` with
///   precision > 38.
///
/// # Errors
/// Returns an error for unsupported data types, or propagates errors from the page writers.
///
/// # Panics
/// Panics if `array` is not the concrete array type implied by its data type, or if a
/// Utf8-to-binary cast fails.
pub fn array_to_page_simple(
    array: &dyn Array,
    type_: ParquetPrimitiveType,
    options: WriteOptions,
    encoding: Encoding,
) -> PolarsResult<Page> {
    let dtype = array.dtype();

    // Arms using `return` exit with a `PolarsResult<Page>` directly; all other arms produce a
    // value that is wrapped by the trailing `.map(Page::Data)`.
    match dtype.to_logical_type() {
        ArrowDataType::Boolean => boolean::array_to_page(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        // Integers narrower than 32 bits (and u32) are written with the i32 physical type.
        ArrowDataType::UInt8 => {
            return primitive::array_to_page_integer::<u8, i32>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        ArrowDataType::UInt16 => {
            return primitive::array_to_page_integer::<u16, i32>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        ArrowDataType::UInt32 => {
            return primitive::array_to_page_integer::<u32, i32>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        ArrowDataType::UInt64 => {
            return primitive::array_to_page_integer::<u64, i64>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        ArrowDataType::Int8 => {
            return primitive::array_to_page_integer::<i8, i32>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        ArrowDataType::Int16 => {
            return primitive::array_to_page_integer::<i16, i32>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
            return primitive::array_to_page_integer::<i32, i32>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        ArrowDataType::Int64
        | ArrowDataType::Date64
        | ArrowDataType::Time64(_)
        | ArrowDataType::Timestamp(_, _)
        | ArrowDataType::Duration(_) => {
            return primitive::array_to_page_integer::<i64, i64>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        // Floats always use the plain writer; `encoding` is not passed.
        ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
        ),
        ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
        ),
        // Strings are written as binary after a cast.
        ArrowDataType::LargeUtf8 => {
            let array =
                polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
                    .unwrap();
            return binary::array_to_page::<i64>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        ArrowDataType::LargeBinary => {
            return binary::array_to_page::<i64>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        ArrowDataType::BinaryView => {
            return binview::array_to_page(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        ArrowDataType::Utf8View => {
            let array =
                polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
                    .unwrap();
            return binview::array_to_page(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            );
        },
        // A `Null` column is written as an all-null INT32 column of the same length.
        ArrowDataType::Null => {
            let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
            primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
        },
        // Each value becomes 12 bytes: the 4-byte little-endian month count, then 8 zero bytes.
        // NOTE(review): presumably matches Parquet's INTERVAL layout (months, days, millis).
        ArrowDataType::Interval(IntervalUnit::YearMonth) => {
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<i32>>()
                .unwrap();
            let mut values = Vec::<u8>::with_capacity(12 * array.len());
            array.values().iter().for_each(|x| {
                let bytes = &x.to_le_bytes();
                values.extend_from_slice(bytes);
                values.extend_from_slice(&[0; 8]);
            });
            let array = FixedSizeBinaryArray::new(
                ArrowDataType::FixedSizeBinary(12),
                values.into(),
                array.validity().cloned(),
            );
            let statistics = if options.has_statistics() {
                Some(fixed_size_binary::build_statistics(
                    &array,
                    type_.clone(),
                    &options.statistics,
                ))
            } else {
                None
            };
            fixed_size_binary::array_to_page(&array, options, type_, statistics)
        },
        // Each value becomes 12 bytes: 4 zero bytes (months), then the 8 little-endian bytes
        // of the `days_ms` value (NOTE(review): assumed to be days followed by milliseconds).
        ArrowDataType::Interval(IntervalUnit::DayTime) => {
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<days_ms>>()
                .unwrap();
            let mut values = Vec::<u8>::with_capacity(12 * array.len());
            array.values().iter().for_each(|x| {
                let bytes = &x.to_le_bytes();
                values.extend_from_slice(&[0; 4]); // months
                values.extend_from_slice(bytes); // days and milliseconds
            });
            let array = FixedSizeBinaryArray::new(
                ArrowDataType::FixedSizeBinary(12),
                values.into(),
                array.validity().cloned(),
            );
            let statistics = if options.has_statistics() {
                Some(fixed_size_binary::build_statistics(
                    &array,
                    type_.clone(),
                    &options.statistics,
                ))
            } else {
                None
            };
            fixed_size_binary::array_to_page(&array, options, type_, statistics)
        },
        ArrowDataType::FixedSizeBinary(_) => {
            let array = array.as_any().downcast_ref().unwrap();
            let statistics = if options.has_statistics() {
                Some(fixed_size_binary::build_statistics(
                    array,
                    type_.clone(),
                    &options.statistics,
                ))
            } else {
                None
            };

            fixed_size_binary::array_to_page(array, options, type_, statistics)
        },
        ArrowDataType::Decimal256(precision, _) => {
            let precision = *precision;
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<i256>>()
                .unwrap();
            if precision <= 9 {
                // Values fit in 32 bits: truncate and write as INT32.
                let values = array
                    .values()
                    .iter()
                    .map(|x| x.0.as_i32())
                    .collect::<Vec<_>>()
                    .into();

                let array = PrimitiveArray::<i32>::new(
                    ArrowDataType::Int32,
                    values,
                    array.validity().cloned(),
                );
                return primitive::array_to_page_integer::<i32, i32>(
                    &array, options, type_, encoding,
                );
            } else if precision <= 18 {
                // Values fit in 64 bits: truncate and write as INT64.
                let values = array
                    .values()
                    .iter()
                    .map(|x| x.0.as_i64())
                    .collect::<Vec<_>>()
                    .into();

                let array = PrimitiveArray::<i64>::new(
                    ArrowDataType::Int64,
                    values,
                    array.validity().cloned(),
                );
                return primitive::array_to_page_integer::<i64, i64>(
                    &array, options, type_, encoding,
                );
            } else if precision <= 38 {
                // Values fit in the low 128 bits: keep the trailing `size` big-endian bytes.
                let size = decimal_length_from_precision(precision);
                let statistics = if options.has_statistics() {
                    let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
                        array,
                        type_.clone(),
                        size,
                        &options.statistics,
                    );
                    Some(stats)
                } else {
                    None
                };

                let mut values = Vec::<u8>::with_capacity(size * array.len());
                array.values().iter().for_each(|x| {
                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
                    values.extend_from_slice(bytes)
                });
                let array = FixedSizeBinaryArray::new(
                    ArrowDataType::FixedSizeBinary(size),
                    values.into(),
                    array.validity().cloned(),
                );
                fixed_size_binary::array_to_page(&array, options, type_, statistics)
            } else {
                // Full 256-bit values: all 32 big-endian bytes.
                let size = 32;
                // NOTE(review): this re-downcast is redundant; `array` is already a
                // `PrimitiveArray<i256>` at this point.
                let array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<i256>>()
                    .unwrap();
                let statistics = if options.has_statistics() {
                    let stats = fixed_size_binary::build_statistics_decimal256(
                        array,
                        type_.clone(),
                        size,
                        &options.statistics,
                    );
                    Some(stats)
                } else {
                    None
                };
                let mut values = Vec::<u8>::with_capacity(size * array.len());
                array.values().iter().for_each(|x| {
                    let bytes = &x.to_be_bytes();
                    values.extend_from_slice(bytes)
                });
                let array = FixedSizeBinaryArray::new(
                    ArrowDataType::FixedSizeBinary(size),
                    values.into(),
                    array.validity().cloned(),
                );

                fixed_size_binary::array_to_page(&array, options, type_, statistics)
            }
        },
        ArrowDataType::Decimal(precision, _) => {
            let precision = *precision;
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<i128>>()
                .unwrap();
            if precision <= 9 {
                // Values fit in 32 bits: truncate and write as INT32.
                let values = array
                    .values()
                    .iter()
                    .map(|x| *x as i32)
                    .collect::<Vec<_>>()
                    .into();

                let array = PrimitiveArray::<i32>::new(
                    ArrowDataType::Int32,
                    values,
                    array.validity().cloned(),
                );
                return primitive::array_to_page_integer::<i32, i32>(
                    &array, options, type_, encoding,
                );
            } else if precision <= 18 {
                // Values fit in 64 bits: truncate and write as INT64.
                let values = array
                    .values()
                    .iter()
                    .map(|x| *x as i64)
                    .collect::<Vec<_>>()
                    .into();

                let array = PrimitiveArray::<i64>::new(
                    ArrowDataType::Int64,
                    values,
                    array.validity().cloned(),
                );
                return primitive::array_to_page_integer::<i64, i64>(
                    &array, options, type_, encoding,
                );
            } else {
                // Keep only the trailing `size` bytes of the big-endian i128 representation.
                let size = decimal_length_from_precision(precision);

                let statistics = if options.has_statistics() {
                    let stats = fixed_size_binary::build_statistics_decimal(
                        array,
                        type_.clone(),
                        size,
                        &options.statistics,
                    );
                    Some(stats)
                } else {
                    None
                };

                let mut values = Vec::<u8>::with_capacity(size * array.len());
                array.values().iter().for_each(|x| {
                    let bytes = &x.to_be_bytes()[16 - size..];
                    values.extend_from_slice(bytes)
                });
                let array = FixedSizeBinaryArray::new(
                    ArrowDataType::FixedSizeBinary(size),
                    values.into(),
                    array.validity().cloned(),
                );
                fixed_size_binary::array_to_page(&array, options, type_, statistics)
            }
        },
        other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
    }
    .map(Page::Data)
}
742
743fn array_to_page_nested(
744 array: &dyn Array,
745 type_: ParquetPrimitiveType,
746 nested: &[Nested],
747 options: WriteOptions,
748 _encoding: Encoding,
749) -> PolarsResult<Page> {
750 use ArrowDataType::*;
751 match array.dtype().to_logical_type() {
752 Null => {
753 let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
754 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
755 },
756 Boolean => {
757 let array = array.as_any().downcast_ref().unwrap();
758 boolean::nested_array_to_page(array, options, type_, nested)
759 },
760 LargeUtf8 => {
761 let array =
762 polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
763 let array = array.as_any().downcast_ref().unwrap();
764 binary::nested_array_to_page::<i64>(array, options, type_, nested)
765 },
766 LargeBinary => {
767 let array = array.as_any().downcast_ref().unwrap();
768 binary::nested_array_to_page::<i64>(array, options, type_, nested)
769 },
770 BinaryView => {
771 let array = array.as_any().downcast_ref().unwrap();
772 binview::nested_array_to_page(array, options, type_, nested)
773 },
774 Utf8View => {
775 let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
776 let array = array.as_any().downcast_ref().unwrap();
777 binview::nested_array_to_page(array, options, type_, nested)
778 },
779 UInt8 => {
780 let array = array.as_any().downcast_ref().unwrap();
781 primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
782 },
783 UInt16 => {
784 let array = array.as_any().downcast_ref().unwrap();
785 primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
786 },
787 UInt32 => {
788 let array = array.as_any().downcast_ref().unwrap();
789 primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
790 },
791 UInt64 => {
792 let array = array.as_any().downcast_ref().unwrap();
793 primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
794 },
795 Int8 => {
796 let array = array.as_any().downcast_ref().unwrap();
797 primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
798 },
799 Int16 => {
800 let array = array.as_any().downcast_ref().unwrap();
801 primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
802 },
803 Int32 | Date32 | Time32(_) => {
804 let array = array.as_any().downcast_ref().unwrap();
805 primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
806 },
807 Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
808 let array = array.as_any().downcast_ref().unwrap();
809 primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
810 },
811 Float32 => {
812 let array = array.as_any().downcast_ref().unwrap();
813 primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
814 },
815 Float64 => {
816 let array = array.as_any().downcast_ref().unwrap();
817 primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
818 },
819 Decimal(precision, _) => {
820 let precision = *precision;
821 let array = array
822 .as_any()
823 .downcast_ref::<PrimitiveArray<i128>>()
824 .unwrap();
825 if precision <= 9 {
826 let values = array
827 .values()
828 .iter()
829 .map(|x| *x as i32)
830 .collect::<Vec<_>>()
831 .into();
832
833 let array = PrimitiveArray::<i32>::new(
834 ArrowDataType::Int32,
835 values,
836 array.validity().cloned(),
837 );
838 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
839 } else if precision <= 18 {
840 let values = array
841 .values()
842 .iter()
843 .map(|x| *x as i64)
844 .collect::<Vec<_>>()
845 .into();
846
847 let array = PrimitiveArray::<i64>::new(
848 ArrowDataType::Int64,
849 values,
850 array.validity().cloned(),
851 );
852 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
853 } else {
854 let size = decimal_length_from_precision(precision);
855
856 let statistics = if options.has_statistics() {
857 let stats = fixed_size_binary::build_statistics_decimal(
858 array,
859 type_.clone(),
860 size,
861 &options.statistics,
862 );
863 Some(stats)
864 } else {
865 None
866 };
867
868 let mut values = Vec::<u8>::with_capacity(size * array.len());
869 array.values().iter().for_each(|x| {
870 let bytes = &x.to_be_bytes()[16 - size..];
871 values.extend_from_slice(bytes)
872 });
873 let array = FixedSizeBinaryArray::new(
874 ArrowDataType::FixedSizeBinary(size),
875 values.into(),
876 array.validity().cloned(),
877 );
878 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
879 }
880 },
881 Decimal256(precision, _) => {
882 let precision = *precision;
883 let array = array
884 .as_any()
885 .downcast_ref::<PrimitiveArray<i256>>()
886 .unwrap();
887 if precision <= 9 {
888 let values = array
889 .values()
890 .iter()
891 .map(|x| x.0.as_i32())
892 .collect::<Vec<_>>()
893 .into();
894
895 let array = PrimitiveArray::<i32>::new(
896 ArrowDataType::Int32,
897 values,
898 array.validity().cloned(),
899 );
900 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
901 } else if precision <= 18 {
902 let values = array
903 .values()
904 .iter()
905 .map(|x| x.0.as_i64())
906 .collect::<Vec<_>>()
907 .into();
908
909 let array = PrimitiveArray::<i64>::new(
910 ArrowDataType::Int64,
911 values,
912 array.validity().cloned(),
913 );
914 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
915 } else if precision <= 38 {
916 let size = decimal_length_from_precision(precision);
917 let statistics = if options.has_statistics() {
918 let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
919 array,
920 type_.clone(),
921 size,
922 &options.statistics,
923 );
924 Some(stats)
925 } else {
926 None
927 };
928
929 let mut values = Vec::<u8>::with_capacity(size * array.len());
930 array.values().iter().for_each(|x| {
931 let bytes = &x.0.low().to_be_bytes()[16 - size..];
932 values.extend_from_slice(bytes)
933 });
934 let array = FixedSizeBinaryArray::new(
935 ArrowDataType::FixedSizeBinary(size),
936 values.into(),
937 array.validity().cloned(),
938 );
939 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
940 } else {
941 let size = 32;
942 let array = array
943 .as_any()
944 .downcast_ref::<PrimitiveArray<i256>>()
945 .unwrap();
946 let statistics = if options.has_statistics() {
947 let stats = fixed_size_binary::build_statistics_decimal256(
948 array,
949 type_.clone(),
950 size,
951 &options.statistics,
952 );
953 Some(stats)
954 } else {
955 None
956 };
957 let mut values = Vec::<u8>::with_capacity(size * array.len());
958 array.values().iter().for_each(|x| {
959 let bytes = &x.to_be_bytes();
960 values.extend_from_slice(bytes)
961 });
962 let array = FixedSizeBinaryArray::new(
963 ArrowDataType::FixedSizeBinary(size),
964 values.into(),
965 array.validity().cloned(),
966 );
967
968 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
969 }
970 },
971 other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
972 }
973 .map(Page::Data)
974}
975
976fn transverse_recursive<T, F: Fn(&ArrowDataType) -> T + Clone>(
977 dtype: &ArrowDataType,
978 map: F,
979 encodings: &mut Vec<T>,
980) {
981 use arrow::datatypes::PhysicalType::*;
982 match dtype.to_physical_type() {
983 Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
984 | Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => encodings.push(map(dtype)),
985 List | FixedSizeList | LargeList => {
986 let a = dtype.to_logical_type();
987 if let ArrowDataType::List(inner) = a {
988 transverse_recursive(&inner.dtype, map, encodings)
989 } else if let ArrowDataType::LargeList(inner) = a {
990 transverse_recursive(&inner.dtype, map, encodings)
991 } else if let ArrowDataType::FixedSizeList(inner, _) = a {
992 transverse_recursive(&inner.dtype, map, encodings)
993 } else {
994 unreachable!()
995 }
996 },
997 Struct => {
998 if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {
999 for field in fields {
1000 transverse_recursive(&field.dtype, map.clone(), encodings)
1001 }
1002 } else {
1003 unreachable!()
1004 }
1005 },
1006 Map => {
1007 if let ArrowDataType::Map(field, _) = dtype.to_logical_type() {
1008 if let ArrowDataType::Struct(fields) = field.dtype.to_logical_type() {
1009 for field in fields {
1010 transverse_recursive(&field.dtype, map.clone(), encodings)
1011 }
1012 } else {
1013 unreachable!()
1014 }
1015 } else {
1016 unreachable!()
1017 }
1018 },
1019 Union => todo!(),
1020 }
1021}
1022
1023pub fn transverse<T, F: Fn(&ArrowDataType) -> T + Clone>(dtype: &ArrowDataType, map: F) -> Vec<T> {
1028 let mut encodings = vec![];
1029 transverse_recursive(dtype, map, &mut encodings);
1030 encodings
1031}