1use std::sync::Arc;
19
20use arrow::array::{ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray};
21use arrow::buffer::NullBuffer;
22use arrow::datatypes::ArrowNativeTypeOp;
23use arrow::datatypes::ArrowPrimitiveType;
24use arrow::datatypes::DataType as ArrowDataType;
25use arrow::datatypes::{
26 Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
27};
28use arrow::record_batch::{RecordBatch, RecordBatchOptions};
29use snafu::{ensure, ResultExt};
30
31use crate::column::Column;
32use crate::encoding::boolean::BooleanDecoder;
33use crate::encoding::byte::ByteRleDecoder;
34use crate::encoding::float::FloatDecoder;
35use crate::encoding::integer::get_signed_int_decoder;
36use crate::encoding::PrimitiveValueDecoder;
37use crate::error::{
38 self, MismatchedSchemaSnafu, Result, UnexpectedSnafu, UnsupportedTypeVariantSnafu,
39};
40use crate::proto::stream::Kind;
41use crate::schema::DataType;
42use crate::stripe::Stripe;
43use crate::RowSelection;
44
45use self::decimal::new_decimal_decoder;
46use self::list::ListArrayDecoder;
47use self::map::MapArrayDecoder;
48use self::string::{new_binary_decoder, new_string_decoder};
49use self::struct_decoder::StructArrayDecoder;
50use self::timestamp::{new_timestamp_decoder, new_timestamp_instant_decoder};
51use self::union::UnionArrayDecoder;
52
53mod decimal;
54mod list;
55mod map;
56mod string;
57mod struct_decoder;
58mod timestamp;
59mod union;
60
/// Decoder that produces the contents of one column as Arrow arrays, one batch at a time.
pub trait ArrayBatchDecoder: Send {
    /// Decodes the next batch and returns it as an [`ArrayRef`].
    ///
    /// `batch_size` is the number of slots requested. `parent_present` is an optional
    /// validity mask supplied by the caller; the primitive and boolean decoders in this
    /// file read present-stream values only for the slots that mask marks as non-null.
    fn next_batch(
        &mut self,
        batch_size: usize,
        parent_present: Option<&NullBuffer>,
    ) -> Result<ArrayRef>;

    /// Advances the decoder past `n` slots without building an array.
    ///
    /// `parent_present` has the same meaning as in [`ArrayBatchDecoder::next_batch`].
    fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()>;
}
86
/// Batch decoder for columns whose values map onto an Arrow primitive type `T`.
struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
    /// Source of the decoded native values.
    iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
    /// Decoder for this column's present stream, if the column has one.
    present: Option<PresentDecoder>,
}
91
92impl<T: ArrowPrimitiveType> PrimitiveArrayDecoder<T> {
93 pub fn new(
94 iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
95 present: Option<PresentDecoder>,
96 ) -> Self {
97 Self { iter, present }
98 }
99
100 fn next_primitive_batch(
101 &mut self,
102 batch_size: usize,
103 parent_present: Option<&NullBuffer>,
104 ) -> Result<PrimitiveArray<T>> {
105 let present =
106 derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;
107 let mut data = vec![T::Native::ZERO; batch_size];
108 match present {
109 Some(present) => {
110 self.iter.decode_spaced(data.as_mut_slice(), &present)?;
111 let array = PrimitiveArray::<T>::new(data.into(), Some(present));
112 Ok(array)
113 }
114 None => {
115 self.iter.decode(data.as_mut_slice())?;
116 let array = PrimitiveArray::<T>::from_iter_values(data);
117 Ok(array)
118 }
119 }
120 }
121}
122
123impl<T: ArrowPrimitiveType> ArrayBatchDecoder for PrimitiveArrayDecoder<T> {
124 fn next_batch(
125 &mut self,
126 batch_size: usize,
127 parent_present: Option<&NullBuffer>,
128 ) -> Result<ArrayRef> {
129 let array = self.next_primitive_batch(batch_size, parent_present)?;
130 let array = Arc::new(array) as ArrayRef;
131 Ok(array)
132 }
133
134 fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
135 let non_null_count =
136 skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
137 self.iter.skip(non_null_count)
138 }
139}
140
// Concrete primitive decoders, one alias per Arrow primitive type handled by the factory.
type Int64ArrayDecoder = PrimitiveArrayDecoder<Int64Type>;
type Int32ArrayDecoder = PrimitiveArrayDecoder<Int32Type>;
type Int16ArrayDecoder = PrimitiveArrayDecoder<Int16Type>;
type Int8ArrayDecoder = PrimitiveArrayDecoder<Int8Type>;
type Float32ArrayDecoder = PrimitiveArrayDecoder<Float32Type>;
type Float64ArrayDecoder = PrimitiveArrayDecoder<Float64Type>;
type DateArrayDecoder = PrimitiveArrayDecoder<Date32Type>;

/// Batch decoder for boolean columns.
struct BooleanArrayDecoder {
    /// Source of the decoded boolean values.
    iter: Box<dyn PrimitiveValueDecoder<bool> + Send>,
    /// Decoder for this column's present stream, if the column has one.
    present: Option<PresentDecoder>,
}
153
154impl BooleanArrayDecoder {
155 pub fn new(
156 iter: Box<dyn PrimitiveValueDecoder<bool> + Send>,
157 present: Option<PresentDecoder>,
158 ) -> Self {
159 Self { iter, present }
160 }
161}
162
163impl ArrayBatchDecoder for BooleanArrayDecoder {
164 fn next_batch(
165 &mut self,
166 batch_size: usize,
167 parent_present: Option<&NullBuffer>,
168 ) -> Result<ArrayRef> {
169 let present =
170 derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;
171 let mut data = vec![false; batch_size];
172 let array = match present {
173 Some(present) => {
174 self.iter.decode_spaced(data.as_mut_slice(), &present)?;
175 BooleanArray::new(data.into(), Some(present))
176 }
177 None => {
178 self.iter.decode(data.as_mut_slice())?;
179 BooleanArray::from(data)
180 }
181 };
182 Ok(Arc::new(array))
183 }
184
185 fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
186 let non_null_count =
187 skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
188 self.iter.skip(non_null_count)
189 }
190}
191
/// Wrapper around the boolean decoder that reads a column's `Present` stream.
struct PresentDecoder {
    /// Underlying boolean decoder; each decoded `true` is turned into a valid (non-null) slot.
    inner: Box<dyn PrimitiveValueDecoder<bool> + Send>,
}
197
198impl PresentDecoder {
199 fn from_stripe(stripe: &Stripe, column: &Column) -> Option<Self> {
200 stripe
201 .stream_map()
202 .get_opt(column, Kind::Present)
203 .map(|stream| {
204 let inner = Box::new(BooleanDecoder::new(stream));
205 PresentDecoder { inner }
206 })
207 }
208
209 fn next_buffer(&mut self, size: usize) -> Result<NullBuffer> {
210 let mut data = vec![false; size];
211 self.inner.decode(&mut data)?;
212 Ok(NullBuffer::from(data))
213 }
214}
215
216fn merge_parent_present(
217 parent_present: &NullBuffer,
218 present: Result<NullBuffer>,
219) -> Result<NullBuffer> {
220 let present = present?;
221 let non_null_count = parent_present.len() - parent_present.null_count();
222 debug_assert!(present.len() == non_null_count);
223 let mut builder = BooleanBufferBuilder::new(parent_present.len());
224 builder.append_n(parent_present.len(), false);
225 for (idx, p) in parent_present.valid_indices().zip(present.iter()) {
226 builder.set_bit(idx, p);
227 }
228 Ok(builder.finish().into())
229}
230
231fn derive_present_vec(
232 present: &mut Option<PresentDecoder>,
233 parent_present: Option<&NullBuffer>,
234 batch_size: usize,
235) -> Option<Result<NullBuffer>> {
236 let present = match (present, parent_present) {
237 (Some(present), Some(parent_present)) => {
238 let element_count = parent_present.len() - parent_present.null_count();
239 let present = present.next_buffer(element_count);
240 Some(merge_parent_present(parent_present, present))
241 }
242 (Some(present), None) => Some(present.next_buffer(batch_size)),
243 (None, Some(parent_present)) => Some(Ok(parent_present.clone())),
244 (None, None) => None,
245 };
246
247 match present {
249 Some(Ok(present)) if present.null_count() > 0 => Some(Ok(present)),
250 _ => None,
251 }
252}
253
254fn skip_present_and_get_non_null_count(
256 present: &mut Option<PresentDecoder>,
257 parent_present: Option<&NullBuffer>,
258 n: usize,
259) -> Result<usize> {
260 match (present, parent_present) {
261 (Some(present), Some(parent_present)) => {
262 let non_null_in_parent = parent_present.len() - parent_present.null_count();
265
266 let mut our_present = vec![false; non_null_in_parent];
268 present.inner.decode(&mut our_present)?;
269 let our_non_null_count = our_present.iter().filter(|&&v| v).count();
270
271 Ok(our_non_null_count)
272 }
273 (Some(present), None) => {
274 let mut present_values = vec![false; n];
276 present.inner.decode(&mut present_values)?;
277 Ok(present_values.iter().filter(|&&v| v).count())
278 }
279 (None, Some(parent_present)) => {
280 Ok(parent_present.len() - parent_present.null_count())
282 }
283 (None, None) => {
284 Ok(n)
286 }
287 }
288}
289
/// Iterator that decodes a single stripe into a sequence of [`RecordBatch`]es.
pub struct NaiveStripeDecoder {
    /// The stripe being decoded.
    stripe: Stripe,
    /// Arrow schema handed to the constructor; its fields are paired with the stripe's columns.
    schema_ref: SchemaRef,
    /// One decoder per (column, schema field) pair, in column order.
    decoders: Vec<Box<dyn ArrayBatchDecoder>>,
    /// Row cursor within the stripe; iteration stops once it reaches `number_of_rows`.
    index: usize,
    /// Maximum number of rows decoded per batch.
    batch_size: usize,
    /// Total row count, taken from `stripe.number_of_rows()`.
    number_of_rows: usize,
    /// Optional skip/select runs restricting which rows are decoded.
    row_selection: Option<RowSelection>,
    /// Position of the current run within `row_selection`'s selectors.
    selection_index: usize,
}
300
301impl NaiveStripeDecoder {
302 fn next_with_row_selection(&mut self) -> Option<Result<RecordBatch>> {
314 loop {
316 let (is_skip, row_count) = {
317 let selectors = self.row_selection.as_ref().unwrap().selectors();
318 if self.selection_index >= selectors.len() {
319 return None;
320 }
321 let selector = selectors[self.selection_index];
322 (selector.skip, selector.row_count)
323 };
324
325 if is_skip {
326 let remaining = self.number_of_rows - self.index;
327 let actual_skip = row_count.min(remaining);
328
329 if actual_skip == 0 {
330 self.selection_index += 1;
332 continue;
333 }
334
335 if let Err(e) = self.skip_rows(actual_skip) {
337 return Some(Err(e));
338 }
339 self.index += actual_skip;
340
341 if actual_skip >= row_count {
342 self.selection_index += 1;
343 }
344 } else {
345 let rows_to_read = row_count.min(self.batch_size);
346 let remaining = self.number_of_rows - self.index;
347 let actual_rows = rows_to_read.min(remaining);
348
349 if actual_rows == 0 {
350 self.selection_index += 1;
352 continue;
353 }
354
355 let record = self.decode_next_batch(actual_rows).transpose()?;
356 self.index += actual_rows;
357
358 if actual_rows >= row_count {
359 self.selection_index += 1;
360 }
361 return Some(record);
362 }
363 }
364 }
365}
366
367impl Iterator for NaiveStripeDecoder {
368 type Item = Result<RecordBatch>;
369
370 fn next(&mut self) -> Option<Self::Item> {
372 if self.index < self.number_of_rows {
373 if self.row_selection.is_some() {
375 self.next_with_row_selection()
376 } else {
377 let record = self
379 .decode_next_batch(self.number_of_rows - self.index)
380 .transpose()?;
381 self.index += self.batch_size;
382 Some(record)
383 }
384 } else {
385 None
386 }
387 }
388}
389
390pub fn array_decoder_factory(
391 column: &Column,
392 hinted_arrow_type: &ArrowDataType,
393 stripe: &Stripe,
394) -> Result<Box<dyn ArrayBatchDecoder>> {
395 let decoder: Box<dyn ArrayBatchDecoder> = match (column.data_type(), hinted_arrow_type) {
396 (DataType::Boolean { .. }, ArrowDataType::Boolean) => {
398 let iter = stripe.stream_map().get(column, Kind::Data);
399 let iter = Box::new(BooleanDecoder::new(iter));
400 let present = PresentDecoder::from_stripe(stripe, column);
401 Box::new(BooleanArrayDecoder::new(iter, present))
402 }
403 (DataType::Byte { .. }, ArrowDataType::Int8) => {
404 let iter = stripe.stream_map().get(column, Kind::Data);
405 let iter = Box::new(ByteRleDecoder::new(iter));
406 let present = PresentDecoder::from_stripe(stripe, column);
407 Box::new(Int8ArrayDecoder::new(iter, present))
408 }
409 (DataType::Short { .. }, ArrowDataType::Int16) => {
410 let iter = stripe.stream_map().get(column, Kind::Data);
411 let iter = get_signed_int_decoder(iter, column.rle_version());
412 let present = PresentDecoder::from_stripe(stripe, column);
413 Box::new(Int16ArrayDecoder::new(iter, present))
414 }
415 (DataType::Int { .. }, ArrowDataType::Int32) => {
416 let iter = stripe.stream_map().get(column, Kind::Data);
417 let iter = get_signed_int_decoder(iter, column.rle_version());
418 let present = PresentDecoder::from_stripe(stripe, column);
419 Box::new(Int32ArrayDecoder::new(iter, present))
420 }
421 (DataType::Long { .. }, ArrowDataType::Int64) => {
422 let iter = stripe.stream_map().get(column, Kind::Data);
423 let iter = get_signed_int_decoder(iter, column.rle_version());
424 let present = PresentDecoder::from_stripe(stripe, column);
425 Box::new(Int64ArrayDecoder::new(iter, present))
426 }
427 (DataType::Float { .. }, ArrowDataType::Float32) => {
428 let iter = stripe.stream_map().get(column, Kind::Data);
429 let iter = Box::new(FloatDecoder::new(iter));
430 let present = PresentDecoder::from_stripe(stripe, column);
431 Box::new(Float32ArrayDecoder::new(iter, present))
432 }
433 (DataType::Double { .. }, ArrowDataType::Float64) => {
434 let iter = stripe.stream_map().get(column, Kind::Data);
435 let iter = Box::new(FloatDecoder::new(iter));
436 let present = PresentDecoder::from_stripe(stripe, column);
437 Box::new(Float64ArrayDecoder::new(iter, present))
438 }
439 (DataType::String { .. }, ArrowDataType::Utf8)
440 | (DataType::Varchar { .. }, ArrowDataType::Utf8)
441 | (DataType::Char { .. }, ArrowDataType::Utf8) => new_string_decoder(column, stripe)?,
442 (DataType::Binary { .. }, ArrowDataType::Binary) => new_binary_decoder(column, stripe)?,
443 (
444 DataType::Decimal {
445 precision, scale, ..
446 },
447 ArrowDataType::Decimal128(a_precision, a_scale),
448 ) if *precision as u8 == *a_precision && *scale as i8 == *a_scale => {
449 new_decimal_decoder(column, stripe, *precision, *scale)
450 }
451 (DataType::Timestamp { .. }, field_type) => {
452 new_timestamp_decoder(column, field_type.clone(), stripe)?
453 }
454 (DataType::TimestampWithLocalTimezone { .. }, field_type) => {
455 new_timestamp_instant_decoder(column, field_type.clone(), stripe)?
456 }
457 (DataType::Date { .. }, ArrowDataType::Date32) => {
458 let iter = stripe.stream_map().get(column, Kind::Data);
460 let iter = get_signed_int_decoder(iter, column.rle_version());
461 let present = PresentDecoder::from_stripe(stripe, column);
462 Box::new(DateArrayDecoder::new(iter, present))
463 }
464 (DataType::Struct { .. }, ArrowDataType::Struct(fields)) => {
465 Box::new(StructArrayDecoder::new(column, fields.clone(), stripe)?)
466 }
467 (DataType::List { .. }, ArrowDataType::List(field)) => {
468 Box::new(ListArrayDecoder::new(column, field.clone(), stripe)?)
470 }
471 (DataType::Map { .. }, ArrowDataType::Map(entries, sorted)) => {
472 ensure!(!sorted, UnsupportedTypeVariantSnafu { msg: "Sorted map" });
473 let ArrowDataType::Struct(entries) = entries.data_type() else {
474 UnexpectedSnafu {
475 msg: "arrow Map with non-Struct entry type".to_owned(),
476 }
477 .fail()?
478 };
479 ensure!(
480 entries.len() == 2,
481 UnexpectedSnafu {
482 msg: format!(
483 "arrow Map with {} columns per entry (expected 2)",
484 entries.len()
485 )
486 }
487 );
488 let keys_field = entries[0].clone();
489 let values_field = entries[1].clone();
490
491 Box::new(MapArrayDecoder::new(
492 column,
493 keys_field,
494 values_field,
495 stripe,
496 )?)
497 }
498 (DataType::Union { .. }, ArrowDataType::Union(fields, _)) => {
499 Box::new(UnionArrayDecoder::new(column, fields.clone(), stripe)?)
500 }
501 (data_type, field_type) => {
502 return MismatchedSchemaSnafu {
503 orc_type: data_type.clone(),
504 arrow_type: field_type.clone(),
505 }
506 .fail()
507 }
508 };
509
510 Ok(decoder)
511}
512
513impl NaiveStripeDecoder {
514 fn inner_decode_next_batch(&mut self, remaining: usize) -> Result<Vec<ArrayRef>> {
515 let chunk = self.batch_size.min(remaining);
516
517 let mut fields = Vec::with_capacity(self.stripe.columns().len());
518
519 for decoder in &mut self.decoders {
520 let array = decoder.next_batch(chunk, None)?;
521 if array.is_empty() {
522 break;
523 } else {
524 fields.push(array);
525 }
526 }
527
528 Ok(fields)
529 }
530
531 fn decode_next_batch(&mut self, remaining: usize) -> Result<Option<RecordBatch>> {
532 let fields = self.inner_decode_next_batch(remaining)?;
533
534 if fields.is_empty() {
535 if remaining == 0 {
536 Ok(None)
537 } else {
538 Ok(Some(
541 RecordBatch::try_new_with_options(
542 Arc::clone(&self.schema_ref),
543 fields,
544 &RecordBatchOptions::new()
545 .with_row_count(Some(self.batch_size.min(remaining))),
546 )
547 .context(error::ConvertRecordBatchSnafu)?,
548 ))
549 }
550 } else {
551 let fields = self
553 .schema_ref
554 .fields
555 .into_iter()
556 .map(|field| field.name())
557 .zip(fields)
558 .collect::<Vec<_>>();
559
560 Ok(Some(
561 RecordBatch::try_from_iter(fields).context(error::ConvertRecordBatchSnafu)?,
562 ))
563 }
564 }
565
566 pub fn new(stripe: Stripe, schema_ref: SchemaRef, batch_size: usize) -> Result<Self> {
567 Self::new_with_selection(stripe, schema_ref, batch_size, None)
568 }
569
570 pub fn new_with_selection(
571 stripe: Stripe,
572 schema_ref: SchemaRef,
573 batch_size: usize,
574 row_selection: Option<RowSelection>,
575 ) -> Result<Self> {
576 let number_of_rows = stripe.number_of_rows();
577 let decoders = stripe
578 .columns()
579 .iter()
580 .zip(schema_ref.fields.iter())
581 .map(|(col, field)| array_decoder_factory(col, field.data_type(), &stripe))
582 .collect::<Result<Vec<_>>>()?;
583
584 Ok(Self {
585 stripe,
586 schema_ref,
587 decoders,
588 index: 0,
589 batch_size,
590 number_of_rows,
591 row_selection,
592 selection_index: 0,
593 })
594 }
595
596 fn skip_rows(&mut self, count: usize) -> Result<()> {
598 for decoder in &mut self.decoders {
601 decoder.skip_values(count, None)?;
602 }
603 Ok(())
604 }
605}