use std::marker::PhantomData;
use arrow::{
array::{Array, ArrayRef, AsArray},
datatypes::{
ArrowPrimitiveType, ByteArrayType, Float32Type, Float64Type, GenericBinaryType,
GenericStringType, Int16Type, Int32Type, Int64Type, Int8Type,
},
};
use bytes::{BufMut, BytesMut};
use crate::{
encoding::{
boolean::BooleanEncoder,
byte::ByteRleEncoder,
float::FloatEncoder,
integer::{rle_v2::RleV2Encoder, NInt, SignedEncoding, UnsignedEncoding},
PrimitiveValueEncoder,
},
error::Result,
memory::EstimateMemory,
writer::StreamType,
};
use super::{ColumnEncoding, Stream};
/// Interface shared by the column encoders in this file: arrays are fed in
/// through `encode_array`, buffered internally, and later drained as a list
/// of [`Stream`]s by `finish`.
///
/// Implementors must also provide [`EstimateMemory`] so the size of the
/// buffered data can be queried.
pub trait ColumnStripeEncoder: EstimateMemory {
/// Appends the contents of `array` to the encoder's buffered state.
///
/// NOTE(review): the implementations in this file downcast `array` with the
/// Arrow `as_*` helpers, so the caller is presumably responsible for passing
/// an array of the matching type — confirm.
fn encode_array(&mut self, array: &ArrayRef) -> Result<()>;
/// Returns the [`ColumnEncoding`] this encoder reports for its column.
fn column_encoding(&self) -> ColumnEncoding;
/// Drains the buffered data and returns it as streams (a Data stream, plus
/// Length and/or Present streams depending on the implementation).
fn finish(&mut self) -> Vec<Stream>;
}
/// Stripe encoder for Arrow primitive columns.
///
/// Non-null values are handed to the value encoder `E`. The validity
/// (Present) encoder is created lazily, the first time an array carrying a
/// null buffer is encoded.
pub struct PrimitiveColumnEncoder<T: ArrowPrimitiveType, E: PrimitiveValueEncoder<T::Native>> {
    /// Encoder receiving the non-null values.
    encoder: E,
    /// Encoding reported by `column_encoding()`.
    column_encoding: ColumnEncoding,
    /// Validity encoder; stays `None` until a null buffer has been seen.
    present: Option<BooleanEncoder>,
    /// Number of non-null values encoded since construction or the last
    /// `finish()`; used to backfill `present` when it is created late.
    encoded_count: usize,
    /// Ties the encoder to the Arrow primitive type `T` without storing one.
    _phantom: PhantomData<T>,
}

impl<T: ArrowPrimitiveType, E: PrimitiveValueEncoder<T::Native>> PrimitiveColumnEncoder<T, E> {
    /// Creates an empty encoder that will report `column_encoding` and starts
    /// out without a validity encoder.
    pub fn new(column_encoding: ColumnEncoding) -> Self {
        let encoder = E::new();
        Self {
            encoder,
            column_encoding,
            present: None,
            encoded_count: 0,
            _phantom: PhantomData,
        }
    }
}
impl<T: ArrowPrimitiveType, E: PrimitiveValueEncoder<T::Native>> EstimateMemory
    for PrimitiveColumnEncoder<T, E>
{
    /// Returns the value encoder's estimate plus the validity encoder's
    /// estimate (zero when no validity encoder exists yet).
    fn estimate_memory_size(&self) -> usize {
        let values_size = self.encoder.estimate_memory_size();
        let present_size = match &self.present {
            Some(present) => present.estimate_memory_size(),
            None => 0,
        };
        values_size + present_size
    }
}
impl<T: ArrowPrimitiveType, E: PrimitiveValueEncoder<T::Native>> ColumnStripeEncoder
    for PrimitiveColumnEncoder<T, E>
{
    /// Encodes the non-null values of `array` and records its validity.
    ///
    /// `array` is downcast with `as_primitive::<T>()`.
    /// NOTE(review): that helper is expected to panic on a type mismatch
    /// rather than return an error — confirm against the Arrow docs.
    fn encode_array(&mut self, array: &ArrayRef) -> Result<()> {
        let typed = array.as_primitive::<T>();

        if let Some(validity) = typed.nulls() {
            // An array may introduce a null buffer after earlier arrays had
            // none. In that case create the validity encoder now and mark
            // every previously encoded value as present.
            let backfill = self.encoded_count;
            let present = self.present.get_or_insert_with(|| {
                let mut fresh = BooleanEncoder::new();
                fresh.extend_present(backfill);
                fresh
            });
            present.extend(validity);

            // Only slots flagged valid are written to the value encoder.
            for row in validity.valid_indices() {
                self.encoder.write_one(typed.value(row));
            }
        } else {
            // No null buffer: hand the whole values buffer over in one call.
            self.encoder.write_slice(typed.values());
            if let Some(present) = self.present.as_mut() {
                present.extend_present(typed.len());
            }
        }

        self.encoded_count += typed.len() - typed.null_count();
        Ok(())
    }

    /// Returns the encoding supplied at construction time.
    fn column_encoding(&self) -> ColumnEncoding {
        self.column_encoding
    }

    /// Drains the value encoder into a Data stream and, when a validity
    /// encoder exists, appends its output as a Present stream.
    ///
    /// The encoded-value counter is reset; the validity encoder itself is
    /// kept, so later calls keep emitting a Present stream.
    fn finish(&mut self) -> Vec<Stream> {
        let mut streams = Vec::with_capacity(2);
        streams.push(Stream {
            kind: StreamType::Data,
            bytes: self.encoder.take_inner(),
        });
        self.encoded_count = 0;

        if let Some(present) = self.present.as_mut() {
            streams.push(Stream {
                kind: StreamType::Present,
                bytes: present.finish(),
            });
        }
        streams
    }
}
/// Stripe encoder for Arrow boolean columns.
///
/// Values go through a [`BooleanEncoder`]; a second [`BooleanEncoder`] for
/// validity is created lazily, the first time an array carrying a null
/// buffer is encoded.
pub struct BooleanColumnEncoder {
    /// Encoder receiving the non-null boolean values.
    encoder: BooleanEncoder,
    /// Validity encoder; stays `None` until a null buffer has been seen.
    present: Option<BooleanEncoder>,
    /// Number of non-null values encoded since construction or the last
    /// `finish()`; used to backfill `present` when it is created late.
    encoded_count: usize,
}

impl BooleanColumnEncoder {
    /// Creates an empty encoder with no validity encoder.
    pub fn new() -> Self {
        Self {
            encoder: BooleanEncoder::new(),
            present: None,
            encoded_count: 0,
        }
    }
}

// A public zero-argument `new()` should be mirrored by `Default` so the type
// works with `..Default::default()`, `unwrap_or_default()` and similar APIs.
impl Default for BooleanColumnEncoder {
    /// Equivalent to [`BooleanColumnEncoder::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl EstimateMemory for BooleanColumnEncoder {
    /// Returns the value encoder's estimate plus the validity encoder's
    /// estimate (zero when no validity encoder exists yet).
    fn estimate_memory_size(&self) -> usize {
        let values_size = self.encoder.estimate_memory_size();
        let present_size = match &self.present {
            Some(present) => present.estimate_memory_size(),
            None => 0,
        };
        values_size + present_size
    }
}
impl ColumnStripeEncoder for BooleanColumnEncoder {
    /// Encodes the non-null booleans of `array` and records its validity.
    ///
    /// `array` is downcast with `as_boolean()`.
    /// NOTE(review): that helper is expected to panic on a type mismatch
    /// rather than return an error — confirm against the Arrow docs.
    fn encode_array(&mut self, array: &ArrayRef) -> Result<()> {
        let booleans = array.as_boolean();

        if let Some(validity) = booleans.nulls() {
            // Create the validity encoder on first use, marking every value
            // encoded so far as present.
            let backfill = self.encoded_count;
            let present = self.present.get_or_insert_with(|| {
                let mut fresh = BooleanEncoder::new();
                fresh.extend_present(backfill);
                fresh
            });
            present.extend(validity);

            // Only slots flagged valid are written to the value encoder.
            for row in validity.valid_indices() {
                self.encoder.extend_boolean(booleans.value(row));
            }
        } else {
            // No null buffer: pass the whole boolean buffer in one call.
            self.encoder.extend_bb(booleans.values());
            if let Some(present) = self.present.as_mut() {
                present.extend_present(booleans.len());
            }
        }

        self.encoded_count += booleans.len() - booleans.null_count();
        Ok(())
    }

    /// Boolean columns always report [`ColumnEncoding::Direct`].
    fn column_encoding(&self) -> ColumnEncoding {
        ColumnEncoding::Direct
    }

    /// Finishes the value encoder into a Data stream and, when a validity
    /// encoder exists, appends its output as a Present stream.
    ///
    /// The encoded-value counter is reset; the validity encoder itself is
    /// kept, so later calls keep emitting a Present stream.
    fn finish(&mut self) -> Vec<Stream> {
        let mut streams = Vec::with_capacity(2);
        streams.push(Stream {
            kind: StreamType::Data,
            bytes: self.encoder.finish(),
        });
        self.encoded_count = 0;

        if let Some(present) = self.present.as_mut() {
            streams.push(Stream {
                kind: StreamType::Present,
                bytes: present.finish(),
            });
        }
        streams
    }
}
/// Stripe encoder for Arrow byte-array columns (string and binary, with
/// either offset width).
///
/// Raw value bytes are concatenated into one buffer while each value's
/// length is fed to a separate RLE v2 encoder. A validity encoder is created
/// lazily, the first time an array carrying a null buffer is encoded.
pub struct GenericBinaryColumnEncoder<T: ByteArrayType>
where
    T::Offset: NInt,
{
    /// Concatenated bytes of all non-null values.
    string_bytes: BytesMut,
    /// Encoder for the length of every non-null value.
    length_encoder: RleV2Encoder<T::Offset, UnsignedEncoding>,
    /// Validity encoder; stays `None` until a null buffer has been seen.
    present: Option<BooleanEncoder>,
    /// Number of non-null values encoded since construction or the last
    /// `finish()`; used to backfill `present` when it is created late.
    encoded_count: usize,
}

impl<T: ByteArrayType> GenericBinaryColumnEncoder<T>
where
    T::Offset: NInt,
{
    /// Creates an empty encoder with no validity encoder.
    pub fn new() -> Self {
        Self {
            string_bytes: BytesMut::new(),
            length_encoder: RleV2Encoder::new(),
            present: None,
            encoded_count: 0,
        }
    }
}

// A public zero-argument `new()` should be mirrored by `Default` so the type
// works with `..Default::default()`, `unwrap_or_default()` and similar APIs.
impl<T: ByteArrayType> Default for GenericBinaryColumnEncoder<T>
where
    T::Offset: NInt,
{
    /// Equivalent to [`GenericBinaryColumnEncoder::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl<T: ByteArrayType> EstimateMemory for GenericBinaryColumnEncoder<T>
where
    T::Offset: NInt,
{
    /// Returns the number of buffered value bytes plus the estimates of the
    /// length encoder and, when it exists, the validity encoder.
    fn estimate_memory_size(&self) -> usize {
        let data_size = self.string_bytes.len();
        let length_size = self.length_encoder.estimate_memory_size();
        let present_size = match &self.present {
            Some(present) => present.estimate_memory_size(),
            None => 0,
        };
        data_size + length_size + present_size
    }
}
impl<T: ByteArrayType> ColumnStripeEncoder for GenericBinaryColumnEncoder<T>
where
    T::Offset: NInt,
{
    /// Encodes the lengths and bytes of the non-null values of `array` and
    /// records its validity. Empty arrays are ignored.
    ///
    /// `array` is downcast with `as_bytes::<T>()`.
    /// NOTE(review): that helper is expected to panic on a type mismatch
    /// rather than return an error — confirm against the Arrow docs.
    fn encode_array(&mut self, array: &ArrayRef) -> Result<()> {
        if array.is_empty() {
            return Ok(());
        }
        let values = array.as_bytes::<T>();

        if let Some(validity) = values.nulls() {
            // Create the validity encoder on first use, marking every value
            // encoded so far as present.
            let backfill = self.encoded_count;
            let present = self.present.get_or_insert_with(|| {
                let mut fresh = BooleanEncoder::new();
                fresh.extend_present(backfill);
                fresh
            });
            present.extend(validity);

            // Only slots flagged valid contribute a length and bytes.
            for row in validity.valid_indices() {
                self.length_encoder.write_one(values.value_length(row));
                self.string_bytes.put_slice(values.value(row).as_ref());
            }
        } else {
            // No null buffer: derive each length from adjacent offsets, then
            // copy the covered byte range in a single call.
            let offsets = values.offsets();
            let start = offsets[0];
            let mut total = <T::Offset as num::Zero>::zero();
            for pair in offsets.windows(2) {
                let length = pair[1] - pair[0];
                self.length_encoder.write_one(length);
                total += length;
            }

            // The first offset is not assumed to be zero, so the copied range
            // is anchored at it instead of at the start of the value buffer.
            let start = start.as_i64() as usize;
            let end = start + total.as_i64() as usize;
            self.string_bytes.put_slice(&values.value_data()[start..end]);

            if let Some(present) = self.present.as_mut() {
                present.extend_present(values.len());
            }
        }

        self.encoded_count += values.len() - values.null_count();
        Ok(())
    }

    /// Byte-array columns always report [`ColumnEncoding::DirectV2`].
    fn column_encoding(&self) -> ColumnEncoding {
        ColumnEncoding::DirectV2
    }

    /// Takes the buffered bytes as a Data stream and the encoded lengths as
    /// a Length stream; when a validity encoder exists its output is
    /// appended as a Present stream.
    ///
    /// The encoded-value counter is reset; the validity encoder itself is
    /// kept, so later calls keep emitting a Present stream.
    fn finish(&mut self) -> Vec<Stream> {
        let data_bytes = std::mem::take(&mut self.string_bytes);
        let length_bytes = self.length_encoder.take_inner();

        let mut streams = Vec::with_capacity(3);
        streams.push(Stream {
            kind: StreamType::Data,
            bytes: data_bytes.into(),
        });
        streams.push(Stream {
            kind: StreamType::Length,
            bytes: length_bytes,
        });
        self.encoded_count = 0;

        if let Some(present) = self.present.as_mut() {
            streams.push(Stream {
                kind: StreamType::Present,
                bytes: present.finish(),
            });
        }
        streams
    }
}
/// Encoder for `Float32` columns, using [`FloatEncoder`] over `f32`.
pub type FloatColumnEncoder = PrimitiveColumnEncoder<Float32Type, FloatEncoder<f32>>;
/// Encoder for `Float64` columns, using [`FloatEncoder`] over `f64`.
pub type DoubleColumnEncoder = PrimitiveColumnEncoder<Float64Type, FloatEncoder<f64>>;
/// Encoder for `Int8` columns, using [`ByteRleEncoder`].
pub type ByteColumnEncoder = PrimitiveColumnEncoder<Int8Type, ByteRleEncoder>;
/// Encoder for `Int16` columns, using signed [`RleV2Encoder`].
pub type Int16ColumnEncoder = PrimitiveColumnEncoder<Int16Type, RleV2Encoder<i16, SignedEncoding>>;
/// Encoder for `Int32` columns, using signed [`RleV2Encoder`].
pub type Int32ColumnEncoder = PrimitiveColumnEncoder<Int32Type, RleV2Encoder<i32, SignedEncoding>>;
/// Encoder for `Int64` columns, using signed [`RleV2Encoder`].
pub type Int64ColumnEncoder = PrimitiveColumnEncoder<Int64Type, RleV2Encoder<i64, SignedEncoding>>;
/// Encoder for string columns with `i32` offsets.
pub type StringColumnEncoder = GenericBinaryColumnEncoder<GenericStringType<i32>>;
/// Encoder for string columns with `i64` offsets.
pub type LargeStringColumnEncoder = GenericBinaryColumnEncoder<GenericStringType<i64>>;
/// Encoder for binary columns with `i32` offsets.
pub type BinaryColumnEncoder = GenericBinaryColumnEncoder<GenericBinaryType<i32>>;
/// Encoder for binary columns with `i64` offsets.
pub type LargeBinaryColumnEncoder = GenericBinaryColumnEncoder<GenericBinaryType<i64>>;