use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_compressor::builtins::BinaryDictScheme;
use vortex_compressor::builtins::FloatDictScheme;
use vortex_compressor::builtins::IntDictScheme;
use vortex_compressor::builtins::StringDictScheme;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_compressor::scheme::AncestorExclusion;
use vortex_compressor::scheme::ChildSelection;
use vortex_compressor::scheme::DescendantExclusion;
use vortex_error::VortexResult;
use vortex_runend::RunEnd;
use vortex_runend::compress::runend_encode;
use super::IntRLEScheme;
use super::SparseScheme;
use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::Scheme;
use crate::SchemeExt;
/// Minimum average run length for run-end encoding to be considered.
///
/// `RunEndScheme::expected_compression_ratio` returns a `Skip` verdict when the array's
/// average run length is strictly below this value.
const RUN_END_THRESHOLD: u32 = 4;
/// Compression scheme that run-end encodes integer arrays.
///
/// The scheme is named `"vortex.int.runend"`, matches canonical arrays whose dtype is an
/// integer, and produces a `RunEnd` array whose values and ends children are themselves
/// passed back through the cascading compressor.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;
impl Scheme for RunEndScheme {
fn scheme_name(&self) -> &'static str {
"vortex.int.runend"
}
fn matches(&self, canonical: &Canonical) -> bool {
canonical.dtype().is_int()
}
fn num_children(&self) -> usize {
2
}
fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
vec![
DescendantExclusion {
excluded: IntDictScheme.id(),
children: ChildSelection::One(1),
},
DescendantExclusion {
excluded: RunEndScheme.id(),
children: ChildSelection::One(1),
},
DescendantExclusion {
excluded: IntRLEScheme.id(),
children: ChildSelection::One(1),
},
DescendantExclusion {
excluded: SparseScheme.id(),
children: ChildSelection::One(1),
},
]
}
fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
vec![
AncestorExclusion {
ancestor: IntDictScheme.id(),
children: ChildSelection::One(0),
},
AncestorExclusion {
ancestor: FloatDictScheme.id(),
children: ChildSelection::One(0),
},
AncestorExclusion {
ancestor: StringDictScheme.id(),
children: ChildSelection::One(0),
},
AncestorExclusion {
ancestor: BinaryDictScheme.id(),
children: ChildSelection::One(0),
},
]
}
fn expected_compression_ratio(
&self,
data: &ArrayAndStats,
_compress_ctx: CompressorContext,
exec_ctx: &mut ExecutionCtx,
) -> CompressionEstimate {
if data.integer_stats(exec_ctx).average_run_length() < RUN_END_THRESHOLD {
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}
fn compress(
&self,
compressor: &CascadingCompressor,
data: &ArrayAndStats,
compress_ctx: CompressorContext,
exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let (ends, values) = runend_encode(data.array_as_primitive(), exec_ctx);
let values_primitive = values.execute::<PrimitiveArray>(exec_ctx)?;
let compressed_values = compressor.compress_child(
&values_primitive.into_array(),
&compress_ctx,
self.id(),
0,
exec_ctx,
)?;
let compressed_ends =
compressor.compress_child(&ends.into_array(), &compress_ctx, self.id(), 1, exec_ctx)?;
Ok(unsafe {
RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
.into_array()
})
}
}