// vortex_sampling_compressor/compressors/fsst.rs
use std::any::Any;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use fsst::Compressor;
use vortex::array::{VarBin, VarBinView};
use vortex::encoding::EncodingRef;
use vortex::{ArrayDType, ArrayDef, IntoArray};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTArray, FSSTEncoding, FSST};
use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor};
use crate::SamplingCompressor;
/// [`EncodingCompressor`] that compresses `Utf8` string arrays (stored as `VarBin` or
/// `VarBinView`) into `FSSTArray`s.
#[derive(Debug)]
pub struct FSSTCompressor;
/// Upper bound, in bytes, on the size of an FSST symbol table (2295 bytes).
///
/// Expressed as 255 entries of (8 + 1) bytes. This presumably means up to 255 symbols of at
/// most 8 bytes each, plus one length byte per symbol (TODO confirm against the `fsst` crate).
/// `compress` uses it as the unit for its minimum-input-size threshold.
const FSST_SYMTAB_MAX_SIZE: usize = 255 * (8 + 1);
/// Allows a trained FSST [`Compressor`] to be carried as [`EncoderMetadata`] in a
/// compression tree, and recovered later with `as_any().downcast_ref::<Compressor>()`.
impl EncoderMetadata for Compressor {
    fn as_any(&self) -> &dyn Any {
        // Explicit type-erasure to `Any` so callers can downcast back to `Compressor`.
        self as &dyn Any
    }
}
/// Returns `true` when `array` uses one of the encodings that [`fsst_compress`] is applied to
/// here: `VarBin` or `VarBinView`.
fn is_fsst_input_encoding(array: &vortex::Array) -> bool {
    array.is_encoding(VarBin::ID) || array.is_encoding(VarBinView::ID)
}

impl EncodingCompressor for FSSTCompressor {
    fn id(&self) -> &str {
        FSST::ID.as_ref()
    }

    /// Accepts only arrays whose dtype is `Utf8` and whose encoding is `VarBin` or `VarBinView`.
    fn can_compress(&self, array: &vortex::Array) -> Option<&dyn EncodingCompressor> {
        if !matches!(array.dtype(), &DType::Utf8(_)) || !is_fsst_input_encoding(array) {
            return None;
        }
        Some(self)
    }

    /// Compresses a `VarBin`/`VarBinView` array with FSST.
    ///
    /// Arrays smaller than `10 * FSST_SYMTAB_MAX_SIZE` bytes are returned uncompressed.
    /// Otherwise, the symbol-table compressor stored as metadata on `like` is reused when
    /// present; if there is none, a new one is trained on `array`. The resulting
    /// `uncompressed_lengths` child is compressed further through `ctx`, excluding this
    /// compressor. It is recorded as child 0 of the returned compression tree, alongside the
    /// compressor as metadata.
    ///
    /// # Errors
    ///
    /// Fails in any of these cases:
    /// - `array` is neither `VarBin` nor `VarBinView`.
    /// - The metadata carried by `like` is not an FSST `Compressor`.
    /// - Training, FSST compression, child compression, or `FSSTArray` construction fails.
    fn compress<'a>(
        &'a self,
        array: &vortex::Array,
        like: Option<CompressionTree<'a>>,
        ctx: SamplingCompressor<'a>,
    ) -> VortexResult<CompressedArray<'a>> {
        // The symbol table adds up to FSST_SYMTAB_MAX_SIZE bytes of fixed overhead. Below this
        // threshold the array is presumably too small to amortize it, so leave it uncompressed.
        if array.nbytes() < 10 * FSST_SYMTAB_MAX_SIZE {
            return Ok(CompressedArray::uncompressed(array.clone()));
        }

        // Reject unsupported encodings before training, so they fail fast with a clear error
        // instead of paying for (and possibly failing inside) symbol-table training.
        if !is_fsst_input_encoding(array) {
            vortex_bail!(
                "Unsupported encoding for FSSTCompressor: {}",
                array.encoding().id()
            );
        }

        // `metadata()` is called on an owned clone (it needs a mutable receiver), so `like`
        // stays intact for the child-0 lookup below.
        let compressor = match like.clone().and_then(|mut tree| tree.metadata()) {
            Some(metadata) => metadata,
            None => Arc::new(fsst_train_compressor(array)?),
        };
        let Some(fsst_compressor) = compressor.as_any().downcast_ref::<Compressor>() else {
            vortex_bail!("Could not downcast metadata as FSST Compressor")
        };

        let fsst_array = fsst_compress(array, fsst_compressor)?;

        // Child 0 of the compression tree: the `uncompressed_lengths` array.
        let uncompressed_lengths = ctx
            .auxiliary("uncompressed_lengths")
            .excluding(self)
            .compress(
                &fsst_array.uncompressed_lengths(),
                like.as_ref().and_then(|l| l.child(0)),
            )?;

        Ok(CompressedArray::new(
            FSSTArray::try_new(
                fsst_array.dtype().clone(),
                fsst_array.symbols(),
                fsst_array.symbol_lengths(),
                fsst_array.codes(),
                uncompressed_lengths.array,
            )?
            .into_array(),
            Some(CompressionTree::new_with_metadata(
                self,
                vec![uncompressed_lengths.path],
                compressor,
            )),
        ))
    }

    fn used_encodings(&self) -> HashSet<EncodingRef> {
        HashSet::from([&FSSTEncoding as EncodingRef])
    }
}