Skip to main content

vortex_btrblocks/schemes/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! String compression schemes.
5
6use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_array::arrays::VarBinArray;
12use vortex_array::arrays::primitive::PrimitiveArrayExt;
13use vortex_array::arrays::varbin::VarBinArrayExt;
14use vortex_compressor::estimate::CompressionEstimate;
15use vortex_compressor::estimate::DeferredEstimate;
16use vortex_compressor::estimate::EstimateVerdict;
17use vortex_compressor::scheme::ChildSelection;
18use vortex_compressor::scheme::DescendantExclusion;
19use vortex_error::VortexResult;
20use vortex_fsst::FSST;
21use vortex_fsst::FSSTArrayExt;
22use vortex_fsst::fsst_compress;
23use vortex_fsst::fsst_train_compressor;
24use vortex_sparse::Sparse;
25use vortex_sparse::SparseExt as _;
26
27use super::integer::IntDictScheme;
28use super::integer::SparseScheme as IntSparseScheme;
29use crate::ArrayAndStats;
30use crate::CascadingCompressor;
31use crate::CompressorContext;
32use crate::Scheme;
33use crate::SchemeExt;
34
/// Compression scheme backed by FSST (Fast Static Symbol Table) encoding.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FSSTScheme;
38
/// Sparse encoding reserved for string arrays that are dominated by nulls.
///
/// It mirrors the integer `SparseScheme`, but is only ever chosen when most slots are null.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominatedSparseScheme;
44
/// Dictionary-free Zstd compression for strings (nvCOMP compatible).
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdScheme;
49
/// Zstd compression applied per buffer, keeping the array layout intact so it can be
/// decompressed on the GPU.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdBuffersScheme;
54
55// Re-export builtin schemes from vortex-compressor.
56pub use vortex_compressor::builtins::StringConstantScheme;
57pub use vortex_compressor::builtins::StringDictScheme;
58pub use vortex_compressor::builtins::is_utf8_string;
59pub use vortex_compressor::stats::StringStats;
60
61impl Scheme for FSSTScheme {
62    fn scheme_name(&self) -> &'static str {
63        "vortex.string.fsst"
64    }
65
66    fn matches(&self, canonical: &Canonical) -> bool {
67        is_utf8_string(canonical)
68    }
69
70    /// Children: lengths=0, code_offsets=1.
71    fn num_children(&self) -> usize {
72        2
73    }
74
75    fn expected_compression_ratio(
76        &self,
77        _data: &ArrayAndStats,
78        _compress_ctx: CompressorContext,
79        _exec_ctx: &mut ExecutionCtx,
80    ) -> CompressionEstimate {
81        CompressionEstimate::Deferred(DeferredEstimate::Sample)
82    }
83
84    fn compress(
85        &self,
86        compressor: &CascadingCompressor,
87        data: &ArrayAndStats,
88        compress_ctx: CompressorContext,
89        exec_ctx: &mut ExecutionCtx,
90    ) -> VortexResult<ArrayRef> {
91        let utf8 = data.array_as_utf8().into_owned();
92        let compressor_fsst = fsst_train_compressor(&utf8);
93        let fsst = fsst_compress(&utf8, utf8.len(), utf8.dtype(), &compressor_fsst, exec_ctx);
94
95        let uncompressed_lengths_primitive = fsst
96            .uncompressed_lengths()
97            .clone()
98            .execute::<PrimitiveArray>(exec_ctx)?
99            .narrow()?;
100        let compressed_original_lengths = compressor.compress_child(
101            &uncompressed_lengths_primitive.into_array(),
102            &compress_ctx,
103            self.id(),
104            0,
105            exec_ctx,
106        )?;
107
108        let codes_offsets_primitive = fsst
109            .codes()
110            .offsets()
111            .clone()
112            .execute::<PrimitiveArray>(exec_ctx)?
113            .narrow()?;
114        let compressed_codes_offsets = compressor.compress_child(
115            &codes_offsets_primitive.into_array(),
116            &compress_ctx,
117            self.id(),
118            1,
119            exec_ctx,
120        )?;
121        let compressed_codes = VarBinArray::try_new(
122            compressed_codes_offsets,
123            fsst.codes().bytes().clone(),
124            fsst.codes().dtype().clone(),
125            fsst.codes().validity()?,
126        )?;
127
128        let fsst = FSST::try_new(
129            fsst.dtype().clone(),
130            fsst.symbols().clone(),
131            fsst.symbol_lengths().clone(),
132            compressed_codes,
133            compressed_original_lengths,
134            exec_ctx,
135        )?;
136
137        Ok(fsst.into_array())
138    }
139}
140
141impl Scheme for NullDominatedSparseScheme {
142    fn scheme_name(&self) -> &'static str {
143        "vortex.string.sparse"
144    }
145
146    fn matches(&self, canonical: &Canonical) -> bool {
147        is_utf8_string(canonical)
148    }
149
150    /// Children: indices=0.
151    fn num_children(&self) -> usize {
152        1
153    }
154
155    /// The indices of a null-dominated sparse array should not be sparse-encoded again.
156    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
157        vec![
158            DescendantExclusion {
159                excluded: IntSparseScheme.id(),
160                children: ChildSelection::All,
161            },
162            DescendantExclusion {
163                excluded: IntDictScheme.id(),
164                children: ChildSelection::All,
165            },
166        ]
167    }
168
169    fn expected_compression_ratio(
170        &self,
171        data: &ArrayAndStats,
172        _compress_ctx: CompressorContext,
173        exec_ctx: &mut ExecutionCtx,
174    ) -> CompressionEstimate {
175        let len = data.array_len() as f64;
176        let stats = data.string_stats(exec_ctx);
177        let value_count = stats.value_count();
178
179        // All-null arrays should be compressed as constant instead anyways.
180        if value_count == 0 {
181            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
182        }
183
184        // If the majority (90%) of values is null, this will compress well.
185        if stats.null_count() as f64 / len > 0.9 {
186            return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
187        }
188
189        // Otherwise we don't go this route.
190        CompressionEstimate::Verdict(EstimateVerdict::Skip)
191    }
192
193    fn compress(
194        &self,
195        compressor: &CascadingCompressor,
196        data: &ArrayAndStats,
197        compress_ctx: CompressorContext,
198        exec_ctx: &mut ExecutionCtx,
199    ) -> VortexResult<ArrayRef> {
200        // We pass None as we only run this pathway for NULL-dominated string arrays.
201        let sparse_encoded = Sparse::encode(data.array(), None, exec_ctx)?;
202
203        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
204            // Compress the indices only (not the values for strings).
205            let indices = sparse
206                .patches()
207                .indices()
208                .clone()
209                .execute::<PrimitiveArray>(exec_ctx)?
210                .narrow()?;
211            let compressed_indices = compressor.compress_child(
212                &indices.into_array(),
213                &compress_ctx,
214                self.id(),
215                0,
216                exec_ctx,
217            )?;
218
219            Sparse::try_new(
220                compressed_indices,
221                sparse.patches().values().clone(),
222                sparse.len(),
223                sparse.fill_scalar().clone(),
224            )
225            .map(|a| a.into_array())
226        } else {
227            Ok(sparse_encoded)
228        }
229    }
230}
231
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd"
    }

    /// Applies to any canonical UTF-8 string array.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// No ratio is computed up front; the estimate is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Compacts the string buffers and compresses them with Zstd, without a dictionary.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let level = 3;
        // NOTE(review): presumably the number of values per Zstd frame — confirm in vortex_zstd.
        let frame_values = 8192;

        let view = data.array_as_utf8().into_owned();
        let compacted = view.compact_buffers()?;
        let zstd = vortex_zstd::Zstd::from_var_bin_view_without_dict(
            &compacted,
            level,
            frame_values,
            exec_ctx,
        )?;
        Ok(zstd.into_array())
    }
}
265
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd_buffers"
    }

    /// Applies to any canonical UTF-8 string array.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// No ratio is computed up front; the estimate is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Compresses the array's buffers with Zstd at level 3, using the execution context's
    /// session.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let level = 3;
        let compressed =
            vortex_zstd::ZstdBuffers::compress(data.array(), level, exec_ctx.session())?;
        Ok(compressed.into_array())
    }
}
295
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Two repeated string values should end up dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        let values: Vec<Option<&str>> = std::iter::repeat(Some("hello-world-1234"))
            .take(1024)
            .chain(std::iter::repeat(Some("hello-world-56789")).take(1024))
            .collect();
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input, &mut ctx)?;
        assert_eq!(compressed.len(), 2048);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// An array that is 99% null should end up sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");
        let input = builder.finish_into_varbinview().into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input, &mut ctx)?;
        assert_eq!(compressed.len(), 100);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
365
/// Checks that each string compression scheme is picked for the kind of data it targets.
#[cfg(test)]
mod scheme_selection_tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// A single repeated value compresses to a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values = vec![Some("constant_value"); 100];
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();
        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input, &mut ctx)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// A small set of cycling values compresses to a dictionary.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = ["apple", "banana", "cherry"];
        let values: Vec<Option<&str>> = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();
        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input, &mut ctx)?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// Mostly-distinct strings that share long prefixes and suffixes compress with FSST.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input = VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable))
            .into_array();
        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input, &mut ctx)?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}