1use std::hash::Hash;
5use std::hash::Hasher;
6
7use enum_iterator::Sequence;
8use vortex_array::ArrayRef;
9use vortex_array::Canonical;
10use vortex_array::IntoArray;
11use vortex_array::ToCanonical;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::DictArray;
14use vortex_array::arrays::MaskedArray;
15use vortex_array::arrays::VarBinArray;
16use vortex_array::arrays::VarBinViewArray;
17use vortex_array::arrays::VarBinViewVTable;
18use vortex_array::builders::dict::dict_encode;
19use vortex_array::compute::is_constant;
20use vortex_array::scalar::Scalar;
21use vortex_array::vtable::VTable;
22use vortex_array::vtable::ValidityHelper;
23use vortex_error::VortexExpect;
24use vortex_error::VortexResult;
25use vortex_error::vortex_err;
26use vortex_fsst::FSSTArray;
27use vortex_fsst::fsst_compress;
28use vortex_fsst::fsst_train_compressor;
29use vortex_sparse::SparseArray;
30use vortex_sparse::SparseVTable;
31use vortex_utils::aliases::hash_set::HashSet;
32
33use super::integer::DictScheme as IntDictScheme;
34use super::integer::SequenceScheme as IntSequenceScheme;
35use super::integer::SparseScheme as IntSparseScheme;
36use crate::BtrBlocksCompressor;
37use crate::CanonicalCompressor;
38use crate::Compressor;
39use crate::CompressorContext;
40use crate::CompressorStats;
41use crate::Excludes;
42use crate::GenerateStatsOptions;
43use crate::IntCode;
44use crate::Scheme;
45use crate::SchemeExt;
46use crate::sample::sample;
47
#[derive(Clone, Debug)]
/// Statistics gathered over a [`VarBinViewArray`] that string schemes use to estimate
/// how well they would compress it.
pub struct StringStats {
    /// The array these statistics describe.
    src: VarBinViewArray,
    /// Approximate distinct-value count (see `estimate_distinct_count`), or `u32::MAX`
    /// when distinct counting was not requested.
    estimated_distinct_count: u32,
    /// Number of non-null entries (`len - null_count`).
    value_count: u32,
    /// Number of null entries.
    null_count: u32,
}
56
57fn estimate_distinct_count(strings: &VarBinViewArray) -> VortexResult<u32> {
59 let views = strings.views();
60 let mut distinct = HashSet::with_capacity(views.len() / 2);
64 views.iter().for_each(|&view| {
65 #[expect(
66 clippy::cast_possible_truncation,
67 reason = "approximate uniqueness with view prefix"
68 )]
69 let len_and_prefix = view.as_u128() as u64;
70 distinct.insert(len_and_prefix);
71 });
72
73 Ok(u32::try_from(distinct.len())?)
74}
75
76impl StringStats {
77 fn generate_opts_fallible(
78 input: &VarBinViewArray,
79 opts: GenerateStatsOptions,
80 ) -> VortexResult<Self> {
81 let null_count = input
82 .statistics()
83 .compute_null_count()
84 .ok_or_else(|| vortex_err!("Failed to compute null_count"))?;
85 let value_count = input.len() - null_count;
86 let estimated_distinct = if opts.count_distinct_values {
87 estimate_distinct_count(input)?
88 } else {
89 u32::MAX
90 };
91
92 Ok(Self {
93 src: input.clone(),
94 value_count: u32::try_from(value_count)?,
95 null_count: u32::try_from(null_count)?,
96 estimated_distinct_count: estimated_distinct,
97 })
98 }
99}
100
101impl CompressorStats for StringStats {
102 type ArrayVTable = VarBinViewVTable;
103
104 fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
105 Self::generate_opts_fallible(input, opts)
106 .vortex_expect("StringStats::generate_opts should not fail")
107 }
108
109 fn source(&self) -> &VarBinViewArray {
110 &self.src
111 }
112
113 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
114 let sampled = sample(self.src.as_ref(), sample_size, sample_count).to_varbinview();
115
116 Self::generate_opts(&sampled, opts)
117 }
118}
119
/// Every string scheme defined in this module.
///
/// The zstd-based schemes are present only when their cargo features are enabled.
pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[
    &UncompressedScheme,
    &DictScheme,
    &FSSTScheme,
    &ConstantScheme,
    &NullDominated,
    #[cfg(feature = "zstd")]
    &ZstdScheme,
    #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
    &ZstdBuffersScheme,
];
132
#[derive(Clone, Copy)]
/// [`Compressor`] for string ([`VarBinViewArray`]) data.
pub struct StringCompressor<'a> {
    /// Compressor whose `string_schemes()` supplies the candidate schemes.
    pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
}
139
140impl<'a> Compressor for StringCompressor<'a> {
141 type ArrayVTable = VarBinViewVTable;
142 type SchemeType = dyn StringScheme;
143 type StatsType = StringStats;
144
145 fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
146 if self
147 .btr_blocks_compressor
148 .string_schemes()
149 .iter()
150 .any(|s| s.code() == DictScheme.code())
151 {
152 StringStats::generate_opts(
153 array,
154 GenerateStatsOptions {
155 count_distinct_values: true,
156 },
157 )
158 } else {
159 StringStats::generate_opts(
160 array,
161 GenerateStatsOptions {
162 count_distinct_values: false,
163 },
164 )
165 }
166 }
167
168 fn schemes(&self) -> &[&'static dyn StringScheme] {
169 self.btr_blocks_compressor.string_schemes()
170 }
171
172 fn default_scheme(&self) -> &'static Self::SchemeType {
173 &UncompressedScheme
174 }
175}
176
/// A compression scheme for string arrays.
///
/// This is any [`Scheme`] over [`StringStats`] that reports a [`StringCode`] and is
/// `Send + Sync`.
pub trait StringScheme:
    Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
{
}

// Blanket impl: every qualifying `Scheme` is automatically a `StringScheme`.
impl<T> StringScheme for T where
    T: Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
{
}
186
187impl PartialEq for dyn StringScheme {
188 fn eq(&self, other: &Self) -> bool {
189 self.code() == other.code()
190 }
191}
192
193impl Eq for dyn StringScheme {}
194
195impl Hash for dyn StringScheme {
196 fn hash<H: Hasher>(&self, state: &mut H) {
197 self.code().hash(state)
198 }
199}
200
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
/// Returns the strings unchanged (expected ratio 1.0).
pub struct UncompressedScheme;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
/// Dictionary-encodes strings, then recursively compresses the codes and values.
pub struct DictScheme;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
/// FSST-compresses strings with a symbol table trained on the input.
pub struct FSSTScheme;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
/// Replaces an array whose values are all equal with a single constant.
pub struct ConstantScheme;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
/// Sparse-encodes arrays in which more than 90% of entries are null.
pub struct NullDominated;

#[cfg(feature = "zstd")]
/// Zstd-compresses the string data (requires the `zstd` feature).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdScheme;

#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
/// Zstd-compresses the array's buffers (requires the `zstd` and `unstable_encodings`
/// features).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdBuffersScheme;
225
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
/// Identifier for each string scheme.
///
/// Used to compare and hash schemes and to build exclusion lists. The derived `Ord`
/// and `Sequence` follow declaration order, so reordering variants changes them.
pub enum StringCode {
    /// [`UncompressedScheme`].
    Uncompressed,
    /// [`DictScheme`].
    Dict,
    /// [`FSSTScheme`].
    Fsst,
    /// [`ConstantScheme`].
    Constant,
    /// [`NullDominated`] (sparse encoding).
    Sparse,
    /// `ZstdScheme` (requires the `zstd` feature).
    Zstd,
    /// `ZstdBuffersScheme` (requires the `zstd` and `unstable_encodings` features).
    ZstdBuffers,
}
244
245impl Scheme for UncompressedScheme {
246 type StatsType = StringStats;
247 type CodeType = StringCode;
248
249 fn code(&self) -> StringCode {
250 StringCode::Uncompressed
251 }
252
253 fn expected_compression_ratio(
254 &self,
255 _compressor: &BtrBlocksCompressor,
256 _stats: &Self::StatsType,
257 _ctx: CompressorContext,
258 _excludes: &[StringCode],
259 ) -> VortexResult<f64> {
260 Ok(1.0)
261 }
262
263 fn compress(
264 &self,
265 _compressor: &BtrBlocksCompressor,
266 stats: &Self::StatsType,
267 _ctx: CompressorContext,
268 _excludes: &[StringCode],
269 ) -> VortexResult<ArrayRef> {
270 Ok(stats.source().to_array())
271 }
272}
273
274impl Scheme for DictScheme {
275 type StatsType = StringStats;
276 type CodeType = StringCode;
277
278 fn code(&self) -> StringCode {
279 StringCode::Dict
280 }
281
282 fn expected_compression_ratio(
283 &self,
284 compressor: &BtrBlocksCompressor,
285 stats: &Self::StatsType,
286 ctx: CompressorContext,
287 excludes: &[StringCode],
288 ) -> VortexResult<f64> {
289 if stats.estimated_distinct_count > stats.value_count / 2 {
291 return Ok(0.0);
292 }
293
294 if stats.value_count == 0 {
296 return Ok(0.0);
297 }
298
299 self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
300 }
301
302 fn compress(
303 &self,
304 compressor: &BtrBlocksCompressor,
305 stats: &Self::StatsType,
306 ctx: CompressorContext,
307 _excludes: &[StringCode],
308 ) -> VortexResult<ArrayRef> {
309 let dict = dict_encode(&stats.source().clone().into_array())?;
310
311 if ctx.allowed_cascading == 0 {
313 return Ok(dict.into_array());
314 }
315
316 let compressed_codes = compressor.compress_canonical(
318 Canonical::Primitive(dict.codes().to_primitive()),
319 ctx.descend(),
320 Excludes::from(&[IntDictScheme.code(), IntSequenceScheme.code()]),
321 )?;
322
323 let compressed_values = compressor.compress_canonical(
326 Canonical::VarBinView(dict.values().to_varbinview()),
327 ctx.descend(),
328 Excludes::from(&[DictScheme.code()]),
329 )?;
330
331 unsafe {
333 Ok(
334 DictArray::new_unchecked(compressed_codes, compressed_values)
335 .set_all_values_referenced(dict.has_all_values_referenced())
336 .into_array(),
337 )
338 }
339 }
340}
341
342impl Scheme for FSSTScheme {
343 type StatsType = StringStats;
344 type CodeType = StringCode;
345
346 fn code(&self) -> StringCode {
347 StringCode::Fsst
348 }
349
350 fn compress(
351 &self,
352 compressor: &BtrBlocksCompressor,
353 stats: &Self::StatsType,
354 ctx: CompressorContext,
355 _excludes: &[StringCode],
356 ) -> VortexResult<ArrayRef> {
357 let fsst = {
358 let compressor = fsst_train_compressor(&stats.src);
359 fsst_compress(&stats.src, &compressor)
360 };
361
362 let compressed_original_lengths = compressor.compress_canonical(
363 Canonical::Primitive(fsst.uncompressed_lengths().to_primitive().narrow()?),
364 ctx,
365 Excludes::none(),
366 )?;
367
368 let compressed_codes_offsets = compressor.compress_canonical(
369 Canonical::Primitive(fsst.codes().offsets().to_primitive().narrow()?),
370 ctx,
371 Excludes::none(),
372 )?;
373 let compressed_codes = VarBinArray::try_new(
374 compressed_codes_offsets,
375 fsst.codes().bytes().clone(),
376 fsst.codes().dtype().clone(),
377 fsst.codes().validity().clone(),
378 )?;
379
380 let fsst = FSSTArray::try_new(
381 fsst.dtype().clone(),
382 fsst.symbols().clone(),
383 fsst.symbol_lengths().clone(),
384 compressed_codes,
385 compressed_original_lengths,
386 )?;
387
388 Ok(fsst.into_array())
389 }
390}
391
392impl Scheme for ConstantScheme {
393 type StatsType = StringStats;
394 type CodeType = StringCode;
395
396 fn code(&self) -> Self::CodeType {
397 StringCode::Constant
398 }
399
400 fn is_constant(&self) -> bool {
401 true
402 }
403
404 fn expected_compression_ratio(
405 &self,
406 _compressor: &BtrBlocksCompressor,
407 stats: &Self::StatsType,
408 ctx: CompressorContext,
409 _excludes: &[Self::CodeType],
410 ) -> VortexResult<f64> {
411 if ctx.is_sample {
412 return Ok(0.0);
413 }
414
415 if stats.estimated_distinct_count > 1 || !is_constant(stats.src.as_ref())?.unwrap_or(false)
416 {
417 return Ok(0.0);
418 }
419
420 Ok(f64::MAX)
422 }
423
424 fn compress(
425 &self,
426 _compressor: &BtrBlocksCompressor,
427 stats: &Self::StatsType,
428 _ctx: CompressorContext,
429 _excludes: &[Self::CodeType],
430 ) -> VortexResult<ArrayRef> {
431 let scalar_idx =
432 (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));
433
434 match scalar_idx {
435 Some(idx) => {
436 let scalar = stats.source().scalar_at(idx)?;
437 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
438 if !stats.source().all_valid()? {
439 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
440 } else {
441 Ok(const_arr)
442 }
443 }
444 None => Ok(ConstantArray::new(
445 Scalar::null(stats.src.dtype().clone()),
446 stats.src.len(),
447 )
448 .into_array()),
449 }
450 }
451}
452
453impl Scheme for NullDominated {
454 type StatsType = StringStats;
455 type CodeType = StringCode;
456
457 fn code(&self) -> Self::CodeType {
458 StringCode::Sparse
459 }
460
461 fn expected_compression_ratio(
462 &self,
463 _compressor: &BtrBlocksCompressor,
464 stats: &Self::StatsType,
465 ctx: CompressorContext,
466 _excludes: &[Self::CodeType],
467 ) -> VortexResult<f64> {
468 if ctx.allowed_cascading == 0 {
470 return Ok(0.0);
471 }
472
473 if stats.value_count == 0 {
474 return Ok(0.0);
476 }
477
478 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
480 return Ok(stats.src.len() as f64 / stats.value_count as f64);
481 }
482
483 Ok(0.0)
485 }
486
487 fn compress(
488 &self,
489 compressor: &BtrBlocksCompressor,
490 stats: &Self::StatsType,
491 ctx: CompressorContext,
492 _excludes: &[Self::CodeType],
493 ) -> VortexResult<ArrayRef> {
494 assert!(ctx.allowed_cascading > 0);
495
496 let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
498
499 if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
500 let new_excludes = vec![IntSparseScheme.code(), IntCode::Dict];
502
503 let indices = sparse.patches().indices().to_primitive().narrow()?;
504 let compressed_indices = compressor.compress_canonical(
505 Canonical::Primitive(indices),
506 ctx.descend(),
507 Excludes::int_only(&new_excludes),
508 )?;
509
510 SparseArray::try_new(
511 compressed_indices,
512 sparse.patches().values().clone(),
513 sparse.len(),
514 sparse.fill_scalar().clone(),
515 )
516 .map(|a| a.into_array())
517 } else {
518 Ok(sparse_encoded)
519 }
520 }
521}
522
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::Zstd
    }

    /// Compacts the source's buffers, then zstd-compresses it without a dictionary.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        let compacted = stats.source().compact_buffers()?;
        // NOTE(review): 3 is presumably the zstd level and 8192 a frame-size setting;
        // confirm against `ZstdArray::from_var_bin_view_without_dict`.
        let zstd = vortex_zstd::ZstdArray::from_var_bin_view_without_dict(&compacted, 3, 8192)?;
        Ok(zstd.into_array())
    }
}
546
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> StringCode {
        StringCode::ZstdBuffers
    }

    /// Zstd-compresses the source array's buffers.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[StringCode],
    ) -> VortexResult<ArrayRef> {
        let source = stats.source().to_array();
        // NOTE(review): 3 is presumably the zstd compression level; confirm.
        let compressed = vortex_zstd::ZstdBuffersArray::compress(&source, 3)?;
        Ok(compressed.into_array())
    }
}
566
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Compresses `array` with the default compressor and checks that the length is
    /// preserved. Returns the lowercased metadata-only rendering of the result.
    fn compress_and_describe(array: &VarBinViewArray, expected_len: usize) -> VortexResult<String> {
        let compressed = BtrBlocksCompressor::default().compress(array.as_ref())?;
        assert_eq!(compressed.len(), expected_len);
        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        Ok(rendered.to_lowercase())
    }

    #[test]
    fn test_strings() -> VortexResult<()> {
        // Two distinct values, 1024 repetitions each.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));
        let strings = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        let display = compress_and_describe(&strings, 2048)?;
        assert_eq!(display, "vortex.dict(utf8, len=2048)");
        Ok(())
    }

    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        // 99 nulls followed by a single value.
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");
        let strings = builder.finish_into_varbinview();

        let display = compress_and_describe(&strings, 100)?;
        assert_eq!(display, "vortex.sparse(utf8?, len=100)");
        Ok(())
    }
}
623
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::arrays::ConstantVTable;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSSTVTable;

    use crate::BtrBlocksCompressor;

    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        // One value repeated 100 times.
        let array = VarBinViewArray::from_iter(
            vec![Some("constant_value"); 100],
            DType::Utf8(Nullability::NonNullable),
        );
        let compressed = BtrBlocksCompressor::default().compress(array.as_ref())?;
        assert!(compressed.is::<ConstantVTable>());
        Ok(())
    }

    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        // Three values cycled over 1000 rows.
        let palette = ["apple", "banana", "cherry"];
        let strings: Vec<Option<&str>> = (0..1000)
            .map(|i| Some(palette[i % palette.len()]))
            .collect();
        let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));
        let compressed = BtrBlocksCompressor::default().compress(array.as_ref())?;
        assert!(compressed.is::<DictVTable>());
        Ok(())
    }

    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        // All-distinct strings sharing a long common prefix and suffix.
        let strings: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));
        let compressed = BtrBlocksCompressor::default().compress(array.as_ref())?;
        assert!(compressed.is::<FSSTVTable>());
        Ok(())
    }
}