use std::sync::Arc;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::CanonicalValidity;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::ExtensionArray;
use vortex_array::arrays::FixedSizeListArray;
use vortex_array::arrays::ListArray;
use vortex_array::arrays::ListViewArray;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::extension::ExtensionArrayExt;
use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
use vortex_array::arrays::list::ListArrayExt;
use vortex_array::arrays::listview::ListViewArrayExt;
use vortex_array::arrays::listview::list_from_list_view;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::arrays::struct_::StructArrayExt;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::scalar::Scalar;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use crate::builtins::IntDictScheme;
use crate::ctx::CompressorContext;
use crate::estimate::CompressionEstimate;
use crate::estimate::DeferredEstimate;
use crate::estimate::EstimateVerdict;
use crate::estimate::estimate_compression_ratio_with_sampling;
use crate::estimate::is_better_ratio;
use crate::scheme::ChildSelection;
use crate::scheme::DescendantExclusion;
use crate::scheme::Scheme;
use crate::scheme::SchemeExt;
use crate::scheme::SchemeId;
use crate::stats::ArrayAndStats;
use crate::stats::GenerateStatsOptions;
/// Scheme id that stands for the compressor itself in a cascade history.
///
/// `compress_list_array` and `compress_list_view_array` record it, together with a
/// `root_list_children` index, before compressing list offsets or sizes. `is_excluded`
/// looks for it at the front of the history in order to apply `root_exclusions`.
const ROOT_SCHEME_ID: SchemeId = SchemeId {
name: "vortex.compressor.root",
};
/// Child indices the compressor records under `ROOT_SCHEME_ID` when it descends into
/// the children of a list or list-view array.
mod root_list_children {
/// Index recorded for the offsets child of a list or list-view array.
pub const OFFSETS: usize = 1;
/// Index recorded for the sizes child of a list-view array.
pub const SIZES: usize = 2;
}
/// How the scheme returned by `choose_best_scheme` earned its selection.
#[derive(Debug, Clone, Copy, PartialEq)]
enum WinnerEstimate {
/// The scheme reported `EstimateVerdict::AlwaysUse`, which ends the search at once.
AlwaysUse,
/// The scheme had the best ratio among the candidates; carries that ratio.
Ratio(f64),
}
/// Compressor that chooses, for each array it is given, one of a fixed list of
/// [`Scheme`]s.
///
/// The compressor passes itself to the selected scheme's `compress`, and exposes
/// `compress_child` so that a scheme's child arrays can be compressed in turn with the
/// parent's context extended by the `(scheme, child index)` step.
///
/// `Clone` is derived, so clones share the same execution context through the `Arc`.
#[derive(Debug, Clone)]
pub struct CascadingCompressor {
/// Candidate schemes; eligible ones are evaluated in this order.
schemes: Vec<&'static dyn Scheme>,
/// Exclusion rules applied when the cascade history starts with `ROOT_SCHEME_ID`.
root_exclusions: Vec<DescendantExclusion>,
/// Execution context shared behind a mutex; obtained through `execution_ctx`.
ctx: Arc<Mutex<ExecutionCtx>>,
}
impl CascadingCompressor {
/// Creates a compressor that selects among `schemes`.
///
/// One root exclusion is always installed: `IntDictScheme` is never considered for
/// the offsets child of a list or list-view array. The execution context is created
/// from `LEGACY_SESSION`.
pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
    let no_int_dict_on_offsets = DescendantExclusion {
        excluded: IntDictScheme.id(),
        children: ChildSelection::One(root_list_children::OFFSETS),
    };
    let ctx = Arc::new(Mutex::new(LEGACY_SESSION.create_execution_ctx()));

    Self {
        schemes,
        root_exclusions: vec![no_int_dict_on_offsets],
        ctx,
    }
}
/// Locks the shared execution context and returns the guard.
///
/// `compress` and `compress_child` take this same lock internally, and a
/// `parking_lot::Mutex` is not reentrant, so drop the returned guard before calling
/// back into either of them from the same thread.
pub fn execution_ctx(&self) -> MutexGuard<'_, ExecutionCtx> {
self.ctx.lock()
}
/// Compresses `array`, starting from a fresh [`CompressorContext`].
///
/// The input is executed into canonical form, compacted, and then dispatched on its
/// canonical encoding by `compress_canonical`.
///
/// # Errors
///
/// Propagates any error from canonicalization, compaction, or compression.
pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
    let canonical = {
        // The guard must not outlive this block: the compression below locks the
        // execution context again, and the mutex is not reentrant.
        let mut exec_ctx = self.execution_ctx();
        let executed = array.clone().execute::<CanonicalValidity>(&mut exec_ctx)?;
        executed.0
    };
    let compacted = canonical.compact()?;

    self.compress_canonical(compacted, CompressorContext::new())
}
/// Compresses a child array produced by the scheme `parent_id`.
///
/// When `parent_ctx` reports that cascading is finished, the child is returned as is.
/// Otherwise the child is canonicalized and compacted, and compressed with a copy of
/// `parent_ctx` extended by the `(parent_id, child_index)` step.
///
/// # Errors
///
/// Propagates any error from canonicalization, compaction, or compression.
pub fn compress_child(
    &self,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
) -> VortexResult<ArrayRef> {
    if parent_ctx.finished_cascading() {
        return Ok(child.clone());
    }

    let canonical = {
        // Release the guard before recursing; the recursion locks the context again.
        let mut exec_ctx = self.execution_ctx();
        let executed = child.clone().execute::<CanonicalValidity>(&mut exec_ctx)?;
        executed.0
    };
    let compacted = canonical.compact()?;

    let descended = parent_ctx
        .clone()
        .descend_with_scheme(parent_id, child_index);
    self.compress_canonical(compacted, descended)
}
/// Dispatches on the canonical encoding of `array`.
///
/// * Bool, primitive, decimal and UTF-8 arrays go through scheme selection.
/// * Null arrays and non-UTF-8 `VarBinView` arrays are returned unchanged.
/// * Struct fields, fixed-size-list elements and (as a fallback) extension storage
///   are compressed through `compress`, i.e. with a fresh context, and the parent is
///   rebuilt around the compressed children.
/// * Lists are compressed as `ListArray` when the view converts without copying or
///   has no elements, and as `ListViewArray` otherwise.
///
/// # Errors
///
/// Fails for `Canonical::Variant`, and propagates any error from compressing or
/// rebuilding children.
fn compress_canonical(
    &self,
    array: Canonical,
    ctx: CompressorContext,
) -> VortexResult<ArrayRef> {
    match array {
        // These encodings have no children to recurse into here; the wildcard
        // patterns leave `array` intact so it can be forwarded whole.
        Canonical::Bool(_) | Canonical::Primitive(_) | Canonical::Decimal(_) => {
            self.choose_and_compress(array, ctx)
        }
        Canonical::Null(nulls) => Ok(nulls.into_array()),
        Canonical::VarBinView(strings) => {
            let is_utf8 = strings
                .dtype()
                .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable));
            if is_utf8 {
                self.choose_and_compress(Canonical::VarBinView(strings), ctx)
            } else {
                Ok(strings.into_array())
            }
        }
        Canonical::Struct(struct_array) => {
            let compressed_fields = struct_array
                .iter_unmasked_fields()
                .map(|field| self.compress(field))
                .collect::<VortexResult<Vec<_>>>()?;
            let rebuilt = StructArray::try_new(
                struct_array.names().clone(),
                compressed_fields,
                struct_array.len(),
                struct_array.validity()?,
            )?;
            Ok(rebuilt.into_array())
        }
        Canonical::List(list_view) => {
            let as_plain_list =
                list_view.is_zero_copy_to_list() || list_view.elements().is_empty();
            if as_plain_list {
                let list = list_from_list_view(list_view)?;
                self.compress_list_array(list, ctx)
            } else {
                self.compress_list_view_array(list_view, ctx)
            }
        }
        Canonical::FixedSizeList(fsl) => {
            let elements = self.compress(fsl.elements())?;
            let rebuilt = FixedSizeListArray::try_new(
                elements,
                fsl.list_size(),
                fsl.validity()?,
                fsl.len(),
            )?;
            Ok(rebuilt.into_array())
        }
        Canonical::Extension(ext_array) => {
            let uncompressed_nbytes = ext_array.as_ref().nbytes();

            // First let the schemes try the extension array as a whole.
            let via_schemes =
                self.choose_and_compress(Canonical::Extension(ext_array.clone()), ctx)?;
            if via_schemes.nbytes() < uncompressed_nbytes {
                Ok(via_schemes)
            } else {
                // No gain: compress the storage array and re-wrap it instead.
                let storage = self.compress(ext_array.storage_array())?;
                let rewrapped = ExtensionArray::new(ext_array.ext_dtype().clone(), storage);
                Ok(rewrapped.into_array())
            }
        }
        Canonical::Variant(_) => {
            vortex_bail!("Variant arrays can not be compressed")
        }
    }
}
/// Picks the best eligible scheme for `canonical` and applies it.
///
/// A scheme is eligible when it `matches` the array and is not ruled out by
/// `is_excluded` for the current cascade history. The array is returned unchanged
/// when no scheme is eligible, when it is empty, when no scheme wins, or when the
/// winner's output is not strictly smaller (by `nbytes`) than the input. An array
/// whose values are all invalid becomes a null [`ConstantArray`] of the same dtype
/// and length.
///
/// # Errors
///
/// Propagates errors from the validity check, from estimation, and from the winning
/// scheme's `compress`.
fn choose_and_compress(
    &self,
    canonical: Canonical,
    ctx: CompressorContext,
) -> VortexResult<ArrayRef> {
    let eligible_schemes: Vec<&'static dyn Scheme> = self
        .schemes
        .iter()
        .copied()
        .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &ctx))
        .collect();
    let array: ArrayRef = canonical.into();

    // Nothing to choose from, or nothing to compress.
    if eligible_schemes.is_empty() || array.is_empty() {
        return Ok(array);
    }

    // Run the validity check on the compressor's shared execution context, like
    // every other execution in this type, instead of building a throwaway one.
    // The guard is a temporary of the `if` condition, so the lock is released
    // before the branch body (or anything after it) runs.
    if array.all_invalid(&mut self.execution_ctx())? {
        let null_scalar = Scalar::null(array.dtype().clone());
        return Ok(ConstantArray::new(null_scalar, array.len()).into_array());
    }

    let before_nbytes = array.nbytes();

    // Generate stats once, covering what every eligible scheme asks for.
    let merged_opts = eligible_schemes
        .iter()
        .fold(GenerateStatsOptions::default(), |acc, s| {
            acc.merge(s.stats_options())
        });
    let ctx = ctx.with_merged_stats_options(merged_opts);
    let mut data = ArrayAndStats::new(array, merged_opts);

    if let Some((winner, _winner_estimate)) =
        self.choose_best_scheme(&eligible_schemes, &mut data, ctx.clone())?
    {
        let compressed = winner.compress(self, &mut data, ctx)?;
        // Keep the result only if it actually shrank the array.
        if compressed.nbytes() < before_nbytes {
            return Ok(compressed);
        }
    }

    Ok(data.into_array())
}
fn choose_best_scheme(
&self,
schemes: &[&'static dyn Scheme],
data: &mut ArrayAndStats,
ctx: CompressorContext,
) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
let mut best: Option<(&'static dyn Scheme, f64)> = None;
for &scheme in schemes {
let estimate = scheme.expected_compression_ratio(data, ctx.clone());
match estimate {
CompressionEstimate::Verdict(verdict) => {
if let Some(winner_estimate) =
Self::check_and_update_estimate_verdict(&mut best, scheme, verdict)
{
return Ok(Some((scheme, winner_estimate)));
}
}
CompressionEstimate::Deferred(DeferredEstimate::Sample) => {
let sample_ratio = estimate_compression_ratio_with_sampling(
scheme,
self,
data.array(),
ctx.clone(),
)?;
if is_better_ratio(sample_ratio, &best) {
best = Some((scheme, sample_ratio));
}
}
CompressionEstimate::Deferred(DeferredEstimate::Callback(estimate_callback)) => {
let verdict = estimate_callback(self, data, ctx.clone())?;
if let Some(winner_estimate) =
Self::check_and_update_estimate_verdict(&mut best, scheme, verdict)
{
return Ok(Some((scheme, winner_estimate)));
}
}
}
}
Ok(best.map(|(scheme, ratio)| (scheme, WinnerEstimate::Ratio(ratio))))
}
/// Folds one scheme's `verdict` into the running `best` candidate.
///
/// Returns `Some(WinnerEstimate::AlwaysUse)` when the verdict is `AlwaysUse`, telling
/// the caller to stop searching. A `Ratio` verdict replaces `best` when
/// `is_better_ratio` says so; `Skip` changes nothing. Both of those return `None`.
fn check_and_update_estimate_verdict(
    best: &mut Option<(&'static dyn Scheme, f64)>,
    scheme: &'static dyn Scheme,
    verdict: EstimateVerdict,
) -> Option<WinnerEstimate> {
    match verdict {
        EstimateVerdict::AlwaysUse => return Some(WinnerEstimate::AlwaysUse),
        EstimateVerdict::Ratio(ratio) if is_better_ratio(ratio, &*best) => {
            *best = Some((scheme, ratio));
        }
        EstimateVerdict::Ratio(_) | EstimateVerdict::Skip => {}
    }
    None
}
/// Returns `true` when `candidate` must not be applied under the cascade history
/// recorded in `ctx`.
///
/// Four rules are checked in order:
/// 1. the candidate already appears in the history;
/// 2. the history starts with `ROOT_SCHEME_ID` and a root exclusion names the
///    candidate for that child index;
/// 3. a registered ancestor scheme in the rest of the history excludes the
///    candidate for the child index it was entered through;
/// 4. the candidate's own ancestor exclusions match any history entry.
fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
    let id = candidate.id();
    let history = ctx.cascade_history();

    // Rule 1: a scheme is never applied twice along one chain.
    let already_applied = history.iter().any(|&(sid, _)| sid == id);
    if already_applied {
        return true;
    }

    let mut ancestors = history.iter().copied().peekable();

    // Rule 2: consume the leading root entry, if there is one.
    let root_child = ancestors
        .next_if(|&(sid, _)| sid == ROOT_SCHEME_ID)
        .map(|(_, child_idx)| child_idx);
    let excluded_by_root = root_child.is_some_and(|child_idx| {
        self.root_exclusions
            .iter()
            .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
    });
    if excluded_by_root {
        return true;
    }

    // Rule 3: exclusions declared by the remaining ancestors.
    let excluded_by_ancestor = ancestors.any(|(ancestor_id, child_idx)| {
        self.schemes
            .iter()
            .find(|s| s.id() == ancestor_id)
            .is_some_and(|ancestor| {
                ancestor
                    .descendant_exclusions()
                    .iter()
                    .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
            })
    });
    if excluded_by_ancestor {
        return true;
    }

    // Rule 4: exclusions the candidate declares about its own ancestors; these look
    // at the whole history, including a leading root entry.
    for rule in candidate.ancestor_exclusions() {
        let triggered = history
            .iter()
            .any(|&(sid, cidx)| sid == rule.ancestor && rule.children.contains(cidx));
        if triggered {
            return true;
        }
    }

    false
}
/// Compresses the elements and offsets of a `ListArray` and rebuilds the list.
///
/// `reset_offsets(true)` is applied first (NOTE(review): presumably this rebases the
/// offsets and trims unreferenced elements; confirm against `ListArray`). Elements
/// are compressed with a fresh context. Offsets are narrowed and compressed under
/// `ROOT_SCHEME_ID` with the offsets child index, so root exclusions apply to them.
///
/// # Errors
///
/// Propagates any error from resetting, narrowing, compressing, or rebuilding.
fn compress_list_array(
    &self,
    list_array: ListArray,
    ctx: CompressorContext,
) -> VortexResult<ArrayRef> {
    let list = list_array.reset_offsets(true)?;

    let elements = self.compress(list.elements())?;

    let offsets_ctx = ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
    let narrowed_offsets = list.offsets().to_primitive().narrow()?;
    let offsets = self.compress_canonical(Canonical::Primitive(narrowed_offsets), offsets_ctx)?;

    let rebuilt = ListArray::try_new(elements, offsets, list.validity()?)?;
    Ok(rebuilt.into_array())
}
/// Compresses the elements, offsets and sizes of a `ListViewArray` and rebuilds it.
///
/// Elements are compressed with a fresh context. Offsets and sizes are each narrowed
/// and compressed under `ROOT_SCHEME_ID` with their own child index, so root
/// exclusions apply to them.
///
/// # Errors
///
/// Propagates any error from narrowing, compressing, or rebuilding.
fn compress_list_view_array(
    &self,
    list_view: ListViewArray,
    ctx: CompressorContext,
) -> VortexResult<ArrayRef> {
    let elements = self.compress(list_view.elements())?;

    let offsets_ctx = ctx
        .clone()
        .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
    let narrowed_offsets = list_view.offsets().to_primitive().narrow()?;
    let offsets = self.compress_canonical(Canonical::Primitive(narrowed_offsets), offsets_ctx)?;

    let sizes_ctx = ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
    let narrowed_sizes = list_view.sizes().to_primitive().narrow()?;
    let sizes = self.compress_canonical(Canonical::Primitive(narrowed_sizes), sizes_ctx)?;

    let rebuilt = ListViewArray::try_new(elements, offsets, sizes, list_view.validity()?)?;
    Ok(rebuilt.into_array())
}
}
#[cfg(test)]
mod tests {
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::arrays::BoolArray;
use vortex_array::arrays::Constant;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::validity::Validity;
use vortex_buffer::buffer;
use super::*;
use crate::builtins::FloatDictScheme;
use crate::builtins::IntDictScheme;
use crate::builtins::StringDictScheme;
use crate::ctx::CompressorContext;
use crate::estimate::CompressionEstimate;
use crate::estimate::DeferredEstimate;
use crate::estimate::EstimateVerdict;
use crate::scheme::SchemeExt;
fn compressor() -> CascadingCompressor {
CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme])
}
/// Returns a four-element non-nullable `i32` array wrapped with default stats options.
fn estimate_test_data() -> ArrayAndStats {
    let values = buffer![1i32, 2, 3, 4];
    let array = PrimitiveArray::new(values, Validity::NonNullable).into_array();
    ArrayAndStats::new(array, GenerateStatsOptions::default())
}
/// Returns `true` only for primitive arrays whose `ptype` is an integer type.
fn matches_integer_primitive(canonical: &Canonical) -> bool {
    match canonical {
        Canonical::Primitive(primitive) => primitive.ptype().is_int(),
        _ => false,
    }
}
/// Test scheme that immediately reports a compression ratio of 2.0.
#[derive(Debug)]
struct DirectRatioScheme;

impl Scheme for DirectRatioScheme {
    fn scheme_name(&self) -> &'static str {
        "test.direct_ratio"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        matches_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        _stats: &mut ArrayAndStats,
        _cascade: CompressorContext,
    ) -> CompressionEstimate {
        let verdict = EstimateVerdict::Ratio(2.0);
        CompressionEstimate::Verdict(verdict)
    }

    /// Only the estimate is exercised by the tests; compressing is a test bug.
    fn compress(
        &self,
        _owner: &CascadingCompressor,
        _stats: &mut ArrayAndStats,
        _cascade: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        unreachable!("test helper should never be selected for compression")
    }
}
/// Test scheme that immediately reports an `AlwaysUse` verdict.
#[derive(Debug)]
struct ImmediateAlwaysUseScheme;

impl Scheme for ImmediateAlwaysUseScheme {
    fn scheme_name(&self) -> &'static str {
        "test.immediate_always_use"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        matches_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        _stats: &mut ArrayAndStats,
        _cascade: CompressorContext,
    ) -> CompressionEstimate {
        let verdict = EstimateVerdict::AlwaysUse;
        CompressionEstimate::Verdict(verdict)
    }

    /// Only the estimate is exercised by the tests; compressing is a test bug.
    fn compress(
        &self,
        _owner: &CascadingCompressor,
        _stats: &mut ArrayAndStats,
        _cascade: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        unreachable!("test helper should never be selected for compression")
    }
}
/// Test scheme whose deferred callback resolves to an `AlwaysUse` verdict.
#[derive(Debug)]
struct CallbackAlwaysUseScheme;

impl Scheme for CallbackAlwaysUseScheme {
    fn scheme_name(&self) -> &'static str {
        "test.callback_always_use"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        matches_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        _stats: &mut ArrayAndStats,
        _cascade: CompressorContext,
    ) -> CompressionEstimate {
        // The closure stays inline so its signature is inferred from the callback type.
        let deferred =
            DeferredEstimate::Callback(Box::new(|_, _, _| Ok(EstimateVerdict::AlwaysUse)));
        CompressionEstimate::Deferred(deferred)
    }

    /// Only the estimate is exercised by the tests; compressing is a test bug.
    fn compress(
        &self,
        _owner: &CascadingCompressor,
        _stats: &mut ArrayAndStats,
        _cascade: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        unreachable!("test helper should never be selected for compression")
    }
}
/// Test scheme whose deferred callback resolves to a `Skip` verdict.
#[derive(Debug)]
struct CallbackSkipScheme;

impl Scheme for CallbackSkipScheme {
    fn scheme_name(&self) -> &'static str {
        "test.callback_skip"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        matches_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        _stats: &mut ArrayAndStats,
        _cascade: CompressorContext,
    ) -> CompressionEstimate {
        // The closure stays inline so its signature is inferred from the callback type.
        let deferred = DeferredEstimate::Callback(Box::new(|_, _, _| Ok(EstimateVerdict::Skip)));
        CompressionEstimate::Deferred(deferred)
    }

    /// Only the estimate is exercised by the tests; compressing is a test bug.
    fn compress(
        &self,
        _owner: &CascadingCompressor,
        _stats: &mut ArrayAndStats,
        _cascade: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        unreachable!("test helper should never be selected for compression")
    }
}
/// Test scheme whose deferred callback resolves to a compression ratio of 3.0.
#[derive(Debug)]
struct CallbackRatioScheme;

impl Scheme for CallbackRatioScheme {
    fn scheme_name(&self) -> &'static str {
        "test.callback_ratio"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        matches_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        _stats: &mut ArrayAndStats,
        _cascade: CompressorContext,
    ) -> CompressionEstimate {
        // The closure stays inline so its signature is inferred from the callback type.
        let deferred =
            DeferredEstimate::Callback(Box::new(|_, _, _| Ok(EstimateVerdict::Ratio(3.0))));
        CompressionEstimate::Deferred(deferred)
    }

    /// Only the estimate is exercised by the tests; compressing is a test bug.
    fn compress(
        &self,
        _owner: &CascadingCompressor,
        _stats: &mut ArrayAndStats,
        _cascade: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        unreachable!("test helper should never be selected for compression")
    }
}
/// A scheme that already appears in the cascade history is excluded.
#[test]
fn test_self_exclusion() {
    let under_int_dict = CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);
    assert!(compressor().is_excluded(&IntDictScheme, &under_int_dict));
}
/// The root exclusion keeps `IntDictScheme` away from list offsets.
#[test]
fn test_root_exclusion_list_offsets() {
    let at_offsets = CompressorContext::default()
        .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
    assert!(compressor().is_excluded(&IntDictScheme, &at_offsets));
}
/// `IntDictScheme` is excluded beneath `FloatDictScheme` child 1 (the codes child,
/// going by this test's name).
#[test]
fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
    let under_float_dict =
        CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);
    assert!(compressor().is_excluded(&IntDictScheme, &under_float_dict));
}
/// `IntDictScheme` is excluded beneath `FloatDictScheme` child 0 (the values child,
/// going by this test's name).
#[test]
fn test_push_rule_float_dict_excludes_int_dict_from_values() {
    let under_float_dict =
        CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);
    assert!(compressor().is_excluded(&IntDictScheme, &under_float_dict));
}
/// With an empty cascade history no exclusion rule fires.
#[test]
fn test_no_exclusion_without_history() {
    let empty_history = CompressorContext::default();
    assert!(!compressor().is_excluded(&IntDictScheme, &empty_history));
}
/// An immediate `AlwaysUse` verdict beats an earlier scheme that reported a ratio.
#[test]
fn immediate_always_use_wins_immediately() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ImmediateAlwaysUseScheme];
    let compressor = CascadingCompressor::new(candidates.to_vec());
    let mut data = estimate_test_data();

    let winner =
        compressor.choose_best_scheme(&candidates, &mut data, CompressorContext::new())?;

    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::AlwaysUse))
            if scheme.id() == ImmediateAlwaysUseScheme.id()
    ));
    Ok(())
}
/// An `AlwaysUse` verdict produced by a callback beats an earlier ratio as well.
#[test]
fn callback_always_use_wins_immediately() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackAlwaysUseScheme];
    let compressor = CascadingCompressor::new(candidates.to_vec());
    let mut data = estimate_test_data();

    let winner =
        compressor.choose_best_scheme(&candidates, &mut data, CompressorContext::new())?;

    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::AlwaysUse))
            if scheme.id() == CallbackAlwaysUseScheme.id()
    ));
    Ok(())
}
/// A callback that resolves to `Skip` leaves the ratio-reporting scheme as the winner.
#[test]
fn callback_skip_is_ignored() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 2] = [&CallbackSkipScheme, &DirectRatioScheme];
    let compressor = CascadingCompressor::new(candidates.to_vec());
    let mut data = estimate_test_data();

    let winner =
        compressor.choose_best_scheme(&candidates, &mut data, CompressorContext::new())?;

    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::Ratio(2.0)))
            if scheme.id() == DirectRatioScheme.id()
    ));
    Ok(())
}
/// A ratio produced by a callback (3.0) beats a lower immediate ratio (2.0).
#[test]
fn callback_ratio_competes_numerically() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackRatioScheme];
    let compressor = CascadingCompressor::new(candidates.to_vec());
    let mut data = estimate_test_data();

    let winner =
        compressor.choose_best_scheme(&candidates, &mut data, CompressorContext::new())?;

    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::Ratio(3.0)))
            if scheme.id() == CallbackRatioScheme.id()
    ));
    Ok(())
}
/// An array whose five values are all invalid compresses to a constant array.
#[test]
fn all_null_array_compresses_to_constant() -> VortexResult<()> {
    let values = buffer![0i32, 0, 0, 0, 0];
    let all_invalid = Validity::Array(BoolArray::from_iter([false; 5]).into_array());
    let array = PrimitiveArray::new(values, all_invalid).into_array();

    let compressed = CascadingCompressor::new(vec![&IntDictScheme]).compress(&array)?;

    assert!(compressed.is::<Constant>());
    Ok(())
}
/// Sampling-based estimation with `FloatDictScheme` on a two-valued float array
/// completes and yields a finite ratio.
#[test]
fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
    let floats = buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0];
    let array = PrimitiveArray::new(floats, Validity::NonNullable).into_array();
    let compressor = CascadingCompressor::new(vec![&FloatDictScheme]);
    let sampling_ctx = CompressorContext::new().with_sampling();

    let ratio = estimate_compression_ratio_with_sampling(
        &FloatDictScheme,
        &compressor,
        &array,
        sampling_ctx,
    )?;

    assert!(ratio.is_finite());
    Ok(())
}
}