1use std::hash::Hash;
5use std::hash::Hasher;
6
7use enum_iterator::Sequence;
8use vortex_array::ArrayRef;
9use vortex_array::Canonical;
10use vortex_array::IntoArray;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::ToCanonical;
13use vortex_array::VortexSessionExecute;
14use vortex_array::aggregate_fn::fns::is_constant::is_constant;
15use vortex_array::arrays::ConstantArray;
16use vortex_array::arrays::DictArray;
17use vortex_array::arrays::MaskedArray;
18use vortex_array::arrays::VarBinArray;
19use vortex_array::arrays::VarBinView;
20use vortex_array::arrays::VarBinViewArray;
21use vortex_array::builders::dict::dict_encode;
22use vortex_array::scalar::Scalar;
23use vortex_array::vtable::VTable;
24use vortex_array::vtable::ValidityHelper;
25use vortex_error::VortexExpect;
26use vortex_error::VortexResult;
27use vortex_error::vortex_err;
28use vortex_fsst::FSSTArray;
29use vortex_fsst::fsst_compress;
30use vortex_fsst::fsst_train_compressor;
31use vortex_sparse::Sparse;
32use vortex_sparse::SparseArray;
33use vortex_utils::aliases::hash_set::HashSet;
34
35use super::integer::DictScheme as IntDictScheme;
36use super::integer::SequenceScheme as IntSequenceScheme;
37use super::integer::SparseScheme as IntSparseScheme;
38use crate::BtrBlocksCompressor;
39use crate::CanonicalCompressor;
40use crate::Compressor;
41use crate::CompressorContext;
42use crate::CompressorStats;
43use crate::Excludes;
44use crate::GenerateStatsOptions;
45use crate::IntCode;
46use crate::Scheme;
47use crate::SchemeExt;
48use crate::sample::sample;
49
/// Statistics gathered over a `VarBinViewArray`, consumed by the string schemes in this
/// module when they estimate compression ratios and compress.
#[derive(Clone, Debug)]
pub struct StringStats {
    /// The array these statistics were computed from (returned by `source()`).
    src: VarBinViewArray,
    /// Approximate number of distinct views as produced by `estimate_distinct_count`, or
    /// `u32::MAX` when distinct counting was not requested.
    estimated_distinct_count: u32,
    /// Number of non-null entries, i.e. `len - null_count`.
    value_count: u32,
    /// Number of null entries.
    null_count: u32,
}
58
59fn estimate_distinct_count(strings: &VarBinViewArray) -> VortexResult<u32> {
61 let views = strings.views();
62 let mut distinct = HashSet::with_capacity(views.len() / 2);
66 views.iter().for_each(|&view| {
67 #[expect(
68 clippy::cast_possible_truncation,
69 reason = "approximate uniqueness with view prefix"
70 )]
71 let len_and_prefix = view.as_u128() as u64;
72 distinct.insert(len_and_prefix);
73 });
74
75 Ok(u32::try_from(distinct.len())?)
76}
77
78impl StringStats {
79 fn generate_opts_fallible(
80 input: &VarBinViewArray,
81 opts: GenerateStatsOptions,
82 ) -> VortexResult<Self> {
83 let null_count = input
84 .statistics()
85 .compute_null_count()
86 .ok_or_else(|| vortex_err!("Failed to compute null_count"))?;
87 let value_count = input.len() - null_count;
88 let estimated_distinct = if opts.count_distinct_values {
89 estimate_distinct_count(input)?
90 } else {
91 u32::MAX
92 };
93
94 Ok(Self {
95 src: input.clone(),
96 value_count: u32::try_from(value_count)?,
97 null_count: u32::try_from(null_count)?,
98 estimated_distinct_count: estimated_distinct,
99 })
100 }
101}
102
103impl CompressorStats for StringStats {
104 type ArrayVTable = VarBinView;
105
106 fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
107 Self::generate_opts_fallible(input, opts)
108 .vortex_expect("StringStats::generate_opts should not fail")
109 }
110
111 fn source(&self) -> &VarBinViewArray {
112 &self.src
113 }
114
115 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
116 let sampled =
117 sample(&self.src.clone().into_array(), sample_size, sample_count).to_varbinview();
118
119 Self::generate_opts(&sampled, opts)
120 }
121}
122
/// Every string compression scheme defined in this module.
///
/// The zstd-backed entries are only present when the corresponding cargo features are
/// enabled.
pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[
    &UncompressedScheme,
    &DictScheme,
    &FSSTScheme,
    &ConstantScheme,
    &NullDominated,
    #[cfg(feature = "zstd")]
    &ZstdScheme,
    #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
    &ZstdBuffersScheme,
];
135
/// Compressor for `VarBinView` arrays.
///
/// Its `Compressor` implementation takes the candidate schemes from the wrapped compressor.
#[derive(Clone, Copy)]
pub struct StringCompressor<'a> {
    /// Parent compressor; queried through `string_schemes()` for the schemes to consider.
    pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
}
142
143impl<'a> Compressor for StringCompressor<'a> {
144 type ArrayVTable = VarBinView;
145 type SchemeType = dyn StringScheme;
146 type StatsType = StringStats;
147
148 fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
149 if self
150 .btr_blocks_compressor
151 .string_schemes()
152 .iter()
153 .any(|s| s.code() == DictScheme.code())
154 {
155 StringStats::generate_opts(
156 array,
157 GenerateStatsOptions {
158 count_distinct_values: true,
159 },
160 )
161 } else {
162 StringStats::generate_opts(
163 array,
164 GenerateStatsOptions {
165 count_distinct_values: false,
166 },
167 )
168 }
169 }
170
171 fn schemes(&self) -> &[&'static dyn StringScheme] {
172 self.btr_blocks_compressor.string_schemes()
173 }
174
175 fn default_scheme(&self) -> &'static Self::SchemeType {
176 &UncompressedScheme
177 }
178}
179
180pub trait StringScheme:
181 Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
182{
183}
184
185impl<T> StringScheme for T where
186 T: Scheme<StatsType = StringStats, CodeType = StringCode> + Send + Sync
187{
188}
189
190impl PartialEq for dyn StringScheme {
191 fn eq(&self, other: &Self) -> bool {
192 self.code() == other.code()
193 }
194}
195
196impl Eq for dyn StringScheme {}
197
198impl Hash for dyn StringScheme {
199 fn hash<H: Hasher>(&self, state: &mut H) {
200 self.code().hash(state)
201 }
202}
203
/// Scheme that returns the source array unchanged.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the strings via `dict_encode`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DictScheme;

/// Scheme that trains an FSST compressor on the strings and compresses them with it.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FSSTScheme;

/// Scheme that replaces a constant array with a `ConstantArray`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConstantScheme;

/// Scheme for arrays that are more than 90% null; encodes them with `SparseArray::encode`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NullDominated;

/// Scheme that compresses the strings with `vortex_zstd::ZstdArray`.
///
/// Only available with the `zstd` feature.
#[cfg(feature = "zstd")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdScheme;

/// Scheme that compresses the strings with `vortex_zstd::ZstdBuffersArray`.
///
/// Only available with both the `zstd` and `unstable_encodings` features.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdBuffersScheme;
228
/// Identifier reported by each string scheme through `Scheme::code`.
///
/// The variants are always present, even when the zstd schemes themselves are compiled out.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
pub enum StringCode {
    /// Code of `UncompressedScheme`.
    Uncompressed,
    /// Code of `DictScheme`.
    Dict,
    /// Code of `FSSTScheme`.
    Fsst,
    /// Code of `ConstantScheme`.
    Constant,
    /// Code of `NullDominated`.
    Sparse,
    /// Code of `ZstdScheme`.
    Zstd,
    /// Code of `ZstdBuffersScheme`.
    ZstdBuffers,
}
247
248impl Scheme for UncompressedScheme {
249 type StatsType = StringStats;
250 type CodeType = StringCode;
251
252 fn code(&self) -> StringCode {
253 StringCode::Uncompressed
254 }
255
256 fn expected_compression_ratio(
257 &self,
258 _compressor: &BtrBlocksCompressor,
259 _stats: &Self::StatsType,
260 _ctx: CompressorContext,
261 _excludes: &[StringCode],
262 ) -> VortexResult<f64> {
263 Ok(1.0)
264 }
265
266 fn compress(
267 &self,
268 _compressor: &BtrBlocksCompressor,
269 stats: &Self::StatsType,
270 _ctx: CompressorContext,
271 _excludes: &[StringCode],
272 ) -> VortexResult<ArrayRef> {
273 Ok(stats.source().clone().into_array())
274 }
275}
276
277impl Scheme for DictScheme {
278 type StatsType = StringStats;
279 type CodeType = StringCode;
280
281 fn code(&self) -> StringCode {
282 StringCode::Dict
283 }
284
285 fn expected_compression_ratio(
286 &self,
287 compressor: &BtrBlocksCompressor,
288 stats: &Self::StatsType,
289 ctx: CompressorContext,
290 excludes: &[StringCode],
291 ) -> VortexResult<f64> {
292 if stats.estimated_distinct_count > stats.value_count / 2 {
294 return Ok(0.0);
295 }
296
297 if stats.value_count == 0 {
299 return Ok(0.0);
300 }
301
302 self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
303 }
304
305 fn compress(
306 &self,
307 compressor: &BtrBlocksCompressor,
308 stats: &Self::StatsType,
309 ctx: CompressorContext,
310 _excludes: &[StringCode],
311 ) -> VortexResult<ArrayRef> {
312 let dict = dict_encode(&stats.source().clone().into_array())?;
313
314 if ctx.allowed_cascading == 0 {
316 return Ok(dict.into_array());
317 }
318
319 let compressed_codes = compressor.compress_canonical(
321 Canonical::Primitive(dict.codes().to_primitive()),
322 ctx.descend(),
323 Excludes::from(&[IntDictScheme.code(), IntSequenceScheme.code()]),
324 )?;
325
326 let compressed_values = compressor.compress_canonical(
329 Canonical::VarBinView(dict.values().to_varbinview()),
330 ctx.descend(),
331 Excludes::from(&[DictScheme.code()]),
332 )?;
333
334 unsafe {
336 Ok(
337 DictArray::new_unchecked(compressed_codes, compressed_values)
338 .set_all_values_referenced(dict.has_all_values_referenced())
339 .into_array(),
340 )
341 }
342 }
343}
344
345impl Scheme for FSSTScheme {
346 type StatsType = StringStats;
347 type CodeType = StringCode;
348
349 fn code(&self) -> StringCode {
350 StringCode::Fsst
351 }
352
353 fn compress(
354 &self,
355 compressor: &BtrBlocksCompressor,
356 stats: &Self::StatsType,
357 ctx: CompressorContext,
358 _excludes: &[StringCode],
359 ) -> VortexResult<ArrayRef> {
360 let fsst = {
361 let compressor = fsst_train_compressor(&stats.src);
362 fsst_compress(&stats.src, &compressor)
363 };
364
365 let compressed_original_lengths = compressor.compress_canonical(
366 Canonical::Primitive(fsst.uncompressed_lengths().to_primitive().narrow()?),
367 ctx,
368 Excludes::none(),
369 )?;
370
371 let compressed_codes_offsets = compressor.compress_canonical(
372 Canonical::Primitive(fsst.codes().offsets().to_primitive().narrow()?),
373 ctx,
374 Excludes::none(),
375 )?;
376 let compressed_codes = VarBinArray::try_new(
377 compressed_codes_offsets,
378 fsst.codes().bytes().clone(),
379 fsst.codes().dtype().clone(),
380 fsst.codes().validity().clone(),
381 )?;
382
383 let fsst = FSSTArray::try_new(
384 fsst.dtype().clone(),
385 fsst.symbols().clone(),
386 fsst.symbol_lengths().clone(),
387 compressed_codes,
388 compressed_original_lengths,
389 )?;
390
391 Ok(fsst.into_array())
392 }
393}
394
395impl Scheme for ConstantScheme {
396 type StatsType = StringStats;
397 type CodeType = StringCode;
398
399 fn code(&self) -> Self::CodeType {
400 StringCode::Constant
401 }
402
403 fn is_constant(&self) -> bool {
404 true
405 }
406
407 fn expected_compression_ratio(
408 &self,
409 _compressor: &BtrBlocksCompressor,
410 stats: &Self::StatsType,
411 ctx: CompressorContext,
412 _excludes: &[Self::CodeType],
413 ) -> VortexResult<f64> {
414 if ctx.is_sample {
415 return Ok(0.0);
416 }
417
418 let mut ctx = LEGACY_SESSION.create_execution_ctx();
419 if stats.estimated_distinct_count > 1
420 || !is_constant(&stats.src.clone().into_array(), &mut ctx)?
421 {
422 return Ok(0.0);
423 }
424
425 Ok(f64::MAX)
427 }
428
429 fn compress(
430 &self,
431 _compressor: &BtrBlocksCompressor,
432 stats: &Self::StatsType,
433 _ctx: CompressorContext,
434 _excludes: &[Self::CodeType],
435 ) -> VortexResult<ArrayRef> {
436 let scalar_idx =
437 (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));
438
439 match scalar_idx {
440 Some(idx) => {
441 let scalar = stats.source().scalar_at(idx)?;
442 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
443 if !stats.source().all_valid()? {
444 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
445 } else {
446 Ok(const_arr)
447 }
448 }
449 None => Ok(ConstantArray::new(
450 Scalar::null(stats.src.dtype().clone()),
451 stats.src.len(),
452 )
453 .into_array()),
454 }
455 }
456}
457
458impl Scheme for NullDominated {
459 type StatsType = StringStats;
460 type CodeType = StringCode;
461
462 fn code(&self) -> Self::CodeType {
463 StringCode::Sparse
464 }
465
466 fn expected_compression_ratio(
467 &self,
468 _compressor: &BtrBlocksCompressor,
469 stats: &Self::StatsType,
470 ctx: CompressorContext,
471 _excludes: &[Self::CodeType],
472 ) -> VortexResult<f64> {
473 if ctx.allowed_cascading == 0 {
475 return Ok(0.0);
476 }
477
478 if stats.value_count == 0 {
479 return Ok(0.0);
481 }
482
483 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
485 return Ok(stats.src.len() as f64 / stats.value_count as f64);
486 }
487
488 Ok(0.0)
490 }
491
492 fn compress(
493 &self,
494 compressor: &BtrBlocksCompressor,
495 stats: &Self::StatsType,
496 ctx: CompressorContext,
497 _excludes: &[Self::CodeType],
498 ) -> VortexResult<ArrayRef> {
499 assert!(ctx.allowed_cascading > 0);
500
501 let sparse_encoded = SparseArray::encode(&stats.src.clone().into_array(), None)?;
503
504 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
505 let new_excludes = vec![IntSparseScheme.code(), IntCode::Dict];
507
508 let indices = sparse.patches().indices().to_primitive().narrow()?;
509 let compressed_indices = compressor.compress_canonical(
510 Canonical::Primitive(indices),
511 ctx.descend(),
512 Excludes::int_only(&new_excludes),
513 )?;
514
515 SparseArray::try_new(
516 compressed_indices,
517 sparse.patches().values().clone(),
518 sparse.len(),
519 sparse.fill_scalar().clone(),
520 )
521 .map(|a| a.into_array())
522 } else {
523 Ok(sparse_encoded)
524 }
525 }
526}
527
/// Zstd compression for strings (requires the `zstd` feature).
///
/// `expected_compression_ratio` is not overridden here, so the `Scheme` trait's default
/// applies.
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> Self::CodeType {
        StringCode::Zstd
    }

    /// Compacts the source buffers and encodes the result as a dictionary-less `ZstdArray`.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        let compacted = stats.source().compact_buffers()?;
        // NOTE(review): `3` and `8192` are passed positionally — presumably the zstd level
        // and a per-frame value count; confirm against the `vortex_zstd` API.
        let encoded = vortex_zstd::ZstdArray::from_var_bin_view_without_dict(&compacted, 3, 8192)?;
        Ok(encoded.into_array())
    }
}
551
/// Buffer-level zstd compression for strings (requires the `zstd` and
/// `unstable_encodings` features).
///
/// `expected_compression_ratio` is not overridden here, so the `Scheme` trait's default
/// applies.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    type StatsType = StringStats;
    type CodeType = StringCode;

    fn code(&self) -> Self::CodeType {
        StringCode::ZstdBuffers
    }

    /// Encodes the source array with `ZstdBuffersArray::compress`.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        let input = stats.source().clone().into_array();
        // NOTE(review): `3` is presumably the zstd level — confirm against `vortex_zstd`.
        let encoded = vortex_zstd::ZstdBuffersArray::compress(&input, 3)?;
        Ok(encoded.into_array())
    }
}
574
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Two distinct values repeated 1024 times each should come out dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));

        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 2048);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// 99 nulls followed by a single value should come out sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");

        let input = builder.finish_into_varbinview().into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 100);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
634
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;

    use crate::BtrBlocksCompressor;

    /// One value repeated 100 times should be constant-encoded.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = vec![Some("constant_value"); 100];
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Three values cycled over 1000 rows should be dictionary-encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = ["apple", "banana", "cherry"];
        let values: Vec<Option<&str>> = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// 1000 unique strings sharing long common substrings should be FSST-encoded.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<_> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}