use std::collections::VecDeque;
use std::convert::TryInto;
use parquet2::encoding::{hybrid_rle, Encoding};
use parquet2::page::{split_buffer as _split_buffer, DataPage};
use streaming_iterator::{convert, Convert, StreamingIterator};
use crate::bitmap::utils::BitmapIter;
use crate::bitmap::MutableBitmap;
use crate::error::ArrowError;
use super::super::DataPages;
/// Iterator over a buffer of length-prefixed byte strings.
///
/// Every item is stored as a little-endian `u32` byte count followed by that many bytes.
/// The iterator yields the item slices in order, borrowing them from the original buffer,
/// and stops once the buffer is empty.
#[derive(Debug)]
pub struct BinaryIter<'a> {
    values: &'a [u8],
}

impl<'a> BinaryIter<'a> {
    /// Creates an iterator over the length-prefixed items stored in `values`.
    pub fn new(values: &'a [u8]) -> Self {
        BinaryIter { values }
    }
}

impl<'a> Iterator for BinaryIter<'a> {
    type Item = &'a [u8];

    /// Returns the next item, or `None` once the buffer is exhausted.
    ///
    /// # Panics
    /// Panics if the buffer ends in the middle of a length prefix or of an item.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.values.is_empty() {
            return None;
        }
        let (prefix, rest) = self.values.split_at(4);
        let item_len = u32::from_le_bytes(prefix.try_into().unwrap()) as usize;
        let (item, tail) = rest.split_at(item_len);
        self.values = tail;
        Some(item)
    }
}
/// Builds the `ArrowError::NotYetImplemented` reported when a page combines an encoding,
/// dictionary usage, optionality, page version and physical type that this reader cannot
/// decode yet.
pub fn not_implemented(
    encoding: &Encoding,
    is_optional: bool,
    has_dict: bool,
    version: &str,
    physical_type: &str,
) -> ArrowError {
    let optionality = match is_optional {
        true => "optional",
        false => "required",
    };
    let dict_suffix = match has_dict {
        true => ", dictionary-encoded",
        false => "",
    };
    let message = format!(
        "Decoding \"{:?}\"-encoded{} {} {} pages is not yet implemented for {}",
        encoding, dict_suffix, optionality, version, physical_type
    );
    ArrowError::NotYetImplemented(message)
}
/// Splits the buffer of `page` into three slices, using the page's own descriptor.
///
/// The middle slice is the one `OptionalPageValidity::new` decodes as the validity stream.
/// The first and last are presumably the repetition levels and the encoded values
/// (NOTE(review): confirm against parquet2's `split_buffer`).
#[inline]
pub fn split_buffer(page: &DataPage) -> (&[u8], &[u8], &[u8]) {
    let descriptor = page.descriptor();
    _split_buffer(page, descriptor)
}
/// Minimal append-only interface for the output buffers filled while decoding pages
/// (implemented in this module for `MutableBitmap` with `bool` items and for `Vec<A>`).
pub(super) trait Pushable<T>: Sized {
    /// Number of items currently held.
    fn len(&self) -> usize;
    /// Appends `value`.
    fn push(&mut self, value: T);
    /// Appends a placeholder item standing for a null slot.
    fn push_null(&mut self);
    /// Appends `additional` copies of `value`.
    fn extend_constant(&mut self, additional: usize, value: T);
}
impl Pushable<bool> for MutableBitmap {
#[inline]
fn len(&self) -> usize {
self.len()
}
#[inline]
fn push(&mut self, value: bool) {
self.push(value)
}
#[inline]
fn push_null(&mut self) {
self.push(false)
}
#[inline]
fn extend_constant(&mut self, additional: usize, value: bool) {
self.extend_constant(additional, value)
}
}
/// `Vec`-backed `Pushable`: a null slot is filled with `A::default()`.
///
/// The `Vec::` type paths select the inherent `Vec` methods rather than this trait's methods.
impl<A: Copy + Default> Pushable<A> for Vec<A> {
    #[inline]
    fn push(&mut self, value: A) {
        Vec::push(self, value)
    }

    #[inline]
    fn push_null(&mut self) {
        Vec::push(self, A::default())
    }

    #[inline]
    fn len(&self) -> usize {
        Vec::len(self)
    }

    #[inline]
    fn extend_constant(&mut self, additional: usize, value: A) {
        let new_len = Vec::len(self) + additional;
        self.resize(new_len, value);
    }
}
/// Cursor over the validity stream of an optional data page.
///
/// The stream is the middle slice returned by `split_buffer`, decoded with bit width 1
/// (one bit per slot). The cursor remembers how far decoding has progressed, so that
/// `extend_from_decoder` can stop in the middle of a run and resume on a later call.
#[derive(Debug)]
pub struct OptionalPageValidity<'a> {
    // hybrid RLE/bit-packed runs of validity bits
    validity: Convert<hybrid_rle::Decoder<'a>>,
    // slots of the current run already consumed; 0 means "advance to the next run"
    run_offset: usize,
    // slots of the page consumed so far
    consumed: usize,
    // total number of slots in the page (`page.num_values()`)
    length: usize,
}

impl<'a> OptionalPageValidity<'a> {
    /// Creates a cursor positioned at the first slot of `page`.
    #[inline]
    pub fn new(page: &'a DataPage) -> Self {
        let length = page.num_values();
        let (_, validity_buffer, _) = split_buffer(page);
        let decoder = hybrid_rle::Decoder::new(validity_buffer, 1);
        Self {
            validity: convert(decoder),
            run_offset: 0,
            consumed: 0,
            length,
        }
    }

    /// Number of slots of the page that have not been consumed yet.
    #[inline]
    pub fn len(&self) -> usize {
        self.length - self.consumed
    }
}
/// Decodes up to `limit` slots of an optional page (every remaining slot when `limit` is
/// `None`) and appends them to the output buffers.
///
/// For each slot, its validity bit is appended to `validity`. A valid slot then appends
/// the next item of `values_iter` to `values`. A null slot appends a placeholder instead
/// (`push_null` for bit-packed runs, `T::default()` for RLE runs).
///
/// The position inside the validity stream is kept in `page_validity` (`run_offset`,
/// `consumed`), so repeated calls resume exactly where the previous call stopped.
/// Decoding ends early when the validity stream has no more runs.
///
/// # Panics
/// Panics if `values_iter` yields fewer items than there are valid slots.
pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<Item = T>>(
    validity: &mut MutableBitmap,
    page_validity: &mut OptionalPageValidity<'a>,
    limit: Option<usize>,
    values: &mut P,
    mut values_iter: I,
) {
    let limit = limit.unwrap_or(usize::MAX);

    // number of slots appended by this call
    let mut consumed_here = 0;
    while consumed_here < limit {
        // A non-zero `run_offset` means a previous iteration (or call) stopped in the
        // middle of the current run, which must be finished before moving on.
        if page_validity.run_offset == 0 {
            page_validity.validity.advance()
        }

        // Slots this call may still append. Each run contributes at most this much, so
        // the total never exceeds `limit` even when the request spans several runs.
        let budget = limit - consumed_here;

        if let Some(run) = page_validity.validity.get() {
            match run {
                hybrid_rle::HybridEncoded::Bitpacked(pack) => {
                    // A bit-packed run spans whole bytes (`pack.len() * 8` bits) and may
                    // extend past the page's last slot, so cap it by what the page has left.
                    let pack_size = pack.len() * 8 - page_validity.run_offset;
                    let remaining = page_validity.length - page_validity.consumed;
                    let length = std::cmp::min(pack_size, remaining);

                    let additional = budget.min(length);

                    let iter = BitmapIter::new(pack, page_validity.run_offset, additional);
                    for is_valid in iter {
                        if is_valid {
                            values.push(values_iter.next().unwrap())
                        } else {
                            values.push_null()
                        };
                    }
                    validity.extend_from_slice(pack, page_validity.run_offset, additional);

                    // either the run is finished or we stopped inside it
                    if additional == length {
                        page_validity.run_offset = 0
                    } else {
                        page_validity.run_offset += additional;
                    };
                    consumed_here += additional;
                    page_validity.consumed += additional;
                }
                &hybrid_rle::HybridEncoded::Rle(value, length) => {
                    let is_set = value[0] == 1;
                    // part of the run not consumed by earlier iterations/calls
                    let length = length - page_validity.run_offset;

                    let additional = budget.min(length);

                    validity.extend_constant(additional, is_set);
                    if is_set {
                        (0..additional).for_each(|_| values.push(values_iter.next().unwrap()));
                    } else {
                        values.extend_constant(additional, T::default());
                    }

                    if additional == length {
                        page_validity.run_offset = 0
                    } else {
                        page_validity.run_offset += additional;
                    };
                    consumed_here += additional;
                    page_validity.consumed += additional;
                }
            };
        } else {
            // the validity stream is exhausted
            break;
        }
    }
}
/// Turns data pages into chunks of decoded items; see `extend_from_new_page` and `next`.
pub(super) trait Decoder<'a> {
    /// Per-page decoding state.
    type State: PageState<'a>;
    /// Chunk of decoded items being filled.
    type DecodedState: DecodedState<'a>;

    /// Prepares the decoding state of `page`. An error is surfaced to the caller of `next`.
    fn build_state(&self, page: &'a DataPage) -> Result<Self::State, ArrowError>;

    /// Decodes items from `page` into `decoded`.
    ///
    /// `additional` is the room the caller wants filled: `extend_from_new_page` passes the
    /// space left in a partial chunk, or `chunk_size` for a fresh one.
    fn extend_from_state(
        &self,
        page: &mut Self::State,
        decoded: &mut Self::DecodedState,
        additional: usize,
    );

    /// Creates an empty chunk meant to hold `capacity` items (presumably pre-allocated).
    fn with_capacity(&self, capacity: usize) -> Self::DecodedState;
}

/// Decoding state of a single page.
pub(super) trait PageState<'a> {
    /// Number of items still to be decoded from the page; decoding loops until it is zero.
    fn len(&self) -> usize;
}

/// A chunk of decoded items.
pub(super) trait DecodedState<'a> {
    /// Number of items decoded into this chunk so far.
    fn len(&self) -> usize;
}
/// Decodes every item of `page` into `items`, a back-to-front queue of chunks of up to
/// `chunk_size` items.
///
/// If the chunk at the back of the queue still has room, it is topped up first. The rest
/// of the page then goes into freshly created chunks appended at the back. A full chunk
/// at the back is legitimate, because `next` only releases chunks eagerly while more than
/// one is queued. In that case the full chunk is left untouched and decoding starts a
/// new chunk.
///
/// NOTE(review): with `chunk_size == 0` and a non-empty page, `extend_from_state` is
/// presumably asked for 0 items forever, so the loop would not terminate. Callers must
/// pass a non-zero `chunk_size`.
pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>(
    mut page: T::State,
    chunk_size: usize,
    items: &mut VecDeque<T::DecodedState>,
    decoder: &T,
) {
    match items.pop_back() {
        // Already complete: keep it as is. Popping it and asking for
        // `chunk_size - len` more items would assume it is partial.
        Some(full) if full.len() >= chunk_size => items.push_back(full),
        // Either a partial chunk to resume or no chunk at all.
        maybe_partial => {
            let mut decoded = maybe_partial.unwrap_or_else(|| decoder.with_capacity(chunk_size));
            let remaining = chunk_size - decoded.len();
            decoder.extend_from_state(&mut page, &mut decoded, remaining);
            items.push_back(decoded);
        }
    }

    // whatever is left of the page fills new chunks
    while page.len() > 0 {
        let mut decoded = decoder.with_capacity(chunk_size);
        decoder.extend_from_state(&mut page, &mut decoded, chunk_size);
        items.push_back(decoded)
    }
}
/// Outcome of one call to `next`.
#[derive(Debug)]
pub enum MaybeNext<P> {
    /// A value is ready for the caller; `next` uses it for a decoded chunk or an error.
    Some(P),
    /// Nothing is left: pages are exhausted and no decoded chunk is queued.
    None,
    /// A page was consumed but the front chunk is not complete yet; call again.
    More,
}
/// Produces the next chunk of at most `chunk_size` decoded items, pulling pages from
/// `iter` on demand.
///
/// `items` is a queue of chunks shared across calls. New pages are decoded into its back
/// (via `extend_from_new_page`) and chunks are handed out from its front.
///
/// Returns:
/// * `MaybeNext::Some(Ok(chunk))` when a chunk is ready;
/// * `MaybeNext::Some(Err(e))` when reading a page or building its state failed;
/// * `MaybeNext::More` when a page was consumed but the front chunk is still short of
///   `chunk_size` (call again);
/// * `MaybeNext::None` once the pages are exhausted and nothing is queued.
#[inline]
pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>(
    iter: &'a mut I,
    items: &mut VecDeque<D::DecodedState>,
    chunk_size: usize,
    decoder: &D,
) -> MaybeNext<Result<D::DecodedState, ArrowError>> {
    // Pages only ever extend the back chunk. With more than one chunk queued, the front
    // one can be released without reading another page.
    if items.len() > 1 {
        return MaybeNext::Some(Ok(items.pop_front().unwrap()));
    }

    let page = match iter.next() {
        Ok(Some(page)) => page,
        Ok(None) => {
            // No more pages: flush what is queued; this is the only place a chunk may be
            // shorter than `chunk_size`.
            return match items.pop_front() {
                Some(decoded) => {
                    debug_assert!(decoded.len() <= chunk_size);
                    MaybeNext::Some(Ok(decoded))
                }
                None => MaybeNext::None,
            };
        }
        Err(e) => return MaybeNext::Some(Err(e.into())),
    };

    let state = match decoder.build_state(page) {
        Ok(state) => state,
        Err(e) => return MaybeNext::Some(Err(e)),
    };
    extend_from_new_page(state, chunk_size, items, decoder);

    let front_len = items.front().unwrap().len();
    if front_len >= chunk_size {
        MaybeNext::Some(Ok(items.pop_front().unwrap()))
    } else {
        MaybeNext::More
    }
}
/// Builds the hybrid RLE/bit-packed decoder for the dictionary indices of a page.
///
/// The first byte of `indices_buffer` is the bit width of the indices. The remaining bytes
/// are the encoded indices. `additional` is forwarded to `HybridRleDecoder::new`,
/// presumably as the number of indices to decode (NOTE(review): confirm in parquet2).
///
/// # Panics
/// Panics if `indices_buffer` is empty.
#[inline]
pub(super) fn dict_indices_decoder(
    indices_buffer: &[u8],
    additional: usize,
) -> hybrid_rle::HybridRleDecoder {
    let (&bit_width, encoded) = indices_buffer.split_first().unwrap();
    hybrid_rle::HybridRleDecoder::new(encoded, u32::from(bit_width), additional)
}