Skip to main content

vortex_btrblocks/compressor/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::hash::Hash;
5use std::hash::Hasher;
6
7use enum_iterator::Sequence;
8use vortex_array::ArrayRef;
9use vortex_array::Canonical;
10use vortex_array::IntoArray;
11use vortex_array::ToCanonical;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::DictArray;
14use vortex_array::arrays::MaskedArray;
15use vortex_array::arrays::VarBinArray;
16use vortex_array::arrays::VarBinView;
17use vortex_array::arrays::VarBinViewArray;
18use vortex_array::builders::dict::dict_encode;
19use vortex_array::compute::is_constant;
20use vortex_array::scalar::Scalar;
21use vortex_array::vtable::VTable;
22use vortex_array::vtable::ValidityHelper;
23use vortex_error::VortexExpect;
24use vortex_error::VortexResult;
25use vortex_error::vortex_err;
26use vortex_fsst::FSSTArray;
27use vortex_fsst::fsst_compress;
28use vortex_fsst::fsst_train_compressor;
29use vortex_sparse::Sparse;
30use vortex_sparse::SparseArray;
31use vortex_utils::aliases::hash_set::HashSet;
32
33use super::integer::DictScheme as IntDictScheme;
34use super::integer::SequenceScheme as IntSequenceScheme;
35use super::integer::SparseScheme as IntSparseScheme;
36use crate::BtrBlocksCompressor;
37use crate::CanonicalCompressor;
38use crate::Compressor;
39use crate::CompressorContext;
40use crate::CompressorStats;
41use crate::Excludes;
42use crate::GenerateStatsOptions;
43use crate::IntCode;
44use crate::Scheme;
45use crate::SchemeExt;
46use crate::sample::sample;
47
48/// Array of variable-length byte arrays, and relevant stats for compression.
49#[derive(Clone, Debug)]
50pub struct StringStats {
51    src: VarBinViewArray,
52    estimated_distinct_count: u32,
53    value_count: u32,
54    null_count: u32,
55}
56
57/// Estimate the number of distinct strings in the var bin view array.
58fn estimate_distinct_count(strings: &VarBinViewArray) -> VortexResult<u32> {
59    let views = strings.views();
60    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
61    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
62    // share a 4-byte prefix and have the same length.
63    let mut distinct = HashSet::with_capacity(views.len() / 2);
64    views.iter().for_each(|&view| {
65        #[expect(
66            clippy::cast_possible_truncation,
67            reason = "approximate uniqueness with view prefix"
68        )]
69        let len_and_prefix = view.as_u128() as u64;
70        distinct.insert(len_and_prefix);
71    });
72
73    Ok(u32::try_from(distinct.len())?)
74}
75
76impl StringStats {
77    fn generate_opts_fallible(
78        input: &VarBinViewArray,
79        opts: GenerateStatsOptions,
80    ) -> VortexResult<Self> {
81        let null_count = input
82            .statistics()
83            .compute_null_count()
84            .ok_or_else(|| vortex_err!("Failed to compute null_count"))?;
85        let value_count = input.len() - null_count;
86        let estimated_distinct = if opts.count_distinct_values {
87            estimate_distinct_count(input)?
88        } else {
89            u32::MAX
90        };
91
92        Ok(Self {
93            src: input.clone(),
94            value_count: u32::try_from(value_count)?,
95            null_count: u32::try_from(null_count)?,
96            estimated_distinct_count: estimated_distinct,
97        })
98    }
99}
100
101impl CompressorStats for StringStats {
102    type ArrayVTable = VarBinView;
103
104    fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
105        Self::generate_opts_fallible(input, opts)
106            .vortex_expect("StringStats::generate_opts should not fail")
107    }
108
109    fn source(&self) -> &VarBinViewArray {
110        &self.src
111    }
112
113    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
114        let sampled =
115            sample(&self.src.clone().into_array(), sample_size, sample_count).to_varbinview();
116
117        Self::generate_opts(&sampled, opts)
118    }
119}
120
/// All available string compression schemes.
///
/// The two Zstd-based entries are only present when their Cargo features are enabled.
pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[
    &UncompressedScheme,
    &DictScheme,
    &FSSTScheme,
    &ConstantScheme,
    &NullDominated,
    // Requires the `zstd` feature.
    #[cfg(feature = "zstd")]
    &ZstdScheme,
    // Requires both the `zstd` and `unstable_encodings` features.
    #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
    &ZstdBuffersScheme,
];
133
134/// [`Compressor`] for strings.
135#[derive(Clone, Copy)]
136pub struct StringCompressor<'a> {
137    /// Reference to the parent compressor.
138    pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
139}
140
141impl<'a> Compressor for StringCompressor<'a> {
142    type ArrayVTable = VarBinView;
143    type SchemeType = dyn StringScheme;
144    type StatsType = StringStats;
145
146    fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
147        if self
148            .btr_blocks_compressor
149            .string_schemes()
150            .iter()
151            .any(|s| s.code() == DictScheme.code())
152        {
153            StringStats::generate_opts(
154                array,
155                GenerateStatsOptions {
156                    count_distinct_values: true,
157                },
158            )
159        } else {
160            StringStats::generate_opts(
161                array,
162                GenerateStatsOptions {
163                    count_distinct_values: false,
164                },
165            )
166        }
167    }
168
169    fn schemes(&self) -> &[&'static dyn StringScheme] {
170        self.btr_blocks_compressor.string_schemes()
171    }
172
173    fn default_scheme(&self) -> &'static Self::SchemeType {
174        &UncompressedScheme
175    }
176}
177
/// A compression [`Scheme`] for strings: its statistics type is [`StringStats`] and its code
/// type is [`StringCode`].
///
/// The trait has no methods of its own; it exists so string schemes can be handled as
/// `dyn StringScheme` and shared across threads (`Send + Sync`).
pub trait StringScheme:
    Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
{
}
182
183impl<T> StringScheme for T where
184    T: Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
185{
186}
187
188impl PartialEq for dyn StringScheme {
189    fn eq(&self, other: &Self) -> bool {
190        self.code() == other.code()
191    }
192}
193
// Equality is defined by comparing scheme codes (see the `PartialEq` impl).
impl Eq for dyn StringScheme {}
195
196impl Hash for dyn StringScheme {
197    fn hash<H: Hasher>(&self, state: &mut H) {
198        self.code().hash(state)
199    }
200}
201
/// Scheme that leaves the string array as it is.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct UncompressedScheme;
204
/// Scheme that dictionary-encodes the string array.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DictScheme;
207
/// Scheme that compresses the string array with FSST.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FSSTScheme;
210
/// Scheme that encodes an array holding a single repeated value as a constant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ConstantScheme;
213
/// Scheme that sparse-encodes arrays consisting almost entirely of nulls.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominated;
216
/// Zstd compression that does not use dictionaries (nvCOMP compatible).
///
/// Only available with the `zstd` feature.
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdScheme;
221
/// Buffer-level Zstd compression that keeps the array layout intact for GPU decompression.
///
/// Only available with both the `zstd` and `unstable_encodings` features.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdBuffersScheme;
226
227/// Unique identifier for string compression schemes.
228#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
229pub enum StringCode {
230    /// No compression applied.
231    Uncompressed,
232    /// Dictionary encoding for low-cardinality strings.
233    Dict,
234    /// FSST (Fast Static Symbol Table) compression.
235    Fsst,
236    /// Constant encoding for arrays with a single distinct value.
237    Constant,
238    /// Sparse encoding for null-dominated arrays.
239    Sparse,
240    /// Zstd compression without dictionaries.
241    Zstd,
242    /// Zstd buffer-level compression preserving array layout.
243    ZstdBuffers,
244}
245
246impl Scheme for UncompressedScheme {
247    type StatsType = StringStats;
248    type CodeType = StringCode;
249
250    fn code(&self) -> StringCode {
251        StringCode::Uncompressed
252    }
253
254    fn expected_compression_ratio(
255        &self,
256        _compressor: &BtrBlocksCompressor,
257        _stats: &Self::StatsType,
258        _ctx: CompressorContext,
259        _excludes: &[StringCode],
260    ) -> VortexResult<f64> {
261        Ok(1.0)
262    }
263
264    fn compress(
265        &self,
266        _compressor: &BtrBlocksCompressor,
267        stats: &Self::StatsType,
268        _ctx: CompressorContext,
269        _excludes: &[StringCode],
270    ) -> VortexResult<ArrayRef> {
271        Ok(stats.source().clone().into_array())
272    }
273}
274
275impl Scheme for DictScheme {
276    type StatsType = StringStats;
277    type CodeType = StringCode;
278
279    fn code(&self) -> StringCode {
280        StringCode::Dict
281    }
282
283    fn expected_compression_ratio(
284        &self,
285        compressor: &BtrBlocksCompressor,
286        stats: &Self::StatsType,
287        ctx: CompressorContext,
288        excludes: &[StringCode],
289    ) -> VortexResult<f64> {
290        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
291        if stats.estimated_distinct_count > stats.value_count / 2 {
292            return Ok(0.0);
293        }
294
295        // If array is all null, do not attempt dict.
296        if stats.value_count == 0 {
297            return Ok(0.0);
298        }
299
300        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
301    }
302
303    fn compress(
304        &self,
305        compressor: &BtrBlocksCompressor,
306        stats: &Self::StatsType,
307        ctx: CompressorContext,
308        _excludes: &[StringCode],
309    ) -> VortexResult<ArrayRef> {
310        let dict = dict_encode(&stats.source().clone().into_array())?;
311
312        // If we are not allowed to cascade, do not attempt codes or values compression.
313        if ctx.allowed_cascading == 0 {
314            return Ok(dict.into_array());
315        }
316
317        // Find best compressor for codes and values separately
318        let compressed_codes = compressor.compress_canonical(
319            Canonical::Primitive(dict.codes().to_primitive()),
320            ctx.descend(),
321            Excludes::from(&[IntDictScheme.code(), IntSequenceScheme.code()]),
322        )?;
323
324        // Attempt to compress the values with non-Dict compression.
325        // Currently this will only be FSST.
326        let compressed_values = compressor.compress_canonical(
327            Canonical::VarBinView(dict.values().to_varbinview()),
328            ctx.descend(),
329            Excludes::from(&[DictScheme.code()]),
330        )?;
331
332        // SAFETY: compressing codes or values does not alter the invariants
333        unsafe {
334            Ok(
335                DictArray::new_unchecked(compressed_codes, compressed_values)
336                    .set_all_values_referenced(dict.has_all_values_referenced())
337                    .into_array(),
338            )
339        }
340    }
341}
342
343impl Scheme for FSSTScheme {
344    type StatsType = StringStats;
345    type CodeType = StringCode;
346
347    fn code(&self) -> StringCode {
348        StringCode::Fsst
349    }
350
351    fn compress(
352        &self,
353        compressor: &BtrBlocksCompressor,
354        stats: &Self::StatsType,
355        ctx: CompressorContext,
356        _excludes: &[StringCode],
357    ) -> VortexResult<ArrayRef> {
358        let fsst = {
359            let compressor = fsst_train_compressor(&stats.src);
360            fsst_compress(&stats.src, &compressor)
361        };
362
363        let compressed_original_lengths = compressor.compress_canonical(
364            Canonical::Primitive(fsst.uncompressed_lengths().to_primitive().narrow()?),
365            ctx,
366            Excludes::none(),
367        )?;
368
369        let compressed_codes_offsets = compressor.compress_canonical(
370            Canonical::Primitive(fsst.codes().offsets().to_primitive().narrow()?),
371            ctx,
372            Excludes::none(),
373        )?;
374        let compressed_codes = VarBinArray::try_new(
375            compressed_codes_offsets,
376            fsst.codes().bytes().clone(),
377            fsst.codes().dtype().clone(),
378            fsst.codes().validity().clone(),
379        )?;
380
381        let fsst = FSSTArray::try_new(
382            fsst.dtype().clone(),
383            fsst.symbols().clone(),
384            fsst.symbol_lengths().clone(),
385            compressed_codes,
386            compressed_original_lengths,
387        )?;
388
389        Ok(fsst.into_array())
390    }
391}
392
393impl Scheme for ConstantScheme {
394    type StatsType = StringStats;
395    type CodeType = StringCode;
396
397    fn code(&self) -> Self::CodeType {
398        StringCode::Constant
399    }
400
401    fn is_constant(&self) -> bool {
402        true
403    }
404
405    fn expected_compression_ratio(
406        &self,
407        _compressor: &BtrBlocksCompressor,
408        stats: &Self::StatsType,
409        ctx: CompressorContext,
410        _excludes: &[Self::CodeType],
411    ) -> VortexResult<f64> {
412        if ctx.is_sample {
413            return Ok(0.0);
414        }
415
416        if stats.estimated_distinct_count > 1
417            || !is_constant(&stats.src.clone().into_array())?.unwrap_or(false)
418        {
419            return Ok(0.0);
420        }
421
422        // Force constant is these cases
423        Ok(f64::MAX)
424    }
425
426    fn compress(
427        &self,
428        _compressor: &BtrBlocksCompressor,
429        stats: &Self::StatsType,
430        _ctx: CompressorContext,
431        _excludes: &[Self::CodeType],
432    ) -> VortexResult<ArrayRef> {
433        let scalar_idx =
434            (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));
435
436        match scalar_idx {
437            Some(idx) => {
438                let scalar = stats.source().scalar_at(idx)?;
439                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
440                if !stats.source().all_valid()? {
441                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
442                } else {
443                    Ok(const_arr)
444                }
445            }
446            None => Ok(ConstantArray::new(
447                Scalar::null(stats.src.dtype().clone()),
448                stats.src.len(),
449            )
450            .into_array()),
451        }
452    }
453}
454
455impl Scheme for NullDominated {
456    type StatsType = StringStats;
457    type CodeType = StringCode;
458
459    fn code(&self) -> Self::CodeType {
460        StringCode::Sparse
461    }
462
463    fn expected_compression_ratio(
464        &self,
465        _compressor: &BtrBlocksCompressor,
466        stats: &Self::StatsType,
467        ctx: CompressorContext,
468        _excludes: &[Self::CodeType],
469    ) -> VortexResult<f64> {
470        // Only use `SparseScheme` if we can cascade.
471        if ctx.allowed_cascading == 0 {
472            return Ok(0.0);
473        }
474
475        if stats.value_count == 0 {
476            // All nulls should use ConstantScheme
477            return Ok(0.0);
478        }
479
480        // If the majority is null, will compress well.
481        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
482            return Ok(stats.src.len() as f64 / stats.value_count as f64);
483        }
484
485        // Otherwise we don't go this route
486        Ok(0.0)
487    }
488
489    fn compress(
490        &self,
491        compressor: &BtrBlocksCompressor,
492        stats: &Self::StatsType,
493        ctx: CompressorContext,
494        _excludes: &[Self::CodeType],
495    ) -> VortexResult<ArrayRef> {
496        assert!(ctx.allowed_cascading > 0);
497
498        // We pass None as we only run this pathway for NULL-dominated string arrays
499        let sparse_encoded = SparseArray::encode(&stats.src.clone().into_array(), None)?;
500
501        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
502            // Compress the indices only (not the values for strings)
503            let new_excludes = vec![IntSparseScheme.code(), IntCode::Dict];
504
505            let indices = sparse.patches().indices().to_primitive().narrow()?;
506            let compressed_indices = compressor.compress_canonical(
507                Canonical::Primitive(indices),
508                ctx.descend(),
509                Excludes::int_only(&new_excludes),
510            )?;
511
512            SparseArray::try_new(
513                compressed_indices,
514                sparse.patches().values().clone(),
515                sparse.len(),
516                sparse.fill_scalar().clone(),
517            )
518            .map(|a| a.into_array())
519        } else {
520            Ok(sparse_encoded)
521        }
522    }
523}
524
/// Zstd compression without dictionaries; compiled only with the `zstd` feature.
///
/// `expected_compression_ratio` is not overridden here, so the trait's default applies.
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> Self::CodeType {
        StringCode::Zstd
    }

    /// Compact the source buffers, then Zstd-compress the result without a dictionary.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        let compacted = stats.source().compact_buffers()?;
        // NOTE(review): `3` and `8192` look like the compression level and a per-frame value
        // count — confirm against `ZstdArray::from_var_bin_view_without_dict`.
        let encoded =
            vortex_zstd::ZstdArray::from_var_bin_view_without_dict(&compacted, 3, 8192)?;
        Ok(encoded.into_array())
    }
}
548
/// Buffer-level Zstd compression; compiled only with both the `zstd` and
/// `unstable_encodings` features.
///
/// `expected_compression_ratio` is not overridden here, so the trait's default applies.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> Self::CodeType {
        StringCode::ZstdBuffers
    }

    /// Compress the source array's buffers with Zstd.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        let source = stats.source().clone().into_array();
        // NOTE(review): `3` is presumably the Zstd compression level — confirm.
        let encoded = vortex_zstd::ZstdBuffersArray::compress(&source, 3)?;
        Ok(encoded.into_array())
    }
}
571
/// End-to-end checks of the default compressor on small string arrays.
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Two long runs of repeated values should come out dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        // 1024 copies of the first value followed by 1024 copies of the second.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));

        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 2048);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// 99 nulls followed by a single value should come out sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");

        let input = builder.finish_into_varbinview().into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 100);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
631
/// Checks that the default compressor picks the expected encoding for characteristic inputs.
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;

    use crate::BtrBlocksCompressor;

    /// 100 copies of one value should compress to a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = vec![Some("constant_value"); 100];
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Three values repeated round-robin should compress to a dictionary array.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = ["apple", "banana", "cherry"]
            .into_iter()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// Long strings that differ only in an embedded counter should compress with FSST.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<_> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}
684}