1#![deny(missing_docs)]
5
6use std::fmt::Debug;
34use std::hash::Hash;
35
36use itertools::Itertools;
37use vortex_array::arrays::{
38 ExtensionArray, FixedSizeListArray, ListArray, StructArray, TemporalArray,
39};
40use vortex_array::vtable::{VTable, ValidityHelper};
41use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
42use vortex_dtype::datetime::TemporalMetadata;
43use vortex_dtype::{DType, Nullability};
44use vortex_error::{VortexResult, VortexUnwrap};
45
46use crate::decimal::compress_decimal;
47pub use crate::float::dictionary::dictionary_encode as float_dictionary_encode;
48pub use crate::float::{FloatCompressor, FloatStats};
49pub use crate::integer::dictionary::dictionary_encode as integer_dictionary_encode;
50pub use crate::integer::{IntCompressor, IntegerStats};
51pub use crate::string::{StringCompressor, StringStats};
52pub use crate::temporal::compress_temporal;
53
54mod decimal;
55mod float;
56mod integer;
57mod patches;
58mod sample;
59mod string;
60mod temporal;
61
/// Options controlling which statistics are gathered when compressor stats are generated.
///
/// The [`Default`] value enables distinct-value counting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted while generating stats.
    ///
    /// [`Compressor::compress`] turns this off when the dictionary scheme is excluded.
    pub count_distinct_values: bool,
}
69
70impl Default for GenerateStatsOptions {
71 fn default() -> Self {
72 Self {
73 count_distinct_values: true,
74 }
76 }
77}
78
/// Number of values in each sample drawn when a compression ratio is estimated on sampled data.
///
/// Passed as the `sample_size` argument of `CompressorStats::sample`.
const SAMPLE_SIZE: u32 = 64;
80
81pub trait CompressorStats: Debug + Clone {
83 type ArrayVTable: VTable;
85
86 fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
88 Self::generate_opts(input, GenerateStatsOptions::default())
89 }
90
91 fn generate_opts(
93 input: &<Self::ArrayVTable as VTable>::Array,
94 opts: GenerateStatsOptions,
95 ) -> Self;
96
97 fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;
99
100 fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
102 self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
103 }
104
105 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
107}
108
109pub trait Scheme: Debug {
113 type StatsType: CompressorStats;
115 type CodeType: Copy + Eq + Hash;
117
118 fn code(&self) -> Self::CodeType;
120
121 fn is_constant(&self) -> bool {
123 false
124 }
125
126 fn expected_compression_ratio(
134 &self,
135 stats: &Self::StatsType,
136 is_sample: bool,
137 allowed_cascading: usize,
138 excludes: &[Self::CodeType],
139 ) -> VortexResult<f64> {
140 estimate_compression_ratio_with_sampling(
141 self,
142 stats,
143 is_sample,
144 allowed_cascading,
145 excludes,
146 )
147 }
148
149 fn compress(
151 &self,
152 stats: &Self::StatsType,
153 is_sample: bool,
154 allowed_cascading: usize,
155 excludes: &[Self::CodeType],
156 ) -> VortexResult<ArrayRef>;
157}
158
159fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
160 compressor: &T,
161 stats: &T::StatsType,
162 is_sample: bool,
163 allowed_cascading: usize,
164 excludes: &[T::CodeType],
165) -> VortexResult<f64> {
166 let sample = if is_sample {
167 stats.clone()
168 } else {
169 let source_len = stats.source().len();
171
172 let sample_count = usize::max(
174 (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
175 10,
176 );
177
178 log::trace!(
179 "Sampling {} values out of {}",
180 SAMPLE_SIZE as usize * sample_count,
181 source_len
182 );
183
184 stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
185 };
186
187 let after = compressor
188 .compress(&sample, true, allowed_cascading, excludes)?
189 .nbytes();
190 let before = sample.source().nbytes();
191
192 log::debug!(
193 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
194 before as f64 / after as f64
195 );
196
197 Ok(before as f64 / after as f64)
198}
199
/// Cascading budget that `BtrBlocksCompressor` hands to the type-specific compressors.
///
/// `Compressor::choose_scheme` also subtracts the remaining budget from this value to obtain
/// the `depth` it reports in log messages.
const MAX_CASCADE: usize = 3;
201
202pub trait Compressor {
212 type ArrayVTable: VTable;
214 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
216 type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
218
219 fn schemes() -> &'static [&'static Self::SchemeType];
221 fn default_scheme() -> &'static Self::SchemeType;
223 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
225
226 fn compress(
231 array: &<Self::ArrayVTable as VTable>::Array,
232 is_sample: bool,
233 allowed_cascading: usize,
234 excludes: &[<Self::SchemeType as Scheme>::CodeType],
235 ) -> VortexResult<ArrayRef>
236 where
237 Self::SchemeType: 'static,
238 {
239 if array.is_empty() {
241 return Ok(array.to_array());
242 }
243
244 let stats = if excludes.contains(&Self::dict_scheme_code()) {
246 Self::StatsType::generate_opts(
247 array,
248 GenerateStatsOptions {
249 count_distinct_values: false,
250 },
251 )
252 } else {
253 Self::StatsType::generate(array)
254 };
255 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
256
257 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
258 if output.nbytes() < array.nbytes() {
259 Ok(output)
260 } else {
261 log::debug!("resulting tree too large: {}", output.display_tree());
262 Ok(array.to_array())
263 }
264 }
265
266 fn choose_scheme(
272 stats: &Self::StatsType,
273 is_sample: bool,
274 allowed_cascading: usize,
275 excludes: &[<Self::SchemeType as Scheme>::CodeType],
276 ) -> VortexResult<&'static Self::SchemeType> {
277 let mut best_ratio = 1.0;
278 let mut best_scheme: Option<&'static Self::SchemeType> = None;
279
280 let depth = MAX_CASCADE - allowed_cascading;
282
283 for scheme in Self::schemes().iter() {
284 if excludes.contains(&scheme.code()) {
285 continue;
286 }
287
288 if is_sample && scheme.is_constant() {
290 continue;
291 }
292
293 log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
294
295 let ratio =
296 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
297 log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
298
299 if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
300 if ratio > best_ratio {
301 best_ratio = ratio;
302 best_scheme = Some(*scheme);
303 }
304 } else {
305 log::trace!(
306 "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
307 );
308 }
309 }
310
311 log::debug!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");
312
313 if let Some(best) = best_scheme {
314 Ok(best)
315 } else {
316 Ok(Self::default_scheme())
317 }
318 }
319}
320
/// Entry-point compressor that canonicalizes an array and dispatches it to the compressor
/// matching its canonical form.
///
/// The [`Default`] value leaves integer dictionary encoding enabled.
#[derive(Clone, Debug, Default)]
pub struct BtrBlocksCompressor {
    /// When `true`, integer arrays are compressed with `IntCompressor::compress_no_dict`
    /// instead of `IntCompressor::compress`.
    pub exclude_int_dict_encoding: bool,
}
351
352impl BtrBlocksCompressor {
353 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
357 let canonical = array.to_canonical();
359
360 let compact = canonical.compact()?;
362
363 self.compress_canonical(compact)
364 }
365
366 pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
370 match array {
371 Canonical::Null(null_array) => Ok(null_array.into_array()),
372 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
374 Canonical::Primitive(primitive) => {
375 if primitive.ptype().is_int() {
376 if self.exclude_int_dict_encoding {
377 IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
378 } else {
379 IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
380 }
381 } else {
382 FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
383 }
384 }
385 Canonical::Decimal(decimal) => compress_decimal(&decimal),
386 Canonical::Struct(struct_array) => {
387 let fields = struct_array
388 .fields()
389 .iter()
390 .map(|field| self.compress(field))
391 .try_collect()?;
392
393 Ok(StructArray::try_new(
394 struct_array.names().clone(),
395 fields,
396 struct_array.len(),
397 struct_array.validity().clone(),
398 )?
399 .into_array())
400 }
401 Canonical::List(list_array) => {
402 let compressed_elems = self.compress(list_array.elements())?;
404 let compressed_offsets = IntCompressor::compress_no_dict(
405 &list_array.offsets().to_primitive().downcast()?,
406 false,
407 MAX_CASCADE,
408 &[],
409 )?;
410
411 Ok(ListArray::try_new(
412 compressed_elems,
413 compressed_offsets,
414 list_array.validity().clone(),
415 )?
416 .into_array())
417 }
418 Canonical::FixedSizeList(fsl_array) => {
419 let compressed_elems = self.compress(fsl_array.elements())?;
420
421 Ok(FixedSizeListArray::try_new(
422 compressed_elems,
423 fsl_array.list_size(),
424 fsl_array.validity().clone(),
425 fsl_array.len(),
426 )?
427 .into_array())
428 }
429 Canonical::VarBinView(strings) => {
430 if strings
431 .dtype()
432 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
433 {
434 StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
435 } else {
436 Ok(strings.into_array())
438 }
439 }
440 Canonical::Extension(ext_array) => {
441 if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
443 && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
444 {
445 return compress_temporal(temporal_array);
446 }
447
448 let compressed_storage = self.compress(ext_array.storage())?;
450
451 Ok(
452 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
453 .into_array(),
454 )
455 }
456 }
457 }
458}