use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinArray;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::arrays::varbin::VarBinArrayExt;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_compressor::scheme::ChildSelection;
use vortex_compressor::scheme::DescendantExclusion;
use vortex_error::VortexResult;
use vortex_fsst::FSST;
use vortex_fsst::FSSTArrayExt;
use vortex_fsst::fsst_compress;
use vortex_fsst::fsst_train_compressor;
use vortex_sparse::Sparse;
use super::integer::IntDictScheme;
use super::integer::SparseScheme as IntSparseScheme;
use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::Scheme;
use crate::SchemeExt;
/// Compression scheme that encodes UTF-8 string arrays with FSST.
///
/// A symbol table is trained on the input array itself. The two integer arrays produced
/// by the encoding (per-string uncompressed lengths and the offsets into the code bytes)
/// are then cascaded back into the compressor as children 0 and 1.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FSSTScheme;
/// Sparse encoding for UTF-8 string arrays that are overwhelmingly null.
///
/// The estimate only accepts arrays in which more than 90% of the entries are null. The
/// sparse patch indices are cascaded into integer compression as child 0.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NullDominatedSparseScheme;
/// Compresses UTF-8 string arrays with Zstd after compacting their data buffers.
///
/// Only compiled when the `zstd` feature is enabled.
#[cfg(feature = "zstd")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdScheme;
/// Compresses UTF-8 string arrays through `vortex_zstd::ZstdBuffers`.
///
/// Only compiled when both the `zstd` and `unstable_encodings` features are enabled.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdBuffersScheme;
pub use vortex_compressor::builtins::StringConstantScheme;
pub use vortex_compressor::builtins::StringDictScheme;
pub use vortex_compressor::builtins::is_utf8_string;
pub use vortex_compressor::stats::StringStats;
impl Scheme for FSSTScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.string.fsst"
    }

    /// Accepts any canonical array recognised by [`is_utf8_string`].
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// Child 0 is the uncompressed-lengths array; child 1 is the code offsets array.
    fn num_children(&self) -> usize {
        2
    }

    /// No stats-based prediction is made; the estimate is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Trains an FSST symbol table on the input and encodes it.
    ///
    /// Both integer children are then narrowed and handed back to the cascading
    /// compressor. Finally the FSST array is rebuilt around the compressed children.
    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        // Materialise an integer child as a primitive array and narrow it to the smallest
        // fitting type. Then cascade it as child `idx` of this scheme.
        let cascade_int =
            |child: ArrayRef, idx: usize, ctx: &mut ExecutionCtx| -> VortexResult<ArrayRef> {
                let narrowed = child.execute::<PrimitiveArray>(ctx)?.narrow()?;
                compressor.compress_child(
                    &narrowed.into_array(),
                    &compress_ctx,
                    self.id(),
                    idx,
                    ctx,
                )
            };

        let strings = data.array_as_utf8().into_owned();
        let symbol_table = fsst_train_compressor(&strings);
        let encoded = fsst_compress(
            &strings,
            strings.len(),
            strings.dtype(),
            &symbol_table,
            exec_ctx,
        );

        let lengths_child = cascade_int(encoded.uncompressed_lengths().clone(), 0, exec_ctx)?;
        let offsets_child = cascade_int(encoded.codes().offsets().clone(), 1, exec_ctx)?;

        // Re-wrap the code bytes with the cascaded offsets, keeping dtype and validity.
        let codes = encoded.codes();
        let recompressed_codes = VarBinArray::try_new(
            offsets_child,
            codes.bytes().clone(),
            codes.dtype().clone(),
            codes.validity()?,
        )?;

        let rebuilt = FSST::try_new(
            encoded.dtype().clone(),
            encoded.symbols().clone(),
            encoded.symbol_lengths().clone(),
            recompressed_codes,
            lengths_child,
            exec_ctx,
        )?;
        Ok(rebuilt.into_array())
    }
}
impl Scheme for NullDominatedSparseScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.string.sparse"
    }

    /// Accepts any canonical array recognised by [`is_utf8_string`].
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// The single child (index 0) is the sparse patch-indices array.
    fn num_children(&self) -> usize {
        1
    }

    /// Forbids the integer sparse and integer dictionary schemes under every child.
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        [IntSparseScheme.id(), IntDictScheme.id()]
            .into_iter()
            .map(|excluded| DescendantExclusion {
                excluded,
                children: ChildSelection::All,
            })
            .collect()
    }

    /// Estimates `len / non_null_count` when more than 90% of entries are null.
    ///
    /// Skips arrays with no non-null values and arrays below the null threshold.
    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        const NULL_FRACTION_THRESHOLD: f64 = 0.9;

        let total = data.array_len() as f64;
        let stats = data.string_stats(exec_ctx);
        let non_null = stats.value_count();

        // `non_null != 0` is checked first so the ratio below never divides by zero.
        let verdict = if non_null != 0 && stats.null_count() as f64 / total > NULL_FRACTION_THRESHOLD
        {
            EstimateVerdict::Ratio(total / non_null as f64)
        } else {
            EstimateVerdict::Skip
        };
        CompressionEstimate::Verdict(verdict)
    }

    /// Sparse-encodes the array and cascades the narrowed patch indices as child 0.
    ///
    /// Patch values and the fill scalar are reused as-is. If `Sparse::encode` yields a
    /// non-sparse array, that array is returned unchanged.
    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let encoded = Sparse::encode(data.array(), None, exec_ctx)?;
        if let Some(sparse) = encoded.as_opt::<Sparse>() {
            let narrowed_indices = sparse
                .patches()
                .indices()
                .clone()
                .execute::<PrimitiveArray>(exec_ctx)?
                .narrow()?
                .into_array();
            let indices_child = compressor.compress_child(
                &narrowed_indices,
                &compress_ctx,
                self.id(),
                0,
                exec_ctx,
            )?;
            let rebuilt = Sparse::try_new(
                indices_child,
                sparse.patches().values().clone(),
                sparse.len(),
                sparse.fill_scalar().clone(),
            )?;
            Ok(rebuilt.into_array())
        } else {
            Ok(encoded)
        }
    }
}
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd"
    }

    /// Accepts any canonical array recognised by [`is_utf8_string`].
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// No stats-based prediction is made; the estimate is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Compacts the string buffers, then Zstd-compresses them without a dictionary.
    ///
    /// NOTE(review): the arguments `3` and `8192` are presumably the zstd level and the
    /// values-per-frame count. Confirm against `from_var_bin_view_without_dict`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let owned_utf8 = data.array_as_utf8().into_owned();
        let compacted = owned_utf8.compact_buffers()?;
        let encoded =
            vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192, exec_ctx)?;
        Ok(encoded.into_array())
    }
}
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd_buffers"
    }

    /// Accepts any canonical array recognised by [`is_utf8_string`].
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// No stats-based prediction is made; the estimate is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Hands the array to `vortex_zstd::ZstdBuffers::compress` using the session from
    /// the execution context.
    ///
    /// NOTE(review): `3` is presumably the zstd level; confirm against `ZstdBuffers`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let session = exec_ctx.session();
        let compressed = vortex_zstd::ZstdBuffers::compress(data.array(), 3, session)?;
        Ok(compressed.into_array())
    }
}
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by every test in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Two long runs of two distinct values should end up dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        let values: Vec<Option<&str>> = std::iter::repeat_n(Some("hello-world-1234"), 1024)
            .chain(std::iter::repeat_n(Some("hello-world-56789"), 1024))
            .collect();
        let array = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&array, &mut ctx)?;
        assert_eq!(compressed.len(), 2048);

        let summary = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(summary, "vortex.dict(utf8, len=2048)");
        Ok(())
    }

    /// An array with 99 nulls and a single value should end up sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");
        let array = builder.finish_into_varbinview().into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&array, &mut ctx)?;
        assert_eq!(compressed.len(), 100);

        let summary = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(summary, "vortex.sparse(utf8?, len=100)");
        Ok(())
    }
}
#[cfg(test)]
mod scheme_selection_tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by every test in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// A single repeated value should be stored as a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = std::iter::repeat_n(Some("constant_value"), 100).collect();
        let array = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&array, &mut ctx)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Three values cycling over 1000 rows should be dictionary-encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = ["apple", "banana", "cherry"]
            .into_iter()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let array = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&array, &mut ctx)?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// Unique strings sharing a long prefix and suffix should be FSST-encoded.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let array = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&array, &mut ctx)?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}