use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinArray;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::arrays::varbin::VarBinArrayExt;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_error::VortexResult;
use vortex_fsst::FSST;
use vortex_fsst::FSSTArrayExt;
use vortex_fsst::fsst_compress;
use vortex_fsst::fsst_train_compressor;
use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::Scheme;
use crate::SchemeExt;
/// Stateless marker type for the FSST string compression scheme.
///
/// The type has no fields; all behavior lives in its [`Scheme`] implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FSSTScheme;
impl Scheme for FSSTScheme {
    /// Returns the fixed name of this scheme, `vortex.string.fsst`.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.fsst"
    }

    /// Reports whether this scheme applies to `canonical`: true only when its dtype is UTF-8.
    fn matches(&self, canonical: &Canonical) -> bool {
        let dtype = canonical.dtype();
        dtype.is_utf8()
    }

    /// Number of child arrays that [`Self::compress`] hands to the cascading compressor.
    ///
    /// Child index 0 is the uncompressed lengths; child index 1 is the codes offsets.
    fn num_children(&self) -> usize {
        2
    }

    /// Always returns `CompressionEstimate::Deferred(DeferredEstimate::Sample)`; every
    /// argument is ignored.
    ///
    /// NOTE(review): presumably this asks the caller to estimate the ratio by compressing a
    /// sample -- confirm against the `DeferredEstimate` documentation.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let deferred = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(deferred)
    }

    /// FSST-compresses the string array in `data` and cascades compression into its
    /// integer children.
    ///
    /// Steps, in order:
    /// 1. Train an FSST compressor on the input and encode the input with it.
    /// 2. Execute the uncompressed lengths to a `PrimitiveArray`, narrow them, and compress
    ///    them as child 0.
    /// 3. Do the same for the codes offsets, as child 1.
    /// 4. Rebuild the codes `VarBinArray` around the compressed offsets, then rebuild the
    ///    FSST array around the new codes and the compressed lengths.
    ///
    /// # Errors
    ///
    /// Propagates any error from executing or narrowing a child array, from
    /// `compress_child`, from reading the codes validity, or from constructing the
    /// `VarBinArray` / FSST array.
    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        // Train on the input itself, then encode that same input.
        let strings = data.array_as_varbinview().into_owned();
        let trained = fsst_train_compressor(&strings);
        let encoded = fsst_compress(&strings, strings.len(), strings.dtype(), &trained, exec_ctx);

        // Child 0: the per-value uncompressed lengths.
        let lengths = encoded.uncompressed_lengths().clone();
        let lengths = lengths.execute::<PrimitiveArray>(exec_ctx)?;
        let lengths = lengths.narrow(exec_ctx)?;
        let compressed_lengths = compressor.compress_child(
            &lengths.into_array(),
            &compress_ctx,
            self.id(),
            0,
            exec_ctx,
        )?;

        // Child 1: the offsets into the FSST code bytes.
        let codes = encoded.codes();
        let offsets = codes.offsets().clone();
        let offsets = offsets.execute::<PrimitiveArray>(exec_ctx)?;
        let offsets = offsets.narrow(exec_ctx)?;
        let compressed_offsets = compressor.compress_child(
            &offsets.into_array(),
            &compress_ctx,
            self.id(),
            1,
            exec_ctx,
        )?;

        // Only the offsets are swapped out; bytes, dtype and validity are carried over as-is.
        let code_bytes = codes.bytes().clone();
        let code_dtype = codes.dtype().clone();
        let code_validity = codes.validity()?;
        let compressed_codes =
            VarBinArray::try_new(compressed_offsets, code_bytes, code_dtype, code_validity)?;

        // Reassemble the FSST array with the original symbol table and the compressed children.
        let rebuilt = FSST::try_new(
            encoded.dtype().clone(),
            encoded.symbols().clone(),
            encoded.symbol_lengths().clone(),
            compressed_codes,
            compressed_lengths,
            exec_ctx,
        )?;
        Ok(rebuilt.into_array())
    }
}