1use std::{
6 any::Any,
7 fmt::Debug,
8 num::NonZero,
9 ops::{Mul, Shl, Shr},
10 sync::Arc,
11};
12
13use arrow::{
14 array::{
15 Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, AsArray, BooleanArray,
16 PrimitiveArray,
17 },
18 buffer::{BooleanBuffer, ScalarBuffer},
19 datatypes::{
20 ArrowNativeType, Float32Type, Float64Type, Int32Type, Int64Type, UInt32Type, UInt64Type,
21 },
22};
23use arrow_schema::DataType;
24use datafusion_common::ScalarValue;
25use datafusion_expr_common::columnar_value::ColumnarValue;
26use datafusion_expr_common::operator::Operator as DFOperator;
27use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
28use datafusion_physical_expr_common::datum::apply_cmp;
29use fastlanes::BitPacking;
30use num_traits::{AsPrimitive, Float, FromPrimitive};
31
32use super::LiquidDataType;
33use crate::cache::LiquidExpr;
34use crate::liquid_array::LiquidArray;
35use crate::liquid_array::ipc::{PhysicalTypeMarker, get_physical_type_id};
36use crate::liquid_array::raw::BitPackedArray;
37use crate::liquid_array::{
38 LiquidSqueezedArray, LiquidSqueezedArrayRef, NeedsBacking, Operator, SqueezeResult,
39 SqueezedBacking, eval_predicate_on_array, ipc::LiquidIPCHeader,
40};
41use crate::utils::get_bit_width;
42use crate::{cache::CacheExpression, liquid_array::SqueezeIoHandler};
43use bytes::Bytes;
44
45mod private {
46 use arrow::{
47 array::ArrowNumericType,
48 datatypes::{Float32Type, Float64Type},
49 };
50 use num_traits::AsPrimitive;
51
52 pub trait Sealed: ArrowNumericType<Native: AsPrimitive<f64> + AsPrimitive<f32>> {}
53
54 impl Sealed for Float32Type {}
55 impl Sealed for Float64Type {}
56}
57
/// Arrays longer than this are sampled (with an even stride) by `get_best_exponents`
/// instead of being encoded in full for every candidate exponent pair.
const NUM_SAMPLES: usize = 1 << 10;

/// Strategy used when a [`LiquidFloatArray`] is squeezed.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum FloatSqueezePolicy {
    /// Coarsen every value into a bucket by dropping low-order bits of its
    /// bit-packed representation (see `squeeze`).
    #[default]
    Quantize = 0,
}
66
67pub trait LiquidFloatType:
70 ArrowPrimitiveType<
71 Native: AsPrimitive<
72 <Self::UnsignedIntType as ArrowPrimitiveType>::Native >
74 + AsPrimitive<<Self::SignedIntType as ArrowPrimitiveType>::Native>
75 + FromPrimitive
76 + AsPrimitive<<Self as ArrowPrimitiveType>::Native>
77 + Mul<<Self as ArrowPrimitiveType>::Native>
78 + Float >
80 + private::Sealed
81 + Debug
82 + PhysicalTypeMarker
83{
84 type UnsignedIntType:
85 ArrowPrimitiveType<
86 Native: BitPacking +
87 AsPrimitive<<Self as ArrowPrimitiveType>::Native>
88 + AsPrimitive<<Self::SignedIntType as ArrowPrimitiveType>::Native>
89 + AsPrimitive<u64>
90 >
91 + Debug;
92 type SignedIntType:
93 ArrowPrimitiveType<
94 Native: AsPrimitive<<Self as ArrowPrimitiveType>::Native>
95 + AsPrimitive<<Self::UnsignedIntType as ArrowPrimitiveType>::Native>
96 + Ord
97 + Shr<u8, Output = <Self::SignedIntType as ArrowPrimitiveType>::Native>
98 + Shl<u8, Output = <Self::SignedIntType as ArrowPrimitiveType>::Native>
99 + From<i32>
100 >
101 + Debug + Sync + Send;
102
103 const SWEET: <Self as ArrowPrimitiveType>::Native;
104 const MAX_EXPONENT: u8;
105 const FRACTIONAL_BITS: u8;
106 const F10: &'static [<Self as ArrowPrimitiveType>::Native];
107 const IF10: &'static [<Self as ArrowPrimitiveType>::Native];
108
109 #[inline]
110 fn fast_round(val: <Self as ArrowPrimitiveType>::Native) -> <Self::SignedIntType as ArrowPrimitiveType>::Native {
111 ((val + Self::SWEET) - Self::SWEET).as_()
112 }
113
114 #[inline]
115 fn encode_single_unchecked(val: &<Self as ArrowPrimitiveType>::Native, exp: &Exponents) -> <Self::SignedIntType as ArrowPrimitiveType>::Native {
116 Self::fast_round(*val * Self::F10[exp.e as usize] * Self::IF10[exp.f as usize])
117 }
118
119 #[inline]
120 fn decode_single(val: &<Self::SignedIntType as ArrowPrimitiveType>::Native, exp: &Exponents) -> <Self as ArrowPrimitiveType>::Native {
121 let decoded_float: <Self as ArrowPrimitiveType>::Native = (*val).as_();
122 decoded_float * Self::F10[exp.f as usize] * Self::IF10[exp.e as usize]
123 }
124
125}
126
127impl LiquidFloatType for Float32Type {
128 type UnsignedIntType = UInt32Type;
129 type SignedIntType = Int32Type;
130 const FRACTIONAL_BITS: u8 = 23;
131 const MAX_EXPONENT: u8 = 10;
132 const SWEET: <Self as ArrowPrimitiveType>::Native = (1 << Self::FRACTIONAL_BITS)
133 as <Self as ArrowPrimitiveType>::Native
134 + (1 << (Self::FRACTIONAL_BITS - 1)) as <Self as ArrowPrimitiveType>::Native;
135 const F10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
136 1.0,
137 10.0,
138 100.0,
139 1000.0,
140 10000.0,
141 100000.0,
142 1000000.0,
143 10000000.0,
144 100000000.0,
145 1000000000.0,
146 10000000000.0, ];
148 const IF10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
149 1.0,
150 0.1,
151 0.01,
152 0.001,
153 0.0001,
154 0.00001,
155 0.000001,
156 0.0000001,
157 0.00000001,
158 0.000000001,
159 0.0000000001, ];
161}
162
163impl LiquidFloatType for Float64Type {
164 type UnsignedIntType = UInt64Type;
165 type SignedIntType = Int64Type;
166 const FRACTIONAL_BITS: u8 = 52;
167 const MAX_EXPONENT: u8 = 18;
168 const SWEET: <Self as ArrowPrimitiveType>::Native = (1u64 << Self::FRACTIONAL_BITS)
169 as <Self as ArrowPrimitiveType>::Native
170 + (1u64 << (Self::FRACTIONAL_BITS - 1)) as <Self as ArrowPrimitiveType>::Native;
171 const F10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
172 1.0,
173 10.0,
174 100.0,
175 1000.0,
176 10000.0,
177 100000.0,
178 1000000.0,
179 10000000.0,
180 100000000.0,
181 1000000000.0,
182 10000000000.0,
183 100000000000.0,
184 1000000000000.0,
185 10000000000000.0,
186 100000000000000.0,
187 1000000000000000.0,
188 10000000000000000.0,
189 100000000000000000.0,
190 1000000000000000000.0,
191 10000000000000000000.0,
192 100000000000000000000.0,
193 1000000000000000000000.0,
194 10000000000000000000000.0,
195 100000000000000000000000.0, ];
197
198 const IF10: &'static [<Self as ArrowPrimitiveType>::Native] = &[
199 1.0,
200 0.1,
201 0.01,
202 0.001,
203 0.0001,
204 0.00001,
205 0.000001,
206 0.0000001,
207 0.00000001,
208 0.000000001,
209 0.0000000001,
210 0.00000000001,
211 0.000000000001,
212 0.0000000000001,
213 0.00000000000001,
214 0.000000000000001,
215 0.0000000000000001,
216 0.00000000000000001,
217 0.000000000000000001,
218 0.0000000000000000001,
219 0.00000000000000000001,
220 0.000000000000000000001,
221 0.0000000000000000000001,
222 0.00000000000000000000001, ];
224}
225
226pub type LiquidFloat32Array = LiquidFloatArray<Float32Type>;
228pub type LiquidFloat64Array = LiquidFloatArray<Float64Type>;
230
231#[derive(Debug, Clone)]
233pub struct LiquidFloatArray<T: LiquidFloatType> {
234 exponent: Exponents,
235 bit_packed: BitPackedArray<T::UnsignedIntType>,
236 patch_indices: Vec<u64>,
237 patch_values: Vec<T::Native>,
238 reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native,
239 squeeze_policy: FloatSqueezePolicy,
240}
241
242impl<T> LiquidFloatArray<T>
243where
244 T: LiquidFloatType,
245{
246 pub fn is_empty(&self) -> bool {
248 self.len() == 0
249 }
250
251 pub fn len(&self) -> usize {
253 self.bit_packed.len()
254 }
255
256 pub fn get_array_memory_size(&self) -> usize {
258 self.bit_packed.get_array_memory_size()
259 + size_of::<Exponents>()
260 + self.patch_indices.capacity() * size_of::<u64>()
261 + self.patch_values.capacity() * size_of::<T::Native>()
262 + size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>()
263 }
264
265 pub fn from_arrow_array(arrow_array: arrow::array::PrimitiveArray<T>) -> LiquidFloatArray<T> {
267 let best_exponents = get_best_exponents::<T>(&arrow_array);
268 encode_arrow_array(&arrow_array, &best_exponents)
269 }
270
271 pub fn squeeze_policy(&self) -> FloatSqueezePolicy {
273 self.squeeze_policy
274 }
275}
276
277impl<T> LiquidArray for LiquidFloatArray<T>
278where
279 T: LiquidFloatType,
280{
281 fn as_any(&self) -> &dyn Any {
282 self
283 }
284
285 fn get_array_memory_size(&self) -> usize {
286 self.get_array_memory_size()
287 }
288
289 fn len(&self) -> usize {
290 self.len()
291 }
292
293 #[inline]
294 fn to_arrow_array(&self) -> ArrayRef {
295 let unsigned_array = self.bit_packed.to_primitive();
296 let (_data_type, values, _nulls) = unsigned_array.into_parts();
297 let nulls = self.bit_packed.nulls();
298 let mut decoded_values = Vec::from_iter(values.iter().map(|v| {
300 let mut val: <T::SignedIntType as ArrowPrimitiveType>::Native = (*v).as_();
301 val = val.add_wrapping(self.reference_value);
302 T::decode_single(&val, &self.exponent)
303 }));
304
305 if !self.patch_indices.is_empty() {
307 for i in 0..self.patch_indices.len() {
308 decoded_values[self.patch_indices[i].as_usize()] = self.patch_values[i];
309 }
310 }
311
312 Arc::new(PrimitiveArray::<T>::new(
313 ScalarBuffer::<<T as ArrowPrimitiveType>::Native>::from(decoded_values),
314 nulls.cloned(),
315 ))
316 }
317
318 fn original_arrow_data_type(&self) -> DataType {
319 T::DATA_TYPE.clone()
320 }
321
322 fn data_type(&self) -> LiquidDataType {
323 LiquidDataType::Float
324 }
325
326 fn to_bytes(&self) -> Vec<u8> {
327 self.to_bytes_inner()
328 }
329
330 fn is_empty(&self) -> bool {
331 self.len() == 0
332 }
333
334 fn to_best_arrow_array(&self) -> ArrayRef {
335 self.to_arrow_array()
336 }
337
338 fn squeeze(
339 &self,
340 io: Arc<dyn SqueezeIoHandler>,
341 _expression_hint: Option<&CacheExpression>,
342 ) -> Option<(super::LiquidSqueezedArrayRef, bytes::Bytes)> {
343 let orig_bw = self.bit_packed.bit_width()?;
344 if orig_bw.get() < 8 {
345 return None;
346 }
347
348 let new_bw = orig_bw.get() / 2;
350
351 let full_bytes = Bytes::from(self.to_bytes_inner());
352 let disk_range = 0u64..(full_bytes.len() as u64);
353
354 let (_dt, values, nulls) = self.bit_packed.to_primitive().into_parts();
355
356 match self.squeeze_policy {
357 FloatSqueezePolicy::Quantize => {
358 let shift = orig_bw.get() - new_bw;
359 let quantized_min = self.reference_value.shr(shift);
360 let quantized_values: ScalarBuffer<
362 <T::UnsignedIntType as ArrowPrimitiveType>::Native,
363 > = ScalarBuffer::from_iter(values.iter().map(|&v| {
364 let signed_val: <T::SignedIntType as ArrowPrimitiveType>::Native = v.as_();
365 let v_signed = self.reference_value.add_wrapping(signed_val);
366 let v_quantized: <T::SignedIntType as ArrowPrimitiveType>::Native =
367 v_signed.shr(shift);
368 v_quantized.sub_wrapping(quantized_min).as_()
369 }));
370 let quantized_array =
371 PrimitiveArray::<<T as LiquidFloatType>::UnsignedIntType>::new(
372 quantized_values,
373 nulls.clone(),
374 );
375 let quantized_bitpacked =
376 BitPackedArray::from_primitive(quantized_array, NonZero::new(new_bw).unwrap());
377 let hybrid = LiquidFloatQuantizedArray::<T> {
378 exponent: self.exponent,
379 quantized: quantized_bitpacked,
380 reference_value: self.reference_value,
381 bucket_width: shift,
382 disk_range,
383 io,
384 patch_indices: self.patch_indices.clone(),
385 patch_values: self.patch_values.clone(),
386 };
387 Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
388 }
389 }
390 }
391}
392
393impl<T> LiquidFloatArray<T>
394where
395 T: LiquidFloatType,
396{
397 pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
438 let physical_type_id = get_physical_type_id::<T>();
440 let logical_type_id = LiquidDataType::Float as u16;
441 let header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
442
443 let mut result = Vec::with_capacity(256); result.extend_from_slice(&header.to_bytes());
447
448 let ref_value_bytes = unsafe {
450 std::slice::from_raw_parts(
451 &self.reference_value as *const <T::SignedIntType as ArrowPrimitiveType>::Native
452 as *const u8,
453 std::mem::size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>(),
454 )
455 };
456 result.extend_from_slice(ref_value_bytes);
457
458 let exponents_starting_loc = (result.len() + 7) & !7;
459 while result.len() < exponents_starting_loc {
461 result.push(0);
462 }
463
464 let exponent_e_bytes =
465 unsafe { std::slice::from_raw_parts(&self.exponent.e as *const u8, 1) };
466 let exponent_f_bytes =
467 unsafe { std::slice::from_raw_parts(&self.exponent.f as *const u8, 1) };
468 result.extend_from_slice(exponent_e_bytes);
470 result.extend_from_slice(exponent_f_bytes);
471 for _i in 0..6 {
472 result.push(0);
473 }
474
475 let patch_length = self.patch_indices.len() as u64;
477
478 let patch_length_bytes = unsafe {
479 std::slice::from_raw_parts(
480 &patch_length as *const u64 as *const u8,
481 std::mem::size_of::<u64>(),
482 )
483 };
484
485 result.extend_from_slice(patch_length_bytes);
487
488 if !self.patch_indices.is_empty() {
489 let patch_indices_bytes = unsafe {
490 std::slice::from_raw_parts(
491 self.patch_indices.as_ptr() as *const u8,
492 std::mem::size_of::<u64>() * self.patch_indices.len(),
493 )
494 };
495
496 result.extend_from_slice(patch_indices_bytes);
498
499 let patch_values_bytes = unsafe {
501 std::slice::from_raw_parts(
502 self.patch_values.as_ptr() as *const u8,
503 std::mem::size_of::<T::Native>() * self.patch_indices.len(),
504 )
505 };
506 result.extend_from_slice(patch_values_bytes);
507 }
508 let padding = ((result.len() + 7) & !7) - result.len();
509
510 for _i in 0..padding {
512 result.push(0);
513 }
514
515 self.bit_packed.to_bytes(&mut result);
517
518 result
519 }
520
521 pub fn from_bytes(bytes: Bytes) -> Self {
523 let header = LiquidIPCHeader::from_bytes(&bytes);
524
525 let physical_id = header.physical_type_id;
527 assert_eq!(physical_id, get_physical_type_id::<T>());
528 let logical_id = header.logical_type_id;
529 assert_eq!(logical_id, LiquidDataType::Float as u16);
530
531 let ref_value_ptr = &bytes[LiquidIPCHeader::size()];
533 let reference_value = unsafe {
534 (ref_value_ptr as *const u8 as *const <T::SignedIntType as ArrowPrimitiveType>::Native)
535 .read_unaligned()
536 };
537
538 let mut next = ((LiquidIPCHeader::size()
540 + std::mem::size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>())
541 + 7)
542 & !7;
543
544 let exponent_e = bytes[next];
546 let exponent_f = bytes[next + 1];
547 next += 8;
548
549 let mut patch_length = 0u64;
551 patch_length |= bytes[next] as u64;
552 patch_length |= (bytes[next + 1] as u64) << 8;
553 patch_length |= (bytes[next + 2] as u64) << 16;
554 patch_length |= (bytes[next + 3] as u64) << 24;
555 patch_length |= (bytes[next + 4] as u64) << 32;
556 patch_length |= (bytes[next + 5] as u64) << 40;
557 patch_length |= (bytes[next + 6] as u64) << 48;
558 patch_length |= (bytes[next + 7] as u64) << 56;
559 next += 8;
560
561 let mut patch_indices = Vec::new();
563 let mut patch_values = Vec::new();
564 if patch_length > 0 {
565 let count = patch_length as usize;
566 let idx_bytes = count * std::mem::size_of::<u64>();
567 let val_bytes = count * std::mem::size_of::<T::Native>();
568
569 let indices_slice = bytes.slice(next..next + idx_bytes);
570 next += idx_bytes;
571 patch_indices = unsafe {
572 let ptr = indices_slice.as_ptr() as *const u64;
573 std::slice::from_raw_parts(ptr, count).to_vec()
574 };
575
576 let values_slice = bytes.slice(next..next + val_bytes);
577 next += val_bytes;
578 patch_values = unsafe {
579 let ptr = values_slice.as_ptr() as *const T::Native;
580 std::slice::from_raw_parts(ptr, count).to_vec()
581 };
582 }
583
584 next = (next + 7) & !7;
586
587 let bit_packed = BitPackedArray::<T::UnsignedIntType>::from_bytes(bytes.slice(next..));
588
589 Self {
590 exponent: Exponents {
591 e: exponent_e,
592 f: exponent_f,
593 },
594 bit_packed,
595 patch_indices,
596 patch_values,
597 reference_value,
598 squeeze_policy: FloatSqueezePolicy::Quantize,
599 }
600 }
601}
602
/// Exponent pair of the encoding: a value is scaled by `10^e * 10^-f` before being
/// rounded to an integer, and by `10^f * 10^-e` when decoded.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Exponents {
    /// Power of ten multiplied in when encoding (divided out when decoding).
    pub(crate) e: u8,
    /// Power of ten divided out when encoding (multiplied back when decoding).
    pub(crate) f: u8,
}
608
609fn encode_arrow_array<T: LiquidFloatType>(
610 arrow_array: &PrimitiveArray<T>,
611 exp: &Exponents, ) -> LiquidFloatArray<T> {
613 let mut patch_indices: Vec<u64> = Vec::new();
614 let mut patch_values: Vec<T::Native> = Vec::new();
615 let mut patch_count: usize = 0;
616 let mut fill_value: Option<<T::SignedIntType as ArrowPrimitiveType>::Native> = None;
617 let values = arrow_array.values();
618 let nulls = arrow_array.nulls();
619
620 if arrow_array.null_count() == arrow_array.len() {
622 return LiquidFloatArray::<T> {
623 bit_packed: BitPackedArray::new_null_array(arrow_array.len()),
624 exponent: Exponents { e: 0, f: 0 },
625 patch_indices: Vec::new(),
626 patch_values: Vec::new(),
627 reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native::ZERO,
628 squeeze_policy: FloatSqueezePolicy::Quantize,
629 };
630 }
631
632 let mut encoded_values = Vec::with_capacity(arrow_array.len());
633 for v in values.iter() {
634 let encoded = T::encode_single_unchecked(&v.as_(), exp);
635 let decoded = T::decode_single(&encoded, exp);
636 let neq = !decoded.eq(&v.as_()) as usize;
638 patch_count += neq;
639 encoded_values.push(encoded);
640 }
641
642 if patch_count > 0 {
643 patch_indices.resize_with(patch_count + 1, Default::default);
644 patch_values.resize_with(patch_count + 1, Default::default);
645 let mut patch_index: usize = 0;
646
647 for i in 0..encoded_values.len() {
648 let decoded = T::decode_single(&encoded_values[i], exp);
649 patch_indices[patch_index] = i.as_();
650 patch_values[patch_index] = arrow_array.value(i).as_();
651 patch_index += !(decoded.eq(&values[i].as_())) as usize;
652 }
653 assert_eq!(patch_index, patch_count);
654 unsafe {
655 patch_indices.set_len(patch_count);
656 patch_values.set_len(patch_count);
657 }
658 }
659
660 if patch_count > 0 && patch_count < arrow_array.len() {
663 for i in 0..encoded_values.len() {
664 if i >= patch_indices.len() || patch_indices[i] != i as u64 {
665 fill_value = encoded_values.get(i).copied();
666 break;
667 }
668 }
669 }
670
671 if let Some(fill_value) = fill_value {
674 for patch_idx in &patch_indices {
676 encoded_values[*patch_idx as usize] = fill_value;
677 }
678 }
679
680 let min = *encoded_values
681 .iter()
682 .min()
683 .expect("`encoded_values` shouldn't be all nulls");
684 let max = *encoded_values
685 .iter()
686 .max()
687 .expect("`encoded_values` shouldn't be all nulls");
688 let sub: <T::UnsignedIntType as ArrowPrimitiveType>::Native = max.sub_wrapping(min).as_();
689
690 let unsigned_encoded_values = encoded_values
691 .iter()
692 .map(|v| {
693 let k: <T::UnsignedIntType as ArrowPrimitiveType>::Native = v.sub_wrapping(min).as_();
694 k
695 })
696 .collect::<Vec<_>>();
697 let encoded_output = PrimitiveArray::<<T as LiquidFloatType>::UnsignedIntType>::new(
698 ScalarBuffer::from(unsigned_encoded_values),
699 nulls.cloned(),
700 );
701
702 let bit_width = get_bit_width(sub.as_());
703 let bit_packed_array = BitPackedArray::from_primitive(encoded_output, bit_width);
704
705 LiquidFloatArray::<T> {
706 bit_packed: bit_packed_array,
707 exponent: *exp,
708 patch_indices,
709 patch_values,
710 reference_value: min,
711 squeeze_policy: FloatSqueezePolicy::Quantize,
712 }
713}
714
715fn get_best_exponents<T: LiquidFloatType>(arrow_array: &PrimitiveArray<T>) -> Exponents {
716 let mut best_exponents = Exponents { e: 0, f: 0 };
717 let mut min_encoded_size: usize = usize::MAX;
718
719 let sample_arrow_array: Option<PrimitiveArray<T>> =
720 (arrow_array.len() > NUM_SAMPLES).then(|| {
721 arrow_array
722 .iter()
723 .step_by(arrow_array.len() / NUM_SAMPLES)
724 .filter(|s| s.is_some())
725 .collect()
726 });
727
728 for e in 0..T::MAX_EXPONENT {
729 for f in 0..e {
730 let exp = Exponents { e, f };
731 let liquid_array =
732 encode_arrow_array(sample_arrow_array.as_ref().unwrap_or(arrow_array), &exp);
733 if liquid_array.get_array_memory_size() < min_encoded_size {
734 best_exponents = exp;
735 min_encoded_size = liquid_array.get_array_memory_size();
736 }
737 }
738 }
739 best_exponents
740}
741
742#[derive(Debug)]
743struct LiquidFloatQuantizedArray<T: LiquidFloatType> {
744 exponent: Exponents,
745 quantized: BitPackedArray<T::UnsignedIntType>,
746 reference_value: <T::SignedIntType as ArrowPrimitiveType>::Native,
747 bucket_width: u8, disk_range: std::ops::Range<u64>,
749 io: Arc<dyn SqueezeIoHandler>,
750 patch_indices: Vec<u64>,
751 patch_values: Vec<T::Native>,
752}
753
754impl<T> LiquidFloatQuantizedArray<T>
755where
756 T: LiquidFloatType,
757{
758 #[allow(dead_code)]
759 fn as_any(&self) -> &dyn Any {
760 self
761 }
762
763 #[inline]
764 fn len(&self) -> usize {
765 self.quantized.len()
766 }
767
768 fn new_from_filtered(
769 &self,
770 filtered: PrimitiveArray<<T as LiquidFloatType>::UnsignedIntType>,
771 ) -> Self {
772 let bit_width = self
773 .quantized
774 .bit_width()
775 .expect("quantized bit width must exist");
776 let quantized = BitPackedArray::from_primitive(filtered, bit_width);
777 Self {
778 exponent: self.exponent,
779 quantized,
780 reference_value: self.reference_value,
781 bucket_width: self.bucket_width,
782 io: self.io.clone(),
783 patch_indices: self.patch_indices.clone(),
784 patch_values: self.patch_values.clone(),
785 disk_range: self.disk_range.clone(),
786 }
787 }
788
789 fn filter_inner(&self, selection: &BooleanBuffer) -> Self {
790 let q_prim: PrimitiveArray<T::UnsignedIntType> = self.quantized.to_primitive();
791 let selection = BooleanArray::new(selection.clone(), None);
792 let filtered = arrow::compute::kernels::filter::filter(&q_prim, &selection).unwrap();
793 let filtered = filtered.as_primitive::<T::UnsignedIntType>().clone();
794 self.new_from_filtered(filtered)
795 }
796
797 async fn hydrate_full_arrow(&self) -> ArrayRef {
798 let bytes = self
799 .io
800 .read(Some(self.disk_range.clone()))
801 .await
802 .expect("read squeezed backing");
803 let liquid = crate::liquid_array::ipc::read_from_bytes(
804 bytes,
805 &crate::liquid_array::ipc::LiquidIPCContext::new(None),
806 );
807 liquid.to_arrow_array()
808 }
809
810 #[inline]
811 fn handle_eq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
812 if k < lo || k > hi { Some(false) } else { None }
813 }
814
815 #[inline]
816 fn handle_neq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
817 if k < lo || k > hi { Some(true) } else { None }
818 }
819
820 #[inline]
821 fn handle_lt(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
822 if k <= lo {
823 Some(false)
824 } else if hi < k {
825 Some(true)
826 } else {
827 None
828 }
829 }
830
831 #[inline]
832 fn handle_lteq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
833 if k < lo {
834 Some(false)
835 } else if hi <= k {
836 Some(true)
837 } else {
838 None
839 }
840 }
841
842 #[inline]
843 fn handle_gt(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
844 if k < lo {
845 Some(true)
846 } else if hi <= k {
847 Some(false)
848 } else {
849 None
850 }
851 }
852
853 #[inline]
854 fn handle_gteq(lo: T::Native, hi: T::Native, k: T::Native) -> Option<bool> {
855 if k <= lo {
856 Some(true)
857 } else if hi < k {
858 Some(false)
859 } else {
860 None
861 }
862 }
863
864 fn try_eval_predicate_inner(
865 &self,
866 op: &Operator,
867 literal: &Literal,
868 ) -> SqueezeResult<Option<BooleanArray>> {
869 let k_opt: Option<T::Native> = match literal.value() {
871 ScalarValue::Int8(Some(v)) => T::Native::from_i8(*v),
872 ScalarValue::Int16(Some(v)) => T::Native::from_i16(*v),
873 ScalarValue::Int32(Some(v)) => T::Native::from_i32(*v),
874 ScalarValue::Int64(Some(v)) => T::Native::from_i64(*v),
875 ScalarValue::UInt8(Some(v)) => T::Native::from_u8(*v),
876 ScalarValue::UInt16(Some(v)) => T::Native::from_u16(*v),
877 ScalarValue::UInt32(Some(v)) => T::Native::from_u32(*v),
878 ScalarValue::UInt64(Some(v)) => T::Native::from_u64(*v),
879 ScalarValue::Date32(Some(v)) => T::Native::from_i32(*v),
880 ScalarValue::Date64(Some(v)) => T::Native::from_i64(*v),
881 ScalarValue::Float32(Some(v)) => T::Native::from_f32(*v),
882 ScalarValue::Float64(Some(v)) => T::Native::from_f64(*v),
883 _ => None,
884 };
885 let Some(k) = k_opt else { return Ok(None) };
886 let q_prim = self.quantized.to_primitive();
887 let (_dt, values, _nulls) = q_prim.into_parts();
888
889 let mut out_vals: Vec<bool> = Vec::with_capacity(values.len());
890 let mut next_patch_index = 0;
891 let mut ignore_patches = false;
892 if self.patch_indices.is_empty() {
893 ignore_patches = true;
894 }
895 let comp_fn = match op {
896 Operator::Eq => Self::handle_eq,
897 Operator::NotEq => Self::handle_neq,
898 Operator::Lt => Self::handle_lt,
899 Operator::LtEq => Self::handle_lteq,
900 Operator::Gt => Self::handle_gt,
901 Operator::GtEq => Self::handle_gteq,
902 };
903 for (i, &b) in values.iter().enumerate() {
905 if let Some(nulls) = self.quantized.nulls()
906 && !nulls.is_valid(i)
907 {
908 out_vals.push(false);
909 continue;
910 }
911 if !ignore_patches && i as u64 == self.patch_indices[next_patch_index] {
912 next_patch_index += 1;
913 if next_patch_index == self.patch_indices.len() {
914 ignore_patches = true;
915 }
916 out_vals.push(false);
917 continue;
918 }
919
920 let val: <T::SignedIntType as ArrowPrimitiveType>::Native = b.as_();
921 let lo = (val << self.bucket_width).add_wrapping(self.reference_value);
922 let hi = ((val.add_wrapping(1i32.into())) << self.bucket_width)
923 .add_wrapping(self.reference_value);
924 let val_lower = T::decode_single(&lo, &self.exponent);
925 let val_higher = T::decode_single(&hi, &self.exponent);
926
927 let decided = comp_fn(val_lower, val_higher, k);
928 if let Some(v) = decided {
929 out_vals.push(v);
930 } else {
931 return Err(NeedsBacking);
932 }
933 }
934
935 for (idx, patch_idx) in self.patch_indices.iter().enumerate() {
938 let patch_value = self.patch_values[idx];
939 out_vals[*patch_idx as usize] = match op {
940 Operator::Eq => patch_value == k,
941 Operator::NotEq => patch_value != k,
942 Operator::Lt => patch_value < k,
943 Operator::LtEq => patch_value <= k,
944 Operator::Gt => patch_value > k,
945 Operator::GtEq => patch_value >= k,
946 }
947 }
948
949 let bool_buf = arrow::buffer::BooleanBuffer::from_iter(out_vals);
950 let out = BooleanArray::new(bool_buf, self.quantized.nulls().cloned());
951 Ok(Some(out))
952 }
953}
954
955#[async_trait::async_trait]
956impl<T> LiquidSqueezedArray for LiquidFloatQuantizedArray<T>
957where
958 T: LiquidFloatType,
959{
960 fn as_any(&self) -> &dyn Any {
961 self
962 }
963
964 fn get_array_memory_size(&self) -> usize {
965 self.quantized.get_array_memory_size()
966 + size_of::<Exponents>()
967 + self.patch_indices.capacity() * size_of::<u64>()
968 + self.patch_values.capacity() * size_of::<T::Native>()
969 + size_of::<<T::SignedIntType as ArrowPrimitiveType>::Native>()
970 }
971
972 fn len(&self) -> usize {
973 LiquidFloatQuantizedArray::<T>::len(self)
974 }
975
976 async fn to_arrow_array(&self) -> ArrayRef {
977 self.hydrate_full_arrow().await
978 }
979
980 fn data_type(&self) -> LiquidDataType {
981 LiquidDataType::Float
982 }
983
984 fn original_arrow_data_type(&self) -> DataType {
985 T::DATA_TYPE.clone()
986 }
987
988 fn disk_backing(&self) -> SqueezedBacking {
989 SqueezedBacking::Liquid((self.disk_range.end - self.disk_range.start) as usize)
990 }
991
992 async fn try_eval_predicate(
993 &self,
994 liquid_expr: &LiquidExpr,
995 filter: &BooleanBuffer,
996 ) -> BooleanArray {
997 let filtered = self.filter_inner(filter);
999 let expr = liquid_expr.physical_expr();
1000
1001 if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>()
1002 && let Some(literal) = binary_expr.right().as_any().downcast_ref::<Literal>()
1003 {
1004 let op = binary_expr.op();
1005 let supported_op = Operator::from_datafusion(op);
1006 if let Some(supported_op) = supported_op {
1007 match filtered.try_eval_predicate_inner(&supported_op, literal) {
1008 Ok(Some(mask)) => return mask,
1009 Ok(None) => {
1010 let fallback = self.filter(filter).await;
1011 return eval_predicate_on_array(fallback, liquid_expr);
1012 }
1013 Err(NeedsBacking) => {}
1014 }
1015
1016 use arrow::array::cast::AsArray;
1018
1019 let full = self.hydrate_full_arrow().await;
1020 let selection_array = BooleanArray::new(filter.clone(), None);
1021 let filtered_arr = arrow::compute::filter(&full, &selection_array)
1022 .expect("selection must match array length");
1023 let filtered_len = filtered_arr.len();
1024
1025 let lhs = ColumnarValue::Array(filtered_arr);
1026 let rhs = ColumnarValue::Scalar(literal.value().clone());
1027 let result = match op {
1028 DFOperator::NotEq => apply_cmp(DFOperator::NotEq, &lhs, &rhs),
1029 DFOperator::Eq => apply_cmp(DFOperator::Eq, &lhs, &rhs),
1030 DFOperator::Lt => apply_cmp(DFOperator::Lt, &lhs, &rhs),
1031 DFOperator::LtEq => apply_cmp(DFOperator::LtEq, &lhs, &rhs),
1032 DFOperator::Gt => apply_cmp(DFOperator::Gt, &lhs, &rhs),
1033 DFOperator::GtEq => apply_cmp(DFOperator::GtEq, &lhs, &rhs),
1034 _ => {
1035 let fallback = self.filter(filter).await;
1036 return eval_predicate_on_array(fallback, liquid_expr);
1037 }
1038 };
1039 let result = result.expect("validated LiquidExpr comparison must evaluate");
1040 return result
1041 .into_array(filtered_len)
1042 .expect("comparison output must be an array")
1043 .as_boolean()
1044 .clone();
1045 }
1046 }
1047 let fallback = self.filter(filter).await;
1048 eval_predicate_on_array(fallback, liquid_expr)
1049 }
1050}
1051
1052#[cfg(test)]
1053mod tests {
1054 use datafusion_expr_common::operator::Operator;
1055 use datafusion_physical_expr::PhysicalExpr;
1056 use futures::executor::block_on;
1057 use rand::{RngExt as _, SeedableRng as _, distr::uniform::SampleUniform, rngs::StdRng};
1058
1059 use crate::cache::TestSqueezeIo;
1060
1061 use super::*;
1062
1063 macro_rules! test_roundtrip {
1064 ($test_name: ident, $type:ty, $values: expr) => {
1065 #[test]
1066 fn $test_name() {
1067 let original: Vec<Option<<$type as ArrowPrimitiveType>::Native>> = $values;
1068 let array = PrimitiveArray::<$type>::from(original.clone());
1069
1070 let liquid_array = LiquidFloatArray::<$type>::from_arrow_array(array.clone());
1072 let result_array = liquid_array.to_arrow_array();
1073 let bytes_array =
1074 LiquidFloatArray::<$type>::from_bytes(liquid_array.to_bytes().into());
1075
1076 assert_eq!(result_array.as_ref(), &array);
1077 assert_eq!(bytes_array.to_arrow_array().as_ref(), &array);
1078 }
1079 };
1080 }
1081
1082 test_roundtrip!(
1084 test_float32_roundtrip_basic,
1085 Float32Type,
1086 vec![Some(-1.0), Some(1.0), Some(0.0)]
1087 );
1088
1089 test_roundtrip!(
1090 test_float32_roundtrip_with_nones,
1091 Float32Type,
1092 vec![Some(-1.0), Some(1.0), Some(0.0), None]
1093 );
1094
1095 test_roundtrip!(
1096 test_float32_roundtrip_all_nones,
1097 Float32Type,
1098 vec![None, None, None, None]
1099 );
1100
1101 test_roundtrip!(test_float32_roundtrip_empty, Float32Type, vec![]);
1102
1103 test_roundtrip!(
1105 test_float64_roundtrip_basic,
1106 Float64Type,
1107 vec![Some(-1.0), Some(1.0), Some(0.0)]
1108 );
1109
1110 test_roundtrip!(
1111 test_float64_roundtrip_with_nones,
1112 Float64Type,
1113 vec![Some(-1.0), Some(1.0), Some(0.0), None]
1114 );
1115
1116 test_roundtrip!(
1117 test_float64_roundtrip_all_nones,
1118 Float64Type,
1119 vec![None, None, None, None]
1120 );
1121
1122 test_roundtrip!(test_float64_roundtrip_empty, Float64Type, vec![]);
1123
1124 #[test]
1126 fn test_filter_basic() {
1127 let original = vec![Some(1.0), Some(2.1), Some(3.2), None, Some(5.5)];
1129 let array = PrimitiveArray::<Float32Type>::from(original);
1130 let liquid_array = LiquidFloatArray::<Float32Type>::from_arrow_array(array);
1131
1132 let selection = BooleanBuffer::from(vec![true, false, true, false, true]);
1134
1135 let result_array = liquid_array.filter(&selection);
1137
1138 let expected = PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(3.2), Some(5.5)]);
1140
1141 assert_eq!(result_array.as_ref(), &expected);
1142 }
1143
1144 #[test]
1145 fn test_original_arrow_data_type_returns_float32() {
1146 let array = PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.5)]);
1147 let liquid = LiquidFloatArray::<Float32Type>::from_arrow_array(array);
1148 assert_eq!(liquid.original_arrow_data_type(), DataType::Float32);
1149 }
1150
/// Filtering an all-null f32 array keeps the selected slots, and they stay null.
#[test]
fn test_filter_all_nulls() {
    let all_null = PrimitiveArray::<Float32Type>::from(vec![None; 4]);
    let liquid = LiquidFloatArray::<Float32Type>::from_arrow_array(all_null);

    // Select only the first and the last slot.
    let first_and_last = BooleanBuffer::from(vec![true, false, false, true]);
    let filtered = liquid.filter(&first_and_last);

    let want = PrimitiveArray::<Float32Type>::from(vec![None; 2]);
    assert_eq!(filtered.as_ref(), &want);
}
1167
/// An all-`false` selection removes every row and yields an empty array.
#[test]
fn test_filter_empty_result() {
    let source = PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.1), Some(3.3)]);
    let liquid = LiquidFloatArray::<Float32Type>::from_arrow_array(source);

    let none_selected = BooleanBuffer::from(vec![false; 3]);
    let filtered = liquid.filter(&none_selected);

    assert!(filtered.is_empty());
}
1181
/// Encoding 2000 integer-valued floats (`0.0, 1.0, 2.0, ...`) as a `LiquidFloatArray` must
/// use less memory than the plain Arrow array, for both `f32` and `f64`.
#[test]
fn test_compression_f32_f64() {
    /// Builds the Arrow array from `value_at(0..2000)`, encodes it, and compares the
    /// reported memory sizes of the two representations.
    fn run_compression_test<T: LiquidFloatType>(
        type_name: &str,
        value_at: impl Fn(usize) -> T::Native,
    ) {
        let values: Vec<T::Native> = (0..2000).map(value_at).collect();
        let plain = PrimitiveArray::<T>::from_iter_values(values);
        let uncompressed_size = plain.get_array_memory_size();

        let compressed_size = {
            let encoded = LiquidFloatArray::<T>::from_arrow_array(plain);
            encoded.get_array_memory_size()
        };

        println!(
            "Type: {type_name}, uncompressed_size: {uncompressed_size}, compressed_size: {compressed_size}"
        );
        assert!(
            compressed_size < uncompressed_size,
            "{type_name} compression failed to reduce size"
        );
    }

    run_compression_test::<Float32Type>("f32", |n| n as f32);
    run_compression_test::<Float64Type>("f64", |n| n as f64);
}
1211
1212 fn make_f_array_with_range<T>(
1214 len: usize,
1215 base_min: T::Native,
1216 range: T::Native,
1217 null_prob: f32,
1218 rng: &mut StdRng,
1219 ) -> PrimitiveArray<T>
1220 where
1221 T: LiquidFloatType,
1222 <T as arrow::array::ArrowPrimitiveType>::Native: SampleUniform,
1223 PrimitiveArray<T>: From<Vec<Option<<T as ArrowPrimitiveType>::Native>>>,
1224 {
1225 let mut vals: Vec<Option<T::Native>> = Vec::with_capacity(len);
1226 for _ in 0..len {
1227 if rng.random_bool(null_prob as f64) {
1228 vals.push(None);
1229 } else {
1230 vals.push(Some(rng.random_range(base_min..(base_min + range))));
1231 }
1232 }
1233 PrimitiveArray::<T>::from(vals)
1234 }
1235
/// A 64-slot f32 array whose values lie in `[10_000, 10_100)` (about 10% nulls) is not
/// squeezable, so `squeeze` must return `None`.
#[test]
fn hybrid_squeeze_unsqueezable_small_range() {
    let mut rng = StdRng::seed_from_u64(0x5171);
    let narrow = make_f_array_with_range::<Float32Type>(64, 10_000.0, 100.0, 0.1, &mut rng);
    let liquid = LiquidFloatArray::<Float32Type>::from_arrow_array(narrow);

    let squeezed = liquid.squeeze(Arc::new(TestSqueezeIo::default()), None);
    assert!(squeezed.is_none());
}
1247
/// Squeezes a wide-range `f32` array and checks three things:
///
/// 1. The squeezed bytes decode back to the original array and re-serialize identically.
/// 2. Predicates whose literal is at or below the column minimum are answered by the hybrid
///    array without reading the backing bytes (`io.reads() == 0`).
/// 3. Equality against a value that is present in the data reads the backing bytes
///    (`io.reads() > 0`) and still yields the exact, null-aware result.
#[test]
fn hybrid_squeeze_full_read_roundtrip_f32() {
    let mut rng = StdRng::seed_from_u64(0x51_72);
    let arr = make_f_array_with_range::<Float32Type>(
        2000,
        -50_000.0,
        (1 << 16) as f32,
        0.1,
        &mut rng,
    );
    let liq = LiquidFloatArray::<Float32Type>::from_arrow_array(arr.clone());
    let bytes_baseline = liq.to_bytes();
    let io = Arc::new(TestSqueezeIo::default());
    let (hybrid, bytes) = liq.squeeze(io.clone(), None).expect("squeezable");
    io.set_bytes(bytes.clone());

    // The squeezed bytes must decode back to the original data exactly.
    let recovered = LiquidFloatArray::<Float32Type>::from_bytes(bytes);
    assert_eq!(
        recovered.to_arrow_array().as_primitive::<Float32Type>(),
        &arr
    );
    assert_eq!(bytes_baseline, recovered.to_bytes());

    // Null-aware expected mask: null rows stay null, non-null rows map through `pred`.
    let expected_for = |pred: &dyn Fn(f32) -> bool| -> BooleanArray {
        arr.iter().map(|v| v.map(pred)).collect()
    };

    let min = arrow::compute::kernels::aggregate::min(&arr).unwrap();
    let mask = BooleanBuffer::from(vec![true; arr.len()]);
    let build_expr = |op: Operator, k: f32| -> Arc<dyn PhysicalExpr> {
        // NOTE(review): both operands are the same literal. The predicate path under test
        // presumably only inspects the operator and the right-hand literal; confirm.
        let lit = Arc::new(Literal::new(ScalarValue::Float32(Some(k))));
        Arc::new(BinaryExpr::new(lit.clone(), op, lit))
    };

    // (operator, literal, constant result for every non-null row). Each literal is at or
    // below the column minimum, so the result is the same for every non-null row.
    let resolvable_cases: [(Operator, f32, bool); 6] = [
        (Operator::Eq, min - 1.0, false),
        (Operator::NotEq, min - 1.0, true),
        (Operator::Lt, min, false),
        (Operator::LtEq, min - 1.0, false),
        (Operator::Gt, min - 1.0, true),
        (Operator::GtEq, min, true),
    ];

    for (op, k, expected_const) in resolvable_cases {
        let expr = build_expr(op, k);
        io.reset_reads();
        let got = block_on(hybrid.try_eval_predicate(
            &crate::cache::LiquidExpr::new_unchecked(expr),
            &mask,
        ));
        assert_eq!(
            io.reads(),
            0,
            "predicate with literal {k} should be answered without reading backing bytes"
        );
        assert_eq!(got, expected_for(&|_: f32| expected_const));
    }

    // Equality on a value that actually occurs cannot be decided from the value range alone,
    // so the hybrid array must fall back to reading the backing bytes.
    let k_present = arr
        .iter()
        .flatten()
        .next()
        .expect("generated array should contain at least one non-null value");
    let expr_eq_present = build_expr(Operator::Eq, k_present);
    io.reset_reads();
    let got = block_on(hybrid.try_eval_predicate(
        &crate::cache::LiquidExpr::new_unchecked(expr_eq_present),
        &mask,
    ));
    assert!(io.reads() > 0);
    assert_eq!(got, expected_for(&|x: f32| x == k_present));
}
1342
/// Squeezes a wide-range `f64` array and checks three things:
///
/// 1. The squeezed bytes decode back to the original array and re-serialize identically.
/// 2. Predicates whose literal is at or below the column minimum are answered by the hybrid
///    array without reading the backing bytes (`io.reads() == 0`).
/// 3. Equality against a value that is present in the data reads the backing bytes
///    (`io.reads() > 0`) and still yields the exact, null-aware result.
#[test]
fn hybrid_squeeze_full_read_roundtrip_f64() {
    let mut rng = StdRng::seed_from_u64(0x51_72);
    let arr = make_f_array_with_range::<Float64Type>(
        2000,
        -50_000.0f64,
        (1 << 16) as f64,
        0.1,
        &mut rng,
    );
    let liq = LiquidFloatArray::<Float64Type>::from_arrow_array(arr.clone());
    let bytes_baseline = liq.to_bytes();
    let io = Arc::new(TestSqueezeIo::default());
    let (hybrid, bytes) = liq.squeeze(io.clone(), None).expect("squeezable");
    io.set_bytes(bytes.clone());

    // The squeezed bytes must decode back to the original data exactly.
    let recovered = LiquidFloatArray::<Float64Type>::from_bytes(bytes);
    assert_eq!(
        recovered.to_arrow_array().as_primitive::<Float64Type>(),
        &arr
    );
    assert_eq!(bytes_baseline, recovered.to_bytes());

    // Null-aware expected mask: null rows stay null, non-null rows map through `pred`.
    let expected_for = |pred: &dyn Fn(f64) -> bool| -> BooleanArray {
        arr.iter().map(|v| v.map(pred)).collect()
    };

    let min = arrow::compute::kernels::aggregate::min(&arr).unwrap();
    let mask = BooleanBuffer::from(vec![true; arr.len()]);
    let build_expr = |op: Operator, k: f64| -> Arc<dyn PhysicalExpr> {
        // NOTE(review): both operands are the same literal. The predicate path under test
        // presumably only inspects the operator and the right-hand literal; confirm.
        let lit = Arc::new(Literal::new(ScalarValue::Float64(Some(k))));
        Arc::new(BinaryExpr::new(lit.clone(), op, lit))
    };

    // (operator, literal, constant result for every non-null row). Each literal is at or
    // below the column minimum, so the result is the same for every non-null row.
    let resolvable_cases: [(Operator, f64, bool); 6] = [
        (Operator::Eq, min - 1.0, false),
        (Operator::NotEq, min - 1.0, true),
        (Operator::Lt, min, false),
        (Operator::LtEq, min - 1.0, false),
        (Operator::Gt, min - 1.0, true),
        (Operator::GtEq, min, true),
    ];

    for (op, k, expected_const) in resolvable_cases {
        let expr = build_expr(op, k);
        io.reset_reads();
        let got = block_on(hybrid.try_eval_predicate(
            &crate::cache::LiquidExpr::new_unchecked(expr),
            &mask,
        ));
        assert_eq!(
            io.reads(),
            0,
            "predicate with literal {k} should be answered without reading backing bytes"
        );
        assert_eq!(got, expected_for(&|_: f64| expected_const));
    }

    // Equality on a value that actually occurs cannot be decided from the value range alone,
    // so the hybrid array must fall back to reading the backing bytes.
    let k_present = arr
        .iter()
        .flatten()
        .next()
        .expect("generated array should contain at least one non-null value");
    let expr_eq_present = build_expr(Operator::Eq, k_present);
    io.reset_reads();
    let got = block_on(hybrid.try_eval_predicate(
        &crate::cache::LiquidExpr::new_unchecked(expr_eq_present),
        &mask,
    ));
    assert!(io.reads() > 0);
    assert_eq!(got, expected_for(&|x: f64| x == k_present));
}
1437}