1mod binary;
16mod boolean;
17mod dictionary;
18mod file;
19mod fixed_len_bytes;
20mod nested;
21mod pages;
22mod primitive;
23mod row_group;
24mod schema;
25mod sink;
26mod utf8;
27mod utils;
28
29use crate::array::*;
30use crate::datatypes::*;
31use crate::error::{Error, Result};
32use crate::types::days_ms;
33use crate::types::i256;
34use crate::types::NativeType;
35
36pub use nested::{num_values, write_rep_and_def};
37pub use pages::{to_leaves, to_nested, to_parquet_leaves};
38use parquet2::schema::types::PrimitiveType as ParquetPrimitiveType;
39pub use parquet2::{
40 compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel},
41 encoding::Encoding,
42 fallible_streaming_iterator,
43 metadata::{Descriptor, FileMetaData, KeyValue, SchemaDescriptor, ThriftFileMetaData},
44 page::{CompressedDataPage, CompressedPage, Page},
45 schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType},
46 write::{
47 compress, write_metadata_sidecar, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
48 Version,
49 },
50 FallibleStreamingIterator,
51};
52pub use utils::write_def_levels;
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct WriteOptions {
57 pub write_statistics: bool,
59 pub version: Version,
61 pub compression: CompressionOptions,
63 pub data_pagesize_limit: Option<usize>,
65}
66
67use crate::compute::aggregate::estimated_bytes_size;
68pub use file::FileWriter;
69pub use row_group::{row_group_iter, RowGroupIterator};
70pub use schema::to_parquet_type;
71pub use sink::FileSink;
72
73pub use pages::array_to_columns;
74pub use pages::Nested;
75
76pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
78 let mut out = (0, 0);
81 for nested in nested.iter().rev() {
82 match nested {
83 Nested::LargeList(l_nested) => {
84 let start = *l_nested.offsets.first();
85 let end = *l_nested.offsets.last();
86 return (start as usize, (end - start) as usize);
87 }
88 Nested::List(l_nested) => {
89 let start = *l_nested.offsets.first();
90 let end = *l_nested.offsets.last();
91 return (start as usize, (end - start) as usize);
92 }
93 Nested::Primitive(_, _, len) => out = (0, *len),
94 _ => {}
95 }
96 }
97 out
98}
99
/// Returns the number of bytes `n` used to store a decimal of `precision`
/// digits as a fixed-length value.
///
/// `n` is the smallest integer satisfying `2^(8n - 1) >= 10^precision + 1`,
/// i.e. `n = ceil((log2(10^precision + 1) + 1) / 8)`, evaluated in `f64`.
pub(self) fn decimal_length_from_precision(precision: usize) -> usize {
    // Largest magnitude plus one: 10^precision + 1.
    let magnitude = 10.0_f64.powi(precision as i32) + 1.0;
    // Bits needed for the magnitude, plus one more bit (the `8n - 1` above).
    let bits = magnitude.log2() + 1.0;
    (bits / 8.0).ceil() as usize
}
110
111pub fn to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
113 let parquet_types = schema
114 .fields
115 .iter()
116 .map(to_parquet_type)
117 .collect::<Result<Vec<_>>>()?;
118 Ok(SchemaDescriptor::new("root".to_string(), parquet_types))
119}
120
121pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
125 if let (Encoding::DeltaBinaryPacked, DataType::Decimal(p, _)) =
126 (encoding, data_type.to_logical_type())
127 {
128 return *p <= 18;
129 };
130
131 matches!(
132 (encoding, data_type.to_logical_type()),
133 (Encoding::Plain, _)
134 | (
135 Encoding::DeltaLengthByteArray,
136 DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8,
137 )
138 | (Encoding::RleDictionary, DataType::Dictionary(_, _, _))
139 | (Encoding::PlainDictionary, DataType::Dictionary(_, _, _))
140 | (
141 Encoding::DeltaBinaryPacked,
142 DataType::Null
143 | DataType::UInt8
144 | DataType::UInt16
145 | DataType::UInt32
146 | DataType::UInt64
147 | DataType::Int8
148 | DataType::Int16
149 | DataType::Int32
150 | DataType::Date32
151 | DataType::Time32(_)
152 | DataType::Int64
153 | DataType::Date64
154 | DataType::Time64(_)
155 | DataType::Timestamp(_, _)
156 | DataType::Duration(_)
157 )
158 )
159}
160
161pub fn slice_parquet_array(
163 primitive_array: &mut dyn Array,
164 nested: &mut [Nested],
165 mut current_offset: usize,
166 mut current_length: usize,
167) {
168 for nested in nested.iter_mut() {
169 match nested {
170 Nested::LargeList(l_nested) => {
171 l_nested.offsets.slice(current_offset, current_length + 1);
172 if let Some(validity) = l_nested.validity.as_mut() {
173 validity.slice(current_offset, current_length)
174 };
175
176 current_length = l_nested.offsets.range() as usize;
177 current_offset = *l_nested.offsets.first() as usize;
178 }
179 Nested::List(l_nested) => {
180 l_nested.offsets.slice(current_offset, current_length + 1);
181 if let Some(validity) = l_nested.validity.as_mut() {
182 validity.slice(current_offset, current_length)
183 };
184
185 current_length = l_nested.offsets.range() as usize;
186 current_offset = *l_nested.offsets.first() as usize;
187 }
188 Nested::Struct(validity, _, length) => {
189 *length = current_length;
190 if let Some(validity) = validity.as_mut() {
191 validity.slice(current_offset, current_length)
192 };
193 }
194 Nested::Primitive(validity, _, length) => {
195 *length = current_length;
196 if let Some(validity) = validity.as_mut() {
197 validity.slice(current_offset, current_length)
198 };
199 primitive_array.slice(current_offset, current_length);
200 }
201 }
202 }
203}
204
205pub fn get_max_length(nested: &[Nested]) -> usize {
207 let mut length = 0;
208 for nested in nested.iter() {
209 match nested {
210 Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
211 Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
212 _ => {}
213 }
214 }
215 length
216}
217
218pub fn array_to_pages(
220 primitive_array: &dyn Array,
221 type_: ParquetPrimitiveType,
222 nested: &[Nested],
223 options: WriteOptions,
224 encoding: Encoding,
225) -> Result<DynIter<'static, Result<Page>>> {
226 if let DataType::Dictionary(key_type, _, _) = primitive_array.data_type().to_logical_type() {
227 return match_integer_type!(key_type, |$T| {
228 dictionary::array_to_pages::<$T>(
229 primitive_array.as_any().downcast_ref().unwrap(),
230 type_,
231 &nested,
232 options,
233 encoding,
234 )
235 });
236 };
237
238 let nested = nested.to_vec();
239 let primitive_array = primitive_array.to_boxed();
240
241 let number_of_rows = nested[0].len();
242
243 let byte_size = estimated_bytes_size(primitive_array.as_ref());
246
247 const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
248 let max_page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
249 let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); let bytes_per_row = if number_of_rows == 0 {
251 0
252 } else {
253 ((byte_size as f64) / (number_of_rows as f64)) as usize
254 };
255 let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
256
257 let pages = (0..number_of_rows)
258 .step_by(rows_per_page)
259 .map(move |offset| {
260 let length = if offset + rows_per_page > number_of_rows {
261 number_of_rows - offset
262 } else {
263 rows_per_page
264 };
265
266 let mut right_array = primitive_array.clone();
267 let mut right_nested = nested.clone();
268 slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
269
270 array_to_page(
271 right_array.as_ref(),
272 type_.clone(),
273 &right_nested,
274 options,
275 encoding,
276 )
277 });
278
279 Ok(DynIter::new(pages))
280}
281
282pub fn array_to_page(
284 array: &dyn Array,
285 type_: ParquetPrimitiveType,
286 nested: &[Nested],
287 options: WriteOptions,
288 encoding: Encoding,
289) -> Result<Page> {
290 if nested.len() == 1 {
291 return array_to_page_simple(array, type_, options, encoding);
293 }
294 array_to_page_nested(array, type_, nested, options, encoding)
295}
296
297pub fn array_to_page_simple(
299 array: &dyn Array,
300 type_: ParquetPrimitiveType,
301 options: WriteOptions,
302 encoding: Encoding,
303) -> Result<Page> {
304 let data_type = array.data_type();
305 if !can_encode(data_type, encoding) {
306 return Err(Error::InvalidArgumentError(format!(
307 "The datatype {data_type:?} cannot be encoded by {encoding:?}"
308 )));
309 }
310
311 match data_type.to_logical_type() {
312 DataType::Boolean => {
313 boolean::array_to_page(array.as_any().downcast_ref().unwrap(), options, type_)
314 }
315 DataType::UInt8 => primitive::array_to_page_integer::<u8, i32>(
317 array.as_any().downcast_ref().unwrap(),
318 options,
319 type_,
320 encoding,
321 ),
322 DataType::UInt16 => primitive::array_to_page_integer::<u16, i32>(
323 array.as_any().downcast_ref().unwrap(),
324 options,
325 type_,
326 encoding,
327 ),
328 DataType::UInt32 => primitive::array_to_page_integer::<u32, i32>(
329 array.as_any().downcast_ref().unwrap(),
330 options,
331 type_,
332 encoding,
333 ),
334 DataType::UInt64 => primitive::array_to_page_integer::<u64, i64>(
335 array.as_any().downcast_ref().unwrap(),
336 options,
337 type_,
338 encoding,
339 ),
340 DataType::Int8 => primitive::array_to_page_integer::<i8, i32>(
341 array.as_any().downcast_ref().unwrap(),
342 options,
343 type_,
344 encoding,
345 ),
346 DataType::Int16 => primitive::array_to_page_integer::<i16, i32>(
347 array.as_any().downcast_ref().unwrap(),
348 options,
349 type_,
350 encoding,
351 ),
352 DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
353 primitive::array_to_page_integer::<i32, i32>(
354 array.as_any().downcast_ref().unwrap(),
355 options,
356 type_,
357 encoding,
358 )
359 }
360 DataType::Int64
361 | DataType::Date64
362 | DataType::Time64(_)
363 | DataType::Timestamp(_, _)
364 | DataType::Duration(_) => primitive::array_to_page_integer::<i64, i64>(
365 array.as_any().downcast_ref().unwrap(),
366 options,
367 type_,
368 encoding,
369 ),
370 DataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
371 array.as_any().downcast_ref().unwrap(),
372 options,
373 type_,
374 ),
375 DataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
376 array.as_any().downcast_ref().unwrap(),
377 options,
378 type_,
379 ),
380 DataType::Utf8 => utf8::array_to_page::<i32>(
381 array.as_any().downcast_ref().unwrap(),
382 options,
383 type_,
384 encoding,
385 ),
386 DataType::LargeUtf8 => utf8::array_to_page::<i64>(
387 array.as_any().downcast_ref().unwrap(),
388 options,
389 type_,
390 encoding,
391 ),
392 DataType::Binary => binary::array_to_page::<i32>(
393 array.as_any().downcast_ref().unwrap(),
394 options,
395 type_,
396 encoding,
397 ),
398 DataType::LargeBinary => binary::array_to_page::<i64>(
399 array.as_any().downcast_ref().unwrap(),
400 options,
401 type_,
402 encoding,
403 ),
404 DataType::Null => {
405 let array = Int32Array::new_null(DataType::Int32, array.len());
406 primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
407 }
408 DataType::Interval(IntervalUnit::YearMonth) => {
409 let type_ = type_;
410 let array = array
411 .as_any()
412 .downcast_ref::<PrimitiveArray<i32>>()
413 .unwrap();
414 let mut values = Vec::<u8>::with_capacity(12 * array.len());
415 array.values().iter().for_each(|x| {
416 let bytes = &x.to_le_bytes();
417 values.extend_from_slice(bytes);
418 values.extend_from_slice(&[0; 8]);
419 });
420 let array = FixedSizeBinaryArray::new(
421 DataType::FixedSizeBinary(12),
422 values.into(),
423 array.validity().cloned(),
424 );
425 let statistics = if options.write_statistics {
426 Some(fixed_len_bytes::build_statistics(&array, type_.clone()))
427 } else {
428 None
429 };
430 fixed_len_bytes::array_to_page(&array, options, type_, statistics)
431 }
432 DataType::Interval(IntervalUnit::DayTime) => {
433 let type_ = type_;
434 let array = array
435 .as_any()
436 .downcast_ref::<PrimitiveArray<days_ms>>()
437 .unwrap();
438 let mut values = Vec::<u8>::with_capacity(12 * array.len());
439 array.values().iter().for_each(|x| {
440 let bytes = &x.to_le_bytes();
441 values.extend_from_slice(&[0; 4]); values.extend_from_slice(bytes); });
444 let array = FixedSizeBinaryArray::new(
445 DataType::FixedSizeBinary(12),
446 values.into(),
447 array.validity().cloned(),
448 );
449 let statistics = if options.write_statistics {
450 Some(fixed_len_bytes::build_statistics(&array, type_.clone()))
451 } else {
452 None
453 };
454 fixed_len_bytes::array_to_page(&array, options, type_, statistics)
455 }
456 DataType::FixedSizeBinary(_) => {
457 let type_ = type_;
458 let array = array.as_any().downcast_ref().unwrap();
459 let statistics = if options.write_statistics {
460 Some(fixed_len_bytes::build_statistics(array, type_.clone()))
461 } else {
462 None
463 };
464
465 fixed_len_bytes::array_to_page(array, options, type_, statistics)
466 }
467 DataType::Decimal256(precision, _) => {
468 let type_ = type_;
469 let precision = *precision;
470 let array = array
471 .as_any()
472 .downcast_ref::<PrimitiveArray<i256>>()
473 .unwrap();
474 if precision <= 9 {
475 let values = array
476 .values()
477 .iter()
478 .map(|x| x.0.as_i32())
479 .collect::<Vec<_>>()
480 .into();
481
482 let array =
483 PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
484 primitive::array_to_page_integer::<i32, i32>(&array, options, type_, encoding)
485 } else if precision <= 18 {
486 let values = array
487 .values()
488 .iter()
489 .map(|x| x.0.as_i64())
490 .collect::<Vec<_>>()
491 .into();
492
493 let array =
494 PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
495 primitive::array_to_page_integer::<i64, i64>(&array, options, type_, encoding)
496 } else if precision <= 38 {
497 let size = decimal_length_from_precision(precision);
498 let statistics = if options.write_statistics {
499 let stats = fixed_len_bytes::build_statistics_decimal256_with_i128(
500 array,
501 type_.clone(),
502 size,
503 );
504 Some(stats)
505 } else {
506 None
507 };
508
509 let mut values = Vec::<u8>::with_capacity(size * array.len());
510 array.values().iter().for_each(|x| {
511 let bytes = &x.0.low().to_be_bytes()[16 - size..];
512 values.extend_from_slice(bytes)
513 });
514 let array = FixedSizeBinaryArray::new(
515 DataType::FixedSizeBinary(size),
516 values.into(),
517 array.validity().cloned(),
518 );
519 fixed_len_bytes::array_to_page(&array, options, type_, statistics)
520 } else {
521 let size = 32;
522 let array = array
523 .as_any()
524 .downcast_ref::<PrimitiveArray<i256>>()
525 .unwrap();
526 let statistics = if options.write_statistics {
527 let stats =
528 fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size);
529 Some(stats)
530 } else {
531 None
532 };
533 let mut values = Vec::<u8>::with_capacity(size * array.len());
534 array.values().iter().for_each(|x| {
535 let bytes = &x.to_be_bytes();
536 values.extend_from_slice(bytes)
537 });
538 let array = FixedSizeBinaryArray::new(
539 DataType::FixedSizeBinary(size),
540 values.into(),
541 array.validity().cloned(),
542 );
543
544 fixed_len_bytes::array_to_page(&array, options, type_, statistics)
545 }
546 }
547 DataType::Decimal(precision, _) => {
548 let type_ = type_;
549 let precision = *precision;
550 let array = array
551 .as_any()
552 .downcast_ref::<PrimitiveArray<i128>>()
553 .unwrap();
554 if precision <= 9 {
555 let values = array
556 .values()
557 .iter()
558 .map(|x| *x as i32)
559 .collect::<Vec<_>>()
560 .into();
561
562 let array =
563 PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
564 primitive::array_to_page_integer::<i32, i32>(&array, options, type_, encoding)
565 } else if precision <= 18 {
566 let values = array
567 .values()
568 .iter()
569 .map(|x| *x as i64)
570 .collect::<Vec<_>>()
571 .into();
572
573 let array =
574 PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
575 primitive::array_to_page_integer::<i64, i64>(&array, options, type_, encoding)
576 } else {
577 let size = decimal_length_from_precision(precision);
578
579 let statistics = if options.write_statistics {
580 let stats =
581 fixed_len_bytes::build_statistics_decimal(array, type_.clone(), size);
582 Some(stats)
583 } else {
584 None
585 };
586
587 let mut values = Vec::<u8>::with_capacity(size * array.len());
588 array.values().iter().for_each(|x| {
589 let bytes = &x.to_be_bytes()[16 - size..];
590 values.extend_from_slice(bytes)
591 });
592 let array = FixedSizeBinaryArray::new(
593 DataType::FixedSizeBinary(size),
594 values.into(),
595 array.validity().cloned(),
596 );
597 fixed_len_bytes::array_to_page(&array, options, type_, statistics)
598 }
599 }
600 other => Err(Error::NotYetImplemented(format!(
601 "Writing parquet pages for data type {other:?}"
602 ))),
603 }
604 .map(Page::Data)
605}
606
607fn array_to_page_nested(
608 array: &dyn Array,
609 type_: ParquetPrimitiveType,
610 nested: &[Nested],
611 options: WriteOptions,
612 _encoding: Encoding,
613) -> Result<Page> {
614 use DataType::*;
615 match array.data_type().to_logical_type() {
616 Null => {
617 let array = Int32Array::new_null(DataType::Int32, array.len());
618 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
619 }
620 Boolean => {
621 let array = array.as_any().downcast_ref().unwrap();
622 boolean::nested_array_to_page(array, options, type_, nested)
623 }
624 Utf8 => {
625 let array = array.as_any().downcast_ref().unwrap();
626 utf8::nested_array_to_page::<i32>(array, options, type_, nested)
627 }
628 LargeUtf8 => {
629 let array = array.as_any().downcast_ref().unwrap();
630 utf8::nested_array_to_page::<i64>(array, options, type_, nested)
631 }
632 Binary => {
633 let array = array.as_any().downcast_ref().unwrap();
634 binary::nested_array_to_page::<i32>(array, options, type_, nested)
635 }
636 LargeBinary => {
637 let array = array.as_any().downcast_ref().unwrap();
638 binary::nested_array_to_page::<i64>(array, options, type_, nested)
639 }
640 UInt8 => {
641 let array = array.as_any().downcast_ref().unwrap();
642 primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
643 }
644 UInt16 => {
645 let array = array.as_any().downcast_ref().unwrap();
646 primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
647 }
648 UInt32 => {
649 let array = array.as_any().downcast_ref().unwrap();
650 primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
651 }
652 UInt64 => {
653 let array = array.as_any().downcast_ref().unwrap();
654 primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
655 }
656 Int8 => {
657 let array = array.as_any().downcast_ref().unwrap();
658 primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
659 }
660 Int16 => {
661 let array = array.as_any().downcast_ref().unwrap();
662 primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
663 }
664 Int32 | Date32 | Time32(_) => {
665 let array = array.as_any().downcast_ref().unwrap();
666 primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
667 }
668 Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
669 let array = array.as_any().downcast_ref().unwrap();
670 primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
671 }
672 Float32 => {
673 let array = array.as_any().downcast_ref().unwrap();
674 primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
675 }
676 Float64 => {
677 let array = array.as_any().downcast_ref().unwrap();
678 primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
679 }
680 Decimal(precision, _) => {
681 let type_ = type_;
682 let precision = *precision;
683 let array = array
684 .as_any()
685 .downcast_ref::<PrimitiveArray<i128>>()
686 .unwrap();
687 if precision <= 9 {
688 let values = array
689 .values()
690 .iter()
691 .map(|x| *x as i32)
692 .collect::<Vec<_>>()
693 .into();
694
695 let array =
696 PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
697 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
698 } else if precision <= 18 {
699 let values = array
700 .values()
701 .iter()
702 .map(|x| *x as i64)
703 .collect::<Vec<_>>()
704 .into();
705
706 let array =
707 PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
708 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
709 } else {
710 let size = decimal_length_from_precision(precision);
711
712 let statistics = if options.write_statistics {
713 let stats =
714 fixed_len_bytes::build_statistics_decimal(array, type_.clone(), size);
715 Some(stats)
716 } else {
717 None
718 };
719
720 let mut values = Vec::<u8>::with_capacity(size * array.len());
721 array.values().iter().for_each(|x| {
722 let bytes = &x.to_be_bytes()[16 - size..];
723 values.extend_from_slice(bytes)
724 });
725 let array = FixedSizeBinaryArray::new(
726 DataType::FixedSizeBinary(size),
727 values.into(),
728 array.validity().cloned(),
729 );
730 fixed_len_bytes::array_to_page(&array, options, type_, statistics)
731 }
732 }
733 Decimal256(precision, _) => {
734 let type_ = type_;
735 let precision = *precision;
736 let array = array
737 .as_any()
738 .downcast_ref::<PrimitiveArray<i256>>()
739 .unwrap();
740 if precision <= 9 {
741 let values = array
742 .values()
743 .iter()
744 .map(|x| x.0.as_i32())
745 .collect::<Vec<_>>()
746 .into();
747
748 let array =
749 PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
750 primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
751 } else if precision <= 18 {
752 let values = array
753 .values()
754 .iter()
755 .map(|x| x.0.as_i64())
756 .collect::<Vec<_>>()
757 .into();
758
759 let array =
760 PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
761 primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
762 } else if precision <= 38 {
763 let size = decimal_length_from_precision(precision);
764 let statistics = if options.write_statistics {
765 let stats = fixed_len_bytes::build_statistics_decimal256_with_i128(
766 array,
767 type_.clone(),
768 size,
769 );
770 Some(stats)
771 } else {
772 None
773 };
774
775 let mut values = Vec::<u8>::with_capacity(size * array.len());
776 array.values().iter().for_each(|x| {
777 let bytes = &x.0.low().to_be_bytes()[16 - size..];
778 values.extend_from_slice(bytes)
779 });
780 let array = FixedSizeBinaryArray::new(
781 DataType::FixedSizeBinary(size),
782 values.into(),
783 array.validity().cloned(),
784 );
785 fixed_len_bytes::array_to_page(&array, options, type_, statistics)
786 } else {
787 let size = 32;
788 let array = array
789 .as_any()
790 .downcast_ref::<PrimitiveArray<i256>>()
791 .unwrap();
792 let statistics = if options.write_statistics {
793 let stats =
794 fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size);
795 Some(stats)
796 } else {
797 None
798 };
799 let mut values = Vec::<u8>::with_capacity(size * array.len());
800 array.values().iter().for_each(|x| {
801 let bytes = &x.to_be_bytes();
802 values.extend_from_slice(bytes)
803 });
804 let array = FixedSizeBinaryArray::new(
805 DataType::FixedSizeBinary(size),
806 values.into(),
807 array.validity().cloned(),
808 );
809
810 fixed_len_bytes::array_to_page(&array, options, type_, statistics)
811 }
812 }
813 other => Err(Error::NotYetImplemented(format!(
814 "Writing nested parquet pages for data type {other:?}"
815 ))),
816 }
817 .map(Page::Data)
818}
819
820fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(
821 data_type: &DataType,
822 map: F,
823 encodings: &mut Vec<T>,
824) {
825 use crate::datatypes::PhysicalType::*;
826 match data_type.to_physical_type() {
827 Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
828 | Dictionary(_) | LargeUtf8 => encodings.push(map(data_type)),
829 List | FixedSizeList | LargeList => {
830 let a = data_type.to_logical_type();
831 if let DataType::List(inner) = a {
832 transverse_recursive(&inner.data_type, map, encodings)
833 } else if let DataType::LargeList(inner) = a {
834 transverse_recursive(&inner.data_type, map, encodings)
835 } else if let DataType::FixedSizeList(inner, _) = a {
836 transverse_recursive(&inner.data_type, map, encodings)
837 } else {
838 unreachable!()
839 }
840 }
841 Struct => {
842 if let DataType::Struct(fields) = data_type.to_logical_type() {
843 for field in fields {
844 transverse_recursive(&field.data_type, map.clone(), encodings)
845 }
846 } else {
847 unreachable!()
848 }
849 }
850 Map => {
851 if let DataType::Map(field, _) = data_type.to_logical_type() {
852 if let DataType::Struct(fields) = field.data_type.to_logical_type() {
853 for field in fields {
854 transverse_recursive(&field.data_type, map.clone(), encodings)
855 }
856 } else {
857 unreachable!()
858 }
859 } else {
860 unreachable!()
861 }
862 }
863 Union => todo!(),
864 }
865}
866
867pub fn transverse<T, F: Fn(&DataType) -> T + Clone>(data_type: &DataType, map: F) -> Vec<T> {
884 let mut encodings = vec![];
885 transverse_recursive(data_type, map, &mut encodings);
886 encodings
887}