vortex_array/
compress.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use vortex_error::VortexResult;

use crate::aliases::hash_set::HashSet;
use crate::stats::PRUNING_STATS;
use crate::{Array, EncodingId};

/// A pluggable strategy for compressing Vortex arrays.
pub trait CompressionStrategy {
    /// Compress `array`, returning a new (possibly re-encoded) array with the
    /// same dtype and logical contents.
    fn compress(&self, array: &Array) -> VortexResult<Array>;

    /// The set of encoding IDs this strategy may emit, e.g. for writers that
    /// must declare up-front which encodings appear in a file.
    fn used_encodings(&self) -> HashSet<EncodingId>;
}

/// Check that compression did not alter the length of the validity array.
///
/// This is a debug-only sanity check: in release builds the bodies compile
/// away and the arguments are deliberately discarded via `let _ = ...` to
/// avoid unused-variable warnings.
pub fn check_validity_unchanged(arr: &Array, compressed: &Array) {
    let _ = arr;
    let _ = compressed;
    #[cfg(debug_assertions)]
    {
        use vortex_error::VortexExpect;

        let old_validity = arr
            .logical_validity()
            .vortex_expect("failed to compute validity")
            .len();
        let new_validity = compressed
            .logical_validity()
            // NOTE: previously had a stray trailing space in the message.
            .vortex_expect("failed to compute validity")
            .len();

        debug_assert!(
            old_validity == new_validity,
            "validity length changed after compression: {old_validity} -> {new_validity}\n From tree {} To tree {}\n",
            arr.tree_display(),
            compressed.tree_display()
        );
    }
}

/// Check that compression did not alter the dtype.
///
/// Debug-only sanity check; in release builds the arguments are discarded
/// and no work is done.
pub fn check_dtype_unchanged(arr: &Array, compressed: &Array) {
    let _ = arr;
    let _ = compressed;
    #[cfg(debug_assertions)]
    {
        debug_assert!(
            arr.dtype() == compressed.dtype(),
            // Fixed: the two tree displays previously ran together
            // ("...{}Into array {}") making the panic message unreadable.
            "Compression changed dtype: {} -> {}\nFrom array: {}\nInto array: {}",
            arr.dtype(),
            compressed.dtype(),
            arr.tree_display(),
            compressed.tree_display(),
        );
    }
}

/// Check that compression preserved the statistics.
///
/// Debug-only sanity check: for every statistic present on the original
/// array (except `RunCount`, see below), asserts that the compressed array
/// reports an identical value. In release builds the arguments are discarded
/// and no work is done.
pub fn check_statistics_unchanged(arr: &Array, compressed: &Array) {
    let _ = arr;
    let _ = compressed;
    #[cfg(debug_assertions)]
    {
        use vortex_scalar::Scalar;

        use crate::stats::Stat;

        // Run count merge_ordered assumes that the run is "broken" on each chunk, which is a useful estimate but not guaranteed to be correct.
        for (stat, value) in arr
            .statistics()
            .to_set()
            .into_iter()
            .filter(|(stat, _)| *stat != Stat::RunCount)
        {
            // Wrap the raw stat value in a Scalar so both sides compare with
            // a concrete dtype attached (derived from each array's dtype).
            let compressed_scalar = compressed
                .statistics()
                .get(stat)
                .map(|sv| Scalar::new(stat.dtype(compressed.dtype()), sv));
            // A missing stat on the compressed array (None) also fails here,
            // since the original side is always Some(...).
            debug_assert_eq!(
                compressed_scalar,
                Some(Scalar::new(stat.dtype(arr.dtype()), value.clone())),
                "Compression changed {stat} from {value} to {}",
                compressed_scalar
                    .as_ref()
                    .map(|s| s.to_string())
                    .unwrap_or_else(|| "null".to_string()),
            );
        }
    }
}

/// Eagerly compute certain statistics (i.e., pruning stats plus UncompressedSizeInBytes) for an array.
/// This function is intended to be called in compressors, immediately before compression occurs.
pub fn compute_precompression_stats(arr: &Array) -> VortexResult<()> {
    // Record the uncompressed footprint before any encoding rewrites it.
    arr.statistics().compute_uncompressed_size_in_bytes();
    // Compute the pruning stats, propagating any error and discarding the values.
    arr.statistics().compute_all(PRUNING_STATS)?;
    Ok(())
}