vortex_btrblocks/
string.rs

1use vortex_array::aliases::hash_set::HashSet;
2use vortex_array::arrays::{VarBinArray, VarBinViewArray, VarBinViewVTable};
3use vortex_array::vtable::ValidityHelper;
4use vortex_array::{ArrayRef, IntoArray, ToCanonical};
5use vortex_dict::DictArray;
6use vortex_dict::builders::dict_encode;
7use vortex_error::{VortexExpect, VortexResult};
8use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
9
10use crate::integer::IntCompressor;
11use crate::sample::sample;
12use crate::{
13    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
14    estimate_compression_ratio_with_sampling,
15};
16
17#[derive(Clone, Debug)]
18pub struct StringStats {
19    src: VarBinViewArray,
20    estimated_distinct_count: u32,
21    value_count: u32,
22    // null_count: u32,
23}
24
25/// Estimate the number of distinct strings in the var bin view array.
26#[allow(clippy::cast_possible_truncation)]
27fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
28    let views = strings.views();
29    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
30    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
31    // share a 4-byte prefix and have the same length.
32    let mut distinct = HashSet::with_capacity(views.len() / 2);
33    views.iter().for_each(|&view| {
34        let len_and_prefix = view.as_u128() as u64;
35        distinct.insert(len_and_prefix);
36    });
37
38    distinct
39        .len()
40        .try_into()
41        .vortex_expect("distinct count must fit in u32")
42}
43
44impl CompressorStats for StringStats {
45    type ArrayVTable = VarBinViewVTable;
46
47    fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
48        let null_count = input
49            .statistics()
50            .compute_null_count()
51            .vortex_expect("null count");
52        let value_count = input.len() - null_count;
53        let estimated_distinct = if opts.count_distinct_values {
54            estimate_distinct_count(input)
55        } else {
56            u32::MAX
57        };
58
59        Self {
60            src: input.clone(),
61            value_count: value_count.try_into().vortex_expect("value_count"),
62            // null_count: null_count.try_into().vortex_expect("null_count"),
63            estimated_distinct_count: estimated_distinct,
64        }
65    }
66
67    fn source(&self) -> &VarBinViewArray {
68        &self.src
69    }
70
71    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
72        let sampled = sample(self.src.as_ref(), sample_size, sample_count)
73            .to_varbinview()
74            .vortex_expect("varbinview");
75
76        Self::generate_opts(&sampled, opts)
77    }
78}
79
80pub struct StringCompressor;
81
82impl Compressor for StringCompressor {
83    type ArrayVTable = VarBinViewVTable;
84    type SchemeType = dyn StringScheme;
85    type StatsType = StringStats;
86
87    fn schemes() -> &'static [&'static Self::SchemeType] {
88        &[&UncompressedScheme, &DictScheme, &FSSTScheme]
89    }
90
91    fn default_scheme() -> &'static Self::SchemeType {
92        &UncompressedScheme
93    }
94
95    fn dict_scheme_code() -> StringCode {
96        DICT_SCHEME
97    }
98}
99
100pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
101
102impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
103
/// Scheme that returns the string array unchanged.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the strings, optionally recompressing codes and values.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// Scheme that compresses the strings with FSST.
#[derive(Clone, Copy, Debug)]
pub struct FSSTScheme;

/// Numeric identifier of a string compression scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StringCode(u8);

// One code per scheme; each `Scheme::code` impl returns its own constant.
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
const DICT_SCHEME: StringCode = StringCode(1);
const FSST_SCHEME: StringCode = StringCode(2);
119
120impl Scheme for UncompressedScheme {
121    type StatsType = StringStats;
122    type CodeType = StringCode;
123
124    fn code(&self) -> StringCode {
125        UNCOMPRESSED_SCHEME
126    }
127
128    fn expected_compression_ratio(
129        &self,
130        _stats: &Self::StatsType,
131        _is_sample: bool,
132        _allowed_cascading: usize,
133        _excludes: &[StringCode],
134    ) -> VortexResult<f64> {
135        Ok(1.0)
136    }
137
138    fn compress(
139        &self,
140        stats: &Self::StatsType,
141        _is_sample: bool,
142        _allowed_cascading: usize,
143        _excludes: &[StringCode],
144    ) -> VortexResult<ArrayRef> {
145        Ok(stats.source().to_array())
146    }
147}
148
149impl Scheme for DictScheme {
150    type StatsType = StringStats;
151    type CodeType = StringCode;
152
153    fn code(&self) -> StringCode {
154        DICT_SCHEME
155    }
156
157    fn expected_compression_ratio(
158        &self,
159        stats: &Self::StatsType,
160        is_sample: bool,
161        allowed_cascading: usize,
162        excludes: &[StringCode],
163    ) -> VortexResult<f64> {
164        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
165        if stats.estimated_distinct_count > stats.value_count / 2 {
166            return Ok(0.0);
167        }
168
169        // If array is all null, do not attempt dict.
170        if stats.value_count == 0 {
171            return Ok(0.0);
172        }
173
174        estimate_compression_ratio_with_sampling(
175            self,
176            stats,
177            is_sample,
178            allowed_cascading,
179            excludes,
180        )
181    }
182
183    fn compress(
184        &self,
185        stats: &Self::StatsType,
186        is_sample: bool,
187        allowed_cascading: usize,
188        _excludes: &[StringCode],
189    ) -> VortexResult<ArrayRef> {
190        let dict = dict_encode(&stats.source().clone().into_array())?;
191
192        // If we are not allowed to cascade, do not attempt codes or values compression.
193        if allowed_cascading == 0 {
194            return Ok(dict.into_array());
195        }
196
197        // Find best compressor for codes and values separately
198        let compressed_codes = IntCompressor::compress(
199            &dict.codes().to_primitive()?,
200            is_sample,
201            allowed_cascading - 1,
202            &[crate::integer::DictScheme.code()],
203        )?;
204
205        // Attempt to compress the values with non-Dict compression.
206        // Currently this will only be FSST.
207        let compressed_values = StringCompressor::compress(
208            &dict.values().to_varbinview()?,
209            is_sample,
210            allowed_cascading - 1,
211            &[DictScheme.code()],
212        )?;
213
214        Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
215    }
216}
217
218impl Scheme for FSSTScheme {
219    type StatsType = StringStats;
220    type CodeType = StringCode;
221
222    fn code(&self) -> StringCode {
223        FSST_SCHEME
224    }
225
226    fn compress(
227        &self,
228        stats: &Self::StatsType,
229        is_sample: bool,
230        allowed_cascading: usize,
231        _excludes: &[StringCode],
232    ) -> VortexResult<ArrayRef> {
233        let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
234        let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
235
236        let compressed_original_lengths = IntCompressor::compress(
237            &fsst.uncompressed_lengths().to_primitive()?,
238            is_sample,
239            allowed_cascading,
240            &[],
241        )?;
242
243        // We compress the var bin offsets of the FSST codes array.
244        let compressed_codes_offsets = IntCompressor::compress(
245            &fsst.codes().offsets().to_primitive()?,
246            is_sample,
247            allowed_cascading,
248            &[],
249        )?;
250        let compressed_codes = VarBinArray::try_new(
251            compressed_codes_offsets,
252            fsst.codes().bytes().clone(),
253            fsst.codes().dtype().clone(),
254            fsst.codes().validity().clone(),
255        )?;
256
257        let fsst = FSSTArray::try_new(
258            fsst.dtype().clone(),
259            fsst.symbols().clone(),
260            fsst.symbol_lengths().clone(),
261            compressed_codes,
262            compressed_original_lengths,
263        )?;
264
265        Ok(fsst.into_array())
266    }
267}
268
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Two distinct, heavily repeated strings: compression must succeed and keep the length.
    #[test]
    fn test_strings() {
        let strings: Vec<Option<&str>> = std::iter::repeat(Some("hello-world-1234"))
            .take(1024)
            .chain(std::iter::repeat(Some("hello-world-56789")).take(1024))
            .collect();
        let strings = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));
        let original_len = strings.as_ref().len();

        println!("original array: {}", strings.as_ref().tree_display());

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.tree_display());

        // Compression must not drop or invent rows.
        assert_eq!(compressed.len(), original_len);
    }
}