1use std::fmt::Debug;
5use std::hash::Hash;
6
7use itertools::Itertools;
8use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
9use vortex_array::compress::downscale_integer_array;
10use vortex_array::vtable::{VTable, ValidityHelper};
11use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
12use vortex_dtype::datetime::TemporalMetadata;
13use vortex_dtype::{DType, Nullability};
14use vortex_error::{VortexResult, VortexUnwrap};
15
16use crate::decimal::compress_decimal;
17pub use crate::float::FloatCompressor;
18pub use crate::integer::IntCompressor;
19pub use crate::string::StringCompressor;
20pub use crate::temporal::compress_temporal;
21
22mod decimal;
23mod float;
24pub mod integer;
25mod patches;
26mod sample;
27mod string;
28mod temporal;
29
/// Options controlling how [`CompressorStats`] are generated.
///
/// This is a single flag passed by value, so it is `Copy`. Like other public types it is
/// `Debug`, so it can appear in log and trace output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted while generating statistics.
    ///
    /// `Compressor::compress` turns this off when the dictionary scheme is excluded.
    pub count_distinct_values: bool,
}
35
36impl Default for GenerateStatsOptions {
37 fn default() -> Self {
38 Self {
39 count_distinct_values: true,
40 }
42 }
43}
44
/// Number of values per sample chunk requested from [`CompressorStats::sample`] when a
/// compression ratio is estimated on a sample rather than the full array.
const SAMPLE_SIZE: u32 = 64;
46
47pub trait CompressorStats: Debug + Clone {
49 type ArrayVTable: VTable;
50
51 fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
53 Self::generate_opts(input, GenerateStatsOptions::default())
54 }
55
56 fn generate_opts(
58 input: &<Self::ArrayVTable as VTable>::Array,
59 opts: GenerateStatsOptions,
60 ) -> Self;
61
62 fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;
63
64 fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
65 self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
66 }
67
68 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
69}
70
71pub trait Scheme: Debug {
75 type StatsType: CompressorStats;
76 type CodeType: Copy + Eq + Hash;
77
78 fn code(&self) -> Self::CodeType;
80
81 fn is_constant(&self) -> bool {
83 false
84 }
85
86 fn expected_compression_ratio(
94 &self,
95 stats: &Self::StatsType,
96 is_sample: bool,
97 allowed_cascading: usize,
98 excludes: &[Self::CodeType],
99 ) -> VortexResult<f64> {
100 estimate_compression_ratio_with_sampling(
101 self,
102 stats,
103 is_sample,
104 allowed_cascading,
105 excludes,
106 )
107 }
108
109 fn compress(
111 &self,
112 stats: &Self::StatsType,
113 is_sample: bool,
114 allowed_cascading: usize,
115 excludes: &[Self::CodeType],
116 ) -> VortexResult<ArrayRef>;
117}
118
/// A node in a tree of compression schemes.
///
/// NOTE(review): presumably records which scheme was applied at each cascade level — confirm
/// against the code that builds these trees.
pub struct SchemeTree {
    /// Identifier of the scheme at this node (assumed to be a scheme code stored as `u8`).
    pub scheme: u8,
    /// Nested nodes below this one (assumed: one per cascaded child compression).
    pub children: Vec<SchemeTree>,
}
128
129pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
130 compressor: &T,
131 stats: &T::StatsType,
132 is_sample: bool,
133 allowed_cascading: usize,
134 excludes: &[T::CodeType],
135) -> VortexResult<f64> {
136 let sample = if is_sample {
137 stats.clone()
138 } else {
139 let source_len = stats.source().len();
141
142 let sample_count = usize::max(
144 (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
145 10,
146 );
147
148 log::trace!(
149 "Sampling {} values out of {}",
150 SAMPLE_SIZE as usize * sample_count,
151 source_len
152 );
153
154 stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
155 };
156
157 let after = compressor
158 .compress(&sample, true, allowed_cascading, excludes)?
159 .nbytes();
160 let before = sample.source().nbytes();
161
162 log::debug!(
163 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
164 before as f64 / after as f64
165 );
166
167 Ok(before as f64 / after as f64)
168}
169
/// Cascade budget given to top-level compressions.
///
/// `allowed_cascading` counts down from this value. `MAX_CASCADE - allowed_cascading` is the
/// current depth reported in log messages.
const MAX_CASCADE: usize = 3;
171
172pub trait Compressor {
178 type ArrayVTable: VTable;
179 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
180
181 type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
183
184 fn schemes() -> &'static [&'static Self::SchemeType];
185 fn default_scheme() -> &'static Self::SchemeType;
186 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
187
188 fn compress(
189 array: &<Self::ArrayVTable as VTable>::Array,
190 is_sample: bool,
191 allowed_cascading: usize,
192 excludes: &[<Self::SchemeType as Scheme>::CodeType],
193 ) -> VortexResult<ArrayRef>
194 where
195 Self::SchemeType: 'static,
196 {
197 if array.is_empty() {
199 return Ok(array.to_array());
200 }
201
202 let stats = if excludes.contains(&Self::dict_scheme_code()) {
204 Self::StatsType::generate_opts(
205 array,
206 GenerateStatsOptions {
207 count_distinct_values: false,
208 },
209 )
210 } else {
211 Self::StatsType::generate(array)
212 };
213 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
214
215 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
216 if output.nbytes() < array.nbytes() {
217 Ok(output)
218 } else {
219 log::debug!("resulting tree too large: {}", output.display_tree());
220 Ok(array.to_array())
221 }
222 }
223
224 fn choose_scheme(
225 stats: &Self::StatsType,
226 is_sample: bool,
227 allowed_cascading: usize,
228 excludes: &[<Self::SchemeType as Scheme>::CodeType],
229 ) -> VortexResult<&'static Self::SchemeType> {
230 let mut best_ratio = 1.0;
231 let mut best_scheme: Option<&'static Self::SchemeType> = None;
232
233 let depth = MAX_CASCADE - allowed_cascading;
235
236 for scheme in Self::schemes().iter() {
237 if excludes.contains(&scheme.code()) {
238 continue;
239 }
240
241 if is_sample && scheme.is_constant() {
243 continue;
244 }
245
246 log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
247
248 let ratio =
249 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
250 log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
251
252 if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
253 if ratio > best_ratio {
254 best_ratio = ratio;
255 best_scheme = Some(*scheme);
256 }
257 } else {
258 log::trace!(
259 "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
260 );
261 }
262 }
263
264 log::debug!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");
265
266 if let Some(best) = best_scheme {
267 Ok(best)
268 } else {
269 Ok(Self::default_scheme())
270 }
271 }
272}
273
/// Stateless top-level compressor.
///
/// It canonicalizes an array and then dispatches each canonical variant to a type-specific
/// compressor.
#[derive(Debug)]
pub struct BtrBlocksCompressor;
276
277impl BtrBlocksCompressor {
278 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
279 let canonical = array.to_canonical()?;
281
282 let compact = canonical.compact()?;
284
285 self.compress_canonical(compact)
286 }
287
288 pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
289 match array {
290 Canonical::Null(null_array) => Ok(null_array.into_array()),
291 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
293 Canonical::Primitive(primitive) => {
294 if primitive.ptype().is_int() {
295 IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
296 } else {
297 FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
298 }
299 }
300 Canonical::Decimal(decimal) => compress_decimal(&decimal),
301 Canonical::Struct(struct_array) => {
302 let fields = struct_array
303 .fields()
304 .iter()
305 .map(|field| self.compress(field))
306 .try_collect()?;
307
308 Ok(StructArray::try_new(
309 struct_array.names().clone(),
310 fields,
311 struct_array.len(),
312 struct_array.validity().clone(),
313 )?
314 .into_array())
315 }
316 Canonical::List(list_array) => {
317 let compressed_elems = self.compress(list_array.elements())?;
319 let compressed_offsets = IntCompressor::compress_no_dict(
320 &downscale_integer_array(list_array.offsets().clone())?.to_primitive()?,
321 false,
322 MAX_CASCADE,
323 &[],
324 )?;
325
326 Ok(ListArray::try_new(
327 compressed_elems,
328 compressed_offsets,
329 list_array.validity().clone(),
330 )?
331 .into_array())
332 }
333 Canonical::VarBinView(strings) => {
334 if strings
335 .dtype()
336 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
337 {
338 StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
339 } else {
340 Ok(strings.into_array())
342 }
343 }
344 Canonical::Extension(ext_array) => {
345 if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array()) {
347 if let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata() {
348 return compress_temporal(temporal_array);
349 }
350 }
351
352 let compressed_storage = self.compress(ext_array.storage())?;
354
355 Ok(
356 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
357 .into_array(),
358 )
359 }
360 }
361 }
362}