vortex_btrblocks/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_array::arrays::{VarBinArray, VarBinViewArray, VarBinViewVTable};
5use vortex_array::vtable::ValidityHelper;
6use vortex_array::{ArrayRef, IntoArray, ToCanonical};
7use vortex_dict::DictArray;
8use vortex_dict::builders::dict_encode;
9use vortex_error::{VortexExpect, VortexResult};
10use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
11use vortex_utils::aliases::hash_set::HashSet;
12
13use crate::integer::IntCompressor;
14use crate::sample::sample;
15use crate::{
16    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
17    estimate_compression_ratio_with_sampling, integer,
18};
19
20#[derive(Clone, Debug)]
21pub struct StringStats {
22    src: VarBinViewArray,
23    estimated_distinct_count: u32,
24    value_count: u32,
25    // null_count: u32,
26}
27
28/// Estimate the number of distinct strings in the var bin view array.
29#[allow(clippy::cast_possible_truncation)]
30fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
31    let views = strings.views();
32    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
33    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
34    // share a 4-byte prefix and have the same length.
35    let mut distinct = HashSet::with_capacity(views.len() / 2);
36    views.iter().for_each(|&view| {
37        let len_and_prefix = view.as_u128() as u64;
38        distinct.insert(len_and_prefix);
39    });
40
41    distinct
42        .len()
43        .try_into()
44        .vortex_expect("distinct count must fit in u32")
45}
46
47impl CompressorStats for StringStats {
48    type ArrayVTable = VarBinViewVTable;
49
50    fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
51        let null_count = input
52            .statistics()
53            .compute_null_count()
54            .vortex_expect("null count");
55        let value_count = input.len() - null_count;
56        let estimated_distinct = if opts.count_distinct_values {
57            estimate_distinct_count(input)
58        } else {
59            u32::MAX
60        };
61
62        Self {
63            src: input.clone(),
64            value_count: value_count.try_into().vortex_expect("value_count"),
65            estimated_distinct_count: estimated_distinct,
66        }
67    }
68
69    fn source(&self) -> &VarBinViewArray {
70        &self.src
71    }
72
73    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
74        let sampled = sample(self.src.as_ref(), sample_size, sample_count)
75            .to_varbinview()
76            .vortex_expect("varbinview");
77
78        Self::generate_opts(&sampled, opts)
79    }
80}
81
82pub struct StringCompressor;
83
84impl Compressor for StringCompressor {
85    type ArrayVTable = VarBinViewVTable;
86    type SchemeType = dyn StringScheme;
87    type StatsType = StringStats;
88
89    fn schemes() -> &'static [&'static Self::SchemeType] {
90        &[&UncompressedScheme, &DictScheme, &FSSTScheme]
91    }
92
93    fn default_scheme() -> &'static Self::SchemeType {
94        &UncompressedScheme
95    }
96
97    fn dict_scheme_code() -> StringCode {
98        DICT_SCHEME
99    }
100}
101
102pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
103
104impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
105
/// Scheme that returns the string array unchanged.
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes strings, then compresses the codes and values.
#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

/// Scheme that compresses strings with `fsst_compress` using a trained symbol table.
#[derive(Debug, Copy, Clone)]
pub struct FSSTScheme;

/// Small numeric identifier for a string compression scheme.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct StringCode(u8);

// Codes for each scheme above.
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
const DICT_SCHEME: StringCode = StringCode(1);
const FSST_SCHEME: StringCode = StringCode(2);
121
122impl Scheme for UncompressedScheme {
123    type StatsType = StringStats;
124    type CodeType = StringCode;
125
126    fn code(&self) -> StringCode {
127        UNCOMPRESSED_SCHEME
128    }
129
130    fn expected_compression_ratio(
131        &self,
132        _stats: &Self::StatsType,
133        _is_sample: bool,
134        _allowed_cascading: usize,
135        _excludes: &[StringCode],
136    ) -> VortexResult<f64> {
137        Ok(1.0)
138    }
139
140    fn compress(
141        &self,
142        stats: &Self::StatsType,
143        _is_sample: bool,
144        _allowed_cascading: usize,
145        _excludes: &[StringCode],
146    ) -> VortexResult<ArrayRef> {
147        Ok(stats.source().to_array())
148    }
149}
150
151impl Scheme for DictScheme {
152    type StatsType = StringStats;
153    type CodeType = StringCode;
154
155    fn code(&self) -> StringCode {
156        DICT_SCHEME
157    }
158
159    fn expected_compression_ratio(
160        &self,
161        stats: &Self::StatsType,
162        is_sample: bool,
163        allowed_cascading: usize,
164        excludes: &[StringCode],
165    ) -> VortexResult<f64> {
166        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
167        if stats.estimated_distinct_count > stats.value_count / 2 {
168            return Ok(0.0);
169        }
170
171        // If array is all null, do not attempt dict.
172        if stats.value_count == 0 {
173            return Ok(0.0);
174        }
175
176        estimate_compression_ratio_with_sampling(
177            self,
178            stats,
179            is_sample,
180            allowed_cascading,
181            excludes,
182        )
183    }
184
185    fn compress(
186        &self,
187        stats: &Self::StatsType,
188        is_sample: bool,
189        allowed_cascading: usize,
190        _excludes: &[StringCode],
191    ) -> VortexResult<ArrayRef> {
192        let dict = dict_encode(&stats.source().clone().into_array())?;
193
194        // If we are not allowed to cascade, do not attempt codes or values compression.
195        if allowed_cascading == 0 {
196            return Ok(dict.into_array());
197        }
198
199        // Find best compressor for codes and values separately
200        let compressed_codes = IntCompressor::compress(
201            &dict.codes().to_primitive()?,
202            is_sample,
203            allowed_cascading - 1,
204            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
205        )?;
206
207        // Attempt to compress the values with non-Dict compression.
208        // Currently this will only be FSST.
209        let compressed_values = StringCompressor::compress(
210            &dict.values().to_varbinview()?,
211            is_sample,
212            allowed_cascading - 1,
213            &[DictScheme.code()],
214        )?;
215
216        // SAFETY: compressing codes or values does not alter the invariants
217        unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
218    }
219}
220
221impl Scheme for FSSTScheme {
222    type StatsType = StringStats;
223    type CodeType = StringCode;
224
225    fn code(&self) -> StringCode {
226        FSST_SCHEME
227    }
228
229    fn compress(
230        &self,
231        stats: &Self::StatsType,
232        is_sample: bool,
233        allowed_cascading: usize,
234        _excludes: &[StringCode],
235    ) -> VortexResult<ArrayRef> {
236        let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
237        let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
238
239        let compressed_original_lengths = IntCompressor::compress(
240            &fsst.uncompressed_lengths().to_primitive()?.downcast()?,
241            is_sample,
242            allowed_cascading,
243            &[],
244        )?;
245
246        let compressed_codes_offsets = IntCompressor::compress(
247            &fsst.codes().offsets().to_primitive()?.downcast()?,
248            is_sample,
249            allowed_cascading,
250            &[],
251        )?;
252        let compressed_codes = VarBinArray::try_new(
253            compressed_codes_offsets,
254            fsst.codes().bytes().clone(),
255            fsst.codes().dtype().clone(),
256            fsst.codes().validity().clone(),
257        )?;
258
259        let fsst = FSSTArray::try_new(
260            fsst.dtype().clone(),
261            fsst.symbols().clone(),
262            fsst.symbol_lengths().clone(),
263            compressed_codes,
264            compressed_original_lengths,
265        )?;
266
267        Ok(fsst.into_array())
268    }
269}
270
#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Compressing a low-cardinality string column must preserve its length and dtype.
    #[test]
    fn test_strings() {
        // Two distinct values, 1024 copies each.
        let strings: Vec<Option<&str>> = iter::repeat(Some("hello-world-1234"))
            .take(1024)
            .chain(iter::repeat(Some("hello-world-56789")).take(1024))
            .collect();
        let strings = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));

        println!("original array: {}", strings.as_ref().display_tree());

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.display_tree());

        // Previously the test only printed; pin the logical shape of the result.
        assert_eq!(compressed.len(), strings.as_ref().len());
        assert_eq!(compressed.dtype(), strings.as_ref().dtype());
    }
}