1use std::sync::Arc;
19
20use arrow::array::{ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray};
21use arrow::buffer::NullBuffer;
22use arrow::datatypes::ArrowNativeTypeOp;
23use arrow::datatypes::ArrowPrimitiveType;
24use arrow::datatypes::{DataType as ArrowDataType, Field};
25use arrow::datatypes::{
26 Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
27};
28use arrow::record_batch::{RecordBatch, RecordBatchOptions};
29use snafu::{ensure, ResultExt};
30
31use crate::column::Column;
32use crate::encoding::boolean::BooleanDecoder;
33use crate::encoding::byte::ByteRleDecoder;
34use crate::encoding::float::FloatDecoder;
35use crate::encoding::integer::get_rle_reader;
36use crate::encoding::PrimitiveValueDecoder;
37use crate::error::{
38 self, MismatchedSchemaSnafu, Result, UnexpectedSnafu, UnsupportedTypeVariantSnafu,
39};
40use crate::proto::stream::Kind;
41use crate::schema::DataType;
42use crate::stripe::Stripe;
43
44use self::decimal::new_decimal_decoder;
45use self::list::ListArrayDecoder;
46use self::map::MapArrayDecoder;
47use self::string::{new_binary_decoder, new_string_decoder};
48use self::struct_decoder::StructArrayDecoder;
49use self::timestamp::{new_timestamp_decoder, new_timestamp_instant_decoder};
50use self::union::UnionArrayDecoder;
51
52mod decimal;
53mod list;
54mod map;
55mod string;
56mod struct_decoder;
57mod timestamp;
58mod union;
59
60pub trait ArrayBatchDecoder: Send {
61 fn next_batch(
73 &mut self,
74 batch_size: usize,
75 parent_present: Option<&NullBuffer>,
76 ) -> Result<ArrayRef>;
77}
78
79struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
80 iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
81 present: Option<PresentDecoder>,
82}
83
84impl<T: ArrowPrimitiveType> PrimitiveArrayDecoder<T> {
85 pub fn new(
86 iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
87 present: Option<PresentDecoder>,
88 ) -> Self {
89 Self { iter, present }
90 }
91
92 fn next_primitive_batch(
93 &mut self,
94 batch_size: usize,
95 parent_present: Option<&NullBuffer>,
96 ) -> Result<PrimitiveArray<T>> {
97 let present =
98 derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;
99 let mut data = vec![T::Native::ZERO; batch_size];
100 match present {
101 Some(present) => {
102 self.iter.decode_spaced(data.as_mut_slice(), &present)?;
103 let array = PrimitiveArray::<T>::new(data.into(), Some(present));
104 Ok(array)
105 }
106 None => {
107 self.iter.decode(data.as_mut_slice())?;
108 let array = PrimitiveArray::<T>::from_iter_values(data);
109 Ok(array)
110 }
111 }
112 }
113}
114
115impl<T: ArrowPrimitiveType> ArrayBatchDecoder for PrimitiveArrayDecoder<T> {
116 fn next_batch(
117 &mut self,
118 batch_size: usize,
119 parent_present: Option<&NullBuffer>,
120 ) -> Result<ArrayRef> {
121 let array = self.next_primitive_batch(batch_size, parent_present)?;
122 let array = Arc::new(array) as ArrayRef;
123 Ok(array)
124 }
125}
126
127type Int64ArrayDecoder = PrimitiveArrayDecoder<Int64Type>;
128type Int32ArrayDecoder = PrimitiveArrayDecoder<Int32Type>;
129type Int16ArrayDecoder = PrimitiveArrayDecoder<Int16Type>;
130type Int8ArrayDecoder = PrimitiveArrayDecoder<Int8Type>;
131type Float32ArrayDecoder = PrimitiveArrayDecoder<Float32Type>;
132type Float64ArrayDecoder = PrimitiveArrayDecoder<Float64Type>;
133type DateArrayDecoder = PrimitiveArrayDecoder<Date32Type>; struct BooleanArrayDecoder {
136 iter: Box<dyn PrimitiveValueDecoder<bool> + Send>,
137 present: Option<PresentDecoder>,
138}
139
140impl BooleanArrayDecoder {
141 pub fn new(
142 iter: Box<dyn PrimitiveValueDecoder<bool> + Send>,
143 present: Option<PresentDecoder>,
144 ) -> Self {
145 Self { iter, present }
146 }
147}
148
149impl ArrayBatchDecoder for BooleanArrayDecoder {
150 fn next_batch(
151 &mut self,
152 batch_size: usize,
153 parent_present: Option<&NullBuffer>,
154 ) -> Result<ArrayRef> {
155 let present =
156 derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;
157 let mut data = vec![false; batch_size];
158 let array = match present {
159 Some(present) => {
160 self.iter.decode_spaced(data.as_mut_slice(), &present)?;
161 BooleanArray::new(data.into(), Some(present))
162 }
163 None => {
164 self.iter.decode(data.as_mut_slice())?;
165 BooleanArray::from(data)
166 }
167 };
168 Ok(Arc::new(array))
169 }
170}
171
172struct PresentDecoder {
173 inner: Box<dyn PrimitiveValueDecoder<bool> + Send>,
176}
177
178impl PresentDecoder {
179 fn from_stripe(stripe: &Stripe, column: &Column) -> Option<Self> {
180 stripe
181 .stream_map()
182 .get_opt(column, Kind::Present)
183 .map(|stream| {
184 let inner = Box::new(BooleanDecoder::new(stream));
185 PresentDecoder { inner }
186 })
187 }
188
189 fn next_buffer(&mut self, size: usize) -> Result<NullBuffer> {
190 let mut data = vec![false; size];
191 self.inner.decode(&mut data)?;
192 Ok(NullBuffer::from(data))
193 }
194}
195
196fn merge_parent_present(
197 parent_present: &NullBuffer,
198 present: Result<NullBuffer>,
199) -> Result<NullBuffer> {
200 let present = present?;
201 let non_null_count = parent_present.len() - parent_present.null_count();
202 debug_assert!(present.len() == non_null_count);
203 let mut builder = BooleanBufferBuilder::new(parent_present.len());
204 builder.append_n(parent_present.len(), false);
205 for (idx, p) in parent_present.valid_indices().zip(present.iter()) {
206 builder.set_bit(idx, p);
207 }
208 Ok(builder.finish().into())
209}
210
211fn derive_present_vec(
212 present: &mut Option<PresentDecoder>,
213 parent_present: Option<&NullBuffer>,
214 batch_size: usize,
215) -> Option<Result<NullBuffer>> {
216 let present = match (present, parent_present) {
217 (Some(present), Some(parent_present)) => {
218 let element_count = parent_present.len() - parent_present.null_count();
219 let present = present.next_buffer(element_count);
220 Some(merge_parent_present(parent_present, present))
221 }
222 (Some(present), None) => Some(present.next_buffer(batch_size)),
223 (None, Some(parent_present)) => Some(Ok(parent_present.clone())),
224 (None, None) => None,
225 };
226
227 match present {
229 Some(Ok(present)) if present.null_count() > 0 => Some(Ok(present)),
230 _ => None,
231 }
232}
233
234pub struct NaiveStripeDecoder {
235 stripe: Stripe,
236 schema_ref: SchemaRef,
237 decoders: Vec<Box<dyn ArrayBatchDecoder>>,
238 index: usize,
239 batch_size: usize,
240 number_of_rows: usize,
241}
242
243impl Iterator for NaiveStripeDecoder {
244 type Item = Result<RecordBatch>;
245
246 fn next(&mut self) -> Option<Self::Item> {
247 if self.index < self.number_of_rows {
248 let record = self
249 .decode_next_batch(self.number_of_rows - self.index)
250 .transpose()?;
251 self.index += self.batch_size;
252 Some(record)
253 } else {
254 None
255 }
256 }
257}
258
259pub fn array_decoder_factory(
260 column: &Column,
261 field: Arc<Field>,
262 stripe: &Stripe,
263) -> Result<Box<dyn ArrayBatchDecoder>> {
264 let decoder: Box<dyn ArrayBatchDecoder> = match (column.data_type(), field.data_type()) {
265 (DataType::Boolean { .. }, ArrowDataType::Boolean) => {
267 let iter = stripe.stream_map().get(column, Kind::Data);
268 let iter = Box::new(BooleanDecoder::new(iter));
269 let present = PresentDecoder::from_stripe(stripe, column);
270 Box::new(BooleanArrayDecoder::new(iter, present))
271 }
272 (DataType::Byte { .. }, ArrowDataType::Int8) => {
273 let iter = stripe.stream_map().get(column, Kind::Data);
274 let iter = Box::new(ByteRleDecoder::new(iter));
275 let present = PresentDecoder::from_stripe(stripe, column);
276 Box::new(Int8ArrayDecoder::new(iter, present))
277 }
278 (DataType::Short { .. }, ArrowDataType::Int16) => {
279 let iter = stripe.stream_map().get(column, Kind::Data);
280 let iter = get_rle_reader(column, iter)?;
281 let present = PresentDecoder::from_stripe(stripe, column);
282 Box::new(Int16ArrayDecoder::new(iter, present))
283 }
284 (DataType::Int { .. }, ArrowDataType::Int32) => {
285 let iter = stripe.stream_map().get(column, Kind::Data);
286 let iter = get_rle_reader(column, iter)?;
287 let present = PresentDecoder::from_stripe(stripe, column);
288 Box::new(Int32ArrayDecoder::new(iter, present))
289 }
290 (DataType::Long { .. }, ArrowDataType::Int64) => {
291 let iter = stripe.stream_map().get(column, Kind::Data);
292 let iter = get_rle_reader(column, iter)?;
293 let present = PresentDecoder::from_stripe(stripe, column);
294 Box::new(Int64ArrayDecoder::new(iter, present))
295 }
296 (DataType::Float { .. }, ArrowDataType::Float32) => {
297 let iter = stripe.stream_map().get(column, Kind::Data);
298 let iter = Box::new(FloatDecoder::new(iter));
299 let present = PresentDecoder::from_stripe(stripe, column);
300 Box::new(Float32ArrayDecoder::new(iter, present))
301 }
302 (DataType::Double { .. }, ArrowDataType::Float64) => {
303 let iter = stripe.stream_map().get(column, Kind::Data);
304 let iter = Box::new(FloatDecoder::new(iter));
305 let present = PresentDecoder::from_stripe(stripe, column);
306 Box::new(Float64ArrayDecoder::new(iter, present))
307 }
308 (DataType::String { .. }, ArrowDataType::Utf8)
309 | (DataType::Varchar { .. }, ArrowDataType::Utf8)
310 | (DataType::Char { .. }, ArrowDataType::Utf8) => new_string_decoder(column, stripe)?,
311 (DataType::Binary { .. }, ArrowDataType::Binary) => new_binary_decoder(column, stripe)?,
312 (
313 DataType::Decimal {
314 precision, scale, ..
315 },
316 ArrowDataType::Decimal128(a_precision, a_scale),
317 ) if *precision as u8 == *a_precision && *scale as i8 == *a_scale => {
318 new_decimal_decoder(column, stripe, *precision, *scale)?
319 }
320 (DataType::Timestamp { .. }, field_type) => {
321 new_timestamp_decoder(column, field_type.clone(), stripe)?
322 }
323 (DataType::TimestampWithLocalTimezone { .. }, field_type) => {
324 new_timestamp_instant_decoder(column, field_type.clone(), stripe)?
325 }
326 (DataType::Date { .. }, ArrowDataType::Date32) => {
327 let iter = stripe.stream_map().get(column, Kind::Data);
329 let iter = get_rle_reader(column, iter)?;
330 let present = PresentDecoder::from_stripe(stripe, column);
331 Box::new(DateArrayDecoder::new(iter, present))
332 }
333 (DataType::Struct { .. }, ArrowDataType::Struct(fields)) => {
334 Box::new(StructArrayDecoder::new(column, fields.clone(), stripe)?)
335 }
336 (DataType::List { .. }, ArrowDataType::List(field)) => {
337 Box::new(ListArrayDecoder::new(column, field.clone(), stripe)?)
339 }
340 (DataType::Map { .. }, ArrowDataType::Map(entries, sorted)) => {
341 ensure!(!sorted, UnsupportedTypeVariantSnafu { msg: "Sorted map" });
342 let ArrowDataType::Struct(entries) = entries.data_type() else {
343 UnexpectedSnafu {
344 msg: "arrow Map with non-Struct entry type".to_owned(),
345 }
346 .fail()?
347 };
348 ensure!(
349 entries.len() == 2,
350 UnexpectedSnafu {
351 msg: format!(
352 "arrow Map with {} columns per entry (expected 2)",
353 entries.len()
354 )
355 }
356 );
357 let keys_field = entries[0].clone();
358 let values_field = entries[1].clone();
359
360 Box::new(MapArrayDecoder::new(
361 column,
362 keys_field,
363 values_field,
364 stripe,
365 )?)
366 }
367 (DataType::Union { .. }, ArrowDataType::Union(fields, _)) => {
368 Box::new(UnionArrayDecoder::new(column, fields.clone(), stripe)?)
369 }
370 (data_type, field_type) => {
371 return MismatchedSchemaSnafu {
372 orc_type: data_type.clone(),
373 arrow_type: field_type.clone(),
374 }
375 .fail()
376 }
377 };
378
379 Ok(decoder)
380}
381
382impl NaiveStripeDecoder {
383 fn inner_decode_next_batch(&mut self, remaining: usize) -> Result<Vec<ArrayRef>> {
384 let chunk = self.batch_size.min(remaining);
385
386 let mut fields = Vec::with_capacity(self.stripe.columns().len());
387
388 for decoder in &mut self.decoders {
389 let array = decoder.next_batch(chunk, None)?;
390 if array.is_empty() {
391 break;
392 } else {
393 fields.push(array);
394 }
395 }
396
397 Ok(fields)
398 }
399
400 fn decode_next_batch(&mut self, remaining: usize) -> Result<Option<RecordBatch>> {
401 let fields = self.inner_decode_next_batch(remaining)?;
402
403 if fields.is_empty() {
404 if remaining == 0 {
405 Ok(None)
406 } else {
407 Ok(Some(
410 RecordBatch::try_new_with_options(
411 Arc::clone(&self.schema_ref),
412 fields,
413 &RecordBatchOptions::new()
414 .with_row_count(Some(self.batch_size.min(remaining))),
415 )
416 .context(error::ConvertRecordBatchSnafu)?,
417 ))
418 }
419 } else {
420 let fields = self
422 .schema_ref
423 .fields
424 .into_iter()
425 .map(|field| field.name())
426 .zip(fields)
427 .collect::<Vec<_>>();
428
429 Ok(Some(
430 RecordBatch::try_from_iter(fields).context(error::ConvertRecordBatchSnafu)?,
431 ))
432 }
433 }
434
435 pub fn new(stripe: Stripe, schema_ref: SchemaRef, batch_size: usize) -> Result<Self> {
436 let mut decoders = Vec::with_capacity(stripe.columns().len());
437 let number_of_rows = stripe.number_of_rows();
438
439 for (col, field) in stripe
440 .columns()
441 .iter()
442 .zip(schema_ref.fields.iter().cloned())
443 {
444 let decoder = array_decoder_factory(col, field, &stripe)?;
445 decoders.push(decoder);
446 }
447
448 Ok(Self {
449 stripe,
450 schema_ref,
451 decoders,
452 index: 0,
453 batch_size,
454 number_of_rows,
455 })
456 }
457}