1pub(crate) mod dictionary;
5mod stats;
6
7use vortex_alp::ALPArray;
8use vortex_alp::ALPVTable;
9use vortex_alp::RDEncoder;
10use vortex_array::ArrayRef;
11use vortex_array::IntoArray;
12use vortex_array::ToCanonical;
13use vortex_array::arrays::ConstantArray;
14use vortex_array::arrays::DictArray;
15use vortex_array::arrays::MaskedArray;
16use vortex_array::arrays::PrimitiveVTable;
17use vortex_array::vtable::ArrayVTableExt;
18use vortex_array::vtable::ValidityHelper;
19use vortex_dtype::PType;
20use vortex_error::VortexExpect;
21use vortex_error::VortexResult;
22use vortex_error::vortex_panic;
23use vortex_scalar::Scalar;
24use vortex_sparse::SparseArray;
25use vortex_sparse::SparseVTable;
26
27pub use self::stats::FloatStats;
28use crate::Compressor;
29use crate::CompressorStats;
30use crate::GenerateStatsOptions;
31use crate::Scheme;
32use crate::estimate_compression_ratio_with_sampling;
33use crate::float::dictionary::dictionary_encode;
34use crate::integer;
35use crate::integer::IntCompressor;
36use crate::integer::IntegerStats;
37use crate::patches::compress_patches;
38use crate::rle::RLEScheme;
39
/// Marker trait for [`Scheme`]s that operate on floating-point data.
///
/// It declares no methods of its own; it only pins the associated types of [`Scheme`] to
/// [`FloatStats`] and [`FloatCode`], giving a single name that [`FloatCompressor`] uses as its
/// `dyn FloatScheme` scheme type.
pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}

// Blanket impl: any `Scheme` with float stats and float codes is automatically a `FloatScheme`.
impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
43
/// [`Compressor`] implementation for floating-point primitive arrays.
///
/// This is a zero-sized type; all functionality is exposed through its [`Compressor`] impl.
pub struct FloatCompressor;
46
impl Compressor for FloatCompressor {
    type ArrayVTable = PrimitiveVTable;
    type SchemeType = dyn FloatScheme;
    type StatsType = FloatStats;

    /// Returns every float scheme known to this compressor, in a fixed order.
    ///
    /// NOTE(review): how the order is used (e.g. for tie-breaking between equal ratios) is
    /// decided by the `Compressor` trait, which is not visible here -- confirm before reordering.
    fn schemes() -> &'static [&'static Self::SchemeType] {
        &[
            &UncompressedScheme,
            &ConstantScheme,
            &ALPScheme,
            &ALPRDScheme,
            &DictScheme,
            &NullDominated,
            &RLE_FLOAT_SCHEME,
        ]
    }

    /// Returns the fallback scheme, which leaves the data uncompressed.
    fn default_scheme() -> &'static Self::SchemeType {
        &UncompressedScheme
    }

    /// Returns the code that identifies dictionary encoding for floats.
    fn dict_scheme_code() -> FloatCode {
        DICT_SCHEME
    }
}
72
// Unique codes identifying each float scheme. They are what callers pass in `excludes` lists.

/// Code of [`UncompressedScheme`].
const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
/// Code of [`ConstantScheme`].
const CONSTANT_SCHEME: FloatCode = FloatCode(1);
/// Code of [`ALPScheme`].
const ALP_SCHEME: FloatCode = FloatCode(2);
/// Code of [`ALPRDScheme`].
const ALPRD_SCHEME: FloatCode = FloatCode(3);
/// Code of [`DictScheme`].
const DICT_SCHEME: FloatCode = FloatCode(4);
/// Run-end code. No scheme visible in this file reports it; it is only looked up in an
/// `excludes` list by `ALPScheme::compress`, which maps it to the integer run-end scheme.
const RUN_END_SCHEME: FloatCode = FloatCode(5);
/// Code of [`RLE_FLOAT_SCHEME`].
const RUN_LENGTH_SCHEME: FloatCode = FloatCode(6);

/// Code of [`NullDominated`], which produces sparse-encoded arrays.
const SPARSE_SCHEME: FloatCode = FloatCode(7);
82
/// Scheme that returns the source array unchanged.
#[derive(Debug, Copy, Clone)]
struct UncompressedScheme;

/// Scheme that encodes an array with exactly one distinct value as a constant array.
#[derive(Debug, Copy, Clone)]
struct ConstantScheme;

/// Scheme that ALP-encodes floats and cascades into the integer compressor.
#[derive(Debug, Copy, Clone)]
struct ALPScheme;

/// Scheme that encodes `f32`/`f64` arrays with the ALP-RD encoder.
#[derive(Debug, Copy, Clone)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes floats and compresses the codes and values separately.
#[derive(Debug, Copy, Clone)]
struct DictScheme;

/// Scheme that sparse-encodes arrays in which more than 90% of the entries are null.
#[derive(Debug, Copy, Clone)]
pub struct NullDominated;
100
/// Run-length scheme for floats, identified by `RUN_LENGTH_SCHEME`.
///
/// The closure forwards its arguments unchanged to [`FloatCompressor::compress`].
/// NOTE(review): presumably `RLEScheme` uses it to compress the run values -- confirm against
/// `RLEScheme`, which is defined elsewhere.
pub const RLE_FLOAT_SCHEME: RLEScheme<FloatStats, FloatCode> = RLEScheme::new(
    RUN_LENGTH_SCHEME,
    |values, is_sample, allowed_cascading, excludes| {
        FloatCompressor::compress(values, is_sample, allowed_cascading, excludes)
    },
);
107
108impl Scheme for UncompressedScheme {
109 type StatsType = FloatStats;
110 type CodeType = FloatCode;
111
112 fn code(&self) -> FloatCode {
113 UNCOMPRESSED_SCHEME
114 }
115
116 fn expected_compression_ratio(
117 &self,
118 _stats: &Self::StatsType,
119 _is_sample: bool,
120 _allowed_cascading: usize,
121 _excludes: &[FloatCode],
122 ) -> VortexResult<f64> {
123 Ok(1.0)
124 }
125
126 fn compress(
127 &self,
128 stats: &Self::StatsType,
129 _is_sample: bool,
130 _allowed_cascading: usize,
131 _excludes: &[FloatCode],
132 ) -> VortexResult<ArrayRef> {
133 Ok(stats.source().to_array())
134 }
135}
136
137impl Scheme for ConstantScheme {
138 type StatsType = FloatStats;
139 type CodeType = FloatCode;
140
141 fn code(&self) -> FloatCode {
142 CONSTANT_SCHEME
143 }
144
145 fn expected_compression_ratio(
146 &self,
147 stats: &Self::StatsType,
148 is_sample: bool,
149 _allowed_cascading: usize,
150 _excludes: &[FloatCode],
151 ) -> VortexResult<f64> {
152 if is_sample {
154 return Ok(0.0);
155 }
156
157 if stats.null_count as usize == stats.src.len() || stats.value_count == 0 {
158 return Ok(0.0);
159 }
160
161 if stats.distinct_values_count != 1 {
163 return Ok(0.0);
164 }
165
166 Ok(stats.value_count as f64)
167 }
168
169 fn compress(
170 &self,
171 stats: &Self::StatsType,
172 _is_sample: bool,
173 _allowed_cascading: usize,
174 _excludes: &[FloatCode],
175 ) -> VortexResult<ArrayRef> {
176 let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));
177
178 match scalar_idx {
179 Some(idx) => {
180 let scalar = stats.source().scalar_at(idx);
181 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
182 if !stats.source().all_valid() {
183 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
184 } else {
185 Ok(const_arr)
186 }
187 }
188 None => Ok(ConstantArray::new(
189 Scalar::null(stats.src.dtype().clone()),
190 stats.src.len(),
191 )
192 .into_array()),
193 }
194 }
195}
196
/// Identifier of a float compression scheme.
///
/// Wraps a `u8`; the inner field is private, so codes can only be constructed inside this
/// module.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct FloatCode(u8);
199
200impl Scheme for ALPScheme {
201 type StatsType = FloatStats;
202 type CodeType = FloatCode;
203
204 fn code(&self) -> FloatCode {
205 ALP_SCHEME
206 }
207
208 fn expected_compression_ratio(
209 &self,
210 stats: &Self::StatsType,
211 is_sample: bool,
212 allowed_cascading: usize,
213 excludes: &[FloatCode],
214 ) -> VortexResult<f64> {
215 if stats.source().ptype() == PType::F16 {
217 return Ok(0.0);
218 }
219
220 if allowed_cascading == 0 {
221 return Ok(0.0);
224 }
225
226 estimate_compression_ratio_with_sampling(
227 self,
228 stats,
229 is_sample,
230 allowed_cascading,
231 excludes,
232 )
233 }
234
235 fn compress(
236 &self,
237 stats: &FloatStats,
238 is_sample: bool,
239 allowed_cascading: usize,
240 excludes: &[FloatCode],
241 ) -> VortexResult<ArrayRef> {
242 let alp_encoded = ALPVTable
243 .as_vtable()
244 .encode(&stats.source().to_canonical(), None)?
245 .vortex_expect("Input is a supported floating point array");
246 let alp = alp_encoded.as_::<ALPVTable>();
247 let alp_ints = alp.encoded().to_primitive();
248
249 let mut int_excludes = Vec::new();
253 if excludes.contains(&DICT_SCHEME) {
254 int_excludes.push(integer::DictScheme.code());
255 }
256 if excludes.contains(&RUN_END_SCHEME) {
257 int_excludes.push(integer::RunEndScheme.code());
258 }
259
260 let compressed_alp_ints =
261 IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
262
263 let patches = alp.patches().map(compress_patches).transpose()?;
264
265 Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array())
266 }
267}
268
269impl Scheme for ALPRDScheme {
270 type StatsType = FloatStats;
271 type CodeType = FloatCode;
272
273 fn code(&self) -> FloatCode {
274 ALPRD_SCHEME
275 }
276
277 fn expected_compression_ratio(
278 &self,
279 stats: &Self::StatsType,
280 is_sample: bool,
281 allowed_cascading: usize,
282 excludes: &[FloatCode],
283 ) -> VortexResult<f64> {
284 if stats.source().ptype() == PType::F16 {
285 return Ok(0.0);
286 }
287
288 estimate_compression_ratio_with_sampling(
289 self,
290 stats,
291 is_sample,
292 allowed_cascading,
293 excludes,
294 )
295 }
296
297 fn compress(
298 &self,
299 stats: &Self::StatsType,
300 _is_sample: bool,
301 _allowed_cascading: usize,
302 _excludes: &[FloatCode],
303 ) -> VortexResult<ArrayRef> {
304 let encoder = match stats.source().ptype() {
305 PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
306 PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
307 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
308 };
309
310 let mut alp_rd = encoder.encode(stats.source());
311
312 let patches = alp_rd
313 .left_parts_patches()
314 .map(compress_patches)
315 .transpose()?;
316 alp_rd.replace_left_parts_patches(patches);
317
318 Ok(alp_rd.into_array())
319 }
320}
321
322impl Scheme for DictScheme {
323 type StatsType = FloatStats;
324 type CodeType = FloatCode;
325
326 fn code(&self) -> FloatCode {
327 DICT_SCHEME
328 }
329
330 fn expected_compression_ratio(
331 &self,
332 stats: &Self::StatsType,
333 is_sample: bool,
334 allowed_cascading: usize,
335 excludes: &[FloatCode],
336 ) -> VortexResult<f64> {
337 if stats.value_count == 0 {
338 return Ok(0.0);
339 }
340
341 if stats.distinct_values_count > stats.value_count / 2 {
343 return Ok(0.0);
344 }
345
346 estimate_compression_ratio_with_sampling(
348 self,
349 stats,
350 is_sample,
351 allowed_cascading,
352 excludes,
353 )
354 }
355
356 fn compress(
357 &self,
358 stats: &Self::StatsType,
359 is_sample: bool,
360 allowed_cascading: usize,
361 _excludes: &[FloatCode],
362 ) -> VortexResult<ArrayRef> {
363 let dict_array = dictionary_encode(stats);
364
365 let codes_stats = IntegerStats::generate_opts(
367 &dict_array.codes().to_primitive().narrow()?,
368 GenerateStatsOptions {
369 count_distinct_values: false,
370 },
371 );
372 let codes_scheme = IntCompressor::choose_scheme(
373 &codes_stats,
374 is_sample,
375 allowed_cascading - 1,
376 &[integer::DictScheme.code(), integer::SequenceScheme.code()],
377 )?;
378 let compressed_codes = codes_scheme.compress(
379 &codes_stats,
380 is_sample,
381 allowed_cascading - 1,
382 &[integer::DictScheme.code()],
383 )?;
384
385 let compressed_values = FloatCompressor::compress(
386 &dict_array.values().to_primitive(),
387 is_sample,
388 allowed_cascading - 1,
389 &[DICT_SCHEME],
390 )?;
391
392 unsafe {
394 Ok(
395 DictArray::new_unchecked(compressed_codes, compressed_values)
396 .set_all_values_referenced(dict_array.has_all_values_referenced())
397 .into_array(),
398 )
399 }
400 }
401}
402
403impl Scheme for NullDominated {
404 type StatsType = FloatStats;
405 type CodeType = FloatCode;
406
407 fn code(&self) -> Self::CodeType {
408 SPARSE_SCHEME
409 }
410
411 fn expected_compression_ratio(
412 &self,
413 stats: &Self::StatsType,
414 _is_sample: bool,
415 allowed_cascading: usize,
416 _excludes: &[Self::CodeType],
417 ) -> VortexResult<f64> {
418 if allowed_cascading == 0 {
420 return Ok(0.0);
421 }
422
423 if stats.value_count == 0 {
424 return Ok(0.0);
426 }
427
428 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
430 return Ok(stats.src.len() as f64 / stats.value_count as f64);
431 }
432
433 Ok(0.0)
435 }
436
437 fn compress(
438 &self,
439 stats: &Self::StatsType,
440 is_sample: bool,
441 allowed_cascading: usize,
442 _excludes: &[Self::CodeType],
443 ) -> VortexResult<ArrayRef> {
444 assert!(allowed_cascading > 0);
445
446 let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
448
449 if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
450 let new_excludes = vec![integer::SparseScheme.code()];
452
453 let indices = sparse.patches().indices().to_primitive().narrow()?;
456 let compressed_indices = IntCompressor::compress_no_dict(
457 &indices,
458 is_sample,
459 allowed_cascading - 1,
460 &new_excludes,
461 )?;
462
463 SparseArray::try_new(
464 compressed_indices,
465 sparse.patches().values().clone(),
466 sparse.len(),
467 sparse.fill_scalar().clone(),
468 )
469 .map(|a| a.into_array())
470 } else {
471 Ok(sparse_encoded)
472 }
473 }
474}
475
#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::Array;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_dtype::Nullability;
    use vortex_sparse::SparseVTable;

    use crate::Compressor;
    use crate::CompressorStats;
    use crate::MAX_CASCADE;
    use crate::Scheme;
    use crate::float::FloatCompressor;
    use crate::float::RLE_FLOAT_SCHEME;

    /// Compressing an empty array succeeds and yields an empty array.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable);

        let result = FloatCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(result.is_empty());
    }

    /// Smoke test: 1024 values cycling through 50 distinct floats compress without error.
    /// The resulting encoding tree is printed, not asserted on.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.display_tree());
    }

    /// Three long runs compressed with the RLE scheme compare equal to the input.
    #[test]
    fn test_rle_compression() {
        let values: Vec<f32> = iter::repeat_n(1.5f32, 100)
            .chain(iter::repeat_n(2.7f32, 200))
            .chain(iter::repeat_n(3.15f32, 150))
            .collect();

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let stats = crate::float::FloatStats::generate(&array);
        let compressed = RLE_FLOAT_SCHEME.compress(&stats, false, 3, &[]).unwrap();

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed.as_ref(), expected.as_ref());
    }

    /// Six special float values followed by 90 nulls are compressed to a sparse array.
    #[test]
    fn test_sparse_compression() {
        let mut builder = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        let special_values = [
            f32::NAN,
            -f32::NAN,
            f32::INFINITY,
            -f32::INFINITY,
            0.0f32,
            -0.0f32,
        ];
        for value in special_values {
            builder.append_value(value);
        }
        builder.append_nulls(90);

        let floats = builder.finish_into_primitive();

        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();

        assert!(compressed.is::<SparseVTable>());
    }
}