1use std::any::Any;
2use std::mem::size_of;
3use std::num::NonZero;
4use std::sync::Arc;
5
6use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray};
7use arrow::buffer::{BooleanBuffer, ScalarBuffer};
8use arrow::datatypes::{Decimal128Type, Decimal256Type, DecimalType, UInt64Type, i256};
9use arrow_schema::DataType;
10use bytes::Bytes;
11use datafusion::physical_plan::PhysicalExpr;
12use datafusion::physical_plan::expressions::{
13 BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal,
14};
15use num_traits::ToPrimitive;
16
17use super::{
18 LiquidArray, LiquidDataType, LiquidSqueezedArray, LiquidSqueezedArrayRef, NeedsBacking,
19 Operator, SqueezeIoHandler, SqueezeResult,
20};
21use crate::cache::CacheExpression;
22use crate::liquid_array::ipc::{LiquidIPCHeader, get_physical_type_id};
23use crate::liquid_array::raw::BitPackedArray;
24use crate::utils::get_bit_width;
25
/// Logical Arrow decimal type information kept next to the packed values.
///
/// Records just enough to rebuild the original `DataType::Decimal128` or
/// `DataType::Decimal256` of the source array.
#[derive(Debug, Clone, Copy)]
struct DecimalMeta {
    /// Precision of the original Arrow decimal type.
    precision: u8,
    /// Scale of the original Arrow decimal type.
    scale: i8,
    /// `true` when the original type is `Decimal256`, `false` for `Decimal128`.
    is_256: bool,
}
32
33impl DecimalMeta {
34 fn from_data_type(data_type: &DataType) -> Self {
35 match data_type {
36 DataType::Decimal128(precision, scale) => Self {
37 precision: *precision,
38 scale: *scale,
39 is_256: false,
40 },
41 DataType::Decimal256(precision, scale) => Self {
42 precision: *precision,
43 scale: *scale,
44 is_256: true,
45 },
46 _ => panic!("unsupported decimal data type: {data_type:?}"),
47 }
48 }
49
50 fn data_type(&self) -> DataType {
51 if self.is_256 {
52 DataType::Decimal256(self.precision, self.scale)
53 } else {
54 DataType::Decimal128(self.precision, self.scale)
55 }
56 }
57
58 fn arrow_code(&self) -> u8 {
59 if self.is_256 { 1 } else { 0 }
60 }
61}
62
/// Decimal-specific header that follows the generic IPC header in the
/// serialized form of a decimal array.
///
/// With `#[repr(C)]` the fields occupy exactly eight bytes.
#[repr(C)]
struct DecimalArrayHeader {
    /// Decimal width code (see `DecimalMeta::arrow_code`).
    arrow_type: u8,
    /// Decimal precision.
    precision: u8,
    /// Decimal scale.
    scale: i8,
    /// Unused; always written as zero.
    __padding: u8,
    /// Unused; always written as zero.
    __reserved: u32,
}
71
72impl DecimalArrayHeader {
73 const fn size() -> usize {
74 8
75 }
76
77 fn from_meta(meta: DecimalMeta) -> Self {
78 Self {
79 arrow_type: meta.arrow_code(),
80 precision: meta.precision,
81 scale: meta.scale,
82 __padding: 0,
83 __reserved: 0,
84 }
85 }
86
87 fn to_bytes(&self) -> [u8; Self::size()] {
88 let mut bytes = [0; Self::size()];
89 bytes[0] = self.arrow_type;
90 bytes[1] = self.precision;
91 bytes[2] = self.scale as u8;
92 bytes
93 }
94
95 fn from_bytes(bytes: &[u8]) -> Self {
96 if bytes.len() < Self::size() {
97 panic!(
98 "value too small for DecimalArrayHeader, expected at least {} bytes, got {}",
99 Self::size(),
100 bytes.len()
101 );
102 }
103 Self {
104 arrow_type: bytes[0],
105 precision: bytes[1],
106 scale: bytes[2] as i8,
107 __padding: 0,
108 __reserved: 0,
109 }
110 }
111}
112
113#[derive(Debug)]
115pub struct LiquidDecimalArray {
116 meta: DecimalMeta,
117 bit_packed: BitPackedArray<UInt64Type>,
118 reference_value: u64,
119}
120
121impl LiquidDecimalArray {
122 pub(crate) fn fits_u64<T: DecimalType>(array: &PrimitiveArray<T>) -> bool
123 where
124 T::Native: ToPrimitive,
125 {
126 array.iter().flatten().all(|v| v.to_u64().is_some())
127 }
128
129 pub(crate) fn from_decimal_array<T: DecimalType>(array: &PrimitiveArray<T>) -> Self
130 where
131 T::Native: ToPrimitive,
132 {
133 debug_assert!(Self::fits_u64(array));
134 let meta = DecimalMeta::from_data_type(array.data_type());
135 if array.null_count() == array.len() {
136 return Self {
137 meta,
138 bit_packed: BitPackedArray::new_null_array(array.len()),
139 reference_value: 0,
140 };
141 }
142
143 let nulls = array.nulls().cloned();
144 let mut min = u64::MAX;
145 let mut max = 0u64;
146 let values: Vec<u64> = array
147 .iter()
148 .map(|v| match v {
149 Some(v) => {
150 let value = v.to_u64().expect("decimal fits u64");
151 if value < min {
152 min = value;
153 }
154 if value > max {
155 max = value;
156 }
157 value
158 }
159 None => 0,
160 })
161 .collect();
162
163 let bit_width = get_bit_width(max - min);
164 let offsets = ScalarBuffer::from_iter(values.iter().map(|v| v.saturating_sub(min)));
165 let unsigned_array = PrimitiveArray::<UInt64Type>::new(offsets, nulls);
166 let bit_packed = BitPackedArray::from_primitive(unsigned_array, bit_width);
167
168 Self {
169 meta,
170 bit_packed,
171 reference_value: min,
172 }
173 }
174
175 fn bit_pack_starting_loc() -> usize {
176 let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
177 (header_size + size_of::<u64>() + 7) & !7
178 }
179
180 fn to_u64_array(&self) -> PrimitiveArray<UInt64Type> {
181 let unsigned_array = self.bit_packed.to_primitive();
182 let (_data_type, values, _nulls) = unsigned_array.into_parts();
183 let nulls = self.bit_packed.nulls();
184 let values = if self.reference_value != 0 {
185 let reference_value = self.reference_value;
186 ScalarBuffer::from_iter(values.iter().map(|v| v.wrapping_add(reference_value)))
187 } else {
188 values
189 };
190 PrimitiveArray::<UInt64Type>::new(values, nulls.cloned())
191 }
192
193 pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
194 let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
195 let mut result = Vec::with_capacity(Self::bit_pack_starting_loc() + 256);
196 result.resize(header_size, 0);
197
198 let logical_type_id = LiquidDataType::Decimal as u16;
199 let physical_type_id = get_physical_type_id::<UInt64Type>();
200 let ipc_header = LiquidIPCHeader::new(logical_type_id, physical_type_id);
201 result[0..LiquidIPCHeader::size()].copy_from_slice(&ipc_header.to_bytes());
202
203 let decimal_header = DecimalArrayHeader::from_meta(self.meta);
204 result[LiquidIPCHeader::size()..header_size].copy_from_slice(&decimal_header.to_bytes());
205
206 result.extend_from_slice(&self.reference_value.to_le_bytes());
207 while result.len() < Self::bit_pack_starting_loc() {
208 result.push(0);
209 }
210 self.bit_packed.to_bytes(&mut result);
211 result
212 }
213
214 pub(crate) fn from_bytes(bytes: Bytes) -> Self {
215 let header_size = LiquidIPCHeader::size() + DecimalArrayHeader::size();
216 let header = LiquidIPCHeader::from_bytes(&bytes);
217
218 assert_eq!(header.logical_type_id, LiquidDataType::Decimal as u16);
219 assert_eq!(
220 header.physical_type_id,
221 get_physical_type_id::<UInt64Type>()
222 );
223
224 let decimal_header =
225 DecimalArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
226 let meta = DecimalMeta {
227 precision: decimal_header.precision,
228 scale: decimal_header.scale,
229 is_256: match decimal_header.arrow_type {
230 0 => false,
231 1 => true,
232 _ => panic!(
233 "unsupported decimal type code: {}",
234 decimal_header.arrow_type
235 ),
236 },
237 };
238
239 let ref_start = header_size;
240 let ref_end = ref_start + size_of::<u64>();
241 let reference_value = u64::from_le_bytes(bytes[ref_start..ref_end].try_into().unwrap());
242
243 let bit_packed_data = bytes.slice(Self::bit_pack_starting_loc()..);
244 let bit_packed = BitPackedArray::<UInt64Type>::from_bytes(bit_packed_data);
245
246 Self {
247 meta,
248 bit_packed,
249 reference_value,
250 }
251 }
252}
253
254impl LiquidArray for LiquidDecimalArray {
255 fn as_any(&self) -> &dyn Any {
256 self
257 }
258
259 fn get_array_memory_size(&self) -> usize {
260 self.bit_packed.get_array_memory_size() + size_of::<u64>() + size_of::<DecimalMeta>()
261 }
262
263 fn len(&self) -> usize {
264 self.bit_packed.len()
265 }
266
267 fn to_arrow_array(&self) -> ArrayRef {
268 let u64_array = self.to_u64_array();
269 let (_data_type, values, nulls) = u64_array.into_parts();
270 let data_type = self.meta.data_type();
271 if self.meta.is_256 {
272 let values_i256 =
273 ScalarBuffer::from_iter(values.iter().map(|v| i256::from_i128(*v as i128)));
274 let array = PrimitiveArray::<Decimal256Type>::new(values_i256, nulls);
275 Arc::new(array.with_data_type(data_type))
276 } else {
277 let values_i128 = ScalarBuffer::from_iter(values.iter().map(|v| *v as i128));
278 let array = PrimitiveArray::<Decimal128Type>::new(values_i128, nulls);
279 Arc::new(array.with_data_type(data_type))
280 }
281 }
282
283 fn original_arrow_data_type(&self) -> DataType {
284 self.meta.data_type()
285 }
286
287 fn to_bytes(&self) -> Vec<u8> {
288 self.to_bytes_inner()
289 }
290
291 fn data_type(&self) -> LiquidDataType {
292 LiquidDataType::Decimal
293 }
294
295 fn squeeze(
296 &self,
297 io: Arc<dyn SqueezeIoHandler>,
298 expression_hint: Option<&CacheExpression>,
299 ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
300 let _expression_hint = expression_hint?;
301 let full_bytes = Bytes::from(self.to_bytes_inner());
302 let disk_range = 0u64..(full_bytes.len() as u64);
303
304 let orig_bw = self.bit_packed.bit_width()?;
305 if orig_bw.get() < 8 {
306 return None;
307 }
308
309 let new_bw_u8 = NonZero::new((orig_bw.get() / 2).max(1)).unwrap();
310 let unsigned_array = self.bit_packed.to_primitive();
311 let (_dt, values, nulls) = unsigned_array.into_parts();
312
313 let max_offset = values.iter().copied().max().unwrap_or(0);
314 let bucket_count_u64 = 1u64 << (new_bw_u8.get() as u64);
315 let range_size = max_offset.saturating_add(1);
316 let bucket_width_u64 = (range_size.div_ceil(bucket_count_u64)).max(1);
317
318 let quantized_values: ScalarBuffer<u64> =
319 ScalarBuffer::from_iter(values.iter().map(|&v| {
320 let mut idx_u64 = v / bucket_width_u64;
321 if idx_u64 >= bucket_count_u64 {
322 idx_u64 = bucket_count_u64 - 1;
323 }
324 idx_u64
325 }));
326 let quantized_unsigned = PrimitiveArray::<UInt64Type>::new(quantized_values, nulls);
327 let quantized_bitpacked = BitPackedArray::from_primitive(quantized_unsigned, new_bw_u8);
328
329 let hybrid = LiquidDecimalQuantizedArray {
330 quantized: quantized_bitpacked,
331 reference_value: self.reference_value,
332 bucket_width: bucket_width_u64,
333 disk_range,
334 io,
335 meta: self.meta,
336 };
337 Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, full_bytes))
338 }
339}
340
341#[derive(Debug, Clone)]
342pub(crate) struct LiquidDecimalQuantizedArray {
343 quantized: BitPackedArray<UInt64Type>,
344 reference_value: u64,
345 bucket_width: u64,
346 disk_range: std::ops::Range<u64>,
347 io: Arc<dyn SqueezeIoHandler>,
348 meta: DecimalMeta,
349}
350
351impl LiquidDecimalQuantizedArray {
352 fn len(&self) -> usize {
353 self.quantized.len()
354 }
355
356 fn new_from_filtered(&self, filtered: PrimitiveArray<UInt64Type>) -> Self {
357 let bit_width = self
358 .quantized
359 .bit_width()
360 .expect("quantized bit width must exist");
361 let quantized = BitPackedArray::from_primitive(filtered, bit_width);
362 Self {
363 quantized,
364 reference_value: self.reference_value,
365 bucket_width: self.bucket_width,
366 disk_range: self.disk_range.clone(),
367 io: self.io.clone(),
368 meta: self.meta,
369 }
370 }
371
372 fn filter_inner(&self, selection: &BooleanBuffer) -> Self {
373 let q_prim: PrimitiveArray<UInt64Type> = self.quantized.to_primitive();
374 let selection = BooleanArray::new(selection.clone(), None);
375 let filtered = arrow::compute::kernels::filter::filter(&q_prim, &selection).unwrap();
376 let filtered = filtered.as_primitive::<UInt64Type>().clone();
377 self.new_from_filtered(filtered)
378 }
379
380 async fn hydrate_full_arrow(&self) -> ArrayRef {
381 let bytes = self
382 .io
383 .read(Some(self.disk_range.clone()))
384 .await
385 .expect("read squeezed backing");
386 let liquid = crate::liquid_array::ipc::read_from_bytes(
387 bytes,
388 &crate::liquid_array::ipc::LiquidIPCContext::new(None),
389 );
390 liquid.to_arrow_array()
391 }
392
393 fn literal_to_u64(&self, literal: &Literal) -> Option<u64> {
394 use datafusion::common::ScalarValue;
395 match literal.value() {
396 ScalarValue::Decimal128(Some(v), _precision, scale) => {
397 if *scale != self.meta.scale {
398 return None;
399 }
400 v.to_u64()
401 }
402 ScalarValue::Decimal256(Some(v), _precision, scale) => {
403 if *scale != self.meta.scale {
404 return None;
405 }
406 v.to_u64()
407 }
408 _ => None,
409 }
410 }
411
412 fn try_eval_predicate_inner(
413 &self,
414 op: &Operator,
415 literal: &Literal,
416 ) -> SqueezeResult<Option<BooleanArray>> {
417 let k = match self.literal_to_u64(literal) {
418 Some(k) => k,
419 None => return Ok(None),
420 };
421
422 let q_prim = self.quantized.to_primitive();
423 let (_dt, values, _nulls) = q_prim.into_parts();
424 let nulls_opt = self.quantized.nulls();
425
426 let mut out_vals: Vec<bool> = Vec::with_capacity(values.len());
427
428 let push_const_for_below = |op: &Operator| -> bool {
429 match op {
430 Operator::Eq => false,
431 Operator::NotEq => true,
432 Operator::Lt => false,
433 Operator::LtEq => false,
434 Operator::Gt => true,
435 Operator::GtEq => true,
436 }
437 };
438
439 if k < self.reference_value {
440 let const_val = push_const_for_below(op);
441 if let Some(n) = nulls_opt {
442 for (i, _b) in values.iter().enumerate() {
443 out_vals.push(n.is_valid(i) && const_val);
444 }
445 } else {
446 out_vals.resize(values.len(), const_val);
447 }
448 } else {
449 let rel = k - self.reference_value;
450 let bw = self.bucket_width;
451 let q = rel / bw;
452 let r = rel % bw;
453
454 let less_side: bool = matches!(
455 op,
456 Operator::Eq | Operator::NotEq | Operator::Lt | Operator::LtEq
457 );
458 let greater_side: bool = matches!(op, Operator::NotEq | Operator::Gt | Operator::GtEq);
459 let on_equal_bucket = |r: u64, bw: u64| -> Option<bool> {
460 match op {
461 Operator::Eq | Operator::NotEq => None,
462 Operator::Lt => (r == 0).then_some(false),
463 Operator::LtEq => (r + 1 == bw).then_some(true),
464 Operator::Gt => (r + 1 == bw).then_some(false),
465 Operator::GtEq => (r == 0).then_some(true),
466 }
467 };
468
469 if let Some(n) = nulls_opt {
470 for (i, &b) in values.iter().enumerate() {
471 if !n.is_valid(i) {
472 out_vals.push(false);
473 continue;
474 }
475 let v = if b < q {
476 less_side
477 } else if b > q {
478 greater_side
479 } else {
480 match on_equal_bucket(r, bw) {
481 Some(val) => val,
482 None => return Err(NeedsBacking),
483 }
484 };
485 out_vals.push(v);
486 }
487 } else {
488 for &b in values.iter() {
489 let v = if b < q {
490 less_side
491 } else if b > q {
492 greater_side
493 } else {
494 match on_equal_bucket(r, bw) {
495 Some(val) => val,
496 None => return Err(NeedsBacking),
497 }
498 };
499 out_vals.push(v);
500 }
501 }
502 }
503
504 let bool_buf = BooleanBuffer::from_iter(out_vals);
505 let out = BooleanArray::new(bool_buf, self.quantized.nulls().cloned());
506 Ok(Some(out))
507 }
508}
509
510#[async_trait::async_trait]
511impl LiquidSqueezedArray for LiquidDecimalQuantizedArray {
512 fn as_any(&self) -> &dyn Any {
513 self
514 }
515
516 fn get_array_memory_size(&self) -> usize {
517 self.quantized.get_array_memory_size() + size_of::<u64>() + size_of::<DecimalMeta>()
518 }
519
520 fn len(&self) -> usize {
521 LiquidDecimalQuantizedArray::len(self)
522 }
523
524 async fn to_arrow_array(&self) -> ArrayRef {
525 self.hydrate_full_arrow().await
526 }
527
528 fn data_type(&self) -> LiquidDataType {
529 LiquidDataType::Decimal
530 }
531
532 fn original_arrow_data_type(&self) -> DataType {
533 self.meta.data_type()
534 }
535
536 async fn try_eval_predicate(
537 &self,
538 expr: &Arc<dyn PhysicalExpr>,
539 filter: &BooleanBuffer,
540 ) -> Option<BooleanArray> {
541 let filtered = self.filter_inner(filter);
542
543 let expr = unwrap_dynamic_filter(expr)?;
544 let binary_expr = expr.as_any().downcast_ref::<BinaryExpr>()?;
545 binary_expr.left().as_any().downcast_ref::<Column>()?;
546
547 let literal = binary_expr.right().as_any().downcast_ref::<Literal>()?;
548
549 let op = Operator::from_datafusion(binary_expr.op())?;
550 match filtered.try_eval_predicate_inner(&op, literal) {
551 Ok(Some(mask)) => {
552 self.io.trace_io_saved();
553 return Some(mask);
554 }
555 Ok(None) => return None,
556 Err(NeedsBacking) => {}
557 }
558
559 use arrow::array::cast::AsArray;
560 use datafusion::logical_expr::ColumnarValue;
561 use datafusion::physical_expr_common::datum::apply_cmp;
562
563 let full = self.hydrate_full_arrow().await;
564 let selection_array = BooleanArray::new(filter.clone(), None);
565 let filtered_arr = arrow::compute::filter(&full, &selection_array).ok()?;
566 let filtered_len = filtered_arr.len();
567
568 let lhs = ColumnarValue::Array(filtered_arr);
569 let rhs = ColumnarValue::Scalar(literal.value().clone());
570 let result = match binary_expr.op() {
571 datafusion::logical_expr::Operator::NotEq => {
572 apply_cmp(datafusion::logical_expr::Operator::NotEq, &lhs, &rhs)
573 }
574 datafusion::logical_expr::Operator::Eq => {
575 apply_cmp(datafusion::logical_expr::Operator::Eq, &lhs, &rhs)
576 }
577 datafusion::logical_expr::Operator::Lt => {
578 apply_cmp(datafusion::logical_expr::Operator::Lt, &lhs, &rhs)
579 }
580 datafusion::logical_expr::Operator::LtEq => {
581 apply_cmp(datafusion::logical_expr::Operator::LtEq, &lhs, &rhs)
582 }
583 datafusion::logical_expr::Operator::Gt => {
584 apply_cmp(datafusion::logical_expr::Operator::Gt, &lhs, &rhs)
585 }
586 datafusion::logical_expr::Operator::GtEq => {
587 apply_cmp(datafusion::logical_expr::Operator::GtEq, &lhs, &rhs)
588 }
589 _ => return None,
590 };
591 let result = result.ok()?;
592 Some(result.into_array(filtered_len).ok()?.as_boolean().clone())
593 }
594}
595
596fn unwrap_dynamic_filter(expr: &Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>> {
597 if let Some(dynamic_filter) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
598 dynamic_filter.current().ok()
599 } else {
600 Some(expr.clone())
601 }
602}
603
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{CacheExpression, TestSqueezeIo};
    use arrow::array::Decimal128Builder;
    use arrow::buffer::BooleanBuffer;
    use datafusion::logical_expr::Operator as DFOperator;
    use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
    use datafusion::scalar::ScalarValue;
    use futures::executor::block_on;
    use std::sync::Arc;

    /// Builds a `Decimal128` array with the given precision and scale;
    /// `None` entries become nulls.
    fn decimal128(
        values: &[Option<i128>],
        precision: u8,
        scale: i8,
    ) -> PrimitiveArray<Decimal128Type> {
        let mut builder = Decimal128Builder::new();
        for value in values {
            builder.append_option(*value);
        }
        builder
            .finish()
            .with_precision_and_scale(precision, scale)
            .unwrap()
    }

    /// Encoding and decoding in memory preserves values and nulls.
    #[test]
    fn decimal_u64_roundtrip() {
        let original = decimal128(&[Some(100), None, Some(250)], 10, 2);

        let liquid = LiquidDecimalArray::from_decimal_array(&original);
        let arrow = liquid.to_arrow_array();
        assert_eq!(arrow.as_ref(), &original);
    }

    /// Serializing to bytes and back preserves the array.
    #[test]
    fn decimal_u64_ipc_roundtrip() {
        let original = decimal128(&[Some(12345), Some(67890)], 12, 3);

        let liquid = LiquidDecimalArray::from_decimal_array(&original);
        let bytes = liquid.to_bytes();
        let decoded = LiquidDecimalArray::from_bytes(bytes.into());
        let arrow = decoded.to_arrow_array();
        assert_eq!(arrow.as_ref(), &original);
    }

    /// `col >= 100` is answered from the quantized data without any read.
    #[test]
    fn decimal_quantized_predicate_eval() {
        let original = decimal128(&[Some(100), Some(200), None, Some(300)], 10, 2);

        let liquid = LiquidDecimalArray::from_decimal_array(&original);
        let hint = CacheExpression::PredicateColumn;
        let io = Arc::new(TestSqueezeIo::default());
        let (hybrid, bytes) = liquid.squeeze(io.clone(), Some(&hint)).expect("squeezable");
        io.set_bytes(bytes);

        let mask = BooleanBuffer::new_set(original.len());
        let lit = Arc::new(Literal::new(ScalarValue::Decimal128(Some(100_i128), 10, 2)));
        let col = Arc::new(Column::new("col", 0));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(col, DFOperator::GtEq, lit));

        let got = block_on(hybrid.try_eval_predicate(&expr, &mask)).expect("supported");
        let expected = BooleanArray::from(vec![Some(true), Some(true), None, Some(true)]);
        assert_eq!(got, expected);
        assert_eq!(io.reads(), 0);
    }
}