Skip to main content

vortex_btrblocks/compressor/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::hash::Hash;
5use std::hash::Hasher;
6
7use enum_iterator::Sequence;
8use vortex_array::ArrayRef;
9use vortex_array::Canonical;
10use vortex_array::IntoArray;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::ToCanonical;
13use vortex_array::VortexSessionExecute;
14use vortex_array::aggregate_fn::fns::is_constant::is_constant;
15use vortex_array::arrays::ConstantArray;
16use vortex_array::arrays::DictArray;
17use vortex_array::arrays::MaskedArray;
18use vortex_array::arrays::VarBinArray;
19use vortex_array::arrays::VarBinView;
20use vortex_array::arrays::VarBinViewArray;
21use vortex_array::builders::dict::dict_encode;
22use vortex_array::scalar::Scalar;
23use vortex_array::vtable::VTable;
24use vortex_array::vtable::ValidityHelper;
25use vortex_error::VortexExpect;
26use vortex_error::VortexResult;
27use vortex_error::vortex_err;
28use vortex_fsst::FSSTArray;
29use vortex_fsst::fsst_compress;
30use vortex_fsst::fsst_train_compressor;
31use vortex_sparse::Sparse;
32use vortex_sparse::SparseArray;
33use vortex_utils::aliases::hash_set::HashSet;
34
35use super::integer::DictScheme as IntDictScheme;
36use super::integer::SequenceScheme as IntSequenceScheme;
37use super::integer::SparseScheme as IntSparseScheme;
38use crate::BtrBlocksCompressor;
39use crate::CanonicalCompressor;
40use crate::Compressor;
41use crate::CompressorContext;
42use crate::CompressorStats;
43use crate::Excludes;
44use crate::GenerateStatsOptions;
45use crate::IntCode;
46use crate::Scheme;
47use crate::SchemeExt;
48use crate::sample::sample;
49
/// Array of variable-length byte arrays, and relevant stats for compression.
#[derive(Clone, Debug)]
pub struct StringStats {
    /// The strings these stats were computed from.
    src: VarBinViewArray,
    /// Number of distinct (length, 4-byte prefix) keys among all views, including the views of
    /// null entries. Equal strings always share a key, so this can undercount but never
    /// overcounts distinct views. Set to `u32::MAX` when distinct counting was not requested.
    estimated_distinct_count: u32,
    /// Number of non-null entries (`len - null_count`).
    value_count: u32,
    /// Number of null entries.
    null_count: u32,
}
58
59/// Estimate the number of distinct strings in the var bin view array.
60fn estimate_distinct_count(strings: &VarBinViewArray) -> VortexResult<u32> {
61    let views = strings.views();
62    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
63    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
64    // share a 4-byte prefix and have the same length.
65    let mut distinct = HashSet::with_capacity(views.len() / 2);
66    views.iter().for_each(|&view| {
67        #[expect(
68            clippy::cast_possible_truncation,
69            reason = "approximate uniqueness with view prefix"
70        )]
71        let len_and_prefix = view.as_u128() as u64;
72        distinct.insert(len_and_prefix);
73    });
74
75    Ok(u32::try_from(distinct.len())?)
76}
77
78impl StringStats {
79    fn generate_opts_fallible(
80        input: &VarBinViewArray,
81        opts: GenerateStatsOptions,
82    ) -> VortexResult<Self> {
83        let null_count = input
84            .statistics()
85            .compute_null_count()
86            .ok_or_else(|| vortex_err!("Failed to compute null_count"))?;
87        let value_count = input.len() - null_count;
88        let estimated_distinct = if opts.count_distinct_values {
89            estimate_distinct_count(input)?
90        } else {
91            u32::MAX
92        };
93
94        Ok(Self {
95            src: input.clone(),
96            value_count: u32::try_from(value_count)?,
97            null_count: u32::try_from(null_count)?,
98            estimated_distinct_count: estimated_distinct,
99        })
100    }
101}
102
103impl CompressorStats for StringStats {
104    type ArrayVTable = VarBinView;
105
106    fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
107        Self::generate_opts_fallible(input, opts)
108            .vortex_expect("StringStats::generate_opts should not fail")
109    }
110
111    fn source(&self) -> &VarBinViewArray {
112        &self.src
113    }
114
115    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
116        let sampled =
117            sample(&self.src.clone().into_array(), sample_size, sample_count).to_varbinview();
118
119        Self::generate_opts(&sampled, opts)
120    }
121}
122
/// All available string compression schemes.
///
/// `ZstdScheme` is only included with the `zstd` feature, and `ZstdBuffersScheme` only with
/// both the `zstd` and `unstable_encodings` features.
// NOTE(review): confirm whether scheme selection depends on this order (e.g. to break ties)
// before reordering entries.
pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[
    &UncompressedScheme,
    &DictScheme,
    &FSSTScheme,
    &ConstantScheme,
    &NullDominated,
    #[cfg(feature = "zstd")]
    &ZstdScheme,
    #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
    &ZstdBuffersScheme,
];
135
/// [`Compressor`] for strings.
///
/// Chooses among the string schemes exposed by the parent compressor's `string_schemes()`,
/// with [`UncompressedScheme`] as the default scheme.
#[derive(Clone, Copy)]
pub struct StringCompressor<'a> {
    /// Reference to the parent compressor.
    pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
}
142
143impl<'a> Compressor for StringCompressor<'a> {
144    type ArrayVTable = VarBinView;
145    type SchemeType = dyn StringScheme;
146    type StatsType = StringStats;
147
148    fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
149        if self
150            .btr_blocks_compressor
151            .string_schemes()
152            .iter()
153            .any(|s| s.code() == DictScheme.code())
154        {
155            StringStats::generate_opts(
156                array,
157                GenerateStatsOptions {
158                    count_distinct_values: true,
159                },
160            )
161        } else {
162            StringStats::generate_opts(
163                array,
164                GenerateStatsOptions {
165                    count_distinct_values: false,
166                },
167            )
168        }
169    }
170
171    fn schemes(&self) -> &[&'static dyn StringScheme] {
172        self.btr_blocks_compressor.string_schemes()
173    }
174
175    fn default_scheme(&self) -> &'static Self::SchemeType {
176        &UncompressedScheme
177    }
178}
179
/// A [`Scheme`] that compresses strings: its stats are [`StringStats`] and its identifier is a
/// [`StringCode`].
///
/// This is a pure alias-style trait; it is implemented automatically for every such scheme that
/// is also `Send + Sync`, so it can be used as `dyn StringScheme`.
pub trait StringScheme:
    Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
{
}

// Blanket impl: any `Send + Sync` string scheme is a `StringScheme`.
impl<T> StringScheme for T where
    T: Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
{
}
189
190impl PartialEq for dyn StringScheme {
191    fn eq(&self, other: &Self) -> bool {
192        self.code() == other.code()
193    }
194}
195
196impl Eq for dyn StringScheme {}
197
198impl Hash for dyn StringScheme {
199    fn hash<H: Hasher>(&self, state: &mut H) {
200        self.code().hash(state)
201    }
202}
203
/// Leaves the strings as-is; the fallback scheme with a compression ratio of 1.0.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct UncompressedScheme;

/// Dictionary-encodes the strings, then compresses the codes and values separately.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DictScheme;

/// FSST-compresses the strings and compresses the resulting length and offset arrays.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FSSTScheme;

/// Replaces an array holding a single value with a constant array.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConstantScheme;

/// Sparse-encodes arrays in which more than 90% of the entries are null.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NullDominated;

/// Zstd compression without dictionaries (nvCOMP compatible).
#[cfg(feature = "zstd")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdScheme;

/// Zstd buffer-level compression preserving array layout for GPU decompression.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdBuffersScheme;
228
/// Unique identifier for string compression schemes.
///
/// Declaration order is significant: it defines the derived `Ord`/`PartialOrd` ordering and the
/// iteration order of the derived `Sequence`, so reordering variants changes both.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
pub enum StringCode {
    /// No compression applied.
    Uncompressed,
    /// Dictionary encoding for low-cardinality strings.
    Dict,
    /// FSST (Fast Static Symbol Table) compression.
    Fsst,
    /// Constant encoding for arrays with a single distinct value.
    Constant,
    /// Sparse encoding for null-dominated arrays (the code of `NullDominated`).
    Sparse,
    /// Zstd compression without dictionaries.
    Zstd,
    /// Zstd buffer-level compression preserving array layout.
    ZstdBuffers,
}
247
248impl Scheme for UncompressedScheme {
249    type StatsType = StringStats;
250    type CodeType = StringCode;
251
252    fn code(&self) -> StringCode {
253        StringCode::Uncompressed
254    }
255
256    fn expected_compression_ratio(
257        &self,
258        _compressor: &BtrBlocksCompressor,
259        _stats: &Self::StatsType,
260        _ctx: CompressorContext,
261        _excludes: &[StringCode],
262    ) -> VortexResult<f64> {
263        Ok(1.0)
264    }
265
266    fn compress(
267        &self,
268        _compressor: &BtrBlocksCompressor,
269        stats: &Self::StatsType,
270        _ctx: CompressorContext,
271        _excludes: &[StringCode],
272    ) -> VortexResult<ArrayRef> {
273        Ok(stats.source().clone().into_array())
274    }
275}
276
277impl Scheme for DictScheme {
278    type StatsType = StringStats;
279    type CodeType = StringCode;
280
281    fn code(&self) -> StringCode {
282        StringCode::Dict
283    }
284
285    fn expected_compression_ratio(
286        &self,
287        compressor: &BtrBlocksCompressor,
288        stats: &Self::StatsType,
289        ctx: CompressorContext,
290        excludes: &[StringCode],
291    ) -> VortexResult<f64> {
292        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
293        if stats.estimated_distinct_count > stats.value_count / 2 {
294            return Ok(0.0);
295        }
296
297        // If array is all null, do not attempt dict.
298        if stats.value_count == 0 {
299            return Ok(0.0);
300        }
301
302        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
303    }
304
305    fn compress(
306        &self,
307        compressor: &BtrBlocksCompressor,
308        stats: &Self::StatsType,
309        ctx: CompressorContext,
310        _excludes: &[StringCode],
311    ) -> VortexResult<ArrayRef> {
312        let dict = dict_encode(&stats.source().clone().into_array())?;
313
314        // If we are not allowed to cascade, do not attempt codes or values compression.
315        if ctx.allowed_cascading == 0 {
316            return Ok(dict.into_array());
317        }
318
319        // Find best compressor for codes and values separately
320        let compressed_codes = compressor.compress_canonical(
321            Canonical::Primitive(dict.codes().to_primitive()),
322            ctx.descend(),
323            Excludes::from(&[IntDictScheme.code(), IntSequenceScheme.code()]),
324        )?;
325
326        // Attempt to compress the values with non-Dict compression.
327        // Currently this will only be FSST.
328        let compressed_values = compressor.compress_canonical(
329            Canonical::VarBinView(dict.values().to_varbinview()),
330            ctx.descend(),
331            Excludes::from(&[DictScheme.code()]),
332        )?;
333
334        // SAFETY: compressing codes or values does not alter the invariants
335        unsafe {
336            Ok(
337                DictArray::new_unchecked(compressed_codes, compressed_values)
338                    .set_all_values_referenced(dict.has_all_values_referenced())
339                    .into_array(),
340            )
341        }
342    }
343}
344
345impl Scheme for FSSTScheme {
346    type StatsType = StringStats;
347    type CodeType = StringCode;
348
349    fn code(&self) -> StringCode {
350        StringCode::Fsst
351    }
352
353    fn compress(
354        &self,
355        compressor: &BtrBlocksCompressor,
356        stats: &Self::StatsType,
357        ctx: CompressorContext,
358        _excludes: &[StringCode],
359    ) -> VortexResult<ArrayRef> {
360        let fsst = {
361            let compressor = fsst_train_compressor(&stats.src);
362            fsst_compress(&stats.src, &compressor)
363        };
364
365        let compressed_original_lengths = compressor.compress_canonical(
366            Canonical::Primitive(fsst.uncompressed_lengths().to_primitive().narrow()?),
367            ctx,
368            Excludes::none(),
369        )?;
370
371        let compressed_codes_offsets = compressor.compress_canonical(
372            Canonical::Primitive(fsst.codes().offsets().to_primitive().narrow()?),
373            ctx,
374            Excludes::none(),
375        )?;
376        let compressed_codes = VarBinArray::try_new(
377            compressed_codes_offsets,
378            fsst.codes().bytes().clone(),
379            fsst.codes().dtype().clone(),
380            fsst.codes().validity().clone(),
381        )?;
382
383        let fsst = FSSTArray::try_new(
384            fsst.dtype().clone(),
385            fsst.symbols().clone(),
386            fsst.symbol_lengths().clone(),
387            compressed_codes,
388            compressed_original_lengths,
389        )?;
390
391        Ok(fsst.into_array())
392    }
393}
394
395impl Scheme for ConstantScheme {
396    type StatsType = StringStats;
397    type CodeType = StringCode;
398
399    fn code(&self) -> Self::CodeType {
400        StringCode::Constant
401    }
402
403    fn is_constant(&self) -> bool {
404        true
405    }
406
407    fn expected_compression_ratio(
408        &self,
409        _compressor: &BtrBlocksCompressor,
410        stats: &Self::StatsType,
411        ctx: CompressorContext,
412        _excludes: &[Self::CodeType],
413    ) -> VortexResult<f64> {
414        if ctx.is_sample {
415            return Ok(0.0);
416        }
417
418        let mut ctx = LEGACY_SESSION.create_execution_ctx();
419        if stats.estimated_distinct_count > 1
420            || !is_constant(&stats.src.clone().into_array(), &mut ctx)?
421        {
422            return Ok(0.0);
423        }
424
425        // Force constant is these cases
426        Ok(f64::MAX)
427    }
428
429    fn compress(
430        &self,
431        _compressor: &BtrBlocksCompressor,
432        stats: &Self::StatsType,
433        _ctx: CompressorContext,
434        _excludes: &[Self::CodeType],
435    ) -> VortexResult<ArrayRef> {
436        let scalar_idx =
437            (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));
438
439        match scalar_idx {
440            Some(idx) => {
441                let scalar = stats.source().scalar_at(idx)?;
442                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
443                if !stats.source().all_valid()? {
444                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
445                } else {
446                    Ok(const_arr)
447                }
448            }
449            None => Ok(ConstantArray::new(
450                Scalar::null(stats.src.dtype().clone()),
451                stats.src.len(),
452            )
453            .into_array()),
454        }
455    }
456}
457
458impl Scheme for NullDominated {
459    type StatsType = StringStats;
460    type CodeType = StringCode;
461
462    fn code(&self) -> Self::CodeType {
463        StringCode::Sparse
464    }
465
466    fn expected_compression_ratio(
467        &self,
468        _compressor: &BtrBlocksCompressor,
469        stats: &Self::StatsType,
470        ctx: CompressorContext,
471        _excludes: &[Self::CodeType],
472    ) -> VortexResult<f64> {
473        // Only use `SparseScheme` if we can cascade.
474        if ctx.allowed_cascading == 0 {
475            return Ok(0.0);
476        }
477
478        if stats.value_count == 0 {
479            // All nulls should use ConstantScheme
480            return Ok(0.0);
481        }
482
483        // If the majority is null, will compress well.
484        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
485            return Ok(stats.src.len() as f64 / stats.value_count as f64);
486        }
487
488        // Otherwise we don't go this route
489        Ok(0.0)
490    }
491
492    fn compress(
493        &self,
494        compressor: &BtrBlocksCompressor,
495        stats: &Self::StatsType,
496        ctx: CompressorContext,
497        _excludes: &[Self::CodeType],
498    ) -> VortexResult<ArrayRef> {
499        assert!(ctx.allowed_cascading > 0);
500
501        // We pass None as we only run this pathway for NULL-dominated string arrays
502        let sparse_encoded = SparseArray::encode(&stats.src.clone().into_array(), None)?;
503
504        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
505            // Compress the indices only (not the values for strings)
506            let new_excludes = vec![IntSparseScheme.code(), IntCode::Dict];
507
508            let indices = sparse.patches().indices().to_primitive().narrow()?;
509            let compressed_indices = compressor.compress_canonical(
510                Canonical::Primitive(indices),
511                ctx.descend(),
512                Excludes::int_only(&new_excludes),
513            )?;
514
515            SparseArray::try_new(
516                compressed_indices,
517                sparse.patches().values().clone(),
518                sparse.len(),
519                sparse.fill_scalar().clone(),
520            )
521            .map(|a| a.into_array())
522        } else {
523            Ok(sparse_encoded)
524        }
525    }
526}
527
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::Zstd
    }

    /// Compacts the source's data buffers, then Zstd-compresses the strings without a
    /// dictionary.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        let compacted = stats.source().compact_buffers()?;
        // NOTE(review): `3` and `8192` are passed straight through; presumably the zstd level
        // and the number of values per frame — confirm against `vortex_zstd`.
        let zstd = vortex_zstd::ZstdArray::from_var_bin_view_without_dict(&compacted, 3, 8192)?;
        Ok(zstd.into_array())
    }
}
551
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::ZstdBuffers
    }

    /// Zstd-compresses the source array's buffers via `ZstdBuffersArray::compress`.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        let source = stats.source().clone().into_array();
        // NOTE(review): `3` is presumably the zstd compression level — confirm in `vortex_zstd`.
        let zstd_buffers = vortex_zstd::ZstdBuffersArray::compress(&source, 3)?;
        Ok(zstd_buffers.into_array())
    }
}
574
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Two distinct strings, each repeated 1024 times, should end up dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 2048);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// 99 nulls followed by a single value should end up sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");
        let input = builder.finish_into_varbinview().into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 100);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
634
/// Tests to verify that each string compression scheme produces the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;

    use crate::BtrBlocksCompressor;

    /// One repeated value should become a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let input = VarBinViewArray::from_iter(
            vec![Some("constant_value"); 100],
            DType::Utf8(Nullability::NonNullable),
        )
        .into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Three values cycling over 1000 rows should become a dictionary.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = ["apple", "banana", "cherry"];
        let values: Vec<Option<&str>> = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// Unique strings sharing long common substrings should be FSST-compressed.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}