vortex_btrblocks/
string.rs

1use vortex_array::aliases::hash_set::HashSet;
2use vortex_array::arrays::{VarBinArray, VarBinViewArray};
3use vortex_array::{Array, ArrayRef, ToCanonical};
4use vortex_dict::DictArray;
5use vortex_dict::builders::dict_encode;
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
8
9use crate::integer::IntCompressor;
10use crate::sample::sample;
11use crate::{
12    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
13    estimate_compression_ratio_with_sampling,
14};
15
16#[derive(Clone, Debug)]
17pub struct StringStats {
18    src: VarBinViewArray,
19    estimated_distinct_count: u32,
20    value_count: u32,
21    // null_count: u32,
22}
23
24/// Estimate the number of distinct strings in the var bin view array.
25#[allow(clippy::cast_possible_truncation)]
26fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
27    let views = strings.views();
28    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
29    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
30    // share a 4-byte prefix and have the same length.
31    let mut distinct = HashSet::with_capacity(views.len() / 2);
32    views.iter().for_each(|&view| {
33        let len_and_prefix = view.as_u128() as u64;
34        distinct.insert(len_and_prefix);
35    });
36
37    distinct
38        .len()
39        .try_into()
40        .vortex_expect("distinct count must fit in u32")
41}
42
43impl CompressorStats for StringStats {
44    type ArrayType = VarBinViewArray;
45
46    fn generate_opts(input: &Self::ArrayType, opts: GenerateStatsOptions) -> Self {
47        let null_count = input
48            .statistics()
49            .compute_null_count()
50            .vortex_expect("null count");
51        let value_count = input.len() - null_count;
52        let estimated_distinct = if opts.count_distinct_values {
53            estimate_distinct_count(input)
54        } else {
55            u32::MAX
56        };
57
58        Self {
59            src: input.clone(),
60            value_count: value_count.try_into().vortex_expect("value_count"),
61            // null_count: null_count.try_into().vortex_expect("null_count"),
62            estimated_distinct_count: estimated_distinct,
63        }
64    }
65
66    fn source(&self) -> &Self::ArrayType {
67        &self.src
68    }
69
70    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
71        let sampled = sample(self.src.clone(), sample_size, sample_count)
72            .to_varbinview()
73            .vortex_expect("varbinview");
74
75        Self::generate_opts(&sampled, opts)
76    }
77}
78
79pub struct StringCompressor;
80
81impl Compressor for StringCompressor {
82    type ArrayType = VarBinViewArray;
83    type SchemeType = dyn StringScheme;
84    type StatsType = StringStats;
85
86    fn schemes() -> &'static [&'static Self::SchemeType] {
87        &[&UncompressedScheme, &DictScheme, &FSSTScheme]
88    }
89
90    fn default_scheme() -> &'static Self::SchemeType {
91        &UncompressedScheme
92    }
93
94    fn dict_scheme_code() -> StringCode {
95        DICT_SCHEME
96    }
97}
98
99pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
100
101impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
102
/// Scheme that returns the source array as-is.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the strings.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// Scheme that compresses the strings with FSST.
#[derive(Clone, Copy, Debug)]
pub struct FSSTScheme;
111
/// Identifier for a string compression scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StringCode(u8);

/// Code of [`UncompressedScheme`].
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
/// Code of [`DictScheme`].
const DICT_SCHEME: StringCode = StringCode(1);
/// Code of [`FSSTScheme`].
const FSST_SCHEME: StringCode = StringCode(2);
118
119impl Scheme for UncompressedScheme {
120    type StatsType = StringStats;
121    type CodeType = StringCode;
122
123    fn code(&self) -> StringCode {
124        UNCOMPRESSED_SCHEME
125    }
126
127    fn expected_compression_ratio(
128        &self,
129        _stats: &Self::StatsType,
130        _is_sample: bool,
131        _allowed_cascading: usize,
132        _excludes: &[StringCode],
133    ) -> VortexResult<f64> {
134        Ok(1.0)
135    }
136
137    fn compress(
138        &self,
139        stats: &Self::StatsType,
140        _is_sample: bool,
141        _allowed_cascading: usize,
142        _excludes: &[StringCode],
143    ) -> VortexResult<ArrayRef> {
144        Ok(stats.source().clone().into_array())
145    }
146}
147
148impl Scheme for DictScheme {
149    type StatsType = StringStats;
150    type CodeType = StringCode;
151
152    fn code(&self) -> StringCode {
153        DICT_SCHEME
154    }
155
156    fn expected_compression_ratio(
157        &self,
158        stats: &Self::StatsType,
159        is_sample: bool,
160        allowed_cascading: usize,
161        excludes: &[StringCode],
162    ) -> VortexResult<f64> {
163        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
164        if stats.estimated_distinct_count > stats.value_count / 2 {
165            return Ok(0.0);
166        }
167
168        // If array is all null, do not attempt dict.
169        if stats.value_count == 0 {
170            return Ok(0.0);
171        }
172
173        estimate_compression_ratio_with_sampling(
174            self,
175            stats,
176            is_sample,
177            allowed_cascading,
178            excludes,
179        )
180    }
181
182    fn compress(
183        &self,
184        stats: &Self::StatsType,
185        is_sample: bool,
186        allowed_cascading: usize,
187        _excludes: &[StringCode],
188    ) -> VortexResult<ArrayRef> {
189        let dict = dict_encode(&stats.source().clone().into_array())?;
190
191        // If we are not allowed to cascade, do not attempt codes or values compression.
192        if allowed_cascading == 0 {
193            return Ok(dict.into_array());
194        }
195
196        // Find best compressor for codes and values separately
197        let compressed_codes = IntCompressor::compress(
198            &dict.codes().to_primitive()?,
199            is_sample,
200            allowed_cascading - 1,
201            &[crate::integer::DictScheme.code()],
202        )?;
203
204        // Attempt to compress the values with non-Dict compression.
205        // Currently this will only be FSST.
206        let compressed_values = StringCompressor::compress(
207            &dict.values().to_varbinview()?,
208            is_sample,
209            allowed_cascading - 1,
210            &[DictScheme.code()],
211        )?;
212
213        Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
214    }
215}
216
217impl Scheme for FSSTScheme {
218    type StatsType = StringStats;
219    type CodeType = StringCode;
220
221    fn code(&self) -> StringCode {
222        FSST_SCHEME
223    }
224
225    fn compress(
226        &self,
227        stats: &Self::StatsType,
228        is_sample: bool,
229        allowed_cascading: usize,
230        _excludes: &[StringCode],
231    ) -> VortexResult<ArrayRef> {
232        let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
233        let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
234
235        let compressed_original_lengths = IntCompressor::compress(
236            &fsst.uncompressed_lengths().to_primitive()?,
237            is_sample,
238            allowed_cascading,
239            &[],
240        )?;
241
242        // We compress the var bin offsets of the FSST codes array.
243        let compressed_codes_offsets = IntCompressor::compress(
244            &fsst.codes().offsets().to_primitive()?,
245            is_sample,
246            allowed_cascading,
247            &[],
248        )?;
249        let compressed_codes = VarBinArray::try_new(
250            compressed_codes_offsets,
251            fsst.codes().bytes().clone(),
252            fsst.codes().dtype().clone(),
253            fsst.codes().validity().clone(),
254        )?;
255
256        let fsst = FSSTArray::try_new(
257            fsst.dtype().clone(),
258            fsst.symbols().clone(),
259            fsst.symbol_lengths().clone(),
260            compressed_codes,
261            compressed_original_lengths,
262        )?;
263
264        Ok(fsst.into_array())
265    }
266}
267
#[cfg(test)]
mod tests {
    use vortex_array::Array;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Compress a low-cardinality string array (two distinct values, 1024 copies each) and
    /// check that the result still has one entry per input string.
    #[test]
    fn test_strings() {
        // 1024 copies of the first value followed by 1024 copies of the second.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));
        let expected_len = values.len();

        let strings = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        println!(
            "original array: {}",
            (&strings as &dyn Array).tree_display()
        );

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.tree_display());

        // Previously the test asserted nothing and could only fail on a panic or error;
        // compression must never change the number of elements.
        assert_eq!(compressed.len(), expected_len);
    }
}