1pub(crate) mod dictionary;
5mod stats;
6
7use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
8use vortex_array::arrays::{ConstantArray, MaskedArray, PrimitiveVTable};
9use vortex_array::vtable::ValidityHelper;
10use vortex_array::{ArrayRef, IntoArray, ToCanonical};
11use vortex_dict::DictArray;
12use vortex_dtype::PType;
13use vortex_error::{VortexExpect, VortexResult, vortex_panic};
14use vortex_scalar::Scalar;
15
16pub use self::stats::FloatStats;
17use crate::float::dictionary::dictionary_encode;
18use crate::integer::{IntCompressor, IntegerStats};
19use crate::patches::compress_patches;
20use crate::rle::RLEScheme;
21use crate::{
22 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
23 estimate_compression_ratio_with_sampling, integer,
24};
25
26pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
27
28impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
29
30pub struct FloatCompressor;
32
33impl Compressor for FloatCompressor {
34 type ArrayVTable = PrimitiveVTable;
35 type SchemeType = dyn FloatScheme;
36 type StatsType = FloatStats;
37
38 fn schemes() -> &'static [&'static Self::SchemeType] {
39 &[
40 &UncompressedScheme,
41 &ConstantScheme,
42 &ALPScheme,
43 &ALPRDScheme,
44 &DictScheme,
45 &RLE_FLOAT_SCHEME,
46 ]
47 }
48
49 fn default_scheme() -> &'static Self::SchemeType {
50 &UncompressedScheme
51 }
52
53 fn dict_scheme_code() -> FloatCode {
54 DICT_SCHEME
55 }
56}
57
58const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
59const CONSTANT_SCHEME: FloatCode = FloatCode(1);
60const ALP_SCHEME: FloatCode = FloatCode(2);
61const ALPRD_SCHEME: FloatCode = FloatCode(3);
62const DICT_SCHEME: FloatCode = FloatCode(4);
63const RUN_END_SCHEME: FloatCode = FloatCode(5);
64const RUN_LENGTH_SCHEME: FloatCode = FloatCode(6);
65
66#[derive(Debug, Copy, Clone)]
67struct UncompressedScheme;
68
69#[derive(Debug, Copy, Clone)]
70struct ConstantScheme;
71
72#[derive(Debug, Copy, Clone)]
73struct ALPScheme;
74
75#[derive(Debug, Copy, Clone)]
76struct ALPRDScheme;
77
78#[derive(Debug, Copy, Clone)]
79struct DictScheme;
80
81pub const RLE_FLOAT_SCHEME: RLEScheme<FloatStats, FloatCode> = RLEScheme::new(
82 RUN_LENGTH_SCHEME,
83 |values, is_sample, allowed_cascading, excludes| {
84 FloatCompressor::compress(values, is_sample, allowed_cascading, excludes)
85 },
86);
87
88impl Scheme for UncompressedScheme {
89 type StatsType = FloatStats;
90 type CodeType = FloatCode;
91
92 fn code(&self) -> FloatCode {
93 UNCOMPRESSED_SCHEME
94 }
95
96 fn expected_compression_ratio(
97 &self,
98 _stats: &Self::StatsType,
99 _is_sample: bool,
100 _allowed_cascading: usize,
101 _excludes: &[FloatCode],
102 ) -> VortexResult<f64> {
103 Ok(1.0)
104 }
105
106 fn compress(
107 &self,
108 stats: &Self::StatsType,
109 _is_sample: bool,
110 _allowed_cascading: usize,
111 _excludes: &[FloatCode],
112 ) -> VortexResult<ArrayRef> {
113 Ok(stats.source().to_array())
114 }
115}
116
117impl Scheme for ConstantScheme {
118 type StatsType = FloatStats;
119 type CodeType = FloatCode;
120
121 fn code(&self) -> FloatCode {
122 CONSTANT_SCHEME
123 }
124
125 fn expected_compression_ratio(
126 &self,
127 stats: &Self::StatsType,
128 is_sample: bool,
129 _allowed_cascading: usize,
130 _excludes: &[FloatCode],
131 ) -> VortexResult<f64> {
132 if is_sample {
134 return Ok(0.0);
135 }
136
137 if stats.null_count as usize == stats.src.len() || stats.value_count == 0 {
138 return Ok(0.0);
139 }
140
141 if stats.distinct_values_count != 1 {
143 return Ok(0.0);
144 }
145
146 Ok(stats.value_count as f64)
147 }
148
149 fn compress(
150 &self,
151 stats: &Self::StatsType,
152 _is_sample: bool,
153 _allowed_cascading: usize,
154 _excludes: &[FloatCode],
155 ) -> VortexResult<ArrayRef> {
156 let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));
157
158 match scalar_idx {
159 Some(idx) => {
160 let scalar = stats.source().scalar_at(idx);
161 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
162 if !stats.source().all_valid() {
163 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
164 } else {
165 Ok(const_arr)
166 }
167 }
168 None => Ok(ConstantArray::new(
169 Scalar::null(stats.src.dtype().clone()),
170 stats.src.len(),
171 )
172 .into_array()),
173 }
174 }
175}
176
/// Identifier of a float compression scheme, used for scheme selection and exclusion lists.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct FloatCode(u8);
179
180impl Scheme for ALPScheme {
181 type StatsType = FloatStats;
182 type CodeType = FloatCode;
183
184 fn code(&self) -> FloatCode {
185 ALP_SCHEME
186 }
187
188 fn expected_compression_ratio(
189 &self,
190 stats: &Self::StatsType,
191 is_sample: bool,
192 allowed_cascading: usize,
193 excludes: &[FloatCode],
194 ) -> VortexResult<f64> {
195 if stats.source().ptype() == PType::F16 {
197 return Ok(0.0);
198 }
199
200 if allowed_cascading == 0 {
201 return Ok(0.0);
204 }
205
206 estimate_compression_ratio_with_sampling(
207 self,
208 stats,
209 is_sample,
210 allowed_cascading,
211 excludes,
212 )
213 }
214
215 fn compress(
216 &self,
217 stats: &FloatStats,
218 is_sample: bool,
219 allowed_cascading: usize,
220 excludes: &[FloatCode],
221 ) -> VortexResult<ArrayRef> {
222 let alp_encoded = ALPEncoding
223 .encode(&stats.source().to_canonical(), None)?
224 .vortex_expect("Input is a supported floating point array");
225 let alp = alp_encoded.as_::<ALPVTable>();
226 let alp_ints = alp.encoded().to_primitive();
227
228 let mut int_excludes = Vec::new();
232 if excludes.contains(&DICT_SCHEME) {
233 int_excludes.push(integer::DictScheme.code());
234 }
235 if excludes.contains(&RUN_END_SCHEME) {
236 int_excludes.push(integer::RunEndScheme.code());
237 }
238
239 let compressed_alp_ints =
240 IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
241
242 let patches = alp.patches().map(compress_patches).transpose()?;
243
244 Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array())
245 }
246}
247
248impl Scheme for ALPRDScheme {
249 type StatsType = FloatStats;
250 type CodeType = FloatCode;
251
252 fn code(&self) -> FloatCode {
253 ALPRD_SCHEME
254 }
255
256 fn expected_compression_ratio(
257 &self,
258 stats: &Self::StatsType,
259 is_sample: bool,
260 allowed_cascading: usize,
261 excludes: &[FloatCode],
262 ) -> VortexResult<f64> {
263 if stats.source().ptype() == PType::F16 {
264 return Ok(0.0);
265 }
266
267 estimate_compression_ratio_with_sampling(
268 self,
269 stats,
270 is_sample,
271 allowed_cascading,
272 excludes,
273 )
274 }
275
276 fn compress(
277 &self,
278 stats: &Self::StatsType,
279 _is_sample: bool,
280 _allowed_cascading: usize,
281 _excludes: &[FloatCode],
282 ) -> VortexResult<ArrayRef> {
283 let encoder = match stats.source().ptype() {
284 PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
285 PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
286 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
287 };
288
289 let mut alp_rd = encoder.encode(stats.source());
290
291 let patches = alp_rd
292 .left_parts_patches()
293 .map(compress_patches)
294 .transpose()?;
295 alp_rd.replace_left_parts_patches(patches);
296
297 Ok(alp_rd.into_array())
298 }
299}
300
301impl Scheme for DictScheme {
302 type StatsType = FloatStats;
303 type CodeType = FloatCode;
304
305 fn code(&self) -> FloatCode {
306 DICT_SCHEME
307 }
308
309 fn expected_compression_ratio(
310 &self,
311 stats: &Self::StatsType,
312 is_sample: bool,
313 allowed_cascading: usize,
314 excludes: &[FloatCode],
315 ) -> VortexResult<f64> {
316 if stats.value_count == 0 {
317 return Ok(0.0);
318 }
319
320 if stats.distinct_values_count > stats.value_count / 2 {
322 return Ok(0.0);
323 }
324
325 estimate_compression_ratio_with_sampling(
327 self,
328 stats,
329 is_sample,
330 allowed_cascading,
331 excludes,
332 )
333 }
334
335 fn compress(
336 &self,
337 stats: &Self::StatsType,
338 is_sample: bool,
339 allowed_cascading: usize,
340 _excludes: &[FloatCode],
341 ) -> VortexResult<ArrayRef> {
342 let dict_array = dictionary_encode(stats);
343
344 let codes_stats = IntegerStats::generate_opts(
346 &dict_array.codes().to_primitive().narrow()?,
347 GenerateStatsOptions {
348 count_distinct_values: false,
349 },
350 );
351 let codes_scheme = IntCompressor::choose_scheme(
352 &codes_stats,
353 is_sample,
354 allowed_cascading - 1,
355 &[integer::DictScheme.code(), integer::SequenceScheme.code()],
356 )?;
357 let compressed_codes = codes_scheme.compress(
358 &codes_stats,
359 is_sample,
360 allowed_cascading - 1,
361 &[integer::DictScheme.code()],
362 )?;
363
364 let compressed_values = FloatCompressor::compress(
365 &dict_array.values().to_primitive(),
366 is_sample,
367 allowed_cascading - 1,
368 &[DICT_SCHEME],
369 )?;
370
371 unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
373 }
374}
375
#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, ToCanonical};
    use vortex_buffer::Buffer;

    use crate::float::{FloatCompressor, RLE_FLOAT_SCHEME};
    use crate::{Compressor, CompressorStats, MAX_CASCADE, Scheme};

    #[test]
    fn test_empty() {
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    #[test]
    fn test_compress() {
        // 1024 values cycling through 50 distinct whole numbers.
        let values: Vec<f32> = (0..1024).map(|i| (i % 50) as f32).collect();
        let floats = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();

        // Whichever scheme was selected, decoding must reproduce the input exactly.
        let decoded = compressed.to_primitive();
        assert_eq!(decoded.as_slice::<f32>(), values.as_slice());
    }

    #[test]
    fn test_rle_compression() {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(1.5f32, 100));
        values.extend(iter::repeat_n(2.7f32, 200));
        values.extend(iter::repeat_n(3.15f32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let stats = crate::float::FloatStats::generate(&array);
        let compressed = RLE_FLOAT_SCHEME.compress(&stats, false, 3, &[]).unwrap();

        let decoded = compressed.to_primitive();
        assert_eq!(decoded.as_slice::<f32>(), values.as_slice());
    }
}