1use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
20use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
21use crate::arrow::record_reader::GenericRecordReader;
22use crate::arrow::record_reader::buffer::ValuesBuffer;
23use crate::arrow::schema::parquet_to_arrow_field;
24use crate::basic::{Encoding, Type};
25use crate::column::page::PageIterator;
26use crate::column::reader::decoder::ColumnValueDecoder;
27use crate::errors::{ParquetError, Result};
28use crate::schema::types::ColumnDescPtr;
29use arrow_array::{
30 ArrayRef, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
31 FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray,
32};
33use arrow_buffer::{Buffer, IntervalDayTime, i256};
34use arrow_data::ArrayDataBuilder;
35use arrow_schema::{DataType as ArrowType, IntervalUnit};
36use bytes::Bytes;
37use half::f16;
38use std::any::Any;
39use std::ops::Range;
40use std::sync::Arc;
41
42pub fn make_fixed_len_byte_array_reader(
47 pages: Box<dyn PageIterator>,
48 column_desc: ColumnDescPtr,
49 arrow_type: Option<ArrowType>,
50 batch_size: usize,
51) -> Result<Box<dyn ArrayReader>> {
52 let data_type = match arrow_type {
54 Some(t) => t,
55 None => parquet_to_arrow_field(column_desc.as_ref())?
56 .data_type()
57 .clone(),
58 };
59
60 let byte_length = match column_desc.physical_type() {
61 Type::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize,
62 t => {
63 return Err(general_err!(
64 "invalid physical type for fixed length byte array reader - {}",
65 t
66 ));
67 }
68 };
69 match &data_type {
70 ArrowType::FixedSizeBinary(_) => {}
71 ArrowType::Decimal32(_, _) => {
72 if byte_length > 4 {
73 return Err(general_err!(
74 "decimal 32 type too large, must be less then 4 bytes, got {}",
75 byte_length
76 ));
77 }
78 }
79 ArrowType::Decimal64(_, _) => {
80 if byte_length > 8 {
81 return Err(general_err!(
82 "decimal 64 type too large, must be less then 8 bytes, got {}",
83 byte_length
84 ));
85 }
86 }
87 ArrowType::Decimal128(_, _) => {
88 if byte_length > 16 {
89 return Err(general_err!(
90 "decimal 128 type too large, must be less than 16 bytes, got {}",
91 byte_length
92 ));
93 }
94 }
95 ArrowType::Decimal256(_, _) => {
96 if byte_length > 32 {
97 return Err(general_err!(
98 "decimal 256 type too large, must be less than 32 bytes, got {}",
99 byte_length
100 ));
101 }
102 }
103 ArrowType::Interval(_) => {
104 if byte_length != 12 {
105 return Err(general_err!(
107 "interval type must consist of 12 bytes got {}",
108 byte_length
109 ));
110 }
111 }
112 ArrowType::Float16 => {
113 if byte_length != 2 {
114 return Err(general_err!(
115 "float 16 type must be 2 bytes, got {}",
116 byte_length
117 ));
118 }
119 }
120 _ => {
121 return Err(general_err!(
122 "invalid data type for fixed length byte array reader - {}",
123 data_type
124 ));
125 }
126 }
127
128 Ok(Box::new(FixedLenByteArrayReader::new(
129 pages,
130 column_desc,
131 data_type,
132 byte_length,
133 batch_size,
134 )))
135}
136
137struct FixedLenByteArrayReader {
138 data_type: ArrowType,
139 byte_length: usize,
140 pages: Box<dyn PageIterator>,
141 def_levels_buffer: Option<Vec<i16>>,
142 rep_levels_buffer: Option<Vec<i16>>,
143 record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
144}
145
146impl FixedLenByteArrayReader {
147 fn new(
148 pages: Box<dyn PageIterator>,
149 column_desc: ColumnDescPtr,
150 data_type: ArrowType,
151 byte_length: usize,
152 batch_size: usize,
153 ) -> Self {
154 let record_reader = GenericRecordReader::new(column_desc, batch_size);
155 Self {
156 data_type,
157 byte_length,
158 pages,
159 def_levels_buffer: None,
160 rep_levels_buffer: None,
161 record_reader,
162 }
163 }
164}
165
166impl ArrayReader for FixedLenByteArrayReader {
167 fn as_any(&self) -> &dyn Any {
168 self
169 }
170
171 fn get_data_type(&self) -> &ArrowType {
172 &self.data_type
173 }
174
175 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
176 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
177 }
178
179 fn consume_batch(&mut self) -> Result<ArrayRef> {
180 let record_data = self.record_reader.consume_record_data();
181
182 let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
183 .len(self.record_reader.num_values())
184 .add_buffer(Buffer::from_vec(record_data.buffer))
185 .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
186
187 let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });
188
189 let array: ArrayRef = match &self.data_type {
194 ArrowType::Decimal32(p, s) => {
195 let f = |b: &[u8]| i32::from_be_bytes(sign_extend_be(b));
196 Arc::new(Decimal32Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
197 as ArrayRef
198 }
199 ArrowType::Decimal64(p, s) => {
200 let f = |b: &[u8]| i64::from_be_bytes(sign_extend_be(b));
201 Arc::new(Decimal64Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
202 as ArrayRef
203 }
204 ArrowType::Decimal128(p, s) => {
205 let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
206 Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
207 as ArrayRef
208 }
209 ArrowType::Decimal256(p, s) => {
210 let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b));
211 Arc::new(Decimal256Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
212 as ArrayRef
213 }
214 ArrowType::Interval(unit) => {
215 match unit {
218 IntervalUnit::YearMonth => {
219 let f = |b: &[u8]| i32::from_le_bytes(b[0..4].try_into().unwrap());
220 Arc::new(IntervalYearMonthArray::from_unary(&binary, f)) as ArrayRef
221 }
222 IntervalUnit::DayTime => {
223 let f = |b: &[u8]| {
224 IntervalDayTime::new(
225 i32::from_le_bytes(b[4..8].try_into().unwrap()),
226 i32::from_le_bytes(b[8..12].try_into().unwrap()),
227 )
228 };
229 Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
230 }
231 IntervalUnit::MonthDayNano => {
232 return Err(nyi_err!("MonthDayNano intervals not supported"));
233 }
234 }
235 }
236 ArrowType::Float16 => {
237 let f = |b: &[u8]| f16::from_le_bytes(b[..2].try_into().unwrap());
238 Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef
239 }
240 _ => Arc::new(binary) as ArrayRef,
241 };
242
243 self.def_levels_buffer = self.record_reader.consume_def_levels();
244 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
245 self.record_reader.reset();
246
247 Ok(array)
248 }
249
250 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
251 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
252 }
253
254 fn get_def_levels(&self) -> Option<&[i16]> {
255 self.def_levels_buffer.as_deref()
256 }
257
258 fn get_rep_levels(&self) -> Option<&[i16]> {
259 self.rep_levels_buffer.as_deref()
260 }
261}
262
/// Value buffer filled by `ValueDecoder`.
///
/// Values are stored back-to-back, `byte_length` bytes each.
#[derive(Default)]
struct FixedLenByteArrayBuffer {
    /// Concatenated value bytes; slot `i` spans `i * byte_length..(i + 1) * byte_length`.
    buffer: Vec<u8>,
    /// Width of each value in bytes; `None` until the first decoder read records it.
    byte_length: Option<usize>,
    /// Capacity hint, in values, from `with_capacity`.
    ///
    /// It is held until the byte width is known and is taken on the first read to
    /// preallocate `buffer`.
    values_capacity: Option<usize>,
}
272
273#[inline]
274fn move_values<F>(
275 buffer: &mut Vec<u8>,
276 byte_length: usize,
277 values_range: Range<usize>,
278 valid_mask: &[u8],
279 mut op: F,
280) where
281 F: FnMut(&mut Vec<u8>, usize, usize, usize),
282{
283 for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
284 debug_assert!(level_pos >= value_pos);
285 if level_pos <= value_pos {
286 break;
287 }
288
289 let level_pos_bytes = level_pos * byte_length;
290 let value_pos_bytes = value_pos * byte_length;
291
292 op(buffer, level_pos_bytes, value_pos_bytes, byte_length)
293 }
294}
295
296impl ValuesBuffer for FixedLenByteArrayBuffer {
297 fn with_capacity(capacity: usize) -> Self {
298 Self {
301 buffer: Vec::new(),
302 byte_length: None,
303 values_capacity: Some(capacity),
304 }
305 }
306
307 fn pad_nulls(
308 &mut self,
309 read_offset: usize,
310 values_read: usize,
311 levels_read: usize,
312 valid_mask: &[u8],
313 ) {
314 let byte_length = self.byte_length.unwrap_or_default();
315
316 assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length);
317 self.buffer
318 .resize((read_offset + levels_read) * byte_length, 0);
319
320 let values_range = read_offset..read_offset + values_read;
321 const VEC_CUTOFF: usize = 4;
326 if byte_length > VEC_CUTOFF {
327 let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
328 let split = buffer.split_at_mut(level_pos_bytes);
329 let dst = &mut split.1[..byte_length];
330 let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length];
331 dst.copy_from_slice(src);
332 };
333 move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
334 } else {
335 let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
336 for i in 0..byte_length {
337 buffer[level_pos_bytes + i] = buffer[value_pos_bytes + i]
338 }
339 };
340 move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
341 }
342 }
343}
344
345struct ValueDecoder {
346 byte_length: usize,
347 dict_page: Option<Bytes>,
348 decoder: Option<Decoder>,
349}
350
351impl ColumnValueDecoder for ValueDecoder {
352 type Buffer = FixedLenByteArrayBuffer;
353
354 fn new(col: &ColumnDescPtr) -> Self {
355 Self {
356 byte_length: col.type_length() as usize,
357 dict_page: None,
358 decoder: None,
359 }
360 }
361
362 fn set_dict(
363 &mut self,
364 buf: Bytes,
365 num_values: u32,
366 encoding: Encoding,
367 _is_sorted: bool,
368 ) -> Result<()> {
369 if !matches!(
370 encoding,
371 Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
372 ) {
373 return Err(nyi_err!(
374 "Invalid/Unsupported encoding type for dictionary: {}",
375 encoding
376 ));
377 }
378 let expected_len = num_values as usize * self.byte_length;
379 if expected_len > buf.len() {
380 return Err(general_err!(
381 "too few bytes in dictionary page, expected {} got {}",
382 expected_len,
383 buf.len()
384 ));
385 }
386
387 self.dict_page = Some(buf);
388 Ok(())
389 }
390
391 fn set_data(
392 &mut self,
393 encoding: Encoding,
394 data: Bytes,
395 num_levels: usize,
396 num_values: Option<usize>,
397 ) -> Result<()> {
398 self.decoder = Some(match encoding {
399 Encoding::PLAIN => Decoder::Plain {
400 buf: data,
401 offset: 0,
402 },
403 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict {
404 decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
405 },
406 Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
407 decoder: DeltaByteArrayDecoder::new(data)?,
408 },
409 Encoding::BYTE_STREAM_SPLIT => Decoder::ByteStreamSplit {
410 buf: data,
411 offset: 0,
412 },
413 _ => {
414 return Err(general_err!(
415 "unsupported encoding for fixed length byte array: {}",
416 encoding
417 ));
418 }
419 });
420 Ok(())
421 }
422
423 fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
424 match out.byte_length {
425 Some(x) => assert_eq!(x, self.byte_length),
426 None => {
427 out.byte_length = Some(self.byte_length);
428 if out.buffer.is_empty() {
431 if let Some(values_capacity) = out.values_capacity.take() {
432 let byte_capacity = values_capacity.saturating_mul(self.byte_length);
435 out.buffer = Vec::with_capacity(byte_capacity);
436 }
437 }
438 }
439 }
440
441 match self.decoder.as_mut().unwrap() {
442 Decoder::Plain { offset, buf } => {
443 let to_read =
444 (num_values * self.byte_length).min(buf.len() - *offset) / self.byte_length;
445 let end_offset = *offset + to_read * self.byte_length;
446 out.buffer
447 .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
448 *offset = end_offset;
449 Ok(to_read)
450 }
451 Decoder::Dict { decoder } => {
452 let dict = self.dict_page.as_ref().unwrap();
453 if dict.is_empty() {
455 return Ok(0);
456 }
457
458 decoder.read(num_values, |keys| {
459 out.buffer.reserve(keys.len() * self.byte_length);
460 for key in keys {
461 let offset = *key as usize * self.byte_length;
462 let val = &dict.as_ref()[offset..offset + self.byte_length];
463 out.buffer.extend_from_slice(val);
464 }
465 Ok(())
466 })
467 }
468 Decoder::Delta { decoder } => {
469 let to_read = num_values.min(decoder.remaining());
470 out.buffer.reserve(to_read * self.byte_length);
471
472 decoder.read(to_read, |slice| {
473 if slice.len() != self.byte_length {
474 return Err(general_err!(
475 "encountered array with incorrect length, got {} expected {}",
476 slice.len(),
477 self.byte_length
478 ));
479 }
480 out.buffer.extend_from_slice(slice);
481 Ok(())
482 })
483 }
484 Decoder::ByteStreamSplit { buf, offset } => {
485 let total_values = buf.len() / self.byte_length;
489 let to_read = num_values.min(total_values - *offset);
490
491 read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, self.byte_length);
493
494 *offset += to_read;
495 Ok(to_read)
496 }
497 }
498 }
499
500 fn skip_values(&mut self, num_values: usize) -> Result<usize> {
501 match self.decoder.as_mut().unwrap() {
502 Decoder::Plain { offset, buf } => {
503 let to_read = num_values.min((buf.len() - *offset) / self.byte_length);
504 *offset += to_read * self.byte_length;
505 Ok(to_read)
506 }
507 Decoder::Dict { decoder } => decoder.skip(num_values),
508 Decoder::Delta { decoder } => decoder.skip(num_values),
509 Decoder::ByteStreamSplit { offset, buf } => {
510 let total_values = buf.len() / self.byte_length;
511 let to_read = num_values.min(total_values - *offset);
512 *offset += to_read;
513 Ok(to_read)
514 }
515 }
516 }
517}
518
/// Decodes `num_values` BYTE_STREAM_SPLIT-encoded values and appends them to `dst`.
///
/// Decoding starts at value index `offset`. Each value is `data_width` bytes wide.
///
/// `src` is treated as `data_width` consecutive streams of `src.len() / data_width` bytes.
/// Stream `j` holds byte `j` of every value. The bytes are interleaved back, so the `i`-th
/// decoded value occupies `data_width` bytes starting at `old_len + i * data_width` in `dst`.
///
/// Callers must ensure `offset + num_values <= src.len() / data_width`; this is checked
/// with a debug assertion only.
///
/// # Panics
///
/// Panics if `data_width` is zero.
fn read_byte_stream_split(
    dst: &mut Vec<u8>,
    src: &[u8],
    offset: usize,
    num_values: usize,
    data_width: usize,
) {
    let stride = src.len() / data_width;
    debug_assert!(
        offset + num_values <= stride,
        "byte stream split read past end of stream"
    );

    let start = dst.len();
    dst.resize(start + num_values * data_width, 0u8);
    let out = &mut dst[start..];

    for byte_idx in 0..data_width {
        let stream_start = offset + byte_idx * stride;
        let stream = &src[stream_start..stream_start + num_values];
        for (value, &byte) in out.chunks_exact_mut(data_width).zip(stream) {
            value[byte_idx] = byte;
        }
    }
}
542
543enum Decoder {
544 Plain { buf: Bytes, offset: usize },
545 Dict { decoder: DictIndexDecoder },
546 Delta { decoder: DeltaByteArrayDecoder },
547 ByteStreamSplit { buf: Bytes, offset: usize },
548}
549
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use arrow::datatypes::Field;
    use arrow::error::Result as ArrowResult;
    use arrow_array::{Array, ListArray};
    use arrow_array::{Decimal256Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    /// Round-trips a nullable list of `Decimal256` values through the parquet writer and
    /// reader with a batch size of 3.
    ///
    /// Each returned batch is compared with the matching slice of the written batch.
    #[test]
    fn test_decimal_list() {
        let values = Decimal256Array::from_iter_values((1..=8).map(i256::from_i128));

        let list_type = ArrowType::List(Arc::new(Field::new_list_field(
            values.data_type().clone(),
            false,
        )));

        // Seven list slots. The validity bitmap 0b01010111 marks slots 3 and 5 as null.
        let list_data = ArrayDataBuilder::new(list_type)
            .len(7)
            .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
            .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
            .child_data(vec![values.into_data()])
            .build()
            .unwrap();

        let list_column = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let written = RecordBatch::try_from_iter([("list", list_column)]).unwrap();

        let mut parquet_bytes = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let batches = ParquetRecordBatchReader::try_new(Bytes::from(parquet_bytes), 3)
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        for (idx, (offset, len)) in [(0, 3), (3, 3), (6, 1)].into_iter().enumerate() {
            assert_eq!(&written.slice(offset, len), &batches[idx]);
        }
    }
}