orc_rust/
arrow_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::ops::Range;
20use std::sync::Arc;
21
22use arrow::datatypes::SchemaRef;
23use arrow::error::ArrowError;
24use arrow::record_batch::{RecordBatch, RecordBatchReader};
25
26use crate::array_decoder::NaiveStripeDecoder;
27use crate::error::Result;
28use crate::projection::ProjectionMask;
29use crate::reader::metadata::{read_metadata, FileMetadata};
30use crate::reader::ChunkReader;
31use crate::row_selection::RowSelection;
32use crate::schema::RootDataType;
33use crate::stripe::{Stripe, StripeMetadata};
34
35const DEFAULT_BATCH_SIZE: usize = 8192;
36
/// Builder that collects the options used to construct an [`ArrowReader`].
///
/// Created through `new` or `try_new`, customised with the `with_*` methods and
/// finally turned into a reader by `build`.
pub struct ArrowReaderBuilder<R> {
    /// Underlying source of the ORC data; handed over to the reader on `build`.
    pub(crate) reader: R,
    /// Shared metadata of the ORC file.
    pub(crate) file_metadata: Arc<FileMetadata>,
    /// Batch size passed on to the stripe decoder; starts at `DEFAULT_BATCH_SIZE`.
    pub(crate) batch_size: usize,
    /// Projection applied to the file's root data type; starts as `ProjectionMask::all()`.
    pub(crate) projection: ProjectionMask,
    /// Optional schema override; when `None` the schema is derived from the file.
    pub(crate) schema_ref: Option<SchemaRef>,
    /// Optional byte range; only stripes whose offset lies inside it are read.
    pub(crate) file_byte_range: Option<Range<usize>>,
    /// Optional row selection forwarded to the reader.
    pub(crate) row_selection: Option<RowSelection>,
}
46
47impl<R> ArrowReaderBuilder<R> {
48    pub(crate) fn new(reader: R, file_metadata: Arc<FileMetadata>) -> Self {
49        Self {
50            reader,
51            file_metadata,
52            batch_size: DEFAULT_BATCH_SIZE,
53            projection: ProjectionMask::all(),
54            schema_ref: None,
55            file_byte_range: None,
56            row_selection: None,
57        }
58    }
59
60    pub fn file_metadata(&self) -> &FileMetadata {
61        &self.file_metadata
62    }
63
64    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
65        self.batch_size = batch_size;
66        self
67    }
68
69    pub fn with_projection(mut self, projection: ProjectionMask) -> Self {
70        self.projection = projection;
71        self
72    }
73
74    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
75        self.schema_ref = Some(schema);
76        self
77    }
78
79    /// Specifies a range of file bytes that will read the strips offset within this range
80    pub fn with_file_byte_range(mut self, range: Range<usize>) -> Self {
81        self.file_byte_range = Some(range);
82        self
83    }
84
85    /// Set a [`RowSelection`] to filter rows
86    ///
87    /// The [`RowSelection`] specifies which rows should be decoded from the ORC file.
88    /// This can be used to skip rows that don't match predicates, reducing I/O and
89    /// improving query performance.
90    ///
91    /// # Example
92    ///
93    /// ```no_run
94    /// # use std::fs::File;
95    /// # use orc_rust::arrow_reader::ArrowReaderBuilder;
96    /// # use orc_rust::row_selection::{RowSelection, RowSelector};
97    /// let file = File::open("data.orc").unwrap();
98    /// let selection = vec![
99    ///     RowSelector::skip(100),
100    ///     RowSelector::select(50),
101    /// ].into();
102    /// let reader = ArrowReaderBuilder::try_new(file)
103    ///     .unwrap()
104    ///     .with_row_selection(selection)
105    ///     .build();
106    /// ```
107    pub fn with_row_selection(mut self, row_selection: RowSelection) -> Self {
108        self.row_selection = Some(row_selection);
109        self
110    }
111
112    /// Returns the currently computed schema
113    ///
114    /// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically
115    /// based on the current projection and the underlying file format.
116    pub fn schema(&self) -> SchemaRef {
117        let projected_data_type = self
118            .file_metadata
119            .root_data_type()
120            .project(&self.projection);
121        let metadata = self
122            .file_metadata
123            .user_custom_metadata()
124            .iter()
125            .map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
126            .collect::<HashMap<_, _>>();
127        self.schema_ref
128            .clone()
129            .unwrap_or_else(|| Arc::new(projected_data_type.create_arrow_schema(&metadata)))
130    }
131}
132
133impl<R: ChunkReader> ArrowReaderBuilder<R> {
134    pub fn try_new(mut reader: R) -> Result<Self> {
135        let file_metadata = Arc::new(read_metadata(&mut reader)?);
136        Ok(Self::new(reader, file_metadata))
137    }
138
139    pub fn build(self) -> ArrowReader<R> {
140        let schema_ref = self.schema();
141        let projected_data_type = self
142            .file_metadata
143            .root_data_type()
144            .project(&self.projection);
145        let cursor = Cursor {
146            reader: self.reader,
147            file_metadata: self.file_metadata,
148            projected_data_type,
149            stripe_index: 0,
150            file_byte_range: self.file_byte_range,
151        };
152        ArrowReader {
153            cursor,
154            schema_ref,
155            current_stripe: None,
156            batch_size: self.batch_size,
157            row_selection: self.row_selection,
158        }
159    }
160}
161
/// Iterator over the [`RecordBatch`]es of an ORC file, decoded stripe by stripe.
pub struct ArrowReader<R> {
    /// Source of the stripes to decode.
    cursor: Cursor<R>,
    /// Schema reported by this reader and passed to each stripe decoder.
    schema_ref: SchemaRef,
    /// Batch iterator of the stripe currently being decoded; `None` until the
    /// first stripe has been loaded.
    current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
    /// Batch size passed to each stripe decoder.
    batch_size: usize,
    /// Row selection not yet consumed; a piece is split off for every stripe.
    row_selection: Option<RowSelection>,
}
169
170impl<R> ArrowReader<R> {
171    pub fn total_row_count(&self) -> u64 {
172        self.cursor.file_metadata.number_of_rows()
173    }
174}
175
176impl<R: ChunkReader> ArrowReader<R> {
177    fn try_advance_stripe(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
178        let stripe = self.cursor.next().transpose()?;
179        match stripe {
180            Some(stripe) => {
181                // Split off the row selection for this stripe
182                let stripe_rows = stripe.number_of_rows();
183                let selection = self.row_selection.as_mut().and_then(|s| {
184                    if s.row_count() > 0 {
185                        Some(s.split_off(stripe_rows))
186                    } else {
187                        None
188                    }
189                });
190
191                let decoder = NaiveStripeDecoder::new_with_selection(
192                    stripe,
193                    self.schema_ref.clone(),
194                    self.batch_size,
195                    selection,
196                )?;
197                self.current_stripe = Some(Box::new(decoder));
198                self.next().transpose()
199            }
200            None => Ok(None),
201        }
202    }
203}
204
205impl<R: ChunkReader> RecordBatchReader for ArrowReader<R> {
206    fn schema(&self) -> SchemaRef {
207        self.schema_ref.clone()
208    }
209}
210
211impl<R: ChunkReader> Iterator for ArrowReader<R> {
212    type Item = Result<RecordBatch, ArrowError>;
213
214    fn next(&mut self) -> Option<Self::Item> {
215        match self.current_stripe.as_mut() {
216            Some(stripe) => {
217                match stripe
218                    .next()
219                    .map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err))))
220                {
221                    Some(rb) => Some(rb),
222                    None => self.try_advance_stripe().transpose(),
223                }
224            }
225            None => self.try_advance_stripe().transpose(),
226        }
227    }
228}
229
/// Walks over the stripes of an ORC file, loading one [`Stripe`] per iteration.
pub(crate) struct Cursor<R> {
    /// Source the stripe data is read from.
    pub reader: R,
    /// Shared metadata of the ORC file, including the per-stripe metadata.
    pub file_metadata: Arc<FileMetadata>,
    /// Root data type after projection; passed to every stripe that is loaded.
    pub projected_data_type: RootDataType,
    /// Position of the next stripe to load among the stripes selected by
    /// `file_byte_range`.
    pub stripe_index: usize,
    /// Optional byte range; only stripes whose offset lies inside it are visited.
    pub file_byte_range: Option<Range<usize>>,
}
237
238impl<R: ChunkReader> Cursor<R> {
239    fn get_stripe_metadatas(&self) -> Vec<StripeMetadata> {
240        if let Some(range) = self.file_byte_range.clone() {
241            self.file_metadata
242                .stripe_metadatas()
243                .iter()
244                .filter(|info| {
245                    let offset = info.offset() as usize;
246                    range.contains(&offset)
247                })
248                .map(|info| info.to_owned())
249                .collect::<Vec<_>>()
250        } else {
251            self.file_metadata.stripe_metadatas().to_vec()
252        }
253    }
254}
255
256impl<R: ChunkReader> Iterator for Cursor<R> {
257    type Item = Result<Stripe>;
258
259    fn next(&mut self) -> Option<Self::Item> {
260        self.get_stripe_metadatas()
261            .get(self.stripe_index)
262            .map(|info| {
263                let stripe = Stripe::new(
264                    &mut self.reader,
265                    &self.file_metadata,
266                    &self.projected_data_type.clone(),
267                    info,
268                );
269                self.stripe_index += 1;
270                stripe
271            })
272    }
273}