orc_rust/
arrow_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::ops::Range;
20use std::sync::Arc;
21
22use arrow::datatypes::SchemaRef;
23use arrow::error::ArrowError;
24use arrow::record_batch::{RecordBatch, RecordBatchReader};
25
26use crate::array_decoder::NaiveStripeDecoder;
27use crate::error::Result;
28use crate::projection::ProjectionMask;
29use crate::reader::metadata::{read_metadata, FileMetadata};
30use crate::reader::ChunkReader;
31use crate::schema::RootDataType;
32use crate::stripe::{Stripe, StripeMetadata};
33
/// Batch size a freshly created [`ArrowReaderBuilder`] starts with.
const DEFAULT_BATCH_SIZE: usize = 8 * 1024;
35
36pub struct ArrowReaderBuilder<R> {
37    pub(crate) reader: R,
38    pub(crate) file_metadata: Arc<FileMetadata>,
39    pub(crate) batch_size: usize,
40    pub(crate) projection: ProjectionMask,
41    pub(crate) schema_ref: Option<SchemaRef>,
42    pub(crate) file_byte_range: Option<Range<usize>>,
43}
44
45impl<R> ArrowReaderBuilder<R> {
46    pub(crate) fn new(reader: R, file_metadata: Arc<FileMetadata>) -> Self {
47        Self {
48            reader,
49            file_metadata,
50            batch_size: DEFAULT_BATCH_SIZE,
51            projection: ProjectionMask::all(),
52            schema_ref: None,
53            file_byte_range: None,
54        }
55    }
56
57    pub fn file_metadata(&self) -> &FileMetadata {
58        &self.file_metadata
59    }
60
61    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
62        self.batch_size = batch_size;
63        self
64    }
65
66    pub fn with_projection(mut self, projection: ProjectionMask) -> Self {
67        self.projection = projection;
68        self
69    }
70
71    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
72        self.schema_ref = Some(schema);
73        self
74    }
75
76    /// Specifies a range of file bytes that will read the strips offset within this range
77    pub fn with_file_byte_range(mut self, range: Range<usize>) -> Self {
78        self.file_byte_range = Some(range);
79        self
80    }
81
82    /// Returns the currently computed schema
83    ///
84    /// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically
85    /// based on the current projection and the underlying file format.
86    pub fn schema(&self) -> SchemaRef {
87        let projected_data_type = self
88            .file_metadata
89            .root_data_type()
90            .project(&self.projection);
91        let metadata = self
92            .file_metadata
93            .user_custom_metadata()
94            .iter()
95            .map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
96            .collect::<HashMap<_, _>>();
97        self.schema_ref
98            .clone()
99            .unwrap_or_else(|| Arc::new(projected_data_type.create_arrow_schema(&metadata)))
100    }
101}
102
103impl<R: ChunkReader> ArrowReaderBuilder<R> {
104    pub fn try_new(mut reader: R) -> Result<Self> {
105        let file_metadata = Arc::new(read_metadata(&mut reader)?);
106        Ok(Self::new(reader, file_metadata))
107    }
108
109    pub fn build(self) -> ArrowReader<R> {
110        let schema_ref = self.schema();
111        let projected_data_type = self
112            .file_metadata
113            .root_data_type()
114            .project(&self.projection);
115        let cursor = Cursor {
116            reader: self.reader,
117            file_metadata: self.file_metadata,
118            projected_data_type,
119            stripe_index: 0,
120            file_byte_range: self.file_byte_range,
121        };
122        ArrowReader {
123            cursor,
124            schema_ref,
125            current_stripe: None,
126            batch_size: self.batch_size,
127        }
128    }
129}
130
131pub struct ArrowReader<R> {
132    cursor: Cursor<R>,
133    schema_ref: SchemaRef,
134    current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
135    batch_size: usize,
136}
137
138impl<R> ArrowReader<R> {
139    pub fn total_row_count(&self) -> u64 {
140        self.cursor.file_metadata.number_of_rows()
141    }
142}
143
144impl<R: ChunkReader> ArrowReader<R> {
145    fn try_advance_stripe(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
146        let stripe = self.cursor.next().transpose()?;
147        match stripe {
148            Some(stripe) => {
149                let decoder =
150                    NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size)?;
151                self.current_stripe = Some(Box::new(decoder));
152                self.next().transpose()
153            }
154            None => Ok(None),
155        }
156    }
157}
158
159impl<R: ChunkReader> RecordBatchReader for ArrowReader<R> {
160    fn schema(&self) -> SchemaRef {
161        self.schema_ref.clone()
162    }
163}
164
165impl<R: ChunkReader> Iterator for ArrowReader<R> {
166    type Item = Result<RecordBatch, ArrowError>;
167
168    fn next(&mut self) -> Option<Self::Item> {
169        match self.current_stripe.as_mut() {
170            Some(stripe) => {
171                match stripe
172                    .next()
173                    .map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err))))
174                {
175                    Some(rb) => Some(rb),
176                    None => self.try_advance_stripe().transpose(),
177                }
178            }
179            None => self.try_advance_stripe().transpose(),
180        }
181    }
182}
183
184pub(crate) struct Cursor<R> {
185    pub reader: R,
186    pub file_metadata: Arc<FileMetadata>,
187    pub projected_data_type: RootDataType,
188    pub stripe_index: usize,
189    pub file_byte_range: Option<Range<usize>>,
190}
191
192impl<R: ChunkReader> Cursor<R> {
193    fn get_stripe_metadatas(&self) -> Vec<StripeMetadata> {
194        if let Some(range) = self.file_byte_range.clone() {
195            self.file_metadata
196                .stripe_metadatas()
197                .iter()
198                .filter(|info| {
199                    let offset = info.offset() as usize;
200                    range.contains(&offset)
201                })
202                .map(|info| info.to_owned())
203                .collect::<Vec<_>>()
204        } else {
205            self.file_metadata.stripe_metadatas().to_vec()
206        }
207    }
208}
209
210impl<R: ChunkReader> Iterator for Cursor<R> {
211    type Item = Result<Stripe>;
212
213    fn next(&mut self) -> Option<Self::Item> {
214        self.get_stripe_metadatas()
215            .get(self.stripe_index)
216            .map(|info| {
217                let stripe = Stripe::new(
218                    &mut self.reader,
219                    &self.file_metadata,
220                    &self.projected_data_type.clone(),
221                    info,
222                );
223                self.stripe_index += 1;
224                stripe
225            })
226    }
227}