use std::collections::VecDeque;
use parquet2::{
encoding::Encoding, page::DataPage, schema::Repetition, types::decode,
types::NativeType as ParquetNativeType,
};
use crate::{
array::PrimitiveArray, bitmap::MutableBitmap, datatypes::DataType, error::Result,
types::NativeType,
};
use super::super::nested_utils::*;
use super::super::utils;
use super::super::DataPages;
use super::basic::{Values, ValuesDictionary};
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum State<'a, P>
where
P: ParquetNativeType,
{
Optional(Optional<'a>, Values<'a, P>),
Required(Values<'a, P>),
RequiredDictionary(ValuesDictionary<'a, P>),
OptionalDictionary(Optional<'a>, ValuesDictionary<'a, P>),
}
impl<'a, P> utils::PageState<'a> for State<'a, P>
where
P: ParquetNativeType,
{
fn len(&self) -> usize {
match self {
State::Optional(optional, _) => optional.len(),
State::Required(required) => required.len(),
State::RequiredDictionary(required) => required.len(),
State::OptionalDictionary(optional, _) => optional.len(),
}
}
}
#[derive(Debug)]
struct PrimitiveDecoder<T, P, F>
where
T: NativeType,
P: ParquetNativeType,
F: Fn(P) -> T,
{
phantom: std::marker::PhantomData<T>,
phantom_p: std::marker::PhantomData<P>,
op: F,
}
impl<'a, T, P, F> PrimitiveDecoder<T, P, F>
where
T: NativeType,
P: ParquetNativeType,
F: Fn(P) -> T,
{
#[inline]
fn new(op: F) -> Self {
Self {
phantom: std::marker::PhantomData,
phantom_p: std::marker::PhantomData,
op,
}
}
}
impl<'a, T, P, F> utils::Decoder<'a> for PrimitiveDecoder<T, P, F>
where
T: NativeType,
P: ParquetNativeType,
F: Copy + Fn(P) -> T,
{
type State = State<'a, P>;
type DecodedState = (Vec<T>, MutableBitmap);
fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;
match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::OptionalDictionary(
Optional::new(page),
ValuesDictionary::new(page, dict),
))
}
(Encoding::Plain, _, true) => {
Ok(State::Optional(Optional::new(page), Values::new(page)))
}
(Encoding::Plain, _, false) => Ok(State::Required(Values::new(page))),
_ => Err(utils::not_implemented(
&page.encoding(),
is_optional,
false,
"any",
"Primitive",
)),
}
}
fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
Vec::<T>::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
)
}
fn extend_from_state(
&self,
state: &mut Self::State,
decoded: &mut Self::DecodedState,
remaining: usize,
) {
let (values, validity) = decoded;
match state {
State::Optional(page_validity, page_values) => {
let max_def = page_validity.max_def();
read_optional_values(
page_validity.definition_levels.by_ref(),
max_def,
page_values.values.by_ref().map(decode).map(self.op),
values,
validity,
remaining,
)
}
State::Required(page) => {
values.extend(
page.values
.by_ref()
.map(decode)
.map(self.op)
.take(remaining),
);
}
State::RequiredDictionary(page) => {
let op1 = |index: u32| page.dict[index as usize];
values.extend(page.values.by_ref().map(op1).map(self.op).take(remaining));
}
State::OptionalDictionary(page_validity, page_values) => {
let max_def = page_validity.max_def();
let op1 = |index: u32| page_values.dict[index as usize];
read_optional_values(
page_validity.definition_levels.by_ref(),
max_def,
page_values.values.by_ref().map(op1).map(self.op),
values,
validity,
remaining,
)
}
}
}
}
fn finish<T: NativeType>(
data_type: &DataType,
values: Vec<T>,
validity: MutableBitmap,
) -> PrimitiveArray<T> {
PrimitiveArray::from_data(data_type.clone(), values.into(), validity.into())
}
#[derive(Debug)]
pub struct ArrayIterator<T, I, P, F>
where
I: DataPages,
T: NativeType,
P: ParquetNativeType,
F: Copy + Fn(P) -> T,
{
iter: I,
init: InitNested,
data_type: DataType,
items: VecDeque<(Vec<T>, MutableBitmap)>,
nested: VecDeque<NestedState>,
chunk_size: usize,
decoder: PrimitiveDecoder<T, P, F>,
}
impl<T, I, P, F> ArrayIterator<T, I, P, F>
where
I: DataPages,
T: NativeType,
P: ParquetNativeType,
F: Copy + Fn(P) -> T,
{
pub fn new(iter: I, init: InitNested, data_type: DataType, chunk_size: usize, op: F) -> Self {
Self {
iter,
init,
data_type,
items: VecDeque::new(),
nested: VecDeque::new(),
chunk_size,
decoder: PrimitiveDecoder::new(op),
}
}
}
impl<T, I, P, F> Iterator for ArrayIterator<T, I, P, F>
where
I: DataPages,
T: NativeType,
P: ParquetNativeType,
F: Copy + Fn(P) -> T,
{
type Item = Result<(NestedState, PrimitiveArray<T>)>;
fn next(&mut self) -> Option<Self::Item> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.nested,
&self.init,
self.chunk_size,
&self.decoder,
);
match maybe_state {
utils::MaybeNext::Some(Ok((nested, state))) => {
Some(Ok((nested, finish(&self.data_type, state.0, state.1))))
}
utils::MaybeNext::Some(Err(e)) => Some(Err(e)),
utils::MaybeNext::None => None,
utils::MaybeNext::More => self.next(),
}
}
}