vortex_btrblocks/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_array::arrays::{VarBinArray, VarBinViewArray, VarBinViewVTable};
5use vortex_array::vtable::ValidityHelper;
6use vortex_array::{ArrayRef, IntoArray, ToCanonical};
7use vortex_dict::DictArray;
8use vortex_dict::builders::dict_encode;
9use vortex_error::{VortexExpect, VortexResult};
10use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
11use vortex_utils::aliases::hash_set::HashSet;
12
13use crate::integer::IntCompressor;
14use crate::sample::sample;
15use crate::{
16    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
17    estimate_compression_ratio_with_sampling, integer,
18};
19
20#[derive(Clone, Debug)]
21pub struct StringStats {
22    src: VarBinViewArray,
23    estimated_distinct_count: u32,
24    value_count: u32,
25    // null_count: u32,
26}
27
28/// Estimate the number of distinct strings in the var bin view array.
29#[allow(clippy::cast_possible_truncation)]
30fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
31    let views = strings.views();
32    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
33    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
34    // share a 4-byte prefix and have the same length.
35    let mut distinct = HashSet::with_capacity(views.len() / 2);
36    views.iter().for_each(|&view| {
37        let len_and_prefix = view.as_u128() as u64;
38        distinct.insert(len_and_prefix);
39    });
40
41    distinct
42        .len()
43        .try_into()
44        .vortex_expect("distinct count must fit in u32")
45}
46
47impl CompressorStats for StringStats {
48    type ArrayVTable = VarBinViewVTable;
49
50    fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
51        let null_count = input
52            .statistics()
53            .compute_null_count()
54            .vortex_expect("null count");
55        let value_count = input.len() - null_count;
56        let estimated_distinct = if opts.count_distinct_values {
57            estimate_distinct_count(input)
58        } else {
59            u32::MAX
60        };
61
62        Self {
63            src: input.clone(),
64            value_count: value_count.try_into().vortex_expect("value_count"),
65            estimated_distinct_count: estimated_distinct,
66        }
67    }
68
69    fn source(&self) -> &VarBinViewArray {
70        &self.src
71    }
72
73    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
74        let sampled = sample(self.src.as_ref(), sample_size, sample_count).to_varbinview();
75
76        Self::generate_opts(&sampled, opts)
77    }
78}
79
80pub struct StringCompressor;
81
82impl Compressor for StringCompressor {
83    type ArrayVTable = VarBinViewVTable;
84    type SchemeType = dyn StringScheme;
85    type StatsType = StringStats;
86
87    fn schemes() -> &'static [&'static Self::SchemeType] {
88        &[&UncompressedScheme, &DictScheme, &FSSTScheme]
89    }
90
91    fn default_scheme() -> &'static Self::SchemeType {
92        &UncompressedScheme
93    }
94
95    fn dict_scheme_code() -> StringCode {
96        DICT_SCHEME
97    }
98}
99
100pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
101
102impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
103
/// Scheme that returns the source array unchanged.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the strings.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// Scheme that compresses the strings with FSST.
#[derive(Clone, Copy, Debug)]
pub struct FSSTScheme;

/// Identifier of a string compression scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StringCode(u8);

/// Code reported by [`UncompressedScheme`].
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
/// Code reported by [`DictScheme`].
const DICT_SCHEME: StringCode = StringCode(1);
/// Code reported by [`FSSTScheme`].
const FSST_SCHEME: StringCode = StringCode(2);
119
120impl Scheme for UncompressedScheme {
121    type StatsType = StringStats;
122    type CodeType = StringCode;
123
124    fn code(&self) -> StringCode {
125        UNCOMPRESSED_SCHEME
126    }
127
128    fn expected_compression_ratio(
129        &self,
130        _stats: &Self::StatsType,
131        _is_sample: bool,
132        _allowed_cascading: usize,
133        _excludes: &[StringCode],
134    ) -> VortexResult<f64> {
135        Ok(1.0)
136    }
137
138    fn compress(
139        &self,
140        stats: &Self::StatsType,
141        _is_sample: bool,
142        _allowed_cascading: usize,
143        _excludes: &[StringCode],
144    ) -> VortexResult<ArrayRef> {
145        Ok(stats.source().to_array())
146    }
147}
148
149impl Scheme for DictScheme {
150    type StatsType = StringStats;
151    type CodeType = StringCode;
152
153    fn code(&self) -> StringCode {
154        DICT_SCHEME
155    }
156
157    fn expected_compression_ratio(
158        &self,
159        stats: &Self::StatsType,
160        is_sample: bool,
161        allowed_cascading: usize,
162        excludes: &[StringCode],
163    ) -> VortexResult<f64> {
164        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
165        if stats.estimated_distinct_count > stats.value_count / 2 {
166            return Ok(0.0);
167        }
168
169        // If array is all null, do not attempt dict.
170        if stats.value_count == 0 {
171            return Ok(0.0);
172        }
173
174        estimate_compression_ratio_with_sampling(
175            self,
176            stats,
177            is_sample,
178            allowed_cascading,
179            excludes,
180        )
181    }
182
183    fn compress(
184        &self,
185        stats: &Self::StatsType,
186        is_sample: bool,
187        allowed_cascading: usize,
188        _excludes: &[StringCode],
189    ) -> VortexResult<ArrayRef> {
190        let dict = dict_encode(&stats.source().clone().into_array())?;
191
192        // If we are not allowed to cascade, do not attempt codes or values compression.
193        if allowed_cascading == 0 {
194            return Ok(dict.into_array());
195        }
196
197        // Find best compressor for codes and values separately
198        let compressed_codes = IntCompressor::compress(
199            &dict.codes().to_primitive(),
200            is_sample,
201            allowed_cascading - 1,
202            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
203        )?;
204
205        // Attempt to compress the values with non-Dict compression.
206        // Currently this will only be FSST.
207        let compressed_values = StringCompressor::compress(
208            &dict.values().to_varbinview(),
209            is_sample,
210            allowed_cascading - 1,
211            &[DictScheme.code()],
212        )?;
213
214        // SAFETY: compressing codes or values does not alter the invariants
215        unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
216    }
217}
218
219impl Scheme for FSSTScheme {
220    type StatsType = StringStats;
221    type CodeType = StringCode;
222
223    fn code(&self) -> StringCode {
224        FSST_SCHEME
225    }
226
227    fn compress(
228        &self,
229        stats: &Self::StatsType,
230        is_sample: bool,
231        allowed_cascading: usize,
232        _excludes: &[StringCode],
233    ) -> VortexResult<ArrayRef> {
234        let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
235        let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
236
237        let compressed_original_lengths = IntCompressor::compress(
238            &fsst.uncompressed_lengths().to_primitive().downcast()?,
239            is_sample,
240            allowed_cascading,
241            &[],
242        )?;
243
244        let compressed_codes_offsets = IntCompressor::compress(
245            &fsst.codes().offsets().to_primitive().downcast()?,
246            is_sample,
247            allowed_cascading,
248            &[],
249        )?;
250        let compressed_codes = VarBinArray::try_new(
251            compressed_codes_offsets,
252            fsst.codes().bytes().clone(),
253            fsst.codes().dtype().clone(),
254            fsst.codes().validity().clone(),
255        )?;
256
257        let fsst = FSSTArray::try_new(
258            fsst.dtype().clone(),
259            fsst.symbols().clone(),
260            fsst.symbol_lengths().clone(),
261            compressed_codes,
262            compressed_original_lengths,
263        )?;
264
265        Ok(fsst.into_array())
266    }
267}
268
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Smoke test: compressing a highly repetitive two-value string column must succeed.
    /// The array trees are printed for manual inspection.
    #[test]
    fn test_strings() {
        // 1024 copies of the first value followed by 1024 copies of the second.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));

        let strings = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));
        println!("original array: {}", strings.as_ref().display_tree());

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();
        println!("compression tree: {}", compressed.display_tree());
    }
}