1use std::hash::Hash;
5use std::hash::Hasher;
6
7use enum_iterator::Sequence;
8use vortex_array::ArrayRef;
9use vortex_array::Canonical;
10use vortex_array::IntoArray;
11use vortex_array::ToCanonical;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::DictArray;
14use vortex_array::arrays::MaskedArray;
15use vortex_array::arrays::VarBinArray;
16use vortex_array::arrays::VarBinViewArray;
17use vortex_array::arrays::VarBinViewVTable;
18use vortex_array::builders::dict::dict_encode;
19use vortex_array::compute::is_constant;
20use vortex_array::scalar::Scalar;
21use vortex_array::vtable::VTable;
22use vortex_array::vtable::ValidityHelper;
23use vortex_error::VortexExpect;
24use vortex_error::VortexResult;
25use vortex_error::vortex_err;
26use vortex_fsst::FSSTArray;
27use vortex_fsst::fsst_compress;
28use vortex_fsst::fsst_train_compressor;
29use vortex_sparse::SparseArray;
30use vortex_sparse::SparseVTable;
31use vortex_utils::aliases::hash_set::HashSet;
32
33use super::integer::DictScheme as IntDictScheme;
34use super::integer::SequenceScheme as IntSequenceScheme;
35use super::integer::SparseScheme as IntSparseScheme;
36use crate::BtrBlocksCompressor;
37use crate::CanonicalCompressor;
38use crate::Compressor;
39use crate::CompressorContext;
40use crate::CompressorStats;
41use crate::Excludes;
42use crate::GenerateStatsOptions;
43use crate::IntCode;
44use crate::Scheme;
45use crate::SchemeExt;
46use crate::sample::sample;
47
48#[derive(Clone, Debug)]
50pub struct StringStats {
51 src: VarBinViewArray,
52 estimated_distinct_count: u32,
53 value_count: u32,
54 null_count: u32,
55}
56
57fn estimate_distinct_count(strings: &VarBinViewArray) -> VortexResult<u32> {
59 let views = strings.views();
60 let mut distinct = HashSet::with_capacity(views.len() / 2);
64 views.iter().for_each(|&view| {
65 #[expect(
66 clippy::cast_possible_truncation,
67 reason = "approximate uniqueness with view prefix"
68 )]
69 let len_and_prefix = view.as_u128() as u64;
70 distinct.insert(len_and_prefix);
71 });
72
73 Ok(u32::try_from(distinct.len())?)
74}
75
76impl StringStats {
77 fn generate_opts_fallible(
78 input: &VarBinViewArray,
79 opts: GenerateStatsOptions,
80 ) -> VortexResult<Self> {
81 let null_count = input
82 .statistics()
83 .compute_null_count()
84 .ok_or_else(|| vortex_err!("Failed to compute null_count"))?;
85 let value_count = input.len() - null_count;
86 let estimated_distinct = if opts.count_distinct_values {
87 estimate_distinct_count(input)?
88 } else {
89 u32::MAX
90 };
91
92 Ok(Self {
93 src: input.clone(),
94 value_count: u32::try_from(value_count)?,
95 null_count: u32::try_from(null_count)?,
96 estimated_distinct_count: estimated_distinct,
97 })
98 }
99}
100
101impl CompressorStats for StringStats {
102 type ArrayVTable = VarBinViewVTable;
103
104 fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
105 Self::generate_opts_fallible(input, opts)
106 .vortex_expect("StringStats::generate_opts should not fail")
107 }
108
109 fn source(&self) -> &VarBinViewArray {
110 &self.src
111 }
112
113 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
114 let sampled = sample(&self.src.to_array(), sample_size, sample_count).to_varbinview();
115
116 Self::generate_opts(&sampled, opts)
117 }
118}
119
120pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[
122 &UncompressedScheme,
123 &DictScheme,
124 &FSSTScheme,
125 &ConstantScheme,
126 &NullDominated,
127 #[cfg(feature = "zstd")]
128 &ZstdScheme,
129 #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
130 &ZstdBuffersScheme,
131];
132
133#[derive(Clone, Copy)]
135pub struct StringCompressor<'a> {
136 pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
138}
139
140impl<'a> Compressor for StringCompressor<'a> {
141 type ArrayVTable = VarBinViewVTable;
142 type SchemeType = dyn StringScheme;
143 type StatsType = StringStats;
144
145 fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
146 if self
147 .btr_blocks_compressor
148 .string_schemes()
149 .iter()
150 .any(|s| s.code() == DictScheme.code())
151 {
152 StringStats::generate_opts(
153 array,
154 GenerateStatsOptions {
155 count_distinct_values: true,
156 },
157 )
158 } else {
159 StringStats::generate_opts(
160 array,
161 GenerateStatsOptions {
162 count_distinct_values: false,
163 },
164 )
165 }
166 }
167
168 fn schemes(&self) -> &[&'static dyn StringScheme] {
169 self.btr_blocks_compressor.string_schemes()
170 }
171
172 fn default_scheme(&self) -> &'static Self::SchemeType {
173 &UncompressedScheme
174 }
175}
176
177pub trait StringScheme:
178 Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
179{
180}
181
182impl<T> StringScheme for T where
183 T: Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
184{
185}
186
187impl PartialEq for dyn StringScheme {
188 fn eq(&self, other: &Self) -> bool {
189 self.code() == other.code()
190 }
191}
192
193impl Eq for dyn StringScheme {}
194
195impl Hash for dyn StringScheme {
196 fn hash<H: Hasher>(&self, state: &mut H) {
197 self.code().hash(state)
198 }
199}
200
/// Scheme that leaves the string array as it is.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct UncompressedScheme;
203
/// Scheme that dictionary-encodes the string array.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DictScheme;
206
/// Scheme that compresses the string array with FSST.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FSSTScheme;
209
/// Scheme that represents a constant string array as a single repeated scalar.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ConstantScheme;
212
/// Scheme that sparse-encodes string arrays consisting mostly of nulls.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominated;
215
/// Scheme that compresses the string array into a `vortex_zstd::ZstdArray`.
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdScheme;
220
/// Scheme that compresses the string array into a `vortex_zstd::ZstdBuffersArray`.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdBuffersScheme;
225
226#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
228pub enum StringCode {
229 Uncompressed,
231 Dict,
233 Fsst,
235 Constant,
237 Sparse,
239 Zstd,
241 ZstdBuffers,
243}
244
245impl Scheme for UncompressedScheme {
246 type StatsType = StringStats;
247 type CodeType = StringCode;
248
249 fn code(&self) -> StringCode {
250 StringCode::Uncompressed
251 }
252
253 fn expected_compression_ratio(
254 &self,
255 _compressor: &BtrBlocksCompressor,
256 _stats: &Self::StatsType,
257 _ctx: CompressorContext,
258 _excludes: &[StringCode],
259 ) -> VortexResult<f64> {
260 Ok(1.0)
261 }
262
263 fn compress(
264 &self,
265 _compressor: &BtrBlocksCompressor,
266 stats: &Self::StatsType,
267 _ctx: CompressorContext,
268 _excludes: &[StringCode],
269 ) -> VortexResult<ArrayRef> {
270 Ok(stats.source().to_array())
271 }
272}
273
274impl Scheme for DictScheme {
275 type StatsType = StringStats;
276 type CodeType = StringCode;
277
278 fn code(&self) -> StringCode {
279 StringCode::Dict
280 }
281
282 fn expected_compression_ratio(
283 &self,
284 compressor: &BtrBlocksCompressor,
285 stats: &Self::StatsType,
286 ctx: CompressorContext,
287 excludes: &[StringCode],
288 ) -> VortexResult<f64> {
289 if stats.estimated_distinct_count > stats.value_count / 2 {
291 return Ok(0.0);
292 }
293
294 if stats.value_count == 0 {
296 return Ok(0.0);
297 }
298
299 self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
300 }
301
302 fn compress(
303 &self,
304 compressor: &BtrBlocksCompressor,
305 stats: &Self::StatsType,
306 ctx: CompressorContext,
307 _excludes: &[StringCode],
308 ) -> VortexResult<ArrayRef> {
309 let dict = dict_encode(&stats.source().clone().into_array())?;
310
311 if ctx.allowed_cascading == 0 {
313 return Ok(dict.into_array());
314 }
315
316 let compressed_codes = compressor.compress_canonical(
318 Canonical::Primitive(dict.codes().to_primitive()),
319 ctx.descend(),
320 Excludes::from(&[IntDictScheme.code(), IntSequenceScheme.code()]),
321 )?;
322
323 let compressed_values = compressor.compress_canonical(
326 Canonical::VarBinView(dict.values().to_varbinview()),
327 ctx.descend(),
328 Excludes::from(&[DictScheme.code()]),
329 )?;
330
331 unsafe {
333 Ok(
334 DictArray::new_unchecked(compressed_codes, compressed_values)
335 .set_all_values_referenced(dict.has_all_values_referenced())
336 .into_array(),
337 )
338 }
339 }
340}
341
342impl Scheme for FSSTScheme {
343 type StatsType = StringStats;
344 type CodeType = StringCode;
345
346 fn code(&self) -> StringCode {
347 StringCode::Fsst
348 }
349
350 fn compress(
351 &self,
352 compressor: &BtrBlocksCompressor,
353 stats: &Self::StatsType,
354 ctx: CompressorContext,
355 _excludes: &[StringCode],
356 ) -> VortexResult<ArrayRef> {
357 let fsst = {
358 let compressor = fsst_train_compressor(&stats.src);
359 fsst_compress(&stats.src, &compressor)
360 };
361
362 let compressed_original_lengths = compressor.compress_canonical(
363 Canonical::Primitive(fsst.uncompressed_lengths().to_primitive().narrow()?),
364 ctx,
365 Excludes::none(),
366 )?;
367
368 let compressed_codes_offsets = compressor.compress_canonical(
369 Canonical::Primitive(fsst.codes().offsets().to_primitive().narrow()?),
370 ctx,
371 Excludes::none(),
372 )?;
373 let compressed_codes = VarBinArray::try_new(
374 compressed_codes_offsets,
375 fsst.codes().bytes().clone(),
376 fsst.codes().dtype().clone(),
377 fsst.codes().validity().clone(),
378 )?;
379
380 let fsst = FSSTArray::try_new(
381 fsst.dtype().clone(),
382 fsst.symbols().clone(),
383 fsst.symbol_lengths().clone(),
384 compressed_codes,
385 compressed_original_lengths,
386 )?;
387
388 Ok(fsst.into_array())
389 }
390}
391
392impl Scheme for ConstantScheme {
393 type StatsType = StringStats;
394 type CodeType = StringCode;
395
396 fn code(&self) -> Self::CodeType {
397 StringCode::Constant
398 }
399
400 fn is_constant(&self) -> bool {
401 true
402 }
403
404 fn expected_compression_ratio(
405 &self,
406 _compressor: &BtrBlocksCompressor,
407 stats: &Self::StatsType,
408 ctx: CompressorContext,
409 _excludes: &[Self::CodeType],
410 ) -> VortexResult<f64> {
411 if ctx.is_sample {
412 return Ok(0.0);
413 }
414
415 if stats.estimated_distinct_count > 1
416 || !is_constant(&stats.src.to_array())?.unwrap_or(false)
417 {
418 return Ok(0.0);
419 }
420
421 Ok(f64::MAX)
423 }
424
425 fn compress(
426 &self,
427 _compressor: &BtrBlocksCompressor,
428 stats: &Self::StatsType,
429 _ctx: CompressorContext,
430 _excludes: &[Self::CodeType],
431 ) -> VortexResult<ArrayRef> {
432 let scalar_idx =
433 (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));
434
435 match scalar_idx {
436 Some(idx) => {
437 let scalar = stats.source().scalar_at(idx)?;
438 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
439 if !stats.source().all_valid()? {
440 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
441 } else {
442 Ok(const_arr)
443 }
444 }
445 None => Ok(ConstantArray::new(
446 Scalar::null(stats.src.dtype().clone()),
447 stats.src.len(),
448 )
449 .into_array()),
450 }
451 }
452}
453
454impl Scheme for NullDominated {
455 type StatsType = StringStats;
456 type CodeType = StringCode;
457
458 fn code(&self) -> Self::CodeType {
459 StringCode::Sparse
460 }
461
462 fn expected_compression_ratio(
463 &self,
464 _compressor: &BtrBlocksCompressor,
465 stats: &Self::StatsType,
466 ctx: CompressorContext,
467 _excludes: &[Self::CodeType],
468 ) -> VortexResult<f64> {
469 if ctx.allowed_cascading == 0 {
471 return Ok(0.0);
472 }
473
474 if stats.value_count == 0 {
475 return Ok(0.0);
477 }
478
479 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
481 return Ok(stats.src.len() as f64 / stats.value_count as f64);
482 }
483
484 Ok(0.0)
486 }
487
488 fn compress(
489 &self,
490 compressor: &BtrBlocksCompressor,
491 stats: &Self::StatsType,
492 ctx: CompressorContext,
493 _excludes: &[Self::CodeType],
494 ) -> VortexResult<ArrayRef> {
495 assert!(ctx.allowed_cascading > 0);
496
497 let sparse_encoded = SparseArray::encode(&stats.src.to_array(), None)?;
499
500 if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
501 let new_excludes = vec![IntSparseScheme.code(), IntCode::Dict];
503
504 let indices = sparse.patches().indices().to_primitive().narrow()?;
505 let compressed_indices = compressor.compress_canonical(
506 Canonical::Primitive(indices),
507 ctx.descend(),
508 Excludes::int_only(&new_excludes),
509 )?;
510
511 SparseArray::try_new(
512 compressed_indices,
513 sparse.patches().values().clone(),
514 sparse.len(),
515 sparse.fill_scalar().clone(),
516 )
517 .map(|a| a.into_array())
518 } else {
519 Ok(sparse_encoded)
520 }
521 }
522}
523
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> Self::CodeType {
        StringCode::Zstd
    }

    /// Compacts the source buffers and encodes the result as a `ZstdArray` without a dictionary.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        let compacted = stats.source().compact_buffers()?;
        // NOTE(review): `3` and `8192` are presumably the zstd level and the number of values
        // per frame — confirm against the `vortex_zstd` API.
        let encoded =
            vortex_zstd::ZstdArray::from_var_bin_view_without_dict(&compacted, 3, 8192)?;
        Ok(encoded.into_array())
    }
}
547
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> Self::CodeType {
        StringCode::ZstdBuffers
    }

    /// Encodes the source as a `ZstdBuffersArray`.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        let source = stats.source().to_array();
        // NOTE(review): `3` is presumably the zstd compression level — confirm.
        let encoded = vortex_zstd::ZstdBuffersArray::compress(&source, 3)?;
        Ok(encoded.into_array())
    }
}
567
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Two distinct values repeated 1024 times each should end up dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));

        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 2048);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// 99 nulls followed by a single value should end up sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");

        let input = builder.finish_into_varbinview().into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 100);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
627
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::ConstantVTable;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSSTVTable;

    use crate::BtrBlocksCompressor;

    /// A single value repeated 100 times should be encoded as a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = std::iter::repeat("constant_value")
            .take(100)
            .map(Some)
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<ConstantVTable>());
        Ok(())
    }

    /// Three values cycled over 1000 rows should be dictionary-encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = ["apple", "banana", "cherry"];
        let values: Vec<Option<&str>> = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<DictVTable>());
        Ok(())
    }

    /// Long, mostly-unique strings sharing a prefix and suffix should be FSST-encoded.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<FSSTVTable>());
        Ok(())
    }
}