vortex_sampling_compressor/compressors/chunked.rs

use std::any::Any;
use std::sync::Arc;

use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{ChunkedArray, ChunkedEncoding};
use vortex_array::compress::compute_precompression_stats;
use vortex_array::{Array, Encoding, EncodingId, IntoArray};
use vortex_error::{vortex_bail, VortexExpect, VortexResult};

use super::EncoderMetadata;
use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::{constants, SamplingCompressor};

#[derive(Debug)]
pub struct ChunkedCompressor {
    relatively_good_ratio: f32,
}

pub const DEFAULT_CHUNKED_COMPRESSOR: ChunkedCompressor = ChunkedCompressor {
    relatively_good_ratio: 1.2,
};

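/// Records the compression ratio achieved for the most recent chunk (if any), so that a later
/// `like` pass can judge whether reusing the same compression tree is still "reasonably good".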
pub struct ChunkedCompressorMetadata(Option<f32>);

impl EncoderMetadata for ChunkedCompressorMetadata {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl EncodingCompressor for ChunkedCompressor {
    fn id(&self) -> &str {
        ChunkedEncoding::ID.as_ref()
    }

    fn cost(&self) -> u8 {
        constants::CHUNKED_COST
    }

    fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
        array.is_encoding(ChunkedEncoding::ID).then_some(self)
    }

    fn compress<'a>(
        &'a self,
        array: &Array,
        like: Option<CompressionTree<'a>>,
        ctx: SamplingCompressor<'a>,
    ) -> VortexResult<CompressedArray<'a>> {
        let chunked_array = ChunkedArray::try_from(array.clone())?;
        self.compress_chunked(&chunked_array, like, ctx)
    }

    fn used_encodings(&self) -> HashSet<EncodingId> {
        HashSet::from([])
    }
}

impl ChunkedCompressor {
    /// How far the compression ratio is allowed to grow from one chunk to the next.
    ///
    /// As long as a compressor compresses subsequent chunks "reasonably well", we should keep
    /// using it, which saves us the cost of searching for a good compressor. This constant
    /// quantifies "reasonably well" as
    ///
    /// ```text
    /// new_ratio <= old_ratio * self.relatively_good_ratio
    /// ```
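    ///
    /// For example, with the default `relatively_good_ratio` of 1.2, if the previous chunk
    /// compressed to 0.5 of its original size, the next chunk may keep reusing the same
    /// compression tree as long as its own ratio is at most 0.5 * 1.2 = 0.6.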
    fn relatively_good_ratio(&self) -> f32 {
        self.relatively_good_ratio
    }

    fn compress_chunked<'a>(
        &'a self,
        array: &ChunkedArray,
        like: Option<CompressionTree<'a>>,
        ctx: SamplingCompressor<'a>,
    ) -> VortexResult<CompressedArray<'a>> {
        let less_chunked = array.rechunk(
            ctx.options().target_block_bytesize,
            ctx.options().target_block_size,
        )?;

        let mut previous = like_into_parts(like)?;
        let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks());
        let mut compressed_trees = Vec::with_capacity(less_chunked.nchunks() + 1);
        compressed_trees.push(None); // placeholder child for the chunk offsets

        for (index, chunk) in less_chunked.chunks().enumerate() {
            // Precompression stats are extremely valuable when reading/writing, but are
            // potentially much more expensive to compute post-compression. That's because not
            // all encodings implement stats, so we would potentially have to canonicalize
            // during writes just to get stats, which would be silly. Also, we only really
            // require them for column chunks, not for every array.
            compute_precompression_stats(&chunk)?;

            let like = previous.as_ref().map(|(like, _)| like);
            let (compressed_chunk, tree) = ctx
                .named(&format!("chunk-{}", index))
                .compress(&chunk, like)?
                .into_parts();

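            // Compression ratio for this chunk: compressed bytes over original bytes
            // (lower is better; above 1.0 the "compressed" form is actually larger).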
            let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
            let exceeded_target_ratio = previous
                .as_ref()
                .map(|(_, target_ratio)| ratio > target_ratio * self.relatively_good_ratio())
                .unwrap_or(false);

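            // If reusing the previous chunk's tree grew the data (ratio > 1.0) or compressed
            // noticeably worse than the previous chunk did, redo the full compressor search.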
            if ratio > 1.0 || exceeded_target_ratio {
                log::debug!("unsatisfactory ratio {}, previous: {:?}", ratio, previous);
                let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts();
                let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);

                compressed_chunks.push(compressed_chunk);
                compressed_trees.push(tree.clone());
                previous = tree.map(|tree| (tree, new_ratio));
            } else {
                compressed_chunks.push(compressed_chunk);
                compressed_trees.push(tree.clone());
                previous = previous.or_else(|| tree.map(|tree| (tree, ratio)));
            }
        }

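        // The last chunk's ratio is stored in the metadata so that a later `like` pass can
        // pick up where this one left off (see `like_into_parts` below).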
        let ratio = previous.map(|(_, ratio)| ratio);
        Ok(CompressedArray::compressed(
            ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(),
            Some(CompressionTree::new_with_metadata(
                self,
                compressed_trees,
                Arc::new(ChunkedCompressorMetadata(ratio)),
            )),
            array,
        ))
    }
}

fn like_into_parts(
    tree: Option<CompressionTree<'_>>,
) -> VortexResult<Option<(CompressionTree<'_>, f32)>> {
    let (_, mut children, metadata) = match tree {
        None => return Ok(None),
        Some(tree) => tree.into_parts(),
    };

    // must have one for the chunk offsets and one per chunk (and at least one chunk!)
    if children.len() < 2 {
        vortex_bail!("Chunked array compression tree must have at least two children")
    }

    // since we compress sequentially, we take the last child as the previous (and thus
    // presumably most-similar) chunk
    let latest_child = children
        .pop()
        .vortex_expect("Unreachable: tree must have at least two children");

    let Some(target_ratio) = metadata else {
        vortex_bail!("Chunked array compression tree must have metadata")
    };
    let Some(ChunkedCompressorMetadata(target_ratio)) =
        target_ratio.as_ref().as_any().downcast_ref()
    else {
        vortex_bail!("Chunked array compression tree must be ChunkedCompressorMetadata")
    };

    match (latest_child, target_ratio) {
        (None, None) => Ok(None),
        (Some(child), Some(ratio)) => Ok(Some((child, *ratio))),
        (..) => vortex_bail!("Chunked array compression tree must have a child iff it has a ratio"),
    }
}
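
// A minimal usage sketch of the compressor on a chunked array. It assumes
// `SamplingCompressor` implements `Default` and that `PrimitiveArray` implements
// `From<Vec<i32>>`; adjust to the crate's actual constructors if these differ.
#[cfg(test)]
mod tests {
    use vortex_array::array::{ChunkedArray, PrimitiveArray};
    use vortex_array::{Array, IntoArray};

    use crate::SamplingCompressor;

    #[test]
    fn compresses_chunked_array_preserving_length() {
        // Three identical, highly compressible 1024-element chunks.
        let chunks: Vec<Array> = (0..3)
            .map(|_| PrimitiveArray::from(vec![0i32; 1024]).into_array())
            .collect();
        let dtype = chunks[0].dtype().clone();
        let chunked = ChunkedArray::try_new(chunks, dtype)
            .unwrap()
            .into_array();

        // For a chunked input, `can_compress` above makes this compressor eligible;
        // `None` means there is no prior compression tree to reuse.
        let (compressed, _tree) = SamplingCompressor::default()
            .compress(&chunked, None)
            .unwrap()
            .into_parts();
        assert_eq!(compressed.len(), chunked.len());
    }
}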