arrow2 0.10.1

Unofficial implementation of Apache Arrow spec in safe Rust
Documentation
use std::collections::VecDeque;

use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition};

use crate::{
    array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result,
    io::parquet::read::DataPages,
};

use super::super::nested_utils::*;
use super::super::utils::MaybeNext;
use super::utils::Binary;
use super::{
    super::utils,
    basic::{finish, Required, TraitBinaryArray},
};

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum State<'a> {
    Optional(Optional<'a>, utils::BinaryIter<'a>),
    Required(Required<'a>),
}

impl<'a> utils::PageState<'a> for State<'a> {
    fn len(&self) -> usize {
        match self {
            State::Optional(validity, _) => validity.len(),
            State::Required(state) => state.remaining,
        }
    }
}

#[derive(Debug, Default)]
struct BinaryDecoder<O: Offset> {
    phantom_o: std::marker::PhantomData<O>,
}

impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
    type State = State<'a>;
    type DecodedState = (Binary<O>, MutableBitmap);

    fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
        let is_optional =
            page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;

        match (page.encoding(), page.dictionary_page(), is_optional) {
            (Encoding::Plain, None, true) => {
                let (_, _, values) = utils::split_buffer(page);

                let values = utils::BinaryIter::new(values);

                Ok(State::Optional(Optional::new(page), values))
            }
            (Encoding::Plain, None, false) => Ok(State::Required(Required::new(page))),
            _ => Err(utils::not_implemented(
                &page.encoding(),
                is_optional,
                false,
                "any",
                "Binary",
            )),
        }
    }

    fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
        (
            Binary::<O>::with_capacity(capacity),
            MutableBitmap::with_capacity(capacity),
        )
    }

    fn extend_from_state(
        &self,
        state: &mut Self::State,
        decoded: &mut Self::DecodedState,
        additional: usize,
    ) {
        let (values, validity) = decoded;
        match state {
            State::Optional(page_validity, page_values) => {
                let max_def = page_validity.max_def();
                read_optional_values(
                    page_validity.definition_levels.by_ref(),
                    max_def,
                    page_values.by_ref(),
                    values,
                    validity,
                    additional,
                )
            }
            State::Required(page) => {
                page.remaining -= additional;
                for x in page.values.by_ref().take(additional) {
                    values.push(x)
                }
            }
        }
    }
}

pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
    iter: I,
    data_type: DataType,
    init: InitNested,
    items: VecDeque<(Binary<O>, MutableBitmap)>,
    nested: VecDeque<NestedState>,
    chunk_size: usize,
    phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
    pub fn new(iter: I, init: InitNested, data_type: DataType, chunk_size: usize) -> Self {
        Self {
            iter,
            data_type,
            init,
            items: VecDeque::new(),
            nested: VecDeque::new(),
            chunk_size,
            phantom_a: Default::default(),
        }
    }
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for ArrayIterator<O, A, I> {
    type Item = Result<(NestedState, A)>;

    fn next(&mut self) -> Option<Self::Item> {
        let maybe_state = next(
            &mut self.iter,
            &mut self.items,
            &mut self.nested,
            &self.init,
            self.chunk_size,
            &BinaryDecoder::<O>::default(),
        );
        match maybe_state {
            MaybeNext::Some(Ok((nested, decoded))) => {
                Some(finish(&self.data_type, decoded.0, decoded.1).map(|array| (nested, array)))
            }
            MaybeNext::Some(Err(e)) => Some(Err(e)),
            MaybeNext::None => None,
            MaybeNext::More => self.next(),
        }
    }
}