use std::sync::Arc;
use arrow::{
array::{ArrayRef, StructArray},
buffer::NullBuffer,
datatypes::Fields,
};
use snafu::ResultExt;
use crate::error::Result;
use crate::stripe::Stripe;
use crate::{column::Column, error::ArrowSnafu};
use super::{array_decoder_factory, derive_present_vec, ArrayBatchDecoder, PresentDecoder};
/// Decodes a struct column into Arrow [`StructArray`] batches.
///
/// Holds one child decoder per struct field; each batch is produced by asking
/// every child for its next batch and assembling the results under `fields`.
pub struct StructArrayDecoder {
/// Arrow fields of the struct; used as the schema of every produced
/// [`StructArray`] and to pick the data type of each child decoder.
fields: Fields,
/// Child decoders, built by pairing the column's children with `fields`
/// in order.
decoders: Vec<Box<dyn ArrayBatchDecoder>>,
/// Decoder obtained from `PresentDecoder::from_stripe` for this column;
/// `None` when that call yields none. It is combined with the parent's
/// null buffer to form this struct's own null buffer.
present: Option<PresentDecoder>,
}
impl StructArrayDecoder {
pub fn new(column: &Column, fields: Fields, stripe: &Stripe) -> Result<Self> {
let present = PresentDecoder::from_stripe(stripe, column);
let decoders = column
.children()
.iter()
.zip(fields.iter())
.map(|(child, field)| array_decoder_factory(child, field.data_type(), stripe))
.collect::<Result<Vec<_>>>()?;
Ok(Self {
decoders,
present,
fields,
})
}
}
impl ArrayBatchDecoder for StructArrayDecoder {
    /// Decodes the next `batch_size` rows into a [`StructArray`].
    ///
    /// The struct's null buffer is derived from its own present decoder and
    /// `parent_present`. That buffer is handed to every child decoder as its
    /// parent null buffer and is also used as the null buffer of the
    /// resulting array.
    ///
    /// # Errors
    ///
    /// Propagates errors from deriving the null buffer and from any child
    /// decoder; failures of `StructArray::try_new` are wrapped with
    /// `ArrowSnafu`.
    ///
    /// NOTE(review): with zero child fields the resulting length is decided
    /// by `StructArray::try_new`, not by `batch_size` — confirm empty structs
    /// are handled as intended for the arrow version in use.
    fn next_batch(
        &mut self,
        batch_size: usize,
        parent_present: Option<&NullBuffer>,
    ) -> Result<ArrayRef> {
        let validity =
            derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;

        // Decode children in field order; stop at the first failure.
        let mut columns = Vec::with_capacity(self.decoders.len());
        for decoder in self.decoders.iter_mut() {
            columns.push(decoder.next_batch(batch_size, validity.as_ref())?);
        }

        let decoded =
            StructArray::try_new(self.fields.clone(), columns, validity).context(ArrowSnafu)?;
        Ok(Arc::new(decoded))
    }

    /// Skips `n` rows in this decoder and in every child decoder.
    ///
    /// The struct's null buffer for the skipped rows is still derived (which
    /// advances the present decoder) and passed to each child as its parent
    /// null buffer.
    ///
    /// # Errors
    ///
    /// Propagates errors from deriving the null buffer and the first error
    /// returned by a child decoder.
    fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
        let validity = derive_present_vec(&mut self.present, parent_present, n).transpose()?;

        self.decoders
            .iter_mut()
            .try_for_each(|decoder| decoder.skip_values(n, validity.as_ref()))
    }
}