use std::io::Read;
use std::marker::PhantomData;
use std::sync::Arc;
use arrow::array::{ArrayRef, DictionaryArray, GenericByteArray, StringArray};
use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer};
use arrow::compute::kernels::cast;
use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType, GenericStringType};
use snafu::{ensure, ResultExt};
use crate::array_decoder::derive_present_vec;
use crate::column::Column;
use crate::compression::Decompressor;
use crate::encoding::integer::get_unsigned_int_decoder;
use crate::encoding::PrimitiveValueDecoder;
use crate::error::{ArrowSnafu, IoSnafu, OffsetOverflowSnafu, Result};
use crate::proto::column_encoding::Kind as ColumnEncodingKind;
use crate::proto::stream::Kind;
use crate::stripe::Stripe;
use super::{ArrayBatchDecoder, Int64ArrayDecoder, PresentDecoder};
/// Creates the batch decoder for a binary column within `stripe`.
///
/// The column's `Length` stream is wrapped in an unsigned integer decoder
/// (chosen by the column's RLE version) and the column's `Data` stream
/// supplies the raw value bytes. The present decoder is whatever
/// `PresentDecoder::from_stripe` yields for this column.
pub fn new_binary_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn ArrayBatchDecoder>> {
    let streams = stripe.stream_map();
    let present = PresentDecoder::from_stripe(stripe, column);
    let length_decoder =
        get_unsigned_int_decoder(streams.get(column, Kind::Length), column.rle_version());
    let value_bytes = Box::new(streams.get(column, Kind::Data));

    let decoder = BinaryArrayDecoder::new(value_bytes, length_decoder, present);
    Ok(Box::new(decoder))
}
/// Creates the batch decoder for a string column within `stripe`, choosing the
/// implementation from the column's encoding kind.
///
/// * `Direct` / `DirectV2`: values are read from the `Data` stream using the
///   lengths decoded from the `Length` stream.
/// * `Dictionary` / `DictionaryV2`: the dictionary entries are decoded up
///   front from the `DictionaryData` and `Length` streams, and the `Data`
///   stream is then read as integer indexes.
///
/// # Errors
///
/// Propagates any error raised while decoding the dictionary entries or while
/// constructing the dictionary decoder.
pub fn new_string_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn ArrayBatchDecoder>> {
    let encoding = column.encoding().kind();
    let streams = stripe.stream_map();
    let present = PresentDecoder::from_stripe(stripe, column);
    let length_decoder =
        get_unsigned_int_decoder(streams.get(column, Kind::Length), column.rle_version());

    let decoder: Box<dyn ArrayBatchDecoder> = match encoding {
        ColumnEncodingKind::Direct | ColumnEncodingKind::DirectV2 => {
            let value_bytes = Box::new(streams.get(column, Kind::Data));
            Box::new(DirectStringArrayDecoder::new(
                value_bytes,
                length_decoder,
                present,
            ))
        }
        ColumnEncodingKind::Dictionary | ColumnEncodingKind::DictionaryV2 => {
            // No present decoder is handed to the dictionary reader; it is
            // asked for exactly `dictionary_size` entries in a single batch.
            let entry_bytes = Box::new(streams.get(column, Kind::DictionaryData));
            let entry_count = column.dictionary_size();
            let mut entry_decoder =
                DirectStringArrayDecoder::new(entry_bytes, length_decoder, None);
            let dictionary = Arc::new(entry_decoder.next_byte_batch(entry_count, None)?);

            // With dictionary encoding the `Data` stream is decoded as integers
            // that are later used as keys into `dictionary`.
            let index_decoder =
                get_unsigned_int_decoder(streams.get(column, Kind::Data), column.rle_version());
            let indexes = Int64ArrayDecoder::new(index_decoder, present);

            Box::new(DictionaryStringArrayDecoder::new(indexes, dictionary)?)
        }
    };
    Ok(decoder)
}
/// [`GenericByteArrayDecoder`] specialised to string arrays with `i32` offsets.
pub type DirectStringArrayDecoder = GenericByteArrayDecoder<GenericStringType<i32>>;
/// [`GenericByteArrayDecoder`] specialised to binary arrays with `i32` offsets.
pub type BinaryArrayDecoder = GenericByteArrayDecoder<GenericBinaryType<i32>>;
/// Decodes variable-length byte values into Arrow arrays of byte-array type `T`.
///
/// Each batch is built from three inputs: per-value lengths, the concatenated
/// value bytes, and optional present (validity) information.
pub struct GenericByteArrayDecoder<T: ByteArrayType> {
/// Source of the concatenated value bytes.
bytes: Box<Decompressor>,
/// Decoder yielding the byte length of each value.
lengths: Box<dyn PrimitiveValueDecoder<i64> + Send>,
/// Optional decoder for this column's present information.
present: Option<PresentDecoder>,
/// Records the Arrow byte-array type produced, without storing a `T`.
phantom: PhantomData<T>,
}
impl<T: ByteArrayType> GenericByteArrayDecoder<T> {
fn new(
bytes: Box<Decompressor>,
lengths: Box<dyn PrimitiveValueDecoder<i64> + Send>,
present: Option<PresentDecoder>,
) -> Self {
Self {
bytes,
lengths,
present,
phantom: Default::default(),
}
}
fn next_byte_batch(
&mut self,
batch_size: usize,
parent_present: Option<&NullBuffer>,
) -> Result<GenericByteArray<T>> {
let present =
derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;
let mut lengths = vec![0; batch_size];
if let Some(present) = &present {
self.lengths.decode_spaced(&mut lengths, present)?;
} else {
self.lengths.decode(&mut lengths)?;
}
let total_length: i64 = lengths.iter().sum();
ensure!(
total_length <= i32::MAX as i64,
OffsetOverflowSnafu {
total_length,
max_size: i32::MAX,
batch_size,
}
);
let mut bytes = Vec::with_capacity(total_length as usize);
self.bytes
.by_ref()
.take(total_length as u64)
.read_to_end(&mut bytes)
.context(IoSnafu)?;
let bytes = Buffer::from(bytes);
let offsets =
OffsetBuffer::<T::Offset>::from_lengths(lengths.into_iter().map(|l| l as usize));
let null_buffer = match present {
Some(present) if present.null_count() == 0 => None,
_ => present,
};
let array =
GenericByteArray::<T>::try_new(offsets, bytes, null_buffer).context(ArrowSnafu)?;
Ok(array)
}
}
impl<T: ByteArrayType> ArrayBatchDecoder for GenericByteArrayDecoder<T> {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&NullBuffer>,
) -> Result<ArrayRef> {
let array = self.next_byte_batch(batch_size, parent_present)?;
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
use crate::array_decoder::skip_present_and_get_non_null_count;
let non_null_count =
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
let mut lengths = vec![0; non_null_count];
self.lengths.decode(&mut lengths)?;
let total_bytes: i64 = lengths.iter().sum();
std::io::copy(
&mut self.bytes.by_ref().take(total_bytes as u64),
&mut std::io::sink(),
)
.context(IoSnafu)?;
Ok(())
}
}
/// Decodes a dictionary-encoded string column by combining decoded integer
/// keys with a dictionary of strings that was decoded ahead of time.
pub struct DictionaryStringArrayDecoder {
/// Decoder producing the integer keys into `dictionary`.
indexes: Int64ArrayDecoder,
/// Dictionary entries; the `Arc` is cloned for every decoded batch.
dictionary: Arc<StringArray>,
}
impl DictionaryStringArrayDecoder {
    /// Pairs an index decoder with the already-decoded dictionary entries.
    ///
    /// The current implementation performs no validation and always returns
    /// `Ok`.
    fn new(indexes: Int64ArrayDecoder, dictionary: Arc<StringArray>) -> Result<Self> {
        let decoder = Self {
            indexes,
            dictionary,
        };
        Ok(decoder)
    }
}
impl ArrayBatchDecoder for DictionaryStringArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&NullBuffer>,
) -> Result<ArrayRef> {
let keys = self
.indexes
.next_primitive_batch(batch_size, parent_present)?;
let array = DictionaryArray::try_new(keys, self.dictionary.clone()).context(ArrowSnafu)?;
let array = cast(&array, &DataType::Utf8).context(ArrowSnafu)?;
let array = Arc::new(array);
Ok(array)
}
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
self.indexes.skip_values(n, parent_present)
}
}