1use std::fmt::Debug;
5use std::hash::Hash;
6
7use itertools::Itertools;
8use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
9use vortex_array::vtable::{VTable, ValidityHelper};
10use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
11use vortex_dtype::datetime::TemporalMetadata;
12use vortex_dtype::{DType, Nullability};
13use vortex_error::{VortexResult, VortexUnwrap};
14
15use crate::decimal::compress_decimal;
16pub use crate::float::FloatCompressor;
17pub use crate::integer::IntCompressor;
18pub use crate::string::StringCompressor;
19pub use crate::temporal::compress_temporal;
20
21mod decimal;
22mod float;
23pub mod integer;
24mod patches;
25mod sample;
26mod string;
27mod temporal;
28
/// Options controlling which statistics are computed by
/// [`CompressorStats::generate_opts`] and [`CompressorStats::sample_opts`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted.
    ///
    /// `Compressor::compress` turns this off when the dictionary scheme is excluded.
    pub count_distinct_values: bool,
}

impl Default for GenerateStatsOptions {
    /// By default, distinct values are counted.
    fn default() -> Self {
        Self {
            count_distinct_values: true,
        }
    }
}
43
/// Number of values in each sample drawn by `estimate_compression_ratio_with_sampling`
/// when it has to sample the source array itself.
const SAMPLE_SIZE: u32 = 64;
45
46pub trait CompressorStats: Debug + Clone {
48 type ArrayVTable: VTable;
49
50 fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
52 Self::generate_opts(input, GenerateStatsOptions::default())
53 }
54
55 fn generate_opts(
57 input: &<Self::ArrayVTable as VTable>::Array,
58 opts: GenerateStatsOptions,
59 ) -> Self;
60
61 fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;
62
63 fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
64 self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
65 }
66
67 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
68}
69
70pub trait Scheme: Debug {
74 type StatsType: CompressorStats;
75 type CodeType: Copy + Eq + Hash;
76
77 fn code(&self) -> Self::CodeType;
79
80 fn is_constant(&self) -> bool {
82 false
83 }
84
85 fn expected_compression_ratio(
93 &self,
94 stats: &Self::StatsType,
95 is_sample: bool,
96 allowed_cascading: usize,
97 excludes: &[Self::CodeType],
98 ) -> VortexResult<f64> {
99 estimate_compression_ratio_with_sampling(
100 self,
101 stats,
102 is_sample,
103 allowed_cascading,
104 excludes,
105 )
106 }
107
108 fn compress(
110 &self,
111 stats: &Self::StatsType,
112 is_sample: bool,
113 allowed_cascading: usize,
114 excludes: &[Self::CodeType],
115 ) -> VortexResult<ArrayRef>;
116}
117
/// A tree of scheme codes: the scheme at this node plus one subtree per child.
///
/// NOTE(review): presumably describes a cascade of nested schemes; confirm against callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemeTree {
    /// Code of the scheme at this node.
    pub scheme: u8,
    /// Subtrees for this node's children.
    pub children: Vec<SchemeTree>,
}
127
128pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
129 compressor: &T,
130 stats: &T::StatsType,
131 is_sample: bool,
132 allowed_cascading: usize,
133 excludes: &[T::CodeType],
134) -> VortexResult<f64> {
135 let sample = if is_sample {
136 stats.clone()
137 } else {
138 let source_len = stats.source().len();
140
141 let sample_count = usize::max(
143 (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
144 10,
145 );
146
147 log::trace!(
148 "Sampling {} values out of {}",
149 SAMPLE_SIZE as usize * sample_count,
150 source_len
151 );
152
153 stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
154 };
155
156 let after = compressor
157 .compress(&sample, true, allowed_cascading, excludes)?
158 .nbytes();
159 let before = sample.source().nbytes();
160
161 log::debug!(
162 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
163 before as f64 / after as f64
164 );
165
166 Ok(before as f64 / after as f64)
167}
168
/// Maximum number of nested (cascading) compression levels.
///
/// Top-level calls from `BtrBlocksCompressor` start with this full budget.
const MAX_CASCADE: usize = 3;
170
171pub trait Compressor {
177 type ArrayVTable: VTable;
178 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
179
180 type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
182
183 fn schemes() -> &'static [&'static Self::SchemeType];
184 fn default_scheme() -> &'static Self::SchemeType;
185 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
186
187 fn compress(
188 array: &<Self::ArrayVTable as VTable>::Array,
189 is_sample: bool,
190 allowed_cascading: usize,
191 excludes: &[<Self::SchemeType as Scheme>::CodeType],
192 ) -> VortexResult<ArrayRef>
193 where
194 Self::SchemeType: 'static,
195 {
196 if array.is_empty() {
198 return Ok(array.to_array());
199 }
200
201 let stats = if excludes.contains(&Self::dict_scheme_code()) {
203 Self::StatsType::generate_opts(
204 array,
205 GenerateStatsOptions {
206 count_distinct_values: false,
207 },
208 )
209 } else {
210 Self::StatsType::generate(array)
211 };
212 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
213
214 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
215 if output.nbytes() < array.nbytes() {
216 Ok(output)
217 } else {
218 log::debug!("resulting tree too large: {}", output.display_tree());
219 Ok(array.to_array())
220 }
221 }
222
223 fn choose_scheme(
224 stats: &Self::StatsType,
225 is_sample: bool,
226 allowed_cascading: usize,
227 excludes: &[<Self::SchemeType as Scheme>::CodeType],
228 ) -> VortexResult<&'static Self::SchemeType> {
229 let mut best_ratio = 1.0;
230 let mut best_scheme: Option<&'static Self::SchemeType> = None;
231
232 let depth = MAX_CASCADE - allowed_cascading;
234
235 for scheme in Self::schemes().iter() {
236 if excludes.contains(&scheme.code()) {
237 continue;
238 }
239
240 if is_sample && scheme.is_constant() {
242 continue;
243 }
244
245 log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
246
247 let ratio =
248 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
249 log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
250
251 if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
252 if ratio > best_ratio {
253 best_ratio = ratio;
254 best_scheme = Some(*scheme);
255 }
256 } else {
257 log::trace!(
258 "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
259 );
260 }
261 }
262
263 log::debug!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");
264
265 if let Some(best) = best_scheme {
266 Ok(best)
267 } else {
268 Ok(Self::default_scheme())
269 }
270 }
271}
272
/// Compresses arrays by canonicalizing them and dispatching to type-specific compressors
/// (see `BtrBlocksCompressor::compress_canonical`).
#[derive(Debug, Clone, Default)]
pub struct BtrBlocksCompressor {
    /// When `true`, integer primitive arrays are compressed without dictionary encoding
    /// (via `IntCompressor::compress_no_dict`). Defaults to `false`.
    pub exclude_int_dict_encoding: bool,
}
277
278impl BtrBlocksCompressor {
279 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
280 let canonical = array.to_canonical()?;
282
283 let compact = canonical.compact()?;
285
286 self.compress_canonical(compact)
287 }
288
289 pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
290 match array {
291 Canonical::Null(null_array) => Ok(null_array.into_array()),
292 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
294 Canonical::Primitive(primitive) => {
295 if primitive.ptype().is_int() {
296 if self.exclude_int_dict_encoding {
297 IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
298 } else {
299 IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
300 }
301 } else {
302 FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
303 }
304 }
305 Canonical::Decimal(decimal) => compress_decimal(&decimal),
306 Canonical::Struct(struct_array) => {
307 let fields = struct_array
308 .fields()
309 .iter()
310 .map(|field| self.compress(field))
311 .try_collect()?;
312
313 Ok(StructArray::try_new(
314 struct_array.names().clone(),
315 fields,
316 struct_array.len(),
317 struct_array.validity().clone(),
318 )?
319 .into_array())
320 }
321 Canonical::List(list_array) => {
322 let compressed_elems = self.compress(list_array.elements())?;
324 let compressed_offsets = IntCompressor::compress_no_dict(
325 &list_array.offsets().to_primitive()?.downcast()?,
326 false,
327 MAX_CASCADE,
328 &[],
329 )?;
330
331 Ok(ListArray::try_new(
332 compressed_elems,
333 compressed_offsets,
334 list_array.validity().clone(),
335 )?
336 .into_array())
337 }
338 Canonical::VarBinView(strings) => {
339 if strings
340 .dtype()
341 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
342 {
343 StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
344 } else {
345 Ok(strings.into_array())
347 }
348 }
349 Canonical::Extension(ext_array) => {
350 if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
352 && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
353 {
354 return compress_temporal(temporal_array);
355 }
356
357 let compressed_storage = self.compress(ext_array.storage())?;
359
360 Ok(
361 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
362 .into_array(),
363 )
364 }
365 }
366 }
367}