orc_rust/array_decoder/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::{ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray};
21use arrow::buffer::NullBuffer;
22use arrow::datatypes::ArrowNativeTypeOp;
23use arrow::datatypes::ArrowPrimitiveType;
24use arrow::datatypes::DataType as ArrowDataType;
25use arrow::datatypes::{
26    Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
27};
28use arrow::record_batch::{RecordBatch, RecordBatchOptions};
29use snafu::{ensure, ResultExt};
30
31use crate::column::Column;
32use crate::encoding::boolean::BooleanDecoder;
33use crate::encoding::byte::ByteRleDecoder;
34use crate::encoding::float::FloatDecoder;
35use crate::encoding::integer::get_signed_int_decoder;
36use crate::encoding::PrimitiveValueDecoder;
37use crate::error::{
38    self, MismatchedSchemaSnafu, Result, UnexpectedSnafu, UnsupportedTypeVariantSnafu,
39};
40use crate::proto::stream::Kind;
41use crate::schema::DataType;
42use crate::stripe::Stripe;
43use crate::RowSelection;
44
45use self::decimal::new_decimal_decoder;
46use self::list::ListArrayDecoder;
47use self::map::MapArrayDecoder;
48use self::string::{new_binary_decoder, new_string_decoder};
49use self::struct_decoder::StructArrayDecoder;
50use self::timestamp::{new_timestamp_decoder, new_timestamp_instant_decoder};
51use self::union::UnionArrayDecoder;
52
53mod decimal;
54mod list;
55mod map;
56mod string;
57mod struct_decoder;
58mod timestamp;
59mod union;
60
61pub trait ArrayBatchDecoder: Send {
62    /// Used as base for decoding ORC columns into Arrow arrays. Provide an input `batch_size`
63    /// which specifies the upper limit of the number of values returned in the output array.
64    ///
65    /// If parent nested type (e.g. Struct) indicates a null in it's PRESENT stream,
66    /// then the child doesn't have a value (similar to other nullability). So we need
67    /// to take care to insert these null values as Arrow requires the child to hold
68    /// data in the null slot of the child.
69    // TODO: encode nullability in generic -> for a given column in a stripe, we will always know
70    //       upfront if we need to bother with nulls or not, so we don't need to keep checking this
71    //       for every invocation of next_batch
72    // NOTE: null parent may have non-null child, so would still have to account for this
73    fn next_batch(
74        &mut self,
75        batch_size: usize,
76        parent_present: Option<&NullBuffer>,
77    ) -> Result<ArrayRef>;
78
79    /// Skip the next `n` values without decoding them, failing if it cannot skip the enough values.
80    /// If parent nested type (e.g. Struct) indicates a null in it's PRESENT stream,
81    /// then the child doesn't have a value (similar to other nullability). So we need
82    /// to take care to insert these null values as Arrow requires the child to hold
83    /// data in the null slot of the child.
84    fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()>;
85}
86
87struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
88    iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
89    present: Option<PresentDecoder>,
90}
91
92impl<T: ArrowPrimitiveType> PrimitiveArrayDecoder<T> {
93    pub fn new(
94        iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
95        present: Option<PresentDecoder>,
96    ) -> Self {
97        Self { iter, present }
98    }
99
100    fn next_primitive_batch(
101        &mut self,
102        batch_size: usize,
103        parent_present: Option<&NullBuffer>,
104    ) -> Result<PrimitiveArray<T>> {
105        let present =
106            derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;
107        let mut data = vec![T::Native::ZERO; batch_size];
108        match present {
109            Some(present) => {
110                self.iter.decode_spaced(data.as_mut_slice(), &present)?;
111                let array = PrimitiveArray::<T>::new(data.into(), Some(present));
112                Ok(array)
113            }
114            None => {
115                self.iter.decode(data.as_mut_slice())?;
116                let array = PrimitiveArray::<T>::from_iter_values(data);
117                Ok(array)
118            }
119        }
120    }
121}
122
123impl<T: ArrowPrimitiveType> ArrayBatchDecoder for PrimitiveArrayDecoder<T> {
124    fn next_batch(
125        &mut self,
126        batch_size: usize,
127        parent_present: Option<&NullBuffer>,
128    ) -> Result<ArrayRef> {
129        let array = self.next_primitive_batch(batch_size, parent_present)?;
130        let array = Arc::new(array) as ArrayRef;
131        Ok(array)
132    }
133
134    fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
135        let non_null_count =
136            skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
137        self.iter.skip(non_null_count)
138    }
139}
140
141type Int64ArrayDecoder = PrimitiveArrayDecoder<Int64Type>;
142type Int32ArrayDecoder = PrimitiveArrayDecoder<Int32Type>;
143type Int16ArrayDecoder = PrimitiveArrayDecoder<Int16Type>;
144type Int8ArrayDecoder = PrimitiveArrayDecoder<Int8Type>;
145type Float32ArrayDecoder = PrimitiveArrayDecoder<Float32Type>;
146type Float64ArrayDecoder = PrimitiveArrayDecoder<Float64Type>;
147type DateArrayDecoder = PrimitiveArrayDecoder<Date32Type>; // TODO: does ORC encode as i64 or i32?
148
149struct BooleanArrayDecoder {
150    iter: Box<dyn PrimitiveValueDecoder<bool> + Send>,
151    present: Option<PresentDecoder>,
152}
153
154impl BooleanArrayDecoder {
155    pub fn new(
156        iter: Box<dyn PrimitiveValueDecoder<bool> + Send>,
157        present: Option<PresentDecoder>,
158    ) -> Self {
159        Self { iter, present }
160    }
161}
162
163impl ArrayBatchDecoder for BooleanArrayDecoder {
164    fn next_batch(
165        &mut self,
166        batch_size: usize,
167        parent_present: Option<&NullBuffer>,
168    ) -> Result<ArrayRef> {
169        let present =
170            derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;
171        let mut data = vec![false; batch_size];
172        let array = match present {
173            Some(present) => {
174                self.iter.decode_spaced(data.as_mut_slice(), &present)?;
175                BooleanArray::new(data.into(), Some(present))
176            }
177            None => {
178                self.iter.decode(data.as_mut_slice())?;
179                BooleanArray::from(data)
180            }
181        };
182        Ok(Arc::new(array))
183    }
184
185    fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
186        let non_null_count =
187            skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
188        self.iter.skip(non_null_count)
189    }
190}
191
192struct PresentDecoder {
193    // TODO: ideally directly reference BooleanDecoder, doing this way to avoid
194    //       the generic propagation that would be required (BooleanDecoder<R: Read>)
195    inner: Box<dyn PrimitiveValueDecoder<bool> + Send>,
196}
197
198impl PresentDecoder {
199    fn from_stripe(stripe: &Stripe, column: &Column) -> Option<Self> {
200        stripe
201            .stream_map()
202            .get_opt(column, Kind::Present)
203            .map(|stream| {
204                let inner = Box::new(BooleanDecoder::new(stream));
205                PresentDecoder { inner }
206            })
207    }
208
209    fn next_buffer(&mut self, size: usize) -> Result<NullBuffer> {
210        let mut data = vec![false; size];
211        self.inner.decode(&mut data)?;
212        Ok(NullBuffer::from(data))
213    }
214}
215
216fn merge_parent_present(
217    parent_present: &NullBuffer,
218    present: Result<NullBuffer>,
219) -> Result<NullBuffer> {
220    let present = present?;
221    let non_null_count = parent_present.len() - parent_present.null_count();
222    debug_assert!(present.len() == non_null_count);
223    let mut builder = BooleanBufferBuilder::new(parent_present.len());
224    builder.append_n(parent_present.len(), false);
225    for (idx, p) in parent_present.valid_indices().zip(present.iter()) {
226        builder.set_bit(idx, p);
227    }
228    Ok(builder.finish().into())
229}
230
231fn derive_present_vec(
232    present: &mut Option<PresentDecoder>,
233    parent_present: Option<&NullBuffer>,
234    batch_size: usize,
235) -> Option<Result<NullBuffer>> {
236    let present = match (present, parent_present) {
237        (Some(present), Some(parent_present)) => {
238            let element_count = parent_present.len() - parent_present.null_count();
239            let present = present.next_buffer(element_count);
240            Some(merge_parent_present(parent_present, present))
241        }
242        (Some(present), None) => Some(present.next_buffer(batch_size)),
243        (None, Some(parent_present)) => Some(Ok(parent_present.clone())),
244        (None, None) => None,
245    };
246
247    // omit the null buffer if there are no nulls
248    match present {
249        Some(Ok(present)) if present.null_count() > 0 => Some(Ok(present)),
250        _ => None,
251    }
252}
253
254/// Skip n values and return the non-null count for the data stream
255fn skip_present_and_get_non_null_count(
256    present: &mut Option<PresentDecoder>,
257    parent_present: Option<&NullBuffer>,
258    n: usize,
259) -> Result<usize> {
260    match (present, parent_present) {
261        (Some(present), Some(parent_present)) => {
262            // Parent has nulls, so we need to decode parent present to know how many
263            // of our present values to skip
264            let non_null_in_parent = parent_present.len() - parent_present.null_count();
265
266            // Skip our present values for non-null parents and count non-nulls
267            let mut our_present = vec![false; non_null_in_parent];
268            present.inner.decode(&mut our_present)?;
269            let our_non_null_count = our_present.iter().filter(|&&v| v).count();
270
271            Ok(our_non_null_count)
272        }
273        (Some(present), None) => {
274            // No parent present, skip n values and count non-nulls
275            let mut present_values = vec![false; n];
276            present.inner.decode(&mut present_values)?;
277            Ok(present_values.iter().filter(|&&v| v).count())
278        }
279        (None, Some(parent_present)) => {
280            // No our present stream, all non-null parents have data
281            Ok(parent_present.len() - parent_present.null_count())
282        }
283        (None, None) => {
284            // No nulls at all, all n values have data
285            Ok(n)
286        }
287    }
288}
289
290pub struct NaiveStripeDecoder {
291    stripe: Stripe,
292    schema_ref: SchemaRef,
293    decoders: Vec<Box<dyn ArrayBatchDecoder>>,
294    index: usize,
295    batch_size: usize,
296    number_of_rows: usize,
297    row_selection: Option<RowSelection>,
298    selection_index: usize,
299}
300
301impl NaiveStripeDecoder {
302    /// Advance according to the configured row selection and return the next batch, if any.
303    ///
304    /// Behavior:
305    /// - Iterates `RowSelection` segments (skip/select) starting at `selection_index`.
306    /// - For skip segments: clamp to remaining rows in this stripe, advance decoders via
307    ///   `skip_rows(actual_skip)`, and advance `index`. If the segment is fully consumed,
308    ///   increment `selection_index`.
309    /// - For select segments: decode up to `min(row_count, batch_size, remaining_in_stripe)`,
310    ///   advance `index`, update `selection_index` if fully consumed, and return the batch.
311    /// - If a segment requests rows beyond the end of the stripe, it is skipped (advancing
312    ///   `selection_index`) without touching decoders.
313    fn next_with_row_selection(&mut self) -> Option<Result<RecordBatch>> {
314        // Process selectors until we produce a batch or exhaust selection
315        loop {
316            let (is_skip, row_count) = {
317                let selectors = self.row_selection.as_ref().unwrap().selectors();
318                if self.selection_index >= selectors.len() {
319                    return None;
320                }
321                let selector = selectors[self.selection_index];
322                (selector.skip, selector.row_count)
323            };
324
325            if is_skip {
326                let remaining = self.number_of_rows - self.index;
327                let actual_skip = row_count.min(remaining);
328
329                if actual_skip == 0 {
330                    // Nothing to skip in this stripe; try next selector
331                    self.selection_index += 1;
332                    continue;
333                }
334
335                // Keep decoders in sync by skipping values per column
336                if let Err(e) = self.skip_rows(actual_skip) {
337                    return Some(Err(e));
338                }
339                self.index += actual_skip;
340
341                if actual_skip >= row_count {
342                    self.selection_index += 1;
343                }
344            } else {
345                let rows_to_read = row_count.min(self.batch_size);
346                let remaining = self.number_of_rows - self.index;
347                let actual_rows = rows_to_read.min(remaining);
348
349                if actual_rows == 0 {
350                    // Nothing to read from this selector in this stripe; advance selector
351                    self.selection_index += 1;
352                    continue;
353                }
354
355                let record = self.decode_next_batch(actual_rows).transpose()?;
356                self.index += actual_rows;
357
358                if actual_rows >= row_count {
359                    self.selection_index += 1;
360                }
361                return Some(record);
362            }
363        }
364    }
365}
366
367impl Iterator for NaiveStripeDecoder {
368    type Item = Result<RecordBatch>;
369
370    // TODO: check if we can make this more efficient
371    fn next(&mut self) -> Option<Self::Item> {
372        if self.index < self.number_of_rows {
373            // Handle row selection if present
374            if self.row_selection.is_some() {
375                self.next_with_row_selection()
376            } else {
377                // No row selection - decode normally
378                let record = self
379                    .decode_next_batch(self.number_of_rows - self.index)
380                    .transpose()?;
381                self.index += self.batch_size;
382                Some(record)
383            }
384        } else {
385            None
386        }
387    }
388}
389
390pub fn array_decoder_factory(
391    column: &Column,
392    hinted_arrow_type: &ArrowDataType,
393    stripe: &Stripe,
394) -> Result<Box<dyn ArrayBatchDecoder>> {
395    let decoder: Box<dyn ArrayBatchDecoder> = match (column.data_type(), hinted_arrow_type) {
396        // TODO: try make branches more generic, reduce duplication
397        (DataType::Boolean { .. }, ArrowDataType::Boolean) => {
398            let iter = stripe.stream_map().get(column, Kind::Data);
399            let iter = Box::new(BooleanDecoder::new(iter));
400            let present = PresentDecoder::from_stripe(stripe, column);
401            Box::new(BooleanArrayDecoder::new(iter, present))
402        }
403        (DataType::Byte { .. }, ArrowDataType::Int8) => {
404            let iter = stripe.stream_map().get(column, Kind::Data);
405            let iter = Box::new(ByteRleDecoder::new(iter));
406            let present = PresentDecoder::from_stripe(stripe, column);
407            Box::new(Int8ArrayDecoder::new(iter, present))
408        }
409        (DataType::Short { .. }, ArrowDataType::Int16) => {
410            let iter = stripe.stream_map().get(column, Kind::Data);
411            let iter = get_signed_int_decoder(iter, column.rle_version());
412            let present = PresentDecoder::from_stripe(stripe, column);
413            Box::new(Int16ArrayDecoder::new(iter, present))
414        }
415        (DataType::Int { .. }, ArrowDataType::Int32) => {
416            let iter = stripe.stream_map().get(column, Kind::Data);
417            let iter = get_signed_int_decoder(iter, column.rle_version());
418            let present = PresentDecoder::from_stripe(stripe, column);
419            Box::new(Int32ArrayDecoder::new(iter, present))
420        }
421        (DataType::Long { .. }, ArrowDataType::Int64) => {
422            let iter = stripe.stream_map().get(column, Kind::Data);
423            let iter = get_signed_int_decoder(iter, column.rle_version());
424            let present = PresentDecoder::from_stripe(stripe, column);
425            Box::new(Int64ArrayDecoder::new(iter, present))
426        }
427        (DataType::Float { .. }, ArrowDataType::Float32) => {
428            let iter = stripe.stream_map().get(column, Kind::Data);
429            let iter = Box::new(FloatDecoder::new(iter));
430            let present = PresentDecoder::from_stripe(stripe, column);
431            Box::new(Float32ArrayDecoder::new(iter, present))
432        }
433        (DataType::Double { .. }, ArrowDataType::Float64) => {
434            let iter = stripe.stream_map().get(column, Kind::Data);
435            let iter = Box::new(FloatDecoder::new(iter));
436            let present = PresentDecoder::from_stripe(stripe, column);
437            Box::new(Float64ArrayDecoder::new(iter, present))
438        }
439        (DataType::String { .. }, ArrowDataType::Utf8)
440        | (DataType::Varchar { .. }, ArrowDataType::Utf8)
441        | (DataType::Char { .. }, ArrowDataType::Utf8) => new_string_decoder(column, stripe)?,
442        (DataType::Binary { .. }, ArrowDataType::Binary) => new_binary_decoder(column, stripe)?,
443        (
444            DataType::Decimal {
445                precision, scale, ..
446            },
447            ArrowDataType::Decimal128(a_precision, a_scale),
448        ) if *precision as u8 == *a_precision && *scale as i8 == *a_scale => {
449            new_decimal_decoder(column, stripe, *precision, *scale)
450        }
451        (DataType::Timestamp { .. }, field_type) => {
452            new_timestamp_decoder(column, field_type.clone(), stripe)?
453        }
454        (DataType::TimestampWithLocalTimezone { .. }, field_type) => {
455            new_timestamp_instant_decoder(column, field_type.clone(), stripe)?
456        }
457        (DataType::Date { .. }, ArrowDataType::Date32) => {
458            // TODO: allow Date64
459            let iter = stripe.stream_map().get(column, Kind::Data);
460            let iter = get_signed_int_decoder(iter, column.rle_version());
461            let present = PresentDecoder::from_stripe(stripe, column);
462            Box::new(DateArrayDecoder::new(iter, present))
463        }
464        (DataType::Struct { .. }, ArrowDataType::Struct(fields)) => {
465            Box::new(StructArrayDecoder::new(column, fields.clone(), stripe)?)
466        }
467        (DataType::List { .. }, ArrowDataType::List(field)) => {
468            // TODO: add support for ArrowDataType::LargeList
469            Box::new(ListArrayDecoder::new(column, field.clone(), stripe)?)
470        }
471        (DataType::Map { .. }, ArrowDataType::Map(entries, sorted)) => {
472            ensure!(!sorted, UnsupportedTypeVariantSnafu { msg: "Sorted map" });
473            let ArrowDataType::Struct(entries) = entries.data_type() else {
474                UnexpectedSnafu {
475                    msg: "arrow Map with non-Struct entry type".to_owned(),
476                }
477                .fail()?
478            };
479            ensure!(
480                entries.len() == 2,
481                UnexpectedSnafu {
482                    msg: format!(
483                        "arrow Map with {} columns per entry (expected 2)",
484                        entries.len()
485                    )
486                }
487            );
488            let keys_field = entries[0].clone();
489            let values_field = entries[1].clone();
490
491            Box::new(MapArrayDecoder::new(
492                column,
493                keys_field,
494                values_field,
495                stripe,
496            )?)
497        }
498        (DataType::Union { .. }, ArrowDataType::Union(fields, _)) => {
499            Box::new(UnionArrayDecoder::new(column, fields.clone(), stripe)?)
500        }
501        (data_type, field_type) => {
502            return MismatchedSchemaSnafu {
503                orc_type: data_type.clone(),
504                arrow_type: field_type.clone(),
505            }
506            .fail()
507        }
508    };
509
510    Ok(decoder)
511}
512
513impl NaiveStripeDecoder {
514    fn inner_decode_next_batch(&mut self, remaining: usize) -> Result<Vec<ArrayRef>> {
515        let chunk = self.batch_size.min(remaining);
516
517        let mut fields = Vec::with_capacity(self.stripe.columns().len());
518
519        for decoder in &mut self.decoders {
520            let array = decoder.next_batch(chunk, None)?;
521            if array.is_empty() {
522                break;
523            } else {
524                fields.push(array);
525            }
526        }
527
528        Ok(fields)
529    }
530
531    fn decode_next_batch(&mut self, remaining: usize) -> Result<Option<RecordBatch>> {
532        let fields = self.inner_decode_next_batch(remaining)?;
533
534        if fields.is_empty() {
535            if remaining == 0 {
536                Ok(None)
537            } else {
538                // In case of empty projection, we need to create a RecordBatch with `row_count` only
539                // to reflect the row number
540                Ok(Some(
541                    RecordBatch::try_new_with_options(
542                        Arc::clone(&self.schema_ref),
543                        fields,
544                        &RecordBatchOptions::new()
545                            .with_row_count(Some(self.batch_size.min(remaining))),
546                    )
547                    .context(error::ConvertRecordBatchSnafu)?,
548                ))
549            }
550        } else {
551            //TODO(weny): any better way?
552            let fields = self
553                .schema_ref
554                .fields
555                .into_iter()
556                .map(|field| field.name())
557                .zip(fields)
558                .collect::<Vec<_>>();
559
560            Ok(Some(
561                RecordBatch::try_from_iter(fields).context(error::ConvertRecordBatchSnafu)?,
562            ))
563        }
564    }
565
566    pub fn new(stripe: Stripe, schema_ref: SchemaRef, batch_size: usize) -> Result<Self> {
567        Self::new_with_selection(stripe, schema_ref, batch_size, None)
568    }
569
570    pub fn new_with_selection(
571        stripe: Stripe,
572        schema_ref: SchemaRef,
573        batch_size: usize,
574        row_selection: Option<RowSelection>,
575    ) -> Result<Self> {
576        let number_of_rows = stripe.number_of_rows();
577        let decoders = stripe
578            .columns()
579            .iter()
580            .zip(schema_ref.fields.iter())
581            .map(|(col, field)| array_decoder_factory(col, field.data_type(), &stripe))
582            .collect::<Result<Vec<_>>>()?;
583
584        Ok(Self {
585            stripe,
586            schema_ref,
587            decoders,
588            index: 0,
589            batch_size,
590            number_of_rows,
591            row_selection,
592            selection_index: 0,
593        })
594    }
595
596    /// Skip the specified number of rows by calling skip_values on each decoder
597    fn skip_rows(&mut self, count: usize) -> Result<()> {
598        // Call skip_values on each decoder to efficiently skip rows
599        // Top-level decoders don't have parent_present
600        for decoder in &mut self.decoders {
601            decoder.skip_values(count, None)?;
602        }
603        Ok(())
604    }
605}