1use std::hash::Hash;
5use std::hash::Hasher;
6
7use enum_iterator::Sequence;
8use vortex_array::ArrayRef;
9use vortex_array::Canonical;
10use vortex_array::IntoArray;
11use vortex_array::ToCanonical;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::DictArray;
14use vortex_array::arrays::MaskedArray;
15use vortex_array::arrays::VarBinArray;
16use vortex_array::arrays::VarBinView;
17use vortex_array::arrays::VarBinViewArray;
18use vortex_array::builders::dict::dict_encode;
19use vortex_array::compute::is_constant;
20use vortex_array::scalar::Scalar;
21use vortex_array::vtable::VTable;
22use vortex_array::vtable::ValidityHelper;
23use vortex_error::VortexExpect;
24use vortex_error::VortexResult;
25use vortex_error::vortex_err;
26use vortex_fsst::FSSTArray;
27use vortex_fsst::fsst_compress;
28use vortex_fsst::fsst_train_compressor;
29use vortex_sparse::Sparse;
30use vortex_sparse::SparseArray;
31use vortex_utils::aliases::hash_set::HashSet;
32
33use super::integer::DictScheme as IntDictScheme;
34use super::integer::SequenceScheme as IntSequenceScheme;
35use super::integer::SparseScheme as IntSparseScheme;
36use crate::BtrBlocksCompressor;
37use crate::CanonicalCompressor;
38use crate::Compressor;
39use crate::CompressorContext;
40use crate::CompressorStats;
41use crate::Excludes;
42use crate::GenerateStatsOptions;
43use crate::IntCode;
44use crate::Scheme;
45use crate::SchemeExt;
46use crate::sample::sample;
47
48#[derive(Clone, Debug)]
50pub struct StringStats {
51 src: VarBinViewArray,
52 estimated_distinct_count: u32,
53 value_count: u32,
54 null_count: u32,
55}
56
57fn estimate_distinct_count(strings: &VarBinViewArray) -> VortexResult<u32> {
59 let views = strings.views();
60 let mut distinct = HashSet::with_capacity(views.len() / 2);
64 views.iter().for_each(|&view| {
65 #[expect(
66 clippy::cast_possible_truncation,
67 reason = "approximate uniqueness with view prefix"
68 )]
69 let len_and_prefix = view.as_u128() as u64;
70 distinct.insert(len_and_prefix);
71 });
72
73 Ok(u32::try_from(distinct.len())?)
74}
75
76impl StringStats {
77 fn generate_opts_fallible(
78 input: &VarBinViewArray,
79 opts: GenerateStatsOptions,
80 ) -> VortexResult<Self> {
81 let null_count = input
82 .statistics()
83 .compute_null_count()
84 .ok_or_else(|| vortex_err!("Failed to compute null_count"))?;
85 let value_count = input.len() - null_count;
86 let estimated_distinct = if opts.count_distinct_values {
87 estimate_distinct_count(input)?
88 } else {
89 u32::MAX
90 };
91
92 Ok(Self {
93 src: input.clone(),
94 value_count: u32::try_from(value_count)?,
95 null_count: u32::try_from(null_count)?,
96 estimated_distinct_count: estimated_distinct,
97 })
98 }
99}
100
101impl CompressorStats for StringStats {
102 type ArrayVTable = VarBinView;
103
104 fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
105 Self::generate_opts_fallible(input, opts)
106 .vortex_expect("StringStats::generate_opts should not fail")
107 }
108
109 fn source(&self) -> &VarBinViewArray {
110 &self.src
111 }
112
113 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
114 let sampled =
115 sample(&self.src.clone().into_array(), sample_size, sample_count).to_varbinview();
116
117 Self::generate_opts(&sampled, opts)
118 }
119}
120
/// Every string compression scheme defined in this module.
///
/// The Zstd entry is only compiled in with the `zstd` feature, and the Zstd-buffers entry
/// additionally requires the `unstable_encodings` feature.
pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[
    &UncompressedScheme,
    &DictScheme,
    &FSSTScheme,
    &ConstantScheme,
    &NullDominated,
    #[cfg(feature = "zstd")]
    &ZstdScheme,
    #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
    &ZstdBuffersScheme,
];
133
134#[derive(Clone, Copy)]
136pub struct StringCompressor<'a> {
137 pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
139}
140
141impl<'a> Compressor for StringCompressor<'a> {
142 type ArrayVTable = VarBinView;
143 type SchemeType = dyn StringScheme;
144 type StatsType = StringStats;
145
146 fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
147 if self
148 .btr_blocks_compressor
149 .string_schemes()
150 .iter()
151 .any(|s| s.code() == DictScheme.code())
152 {
153 StringStats::generate_opts(
154 array,
155 GenerateStatsOptions {
156 count_distinct_values: true,
157 },
158 )
159 } else {
160 StringStats::generate_opts(
161 array,
162 GenerateStatsOptions {
163 count_distinct_values: false,
164 },
165 )
166 }
167 }
168
169 fn schemes(&self) -> &[&'static dyn StringScheme] {
170 self.btr_blocks_compressor.string_schemes()
171 }
172
173 fn default_scheme(&self) -> &'static Self::SchemeType {
174 &UncompressedScheme
175 }
176}
177
178pub trait StringScheme:
179 Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
180{
181}
182
183impl<T> StringScheme for T where
184 T: Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
185{
186}
187
188impl PartialEq for dyn StringScheme {
189 fn eq(&self, other: &Self) -> bool {
190 self.code() == other.code()
191 }
192}
193
194impl Eq for dyn StringScheme {}
195
196impl Hash for dyn StringScheme {
197 fn hash<H: Hasher>(&self, state: &mut H) {
198 self.code().hash(state)
199 }
200}
201
/// Scheme that returns the source array unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the strings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DictScheme;

/// Scheme that compresses the strings with FSST.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FSSTScheme;

/// Scheme that represents the array as a single constant value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConstantScheme;

/// Scheme that sparse-encodes arrays consisting mostly of nulls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NullDominated;

/// Scheme that compresses the strings with Zstd (requires the `zstd` feature).
#[cfg(feature = "zstd")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ZstdScheme;

/// Scheme that compresses via `ZstdBuffersArray` (requires the `zstd` and
/// `unstable_encodings` features).
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ZstdBuffersScheme;
226
227#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
229pub enum StringCode {
230 Uncompressed,
232 Dict,
234 Fsst,
236 Constant,
238 Sparse,
240 Zstd,
242 ZstdBuffers,
244}
245
246impl Scheme for UncompressedScheme {
247 type StatsType = StringStats;
248 type CodeType = StringCode;
249
250 fn code(&self) -> StringCode {
251 StringCode::Uncompressed
252 }
253
254 fn expected_compression_ratio(
255 &self,
256 _compressor: &BtrBlocksCompressor,
257 _stats: &Self::StatsType,
258 _ctx: CompressorContext,
259 _excludes: &[StringCode],
260 ) -> VortexResult<f64> {
261 Ok(1.0)
262 }
263
264 fn compress(
265 &self,
266 _compressor: &BtrBlocksCompressor,
267 stats: &Self::StatsType,
268 _ctx: CompressorContext,
269 _excludes: &[StringCode],
270 ) -> VortexResult<ArrayRef> {
271 Ok(stats.source().clone().into_array())
272 }
273}
274
275impl Scheme for DictScheme {
276 type StatsType = StringStats;
277 type CodeType = StringCode;
278
279 fn code(&self) -> StringCode {
280 StringCode::Dict
281 }
282
283 fn expected_compression_ratio(
284 &self,
285 compressor: &BtrBlocksCompressor,
286 stats: &Self::StatsType,
287 ctx: CompressorContext,
288 excludes: &[StringCode],
289 ) -> VortexResult<f64> {
290 if stats.estimated_distinct_count > stats.value_count / 2 {
292 return Ok(0.0);
293 }
294
295 if stats.value_count == 0 {
297 return Ok(0.0);
298 }
299
300 self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
301 }
302
303 fn compress(
304 &self,
305 compressor: &BtrBlocksCompressor,
306 stats: &Self::StatsType,
307 ctx: CompressorContext,
308 _excludes: &[StringCode],
309 ) -> VortexResult<ArrayRef> {
310 let dict = dict_encode(&stats.source().clone().into_array())?;
311
312 if ctx.allowed_cascading == 0 {
314 return Ok(dict.into_array());
315 }
316
317 let compressed_codes = compressor.compress_canonical(
319 Canonical::Primitive(dict.codes().to_primitive()),
320 ctx.descend(),
321 Excludes::from(&[IntDictScheme.code(), IntSequenceScheme.code()]),
322 )?;
323
324 let compressed_values = compressor.compress_canonical(
327 Canonical::VarBinView(dict.values().to_varbinview()),
328 ctx.descend(),
329 Excludes::from(&[DictScheme.code()]),
330 )?;
331
332 unsafe {
334 Ok(
335 DictArray::new_unchecked(compressed_codes, compressed_values)
336 .set_all_values_referenced(dict.has_all_values_referenced())
337 .into_array(),
338 )
339 }
340 }
341}
342
343impl Scheme for FSSTScheme {
344 type StatsType = StringStats;
345 type CodeType = StringCode;
346
347 fn code(&self) -> StringCode {
348 StringCode::Fsst
349 }
350
351 fn compress(
352 &self,
353 compressor: &BtrBlocksCompressor,
354 stats: &Self::StatsType,
355 ctx: CompressorContext,
356 _excludes: &[StringCode],
357 ) -> VortexResult<ArrayRef> {
358 let fsst = {
359 let compressor = fsst_train_compressor(&stats.src);
360 fsst_compress(&stats.src, &compressor)
361 };
362
363 let compressed_original_lengths = compressor.compress_canonical(
364 Canonical::Primitive(fsst.uncompressed_lengths().to_primitive().narrow()?),
365 ctx,
366 Excludes::none(),
367 )?;
368
369 let compressed_codes_offsets = compressor.compress_canonical(
370 Canonical::Primitive(fsst.codes().offsets().to_primitive().narrow()?),
371 ctx,
372 Excludes::none(),
373 )?;
374 let compressed_codes = VarBinArray::try_new(
375 compressed_codes_offsets,
376 fsst.codes().bytes().clone(),
377 fsst.codes().dtype().clone(),
378 fsst.codes().validity().clone(),
379 )?;
380
381 let fsst = FSSTArray::try_new(
382 fsst.dtype().clone(),
383 fsst.symbols().clone(),
384 fsst.symbol_lengths().clone(),
385 compressed_codes,
386 compressed_original_lengths,
387 )?;
388
389 Ok(fsst.into_array())
390 }
391}
392
393impl Scheme for ConstantScheme {
394 type StatsType = StringStats;
395 type CodeType = StringCode;
396
397 fn code(&self) -> Self::CodeType {
398 StringCode::Constant
399 }
400
401 fn is_constant(&self) -> bool {
402 true
403 }
404
405 fn expected_compression_ratio(
406 &self,
407 _compressor: &BtrBlocksCompressor,
408 stats: &Self::StatsType,
409 ctx: CompressorContext,
410 _excludes: &[Self::CodeType],
411 ) -> VortexResult<f64> {
412 if ctx.is_sample {
413 return Ok(0.0);
414 }
415
416 if stats.estimated_distinct_count > 1
417 || !is_constant(&stats.src.clone().into_array())?.unwrap_or(false)
418 {
419 return Ok(0.0);
420 }
421
422 Ok(f64::MAX)
424 }
425
426 fn compress(
427 &self,
428 _compressor: &BtrBlocksCompressor,
429 stats: &Self::StatsType,
430 _ctx: CompressorContext,
431 _excludes: &[Self::CodeType],
432 ) -> VortexResult<ArrayRef> {
433 let scalar_idx =
434 (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));
435
436 match scalar_idx {
437 Some(idx) => {
438 let scalar = stats.source().scalar_at(idx)?;
439 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
440 if !stats.source().all_valid()? {
441 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
442 } else {
443 Ok(const_arr)
444 }
445 }
446 None => Ok(ConstantArray::new(
447 Scalar::null(stats.src.dtype().clone()),
448 stats.src.len(),
449 )
450 .into_array()),
451 }
452 }
453}
454
455impl Scheme for NullDominated {
456 type StatsType = StringStats;
457 type CodeType = StringCode;
458
459 fn code(&self) -> Self::CodeType {
460 StringCode::Sparse
461 }
462
463 fn expected_compression_ratio(
464 &self,
465 _compressor: &BtrBlocksCompressor,
466 stats: &Self::StatsType,
467 ctx: CompressorContext,
468 _excludes: &[Self::CodeType],
469 ) -> VortexResult<f64> {
470 if ctx.allowed_cascading == 0 {
472 return Ok(0.0);
473 }
474
475 if stats.value_count == 0 {
476 return Ok(0.0);
478 }
479
480 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
482 return Ok(stats.src.len() as f64 / stats.value_count as f64);
483 }
484
485 Ok(0.0)
487 }
488
489 fn compress(
490 &self,
491 compressor: &BtrBlocksCompressor,
492 stats: &Self::StatsType,
493 ctx: CompressorContext,
494 _excludes: &[Self::CodeType],
495 ) -> VortexResult<ArrayRef> {
496 assert!(ctx.allowed_cascading > 0);
497
498 let sparse_encoded = SparseArray::encode(&stats.src.clone().into_array(), None)?;
500
501 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
502 let new_excludes = vec![IntSparseScheme.code(), IntCode::Dict];
504
505 let indices = sparse.patches().indices().to_primitive().narrow()?;
506 let compressed_indices = compressor.compress_canonical(
507 Canonical::Primitive(indices),
508 ctx.descend(),
509 Excludes::int_only(&new_excludes),
510 )?;
511
512 SparseArray::try_new(
513 compressed_indices,
514 sparse.patches().values().clone(),
515 sparse.len(),
516 sparse.fill_scalar().clone(),
517 )
518 .map(|a| a.into_array())
519 } else {
520 Ok(sparse_encoded)
521 }
522 }
523}
524
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> Self::CodeType {
        StringCode::Zstd
    }

    /// Compacts the source buffers and Zstd-encodes the result without a dictionary.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        let compacted = stats.source().compact_buffers()?;
        // NOTE(review): 3 and 8192 are presumably the compression level and the number of
        // values per frame — confirm against the `vortex_zstd` API.
        let encoded =
            vortex_zstd::ZstdArray::from_var_bin_view_without_dict(&compacted, 3, 8192)?;

        Ok(encoded.into_array())
    }
}
548
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> Self::CodeType {
        StringCode::ZstdBuffers
    }

    /// Compresses the source array with `ZstdBuffersArray::compress`.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        let source = stats.source().clone().into_array();
        // NOTE(review): 3 is presumably the Zstd compression level — confirm.
        let encoded = vortex_zstd::ZstdBuffersArray::compress(&source, 3)?;

        Ok(encoded.into_array())
    }
}
571
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Two distinct values repeated 1024 times each should end up dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        // First half is one value, second half the other.
        let mut strings = vec![Some("hello-world-1234"); 1024];
        strings.resize(2048, Some("hello-world-56789"));

        let array_ref =
            VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed = BtrBlocksCompressor::default().compress(&array_ref)?;
        assert_eq!(compressed.len(), 2048);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// 99 nulls followed by a single value should end up sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");

        let array_ref = builder.finish_into_varbinview().into_array();
        let compressed = BtrBlocksCompressor::default().compress(&array_ref)?;
        assert_eq!(compressed.len(), 100);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
631
/// Checks which top-level encoding the default compressor selects for typical inputs.
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;

    use crate::BtrBlocksCompressor;

    /// A single repeated value is expected to compress to a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let strings: Vec<Option<&str>> = std::iter::repeat(Some("constant_value"))
            .take(100)
            .collect();
        let array_ref =
            VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed = BtrBlocksCompressor::default().compress(&array_ref)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Three values cycled over 1000 rows are expected to compress to a dictionary array.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = ["apple", "banana", "cherry"];
        let strings: Vec<Option<&str>> = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let array_ref =
            VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed = BtrBlocksCompressor::default().compress(&array_ref)?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// Long strings sharing a prefix and suffix are expected to compress to an FSST array.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let strings: Vec<_> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let array_ref =
            VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed = BtrBlocksCompressor::default().compress(&array_ref)?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}