vortex_btrblocks/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_array::arrays::{ConstantArray, VarBinArray, VarBinViewArray, VarBinViewVTable};
5use vortex_array::vtable::ValidityHelper;
6use vortex_array::{ArrayRef, IntoArray, ToCanonical};
7use vortex_dict::DictArray;
8use vortex_dict::builders::dict_encode;
9use vortex_error::{VortexExpect, VortexResult};
10use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
11use vortex_utils::aliases::hash_set::HashSet;
12
13use crate::integer::IntCompressor;
14use crate::sample::sample;
15use crate::{
16    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
17    estimate_compression_ratio_with_sampling, integer,
18};
19
20/// Array of variable-length byte arrays, and relevant stats for compression.
21#[derive(Clone, Debug)]
22pub struct StringStats {
23    src: VarBinViewArray,
24    estimated_distinct_count: u32,
25    value_count: u32,
26}
27
28/// Estimate the number of distinct strings in the var bin view array.
29#[allow(clippy::cast_possible_truncation)]
30fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
31    let views = strings.views();
32    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
33    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
34    // share a 4-byte prefix and have the same length.
35    let mut distinct = HashSet::with_capacity(views.len() / 2);
36    views.iter().for_each(|&view| {
37        let len_and_prefix = view.as_u128() as u64;
38        distinct.insert(len_and_prefix);
39    });
40
41    distinct
42        .len()
43        .try_into()
44        .vortex_expect("distinct count must fit in u32")
45}
46
47impl CompressorStats for StringStats {
48    type ArrayVTable = VarBinViewVTable;
49
50    fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
51        let null_count = input
52            .statistics()
53            .compute_null_count()
54            .vortex_expect("null count");
55        let value_count = input.len() - null_count;
56        let estimated_distinct = if opts.count_distinct_values {
57            estimate_distinct_count(input)
58        } else {
59            u32::MAX
60        };
61
62        Self {
63            src: input.clone(),
64            value_count: value_count.try_into().vortex_expect("value_count"),
65            estimated_distinct_count: estimated_distinct,
66        }
67    }
68
69    fn source(&self) -> &VarBinViewArray {
70        &self.src
71    }
72
73    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
74        let sampled = sample(self.src.as_ref(), sample_size, sample_count).to_varbinview();
75
76        Self::generate_opts(&sampled, opts)
77    }
78}
79
80/// [`Compressor`] for strings.
81pub struct StringCompressor;
82
83impl Compressor for StringCompressor {
84    type ArrayVTable = VarBinViewVTable;
85    type SchemeType = dyn StringScheme;
86    type StatsType = StringStats;
87
88    fn schemes() -> &'static [&'static Self::SchemeType] {
89        &[
90            &UncompressedScheme,
91            &DictScheme,
92            &FSSTScheme,
93            &ConstantScheme,
94        ]
95    }
96
97    fn default_scheme() -> &'static Self::SchemeType {
98        &UncompressedScheme
99    }
100
101    fn dict_scheme_code() -> StringCode {
102        DICT_SCHEME
103    }
104}
105
106pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
107
108impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
109
/// Scheme that returns the source array as-is.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;
112
/// Scheme that dictionary-encodes the strings.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;
115
/// Scheme that compresses the strings with FSST.
#[derive(Clone, Copy, Debug)]
pub struct FSSTScheme;
118
/// Scheme that replaces a constant array with a `ConstantArray`.
#[derive(Clone, Copy, Debug)]
pub struct ConstantScheme;
121
/// Numeric identifier of a string compression scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StringCode(u8);
124
125const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
126const DICT_SCHEME: StringCode = StringCode(1);
127const FSST_SCHEME: StringCode = StringCode(2);
128const CONSTANT_SCHEME: StringCode = StringCode(3);
129
130impl Scheme for UncompressedScheme {
131    type StatsType = StringStats;
132    type CodeType = StringCode;
133
134    fn code(&self) -> StringCode {
135        UNCOMPRESSED_SCHEME
136    }
137
138    fn expected_compression_ratio(
139        &self,
140        _stats: &Self::StatsType,
141        _is_sample: bool,
142        _allowed_cascading: usize,
143        _excludes: &[StringCode],
144    ) -> VortexResult<f64> {
145        Ok(1.0)
146    }
147
148    fn compress(
149        &self,
150        stats: &Self::StatsType,
151        _is_sample: bool,
152        _allowed_cascading: usize,
153        _excludes: &[StringCode],
154    ) -> VortexResult<ArrayRef> {
155        Ok(stats.source().to_array())
156    }
157}
158
159impl Scheme for DictScheme {
160    type StatsType = StringStats;
161    type CodeType = StringCode;
162
163    fn code(&self) -> StringCode {
164        DICT_SCHEME
165    }
166
167    fn expected_compression_ratio(
168        &self,
169        stats: &Self::StatsType,
170        is_sample: bool,
171        allowed_cascading: usize,
172        excludes: &[StringCode],
173    ) -> VortexResult<f64> {
174        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
175        if stats.estimated_distinct_count > stats.value_count / 2 {
176            return Ok(0.0);
177        }
178
179        // If array is all null, do not attempt dict.
180        if stats.value_count == 0 {
181            return Ok(0.0);
182        }
183
184        estimate_compression_ratio_with_sampling(
185            self,
186            stats,
187            is_sample,
188            allowed_cascading,
189            excludes,
190        )
191    }
192
193    fn compress(
194        &self,
195        stats: &Self::StatsType,
196        is_sample: bool,
197        allowed_cascading: usize,
198        _excludes: &[StringCode],
199    ) -> VortexResult<ArrayRef> {
200        let dict = dict_encode(&stats.source().clone().into_array())?;
201
202        // If we are not allowed to cascade, do not attempt codes or values compression.
203        if allowed_cascading == 0 {
204            return Ok(dict.into_array());
205        }
206
207        // Find best compressor for codes and values separately
208        let compressed_codes = IntCompressor::compress(
209            &dict.codes().to_primitive(),
210            is_sample,
211            allowed_cascading - 1,
212            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
213        )?;
214
215        // Attempt to compress the values with non-Dict compression.
216        // Currently this will only be FSST.
217        let compressed_values = StringCompressor::compress(
218            &dict.values().to_varbinview(),
219            is_sample,
220            allowed_cascading - 1,
221            &[DictScheme.code()],
222        )?;
223
224        // SAFETY: compressing codes or values does not alter the invariants
225        unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
226    }
227}
228
229impl Scheme for FSSTScheme {
230    type StatsType = StringStats;
231    type CodeType = StringCode;
232
233    fn code(&self) -> StringCode {
234        FSST_SCHEME
235    }
236
237    fn compress(
238        &self,
239        stats: &Self::StatsType,
240        is_sample: bool,
241        allowed_cascading: usize,
242        _excludes: &[StringCode],
243    ) -> VortexResult<ArrayRef> {
244        let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
245        let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
246
247        let compressed_original_lengths = IntCompressor::compress(
248            &fsst.uncompressed_lengths().to_primitive().downcast()?,
249            is_sample,
250            allowed_cascading,
251            &[],
252        )?;
253
254        let compressed_codes_offsets = IntCompressor::compress(
255            &fsst.codes().offsets().to_primitive().downcast()?,
256            is_sample,
257            allowed_cascading,
258            &[],
259        )?;
260        let compressed_codes = VarBinArray::try_new(
261            compressed_codes_offsets,
262            fsst.codes().bytes().clone(),
263            fsst.codes().dtype().clone(),
264            fsst.codes().validity().clone(),
265        )?;
266
267        let fsst = FSSTArray::try_new(
268            fsst.dtype().clone(),
269            fsst.symbols().clone(),
270            fsst.symbol_lengths().clone(),
271            compressed_codes,
272            compressed_original_lengths,
273        )?;
274
275        Ok(fsst.into_array())
276    }
277}
278
279impl Scheme for ConstantScheme {
280    type StatsType = StringStats;
281    type CodeType = StringCode;
282
283    fn code(&self) -> Self::CodeType {
284        CONSTANT_SCHEME
285    }
286
287    fn is_constant(&self) -> bool {
288        true
289    }
290
291    fn expected_compression_ratio(
292        &self,
293        stats: &Self::StatsType,
294        is_sample: bool,
295        _allowed_cascading: usize,
296        _excludes: &[Self::CodeType],
297    ) -> VortexResult<f64> {
298        if is_sample {
299            return Ok(0.0);
300        }
301
302        if stats.src.is_constant() {
303            // Force constant
304            Ok(f64::MAX)
305        } else {
306            Ok(0.0)
307        }
308    }
309
310    fn compress(
311        &self,
312        stats: &Self::StatsType,
313        _is_sample: bool,
314        _allowed_cascading: usize,
315        _excludes: &[Self::CodeType],
316    ) -> VortexResult<ArrayRef> {
317        let scalar = stats
318            .src
319            .as_constant()
320            .vortex_expect("ConstantScheme::compress can only be called when array is constant");
321
322        Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
323    }
324}
325
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Compresses 2048 strings drawn from two distinct values and checks that the compressed
    /// array still has the same logical length as the input.
    #[test]
    fn test_strings() {
        // 1024 copies of the first value followed by 1024 copies of the second.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));
        let expected_len = values.len();

        let strings = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        println!("original array: {}", strings.as_ref().display_tree());

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.display_tree());

        // Whatever encoding was chosen, the logical length must be preserved.
        assert_eq!(
            compressed.len(),
            expected_len,
            "compression must preserve the number of elements"
        );
    }
}