vortex_sampling_compressor/compressors/
runend.rs

1use vortex_array::aliases::hash_set::HashSet;
2use vortex_array::array::PrimitiveEncoding;
3use vortex_array::{Array, Encoding, EncodingId, IntoArray, IntoArrayVariant};
4use vortex_error::VortexResult;
5use vortex_runend::compress::runend_encode;
6use vortex_runend::{RunEndArray, RunEndEncoding};
7
8use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
9use crate::downscale::downscale_integer_array;
10use crate::{constants, SamplingCompressor};
11
12pub const DEFAULT_RUN_END_COMPRESSOR: RunEndCompressor = RunEndCompressor { ree_threshold: 2.0 };
13
/// Compressor that applies run-end encoding to primitive arrays.
///
/// The only tunable is `ree_threshold`, which gates whether an array is
/// considered a candidate for run-end encoding at all.
#[derive(Clone, Copy, Debug)]
pub struct RunEndCompressor {
    /// Minimum average run length (array length divided by run count) an
    /// array must have for `can_compress` to accept it.
    ree_threshold: f32,
}
18
19impl EncodingCompressor for RunEndCompressor {
20    fn id(&self) -> &str {
21        RunEndEncoding::ID.as_ref()
22    }
23
24    fn cost(&self) -> u8 {
25        constants::RUN_END_COST
26    }
27
28    fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
29        if !array.is_encoding(PrimitiveEncoding::ID) {
30            return None;
31        }
32
33        let avg_run_length = array.len() as f32
34            / array
35                .statistics()
36                .compute_run_count()
37                .unwrap_or(array.len()) as f32;
38        if avg_run_length < self.ree_threshold {
39            return None;
40        }
41
42        Some(self)
43    }
44
45    fn compress<'a>(
46        &'a self,
47        array: &Array,
48        like: Option<CompressionTree<'a>>,
49        ctx: SamplingCompressor<'a>,
50    ) -> VortexResult<CompressedArray<'a>> {
51        let primitive_array = array.clone().into_primitive()?;
52        let (ends, values) = runend_encode(&primitive_array)?;
53        let ends = downscale_integer_array(ends.into_array())?.into_primitive()?;
54
55        let compressed_ends = ctx
56            .auxiliary("ends")
57            .compress(&ends.into_array(), like.as_ref().and_then(|l| l.child(0)))?;
58        let compressed_values = ctx
59            .named("values")
60            .excluding(self)
61            .compress(&values, like.as_ref().and_then(|l| l.child(1)))?;
62
63        Ok(CompressedArray::compressed(
64            RunEndArray::try_new(compressed_ends.array, compressed_values.array)
65                .map(|a| a.into_array())?,
66            Some(CompressionTree::new(
67                self,
68                vec![compressed_ends.path, compressed_values.path],
69            )),
70            array,
71        ))
72    }
73
74    fn used_encodings(&self) -> HashSet<EncodingId> {
75        HashSet::from([RunEndEncoding::ID])
76    }
77}