Skip to main content

liquid_cache/liquid_array/
primitive_array.rs

1use std::any::Any;
2use std::fmt::{Debug, Display};
3use std::sync::Arc;
4
5use arrow::array::{
6    ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, BooleanArray, PrimitiveArray,
7    types::{
8        Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type,
9        TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
10        TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
11    },
12};
13use arrow::buffer::{BooleanBuffer, ScalarBuffer};
14use arrow_schema::DataType;
15use datafusion::physical_plan::PhysicalExpr;
16use fastlanes::BitPacking;
17use num_traits::{AsPrimitive, FromPrimitive};
18
19use super::LiquidDataType;
20use crate::cache::CacheExpression;
21use crate::liquid_array::hybrid_primitive_array::{
22    LiquidPrimitiveClampedArray, LiquidPrimitiveQuantizedArray,
23};
24use crate::liquid_array::ipc::{LiquidIPCHeader, PhysicalTypeMarker, get_physical_type_id};
25use crate::liquid_array::raw::BitPackedArray;
26use crate::liquid_array::{
27    LiquidArray, LiquidSqueezedArrayRef, PrimitiveKind, SqueezeIoHandler, SqueezedDate32Array,
28};
29use crate::utils::get_bit_width;
30use arrow::datatypes::ArrowNativeType;
31use bytes::Bytes;
32
/// Strategy applied to primitive integer arrays when they are squeezed.
///
/// Callers choose between clamping out-of-range values and quantizing every
/// value into a bucket. The default is [`IntegerSqueezePolicy::Quantize`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IntegerSqueezePolicy {
    /// Values above the squeezed range collapse to a sentinel; rows that were
    /// not clamped remain recoverable.
    Clamp = 0,
    /// Values are mapped into buckets. Good for coarse filtering, but the
    /// exact values have to be recovered from disk.
    #[default]
    Quantize = 1,
}
43
/// Private home of the sealing marker: the module itself is not `pub`, so the
/// trait inside it can only be named from within this file.
mod private {
    /// Marker supertrait used to seal traits declared in the parent module.
    pub trait Sealed {}
}
47
48/// LiquidPrimitiveType is a sealed trait that represents the primitive types supported by Liquid.
49/// Implemented for all supported integer, date, and timestamp Arrow primitive types.
50///
51/// I have to admit this trait is super complicated.
52/// Luckily users never have to worry about it, they can just use the types that are already implemented.
53/// We could have implemented this as a macro, but macro is ugly.
54/// Type is spec, code is proof.
55pub trait LiquidPrimitiveType:
56    ArrowPrimitiveType<
57        Native: AsPrimitive<<Self::UnSignedType as ArrowPrimitiveType>::Native>
58                    + AsPrimitive<i64>
59                    + FromPrimitive
60                    + Display,
61    > + Debug
62    + Send
63    + Sync
64    + private::Sealed
65    + PrimitiveKind
66    + PhysicalTypeMarker
67{
68    /// The unsigned type that can be used to represent the signed type.
69    type UnSignedType: ArrowPrimitiveType<Native: AsPrimitive<Self::Native> + AsPrimitive<u64> + BitPacking>
70        + Debug;
71}
72
73macro_rules! impl_has_unsigned_type {
74    ($($signed:ty => $unsigned:ty),*) => {
75        $(
76            impl private::Sealed for $signed {}
77            impl LiquidPrimitiveType for $signed {
78                type UnSignedType = $unsigned;
79            }
80        )*
81    }
82}
83
84impl_has_unsigned_type! {
85    Int32Type => UInt32Type,
86    Int64Type => UInt64Type,
87    Int16Type => UInt16Type,
88    Int8Type => UInt8Type,
89    UInt32Type => UInt32Type,
90    UInt64Type => UInt64Type,
91    UInt16Type => UInt16Type,
92    UInt8Type => UInt8Type,
93    Date64Type => UInt64Type,
94    Date32Type => UInt32Type,
95    TimestampSecondType => UInt64Type,
96    TimestampMillisecondType => UInt64Type,
97    TimestampMicrosecondType => UInt64Type,
98    TimestampNanosecondType => UInt64Type
99}
100
101/// Liquid's unsigned 8-bit integer array.
102pub type LiquidU8Array = LiquidPrimitiveArray<UInt8Type>;
103/// Liquid's unsigned 16-bit integer array.
104pub type LiquidU16Array = LiquidPrimitiveArray<UInt16Type>;
105/// Liquid's unsigned 32-bit integer array.
106pub type LiquidU32Array = LiquidPrimitiveArray<UInt32Type>;
107/// Liquid's unsigned 64-bit integer array.
108pub type LiquidU64Array = LiquidPrimitiveArray<UInt64Type>;
109/// Liquid's signed 8-bit integer array.
110pub type LiquidI8Array = LiquidPrimitiveArray<Int8Type>;
111/// Liquid's signed 16-bit integer array.
112pub type LiquidI16Array = LiquidPrimitiveArray<Int16Type>;
113/// Liquid's signed 32-bit integer array.
114pub type LiquidI32Array = LiquidPrimitiveArray<Int32Type>;
115/// Liquid's signed 64-bit integer array.
116pub type LiquidI64Array = LiquidPrimitiveArray<Int64Type>;
117/// Liquid's 32-bit date array.
118pub type LiquidDate32Array = LiquidPrimitiveArray<Date32Type>;
119/// Liquid's 64-bit date array.
120pub type LiquidDate64Array = LiquidPrimitiveArray<Date64Type>;
121
122/// Liquid's primitive array
123#[derive(Debug)]
124pub struct LiquidPrimitiveArray<T: LiquidPrimitiveType> {
125    bit_packed: BitPackedArray<T::UnSignedType>,
126    reference_value: T::Native,
127    squeeze_policy: IntegerSqueezePolicy,
128}
129
130/// Liquid's primitive array which uses delta encoding for compression
131#[derive(Debug, Clone)]
132pub struct LiquidPrimitiveDeltaArray<T: LiquidPrimitiveType> {
133    bit_packed: BitPackedArray<T::UnSignedType>,
134    reference_value: T::Native,
135}
136
137impl<T> LiquidPrimitiveArray<T>
138where
139    T: LiquidPrimitiveType,
140{
141    /// Get the memory size of the Liquid primitive array.
142    pub fn get_array_memory_size(&self) -> usize {
143        self.bit_packed.get_array_memory_size()
144            + std::mem::size_of::<T::Native>()
145            + std::mem::size_of::<IntegerSqueezePolicy>()
146    }
147
148    /// Get the length of the Liquid primitive array.
149    pub fn len(&self) -> usize {
150        self.bit_packed.len()
151    }
152
153    /// Check if the Liquid primitive array is empty.
154    pub fn is_empty(&self) -> bool {
155        self.len() == 0
156    }
157
158    /// Create a Liquid primitive array from an Arrow primitive array.
159    pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> LiquidPrimitiveArray<T> {
160        let min = match arrow::compute::kernels::aggregate::min(&arrow_array) {
161            Some(v) => v,
162            None => {
163                // entire array is null
164                return Self {
165                    bit_packed: BitPackedArray::new_null_array(arrow_array.len()),
166                    reference_value: T::Native::ZERO,
167                    squeeze_policy: IntegerSqueezePolicy::default(),
168                };
169            }
170        };
171        let max = arrow::compute::kernels::aggregate::max(&arrow_array).unwrap();
172
173        // be careful of overflow:
174        // Want: 127i8 - (-128i8) -> 255u64,
175        // but we get -1i8
176        // (-1i8) as u8 as u64 -> 255u64
177        let sub = max.sub_wrapping(min) as <T as ArrowPrimitiveType>::Native;
178        let sub: <<T as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native =
179            sub.as_();
180        let bit_width = get_bit_width(sub.as_());
181
182        let (_data_type, values, nulls) = arrow_array.clone().into_parts();
183        let values = if min != T::Native::ZERO {
184            ScalarBuffer::from_iter(values.iter().map(|v| {
185                let k: <<T as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native =
186                    v.sub_wrapping(min).as_();
187                k
188            }))
189        } else {
190            #[allow(clippy::missing_transmute_annotations)]
191            unsafe {
192                std::mem::transmute(values)
193            }
194        };
195
196        let unsigned_array =
197            PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(values, nulls);
198
199        let bit_packed_array = BitPackedArray::from_primitive(unsigned_array, bit_width);
200
201        Self {
202            bit_packed: bit_packed_array,
203            reference_value: min,
204            squeeze_policy: IntegerSqueezePolicy::default(),
205        }
206    }
207
208    /// Get the current squeeze policy for this array.
209    pub fn squeeze_policy(&self) -> IntegerSqueezePolicy {
210        self.squeeze_policy
211    }
212
213    /// Set the squeeze policy for this array.
214    pub fn set_squeeze_policy(&mut self, policy: IntegerSqueezePolicy) {
215        self.squeeze_policy = policy;
216    }
217
218    /// Set the squeeze policy, returning self for chaining.
219    pub fn with_squeeze_policy(mut self, policy: IntegerSqueezePolicy) -> Self {
220        self.squeeze_policy = policy;
221        self
222    }
223}
224
225impl<T> LiquidPrimitiveDeltaArray<T>
226where
227    T: LiquidPrimitiveType,
228{
229    /// Get the memory size of the Liquid primitive delta array.
230    pub fn get_array_memory_size(&self) -> usize {
231        self.bit_packed.get_array_memory_size() + std::mem::size_of::<T::Native>()
232    }
233
234    /// Get the length of the Liquid primitive delta array.
235    pub fn len(&self) -> usize {
236        self.bit_packed.len()
237    }
238
239    /// Check if the Liquid primitive delta array is empty.
240    pub fn is_empty(&self) -> bool {
241        self.len() == 0
242    }
243
244    /// Create a Liquid primitive delta array from an Arrow primitive array.
245    pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> LiquidPrimitiveDeltaArray<T> {
246        use arrow::array::Array;
247
248        let len = arrow_array.len();
249        // check if entire array is already null
250        if arrow_array.null_count() == len {
251            return Self {
252                bit_packed: BitPackedArray::new_null_array(len),
253                reference_value: T::Native::ZERO,
254            };
255        }
256
257        let (_dt, values, nulls) = arrow_array.clone().into_parts();
258        let vals: Vec<T::Native> = values.to_vec();
259
260        type UnsignedNative<TT> =
261            <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
262        let mut out: Vec<UnsignedNative<T>> = Vec::with_capacity(len);
263        let mut max_value: UnsignedNative<T> = UnsignedNative::<T>::ZERO;
264        let mut anchor: T::Native = T::Native::ZERO;
265
266        if let Some(_nb) = &nulls {
267            // Nulls present: write 0 for nulls; prev will the last prev non-null value
268            let nb = nulls.as_ref().unwrap();
269            let mut have_prev = false;
270            let mut prev: T::Native = T::Native::ZERO;
271
272            for (i, &cur) in vals.iter().enumerate() {
273                if !nb.is_valid(i) {
274                    out.push(UnsignedNative::<T>::ZERO);
275                    continue;
276                }
277                if !have_prev {
278                    anchor = cur;
279                    prev = cur;
280                    have_prev = true;
281                    out.push(UnsignedNative::<T>::ZERO);
282                    continue;
283                }
284                let delta: T::Native = cur.sub_wrapping(prev);
285                // zig zag encoding
286                let delta_i64: i64 = delta.as_();
287                let zigzag: u64 = ((delta_i64 << 1) ^ (delta_i64 >> 63)) as u64;
288                let delta_unsigned: UnsignedNative<T> =
289                    UnsignedNative::<T>::usize_as(zigzag as usize);
290                if delta_unsigned > max_value {
291                    max_value = delta_unsigned;
292                }
293                out.push(delta_unsigned);
294                prev = cur;
295            }
296        } else {
297            // No nulls: first value is anchor, remainder are deltas with their previous values
298            anchor = vals[0];
299            let mut prev: T::Native = anchor;
300            out.push(UnsignedNative::<T>::ZERO); // anchor will have a difference of 0
301            for &cur in vals.iter().skip(1) {
302                let delta: T::Native = cur.sub_wrapping(prev);
303                // zig zag encoding
304                let delta_i64: i64 = delta.as_();
305                let zigzag: u64 = ((delta_i64 << 1) ^ (delta_i64 >> 63)) as u64;
306                let delta_unsigned: UnsignedNative<T> =
307                    UnsignedNative::<T>::usize_as(zigzag as usize);
308                if delta_unsigned > max_value {
309                    max_value = delta_unsigned;
310                }
311                out.push(delta_unsigned);
312                prev = cur;
313            }
314        }
315
316        let bit_width = get_bit_width(max_value.as_());
317        let values = ScalarBuffer::from_iter(out);
318        let unsigned_array =
319            PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(values, nulls);
320        let bit_packed_array = BitPackedArray::from_primitive(unsigned_array, bit_width);
321
322        Self {
323            bit_packed: bit_packed_array,
324            reference_value: anchor,
325        }
326    }
327}
328
329impl<T> LiquidArray for LiquidPrimitiveArray<T>
330where
331    T: LiquidPrimitiveType + super::PrimitiveKind,
332{
333    fn get_array_memory_size(&self) -> usize {
334        self.get_array_memory_size()
335    }
336
337    fn len(&self) -> usize {
338        self.len()
339    }
340
341    fn original_arrow_data_type(&self) -> DataType {
342        T::DATA_TYPE.clone()
343    }
344
345    fn as_any(&self) -> &dyn Any {
346        self
347    }
348
349    #[inline]
350    fn to_arrow_array(&self) -> ArrayRef {
351        let unsigned_array = self.bit_packed.to_primitive();
352        let (_data_type, values, _nulls) = unsigned_array.into_parts();
353        let nulls = self.bit_packed.nulls();
354        let values = if self.reference_value != T::Native::ZERO {
355            let reference_v = self.reference_value.as_();
356            ScalarBuffer::from_iter(values.iter().map(|v| {
357                let k: <T as ArrowPrimitiveType>::Native = (*v).add_wrapping(reference_v).as_();
358                k
359            }))
360        } else {
361            #[allow(clippy::missing_transmute_annotations)]
362            unsafe {
363                std::mem::transmute(values)
364            }
365        };
366
367        Arc::new(PrimitiveArray::<T>::new(values, nulls.cloned()))
368    }
369
370    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
371        let arrow_array = self.to_arrow_array();
372        let selection = BooleanArray::new(selection.clone(), None);
373        arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
374    }
375
376    fn try_eval_predicate(
377        &self,
378        _predicate: &Arc<dyn PhysicalExpr>,
379        _filter: &BooleanBuffer,
380    ) -> Option<BooleanArray> {
381        // primitive array is not supported for liquid predicate
382        None
383    }
384
385    fn to_bytes(&self) -> Vec<u8> {
386        self.to_bytes_inner()
387    }
388
389    fn data_type(&self) -> LiquidDataType {
390        LiquidDataType::Integer
391    }
392
393    fn squeeze(
394        &self,
395        io: Arc<dyn SqueezeIoHandler>,
396        expression_hint: Option<&CacheExpression>,
397    ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
398        let expression_hint = expression_hint?;
399        // Full bytes (original format) are what we store to disk
400        let full_bytes = Bytes::from(self.to_bytes_inner());
401        let disk_range = 0u64..(full_bytes.len() as u64);
402
403        if T::DATA_TYPE == DataType::Date32 {
404            // Special handle for Date32 arrays with component extraction support.
405            let field = expression_hint.as_date32_field()?;
406            let squeezed =
407                SqueezedDate32Array::from_liquid_date32(self, field).with_backing(io, disk_range);
408            return Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, full_bytes));
409        }
410        if matches!(T::DATA_TYPE, DataType::Timestamp(_, _)) {
411            let field = expression_hint.as_date32_field()?;
412            let squeezed = SqueezedDate32Array::from_liquid_timestamp(self, field)
413                .with_backing(io, disk_range);
414            return Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, full_bytes));
415        }
416
417        // Only squeeze if we have a concrete bit width and it is large enough
418        let orig_bw = self.bit_packed.bit_width()?;
419        if orig_bw.get() < 8 {
420            return None;
421        }
422
423        // New squeezed bit width is half of the original
424        let new_bw_u8 = std::num::NonZero::new((orig_bw.get() / 2).max(1)).unwrap();
425
426        // Decode original unsigned offsets
427        let unsigned_array = self.bit_packed.to_primitive();
428        let (_dt, values, nulls) = unsigned_array.into_parts();
429
430        match self.squeeze_policy {
431            IntegerSqueezePolicy::Clamp => {
432                // Sentinel is the max representable value with new_bw bits
433                type U<TT> =
434                    <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
435                let sentinel: U<T> = U::<T>::usize_as((1usize << new_bw_u8.get()) - 1);
436
437                // Clamp values to the squeezed width; values >= sentinel become sentinel
438                let squeezed_values: ScalarBuffer<U<T>> = ScalarBuffer::from_iter(
439                    values
440                        .iter()
441                        .map(|&v| if v >= sentinel { sentinel } else { v }),
442                );
443                let squeezed_unsigned =
444                    PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
445                        squeezed_values,
446                        nulls,
447                    );
448                let squeezed_bitpacked =
449                    BitPackedArray::from_primitive(squeezed_unsigned, new_bw_u8);
450
451                let hybrid = LiquidPrimitiveClampedArray::<T> {
452                    squeezed: squeezed_bitpacked,
453                    reference_value: self.reference_value,
454                    disk_range,
455                    io: io.clone(),
456                };
457                Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
458            }
459            IntegerSqueezePolicy::Quantize => {
460                // Quantize value offsets into buckets of width W.
461                // Determine actual max offset value.
462                type U<TT> =
463                    <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
464                let max_offset: U<T> = if let Some(m) = values.iter().copied().max() {
465                    m
466                } else {
467                    U::<T>::ZERO
468                };
469
470                // Compute bucket count and width: ceil((max_offset+1)/bucket_count)
471                let bucket_count_u64 = 1u64 << (new_bw_u8.get() as u64);
472                let max_off_u64: u64 = num_traits::AsPrimitive::<u64>::as_(max_offset);
473                let range_size = max_off_u64.saturating_add(1);
474                let bucket_width_u64 = (range_size.div_ceil(bucket_count_u64)).max(1);
475
476                let quantized_values: ScalarBuffer<U<T>> =
477                    ScalarBuffer::from_iter(values.iter().map(|&v| {
478                        // v / bucket_width, clamped to last bucket
479                        let v_u64: u64 = num_traits::AsPrimitive::<u64>::as_(v);
480                        let mut idx_u64 = v_u64 / bucket_width_u64;
481                        if idx_u64 >= bucket_count_u64 {
482                            idx_u64 = bucket_count_u64 - 1;
483                        }
484                        U::<T>::usize_as(idx_u64 as usize)
485                    }));
486                let quantized_unsigned =
487                    PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
488                        quantized_values,
489                        nulls,
490                    );
491                let quantized_bitpacked =
492                    BitPackedArray::from_primitive(quantized_unsigned, new_bw_u8);
493
494                let hybrid = LiquidPrimitiveQuantizedArray::<T> {
495                    quantized: quantized_bitpacked,
496                    reference_value: self.reference_value,
497                    bucket_width: bucket_width_u64,
498                    disk_range,
499                    io,
500                };
501                Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
502            }
503        }
504    }
505}
506
507impl<T> LiquidArray for LiquidPrimitiveDeltaArray<T>
508where
509    T: LiquidPrimitiveType + super::PrimitiveKind,
510{
511    fn get_array_memory_size(&self) -> usize {
512        self.get_array_memory_size()
513    }
514
515    fn len(&self) -> usize {
516        self.len()
517    }
518
519    fn original_arrow_data_type(&self) -> DataType {
520        T::DATA_TYPE.clone()
521    }
522
523    fn as_any(&self) -> &dyn Any {
524        self
525    }
526
527    #[inline]
528    fn to_arrow_array(&self) -> ArrayRef {
529        // Reconstruct original values from deltas
530        let unsigned_array = self.bit_packed.to_primitive();
531        let (_data_type, delta_values, _nulls) = unsigned_array.into_parts();
532        let nulls = self.bit_packed.nulls();
533
534        // Reconstruct original values by applying deltas
535        let mut reconstructed = Vec::with_capacity(delta_values.len());
536        let mut current_value = self.reference_value; // anchor
537
538        if let Some(nulls) = nulls {
539            let mut have_prev = false;
540            for (i, &delta_unsigned) in delta_values.iter().enumerate() {
541                if !nulls.is_valid(i) {
542                    reconstructed.push(T::Native::ZERO); // Will be masked out by nulls
543                    continue;
544                }
545                if !have_prev {
546                    // First non-null value is the anchor
547                    reconstructed.push(current_value);
548                    have_prev = true;
549                } else {
550                    // Apply delta to get next value
551                    let zigzag: u64 = delta_unsigned.as_();
552                    let delta_i64 = (zigzag >> 1) as i64 ^ -((zigzag & 1) as i64);
553                    let delta: T::Native = T::Native::from_i64(delta_i64).unwrap();
554                    current_value = current_value.add_wrapping(delta);
555                    reconstructed.push(current_value);
556                }
557            }
558        } else {
559            // No nulls case
560            reconstructed.push(current_value); // First value is anchor
561            for &delta_unsigned in delta_values.iter().skip(1) {
562                let zigzag: u64 = delta_unsigned.as_();
563                let delta_i64 = (zigzag >> 1) as i64 ^ -((zigzag & 1) as i64);
564                let delta: T::Native = T::Native::from_i64(delta_i64).unwrap();
565                current_value = current_value.add_wrapping(delta);
566                reconstructed.push(current_value);
567            }
568        }
569
570        let values = ScalarBuffer::from_iter(reconstructed);
571        Arc::new(PrimitiveArray::<T>::new(values, nulls.cloned()))
572    }
573
574    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
575        let arrow_array = self.to_arrow_array();
576        let selection = BooleanArray::new(selection.clone(), None);
577        arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
578    }
579
580    fn try_eval_predicate(
581        &self,
582        _predicate: &Arc<dyn PhysicalExpr>,
583        _filter: &BooleanBuffer,
584    ) -> Option<BooleanArray> {
585        // primitive delta array is not supported for liquid predicate
586        None
587    }
588
589    fn to_bytes(&self) -> Vec<u8> {
590        self.to_bytes_inner()
591    }
592
593    fn data_type(&self) -> LiquidDataType {
594        LiquidDataType::Integer
595    }
596
597    fn squeeze(
598        &self,
599        _io: Arc<dyn SqueezeIoHandler>,
600        _expression_hint: Option<&CacheExpression>,
601    ) -> Option<(crate::liquid_array::LiquidSqueezedArrayRef, bytes::Bytes)> {
602        // Not implemented for delta arrays
603        None
604    }
605}
606
607impl<T> LiquidPrimitiveArray<T>
608where
609    T: LiquidPrimitiveType,
610{
611    fn bit_pack_starting_loc() -> usize {
612        let header_size = LiquidIPCHeader::size() + std::mem::size_of::<T::Native>();
613        (header_size + 7) & !7
614    }
615
616    /*
617    Serialized LiquidPrimitiveArray Memory Layout:
618    +--------------------------------------------------+
619    | LiquidIPCHeader (16 bytes)                       |
620    +--------------------------------------------------+
621
622    +--------------------------------------------------+
623    | reference_value (size_of::<T::Native> bytes)     |  // The reference value (e.g. minimum value)
624    +--------------------------------------------------+
625    | Padding (to 8-byte alignment)                    |  // Padding to ensure 8-byte alignment
626    +--------------------------------------------------+
627
628    +--------------------------------------------------+
629    | BitPackedArray Data                              |
630    +--------------------------------------------------+
631    | [BitPackedArray Header & Bit-Packed Values]      |  // Written by self.bit_packed.to_bytes()
632    +--------------------------------------------------+
633    */
634    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
635        // Determine type ID based on the type
636        let physical_type_id = get_physical_type_id::<T>();
637        let logical_type_id = super::LiquidDataType::Integer as u16;
638        let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
639
640        let bit_pack_starting_loc = Self::bit_pack_starting_loc();
641        let mut result = Vec::with_capacity(bit_pack_starting_loc + 256); // Pre-allocate a reasonable size
642
643        // Write header
644        result.extend_from_slice(&header.to_bytes());
645
646        // Write reference value
647        let ref_value_bytes = unsafe {
648            std::slice::from_raw_parts(
649                &self.reference_value as *const T::Native as *const u8,
650                std::mem::size_of::<T::Native>(),
651            )
652        };
653        result.extend_from_slice(ref_value_bytes);
654        while result.len() < bit_pack_starting_loc {
655            result.push(0);
656        }
657
658        // Let BitPackedArray write the rest of the data
659        self.bit_packed.to_bytes(&mut result);
660
661        result
662    }
663
664    /// Deserialize a LiquidPrimitiveArray from bytes
665    pub fn from_bytes(bytes: Bytes) -> Self {
666        let header = LiquidIPCHeader::from_bytes(&bytes);
667
668        let physical_id = header.physical_type_id;
669        assert_eq!(physical_id, get_physical_type_id::<T>());
670        let logical_id = header.logical_type_id;
671        assert_eq!(logical_id, super::LiquidDataType::Integer as u16);
672
673        // Get the reference value
674        let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
675        let reference_value =
676            unsafe { (ref_value_ptr as *const u8 as *const T::Native).read_unaligned() };
677
678        // Skip ahead to the BitPackedArray data
679        let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
680        let bit_packed = BitPackedArray::<T::UnSignedType>::from_bytes(bit_packed_data);
681
682        Self {
683            bit_packed,
684            reference_value,
685            squeeze_policy: IntegerSqueezePolicy::default(),
686        }
687    }
688}
689
690impl<T> LiquidPrimitiveDeltaArray<T>
691where
692    T: LiquidPrimitiveType,
693{
694    fn bit_pack_starting_loc() -> usize {
695        let header_size = LiquidIPCHeader::size() + std::mem::size_of::<T::Native>();
696        (header_size + 7) & !7
697    }
698
699    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
700        // Determine type ID based on the type
701        let physical_type_id = get_physical_type_id::<T>();
702        let logical_type_id = 1u16; // Delta encoding type ID
703        let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
704
705        let bit_pack_starting_loc = Self::bit_pack_starting_loc();
706        let mut result = Vec::with_capacity(bit_pack_starting_loc + 256);
707
708        // Write header
709        result.extend_from_slice(&header.to_bytes());
710
711        // Write anchor value (reference_value)
712        let ref_value_bytes = unsafe {
713            std::slice::from_raw_parts(
714                &self.reference_value as *const T::Native as *const u8,
715                std::mem::size_of::<T::Native>(),
716            )
717        };
718        result.extend_from_slice(ref_value_bytes);
719        while result.len() < bit_pack_starting_loc {
720            result.push(0);
721        }
722
723        // Let BitPackedArray write the rest of the data
724        self.bit_packed.to_bytes(&mut result);
725
726        result
727    }
728
729    /// Deserialize a LiquidPrimitiveDeltaArray from bytes
730    pub fn from_bytes(bytes: Bytes) -> Self {
731        let header = LiquidIPCHeader::from_bytes(&bytes);
732
733        let physical_id = header.physical_type_id;
734        assert_eq!(physical_id, get_physical_type_id::<T>());
735        let logical_id = header.logical_type_id;
736        assert_eq!(logical_id, 1u16); // Delta encoding type ID
737
738        // Get the anchor value
739        let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
740        let reference_value =
741            unsafe { (ref_value_ptr as *const u8 as *const T::Native).read_unaligned() };
742
743        // Skip ahead to the BitPackedArray data
744        let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
745        let bit_packed = BitPackedArray::<T::UnSignedType>::from_bytes(bit_packed_data);
746
747        Self {
748            bit_packed,
749            reference_value,
750        }
751    }
752}
753
// Unit tests for `LiquidPrimitiveArray` and `LiquidPrimitiveDeltaArray`:
// Arrow round trips, byte (de)serialization, filtering, and a size comparison
// between the two encodings.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Array;

    /// Generates a `#[test]` named `$test_name` that builds a `PrimitiveArray<$type>`
    /// from `$values` and asserts it is reproduced exactly by both:
    ///
    /// 1. `from_arrow_array` -> `to_arrow_array`
    /// 2. `from_arrow_array` -> `to_bytes` -> `from_bytes` -> `to_arrow_array`
    macro_rules! test_roundtrip {
        ($test_name:ident, $type:ty, $values:expr) => {
            #[test]
            fn $test_name() {
                // The input vector is only needed to build the Arrow array, so it is
                // moved rather than cloned.
                let input: Vec<Option<<$type as ArrowPrimitiveType>::Native>> = $values;
                let expected = PrimitiveArray::<$type>::from(input);

                // Encode once, then decode both directly and through the byte format.
                let liquid = LiquidPrimitiveArray::<$type>::from_arrow_array(expected.clone());
                let decoded = liquid.to_arrow_array();
                let restored =
                    LiquidPrimitiveArray::<$type>::from_bytes(liquid.to_bytes().into());

                assert_eq!(decoded.as_ref(), &expected);
                assert_eq!(restored.to_arrow_array().as_ref(), &expected);
            }
        };
    }

    // Int8Type: small values with a null, then the full signed range.
    test_roundtrip!(
        test_int8_roundtrip_basic,
        Int8Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int8_roundtrip_negative,
        Int8Type,
        vec![Some(-128), Some(-64), Some(0), Some(63), Some(127)]
    );

    // Int16Type: small values with a null, then the full signed range.
    test_roundtrip!(
        test_int16_roundtrip_basic,
        Int16Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int16_roundtrip_negative,
        Int16Type,
        vec![
            Some(-32768),
            Some(-16384),
            Some(0),
            Some(16383),
            Some(32767)
        ]
    );

    // Int32Type: small values with a null, then the full signed range.
    test_roundtrip!(
        test_int32_roundtrip_basic,
        Int32Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int32_roundtrip_negative,
        Int32Type,
        vec![
            Some(-2147483648),
            Some(-1073741824),
            Some(0),
            Some(1073741823),
            Some(2147483647)
        ]
    );

    // Int64Type: small values with a null, then the full signed range.
    test_roundtrip!(
        test_int64_roundtrip_basic,
        Int64Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int64_roundtrip_negative,
        Int64Type,
        vec![
            Some(-9223372036854775808),
            Some(-4611686018427387904),
            Some(0),
            Some(4611686018427387903),
            Some(9223372036854775807)
        ]
    );

    // Unsigned types: zero, mid-range, maximum, and a null.
    test_roundtrip!(
        test_uint8_roundtrip,
        UInt8Type,
        vec![Some(0), Some(128), Some(255), None, Some(64)]
    );
    test_roundtrip!(
        test_uint16_roundtrip,
        UInt16Type,
        vec![Some(0), Some(32768), Some(65535), None, Some(16384)]
    );
    test_roundtrip!(
        test_uint32_roundtrip,
        UInt32Type,
        vec![
            Some(0),
            Some(2147483648),
            Some(4294967295),
            None,
            Some(1073741824)
        ]
    );
    test_roundtrip!(
        test_uint64_roundtrip,
        UInt64Type,
        vec![
            Some(0),
            Some(9223372036854775808),
            Some(18446744073709551615),
            None,
            Some(4611686018427387904)
        ]
    );

    // Date types: negative, zero, and positive values plus a null.
    test_roundtrip!(
        test_date32_roundtrip,
        Date32Type,
        vec![Some(-365), Some(0), Some(365), None, Some(18262)]
    );

    test_roundtrip!(
        test_date64_roundtrip,
        Date64Type,
        vec![Some(-365), Some(0), Some(365), None, Some(18262)]
    );

    // ---- Edge cases ----

    /// An array made only of nulls keeps its length and null count after a round trip.
    #[test]
    fn test_all_nulls() {
        let values: Vec<Option<i32>> = vec![None, None, None];
        // Capture the length up front so the vector can be moved into the array.
        let expected_len = values.len();
        let array = PrimitiveArray::<Int32Type>::from(values);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
        let decoded = liquid.to_arrow_array();

        assert_eq!(decoded.len(), expected_len);
        assert_eq!(decoded.null_count(), expected_len);
    }

    /// Filtering an all-null array yields only nulls, one per selected row.
    #[test]
    fn test_all_nulls_filter() {
        let values: Vec<Option<i32>> = vec![None, None, None];
        let array = PrimitiveArray::<Int32Type>::from(values);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
        let filtered = liquid.filter(&BooleanBuffer::from(vec![true, false, true]));

        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered.null_count(), 2);
    }

    /// For this input the stored `reference_value` is 0 and the round trip is lossless.
    #[test]
    fn test_zero_reference_value() {
        let values: Vec<Option<i32>> = vec![Some(0), Some(1), Some(2), None, Some(4)];
        let expected = PrimitiveArray::<Int32Type>::from(values);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(expected.clone());
        let decoded = liquid.to_arrow_array();

        assert_eq!(liquid.reference_value, 0);
        assert_eq!(decoded.as_ref(), &expected);
    }

    /// A one-element array survives the round trip.
    #[test]
    fn test_single_value() {
        let values: Vec<Option<i32>> = vec![Some(42)];
        let expected = PrimitiveArray::<Int32Type>::from(values);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(expected.clone());
        let decoded = liquid.to_arrow_array();

        assert_eq!(decoded.as_ref(), &expected);
    }

    /// `filter` keeps exactly the rows whose mask bit is set, in order.
    #[test]
    fn test_filter_basic() {
        // Source data: four values and one null.
        let values = vec![Some(1), Some(2), Some(3), None, Some(5)];
        let array = PrimitiveArray::<Int32Type>::from(values);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);

        // Selection mask: keep indices 0, 2, and 4 (the null at index 3 is dropped).
        let mask = BooleanBuffer::from(vec![true, false, true, false, true]);

        let filtered = liquid.filter(&mask);

        let expected = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(3), Some(5)]);

        assert_eq!(filtered.as_ref(), &expected);
    }

    /// `original_arrow_data_type` reports `DataType::Int32` for an `Int32Type` array.
    #[test]
    fn test_original_arrow_data_type_returns_int32() {
        let array = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(2)]);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
        assert_eq!(liquid.original_arrow_data_type(), DataType::Int32);
    }

    /// Filtering an all-null array compares equal to an all-null Arrow array of the
    /// selected length.
    #[test]
    fn test_filter_all_nulls() {
        // Four nulls, no values.
        let values = vec![None, None, None, None];
        let array = PrimitiveArray::<Int32Type>::from(values);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);

        // Keep the first and last rows only.
        let mask = BooleanBuffer::from(vec![true, false, false, true]);

        let filtered = liquid.filter(&mask);

        let expected = PrimitiveArray::<Int32Type>::from(vec![None, None]);

        assert_eq!(filtered.as_ref(), &expected);
    }

    /// A mask that selects nothing produces an empty array.
    #[test]
    fn test_filter_empty_result() {
        let values = vec![Some(1), Some(2), Some(3)];
        let array = PrimitiveArray::<Int32Type>::from(values);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);

        // Every bit is false, so every row is filtered out.
        let mask = BooleanBuffer::from(vec![false, false, false]);

        let filtered = liquid.filter(&mask);

        assert_eq!(filtered.len(), 0);
    }

    // ---- Delta encoding ----

    /// `LiquidPrimitiveDeltaArray` reproduces an array without nulls.
    #[test]
    fn test_delta_encoding_basic_roundtrip() {
        let values = vec![Some(1), Some(3), Some(6), Some(10), Some(15)];
        let expected = PrimitiveArray::<Int32Type>::from(values);

        let delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(expected.clone());
        let decoded = delta.to_arrow_array();

        assert_eq!(decoded.as_ref(), &expected);
    }

    /// `LiquidPrimitiveDeltaArray` reproduces an array that contains nulls.
    #[test]
    fn test_delta_encoding_with_nulls() {
        let values = vec![Some(1), None, Some(4), Some(7), None, Some(12)];
        let expected = PrimitiveArray::<Int32Type>::from(values);

        let delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(expected.clone());
        let decoded = delta.to_arrow_array();

        assert_eq!(decoded.as_ref(), &expected);
    }

    /// `to_bytes` followed by `from_bytes` on a delta array reproduces the input.
    #[test]
    fn test_delta_encoding_serialization() {
        let values = vec![Some(1), Some(3), Some(6), Some(10), Some(15)];
        let expected = PrimitiveArray::<Int32Type>::from(values);

        let delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(expected.clone());
        let serialized = delta.to_bytes();
        let restored = LiquidPrimitiveDeltaArray::<Int32Type>::from_bytes(serialized.into());
        let decoded = restored.to_arrow_array();

        assert_eq!(decoded.as_ref(), &expected);
    }

    /// For 0..1000 the delta array's reported memory size must not exceed the regular
    /// array's reported memory size.
    #[test]
    fn test_memory_comparison_sequential_data() {
        // Sequential data: delta encoding performs better
        let sequential: Vec<Option<i32>> = (0..1000).map(Some).collect();
        let array = PrimitiveArray::<Int32Type>::from(sequential);

        // The regular encoding takes a clone so the delta encoding can consume the array.
        let regular = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array);

        let regular_size = regular.get_array_memory_size();
        let delta_size = delta.get_array_memory_size();

        println!(
            "Sequential data - Regular: {} bytes, Delta: {} bytes",
            regular_size, delta_size
        );
        assert!(
            delta_size <= regular_size,
            "Delta encoding should be more efficient for sequential data"
        );
    }
}