vortex_fastlanes/for/array/
for_compress.rs1use num_traits::PrimInt;
5use num_traits::WrappingSub;
6use vortex_array::IntoArray;
7use vortex_array::arrays::PrimitiveArray;
8use vortex_array::dtype::NativePType;
9use vortex_array::expr::stats::Stat;
10use vortex_array::match_each_integer_ptype;
11use vortex_array::stats::ArrayStats;
12use vortex_error::VortexResult;
13use vortex_error::vortex_err;
14
15use crate::FoRArray;
16
17impl FoRArray {
18 pub fn encode(array: PrimitiveArray) -> VortexResult<FoRArray> {
19 let stats = ArrayStats::from(array.statistics().to_owned());
20 let min = array
21 .statistics()
22 .compute_stat(Stat::Min)?
23 .ok_or_else(|| vortex_err!("Min stat not found"))?;
24
25 let encoded = match_each_integer_ptype!(array.ptype(), |T| {
26 compress_primitive::<T>(array, T::try_from(&min)?)?.into_array()
27 });
28 let for_array = FoRArray::try_new(encoded, min)?;
29 for_array
30 .stats_set()
31 .to_ref(for_array.as_ref())
32 .inherit_from(stats.to_ref(for_array.as_ref()));
33 Ok(for_array)
34 }
35}
36
37fn compress_primitive<T: NativePType + WrappingSub + PrimInt>(
38 parray: PrimitiveArray,
39 min: T,
40) -> VortexResult<PrimitiveArray> {
41 parray.map_each_with_validity::<T, _, _>(|(v, bool)| {
44 if bool {
45 v.wrapping_sub(&min)
46 } else {
47 T::zero()
48 }
49 })
50}
51
#[cfg(test)]
mod test {
    use std::sync::LazyLock;

    use itertools::Itertools;
    use vortex_array::Array;
    use vortex_array::ToCanonical;
    use vortex_array::VortexSessionExecute;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::PType;
    use vortex_array::expr::stats::StatsProvider;
    use vortex_array::scalar::Scalar;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_session::VortexSession;

    use super::*;
    use crate::BitPackedArray;
    use crate::r#for::array::for_decompress::decompress;
    use crate::r#for::array::for_decompress::fused_decompress;

    /// Session shared by the tests that need an execution context.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// A small non-nullable `i32` range encodes with reference 1 and compares
    /// equal to the input.
    #[test]
    fn test_compress_round_trip_small() {
        let values: vortex_buffer::Buffer<i32> = (1i32..10).collect();
        let original = PrimitiveArray::new(values, Validity::NonNullable);

        let for_array = FoRArray::encode(original.clone()).unwrap();
        let reference = i32::try_from(for_array.reference_scalar()).unwrap();

        assert_eq!(reference, 1);
        assert_arrays_eq!(for_array, original);
    }

    /// The reference scalar equals the smallest value of the input.
    #[test]
    fn test_compress() {
        let values = (0u32..10_000)
            .map(|offset| offset + 1_000_000)
            .collect::<vortex_buffer::Buffer<_>>();
        let input = PrimitiveArray::new(values, Validity::NonNullable);

        let for_array = FoRArray::encode(input).unwrap();
        let reference = u32::try_from(for_array.reference_scalar()).unwrap();

        assert_eq!(reference, 1_000_000u32);
    }

    /// An all-zero signed array keeps its signed dtype for both the reference
    /// scalar and the encoded child.
    #[test]
    fn test_zeros() {
        let zeros = PrimitiveArray::new(buffer![0i32; 100], Validity::NonNullable);
        assert_eq!(zeros.statistics().len(), 0);

        let expected_dtype = zeros.dtype().clone();
        let for_array = FoRArray::encode(zeros).unwrap();

        let reference = for_array.reference_scalar();
        assert_eq!(reference.dtype(), &expected_dtype);
        assert!(reference.dtype().is_signed_int());
        assert!(for_array.encoded().dtype().is_signed_int());

        let first = for_array.encoded().scalar_at(0).unwrap();
        assert_eq!(first, Scalar::from(0i32));
    }

    /// Sparse, offset values compare equal to the input after encoding.
    #[test]
    fn test_decompress() {
        let values = (0u32..100_000).step_by(1024).map(|v| v + 1_000_000);
        let original = PrimitiveArray::from_iter(values);

        let for_array = FoRArray::encode(original.clone()).unwrap();

        assert_arrays_eq!(for_array, original);
    }

    /// FoR over a bit-packed child (width 3) compares equal to the shifted values.
    #[test]
    fn test_decompress_fused() {
        let residuals = PrimitiveArray::from_iter((0u32..1024).map(|x| x % 7));
        let expected = PrimitiveArray::from_iter((0u32..1024).map(|x| x % 7 + 10));

        let packed = BitPackedArray::encode(residuals.as_ref(), 3).unwrap();
        let for_array = FoRArray::try_new(packed.into_array(), 10u32.into()).unwrap();

        assert_arrays_eq!(for_array, expected);
    }

    /// Same as the fused test, but packs at width 2 and calls `fused_decompress`
    /// directly.
    #[test]
    fn test_decompress_fused_patches() -> VortexResult<()> {
        let residuals = PrimitiveArray::from_iter((0u32..1024).map(|x| x % 7));
        let expected = PrimitiveArray::from_iter((0u32..1024).map(|x| x % 7 + 10));

        let packed = BitPackedArray::encode(residuals.as_ref(), 2).unwrap();
        let for_array = FoRArray::try_new(packed.clone().into_array(), 10u32.into()).unwrap();

        let mut ctx = SESSION.create_execution_ctx();
        let actual = fused_decompress::<u32>(&for_array, &packed, &mut ctx)?;

        assert_arrays_eq!(actual, expected);
        Ok(())
    }

    /// The full `i8` range encodes to the full `u8` range (bitwise) and
    /// decodes back without overflow.
    #[test]
    fn test_overflow() -> VortexResult<()> {
        let original = PrimitiveArray::from_iter(i8::MIN..=i8::MAX);
        let for_array = FoRArray::encode(original.clone()).unwrap();

        let reference = for_array
            .reference_scalar()
            .as_primitive()
            .typed_value::<i8>()
            .unwrap();
        assert_eq!(i8::MIN, reference);

        // Viewed as unsigned bytes, the residuals must be exactly 0..=255.
        let residuals = for_array
            .encoded()
            .to_primitive()
            .reinterpret_cast(PType::U8);
        let all_bytes: Vec<u8> = (0..=u8::MAX).collect_vec();
        let expected_residuals = PrimitiveArray::from_iter(all_bytes);
        assert_arrays_eq!(residuals, expected_residuals);

        let decoded = decompress(&for_array, &mut SESSION.create_execution_ctx())?;
        for (idx, expected) in original.as_slice::<i8>().iter().enumerate() {
            let actual = i8::try_from(&for_array.scalar_at(idx).unwrap()).unwrap();
            assert_eq!(*expected, actual);
        }
        assert_arrays_eq!(decoded, original);
        Ok(())
    }
}