1use vortex_array::aliases::hash_set::HashSet;
2use vortex_array::arrays::{VarBinArray, VarBinViewArray};
3use vortex_array::{Array, ArrayRef, ToCanonical};
4use vortex_dict::DictArray;
5use vortex_dict::builders::dict_encode;
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
8
9use crate::integer::IntCompressor;
10use crate::sample::sample;
11use crate::{
12 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
13 estimate_compression_ratio_with_sampling,
14};
15
16#[derive(Clone, Debug)]
17pub struct StringStats {
18 src: VarBinViewArray,
19 estimated_distinct_count: u32,
20 value_count: u32,
21 }
23
24#[allow(clippy::cast_possible_truncation)]
26fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
27 let views = strings.views();
28 let mut distinct = HashSet::with_capacity(views.len() / 2);
32 views.iter().for_each(|&view| {
33 let len_and_prefix = view.as_u128() as u64;
34 distinct.insert(len_and_prefix);
35 });
36
37 distinct
38 .len()
39 .try_into()
40 .vortex_expect("distinct count must fit in u32")
41}
42
43impl CompressorStats for StringStats {
44 type ArrayType = VarBinViewArray;
45
46 fn generate_opts(input: &Self::ArrayType, opts: GenerateStatsOptions) -> Self {
47 let null_count = input
48 .statistics()
49 .compute_null_count()
50 .vortex_expect("null count");
51 let value_count = input.len() - null_count;
52 let estimated_distinct = if opts.count_distinct_values {
53 estimate_distinct_count(input)
54 } else {
55 u32::MAX
56 };
57
58 Self {
59 src: input.clone(),
60 value_count: value_count.try_into().vortex_expect("value_count"),
61 estimated_distinct_count: estimated_distinct,
63 }
64 }
65
66 fn source(&self) -> &Self::ArrayType {
67 &self.src
68 }
69
70 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
71 let sampled = sample(self.src.clone(), sample_size, sample_count)
72 .to_varbinview()
73 .vortex_expect("varbinview");
74
75 Self::generate_opts(&sampled, opts)
76 }
77}
78
79pub struct StringCompressor;
80
81impl Compressor for StringCompressor {
82 type ArrayType = VarBinViewArray;
83 type SchemeType = dyn StringScheme;
84 type StatsType = StringStats;
85
86 fn schemes() -> &'static [&'static Self::SchemeType] {
87 &[&UncompressedScheme, &DictScheme, &FSSTScheme]
88 }
89
90 fn default_scheme() -> &'static Self::SchemeType {
91 &UncompressedScheme
92 }
93
94 fn dict_scheme_code() -> StringCode {
95 DICT_SCHEME
96 }
97}
98
99pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
100
101impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
102
/// Scheme that returns the source array unchanged.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the strings.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// Scheme that compresses the strings with FSST.
#[derive(Clone, Copy, Debug)]
pub struct FSSTScheme;
111
/// Identifier for a string compression scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StringCode(u8);

/// Code returned by [`UncompressedScheme`].
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
/// Code returned by [`DictScheme`].
const DICT_SCHEME: StringCode = StringCode(1);
/// Code returned by [`FSSTScheme`].
const FSST_SCHEME: StringCode = StringCode(2);
118
119impl Scheme for UncompressedScheme {
120 type StatsType = StringStats;
121 type CodeType = StringCode;
122
123 fn code(&self) -> StringCode {
124 UNCOMPRESSED_SCHEME
125 }
126
127 fn expected_compression_ratio(
128 &self,
129 _stats: &Self::StatsType,
130 _is_sample: bool,
131 _allowed_cascading: usize,
132 _excludes: &[StringCode],
133 ) -> VortexResult<f64> {
134 Ok(1.0)
135 }
136
137 fn compress(
138 &self,
139 stats: &Self::StatsType,
140 _is_sample: bool,
141 _allowed_cascading: usize,
142 _excludes: &[StringCode],
143 ) -> VortexResult<ArrayRef> {
144 Ok(stats.source().clone().into_array())
145 }
146}
147
148impl Scheme for DictScheme {
149 type StatsType = StringStats;
150 type CodeType = StringCode;
151
152 fn code(&self) -> StringCode {
153 DICT_SCHEME
154 }
155
156 fn expected_compression_ratio(
157 &self,
158 stats: &Self::StatsType,
159 is_sample: bool,
160 allowed_cascading: usize,
161 excludes: &[StringCode],
162 ) -> VortexResult<f64> {
163 if stats.estimated_distinct_count > stats.value_count / 2 {
165 return Ok(0.0);
166 }
167
168 if stats.value_count == 0 {
170 return Ok(0.0);
171 }
172
173 estimate_compression_ratio_with_sampling(
174 self,
175 stats,
176 is_sample,
177 allowed_cascading,
178 excludes,
179 )
180 }
181
182 fn compress(
183 &self,
184 stats: &Self::StatsType,
185 is_sample: bool,
186 allowed_cascading: usize,
187 _excludes: &[StringCode],
188 ) -> VortexResult<ArrayRef> {
189 let dict = dict_encode(&stats.source().clone().into_array())?;
190
191 if allowed_cascading == 0 {
193 return Ok(dict.into_array());
194 }
195
196 let compressed_codes = IntCompressor::compress(
198 &dict.codes().to_primitive()?,
199 is_sample,
200 allowed_cascading - 1,
201 &[crate::integer::DictScheme.code()],
202 )?;
203
204 let compressed_values = StringCompressor::compress(
207 &dict.values().to_varbinview()?,
208 is_sample,
209 allowed_cascading - 1,
210 &[DictScheme.code()],
211 )?;
212
213 Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
214 }
215}
216
217impl Scheme for FSSTScheme {
218 type StatsType = StringStats;
219 type CodeType = StringCode;
220
221 fn code(&self) -> StringCode {
222 FSST_SCHEME
223 }
224
225 fn compress(
226 &self,
227 stats: &Self::StatsType,
228 is_sample: bool,
229 allowed_cascading: usize,
230 _excludes: &[StringCode],
231 ) -> VortexResult<ArrayRef> {
232 let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
233 let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
234
235 let compressed_original_lengths = IntCompressor::compress(
236 &fsst.uncompressed_lengths().to_primitive()?,
237 is_sample,
238 allowed_cascading,
239 &[],
240 )?;
241
242 let compressed_codes_offsets = IntCompressor::compress(
244 &fsst.codes().offsets().to_primitive()?,
245 is_sample,
246 allowed_cascading,
247 &[],
248 )?;
249 let compressed_codes = VarBinArray::try_new(
250 compressed_codes_offsets,
251 fsst.codes().bytes().clone(),
252 fsst.codes().dtype().clone(),
253 fsst.codes().validity().clone(),
254 )?;
255
256 let fsst = FSSTArray::try_new(
257 fsst.dtype().clone(),
258 fsst.symbols().clone(),
259 fsst.symbol_lengths().clone(),
260 compressed_codes,
261 compressed_original_lengths,
262 )?;
263
264 Ok(fsst.into_array())
265 }
266}
267
#[cfg(test)]
mod tests {
    use vortex_array::Array;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Compresses 2048 strings drawn from two distinct values and checks that the compressed
    /// array still has one entry per input string. The array trees are printed for manual
    /// inspection.
    #[test]
    fn test_strings() {
        let mut strings = Vec::new();
        for _ in 0..1024 {
            strings.push(Some("hello-world-1234"));
        }
        for _ in 0..1024 {
            strings.push(Some("hello-world-56789"));
        }
        let strings = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));

        println!(
            "original array: {}",
            (&strings as &dyn Array).tree_display()
        );

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.tree_display());

        // Without an assertion the test could only fail on a panic; compression must at
        // least keep the number of entries.
        assert_eq!(compressed.len(), strings.len());
    }
}