Skip to main content

liquid_cache/liquid_array/
float_array.rs

1///
2/// Acknowledgement:
3/// The ALP compression implemented in this file is based on the Rust implementation available at https://github.com/spiraldb/alp
4///
5use std::{
6    any::Any,
7    fmt::Debug,
8    num::NonZero,
9    ops::{Mul, Shl, Shr},
10    sync::Arc,
11};
12
13use arrow::{
14    array::{
15        Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, AsArray, BooleanArray,
16        PrimitiveArray,
17    },
18    buffer::{BooleanBuffer, ScalarBuffer},
19    datatypes::{
20        ArrowNativeType, Float32Type, Float64Type, Int32Type, Int64Type, UInt32Type, UInt64Type,
21    },
22};
23use arrow_schema::DataType;
24use datafusion_common::ScalarValue;
25use datafusion_expr_common::columnar_value::ColumnarValue;
26use datafusion_expr_common::operator::Operator as DFOperator;
27use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
28use datafusion_physical_expr_common::datum::apply_cmp;
29use fastlanes::BitPacking;
30use num_traits::{AsPrimitive, Float, FromPrimitive};
31
32use super::LiquidDataType;
33use crate::cache::LiquidExpr;
34use crate::liquid_array::LiquidArray;
35use crate::liquid_array::ipc::{PhysicalTypeMarker, get_physical_type_id};
36use crate::liquid_array::raw::BitPackedArray;
37use crate::liquid_array::{
38    LiquidSqueezedArray, LiquidSqueezedArrayRef, NeedsBacking, Operator, SqueezeResult,
39    SqueezedBacking, eval_predicate_on_array, ipc::LiquidIPCHeader,
40};
41use crate::utils::get_bit_width;
42use crate::{cache::CacheExpression, liquid_array::SqueezeIoHandler};
43use bytes::Bytes;
44
45mod private {
46    use arrow::{
47        array::ArrowNumericType,
48        datatypes::{Float32Type, Float64Type},
49    };
50    use num_traits::AsPrimitive;
51
52    pub trait Sealed: ArrowNumericType<Native: AsPrimitive<f64> + AsPrimitive<f32>> {}
53
54    impl Sealed for Float32Type {}
55    impl Sealed for Float64Type {}
56}
57
/// Array length above which the exponent search works on a strided sample.
///
/// We use FastLanes to encode the array, so the sample size needs to be at
/// least 1024 to get a good estimate of the best exponents.
const NUM_SAMPLES: usize = 1024;
59
/// Strategy applied when a float array is squeezed into a smaller form.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum FloatSqueezePolicy {
    /// Quantize values into buckets (good for coarse filtering; requires disk to recover values).
    #[default]
    Quantize = 0,
}
66
67/// LiquidFloatType is a sealed trait that represents all the float types supported by Liquid.
68/// Implementors are Float32Type and Float64Type. TODO(): What about Float16Type, decimal types?
69pub trait LiquidFloatType:
70        ArrowPrimitiveType<
71            Native: AsPrimitive<
72                <Self::UnsignedIntType as ArrowPrimitiveType>::Native // Native must be convertible to the Native type of Self::UnSignedType
73            >
74            + AsPrimitive<<Self::SignedIntType as ArrowPrimitiveType>::Native>
75            + FromPrimitive
76            + AsPrimitive<<Self as ArrowPrimitiveType>::Native>
77            + Mul<<Self as ArrowPrimitiveType>::Native>
78            + Float // required for decode_single and encode_single_unchecked
79        >
80        + private::Sealed
81        + Debug
82        + PhysicalTypeMarker
83{
84    type UnsignedIntType:
85        ArrowPrimitiveType<
86            Native: BitPacking +
87                AsPrimitive<<Self as ArrowPrimitiveType>::Native>
88                + AsPrimitive<<Self::SignedIntType as ArrowPrimitiveType>::Native>
89                + AsPrimitive<u64>
90        >
91        + Debug;
92    type SignedIntType:
93        ArrowPrimitiveType<
94            Native: AsPrimitive<<Self as ArrowPrimitiveType>::Native>
95                + AsPrimitive<<Self::UnsignedIntType as ArrowPrimitiveType>::Native>
96                + Ord
97                + Shr<u8, Output = <Self::SignedIntType as ArrowPrimitiveType>::Native>
98                + Shl<u8, Output = <Self::SignedIntType as ArrowPrimitiveType>::Native>
99                + From<i32>
100        >
101        + Debug + Sync + Send;
102
103    const SWEET: <Self as ArrowPrimitiveType>::Native;
104    const MAX_EXPONENT: u8;
105    const FRACTIONAL_BITS: u8;
106    const F10: &'static [<Self as ArrowPrimitiveType>::Native];
107    const IF10: &'static [<Self as ArrowPrimitiveType>::Native];
108
109    #[inline]
110    fn fast_round(val: <Self as ArrowPrimitiveType>::Native) -> <Self::SignedIntType as ArrowPrimitiveType>::Native {
111        ((val + Self::SWEET) - Self::SWEET).as_()
112    }
113
114    #[inline]
115    fn encode_single_unchecked(val: &<Self as ArrowPrimitiveType>::Native, exp: &Exponents) -> <Self::SignedIntType as ArrowPrimitiveType>::Native {
116        Self::fast_round(*val * Self::F10[exp.e as usize] * Self::IF10[exp.f as usize])
117    }
118
119    #[inline]
120    fn decode_single(val: &<Self::SignedIntType as ArrowPrimitiveType>::Native, exp: &Exponents) -> <Self as ArrowPrimitiveType>::Native {
121        let decoded_float: <Self as ArrowPrimitiveType>::Native = (*val).as_();
122        decoded_float * Self::F10[exp.f as usize] * Self::IF10[exp.e as usize]
123    }
124
125}
126
127impl LiquidFloatType for Float32Type {
128    type UnsignedIntType = UInt32Type;
129    type SignedIntType = Int32Type;
130    const FRACTIONAL_BITS: u8 = 23;
131    const MAX_EXPONENT: u8 = 10;
132    const SWEET: <Self as ArrowPrimitiveType>::Native = (1 << Self::FRACTIONAL_BITS)
133        as <Self as ArrowPrimitiveType>::Native
134        + (1 << (Self::FRACTIONAL_BITS - 1)) as <Self as ArrowPrimitiveType>::Native;
135    const F10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
136        1.0,
137        10.0,
138        100.0,
139        1000.0,
140        10000.0,
141        100000.0,
142        1000000.0,
143        10000000.0,
144        100000000.0,
145        1000000000.0,
146        10000000000.0, // 10^10
147    ];
148    const IF10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
149        1.0,
150        0.1,
151        0.01,
152        0.001,
153        0.0001,
154        0.00001,
155        0.000001,
156        0.0000001,
157        0.00000001,
158        0.000000001,
159        0.0000000001, // 10^-10
160    ];
161}
162
163impl LiquidFloatType for Float64Type {
164    type UnsignedIntType = UInt64Type;
165    type SignedIntType = Int64Type;
166    const FRACTIONAL_BITS: u8 = 52;
167    const MAX_EXPONENT: u8 = 18;
168    const SWEET: <Self as ArrowPrimitiveType>::Native = (1u64 << Self::FRACTIONAL_BITS)
169        as <Self as ArrowPrimitiveType>::Native
170        + (1u64 << (Self::FRACTIONAL_BITS - 1)) as <Self as ArrowPrimitiveType>::Native;
171    const F10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
172        1.0,
173        10.0,
174        100.0,
175        1000.0,
176        10000.0,
177        100000.0,
178        1000000.0,
179        10000000.0,
180        100000000.0,
181        1000000000.0,
182        10000000000.0,
183        100000000000.0,
184        1000000000000.0,
185        10000000000000.0,
186        100000000000000.0,
187        1000000000000000.0,
188        10000000000000000.0,
189        100000000000000000.0,
190        1000000000000000000.0,
191        10000000000000000000.0,
192        100000000000000000000.0,
193        1000000000000000000000.0,
194        10000000000000000000000.0,
195        100000000000000000000000.0, // 10^23
196    ];
197
198    const IF10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
199        1.0,
200        0.1,
201        0.01,
202        0.001,
203        0.0001,
204        0.00001,
205        0.000001,
206        0.0000001,
207        0.00000001,
208        0.000000001,
209        0.0000000001,
210        0.00000000001,
211        0.000000000001,
212        0.0000000000001,
213        0.00000000000001,
214        0.000000000000001,
215        0.0000000000000001,
216        0.00000000000000001,
217        0.000000000000000001,
218        0.0000000000000000001,
219        0.00000000000000000001,
220        0.000000000000000000001,
221        0.0000000000000000000001,
222        0.00000000000000000000001, // 10^-23
223    ];
224}
225
226/// Liquid's single-precision floating point array
227pub type LiquidFloat32Array = LiquidFloatArray<Float32Type>;
228/// Liquid's double precision floating point array
229pub type LiquidFloat64Array = LiquidFloatArray<Float64Type>;
230
231/// An array that stores floats in ALP
232#[derive(Debug, Clone)]
233pub struct LiquidFloatArray<T: LiquidFloatType> {
234    exponent: Exponents,
235    bit_packed: BitPackedArray<T::UnsignedIntType>,
236    patch_indices: Vec<u64>,
237    patch_values: Vec<T::Native>,
238    reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native,
239    squeeze_policy: FloatSqueezePolicy,
240}
241
242impl<T> LiquidFloatArray<T>
243where
244    T: LiquidFloatType,
245{
246    /// Check if the Liquid float array is empty.
247    pub fn is_empty(&self) -> bool {
248        self.len() == 0
249    }
250
251    /// Get the length of the Liquid float array.
252    pub fn len(&self) -> usize {
253        self.bit_packed.len()
254    }
255
256    /// Get the memory size of the Liquid primitive array.
257    pub fn get_array_memory_size(&self) -> usize {
258        self.bit_packed.get_array_memory_size()
259            + size_of::<Exponents>()
260            + self.patch_indices.capacity() * size_of::<u64>()
261            + self.patch_values.capacity() * size_of::<T::Native>()
262            + size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>()
263    }
264
265    /// Create a Liquid primitive array from an Arrow float array.
266    pub fn from_arrow_array(arrow_array: arrow::array::PrimitiveArray<T>) -> LiquidFloatArray<T> {
267        let best_exponents = get_best_exponents::<T>(&arrow_array);
268        encode_arrow_array(&arrow_array, &best_exponents)
269    }
270
271    /// Get current squeeze policy for this array
272    pub fn squeeze_policy(&self) -> FloatSqueezePolicy {
273        self.squeeze_policy
274    }
275}
276
277impl<T> LiquidArray for LiquidFloatArray<T>
278where
279    T: LiquidFloatType,
280{
281    fn as_any(&self) -> &dyn Any {
282        self
283    }
284
285    fn get_array_memory_size(&self) -> usize {
286        self.get_array_memory_size()
287    }
288
289    fn len(&self) -> usize {
290        self.len()
291    }
292
293    #[inline]
294    fn to_arrow_array(&self) -> ArrayRef {
295        let unsigned_array = self.bit_packed.to_primitive();
296        let (_data_type, values, _nulls) = unsigned_array.into_parts();
297        let nulls = self.bit_packed.nulls();
298        // TODO(): Check if we should align vectors to cache line boundary
299        let mut decoded_values = Vec::from_iter(values.iter().map(|v| {
300            let mut val: <T::SignedIntType as ArrowPrimitiveType>::Native = (*v).as_();
301            val = val.add_wrapping(self.reference_value);
302            T::decode_single(&val, &self.exponent)
303        }));
304
305        // Patch values
306        if !self.patch_indices.is_empty() {
307            for i in 0..self.patch_indices.len() {
308                decoded_values[self.patch_indices[i].as_usize()] = self.patch_values[i];
309            }
310        }
311
312        Arc::new(PrimitiveArray::<T>::new(
313            ScalarBuffer::<<T as ArrowPrimitiveType>::Native>::from(decoded_values),
314            nulls.cloned(),
315        ))
316    }
317
318    fn original_arrow_data_type(&self) -> DataType {
319        T::DATA_TYPE.clone()
320    }
321
322    fn data_type(&self) -> LiquidDataType {
323        LiquidDataType::Float
324    }
325
326    fn to_bytes(&self) -> Vec<u8> {
327        self.to_bytes_inner()
328    }
329
330    fn is_empty(&self) -> bool {
331        self.len() == 0
332    }
333
334    fn to_best_arrow_array(&self) -> ArrayRef {
335        self.to_arrow_array()
336    }
337
338    fn squeeze(
339        &self,
340        io: Arc<dyn SqueezeIoHandler>,
341        _expression_hint: Option<&CacheExpression>,
342    ) -> Option<(super::LiquidSqueezedArrayRef, bytes::Bytes)> {
343        let orig_bw = self.bit_packed.bit_width()?;
344        if orig_bw.get() < 8 {
345            return None;
346        }
347
348        // New squeezed bit width is half of the original
349        let new_bw = orig_bw.get() / 2;
350
351        let full_bytes = Bytes::from(self.to_bytes_inner());
352        let disk_range = 0u64..(full_bytes.len() as u64);
353
354        let (_dt, values, nulls) = self.bit_packed.to_primitive().into_parts();
355
356        match self.squeeze_policy {
357            FloatSqueezePolicy::Quantize => {
358                let shift = orig_bw.get() - new_bw;
359                let quantized_min = self.reference_value.shr(shift);
360                // let quantized_max = values
361                let quantized_values: ScalarBuffer<
362                    <T::UnsignedIntType as ArrowPrimitiveType>::Native,
363                > = ScalarBuffer::from_iter(values.iter().map(|&v| {
364                    let signed_val: <T::SignedIntType as ArrowPrimitiveType>::Native = v.as_();
365                    let v_signed = self.reference_value.add_wrapping(signed_val);
366                    let v_quantized: <T::SignedIntType as ArrowPrimitiveType>::Native =
367                        v_signed.shr(shift);
368                    v_quantized.sub_wrapping(quantized_min).as_()
369                }));
370                let quantized_array =
371                    PrimitiveArray::<<T as LiquidFloatType>::UnsignedIntType>::new(
372                        quantized_values,
373                        nulls.clone(),
374                    );
375                let quantized_bitpacked =
376                    BitPackedArray::from_primitive(quantized_array, NonZero::new(new_bw).unwrap());
377                let hybrid = LiquidFloatQuantizedArray::<T> {
378                    exponent: self.exponent,
379                    quantized: quantized_bitpacked,
380                    reference_value: self.reference_value,
381                    bucket_width: shift,
382                    disk_range,
383                    io,
384                    patch_indices: self.patch_indices.clone(),
385                    patch_values: self.patch_values.clone(),
386                };
387                Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
388            }
389        }
390    }
391}
392
393impl<T> LiquidFloatArray<T>
394where
395    T: LiquidFloatType,
396{
397    /*
398    Serialized LiquidFloatArray Memory Layout:
399    +--------------------------------------------------+
400    | LiquidIPCHeader (16 bytes)                       |
401    +--------------------------------------------------+
402
403    +--------------------------------------------------+
404    | reference_value                                  |
405    | (size_of::<T::SignedIntType::Native> bytes)      |  // The reference value (e.g. minimum value)
406    +--------------------------------------------------+
407    | Padding (to 8-byte alignment)                    |  // Padding to ensure 8-byte alignment
408    +--------------------------------------------------+
409
410    +--------------------------------------------------+
411    | Exponents                                        |
412    +--------------------------------------------------+
413    | e (1 byte)                                       |
414    +--------------------------------------------------+
415    | f (1 byte)                                       |
416    +--------------------------------------------------+
417    | Padding (6 bytes)                                |
418    +--------------------------------------------------+
419
420    +--------------------------------------------------+
421    | Patch Data                                       |
422    +--------------------------------------------------+
423    | patch_length (8 bytes)                           |
424    +--------------------------------------------------+
425    | patch_indices (8 * patch_length btyes)           |
426    +--------------------------------------------------+
427    | patch_values (length *size_of::<T::Native> btyes;|
428    |               8-byte aligned)                    |
429    +--------------------------------------------------+
430
431    +--------------------------------------------------+
432    | BitPackedArray Data                              |
433    +--------------------------------------------------+
434    | [BitPackedArray Header & Bit-Packed Values]      |  // Written by self.bit_packed.to_bytes()
435    +--------------------------------------------------+
436    */
437    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
438        // Determine type ID based on the type
439        let physical_type_id = get_physical_type_id::<T>();
440        let logical_type_id = LiquidDataType::Float as u16;
441        let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
442
443        let mut result = Vec::with_capacity(256); // Pre-allocate a reasonable size
444
445        // Write header
446        result.extend_from_slice(&header.to_bytes());
447
448        // Write reference value
449        let ref_value_bytes = unsafe {
450            std::slice::from_raw_parts(
451                &self.reference_value as *const <T::SignedIntType as ArrowPrimitiveType>::Native
452                    as *const u8,
453                std::mem::size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>(),
454            )
455        };
456        result.extend_from_slice(ref_value_bytes);
457
458        let exponents_starting_loc = (result.len() + 7) & !7;
459        // Insert padding before exponents start
460        while result.len() < exponents_starting_loc {
461            result.push(0);
462        }
463
464        let exponent_e_bytes =
465            unsafe { std::slice::from_raw_parts(&self.exponent.e as *const u8, 1) };
466        let exponent_f_bytes =
467            unsafe { std::slice::from_raw_parts(&self.exponent.f as *const u8, 1) };
468        // Write exponents and padding
469        result.extend_from_slice(exponent_e_bytes);
470        result.extend_from_slice(exponent_f_bytes);
471        for _i in 0..6 {
472            result.push(0);
473        }
474
475        // Number of bytes occupied by usize is target-dependent; use u64 instead
476        let patch_length = self.patch_indices.len() as u64;
477
478        let patch_length_bytes = unsafe {
479            std::slice::from_raw_parts(
480                &patch_length as *const u64 as *const u8,
481                std::mem::size_of::<u64>(),
482            )
483        };
484
485        // Write the patch length
486        result.extend_from_slice(patch_length_bytes);
487
488        if !self.patch_indices.is_empty() {
489            let patch_indices_bytes = unsafe {
490                std::slice::from_raw_parts(
491                    self.patch_indices.as_ptr() as *const u8,
492                    std::mem::size_of::<u64>() * self.patch_indices.len(),
493                )
494            };
495
496            // Write the patch indices
497            result.extend_from_slice(patch_indices_bytes);
498
499            // Write the patch values
500            let patch_values_bytes = unsafe {
501                std::slice::from_raw_parts(
502                    self.patch_values.as_ptr() as *const u8,
503                    std::mem::size_of::<T::Native>() * self.patch_indices.len(),
504                )
505            };
506            result.extend_from_slice(patch_values_bytes);
507        }
508        let padding = ((result.len() + 7) & !7) - result.len();
509
510        // Add padding before writing bit-packed array
511        for _i in 0..padding {
512            result.push(0);
513        }
514
515        // Serialize bit-packed values
516        self.bit_packed.to_bytes(&mut result);
517
518        result
519    }
520
521    /// Deserialize a LiquidFloatArray from bytes, using zero-copy where possible.
522    pub fn from_bytes(bytes: Bytes) -> Self {
523        let header = LiquidIPCHeader::from_bytes(&bytes);
524
525        // Verify the type id
526        let physical_id = header.physical_type_id;
527        assert_eq!(physical_id, get_physical_type_id::<T>());
528        let logical_id = header.logical_type_id;
529        assert_eq!(logical_id, LiquidDataType::Float as u16);
530
531        // Get the reference value
532        let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
533        let reference_value = unsafe {
534            (ref_value_ptr as *const u8 as *const <T::SignedIntType as ArrowPrimitiveType>::Native)
535                .read_unaligned()
536        };
537
538        // Read exponents (e, f) & skip padding
539        let mut next = ((LiquidIPCHeader::size()
540            + std::mem::size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>())
541            + 7)
542            & !7;
543
544        // Read exponent fields (1 byte each) and skip 6 padding bytes
545        let exponent_e = bytes[next];
546        let exponent_f = bytes[next + 1];
547        next += 8;
548
549        // Read patch length (8 bytes)
550        let mut patch_length = 0u64;
551        patch_length |= bytes[next] as u64;
552        patch_length |= (bytes[next + 1] as u64) << 8;
553        patch_length |= (bytes[next + 2] as u64) << 16;
554        patch_length |= (bytes[next + 3] as u64) << 24;
555        patch_length |= (bytes[next + 4] as u64) << 32;
556        patch_length |= (bytes[next + 5] as u64) << 40;
557        patch_length |= (bytes[next + 6] as u64) << 48;
558        patch_length |= (bytes[next + 7] as u64) << 56;
559        next += 8;
560
561        // Read patch indices
562        let mut patch_indices = Vec::new();
563        let mut patch_values = Vec::new();
564        if patch_length > 0 {
565            let count = patch_length as usize;
566            let idx_bytes = count * std::mem::size_of::<u64>();
567            let val_bytes = count * std::mem::size_of::<T::Native>();
568
569            let indices_slice = bytes.slice(next..next + idx_bytes);
570            next += idx_bytes;
571            patch_indices = unsafe {
572                let ptr = indices_slice.as_ptr() as *const u64;
573                std::slice::from_raw_parts(ptr, count).to_vec()
574            };
575
576            let values_slice = bytes.slice(next..next + val_bytes);
577            next += val_bytes;
578            patch_values = unsafe {
579                let ptr = values_slice.as_ptr() as *const T::Native;
580                std::slice::from_raw_parts(ptr, count).to_vec()
581            };
582        }
583
584        // Align up to 8 bytes for bit-packed array
585        next = (next + 7) & !7;
586
587        let bit_packed = BitPackedArray::<T::UnsignedIntType>::from_bytes(bytes.slice(next..));
588
589        Self {
590            exponent: Exponents {
591                e: exponent_e,
592                f: exponent_f,
593            },
594            bit_packed,
595            patch_indices,
596            patch_values,
597            reference_value,
598            squeeze_policy: FloatSqueezePolicy::Quantize,
599        }
600    }
601}
602
/// Pair of decimal exponents that parameterizes the ALP encoding.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Exponents {
    /// Index into `F10` when encoding and into `IF10` when decoding.
    pub(crate) e: u8,
    /// Index into `IF10` when encoding and into `F10` when decoding.
    pub(crate) f: u8,
}
608
609fn encode_arrow_array<T: LiquidFloatType>(
610    arrow_array: &PrimitiveArray<T>,
611    exp: &Exponents, // fill_value: &mut Option<<T::UnsignedIntType as ArrowPrimitiveType>::Native>
612) -> LiquidFloatArray<T> {
613    let mut patch_indices: Vec<u64> = Vec::new();
614    let mut patch_values: Vec<T::Native> = Vec::new();
615    let mut patch_count: usize = 0;
616    let mut fill_value: Option<<T::SignedIntType as ArrowPrimitiveType>::Native> = None;
617    let values = arrow_array.values();
618    let nulls = arrow_array.nulls();
619
620    // All values are null
621    if arrow_array.null_count() == arrow_array.len() {
622        return LiquidFloatArray::<T> {
623            bit_packed: BitPackedArray::new_null_array(arrow_array.len()),
624            exponent: Exponents { e: 0, f: 0 },
625            patch_indices: Vec::new(),
626            patch_values: Vec::new(),
627            reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native::ZERO,
628            squeeze_policy: FloatSqueezePolicy::Quantize,
629        };
630    }
631
632    let mut encoded_values = Vec::with_capacity(arrow_array.len());
633    for v in values.iter() {
634        let encoded = T::encode_single_unchecked(&v.as_(), exp);
635        let decoded = T::decode_single(&encoded, exp);
636        // TODO(): Check if this is a bitwise comparison
637        let neq = !decoded.eq(&v.as_()) as usize;
638        patch_count += neq;
639        encoded_values.push(encoded);
640    }
641
642    if patch_count > 0 {
643        patch_indices.resize_with(patch_count + 1, Default::default);
644        patch_values.resize_with(patch_count + 1, Default::default);
645        let mut patch_index: usize = 0;
646
647        for i in 0..encoded_values.len() {
648            let decoded = T::decode_single(&encoded_values[i], exp);
649            patch_indices[patch_index] = i.as_();
650            patch_values[patch_index] = arrow_array.value(i).as_();
651            patch_index += !(decoded.eq(&values[i].as_())) as usize;
652        }
653        assert_eq!(patch_index, patch_count);
654        unsafe {
655            patch_indices.set_len(patch_count);
656            patch_values.set_len(patch_count);
657        }
658    }
659
660    // find the first successfully encoded value (i.e., not patched)
661    // this is our fill value for missing values
662    if patch_count > 0 && patch_count < arrow_array.len() {
663        for i in 0..encoded_values.len() {
664            if i >= patch_indices.len() || patch_indices[i] != i as u64 {
665                fill_value = encoded_values.get(i).copied();
666                break;
667            }
668        }
669    }
670
671    // replace the patched values in the encoded array with the fill value
672    // for better downstream compression
673    if let Some(fill_value) = fill_value {
674        // handle the edge case where the first N >= 1 chunks are all patches
675        for patch_idx in &patch_indices {
676            encoded_values[*patch_idx as usize] = fill_value;
677        }
678    }
679
680    let min = *encoded_values
681        .iter()
682        .min()
683        .expect("`encoded_values` shouldn't be all nulls");
684    let max = *encoded_values
685        .iter()
686        .max()
687        .expect("`encoded_values` shouldn't be all nulls");
688    let sub: <T::UnsignedIntType as ArrowPrimitiveType>::Native = max.sub_wrapping(min).as_();
689
690    let unsigned_encoded_values = encoded_values
691        .iter()
692        .map(|v| {
693            let k: <T::UnsignedIntType as ArrowPrimitiveType>::Native = v.sub_wrapping(min).as_();
694            k
695        })
696        .collect::<Vec<_>>();
697    let encoded_output = PrimitiveArray::<<T as LiquidFloatType>::UnsignedIntType>::new(
698        ScalarBuffer::from(unsigned_encoded_values),
699        nulls.cloned(),
700    );
701
702    let bit_width = get_bit_width(sub.as_());
703    let bit_packed_array = BitPackedArray::from_primitive(encoded_output, bit_width);
704
705    LiquidFloatArray::<T> {
706        bit_packed: bit_packed_array,
707        exponent: *exp,
708        patch_indices,
709        patch_values,
710        reference_value: min,
711        squeeze_policy: FloatSqueezePolicy::Quantize,
712    }
713}
714
715fn get_best_exponents<T: LiquidFloatType>(arrow_array: &PrimitiveArray<T>) -> Exponents {
716    let mut best_exponents = Exponents { e: 0, f: 0 };
717    let mut min_encoded_size: usize = usize::MAX;
718
719    let sample_arrow_array: Option<PrimitiveArray<T>> =
720        (arrow_array.len() > NUM_SAMPLES).then(|| {
721            arrow_array
722                .iter()
723                .step_by(arrow_array.len() / NUM_SAMPLES)
724                .filter(|s| s.is_some())
725                .collect()
726        });
727
728    for e in 0..T::MAX_EXPONENT {
729        for f in 0..e {
730            let exp = Exponents { e, f };
731            let liquid_array =
732                encode_arrow_array(sample_arrow_array.as_ref().unwrap_or(arrow_array), &exp);
733            if liquid_array.get_array_memory_size() < min_encoded_size {
734                best_exponents = exp;
735                min_encoded_size = liquid_array.get_array_memory_size();
736            }
737        }
738    }
739    best_exponents
740}
741
742#[derive(Debug)]
743struct LiquidFloatQuantizedArray<T: LiquidFloatType> {
744    exponent: Exponents,
745    quantized: BitPackedArray<T::UnsignedIntType>,
746    reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native,
747    bucket_width: u8, // Width of each bucket (in bits)
748    disk_range: std::ops::Range<u64>,
749    io: Arc<dyn SqueezeIoHandler>,
750    patch_indices: Vec<u64>,
751    patch_values: Vec<T::Native>,
752}
753
754impl<T> LiquidFloatQuantizedArray<T>
755where
756    T: LiquidFloatType,
757{
758    #[allow(dead_code)]
759    fn as_any(&self) -> &dyn Any {
760        self
761    }
762
763    #[inline]
764    fn len(&self) -> usize {
765        self.quantized.len()
766    }
767
768    fn new_from_filtered(
769        &self,
770        filtered: PrimitiveArray<<T as LiquidFloatType>::UnsignedIntType>,
771    ) -> Self {
772        let bit_width = self
773            .quantized
774            .bit_width()
775            .expect("quantized bit width must exist");
776        let quantized = BitPackedArray::from_primitive(filtered, bit_width);
777        Self {
778            exponent: self.exponent,
779            quantized,
780            reference_value: self.reference_value,
781            bucket_width: self.bucket_width,
782            io: self.io.clone(),
783            patch_indices: self.patch_indices.clone(),
784            patch_values: self.patch_values.clone(),
785            disk_range: self.disk_range.clone(),
786        }
787    }
788
789    fn filter_inner(&self, selection: &BooleanBuffer) -> Self {
790        let q_prim: PrimitiveArray<T::UnsignedIntType> = self.quantized.to_primitive();
791        let selection = BooleanArray::new(selection.clone(), None);
792        let filtered = arrow::compute::kernels::filter::filter(&q_prim, &selection).unwrap();
793        let filtered = filtered.as_primitive::<T::UnsignedIntType>().clone();
794        self.new_from_filtered(filtered)
795    }
796
797    async fn hydrate_full_arrow(&self) -> ArrayRef {
798        let bytes = self
799            .io
800            .read(Some(self.disk_range.clone()))
801            .await
802            .expect("read squeezed backing");
803        let liquid = crate::liquid_array::ipc::read_from_bytes(
804            bytes,
805            &crate::liquid_array::ipc::LiquidIPCContext::new(None),
806        );
807        liquid.to_arrow_array()
808    }
809
810    #[inline]
811    fn handle_eq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
812        if k < lo || k > hi { Some(false) } else { None }
813    }
814
815    #[inline]
816    fn handle_neq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
817        if k < lo || k > hi { Some(true) } else { None }
818    }
819
820    #[inline]
821    fn handle_lt(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
822        if k <= lo {
823            Some(false)
824        } else if hi < k {
825            Some(true)
826        } else {
827            None
828        }
829    }
830
831    #[inline]
832    fn handle_lteq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
833        if k < lo {
834            Some(false)
835        } else if hi <= k {
836            Some(true)
837        } else {
838            None
839        }
840    }
841
842    #[inline]
843    fn handle_gt(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
844        if k < lo {
845            Some(true)
846        } else if hi <= k {
847            Some(false)
848        } else {
849            None
850        }
851    }
852
853    #[inline]
854    fn handle_gteq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
855        if k <= lo {
856            Some(true)
857        } else if hi < k {
858            Some(false)
859        } else {
860            None
861        }
862    }
863
864    fn try_eval_predicate_inner(
865        &self,
866        op: &Operator,
867        literal: &Literal,
868    ) -> SqueezeResult<Option<BooleanArray>> {
869        // Extract scalar value as T::Native
870        let k_opt: Option<T::Native> = match literal.value() {
871            ScalarValue::Int8(Some(v)) => T::Native::from_i8(*v),
872            ScalarValue::Int16(Some(v)) => T::Native::from_i16(*v),
873            ScalarValue::Int32(Some(v)) => T::Native::from_i32(*v),
874            ScalarValue::Int64(Some(v)) => T::Native::from_i64(*v),
875            ScalarValue::UInt8(Some(v)) => T::Native::from_u8(*v),
876            ScalarValue::UInt16(Some(v)) => T::Native::from_u16(*v),
877            ScalarValue::UInt32(Some(v)) => T::Native::from_u32(*v),
878            ScalarValue::UInt64(Some(v)) => T::Native::from_u64(*v),
879            ScalarValue::Date32(Some(v)) => T::Native::from_i32(*v),
880            ScalarValue::Date64(Some(v)) => T::Native::from_i64(*v),
881            ScalarValue::Float32(Some(v)) => T::Native::from_f32(*v),
882            ScalarValue::Float64(Some(v)) => T::Native::from_f64(*v),
883            _ => None,
884        };
885        let Some(k) = k_opt else { return Ok(None) };
886        let q_prim = self.quantized.to_primitive();
887        let (_dt, values, _nulls) = q_prim.into_parts();
888
889        let mut out_vals: Vec<bool> = Vec::with_capacity(values.len());
890        let mut next_patch_index = 0;
891        let mut ignore_patches = false;
892        if self.patch_indices.is_empty() {
893            ignore_patches = true;
894        }
895        let comp_fn = match op {
896            Operator::Eq => Self::handle_eq,
897            Operator::NotEq => Self::handle_neq,
898            Operator::Lt => Self::handle_lt,
899            Operator::LtEq => Self::handle_lteq,
900            Operator::Gt => Self::handle_gt,
901            Operator::GtEq => Self::handle_gteq,
902        };
903        // TODO(): This might not be very vectorization-friendly right now. Figure out optimizations
904        for (i, &b) in values.iter().enumerate() {
905            if let Some(nulls) = self.quantized.nulls()
906                && !nulls.is_valid(i)
907            {
908                out_vals.push(false);
909                continue;
910            }
911            if !ignore_patches && i as u64 == self.patch_indices[next_patch_index] {
912                next_patch_index += 1;
913                if next_patch_index == self.patch_indices.len() {
914                    ignore_patches = true;
915                }
916                out_vals.push(false);
917                continue;
918            }
919
920            let val: <T::SignedIntType as ArrowPrimitiveType>::Native = b.as_();
921            let lo = (val << self.bucket_width).add_wrapping(self.reference_value);
922            let hi = ((val.add_wrapping(1i32.into())) << self.bucket_width)
923                .add_wrapping(self.reference_value);
924            let val_lower = T::decode_single(&lo, &self.exponent);
925            let val_higher = T::decode_single(&hi, &self.exponent);
926
927            let decided = comp_fn(val_lower, val_higher, k);
928            if let Some(v) = decided {
929                out_vals.push(v);
930            } else {
931                return Err(NeedsBacking);
932            }
933        }
934
935        // Handle patches separately
936        // TODO(): Vectorize this
937        for (idx, patch_idx) in self.patch_indices.iter().enumerate() {
938            let patch_value = self.patch_values[idx];
939            out_vals[*patch_idx as usize] = match op {
940                Operator::Eq => patch_value == k,
941                Operator::NotEq => patch_value != k,
942                Operator::Lt => patch_value < k,
943                Operator::LtEq => patch_value <= k,
944                Operator::Gt => patch_value > k,
945                Operator::GtEq => patch_value >= k,
946            }
947        }
948
949        let bool_buf = arrow::buffer::BooleanBuffer::from_iter(out_vals);
950        let out = BooleanArray::new(bool_buf, self.quantized.nulls().cloned());
951        Ok(Some(out))
952    }
953}
954
955#[async_trait::async_trait]
956impl<T> LiquidSqueezedArray for LiquidFloatQuantizedArray<T>
957where
958    T: LiquidFloatType,
959{
960    fn as_any(&self) -> &dyn Any {
961        self
962    }
963
964    fn get_array_memory_size(&self) -> usize {
965        self.quantized.get_array_memory_size()
966            + size_of::<Exponents>()
967            + self.patch_indices.capacity() * size_of::<u64>()
968            + self.patch_values.capacity() * size_of::<T::Native>()
969            + size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>()
970    }
971
972    fn len(&self) -> usize {
973        LiquidFloatQuantizedArray::<T>::len(self)
974    }
975
976    async fn to_arrow_array(&self) -> ArrayRef {
977        self.hydrate_full_arrow().await
978    }
979
980    fn data_type(&self) -> LiquidDataType {
981        LiquidDataType::Float
982    }
983
984    fn original_arrow_data_type(&self) -> DataType {
985        T::DATA_TYPE.clone()
986    }
987
988    fn disk_backing(&self) -> SqueezedBacking {
989        SqueezedBacking::Liquid((self.disk_range.end - self.disk_range.start) as usize)
990    }
991
992    async fn try_eval_predicate(
993        &self,
994        liquid_expr: &LiquidExpr,
995        filter: &BooleanBuffer,
996    ) -> BooleanArray {
997        // Apply selection first to reduce input rows
998        let filtered = self.filter_inner(filter);
999        let expr = liquid_expr.physical_expr();
1000
1001        if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>()
1002            && let Some(literal) = binary_expr.right().as_any().downcast_ref::<Literal>()
1003        {
1004            let op = binary_expr.op();
1005            let supported_op = Operator::from_datafusion(op);
1006            if let Some(supported_op) = supported_op {
1007                match filtered.try_eval_predicate_inner(&supported_op, literal) {
1008                    Ok(Some(mask)) => return mask,
1009                    Ok(None) => {
1010                        let fallback = self.filter(filter).await;
1011                        return eval_predicate_on_array(fallback, liquid_expr);
1012                    }
1013                    Err(NeedsBacking) => {}
1014                }
1015
1016                // Fallback: hydrate full Arrow and evaluate predicate on filtered rows.
1017                use arrow::array::cast::AsArray;
1018
1019                let full = self.hydrate_full_arrow().await;
1020                let selection_array = BooleanArray::new(filter.clone(), None);
1021                let filtered_arr = arrow::compute::filter(&full, &selection_array)
1022                    .expect("selection must match array length");
1023                let filtered_len = filtered_arr.len();
1024
1025                let lhs = ColumnarValue::Array(filtered_arr);
1026                let rhs = ColumnarValue::Scalar(literal.value().clone());
1027                let result = match op {
1028                    DFOperator::NotEq => apply_cmp(DFOperator::NotEq, &lhs, &rhs),
1029                    DFOperator::Eq => apply_cmp(DFOperator::Eq, &lhs, &rhs),
1030                    DFOperator::Lt => apply_cmp(DFOperator::Lt, &lhs, &rhs),
1031                    DFOperator::LtEq => apply_cmp(DFOperator::LtEq, &lhs, &rhs),
1032                    DFOperator::Gt => apply_cmp(DFOperator::Gt, &lhs, &rhs),
1033                    DFOperator::GtEq => apply_cmp(DFOperator::GtEq, &lhs, &rhs),
1034                    _ => {
1035                        let fallback = self.filter(filter).await;
1036                        return eval_predicate_on_array(fallback, liquid_expr);
1037                    }
1038                };
1039                let result = result.expect("validated LiquidExpr comparison must evaluate");
1040                return result
1041                    .into_array(filtered_len)
1042                    .expect("comparison output must be an array")
1043                    .as_boolean()
1044                    .clone();
1045            }
1046        }
1047        let fallback = self.filter(filter).await;
1048        eval_predicate_on_array(fallback, liquid_expr)
1049    }
1050}
1051
1052#[cfg(test)]
1053mod tests {
1054    use datafusion_expr_common::operator::Operator;
1055    use datafusion_physical_expr::PhysicalExpr;
1056    use futures::executor::block_on;
1057    use rand::{RngExt as _, SeedableRng as _, distr::uniform::SampleUniform, rngs::StdRng};
1058
1059    use crate::cache::TestSqueezeIo;
1060
1061    use super::*;
1062
/// Generates a `#[test]` named `$test_name` that builds a `PrimitiveArray<$type>` from
/// `$values` and checks that it survives both Arrow -> Liquid -> Arrow and
/// Arrow -> Liquid -> bytes -> Liquid -> Arrow unchanged.
macro_rules! test_roundtrip {
    ($test_name: ident, $type:ty, $values: expr) => {
        #[test]
        fn $test_name() {
            let input: Vec<Option<<$type as ArrowPrimitiveType>::Native>> = $values;
            let expected = PrimitiveArray::<$type>::from(input);

            // In-memory round trip.
            let liquid = LiquidFloatArray::<$type>::from_arrow_array(expected.clone());
            let decoded = liquid.to_arrow_array();

            // Round trip through the serialized form.
            let reloaded = LiquidFloatArray::<$type>::from_bytes(liquid.to_bytes().into());

            assert_eq!(decoded.as_ref(), &expected);
            assert_eq!(reloaded.to_arrow_array().as_ref(), &expected);
        }
    };
}

// Float32 round trips
test_roundtrip!(
    test_float32_roundtrip_basic,
    Float32Type,
    vec![Some(-1.0), Some(1.0), Some(0.0)]
);

test_roundtrip!(
    test_float32_roundtrip_with_nones,
    Float32Type,
    vec![Some(-1.0), Some(1.0), Some(0.0), None]
);

test_roundtrip!(
    test_float32_roundtrip_all_nones,
    Float32Type,
    vec![None, None, None, None]
);

test_roundtrip!(test_float32_roundtrip_empty, Float32Type, vec![]);

// Float64 round trips
test_roundtrip!(
    test_float64_roundtrip_basic,
    Float64Type,
    vec![Some(-1.0), Some(1.0), Some(0.0)]
);

test_roundtrip!(
    test_float64_roundtrip_with_nones,
    Float64Type,
    vec![Some(-1.0), Some(1.0), Some(0.0), None]
);

test_roundtrip!(
    test_float64_roundtrip_all_nones,
    Float64Type,
    vec![None, None, None, None]
);

test_roundtrip!(test_float64_roundtrip_empty, Float64Type, vec![]);
1123
1124    // Tests with filters
/// Filtering a five-row array that contains one null with a mask selecting rows 0, 2 and 4
/// must yield exactly those three values.
#[test]
fn test_filter_basic() {
    let liquid_array = LiquidFloatArray::<Float32Type>::from_arrow_array(
        PrimitiveArray::<Float32Type>::from(vec![
            Some(1.0),
            Some(2.1),
            Some(3.2),
            None,
            Some(5.5),
        ]),
    );

    // Keep rows 0, 2 and 4.
    let keep = BooleanBuffer::from(vec![true, false, true, false, true]);
    let filtered = liquid_array.filter(&keep);

    let expected = PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(3.2), Some(5.5)]);
    assert_eq!(filtered.as_ref(), &expected);
}
1143
/// A liquid array built from `Float32Type` input reports `DataType::Float32` as its original
/// Arrow type.
#[test]
fn test_original_arrow_data_type_returns_float32() {
    let input = PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.5)]);
    let reported = LiquidFloatArray::<Float32Type>::from_arrow_array(input)
        .original_arrow_data_type();
    assert_eq!(reported, DataType::Float32);
}
1150
/// Filtering an all-null array keeps the selected rows as nulls.
#[test]
fn test_filter_all_nulls() {
    let all_null = PrimitiveArray::<Float32Type>::from(vec![None::<f32>; 4]);
    let liquid_array = LiquidFloatArray::<Float32Type>::from_arrow_array(all_null);

    // Select only the first and the last row.
    let keep = BooleanBuffer::from(vec![true, false, false, true]);
    let filtered = liquid_array.filter(&keep);

    let expected = PrimitiveArray::<Float32Type>::from(vec![None::<f32>; 2]);
    assert_eq!(filtered.as_ref(), &expected);
}
1167
/// A mask that selects nothing produces an empty array.
#[test]
fn test_filter_empty_result() {
    let liquid_array = LiquidFloatArray::<Float32Type>::from_arrow_array(
        PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.1), Some(3.3)]),
    );

    // Reject every row.
    let keep_none = BooleanBuffer::from(vec![false; 3]);
    let filtered = liquid_array.filter(&keep_none);

    assert_eq!(filtered.len(), 0);
}
1181
/// For 2000 whole-number values the liquid encoding must be smaller than the plain Arrow
/// array, for both `f32` and `f64`.
#[test]
fn test_compression_f32_f64() {
    /// Encodes `data_fn(0..2000)` and asserts that the liquid array uses less memory than the
    /// Arrow array it was built from.
    fn run_compression_test<T: LiquidFloatType>(
        type_name: &str,
        data_fn: impl Fn(usize) -> T::Native,
    ) {
        let array = PrimitiveArray::<T>::from_iter_values((0..2000).map(data_fn));
        let uncompressed_size = array.get_array_memory_size();
        let compressed_size =
            LiquidFloatArray::<T>::from_arrow_array(array).get_array_memory_size();

        println!(
            "Type: {type_name}, uncompressed_size: {uncompressed_size}, compressed_size: {compressed_size}"
        );
        // The encoding is only useful if it actually shrinks the data.
        assert!(
            compressed_size < uncompressed_size,
            "{type_name} compression failed to reduce size"
        );
    }

    run_compression_test::<Float32Type>("f32", |i| i as f32);
    run_compression_test::<Float64Type>("f64", |i| i as f64);
}
1211
1212    //  --------- Hybrid (squeeze) tests ----------
1213    fn make_f_array_with_range<T>(
1214        len: usize,
1215        base_min: T::Native,
1216        range: T::Native,
1217        null_prob: f32,
1218        rng: &mut StdRng,
1219    ) -> PrimitiveArray<T>
1220    where
1221        T: LiquidFloatType,
1222        <T as arrow::array::ArrowPrimitiveType>::Native: SampleUniform,
1223        PrimitiveArray<T>: From<Vec<Option<<T as ArrowPrimitiveType>::Native>>>,
1224    {
1225        let mut vals: Vec<Option<T::Native>> = Vec::with_capacity(len);
1226        for _ in 0..len {
1227            if rng.random_bool(null_prob as f64) {
1228                vals.push(None);
1229            } else {
1230                vals.push(Some(rng.random_range(base_min..(base_min + range))));
1231            }
1232        }
1233        PrimitiveArray::<T>::from(vals)
1234    }
1235
/// A 64-row array whose values span only 100 units is expected to be rejected by `squeeze`.
#[test]
fn hybrid_squeeze_unsqueezable_small_range() {
    let mut rng = StdRng::seed_from_u64(0x51_71);
    let narrow = make_f_array_with_range::<Float32Type>(64, 10_000.0, 100.0, 0.1, &mut rng);
    let liquid = LiquidFloatArray::<Float32Type>::from_arrow_array(narrow);

    let squeezed = liquid.squeeze(Arc::new(TestSqueezeIo::default()), None);
    assert!(squeezed.is_none());
}
1247
/// Squeezes a random 2000-row `f32` array and checks three things:
/// 1. the bytes returned by `squeeze` decode back to the original array,
/// 2. comparisons against values at or below the minimum are answered without any IO,
/// 3. an equality test against a value that is present triggers a read and still returns the
///    exact result.
#[test]
fn hybrid_squeeze_full_read_roundtrip_f32() {
    let mut rng = StdRng::seed_from_u64(0x51_72);
    let span = (1 << 16) as f32;
    let arr = make_f_array_with_range::<Float32Type>(2000, -50_000.0, span, 0.1, &mut rng);
    let liq = LiquidFloatArray::<Float32Type>::from_arrow_array(arr.clone());
    let bytes_baseline = liq.to_bytes();
    let io = Arc::new(TestSqueezeIo::default());
    let (hybrid, bytes) = liq.squeeze(io.clone(), None).expect("squeezable");
    io.set_bytes(bytes.clone());

    // The full bytes must hydrate back into the original array and re-serialize identically.
    let recovered = LiquidFloatArray::<Float32Type>::from_bytes(bytes.clone());
    assert_eq!(
        recovered.to_arrow_array().as_primitive::<Float32Type>(),
        &arr
    );
    assert_eq!(bytes_baseline, recovered.to_bytes());

    let min = arrow::compute::kernels::aggregate::min(&arr).unwrap();
    let mask = BooleanBuffer::from(vec![true; arr.len()]);
    // Builds `<literal k> op <literal k>` and evaluates it on the squeezed array.
    let evaluate = |op: Operator, k: f32| -> BooleanArray {
        let lit = Arc::new(Literal::new(ScalarValue::Float32(Some(k))));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(lit.clone(), op, lit));
        block_on(hybrid.try_eval_predicate(
            &crate::cache::LiquidExpr::new_unchecked(expr),
            &mask,
        ))
    };

    // Expect resolvable results without IO
    let resolvable_cases: Vec<(Operator, f32, bool)> = vec![
        (Operator::Eq, min - 1.0, false),   // eq false everywhere
        (Operator::NotEq, min - 1.0, true), // neq true everywhere
        (Operator::Lt, min, false),         // lt false everywhere
        (Operator::LtEq, min - 1.0, false), // lte false everywhere
        (Operator::Gt, min - 1.0, true),    // gt true everywhere
        (Operator::GtEq, min, true),        // gte true everywhere
    ];

    for (op, k, expected_const) in resolvable_cases {
        io.reset_reads();
        let got = evaluate(op, k);
        // Every non-null row carries the same constant answer; nulls stay null.
        let expected: BooleanArray = arr.iter().map(|v| v.map(|_| expected_const)).collect();
        assert_eq!(io.reads(), 0);
        assert_eq!(got, expected);
    }

    // Unresolvable for Eq: pick a present value (ensures ambiguous bucket)
    let k_present = arr.iter().flatten().next().unwrap();
    io.reset_reads();
    let got = evaluate(Operator::Eq, k_present);
    let expected: BooleanArray = arr.iter().map(|v| v.map(|x| x == k_present)).collect();
    assert!(io.reads() > 0);
    assert_eq!(got, expected);
}
1342
/// Squeezes a random 2000-row `f64` array and checks three things:
/// 1. the bytes returned by `squeeze` decode back to the original array,
/// 2. comparisons against values at or below the minimum are answered without any IO,
/// 3. an equality test against a value that is present triggers a read and still returns the
///    exact result.
#[test]
fn hybrid_squeeze_full_read_roundtrip_f64() {
    let mut rng = StdRng::seed_from_u64(0x51_72);
    let span = (1 << 16) as f64;
    let arr = make_f_array_with_range::<Float64Type>(2000, -50_000.0f64, span, 0.1, &mut rng);
    let liq = LiquidFloatArray::<Float64Type>::from_arrow_array(arr.clone());
    let bytes_baseline = liq.to_bytes();
    let io = Arc::new(TestSqueezeIo::default());
    let (hybrid, bytes) = liq.squeeze(io.clone(), None).expect("squeezable");
    io.set_bytes(bytes.clone());

    // The full bytes must hydrate back into the original array and re-serialize identically.
    let recovered = LiquidFloatArray::<Float64Type>::from_bytes(bytes.clone());
    assert_eq!(
        recovered.to_arrow_array().as_primitive::<Float64Type>(),
        &arr
    );
    assert_eq!(bytes_baseline, recovered.to_bytes());

    let min = arrow::compute::kernels::aggregate::min(&arr).unwrap();
    let mask = BooleanBuffer::from(vec![true; arr.len()]);
    // Builds `<literal k> op <literal k>` and evaluates it on the squeezed array.
    let evaluate = |op: Operator, k: f64| -> BooleanArray {
        let lit = Arc::new(Literal::new(ScalarValue::Float64(Some(k))));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(lit.clone(), op, lit));
        block_on(hybrid.try_eval_predicate(
            &crate::cache::LiquidExpr::new_unchecked(expr),
            &mask,
        ))
    };

    // Expect resolvable results without IO
    let resolvable_cases: Vec<(Operator, f64, bool)> = vec![
        (Operator::Eq, min - 1.0, false),   // eq false everywhere
        (Operator::NotEq, min - 1.0, true), // neq true everywhere
        (Operator::Lt, min, false),         // lt false everywhere
        (Operator::LtEq, min - 1.0, false), // lte false everywhere
        (Operator::Gt, min - 1.0, true),    // gt true everywhere
        (Operator::GtEq, min, true),        // gte true everywhere
    ];

    for (op, k, expected_const) in resolvable_cases {
        io.reset_reads();
        let got = evaluate(op, k);
        // Every non-null row carries the same constant answer; nulls stay null.
        let expected: BooleanArray = arr.iter().map(|v| v.map(|_| expected_const)).collect();
        assert_eq!(io.reads(), 0);
        assert_eq!(got, expected);
    }

    // Unresolvable for Eq: pick a present value (ensures ambiguous bucket)
    let k_present = arr.iter().flatten().next().unwrap();
    io.reset_reads();
    let got = evaluate(Operator::Eq, k_present);
    let expected: BooleanArray = arr.iter().map(|v| v.map(|x| x == k_present)).collect();
    assert!(io.reads() > 0);
    assert_eq!(got, expected);
}
1437}