Skip to main content

liquid_cache/liquid_array/
decimal_array.rs

1use std::any::Any;
2use std::mem::size_of;
3use std::num::NonZero;
4use std::sync::Arc;
5
6use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray};
7use arrow::buffer::{BooleanBuffer, ScalarBuffer};
8use arrow::datatypes::{Decimal128Type, Decimal256Type, DecimalType, UInt64Type, i256};
9use arrow_schema::DataType;
10use bytes::Bytes;
11use datafusion_common::ScalarValue;
12use datafusion_expr_common::columnar_value::ColumnarValue;
13use datafusion_expr_common::operator::Operator as DFOperator;
14use datafusion_physical_expr::PhysicalExpr;
15use datafusion_physical_expr::expressions::{
16    BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal,
17};
18use datafusion_physical_expr_common::datum::apply_cmp;
19use num_traits::ToPrimitive;
20
21use super::{
22    LiquidArray, LiquidDataType, LiquidSqueezedArray, LiquidSqueezedArrayRef, NeedsBacking,
23    Operator, SqueezeIoHandler, SqueezeResult, SqueezedBacking,
24};
25use crate::cache::{CacheExpression, LiquidExpr};
26use crate::liquid_array::eval_predicate_on_array;
27use crate::liquid_array::ipc::{LiquidIPCHeader, get_physical_type_id};
28use crate::liquid_array::raw::BitPackedArray;
29use crate::utils::get_bit_width;
30
31#[derive(Debug, Clone, Copy)]
32struct DecimalMeta {
33    precision: u8,
34    scale: i8,
35    is_256: bool,
36}
37
38impl DecimalMeta {
39    fn from_data_type(data_type: &DataType) -> Self {
40        match data_type {
41            DataType::Decimal128(precision, scale) => Self {
42                precision: *precision,
43                scale: *scale,
44                is_256: false,
45            },
46            DataType::Decimal256(precision, scale) => Self {
47                precision: *precision,
48                scale: *scale,
49                is_256: true,
50            },
51            _ => panic!("unsupported decimal data type: {data_type:?}"),
52        }
53    }
54
55    fn data_type(&self) -> DataType {
56        if self.is_256 {
57            DataType::Decimal256(self.precision, self.scale)
58        } else {
59            DataType::Decimal128(self.precision, self.scale)
60        }
61    }
62
63    fn arrow_code(&self) -> u8 {
64        if self.is_256 { 1 } else { 0 }
65    }
66}
67
68#[repr(C)]
69struct DecimalArrayHeader {
70    arrow_type: u8, // 0 for Decimal128, 1 for Decimal256
71    precision: u8,
72    scale: i8,
73    __padding: u8,
74    __reserved: u32,
75}
76
77impl DecimalArrayHeader {
78    const fn size() -> usize {
79        8
80    }
81
82    fn from_meta(meta: DecimalMeta) -> Self {
83        Self {
84            arrow_type: meta.arrow_code(),
85            precision: meta.precision,
86            scale: meta.scale,
87            __padding: 0,
88            __reserved: 0,
89        }
90    }
91
92    fn to_bytes(&self) -> [u8; Self::size()] {
93        let mut bytes = [0; Self::size()];
94        bytes[0] = self.arrow_type;
95        bytes[1] = self.precision;
96        bytes[2] = self.scale as u8;
97        bytes
98    }
99
100    fn from_bytes(bytes: &[u8]) -> Self {
101        if bytes.len() < Self::size() {
102            panic!(
103                "value too small for DecimalArrayHeader, expected at least {} bytes, got {}",
104                Self::size(),
105                bytes.len()
106            );
107        }
108        Self {
109            arrow_type: bytes[0],
110            precision: bytes[1],
111            scale: bytes[2] as i8,
112            __padding: 0,
113            __reserved: 0,
114        }
115    }
116}
117
118/// Liquid decimal array stored as a compressed u64 primitive.
119#[derive(Debug)]
120pub struct LiquidDecimalArray {
121    meta: DecimalMeta,
122    bit_packed: BitPackedArray<UInt64Type>,
123    reference_value: u64,
124}
125
126impl LiquidDecimalArray {
127    pub(crate) fn fits_u64<T: DecimalType>(array: &PrimitiveArray<T>) -> bool
128    where
129        T::Native: ToPrimitive,
130    {
131        array.iter().flatten().all(|v| v.to_u64().is_some())
132    }
133
134    pub(crate) fn from_decimal_array<T: DecimalType>(array: &PrimitiveArray<T>) -> Self
135    where
136        T::Native: ToPrimitive,
137    {
138        debug_assert!(Self::fits_u64(array));
139        let meta = DecimalMeta::from_data_type(array.data_type());
140        if array.null_count() == array.len() {
141            return Self {
142                meta,
143                bit_packed: BitPackedArray::new_null_array(array.len()),
144                reference_value: 0,
145            };
146        }
147
148        let nulls = array.nulls().cloned();
149        let mut min = u64::MAX;
150        let mut max = 0u64;
151        let values: Vec<u64> = array
152            .iter()
153            .map(|v| match v {
154                Some(v) => {
155                    let value = v.to_u64().expect("decimal fits u64");
156                    if value < min {
157                        min = value;
158                    }
159                    if value > max {
160                        max = value;
161                    }
162                    value
163                }
164                None => 0,
165            })
166            .collect();
167
168        let bit_width = get_bit_width(max - min);
169        let offsets = ScalarBuffer::from_iter(values.iter().map(|v| v.saturating_sub(min)));
170        let unsigned_array = PrimitiveArray::<UInt64Type>::new(offsets, nulls);
171        let bit_packed = BitPackedArray::from_primitive(unsigned_array, bit_width);
172
173        Self {
174            meta,
175            bit_packed,
176            reference_value: min,
177        }
178    }
179
180    fn bit_pack_starting_loc() -> usize {
181        let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
182        (header_size + size_of::<u64>() + 7) & !7
183    }
184
185    fn to_u64_array(&self) -> PrimitiveArray<UInt64Type> {
186        let unsigned_array = self.bit_packed.to_primitive();
187        let (_data_type, values, _nulls) = unsigned_array.into_parts();
188        let nulls = self.bit_packed.nulls();
189        let values = if self.reference_value != 0 {
190            let reference_value = self.reference_value;
191            ScalarBuffer::from_iter(values.iter().map(|v| v.wrapping_add(reference_value)))
192        } else {
193            values
194        };
195        PrimitiveArray::<UInt64Type>::new(values, nulls.cloned())
196    }
197
198    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
199        let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
200        let mut result = Vec::with_capacity(Self::bit_pack_starting_loc() + 256);
201        result.resize(header_size, 0);
202
203        let logical_type_id = LiquidDataType::Decimal as u16;
204        let physical_type_id = get_physical_type_id::<UInt64Type>();
205        let ipc_header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
206        result[0..LiquidIPCHeader::size()].copy_from_slice(&ipc_header.to_bytes());
207
208        let decimal_header = DecimalArrayHeader::from_meta(self.meta);
209        result[LiquidIPCHeader::size()..header_size].copy_from_slice(&decimal_header.to_bytes());
210
211        result.extend_from_slice(&self.reference_value.to_le_bytes());
212        while result.len() < Self::bit_pack_starting_loc() {
213            result.push(0);
214        }
215        self.bit_packed.to_bytes(&mut result);
216        result
217    }
218
219    pub(crate) fn from_bytes(bytes: Bytes) -> Self {
220        let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
221        let header = LiquidIPCHeader::from_bytes(&bytes);
222
223        assert_eq!(header.logical_type_id, LiquidDataType::Decimal as u16);
224        assert_eq!(
225            header.physical_type_id,
226            get_physical_type_id::<UInt64Type>()
227        );
228
229        let decimal_header =
230            DecimalArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
231        let meta = DecimalMeta {
232            precision: decimal_header.precision,
233            scale: decimal_header.scale,
234            is_256: match decimal_header.arrow_type {
235                0 => false,
236                1 => true,
237                _ => panic!(
238                    "unsupported decimal type code: {}",
239                    decimal_header.arrow_type
240                ),
241            },
242        };
243
244        let ref_start = header_size;
245        let ref_end = ref_start + size_of::<u64>();
246        let reference_value = u64::from_le_bytes(bytes[ref_start..ref_end].try_into().unwrap());
247
248        let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
249        let bit_packed = BitPackedArray::<UInt64Type>::from_bytes(bit_packed_data);
250
251        Self {
252            meta,
253            bit_packed,
254            reference_value,
255        }
256    }
257}
258
259impl LiquidArray for LiquidDecimalArray {
260    fn as_any(&self) -> &dyn Any {
261        self
262    }
263
264    fn get_array_memory_size(&self) -> usize {
265        self.bit_packed.get_array_memory_size() + size_of::<u64>() + size_of::<DecimalMeta>()
266    }
267
268    fn len(&self) -> usize {
269        self.bit_packed.len()
270    }
271
272    fn to_arrow_array(&self) -> ArrayRef {
273        let u64_array = self.to_u64_array();
274        let (_data_type, values, nulls) = u64_array.into_parts();
275        let data_type = self.meta.data_type();
276        if self.meta.is_256 {
277            let values_i256 =
278                ScalarBuffer::from_iter(values.iter().map(|v| i256::from_i128(*v as i128)));
279            let array = PrimitiveArray::<Decimal256Type>::new(values_i256, nulls);
280            Arc::new(array.with_data_type(data_type))
281        } else {
282            let values_i128 = ScalarBuffer::from_iter(values.iter().map(|v| *v as i128));
283            let array = PrimitiveArray::<Decimal128Type>::new(values_i128, nulls);
284            Arc::new(array.with_data_type(data_type))
285        }
286    }
287
288    fn original_arrow_data_type(&self) -> DataType {
289        self.meta.data_type()
290    }
291
292    fn to_bytes(&self) -> Vec<u8> {
293        self.to_bytes_inner()
294    }
295
296    fn data_type(&self) -> LiquidDataType {
297        LiquidDataType::Decimal
298    }
299
300    fn squeeze(
301        &self,
302        io: Arc<dyn SqueezeIoHandler>,
303        expression_hint: Option<&CacheExpression>,
304    ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
305        let _expression_hint = expression_hint?;
306        let full_bytes = Bytes::from(self.to_bytes_inner());
307        let disk_range = 0u64..(full_bytes.len() as u64);
308
309        let orig_bw = self.bit_packed.bit_width()?;
310        if orig_bw.get() < 8 {
311            return None;
312        }
313
314        let new_bw_u8 = NonZero::new((orig_bw.get() / 2).max(1)).unwrap();
315        let unsigned_array = self.bit_packed.to_primitive();
316        let (_dt, values, nulls) = unsigned_array.into_parts();
317
318        let max_offset = values.iter().copied().max().unwrap_or(0);
319        let bucket_count_u64 = 1u64 << (new_bw_u8.get() as u64);
320        let range_size = max_offset.saturating_add(1);
321        let bucket_width_u64 = (range_size.div_ceil(bucket_count_u64)).max(1);
322
323        let quantized_values: ScalarBuffer<u64> =
324            ScalarBuffer::from_iter(values.iter().map(|&v| {
325                let mut idx_u64 = v / bucket_width_u64;
326                if idx_u64 >= bucket_count_u64 {
327                    idx_u64 = bucket_count_u64 - 1;
328                }
329                idx_u64
330            }));
331        let quantized_unsigned = PrimitiveArray::<UInt64Type>::new(quantized_values, nulls);
332        let quantized_bitpacked = BitPackedArray::from_primitive(quantized_unsigned, new_bw_u8);
333
334        let hybrid = LiquidDecimalQuantizedArray {
335            quantized: quantized_bitpacked,
336            reference_value: self.reference_value,
337            bucket_width: bucket_width_u64,
338            disk_range,
339            io,
340            meta: self.meta,
341        };
342        Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
343    }
344}
345
346#[derive(Debug, Clone)]
347pub(crate) struct LiquidDecimalQuantizedArray {
348    quantized: BitPackedArray<UInt64Type>,
349    reference_value: u64,
350    bucket_width: u64,
351    disk_range: std::ops::Range<u64>,
352    io: Arc<dyn SqueezeIoHandler>,
353    meta: DecimalMeta,
354}
355
356impl LiquidDecimalQuantizedArray {
357    fn len(&self) -> usize {
358        self.quantized.len()
359    }
360
361    fn new_from_filtered(&self, filtered: PrimitiveArray<UInt64Type>) -> Self {
362        let bit_width = self
363            .quantized
364            .bit_width()
365            .expect("quantized bit width must exist");
366        let quantized = BitPackedArray::from_primitive(filtered, bit_width);
367        Self {
368            quantized,
369            reference_value: self.reference_value,
370            bucket_width: self.bucket_width,
371            disk_range: self.disk_range.clone(),
372            io: self.io.clone(),
373            meta: self.meta,
374        }
375    }
376
377    fn filter_inner(&self, selection: &BooleanBuffer) -> Self {
378        let q_prim: PrimitiveArray<UInt64Type> = self.quantized.to_primitive();
379        let selection = BooleanArray::new(selection.clone(), None);
380        let filtered = arrow::compute::kernels::filter::filter(&q_prim, &selection).unwrap();
381        let filtered = filtered.as_primitive::<UInt64Type>().clone();
382        self.new_from_filtered(filtered)
383    }
384
385    async fn hydrate_full_arrow(&self) -> ArrayRef {
386        let bytes = self
387            .io
388            .read(Some(self.disk_range.clone()))
389            .await
390            .expect("read squeezed backing");
391        let liquid = crate::liquid_array::ipc::read_from_bytes(
392            bytes,
393            &crate::liquid_array::ipc::LiquidIPCContext::new(None),
394        );
395        liquid.to_arrow_array()
396    }
397
398    fn literal_to_u64(&self, literal: &Literal) -> Option<u64> {
399        match literal.value() {
400            ScalarValue::Decimal128(Some(v), _precision, scale) => {
401                if *scale != self.meta.scale {
402                    return None;
403                }
404                v.to_u64()
405            }
406            ScalarValue::Decimal256(Some(v), _precision, scale) => {
407                if *scale != self.meta.scale {
408                    return None;
409                }
410                v.to_u64()
411            }
412            _ => None,
413        }
414    }
415
416    fn try_eval_predicate_inner(
417        &self,
418        op: &Operator,
419        literal: &Literal,
420    ) -> SqueezeResult<Option<BooleanArray>> {
421        let k = match self.literal_to_u64(literal) {
422            Some(k) => k,
423            None => return Ok(None),
424        };
425
426        let q_prim = self.quantized.to_primitive();
427        let (_dt, values, _nulls) = q_prim.into_parts();
428        let nulls_opt = self.quantized.nulls();
429
430        let mut out_vals: Vec<bool> = Vec::with_capacity(values.len());
431
432        let push_const_for_below = |op: &Operator| -> bool {
433            match op {
434                Operator::Eq => false,
435                Operator::NotEq => true,
436                Operator::Lt => false,
437                Operator::LtEq => false,
438                Operator::Gt => true,
439                Operator::GtEq => true,
440            }
441        };
442
443        if k < self.reference_value {
444            let const_val = push_const_for_below(op);
445            if let Some(n) = nulls_opt {
446                for (i, _b) in values.iter().enumerate() {
447                    out_vals.push(n.is_valid(i) && const_val);
448                }
449            } else {
450                out_vals.resize(values.len(), const_val);
451            }
452        } else {
453            let rel = k - self.reference_value;
454            let bw = self.bucket_width;
455            let q = rel / bw;
456            let r = rel % bw;
457
458            let less_side: bool = matches!(
459                op,
460                Operator::Eq | Operator::NotEq | Operator::Lt | Operator::LtEq
461            );
462            let greater_side: bool = matches!(op, Operator::NotEq | Operator::Gt | Operator::GtEq);
463            let on_equal_bucket = |r: u64, bw: u64| -> Option<bool> {
464                match op {
465                    Operator::Eq | Operator::NotEq => None,
466                    Operator::Lt => (r == 0).then_some(false),
467                    Operator::LtEq => (r + 1 == bw).then_some(true),
468                    Operator::Gt => (r + 1 == bw).then_some(false),
469                    Operator::GtEq => (r == 0).then_some(true),
470                }
471            };
472
473            if let Some(n) = nulls_opt {
474                for (i, &b) in values.iter().enumerate() {
475                    if !n.is_valid(i) {
476                        out_vals.push(false);
477                        continue;
478                    }
479                    let v = if b < q {
480                        less_side
481                    } else if b > q {
482                        greater_side
483                    } else {
484                        match on_equal_bucket(r, bw) {
485                            Some(val) => val,
486                            None => return Err(NeedsBacking),
487                        }
488                    };
489                    out_vals.push(v);
490                }
491            } else {
492                for &b in values.iter() {
493                    let v = if b < q {
494                        less_side
495                    } else if b > q {
496                        greater_side
497                    } else {
498                        match on_equal_bucket(r, bw) {
499                            Some(val) => val,
500                            None => return Err(NeedsBacking),
501                        }
502                    };
503                    out_vals.push(v);
504                }
505            }
506        }
507
508        let bool_buf = BooleanBuffer::from_iter(out_vals);
509        let out = BooleanArray::new(bool_buf, self.quantized.nulls().cloned());
510        Ok(Some(out))
511    }
512}
513
514#[async_trait::async_trait]
515impl LiquidSqueezedArray for LiquidDecimalQuantizedArray {
516    fn as_any(&self) -> &dyn Any {
517        self
518    }
519
520    fn get_array_memory_size(&self) -> usize {
521        self.quantized.get_array_memory_size() + size_of::<u64>() + size_of::<DecimalMeta>()
522    }
523
524    fn len(&self) -> usize {
525        LiquidDecimalQuantizedArray::len(self)
526    }
527
528    async fn to_arrow_array(&self) -> ArrayRef {
529        self.hydrate_full_arrow().await
530    }
531
532    fn data_type(&self) -> LiquidDataType {
533        LiquidDataType::Decimal
534    }
535
536    fn original_arrow_data_type(&self) -> DataType {
537        self.meta.data_type()
538    }
539
540    fn disk_backing(&self) -> SqueezedBacking {
541        SqueezedBacking::Liquid((self.disk_range.end - self.disk_range.start) as usize)
542    }
543
544    async fn try_eval_predicate(
545        &self,
546        liquid_expr: &LiquidExpr,
547        filter: &BooleanBuffer,
548    ) -> BooleanArray {
549        let filtered = self.filter_inner(filter);
550
551        let expr = if let Some(expr) = unwrap_dynamic_filter(liquid_expr.physical_expr()) {
552            expr
553        } else {
554            let fallback = self.filter(filter).await;
555            return eval_predicate_on_array(fallback, liquid_expr);
556        };
557        let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>() else {
558            let fallback = self.filter(filter).await;
559            return eval_predicate_on_array(fallback, liquid_expr);
560        };
561        if binary_expr
562            .left()
563            .as_any()
564            .downcast_ref::<Column>()
565            .is_none()
566        {
567            let fallback = self.filter(filter).await;
568            return eval_predicate_on_array(fallback, liquid_expr);
569        }
570
571        let Some(literal) = binary_expr.right().as_any().downcast_ref::<Literal>() else {
572            let fallback = self.filter(filter).await;
573            return eval_predicate_on_array(fallback, liquid_expr);
574        };
575
576        let Some(op) = Operator::from_datafusion(binary_expr.op()) else {
577            let fallback = self.filter(filter).await;
578            return eval_predicate_on_array(fallback, liquid_expr);
579        };
580        match filtered.try_eval_predicate_inner(&op, literal) {
581            Ok(Some(mask)) => {
582                self.io.trace_io_saved();
583                return mask;
584            }
585            Ok(None) => {
586                let fallback = self.filter(filter).await;
587                return eval_predicate_on_array(fallback, liquid_expr);
588            }
589            Err(NeedsBacking) => {}
590        }
591
592        use arrow::array::cast::AsArray;
593
594        let full = self.hydrate_full_arrow().await;
595        let selection_array = BooleanArray::new(filter.clone(), None);
596        let filtered_arr = arrow::compute::filter(&full, &selection_array)
597            .expect("selection must match array length");
598        let filtered_len = filtered_arr.len();
599
600        let lhs = ColumnarValue::Array(filtered_arr);
601        let rhs = ColumnarValue::Scalar(literal.value().clone());
602        let result = match binary_expr.op() {
603            DFOperator::NotEq => apply_cmp(DFOperator::NotEq, &lhs, &rhs),
604            DFOperator::Eq => apply_cmp(DFOperator::Eq, &lhs, &rhs),
605            DFOperator::Lt => apply_cmp(DFOperator::Lt, &lhs, &rhs),
606            DFOperator::LtEq => apply_cmp(DFOperator::LtEq, &lhs, &rhs),
607            DFOperator::Gt => apply_cmp(DFOperator::Gt, &lhs, &rhs),
608            DFOperator::GtEq => apply_cmp(DFOperator::GtEq, &lhs, &rhs),
609            _ => {
610                let fallback = self.filter(filter).await;
611                return eval_predicate_on_array(fallback, liquid_expr);
612            }
613        };
614        let result = result.expect("validated LiquidExpr comparison must evaluate");
615        result
616            .into_array(filtered_len)
617            .expect("comparison output must be an array")
618            .as_boolean()
619            .clone()
620    }
621}
622
623fn unwrap_dynamic_filter(expr: &Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>> {
624    if let Some(dynamic_filter) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
625        dynamic_filter.current().ok()
626    } else {
627        Some(expr.clone())
628    }
629}
630
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{CacheExpression, TestSqueezeIo};
    use arrow::array::Decimal128Builder;
    use arrow::buffer::BooleanBuffer;
    use datafusion_common::ScalarValue;
    use datafusion_expr_common::operator::Operator as DFOperator;
    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
    use futures::executor::block_on;
    use std::sync::Arc;

    /// Builds a `Decimal128` array from optional raw values with the given precision and scale.
    fn decimal_array(
        values: &[Option<i128>],
        precision: u8,
        scale: i8,
    ) -> PrimitiveArray<Decimal128Type> {
        let mut builder = Decimal128Builder::new();
        for value in values {
            match value {
                Some(v) => builder.append_value(*v),
                None => builder.append_null(),
            }
        }
        builder
            .finish()
            .with_precision_and_scale(precision, scale)
            .unwrap()
    }

    /// Compressing and decoding keeps values and nulls intact.
    #[test]
    fn decimal_u64_roundtrip() {
        let original = decimal_array(&[Some(100), None, Some(250)], 10, 2);

        let decoded = LiquidDecimalArray::from_decimal_array(&original).to_arrow_array();
        assert_eq!(decoded.as_ref(), &original);
    }

    /// Serializing to bytes and reading them back yields the same array.
    #[test]
    fn decimal_u64_ipc_roundtrip() {
        let original = decimal_array(&[Some(12345), Some(67890)], 12, 3);

        let bytes = LiquidDecimalArray::from_decimal_array(&original).to_bytes();
        let decoded = LiquidDecimalArray::from_bytes(bytes.into());
        assert_eq!(decoded.to_arrow_array().as_ref(), &original);
    }

    /// `col >= 100` over a squeezed array is answered without reading the backing bytes.
    #[test]
    fn decimal_quantized_predicate_eval() {
        let original = decimal_array(&[Some(100), Some(200), None, Some(300)], 10, 2);

        let liquid = LiquidDecimalArray::from_decimal_array(&original);
        let hint = CacheExpression::PredicateColumn;
        let io = Arc::new(TestSqueezeIo::default());
        let (hybrid, bytes) = liquid.squeeze(io.clone(), Some(&hint)).expect("squeezable");
        io.set_bytes(bytes);

        let mask = BooleanBuffer::new_set(original.len());
        let col = Arc::new(Column::new("col", 0));
        let lit = Arc::new(Literal::new(ScalarValue::Decimal128(Some(100_i128), 10, 2)));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(col, DFOperator::GtEq, lit));

        let got = block_on(hybrid.try_eval_predicate(
            &crate::cache::LiquidExpr::new_unchecked(expr.clone()),
            &mask,
        ));
        let expected = BooleanArray::from(vec![Some(true), Some(true), None, Some(true)]);
        assert_eq!(got, expected);
        assert_eq!(io.reads(), 0);
    }
}
698}