// vortex_btrblocks/string.rs

1use vortex_array::aliases::hash_set::HashSet;
2use vortex_array::arrays::VarBinViewArray;
3use vortex_array::{Array, ArrayRef, ToCanonical};
4use vortex_dict::DictArray;
5use vortex_dict::builders::dict_encode;
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_fsst::{fsst_compress, fsst_train_compressor};
8
9use crate::downscale::downscale_integer_array;
10use crate::integer::IntCompressor;
11use crate::sample::sample;
12use crate::{
13    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
14    estimate_compression_ratio_with_sampling,
15};
16
17#[derive(Clone)]
18pub struct StringStats {
19    src: VarBinViewArray,
20    estimated_distinct_count: u32,
21    value_count: u32,
22    // null_count: u32,
23}
24
25/// Estimate the number of distinct strings in the var bin view array.
26#[allow(clippy::cast_possible_truncation)]
27fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
28    let views = strings.views();
29    // Iterate the views. Two strings which are equal must have the same first 8-bytes.
30    // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all
31    // share a 4-byte prefix and have the same length.
32    let mut disinct = HashSet::with_capacity(views.len() / 2);
33    views.iter().for_each(|&view| {
34        // SAFETY: we're doing bitwise manipulations. Did you expect that to be safe??
35        let view_u128: u128 = unsafe { std::mem::transmute(view) };
36        let len_and_prefix = view_u128 as u64;
37        disinct.insert(len_and_prefix);
38    });
39
40    disinct
41        .len()
42        .try_into()
43        .vortex_expect("distinct count must fit in u32")
44}
45
46impl CompressorStats for StringStats {
47    type ArrayType = VarBinViewArray;
48
49    fn generate_opts(input: &Self::ArrayType, opts: GenerateStatsOptions) -> Self {
50        let null_count = input
51            .validity()
52            .null_count(input.len())
53            .vortex_expect("null_count");
54        let value_count = input.len() - null_count;
55        let estimated_distinct = if opts.count_distinct_values {
56            estimate_distinct_count(input)
57        } else {
58            u32::MAX
59        };
60
61        Self {
62            src: input.clone(),
63            value_count: value_count.try_into().vortex_expect("value_count"),
64            // null_count: null_count.try_into().vortex_expect("null_count"),
65            estimated_distinct_count: estimated_distinct,
66        }
67    }
68
69    fn source(&self) -> &Self::ArrayType {
70        &self.src
71    }
72
73    fn sample_opts(&self, sample_size: u16, sample_count: u16, opts: GenerateStatsOptions) -> Self {
74        let sampled = sample(self.src.clone(), sample_size, sample_count)
75            .to_varbinview()
76            .vortex_expect("varbinview");
77
78        Self::generate_opts(&sampled, opts)
79    }
80}
81
82pub struct StringCompressor;
83
84impl Compressor for StringCompressor {
85    type ArrayType = VarBinViewArray;
86    type SchemeType = dyn StringScheme;
87    type StatsType = StringStats;
88
89    fn schemes() -> &'static [&'static Self::SchemeType] {
90        &[&UncompressedScheme, &DictScheme, &FSSTScheme]
91    }
92
93    fn default_scheme() -> &'static Self::SchemeType {
94        &UncompressedScheme
95    }
96
97    fn dict_scheme_code() -> StringCode {
98        DICT_SCHEME
99    }
100}
101
102pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
103
104impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
105
/// String scheme that returns the source array unchanged.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// String scheme that dictionary-encodes the strings.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// String scheme that compresses the strings with FSST.
#[derive(Clone, Copy, Debug)]
pub struct FSSTScheme;

/// Small numeric identifier for a string compression scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StringCode(u8);

// Codes for each scheme above; each value must stay unique.
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
const DICT_SCHEME: StringCode = StringCode(1);
const FSST_SCHEME: StringCode = StringCode(2);
121
122impl Scheme for UncompressedScheme {
123    type StatsType = StringStats;
124    type CodeType = StringCode;
125
126    fn code(&self) -> StringCode {
127        UNCOMPRESSED_SCHEME
128    }
129
130    fn expected_compression_ratio(
131        &self,
132        _stats: &Self::StatsType,
133        _is_sample: bool,
134        _allowed_cascading: usize,
135        _excludes: &[StringCode],
136    ) -> VortexResult<f64> {
137        Ok(1.0)
138    }
139
140    fn compress(
141        &self,
142        stats: &Self::StatsType,
143        _is_sample: bool,
144        _allowed_cascading: usize,
145        _excludes: &[StringCode],
146    ) -> VortexResult<ArrayRef> {
147        Ok(stats.source().clone().into_array())
148    }
149}
150
151impl Scheme for DictScheme {
152    type StatsType = StringStats;
153    type CodeType = StringCode;
154
155    fn code(&self) -> StringCode {
156        DICT_SCHEME
157    }
158
159    fn expected_compression_ratio(
160        &self,
161        stats: &Self::StatsType,
162        is_sample: bool,
163        allowed_cascading: usize,
164        excludes: &[StringCode],
165    ) -> VortexResult<f64> {
166        // If we don't have a sufficiently high number of distinct values, do not attempt Dict.
167        if stats.estimated_distinct_count > stats.value_count / 2 {
168            return Ok(0.0);
169        }
170
171        // If array is all null, do not attempt dict.
172        if stats.value_count == 0 {
173            return Ok(0.0);
174        }
175
176        estimate_compression_ratio_with_sampling(
177            self,
178            stats,
179            is_sample,
180            allowed_cascading,
181            excludes,
182        )
183    }
184
185    fn compress(
186        &self,
187        stats: &Self::StatsType,
188        is_sample: bool,
189        allowed_cascading: usize,
190        _excludes: &[StringCode],
191    ) -> VortexResult<ArrayRef> {
192        let dict = dict_encode(&stats.source().clone().into_array())?;
193
194        // If we are not allowed to cascade, do not attempt codes or values compression.
195        if allowed_cascading == 0 {
196            return Ok(dict.into_array());
197        }
198
199        // Find best compressor for codes and values separately
200        let downscaled_codes = downscale_integer_array(dict.codes().to_array())?.to_primitive()?;
201        let compressed_codes = IntCompressor::compress(
202            &downscaled_codes,
203            is_sample,
204            allowed_cascading - 1,
205            &[crate::integer::DictScheme.code()],
206        )?;
207
208        // Attempt to compress the values with non-Dict compression.
209        // Currently this will only be FSST.
210        let compressed_values = StringCompressor::compress(
211            &dict.values().to_varbinview()?,
212            is_sample,
213            allowed_cascading - 1,
214            &[DictScheme.code()],
215        )?;
216
217        Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
218    }
219}
220
221impl Scheme for FSSTScheme {
222    type StatsType = StringStats;
223    type CodeType = StringCode;
224
225    fn code(&self) -> StringCode {
226        FSST_SCHEME
227    }
228
229    fn compress(
230        &self,
231        stats: &Self::StatsType,
232        _is_sample: bool,
233        _allowed_cascading: usize,
234        _excludes: &[StringCode],
235    ) -> VortexResult<ArrayRef> {
236        let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
237        let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
238
239        Ok(fsst.into_array())
240    }
241}
242
#[cfg(test)]
mod tests {
    use vortex_array::Array;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Number of copies of each distinct string in the test input.
    const REPEATS: usize = 1024;

    /// Low-cardinality input (two distinct strings, each repeated many times) must
    /// compress without error and keep every element.
    #[test]
    fn test_strings() {
        let strings: Vec<Option<&str>> = std::iter::repeat(Some("hello-world-1234"))
            .take(REPEATS)
            .chain(std::iter::repeat(Some("hello-world-56789")).take(REPEATS))
            .collect();
        let strings = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));

        println!(
            "original array: {}",
            (&strings as &dyn Array).tree_display()
        );

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.tree_display());

        // Compression must not drop or add elements.
        assert_eq!(compressed.len(), 2 * REPEATS);
    }
}