vortex_btrblocks/schemes/
string.rs1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::IntoArray;
9use vortex_array::ToCanonical;
10use vortex_array::arrays::VarBinArray;
11use vortex_array::arrays::primitive::PrimitiveArrayExt;
12use vortex_array::arrays::varbin::VarBinArrayExt;
13use vortex_compressor::estimate::CompressionEstimate;
14use vortex_compressor::scheme::ChildSelection;
15use vortex_compressor::scheme::DescendantExclusion;
16use vortex_error::VortexResult;
17use vortex_fsst::FSST;
18use vortex_fsst::FSSTArrayExt;
19use vortex_fsst::fsst_compress;
20use vortex_fsst::fsst_train_compressor;
21use vortex_sparse::Sparse;
22
23use super::integer::IntDictScheme;
24use super::integer::SparseScheme as IntSparseScheme;
25use crate::ArrayAndStats;
26use crate::CascadingCompressor;
27use crate::CompressorContext;
28use crate::Scheme;
29use crate::SchemeExt;
30
/// Unit marker type for the FSST string compression scheme.
///
/// Its [`Scheme`] implementation in this file is registered under the name
/// `vortex.string.fsst`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FSSTScheme;
34
/// Unit marker type for the sparse encoding of null-dominated string arrays.
///
/// Its [`Scheme`] implementation in this file is registered under the name
/// `vortex.string.sparse` and only reports a ratio when more than 90% of the
/// array is null.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominatedSparseScheme;
40
/// Unit marker type for Zstd compression of string arrays.
///
/// Only compiled when the `zstd` feature is enabled. Its [`Scheme`]
/// implementation in this file is registered under the name
/// `vortex.string.zstd`.
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdScheme;
45
/// Unit marker type for the `ZstdBuffers`-based string compression scheme.
///
/// Only compiled when both the `zstd` and `unstable_encodings` features are
/// enabled. Its [`Scheme`] implementation in this file is registered under
/// the name `vortex.string.zstd_buffers`.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdBuffersScheme;
50
51pub use vortex_compressor::builtins::StringConstantScheme;
53pub use vortex_compressor::builtins::StringDictScheme;
54pub use vortex_compressor::builtins::is_utf8_string;
55pub use vortex_compressor::stats::StringStats;
56
57impl Scheme for FSSTScheme {
58 fn scheme_name(&self) -> &'static str {
59 "vortex.string.fsst"
60 }
61
62 fn matches(&self, canonical: &Canonical) -> bool {
63 is_utf8_string(canonical)
64 }
65
66 fn num_children(&self) -> usize {
68 2
69 }
70
71 fn expected_compression_ratio(
72 &self,
73 _data: &mut ArrayAndStats,
74 _ctx: CompressorContext,
75 ) -> CompressionEstimate {
76 CompressionEstimate::Sample
77 }
78
79 fn compress(
80 &self,
81 compressor: &CascadingCompressor,
82 data: &mut ArrayAndStats,
83 ctx: CompressorContext,
84 ) -> VortexResult<ArrayRef> {
85 let utf8 = data.array_as_utf8();
86 let compressor_fsst = fsst_train_compressor(&utf8);
87 let fsst = fsst_compress(&utf8, utf8.len(), utf8.dtype(), &compressor_fsst);
88
89 let compressed_original_lengths = compressor.compress_child(
90 &fsst
91 .uncompressed_lengths()
92 .to_primitive()
93 .narrow()?
94 .into_array(),
95 &ctx,
96 self.id(),
97 0,
98 )?;
99
100 let compressed_codes_offsets = compressor.compress_child(
101 &fsst.codes().offsets().to_primitive().narrow()?.into_array(),
102 &ctx,
103 self.id(),
104 1,
105 )?;
106 let compressed_codes = VarBinArray::try_new(
107 compressed_codes_offsets,
108 fsst.codes().bytes().clone(),
109 fsst.codes().dtype().clone(),
110 fsst.codes().validity()?,
111 )?;
112
113 let fsst = FSST::try_new(
114 fsst.dtype().clone(),
115 fsst.symbols().clone(),
116 fsst.symbol_lengths().clone(),
117 compressed_codes,
118 compressed_original_lengths,
119 )?;
120
121 Ok(fsst.into_array())
122 }
123}
124
125impl Scheme for NullDominatedSparseScheme {
126 fn scheme_name(&self) -> &'static str {
127 "vortex.string.sparse"
128 }
129
130 fn matches(&self, canonical: &Canonical) -> bool {
131 is_utf8_string(canonical)
132 }
133
134 fn num_children(&self) -> usize {
136 1
137 }
138
139 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
141 vec![
142 DescendantExclusion {
143 excluded: IntSparseScheme.id(),
144 children: ChildSelection::All,
145 },
146 DescendantExclusion {
147 excluded: IntDictScheme.id(),
148 children: ChildSelection::All,
149 },
150 ]
151 }
152
153 fn expected_compression_ratio(
154 &self,
155 data: &mut ArrayAndStats,
156 _ctx: CompressorContext,
157 ) -> CompressionEstimate {
158 let len = data.array_len() as f64;
159 let stats = data.string_stats();
160 let value_count = stats.value_count();
161
162 if value_count == 0 {
164 return CompressionEstimate::Skip;
165 }
166
167 if stats.null_count() as f64 / len > 0.9 {
169 return CompressionEstimate::Ratio(len / value_count as f64);
170 }
171
172 CompressionEstimate::Skip
174 }
175
176 fn compress(
177 &self,
178 compressor: &CascadingCompressor,
179 data: &mut ArrayAndStats,
180 ctx: CompressorContext,
181 ) -> VortexResult<ArrayRef> {
182 let sparse_encoded = Sparse::encode(data.array(), None)?;
184
185 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
186 let indices = sparse.patches().indices().to_primitive().narrow()?;
188 let compressed_indices =
189 compressor.compress_child(&indices.into_array(), &ctx, self.id(), 0)?;
190
191 Sparse::try_new(
192 compressed_indices,
193 sparse.patches().values().clone(),
194 sparse.len(),
195 sparse.fill_scalar().clone(),
196 )
197 .map(|a| a.into_array())
198 } else {
199 Ok(sparse_encoded)
200 }
201 }
202}
203
/// Zstd compression for UTF-8 string arrays (requires the `zstd` feature).
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    /// Stable identifier of this scheme.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd"
    }

    /// Accepts exactly the canonical arrays that `is_utf8_string` accepts.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// Always returns [`CompressionEstimate::Sample`]; no ratio is computed
    /// from the statistics here.
    fn expected_compression_ratio(
        &self,
        _data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        CompressionEstimate::Sample
    }

    /// Compacts the input's buffers and compresses the result with Zstd,
    /// without a dictionary.
    ///
    /// Neither the compressor nor the context is used: this scheme does not
    /// delegate any child to the cascading compressor.
    ///
    /// # Errors
    ///
    /// Propagates any error from `compact_buffers` or from the Zstd encoder.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let strings = data.array_as_utf8();
        let compacted = strings.compact_buffers()?;

        // NOTE(review): `3` and `8192` are passed straight to `vortex_zstd`;
        // presumably a compression level and a per-frame value count —
        // confirm against the `vortex_zstd` API.
        let zstd = vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192)?;

        Ok(zstd.into_array())
    }
}
232
/// `ZstdBuffers` compression for UTF-8 string arrays (requires both the
/// `zstd` and `unstable_encodings` features).
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    /// Stable identifier of this scheme.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd_buffers"
    }

    /// Accepts exactly the canonical arrays that `is_utf8_string` accepts.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// Always returns [`CompressionEstimate::Sample`]; no ratio is computed
    /// from the statistics here.
    fn expected_compression_ratio(
        &self,
        _data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        CompressionEstimate::Sample
    }

    /// Hands the input array to `ZstdBuffers::compress` using the legacy
    /// session.
    ///
    /// Neither the compressor nor the context is used: this scheme does not
    /// delegate any child to the cascading compressor.
    ///
    /// # Errors
    ///
    /// Propagates any error from `ZstdBuffers::compress`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): `3` is presumably the Zstd compression level —
        // confirm against the `vortex_zstd` API.
        let compressed =
            vortex_zstd::ZstdBuffers::compress(data.array(), 3, &vortex_array::LEGACY_SESSION)?;

        Ok(compressed.into_array())
    }
}
263
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// 1024 copies of one string followed by 1024 copies of another must
    /// keep their length and display as a dictionary encoding.
    #[test]
    fn test_strings() -> VortexResult<()> {
        // First half is one value; `resize` pads the second half with the other.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));

        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 2048);

        let summary = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(summary, "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// 99 nulls followed by a single value must keep their length and
    /// display as a sparse encoding.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");

        let input = builder.finish_into_varbinview().into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 100);

        let summary = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(summary, "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
325
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;

    use crate::BtrBlocksCompressor;

    /// 100 copies of the same string must come out as a `Constant` array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = vec![Some("constant_value"); 100];
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Three distinct values cycled over 1000 rows must come out as a `Dict`
    /// array.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = ["apple", "banana", "cherry"];
        // `cycle` yields the same sequence as indexing with `i % 3`.
        let values: Vec<Option<&str>> = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// 1000 strings that share a long prefix and suffix but differ in an
    /// embedded counter must come out as an `FSST` array.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}