use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;
use arrow::array::{
ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, BooleanArray, PrimitiveArray,
types::{
Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
},
};
use arrow::buffer::{BooleanBuffer, ScalarBuffer};
use arrow_schema::DataType;
use datafusion::physical_plan::PhysicalExpr;
use fastlanes::BitPacking;
use num_traits::{AsPrimitive, FromPrimitive};
use super::LiquidDataType;
use crate::cache::CacheExpression;
use crate::liquid_array::hybrid_primitive_array::{
LiquidPrimitiveClampedArray, LiquidPrimitiveQuantizedArray,
};
use crate::liquid_array::ipc::{LiquidIPCHeader, PhysicalTypeMarker, get_physical_type_id};
use crate::liquid_array::raw::BitPackedArray;
use crate::liquid_array::{
LiquidArray, LiquidSqueezedArrayRef, PrimitiveKind, SqueezeIoHandler, SqueezedDate32Array,
};
use crate::utils::get_bit_width;
use arrow::datatypes::ArrowNativeType;
use bytes::Bytes;
/// Strategy used by `LiquidPrimitiveArray::squeeze` when it halves the bit width
/// of an integer array.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IntegerSqueezePolicy {
    /// Offsets at or above a sentinel are stored as the sentinel itself
    /// (discriminant `0`).
    Clamp = 0,
    /// Offsets are replaced by the index of an equal-width bucket
    /// (discriminant `1`). This is the default policy.
    #[default]
    Quantize = 1,
}
/// Non-public module that hosts the sealing marker trait.
mod private {
    /// Marker supertrait required by `LiquidPrimitiveType`.
    ///
    /// The trait is `pub`, but its module is not, so code that cannot name
    /// `private::Sealed` cannot add new `LiquidPrimitiveType` implementations.
    pub trait Sealed {}
}
/// Arrow primitive types that the liquid integer arrays in this file can encode.
///
/// The trait is sealed through `private::Sealed`; the implementations are generated
/// by the `impl_has_unsigned_type!` invocation in this file.
///
/// The supertrait bounds require the native value type to be convertible (via
/// `AsPrimitive`) to the native type of [`Self::UnSignedType`] and to `i64`, to be
/// constructible through `FromPrimitive`, and to implement `Display`.
pub trait LiquidPrimitiveType:
ArrowPrimitiveType<
Native: AsPrimitive<<Self::UnSignedType as ArrowPrimitiveType>::Native>
+ AsPrimitive<i64>
+ FromPrimitive
+ Display,
> + Debug
+ Send
+ Sync
+ private::Sealed
+ PrimitiveKind
+ PhysicalTypeMarker
{
/// Arrow type used for the bit-packed storage of this type.
///
/// Its native type must convert back to `Self::Native` and to `u64`, and must
/// implement `BitPacking` so it can be bit-packed.
type UnSignedType: ArrowPrimitiveType<Native: AsPrimitive<Self::Native> + AsPrimitive<u64> + BitPacking>
+ Debug;
}
/// For every `Logical => Storage` pair, seals `Logical` and implements
/// `LiquidPrimitiveType` for it with `Storage` as its `UnSignedType`.
macro_rules! impl_has_unsigned_type {
    ($($logical:ty => $storage:ty),*) => {
        $(
            impl private::Sealed for $logical {}

            impl LiquidPrimitiveType for $logical {
                type UnSignedType = $storage;
            }
        )*
    };
}
impl_has_unsigned_type! {
    // Unsigned integers are stored as themselves.
    UInt8Type => UInt8Type,
    UInt16Type => UInt16Type,
    UInt32Type => UInt32Type,
    UInt64Type => UInt64Type,
    // Signed integers use the unsigned type of the same width.
    Int8Type => UInt8Type,
    Int16Type => UInt16Type,
    Int32Type => UInt32Type,
    Int64Type => UInt64Type,
    // Dates.
    Date32Type => UInt32Type,
    Date64Type => UInt64Type,
    // Timestamps of every resolution.
    TimestampSecondType => UInt64Type,
    TimestampMillisecondType => UInt64Type,
    TimestampMicrosecondType => UInt64Type,
    TimestampNanosecondType => UInt64Type
}
// Convenience aliases for the concrete `LiquidPrimitiveArray` instantiations.

/// Liquid array over `Int8Type`.
pub type LiquidI8Array = LiquidPrimitiveArray<Int8Type>;
/// Liquid array over `Int16Type`.
pub type LiquidI16Array = LiquidPrimitiveArray<Int16Type>;
/// Liquid array over `Int32Type`.
pub type LiquidI32Array = LiquidPrimitiveArray<Int32Type>;
/// Liquid array over `Int64Type`.
pub type LiquidI64Array = LiquidPrimitiveArray<Int64Type>;

/// Liquid array over `UInt8Type`.
pub type LiquidU8Array = LiquidPrimitiveArray<UInt8Type>;
/// Liquid array over `UInt16Type`.
pub type LiquidU16Array = LiquidPrimitiveArray<UInt16Type>;
/// Liquid array over `UInt32Type`.
pub type LiquidU32Array = LiquidPrimitiveArray<UInt32Type>;
/// Liquid array over `UInt64Type`.
pub type LiquidU64Array = LiquidPrimitiveArray<UInt64Type>;

/// Liquid array over `Date32Type`.
pub type LiquidDate32Array = LiquidPrimitiveArray<Date32Type>;
/// Liquid array over `Date64Type`.
pub type LiquidDate64Array = LiquidPrimitiveArray<Date64Type>;
/// Integer array stored as bit-packed unsigned offsets from a reference value.
///
/// `from_arrow_array` picks the array minimum as the reference and stores
/// `value - reference` (wrapping) for every slot.
#[derive(Debug)]
pub struct LiquidPrimitiveArray<T: LiquidPrimitiveType> {
/// Bit-packed offsets (and the null information) of the array.
bit_packed: BitPackedArray<T::UnSignedType>,
/// Value added back to every offset when decoding; zero for an all-null array.
reference_value: T::Native,
/// Strategy `squeeze` uses when it reduces the bit width.
squeeze_policy: IntegerSqueezePolicy,
}
/// Integer array stored as bit-packed, zigzag-encoded deltas between consecutive
/// non-null values.
#[derive(Debug, Clone)]
pub struct LiquidPrimitiveDeltaArray<T: LiquidPrimitiveType> {
/// Bit-packed zigzag deltas; null slots and the first non-null slot hold zero.
bit_packed: BitPackedArray<T::UnSignedType>,
/// First non-null value of the source array (the anchor the deltas build on);
/// zero for an all-null array.
reference_value: T::Native,
}
impl<T> LiquidPrimitiveArray<T>
where
T: LiquidPrimitiveType,
{
pub fn get_array_memory_size(&self) -> usize {
self.bit_packed.get_array_memory_size()
+ std::mem::size_of::<T::Native>()
+ std::mem::size_of::<IntegerSqueezePolicy>()
}
pub fn len(&self) -> usize {
self.bit_packed.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> LiquidPrimitiveArray<T> {
let min = match arrow::compute::kernels::aggregate::min(&arrow_array) {
Some(v) => v,
None => {
return Self {
bit_packed: BitPackedArray::new_null_array(arrow_array.len()),
reference_value: T::Native::ZERO,
squeeze_policy: IntegerSqueezePolicy::default(),
};
}
};
let max = arrow::compute::kernels::aggregate::max(&arrow_array).unwrap();
let sub = max.sub_wrapping(min) as <T as ArrowPrimitiveType>::Native;
let sub: <<T as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native =
sub.as_();
let bit_width = get_bit_width(sub.as_());
let (_data_type, values, nulls) = arrow_array.clone().into_parts();
let values = if min != T::Native::ZERO {
ScalarBuffer::from_iter(values.iter().map(|v| {
let k: <<T as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native =
v.sub_wrapping(min).as_();
k
}))
} else {
#[allow(clippy::missing_transmute_annotations)]
unsafe {
std::mem::transmute(values)
}
};
let unsigned_array =
PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(values, nulls);
let bit_packed_array = BitPackedArray::from_primitive(unsigned_array, bit_width);
Self {
bit_packed: bit_packed_array,
reference_value: min,
squeeze_policy: IntegerSqueezePolicy::default(),
}
}
pub fn squeeze_policy(&self) -> IntegerSqueezePolicy {
self.squeeze_policy
}
pub fn set_squeeze_policy(&mut self, policy: IntegerSqueezePolicy) {
self.squeeze_policy = policy;
}
pub fn with_squeeze_policy(mut self, policy: IntegerSqueezePolicy) -> Self {
self.squeeze_policy = policy;
self
}
}
impl<T> LiquidPrimitiveDeltaArray<T>
where
    T: LiquidPrimitiveType,
{
    /// Returns the memory footprint in bytes: the bit-packed payload plus the
    /// reference value.
    pub fn get_array_memory_size(&self) -> usize {
        self.bit_packed.get_array_memory_size() + std::mem::size_of::<T::Native>()
    }

    /// Returns the number of slots (null or not) in the array.
    pub fn len(&self) -> usize {
        self.bit_packed.len()
    }

    /// Returns `true` when the array has no slots.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Encodes an Arrow primitive array as zigzag deltas.
    ///
    /// The first non-null value becomes the reference value (anchor). Every later
    /// non-null value is stored as the zigzag code of its wrapping difference to
    /// the previous non-null value. Null slots and the anchor slot store zero.
    /// An array without any non-null value (including an empty one) becomes an
    /// all-null bit-packed array with a zero reference value.
    pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> LiquidPrimitiveDeltaArray<T> {
        use arrow::array::Array;
        type UnsignedNative<TT> =
            <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;

        let len = arrow_array.len();
        if arrow_array.null_count() == len {
            return Self {
                bit_packed: BitPackedArray::new_null_array(len),
                reference_value: T::Native::ZERO,
            };
        }

        // The array is owned, so it can be taken apart without cloning it first.
        let (_data_type, values, nulls) = arrow_array.into_parts();

        let mut encoded: Vec<UnsignedNative<T>> = Vec::with_capacity(len);
        let mut max_value = UnsignedNative::<T>::ZERO;
        let mut anchor = T::Native::ZERO;
        // Most recent non-null value; `None` until the anchor has been seen.
        let mut previous: Option<T::Native> = None;

        for (i, &current) in values.iter().enumerate() {
            let is_null = nulls.as_ref().is_some_and(|n| !n.is_valid(i));
            if is_null {
                encoded.push(UnsignedNative::<T>::ZERO);
                continue;
            }
            let Some(prior) = previous.replace(current) else {
                // First non-null value: remember it as the anchor, store no delta.
                anchor = current;
                encoded.push(UnsignedNative::<T>::ZERO);
                continue;
            };
            let code = Self::zigzag_encode(current.sub_wrapping(prior));
            if code > max_value {
                max_value = code;
            }
            encoded.push(code);
        }

        let bit_width = get_bit_width(max_value.as_());
        let unsigned_array = PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
            ScalarBuffer::from_iter(encoded),
            nulls,
        );
        Self {
            bit_packed: BitPackedArray::from_primitive(unsigned_array, bit_width),
            reference_value: anchor,
        }
    }

    /// Zigzag-encodes a wrapping difference of two native values.
    ///
    /// The difference is first read as a two's-complement number of the native
    /// width. For unsigned native types narrower than 64 bits, a wrapped
    /// "negative" difference would otherwise be zero-extended to a large positive
    /// `i64`, whose zigzag code needs one bit more than the storage type has and
    /// would be truncated.
    fn zigzag_encode(delta: T::Native) -> <T::UnSignedType as ArrowPrimitiveType>::Native {
        type UnsignedNative<TT> =
            <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;

        let widened: i64 = delta.as_();
        // Sign-extend from the native width; a no-op for signed and 64-bit types.
        let unused_bits = i64::BITS as usize - 8 * std::mem::size_of::<T::Native>();
        let signed = (widened << unused_bits) >> unused_bits;
        let zigzag = ((signed << 1) ^ (signed >> 63)) as u64;
        UnsignedNative::<T>::usize_as(zigzag as usize)
    }
}
impl<T> LiquidArray for LiquidPrimitiveArray<T>
where
T: LiquidPrimitiveType + super::PrimitiveKind,
{
fn get_array_memory_size(&self) -> usize {
self.get_array_memory_size()
}
fn len(&self) -> usize {
self.len()
}
fn original_arrow_data_type(&self) -> DataType {
T::DATA_TYPE.clone()
}
fn as_any(&self) -> &dyn Any {
self
}
#[inline]
fn to_arrow_array(&self) -> ArrayRef {
let unsigned_array = self.bit_packed.to_primitive();
let (_data_type, values, _nulls) = unsigned_array.into_parts();
let nulls = self.bit_packed.nulls();
let values = if self.reference_value != T::Native::ZERO {
let reference_v = self.reference_value.as_();
ScalarBuffer::from_iter(values.iter().map(|v| {
let k: <T as ArrowPrimitiveType>::Native = (*v).add_wrapping(reference_v).as_();
k
}))
} else {
#[allow(clippy::missing_transmute_annotations)]
unsafe {
std::mem::transmute(values)
}
};
Arc::new(PrimitiveArray::<T>::new(values, nulls.cloned()))
}
fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
let arrow_array = self.to_arrow_array();
let selection = BooleanArray::new(selection.clone(), None);
arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
}
fn try_eval_predicate(
&self,
_predicate: &Arc<dyn PhysicalExpr>,
_filter: &BooleanBuffer,
) -> Option<BooleanArray> {
None
}
fn to_bytes(&self) -> Vec<u8> {
self.to_bytes_inner()
}
fn data_type(&self) -> LiquidDataType {
LiquidDataType::Integer
}
fn squeeze(
&self,
io: Arc<dyn SqueezeIoHandler>,
expression_hint: Option<&CacheExpression>,
) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
let expression_hint = expression_hint?;
let full_bytes = Bytes::from(self.to_bytes_inner());
let disk_range = 0u64..(full_bytes.len() as u64);
if T::DATA_TYPE == DataType::Date32 {
let field = expression_hint.as_date32_field()?;
let squeezed =
SqueezedDate32Array::from_liquid_date32(self, field).with_backing(io, disk_range);
return Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, full_bytes));
}
if matches!(T::DATA_TYPE, DataType::Timestamp(_, _)) {
let field = expression_hint.as_date32_field()?;
let squeezed = SqueezedDate32Array::from_liquid_timestamp(self, field)
.with_backing(io, disk_range);
return Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, full_bytes));
}
let orig_bw = self.bit_packed.bit_width()?;
if orig_bw.get() < 8 {
return None;
}
let new_bw_u8 = std::num::NonZero::new((orig_bw.get() / 2).max(1)).unwrap();
let unsigned_array = self.bit_packed.to_primitive();
let (_dt, values, nulls) = unsigned_array.into_parts();
match self.squeeze_policy {
IntegerSqueezePolicy::Clamp => {
type U<TT> =
<<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
let sentinel: U<T> = U::<T>::usize_as((1usize << new_bw_u8.get()) - 1);
let squeezed_values: ScalarBuffer<U<T>> = ScalarBuffer::from_iter(
values
.iter()
.map(|&v| if v >= sentinel { sentinel } else { v }),
);
let squeezed_unsigned =
PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
squeezed_values,
nulls,
);
let squeezed_bitpacked =
BitPackedArray::from_primitive(squeezed_unsigned, new_bw_u8);
let hybrid = LiquidPrimitiveClampedArray::<T> {
squeezed: squeezed_bitpacked,
reference_value: self.reference_value,
disk_range,
io: io.clone(),
};
Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
}
IntegerSqueezePolicy::Quantize => {
type U<TT> =
<<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
let max_offset: U<T> = if let Some(m) = values.iter().copied().max() {
m
} else {
U::<T>::ZERO
};
let bucket_count_u64 = 1u64 << (new_bw_u8.get() as u64);
let max_off_u64: u64 = num_traits::AsPrimitive::<u64>::as_(max_offset);
let range_size = max_off_u64.saturating_add(1);
let bucket_width_u64 = (range_size.div_ceil(bucket_count_u64)).max(1);
let quantized_values: ScalarBuffer<U<T>> =
ScalarBuffer::from_iter(values.iter().map(|&v| {
let v_u64: u64 = num_traits::AsPrimitive::<u64>::as_(v);
let mut idx_u64 = v_u64 / bucket_width_u64;
if idx_u64 >= bucket_count_u64 {
idx_u64 = bucket_count_u64 - 1;
}
U::<T>::usize_as(idx_u64 as usize)
}));
let quantized_unsigned =
PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
quantized_values,
nulls,
);
let quantized_bitpacked =
BitPackedArray::from_primitive(quantized_unsigned, new_bw_u8);
let hybrid = LiquidPrimitiveQuantizedArray::<T> {
quantized: quantized_bitpacked,
reference_value: self.reference_value,
bucket_width: bucket_width_u64,
disk_range,
io,
};
Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
}
}
}
}
impl<T> LiquidArray for LiquidPrimitiveDeltaArray<T>
where
T: LiquidPrimitiveType + super::PrimitiveKind,
{
fn get_array_memory_size(&self) -> usize {
self.get_array_memory_size()
}
fn len(&self) -> usize {
self.len()
}
fn original_arrow_data_type(&self) -> DataType {
T::DATA_TYPE.clone()
}
fn as_any(&self) -> &dyn Any {
self
}
#[inline]
fn to_arrow_array(&self) -> ArrayRef {
let unsigned_array = self.bit_packed.to_primitive();
let (_data_type, delta_values, _nulls) = unsigned_array.into_parts();
let nulls = self.bit_packed.nulls();
let mut reconstructed = Vec::with_capacity(delta_values.len());
let mut current_value = self.reference_value;
if let Some(nulls) = nulls {
let mut have_prev = false;
for (i, &delta_unsigned) in delta_values.iter().enumerate() {
if !nulls.is_valid(i) {
reconstructed.push(T::Native::ZERO); continue;
}
if !have_prev {
reconstructed.push(current_value);
have_prev = true;
} else {
let zigzag: u64 = delta_unsigned.as_();
let delta_i64 = (zigzag >> 1) as i64 ^ -((zigzag & 1) as i64);
let delta: T::Native = T::Native::from_i64(delta_i64).unwrap();
current_value = current_value.add_wrapping(delta);
reconstructed.push(current_value);
}
}
} else {
reconstructed.push(current_value); for &delta_unsigned in delta_values.iter().skip(1) {
let zigzag: u64 = delta_unsigned.as_();
let delta_i64 = (zigzag >> 1) as i64 ^ -((zigzag & 1) as i64);
let delta: T::Native = T::Native::from_i64(delta_i64).unwrap();
current_value = current_value.add_wrapping(delta);
reconstructed.push(current_value);
}
}
let values = ScalarBuffer::from_iter(reconstructed);
Arc::new(PrimitiveArray::<T>::new(values, nulls.cloned()))
}
fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
let arrow_array = self.to_arrow_array();
let selection = BooleanArray::new(selection.clone(), None);
arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
}
fn try_eval_predicate(
&self,
_predicate: &Arc<dyn PhysicalExpr>,
_filter: &BooleanBuffer,
) -> Option<BooleanArray> {
None
}
fn to_bytes(&self) -> Vec<u8> {
self.to_bytes_inner()
}
fn data_type(&self) -> LiquidDataType {
LiquidDataType::Integer
}
fn squeeze(
&self,
_io: Arc<dyn SqueezeIoHandler>,
_expression_hint: Option<&CacheExpression>,
) -> Option<(crate::liquid_array::LiquidSqueezedArrayRef, bytes::Bytes)> {
None
}
}
impl<T> LiquidPrimitiveArray<T>
where
    T: LiquidPrimitiveType,
{
    /// Byte offset at which the bit-packed payload starts: the header plus the
    /// reference value, rounded up to a multiple of 8.
    fn bit_pack_starting_loc() -> usize {
        let header_size = LiquidIPCHeader::size() + std::mem::size_of::<T::Native>();
        (header_size + 7) & !7
    }

    /// Serializes the array.
    ///
    /// Layout: IPC header, the reference value as its in-memory bytes, zero
    /// padding up to [`Self::bit_pack_starting_loc`], then the bit-packed payload.
    /// The squeeze policy is not serialized.
    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
        let physical_type_id = get_physical_type_id::<T>();
        let logical_type_id = super::LiquidDataType::Integer as u16;
        let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);

        let payload_start = Self::bit_pack_starting_loc();
        let mut result = Vec::with_capacity(payload_start + 256);
        result.extend_from_slice(&header.to_bytes());

        // SAFETY: `reference_value` is a live, initialized value of a primitive
        // integer type, so viewing exactly `size_of::<T::Native>()` of its bytes
        // for the duration of this borrow is valid.
        let ref_value_bytes = unsafe {
            std::slice::from_raw_parts(
                &self.reference_value as *const T::Native as *const u8,
                std::mem::size_of::<T::Native>(),
            )
        };
        result.extend_from_slice(ref_value_bytes);

        // `payload_start` is never smaller than what has been written so far, so
        // this only ever appends zero padding.
        result.resize(payload_start, 0);

        self.bit_packed.to_bytes(&mut result);
        result
    }

    /// Deserializes an array written by [`Self::to_bytes_inner`].
    ///
    /// The squeeze policy of the result is `IntegerSqueezePolicy::default()`.
    ///
    /// # Panics
    ///
    /// Panics if the header's physical or logical type id does not match `T`, or
    /// if `bytes` is too short to contain the reference value or the payload
    /// offset.
    pub fn from_bytes(bytes: Bytes) -> Self {
        let header = LiquidIPCHeader::from_bytes(&bytes);
        assert_eq!(header.physical_type_id, get_physical_type_id::<T>());
        assert_eq!(
            header.logical_type_id,
            super::LiquidDataType::Integer as u16
        );

        // Slice out the full reference value first: this bounds-checks the read
        // and gives the pointer provenance over every byte that is read.
        let ref_start = LiquidIPCHeader::size();
        let ref_end = ref_start + std::mem::size_of::<T::Native>();
        let ref_bytes = &bytes[ref_start..ref_end];
        // SAFETY: `ref_bytes` holds exactly `size_of::<T::Native>()` initialized
        // bytes, every bit pattern is a valid primitive integer, and
        // `read_unaligned` has no alignment requirement.
        let reference_value =
            unsafe { ref_bytes.as_ptr().cast::<T::Native>().read_unaligned() };

        let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
        let bit_packed = BitPackedArray::<T::UnSignedType>::from_bytes(bit_packed_data);
        Self {
            bit_packed,
            reference_value,
            squeeze_policy: IntegerSqueezePolicy::default(),
        }
    }
}
impl<T> LiquidPrimitiveDeltaArray<T>
where
    T: LiquidPrimitiveType,
{
    /// Byte offset at which the bit-packed payload starts: the header plus the
    /// reference value, rounded up to a multiple of 8.
    fn bit_pack_starting_loc() -> usize {
        let header_size = LiquidIPCHeader::size() + std::mem::size_of::<T::Native>();
        (header_size + 7) & !7
    }

    /// Serializes the array.
    ///
    /// Layout: IPC header, the reference value as its in-memory bytes, zero
    /// padding up to [`Self::bit_pack_starting_loc`], then the bit-packed payload.
    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
        let physical_type_id = get_physical_type_id::<T>();
        // NOTE(review): hard-coded logical type id, whereas `LiquidPrimitiveArray`
        // writes `LiquidDataType::Integer as u16` - confirm whether the two are
        // meant to be the same value or to tell the encodings apart.
        let logical_type_id = 1u16;
        let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);

        let payload_start = Self::bit_pack_starting_loc();
        let mut result = Vec::with_capacity(payload_start + 256);
        result.extend_from_slice(&header.to_bytes());

        // SAFETY: `reference_value` is a live, initialized value of a primitive
        // integer type, so viewing exactly `size_of::<T::Native>()` of its bytes
        // for the duration of this borrow is valid.
        let ref_value_bytes = unsafe {
            std::slice::from_raw_parts(
                &self.reference_value as *const T::Native as *const u8,
                std::mem::size_of::<T::Native>(),
            )
        };
        result.extend_from_slice(ref_value_bytes);

        // `payload_start` is never smaller than what has been written so far, so
        // this only ever appends zero padding.
        result.resize(payload_start, 0);

        self.bit_packed.to_bytes(&mut result);
        result
    }

    /// Deserializes an array written by [`Self::to_bytes_inner`].
    ///
    /// # Panics
    ///
    /// Panics if the header's physical type id does not match `T`, if its logical
    /// type id is not `1`, or if `bytes` is too short to contain the reference
    /// value or the payload offset.
    pub fn from_bytes(bytes: Bytes) -> Self {
        let header = LiquidIPCHeader::from_bytes(&bytes);
        assert_eq!(header.physical_type_id, get_physical_type_id::<T>());
        assert_eq!(header.logical_type_id, 1u16);

        // Slice out the full reference value first: this bounds-checks the read
        // and gives the pointer provenance over every byte that is read.
        let ref_start = LiquidIPCHeader::size();
        let ref_end = ref_start + std::mem::size_of::<T::Native>();
        let ref_bytes = &bytes[ref_start..ref_end];
        // SAFETY: `ref_bytes` holds exactly `size_of::<T::Native>()` initialized
        // bytes, every bit pattern is a valid primitive integer, and
        // `read_unaligned` has no alignment requirement.
        let reference_value =
            unsafe { ref_bytes.as_ptr().cast::<T::Native>().read_unaligned() };

        let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
        let bit_packed = BitPackedArray::<T::UnSignedType>::from_bytes(bit_packed_data);
        Self {
            bit_packed,
            reference_value,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Array;

    /// Generates a test that encodes `$input`, then checks that both the direct
    /// decode and the decode after a serialization round trip equal the input.
    macro_rules! test_roundtrip {
        ($name:ident, $arrow_ty:ty, $input:expr) => {
            #[test]
            fn $name() {
                let input: Vec<Option<<$arrow_ty as ArrowPrimitiveType>::Native>> = $input;
                let expected = PrimitiveArray::<$arrow_ty>::from(input.clone());
                let liquid = LiquidPrimitiveArray::<$arrow_ty>::from_arrow_array(expected.clone());
                let decoded = liquid.to_arrow_array();
                let serialized = Bytes::from(liquid.to_bytes());
                let restored = LiquidPrimitiveArray::<$arrow_ty>::from_bytes(serialized);
                assert_eq!(decoded.as_ref(), &expected);
                assert_eq!(restored.to_arrow_array().as_ref(), &expected);
            }
        };
    }

    // Signed integers: a small positive case with a null, and the full range.
    test_roundtrip!(
        test_int8_roundtrip_basic,
        Int8Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int8_roundtrip_negative,
        Int8Type,
        vec![Some(i8::MIN), Some(-64), Some(0), Some(63), Some(i8::MAX)]
    );
    test_roundtrip!(
        test_int16_roundtrip_basic,
        Int16Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int16_roundtrip_negative,
        Int16Type,
        vec![
            Some(i16::MIN),
            Some(-16384),
            Some(0),
            Some(16383),
            Some(i16::MAX)
        ]
    );
    test_roundtrip!(
        test_int32_roundtrip_basic,
        Int32Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int32_roundtrip_negative,
        Int32Type,
        vec![
            Some(i32::MIN),
            Some(-1073741824),
            Some(0),
            Some(1073741823),
            Some(i32::MAX)
        ]
    );
    test_roundtrip!(
        test_int64_roundtrip_basic,
        Int64Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int64_roundtrip_negative,
        Int64Type,
        vec![
            Some(i64::MIN),
            Some(-4611686018427387904),
            Some(0),
            Some(4611686018427387903),
            Some(i64::MAX)
        ]
    );

    // Unsigned integers spanning the full range, with a null.
    test_roundtrip!(
        test_uint8_roundtrip,
        UInt8Type,
        vec![Some(0), Some(128), Some(u8::MAX), None, Some(64)]
    );
    test_roundtrip!(
        test_uint16_roundtrip,
        UInt16Type,
        vec![Some(0), Some(32768), Some(u16::MAX), None, Some(16384)]
    );
    test_roundtrip!(
        test_uint32_roundtrip,
        UInt32Type,
        vec![
            Some(0),
            Some(2147483648),
            Some(u32::MAX),
            None,
            Some(1073741824)
        ]
    );
    test_roundtrip!(
        test_uint64_roundtrip,
        UInt64Type,
        vec![
            Some(0),
            Some(9223372036854775808),
            Some(u64::MAX),
            None,
            Some(4611686018427387904)
        ]
    );

    // Date types.
    test_roundtrip!(
        test_date32_roundtrip,
        Date32Type,
        vec![Some(-365), Some(0), Some(365), None, Some(18262)]
    );
    test_roundtrip!(
        test_date64_roundtrip,
        Date64Type,
        vec![Some(-365), Some(0), Some(365), None, Some(18262)]
    );

    #[test]
    fn test_all_nulls() {
        let input: Vec<Option<i32>> = vec![None, None, None];
        let source = PrimitiveArray::<Int32Type>::from(input.clone());
        let decoded = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(source).to_arrow_array();
        assert_eq!(decoded.len(), input.len());
        assert_eq!(decoded.null_count(), input.len());
    }

    #[test]
    fn test_all_nulls_filter() {
        let input: Vec<Option<i32>> = vec![None, None, None];
        let source = PrimitiveArray::<Int32Type>::from(input.clone());
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(source);
        let mask = BooleanBuffer::from(vec![true, false, true]);
        let filtered = liquid.filter(&mask);
        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered.null_count(), 2);
    }

    #[test]
    fn test_zero_reference_value() {
        let input: Vec<Option<i32>> = vec![Some(0), Some(1), Some(2), None, Some(4)];
        let source = PrimitiveArray::<Int32Type>::from(input.clone());
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(source.clone());
        let decoded = liquid.to_arrow_array();
        assert_eq!(liquid.reference_value, 0);
        assert_eq!(decoded.as_ref(), &source);
    }

    #[test]
    fn test_single_value() {
        let input: Vec<Option<i32>> = vec![Some(42)];
        let source = PrimitiveArray::<Int32Type>::from(input.clone());
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(source.clone());
        let decoded = liquid.to_arrow_array();
        assert_eq!(decoded.as_ref(), &source);
    }

    #[test]
    fn test_filter_basic() {
        let source =
            PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(source);
        let mask = BooleanBuffer::from(vec![true, false, true, false, true]);
        let filtered = liquid.filter(&mask);
        let expected = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(3), Some(5)]);
        assert_eq!(filtered.as_ref(), &expected);
    }

    #[test]
    fn test_original_arrow_data_type_returns_int32() {
        let source = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(2)]);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(source);
        assert_eq!(liquid.original_arrow_data_type(), DataType::Int32);
    }

    #[test]
    fn test_filter_all_nulls() {
        let source = PrimitiveArray::<Int32Type>::from(vec![None, None, None, None]);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(source);
        let mask = BooleanBuffer::from(vec![true, false, false, true]);
        let filtered = liquid.filter(&mask);
        let expected = PrimitiveArray::<Int32Type>::from(vec![None, None]);
        assert_eq!(filtered.as_ref(), &expected);
    }

    #[test]
    fn test_filter_empty_result() {
        let source = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(2), Some(3)]);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(source);
        let mask = BooleanBuffer::from(vec![false, false, false]);
        let filtered = liquid.filter(&mask);
        assert_eq!(filtered.len(), 0);
    }

    #[test]
    fn test_delta_encoding_basic_roundtrip() {
        let input = vec![Some(1), Some(3), Some(6), Some(10), Some(15)];
        let source = PrimitiveArray::<Int32Type>::from(input.clone());
        let delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(source.clone());
        let decoded = delta.to_arrow_array();
        assert_eq!(decoded.as_ref(), &source);
    }

    #[test]
    fn test_delta_encoding_with_nulls() {
        let input = vec![Some(1), None, Some(4), Some(7), None, Some(12)];
        let source = PrimitiveArray::<Int32Type>::from(input.clone());
        let delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(source.clone());
        let decoded = delta.to_arrow_array();
        assert_eq!(decoded.as_ref(), &source);
    }

    #[test]
    fn test_delta_encoding_serialization() {
        let input = vec![Some(1), Some(3), Some(6), Some(10), Some(15)];
        let source = PrimitiveArray::<Int32Type>::from(input.clone());
        let delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(source.clone());
        let serialized = Bytes::from(delta.to_bytes());
        let restored = LiquidPrimitiveDeltaArray::<Int32Type>::from_bytes(serialized);
        let decoded = restored.to_arrow_array();
        assert_eq!(decoded.as_ref(), &source);
    }

    #[test]
    fn test_memory_comparison_sequential_data() {
        let sequential: Vec<Option<i32>> = (0..1000).map(Some).collect();
        let source = PrimitiveArray::<Int32Type>::from(sequential);
        let regular = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(source.clone());
        let delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(source);
        let regular_size = regular.get_array_memory_size();
        let delta_size = delta.get_array_memory_size();
        println!(
            "Sequential data - Regular: {} bytes, Delta: {} bytes",
            regular_size, delta_size
        );
        assert!(
            delta_size <= regular_size,
            "Delta encoding should be more efficient for sequential data"
        );
    }
}