orc_rust/array_decoder/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::{ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray};
21use arrow::buffer::NullBuffer;
22use arrow::datatypes::ArrowNativeTypeOp;
23use arrow::datatypes::ArrowPrimitiveType;
24use arrow::datatypes::{DataType as ArrowDataType, Field};
25use arrow::datatypes::{
26    Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
27};
28use arrow::record_batch::{RecordBatch, RecordBatchOptions};
29use snafu::{ensure, ResultExt};
30
31use crate::column::Column;
32use crate::encoding::boolean::BooleanDecoder;
33use crate::encoding::byte::ByteRleDecoder;
34use crate::encoding::float::FloatDecoder;
35use crate::encoding::integer::get_rle_reader;
36use crate::encoding::PrimitiveValueDecoder;
37use crate::error::{
38    self, MismatchedSchemaSnafu, Result, UnexpectedSnafu, UnsupportedTypeVariantSnafu,
39};
40use crate::proto::stream::Kind;
41use crate::schema::DataType;
42use crate::stripe::Stripe;
43
44use self::decimal::new_decimal_decoder;
45use self::list::ListArrayDecoder;
46use self::map::MapArrayDecoder;
47use self::string::{new_binary_decoder, new_string_decoder};
48use self::struct_decoder::StructArrayDecoder;
49use self::timestamp::{new_timestamp_decoder, new_timestamp_instant_decoder};
50use self::union::UnionArrayDecoder;
51
52mod decimal;
53mod list;
54mod map;
55mod string;
56mod struct_decoder;
57mod timestamp;
58mod union;
59
60pub trait ArrayBatchDecoder: Send {
61    /// Used as base for decoding ORC columns into Arrow arrays. Provide an input `batch_size`
62    /// which specifies the upper limit of the number of values returned in the output array.
63    ///
64    /// If parent nested type (e.g. Struct) indicates a null in it's PRESENT stream,
65    /// then the child doesn't have a value (similar to other nullability). So we need
66    /// to take care to insert these null values as Arrow requires the child to hold
67    /// data in the null slot of the child.
68    // TODO: encode nullability in generic -> for a given column in a stripe, we will always know
69    //       upfront if we need to bother with nulls or not, so we don't need to keep checking this
70    //       for every invocation of next_batch
71    // NOTE: null parent may have non-null child, so would still have to account for this
72    fn next_batch(
73        &mut self,
74        batch_size: usize,
75        parent_present: Option<&NullBuffer>,
76    ) -> Result<ArrayRef>;
77}
78
79struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
80    iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
81    present: Option<PresentDecoder>,
82}
83
84impl<T: ArrowPrimitiveType> PrimitiveArrayDecoder<T> {
85    pub fn new(
86        iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
87        present: Option<PresentDecoder>,
88    ) -> Self {
89        Self { iter, present }
90    }
91
92    fn next_primitive_batch(
93        &mut self,
94        batch_size: usize,
95        parent_present: Option<&NullBuffer>,
96    ) -> Result<PrimitiveArray<T>> {
97        let present =
98            derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;
99        let mut data = vec![T::Native::ZERO; batch_size];
100        match present {
101            Some(present) => {
102                self.iter.decode_spaced(data.as_mut_slice(), &present)?;
103                let array = PrimitiveArray::<T>::new(data.into(), Some(present));
104                Ok(array)
105            }
106            None => {
107                self.iter.decode(data.as_mut_slice())?;
108                let array = PrimitiveArray::<T>::from_iter_values(data);
109                Ok(array)
110            }
111        }
112    }
113}
114
115impl<T: ArrowPrimitiveType> ArrayBatchDecoder for PrimitiveArrayDecoder<T> {
116    fn next_batch(
117        &mut self,
118        batch_size: usize,
119        parent_present: Option<&NullBuffer>,
120    ) -> Result<ArrayRef> {
121        let array = self.next_primitive_batch(batch_size, parent_present)?;
122        let array = Arc::new(array) as ArrayRef;
123        Ok(array)
124    }
125}
126
127type Int64ArrayDecoder = PrimitiveArrayDecoder<Int64Type>;
128type Int32ArrayDecoder = PrimitiveArrayDecoder<Int32Type>;
129type Int16ArrayDecoder = PrimitiveArrayDecoder<Int16Type>;
130type Int8ArrayDecoder = PrimitiveArrayDecoder<Int8Type>;
131type Float32ArrayDecoder = PrimitiveArrayDecoder<Float32Type>;
132type Float64ArrayDecoder = PrimitiveArrayDecoder<Float64Type>;
133type DateArrayDecoder = PrimitiveArrayDecoder<Date32Type>; // TODO: does ORC encode as i64 or i32?
134
135struct BooleanArrayDecoder {
136    iter: Box<dyn PrimitiveValueDecoder<bool> + Send>,
137    present: Option<PresentDecoder>,
138}
139
140impl BooleanArrayDecoder {
141    pub fn new(
142        iter: Box<dyn PrimitiveValueDecoder<bool> + Send>,
143        present: Option<PresentDecoder>,
144    ) -> Self {
145        Self { iter, present }
146    }
147}
148
149impl ArrayBatchDecoder for BooleanArrayDecoder {
150    fn next_batch(
151        &mut self,
152        batch_size: usize,
153        parent_present: Option<&NullBuffer>,
154    ) -> Result<ArrayRef> {
155        let present =
156            derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;
157        let mut data = vec![false; batch_size];
158        let array = match present {
159            Some(present) => {
160                self.iter.decode_spaced(data.as_mut_slice(), &present)?;
161                BooleanArray::new(data.into(), Some(present))
162            }
163            None => {
164                self.iter.decode(data.as_mut_slice())?;
165                BooleanArray::from(data)
166            }
167        };
168        Ok(Arc::new(array))
169    }
170}
171
172struct PresentDecoder {
173    // TODO: ideally directly reference BooleanDecoder, doing this way to avoid
174    //       the generic propagation that would be required (BooleanDecoder<R: Read>)
175    inner: Box<dyn PrimitiveValueDecoder<bool> + Send>,
176}
177
178impl PresentDecoder {
179    fn from_stripe(stripe: &Stripe, column: &Column) -> Option<Self> {
180        stripe
181            .stream_map()
182            .get_opt(column, Kind::Present)
183            .map(|stream| {
184                let inner = Box::new(BooleanDecoder::new(stream));
185                PresentDecoder { inner }
186            })
187    }
188
189    fn next_buffer(&mut self, size: usize) -> Result<NullBuffer> {
190        let mut data = vec![false; size];
191        self.inner.decode(&mut data)?;
192        Ok(NullBuffer::from(data))
193    }
194}
195
196fn merge_parent_present(
197    parent_present: &NullBuffer,
198    present: Result<NullBuffer>,
199) -> Result<NullBuffer> {
200    let present = present?;
201    let non_null_count = parent_present.len() - parent_present.null_count();
202    debug_assert!(present.len() == non_null_count);
203    let mut builder = BooleanBufferBuilder::new(parent_present.len());
204    builder.append_n(parent_present.len(), false);
205    for (idx, p) in parent_present.valid_indices().zip(present.iter()) {
206        builder.set_bit(idx, p);
207    }
208    Ok(builder.finish().into())
209}
210
211fn derive_present_vec(
212    present: &mut Option<PresentDecoder>,
213    parent_present: Option<&NullBuffer>,
214    batch_size: usize,
215) -> Option<Result<NullBuffer>> {
216    let present = match (present, parent_present) {
217        (Some(present), Some(parent_present)) => {
218            let element_count = parent_present.len() - parent_present.null_count();
219            let present = present.next_buffer(element_count);
220            Some(merge_parent_present(parent_present, present))
221        }
222        (Some(present), None) => Some(present.next_buffer(batch_size)),
223        (None, Some(parent_present)) => Some(Ok(parent_present.clone())),
224        (None, None) => None,
225    };
226
227    // omit the null buffer if there are no nulls
228    match present {
229        Some(Ok(present)) if present.null_count() > 0 => Some(Ok(present)),
230        _ => None,
231    }
232}
233
234pub struct NaiveStripeDecoder {
235    stripe: Stripe,
236    schema_ref: SchemaRef,
237    decoders: Vec<Box<dyn ArrayBatchDecoder>>,
238    index: usize,
239    batch_size: usize,
240    number_of_rows: usize,
241}
242
243impl Iterator for NaiveStripeDecoder {
244    type Item = Result<RecordBatch>;
245
246    fn next(&mut self) -> Option<Self::Item> {
247        if self.index < self.number_of_rows {
248            let record = self
249                .decode_next_batch(self.number_of_rows - self.index)
250                .transpose()?;
251            self.index += self.batch_size;
252            Some(record)
253        } else {
254            None
255        }
256    }
257}
258
259pub fn array_decoder_factory(
260    column: &Column,
261    field: Arc<Field>,
262    stripe: &Stripe,
263) -> Result<Box<dyn ArrayBatchDecoder>> {
264    let decoder: Box<dyn ArrayBatchDecoder> = match (column.data_type(), field.data_type()) {
265        // TODO: try make branches more generic, reduce duplication
266        (DataType::Boolean { .. }, ArrowDataType::Boolean) => {
267            let iter = stripe.stream_map().get(column, Kind::Data);
268            let iter = Box::new(BooleanDecoder::new(iter));
269            let present = PresentDecoder::from_stripe(stripe, column);
270            Box::new(BooleanArrayDecoder::new(iter, present))
271        }
272        (DataType::Byte { .. }, ArrowDataType::Int8) => {
273            let iter = stripe.stream_map().get(column, Kind::Data);
274            let iter = Box::new(ByteRleDecoder::new(iter));
275            let present = PresentDecoder::from_stripe(stripe, column);
276            Box::new(Int8ArrayDecoder::new(iter, present))
277        }
278        (DataType::Short { .. }, ArrowDataType::Int16) => {
279            let iter = stripe.stream_map().get(column, Kind::Data);
280            let iter = get_rle_reader(column, iter)?;
281            let present = PresentDecoder::from_stripe(stripe, column);
282            Box::new(Int16ArrayDecoder::new(iter, present))
283        }
284        (DataType::Int { .. }, ArrowDataType::Int32) => {
285            let iter = stripe.stream_map().get(column, Kind::Data);
286            let iter = get_rle_reader(column, iter)?;
287            let present = PresentDecoder::from_stripe(stripe, column);
288            Box::new(Int32ArrayDecoder::new(iter, present))
289        }
290        (DataType::Long { .. }, ArrowDataType::Int64) => {
291            let iter = stripe.stream_map().get(column, Kind::Data);
292            let iter = get_rle_reader(column, iter)?;
293            let present = PresentDecoder::from_stripe(stripe, column);
294            Box::new(Int64ArrayDecoder::new(iter, present))
295        }
296        (DataType::Float { .. }, ArrowDataType::Float32) => {
297            let iter = stripe.stream_map().get(column, Kind::Data);
298            let iter = Box::new(FloatDecoder::new(iter));
299            let present = PresentDecoder::from_stripe(stripe, column);
300            Box::new(Float32ArrayDecoder::new(iter, present))
301        }
302        (DataType::Double { .. }, ArrowDataType::Float64) => {
303            let iter = stripe.stream_map().get(column, Kind::Data);
304            let iter = Box::new(FloatDecoder::new(iter));
305            let present = PresentDecoder::from_stripe(stripe, column);
306            Box::new(Float64ArrayDecoder::new(iter, present))
307        }
308        (DataType::String { .. }, ArrowDataType::Utf8)
309        | (DataType::Varchar { .. }, ArrowDataType::Utf8)
310        | (DataType::Char { .. }, ArrowDataType::Utf8) => new_string_decoder(column, stripe)?,
311        (DataType::Binary { .. }, ArrowDataType::Binary) => new_binary_decoder(column, stripe)?,
312        (
313            DataType::Decimal {
314                precision, scale, ..
315            },
316            ArrowDataType::Decimal128(a_precision, a_scale),
317        ) if *precision as u8 == *a_precision && *scale as i8 == *a_scale => {
318            new_decimal_decoder(column, stripe, *precision, *scale)?
319        }
320        (DataType::Timestamp { .. }, field_type) => {
321            new_timestamp_decoder(column, field_type.clone(), stripe)?
322        }
323        (DataType::TimestampWithLocalTimezone { .. }, field_type) => {
324            new_timestamp_instant_decoder(column, field_type.clone(), stripe)?
325        }
326        (DataType::Date { .. }, ArrowDataType::Date32) => {
327            // TODO: allow Date64
328            let iter = stripe.stream_map().get(column, Kind::Data);
329            let iter = get_rle_reader(column, iter)?;
330            let present = PresentDecoder::from_stripe(stripe, column);
331            Box::new(DateArrayDecoder::new(iter, present))
332        }
333        (DataType::Struct { .. }, ArrowDataType::Struct(fields)) => {
334            Box::new(StructArrayDecoder::new(column, fields.clone(), stripe)?)
335        }
336        (DataType::List { .. }, ArrowDataType::List(field)) => {
337            // TODO: add support for ArrowDataType::LargeList
338            Box::new(ListArrayDecoder::new(column, field.clone(), stripe)?)
339        }
340        (DataType::Map { .. }, ArrowDataType::Map(entries, sorted)) => {
341            ensure!(!sorted, UnsupportedTypeVariantSnafu { msg: "Sorted map" });
342            let ArrowDataType::Struct(entries) = entries.data_type() else {
343                UnexpectedSnafu {
344                    msg: "arrow Map with non-Struct entry type".to_owned(),
345                }
346                .fail()?
347            };
348            ensure!(
349                entries.len() == 2,
350                UnexpectedSnafu {
351                    msg: format!(
352                        "arrow Map with {} columns per entry (expected 2)",
353                        entries.len()
354                    )
355                }
356            );
357            let keys_field = entries[0].clone();
358            let values_field = entries[1].clone();
359
360            Box::new(MapArrayDecoder::new(
361                column,
362                keys_field,
363                values_field,
364                stripe,
365            )?)
366        }
367        (DataType::Union { .. }, ArrowDataType::Union(fields, _)) => {
368            Box::new(UnionArrayDecoder::new(column, fields.clone(), stripe)?)
369        }
370        (data_type, field_type) => {
371            return MismatchedSchemaSnafu {
372                orc_type: data_type.clone(),
373                arrow_type: field_type.clone(),
374            }
375            .fail()
376        }
377    };
378
379    Ok(decoder)
380}
381
382impl NaiveStripeDecoder {
383    fn inner_decode_next_batch(&mut self, remaining: usize) -> Result<Vec<ArrayRef>> {
384        let chunk = self.batch_size.min(remaining);
385
386        let mut fields = Vec::with_capacity(self.stripe.columns().len());
387
388        for decoder in &mut self.decoders {
389            let array = decoder.next_batch(chunk, None)?;
390            if array.is_empty() {
391                break;
392            } else {
393                fields.push(array);
394            }
395        }
396
397        Ok(fields)
398    }
399
400    fn decode_next_batch(&mut self, remaining: usize) -> Result<Option<RecordBatch>> {
401        let fields = self.inner_decode_next_batch(remaining)?;
402
403        if fields.is_empty() {
404            if remaining == 0 {
405                Ok(None)
406            } else {
407                // In case of empty projection, we need to create a RecordBatch with `row_count` only
408                // to reflect the row number
409                Ok(Some(
410                    RecordBatch::try_new_with_options(
411                        Arc::clone(&self.schema_ref),
412                        fields,
413                        &RecordBatchOptions::new()
414                            .with_row_count(Some(self.batch_size.min(remaining))),
415                    )
416                    .context(error::ConvertRecordBatchSnafu)?,
417                ))
418            }
419        } else {
420            //TODO(weny): any better way?
421            let fields = self
422                .schema_ref
423                .fields
424                .into_iter()
425                .map(|field| field.name())
426                .zip(fields)
427                .collect::<Vec<_>>();
428
429            Ok(Some(
430                RecordBatch::try_from_iter(fields).context(error::ConvertRecordBatchSnafu)?,
431            ))
432        }
433    }
434
435    pub fn new(stripe: Stripe, schema_ref: SchemaRef, batch_size: usize) -> Result<Self> {
436        let mut decoders = Vec::with_capacity(stripe.columns().len());
437        let number_of_rows = stripe.number_of_rows();
438
439        for (col, field) in stripe
440            .columns()
441            .iter()
442            .zip(schema_ref.fields.iter().cloned())
443        {
444            let decoder = array_decoder_factory(col, field, &stripe)?;
445            decoders.push(decoder);
446        }
447
448        Ok(Self {
449            stripe,
450            schema_ref,
451            decoders,
452            index: 0,
453            batch_size,
454            number_of_rows,
455        })
456    }
457}