1pub mod dictionary;
2mod stats;
3
4use std::fmt::Debug;
5use std::hash::Hash;
6
7pub use stats::IntegerStats;
8use vortex_array::arrays::{ConstantArray, PrimitiveArray, PrimitiveVTable};
9use vortex_array::compress::downscale_integer_array;
10use vortex_array::{ArrayRef, IntoArray, ToCanonical};
11use vortex_dict::DictArray;
12use vortex_error::{VortexExpect, VortexResult, VortexUnwrap};
13use vortex_fastlanes::{FoRArray, bit_width_histogram, bitpack_encode, find_best_bit_width};
14use vortex_runend::RunEndArray;
15use vortex_runend::compress::runend_encode;
16use vortex_scalar::Scalar;
17use vortex_sparse::{SparseArray, SparseVTable};
18use vortex_zigzag::{ZigZagArray, zigzag_encode};
19
20use crate::integer::dictionary::dictionary_encode;
21use crate::patches::compress_patches;
22use crate::{
23 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
24 estimate_compression_ratio_with_sampling,
25};
26
/// Compressor for primitive arrays described by [`IntegerStats`].
///
/// Its [`Compressor`] implementation lists the candidate integer schemes and uses
/// [`UncompressedScheme`] as the default scheme.
pub struct IntCompressor;
28
29impl Compressor for IntCompressor {
30 type ArrayVTable = PrimitiveVTable;
31 type SchemeType = dyn IntegerScheme;
32 type StatsType = IntegerStats;
33
34 fn schemes() -> &'static [&'static dyn IntegerScheme] {
35 &[
36 &ConstantScheme,
37 &FORScheme,
38 &ZigZagScheme,
39 &BitPackingScheme,
40 &SparseScheme,
41 &DictScheme,
42 &RunEndScheme,
43 ]
44 }
45
46 fn default_scheme() -> &'static Self::SchemeType {
47 &UncompressedScheme
48 }
49
50 fn dict_scheme_code() -> IntCode {
51 DICT_SCHEME
52 }
53}
54
55impl IntCompressor {
56 pub fn compress_no_dict(
57 array: &PrimitiveArray,
58 is_sample: bool,
59 allowed_cascading: usize,
60 excludes: &[IntCode],
61 ) -> VortexResult<ArrayRef> {
62 let stats = IntegerStats::generate_opts(
63 array,
64 GenerateStatsOptions {
65 count_distinct_values: false,
66 },
67 );
68
69 let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
70 let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
71
72 if output.nbytes() < array.nbytes() {
73 Ok(output)
74 } else {
75 log::debug!("resulting tree too large: {}", output.tree_display());
76 Ok(array.to_array())
77 }
78 }
79}
80
/// Convenience trait (no methods of its own) naming every [`Scheme`] whose statistics type is
/// [`IntegerStats`] and whose code type is [`IntCode`].
///
/// It exists so such schemes can be referred to as `dyn IntegerScheme`.
pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}

// Blanket impl: any `Scheme` with the matching associated types is an `IntegerScheme`.
impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
85
/// One-byte identifier of an integer compression scheme.
///
/// Codes are what callers pass in `excludes` lists to rule schemes out.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct IntCode(u8);

// One code per scheme in this module; each scheme's `code()` returns its constant.
const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
const CONSTANT_SCHEME: IntCode = IntCode(1);
const FOR_SCHEME: IntCode = IntCode(2);
const ZIGZAG_SCHEME: IntCode = IntCode(3);
const BITPACKING_SCHEME: IntCode = IntCode(4);
const SPARSE_SCHEME: IntCode = IntCode(5);
const DICT_SCHEME: IntCode = IntCode(6);
const RUNEND_SCHEME: IntCode = IntCode(7);
97
/// Scheme that returns the source array unchanged.
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

/// Scheme that replaces the array with a [`ConstantArray`].
#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

/// Scheme that produces a [`FoRArray`] whose encoded child is bit-packed.
#[derive(Debug, Copy, Clone)]
pub struct FORScheme;

/// Scheme that zigzag-encodes the array and compresses the encoded child further.
#[derive(Debug, Copy, Clone)]
pub struct ZigZagScheme;

/// Scheme that bit-packs the array, compressing any patches it produces.
#[derive(Debug, Copy, Clone)]
pub struct BitPackingScheme;

/// Scheme that produces a [`SparseArray`] filled with the array's top value.
#[derive(Debug, Copy, Clone)]
pub struct SparseScheme;

/// Scheme that dictionary-encodes the array and compresses the codes further.
#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

/// Scheme that run-end encodes the array and compresses ends and values further.
#[derive(Debug, Copy, Clone)]
pub struct RunEndScheme;
121
/// Minimum average run length for [`RunEndScheme`] to be considered; below this its expected
/// compression ratio is reported as 0.
const RUN_END_THRESHOLD: u32 = 4;
124
125impl Scheme for UncompressedScheme {
126 type StatsType = IntegerStats;
127 type CodeType = IntCode;
128
129 fn code(&self) -> IntCode {
130 UNCOMPRESSED_SCHEME
131 }
132
133 fn expected_compression_ratio(
134 &self,
135 _stats: &IntegerStats,
136 _is_sample: bool,
137 _allowed_cascading: usize,
138 _excludes: &[IntCode],
139 ) -> VortexResult<f64> {
140 Ok(1.0)
142 }
143
144 fn compress(
145 &self,
146 stats: &IntegerStats,
147 _is_sample: bool,
148 _allowed_cascading: usize,
149 _excludes: &[IntCode],
150 ) -> VortexResult<ArrayRef> {
151 Ok(stats.source().to_array())
152 }
153}
154
155impl Scheme for ConstantScheme {
156 type StatsType = IntegerStats;
157 type CodeType = IntCode;
158
159 fn code(&self) -> IntCode {
160 CONSTANT_SCHEME
161 }
162
163 fn is_constant(&self) -> bool {
164 true
165 }
166
167 fn expected_compression_ratio(
168 &self,
169 stats: &IntegerStats,
170 is_sample: bool,
171 _allowed_cascading: usize,
172 _excludes: &[IntCode],
173 ) -> VortexResult<f64> {
174 if is_sample {
176 return Ok(0.0);
177 }
178
179 if stats.distinct_values_count != 1 {
181 return Ok(0.0);
182 }
183
184 if stats.null_count > 0 && stats.value_count > 0 {
186 return Ok(0.0);
187 }
188
189 Ok(stats.value_count as f64)
190 }
191
192 fn compress(
193 &self,
194 stats: &IntegerStats,
195 _is_sample: bool,
196 _allowed_cascading: usize,
197 _excludes: &[IntCode],
198 ) -> VortexResult<ArrayRef> {
199 let scalar = stats
202 .source()
203 .as_constant()
204 .vortex_expect("constant array expected");
205
206 Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
207 }
208}
209
210impl Scheme for FORScheme {
211 type StatsType = IntegerStats;
212 type CodeType = IntCode;
213
214 fn code(&self) -> IntCode {
215 FOR_SCHEME
216 }
217
218 fn expected_compression_ratio(
219 &self,
220 stats: &IntegerStats,
221 _is_sample: bool,
222 allowed_cascading: usize,
223 _excludes: &[IntCode],
224 ) -> VortexResult<f64> {
225 if allowed_cascading == 0 {
227 return Ok(0.0);
228 }
229
230 if stats.value_count == 0 {
232 return Ok(0.0);
233 }
234
235 if stats.typed.min_is_zero() {
237 return Ok(0.0);
238 }
239
240 let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
242 let bw = match stats.typed.max_minus_min().checked_ilog2() {
243 Some(l) => l + 1,
244 None => return Ok(0.0),
247 };
248
249 if full_width - bw < 8 {
251 return Ok(0.0);
252 }
253
254 Ok(full_width as f64 / bw as f64)
255 }
256
257 fn compress(
258 &self,
259 stats: &IntegerStats,
260 is_sample: bool,
261 _allowed_cascading: usize,
262 excludes: &[IntCode],
263 ) -> VortexResult<ArrayRef> {
264 let for_array = FoRArray::encode(stats.src.clone())?;
265 let biased = for_array.encoded().to_primitive()?;
266 let biased_stats = IntegerStats::generate_opts(
267 &biased,
268 GenerateStatsOptions {
269 count_distinct_values: false,
270 },
271 );
272
273 let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;
278
279 Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
280 }
281}
282
283impl Scheme for ZigZagScheme {
284 type StatsType = IntegerStats;
285 type CodeType = IntCode;
286
287 fn code(&self) -> IntCode {
288 ZIGZAG_SCHEME
289 }
290
291 fn expected_compression_ratio(
292 &self,
293 stats: &IntegerStats,
294 is_sample: bool,
295 allowed_cascading: usize,
296 excludes: &[IntCode],
297 ) -> VortexResult<f64> {
298 if allowed_cascading == 0 {
300 return Ok(0.0);
301 }
302
303 if stats.value_count == 0 {
305 return Ok(0.0);
306 }
307
308 if !stats.typed.min_is_negative() {
310 return Ok(0.0);
311 }
312
313 estimate_compression_ratio_with_sampling(
315 self,
316 stats,
317 is_sample,
318 allowed_cascading,
319 excludes,
320 )
321 }
322
323 fn compress(
324 &self,
325 stats: &IntegerStats,
326 is_sample: bool,
327 allowed_cascading: usize,
328 excludes: &[IntCode],
329 ) -> VortexResult<ArrayRef> {
330 let zag = zigzag_encode(stats.src.clone())?;
332 let encoded = zag.encoded().to_primitive()?;
333
334 let mut new_excludes = vec![
337 ZigZagScheme.code(),
338 DictScheme.code(),
339 RunEndScheme.code(),
340 SparseScheme.code(),
341 ];
342 new_excludes.extend_from_slice(excludes);
343
344 let compressed =
345 IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;
346
347 log::debug!("zigzag output: {}", compressed.tree_display());
348
349 Ok(ZigZagArray::try_new(compressed)?.into_array())
350 }
351}
352
353impl Scheme for BitPackingScheme {
354 type StatsType = IntegerStats;
355 type CodeType = IntCode;
356
357 fn code(&self) -> IntCode {
358 BITPACKING_SCHEME
359 }
360
361 #[allow(clippy::cast_possible_truncation)]
362 fn expected_compression_ratio(
363 &self,
364 stats: &IntegerStats,
365 is_sample: bool,
366 allowed_cascading: usize,
367 excludes: &[IntCode],
368 ) -> VortexResult<f64> {
369 if stats.typed.min_is_negative() {
371 return Ok(0.0);
372 }
373
374 if stats.value_count == 0 {
376 return Ok(0.0);
377 }
378
379 estimate_compression_ratio_with_sampling(
380 self,
381 stats,
382 is_sample,
383 allowed_cascading,
384 excludes,
385 )
386 }
387
388 #[allow(clippy::cast_possible_truncation)]
389 fn compress(
390 &self,
391 stats: &IntegerStats,
392 _is_sample: bool,
393 _allowed_cascading: usize,
394 _excludes: &[IntCode],
395 ) -> VortexResult<ArrayRef> {
396 let histogram = bit_width_histogram(stats.source())?;
397 let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
398 if bw as usize == stats.source().ptype().bit_width() {
400 return Ok(stats.source().clone().into_array());
401 }
402 let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;
403
404 let patches = packed.patches().map(compress_patches).transpose()?;
405 packed.replace_patches(patches);
406
407 Ok(packed.into_array())
408 }
409}
410
411impl Scheme for SparseScheme {
412 type StatsType = IntegerStats;
413 type CodeType = IntCode;
414
415 fn code(&self) -> IntCode {
416 SPARSE_SCHEME
417 }
418
419 fn expected_compression_ratio(
421 &self,
422 stats: &IntegerStats,
423 _is_sample: bool,
424 _allowed_cascading: usize,
425 _excludes: &[IntCode],
426 ) -> VortexResult<f64> {
427 if stats.value_count == 0 {
428 return Ok(0.0);
430 }
431
432 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
434 return Ok(stats.src.len() as f64 / stats.value_count as f64);
435 }
436
437 let (_, top_count) = stats.typed.top_value_and_count();
439
440 if top_count == stats.value_count {
441 return Ok(0.0);
443 }
444
445 let freq = top_count as f64 / stats.value_count as f64;
446 if freq >= 0.9 {
447 return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
449 }
450
451 Ok(0.0)
452 }
453
454 fn compress(
455 &self,
456 stats: &IntegerStats,
457 is_sample: bool,
458 allowed_cascading: usize,
459 excludes: &[IntCode],
460 ) -> VortexResult<ArrayRef> {
461 assert!(allowed_cascading > 0);
462 let (top_pvalue, top_count) = stats.typed.top_value_and_count();
463 if top_count as usize == stats.src.len() {
464 return Ok(ConstantArray::new(
466 Scalar::primitive_value(
467 top_pvalue,
468 top_pvalue.ptype(),
469 stats.src.dtype().nullability(),
470 ),
471 stats.src.len(),
472 )
473 .into_array());
474 }
475
476 let sparse_encoded = SparseArray::encode(
477 stats.src.as_ref(),
478 Some(Scalar::primitive_value(
479 top_pvalue,
480 top_pvalue.ptype(),
481 stats.src.dtype().nullability(),
482 )),
483 )?;
484
485 if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
486 let mut new_excludes = vec![SparseScheme.code()];
488 new_excludes.extend_from_slice(excludes);
489
490 let compressed_values = IntCompressor::compress_no_dict(
491 &sparse.patches().values().to_primitive()?,
492 is_sample,
493 allowed_cascading - 1,
494 &new_excludes,
495 )?;
496
497 let indices =
498 downscale_integer_array(sparse.patches().indices().clone())?.to_primitive()?;
499
500 let compressed_indices = IntCompressor::compress_no_dict(
501 &indices,
502 is_sample,
503 allowed_cascading - 1,
504 &new_excludes,
505 )?;
506
507 SparseArray::try_new(
508 compressed_indices,
509 compressed_values,
510 sparse.len(),
511 sparse.fill_scalar().clone(),
512 )
513 .map(|a| a.into_array())
514 } else {
515 Ok(sparse_encoded)
516 }
517 }
518}
519
520impl Scheme for DictScheme {
521 type StatsType = IntegerStats;
522 type CodeType = IntCode;
523
524 fn code(&self) -> IntCode {
525 DICT_SCHEME
526 }
527
528 fn expected_compression_ratio(
529 &self,
530 stats: &IntegerStats,
531 _is_sample: bool,
532 allowed_cascading: usize,
533 _excludes: &[IntCode],
534 ) -> VortexResult<f64> {
535 if allowed_cascading == 0 {
537 return Ok(0.0);
538 }
539
540 if stats.value_count == 0 {
541 return Ok(0.0);
542 }
543
544 if stats.distinct_values_count > stats.value_count / 2 {
546 return Ok(0.0);
547 }
548
549 let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
551
552 let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
554
555 let n_runs = stats.value_count / stats.average_run_length;
556
557 let codes_size_bp = (codes_bw * stats.value_count) as usize;
559 let codes_size_rle_bp = (codes_bw + 32) * n_runs;
560
561 let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);
562
563 let before = stats.value_count as usize * stats.source().ptype().bit_width();
564
565 Ok(before as f64 / (values_size + codes_size) as f64)
566 }
567
568 fn compress(
569 &self,
570 stats: &IntegerStats,
571 is_sample: bool,
572 allowed_cascading: usize,
573 excludes: &[IntCode],
574 ) -> VortexResult<ArrayRef> {
575 assert!(allowed_cascading > 0);
576
577 let dict = dictionary_encode(stats)?;
581
582 let mut new_excludes = vec![DICT_SCHEME];
584 new_excludes.extend_from_slice(excludes);
585
586 let compressed_codes = IntCompressor::compress_no_dict(
587 &dict.codes().to_primitive()?,
588 is_sample,
589 allowed_cascading - 1,
590 &new_excludes,
591 )?;
592
593 Ok(DictArray::try_new(compressed_codes, dict.values().clone())?.into_array())
594 }
595}
596
597impl Scheme for RunEndScheme {
598 type StatsType = IntegerStats;
599 type CodeType = IntCode;
600
601 fn code(&self) -> IntCode {
602 RUNEND_SCHEME
603 }
604
605 fn expected_compression_ratio(
606 &self,
607 stats: &IntegerStats,
608 is_sample: bool,
609 allowed_cascading: usize,
610 excludes: &[IntCode],
611 ) -> VortexResult<f64> {
612 if stats.average_run_length < RUN_END_THRESHOLD {
614 return Ok(0.0);
615 }
616
617 if allowed_cascading == 0 {
618 return Ok(0.0);
619 }
620
621 estimate_compression_ratio_with_sampling(
623 self,
624 stats,
625 is_sample,
626 allowed_cascading,
627 excludes,
628 )
629 }
630
631 fn compress(
632 &self,
633 stats: &IntegerStats,
634 is_sample: bool,
635 allowed_cascading: usize,
636 excludes: &[IntCode],
637 ) -> VortexResult<ArrayRef> {
638 assert!(allowed_cascading > 0);
639
640 let (ends, values) = runend_encode(&stats.src)?;
642
643 let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
644 new_excludes.extend_from_slice(excludes);
645
646 let ends_stats = IntegerStats::generate_opts(
647 &ends,
648 GenerateStatsOptions {
649 count_distinct_values: false,
650 },
651 );
652 let ends_scheme = IntCompressor::choose_scheme(
653 &ends_stats,
654 is_sample,
655 allowed_cascading - 1,
656 &new_excludes,
657 )?;
658 let compressed_ends =
659 ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;
660
661 let compressed_values = IntCompressor::compress_no_dict(
662 &values.to_primitive()?,
663 is_sample,
664 allowed_cascading - 1,
665 &new_excludes,
666 )?;
667
668 Ok(RunEndArray::try_new(compressed_ends, compressed_values)?.into_array())
669 }
670}
671
// Tests for the integer compressor and the sparse scheme.
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use log::LevelFilter;
    use rand::rngs::StdRng;
    use rand::{RngCore, SeedableRng};
    use vortex_array::aliases::hash_set::HashSet;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::ValidityHelper;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, BufferMut, buffer, buffer_mut};
    use vortex_sparse::SparseEncoding;

    use crate::integer::{IntCompressor, IntegerStats, SparseScheme};
    use crate::{Compressor, CompressorStats, Scheme};

    /// Compressing an empty, non-nullable `i32` array succeeds and yields an empty array.
    #[test]
    fn test_empty() {
        let result = IntCompressor::compress(
            &PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    /// Smoke test: runs of six distinct values compress without error.
    #[test]
    fn test_dict_encodable() {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        // Six distinct values, each a multiple of 1234.
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 1234 * i)
            .collect_vec();

        // Fixed seed keeps the generated data reproducible.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            // Runs of length 0..=4 of a randomly picked value.
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let primitive = codes.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&primitive, false, 3, &[]).unwrap();
        log::info!("compressed values: {}", compressed.tree_display());
    }

    /// Smoke test: one million `-1` values with 223 random positions overwritten compress
    /// without error.
    #[test]
    fn test_window_name() {
        // Ignore the error if a logger was already installed by another test.
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut visited = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        // Overwrite 223 distinct positions with a multiple of 5 in 0..=495.
        while visited.len() < 223 {
            let random = (rng.next_u32() as usize) % 1_000_000;
            if visited.contains(&random) {
                continue;
            }
            visited.insert(random);
            values[random] = 5 * (rng.next_u64() % 100) as i32;
        }

        let array = values.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.tree_display());
    }

    /// Sparse encoding of an array with one trailing null round-trips values and validity.
    #[test]
    fn sparse_with_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 0, 46],
            Validity::from_iter(vec![true, true, true, true, false]),
        );
        let compressed = SparseScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());
        let decoded = compressed.to_primitive().unwrap();
        // The null slot is expected to decode to 0 rather than its original 46.
        let expected = [189u8, 189, 189, 0, 0];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }

    /// Sparse encoding of an array that is null everywhere except its last element
    /// round-trips values and validity.
    #[test]
    fn sparse_mostly_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let compressed = SparseScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());
        let decoded = compressed.to_primitive().unwrap();
        // Null slots are expected to decode to 0; only the single valid value survives.
        let expected = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }
}