vortex_sampling_compressor/compressors/
chunked.rs1use std::any::Any;
2use std::sync::Arc;
3
4use vortex_array::aliases::hash_set::HashSet;
5use vortex_array::array::{ChunkedArray, ChunkedEncoding};
6use vortex_array::compress::compute_precompression_stats;
7use vortex_array::{Array, Encoding, EncodingId, IntoArray};
8use vortex_error::{vortex_bail, VortexExpect, VortexResult};
9
10use super::EncoderMetadata;
11use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
12use crate::{constants, SamplingCompressor};
13
/// Compressor for arrays that use the chunked encoding.
///
/// Each chunk is compressed on its own, reusing the compression tree of an
/// earlier chunk as a template for as long as the achieved ratio stays
/// acceptable.
#[derive(Debug)]
pub struct ChunkedCompressor {
    /// Slack factor applied to the template's ratio: a chunk whose
    /// compressed/uncompressed byte ratio is larger than the template's ratio
    /// times this factor is compressed again without the template.
    relatively_good_ratio: f32,
}
18
19pub const DEFAULT_CHUNKED_COMPRESSOR: ChunkedCompressor = ChunkedCompressor {
20 relatively_good_ratio: 1.2,
21};
22
/// Metadata attached to the compression tree produced by the chunked
/// compressor.
///
/// Holds the compressed/uncompressed byte ratio associated with the tree that
/// is reused as a template for later chunks, or `None` when no such tree
/// exists.
#[derive(Debug)]
pub struct ChunkedCompressorMetadata(Option<f32>);
24
25impl EncoderMetadata for ChunkedCompressorMetadata {
26 fn as_any(&self) -> &dyn Any {
27 self
28 }
29}
30
31impl EncodingCompressor for ChunkedCompressor {
32 fn id(&self) -> &str {
33 ChunkedEncoding::ID.as_ref()
34 }
35
36 fn cost(&self) -> u8 {
37 constants::CHUNKED_COST
38 }
39
40 fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
41 array.is_encoding(ChunkedEncoding::ID).then_some(self)
42 }
43
44 fn compress<'a>(
45 &'a self,
46 array: &Array,
47 like: Option<CompressionTree<'a>>,
48 ctx: SamplingCompressor<'a>,
49 ) -> VortexResult<CompressedArray<'a>> {
50 let chunked_array = ChunkedArray::try_from(array.clone())?;
51 self.compress_chunked(&chunked_array, like, ctx)
52 }
53
54 fn used_encodings(&self) -> HashSet<EncodingId> {
55 HashSet::from([])
56 }
57}
58
59impl ChunkedCompressor {
60 fn relatively_good_ratio(&self) -> f32 {
70 self.relatively_good_ratio
71 }
72
73 fn compress_chunked<'a>(
74 &'a self,
75 array: &ChunkedArray,
76 like: Option<CompressionTree<'a>>,
77 ctx: SamplingCompressor<'a>,
78 ) -> VortexResult<CompressedArray<'a>> {
79 let less_chunked = array.rechunk(
80 ctx.options().target_block_bytesize,
81 ctx.options().target_block_size,
82 )?;
83
84 let mut previous = like_into_parts(like)?;
85 let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks());
86 let mut compressed_trees = Vec::with_capacity(less_chunked.nchunks() + 1);
87 compressed_trees.push(None); for (index, chunk) in less_chunked.chunks().enumerate() {
90 compute_precompression_stats(&chunk)?;
95
96 let like = previous.as_ref().map(|(like, _)| like);
97 let (compressed_chunk, tree) = ctx
98 .named(&format!("chunk-{}", index))
99 .compress(&chunk, like)?
100 .into_parts();
101
102 let ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
103 let exceeded_target_ratio = previous
104 .as_ref()
105 .map(|(_, target_ratio)| ratio > target_ratio * self.relatively_good_ratio())
106 .unwrap_or(false);
107
108 if ratio > 1.0 || exceeded_target_ratio {
109 log::debug!("unsatisfactory ratio {}, previous: {:?}", ratio, previous);
110 let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts();
111 let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
112
113 compressed_chunks.push(compressed_chunk);
114 compressed_trees.push(tree.clone());
115 previous = tree.map(|tree| (tree, new_ratio));
116 } else {
117 compressed_chunks.push(compressed_chunk);
118 compressed_trees.push(tree.clone());
119 previous = previous.or_else(|| tree.map(|tree| (tree, ratio)));
120 }
121 }
122
123 let ratio = previous.map(|(_, ratio)| ratio);
124 Ok(CompressedArray::compressed(
125 ChunkedArray::try_new(compressed_chunks, array.dtype().clone())?.into_array(),
126 Some(CompressionTree::new_with_metadata(
127 self,
128 compressed_trees,
129 Arc::new(ChunkedCompressorMetadata(ratio)),
130 )),
131 array,
132 ))
133 }
134}
135
136fn like_into_parts(
137 tree: Option<CompressionTree<'_>>,
138) -> VortexResult<Option<(CompressionTree<'_>, f32)>> {
139 let (_, mut children, metadata) = match tree {
140 None => return Ok(None),
141 Some(tree) => tree.into_parts(),
142 };
143
144 if children.len() < 2 {
146 vortex_bail!("Chunked array compression tree must have at least two children")
147 }
148
149 let latest_child = children
151 .pop()
152 .vortex_expect("Unreachable: tree must have at least two children");
153
154 let Some(target_ratio) = metadata else {
155 vortex_bail!("Chunked array compression tree must have metadata")
156 };
157 let Some(ChunkedCompressorMetadata(target_ratio)) =
158 target_ratio.as_ref().as_any().downcast_ref()
159 else {
160 vortex_bail!("Chunked array compression tree must be ChunkedCompressorMetadata")
161 };
162
163 match (latest_child, target_ratio) {
164 (None, None) => Ok(None),
165 (Some(child), Some(ratio)) => Ok(Some((child, *ratio))),
166 (..) => vortex_bail!("Chunked array compression tree must have a child iff it has a ratio"),
167 }
168}