1use std::any::Any;
2use std::fmt::Debug;
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use super::PrimitiveKind;
7use super::{LiquidArray, LiquidDataType, LiquidPrimitiveType};
8use crate::liquid_array::LiquidPrimitiveArray;
9use crate::liquid_array::ipc::{LiquidIPCHeader, get_physical_type_id};
10use arrow::array::{
11 Array, ArrayRef, ArrowPrimitiveType, BooleanArray, PrimitiveArray,
12 cast::AsArray,
13 types::{
14 Date32Type, Date64Type, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type,
15 UInt32Type, UInt64Type,
16 },
17};
18use arrow::buffer::{BooleanBuffer, ScalarBuffer};
19use arrow::compute::kernels::filter;
20use arrow_schema::DataType;
21use bytes::Bytes;
22use num_traits::{AsPrimitive, Bounded, FromPrimitive};
23
/// Primitive array compressed with a linear model: values are stored as a
/// least-maximum-error (L-infinity) line `slope * index + intercept` plus a
/// per-slot signed integer residual, so slowly-trending data compresses well.
#[derive(Debug)]
pub struct LiquidLinearArray<T: LiquidPrimitiveType>
where
    T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
{
    // Signed residuals (actual value minus the line's prediction at that
    // index), 0 at null slots; the null mask travels with this array.
    residuals: LiquidPrimitiveArray<Int64Type>,
    // Linear model: predicted(i) = slope * i + intercept.
    intercept: f64,
    slope: f64,
    // Remembers the logical element type T without storing any T values.
    _phantom: PhantomData<T>,
}
48
/// Linear-compressed `Int32` array.
pub type LiquidLinearI32Array = LiquidLinearArray<Int32Type>;
/// Linear-compressed `Int8` array.
pub type LiquidLinearI8Array = LiquidLinearArray<Int8Type>;
/// Linear-compressed `Int16` array.
pub type LiquidLinearI16Array = LiquidLinearArray<Int16Type>;
/// Linear-compressed `Int64` array.
pub type LiquidLinearI64Array = LiquidLinearArray<Int64Type>;
/// Linear-compressed `UInt8` array.
pub type LiquidLinearU8Array = LiquidLinearArray<UInt8Type>;
/// Linear-compressed `UInt16` array.
pub type LiquidLinearU16Array = LiquidLinearArray<UInt16Type>;
/// Linear-compressed `UInt32` array.
pub type LiquidLinearU32Array = LiquidLinearArray<UInt32Type>;
/// Linear-compressed `UInt64` array.
pub type LiquidLinearU64Array = LiquidLinearArray<UInt64Type>;
/// Linear-compressed `Date32` array (days since epoch).
pub type LiquidLinearDate32Array = LiquidLinearArray<Date32Type>;
/// Linear-compressed `Date64` array (milliseconds since epoch).
pub type LiquidLinearDate64Array = LiquidLinearArray<Date64Type>;
69
impl<T> LiquidLinearArray<T>
where
    T: LiquidPrimitiveType,
    T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
{
    /// Encodes `arrow_array` as a linear model plus per-index residuals.
    ///
    /// Steps:
    /// 1. Fit `predicted(i) = slope * i + intercept` over the non-null values
    ///    (minimax / L-infinity fit via `fit_linf`).
    /// 2. Store the signed residual `value - predicted` for every slot
    ///    (0 at null slots; the original null mask is kept alongside).
    /// 3. If the residual value range is not strictly narrower than the
    ///    original value range, the model buys nothing: reset slope and
    ///    intercept to 0 and store the raw values as residuals instead.
    pub fn from_arrow_array(arrow_array: PrimitiveArray<T>) -> Self {
        let len = arrow_array.len();

        // Degenerate case: empty or all-null input — nothing to fit.
        if arrow_array.null_count() == len {
            let res = PrimitiveArray::<Int64Type>::new_null(len);
            return Self {
                residuals: LiquidPrimitiveArray::<Int64Type>::from_arrow_array(res),
                intercept: 0.0, slope: 0.0,
                _phantom: PhantomData,
            };
        }

        // Non-null values widened to f64, paired with their slot indices:
        // the input to the line-fitting routine.
        let (nn_values, nn_indices) = collect_non_null_f64_and_indices::<T>(&arrow_array);

        let (mut intercept, mut slope) = fit_linf(&nn_values, &nn_indices);

        let mut residuals: Vec<i64> = Vec::with_capacity(len);
        let is_unsigned = <T as PrimitiveKind>::IS_UNSIGNED;
        let vals = arrow_array.values();
        let nulls_opt = arrow_array.nulls();

        // Running min/max of original values and of residuals; used below to
        // decide whether the linear model actually narrowed the value range.
        let mut orig_min_u64 = u64::MAX;
        let mut orig_max_u64 = 0u64;
        let mut orig_min_i64 = i64::MAX;
        let mut orig_max_i64 = i64::MIN;
        let mut res_min = i64::MAX;
        let mut res_max = i64::MIN;

        if is_unsigned {
            let max_u64: u64 = <T as PrimitiveKind>::MAX_U64;
            for i in 0..len {
                let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
                if valid {
                    // Route the native value through T's unsigned twin type so
                    // the widening to u64 is value-preserving.
                    type U<TT> =
                        <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
                    let v_u: U<T> = vals[i].as_();
                    let v_u64: u64 = v_u.as_();
                    if v_u64 < orig_min_u64 {
                        orig_min_u64 = v_u64;
                    }
                    if v_u64 > orig_max_u64 {
                        orig_max_u64 = v_u64;
                    }
                    let pr = slope * (i as f64) + intercept;
                    let p = predict_u64_saturated(pr, max_u64);
                    // Residual as (sign, magnitude) to avoid u64 wraparound;
                    // the decoder mirrors this with saturating add/sub.
                    let (pos, mag) = if v_u64 >= p {
                        (true, v_u64 - p)
                    } else {
                        (false, p - v_u64)
                    };
                    // NOTE(review): keeps only the low 63 bits of the
                    // magnitude, which is lossy if |residual| >= 2^63 (only
                    // reachable for extreme u64 inputs) — confirm such inputs
                    // are out of scope for this encoding.
                    let m = (mag & (i64::MAX as u64)) as i64;
                    let r = if pos { m } else { -m };
                    if r < res_min {
                        res_min = r;
                    }
                    if r > res_max {
                        res_max = r;
                    }
                    residuals.push(r);
                } else {
                    // Placeholder for a null slot; masked out by the null buffer.
                    residuals.push(0);
                }
            }
        } else {
            let (min_i64, max_i64): (i64, i64) =
                (<T as PrimitiveKind>::MIN_I64, <T as PrimitiveKind>::MAX_I64);
            for i in 0..len {
                let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
                if valid {
                    let v_i64: i64 = vals[i].as_();
                    if v_i64 < orig_min_i64 {
                        orig_min_i64 = v_i64;
                    }
                    if v_i64 > orig_max_i64 {
                        orig_max_i64 = v_i64;
                    }
                    let pr = slope * (i as f64) + intercept;
                    // Prediction clamped into T's value range, so the i64
                    // residual below cannot overflow for in-range inputs.
                    let p = predict_i64_saturated(pr, min_i64, max_i64);
                    let r = v_i64 - p;
                    if r < res_min {
                        res_min = r;
                    }
                    if r > res_max {
                        res_max = r;
                    }
                    residuals.push(r);
                } else {
                    residuals.push(0);
                }
            }
        }

        // Compare value ranges in 128-bit space to dodge overflow of the
        // (max - min) subtraction at the 64-bit extremes.
        let res_width: u128 = (res_max as i128 - res_min as i128) as u128;
        let orig_width: u128 = if is_unsigned {
            (orig_max_u64 as u128).saturating_sub(orig_min_u64 as u128)
        } else {
            (orig_max_i64 as i128 - orig_min_i64 as i128) as u128
        };
        // Fallback: the fit did not shrink the range, so drop the model and
        // store the raw values as residuals (slope = intercept = 0 makes the
        // decoder's prediction 0 for every slot).
        if res_width >= orig_width {
            intercept = 0.0;
            slope = 0.0;
            residuals.clear();
            if is_unsigned {
                for i in 0..len {
                    let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
                    if valid {
                        type U<TT> = <<TT as LiquidPrimitiveType>::UnSignedType as ArrowPrimitiveType>::Native;
                        let v_u: U<T> = vals[i].as_();
                        let v_u64: u64 = v_u.as_();
                        // NOTE(review): same 63-bit truncation as above —
                        // raw u64 values >= 2^63 would lose their top bit here.
                        let r = (v_u64 & (i64::MAX as u64)) as i64;
                        residuals.push(r);
                    } else {
                        residuals.push(0);
                    }
                }
            } else {
                for i in 0..len {
                    let valid = nulls_opt.as_ref().is_none_or(|n| n.is_valid(i));
                    if valid {
                        let v_i64: i64 = vals[i].as_();
                        residuals.push(v_i64);
                    } else {
                        residuals.push(0);
                    }
                }
            }
        }
        // Pack the residuals together with the original null mask into a
        // bit-packed liquid primitive array.
        let residuals_buf: ScalarBuffer<i64> = ScalarBuffer::from(residuals);
        let nulls = arrow_array.nulls().cloned();
        let res_prim = PrimitiveArray::<Int64Type>::new(residuals_buf, nulls);
        let residuals = LiquidPrimitiveArray::<Int64Type>::from_arrow_array(res_prim);

        Self {
            residuals,
            intercept,
            slope,
            _phantom: PhantomData,
        }
    }

    /// Logical element count (one residual per original slot, nulls included).
    fn len(&self) -> usize {
        self.residuals.len()
    }

    /// Byte offset at which the serialized residual array begins: the IPC
    /// header plus the two f64 model parameters, rounded up to an 8-byte
    /// boundary so the residual payload stays aligned.
    fn residual_starting_loc() -> usize {
        let header_size =
            LiquidIPCHeader::size() + std::mem::size_of::<f64>() + std::mem::size_of::<f64>();
        (header_size + 7) & !7
    }

    /// Serialized layout: IPC header | intercept (f64 LE) | slope (f64 LE) |
    /// zero padding to 8-byte alignment | residual array bytes.
    fn to_bytes_inner(&self) -> Vec<u8> {
        let header = LiquidIPCHeader::new(
            LiquidDataType::LinearInteger as u16,
            get_physical_type_id::<T>(),
        );
        let start = Self::residual_starting_loc();
        let mut out = Vec::with_capacity(start + 256);

        out.extend_from_slice(&header.to_bytes());
        out.extend_from_slice(&self.intercept.to_le_bytes());
        out.extend_from_slice(&self.slope.to_le_bytes());

        // Pad with zeros up to the aligned residual offset.
        while out.len() < start {
            out.push(0);
        }

        out.extend_from_slice(&self.residuals.to_bytes_inner());
        out
    }

    /// Deserializes a buffer produced by `to_bytes_inner`.
    ///
    /// NOTE(review): the header is parsed but its type ids are not validated
    /// here — presumably the caller already dispatched on them; confirm.
    /// Slicing panics on truncated input.
    pub fn from_bytes(bytes: Bytes) -> Self {
        let _hdr = LiquidIPCHeader::from_bytes(&bytes);

        let intercept_off = LiquidIPCHeader::size();
        let intercept =
            f64::from_le_bytes(bytes[intercept_off..intercept_off + 8].try_into().unwrap());

        let slope_off = intercept_off + std::mem::size_of::<f64>();
        let slope = f64::from_le_bytes(bytes[slope_off..slope_off + 8].try_into().unwrap());

        let start = Self::residual_starting_loc();
        let res_bytes = bytes.slice(start..);
        let residuals = LiquidPrimitiveArray::<Int64Type>::from_bytes(res_bytes);

        Self {
            residuals,
            intercept,
            slope,
            _phantom: PhantomData,
        }
    }
}
287
impl<T> LiquidArray for LiquidLinearArray<T>
where
    T: LiquidPrimitiveType,
    T::Native: AsPrimitive<f64> + FromPrimitive + Bounded,
{
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Logical Arrow type the array decodes back into.
    fn original_arrow_data_type(&self) -> DataType {
        T::DATA_TYPE.clone()
    }

    /// Memory footprint: the encoded residual array plus the two f64 model
    /// parameters (intercept and slope).
    fn get_array_memory_size(&self) -> usize {
        self.residuals.get_array_memory_size()
            + std::mem::size_of::<f64>()
            + std::mem::size_of::<f64>()
    }

    fn len(&self) -> usize {
        self.len()
    }

    /// Decodes back to an Arrow array: each slot is the model's prediction
    /// `slope * i + intercept` (saturated into T's range) plus the stored
    /// residual, exactly mirroring the arithmetic used at encode time.
    fn to_arrow_array(&self) -> ArrayRef {
        let arr = self.residuals.to_arrow_array();
        let (_dt, residuals, nulls) = arr.as_primitive::<Int64Type>().clone().into_parts();

        let mut final_values = Vec::<T::Native>::with_capacity(self.len());
        let is_unsigned = <T as PrimitiveKind>::IS_UNSIGNED;
        if is_unsigned {
            let max_u64: u64 = <T as PrimitiveKind>::MAX_U64;
            for (i, &e) in residuals.iter().enumerate() {
                let pr = self.slope * (i as f64) + self.intercept;
                let p = predict_u64_saturated(pr, max_u64);
                // Residuals are sign + magnitude; apply via saturating ops so
                // the u64 arithmetic cannot wrap.
                let mag = e.unsigned_abs();
                let sum = if e >= 0 {
                    p.saturating_add(mag)
                } else {
                    p.saturating_sub(mag)
                };
                // By construction `sum` reproduces the original value, which
                // fits in T, so the conversion cannot fail for encoder output.
                final_values.push(T::Native::from_u64(sum).unwrap());
            }
        } else {
            let (min_i64, max_i64): (i64, i64) =
                (<T as PrimitiveKind>::MIN_I64, <T as PrimitiveKind>::MAX_I64);
            for (i, &e) in residuals.iter().enumerate() {
                let pr = self.slope * (i as f64) + self.intercept;
                let p = predict_i64_saturated(pr, min_i64, max_i64);
                let sum = p.saturating_add(e);
                final_values.push(T::Native::from_i64(sum).unwrap());
            }
        }

        // Null slots carry a 0 residual but are masked out by `nulls`.
        let values_buf: ScalarBuffer<T::Native> = ScalarBuffer::from(final_values);
        Arc::new(PrimitiveArray::<T>::new(values_buf, nulls))
    }

    /// Filters by materializing the full array first: residuals are tied to
    /// their original index `i`, so they cannot be filtered in encoded form.
    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
        let arr = self.to_arrow_array();
        let selection = BooleanArray::new(selection.clone(), None);
        filter::filter(&arr, &selection).unwrap()
    }

    /// No predicate push-down on the encoded form; returning `None` tells the
    /// caller to fall back to evaluating on the materialized Arrow array.
    fn try_eval_predicate(
        &self,
        _predicate: &Arc<dyn datafusion::physical_plan::PhysicalExpr>,
        _filter: &BooleanBuffer,
    ) -> Option<BooleanArray> {
        None
    }

    fn to_bytes(&self) -> Vec<u8> {
        self.to_bytes_inner()
    }

    fn data_type(&self) -> LiquidDataType {
        LiquidDataType::LinearInteger
    }
}
369
/// Converts a floating-point prediction to `u64`, clamped into
/// `[0, max_u64]`. Non-finite and non-positive predictions map to 0;
/// in-range predictions round to the nearest integer (ties away from zero).
#[inline]
fn predict_u64_saturated(pred: f64, max_u64: u64) -> u64 {
    if pred.is_finite() && pred > 0.0 {
        if pred >= max_u64 as f64 {
            max_u64
        } else {
            pred.round() as u64
        }
    } else {
        0
    }
}
380
/// Converts a floating-point prediction to `i64`, clamped into
/// `[min_i64, max_i64]`. Non-finite predictions map to 0; in-range
/// predictions round to the nearest integer (ties away from zero).
#[inline]
fn predict_i64_saturated(pred: f64, min_i64: i64, max_i64: i64) -> i64 {
    match pred {
        p if !p.is_finite() => 0,
        p if p <= min_i64 as f64 => min_i64,
        p if p >= max_i64 as f64 => max_i64,
        p => p.round() as i64,
    }
}
393
/// Fits `value ≈ m * index + b` minimizing the maximum absolute error
/// (minimax / L-infinity criterion) over the point set `(idxs[k], values[k])`,
/// and returns `(b, m)` — (intercept, slope).
///
/// Strategy: bracket the slope with the extreme pairwise slopes of
/// consecutive points, then narrow the bracket with a fixed number of
/// bisection steps. For a trial slope `m`, the positions of the points that
/// attain the extreme intercepts `value - m * index` steer the bisection
/// direction; the final intercept is the midpoint of the extreme intercepts,
/// which splits the residual band evenly around the line.
fn fit_linf(values: &[f64], idxs: &[u32]) -> (f64, f64) {
    let n = values.len();
    assert_eq!(values.len(), idxs.len());
    // Degenerate inputs: no points -> zero model; a single point -> a flat
    // line through that value.
    if n == 0 {
        return (0.0, 0.0);
    }
    if n == 1 {
        return (values[0], 0.0);
    }

    // Bracket the slope by the min/max slopes between consecutive points.
    // `idxs` is produced in increasing order, so di >= 1 for distinct slots.
    let mut slope_min = f64::INFINITY;
    let mut slope_max = f64::NEG_INFINITY;
    for k in 1..n {
        let di = (idxs[k] - idxs[k - 1]) as f64;
        if di > 0.0 {
            let dv = values[k] - values[k - 1];
            let s = dv / di;
            if s < slope_min {
                slope_min = s;
            }
            if s > slope_max {
                slope_max = s;
            }
        }
    }
    // Non-finite bounds (e.g. overflowing value differences) fall back to a
    // flat fit.
    if !slope_min.is_finite() || !slope_max.is_finite() {
        slope_min = 0.0;
        slope_max = 0.0;
    }

    let mut lo = slope_min.min(slope_max);
    let mut hi = slope_min.max(slope_max);
    // Widen a (near-)degenerate bracket so bisection has room to move.
    if (hi - lo).abs() < 1e-12 {
        let pad = if hi.abs() < 1.0 { 1.0 } else { hi.abs() * 1e-6 };
        lo -= pad;
        hi += pad;
    }

    // For slope `m`, returns the extreme intercepts `value - m * index` and
    // the indices at which they occur: (min, idx_of_min, max, idx_of_max).
    // Ties keep the first occurrence.
    #[inline]
    fn range_stats(values: &[f64], idxs: &[u32], m: f64) -> (f64, u32, f64, u32) {
        let mut min_s = f64::INFINITY;
        let mut max_s = f64::NEG_INFINITY;
        let mut i_min = 0u32;
        let mut i_max = 0u32;
        for k in 0..values.len() {
            let i = idxs[k] as f64;
            let s = values[k] - m * i;
            if s < min_s {
                min_s = s;
                i_min = idxs[k];
            }
            if s > max_s {
                max_s = s;
                i_max = idxs[k];
            }
        }
        (min_s, i_min, max_s, i_max)
    }

    // A handful of bisection steps is enough: the encoding only needs a good
    // slope, not the exact optimum — residuals absorb any remaining error.
    const MAX_ITERS: usize = 8;
    for _ in 0..MAX_ITERS {
        let m = 0.5 * (lo + hi);
        let (_min_s, i_min, _max_s, i_max) = range_stats(values, idxs, m);
        // Sign of (i_min - i_max) indicates which side of the optimum the
        // trial slope is on; equal indices mean the band is pinched shut.
        let g = (i_min as i64) - (i_max as i64);
        if g > 0 {
            hi = m;
        } else if g < 0 {
            lo = m;
        } else {
            lo = m;
            hi = m;
            break;
        }
        if (hi - lo).abs() < 1e-12 {
            break;
        }
    }

    // Final slope is the bracket midpoint; the midpoint intercept centers the
    // line inside the residual band.
    let m = 0.5 * (lo + hi);
    let (min_s, _i_min, max_s, _i_max) = range_stats(values, idxs, m);
    let b = 0.5 * (max_s + min_s);
    (b, m)
}
484
485#[inline]
486fn collect_non_null_f64_and_indices<T>(arr: &PrimitiveArray<T>) -> (Vec<f64>, Vec<u32>)
487where
488 T: LiquidPrimitiveType,
489 T::Native: AsPrimitive<f64>,
490{
491 let nn = arr.len() - arr.null_count();
492 let mut values = Vec::with_capacity(nn);
493 let mut idxs = Vec::with_capacity(nn);
494 let vals = arr.values();
495 if arr.null_count() == 0 {
496 for (i, v) in vals.iter().enumerate() {
497 values.push(v.as_());
498 idxs.push(i as u32);
499 }
500 } else {
501 let nulls = arr.nulls().unwrap();
502 for (i, v) in vals.iter().enumerate() {
503 if nulls.is_valid(i) {
504 values.push(v.as_());
505 idxs.push(i as u32);
506 }
507 }
508 }
509 (values, idxs)
510}
511
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that an i32 array survives both encode -> decode and
    /// encode -> serialize -> deserialize -> decode unchanged.
    fn roundtrip_eq(values: Vec<Option<i32>>) {
        let arr = PrimitiveArray::<Int32Type>::from(values.clone());
        let linear = LiquidLinearI32Array::from_arrow_array(arr.clone());
        let decoded = linear.to_arrow_array();
        assert_eq!(decoded.as_ref(), &arr);

        let bytes = Bytes::from(linear.to_bytes());
        let decoded = LiquidLinearI32Array::from_bytes(bytes);
        let round = decoded.to_arrow_array();
        assert_eq!(round.as_ref(), &arr);
    }

    /// Same round-trip check as `roundtrip_eq`, but generic over the Arrow
    /// primitive type so each supported `T` can be exercised.
    macro_rules! roundtrip_eq_t {
        ($T:ty, $values:expr) => {{
            let arr = PrimitiveArray::<$T>::from(($values).clone());
            let linear = LiquidLinearArray::<$T>::from_arrow_array(arr.clone());
            let decoded = linear.to_arrow_array();
            assert_eq!(decoded.as_ref(), &arr);

            let bytes = Bytes::from(linear.to_bytes());
            let decoded = LiquidLinearArray::<$T>::from_bytes(bytes);
            let round = decoded.to_arrow_array();
            assert_eq!(round.as_ref(), &arr);
        }};
    }

    // Mildly noisy increasing sequence: exercises a non-trivial linear fit.
    #[test]
    fn test_roundtrip_basic() {
        roundtrip_eq(vec![
            Some(10),
            Some(15),
            Some(14),
            Some(20),
            Some(18),
            Some(25),
            Some(24),
        ]);
    }

    #[test]
    fn test_roundtrip_with_nulls() {
        roundtrip_eq(vec![Some(10), None, Some(30), None, Some(50), Some(70)]);
    }

    // All-null input takes the early-return path in `from_arrow_array`.
    #[test]
    fn test_all_nulls() {
        roundtrip_eq(vec![None, None, None, None]);
    }

    // Single value hits the n == 1 flat-line branch of `fit_linf`.
    #[test]
    fn test_single_value() {
        roundtrip_eq(vec![Some(42)]);
    }

    #[test]
    fn test_empty() {
        roundtrip_eq(vec![]);
    }

    #[test]
    fn test_negative_values() {
        roundtrip_eq(vec![
            Some(-100),
            Some(-50),
            Some(0),
            Some(50),
            Some(25),
            None,
            Some(-25),
        ]);
    }

    // Filtering must materialize then select, preserving nulls correctly.
    #[test]
    fn test_filter_basic() {
        let original: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3), None, Some(5), Some(8)];
        let arr = PrimitiveArray::<Int32Type>::from(original.clone());
        let linear = LiquidLinearI32Array::from_arrow_array(arr);
        let selection = BooleanBuffer::from(vec![true, false, true, false, true, false]);
        let result = linear.filter(&selection);
        let expected = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(3), Some(5)]);
        assert_eq!(result.as_ref(), &expected);
    }

    #[test]
    fn test_original_arrow_data_type_returns_int32() {
        let arr = PrimitiveArray::<Int32Type>::from(vec![Some(1), Some(2)]);
        let linear = LiquidLinearI32Array::from_arrow_array(arr);
        assert_eq!(linear.original_arrow_data_type(), DataType::Int32);
    }

    // Per-type round-trips: signed, unsigned, and date variants.
    #[test]
    fn test_roundtrip_i8() {
        roundtrip_eq_t!(Int8Type, vec![Some(-10), Some(0), Some(10), None, Some(20)]);
    }

    #[test]
    fn test_roundtrip_i16() {
        roundtrip_eq_t!(
            Int16Type,
            vec![Some(-1000), Some(0), Some(1000), None, Some(2000)]
        );
    }

    #[test]
    fn test_roundtrip_i64() {
        roundtrip_eq_t!(
            Int64Type,
            vec![
                Some(-10_000_000_000),
                Some(0),
                Some(10_000_000_000),
                None,
                Some(20_000_000_000),
            ]
        );
    }

    #[test]
    fn test_roundtrip_u8() {
        roundtrip_eq_t!(
            UInt8Type,
            vec![Some(0), Some(10), Some(200), None, Some(255)]
        );
    }

    #[test]
    fn test_roundtrip_u16() {
        roundtrip_eq_t!(
            UInt16Type,
            vec![Some(0), Some(1000), Some(60000), None, Some(500)]
        );
    }

    #[test]
    fn test_roundtrip_u32() {
        roundtrip_eq_t!(
            UInt32Type,
            vec![
                Some(0),
                Some(1_000_000),
                Some(3_000_000_000),
                None,
                Some(123_456_789),
            ]
        );
    }

    // Large u64 values (still below 2^63) exercise the unsigned
    // sign/magnitude residual path.
    #[test]
    fn test_roundtrip_u64() {
        roundtrip_eq_t!(
            UInt64Type,
            vec![
                Some(0),
                Some(10_000_000_000),
                Some(9_000_000_000_000_000_000u64),
                None,
                Some(42),
            ]
        );
    }

    #[test]
    fn test_roundtrip_date32() {
        roundtrip_eq_t!(
            Date32Type,
            vec![Some(-365), Some(0), Some(365), None, Some(18262)]
        );
    }

    #[test]
    fn test_roundtrip_date64() {
        roundtrip_eq_t!(
            Date64Type,
            vec![
                Some(-86_400_000),
                Some(0),
                Some(86_400_000),
                None,
                Some(1_000_000_000_000),
            ]
        );
    }

    // A perfectly linear sequence should compress tighter than both the raw
    // Arrow representation and the plain liquid primitive encoding, while
    // still decoding losslessly.
    #[test]
    fn test_compression() {
        let original = (0..1_000_000).step_by(100).collect::<Vec<_>>();

        let original = PrimitiveArray::<Int32Type>::from_iter_values(original);
        let arrow_size = original.get_array_memory_size();

        let liquid_linear = LiquidLinearI32Array::from_arrow_array(original.clone());
        let liquid_linear_size = liquid_linear.get_array_memory_size();

        let liquid_primitive =
            LiquidPrimitiveArray::<Int32Type>::from_arrow_array(original.clone());
        let liquid_primitive_size = liquid_primitive.get_array_memory_size();

        println!(
            "arrow_size: {arrow_size}, liquid_linear_size: {liquid_linear_size}, liquid_primitive_size: {liquid_primitive_size}",
        );

        assert!(liquid_linear_size < arrow_size);
        assert!(liquid_primitive_size < arrow_size);
        assert!(liquid_linear_size < liquid_primitive_size);

        let original: ArrayRef = Arc::new(original);
        assert_eq!(original.as_ref(), liquid_linear.to_arrow_array().as_ref());
        assert_eq!(
            original.as_ref(),
            liquid_primitive.to_arrow_array().as_ref()
        );
    }
}