1use std::collections::HashMap;
19use std::ops::Range;
20use std::sync::Arc;
21
22use arrow::datatypes::SchemaRef;
23use arrow::error::ArrowError;
24use arrow::record_batch::{RecordBatch, RecordBatchReader};
25
26use crate::array_decoder::NaiveStripeDecoder;
27use crate::error::Result;
28use crate::predicate::Predicate;
29use crate::projection::ProjectionMask;
30use crate::reader::metadata::{read_metadata, FileMetadata};
31use crate::reader::ChunkReader;
32use crate::row_group_filter::evaluate_predicate;
33use crate::row_selection::RowSelection;
34use crate::schema::{ArrowSchemaOptions, RootDataType, TimestampPrecision};
35use crate::stripe::{Stripe, StripeMetadata};
36
/// Batch size handed to each stripe decoder when the caller does not choose one via
/// `ArrowReaderBuilder::with_batch_size`.
const DEFAULT_BATCH_SIZE: usize = 8 * 1024;
38
39pub struct ArrowReaderBuilder<R> {
40 pub(crate) reader: R,
41 pub(crate) file_metadata: Arc<FileMetadata>,
42 pub(crate) batch_size: usize,
43 pub(crate) projection: ProjectionMask,
44 pub(crate) schema_ref: Option<SchemaRef>,
45 pub(crate) file_byte_range: Option<Range<usize>>,
46 pub(crate) row_selection: Option<RowSelection>,
47 pub(crate) timestamp_precision: TimestampPrecision,
48 pub(crate) predicate: Option<Predicate>,
49}
50
51impl<R> ArrowReaderBuilder<R> {
52 pub(crate) fn new(reader: R, file_metadata: Arc<FileMetadata>) -> Self {
53 Self {
54 reader,
55 file_metadata,
56 batch_size: DEFAULT_BATCH_SIZE,
57 projection: ProjectionMask::all(),
58 schema_ref: None,
59 file_byte_range: None,
60 row_selection: None,
61 timestamp_precision: TimestampPrecision::default(),
62 predicate: None,
63 }
64 }
65
66 pub fn file_metadata(&self) -> &FileMetadata {
67 &self.file_metadata
68 }
69
70 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
71 self.batch_size = batch_size;
72 self
73 }
74
75 pub fn with_projection(mut self, projection: ProjectionMask) -> Self {
76 self.projection = projection;
77 self
78 }
79
80 pub fn with_schema(mut self, schema: SchemaRef) -> Self {
81 self.schema_ref = Some(schema);
82 self
83 }
84
85 pub fn with_file_byte_range(mut self, range: Range<usize>) -> Self {
87 self.file_byte_range = Some(range);
88 self
89 }
90
91 pub fn with_row_selection(mut self, row_selection: RowSelection) -> Self {
114 self.row_selection = Some(row_selection);
115 self
116 }
117
118 pub fn with_timestamp_precision(mut self, precision: TimestampPrecision) -> Self {
136 self.timestamp_precision = precision;
137 self
138 }
139
140 pub fn with_predicate(mut self, predicate: Predicate) -> Self {
174 self.predicate = Some(predicate);
175 self
176 }
177
178 pub fn schema(&self) -> SchemaRef {
183 let projected_data_type = self
184 .file_metadata
185 .root_data_type()
186 .project(&self.projection);
187 let metadata = self
188 .file_metadata
189 .user_custom_metadata()
190 .iter()
191 .map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
192 .collect::<HashMap<_, _>>();
193 self.schema_ref.clone().unwrap_or_else(|| {
194 let options =
195 ArrowSchemaOptions::new().with_timestamp_precision(self.timestamp_precision);
196 Arc::new(projected_data_type.create_arrow_schema_with_options(&metadata, options))
197 })
198 }
199}
200
201impl<R: ChunkReader> ArrowReaderBuilder<R> {
202 pub fn try_new(mut reader: R) -> Result<Self> {
203 let file_metadata = Arc::new(read_metadata(&mut reader)?);
204 Ok(Self::new(reader, file_metadata))
205 }
206
207 pub fn build(self) -> ArrowReader<R> {
208 let schema_ref = self.schema();
209 let projected_data_type = self
210 .file_metadata
211 .root_data_type()
212 .project(&self.projection);
213 let projected_data_type_clone = projected_data_type.clone();
214 let cursor = Cursor {
215 reader: self.reader,
216 file_metadata: self.file_metadata,
217 projected_data_type,
218 stripe_index: 0,
219 file_byte_range: self.file_byte_range,
220 };
221 ArrowReader {
222 cursor,
223 schema_ref,
224 current_stripe: None,
225 batch_size: self.batch_size,
226 row_selection: self.row_selection,
227 predicate: self.predicate,
228 projected_data_type: projected_data_type_clone,
229 }
230 }
231}
232
233pub struct ArrowReader<R> {
234 cursor: Cursor<R>,
235 schema_ref: SchemaRef,
236 current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
237 batch_size: usize,
238 row_selection: Option<RowSelection>,
239 predicate: Option<Predicate>,
240 projected_data_type: RootDataType,
241}
242
243impl<R> ArrowReader<R> {
244 pub fn total_row_count(&self) -> u64 {
245 self.cursor.file_metadata.number_of_rows()
246 }
247}
248
249impl<R: ChunkReader> ArrowReader<R> {
250 fn try_advance_stripe(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
251 let stripe = self.cursor.next().transpose()?;
252 match stripe {
253 Some(stripe) => {
254 let stripe_rows = stripe.number_of_rows();
255
256 let mut stripe_selection: Option<RowSelection> = None;
258 if let Some(ref predicate) = self.predicate {
259 match stripe.read_row_indexes(&self.cursor.file_metadata) {
261 Ok(row_index) => {
262 match evaluate_predicate(
264 predicate,
265 &row_index,
266 &self.projected_data_type,
267 ) {
268 Ok(row_group_filter) => {
269 let rows_per_group = self
271 .cursor
272 .file_metadata
273 .row_index_stride()
274 .unwrap_or(10_000);
275 stripe_selection = Some(RowSelection::from_row_group_filter(
276 &row_group_filter,
277 rows_per_group,
278 stripe_rows,
279 ));
280 }
281 Err(_) => {
282 stripe_selection = Some(RowSelection::select_all(stripe_rows));
285 }
286 }
287 }
288 Err(_) => {
289 stripe_selection = Some(RowSelection::select_all(stripe_rows));
291 }
292 }
293 }
294
295 let mut final_selection = stripe_selection;
297 if let Some(ref mut existing_selection) = self.row_selection {
298 if existing_selection.row_count() > 0 {
299 let existing_for_stripe = existing_selection.split_off(stripe_rows);
300 final_selection = match final_selection {
301 Some(predicate_selection) => {
302 Some(existing_for_stripe.and_then(&predicate_selection))
304 }
305 None => Some(existing_for_stripe),
306 };
307 }
308 }
309
310 let decoder = NaiveStripeDecoder::new_with_selection(
311 stripe,
312 self.schema_ref.clone(),
313 self.batch_size,
314 final_selection,
315 )?;
316 self.current_stripe = Some(Box::new(decoder));
317 self.next().transpose()
318 }
319 None => Ok(None),
320 }
321 }
322}
323
324impl<R: ChunkReader> RecordBatchReader for ArrowReader<R> {
325 fn schema(&self) -> SchemaRef {
326 self.schema_ref.clone()
327 }
328}
329
330impl<R: ChunkReader> Iterator for ArrowReader<R> {
331 type Item = Result<RecordBatch, ArrowError>;
332
333 fn next(&mut self) -> Option<Self::Item> {
334 match self.current_stripe.as_mut() {
335 Some(stripe) => {
336 match stripe
337 .next()
338 .map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err))))
339 {
340 Some(rb) => Some(rb),
341 None => self.try_advance_stripe().transpose(),
342 }
343 }
344 None => self.try_advance_stripe().transpose(),
345 }
346 }
347}
348
349pub(crate) struct Cursor<R> {
350 pub reader: R,
351 pub file_metadata: Arc<FileMetadata>,
352 pub projected_data_type: RootDataType,
353 pub stripe_index: usize,
354 pub file_byte_range: Option<Range<usize>>,
355}
356
357impl<R: ChunkReader> Cursor<R> {
358 fn get_stripe_metadatas(&self) -> Vec<StripeMetadata> {
359 if let Some(range) = self.file_byte_range.clone() {
360 self.file_metadata
361 .stripe_metadatas()
362 .iter()
363 .filter(|info| {
364 let offset = info.offset() as usize;
365 range.contains(&offset)
366 })
367 .map(|info| info.to_owned())
368 .collect::<Vec<_>>()
369 } else {
370 self.file_metadata.stripe_metadatas().to_vec()
371 }
372 }
373}
374
375impl<R: ChunkReader> Iterator for Cursor<R> {
376 type Item = Result<Stripe>;
377
378 fn next(&mut self) -> Option<Self::Item> {
379 self.get_stripe_metadatas()
380 .get(self.stripe_index)
381 .map(|info| {
382 let stripe = Stripe::new(
383 &mut self.reader,
384 &self.file_metadata,
385 &self.projected_data_type.clone(),
386 info,
387 );
388 self.stripe_index += 1;
389 stripe
390 })
391 }
392}