1pub(crate) mod dictionary;
5mod stats;
6
7use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
8use vortex_array::arrays::{ConstantArray, DictArray, MaskedArray, PrimitiveVTable};
9use vortex_array::vtable::ValidityHelper;
10use vortex_array::{ArrayRef, IntoArray, ToCanonical};
11use vortex_dtype::PType;
12use vortex_error::{VortexExpect, VortexResult, vortex_panic};
13use vortex_scalar::Scalar;
14use vortex_sparse::{SparseArray, SparseVTable};
15
16pub use self::stats::FloatStats;
17use crate::float::dictionary::dictionary_encode;
18use crate::integer::{IntCompressor, IntegerStats};
19use crate::patches::compress_patches;
20use crate::rle::RLEScheme;
21use crate::{
22 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
23 estimate_compression_ratio_with_sampling, integer,
24};
25
26pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
27
28impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
29
/// Zero-sized entry point for float compression; its behavior lives in its
/// `Compressor` implementation.
pub struct FloatCompressor;
32
33impl Compressor for FloatCompressor {
34 type ArrayVTable = PrimitiveVTable;
35 type SchemeType = dyn FloatScheme;
36 type StatsType = FloatStats;
37
38 fn schemes() -> &'static [&'static Self::SchemeType] {
39 &[
40 &UncompressedScheme,
41 &ConstantScheme,
42 &ALPScheme,
43 &ALPRDScheme,
44 &DictScheme,
45 &NullDominated,
46 &RLE_FLOAT_SCHEME,
47 ]
48 }
49
50 fn default_scheme() -> &'static Self::SchemeType {
51 &UncompressedScheme
52 }
53
54 fn dict_scheme_code() -> FloatCode {
55 DICT_SCHEME
56 }
57}
58
59const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
60const CONSTANT_SCHEME: FloatCode = FloatCode(1);
61const ALP_SCHEME: FloatCode = FloatCode(2);
62const ALPRD_SCHEME: FloatCode = FloatCode(3);
63const DICT_SCHEME: FloatCode = FloatCode(4);
64const RUN_END_SCHEME: FloatCode = FloatCode(5);
65const RUN_LENGTH_SCHEME: FloatCode = FloatCode(6);
66
67const SPARSE_SCHEME: FloatCode = FloatCode(7);
68
/// Scheme that returns the source array unchanged.
#[derive(Clone, Copy, Debug)]
struct UncompressedScheme;

/// Scheme that replaces an array holding a single distinct value with a constant array.
#[derive(Clone, Copy, Debug)]
struct ConstantScheme;

/// Scheme that ALP-encodes the floats and then compresses the encoded integers.
#[derive(Clone, Copy, Debug)]
struct ALPScheme;

/// Scheme that encodes `f32`/`f64` arrays with the ALP-RD encoder.
#[derive(Clone, Copy, Debug)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes the floats and compresses codes and values separately.
#[derive(Clone, Copy, Debug)]
struct DictScheme;

/// Scheme that sparse-encodes arrays in which more than 90% of the entries are null.
#[derive(Clone, Copy, Debug)]
pub struct NullDominated;
86
87pub const RLE_FLOAT_SCHEME: RLEScheme<FloatStats, FloatCode> = RLEScheme::new(
88 RUN_LENGTH_SCHEME,
89 |values, is_sample, allowed_cascading, excludes| {
90 FloatCompressor::compress(values, is_sample, allowed_cascading, excludes)
91 },
92);
93
94impl Scheme for UncompressedScheme {
95 type StatsType = FloatStats;
96 type CodeType = FloatCode;
97
98 fn code(&self) -> FloatCode {
99 UNCOMPRESSED_SCHEME
100 }
101
102 fn expected_compression_ratio(
103 &self,
104 _stats: &Self::StatsType,
105 _is_sample: bool,
106 _allowed_cascading: usize,
107 _excludes: &[FloatCode],
108 ) -> VortexResult<f64> {
109 Ok(1.0)
110 }
111
112 fn compress(
113 &self,
114 stats: &Self::StatsType,
115 _is_sample: bool,
116 _allowed_cascading: usize,
117 _excludes: &[FloatCode],
118 ) -> VortexResult<ArrayRef> {
119 Ok(stats.source().to_array())
120 }
121}
122
123impl Scheme for ConstantScheme {
124 type StatsType = FloatStats;
125 type CodeType = FloatCode;
126
127 fn code(&self) -> FloatCode {
128 CONSTANT_SCHEME
129 }
130
131 fn expected_compression_ratio(
132 &self,
133 stats: &Self::StatsType,
134 is_sample: bool,
135 _allowed_cascading: usize,
136 _excludes: &[FloatCode],
137 ) -> VortexResult<f64> {
138 if is_sample {
140 return Ok(0.0);
141 }
142
143 if stats.null_count as usize == stats.src.len() || stats.value_count == 0 {
144 return Ok(0.0);
145 }
146
147 if stats.distinct_values_count != 1 {
149 return Ok(0.0);
150 }
151
152 Ok(stats.value_count as f64)
153 }
154
155 fn compress(
156 &self,
157 stats: &Self::StatsType,
158 _is_sample: bool,
159 _allowed_cascading: usize,
160 _excludes: &[FloatCode],
161 ) -> VortexResult<ArrayRef> {
162 let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));
163
164 match scalar_idx {
165 Some(idx) => {
166 let scalar = stats.source().scalar_at(idx);
167 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
168 if !stats.source().all_valid() {
169 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
170 } else {
171 Ok(const_arr)
172 }
173 }
174 None => Ok(ConstantArray::new(
175 Scalar::null(stats.src.dtype().clone()),
176 stats.src.len(),
177 )
178 .into_array()),
179 }
180 }
181}
182
/// Opaque one-byte identifier of a float compression scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FloatCode(u8);
185
186impl Scheme for ALPScheme {
187 type StatsType = FloatStats;
188 type CodeType = FloatCode;
189
190 fn code(&self) -> FloatCode {
191 ALP_SCHEME
192 }
193
194 fn expected_compression_ratio(
195 &self,
196 stats: &Self::StatsType,
197 is_sample: bool,
198 allowed_cascading: usize,
199 excludes: &[FloatCode],
200 ) -> VortexResult<f64> {
201 if stats.source().ptype() == PType::F16 {
203 return Ok(0.0);
204 }
205
206 if allowed_cascading == 0 {
207 return Ok(0.0);
210 }
211
212 estimate_compression_ratio_with_sampling(
213 self,
214 stats,
215 is_sample,
216 allowed_cascading,
217 excludes,
218 )
219 }
220
221 fn compress(
222 &self,
223 stats: &FloatStats,
224 is_sample: bool,
225 allowed_cascading: usize,
226 excludes: &[FloatCode],
227 ) -> VortexResult<ArrayRef> {
228 let alp_encoded = ALPEncoding
229 .encode(&stats.source().to_canonical(), None)?
230 .vortex_expect("Input is a supported floating point array");
231 let alp = alp_encoded.as_::<ALPVTable>();
232 let alp_ints = alp.encoded().to_primitive();
233
234 let mut int_excludes = Vec::new();
238 if excludes.contains(&DICT_SCHEME) {
239 int_excludes.push(integer::DictScheme.code());
240 }
241 if excludes.contains(&RUN_END_SCHEME) {
242 int_excludes.push(integer::RunEndScheme.code());
243 }
244
245 let compressed_alp_ints =
246 IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
247
248 let patches = alp.patches().map(compress_patches).transpose()?;
249
250 Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array())
251 }
252}
253
254impl Scheme for ALPRDScheme {
255 type StatsType = FloatStats;
256 type CodeType = FloatCode;
257
258 fn code(&self) -> FloatCode {
259 ALPRD_SCHEME
260 }
261
262 fn expected_compression_ratio(
263 &self,
264 stats: &Self::StatsType,
265 is_sample: bool,
266 allowed_cascading: usize,
267 excludes: &[FloatCode],
268 ) -> VortexResult<f64> {
269 if stats.source().ptype() == PType::F16 {
270 return Ok(0.0);
271 }
272
273 estimate_compression_ratio_with_sampling(
274 self,
275 stats,
276 is_sample,
277 allowed_cascading,
278 excludes,
279 )
280 }
281
282 fn compress(
283 &self,
284 stats: &Self::StatsType,
285 _is_sample: bool,
286 _allowed_cascading: usize,
287 _excludes: &[FloatCode],
288 ) -> VortexResult<ArrayRef> {
289 let encoder = match stats.source().ptype() {
290 PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
291 PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
292 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
293 };
294
295 let mut alp_rd = encoder.encode(stats.source());
296
297 let patches = alp_rd
298 .left_parts_patches()
299 .map(compress_patches)
300 .transpose()?;
301 alp_rd.replace_left_parts_patches(patches);
302
303 Ok(alp_rd.into_array())
304 }
305}
306
307impl Scheme for DictScheme {
308 type StatsType = FloatStats;
309 type CodeType = FloatCode;
310
311 fn code(&self) -> FloatCode {
312 DICT_SCHEME
313 }
314
315 fn expected_compression_ratio(
316 &self,
317 stats: &Self::StatsType,
318 is_sample: bool,
319 allowed_cascading: usize,
320 excludes: &[FloatCode],
321 ) -> VortexResult<f64> {
322 if stats.value_count == 0 {
323 return Ok(0.0);
324 }
325
326 if stats.distinct_values_count > stats.value_count / 2 {
328 return Ok(0.0);
329 }
330
331 estimate_compression_ratio_with_sampling(
333 self,
334 stats,
335 is_sample,
336 allowed_cascading,
337 excludes,
338 )
339 }
340
341 fn compress(
342 &self,
343 stats: &Self::StatsType,
344 is_sample: bool,
345 allowed_cascading: usize,
346 _excludes: &[FloatCode],
347 ) -> VortexResult<ArrayRef> {
348 let dict_array = dictionary_encode(stats);
349
350 let codes_stats = IntegerStats::generate_opts(
352 &dict_array.codes().to_primitive().narrow()?,
353 GenerateStatsOptions {
354 count_distinct_values: false,
355 },
356 );
357 let codes_scheme = IntCompressor::choose_scheme(
358 &codes_stats,
359 is_sample,
360 allowed_cascading - 1,
361 &[integer::DictScheme.code(), integer::SequenceScheme.code()],
362 )?;
363 let compressed_codes = codes_scheme.compress(
364 &codes_stats,
365 is_sample,
366 allowed_cascading - 1,
367 &[integer::DictScheme.code()],
368 )?;
369
370 let compressed_values = FloatCompressor::compress(
371 &dict_array.values().to_primitive(),
372 is_sample,
373 allowed_cascading - 1,
374 &[DICT_SCHEME],
375 )?;
376
377 unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
379 }
380}
381
382impl Scheme for NullDominated {
383 type StatsType = FloatStats;
384 type CodeType = FloatCode;
385
386 fn code(&self) -> Self::CodeType {
387 SPARSE_SCHEME
388 }
389
390 fn expected_compression_ratio(
391 &self,
392 stats: &Self::StatsType,
393 _is_sample: bool,
394 allowed_cascading: usize,
395 _excludes: &[Self::CodeType],
396 ) -> VortexResult<f64> {
397 if allowed_cascading == 0 {
399 return Ok(0.0);
400 }
401
402 if stats.value_count == 0 {
403 return Ok(0.0);
405 }
406
407 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
409 return Ok(stats.src.len() as f64 / stats.value_count as f64);
410 }
411
412 Ok(0.0)
414 }
415
416 fn compress(
417 &self,
418 stats: &Self::StatsType,
419 is_sample: bool,
420 allowed_cascading: usize,
421 _excludes: &[Self::CodeType],
422 ) -> VortexResult<ArrayRef> {
423 assert!(allowed_cascading > 0);
424
425 let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
427
428 if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
429 let new_excludes = vec![integer::SparseScheme.code()];
431
432 let indices = sparse.patches().indices().to_primitive().narrow()?;
435 let compressed_indices = IntCompressor::compress_no_dict(
436 &indices,
437 is_sample,
438 allowed_cascading - 1,
439 &new_excludes,
440 )?;
441
442 SparseArray::try_new(
443 compressed_indices,
444 sparse.patches().values().clone(),
445 sparse.len(),
446 sparse.fill_scalar().clone(),
447 )
448 .map(|a| a.into_array())
449 } else {
450 Ok(sparse_encoded)
451 }
452 }
453}
454
#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::{ArrayBuilder, PrimitiveBuilder};
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical, assert_arrays_eq};
    use vortex_buffer::{Buffer, buffer_mut};
    use vortex_dtype::Nullability;
    use vortex_sparse::SparseEncoding;

    use crate::float::{FloatCompressor, RLE_FLOAT_SCHEME};
    use crate::{Compressor, CompressorStats, MAX_CASCADE, Scheme};

    /// Compressing an empty array yields an empty array.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable);

        let compressed = FloatCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(compressed.is_empty());
    }

    /// Smoke test: a repeating 0..50 pattern compresses without error; the tree is printed.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        for (idx, slot) in values.iter_mut().enumerate() {
            *slot = (idx % 50) as f32;
        }

        let floats = values.into_array().to_primitive();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.display_tree());
    }

    /// Three long runs pushed through the RLE scheme compare equal to the input.
    #[test]
    fn test_rle_compression() {
        let values: Vec<f32> = iter::repeat_n(1.5f32, 100)
            .chain(iter::repeat_n(2.7f32, 200))
            .chain(iter::repeat_n(3.15f32, 150))
            .collect();

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let stats = crate::float::FloatStats::generate(&array);
        let compressed = RLE_FLOAT_SCHEME.compress(&stats, false, 3, &[]).unwrap();

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed.as_ref(), expected.as_ref());
    }

    /// Six special float values followed by 90 nulls end up sparse-encoded.
    #[test]
    fn test_sparse_compression() {
        let mut builder = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        let special_values = [
            f32::NAN,
            -f32::NAN,
            f32::INFINITY,
            -f32::INFINITY,
            0.0f32,
            -0.0f32,
        ];
        for value in special_values {
            builder.append_value(value);
        }
        builder.append_nulls(90);

        let floats = builder.finish_into_primitive();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();

        assert_eq!(compressed.encoding_id(), SparseEncoding.id());
    }
}