use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::arrays::ConstantArray;
use vortex_array::scalar::Scalar;
use vortex_array::vtable::VTable;
use vortex_error::VortexResult;
use crate::BtrBlocksCompressor;
use crate::CompressorContext;
use crate::CompressorStats;
use crate::Scheme;
// Submodules of the compressor. `patches` and `rle` are private to this
// module; the others are visible throughout the crate.
pub(crate) mod decimal;
pub(crate) mod float;
pub(crate) mod integer;
mod patches;
mod rle;
pub(crate) mod string;
pub(crate) mod temporal;
/// Reference value for cascading: `CompressorExt::choose_scheme` derives the
/// depth it reports in trace output by subtracting `ctx.allowed_cascading`
/// from this constant.
// NOTE(review): presumably the maximum number of cascading levels — confirm.
pub(crate) const MAX_CASCADE: usize = 3;
/// Describes a compressor for one array type: how to gather statistics for an
/// array and which compression schemes are available for it.
///
/// The selection and compression logic itself is provided by [`CompressorExt`],
/// which builds on the items declared here.
pub trait Compressor {
/// VTable of the array type this compressor operates on; `gen_stats` takes
/// `<Self::ArrayVTable as VTable>::Array`.
type ArrayVTable: VTable;
/// Scheme type for this compressor. Its `StatsType` must match
/// [`Compressor::StatsType`]. It may be unsized (`?Sized`).
type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
/// Statistics type, tied to the same `ArrayVTable` as the compressor.
type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
/// Produces the statistics for `array` that schemes consume.
fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType;
/// Returns the candidate schemes; `CompressorExt::choose_scheme` evaluates
/// them in slice order.
fn schemes(&self) -> &[&'static Self::SchemeType];
/// Returns the scheme `CompressorExt::choose_scheme` falls back to when no
/// candidate reports an expected compression ratio above `1.0`.
fn default_scheme(&self) -> &'static Self::SchemeType;
}
/// Scheme selection and compression driver built on top of [`Compressor`].
///
/// Both methods have default bodies; a type only needs to implement
/// [`Compressor`] to use them.
pub trait CompressorExt: Compressor
where
    Self::SchemeType: 'static,
{
    /// Selects the scheme with the highest expected compression ratio.
    ///
    /// Candidates come from [`Compressor::schemes`] and are evaluated in order.
    /// A candidate is skipped when its code appears in `excludes`, or when
    /// `ctx.is_sample` is set and the scheme reports `is_constant()`.
    ///
    /// A candidate only wins if its ratio is strictly greater than both `1.0`
    /// and every earlier candidate's ratio, so the first of several equal
    /// ratios is kept. Ratios that are sub-normal, infinite or NaN are logged
    /// and ignored. If no candidate wins, [`Compressor::default_scheme`] is
    /// returned.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by a scheme's
    /// `expected_compression_ratio`.
    // NOTE(review): presumably the lint fires because of the expanded tracing
    // macros rather than the selection logic itself — confirm before removing.
    #[allow(clippy::cognitive_complexity)]
    fn choose_scheme(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<&'static Self::SchemeType> {
        // A scheme has to beat a ratio of 1.0 to be considered at all.
        let mut best_ratio = 1.0;
        let mut best_scheme: Option<&'static Self::SchemeType> = None;

        // Only used for trace output. Saturating keeps a context with more than
        // `MAX_CASCADE` remaining levels from panicking (debug) or wrapping
        // (release) inside what is purely a logging helper.
        let depth = MAX_CASCADE.saturating_sub(ctx.allowed_cascading);

        for scheme in self.schemes().iter() {
            if excludes.contains(&scheme.code()) {
                continue;
            }
            // Constant schemes are never considered while evaluating a sample.
            if ctx.is_sample && scheme.is_constant() {
                continue;
            }

            tracing::trace!(
                is_sample = ctx.is_sample,
                depth,
                is_constant = scheme.is_constant(),
                ?scheme,
                "Trying compression scheme"
            );
            let ratio = scheme.expected_compression_ratio(compressor, stats, ctx, excludes)?;
            tracing::trace!(
                is_sample = ctx.is_sample,
                depth,
                ratio,
                ?scheme,
                "Expected compression result"
            );

            // Reject ratios that cannot be compared meaningfully.
            if ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan() {
                tracing::trace!(
                    "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
                );
                continue;
            }

            // Strict comparison: ties keep the earlier scheme.
            if ratio > best_ratio {
                best_ratio = ratio;
                best_scheme = Some(*scheme);
            }
        }

        tracing::trace!(depth, scheme = ?best_scheme, ratio = best_ratio, "best scheme found");

        Ok(best_scheme.unwrap_or_else(|| self.default_scheme()))
    }

    /// Compresses `array` with the best scheme available for it.
    ///
    /// * An empty array is returned as-is.
    /// * An array whose values are all invalid becomes a [`ConstantArray`] of
    ///   the null scalar, with the same dtype and length.
    /// * Otherwise statistics are generated, a scheme is chosen via
    ///   [`CompressorExt::choose_scheme`] and applied. The result is kept only
    ///   if it is strictly smaller (by `nbytes`) than the input; if not, the
    ///   original array is returned.
    ///
    /// # Errors
    ///
    /// Propagates errors from the validity check, from scheme selection and
    /// from the chosen scheme's `compress`.
    fn compress(
        &self,
        btr_blocks_compressor: &BtrBlocksCompressor,
        array: &<<Self as Compressor>::ArrayVTable as VTable>::Array,
        ctx: CompressorContext,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<ArrayRef> {
        if array.is_empty() {
            return Ok(array.to_array());
        }

        if array.all_invalid()? {
            return Ok(
                ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
            );
        }

        let stats = self.gen_stats(array);
        let best_scheme = self.choose_scheme(btr_blocks_compressor, &stats, ctx, excludes)?;
        let output = best_scheme.compress(btr_blocks_compressor, &stats, ctx, excludes)?;

        // Keep the compressed form only when it actually saves space.
        if output.nbytes() < array.nbytes() {
            Ok(output)
        } else {
            tracing::debug!("resulting tree too large: {}", output.encoding_id());
            Ok(array.to_array())
        }
    }
}
// Blanket implementation: every `Compressor` whose `SchemeType` is `'static`
// gets the `CompressorExt` methods without writing any code.
impl<T: Compressor> CompressorExt for T where T::SchemeType: 'static {}