1use std::fmt::Debug;
2use std::hash::Hash;
3
4use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
5use vortex_array::nbytes::NBytes;
6use vortex_array::variants::{ExtensionArrayTrait, PrimitiveArrayTrait, StructArrayTrait};
7use vortex_array::{Array, ArrayRef, Canonical};
8use vortex_datetime_dtype::TemporalMetadata;
9use vortex_dtype::{DType, Nullability};
10use vortex_error::{VortexExpect, VortexResult};
11
12pub use crate::float::FloatCompressor;
13pub use crate::integer::IntCompressor;
14pub use crate::string::StringCompressor;
15pub use crate::temporal::compress_temporal;
16
17mod downscale;
18mod float;
19pub mod integer;
20mod patches;
21mod sample;
22mod string;
23mod temporal;
24
/// Options passed to [`CompressorStats::generate_opts`] and
/// [`CompressorStats::sample_opts`] to control statistics generation.
pub struct GenerateStatsOptions {
    /// Flag consumed by the stats implementations; presumably toggles counting of
    /// distinct values (confirm against the `CompressorStats` implementors).
    ///
    /// [`Compressor::compress`] sets this to `false` when the dictionary scheme code
    /// is listed in its `excludes`.
    pub count_distinct_values: bool,
}
30
/// Default options: `count_distinct_values` is enabled.
impl Default for GenerateStatsOptions {
    /// Returns options with `count_distinct_values` set to `true`.
    fn default() -> Self {
        Self {
            count_distinct_values: true,
        }
    }
}
39
40pub trait CompressorStats: Clone {
42 type ArrayType: Array;
43
44 fn generate(input: &Self::ArrayType) -> Self {
46 Self::generate_opts(input, GenerateStatsOptions::default())
47 }
48
49 fn generate_opts(input: &Self::ArrayType, opts: GenerateStatsOptions) -> Self;
50
51 fn source(&self) -> &Self::ArrayType;
52
53 fn sample(&self, sample_size: u16, sample_count: u16) -> Self {
54 self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
55 }
56
57 fn sample_opts(&self, sample_size: u16, sample_count: u16, opts: GenerateStatsOptions) -> Self;
58}
59
/// A single compression scheme that can estimate its benefit and compress an array
/// described by a [`CompressorStats`] value.
pub trait Scheme: Debug {
    /// Statistics type this scheme reads its input from.
    type StatsType: CompressorStats;
    /// Identifier type used to refer to (and exclude) schemes.
    type CodeType: Copy + Eq + Hash;

    /// Returns the code identifying this scheme.
    ///
    /// [`Compressor::choose_scheme`] skips any scheme whose code appears in `excludes`.
    fn code(&self) -> Self::CodeType;

    /// Returns `false` by default.
    ///
    /// [`Compressor::choose_scheme`] skips schemes that return `true` here whenever it
    /// is evaluating a sample.
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimates the compression ratio this scheme would achieve on `stats`.
    ///
    /// The default implementation forwards all arguments to
    /// [`estimate_compression_ratio_with_sampling`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the estimation.
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compresses the array described by `stats` with this scheme.
    ///
    /// `is_sample`, `allowed_cascading` and `excludes` are passed through from the
    /// caller (see [`Compressor::compress`]); how they are interpreted is up to the
    /// implementor.
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}
107
/// A recursive tree whose nodes each carry a `u8` scheme value and a list of child
/// trees.
///
/// NOTE(review): nothing in the visible code constructs or reads this type; the field
/// descriptions below are inferred from names and should be confirmed.
pub struct SchemeTree {
    /// Presumably the code of the scheme applied at this node.
    pub scheme: u8,
    /// Child nodes of this node.
    pub children: Vec<SchemeTree>,
}
117
118pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
119 compressor: &T,
120 stats: &T::StatsType,
121 is_sample: bool,
122 allowed_cascading: usize,
123 excludes: &[T::CodeType],
124) -> VortexResult<f64> {
125 let sample = if is_sample {
126 stats.clone()
127 } else {
128 stats.sample(64, 10)
129 };
130
131 let after = compressor
132 .compress(&sample, true, allowed_cascading, excludes)?
133 .nbytes();
134 let before = sample.source().nbytes();
135
136 log::debug!(
137 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
138 before as f64 / after as f64
139 );
140
141 Ok(before as f64 / after as f64)
142}
143
/// Cascading budget that [`BtrBlocksCompressor::compress`] hands to the typed
/// compressors as `allowed_cascading`; [`Compressor::choose_scheme`] also uses it to
/// derive the depth shown in its log output.
const MAX_CASCADE: usize = 3;
145
146pub trait Compressor {
152 type ArrayType: Array;
153 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
154
155 type StatsType: CompressorStats<ArrayType = Self::ArrayType>;
157
158 fn schemes() -> &'static [&'static Self::SchemeType];
159 fn default_scheme() -> &'static Self::SchemeType;
160 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
161
162 fn compress(
163 array: &Self::ArrayType,
164 is_sample: bool,
165 allowed_cascading: usize,
166 excludes: &[<Self::SchemeType as Scheme>::CodeType],
167 ) -> VortexResult<ArrayRef>
168 where
169 Self::SchemeType: 'static,
170 {
171 if array.is_empty() {
173 return Ok(array.to_array());
174 }
175
176 let stats = if excludes.contains(&Self::dict_scheme_code()) {
178 Self::StatsType::generate_opts(
179 array,
180 GenerateStatsOptions {
181 count_distinct_values: false,
182 },
183 )
184 } else {
185 Self::StatsType::generate(array)
186 };
187 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
188
189 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
190 if output.nbytes() < array.nbytes() {
191 Ok(output)
192 } else {
193 log::debug!("resulting tree too large: {}", output.tree_display());
194 Ok(array.to_array())
195 }
196 }
197
198 fn choose_scheme(
199 stats: &Self::StatsType,
200 is_sample: bool,
201 allowed_cascading: usize,
202 excludes: &[<Self::SchemeType as Scheme>::CodeType],
203 ) -> VortexResult<&'static Self::SchemeType> {
204 let mut best_ratio = 1.0;
205 let mut best_scheme: Option<&'static Self::SchemeType> = None;
206
207 let depth = MAX_CASCADE - allowed_cascading;
209
210 for scheme in Self::schemes().iter() {
211 if excludes.contains(&scheme.code()) {
212 continue;
213 }
214
215 if is_sample && scheme.is_constant() {
217 continue;
218 }
219
220 log::debug!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
221
222 let ratio =
223 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
224 log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:#?} ratio = {ratio}");
225
226 if ratio > best_ratio {
227 best_ratio = ratio;
228 let _ = best_scheme.insert(*scheme);
229 }
230 }
231
232 log::trace!("depth={depth} best scheme = {best_scheme:#?} ratio = {best_ratio}");
233
234 if let Some(best) = best_scheme {
235 Ok(best)
236 } else {
237 Ok(Self::default_scheme())
238 }
239 }
240}
241
/// Stateless entry point that canonicalizes an array and dispatches it to the
/// type-specific compressors (see [`BtrBlocksCompressor::compress`]).
#[derive(Debug)]
pub struct BtrBlocksCompressor;
244
245impl BtrBlocksCompressor {
246 #[allow(clippy::only_used_in_recursion)]
247 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
248 match array.to_canonical()? {
249 Canonical::Null(null_array) => Ok(null_array.into_array()),
250 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
252 Canonical::Primitive(primitive) => {
253 if primitive.ptype().is_int() {
254 IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
255 } else {
256 FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
257 }
258 }
259 Canonical::Struct(struct_array) => {
260 let mut fields = Vec::new();
261 for idx in 0..struct_array.nfields() {
262 let field = struct_array
263 .maybe_null_field_by_idx(idx)
264 .vortex_expect("field access");
265 let compressed = self.compress(&field)?;
266 fields.push(compressed);
267 }
268
269 Ok(StructArray::try_new(
270 struct_array.names().clone(),
271 fields,
272 struct_array.len(),
273 struct_array.validity().clone(),
274 )?
275 .into_array())
276 }
277 Canonical::List(list_array) => {
278 let compressed_elems = self.compress(list_array.elements())?;
280 let compressed_offsets = self.compress(list_array.offsets())?;
281
282 Ok(ListArray::try_new(
283 compressed_elems,
284 compressed_offsets,
285 list_array.validity().clone(),
286 )?
287 .into_array())
288 }
289 Canonical::VarBinView(strings) => {
290 if strings
291 .dtype()
292 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
293 {
294 StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
295 } else {
296 Ok(strings.into_array())
298 }
299 }
300 Canonical::Extension(ext_array) => {
301 if let Ok(temporal_array) =
303 TemporalArray::try_from(ext_array.to_array().into_array())
304 {
305 if let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata() {
306 return compress_temporal(temporal_array);
307 }
308 }
309
310 let compressed_storage = self.compress(ext_array.storage())?;
312
313 Ok(
314 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
315 .into_array(),
316 )
317 }
318 }
319 }
320}