Skip to main content

liquid_cache/liquid_array/
mod.rs

1//! LiquidArray is the core data structure of LiquidCache.
2//! You should not use this module directly.
3//! Instead, use `liquid_cache_datafusion_server` or `liquid_cache_datafusion_client` to interact with LiquidCache.
4mod byte_array;
5pub mod byte_view_array;
6mod decimal_array;
7mod fix_len_byte_array;
8mod float_array;
9mod hybrid_primitive_array;
10pub mod ipc;
11mod linear_integer_array;
12mod primitive_array;
13pub mod raw;
14mod squeezed_date32_array;
15#[cfg(test)]
16mod tests;
17pub(crate) mod utils;
18mod variant_array;
19
20use std::{any::Any, ops::Range, sync::Arc};
21
22use arrow::{
23    array::{ArrayRef, BooleanArray},
24    buffer::BooleanBuffer,
25};
26use arrow_schema::DataType;
27pub use byte_array::{LiquidByteArray, get_bytes_needle, get_string_needle};
28pub use byte_view_array::LiquidByteViewArray;
29use bytes::Bytes;
30use datafusion::logical_expr::Operator as DFOperator;
31use datafusion::physical_plan::PhysicalExpr;
32pub use decimal_array::LiquidDecimalArray;
33pub use fix_len_byte_array::LiquidFixedLenByteArray;
34use float_array::LiquidFloatType;
35pub use float_array::{LiquidFloat32Array, LiquidFloat64Array, LiquidFloatArray};
36pub use linear_integer_array::{
37    LiquidLinearArray, LiquidLinearDate32Array, LiquidLinearDate64Array, LiquidLinearI8Array,
38    LiquidLinearI16Array, LiquidLinearI32Array, LiquidLinearI64Array, LiquidLinearU8Array,
39    LiquidLinearU16Array, LiquidLinearU32Array, LiquidLinearU64Array,
40};
41pub use primitive_array::IntegerSqueezePolicy;
42pub use primitive_array::{
43    LiquidDate32Array, LiquidDate64Array, LiquidI8Array, LiquidI16Array, LiquidI32Array,
44    LiquidI64Array, LiquidPrimitiveArray, LiquidPrimitiveDeltaArray, LiquidPrimitiveType,
45    LiquidU8Array, LiquidU16Array, LiquidU32Array, LiquidU64Array,
46};
47pub use squeezed_date32_array::{Date32Field, SqueezedDate32Array};
48pub use variant_array::VariantStructSqueezedArray;
49
50use crate::{cache::CacheExpression, liquid_array::raw::FsstArray};
51
/// Logical (not physical) type tag of a Liquid array.
///
/// Every variant has an explicit `u16` discriminant. The `From<u16>` impl that follows
/// maps exactly those numbers back to their variants.
#[derive(Debug, Clone, Copy)]
#[repr(u16)]
pub enum LiquidDataType {
    /// A byte array.
    ByteArray = 0,
    /// An integer.
    Integer = 1,
    /// A float.
    Float = 2,
    /// A fixed length byte array.
    FixedLenByteArray = 3,
    /// A byte-view array (dictionary + FSST raw + views).
    ByteViewArray = 4,
    /// A linear-model based integer (signed residuals + model params).
    LinearInteger = 5,
    /// A decimal encoded as a primitive u64 array.
    Decimal = 6,
}

impl From<u16> for LiquidDataType {
    /// Decodes a discriminant previously produced by `LiquidDataType as u16`.
    ///
    /// # Panics
    ///
    /// Panics if `value` is not one of the discriminants declared on `LiquidDataType`
    /// (currently `0..=6`).
    fn from(value: u16) -> Self {
        match value {
            0 => Self::ByteArray,
            1 => Self::Integer,
            2 => Self::Float,
            3 => Self::FixedLenByteArray,
            4 => Self::ByteViewArray,
            5 => Self::LinearInteger,
            6 => Self::Decimal,
            _ => panic!("Invalid liquid data type: {value}"),
        }
    }
}
86
87/// A trait to access the underlying Liquid array.
88pub trait AsLiquidArray {
89    /// Get the underlying string array.
90    fn as_string_array_opt(&self) -> Option<&LiquidByteArray>;
91
92    /// Get the underlying string array.
93    fn as_string(&self) -> &LiquidByteArray {
94        self.as_string_array_opt().expect("liquid string array")
95    }
96
97    /// Get the underlying binary array.
98    fn as_binary_array_opt(&self) -> Option<&LiquidByteArray>;
99
100    /// Get the underlying binary array.
101    fn as_binary(&self) -> &LiquidByteArray {
102        self.as_binary_array_opt().expect("liquid binary array")
103    }
104
105    /// Get the underlying byte view array.
106    fn as_byte_view_array_opt(&self) -> Option<&LiquidByteViewArray<FsstArray>>;
107
108    /// Get the underlying byte view array.
109    fn as_byte_view(&self) -> &LiquidByteViewArray<FsstArray> {
110        self.as_byte_view_array_opt()
111            .expect("liquid byte view array")
112    }
113
114    /// Get the underlying primitive array.
115    fn as_primitive_array_opt<T: LiquidPrimitiveType>(&self) -> Option<&LiquidPrimitiveArray<T>>;
116
117    /// Get the underlying primitive array.
118    fn as_primitive<T: LiquidPrimitiveType>(&self) -> &LiquidPrimitiveArray<T> {
119        self.as_primitive_array_opt()
120            .expect("liquid primitive array")
121    }
122
123    /// Get the underlying float array.
124    fn as_float_array_opt<T: LiquidFloatType>(&self) -> Option<&LiquidFloatArray<T>>;
125
126    /// Get the underlying float array.
127    fn as_float<T: LiquidFloatType>(&self) -> &LiquidFloatArray<T> {
128        self.as_float_array_opt().expect("liquid float array")
129    }
130}
131
132impl AsLiquidArray for dyn LiquidArray + '_ {
133    fn as_string_array_opt(&self) -> Option<&LiquidByteArray> {
134        self.as_any().downcast_ref()
135    }
136
137    fn as_primitive_array_opt<T: LiquidPrimitiveType>(&self) -> Option<&LiquidPrimitiveArray<T>> {
138        self.as_any().downcast_ref()
139    }
140
141    fn as_binary_array_opt(&self) -> Option<&LiquidByteArray> {
142        self.as_any().downcast_ref()
143    }
144
145    fn as_byte_view_array_opt(&self) -> Option<&LiquidByteViewArray<FsstArray>> {
146        self.as_any().downcast_ref()
147    }
148
149    fn as_float_array_opt<T: LiquidFloatType>(&self) -> Option<&LiquidFloatArray<T>> {
150        self.as_any().downcast_ref()
151    }
152}
153
154/// A Liquid array.
155pub trait LiquidArray: std::fmt::Debug + Send + Sync {
156    /// Get the underlying any type.
157    fn as_any(&self) -> &dyn Any;
158
159    /// Get the memory size of the Liquid array.
160    fn get_array_memory_size(&self) -> usize;
161
162    /// Get the length of the Liquid array.
163    fn len(&self) -> usize;
164
165    /// Check if the Liquid array is empty.
166    fn is_empty(&self) -> bool {
167        self.len() == 0
168    }
169
170    /// Convert the Liquid array to an Arrow array.
171    fn to_arrow_array(&self) -> ArrayRef;
172
173    /// Convert the Liquid array to an Arrow array.
174    /// Except that it will pick the best encoding for the arrow array.
175    /// Meaning that it may not obey the data type of the original arrow array.
176    fn to_best_arrow_array(&self) -> ArrayRef {
177        self.to_arrow_array()
178    }
179
180    /// Get the logical data type of the Liquid array.
181    fn data_type(&self) -> LiquidDataType;
182
183    /// Get the original arrow data type of the Liquid array.
184    fn original_arrow_data_type(&self) -> DataType;
185
186    /// Serialize the Liquid array to a byte array.
187    fn to_bytes(&self) -> Vec<u8>;
188
189    /// Filter the Liquid array with a boolean array and return an **arrow array**.
190    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
191        let arrow_array = self.to_arrow_array();
192        let selection = BooleanArray::new(selection.clone(), None);
193        arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
194    }
195
196    /// Try to evaluate a predicate on the Liquid array with a filter.
197    /// Returns `None` if the predicate is not supported.
198    ///
199    /// Note that the filter is a boolean buffer, not a boolean array, i.e., filter can't be nullable.
200    /// The returned boolean mask is nullable if the the original array is nullable.
201    fn try_eval_predicate(
202        &self,
203        _predicate: &Arc<dyn PhysicalExpr>,
204        _filter: &BooleanBuffer,
205    ) -> Option<BooleanArray> {
206        None
207    }
208
209    /// Squeeze the Liquid array to a `LiquidHybridArrayRef` and a `bytes::Bytes`.
210    /// Return `None` if the Liquid array cannot be squeezed.
211    ///
212    /// This is the bridge from in-memory array to hybrid array.
213    /// Important: The returned `Bytes` is the data that is stored on disk, it is the same as to_bytes().
214    ///
215    /// Hydrating the hybrid array from the stored bytes should yield the same `LiquidArray`.
216    fn squeeze(
217        &self,
218        _io: Arc<dyn SqueezeIoHandler>,
219        _expression_hint: Option<&CacheExpression>,
220    ) -> Option<(LiquidSqueezedArrayRef, bytes::Bytes)> {
221        None
222    }
223}
224
225/// A reference to a Liquid array.
226pub type LiquidArrayRef = Arc<dyn LiquidArray>;
227
/// Format in which a squeezed array keeps its on-disk backing bytes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SqueezedBacking {
    /// The backing bytes use the Liquid IPC format.
    Liquid,
    /// The backing bytes use Arrow IPC (or another Arrow-compatible encoding).
    Arrow,
}
236
237/// A reference to a Liquid squeezed array.
238pub type LiquidSqueezedArrayRef = Arc<dyn LiquidSqueezedArray>;
239
/// Marker error: the squeezed representation must first be hydrated from its on-disk
/// backing bytes before the requested operation can proceed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NeedsBacking;

/// Outcome of a squeezed-array operation; `Err(NeedsBacking)` signals that disk hydration
/// is required.
pub type SqueezeResult<T> = Result<T, NeedsBacking>;
246
247enum Operator {
248    Eq,
249    NotEq,
250    Lt,
251    LtEq,
252    Gt,
253    GtEq,
254}
255
256impl Operator {
257    fn from_datafusion(op: &DFOperator) -> Option<Self> {
258        let op = match op {
259            DFOperator::Eq => Operator::Eq,
260            DFOperator::NotEq => Operator::NotEq,
261            DFOperator::Lt => Operator::Lt,
262            DFOperator::LtEq => Operator::LtEq,
263            DFOperator::Gt => Operator::Gt,
264            DFOperator::GtEq => Operator::GtEq,
265            _ => return None,
266        };
267        Some(op)
268    }
269}
270
271/// A Liquid squeezed array is a Liquid array that part of its data is stored on disk.
272/// `LiquidSqueezedArray` is more complex than in-memory `LiquidArray` because it needs to handle IO.
273#[async_trait::async_trait]
274pub trait LiquidSqueezedArray: std::fmt::Debug + Send + Sync {
275    /// Get the underlying any type.
276    fn as_any(&self) -> &dyn Any;
277
278    /// Get the memory size of the Liquid array.
279    fn get_array_memory_size(&self) -> usize;
280
281    /// Get the length of the Liquid array.
282    fn len(&self) -> usize;
283
284    /// Check if the Liquid array is empty.
285    fn is_empty(&self) -> bool {
286        self.len() == 0
287    }
288
289    /// Convert the Liquid array to an Arrow array.
290    async fn to_arrow_array(&self) -> ArrayRef;
291
292    /// Convert the Liquid array to an Arrow array.
293    /// Except that it will pick the best encoding for the arrow array.
294    /// Meaning that it may not obey the data type of the original arrow array.
295    async fn to_best_arrow_array(&self) -> ArrayRef {
296        self.to_arrow_array().await
297    }
298
299    /// Get the logical data type of the Liquid array.
300    fn data_type(&self) -> LiquidDataType;
301
302    /// Get the original arrow data type of the Liquid squeezed array.
303    fn original_arrow_data_type(&self) -> DataType;
304
305    /// Filter the Liquid array with a boolean array and return an **arrow array**.
306    async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
307        let arrow_array = self.to_arrow_array().await;
308        let selection = BooleanArray::new(selection.clone(), None);
309        arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
310    }
311
312    /// Try to evaluate a predicate on the Liquid array with a filter.
313    /// Returns `Ok(None)` if the predicate is not supported.
314    ///
315    /// Note that the filter is a boolean buffer, not a boolean array, i.e., filter can't be nullable.
316    /// The returned boolean mask is nullable if the the original array is nullable.
317    async fn try_eval_predicate(
318        &self,
319        _predicate: &Arc<dyn PhysicalExpr>,
320        _filter: &BooleanBuffer,
321    ) -> Option<BooleanArray> {
322        None
323    }
324
325    /// Describe how the squeezed array persists its backing bytes on disk.
326    fn disk_backing(&self) -> SqueezedBacking {
327        SqueezedBacking::Liquid
328    }
329}
330
331/// A trait to read the backing bytes of a squeezed array from disk.
332#[async_trait::async_trait]
333pub trait SqueezeIoHandler: std::fmt::Debug + Send + Sync {
334    /// Read the backing bytes of a squeezed array from disk.
335    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes>;
336
337    /// Trace the number of decompressions performed.
338    // TODO: this is ugly.
339    fn tracing_decompress_count(&self, _decompress_cnt: usize, _total_cnt: usize) {
340        // Do nothing by default
341    }
342
343    /// Trace the number of IO saved by squeezing.
344    // TODO: this is ugly.
345    fn trace_io_saved(&self) {
346        // Do nothing by default
347    }
348}
349
/// Compile-time facts about a Liquid-supported primitive type: signedness and value bounds.
///
/// Unsigned types report their upper bound through `MAX_U64`. Signed types (integers, dates
/// and timestamps) report theirs through `MIN_I64` and `MAX_I64`. The helper macros below
/// set the constants that do not apply to `0`.
pub trait PrimitiveKind {
    /// `true` for unsigned logical types (u8/u16/u32/u64), `false` otherwise.
    const IS_UNSIGNED: bool;
    /// Largest representable value widened to `u64`; only meaningful for unsigned types.
    const MAX_U64: u64;
    /// Smallest representable value widened to `i64`; only meaningful for signed types.
    const MIN_I64: i64;
    /// Largest representable value widened to `i64`; only meaningful for signed types.
    const MAX_I64: i64;
}

/// Implements [`PrimitiveKind`] for an unsigned type whose maximum value is `$upper`.
macro_rules! impl_unsigned_kind {
    ($ty:ty, $upper:expr) => {
        impl PrimitiveKind for $ty {
            const IS_UNSIGNED: bool = true;
            const MAX_U64: u64 = $upper as u64;
            // Signed bounds are not used for unsigned types.
            const MIN_I64: i64 = 0;
            const MAX_I64: i64 = 0;
        }
    };
}

/// Implements [`PrimitiveKind`] for a signed type with bounds `$lower..=$upper`.
macro_rules! impl_signed_kind {
    ($ty:ty, $lower:expr, $upper:expr) => {
        impl PrimitiveKind for $ty {
            const IS_UNSIGNED: bool = false;
            // The unsigned bound is not used for signed types.
            const MAX_U64: u64 = 0;
            const MIN_I64: i64 = $lower as i64;
            const MAX_I64: i64 = $upper as i64;
        }
    };
}
379            const MIN_I64: i64 = $min as i64;
380            const MAX_I64: i64 = $max as i64;
381        }
382    };
383}
384
385use arrow::datatypes::{
386    Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type, TimestampMicrosecondType,
387    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type,
388    UInt32Type, UInt64Type,
389};
390
391impl_unsigned_kind!(UInt8Type, u8::MAX);
392impl_unsigned_kind!(UInt16Type, u16::MAX);
393impl_unsigned_kind!(UInt32Type, u32::MAX);
394impl_unsigned_kind!(UInt64Type, u64::MAX);
395
396impl_signed_kind!(Int8Type, i8::MIN, i8::MAX);
397impl_signed_kind!(Int16Type, i16::MIN, i16::MAX);
398impl_signed_kind!(Int32Type, i32::MIN, i32::MAX);
399impl_signed_kind!(Int64Type, i64::MIN, i64::MAX);
400
401// Dates are logically signed in Arrow (Date32: i32 days, Date64: i64 ms)
402impl_signed_kind!(Date32Type, i32::MIN, i32::MAX);
403impl_signed_kind!(Date64Type, i64::MIN, i64::MAX);
404impl_signed_kind!(TimestampSecondType, i64::MIN, i64::MAX);
405impl_signed_kind!(TimestampMillisecondType, i64::MIN, i64::MAX);
406impl_signed_kind!(TimestampMicrosecondType, i64::MIN, i64::MAX);
407impl_signed_kind!(TimestampNanosecondType, i64::MIN, i64::MAX);