Skip to main content

parquet/arrow/array_reader/
fixed_len_byte_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
20use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
21use crate::arrow::record_reader::GenericRecordReader;
22use crate::arrow::record_reader::buffer::ValuesBuffer;
23use crate::arrow::schema::parquet_to_arrow_field;
24use crate::basic::{Encoding, Type};
25use crate::column::page::PageIterator;
26use crate::column::reader::decoder::ColumnValueDecoder;
27use crate::errors::{ParquetError, Result};
28use crate::schema::types::ColumnDescPtr;
29use arrow_array::{
30    ArrayRef, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
31    FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray,
32};
33use arrow_buffer::{Buffer, IntervalDayTime, i256};
34use arrow_data::ArrayDataBuilder;
35use arrow_schema::{DataType as ArrowType, IntervalUnit};
36use bytes::Bytes;
37use half::f16;
38use std::any::Any;
39use std::ops::Range;
40use std::sync::Arc;
41
42/// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column
43///
44/// `batch_size` is used to pre-allocate internal buffers,
45/// avoiding reallocations when reading the first batch of data.
46pub fn make_fixed_len_byte_array_reader(
47    pages: Box<dyn PageIterator>,
48    column_desc: ColumnDescPtr,
49    arrow_type: Option<ArrowType>,
50    batch_size: usize,
51) -> Result<Box<dyn ArrayReader>> {
52    // Check if Arrow type is specified, else create it from Parquet type
53    let data_type = match arrow_type {
54        Some(t) => t,
55        None => parquet_to_arrow_field(column_desc.as_ref())?
56            .data_type()
57            .clone(),
58    };
59
60    let byte_length = match column_desc.physical_type() {
61        Type::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize,
62        t => {
63            return Err(general_err!(
64                "invalid physical type for fixed length byte array reader - {}",
65                t
66            ));
67        }
68    };
69    match &data_type {
70        ArrowType::FixedSizeBinary(_) => {}
71        ArrowType::Decimal32(_, _) => {
72            if byte_length > 4 {
73                return Err(general_err!(
74                    "decimal 32 type too large, must be less then 4 bytes, got {}",
75                    byte_length
76                ));
77            }
78        }
79        ArrowType::Decimal64(_, _) => {
80            if byte_length > 8 {
81                return Err(general_err!(
82                    "decimal 64 type too large, must be less then 8 bytes, got {}",
83                    byte_length
84                ));
85            }
86        }
87        ArrowType::Decimal128(_, _) => {
88            if byte_length > 16 {
89                return Err(general_err!(
90                    "decimal 128 type too large, must be less than 16 bytes, got {}",
91                    byte_length
92                ));
93            }
94        }
95        ArrowType::Decimal256(_, _) => {
96            if byte_length > 32 {
97                return Err(general_err!(
98                    "decimal 256 type too large, must be less than 32 bytes, got {}",
99                    byte_length
100                ));
101            }
102        }
103        ArrowType::Interval(_) => {
104            if byte_length != 12 {
105                // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
106                return Err(general_err!(
107                    "interval type must consist of 12 bytes got {}",
108                    byte_length
109                ));
110            }
111        }
112        ArrowType::Float16 => {
113            if byte_length != 2 {
114                return Err(general_err!(
115                    "float 16 type must be 2 bytes, got {}",
116                    byte_length
117                ));
118            }
119        }
120        _ => {
121            return Err(general_err!(
122                "invalid data type for fixed length byte array reader - {}",
123                data_type
124            ));
125        }
126    }
127
128    Ok(Box::new(FixedLenByteArrayReader::new(
129        pages,
130        column_desc,
131        data_type,
132        byte_length,
133        batch_size,
134    )))
135}
136
/// [`ArrayReader`] implementation for Parquet `FIXED_LEN_BYTE_ARRAY` columns
struct FixedLenByteArrayReader {
    /// Arrow type of the arrays produced by `consume_batch`
    data_type: ArrowType,
    /// Width in bytes of every value in the column
    byte_length: usize,
    /// Source of the column's pages
    pages: Box<dyn PageIterator>,
    /// Definition levels taken from the record reader by the last `consume_batch`
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels taken from the record reader by the last `consume_batch`
    rep_levels_buffer: Option<Vec<i16>>,
    /// Accumulates decoded values and levels between `consume_batch` calls
    record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
}
145
146impl FixedLenByteArrayReader {
147    fn new(
148        pages: Box<dyn PageIterator>,
149        column_desc: ColumnDescPtr,
150        data_type: ArrowType,
151        byte_length: usize,
152        batch_size: usize,
153    ) -> Self {
154        let record_reader = GenericRecordReader::new(column_desc, batch_size);
155        Self {
156            data_type,
157            byte_length,
158            pages,
159            def_levels_buffer: None,
160            rep_levels_buffer: None,
161            record_reader,
162        }
163    }
164}
165
166impl ArrayReader for FixedLenByteArrayReader {
167    fn as_any(&self) -> &dyn Any {
168        self
169    }
170
171    fn get_data_type(&self) -> &ArrowType {
172        &self.data_type
173    }
174
175    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
176        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
177    }
178
179    fn consume_batch(&mut self) -> Result<ArrayRef> {
180        let record_data = self.record_reader.consume_record_data();
181
182        let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
183            .len(self.record_reader.num_values())
184            .add_buffer(Buffer::from_vec(record_data.buffer))
185            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
186
187        let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });
188
189        // TODO: An improvement might be to do this conversion on read
190        // Note the conversions below apply to all elements regardless of null slots as the
191        // conversion lambdas are all infallible. This improves performance by avoiding a branch in
192        // the inner loop (see docs for `PrimitiveArray::from_unary`).
193        let array: ArrayRef = match &self.data_type {
194            ArrowType::Decimal32(p, s) => {
195                let f = |b: &[u8]| i32::from_be_bytes(sign_extend_be(b));
196                Arc::new(Decimal32Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
197                    as ArrayRef
198            }
199            ArrowType::Decimal64(p, s) => {
200                let f = |b: &[u8]| i64::from_be_bytes(sign_extend_be(b));
201                Arc::new(Decimal64Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
202                    as ArrayRef
203            }
204            ArrowType::Decimal128(p, s) => {
205                let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
206                Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
207                    as ArrayRef
208            }
209            ArrowType::Decimal256(p, s) => {
210                let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b));
211                Arc::new(Decimal256Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
212                    as ArrayRef
213            }
214            ArrowType::Interval(unit) => {
215                // An interval is stored as 3x 32-bit unsigned integers storing months, days,
216                // and milliseconds
217                match unit {
218                    IntervalUnit::YearMonth => {
219                        let f = |b: &[u8]| i32::from_le_bytes(b[0..4].try_into().unwrap());
220                        Arc::new(IntervalYearMonthArray::from_unary(&binary, f)) as ArrayRef
221                    }
222                    IntervalUnit::DayTime => {
223                        let f = |b: &[u8]| {
224                            IntervalDayTime::new(
225                                i32::from_le_bytes(b[4..8].try_into().unwrap()),
226                                i32::from_le_bytes(b[8..12].try_into().unwrap()),
227                            )
228                        };
229                        Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
230                    }
231                    IntervalUnit::MonthDayNano => {
232                        return Err(nyi_err!("MonthDayNano intervals not supported"));
233                    }
234                }
235            }
236            ArrowType::Float16 => {
237                let f = |b: &[u8]| f16::from_le_bytes(b[..2].try_into().unwrap());
238                Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef
239            }
240            _ => Arc::new(binary) as ArrayRef,
241        };
242
243        self.def_levels_buffer = self.record_reader.consume_def_levels();
244        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
245        self.record_reader.reset();
246
247        Ok(array)
248    }
249
250    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
251        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
252    }
253
254    fn get_def_levels(&self) -> Option<&[i16]> {
255        self.def_levels_buffer.as_deref()
256    }
257
258    fn get_rep_levels(&self) -> Option<&[i16]> {
259        self.rep_levels_buffer.as_deref()
260    }
261}
262
/// Output buffer holding decoded fixed length values as one contiguous run of bytes
#[derive(Default)]
struct FixedLenByteArrayBuffer {
    /// Raw value bytes, `byte_length` bytes per element
    buffer: Vec<u8>,
    /// The length of each element in bytes
    ///
    /// `None` until the first decode into this buffer records the width.
    byte_length: Option<usize>,
    /// Preserved value-count hint used to allocate `buffer` once `byte_length`
    /// becomes known on the first decode.
    values_capacity: Option<usize>,
}
272
273#[inline]
274fn move_values<F>(
275    buffer: &mut Vec<u8>,
276    byte_length: usize,
277    values_range: Range<usize>,
278    valid_mask: &[u8],
279    mut op: F,
280) where
281    F: FnMut(&mut Vec<u8>, usize, usize, usize),
282{
283    for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
284        debug_assert!(level_pos >= value_pos);
285        if level_pos <= value_pos {
286            break;
287        }
288
289        let level_pos_bytes = level_pos * byte_length;
290        let value_pos_bytes = value_pos * byte_length;
291
292        op(buffer, level_pos_bytes, value_pos_bytes, byte_length)
293    }
294}
295
296impl ValuesBuffer for FixedLenByteArrayBuffer {
297    fn with_capacity(capacity: usize) -> Self {
298        // `byte_length` is not known initially, so preserve the value-count
299        // hint so the first decode can allocate the exact byte capacity.
300        Self {
301            buffer: Vec::new(),
302            byte_length: None,
303            values_capacity: Some(capacity),
304        }
305    }
306
307    fn pad_nulls(
308        &mut self,
309        read_offset: usize,
310        values_read: usize,
311        levels_read: usize,
312        valid_mask: &[u8],
313    ) {
314        let byte_length = self.byte_length.unwrap_or_default();
315
316        assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length);
317        self.buffer
318            .resize((read_offset + levels_read) * byte_length, 0);
319
320        let values_range = read_offset..read_offset + values_read;
321        // Move the bytes from value_pos to level_pos. For values of `byte_length` <= 4,
322        // the simple loop is preferred as the compiler can eliminate the loop via unrolling.
323        // For `byte_length > 4`, we instead copy from non-overlapping slices. This allows
324        // the loop to be vectorized, yielding much better performance.
325        const VEC_CUTOFF: usize = 4;
326        if byte_length > VEC_CUTOFF {
327            let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
328                let split = buffer.split_at_mut(level_pos_bytes);
329                let dst = &mut split.1[..byte_length];
330                let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length];
331                dst.copy_from_slice(src);
332            };
333            move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
334        } else {
335            let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
336                for i in 0..byte_length {
337                    buffer[level_pos_bytes + i] = buffer[value_pos_bytes + i]
338                }
339            };
340            move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
341        }
342    }
343}
344
/// Decodes the values of fixed length byte array pages into a [`FixedLenByteArrayBuffer`]
struct ValueDecoder {
    /// Width in bytes of each value, taken from the column's type length
    byte_length: usize,
    /// Raw bytes of the dictionary page, once one has been set
    dict_page: Option<Bytes>,
    /// Decoder for the current data page, once one has been set
    decoder: Option<Decoder>,
}
350
351impl ColumnValueDecoder for ValueDecoder {
352    type Buffer = FixedLenByteArrayBuffer;
353
354    fn new(col: &ColumnDescPtr) -> Self {
355        Self {
356            byte_length: col.type_length() as usize,
357            dict_page: None,
358            decoder: None,
359        }
360    }
361
362    fn set_dict(
363        &mut self,
364        buf: Bytes,
365        num_values: u32,
366        encoding: Encoding,
367        _is_sorted: bool,
368    ) -> Result<()> {
369        if !matches!(
370            encoding,
371            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
372        ) {
373            return Err(nyi_err!(
374                "Invalid/Unsupported encoding type for dictionary: {}",
375                encoding
376            ));
377        }
378        let expected_len = num_values as usize * self.byte_length;
379        if expected_len > buf.len() {
380            return Err(general_err!(
381                "too few bytes in dictionary page, expected {} got {}",
382                expected_len,
383                buf.len()
384            ));
385        }
386
387        self.dict_page = Some(buf);
388        Ok(())
389    }
390
391    fn set_data(
392        &mut self,
393        encoding: Encoding,
394        data: Bytes,
395        num_levels: usize,
396        num_values: Option<usize>,
397    ) -> Result<()> {
398        self.decoder = Some(match encoding {
399            Encoding::PLAIN => Decoder::Plain {
400                buf: data,
401                offset: 0,
402            },
403            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict {
404                decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
405            },
406            Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
407                decoder: DeltaByteArrayDecoder::new(data)?,
408            },
409            Encoding::BYTE_STREAM_SPLIT => Decoder::ByteStreamSplit {
410                buf: data,
411                offset: 0,
412            },
413            _ => {
414                return Err(general_err!(
415                    "unsupported encoding for fixed length byte array: {}",
416                    encoding
417                ));
418            }
419        });
420        Ok(())
421    }
422
423    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
424        match out.byte_length {
425            Some(x) => assert_eq!(x, self.byte_length),
426            None => {
427                out.byte_length = Some(self.byte_length);
428                // TODO: collapse to a let-chain once MSRV ≥ 1.88
429                // (`if out.buffer.is_empty() && let Some(cap) = out.values_capacity.take()`)
430                if out.buffer.is_empty() {
431                    if let Some(values_capacity) = out.values_capacity.take() {
432                        // now that the byte length per output element is known,
433                        // allocate the actual needed space.
434                        let byte_capacity = values_capacity.saturating_mul(self.byte_length);
435                        out.buffer = Vec::with_capacity(byte_capacity);
436                    }
437                }
438            }
439        }
440
441        match self.decoder.as_mut().unwrap() {
442            Decoder::Plain { offset, buf } => {
443                let to_read =
444                    (num_values * self.byte_length).min(buf.len() - *offset) / self.byte_length;
445                let end_offset = *offset + to_read * self.byte_length;
446                out.buffer
447                    .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
448                *offset = end_offset;
449                Ok(to_read)
450            }
451            Decoder::Dict { decoder } => {
452                let dict = self.dict_page.as_ref().unwrap();
453                // All data must be NULL
454                if dict.is_empty() {
455                    return Ok(0);
456                }
457
458                decoder.read(num_values, |keys| {
459                    out.buffer.reserve(keys.len() * self.byte_length);
460                    for key in keys {
461                        let offset = *key as usize * self.byte_length;
462                        let val = &dict.as_ref()[offset..offset + self.byte_length];
463                        out.buffer.extend_from_slice(val);
464                    }
465                    Ok(())
466                })
467            }
468            Decoder::Delta { decoder } => {
469                let to_read = num_values.min(decoder.remaining());
470                out.buffer.reserve(to_read * self.byte_length);
471
472                decoder.read(to_read, |slice| {
473                    if slice.len() != self.byte_length {
474                        return Err(general_err!(
475                            "encountered array with incorrect length, got {} expected {}",
476                            slice.len(),
477                            self.byte_length
478                        ));
479                    }
480                    out.buffer.extend_from_slice(slice);
481                    Ok(())
482                })
483            }
484            Decoder::ByteStreamSplit { buf, offset } => {
485                // we have n=`byte_length` streams of length `buf.len/byte_length`
486                // to read value i, we need the i'th byte from each of the streams
487                // so `offset` should be the value offset, not the byte offset
488                let total_values = buf.len() / self.byte_length;
489                let to_read = num_values.min(total_values - *offset);
490
491                // now read the n streams and reassemble values into the output buffer
492                read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, self.byte_length);
493
494                *offset += to_read;
495                Ok(to_read)
496            }
497        }
498    }
499
500    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
501        match self.decoder.as_mut().unwrap() {
502            Decoder::Plain { offset, buf } => {
503                let to_read = num_values.min((buf.len() - *offset) / self.byte_length);
504                *offset += to_read * self.byte_length;
505                Ok(to_read)
506            }
507            Decoder::Dict { decoder } => decoder.skip(num_values),
508            Decoder::Delta { decoder } => decoder.skip(num_values),
509            Decoder::ByteStreamSplit { offset, buf } => {
510                let total_values = buf.len() / self.byte_length;
511                let to_read = num_values.min(total_values - *offset);
512                *offset += to_read;
513                Ok(to_read)
514            }
515        }
516    }
517}
518
519// `src` is an array laid out like a NxM matrix where N == `data_width` and
520// M == total_values_in_src. Each "row" of the matrix is a stream of bytes, with stream `i`
521// containing the `ith` byte for each value. Each "column" is a single value.
522// This will reassemble `num_values` values by reading columns of the matrix starting at
523// `offset`. Values will be appended to `dst`.
524fn read_byte_stream_split(
525    dst: &mut Vec<u8>,
526    src: &mut Bytes,
527    offset: usize,
528    num_values: usize,
529    data_width: usize,
530) {
531    let stride = src.len() / data_width;
532    let idx = dst.len();
533    dst.resize(idx + num_values * data_width, 0u8);
534    let dst_slc = &mut dst[idx..idx + num_values * data_width];
535    for j in 0..data_width {
536        let src_slc = &src[offset + j * stride..offset + j * stride + num_values];
537        for i in 0..num_values {
538            dst_slc[i * data_width + j] = src_slc[i];
539        }
540    }
541}
542
/// Decoding state for the current data page, one variant per supported encoding
enum Decoder {
    /// `PLAIN`: values stored back to back; `offset` is a byte offset into `buf`
    Plain { buf: Bytes, offset: usize },
    /// `RLE_DICTIONARY` / `PLAIN_DICTIONARY`: indices into the dictionary page
    Dict { decoder: DictIndexDecoder },
    /// `DELTA_BYTE_ARRAY`
    Delta { decoder: DeltaByteArrayDecoder },
    /// `BYTE_STREAM_SPLIT`: `offset` counts values already consumed, not bytes
    ByteStreamSplit { buf: Bytes, offset: usize },
}
549
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use arrow::datatypes::Field;
    use arrow::error::Result as ArrowResult;
    use arrow_array::{Array, ListArray};
    use arrow_array::{Decimal256Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    /// Writes a nullable list of Decimal256 values and reads it back in batches of
    /// three rows, checking that each batch matches the corresponding input slice.
    #[test]
    fn test_decimal_list() {
        let decimals = Decimal256Array::from_iter_values((1..=8).map(i256::from_i128));

        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
        let item_field = Arc::new(Field::new_list_field(decimals.data_type().clone(), false));
        let offsets = Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]);
        let validity = Some(Buffer::from(&[0b01010111]));
        let list_data = ArrayDataBuilder::new(ArrowType::List(item_field))
            .len(7)
            .add_buffer(offsets)
            .null_bit_buffer(validity)
            .child_data(vec![decimals.into_data()])
            .build()
            .unwrap();

        let list: ArrayRef = Arc::new(ListArray::from(list_data));
        let written = RecordBatch::try_from_iter([("list", list)]).unwrap();

        // Round trip through an in-memory Parquet file
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3).unwrap();
        let read = reader.collect::<ArrowResult<Vec<_>>>().unwrap();

        // Seven rows read three at a time: 3 + 3 + 1
        let expected_slices = [(0, 3), (3, 3), (6, 1)];
        for (batch_idx, (start, len)) in expected_slices.into_iter().enumerate() {
            assert_eq!(&written.slice(start, len), &read[batch_idx]);
        }
    }
}