vortex_btrblocks/schemes/
string.rs1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::IntoArray;
9use vortex_array::ToCanonical;
10use vortex_array::arrays::VarBinArray;
11use vortex_array::arrays::primitive::PrimitiveArrayExt;
12use vortex_array::arrays::varbin::VarBinArrayExt;
13use vortex_compressor::estimate::CompressionEstimate;
14use vortex_compressor::estimate::DeferredEstimate;
15use vortex_compressor::estimate::EstimateVerdict;
16use vortex_compressor::scheme::ChildSelection;
17use vortex_compressor::scheme::DescendantExclusion;
18use vortex_error::VortexResult;
19use vortex_fsst::FSST;
20use vortex_fsst::FSSTArrayExt;
21use vortex_fsst::fsst_compress;
22use vortex_fsst::fsst_train_compressor;
23use vortex_sparse::Sparse;
24
25use super::integer::IntDictScheme;
26use super::integer::SparseScheme as IntSparseScheme;
27use crate::ArrayAndStats;
28use crate::CascadingCompressor;
29use crate::CompressorContext;
30use crate::Scheme;
31use crate::SchemeExt;
32
/// Compression scheme that encodes UTF-8 string arrays with the FSST encoding from `vortex_fsst`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FSSTScheme;
36
/// Sparse encoding for UTF-8 string arrays in which nulls make up more than 90% of the rows.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominatedSparseScheme;
42
/// Compresses UTF-8 string arrays with Zstandard; only compiled with the `zstd` feature.
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdScheme;
47
/// Compresses the buffers of UTF-8 string arrays with Zstandard; only compiled when both the
/// `zstd` and `unstable_encodings` features are enabled.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdBuffersScheme;
52
53pub use vortex_compressor::builtins::StringConstantScheme;
55pub use vortex_compressor::builtins::StringDictScheme;
56pub use vortex_compressor::builtins::is_utf8_string;
57pub use vortex_compressor::stats::StringStats;
58
59impl Scheme for FSSTScheme {
60 fn scheme_name(&self) -> &'static str {
61 "vortex.string.fsst"
62 }
63
64 fn matches(&self, canonical: &Canonical) -> bool {
65 is_utf8_string(canonical)
66 }
67
68 fn num_children(&self) -> usize {
70 2
71 }
72
73 fn expected_compression_ratio(
74 &self,
75 _data: &mut ArrayAndStats,
76 _ctx: CompressorContext,
77 ) -> CompressionEstimate {
78 CompressionEstimate::Deferred(DeferredEstimate::Sample)
79 }
80
81 fn compress(
82 &self,
83 compressor: &CascadingCompressor,
84 data: &mut ArrayAndStats,
85 ctx: CompressorContext,
86 ) -> VortexResult<ArrayRef> {
87 let utf8 = data.array_as_utf8().into_owned();
88 let compressor_fsst = fsst_train_compressor(&utf8);
89 let fsst = fsst_compress(&utf8, utf8.len(), utf8.dtype(), &compressor_fsst);
90
91 let compressed_original_lengths = compressor.compress_child(
92 &fsst
93 .uncompressed_lengths()
94 .to_primitive()
95 .narrow()?
96 .into_array(),
97 &ctx,
98 self.id(),
99 0,
100 )?;
101
102 let compressed_codes_offsets = compressor.compress_child(
103 &fsst.codes().offsets().to_primitive().narrow()?.into_array(),
104 &ctx,
105 self.id(),
106 1,
107 )?;
108 let compressed_codes = VarBinArray::try_new(
109 compressed_codes_offsets,
110 fsst.codes().bytes().clone(),
111 fsst.codes().dtype().clone(),
112 fsst.codes().validity()?,
113 )?;
114
115 let fsst = FSST::try_new(
116 fsst.dtype().clone(),
117 fsst.symbols().clone(),
118 fsst.symbol_lengths().clone(),
119 compressed_codes,
120 compressed_original_lengths,
121 )?;
122
123 Ok(fsst.into_array())
124 }
125}
126
127impl Scheme for NullDominatedSparseScheme {
128 fn scheme_name(&self) -> &'static str {
129 "vortex.string.sparse"
130 }
131
132 fn matches(&self, canonical: &Canonical) -> bool {
133 is_utf8_string(canonical)
134 }
135
136 fn num_children(&self) -> usize {
138 1
139 }
140
141 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
143 vec![
144 DescendantExclusion {
145 excluded: IntSparseScheme.id(),
146 children: ChildSelection::All,
147 },
148 DescendantExclusion {
149 excluded: IntDictScheme.id(),
150 children: ChildSelection::All,
151 },
152 ]
153 }
154
155 fn expected_compression_ratio(
156 &self,
157 data: &mut ArrayAndStats,
158 _ctx: CompressorContext,
159 ) -> CompressionEstimate {
160 let len = data.array_len() as f64;
161 let stats = data.string_stats();
162 let value_count = stats.value_count();
163
164 if value_count == 0 {
166 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
167 }
168
169 if stats.null_count() as f64 / len > 0.9 {
171 return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
172 }
173
174 CompressionEstimate::Verdict(EstimateVerdict::Skip)
176 }
177
178 fn compress(
179 &self,
180 compressor: &CascadingCompressor,
181 data: &mut ArrayAndStats,
182 ctx: CompressorContext,
183 ) -> VortexResult<ArrayRef> {
184 let sparse_encoded = Sparse::encode(data.array(), None)?;
186
187 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
188 let indices = sparse.patches().indices().to_primitive().narrow()?;
190 let compressed_indices =
191 compressor.compress_child(&indices.into_array(), &ctx, self.id(), 0)?;
192
193 Sparse::try_new(
194 compressed_indices,
195 sparse.patches().values().clone(),
196 sparse.len(),
197 sparse.fill_scalar().clone(),
198 )
199 .map(|a| a.into_array())
200 } else {
201 Ok(sparse_encoded)
202 }
203 }
204}
205
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd"
    }

    /// Only UTF-8 string arrays are candidates.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// No up-front estimate: the ratio is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Compacts the string view buffers, then zstd-compresses them without a dictionary.
    /// No children are cascaded through `_compressor`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let owned = data.array_as_utf8().into_owned();
        let compacted = owned.compact_buffers()?;
        // NOTE(review): 3 and 8192 are presumably the zstd level and the values per frame;
        // confirm against `vortex_zstd::Zstd::from_var_bin_view_without_dict`.
        let zstd = vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192)?;
        Ok(zstd.into_array())
    }
}
234
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd_buffers"
    }

    /// Only UTF-8 string arrays are candidates.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// No up-front estimate: the ratio is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Hands the array straight to `ZstdBuffers::compress` using the legacy session.
    /// No children are cascaded through `_compressor`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): 3 is presumably the zstd compression level; confirm in `vortex_zstd`.
        let buffers =
            vortex_zstd::ZstdBuffers::compress(data.array(), 3, &vortex_array::LEGACY_SESSION)?;
        Ok(buffers.into_array())
    }
}
265
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Two long runs of distinct values should be dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        let values: Vec<Option<&str>> = std::iter::repeat_n(Some("hello-world-1234"), 1024)
            .chain(std::iter::repeat_n(Some("hello-world-56789"), 1024))
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 2048);
        assert_eq!(
            compressed
                .display_as(DisplayOptions::MetadataOnly)
                .to_string()
                .to_lowercase(),
            "vortex.dict(utf8, len=2048)"
        );

        Ok(())
    }

    /// 99 nulls followed by one value should pick the null-dominated sparse scheme.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");
        let input = builder.finish_into_varbinview().into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 100);
        assert_eq!(
            compressed
                .display_as(DisplayOptions::MetadataOnly)
                .to_string()
                .to_lowercase(),
            "vortex.sparse(utf8?, len=100)"
        );

        Ok(())
    }
}
327
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;

    use crate::BtrBlocksCompressor;

    /// A single repeated value should compress to a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = std::iter::repeat_n(Some("constant_value"), 100).collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Three values cycling over 1000 rows should be dictionary-encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        const FRUITS: [&str; 3] = ["apple", "banana", "cherry"];
        let values: Vec<Option<&str>> = FRUITS
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// Mostly-distinct strings sharing a long prefix and suffix should be FSST-encoded.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}