1pub mod dictionary;
2mod stats;
3
4use std::fmt::Debug;
5use std::hash::Hash;
6
7pub use stats::IntegerStats;
8use vortex_array::arrays::{ConstantArray, PrimitiveArray};
9use vortex_array::compress::downscale_integer_array;
10use vortex_array::nbytes::NBytes;
11use vortex_array::variants::PrimitiveArrayTrait;
12use vortex_array::{Array, ArrayExt, ArrayRef, ArrayStatistics, ToCanonical};
13use vortex_dict::DictArray;
14use vortex_error::{VortexExpect, VortexResult, VortexUnwrap};
15use vortex_fastlanes::{FoRArray, bit_width_histogram, bitpack_encode, find_best_bit_width};
16use vortex_runend::RunEndArray;
17use vortex_runend::compress::runend_encode;
18use vortex_scalar::Scalar;
19use vortex_sparse::SparseArray;
20use vortex_zigzag::{ZigZagArray, zigzag_encode};
21
22use crate::integer::dictionary::dictionary_encode;
23use crate::patches::compress_patches;
24use crate::{
25 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
26 estimate_compression_ratio_with_sampling,
27};
28
29pub struct IntCompressor;
30
31impl Compressor for IntCompressor {
32 type ArrayType = PrimitiveArray;
33 type SchemeType = dyn IntegerScheme;
34 type StatsType = IntegerStats;
35
36 fn schemes() -> &'static [&'static dyn IntegerScheme] {
37 &[
38 &ConstantScheme,
39 &FORScheme,
40 &ZigZagScheme,
41 &BitPackingScheme,
42 &SparseScheme,
43 &DictScheme,
44 &RunEndScheme,
45 ]
46 }
47
48 fn default_scheme() -> &'static Self::SchemeType {
49 &UncompressedScheme
50 }
51
52 fn dict_scheme_code() -> IntCode {
53 DICT_SCHEME
54 }
55}
56
57impl IntCompressor {
58 pub fn compress_no_dict(
59 array: &PrimitiveArray,
60 is_sample: bool,
61 allowed_cascading: usize,
62 excludes: &[IntCode],
63 ) -> VortexResult<ArrayRef> {
64 let stats = IntegerStats::generate_opts(
65 array,
66 GenerateStatsOptions {
67 count_distinct_values: false,
68 },
69 );
70
71 let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
72 let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
73
74 if output.nbytes() < array.nbytes() {
75 Ok(output)
76 } else {
77 log::debug!("resulting tree too large: {}", output.tree_display());
78 Ok(array.to_array())
79 }
80 }
81}
82
83pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
84
85impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
87
/// Small numeric identifier for an integer compression scheme.
///
/// Each scheme's `code()` returns one of the constants below. Codes are also collected into
/// `excludes` lists that are passed to nested compression calls (for example, ZigZag excludes
/// itself, Dict, RunEnd and Sparse for its child).
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct IntCode(u8);

/// Code of [`UncompressedScheme`].
const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
/// Code of [`ConstantScheme`].
const CONSTANT_SCHEME: IntCode = IntCode(1);
/// Code of [`FORScheme`].
const FOR_SCHEME: IntCode = IntCode(2);
/// Code of [`ZigZagScheme`].
const ZIGZAG_SCHEME: IntCode = IntCode(3);
/// Code of [`BitPackingScheme`].
const BITPACKING_SCHEME: IntCode = IntCode(4);
/// Code of [`SparseScheme`].
const SPARSE_SCHEME: IntCode = IntCode(5);
/// Code of [`DictScheme`]; also returned by `IntCompressor::dict_scheme_code`.
const DICT_SCHEME: IntCode = IntCode(6);
/// Code of [`RunEndScheme`].
const RUNEND_SCHEME: IntCode = IntCode(7);
99
/// Fallback scheme: returns the source array unchanged.
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

/// Replaces an array whose valid values are all identical with a `ConstantArray`.
#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

/// Frame-of-reference encoding (subtract the minimum) followed by bit-packing.
#[derive(Debug, Copy, Clone)]
pub struct FORScheme;

/// ZigZag-encodes arrays containing negative values, then compresses the result further.
#[derive(Debug, Copy, Clone)]
pub struct ZigZagScheme;

/// Bit-packs non-negative values to the best bit width, with patches for outliers.
#[derive(Debug, Copy, Clone)]
pub struct BitPackingScheme;

/// Stores only the values that differ from the most frequent value (the fill value).
#[derive(Debug, Copy, Clone)]
pub struct SparseScheme;

/// Dictionary-encodes the array and compresses the resulting codes.
#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

/// Run-end encodes the array and compresses both the run ends and the run values.
#[derive(Debug, Copy, Clone)]
pub struct RunEndScheme;

/// Minimum average run length for which [`RunEndScheme`] is considered at all.
const RUN_END_THRESHOLD: u32 = 4;
126
127impl Scheme for UncompressedScheme {
128 type StatsType = IntegerStats;
129 type CodeType = IntCode;
130
131 fn code(&self) -> IntCode {
132 UNCOMPRESSED_SCHEME
133 }
134
135 fn expected_compression_ratio(
136 &self,
137 _stats: &IntegerStats,
138 _is_sample: bool,
139 _allowed_cascading: usize,
140 _excludes: &[IntCode],
141 ) -> VortexResult<f64> {
142 Ok(1.0)
144 }
145
146 fn compress(
147 &self,
148 stats: &IntegerStats,
149 _is_sample: bool,
150 _allowed_cascading: usize,
151 _excludes: &[IntCode],
152 ) -> VortexResult<ArrayRef> {
153 Ok(stats.source().clone().into_array())
154 }
155}
156
157impl Scheme for ConstantScheme {
158 type StatsType = IntegerStats;
159 type CodeType = IntCode;
160
161 fn code(&self) -> IntCode {
162 CONSTANT_SCHEME
163 }
164
165 fn is_constant(&self) -> bool {
166 true
167 }
168
169 fn expected_compression_ratio(
170 &self,
171 stats: &IntegerStats,
172 is_sample: bool,
173 _allowed_cascading: usize,
174 _excludes: &[IntCode],
175 ) -> VortexResult<f64> {
176 if is_sample {
178 return Ok(0.0);
179 }
180
181 if stats.distinct_values_count != 1 {
183 return Ok(0.0);
184 }
185
186 if stats.null_count > 0 && stats.value_count > 0 {
188 return Ok(0.0);
189 }
190
191 Ok(stats.value_count as f64)
192 }
193
194 fn compress(
195 &self,
196 stats: &IntegerStats,
197 _is_sample: bool,
198 _allowed_cascading: usize,
199 _excludes: &[IntCode],
200 ) -> VortexResult<ArrayRef> {
201 let scalar = stats
204 .source()
205 .as_constant()
206 .vortex_expect("constant array expected");
207
208 Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
209 }
210}
211
212impl Scheme for FORScheme {
213 type StatsType = IntegerStats;
214 type CodeType = IntCode;
215
216 fn code(&self) -> IntCode {
217 FOR_SCHEME
218 }
219
220 fn expected_compression_ratio(
221 &self,
222 stats: &IntegerStats,
223 _is_sample: bool,
224 allowed_cascading: usize,
225 _excludes: &[IntCode],
226 ) -> VortexResult<f64> {
227 if allowed_cascading == 0 {
229 return Ok(0.0);
230 }
231
232 if stats.value_count == 0 {
234 return Ok(0.0);
235 }
236
237 if stats.typed.min_is_zero() {
239 return Ok(0.0);
240 }
241
242 let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
244 let bw = match stats.typed.max_minus_min().checked_ilog2() {
245 Some(l) => l + 1,
246 None => return Ok(0.0),
249 };
250
251 if full_width - bw < 8 {
253 return Ok(0.0);
254 }
255
256 Ok(full_width as f64 / bw as f64)
257 }
258
259 fn compress(
260 &self,
261 stats: &IntegerStats,
262 is_sample: bool,
263 _allowed_cascading: usize,
264 excludes: &[IntCode],
265 ) -> VortexResult<ArrayRef> {
266 let for_array = FoRArray::encode(stats.src.clone())?;
267 let biased = for_array.encoded().to_primitive()?;
268 let biased_stats = IntegerStats::generate_opts(
269 &biased,
270 GenerateStatsOptions {
271 count_distinct_values: false,
272 },
273 );
274
275 let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;
280
281 Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
282 }
283}
284
285impl Scheme for ZigZagScheme {
286 type StatsType = IntegerStats;
287 type CodeType = IntCode;
288
289 fn code(&self) -> IntCode {
290 ZIGZAG_SCHEME
291 }
292
293 fn expected_compression_ratio(
294 &self,
295 stats: &IntegerStats,
296 is_sample: bool,
297 allowed_cascading: usize,
298 excludes: &[IntCode],
299 ) -> VortexResult<f64> {
300 if allowed_cascading == 0 {
302 return Ok(0.0);
303 }
304
305 if stats.value_count == 0 {
307 return Ok(0.0);
308 }
309
310 if !stats.typed.min_is_negative() {
312 return Ok(0.0);
313 }
314
315 estimate_compression_ratio_with_sampling(
317 self,
318 stats,
319 is_sample,
320 allowed_cascading,
321 excludes,
322 )
323 }
324
325 fn compress(
326 &self,
327 stats: &IntegerStats,
328 is_sample: bool,
329 allowed_cascading: usize,
330 excludes: &[IntCode],
331 ) -> VortexResult<ArrayRef> {
332 let zag = zigzag_encode(stats.src.clone())?;
334 let encoded = zag.encoded().to_primitive()?;
335
336 let mut new_excludes = vec![
339 ZigZagScheme.code(),
340 DictScheme.code(),
341 RunEndScheme.code(),
342 SparseScheme.code(),
343 ];
344 new_excludes.extend_from_slice(excludes);
345
346 let compressed =
347 IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;
348
349 log::debug!("zigzag output: {}", compressed.tree_display());
350
351 Ok(ZigZagArray::try_new(compressed)?.into_array())
352 }
353}
354
355impl Scheme for BitPackingScheme {
356 type StatsType = IntegerStats;
357 type CodeType = IntCode;
358
359 fn code(&self) -> IntCode {
360 BITPACKING_SCHEME
361 }
362
363 #[allow(clippy::cast_possible_truncation)]
364 fn expected_compression_ratio(
365 &self,
366 stats: &IntegerStats,
367 is_sample: bool,
368 allowed_cascading: usize,
369 excludes: &[IntCode],
370 ) -> VortexResult<f64> {
371 if stats.typed.min_is_negative() {
373 return Ok(0.0);
374 }
375
376 if stats.value_count == 0 {
378 return Ok(0.0);
379 }
380
381 estimate_compression_ratio_with_sampling(
382 self,
383 stats,
384 is_sample,
385 allowed_cascading,
386 excludes,
387 )
388 }
389
390 #[allow(clippy::cast_possible_truncation)]
391 fn compress(
392 &self,
393 stats: &IntegerStats,
394 _is_sample: bool,
395 _allowed_cascading: usize,
396 _excludes: &[IntCode],
397 ) -> VortexResult<ArrayRef> {
398 let histogram = bit_width_histogram(stats.source())?;
399 let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
400 if bw as usize == stats.source().ptype().bit_width() {
402 return Ok(stats.source().clone().into_array());
403 }
404 let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;
405
406 let patches = packed.patches().map(compress_patches).transpose()?;
407 packed.replace_patches(patches);
408
409 Ok(packed.into_array())
410 }
411}
412
413impl Scheme for SparseScheme {
414 type StatsType = IntegerStats;
415 type CodeType = IntCode;
416
417 fn code(&self) -> IntCode {
418 SPARSE_SCHEME
419 }
420
421 fn expected_compression_ratio(
423 &self,
424 stats: &IntegerStats,
425 _is_sample: bool,
426 _allowed_cascading: usize,
427 _excludes: &[IntCode],
428 ) -> VortexResult<f64> {
429 if stats.value_count == 0 {
430 return Ok(0.0);
432 }
433
434 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
436 return Ok(stats.src.len() as f64 / stats.value_count as f64);
437 }
438
439 let (_, top_count) = stats.typed.top_value_and_count();
441
442 if top_count == stats.value_count {
443 return Ok(0.0);
445 }
446
447 let freq = top_count as f64 / stats.value_count as f64;
448 if freq >= 0.9 {
449 return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
451 }
452
453 Ok(0.0)
454 }
455
456 fn compress(
457 &self,
458 stats: &IntegerStats,
459 is_sample: bool,
460 allowed_cascading: usize,
461 excludes: &[IntCode],
462 ) -> VortexResult<ArrayRef> {
463 assert!(allowed_cascading > 0);
464 let (top_pvalue, top_count) = stats.typed.top_value_and_count();
465 if top_count as usize == stats.src.len() {
466 return Ok(ConstantArray::new(
468 Scalar::primitive_value(
469 top_pvalue,
470 top_pvalue.ptype(),
471 stats.src.dtype().nullability(),
472 ),
473 stats.src.len(),
474 )
475 .into_array());
476 }
477
478 let sparse_encoded = SparseArray::encode(
479 &stats.src,
480 Some(Scalar::primitive_value(
481 top_pvalue,
482 top_pvalue.ptype(),
483 stats.src.dtype().nullability(),
484 )),
485 )?;
486
487 if let Some(sparse) = sparse_encoded.as_opt::<SparseArray>() {
488 let mut new_excludes = vec![SparseScheme.code()];
490 new_excludes.extend_from_slice(excludes);
491
492 let compressed_values = IntCompressor::compress_no_dict(
493 &sparse.patches().values().to_primitive()?,
494 is_sample,
495 allowed_cascading - 1,
496 &new_excludes,
497 )?;
498
499 let indices =
500 downscale_integer_array(sparse.patches().indices().clone())?.to_primitive()?;
501
502 let compressed_indices = IntCompressor::compress_no_dict(
503 &indices,
504 is_sample,
505 allowed_cascading - 1,
506 &new_excludes,
507 )?;
508
509 SparseArray::try_new(
510 compressed_indices,
511 compressed_values,
512 sparse.len(),
513 sparse.fill_scalar().clone(),
514 )
515 .map(|a| a.into_array())
516 } else {
517 Ok(sparse_encoded)
518 }
519 }
520}
521
522impl Scheme for DictScheme {
523 type StatsType = IntegerStats;
524 type CodeType = IntCode;
525
526 fn code(&self) -> IntCode {
527 DICT_SCHEME
528 }
529
530 fn expected_compression_ratio(
531 &self,
532 stats: &IntegerStats,
533 _is_sample: bool,
534 allowed_cascading: usize,
535 _excludes: &[IntCode],
536 ) -> VortexResult<f64> {
537 if allowed_cascading == 0 {
539 return Ok(0.0);
540 }
541
542 if stats.value_count == 0 {
543 return Ok(0.0);
544 }
545
546 if stats.distinct_values_count > stats.value_count / 2 {
548 return Ok(0.0);
549 }
550
551 let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
553
554 let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
556
557 let n_runs = stats.value_count / stats.average_run_length;
558
559 let codes_size_bp = (codes_bw * stats.value_count) as usize;
561 let codes_size_rle_bp = (codes_bw + 32) * n_runs;
562
563 let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);
564
565 let before = stats.value_count as usize * stats.source().ptype().bit_width();
566
567 Ok(before as f64 / (values_size + codes_size) as f64)
568 }
569
570 fn compress(
571 &self,
572 stats: &IntegerStats,
573 is_sample: bool,
574 allowed_cascading: usize,
575 excludes: &[IntCode],
576 ) -> VortexResult<ArrayRef> {
577 assert!(allowed_cascading > 0);
578
579 let dict = dictionary_encode(stats)?;
583
584 let mut new_excludes = vec![DICT_SCHEME];
586 new_excludes.extend_from_slice(excludes);
587
588 let compressed_codes = IntCompressor::compress_no_dict(
589 &dict.codes().to_primitive()?,
590 is_sample,
591 allowed_cascading - 1,
592 &new_excludes,
593 )?;
594
595 Ok(DictArray::try_new(compressed_codes, dict.values().clone())?.into_array())
596 }
597}
598
599impl Scheme for RunEndScheme {
600 type StatsType = IntegerStats;
601 type CodeType = IntCode;
602
603 fn code(&self) -> IntCode {
604 RUNEND_SCHEME
605 }
606
607 fn expected_compression_ratio(
608 &self,
609 stats: &IntegerStats,
610 is_sample: bool,
611 allowed_cascading: usize,
612 excludes: &[IntCode],
613 ) -> VortexResult<f64> {
614 if stats.average_run_length < RUN_END_THRESHOLD {
616 return Ok(0.0);
617 }
618
619 if allowed_cascading == 0 {
620 return Ok(0.0);
621 }
622
623 estimate_compression_ratio_with_sampling(
625 self,
626 stats,
627 is_sample,
628 allowed_cascading,
629 excludes,
630 )
631 }
632
633 fn compress(
634 &self,
635 stats: &IntegerStats,
636 is_sample: bool,
637 allowed_cascading: usize,
638 excludes: &[IntCode],
639 ) -> VortexResult<ArrayRef> {
640 assert!(allowed_cascading > 0);
641
642 let (ends, values) = runend_encode(&stats.src)?;
644
645 let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
646 new_excludes.extend_from_slice(excludes);
647
648 let ends_stats = IntegerStats::generate_opts(
649 &ends,
650 GenerateStatsOptions {
651 count_distinct_values: false,
652 },
653 );
654 let ends_scheme = IntCompressor::choose_scheme(
655 &ends_stats,
656 is_sample,
657 allowed_cascading - 1,
658 &new_excludes,
659 )?;
660 let compressed_ends =
661 ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;
662
663 let compressed_values = IntCompressor::compress_no_dict(
664 &values.to_primitive()?,
665 is_sample,
666 allowed_cascading - 1,
667 &new_excludes,
668 )?;
669
670 Ok(RunEndArray::try_new(compressed_ends, compressed_values)?.into_array())
671 }
672}
673
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use log::LevelFilter;
    use rand::rngs::StdRng;
    use rand::{RngCore, SeedableRng};
    use vortex_array::aliases::hash_set::HashSet;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::EncodingVTable;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, BufferMut, buffer, buffer_mut};
    use vortex_sparse::SparseEncoding;

    use crate::integer::{IntCompressor, IntegerStats, SparseScheme};
    use crate::{Compressor, CompressorStats, Scheme};

    /// Compressing an empty array must succeed and yield an empty array.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);
        let result = IntCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(result.is_empty());
    }

    /// A handful of widely spaced values repeated in short random runs; smoke test only.
    #[test]
    fn test_dict_encodable() {
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 1234 * i)
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            (0..run_length).for_each(|_| codes.push(value));
        }

        let primitive = codes.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&primitive, false, 3, &[]).unwrap();
        log::info!("compressed values: {}", compressed.tree_display());
    }

    /// One million `-1`s with 223 random positions overwritten; smoke test only.
    #[test]
    fn test_window_name() {
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut visited = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        while visited.len() < 223 {
            let idx = (rng.next_u32() as usize) % 1_000_000;
            // Skip positions that were already overwritten.
            if !visited.insert(idx) {
                continue;
            }
            values[idx] = 5 * (rng.next_u64() % 100) as i32;
        }

        let array = values.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.tree_display());
    }

    /// Sparse encoding must round-trip values and validity when a null is present.
    #[test]
    fn sparse_with_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 0, 46],
            Validity::from_iter(vec![true, true, true, true, false]),
        );
        let stats = IntegerStats::generate(&array);

        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding(), SparseEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        let expected = [189u8, 189, 189, 0, 0];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }

    /// Sparse encoding must round-trip values and validity when almost everything is null.
    #[test]
    fn sparse_mostly_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let stats = IntegerStats::generate(&array);

        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding(), SparseEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        let expected = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }
}