use std::hash::Hash;
use std::hash::Hasher;
use enum_iterator::Sequence;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::aggregate_fn::fns::is_constant::is_constant;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::DictArray;
use vortex_array::arrays::MaskedArray;
use vortex_array::arrays::VarBinArray;
use vortex_array::arrays::VarBinView;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::builders::dict::dict_encode;
use vortex_array::scalar::Scalar;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityHelper;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_fsst::FSSTArray;
use vortex_fsst::fsst_compress;
use vortex_fsst::fsst_train_compressor;
use vortex_sparse::Sparse;
use vortex_sparse::SparseArray;
use vortex_utils::aliases::hash_set::HashSet;
use super::integer::DictScheme as IntDictScheme;
use super::integer::SequenceScheme as IntSequenceScheme;
use super::integer::SparseScheme as IntSparseScheme;
use crate::BtrBlocksCompressor;
use crate::CanonicalCompressor;
use crate::Compressor;
use crate::CompressorContext;
use crate::CompressorStats;
use crate::Excludes;
use crate::GenerateStatsOptions;
use crate::IntCode;
use crate::Scheme;
use crate::SchemeExt;
use crate::sample::sample;
/// Statistics gathered over a `VarBinViewArray` and consumed by the string compression schemes.
#[derive(Clone, Debug)]
pub struct StringStats {
    /// The array these statistics were computed from.
    src: VarBinViewArray,
    /// Approximate number of distinct values, keyed on the low 64 bits of each view
    /// (see `estimate_distinct_count`). Set to `u32::MAX` when distinct counting was
    /// not requested via `GenerateStatsOptions::count_distinct_values`.
    /// NOTE(review): every view is hashed, including those in null slots — confirm whether
    /// null views should be excluded from this estimate.
    estimated_distinct_count: u32,
    /// Number of non-null entries (`len - null_count`).
    value_count: u32,
    /// Number of null entries.
    null_count: u32,
}
/// Approximates the number of distinct strings in `strings` by counting unique view keys.
///
/// Each view is truncated to its low 64 bits and deduplicated in a hash set. Two different
/// strings that share the same key are counted once, so the result is an estimate.
///
/// # Errors
/// Fails if the number of distinct keys does not fit in a `u32`.
#[expect(
    clippy::cast_possible_truncation,
    reason = "approximate uniqueness with view prefix"
)]
fn estimate_distinct_count(strings: &VarBinViewArray) -> VortexResult<u32> {
    let views = strings.views();
    let mut seen_keys = HashSet::with_capacity(views.len() / 2);
    seen_keys.extend(views.iter().map(|view| view.as_u128() as u64));
    Ok(u32::try_from(seen_keys.len())?)
}
impl StringStats {
    /// Computes null/value counts and, when `opts.count_distinct_values` is set, an
    /// approximate distinct count for `input`.
    ///
    /// # Errors
    /// Fails when the null count cannot be computed, when distinct estimation fails,
    /// or when a count does not fit in a `u32`.
    fn generate_opts_fallible(
        input: &VarBinViewArray,
        opts: GenerateStatsOptions,
    ) -> VortexResult<Self> {
        let Some(nulls) = input.statistics().compute_null_count() else {
            return Err(vortex_err!("Failed to compute null_count"));
        };
        let non_nulls = input.len() - nulls;

        // `u32::MAX` acts as "unknown" so that distinct-based heuristics reject themselves.
        let estimated_distinct_count = if opts.count_distinct_values {
            estimate_distinct_count(input)?
        } else {
            u32::MAX
        };

        Ok(Self {
            src: input.clone(),
            estimated_distinct_count,
            value_count: u32::try_from(non_nulls)?,
            null_count: u32::try_from(nulls)?,
        })
    }
}
impl CompressorStats for StringStats {
    type ArrayVTable = VarBinView;

    /// Infallible wrapper around `generate_opts_fallible`; panics if statistics cannot be built.
    fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
        let computed = Self::generate_opts_fallible(input, opts);
        computed.vortex_expect("StringStats::generate_opts should not fail")
    }

    /// The array these statistics describe.
    fn source(&self) -> &VarBinViewArray {
        &self.src
    }

    /// Draws a sample of the source array and computes fresh statistics over it.
    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
        let whole = self.src.clone().into_array();
        let sampled_view = sample(&whole, sample_size, sample_count).to_varbinview();
        Self::generate_opts(&sampled_view, opts)
    }
}
/// Every string compression scheme compiled into this build.
///
/// The Zstd-based schemes are included only when the corresponding cargo features are enabled.
/// NOTE(review): the selection loop may depend on this order (e.g. for tie-breaking) — confirm
/// before reordering entries.
pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[
    &UncompressedScheme,
    &DictScheme,
    &FSSTScheme,
    &ConstantScheme,
    &NullDominated,
    #[cfg(feature = "zstd")]
    &ZstdScheme,
    #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
    &ZstdBuffersScheme,
];
/// Compressor for `VarBinView` string arrays.
///
/// The set of schemes it may choose from comes from the wrapped canonical compressor
/// (`string_schemes()`).
#[derive(Clone, Copy)]
pub struct StringCompressor<'a> {
    /// Parent compressor; supplies the enabled string schemes.
    pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
}
impl Compressor for StringCompressor<'_> {
    type ArrayVTable = VarBinView;
    type SchemeType = dyn StringScheme;
    type StatsType = StringStats;

    /// Builds statistics for `array`.
    ///
    /// Distinct counting is requested only when `DictScheme` is among the enabled schemes,
    /// because that is the scheme whose estimate relies on it.
    fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
        let dict_code = DictScheme.code();
        let dict_enabled = self
            .btr_blocks_compressor
            .string_schemes()
            .iter()
            .any(|scheme| scheme.code() == dict_code);

        StringStats::generate_opts(
            array,
            GenerateStatsOptions {
                count_distinct_values: dict_enabled,
            },
        )
    }

    /// The string schemes enabled on the parent compressor.
    fn schemes(&self) -> &[&'static dyn StringScheme] {
        self.btr_blocks_compressor.string_schemes()
    }

    /// Fallback scheme: leave the data uncompressed.
    fn default_scheme(&self) -> &'static Self::SchemeType {
        &UncompressedScheme
    }
}
/// Object-safe marker for any `Scheme` that operates on `StringStats` and is identified
/// by a `StringCode`.
pub trait StringScheme:
    Send + Sync + Scheme<StatsType = StringStats, CodeType = StringCode>
{
}

/// Every thread-safe string-typed `Scheme` is automatically a `StringScheme`.
impl<S> StringScheme for S where
    S: Send + Sync + Scheme<StatsType = StringStats, CodeType = StringCode>
{
}
/// Two schemes are equal exactly when they report the same `StringCode`.
impl PartialEq for dyn StringScheme {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.code(), other.code());
        lhs == rhs
    }
}

impl Eq for dyn StringScheme {}

/// Hashes only the scheme's code, consistent with the `PartialEq` impl above.
impl Hash for dyn StringScheme {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let code = self.code();
        code.hash(state);
    }
}
/// Leaves the strings uncompressed (code `StringCode::Uncompressed`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct UncompressedScheme;
/// Dictionary-encodes the strings and cascades into the codes and values (code `StringCode::Dict`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DictScheme;
/// FSST symbol-table compression (code `StringCode::Fsst`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FSSTScheme;
/// Replaces a constant array with a single scalar (code `StringCode::Constant`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConstantScheme;
/// Sparse encoding for mostly-null arrays (code `StringCode::Sparse`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NullDominated;
/// Zstd compression of the view data (code `StringCode::Zstd`); requires the `zstd` feature.
#[cfg(feature = "zstd")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdScheme;
/// Zstd compression of the array buffers (code `StringCode::ZstdBuffers`); requires the
/// `zstd` and `unstable_encodings` features.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdBuffersScheme;
/// Identifier for each string compression scheme.
///
/// The derived `Ord` and `Sequence` depend on variant declaration order, so do not reorder
/// the variants. The variants exist regardless of which cargo features are enabled.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
pub enum StringCode {
    /// `UncompressedScheme`.
    Uncompressed,
    /// `DictScheme`.
    Dict,
    /// `FSSTScheme`.
    Fsst,
    /// `ConstantScheme`.
    Constant,
    /// `NullDominated` (sparse encoding of mostly-null data).
    Sparse,
    /// `ZstdScheme`.
    Zstd,
    /// `ZstdBuffersScheme`.
    ZstdBuffers,
}
impl Scheme for UncompressedScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::Uncompressed
    }

    /// Baseline ratio: storing the data as-is neither grows nor shrinks it.
    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        _stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<f64> {
        let identity_ratio = 1.0;
        Ok(identity_ratio)
    }

    /// Returns the source array unchanged.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        let passthrough = stats.source().clone();
        Ok(passthrough.into_array())
    }
}
impl Scheme for DictScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::Dict
    }

    /// Estimates the dictionary-encoding ratio.
    ///
    /// Returns `0.0` (reject) when there are no non-null values, or when the distinct
    /// estimate exceeds half of the non-null count. Otherwise it defers to the
    /// sampling-based estimate.
    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        excludes: &[StringCode],
    ) -> VortexResult<f64> {
        if stats.value_count == 0 {
            return Ok(0.0);
        }
        // A dictionary only pays off when values repeat. `u32::MAX` ("not counted") always
        // lands here.
        if stats.estimated_distinct_count > stats.value_count / 2 {
            return Ok(0.0);
        }
        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    /// Dictionary-encodes the source, then, if cascading is still allowed, recompresses
    /// the codes and the values.
    ///
    /// Codes exclude the integer dict and sequence schemes. Values exclude `DictScheme`
    /// to avoid nesting dictionaries.
    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        let dict = dict_encode(&stats.source().clone().into_array())?;
        if ctx.allowed_cascading == 0 {
            return Ok(dict.into_array());
        }

        let compressed_codes = compressor.compress_canonical(
            Canonical::Primitive(dict.codes().to_primitive()),
            ctx.descend(),
            Excludes::from(&[IntDictScheme.code(), IntSequenceScheme.code()]),
        )?;
        let compressed_values = compressor.compress_canonical(
            Canonical::VarBinView(dict.values().to_varbinview()),
            ctx.descend(),
            Excludes::from(&[DictScheme.code()]),
        )?;

        // SAFETY: `compressed_codes` and `compressed_values` are re-encodings of the codes and
        // values that `dict_encode` produced together for this source. That encoder guarantees
        // every code indexes into `values`. The cascading compressor is assumed to be lossless,
        // so it preserves the logical contents and lengths of both children. The
        // "all values referenced" flag is copied from the original dictionary. Recompression
        // does not change which values are referenced.
        let recompressed = unsafe {
            DictArray::new_unchecked(compressed_codes, compressed_values)
                .set_all_values_referenced(dict.has_all_values_referenced())
        };
        Ok(recompressed.into_array())
    }
}
impl Scheme for FSSTScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::Fsst
    }

    /// FSST-encodes the source.
    ///
    /// It first trains a symbol table on the full source and encodes with it. It then
    /// narrows and recompresses the per-string uncompressed lengths and the code offsets.
    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        // Scope the trained symbol table so it is dropped as soon as encoding is done.
        let encoded = {
            let symbol_table = fsst_train_compressor(&stats.src);
            fsst_compress(&stats.src, &symbol_table)
        };

        let narrowed_lengths = encoded.uncompressed_lengths().to_primitive().narrow()?;
        let compressed_original_lengths = compressor.compress_canonical(
            Canonical::Primitive(narrowed_lengths),
            ctx,
            Excludes::none(),
        )?;

        let codes = encoded.codes();
        let narrowed_offsets = codes.offsets().to_primitive().narrow()?;
        let compressed_codes_offsets = compressor.compress_canonical(
            Canonical::Primitive(narrowed_offsets),
            ctx,
            Excludes::none(),
        )?;

        let rebuilt_codes = VarBinArray::try_new(
            compressed_codes_offsets,
            codes.bytes().clone(),
            codes.dtype().clone(),
            codes.validity().clone(),
        )?;

        FSSTArray::try_new(
            encoded.dtype().clone(),
            encoded.symbols().clone(),
            encoded.symbol_lengths().clone(),
            rebuilt_codes,
            compressed_original_lengths,
        )
        .map(|array| array.into_array())
    }
}
impl Scheme for ConstantScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> Self::CodeType {
        StringCode::Constant
    }

    fn is_constant(&self) -> bool {
        true
    }

    /// Returns `f64::MAX` when the full array is constant and `0.0` otherwise.
    ///
    /// A sample can never prove the whole array is constant, so sampled contexts are always
    /// rejected. The cheap distinct-count check runs before the full `is_constant` scan.
    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        if ctx.is_sample {
            return Ok(0.0);
        }
        // More than one distinct view key rules out constancy without scanning the data.
        if stats.estimated_distinct_count > 1 {
            return Ok(0.0);
        }
        // Use a distinct name so the `CompressorContext` parameter is not shadowed. Create
        // the context only when the full scan is actually needed.
        let mut exec_ctx = LEGACY_SESSION.create_execution_ctx();
        if !is_constant(&stats.src.clone().into_array(), &mut exec_ctx)? {
            return Ok(0.0);
        }
        Ok(f64::MAX)
    }

    /// Encodes the source as a single repeated scalar.
    ///
    /// The scalar comes from the first valid position. If the source has any invalid
    /// entries, the constant is wrapped in a `MaskedArray` carrying the source's validity.
    /// An array with no valid entries becomes a constant null.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        let source = stats.source();
        let len = source.len();

        let first_valid = (0..len).position(|idx| source.is_valid(idx).unwrap_or(false));
        let Some(idx) = first_valid else {
            return Ok(ConstantArray::new(Scalar::null(source.dtype().clone()), len).into_array());
        };

        let constant = ConstantArray::new(source.scalar_at(idx)?, len).into_array();
        if source.all_valid()? {
            Ok(constant)
        } else {
            Ok(MaskedArray::try_new(constant, source.validity().clone())?.into_array())
        }
    }
}
impl Scheme for NullDominated {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> Self::CodeType {
        StringCode::Sparse
    }

    /// Favors sparse encoding when more than 90% of entries are null.
    ///
    /// The ratio is `len / value_count`. The scheme is rejected (`0.0`) when cascading is
    /// exhausted or when there are no non-null values.
    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        if ctx.allowed_cascading == 0 || stats.value_count == 0 {
            return Ok(0.0);
        }

        let total = stats.src.len() as f64;
        let ratio = if stats.null_count as f64 / total > 0.9 {
            total / stats.value_count as f64
        } else {
            0.0
        };
        Ok(ratio)
    }

    /// Sparse-encodes the source and recompresses the patch indices.
    ///
    /// The index recompression excludes the integer sparse and dict schemes. If the encoder
    /// did not yield a `Sparse` array, its output is returned as-is.
    ///
    /// # Panics
    /// Panics if called with `ctx.allowed_cascading == 0`.
    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        assert!(ctx.allowed_cascading > 0);
        let encoded = SparseArray::encode(&stats.src.clone().into_array(), None)?;

        match encoded.as_opt::<Sparse>() {
            None => Ok(encoded),
            Some(sparse) => {
                let index_excludes = vec![IntSparseScheme.code(), IntCode::Dict];
                let narrowed_indices = sparse.patches().indices().to_primitive().narrow()?;
                let compressed_indices = compressor.compress_canonical(
                    Canonical::Primitive(narrowed_indices),
                    ctx.descend(),
                    Excludes::int_only(&index_excludes),
                )?;
                SparseArray::try_new(
                    compressed_indices,
                    sparse.patches().values().clone(),
                    sparse.len(),
                    sparse.fill_scalar().clone(),
                )
                .map(|array| array.into_array())
            }
        }
    }
}
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::Zstd
    }

    /// Compacts the source buffers, then Zstd-compresses the views without a dictionary.
    ///
    /// The literal arguments `3` and `8192` are passed through unchanged.
    /// NOTE(review): presumably the compression level and values-per-frame — confirm against
    /// `vortex_zstd`.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        let compacted = stats.source().compact_buffers()?;
        let zstd_array =
            vortex_zstd::ZstdArray::from_var_bin_view_without_dict(&compacted, 3, 8192)?;
        Ok(zstd_array.into_array())
    }
}
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::ZstdBuffers
    }

    /// Zstd-compresses the source array's buffers.
    ///
    /// The literal `3` is passed through unchanged (presumably the compression level — confirm).
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        let source_array = stats.source().clone().into_array();
        let zstd_buffers = vortex_zstd::ZstdBuffersArray::compress(&source_array, 3)?;
        Ok(zstd_buffers.into_array())
    }
}
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Compresses `array` with the default compressor.
    ///
    /// Returns the compressed length and the lowercase metadata-only display.
    fn compress_and_describe(array: impl IntoArray) -> VortexResult<(usize, String)> {
        let compressed = BtrBlocksCompressor::default().compress(&array.into_array())?;
        let description = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        Ok((compressed.len(), description))
    }

    #[test]
    fn test_strings() -> VortexResult<()> {
        // 1024 copies of one string followed by 1024 copies of another.
        let mut strings = vec![Some("hello-world-1234"); 1024];
        strings.resize(2048, Some("hello-world-56789"));

        let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));
        let (len, description) = compress_and_describe(array)?;

        assert_eq!(len, 2048);
        assert_eq!(description, "vortex.dict(utf8, len=2048)");
        Ok(())
    }

    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");

        let (len, description) = compress_and_describe(builder.finish_into_varbinview())?;

        assert_eq!(len, 100);
        assert_eq!(description, "vortex.sparse(utf8?, len=100)");
        Ok(())
    }
}
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;

    use crate::BtrBlocksCompressor;

    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let array = VarBinViewArray::from_iter(
            vec![Some("constant_value"); 100],
            DType::Utf8(Nullability::NonNullable),
        );
        let compressed = BtrBlocksCompressor::default().compress(&array.into_array())?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        // Cycles apple, banana, cherry, ... for 1000 entries.
        let fruits = ["apple", "banana", "cherry"];
        let strings: Vec<Option<&str>> = fruits.iter().cycle().take(1000).map(|s| Some(*s)).collect();

        let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));
        let compressed = BtrBlocksCompressor::default().compress(&array.into_array())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        // All-distinct strings sharing a long common prefix and suffix.
        let strings: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();

        let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));
        let compressed = BtrBlocksCompressor::default().compress(&array.into_array())?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}