//! Linear-model integer array (`liquid_cache/liquid_array/linear_integer_array.rs`).

1use std::any::Any;
2use std::fmt::Debug;
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use super::PrimitiveKind;
7use super::{LiquidArray, LiquidDataType, LiquidPrimitiveType};
8use crate::liquid_array::LiquidPrimitiveArray;
9use crate::liquid_array::ipc::{LiquidIPCHeader, get_physical_type_id};
10use arrow::array::{
11    Array, ArrayRef, ArrowPrimitiveType, BooleanArray, PrimitiveArray,
12    cast::AsArray,
13    types::{
14        Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type,
15        UInt32Type, UInt64Type,
16    },
17};
18use arrow::buffer::{BooleanBuffer, ScalarBuffer};
19use arrow::compute::kernels::filter;
20use arrow_schema::DataType;
21use bytes::Bytes;
22use num_traits::{AsPrimitive, Bounded, FromPrimitive};
23
24/// A linear-model based integer array, **only use it when you know the array is monotonic** and **you don't care about encoding speed!**.
25///
26/// Under the hood, it uses a linear model to predict the values and store the residuals:
27/// value\[i\] = intercept + round(slope * i) + residual\[i\]
28///
29/// Where `intercept` and `slope` are computed using a L-infinity linear fit, **this is time-consuming!**.
30///
31/// This array is only recommended if you know the array follows a linear model, e.g., kinetic values, offsets, etc.
32///
33/// Examples of not recommended use cases: random values like ids, categorical values, etc.
34#[derive(Debug)]
35pub struct LiquidLinearArray<T: LiquidPrimitiveType>
36where
37    T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
38{
39    // Signed residuals, bit-packed as a Liquid primitive array of i64.
40    residuals: LiquidPrimitiveArray<Int64Type>,
41    // Intercept term stored as f64 for simpler math/IO.
42    intercept: f64,
43    // Slope term of the linear model.
44    slope: f64,
45    // Keep the logical type parameter.
46    _phantom: PhantomData<T>,
47}
48
/// Linear-model array for `i32` (name kept for backward compatibility).
pub type LiquidLinearI32Array = LiquidLinearArray<Int32Type>;
/// Linear-model array for `i8`.
pub type LiquidLinearI8Array = LiquidLinearArray<Int8Type>;
/// Linear-model array for `i16`.
pub type LiquidLinearI16Array = LiquidLinearArray<Int16Type>;
/// Linear-model array for `i64`.
pub type LiquidLinearI64Array = LiquidLinearArray<Int64Type>;
/// Linear-model array for `u8`.
pub type LiquidLinearU8Array = LiquidLinearArray<UInt8Type>;
/// Linear-model array for `u16`.
pub type LiquidLinearU16Array = LiquidLinearArray<UInt16Type>;
/// Linear-model array for `u32`.
pub type LiquidLinearU32Array = LiquidLinearArray<UInt32Type>;
/// Linear-model array for `u64`.
pub type LiquidLinearU64Array = LiquidLinearArray<UInt64Type>;
/// Linear-model array for `Date32` (days since epoch).
pub type LiquidLinearDate32Array = LiquidLinearArray<Date32Type>;
/// Linear-model array for `Date64` (ms since epoch).
pub type LiquidLinearDate64Array = LiquidLinearArray<Date64Type>;
69
impl<T> LiquidLinearArray<T>
where
    T: LiquidPrimitiveType,
    T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
{
    /// Build from an Arrow `PrimitiveArray<T>` by training a linear model
    /// using a fast L-infinity fit (Option 3) and storing residuals.
    ///
    /// Null slots get a zero residual and the original null buffer is attached
    /// to the residual array unchanged, so nulls round-trip exactly.
    pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> Self {
        let len = arrow_array.len();

        // All nulls: nothing to fit. Store an all-null residual array with a
        // zero model; the model parameters are never consulted for null slots.
        if arrow_array.null_count() == len {
            let res = PrimitiveArray::<Int64Type>::new_null(len);
            return Self {
                residuals: LiquidPrimitiveArray::<Int64Type>::from_arrow_array(res),
                intercept: 0.0, // arbitrary, unused since all nulls
                slope: 0.0,
                _phantom: PhantomData,
            };
        }

        // Prepare compact non-null buffers for fast fitting (avoid iterator Option cost).
        let (nn_values, nn_indices) = collect_non_null_f64_and_indices::<T>(&arrow_array);

        // Option 3 parameters: L-infinity (Chebyshev) regression. Both may be
        // reset to zero below if the model does not actually shrink the range.
        let (mut intercept, mut slope) = fit_linf(&nn_values, &nn_indices);

        // Compute residuals in one unified loop; also track ranges for fallback decision.
        let mut residuals: Vec<i64> = Vec::with_capacity(len);
        let is_unsigned = <T as PrimitiveKind>::IS_UNSIGNED;
        let vals = arrow_array.values();
        let nulls_opt = arrow_array.nulls();

        // Original value range (unsigned and signed trackers; only one pair is used)
        let mut orig_min_u64 = u64::MAX;
        let mut orig_max_u64 = 0u64;
        let mut orig_min_i64 = i64::MAX;
        let mut orig_max_i64 = i64::MIN;
        // Residual range
        let mut res_min = i64::MAX;
        let mut res_max = i64::MIN;

        if is_unsigned {
            let max_u64: u64 = <T as PrimitiveKind>::MAX_U64;
            for i in 0..len {
                let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
                if valid {
                    // Reinterpret the native value through the unsigned twin type
                    // so the widening cast to u64 is zero-extending.
                    type U<TT> =
                        <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
                    let v_u: U<T> = vals[i].as_();
                    let v_u64: u64 = v_u.as_();
                    if v_u64 < orig_min_u64 {
                        orig_min_u64 = v_u64;
                    }
                    if v_u64 > orig_max_u64 {
                        orig_max_u64 = v_u64;
                    }
                    // Residual as sign flag + magnitude, because the difference of
                    // two u64 values does not fit in i64 directly.
                    let pr = slope * (i as f64) + intercept;
                    let p = predict_u64_saturated(pr, max_u64);
                    let (pos, mag) = if v_u64 >= p {
                        (true, v_u64 - p)
                    } else {
                        (false, p - v_u64)
                    };
                    // NOTE(review): this mask keeps only the low 63 bits of the
                    // magnitude, so a residual magnitude >= 2^63 (possible for u64
                    // data spanning more than half the domain) would be truncated
                    // and fail to round-trip — TODO confirm this cannot occur.
                    let m = (mag & (i64::MAX as u64)) as i64;
                    let r = if pos { m } else { -m };
                    if r < res_min {
                        res_min = r;
                    }
                    if r > res_max {
                        res_max = r;
                    }
                    residuals.push(r);
                } else {
                    // Null slot: placeholder residual; the null buffer marks it invalid.
                    residuals.push(0);
                }
            }
        } else {
            let (min_i64, max_i64): (i64, i64) =
                (<T as PrimitiveKind>::MIN_I64, <T as PrimitiveKind>::MAX_I64);
            for i in 0..len {
                let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
                if valid {
                    let v_i64: i64 = vals[i].as_();
                    if v_i64 < orig_min_i64 {
                        orig_min_i64 = v_i64;
                    }
                    if v_i64 > orig_max_i64 {
                        orig_max_i64 = v_i64;
                    }
                    // Prediction is clamped into T's domain, so value - prediction
                    // fits in i64 for all signed native widths up to 64 bits.
                    let pr = slope * (i as f64) + intercept;
                    let p = predict_i64_saturated(pr, min_i64, max_i64);
                    let r = v_i64 - p;
                    if r < res_min {
                        res_min = r;
                    }
                    if r > res_max {
                        res_max = r;
                    }
                    residuals.push(r);
                } else {
                    residuals.push(0);
                }
            }
        }

        // Fallback: ensure residual range is strictly smaller than original range;
        // otherwise the model adds no compression and we store raw values instead.
        let res_width: u128 = (res_max as i128 - res_min as i128) as u128;
        let orig_width: u128 = if is_unsigned {
            (orig_max_u64 as u128).saturating_sub(orig_min_u64 as u128)
        } else {
            (orig_max_i64 as i128 - orig_min_i64 as i128) as u128
        };
        if res_width >= orig_width {
            // Rebuild residuals with zero model (residual == value, prediction == 0).
            intercept = 0.0;
            slope = 0.0;
            residuals.clear();
            if is_unsigned {
                for i in 0..len {
                    let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
                    if valid {
                        type U<TT> = <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
                        let v_u: U<T> = vals[i].as_();
                        let v_u64: u64 = v_u.as_();
                        // NOTE(review): same 63-bit truncation as above — u64
                        // values >= 2^63 lose their top bit here; TODO confirm.
                        let r = (v_u64 & (i64::MAX as u64)) as i64;
                        residuals.push(r);
                    } else {
                        residuals.push(0);
                    }
                }
            } else {
                for i in 0..len {
                    let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
                    if valid {
                        let v_i64: i64 = vals[i].as_();
                        residuals.push(v_i64);
                    } else {
                        residuals.push(0);
                    }
                }
            }
        }
        // Reattach the original null buffer so validity round-trips untouched.
        let residuals_buf: ScalarBuffer<i64> = ScalarBuffer::from(residuals);
        let nulls = arrow_array.nulls().cloned();
        let res_prim = PrimitiveArray::<Int64Type>::new(residuals_buf, nulls);
        let residuals = LiquidPrimitiveArray::<Int64Type>::from_arrow_array(res_prim);

        Self {
            residuals,
            intercept,
            slope,
            _phantom: PhantomData,
        }
    }

    /// Number of logical rows (one residual per row, including nulls).
    fn len(&self) -> usize {
        self.residuals.len()
    }

    /// Byte offset at which the serialized residual payload begins.
    fn residual_starting_loc() -> usize {
        // Header + intercept (f64) + slope (f64), rounded up to an 8-byte boundary.
        let header_size =
            LiquidIPCHeader::size() + std::mem::size_of::<f64>() + std::mem::size_of::<f64>();
        (header_size + 7) & !7
    }

    /// Serialize as: IPC header, intercept (f64 LE), slope (f64 LE), zero padding
    /// to an 8-byte boundary, then the residual array's own byte encoding.
    fn to_bytes_inner(&self) -> Vec<u8> {
        let header = LiquidIPCHeader::new(
            LiquidDataType::LinearInteger as u16,
            get_physical_type_id::<T>(),
        );
        let start = Self::residual_starting_loc();
        let mut out = Vec::with_capacity(start + 256);

        // Header
        out.extend_from_slice(&header.to_bytes());
        // Model params (f64 intercept, then f64 slope)
        out.extend_from_slice(&self.intercept.to_le_bytes());
        out.extend_from_slice(&self.slope.to_le_bytes());

        // Pad so the residual payload starts 8-byte aligned.
        while out.len() < start {
            out.push(0);
        }

        // Encode residuals (already LiquidPrimitiveArray<Int64Type>)
        out.extend_from_slice(&self.residuals.to_bytes_inner());
        out
    }

    /// Decode a `LiquidLinearArray<T>` from bytes.
    ///
    /// Layout must match [`Self::to_bytes_inner`]. Panics if `bytes` is too short.
    pub fn from_bytes(bytes: Bytes) -> Self {
        // NOTE(review): header type ids are parsed but not validated against
        // `T` / LinearInteger — a mismatched payload would mis-decode silently.
        let _hdr = LiquidIPCHeader::from_bytes(&bytes);

        // Read intercept as f64
        let intercept_off = LiquidIPCHeader::size();
        let intercept =
            f64::from_le_bytes(bytes[intercept_off..intercept_off + 8].try_into().unwrap());

        // Read slope
        let slope_off = intercept_off + std::mem::size_of::<f64>();
        let slope = f64::from_le_bytes(bytes[slope_off..slope_off + 8].try_into().unwrap());

        // Decode residuals (zero-copy slice of the shared Bytes)
        let start = Self::residual_starting_loc();
        let res_bytes = bytes.slice(start..);
        let residuals = LiquidPrimitiveArray::<Int64Type>::from_bytes(res_bytes);

        Self {
            residuals,
            intercept,
            slope,
            _phantom: PhantomData,
        }
    }
}
287
impl<T> LiquidArray for LiquidLinearArray<T>
where
    T: LiquidPrimitiveType,
    T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
{
    fn as_any(&self) -> &dyn Any {
        self
    }

    // The Arrow type this array decodes back to.
    fn original_arrow_data_type(&self) -> DataType {
        T::DATA_TYPE.clone()
    }

    // Residual storage plus the two f64 model parameters (PhantomData is zero-sized).
    fn get_array_memory_size(&self) -> usize {
        self.residuals.get_array_memory_size()
            + std::mem::size_of::<f64>() // intercept
            + std::mem::size_of::<f64>() // slope
    }

    fn len(&self) -> usize {
        self.len()
    }

    // Reconstruct the original values by re-evaluating the same clamped
    // prediction used at encode time and applying the stored residual.
    fn to_arrow_array(&self) -> ArrayRef {
        let arr = self.residuals.to_arrow_array();
        let (_dt, residuals, nulls) = arr.as_primitive::<Int64Type>().clone().into_parts();

        // Reconstruct final values: predicted(i) +/- |residual_i|
        let mut final_values = Vec::<T::Native>::with_capacity(self.len());
        let is_unsigned = <T as PrimitiveKind>::IS_UNSIGNED;
        if is_unsigned {
            let max_u64: u64 = <T as PrimitiveKind>::MAX_U64;
            for (i, &e) in residuals.iter().enumerate() {
                let pr = self.slope * (i as f64) + self.intercept;
                let p = predict_u64_saturated(pr, max_u64);
                // Residual was stored as sign + magnitude; undo it with
                // saturating arithmetic to stay inside u64.
                let mag = e.unsigned_abs();
                let sum = if e >= 0 {
                    p.saturating_add(mag)
                } else {
                    p.saturating_sub(mag)
                };
                // NOTE(review): unwrap assumes the reconstructed value fits
                // T::Native; encoding clamps predictions into T's domain so
                // in-range inputs round-trip — TODO confirm for extreme inputs.
                final_values.push(T::Native::from_u64(sum).unwrap());
            }
        } else {
            let (min_i64, max_i64): (i64, i64) =
                (<T as PrimitiveKind>::MIN_I64, <T as PrimitiveKind>::MAX_I64);
            for (i, &e) in residuals.iter().enumerate() {
                let pr = self.slope * (i as f64) + self.intercept;
                let p = predict_i64_saturated(pr, min_i64, max_i64);
                let sum = p.saturating_add(e);
                // NOTE(review): same fits-in-native assumption as above.
                final_values.push(T::Native::from_i64(sum).unwrap());
            }
        }

        // Null slots carry garbage reconstructed values; the null buffer masks them.
        let values_buf: ScalarBuffer<T::Native> = ScalarBuffer::from(final_values);
        Arc::new(PrimitiveArray::<T>::new(values_buf, nulls))
    }

    // Decodes the entire array first, then filters; selection must have the
    // same length as the array.
    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
        let arr = self.to_arrow_array();
        let selection = BooleanArray::new(selection.clone(), None);
        filter::filter(&arr, &selection).unwrap()
    }

    fn try_eval_predicate(
        &self,
        _predicate: &Arc<dyn datafusion::physical_plan::PhysicalExpr>,
        _filter: &BooleanBuffer,
    ) -> Option<BooleanArray> {
        // No special predicate pushdown here.
        None
    }

    fn to_bytes(&self) -> Vec<u8> {
        self.to_bytes_inner()
    }

    fn data_type(&self) -> LiquidDataType {
        LiquidDataType::LinearInteger
    }
}
369
/// Evaluate the model prediction as an unsigned value clamped to `[0, max_u64]`.
///
/// Non-finite predictions (NaN or ±infinity) deliberately map to 0: encode and
/// decode both call this function, so any deterministic mapping round-trips.
#[inline]
fn predict_u64_saturated(pred: f64, max_u64: u64) -> u64 {
    match pred {
        p if !p.is_finite() || p <= 0.0 => 0,
        p if p >= max_u64 as f64 => max_u64,
        p => p.round() as u64,
    }
}
380
/// Evaluate the model prediction as a signed value clamped to `[min_i64, max_i64]`.
///
/// Non-finite predictions (NaN or ±infinity) deliberately map to 0; encode and
/// decode share this function, so the choice is consistent on both sides.
#[inline]
fn predict_i64_saturated(pred: f64, min_i64: i64, max_i64: i64) -> i64 {
    let (lo, hi) = (min_i64 as f64, max_i64 as f64);
    match pred {
        p if !p.is_finite() => 0,
        p if p <= lo => min_i64,
        p if p >= hi => max_i64,
        p => p.round() as i64,
    }
}
393
/// L-infinity (Chebyshev) linear fit for y\[i\] ≈ intercept + slope * i,
/// minimizing the maximum absolute error over the given non-null points.
///
/// The objective R(m) = max_i (y_i - m·i) - min_i (y_i - m·i) is convex in m;
/// for a fixed m the optimal intercept is the midpoint of the extreme offsets
/// s_i = y_i - m·i. The slope is located by a short bisection driven by the
/// subgradient sign at the argmin/argmax indices — O(n) work per iteration.
fn fit_linf(values: &[f64], idxs: &[u32]) -> (f64, f64) {
    assert_eq!(values.len(), idxs.len());
    match values.len() {
        0 => return (0.0, 0.0),
        1 => return (values[0], 0.0),
        _ => {}
    }

    // Bracket the slope with the extreme pairwise slopes of adjacent points.
    let mut s_lo = f64::INFINITY;
    let mut s_hi = f64::NEG_INFINITY;
    for (iw, vw) in idxs.windows(2).zip(values.windows(2)) {
        let di = (iw[1] - iw[0]) as f64;
        if di > 0.0 {
            let s = (vw[1] - vw[0]) / di;
            if s < s_lo {
                s_lo = s;
            }
            if s > s_hi {
                s_hi = s;
            }
        }
    }
    if !s_lo.is_finite() || !s_hi.is_finite() {
        s_lo = 0.0;
        s_hi = 0.0;
    }

    let mut lo = s_lo.min(s_hi);
    let mut hi = s_lo.max(s_hi);
    if (hi - lo).abs() < 1e-12 {
        // Degenerate bracket (all pairwise slopes equal): widen symmetrically.
        let pad = if hi.abs() < 1.0 { 1.0 } else { hi.abs() * 1e-6 };
        lo -= pad;
        hi += pad;
    }

    // For a candidate slope m, report (min offset, its index, max offset, its index).
    let spread = |m: f64| -> (f64, u32, f64, u32) {
        let mut min_s = f64::INFINITY;
        let mut max_s = f64::NEG_INFINITY;
        let (mut at_min, mut at_max) = (0u32, 0u32);
        for (&idx, &v) in idxs.iter().zip(values.iter()) {
            let s = v - m * (idx as f64);
            if s < min_s {
                min_s = s;
                at_min = idx;
            }
            if s > max_s {
                max_s = s;
                at_max = idx;
            }
        }
        (min_s, at_min, max_s, at_max)
    };

    // Bisection on the slope: at most 8 rounds, each O(n).
    for _ in 0..8 {
        let mid = 0.5 * (lo + hi);
        let (_, at_min, _, at_max) = spread(mid);
        // Subgradient sign of R at `mid` from the extreme indices.
        match (at_min as i64) - (at_max as i64) {
            g if g > 0 => hi = mid,
            g if g < 0 => lo = mid,
            _ => {
                // Flat subgradient: converged exactly.
                lo = mid;
                hi = mid;
                break;
            }
        }
        if (hi - lo).abs() < 1e-12 {
            break;
        }
    }

    let slope = 0.5 * (lo + hi);
    let (min_s, _, max_s, _) = spread(slope);
    // Optimal intercept is the midpoint of the extreme offsets at this slope.
    (0.5 * (max_s + min_s), slope)
}
484
485#[inline]
486fn collect_non_null_f64_and_indices<T>(arr: &PrimitiveArray<T>) -> (Vec<f64>, Vec<u32>)
487where
488    T: LiquidPrimitiveType,
489    T::Native: AsPrimitive<f64>,
490{
491    let nn = arr.len() - arr.null_count();
492    let mut values = Vec::with_capacity(nn);
493    let mut idxs = Vec::with_capacity(nn);
494    let vals = arr.values();
495    if arr.null_count() == 0 {
496        for (i, v) in vals.iter().enumerate() {
497            values.push(v.as_());
498            idxs.push(i as u32);
499        }
500    } else {
501        let nulls = arr.nulls().unwrap();
502        for (i, v) in vals.iter().enumerate() {
503            if nulls.is_valid(i) {
504                values.push(v.as_());
505                idxs.push(i as u32);
506            }
507        }
508    }
509    (values, idxs)
510}
511
#[cfg(test)]
mod tests {
    // Coverage: round-trip (in-memory and via IPC bytes) across all supported
    // native types, null handling, filtering, and compression-ratio sanity.
    use super::*;

    // Encode -> decode and assert equality, both directly and through the
    // serialized byte representation.
    fn roundtrip_eq(values: Vec<Option<i32>>) {
        let arr = PrimitiveArray::<Int32Type>::from(values.clone());
        let linear = LiquidLinearI32Array::from_arrow_array(arr.clone());
        let decoded = linear.to_arrow_array();
        assert_eq!(decoded.as_ref(), &arr);

        let bytes = Bytes::from(linear.to_bytes());
        let decoded = LiquidLinearI32Array::from_bytes(bytes);
        let round = decoded.to_arrow_array();
        assert_eq!(round.as_ref(), &arr);
    }

    // Same round-trip check, generic over the Arrow primitive type.
    macro_rules! roundtrip_eq_t {
        ($T:ty, $values:expr) => {{
            let arr = PrimitiveArray::<$T>::from(($values).clone());
            let linear = LiquidLinearArray::<$T>::from_arrow_array(arr.clone());
            let decoded = linear.to_arrow_array();
            assert_eq!(decoded.as_ref(), &arr);

            let bytes = Bytes::from(linear.to_bytes());
            let decoded = LiquidLinearArray::<$T>::from_bytes(bytes);
            let round = decoded.to_arrow_array();
            assert_eq!(round.as_ref(), &arr);
        }};
    }

    #[test]
    fn test_roundtrip_basic() {
        // Non-monotonic values to ensure we don't rely on simple increasing sequences
        roundtrip_eq(vec![
            Some(10),
            Some(15),
            Some(14),
            Some(20),
            Some(18),
            Some(25),
            Some(24),
        ]);
    }

    #[test]
    fn test_roundtrip_with_nulls() {
        roundtrip_eq(vec![Some(10), None, Some(30), None, Some(50), Some(70)]);
    }

    #[test]
    fn test_all_nulls() {
        // Exercises the all-null early-return path in from_arrow_array.
        roundtrip_eq(vec![None, None, None, None]);
    }

    #[test]
    fn test_single_value() {
        // Single point: fit degenerates to intercept-only.
        roundtrip_eq(vec![Some(42)]);
    }

    #[test]
    fn test_empty() {
        roundtrip_eq(vec![]);
    }

    #[test]
    fn test_negative_values() {
        roundtrip_eq(vec![
            Some(-100),
            Some(-50),
            Some(0),
            Some(50),
            Some(25),
            None,
            Some(-25),
        ]);
    }

    #[test]
    fn test_filter_basic() {
        // Filtering drops unselected slots, including the selected-out null.
        let original: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3), None, Some(5), Some(8)];
        let arr = PrimitiveArray::<Int32Type>::from(original.clone());
        let linear = LiquidLinearI32Array::from_arrow_array(arr);
        let selection = BooleanBuffer::from(vec![true, false, true, false, true, false]);
        let result = linear.filter(&selection);
        let expected = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(3), Some(5)]);
        assert_eq!(result.as_ref(), &expected);
    }

    #[test]
    fn test_original_arrow_data_type_returns_int32() {
        let arr = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(2)]);
        let linear = LiquidLinearI32Array::from_arrow_array(arr);
        assert_eq!(linear.original_arrow_data_type(), DataType::Int32);
    }

    #[test]
    fn test_roundtrip_i8() {
        roundtrip_eq_t!(Int8Type, vec![Some(-10), Some(0), Some(10), None, Some(20)]);
    }

    #[test]
    fn test_roundtrip_i16() {
        roundtrip_eq_t!(
            Int16Type,
            vec![Some(-1000), Some(0), Some(1000), None, Some(2000)]
        );
    }

    #[test]
    fn test_roundtrip_i64() {
        roundtrip_eq_t!(
            Int64Type,
            vec![
                Some(-10_000_000_000),
                Some(0),
                Some(10_000_000_000),
                None,
                Some(20_000_000_000),
            ]
        );
    }

    #[test]
    fn test_roundtrip_u8() {
        roundtrip_eq_t!(
            UInt8Type,
            vec![Some(0), Some(10), Some(200), None, Some(255)]
        );
    }

    #[test]
    fn test_roundtrip_u16() {
        roundtrip_eq_t!(
            UInt16Type,
            vec![Some(0), Some(1000), Some(60000), None, Some(500)]
        );
    }

    #[test]
    fn test_roundtrip_u32() {
        roundtrip_eq_t!(
            UInt32Type,
            vec![
                Some(0),
                Some(1_000_000),
                Some(3_000_000_000),
                None,
                Some(123_456_789),
            ]
        );
    }

    #[test]
    fn test_roundtrip_u64() {
        // Largest value stays below 2^63 (the residual magnitude limit).
        roundtrip_eq_t!(
            UInt64Type,
            vec![
                Some(0),
                Some(10_000_000_000),
                Some(9_000_000_000_000_000_000u64),
                None,
                Some(42),
            ]
        );
    }

    #[test]
    fn test_roundtrip_date32() {
        roundtrip_eq_t!(
            Date32Type,
            vec![Some(-365), Some(0), Some(365), None, Some(18262)]
        );
    }

    #[test]
    fn test_roundtrip_date64() {
        roundtrip_eq_t!(
            Date64Type,
            vec![
                Some(-86_400_000),
                Some(0),
                Some(86_400_000),
                None,
                Some(1_000_000_000_000),
            ]
        );
    }

    #[test]
    fn test_compression() {
        // Perfectly linear data: the linear encoding should beat both raw Arrow
        // and the plain bit-packed Liquid primitive encoding.
        let original = (0..1_000_000).step_by(100).collect::<Vec<_>>();

        let original = PrimitiveArray::<Int32Type>::from_iter_values(original);
        let arrow_size = original.get_array_memory_size();

        let liquid_linear = LiquidLinearI32Array::from_arrow_array(original.clone());
        let liquid_linear_size = liquid_linear.get_array_memory_size();

        let liquid_primitive =
            LiquidPrimitiveArray::<Int32Type>::from_arrow_array(original.clone());
        let liquid_primitive_size = liquid_primitive.get_array_memory_size();

        println!(
            "arrow_size: {arrow_size}, liquid_linear_size: {liquid_linear_size}, liquid_primitive_size: {liquid_primitive_size}",
        );

        assert!(liquid_linear_size < arrow_size);
        assert!(liquid_primitive_size < arrow_size);
        assert!(liquid_linear_size < liquid_primitive_size);

        let original: ArrayRef = Arc::new(original);
        assert_eq!(original.as_ref(), liquid_linear.to_arrow_array().as_ref());
        assert_eq!(
            original.as_ref(),
            liquid_primitive.to_arrow_array().as_ref()
        );
    }
}