vortex_btrblocks/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_array::arrays::{
5    ConstantArray, DictArray, MaskedArray, VarBinArray, VarBinViewArray, VarBinViewVTable,
6};
7use vortex_array::builders::dict::dict_encode;
8use vortex_array::vtable::ValidityHelper;
9use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10use vortex_error::{VortexExpect, VortexResult};
11use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
12use vortex_scalar::Scalar;
13use vortex_sparse::{SparseArray, SparseVTable};
14use vortex_utils::aliases::hash_set::HashSet;
15
16use crate::integer::IntCompressor;
17use crate::sample::sample;
18use crate::{
19    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
20    estimate_compression_ratio_with_sampling, integer,
21};
22
23/// Array of variable-length byte arrays, and relevant stats for compression.
24#[derive(Clone, Debug)]
25pub struct StringStats {
26    src: VarBinViewArray,
27    estimated_distinct_count: u32,
28    value_count: u32,
29    null_count: u32,
30}
31
32/// Estimate the number of distinct strings in the var bin view array.
33#[allow(clippy::cast_possible_truncation)]
34fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
35    let views = strings.views();
36    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
37    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
38    // share a 4-byte prefix and have the same length.
39    let mut distinct = HashSet::with_capacity(views.len() / 2);
40    views.iter().for_each(|&view| {
41        let len_and_prefix = view.as_u128() as u64;
42        distinct.insert(len_and_prefix);
43    });
44
45    distinct
46        .len()
47        .try_into()
48        .vortex_expect("distinct count must fit in u32")
49}
50
51impl CompressorStats for StringStats {
52    type ArrayVTable = VarBinViewVTable;
53
54    fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
55        let null_count = input
56            .statistics()
57            .compute_null_count()
58            .vortex_expect("null count");
59        let value_count = input.len() - null_count;
60        let estimated_distinct = if opts.count_distinct_values {
61            estimate_distinct_count(input)
62        } else {
63            u32::MAX
64        };
65
66        Self {
67            src: input.clone(),
68            value_count: value_count.try_into().vortex_expect("value_count"),
69            null_count: null_count.try_into().vortex_expect("null_count"),
70            estimated_distinct_count: estimated_distinct,
71        }
72    }
73
74    fn source(&self) -> &VarBinViewArray {
75        &self.src
76    }
77
78    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
79        let sampled = sample(self.src.as_ref(), sample_size, sample_count).to_varbinview();
80
81        Self::generate_opts(&sampled, opts)
82    }
83}
84
85/// [`Compressor`] for strings.
86pub struct StringCompressor;
87
88impl Compressor for StringCompressor {
89    type ArrayVTable = VarBinViewVTable;
90    type SchemeType = dyn StringScheme;
91    type StatsType = StringStats;
92
93    fn schemes() -> &'static [&'static Self::SchemeType] {
94        &[
95            &UncompressedScheme,
96            &DictScheme,
97            &FSSTScheme,
98            &ConstantScheme,
99            &NullDominated,
100        ]
101    }
102
103    fn default_scheme() -> &'static Self::SchemeType {
104        &UncompressedScheme
105    }
106
107    fn dict_scheme_code() -> StringCode {
108        DICT_SCHEME
109    }
110}
111
112pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
113
114impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
115
/// Scheme that hands the source strings back unchanged.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the strings.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// Scheme that compresses the strings with FSST.
#[derive(Clone, Copy, Debug)]
pub struct FSSTScheme;

/// Scheme for arrays that hold a single repeated value.
#[derive(Clone, Copy, Debug)]
pub struct ConstantScheme;

/// Scheme for arrays that consist almost entirely of nulls.
#[derive(Clone, Copy, Debug)]
pub struct NullDominated;
130
/// Identifier of a string compression scheme, used for instance to exclude schemes when
/// cascading into child arrays.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StringCode(u8);

/// Code reported by `UncompressedScheme`.
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
/// Code reported by `DictScheme`.
const DICT_SCHEME: StringCode = StringCode(1);
/// Code reported by `FSSTScheme`.
const FSST_SCHEME: StringCode = StringCode(2);
/// Code reported by `ConstantScheme`.
const CONSTANT_SCHEME: StringCode = StringCode(3);
/// Code reported by `NullDominated`, which produces a sparse encoding.
const SPARSE_SCHEME: StringCode = StringCode(4);
140
141impl Scheme for UncompressedScheme {
142    type StatsType = StringStats;
143    type CodeType = StringCode;
144
145    fn code(&self) -> StringCode {
146        UNCOMPRESSED_SCHEME
147    }
148
149    fn expected_compression_ratio(
150        &self,
151        _stats: &Self::StatsType,
152        _is_sample: bool,
153        _allowed_cascading: usize,
154        _excludes: &[StringCode],
155    ) -> VortexResult<f64> {
156        Ok(1.0)
157    }
158
159    fn compress(
160        &self,
161        stats: &Self::StatsType,
162        _is_sample: bool,
163        _allowed_cascading: usize,
164        _excludes: &[StringCode],
165    ) -> VortexResult<ArrayRef> {
166        Ok(stats.source().to_array())
167    }
168}
169
170impl Scheme for DictScheme {
171    type StatsType = StringStats;
172    type CodeType = StringCode;
173
174    fn code(&self) -> StringCode {
175        DICT_SCHEME
176    }
177
178    fn expected_compression_ratio(
179        &self,
180        stats: &Self::StatsType,
181        is_sample: bool,
182        allowed_cascading: usize,
183        excludes: &[StringCode],
184    ) -> VortexResult<f64> {
185        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
186        if stats.estimated_distinct_count > stats.value_count / 2 {
187            return Ok(0.0);
188        }
189
190        // If array is all null, do not attempt dict.
191        if stats.value_count == 0 {
192            return Ok(0.0);
193        }
194
195        estimate_compression_ratio_with_sampling(
196            self,
197            stats,
198            is_sample,
199            allowed_cascading,
200            excludes,
201        )
202    }
203
204    fn compress(
205        &self,
206        stats: &Self::StatsType,
207        is_sample: bool,
208        allowed_cascading: usize,
209        _excludes: &[StringCode],
210    ) -> VortexResult<ArrayRef> {
211        let dict = dict_encode(&stats.source().clone().into_array())?;
212
213        // If we are not allowed to cascade, do not attempt codes or values compression.
214        if allowed_cascading == 0 {
215            return Ok(dict.into_array());
216        }
217
218        // Find best compressor for codes and values separately
219        let compressed_codes = IntCompressor::compress(
220            &dict.codes().to_primitive(),
221            is_sample,
222            allowed_cascading - 1,
223            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
224        )?;
225
226        // Attempt to compress the values with non-Dict compression.
227        // Currently this will only be FSST.
228        let compressed_values = StringCompressor::compress(
229            &dict.values().to_varbinview(),
230            is_sample,
231            allowed_cascading - 1,
232            &[DictScheme.code()],
233        )?;
234
235        // SAFETY: compressing codes or values does not alter the invariants
236        unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
237    }
238}
239
240impl Scheme for FSSTScheme {
241    type StatsType = StringStats;
242    type CodeType = StringCode;
243
244    fn code(&self) -> StringCode {
245        FSST_SCHEME
246    }
247
248    fn compress(
249        &self,
250        stats: &Self::StatsType,
251        is_sample: bool,
252        allowed_cascading: usize,
253        _excludes: &[StringCode],
254    ) -> VortexResult<ArrayRef> {
255        let compressor = fsst_train_compressor(&stats.src);
256        let fsst = fsst_compress(&stats.src, &compressor);
257
258        let compressed_original_lengths = IntCompressor::compress(
259            &fsst.uncompressed_lengths().to_primitive().narrow()?,
260            is_sample,
261            allowed_cascading,
262            &[],
263        )?;
264
265        let compressed_codes_offsets = IntCompressor::compress(
266            &fsst.codes().offsets().to_primitive().narrow()?,
267            is_sample,
268            allowed_cascading,
269            &[],
270        )?;
271        let compressed_codes = VarBinArray::try_new(
272            compressed_codes_offsets,
273            fsst.codes().bytes().clone(),
274            fsst.codes().dtype().clone(),
275            fsst.codes().validity().clone(),
276        )?;
277
278        let fsst = FSSTArray::try_new(
279            fsst.dtype().clone(),
280            fsst.symbols().clone(),
281            fsst.symbol_lengths().clone(),
282            compressed_codes,
283            compressed_original_lengths,
284        )?;
285
286        Ok(fsst.into_array())
287    }
288}
289
290impl Scheme for ConstantScheme {
291    type StatsType = StringStats;
292    type CodeType = StringCode;
293
294    fn code(&self) -> Self::CodeType {
295        CONSTANT_SCHEME
296    }
297
298    fn is_constant(&self) -> bool {
299        true
300    }
301
302    fn expected_compression_ratio(
303        &self,
304        stats: &Self::StatsType,
305        is_sample: bool,
306        _allowed_cascading: usize,
307        _excludes: &[Self::CodeType],
308    ) -> VortexResult<f64> {
309        if is_sample {
310            return Ok(0.0);
311        }
312
313        if stats.estimated_distinct_count > 1 || !stats.src.is_constant() {
314            return Ok(0.0);
315        }
316
317        // Force constant is these cases
318        Ok(f64::MAX)
319    }
320
321    fn compress(
322        &self,
323        stats: &Self::StatsType,
324        _is_sample: bool,
325        _allowed_cascading: usize,
326        _excludes: &[Self::CodeType],
327    ) -> VortexResult<ArrayRef> {
328        let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));
329
330        match scalar_idx {
331            Some(idx) => {
332                let scalar = stats.source().scalar_at(idx);
333                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
334                if !stats.source().all_valid() {
335                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
336                } else {
337                    Ok(const_arr)
338                }
339            }
340            None => Ok(ConstantArray::new(
341                Scalar::null(stats.src.dtype().clone()),
342                stats.src.len(),
343            )
344            .into_array()),
345        }
346    }
347}
348
349impl Scheme for NullDominated {
350    type StatsType = StringStats;
351    type CodeType = StringCode;
352
353    fn code(&self) -> Self::CodeType {
354        SPARSE_SCHEME
355    }
356
357    fn expected_compression_ratio(
358        &self,
359        stats: &Self::StatsType,
360        _is_sample: bool,
361        allowed_cascading: usize,
362        _excludes: &[Self::CodeType],
363    ) -> VortexResult<f64> {
364        // Only use `SparseScheme` if we can cascade.
365        if allowed_cascading == 0 {
366            return Ok(0.0);
367        }
368
369        if stats.value_count == 0 {
370            // All nulls should use ConstantScheme
371            return Ok(0.0);
372        }
373
374        // If the majority is null, will compress well.
375        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
376            return Ok(stats.src.len() as f64 / stats.value_count as f64);
377        }
378
379        // Otherwise we don't go this route
380        Ok(0.0)
381    }
382
383    fn compress(
384        &self,
385        stats: &Self::StatsType,
386        is_sample: bool,
387        allowed_cascading: usize,
388        _excludes: &[Self::CodeType],
389    ) -> VortexResult<ArrayRef> {
390        assert!(allowed_cascading > 0);
391
392        // We pass None as we only run this pathway for NULL-dominated float arrays
393        let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
394
395        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
396            // Compress the values
397            let new_excludes = vec![integer::SparseScheme.code()];
398
399            // Don't attempt to compress the non-null values
400            let indices = sparse.patches().indices().to_primitive().narrow()?;
401            let compressed_indices = IntCompressor::compress_no_dict(
402                &indices,
403                is_sample,
404                allowed_cascading - 1,
405                &new_excludes,
406            )?;
407
408            SparseArray::try_new(
409                compressed_indices,
410                sparse.patches().values().clone(),
411                sparse.len(),
412                sparse.fill_scalar().clone(),
413            )
414            .map(|a| a.into_array())
415        } else {
416            Ok(sparse_encoded)
417        }
418    }
419}
420
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_dtype::{DType, Nullability};
    use vortex_sparse::SparseEncoding;

    use crate::string::StringCompressor;
    use crate::{Compressor, MAX_CASCADE};

    /// Smoke test: compressing two runs of repeated strings must succeed. The input and the
    /// resulting encoding tree are printed for inspection.
    #[test]
    fn test_strings() {
        // 1024 copies of the first string followed by 1024 copies of the second.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        println!("original array: {}", input.as_ref().display_tree());

        let compressed = StringCompressor::compress(&input, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.display_tree());
    }

    /// An array of 99 nulls and a single value must end up sparse-encoded.
    #[test]
    fn test_sparse_nulls() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");
        let input = builder.finish_into_varbinview();

        let compressed = StringCompressor::compress(&input, false, MAX_CASCADE, &[]).unwrap();

        assert_eq!(compressed.encoding_id(), SparseEncoding.id());
    }
}