1#![feature(array_chunks)]
2
3use std::fmt::Debug;
4use std::hash::Hash;
5
6use itertools::Itertools;
7use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
8use vortex_array::vtable::{VTable, ValidityHelper};
9use vortex_array::{Array, ArrayRef, Canonical, IntoArray};
10use vortex_dtype::datetime::TemporalMetadata;
11use vortex_dtype::{DType, Nullability};
12use vortex_error::{VortexResult, VortexUnwrap};
13
14use crate::decimal::compress_decimal;
15pub use crate::float::FloatCompressor;
16pub use crate::integer::IntCompressor;
17pub use crate::string::StringCompressor;
18pub use crate::temporal::compress_temporal;
19
20mod decimal;
21mod float;
22pub mod integer;
23mod patches;
24mod sample;
25mod string;
26mod temporal;
27
/// Options controlling which statistics are gathered when building a
/// [`CompressorStats`] value.
///
/// The [`Default`] implementation enables distinct-value counting.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted while generating statistics.
    ///
    /// `Compressor::compress` switches this off when the dictionary scheme
    /// code appears in its `excludes` list.
    pub count_distinct_values: bool,
}
33
34impl Default for GenerateStatsOptions {
35 fn default() -> Self {
36 Self {
37 count_distinct_values: true,
38 }
40 }
41}
42
/// Number of values in each individual sample.
///
/// Passed as the `sample_size` argument of `CompressorStats::sample` by
/// `estimate_compression_ratio_with_sampling`, which multiplies it by the
/// sample count to report how many values are sampled in total.
const SAMPLE_SIZE: u32 = 64;
44
45pub trait CompressorStats: Debug + Clone {
47 type ArrayVTable: VTable;
48
49 fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
51 Self::generate_opts(input, GenerateStatsOptions::default())
52 }
53
54 fn generate_opts(
55 input: &<Self::ArrayVTable as VTable>::Array,
56 opts: GenerateStatsOptions,
57 ) -> Self;
58
59 fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;
60
61 fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
62 self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
63 }
64
65 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
66}
67
68pub trait Scheme: Debug {
72 type StatsType: CompressorStats;
73 type CodeType: Copy + Eq + Hash;
74
75 fn code(&self) -> Self::CodeType;
77
78 fn is_constant(&self) -> bool {
80 false
81 }
82
83 fn expected_compression_ratio(
91 &self,
92 stats: &Self::StatsType,
93 is_sample: bool,
94 allowed_cascading: usize,
95 excludes: &[Self::CodeType],
96 ) -> VortexResult<f64> {
97 estimate_compression_ratio_with_sampling(
98 self,
99 stats,
100 is_sample,
101 allowed_cascading,
102 excludes,
103 )
104 }
105
106 fn compress(
108 &self,
109 stats: &Self::StatsType,
110 is_sample: bool,
111 allowed_cascading: usize,
112 excludes: &[Self::CodeType],
113 ) -> VortexResult<ArrayRef>;
114}
115
/// A recursive tree of scheme codes.
///
/// NOTE(review): presumably describes which scheme to apply to an array and
/// to each of its children; nothing in the visible part of this file reads
/// it, so confirm against its users.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemeTree {
    /// Raw `u8` scheme code for this node.
    pub scheme: u8,
    /// Sub-trees nested under this node.
    pub children: Vec<SchemeTree>,
}
125
126pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
127 compressor: &T,
128 stats: &T::StatsType,
129 is_sample: bool,
130 allowed_cascading: usize,
131 excludes: &[T::CodeType],
132) -> VortexResult<f64> {
133 let sample = if is_sample {
134 stats.clone()
135 } else {
136 let source_len = stats.source().len();
138
139 let sample_count = usize::max(
141 (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
142 10,
143 );
144
145 log::trace!(
146 "Sampling {} values out of {}",
147 SAMPLE_SIZE as usize * sample_count,
148 source_len
149 );
150
151 stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
152 };
153
154 let after = compressor
155 .compress(&sample, true, allowed_cascading, excludes)?
156 .nbytes();
157 let before = sample.source().nbytes();
158
159 log::debug!(
160 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
161 before as f64 / after as f64
162 );
163
164 Ok(before as f64 / after as f64)
165}
166
/// Cascading budget handed to the type-specific compressors at the top level.
///
/// `BtrBlocksCompressor::compress_canonical` passes it as `allowed_cascading`,
/// and `Compressor::choose_scheme` subtracts the remaining budget from it to
/// report the current depth in log output.
const MAX_CASCADE: usize = 3;
168
169pub trait Compressor {
175 type ArrayVTable: VTable;
176 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
177
178 type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
180
181 fn schemes() -> &'static [&'static Self::SchemeType];
182 fn default_scheme() -> &'static Self::SchemeType;
183 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
184
185 fn compress(
186 array: &<Self::ArrayVTable as VTable>::Array,
187 is_sample: bool,
188 allowed_cascading: usize,
189 excludes: &[<Self::SchemeType as Scheme>::CodeType],
190 ) -> VortexResult<ArrayRef>
191 where
192 Self::SchemeType: 'static,
193 {
194 if array.is_empty() {
196 return Ok(array.to_array());
197 }
198
199 let stats = if excludes.contains(&Self::dict_scheme_code()) {
201 Self::StatsType::generate_opts(
202 array,
203 GenerateStatsOptions {
204 count_distinct_values: false,
205 },
206 )
207 } else {
208 Self::StatsType::generate(array)
209 };
210 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
211
212 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
213 if output.nbytes() < array.nbytes() {
214 Ok(output)
215 } else {
216 log::debug!("resulting tree too large: {}", output.tree_display());
217 Ok(array.to_array())
218 }
219 }
220
221 fn choose_scheme(
222 stats: &Self::StatsType,
223 is_sample: bool,
224 allowed_cascading: usize,
225 excludes: &[<Self::SchemeType as Scheme>::CodeType],
226 ) -> VortexResult<&'static Self::SchemeType> {
227 let mut best_ratio = 1.0;
228 let mut best_scheme: Option<&'static Self::SchemeType> = None;
229
230 let depth = MAX_CASCADE - allowed_cascading;
232
233 for scheme in Self::schemes().iter() {
234 if excludes.contains(&scheme.code()) {
235 continue;
236 }
237
238 if is_sample && scheme.is_constant() {
240 continue;
241 }
242
243 log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
244
245 let ratio =
246 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
247 log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
248
249 if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
250 if ratio > best_ratio {
251 best_ratio = ratio;
252 best_scheme = Some(*scheme);
253 }
254 } else {
255 log::trace!(
256 "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
257 );
258 }
259 }
260
261 log::debug!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");
262
263 if let Some(best) = best_scheme {
264 Ok(best)
265 } else {
266 Ok(Self::default_scheme())
267 }
268 }
269}
270
/// Stateless entry point that compresses an array by dispatching on its
/// canonical form.
#[derive(Debug, Clone, Default)]
pub struct BtrBlocksCompressor;
273
274impl BtrBlocksCompressor {
275 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
276 self.compress_canonical(array.to_canonical()?)
277 }
278
279 pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
280 match array {
281 Canonical::Null(null_array) => Ok(null_array.into_array()),
282 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
284 Canonical::Primitive(primitive) => {
285 if primitive.ptype().is_int() {
286 IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
287 } else {
288 FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
289 }
290 }
291 Canonical::Decimal(decimal) => compress_decimal(&decimal),
292 Canonical::Struct(struct_array) => {
293 let fields = struct_array
294 .fields()
295 .iter()
296 .map(|field| self.compress(field))
297 .try_collect()?;
298
299 Ok(StructArray::try_new(
300 struct_array.names().clone(),
301 fields,
302 struct_array.len(),
303 struct_array.validity().clone(),
304 )?
305 .into_array())
306 }
307 Canonical::List(list_array) => {
308 let compressed_elems = self.compress(list_array.elements())?;
310 let compressed_offsets = self.compress(list_array.offsets())?;
311
312 Ok(ListArray::try_new(
313 compressed_elems,
314 compressed_offsets,
315 list_array.validity().clone(),
316 )?
317 .into_array())
318 }
319 Canonical::VarBinView(strings) => {
320 if strings
321 .dtype()
322 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
323 {
324 StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
325 } else {
326 Ok(strings.into_array())
328 }
329 }
330 Canonical::Extension(ext_array) => {
331 if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array()) {
333 if let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata() {
334 return compress_temporal(temporal_array);
335 }
336 }
337
338 let compressed_storage = self.compress(ext_array.storage())?;
340
341 Ok(
342 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
343 .into_array(),
344 )
345 }
346 }
347 }
348}