1use std::any::Any;
2use std::fmt::{Debug, Display};
3use std::sync::Arc;
4
5use arrow::array::{
6 ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, BooleanArray, PrimitiveArray,
7 types::{
8 Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type,
9 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
10 TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
11 },
12};
13use arrow::buffer::{BooleanBuffer, ScalarBuffer};
14use arrow_schema::DataType;
15use datafusion::physical_plan::PhysicalExpr;
16use fastlanes::BitPacking;
17use num_traits::{AsPrimitive, FromPrimitive};
18
19use super::LiquidDataType;
20use crate::cache::CacheExpression;
21use crate::liquid_array::hybrid_primitive_array::{
22 LiquidPrimitiveClampedArray, LiquidPrimitiveQuantizedArray,
23};
24use crate::liquid_array::ipc::{LiquidIPCHeader, PhysicalTypeMarker, get_physical_type_id};
25use crate::liquid_array::raw::BitPackedArray;
26use crate::liquid_array::{
27 LiquidArray, LiquidSqueezedArrayRef, PrimitiveKind, SqueezeIoHandler, SqueezedDate32Array,
28};
29use crate::utils::get_bit_width;
30use arrow::datatypes::ArrowNativeType;
31use bytes::Bytes;
32
/// How a [`LiquidPrimitiveArray`] reduces its bit width when it is squeezed.
///
/// Discriminants are pinned explicitly: `Clamp = 0`, `Quantize = 1`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntegerSqueezePolicy {
    /// Offsets that do not fit in the reduced width saturate to the largest representable
    /// value, which then acts as a sentinel.
    Clamp = 0,
    /// Offsets are mapped to equal-width buckets whose indices fit in the reduced width.
    /// This is the default policy.
    Quantize = 1,
}

impl Default for IntegerSqueezePolicy {
    /// Returns [`IntegerSqueezePolicy::Quantize`].
    fn default() -> Self {
        Self::Quantize
    }
}
43
/// Private module holding the sealing trait, so code outside this module tree cannot
/// name `Sealed` and therefore cannot implement `LiquidPrimitiveType`.
mod private {
    /// Marker supertrait of `LiquidPrimitiveType`; implemented by `impl_has_unsigned_type!`.
    pub trait Sealed {}
}
47
/// Arrow primitive types that can be stored in a [`LiquidPrimitiveArray`].
///
/// Each implementor names an unsigned Arrow type (`UnSignedType`) whose native values hold
/// the bit-packed offsets of the encoded array; every mapping in `impl_has_unsigned_type!`
/// pairs a type with the unsigned type of the same width. The trait is sealed through
/// `private::Sealed`, so the implementors are exactly the types listed in that macro call.
pub trait LiquidPrimitiveType:
    ArrowPrimitiveType<
        Native: AsPrimitive<<Self::UnSignedType as ArrowPrimitiveType>::Native>
                    + AsPrimitive<i64>
                    + FromPrimitive
                    + Display,
    > + Debug
    + Send
    + Sync
    + private::Sealed
    + PrimitiveKind
    + PhysicalTypeMarker
{
    /// Unsigned Arrow type used for the bit-packed representation.
    type UnSignedType: ArrowPrimitiveType<Native: AsPrimitive<Self::Native> + AsPrimitive<u64> + BitPacking>
        + Debug;
}
72
/// Implements `private::Sealed` and `LiquidPrimitiveType` for each `$signed => $unsigned`
/// pair, using `$unsigned` as the `UnSignedType`. Despite the name, the left-hand type may
/// itself be unsigned (e.g. `UInt32Type => UInt32Type`).
macro_rules! impl_has_unsigned_type {
    ($($signed:ty => $unsigned:ty),*) => {
        $(
            impl private::Sealed for $signed {}
            impl LiquidPrimitiveType for $signed {
                type UnSignedType = $unsigned;
            }
        )*
    }
}
83
// Every Arrow type supported by `LiquidPrimitiveArray`, each paired with the unsigned type
// of the same width that stores its bit-packed offsets. Date and timestamp types map to the
// unsigned type matching the width of their native integer representation.
impl_has_unsigned_type! {
    Int32Type => UInt32Type,
    Int64Type => UInt64Type,
    Int16Type => UInt16Type,
    Int8Type => UInt8Type,
    UInt32Type => UInt32Type,
    UInt64Type => UInt64Type,
    UInt16Type => UInt16Type,
    UInt8Type => UInt8Type,
    Date64Type => UInt64Type,
    Date32Type => UInt32Type,
    TimestampSecondType => UInt64Type,
    TimestampMillisecondType => UInt64Type,
    TimestampMicrosecondType => UInt64Type,
    TimestampNanosecondType => UInt64Type
}
100
/// Liquid array of `UInt8` values.
pub type LiquidU8Array = LiquidPrimitiveArray<UInt8Type>;
/// Liquid array of `UInt16` values.
pub type LiquidU16Array = LiquidPrimitiveArray<UInt16Type>;
/// Liquid array of `UInt32` values.
pub type LiquidU32Array = LiquidPrimitiveArray<UInt32Type>;
/// Liquid array of `UInt64` values.
pub type LiquidU64Array = LiquidPrimitiveArray<UInt64Type>;
/// Liquid array of `Int8` values.
pub type LiquidI8Array = LiquidPrimitiveArray<Int8Type>;
/// Liquid array of `Int16` values.
pub type LiquidI16Array = LiquidPrimitiveArray<Int16Type>;
/// Liquid array of `Int32` values.
pub type LiquidI32Array = LiquidPrimitiveArray<Int32Type>;
/// Liquid array of `Int64` values.
pub type LiquidI64Array = LiquidPrimitiveArray<Int64Type>;
/// Liquid array of `Date32` values.
pub type LiquidDate32Array = LiquidPrimitiveArray<Date32Type>;
/// Liquid array of `Date64` values.
pub type LiquidDate64Array = LiquidPrimitiveArray<Date64Type>;
121
/// Integer or temporal array stored as bit-packed unsigned offsets from a reference value
/// (the array minimum when built with `from_arrow_array`).
#[derive(Debug)]
pub struct LiquidPrimitiveArray<T: LiquidPrimitiveType> {
    /// Per-row wrapping offsets `value - reference_value`, together with the validity bitmap.
    bit_packed: BitPackedArray<T::UnSignedType>,
    /// Value added back (wrapping) to every offset when decoding.
    reference_value: T::Native,
    /// Strategy used by `squeeze` for non-temporal types; not serialized by `to_bytes`.
    squeeze_policy: IntegerSqueezePolicy,
}
129
/// Integer array stored as zig-zag encoded deltas between consecutive valid values.
#[derive(Debug, Clone)]
pub struct LiquidPrimitiveDeltaArray<T: LiquidPrimitiveType> {
    /// Zig-zag encoded deltas plus the validity bitmap; the first valid row and all null
    /// rows store zero.
    bit_packed: BitPackedArray<T::UnSignedType>,
    /// First valid value of the array (zero when every row is null).
    reference_value: T::Native,
}
136
137impl<T> LiquidPrimitiveArray<T>
138where
139 T: LiquidPrimitiveType,
140{
141 pub fn get_array_memory_size(&self) -> usize {
143 self.bit_packed.get_array_memory_size()
144 + std::mem::size_of::<T::Native>()
145 + std::mem::size_of::<IntegerSqueezePolicy>()
146 }
147
148 pub fn len(&self) -> usize {
150 self.bit_packed.len()
151 }
152
153 pub fn is_empty(&self) -> bool {
155 self.len() == 0
156 }
157
158 pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> LiquidPrimitiveArray<T> {
160 let min = match arrow::compute::kernels::aggregate::min(&arrow_array) {
161 Some(v) => v,
162 None => {
163 return Self {
165 bit_packed: BitPackedArray::new_null_array(arrow_array.len()),
166 reference_value: T::Native::ZERO,
167 squeeze_policy: IntegerSqueezePolicy::default(),
168 };
169 }
170 };
171 let max = arrow::compute::kernels::aggregate::max(&arrow_array).unwrap();
172
173 let sub = max.sub_wrapping(min) as <T as ArrowPrimitiveType>::Native;
178 let sub: <<T as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native =
179 sub.as_();
180 let bit_width = get_bit_width(sub.as_());
181
182 let (_data_type, values, nulls) = arrow_array.clone().into_parts();
183 let values = if min != T::Native::ZERO {
184 ScalarBuffer::from_iter(values.iter().map(|v| {
185 let k: <<T as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native =
186 v.sub_wrapping(min).as_();
187 k
188 }))
189 } else {
190 #[allow(clippy::missing_transmute_annotations)]
191 unsafe {
192 std::mem::transmute(values)
193 }
194 };
195
196 let unsigned_array =
197 PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(values, nulls);
198
199 let bit_packed_array = BitPackedArray::from_primitive(unsigned_array, bit_width);
200
201 Self {
202 bit_packed: bit_packed_array,
203 reference_value: min,
204 squeeze_policy: IntegerSqueezePolicy::default(),
205 }
206 }
207
208 pub fn squeeze_policy(&self) -> IntegerSqueezePolicy {
210 self.squeeze_policy
211 }
212
213 pub fn set_squeeze_policy(&mut self, policy: IntegerSqueezePolicy) {
215 self.squeeze_policy = policy;
216 }
217
218 pub fn with_squeeze_policy(mut self, policy: IntegerSqueezePolicy) -> Self {
220 self.squeeze_policy = policy;
221 self
222 }
223}
224
225impl<T> LiquidPrimitiveDeltaArray<T>
226where
227 T: LiquidPrimitiveType,
228{
229 pub fn get_array_memory_size(&self) -> usize {
231 self.bit_packed.get_array_memory_size() + std::mem::size_of::<T::Native>()
232 }
233
234 pub fn len(&self) -> usize {
236 self.bit_packed.len()
237 }
238
239 pub fn is_empty(&self) -> bool {
241 self.len() == 0
242 }
243
244 pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> LiquidPrimitiveDeltaArray<T> {
246 use arrow::array::Array;
247
248 let len = arrow_array.len();
249 if arrow_array.null_count() == len {
251 return Self {
252 bit_packed: BitPackedArray::new_null_array(len),
253 reference_value: T::Native::ZERO,
254 };
255 }
256
257 let (_dt, values, nulls) = arrow_array.clone().into_parts();
258 let vals: Vec<T::Native> = values.to_vec();
259
260 type UnsignedNative<TT> =
261 <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
262 let mut out: Vec<UnsignedNative<T>> = Vec::with_capacity(len);
263 let mut max_value: UnsignedNative<T> = UnsignedNative::<T>::ZERO;
264 let mut anchor: T::Native = T::Native::ZERO;
265
266 if let Some(_nb) = &nulls {
267 let nb = nulls.as_ref().unwrap();
269 let mut have_prev = false;
270 let mut prev: T::Native = T::Native::ZERO;
271
272 for (i, &cur) in vals.iter().enumerate() {
273 if !nb.is_valid(i) {
274 out.push(UnsignedNative::<T>::ZERO);
275 continue;
276 }
277 if !have_prev {
278 anchor = cur;
279 prev = cur;
280 have_prev = true;
281 out.push(UnsignedNative::<T>::ZERO);
282 continue;
283 }
284 let delta: T::Native = cur.sub_wrapping(prev);
285 let delta_i64: i64 = delta.as_();
287 let zigzag: u64 = ((delta_i64 << 1) ^ (delta_i64 >> 63)) as u64;
288 let delta_unsigned: UnsignedNative<T> =
289 UnsignedNative::<T>::usize_as(zigzag as usize);
290 if delta_unsigned > max_value {
291 max_value = delta_unsigned;
292 }
293 out.push(delta_unsigned);
294 prev = cur;
295 }
296 } else {
297 anchor = vals[0];
299 let mut prev: T::Native = anchor;
300 out.push(UnsignedNative::<T>::ZERO); for &cur in vals.iter().skip(1) {
302 let delta: T::Native = cur.sub_wrapping(prev);
303 let delta_i64: i64 = delta.as_();
305 let zigzag: u64 = ((delta_i64 << 1) ^ (delta_i64 >> 63)) as u64;
306 let delta_unsigned: UnsignedNative<T> =
307 UnsignedNative::<T>::usize_as(zigzag as usize);
308 if delta_unsigned > max_value {
309 max_value = delta_unsigned;
310 }
311 out.push(delta_unsigned);
312 prev = cur;
313 }
314 }
315
316 let bit_width = get_bit_width(max_value.as_());
317 let values = ScalarBuffer::from_iter(out);
318 let unsigned_array =
319 PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(values, nulls);
320 let bit_packed_array = BitPackedArray::from_primitive(unsigned_array, bit_width);
321
322 Self {
323 bit_packed: bit_packed_array,
324 reference_value: anchor,
325 }
326 }
327}
328
329impl<T> LiquidArray for LiquidPrimitiveArray<T>
330where
331 T: LiquidPrimitiveType + super::PrimitiveKind,
332{
333 fn get_array_memory_size(&self) -> usize {
334 self.get_array_memory_size()
335 }
336
337 fn len(&self) -> usize {
338 self.len()
339 }
340
341 fn original_arrow_data_type(&self) -> DataType {
342 T::DATA_TYPE.clone()
343 }
344
345 fn as_any(&self) -> &dyn Any {
346 self
347 }
348
349 #[inline]
350 fn to_arrow_array(&self) -> ArrayRef {
351 let unsigned_array = self.bit_packed.to_primitive();
352 let (_data_type, values, _nulls) = unsigned_array.into_parts();
353 let nulls = self.bit_packed.nulls();
354 let values = if self.reference_value != T::Native::ZERO {
355 let reference_v = self.reference_value.as_();
356 ScalarBuffer::from_iter(values.iter().map(|v| {
357 let k: <T as ArrowPrimitiveType>::Native = (*v).add_wrapping(reference_v).as_();
358 k
359 }))
360 } else {
361 #[allow(clippy::missing_transmute_annotations)]
362 unsafe {
363 std::mem::transmute(values)
364 }
365 };
366
367 Arc::new(PrimitiveArray::<T>::new(values, nulls.cloned()))
368 }
369
370 fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
371 let arrow_array = self.to_arrow_array();
372 let selection = BooleanArray::new(selection.clone(), None);
373 arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
374 }
375
376 fn try_eval_predicate(
377 &self,
378 _predicate: &Arc<dyn PhysicalExpr>,
379 _filter: &BooleanBuffer,
380 ) -> Option<BooleanArray> {
381 None
383 }
384
385 fn to_bytes(&self) -> Vec<u8> {
386 self.to_bytes_inner()
387 }
388
389 fn data_type(&self) -> LiquidDataType {
390 LiquidDataType::Integer
391 }
392
393 fn squeeze(
394 &self,
395 io: Arc<dyn SqueezeIoHandler>,
396 expression_hint: Option<&CacheExpression>,
397 ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
398 let expression_hint = expression_hint?;
399 let full_bytes = Bytes::from(self.to_bytes_inner());
401 let disk_range = 0u64..(full_bytes.len() as u64);
402
403 if T::DATA_TYPE == DataType::Date32 {
404 let field = expression_hint.as_date32_field()?;
406 let squeezed =
407 SqueezedDate32Array::from_liquid_date32(self, field).with_backing(io, disk_range);
408 return Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, full_bytes));
409 }
410 if matches!(T::DATA_TYPE, DataType::Timestamp(_, _)) {
411 let field = expression_hint.as_date32_field()?;
412 let squeezed = SqueezedDate32Array::from_liquid_timestamp(self, field)
413 .with_backing(io, disk_range);
414 return Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, full_bytes));
415 }
416
417 let orig_bw = self.bit_packed.bit_width()?;
419 if orig_bw.get() < 8 {
420 return None;
421 }
422
423 let new_bw_u8 = std::num::NonZero::new((orig_bw.get() / 2).max(1)).unwrap();
425
426 let unsigned_array = self.bit_packed.to_primitive();
428 let (_dt, values, nulls) = unsigned_array.into_parts();
429
430 match self.squeeze_policy {
431 IntegerSqueezePolicy::Clamp => {
432 type U<TT> =
434 <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
435 let sentinel: U<T> = U::<T>::usize_as((1usize << new_bw_u8.get()) - 1);
436
437 let squeezed_values: ScalarBuffer<U<T>> = ScalarBuffer::from_iter(
439 values
440 .iter()
441 .map(|&v| if v >= sentinel { sentinel } else { v }),
442 );
443 let squeezed_unsigned =
444 PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
445 squeezed_values,
446 nulls,
447 );
448 let squeezed_bitpacked =
449 BitPackedArray::from_primitive(squeezed_unsigned, new_bw_u8);
450
451 let hybrid = LiquidPrimitiveClampedArray::<T> {
452 squeezed: squeezed_bitpacked,
453 reference_value: self.reference_value,
454 disk_range,
455 io: io.clone(),
456 };
457 Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
458 }
459 IntegerSqueezePolicy::Quantize => {
460 type U<TT> =
463 <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
464 let max_offset: U<T> = if let Some(m) = values.iter().copied().max() {
465 m
466 } else {
467 U::<T>::ZERO
468 };
469
470 let bucket_count_u64 = 1u64 << (new_bw_u8.get() as u64);
472 let max_off_u64: u64 = num_traits::AsPrimitive::<u64>::as_(max_offset);
473 let range_size = max_off_u64.saturating_add(1);
474 let bucket_width_u64 = (range_size.div_ceil(bucket_count_u64)).max(1);
475
476 let quantized_values: ScalarBuffer<U<T>> =
477 ScalarBuffer::from_iter(values.iter().map(|&v| {
478 let v_u64: u64 = num_traits::AsPrimitive::<u64>::as_(v);
480 let mut idx_u64 = v_u64 / bucket_width_u64;
481 if idx_u64 >= bucket_count_u64 {
482 idx_u64 = bucket_count_u64 - 1;
483 }
484 U::<T>::usize_as(idx_u64 as usize)
485 }));
486 let quantized_unsigned =
487 PrimitiveArray::<<T as LiquidPrimitiveType>::UnSignedType>::new(
488 quantized_values,
489 nulls,
490 );
491 let quantized_bitpacked =
492 BitPackedArray::from_primitive(quantized_unsigned, new_bw_u8);
493
494 let hybrid = LiquidPrimitiveQuantizedArray::<T> {
495 quantized: quantized_bitpacked,
496 reference_value: self.reference_value,
497 bucket_width: bucket_width_u64,
498 disk_range,
499 io,
500 };
501 Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
502 }
503 }
504 }
505}
506
507impl<T> LiquidArray for LiquidPrimitiveDeltaArray<T>
508where
509 T: LiquidPrimitiveType + super::PrimitiveKind,
510{
511 fn get_array_memory_size(&self) -> usize {
512 self.get_array_memory_size()
513 }
514
515 fn len(&self) -> usize {
516 self.len()
517 }
518
519 fn original_arrow_data_type(&self) -> DataType {
520 T::DATA_TYPE.clone()
521 }
522
523 fn as_any(&self) -> &dyn Any {
524 self
525 }
526
527 #[inline]
528 fn to_arrow_array(&self) -> ArrayRef {
529 let unsigned_array = self.bit_packed.to_primitive();
531 let (_data_type, delta_values, _nulls) = unsigned_array.into_parts();
532 let nulls = self.bit_packed.nulls();
533
534 let mut reconstructed = Vec::with_capacity(delta_values.len());
536 let mut current_value = self.reference_value; if let Some(nulls) = nulls {
539 let mut have_prev = false;
540 for (i, &delta_unsigned) in delta_values.iter().enumerate() {
541 if !nulls.is_valid(i) {
542 reconstructed.push(T::Native::ZERO); continue;
544 }
545 if !have_prev {
546 reconstructed.push(current_value);
548 have_prev = true;
549 } else {
550 let zigzag: u64 = delta_unsigned.as_();
552 let delta_i64 = (zigzag >> 1) as i64 ^ -((zigzag & 1) as i64);
553 let delta: T::Native = T::Native::from_i64(delta_i64).unwrap();
554 current_value = current_value.add_wrapping(delta);
555 reconstructed.push(current_value);
556 }
557 }
558 } else {
559 reconstructed.push(current_value); for &delta_unsigned in delta_values.iter().skip(1) {
562 let zigzag: u64 = delta_unsigned.as_();
563 let delta_i64 = (zigzag >> 1) as i64 ^ -((zigzag & 1) as i64);
564 let delta: T::Native = T::Native::from_i64(delta_i64).unwrap();
565 current_value = current_value.add_wrapping(delta);
566 reconstructed.push(current_value);
567 }
568 }
569
570 let values = ScalarBuffer::from_iter(reconstructed);
571 Arc::new(PrimitiveArray::<T>::new(values, nulls.cloned()))
572 }
573
574 fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
575 let arrow_array = self.to_arrow_array();
576 let selection = BooleanArray::new(selection.clone(), None);
577 arrow::compute::kernels::filter::filter(&arrow_array, &selection).unwrap()
578 }
579
580 fn try_eval_predicate(
581 &self,
582 _predicate: &Arc<dyn PhysicalExpr>,
583 _filter: &BooleanBuffer,
584 ) -> Option<BooleanArray> {
585 None
587 }
588
589 fn to_bytes(&self) -> Vec<u8> {
590 self.to_bytes_inner()
591 }
592
593 fn data_type(&self) -> LiquidDataType {
594 LiquidDataType::Integer
595 }
596
597 fn squeeze(
598 &self,
599 _io: Arc<dyn SqueezeIoHandler>,
600 _expression_hint: Option<&CacheExpression>,
601 ) -> Option<(crate::liquid_array::LiquidSqueezedArrayRef, bytes::Bytes)> {
602 None
604 }
605}
606
607impl<T> LiquidPrimitiveArray<T>
608where
609 T: LiquidPrimitiveType,
610{
611 fn bit_pack_starting_loc() -> usize {
612 let header_size = LiquidIPCHeader::size() + std::mem::size_of::<T::Native>();
613 (header_size + 7) & !7
614 }
615
616 pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
635 let physical_type_id = get_physical_type_id::<T>();
637 let logical_type_id = super::LiquidDataType::Integer as u16;
638 let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
639
640 let bit_pack_starting_loc = Self::bit_pack_starting_loc();
641 let mut result = Vec::with_capacity(bit_pack_starting_loc + 256); result.extend_from_slice(&header.to_bytes());
645
646 let ref_value_bytes = unsafe {
648 std::slice::from_raw_parts(
649 &self.reference_value as *const T::Native as *const u8,
650 std::mem::size_of::<T::Native>(),
651 )
652 };
653 result.extend_from_slice(ref_value_bytes);
654 while result.len() < bit_pack_starting_loc {
655 result.push(0);
656 }
657
658 self.bit_packed.to_bytes(&mut result);
660
661 result
662 }
663
664 pub fn from_bytes(bytes: Bytes) -> Self {
666 let header = LiquidIPCHeader::from_bytes(&bytes);
667
668 let physical_id = header.physical_type_id;
669 assert_eq!(physical_id, get_physical_type_id::<T>());
670 let logical_id = header.logical_type_id;
671 assert_eq!(logical_id, super::LiquidDataType::Integer as u16);
672
673 let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
675 let reference_value =
676 unsafe { (ref_value_ptr as *const u8 as *const T::Native).read_unaligned() };
677
678 let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
680 let bit_packed = BitPackedArray::<T::UnSignedType>::from_bytes(bit_packed_data);
681
682 Self {
683 bit_packed,
684 reference_value,
685 squeeze_policy: IntegerSqueezePolicy::default(),
686 }
687 }
688}
689
690impl<T> LiquidPrimitiveDeltaArray<T>
691where
692 T: LiquidPrimitiveType,
693{
694 fn bit_pack_starting_loc() -> usize {
695 let header_size = LiquidIPCHeader::size() + std::mem::size_of::<T::Native>();
696 (header_size + 7) & !7
697 }
698
699 pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
700 let physical_type_id = get_physical_type_id::<T>();
702 let logical_type_id = 1u16; let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
704
705 let bit_pack_starting_loc = Self::bit_pack_starting_loc();
706 let mut result = Vec::with_capacity(bit_pack_starting_loc + 256);
707
708 result.extend_from_slice(&header.to_bytes());
710
711 let ref_value_bytes = unsafe {
713 std::slice::from_raw_parts(
714 &self.reference_value as *const T::Native as *const u8,
715 std::mem::size_of::<T::Native>(),
716 )
717 };
718 result.extend_from_slice(ref_value_bytes);
719 while result.len() < bit_pack_starting_loc {
720 result.push(0);
721 }
722
723 self.bit_packed.to_bytes(&mut result);
725
726 result
727 }
728
729 pub fn from_bytes(bytes: Bytes) -> Self {
731 let header = LiquidIPCHeader::from_bytes(&bytes);
732
733 let physical_id = header.physical_type_id;
734 assert_eq!(physical_id, get_physical_type_id::<T>());
735 let logical_id = header.logical_type_id;
736 assert_eq!(logical_id, 1u16); let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
740 let reference_value =
741 unsafe { (ref_value_ptr as *const u8 as *const T::Native).read_unaligned() };
742
743 let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
745 let bit_packed = BitPackedArray::<T::UnSignedType>::from_bytes(bit_packed_data);
746
747 Self {
748 bit_packed,
749 reference_value,
750 }
751 }
752}
753
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Array;

    /// Generates a `#[test]` that round-trips `$values` through `LiquidPrimitiveArray<$type>`
    /// both in memory (`to_arrow_array`) and through `to_bytes` / `from_bytes`.
    macro_rules! test_roundtrip {
        ($test_name:ident, $type:ty, $values:expr) => {
            #[test]
            fn $test_name() {
                let original: Vec<Option<<$type as ArrowPrimitiveType>::Native>> = $values;
                let array = PrimitiveArray::<$type>::from(original.clone());

                let liquid_array = LiquidPrimitiveArray::<$type>::from_arrow_array(array.clone());
                let result_array = liquid_array.to_arrow_array();
                let bytes_array =
                    LiquidPrimitiveArray::<$type>::from_bytes(liquid_array.to_bytes().into());

                assert_eq!(result_array.as_ref(), &array);
                assert_eq!(bytes_array.to_arrow_array().as_ref(), &array);
            }
        };
    }

    // Int8: small values with a null, then the full value range.
    test_roundtrip!(
        test_int8_roundtrip_basic,
        Int8Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int8_roundtrip_negative,
        Int8Type,
        vec![Some(-128), Some(-64), Some(0), Some(63), Some(127)]
    );

    // Int16: small values with a null, then the full value range.
    test_roundtrip!(
        test_int16_roundtrip_basic,
        Int16Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int16_roundtrip_negative,
        Int16Type,
        vec![
            Some(-32768),
            Some(-16384),
            Some(0),
            Some(16383),
            Some(32767)
        ]
    );

    // Int32: small values with a null, then the full value range.
    test_roundtrip!(
        test_int32_roundtrip_basic,
        Int32Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int32_roundtrip_negative,
        Int32Type,
        vec![
            Some(-2147483648),
            Some(-1073741824),
            Some(0),
            Some(1073741823),
            Some(2147483647)
        ]
    );

    // Int64: small values with a null, then the full value range (max - min wraps).
    test_roundtrip!(
        test_int64_roundtrip_basic,
        Int64Type,
        vec![Some(1), Some(2), Some(3), None, Some(5)]
    );
    test_roundtrip!(
        test_int64_roundtrip_negative,
        Int64Type,
        vec![
            Some(-9223372036854775808),
            Some(-4611686018427387904),
            Some(0),
            Some(4611686018427387903),
            Some(9223372036854775807)
        ]
    );

    // Unsigned types: zero minimum (zero reference value) up to each type's maximum.
    test_roundtrip!(
        test_uint8_roundtrip,
        UInt8Type,
        vec![Some(0), Some(128), Some(255), None, Some(64)]
    );
    test_roundtrip!(
        test_uint16_roundtrip,
        UInt16Type,
        vec![Some(0), Some(32768), Some(65535), None, Some(16384)]
    );
    test_roundtrip!(
        test_uint32_roundtrip,
        UInt32Type,
        vec![
            Some(0),
            Some(2147483648),
            Some(4294967295),
            None,
            Some(1073741824)
        ]
    );
    test_roundtrip!(
        test_uint64_roundtrip,
        UInt64Type,
        vec![
            Some(0),
            Some(9223372036854775808),
            Some(18446744073709551615),
            None,
            Some(4611686018427387904)
        ]
    );

    // Date types, including values before the epoch (negative).
    test_roundtrip!(
        test_date32_roundtrip,
        Date32Type,
        vec![Some(-365), Some(0), Some(365), None, Some(18262)]
    );

    test_roundtrip!(
        test_date64_roundtrip,
        Date64Type,
        vec![Some(-365), Some(0), Some(365), None, Some(18262)]
    );

    /// An all-null input decodes to an all-null array of the same length.
    #[test]
    fn test_all_nulls() {
        let original: Vec<Option<i32>> = vec![None, None, None];
        let array = PrimitiveArray::<Int32Type>::from(original.clone());
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
        let result_array = liquid_array.to_arrow_array();

        assert_eq!(result_array.len(), original.len());
        assert_eq!(result_array.null_count(), original.len());
    }

    /// Filtering an all-null array keeps only nulls.
    #[test]
    fn test_all_nulls_filter() {
        let original: Vec<Option<i32>> = vec![None, None, None];
        let array = PrimitiveArray::<Int32Type>::from(original.clone());
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
        let result_array = liquid_array.filter(&BooleanBuffer::from(vec![true, false, true]));

        assert_eq!(result_array.len(), 2);
        assert_eq!(result_array.null_count(), 2);
    }

    /// A minimum of zero gives a zero reference value (the buffer-reuse path).
    #[test]
    fn test_zero_reference_value() {
        let original: Vec<Option<i32>> = vec![Some(0), Some(1), Some(2), None, Some(4)];
        let array = PrimitiveArray::<Int32Type>::from(original.clone());
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let result_array = liquid_array.to_arrow_array();

        assert_eq!(liquid_array.reference_value, 0);
        assert_eq!(result_array.as_ref(), &array);
    }

    /// A single value (min == max) round-trips.
    #[test]
    fn test_single_value() {
        let original: Vec<Option<i32>> = vec![Some(42)];
        let array = PrimitiveArray::<Int32Type>::from(original.clone());
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let result_array = liquid_array.to_arrow_array();

        assert_eq!(result_array.as_ref(), &array);
    }

    /// Filter keeps the selected rows in order.
    #[test]
    fn test_filter_basic() {
        let original = vec![Some(1), Some(2), Some(3), None, Some(5)];
        let array = PrimitiveArray::<Int32Type>::from(original);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);

        let selection = BooleanBuffer::from(vec![true, false, true, false, true]);

        let result_array = liquid_array.filter(&selection);

        let expected = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(3), Some(5)]);

        assert_eq!(result_array.as_ref(), &expected);
    }

    /// The reported original data type matches the Arrow type parameter.
    #[test]
    fn test_original_arrow_data_type_returns_int32() {
        let array = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(2)]);
        let liquid = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
        assert_eq!(liquid.original_arrow_data_type(), DataType::Int32);
    }

    /// Filtering an all-null array yields an all-null array of the selected length.
    #[test]
    fn test_filter_all_nulls() {
        let original = vec![None, None, None, None];
        let array = PrimitiveArray::<Int32Type>::from(original);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);

        let selection = BooleanBuffer::from(vec![true, false, false, true]);

        let result_array = liquid_array.filter(&selection);

        let expected = PrimitiveArray::<Int32Type>::from(vec![None, None]);

        assert_eq!(result_array.as_ref(), &expected);
    }

    /// An all-false selection yields an empty array.
    #[test]
    fn test_filter_empty_result() {
        let original = vec![Some(1), Some(2), Some(3)];
        let array = PrimitiveArray::<Int32Type>::from(original);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);

        let selection = BooleanBuffer::from(vec![false, false, false]);

        let result_array = liquid_array.filter(&selection);

        assert_eq!(result_array.len(), 0);
    }

    /// Delta encoding round-trips an increasing sequence.
    #[test]
    fn test_delta_encoding_basic_roundtrip() {
        let original = vec![Some(1), Some(3), Some(6), Some(10), Some(15)];
        let array = PrimitiveArray::<Int32Type>::from(original.clone());

        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array.clone());
        let result_array = liquid_delta.to_arrow_array();

        assert_eq!(result_array.as_ref(), &array);
    }

    /// Delta encoding skips nulls: deltas are taken between consecutive valid values.
    #[test]
    fn test_delta_encoding_with_nulls() {
        let original = vec![Some(1), None, Some(4), Some(7), None, Some(12)];
        let array = PrimitiveArray::<Int32Type>::from(original.clone());

        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array.clone());
        let result_array = liquid_delta.to_arrow_array();

        assert_eq!(result_array.as_ref(), &array);
    }

    /// Delta arrays round-trip through `to_bytes` / `from_bytes`.
    #[test]
    fn test_delta_encoding_serialization() {
        let original = vec![Some(1), Some(3), Some(6), Some(10), Some(15)];
        let array = PrimitiveArray::<Int32Type>::from(original.clone());

        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array.clone());
        let bytes = liquid_delta.to_bytes();
        let reconstructed = LiquidPrimitiveDeltaArray::<Int32Type>::from_bytes(bytes.into());
        let result_array = reconstructed.to_arrow_array();

        assert_eq!(result_array.as_ref(), &array);
    }

    /// On 0..1000 the constant delta of 1 needs fewer bits than the 0..999 offset range.
    #[test]
    fn test_memory_comparison_sequential_data() {
        let sequential_data: Vec<Option<i32>> = (0..1000).map(Some).collect();
        let array = PrimitiveArray::<Int32Type>::from(sequential_data);

        let liquid_regular = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let liquid_delta = LiquidPrimitiveDeltaArray::<Int32Type>::from_arrow_array(array);

        let regular_size = liquid_regular.get_array_memory_size();
        let delta_size = liquid_delta.get_array_memory_size();

        println!(
            "Sequential data - Regular: {} bytes, Delta: {} bytes",
            regular_size, delta_size
        );
        assert!(
            delta_size <= regular_size,
            "Delta encoding should be more efficient for sequential data"
        );
    }
}