1pub(crate) mod dictionary;
5pub(super) mod stats;
6
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use enum_iterator::Sequence;
11use vortex_alp::ALPArray;
12use vortex_alp::ALPVTable;
13use vortex_alp::RDEncoder;
14use vortex_alp::alp_encode;
15use vortex_array::ArrayRef;
16use vortex_array::Canonical;
17use vortex_array::IntoArray;
18use vortex_array::ToCanonical;
19use vortex_array::arrays::ConstantArray;
20use vortex_array::arrays::DictArray;
21use vortex_array::arrays::DictArrayParts;
22use vortex_array::arrays::MaskedArray;
23use vortex_array::arrays::PrimitiveVTable;
24use vortex_array::scalar::Scalar;
25use vortex_array::vtable::VTable;
26use vortex_array::vtable::ValidityHelper;
27use vortex_dtype::PType;
28use vortex_error::VortexResult;
29use vortex_error::vortex_panic;
30use vortex_sparse::SparseArray;
31use vortex_sparse::SparseVTable;
32
33use self::dictionary::dictionary_encode;
34pub use self::stats::FloatStats;
35use super::integer::DictScheme as IntDictScheme;
36use super::integer::RunEndScheme as IntRunEndScheme;
37use super::integer::SparseScheme as IntSparseScheme;
38use crate::BtrBlocksCompressor;
39use crate::CanonicalCompressor;
40use crate::Compressor;
41use crate::CompressorContext;
42use crate::CompressorStats;
43use crate::Excludes;
44use crate::GenerateStatsOptions;
45use crate::IntCode;
46use crate::Scheme;
47use crate::SchemeExt;
48use crate::compressor::patches::compress_patches;
49use crate::compressor::rle;
50use crate::compressor::rle::RLEScheme;
51
52pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> + Send + Sync {}
53
54impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> + Send + Sync
55{}
56
57impl PartialEq for dyn FloatScheme {
58 fn eq(&self, other: &Self) -> bool {
59 self.code() == other.code()
60 }
61}
62
63impl Eq for dyn FloatScheme {}
64
65impl Hash for dyn FloatScheme {
66 fn hash<H: Hasher>(&self, state: &mut H) {
67 self.code().hash(state)
68 }
69}
70
71pub const ALL_FLOAT_SCHEMES: &[&dyn FloatScheme] = &[
73 &UncompressedScheme,
74 &ConstantScheme,
75 &ALPScheme,
76 &ALPRDScheme,
77 &DictScheme,
78 &NullDominated,
79 &RLE_FLOAT_SCHEME,
80 #[cfg(feature = "pco")]
81 &PcoScheme,
82];
83
84#[derive(Clone, Copy)]
86pub struct FloatCompressor<'a> {
87 pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
89}
90
91impl<'a> Compressor for FloatCompressor<'a> {
92 type ArrayVTable = PrimitiveVTable;
93 type SchemeType = dyn FloatScheme;
94 type StatsType = FloatStats;
95
96 fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
97 if self
98 .btr_blocks_compressor
99 .float_schemes()
100 .iter()
101 .any(|s| s.code() == DictScheme.code())
102 {
103 FloatStats::generate_opts(
104 array,
105 GenerateStatsOptions {
106 count_distinct_values: true,
107 },
108 )
109 } else {
110 FloatStats::generate_opts(
111 array,
112 GenerateStatsOptions {
113 count_distinct_values: false,
114 },
115 )
116 }
117 }
118
119 fn schemes(&self) -> &[&'static dyn FloatScheme] {
120 self.btr_blocks_compressor.float_schemes()
121 }
122
123 fn default_scheme(&self) -> &'static Self::SchemeType {
124 &UncompressedScheme
125 }
126}
127
128#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
130pub enum FloatCode {
131 Uncompressed,
133 Constant,
135 Alp,
137 AlpRd,
139 Dict,
141 RunEnd,
143 Rle,
145 Sparse,
147 Pco,
149}
150
/// Scheme that keeps the array as-is.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct UncompressedScheme;

/// Scheme that encodes an array whose valid values are all identical as a constant.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct ConstantScheme;

/// Scheme that ALP-encodes values and cascades the integers into the integer compressor.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct ALPScheme;

/// Scheme that applies ALP-RD encoding.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes values.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DictScheme;

/// Scheme that sparse-encodes arrays in which most entries are null.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NullDominated;

/// Scheme that compresses with Pcodec (only available with the `pco` feature).
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;
173
174#[derive(Debug, Copy, Clone, PartialEq, Eq)]
176pub struct FloatRLEConfig;
177
178impl rle::RLEConfig for FloatRLEConfig {
179 type Stats = FloatStats;
180 type Code = FloatCode;
181
182 const CODE: FloatCode = FloatCode::Rle;
183
184 fn compress_values(
185 compressor: &BtrBlocksCompressor,
186 values: &vortex_array::arrays::PrimitiveArray,
187 ctx: CompressorContext,
188 excludes: &[FloatCode],
189 ) -> VortexResult<ArrayRef> {
190 compressor.compress_canonical(Canonical::Primitive(values.clone()), ctx, excludes.into())
191 }
192}
193
194pub const RLE_FLOAT_SCHEME: RLEScheme<FloatRLEConfig> = RLEScheme::new();
196
197impl Scheme for UncompressedScheme {
198 type StatsType = FloatStats;
199 type CodeType = FloatCode;
200
201 fn code(&self) -> FloatCode {
202 FloatCode::Uncompressed
203 }
204
205 fn expected_compression_ratio(
206 &self,
207 _compressor: &BtrBlocksCompressor,
208 _stats: &Self::StatsType,
209 _ctx: CompressorContext,
210 _excludes: &[FloatCode],
211 ) -> VortexResult<f64> {
212 Ok(1.0)
213 }
214
215 fn compress(
216 &self,
217 _btr_blocks_compressor: &BtrBlocksCompressor,
218 stats: &Self::StatsType,
219 _ctx: CompressorContext,
220 _excludes: &[FloatCode],
221 ) -> VortexResult<ArrayRef> {
222 Ok(stats.source().to_array())
223 }
224}
225
226impl Scheme for ConstantScheme {
227 type StatsType = FloatStats;
228 type CodeType = FloatCode;
229
230 fn code(&self) -> FloatCode {
231 FloatCode::Constant
232 }
233
234 fn expected_compression_ratio(
235 &self,
236 _btr_blocks_compressor: &BtrBlocksCompressor,
237 stats: &Self::StatsType,
238 ctx: CompressorContext,
239 _excludes: &[FloatCode],
240 ) -> VortexResult<f64> {
241 if ctx.is_sample {
243 return Ok(0.0);
244 }
245
246 if stats.null_count as usize == stats.src.len() || stats.value_count == 0 {
247 return Ok(0.0);
248 }
249
250 if stats.distinct_values_count != 1 {
252 return Ok(0.0);
253 }
254
255 Ok(stats.value_count as f64)
256 }
257
258 fn compress(
259 &self,
260 _btr_blocks_compressor: &BtrBlocksCompressor,
261 stats: &Self::StatsType,
262 _ctx: CompressorContext,
263 _excludes: &[FloatCode],
264 ) -> VortexResult<ArrayRef> {
265 let scalar_idx =
266 (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));
267
268 match scalar_idx {
269 Some(idx) => {
270 let scalar = stats.source().scalar_at(idx)?;
271 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
272 if !stats.source().all_valid()? {
273 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
274 } else {
275 Ok(const_arr)
276 }
277 }
278 None => Ok(ConstantArray::new(
279 Scalar::null(stats.src.dtype().clone()),
280 stats.src.len(),
281 )
282 .into_array()),
283 }
284 }
285}
286
287impl Scheme for ALPScheme {
288 type StatsType = FloatStats;
289 type CodeType = FloatCode;
290
291 fn code(&self) -> FloatCode {
292 FloatCode::Alp
293 }
294
295 fn expected_compression_ratio(
296 &self,
297 compressor: &BtrBlocksCompressor,
298 stats: &Self::StatsType,
299 ctx: CompressorContext,
300 excludes: &[FloatCode],
301 ) -> VortexResult<f64> {
302 if stats.source().ptype() == PType::F16 {
304 return Ok(0.0);
305 }
306
307 if ctx.allowed_cascading == 0 {
308 return Ok(0.0);
311 }
312
313 self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
314 }
315
316 fn compress(
317 &self,
318 compressor: &BtrBlocksCompressor,
319 stats: &FloatStats,
320 ctx: CompressorContext,
321 excludes: &[FloatCode],
322 ) -> VortexResult<ArrayRef> {
323 let alp_encoded = alp_encode(&stats.source().to_primitive(), None)?;
324 let alp = alp_encoded.as_::<ALPVTable>();
325 let alp_ints = alp.encoded().to_primitive();
326
327 let mut int_excludes = Vec::new();
331 if excludes.contains(&FloatCode::Dict) {
332 int_excludes.push(IntDictScheme.code());
333 }
334 if excludes.contains(&FloatCode::RunEnd) {
335 int_excludes.push(IntRunEndScheme.code());
336 }
337
338 let compressed_alp_ints = compressor.compress_canonical(
339 Canonical::Primitive(alp_ints),
340 ctx.descend(),
341 Excludes::int_only(&int_excludes),
342 )?;
343
344 let patches = alp.patches().map(compress_patches).transpose()?;
345
346 Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array())
347 }
348}
349
350impl Scheme for ALPRDScheme {
351 type StatsType = FloatStats;
352 type CodeType = FloatCode;
353
354 fn code(&self) -> FloatCode {
355 FloatCode::AlpRd
356 }
357
358 fn expected_compression_ratio(
359 &self,
360 compressor: &BtrBlocksCompressor,
361 stats: &Self::StatsType,
362 ctx: CompressorContext,
363 excludes: &[FloatCode],
364 ) -> VortexResult<f64> {
365 if stats.source().ptype() == PType::F16 {
366 return Ok(0.0);
367 }
368
369 self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
370 }
371
372 fn compress(
373 &self,
374 _compressor: &BtrBlocksCompressor,
375 stats: &Self::StatsType,
376 _ctx: CompressorContext,
377 _excludes: &[FloatCode],
378 ) -> VortexResult<ArrayRef> {
379 let encoder = match stats.source().ptype() {
380 PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
381 PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
382 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
383 };
384
385 let mut alp_rd = encoder.encode(stats.source());
386
387 let patches = alp_rd
388 .left_parts_patches()
389 .map(compress_patches)
390 .transpose()?;
391 alp_rd.replace_left_parts_patches(patches);
392
393 Ok(alp_rd.into_array())
394 }
395}
396
397impl Scheme for DictScheme {
398 type StatsType = FloatStats;
399 type CodeType = FloatCode;
400
401 fn code(&self) -> FloatCode {
402 FloatCode::Dict
403 }
404
405 fn expected_compression_ratio(
406 &self,
407 compressor: &BtrBlocksCompressor,
408 stats: &Self::StatsType,
409 ctx: CompressorContext,
410 excludes: &[FloatCode],
411 ) -> VortexResult<f64> {
412 if stats.value_count == 0 {
413 return Ok(0.0);
414 }
415
416 if stats.distinct_values_count > stats.value_count / 2 {
418 return Ok(0.0);
419 }
420
421 self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
423 }
424
425 fn compress(
426 &self,
427 compressor: &BtrBlocksCompressor,
428 stats: &Self::StatsType,
429 ctx: CompressorContext,
430 _excludes: &[Self::CodeType],
431 ) -> VortexResult<ArrayRef> {
432 let dict = dictionary_encode(stats);
433 let has_all_values_referenced = dict.has_all_values_referenced();
434 let DictArrayParts { codes, values, .. } = dict.into_parts();
435
436 let compressed_codes = compressor.compress_canonical(
437 Canonical::Primitive(codes.to_primitive()),
438 ctx.descend(),
439 Excludes::int_only(&[IntCode::Dict, IntCode::Sequence]),
440 )?;
441
442 assert!(values.is_canonical());
443 let compressed_values = compressor.compress_canonical(
444 Canonical::Primitive(values.to_primitive()),
445 ctx.descend(),
446 Excludes::from(&[FloatCode::Dict]),
447 )?;
448
449 unsafe {
451 Ok(
452 DictArray::new_unchecked(compressed_codes, compressed_values)
453 .set_all_values_referenced(has_all_values_referenced)
454 .into_array(),
455 )
456 }
457 }
458}
459
460impl Scheme for NullDominated {
461 type StatsType = FloatStats;
462 type CodeType = FloatCode;
463
464 fn code(&self) -> Self::CodeType {
465 FloatCode::Sparse
466 }
467
468 fn expected_compression_ratio(
469 &self,
470 _compressor: &BtrBlocksCompressor,
471 stats: &Self::StatsType,
472 ctx: CompressorContext,
473 _excludes: &[Self::CodeType],
474 ) -> VortexResult<f64> {
475 if ctx.allowed_cascading == 0 {
477 return Ok(0.0);
478 }
479
480 if stats.value_count == 0 {
481 return Ok(0.0);
483 }
484
485 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
487 return Ok(stats.src.len() as f64 / stats.value_count as f64);
488 }
489
490 Ok(0.0)
492 }
493
494 fn compress(
495 &self,
496 compressor: &BtrBlocksCompressor,
497 stats: &Self::StatsType,
498 ctx: CompressorContext,
499 _excludes: &[Self::CodeType],
500 ) -> VortexResult<ArrayRef> {
501 assert!(ctx.allowed_cascading > 0);
502
503 let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
505
506 if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
507 let new_excludes = [IntSparseScheme.code()];
509
510 let indices = sparse.patches().indices().to_primitive().narrow()?;
513 let compressed_indices = compressor.compress_canonical(
514 Canonical::Primitive(indices.to_primitive()),
515 ctx.descend(),
516 Excludes::int_only(&new_excludes),
517 )?;
518
519 SparseArray::try_new(
520 compressed_indices,
521 sparse.patches().values().clone(),
522 sparse.len(),
523 sparse.fill_scalar().clone(),
524 )
525 .map(|a| a.into_array())
526 } else {
527 Ok(sparse_encoded)
528 }
529 }
530}
531
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    type StatsType = FloatStats;
    type CodeType = FloatCode;

    fn code(&self) -> FloatCode {
        FloatCode::Pco
    }

    /// Compresses the source with Pcodec at `pco::DEFAULT_COMPRESSION_LEVEL`.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[FloatCode],
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): 8192 is presumably the number of values per Pcodec page —
        // confirm against `PcoArray::from_primitive`.
        let values_per_page = 8192;
        let pco_array = vortex_pco::PcoArray::from_primitive(
            stats.source(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            values_per_page,
        )?;
        Ok(pco_array.into_array())
    }
}
556
#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::Array;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_dtype::Nullability;
    use vortex_error::VortexResult;

    use super::RLE_FLOAT_SCHEME;
    use crate::BtrBlocksCompressor;
    use crate::CompressorContext;
    use crate::CompressorExt;
    use crate::CompressorStats;
    use crate::Scheme;

    /// Compressing an empty array yields an empty array.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let btr = BtrBlocksCompressor::default();
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable);

        let result = btr
            .float_compressor()
            .compress(&btr, &empty, CompressorContext::default(), &[])?;

        assert!(result.is_empty());
        Ok(())
    }

    /// 1024 values cycling through 50 distinct floats end up dictionary-encoded.
    #[test]
    fn test_compress() -> VortexResult<()> {
        let mut values = buffer_mut![1.0f32; 1024];
        for i in 0..1024usize {
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr
            .float_compressor()
            .compress(&btr, &floats, CompressorContext::default(), &[])?;
        assert_eq!(compressed.len(), 1024);

        let display = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(display, "vortex.dict(f32, len=1024)");

        Ok(())
    }

    /// The RLE scheme round-trips three long runs of floats.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let values: Vec<f32> = iter::repeat_n(1.5f32, 100)
            .chain(iter::repeat_n(2.7f32, 200))
            .chain(iter::repeat_n(3.15f32, 150))
            .collect();

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let stats = super::FloatStats::generate(&array);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            RLE_FLOAT_SCHEME.compress(&btr, &stats, CompressorContext::default(), &[])?;

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed.as_ref(), expected.as_ref());
        Ok(())
    }

    /// A few special float values followed by many nulls are sparse-encoded.
    #[test]
    fn test_sparse_compression() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        for value in [
            f32::NAN,
            -f32::NAN,
            f32::INFINITY,
            -f32::INFINITY,
            0.0f32,
            -0.0f32,
        ] {
            builder.append_value(value);
        }
        builder.append_nulls(90);

        let floats = builder.finish_into_primitive();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr
            .float_compressor()
            .compress(&btr, &floats, CompressorContext::default(), &[])?;
        assert_eq!(compressed.len(), 96);

        let display = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(display, "vortex.sparse(f32?, len=96)");

        Ok(())
    }
}
671
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_alp::ALPVTable;
    use vortex_array::arrays::ConstantVTable;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;
    use crate::CompressorContext;
    use crate::CompressorExt;

    /// A single repeated value selects the constant scheme.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values = vec![42.5f64; 100];
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let btr = BtrBlocksCompressor::default();
        let compressed = btr
            .float_compressor()
            .compress(&btr, &array, CompressorContext::default(), &[])?;

        assert!(compressed.is::<ConstantVTable>());
        Ok(())
    }

    /// Decimal-like values with two fractional digits select ALP.
    #[test]
    fn test_alp_compressed() -> VortexResult<()> {
        let values: Vec<f64> = (0..1000).map(|i| f64::from(i) * 0.01).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let btr = BtrBlocksCompressor::default();
        let compressed = btr
            .float_compressor()
            .compress(&btr, &array, CompressorContext::default(), &[])?;

        assert!(compressed.is::<ALPVTable>());
        Ok(())
    }

    /// Five distinct values repeated 200 times each select dictionary encoding.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = [1.1, 2.2, 3.3, 4.4, 5.5];
        let values: Vec<f64> = distinct_values.iter().copied().cycle().take(1000).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let btr = BtrBlocksCompressor::default();
        let compressed = btr
            .float_compressor()
            .compress(&btr, &array, CompressorContext::default(), &[])?;

        assert!(compressed.is::<DictVTable>());
        Ok(())
    }

    /// An array that is 95% null compresses without changing its length.
    #[test]
    fn test_null_dominated_compressed() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f64>::with_capacity(Nullability::Nullable, 100);
        for value in [0.0f64, 1.0, 2.0, 3.0, 4.0] {
            builder.append_value(value);
        }
        builder.append_nulls(95);
        let array = builder.finish_into_primitive();

        let btr = BtrBlocksCompressor::default();
        let compressed = btr
            .float_compressor()
            .compress(&btr, &array, CompressorContext::default(), &[])?;

        assert_eq!(compressed.len(), 100);
        Ok(())
    }
}