1pub mod dictionary;
5mod stats;
6
7use std::fmt::Debug;
8use std::hash::Hash;
9
10pub use stats::IntegerStats;
11use vortex_array::arrays::{ConstantArray, PrimitiveArray, PrimitiveVTable};
12use vortex_array::{ArrayRef, IntoArray, ToCanonical};
13use vortex_dict::DictArray;
14use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_bail, vortex_err};
15use vortex_fastlanes::{FoRArray, bit_width_histogram, bitpack_encode, find_best_bit_width};
16use vortex_runend::RunEndArray;
17use vortex_runend::compress::runend_encode;
18use vortex_scalar::Scalar;
19use vortex_sequence::sequence_encode;
20use vortex_sparse::{SparseArray, SparseVTable};
21use vortex_zigzag::{ZigZagArray, zigzag_encode};
22
23use crate::integer::dictionary::dictionary_encode;
24use crate::patches::compress_patches;
25use crate::{
26 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
27 estimate_compression_ratio_with_sampling,
28};
29
/// Compressor for integer primitive arrays.
///
/// Its [`Compressor`] implementation in this module selects among the integer schemes
/// defined here (constant, frame-of-reference, zig-zag, bit-packing, sparse, dictionary,
/// run-end and sequence).
pub struct IntCompressor;
31
32impl Compressor for IntCompressor {
33 type ArrayVTable = PrimitiveVTable;
34 type SchemeType = dyn IntegerScheme;
35 type StatsType = IntegerStats;
36
37 fn schemes() -> &'static [&'static dyn IntegerScheme] {
38 &[
39 &ConstantScheme,
40 &FORScheme,
41 &ZigZagScheme,
42 &BitPackingScheme,
43 &SparseScheme,
44 &DictScheme,
45 &RunEndScheme,
46 &SequenceScheme,
47 ]
48 }
49
50 fn default_scheme() -> &'static Self::SchemeType {
51 &UncompressedScheme
52 }
53
54 fn dict_scheme_code() -> IntCode {
55 DICT_SCHEME
56 }
57}
58
59impl IntCompressor {
60 pub fn compress_no_dict(
61 array: &PrimitiveArray,
62 is_sample: bool,
63 allowed_cascading: usize,
64 excludes: &[IntCode],
65 ) -> VortexResult<ArrayRef> {
66 let stats = IntegerStats::generate_opts(
67 array,
68 GenerateStatsOptions {
69 count_distinct_values: false,
70 },
71 );
72
73 let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
74 let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
75
76 if output.nbytes() < array.nbytes() {
77 Ok(output)
78 } else {
79 log::debug!("resulting tree too large: {}", output.display_tree());
80 Ok(array.to_array())
81 }
82 }
83}
84
/// Marker trait for any [`Scheme`] that works on [`IntegerStats`] and is identified by an
/// [`IntCode`].
pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}

// Blanket impl: every `Scheme` with the matching associated types is an `IntegerScheme`.
impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
89
/// One-byte identifier for an integer compression scheme.
///
/// The inner value is private; the codes used by this module are the `*_SCHEME` constants.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct IntCode(u8);
92
// One distinct code per integer scheme. Each constant is the value returned by the
// `code()` method of the scheme struct with the matching name.
const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
const CONSTANT_SCHEME: IntCode = IntCode(1);
const FOR_SCHEME: IntCode = IntCode(2);
const ZIGZAG_SCHEME: IntCode = IntCode(3);
const BITPACKING_SCHEME: IntCode = IntCode(4);
const SPARSE_SCHEME: IntCode = IntCode(5);
const DICT_SCHEME: IntCode = IntCode(6);
const RUNEND_SCHEME: IntCode = IntCode(7);
const SEQUENCE_SCHEME: IntCode = IntCode(8);
102
/// Scheme that returns the source array unchanged.
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

/// Scheme that replaces an array holding a single distinct value with a `ConstantArray`.
#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

/// Frame-of-reference scheme: encodes through `FoRArray` and bit-packs the encoded values.
#[derive(Debug, Copy, Clone)]
pub struct FORScheme;

/// Scheme that zig-zag encodes the values and recursively compresses the encoded child.
#[derive(Debug, Copy, Clone)]
pub struct ZigZagScheme;

/// Scheme that bit-packs the values; it reports no benefit when the minimum is negative.
#[derive(Debug, Copy, Clone)]
pub struct BitPackingScheme;

/// Scheme that sparse-encodes the array, using the most frequent value as the fill value.
#[derive(Debug, Copy, Clone)]
pub struct SparseScheme;

/// Scheme that dictionary-encodes the values and compresses the resulting codes.
#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

/// Scheme that run-end encodes the array and compresses both the ends and the values.
#[derive(Debug, Copy, Clone)]
pub struct RunEndScheme;

/// Scheme that uses `sequence_encode`; it does not handle arrays containing nulls.
#[derive(Debug, Copy, Clone)]
pub struct SequenceScheme;
129
/// Minimum average run length for `RunEndScheme` to be considered; below this value its
/// expected compression ratio is reported as `0.0`.
const RUN_END_THRESHOLD: u32 = 4;
132
133impl Scheme for UncompressedScheme {
134 type StatsType = IntegerStats;
135 type CodeType = IntCode;
136
137 fn code(&self) -> IntCode {
138 UNCOMPRESSED_SCHEME
139 }
140
141 fn expected_compression_ratio(
142 &self,
143 _stats: &IntegerStats,
144 _is_sample: bool,
145 _allowed_cascading: usize,
146 _excludes: &[IntCode],
147 ) -> VortexResult<f64> {
148 Ok(1.0)
150 }
151
152 fn compress(
153 &self,
154 stats: &IntegerStats,
155 _is_sample: bool,
156 _allowed_cascading: usize,
157 _excludes: &[IntCode],
158 ) -> VortexResult<ArrayRef> {
159 Ok(stats.source().to_array())
160 }
161}
162
163impl Scheme for ConstantScheme {
164 type StatsType = IntegerStats;
165 type CodeType = IntCode;
166
167 fn code(&self) -> IntCode {
168 CONSTANT_SCHEME
169 }
170
171 fn is_constant(&self) -> bool {
172 true
173 }
174
175 fn expected_compression_ratio(
176 &self,
177 stats: &IntegerStats,
178 is_sample: bool,
179 _allowed_cascading: usize,
180 _excludes: &[IntCode],
181 ) -> VortexResult<f64> {
182 if is_sample {
184 return Ok(0.0);
185 }
186
187 if stats.distinct_values_count != 1 {
189 return Ok(0.0);
190 }
191
192 if stats.null_count > 0 && stats.value_count > 0 {
194 return Ok(0.0);
195 }
196
197 Ok(stats.value_count as f64)
198 }
199
200 fn compress(
201 &self,
202 stats: &IntegerStats,
203 _is_sample: bool,
204 _allowed_cascading: usize,
205 _excludes: &[IntCode],
206 ) -> VortexResult<ArrayRef> {
207 let scalar = stats
210 .source()
211 .as_constant()
212 .vortex_expect("constant array expected");
213
214 Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
215 }
216}
217
218impl Scheme for FORScheme {
219 type StatsType = IntegerStats;
220 type CodeType = IntCode;
221
222 fn code(&self) -> IntCode {
223 FOR_SCHEME
224 }
225
226 fn expected_compression_ratio(
227 &self,
228 stats: &IntegerStats,
229 _is_sample: bool,
230 allowed_cascading: usize,
231 _excludes: &[IntCode],
232 ) -> VortexResult<f64> {
233 if allowed_cascading == 0 {
235 return Ok(0.0);
236 }
237
238 if stats.value_count == 0 {
240 return Ok(0.0);
241 }
242
243 if stats.typed.min_is_zero() {
245 return Ok(0.0);
246 }
247
248 let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
250 let bw = match stats.typed.max_minus_min().checked_ilog2() {
251 Some(l) => l + 1,
252 None => return Ok(0.0),
255 };
256
257 if full_width - bw < 8 {
259 return Ok(0.0);
260 }
261
262 Ok(full_width as f64 / bw as f64)
263 }
264
265 fn compress(
266 &self,
267 stats: &IntegerStats,
268 is_sample: bool,
269 _allowed_cascading: usize,
270 excludes: &[IntCode],
271 ) -> VortexResult<ArrayRef> {
272 let for_array = FoRArray::encode(stats.src.clone())?;
273 let biased = for_array.encoded().to_primitive();
274 let biased_stats = IntegerStats::generate_opts(
275 &biased,
276 GenerateStatsOptions {
277 count_distinct_values: false,
278 },
279 );
280
281 let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;
286
287 Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
288 }
289}
290
291impl Scheme for ZigZagScheme {
292 type StatsType = IntegerStats;
293 type CodeType = IntCode;
294
295 fn code(&self) -> IntCode {
296 ZIGZAG_SCHEME
297 }
298
299 fn expected_compression_ratio(
300 &self,
301 stats: &IntegerStats,
302 is_sample: bool,
303 allowed_cascading: usize,
304 excludes: &[IntCode],
305 ) -> VortexResult<f64> {
306 if allowed_cascading == 0 {
308 return Ok(0.0);
309 }
310
311 if stats.value_count == 0 {
313 return Ok(0.0);
314 }
315
316 if !stats.typed.min_is_negative() {
318 return Ok(0.0);
319 }
320
321 estimate_compression_ratio_with_sampling(
323 self,
324 stats,
325 is_sample,
326 allowed_cascading,
327 excludes,
328 )
329 }
330
331 fn compress(
332 &self,
333 stats: &IntegerStats,
334 is_sample: bool,
335 allowed_cascading: usize,
336 excludes: &[IntCode],
337 ) -> VortexResult<ArrayRef> {
338 let zag = zigzag_encode(stats.src.clone())?;
340 let encoded = zag.encoded().to_primitive();
341
342 let mut new_excludes = vec![
345 ZigZagScheme.code(),
346 DictScheme.code(),
347 RunEndScheme.code(),
348 SparseScheme.code(),
349 ];
350 new_excludes.extend_from_slice(excludes);
351
352 let compressed =
353 IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;
354
355 log::debug!("zigzag output: {}", compressed.display_tree());
356
357 Ok(ZigZagArray::try_new(compressed)?.into_array())
358 }
359}
360
361impl Scheme for BitPackingScheme {
362 type StatsType = IntegerStats;
363 type CodeType = IntCode;
364
365 fn code(&self) -> IntCode {
366 BITPACKING_SCHEME
367 }
368
369 #[allow(clippy::cast_possible_truncation)]
370 fn expected_compression_ratio(
371 &self,
372 stats: &IntegerStats,
373 is_sample: bool,
374 allowed_cascading: usize,
375 excludes: &[IntCode],
376 ) -> VortexResult<f64> {
377 if stats.typed.min_is_negative() {
379 return Ok(0.0);
380 }
381
382 if stats.value_count == 0 {
384 return Ok(0.0);
385 }
386
387 estimate_compression_ratio_with_sampling(
388 self,
389 stats,
390 is_sample,
391 allowed_cascading,
392 excludes,
393 )
394 }
395
396 #[allow(clippy::cast_possible_truncation)]
397 fn compress(
398 &self,
399 stats: &IntegerStats,
400 _is_sample: bool,
401 _allowed_cascading: usize,
402 _excludes: &[IntCode],
403 ) -> VortexResult<ArrayRef> {
404 let histogram = bit_width_histogram(stats.source())?;
405 let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
406 if bw as usize == stats.source().ptype().bit_width() {
408 return Ok(stats.source().clone().into_array());
409 }
410 let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;
411
412 let patches = packed.patches().map(compress_patches).transpose()?;
413 packed.replace_patches(patches);
414
415 Ok(packed.into_array())
416 }
417}
418
419impl Scheme for SparseScheme {
420 type StatsType = IntegerStats;
421 type CodeType = IntCode;
422
423 fn code(&self) -> IntCode {
424 SPARSE_SCHEME
425 }
426
427 fn expected_compression_ratio(
429 &self,
430 stats: &IntegerStats,
431 _is_sample: bool,
432 _allowed_cascading: usize,
433 _excludes: &[IntCode],
434 ) -> VortexResult<f64> {
435 if stats.value_count == 0 {
436 return Ok(0.0);
438 }
439
440 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
442 return Ok(stats.src.len() as f64 / stats.value_count as f64);
443 }
444
445 let (_, top_count) = stats.typed.top_value_and_count();
447
448 if top_count == stats.value_count {
449 return Ok(0.0);
451 }
452
453 let freq = top_count as f64 / stats.value_count as f64;
454 if freq >= 0.9 {
455 return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
457 }
458
459 Ok(0.0)
460 }
461
462 fn compress(
463 &self,
464 stats: &IntegerStats,
465 is_sample: bool,
466 allowed_cascading: usize,
467 excludes: &[IntCode],
468 ) -> VortexResult<ArrayRef> {
469 assert!(allowed_cascading > 0);
470 let (top_pvalue, top_count) = stats.typed.top_value_and_count();
471 if top_count as usize == stats.src.len() {
472 return Ok(ConstantArray::new(
474 Scalar::primitive_value(
475 top_pvalue,
476 top_pvalue.ptype(),
477 stats.src.dtype().nullability(),
478 ),
479 stats.src.len(),
480 )
481 .into_array());
482 }
483
484 let sparse_encoded = SparseArray::encode(
485 stats.src.as_ref(),
486 Some(Scalar::primitive_value(
487 top_pvalue,
488 top_pvalue.ptype(),
489 stats.src.dtype().nullability(),
490 )),
491 )?;
492
493 if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
494 let mut new_excludes = vec![SparseScheme.code()];
496 new_excludes.extend_from_slice(excludes);
497
498 let compressed_values = IntCompressor::compress_no_dict(
499 &sparse.patches().values().to_primitive(),
500 is_sample,
501 allowed_cascading - 1,
502 &new_excludes,
503 )?;
504
505 let indices = sparse.patches().indices().to_primitive().downcast()?;
506
507 let compressed_indices = IntCompressor::compress_no_dict(
508 &indices,
509 is_sample,
510 allowed_cascading - 1,
511 &new_excludes,
512 )?;
513
514 SparseArray::try_new(
515 compressed_indices,
516 compressed_values,
517 sparse.len(),
518 sparse.fill_scalar().clone(),
519 )
520 .map(|a| a.into_array())
521 } else {
522 Ok(sparse_encoded)
523 }
524 }
525}
526
527impl Scheme for DictScheme {
528 type StatsType = IntegerStats;
529 type CodeType = IntCode;
530
531 fn code(&self) -> IntCode {
532 DICT_SCHEME
533 }
534
535 fn expected_compression_ratio(
536 &self,
537 stats: &IntegerStats,
538 _is_sample: bool,
539 allowed_cascading: usize,
540 _excludes: &[IntCode],
541 ) -> VortexResult<f64> {
542 if allowed_cascading == 0 {
544 return Ok(0.0);
545 }
546
547 if stats.value_count == 0 {
548 return Ok(0.0);
549 }
550
551 if stats.distinct_values_count > stats.value_count / 2 {
553 return Ok(0.0);
554 }
555
556 let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
558
559 let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
561
562 let n_runs = stats.value_count / stats.average_run_length;
563
564 let codes_size_bp = (codes_bw * stats.value_count) as usize;
566 let codes_size_rle_bp = (codes_bw + 32) * n_runs;
567
568 let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);
569
570 let before = stats.value_count as usize * stats.source().ptype().bit_width();
571
572 Ok(before as f64 / (values_size + codes_size) as f64)
573 }
574
575 fn compress(
576 &self,
577 stats: &IntegerStats,
578 is_sample: bool,
579 allowed_cascading: usize,
580 excludes: &[IntCode],
581 ) -> VortexResult<ArrayRef> {
582 assert!(allowed_cascading > 0);
583
584 let dict = dictionary_encode(stats);
588
589 let mut new_excludes = vec![DICT_SCHEME, SEQUENCE_SCHEME];
592 new_excludes.extend_from_slice(excludes);
593
594 let compressed_codes = IntCompressor::compress_no_dict(
595 &dict.codes().to_primitive().downcast()?,
596 is_sample,
597 allowed_cascading - 1,
598 &new_excludes,
599 )?;
600
601 unsafe {
603 Ok(DictArray::new_unchecked(compressed_codes, dict.values().clone()).into_array())
604 }
605 }
606}
607
608impl Scheme for RunEndScheme {
609 type StatsType = IntegerStats;
610 type CodeType = IntCode;
611
612 fn code(&self) -> IntCode {
613 RUNEND_SCHEME
614 }
615
616 fn expected_compression_ratio(
617 &self,
618 stats: &IntegerStats,
619 is_sample: bool,
620 allowed_cascading: usize,
621 excludes: &[IntCode],
622 ) -> VortexResult<f64> {
623 if stats.average_run_length < RUN_END_THRESHOLD {
625 return Ok(0.0);
626 }
627
628 if allowed_cascading == 0 {
629 return Ok(0.0);
630 }
631
632 estimate_compression_ratio_with_sampling(
634 self,
635 stats,
636 is_sample,
637 allowed_cascading,
638 excludes,
639 )
640 }
641
642 fn compress(
643 &self,
644 stats: &IntegerStats,
645 is_sample: bool,
646 allowed_cascading: usize,
647 excludes: &[IntCode],
648 ) -> VortexResult<ArrayRef> {
649 assert!(allowed_cascading > 0);
650
651 let (ends, values) = runend_encode(&stats.src);
653
654 let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
655 new_excludes.extend_from_slice(excludes);
656
657 let ends_stats = IntegerStats::generate_opts(
658 &ends.to_primitive(),
659 GenerateStatsOptions {
660 count_distinct_values: false,
661 },
662 );
663 let ends_scheme = IntCompressor::choose_scheme(
664 &ends_stats,
665 is_sample,
666 allowed_cascading - 1,
667 &new_excludes,
668 )?;
669 let compressed_ends =
670 ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;
671
672 let compressed_values = IntCompressor::compress_no_dict(
673 &values.to_primitive(),
674 is_sample,
675 allowed_cascading - 1,
676 &new_excludes,
677 )?;
678
679 unsafe {
681 Ok(
682 RunEndArray::new_unchecked(compressed_ends, compressed_values, 0, stats.src.len())
683 .into_array(),
684 )
685 }
686 }
687}
688
689impl Scheme for SequenceScheme {
690 type StatsType = IntegerStats;
691 type CodeType = IntCode;
692
693 fn code(&self) -> Self::CodeType {
694 SEQUENCE_SCHEME
695 }
696
697 fn expected_compression_ratio(
698 &self,
699 stats: &Self::StatsType,
700 _is_sample: bool,
701 _allowed_cascading: usize,
702 _excludes: &[Self::CodeType],
703 ) -> VortexResult<f64> {
704 if stats.null_count > 0 {
705 return Ok(0.0);
706 }
707 Ok(sequence_encode(&stats.src)?
710 .map(|_| stats.src.len() as f64 / 2.0)
711 .unwrap_or(0.0))
712 }
713
714 fn compress(
715 &self,
716 stats: &Self::StatsType,
717 _is_sample: bool,
718 _allowed_cascading: usize,
719 _excludes: &[Self::CodeType],
720 ) -> VortexResult<ArrayRef> {
721 if stats.null_count > 0 {
722 vortex_bail!("sequence encoding does not support nulls");
723 }
724 sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
725 }
726}
727
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use log::LevelFilter;
    use rand::rngs::StdRng;
    use rand::{RngCore, SeedableRng};
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::ValidityHelper;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, BufferMut, buffer, buffer_mut};
    use vortex_dict::DictEncoding;
    use vortex_sequence::SequenceEncoding;
    use vortex_sparse::SparseEncoding;
    use vortex_utils::aliases::hash_set::HashSet;

    use crate::integer::{IntCompressor, IntegerStats, SequenceScheme, SparseScheme};
    use crate::{Compressor, CompressorStats, Scheme};

    /// Compressing an empty array yields an empty array.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);

        let compressed = IntCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(compressed.is_empty());
    }

    /// Short runs drawn from six large values should end up dictionary encoded.
    #[test]
    fn test_dict_encodable() {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 1234 * i)
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            // Draw the run length first, then the value, to keep the RNG stream order.
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            (0..run_length).for_each(|_| codes.push(value));
        }

        let input = codes.freeze().into_array().to_primitive();
        let compressed = IntCompressor::compress(&input, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), DictEncoding.id());
    }

    /// One million `-1` values with 223 random positions overwritten; checks that
    /// compression succeeds and logs the resulting tree.
    #[test]
    fn test_window_name() {
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut visited = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        while visited.len() < 223 {
            let position = (rng.next_u32() as usize) % 1_000_000;
            // `insert` returns false for a position that was already overwritten.
            if !visited.insert(position) {
                continue;
            }
            values[position] = 5 * (rng.next_u64() % 100) as i32;
        }

        let input = values.freeze().into_array().to_primitive();
        let compressed = IntCompressor::compress(&input, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.display_tree());
    }

    /// Sparse encoding round-trips an array with a dominant value and one null.
    #[test]
    fn sparse_with_nulls() {
        let input = PrimitiveArray::new(
            buffer![189u8, 189, 189, 0, 46],
            Validity::from_iter(vec![true, true, true, true, false]),
        );
        let stats = IntegerStats::generate(&input);

        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());

        let decoded = compressed.to_primitive();
        let expected = [189u8, 189, 189, 0, 0];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), input.validity());
    }

    /// Sparse encoding round-trips an array that is null everywhere except the last slot.
    #[test]
    fn sparse_mostly_nulls() {
        let input = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let stats = IntegerStats::generate(&input);

        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());

        let decoded = compressed.to_primitive();
        let expected = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), input.validity());
    }

    /// A nullable array holding no nulls can still be sequence encoded.
    #[test]
    fn nullable_sequence() {
        let values = (0i32..20).step_by(7).collect_vec();
        let input = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));
        let stats = IntegerStats::generate(&input);

        let compressed = SequenceScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SequenceEncoding.id());

        let decoded = compressed.to_primitive();
        assert_eq!(decoded.as_slice::<i32>(), values.as_slice());
    }
}