pub mod byte_view_array;
mod decimal_array;
mod fix_len_byte_array;
mod float_array;
mod hybrid_primitive_array;
pub mod ipc;
mod linear_integer_array;
mod primitive_array;
pub mod raw;
mod squeezed_date32_array;
#[cfg(test)]
mod tests;
pub(crate) mod utils;
mod variant_array;

use std::{any::Any, ops::Range, sync::Arc};

use arrow::{
    array::{ArrayRef, BooleanArray, cast::AsArray},
    buffer::BooleanBuffer,
    record_batch::RecordBatch,
};
use arrow_schema::{DataType, Field, Schema};
pub use byte_view_array::LiquidByteViewArray;
use bytes::Bytes;
use datafusion_expr_common::operator::Operator as DFOperator;
pub use decimal_array::LiquidDecimalArray;
pub use fix_len_byte_array::LiquidFixedLenByteArray;
pub use float_array::{LiquidFloat32Array, LiquidFloat64Array, LiquidFloatArray};
pub use linear_integer_array::{
    LiquidLinearArray, LiquidLinearDate32Array, LiquidLinearDate64Array, LiquidLinearI8Array,
    LiquidLinearI16Array, LiquidLinearI32Array, LiquidLinearI64Array, LiquidLinearU8Array,
    LiquidLinearU16Array, LiquidLinearU32Array, LiquidLinearU64Array,
};
pub use primitive_array::IntegerSqueezePolicy;
pub use primitive_array::{
    LiquidDate32Array, LiquidDate64Array, LiquidI8Array, LiquidI16Array, LiquidI32Array,
    LiquidI64Array, LiquidPrimitiveArray, LiquidPrimitiveDeltaArray, LiquidPrimitiveType,
    LiquidU8Array, LiquidU16Array, LiquidU32Array, LiquidU64Array,
};
pub use squeezed_date32_array::{Date32Field, SqueezedDate32Array};
pub use variant_array::VariantStructSqueezedArray;

use crate::cache::{CacheExpression, LiquidExpr};
48
/// Tag identifying which liquid encoding an array uses.
///
/// Each variant carries an explicit `u16` discriminant; that value is the tag
/// decoded by [`LiquidDataType::try_from_u16`] and the `From<u16>` impl. Keep the
/// discriminants and the decoder in sync when adding a variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u16)]
pub enum LiquidDataType {
    /// Byte-view encoding (tag `4`).
    ByteViewArray = 4,
    /// Integer encoding (tag `1`).
    Integer = 1,
    /// Floating-point encoding (tag `2`).
    Float = 2,
    /// Fixed-length byte array encoding (tag `3`).
    FixedLenByteArray = 3,
    /// Decimal encoding (tag `6`).
    Decimal = 6,
    /// Linear integer encoding (tag `5`).
    LinearInteger = 5,
}

impl LiquidDataType {
    /// Decodes a `u16` tag without panicking.
    ///
    /// Returns `None` when `value` is not one of the discriminants declared on
    /// [`LiquidDataType`]. Prefer this over `From<u16>` when the tag comes from
    /// data that may be corrupt.
    pub const fn try_from_u16(value: u16) -> Option<Self> {
        match value {
            4 => Some(LiquidDataType::ByteViewArray),
            1 => Some(LiquidDataType::Integer),
            2 => Some(LiquidDataType::Float),
            3 => Some(LiquidDataType::FixedLenByteArray),
            5 => Some(LiquidDataType::LinearInteger),
            6 => Some(LiquidDataType::Decimal),
            _ => None,
        }
    }
}

impl From<u16> for LiquidDataType {
    /// Decodes a `u16` tag into a [`LiquidDataType`].
    ///
    /// # Panics
    ///
    /// Panics if `value` is not a declared discriminant (currently `1..=6`).
    /// Use [`LiquidDataType::try_from_u16`] to handle unknown tags gracefully.
    fn from(value: u16) -> Self {
        LiquidDataType::try_from_u16(value)
            .unwrap_or_else(|| panic!("Invalid liquid data type: {value}"))
    }
}
80
81pub trait LiquidArray: std::fmt::Debug + Send + Sync {
83 fn as_any(&self) -> &dyn Any;
85
86 fn get_array_memory_size(&self) -> usize;
88
89 fn len(&self) -> usize;
91
92 fn is_empty(&self) -> bool {
94 self.len() == 0
95 }
96
97 fn to_arrow_array(&self) -> ArrayRef;
99
100 fn to_best_arrow_array(&self) -> ArrayRef {
104 self.to_arrow_array()
105 }
106
107 fn data_type(&self) -> LiquidDataType;
109
110 fn original_arrow_data_type(&self) -> DataType;
112
113 fn to_bytes(&self) -> Vec<u8>;
115
116 fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
118 let arrow_array = self.to_arrow_array();
119 let selection = BooleanArray::new(selection.clone(), None);
120 arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
121 }
122
123 fn try_eval_predicate(&self, predicate: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
128 let filtered = self.filter(filter);
129 eval_predicate_on_array(filtered, predicate)
130 }
131
132 fn squeeze(
140 &self,
141 _io: Arc<dyn SqueezeIoHandler>,
142 _expression_hint: Option<&CacheExpression>,
143 ) -> Option<(LiquidSqueezedArrayRef, bytes::Bytes)> {
144 None
145 }
146}
147
148pub type LiquidArrayRef = Arc<dyn LiquidArray>;
150
/// Describes the backing data behind a squeezed array: which format it is kept
/// in, and its on-disk size.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SqueezedBacking {
    /// Backing data in the liquid format; the payload is its on-disk size in bytes.
    Liquid(usize),
    /// Backing data in the Arrow format; the payload is its on-disk size in bytes.
    Arrow(usize),
}

impl SqueezedBacking {
    /// On-disk size of the backing data in bytes, whatever its format.
    pub fn disk_bytes(&self) -> usize {
        match *self {
            SqueezedBacking::Liquid(bytes) => bytes,
            SqueezedBacking::Arrow(bytes) => bytes,
        }
    }
}
171
172pub type LiquidSqueezedArrayRef = Arc<dyn LiquidSqueezedArray>;
174
175#[derive(Debug, Clone, Copy, PartialEq, Eq)]
177pub struct NeedsBacking;
178
179pub type SqueezeResult<T> = Result<T, NeedsBacking>;
181
182enum Operator {
183 Eq,
184 NotEq,
185 Lt,
186 LtEq,
187 Gt,
188 GtEq,
189}
190
191impl Operator {
192 fn from_datafusion(op: &DFOperator) -> Option<Self> {
193 let op = match op {
194 DFOperator::Eq => Operator::Eq,
195 DFOperator::NotEq => Operator::NotEq,
196 DFOperator::Lt => Operator::Lt,
197 DFOperator::LtEq => Operator::LtEq,
198 DFOperator::Gt => Operator::Gt,
199 DFOperator::GtEq => Operator::GtEq,
200 _ => return None,
201 };
202 Some(op)
203 }
204}
205
206#[async_trait::async_trait]
209pub trait LiquidSqueezedArray: std::fmt::Debug + Send + Sync {
210 fn as_any(&self) -> &dyn Any;
212
213 fn get_array_memory_size(&self) -> usize;
215
216 fn len(&self) -> usize;
218
219 fn is_empty(&self) -> bool {
221 self.len() == 0
222 }
223
224 async fn to_arrow_array(&self) -> ArrayRef;
226
227 async fn to_best_arrow_array(&self) -> ArrayRef {
231 self.to_arrow_array().await
232 }
233
234 fn data_type(&self) -> LiquidDataType;
236
237 fn original_arrow_data_type(&self) -> DataType;
239
240 async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
242 let arrow_array = self.to_arrow_array().await;
243 let selection = BooleanArray::new(selection.clone(), None);
244 arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
245 }
246
247 async fn try_eval_predicate(
252 &self,
253 predicate: &LiquidExpr,
254 filter: &BooleanBuffer,
255 ) -> BooleanArray {
256 let filtered = self.filter(filter).await;
257 eval_predicate_on_array(filtered, predicate)
258 }
259
260 fn disk_backing(&self) -> SqueezedBacking;
263}
264
265pub(crate) fn eval_predicate_on_array(array: ArrayRef, predicate: &LiquidExpr) -> BooleanArray {
266 let schema = Arc::new(Schema::new(vec![Field::new(
267 "liquid_predicate_col",
268 array.data_type().clone(),
269 true,
270 )]));
271 let record_batch = RecordBatch::try_new(schema, vec![array]).expect("predicate input batch");
272 let result = predicate
273 .physical_expr()
274 .evaluate(&record_batch)
275 .expect("validated LiquidExpr must evaluate");
276 let boolean_array = result
277 .into_array(record_batch.num_rows())
278 .expect("predicate output must be an array");
279 boolean_array.as_boolean().clone()
280}
281
282#[async_trait::async_trait]
284pub trait SqueezeIoHandler: std::fmt::Debug + Send + Sync {
285 async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes>;
287
288 fn tracing_decompress_count(&self, _decompress_cnt: usize, _total_cnt: usize) {
291 }
293
294 fn trace_io_saved(&self) {
297 }
299}
300
/// Compile-time value-range information for an Arrow primitive type.
///
/// The fields are filled in by two macros:
/// - `impl_unsigned_kind!` sets `IS_UNSIGNED = true` and `MAX_U64`, and leaves both
///   signed bounds at `0`;
/// - `impl_signed_kind!` sets `MIN_I64`/`MAX_I64` and leaves `MAX_U64` at `0`.
pub trait PrimitiveKind {
    /// `true` for unsigned types, `false` for signed ones.
    const IS_UNSIGNED: bool;
    /// Largest value of an unsigned type widened to `u64`; `0` for signed types.
    const MAX_U64: u64;
    /// Smallest value of a signed type widened to `i64`; `0` for unsigned types.
    const MIN_I64: i64;
    /// Largest value of a signed type widened to `i64`; `0` for unsigned types.
    const MAX_I64: i64;
}

/// Implements [`PrimitiveKind`] for unsigned type `$kind` with maximum `$upper`.
macro_rules! impl_unsigned_kind {
    ($kind:ty, $upper:expr) => {
        impl PrimitiveKind for $kind {
            const IS_UNSIGNED: bool = true;
            const MAX_U64: u64 = $upper as u64;
            // The signed bounds are not used for unsigned types.
            const MIN_I64: i64 = 0;
            const MAX_I64: i64 = 0;
        }
    };
}

/// Implements [`PrimitiveKind`] for signed type `$kind` with range `$lower..=$upper`.
macro_rules! impl_signed_kind {
    ($kind:ty, $lower:expr, $upper:expr) => {
        impl PrimitiveKind for $kind {
            const IS_UNSIGNED: bool = false;
            // The unsigned bound is not used for signed types.
            const MAX_U64: u64 = 0;
            const MIN_I64: i64 = $lower as i64;
            const MAX_I64: i64 = $upper as i64;
        }
    };
}
335
336use arrow::datatypes::{
337 Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type, TimestampMicrosecondType,
338 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type,
339 UInt32Type, UInt64Type,
340};
341
342impl_unsigned_kind!(UInt8Type, u8::MAX);
343impl_unsigned_kind!(UInt16Type, u16::MAX);
344impl_unsigned_kind!(UInt32Type, u32::MAX);
345impl_unsigned_kind!(UInt64Type, u64::MAX);
346
347impl_signed_kind!(Int8Type, i8::MIN, i8::MAX);
348impl_signed_kind!(Int16Type, i16::MIN, i16::MAX);
349impl_signed_kind!(Int32Type, i32::MIN, i32::MAX);
350impl_signed_kind!(Int64Type, i64::MIN, i64::MAX);
351
352impl_signed_kind!(Date32Type, i32::MIN, i32::MAX);
354impl_signed_kind!(Date64Type, i64::MIN, i64::MAX);
355impl_signed_kind!(TimestampSecondType, i64::MIN, i64::MAX);
356impl_signed_kind!(TimestampMillisecondType, i64::MIN, i64::MAX);
357impl_signed_kind!(TimestampMicrosecondType, i64::MIN, i64::MAX);
358impl_signed_kind!(TimestampNanosecondType, i64::MIN, i64::MAX);