vortex_sampling_compressor/compressors/
mod.rs1use std::any::Any;
2use std::fmt::{Debug, Display, Formatter};
3use std::hash::{Hash, Hasher};
4use std::sync::Arc;
5
6use itertools::{EitherOrBoth, Itertools};
7use vortex_array::aliases::hash_set::HashSet;
8use vortex_array::tree::TreeFormatter;
9use vortex_array::{Array, EncodingId};
10use vortex_error::{vortex_panic, VortexExpect, VortexResult};
11
12use crate::SamplingCompressor;
13
14pub mod alp;
15pub mod alp_rd;
16pub mod bitpacked;
17pub mod chunked;
18pub mod constant;
19pub mod date_time_parts;
20pub mod delta;
21pub mod dict;
22pub mod r#for;
23pub mod fsst;
24pub mod list;
25pub mod runend;
26pub mod sparse;
27pub mod struct_;
28pub mod varbin;
29pub mod zigzag;
30
31pub trait EncodingCompressor: Sync + Send + Debug {
32 fn id(&self) -> &str;
33
34 fn cost(&self) -> u8;
35
36 fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor>;
37
38 fn compress<'a>(
39 &'a self,
40 array: &Array,
41 like: Option<CompressionTree<'a>>,
42 ctx: SamplingCompressor<'a>,
43 ) -> VortexResult<CompressedArray<'a>>;
44
45 fn used_encodings(&self) -> HashSet<EncodingId>;
46}
47
48pub type CompressorRef<'a> = &'a dyn EncodingCompressor;
49
50impl PartialEq for dyn EncodingCompressor + '_ {
51 fn eq(&self, other: &Self) -> bool {
52 self.id() == other.id()
53 }
54}
55impl Eq for dyn EncodingCompressor + '_ {}
56impl Hash for dyn EncodingCompressor + '_ {
57 fn hash<H: Hasher>(&self, state: &mut H) {
58 self.id().hash(state)
59 }
60}
61
62#[derive(Clone)]
63pub struct CompressionTree<'a> {
64 compressor: &'a dyn EncodingCompressor,
65 children: Vec<Option<CompressionTree<'a>>>,
66 metadata: Option<Arc<dyn EncoderMetadata>>,
67}
68
69impl Debug for CompressionTree<'_> {
70 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71 write!(f, "{self}")
72 }
73}
74
75pub trait EncoderMetadata {
80 fn as_any(&self) -> &dyn Any;
81}
82
83impl Display for CompressionTree<'_> {
84 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85 let mut fmt = TreeFormatter::new(f, "".to_string());
86 visit_child("root", Some(self), &mut fmt)
87 }
88}
89
90fn visit_child(
91 name: &str,
92 child: Option<&CompressionTree>,
93 fmt: &mut TreeFormatter,
94) -> std::fmt::Result {
95 fmt.indent(|f| {
96 if let Some(child) = child {
97 writeln!(f, "{name}: {}", child.compressor.id())?;
98 for (i, c) in child.children.iter().enumerate() {
99 visit_child(&format!("{name}.{}", i), c.as_ref(), f)?;
100 }
101 } else {
102 writeln!(f, "{name}: uncompressed")?;
103 }
104 Ok(())
105 })
106}
107
108impl<'a> CompressionTree<'a> {
109 pub fn flat(compressor: &'a dyn EncodingCompressor) -> Self {
110 Self::new(compressor, vec![])
111 }
112
113 pub fn new(
114 compressor: &'a dyn EncodingCompressor,
115 children: Vec<Option<CompressionTree<'a>>>,
116 ) -> Self {
117 Self {
118 compressor,
119 children,
120 metadata: None,
121 }
122 }
123
124 pub(crate) fn new_with_metadata(
129 compressor: &'a dyn EncodingCompressor,
130 children: Vec<Option<CompressionTree<'a>>>,
131 metadata: Arc<dyn EncoderMetadata>,
132 ) -> Self {
133 Self {
134 compressor,
135 children,
136 metadata: Some(metadata),
137 }
138 }
139
140 pub fn child(&self, idx: usize) -> Option<&CompressionTree<'a>> {
141 self.children[idx].as_ref()
142 }
143
144 pub fn compress_unchecked(
146 &self,
147 array: &Array,
148 ctx: &SamplingCompressor<'a>,
149 ) -> VortexResult<CompressedArray<'a>> {
150 self.compressor.compress(
151 array,
152 Some(self.clone()),
153 ctx.for_compressor(self.compressor),
154 )
155 }
156
157 pub fn compress(
158 &self,
159 array: &Array,
160 ctx: &SamplingCompressor<'a>,
161 ) -> Option<VortexResult<CompressedArray<'a>>> {
162 self.compressor
163 .can_compress(array)
164 .map(|c| c.compress(array, Some(self.clone()), ctx.for_compressor(c)))
165 }
166
167 pub fn compressor(&self) -> &dyn EncodingCompressor {
168 self.compressor
169 }
170
171 pub fn metadata(&mut self) -> Option<Arc<dyn EncoderMetadata>> {
178 std::mem::take(&mut self.metadata)
179 }
180
181 #[allow(clippy::type_complexity)]
182 pub fn into_parts(
183 self,
184 ) -> (
185 &'a dyn EncodingCompressor,
186 Vec<Option<CompressionTree<'a>>>,
187 Option<Arc<dyn EncoderMetadata>>,
188 ) {
189 (self.compressor, self.children, self.metadata)
190 }
191}
192
193#[derive(Debug, Clone)]
194pub struct CompressedArray<'a> {
195 array: Array,
196 path: Option<CompressionTree<'a>>,
197}
198
199impl<'a> CompressedArray<'a> {
200 pub fn uncompressed(array: Array) -> Self {
201 Self { array, path: None }
202 }
203
204 pub fn compressed(
205 compressed: Array,
206 path: Option<CompressionTree<'a>>,
207 uncompressed: impl AsRef<Array>,
208 ) -> Self {
209 let uncompressed = uncompressed.as_ref();
210
211 assert_eq!(
213 compressed.len(),
214 uncompressed.len(),
215 "Compressed array {} has different length to uncompressed",
216 compressed.encoding(),
217 );
218 assert_eq!(
219 compressed.dtype(),
220 uncompressed.dtype(),
221 "Compressed array {} has different dtype to uncompressed",
222 compressed.encoding(),
223 );
224
225 let _ = uncompressed
228 .statistics()
229 .compute_uncompressed_size_in_bytes();
230 compressed.inherit_statistics(uncompressed.statistics());
231
232 let compressed = Self {
233 array: compressed,
234 path,
235 };
236 compressed.validate();
237 compressed
238 }
239
240 fn validate(&self) {
241 self.validate_children(self.path.as_ref(), &self.array)
242 }
243
244 fn validate_children(&self, path: Option<&CompressionTree>, array: &Array) {
245 if let Some(path) = path.as_ref() {
246 path.children
247 .iter()
248 .zip_longest(array.children().iter())
249 .for_each(|pair| match pair {
250 EitherOrBoth::Both(Some(child_tree), child_array) => {
251 self.validate_children(Some(child_tree), child_array);
252 }
253 EitherOrBoth::Left(Some(child_tree)) => {
254 vortex_panic!(
255 "Child tree without child array!!\nroot tree: {}\nroot array: {}\nlocal tree: {path}\nlocal array: {}\nproblematic child_tree: {child_tree}",
256 self.path().as_ref().vortex_expect("must be present"),
257 self.array.tree_display(),
258 array.tree_display()
259 );
260 }
261 _ => {},
263 });
264 }
265 }
266
267 #[inline]
268 pub fn array(&self) -> &Array {
269 &self.array
270 }
271
272 #[inline]
273 pub fn into_array(self) -> Array {
274 self.array
275 }
276
277 #[inline]
278 pub fn path(&self) -> &Option<CompressionTree> {
279 &self.path
280 }
281
282 #[inline]
283 pub fn into_path(self) -> Option<CompressionTree<'a>> {
284 self.path
285 }
286
287 #[inline]
288 pub fn into_parts(self) -> (Array, Option<CompressionTree<'a>>) {
289 (self.array, self.path)
290 }
291
292 #[inline]
294 pub fn nbytes(&self) -> usize {
295 self.array.nbytes()
296 }
297}
298
299impl AsRef<Array> for CompressedArray<'_> {
300 fn as_ref(&self) -> &Array {
301 &self.array
302 }
303}