1use std::any::Any;
2use std::mem::size_of;
3use std::num::NonZero;
4use std::sync::Arc;
5
6use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray};
7use arrow::buffer::{BooleanBuffer, ScalarBuffer};
8use arrow::datatypes::{Decimal128Type, Decimal256Type, DecimalType, UInt64Type, i256};
9use arrow_schema::DataType;
10use bytes::Bytes;
11use datafusion_common::ScalarValue;
12use datafusion_expr_common::columnar_value::ColumnarValue;
13use datafusion_expr_common::operator::Operator as DFOperator;
14use datafusion_physical_expr::PhysicalExpr;
15use datafusion_physical_expr::expressions::{
16 BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal,
17};
18use datafusion_physical_expr_common::datum::apply_cmp;
19use num_traits::ToPrimitive;
20
21use super::{
22 LiquidArray, LiquidDataType, LiquidSqueezedArray, LiquidSqueezedArrayRef, NeedsBacking,
23 Operator, SqueezeIoHandler, SqueezeResult, SqueezedBacking,
24};
25use crate::cache::{CacheExpression, LiquidExpr};
26use crate::liquid_array::eval_predicate_on_array;
27use crate::liquid_array::ipc::{LiquidIPCHeader, get_physical_type_id};
28use crate::liquid_array::raw::BitPackedArray;
29use crate::utils::get_bit_width;
30
31#[derive(Debug, Clone, Copy)]
32struct DecimalMeta {
33 precision: u8,
34 scale: i8,
35 is_256: bool,
36}
37
38impl DecimalMeta {
39 fn from_data_type(data_type: &DataType) -> Self {
40 match data_type {
41 DataType::Decimal128(precision, scale) => Self {
42 precision: *precision,
43 scale: *scale,
44 is_256: false,
45 },
46 DataType::Decimal256(precision, scale) => Self {
47 precision: *precision,
48 scale: *scale,
49 is_256: true,
50 },
51 _ => panic!("unsupported decimal data type: {data_type:?}"),
52 }
53 }
54
55 fn data_type(&self) -> DataType {
56 if self.is_256 {
57 DataType::Decimal256(self.precision, self.scale)
58 } else {
59 DataType::Decimal128(self.precision, self.scale)
60 }
61 }
62
63 fn arrow_code(&self) -> u8 {
64 if self.is_256 { 1 } else { 0 }
65 }
66}
67
68#[repr(C)]
69struct DecimalArrayHeader {
70 arrow_type: u8, precision: u8,
72 scale: i8,
73 __padding: u8,
74 __reserved: u32,
75}
76
77impl DecimalArrayHeader {
78 const fn size() -> usize {
79 8
80 }
81
82 fn from_meta(meta: DecimalMeta) -> Self {
83 Self {
84 arrow_type: meta.arrow_code(),
85 precision: meta.precision,
86 scale: meta.scale,
87 __padding: 0,
88 __reserved: 0,
89 }
90 }
91
92 fn to_bytes(&self) -> [u8; Self::size()] {
93 let mut bytes = [0; Self::size()];
94 bytes[0] = self.arrow_type;
95 bytes[1] = self.precision;
96 bytes[2] = self.scale as u8;
97 bytes
98 }
99
100 fn from_bytes(bytes: &[u8]) -> Self {
101 if bytes.len() < Self::size() {
102 panic!(
103 "value too small for DecimalArrayHeader, expected at least {} bytes, got {}",
104 Self::size(),
105 bytes.len()
106 );
107 }
108 Self {
109 arrow_type: bytes[0],
110 precision: bytes[1],
111 scale: bytes[2] as i8,
112 __padding: 0,
113 __reserved: 0,
114 }
115 }
116}
117
118#[derive(Debug)]
120pub struct LiquidDecimalArray {
121 meta: DecimalMeta,
122 bit_packed: BitPackedArray<UInt64Type>,
123 reference_value: u64,
124}
125
126impl LiquidDecimalArray {
127 pub(crate) fn fits_u64<T: DecimalType>(array: &PrimitiveArray<T>) -> bool
128 where
129 T::Native: ToPrimitive,
130 {
131 array.iter().flatten().all(|v| v.to_u64().is_some())
132 }
133
134 pub(crate) fn from_decimal_array<T: DecimalType>(array: &PrimitiveArray<T>) -> Self
135 where
136 T::Native: ToPrimitive,
137 {
138 debug_assert!(Self::fits_u64(array));
139 let meta = DecimalMeta::from_data_type(array.data_type());
140 if array.null_count() == array.len() {
141 return Self {
142 meta,
143 bit_packed: BitPackedArray::new_null_array(array.len()),
144 reference_value: 0,
145 };
146 }
147
148 let nulls = array.nulls().cloned();
149 let mut min = u64::MAX;
150 let mut max = 0u64;
151 let values: Vec<u64> = array
152 .iter()
153 .map(|v| match v {
154 Some(v) => {
155 let value = v.to_u64().expect("decimal fits u64");
156 if value < min {
157 min = value;
158 }
159 if value > max {
160 max = value;
161 }
162 value
163 }
164 None => 0,
165 })
166 .collect();
167
168 let bit_width = get_bit_width(max - min);
169 let offsets = ScalarBuffer::from_iter(values.iter().map(|v| v.saturating_sub(min)));
170 let unsigned_array = PrimitiveArray::<UInt64Type>::new(offsets, nulls);
171 let bit_packed = BitPackedArray::from_primitive(unsigned_array, bit_width);
172
173 Self {
174 meta,
175 bit_packed,
176 reference_value: min,
177 }
178 }
179
180 fn bit_pack_starting_loc() -> usize {
181 let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
182 (header_size + size_of::<u64>() + 7) & !7
183 }
184
185 fn to_u64_array(&self) -> PrimitiveArray<UInt64Type> {
186 let unsigned_array = self.bit_packed.to_primitive();
187 let (_data_type, values, _nulls) = unsigned_array.into_parts();
188 let nulls = self.bit_packed.nulls();
189 let values = if self.reference_value != 0 {
190 let reference_value = self.reference_value;
191 ScalarBuffer::from_iter(values.iter().map(|v| v.wrapping_add(reference_value)))
192 } else {
193 values
194 };
195 PrimitiveArray::<UInt64Type>::new(values, nulls.cloned())
196 }
197
198 pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
199 let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
200 let mut result = Vec::with_capacity(Self::bit_pack_starting_loc() + 256);
201 result.resize(header_size, 0);
202
203 let logical_type_id = LiquidDataType::Decimal as u16;
204 let physical_type_id = get_physical_type_id::<UInt64Type>();
205 let ipc_header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
206 result[0..LiquidIPCHeader::size()].copy_from_slice(&ipc_header.to_bytes());
207
208 let decimal_header = DecimalArrayHeader::from_meta(self.meta);
209 result[LiquidIPCHeader::size()..header_size].copy_from_slice(&decimal_header.to_bytes());
210
211 result.extend_from_slice(&self.reference_value.to_le_bytes());
212 while result.len() < Self::bit_pack_starting_loc() {
213 result.push(0);
214 }
215 self.bit_packed.to_bytes(&mut result);
216 result
217 }
218
219 pub(crate) fn from_bytes(bytes: Bytes) -> Self {
220 let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
221 let header = LiquidIPCHeader::from_bytes(&bytes);
222
223 assert_eq!(header.logical_type_id, LiquidDataType::Decimal as u16);
224 assert_eq!(
225 header.physical_type_id,
226 get_physical_type_id::<UInt64Type>()
227 );
228
229 let decimal_header =
230 DecimalArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
231 let meta = DecimalMeta {
232 precision: decimal_header.precision,
233 scale: decimal_header.scale,
234 is_256: match decimal_header.arrow_type {
235 0 => false,
236 1 => true,
237 _ => panic!(
238 "unsupported decimal type code: {}",
239 decimal_header.arrow_type
240 ),
241 },
242 };
243
244 let ref_start = header_size;
245 let ref_end = ref_start + size_of::<u64>();
246 let reference_value = u64::from_le_bytes(bytes[ref_start..ref_end].try_into().unwrap());
247
248 let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
249 let bit_packed = BitPackedArray::<UInt64Type>::from_bytes(bit_packed_data);
250
251 Self {
252 meta,
253 bit_packed,
254 reference_value,
255 }
256 }
257}
258
259impl LiquidArray for LiquidDecimalArray {
260 fn as_any(&self) -> &dyn Any {
261 self
262 }
263
264 fn get_array_memory_size(&self) -> usize {
265 self.bit_packed.get_array_memory_size() + size_of::<u64>() + size_of::<DecimalMeta>()
266 }
267
268 fn len(&self) -> usize {
269 self.bit_packed.len()
270 }
271
272 fn to_arrow_array(&self) -> ArrayRef {
273 let u64_array = self.to_u64_array();
274 let (_data_type, values, nulls) = u64_array.into_parts();
275 let data_type = self.meta.data_type();
276 if self.meta.is_256 {
277 let values_i256 =
278 ScalarBuffer::from_iter(values.iter().map(|v| i256::from_i128(*v as i128)));
279 let array = PrimitiveArray::<Decimal256Type>::new(values_i256, nulls);
280 Arc::new(array.with_data_type(data_type))
281 } else {
282 let values_i128 = ScalarBuffer::from_iter(values.iter().map(|v| *v as i128));
283 let array = PrimitiveArray::<Decimal128Type>::new(values_i128, nulls);
284 Arc::new(array.with_data_type(data_type))
285 }
286 }
287
288 fn original_arrow_data_type(&self) -> DataType {
289 self.meta.data_type()
290 }
291
292 fn to_bytes(&self) -> Vec<u8> {
293 self.to_bytes_inner()
294 }
295
296 fn data_type(&self) -> LiquidDataType {
297 LiquidDataType::Decimal
298 }
299
300 fn squeeze(
301 &self,
302 io: Arc<dyn SqueezeIoHandler>,
303 expression_hint: Option<&CacheExpression>,
304 ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
305 let _expression_hint = expression_hint?;
306 let full_bytes = Bytes::from(self.to_bytes_inner());
307 let disk_range = 0u64..(full_bytes.len() as u64);
308
309 let orig_bw = self.bit_packed.bit_width()?;
310 if orig_bw.get() < 8 {
311 return None;
312 }
313
314 let new_bw_u8 = NonZero::new((orig_bw.get() / 2).max(1)).unwrap();
315 let unsigned_array = self.bit_packed.to_primitive();
316 let (_dt, values, nulls) = unsigned_array.into_parts();
317
318 let max_offset = values.iter().copied().max().unwrap_or(0);
319 let bucket_count_u64 = 1u64 << (new_bw_u8.get() as u64);
320 let range_size = max_offset.saturating_add(1);
321 let bucket_width_u64 = (range_size.div_ceil(bucket_count_u64)).max(1);
322
323 let quantized_values: ScalarBuffer<u64> =
324 ScalarBuffer::from_iter(values.iter().map(|&v| {
325 let mut idx_u64 = v / bucket_width_u64;
326 if idx_u64 >= bucket_count_u64 {
327 idx_u64 = bucket_count_u64 - 1;
328 }
329 idx_u64
330 }));
331 let quantized_unsigned = PrimitiveArray::<UInt64Type>::new(quantized_values, nulls);
332 let quantized_bitpacked = BitPackedArray::from_primitive(quantized_unsigned, new_bw_u8);
333
334 let hybrid = LiquidDecimalQuantizedArray {
335 quantized: quantized_bitpacked,
336 reference_value: self.reference_value,
337 bucket_width: bucket_width_u64,
338 disk_range,
339 io,
340 meta: self.meta,
341 };
342 Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
343 }
344}
345
346#[derive(Debug, Clone)]
347pub(crate) struct LiquidDecimalQuantizedArray {
348 quantized: BitPackedArray<UInt64Type>,
349 reference_value: u64,
350 bucket_width: u64,
351 disk_range: std::ops::Range<u64>,
352 io: Arc<dyn SqueezeIoHandler>,
353 meta: DecimalMeta,
354}
355
356impl LiquidDecimalQuantizedArray {
357 fn len(&self) -> usize {
358 self.quantized.len()
359 }
360
361 fn new_from_filtered(&self, filtered: PrimitiveArray<UInt64Type>) -> Self {
362 let bit_width = self
363 .quantized
364 .bit_width()
365 .expect("quantized bit width must exist");
366 let quantized = BitPackedArray::from_primitive(filtered, bit_width);
367 Self {
368 quantized,
369 reference_value: self.reference_value,
370 bucket_width: self.bucket_width,
371 disk_range: self.disk_range.clone(),
372 io: self.io.clone(),
373 meta: self.meta,
374 }
375 }
376
377 fn filter_inner(&self, selection: &BooleanBuffer) -> Self {
378 let q_prim: PrimitiveArray<UInt64Type> = self.quantized.to_primitive();
379 let selection = BooleanArray::new(selection.clone(), None);
380 let filtered = arrow::compute::kernels::filter::filter(&q_prim, &selection).unwrap();
381 let filtered = filtered.as_primitive::<UInt64Type>().clone();
382 self.new_from_filtered(filtered)
383 }
384
385 async fn hydrate_full_arrow(&self) -> ArrayRef {
386 let bytes = self
387 .io
388 .read(Some(self.disk_range.clone()))
389 .await
390 .expect("read squeezed backing");
391 let liquid = crate::liquid_array::ipc::read_from_bytes(
392 bytes,
393 &crate::liquid_array::ipc::LiquidIPCContext::new(None),
394 );
395 liquid.to_arrow_array()
396 }
397
398 fn literal_to_u64(&self, literal: &Literal) -> Option<u64> {
399 match literal.value() {
400 ScalarValue::Decimal128(Some(v), _precision, scale) => {
401 if *scale != self.meta.scale {
402 return None;
403 }
404 v.to_u64()
405 }
406 ScalarValue::Decimal256(Some(v), _precision, scale) => {
407 if *scale != self.meta.scale {
408 return None;
409 }
410 v.to_u64()
411 }
412 _ => None,
413 }
414 }
415
416 fn try_eval_predicate_inner(
417 &self,
418 op: &Operator,
419 literal: &Literal,
420 ) -> SqueezeResult<Option<BooleanArray>> {
421 let k = match self.literal_to_u64(literal) {
422 Some(k) => k,
423 None => return Ok(None),
424 };
425
426 let q_prim = self.quantized.to_primitive();
427 let (_dt, values, _nulls) = q_prim.into_parts();
428 let nulls_opt = self.quantized.nulls();
429
430 let mut out_vals: Vec<bool> = Vec::with_capacity(values.len());
431
432 let push_const_for_below = |op: &Operator| -> bool {
433 match op {
434 Operator::Eq => false,
435 Operator::NotEq => true,
436 Operator::Lt => false,
437 Operator::LtEq => false,
438 Operator::Gt => true,
439 Operator::GtEq => true,
440 }
441 };
442
443 if k < self.reference_value {
444 let const_val = push_const_for_below(op);
445 if let Some(n) = nulls_opt {
446 for (i, _b) in values.iter().enumerate() {
447 out_vals.push(n.is_valid(i) && const_val);
448 }
449 } else {
450 out_vals.resize(values.len(), const_val);
451 }
452 } else {
453 let rel = k - self.reference_value;
454 let bw = self.bucket_width;
455 let q = rel / bw;
456 let r = rel % bw;
457
458 let less_side: bool = matches!(
459 op,
460 Operator::Eq | Operator::NotEq | Operator::Lt | Operator::LtEq
461 );
462 let greater_side: bool = matches!(op, Operator::NotEq | Operator::Gt | Operator::GtEq);
463 let on_equal_bucket = |r: u64, bw: u64| -> Option<bool> {
464 match op {
465 Operator::Eq | Operator::NotEq => None,
466 Operator::Lt => (r == 0).then_some(false),
467 Operator::LtEq => (r + 1 == bw).then_some(true),
468 Operator::Gt => (r + 1 == bw).then_some(false),
469 Operator::GtEq => (r == 0).then_some(true),
470 }
471 };
472
473 if let Some(n) = nulls_opt {
474 for (i, &b) in values.iter().enumerate() {
475 if !n.is_valid(i) {
476 out_vals.push(false);
477 continue;
478 }
479 let v = if b < q {
480 less_side
481 } else if b > q {
482 greater_side
483 } else {
484 match on_equal_bucket(r, bw) {
485 Some(val) => val,
486 None => return Err(NeedsBacking),
487 }
488 };
489 out_vals.push(v);
490 }
491 } else {
492 for &b in values.iter() {
493 let v = if b < q {
494 less_side
495 } else if b > q {
496 greater_side
497 } else {
498 match on_equal_bucket(r, bw) {
499 Some(val) => val,
500 None => return Err(NeedsBacking),
501 }
502 };
503 out_vals.push(v);
504 }
505 }
506 }
507
508 let bool_buf = BooleanBuffer::from_iter(out_vals);
509 let out = BooleanArray::new(bool_buf, self.quantized.nulls().cloned());
510 Ok(Some(out))
511 }
512}
513
514#[async_trait::async_trait]
515impl LiquidSqueezedArray for LiquidDecimalQuantizedArray {
516 fn as_any(&self) -> &dyn Any {
517 self
518 }
519
520 fn get_array_memory_size(&self) -> usize {
521 self.quantized.get_array_memory_size() + size_of::<u64>() + size_of::<DecimalMeta>()
522 }
523
524 fn len(&self) -> usize {
525 LiquidDecimalQuantizedArray::len(self)
526 }
527
528 async fn to_arrow_array(&self) -> ArrayRef {
529 self.hydrate_full_arrow().await
530 }
531
532 fn data_type(&self) -> LiquidDataType {
533 LiquidDataType::Decimal
534 }
535
536 fn original_arrow_data_type(&self) -> DataType {
537 self.meta.data_type()
538 }
539
540 fn disk_backing(&self) -> SqueezedBacking {
541 SqueezedBacking::Liquid((self.disk_range.end - self.disk_range.start) as usize)
542 }
543
544 async fn try_eval_predicate(
545 &self,
546 liquid_expr: &LiquidExpr,
547 filter: &BooleanBuffer,
548 ) -> BooleanArray {
549 let filtered = self.filter_inner(filter);
550
551 let expr = if let Some(expr) = unwrap_dynamic_filter(liquid_expr.physical_expr()) {
552 expr
553 } else {
554 let fallback = self.filter(filter).await;
555 return eval_predicate_on_array(fallback, liquid_expr);
556 };
557 let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>() else {
558 let fallback = self.filter(filter).await;
559 return eval_predicate_on_array(fallback, liquid_expr);
560 };
561 if binary_expr
562 .left()
563 .as_any()
564 .downcast_ref::<Column>()
565 .is_none()
566 {
567 let fallback = self.filter(filter).await;
568 return eval_predicate_on_array(fallback, liquid_expr);
569 }
570
571 let Some(literal) = binary_expr.right().as_any().downcast_ref::<Literal>() else {
572 let fallback = self.filter(filter).await;
573 return eval_predicate_on_array(fallback, liquid_expr);
574 };
575
576 let Some(op) = Operator::from_datafusion(binary_expr.op()) else {
577 let fallback = self.filter(filter).await;
578 return eval_predicate_on_array(fallback, liquid_expr);
579 };
580 match filtered.try_eval_predicate_inner(&op, literal) {
581 Ok(Some(mask)) => {
582 self.io.trace_io_saved();
583 return mask;
584 }
585 Ok(None) => {
586 let fallback = self.filter(filter).await;
587 return eval_predicate_on_array(fallback, liquid_expr);
588 }
589 Err(NeedsBacking) => {}
590 }
591
592 use arrow::array::cast::AsArray;
593
594 let full = self.hydrate_full_arrow().await;
595 let selection_array = BooleanArray::new(filter.clone(), None);
596 let filtered_arr = arrow::compute::filter(&full, &selection_array)
597 .expect("selection must match array length");
598 let filtered_len = filtered_arr.len();
599
600 let lhs = ColumnarValue::Array(filtered_arr);
601 let rhs = ColumnarValue::Scalar(literal.value().clone());
602 let result = match binary_expr.op() {
603 DFOperator::NotEq => apply_cmp(DFOperator::NotEq, &lhs, &rhs),
604 DFOperator::Eq => apply_cmp(DFOperator::Eq, &lhs, &rhs),
605 DFOperator::Lt => apply_cmp(DFOperator::Lt, &lhs, &rhs),
606 DFOperator::LtEq => apply_cmp(DFOperator::LtEq, &lhs, &rhs),
607 DFOperator::Gt => apply_cmp(DFOperator::Gt, &lhs, &rhs),
608 DFOperator::GtEq => apply_cmp(DFOperator::GtEq, &lhs, &rhs),
609 _ => {
610 let fallback = self.filter(filter).await;
611 return eval_predicate_on_array(fallback, liquid_expr);
612 }
613 };
614 let result = result.expect("validated LiquidExpr comparison must evaluate");
615 result
616 .into_array(filtered_len)
617 .expect("comparison output must be an array")
618 .as_boolean()
619 .clone()
620 }
621}
622
623fn unwrap_dynamic_filter(expr: &Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>> {
624 if let Some(dynamic_filter) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
625 dynamic_filter.current().ok()
626 } else {
627 Some(expr.clone())
628 }
629}
630
631#[cfg(test)]
632mod tests {
633 use super::*;
634 use crate::cache::{CacheExpression, TestSqueezeIo};
635 use arrow::array::Decimal128Builder;
636 use arrow::buffer::BooleanBuffer;
637 use datafusion_common::ScalarValue;
638 use datafusion_expr_common::operator::Operator as DFOperator;
639 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
640 use futures::executor::block_on;
641 use std::sync::Arc;
642
643 #[test]
644 fn decimal_u64_roundtrip() {
645 let mut builder = Decimal128Builder::new();
646 builder.append_value(100_i128);
647 builder.append_null();
648 builder.append_value(250_i128);
649 let original = builder.finish().with_precision_and_scale(10, 2).unwrap();
650
651 let liquid = LiquidDecimalArray::from_decimal_array(&original);
652 let arrow = liquid.to_arrow_array();
653 assert_eq!(arrow.as_ref(), &original);
654 }
655
656 #[test]
657 fn decimal_u64_ipc_roundtrip() {
658 let mut builder = Decimal128Builder::new();
659 builder.append_value(12345_i128);
660 builder.append_value(67890_i128);
661 let original = builder.finish().with_precision_and_scale(12, 3).unwrap();
662
663 let liquid = LiquidDecimalArray::from_decimal_array(&original);
664 let bytes = liquid.to_bytes();
665 let decoded = LiquidDecimalArray::from_bytes(bytes.into());
666 let arrow = decoded.to_arrow_array();
667 assert_eq!(arrow.as_ref(), &original);
668 }
669
670 #[test]
671 fn decimal_quantized_predicate_eval() {
672 let mut builder = Decimal128Builder::new();
673 builder.append_value(100_i128);
674 builder.append_value(200_i128);
675 builder.append_null();
676 builder.append_value(300_i128);
677 let original = builder.finish().with_precision_and_scale(10, 2).unwrap();
678
679 let liquid = LiquidDecimalArray::from_decimal_array(&original);
680 let hint = CacheExpression::PredicateColumn;
681 let io = Arc::new(TestSqueezeIo::default());
682 let (hybrid, bytes) = liquid.squeeze(io.clone(), Some(&hint)).expect("squeezable");
683 io.set_bytes(bytes);
684
685 let mask = BooleanBuffer::new_set(original.len());
686 let lit = Arc::new(Literal::new(ScalarValue::Decimal128(Some(100_i128), 10, 2)));
687 let col = Arc::new(Column::new("col", 0));
688 let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(col, DFOperator::GtEq, lit));
689
690 let got = block_on(hybrid.try_eval_predicate(
691 &crate::cache::LiquidExpr::new_unchecked(expr.clone()),
692 &mask,
693 ));
694 let expected = BooleanArray::from(vec![Some(true), Some(true), None, Some(true)]);
695 assert_eq!(got, expected);
696 assert_eq!(io.reads(), 0);
697 }
698}