1use std::any::Any;
2use std::fmt::Debug;
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use super::PrimitiveKind;
7use super::{LiquidArray, LiquidDataType, LiquidPrimitiveType};
8use crate::cache::LiquidExpr;
9use crate::liquid_array::LiquidPrimitiveArray;
10use crate::liquid_array::eval_predicate_on_array;
11use crate::liquid_array::ipc::{LiquidIPCHeader, get_physical_type_id};
12use arrow::array::{
13 Array, ArrayRef, ArrowPrimitiveType, BooleanArray, PrimitiveArray,
14 cast::AsArray,
15 types::{
16 Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type,
17 UInt32Type, UInt64Type,
18 },
19};
20use arrow::buffer::{BooleanBuffer, ScalarBuffer};
21use arrow::compute::kernels::filter;
22use arrow_schema::DataType;
23use bytes::Bytes;
24use num_traits::{AsPrimitive, Bounded, FromPrimitive};
25
/// Integer array encoded as a linear model plus one residual per row.
///
/// Row `i` is reconstructed from the model prediction `slope * i + intercept`
/// (rounded and saturated to the range of `T`) combined with `residuals[i]`.
#[derive(Debug)]
pub struct LiquidLinearArray<T: LiquidPrimitiveType>
where
    T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
{
    /// Per-row offsets from the model prediction; carries the input's null buffer.
    residuals: LiquidPrimitiveArray<Int64Type>,
    /// Model value at row 0.
    intercept: f64,
    /// Model increment per row.
    slope: f64,
    /// Ties the encoding to its logical Arrow type `T` without storing a `T`.
    _phantom: PhantomData<T>,
}
50
51pub type LiquidLinearI32Array = LiquidLinearArray<Int32Type>;
53pub type LiquidLinearI8Array = LiquidLinearArray<Int8Type>;
55pub type LiquidLinearI16Array = LiquidLinearArray<Int16Type>;
57pub type LiquidLinearI64Array = LiquidLinearArray<Int64Type>;
59pub type LiquidLinearU8Array = LiquidLinearArray<UInt8Type>;
61pub type LiquidLinearU16Array = LiquidLinearArray<UInt16Type>;
63pub type LiquidLinearU32Array = LiquidLinearArray<UInt32Type>;
65pub type LiquidLinearU64Array = LiquidLinearArray<UInt64Type>;
67pub type LiquidLinearDate32Array = LiquidLinearArray<Date32Type>;
69pub type LiquidLinearDate64Array = LiquidLinearArray<Date64Type>;
71
72impl<T> LiquidLinearArray<T>
73where
74 T: LiquidPrimitiveType,
75 T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
76{
77 pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> Self {
80 let len = arrow_array.len();
81
82 if arrow_array.null_count() == len {
84 let res = PrimitiveArray::<Int64Type>::new_null(len);
86 return Self {
87 residuals: LiquidPrimitiveArray::<Int64Type>::from_arrow_array(res),
88 intercept: 0.0, slope: 0.0,
90 _phantom: PhantomData,
91 };
92 }
93
94 let (nn_values, nn_indices) = collect_non_null_f64_and_indices::<T>(&arrow_array);
96
97 let (mut intercept, mut slope) = fit_linf(&nn_values, &nn_indices);
99
100 let mut residuals: Vec<i64> = Vec::with_capacity(len);
102 let is_unsigned = <T as PrimitiveKind>::IS_UNSIGNED;
103 let vals = arrow_array.values();
104 let nulls_opt = arrow_array.nulls();
105
106 let mut orig_min_u64 = u64::MAX;
108 let mut orig_max_u64 = 0u64;
109 let mut orig_min_i64 = i64::MAX;
110 let mut orig_max_i64 = i64::MIN;
111 let mut res_min = i64::MAX;
113 let mut res_max = i64::MIN;
114
115 if is_unsigned {
116 let max_u64: u64 = <T as PrimitiveKind>::MAX_U64;
117 for i in 0..len {
118 let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
119 if valid {
120 type U<TT> =
121 <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
122 let v_u: U<T> = vals[i].as_();
123 let v_u64: u64 = v_u.as_();
124 if v_u64 < orig_min_u64 {
125 orig_min_u64 = v_u64;
126 }
127 if v_u64 > orig_max_u64 {
128 orig_max_u64 = v_u64;
129 }
130 let pr = slope * (i as f64) + intercept;
131 let p = predict_u64_saturated(pr, max_u64);
132 let (pos, mag) = if v_u64 >= p {
133 (true, v_u64 - p)
134 } else {
135 (false, p - v_u64)
136 };
137 let m = (mag & (i64::MAX as u64)) as i64;
138 let r = if pos { m } else { -m };
139 if r < res_min {
140 res_min = r;
141 }
142 if r > res_max {
143 res_max = r;
144 }
145 residuals.push(r);
146 } else {
147 residuals.push(0);
148 }
149 }
150 } else {
151 let (min_i64, max_i64): (i64, i64) =
152 (<T as PrimitiveKind>::MIN_I64, <T as PrimitiveKind>::MAX_I64);
153 for i in 0..len {
154 let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
155 if valid {
156 let v_i64: i64 = vals[i].as_();
157 if v_i64 < orig_min_i64 {
158 orig_min_i64 = v_i64;
159 }
160 if v_i64 > orig_max_i64 {
161 orig_max_i64 = v_i64;
162 }
163 let pr = slope * (i as f64) + intercept;
164 let p = predict_i64_saturated(pr, min_i64, max_i64);
165 let r = v_i64 - p;
166 if r < res_min {
167 res_min = r;
168 }
169 if r > res_max {
170 res_max = r;
171 }
172 residuals.push(r);
173 } else {
174 residuals.push(0);
175 }
176 }
177 }
178
179 let res_width: u128 = (res_max as i128 - res_min as i128) as u128;
181 let orig_width: u128 = if is_unsigned {
182 (orig_max_u64 as u128).saturating_sub(orig_min_u64 as u128)
183 } else {
184 (orig_max_i64 as i128 - orig_min_i64 as i128) as u128
185 };
186 if res_width >= orig_width {
187 intercept = 0.0;
189 slope = 0.0;
190 residuals.clear();
191 if is_unsigned {
192 for i in 0..len {
193 let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
194 if valid {
195 type U<TT> = <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
196 let v_u: U<T> = vals[i].as_();
197 let v_u64: u64 = v_u.as_();
198 let r = (v_u64 & (i64::MAX as u64)) as i64;
199 residuals.push(r);
200 } else {
201 residuals.push(0);
202 }
203 }
204 } else {
205 for i in 0..len {
206 let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
207 if valid {
208 let v_i64: i64 = vals[i].as_();
209 residuals.push(v_i64);
210 } else {
211 residuals.push(0);
212 }
213 }
214 }
215 }
216 let residuals_buf: ScalarBuffer<i64> = ScalarBuffer::from(residuals);
217 let nulls = arrow_array.nulls().cloned();
218 let res_prim = PrimitiveArray::<Int64Type>::new(residuals_buf, nulls);
219 let residuals = LiquidPrimitiveArray::<Int64Type>::from_arrow_array(res_prim);
220
221 Self {
222 residuals,
223 intercept,
224 slope,
225 _phantom: PhantomData,
226 }
227 }
228
229 fn len(&self) -> usize {
230 self.residuals.len()
231 }
232
233 fn residual_starting_loc() -> usize {
234 let header_size =
236 LiquidIPCHeader::size() + std::mem::size_of::<f64>() + std::mem::size_of::<f64>();
237 (header_size + 7) & !7
238 }
239
240 fn to_bytes_inner(&self) -> Vec<u8> {
241 let header = LiquidIPCHeader::new(
242 LiquidDataType::LinearInteger as u16,
243 get_physical_type_id::<T>(),
244 );
245 let start = Self::residual_starting_loc();
246 let mut out = Vec::with_capacity(start + 256);
247
248 out.extend_from_slice(&header.to_bytes());
250 out.extend_from_slice(&self.intercept.to_le_bytes());
252 out.extend_from_slice(&self.slope.to_le_bytes());
253
254 while out.len() < start {
255 out.push(0);
256 }
257
258 out.extend_from_slice(&self.residuals.to_bytes_inner());
260 out
261 }
262
263 pub fn from_bytes(bytes: Bytes) -> Self {
265 let _hdr = LiquidIPCHeader::from_bytes(&bytes);
266
267 let intercept_off = LiquidIPCHeader::size();
269 let intercept =
270 f64::from_le_bytes(bytes[intercept_off..intercept_off + 8].try_into().unwrap());
271
272 let slope_off = intercept_off + std::mem::size_of::<f64>();
274 let slope = f64::from_le_bytes(bytes[slope_off..slope_off + 8].try_into().unwrap());
275
276 let start = Self::residual_starting_loc();
278 let res_bytes = bytes.slice(start..);
279 let residuals = LiquidPrimitiveArray::<Int64Type>::from_bytes(res_bytes);
280
281 Self {
282 residuals,
283 intercept,
284 slope,
285 _phantom: PhantomData,
286 }
287 }
288}
289
290impl<T> LiquidArray for LiquidLinearArray<T>
291where
292 T: LiquidPrimitiveType,
293 T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
294{
295 fn as_any(&self) -> &dyn Any {
296 self
297 }
298
299 fn original_arrow_data_type(&self) -> DataType {
300 T::DATA_TYPE.clone()
301 }
302
303 fn get_array_memory_size(&self) -> usize {
304 self.residuals.get_array_memory_size()
305 + std::mem::size_of::<f64>() + std::mem::size_of::<f64>() }
308
309 fn len(&self) -> usize {
310 self.len()
311 }
312
313 fn to_arrow_array(&self) -> ArrayRef {
314 let arr = self.residuals.to_arrow_array();
315 let (_dt, residuals, nulls) = arr.as_primitive::<Int64Type>().clone().into_parts();
316
317 let mut final_values = Vec::<T::Native>::with_capacity(self.len());
319 let is_unsigned = <T as PrimitiveKind>::IS_UNSIGNED;
320 if is_unsigned {
321 let max_u64: u64 = <T as PrimitiveKind>::MAX_U64;
322 for (i, &e) in residuals.iter().enumerate() {
323 let pr = self.slope * (i as f64) + self.intercept;
324 let p = predict_u64_saturated(pr, max_u64);
325 let mag = e.unsigned_abs();
326 let sum = if e >= 0 {
327 p.saturating_add(mag)
328 } else {
329 p.saturating_sub(mag)
330 };
331 final_values.push(T::Native::from_u64(sum).unwrap());
332 }
333 } else {
334 let (min_i64, max_i64): (i64, i64) =
335 (<T as PrimitiveKind>::MIN_I64, <T as PrimitiveKind>::MAX_I64);
336 for (i, &e) in residuals.iter().enumerate() {
337 let pr = self.slope * (i as f64) + self.intercept;
338 let p = predict_i64_saturated(pr, min_i64, max_i64);
339 let sum = p.saturating_add(e);
340 final_values.push(T::Native::from_i64(sum).unwrap());
341 }
342 }
343
344 let values_buf: ScalarBuffer<T::Native> = ScalarBuffer::from(final_values);
345 Arc::new(PrimitiveArray::<T>::new(values_buf, nulls))
346 }
347
348 fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
349 let arr = self.to_arrow_array();
350 let selection = BooleanArray::new(selection.clone(), None);
351 filter::filter(&arr, &selection).unwrap()
352 }
353
354 fn try_eval_predicate(&self, predicate: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
355 let arr = self.filter(filter);
356 eval_predicate_on_array(arr, predicate)
357 }
358
359 fn to_bytes(&self) -> Vec<u8> {
360 self.to_bytes_inner()
361 }
362
363 fn data_type(&self) -> LiquidDataType {
364 LiquidDataType::LinearInteger
365 }
366}
367
/// Rounds a model prediction to the nearest `u64`, clamped to `[0, max_u64]`.
///
/// NaN, infinities and non-positive predictions map to `0`. Predictions at or
/// above `max_u64` map to `max_u64`. Everything else is rounded half away from
/// zero (`f64::round`).
#[inline]
fn predict_u64_saturated(pred: f64, max_u64: u64) -> u64 {
    let ceiling = max_u64 as f64;
    match pred {
        p if p.is_nan() || p.is_infinite() || p <= 0.0 => 0,
        p if p >= ceiling => max_u64,
        p => p.round() as u64,
    }
}
378
/// Rounds a model prediction to the nearest `i64`, clamped to `[min_i64, max_i64]`.
///
/// NaN and infinities map to `0`. Values at or beyond either bound map to
/// that bound. Everything else is rounded half away from zero (`f64::round`).
#[inline]
fn predict_i64_saturated(pred: f64, min_i64: i64, max_i64: i64) -> i64 {
    if !pred.is_finite() {
        return 0;
    }
    let (floor, ceiling) = (min_i64 as f64, max_i64 as f64);
    match pred {
        p if p <= floor => min_i64,
        p if p >= ceiling => max_i64,
        p => p.round() as i64,
    }
}
391
/// Approximately fits a minimax (L∞) line `value ≈ slope * index + intercept`.
///
/// `values[k]` is observed at row `idxs[k]`. `idxs` is expected to be
/// non-decreasing, since consecutive entries are subtracted as `u32`.
/// Returns `(intercept, slope)`.
///
/// The slope is bisected between the smallest and largest slope of
/// consecutive observations. For a candidate slope `m`, let
/// `shift = value - m * index`. The ordering of the row holding the smallest
/// shift against the row holding the largest shift tells which side of the
/// optimum `m` lies on. At most 8 bisection steps run, so the slope is an
/// approximation. The intercept is the midpoint of the final minimum and
/// maximum shift.
///
/// # Panics
/// If `values` and `idxs` have different lengths.
fn fit_linf(values: &[f64], idxs: &[u32]) -> (f64, f64) {
    assert_eq!(values.len(), idxs.len());
    match values.len() {
        0 => return (0.0, 0.0),
        1 => return (values[0], 0.0),
        _ => {}
    }

    // Smallest and largest slope between consecutive observations.
    let (mut slope_min, mut slope_max) = idxs.windows(2).zip(values.windows(2)).fold(
        (f64::INFINITY, f64::NEG_INFINITY),
        |(low, high), (ix, vx)| {
            let di = (ix[1] - ix[0]) as f64;
            if di > 0.0 {
                let s = (vx[1] - vx[0]) / di;
                (if s < low { s } else { low }, if s > high { s } else { high })
            } else {
                (low, high)
            }
        },
    );
    if !(slope_min.is_finite() && slope_max.is_finite()) {
        slope_min = 0.0;
        slope_max = 0.0;
    }

    let mut lo = slope_min.min(slope_max);
    let mut hi = slope_min.max(slope_max);
    // Widen a degenerate bracket so the bisection has room to move.
    if (hi - lo).abs() < 1e-12 {
        let pad = if hi.abs() < 1.0 { 1.0 } else { hi.abs() * 1e-6 };
        lo -= pad;
        hi += pad;
    }

    // ((min shift, its row), (max shift, its row)) for slope `m`.
    // Ties keep the first occurrence.
    let shift_extremes = |m: f64| {
        let mut lowest = (f64::INFINITY, 0u32);
        let mut highest = (f64::NEG_INFINITY, 0u32);
        for (&v, &row) in values.iter().zip(idxs) {
            let shift = v - m * row as f64;
            if shift < lowest.0 {
                lowest = (shift, row);
            }
            if shift > highest.0 {
                highest = (shift, row);
            }
        }
        (lowest, highest)
    };

    const MAX_ITERS: usize = 8;
    for _ in 0..MAX_ITERS {
        let mid = 0.5 * (lo + hi);
        let ((_, row_min), (_, row_max)) = shift_extremes(mid);
        match row_min.cmp(&row_max) {
            std::cmp::Ordering::Greater => hi = mid,
            std::cmp::Ordering::Less => lo = mid,
            std::cmp::Ordering::Equal => {
                lo = mid;
                hi = mid;
                break;
            }
        }
        if (hi - lo).abs() < 1e-12 {
            break;
        }
    }

    let slope = 0.5 * (lo + hi);
    let ((min_shift, _), (max_shift, _)) = shift_extremes(slope);
    (0.5 * (max_shift + min_shift), slope)
}
482
483#[inline]
484fn collect_non_null_f64_and_indices<T>(arr: &PrimitiveArray<T>) -> (Vec<f64>, Vec<u32>)
485where
486 T: LiquidPrimitiveType,
487 T::Native: AsPrimitive<f64>,
488{
489 let nn = arr.len() - arr.null_count();
490 let mut values = Vec::with_capacity(nn);
491 let mut idxs = Vec::with_capacity(nn);
492 let vals = arr.values();
493 if arr.null_count() == 0 {
494 for (i, v) in vals.iter().enumerate() {
495 values.push(v.as_());
496 idxs.push(i as u32);
497 }
498 } else {
499 let nulls = arr.nulls().unwrap();
500 for (i, v) in vals.iter().enumerate() {
501 if nulls.is_valid(i) {
502 values.push(v.as_());
503 idxs.push(i as u32);
504 }
505 }
506 }
507 (values, idxs)
508}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `$values` as `LiquidLinearArray<$T>` and checks the decoded array.
    ///
    /// Decoding must reproduce the input, both directly and after a
    /// `to_bytes` / `from_bytes` round trip.
    macro_rules! roundtrip_eq_t {
        ($T:ty, $values:expr) => {{
            let expected = PrimitiveArray::<$T>::from(($values).clone());
            let encoded = LiquidLinearArray::<$T>::from_arrow_array(expected.clone());
            assert_eq!(encoded.to_arrow_array().as_ref(), &expected);

            let restored = LiquidLinearArray::<$T>::from_bytes(Bytes::from(encoded.to_bytes()));
            assert_eq!(restored.to_arrow_array().as_ref(), &expected);
        }};
    }

    /// `Int32` shorthand for `roundtrip_eq_t!`.
    fn roundtrip_eq(values: Vec<Option<i32>>) {
        roundtrip_eq_t!(Int32Type, values);
    }

    #[test]
    fn test_roundtrip_basic() {
        let noisy_ramp = [10, 15, 14, 20, 18, 25, 24].map(Some).to_vec();
        roundtrip_eq(noisy_ramp);
    }

    #[test]
    fn test_roundtrip_with_nulls() {
        roundtrip_eq(vec![Some(10), None, Some(30), None, Some(50), Some(70)]);
    }

    #[test]
    fn test_all_nulls() {
        roundtrip_eq(vec![None; 4]);
    }

    #[test]
    fn test_single_value() {
        roundtrip_eq(vec![Some(42)]);
    }

    #[test]
    fn test_empty() {
        roundtrip_eq(Vec::new());
    }

    #[test]
    fn test_negative_values() {
        let mixed = vec![
            Some(-100),
            Some(-50),
            Some(0),
            Some(50),
            Some(25),
            None,
            Some(-25),
        ];
        roundtrip_eq(mixed);
    }

    #[test]
    fn test_filter_basic() {
        let input = PrimitiveArray::<Int32Type>::from(vec![
            Some(1),
            Some(2),
            Some(3),
            None,
            Some(5),
            Some(8),
        ]);
        let linear = LiquidLinearI32Array::from_arrow_array(input);

        let keep_even_rows = BooleanBuffer::from_iter((0..6).map(|row| row % 2 == 0));
        let filtered = linear.filter(&keep_even_rows);

        let expected = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(3), Some(5)]);
        assert_eq!(filtered.as_ref(), &expected);
    }

    #[test]
    fn test_original_arrow_data_type_returns_int32() {
        let linear =
            LiquidLinearI32Array::from_arrow_array(PrimitiveArray::<Int32Type>::from(vec![
                Some(1),
                Some(2),
            ]));
        assert_eq!(linear.original_arrow_data_type(), DataType::Int32);
    }

    #[test]
    fn test_roundtrip_i8() {
        roundtrip_eq_t!(Int8Type, vec![Some(-10), Some(0), Some(10), None, Some(20)]);
    }

    #[test]
    fn test_roundtrip_i16() {
        roundtrip_eq_t!(
            Int16Type,
            vec![Some(-1000), Some(0), Some(1000), None, Some(2000)]
        );
    }

    #[test]
    fn test_roundtrip_i64() {
        roundtrip_eq_t!(
            Int64Type,
            vec![
                Some(-10_000_000_000),
                Some(0),
                Some(10_000_000_000),
                None,
                Some(20_000_000_000),
            ]
        );
    }

    #[test]
    fn test_roundtrip_u8() {
        roundtrip_eq_t!(UInt8Type, vec![Some(0), Some(10), Some(200), None, Some(255)]);
    }

    #[test]
    fn test_roundtrip_u16() {
        roundtrip_eq_t!(UInt16Type, vec![Some(0), Some(1000), Some(60000), None, Some(500)]);
    }

    #[test]
    fn test_roundtrip_u32() {
        roundtrip_eq_t!(
            UInt32Type,
            vec![
                Some(0),
                Some(1_000_000),
                Some(3_000_000_000),
                None,
                Some(123_456_789),
            ]
        );
    }

    #[test]
    fn test_roundtrip_u64() {
        roundtrip_eq_t!(
            UInt64Type,
            vec![
                Some(0),
                Some(10_000_000_000),
                Some(9_000_000_000_000_000_000u64),
                None,
                Some(42),
            ]
        );
    }

    #[test]
    fn test_roundtrip_date32() {
        roundtrip_eq_t!(Date32Type, vec![Some(-365), Some(0), Some(365), None, Some(18262)]);
    }

    #[test]
    fn test_roundtrip_date64() {
        roundtrip_eq_t!(
            Date64Type,
            vec![
                Some(-86_400_000),
                Some(0),
                Some(86_400_000),
                None,
                Some(1_000_000_000_000),
            ]
        );
    }

    #[test]
    fn test_compression() {
        // A perfect ramp 0, 100, ..., 999_900 should compress to near-zero residuals.
        let ramp = PrimitiveArray::<Int32Type>::from_iter_values((0..10_000).map(|k| k * 100));

        let arrow_size = ramp.get_array_memory_size();

        let liquid_linear = LiquidLinearI32Array::from_arrow_array(ramp.clone());
        let liquid_linear_size = liquid_linear.get_array_memory_size();

        let liquid_primitive = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(ramp.clone());
        let liquid_primitive_size = liquid_primitive.get_array_memory_size();

        println!(
            "arrow_size: {arrow_size}, liquid_linear_size: {liquid_linear_size}, liquid_primitive_size: {liquid_primitive_size}",
        );

        assert!(liquid_linear_size < arrow_size);
        assert!(liquid_primitive_size < arrow_size);
        assert!(liquid_linear_size < liquid_primitive_size);

        let ramp: ArrayRef = Arc::new(ramp);
        for decoded in [liquid_linear.to_arrow_array(), liquid_primitive.to_arrow_array()] {
            assert_eq!(ramp.as_ref(), decoded.as_ref());
        }
    }
}