1pub mod dictionary;
2mod stats;
3
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::ops::Not;
7
8use num_traits::PrimInt;
9pub use stats::IntegerStats;
10use vortex_array::arrays::{BooleanBufferBuilder, ConstantArray, PrimitiveArray};
11use vortex_array::compute::filter;
12use vortex_array::nbytes::NBytes;
13use vortex_array::variants::PrimitiveArrayTrait;
14use vortex_array::{Array, ArrayRef, ArrayStatistics, IntoArray, ToCanonical};
15use vortex_buffer::Buffer;
16use vortex_dict::DictArray;
17use vortex_dtype::match_each_integer_ptype;
18use vortex_error::{VortexExpect, VortexResult, VortexUnwrap};
19use vortex_fastlanes::{FoRArray, bitpack_encode, find_best_bit_width, for_compress};
20use vortex_mask::{AllOr, Mask};
21use vortex_runend::RunEndArray;
22use vortex_runend::compress::runend_encode;
23use vortex_scalar::Scalar;
24use vortex_sparse::SparseArray;
25use vortex_zigzag::{ZigZagArray, zigzag_encode};
26
27use crate::downscale::downscale_integer_array;
28use crate::integer::dictionary::dictionary_encode;
29use crate::patches::compress_patches;
30use crate::{
31 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
32 estimate_compression_ratio_with_sampling,
33};
34
35pub struct IntCompressor;
36
37impl Compressor for IntCompressor {
38 type ArrayType = PrimitiveArray;
39 type SchemeType = dyn IntegerScheme;
40 type StatsType = IntegerStats;
41
42 fn schemes() -> &'static [&'static dyn IntegerScheme] {
43 &[
44 &ConstantScheme,
45 &FORScheme,
46 &ZigZagScheme,
47 &BitPackingScheme,
48 &SparseScheme,
49 &DictScheme,
50 &RunEndScheme,
51 ]
52 }
53
54 fn default_scheme() -> &'static Self::SchemeType {
55 &UncompressedScheme
56 }
57
58 fn dict_scheme_code() -> IntCode {
59 DICT_SCHEME
60 }
61}
62
63impl IntCompressor {
64 pub fn compress_no_dict(
65 array: &PrimitiveArray,
66 is_sample: bool,
67 allowed_cascading: usize,
68 excludes: &[IntCode],
69 ) -> VortexResult<ArrayRef> {
70 let stats = IntegerStats::generate_opts(
71 array,
72 GenerateStatsOptions {
73 count_distinct_values: false,
74 },
75 );
76
77 let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
78 let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
79
80 if output.nbytes() < array.nbytes() {
81 Ok(output)
82 } else {
83 log::debug!("resulting tree too large: {}", output.tree_display());
84 Ok(array.to_array())
85 }
86 }
87}
88
89pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
90
91impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
93
/// Identifier of an integer compression scheme.
///
/// Each scheme's `code()` returns one of the constants below. Cascading schemes put these
/// codes into the `excludes` lists they pass to child compressions.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct IntCode(u8);

/// Code of [`UncompressedScheme`].
const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
/// Code of [`ConstantScheme`].
const CONSTANT_SCHEME: IntCode = IntCode(1);
/// Code of [`FORScheme`].
const FOR_SCHEME: IntCode = IntCode(2);
/// Code of [`ZigZagScheme`].
const ZIGZAG_SCHEME: IntCode = IntCode(3);
/// Code of [`BitPackingScheme`].
const BITPACKING_SCHEME: IntCode = IntCode(4);
/// Code of [`SparseScheme`].
const SPARSE_SCHEME: IntCode = IntCode(5);
/// Code of [`DictScheme`]; also reported by `IntCompressor::dict_scheme_code`.
const DICT_SCHEME: IntCode = IntCode(6);
/// Code of [`RunEndScheme`].
const RUNEND_SCHEME: IntCode = IntCode(7);
105
/// Leaves the source array as-is; the compressor's default scheme.
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

/// Replaces an array holding a single repeated value with a `ConstantArray`.
#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

/// Frame-of-reference encoding (`for_compress`) followed by bit-packing of the biased values.
#[derive(Debug, Copy, Clone)]
pub struct FORScheme;

/// ZigZag-encodes signed values (`zigzag_encode`) and recursively compresses the result.
#[derive(Debug, Copy, Clone)]
pub struct ZigZagScheme;

/// Packs values at the width chosen by `find_best_bit_width`, compressing any patches.
#[derive(Debug, Copy, Clone)]
pub struct BitPackingScheme;

/// Stores only positions that differ from a fill value (null or the most frequent value).
#[derive(Debug, Copy, Clone)]
pub struct SparseScheme;

/// Dictionary encoding: recursively compressed codes plus the distinct values.
#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

/// Run-end encoding: recursively compressed run ends plus run values.
#[derive(Debug, Copy, Clone)]
pub struct RunEndScheme;

/// Minimum `average_run_length` for [`RunEndScheme`] to be considered at all.
const RUN_END_THRESHOLD: u32 = 4;
132
133impl Scheme for UncompressedScheme {
134 type StatsType = IntegerStats;
135 type CodeType = IntCode;
136
137 fn code(&self) -> IntCode {
138 UNCOMPRESSED_SCHEME
139 }
140
141 fn expected_compression_ratio(
142 &self,
143 _stats: &IntegerStats,
144 _is_sample: bool,
145 _allowed_cascading: usize,
146 _excludes: &[IntCode],
147 ) -> VortexResult<f64> {
148 Ok(1.0)
150 }
151
152 fn compress(
153 &self,
154 stats: &IntegerStats,
155 _is_sample: bool,
156 _allowed_cascading: usize,
157 _excludes: &[IntCode],
158 ) -> VortexResult<ArrayRef> {
159 Ok(stats.source().clone().into_array())
160 }
161}
162
163impl Scheme for ConstantScheme {
164 type StatsType = IntegerStats;
165 type CodeType = IntCode;
166
167 fn code(&self) -> IntCode {
168 CONSTANT_SCHEME
169 }
170
171 fn is_constant(&self) -> bool {
172 true
173 }
174
175 fn expected_compression_ratio(
176 &self,
177 stats: &IntegerStats,
178 is_sample: bool,
179 _allowed_cascading: usize,
180 _excludes: &[IntCode],
181 ) -> VortexResult<f64> {
182 if is_sample {
184 return Ok(0.0);
185 }
186
187 if stats.distinct_values_count != 1 {
189 return Ok(0.0);
190 }
191
192 if stats.null_count > 0 && stats.value_count > 0 {
194 return Ok(0.0);
195 }
196
197 Ok(stats.value_count as f64)
198 }
199
200 fn compress(
201 &self,
202 stats: &IntegerStats,
203 _is_sample: bool,
204 _allowed_cascading: usize,
205 _excludes: &[IntCode],
206 ) -> VortexResult<ArrayRef> {
207 let scalar = stats
210 .source()
211 .as_constant()
212 .vortex_expect("constant array expected");
213
214 Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
215 }
216}
217
218impl Scheme for FORScheme {
219 type StatsType = IntegerStats;
220 type CodeType = IntCode;
221
222 fn code(&self) -> IntCode {
223 FOR_SCHEME
224 }
225
226 fn expected_compression_ratio(
227 &self,
228 stats: &IntegerStats,
229 _is_sample: bool,
230 allowed_cascading: usize,
231 _excludes: &[IntCode],
232 ) -> VortexResult<f64> {
233 if allowed_cascading == 0 {
235 return Ok(0.0);
236 }
237
238 if stats.value_count == 0 {
240 return Ok(0.0);
241 }
242
243 if stats.typed.min_is_zero() {
245 return Ok(0.0);
246 }
247
248 let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
250 let padding = 64 - full_width;
251 let bw = full_width + padding - stats.typed.max_minus_min().leading_zeros();
252
253 if full_width - bw < 8 {
255 return Ok(0.0);
256 }
257
258 if bw == 0 {
260 return Ok(0.0);
261 }
262
263 Ok(full_width as f64 / bw as f64)
264 }
265
266 fn compress(
267 &self,
268 stats: &IntegerStats,
269 is_sample: bool,
270 _allowed_cascading: usize,
271 excludes: &[IntCode],
272 ) -> VortexResult<ArrayRef> {
273 let for_array = for_compress(stats.src.clone())?;
274 let biased = for_array.encoded().to_primitive()?;
275 let biased_stats = IntegerStats::generate_opts(
276 &biased,
277 GenerateStatsOptions {
278 count_distinct_values: false,
279 },
280 );
281
282 let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;
287
288 Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
289 }
290}
291
292impl Scheme for ZigZagScheme {
293 type StatsType = IntegerStats;
294 type CodeType = IntCode;
295
296 fn code(&self) -> IntCode {
297 ZIGZAG_SCHEME
298 }
299
300 fn expected_compression_ratio(
301 &self,
302 stats: &IntegerStats,
303 is_sample: bool,
304 allowed_cascading: usize,
305 excludes: &[IntCode],
306 ) -> VortexResult<f64> {
307 if allowed_cascading == 0 {
309 return Ok(0.0);
310 }
311
312 if stats.value_count == 0 {
314 return Ok(0.0);
315 }
316
317 if !stats.typed.min_is_negative() {
319 return Ok(0.0);
320 }
321
322 estimate_compression_ratio_with_sampling(
324 self,
325 stats,
326 is_sample,
327 allowed_cascading,
328 excludes,
329 )
330 }
331
332 fn compress(
333 &self,
334 stats: &IntegerStats,
335 is_sample: bool,
336 allowed_cascading: usize,
337 excludes: &[IntCode],
338 ) -> VortexResult<ArrayRef> {
339 let zag = zigzag_encode(stats.src.clone())?;
341 let encoded = zag.encoded().to_primitive()?;
342
343 let mut new_excludes = vec![
346 ZigZagScheme.code(),
347 DictScheme.code(),
348 RunEndScheme.code(),
349 SparseScheme.code(),
350 ];
351 new_excludes.extend_from_slice(excludes);
352
353 let compressed =
354 IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;
355
356 log::debug!("zigzag output: {}", compressed.tree_display());
357
358 Ok(ZigZagArray::try_new(compressed)?.into_array())
359 }
360}
361
362impl Scheme for BitPackingScheme {
363 type StatsType = IntegerStats;
364 type CodeType = IntCode;
365
366 fn code(&self) -> IntCode {
367 BITPACKING_SCHEME
368 }
369
370 #[allow(clippy::cast_possible_truncation)]
371 fn expected_compression_ratio(
372 &self,
373 stats: &IntegerStats,
374 is_sample: bool,
375 allowed_cascading: usize,
376 excludes: &[IntCode],
377 ) -> VortexResult<f64> {
378 if stats.typed.min_is_negative() {
380 return Ok(0.0);
381 }
382
383 if stats.value_count == 0 {
385 return Ok(0.0);
386 }
387
388 estimate_compression_ratio_with_sampling(
389 self,
390 stats,
391 is_sample,
392 allowed_cascading,
393 excludes,
394 )
395 }
396
397 #[allow(clippy::cast_possible_truncation)]
398 fn compress(
399 &self,
400 stats: &IntegerStats,
401 _is_sample: bool,
402 _allowed_cascading: usize,
403 _excludes: &[IntCode],
404 ) -> VortexResult<ArrayRef> {
405 let bw = find_best_bit_width(stats.source())?;
406 if bw as usize == stats.source().ptype().bit_width() {
408 return Ok(stats.source().clone().into_array());
409 }
410 let mut packed = bitpack_encode(stats.source(), bw)?;
411
412 let patches = packed.patches().map(compress_patches).transpose()?;
413 packed.replace_patches(patches);
414
415 Ok(packed.into_array())
416 }
417}
418
419impl Scheme for SparseScheme {
420 type StatsType = IntegerStats;
421 type CodeType = IntCode;
422
423 fn code(&self) -> IntCode {
424 SPARSE_SCHEME
425 }
426
427 fn expected_compression_ratio(
429 &self,
430 stats: &IntegerStats,
431 _is_sample: bool,
432 _allowed_cascading: usize,
433 _excludes: &[IntCode],
434 ) -> VortexResult<f64> {
435 if stats.value_count == 0 {
436 return Ok(0.0);
438 }
439
440 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
442 return Ok(stats.src.len() as f64 / stats.value_count as f64);
443 }
444
445 let (_, top_count) = stats.typed.top_value_and_count();
447
448 if top_count == stats.value_count {
449 return Ok(0.0);
451 }
452
453 let freq = top_count as f64 / stats.value_count as f64;
454 if freq >= 0.9 {
455 return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
457 }
458
459 Ok(0.0)
460 }
461
462 fn compress(
463 &self,
464 stats: &IntegerStats,
465 is_sample: bool,
466 allowed_cascading: usize,
467 excludes: &[IntCode],
468 ) -> VortexResult<ArrayRef> {
469 assert!(allowed_cascading > 0);
470 let mask = stats.src.validity().to_logical(stats.src.len())?;
471
472 if mask.all_false() {
473 return Ok(ConstantArray::new(
475 Scalar::null(stats.source().dtype().clone()),
476 stats.src.len(),
477 )
478 .into_array());
479 } else if mask.false_count() as f64 > (0.9 * mask.len() as f64) {
480 let non_null = mask.not();
482 let non_null_values = filter(stats.source(), &non_null)?;
483 let non_null_indices = match non_null.indices() {
484 AllOr::All => {
485 unreachable!()
487 }
488 AllOr::None => {
489 unreachable!()
491 }
492 AllOr::Some(values) => {
493 let buffer: Buffer<u32> = values
494 .iter()
495 .map(|&v| v.try_into().vortex_expect("indices must fit in u32"))
496 .collect();
497
498 buffer.into_array()
499 }
500 };
501
502 return Ok(SparseArray::try_new(
503 non_null_indices,
504 non_null_values,
505 stats.src.len(),
506 Scalar::null(stats.source().dtype().clone()),
507 )?
508 .into_array());
509 }
510
511 let (top_pvalue, top_count) = stats.typed.top_value_and_count();
513
514 if top_count == (stats.value_count + stats.null_count) {
515 return Ok(ConstantArray::new(
517 Scalar::primitive_value(
518 top_pvalue,
519 top_pvalue.ptype(),
520 stats.src.dtype().nullability(),
521 ),
522 stats.src.len(),
523 )
524 .into_array());
525 }
526
527 let non_top_mask = match_each_integer_ptype!(stats.src.ptype(), |$T| {
528 let buffer = stats.src.buffer::<$T>();
529 let top_value: $T = top_pvalue.as_primitive::<$T>().vortex_expect("top value");
530 value_indices(top_value, buffer.as_ref(), &mask)
531 });
532
533 let non_top_values = filter(&stats.src, &non_top_mask)?.to_primitive()?;
534
535 let mut new_excludes = vec![SparseScheme.code()];
537 new_excludes.extend_from_slice(excludes);
538
539 let compressed_values = IntCompressor::compress_no_dict(
540 &non_top_values,
541 is_sample,
542 allowed_cascading - 1,
543 &new_excludes,
544 )?;
545
546 let indices: Buffer<u64> = match non_top_mask {
548 Mask::AllTrue(count) => {
549 (0u64..count as u64).collect()
551 }
552 Mask::AllFalse(_) => {
553 Buffer::empty()
555 }
556 Mask::Values(values) => values.indices().iter().map(|v| *v as u64).collect(),
557 };
558
559 let indices = downscale_integer_array(indices.into_array())?.to_primitive()?;
560
561 let compressed_indices = IntCompressor::compress_no_dict(
562 &indices,
563 is_sample,
564 allowed_cascading - 1,
565 &new_excludes,
566 )?;
567
568 Ok(SparseArray::try_new(
569 compressed_indices,
570 compressed_values,
571 stats.src.len(),
572 Scalar::primitive_value(
573 top_pvalue,
574 top_pvalue.ptype(),
575 stats.src.dtype().nullability(),
576 ),
577 )?
578 .into_array())
579 }
580}
581
582fn value_indices<T: PrimInt + Hash + Into<Scalar>>(
583 top_value: T,
584 values: &[T],
585 validity: &Mask,
586) -> Mask {
587 let mut buffer = BooleanBufferBuilder::new(values.len());
589 for (idx, &value) in values.iter().enumerate() {
590 buffer.append(validity.value(idx) && top_value != value);
591 }
592
593 Mask::from_buffer(buffer.finish())
594}
595
596impl Scheme for DictScheme {
597 type StatsType = IntegerStats;
598 type CodeType = IntCode;
599
600 fn code(&self) -> IntCode {
601 DICT_SCHEME
602 }
603
604 fn expected_compression_ratio(
605 &self,
606 stats: &IntegerStats,
607 _is_sample: bool,
608 allowed_cascading: usize,
609 _excludes: &[IntCode],
610 ) -> VortexResult<f64> {
611 if allowed_cascading == 0 {
613 return Ok(0.0);
614 }
615
616 if stats.value_count == 0 {
617 return Ok(0.0);
618 }
619
620 if stats.distinct_values_count > stats.value_count / 2 {
622 return Ok(0.0);
623 }
624
625 let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
627
628 let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
630
631 let n_runs = stats.value_count / stats.average_run_length;
632
633 let codes_size_bp = (codes_bw * stats.value_count) as usize;
635 let codes_size_rle_bp = (codes_bw + 32) * n_runs;
636
637 let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);
638
639 let before = stats.value_count as usize * stats.source().ptype().bit_width();
640
641 Ok(before as f64 / (values_size + codes_size) as f64)
642 }
643
644 fn compress(
645 &self,
646 stats: &IntegerStats,
647 is_sample: bool,
648 allowed_cascading: usize,
649 excludes: &[IntCode],
650 ) -> VortexResult<ArrayRef> {
651 assert!(allowed_cascading > 0);
652
653 let dict = dictionary_encode(stats)?;
657
658 let mut new_excludes = vec![DICT_SCHEME];
660 new_excludes.extend_from_slice(excludes);
661
662 let compressed_codes = IntCompressor::compress_no_dict(
663 &dict.codes().to_primitive()?,
664 is_sample,
665 allowed_cascading - 1,
666 &new_excludes,
667 )?;
668
669 Ok(DictArray::try_new(compressed_codes, dict.values().clone())?.into_array())
670 }
671}
672
673impl Scheme for RunEndScheme {
674 type StatsType = IntegerStats;
675 type CodeType = IntCode;
676
677 fn code(&self) -> IntCode {
678 RUNEND_SCHEME
679 }
680
681 fn expected_compression_ratio(
682 &self,
683 stats: &IntegerStats,
684 is_sample: bool,
685 allowed_cascading: usize,
686 excludes: &[IntCode],
687 ) -> VortexResult<f64> {
688 if stats.average_run_length < RUN_END_THRESHOLD {
690 return Ok(0.0);
691 }
692
693 if allowed_cascading == 0 {
694 return Ok(0.0);
695 }
696
697 estimate_compression_ratio_with_sampling(
699 self,
700 stats,
701 is_sample,
702 allowed_cascading,
703 excludes,
704 )
705 }
706
707 fn compress(
708 &self,
709 stats: &IntegerStats,
710 is_sample: bool,
711 allowed_cascading: usize,
712 excludes: &[IntCode],
713 ) -> VortexResult<ArrayRef> {
714 assert!(allowed_cascading > 0);
715
716 let (ends, values) = runend_encode(&stats.src)?;
718
719 let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
720 new_excludes.extend_from_slice(excludes);
721
722 let ends_stats = IntegerStats::generate_opts(
723 &ends,
724 GenerateStatsOptions {
725 count_distinct_values: false,
726 },
727 );
728 let ends_scheme = IntCompressor::choose_scheme(
729 &ends_stats,
730 is_sample,
731 allowed_cascading - 1,
732 &new_excludes,
733 )?;
734 let compressed_ends =
735 ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;
736
737 let compressed_values = IntCompressor::compress_no_dict(
738 &values.to_primitive()?,
739 is_sample,
740 allowed_cascading - 1,
741 &new_excludes,
742 )?;
743
744 Ok(RunEndArray::try_new(compressed_ends, compressed_values)?.into_array())
745 }
746}
747
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use log::LevelFilter;
    use rand::rngs::StdRng;
    use rand::{RngCore, SeedableRng};
    use vortex_array::aliases::hash_set::HashSet;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, BufferMut, buffer_mut};

    use crate::Compressor;
    use crate::integer::IntCompressor;

    /// Compressing an empty array yields an empty array.
    #[test]
    fn test_empty() {
        let result = IntCompressor::compress(
            &PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    /// Short runs drawn from six distinct values compress to an array of the same length.
    #[test]
    fn test_dict_encodable() {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 1234 * i)
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let expected_len = codes.len();
        let primitive = codes.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&primitive, false, 3, &[]).unwrap();
        log::info!("compressed values: {}", compressed.tree_display());

        // Compression must preserve the logical length.
        assert_eq!(compressed.len(), expected_len);
    }

    /// An array that is `-1` except for 223 random positions compresses to an array of the
    /// same length.
    #[test]
    fn test_window_name() {
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut visited = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        while visited.len() < 223 {
            let random = (rng.next_u32() as usize) % 1_000_000;
            if visited.contains(&random) {
                continue;
            }
            visited.insert(random);
            values[random] = 5 * (rng.next_u64() % 100) as i32;
        }

        let array = values.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.tree_display());

        // Compression must preserve the logical length.
        assert_eq!(compressed.len(), 1_000_000);
    }
}