1use std::collections::HashMap;
19use std::ops::Range;
20use std::sync::Arc;
21
22use arrow::datatypes::SchemaRef;
23use arrow::error::ArrowError;
24use arrow::record_batch::{RecordBatch, RecordBatchReader};
25
26use crate::array_decoder::NaiveStripeDecoder;
27use crate::error::Result;
28use crate::projection::ProjectionMask;
29use crate::reader::metadata::{read_metadata, FileMetadata};
30use crate::reader::ChunkReader;
31use crate::schema::RootDataType;
32use crate::stripe::{Stripe, StripeMetadata};
33
/// Batch size used by [`ArrowReaderBuilder`] unless overridden with `with_batch_size`.
const DEFAULT_BATCH_SIZE: usize = 8 * 1024;
35
/// Configures and builds an [`ArrowReader`] that decodes a file into Arrow [`RecordBatch`]es.
///
/// Create one with [`ArrowReaderBuilder::try_new`], adjust it with the `with_*` methods,
/// then call [`ArrowReaderBuilder::build`].
pub struct ArrowReaderBuilder<R> {
    /// Source of the file's bytes; moved into the reader's cursor by `build`.
    pub(crate) reader: R,
    /// Parsed file metadata, shared with the built reader.
    pub(crate) file_metadata: Arc<FileMetadata>,
    /// Batch size handed to each stripe decoder; defaults to `DEFAULT_BATCH_SIZE`.
    pub(crate) batch_size: usize,
    /// Columns to read; also shapes the schema derived when `schema_ref` is `None`.
    pub(crate) projection: ProjectionMask,
    /// Explicit output schema; when `None` the schema is derived from the projected file type.
    pub(crate) schema_ref: Option<SchemaRef>,
    /// When set, only stripes whose starting offset lies in this half-open range are read.
    pub(crate) file_byte_range: Option<Range<usize>>,
}
44
45impl<R> ArrowReaderBuilder<R> {
46 pub(crate) fn new(reader: R, file_metadata: Arc<FileMetadata>) -> Self {
47 Self {
48 reader,
49 file_metadata,
50 batch_size: DEFAULT_BATCH_SIZE,
51 projection: ProjectionMask::all(),
52 schema_ref: None,
53 file_byte_range: None,
54 }
55 }
56
57 pub fn file_metadata(&self) -> &FileMetadata {
58 &self.file_metadata
59 }
60
61 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
62 self.batch_size = batch_size;
63 self
64 }
65
66 pub fn with_projection(mut self, projection: ProjectionMask) -> Self {
67 self.projection = projection;
68 self
69 }
70
71 pub fn with_schema(mut self, schema: SchemaRef) -> Self {
72 self.schema_ref = Some(schema);
73 self
74 }
75
76 pub fn with_file_byte_range(mut self, range: Range<usize>) -> Self {
78 self.file_byte_range = Some(range);
79 self
80 }
81
82 pub fn schema(&self) -> SchemaRef {
87 let projected_data_type = self
88 .file_metadata
89 .root_data_type()
90 .project(&self.projection);
91 let metadata = self
92 .file_metadata
93 .user_custom_metadata()
94 .iter()
95 .map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
96 .collect::<HashMap<_, _>>();
97 self.schema_ref
98 .clone()
99 .unwrap_or_else(|| Arc::new(projected_data_type.create_arrow_schema(&metadata)))
100 }
101}
102
103impl<R: ChunkReader> ArrowReaderBuilder<R> {
104 pub fn try_new(mut reader: R) -> Result<Self> {
105 let file_metadata = Arc::new(read_metadata(&mut reader)?);
106 Ok(Self::new(reader, file_metadata))
107 }
108
109 pub fn build(self) -> ArrowReader<R> {
110 let schema_ref = self.schema();
111 let projected_data_type = self
112 .file_metadata
113 .root_data_type()
114 .project(&self.projection);
115 let cursor = Cursor {
116 reader: self.reader,
117 file_metadata: self.file_metadata,
118 projected_data_type,
119 stripe_index: 0,
120 file_byte_range: self.file_byte_range,
121 };
122 ArrowReader {
123 cursor,
124 schema_ref,
125 current_stripe: None,
126 batch_size: self.batch_size,
127 }
128 }
129}
130
/// Iterator of Arrow [`RecordBatch`]es decoded one stripe at a time; created by
/// [`ArrowReaderBuilder::build`].
pub struct ArrowReader<R> {
    /// Yields the selected stripes in order.
    cursor: Cursor<R>,
    /// Schema reported for every batch; also passed to each stripe decoder.
    schema_ref: SchemaRef,
    /// Decoder for the stripe currently being drained; `None` until the first stripe is opened.
    current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
    /// Batch size passed to each stripe decoder.
    batch_size: usize,
}
137
138impl<R> ArrowReader<R> {
139 pub fn total_row_count(&self) -> u64 {
140 self.cursor.file_metadata.number_of_rows()
141 }
142}
143
144impl<R: ChunkReader> ArrowReader<R> {
145 fn try_advance_stripe(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
146 let stripe = self.cursor.next().transpose()?;
147 match stripe {
148 Some(stripe) => {
149 let decoder =
150 NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size)?;
151 self.current_stripe = Some(Box::new(decoder));
152 self.next().transpose()
153 }
154 None => Ok(None),
155 }
156 }
157}
158
159impl<R: ChunkReader> RecordBatchReader for ArrowReader<R> {
160 fn schema(&self) -> SchemaRef {
161 self.schema_ref.clone()
162 }
163}
164
165impl<R: ChunkReader> Iterator for ArrowReader<R> {
166 type Item = Result<RecordBatch, ArrowError>;
167
168 fn next(&mut self) -> Option<Self::Item> {
169 match self.current_stripe.as_mut() {
170 Some(stripe) => {
171 match stripe
172 .next()
173 .map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err))))
174 {
175 Some(rb) => Some(rb),
176 None => self.try_advance_stripe().transpose(),
177 }
178 }
179 None => self.try_advance_stripe().transpose(),
180 }
181 }
182}
183
/// Walks the file's selected stripes one at a time, reading each from `reader`.
pub(crate) struct Cursor<R> {
    /// Source of the file's bytes.
    pub reader: R,
    /// Parsed file metadata; supplies the list of stripes.
    pub file_metadata: Arc<FileMetadata>,
    /// Root type after applying the projection; passed to every `Stripe::new` call.
    pub projected_data_type: RootDataType,
    /// Position, within the selected stripes, of the next stripe to read.
    pub stripe_index: usize,
    /// When set, only stripes whose starting offset lies in this range are selected.
    pub file_byte_range: Option<Range<usize>>,
}
191
192impl<R: ChunkReader> Cursor<R> {
193 fn get_stripe_metadatas(&self) -> Vec<StripeMetadata> {
194 if let Some(range) = self.file_byte_range.clone() {
195 self.file_metadata
196 .stripe_metadatas()
197 .iter()
198 .filter(|info| {
199 let offset = info.offset() as usize;
200 range.contains(&offset)
201 })
202 .map(|info| info.to_owned())
203 .collect::<Vec<_>>()
204 } else {
205 self.file_metadata.stripe_metadatas().to_vec()
206 }
207 }
208}
209
210impl<R: ChunkReader> Iterator for Cursor<R> {
211 type Item = Result<Stripe>;
212
213 fn next(&mut self) -> Option<Self::Item> {
214 self.get_stripe_metadatas()
215 .get(self.stripe_index)
216 .map(|info| {
217 let stripe = Stripe::new(
218 &mut self.reader,
219 &self.file_metadata,
220 &self.projected_data_type.clone(),
221 info,
222 );
223 self.stripe_index += 1;
224 stripe
225 })
226 }
227}