1use vortex_array::ArrayRef;
5use vortex_array::IntoArray;
6use vortex_array::ToCanonical;
7use vortex_array::arrays::ConstantArray;
8use vortex_array::arrays::DictArray;
9use vortex_array::arrays::MaskedArray;
10use vortex_array::arrays::VarBinArray;
11use vortex_array::arrays::VarBinViewArray;
12use vortex_array::arrays::VarBinViewVTable;
13use vortex_array::builders::dict::dict_encode;
14use vortex_array::vtable::ValidityHelper;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_fsst::FSSTArray;
18use vortex_fsst::fsst_compress;
19use vortex_fsst::fsst_train_compressor;
20use vortex_scalar::Scalar;
21use vortex_sparse::SparseArray;
22use vortex_sparse::SparseVTable;
23use vortex_utils::aliases::hash_set::HashSet;
24
25use crate::Compressor;
26use crate::CompressorStats;
27use crate::GenerateStatsOptions;
28use crate::Scheme;
29use crate::estimate_compression_ratio_with_sampling;
30use crate::integer;
31use crate::integer::IntCompressor;
32use crate::sample::sample;
33
/// Statistics over a [`VarBinViewArray`] of strings, used to choose a compression scheme.
#[derive(Clone, Debug)]
pub struct StringStats {
    /// The array these statistics describe.
    src: VarBinViewArray,
    /// Approximate number of distinct values, or `u32::MAX` when distinct counting was
    /// disabled via `GenerateStatsOptions::count_distinct_values`.
    estimated_distinct_count: u32,
    /// Number of non-null elements (`len - null_count`).
    value_count: u32,
    /// Number of null elements.
    null_count: u32,
}
42
43fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
45 let views = strings.views();
46 let mut distinct = HashSet::with_capacity(views.len() / 2);
50 views.iter().for_each(|&view| {
51 #[expect(
52 clippy::cast_possible_truncation,
53 reason = "approximate uniqueness with view prefix"
54 )]
55 let len_and_prefix = view.as_u128() as u64;
56 distinct.insert(len_and_prefix);
57 });
58
59 distinct
60 .len()
61 .try_into()
62 .vortex_expect("distinct count must fit in u32")
63}
64
65impl CompressorStats for StringStats {
66 type ArrayVTable = VarBinViewVTable;
67
68 fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
69 let null_count = input
70 .statistics()
71 .compute_null_count()
72 .vortex_expect("null count");
73 let value_count = input.len() - null_count;
74 let estimated_distinct = if opts.count_distinct_values {
75 estimate_distinct_count(input)
76 } else {
77 u32::MAX
78 };
79
80 Self {
81 src: input.clone(),
82 value_count: value_count.try_into().vortex_expect("value_count"),
83 null_count: null_count.try_into().vortex_expect("null_count"),
84 estimated_distinct_count: estimated_distinct,
85 }
86 }
87
88 fn source(&self) -> &VarBinViewArray {
89 &self.src
90 }
91
92 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
93 let sampled = sample(self.src.as_ref(), sample_size, sample_count).to_varbinview();
94
95 Self::generate_opts(&sampled, opts)
96 }
97}
98
99pub struct StringCompressor;
101
102impl Compressor for StringCompressor {
103 type ArrayVTable = VarBinViewVTable;
104 type SchemeType = dyn StringScheme;
105 type StatsType = StringStats;
106
107 fn schemes() -> &'static [&'static Self::SchemeType] {
108 &[
109 &UncompressedScheme,
110 &DictScheme,
111 &FSSTScheme,
112 &ConstantScheme,
113 &NullDominated,
114 ]
115 }
116
117 fn default_scheme() -> &'static Self::SchemeType {
118 &UncompressedScheme
119 }
120
121 fn dict_scheme_code() -> StringCode {
122 DICT_SCHEME
123 }
124}
125
126pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
127
128impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
129
/// Stores the strings unchanged.
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

/// Dictionary-encodes the strings, then (cascading budget permitting) compresses the codes
/// with the integer compressor and the values with the string compressor.
#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

/// Compresses the strings with FSST and recompresses the resulting integer children
/// (uncompressed lengths and code offsets).
#[derive(Debug, Copy, Clone)]
pub struct FSSTScheme;

/// Represents an array whose values are all equal as a single constant. When nulls are
/// present, the constant is masked with the source validity.
#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

/// Sparse-encodes arrays in which more than 90% of the elements are null.
#[derive(Debug, Copy, Clone)]
pub struct NullDominated;
144
/// Identifier of a string compression scheme, e.g. as listed in the `excludes` slices that
/// prevent a scheme from being chosen.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct StringCode(u8);

const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
const DICT_SCHEME: StringCode = StringCode(1);
const FSST_SCHEME: StringCode = StringCode(2);
const CONSTANT_SCHEME: StringCode = StringCode(3);

// Code used by `NullDominated`, which produces a sparse encoding.
const SPARSE_SCHEME: StringCode = StringCode(4);
154
155impl Scheme for UncompressedScheme {
156 type StatsType = StringStats;
157 type CodeType = StringCode;
158
159 fn code(&self) -> StringCode {
160 UNCOMPRESSED_SCHEME
161 }
162
163 fn expected_compression_ratio(
164 &self,
165 _stats: &Self::StatsType,
166 _is_sample: bool,
167 _allowed_cascading: usize,
168 _excludes: &[StringCode],
169 ) -> VortexResult<f64> {
170 Ok(1.0)
171 }
172
173 fn compress(
174 &self,
175 stats: &Self::StatsType,
176 _is_sample: bool,
177 _allowed_cascading: usize,
178 _excludes: &[StringCode],
179 ) -> VortexResult<ArrayRef> {
180 Ok(stats.source().to_array())
181 }
182}
183
184impl Scheme for DictScheme {
185 type StatsType = StringStats;
186 type CodeType = StringCode;
187
188 fn code(&self) -> StringCode {
189 DICT_SCHEME
190 }
191
192 fn expected_compression_ratio(
193 &self,
194 stats: &Self::StatsType,
195 is_sample: bool,
196 allowed_cascading: usize,
197 excludes: &[StringCode],
198 ) -> VortexResult<f64> {
199 if stats.estimated_distinct_count > stats.value_count / 2 {
201 return Ok(0.0);
202 }
203
204 if stats.value_count == 0 {
206 return Ok(0.0);
207 }
208
209 estimate_compression_ratio_with_sampling(
210 self,
211 stats,
212 is_sample,
213 allowed_cascading,
214 excludes,
215 )
216 }
217
218 fn compress(
219 &self,
220 stats: &Self::StatsType,
221 is_sample: bool,
222 allowed_cascading: usize,
223 _excludes: &[StringCode],
224 ) -> VortexResult<ArrayRef> {
225 let dict = dict_encode(&stats.source().clone().into_array())?;
226
227 if allowed_cascading == 0 {
229 return Ok(dict.into_array());
230 }
231
232 let compressed_codes = IntCompressor::compress(
234 &dict.codes().to_primitive(),
235 is_sample,
236 allowed_cascading - 1,
237 &[integer::DictScheme.code(), integer::SequenceScheme.code()],
238 )?;
239
240 let compressed_values = StringCompressor::compress(
243 &dict.values().to_varbinview(),
244 is_sample,
245 allowed_cascading - 1,
246 &[DictScheme.code()],
247 )?;
248
249 unsafe {
251 Ok(
252 DictArray::new_unchecked(compressed_codes, compressed_values)
253 .set_all_values_referenced(dict.has_all_values_referenced())
254 .into_array(),
255 )
256 }
257 }
258}
259
260impl Scheme for FSSTScheme {
261 type StatsType = StringStats;
262 type CodeType = StringCode;
263
264 fn code(&self) -> StringCode {
265 FSST_SCHEME
266 }
267
268 fn compress(
269 &self,
270 stats: &Self::StatsType,
271 is_sample: bool,
272 allowed_cascading: usize,
273 _excludes: &[StringCode],
274 ) -> VortexResult<ArrayRef> {
275 let compressor = fsst_train_compressor(&stats.src);
276 let fsst = fsst_compress(&stats.src, &compressor);
277
278 let compressed_original_lengths = IntCompressor::compress(
279 &fsst.uncompressed_lengths().to_primitive().narrow()?,
280 is_sample,
281 allowed_cascading,
282 &[],
283 )?;
284
285 let compressed_codes_offsets = IntCompressor::compress(
286 &fsst.codes().offsets().to_primitive().narrow()?,
287 is_sample,
288 allowed_cascading,
289 &[],
290 )?;
291 let compressed_codes = VarBinArray::try_new(
292 compressed_codes_offsets,
293 fsst.codes().bytes().clone(),
294 fsst.codes().dtype().clone(),
295 fsst.codes().validity().clone(),
296 )?;
297
298 let fsst = FSSTArray::try_new(
299 fsst.dtype().clone(),
300 fsst.symbols().clone(),
301 fsst.symbol_lengths().clone(),
302 compressed_codes,
303 compressed_original_lengths,
304 )?;
305
306 Ok(fsst.into_array())
307 }
308}
309
310impl Scheme for ConstantScheme {
311 type StatsType = StringStats;
312 type CodeType = StringCode;
313
314 fn code(&self) -> Self::CodeType {
315 CONSTANT_SCHEME
316 }
317
318 fn is_constant(&self) -> bool {
319 true
320 }
321
322 fn expected_compression_ratio(
323 &self,
324 stats: &Self::StatsType,
325 is_sample: bool,
326 _allowed_cascading: usize,
327 _excludes: &[Self::CodeType],
328 ) -> VortexResult<f64> {
329 if is_sample {
330 return Ok(0.0);
331 }
332
333 if stats.estimated_distinct_count > 1 || !stats.src.is_constant() {
334 return Ok(0.0);
335 }
336
337 Ok(f64::MAX)
339 }
340
341 fn compress(
342 &self,
343 stats: &Self::StatsType,
344 _is_sample: bool,
345 _allowed_cascading: usize,
346 _excludes: &[Self::CodeType],
347 ) -> VortexResult<ArrayRef> {
348 let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));
349
350 match scalar_idx {
351 Some(idx) => {
352 let scalar = stats.source().scalar_at(idx);
353 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
354 if !stats.source().all_valid() {
355 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
356 } else {
357 Ok(const_arr)
358 }
359 }
360 None => Ok(ConstantArray::new(
361 Scalar::null(stats.src.dtype().clone()),
362 stats.src.len(),
363 )
364 .into_array()),
365 }
366 }
367}
368
369impl Scheme for NullDominated {
370 type StatsType = StringStats;
371 type CodeType = StringCode;
372
373 fn code(&self) -> Self::CodeType {
374 SPARSE_SCHEME
375 }
376
377 fn expected_compression_ratio(
378 &self,
379 stats: &Self::StatsType,
380 _is_sample: bool,
381 allowed_cascading: usize,
382 _excludes: &[Self::CodeType],
383 ) -> VortexResult<f64> {
384 if allowed_cascading == 0 {
386 return Ok(0.0);
387 }
388
389 if stats.value_count == 0 {
390 return Ok(0.0);
392 }
393
394 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
396 return Ok(stats.src.len() as f64 / stats.value_count as f64);
397 }
398
399 Ok(0.0)
401 }
402
403 fn compress(
404 &self,
405 stats: &Self::StatsType,
406 is_sample: bool,
407 allowed_cascading: usize,
408 _excludes: &[Self::CodeType],
409 ) -> VortexResult<ArrayRef> {
410 assert!(allowed_cascading > 0);
411
412 let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
414
415 if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
416 let new_excludes = vec![integer::SparseScheme.code()];
418
419 let indices = sparse.patches().indices().to_primitive().narrow()?;
421 let compressed_indices = IntCompressor::compress_no_dict(
422 &indices,
423 is_sample,
424 allowed_cascading - 1,
425 &new_excludes,
426 )?;
427
428 SparseArray::try_new(
429 compressed_indices,
430 sparse.patches().values().clone(),
431 sparse.len(),
432 sparse.fill_scalar().clone(),
433 )
434 .map(|a| a.into_array())
435 } else {
436 Ok(sparse_encoded)
437 }
438 }
439}
440
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_sparse::SparseVTable;

    use crate::Compressor;
    use crate::MAX_CASCADE;
    use crate::string::StringCompressor;

    /// Low-cardinality strings compress without error and keep their logical length.
    #[test]
    fn test_strings() {
        let mut strings = Vec::new();
        for _ in 0..1024 {
            strings.push(Some("hello-world-1234"));
        }
        for _ in 0..1024 {
            strings.push(Some("hello-world-56789"));
        }
        let strings = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));

        println!("original array: {}", strings.as_ref().display_tree());

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.display_tree());
        // Compression must preserve the number of elements.
        assert_eq!(compressed.len(), 2048);
    }

    /// A 99%-null array is compressed into a sparse array of the same length.
    #[test]
    fn test_sparse_nulls() {
        let mut strings = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        strings.append_nulls(99);

        strings.append_value("one little string");

        let strings = strings.finish_into_varbinview();

        let compressed = StringCompressor::compress(&strings, false, MAX_CASCADE, &[]).unwrap();
        assert!(compressed.is::<SparseVTable>());
        assert_eq!(compressed.len(), 100);
    }
}