1#![feature(array_chunks)]
2
3use std::fmt::Debug;
4use std::hash::Hash;
5
6use itertools::Itertools;
7use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
8use vortex_array::compress::downscale_integer_array;
9use vortex_array::vtable::{VTable, ValidityHelper};
10use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
11use vortex_dtype::datetime::TemporalMetadata;
12use vortex_dtype::{DType, Nullability};
13use vortex_error::{VortexResult, VortexUnwrap};
14
15use crate::decimal::compress_decimal;
16pub use crate::float::FloatCompressor;
17pub use crate::integer::IntCompressor;
18pub use crate::string::StringCompressor;
19pub use crate::temporal::compress_temporal;
20
21mod decimal;
22mod float;
23pub mod integer;
24mod patches;
25mod sample;
26mod string;
27mod temporal;
28
/// Options controlling which statistics a [`CompressorStats`] implementation computes.
///
/// The type is a small plain-data value, so it is `Copy` and can be reused across several
/// `generate_opts` / `sample_opts` calls without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted.
    ///
    /// `Compressor::compress` sets this to `false` when the dictionary scheme is excluded.
    pub count_distinct_values: bool,
}

impl Default for GenerateStatsOptions {
    /// Returns options with distinct-value counting enabled.
    fn default() -> Self {
        Self {
            count_distinct_values: true,
        }
    }
}
43
/// Number of values per sample chunk.
///
/// `estimate_compression_ratio_with_sampling` passes this as the `sample_size` argument of
/// `CompressorStats::sample`.
const SAMPLE_SIZE: u32 = 64;
45
46pub trait CompressorStats: Debug + Clone {
48 type ArrayVTable: VTable;
49
50 fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
52 Self::generate_opts(input, GenerateStatsOptions::default())
53 }
54
55 fn generate_opts(
56 input: &<Self::ArrayVTable as VTable>::Array,
57 opts: GenerateStatsOptions,
58 ) -> Self;
59
60 fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;
61
62 fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
63 self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
64 }
65
66 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
67}
68
69pub trait Scheme: Debug {
73 type StatsType: CompressorStats;
74 type CodeType: Copy + Eq + Hash;
75
76 fn code(&self) -> Self::CodeType;
78
79 fn is_constant(&self) -> bool {
81 false
82 }
83
84 fn expected_compression_ratio(
92 &self,
93 stats: &Self::StatsType,
94 is_sample: bool,
95 allowed_cascading: usize,
96 excludes: &[Self::CodeType],
97 ) -> VortexResult<f64> {
98 estimate_compression_ratio_with_sampling(
99 self,
100 stats,
101 is_sample,
102 allowed_cascading,
103 excludes,
104 )
105 }
106
107 fn compress(
109 &self,
110 stats: &Self::StatsType,
111 is_sample: bool,
112 allowed_cascading: usize,
113 excludes: &[Self::CodeType],
114 ) -> VortexResult<ArrayRef>;
115}
116
/// A recursive tree of scheme identifiers.
///
/// Each node stores one scheme id together with its child subtrees. NOTE(review): the id is
/// presumably a scheme's code, with children describing cascaded compression; this is not
/// referenced elsewhere in this file, so confirm against the users of this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemeTree {
    /// Scheme identifier at this node.
    pub scheme: u8,
    /// Child subtrees of this node (empty for a leaf).
    pub children: Vec<SchemeTree>,
}
126
127pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
128 compressor: &T,
129 stats: &T::StatsType,
130 is_sample: bool,
131 allowed_cascading: usize,
132 excludes: &[T::CodeType],
133) -> VortexResult<f64> {
134 let sample = if is_sample {
135 stats.clone()
136 } else {
137 let source_len = stats.source().len();
139
140 let sample_count = usize::max(
142 (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
143 10,
144 );
145
146 log::trace!(
147 "Sampling {} values out of {}",
148 SAMPLE_SIZE as usize * sample_count,
149 source_len
150 );
151
152 stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
153 };
154
155 let after = compressor
156 .compress(&sample, true, allowed_cascading, excludes)?
157 .nbytes();
158 let before = sample.source().nbytes();
159
160 log::debug!(
161 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
162 before as f64 / after as f64
163 );
164
165 Ok(before as f64 / after as f64)
166}
167
/// Maximum number of cascaded compressions.
///
/// `BtrBlocksCompressor` starts top-level compression with this many allowed cascades, and
/// `Compressor::choose_scheme` reports the current depth relative to it.
const MAX_CASCADE: usize = 3;
169
170pub trait Compressor {
176 type ArrayVTable: VTable;
177 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
178
179 type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
181
182 fn schemes() -> &'static [&'static Self::SchemeType];
183 fn default_scheme() -> &'static Self::SchemeType;
184 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
185
186 fn compress(
187 array: &<Self::ArrayVTable as VTable>::Array,
188 is_sample: bool,
189 allowed_cascading: usize,
190 excludes: &[<Self::SchemeType as Scheme>::CodeType],
191 ) -> VortexResult<ArrayRef>
192 where
193 Self::SchemeType: 'static,
194 {
195 if array.is_empty() {
197 return Ok(array.to_array());
198 }
199
200 let stats = if excludes.contains(&Self::dict_scheme_code()) {
202 Self::StatsType::generate_opts(
203 array,
204 GenerateStatsOptions {
205 count_distinct_values: false,
206 },
207 )
208 } else {
209 Self::StatsType::generate(array)
210 };
211 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
212
213 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
214 if output.nbytes() < array.nbytes() {
215 Ok(output)
216 } else {
217 log::debug!("resulting tree too large: {}", output.tree_display());
218 Ok(array.to_array())
219 }
220 }
221
222 fn choose_scheme(
223 stats: &Self::StatsType,
224 is_sample: bool,
225 allowed_cascading: usize,
226 excludes: &[<Self::SchemeType as Scheme>::CodeType],
227 ) -> VortexResult<&'static Self::SchemeType> {
228 let mut best_ratio = 1.0;
229 let mut best_scheme: Option<&'static Self::SchemeType> = None;
230
231 let depth = MAX_CASCADE - allowed_cascading;
233
234 for scheme in Self::schemes().iter() {
235 if excludes.contains(&scheme.code()) {
236 continue;
237 }
238
239 if is_sample && scheme.is_constant() {
241 continue;
242 }
243
244 log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
245
246 let ratio =
247 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
248 log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
249
250 if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
251 if ratio > best_ratio {
252 best_ratio = ratio;
253 best_scheme = Some(*scheme);
254 }
255 } else {
256 log::trace!(
257 "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
258 );
259 }
260 }
261
262 log::debug!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");
263
264 if let Some(best) = best_scheme {
265 Ok(best)
266 } else {
267 Ok(Self::default_scheme())
268 }
269 }
270}
271
/// Entry point for compressing arbitrary arrays.
///
/// The compressor is stateless, so it is `Copy` and `Default`-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct BtrBlocksCompressor;
274
275impl BtrBlocksCompressor {
276 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
277 self.compress_canonical(array.to_canonical()?)
278 }
279
280 pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
281 match array {
282 Canonical::Null(null_array) => Ok(null_array.into_array()),
283 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
285 Canonical::Primitive(primitive) => {
286 if primitive.ptype().is_int() {
287 IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
288 } else {
289 FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
290 }
291 }
292 Canonical::Decimal(decimal) => compress_decimal(&decimal),
293 Canonical::Struct(struct_array) => {
294 let fields = struct_array
295 .fields()
296 .iter()
297 .map(|field| self.compress(field))
298 .try_collect()?;
299
300 Ok(StructArray::try_new(
301 struct_array.names().clone(),
302 fields,
303 struct_array.len(),
304 struct_array.validity().clone(),
305 )?
306 .into_array())
307 }
308 Canonical::List(list_array) => {
309 let compressed_elems = self.compress(list_array.elements())?;
311 let compressed_offsets = IntCompressor::compress_no_dict(
312 &downscale_integer_array(list_array.offsets().clone())?.to_primitive()?,
313 false,
314 MAX_CASCADE,
315 &[],
316 )?;
317
318 Ok(ListArray::try_new(
319 compressed_elems,
320 compressed_offsets,
321 list_array.validity().clone(),
322 )?
323 .into_array())
324 }
325 Canonical::VarBinView(strings) => {
326 if strings
327 .dtype()
328 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
329 {
330 StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
331 } else {
332 Ok(strings.into_array())
334 }
335 }
336 Canonical::Extension(ext_array) => {
337 if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array()) {
339 if let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata() {
340 return compress_temporal(temporal_array);
341 }
342 }
343
344 let compressed_storage = self.compress(ext_array.storage())?;
346
347 Ok(
348 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
349 .into_array(),
350 )
351 }
352 }
353 }
354}