vortex_btrblocks/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_array::ArrayRef;
5use vortex_array::IntoArray;
6use vortex_array::ToCanonical;
7use vortex_array::arrays::ConstantArray;
8use vortex_array::arrays::DictArray;
9use vortex_array::arrays::MaskedArray;
10use vortex_array::arrays::VarBinArray;
11use vortex_array::arrays::VarBinViewArray;
12use vortex_array::arrays::VarBinViewVTable;
13use vortex_array::builders::dict::dict_encode;
14use vortex_array::vtable::ValidityHelper;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_fsst::FSSTArray;
18use vortex_fsst::fsst_compress;
19use vortex_fsst::fsst_train_compressor;
20use vortex_scalar::Scalar;
21use vortex_sparse::SparseArray;
22use vortex_sparse::SparseVTable;
23use vortex_utils::aliases::hash_set::HashSet;
24
25use crate::Compressor;
26use crate::CompressorStats;
27use crate::GenerateStatsOptions;
28use crate::Scheme;
29use crate::estimate_compression_ratio_with_sampling;
30use crate::integer;
31use crate::integer::IntCompressor;
32use crate::sample::sample;
33
/// Array of variable-length byte arrays, and relevant stats for compression.
///
/// Built by [`CompressorStats::generate_opts`] (either on the full array or, via `sample_opts`,
/// on a sample of it) and read by every [`StringScheme`] when estimating ratios and compressing.
#[derive(Clone, Debug)]
pub struct StringStats {
    /// The array the stats describe (the full input, or a sample of it).
    src: VarBinViewArray,
    /// Approximate distinct count derived from the string views (null slots included), or
    /// `u32::MAX` when distinct counting was disabled in the [`GenerateStatsOptions`].
    estimated_distinct_count: u32,
    /// Number of non-null entries in `src`.
    value_count: u32,
    /// Number of null entries in `src`.
    null_count: u32,
}
42
43/// Estimate the number of distinct strings in the var bin view array.
44fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
45    let views = strings.views();
46    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
47    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
48    // share a 4-byte prefix and have the same length.
49    let mut distinct = HashSet::with_capacity(views.len() / 2);
50    views.iter().for_each(|&view| {
51        #[expect(
52            clippy::cast_possible_truncation,
53            reason = "approximate uniqueness with view prefix"
54        )]
55        let len_and_prefix = view.as_u128() as u64;
56        distinct.insert(len_and_prefix);
57    });
58
59    distinct
60        .len()
61        .try_into()
62        .vortex_expect("distinct count must fit in u32")
63}
64
65impl CompressorStats for StringStats {
66    type ArrayVTable = VarBinViewVTable;
67
68    fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
69        let null_count = input
70            .statistics()
71            .compute_null_count()
72            .vortex_expect("null count");
73        let value_count = input.len() - null_count;
74        let estimated_distinct = if opts.count_distinct_values {
75            estimate_distinct_count(input)
76        } else {
77            u32::MAX
78        };
79
80        Self {
81            src: input.clone(),
82            value_count: value_count.try_into().vortex_expect("value_count"),
83            null_count: null_count.try_into().vortex_expect("null_count"),
84            estimated_distinct_count: estimated_distinct,
85        }
86    }
87
88    fn source(&self) -> &VarBinViewArray {
89        &self.src
90    }
91
92    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
93        let sampled = sample(self.src.as_ref(), sample_size, sample_count).to_varbinview();
94
95        Self::generate_opts(&sampled, opts)
96    }
97}
98
99/// [`Compressor`] for strings.
100pub struct StringCompressor;
101
102impl Compressor for StringCompressor {
103    type ArrayVTable = VarBinViewVTable;
104    type SchemeType = dyn StringScheme;
105    type StatsType = StringStats;
106
107    fn schemes() -> &'static [&'static Self::SchemeType] {
108        &[
109            &UncompressedScheme,
110            &DictScheme,
111            &FSSTScheme,
112            &ConstantScheme,
113            &NullDominated,
114        ]
115    }
116
117    fn default_scheme() -> &'static Self::SchemeType {
118        &UncompressedScheme
119    }
120
121    fn dict_scheme_code() -> StringCode {
122        DICT_SCHEME
123    }
124}
125
126pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
127
128impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
129
/// Scheme that leaves the strings uncompressed: `compress` returns the source array unchanged.
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the strings, then compresses the integer codes and the
/// string values separately.
#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

/// Scheme that FSST-compresses the strings, then compresses the resulting integer children
/// (uncompressed lengths and code offsets).
#[derive(Debug, Copy, Clone)]
pub struct FSSTScheme;

/// Scheme that represents a constant array as a single [`ConstantArray`] value.
#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

/// Scheme that sparse-encodes arrays in which more than 90% of the entries are null.
#[derive(Debug, Copy, Clone)]
pub struct NullDominated;

/// Identifier of a string compression scheme; each scheme's `code()` returns one of the
/// constants below, and these codes are what exclusion lists passed to `compress` contain.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct StringCode(u8);

const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
const DICT_SCHEME: StringCode = StringCode(1);
const FSST_SCHEME: StringCode = StringCode(2);
const CONSTANT_SCHEME: StringCode = StringCode(3);

// Code returned by the `NullDominated` scheme (which produces a sparse encoding); note the
// constant is named after the encoding rather than after the scheme struct.
const SPARSE_SCHEME: StringCode = StringCode(4);
154
155impl Scheme for UncompressedScheme {
156    type StatsType = StringStats;
157    type CodeType = StringCode;
158
159    fn code(&self) -> StringCode {
160        UNCOMPRESSED_SCHEME
161    }
162
163    fn expected_compression_ratio(
164        &self,
165        _stats: &Self::StatsType,
166        _is_sample: bool,
167        _allowed_cascading: usize,
168        _excludes: &[StringCode],
169    ) -> VortexResult<f64> {
170        Ok(1.0)
171    }
172
173    fn compress(
174        &self,
175        stats: &Self::StatsType,
176        _is_sample: bool,
177        _allowed_cascading: usize,
178        _excludes: &[StringCode],
179    ) -> VortexResult<ArrayRef> {
180        Ok(stats.source().to_array())
181    }
182}
183
184impl Scheme for DictScheme {
185    type StatsType = StringStats;
186    type CodeType = StringCode;
187
188    fn code(&self) -> StringCode {
189        DICT_SCHEME
190    }
191
192    fn expected_compression_ratio(
193        &self,
194        stats: &Self::StatsType,
195        is_sample: bool,
196        allowed_cascading: usize,
197        excludes: &[StringCode],
198    ) -> VortexResult<f64> {
199        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
200        if stats.estimated_distinct_count > stats.value_count / 2 {
201            return Ok(0.0);
202        }
203
204        // If array is all null, do not attempt dict.
205        if stats.value_count == 0 {
206            return Ok(0.0);
207        }
208
209        estimate_compression_ratio_with_sampling(
210            self,
211            stats,
212            is_sample,
213            allowed_cascading,
214            excludes,
215        )
216    }
217
218    fn compress(
219        &self,
220        stats: &Self::StatsType,
221        is_sample: bool,
222        allowed_cascading: usize,
223        _excludes: &[StringCode],
224    ) -> VortexResult<ArrayRef> {
225        let dict = dict_encode(&stats.source().clone().into_array())?;
226
227        // If we are not allowed to cascade, do not attempt codes or values compression.
228        if allowed_cascading == 0 {
229            return Ok(dict.into_array());
230        }
231
232        // Find best compressor for codes and values separately
233        let compressed_codes = IntCompressor::compress(
234            &dict.codes().to_primitive(),
235            is_sample,
236            allowed_cascading - 1,
237            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
238        )?;
239
240        // Attempt to compress the values with non-Dict compression.
241        // Currently this will only be FSST.
242        let compressed_values = StringCompressor::compress(
243            &dict.values().to_varbinview(),
244            is_sample,
245            allowed_cascading - 1,
246            &[DictScheme.code()],
247        )?;
248
249        // SAFETY: compressing codes or values does not alter the invariants
250        unsafe {
251            Ok(
252                DictArray::new_unchecked(compressed_codes, compressed_values)
253                    .set_all_values_referenced(dict.has_all_values_referenced())
254                    .into_array(),
255            )
256        }
257    }
258}
259
260impl Scheme for FSSTScheme {
261    type StatsType = StringStats;
262    type CodeType = StringCode;
263
264    fn code(&self) -> StringCode {
265        FSST_SCHEME
266    }
267
268    fn compress(
269        &self,
270        stats: &Self::StatsType,
271        is_sample: bool,
272        allowed_cascading: usize,
273        _excludes: &[StringCode],
274    ) -> VortexResult<ArrayRef> {
275        let compressor = fsst_train_compressor(&stats.src);
276        let fsst = fsst_compress(&stats.src, &compressor);
277
278        let compressed_original_lengths = IntCompressor::compress(
279            &fsst.uncompressed_lengths().to_primitive().narrow()?,
280            is_sample,
281            allowed_cascading,
282            &[],
283        )?;
284
285        let compressed_codes_offsets = IntCompressor::compress(
286            &fsst.codes().offsets().to_primitive().narrow()?,
287            is_sample,
288            allowed_cascading,
289            &[],
290        )?;
291        let compressed_codes = VarBinArray::try_new(
292            compressed_codes_offsets,
293            fsst.codes().bytes().clone(),
294            fsst.codes().dtype().clone(),
295            fsst.codes().validity().clone(),
296        )?;
297
298        let fsst = FSSTArray::try_new(
299            fsst.dtype().clone(),
300            fsst.symbols().clone(),
301            fsst.symbol_lengths().clone(),
302            compressed_codes,
303            compressed_original_lengths,
304        )?;
305
306        Ok(fsst.into_array())
307    }
308}
309
310impl Scheme for ConstantScheme {
311    type StatsType = StringStats;
312    type CodeType = StringCode;
313
314    fn code(&self) -> Self::CodeType {
315        CONSTANT_SCHEME
316    }
317
318    fn is_constant(&self) -> bool {
319        true
320    }
321
322    fn expected_compression_ratio(
323        &self,
324        stats: &Self::StatsType,
325        is_sample: bool,
326        _allowed_cascading: usize,
327        _excludes: &[Self::CodeType],
328    ) -> VortexResult<f64> {
329        if is_sample {
330            return Ok(0.0);
331        }
332
333        if stats.estimated_distinct_count > 1 || !stats.src.is_constant() {
334            return Ok(0.0);
335        }
336
337        // Force constant is these cases
338        Ok(f64::MAX)
339    }
340
341    fn compress(
342        &self,
343        stats: &Self::StatsType,
344        _is_sample: bool,
345        _allowed_cascading: usize,
346        _excludes: &[Self::CodeType],
347    ) -> VortexResult<ArrayRef> {
348        let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));
349
350        match scalar_idx {
351            Some(idx) => {
352                let scalar = stats.source().scalar_at(idx);
353                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
354                if !stats.source().all_valid() {
355                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
356                } else {
357                    Ok(const_arr)
358                }
359            }
360            None => Ok(ConstantArray::new(
361                Scalar::null(stats.src.dtype().clone()),
362                stats.src.len(),
363            )
364            .into_array()),
365        }
366    }
367}
368
369impl Scheme for NullDominated {
370    type StatsType = StringStats;
371    type CodeType = StringCode;
372
373    fn code(&self) -> Self::CodeType {
374        SPARSE_SCHEME
375    }
376
377    fn expected_compression_ratio(
378        &self,
379        stats: &Self::StatsType,
380        _is_sample: bool,
381        allowed_cascading: usize,
382        _excludes: &[Self::CodeType],
383    ) -> VortexResult<f64> {
384        // Only use `SparseScheme` if we can cascade.
385        if allowed_cascading == 0 {
386            return Ok(0.0);
387        }
388
389        if stats.value_count == 0 {
390            // All nulls should use ConstantScheme
391            return Ok(0.0);
392        }
393
394        // If the majority is null, will compress well.
395        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
396            return Ok(stats.src.len() as f64 / stats.value_count as f64);
397        }
398
399        // Otherwise we don't go this route
400        Ok(0.0)
401    }
402
403    fn compress(
404        &self,
405        stats: &Self::StatsType,
406        is_sample: bool,
407        allowed_cascading: usize,
408        _excludes: &[Self::CodeType],
409    ) -> VortexResult<ArrayRef> {
410        assert!(allowed_cascading > 0);
411
412        // We pass None as we only run this pathway for NULL-dominated float arrays
413        let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
414
415        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
416            // Compress the values
417            let new_excludes = vec![integer::SparseScheme.code()];
418
419            // Don't attempt to compress the non-null values
420            let indices = sparse.patches().indices().to_primitive().narrow()?;
421            let compressed_indices = IntCompressor::compress_no_dict(
422                &indices,
423                is_sample,
424                allowed_cascading - 1,
425                &new_excludes,
426            )?;
427
428            SparseArray::try_new(
429                compressed_indices,
430                sparse.patches().values().clone(),
431                sparse.len(),
432                sparse.fill_scalar().clone(),
433            )
434            .map(|a| a.into_array())
435        } else {
436            Ok(sparse_encoded)
437        }
438    }
439}
440
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_sparse::SparseVTable;

    use crate::Compressor;
    use crate::MAX_CASCADE;
    use crate::string::StringCompressor;

    #[test]
    fn test_strings() {
        // Two distinct values, each repeated 1024 times.
        let mut strings = Vec::new();
        for _ in 0..1024 {
            strings.push(Some("hello-world-1234"));
        }
        for _ in 0..1024 {
            strings.push(Some("hello-world-56789"));
        }
        let strings = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));

        println!("original array: {}", strings.as_ref().display_tree());

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.display_tree());

        // Whichever scheme was selected, the logical shape of the array must be preserved.
        assert_eq!(compressed.len(), 2048);
        assert_eq!(compressed.dtype(), &DType::Utf8(Nullability::NonNullable));
    }

    #[test]
    fn test_sparse_nulls() {
        let mut strings = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        strings.append_nulls(99);

        strings.append_value("one little string");

        let strings = strings.finish_into_varbinview();

        let compressed = StringCompressor::compress(&strings, false, MAX_CASCADE, &[]).unwrap();
        assert!(compressed.is::<SparseVTable>());
        assert_eq!(compressed.len(), 100);
    }
}