1use vortex_array::arrays::{VarBinArray, VarBinViewArray, VarBinViewVTable};
5use vortex_array::vtable::ValidityHelper;
6use vortex_array::{ArrayRef, IntoArray, ToCanonical};
7use vortex_dict::DictArray;
8use vortex_dict::builders::dict_encode;
9use vortex_error::{VortexExpect, VortexResult};
10use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
11use vortex_utils::aliases::hash_set::HashSet;
12
13use crate::integer::IntCompressor;
14use crate::sample::sample;
15use crate::{
16 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
17 estimate_compression_ratio_with_sampling,
18};
19
20#[derive(Clone, Debug)]
21pub struct StringStats {
22 src: VarBinViewArray,
23 estimated_distinct_count: u32,
24 value_count: u32,
25 }
27
28#[allow(clippy::cast_possible_truncation)]
30fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
31 let views = strings.views();
32 let mut distinct = HashSet::with_capacity(views.len() / 2);
36 views.iter().for_each(|&view| {
37 let len_and_prefix = view.as_u128() as u64;
38 distinct.insert(len_and_prefix);
39 });
40
41 distinct
42 .len()
43 .try_into()
44 .vortex_expect("distinct count must fit in u32")
45}
46
47impl CompressorStats for StringStats {
48 type ArrayVTable = VarBinViewVTable;
49
50 fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
51 let null_count = input
52 .statistics()
53 .compute_null_count()
54 .vortex_expect("null count");
55 let value_count = input.len() - null_count;
56 let estimated_distinct = if opts.count_distinct_values {
57 estimate_distinct_count(input)
58 } else {
59 u32::MAX
60 };
61
62 Self {
63 src: input.clone(),
64 value_count: value_count.try_into().vortex_expect("value_count"),
65 estimated_distinct_count: estimated_distinct,
66 }
67 }
68
69 fn source(&self) -> &VarBinViewArray {
70 &self.src
71 }
72
73 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
74 let sampled = sample(self.src.as_ref(), sample_size, sample_count)
75 .to_varbinview()
76 .vortex_expect("varbinview");
77
78 Self::generate_opts(&sampled, opts)
79 }
80}
81
82pub struct StringCompressor;
83
84impl Compressor for StringCompressor {
85 type ArrayVTable = VarBinViewVTable;
86 type SchemeType = dyn StringScheme;
87 type StatsType = StringStats;
88
89 fn schemes() -> &'static [&'static Self::SchemeType] {
90 &[&UncompressedScheme, &DictScheme, &FSSTScheme]
91 }
92
93 fn default_scheme() -> &'static Self::SchemeType {
94 &UncompressedScheme
95 }
96
97 fn dict_scheme_code() -> StringCode {
98 DICT_SCHEME
99 }
100}
101
102pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
103
104impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
105
/// Scheme that returns the source array as-is.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the strings.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// Scheme that compresses the strings with FSST.
#[derive(Clone, Copy, Debug)]
pub struct FSSTScheme;
114
/// Identifier of a string compression scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StringCode(u8);

/// Code reported by [`UncompressedScheme`].
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
/// Code reported by [`DictScheme`].
const DICT_SCHEME: StringCode = StringCode(1);
/// Code reported by [`FSSTScheme`].
const FSST_SCHEME: StringCode = StringCode(2);
121
122impl Scheme for UncompressedScheme {
123 type StatsType = StringStats;
124 type CodeType = StringCode;
125
126 fn code(&self) -> StringCode {
127 UNCOMPRESSED_SCHEME
128 }
129
130 fn expected_compression_ratio(
131 &self,
132 _stats: &Self::StatsType,
133 _is_sample: bool,
134 _allowed_cascading: usize,
135 _excludes: &[StringCode],
136 ) -> VortexResult<f64> {
137 Ok(1.0)
138 }
139
140 fn compress(
141 &self,
142 stats: &Self::StatsType,
143 _is_sample: bool,
144 _allowed_cascading: usize,
145 _excludes: &[StringCode],
146 ) -> VortexResult<ArrayRef> {
147 Ok(stats.source().to_array())
148 }
149}
150
151impl Scheme for DictScheme {
152 type StatsType = StringStats;
153 type CodeType = StringCode;
154
155 fn code(&self) -> StringCode {
156 DICT_SCHEME
157 }
158
159 fn expected_compression_ratio(
160 &self,
161 stats: &Self::StatsType,
162 is_sample: bool,
163 allowed_cascading: usize,
164 excludes: &[StringCode],
165 ) -> VortexResult<f64> {
166 if stats.estimated_distinct_count > stats.value_count / 2 {
168 return Ok(0.0);
169 }
170
171 if stats.value_count == 0 {
173 return Ok(0.0);
174 }
175
176 estimate_compression_ratio_with_sampling(
177 self,
178 stats,
179 is_sample,
180 allowed_cascading,
181 excludes,
182 )
183 }
184
185 fn compress(
186 &self,
187 stats: &Self::StatsType,
188 is_sample: bool,
189 allowed_cascading: usize,
190 _excludes: &[StringCode],
191 ) -> VortexResult<ArrayRef> {
192 let dict = dict_encode(&stats.source().clone().into_array())?;
193
194 if allowed_cascading == 0 {
196 return Ok(dict.into_array());
197 }
198
199 let compressed_codes = IntCompressor::compress(
201 &dict.codes().to_primitive()?,
202 is_sample,
203 allowed_cascading - 1,
204 &[crate::integer::DictScheme.code()],
205 )?;
206
207 let compressed_values = StringCompressor::compress(
210 &dict.values().to_varbinview()?,
211 is_sample,
212 allowed_cascading - 1,
213 &[DictScheme.code()],
214 )?;
215
216 Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
217 }
218}
219
220impl Scheme for FSSTScheme {
221 type StatsType = StringStats;
222 type CodeType = StringCode;
223
224 fn code(&self) -> StringCode {
225 FSST_SCHEME
226 }
227
228 fn compress(
229 &self,
230 stats: &Self::StatsType,
231 is_sample: bool,
232 allowed_cascading: usize,
233 _excludes: &[StringCode],
234 ) -> VortexResult<ArrayRef> {
235 let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
236 let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
237
238 let compressed_original_lengths = IntCompressor::compress(
239 &fsst.uncompressed_lengths().to_primitive()?,
240 is_sample,
241 allowed_cascading,
242 &[],
243 )?;
244
245 let compressed_codes_offsets = IntCompressor::compress(
247 &fsst.codes().offsets().to_primitive()?,
248 is_sample,
249 allowed_cascading,
250 &[],
251 )?;
252 let compressed_codes = VarBinArray::try_new(
253 compressed_codes_offsets,
254 fsst.codes().bytes().clone(),
255 fsst.codes().dtype().clone(),
256 fsst.codes().validity().clone(),
257 )?;
258
259 let fsst = FSSTArray::try_new(
260 fsst.dtype().clone(),
261 fsst.symbols().clone(),
262 fsst.symbol_lengths().clone(),
263 compressed_codes,
264 compressed_original_lengths,
265 )?;
266
267 Ok(fsst.into_array())
268 }
269}
270
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Smoke test: compresses 2048 non-null strings drawn from two distinct values and prints
    /// the array trees before and after. It only checks that compression succeeds.
    #[test]
    fn test_strings() {
        // 1024 copies of the first value followed by 1024 copies of the second.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));

        let strings = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        println!("original array: {}", strings.as_ref().display_tree());

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.display_tree());
    }
}