vortex_btrblocks/schemes/
string.rs1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_array::arrays::VarBinArray;
12use vortex_array::arrays::primitive::PrimitiveArrayExt;
13use vortex_array::arrays::varbin::VarBinArrayExt;
14use vortex_compressor::estimate::CompressionEstimate;
15use vortex_compressor::estimate::DeferredEstimate;
16use vortex_compressor::estimate::EstimateVerdict;
17use vortex_compressor::scheme::ChildSelection;
18use vortex_compressor::scheme::DescendantExclusion;
19use vortex_error::VortexResult;
20use vortex_fsst::FSST;
21use vortex_fsst::FSSTArrayExt;
22use vortex_fsst::fsst_compress;
23use vortex_fsst::fsst_train_compressor;
24use vortex_sparse::Sparse;
25
26use super::integer::IntDictScheme;
27use super::integer::SparseScheme as IntSparseScheme;
28use crate::ArrayAndStats;
29use crate::CascadingCompressor;
30use crate::CompressorContext;
31use crate::Scheme;
32use crate::SchemeExt;
33
/// Compression scheme that encodes UTF-8 string arrays with FSST.
///
/// This is a stateless marker type; all behaviour lives in its `Scheme` implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FSSTScheme;
37
/// Sparse encoding scheme for UTF-8 string arrays that consist almost entirely of nulls.
///
/// This is a stateless marker type; all behaviour lives in its `Scheme` implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominatedSparseScheme;
43
/// Compression scheme that encodes UTF-8 string arrays with Zstd, without a dictionary.
///
/// Only compiled when the `zstd` feature is enabled.
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdScheme;
48
/// Compression scheme that encodes UTF-8 string arrays through `vortex_zstd::ZstdBuffers`.
///
/// Only compiled when both the `zstd` and `unstable_encodings` features are enabled.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdBuffersScheme;
53
54pub use vortex_compressor::builtins::StringConstantScheme;
56pub use vortex_compressor::builtins::StringDictScheme;
57pub use vortex_compressor::builtins::is_utf8_string;
58pub use vortex_compressor::stats::StringStats;
59
60impl Scheme for FSSTScheme {
61 fn scheme_name(&self) -> &'static str {
62 "vortex.string.fsst"
63 }
64
65 fn matches(&self, canonical: &Canonical) -> bool {
66 is_utf8_string(canonical)
67 }
68
69 fn num_children(&self) -> usize {
71 2
72 }
73
74 fn expected_compression_ratio(
75 &self,
76 _data: &ArrayAndStats,
77 _compress_ctx: CompressorContext,
78 _exec_ctx: &mut ExecutionCtx,
79 ) -> CompressionEstimate {
80 CompressionEstimate::Deferred(DeferredEstimate::Sample)
81 }
82
83 fn compress(
84 &self,
85 compressor: &CascadingCompressor,
86 data: &ArrayAndStats,
87 compress_ctx: CompressorContext,
88 exec_ctx: &mut ExecutionCtx,
89 ) -> VortexResult<ArrayRef> {
90 let utf8 = data.array_as_utf8().into_owned();
91 let compressor_fsst = fsst_train_compressor(&utf8);
92 let fsst = fsst_compress(&utf8, utf8.len(), utf8.dtype(), &compressor_fsst, exec_ctx);
93
94 let uncompressed_lengths_primitive = fsst
95 .uncompressed_lengths()
96 .clone()
97 .execute::<PrimitiveArray>(exec_ctx)?
98 .narrow()?;
99 let compressed_original_lengths = compressor.compress_child(
100 &uncompressed_lengths_primitive.into_array(),
101 &compress_ctx,
102 self.id(),
103 0,
104 exec_ctx,
105 )?;
106
107 let codes_offsets_primitive = fsst
108 .codes()
109 .offsets()
110 .clone()
111 .execute::<PrimitiveArray>(exec_ctx)?
112 .narrow()?;
113 let compressed_codes_offsets = compressor.compress_child(
114 &codes_offsets_primitive.into_array(),
115 &compress_ctx,
116 self.id(),
117 1,
118 exec_ctx,
119 )?;
120 let compressed_codes = VarBinArray::try_new(
121 compressed_codes_offsets,
122 fsst.codes().bytes().clone(),
123 fsst.codes().dtype().clone(),
124 fsst.codes().validity()?,
125 )?;
126
127 let fsst = FSST::try_new(
128 fsst.dtype().clone(),
129 fsst.symbols().clone(),
130 fsst.symbol_lengths().clone(),
131 compressed_codes,
132 compressed_original_lengths,
133 exec_ctx,
134 )?;
135
136 Ok(fsst.into_array())
137 }
138}
139
140impl Scheme for NullDominatedSparseScheme {
141 fn scheme_name(&self) -> &'static str {
142 "vortex.string.sparse"
143 }
144
145 fn matches(&self, canonical: &Canonical) -> bool {
146 is_utf8_string(canonical)
147 }
148
149 fn num_children(&self) -> usize {
151 1
152 }
153
154 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
156 vec![
157 DescendantExclusion {
158 excluded: IntSparseScheme.id(),
159 children: ChildSelection::All,
160 },
161 DescendantExclusion {
162 excluded: IntDictScheme.id(),
163 children: ChildSelection::All,
164 },
165 ]
166 }
167
168 fn expected_compression_ratio(
169 &self,
170 data: &ArrayAndStats,
171 _compress_ctx: CompressorContext,
172 exec_ctx: &mut ExecutionCtx,
173 ) -> CompressionEstimate {
174 let len = data.array_len() as f64;
175 let stats = data.string_stats(exec_ctx);
176 let value_count = stats.value_count();
177
178 if value_count == 0 {
180 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
181 }
182
183 if stats.null_count() as f64 / len > 0.9 {
185 return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
186 }
187
188 CompressionEstimate::Verdict(EstimateVerdict::Skip)
190 }
191
192 fn compress(
193 &self,
194 compressor: &CascadingCompressor,
195 data: &ArrayAndStats,
196 compress_ctx: CompressorContext,
197 exec_ctx: &mut ExecutionCtx,
198 ) -> VortexResult<ArrayRef> {
199 let sparse_encoded = Sparse::encode(data.array(), None, exec_ctx)?;
201
202 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
203 let indices = sparse
205 .patches()
206 .indices()
207 .clone()
208 .execute::<PrimitiveArray>(exec_ctx)?
209 .narrow()?;
210 let compressed_indices = compressor.compress_child(
211 &indices.into_array(),
212 &compress_ctx,
213 self.id(),
214 0,
215 exec_ctx,
216 )?;
217
218 Sparse::try_new(
219 compressed_indices,
220 sparse.patches().values().clone(),
221 sparse.len(),
222 sparse.fill_scalar().clone(),
223 )
224 .map(|a| a.into_array())
225 } else {
226 Ok(sparse_encoded)
227 }
228 }
229}
230
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    /// Name under which this scheme identifies itself.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd"
    }

    /// Accepts only canonical arrays that `is_utf8_string` recognises.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// Always answers with a deferred `Sample` estimate instead of an immediate verdict.
    ///
    /// None of the arguments are inspected.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let estimate = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(estimate)
    }

    /// Compacts the string buffers of `data` and Zstd-compresses them without a dictionary.
    ///
    /// This scheme has no children, so `_compressor` and `_compress_ctx` are unused.
    ///
    /// # Errors
    ///
    /// Propagates any error from `compact_buffers` or from the Zstd encoder.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let strings = data.array_as_utf8().into_owned();
        let compacted = strings.compact_buffers()?;

        // NOTE(review): `3` and `8192` are presumably the compression level and the number
        // of values per frame — confirm against `vortex_zstd`.
        let zstd =
            vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192, exec_ctx)?;

        Ok(zstd.into_array())
    }
}
264
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    /// Name under which this scheme identifies itself.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd_buffers"
    }

    /// Accepts only canonical arrays that `is_utf8_string` recognises.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// Always answers with a deferred `Sample` estimate instead of an immediate verdict.
    ///
    /// None of the arguments are inspected.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let estimate = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(estimate)
    }

    /// Compresses the array held by `data` through `vortex_zstd::ZstdBuffers`.
    ///
    /// This scheme has no children, so `_compressor` and `_compress_ctx` are unused.
    ///
    /// # Errors
    ///
    /// Propagates any error from `ZstdBuffers::compress`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): `3` is presumably the Zstd compression level — confirm against
        // `vortex_zstd::ZstdBuffers::compress`.
        let compressed = vortex_zstd::ZstdBuffers::compress(data.array(), 3, exec_ctx.session())?;
        Ok(compressed.into_array())
    }
}
294
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by the tests in this module: an empty session with `ArraySession` added.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Two distinct strings repeated 1024 times each must come out dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        // First half holds one value, second half the other.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));

        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed =
            BtrBlocksCompressor::default().compress(&input, &mut SESSION.create_execution_ctx())?;
        assert_eq!(compressed.len(), 2048);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// 99 nulls followed by a single value must come out sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");

        let input = builder.finish_into_varbinview().into_array();
        let compressed =
            BtrBlocksCompressor::default().compress(&input, &mut SESSION.create_execution_ctx())?;
        assert_eq!(compressed.len(), 100);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
364
365#[cfg(test)]
367mod scheme_selection_tests {
368 use std::sync::LazyLock;
369
370 use vortex_array::IntoArray;
371 use vortex_array::VortexSessionExecute;
372 use vortex_array::arrays::Constant;
373 use vortex_array::arrays::Dict;
374 use vortex_array::arrays::VarBinViewArray;
375 use vortex_array::dtype::DType;
376 use vortex_array::dtype::Nullability;
377 use vortex_array::session::ArraySession;
378 use vortex_error::VortexResult;
379 use vortex_fsst::FSST;
380 use vortex_session::VortexSession;
381
382 use crate::BtrBlocksCompressor;
383
384 static SESSION: LazyLock<VortexSession> =
385 LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
386
387 #[test]
388 fn test_constant_compressed() -> VortexResult<()> {
389 let strings: Vec<Option<&str>> = vec![Some("constant_value"); 100];
390 let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));
391 let array_ref = array.into_array();
392 let compressed = BtrBlocksCompressor::default()
393 .compress(&array_ref, &mut SESSION.create_execution_ctx())?;
394 assert!(compressed.is::<Constant>());
395 Ok(())
396 }
397
398 #[test]
399 fn test_dict_compressed() -> VortexResult<()> {
400 let distinct_values = ["apple", "banana", "cherry"];
401 let mut strings = Vec::with_capacity(1000);
402 for i in 0..1000 {
403 strings.push(Some(distinct_values[i % 3]));
404 }
405 let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));
406 let array_ref = array.into_array();
407 let compressed = BtrBlocksCompressor::default()
408 .compress(&array_ref, &mut SESSION.create_execution_ctx())?;
409 assert!(compressed.is::<Dict>());
410 Ok(())
411 }
412
413 #[test]
414 fn test_fsst_compressed() -> VortexResult<()> {
415 let mut strings = Vec::with_capacity(1000);
416 for i in 0..1000 {
417 strings.push(Some(format!(
418 "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
419 )));
420 }
421 let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable));
422 let array_ref = array.into_array();
423 let compressed = BtrBlocksCompressor::default()
424 .compress(&array_ref, &mut SESSION.create_execution_ctx())?;
425 assert!(compressed.is::<FSST>());
426 Ok(())
427 }
428}