vortex_btrblocks/
string.rs1use vortex_array::aliases::hash_set::HashSet;
2use vortex_array::arrays::VarBinViewArray;
3use vortex_array::{Array, ArrayRef, ToCanonical};
4use vortex_dict::DictArray;
5use vortex_dict::builders::dict_encode;
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_fsst::{fsst_compress, fsst_train_compressor};
8
9use crate::downscale::downscale_integer_array;
10use crate::integer::IntCompressor;
11use crate::sample::sample;
12use crate::{
13 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
14 estimate_compression_ratio_with_sampling,
15};
16
17#[derive(Clone)]
18pub struct StringStats {
19 src: VarBinViewArray,
20 estimated_distinct_count: u32,
21 value_count: u32,
22 }
24
25#[allow(clippy::cast_possible_truncation)]
27fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
28 let views = strings.views();
29 let mut disinct = HashSet::with_capacity(views.len() / 2);
33 views.iter().for_each(|&view| {
34 let view_u128: u128 = unsafe { std::mem::transmute(view) };
36 let len_and_prefix = view_u128 as u64;
37 disinct.insert(len_and_prefix);
38 });
39
40 disinct
41 .len()
42 .try_into()
43 .vortex_expect("distinct count must fit in u32")
44}
45
46impl CompressorStats for StringStats {
47 type ArrayType = VarBinViewArray;
48
49 fn generate_opts(input: &Self::ArrayType, opts: GenerateStatsOptions) -> Self {
50 let null_count = input
51 .validity()
52 .null_count(input.len())
53 .vortex_expect("null_count");
54 let value_count = input.len() - null_count;
55 let estimated_distinct = if opts.count_distinct_values {
56 estimate_distinct_count(input)
57 } else {
58 u32::MAX
59 };
60
61 Self {
62 src: input.clone(),
63 value_count: value_count.try_into().vortex_expect("value_count"),
64 estimated_distinct_count: estimated_distinct,
66 }
67 }
68
69 fn source(&self) -> &Self::ArrayType {
70 &self.src
71 }
72
73 fn sample_opts(&self, sample_size: u16, sample_count: u16, opts: GenerateStatsOptions) -> Self {
74 let sampled = sample(self.src.clone(), sample_size, sample_count)
75 .to_varbinview()
76 .vortex_expect("varbinview");
77
78 Self::generate_opts(&sampled, opts)
79 }
80}
81
82pub struct StringCompressor;
83
84impl Compressor for StringCompressor {
85 type ArrayType = VarBinViewArray;
86 type SchemeType = dyn StringScheme;
87 type StatsType = StringStats;
88
89 fn schemes() -> &'static [&'static Self::SchemeType] {
90 &[&UncompressedScheme, &DictScheme, &FSSTScheme]
91 }
92
93 fn default_scheme() -> &'static Self::SchemeType {
94 &UncompressedScheme
95 }
96
97 fn dict_scheme_code() -> StringCode {
98 DICT_SCHEME
99 }
100}
101
102pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
103
104impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
105
/// Scheme that returns the source array without compressing it.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;
108
/// Scheme that dictionary-encodes the strings.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;
111
/// Scheme that compresses the strings with FSST.
#[derive(Clone, Copy, Debug)]
pub struct FSSTScheme;
114
/// Identifier of a string compression scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StringCode(u8);

/// Code of [`UncompressedScheme`].
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
/// Code of [`DictScheme`].
const DICT_SCHEME: StringCode = StringCode(1);
/// Code of [`FSSTScheme`].
const FSST_SCHEME: StringCode = StringCode(2);
121
122impl Scheme for UncompressedScheme {
123 type StatsType = StringStats;
124 type CodeType = StringCode;
125
126 fn code(&self) -> StringCode {
127 UNCOMPRESSED_SCHEME
128 }
129
130 fn expected_compression_ratio(
131 &self,
132 _stats: &Self::StatsType,
133 _is_sample: bool,
134 _allowed_cascading: usize,
135 _excludes: &[StringCode],
136 ) -> VortexResult<f64> {
137 Ok(1.0)
138 }
139
140 fn compress(
141 &self,
142 stats: &Self::StatsType,
143 _is_sample: bool,
144 _allowed_cascading: usize,
145 _excludes: &[StringCode],
146 ) -> VortexResult<ArrayRef> {
147 Ok(stats.source().clone().into_array())
148 }
149}
150
151impl Scheme for DictScheme {
152 type StatsType = StringStats;
153 type CodeType = StringCode;
154
155 fn code(&self) -> StringCode {
156 DICT_SCHEME
157 }
158
159 fn expected_compression_ratio(
160 &self,
161 stats: &Self::StatsType,
162 is_sample: bool,
163 allowed_cascading: usize,
164 excludes: &[StringCode],
165 ) -> VortexResult<f64> {
166 if stats.estimated_distinct_count > stats.value_count / 2 {
168 return Ok(0.0);
169 }
170
171 if stats.value_count == 0 {
173 return Ok(0.0);
174 }
175
176 estimate_compression_ratio_with_sampling(
177 self,
178 stats,
179 is_sample,
180 allowed_cascading,
181 excludes,
182 )
183 }
184
185 fn compress(
186 &self,
187 stats: &Self::StatsType,
188 is_sample: bool,
189 allowed_cascading: usize,
190 _excludes: &[StringCode],
191 ) -> VortexResult<ArrayRef> {
192 let dict = dict_encode(&stats.source().clone().into_array())?;
193
194 if allowed_cascading == 0 {
196 return Ok(dict.into_array());
197 }
198
199 let downscaled_codes = downscale_integer_array(dict.codes().to_array())?.to_primitive()?;
201 let compressed_codes = IntCompressor::compress(
202 &downscaled_codes,
203 is_sample,
204 allowed_cascading - 1,
205 &[crate::integer::DictScheme.code()],
206 )?;
207
208 let compressed_values = StringCompressor::compress(
211 &dict.values().to_varbinview()?,
212 is_sample,
213 allowed_cascading - 1,
214 &[DictScheme.code()],
215 )?;
216
217 Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
218 }
219}
220
221impl Scheme for FSSTScheme {
222 type StatsType = StringStats;
223 type CodeType = StringCode;
224
225 fn code(&self) -> StringCode {
226 FSST_SCHEME
227 }
228
229 fn compress(
230 &self,
231 stats: &Self::StatsType,
232 _is_sample: bool,
233 _allowed_cascading: usize,
234 _excludes: &[StringCode],
235 ) -> VortexResult<ArrayRef> {
236 let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
237 let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
238
239 Ok(fsst.into_array())
240 }
241}
242
#[cfg(test)]
mod tests {
    use vortex_array::Array;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Compresses 2048 strings drawn from two distinct values and checks that
    /// compression succeeds and keeps the logical length.
    #[test]
    fn test_strings() {
        let mut strings = Vec::new();
        for _ in 0..1024 {
            strings.push(Some("hello-world-1234"));
        }
        for _ in 0..1024 {
            strings.push(Some("hello-world-56789"));
        }
        let strings = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));

        println!(
            "original array: {}",
            (&strings as &dyn Array).tree_display()
        );

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.tree_display());

        // Whatever encoding is chosen, the result must hold one entry per
        // input string; without this the test could only fail on a panic.
        assert_eq!(compressed.len(), strings.len());
    }
}