use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::CanonicalValidity;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::ExtensionArray;
use vortex_array::arrays::FixedSizeListArray;
use vortex_array::arrays::ListArray;
use vortex_array::arrays::ListViewArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::extension::ExtensionArrayExt;
use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
use vortex_array::arrays::list::ListArrayExt;
use vortex_array::arrays::listview::ListViewArrayExt;
use vortex_array::arrays::listview::list_from_list_view;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::arrays::scalar_fn::AnyScalarFn;
use vortex_array::arrays::struct_::StructArrayExt;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::scalar::Scalar;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use crate::builtins::IntDictScheme;
use crate::ctx::CompressorContext;
use crate::estimate::CompressionEstimate;
use crate::estimate::DeferredEstimate;
use crate::estimate::EstimateScore;
use crate::estimate::EstimateVerdict;
use crate::estimate::WinnerEstimate;
use crate::estimate::estimate_compression_ratio_with_sampling;
use crate::estimate::is_better_score;
use crate::scheme::ChildSelection;
use crate::scheme::DescendantExclusion;
use crate::scheme::Scheme;
use crate::scheme::SchemeExt;
use crate::scheme::SchemeId;
use crate::stats::ArrayAndStats;
use crate::stats::GenerateStatsOptions;
use crate::trace;
/// Pseudo-scheme id the compressor records (via `descend_with_scheme`) when it descends into
/// the offsets or sizes of a list array itself, rather than on behalf of a [`Scheme`].
pub(crate) const ROOT_SCHEME_ID: SchemeId = SchemeId { name: "vortex.compressor.root" };
/// Child indices paired with `ROOT_SCHEME_ID` when the compressor descends into the
/// components of a list array.
mod root_list_children {
    /// Child index recorded while compressing a list's (or list-view's) offsets.
    pub const OFFSETS: usize = 1;

    /// Child index recorded while compressing a list-view's sizes.
    pub const SIZES: usize = 2;
}
/// Recursive array compressor.
///
/// For every leaf array it visits, the compressor picks one of a fixed set of [`Scheme`]s
/// and exposes `compress_child` so that the chosen scheme can cascade into the children it
/// produces.
#[derive(Clone, Debug)]
pub struct CascadingCompressor {
    /// Every scheme the compressor may select from.
    schemes: Vec<&'static dyn Scheme>,
    /// Exclusion rules for children the compressor itself descends into under
    /// `ROOT_SCHEME_ID` (list offsets and sizes).
    root_exclusions: Vec<DescendantExclusion>,
}
impl CascadingCompressor {
    /// Creates a compressor that selects among `schemes`.
    ///
    /// One root-level rule is always installed: `IntDictScheme` is excluded from the
    /// offsets child that the compressor descends into for list arrays.
    pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
        let no_int_dict_on_offsets = DescendantExclusion {
            excluded: IntDictScheme.id(),
            children: ChildSelection::One(root_list_children::OFFSETS),
        };
        Self {
            schemes,
            root_exclusions: vec![no_int_dict_on_offsets],
        }
    }

    /// Compresses `array` starting from a fresh [`CompressorContext`].
    ///
    /// The input is executed into canonical form and compacted before being compressed.
    /// Input and output sizes are reported to a tracing span.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while canonicalizing or by the selected schemes.
    pub fn compress(
        &self,
        array: &ArrayRef,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let input_nbytes = array.nbytes();
        let span = trace::compress_span(array.len(), array.dtype(), input_nbytes);
        let _entered = span.enter();

        let canonical = Self::canonical_compact(array, exec_ctx)?;
        let output = self.compress_canonical(canonical, CompressorContext::new(), exec_ctx)?;

        trace::record_compress_outcome(&span, input_nbytes, output.nbytes());
        Ok(output)
    }

    /// Compresses `child`, the `child_index`-th child produced by the scheme `parent_id`.
    ///
    /// If `parent_ctx` reports that cascading is finished, `child` is returned unchanged.
    /// Otherwise it is canonicalized, compacted and compressed with a copy of `parent_ctx`
    /// descended by `(parent_id, child_index)`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while canonicalizing or by the selected schemes.
    pub fn compress_child(
        &self,
        child: &ArrayRef,
        parent_ctx: &CompressorContext,
        parent_id: SchemeId,
        child_index: usize,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        if !parent_ctx.finished_cascading() {
            let canonical = Self::canonical_compact(child, exec_ctx)?;
            let child_ctx = parent_ctx
                .clone()
                .descend_with_scheme(parent_id, child_index);
            return self.compress_canonical(canonical, child_ctx, exec_ctx);
        }

        trace::cascade_exhausted(parent_id, child_index);
        Ok(child.clone())
    }

    /// Executes `array` into its canonical representation and compacts the result.
    fn canonical_compact(
        array: &ArrayRef,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<Canonical> {
        let canonical = array.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
        let compacted = canonical.compact()?;
        Ok(compacted)
    }

    /// Compresses an already-canonical array according to its variant.
    ///
    /// - Bool, primitive, decimal and UTF-8 arrays go through scheme selection.
    /// - Struct fields, list elements and fixed-size-list elements are compressed
    ///   independently with [`Self::compress`].
    /// - List offsets and sizes are compressed under `ROOT_SCHEME_ID`.
    /// - Null arrays and non-UTF-8 binary views are returned as-is.
    /// - Variant arrays are rejected with an error.
    fn compress_canonical(
        &self,
        array: Canonical,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        match array {
            Canonical::Null(nulls) => Ok(nulls.into_array()),
            leaf @ (Canonical::Bool(_) | Canonical::Primitive(_) | Canonical::Decimal(_)) => {
                self.choose_and_compress(leaf, compress_ctx, exec_ctx)
            }
            Canonical::VarBinView(strings)
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable)) =>
            {
                self.choose_and_compress(Canonical::VarBinView(strings), compress_ctx, exec_ctx)
            }
            // Binary (non-UTF-8) views are left untouched.
            Canonical::VarBinView(bytes) => Ok(bytes.into_array()),
            Canonical::Struct(struct_array) => {
                let fields = struct_array
                    .iter_unmasked_fields()
                    .map(|field| self.compress(field, exec_ctx))
                    .collect::<VortexResult<Vec<_>>>()?;
                let rebuilt = StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity()?,
                )?;
                Ok(rebuilt.into_array())
            }
            Canonical::List(list_view) => {
                let representable_as_list =
                    list_view.is_zero_copy_to_list() || list_view.elements().is_empty();
                if representable_as_list {
                    let list = list_from_list_view(list_view)?;
                    self.compress_list_array(list, compress_ctx, exec_ctx)
                } else {
                    self.compress_list_view_array(list_view, compress_ctx, exec_ctx)
                }
            }
            Canonical::FixedSizeList(fsl) => {
                let elements = self.compress(fsl.elements(), exec_ctx)?;
                let rebuilt = FixedSizeListArray::try_new(
                    elements,
                    fsl.list_size(),
                    fsl.validity()?,
                    fsl.len(),
                )?;
                Ok(rebuilt.into_array())
            }
            Canonical::Extension(ext_array) => {
                let original_nbytes = ext_array.as_ref().nbytes();
                let whole = self.choose_and_compress(
                    Canonical::Extension(ext_array.clone()),
                    compress_ctx,
                    exec_ctx,
                )?;
                // Keep the whole-array encoding when it shrank the data or is a scalar-fn array.
                if whole.nbytes() < original_nbytes || whole.is::<AnyScalarFn>() {
                    return Ok(whole);
                }
                // Otherwise compress the storage array on its own and re-wrap it.
                let storage = self.compress(ext_array.storage_array(), exec_ctx)?;
                Ok(ExtensionArray::new(ext_array.ext_dtype().clone(), storage).into_array())
            }
            Canonical::Variant(_) => {
                vortex_bail!("Variant arrays can not be compressed")
            }
        }
    }

    /// Selects the best eligible scheme for `canonical` and applies it.
    ///
    /// A scheme is eligible when it `matches` the array and is not excluded by
    /// `is_excluded`. The input is returned unchanged in three cases:
    /// - nothing is eligible;
    /// - the array is empty;
    /// - no scheme wins selection.
    ///
    /// A fully-invalid array becomes a null `ConstantArray`. The winner's output is kept
    /// only if it is strictly smaller than the input or is an `AnyScalarFn` array.
    fn choose_and_compress(
        &self,
        canonical: Canonical,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let candidates: Vec<&'static dyn Scheme> = self
            .schemes
            .iter()
            .copied()
            .filter(|scheme| {
                scheme.matches(&canonical) && !self.is_excluded(*scheme, &compress_ctx)
            })
            .collect();

        let array: ArrayRef = canonical.into();
        if candidates.is_empty() || array.is_empty() {
            return Ok(array);
        }
        if array.all_invalid(exec_ctx)? {
            let null = Scalar::null(array.dtype().clone());
            return Ok(ConstantArray::new(null, array.len()).into_array());
        }

        let input_nbytes = array.nbytes();
        // Compute the union of every candidate's statistics requirements once, up front.
        let stats_options = candidates
            .iter()
            .fold(GenerateStatsOptions::default(), |merged, scheme| {
                merged.merge(scheme.stats_options())
            });
        let compress_ctx = compress_ctx.with_merged_stats_options(stats_options);
        let data = ArrayAndStats::new(array, stats_options);

        let selection =
            self.choose_best_scheme(&candidates, &data, compress_ctx.clone(), exec_ctx)?;
        let Some((winner, estimate)) = selection else {
            return Ok(data.into_array());
        };

        let error_ctx = trace::enabled_error_context(&compress_ctx);
        let _winner_span = trace::winner_compress_span(winner.id(), input_nbytes).entered();
        let compressed = winner
            .compress(self, &data, compress_ctx, exec_ctx)
            .inspect_err(|err| {
                trace::scheme_compress_failed(winner.id(), input_nbytes, error_ctx.as_ref(), err);
            })?;

        let output_nbytes = compressed.nbytes();
        let actual_ratio =
            (output_nbytes != 0).then(|| input_nbytes as f64 / output_nbytes as f64);
        let accepted = output_nbytes < input_nbytes || compressed.is::<AnyScalarFn>();
        trace::record_winner_compress_result(
            output_nbytes,
            estimate.trace_ratio(),
            actual_ratio,
            accepted,
        );

        Ok(if accepted {
            compressed
        } else {
            data.into_array()
        })
    }

    /// Picks the winning scheme among `schemes` for `data`, if any.
    ///
    /// The selection runs in two passes:
    /// 1. Immediate verdicts are collected first. `AlwaysUse` wins on the spot, ratios
    ///    compete through [`is_better_score`], and deferred estimates are postponed.
    /// 2. Postponed estimates are resolved in their original order. `Sample` estimates use
    ///    sampling-based estimation. Callbacks receive the best score found so far as a
    ///    threshold and may also return `AlwaysUse`.
    fn choose_best_scheme(
        &self,
        schemes: &[&'static dyn Scheme],
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
        let mut leader: Option<(&'static dyn Scheme, EstimateScore)> = None;
        let mut postponed: Vec<(&'static dyn Scheme, DeferredEstimate)> = Vec::new();

        // Pass one: everything that is available without extra work.
        {
            let _verdict_pass = trace::verdict_pass_span().entered();
            for &scheme in schemes {
                match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
                    CompressionEstimate::Verdict(verdict) => {
                        if Self::fold_verdict(scheme, verdict, &mut leader) {
                            return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
                        }
                    }
                    CompressionEstimate::Deferred(deferred) => {
                        postponed.push((scheme, deferred));
                    }
                }
            }
        }

        // Pass two: the more expensive, deferred estimates.
        for (scheme, deferred) in postponed {
            let _eval_span = trace::scheme_eval_span(scheme.id()).entered();
            let always_use = match deferred {
                DeferredEstimate::Sample => {
                    let score = estimate_compression_ratio_with_sampling(
                        self,
                        scheme,
                        data.array(),
                        compress_ctx.clone(),
                        exec_ctx,
                    )?;
                    if is_better_score(score, leader.as_ref()) {
                        leader = Some((scheme, score));
                    }
                    false
                }
                DeferredEstimate::Callback(callback) => {
                    let threshold = leader.map(|(_, score)| score);
                    let verdict =
                        callback(self, data, threshold, compress_ctx.clone(), exec_ctx)?;
                    Self::fold_verdict(scheme, verdict, &mut leader)
                }
            };
            if always_use {
                return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
            }
        }

        Ok(leader.map(|(scheme, score)| (scheme, WinnerEstimate::Score(score))))
    }

    /// Applies a resolved `verdict` for `scheme` to the running `leader`.
    ///
    /// Returns `true` for `AlwaysUse`; the caller should then pick `scheme` outright.
    /// `Skip` leaves `leader` untouched. `Ratio` replaces `leader` only when
    /// [`is_better_score`] prefers the new score.
    fn fold_verdict(
        scheme: &'static dyn Scheme,
        verdict: EstimateVerdict,
        leader: &mut Option<(&'static dyn Scheme, EstimateScore)>,
    ) -> bool {
        match verdict {
            EstimateVerdict::Skip => false,
            EstimateVerdict::AlwaysUse => true,
            EstimateVerdict::Ratio(ratio) => {
                let score = EstimateScore::FiniteCompression(ratio);
                if is_better_score(score, leader.as_ref()) {
                    *leader = Some((scheme, score));
                }
                false
            }
        }
    }

    /// Returns whether `candidate` may not be used in `ctx`.
    ///
    /// The checks run in this order:
    /// 1. Self-exclusion: the candidate already appears in the cascade history.
    /// 2. Root exclusions: applied only when the first history entry is `ROOT_SCHEME_ID`.
    /// 3. Descendant exclusions declared by the schemes named in the remaining history
    ///    entries.
    /// 4. The candidate's own ancestor exclusions, checked against the whole history.
    fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
        let candidate_id = candidate.id();
        let history = ctx.cascade_history();

        if history.iter().any(|&(scheme_id, _)| scheme_id == candidate_id) {
            return true;
        }

        let mut remaining = history.iter().copied().peekable();

        // The root entry (if it heads the history) is consumed here so the scheme loop
        // below never sees it.
        let root_forbids = remaining
            .next_if(|&(scheme_id, _)| scheme_id == ROOT_SCHEME_ID)
            .is_some_and(|(_, child_idx)| {
                self.root_exclusions
                    .iter()
                    .any(|rule| rule.excluded == candidate_id && rule.children.contains(child_idx))
            });
        if root_forbids {
            return true;
        }

        let ancestor_forbids = remaining.any(|(ancestor_id, child_idx)| {
            self.schemes
                .iter()
                .find(|scheme| scheme.id() == ancestor_id)
                .is_some_and(|ancestor| {
                    ancestor.descendant_exclusions().iter().any(|rule| {
                        rule.excluded == candidate_id && rule.children.contains(child_idx)
                    })
                })
        });
        if ancestor_forbids {
            return true;
        }

        for rule in candidate.ancestor_exclusions() {
            let beneath_forbidden_ancestor = history.iter().any(|&(scheme_id, child_idx)| {
                scheme_id == rule.ancestor && rule.children.contains(child_idx)
            });
            if beneath_forbidden_ancestor {
                return true;
            }
        }
        false
    }

    /// Compresses a list array.
    ///
    /// Elements are compressed from scratch with [`Self::compress`]. Offsets are executed to
    /// a [`PrimitiveArray`], passed through `narrow`, and compressed under
    /// `(ROOT_SCHEME_ID, OFFSETS)`. The list is first passed through `reset_offsets(true)`.
    fn compress_list_array(
        &self,
        list_array: ListArray,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let list_array = list_array.reset_offsets(true)?;
        let elements = self.compress(list_array.elements(), exec_ctx)?;

        let offsets_ctx =
            compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
        let narrowed_offsets = list_array
            .offsets()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?
            .narrow()?;
        let offsets = self.compress_canonical(
            Canonical::Primitive(narrowed_offsets),
            offsets_ctx,
            exec_ctx,
        )?;

        let rebuilt = ListArray::try_new(elements, offsets, list_array.validity()?)?;
        Ok(rebuilt.into_array())
    }

    /// Compresses a list-view array that cannot be converted to a plain list.
    ///
    /// Elements are compressed from scratch with [`Self::compress`]. Offsets and sizes are
    /// each executed to a [`PrimitiveArray`], passed through `narrow`, and compressed under
    /// `ROOT_SCHEME_ID` with their respective child indices.
    fn compress_list_view_array(
        &self,
        list_view: ListViewArray,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let elements = self.compress(list_view.elements(), exec_ctx)?;

        let offsets_ctx = compress_ctx
            .clone()
            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
        let narrowed_offsets = list_view
            .offsets()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?
            .narrow()?;
        let offsets = self.compress_canonical(
            Canonical::Primitive(narrowed_offsets),
            offsets_ctx,
            exec_ctx,
        )?;

        let sizes_ctx =
            compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
        let narrowed_sizes = list_view
            .sizes()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?
            .narrow()?;
        let sizes =
            self.compress_canonical(Canonical::Primitive(narrowed_sizes), sizes_ctx, exec_ctx)?;

        let rebuilt = ListViewArray::try_new(elements, offsets, sizes, list_view.validity()?)?;
        Ok(rebuilt.into_array())
    }
}
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use parking_lot::Mutex;
    use vortex_array::ArrayRef;
    use vortex_array::Canonical;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::NullArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_session::VortexSession;

    use super::*;
    use crate::builtins::FloatDictScheme;
    use crate::builtins::IntDictScheme;
    use crate::builtins::StringDictScheme;
    use crate::ctx::CompressorContext;
    use crate::estimate::CompressionEstimate;
    use crate::estimate::DeferredEstimate;
    use crate::estimate::EstimateScore;
    use crate::estimate::EstimateVerdict;
    use crate::estimate::WinnerEstimate;
    use crate::scheme::SchemeExt;

    /// Session used to create execution contexts in every test.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Compressor over the three built-in dictionary schemes, used by the exclusion tests.
    fn compressor() -> CascadingCompressor {
        CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme])
    }

    /// Small non-nullable `i32` array that every test scheme below matches.
    fn estimate_test_data() -> ArrayAndStats {
        let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array();
        ArrayAndStats::new(array, GenerateStatsOptions::default())
    }

    fn matches_integer_primitive(canonical: &Canonical) -> bool {
        matches!(canonical, Canonical::Primitive(primitive) if primitive.ptype().is_int())
    }

    // Deferred callbacks below are invoked as
    // `callback(compressor, data, threshold, compress_ctx, exec_ctx)`, so their closure
    // parameters are named in that order.

    /// Immediate verdict: ratio 2.0.
    #[derive(Debug)]
    struct DirectRatioScheme;
    impl Scheme for DirectRatioScheme {
        fn scheme_name(&self) -> &'static str {
            "test.direct_ratio"
        }
        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }
        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Verdict(EstimateVerdict::Ratio(2.0))
        }
        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            unreachable!("test helper should never be selected for compression")
        }
    }

    /// Immediate verdict: always use.
    #[derive(Debug)]
    struct ImmediateAlwaysUseScheme;
    impl Scheme for ImmediateAlwaysUseScheme {
        fn scheme_name(&self) -> &'static str {
            "test.immediate_always_use"
        }
        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }
        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
        }
        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            unreachable!("test helper should never be selected for compression")
        }
    }

    /// Deferred callback verdict: always use.
    #[derive(Debug)]
    struct CallbackAlwaysUseScheme;
    impl Scheme for CallbackAlwaysUseScheme {
        fn scheme_name(&self) -> &'static str {
            "test.callback_always_use"
        }
        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }
        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
                |_compressor, _data, _threshold, _ctx, _exec_ctx| Ok(EstimateVerdict::AlwaysUse),
            )))
        }
        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            unreachable!("test helper should never be selected for compression")
        }
    }

    /// Deferred callback verdict: skip.
    #[derive(Debug)]
    struct CallbackSkipScheme;
    impl Scheme for CallbackSkipScheme {
        fn scheme_name(&self) -> &'static str {
            "test.callback_skip"
        }
        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }
        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
                |_compressor, _data, _threshold, _ctx, _exec_ctx| Ok(EstimateVerdict::Skip),
            )))
        }
        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            unreachable!("test helper should never be selected for compression")
        }
    }

    /// Deferred callback verdict: ratio 3.0.
    #[derive(Debug)]
    struct CallbackRatioScheme;
    impl Scheme for CallbackRatioScheme {
        fn scheme_name(&self) -> &'static str {
            "test.callback_ratio"
        }
        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }
        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
                |_compressor, _data, _threshold, _ctx, _exec_ctx| Ok(EstimateVerdict::Ratio(3.0)),
            )))
        }
        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            unreachable!("test helper should never be selected for compression")
        }
    }

    /// Immediate verdict: ratio 100.0.
    #[derive(Debug)]
    struct HugeRatioScheme;
    impl Scheme for HugeRatioScheme {
        fn scheme_name(&self) -> &'static str {
            "test.huge_ratio"
        }
        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }
        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Verdict(EstimateVerdict::Ratio(100.0))
        }
        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            unreachable!("test helper should never be selected for compression")
        }
    }

    /// Deferred sampling estimate whose compressed output is a `NullArray`.
    #[derive(Debug)]
    struct ZeroBytesSamplingScheme;
    impl Scheme for ZeroBytesSamplingScheme {
        fn scheme_name(&self) -> &'static str {
            "test.zero_bytes_sampling"
        }
        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }
        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Deferred(DeferredEstimate::Sample)
        }
        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            Ok(NullArray::new(data.array().len()).into_array())
        }
    }

    #[test]
    fn test_self_exclusion() {
        let c = compressor();
        let ctx = CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);
        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_root_exclusion_list_offsets() {
        let c = compressor();
        let ctx = CompressorContext::default()
            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
        let c = compressor();
        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);
        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_values() {
        let c = compressor();
        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);
        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_no_exclusion_without_history() {
        let c = compressor();
        let ctx = CompressorContext::default();
        assert!(!c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn immediate_always_use_wins_immediately() -> VortexResult<()> {
        let compressor =
            CascadingCompressor::new(vec![&DirectRatioScheme, &ImmediateAlwaysUseScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ImmediateAlwaysUseScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == ImmediateAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_always_use_wins_immediately() -> VortexResult<()> {
        let compressor =
            CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackAlwaysUseScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackAlwaysUseScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == CallbackAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_skip_is_ignored() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&CallbackSkipScheme, &DirectRatioScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&CallbackSkipScheme, &DirectRatioScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(2.0))))
                if scheme.id() == DirectRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_ratio_competes_numerically() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackRatioScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackRatioScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(3.0))))
                if scheme.id() == CallbackRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn zero_byte_sample_loses_to_finite_ratio() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&HugeRatioScheme, &ZeroBytesSamplingScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &ZeroBytesSamplingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0))))
                if scheme.id() == HugeRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn finite_ratio_displaces_zero_byte_sample() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme, &HugeRatioScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&ZeroBytesSamplingScheme, &HugeRatioScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0))))
                if scheme.id() == HugeRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn zero_byte_sample_alone_selects_no_scheme() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme]);
        let schemes: [&'static dyn Scheme; 1] = [&ZeroBytesSamplingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;
        assert!(winner.is_none());
        Ok(())
    }

    /// Serializes the tests that reset and read `OBSERVED_THRESHOLD`.
    static OBSERVER_LOCK: Mutex<()> = Mutex::new(());
    /// Last threshold seen by `ThresholdObservingScheme`'s callback; `None` if it never ran.
    static OBSERVED_THRESHOLD: Mutex<Option<Option<EstimateScore>>> = Mutex::new(None);

    /// Deferred callback that records the threshold it is given, then skips.
    #[derive(Debug)]
    struct ThresholdObservingScheme;
    impl Scheme for ThresholdObservingScheme {
        fn scheme_name(&self) -> &'static str {
            "test.threshold_observing"
        }
        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }
        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
                |_compressor, _data, threshold, _ctx, _exec_ctx| {
                    *OBSERVED_THRESHOLD.lock() = Some(threshold);
                    Ok(EstimateVerdict::Skip)
                },
            )))
        }
        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            unreachable!("test helper should never be selected for compression")
        }
    }

    /// Deferred callback verdict: ratio 2.0 (same as `DirectRatioScheme`).
    #[derive(Debug)]
    struct CallbackMatchingRatioScheme;
    impl Scheme for CallbackMatchingRatioScheme {
        fn scheme_name(&self) -> &'static str {
            "test.callback_matching_ratio"
        }
        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }
        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
                |_compressor, _data, _threshold, _ctx, _exec_ctx| Ok(EstimateVerdict::Ratio(2.0)),
            )))
        }
        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            unreachable!("test helper should never be selected for compression")
        }
    }

    #[test]
    fn callback_always_use_overrides_pass_one_best() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&HugeRatioScheme, &CallbackAlwaysUseScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &CallbackAlwaysUseScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == CallbackAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn threshold_reflects_pass_one_best() -> VortexResult<()> {
        let _guard = OBSERVER_LOCK.lock();
        *OBSERVED_THRESHOLD.lock() = None;
        let compressor =
            CascadingCompressor::new(vec![&DirectRatioScheme, &ThresholdObservingScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ThresholdObservingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
        let observed = *OBSERVED_THRESHOLD.lock();
        assert!(matches!(
            observed,
            Some(Some(EstimateScore::FiniteCompression(r))) if r == 2.0
        ));
        Ok(())
    }

    #[test]
    fn threshold_is_none_when_only_prior_is_zero_bytes() -> VortexResult<()> {
        let _guard = OBSERVER_LOCK.lock();
        *OBSERVED_THRESHOLD.lock() = None;
        let compressor =
            CascadingCompressor::new(vec![&ZeroBytesSamplingScheme, &ThresholdObservingScheme]);
        let schemes: [&'static dyn Scheme; 2] =
            [&ZeroBytesSamplingScheme, &ThresholdObservingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
        let observed = *OBSERVED_THRESHOLD.lock();
        assert_eq!(observed, Some(None));
        Ok(())
    }

    #[test]
    fn threshold_is_none_when_no_prior_scheme() -> VortexResult<()> {
        let _guard = OBSERVER_LOCK.lock();
        *OBSERVED_THRESHOLD.lock() = None;
        let compressor = CascadingCompressor::new(vec![&ThresholdObservingScheme]);
        let schemes: [&'static dyn Scheme; 1] = [&ThresholdObservingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
        let observed = *OBSERVED_THRESHOLD.lock();
        assert_eq!(observed, Some(None));
        Ok(())
    }

    #[test]
    fn threshold_updates_from_earlier_deferred_callback() -> VortexResult<()> {
        let _guard = OBSERVER_LOCK.lock();
        *OBSERVED_THRESHOLD.lock() = None;
        let compressor =
            CascadingCompressor::new(vec![&CallbackRatioScheme, &ThresholdObservingScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&CallbackRatioScheme, &ThresholdObservingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
        let observed = *OBSERVED_THRESHOLD.lock();
        assert!(matches!(
            observed,
            Some(Some(EstimateScore::FiniteCompression(r))) if r == 3.0
        ));
        Ok(())
    }

    #[test]
    fn ratio_tie_between_immediate_and_deferred_favors_immediate() -> VortexResult<()> {
        let compressor =
            CascadingCompressor::new(vec![&CallbackMatchingRatioScheme, &DirectRatioScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&CallbackMatchingRatioScheme, &DirectRatioScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r))))
                if scheme.id() == DirectRatioScheme.id() && r == 2.0
        ));
        Ok(())
    }

    #[test]
    fn all_null_array_compresses_to_constant() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![0i32, 0, 0, 0, 0],
            Validity::Array(BoolArray::from_iter([false, false, false, false, false]).into_array()),
        )
        .into_array();
        let compressor = CascadingCompressor::new(vec![&IntDictScheme]);
        let mut exec_ctx = SESSION.create_execution_ctx();
        let compressed = compressor.compress(&array, &mut exec_ctx)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    #[test]
    fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0],
            Validity::NonNullable,
        )
        .into_array();
        let compressor = CascadingCompressor::new(vec![&FloatDictScheme]);
        let ctx = CompressorContext::new().with_sampling();
        let mut exec_ctx = SESSION.create_execution_ctx();
        let score = estimate_compression_ratio_with_sampling(
            &compressor,
            &FloatDictScheme,
            &array,
            ctx,
            &mut exec_ctx,
        )?;
        assert!(matches!(score, EstimateScore::FiniteCompression(ratio) if ratio.is_finite()));
        Ok(())
    }
}