1use vortex_array::arrays::{
5 ConstantArray, DictArray, MaskedArray, VarBinArray, VarBinViewArray, VarBinViewVTable,
6};
7use vortex_array::builders::dict::dict_encode;
8use vortex_array::vtable::ValidityHelper;
9use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10use vortex_error::{VortexExpect, VortexResult};
11use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
12use vortex_scalar::Scalar;
13use vortex_sparse::{SparseArray, SparseVTable};
14use vortex_utils::aliases::hash_set::HashSet;
15
16use crate::integer::IntCompressor;
17use crate::sample::sample;
18use crate::{
19 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
20 estimate_compression_ratio_with_sampling, integer,
21};
22
23#[derive(Clone, Debug)]
25pub struct StringStats {
26 src: VarBinViewArray,
27 estimated_distinct_count: u32,
28 value_count: u32,
29 null_count: u32,
30}
31
32#[allow(clippy::cast_possible_truncation)]
34fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
35 let views = strings.views();
36 let mut distinct = HashSet::with_capacity(views.len() / 2);
40 views.iter().for_each(|&view| {
41 let len_and_prefix = view.as_u128() as u64;
42 distinct.insert(len_and_prefix);
43 });
44
45 distinct
46 .len()
47 .try_into()
48 .vortex_expect("distinct count must fit in u32")
49}
50
51impl CompressorStats for StringStats {
52 type ArrayVTable = VarBinViewVTable;
53
54 fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
55 let null_count = input
56 .statistics()
57 .compute_null_count()
58 .vortex_expect("null count");
59 let value_count = input.len() - null_count;
60 let estimated_distinct = if opts.count_distinct_values {
61 estimate_distinct_count(input)
62 } else {
63 u32::MAX
64 };
65
66 Self {
67 src: input.clone(),
68 value_count: value_count.try_into().vortex_expect("value_count"),
69 null_count: null_count.try_into().vortex_expect("null_count"),
70 estimated_distinct_count: estimated_distinct,
71 }
72 }
73
74 fn source(&self) -> &VarBinViewArray {
75 &self.src
76 }
77
78 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
79 let sampled = sample(self.src.as_ref(), sample_size, sample_count).to_varbinview();
80
81 Self::generate_opts(&sampled, opts)
82 }
83}
84
/// Compressor for [`VarBinViewArray`] data that chooses among the [`StringScheme`]s.
pub struct StringCompressor;
87
88impl Compressor for StringCompressor {
89 type ArrayVTable = VarBinViewVTable;
90 type SchemeType = dyn StringScheme;
91 type StatsType = StringStats;
92
93 fn schemes() -> &'static [&'static Self::SchemeType] {
94 &[
95 &UncompressedScheme,
96 &DictScheme,
97 &FSSTScheme,
98 &ConstantScheme,
99 &NullDominated,
100 ]
101 }
102
103 fn default_scheme() -> &'static Self::SchemeType {
104 &UncompressedScheme
105 }
106
107 fn dict_scheme_code() -> StringCode {
108 DICT_SCHEME
109 }
110}
111
112pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
113
114impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
115
/// String scheme that returns the input array unchanged.
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

/// String scheme that dictionary-encodes the input.
#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

/// String scheme that compresses the input with FSST.
#[derive(Debug, Copy, Clone)]
pub struct FSSTScheme;

/// String scheme that represents a single-valued input as a constant array.
#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

/// String scheme that sparse-encodes inputs consisting mostly of nulls.
#[derive(Debug, Copy, Clone)]
pub struct NullDominated;
130
/// Compact identifier for a string compression scheme.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct StringCode(u8);

// Code returned by each string scheme's `code()`.
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
const DICT_SCHEME: StringCode = StringCode(1);
const FSST_SCHEME: StringCode = StringCode(2);
const CONSTANT_SCHEME: StringCode = StringCode(3);
// Returned by the `NullDominated` scheme.
const SPARSE_SCHEME: StringCode = StringCode(4);
140
141impl Scheme for UncompressedScheme {
142 type StatsType = StringStats;
143 type CodeType = StringCode;
144
145 fn code(&self) -> StringCode {
146 UNCOMPRESSED_SCHEME
147 }
148
149 fn expected_compression_ratio(
150 &self,
151 _stats: &Self::StatsType,
152 _is_sample: bool,
153 _allowed_cascading: usize,
154 _excludes: &[StringCode],
155 ) -> VortexResult<f64> {
156 Ok(1.0)
157 }
158
159 fn compress(
160 &self,
161 stats: &Self::StatsType,
162 _is_sample: bool,
163 _allowed_cascading: usize,
164 _excludes: &[StringCode],
165 ) -> VortexResult<ArrayRef> {
166 Ok(stats.source().to_array())
167 }
168}
169
170impl Scheme for DictScheme {
171 type StatsType = StringStats;
172 type CodeType = StringCode;
173
174 fn code(&self) -> StringCode {
175 DICT_SCHEME
176 }
177
178 fn expected_compression_ratio(
179 &self,
180 stats: &Self::StatsType,
181 is_sample: bool,
182 allowed_cascading: usize,
183 excludes: &[StringCode],
184 ) -> VortexResult<f64> {
185 if stats.estimated_distinct_count > stats.value_count / 2 {
187 return Ok(0.0);
188 }
189
190 if stats.value_count == 0 {
192 return Ok(0.0);
193 }
194
195 estimate_compression_ratio_with_sampling(
196 self,
197 stats,
198 is_sample,
199 allowed_cascading,
200 excludes,
201 )
202 }
203
204 fn compress(
205 &self,
206 stats: &Self::StatsType,
207 is_sample: bool,
208 allowed_cascading: usize,
209 _excludes: &[StringCode],
210 ) -> VortexResult<ArrayRef> {
211 let dict = dict_encode(&stats.source().clone().into_array())?;
212
213 if allowed_cascading == 0 {
215 return Ok(dict.into_array());
216 }
217
218 let compressed_codes = IntCompressor::compress(
220 &dict.codes().to_primitive(),
221 is_sample,
222 allowed_cascading - 1,
223 &[integer::DictScheme.code(), integer::SequenceScheme.code()],
224 )?;
225
226 let compressed_values = StringCompressor::compress(
229 &dict.values().to_varbinview(),
230 is_sample,
231 allowed_cascading - 1,
232 &[DictScheme.code()],
233 )?;
234
235 unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
237 }
238}
239
240impl Scheme for FSSTScheme {
241 type StatsType = StringStats;
242 type CodeType = StringCode;
243
244 fn code(&self) -> StringCode {
245 FSST_SCHEME
246 }
247
248 fn compress(
249 &self,
250 stats: &Self::StatsType,
251 is_sample: bool,
252 allowed_cascading: usize,
253 _excludes: &[StringCode],
254 ) -> VortexResult<ArrayRef> {
255 let compressor = fsst_train_compressor(&stats.src);
256 let fsst = fsst_compress(&stats.src, &compressor);
257
258 let compressed_original_lengths = IntCompressor::compress(
259 &fsst.uncompressed_lengths().to_primitive().narrow()?,
260 is_sample,
261 allowed_cascading,
262 &[],
263 )?;
264
265 let compressed_codes_offsets = IntCompressor::compress(
266 &fsst.codes().offsets().to_primitive().narrow()?,
267 is_sample,
268 allowed_cascading,
269 &[],
270 )?;
271 let compressed_codes = VarBinArray::try_new(
272 compressed_codes_offsets,
273 fsst.codes().bytes().clone(),
274 fsst.codes().dtype().clone(),
275 fsst.codes().validity().clone(),
276 )?;
277
278 let fsst = FSSTArray::try_new(
279 fsst.dtype().clone(),
280 fsst.symbols().clone(),
281 fsst.symbol_lengths().clone(),
282 compressed_codes,
283 compressed_original_lengths,
284 )?;
285
286 Ok(fsst.into_array())
287 }
288}
289
290impl Scheme for ConstantScheme {
291 type StatsType = StringStats;
292 type CodeType = StringCode;
293
294 fn code(&self) -> Self::CodeType {
295 CONSTANT_SCHEME
296 }
297
298 fn is_constant(&self) -> bool {
299 true
300 }
301
302 fn expected_compression_ratio(
303 &self,
304 stats: &Self::StatsType,
305 is_sample: bool,
306 _allowed_cascading: usize,
307 _excludes: &[Self::CodeType],
308 ) -> VortexResult<f64> {
309 if is_sample {
310 return Ok(0.0);
311 }
312
313 if stats.estimated_distinct_count > 1 || !stats.src.is_constant() {
314 return Ok(0.0);
315 }
316
317 Ok(f64::MAX)
319 }
320
321 fn compress(
322 &self,
323 stats: &Self::StatsType,
324 _is_sample: bool,
325 _allowed_cascading: usize,
326 _excludes: &[Self::CodeType],
327 ) -> VortexResult<ArrayRef> {
328 let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));
329
330 match scalar_idx {
331 Some(idx) => {
332 let scalar = stats.source().scalar_at(idx);
333 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
334 if !stats.source().all_valid() {
335 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
336 } else {
337 Ok(const_arr)
338 }
339 }
340 None => Ok(ConstantArray::new(
341 Scalar::null(stats.src.dtype().clone()),
342 stats.src.len(),
343 )
344 .into_array()),
345 }
346 }
347}
348
349impl Scheme for NullDominated {
350 type StatsType = StringStats;
351 type CodeType = StringCode;
352
353 fn code(&self) -> Self::CodeType {
354 SPARSE_SCHEME
355 }
356
357 fn expected_compression_ratio(
358 &self,
359 stats: &Self::StatsType,
360 _is_sample: bool,
361 allowed_cascading: usize,
362 _excludes: &[Self::CodeType],
363 ) -> VortexResult<f64> {
364 if allowed_cascading == 0 {
366 return Ok(0.0);
367 }
368
369 if stats.value_count == 0 {
370 return Ok(0.0);
372 }
373
374 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
376 return Ok(stats.src.len() as f64 / stats.value_count as f64);
377 }
378
379 Ok(0.0)
381 }
382
383 fn compress(
384 &self,
385 stats: &Self::StatsType,
386 is_sample: bool,
387 allowed_cascading: usize,
388 _excludes: &[Self::CodeType],
389 ) -> VortexResult<ArrayRef> {
390 assert!(allowed_cascading > 0);
391
392 let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
394
395 if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
396 let new_excludes = vec![integer::SparseScheme.code()];
398
399 let indices = sparse.patches().indices().to_primitive().narrow()?;
401 let compressed_indices = IntCompressor::compress_no_dict(
402 &indices,
403 is_sample,
404 allowed_cascading - 1,
405 &new_excludes,
406 )?;
407
408 SparseArray::try_new(
409 compressed_indices,
410 sparse.patches().values().clone(),
411 sparse.len(),
412 sparse.fill_scalar().clone(),
413 )
414 .map(|a| a.into_array())
415 } else {
416 Ok(sparse_encoded)
417 }
418 }
419}
420
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_dtype::{DType, Nullability};
    use vortex_sparse::SparseEncoding;

    use crate::string::StringCompressor;
    use crate::{Compressor, MAX_CASCADE};

    /// Compresses 2048 strings drawn from two values and checks that the compressed array keeps
    /// the input length. The test previously only printed the trees and asserted nothing.
    #[test]
    fn test_strings() {
        let mut strings = Vec::new();
        for _ in 0..1024 {
            strings.push(Some("hello-world-1234"));
        }
        for _ in 0..1024 {
            strings.push(Some("hello-world-56789"));
        }
        let expected_len = strings.len();
        let strings = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));

        println!("original array: {}", strings.as_ref().display_tree());

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.display_tree());
        assert_eq!(compressed.len(), expected_len);
    }

    /// An array of 99 nulls followed by one value should be sparse-encoded, keeping its length.
    #[test]
    fn test_sparse_nulls() {
        let mut strings = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        strings.append_nulls(99);

        strings.append_value("one little string");

        let strings = strings.finish_into_varbinview();

        let compressed = StringCompressor::compress(&strings, false, MAX_CASCADE, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());
        assert_eq!(compressed.len(), 100);
    }
}