1use std::fmt::Debug;
5use std::hash::Hash;
6
7use itertools::Itertools;
8use vortex_array::arrays::{
9 ExtensionArray, FixedSizeListArray, ListArray, StructArray, TemporalArray,
10};
11use vortex_array::vtable::{VTable, ValidityHelper};
12use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
13use vortex_dtype::datetime::TemporalMetadata;
14use vortex_dtype::{DType, Nullability};
15use vortex_error::{VortexResult, VortexUnwrap};
16
17use crate::decimal::compress_decimal;
18pub use crate::float::FloatCompressor;
19pub use crate::integer::IntCompressor;
20pub use crate::string::StringCompressor;
21pub use crate::temporal::compress_temporal;
22
23mod decimal;
24mod float;
25pub mod integer;
26mod patches;
27mod sample;
28mod string;
29mod temporal;
30
/// Options controlling which statistics [`CompressorStats::generate_opts`] computes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted while generating statistics.
    ///
    /// Enabled by default; `Compressor::compress` turns it off when the dictionary scheme is
    /// among the excluded schemes.
    pub count_distinct_values: bool,
}

impl Default for GenerateStatsOptions {
    /// Returns options with `count_distinct_values` enabled.
    fn default() -> Self {
        Self {
            count_distinct_values: true,
        }
    }
}
45
46const SAMPLE_SIZE: u32 = 64;
47
48pub trait CompressorStats: Debug + Clone {
50 type ArrayVTable: VTable;
51
52 fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
54 Self::generate_opts(input, GenerateStatsOptions::default())
55 }
56
57 fn generate_opts(
59 input: &<Self::ArrayVTable as VTable>::Array,
60 opts: GenerateStatsOptions,
61 ) -> Self;
62
63 fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;
64
65 fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
66 self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
67 }
68
69 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
70}
71
72pub trait Scheme: Debug {
76 type StatsType: CompressorStats;
77 type CodeType: Copy + Eq + Hash;
78
79 fn code(&self) -> Self::CodeType;
81
82 fn is_constant(&self) -> bool {
84 false
85 }
86
87 fn expected_compression_ratio(
95 &self,
96 stats: &Self::StatsType,
97 is_sample: bool,
98 allowed_cascading: usize,
99 excludes: &[Self::CodeType],
100 ) -> VortexResult<f64> {
101 estimate_compression_ratio_with_sampling(
102 self,
103 stats,
104 is_sample,
105 allowed_cascading,
106 excludes,
107 )
108 }
109
110 fn compress(
112 &self,
113 stats: &Self::StatsType,
114 is_sample: bool,
115 allowed_cascading: usize,
116 excludes: &[Self::CodeType],
117 ) -> VortexResult<ArrayRef>;
118}
119
/// A tree of scheme choices: one scheme at this node plus the trees of its children.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemeTree {
    /// Identifier of the scheme at this node.
    /// NOTE(review): presumably a scheme code value — confirm against producers of this type.
    pub scheme: u8,
    /// Sub-trees beneath this node.
    /// NOTE(review): presumably the schemes chosen for cascaded child arrays — confirm.
    pub children: Vec<SchemeTree>,
}
129
130pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
131 compressor: &T,
132 stats: &T::StatsType,
133 is_sample: bool,
134 allowed_cascading: usize,
135 excludes: &[T::CodeType],
136) -> VortexResult<f64> {
137 let sample = if is_sample {
138 stats.clone()
139 } else {
140 let source_len = stats.source().len();
142
143 let sample_count = usize::max(
145 (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
146 10,
147 );
148
149 log::trace!(
150 "Sampling {} values out of {}",
151 SAMPLE_SIZE as usize * sample_count,
152 source_len
153 );
154
155 stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
156 };
157
158 let after = compressor
159 .compress(&sample, true, allowed_cascading, excludes)?
160 .nbytes();
161 let before = sample.source().nbytes();
162
163 log::debug!(
164 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
165 before as f64 / after as f64
166 );
167
168 Ok(before as f64 / after as f64)
169}
170
171const MAX_CASCADE: usize = 3;
172
173pub trait Compressor {
179 type ArrayVTable: VTable;
180 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
181
182 type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
184
185 fn schemes() -> &'static [&'static Self::SchemeType];
186 fn default_scheme() -> &'static Self::SchemeType;
187 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
188
189 fn compress(
190 array: &<Self::ArrayVTable as VTable>::Array,
191 is_sample: bool,
192 allowed_cascading: usize,
193 excludes: &[<Self::SchemeType as Scheme>::CodeType],
194 ) -> VortexResult<ArrayRef>
195 where
196 Self::SchemeType: 'static,
197 {
198 if array.is_empty() {
200 return Ok(array.to_array());
201 }
202
203 let stats = if excludes.contains(&Self::dict_scheme_code()) {
205 Self::StatsType::generate_opts(
206 array,
207 GenerateStatsOptions {
208 count_distinct_values: false,
209 },
210 )
211 } else {
212 Self::StatsType::generate(array)
213 };
214 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
215
216 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
217 if output.nbytes() < array.nbytes() {
218 Ok(output)
219 } else {
220 log::debug!("resulting tree too large: {}", output.display_tree());
221 Ok(array.to_array())
222 }
223 }
224
225 fn choose_scheme(
226 stats: &Self::StatsType,
227 is_sample: bool,
228 allowed_cascading: usize,
229 excludes: &[<Self::SchemeType as Scheme>::CodeType],
230 ) -> VortexResult<&'static Self::SchemeType> {
231 let mut best_ratio = 1.0;
232 let mut best_scheme: Option<&'static Self::SchemeType> = None;
233
234 let depth = MAX_CASCADE - allowed_cascading;
236
237 for scheme in Self::schemes().iter() {
238 if excludes.contains(&scheme.code()) {
239 continue;
240 }
241
242 if is_sample && scheme.is_constant() {
244 continue;
245 }
246
247 log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
248
249 let ratio =
250 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
251 log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
252
253 if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
254 if ratio > best_ratio {
255 best_ratio = ratio;
256 best_scheme = Some(*scheme);
257 }
258 } else {
259 log::trace!(
260 "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
261 );
262 }
263 }
264
265 log::debug!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");
266
267 if let Some(best) = best_scheme {
268 Ok(best)
269 } else {
270 Ok(Self::default_scheme())
271 }
272 }
273}
274
/// Top-level compressor that canonicalizes an array and dispatches it to the matching
/// type-specific compressor.
#[derive(Clone, Debug, Default)]
pub struct BtrBlocksCompressor {
    /// When `true`, integer primitive arrays go through `IntCompressor::compress_no_dict`
    /// instead of `IntCompressor::compress`.
    pub exclude_int_dict_encoding: bool,
}
279
280impl BtrBlocksCompressor {
281 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
282 let canonical = array.to_canonical();
284
285 let compact = canonical.compact()?;
287
288 self.compress_canonical(compact)
289 }
290
291 pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
292 match array {
293 Canonical::Null(null_array) => Ok(null_array.into_array()),
294 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
296 Canonical::Primitive(primitive) => {
297 if primitive.ptype().is_int() {
298 if self.exclude_int_dict_encoding {
299 IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
300 } else {
301 IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
302 }
303 } else {
304 FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
305 }
306 }
307 Canonical::Decimal(decimal) => compress_decimal(&decimal),
308 Canonical::Struct(struct_array) => {
309 let fields = struct_array
310 .fields()
311 .iter()
312 .map(|field| self.compress(field))
313 .try_collect()?;
314
315 Ok(StructArray::try_new(
316 struct_array.names().clone(),
317 fields,
318 struct_array.len(),
319 struct_array.validity().clone(),
320 )?
321 .into_array())
322 }
323 Canonical::List(list_array) => {
324 let compressed_elems = self.compress(list_array.elements())?;
326 let compressed_offsets = IntCompressor::compress_no_dict(
327 &list_array.offsets().to_primitive().downcast()?,
328 false,
329 MAX_CASCADE,
330 &[],
331 )?;
332
333 Ok(ListArray::try_new(
334 compressed_elems,
335 compressed_offsets,
336 list_array.validity().clone(),
337 )?
338 .into_array())
339 }
340 Canonical::FixedSizeList(fsl_array) => {
341 let compressed_elems = self.compress(fsl_array.elements())?;
342
343 Ok(FixedSizeListArray::try_new(
344 compressed_elems,
345 fsl_array.list_size(),
346 fsl_array.validity().clone(),
347 fsl_array.len(),
348 )?
349 .into_array())
350 }
351 Canonical::VarBinView(strings) => {
352 if strings
353 .dtype()
354 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
355 {
356 StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
357 } else {
358 Ok(strings.into_array())
360 }
361 }
362 Canonical::Extension(ext_array) => {
363 if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
365 && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
366 {
367 return compress_temporal(temporal_array);
368 }
369
370 let compressed_storage = self.compress(ext_array.storage())?;
372
373 Ok(
374 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
375 .into_array(),
376 )
377 }
378 }
379 }
380}