1#![feature(array_chunks)]
2
3use std::fmt::Debug;
4use std::hash::Hash;
5
6use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
7use vortex_array::nbytes::NBytes;
8use vortex_array::variants::{ExtensionArrayTrait, PrimitiveArrayTrait, StructArrayTrait};
9use vortex_array::{Array, ArrayRef, Canonical};
10use vortex_dtype::datetime::TemporalMetadata;
11use vortex_dtype::{DType, Nullability};
12use vortex_error::{VortexExpect, VortexResult, VortexUnwrap};
13
14pub use crate::float::FloatCompressor;
15pub use crate::integer::IntCompressor;
16pub use crate::string::StringCompressor;
17pub use crate::temporal::compress_temporal;
18
19mod float;
20pub mod integer;
21mod patches;
22mod sample;
23mod string;
24mod temporal;
25
/// Options controlling how compressor statistics are generated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted while generating statistics.
    ///
    /// `Compressor::compress` turns this off when the dictionary scheme's code is
    /// in the exclusion list.
    pub count_distinct_values: bool,
}

impl Default for GenerateStatsOptions {
    /// Returns options with distinct-value counting enabled.
    fn default() -> Self {
        Self {
            count_distinct_values: true,
        }
    }
}
40
/// Number of values in each sample chunk requested by
/// `estimate_compression_ratio_with_sampling` when it samples a full array.
const SAMPLE_SIZE: u32 = 64;
42
43pub trait CompressorStats: Debug + Clone {
45 type ArrayType: Array;
46
47 fn generate(input: &Self::ArrayType) -> Self {
49 Self::generate_opts(input, GenerateStatsOptions::default())
50 }
51
52 fn generate_opts(input: &Self::ArrayType, opts: GenerateStatsOptions) -> Self;
53
54 fn source(&self) -> &Self::ArrayType;
55
56 fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
57 self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
58 }
59
60 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
61}
62
63pub trait Scheme: Debug {
67 type StatsType: CompressorStats;
68 type CodeType: Copy + Eq + Hash;
69
70 fn code(&self) -> Self::CodeType;
72
73 fn is_constant(&self) -> bool {
75 false
76 }
77
78 fn expected_compression_ratio(
86 &self,
87 stats: &Self::StatsType,
88 is_sample: bool,
89 allowed_cascading: usize,
90 excludes: &[Self::CodeType],
91 ) -> VortexResult<f64> {
92 estimate_compression_ratio_with_sampling(
93 self,
94 stats,
95 is_sample,
96 allowed_cascading,
97 excludes,
98 )
99 }
100
101 fn compress(
103 &self,
104 stats: &Self::StatsType,
105 is_sample: bool,
106 allowed_cascading: usize,
107 excludes: &[Self::CodeType],
108 ) -> VortexResult<ArrayRef>;
109}
110
/// A tree of scheme identifiers: one scheme code per node, with nested child nodes.
///
/// NOTE(review): presumably records the scheme chosen at each cascade level; confirm
/// against the code that builds it.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SchemeTree {
    /// Scheme code at this node.
    pub scheme: u8,
    /// Child nodes.
    pub children: Vec<SchemeTree>,
}
120
121pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
122 compressor: &T,
123 stats: &T::StatsType,
124 is_sample: bool,
125 allowed_cascading: usize,
126 excludes: &[T::CodeType],
127) -> VortexResult<f64> {
128 let sample = if is_sample {
129 stats.clone()
130 } else {
131 let source_len = stats.source().len();
133
134 let sample_count = usize::max(
136 (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
137 10,
138 );
139
140 log::trace!(
141 "Sampling {} values out of {}",
142 SAMPLE_SIZE as usize * sample_count,
143 source_len
144 );
145
146 stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
147 };
148
149 let after = compressor
150 .compress(&sample, true, allowed_cascading, excludes)?
151 .nbytes();
152 let before = sample.source().nbytes();
153
154 log::debug!(
155 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
156 before as f64 / after as f64
157 );
158
159 Ok(before as f64 / after as f64)
160}
161
/// Cascading budget handed to the top-level compressors by `BtrBlocksCompressor`.
/// `Compressor::choose_scheme` derives its logged depth from the remaining budget.
const MAX_CASCADE: usize = 3;
163
164pub trait Compressor {
170 type ArrayType: Array;
171 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
172
173 type StatsType: CompressorStats<ArrayType = Self::ArrayType>;
175
176 fn schemes() -> &'static [&'static Self::SchemeType];
177 fn default_scheme() -> &'static Self::SchemeType;
178 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
179
180 fn compress(
181 array: &Self::ArrayType,
182 is_sample: bool,
183 allowed_cascading: usize,
184 excludes: &[<Self::SchemeType as Scheme>::CodeType],
185 ) -> VortexResult<ArrayRef>
186 where
187 Self::SchemeType: 'static,
188 {
189 if array.is_empty() {
191 return Ok(array.to_array());
192 }
193
194 let stats = if excludes.contains(&Self::dict_scheme_code()) {
196 Self::StatsType::generate_opts(
197 array,
198 GenerateStatsOptions {
199 count_distinct_values: false,
200 },
201 )
202 } else {
203 Self::StatsType::generate(array)
204 };
205 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
206
207 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
208 if output.nbytes() < array.nbytes() {
209 Ok(output)
210 } else {
211 log::debug!("resulting tree too large: {}", output.tree_display());
212 Ok(array.to_array())
213 }
214 }
215
216 fn choose_scheme(
217 stats: &Self::StatsType,
218 is_sample: bool,
219 allowed_cascading: usize,
220 excludes: &[<Self::SchemeType as Scheme>::CodeType],
221 ) -> VortexResult<&'static Self::SchemeType> {
222 let mut best_ratio = 1.0;
223 let mut best_scheme: Option<&'static Self::SchemeType> = None;
224
225 let depth = MAX_CASCADE - allowed_cascading;
227
228 for scheme in Self::schemes().iter() {
229 if excludes.contains(&scheme.code()) {
230 continue;
231 }
232
233 if is_sample && scheme.is_constant() {
235 continue;
236 }
237
238 log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
239
240 let ratio =
241 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
242 log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
243
244 if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
245 if ratio > best_ratio {
246 best_ratio = ratio;
247 best_scheme = Some(*scheme);
248 }
249 } else {
250 log::trace!(
251 "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
252 );
253 }
254 }
255
256 log::debug!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");
257
258 if let Some(best) = best_scheme {
259 Ok(best)
260 } else {
261 Ok(Self::default_scheme())
262 }
263 }
264}
265
/// Stateless entry point that compresses arbitrary arrays by dispatching on their
/// canonical form.
#[derive(Debug, Clone, Copy, Default)]
pub struct BtrBlocksCompressor;
268
269impl BtrBlocksCompressor {
270 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
271 self.compress_canonical(array.to_canonical()?)
272 }
273
274 pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
275 match array {
276 Canonical::Null(null_array) => Ok(null_array.into_array()),
277 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
279 Canonical::Primitive(primitive) => {
280 if primitive.ptype().is_int() {
281 IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
282 } else {
283 FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
284 }
285 }
286 Canonical::Struct(struct_array) => {
287 let mut fields = Vec::new();
288 for idx in 0..struct_array.nfields() {
289 let field = struct_array
290 .maybe_null_field_by_idx(idx)
291 .vortex_expect("field access");
292 let compressed = self.compress(&field)?;
293 fields.push(compressed);
294 }
295
296 Ok(StructArray::try_new(
297 struct_array.names().clone(),
298 fields,
299 struct_array.len(),
300 struct_array.validity().clone(),
301 )?
302 .into_array())
303 }
304 Canonical::List(list_array) => {
305 let compressed_elems = self.compress(list_array.elements())?;
307 let compressed_offsets = self.compress(list_array.offsets())?;
308
309 Ok(ListArray::try_new(
310 compressed_elems,
311 compressed_offsets,
312 list_array.validity().clone(),
313 )?
314 .into_array())
315 }
316 Canonical::VarBinView(strings) => {
317 if strings
318 .dtype()
319 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
320 {
321 StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
322 } else {
323 Ok(strings.into_array())
325 }
326 }
327 Canonical::Extension(ext_array) => {
328 if let Ok(temporal_array) =
330 TemporalArray::try_from(ext_array.to_array().into_array())
331 {
332 if let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata() {
333 return compress_temporal(temporal_array);
334 }
335 }
336
337 let compressed_storage = self.compress(ext_array.storage())?;
339
340 Ok(
341 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
342 .into_array(),
343 )
344 }
345 }
346 }
347}