use std::fmt;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_error::VortexResult;
use crate::CascadingCompressor;
use crate::ctx::CompressorContext;
use crate::sample::SAMPLE_SIZE;
use crate::sample::sample;
use crate::sample::sample_count_approx_one_percent;
use crate::scheme::Scheme;
use crate::scheme::SchemeExt;
use crate::stats::ArrayAndStats;
use crate::trace;
/// Type of the boxed closure carried by [`DeferredEstimate::Callback`].
///
/// The closure can be called at most once (`FnOnce`) and must be `Send + Sync`. Its
/// arguments are, in order: the compressor, the array together with its stats, an optional
/// [`EstimateScore`], a [`CompressorContext`] passed by value, and a mutable
/// [`ExecutionCtx`]. It returns an [`EstimateVerdict`] wrapped in a `VortexResult`.
///
/// NOTE(review): the `Option<EstimateScore>` argument is presumably the best score found
/// so far; confirm against the code that invokes the callback.
#[rustfmt::skip]
pub type EstimateFn = dyn FnOnce(
&CascadingCompressor,
&ArrayAndStats,
Option<EstimateScore>,
CompressorContext,
&mut ExecutionCtx,
) -> VortexResult<EstimateVerdict>
+ Send
+ Sync;
/// A compression estimate that is either available immediately or deferred.
#[derive(Debug)]
pub enum CompressionEstimate {
/// The verdict is already known.
Verdict(EstimateVerdict),
/// The verdict still has to be produced; see [`DeferredEstimate`] for the ways to do so.
Deferred(DeferredEstimate),
}
/// A final decision about a candidate, as opposed to a [`DeferredEstimate`].
///
/// NOTE(review): the code consuming these variants is not in this file; the per-variant
/// notes marked "presumably" are inferred from the names and should be confirmed.
#[derive(Debug)]
pub enum EstimateVerdict {
/// Presumably: the candidate should not be considered.
Skip,
/// Presumably: the candidate should be chosen without comparing ratios.
AlwaysUse,
/// An estimated compression ratio.
Ratio(f64),
}
/// An estimate that has not been computed yet.
///
/// `Debug` is implemented by hand for this type because the boxed closure in `Callback`
/// does not implement `Debug`, so it cannot be derived.
pub enum DeferredEstimate {
/// Presumably: obtain the estimate by compressing a sample (see
/// `estimate_compression_ratio_with_sampling`); confirm against the caller.
Sample,
/// Obtain the verdict by invoking the boxed [`EstimateFn`] closure.
Callback(Box<EstimateFn>),
}
/// Score describing how well a sample compressed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EstimateScore {
/// Ratio of the size before compression to the size after compression, in bytes.
FiniteCompression(f64),
/// The compressed result occupied zero bytes, so no finite ratio can be computed.
ZeroBytes,
}
impl EstimateScore {
    /// Builds a score from the byte sizes measured before and after compression.
    ///
    /// A compressed size of zero yields [`EstimateScore::ZeroBytes`]; otherwise the score is
    /// `before_nbytes / after_nbytes` computed in `f64`.
    pub(super) fn from_sample_sizes(before_nbytes: u64, after_nbytes: u64) -> Self {
        match after_nbytes {
            // Dividing by zero would give an infinite ratio, so it gets its own variant.
            0 => Self::ZeroBytes,
            compressed_nbytes => {
                Self::FiniteCompression(before_nbytes as f64 / compressed_nbytes as f64)
            }
        }
    }

    /// Returns the stored ratio, or `None` for [`EstimateScore::ZeroBytes`].
    pub fn finite_ratio(self) -> Option<f64> {
        if let Self::FiniteCompression(ratio) = self {
            Some(ratio)
        } else {
            None
        }
    }

    /// Returns `true` only for a finite, non-subnormal ratio strictly greater than `1.0`.
    ///
    /// [`EstimateScore::ZeroBytes`] is never valid.
    fn is_valid(self) -> bool {
        self.finite_ratio()
            .is_some_and(|ratio| ratio > 1.0 && ratio.is_finite() && !ratio.is_subnormal())
    }

    /// Returns `true` when `self` should replace `other` as the best score.
    ///
    /// A zero-byte score never wins, a finite score always wins over a zero-byte one, and
    /// two finite scores are compared by strictly greater ratio.
    fn beats(self, other: Self) -> bool {
        let Self::FiniteCompression(ratio) = self else {
            return false;
        };
        match other {
            Self::ZeroBytes => true,
            Self::FiniteCompression(best_ratio) => ratio > best_ratio,
        }
    }
}
/// Estimate recorded for a selected candidate.
///
/// NOTE(review): the selection logic that produces this value is not in this file; the
/// variant notes follow the names and `trace_ratio`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(super) enum WinnerEstimate {
/// No score is attached; `trace_ratio` reports `None` for this variant.
AlwaysUse,
/// The candidate carries an [`EstimateScore`].
Score(EstimateScore),
}
impl WinnerEstimate {
    /// Returns the finite compression ratio carried by this estimate, if any.
    ///
    /// `AlwaysUse` has no score and therefore yields `None`; a `Score` defers to
    /// [`EstimateScore::finite_ratio`].
    pub(super) fn trace_ratio(self) -> Option<f64> {
        if let Self::Score(score) = self {
            score.finite_ratio()
        } else {
            None
        }
    }
}
/// Decides whether `score` should replace the current best candidate.
///
/// An invalid score never wins. A valid score wins when there is no current best, or when
/// it beats the score stored alongside the current best scheme.
pub(super) fn is_better_score(
    score: EstimateScore,
    best: Option<&(&'static dyn Scheme, EstimateScore)>,
) -> bool {
    if !score.is_valid() {
        return false;
    }
    match best {
        None => true,
        Some((_, incumbent)) => score.beats(*incumbent),
    }
}
/// Estimates how well `scheme` compresses `array` by compressing a sample of it.
///
/// If `compress_ctx` already reports being a sample, `array` itself is used. Otherwise a
/// sample is drawn with `sample`, using `SAMPLE_SIZE` and a count obtained from
/// `sample_count_approx_one_percent(array.len())`, and executed into canonical form.
///
/// The sample is compressed with `scheme`, and the byte sizes before and after are turned
/// into an [`EstimateScore`].
///
/// # Errors
///
/// Returns any error raised while executing the sample or while compressing it. A
/// compression failure is reported through `trace::sample_compress_failed` before being
/// returned.
pub(super) fn estimate_compression_ratio_with_sampling<S: Scheme + ?Sized>(
    compressor: &CascadingCompressor,
    scheme: &S,
    array: &ArrayRef,
    compress_ctx: CompressorContext,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<EstimateScore> {
    let sample_array = if compress_ctx.is_sample() {
        array.clone()
    } else {
        let n_samples = sample_count_approx_one_percent(array.len());
        let sampled = sample(array, SAMPLE_SIZE, n_samples);
        let canonical: Canonical = sampled.execute(exec_ctx)?;
        canonical.into_array()
    };

    let sample_data = ArrayAndStats::new(sample_array, scheme.stats_options());

    // The error context is taken from the original context, before `with_sampling()` is
    // called on it.
    let error_ctx = trace::enabled_error_context(&compress_ctx);
    let sample_ctx = compress_ctx.with_sampling();

    let compressed = scheme
        .compress(compressor, &sample_data, sample_ctx, exec_ctx)
        .inspect_err(|err| {
            trace::sample_compress_failed(scheme.id(), error_ctx.as_ref(), err);
        })?;

    let compressed_nbytes = compressed.nbytes();
    let sample_nbytes = sample_data.array().nbytes();

    let score = EstimateScore::from_sample_sizes(sample_nbytes, compressed_nbytes);
    if score == EstimateScore::ZeroBytes {
        trace::zero_byte_sample_result(scheme.id(), sample_nbytes);
    }
    Ok(score)
}
impl fmt::Debug for DeferredEstimate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DeferredEstimate::Sample => write!(f, "Sample"),
DeferredEstimate::Callback(_) => write!(f, "Callback(..)"),
}
}
}