Skip to main content

liquid_cache/liquid_array/
mod.rs

1//! LiquidArray is the core data structure of LiquidCache.
2//! You should not use this module directly.
3//! Instead, use `liquid_cache_datafusion_server` or `liquid_cache_datafusion_client` to interact with LiquidCache.
4pub mod byte_view_array;
5mod decimal_array;
6mod fix_len_byte_array;
7mod float_array;
8mod hybrid_primitive_array;
9pub mod ipc;
10mod linear_integer_array;
11mod primitive_array;
12pub mod raw;
13mod squeezed_date32_array;
14#[cfg(test)]
15mod tests;
16pub(crate) mod utils;
17mod variant_array;
18
19use std::{any::Any, ops::Range, sync::Arc};
20
21use arrow::{
22    array::{ArrayRef, BooleanArray, cast::AsArray},
23    buffer::BooleanBuffer,
24    record_batch::RecordBatch,
25};
26use arrow_schema::{DataType, Field, Schema};
27pub use byte_view_array::LiquidByteViewArray;
28use bytes::Bytes;
29use datafusion_expr_common::operator::Operator as DFOperator;
30pub use decimal_array::LiquidDecimalArray;
31pub use fix_len_byte_array::LiquidFixedLenByteArray;
32pub use float_array::{LiquidFloat32Array, LiquidFloat64Array, LiquidFloatArray};
33pub use linear_integer_array::{
34    LiquidLinearArray, LiquidLinearDate32Array, LiquidLinearDate64Array, LiquidLinearI8Array,
35    LiquidLinearI16Array, LiquidLinearI32Array, LiquidLinearI64Array, LiquidLinearU8Array,
36    LiquidLinearU16Array, LiquidLinearU32Array, LiquidLinearU64Array,
37};
38pub use primitive_array::IntegerSqueezePolicy;
39pub use primitive_array::{
40    LiquidDate32Array, LiquidDate64Array, LiquidI8Array, LiquidI16Array, LiquidI32Array,
41    LiquidI64Array, LiquidPrimitiveArray, LiquidPrimitiveDeltaArray, LiquidPrimitiveType,
42    LiquidU8Array, LiquidU16Array, LiquidU32Array, LiquidU64Array,
43};
44pub use squeezed_date32_array::{Date32Field, SqueezedDate32Array};
45pub use variant_array::VariantStructSqueezedArray;
46
47use crate::cache::{CacheExpression, LiquidExpr};
48
/// Logical (not physical) encoding family of a Liquid array.
///
/// Each variant carries an explicit `u16` discriminant. These values mirror the
/// mapping decoded by `From<u16> for LiquidDataType`, so the two must be kept in
/// sync.
#[derive(Debug, Clone, Copy)]
#[repr(u16)]
pub enum LiquidDataType {
    /// An integer.
    Integer = 1,
    /// A float.
    Float = 2,
    /// A fixed length byte array.
    FixedLenByteArray = 3,
    /// A byte-view array (dictionary + FSST raw + views).
    ByteViewArray = 4,
    /// A linear-model based integer (signed residuals + model params).
    LinearInteger = 5,
    /// A decimal encoded as a primitive u64 array.
    Decimal = 6,
}
66
67impl From<u16> for LiquidDataType {
68    fn from(value: u16) -> Self {
69        match value {
70            4 => LiquidDataType::ByteViewArray,
71            1 => LiquidDataType::Integer,
72            2 => LiquidDataType::Float,
73            3 => LiquidDataType::FixedLenByteArray,
74            5 => LiquidDataType::LinearInteger,
75            6 => LiquidDataType::Decimal,
76            _ => panic!("Invalid liquid data type: {value}"),
77        }
78    }
79}
80
81/// A Liquid array.
82pub trait LiquidArray: std::fmt::Debug + Send + Sync {
83    /// Get the underlying any type.
84    fn as_any(&self) -> &dyn Any;
85
86    /// Get the memory size of the Liquid array.
87    fn get_array_memory_size(&self) -> usize;
88
89    /// Get the length of the Liquid array.
90    fn len(&self) -> usize;
91
92    /// Check if the Liquid array is empty.
93    fn is_empty(&self) -> bool {
94        self.len() == 0
95    }
96
97    /// Convert the Liquid array to an Arrow array.
98    fn to_arrow_array(&self) -> ArrayRef;
99
100    /// Convert the Liquid array to an Arrow array.
101    /// Except that it will pick the best encoding for the arrow array.
102    /// Meaning that it may not obey the data type of the original arrow array.
103    fn to_best_arrow_array(&self) -> ArrayRef {
104        self.to_arrow_array()
105    }
106
107    /// Get the logical data type of the Liquid array.
108    fn data_type(&self) -> LiquidDataType;
109
110    /// Get the original arrow data type of the Liquid array.
111    fn original_arrow_data_type(&self) -> DataType;
112
113    /// Serialize the Liquid array to a byte array.
114    fn to_bytes(&self) -> Vec<u8>;
115
116    /// Filter the Liquid array with a boolean array and return an **arrow array**.
117    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
118        let arrow_array = self.to_arrow_array();
119        let selection = BooleanArray::new(selection.clone(), None);
120        arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
121    }
122
123    /// Evaluate a predicate on the Liquid array with a filter.
124    ///
125    /// Note that the filter is a boolean buffer, not a boolean array, i.e., filter can't be nullable.
126    /// The returned boolean mask is nullable if the the original array is nullable.
127    fn try_eval_predicate(&self, predicate: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
128        let filtered = self.filter(filter);
129        eval_predicate_on_array(filtered, predicate)
130    }
131
132    /// Squeeze the Liquid array to a `LiquidHybridArrayRef` and a `bytes::Bytes`.
133    /// Return `None` if the Liquid array cannot be squeezed.
134    ///
135    /// This is the bridge from in-memory array to hybrid array.
136    /// Important: The returned `Bytes` is the data that is stored on disk, it is the same as to_bytes().
137    ///
138    /// Hydrating the hybrid array from the stored bytes should yield the same `LiquidArray`.
139    fn squeeze(
140        &self,
141        _io: Arc<dyn SqueezeIoHandler>,
142        _expression_hint: Option<&CacheExpression>,
143    ) -> Option<(LiquidSqueezedArrayRef, bytes::Bytes)> {
144        None
145    }
146}
147
148/// A reference to a Liquid array.
149pub type LiquidArrayRef = Arc<dyn LiquidArray>;
150
/// Describes how a squeezed array's backing data is persisted on disk.
///
/// Both variants record how many bytes were persisted. The cache uses that
/// count to release the disk budget when the entry is evicted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SqueezedBacking {
    /// Persisted in the Liquid IPC format; holds the persisted byte length.
    Liquid(usize),
    /// Persisted as Arrow IPC (or another Arrow-compatible encoding); holds the
    /// persisted byte length.
    Arrow(usize),
}

impl SqueezedBacking {
    /// Returns the number of bytes this backing occupies on disk.
    pub fn disk_bytes(&self) -> usize {
        // Every variant stores the length in the same position, so a single
        // irrefutable or-pattern extracts it.
        let (Self::Liquid(len) | Self::Arrow(len)) = *self;
        len
    }
}
171
172/// A reference to a Liquid squeezed array.
173pub type LiquidSqueezedArrayRef = Arc<dyn LiquidSqueezedArray>;
174
175/// Signals that the squeezed representation needs to be hydrated from disk.
176#[derive(Debug, Clone, Copy, PartialEq, Eq)]
177pub struct NeedsBacking;
178
179/// Result type for squeezed operations that may require disk hydration.
180pub type SqueezeResult<T> = Result<T, NeedsBacking>;
181
182enum Operator {
183    Eq,
184    NotEq,
185    Lt,
186    LtEq,
187    Gt,
188    GtEq,
189}
190
191impl Operator {
192    fn from_datafusion(op: &DFOperator) -> Option<Self> {
193        let op = match op {
194            DFOperator::Eq => Operator::Eq,
195            DFOperator::NotEq => Operator::NotEq,
196            DFOperator::Lt => Operator::Lt,
197            DFOperator::LtEq => Operator::LtEq,
198            DFOperator::Gt => Operator::Gt,
199            DFOperator::GtEq => Operator::GtEq,
200            _ => return None,
201        };
202        Some(op)
203    }
204}
205
206/// A Liquid squeezed array is a Liquid array that part of its data is stored on disk.
207/// `LiquidSqueezedArray` is more complex than in-memory `LiquidArray` because it needs to handle IO.
208#[async_trait::async_trait]
209pub trait LiquidSqueezedArray: std::fmt::Debug + Send + Sync {
210    /// Get the underlying any type.
211    fn as_any(&self) -> &dyn Any;
212
213    /// Get the memory size of the Liquid array.
214    fn get_array_memory_size(&self) -> usize;
215
216    /// Get the length of the Liquid array.
217    fn len(&self) -> usize;
218
219    /// Check if the Liquid array is empty.
220    fn is_empty(&self) -> bool {
221        self.len() == 0
222    }
223
224    /// Convert the Liquid array to an Arrow array.
225    async fn to_arrow_array(&self) -> ArrayRef;
226
227    /// Convert the Liquid array to an Arrow array.
228    /// Except that it will pick the best encoding for the arrow array.
229    /// Meaning that it may not obey the data type of the original arrow array.
230    async fn to_best_arrow_array(&self) -> ArrayRef {
231        self.to_arrow_array().await
232    }
233
234    /// Get the logical data type of the Liquid array.
235    fn data_type(&self) -> LiquidDataType;
236
237    /// Get the original arrow data type of the Liquid squeezed array.
238    fn original_arrow_data_type(&self) -> DataType;
239
240    /// Filter the Liquid array with a boolean array and return an **arrow array**.
241    async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
242        let arrow_array = self.to_arrow_array().await;
243        let selection = BooleanArray::new(selection.clone(), None);
244        arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
245    }
246
247    /// Evaluate a predicate on the Liquid array with a filter.
248    ///
249    /// Note that the filter is a boolean buffer, not a boolean array, i.e., filter can't be nullable.
250    /// The returned boolean mask is nullable if the the original array is nullable.
251    async fn try_eval_predicate(
252        &self,
253        predicate: &LiquidExpr,
254        filter: &BooleanBuffer,
255    ) -> BooleanArray {
256        let filtered = self.filter(filter).await;
257        eval_predicate_on_array(filtered, predicate)
258    }
259
260    /// Describe how the squeezed array persists its backing bytes on disk,
261    /// including the byte length of the persisted data.
262    fn disk_backing(&self) -> SqueezedBacking;
263}
264
265pub(crate) fn eval_predicate_on_array(array: ArrayRef, predicate: &LiquidExpr) -> BooleanArray {
266    let schema = Arc::new(Schema::new(vec![Field::new(
267        "liquid_predicate_col",
268        array.data_type().clone(),
269        true,
270    )]));
271    let record_batch = RecordBatch::try_new(schema, vec![array]).expect("predicate input batch");
272    let result = predicate
273        .physical_expr()
274        .evaluate(&record_batch)
275        .expect("validated LiquidExpr must evaluate");
276    let boolean_array = result
277        .into_array(record_batch.num_rows())
278        .expect("predicate output must be an array");
279    boolean_array.as_boolean().clone()
280}
281
282/// A trait to read the backing bytes of a squeezed array from disk.
283#[async_trait::async_trait]
284pub trait SqueezeIoHandler: std::fmt::Debug + Send + Sync {
285    /// Read the backing bytes of a squeezed array from disk.
286    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes>;
287
288    /// Trace the number of decompressions performed.
289    // TODO: this is ugly.
290    fn tracing_decompress_count(&self, _decompress_cnt: usize, _total_cnt: usize) {
291        // Do nothing by default
292    }
293
294    /// Trace the number of IO saved by squeezing.
295    // TODO: this is ugly.
296    fn trace_io_saved(&self) {
297        // Do nothing by default
298    }
299}
300
/// Compile-time facts about a primitive Arrow type: its signedness and bounds.
///
/// This module implements it for the Liquid-supported integer, date and
/// timestamp types. Only the constants that match the type's signedness are
/// meaningful; by convention the others are `0`.
pub trait PrimitiveKind {
    /// `true` for unsigned integer types (u8/u16/u32/u64).
    const IS_UNSIGNED: bool;
    /// Largest representable value widened to `u64`; meaningful only when
    /// `IS_UNSIGNED` is `true`.
    const MAX_U64: u64;
    /// Smallest representable value widened to `i64`; meaningful only when
    /// `IS_UNSIGNED` is `false`.
    const MIN_I64: i64;
    /// Largest representable value widened to `i64`; meaningful only when
    /// `IS_UNSIGNED` is `false`.
    const MAX_I64: i64;
}
313
// Shared expansion behind `impl_unsigned_kind!` and `impl_signed_kind!`.
// It emits a `PrimitiveKind` impl from explicitly named constant values.
macro_rules! impl_primitive_kind {
    (
        $t:ty,
        unsigned: $unsigned:expr,
        max_u64: $max_u64:expr,
        min_i64: $min_i64:expr,
        max_i64: $max_i64:expr
    ) => {
        impl PrimitiveKind for $t {
            const IS_UNSIGNED: bool = $unsigned;
            const MAX_U64: u64 = $max_u64;
            const MIN_I64: i64 = $min_i64;
            const MAX_I64: i64 = $max_i64;
        }
    };
}

// Implements `PrimitiveKind` for an unsigned type whose maximum is `$max`.
// The signed bounds are unused and set to 0.
macro_rules! impl_unsigned_kind {
    ($t:ty, $max:expr) => {
        impl_primitive_kind!(
            $t,
            unsigned: true,
            max_u64: $max as u64,
            min_i64: 0,
            max_i64: 0
        );
    };
}

// Implements `PrimitiveKind` for a signed type (or a signed-backed date or
// timestamp type) whose range is `$min..=$max`. `MAX_U64` is unused and set to 0.
macro_rules! impl_signed_kind {
    ($t:ty, $min:expr, $max:expr) => {
        impl_primitive_kind!(
            $t,
            unsigned: false,
            max_u64: 0,
            min_i64: $min as i64,
            max_i64: $max as i64
        );
    };
}
335
336use arrow::datatypes::{
337    Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type, TimestampMicrosecondType,
338    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type,
339    UInt32Type, UInt64Type,
340};
341
342impl_unsigned_kind!(UInt8Type, u8::MAX);
343impl_unsigned_kind!(UInt16Type, u16::MAX);
344impl_unsigned_kind!(UInt32Type, u32::MAX);
345impl_unsigned_kind!(UInt64Type, u64::MAX);
346
347impl_signed_kind!(Int8Type, i8::MIN, i8::MAX);
348impl_signed_kind!(Int16Type, i16::MIN, i16::MAX);
349impl_signed_kind!(Int32Type, i32::MIN, i32::MAX);
350impl_signed_kind!(Int64Type, i64::MIN, i64::MAX);
351
352// Dates are logically signed in Arrow (Date32: i32 days, Date64: i64 ms)
353impl_signed_kind!(Date32Type, i32::MIN, i32::MAX);
354impl_signed_kind!(Date64Type, i64::MIN, i64::MAX);
355impl_signed_kind!(TimestampSecondType, i64::MIN, i64::MAX);
356impl_signed_kind!(TimestampMillisecondType, i64::MIN, i64::MAX);
357impl_signed_kind!(TimestampMicrosecondType, i64::MIN, i64::MAX);
358impl_signed_kind!(TimestampNanosecondType, i64::MIN, i64::MAX);