Skip to main content

vortex_btrblocks/schemes/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! String compression schemes.
5
6use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_array::arrays::VarBinArray;
12use vortex_array::arrays::primitive::PrimitiveArrayExt;
13use vortex_array::arrays::varbin::VarBinArrayExt;
14use vortex_compressor::estimate::CompressionEstimate;
15use vortex_compressor::estimate::DeferredEstimate;
16use vortex_compressor::estimate::EstimateVerdict;
17use vortex_compressor::scheme::ChildSelection;
18use vortex_compressor::scheme::DescendantExclusion;
19use vortex_error::VortexResult;
20use vortex_fsst::FSST;
21use vortex_fsst::FSSTArrayExt;
22use vortex_fsst::fsst_compress;
23use vortex_fsst::fsst_train_compressor;
24use vortex_sparse::Sparse;
25
26use super::integer::IntDictScheme;
27use super::integer::SparseScheme as IntSparseScheme;
28use crate::ArrayAndStats;
29use crate::CascadingCompressor;
30use crate::CompressorContext;
31use crate::Scheme;
32use crate::SchemeExt;
33
/// FSST (Fast Static Symbol Table) compression.
///
/// The [`Scheme`] implementation only matches arrays accepted by [`is_utf8_string`]. It trains
/// an FSST compressor on the input, encodes the strings with it, and then cascades compression
/// into two children: the uncompressed lengths (child 0) and the code offsets (child 1).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FSSTScheme;
37
/// Sparse encoding for null-dominated arrays.
///
/// This is the same as the integer `SparseScheme`, but we only use this for null-dominated arrays.
///
/// The estimate only selects this scheme when more than 90% of the entries are null and the
/// array is not entirely null. Only the patch indices are compressed further; the patch values
/// are kept as produced by the sparse encoding.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NullDominatedSparseScheme;
43
/// Zstd compression without dictionaries (nvCOMP compatible).
///
/// Only compiled when the `zstd` feature is enabled. The [`Scheme`] implementation matches
/// arrays accepted by [`is_utf8_string`] and defers its compression estimate to sampling.
#[cfg(feature = "zstd")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdScheme;
48
/// Zstd buffer-level compression preserving array layout for GPU decompression.
///
/// Only compiled when both the `zstd` and `unstable_encodings` features are enabled. The
/// [`Scheme`] implementation matches arrays accepted by [`is_utf8_string`] and defers its
/// compression estimate to sampling.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdBuffersScheme;
53
// Re-export builtin string schemes, the UTF-8 matcher, and string stats from vortex-compressor.
55pub use vortex_compressor::builtins::StringConstantScheme;
56pub use vortex_compressor::builtins::StringDictScheme;
57pub use vortex_compressor::builtins::is_utf8_string;
58pub use vortex_compressor::stats::StringStats;
59
60impl Scheme for FSSTScheme {
61    fn scheme_name(&self) -> &'static str {
62        "vortex.string.fsst"
63    }
64
65    fn matches(&self, canonical: &Canonical) -> bool {
66        is_utf8_string(canonical)
67    }
68
69    /// Children: lengths=0, code_offsets=1.
70    fn num_children(&self) -> usize {
71        2
72    }
73
74    fn expected_compression_ratio(
75        &self,
76        _data: &ArrayAndStats,
77        _compress_ctx: CompressorContext,
78        _exec_ctx: &mut ExecutionCtx,
79    ) -> CompressionEstimate {
80        CompressionEstimate::Deferred(DeferredEstimate::Sample)
81    }
82
83    fn compress(
84        &self,
85        compressor: &CascadingCompressor,
86        data: &ArrayAndStats,
87        compress_ctx: CompressorContext,
88        exec_ctx: &mut ExecutionCtx,
89    ) -> VortexResult<ArrayRef> {
90        let utf8 = data.array_as_utf8().into_owned();
91        let compressor_fsst = fsst_train_compressor(&utf8);
92        let fsst = fsst_compress(&utf8, utf8.len(), utf8.dtype(), &compressor_fsst, exec_ctx);
93
94        let uncompressed_lengths_primitive = fsst
95            .uncompressed_lengths()
96            .clone()
97            .execute::<PrimitiveArray>(exec_ctx)?
98            .narrow()?;
99        let compressed_original_lengths = compressor.compress_child(
100            &uncompressed_lengths_primitive.into_array(),
101            &compress_ctx,
102            self.id(),
103            0,
104            exec_ctx,
105        )?;
106
107        let codes_offsets_primitive = fsst
108            .codes()
109            .offsets()
110            .clone()
111            .execute::<PrimitiveArray>(exec_ctx)?
112            .narrow()?;
113        let compressed_codes_offsets = compressor.compress_child(
114            &codes_offsets_primitive.into_array(),
115            &compress_ctx,
116            self.id(),
117            1,
118            exec_ctx,
119        )?;
120        let compressed_codes = VarBinArray::try_new(
121            compressed_codes_offsets,
122            fsst.codes().bytes().clone(),
123            fsst.codes().dtype().clone(),
124            fsst.codes().validity()?,
125        )?;
126
127        let fsst = FSST::try_new(
128            fsst.dtype().clone(),
129            fsst.symbols().clone(),
130            fsst.symbol_lengths().clone(),
131            compressed_codes,
132            compressed_original_lengths,
133            exec_ctx,
134        )?;
135
136        Ok(fsst.into_array())
137    }
138}
139
140impl Scheme for NullDominatedSparseScheme {
141    fn scheme_name(&self) -> &'static str {
142        "vortex.string.sparse"
143    }
144
145    fn matches(&self, canonical: &Canonical) -> bool {
146        is_utf8_string(canonical)
147    }
148
149    /// Children: indices=0.
150    fn num_children(&self) -> usize {
151        1
152    }
153
154    /// The indices of a null-dominated sparse array should not be sparse-encoded again.
155    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
156        vec![
157            DescendantExclusion {
158                excluded: IntSparseScheme.id(),
159                children: ChildSelection::All,
160            },
161            DescendantExclusion {
162                excluded: IntDictScheme.id(),
163                children: ChildSelection::All,
164            },
165        ]
166    }
167
168    fn expected_compression_ratio(
169        &self,
170        data: &ArrayAndStats,
171        _compress_ctx: CompressorContext,
172        exec_ctx: &mut ExecutionCtx,
173    ) -> CompressionEstimate {
174        let len = data.array_len() as f64;
175        let stats = data.string_stats(exec_ctx);
176        let value_count = stats.value_count();
177
178        // All-null arrays should be compressed as constant instead anyways.
179        if value_count == 0 {
180            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
181        }
182
183        // If the majority (90%) of values is null, this will compress well.
184        if stats.null_count() as f64 / len > 0.9 {
185            return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
186        }
187
188        // Otherwise we don't go this route.
189        CompressionEstimate::Verdict(EstimateVerdict::Skip)
190    }
191
192    fn compress(
193        &self,
194        compressor: &CascadingCompressor,
195        data: &ArrayAndStats,
196        compress_ctx: CompressorContext,
197        exec_ctx: &mut ExecutionCtx,
198    ) -> VortexResult<ArrayRef> {
199        // We pass None as we only run this pathway for NULL-dominated string arrays.
200        let sparse_encoded = Sparse::encode(data.array(), None, exec_ctx)?;
201
202        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
203            // Compress the indices only (not the values for strings).
204            let indices = sparse
205                .patches()
206                .indices()
207                .clone()
208                .execute::<PrimitiveArray>(exec_ctx)?
209                .narrow()?;
210            let compressed_indices = compressor.compress_child(
211                &indices.into_array(),
212                &compress_ctx,
213                self.id(),
214                0,
215                exec_ctx,
216            )?;
217
218            Sparse::try_new(
219                compressed_indices,
220                sparse.patches().values().clone(),
221                sparse.len(),
222                sparse.fill_scalar().clone(),
223            )
224            .map(|a| a.into_array())
225        } else {
226            Ok(sparse_encoded)
227        }
228    }
229}
230
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    /// Name identifying this scheme.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd"
    }

    /// This scheme is only a candidate for arrays accepted by [`is_utf8_string`].
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// No estimate is computed up front; the decision is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let deferred = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(deferred)
    }

    /// Compacts the string buffers and compresses them with Zstd, without a dictionary.
    ///
    /// # Errors
    ///
    /// Propagates any failure from compacting the buffers or from the Zstd encoder.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let utf8 = data.array_as_utf8().into_owned();
        let compacted = utf8.compact_buffers()?;

        // NOTE(review): `3` and `8192` are presumably the zstd level and the number of values
        // per frame — confirm against the `vortex_zstd` API.
        let zstd =
            vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192, exec_ctx)?;

        Ok(zstd.into_array())
    }
}
264
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    /// Name identifying this scheme.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd_buffers"
    }

    /// This scheme is only a candidate for arrays accepted by [`is_utf8_string`].
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// No estimate is computed up front; the decision is deferred to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let deferred = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(deferred)
    }

    /// Compresses the array with the buffer-level Zstd encoder.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by `ZstdBuffers::compress`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): `3` is presumably the zstd compression level — confirm against the
        // `vortex_zstd` API.
        let buffers = vortex_zstd::ZstdBuffers::compress(data.array(), 3, exec_ctx.session())?;

        Ok(buffers.into_array())
    }
}
294
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by the tests in this module; built lazily on first use.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Two distinct strings repeated 1024 times each should end up dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        // First half is one value, second half another.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressor = BtrBlocksCompressor::default();
        let mut ctx = SESSION.create_execution_ctx();
        let compressed = compressor.compress(&input, &mut ctx)?;
        assert_eq!(compressed.len(), 2048);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// 99 nulls followed by a single value should end up sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");
        let input = builder.finish_into_varbinview().into_array();

        let compressor = BtrBlocksCompressor::default();
        let mut ctx = SESSION.create_execution_ctx();
        let compressed = compressor.compress(&input, &mut ctx)?;
        assert_eq!(compressed.len(), 100);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
364
/// Checks that representative string inputs are compressed into the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by the tests in this module; built lazily on first use.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// A single repeated value should be encoded as a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = vec![Some("constant_value"); 100];
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input, &mut ctx)?;

        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Three distinct values cycled 1000 times should be dictionary-encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = ["apple", "banana", "cherry"];
        let values: Vec<Option<&str>> = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input, &mut ctx)?;

        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// Unique strings sharing a long prefix and suffix should be FSST-encoded.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input, &mut ctx)?;

        assert!(compressed.is::<FSST>());
        Ok(())
    }
}
428}