1#![deny(missing_docs)]
5
6use std::fmt::Debug;
34use std::hash::Hash;
35
36use vortex_array::Array;
37use vortex_array::ArrayRef;
38use vortex_array::Canonical;
39use vortex_array::IntoArray;
40use vortex_array::ToCanonical;
41use vortex_array::arrays::ConstantArray;
42use vortex_array::arrays::ExtensionArray;
43use vortex_array::arrays::FixedSizeListArray;
44use vortex_array::arrays::ListArray;
45use vortex_array::arrays::StructArray;
46use vortex_array::arrays::TemporalArray;
47use vortex_array::arrays::list_from_list_view;
48use vortex_array::compute::Cost;
49use vortex_array::vtable::VTable;
50use vortex_array::vtable::ValidityHelper;
51use vortex_dtype::DType;
52use vortex_dtype::Nullability;
53use vortex_dtype::datetime::TemporalMetadata;
54use vortex_error::VortexExpect;
55use vortex_error::VortexResult;
56
57use crate::decimal::compress_decimal;
58pub use crate::float::FloatCompressor;
59pub use crate::float::FloatStats;
60pub use crate::float::dictionary::dictionary_encode as float_dictionary_encode;
61pub use crate::integer::IntCompressor;
62pub use crate::integer::IntegerStats;
63pub use crate::integer::dictionary::dictionary_encode as integer_dictionary_encode;
64pub use crate::string::StringCompressor;
65pub use crate::string::StringStats;
66pub use crate::temporal::compress_temporal;
67
68mod decimal;
69mod float;
70mod integer;
71mod patches;
72mod rle;
73mod sample;
74mod string;
75mod temporal;
76
/// Options controlling which statistics [`CompressorStats::generate_opts`] computes.
pub struct GenerateStatsOptions {
    /// Whether to count the distinct values in the input.
    ///
    /// Counting distinct values is only needed when dictionary encoding is a
    /// candidate scheme; it is skipped when that scheme is excluded (see
    /// `Compressor::compress`).
    pub count_distinct_values: bool,
}
84
85impl Default for GenerateStatsOptions {
86 fn default() -> Self {
87 Self {
88 count_distinct_values: true,
89 }
91 }
92}
93
94const SAMPLE_SIZE: u32 = 64;
95
/// Pre-computed statistics over an array, consumed by compression [`Scheme`]s to
/// choose and apply an encoding.
pub trait CompressorStats: Debug + Clone {
    /// The array encoding these statistics are computed from.
    type ArrayVTable: VTable;

    /// Generate statistics for `input` using the default [`GenerateStatsOptions`].
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    /// Generate statistics for `input`, controlled by `opts`.
    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    /// The array these statistics were generated from.
    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    /// Compute statistics over a sample of the source array, using the default
    /// [`GenerateStatsOptions`].
    ///
    /// `sample_count` chunks of `sample_size` values each are drawn (sampling
    /// strategy is implementation-defined; see the `sample` module).
    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    /// Compute statistics over a sample of the source array, controlled by `opts`.
    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}
123
/// A single compression scheme (encoding) that can be applied to one kind of array.
pub trait Scheme: Debug {
    /// The statistics type this scheme consumes.
    type StatsType: CompressorStats;
    /// Identifier used to distinguish schemes, e.g. in exclusion lists.
    type CodeType: Copy + Eq + Hash;

    /// This scheme's identifying code.
    fn code(&self) -> Self::CodeType;

    /// Whether this is a constant-encoding scheme.
    ///
    /// Constant schemes are skipped when choosing a scheme for a sample
    /// (see `Compressor::choose_scheme`).
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimate the compression ratio (input bytes / output bytes) this scheme would
    /// achieve on the array described by `stats`.
    ///
    /// * `is_sample` — whether `stats` already describe a sample.
    /// * `allowed_cascading` — remaining depth of nested encodings.
    /// * `excludes` — scheme codes that must not be used when cascading.
    ///
    /// The default implementation compresses a sample and measures the ratio.
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compress the array described by `stats` using this scheme.
    ///
    /// Parameters have the same meaning as in [`Scheme::expected_compression_ratio`].
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}
173
174fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
175 compressor: &T,
176 stats: &T::StatsType,
177 is_sample: bool,
178 allowed_cascading: usize,
179 excludes: &[T::CodeType],
180) -> VortexResult<f64> {
181 let sample = if is_sample {
182 stats.clone()
183 } else {
184 let source_len = stats.source().len();
186
187 let sample_count = usize::max(
189 (source_len / 100)
190 / usize::try_from(SAMPLE_SIZE).vortex_expect("SAMPLE_SIZE must fit in usize"),
191 10,
192 );
193
194 tracing::trace!(
195 "Sampling {} values out of {}",
196 SAMPLE_SIZE as usize * sample_count,
197 source_len
198 );
199
200 stats.sample(
201 SAMPLE_SIZE,
202 sample_count
203 .try_into()
204 .vortex_expect("sample count must fit in u32"),
205 )
206 };
207
208 let after = compressor
209 .compress(&sample, true, allowed_cascading, excludes)?
210 .nbytes();
211 let before = sample.source().nbytes();
212
213 tracing::debug!(
214 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
215 before as f64 / after as f64
216 );
217
218 Ok(before as f64 / after as f64)
219}
220
221const MAX_CASCADE: usize = 3;
222
223pub trait Compressor {
233 type ArrayVTable: VTable;
235 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
237 type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
239
240 fn schemes() -> &'static [&'static Self::SchemeType];
242 fn default_scheme() -> &'static Self::SchemeType;
244 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
246
247 fn compress(
252 array: &<Self::ArrayVTable as VTable>::Array,
253 is_sample: bool,
254 allowed_cascading: usize,
255 excludes: &[<Self::SchemeType as Scheme>::CodeType],
256 ) -> VortexResult<ArrayRef>
257 where
258 Self::SchemeType: 'static,
259 {
260 if array.is_empty() {
262 return Ok(array.to_array());
263 }
264
265 let stats = if excludes.contains(&Self::dict_scheme_code()) {
267 Self::StatsType::generate_opts(
268 array,
269 GenerateStatsOptions {
270 count_distinct_values: false,
271 },
272 )
273 } else {
274 Self::StatsType::generate(array)
275 };
276 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
277
278 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
279 if output.nbytes() < array.nbytes() {
280 Ok(output)
281 } else {
282 tracing::debug!("resulting tree too large: {}", output.display_tree());
283 Ok(array.to_array())
284 }
285 }
286
287 #[allow(clippy::cognitive_complexity)]
293 fn choose_scheme(
294 stats: &Self::StatsType,
295 is_sample: bool,
296 allowed_cascading: usize,
297 excludes: &[<Self::SchemeType as Scheme>::CodeType],
298 ) -> VortexResult<&'static Self::SchemeType> {
299 let mut best_ratio = 1.0;
300 let mut best_scheme: Option<&'static Self::SchemeType> = None;
301
302 let depth = MAX_CASCADE - allowed_cascading;
304
305 for scheme in Self::schemes().iter() {
306 if excludes.contains(&scheme.code()) {
307 continue;
308 }
309
310 if is_sample && scheme.is_constant() {
312 continue;
313 }
314
315 tracing::trace!(is_sample, depth, ?scheme, "Trying compression scheme");
316
317 let ratio =
318 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
319 tracing::trace!(
320 is_sample,
321 depth,
322 ratio,
323 ?scheme,
324 "Expected compression result"
325 );
326
327 if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
328 if ratio > best_ratio {
329 best_ratio = ratio;
330 best_scheme = Some(*scheme);
331 }
332 } else {
333 tracing::trace!(
334 "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
335 );
336 }
337 }
338
339 tracing::trace!(depth, scheme = ?best_scheme, ratio = best_ratio, "best scheme found");
340
341 if let Some(best) = best_scheme {
342 Ok(best)
343 } else {
344 Ok(Self::default_scheme())
345 }
346 }
347}
348
/// Top-level compressor: canonicalizes an input array and dispatches each
/// canonical variant to a specialized compressor.
#[derive(Default, Debug, Clone)]
pub struct BtrBlocksCompressor {
    /// When set, integer arrays are compressed via `IntCompressor::compress_no_dict`,
    /// i.e. without attempting dictionary encoding.
    pub exclude_int_dict_encoding: bool,
}
379
impl BtrBlocksCompressor {
    /// Compress any array: canonicalize, compact, then dispatch on the canonical
    /// variant via [`BtrBlocksCompressor::compress_canonical`].
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        let canonical = array.to_canonical();

        // NOTE(review): compact() presumably normalizes the canonical form before
        // encoding — confirm its exact semantics against vortex-array.
        let compact = canonical.compact()?;

        self.compress_canonical(compact)
    }

    /// Compress an already-canonicalized array by dispatching each variant to its
    /// specialized compressor. Null and Bool arrays are returned unchanged.
    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            Canonical::Primitive(primitive) => {
                // Integers and floats have separate compressor pipelines.
                if primitive.ptype().is_int() {
                    if self.exclude_int_dict_encoding {
                        IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
                    } else {
                        IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                    }
                } else {
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            Canonical::Struct(struct_array) => {
                // Compress each field independently and rebuild the struct shell
                // with the original names, length, and validity.
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .collect::<Result<Vec<_>, _>>()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_view_array) => {
                // Convert the view representation into a plain offsets+elements list.
                let list_array = list_from_list_view(list_view_array);

                // NOTE(review): reset_offsets(true) presumably rebases offsets so
                // they start at zero — confirm the meaning of the `true` flag.
                let list_array = list_array.reset_offsets(true)?;

                let compressed_elems = self.compress(list_array.elements())?;

                // Offsets are monotonically increasing integers; compress them
                // without dictionary encoding. narrow() presumably shrinks the
                // integer width first — TODO confirm.
                let compressed_offsets = IntCompressor::compress_no_dict(
                    &list_array.offsets().to_primitive().narrow()?,
                    false,
                    MAX_CASCADE,
                    &[],
                )?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::FixedSizeList(fsl_array) => {
                // Only the elements are compressed; list size, validity, and length
                // are carried over unchanged.
                let compressed_elems = self.compress(fsl_array.elements())?;

                Ok(FixedSizeListArray::try_new(
                    compressed_elems,
                    fsl_array.list_size(),
                    fsl_array.validity().clone(),
                    fsl_array.len(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                // Only UTF-8 data has a specialized compressor; other binary
                // arrays are returned unchanged.
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // Timestamp extension arrays get a dedicated temporal compressor,
                // with a short-circuit to ConstantArray for constant inputs.
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
                {
                    if temporal_array.as_ref().is_constant_opts(Cost::Canonicalize) {
                        return Ok(ConstantArray::new(
                            temporal_array.as_ref().scalar_at(0),
                            ext_array.len(),
                        )
                        .into_array());
                    }
                    return compress_temporal(temporal_array);
                }

                // Any other extension type: compress the underlying storage array
                // and rewrap it with the original extension dtype.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
}