Skip to main content

liquid_cache/liquid_array/
decimal_array.rs

1use std::any::Any;
2use std::mem::size_of;
3use std::num::NonZero;
4use std::sync::Arc;
5
6use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray};
7use arrow::buffer::{BooleanBuffer, ScalarBuffer};
8use arrow::datatypes::{Decimal128Type, Decimal256Type, DecimalType, UInt64Type, i256};
9use arrow_schema::DataType;
10use bytes::Bytes;
11use datafusion::physical_plan::PhysicalExpr;
12use datafusion::physical_plan::expressions::{
13    BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal,
14};
15use num_traits::ToPrimitive;
16
17use super::{
18    LiquidArray, LiquidDataType, LiquidSqueezedArray, LiquidSqueezedArrayRef, NeedsBacking,
19    Operator, SqueezeIoHandler, SqueezeResult,
20};
21use crate::cache::CacheExpression;
22use crate::liquid_array::ipc::{LiquidIPCHeader, get_physical_type_id};
23use crate::liquid_array::raw::BitPackedArray;
24use crate::utils::get_bit_width;
25
26#[derive(Debug, Clone, Copy)]
27struct DecimalMeta {
28    precision: u8,
29    scale: i8,
30    is_256: bool,
31}
32
33impl DecimalMeta {
34    fn from_data_type(data_type: &DataType) -> Self {
35        match data_type {
36            DataType::Decimal128(precision, scale) => Self {
37                precision: *precision,
38                scale: *scale,
39                is_256: false,
40            },
41            DataType::Decimal256(precision, scale) => Self {
42                precision: *precision,
43                scale: *scale,
44                is_256: true,
45            },
46            _ => panic!("unsupported decimal data type: {data_type:?}"),
47        }
48    }
49
50    fn data_type(&self) -> DataType {
51        if self.is_256 {
52            DataType::Decimal256(self.precision, self.scale)
53        } else {
54            DataType::Decimal128(self.precision, self.scale)
55        }
56    }
57
58    fn arrow_code(&self) -> u8 {
59        if self.is_256 { 1 } else { 0 }
60    }
61}
62
63#[repr(C)]
64struct DecimalArrayHeader {
65    arrow_type: u8, // 0 for Decimal128, 1 for Decimal256
66    precision: u8,
67    scale: i8,
68    __padding: u8,
69    __reserved: u32,
70}
71
72impl DecimalArrayHeader {
73    const fn size() -> usize {
74        8
75    }
76
77    fn from_meta(meta: DecimalMeta) -> Self {
78        Self {
79            arrow_type: meta.arrow_code(),
80            precision: meta.precision,
81            scale: meta.scale,
82            __padding: 0,
83            __reserved: 0,
84        }
85    }
86
87    fn to_bytes(&self) -> [u8; Self::size()] {
88        let mut bytes = [0; Self::size()];
89        bytes[0] = self.arrow_type;
90        bytes[1] = self.precision;
91        bytes[2] = self.scale as u8;
92        bytes
93    }
94
95    fn from_bytes(bytes: &[u8]) -> Self {
96        if bytes.len() < Self::size() {
97            panic!(
98                "value too small for DecimalArrayHeader, expected at least {} bytes, got {}",
99                Self::size(),
100                bytes.len()
101            );
102        }
103        Self {
104            arrow_type: bytes[0],
105            precision: bytes[1],
106            scale: bytes[2] as i8,
107            __padding: 0,
108            __reserved: 0,
109        }
110    }
111}
112
113/// Liquid decimal array stored as a compressed u64 primitive.
114#[derive(Debug)]
115pub struct LiquidDecimalArray {
116    meta: DecimalMeta,
117    bit_packed: BitPackedArray<UInt64Type>,
118    reference_value: u64,
119}
120
121impl LiquidDecimalArray {
122    pub(crate) fn fits_u64<T: DecimalType>(array: &PrimitiveArray<T>) -> bool
123    where
124        T::Native: ToPrimitive,
125    {
126        array.iter().flatten().all(|v| v.to_u64().is_some())
127    }
128
129    pub(crate) fn from_decimal_array<T: DecimalType>(array: &PrimitiveArray<T>) -> Self
130    where
131        T::Native: ToPrimitive,
132    {
133        debug_assert!(Self::fits_u64(array));
134        let meta = DecimalMeta::from_data_type(array.data_type());
135        if array.null_count() == array.len() {
136            return Self {
137                meta,
138                bit_packed: BitPackedArray::new_null_array(array.len()),
139                reference_value: 0,
140            };
141        }
142
143        let nulls = array.nulls().cloned();
144        let mut min = u64::MAX;
145        let mut max = 0u64;
146        let values: Vec<u64> = array
147            .iter()
148            .map(|v| match v {
149                Some(v) => {
150                    let value = v.to_u64().expect("decimal fits u64");
151                    if value < min {
152                        min = value;
153                    }
154                    if value > max {
155                        max = value;
156                    }
157                    value
158                }
159                None => 0,
160            })
161            .collect();
162
163        let bit_width = get_bit_width(max - min);
164        let offsets = ScalarBuffer::from_iter(values.iter().map(|v| v.saturating_sub(min)));
165        let unsigned_array = PrimitiveArray::<UInt64Type>::new(offsets, nulls);
166        let bit_packed = BitPackedArray::from_primitive(unsigned_array, bit_width);
167
168        Self {
169            meta,
170            bit_packed,
171            reference_value: min,
172        }
173    }
174
175    fn bit_pack_starting_loc() -> usize {
176        let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
177        (header_size + size_of::<u64>() + 7) & !7
178    }
179
180    fn to_u64_array(&self) -> PrimitiveArray<UInt64Type> {
181        let unsigned_array = self.bit_packed.to_primitive();
182        let (_data_type, values, _nulls) = unsigned_array.into_parts();
183        let nulls = self.bit_packed.nulls();
184        let values = if self.reference_value != 0 {
185            let reference_value = self.reference_value;
186            ScalarBuffer::from_iter(values.iter().map(|v| v.wrapping_add(reference_value)))
187        } else {
188            values
189        };
190        PrimitiveArray::<UInt64Type>::new(values, nulls.cloned())
191    }
192
193    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
194        let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
195        let mut result = Vec::with_capacity(Self::bit_pack_starting_loc() + 256);
196        result.resize(header_size, 0);
197
198        let logical_type_id = LiquidDataType::Decimal as u16;
199        let physical_type_id = get_physical_type_id::<UInt64Type>();
200        let ipc_header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
201        result[0..LiquidIPCHeader::size()].copy_from_slice(&ipc_header.to_bytes());
202
203        let decimal_header = DecimalArrayHeader::from_meta(self.meta);
204        result[LiquidIPCHeader::size()..header_size].copy_from_slice(&decimal_header.to_bytes());
205
206        result.extend_from_slice(&self.reference_value.to_le_bytes());
207        while result.len() < Self::bit_pack_starting_loc() {
208            result.push(0);
209        }
210        self.bit_packed.to_bytes(&mut result);
211        result
212    }
213
214    pub(crate) fn from_bytes(bytes: Bytes) -> Self {
215        let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
216        let header = LiquidIPCHeader::from_bytes(&bytes);
217
218        assert_eq!(header.logical_type_id, LiquidDataType::Decimal as u16);
219        assert_eq!(
220            header.physical_type_id,
221            get_physical_type_id::<UInt64Type>()
222        );
223
224        let decimal_header =
225            DecimalArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
226        let meta = DecimalMeta {
227            precision: decimal_header.precision,
228            scale: decimal_header.scale,
229            is_256: match decimal_header.arrow_type {
230                0 => false,
231                1 => true,
232                _ => panic!(
233                    "unsupported decimal type code: {}",
234                    decimal_header.arrow_type
235                ),
236            },
237        };
238
239        let ref_start = header_size;
240        let ref_end = ref_start + size_of::<u64>();
241        let reference_value = u64::from_le_bytes(bytes[ref_start..ref_end].try_into().unwrap());
242
243        let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
244        let bit_packed = BitPackedArray::<UInt64Type>::from_bytes(bit_packed_data);
245
246        Self {
247            meta,
248            bit_packed,
249            reference_value,
250        }
251    }
252}
253
254impl LiquidArray for LiquidDecimalArray {
255    fn as_any(&self) -> &dyn Any {
256        self
257    }
258
259    fn get_array_memory_size(&self) -> usize {
260        self.bit_packed.get_array_memory_size() + size_of::<u64>() + size_of::<DecimalMeta>()
261    }
262
263    fn len(&self) -> usize {
264        self.bit_packed.len()
265    }
266
267    fn to_arrow_array(&self) -> ArrayRef {
268        let u64_array = self.to_u64_array();
269        let (_data_type, values, nulls) = u64_array.into_parts();
270        let data_type = self.meta.data_type();
271        if self.meta.is_256 {
272            let values_i256 =
273                ScalarBuffer::from_iter(values.iter().map(|v| i256::from_i128(*v as i128)));
274            let array = PrimitiveArray::<Decimal256Type>::new(values_i256, nulls);
275            Arc::new(array.with_data_type(data_type))
276        } else {
277            let values_i128 = ScalarBuffer::from_iter(values.iter().map(|v| *v as i128));
278            let array = PrimitiveArray::<Decimal128Type>::new(values_i128, nulls);
279            Arc::new(array.with_data_type(data_type))
280        }
281    }
282
283    fn original_arrow_data_type(&self) -> DataType {
284        self.meta.data_type()
285    }
286
287    fn to_bytes(&self) -> Vec<u8> {
288        self.to_bytes_inner()
289    }
290
291    fn data_type(&self) -> LiquidDataType {
292        LiquidDataType::Decimal
293    }
294
295    fn squeeze(
296        &self,
297        io: Arc<dyn SqueezeIoHandler>,
298        expression_hint: Option<&CacheExpression>,
299    ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
300        let _expression_hint = expression_hint?;
301        let full_bytes = Bytes::from(self.to_bytes_inner());
302        let disk_range = 0u64..(full_bytes.len() as u64);
303
304        let orig_bw = self.bit_packed.bit_width()?;
305        if orig_bw.get() < 8 {
306            return None;
307        }
308
309        let new_bw_u8 = NonZero::new((orig_bw.get() / 2).max(1)).unwrap();
310        let unsigned_array = self.bit_packed.to_primitive();
311        let (_dt, values, nulls) = unsigned_array.into_parts();
312
313        let max_offset = values.iter().copied().max().unwrap_or(0);
314        let bucket_count_u64 = 1u64 << (new_bw_u8.get() as u64);
315        let range_size = max_offset.saturating_add(1);
316        let bucket_width_u64 = (range_size.div_ceil(bucket_count_u64)).max(1);
317
318        let quantized_values: ScalarBuffer<u64> =
319            ScalarBuffer::from_iter(values.iter().map(|&v| {
320                let mut idx_u64 = v / bucket_width_u64;
321                if idx_u64 >= bucket_count_u64 {
322                    idx_u64 = bucket_count_u64 - 1;
323                }
324                idx_u64
325            }));
326        let quantized_unsigned = PrimitiveArray::<UInt64Type>::new(quantized_values, nulls);
327        let quantized_bitpacked = BitPackedArray::from_primitive(quantized_unsigned, new_bw_u8);
328
329        let hybrid = LiquidDecimalQuantizedArray {
330            quantized: quantized_bitpacked,
331            reference_value: self.reference_value,
332            bucket_width: bucket_width_u64,
333            disk_range,
334            io,
335            meta: self.meta,
336        };
337        Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
338    }
339}
340
341#[derive(Debug, Clone)]
342pub(crate) struct LiquidDecimalQuantizedArray {
343    quantized: BitPackedArray<UInt64Type>,
344    reference_value: u64,
345    bucket_width: u64,
346    disk_range: std::ops::Range<u64>,
347    io: Arc<dyn SqueezeIoHandler>,
348    meta: DecimalMeta,
349}
350
351impl LiquidDecimalQuantizedArray {
352    fn len(&self) -> usize {
353        self.quantized.len()
354    }
355
356    fn new_from_filtered(&self, filtered: PrimitiveArray<UInt64Type>) -> Self {
357        let bit_width = self
358            .quantized
359            .bit_width()
360            .expect("quantized bit width must exist");
361        let quantized = BitPackedArray::from_primitive(filtered, bit_width);
362        Self {
363            quantized,
364            reference_value: self.reference_value,
365            bucket_width: self.bucket_width,
366            disk_range: self.disk_range.clone(),
367            io: self.io.clone(),
368            meta: self.meta,
369        }
370    }
371
372    fn filter_inner(&self, selection: &BooleanBuffer) -> Self {
373        let q_prim: PrimitiveArray<UInt64Type> = self.quantized.to_primitive();
374        let selection = BooleanArray::new(selection.clone(), None);
375        let filtered = arrow::compute::kernels::filter::filter(&q_prim, &selection).unwrap();
376        let filtered = filtered.as_primitive::<UInt64Type>().clone();
377        self.new_from_filtered(filtered)
378    }
379
380    async fn hydrate_full_arrow(&self) -> ArrayRef {
381        let bytes = self
382            .io
383            .read(Some(self.disk_range.clone()))
384            .await
385            .expect("read squeezed backing");
386        let liquid = crate::liquid_array::ipc::read_from_bytes(
387            bytes,
388            &crate::liquid_array::ipc::LiquidIPCContext::new(None),
389        );
390        liquid.to_arrow_array()
391    }
392
393    fn literal_to_u64(&self, literal: &Literal) -> Option<u64> {
394        use datafusion::common::ScalarValue;
395        match literal.value() {
396            ScalarValue::Decimal128(Some(v), _precision, scale) => {
397                if *scale != self.meta.scale {
398                    return None;
399                }
400                v.to_u64()
401            }
402            ScalarValue::Decimal256(Some(v), _precision, scale) => {
403                if *scale != self.meta.scale {
404                    return None;
405                }
406                v.to_u64()
407            }
408            _ => None,
409        }
410    }
411
412    fn try_eval_predicate_inner(
413        &self,
414        op: &Operator,
415        literal: &Literal,
416    ) -> SqueezeResult<Option<BooleanArray>> {
417        let k = match self.literal_to_u64(literal) {
418            Some(k) => k,
419            None => return Ok(None),
420        };
421
422        let q_prim = self.quantized.to_primitive();
423        let (_dt, values, _nulls) = q_prim.into_parts();
424        let nulls_opt = self.quantized.nulls();
425
426        let mut out_vals: Vec<bool> = Vec::with_capacity(values.len());
427
428        let push_const_for_below = |op: &Operator| -> bool {
429            match op {
430                Operator::Eq => false,
431                Operator::NotEq => true,
432                Operator::Lt => false,
433                Operator::LtEq => false,
434                Operator::Gt => true,
435                Operator::GtEq => true,
436            }
437        };
438
439        if k < self.reference_value {
440            let const_val = push_const_for_below(op);
441            if let Some(n) = nulls_opt {
442                for (i, _b) in values.iter().enumerate() {
443                    out_vals.push(n.is_valid(i) && const_val);
444                }
445            } else {
446                out_vals.resize(values.len(), const_val);
447            }
448        } else {
449            let rel = k - self.reference_value;
450            let bw = self.bucket_width;
451            let q = rel / bw;
452            let r = rel % bw;
453
454            let less_side: bool = matches!(
455                op,
456                Operator::Eq | Operator::NotEq | Operator::Lt | Operator::LtEq
457            );
458            let greater_side: bool = matches!(op, Operator::NotEq | Operator::Gt | Operator::GtEq);
459            let on_equal_bucket = |r: u64, bw: u64| -> Option<bool> {
460                match op {
461                    Operator::Eq | Operator::NotEq => None,
462                    Operator::Lt => (r == 0).then_some(false),
463                    Operator::LtEq => (r + 1 == bw).then_some(true),
464                    Operator::Gt => (r + 1 == bw).then_some(false),
465                    Operator::GtEq => (r == 0).then_some(true),
466                }
467            };
468
469            if let Some(n) = nulls_opt {
470                for (i, &b) in values.iter().enumerate() {
471                    if !n.is_valid(i) {
472                        out_vals.push(false);
473                        continue;
474                    }
475                    let v = if b < q {
476                        less_side
477                    } else if b > q {
478                        greater_side
479                    } else {
480                        match on_equal_bucket(r, bw) {
481                            Some(val) => val,
482                            None => return Err(NeedsBacking),
483                        }
484                    };
485                    out_vals.push(v);
486                }
487            } else {
488                for &b in values.iter() {
489                    let v = if b < q {
490                        less_side
491                    } else if b > q {
492                        greater_side
493                    } else {
494                        match on_equal_bucket(r, bw) {
495                            Some(val) => val,
496                            None => return Err(NeedsBacking),
497                        }
498                    };
499                    out_vals.push(v);
500                }
501            }
502        }
503
504        let bool_buf = BooleanBuffer::from_iter(out_vals);
505        let out = BooleanArray::new(bool_buf, self.quantized.nulls().cloned());
506        Ok(Some(out))
507    }
508}
509
510#[async_trait::async_trait]
511impl LiquidSqueezedArray for LiquidDecimalQuantizedArray {
512    fn as_any(&self) -> &dyn Any {
513        self
514    }
515
516    fn get_array_memory_size(&self) -> usize {
517        self.quantized.get_array_memory_size() + size_of::<u64>() + size_of::<DecimalMeta>()
518    }
519
520    fn len(&self) -> usize {
521        LiquidDecimalQuantizedArray::len(self)
522    }
523
524    async fn to_arrow_array(&self) -> ArrayRef {
525        self.hydrate_full_arrow().await
526    }
527
528    fn data_type(&self) -> LiquidDataType {
529        LiquidDataType::Decimal
530    }
531
532    fn original_arrow_data_type(&self) -> DataType {
533        self.meta.data_type()
534    }
535
536    async fn try_eval_predicate(
537        &self,
538        expr: &Arc<dyn PhysicalExpr>,
539        filter: &BooleanBuffer,
540    ) -> Option<BooleanArray> {
541        let filtered = self.filter_inner(filter);
542
543        let expr = unwrap_dynamic_filter(expr)?;
544        let binary_expr = expr.as_any().downcast_ref::<BinaryExpr>()?;
545        binary_expr.left().as_any().downcast_ref::<Column>()?;
546
547        let literal = binary_expr.right().as_any().downcast_ref::<Literal>()?;
548
549        let op = Operator::from_datafusion(binary_expr.op())?;
550        match filtered.try_eval_predicate_inner(&op, literal) {
551            Ok(Some(mask)) => {
552                self.io.trace_io_saved();
553                return Some(mask);
554            }
555            Ok(None) => return None,
556            Err(NeedsBacking) => {}
557        }
558
559        use arrow::array::cast::AsArray;
560        use datafusion::logical_expr::ColumnarValue;
561        use datafusion::physical_expr_common::datum::apply_cmp;
562
563        let full = self.hydrate_full_arrow().await;
564        let selection_array = BooleanArray::new(filter.clone(), None);
565        let filtered_arr = arrow::compute::filter(&full, &selection_array).ok()?;
566        let filtered_len = filtered_arr.len();
567
568        let lhs = ColumnarValue::Array(filtered_arr);
569        let rhs = ColumnarValue::Scalar(literal.value().clone());
570        let result = match binary_expr.op() {
571            datafusion::logical_expr::Operator::NotEq => {
572                apply_cmp(datafusion::logical_expr::Operator::NotEq, &lhs, &rhs)
573            }
574            datafusion::logical_expr::Operator::Eq => {
575                apply_cmp(datafusion::logical_expr::Operator::Eq, &lhs, &rhs)
576            }
577            datafusion::logical_expr::Operator::Lt => {
578                apply_cmp(datafusion::logical_expr::Operator::Lt, &lhs, &rhs)
579            }
580            datafusion::logical_expr::Operator::LtEq => {
581                apply_cmp(datafusion::logical_expr::Operator::LtEq, &lhs, &rhs)
582            }
583            datafusion::logical_expr::Operator::Gt => {
584                apply_cmp(datafusion::logical_expr::Operator::Gt, &lhs, &rhs)
585            }
586            datafusion::logical_expr::Operator::GtEq => {
587                apply_cmp(datafusion::logical_expr::Operator::GtEq, &lhs, &rhs)
588            }
589            _ => return None,
590        };
591        let result = result.ok()?;
592        Some(result.into_array(filtered_len).ok()?.as_boolean().clone())
593    }
594}
595
596fn unwrap_dynamic_filter(expr: &Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>> {
597    if let Some(dynamic_filter) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
598        dynamic_filter.current().ok()
599    } else {
600        Some(expr.clone())
601    }
602}
603
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{CacheExpression, TestSqueezeIo};
    use arrow::array::Decimal128Builder;
    use arrow::buffer::BooleanBuffer;
    use datafusion::logical_expr::Operator as DFOperator;
    use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
    use datafusion::scalar::ScalarValue;
    use futures::executor::block_on;
    use std::sync::Arc;

    /// Builds a `Decimal128` array from optional raw values (`None` is a null row).
    fn decimal128(
        values: &[Option<i128>],
        precision: u8,
        scale: i8,
    ) -> PrimitiveArray<Decimal128Type> {
        let mut builder = Decimal128Builder::new();
        for value in values {
            builder.append_option(*value);
        }
        builder
            .finish()
            .with_precision_and_scale(precision, scale)
            .unwrap()
    }

    /// In-memory compression followed by decompression returns the input, nulls included.
    #[test]
    fn decimal_u64_roundtrip() {
        let original = decimal128(&[Some(100), None, Some(250)], 10, 2);

        let restored = LiquidDecimalArray::from_decimal_array(&original).to_arrow_array();
        assert_eq!(restored.as_ref(), &original);
    }

    /// Serializing to bytes and decoding again returns the input.
    #[test]
    fn decimal_u64_ipc_roundtrip() {
        let original = decimal128(&[Some(12345), Some(67890)], 12, 3);

        let encoded = LiquidDecimalArray::from_decimal_array(&original).to_bytes();
        let restored = LiquidDecimalArray::from_bytes(encoded.into()).to_arrow_array();
        assert_eq!(restored.as_ref(), &original);
    }

    /// `col >= 100` is answered from the bucket indices alone, without any read.
    #[test]
    fn decimal_quantized_predicate_eval() {
        let original = decimal128(&[Some(100), Some(200), None, Some(300)], 10, 2);

        let io = Arc::new(TestSqueezeIo::default());
        let hint = CacheExpression::PredicateColumn;
        let (hybrid, bytes) = LiquidDecimalArray::from_decimal_array(&original)
            .squeeze(io.clone(), Some(&hint))
            .expect("squeezable");
        io.set_bytes(bytes);

        let column = Arc::new(Column::new("col", 0));
        let literal = Arc::new(Literal::new(ScalarValue::Decimal128(Some(100_i128), 10, 2)));
        let expr: Arc<dyn PhysicalExpr> =
            Arc::new(BinaryExpr::new(column, DFOperator::GtEq, literal));
        let select_all = BooleanBuffer::new_set(original.len());

        let got = block_on(hybrid.try_eval_predicate(&expr, &select_all)).expect("supported");
        let expected = BooleanArray::from(vec![Some(true), Some(true), None, Some(true)]);
        assert_eq!(got, expected);
        assert_eq!(io.reads(), 0);
    }
}