liquid_cache/liquid_array/
mod.rs1mod byte_array;
5pub mod byte_view_array;
6mod decimal_array;
7mod fix_len_byte_array;
8mod float_array;
9mod hybrid_primitive_array;
10pub mod ipc;
11mod linear_integer_array;
12mod primitive_array;
13pub mod raw;
14mod squeezed_date32_array;
15#[cfg(test)]
16mod tests;
17pub(crate) mod utils;
18mod variant_array;
19
20use std::{any::Any, ops::Range, sync::Arc};
21
22use arrow::{
23 array::{ArrayRef, BooleanArray},
24 buffer::BooleanBuffer,
25};
26use arrow_schema::DataType;
27pub use byte_array::{LiquidByteArray, get_bytes_needle, get_string_needle};
28pub use byte_view_array::LiquidByteViewArray;
29use bytes::Bytes;
30use datafusion::logical_expr::Operator as DFOperator;
31use datafusion::physical_plan::PhysicalExpr;
32pub use decimal_array::LiquidDecimalArray;
33pub use fix_len_byte_array::LiquidFixedLenByteArray;
34use float_array::LiquidFloatType;
35pub use float_array::{LiquidFloat32Array, LiquidFloat64Array, LiquidFloatArray};
36pub use linear_integer_array::{
37 LiquidLinearArray, LiquidLinearDate32Array, LiquidLinearDate64Array, LiquidLinearI8Array,
38 LiquidLinearI16Array, LiquidLinearI32Array, LiquidLinearI64Array, LiquidLinearU8Array,
39 LiquidLinearU16Array, LiquidLinearU32Array, LiquidLinearU64Array,
40};
41pub use primitive_array::IntegerSqueezePolicy;
42pub use primitive_array::{
43 LiquidDate32Array, LiquidDate64Array, LiquidI8Array, LiquidI16Array, LiquidI32Array,
44 LiquidI64Array, LiquidPrimitiveArray, LiquidPrimitiveDeltaArray, LiquidPrimitiveType,
45 LiquidU8Array, LiquidU16Array, LiquidU32Array, LiquidU64Array,
46};
47pub use squeezed_date32_array::{Date32Field, SqueezedDate32Array};
48pub use variant_array::VariantStructSqueezedArray;
49
50use crate::{cache::CacheExpression, liquid_array::raw::FsstArray};
51
/// Kind tag of a liquid array, represented as a `u16`.
///
/// Every variant has an explicit discriminant, so the numeric tag of a variant
/// does not depend on the order in which the variants are declared. Note that
/// the tags are not in declaration order.
#[derive(Debug, Clone, Copy)]
#[repr(u16)]
pub enum LiquidDataType {
    /// Tag `0`.
    ByteArray = 0,
    /// Tag `4`.
    ByteViewArray = 4,
    /// Tag `1`.
    Integer = 1,
    /// Tag `2`.
    Float = 2,
    /// Tag `3`.
    FixedLenByteArray = 3,
    /// Tag `6`.
    Decimal = 6,
    /// Tag `5`.
    LinearInteger = 5,
}

impl LiquidDataType {
    /// Decodes a raw `u16` tag without panicking.
    ///
    /// Returns `Some(variant)` for the tags `0..=6` and `None` for every other
    /// value. Prefer this over [`From<u16>`] when the tag comes from data that
    /// may be corrupted or produced by a different version of the format.
    ///
    /// `TryFrom<u16>` cannot be implemented by hand here: the standard library
    /// already provides it (infallibly) through the existing `From<u16>` impl.
    pub const fn try_from_u16(value: u16) -> Option<Self> {
        match value {
            0 => Some(LiquidDataType::ByteArray),
            1 => Some(LiquidDataType::Integer),
            2 => Some(LiquidDataType::Float),
            3 => Some(LiquidDataType::FixedLenByteArray),
            4 => Some(LiquidDataType::ByteViewArray),
            5 => Some(LiquidDataType::LinearInteger),
            6 => Some(LiquidDataType::Decimal),
            _ => None,
        }
    }
}

impl From<u16> for LiquidDataType {
    /// Decodes a raw `u16` tag.
    ///
    /// # Panics
    ///
    /// Panics with `Invalid liquid data type: {value}` when `value` is not a
    /// known tag. Use [`LiquidDataType::try_from_u16`] to handle that case
    /// without panicking.
    fn from(value: u16) -> Self {
        match Self::try_from_u16(value) {
            Some(data_type) => data_type,
            None => panic!("Invalid liquid data type: {value}"),
        }
    }
}
86
87pub trait AsLiquidArray {
89 fn as_string_array_opt(&self) -> Option<&LiquidByteArray>;
91
92 fn as_string(&self) -> &LiquidByteArray {
94 self.as_string_array_opt().expect("liquid string array")
95 }
96
97 fn as_binary_array_opt(&self) -> Option<&LiquidByteArray>;
99
100 fn as_binary(&self) -> &LiquidByteArray {
102 self.as_binary_array_opt().expect("liquid binary array")
103 }
104
105 fn as_byte_view_array_opt(&self) -> Option<&LiquidByteViewArray<FsstArray>>;
107
108 fn as_byte_view(&self) -> &LiquidByteViewArray<FsstArray> {
110 self.as_byte_view_array_opt()
111 .expect("liquid byte view array")
112 }
113
114 fn as_primitive_array_opt<T: LiquidPrimitiveType>(&self) -> Option<&LiquidPrimitiveArray<T>>;
116
117 fn as_primitive<T: LiquidPrimitiveType>(&self) -> &LiquidPrimitiveArray<T> {
119 self.as_primitive_array_opt()
120 .expect("liquid primitive array")
121 }
122
123 fn as_float_array_opt<T: LiquidFloatType>(&self) -> Option<&LiquidFloatArray<T>>;
125
126 fn as_float<T: LiquidFloatType>(&self) -> &LiquidFloatArray<T> {
128 self.as_float_array_opt().expect("liquid float array")
129 }
130}
131
132impl AsLiquidArray for dyn LiquidArray + '_ {
133 fn as_string_array_opt(&self) -> Option<&LiquidByteArray> {
134 self.as_any().downcast_ref()
135 }
136
137 fn as_primitive_array_opt<T: LiquidPrimitiveType>(&self) -> Option<&LiquidPrimitiveArray<T>> {
138 self.as_any().downcast_ref()
139 }
140
141 fn as_binary_array_opt(&self) -> Option<&LiquidByteArray> {
142 self.as_any().downcast_ref()
143 }
144
145 fn as_byte_view_array_opt(&self) -> Option<&LiquidByteViewArray<FsstArray>> {
146 self.as_any().downcast_ref()
147 }
148
149 fn as_float_array_opt<T: LiquidFloatType>(&self) -> Option<&LiquidFloatArray<T>> {
150 self.as_any().downcast_ref()
151 }
152}
153
154pub trait LiquidArray: std::fmt::Debug + Send + Sync {
156 fn as_any(&self) -> &dyn Any;
158
159 fn get_array_memory_size(&self) -> usize;
161
162 fn len(&self) -> usize;
164
165 fn is_empty(&self) -> bool {
167 self.len() == 0
168 }
169
170 fn to_arrow_array(&self) -> ArrayRef;
172
173 fn to_best_arrow_array(&self) -> ArrayRef {
177 self.to_arrow_array()
178 }
179
180 fn data_type(&self) -> LiquidDataType;
182
183 fn original_arrow_data_type(&self) -> DataType;
185
186 fn to_bytes(&self) -> Vec<u8>;
188
189 fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
191 let arrow_array = self.to_arrow_array();
192 let selection = BooleanArray::new(selection.clone(), None);
193 arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
194 }
195
196 fn try_eval_predicate(
202 &self,
203 _predicate: &Arc<dyn PhysicalExpr>,
204 _filter: &BooleanBuffer,
205 ) -> Option<BooleanArray> {
206 None
207 }
208
209 fn squeeze(
217 &self,
218 _io: Arc<dyn SqueezeIoHandler>,
219 _expression_hint: Option<&CacheExpression>,
220 ) -> Option<(LiquidSqueezedArrayRef, bytes::Bytes)> {
221 None
222 }
223}
224
225pub type LiquidArrayRef = Arc<dyn LiquidArray>;
227
/// Backing kind reported by [`LiquidSqueezedArray::disk_backing`].
// NOTE(review): the variant names suggest the on-disk bytes are stored either
// in liquid format or in Arrow format — confirm against the writers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SqueezedBacking {
    /// Liquid backing; the default returned by `disk_backing`.
    Liquid,
    /// Arrow backing.
    Arrow,
}
236
237pub type LiquidSqueezedArrayRef = Arc<dyn LiquidSqueezedArray>;
239
/// Unit error value used as the `Err` side of [`SqueezeResult`].
// NOTE(review): the name suggests the operation could not be answered without
// reading the backing data — confirm against the call sites.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NeedsBacking;

/// Result alias whose error type is always [`NeedsBacking`].
pub type SqueezeResult<T> = Result<T, NeedsBacking>;
246
/// The six comparison operators this module recognises.
///
/// Values are built from a DataFusion operator via `Operator::from_datafusion`.
enum Operator {
    /// Equality (`DFOperator::Eq`).
    Eq,
    /// Inequality (`DFOperator::NotEq`).
    NotEq,
    /// Less than (`DFOperator::Lt`).
    Lt,
    /// Less than or equal (`DFOperator::LtEq`).
    LtEq,
    /// Greater than (`DFOperator::Gt`).
    Gt,
    /// Greater than or equal (`DFOperator::GtEq`).
    GtEq,
}
255
256impl Operator {
257 fn from_datafusion(op: &DFOperator) -> Option<Self> {
258 let op = match op {
259 DFOperator::Eq => Operator::Eq,
260 DFOperator::NotEq => Operator::NotEq,
261 DFOperator::Lt => Operator::Lt,
262 DFOperator::LtEq => Operator::LtEq,
263 DFOperator::Gt => Operator::Gt,
264 DFOperator::GtEq => Operator::GtEq,
265 _ => return None,
266 };
267 Some(op)
268 }
269}
270
271#[async_trait::async_trait]
274pub trait LiquidSqueezedArray: std::fmt::Debug + Send + Sync {
275 fn as_any(&self) -> &dyn Any;
277
278 fn get_array_memory_size(&self) -> usize;
280
281 fn len(&self) -> usize;
283
284 fn is_empty(&self) -> bool {
286 self.len() == 0
287 }
288
289 async fn to_arrow_array(&self) -> ArrayRef;
291
292 async fn to_best_arrow_array(&self) -> ArrayRef {
296 self.to_arrow_array().await
297 }
298
299 fn data_type(&self) -> LiquidDataType;
301
302 fn original_arrow_data_type(&self) -> DataType;
304
305 async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
307 let arrow_array = self.to_arrow_array().await;
308 let selection = BooleanArray::new(selection.clone(), None);
309 arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
310 }
311
312 async fn try_eval_predicate(
318 &self,
319 _predicate: &Arc<dyn PhysicalExpr>,
320 _filter: &BooleanBuffer,
321 ) -> Option<BooleanArray> {
322 None
323 }
324
325 fn disk_backing(&self) -> SqueezedBacking {
327 SqueezedBacking::Liquid
328 }
329}
330
331#[async_trait::async_trait]
333pub trait SqueezeIoHandler: std::fmt::Debug + Send + Sync {
334 async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes>;
336
337 fn tracing_decompress_count(&self, _decompress_cnt: usize, _total_cnt: usize) {
340 }
342
343 fn trace_io_saved(&self) {
346 }
348}
349
/// Compile-time signedness and value bounds of a primitive type.
///
/// The implementations generated in this file fill only the bounds that match
/// the signedness and set the others to `0`: unsigned kinds set `MAX_U64`,
/// signed kinds set `MIN_I64` and `MAX_I64`.
pub trait PrimitiveKind {
    /// `true` for unsigned kinds, `false` for signed kinds.
    const IS_UNSIGNED: bool;

    /// Largest value, widened to `u64`.
    const MAX_U64: u64;

    /// Smallest value, widened to `i64`.
    const MIN_I64: i64;

    /// Largest value, widened to `i64`.
    const MAX_I64: i64;
}
362
/// Implements `PrimitiveKind` for an unsigned type.
///
/// `$kind` is the type to implement for and `$upper` its maximum value, which
/// is widened to `u64` with `as`.
macro_rules! impl_unsigned_kind {
    ($kind:ty, $upper:expr) => {
        impl PrimitiveKind for $kind {
            const IS_UNSIGNED: bool = true;
            const MAX_U64: u64 = $upper as u64;
            // The signed bounds are placeholders for unsigned kinds.
            const MIN_I64: i64 = 0;
            const MAX_I64: i64 = 0;
        }
    };
}
373
/// Implements `PrimitiveKind` for a signed type.
///
/// `$kind` is the type to implement for; `$lower` and `$upper` are its minimum
/// and maximum values, both widened to `i64` with `as`.
macro_rules! impl_signed_kind {
    ($kind:ty, $lower:expr, $upper:expr) => {
        impl PrimitiveKind for $kind {
            const IS_UNSIGNED: bool = false;
            // The unsigned bound is a placeholder for signed kinds.
            const MAX_U64: u64 = 0;
            const MIN_I64: i64 = $lower as i64;
            const MAX_I64: i64 = $upper as i64;
        }
    };
}
384
385use arrow::datatypes::{
386 Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type, TimestampMicrosecondType,
387 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type,
388 UInt32Type, UInt64Type,
389};
390
391impl_unsigned_kind!(UInt8Type, u8::MAX);
392impl_unsigned_kind!(UInt16Type, u16::MAX);
393impl_unsigned_kind!(UInt32Type, u32::MAX);
394impl_unsigned_kind!(UInt64Type, u64::MAX);
395
396impl_signed_kind!(Int8Type, i8::MIN, i8::MAX);
397impl_signed_kind!(Int16Type, i16::MIN, i16::MAX);
398impl_signed_kind!(Int32Type, i32::MIN, i32::MAX);
399impl_signed_kind!(Int64Type, i64::MIN, i64::MAX);
400
401impl_signed_kind!(Date32Type, i32::MIN, i32::MAX);
403impl_signed_kind!(Date64Type, i64::MIN, i64::MAX);
404impl_signed_kind!(TimestampSecondType, i64::MIN, i64::MAX);
405impl_signed_kind!(TimestampMillisecondType, i64::MIN, i64::MAX);
406impl_signed_kind!(TimestampMicrosecondType, i64::MIN, i64::MAX);
407impl_signed_kind!(TimestampNanosecondType, i64::MIN, i64::MAX);