1use vortex_array::arrays::{VarBinArray, VarBinViewArray, VarBinViewVTable};
5use vortex_array::vtable::ValidityHelper;
6use vortex_array::{ArrayRef, IntoArray, ToCanonical};
7use vortex_dict::DictArray;
8use vortex_dict::builders::dict_encode;
9use vortex_error::{VortexExpect, VortexResult};
10use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
11use vortex_utils::aliases::hash_set::HashSet;
12
13use crate::integer::IntCompressor;
14use crate::sample::sample;
15use crate::{
16 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
17 estimate_compression_ratio_with_sampling, integer,
18};
19
/// Statistics over a `VarBinViewArray` used to choose and estimate string compression schemes.
#[derive(Clone, Debug)]
pub struct StringStats {
    /// The array these statistics were computed from.
    src: VarBinViewArray,
    /// Approximate number of distinct values, or `u32::MAX` when distinct counting was not
    /// requested via `GenerateStatsOptions::count_distinct_values`.
    estimated_distinct_count: u32,
    /// Number of non-null values (array length minus null count).
    value_count: u32,
}
27
28#[allow(clippy::cast_possible_truncation)]
30fn estimate_distinct_count(strings: &VarBinViewArray) -> u32 {
31 let views = strings.views();
32 let mut distinct = HashSet::with_capacity(views.len() / 2);
36 views.iter().for_each(|&view| {
37 let len_and_prefix = view.as_u128() as u64;
38 distinct.insert(len_and_prefix);
39 });
40
41 distinct
42 .len()
43 .try_into()
44 .vortex_expect("distinct count must fit in u32")
45}
46
47impl CompressorStats for StringStats {
48 type ArrayVTable = VarBinViewVTable;
49
50 fn generate_opts(input: &VarBinViewArray, opts: GenerateStatsOptions) -> Self {
51 let null_count = input
52 .statistics()
53 .compute_null_count()
54 .vortex_expect("null count");
55 let value_count = input.len() - null_count;
56 let estimated_distinct = if opts.count_distinct_values {
57 estimate_distinct_count(input)
58 } else {
59 u32::MAX
60 };
61
62 Self {
63 src: input.clone(),
64 value_count: value_count.try_into().vortex_expect("value_count"),
65 estimated_distinct_count: estimated_distinct,
66 }
67 }
68
69 fn source(&self) -> &VarBinViewArray {
70 &self.src
71 }
72
73 fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self {
74 let sampled = sample(self.src.as_ref(), sample_size, sample_count).to_varbinview();
75
76 Self::generate_opts(&sampled, opts)
77 }
78}
79
80pub struct StringCompressor;
81
82impl Compressor for StringCompressor {
83 type ArrayVTable = VarBinViewVTable;
84 type SchemeType = dyn StringScheme;
85 type StatsType = StringStats;
86
87 fn schemes() -> &'static [&'static Self::SchemeType] {
88 &[&UncompressedScheme, &DictScheme, &FSSTScheme]
89 }
90
91 fn default_scheme() -> &'static Self::SchemeType {
92 &UncompressedScheme
93 }
94
95 fn dict_scheme_code() -> StringCode {
96 DICT_SCHEME
97 }
98}
99
100pub trait StringScheme: Scheme<StatsType = StringStats, CodeType = StringCode> {}
101
102impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = StringCode> {}
103
/// Scheme that returns the string array unchanged.
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

/// Scheme that dictionary-encodes the string array.
#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

/// Scheme that compresses the string array with FSST.
#[derive(Debug, Copy, Clone)]
pub struct FSSTScheme;

/// Identifier of a string compression scheme (e.g. the entries of an `excludes` slice).
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct StringCode(u8);

/// Code of [`UncompressedScheme`].
const UNCOMPRESSED_SCHEME: StringCode = StringCode(0);
/// Code of [`DictScheme`].
const DICT_SCHEME: StringCode = StringCode(1);
/// Code of [`FSSTScheme`].
const FSST_SCHEME: StringCode = StringCode(2);
119
120impl Scheme for UncompressedScheme {
121 type StatsType = StringStats;
122 type CodeType = StringCode;
123
124 fn code(&self) -> StringCode {
125 UNCOMPRESSED_SCHEME
126 }
127
128 fn expected_compression_ratio(
129 &self,
130 _stats: &Self::StatsType,
131 _is_sample: bool,
132 _allowed_cascading: usize,
133 _excludes: &[StringCode],
134 ) -> VortexResult<f64> {
135 Ok(1.0)
136 }
137
138 fn compress(
139 &self,
140 stats: &Self::StatsType,
141 _is_sample: bool,
142 _allowed_cascading: usize,
143 _excludes: &[StringCode],
144 ) -> VortexResult<ArrayRef> {
145 Ok(stats.source().to_array())
146 }
147}
148
149impl Scheme for DictScheme {
150 type StatsType = StringStats;
151 type CodeType = StringCode;
152
153 fn code(&self) -> StringCode {
154 DICT_SCHEME
155 }
156
157 fn expected_compression_ratio(
158 &self,
159 stats: &Self::StatsType,
160 is_sample: bool,
161 allowed_cascading: usize,
162 excludes: &[StringCode],
163 ) -> VortexResult<f64> {
164 if stats.estimated_distinct_count > stats.value_count / 2 {
166 return Ok(0.0);
167 }
168
169 if stats.value_count == 0 {
171 return Ok(0.0);
172 }
173
174 estimate_compression_ratio_with_sampling(
175 self,
176 stats,
177 is_sample,
178 allowed_cascading,
179 excludes,
180 )
181 }
182
183 fn compress(
184 &self,
185 stats: &Self::StatsType,
186 is_sample: bool,
187 allowed_cascading: usize,
188 _excludes: &[StringCode],
189 ) -> VortexResult<ArrayRef> {
190 let dict = dict_encode(&stats.source().clone().into_array())?;
191
192 if allowed_cascading == 0 {
194 return Ok(dict.into_array());
195 }
196
197 let compressed_codes = IntCompressor::compress(
199 &dict.codes().to_primitive(),
200 is_sample,
201 allowed_cascading - 1,
202 &[integer::DictScheme.code(), integer::SequenceScheme.code()],
203 )?;
204
205 let compressed_values = StringCompressor::compress(
208 &dict.values().to_varbinview(),
209 is_sample,
210 allowed_cascading - 1,
211 &[DictScheme.code()],
212 )?;
213
214 unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
216 }
217}
218
219impl Scheme for FSSTScheme {
220 type StatsType = StringStats;
221 type CodeType = StringCode;
222
223 fn code(&self) -> StringCode {
224 FSST_SCHEME
225 }
226
227 fn compress(
228 &self,
229 stats: &Self::StatsType,
230 is_sample: bool,
231 allowed_cascading: usize,
232 _excludes: &[StringCode],
233 ) -> VortexResult<ArrayRef> {
234 let compressor = fsst_train_compressor(&stats.src.clone().into_array())?;
235 let fsst = fsst_compress(&stats.src.clone().into_array(), &compressor)?;
236
237 let compressed_original_lengths = IntCompressor::compress(
238 &fsst.uncompressed_lengths().to_primitive().downcast()?,
239 is_sample,
240 allowed_cascading,
241 &[],
242 )?;
243
244 let compressed_codes_offsets = IntCompressor::compress(
245 &fsst.codes().offsets().to_primitive().downcast()?,
246 is_sample,
247 allowed_cascading,
248 &[],
249 )?;
250 let compressed_codes = VarBinArray::try_new(
251 compressed_codes_offsets,
252 fsst.codes().bytes().clone(),
253 fsst.codes().dtype().clone(),
254 fsst.codes().validity().clone(),
255 )?;
256
257 let fsst = FSSTArray::try_new(
258 fsst.dtype().clone(),
259 fsst.symbols().clone(),
260 fsst.symbol_lengths().clone(),
261 compressed_codes,
262 compressed_original_lengths,
263 )?;
264
265 Ok(fsst.into_array())
266 }
267}
268
#[cfg(test)]
mod tests {
    use vortex_array::arrays::VarBinViewArray;
    use vortex_dtype::{DType, Nullability};

    use crate::Compressor;
    use crate::string::StringCompressor;

    /// Compresses a low-cardinality string column. Whatever scheme tree is chosen, the result
    /// must keep the input's length and logical type.
    #[test]
    fn test_strings() {
        // 1024 copies of one string followed by 1024 copies of another.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));
        let dtype = DType::Utf8(Nullability::NonNullable);
        let strings = VarBinViewArray::from_iter(values, dtype.clone());

        println!("original array: {}", strings.as_ref().display_tree());

        let compressed = StringCompressor::compress(&strings, false, 3, &[]).unwrap();

        println!("compression tree: {}", compressed.display_tree());

        assert_eq!(compressed.len(), 2048);
        assert_eq!(compressed.dtype(), &dtype);
    }
}