1mod binary;
16mod binview;
17mod boolean;
18mod dictionary;
19mod file;
20mod fixed_size_binary;
21mod nested;
22mod pages;
23mod primitive;
24mod row_group;
25mod schema;
26mod utils;
27
28use arrow::array::*;
29use arrow::bitmap::Bitmap;
30use arrow::datatypes::*;
31use arrow::types::{NativeType, days_ms, i256};
32pub use nested::{num_values, write_rep_and_def};
33pub use pages::{to_leaves, to_nested, to_parquet_leaves};
34use polars_utils::float16::pf16;
35use polars_utils::pl_str::PlSmallStr;
36pub use utils::write_def_levels;
37
38pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
39pub use crate::parquet::encoding::Encoding;
40pub use crate::parquet::metadata::{
41 Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
42};
43pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
44use crate::parquet::schema::Repetition;
45use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
46pub use crate::parquet::schema::types::{
47 FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
48};
49pub use crate::parquet::write::{
50 Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
51 write_metadata_sidecar,
52};
53pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
54use crate::write::fixed_size_binary::build_statistics_float16;
55
/// Selects which statistics the parquet writer computes and stores for each column.
///
/// Use [`StatisticsOptions::empty`] / [`StatisticsOptions::full`] for the two extremes;
/// `Default` enables min, max and null count but not the distinct count.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct StatisticsOptions {
    /// Write the minimum value.
    pub min_value: bool,
    /// Write the maximum value.
    pub max_value: bool,
    /// Write the number of distinct values.
    pub distinct_count: bool,
    /// Write the number of null values.
    pub null_count: bool,
}
66
67impl Default for StatisticsOptions {
68 fn default() -> Self {
69 Self {
70 min_value: true,
71 max_value: true,
72 distinct_count: false,
73 null_count: true,
74 }
75 }
76}
77
/// Whether a column is encoded as required (no nulls allowed) or optional (nullable).
///
/// `Debug`, `PartialEq` and `Eq` are derived so the type can be inspected and compared like
/// the other option types in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeNullability {
    /// The column cannot contain nulls.
    Required,
    /// The column may contain nulls.
    Optional,
}
84
/// Options that control how arrays are written into parquet pages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WriteOptions {
    /// Which statistics to compute and write.
    pub statistics: StatisticsOptions,
    /// The parquet write `Version` (presumably the data-page format version — confirm).
    pub version: Version,
    /// Compression applied to pages (codec and, where supported, level).
    pub compression: CompressionOptions,
    /// Approximate per-page byte budget used to split rows into data pages; when `None`,
    /// `row_slice_ranges` falls back to 1 MiB.
    pub data_page_size: Option<usize>,
}
97
98use arrow::compute::aggregate::estimated_bytes_size;
99use arrow::match_integer_type;
100pub use file::FileWriter;
101pub use pages::{Nested, array_to_columns, arrays_to_columns};
102use polars_error::{PolarsResult, polars_bail};
103pub use row_group::{RowGroupIterator, row_group_iter};
104pub use schema::{schema_to_metadata_key, to_parquet_type};
105
106use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
107use crate::write::dictionary::encode_as_dictionary_optional;
108
109impl StatisticsOptions {
110 pub fn empty() -> Self {
111 Self {
112 min_value: false,
113 max_value: false,
114 distinct_count: false,
115 null_count: false,
116 }
117 }
118
119 pub fn full() -> Self {
120 Self {
121 min_value: true,
122 max_value: true,
123 distinct_count: true,
124 null_count: true,
125 }
126 }
127
128 pub fn is_empty(&self) -> bool {
129 !(self.min_value || self.max_value || self.distinct_count || self.null_count)
130 }
131
132 pub fn is_full(&self) -> bool {
133 self.min_value && self.max_value && self.distinct_count && self.null_count
134 }
135}
136
137impl WriteOptions {
138 pub fn has_statistics(&self) -> bool {
139 !self.statistics.is_empty()
140 }
141}
142
143impl EncodeNullability {
144 const fn new(is_optional: bool) -> Self {
145 if is_optional {
146 Self::Optional
147 } else {
148 Self::Required
149 }
150 }
151
152 fn is_optional(self) -> bool {
153 matches!(self, Self::Optional)
154 }
155}
156
157pub(crate) fn row_slice_ranges(
161 number_of_rows: usize,
162 byte_size: usize,
163 options: WriteOptions,
164) -> impl Iterator<Item = (usize, usize)> {
165 const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
167 let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); let bytes_per_row = if number_of_rows == 0 {
170 0
171 } else {
172 ((byte_size as f64) / (number_of_rows as f64)) as usize
173 };
174 let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
175
176 (0..number_of_rows)
177 .step_by(rows_per_page)
178 .map(move |offset| {
179 let length = (offset + rows_per_page).min(number_of_rows) - offset;
180 (offset, length)
181 })
182}
183
184pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
186 let mut out = (0, 0);
189 for nested in nested.iter().rev() {
190 match nested {
191 Nested::LargeList(l_nested) => {
192 let start = *l_nested.offsets.first();
193 let end = *l_nested.offsets.last();
194 return (start as usize, (end - start) as usize);
195 },
196 Nested::List(l_nested) => {
197 let start = *l_nested.offsets.first();
198 let end = *l_nested.offsets.last();
199 return (start as usize, (end - start) as usize);
200 },
201 Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
202 Nested::Primitive(nested) => out = (0, nested.length),
203 Nested::Struct(_) => {},
204 }
205 }
206 out
207}
208
/// Smallest number of bytes `n` whose signed two's-complement range holds every decimal
/// with `precision` digits.
///
/// An `n`-byte signed integer reaches `2^(8n - 1) - 1`, so we need
/// `2^(8n - 1) >= 10^precision + 1`, i.e. `n = ceil((log2(10^precision + 1) + 1) / 8)`.
fn decimal_length_from_precision(precision: usize) -> usize {
    let largest_magnitude = 10.0_f64.powi(precision as i32) + 1.0;
    let required_bits = largest_magnitude.log2() + 1.0;
    (required_bits / 8.0).ceil() as usize
}
219
220pub fn to_parquet_schema(schema: &ArrowSchema) -> PolarsResult<SchemaDescriptor> {
222 let parquet_types = schema
223 .iter_values()
224 .map(to_parquet_type)
225 .collect::<PolarsResult<Vec<_>>>()?;
226 Ok(SchemaDescriptor::new(
227 PlSmallStr::from_static("root"),
228 parquet_types,
229 ))
230}
231
/// Slices `primitive_array` and every level of `nested` in place, so that together they
/// describe only `current_length` outer rows starting at `current_offset`.
///
/// Levels are walked in order, from outermost to innermost, carrying the row range down:
/// - list levels slice their offsets and validity, then replace the range with the span of
///   child elements those offsets cover;
/// - fixed-size-list levels scale the range by their `width`;
/// - struct and primitive levels take the current range unchanged, and the primitive level
///   also slices `primitive_array`.
pub fn slice_parquet_array(
    primitive_array: &mut dyn Array,
    nested: &mut [Nested],
    mut current_offset: usize,
    mut current_length: usize,
) {
    for nested in nested.iter_mut() {
        match nested {
            Nested::LargeList(l_nested) => {
                // `current_length` lists need `current_length + 1` offsets.
                l_nested.offsets.slice(current_offset, current_length + 1);
                if let Some(validity) = l_nested.validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };

                // Update the range so the inner levels (and finally the primitive) are sliced
                // to the child elements covered by the sliced offsets. This must happen after
                // the offsets slice above.
                current_length = l_nested.offsets.range() as usize;
                current_offset = *l_nested.offsets.first() as usize;
            },
            Nested::List(l_nested) => {
                // Same as `LargeList`, with 32-bit offsets.
                l_nested.offsets.slice(current_offset, current_length + 1);
                if let Some(validity) = l_nested.validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };

                current_length = l_nested.offsets.range() as usize;
                current_offset = *l_nested.offsets.first() as usize;
            },
            Nested::Struct(StructNested {
                validity, length, ..
            }) => {
                // Structs do not change the row range seen by their children.
                *length = current_length;
                if let Some(validity) = validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };
            },
            Nested::Primitive(PrimitiveNested {
                validity, length, ..
            }) => {
                *length = current_length;
                if let Some(validity) = validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };
                // The leaf values are sliced with the range accumulated through all levels.
                primitive_array.slice(current_offset, current_length);
            },
            Nested::FixedSizeList(FixedSizeListNested {
                validity,
                length,
                width,
                ..
            }) => {
                if let Some(validity) = validity.as_mut() {
                    validity.slice(current_offset, current_length)
                };
                *length = current_length;
                // Every outer row spans exactly `width` child elements.
                current_length *= *width;
                current_offset *= *width;
            },
        }
    }
}
295
296pub fn get_max_length(nested: &[Nested]) -> usize {
298 let mut length = 0;
299 for nested in nested.iter() {
300 match nested {
301 Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
302 Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
303 Nested::FixedSizeList(nested) => length += nested.length * nested.width,
304 _ => {},
305 }
306 }
307 length
308}
309
310pub fn array_to_pages(
312 primitive_array: &dyn Array,
313 type_: ParquetPrimitiveType,
314 nested: &[Nested],
315 options: WriteOptions,
316 mut encoding: Encoding,
317) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
318 if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_storage() {
319 return match_integer_type!(key_type, |$T| {
320 dictionary::array_to_pages::<$T>(
321 primitive_array.as_any().downcast_ref().unwrap(),
322 type_,
323 &nested,
324 options,
325 encoding,
326 )
327 });
328 };
329 if let Encoding::RleDictionary = encoding {
330 if matches!(nested.first(), Some(Nested::Primitive(_))) {
332 if let Some(result) =
333 encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
334 {
335 return result;
336 }
337 }
338
339 encoding = Encoding::Plain;
341 }
342
343 let nested = nested.to_vec();
344 let number_of_rows = nested[0].len();
345 let byte_size = estimated_bytes_size(primitive_array);
348 let primitive_array = primitive_array.to_boxed();
349
350 let pages =
351 row_slice_ranges(number_of_rows, byte_size, options).map(move |(offset, length)| {
352 let mut right_array = primitive_array.clone();
353 let mut right_nested = nested.clone();
354 slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
355
356 array_to_page(
357 right_array.as_ref(),
358 type_.clone(),
359 &right_nested,
360 options,
361 encoding,
362 )
363 });
364 Ok(DynIter::new(pages))
365}
366
367pub fn array_to_page(
369 array: &dyn Array,
370 type_: ParquetPrimitiveType,
371 nested: &[Nested],
372 options: WriteOptions,
373 encoding: Encoding,
374) -> PolarsResult<Page> {
375 if nested.len() == 1 {
376 return array_to_page_simple(array, type_, options, encoding);
378 }
379 array_to_page_nested(array, type_, nested, options, encoding)
380}
381
382pub fn array_to_page_simple(
384 array: &dyn Array,
385 type_: ParquetPrimitiveType,
386 options: WriteOptions,
387 encoding: Encoding,
388) -> PolarsResult<Page> {
389 let dtype = array.dtype();
390
391 if type_.field_info.repetition == Repetition::Required && array.null_count() > 0 {
392 polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
393 }
394
395 match dtype {
396 ArrowDataType::Struct(fs) if fs.is_empty() => boolean::array_to_page(
398 &BooleanArray::new(
399 ArrowDataType::Boolean,
400 Bitmap::new_zeroed(array.len()),
401 array.validity().cloned(),
402 ),
403 options,
404 type_,
405 encoding,
406 ),
407
408 ArrowDataType::Boolean => boolean::array_to_page(
409 array.as_any().downcast_ref().unwrap(),
410 options,
411 type_,
412 encoding,
413 ),
414 ArrowDataType::UInt8 => {
416 return primitive::array_to_page_integer::<u8, i32>(
417 array.as_any().downcast_ref().unwrap(),
418 options,
419 type_,
420 encoding,
421 );
422 },
423 ArrowDataType::UInt16 => {
424 return primitive::array_to_page_integer::<u16, i32>(
425 array.as_any().downcast_ref().unwrap(),
426 options,
427 type_,
428 encoding,
429 );
430 },
431 ArrowDataType::UInt32 => {
432 return primitive::array_to_page_integer::<u32, i32>(
433 array.as_any().downcast_ref().unwrap(),
434 options,
435 type_,
436 encoding,
437 );
438 },
439 ArrowDataType::UInt64 => {
440 return primitive::array_to_page_integer::<u64, i64>(
441 array.as_any().downcast_ref().unwrap(),
442 options,
443 type_,
444 encoding,
445 );
446 },
447 ArrowDataType::Int8 => {
448 return primitive::array_to_page_integer::<i8, i32>(
449 array.as_any().downcast_ref().unwrap(),
450 options,
451 type_,
452 encoding,
453 );
454 },
455 ArrowDataType::Int16 => {
456 return primitive::array_to_page_integer::<i16, i32>(
457 array.as_any().downcast_ref().unwrap(),
458 options,
459 type_,
460 encoding,
461 );
462 },
463 ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
464 return primitive::array_to_page_integer::<i32, i32>(
465 array.as_any().downcast_ref().unwrap(),
466 options,
467 type_,
468 encoding,
469 );
470 },
471 ArrowDataType::Int64
472 | ArrowDataType::Date64
473 | ArrowDataType::Time64(_)
474 | ArrowDataType::Timestamp(_, _)
475 | ArrowDataType::Duration(_) => {
476 return primitive::array_to_page_integer::<i64, i64>(
477 array.as_any().downcast_ref().unwrap(),
478 options,
479 type_,
480 encoding,
481 );
482 },
483 ArrowDataType::Float16 => {
484 let array: &PrimitiveArray<pf16> = array.as_any().downcast_ref().unwrap();
485 let statistics = options
486 .has_statistics()
487 .then(|| build_statistics_float16(array, type_.clone(), &options.statistics));
488 let array = FixedSizeBinaryArray::new(
489 ArrowDataType::FixedSizeBinary(2),
490 array.values().clone().try_transmute().unwrap(),
491 array.validity().cloned(),
492 );
493 fixed_size_binary::array_to_page(&array, options, type_, statistics)
494 },
495 ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
496 array.as_any().downcast_ref().unwrap(),
497 options,
498 type_,
499 ),
500 ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
501 array.as_any().downcast_ref().unwrap(),
502 options,
503 type_,
504 ),
505 ArrowDataType::LargeUtf8 => {
506 let array =
507 polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
508 .unwrap();
509 return binary::array_to_page::<i64>(
510 array.as_any().downcast_ref().unwrap(),
511 options,
512 type_,
513 encoding,
514 );
515 },
516 ArrowDataType::LargeBinary => {
517 return binary::array_to_page::<i64>(
518 array.as_any().downcast_ref().unwrap(),
519 options,
520 type_,
521 encoding,
522 );
523 },
524 ArrowDataType::BinaryView => {
525 return binview::array_to_page(
526 array.as_any().downcast_ref().unwrap(),
527 options,
528 type_,
529 encoding,
530 );
531 },
532 ArrowDataType::Utf8View => {
533 let array =
534 polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
535 .unwrap();
536 return binview::array_to_page(
537 array.as_any().downcast_ref().unwrap(),
538 options,
539 type_,
540 encoding,
541 );
542 },
543 ArrowDataType::Null => {
544 let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
545 primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
546 },
547 ArrowDataType::Interval(IntervalUnit::YearMonth) => {
548 let array = array
549 .as_any()
550 .downcast_ref::<PrimitiveArray<i32>>()
551 .unwrap();
552 let mut values = Vec::<u8>::with_capacity(12 * array.len());
553 array.values().iter().for_each(|x| {
554 let bytes = &x.to_le_bytes();
555 values.extend_from_slice(bytes);
556 values.extend_from_slice(&[0; 8]);
557 });
558 let array = FixedSizeBinaryArray::new(
559 ArrowDataType::FixedSizeBinary(12),
560 values.into(),
561 array.validity().cloned(),
562 );
563 let statistics = if options.has_statistics() {
564 Some(fixed_size_binary::build_statistics(
565 &array,
566 type_.clone(),
567 &options.statistics,
568 ))
569 } else {
570 None
571 };
572 fixed_size_binary::array_to_page(&array, options, type_, statistics)
573 },
574 ArrowDataType::Interval(IntervalUnit::DayTime) => {
575 let array = array
576 .as_any()
577 .downcast_ref::<PrimitiveArray<days_ms>>()
578 .unwrap();
579 let mut values = Vec::<u8>::with_capacity(12 * array.len());
580 array.values().iter().for_each(|x| {
581 let bytes = &x.to_le_bytes();
582 values.extend_from_slice(&[0; 4]); values.extend_from_slice(bytes); });
585 let array = FixedSizeBinaryArray::new(
586 ArrowDataType::FixedSizeBinary(12),
587 values.into(),
588 array.validity().cloned(),
589 );
590 let statistics = if options.has_statistics() {
591 Some(fixed_size_binary::build_statistics(
592 &array,
593 type_.clone(),
594 &options.statistics,
595 ))
596 } else {
597 None
598 };
599 fixed_size_binary::array_to_page(&array, options, type_, statistics)
600 },
601 ArrowDataType::FixedSizeBinary(_) => {
602 let array = array.as_any().downcast_ref().unwrap();
603 let statistics = if options.has_statistics() {
604 Some(fixed_size_binary::build_statistics(
605 array,
606 type_.clone(),
607 &options.statistics,
608 ))
609 } else {
610 None
611 };
612
613 fixed_size_binary::array_to_page(array, options, type_, statistics)
614 },
615 ArrowDataType::Decimal256(precision, _) => {
616 let precision = *precision;
617 let array = array
618 .as_any()
619 .downcast_ref::<PrimitiveArray<i256>>()
620 .unwrap();
621 if precision <= 9 {
622 let values = array
623 .values()
624 .iter()
625 .map(|x| x.0.as_i32())
626 .collect::<Vec<_>>()
627 .into();
628
629 let array = PrimitiveArray::<i32>::new(
630 ArrowDataType::Int32,
631 values,
632 array.validity().cloned(),
633 );
634 return primitive::array_to_page_integer::<i32, i32>(
635 &array, options, type_, encoding,
636 );
637 } else if precision <= 18 {
638 let values = array
639 .values()
640 .iter()
641 .map(|x| x.0.as_i64())
642 .collect::<Vec<_>>()
643 .into();
644
645 let array = PrimitiveArray::<i64>::new(
646 ArrowDataType::Int64,
647 values,
648 array.validity().cloned(),
649 );
650 return primitive::array_to_page_integer::<i64, i64>(
651 &array, options, type_, encoding,
652 );
653 } else if precision <= 38 {
654 let size = decimal_length_from_precision(precision);
655 let statistics = if options.has_statistics() {
656 let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
657 array,
658 type_.clone(),
659 size,
660 &options.statistics,
661 );
662 Some(stats)
663 } else {
664 None
665 };
666
667 let mut values = Vec::<u8>::with_capacity(size * array.len());
668 array.values().iter().for_each(|x| {
669 let bytes = &x.0.low().to_be_bytes()[16 - size..];
670 values.extend_from_slice(bytes)
671 });
672 let array = FixedSizeBinaryArray::new(
673 ArrowDataType::FixedSizeBinary(size),
674 values.into(),
675 array.validity().cloned(),
676 );
677 fixed_size_binary::array_to_page(&array, options, type_, statistics)
678 } else {
679 let size = 32;
680 let array = array
681 .as_any()
682 .downcast_ref::<PrimitiveArray<i256>>()
683 .unwrap();
684 let statistics = if options.has_statistics() {
685 let stats = fixed_size_binary::build_statistics_decimal256(
686 array,
687 type_.clone(),
688 size,
689 &options.statistics,
690 );
691 Some(stats)
692 } else {
693 None
694 };
695 let mut values = Vec::<u8>::with_capacity(size * array.len());
696 array.values().iter().for_each(|x| {
697 let bytes = &x.to_be_bytes();
698 values.extend_from_slice(bytes)
699 });
700 let array = FixedSizeBinaryArray::new(
701 ArrowDataType::FixedSizeBinary(size),
702 values.into(),
703 array.validity().cloned(),
704 );
705
706 fixed_size_binary::array_to_page(&array, options, type_, statistics)
707 }
708 },
709 ArrowDataType::Decimal(precision, _) => {
710 let precision = *precision;
711 let array = array
712 .as_any()
713 .downcast_ref::<PrimitiveArray<i128>>()
714 .unwrap();
715 if precision <= 9 {
716 let values = array
717 .values()
718 .iter()
719 .map(|x| *x as i32)
720 .collect::<Vec<_>>()
721 .into();
722
723 let array = PrimitiveArray::<i32>::new(
724 ArrowDataType::Int32,
725 values,
726 array.validity().cloned(),
727 );
728 return primitive::array_to_page_integer::<i32, i32>(
729 &array, options, type_, encoding,
730 );
731 } else if precision <= 18 {
732 let values = array
733 .values()
734 .iter()
735 .map(|x| *x as i64)
736 .collect::<Vec<_>>()
737 .into();
738
739 let array = PrimitiveArray::<i64>::new(
740 ArrowDataType::Int64,
741 values,
742 array.validity().cloned(),
743 );
744 return primitive::array_to_page_integer::<i64, i64>(
745 &array, options, type_, encoding,
746 );
747 } else {
748 let size = decimal_length_from_precision(precision);
749
750 let statistics = if options.has_statistics() {
751 let stats = fixed_size_binary::build_statistics_decimal(
752 array,
753 type_.clone(),
754 size,
755 &options.statistics,
756 );
757 Some(stats)
758 } else {
759 None
760 };
761
762 let mut values = Vec::<u8>::with_capacity(size * array.len());
763 array.values().iter().for_each(|x| {
764 let bytes = &x.to_be_bytes()[16 - size..];
765 values.extend_from_slice(bytes)
766 });
767 let array = FixedSizeBinaryArray::new(
768 ArrowDataType::FixedSizeBinary(size),
769 values.into(),
770 array.validity().cloned(),
771 );
772 fixed_size_binary::array_to_page(&array, options, type_, statistics)
773 }
774 },
775 ArrowDataType::UInt128 => {
776 let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();
777 let statistics = if options.has_statistics() {
778 let stats = fixed_size_binary::build_statistics_decimal(
779 array,
780 type_.clone(),
781 16,
782 &options.statistics,
783 );
784 Some(stats)
785 } else {
786 None
787 };
788 let array = FixedSizeBinaryArray::new(
789 ArrowDataType::FixedSizeBinary(16),
790 array.values().clone().try_transmute().unwrap(),
791 array.validity().cloned(),
792 );
793 fixed_size_binary::array_to_page(&array, options, type_, statistics)
794 },
795 ArrowDataType::Int128 => {
796 let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
797 let statistics = if options.has_statistics() {
798 let stats = fixed_size_binary::build_statistics_decimal(
799 array,
800 type_.clone(),
801 16,
802 &options.statistics,
803 );
804 Some(stats)
805 } else {
806 None
807 };
808 let array = FixedSizeBinaryArray::new(
809 ArrowDataType::FixedSizeBinary(16),
810 array.values().clone().try_transmute().unwrap(),
811 array.validity().cloned(),
812 );
813 fixed_size_binary::array_to_page(&array, options, type_, statistics)
814 },
815 ArrowDataType::Extension(ext) => {
816 let mut boxed = array.to_boxed();
817 assert!(matches!(boxed.dtype(), ArrowDataType::Extension(ext2) if ext2 == ext));
818 *boxed.dtype_mut() = ext.inner.clone();
819 return array_to_page_simple(boxed.as_ref(), type_, options, encoding);
820 },
821 other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
822 }
823 .map(Page::Data)
824}
825
826fn array_to_page_nested(
827 array: &dyn Array,
828 type_: ParquetPrimitiveType,
829 nested: &[Nested],
830 options: WriteOptions,
831 _encoding: Encoding,
832) -> PolarsResult<Page> {
833 if type_.field_info.repetition == Repetition::Required
834 && array.validity().is_some_and(|v| v.unset_bits() > 0)
835 {
836 polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
837 }
838
839 use ArrowDataType::*;
840 match array.dtype().to_storage() {
841 Null => {
842 let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
843 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
844 },
845 Struct(fs) if fs.is_empty() => {
847 let array = BooleanArray::new(
848 ArrowDataType::Boolean,
849 Bitmap::new_zeroed(array.len()),
850 array.validity().cloned(),
851 );
852 boolean::nested_array_to_page(&array, options, type_, nested)
853 },
854 Boolean => {
855 let array = array.as_any().downcast_ref().unwrap();
856 boolean::nested_array_to_page(array, options, type_, nested)
857 },
858 LargeUtf8 => {
859 let array =
860 polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
861 let array = array.as_any().downcast_ref().unwrap();
862 binary::nested_array_to_page::<i64>(array, options, type_, nested)
863 },
864 LargeBinary => {
865 let array = array.as_any().downcast_ref().unwrap();
866 binary::nested_array_to_page::<i64>(array, options, type_, nested)
867 },
868 BinaryView => {
869 let array = array.as_any().downcast_ref().unwrap();
870 binview::nested_array_to_page(array, options, type_, nested)
871 },
872 Utf8View => {
873 let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
874 let array = array.as_any().downcast_ref().unwrap();
875 binview::nested_array_to_page(array, options, type_, nested)
876 },
877 UInt8 => {
878 let array = array.as_any().downcast_ref().unwrap();
879 primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
880 },
881 UInt16 => {
882 let array = array.as_any().downcast_ref().unwrap();
883 primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
884 },
885 UInt32 => {
886 let array = array.as_any().downcast_ref().unwrap();
887 primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
888 },
889 UInt64 => {
890 let array = array.as_any().downcast_ref().unwrap();
891 primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
892 },
893 Int8 => {
894 let array = array.as_any().downcast_ref().unwrap();
895 primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
896 },
897 Int16 => {
898 let array = array.as_any().downcast_ref().unwrap();
899 primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
900 },
901 Int32 | Date32 | Time32(_) => {
902 let array = array.as_any().downcast_ref().unwrap();
903 primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
904 },
905 Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
906 let array = array.as_any().downcast_ref().unwrap();
907 primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
908 },
909 Float16 => {
910 let array: &PrimitiveArray<pf16> = array.as_any().downcast_ref().unwrap();
911 let statistics = options
912 .has_statistics()
913 .then(|| build_statistics_float16(array, type_.clone(), &options.statistics));
914 let array = FixedSizeBinaryArray::new(
915 ArrowDataType::FixedSizeBinary(2),
916 array.values().clone().try_transmute().unwrap(),
917 array.validity().cloned(),
918 );
919 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
920 },
921 Float32 => {
922 let array = array.as_any().downcast_ref().unwrap();
923 primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
924 },
925 Float64 => {
926 let array = array.as_any().downcast_ref().unwrap();
927 primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
928 },
929 Decimal(precision, _) => {
930 let precision = *precision;
931 let array = array
932 .as_any()
933 .downcast_ref::<PrimitiveArray<i128>>()
934 .unwrap();
935 if precision <= 9 {
936 let values = array
937 .values()
938 .iter()
939 .map(|x| *x as i32)
940 .collect::<Vec<_>>()
941 .into();
942
943 let array = PrimitiveArray::<i32>::new(
944 ArrowDataType::Int32,
945 values,
946 array.validity().cloned(),
947 );
948 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
949 } else if precision <= 18 {
950 let values = array
951 .values()
952 .iter()
953 .map(|x| *x as i64)
954 .collect::<Vec<_>>()
955 .into();
956
957 let array = PrimitiveArray::<i64>::new(
958 ArrowDataType::Int64,
959 values,
960 array.validity().cloned(),
961 );
962 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
963 } else {
964 let size = decimal_length_from_precision(precision);
965
966 let statistics = if options.has_statistics() {
967 let stats = fixed_size_binary::build_statistics_decimal(
968 array,
969 type_.clone(),
970 size,
971 &options.statistics,
972 );
973 Some(stats)
974 } else {
975 None
976 };
977
978 let mut values = Vec::<u8>::with_capacity(size * array.len());
979 array.values().iter().for_each(|x| {
980 let bytes = &x.to_be_bytes()[16 - size..];
981 values.extend_from_slice(bytes)
982 });
983 let array = FixedSizeBinaryArray::new(
984 ArrowDataType::FixedSizeBinary(size),
985 values.into(),
986 array.validity().cloned(),
987 );
988 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
989 }
990 },
991 Decimal256(precision, _) => {
992 let precision = *precision;
993 let array = array
994 .as_any()
995 .downcast_ref::<PrimitiveArray<i256>>()
996 .unwrap();
997 if precision <= 9 {
998 let values = array
999 .values()
1000 .iter()
1001 .map(|x| x.0.as_i32())
1002 .collect::<Vec<_>>()
1003 .into();
1004
1005 let array = PrimitiveArray::<i32>::new(
1006 ArrowDataType::Int32,
1007 values,
1008 array.validity().cloned(),
1009 );
1010 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
1011 } else if precision <= 18 {
1012 let values = array
1013 .values()
1014 .iter()
1015 .map(|x| x.0.as_i64())
1016 .collect::<Vec<_>>()
1017 .into();
1018
1019 let array = PrimitiveArray::<i64>::new(
1020 ArrowDataType::Int64,
1021 values,
1022 array.validity().cloned(),
1023 );
1024 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
1025 } else if precision <= 38 {
1026 let size = decimal_length_from_precision(precision);
1027 let statistics = if options.has_statistics() {
1028 let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
1029 array,
1030 type_.clone(),
1031 size,
1032 &options.statistics,
1033 );
1034 Some(stats)
1035 } else {
1036 None
1037 };
1038
1039 let mut values = Vec::<u8>::with_capacity(size * array.len());
1040 array.values().iter().for_each(|x| {
1041 let bytes = &x.0.low().to_be_bytes()[16 - size..];
1042 values.extend_from_slice(bytes)
1043 });
1044 let array = FixedSizeBinaryArray::new(
1045 ArrowDataType::FixedSizeBinary(size),
1046 values.into(),
1047 array.validity().cloned(),
1048 );
1049 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1050 } else {
1051 let size = 32;
1052 let array = array
1053 .as_any()
1054 .downcast_ref::<PrimitiveArray<i256>>()
1055 .unwrap();
1056 let statistics = if options.has_statistics() {
1057 let stats = fixed_size_binary::build_statistics_decimal256(
1058 array,
1059 type_.clone(),
1060 size,
1061 &options.statistics,
1062 );
1063 Some(stats)
1064 } else {
1065 None
1066 };
1067 let mut values = Vec::<u8>::with_capacity(size * array.len());
1068 array.values().iter().for_each(|x| {
1069 let bytes = &x.to_be_bytes();
1070 values.extend_from_slice(bytes)
1071 });
1072 let array = FixedSizeBinaryArray::new(
1073 ArrowDataType::FixedSizeBinary(size),
1074 values.into(),
1075 array.validity().cloned(),
1076 );
1077
1078 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1079 }
1080 },
1081 Int128 => {
1082 let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
1083 let mut no_mm_options = options;
1085 no_mm_options.statistics.min_value = false;
1086 no_mm_options.statistics.max_value = false;
1087 let statistics = if no_mm_options.has_statistics() {
1088 let stats = fixed_size_binary::build_statistics_decimal(
1089 array,
1090 type_.clone(),
1091 16,
1092 &no_mm_options.statistics,
1093 );
1094 Some(stats)
1095 } else {
1096 None
1097 };
1098 let array = FixedSizeBinaryArray::new(
1099 ArrowDataType::FixedSizeBinary(16),
1100 array.values().clone().try_transmute().unwrap(),
1101 array.validity().cloned(),
1102 );
1103 fixed_size_binary::nested_array_to_page(
1104 &array,
1105 no_mm_options,
1106 type_,
1107 nested,
1108 statistics,
1109 )
1110 },
1111 UInt128 => {
1112 let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();
1113 let statistics = if options.has_statistics() {
1114 let stats = fixed_size_binary::build_statistics_decimal(
1115 array,
1116 type_.clone(),
1117 16,
1118 &options.statistics,
1119 );
1120 Some(stats)
1121 } else {
1122 None
1123 };
1124 let array = FixedSizeBinaryArray::new(
1125 ArrowDataType::FixedSizeBinary(16),
1126 array.values().clone().try_transmute().unwrap(),
1127 array.validity().cloned(),
1128 );
1129 fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1130 },
1131 other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
1132 }
1133 .map(Page::Data)
1134}
1135
1136fn get_encodings_recursive(dtype: &ArrowDataType, encodings: &mut Vec<Encoding>) {
1137 use arrow::datatypes::PhysicalType::*;
1138 match dtype.to_physical_type() {
1139 Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
1140 | Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => {
1141 encodings.push(get_primitive_dtype_encoding(dtype))
1142 },
1143 List | FixedSizeList | LargeList => {
1144 let a = dtype.to_storage();
1145 if let ArrowDataType::List(inner) = a {
1146 get_encodings_recursive(&inner.dtype, encodings)
1147 } else if let ArrowDataType::LargeList(inner) = a {
1148 get_encodings_recursive(&inner.dtype, encodings)
1149 } else if let ArrowDataType::FixedSizeList(inner, _) = a {
1150 get_encodings_recursive(&inner.dtype, encodings)
1151 } else {
1152 unreachable!()
1153 }
1154 },
1155 Struct => {
1156 if let ArrowDataType::Struct(fields) = dtype.to_storage() {
1157 if fields.is_empty() {
1158 encodings.push(Encoding::Rle)
1160 }
1161
1162 for field in fields {
1163 get_encodings_recursive(&field.dtype, encodings)
1164 }
1165 } else {
1166 unreachable!()
1167 }
1168 },
1169 Map => {
1170 if let ArrowDataType::Map(field, _) = dtype.to_storage() {
1171 if let ArrowDataType::Struct(fields) = field.dtype.to_storage() {
1172 for field in fields {
1173 get_encodings_recursive(&field.dtype, encodings)
1174 }
1175 } else {
1176 unreachable!()
1177 }
1178 } else {
1179 unreachable!()
1180 }
1181 },
1182 Union => todo!(),
1183 }
1184}
1185
1186pub fn get_dtype_encoding(dtype: &ArrowDataType) -> Vec<Encoding> {
1191 let mut encodings = vec![];
1192 get_encodings_recursive(dtype, &mut encodings);
1193 encodings
1194}
1195
1196fn get_primitive_dtype_encoding(dtype: &ArrowDataType) -> Encoding {
1197 match dtype.to_physical_type() {
1198 PhysicalType::Dictionary(_)
1199 | PhysicalType::LargeBinary
1200 | PhysicalType::LargeUtf8
1201 | PhysicalType::Utf8View
1202 | PhysicalType::BinaryView
1203 | PhysicalType::Primitive(_) => Encoding::RleDictionary,
1204 _ => Encoding::Plain,
1206 }
1207}