1use std::sync::Arc;
7
8use parking_lot::Mutex;
9use parking_lot::MutexGuard;
10use vortex_array::ArrayRef;
11use vortex_array::Canonical;
12use vortex_array::CanonicalValidity;
13use vortex_array::ExecutionCtx;
14use vortex_array::IntoArray;
15use vortex_array::LEGACY_SESSION;
16use vortex_array::ToCanonical;
17use vortex_array::VortexSessionExecute;
18use vortex_array::arrays::ConstantArray;
19use vortex_array::arrays::ExtensionArray;
20use vortex_array::arrays::FixedSizeListArray;
21use vortex_array::arrays::ListArray;
22use vortex_array::arrays::ListViewArray;
23use vortex_array::arrays::StructArray;
24use vortex_array::arrays::extension::ExtensionArrayExt;
25use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
26use vortex_array::arrays::list::ListArrayExt;
27use vortex_array::arrays::listview::ListViewArrayExt;
28use vortex_array::arrays::listview::list_from_list_view;
29use vortex_array::arrays::primitive::PrimitiveArrayExt;
30use vortex_array::arrays::struct_::StructArrayExt;
31use vortex_array::dtype::DType;
32use vortex_array::dtype::Nullability;
33use vortex_array::scalar::Scalar;
34use vortex_error::VortexResult;
35use vortex_error::vortex_bail;
36use vortex_error::vortex_panic;
37
38use crate::builtins::IntDictScheme;
39use crate::ctx::CompressorContext;
40use crate::estimate::CompressionEstimate;
41use crate::estimate::estimate_compression_ratio_with_sampling;
42use crate::estimate::is_better_ratio;
43use crate::scheme::ChildSelection;
44use crate::scheme::DescendantExclusion;
45use crate::scheme::Scheme;
46use crate::scheme::SchemeExt;
47use crate::scheme::SchemeId;
48use crate::stats::ArrayAndStats;
49use crate::stats::GenerateStatsOptions;
50
/// Synthetic [`SchemeId`] that stands for the compressor itself in a cascade history.
///
/// It is not the ID of any registered [`Scheme`]. The list and list-view helpers record it as
/// the parent when they descend into offsets/sizes children, and `is_excluded` matches it
/// against the first history entry to apply `root_exclusions`.
const ROOT_SCHEME_ID: SchemeId = SchemeId {
    name: "vortex.compressor.root",
};
57
/// Child indices recorded (together with [`ROOT_SCHEME_ID`]) when the compressor descends into
/// the integer children of a list or list-view array.
mod root_list_children {
    /// Index used for the offsets child of a list or list-view array.
    pub const OFFSETS: usize = 1;
    /// Index used for the sizes child of a list-view array.
    pub const SIZES: usize = 2;
}
65
/// Compressor that picks, per array, among a set of registered [`Scheme`]s and lets the chosen
/// scheme cascade into its children via [`CascadingCompressor::compress_child`].
///
/// The execution context sits behind an `Arc<Mutex<_>>`, so clones of a compressor share the
/// same [`ExecutionCtx`].
#[derive(Debug, Clone)]
pub struct CascadingCompressor {
    /// Candidate schemes considered for every array that reaches scheme selection.
    schemes: Vec<&'static dyn Scheme>,

    /// Exclusion rules applied when [`ROOT_SCHEME_ID`] is the first entry of the cascade
    /// history (the root is not a registered scheme, so it cannot carry its own rules).
    root_exclusions: Vec<DescendantExclusion>,

    /// Shared execution context; access it through [`CascadingCompressor::execution_ctx`].
    ctx: Arc<Mutex<ExecutionCtx>>,
}
94
95impl CascadingCompressor {
96 pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
101 let root_exclusions = vec![DescendantExclusion {
104 excluded: IntDictScheme.id(),
105 children: ChildSelection::One(root_list_children::OFFSETS),
106 }];
107 Self {
108 schemes,
109 root_exclusions,
110 ctx: Arc::new(Mutex::new(LEGACY_SESSION.create_execution_ctx())),
112 }
113 }
114
/// Locks the shared [`ExecutionCtx`] and returns the guard.
///
/// The lock is held for as long as the returned guard is alive, so keep it short-lived.
/// NOTE(review): `parking_lot::Mutex` is documented as non-reentrant, so calling this again on
/// the same thread while a guard is still held would presumably deadlock — confirm before
/// holding a guard across calls back into the compressor.
pub fn execution_ctx(&self) -> MutexGuard<'_, ExecutionCtx> {
    self.ctx.lock()
}
119
120 pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
128 let canonical = array
129 .clone()
130 .execute::<CanonicalValidity>(&mut self.execution_ctx())?
131 .0;
132
133 let compact = canonical.compact()?;
135
136 self.compress_canonical(compact, CompressorContext::new())
137 }
138
139 pub fn compress_child(
149 &self,
150 child: &ArrayRef,
151 parent_ctx: &CompressorContext,
152 parent_id: SchemeId,
153 child_index: usize,
154 ) -> VortexResult<ArrayRef> {
155 if parent_ctx.finished_cascading() {
156 return Ok(child.clone());
157 }
158
159 let canonical = child
160 .clone()
161 .execute::<CanonicalValidity>(&mut self.execution_ctx())?
162 .0;
163 let compact = canonical.compact()?;
164
165 let child_ctx = parent_ctx
166 .clone()
167 .descend_with_scheme(parent_id, child_index);
168 self.compress_canonical(compact, child_ctx)
169 }
170
171 fn compress_canonical(
177 &self,
178 array: Canonical,
179 ctx: CompressorContext,
180 ) -> VortexResult<ArrayRef> {
181 match array {
182 Canonical::Null(null_array) => Ok(null_array.into_array()),
183 Canonical::Bool(bool_array) => {
184 self.choose_and_compress(Canonical::Bool(bool_array), ctx)
185 }
186 Canonical::Primitive(primitive) => {
187 self.choose_and_compress(Canonical::Primitive(primitive), ctx)
188 }
189 Canonical::Decimal(decimal) => {
190 self.choose_and_compress(Canonical::Decimal(decimal), ctx)
191 }
192 Canonical::Struct(struct_array) => {
193 let fields = struct_array
194 .iter_unmasked_fields()
195 .map(|field| self.compress(field))
196 .collect::<Result<Vec<_>, _>>()?;
197
198 Ok(StructArray::try_new(
199 struct_array.names().clone(),
200 fields,
201 struct_array.len(),
202 struct_array.validity()?,
203 )?
204 .into_array())
205 }
206 Canonical::List(list_view_array) => {
207 if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
208 let list_array = list_from_list_view(list_view_array)?;
209 self.compress_list_array(list_array, ctx)
210 } else {
211 self.compress_list_view_array(list_view_array, ctx)
212 }
213 }
214 Canonical::FixedSizeList(fsl_array) => {
215 let compressed_elems = self.compress(fsl_array.elements())?;
216
217 Ok(FixedSizeListArray::try_new(
218 compressed_elems,
219 fsl_array.list_size(),
220 fsl_array.validity()?,
221 fsl_array.len(),
222 )?
223 .into_array())
224 }
225 Canonical::VarBinView(strings) => {
226 if strings
227 .dtype()
228 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
229 {
230 self.choose_and_compress(Canonical::VarBinView(strings), ctx)
231 } else {
232 Ok(strings.into_array())
234 }
235 }
236 Canonical::Extension(ext_array) => {
237 let before_nbytes = ext_array.as_ref().nbytes();
238
239 let result =
241 self.choose_and_compress(Canonical::Extension(ext_array.clone()), ctx)?;
242 if result.nbytes() < before_nbytes {
243 return Ok(result);
244 }
245
246 let compressed_storage = self.compress(ext_array.storage_array())?;
248
249 Ok(
250 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
251 .into_array(),
252 )
253 }
254 Canonical::Variant(_) => {
255 vortex_bail!("Variant arrays can not be compressed")
256 }
257 }
258 }
259
260 fn choose_and_compress(
275 &self,
276 canonical: Canonical,
277 ctx: CompressorContext,
278 ) -> VortexResult<ArrayRef> {
279 let eligible_schemes: Vec<&'static dyn Scheme> = self
280 .schemes
281 .iter()
282 .copied()
283 .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &ctx))
284 .collect();
285
286 let array: ArrayRef = canonical.into();
287
288 if eligible_schemes.is_empty() {
290 return Ok(array);
291 }
292
293 if array.is_empty() {
295 return Ok(array);
296 }
297 if array.all_invalid()? {
298 return Ok(
299 ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
300 );
301 }
302
303 let before_nbytes = array.nbytes();
304
305 let merged_opts = eligible_schemes
306 .iter()
307 .fold(GenerateStatsOptions::default(), |acc, s| {
308 acc.merge(s.stats_options())
309 });
310 let ctx = ctx.with_merged_stats_options(merged_opts);
311
312 let mut data = ArrayAndStats::new(array, merged_opts);
313
314 if let Some(winner) = self.choose_best_scheme(&eligible_schemes, &mut data, ctx.clone())? {
315 let compressed = winner.compress(self, &mut data, ctx)?;
320
321 if compressed.nbytes() < before_nbytes {
323 return Ok(compressed);
325 }
326 }
327
328 Ok(data.into_array())
330 }
331
332 fn choose_best_scheme(
338 &self,
339 schemes: &[&'static dyn Scheme],
340 data: &mut ArrayAndStats,
341 ctx: CompressorContext,
342 ) -> VortexResult<Option<&'static dyn Scheme>> {
343 let mut best: Option<(&'static dyn Scheme, f64)> = None;
344
345 for &scheme in schemes {
348 let estimate = scheme.expected_compression_ratio(data, ctx.clone());
349
350 match estimate {
351 CompressionEstimate::Skip => {}
352 CompressionEstimate::AlwaysUse => return Ok(Some(scheme)),
353 CompressionEstimate::Ratio(ratio) => {
354 if is_better_ratio(ratio, &best) {
355 best = Some((scheme, ratio));
356 }
357 }
358 CompressionEstimate::Sample => {
359 let sample_ratio = estimate_compression_ratio_with_sampling(
360 scheme,
361 self,
362 data.array(),
363 ctx.clone(),
364 )?;
365
366 if is_better_ratio(sample_ratio, &best) {
367 best = Some((scheme, sample_ratio));
368 }
369 }
370 CompressionEstimate::Estimate(estimate_callback) => {
372 let estimate = estimate_callback(self, data, ctx.clone())?;
373
374 match estimate {
375 CompressionEstimate::Skip => {}
376 CompressionEstimate::AlwaysUse => return Ok(Some(scheme)),
377 CompressionEstimate::Ratio(ratio) => {
378 if is_better_ratio(ratio, &best) {
379 best = Some((scheme, ratio));
380 }
381 }
382 e @ (CompressionEstimate::Sample | CompressionEstimate::Estimate(_)) => {
383 vortex_panic!(
384 "an estimation function returned an invalid variant {e:?}"
385 )
386 }
387 }
388 }
389 }
390
391 }
393
394 Ok(best.map(|(s, _)| s))
395 }
396
397 fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
401 let id = candidate.id();
402 let history = ctx.cascade_history();
403
404 if history.iter().any(|&(sid, _)| sid == id) {
406 return true;
407 }
408
409 let mut iter = history.iter().copied().peekable();
410
411 if let Some((_, child_idx)) = iter.next_if(|&(sid, _)| sid == ROOT_SCHEME_ID)
414 && self
415 .root_exclusions
416 .iter()
417 .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
418 {
419 return true;
420 }
421
422 for (ancestor_id, child_idx) in iter {
424 if let Some(ancestor) = self.schemes.iter().find(|s| s.id() == ancestor_id)
425 && ancestor
426 .descendant_exclusions()
427 .iter()
428 .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
429 {
430 return true;
431 }
432 }
433
434 for rule in candidate.ancestor_exclusions() {
436 if history
437 .iter()
438 .any(|(sid, cidx)| *sid == rule.ancestor && rule.children.contains(*cidx))
439 {
440 return true;
441 }
442 }
443
444 false
445 }
446
447 fn compress_list_array(
449 &self,
450 list_array: ListArray,
451 ctx: CompressorContext,
452 ) -> VortexResult<ArrayRef> {
453 let list_array = list_array.reset_offsets(true)?;
454
455 let compressed_elems = self.compress(list_array.elements())?;
456
457 let offset_ctx = ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
459 let compressed_offsets = self.compress_canonical(
460 Canonical::Primitive(list_array.offsets().to_primitive().narrow()?),
461 offset_ctx,
462 )?;
463
464 Ok(
465 ListArray::try_new(compressed_elems, compressed_offsets, list_array.validity()?)?
466 .into_array(),
467 )
468 }
469
470 fn compress_list_view_array(
473 &self,
474 list_view: ListViewArray,
475 ctx: CompressorContext,
476 ) -> VortexResult<ArrayRef> {
477 let compressed_elems = self.compress(list_view.elements())?;
478
479 let offset_ctx = ctx
480 .clone()
481 .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
482 let compressed_offsets = self.compress_canonical(
483 Canonical::Primitive(list_view.offsets().to_primitive().narrow()?),
484 offset_ctx,
485 )?;
486
487 let sizes_ctx = ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
488 let compressed_sizes = self.compress_canonical(
489 Canonical::Primitive(list_view.sizes().to_primitive().narrow()?),
490 sizes_ctx,
491 )?;
492
493 Ok(ListViewArray::try_new(
494 compressed_elems,
495 compressed_offsets,
496 compressed_sizes,
497 list_view.validity()?,
498 )?
499 .into_array())
500 }
501}
502
#[cfg(test)]
mod tests {
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use super::*;
    use crate::builtins::FloatDictScheme;
    use crate::builtins::IntDictScheme;
    use crate::builtins::StringDictScheme;
    use crate::ctx::CompressorContext;
    use crate::scheme::SchemeExt;

    /// Builds a compressor with the int, float, and string dictionary schemes registered.
    fn compressor() -> CascadingCompressor {
        let schemes: Vec<&'static dyn Scheme> =
            vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme];
        CascadingCompressor::new(schemes)
    }

    #[test]
    fn test_self_exclusion() {
        // IntDict is already part of the cascade history.
        let history = CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);

        // A scheme must not be applied again below itself.
        assert!(compressor().is_excluded(&IntDictScheme, &history));
    }

    #[test]
    fn test_root_exclusion_list_offsets() {
        // The root compressor descending into a list's offsets child.
        let history = CompressorContext::default()
            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);

        // The root exclusion installed by `CascadingCompressor::new` applies.
        assert!(compressor().is_excluded(&IntDictScheme, &history));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
        // FloatDict descending into child 1 (the codes child, per the test name).
        let history = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);

        assert!(compressor().is_excluded(&IntDictScheme, &history));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_values() {
        // FloatDict descending into child 0 (the values child, per the test name).
        let history = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);

        assert!(compressor().is_excluded(&IntDictScheme, &history));
    }

    #[test]
    fn test_no_exclusion_without_history() {
        // With an empty history, no exclusion rule can match.
        let empty_history = CompressorContext::default();

        assert!(!compressor().is_excluded(&IntDictScheme, &empty_history));
    }

    #[test]
    fn all_null_array_compresses_to_constant() -> VortexResult<()> {
        // Five values, every one of them marked invalid.
        let all_invalid = Validity::Array(BoolArray::from_iter([false; 5]).into_array());
        let array = PrimitiveArray::new(buffer![0i32, 0, 0, 0, 0], all_invalid).into_array();

        // IntDict is registered so that scheme selection is actually reached.
        let compressed = CascadingCompressor::new(vec![&IntDictScheme]).compress(&array)?;

        assert!(compressed.is::<Constant>());
        Ok(())
    }

    #[test]
    fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
        // Low-cardinality floats: two distinct values alternating.
        let values = buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0];
        let array = PrimitiveArray::new(values, Validity::NonNullable).into_array();

        let float_dict_only = CascadingCompressor::new(vec![&FloatDictScheme]);
        let sampling_ctx = CompressorContext::new().with_sampling();

        // Sampling must complete and yield a usable ratio.
        let ratio = estimate_compression_ratio_with_sampling(
            &FloatDictScheme,
            &float_dict_only,
            &array,
            sampling_ctx,
        )?;
        assert!(ratio.is_finite());
        Ok(())
    }
}