Skip to main content

liquid_cache/liquid_array/
float_array.rs

1///
2/// Acknowledgement:
3/// The ALP compression implemented in this file is based on the Rust implementation available at https://github.com/spiraldb/alp
4///
5use std::{
6    any::Any,
7    fmt::Debug,
8    num::NonZero,
9    ops::{Mul, Shl, Shr},
10    sync::Arc,
11};
12
13use arrow::{
14    array::{
15        Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, AsArray, BooleanArray,
16        PrimitiveArray,
17    },
18    buffer::{BooleanBuffer, ScalarBuffer},
19    datatypes::{
20        ArrowNativeType, Float32Type, Float64Type, Int32Type, Int64Type, UInt32Type, UInt64Type,
21    },
22};
23use arrow_schema::DataType;
24use datafusion::{
25    physical_plan::{
26        PhysicalExpr,
27        expressions::{BinaryExpr, Literal},
28    },
29    scalar::ScalarValue,
30};
31use fastlanes::BitPacking;
32use num_traits::{AsPrimitive, Float, FromPrimitive};
33
34use super::LiquidDataType;
35use crate::liquid_array::LiquidArray;
36use crate::liquid_array::ipc::{PhysicalTypeMarker, get_physical_type_id};
37use crate::liquid_array::raw::BitPackedArray;
38use crate::liquid_array::{
39    LiquidSqueezedArray, LiquidSqueezedArrayRef, NeedsBacking, Operator, SqueezeResult,
40    ipc::LiquidIPCHeader,
41};
42use crate::utils::get_bit_width;
43use crate::{cache::CacheExpression, liquid_array::SqueezeIoHandler};
44use bytes::Bytes;
45
/// Private module hosting the `Sealed` marker trait.
///
/// `Sealed` is a supertrait of `LiquidFloatType`; because this module is private,
/// the trait cannot be implemented from outside the enclosing module.
mod private {
    use arrow::{
        array::ArrowNumericType,
        datatypes::{Float32Type, Float64Type},
    };
    use num_traits::AsPrimitive;

    /// Marker trait for Arrow numeric types whose native value converts to both `f64` and `f32`.
    pub trait Sealed: ArrowNumericType<Native: AsPrimitive<f64> + AsPrimitive<f32>> {}

    // The only two implementors: single- and double-precision floats.
    impl Sealed for Float32Type {}
    impl Sealed for Float64Type {}
}
58
/// Sampling threshold used by `get_best_exponents`: arrays longer than this are
/// subsampled with a stride of `len / NUM_SAMPLES` before the exponent search.
///
/// We use FASTLANES to encode the array, so the sample size needs to be at least 1024
/// to get a good estimate of the best exponents.
const NUM_SAMPLES: usize = 1024;
60
/// Strategy selected by `LiquidFloatArray::squeeze` when producing a squeezed
/// (smaller, lossy in-memory) representation of a float array.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FloatSqueezePolicy {
    /// Quantize values into buckets (good for coarse filtering; requires disk to recover values).
    #[default]
    Quantize = 0,
}
67
/// LiquidFloatType is a sealed trait that represents all the float types supported by Liquid.
/// Implementors are Float32Type and Float64Type. TODO(): What about Float16Type, decimal types?
///
/// The trait bundles the integer types used to store encoded values together with the
/// constants and helpers needed to encode a float to an integer and decode it back.
pub trait LiquidFloatType:
        ArrowPrimitiveType<
            Native: AsPrimitive<
                <Self::UnsignedIntType as ArrowPrimitiveType>::Native // Native must be convertible to the Native type of Self::UnSignedType
            >
            + AsPrimitive<<Self::SignedIntType as ArrowPrimitiveType>::Native>
            + FromPrimitive
            + AsPrimitive<<Self as ArrowPrimitiveType>::Native>
            + Mul<<Self as ArrowPrimitiveType>::Native>
            + Float // required for decode_single and encode_single_unchecked
        >
        + private::Sealed
        + Debug
        + PhysicalTypeMarker
{
    /// Unsigned integer Arrow type used for the bit-packed storage
    /// (its native type must implement `BitPacking`).
    type UnsignedIntType:
        ArrowPrimitiveType<
            Native: BitPacking +
                AsPrimitive<<Self as ArrowPrimitiveType>::Native>
                + AsPrimitive<<Self::SignedIntType as ArrowPrimitiveType>::Native>
                + AsPrimitive<u64>
        >
        + Debug;
    /// Signed integer Arrow type produced by `encode_single_unchecked` and consumed by
    /// `decode_single`.
    type SignedIntType:
        ArrowPrimitiveType<
            Native: AsPrimitive<<Self as ArrowPrimitiveType>::Native>
                + AsPrimitive<<Self::UnsignedIntType as ArrowPrimitiveType>::Native>
                + Ord
                + Shr<u8, Output = <Self::SignedIntType as ArrowPrimitiveType>::Native>
                + Shl<u8, Output = <Self::SignedIntType as ArrowPrimitiveType>::Native>
                + From<i32>
        >
        + Debug + Sync + Send;

    /// Constant that `fast_round` adds to and then subtracts from a value before casting.
    /// NOTE(review): presumably this forces rounding to an integral value via the float's
    /// limited mantissa precision — confirm against the ALP reference implementation.
    const SWEET: <Self as ArrowPrimitiveType>::Native;
    /// Exclusive upper bound for the exponent `e` tried by `get_best_exponents`.
    const MAX_EXPONENT: u8;
    /// Bit count used by the implementors to derive `SWEET`.
    const FRACTIONAL_BITS: u8;
    /// Lookup table of multipliers indexed by exponent; in the implementors `F10[i]` is `10^i`.
    const F10: &'static [<Self as ArrowPrimitiveType>::Native];
    /// Lookup table of multipliers indexed by exponent; in the implementors `IF10[i]` is `10^-i`.
    const IF10: &'static [<Self as ArrowPrimitiveType>::Native];

    /// Computes `(val + SWEET) - SWEET` and casts the result to the signed integer type.
    #[inline]
    fn fast_round(val: <Self as ArrowPrimitiveType>::Native) -> <Self::SignedIntType as ArrowPrimitiveType>::Native {
        ((val + Self::SWEET) - Self::SWEET).as_()
    }

    /// Encodes `val` as `fast_round(val * F10[exp.e] * IF10[exp.f])`.
    ///
    /// The result is not verified to round-trip here; `encode_arrow_array` decodes each
    /// encoded value and records a patch when it does not match the input.
    /// Panics if `exp.e` or `exp.f` is out of range for the lookup tables.
    #[inline]
    fn encode_single_unchecked(val: &<Self as ArrowPrimitiveType>::Native, exp: &Exponents) -> <Self::SignedIntType as ArrowPrimitiveType>::Native {
        Self::fast_round(*val * Self::F10[exp.e as usize] * Self::IF10[exp.f as usize])
    }

    /// Decodes an encoded integer back to a float as `val * F10[exp.f] * IF10[exp.e]`.
    ///
    /// Panics if `exp.e` or `exp.f` is out of range for the lookup tables.
    #[inline]
    fn decode_single(val: &<Self::SignedIntType as ArrowPrimitiveType>::Native, exp: &Exponents) -> <Self as ArrowPrimitiveType>::Native {
        let decoded_float: <Self as ArrowPrimitiveType>::Native = (*val).as_();
        decoded_float * Self::F10[exp.f as usize] * Self::IF10[exp.e as usize]
    }

}
127
128impl LiquidFloatType for Float32Type {
129    type UnsignedIntType = UInt32Type;
130    type SignedIntType = Int32Type;
131    const FRACTIONAL_BITS: u8 = 23;
132    const MAX_EXPONENT: u8 = 10;
133    const SWEET: <Self as ArrowPrimitiveType>::Native = (1 << Self::FRACTIONAL_BITS)
134        as <Self as ArrowPrimitiveType>::Native
135        + (1 << (Self::FRACTIONAL_BITS - 1)) as <Self as ArrowPrimitiveType>::Native;
136    const F10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
137        1.0,
138        10.0,
139        100.0,
140        1000.0,
141        10000.0,
142        100000.0,
143        1000000.0,
144        10000000.0,
145        100000000.0,
146        1000000000.0,
147        10000000000.0, // 10^10
148    ];
149    const IF10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
150        1.0,
151        0.1,
152        0.01,
153        0.001,
154        0.0001,
155        0.00001,
156        0.000001,
157        0.0000001,
158        0.00000001,
159        0.000000001,
160        0.0000000001, // 10^-10
161    ];
162}
163
164impl LiquidFloatType for Float64Type {
165    type UnsignedIntType = UInt64Type;
166    type SignedIntType = Int64Type;
167    const FRACTIONAL_BITS: u8 = 52;
168    const MAX_EXPONENT: u8 = 18;
169    const SWEET: <Self as ArrowPrimitiveType>::Native = (1u64 << Self::FRACTIONAL_BITS)
170        as <Self as ArrowPrimitiveType>::Native
171        + (1u64 << (Self::FRACTIONAL_BITS - 1)) as <Self as ArrowPrimitiveType>::Native;
172    const F10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
173        1.0,
174        10.0,
175        100.0,
176        1000.0,
177        10000.0,
178        100000.0,
179        1000000.0,
180        10000000.0,
181        100000000.0,
182        1000000000.0,
183        10000000000.0,
184        100000000000.0,
185        1000000000000.0,
186        10000000000000.0,
187        100000000000000.0,
188        1000000000000000.0,
189        10000000000000000.0,
190        100000000000000000.0,
191        1000000000000000000.0,
192        10000000000000000000.0,
193        100000000000000000000.0,
194        1000000000000000000000.0,
195        10000000000000000000000.0,
196        100000000000000000000000.0, // 10^23
197    ];
198
199    const IF10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
200        1.0,
201        0.1,
202        0.01,
203        0.001,
204        0.0001,
205        0.00001,
206        0.000001,
207        0.0000001,
208        0.00000001,
209        0.000000001,
210        0.0000000001,
211        0.00000000001,
212        0.000000000001,
213        0.0000000000001,
214        0.00000000000001,
215        0.000000000000001,
216        0.0000000000000001,
217        0.00000000000000001,
218        0.000000000000000001,
219        0.0000000000000000001,
220        0.00000000000000000001,
221        0.000000000000000000001,
222        0.0000000000000000000001,
223        0.00000000000000000000001, // 10^-23
224    ];
225}
226
/// Liquid's single-precision floating point array (`LiquidFloatArray` over `Float32Type`).
pub type LiquidFloat32Array = LiquidFloatArray<Float32Type>;
/// Liquid's double-precision floating point array (`LiquidFloatArray` over `Float64Type`).
pub type LiquidFloat64Array = LiquidFloatArray<Float64Type>;
231
/// An array that stores floats in ALP
///
/// Each float is encoded to a signed integer with a shared exponent pair; the integers are
/// stored bit-packed as unsigned offsets from `reference_value`. Values that do not survive
/// an encode/decode round trip are kept verbatim as patches.
#[derive(Debug, Clone)]
pub struct LiquidFloatArray<T: LiquidFloatType> {
    /// Exponent pair shared by every encoded value in the array.
    exponent: Exponents,
    /// Bit-packed unsigned offsets (`encoded - reference_value`); also carries length and nulls.
    bit_packed: BitPackedArray<T::UnsignedIntType>,
    /// Positions whose decoded value must be replaced by the matching entry of `patch_values`.
    patch_indices: Vec<u64>,
    /// Original float values for the positions listed in `patch_indices`.
    patch_values: Vec<T::Native>,
    /// Minimum encoded value; added back to each offset when decoding.
    reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native,
    /// Policy applied by `squeeze`.
    squeeze_policy: FloatSqueezePolicy,
}
242
243impl<T> LiquidFloatArray<T>
244where
245    T: LiquidFloatType,
246{
247    /// Check if the Liquid float array is empty.
248    pub fn is_empty(&self) -> bool {
249        self.len() == 0
250    }
251
252    /// Get the length of the Liquid float array.
253    pub fn len(&self) -> usize {
254        self.bit_packed.len()
255    }
256
257    /// Get the memory size of the Liquid primitive array.
258    pub fn get_array_memory_size(&self) -> usize {
259        self.bit_packed.get_array_memory_size()
260            + size_of::<Exponents>()
261            + self.patch_indices.capacity() * size_of::<u64>()
262            + self.patch_values.capacity() * size_of::<T::Native>()
263            + size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>()
264    }
265
266    /// Create a Liquid primitive array from an Arrow float array.
267    pub fn from_arrow_array(arrow_array: arrow::array::PrimitiveArray<T>) -> LiquidFloatArray<T> {
268        let best_exponents = get_best_exponents::<T>(&arrow_array);
269        encode_arrow_array(&arrow_array, &best_exponents)
270    }
271
272    /// Get current squeeze policy for this array
273    pub fn squeeze_policy(&self) -> FloatSqueezePolicy {
274        self.squeeze_policy
275    }
276}
277
278impl<T> LiquidArray for LiquidFloatArray<T>
279where
280    T: LiquidFloatType,
281{
282    fn as_any(&self) -> &dyn Any {
283        self
284    }
285
286    fn get_array_memory_size(&self) -> usize {
287        self.get_array_memory_size()
288    }
289
290    fn len(&self) -> usize {
291        self.len()
292    }
293
294    #[inline]
295    fn to_arrow_array(&self) -> ArrayRef {
296        let unsigned_array = self.bit_packed.to_primitive();
297        let (_data_type, values, _nulls) = unsigned_array.into_parts();
298        let nulls = self.bit_packed.nulls();
299        // TODO(): Check if we should align vectors to cache line boundary
300        let mut decoded_values = Vec::from_iter(values.iter().map(|v| {
301            let mut val: <T::SignedIntType as ArrowPrimitiveType>::Native = (*v).as_();
302            val = val.add_wrapping(self.reference_value);
303            T::decode_single(&val, &self.exponent)
304        }));
305
306        // Patch values
307        if !self.patch_indices.is_empty() {
308            for i in 0..self.patch_indices.len() {
309                decoded_values[self.patch_indices[i].as_usize()] = self.patch_values[i];
310            }
311        }
312
313        Arc::new(PrimitiveArray::<T>::new(
314            ScalarBuffer::<<T as ArrowPrimitiveType>::Native>::from(decoded_values),
315            nulls.cloned(),
316        ))
317    }
318
319    fn original_arrow_data_type(&self) -> DataType {
320        T::DATA_TYPE.clone()
321    }
322
323    fn data_type(&self) -> LiquidDataType {
324        LiquidDataType::Float
325    }
326
327    fn to_bytes(&self) -> Vec<u8> {
328        self.to_bytes_inner()
329    }
330
331    fn is_empty(&self) -> bool {
332        self.len() == 0
333    }
334
335    fn to_best_arrow_array(&self) -> ArrayRef {
336        self.to_arrow_array()
337    }
338
339    fn squeeze(
340        &self,
341        io: Arc<dyn SqueezeIoHandler>,
342        _expression_hint: Option<&CacheExpression>,
343    ) -> Option<(super::LiquidSqueezedArrayRef, bytes::Bytes)> {
344        let orig_bw = self.bit_packed.bit_width()?;
345        if orig_bw.get() < 8 {
346            return None;
347        }
348
349        // New squeezed bit width is half of the original
350        let new_bw = orig_bw.get() / 2;
351
352        let full_bytes = Bytes::from(self.to_bytes_inner());
353        let disk_range = 0u64..(full_bytes.len() as u64);
354
355        let (_dt, values, nulls) = self.bit_packed.to_primitive().into_parts();
356
357        match self.squeeze_policy {
358            FloatSqueezePolicy::Quantize => {
359                let shift = orig_bw.get() - new_bw;
360                let quantized_min = self.reference_value.shr(shift);
361                // let quantized_max = values
362                let quantized_values: ScalarBuffer<
363                    <T::UnsignedIntType as ArrowPrimitiveType>::Native,
364                > = ScalarBuffer::from_iter(values.iter().map(|&v| {
365                    let signed_val: <T::SignedIntType as ArrowPrimitiveType>::Native = v.as_();
366                    let v_signed = self.reference_value.add_wrapping(signed_val);
367                    let v_quantized: <T::SignedIntType as ArrowPrimitiveType>::Native =
368                        v_signed.shr(shift);
369                    v_quantized.sub_wrapping(quantized_min).as_()
370                }));
371                let quantized_array =
372                    PrimitiveArray::<<T as LiquidFloatType>::UnsignedIntType>::new(
373                        quantized_values,
374                        nulls.clone(),
375                    );
376                let quantized_bitpacked =
377                    BitPackedArray::from_primitive(quantized_array, NonZero::new(new_bw).unwrap());
378                let hybrid = LiquidFloatQuantizedArray::<T> {
379                    exponent: self.exponent,
380                    quantized: quantized_bitpacked,
381                    reference_value: self.reference_value,
382                    bucket_width: shift,
383                    disk_range,
384                    io,
385                    patch_indices: self.patch_indices.clone(),
386                    patch_values: self.patch_values.clone(),
387                };
388                Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
389            }
390        }
391    }
392}
393
394impl<T> LiquidFloatArray<T>
395where
396    T: LiquidFloatType,
397{
398    /*
399    Serialized LiquidFloatArray Memory Layout:
400    +--------------------------------------------------+
401    | LiquidIPCHeader (16 bytes)                       |
402    +--------------------------------------------------+
403
404    +--------------------------------------------------+
405    | reference_value                                  |
406    | (size_of::<T::SignedIntType::Native> bytes)      |  // The reference value (e.g. minimum value)
407    +--------------------------------------------------+
408    | Padding (to 8-byte alignment)                    |  // Padding to ensure 8-byte alignment
409    +--------------------------------------------------+
410
411    +--------------------------------------------------+
412    | Exponents                                        |
413    +--------------------------------------------------+
414    | e (1 byte)                                       |
415    +--------------------------------------------------+
416    | f (1 byte)                                       |
417    +--------------------------------------------------+
418    | Padding (6 bytes)                                |
419    +--------------------------------------------------+
420
421    +--------------------------------------------------+
422    | Patch Data                                       |
423    +--------------------------------------------------+
424    | patch_length (8 bytes)                           |
425    +--------------------------------------------------+
426    | patch_indices (8 * patch_length btyes)           |
427    +--------------------------------------------------+
428    | patch_values (length *size_of::<T::Native> btyes;|
429    |               8-byte aligned)                    |
430    +--------------------------------------------------+
431
432    +--------------------------------------------------+
433    | BitPackedArray Data                              |
434    +--------------------------------------------------+
435    | [BitPackedArray Header & Bit-Packed Values]      |  // Written by self.bit_packed.to_bytes()
436    +--------------------------------------------------+
437    */
438    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
439        // Determine type ID based on the type
440        let physical_type_id = get_physical_type_id::<T>();
441        let logical_type_id = LiquidDataType::Float as u16;
442        let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
443
444        let mut result = Vec::with_capacity(256); // Pre-allocate a reasonable size
445
446        // Write header
447        result.extend_from_slice(&header.to_bytes());
448
449        // Write reference value
450        let ref_value_bytes = unsafe {
451            std::slice::from_raw_parts(
452                &self.reference_value as *const <T::SignedIntType as ArrowPrimitiveType>::Native
453                    as *const u8,
454                std::mem::size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>(),
455            )
456        };
457        result.extend_from_slice(ref_value_bytes);
458
459        let exponents_starting_loc = (result.len() + 7) & !7;
460        // Insert padding before exponents start
461        while result.len() < exponents_starting_loc {
462            result.push(0);
463        }
464
465        let exponent_e_bytes =
466            unsafe { std::slice::from_raw_parts(&self.exponent.e as *const u8, 1) };
467        let exponent_f_bytes =
468            unsafe { std::slice::from_raw_parts(&self.exponent.f as *const u8, 1) };
469        // Write exponents and padding
470        result.extend_from_slice(exponent_e_bytes);
471        result.extend_from_slice(exponent_f_bytes);
472        for _i in 0..6 {
473            result.push(0);
474        }
475
476        // Number of bytes occupied by usize is target-dependent; use u64 instead
477        let patch_length = self.patch_indices.len() as u64;
478
479        let patch_length_bytes = unsafe {
480            std::slice::from_raw_parts(
481                &patch_length as *const u64 as *const u8,
482                std::mem::size_of::<u64>(),
483            )
484        };
485
486        // Write the patch length
487        result.extend_from_slice(patch_length_bytes);
488
489        if !self.patch_indices.is_empty() {
490            let patch_indices_bytes = unsafe {
491                std::slice::from_raw_parts(
492                    self.patch_indices.as_ptr() as *const u8,
493                    std::mem::size_of::<u64>() * self.patch_indices.len(),
494                )
495            };
496
497            // Write the patch indices
498            result.extend_from_slice(patch_indices_bytes);
499
500            // Write the patch values
501            let patch_values_bytes = unsafe {
502                std::slice::from_raw_parts(
503                    self.patch_values.as_ptr() as *const u8,
504                    std::mem::size_of::<T::Native>() * self.patch_indices.len(),
505                )
506            };
507            result.extend_from_slice(patch_values_bytes);
508        }
509        let padding = ((result.len() + 7) & !7) - result.len();
510
511        // Add padding before writing bit-packed array
512        for _i in 0..padding {
513            result.push(0);
514        }
515
516        // Serialize bit-packed values
517        self.bit_packed.to_bytes(&mut result);
518
519        result
520    }
521
522    /// Deserialize a LiquidFloatArray from bytes, using zero-copy where possible.
523    pub fn from_bytes(bytes: Bytes) -> Self {
524        let header = LiquidIPCHeader::from_bytes(&bytes);
525
526        // Verify the type id
527        let physical_id = header.physical_type_id;
528        assert_eq!(physical_id, get_physical_type_id::<T>());
529        let logical_id = header.logical_type_id;
530        assert_eq!(logical_id, LiquidDataType::Float as u16);
531
532        // Get the reference value
533        let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
534        let reference_value = unsafe {
535            (ref_value_ptr as *const u8 as *const <T::SignedIntType as ArrowPrimitiveType>::Native)
536                .read_unaligned()
537        };
538
539        // Read exponents (e, f) & skip padding
540        let mut next = ((LiquidIPCHeader::size()
541            + std::mem::size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>())
542            + 7)
543            & !7;
544
545        // Read exponent fields (1 byte each) and skip 6 padding bytes
546        let exponent_e = bytes[next];
547        let exponent_f = bytes[next + 1];
548        next += 8;
549
550        // Read patch length (8 bytes)
551        let mut patch_length = 0u64;
552        patch_length |= bytes[next] as u64;
553        patch_length |= (bytes[next + 1] as u64) << 8;
554        patch_length |= (bytes[next + 2] as u64) << 16;
555        patch_length |= (bytes[next + 3] as u64) << 24;
556        patch_length |= (bytes[next + 4] as u64) << 32;
557        patch_length |= (bytes[next + 5] as u64) << 40;
558        patch_length |= (bytes[next + 6] as u64) << 48;
559        patch_length |= (bytes[next + 7] as u64) << 56;
560        next += 8;
561
562        // Read patch indices
563        let mut patch_indices = Vec::new();
564        let mut patch_values = Vec::new();
565        if patch_length > 0 {
566            let count = patch_length as usize;
567            let idx_bytes = count * std::mem::size_of::<u64>();
568            let val_bytes = count * std::mem::size_of::<T::Native>();
569
570            let indices_slice = bytes.slice(next..next + idx_bytes);
571            next += idx_bytes;
572            patch_indices = unsafe {
573                let ptr = indices_slice.as_ptr() as *const u64;
574                std::slice::from_raw_parts(ptr, count).to_vec()
575            };
576
577            let values_slice = bytes.slice(next..next + val_bytes);
578            next += val_bytes;
579            patch_values = unsafe {
580                let ptr = values_slice.as_ptr() as *const T::Native;
581                std::slice::from_raw_parts(ptr, count).to_vec()
582            };
583        }
584
585        // Align up to 8 bytes for bit-packed array
586        next = (next + 7) & !7;
587
588        let bit_packed = BitPackedArray::<T::UnsignedIntType>::from_bytes(bytes.slice(next..));
589
590        Self {
591            exponent: Exponents {
592                e: exponent_e,
593                f: exponent_f,
594            },
595            bit_packed,
596            patch_indices,
597            patch_values,
598            reference_value,
599            squeeze_policy: FloatSqueezePolicy::Quantize,
600        }
601    }
602}
603
/// Exponent pair that parameterizes the float <-> integer encoding.
///
/// Encoding multiplies by `F10[e]` and `IF10[f]`; decoding multiplies by `F10[f]` and
/// `IF10[e]` (see `LiquidFloatType::encode_single_unchecked` / `decode_single`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Exponents {
    /// Index into the power-of-ten tables applied as a multiplier when encoding.
    pub(crate) e: u8,
    /// Index into the power-of-ten tables applied as a divisor when encoding.
    pub(crate) f: u8,
}
609
610fn encode_arrow_array<T: LiquidFloatType>(
611    arrow_array: &PrimitiveArray<T>,
612    exp: &Exponents, // fill_value: &mut Option<<T::UnsignedIntType as ArrowPrimitiveType>::Native>
613) -> LiquidFloatArray<T> {
614    let mut patch_indices: Vec<u64> = Vec::new();
615    let mut patch_values: Vec<T::Native> = Vec::new();
616    let mut patch_count: usize = 0;
617    let mut fill_value: Option<<T::SignedIntType as ArrowPrimitiveType>::Native> = None;
618    let values = arrow_array.values();
619    let nulls = arrow_array.nulls();
620
621    // All values are null
622    if arrow_array.null_count() == arrow_array.len() {
623        return LiquidFloatArray::<T> {
624            bit_packed: BitPackedArray::new_null_array(arrow_array.len()),
625            exponent: Exponents { e: 0, f: 0 },
626            patch_indices: Vec::new(),
627            patch_values: Vec::new(),
628            reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native::ZERO,
629            squeeze_policy: FloatSqueezePolicy::Quantize,
630        };
631    }
632
633    let mut encoded_values = Vec::with_capacity(arrow_array.len());
634    for v in values.iter() {
635        let encoded = T::encode_single_unchecked(&v.as_(), exp);
636        let decoded = T::decode_single(&encoded, exp);
637        // TODO(): Check if this is a bitwise comparison
638        let neq = !decoded.eq(&v.as_()) as usize;
639        patch_count += neq;
640        encoded_values.push(encoded);
641    }
642
643    if patch_count > 0 {
644        patch_indices.resize_with(patch_count + 1, Default::default);
645        patch_values.resize_with(patch_count + 1, Default::default);
646        let mut patch_index: usize = 0;
647
648        for i in 0..encoded_values.len() {
649            let decoded = T::decode_single(&encoded_values[i], exp);
650            patch_indices[patch_index] = i.as_();
651            patch_values[patch_index] = arrow_array.value(i).as_();
652            patch_index += !(decoded.eq(&values[i].as_())) as usize;
653        }
654        assert_eq!(patch_index, patch_count);
655        unsafe {
656            patch_indices.set_len(patch_count);
657            patch_values.set_len(patch_count);
658        }
659    }
660
661    // find the first successfully encoded value (i.e., not patched)
662    // this is our fill value for missing values
663    if patch_count > 0 && patch_count < arrow_array.len() {
664        for i in 0..encoded_values.len() {
665            if i >= patch_indices.len() || patch_indices[i] != i as u64 {
666                fill_value = encoded_values.get(i).copied();
667                break;
668            }
669        }
670    }
671
672    // replace the patched values in the encoded array with the fill value
673    // for better downstream compression
674    if let Some(fill_value) = fill_value {
675        // handle the edge case where the first N >= 1 chunks are all patches
676        for patch_idx in &patch_indices {
677            encoded_values[*patch_idx as usize] = fill_value;
678        }
679    }
680
681    let min = *encoded_values
682        .iter()
683        .min()
684        .expect("`encoded_values` shouldn't be all nulls");
685    let max = *encoded_values
686        .iter()
687        .max()
688        .expect("`encoded_values` shouldn't be all nulls");
689    let sub: <T::UnsignedIntType as ArrowPrimitiveType>::Native = max.sub_wrapping(min).as_();
690
691    let unsigned_encoded_values = encoded_values
692        .iter()
693        .map(|v| {
694            let k: <T::UnsignedIntType as ArrowPrimitiveType>::Native = v.sub_wrapping(min).as_();
695            k
696        })
697        .collect::<Vec<_>>();
698    let encoded_output = PrimitiveArray::<<T as LiquidFloatType>::UnsignedIntType>::new(
699        ScalarBuffer::from(unsigned_encoded_values),
700        nulls.cloned(),
701    );
702
703    let bit_width = get_bit_width(sub.as_());
704    let bit_packed_array = BitPackedArray::from_primitive(encoded_output, bit_width);
705
706    LiquidFloatArray::<T> {
707        bit_packed: bit_packed_array,
708        exponent: *exp,
709        patch_indices,
710        patch_values,
711        reference_value: min,
712        squeeze_policy: FloatSqueezePolicy::Quantize,
713    }
714}
715
716fn get_best_exponents<T: LiquidFloatType>(arrow_array: &PrimitiveArray<T>) -> Exponents {
717    let mut best_exponents = Exponents { e: 0, f: 0 };
718    let mut min_encoded_size: usize = usize::MAX;
719
720    let sample_arrow_array: Option<PrimitiveArray<T>> =
721        (arrow_array.len() > NUM_SAMPLES).then(|| {
722            arrow_array
723                .iter()
724                .step_by(arrow_array.len() / NUM_SAMPLES)
725                .filter(|s| s.is_some())
726                .collect()
727        });
728
729    for e in 0..T::MAX_EXPONENT {
730        for f in 0..e {
731            let exp = Exponents { e, f };
732            let liquid_array =
733                encode_arrow_array(sample_arrow_array.as_ref().unwrap_or(arrow_array), &exp);
734            if liquid_array.get_array_memory_size() < min_encoded_size {
735                best_exponents = exp;
736                min_encoded_size = liquid_array.get_array_memory_size();
737            }
738        }
739    }
740    best_exponents
741}
742
/// Squeezed form of a `LiquidFloatArray`, produced by `LiquidFloatArray::squeeze`.
///
/// Only a reduced-width bucket id is kept in memory for each value; the full
/// serialized array is reachable through `io` at `disk_range`.
#[derive(Debug)]
struct LiquidFloatQuantizedArray<T: LiquidFloatType> {
    /// Exponent pair copied from the source array.
    exponent: Exponents,
    /// Bit-packed bucket ids, relative to the bucket of `reference_value`.
    quantized: BitPackedArray<T::UnsignedIntType>,
    /// Reference (minimum encoded) value copied from the source array.
    reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native,
    bucket_width: u8, // Width of each bucket (in bits)
    /// Byte range of the full serialized array in the backing storage.
    disk_range: std::ops::Range<u64>,
    /// Handler used to read the backing bytes when full values are needed.
    io: Arc<dyn SqueezeIoHandler>,
    /// Patch positions copied from the source array.
    patch_indices: Vec<u64>,
    /// Patch values copied from the source array.
    patch_values: Vec<T::Native>,
}
754
755impl<T> LiquidFloatQuantizedArray<T>
756where
757    T: LiquidFloatType,
758{
    /// Returns `self` as a `&dyn Any` so callers can downcast to the concrete type.
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any {
        self
    }
763
    /// Number of elements, taken from the bit-packed quantized values.
    #[inline]
    fn len(&self) -> usize {
        self.quantized.len()
    }
768
769    fn new_from_filtered(
770        &self,
771        filtered: PrimitiveArray<<T as LiquidFloatType>::UnsignedIntType>,
772    ) -> Self {
773        let bit_width = self
774            .quantized
775            .bit_width()
776            .expect("quantized bit width must exist");
777        let quantized = BitPackedArray::from_primitive(filtered, bit_width);
778        Self {
779            exponent: self.exponent,
780            quantized,
781            reference_value: self.reference_value,
782            bucket_width: self.bucket_width,
783            io: self.io.clone(),
784            patch_indices: self.patch_indices.clone(),
785            patch_values: self.patch_values.clone(),
786            disk_range: self.disk_range.clone(),
787        }
788    }
789
790    fn filter_inner(&self, selection: &BooleanBuffer) -> Self {
791        let q_prim: PrimitiveArray<T::UnsignedIntType> = self.quantized.to_primitive();
792        let selection = BooleanArray::new(selection.clone(), None);
793        let filtered = arrow::compute::kernels::filter::filter(&q_prim, &selection).unwrap();
794        let filtered = filtered.as_primitive::<T::UnsignedIntType>().clone();
795        self.new_from_filtered(filtered)
796    }
797
798    async fn hydrate_full_arrow(&self) -> ArrayRef {
799        let bytes = self
800            .io
801            .read(Some(self.disk_range.clone()))
802            .await
803            .expect("read squeezed backing");
804        let liquid = crate::liquid_array::ipc::read_from_bytes(
805            bytes,
806            &crate::liquid_array::ipc::LiquidIPCContext::new(None),
807        );
808        liquid.to_arrow_array()
809    }
810
811    #[inline]
812    fn handle_eq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
813        if k < lo || k > hi { Some(false) } else { None }
814    }
815
816    #[inline]
817    fn handle_neq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
818        if k < lo || k > hi { Some(true) } else { None }
819    }
820
821    #[inline]
822    fn handle_lt(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
823        if k <= lo {
824            Some(false)
825        } else if hi < k {
826            Some(true)
827        } else {
828            None
829        }
830    }
831
832    #[inline]
833    fn handle_lteq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
834        if k < lo {
835            Some(false)
836        } else if hi <= k {
837            Some(true)
838        } else {
839            None
840        }
841    }
842
843    #[inline]
844    fn handle_gt(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
845        if k < lo {
846            Some(true)
847        } else if hi <= k {
848            Some(false)
849        } else {
850            None
851        }
852    }
853
854    #[inline]
855    fn handle_gteq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
856        if k <= lo {
857            Some(true)
858        } else if hi < k {
859            Some(false)
860        } else {
861            None
862        }
863    }
864
865    fn try_eval_predicate_inner(
866        &self,
867        op: &Operator,
868        literal: &Literal,
869    ) -> SqueezeResult<Option<BooleanArray>> {
870        // Extract scalar value as T::Native
871        let k_opt: Option<T::Native> = match literal.value() {
872            ScalarValue::Int8(Some(v)) => T::Native::from_i8(*v),
873            ScalarValue::Int16(Some(v)) => T::Native::from_i16(*v),
874            ScalarValue::Int32(Some(v)) => T::Native::from_i32(*v),
875            ScalarValue::Int64(Some(v)) => T::Native::from_i64(*v),
876            ScalarValue::UInt8(Some(v)) => T::Native::from_u8(*v),
877            ScalarValue::UInt16(Some(v)) => T::Native::from_u16(*v),
878            ScalarValue::UInt32(Some(v)) => T::Native::from_u32(*v),
879            ScalarValue::UInt64(Some(v)) => T::Native::from_u64(*v),
880            ScalarValue::Date32(Some(v)) => T::Native::from_i32(*v),
881            ScalarValue::Date64(Some(v)) => T::Native::from_i64(*v),
882            ScalarValue::Float32(Some(v)) => T::Native::from_f32(*v),
883            ScalarValue::Float64(Some(v)) => T::Native::from_f64(*v),
884            _ => None,
885        };
886        let Some(k) = k_opt else { return Ok(None) };
887        let q_prim = self.quantized.to_primitive();
888        let (_dt, values, _nulls) = q_prim.into_parts();
889
890        let mut out_vals: Vec<bool> = Vec::with_capacity(values.len());
891        let mut next_patch_index = 0;
892        let mut ignore_patches = false;
893        if self.patch_indices.is_empty() {
894            ignore_patches = true;
895        }
896        let comp_fn = match op {
897            Operator::Eq => Self::handle_eq,
898            Operator::NotEq => Self::handle_neq,
899            Operator::Lt => Self::handle_lt,
900            Operator::LtEq => Self::handle_lteq,
901            Operator::Gt => Self::handle_gt,
902            Operator::GtEq => Self::handle_gteq,
903        };
904        // TODO(): This might not be very vectorization-friendly right now. Figure out optimizations
905        for (i, &b) in values.iter().enumerate() {
906            if let Some(nulls) = self.quantized.nulls()
907                && !nulls.is_valid(i)
908            {
909                out_vals.push(false);
910                continue;
911            }
912            if !ignore_patches && i as u64 == self.patch_indices[next_patch_index] {
913                next_patch_index += 1;
914                if next_patch_index == self.patch_indices.len() {
915                    ignore_patches = true;
916                }
917                out_vals.push(false);
918                continue;
919            }
920
921            let val: <T::SignedIntType as ArrowPrimitiveType>::Native = b.as_();
922            let lo = (val << self.bucket_width).add_wrapping(self.reference_value);
923            let hi = ((val.add_wrapping(1i32.into())) << self.bucket_width)
924                .add_wrapping(self.reference_value);
925            let val_lower = T::decode_single(&lo, &self.exponent);
926            let val_higher = T::decode_single(&hi, &self.exponent);
927
928            let decided = comp_fn(val_lower, val_higher, k);
929            if let Some(v) = decided {
930                out_vals.push(v);
931            } else {
932                return Err(NeedsBacking);
933            }
934        }
935
936        // Handle patches separately
937        // TODO(): Vectorize this
938        for (idx, patch_idx) in self.patch_indices.iter().enumerate() {
939            let patch_value = self.patch_values[idx];
940            out_vals[*patch_idx as usize] = match op {
941                Operator::Eq => patch_value == k,
942                Operator::NotEq => patch_value != k,
943                Operator::Lt => patch_value < k,
944                Operator::LtEq => patch_value <= k,
945                Operator::Gt => patch_value > k,
946                Operator::GtEq => patch_value >= k,
947            }
948        }
949
950        let bool_buf = arrow::buffer::BooleanBuffer::from_iter(out_vals);
951        let out = BooleanArray::new(bool_buf, self.quantized.nulls().cloned());
952        Ok(Some(out))
953    }
954}
955
956#[async_trait::async_trait]
957impl<T> LiquidSqueezedArray for LiquidFloatQuantizedArray<T>
958where
959    T: LiquidFloatType,
960{
961    fn as_any(&self) -> &dyn Any {
962        self
963    }
964
965    fn get_array_memory_size(&self) -> usize {
966        self.quantized.get_array_memory_size()
967            + size_of::<Exponents>()
968            + self.patch_indices.capacity() * size_of::<u64>()
969            + self.patch_values.capacity() * size_of::<T::Native>()
970            + size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>()
971    }
972
973    fn len(&self) -> usize {
974        LiquidFloatQuantizedArray::<T>::len(self)
975    }
976
977    async fn to_arrow_array(&self) -> ArrayRef {
978        self.hydrate_full_arrow().await
979    }
980
981    fn data_type(&self) -> LiquidDataType {
982        LiquidDataType::Float
983    }
984
985    fn original_arrow_data_type(&self) -> DataType {
986        T::DATA_TYPE.clone()
987    }
988
989    async fn try_eval_predicate(
990        &self,
991        expr: &Arc<dyn PhysicalExpr>,
992        filter: &BooleanBuffer,
993    ) -> Option<BooleanArray> {
994        // Apply selection first to reduce input rows
995        let filtered = self.filter_inner(filter);
996
997        if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>()
998            && let Some(literal) = binary_expr.right().as_any().downcast_ref::<Literal>()
999        {
1000            let op = binary_expr.op();
1001            let supported_op = Operator::from_datafusion(op);
1002            if let Some(supported_op) = supported_op {
1003                match filtered.try_eval_predicate_inner(&supported_op, literal) {
1004                    Ok(Some(mask)) => return Some(mask),
1005                    Ok(None) => return None,
1006                    Err(NeedsBacking) => {}
1007                }
1008
1009                // Fallback: hydrate full Arrow and evaluate predicate on filtered rows.
1010                use arrow::array::cast::AsArray;
1011                use datafusion::logical_expr::ColumnarValue;
1012                use datafusion::physical_expr_common::datum::apply_cmp;
1013
1014                let full = self.hydrate_full_arrow().await;
1015                let selection_array = BooleanArray::new(filter.clone(), None);
1016                let filtered_arr = arrow::compute::filter(&full, &selection_array).ok()?;
1017                let filtered_len = filtered_arr.len();
1018
1019                let lhs = ColumnarValue::Array(filtered_arr);
1020                let rhs = ColumnarValue::Scalar(literal.value().clone());
1021                let result = match op {
1022                    datafusion::logical_expr::Operator::NotEq => {
1023                        apply_cmp(datafusion::logical_expr::Operator::NotEq, &lhs, &rhs)
1024                    }
1025                    datafusion::logical_expr::Operator::Eq => {
1026                        apply_cmp(datafusion::logical_expr::Operator::Eq, &lhs, &rhs)
1027                    }
1028                    datafusion::logical_expr::Operator::Lt => {
1029                        apply_cmp(datafusion::logical_expr::Operator::Lt, &lhs, &rhs)
1030                    }
1031                    datafusion::logical_expr::Operator::LtEq => {
1032                        apply_cmp(datafusion::logical_expr::Operator::LtEq, &lhs, &rhs)
1033                    }
1034                    datafusion::logical_expr::Operator::Gt => {
1035                        apply_cmp(datafusion::logical_expr::Operator::Gt, &lhs, &rhs)
1036                    }
1037                    datafusion::logical_expr::Operator::GtEq => {
1038                        apply_cmp(datafusion::logical_expr::Operator::GtEq, &lhs, &rhs)
1039                    }
1040                    _ => return None,
1041                };
1042                let result = result.ok()?;
1043                return Some(result.into_array(filtered_len).ok()?.as_boolean().clone());
1044            }
1045        }
1046        None
1047    }
1048}
1049
1050#[cfg(test)]
1051mod tests {
1052    use datafusion::logical_expr::Operator;
1053    use futures::executor::block_on;
1054    use rand::{RngExt as _, SeedableRng as _, distr::uniform::SampleUniform, rngs::StdRng};
1055
1056    use crate::cache::TestSqueezeIo;
1057
1058    use super::*;
1059
1060    macro_rules! test_roundtrip {
1061        ($test_name: ident, $type:ty, $values: expr) => {
1062            #[test]
1063            fn $test_name() {
1064                let original: Vec<Option<<$type as ArrowPrimitiveType>::Native>> = $values;
1065                let array = PrimitiveArray::<$type>::from(original.clone());
1066
1067                // Convert to Liquid array and back
1068                let liquid_array = LiquidFloatArray::<$type>::from_arrow_array(array.clone());
1069                let result_array = liquid_array.to_arrow_array();
1070                let bytes_array =
1071                    LiquidFloatArray::<$type>::from_bytes(liquid_array.to_bytes().into());
1072
1073                assert_eq!(result_array.as_ref(), &array);
1074                assert_eq!(bytes_array.to_arrow_array().as_ref(), &array);
1075            }
1076        };
1077    }
1078
1079    // Test cases for Float32
1080    test_roundtrip!(
1081        test_float32_roundtrip_basic,
1082        Float32Type,
1083        vec![Some(-1.0), Some(1.0), Some(0.0)]
1084    );
1085
1086    test_roundtrip!(
1087        test_float32_roundtrip_with_nones,
1088        Float32Type,
1089        vec![Some(-1.0), Some(1.0), Some(0.0), None]
1090    );
1091
1092    test_roundtrip!(
1093        test_float32_roundtrip_all_nones,
1094        Float32Type,
1095        vec![None, None, None, None]
1096    );
1097
1098    test_roundtrip!(test_float32_roundtrip_empty, Float32Type, vec![]);
1099
1100    // Test cases for Float64
1101    test_roundtrip!(
1102        test_float64_roundtrip_basic,
1103        Float64Type,
1104        vec![Some(-1.0), Some(1.0), Some(0.0)]
1105    );
1106
1107    test_roundtrip!(
1108        test_float64_roundtrip_with_nones,
1109        Float64Type,
1110        vec![Some(-1.0), Some(1.0), Some(0.0), None]
1111    );
1112
1113    test_roundtrip!(
1114        test_float64_roundtrip_all_nones,
1115        Float64Type,
1116        vec![None, None, None, None]
1117    );
1118
1119    test_roundtrip!(test_float64_roundtrip_empty, Float64Type, vec![]);
1120
    // Tests with filters
1122    #[test]
1123    fn test_filter_basic() {
1124        // Create original array with some values
1125        let original = vec![Some(1.0), Some(2.1), Some(3.2), None, Some(5.5)];
1126        let array = PrimitiveArray::<Float32Type>::from(original);
1127        let liquid_array = LiquidFloatArray::<Float32Type>::from_arrow_array(array);
1128
1129        // Create selection mask: keep indices 0, 2, and 4
1130        let selection = BooleanBuffer::from(vec![true, false, true, false, true]);
1131
1132        // Apply filter
1133        let result_array = liquid_array.filter(&selection);
1134
1135        // Expected result after filtering
1136        let expected = PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(3.2), Some(5.5)]);
1137
1138        assert_eq!(result_array.as_ref(), &expected);
1139    }
1140
1141    #[test]
1142    fn test_original_arrow_data_type_returns_float32() {
1143        let array = PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.5)]);
1144        let liquid = LiquidFloatArray::<Float32Type>::from_arrow_array(array);
1145        assert_eq!(liquid.original_arrow_data_type(), DataType::Float32);
1146    }
1147
1148    #[test]
1149    fn test_filter_all_nulls() {
1150        // Create array with all nulls
1151        let original = vec![None, None, None, None];
1152        let array = PrimitiveArray::<Float32Type>::from(original);
1153        let liquid_array = LiquidFloatArray::<Float32Type>::from_arrow_array(array);
1154
1155        // Keep first and last elements
1156        let selection = BooleanBuffer::from(vec![true, false, false, true]);
1157
1158        let result_array = liquid_array.filter(&selection);
1159
1160        let expected = PrimitiveArray::<Float32Type>::from(vec![None, None]);
1161
1162        assert_eq!(result_array.as_ref(), &expected);
1163    }
1164
1165    #[test]
1166    fn test_filter_empty_result() {
1167        let original = vec![Some(1.0), Some(2.1), Some(3.3)];
1168        let array = PrimitiveArray::<Float32Type>::from(original);
1169        let liquid_array = LiquidFloatArray::<Float32Type>::from_arrow_array(array);
1170
1171        // Filter out all elements
1172        let selection = BooleanBuffer::from(vec![false, false, false]);
1173
1174        let result_array = liquid_array.filter(&selection);
1175
1176        assert_eq!(result_array.len(), 0);
1177    }
1178
1179    #[test]
1180    fn test_compression_f32_f64() {
1181        fn run_compression_test<T: LiquidFloatType>(
1182            type_name: &str,
1183            data_fn: impl Fn(usize) -> T::Native,
1184        ) {
1185            let original: Vec<T::Native> = (0..2000).map(data_fn).collect();
1186            let array = PrimitiveArray::<T>::from_iter_values(original);
1187            let uncompressed_size = array.get_array_memory_size();
1188
1189            let liquid_array = LiquidFloatArray::<T>::from_arrow_array(array);
1190            let compressed_size = liquid_array.get_array_memory_size();
1191
1192            println!(
1193                "Type: {type_name}, uncompressed_size: {uncompressed_size}, compressed_size: {compressed_size}"
1194            );
1195            // Assert that compression actually reduced the size
1196            assert!(
1197                compressed_size < uncompressed_size,
1198                "{type_name} compression failed to reduce size"
1199            );
1200        }
1201
1202        // Run for f32
1203        run_compression_test::<Float32Type>("f32", |i| i as f32);
1204
1205        // Run for f64
1206        run_compression_test::<Float64Type>("f64", |i| i as f64);
1207    }
1208
1209    //  --------- Hybrid (squeeze) tests ----------
1210    fn make_f_array_with_range<T>(
1211        len: usize,
1212        base_min: T::Native,
1213        range: T::Native,
1214        null_prob: f32,
1215        rng: &mut StdRng,
1216    ) -> PrimitiveArray<T>
1217    where
1218        T: LiquidFloatType,
1219        <T as arrow::array::ArrowPrimitiveType>::Native: SampleUniform,
1220        PrimitiveArray<T>: From<Vec<Option<<T as ArrowPrimitiveType>::Native>>>,
1221    {
1222        let mut vals: Vec<Option<T::Native>> = Vec::with_capacity(len);
1223        for _ in 0..len {
1224            if rng.random_bool(null_prob as f64) {
1225                vals.push(None);
1226            } else {
1227                vals.push(Some(rng.random_range(base_min..(base_min + range))));
1228            }
1229        }
1230        PrimitiveArray::<T>::from(vals)
1231    }
1232
1233    #[test]
1234    fn hybrid_squeeze_unsqueezable_small_range() {
1235        let mut rng = StdRng::seed_from_u64(0x51_71);
1236        let arr = make_f_array_with_range::<Float32Type>(64, 10_000.0, 100.0, 0.1, &mut rng);
1237        let liquid = LiquidFloatArray::<Float32Type>::from_arrow_array(arr);
1238        assert!(
1239            liquid
1240                .squeeze(Arc::new(TestSqueezeIo::default()), None)
1241                .is_none()
1242        );
1243    }
1244
1245    #[test]
1246    fn hybrid_squeeze_full_read_roundtrip_f32() {
1247        let mut rng = StdRng::seed_from_u64(0x51_72);
1248        let arr = make_f_array_with_range::<Float32Type>(
1249            2000,
1250            -50_000.0,
1251            (1 << 16) as f32,
1252            0.1,
1253            &mut rng,
1254        );
1255        let liq = LiquidFloatArray::<Float32Type>::from_arrow_array(arr.clone());
1256        let bytes_baseline = liq.to_bytes();
1257        let io = Arc::new(TestSqueezeIo::default());
1258        let (hybrid, bytes) = liq.squeeze(io.clone(), None).expect("squeezable");
1259        io.set_bytes(bytes.clone());
1260        // ensure we can recover the original by hydrating from full bytes
1261        let recovered = LiquidFloatArray::<Float32Type>::from_bytes(bytes.clone());
1262        assert_eq!(
1263            recovered.to_arrow_array().as_primitive::<Float32Type>(),
1264            &arr
1265        );
1266        assert_eq!(bytes_baseline, recovered.to_bytes());
1267
1268        let min = arrow::compute::kernels::aggregate::min(&arr).unwrap();
1269        let mask = BooleanBuffer::from(vec![true; arr.len()]);
1270        let build_expr =
1271            |op: Operator, k: f32| -> Arc<dyn datafusion::physical_plan::PhysicalExpr> {
1272                let lit = Arc::new(Literal::new(ScalarValue::Float32(Some(k))));
1273                Arc::new(BinaryExpr::new(lit.clone(), op, lit))
1274            };
1275
1276        // Expect resolvable results without IO
1277        let resolvable_cases: Vec<(Operator, f32, bool)> = vec![
1278            (Operator::Eq, min - 1.0, false),   // eq false everywhere
1279            (Operator::NotEq, min - 1.0, true), // neq true everywhere
1280            (Operator::Lt, min, false),         // lt false everywhere
1281            (Operator::LtEq, min - 1.0, false), // lte false everywhere
1282            (Operator::Gt, min - 1.0, true),    // gt true everywhere
1283            (Operator::GtEq, min, true),        // gte true everywhere
1284        ];
1285
1286        for (op, k, expected_const) in resolvable_cases {
1287            let expr = build_expr(op, k);
1288            io.reset_reads();
1289            let got = block_on(hybrid.try_eval_predicate(&expr, &mask)).expect("supported");
1290            let expected = {
1291                let vals: Vec<Option<bool>> = (0..arr.len())
1292                    .map(|i| {
1293                        if arr.is_null(i) {
1294                            None
1295                        } else {
1296                            Some(expected_const)
1297                        }
1298                    })
1299                    .collect();
1300                BooleanArray::from(vals)
1301            };
1302            assert_eq!(io.reads(), 0);
1303            assert_eq!(got, expected);
1304        }
1305
1306        // Unresolvable for Eq: pick a present value (ensures ambiguous bucket)
1307        let k_present = (0..arr.len())
1308            .find_map(|i| {
1309                if arr.is_null(i) {
1310                    None
1311                } else {
1312                    Some(arr.value(i))
1313                }
1314            })
1315            .unwrap();
1316        let expr_eq_present = build_expr(Operator::Eq, k_present);
1317        io.reset_reads();
1318        let got = block_on(hybrid.try_eval_predicate(&expr_eq_present, &mask)).expect("supported");
1319        let expected = {
1320            let vals: Vec<Option<bool>> = (0..arr.len())
1321                .map(|i| {
1322                    if arr.is_null(i) {
1323                        None
1324                    } else {
1325                        Some(arr.value(i) == k_present)
1326                    }
1327                })
1328                .collect();
1329            BooleanArray::from(vals)
1330        };
1331        assert!(io.reads() > 0);
1332        assert_eq!(got, expected);
1333    }
1334
1335    #[test]
1336    fn hybrid_squeeze_full_read_roundtrip_f64() {
1337        let mut rng = StdRng::seed_from_u64(0x51_72);
1338        let arr = make_f_array_with_range::<Float64Type>(
1339            2000,
1340            -50_000.0f64,
1341            (1 << 16) as f64,
1342            0.1,
1343            &mut rng,
1344        );
1345        let liq = LiquidFloatArray::<Float64Type>::from_arrow_array(arr.clone());
1346        let bytes_baseline = liq.to_bytes();
1347        let io = Arc::new(TestSqueezeIo::default());
1348        let (hybrid, bytes) = liq.squeeze(io.clone(), None).expect("squeezable");
1349        io.set_bytes(bytes.clone());
1350        // ensure we can recover the original by hydrating from full bytes
1351        let recovered = LiquidFloatArray::<Float64Type>::from_bytes(bytes.clone());
1352        assert_eq!(
1353            recovered.to_arrow_array().as_primitive::<Float64Type>(),
1354            &arr
1355        );
1356        assert_eq!(bytes_baseline, recovered.to_bytes());
1357
1358        let min = arrow::compute::kernels::aggregate::min(&arr).unwrap();
1359        let mask = BooleanBuffer::from(vec![true; arr.len()]);
1360        let build_expr =
1361            |op: Operator, k: f64| -> Arc<dyn datafusion::physical_plan::PhysicalExpr> {
1362                let lit = Arc::new(Literal::new(ScalarValue::Float64(Some(k))));
1363                Arc::new(BinaryExpr::new(lit.clone(), op, lit))
1364            };
1365
1366        // Expect resolvable results without IO
1367        let resolvable_cases: Vec<(Operator, f64, bool)> = vec![
1368            (Operator::Eq, min - 1.0, false),   // eq false everywhere
1369            (Operator::NotEq, min - 1.0, true), // neq true everywhere
1370            (Operator::Lt, min, false),         // lt false everywhere
1371            (Operator::LtEq, min - 1.0, false), // lte false everywhere
1372            (Operator::Gt, min - 1.0, true),    // gt true everywhere
1373            (Operator::GtEq, min, true),        // gte true everywhere
1374        ];
1375
1376        for (op, k, expected_const) in resolvable_cases {
1377            let expr = build_expr(op, k);
1378            io.reset_reads();
1379            let got = block_on(hybrid.try_eval_predicate(&expr, &mask)).expect("supported");
1380            let expected = {
1381                let vals: Vec<Option<bool>> = (0..arr.len())
1382                    .map(|i| {
1383                        if arr.is_null(i) {
1384                            None
1385                        } else {
1386                            Some(expected_const)
1387                        }
1388                    })
1389                    .collect();
1390                BooleanArray::from(vals)
1391            };
1392            assert_eq!(io.reads(), 0);
1393            assert_eq!(got, expected);
1394        }
1395
1396        // Unresolvable for Eq: pick a present value (ensures ambiguous bucket)
1397        let k_present = (0..arr.len())
1398            .find_map(|i| {
1399                if arr.is_null(i) {
1400                    None
1401                } else {
1402                    Some(arr.value(i))
1403                }
1404            })
1405            .unwrap();
1406        let expr_eq_present = build_expr(Operator::Eq, k_present);
1407        io.reset_reads();
1408        let got = block_on(hybrid.try_eval_predicate(&expr_eq_present, &mask)).expect("supported");
1409        let expected = {
1410            let vals: Vec<Option<bool>> = (0..arr.len())
1411                .map(|i| {
1412                    if arr.is_null(i) {
1413                        None
1414                    } else {
1415                        Some(arr.value(i) == k_present)
1416                    }
1417                })
1418                .collect();
1419            BooleanArray::from(vals)
1420        };
1421        assert!(io.reads() > 0);
1422        assert_eq!(got, expected);
1423    }
1424}