use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_compressor::scheme::SchemeId;
use vortex_error::VortexResult;
use vortex_onpair::DEFAULT_DICT12_CONFIG;
use vortex_onpair::OnPair;
use vortex_onpair::OnPairArrayExt;
use vortex_onpair::OnPairArraySlotsExt;
use vortex_onpair::onpair_compress;
use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::Scheme;
use crate::SchemeExt;
use crate::schemes::integer::try_compress_delta;
/// Zero-sized marker type for the OnPair string compression scheme.
///
/// All behavior lives in this type's [`Scheme`] implementation; the struct
/// itself carries no state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct OnPairScheme;
/// [`Scheme`] implementation that OnPair-encodes UTF-8 data and then cascades
/// compression into the four array children of the encoded result.
impl Scheme for OnPairScheme {
    /// Returns the scheme's name, `vortex.string.onpair`.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.onpair"
    }

    /// Accepts only canonical arrays whose dtype is UTF-8.
    fn matches(&self, canonical: &Canonical) -> bool {
        canonical.dtype().is_utf8()
    }

    /// Number of child slots this scheme compresses: dictionary offsets,
    /// codes, code offsets and uncompressed lengths (child indices 0 through 3
    /// in [`Scheme::compress`]).
    fn num_children(&self) -> usize {
        4
    }

    /// Always defers the estimate to sampling; none of the arguments are
    /// inspected.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let deferred = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(deferred)
    }

    /// OnPair-encodes `data` with [`DEFAULT_DICT12_CONFIG`], compresses each of
    /// the encoded array's children through `compressor`, and reassembles the
    /// result into a new OnPair array.
    ///
    /// The two offsets children (indices 0 and 2) go through
    /// `compress_offsets_child`; the codes and uncompressed-lengths children
    /// (indices 1 and 3) go through `compress_primitive_child`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the OnPair encoder, from compressing a child,
    /// or from `OnPair::try_new`.
    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let strings = data.array_as_varbinview().into_owned();
        let encoded = onpair_compress(
            &strings,
            strings.len(),
            strings.dtype(),
            DEFAULT_DICT12_CONFIG,
        )?;

        // Shared by every child compression call below.
        let scheme_id = self.id();
        let ctx = &compress_ctx;

        // Children are compressed in slot order: 0, 1, 2, 3.
        let dict_offsets_child = compress_offsets_child(
            compressor,
            encoded.dict_offsets(),
            ctx,
            scheme_id,
            0,
            exec_ctx,
        )?;
        let codes_child = compress_primitive_child(
            compressor,
            encoded.codes(),
            ctx,
            scheme_id,
            1,
            exec_ctx,
        )?;
        let codes_offsets_child = compress_offsets_child(
            compressor,
            encoded.codes_offsets(),
            ctx,
            scheme_id,
            2,
            exec_ctx,
        )?;
        let uncompressed_lengths_child = compress_primitive_child(
            compressor,
            encoded.uncompressed_lengths(),
            ctx,
            scheme_id,
            3,
            exec_ctx,
        )?;

        // Rebuild the OnPair array around the compressed children; dtype,
        // dictionary bytes, validity and bit width are carried over from the
        // freshly encoded array.
        let rebuilt = OnPair::try_new(
            encoded.dtype().clone(),
            encoded.dict_bytes_handle().clone(),
            dict_offsets_child,
            codes_child,
            codes_offsets_child,
            uncompressed_lengths_child,
            encoded.array_validity(),
            encoded.bits(),
        )?;
        Ok(rebuilt.into_array())
    }
}
/// Compresses one child array of the OnPair encoding.
///
/// The child is executed into a [`PrimitiveArray`], passed through `narrow`
/// (presumably shrinking it to a smaller integer width — confirm against
/// `PrimitiveArrayExt::narrow`), and then handed to
/// [`CascadingCompressor::compress_child`] for slot `child_idx` of the scheme
/// identified by `scheme_id`.
///
/// # Errors
///
/// Propagates failures from execution, narrowing, or the child compression.
fn compress_primitive_child(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    compress_ctx: &CompressorContext,
    scheme_id: SchemeId,
    child_idx: usize,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let primitive = child.clone().execute::<PrimitiveArray>(exec_ctx)?;
    let narrowed = primitive.narrow(exec_ctx)?.into_array();
    compressor.compress_child(&narrowed, compress_ctx, scheme_id, child_idx, exec_ctx)
}
/// Minimum length an offsets child must have before delta compression is
/// attempted in addition to the regular child compression.
const OFFSETS_DELTA_MIN_LEN: usize = 2048;

/// Compresses an offsets child, additionally trying delta compression when
/// the child has at least [`OFFSETS_DELTA_MIN_LEN`] elements.
///
/// The child is first executed into a [`PrimitiveArray`] and passed through
/// `narrow`, then compressed via [`CascadingCompressor::compress_child`]. For
/// children below the length threshold that result is returned directly.
/// Otherwise `try_compress_delta` is also run on the narrowed array, and its
/// result is returned only when it is strictly smaller (by `nbytes`) than the
/// regular result; ties keep the regular result.
///
/// # Errors
///
/// Propagates failures from execution, narrowing, or either compression path.
fn compress_offsets_child(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    compress_ctx: &CompressorContext,
    scheme_id: SchemeId,
    child_idx: usize,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let primitive = child.clone().execute::<PrimitiveArray>(exec_ctx)?;
    let narrowed = primitive.narrow(exec_ctx)?.into_array();

    // The regular result is always computed: it is either the answer or the
    // baseline the delta candidate has to beat.
    let plain =
        compressor.compress_child(&narrowed, compress_ctx, scheme_id, child_idx, exec_ctx)?;

    if narrowed.len() >= OFFSETS_DELTA_MIN_LEN {
        let delta = try_compress_delta(
            compressor,
            &narrowed,
            compress_ctx,
            scheme_id,
            child_idx,
            exec_ctx,
        )?;
        if delta.nbytes() < plain.nbytes() {
            return Ok(delta);
        }
    }

    Ok(plain)
}