Skip to main content

liquid_cache/liquid_array/
linear_integer_array.rs

1use std::any::Any;
2use std::fmt::Debug;
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use super::PrimitiveKind;
7use super::{LiquidArray, LiquidDataType, LiquidPrimitiveType};
8use crate::cache::LiquidExpr;
9use crate::liquid_array::LiquidPrimitiveArray;
10use crate::liquid_array::eval_predicate_on_array;
11use crate::liquid_array::ipc::{LiquidIPCHeader, get_physical_type_id};
12use arrow::array::{
13    Array, ArrayRef, ArrowPrimitiveType, BooleanArray, PrimitiveArray,
14    cast::AsArray,
15    types::{
16        Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type,
17        UInt32Type, UInt64Type,
18    },
19};
20use arrow::buffer::{BooleanBuffer, ScalarBuffer};
21use arrow::compute::kernels::filter;
22use arrow_schema::DataType;
23use bytes::Bytes;
24use num_traits::{AsPrimitive, Bounded, FromPrimitive};
25
26/// A linear-model based integer array, **only use it when you know the array is monotonic** and **you don't care about encoding speed!**.
27///
28/// Under the hood, it uses a linear model to predict the values and store the residuals:
29/// value\[i\] = intercept + round(slope * i) + residual\[i\]
30///
31/// Where `intercept` and `slope` are computed using a L-infinity linear fit, **this is time-consuming!**.
32///
33/// This array is only recommended if you know the array follows a linear model, e.g., kinetic values, offsets, etc.
34///
35/// Examples of not recommended use cases: random values like ids, categorical values, etc.
36#[derive(Debug)]
37pub struct LiquidLinearArray<T: LiquidPrimitiveType>
38where
39    T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
40{
41    // Signed residuals, bit-packed as a Liquid primitive array of i64.
42    residuals: LiquidPrimitiveArray<Int64Type>,
43    // Intercept term stored as f64 for simpler math/IO.
44    intercept: f64,
45    // Slope term of the linear model.
46    slope: f64,
47    // Keep the logical type parameter.
48    _phantom: PhantomData<T>,
49}
50
51/// Backward-compatible alias for i32.
52pub type LiquidLinearI32Array = LiquidLinearArray<Int32Type>;
53/// Linear-model array for `i8`.
54pub type LiquidLinearI8Array = LiquidLinearArray<Int8Type>;
55/// Linear-model array for `i16`.
56pub type LiquidLinearI16Array = LiquidLinearArray<Int16Type>;
57/// Linear-model array for `i64`.
58pub type LiquidLinearI64Array = LiquidLinearArray<Int64Type>;
59/// Linear-model array for `u8`.
60pub type LiquidLinearU8Array = LiquidLinearArray<UInt8Type>;
61/// Linear-model array for `u16`.
62pub type LiquidLinearU16Array = LiquidLinearArray<UInt16Type>;
63/// Linear-model array for `u32`.
64pub type LiquidLinearU32Array = LiquidLinearArray<UInt32Type>;
65/// Linear-model array for `u64`.
66pub type LiquidLinearU64Array = LiquidLinearArray<UInt64Type>;
67/// Linear-model array for `Date32` (days since epoch).
68pub type LiquidLinearDate32Array = LiquidLinearArray<Date32Type>;
69/// Linear-model array for `Date64` (ms since epoch).
70pub type LiquidLinearDate64Array = LiquidLinearArray<Date64Type>;
71
72impl<T> LiquidLinearArray<T>
73where
74    T: LiquidPrimitiveType,
75    T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
76{
77    /// Build from an Arrow `PrimitiveArray<T>` by training a linear model
78    /// using a fast L-infinity fit (Option 3) and storing residuals.
79    pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> Self {
80        let len = arrow_array.len();
81
82        // All nulls
83        if arrow_array.null_count() == len {
84            // All nulls
85            let res = PrimitiveArray::<Int64Type>::new_null(len);
86            return Self {
87                residuals: LiquidPrimitiveArray::<Int64Type>::from_arrow_array(res),
88                intercept: 0.0, // arbitrary, unused since all nulls
89                slope: 0.0,
90                _phantom: PhantomData,
91            };
92        }
93
94        // Prepare compact non-null buffers for fast fitting (avoid iterator Option cost).
95        let (nn_values, nn_indices) = collect_non_null_f64_and_indices::<T>(&arrow_array);
96
97        // Option 3 parameters: L-infinity (Chebyshev) regression.
98        let (mut intercept, mut slope) = fit_linf(&nn_values, &nn_indices);
99
100        // Compute residuals in one unified loop; also track ranges for fallback decision.
101        let mut residuals: Vec<i64> = Vec::with_capacity(len);
102        let is_unsigned = <T as PrimitiveKind>::IS_UNSIGNED;
103        let vals = arrow_array.values();
104        let nulls_opt = arrow_array.nulls();
105
106        // Original value range
107        let mut orig_min_u64 = u64::MAX;
108        let mut orig_max_u64 = 0u64;
109        let mut orig_min_i64 = i64::MAX;
110        let mut orig_max_i64 = i64::MIN;
111        // Residual range
112        let mut res_min = i64::MAX;
113        let mut res_max = i64::MIN;
114
115        if is_unsigned {
116            let max_u64: u64 = <T as PrimitiveKind>::MAX_U64;
117            for i in 0..len {
118                let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
119                if valid {
120                    type U<TT> =
121                        <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
122                    let v_u: U<T> = vals[i].as_();
123                    let v_u64: u64 = v_u.as_();
124                    if v_u64 < orig_min_u64 {
125                        orig_min_u64 = v_u64;
126                    }
127                    if v_u64 > orig_max_u64 {
128                        orig_max_u64 = v_u64;
129                    }
130                    let pr = slope * (i as f64) + intercept;
131                    let p = predict_u64_saturated(pr, max_u64);
132                    let (pos, mag) = if v_u64 >= p {
133                        (true, v_u64 - p)
134                    } else {
135                        (false, p - v_u64)
136                    };
137                    let m = (mag & (i64::MAX as u64)) as i64;
138                    let r = if pos { m } else { -m };
139                    if r < res_min {
140                        res_min = r;
141                    }
142                    if r > res_max {
143                        res_max = r;
144                    }
145                    residuals.push(r);
146                } else {
147                    residuals.push(0);
148                }
149            }
150        } else {
151            let (min_i64, max_i64): (i64, i64) =
152                (<T as PrimitiveKind>::MIN_I64, <T as PrimitiveKind>::MAX_I64);
153            for i in 0..len {
154                let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
155                if valid {
156                    let v_i64: i64 = vals[i].as_();
157                    if v_i64 < orig_min_i64 {
158                        orig_min_i64 = v_i64;
159                    }
160                    if v_i64 > orig_max_i64 {
161                        orig_max_i64 = v_i64;
162                    }
163                    let pr = slope * (i as f64) + intercept;
164                    let p = predict_i64_saturated(pr, min_i64, max_i64);
165                    let r = v_i64 - p;
166                    if r < res_min {
167                        res_min = r;
168                    }
169                    if r > res_max {
170                        res_max = r;
171                    }
172                    residuals.push(r);
173                } else {
174                    residuals.push(0);
175                }
176            }
177        }
178
179        // Fallback: ensure residual range is strictly smaller than original range
180        let res_width: u128 = (res_max as i128 - res_min as i128) as u128;
181        let orig_width: u128 = if is_unsigned {
182            (orig_max_u64 as u128).saturating_sub(orig_min_u64 as u128)
183        } else {
184            (orig_max_i64 as i128 - orig_min_i64 as i128) as u128
185        };
186        if res_width >= orig_width {
187            // Rebuild residuals with zero model
188            intercept = 0.0;
189            slope = 0.0;
190            residuals.clear();
191            if is_unsigned {
192                for i in 0..len {
193                    let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
194                    if valid {
195                        type U<TT> = <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
196                        let v_u: U<T> = vals[i].as_();
197                        let v_u64: u64 = v_u.as_();
198                        let r = (v_u64 & (i64::MAX as u64)) as i64;
199                        residuals.push(r);
200                    } else {
201                        residuals.push(0);
202                    }
203                }
204            } else {
205                for i in 0..len {
206                    let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
207                    if valid {
208                        let v_i64: i64 = vals[i].as_();
209                        residuals.push(v_i64);
210                    } else {
211                        residuals.push(0);
212                    }
213                }
214            }
215        }
216        let residuals_buf: ScalarBuffer<i64> = ScalarBuffer::from(residuals);
217        let nulls = arrow_array.nulls().cloned();
218        let res_prim = PrimitiveArray::<Int64Type>::new(residuals_buf, nulls);
219        let residuals = LiquidPrimitiveArray::<Int64Type>::from_arrow_array(res_prim);
220
221        Self {
222            residuals,
223            intercept,
224            slope,
225            _phantom: PhantomData,
226        }
227    }
228
229    fn len(&self) -> usize {
230        self.residuals.len()
231    }
232
233    fn residual_starting_loc() -> usize {
234        // Header + intercept(native) + slope(f64), aligned to 8 bytes boundary
235        let header_size =
236            LiquidIPCHeader::size() + std::mem::size_of::<f64>() + std::mem::size_of::<f64>();
237        (header_size + 7) & !7
238    }
239
240    fn to_bytes_inner(&self) -> Vec<u8> {
241        let header = LiquidIPCHeader::new(
242            LiquidDataType::LinearInteger as u16,
243            get_physical_type_id::<T>(),
244        );
245        let start = Self::residual_starting_loc();
246        let mut out = Vec::with_capacity(start + 256);
247
248        // Header
249        out.extend_from_slice(&header.to_bytes());
250        // Model params (f64 intercept, then f64 slope)
251        out.extend_from_slice(&self.intercept.to_le_bytes());
252        out.extend_from_slice(&self.slope.to_le_bytes());
253
254        while out.len() < start {
255            out.push(0);
256        }
257
258        // Encode residuals (already LiquidPrimitiveArray<Int64Type>)
259        out.extend_from_slice(&self.residuals.to_bytes_inner());
260        out
261    }
262
263    /// Decode a `LiquidLinearArray<T>` from bytes.
264    pub fn from_bytes(bytes: Bytes) -> Self {
265        let _hdr = LiquidIPCHeader::from_bytes(&bytes);
266
267        // Read intercept as f64
268        let intercept_off = LiquidIPCHeader::size();
269        let intercept =
270            f64::from_le_bytes(bytes[intercept_off..intercept_off + 8].try_into().unwrap());
271
272        // Read slope
273        let slope_off = intercept_off + std::mem::size_of::<f64>();
274        let slope = f64::from_le_bytes(bytes[slope_off..slope_off + 8].try_into().unwrap());
275
276        // Decode residuals
277        let start = Self::residual_starting_loc();
278        let res_bytes = bytes.slice(start..);
279        let residuals = LiquidPrimitiveArray::<Int64Type>::from_bytes(res_bytes);
280
281        Self {
282            residuals,
283            intercept,
284            slope,
285            _phantom: PhantomData,
286        }
287    }
288}
289
290impl<T> LiquidArray for LiquidLinearArray<T>
291where
292    T: LiquidPrimitiveType,
293    T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
294{
295    fn as_any(&self) -> &dyn Any {
296        self
297    }
298
299    fn original_arrow_data_type(&self) -> DataType {
300        T::DATA_TYPE.clone()
301    }
302
303    fn get_array_memory_size(&self) -> usize {
304        self.residuals.get_array_memory_size()
305            + std::mem::size_of::<f64>() // intercept
306            + std::mem::size_of::<f64>() // slope
307    }
308
309    fn len(&self) -> usize {
310        self.len()
311    }
312
313    fn to_arrow_array(&self) -> ArrayRef {
314        let arr = self.residuals.to_arrow_array();
315        let (_dt, residuals, nulls) = arr.as_primitive::<Int64Type>().clone().into_parts();
316
317        // Reconstruct final values: predicted(i) +/- |residual_i|
318        let mut final_values = Vec::<T::Native>::with_capacity(self.len());
319        let is_unsigned = <T as PrimitiveKind>::IS_UNSIGNED;
320        if is_unsigned {
321            let max_u64: u64 = <T as PrimitiveKind>::MAX_U64;
322            for (i, &e) in residuals.iter().enumerate() {
323                let pr = self.slope * (i as f64) + self.intercept;
324                let p = predict_u64_saturated(pr, max_u64);
325                let mag = e.unsigned_abs();
326                let sum = if e >= 0 {
327                    p.saturating_add(mag)
328                } else {
329                    p.saturating_sub(mag)
330                };
331                final_values.push(T::Native::from_u64(sum).unwrap());
332            }
333        } else {
334            let (min_i64, max_i64): (i64, i64) =
335                (<T as PrimitiveKind>::MIN_I64, <T as PrimitiveKind>::MAX_I64);
336            for (i, &e) in residuals.iter().enumerate() {
337                let pr = self.slope * (i as f64) + self.intercept;
338                let p = predict_i64_saturated(pr, min_i64, max_i64);
339                let sum = p.saturating_add(e);
340                final_values.push(T::Native::from_i64(sum).unwrap());
341            }
342        }
343
344        let values_buf: ScalarBuffer<T::Native> = ScalarBuffer::from(final_values);
345        Arc::new(PrimitiveArray::<T>::new(values_buf, nulls))
346    }
347
348    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
349        let arr = self.to_arrow_array();
350        let selection = BooleanArray::new(selection.clone(), None);
351        filter::filter(&arr, &selection).unwrap()
352    }
353
354    fn try_eval_predicate(&self, predicate: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
355        let arr = self.filter(filter);
356        eval_predicate_on_array(arr, predicate)
357    }
358
359    fn to_bytes(&self) -> Vec<u8> {
360        self.to_bytes_inner()
361    }
362
363    fn data_type(&self) -> LiquidDataType {
364        LiquidDataType::LinearInteger
365    }
366}
367
368#[inline]
369fn predict_u64_saturated(pred: f64, max_u64: u64) -> u64 {
370    if !pred.is_finite() || pred <= 0.0 {
371        0
372    } else if pred >= max_u64 as f64 {
373        max_u64
374    } else {
375        pred.round() as u64
376    }
377}
378
379#[inline]
380fn predict_i64_saturated(pred: f64, min_i64: i64, max_i64: i64) -> i64 {
381    if !pred.is_finite() {
382        0
383    } else if pred <= min_i64 as f64 {
384        min_i64
385    } else if pred >= max_i64 as f64 {
386        max_i64
387    } else {
388        pred.round() as i64
389    }
390}
391
392/// L-infinity linear fit for y\[i\] ≈ intercept + slope * i (before rounding),
393/// minimizing the maximum absolute error over all non-null points.
394///
395/// Approach: minimize R(m) = max_i (y_i - m i) - min_i (y_i - m i), which is
396/// convex in m. Given m, the best intercept is b = (max_i s_i + min_i s_i)/2
397/// where s_i = y_i - m i. We find m by a few rounds of bisection using the
398/// subgradient sign derived from the argmax/argmin indices. O(n) per iteration.
399fn fit_linf(values: &[f64], idxs: &[u32]) -> (f64, f64) {
400    let n = values.len();
401    assert_eq!(values.len(), idxs.len());
402    if n == 0 {
403        return (0.0, 0.0);
404    }
405    if n == 1 {
406        return (values[0], 0.0);
407    }
408
409    let mut slope_min = f64::INFINITY;
410    let mut slope_max = f64::NEG_INFINITY;
411    for k in 1..n {
412        let di = (idxs[k] - idxs[k - 1]) as f64;
413        if di > 0.0 {
414            let dv = values[k] - values[k - 1];
415            let s = dv / di;
416            if s < slope_min {
417                slope_min = s;
418            }
419            if s > slope_max {
420                slope_max = s;
421            }
422        }
423    }
424    if !slope_min.is_finite() || !slope_max.is_finite() {
425        slope_min = 0.0;
426        slope_max = 0.0;
427    }
428
429    let mut lo = slope_min.min(slope_max);
430    let mut hi = slope_min.max(slope_max);
431    if (hi - lo).abs() < 1e-12 {
432        let pad = if hi.abs() < 1.0 { 1.0 } else { hi.abs() * 1e-6 };
433        lo -= pad;
434        hi += pad;
435    }
436
437    #[inline]
438    fn range_stats(values: &[f64], idxs: &[u32], m: f64) -> (f64, u32, f64, u32) {
439        let mut min_s = f64::INFINITY;
440        let mut max_s = f64::NEG_INFINITY;
441        let mut i_min = 0u32;
442        let mut i_max = 0u32;
443        for k in 0..values.len() {
444            let i = idxs[k] as f64;
445            let s = values[k] - m * i;
446            if s < min_s {
447                min_s = s;
448                i_min = idxs[k];
449            }
450            if s > max_s {
451                max_s = s;
452                i_max = idxs[k];
453            }
454        }
455        (min_s, i_min, max_s, i_max)
456    }
457
458    const MAX_ITERS: usize = 8;
459    for _ in 0..MAX_ITERS {
460        let m = 0.5 * (lo + hi);
461        let (_min_s, i_min, _max_s, i_max) = range_stats(values, idxs, m);
462        let g = (i_min as i64) - (i_max as i64);
463        if g > 0 {
464            hi = m;
465        } else if g < 0 {
466            lo = m;
467        } else {
468            lo = m;
469            hi = m;
470            break;
471        }
472        if (hi - lo).abs() < 1e-12 {
473            break;
474        }
475    }
476
477    let m = 0.5 * (lo + hi);
478    let (min_s, _i_min, max_s, _i_max) = range_stats(values, idxs, m);
479    let b = 0.5 * (max_s + min_s);
480    (b, m)
481}
482
483#[inline]
484fn collect_non_null_f64_and_indices<T>(arr: &PrimitiveArray<T>) -> (Vec<f64>, Vec<u32>)
485where
486    T: LiquidPrimitiveType,
487    T::Native: AsPrimitive<f64>,
488{
489    let nn = arr.len() - arr.null_count();
490    let mut values = Vec::with_capacity(nn);
491    let mut idxs = Vec::with_capacity(nn);
492    let vals = arr.values();
493    if arr.null_count() == 0 {
494        for (i, v) in vals.iter().enumerate() {
495            values.push(v.as_());
496            idxs.push(i as u32);
497        }
498    } else {
499        let nulls = arr.nulls().unwrap();
500        for (i, v) in vals.iter().enumerate() {
501            if nulls.is_valid(i) {
502                values.push(v.as_());
503                idxs.push(i as u32);
504            }
505        }
506    }
507    (values, idxs)
508}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513
514    fn roundtrip_eq(values: Vec<Option<i32>>) {
515        let arr = PrimitiveArray::<Int32Type>::from(values.clone());
516        let linear = LiquidLinearI32Array::from_arrow_array(arr.clone());
517        let decoded = linear.to_arrow_array();
518        assert_eq!(decoded.as_ref(), &arr);
519
520        let bytes = Bytes::from(linear.to_bytes());
521        let decoded = LiquidLinearI32Array::from_bytes(bytes);
522        let round = decoded.to_arrow_array();
523        assert_eq!(round.as_ref(), &arr);
524    }
525
526    macro_rules! roundtrip_eq_t {
527        ($T:ty, $values:expr) => {{
528            let arr = PrimitiveArray::<$T>::from(($values).clone());
529            let linear = LiquidLinearArray::<$T>::from_arrow_array(arr.clone());
530            let decoded = linear.to_arrow_array();
531            assert_eq!(decoded.as_ref(), &arr);
532
533            let bytes = Bytes::from(linear.to_bytes());
534            let decoded = LiquidLinearArray::<$T>::from_bytes(bytes);
535            let round = decoded.to_arrow_array();
536            assert_eq!(round.as_ref(), &arr);
537        }};
538    }
539
540    #[test]
541    fn test_roundtrip_basic() {
542        // Non-monotonic values to ensure we don't rely on simple increasing sequences
543        roundtrip_eq(vec![
544            Some(10),
545            Some(15),
546            Some(14),
547            Some(20),
548            Some(18),
549            Some(25),
550            Some(24),
551        ]);
552    }
553
554    #[test]
555    fn test_roundtrip_with_nulls() {
556        roundtrip_eq(vec![Some(10), None, Some(30), None, Some(50), Some(70)]);
557    }
558
559    #[test]
560    fn test_all_nulls() {
561        roundtrip_eq(vec![None, None, None, None]);
562    }
563
564    #[test]
565    fn test_single_value() {
566        roundtrip_eq(vec![Some(42)]);
567    }
568
569    #[test]
570    fn test_empty() {
571        roundtrip_eq(vec![]);
572    }
573
574    #[test]
575    fn test_negative_values() {
576        roundtrip_eq(vec![
577            Some(-100),
578            Some(-50),
579            Some(0),
580            Some(50),
581            Some(25),
582            None,
583            Some(-25),
584        ]);
585    }
586
587    #[test]
588    fn test_filter_basic() {
589        let original: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3), None, Some(5), Some(8)];
590        let arr = PrimitiveArray::<Int32Type>::from(original.clone());
591        let linear = LiquidLinearI32Array::from_arrow_array(arr);
592        let selection = BooleanBuffer::from(vec![true, false, true, false, true, false]);
593        let result = linear.filter(&selection);
594        let expected = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(3), Some(5)]);
595        assert_eq!(result.as_ref(), &expected);
596    }
597
598    #[test]
599    fn test_original_arrow_data_type_returns_int32() {
600        let arr = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(2)]);
601        let linear = LiquidLinearI32Array::from_arrow_array(arr);
602        assert_eq!(linear.original_arrow_data_type(), DataType::Int32);
603    }
604
605    #[test]
606    fn test_roundtrip_i8() {
607        roundtrip_eq_t!(Int8Type, vec![Some(-10), Some(0), Some(10), None, Some(20)]);
608    }
609
610    #[test]
611    fn test_roundtrip_i16() {
612        roundtrip_eq_t!(
613            Int16Type,
614            vec![Some(-1000), Some(0), Some(1000), None, Some(2000)]
615        );
616    }
617
618    #[test]
619    fn test_roundtrip_i64() {
620        roundtrip_eq_t!(
621            Int64Type,
622            vec![
623                Some(-10_000_000_000),
624                Some(0),
625                Some(10_000_000_000),
626                None,
627                Some(20_000_000_000),
628            ]
629        );
630    }
631
632    #[test]
633    fn test_roundtrip_u8() {
634        roundtrip_eq_t!(
635            UInt8Type,
636            vec![Some(0), Some(10), Some(200), None, Some(255)]
637        );
638    }
639
640    #[test]
641    fn test_roundtrip_u16() {
642        roundtrip_eq_t!(
643            UInt16Type,
644            vec![Some(0), Some(1000), Some(60000), None, Some(500)]
645        );
646    }
647
648    #[test]
649    fn test_roundtrip_u32() {
650        roundtrip_eq_t!(
651            UInt32Type,
652            vec![
653                Some(0),
654                Some(1_000_000),
655                Some(3_000_000_000),
656                None,
657                Some(123_456_789),
658            ]
659        );
660    }
661
662    #[test]
663    fn test_roundtrip_u64() {
664        roundtrip_eq_t!(
665            UInt64Type,
666            vec![
667                Some(0),
668                Some(10_000_000_000),
669                Some(9_000_000_000_000_000_000u64),
670                None,
671                Some(42),
672            ]
673        );
674    }
675
676    #[test]
677    fn test_roundtrip_date32() {
678        roundtrip_eq_t!(
679            Date32Type,
680            vec![Some(-365), Some(0), Some(365), None, Some(18262)]
681        );
682    }
683
684    #[test]
685    fn test_roundtrip_date64() {
686        roundtrip_eq_t!(
687            Date64Type,
688            vec![
689                Some(-86_400_000),
690                Some(0),
691                Some(86_400_000),
692                None,
693                Some(1_000_000_000_000),
694            ]
695        );
696    }
697
698    #[test]
699    fn test_compression() {
700        let original = (0..1_000_000).step_by(100).collect::<Vec<_>>();
701
702        let original = PrimitiveArray::<Int32Type>::from_iter_values(original);
703        let arrow_size = original.get_array_memory_size();
704
705        let liquid_linear = LiquidLinearI32Array::from_arrow_array(original.clone());
706        let liquid_linear_size = liquid_linear.get_array_memory_size();
707
708        let liquid_primitive =
709            LiquidPrimitiveArray::<Int32Type>::from_arrow_array(original.clone());
710        let liquid_primitive_size = liquid_primitive.get_array_memory_size();
711
712        println!(
713            "arrow_size: {arrow_size}, liquid_linear_size: {liquid_linear_size}, liquid_primitive_size: {liquid_primitive_size}",
714        );
715
716        assert!(liquid_linear_size < arrow_size);
717        assert!(liquid_primitive_size < arrow_size);
718        assert!(liquid_linear_size < liquid_primitive_size);
719
720        let original: ArrayRef = Arc::new(original);
721        assert_eq!(original.as_ref(), liquid_linear.to_arrow_array().as_ref());
722        assert_eq!(
723            original.as_ref(),
724            liquid_primitive.to_arrow_array().as_ref()
725        );
726    }
727}