Skip to main content

liquid_cache/liquid_array/
primitive_array.rs

1use std::any::Any;
2use std::fmt::{Debug, Display};
3use std::sync::Arc;
4
5use arrow::array::{
6    ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, BooleanArray, PrimitiveArray,
7    types::{
8        Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type,
9        TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
10        TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
11    },
12};
13use arrow::buffer::{BooleanBuffer, ScalarBuffer};
14use arrow_schema::DataType;
15use fastlanes::BitPacking;
16use num_traits::{AsPrimitive, FromPrimitive};
17
18use super::LiquidDataType;
19use crate::cache::{CacheExpression, LiquidExpr};
20use crate::liquid_array::hybrid_primitive_array::{
21    LiquidPrimitiveClampedArray, LiquidPrimitiveQuantizedArray,
22};
23use crate::liquid_array::ipc::{LiquidIPCHeader, PhysicalTypeMarker, get_physical_type_id};
24use crate::liquid_array::raw::BitPackedArray;
25use crate::liquid_array::{
26    LiquidArray, LiquidSqueezedArrayRef, PrimitiveKind, SqueezeIoHandler, SqueezedDate32Array,
27    eval_predicate_on_array,
28};
29use crate::utils::get_bit_width;
30use arrow::datatypes::ArrowNativeType;
31use bytes::Bytes;
32
/// Squeeze policy for primitive integer arrays.
/// Users can choose whether to clamp or quantize when squeezing.
///
/// The policy is stored on each [`LiquidPrimitiveArray`] and consulted when the array is
/// squeezed. [`IntegerSqueezePolicy::Quantize`] is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IntegerSqueezePolicy {
    /// Clamp values above the squeezed range to a sentinel (recoverable for non-clamped rows).
    Clamp = 0,
    /// Quantize values into buckets (good for coarse filtering; requires disk to recover values).
    #[default]
    Quantize = 1,
}
43
// Private module holding the sealing trait: because `Sealed` cannot be named outside this
// file's module, only the types listed here can implement `LiquidPrimitiveType`.
mod private {
    /// Marker supertrait used to seal `LiquidPrimitiveType`.
    pub trait Sealed {}
}
47
/// LiquidPrimitiveType is a sealed trait that represents the primitive types supported by Liquid.
/// Implemented for all supported integer, date, and timestamp Arrow primitive types.
///
/// I have to admit this trait is super complicated.
/// Luckily users never have to worry about it, they can just use the types that are already implemented.
/// We could have implemented this as a macro, but macro is ugly.
/// Type is spec, code is proof.
///
/// The bounds on `Native` spell out the conversions the encoders in this module use:
/// native -> unsigned counterpart (offsets from a reference value), native -> `i64`
/// (delta/zigzag arithmetic), `FromPrimitive` (rebuilding a native value from a decoded
/// integer) and `Display`.
pub trait LiquidPrimitiveType:
    ArrowPrimitiveType<
        Native: AsPrimitive<<Self::UnSignedType as ArrowPrimitiveType>::Native>
                    + AsPrimitive<i64>
                    + FromPrimitive
                    + Display,
    > + Debug
    + Send
    + Sync
    + private::Sealed
    + PrimitiveKind
    + PhysicalTypeMarker
{
    /// The unsigned type that can be used to represent the signed type.
    ///
    /// Its native type must convert back to `Self::Native`, widen to `u64`, and be
    /// bit-packable, since encoded values are stored in a `BitPackedArray` of this type.
    type UnSignedType: ArrowPrimitiveType<Native: AsPrimitive<Self::Native> + AsPrimitive<u64> + BitPacking>
        + Debug;
}
72
// Implements `private::Sealed` and `LiquidPrimitiveType` for every `$signed => $unsigned`
// pair, using `$unsigned` as the associated `UnSignedType`.
// Despite the name `$signed`, the left-hand side may itself be an unsigned type that maps
// to itself.
macro_rules! impl_has_unsigned_type {
    ($($signed:ty => $unsigned:ty),*) => {
        $(
            impl private::Sealed for $signed {}
            impl LiquidPrimitiveType for $signed {
                type UnSignedType = $unsigned;
            }
        )*
    }
}
83
// The complete set of Arrow types supported by Liquid primitive arrays, each paired with
// the unsigned Arrow type used for its bit-packed storage. Integer types map to the
// unsigned type of the same width (unsigned types map to themselves); `Date32` maps to
// `UInt32`, and `Date64` and all timestamp types map to `UInt64`.
impl_has_unsigned_type! {
    Int32Type => UInt32Type,
    Int64Type => UInt64Type,
    Int16Type => UInt16Type,
    Int8Type => UInt8Type,
    UInt32Type => UInt32Type,
    UInt64Type => UInt64Type,
    UInt16Type => UInt16Type,
    UInt8Type => UInt8Type,
    Date64Type => UInt64Type,
    Date32Type => UInt32Type,
    TimestampSecondType => UInt64Type,
    TimestampMillisecondType => UInt64Type,
    TimestampMicrosecondType => UInt64Type,
    TimestampNanosecondType => UInt64Type
}
100
// Convenience aliases of `LiquidPrimitiveArray` for the integer and date Arrow types.

/// Liquid's unsigned 8-bit integer array (Arrow `UInt8Type`).
pub type LiquidU8Array = LiquidPrimitiveArray<UInt8Type>;
/// Liquid's unsigned 16-bit integer array (Arrow `UInt16Type`).
pub type LiquidU16Array = LiquidPrimitiveArray<UInt16Type>;
/// Liquid's unsigned 32-bit integer array (Arrow `UInt32Type`).
pub type LiquidU32Array = LiquidPrimitiveArray<UInt32Type>;
/// Liquid's unsigned 64-bit integer array (Arrow `UInt64Type`).
pub type LiquidU64Array = LiquidPrimitiveArray<UInt64Type>;
/// Liquid's signed 8-bit integer array (Arrow `Int8Type`).
pub type LiquidI8Array = LiquidPrimitiveArray<Int8Type>;
/// Liquid's signed 16-bit integer array (Arrow `Int16Type`).
pub type LiquidI16Array = LiquidPrimitiveArray<Int16Type>;
/// Liquid's signed 32-bit integer array (Arrow `Int32Type`).
pub type LiquidI32Array = LiquidPrimitiveArray<Int32Type>;
/// Liquid's signed 64-bit integer array (Arrow `Int64Type`).
pub type LiquidI64Array = LiquidPrimitiveArray<Int64Type>;
/// Liquid's 32-bit date array (Arrow `Date32Type`).
pub type LiquidDate32Array = LiquidPrimitiveArray<Date32Type>;
/// Liquid's 64-bit date array (Arrow `Date64Type`).
pub type LiquidDate64Array = LiquidPrimitiveArray<Date64Type>;
121
/// Liquid's primitive array
///
/// Values are stored frame-of-reference style: each value is kept as an unsigned offset
/// from `reference_value`, and the offsets are bit-packed.
#[derive(Debug)]
pub struct LiquidPrimitiveArray<T: LiquidPrimitiveType> {
    // Bit-packed unsigned offsets (`value - reference_value`, wrapping) plus the null mask.
    bit_packed: BitPackedArray<T::UnSignedType>,
    // Minimum non-null value of the source array; zero when the source has no non-null value.
    reference_value: T::Native,
    // How `squeeze` narrows the offsets (clamp or quantize).
    squeeze_policy: IntegerSqueezePolicy,
}
129
/// Liquid's primitive array which uses delta encoding for compression
///
/// Each non-null value after the first is stored as the zigzag-encoded difference from the
/// previous non-null value; the differences are bit-packed.
#[derive(Debug, Clone)]
pub struct LiquidPrimitiveDeltaArray<T: LiquidPrimitiveType> {
    // Bit-packed zigzag deltas (zero in the anchor slot and in null slots) plus the null mask.
    bit_packed: BitPackedArray<T::UnSignedType>,
    // Anchor: the first non-null value of the source array; zero when every slot is null.
    reference_value: T::Native,
}
136
137impl<T> LiquidPrimitiveArray<T>
138where
139    T: LiquidPrimitiveType,
140{
141    /// Get the memory size of the Liquid primitive array.
142    pub fn get_array_memory_size(&self) -> usize {
143        self.bit_packed.get_array_memory_size()
144            + std::mem::size_of::<T::Native>()
145            + std::mem::size_of::<IntegerSqueezePolicy>()
146    }
147
148    /// Get the length of the Liquid primitive array.
149    pub fn len(&self) -> usize {
150        self.bit_packed.len()
151    }
152
153    /// Check if the Liquid primitive array is empty.
154    pub fn is_empty(&self) -> bool {
155        self.len() == 0
156    }
157
158    /// Create a Liquid primitive array from an Arrow primitive array.
159    pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> LiquidPrimitiveArray<T> {
160        let min = match arrow::compute::kernels::aggregate::min(&arrow_array) {
161            Some(v) => v,
162            None => {
163                // entire array is null
164                return Self {
165                    bit_packed: BitPackedArray::new_null_array(arrow_array.len()),
166                    reference_value: T::Native::ZERO,
167                    squeeze_policy: IntegerSqueezePolicy::default(),
168                };
169            }
170        };
171        let max = arrow::compute::kernels::aggregate::max(&arrow_array).unwrap();
172
173        // be careful of overflow:
174        // Want: 127i8 - (-128i8) -> 255u64,
175        // but we get -1i8
176        // (-1i8) as u8 as u64 -> 255u64
177        let sub = max.sub_wrapping(min) as <T as ArrowPrimitiveType>::Native;
178        let sub: <<T as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native =
179            sub.as_();
180        let bit_width = get_bit_width(sub.as_());
181
182        let (_data_type, values, nulls) = arrow_array.clone().into_parts();
183        let values = if min != T::Native::ZERO {
184            ScalarBuffer::from_iter(values.iter().map(|v| {
185                let k: <<T as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native =
186                    v.sub_wrapping(min).as_();
187                k
188            }))
189        } else {
190            #[allow(clippy::missing_transmute_annotations)]
191            unsafe {
192                std::mem::transmute(values)
193            }
194        };
195
196        let unsigned_array =
197            PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(values, nulls);
198
199        let bit_packed_array = BitPackedArray::from_primitive(unsigned_array, bit_width);
200
201        Self {
202            bit_packed: bit_packed_array,
203            reference_value: min,
204            squeeze_policy: IntegerSqueezePolicy::default(),
205        }
206    }
207
208    /// Get the current squeeze policy for this array.
209    pub fn squeeze_policy(&self) -> IntegerSqueezePolicy {
210        self.squeeze_policy
211    }
212
213    /// Set the squeeze policy for this array.
214    pub fn set_squeeze_policy(&mut self, policy: IntegerSqueezePolicy) {
215        self.squeeze_policy = policy;
216    }
217
218    /// Set the squeeze policy, returning self for chaining.
219    pub fn with_squeeze_policy(mut self, policy: IntegerSqueezePolicy) -> Self {
220        self.squeeze_policy = policy;
221        self
222    }
223}
224
225impl<T> LiquidPrimitiveDeltaArray<T>
226where
227    T: LiquidPrimitiveType,
228{
229    /// Get the memory size of the Liquid primitive delta array.
230    pub fn get_array_memory_size(&self) -> usize {
231        self.bit_packed.get_array_memory_size() + std::mem::size_of::<T::Native>()
232    }
233
234    /// Get the length of the Liquid primitive delta array.
235    pub fn len(&self) -> usize {
236        self.bit_packed.len()
237    }
238
239    /// Check if the Liquid primitive delta array is empty.
240    pub fn is_empty(&self) -> bool {
241        self.len() == 0
242    }
243
244    /// Create a Liquid primitive delta array from an Arrow primitive array.
245    pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> LiquidPrimitiveDeltaArray<T> {
246        use arrow::array::Array;
247
248        let len = arrow_array.len();
249        // check if entire array is already null
250        if arrow_array.null_count() == len {
251            return Self {
252                bit_packed: BitPackedArray::new_null_array(len),
253                reference_value: T::Native::ZERO,
254            };
255        }
256
257        let (_dt, values, nulls) = arrow_array.clone().into_parts();
258        let vals: Vec<T::Native> = values.to_vec();
259
260        type UnsignedNative<TT> =
261            <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
262        let mut out: Vec<UnsignedNative<T>> = Vec::with_capacity(len);
263        let mut max_value: UnsignedNative<T> = UnsignedNative::<T>::ZERO;
264        let mut anchor: T::Native = T::Native::ZERO;
265
266        if let Some(_nb) = &nulls {
267            // Nulls present: write 0 for nulls; prev will the last prev non-null value
268            let nb = nulls.as_ref().unwrap();
269            let mut have_prev = false;
270            let mut prev: T::Native = T::Native::ZERO;
271
272            for (i, &cur) in vals.iter().enumerate() {
273                if !nb.is_valid(i) {
274                    out.push(UnsignedNative::<T>::ZERO);
275                    continue;
276                }
277                if !have_prev {
278                    anchor = cur;
279                    prev = cur;
280                    have_prev = true;
281                    out.push(UnsignedNative::<T>::ZERO);
282                    continue;
283                }
284                let delta: T::Native = cur.sub_wrapping(prev);
285                // zig zag encoding
286                let delta_i64: i64 = delta.as_();
287                let zigzag: u64 = ((delta_i64 << 1) ^ (delta_i64 >> 63)) as u64;
288                let delta_unsigned: UnsignedNative<T> =
289                    UnsignedNative::<T>::usize_as(zigzag as usize);
290                if delta_unsigned > max_value {
291                    max_value = delta_unsigned;
292                }
293                out.push(delta_unsigned);
294                prev = cur;
295            }
296        } else {
297            // No nulls: first value is anchor, remainder are deltas with their previous values
298            anchor = vals[0];
299            let mut prev: T::Native = anchor;
300            out.push(UnsignedNative::<T>::ZERO); // anchor will have a difference of 0
301            for &cur in vals.iter().skip(1) {
302                let delta: T::Native = cur.sub_wrapping(prev);
303                // zig zag encoding
304                let delta_i64: i64 = delta.as_();
305                let zigzag: u64 = ((delta_i64 << 1) ^ (delta_i64 >> 63)) as u64;
306                let delta_unsigned: UnsignedNative<T> =
307                    UnsignedNative::<T>::usize_as(zigzag as usize);
308                if delta_unsigned > max_value {
309                    max_value = delta_unsigned;
310                }
311                out.push(delta_unsigned);
312                prev = cur;
313            }
314        }
315
316        let bit_width = get_bit_width(max_value.as_());
317        let values = ScalarBuffer::from_iter(out);
318        let unsigned_array =
319            PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(values, nulls);
320        let bit_packed_array = BitPackedArray::from_primitive(unsigned_array, bit_width);
321
322        Self {
323            bit_packed: bit_packed_array,
324            reference_value: anchor,
325        }
326    }
327}
328
329impl<T> LiquidArray for LiquidPrimitiveArray<T>
330where
331    T: LiquidPrimitiveType + super::PrimitiveKind,
332{
333    fn get_array_memory_size(&self) -> usize {
334        self.get_array_memory_size()
335    }
336
337    fn len(&self) -> usize {
338        self.len()
339    }
340
341    fn original_arrow_data_type(&self) -> DataType {
342        T::DATA_TYPE.clone()
343    }
344
345    fn as_any(&self) -> &dyn Any {
346        self
347    }
348
349    #[inline]
350    fn to_arrow_array(&self) -> ArrayRef {
351        let unsigned_array = self.bit_packed.to_primitive();
352        let (_data_type, values, _nulls) = unsigned_array.into_parts();
353        let nulls = self.bit_packed.nulls();
354        let values = if self.reference_value != T::Native::ZERO {
355            let reference_v = self.reference_value.as_();
356            ScalarBuffer::from_iter(values.iter().map(|v| {
357                let k: <T as ArrowPrimitiveType>::Native = (*v).add_wrapping(reference_v).as_();
358                k
359            }))
360        } else {
361            #[allow(clippy::missing_transmute_annotations)]
362            unsafe {
363                std::mem::transmute(values)
364            }
365        };
366
367        Arc::new(PrimitiveArray::<T>::new(values, nulls.cloned()))
368    }
369
370    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
371        let arrow_array = self.to_arrow_array();
372        let selection = BooleanArray::new(selection.clone(), None);
373        arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
374    }
375
376    fn try_eval_predicate(&self, predicate: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
377        let filtered = self.filter(filter);
378        eval_predicate_on_array(filtered, predicate)
379    }
380
381    fn to_bytes(&self) -> Vec<u8> {
382        self.to_bytes_inner()
383    }
384
385    fn data_type(&self) -> LiquidDataType {
386        LiquidDataType::Integer
387    }
388
389    fn squeeze(
390        &self,
391        io: Arc<dyn SqueezeIoHandler>,
392        expression_hint: Option<&CacheExpression>,
393    ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
394        let expression_hint = expression_hint?;
395        // Full bytes (original format) are what we store to disk
396        let full_bytes = Bytes::from(self.to_bytes_inner());
397        let disk_range = 0u64..(full_bytes.len() as u64);
398
399        if T::DATA_TYPE == DataType::Date32 {
400            // Special handle for Date32 arrays with component extraction support.
401            let field = expression_hint.as_date32_field()?;
402            let squeezed =
403                SqueezedDate32Array::from_liquid_date32(self, field).with_backing(io, disk_range);
404            return Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, full_bytes));
405        }
406        if matches!(T::DATA_TYPE, DataType::Timestamp(_, _)) {
407            let field = expression_hint.as_date32_field()?;
408            let squeezed = SqueezedDate32Array::from_liquid_timestamp(self, field)
409                .with_backing(io, disk_range);
410            return Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, full_bytes));
411        }
412
413        // Only squeeze if we have a concrete bit width and it is large enough
414        let orig_bw = self.bit_packed.bit_width()?;
415        if orig_bw.get() < 8 {
416            return None;
417        }
418
419        // New squeezed bit width is half of the original
420        let new_bw_u8 = std::num::NonZero::new((orig_bw.get() / 2).max(1)).unwrap();
421
422        // Decode original unsigned offsets
423        let unsigned_array = self.bit_packed.to_primitive();
424        let (_dt, values, nulls) = unsigned_array.into_parts();
425
426        match self.squeeze_policy {
427            IntegerSqueezePolicy::Clamp => {
428                // Sentinel is the max representable value with new_bw bits
429                type U<TT> =
430                    <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
431                let sentinel: U<T> = U::<T>::usize_as((1usize << new_bw_u8.get()) - 1);
432
433                // Clamp values to the squeezed width; values >= sentinel become sentinel
434                let squeezed_values: ScalarBuffer<U<T>> = ScalarBuffer::from_iter(
435                    values
436                        .iter()
437                        .map(|&v| if v >= sentinel { sentinel } else { v }),
438                );
439                let squeezed_unsigned =
440                    PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
441                        squeezed_values,
442                        nulls,
443                    );
444                let squeezed_bitpacked =
445                    BitPackedArray::from_primitive(squeezed_unsigned, new_bw_u8);
446
447                let hybrid = LiquidPrimitiveClampedArray::<T> {
448                    squeezed: squeezed_bitpacked,
449                    reference_value: self.reference_value,
450                    disk_range,
451                    io: io.clone(),
452                };
453                Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
454            }
455            IntegerSqueezePolicy::Quantize => {
456                // Quantize value offsets into buckets of width W.
457                // Determine actual max offset value.
458                type U<TT> =
459                    <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
460                let max_offset: U<T> = if let Some(m) = values.iter().copied().max() {
461                    m
462                } else {
463                    U::<T>::ZERO
464                };
465
466                // Compute bucket count and width: ceil((max_offset+1)/bucket_count)
467                let bucket_count_u64 = 1u64 << (new_bw_u8.get() as u64);
468                let max_off_u64: u64 = num_traits::AsPrimitive::<u64>::as_(max_offset);
469                let range_size = max_off_u64.saturating_add(1);
470                let bucket_width_u64 = (range_size.div_ceil(bucket_count_u64)).max(1);
471
472                let quantized_values: ScalarBuffer<U<T>> =
473                    ScalarBuffer::from_iter(values.iter().map(|&v| {
474                        // v / bucket_width, clamped to last bucket
475                        let v_u64: u64 = num_traits::AsPrimitive::<u64>::as_(v);
476                        let mut idx_u64 = v_u64 / bucket_width_u64;
477                        if idx_u64 >= bucket_count_u64 {
478                            idx_u64 = bucket_count_u64 - 1;
479                        }
480                        U::<T>::usize_as(idx_u64 as usize)
481                    }));
482                let quantized_unsigned =
483                    PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
484                        quantized_values,
485                        nulls,
486                    );
487                let quantized_bitpacked =
488                    BitPackedArray::from_primitive(quantized_unsigned, new_bw_u8);
489
490                let hybrid = LiquidPrimitiveQuantizedArray::<T> {
491                    quantized: quantized_bitpacked,
492                    reference_value: self.reference_value,
493                    bucket_width: bucket_width_u64,
494                    disk_range,
495                    io,
496                };
497                Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
498            }
499        }
500    }
501}
502
503impl<T> LiquidArray for LiquidPrimitiveDeltaArray<T>
504where
505    T: LiquidPrimitiveType + super::PrimitiveKind,
506{
507    fn get_array_memory_size(&self) -> usize {
508        self.get_array_memory_size()
509    }
510
511    fn len(&self) -> usize {
512        self.len()
513    }
514
515    fn original_arrow_data_type(&self) -> DataType {
516        T::DATA_TYPE.clone()
517    }
518
519    fn as_any(&self) -> &dyn Any {
520        self
521    }
522
523    #[inline]
524    fn to_arrow_array(&self) -> ArrayRef {
525        // Reconstruct original values from deltas
526        let unsigned_array = self.bit_packed.to_primitive();
527        let (_data_type, delta_values, _nulls) = unsigned_array.into_parts();
528        let nulls = self.bit_packed.nulls();
529
530        // Reconstruct original values by applying deltas
531        let mut reconstructed = Vec::with_capacity(delta_values.len());
532        let mut current_value = self.reference_value; // anchor
533
534        if let Some(nulls) = nulls {
535            let mut have_prev = false;
536            for (i, &delta_unsigned) in delta_values.iter().enumerate() {
537                if !nulls.is_valid(i) {
538                    reconstructed.push(T::Native::ZERO); // Will be masked out by nulls
539                    continue;
540                }
541                if !have_prev {
542                    // First non-null value is the anchor
543                    reconstructed.push(current_value);
544                    have_prev = true;
545                } else {
546                    // Apply delta to get next value
547                    let zigzag: u64 = delta_unsigned.as_();
548                    let delta_i64 = (zigzag >> 1) as i64 ^ -((zigzag & 1) as i64);
549                    let delta: T::Native = T::Native::from_i64(delta_i64).unwrap();
550                    current_value = current_value.add_wrapping(delta);
551                    reconstructed.push(current_value);
552                }
553            }
554        } else {
555            // No nulls case
556            reconstructed.push(current_value); // First value is anchor
557            for &delta_unsigned in delta_values.iter().skip(1) {
558                let zigzag: u64 = delta_unsigned.as_();
559                let delta_i64 = (zigzag >> 1) as i64 ^ -((zigzag & 1) as i64);
560                let delta: T::Native = T::Native::from_i64(delta_i64).unwrap();
561                current_value = current_value.add_wrapping(delta);
562                reconstructed.push(current_value);
563            }
564        }
565
566        let values = ScalarBuffer::from_iter(reconstructed);
567        Arc::new(PrimitiveArray::<T>::new(values, nulls.cloned()))
568    }
569
570    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
571        let arrow_array = self.to_arrow_array();
572        let selection = BooleanArray::new(selection.clone(), None);
573        arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
574    }
575
576    fn try_eval_predicate(&self, predicate: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
577        let filtered = self.filter(filter);
578        eval_predicate_on_array(filtered, predicate)
579    }
580
581    fn to_bytes(&self) -> Vec<u8> {
582        self.to_bytes_inner()
583    }
584
585    fn data_type(&self) -> LiquidDataType {
586        LiquidDataType::Integer
587    }
588
589    fn squeeze(
590        &self,
591        _io: Arc<dyn SqueezeIoHandler>,
592        _expression_hint: Option<&CacheExpression>,
593    ) -> Option<(crate::liquid_array::LiquidSqueezedArrayRef, bytes::Bytes)> {
594        // Not implemented for delta arrays
595        None
596    }
597}
598
599impl<T> LiquidPrimitiveArray<T>
600where
601    T: LiquidPrimitiveType,
602{
603    fn bit_pack_starting_loc() -> usize {
604        let header_size = LiquidIPCHeader::size() + std::mem::size_of::<T::Native>();
605        (header_size + 7) & !7
606    }
607
608    /*
609    Serialized LiquidPrimitiveArray Memory Layout:
610    +--------------------------------------------------+
611    | LiquidIPCHeader (16 bytes)                       |
612    +--------------------------------------------------+
613
614    +--------------------------------------------------+
615    | reference_value (size_of::<T::Native> bytes)     |  // The reference value (e.g. minimum value)
616    +--------------------------------------------------+
617    | Padding (to 8-byte alignment)                    |  // Padding to ensure 8-byte alignment
618    +--------------------------------------------------+
619
620    +--------------------------------------------------+
621    | BitPackedArray Data                              |
622    +--------------------------------------------------+
623    | [BitPackedArray Header & Bit-Packed Values]      |  // Written by self.bit_packed.to_bytes()
624    +--------------------------------------------------+
625    */
626    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
627        // Determine type ID based on the type
628        let physical_type_id = get_physical_type_id::<T>();
629        let logical_type_id = super::LiquidDataType::Integer as u16;
630        let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
631
632        let bit_pack_starting_loc = Self::bit_pack_starting_loc();
633        let mut result = Vec::with_capacity(bit_pack_starting_loc + 256); // Pre-allocate a reasonable size
634
635        // Write header
636        result.extend_from_slice(&header.to_bytes());
637
638        // Write reference value
639        let ref_value_bytes = unsafe {
640            std::slice::from_raw_parts(
641                &self.reference_value as *const T::Native as *const u8,
642                std::mem::size_of::<T::Native>(),
643            )
644        };
645        result.extend_from_slice(ref_value_bytes);
646        while result.len() < bit_pack_starting_loc {
647            result.push(0);
648        }
649
650        // Let BitPackedArray write the rest of the data
651        self.bit_packed.to_bytes(&mut result);
652
653        result
654    }
655
656    /// Deserialize a LiquidPrimitiveArray from bytes
657    pub fn from_bytes(bytes: Bytes) -> Self {
658        let header = LiquidIPCHeader::from_bytes(&bytes);
659
660        let physical_id = header.physical_type_id;
661        assert_eq!(physical_id, get_physical_type_id::<T>());
662        let logical_id = header.logical_type_id;
663        assert_eq!(logical_id, super::LiquidDataType::Integer as u16);
664
665        // Get the reference value
666        let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
667        let reference_value =
668            unsafe { (ref_value_ptr as *const u8 as *const T::Native).read_unaligned() };
669
670        // Skip ahead to the BitPackedArray data
671        let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
672        let bit_packed = BitPackedArray::<T::UnSignedType>::from_bytes(bit_packed_data);
673
674        Self {
675            bit_packed,
676            reference_value,
677            squeeze_policy: IntegerSqueezePolicy::default(),
678        }
679    }
680}
681
682impl<T> LiquidPrimitiveDeltaArray<T>
683where
684    T: LiquidPrimitiveType,
685{
686    fn bit_pack_starting_loc() -> usize {
687        let header_size = LiquidIPCHeader::size() + std::mem::size_of::<T::Native>();
688        (header_size + 7) & !7
689    }
690
691    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
692        // Determine type ID based on the type
693        let physical_type_id = get_physical_type_id::<T>();
694        let logical_type_id = 1u16; // Delta encoding type ID
695        let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
696
697        let bit_pack_starting_loc = Self::bit_pack_starting_loc();
698        let mut result = Vec::with_capacity(bit_pack_starting_loc + 256);
699
700        // Write header
701        result.extend_from_slice(&header.to_bytes());
702
703        // Write anchor value (reference_value)
704        let ref_value_bytes = unsafe {
705            std::slice::from_raw_parts(
706                &self.reference_value as *const T::Native as *const u8,
707                std::mem::size_of::<T::Native>(),
708            )
709        };
710        result.extend_from_slice(ref_value_bytes);
711        while result.len() < bit_pack_starting_loc {
712            result.push(0);
713        }
714
715        // Let BitPackedArray write the rest of the data
716        self.bit_packed.to_bytes(&mut result);
717
718        result
719    }
720
721    /// Deserialize a LiquidPrimitiveDeltaArray from bytes
722    pub fn from_bytes(bytes: Bytes) -> Self {
723        let header = LiquidIPCHeader::from_bytes(&bytes);
724
725        let physical_id = header.physical_type_id;
726        assert_eq!(physical_id, get_physical_type_id::<T>());
727        let logical_id = header.logical_type_id;
728        assert_eq!(logical_id, 1u16); // Delta encoding type ID
729
730        // Get the anchor value
731        let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
732        let reference_value =
733            unsafe { (ref_value_ptr as *const u8 as *const T::Native).read_unaligned() };
734
735        // Skip ahead to the BitPackedArray data
736        let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
737        let bit_packed = BitPackedArray::<T::UnSignedType>::from_bytes(bit_packed_data);
738
739        Self {
740            bit_packed,
741            reference_value,
742        }
743    }
744}
745
746#[cfg(test)]
747mod tests {
748    use super::*;
749    use arrow::array::Array;
750
    // Generates a `#[test]` named `$test_name` that builds a `PrimitiveArray<$type>` from
    // `$values` and checks two round trips against it:
    //   1. Arrow -> Liquid -> Arrow (in-memory encode/decode)
    //   2. Arrow -> Liquid -> bytes -> Liquid -> Arrow (serialization)
    macro_rules! test_roundtrip {
        ($test_name:ident, $type:ty, $values:expr) => {
            #[test]
            fn $test_name() {
                // Create the original array
                let original: Vec<Option<<$type as ArrowPrimitiveType>::Native>> = $values;
                let array = PrimitiveArray::<$type>::from(original.clone());

                // Convert to Liquid array and back
                let liquid_array = LiquidPrimitiveArray::<$type>::from_arrow_array(array.clone());
                let result_array = liquid_array.to_arrow_array();
                // Same again, but through the serialized form
                let bytes_array =
                    LiquidPrimitiveArray::<$type>::from_bytes(liquid_array.to_bytes().into());

                assert_eq!(result_array.as_ref(), &array);
                assert_eq!(bytes_array.to_arrow_array().as_ref(), &array);
            }
        };
    }
770
    // Test cases for Int8Type
    // "basic" cases include a null slot; "negative" cases span the type's MIN..=MAX, so
    // `max - min` does not fit the signed type and the wrapping-subtraction path is used.
    test_roundtrip!(
        test_int8_roundtrip_basic,
        Int8Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int8_roundtrip_negative,
        Int8Type,
        vec![Some(-128), Some(-64), Some(0), Some(63), Some(127)]
    );

    // Test cases for Int16Type
    test_roundtrip!(
        test_int16_roundtrip_basic,
        Int16Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int16_roundtrip_negative,
        Int16Type,
        vec![
            Some(-32768),
            Some(-16384),
            Some(0),
            Some(16383),
            Some(32767)
        ]
    );

    // Test cases for Int32Type
    test_roundtrip!(
        test_int32_roundtrip_basic,
        Int32Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int32_roundtrip_negative,
        Int32Type,
        vec![
            Some(-2147483648),
            Some(-1073741824),
            Some(0),
            Some(1073741823),
            Some(2147483647)
        ]
    );

    // Test cases for Int64Type
    test_roundtrip!(
        test_int64_roundtrip_basic,
        Int64Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int64_roundtrip_negative,
        Int64Type,
        vec![
            Some(-9223372036854775808),
            Some(-4611686018427387904),
            Some(0),
            Some(4611686018427387903),
            Some(9223372036854775807)
        ]
    );

    // Test cases for unsigned types
    // Each covers 0, the type's MAX and a null slot; the minimum is 0, so these take the
    // zero-reference path.
    test_roundtrip!(
        test_uint8_roundtrip,
        UInt8Type,
        vec![Some(0), Some(128), Some(255), None, Some(64)]
    );
    test_roundtrip!(
        test_uint16_roundtrip,
        UInt16Type,
        vec![Some(0), Some(32768), Some(65535), None, Some(16384)]
    );
    test_roundtrip!(
        test_uint32_roundtrip,
        UInt32Type,
        vec![
            Some(0),
            Some(2147483648),
            Some(4294967295),
            None,
            Some(1073741824)
        ]
    );
    test_roundtrip!(
        test_uint64_roundtrip,
        UInt64Type,
        vec![
            Some(0),
            Some(9223372036854775808),
            Some(18446744073709551615),
            None,
            Some(4611686018427387904)
        ]
    );
870
871    test_roundtrip!(
872        test_date32_roundtrip,
873        Date32Type,
874        vec![Some(-365), Some(0), Some(365), None, Some(18262)]
875    );
876
877    test_roundtrip!(
878        test_date64_roundtrip,
879        Date64Type,
880        vec![Some(-365), Some(0), Some(365), None, Some(18262)]
881    );
882
883    // Edge cases
884    #[test]
885    fn test_all_nulls() {
886        let original: Vec<Option<i32>> = vec![None, None, None];
887        let array = PrimitiveArray::<Int32Type>::from(original.clone());
888        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
889        let result_array = liquid_array.to_arrow_array();
890
891        assert_eq!(result_array.len(), original.len());
892        assert_eq!(result_array.null_count(), original.len());
893    }
894
895    #[test]
896    fn test_all_nulls_filter() {
897        let original: Vec<Option<i32>> = vec![None, None, None];
898        let array = PrimitiveArray::<Int32Type>::from(original.clone());
899        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
900        let result_array = liquid_array.filter(&BooleanBuffer::from(vec![true, false, true]));
901
902        assert_eq!(result_array.len(), 2);
903        assert_eq!(result_array.null_count(), 2);
904    }
905
906    #[test]
907    fn test_zero_reference_value() {
908        let original: Vec<Option<i32>> = vec![Some(0), Some(1), Some(2), None, Some(4)];
909        let array = PrimitiveArray::<Int32Type>::from(original.clone());
910        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
911        let result_array = liquid_array.to_arrow_array();
912
913        assert_eq!(liquid_array.reference_value, 0);
914        assert_eq!(result_array.as_ref(), &array);
915    }
916
917    #[test]
918    fn test_single_value() {
919        let original: Vec<Option<i32>> = vec![Some(42)];
920        let array = PrimitiveArray::<Int32Type>::from(original.clone());
921        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
922        let result_array = liquid_array.to_arrow_array();
923
924        assert_eq!(result_array.as_ref(), &array);
925    }
926
927    #[test]
928    fn test_filter_basic() {
929        // Create original array with some values
930        let original = vec![Some(1), Some(2), Some(3), None, Some(5)];
931        let array = PrimitiveArray::<Int32Type>::from(original);
932        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
933
934        // Create selection mask: keep indices 0, 2, and 4
935        let selection = BooleanBuffer::from(vec![true, false, true, false, true]);
936
937        // Apply filter
938        let result_array = liquid_array.filter(&selection);
939
940        // Expected result after filtering
941        let expected = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(3), Some(5)]);
942
943        assert_eq!(result_array.as_ref(), &expected);
944    }
945
946    #[test]
947    fn test_original_arrow_data_type_returns_int32() {
948        let array = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(2)]);
949        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
950        assert_eq!(liquid.original_arrow_data_type(), DataType::Int32);
951    }
952
953    #[test]
954    fn test_filter_all_nulls() {
955        // Create array with all nulls
956        let original = vec![None, None, None, None];
957        let array = PrimitiveArray::<Int32Type>::from(original);
958        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
959
960        // Keep first and last elements
961        let selection = BooleanBuffer::from(vec![true, false, false, true]);
962
963        let result_array = liquid_array.filter(&selection);
964
965        let expected = PrimitiveArray::<Int32Type>::from(vec![None, None]);
966
967        assert_eq!(result_array.as_ref(), &expected);
968    }
969
970    #[test]
971    fn test_filter_empty_result() {
972        let original = vec![Some(1), Some(2), Some(3)];
973        let array = PrimitiveArray::<Int32Type>::from(original);
974        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
975
976        // Filter out all elements
977        let selection = BooleanBuffer::from(vec![false, false, false]);
978
979        let result_array = liquid_array.filter(&selection);
980
981        assert_eq!(result_array.len(), 0);
982    }
983
984    #[test]
985    fn test_delta_encoding_basic_roundtrip() {
986        let original = vec![Some(1), Some(3), Some(6), Some(10), Some(15)];
987        let array = PrimitiveArray::<Int32Type>::from(original.clone());
988
989        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array.clone());
990        let result_array = liquid_delta.to_arrow_array();
991
992        assert_eq!(result_array.as_ref(), &array);
993    }
994
995    #[test]
996    fn test_delta_encoding_with_nulls() {
997        let original = vec![Some(1), None, Some(4), Some(7), None, Some(12)];
998        let array = PrimitiveArray::<Int32Type>::from(original.clone());
999
1000        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array.clone());
1001        let result_array = liquid_delta.to_arrow_array();
1002
1003        assert_eq!(result_array.as_ref(), &array);
1004    }
1005
1006    #[test]
1007    fn test_delta_encoding_serialization() {
1008        let original = vec![Some(1), Some(3), Some(6), Some(10), Some(15)];
1009        let array = PrimitiveArray::<Int32Type>::from(original.clone());
1010
1011        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array.clone());
1012        let bytes = liquid_delta.to_bytes();
1013        let reconstructed = LiquidPrimitiveDeltaArray::<Int32Type>::from_bytes(bytes.into());
1014        let result_array = reconstructed.to_arrow_array();
1015
1016        assert_eq!(result_array.as_ref(), &array);
1017    }
1018
1019    #[test]
1020    fn test_memory_comparison_sequential_data() {
1021        // Sequential data: delta encoding performs better
1022        let sequential_data: Vec<Option<i32>> = (0..1000).map(Some).collect();
1023        let array = PrimitiveArray::<Int32Type>::from(sequential_data);
1024
1025        let liquid_regular = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
1026        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array);
1027
1028        let regular_size = liquid_regular.get_array_memory_size();
1029        let delta_size = liquid_delta.get_array_memory_size();
1030
1031        println!(
1032            "Sequential data - Regular: {} bytes, Delta: {} bytes",
1033            regular_size, delta_size
1034        );
1035        assert!(
1036            delta_size <= regular_size,
1037            "Delta encoding should be more efficient for sequential data"
1038        );
1039    }
1040}