vortex_sampling_compressor/compressors/mod.rs

1use std::any::Any;
2use std::fmt::{Debug, Display, Formatter};
3use std::hash::{Hash, Hasher};
4use std::sync::Arc;
5
6use itertools::{EitherOrBoth, Itertools};
7use vortex_array::aliases::hash_set::HashSet;
8use vortex_array::tree::TreeFormatter;
9use vortex_array::{Array, EncodingId};
10use vortex_error::{vortex_panic, VortexExpect, VortexResult};
11
12use crate::SamplingCompressor;
13
14pub mod alp;
15pub mod alp_rd;
16pub mod bitpacked;
17pub mod chunked;
18pub mod constant;
19pub mod date_time_parts;
20pub mod delta;
21pub mod dict;
22pub mod r#for;
23pub mod fsst;
24pub mod list;
25pub mod runend;
26pub mod sparse;
27pub mod struct_;
28pub mod varbin;
29pub mod zigzag;
30
31pub trait EncodingCompressor: Sync + Send + Debug {
32    fn id(&self) -> &str;
33
34    fn cost(&self) -> u8;
35
36    fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor>;
37
38    fn compress<'a>(
39        &'a self,
40        array: &Array,
41        like: Option<CompressionTree<'a>>,
42        ctx: SamplingCompressor<'a>,
43    ) -> VortexResult<CompressedArray<'a>>;
44
45    fn used_encodings(&self) -> HashSet<EncodingId>;
46}
47
48pub type CompressorRef<'a> = &'a dyn EncodingCompressor;
49
50impl PartialEq for dyn EncodingCompressor + '_ {
51    fn eq(&self, other: &Self) -> bool {
52        self.id() == other.id()
53    }
54}
55impl Eq for dyn EncodingCompressor + '_ {}
56impl Hash for dyn EncodingCompressor + '_ {
57    fn hash<H: Hasher>(&self, state: &mut H) {
58        self.id().hash(state)
59    }
60}
61
62#[derive(Clone)]
63pub struct CompressionTree<'a> {
64    compressor: &'a dyn EncodingCompressor,
65    children: Vec<Option<CompressionTree<'a>>>,
66    metadata: Option<Arc<dyn EncoderMetadata>>,
67}
68
69impl Debug for CompressionTree<'_> {
70    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71        write!(f, "{self}")
72    }
73}
74
/// Opaque, encoder-specific state that may be stored on a compression tree.
///
/// Codecs use it to carry parameters learned during the sampling runs over to the
/// compression of the full array. Consumers recover the concrete type via
/// [`as_any`](Self::as_any).
pub trait EncoderMetadata {
    /// Exposes `self` as [`Any`] so callers can downcast to the concrete metadata type.
    fn as_any(&self) -> &dyn Any;
}
82
83impl Display for CompressionTree<'_> {
84    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85        let mut fmt = TreeFormatter::new(f, "".to_string());
86        visit_child("root", Some(self), &mut fmt)
87    }
88}
89
90fn visit_child(
91    name: &str,
92    child: Option<&CompressionTree>,
93    fmt: &mut TreeFormatter,
94) -> std::fmt::Result {
95    fmt.indent(|f| {
96        if let Some(child) = child {
97            writeln!(f, "{name}: {}", child.compressor.id())?;
98            for (i, c) in child.children.iter().enumerate() {
99                visit_child(&format!("{name}.{}", i), c.as_ref(), f)?;
100            }
101        } else {
102            writeln!(f, "{name}: uncompressed")?;
103        }
104        Ok(())
105    })
106}
107
108impl<'a> CompressionTree<'a> {
109    pub fn flat(compressor: &'a dyn EncodingCompressor) -> Self {
110        Self::new(compressor, vec![])
111    }
112
113    pub fn new(
114        compressor: &'a dyn EncodingCompressor,
115        children: Vec<Option<CompressionTree<'a>>>,
116    ) -> Self {
117        Self {
118            compressor,
119            children,
120            metadata: None,
121        }
122    }
123
124    /// Save a piece of metadata as part of the compression tree.
125    ///
126    /// This can be specific encoder parameters that were discovered at sample time
127    /// that should be reused when compressing the full array.
128    pub(crate) fn new_with_metadata(
129        compressor: &'a dyn EncodingCompressor,
130        children: Vec<Option<CompressionTree<'a>>>,
131        metadata: Arc<dyn EncoderMetadata>,
132    ) -> Self {
133        Self {
134            compressor,
135            children,
136            metadata: Some(metadata),
137        }
138    }
139
140    pub fn child(&self, idx: usize) -> Option<&CompressionTree<'a>> {
141        self.children[idx].as_ref()
142    }
143
144    /// Compresses array with our compressor without verifying that the compressor can compress this array
145    pub fn compress_unchecked(
146        &self,
147        array: &Array,
148        ctx: &SamplingCompressor<'a>,
149    ) -> VortexResult<CompressedArray<'a>> {
150        self.compressor.compress(
151            array,
152            Some(self.clone()),
153            ctx.for_compressor(self.compressor),
154        )
155    }
156
157    pub fn compress(
158        &self,
159        array: &Array,
160        ctx: &SamplingCompressor<'a>,
161    ) -> Option<VortexResult<CompressedArray<'a>>> {
162        self.compressor
163            .can_compress(array)
164            .map(|c| c.compress(array, Some(self.clone()), ctx.for_compressor(c)))
165    }
166
167    pub fn compressor(&self) -> &dyn EncodingCompressor {
168        self.compressor
169    }
170
171    /// Access the saved opaque metadata.
172    ///
173    /// This will consume the owned metadata, giving the caller ownership of
174    /// the Box.
175    ///
176    /// The value of `T` will almost always be `EncodingCompressor`-specific.
177    pub fn metadata(&mut self) -> Option<Arc<dyn EncoderMetadata>> {
178        std::mem::take(&mut self.metadata)
179    }
180
181    #[allow(clippy::type_complexity)]
182    pub fn into_parts(
183        self,
184    ) -> (
185        &'a dyn EncodingCompressor,
186        Vec<Option<CompressionTree<'a>>>,
187        Option<Arc<dyn EncoderMetadata>>,
188    ) {
189        (self.compressor, self.children, self.metadata)
190    }
191}
192
193#[derive(Debug, Clone)]
194pub struct CompressedArray<'a> {
195    array: Array,
196    path: Option<CompressionTree<'a>>,
197}
198
199impl<'a> CompressedArray<'a> {
200    pub fn uncompressed(array: Array) -> Self {
201        Self { array, path: None }
202    }
203
204    pub fn compressed(
205        compressed: Array,
206        path: Option<CompressionTree<'a>>,
207        uncompressed: impl AsRef<Array>,
208    ) -> Self {
209        let uncompressed = uncompressed.as_ref();
210
211        // Sanity check the compressed array
212        assert_eq!(
213            compressed.len(),
214            uncompressed.len(),
215            "Compressed array {} has different length to uncompressed",
216            compressed.encoding(),
217        );
218        assert_eq!(
219            compressed.dtype(),
220            uncompressed.dtype(),
221            "Compressed array {} has different dtype to uncompressed",
222            compressed.encoding(),
223        );
224
225        // eagerly compute uncompressed size in bytes at compression time, since it's
226        // too expensive to compute after compression
227        let _ = uncompressed
228            .statistics()
229            .compute_uncompressed_size_in_bytes();
230        compressed.inherit_statistics(uncompressed.statistics());
231
232        let compressed = Self {
233            array: compressed,
234            path,
235        };
236        compressed.validate();
237        compressed
238    }
239
240    fn validate(&self) {
241        self.validate_children(self.path.as_ref(), &self.array)
242    }
243
244    fn validate_children(&self, path: Option<&CompressionTree>, array: &Array) {
245        if let Some(path) = path.as_ref() {
246            path.children
247                .iter()
248                .zip_longest(array.children().iter())
249                .for_each(|pair| match pair {
250                    EitherOrBoth::Both(Some(child_tree), child_array) => {
251                        self.validate_children(Some(child_tree), child_array);
252                    }
253                    EitherOrBoth::Left(Some(child_tree)) => {
254                        vortex_panic!(
255                            "Child tree without child array!!\nroot tree: {}\nroot array: {}\nlocal tree: {path}\nlocal array: {}\nproblematic child_tree: {child_tree}",
256                            self.path().as_ref().vortex_expect("must be present"),
257                            self.array.tree_display(),
258                            array.tree_display()
259                        );
260                    }
261                    // if the child_tree is None, we have an uncompressed child array or both were None; fine either way
262                    _ => {},
263                });
264        }
265    }
266
267    #[inline]
268    pub fn array(&self) -> &Array {
269        &self.array
270    }
271
272    #[inline]
273    pub fn into_array(self) -> Array {
274        self.array
275    }
276
277    #[inline]
278    pub fn path(&self) -> &Option<CompressionTree> {
279        &self.path
280    }
281
282    #[inline]
283    pub fn into_path(self) -> Option<CompressionTree<'a>> {
284        self.path
285    }
286
287    #[inline]
288    pub fn into_parts(self) -> (Array, Option<CompressionTree<'a>>) {
289        (self.array, self.path)
290    }
291
292    /// Total size of the array in bytes, including all children and buffers.
293    #[inline]
294    pub fn nbytes(&self) -> usize {
295        self.array.nbytes()
296    }
297}
298
299impl AsRef<Array> for CompressedArray<'_> {
300    fn as_ref(&self) -> &Array {
301        &self.array
302    }
303}