1use std::any::Any;
2use std::fmt::{Debug, Display};
3use std::sync::Arc;
4
5use arrow::array::{
6 ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, BooleanArray, PrimitiveArray,
7 types::{
8 Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type,
9 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
10 TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
11 },
12};
13use arrow::buffer::{BooleanBuffer, ScalarBuffer};
14use arrow_schema::DataType;
15use fastlanes::BitPacking;
16use num_traits::{AsPrimitive, FromPrimitive};
17
18use super::LiquidDataType;
19use crate::cache::{CacheExpression, LiquidExpr};
20use crate::liquid_array::hybrid_primitive_array::{
21 LiquidPrimitiveClampedArray, LiquidPrimitiveQuantizedArray,
22};
23use crate::liquid_array::ipc::{LiquidIPCHeader, PhysicalTypeMarker, get_physical_type_id};
24use crate::liquid_array::raw::BitPackedArray;
25use crate::liquid_array::{
26 LiquidArray, LiquidSqueezedArrayRef, PrimitiveKind, SqueezeIoHandler, SqueezedDate32Array,
27 eval_predicate_on_array,
28};
29use crate::utils::get_bit_width;
30use arrow::datatypes::ArrowNativeType;
31use bytes::Bytes;
32
/// Strategy used when squeezing a [`LiquidPrimitiveArray`] down to half of its bit width.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IntegerSqueezePolicy {
    /// Offsets at or above the largest value representable in the narrower width
    /// (`2^new_width - 1`) are clamped to that value.
    Clamp = 0,
    /// Offsets are mapped into `2^new_width` equal-width buckets. This is the default.
    #[default]
    Quantize = 1,
}
43
/// Holds the seal for [`LiquidPrimitiveType`].
///
/// Because this module is private, code outside this file's module cannot name `Sealed`,
/// and so cannot implement `LiquidPrimitiveType` for new types.
mod private {
    /// Marker supertrait that keeps `LiquidPrimitiveType` closed to outside implementations.
    pub trait Sealed {}
}
47
48pub trait LiquidPrimitiveType:
56 ArrowPrimitiveType<
57 Native: AsPrimitive<<Self::UnSignedType as ArrowPrimitiveType>::Native>
58 + AsPrimitive<i64>
59 + FromPrimitive
60 + Display,
61 > + Debug
62 + Send
63 + Sync
64 + private::Sealed
65 + PrimitiveKind
66 + PhysicalTypeMarker
67{
68 type UnSignedType: ArrowPrimitiveType<Native: AsPrimitive<Self::Native> + AsPrimitive<u64> + BitPacking>
70 + Debug;
71}
72
/// For each `Source => Storage` pair, seals `Source` and implements [`LiquidPrimitiveType`]
/// for it with `Storage` as the unsigned bit-packing type.
macro_rules! impl_has_unsigned_type {
    ($($source:ty => $storage:ty),*) => {
        $(
            impl private::Sealed for $source {}

            impl LiquidPrimitiveType for $source {
                type UnSignedType = $storage;
            }
        )*
    };
}
83
84impl_has_unsigned_type! {
85 Int32Type => UInt32Type,
86 Int64Type => UInt64Type,
87 Int16Type => UInt16Type,
88 Int8Type => UInt8Type,
89 UInt32Type => UInt32Type,
90 UInt64Type => UInt64Type,
91 UInt16Type => UInt16Type,
92 UInt8Type => UInt8Type,
93 Date64Type => UInt64Type,
94 Date32Type => UInt32Type,
95 TimestampSecondType => UInt64Type,
96 TimestampMillisecondType => UInt64Type,
97 TimestampMicrosecondType => UInt64Type,
98 TimestampNanosecondType => UInt64Type
99}
100
101pub type LiquidU8Array = LiquidPrimitiveArray<UInt8Type>;
103pub type LiquidU16Array = LiquidPrimitiveArray<UInt16Type>;
105pub type LiquidU32Array = LiquidPrimitiveArray<UInt32Type>;
107pub type LiquidU64Array = LiquidPrimitiveArray<UInt64Type>;
109pub type LiquidI8Array = LiquidPrimitiveArray<Int8Type>;
111pub type LiquidI16Array = LiquidPrimitiveArray<Int16Type>;
113pub type LiquidI32Array = LiquidPrimitiveArray<Int32Type>;
115pub type LiquidI64Array = LiquidPrimitiveArray<Int64Type>;
117pub type LiquidDate32Array = LiquidPrimitiveArray<Date32Type>;
119pub type LiquidDate64Array = LiquidPrimitiveArray<Date64Type>;
121
/// Frame-of-reference encoded integer / temporal array.
///
/// Every slot is stored as an unsigned offset from `reference_value`, bit-packed in
/// `bit_packed`. Decoding adds the reference back using wrapping arithmetic.
#[derive(Debug)]
pub struct LiquidPrimitiveArray<T: LiquidPrimitiveType> {
    // Bit-packed offsets (`value - reference_value`) plus the validity bitmap.
    bit_packed: BitPackedArray<T::UnSignedType>,
    // Value subtracted from every slot; `from_arrow_array` uses the array minimum.
    reference_value: T::Native,
    // Strategy used by `squeeze`. Not serialized: `from_bytes` resets it to the default.
    squeeze_policy: IntegerSqueezePolicy,
}
129
/// Delta-encoded integer / temporal array.
///
/// The first valid value is kept in `reference_value`, and its own slot stores zero.
/// Every later valid slot stores the zigzag-encoded difference from the previous valid
/// value. Null slots store zero.
#[derive(Debug, Clone)]
pub struct LiquidPrimitiveDeltaArray<T: LiquidPrimitiveType> {
    // Bit-packed zigzag deltas plus the validity bitmap.
    bit_packed: BitPackedArray<T::UnSignedType>,
    // First valid value of the array (zero when every slot is null).
    reference_value: T::Native,
}
136
137impl<T> LiquidPrimitiveArray<T>
138where
139 T: LiquidPrimitiveType,
140{
141 pub fn get_array_memory_size(&self) -> usize {
143 self.bit_packed.get_array_memory_size()
144 + std::mem::size_of::<T::Native>()
145 + std::mem::size_of::<IntegerSqueezePolicy>()
146 }
147
148 pub fn len(&self) -> usize {
150 self.bit_packed.len()
151 }
152
153 pub fn is_empty(&self) -> bool {
155 self.len() == 0
156 }
157
158 pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> LiquidPrimitiveArray<T> {
160 let min = match arrow::compute::kernels::aggregate::min(&arrow_array) {
161 Some(v) => v,
162 None => {
163 return Self {
165 bit_packed: BitPackedArray::new_null_array(arrow_array.len()),
166 reference_value: T::Native::ZERO,
167 squeeze_policy: IntegerSqueezePolicy::default(),
168 };
169 }
170 };
171 let max = arrow::compute::kernels::aggregate::max(&arrow_array).unwrap();
172
173 let sub = max.sub_wrapping(min) as <T as ArrowPrimitiveType>::Native;
178 let sub: <<T as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native =
179 sub.as_();
180 let bit_width = get_bit_width(sub.as_());
181
182 let (_data_type, values, nulls) = arrow_array.clone().into_parts();
183 let values = if min != T::Native::ZERO {
184 ScalarBuffer::from_iter(values.iter().map(|v| {
185 let k: <<T as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native =
186 v.sub_wrapping(min).as_();
187 k
188 }))
189 } else {
190 #[allow(clippy::missing_transmute_annotations)]
191 unsafe {
192 std::mem::transmute(values)
193 }
194 };
195
196 let unsigned_array =
197 PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(values, nulls);
198
199 let bit_packed_array = BitPackedArray::from_primitive(unsigned_array, bit_width);
200
201 Self {
202 bit_packed: bit_packed_array,
203 reference_value: min,
204 squeeze_policy: IntegerSqueezePolicy::default(),
205 }
206 }
207
208 pub fn squeeze_policy(&self) -> IntegerSqueezePolicy {
210 self.squeeze_policy
211 }
212
213 pub fn set_squeeze_policy(&mut self, policy: IntegerSqueezePolicy) {
215 self.squeeze_policy = policy;
216 }
217
218 pub fn with_squeeze_policy(mut self, policy: IntegerSqueezePolicy) -> Self {
220 self.squeeze_policy = policy;
221 self
222 }
223}
224
225impl<T> LiquidPrimitiveDeltaArray<T>
226where
227 T: LiquidPrimitiveType,
228{
229 pub fn get_array_memory_size(&self) -> usize {
231 self.bit_packed.get_array_memory_size() + std::mem::size_of::<T::Native>()
232 }
233
234 pub fn len(&self) -> usize {
236 self.bit_packed.len()
237 }
238
239 pub fn is_empty(&self) -> bool {
241 self.len() == 0
242 }
243
244 pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> LiquidPrimitiveDeltaArray<T> {
246 use arrow::array::Array;
247
248 let len = arrow_array.len();
249 if arrow_array.null_count() == len {
251 return Self {
252 bit_packed: BitPackedArray::new_null_array(len),
253 reference_value: T::Native::ZERO,
254 };
255 }
256
257 let (_dt, values, nulls) = arrow_array.clone().into_parts();
258 let vals: Vec<T::Native> = values.to_vec();
259
260 type UnsignedNative<TT> =
261 <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
262 let mut out: Vec<UnsignedNative<T>> = Vec::with_capacity(len);
263 let mut max_value: UnsignedNative<T> = UnsignedNative::<T>::ZERO;
264 let mut anchor: T::Native = T::Native::ZERO;
265
266 if let Some(_nb) = &nulls {
267 let nb = nulls.as_ref().unwrap();
269 let mut have_prev = false;
270 let mut prev: T::Native = T::Native::ZERO;
271
272 for (i, &cur) in vals.iter().enumerate() {
273 if !nb.is_valid(i) {
274 out.push(UnsignedNative::<T>::ZERO);
275 continue;
276 }
277 if !have_prev {
278 anchor = cur;
279 prev = cur;
280 have_prev = true;
281 out.push(UnsignedNative::<T>::ZERO);
282 continue;
283 }
284 let delta: T::Native = cur.sub_wrapping(prev);
285 let delta_i64: i64 = delta.as_();
287 let zigzag: u64 = ((delta_i64 << 1) ^ (delta_i64 >> 63)) as u64;
288 let delta_unsigned: UnsignedNative<T> =
289 UnsignedNative::<T>::usize_as(zigzag as usize);
290 if delta_unsigned > max_value {
291 max_value = delta_unsigned;
292 }
293 out.push(delta_unsigned);
294 prev = cur;
295 }
296 } else {
297 anchor = vals[0];
299 let mut prev: T::Native = anchor;
300 out.push(UnsignedNative::<T>::ZERO); for &cur in vals.iter().skip(1) {
302 let delta: T::Native = cur.sub_wrapping(prev);
303 let delta_i64: i64 = delta.as_();
305 let zigzag: u64 = ((delta_i64 << 1) ^ (delta_i64 >> 63)) as u64;
306 let delta_unsigned: UnsignedNative<T> =
307 UnsignedNative::<T>::usize_as(zigzag as usize);
308 if delta_unsigned > max_value {
309 max_value = delta_unsigned;
310 }
311 out.push(delta_unsigned);
312 prev = cur;
313 }
314 }
315
316 let bit_width = get_bit_width(max_value.as_());
317 let values = ScalarBuffer::from_iter(out);
318 let unsigned_array =
319 PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(values, nulls);
320 let bit_packed_array = BitPackedArray::from_primitive(unsigned_array, bit_width);
321
322 Self {
323 bit_packed: bit_packed_array,
324 reference_value: anchor,
325 }
326 }
327}
328
329impl<T> LiquidArray for LiquidPrimitiveArray<T>
330where
331 T: LiquidPrimitiveType + super::PrimitiveKind,
332{
333 fn get_array_memory_size(&self) -> usize {
334 self.get_array_memory_size()
335 }
336
337 fn len(&self) -> usize {
338 self.len()
339 }
340
341 fn original_arrow_data_type(&self) -> DataType {
342 T::DATA_TYPE.clone()
343 }
344
345 fn as_any(&self) -> &dyn Any {
346 self
347 }
348
349 #[inline]
350 fn to_arrow_array(&self) -> ArrayRef {
351 let unsigned_array = self.bit_packed.to_primitive();
352 let (_data_type, values, _nulls) = unsigned_array.into_parts();
353 let nulls = self.bit_packed.nulls();
354 let values = if self.reference_value != T::Native::ZERO {
355 let reference_v = self.reference_value.as_();
356 ScalarBuffer::from_iter(values.iter().map(|v| {
357 let k: <T as ArrowPrimitiveType>::Native = (*v).add_wrapping(reference_v).as_();
358 k
359 }))
360 } else {
361 #[allow(clippy::missing_transmute_annotations)]
362 unsafe {
363 std::mem::transmute(values)
364 }
365 };
366
367 Arc::new(PrimitiveArray::<T>::new(values, nulls.cloned()))
368 }
369
370 fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
371 let arrow_array = self.to_arrow_array();
372 let selection = BooleanArray::new(selection.clone(), None);
373 arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
374 }
375
376 fn try_eval_predicate(&self, predicate: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
377 let filtered = self.filter(filter);
378 eval_predicate_on_array(filtered, predicate)
379 }
380
381 fn to_bytes(&self) -> Vec<u8> {
382 self.to_bytes_inner()
383 }
384
385 fn data_type(&self) -> LiquidDataType {
386 LiquidDataType::Integer
387 }
388
389 fn squeeze(
390 &self,
391 io: Arc<dyn SqueezeIoHandler>,
392 expression_hint: Option<&CacheExpression>,
393 ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
394 let expression_hint = expression_hint?;
395 let full_bytes = Bytes::from(self.to_bytes_inner());
397 let disk_range = 0u64..(full_bytes.len() as u64);
398
399 if T::DATA_TYPE == DataType::Date32 {
400 let field = expression_hint.as_date32_field()?;
402 let squeezed =
403 SqueezedDate32Array::from_liquid_date32(self, field).with_backing(io, disk_range);
404 return Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, full_bytes));
405 }
406 if matches!(T::DATA_TYPE, DataType::Timestamp(_, _)) {
407 let field = expression_hint.as_date32_field()?;
408 let squeezed = SqueezedDate32Array::from_liquid_timestamp(self, field)
409 .with_backing(io, disk_range);
410 return Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, full_bytes));
411 }
412
413 let orig_bw = self.bit_packed.bit_width()?;
415 if orig_bw.get() < 8 {
416 return None;
417 }
418
419 let new_bw_u8 = std::num::NonZero::new((orig_bw.get() / 2).max(1)).unwrap();
421
422 let unsigned_array = self.bit_packed.to_primitive();
424 let (_dt, values, nulls) = unsigned_array.into_parts();
425
426 match self.squeeze_policy {
427 IntegerSqueezePolicy::Clamp => {
428 type U<TT> =
430 <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
431 let sentinel: U<T> = U::<T>::usize_as((1usize << new_bw_u8.get()) - 1);
432
433 let squeezed_values: ScalarBuffer<U<T>> = ScalarBuffer::from_iter(
435 values
436 .iter()
437 .map(|&v| if v >= sentinel { sentinel } else { v }),
438 );
439 let squeezed_unsigned =
440 PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
441 squeezed_values,
442 nulls,
443 );
444 let squeezed_bitpacked =
445 BitPackedArray::from_primitive(squeezed_unsigned, new_bw_u8);
446
447 let hybrid = LiquidPrimitiveClampedArray::<T> {
448 squeezed: squeezed_bitpacked,
449 reference_value: self.reference_value,
450 disk_range,
451 io: io.clone(),
452 };
453 Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
454 }
455 IntegerSqueezePolicy::Quantize => {
456 type U<TT> =
459 <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
460 let max_offset: U<T> = if let Some(m) = values.iter().copied().max() {
461 m
462 } else {
463 U::<T>::ZERO
464 };
465
466 let bucket_count_u64 = 1u64 << (new_bw_u8.get() as u64);
468 let max_off_u64: u64 = num_traits::AsPrimitive::<u64>::as_(max_offset);
469 let range_size = max_off_u64.saturating_add(1);
470 let bucket_width_u64 = (range_size.div_ceil(bucket_count_u64)).max(1);
471
472 let quantized_values: ScalarBuffer<U<T>> =
473 ScalarBuffer::from_iter(values.iter().map(|&v| {
474 let v_u64: u64 = num_traits::AsPrimitive::<u64>::as_(v);
476 let mut idx_u64 = v_u64 / bucket_width_u64;
477 if idx_u64 >= bucket_count_u64 {
478 idx_u64 = bucket_count_u64 - 1;
479 }
480 U::<T>::usize_as(idx_u64 as usize)
481 }));
482 let quantized_unsigned =
483 PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
484 quantized_values,
485 nulls,
486 );
487 let quantized_bitpacked =
488 BitPackedArray::from_primitive(quantized_unsigned, new_bw_u8);
489
490 let hybrid = LiquidPrimitiveQuantizedArray::<T> {
491 quantized: quantized_bitpacked,
492 reference_value: self.reference_value,
493 bucket_width: bucket_width_u64,
494 disk_range,
495 io,
496 };
497 Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
498 }
499 }
500 }
501}
502
503impl<T> LiquidArray for LiquidPrimitiveDeltaArray<T>
504where
505 T: LiquidPrimitiveType + super::PrimitiveKind,
506{
507 fn get_array_memory_size(&self) -> usize {
508 self.get_array_memory_size()
509 }
510
511 fn len(&self) -> usize {
512 self.len()
513 }
514
515 fn original_arrow_data_type(&self) -> DataType {
516 T::DATA_TYPE.clone()
517 }
518
519 fn as_any(&self) -> &dyn Any {
520 self
521 }
522
523 #[inline]
524 fn to_arrow_array(&self) -> ArrayRef {
525 let unsigned_array = self.bit_packed.to_primitive();
527 let (_data_type, delta_values, _nulls) = unsigned_array.into_parts();
528 let nulls = self.bit_packed.nulls();
529
530 let mut reconstructed = Vec::with_capacity(delta_values.len());
532 let mut current_value = self.reference_value; if let Some(nulls) = nulls {
535 let mut have_prev = false;
536 for (i, &delta_unsigned) in delta_values.iter().enumerate() {
537 if !nulls.is_valid(i) {
538 reconstructed.push(T::Native::ZERO); continue;
540 }
541 if !have_prev {
542 reconstructed.push(current_value);
544 have_prev = true;
545 } else {
546 let zigzag: u64 = delta_unsigned.as_();
548 let delta_i64 = (zigzag >> 1) as i64 ^ -((zigzag & 1) as i64);
549 let delta: T::Native = T::Native::from_i64(delta_i64).unwrap();
550 current_value = current_value.add_wrapping(delta);
551 reconstructed.push(current_value);
552 }
553 }
554 } else {
555 reconstructed.push(current_value); for &delta_unsigned in delta_values.iter().skip(1) {
558 let zigzag: u64 = delta_unsigned.as_();
559 let delta_i64 = (zigzag >> 1) as i64 ^ -((zigzag & 1) as i64);
560 let delta: T::Native = T::Native::from_i64(delta_i64).unwrap();
561 current_value = current_value.add_wrapping(delta);
562 reconstructed.push(current_value);
563 }
564 }
565
566 let values = ScalarBuffer::from_iter(reconstructed);
567 Arc::new(PrimitiveArray::<T>::new(values, nulls.cloned()))
568 }
569
570 fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
571 let arrow_array = self.to_arrow_array();
572 let selection = BooleanArray::new(selection.clone(), None);
573 arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
574 }
575
576 fn try_eval_predicate(&self, predicate: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
577 let filtered = self.filter(filter);
578 eval_predicate_on_array(filtered, predicate)
579 }
580
581 fn to_bytes(&self) -> Vec<u8> {
582 self.to_bytes_inner()
583 }
584
585 fn data_type(&self) -> LiquidDataType {
586 LiquidDataType::Integer
587 }
588
589 fn squeeze(
590 &self,
591 _io: Arc<dyn SqueezeIoHandler>,
592 _expression_hint: Option<&CacheExpression>,
593 ) -> Option<(crate::liquid_array::LiquidSqueezedArrayRef, bytes::Bytes)> {
594 None
596 }
597}
598
599impl<T> LiquidPrimitiveArray<T>
600where
601 T: LiquidPrimitiveType,
602{
603 fn bit_pack_starting_loc() -> usize {
604 let header_size = LiquidIPCHeader::size() + std::mem::size_of::<T::Native>();
605 (header_size + 7) & !7
606 }
607
608 pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
627 let physical_type_id = get_physical_type_id::<T>();
629 let logical_type_id = super::LiquidDataType::Integer as u16;
630 let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
631
632 let bit_pack_starting_loc = Self::bit_pack_starting_loc();
633 let mut result = Vec::with_capacity(bit_pack_starting_loc + 256); result.extend_from_slice(&header.to_bytes());
637
638 let ref_value_bytes = unsafe {
640 std::slice::from_raw_parts(
641 &self.reference_value as *const T::Native as *const u8,
642 std::mem::size_of::<T::Native>(),
643 )
644 };
645 result.extend_from_slice(ref_value_bytes);
646 while result.len() < bit_pack_starting_loc {
647 result.push(0);
648 }
649
650 self.bit_packed.to_bytes(&mut result);
652
653 result
654 }
655
656 pub fn from_bytes(bytes: Bytes) -> Self {
658 let header = LiquidIPCHeader::from_bytes(&bytes);
659
660 let physical_id = header.physical_type_id;
661 assert_eq!(physical_id, get_physical_type_id::<T>());
662 let logical_id = header.logical_type_id;
663 assert_eq!(logical_id, super::LiquidDataType::Integer as u16);
664
665 let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
667 let reference_value =
668 unsafe { (ref_value_ptr as *const u8 as *const T::Native).read_unaligned() };
669
670 let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
672 let bit_packed = BitPackedArray::<T::UnSignedType>::from_bytes(bit_packed_data);
673
674 Self {
675 bit_packed,
676 reference_value,
677 squeeze_policy: IntegerSqueezePolicy::default(),
678 }
679 }
680}
681
682impl<T> LiquidPrimitiveDeltaArray<T>
683where
684 T: LiquidPrimitiveType,
685{
686 fn bit_pack_starting_loc() -> usize {
687 let header_size = LiquidIPCHeader::size() + std::mem::size_of::<T::Native>();
688 (header_size + 7) & !7
689 }
690
691 pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
692 let physical_type_id = get_physical_type_id::<T>();
694 let logical_type_id = 1u16; let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
696
697 let bit_pack_starting_loc = Self::bit_pack_starting_loc();
698 let mut result = Vec::with_capacity(bit_pack_starting_loc + 256);
699
700 result.extend_from_slice(&header.to_bytes());
702
703 let ref_value_bytes = unsafe {
705 std::slice::from_raw_parts(
706 &self.reference_value as *const T::Native as *const u8,
707 std::mem::size_of::<T::Native>(),
708 )
709 };
710 result.extend_from_slice(ref_value_bytes);
711 while result.len() < bit_pack_starting_loc {
712 result.push(0);
713 }
714
715 self.bit_packed.to_bytes(&mut result);
717
718 result
719 }
720
721 pub fn from_bytes(bytes: Bytes) -> Self {
723 let header = LiquidIPCHeader::from_bytes(&bytes);
724
725 let physical_id = header.physical_type_id;
726 assert_eq!(physical_id, get_physical_type_id::<T>());
727 let logical_id = header.logical_type_id;
728 assert_eq!(logical_id, 1u16); let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
732 let reference_value =
733 unsafe { (ref_value_ptr as *const u8 as *const T::Native).read_unaligned() };
734
735 let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
737 let bit_packed = BitPackedArray::<T::UnSignedType>::from_bytes(bit_packed_data);
738
739 Self {
740 bit_packed,
741 reference_value,
742 }
743 }
744}
745
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Array;

    /// Generates a test that round-trips `$values` through `LiquidPrimitiveArray<$type>`,
    /// both in memory and via `to_bytes` / `from_bytes`.
    macro_rules! test_roundtrip {
        ($test_name:ident, $type:ty, $values:expr) => {
            #[test]
            fn $test_name() {
                let original: Vec<Option<<$type as ArrowPrimitiveType>::Native>> = $values;
                let array = PrimitiveArray::<$type>::from(original);

                let liquid_array = LiquidPrimitiveArray::<$type>::from_arrow_array(array.clone());
                let result_array = liquid_array.to_arrow_array();
                let bytes_array =
                    LiquidPrimitiveArray::<$type>::from_bytes(liquid_array.to_bytes().into());

                assert_eq!(result_array.as_ref(), &array);
                assert_eq!(bytes_array.to_arrow_array().as_ref(), &array);
            }
        };
    }

    test_roundtrip!(
        test_int8_roundtrip_basic,
        Int8Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int8_roundtrip_negative,
        Int8Type,
        vec![Some(-128), Some(-64), Some(0), Some(63), Some(127)]
    );

    test_roundtrip!(
        test_int16_roundtrip_basic,
        Int16Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int16_roundtrip_negative,
        Int16Type,
        vec![
            Some(-32768),
            Some(-16384),
            Some(0),
            Some(16383),
            Some(32767)
        ]
    );

    test_roundtrip!(
        test_int32_roundtrip_basic,
        Int32Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int32_roundtrip_negative,
        Int32Type,
        vec![
            Some(-2147483648),
            Some(-1073741824),
            Some(0),
            Some(1073741823),
            Some(2147483647)
        ]
    );

    test_roundtrip!(
        test_int64_roundtrip_basic,
        Int64Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int64_roundtrip_negative,
        Int64Type,
        vec![
            Some(-9223372036854775808),
            Some(-4611686018427387904),
            Some(0),
            Some(4611686018427387903),
            Some(9223372036854775807)
        ]
    );

    test_roundtrip!(
        test_uint8_roundtrip,
        UInt8Type,
        vec![Some(0), Some(128), Some(255), None, Some(64)]
    );
    test_roundtrip!(
        test_uint16_roundtrip,
        UInt16Type,
        vec![Some(0), Some(32768), Some(65535), None, Some(16384)]
    );
    test_roundtrip!(
        test_uint32_roundtrip,
        UInt32Type,
        vec![
            Some(0),
            Some(2147483648),
            Some(4294967295),
            None,
            Some(1073741824)
        ]
    );
    test_roundtrip!(
        test_uint64_roundtrip,
        UInt64Type,
        vec![
            Some(0),
            Some(9223372036854775808),
            Some(18446744073709551615),
            None,
            Some(4611686018427387904)
        ]
    );

    test_roundtrip!(
        test_date32_roundtrip,
        Date32Type,
        vec![Some(-365), Some(0), Some(365), None, Some(18262)]
    );

    test_roundtrip!(
        test_date64_roundtrip,
        Date64Type,
        vec![Some(-365), Some(0), Some(365), None, Some(18262)]
    );

    #[test]
    fn test_all_nulls() {
        let original: Vec<Option<i32>> = vec![None, None, None];
        let expected_len = original.len();
        let array = PrimitiveArray::<Int32Type>::from(original);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
        let result_array = liquid_array.to_arrow_array();

        assert_eq!(result_array.len(), expected_len);
        assert_eq!(result_array.null_count(), expected_len);
    }

    #[test]
    fn test_all_nulls_filter() {
        let original: Vec<Option<i32>> = vec![None, None, None];
        let array = PrimitiveArray::<Int32Type>::from(original);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
        let result_array = liquid_array.filter(&BooleanBuffer::from(vec![true, false, true]));

        assert_eq!(result_array.len(), 2);
        assert_eq!(result_array.null_count(), 2);
    }

    #[test]
    fn test_zero_reference_value() {
        let original: Vec<Option<i32>> = vec![Some(0), Some(1), Some(2), None, Some(4)];
        let array = PrimitiveArray::<Int32Type>::from(original);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let result_array = liquid_array.to_arrow_array();

        assert_eq!(liquid_array.reference_value, 0);
        assert_eq!(result_array.as_ref(), &array);
    }

    #[test]
    fn test_single_value() {
        let original: Vec<Option<i32>> = vec![Some(42)];
        let array = PrimitiveArray::<Int32Type>::from(original);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let result_array = liquid_array.to_arrow_array();

        assert_eq!(result_array.as_ref(), &array);
    }

    #[test]
    fn test_filter_basic() {
        let original = vec![Some(1), Some(2), Some(3), None, Some(5)];
        let array = PrimitiveArray::<Int32Type>::from(original);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);

        let selection = BooleanBuffer::from(vec![true, false, true, false, true]);
        let result_array = liquid_array.filter(&selection);

        let expected = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(3), Some(5)]);
        assert_eq!(result_array.as_ref(), &expected);
    }

    #[test]
    fn test_original_arrow_data_type_returns_int32() {
        let array = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(2)]);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
        assert_eq!(liquid.original_arrow_data_type(), DataType::Int32);
    }

    #[test]
    fn test_filter_all_nulls() {
        let original = vec![None, None, None, None];
        let array = PrimitiveArray::<Int32Type>::from(original);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);

        let selection = BooleanBuffer::from(vec![true, false, false, true]);
        let result_array = liquid_array.filter(&selection);

        let expected = PrimitiveArray::<Int32Type>::from(vec![None, None]);
        assert_eq!(result_array.as_ref(), &expected);
    }

    #[test]
    fn test_filter_empty_result() {
        let original = vec![Some(1), Some(2), Some(3)];
        let array = PrimitiveArray::<Int32Type>::from(original);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);

        let selection = BooleanBuffer::from(vec![false, false, false]);
        let result_array = liquid_array.filter(&selection);

        assert_eq!(result_array.len(), 0);
    }

    #[test]
    fn test_delta_encoding_basic_roundtrip() {
        let original = vec![Some(1), Some(3), Some(6), Some(10), Some(15)];
        let array = PrimitiveArray::<Int32Type>::from(original);

        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array.clone());
        let result_array = liquid_delta.to_arrow_array();

        assert_eq!(result_array.as_ref(), &array);
    }

    #[test]
    fn test_delta_encoding_with_nulls() {
        let original = vec![Some(1), None, Some(4), Some(7), None, Some(12)];
        let array = PrimitiveArray::<Int32Type>::from(original);

        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array.clone());
        let result_array = liquid_delta.to_arrow_array();

        assert_eq!(result_array.as_ref(), &array);
    }

    #[test]
    fn test_delta_encoding_serialization() {
        let original = vec![Some(1), Some(3), Some(6), Some(10), Some(15)];
        let array = PrimitiveArray::<Int32Type>::from(original);

        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array.clone());
        let bytes = liquid_delta.to_bytes();
        let reconstructed = LiquidPrimitiveDeltaArray::<Int32Type>::from_bytes(bytes.into());
        let result_array = reconstructed.to_arrow_array();

        assert_eq!(result_array.as_ref(), &array);
    }

    /// Decreasing steps, a null in the middle, and steps that wrap past `i32::MAX` /
    /// `i32::MIN` must all survive delta encoding in memory and through serialization.
    #[test]
    fn test_delta_encoding_negative_and_wrapping_deltas() {
        let array = PrimitiveArray::<Int32Type>::from(vec![
            Some(100),
            Some(90),
            None,
            Some(-50),
            Some(i32::MAX),
            Some(i32::MIN),
        ]);

        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array.clone());
        assert_eq!(liquid_delta.to_arrow_array().as_ref(), &array);

        let reconstructed =
            LiquidPrimitiveDeltaArray::<Int32Type>::from_bytes(liquid_delta.to_bytes().into());
        assert_eq!(reconstructed.to_arrow_array().as_ref(), &array);
    }

    /// Full-range 64-bit deltas need all 64 packed bits and must still round-trip.
    #[test]
    fn test_delta_encoding_int64_extremes() {
        let array = PrimitiveArray::<Int64Type>::from(vec![
            Some(i64::MIN),
            Some(i64::MAX),
            Some(0),
            None,
            Some(-1),
        ]);

        let liquid_delta = LiquidPrimitiveDeltaArray::<Int64Type>::from_arrow_array(array.clone());
        assert_eq!(liquid_delta.to_arrow_array().as_ref(), &array);

        let reconstructed =
            LiquidPrimitiveDeltaArray::<Int64Type>::from_bytes(liquid_delta.to_bytes().into());
        assert_eq!(reconstructed.to_arrow_array().as_ref(), &array);
    }

    #[test]
    fn test_memory_comparison_sequential_data() {
        let sequential_data: Vec<Option<i32>> = (0..1000).map(Some).collect();
        let array = PrimitiveArray::<Int32Type>::from(sequential_data);

        let liquid_regular = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array);

        let regular_size = liquid_regular.get_array_memory_size();
        let delta_size = liquid_delta.get_array_memory_size();

        println!(
            "Sequential data - Regular: {} bytes, Delta: {} bytes",
            regular_size, delta_size
        );
        assert!(
            delta_size <= regular_size,
            "Delta encoding should be more efficient for sequential data"
        );
    }
}
1040}