1use vortex_array::arrays::{ConstantArray, VarBinArray, VarBinViewArray, VarBinViewVTable};
5use vortex_array::vtable::ValidityHelper;
6use vortex_array::{ArrayRef, IntoArray, ToCanonical};
7use vortex_dict::DictArray;
8use vortex_dict::builders::dict_encode;
9use vortex_error::{VortexExpect, VortexResult};
10use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
11use vortex_utils::aliases::hash_set::HashSet;
12
13use crate::integer::IntCompressor;
14use crate::sample::sample;
15use crate::{
16 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
17 estimate_compression_ratio_with_sampling, integer,
18};
19
20#[derive(Clone, Debug)]
22pub struct StringStats {
23 src: VarBinViewArray,
24 estimated_distinct_count: u32,
25 value_count: u32,
26}
27
28#[allow(clippy::cast_possible_truncation)]
30fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
31 let views = strings.views();
32 let mut distinct = HashSet::with_capacity(views.len() / 2);
36 views.iter().for_each(|&view| {
37 let len_and_prefix = view.as_u128() as u64;
38 distinct.insert(len_and_prefix);
39 });
40
41 distinct
42 .len()
43 .try_into()
44 .vortex_expect("distinct count must fit in u32")
45}
46
47impl CompressorStats for StringStats {
48 type ArrayVTable = VarBinViewVTable;
49
50 fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
51 let null_count = input
52 .statistics()
53 .compute_null_count()
54 .vortex_expect("null count");
55 let value_count = input.len() - null_count;
56 let estimated_distinct = if opts.count_distinct_values {
57 estimate_distinct_count(input)
58 } else {
59 u32::MAX
60 };
61
62 Self {
63 src: input.clone(),
64 value_count: value_count.try_into().vortex_expect("value_count"),
65 estimated_distinct_count: estimated_distinct,
66 }
67 }
68
69 fn source(&self) -> &VarBinViewArray {
70 &self.src
71 }
72
73 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
74 let sampled = sample(self.src.as_ref(), sample_size, sample_count).to_varbinview();
75
76 Self::generate_opts(&sampled, opts)
77 }
78}
79
80pub struct StringCompressor;
82
83impl Compressor for StringCompressor {
84 type ArrayVTable = VarBinViewVTable;
85 type SchemeType = dyn StringScheme;
86 type StatsType = StringStats;
87
88 fn schemes() -> &'static [&'static Self::SchemeType] {
89 &[
90 &UncompressedScheme,
91 &DictScheme,
92 &FSSTScheme,
93 &ConstantScheme,
94 ]
95 }
96
97 fn default_scheme() -> &'static Self::SchemeType {
98 &UncompressedScheme
99 }
100
101 fn dict_scheme_code() -> StringCode {
102 DICT_SCHEME
103 }
104}
105
106pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
107
108impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
109
/// Scheme that returns the source array unchanged.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the strings.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// Scheme that compresses the strings with an FSST symbol table.
#[derive(Clone, Copy, Debug)]
pub struct FSSTScheme;

/// Scheme that replaces a constant array with a single repeated scalar.
#[derive(Clone, Copy, Debug)]
pub struct ConstantScheme;
121
/// Compact identifier for a string compression scheme.
///
/// Schemes are compared by this code, for example when a caller passes a list of codes to
/// exclude.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StringCode(u8);

// One code per scheme in this module; each value must stay unique.
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
const DICT_SCHEME: StringCode = StringCode(1);
const FSST_SCHEME: StringCode = StringCode(2);
const CONSTANT_SCHEME: StringCode = StringCode(3);
129
130impl Scheme for UncompressedScheme {
131 type StatsType = StringStats;
132 type CodeType = StringCode;
133
134 fn code(&self) -> StringCode {
135 UNCOMPRESSED_SCHEME
136 }
137
138 fn expected_compression_ratio(
139 &self,
140 _stats: &Self::StatsType,
141 _is_sample: bool,
142 _allowed_cascading: usize,
143 _excludes: &[StringCode],
144 ) -> VortexResult<f64> {
145 Ok(1.0)
146 }
147
148 fn compress(
149 &self,
150 stats: &Self::StatsType,
151 _is_sample: bool,
152 _allowed_cascading: usize,
153 _excludes: &[StringCode],
154 ) -> VortexResult<ArrayRef> {
155 Ok(stats.source().to_array())
156 }
157}
158
159impl Scheme for DictScheme {
160 type StatsType = StringStats;
161 type CodeType = StringCode;
162
163 fn code(&self) -> StringCode {
164 DICT_SCHEME
165 }
166
167 fn expected_compression_ratio(
168 &self,
169 stats: &Self::StatsType,
170 is_sample: bool,
171 allowed_cascading: usize,
172 excludes: &[StringCode],
173 ) -> VortexResult<f64> {
174 if stats.estimated_distinct_count > stats.value_count / 2 {
176 return Ok(0.0);
177 }
178
179 if stats.value_count == 0 {
181 return Ok(0.0);
182 }
183
184 estimate_compression_ratio_with_sampling(
185 self,
186 stats,
187 is_sample,
188 allowed_cascading,
189 excludes,
190 )
191 }
192
193 fn compress(
194 &self,
195 stats: &Self::StatsType,
196 is_sample: bool,
197 allowed_cascading: usize,
198 _excludes: &[StringCode],
199 ) -> VortexResult<ArrayRef> {
200 let dict = dict_encode(&stats.source().clone().into_array())?;
201
202 if allowed_cascading == 0 {
204 return Ok(dict.into_array());
205 }
206
207 let compressed_codes = IntCompressor::compress(
209 &dict.codes().to_primitive(),
210 is_sample,
211 allowed_cascading - 1,
212 &[integer::DictScheme.code(), integer::SequenceScheme.code()],
213 )?;
214
215 let compressed_values = StringCompressor::compress(
218 &dict.values().to_varbinview(),
219 is_sample,
220 allowed_cascading - 1,
221 &[DictScheme.code()],
222 )?;
223
224 unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
226 }
227}
228
229impl Scheme for FSSTScheme {
230 type StatsType = StringStats;
231 type CodeType = StringCode;
232
233 fn code(&self) -> StringCode {
234 FSST_SCHEME
235 }
236
237 fn compress(
238 &self,
239 stats: &Self::StatsType,
240 is_sample: bool,
241 allowed_cascading: usize,
242 _excludes: &[StringCode],
243 ) -> VortexResult<ArrayRef> {
244 let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
245 let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
246
247 let compressed_original_lengths = IntCompressor::compress(
248 &fsst.uncompressed_lengths().to_primitive().downcast()?,
249 is_sample,
250 allowed_cascading,
251 &[],
252 )?;
253
254 let compressed_codes_offsets = IntCompressor::compress(
255 &fsst.codes().offsets().to_primitive().downcast()?,
256 is_sample,
257 allowed_cascading,
258 &[],
259 )?;
260 let compressed_codes = VarBinArray::try_new(
261 compressed_codes_offsets,
262 fsst.codes().bytes().clone(),
263 fsst.codes().dtype().clone(),
264 fsst.codes().validity().clone(),
265 )?;
266
267 let fsst = FSSTArray::try_new(
268 fsst.dtype().clone(),
269 fsst.symbols().clone(),
270 fsst.symbol_lengths().clone(),
271 compressed_codes,
272 compressed_original_lengths,
273 )?;
274
275 Ok(fsst.into_array())
276 }
277}
278
279impl Scheme for ConstantScheme {
280 type StatsType = StringStats;
281 type CodeType = StringCode;
282
283 fn code(&self) -> Self::CodeType {
284 CONSTANT_SCHEME
285 }
286
287 fn is_constant(&self) -> bool {
288 true
289 }
290
291 fn expected_compression_ratio(
292 &self,
293 stats: &Self::StatsType,
294 is_sample: bool,
295 _allowed_cascading: usize,
296 _excludes: &[Self::CodeType],
297 ) -> VortexResult<f64> {
298 if is_sample {
299 return Ok(0.0);
300 }
301
302 if stats.src.is_constant() {
303 Ok(f64::MAX)
305 } else {
306 Ok(0.0)
307 }
308 }
309
310 fn compress(
311 &self,
312 stats: &Self::StatsType,
313 _is_sample: bool,
314 _allowed_cascading: usize,
315 _excludes: &[Self::CodeType],
316 ) -> VortexResult<ArrayRef> {
317 let scalar = stats
318 .src
319 .as_constant()
320 .vortex_expect("ConstantScheme::compress can only be called when array is constant");
321
322 Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
323 }
324}
325
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Two distinct strings, each repeated 1024 times. Compression must preserve the array's
    /// length and dtype.
    #[test]
    fn test_strings() {
        let mut strings = Vec::new();
        for _ in 0..1024 {
            strings.push(Some("hello-world-1234"));
        }
        for _ in 0..1024 {
            strings.push(Some("hello-world-56789"));
        }
        let strings = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));

        println!("original array: {}", strings.as_ref().display_tree());

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.display_tree());

        assert_eq!(compressed.len(), 2048);
        assert_eq!(compressed.dtype(), &DType::Utf8(Nullability::NonNullable));
    }

    /// A single repeated string exercises the constant path. Length and dtype must survive
    /// compression.
    #[test]
    fn test_constant_strings() {
        let strings = VarBinViewArray::from_iter(
            vec![Some("same-value"); 100],
            DType::Utf8(Nullability::NonNullable),
        );

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.display_tree());

        assert_eq!(compressed.len(), 100);
        assert_eq!(compressed.dtype(), &DType::Utf8(Nullability::NonNullable));
    }
}