1pub mod dictionary;
2mod stats;
3
4use std::fmt::Debug;
5use std::hash::Hash;
6
7pub use stats::IntegerStats;
8use vortex_array::arrays::{ConstantArray, PrimitiveArray};
9use vortex_array::compress::downscale_integer_array;
10use vortex_array::nbytes::NBytes;
11use vortex_array::variants::PrimitiveArrayTrait;
12use vortex_array::{Array, ArrayExt, ArrayRef, ArrayStatistics, ToCanonical};
13use vortex_dict::DictArray;
14use vortex_error::{VortexExpect, VortexResult, VortexUnwrap};
15use vortex_fastlanes::{FoRArray, bitpack_encode, find_best_bit_width};
16use vortex_runend::RunEndArray;
17use vortex_runend::compress::runend_encode;
18use vortex_scalar::Scalar;
19use vortex_sparse::SparseArray;
20use vortex_zigzag::{ZigZagArray, zigzag_encode};
21
22use crate::integer::dictionary::dictionary_encode;
23use crate::patches::compress_patches;
24use crate::{
25 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
26 estimate_compression_ratio_with_sampling,
27};
28
29pub struct IntCompressor;
30
31impl Compressor for IntCompressor {
32 type ArrayType = PrimitiveArray;
33 type SchemeType = dyn IntegerScheme;
34 type StatsType = IntegerStats;
35
36 fn schemes() -> &'static [&'static dyn IntegerScheme] {
37 &[
38 &ConstantScheme,
39 &FORScheme,
40 &ZigZagScheme,
41 &BitPackingScheme,
42 &SparseScheme,
43 &DictScheme,
44 &RunEndScheme,
45 ]
46 }
47
48 fn default_scheme() -> &'static Self::SchemeType {
49 &UncompressedScheme
50 }
51
52 fn dict_scheme_code() -> IntCode {
53 DICT_SCHEME
54 }
55}
56
57impl IntCompressor {
58 pub fn compress_no_dict(
59 array: &PrimitiveArray,
60 is_sample: bool,
61 allowed_cascading: usize,
62 excludes: &[IntCode],
63 ) -> VortexResult<ArrayRef> {
64 let stats = IntegerStats::generate_opts(
65 array,
66 GenerateStatsOptions {
67 count_distinct_values: false,
68 },
69 );
70
71 let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
72 let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
73
74 if output.nbytes() < array.nbytes() {
75 Ok(output)
76 } else {
77 log::debug!("resulting tree too large: {}", output.tree_display());
78 Ok(array.to_array())
79 }
80 }
81}
82
83pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
84
85impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
87
/// Identifier of an integer compression scheme.
///
/// A thin wrapper around a `u8`; each scheme in this module returns one of the constants
/// below from its `code()` method.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct IntCode(u8);

/// Code returned by `UncompressedScheme`.
const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
/// Code returned by `ConstantScheme`.
const CONSTANT_SCHEME: IntCode = IntCode(1);
/// Code returned by `FORScheme`.
const FOR_SCHEME: IntCode = IntCode(2);
/// Code returned by `ZigZagScheme`.
const ZIGZAG_SCHEME: IntCode = IntCode(3);
/// Code returned by `BitPackingScheme`.
const BITPACKING_SCHEME: IntCode = IntCode(4);
/// Code returned by `SparseScheme`.
const SPARSE_SCHEME: IntCode = IntCode(5);
/// Code returned by `DictScheme`.
const DICT_SCHEME: IntCode = IntCode(6);
/// Code returned by `RunEndScheme`.
const RUNEND_SCHEME: IntCode = IntCode(7);
99
/// Scheme that returns the source array without re-encoding it.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// Scheme that replaces an array holding a single distinct value with a constant array.
#[derive(Clone, Copy, Debug)]
pub struct ConstantScheme;

/// Frame-of-reference scheme; its output is bit-packed immediately.
#[derive(Clone, Copy, Debug)]
pub struct FORScheme;

/// ZigZag scheme; the encoded values are compressed further by the integer compressor.
#[derive(Clone, Copy, Debug)]
pub struct ZigZagScheme;

/// Bit-packing scheme.
#[derive(Clone, Copy, Debug)]
pub struct BitPackingScheme;

/// Sparse scheme, using the most frequent value as the fill value.
#[derive(Clone, Copy, Debug)]
pub struct SparseScheme;

/// Dictionary scheme; the codes are compressed further by the integer compressor.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// Run-end scheme; both the run ends and the run values are compressed further.
#[derive(Clone, Copy, Debug)]
pub struct RunEndScheme;
123
/// Minimum average run length below which `RunEndScheme` reports a ratio of zero.
const RUN_END_THRESHOLD: u32 = 4;
126
127impl Scheme for UncompressedScheme {
128 type StatsType = IntegerStats;
129 type CodeType = IntCode;
130
131 fn code(&self) -> IntCode {
132 UNCOMPRESSED_SCHEME
133 }
134
135 fn expected_compression_ratio(
136 &self,
137 _stats: &IntegerStats,
138 _is_sample: bool,
139 _allowed_cascading: usize,
140 _excludes: &[IntCode],
141 ) -> VortexResult<f64> {
142 Ok(1.0)
144 }
145
146 fn compress(
147 &self,
148 stats: &IntegerStats,
149 _is_sample: bool,
150 _allowed_cascading: usize,
151 _excludes: &[IntCode],
152 ) -> VortexResult<ArrayRef> {
153 Ok(stats.source().clone().into_array())
154 }
155}
156
157impl Scheme for ConstantScheme {
158 type StatsType = IntegerStats;
159 type CodeType = IntCode;
160
161 fn code(&self) -> IntCode {
162 CONSTANT_SCHEME
163 }
164
165 fn is_constant(&self) -> bool {
166 true
167 }
168
169 fn expected_compression_ratio(
170 &self,
171 stats: &IntegerStats,
172 is_sample: bool,
173 _allowed_cascading: usize,
174 _excludes: &[IntCode],
175 ) -> VortexResult<f64> {
176 if is_sample {
178 return Ok(0.0);
179 }
180
181 if stats.distinct_values_count != 1 {
183 return Ok(0.0);
184 }
185
186 if stats.null_count > 0 && stats.value_count > 0 {
188 return Ok(0.0);
189 }
190
191 Ok(stats.value_count as f64)
192 }
193
194 fn compress(
195 &self,
196 stats: &IntegerStats,
197 _is_sample: bool,
198 _allowed_cascading: usize,
199 _excludes: &[IntCode],
200 ) -> VortexResult<ArrayRef> {
201 let scalar = stats
204 .source()
205 .as_constant()
206 .vortex_expect("constant array expected");
207
208 Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
209 }
210}
211
212impl Scheme for FORScheme {
213 type StatsType = IntegerStats;
214 type CodeType = IntCode;
215
216 fn code(&self) -> IntCode {
217 FOR_SCHEME
218 }
219
220 fn expected_compression_ratio(
221 &self,
222 stats: &IntegerStats,
223 _is_sample: bool,
224 allowed_cascading: usize,
225 _excludes: &[IntCode],
226 ) -> VortexResult<f64> {
227 if allowed_cascading == 0 {
229 return Ok(0.0);
230 }
231
232 if stats.value_count == 0 {
234 return Ok(0.0);
235 }
236
237 if stats.typed.min_is_zero() {
239 return Ok(0.0);
240 }
241
242 let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
244 let bw = match stats.typed.max_minus_min().checked_ilog2() {
245 Some(l) => l + 1,
246 None => return Ok(0.0),
249 };
250
251 if full_width - bw < 8 {
253 return Ok(0.0);
254 }
255
256 Ok(full_width as f64 / bw as f64)
257 }
258
259 fn compress(
260 &self,
261 stats: &IntegerStats,
262 is_sample: bool,
263 _allowed_cascading: usize,
264 excludes: &[IntCode],
265 ) -> VortexResult<ArrayRef> {
266 let for_array = FoRArray::encode(stats.src.clone())?;
267 let biased = for_array.encoded().to_primitive()?;
268 let biased_stats = IntegerStats::generate_opts(
269 &biased,
270 GenerateStatsOptions {
271 count_distinct_values: false,
272 },
273 );
274
275 let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;
280
281 Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
282 }
283}
284
285impl Scheme for ZigZagScheme {
286 type StatsType = IntegerStats;
287 type CodeType = IntCode;
288
289 fn code(&self) -> IntCode {
290 ZIGZAG_SCHEME
291 }
292
293 fn expected_compression_ratio(
294 &self,
295 stats: &IntegerStats,
296 is_sample: bool,
297 allowed_cascading: usize,
298 excludes: &[IntCode],
299 ) -> VortexResult<f64> {
300 if allowed_cascading == 0 {
302 return Ok(0.0);
303 }
304
305 if stats.value_count == 0 {
307 return Ok(0.0);
308 }
309
310 if !stats.typed.min_is_negative() {
312 return Ok(0.0);
313 }
314
315 estimate_compression_ratio_with_sampling(
317 self,
318 stats,
319 is_sample,
320 allowed_cascading,
321 excludes,
322 )
323 }
324
325 fn compress(
326 &self,
327 stats: &IntegerStats,
328 is_sample: bool,
329 allowed_cascading: usize,
330 excludes: &[IntCode],
331 ) -> VortexResult<ArrayRef> {
332 let zag = zigzag_encode(stats.src.clone())?;
334 let encoded = zag.encoded().to_primitive()?;
335
336 let mut new_excludes = vec![
339 ZigZagScheme.code(),
340 DictScheme.code(),
341 RunEndScheme.code(),
342 SparseScheme.code(),
343 ];
344 new_excludes.extend_from_slice(excludes);
345
346 let compressed =
347 IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;
348
349 log::debug!("zigzag output: {}", compressed.tree_display());
350
351 Ok(ZigZagArray::try_new(compressed)?.into_array())
352 }
353}
354
355impl Scheme for BitPackingScheme {
356 type StatsType = IntegerStats;
357 type CodeType = IntCode;
358
359 fn code(&self) -> IntCode {
360 BITPACKING_SCHEME
361 }
362
363 #[allow(clippy::cast_possible_truncation)]
364 fn expected_compression_ratio(
365 &self,
366 stats: &IntegerStats,
367 is_sample: bool,
368 allowed_cascading: usize,
369 excludes: &[IntCode],
370 ) -> VortexResult<f64> {
371 if stats.typed.min_is_negative() {
373 return Ok(0.0);
374 }
375
376 if stats.value_count == 0 {
378 return Ok(0.0);
379 }
380
381 estimate_compression_ratio_with_sampling(
382 self,
383 stats,
384 is_sample,
385 allowed_cascading,
386 excludes,
387 )
388 }
389
390 #[allow(clippy::cast_possible_truncation)]
391 fn compress(
392 &self,
393 stats: &IntegerStats,
394 _is_sample: bool,
395 _allowed_cascading: usize,
396 _excludes: &[IntCode],
397 ) -> VortexResult<ArrayRef> {
398 let bw = find_best_bit_width(stats.source())?;
399 if bw as usize == stats.source().ptype().bit_width() {
401 return Ok(stats.source().clone().into_array());
402 }
403 let mut packed = bitpack_encode(stats.source(), bw)?;
404
405 let patches = packed.patches().map(compress_patches).transpose()?;
406 packed.replace_patches(patches);
407
408 Ok(packed.into_array())
409 }
410}
411
412impl Scheme for SparseScheme {
413 type StatsType = IntegerStats;
414 type CodeType = IntCode;
415
416 fn code(&self) -> IntCode {
417 SPARSE_SCHEME
418 }
419
420 fn expected_compression_ratio(
422 &self,
423 stats: &IntegerStats,
424 _is_sample: bool,
425 _allowed_cascading: usize,
426 _excludes: &[IntCode],
427 ) -> VortexResult<f64> {
428 if stats.value_count == 0 {
429 return Ok(0.0);
431 }
432
433 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
435 return Ok(stats.src.len() as f64 / stats.value_count as f64);
436 }
437
438 let (_, top_count) = stats.typed.top_value_and_count();
440
441 if top_count == stats.value_count {
442 return Ok(0.0);
444 }
445
446 let freq = top_count as f64 / stats.value_count as f64;
447 if freq >= 0.9 {
448 return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
450 }
451
452 Ok(0.0)
453 }
454
455 fn compress(
456 &self,
457 stats: &IntegerStats,
458 is_sample: bool,
459 allowed_cascading: usize,
460 excludes: &[IntCode],
461 ) -> VortexResult<ArrayRef> {
462 assert!(allowed_cascading > 0);
463 let (top_pvalue, top_count) = stats.typed.top_value_and_count();
464 if top_count as usize == stats.src.len() {
465 return Ok(ConstantArray::new(
467 Scalar::primitive_value(
468 top_pvalue,
469 top_pvalue.ptype(),
470 stats.src.dtype().nullability(),
471 ),
472 stats.src.len(),
473 )
474 .into_array());
475 }
476
477 let sparse_encoded = SparseArray::encode(
478 &stats.src,
479 Some(Scalar::primitive_value(
480 top_pvalue,
481 top_pvalue.ptype(),
482 stats.src.dtype().nullability(),
483 )),
484 )?;
485
486 if let Some(sparse) = sparse_encoded.as_opt::<SparseArray>() {
487 let mut new_excludes = vec![SparseScheme.code()];
489 new_excludes.extend_from_slice(excludes);
490
491 let compressed_values = IntCompressor::compress_no_dict(
492 &sparse.patches().values().to_primitive()?,
493 is_sample,
494 allowed_cascading - 1,
495 &new_excludes,
496 )?;
497
498 let indices =
499 downscale_integer_array(sparse.patches().indices().clone())?.to_primitive()?;
500
501 let compressed_indices = IntCompressor::compress_no_dict(
502 &indices,
503 is_sample,
504 allowed_cascading - 1,
505 &new_excludes,
506 )?;
507
508 SparseArray::try_new(
509 compressed_indices,
510 compressed_values,
511 sparse.len(),
512 sparse.fill_scalar().clone(),
513 )
514 .map(|a| a.into_array())
515 } else {
516 Ok(sparse_encoded)
517 }
518 }
519}
520
521impl Scheme for DictScheme {
522 type StatsType = IntegerStats;
523 type CodeType = IntCode;
524
525 fn code(&self) -> IntCode {
526 DICT_SCHEME
527 }
528
529 fn expected_compression_ratio(
530 &self,
531 stats: &IntegerStats,
532 _is_sample: bool,
533 allowed_cascading: usize,
534 _excludes: &[IntCode],
535 ) -> VortexResult<f64> {
536 if allowed_cascading == 0 {
538 return Ok(0.0);
539 }
540
541 if stats.value_count == 0 {
542 return Ok(0.0);
543 }
544
545 if stats.distinct_values_count > stats.value_count / 2 {
547 return Ok(0.0);
548 }
549
550 let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
552
553 let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
555
556 let n_runs = stats.value_count / stats.average_run_length;
557
558 let codes_size_bp = (codes_bw * stats.value_count) as usize;
560 let codes_size_rle_bp = (codes_bw + 32) * n_runs;
561
562 let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);
563
564 let before = stats.value_count as usize * stats.source().ptype().bit_width();
565
566 Ok(before as f64 / (values_size + codes_size) as f64)
567 }
568
569 fn compress(
570 &self,
571 stats: &IntegerStats,
572 is_sample: bool,
573 allowed_cascading: usize,
574 excludes: &[IntCode],
575 ) -> VortexResult<ArrayRef> {
576 assert!(allowed_cascading > 0);
577
578 let dict = dictionary_encode(stats)?;
582
583 let mut new_excludes = vec![DICT_SCHEME];
585 new_excludes.extend_from_slice(excludes);
586
587 let compressed_codes = IntCompressor::compress_no_dict(
588 &dict.codes().to_primitive()?,
589 is_sample,
590 allowed_cascading - 1,
591 &new_excludes,
592 )?;
593
594 Ok(DictArray::try_new(compressed_codes, dict.values().clone())?.into_array())
595 }
596}
597
598impl Scheme for RunEndScheme {
599 type StatsType = IntegerStats;
600 type CodeType = IntCode;
601
602 fn code(&self) -> IntCode {
603 RUNEND_SCHEME
604 }
605
606 fn expected_compression_ratio(
607 &self,
608 stats: &IntegerStats,
609 is_sample: bool,
610 allowed_cascading: usize,
611 excludes: &[IntCode],
612 ) -> VortexResult<f64> {
613 if stats.average_run_length < RUN_END_THRESHOLD {
615 return Ok(0.0);
616 }
617
618 if allowed_cascading == 0 {
619 return Ok(0.0);
620 }
621
622 estimate_compression_ratio_with_sampling(
624 self,
625 stats,
626 is_sample,
627 allowed_cascading,
628 excludes,
629 )
630 }
631
632 fn compress(
633 &self,
634 stats: &IntegerStats,
635 is_sample: bool,
636 allowed_cascading: usize,
637 excludes: &[IntCode],
638 ) -> VortexResult<ArrayRef> {
639 assert!(allowed_cascading > 0);
640
641 let (ends, values) = runend_encode(&stats.src)?;
643
644 let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
645 new_excludes.extend_from_slice(excludes);
646
647 let ends_stats = IntegerStats::generate_opts(
648 &ends,
649 GenerateStatsOptions {
650 count_distinct_values: false,
651 },
652 );
653 let ends_scheme = IntCompressor::choose_scheme(
654 &ends_stats,
655 is_sample,
656 allowed_cascading - 1,
657 &new_excludes,
658 )?;
659 let compressed_ends =
660 ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;
661
662 let compressed_values = IntCompressor::compress_no_dict(
663 &values.to_primitive()?,
664 is_sample,
665 allowed_cascading - 1,
666 &new_excludes,
667 )?;
668
669 Ok(RunEndArray::try_new(compressed_ends, compressed_values)?.into_array())
670 }
671}
672
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use log::LevelFilter;
    use rand::rngs::StdRng;
    use rand::{RngCore, SeedableRng};
    use vortex_array::aliases::hash_set::HashSet;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::EncodingVTable;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, BufferMut, buffer, buffer_mut};
    use vortex_sparse::SparseEncoding;

    use crate::integer::{IntCompressor, IntegerStats, SparseScheme};
    use crate::{Compressor, CompressorStats, Scheme};

    /// Compressing an empty array must succeed and yield an empty array.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);

        let result = IntCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(result.is_empty());
    }

    /// Smoke test: short runs drawn from six distinct values compress without error.
    #[test]
    fn test_dict_encodable() {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 1234 * i)
            .collect_vec();

        // Seeded RNG: first draw is the run length (0..=4), second draw picks the value.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let pick = rng.next_u32() as usize % numbers.len();
            let value = numbers[pick];
            (0..run_length).for_each(|_| codes.push(value));
        }

        let primitive = codes.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&primitive, false, 3, &[]).unwrap();
        log::info!("compressed values: {}", compressed.tree_display());
    }

    /// Smoke test: one million `-1`s with 223 random positions overwritten compress
    /// without error.
    #[test]
    fn test_window_name() {
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut visited = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        while visited.len() < 223 {
            let position = (rng.next_u32() as usize) % 1_000_000;
            // `insert` returns false for a position that was already taken; skip it without
            // drawing a value.
            if !visited.insert(position) {
                continue;
            }
            values[position] = 5 * (rng.next_u64() % 100) as i32;
        }

        let array = values.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.tree_display());
    }

    /// A dominant value plus one null yields a sparse encoding that round-trips.
    #[test]
    fn sparse_with_nulls() {
        let validity = Validity::from_iter(vec![true, true, true, true, false]);
        let array = PrimitiveArray::new(buffer![189u8, 189, 189, 0, 46], validity);

        let stats = IntegerStats::generate(&array);
        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding(), SparseEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        let expected = [189u8, 189, 189, 0, 0];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }

    /// An array that is null everywhere except its last slot yields a sparse encoding that
    /// round-trips.
    #[test]
    fn sparse_mostly_nulls() {
        let validity = Validity::from_iter(vec![
            false, false, false, false, false, false, false, false, false, false, true,
        ]);
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            validity,
        );

        let stats = IntegerStats::generate(&array);
        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding(), SparseEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        let expected = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }
}