1use std::collections::HashMap;
19use std::ops::Range;
20use std::sync::Arc;
21
22use arrow::datatypes::SchemaRef;
23use arrow::error::ArrowError;
24use arrow::record_batch::{RecordBatch, RecordBatchReader};
25
26use crate::array_decoder::NaiveStripeDecoder;
27use crate::error::Result;
28use crate::projection::ProjectionMask;
29use crate::reader::metadata::{read_metadata, FileMetadata};
30use crate::reader::ChunkReader;
31use crate::row_selection::RowSelection;
32use crate::schema::RootDataType;
33use crate::stripe::{Stripe, StripeMetadata};
34
/// Value used for the builder's `batch_size` option when the caller does not
/// override it with `with_batch_size`.
const DEFAULT_BATCH_SIZE: usize = 8_192;
36
37pub struct ArrowReaderBuilder<R> {
38 pub(crate) reader: R,
39 pub(crate) file_metadata: Arc<FileMetadata>,
40 pub(crate) batch_size: usize,
41 pub(crate) projection: ProjectionMask,
42 pub(crate) schema_ref: Option<SchemaRef>,
43 pub(crate) file_byte_range: Option<Range<usize>>,
44 pub(crate) row_selection: Option<RowSelection>,
45}
46
47impl<R> ArrowReaderBuilder<R> {
48 pub(crate) fn new(reader: R, file_metadata: Arc<FileMetadata>) -> Self {
49 Self {
50 reader,
51 file_metadata,
52 batch_size: DEFAULT_BATCH_SIZE,
53 projection: ProjectionMask::all(),
54 schema_ref: None,
55 file_byte_range: None,
56 row_selection: None,
57 }
58 }
59
60 pub fn file_metadata(&self) -> &FileMetadata {
61 &self.file_metadata
62 }
63
64 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
65 self.batch_size = batch_size;
66 self
67 }
68
69 pub fn with_projection(mut self, projection: ProjectionMask) -> Self {
70 self.projection = projection;
71 self
72 }
73
74 pub fn with_schema(mut self, schema: SchemaRef) -> Self {
75 self.schema_ref = Some(schema);
76 self
77 }
78
79 pub fn with_file_byte_range(mut self, range: Range<usize>) -> Self {
81 self.file_byte_range = Some(range);
82 self
83 }
84
85 pub fn with_row_selection(mut self, row_selection: RowSelection) -> Self {
108 self.row_selection = Some(row_selection);
109 self
110 }
111
112 pub fn schema(&self) -> SchemaRef {
117 let projected_data_type = self
118 .file_metadata
119 .root_data_type()
120 .project(&self.projection);
121 let metadata = self
122 .file_metadata
123 .user_custom_metadata()
124 .iter()
125 .map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
126 .collect::<HashMap<_, _>>();
127 self.schema_ref
128 .clone()
129 .unwrap_or_else(|| Arc::new(projected_data_type.create_arrow_schema(&metadata)))
130 }
131}
132
133impl<R: ChunkReader> ArrowReaderBuilder<R> {
134 pub fn try_new(mut reader: R) -> Result<Self> {
135 let file_metadata = Arc::new(read_metadata(&mut reader)?);
136 Ok(Self::new(reader, file_metadata))
137 }
138
139 pub fn build(self) -> ArrowReader<R> {
140 let schema_ref = self.schema();
141 let projected_data_type = self
142 .file_metadata
143 .root_data_type()
144 .project(&self.projection);
145 let cursor = Cursor {
146 reader: self.reader,
147 file_metadata: self.file_metadata,
148 projected_data_type,
149 stripe_index: 0,
150 file_byte_range: self.file_byte_range,
151 };
152 ArrowReader {
153 cursor,
154 schema_ref,
155 current_stripe: None,
156 batch_size: self.batch_size,
157 row_selection: self.row_selection,
158 }
159 }
160}
161
162pub struct ArrowReader<R> {
163 cursor: Cursor<R>,
164 schema_ref: SchemaRef,
165 current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
166 batch_size: usize,
167 row_selection: Option<RowSelection>,
168}
169
170impl<R> ArrowReader<R> {
171 pub fn total_row_count(&self) -> u64 {
172 self.cursor.file_metadata.number_of_rows()
173 }
174}
175
176impl<R: ChunkReader> ArrowReader<R> {
177 fn try_advance_stripe(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
178 let stripe = self.cursor.next().transpose()?;
179 match stripe {
180 Some(stripe) => {
181 let stripe_rows = stripe.number_of_rows();
183 let selection = self.row_selection.as_mut().and_then(|s| {
184 if s.row_count() > 0 {
185 Some(s.split_off(stripe_rows))
186 } else {
187 None
188 }
189 });
190
191 let decoder = NaiveStripeDecoder::new_with_selection(
192 stripe,
193 self.schema_ref.clone(),
194 self.batch_size,
195 selection,
196 )?;
197 self.current_stripe = Some(Box::new(decoder));
198 self.next().transpose()
199 }
200 None => Ok(None),
201 }
202 }
203}
204
205impl<R: ChunkReader> RecordBatchReader for ArrowReader<R> {
206 fn schema(&self) -> SchemaRef {
207 self.schema_ref.clone()
208 }
209}
210
211impl<R: ChunkReader> Iterator for ArrowReader<R> {
212 type Item = Result<RecordBatch, ArrowError>;
213
214 fn next(&mut self) -> Option<Self::Item> {
215 match self.current_stripe.as_mut() {
216 Some(stripe) => {
217 match stripe
218 .next()
219 .map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err))))
220 {
221 Some(rb) => Some(rb),
222 None => self.try_advance_stripe().transpose(),
223 }
224 }
225 None => self.try_advance_stripe().transpose(),
226 }
227 }
228}
229
230pub(crate) struct Cursor<R> {
231 pub reader: R,
232 pub file_metadata: Arc<FileMetadata>,
233 pub projected_data_type: RootDataType,
234 pub stripe_index: usize,
235 pub file_byte_range: Option<Range<usize>>,
236}
237
238impl<R: ChunkReader> Cursor<R> {
239 fn get_stripe_metadatas(&self) -> Vec<StripeMetadata> {
240 if let Some(range) = self.file_byte_range.clone() {
241 self.file_metadata
242 .stripe_metadatas()
243 .iter()
244 .filter(|info| {
245 let offset = info.offset() as usize;
246 range.contains(&offset)
247 })
248 .map(|info| info.to_owned())
249 .collect::<Vec<_>>()
250 } else {
251 self.file_metadata.stripe_metadatas().to_vec()
252 }
253 }
254}
255
256impl<R: ChunkReader> Iterator for Cursor<R> {
257 type Item = Result<Stripe>;
258
259 fn next(&mut self) -> Option<Self::Item> {
260 self.get_stripe_metadatas()
261 .get(self.stripe_index)
262 .map(|info| {
263 let stripe = Stripe::new(
264 &mut self.reader,
265 &self.file_metadata,
266 &self.projected_data_type.clone(),
267 info,
268 );
269 self.stripe_index += 1;
270 stripe
271 })
272 }
273}