vortex_btrblocks/schemes/
string.rs1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_array::arrays::VarBinArray;
12use vortex_array::arrays::primitive::PrimitiveArrayExt;
13use vortex_array::arrays::varbin::VarBinArrayExt;
14use vortex_compressor::estimate::CompressionEstimate;
15use vortex_compressor::estimate::DeferredEstimate;
16use vortex_compressor::estimate::EstimateVerdict;
17use vortex_compressor::scheme::ChildSelection;
18use vortex_compressor::scheme::DescendantExclusion;
19use vortex_error::VortexResult;
20use vortex_fsst::FSST;
21use vortex_fsst::FSSTArrayExt;
22use vortex_fsst::fsst_compress;
23use vortex_fsst::fsst_train_compressor;
24use vortex_sparse::Sparse;
25use vortex_sparse::SparseExt as _;
26
27use super::integer::IntDictScheme;
28use super::integer::SparseScheme as IntSparseScheme;
29use crate::ArrayAndStats;
30use crate::CascadingCompressor;
31use crate::CompressorContext;
32use crate::Scheme;
33use crate::SchemeExt;
34
/// Marker type for the string scheme registered as `vortex.string.fsst`.
///
/// It carries no state; all behaviour lives in its [`Scheme`] implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FSSTScheme;
38
/// Marker type for the string scheme registered as `vortex.string.sparse`.
///
/// Its [`Scheme`] implementation only volunteers when more than 90% of the
/// array is null.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominatedSparseScheme;
44
/// Marker type for the string scheme registered as `vortex.string.zstd`.
///
/// Only compiled when the `zstd` feature is enabled.
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdScheme;
49
/// Marker type for the string scheme registered as `vortex.string.zstd_buffers`.
///
/// Only compiled when both the `zstd` and `unstable_encodings` features are
/// enabled.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdBuffersScheme;
54
55pub use vortex_compressor::builtins::StringConstantScheme;
57pub use vortex_compressor::builtins::StringDictScheme;
58pub use vortex_compressor::builtins::is_utf8_string;
59pub use vortex_compressor::stats::StringStats;
60
61impl Scheme for FSSTScheme {
62 fn scheme_name(&self) -> &'static str {
63 "vortex.string.fsst"
64 }
65
66 fn matches(&self, canonical: &Canonical) -> bool {
67 is_utf8_string(canonical)
68 }
69
70 fn num_children(&self) -> usize {
72 2
73 }
74
75 fn expected_compression_ratio(
76 &self,
77 _data: &ArrayAndStats,
78 _compress_ctx: CompressorContext,
79 _exec_ctx: &mut ExecutionCtx,
80 ) -> CompressionEstimate {
81 CompressionEstimate::Deferred(DeferredEstimate::Sample)
82 }
83
84 fn compress(
85 &self,
86 compressor: &CascadingCompressor,
87 data: &ArrayAndStats,
88 compress_ctx: CompressorContext,
89 exec_ctx: &mut ExecutionCtx,
90 ) -> VortexResult<ArrayRef> {
91 let utf8 = data.array_as_utf8().into_owned();
92 let compressor_fsst = fsst_train_compressor(&utf8);
93 let fsst = fsst_compress(&utf8, utf8.len(), utf8.dtype(), &compressor_fsst, exec_ctx);
94
95 let uncompressed_lengths_primitive = fsst
96 .uncompressed_lengths()
97 .clone()
98 .execute::<PrimitiveArray>(exec_ctx)?
99 .narrow()?;
100 let compressed_original_lengths = compressor.compress_child(
101 &uncompressed_lengths_primitive.into_array(),
102 &compress_ctx,
103 self.id(),
104 0,
105 exec_ctx,
106 )?;
107
108 let codes_offsets_primitive = fsst
109 .codes()
110 .offsets()
111 .clone()
112 .execute::<PrimitiveArray>(exec_ctx)?
113 .narrow()?;
114 let compressed_codes_offsets = compressor.compress_child(
115 &codes_offsets_primitive.into_array(),
116 &compress_ctx,
117 self.id(),
118 1,
119 exec_ctx,
120 )?;
121 let compressed_codes = VarBinArray::try_new(
122 compressed_codes_offsets,
123 fsst.codes().bytes().clone(),
124 fsst.codes().dtype().clone(),
125 fsst.codes().validity()?,
126 )?;
127
128 let fsst = FSST::try_new(
129 fsst.dtype().clone(),
130 fsst.symbols().clone(),
131 fsst.symbol_lengths().clone(),
132 compressed_codes,
133 compressed_original_lengths,
134 exec_ctx,
135 )?;
136
137 Ok(fsst.into_array())
138 }
139}
140
141impl Scheme for NullDominatedSparseScheme {
142 fn scheme_name(&self) -> &'static str {
143 "vortex.string.sparse"
144 }
145
146 fn matches(&self, canonical: &Canonical) -> bool {
147 is_utf8_string(canonical)
148 }
149
150 fn num_children(&self) -> usize {
152 1
153 }
154
155 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
157 vec![
158 DescendantExclusion {
159 excluded: IntSparseScheme.id(),
160 children: ChildSelection::All,
161 },
162 DescendantExclusion {
163 excluded: IntDictScheme.id(),
164 children: ChildSelection::All,
165 },
166 ]
167 }
168
169 fn expected_compression_ratio(
170 &self,
171 data: &ArrayAndStats,
172 _compress_ctx: CompressorContext,
173 exec_ctx: &mut ExecutionCtx,
174 ) -> CompressionEstimate {
175 let len = data.array_len() as f64;
176 let stats = data.string_stats(exec_ctx);
177 let value_count = stats.value_count();
178
179 if value_count == 0 {
181 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
182 }
183
184 if stats.null_count() as f64 / len > 0.9 {
186 return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
187 }
188
189 CompressionEstimate::Verdict(EstimateVerdict::Skip)
191 }
192
193 fn compress(
194 &self,
195 compressor: &CascadingCompressor,
196 data: &ArrayAndStats,
197 compress_ctx: CompressorContext,
198 exec_ctx: &mut ExecutionCtx,
199 ) -> VortexResult<ArrayRef> {
200 let sparse_encoded = Sparse::encode(data.array(), None, exec_ctx)?;
202
203 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
204 let indices = sparse
206 .patches()
207 .indices()
208 .clone()
209 .execute::<PrimitiveArray>(exec_ctx)?
210 .narrow()?;
211 let compressed_indices = compressor.compress_child(
212 &indices.into_array(),
213 &compress_ctx,
214 self.id(),
215 0,
216 exec_ctx,
217 )?;
218
219 Sparse::try_new(
220 compressed_indices,
221 sparse.patches().values().clone(),
222 sparse.len(),
223 sparse.fill_scalar().clone(),
224 )
225 .map(|a| a.into_array())
226 } else {
227 Ok(sparse_encoded)
228 }
229 }
230}
231
/// Zstd compression of UTF-8 string arrays, without a dictionary.
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    /// Stable identifier of this scheme.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd"
    }

    /// Applies to any canonical array accepted by [`is_utf8_string`].
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// No up-front estimate is computed; the decision is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Compacts the string buffers and Zstd-encodes the result.
    ///
    /// # Errors
    ///
    /// Propagates any error from buffer compaction or from the Zstd encoder.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let utf8 = data.array_as_utf8().into_owned();
        let compacted = utf8.compact_buffers()?;

        // NOTE(review): 3 and 8192 are presumably the Zstd level and the
        // number of values per frame -- confirm against `vortex_zstd`.
        let encoded =
            vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192, exec_ctx)?;

        Ok(encoded.into_array())
    }
}
265
/// Zstd compression through the `ZstdBuffers` encoding.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    /// Stable identifier of this scheme.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd_buffers"
    }

    /// Applies to any canonical array accepted by [`is_utf8_string`].
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// No up-front estimate is computed; the decision is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Hands the array to `ZstdBuffers::compress` using the context's session.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `ZstdBuffers::compress`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let input = data.array();
        let session = exec_ctx.session();

        // NOTE(review): 3 is presumably the Zstd compression level -- confirm
        // against `vortex_zstd`.
        let encoded = vortex_zstd::ZstdBuffers::compress(input, 3, session)?;

        Ok(encoded.into_array())
    }
}
295
#[cfg(test)]
mod tests {
    use std::iter;
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by every test in this module, built on first use.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Two distinct values, 1024 copies each, should come back as a dict array.
    #[test]
    fn test_strings() -> VortexResult<()> {
        let values: Vec<Option<&str>> = iter::repeat(Some("hello-world-1234"))
            .take(1024)
            .chain(iter::repeat(Some("hello-world-56789")).take(1024))
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressor = BtrBlocksCompressor::default();
        let mut ctx = SESSION.create_execution_ctx();
        let compressed = compressor.compress(&input, &mut ctx)?;
        assert_eq!(compressed.len(), 2048);

        let summary = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(summary, "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// 99 nulls followed by a single value should come back as a sparse array.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");
        let input = builder.finish_into_varbinview().into_array();

        let compressor = BtrBlocksCompressor::default();
        let mut ctx = SESSION.create_execution_ctx();
        let compressed = compressor.compress(&input, &mut ctx)?;
        assert_eq!(compressed.len(), 100);

        let summary = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(summary, "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
365
#[cfg(test)]
mod scheme_selection_tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by every test in this module, built on first use.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Runs the default compressor over `array` with a fresh execution context.
    fn compress_with_defaults(array: VarBinViewArray) -> VortexResult<vortex_array::ArrayRef> {
        let array_ref = array.into_array();
        BtrBlocksCompressor::default().compress(&array_ref, &mut SESSION.create_execution_ctx())
    }

    /// A single repeated value should be encoded as a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = vec![Some("constant_value"); 100];
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        let compressed = compress_with_defaults(input)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Three values cycling over 1000 rows should be encoded as a dict array.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = ["apple", "banana", "cherry"];
        let values: Vec<Option<&str>> = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        let compressed = compress_with_defaults(input)?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// Unique strings sharing a long prefix and suffix should be FSST-encoded.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable));

        let compressed = compress_with_defaults(input)?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}