Skip to main content

vortex_btrblocks/compressor/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::hash::Hash;
5use std::hash::Hasher;
6
7use enum_iterator::Sequence;
8use vortex_array::ArrayRef;
9use vortex_array::Canonical;
10use vortex_array::IntoArray;
11use vortex_array::ToCanonical;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::DictArray;
14use vortex_array::arrays::MaskedArray;
15use vortex_array::arrays::VarBinArray;
16use vortex_array::arrays::VarBinViewArray;
17use vortex_array::arrays::VarBinViewVTable;
18use vortex_array::builders::dict::dict_encode;
19use vortex_array::compute::is_constant;
20use vortex_array::scalar::Scalar;
21use vortex_array::vtable::VTable;
22use vortex_array::vtable::ValidityHelper;
23use vortex_error::VortexExpect;
24use vortex_error::VortexResult;
25use vortex_error::vortex_err;
26use vortex_fsst::FSSTArray;
27use vortex_fsst::fsst_compress;
28use vortex_fsst::fsst_train_compressor;
29use vortex_sparse::SparseArray;
30use vortex_sparse::SparseVTable;
31use vortex_utils::aliases::hash_set::HashSet;
32
33use super::integer::DictScheme as IntDictScheme;
34use super::integer::SequenceScheme as IntSequenceScheme;
35use super::integer::SparseScheme as IntSparseScheme;
36use crate::BtrBlocksCompressor;
37use crate::CanonicalCompressor;
38use crate::Compressor;
39use crate::CompressorContext;
40use crate::CompressorStats;
41use crate::Excludes;
42use crate::GenerateStatsOptions;
43use crate::IntCode;
44use crate::Scheme;
45use crate::SchemeExt;
46use crate::sample::sample;
47
48/// Array of variable-length byte arrays, and relevant stats for compression.
49#[derive(Clone, Debug)]
50pub struct StringStats {
51    src: VarBinViewArray,
52    estimated_distinct_count: u32,
53    value_count: u32,
54    null_count: u32,
55}
56
57/// Estimate the number of distinct strings in the var bin view array.
58fn estimate_distinct_count(strings: &VarBinViewArray) -> VortexResult<u32> {
59    let views = strings.views();
60    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
61    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
62    // share a 4-byte prefix and have the same length.
63    let mut distinct = HashSet::with_capacity(views.len() / 2);
64    views.iter().for_each(|&view| {
65        #[expect(
66            clippy::cast_possible_truncation,
67            reason = "approximate uniqueness with view prefix"
68        )]
69        let len_and_prefix = view.as_u128() as u64;
70        distinct.insert(len_and_prefix);
71    });
72
73    Ok(u32::try_from(distinct.len())?)
74}
75
76impl StringStats {
77    fn generate_opts_fallible(
78        input: &VarBinViewArray,
79        opts: GenerateStatsOptions,
80    ) -> VortexResult<Self> {
81        let null_count = input
82            .statistics()
83            .compute_null_count()
84            .ok_or_else(|| vortex_err!("Failed to compute null_count"))?;
85        let value_count = input.len() - null_count;
86        let estimated_distinct = if opts.count_distinct_values {
87            estimate_distinct_count(input)?
88        } else {
89            u32::MAX
90        };
91
92        Ok(Self {
93            src: input.clone(),
94            value_count: u32::try_from(value_count)?,
95            null_count: u32::try_from(null_count)?,
96            estimated_distinct_count: estimated_distinct,
97        })
98    }
99}
100
101impl CompressorStats for StringStats {
102    type ArrayVTable = VarBinViewVTable;
103
104    fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
105        Self::generate_opts_fallible(input, opts)
106            .vortex_expect("StringStats::generate_opts should not fail")
107    }
108
109    fn source(&self) -> &VarBinViewArray {
110        &self.src
111    }
112
113    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
114        let sampled = sample(self.src.as_ref(), sample_size, sample_count).to_varbinview();
115
116        Self::generate_opts(&sampled, opts)
117    }
118}
119
120/// All available string compression schemes.
121pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[
122    &UncompressedScheme,
123    &DictScheme,
124    &FSSTScheme,
125    &ConstantScheme,
126    &NullDominated,
127    #[cfg(feature = "zstd")]
128    &ZstdScheme,
129    #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
130    &ZstdBuffersScheme,
131];
132
133/// [`Compressor`] for strings.
134#[derive(Clone, Copy)]
135pub struct StringCompressor<'a> {
136    /// Reference to the parent compressor.
137    pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
138}
139
140impl<'a> Compressor for StringCompressor<'a> {
141    type ArrayVTable = VarBinViewVTable;
142    type SchemeType = dyn StringScheme;
143    type StatsType = StringStats;
144
145    fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
146        if self
147            .btr_blocks_compressor
148            .string_schemes()
149            .iter()
150            .any(|s| s.code() == DictScheme.code())
151        {
152            StringStats::generate_opts(
153                array,
154                GenerateStatsOptions {
155                    count_distinct_values: true,
156                },
157            )
158        } else {
159            StringStats::generate_opts(
160                array,
161                GenerateStatsOptions {
162                    count_distinct_values: false,
163                },
164            )
165        }
166    }
167
168    fn schemes(&self) -> &[&'static dyn StringScheme] {
169        self.btr_blocks_compressor.string_schemes()
170    }
171
172    fn default_scheme(&self) -> &'static Self::SchemeType {
173        &UncompressedScheme
174    }
175}
176
177pub trait StringScheme:
178    Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
179{
180}
181
182impl<T> StringScheme for T where
183    T: Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
184{
185}
186
187impl PartialEq for dyn StringScheme {
188    fn eq(&self, other: &Self) -> bool {
189        self.code() == other.code()
190    }
191}
192
193impl Eq for dyn StringScheme {}
194
195impl Hash for dyn StringScheme {
196    fn hash<H: Hasher>(&self, state: &mut H) {
197        self.code().hash(state)
198    }
199}
200
/// Scheme that leaves the array uncompressed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UncompressedScheme;

/// Dictionary encoding scheme.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DictScheme;

/// FSST compression scheme.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FSSTScheme;

/// Scheme that encodes the array as a constant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConstantScheme;

/// Sparse encoding scheme for arrays that consist mostly of nulls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NullDominated;

/// Zstd compression without dictionaries (nvCOMP compatible).
#[cfg(feature = "zstd")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ZstdScheme;

/// Zstd buffer-level compression preserving array layout for GPU decompression.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ZstdBuffersScheme;
225
226/// Unique identifier for string compression schemes.
227#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
228pub enum StringCode {
229    /// No compression applied.
230    Uncompressed,
231    /// Dictionary encoding for low-cardinality strings.
232    Dict,
233    /// FSST (Fast Static Symbol Table) compression.
234    Fsst,
235    /// Constant encoding for arrays with a single distinct value.
236    Constant,
237    /// Sparse encoding for null-dominated arrays.
238    Sparse,
239    /// Zstd compression without dictionaries.
240    Zstd,
241    /// Zstd buffer-level compression preserving array layout.
242    ZstdBuffers,
243}
244
245impl Scheme for UncompressedScheme {
246    type StatsType = StringStats;
247    type CodeType = StringCode;
248
249    fn code(&self) -> StringCode {
250        StringCode::Uncompressed
251    }
252
253    fn expected_compression_ratio(
254        &self,
255        _compressor: &BtrBlocksCompressor,
256        _stats: &Self::StatsType,
257        _ctx: CompressorContext,
258        _excludes: &[StringCode],
259    ) -> VortexResult<f64> {
260        Ok(1.0)
261    }
262
263    fn compress(
264        &self,
265        _compressor: &BtrBlocksCompressor,
266        stats: &Self::StatsType,
267        _ctx: CompressorContext,
268        _excludes: &[StringCode],
269    ) -> VortexResult<ArrayRef> {
270        Ok(stats.source().to_array())
271    }
272}
273
274impl Scheme for DictScheme {
275    type StatsType = StringStats;
276    type CodeType = StringCode;
277
278    fn code(&self) -> StringCode {
279        StringCode::Dict
280    }
281
282    fn expected_compression_ratio(
283        &self,
284        compressor: &BtrBlocksCompressor,
285        stats: &Self::StatsType,
286        ctx: CompressorContext,
287        excludes: &[StringCode],
288    ) -> VortexResult<f64> {
289        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
290        if stats.estimated_distinct_count > stats.value_count / 2 {
291            return Ok(0.0);
292        }
293
294        // If array is all null, do not attempt dict.
295        if stats.value_count == 0 {
296            return Ok(0.0);
297        }
298
299        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
300    }
301
302    fn compress(
303        &self,
304        compressor: &BtrBlocksCompressor,
305        stats: &Self::StatsType,
306        ctx: CompressorContext,
307        _excludes: &[StringCode],
308    ) -> VortexResult<ArrayRef> {
309        let dict = dict_encode(&stats.source().clone().into_array())?;
310
311        // If we are not allowed to cascade, do not attempt codes or values compression.
312        if ctx.allowed_cascading == 0 {
313            return Ok(dict.into_array());
314        }
315
316        // Find best compressor for codes and values separately
317        let compressed_codes = compressor.compress_canonical(
318            Canonical::Primitive(dict.codes().to_primitive()),
319            ctx.descend(),
320            Excludes::from(&[IntDictScheme.code(), IntSequenceScheme.code()]),
321        )?;
322
323        // Attempt to compress the values with non-Dict compression.
324        // Currently this will only be FSST.
325        let compressed_values = compressor.compress_canonical(
326            Canonical::VarBinView(dict.values().to_varbinview()),
327            ctx.descend(),
328            Excludes::from(&[DictScheme.code()]),
329        )?;
330
331        // SAFETY: compressing codes or values does not alter the invariants
332        unsafe {
333            Ok(
334                DictArray::new_unchecked(compressed_codes, compressed_values)
335                    .set_all_values_referenced(dict.has_all_values_referenced())
336                    .into_array(),
337            )
338        }
339    }
340}
341
342impl Scheme for FSSTScheme {
343    type StatsType = StringStats;
344    type CodeType = StringCode;
345
346    fn code(&self) -> StringCode {
347        StringCode::Fsst
348    }
349
350    fn compress(
351        &self,
352        compressor: &BtrBlocksCompressor,
353        stats: &Self::StatsType,
354        ctx: CompressorContext,
355        _excludes: &[StringCode],
356    ) -> VortexResult<ArrayRef> {
357        let fsst = {
358            let compressor = fsst_train_compressor(&stats.src);
359            fsst_compress(&stats.src, &compressor)
360        };
361
362        let compressed_original_lengths = compressor.compress_canonical(
363            Canonical::Primitive(fsst.uncompressed_lengths().to_primitive().narrow()?),
364            ctx,
365            Excludes::none(),
366        )?;
367
368        let compressed_codes_offsets = compressor.compress_canonical(
369            Canonical::Primitive(fsst.codes().offsets().to_primitive().narrow()?),
370            ctx,
371            Excludes::none(),
372        )?;
373        let compressed_codes = VarBinArray::try_new(
374            compressed_codes_offsets,
375            fsst.codes().bytes().clone(),
376            fsst.codes().dtype().clone(),
377            fsst.codes().validity().clone(),
378        )?;
379
380        let fsst = FSSTArray::try_new(
381            fsst.dtype().clone(),
382            fsst.symbols().clone(),
383            fsst.symbol_lengths().clone(),
384            compressed_codes,
385            compressed_original_lengths,
386        )?;
387
388        Ok(fsst.into_array())
389    }
390}
391
392impl Scheme for ConstantScheme {
393    type StatsType = StringStats;
394    type CodeType = StringCode;
395
396    fn code(&self) -> Self::CodeType {
397        StringCode::Constant
398    }
399
400    fn is_constant(&self) -> bool {
401        true
402    }
403
404    fn expected_compression_ratio(
405        &self,
406        _compressor: &BtrBlocksCompressor,
407        stats: &Self::StatsType,
408        ctx: CompressorContext,
409        _excludes: &[Self::CodeType],
410    ) -> VortexResult<f64> {
411        if ctx.is_sample {
412            return Ok(0.0);
413        }
414
415        if stats.estimated_distinct_count > 1 || !is_constant(stats.src.as_ref())?.unwrap_or(false)
416        {
417            return Ok(0.0);
418        }
419
420        // Force constant is these cases
421        Ok(f64::MAX)
422    }
423
424    fn compress(
425        &self,
426        _compressor: &BtrBlocksCompressor,
427        stats: &Self::StatsType,
428        _ctx: CompressorContext,
429        _excludes: &[Self::CodeType],
430    ) -> VortexResult<ArrayRef> {
431        let scalar_idx =
432            (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));
433
434        match scalar_idx {
435            Some(idx) => {
436                let scalar = stats.source().scalar_at(idx)?;
437                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
438                if !stats.source().all_valid()? {
439                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
440                } else {
441                    Ok(const_arr)
442                }
443            }
444            None => Ok(ConstantArray::new(
445                Scalar::null(stats.src.dtype().clone()),
446                stats.src.len(),
447            )
448            .into_array()),
449        }
450    }
451}
452
453impl Scheme for NullDominated {
454    type StatsType = StringStats;
455    type CodeType = StringCode;
456
457    fn code(&self) -> Self::CodeType {
458        StringCode::Sparse
459    }
460
461    fn expected_compression_ratio(
462        &self,
463        _compressor: &BtrBlocksCompressor,
464        stats: &Self::StatsType,
465        ctx: CompressorContext,
466        _excludes: &[Self::CodeType],
467    ) -> VortexResult<f64> {
468        // Only use `SparseScheme` if we can cascade.
469        if ctx.allowed_cascading == 0 {
470            return Ok(0.0);
471        }
472
473        if stats.value_count == 0 {
474            // All nulls should use ConstantScheme
475            return Ok(0.0);
476        }
477
478        // If the majority is null, will compress well.
479        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
480            return Ok(stats.src.len() as f64 / stats.value_count as f64);
481        }
482
483        // Otherwise we don't go this route
484        Ok(0.0)
485    }
486
487    fn compress(
488        &self,
489        compressor: &BtrBlocksCompressor,
490        stats: &Self::StatsType,
491        ctx: CompressorContext,
492        _excludes: &[Self::CodeType],
493    ) -> VortexResult<ArrayRef> {
494        assert!(ctx.allowed_cascading > 0);
495
496        // We pass None as we only run this pathway for NULL-dominated string arrays
497        let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
498
499        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
500            // Compress the indices only (not the values for strings)
501            let new_excludes = vec![IntSparseScheme.code(), IntCode::Dict];
502
503            let indices = sparse.patches().indices().to_primitive().narrow()?;
504            let compressed_indices = compressor.compress_canonical(
505                Canonical::Primitive(indices),
506                ctx.descend(),
507                Excludes::int_only(&new_excludes),
508            )?;
509
510            SparseArray::try_new(
511                compressed_indices,
512                sparse.patches().values().clone(),
513                sparse.len(),
514                sparse.fill_scalar().clone(),
515            )
516            .map(|a| a.into_array())
517        } else {
518            Ok(sparse_encoded)
519        }
520    }
521}
522
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::Zstd
    }

    /// Compact the source buffers, then Zstd-compress the result without a dictionary.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        let compacted = stats.source().compact_buffers()?;

        // NOTE(review): `3` and `8192` are presumably the compression level and the number of
        // values per frame — confirm against the `vortex_zstd` API.
        let zstd = vortex_zstd::ZstdArray::from_var_bin_view_without_dict(&compacted, 3, 8192)?;

        Ok(zstd.into_array())
    }
}
546
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::ZstdBuffers
    }

    /// Zstd-compress the source array with `ZstdBuffersArray::compress`.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        let source = stats.source().to_array();

        // NOTE(review): `3` is presumably the Zstd compression level — confirm.
        let compressed = vortex_zstd::ZstdBuffersArray::compress(&source, 3)?;

        Ok(compressed.into_array())
    }
}
566
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Two distinct values repeated 1024 times each should end up dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        let values: Vec<Option<&str>> = std::iter::repeat(Some("hello-world-1234"))
            .take(1024)
            .chain(std::iter::repeat(Some("hello-world-56789")).take(1024))
            .collect();
        let array = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        let compressed = BtrBlocksCompressor::default().compress(array.as_ref())?;
        assert_eq!(compressed.len(), 2048);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// 99 nulls followed by a single value should end up sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");
        let array = builder.finish_into_varbinview();

        let compressed = BtrBlocksCompressor::default().compress(array.as_ref())?;
        assert_eq!(compressed.len(), 100);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
623
/// Checks that representative inputs are compressed into the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::arrays::ConstantVTable;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSSTVTable;

    use crate::BtrBlocksCompressor;

    /// A single repeated value should be constant-encoded.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = vec![Some("constant_value"); 100];
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        let compressed = BtrBlocksCompressor::default().compress(input.as_ref())?;

        assert!(compressed.is::<ConstantVTable>());
        Ok(())
    }

    /// Three values cycled over 1000 rows should be dictionary-encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = ["apple", "banana", "cherry"];
        let values: Vec<Option<&str>> = (0..1000)
            .map(|row| Some(distinct_values[row % 3]))
            .collect();
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        let compressed = BtrBlocksCompressor::default().compress(input.as_ref())?;

        assert!(compressed.is::<DictVTable>());
        Ok(())
    }

    /// Long strings sharing a prefix and suffix but differing in the middle should use FSST.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        let compressed = BtrBlocksCompressor::default().compress(input.as_ref())?;

        assert!(compressed.is::<FSSTVTable>());
        Ok(())
    }
}