orc_rust/
arrow_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::ops::Range;
20use std::sync::Arc;
21
22use arrow::datatypes::SchemaRef;
23use arrow::error::ArrowError;
24use arrow::record_batch::{RecordBatch, RecordBatchReader};
25
26use crate::array_decoder::NaiveStripeDecoder;
27use crate::error::Result;
28use crate::predicate::Predicate;
29use crate::projection::ProjectionMask;
30use crate::reader::metadata::{read_metadata, FileMetadata};
31use crate::reader::ChunkReader;
32use crate::row_group_filter::evaluate_predicate;
33use crate::row_selection::RowSelection;
34use crate::schema::{ArrowSchemaOptions, RootDataType, TimestampPrecision};
35use crate::stripe::{Stripe, StripeMetadata};
36
/// Batch size used by [`ArrowReaderBuilder`] unless overridden with
/// [`ArrowReaderBuilder::with_batch_size`].
const DEFAULT_BATCH_SIZE: usize = 8_192;
38
39pub struct ArrowReaderBuilder<R> {
40    pub(crate) reader: R,
41    pub(crate) file_metadata: Arc<FileMetadata>,
42    pub(crate) batch_size: usize,
43    pub(crate) projection: ProjectionMask,
44    pub(crate) schema_ref: Option<SchemaRef>,
45    pub(crate) file_byte_range: Option<Range<usize>>,
46    pub(crate) row_selection: Option<RowSelection>,
47    pub(crate) timestamp_precision: TimestampPrecision,
48    pub(crate) predicate: Option<Predicate>,
49}
50
51impl<R> ArrowReaderBuilder<R> {
52    pub(crate) fn new(reader: R, file_metadata: Arc<FileMetadata>) -> Self {
53        Self {
54            reader,
55            file_metadata,
56            batch_size: DEFAULT_BATCH_SIZE,
57            projection: ProjectionMask::all(),
58            schema_ref: None,
59            file_byte_range: None,
60            row_selection: None,
61            timestamp_precision: TimestampPrecision::default(),
62            predicate: None,
63        }
64    }
65
66    pub fn file_metadata(&self) -> &FileMetadata {
67        &self.file_metadata
68    }
69
70    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
71        self.batch_size = batch_size;
72        self
73    }
74
75    pub fn with_projection(mut self, projection: ProjectionMask) -> Self {
76        self.projection = projection;
77        self
78    }
79
80    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
81        self.schema_ref = Some(schema);
82        self
83    }
84
85    /// Specifies a range of file bytes that will read the strips offset within this range
86    pub fn with_file_byte_range(mut self, range: Range<usize>) -> Self {
87        self.file_byte_range = Some(range);
88        self
89    }
90
91    /// Set a [`RowSelection`] to filter rows
92    ///
93    /// The [`RowSelection`] specifies which rows should be decoded from the ORC file.
94    /// This can be used to skip rows that don't match predicates, reducing I/O and
95    /// improving query performance.
96    ///
97    /// # Example
98    ///
99    /// ```no_run
100    /// # use std::fs::File;
101    /// # use orc_rust::arrow_reader::ArrowReaderBuilder;
102    /// # use orc_rust::row_selection::{RowSelection, RowSelector};
103    /// let file = File::open("data.orc").unwrap();
104    /// let selection = vec![
105    ///     RowSelector::skip(100),
106    ///     RowSelector::select(50),
107    /// ].into();
108    /// let reader = ArrowReaderBuilder::try_new(file)
109    ///     .unwrap()
110    ///     .with_row_selection(selection)
111    ///     .build();
112    /// ```
113    pub fn with_row_selection(mut self, row_selection: RowSelection) -> Self {
114        self.row_selection = Some(row_selection);
115        self
116    }
117
118    /// Sets the timestamp precision for reading timestamp columns.
119    ///
120    /// By default, timestamps are read as Nanosecond precision.
121    /// Use this method to switch to Microsecond precision if needed for compatibility.
122    ///
123    /// # Examples
124    ///
125    /// ```no_run
126    /// # use std::fs::File;
127    /// # use orc_rust::arrow_reader::ArrowReaderBuilder;
128    /// # use orc_rust::schema::TimestampPrecision;
129    /// let file = File::open("/path/to/file.orc").unwrap();
130    /// let reader = ArrowReaderBuilder::try_new(file)
131    ///     .unwrap()
132    ///     .with_timestamp_precision(TimestampPrecision::Microsecond)
133    ///     .build();
134    /// ```
135    pub fn with_timestamp_precision(mut self, precision: TimestampPrecision) -> Self {
136        self.timestamp_precision = precision;
137        self
138    }
139
140    /// Set a predicate for row group filtering
141    ///
142    /// The predicate will be evaluated against row group statistics to automatically
143    /// generate a [`RowSelection`] that skips filtered row groups. This provides
144    /// efficient predicate pushdown based on ORC row indexes.
145    ///
146    /// The predicate is evaluated lazily when each stripe is read, using the row group
147    /// statistics from the stripe's index section.
148    ///
149    /// If both `with_predicate()` and `with_row_selection()` are called, the results
150    /// are combined using logical AND (both conditions must be satisfied).
151    ///
152    /// # Example
153    ///
154    /// ```no_run
155    /// # use std::fs::File;
156    /// # use orc_rust::{ArrowReaderBuilder, Predicate, PredicateValue};
157    /// let file = File::open("data.orc").unwrap();
158    ///
159    /// // Filter: age >= 18
160    /// let predicate = Predicate::gte("age", PredicateValue::Int32(Some(18)));
161    ///
162    /// let reader = ArrowReaderBuilder::try_new(file)
163    ///     .unwrap()
164    ///     .with_predicate(predicate)
165    ///     .build();
166    /// ```
167    ///
168    /// # Notes
169    ///
170    /// - Predicate evaluation requires row indexes to be present in the ORC file
171    /// - If row indexes are missing, the predicate is ignored (all row groups are kept)
172    /// - Only primitive columns have row indexes; predicates on compound types may be limited
173    pub fn with_predicate(mut self, predicate: Predicate) -> Self {
174        self.predicate = Some(predicate);
175        self
176    }
177
178    /// Returns the currently computed schema
179    ///
180    /// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically
181    /// based on the current projection and the underlying file format.
182    pub fn schema(&self) -> SchemaRef {
183        let projected_data_type = self
184            .file_metadata
185            .root_data_type()
186            .project(&self.projection);
187        let metadata = self
188            .file_metadata
189            .user_custom_metadata()
190            .iter()
191            .map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
192            .collect::<HashMap<_, _>>();
193        self.schema_ref.clone().unwrap_or_else(|| {
194            let options =
195                ArrowSchemaOptions::new().with_timestamp_precision(self.timestamp_precision);
196            Arc::new(projected_data_type.create_arrow_schema_with_options(&metadata, options))
197        })
198    }
199}
200
201impl<R: ChunkReader> ArrowReaderBuilder<R> {
202    pub fn try_new(mut reader: R) -> Result<Self> {
203        let file_metadata = Arc::new(read_metadata(&mut reader)?);
204        Ok(Self::new(reader, file_metadata))
205    }
206
207    pub fn build(self) -> ArrowReader<R> {
208        let schema_ref = self.schema();
209        let projected_data_type = self
210            .file_metadata
211            .root_data_type()
212            .project(&self.projection);
213        let projected_data_type_clone = projected_data_type.clone();
214        let cursor = Cursor {
215            reader: self.reader,
216            file_metadata: self.file_metadata,
217            projected_data_type,
218            stripe_index: 0,
219            file_byte_range: self.file_byte_range,
220        };
221        ArrowReader {
222            cursor,
223            schema_ref,
224            current_stripe: None,
225            batch_size: self.batch_size,
226            row_selection: self.row_selection,
227            predicate: self.predicate,
228            projected_data_type: projected_data_type_clone,
229        }
230    }
231}
232
233pub struct ArrowReader<R> {
234    cursor: Cursor<R>,
235    schema_ref: SchemaRef,
236    current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
237    batch_size: usize,
238    row_selection: Option<RowSelection>,
239    predicate: Option<Predicate>,
240    projected_data_type: RootDataType,
241}
242
243impl<R> ArrowReader<R> {
244    pub fn total_row_count(&self) -> u64 {
245        self.cursor.file_metadata.number_of_rows()
246    }
247}
248
249impl<R: ChunkReader> ArrowReader<R> {
250    fn try_advance_stripe(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
251        let stripe = self.cursor.next().transpose()?;
252        match stripe {
253            Some(stripe) => {
254                let stripe_rows = stripe.number_of_rows();
255
256                // Evaluate predicate if present
257                let mut stripe_selection: Option<RowSelection> = None;
258                if let Some(ref predicate) = self.predicate {
259                    // Try to read row indexes for this stripe
260                    match stripe.read_row_indexes(&self.cursor.file_metadata) {
261                        Ok(row_index) => {
262                            // Evaluate predicate against row group statistics
263                            match evaluate_predicate(
264                                predicate,
265                                &row_index,
266                                &self.projected_data_type,
267                            ) {
268                                Ok(row_group_filter) => {
269                                    // Generate RowSelection from filter results
270                                    let rows_per_group = self
271                                        .cursor
272                                        .file_metadata
273                                        .row_index_stride()
274                                        .unwrap_or(10_000);
275                                    stripe_selection = Some(RowSelection::from_row_group_filter(
276                                        &row_group_filter,
277                                        rows_per_group,
278                                        stripe_rows,
279                                    ));
280                                }
281                                Err(_) => {
282                                    // Predicate evaluation failed (e.g., column not found)
283                                    // Keep all rows (maybe)
284                                    stripe_selection = Some(RowSelection::select_all(stripe_rows));
285                                }
286                            }
287                        }
288                        Err(_) => {
289                            // Row indexes not available, keep all rows (maybe)
290                            stripe_selection = Some(RowSelection::select_all(stripe_rows));
291                        }
292                    }
293                }
294
295                // Combine with existing row_selection if present
296                let mut final_selection = stripe_selection;
297                if let Some(ref mut existing_selection) = self.row_selection {
298                    if existing_selection.row_count() > 0 {
299                        let existing_for_stripe = existing_selection.split_off(stripe_rows);
300                        final_selection = match final_selection {
301                            Some(predicate_selection) => {
302                                // Both predicate and manual selection: combine with AND
303                                Some(existing_for_stripe.and_then(&predicate_selection))
304                            }
305                            None => Some(existing_for_stripe),
306                        };
307                    }
308                }
309
310                let decoder = NaiveStripeDecoder::new_with_selection(
311                    stripe,
312                    self.schema_ref.clone(),
313                    self.batch_size,
314                    final_selection,
315                )?;
316                self.current_stripe = Some(Box::new(decoder));
317                self.next().transpose()
318            }
319            None => Ok(None),
320        }
321    }
322}
323
324impl<R: ChunkReader> RecordBatchReader for ArrowReader<R> {
325    fn schema(&self) -> SchemaRef {
326        self.schema_ref.clone()
327    }
328}
329
330impl<R: ChunkReader> Iterator for ArrowReader<R> {
331    type Item = Result<RecordBatch, ArrowError>;
332
333    fn next(&mut self) -> Option<Self::Item> {
334        match self.current_stripe.as_mut() {
335            Some(stripe) => {
336                match stripe
337                    .next()
338                    .map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err))))
339                {
340                    Some(rb) => Some(rb),
341                    None => self.try_advance_stripe().transpose(),
342                }
343            }
344            None => self.try_advance_stripe().transpose(),
345        }
346    }
347}
348
349pub(crate) struct Cursor<R> {
350    pub reader: R,
351    pub file_metadata: Arc<FileMetadata>,
352    pub projected_data_type: RootDataType,
353    pub stripe_index: usize,
354    pub file_byte_range: Option<Range<usize>>,
355}
356
357impl<R: ChunkReader> Cursor<R> {
358    fn get_stripe_metadatas(&self) -> Vec<StripeMetadata> {
359        if let Some(range) = self.file_byte_range.clone() {
360            self.file_metadata
361                .stripe_metadatas()
362                .iter()
363                .filter(|info| {
364                    let offset = info.offset() as usize;
365                    range.contains(&offset)
366                })
367                .map(|info| info.to_owned())
368                .collect::<Vec<_>>()
369        } else {
370            self.file_metadata.stripe_metadatas().to_vec()
371        }
372    }
373}
374
375impl<R: ChunkReader> Iterator for Cursor<R> {
376    type Item = Result<Stripe>;
377
378    fn next(&mut self) -> Option<Self::Item> {
379        self.get_stripe_metadatas()
380            .get(self.stripe_index)
381            .map(|info| {
382                let stripe = Stripe::new(
383                    &mut self.reader,
384                    &self.file_metadata,
385                    &self.projected_data_type.clone(),
386                    info,
387                );
388                self.stripe_index += 1;
389                stripe
390            })
391    }
392}