1use std::{
6 any::Any,
7 fmt::Debug,
8 num::NonZero,
9 ops::{Mul, Shl, Shr},
10 sync::Arc,
11};
12
13use arrow::{
14 array::{
15 Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, AsArray, BooleanArray,
16 PrimitiveArray,
17 },
18 buffer::{BooleanBuffer, ScalarBuffer},
19 datatypes::{
20 ArrowNativeType, Float32Type, Float64Type, Int32Type, Int64Type, UInt32Type, UInt64Type,
21 },
22};
23use arrow_schema::DataType;
24use datafusion::{
25 physical_plan::{
26 PhysicalExpr,
27 expressions::{BinaryExpr, Literal},
28 },
29 scalar::ScalarValue,
30};
31use fastlanes::BitPacking;
32use num_traits::{AsPrimitive, Float, FromPrimitive};
33
34use super::LiquidDataType;
35use crate::liquid_array::LiquidArray;
36use crate::liquid_array::ipc::{PhysicalTypeMarker, get_physical_type_id};
37use crate::liquid_array::raw::BitPackedArray;
38use crate::liquid_array::{
39 LiquidSqueezedArray, LiquidSqueezedArrayRef, NeedsBacking, Operator, SqueezeResult,
40 ipc::LiquidIPCHeader,
41};
42use crate::utils::get_bit_width;
43use crate::{cache::CacheExpression, liquid_array::SqueezeIoHandler};
44use bytes::Bytes;
45
/// Private module implementing the sealed-trait pattern: only the types
/// given impls below may ever implement [`LiquidFloatType`].
mod private {
    use arrow::{
        array::ArrowNumericType,
        datatypes::{Float32Type, Float64Type},
    };
    use num_traits::AsPrimitive;

    /// Sealing trait. The native value must be convertible (via `as`
    /// semantics) to both `f64` and `f32`.
    pub trait Sealed: ArrowNumericType<Native: AsPrimitive<f64> + AsPrimitive<f32>> {}

    impl Sealed for Float32Type {}
    impl Sealed for Float64Type {}
}
58
/// Number of elements sampled when searching for the best exponent pair on
/// large arrays (see `get_best_exponents`).
const NUM_SAMPLES: usize = 1024;

/// Policy controlling how a float array is squeezed into a smaller form.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FloatSqueezePolicy {
    /// Quantize encoded values into coarser buckets by dropping low-order
    /// bits, halving the packed bit width.
    #[default]
    Quantize = 0,
}
67
/// Float types that can be stored in a [`LiquidFloatArray`].
///
/// Implemented only for [`Float32Type`] and [`Float64Type`] (sealed via
/// [`private::Sealed`]). Encoding is ALP-style: a float `v` is stored as the
/// rounded integer `v * 10^e * 10^-f`, with the exponent pair chosen per
/// array; values that do not round-trip are kept verbatim as patches.
pub trait LiquidFloatType:
    ArrowPrimitiveType<
        Native: AsPrimitive<<Self::UnsignedIntType as ArrowPrimitiveType>::Native>
                    + AsPrimitive<<Self::SignedIntType as ArrowPrimitiveType>::Native>
                    + FromPrimitive
                    + AsPrimitive<<Self as ArrowPrimitiveType>::Native>
                    + Mul<<Self as ArrowPrimitiveType>::Native>
                    + Float>
    + private::Sealed
    + Debug
    + PhysicalTypeMarker
{
    /// Unsigned integer type of the same width; holds rebased encoded
    /// values for bit-packing.
    type UnsignedIntType: ArrowPrimitiveType<
            Native: BitPacking
                        + AsPrimitive<<Self as ArrowPrimitiveType>::Native>
                        + AsPrimitive<<Self::SignedIntType as ArrowPrimitiveType>::Native>
                        + AsPrimitive<u64>>
        + Debug;

    /// Signed integer type of the same width; holds encoded values before
    /// they are rebased against the array minimum.
    type SignedIntType: ArrowPrimitiveType<
            Native: AsPrimitive<<Self as ArrowPrimitiveType>::Native>
                        + AsPrimitive<<Self::UnsignedIntType as ArrowPrimitiveType>::Native>
                        + Ord
                        + Shr<u8, Output = <Self::SignedIntType as ArrowPrimitiveType>::Native>
                        + Shl<u8, Output = <Self::SignedIntType as ArrowPrimitiveType>::Native>
                        + From<i32>>
        + Debug + Sync + Send;

    /// Rounding constant: `2^FRACTIONAL_BITS + 2^(FRACTIONAL_BITS-1)`
    /// (i.e. 1.5 * 2^mantissa). Adding then subtracting it rounds a float
    /// to the nearest integer without a branch.
    const SWEET: <Self as ArrowPrimitiveType>::Native;
    /// Exclusive upper bound on the exponent `e` tried during sampling.
    const MAX_EXPONENT: u8;
    /// Number of explicit mantissa bits in the float format.
    const FRACTIONAL_BITS: u8;
    /// Lookup table of powers of ten: `F10[i] == 10^i`.
    const F10: &'static [<Self as ArrowPrimitiveType>::Native];
    /// Lookup table of inverse powers of ten: `IF10[i] == 10^-i`.
    const IF10: &'static [<Self as ArrowPrimitiveType>::Native];

    /// Round `val` to the nearest integer via the sweet-spot trick and
    /// cast to the signed integer type.
    #[inline]
    fn fast_round(val: <Self as ArrowPrimitiveType>::Native) -> <Self::SignedIntType as ArrowPrimitiveType>::Native {
        ((val + Self::SWEET) - Self::SWEET).as_()
    }

    /// Encode one float as `round(val * 10^e * 10^-f)`. "Unchecked": the
    /// caller is responsible for detecting values that do not round-trip.
    #[inline]
    fn encode_single_unchecked(val: &<Self as ArrowPrimitiveType>::Native, exp: &Exponents) -> <Self::SignedIntType as ArrowPrimitiveType>::Native {
        Self::fast_round(*val * Self::F10[exp.e as usize] * Self::IF10[exp.f as usize])
    }

    /// Decode one encoded integer back to a float: `val * 10^f * 10^-e`.
    #[inline]
    fn decode_single(val: &<Self::SignedIntType as ArrowPrimitiveType>::Native, exp: &Exponents) -> <Self as ArrowPrimitiveType>::Native {
        let decoded_float: <Self as ArrowPrimitiveType>::Native = (*val).as_();
        decoded_float * Self::F10[exp.f as usize] * Self::IF10[exp.e as usize]
    }
}
127
impl LiquidFloatType for Float32Type {
    type UnsignedIntType = UInt32Type;
    type SignedIntType = Int32Type;
    // f32 has 23 explicit mantissa bits.
    const FRACTIONAL_BITS: u8 = 23;
    const MAX_EXPONENT: u8 = 10;
    // 1.5 * 2^23: adding this pushes any fractional part out of the
    // mantissa, so `(v + SWEET) - SWEET` rounds to the nearest integer.
    const SWEET: <Self as ArrowPrimitiveType>::Native = (1 << Self::FRACTIONAL_BITS)
        as <Self as ArrowPrimitiveType>::Native
        + (1 << (Self::FRACTIONAL_BITS - 1)) as <Self as ArrowPrimitiveType>::Native;
    // F10[i] == 10^i for i in 0..=10.
    const F10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
        1.0,
        10.0,
        100.0,
        1000.0,
        10000.0,
        100000.0,
        1000000.0,
        10000000.0,
        100000000.0,
        1000000000.0,
        10000000000.0,
    ];
    // IF10[i] == 10^-i for i in 0..=10.
    const IF10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
        1.0,
        0.1,
        0.01,
        0.001,
        0.0001,
        0.00001,
        0.000001,
        0.0000001,
        0.00000001,
        0.000000001,
        0.0000000001,
    ];
}
163
impl LiquidFloatType for Float64Type {
    type UnsignedIntType = UInt64Type;
    type SignedIntType = Int64Type;
    // f64 has 52 explicit mantissa bits.
    const FRACTIONAL_BITS: u8 = 52;
    const MAX_EXPONENT: u8 = 18;
    // 1.5 * 2^52: adding this pushes any fractional part out of the
    // mantissa, so `(v + SWEET) - SWEET` rounds to the nearest integer.
    const SWEET: <Self as ArrowPrimitiveType>::Native = (1u64 << Self::FRACTIONAL_BITS)
        as <Self as ArrowPrimitiveType>::Native
        + (1u64 << (Self::FRACTIONAL_BITS - 1)) as <Self as ArrowPrimitiveType>::Native;
    // F10[i] == 10^i for i in 0..=23 (the table extends past MAX_EXPONENT).
    const F10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
        1.0,
        10.0,
        100.0,
        1000.0,
        10000.0,
        100000.0,
        1000000.0,
        10000000.0,
        100000000.0,
        1000000000.0,
        10000000000.0,
        100000000000.0,
        1000000000000.0,
        10000000000000.0,
        100000000000000.0,
        1000000000000000.0,
        10000000000000000.0,
        100000000000000000.0,
        1000000000000000000.0,
        10000000000000000000.0,
        100000000000000000000.0,
        1000000000000000000000.0,
        10000000000000000000000.0,
        100000000000000000000000.0,
    ];
    // IF10[i] == 10^-i for i in 0..=23.
    const IF10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
        1.0,
        0.1,
        0.01,
        0.001,
        0.0001,
        0.00001,
        0.000001,
        0.0000001,
        0.00000001,
        0.000000001,
        0.0000000001,
        0.00000000001,
        0.000000000001,
        0.0000000000001,
        0.00000000000001,
        0.000000000000001,
        0.0000000000000001,
        0.00000000000000001,
        0.000000000000000001,
        0.0000000000000000001,
        0.00000000000000000001,
        0.000000000000000000001,
        0.0000000000000000000001,
        0.00000000000000000000001,
    ];
}
226
/// A liquid array of `f32` values.
pub type LiquidFloat32Array = LiquidFloatArray<Float32Type>;
/// A liquid array of `f64` values.
pub type LiquidFloat64Array = LiquidFloatArray<Float64Type>;
231
/// A liquid array of floats, stored ALP-encoded: each value is kept as a
/// bit-packed integer `round(v * 10^e * 10^-f)` rebased against the array
/// minimum; values that do not round-trip are stored verbatim as patches.
#[derive(Debug, Clone)]
pub struct LiquidFloatArray<T: LiquidFloatType> {
    // Exponent pair (e, f) used to encode/decode this array.
    exponent: Exponents,
    // Encoded values, offset by `reference_value` and bit-packed.
    bit_packed: BitPackedArray<T::UnsignedIntType>,
    // Positions whose values failed the encode/decode round trip.
    patch_indices: Vec<u64>,
    // Original float values for each patched position.
    patch_values: Vec<T::Native>,
    // Minimum encoded value; subtracted before packing, added back on decode.
    reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native,
    // Policy applied when this array is squeezed.
    squeeze_policy: FloatSqueezePolicy,
}
242
243impl<T> LiquidFloatArray<T>
244where
245 T: LiquidFloatType,
246{
247 pub fn is_empty(&self) -> bool {
249 self.len() == 0
250 }
251
252 pub fn len(&self) -> usize {
254 self.bit_packed.len()
255 }
256
257 pub fn get_array_memory_size(&self) -> usize {
259 self.bit_packed.get_array_memory_size()
260 + size_of::<Exponents>()
261 + self.patch_indices.capacity() * size_of::<u64>()
262 + self.patch_values.capacity() * size_of::<T::Native>()
263 + size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>()
264 }
265
266 pub fn from_arrow_array(arrow_array: arrow::array::PrimitiveArray<T>) -> LiquidFloatArray<T> {
268 let best_exponents = get_best_exponents::<T>(&arrow_array);
269 encode_arrow_array(&arrow_array, &best_exponents)
270 }
271
272 pub fn squeeze_policy(&self) -> FloatSqueezePolicy {
274 self.squeeze_policy
275 }
276}
277
impl<T> LiquidArray for LiquidFloatArray<T>
where
    T: LiquidFloatType,
{
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_array_memory_size(&self) -> usize {
        self.get_array_memory_size()
    }

    fn len(&self) -> usize {
        self.len()
    }

    /// Decode back to an Arrow primitive array: unpack, re-add the
    /// reference value, apply the decode formula, then overwrite patched
    /// positions with their original values.
    #[inline]
    fn to_arrow_array(&self) -> ArrayRef {
        let unsigned_array = self.bit_packed.to_primitive();
        let (_data_type, values, _nulls) = unsigned_array.into_parts();
        let nulls = self.bit_packed.nulls();
        let mut decoded_values = Vec::from_iter(values.iter().map(|v| {
            let mut val: <T::SignedIntType as ArrowPrimitiveType>::Native = (*v).as_();
            // Undo the rebase applied before bit-packing.
            val = val.add_wrapping(self.reference_value);
            T::decode_single(&val, &self.exponent)
        }));

        // Values that did not round-trip were stored verbatim; restore them.
        if !self.patch_indices.is_empty() {
            for i in 0..self.patch_indices.len() {
                decoded_values[self.patch_indices[i].as_usize()] = self.patch_values[i];
            }
        }

        Arc::new(PrimitiveArray::<T>::new(
            ScalarBuffer::<<T as ArrowPrimitiveType>::Native>::from(decoded_values),
            nulls.cloned(),
        ))
    }

    fn original_arrow_data_type(&self) -> DataType {
        T::DATA_TYPE.clone()
    }

    fn data_type(&self) -> LiquidDataType {
        LiquidDataType::Float
    }

    fn to_bytes(&self) -> Vec<u8> {
        self.to_bytes_inner()
    }

    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    fn to_best_arrow_array(&self) -> ArrayRef {
        self.to_arrow_array()
    }

    /// Squeeze this array to roughly half its bit width by dropping the
    /// low bits of every encoded value (quantization). Returns the
    /// squeezed stand-in plus the full serialization to persist, or
    /// `None` when the current bit width (< 8) is too small to halve.
    fn squeeze(
        &self,
        io: Arc<dyn SqueezeIoHandler>,
        _expression_hint: Option<&CacheExpression>,
    ) -> Option<(super::LiquidSqueezedArrayRef, bytes::Bytes)> {
        let orig_bw = self.bit_packed.bit_width()?;
        if orig_bw.get() < 8 {
            return None;
        }

        // orig_bw >= 8, so new_bw >= 4 and NonZero::new below succeeds.
        let new_bw = orig_bw.get() / 2;

        // Persist the full serialization so predicates that cannot be
        // answered from buckets alone can hydrate the exact data later.
        let full_bytes = Bytes::from(self.to_bytes_inner());
        let disk_range = 0u64..(full_bytes.len() as u64);

        let (_dt, values, nulls) = self.bit_packed.to_primitive().into_parts();

        match self.squeeze_policy {
            FloatSqueezePolicy::Quantize => {
                // Drop the low `shift` bits of each encoded value, then
                // rebase buckets against the quantized minimum so they
                // fit in `new_bw` bits.
                let shift = orig_bw.get() - new_bw;
                let quantized_min = self.reference_value.shr(shift);
                let quantized_values: ScalarBuffer<
                    <T::UnsignedIntType as ArrowPrimitiveType>::Native,
                > = ScalarBuffer::from_iter(values.iter().map(|&v| {
                    let signed_val: <T::SignedIntType as ArrowPrimitiveType>::Native = v.as_();
                    let v_signed = self.reference_value.add_wrapping(signed_val);
                    let v_quantized: <T::SignedIntType as ArrowPrimitiveType>::Native =
                        v_signed.shr(shift);
                    v_quantized.sub_wrapping(quantized_min).as_()
                }));
                let quantized_array =
                    PrimitiveArray::<<T as LiquidFloatType>::UnsignedIntType>::new(
                        quantized_values,
                        nulls.clone(),
                    );
                let quantized_bitpacked =
                    BitPackedArray::from_primitive(quantized_array, NonZero::new(new_bw).unwrap());
                let hybrid = LiquidFloatQuantizedArray::<T> {
                    exponent: self.exponent,
                    quantized: quantized_bitpacked,
                    reference_value: self.reference_value,
                    bucket_width: shift,
                    disk_range,
                    io,
                    patch_indices: self.patch_indices.clone(),
                    patch_values: self.patch_values.clone(),
                };
                Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
            }
        }
    }
}
393
impl<T> LiquidFloatArray<T>
where
    T: LiquidFloatType,
{
    /// Serialize to the IPC byte layout:
    ///
    /// ```text
    /// | IPC header | reference value | pad to 8 | e | f | 6 pad bytes |
    /// | patch count (u64) | patch indices | patch values | pad to 8 |
    /// | bit-packed payload |
    /// ```
    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
        let physical_type_id = get_physical_type_id::<T>();
        let logical_type_id = LiquidDataType::Float as u16;
        let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);

        let mut result = Vec::with_capacity(256);
        result.extend_from_slice(&header.to_bytes());

        // SAFETY: views the reference value as raw bytes; length matches
        // the native integer type's size exactly.
        let ref_value_bytes = unsafe {
            std::slice::from_raw_parts(
                &self.reference_value as *const <T::SignedIntType as ArrowPrimitiveType>::Native
                    as *const u8,
                std::mem::size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>(),
            )
        };
        result.extend_from_slice(ref_value_bytes);

        // Pad to the next 8-byte boundary before the exponent block.
        let exponents_starting_loc = (result.len() + 7) & !7;
        while result.len() < exponents_starting_loc {
            result.push(0);
        }

        // SAFETY: `e` and `f` are plain u8 fields; one byte each.
        let exponent_e_bytes =
            unsafe { std::slice::from_raw_parts(&self.exponent.e as *const u8, 1) };
        let exponent_f_bytes =
            unsafe { std::slice::from_raw_parts(&self.exponent.f as *const u8, 1) };
        result.extend_from_slice(exponent_e_bytes);
        result.extend_from_slice(exponent_f_bytes);
        // 6 padding bytes keep the following u64 patch count 8-byte aligned.
        for _i in 0..6 {
            result.push(0);
        }

        let patch_length = self.patch_indices.len() as u64;

        // SAFETY: views a u64 as its 8 raw (native-endian) bytes.
        let patch_length_bytes = unsafe {
            std::slice::from_raw_parts(
                &patch_length as *const u64 as *const u8,
                std::mem::size_of::<u64>(),
            )
        };

        result.extend_from_slice(patch_length_bytes);

        if !self.patch_indices.is_empty() {
            // SAFETY: reinterprets the index vector's initialized storage
            // as bytes; length covers exactly `len()` elements.
            let patch_indices_bytes = unsafe {
                std::slice::from_raw_parts(
                    self.patch_indices.as_ptr() as *const u8,
                    std::mem::size_of::<u64>() * self.patch_indices.len(),
                )
            };

            result.extend_from_slice(patch_indices_bytes);

            // SAFETY: same as above for the values (patch_values has the
            // same length as patch_indices by construction).
            let patch_values_bytes = unsafe {
                std::slice::from_raw_parts(
                    self.patch_values.as_ptr() as *const u8,
                    std::mem::size_of::<T::Native>() * self.patch_indices.len(),
                )
            };
            result.extend_from_slice(patch_values_bytes);
        }
        // Re-align to 8 bytes before the bit-packed payload.
        let padding = ((result.len() + 7) & !7) - result.len();

        for _i in 0..padding {
            result.push(0);
        }

        self.bit_packed.to_bytes(&mut result);

        result
    }

    /// Deserialize from bytes produced by [`Self::to_bytes_inner`].
    ///
    /// # Panics
    /// Panics if the header's type ids do not match `T` / the Float
    /// logical type, or if `bytes` is truncated.
    pub fn from_bytes(bytes: Bytes) -> Self {
        let header = LiquidIPCHeader::from_bytes(&bytes);

        let physical_id = header.physical_type_id;
        assert_eq!(physical_id, get_physical_type_id::<T>());
        let logical_id = header.logical_type_id;
        assert_eq!(logical_id, LiquidDataType::Float as u16);

        // SAFETY: `read_unaligned` tolerates the byte offset not being
        // aligned for the integer type.
        let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
        let reference_value = unsafe {
            (ref_value_ptr as *const u8 as *const <T::SignedIntType as ArrowPrimitiveType>::Native)
                .read_unaligned()
        };

        // Skip to the 8-byte-aligned exponent block (mirrors the writer).
        let mut next = ((LiquidIPCHeader::size()
            + std::mem::size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>())
            + 7)
            & !7;

        let exponent_e = bytes[next];
        let exponent_f = bytes[next + 1];
        next += 8;

        // Reassemble the patch count as little-endian.
        // NOTE(review): the writer emits the count in native byte order;
        // this matches only on little-endian targets — confirm big-endian
        // is unsupported.
        let mut patch_length = 0u64;
        patch_length |= bytes[next] as u64;
        patch_length |= (bytes[next + 1] as u64) << 8;
        patch_length |= (bytes[next + 2] as u64) << 16;
        patch_length |= (bytes[next + 3] as u64) << 24;
        patch_length |= (bytes[next + 4] as u64) << 32;
        patch_length |= (bytes[next + 5] as u64) << 40;
        patch_length |= (bytes[next + 6] as u64) << 48;
        patch_length |= (bytes[next + 7] as u64) << 56;
        next += 8;

        let mut patch_indices = Vec::new();
        let mut patch_values = Vec::new();
        if patch_length > 0 {
            let count = patch_length as usize;
            let idx_bytes = count * std::mem::size_of::<u64>();
            let val_bytes = count * std::mem::size_of::<T::Native>();

            let indices_slice = bytes.slice(next..next + idx_bytes);
            next += idx_bytes;
            // SAFETY: the slice holds `count` u64s written by
            // `to_bytes_inner`.
            // NOTE(review): `as_ptr()` of a `Bytes` slice is not
            // guaranteed 8-byte aligned; `from_raw_parts` on an unaligned
            // pointer is UB — consider unaligned reads here.
            patch_indices = unsafe {
                let ptr = indices_slice.as_ptr() as *const u64;
                std::slice::from_raw_parts(ptr, count).to_vec()
            };

            let values_slice = bytes.slice(next..next + val_bytes);
            next += val_bytes;
            // SAFETY/NOTE(review): same alignment caveat as above.
            patch_values = unsafe {
                let ptr = values_slice.as_ptr() as *const T::Native;
                std::slice::from_raw_parts(ptr, count).to_vec()
            };
        }

        // Bit-packed payload starts at the next 8-byte boundary.
        next = (next + 7) & !7;

        let bit_packed = BitPackedArray::<T::UnsignedIntType>::from_bytes(bytes.slice(next..));

        Self {
            exponent: Exponents {
                e: exponent_e,
                f: exponent_f,
            },
            bit_packed,
            patch_indices,
            patch_values,
            reference_value,
            // The squeeze policy is not serialized; default to Quantize.
            squeeze_policy: FloatSqueezePolicy::Quantize,
        }
    }
}
603
/// Exponent pair for ALP encoding: a value `v` is stored as
/// `round(v * 10^e * 10^-f)` and decoded as `stored * 10^f * 10^-e`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Exponents {
    // Power-of-ten multiplier applied during encoding.
    pub(crate) e: u8,
    // Power-of-ten divisor applied during encoding.
    pub(crate) f: u8,
}
609
/// Encode `arrow_array` with the exponent pair `exp`.
///
/// Values that do not survive an encode/decode round trip are recorded as
/// patches (index + original value); their encoded slots are overwritten
/// with a fill value so outliers do not inflate the packed bit width. All
/// encoded values are rebased against the minimum before bit-packing.
fn encode_arrow_array<T: LiquidFloatType>(
    arrow_array: &PrimitiveArray<T>,
    exp: &Exponents,
) -> LiquidFloatArray<T> {
    let mut patch_indices: Vec<u64> = Vec::new();
    let mut patch_values: Vec<T::Native> = Vec::new();
    let mut patch_count: usize = 0;
    let mut fill_value: Option<<T::SignedIntType as ArrowPrimitiveType>::Native> = None;
    let values = arrow_array.values();
    let nulls = arrow_array.nulls();

    // All-null (or empty) input: nothing to encode.
    if arrow_array.null_count() == arrow_array.len() {
        return LiquidFloatArray::<T> {
            bit_packed: BitPackedArray::new_null_array(arrow_array.len()),
            exponent: Exponents { e: 0, f: 0 },
            patch_indices: Vec::new(),
            patch_values: Vec::new(),
            reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native::ZERO,
            squeeze_policy: FloatSqueezePolicy::Quantize,
        };
    }

    // First pass: encode everything and count round-trip failures.
    let mut encoded_values = Vec::with_capacity(arrow_array.len());
    for v in values.iter() {
        let encoded = T::encode_single_unchecked(&v.as_(), exp);
        let decoded = T::decode_single(&encoded, exp);
        // Branchless count: 1 exactly when the round trip loses the value.
        let neq = !decoded.eq(&v.as_()) as usize;
        patch_count += neq;
        encoded_values.push(encoded);
    }

    if patch_count > 0 {
        // One extra slot lets the branchless writes below stay in-bounds
        // even when the current element turns out not to be a patch.
        patch_indices.resize_with(patch_count + 1, Default::default);
        patch_values.resize_with(patch_count + 1, Default::default);
        let mut patch_index: usize = 0;

        // Second pass: record each failing position. The write always
        // happens, but the cursor advances only on a real mismatch, so a
        // non-patch write is overwritten by the next candidate.
        for i in 0..encoded_values.len() {
            let decoded = T::decode_single(&encoded_values[i], exp);
            patch_indices[patch_index] = i.as_();
            patch_values[patch_index] = arrow_array.value(i).as_();
            patch_index += !(decoded.eq(&values[i].as_())) as usize;
        }
        assert_eq!(patch_index, patch_count);
        // SAFETY: exactly `patch_count` leading elements were populated
        // with real patches; this drops the scratch slot.
        unsafe {
            patch_indices.set_len(patch_count);
            patch_values.set_len(patch_count);
        }
    }

    // Pick a fill value from the first non-patched position. Since
    // `patch_indices` is ascending, `patch_indices[i] != i` implies
    // position `i` itself is not a patch.
    if patch_count > 0 && patch_count < arrow_array.len() {
        for i in 0..encoded_values.len() {
            if i >= patch_indices.len() || patch_indices[i] != i as u64 {
                fill_value = encoded_values.get(i).copied();
                break;
            }
        }
    }

    // Overwrite patched slots so outliers do not widen the bit width.
    if let Some(fill_value) = fill_value {
        for patch_idx in &patch_indices {
            encoded_values[*patch_idx as usize] = fill_value;
        }
    }

    let min = *encoded_values
        .iter()
        .min()
        .expect("`encoded_values` shouldn't be all nulls");
    let max = *encoded_values
        .iter()
        .max()
        .expect("`encoded_values` shouldn't be all nulls");
    // The rebased range determines the packed bit width.
    let sub: <T::UnsignedIntType as ArrowPrimitiveType>::Native = max.sub_wrapping(min).as_();

    // Rebase against the minimum so everything fits in an unsigned type.
    let unsigned_encoded_values = encoded_values
        .iter()
        .map(|v| {
            let k: <T::UnsignedIntType as ArrowPrimitiveType>::Native = v.sub_wrapping(min).as_();
            k
        })
        .collect::<Vec<_>>();
    let encoded_output = PrimitiveArray::<<T as LiquidFloatType>::UnsignedIntType>::new(
        ScalarBuffer::from(unsigned_encoded_values),
        nulls.cloned(),
    );

    let bit_width = get_bit_width(sub.as_());
    let bit_packed_array = BitPackedArray::from_primitive(encoded_output, bit_width);

    LiquidFloatArray::<T> {
        bit_packed: bit_packed_array,
        exponent: *exp,
        patch_indices,
        patch_values,
        reference_value: min,
        squeeze_policy: FloatSqueezePolicy::Quantize,
    }
}
715
716fn get_best_exponents<T: LiquidFloatType>(arrow_array: &PrimitiveArray<T>) -> Exponents {
717 let mut best_exponents = Exponents { e: 0, f: 0 };
718 let mut min_encoded_size: usize = usize::MAX;
719
720 let sample_arrow_array: Option<PrimitiveArray<T>> =
721 (arrow_array.len() > NUM_SAMPLES).then(|| {
722 arrow_array
723 .iter()
724 .step_by(arrow_array.len() / NUM_SAMPLES)
725 .filter(|s| s.is_some())
726 .collect()
727 });
728
729 for e in 0..T::MAX_EXPONENT {
730 for f in 0..e {
731 let exp = Exponents { e, f };
732 let liquid_array =
733 encode_arrow_array(sample_arrow_array.as_ref().unwrap_or(arrow_array), &exp);
734 if liquid_array.get_array_memory_size() < min_encoded_size {
735 best_exponents = exp;
736 min_encoded_size = liquid_array.get_array_memory_size();
737 }
738 }
739 }
740 best_exponents
741}
742
/// In-memory "squeezed" form of a float array: coarse quantized buckets
/// that can answer some predicates directly, backed by the full
/// serialization on disk for everything else.
#[derive(Debug)]
struct LiquidFloatQuantizedArray<T: LiquidFloatType> {
    // Exponent pair used by the original encoding.
    exponent: Exponents,
    // Bucket ids: encoded values shifted right by `bucket_width` bits.
    quantized: BitPackedArray<T::UnsignedIntType>,
    // Minimum encoded value of the original array.
    reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native,
    // Number of low-order bits dropped per value during quantization.
    bucket_width: u8,
    // Byte range of the full serialization within the backing store.
    disk_range: std::ops::Range<u64>,
    // Handler used to read the full serialization back from disk.
    io: Arc<dyn SqueezeIoHandler>,
    // Patch positions/values carried over from the original array.
    patch_indices: Vec<u64>,
    patch_values: Vec<T::Native>,
}
754
impl<T> LiquidFloatQuantizedArray<T>
where
    T: LiquidFloatType,
{
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Number of elements in the quantized array.
    #[inline]
    fn len(&self) -> usize {
        self.quantized.len()
    }

    /// Rebuild a quantized array around an already-filtered primitive
    /// array, keeping every other field unchanged.
    fn new_from_filtered(
        &self,
        filtered: PrimitiveArray<<T as LiquidFloatType>::UnsignedIntType>,
    ) -> Self {
        let bit_width = self
            .quantized
            .bit_width()
            .expect("quantized bit width must exist");
        let quantized = BitPackedArray::from_primitive(filtered, bit_width);
        Self {
            exponent: self.exponent,
            quantized,
            reference_value: self.reference_value,
            bucket_width: self.bucket_width,
            io: self.io.clone(),
            // NOTE(review): patch indices are copied verbatim and still
            // refer to pre-filter positions — confirm downstream users
            // account for this when indexing the filtered data.
            patch_indices: self.patch_indices.clone(),
            patch_values: self.patch_values.clone(),
            disk_range: self.disk_range.clone(),
        }
    }

    /// Apply a row-selection mask to the quantized values.
    fn filter_inner(&self, selection: &BooleanBuffer) -> Self {
        let q_prim: PrimitiveArray<T::UnsignedIntType> = self.quantized.to_primitive();
        let selection = BooleanArray::new(selection.clone(), None);
        let filtered = arrow::compute::kernels::filter::filter(&q_prim, &selection).unwrap();
        let filtered = filtered.as_primitive::<T::UnsignedIntType>().clone();
        self.new_from_filtered(filtered)
    }

    /// Read the full (unsqueezed) serialization back from disk and decode
    /// it into an Arrow array.
    async fn hydrate_full_arrow(&self) -> ArrayRef {
        let bytes = self
            .io
            .read(Some(self.disk_range.clone()))
            .await
            .expect("read squeezed backing");
        let liquid = crate::liquid_array::ipc::read_from_bytes(
            bytes,
            &crate::liquid_array::ipc::LiquidIPCContext::new(None),
        );
        liquid.to_arrow_array()
    }

    // The handle_* helpers decide `value <op> k` from a value's bucket
    // bounds [lo, hi] alone: Some(answer) when every float in the bucket
    // yields the same answer, None when the bucket straddles `k` and the
    // exact value is needed.

    /// `v == k`: definitely false when `k` lies outside the bucket.
    #[inline]
    fn handle_eq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
        if k < lo || k > hi { Some(false) } else { None }
    }

    /// `v != k`: definitely true when `k` lies outside the bucket.
    #[inline]
    fn handle_neq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
        if k < lo || k > hi { Some(true) } else { None }
    }

    /// `v < k`: decided when `k` is at/below the bucket or above it.
    #[inline]
    fn handle_lt(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
        if k <= lo {
            Some(false)
        } else if hi < k {
            Some(true)
        } else {
            None
        }
    }

    /// `v <= k`: decided when `k` is below the bucket or at/above it.
    #[inline]
    fn handle_lteq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
        if k < lo {
            Some(false)
        } else if hi <= k {
            Some(true)
        } else {
            None
        }
    }

    /// `v > k`: decided when `k` is below the bucket or at/above it.
    #[inline]
    fn handle_gt(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
        if k < lo {
            Some(true)
        } else if hi <= k {
            Some(false)
        } else {
            None
        }
    }

    /// `v >= k`: decided when `k` is at/below the bucket or above it.
    #[inline]
    fn handle_gteq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
        if k <= lo {
            Some(true)
        } else if hi < k {
            Some(false)
        } else {
            None
        }
    }

    /// Evaluate `value <op> literal` using only the quantized buckets.
    ///
    /// Returns `Ok(None)` when the literal's type is unsupported and
    /// `Err(NeedsBacking)` when any bucket is ambiguous, in which case the
    /// caller must hydrate the exact data from disk.
    fn try_eval_predicate_inner(
        &self,
        op: &Operator,
        literal: &Literal,
    ) -> SqueezeResult<Option<BooleanArray>> {
        // Convert the literal to the array's native float type.
        let k_opt: Option<T::Native> = match literal.value() {
            ScalarValue::Int8(Some(v)) => T::Native::from_i8(*v),
            ScalarValue::Int16(Some(v)) => T::Native::from_i16(*v),
            ScalarValue::Int32(Some(v)) => T::Native::from_i32(*v),
            ScalarValue::Int64(Some(v)) => T::Native::from_i64(*v),
            ScalarValue::UInt8(Some(v)) => T::Native::from_u8(*v),
            ScalarValue::UInt16(Some(v)) => T::Native::from_u16(*v),
            ScalarValue::UInt32(Some(v)) => T::Native::from_u32(*v),
            ScalarValue::UInt64(Some(v)) => T::Native::from_u64(*v),
            ScalarValue::Date32(Some(v)) => T::Native::from_i32(*v),
            ScalarValue::Date64(Some(v)) => T::Native::from_i64(*v),
            ScalarValue::Float32(Some(v)) => T::Native::from_f32(*v),
            ScalarValue::Float64(Some(v)) => T::Native::from_f64(*v),
            _ => None,
        };
        let Some(k) = k_opt else { return Ok(None) };
        let q_prim = self.quantized.to_primitive();
        let (_dt, values, _nulls) = q_prim.into_parts();

        let mut out_vals: Vec<bool> = Vec::with_capacity(values.len());
        let mut next_patch_index = 0;
        let mut ignore_patches = false;
        if self.patch_indices.is_empty() {
            ignore_patches = true;
        }
        let comp_fn = match op {
            Operator::Eq => Self::handle_eq,
            Operator::NotEq => Self::handle_neq,
            Operator::Lt => Self::handle_lt,
            Operator::LtEq => Self::handle_lteq,
            Operator::Gt => Self::handle_gt,
            Operator::GtEq => Self::handle_gteq,
        };
        for (i, &b) in values.iter().enumerate() {
            // Null slots get a placeholder; the final null mask covers them.
            if let Some(nulls) = self.quantized.nulls()
                && !nulls.is_valid(i)
            {
                out_vals.push(false);
                continue;
            }
            // Patched positions are decided exactly afterwards; push a
            // placeholder for now.
            // NOTE(review): `i` indexes this (possibly filtered) array while
            // patch_indices were recorded against unfiltered positions —
            // verify the filtered path keeps these consistent.
            if !ignore_patches && i as u64 == self.patch_indices[next_patch_index] {
                next_patch_index += 1;
                if next_patch_index == self.patch_indices.len() {
                    ignore_patches = true;
                }
                out_vals.push(false);
                continue;
            }

            // Reconstruct the float interval [val_lower, val_higher]
            // covered by this bucket from its id and the bucket width.
            let val: <T::SignedIntType as ArrowPrimitiveType>::Native = b.as_();
            let lo = (val << self.bucket_width).add_wrapping(self.reference_value);
            let hi = ((val.add_wrapping(1i32.into())) << self.bucket_width)
                .add_wrapping(self.reference_value);
            let val_lower = T::decode_single(&lo, &self.exponent);
            let val_higher = T::decode_single(&hi, &self.exponent);

            let decided = comp_fn(val_lower, val_higher, k);
            if let Some(v) = decided {
                out_vals.push(v);
            } else {
                // Ambiguous bucket: the exact backing data is required.
                return Err(NeedsBacking);
            }
        }

        // Resolve patched positions exactly from their stored float values.
        for (idx, patch_idx) in self.patch_indices.iter().enumerate() {
            let patch_value = self.patch_values[idx];
            out_vals[*patch_idx as usize] = match op {
                Operator::Eq => patch_value == k,
                Operator::NotEq => patch_value != k,
                Operator::Lt => patch_value < k,
                Operator::LtEq => patch_value <= k,
                Operator::Gt => patch_value > k,
                Operator::GtEq => patch_value >= k,
            }
        }

        let bool_buf = arrow::buffer::BooleanBuffer::from_iter(out_vals);
        let out = BooleanArray::new(bool_buf, self.quantized.nulls().cloned());
        Ok(Some(out))
    }
}
955
956#[async_trait::async_trait]
957impl<T> LiquidSqueezedArray for LiquidFloatQuantizedArray<T>
958where
959 T: LiquidFloatType,
960{
961 fn as_any(&self) -> &dyn Any {
962 self
963 }
964
965 fn get_array_memory_size(&self) -> usize {
966 self.quantized.get_array_memory_size()
967 + size_of::<Exponents>()
968 + self.patch_indices.capacity() * size_of::<u64>()
969 + self.patch_values.capacity() * size_of::<T::Native>()
970 + size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>()
971 }
972
973 fn len(&self) -> usize {
974 LiquidFloatQuantizedArray::<T>::len(self)
975 }
976
977 async fn to_arrow_array(&self) -> ArrayRef {
978 self.hydrate_full_arrow().await
979 }
980
981 fn data_type(&self) -> LiquidDataType {
982 LiquidDataType::Float
983 }
984
985 fn original_arrow_data_type(&self) -> DataType {
986 T::DATA_TYPE.clone()
987 }
988
989 async fn try_eval_predicate(
990 &self,
991 expr: &Arc<dyn PhysicalExpr>,
992 filter: &BooleanBuffer,
993 ) -> Option<BooleanArray> {
994 let filtered = self.filter_inner(filter);
996
997 if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>()
998 && let Some(literal) = binary_expr.right().as_any().downcast_ref::<Literal>()
999 {
1000 let op = binary_expr.op();
1001 let supported_op = Operator::from_datafusion(op);
1002 if let Some(supported_op) = supported_op {
1003 match filtered.try_eval_predicate_inner(&supported_op, literal) {
1004 Ok(Some(mask)) => return Some(mask),
1005 Ok(None) => return None,
1006 Err(NeedsBacking) => {}
1007 }
1008
1009 use arrow::array::cast::AsArray;
1011 use datafusion::logical_expr::ColumnarValue;
1012 use datafusion::physical_expr_common::datum::apply_cmp;
1013
1014 let full = self.hydrate_full_arrow().await;
1015 let selection_array = BooleanArray::new(filter.clone(), None);
1016 let filtered_arr = arrow::compute::filter(&full, &selection_array).ok()?;
1017 let filtered_len = filtered_arr.len();
1018
1019 let lhs = ColumnarValue::Array(filtered_arr);
1020 let rhs = ColumnarValue::Scalar(literal.value().clone());
1021 let result = match op {
1022 datafusion::logical_expr::Operator::NotEq => {
1023 apply_cmp(datafusion::logical_expr::Operator::NotEq, &lhs, &rhs)
1024 }
1025 datafusion::logical_expr::Operator::Eq => {
1026 apply_cmp(datafusion::logical_expr::Operator::Eq, &lhs, &rhs)
1027 }
1028 datafusion::logical_expr::Operator::Lt => {
1029 apply_cmp(datafusion::logical_expr::Operator::Lt, &lhs, &rhs)
1030 }
1031 datafusion::logical_expr::Operator::LtEq => {
1032 apply_cmp(datafusion::logical_expr::Operator::LtEq, &lhs, &rhs)
1033 }
1034 datafusion::logical_expr::Operator::Gt => {
1035 apply_cmp(datafusion::logical_expr::Operator::Gt, &lhs, &rhs)
1036 }
1037 datafusion::logical_expr::Operator::GtEq => {
1038 apply_cmp(datafusion::logical_expr::Operator::GtEq, &lhs, &rhs)
1039 }
1040 _ => return None,
1041 };
1042 let result = result.ok()?;
1043 return Some(result.into_array(filtered_len).ok()?.as_boolean().clone());
1044 }
1045 }
1046 None
1047 }
1048}
1049
1050#[cfg(test)]
1051mod tests {
1052 use datafusion::logical_expr::Operator;
1053 use futures::executor::block_on;
1054 use rand::{RngExt as _, SeedableRng as _, distr::uniform::SampleUniform, rngs::StdRng};
1055
1056 use crate::cache::TestSqueezeIo;
1057
1058 use super::*;
1059
1060 macro_rules! test_roundtrip {
1061 ($test_name: ident, $type:ty, $values: expr) => {
1062 #[test]
1063 fn $test_name() {
1064 let original: Vec<Option<<$type as ArrowPrimitiveType>::Native>> = $values;
1065 let array = PrimitiveArray::<$type>::from(original.clone());
1066
1067 let liquid_array = LiquidFloatArray::<$type>::from_arrow_array(array.clone());
1069 let result_array = liquid_array.to_arrow_array();
1070 let bytes_array =
1071 LiquidFloatArray::<$type>::from_bytes(liquid_array.to_bytes().into());
1072
1073 assert_eq!(result_array.as_ref(), &array);
1074 assert_eq!(bytes_array.to_arrow_array().as_ref(), &array);
1075 }
1076 };
1077 }
1078
1079 test_roundtrip!(
1081 test_float32_roundtrip_basic,
1082 Float32Type,
1083 vec![Some(-1.0), Some(1.0), Some(0.0)]
1084 );
1085
1086 test_roundtrip!(
1087 test_float32_roundtrip_with_nones,
1088 Float32Type,
1089 vec![Some(-1.0), Some(1.0), Some(0.0), None]
1090 );
1091
1092 test_roundtrip!(
1093 test_float32_roundtrip_all_nones,
1094 Float32Type,
1095 vec![None, None, None, None]
1096 );
1097
1098 test_roundtrip!(test_float32_roundtrip_empty, Float32Type, vec![]);
1099
1100 test_roundtrip!(
1102 test_float64_roundtrip_basic,
1103 Float64Type,
1104 vec![Some(-1.0), Some(1.0), Some(0.0)]
1105 );
1106
1107 test_roundtrip!(
1108 test_float64_roundtrip_with_nones,
1109 Float64Type,
1110 vec![Some(-1.0), Some(1.0), Some(0.0), None]
1111 );
1112
1113 test_roundtrip!(
1114 test_float64_roundtrip_all_nones,
1115 Float64Type,
1116 vec![None, None, None, None]
1117 );
1118
1119 test_roundtrip!(test_float64_roundtrip_empty, Float64Type, vec![]);
1120
1121 #[test]
1123 fn test_filter_basic() {
1124 let original = vec![Some(1.0), Some(2.1), Some(3.2), None, Some(5.5)];
1126 let array = PrimitiveArray::<Float32Type>::from(original);
1127 let liquid_array = LiquidFloatArray::<Float32Type>::from_arrow_array(array);
1128
1129 let selection = BooleanBuffer::from(vec![true, false, true, false, true]);
1131
1132 let result_array = liquid_array.filter(&selection);
1134
1135 let expected = PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(3.2), Some(5.5)]);
1137
1138 assert_eq!(result_array.as_ref(), &expected);
1139 }
1140
1141 #[test]
1142 fn test_original_arrow_data_type_returns_float32() {
1143 let array = PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.5)]);
1144 let liquid = LiquidFloatArray::<Float32Type>::from_arrow_array(array);
1145 assert_eq!(liquid.original_arrow_data_type(), DataType::Float32);
1146 }
1147
1148 #[test]
1149 fn test_filter_all_nulls() {
1150 let original = vec![None, None, None, None];
1152 let array = PrimitiveArray::<Float32Type>::from(original);
1153 let liquid_array = LiquidFloatArray::<Float32Type>::from_arrow_array(array);
1154
1155 let selection = BooleanBuffer::from(vec![true, false, false, true]);
1157
1158 let result_array = liquid_array.filter(&selection);
1159
1160 let expected = PrimitiveArray::<Float32Type>::from(vec![None, None]);
1161
1162 assert_eq!(result_array.as_ref(), &expected);
1163 }
1164
1165 #[test]
1166 fn test_filter_empty_result() {
1167 let original = vec![Some(1.0), Some(2.1), Some(3.3)];
1168 let array = PrimitiveArray::<Float32Type>::from(original);
1169 let liquid_array = LiquidFloatArray::<Float32Type>::from_arrow_array(array);
1170
1171 let selection = BooleanBuffer::from(vec![false, false, false]);
1173
1174 let result_array = liquid_array.filter(&selection);
1175
1176 assert_eq!(result_array.len(), 0);
1177 }
1178
1179 #[test]
1180 fn test_compression_f32_f64() {
1181 fn run_compression_test<T: LiquidFloatType>(
1182 type_name: &str,
1183 data_fn: impl Fn(usize) -> T::Native,
1184 ) {
1185 let original: Vec<T::Native> = (0..2000).map(data_fn).collect();
1186 let array = PrimitiveArray::<T>::from_iter_values(original);
1187 let uncompressed_size = array.get_array_memory_size();
1188
1189 let liquid_array = LiquidFloatArray::<T>::from_arrow_array(array);
1190 let compressed_size = liquid_array.get_array_memory_size();
1191
1192 println!(
1193 "Type: {type_name}, uncompressed_size: {uncompressed_size}, compressed_size: {compressed_size}"
1194 );
1195 assert!(
1197 compressed_size < uncompressed_size,
1198 "{type_name} compression failed to reduce size"
1199 );
1200 }
1201
1202 run_compression_test::<Float32Type>("f32", |i| i as f32);
1204
1205 run_compression_test::<Float64Type>("f64", |i| i as f64);
1207 }
1208
1209 fn make_f_array_with_range<T>(
1211 len: usize,
1212 base_min: T::Native,
1213 range: T::Native,
1214 null_prob: f32,
1215 rng: &mut StdRng,
1216 ) -> PrimitiveArray<T>
1217 where
1218 T: LiquidFloatType,
1219 <T as arrow::array::ArrowPrimitiveType>::Native: SampleUniform,
1220 PrimitiveArray<T>: From<Vec<Option<<T as ArrowPrimitiveType>::Native>>>,
1221 {
1222 let mut vals: Vec<Option<T::Native>> = Vec::with_capacity(len);
1223 for _ in 0..len {
1224 if rng.random_bool(null_prob as f64) {
1225 vals.push(None);
1226 } else {
1227 vals.push(Some(rng.random_range(base_min..(base_min + range))));
1228 }
1229 }
1230 PrimitiveArray::<T>::from(vals)
1231 }
1232
1233 #[test]
1234 fn hybrid_squeeze_unsqueezable_small_range() {
1235 let mut rng = StdRng::seed_from_u64(0x51_71);
1236 let arr = make_f_array_with_range::<Float32Type>(64, 10_000.0, 100.0, 0.1, &mut rng);
1237 let liquid = LiquidFloatArray::<Float32Type>::from_arrow_array(arr);
1238 assert!(
1239 liquid
1240 .squeeze(Arc::new(TestSqueezeIo::default()), None)
1241 .is_none()
1242 );
1243 }
1244
    #[test]
    fn hybrid_squeeze_full_read_roundtrip_f32() {
        // End-to-end squeeze test for f32: serialize, recover, then evaluate
        // predicates that (a) resolve without IO and (b) require a full read.
        let mut rng = StdRng::seed_from_u64(0x51_72);
        let arr = make_f_array_with_range::<Float32Type>(
            2000,
            -50_000.0,
            (1 << 16) as f32,
            0.1,
            &mut rng,
        );
        let liq = LiquidFloatArray::<Float32Type>::from_arrow_array(arr.clone());
        let bytes_baseline = liq.to_bytes();
        let io = Arc::new(TestSqueezeIo::default());
        let (hybrid, bytes) = liq.squeeze(io.clone(), None).expect("squeezable");
        io.set_bytes(bytes.clone());
        // Lossless roundtrip: deserializing the squeezed bytes must reproduce
        // both the original values and the original serialized form.
        let recovered = LiquidFloatArray::<Float32Type>::from_bytes(bytes.clone());
        assert_eq!(
            recovered.to_arrow_array().as_primitive::<Float32Type>(),
            &arr
        );
        assert_eq!(bytes_baseline, recovered.to_bytes());

        let min = arrow::compute::kernels::aggregate::min(&arr).unwrap();
        let mask = BooleanBuffer::from(vec![true; arr.len()]);
        // NOTE(review): both operands of the BinaryExpr are the same literal;
        // try_eval_predicate presumably extracts only the operator and the
        // literal constant — confirm against its expression handling.
        let build_expr =
            |op: Operator, k: f32| -> Arc<dyn datafusion::physical_plan::PhysicalExpr> {
                let lit = Arc::new(Literal::new(ScalarValue::Float32(Some(k))));
                Arc::new(BinaryExpr::new(lit.clone(), op, lit))
            };

        // Comparisons against values at/below the array minimum have a single
        // constant answer for every non-null row (asserted via io.reads() == 0
        // below: no backing-store access is needed to resolve them).
        let resolvable_cases: Vec<(Operator, f32, bool)> = vec![
            (Operator::Eq, min - 1.0, false),
            (Operator::NotEq, min - 1.0, true),
            (Operator::Lt, min, false),
            (Operator::LtEq, min - 1.0, false),
            (Operator::Gt, min - 1.0, true),
            (Operator::GtEq, min, true),
        ];

        for (op, k, expected_const) in resolvable_cases {
            let expr = build_expr(op, k);
            io.reset_reads();
            let got = block_on(hybrid.try_eval_predicate(&expr, &mask)).expect("supported");
            // Expected: the constant for every non-null slot, null preserved.
            let expected = {
                let vals: Vec<Option<bool>> = (0..arr.len())
                    .map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(expected_const)
                        }
                    })
                    .collect();
                BooleanArray::from(vals)
            };
            assert_eq!(io.reads(), 0);
            assert_eq!(got, expected);
        }

        // Equality against a value actually present in the array: per-row
        // answers differ, so evaluation must read back the squeezed bytes
        // (asserted via io.reads() > 0 below).
        let k_present = (0..arr.len())
            .find_map(|i| {
                if arr.is_null(i) {
                    None
                } else {
                    Some(arr.value(i))
                }
            })
            .unwrap();
        let expr_eq_present = build_expr(Operator::Eq, k_present);
        io.reset_reads();
        let got = block_on(hybrid.try_eval_predicate(&expr_eq_present, &mask)).expect("supported");
        let expected = {
            let vals: Vec<Option<bool>> = (0..arr.len())
                .map(|i| {
                    if arr.is_null(i) {
                        None
                    } else {
                        Some(arr.value(i) == k_present)
                    }
                })
                .collect();
            BooleanArray::from(vals)
        };
        assert!(io.reads() > 0);
        assert_eq!(got, expected);
    }
1334
    #[test]
    fn hybrid_squeeze_full_read_roundtrip_f64() {
        // f64 twin of hybrid_squeeze_full_read_roundtrip_f32: serialize,
        // recover, then check metadata-resolvable vs. read-requiring predicates.
        let mut rng = StdRng::seed_from_u64(0x51_72);
        let arr = make_f_array_with_range::<Float64Type>(
            2000,
            -50_000.0f64,
            (1 << 16) as f64,
            0.1,
            &mut rng,
        );
        let liq = LiquidFloatArray::<Float64Type>::from_arrow_array(arr.clone());
        let bytes_baseline = liq.to_bytes();
        let io = Arc::new(TestSqueezeIo::default());
        let (hybrid, bytes) = liq.squeeze(io.clone(), None).expect("squeezable");
        io.set_bytes(bytes.clone());
        // Lossless roundtrip: values and serialized form both survive.
        let recovered = LiquidFloatArray::<Float64Type>::from_bytes(bytes.clone());
        assert_eq!(
            recovered.to_arrow_array().as_primitive::<Float64Type>(),
            &arr
        );
        assert_eq!(bytes_baseline, recovered.to_bytes());

        let min = arrow::compute::kernels::aggregate::min(&arr).unwrap();
        let mask = BooleanBuffer::from(vec![true; arr.len()]);
        // NOTE(review): both operands of the BinaryExpr are the same literal;
        // try_eval_predicate presumably extracts only the operator and the
        // literal constant — confirm against its expression handling.
        let build_expr =
            |op: Operator, k: f64| -> Arc<dyn datafusion::physical_plan::PhysicalExpr> {
                let lit = Arc::new(Literal::new(ScalarValue::Float64(Some(k))));
                Arc::new(BinaryExpr::new(lit.clone(), op, lit))
            };

        // Comparisons against values at/below the array minimum have a single
        // constant answer for every non-null row (asserted via io.reads() == 0
        // below: no backing-store access is needed to resolve them).
        let resolvable_cases: Vec<(Operator, f64, bool)> = vec![
            (Operator::Eq, min - 1.0, false),
            (Operator::NotEq, min - 1.0, true),
            (Operator::Lt, min, false),
            (Operator::LtEq, min - 1.0, false),
            (Operator::Gt, min - 1.0, true),
            (Operator::GtEq, min, true),
        ];

        for (op, k, expected_const) in resolvable_cases {
            let expr = build_expr(op, k);
            io.reset_reads();
            let got = block_on(hybrid.try_eval_predicate(&expr, &mask)).expect("supported");
            // Expected: the constant for every non-null slot, null preserved.
            let expected = {
                let vals: Vec<Option<bool>> = (0..arr.len())
                    .map(|i| {
                        if arr.is_null(i) {
                            None
                        } else {
                            Some(expected_const)
                        }
                    })
                    .collect();
                BooleanArray::from(vals)
            };
            assert_eq!(io.reads(), 0);
            assert_eq!(got, expected);
        }

        // Equality against a value actually present in the array: per-row
        // answers differ, so evaluation must read back the squeezed bytes
        // (asserted via io.reads() > 0 below).
        let k_present = (0..arr.len())
            .find_map(|i| {
                if arr.is_null(i) {
                    None
                } else {
                    Some(arr.value(i))
                }
            })
            .unwrap();
        let expr_eq_present = build_expr(Operator::Eq, k_present);
        io.reset_reads();
        let got = block_on(hybrid.try_eval_predicate(&expr_eq_present, &mask)).expect("supported");
        let expected = {
            let vals: Vec<Option<bool>> = (0..arr.len())
                .map(|i| {
                    if arr.is_null(i) {
                        None
                    } else {
                        Some(arr.value(i) == k_present)
                    }
                })
                .collect();
            BooleanArray::from(vals)
        };
        assert!(io.reads() > 0);
        assert_eq!(got, expected);
    }
1424}