mod byte_array;
pub mod byte_view_array;
mod decimal_array;
mod fix_len_byte_array;
mod float_array;
mod hybrid_primitive_array;
pub mod ipc;
mod linear_integer_array;
mod primitive_array;
pub mod raw;
mod squeezed_date32_array;
#[cfg(test)]
mod tests;
pub(crate) mod utils;
mod variant_array;
use std::{any::Any, ops::Range, sync::Arc};
use arrow::{
array::{ArrayRef, BooleanArray},
buffer::BooleanBuffer,
};
use arrow_schema::DataType;
pub use byte_array::{LiquidByteArray, get_bytes_needle, get_string_needle};
pub use byte_view_array::LiquidByteViewArray;
use bytes::Bytes;
use datafusion::logical_expr::Operator as DFOperator;
use datafusion::physical_plan::PhysicalExpr;
pub use decimal_array::LiquidDecimalArray;
pub use fix_len_byte_array::LiquidFixedLenByteArray;
use float_array::LiquidFloatType;
pub use float_array::{LiquidFloat32Array, LiquidFloat64Array, LiquidFloatArray};
pub use linear_integer_array::{
LiquidLinearArray, LiquidLinearDate32Array, LiquidLinearDate64Array, LiquidLinearI8Array,
LiquidLinearI16Array, LiquidLinearI32Array, LiquidLinearI64Array, LiquidLinearU8Array,
LiquidLinearU16Array, LiquidLinearU32Array, LiquidLinearU64Array,
};
pub use primitive_array::IntegerSqueezePolicy;
pub use primitive_array::{
LiquidDate32Array, LiquidDate64Array, LiquidI8Array, LiquidI16Array, LiquidI32Array,
LiquidI64Array, LiquidPrimitiveArray, LiquidPrimitiveDeltaArray, LiquidPrimitiveType,
LiquidU8Array, LiquidU16Array, LiquidU32Array, LiquidU64Array,
};
pub use squeezed_date32_array::{Date32Field, SqueezedDate32Array};
pub use variant_array::VariantStructSqueezedArray;
use crate::{cache::CacheExpression, liquid_array::raw::FsstArray};
/// Tag identifying which liquid encoding an array uses.
///
/// The explicit `u16` discriminants are exactly the values decoded by the
/// `From<u16>` impl below, so existing values should be treated as stable and never
/// renumbered. Variants are listed in discriminant order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u16)]
pub enum LiquidDataType {
    ByteArray = 0,
    Integer = 1,
    Float = 2,
    FixedLenByteArray = 3,
    ByteViewArray = 4,
    LinearInteger = 5,
    Decimal = 6,
}

impl From<u16> for LiquidDataType {
    /// Decodes a tag previously obtained with `LiquidDataType as u16`.
    ///
    /// # Panics
    ///
    /// Panics if `value` does not correspond to any variant.
    fn from(value: u16) -> Self {
        match value {
            0 => LiquidDataType::ByteArray,
            1 => LiquidDataType::Integer,
            2 => LiquidDataType::Float,
            3 => LiquidDataType::FixedLenByteArray,
            4 => LiquidDataType::ByteViewArray,
            5 => LiquidDataType::LinearInteger,
            6 => LiquidDataType::Decimal,
            _ => panic!("Invalid liquid data type: {value}"),
        }
    }
}
/// Typed downcasts from a type-erased liquid array to its concrete encodings.
///
/// Implementors provide the `*_opt` methods, which should return `None` when the array
/// is not of the requested concrete type. Each has a companion without the suffix that
/// panics on a mismatch instead.
pub trait AsLiquidArray {
    /// Downcasts to [`LiquidByteArray`] (string view), if possible.
    fn as_string_array_opt(&self) -> Option<&LiquidByteArray>;

    /// Downcasts to [`LiquidByteArray`] (binary view), if possible.
    fn as_binary_array_opt(&self) -> Option<&LiquidByteArray>;

    /// Downcasts to an FSST-backed [`LiquidByteViewArray`], if possible.
    fn as_byte_view_array_opt(&self) -> Option<&LiquidByteViewArray<FsstArray>>;

    /// Downcasts to a [`LiquidPrimitiveArray`] of `T`, if possible.
    fn as_primitive_array_opt<T: LiquidPrimitiveType>(&self) -> Option<&LiquidPrimitiveArray<T>>;

    /// Downcasts to a [`LiquidFloatArray`] of `T`, if possible.
    fn as_float_array_opt<T: LiquidFloatType>(&self) -> Option<&LiquidFloatArray<T>>;

    /// Panicking form of [`AsLiquidArray::as_string_array_opt`].
    fn as_string(&self) -> &LiquidByteArray {
        self.as_string_array_opt().expect("liquid string array")
    }

    /// Panicking form of [`AsLiquidArray::as_binary_array_opt`].
    fn as_binary(&self) -> &LiquidByteArray {
        self.as_binary_array_opt().expect("liquid binary array")
    }

    /// Panicking form of [`AsLiquidArray::as_byte_view_array_opt`].
    fn as_byte_view(&self) -> &LiquidByteViewArray<FsstArray> {
        self.as_byte_view_array_opt().expect("liquid byte view array")
    }

    /// Panicking form of [`AsLiquidArray::as_primitive_array_opt`].
    fn as_primitive<T: LiquidPrimitiveType>(&self) -> &LiquidPrimitiveArray<T> {
        self.as_primitive_array_opt().expect("liquid primitive array")
    }

    /// Panicking form of [`AsLiquidArray::as_float_array_opt`].
    fn as_float<T: LiquidFloatType>(&self) -> &LiquidFloatArray<T> {
        self.as_float_array_opt().expect("liquid float array")
    }
}
impl AsLiquidArray for dyn LiquidArray + '_ {
fn as_string_array_opt(&self) -> Option<&LiquidByteArray> {
self.as_any().downcast_ref()
}
fn as_primitive_array_opt<T: LiquidPrimitiveType>(&self) -> Option<&LiquidPrimitiveArray<T>> {
self.as_any().downcast_ref()
}
fn as_binary_array_opt(&self) -> Option<&LiquidByteArray> {
self.as_any().downcast_ref()
}
fn as_byte_view_array_opt(&self) -> Option<&LiquidByteViewArray<FsstArray>> {
self.as_any().downcast_ref()
}
fn as_float_array_opt<T: LiquidFloatType>(&self) -> Option<&LiquidFloatArray<T>> {
self.as_any().downcast_ref()
}
}
/// An encoded ("liquid") in-memory array that can be decoded back into Arrow.
///
/// Implementors supply the encoding-specific methods. `filter`, `try_eval_predicate`
/// and `squeeze` have conservative defaults: they either decode to Arrow first or opt
/// out by returning `None`.
pub trait LiquidArray: std::fmt::Debug + Send + Sync {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete array type.
    fn as_any(&self) -> &dyn Any;

    /// Memory held by this array (presumably in bytes; TODO confirm against implementors).
    fn get_array_memory_size(&self) -> usize;

    /// Number of logical elements.
    fn len(&self) -> usize;

    /// `true` when the array holds no elements.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Decodes the array into an Arrow array.
    fn to_arrow_array(&self) -> ArrayRef;

    /// Decodes into the Arrow representation the implementor prefers. The default is
    /// identical to [`LiquidArray::to_arrow_array`].
    fn to_best_arrow_array(&self) -> ArrayRef {
        self.to_arrow_array()
    }

    /// Tag identifying this array's liquid encoding.
    fn data_type(&self) -> LiquidDataType;

    /// Arrow data type of the values this array was built from.
    fn original_arrow_data_type(&self) -> DataType;

    /// Serializes the array into a byte vector.
    fn to_bytes(&self) -> Vec<u8>;

    /// Returns the rows where `selection` is set, as an Arrow array.
    ///
    /// The default decodes the whole array with `to_arrow_array` and then applies
    /// Arrow's filter kernel.
    ///
    /// # Panics
    ///
    /// Panics if Arrow's filter kernel returns an error.
    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
        let mask = BooleanArray::new(selection.clone(), None);
        let decoded = self.to_arrow_array();
        arrow::compute::kernels::filter::filter(&decoded, &mask).unwrap()
    }

    /// Attempts to evaluate `predicate` directly on the encoded data.
    ///
    /// `None` means "not supported here"; presumably the caller then falls back to
    /// decoding. The default supports no predicates. How `_filter` is used is left to
    /// implementors (presumably a row selection; TODO confirm).
    fn try_eval_predicate(
        &self,
        _predicate: &Arc<dyn PhysicalExpr>,
        _filter: &BooleanBuffer,
    ) -> Option<BooleanArray> {
        None
    }

    /// Attempts to build a squeezed representation of this array.
    ///
    /// On success, returns the squeezed array together with a `Bytes` payload. The payload
    /// is presumably the data that `_io` can later read back; TODO confirm. The default
    /// returns `None`, i.e. the array cannot be squeezed.
    fn squeeze(
        &self,
        _io: Arc<dyn SqueezeIoHandler>,
        _expression_hint: Option<&CacheExpression>,
    ) -> Option<(LiquidSqueezedArrayRef, bytes::Bytes)> {
        None
    }
}
/// Shared, type-erased, reference-counted handle to a [`LiquidArray`].
pub type LiquidArrayRef = Arc<dyn LiquidArray + 'static>;
/// Representation of a squeezed array's backing data.
///
/// Reported by `LiquidSqueezedArray::disk_backing`, whose default is `Liquid`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SqueezedBacking {
    /// Backing data is in the liquid encoding (presumably; inferred from the name).
    Liquid,
    /// Backing data is in Arrow form (presumably; inferred from the name).
    Arrow,
}
/// Shared, type-erased, reference-counted handle to a [`LiquidSqueezedArray`].
pub type LiquidSqueezedArrayRef = Arc<dyn LiquidSqueezedArray + 'static>;
/// Unit error marker. Judging by its name, it signals that an operation on squeezed
/// data needs the backing data to finish (TODO confirm against call sites).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NeedsBacking;

/// Result of an operation on squeezed data; the error case is always [`NeedsBacking`].
pub type SqueezeResult<T> = std::result::Result<T, NeedsBacking>;
/// Comparison operators recognised by [`Operator::from_datafusion`].
enum Operator {
    Eq,
    NotEq,
    Lt,
    LtEq,
    Gt,
    GtEq,
}

impl Operator {
    /// Converts a DataFusion operator into an [`Operator`].
    ///
    /// Returns `None` for anything other than `=`, `!=`, `<`, `<=`, `>` and `>=`.
    fn from_datafusion(op: &DFOperator) -> Option<Self> {
        match op {
            DFOperator::Eq => Some(Self::Eq),
            DFOperator::NotEq => Some(Self::NotEq),
            DFOperator::Lt => Some(Self::Lt),
            DFOperator::LtEq => Some(Self::LtEq),
            DFOperator::Gt => Some(Self::Gt),
            DFOperator::GtEq => Some(Self::GtEq),
            _ => None,
        }
    }
}
/// A squeezed form of a liquid array. Part of its data may live in backing storage
/// (see [`SqueezedBacking`]).
///
/// Decoding is `async`, presumably so implementors can read backing data on demand.
/// The default `filter` and `try_eval_predicate` mirror those of [`LiquidArray`].
#[async_trait::async_trait]
pub trait LiquidSqueezedArray: std::fmt::Debug + Send + Sync {
    /// Returns `self` as [`Any`] for downcasting to the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Memory held in memory by this array (presumably in bytes; TODO confirm).
    fn get_array_memory_size(&self) -> usize;

    /// Number of logical elements.
    fn len(&self) -> usize;

    /// Tag identifying the liquid encoding.
    fn data_type(&self) -> LiquidDataType;

    /// Arrow data type of the values this array was built from.
    fn original_arrow_data_type(&self) -> DataType;

    /// Decodes into an Arrow array.
    async fn to_arrow_array(&self) -> ArrayRef;

    /// `true` when the array holds no elements.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Decodes into the implementor's preferred Arrow representation. The default is
    /// identical to [`LiquidSqueezedArray::to_arrow_array`].
    async fn to_best_arrow_array(&self) -> ArrayRef {
        self.to_arrow_array().await
    }

    /// Returns the rows where `selection` is set, as an Arrow array.
    ///
    /// The default fully decodes the array and then applies Arrow's filter kernel.
    ///
    /// # Panics
    ///
    /// Panics if Arrow's filter kernel returns an error.
    async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
        let decoded = self.to_arrow_array().await;
        let mask = BooleanArray::new(selection.clone(), None);
        arrow::compute::kernels::filter::filter(&decoded, &mask).unwrap()
    }

    /// Attempts to evaluate `predicate` without full decoding. `None` means "not
    /// supported", and the default supports nothing.
    async fn try_eval_predicate(
        &self,
        _predicate: &Arc<dyn PhysicalExpr>,
        _filter: &BooleanBuffer,
    ) -> Option<BooleanArray> {
        None
    }

    /// Which representation the backing data uses; defaults to [`SqueezedBacking::Liquid`].
    fn disk_backing(&self) -> SqueezedBacking {
        SqueezedBacking::Liquid
    }
}
/// I/O handle passed to `LiquidArray::squeeze`. It reads backing bytes and receives
/// tracing callbacks.
#[async_trait::async_trait]
pub trait SqueezeIoHandler: std::fmt::Debug + Send + Sync {
    /// Reads backing bytes. Presumably `Some(range)` selects a byte range and `None` reads
    /// everything; TODO confirm against implementors.
    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes>;

    /// Tracing hook for decompression counts; the default ignores its arguments.
    fn tracing_decompress_count(&self, _decompress_cnt: usize, _total_cnt: usize) {}

    /// Tracing hook; the default does nothing.
    fn trace_io_saved(&self) {}
}
/// Compile-time range information for integer-like Arrow primitive types.
///
/// As populated by `impl_unsigned_kind!` / `impl_signed_kind!`, unsigned types set
/// `MAX_U64` and leave the signed bounds at `0`. Signed types set `MIN_I64`/`MAX_I64`
/// and leave `MAX_U64` at `0`.
pub trait PrimitiveKind {
    /// `true` for unsigned types.
    const IS_UNSIGNED: bool;
    /// Minimum of a signed type, widened to `i64`.
    const MIN_I64: i64;
    /// Maximum of a signed type, widened to `i64`.
    const MAX_I64: i64;
    /// Maximum of an unsigned type, widened to `u64`.
    const MAX_U64: u64;
}
/// Implements `PrimitiveKind` for an unsigned type.
///
/// `$upper` is the type's maximum value, widened to `u64` with `as`. The signed bounds
/// are fixed at `0`.
macro_rules! impl_unsigned_kind {
    ($ty:ty, $upper:expr) => {
        impl PrimitiveKind for $ty {
            const IS_UNSIGNED: bool = true;
            const MAX_U64: u64 = $upper as u64;
            const MIN_I64: i64 = 0;
            const MAX_I64: i64 = 0;
        }
    };
}
/// Implements `PrimitiveKind` for a signed type.
///
/// `$lower`/`$upper` are the type's minimum/maximum values, widened to `i64` with `as`.
/// `MAX_U64` is fixed at `0`.
macro_rules! impl_signed_kind {
    ($ty:ty, $lower:expr, $upper:expr) => {
        impl PrimitiveKind for $ty {
            const IS_UNSIGNED: bool = false;
            const MAX_U64: u64 = 0;
            const MIN_I64: i64 = $lower as i64;
            const MAX_I64: i64 = $upper as i64;
        }
    };
}
use arrow::datatypes::{
Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type,
UInt32Type, UInt64Type,
};
// Signed integers: bounds are the native MIN/MAX widened to i64.
impl_signed_kind!(Int8Type, i8::MIN, i8::MAX);
impl_signed_kind!(Int16Type, i16::MIN, i16::MAX);
impl_signed_kind!(Int32Type, i32::MIN, i32::MAX);
impl_signed_kind!(Int64Type, i64::MIN, i64::MAX);

// Unsigned integers: only the maximum (widened to u64) is recorded.
impl_unsigned_kind!(UInt8Type, u8::MAX);
impl_unsigned_kind!(UInt16Type, u16::MAX);
impl_unsigned_kind!(UInt32Type, u32::MAX);
impl_unsigned_kind!(UInt64Type, u64::MAX);

// Dates reuse the bounds of their i32 / i64 representation.
impl_signed_kind!(Date32Type, i32::MIN, i32::MAX);
impl_signed_kind!(Date64Type, i64::MIN, i64::MAX);

// Timestamps of every unit use i64 bounds.
impl_signed_kind!(TimestampSecondType, i64::MIN, i64::MAX);
impl_signed_kind!(TimestampMillisecondType, i64::MIN, i64::MAX);
impl_signed_kind!(TimestampMicrosecondType, i64::MIN, i64::MAX);
impl_signed_kind!(TimestampNanosecondType, i64::MIN, i64::MAX);