Skip to main content

vortex_btrblocks/schemes/
string.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! String compression schemes.
5
6use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::IntoArray;
9use vortex_array::ToCanonical;
10use vortex_array::arrays::VarBinArray;
11use vortex_array::arrays::primitive::PrimitiveArrayExt;
12use vortex_array::arrays::varbin::VarBinArrayExt;
13use vortex_compressor::estimate::CompressionEstimate;
14use vortex_compressor::scheme::ChildSelection;
15use vortex_compressor::scheme::DescendantExclusion;
16use vortex_error::VortexResult;
17use vortex_fsst::FSST;
18use vortex_fsst::FSSTArrayExt;
19use vortex_fsst::fsst_compress;
20use vortex_fsst::fsst_train_compressor;
21use vortex_sparse::Sparse;
22
23use super::integer::IntDictScheme;
24use super::integer::SparseScheme as IntSparseScheme;
25use crate::ArrayAndStats;
26use crate::CascadingCompressor;
27use crate::CompressorContext;
28use crate::Scheme;
29use crate::SchemeExt;
30
/// Compression scheme backed by FSST (Fast Static Symbol Table).
///
/// The scheme carries no state; its behaviour is provided entirely by its `Scheme`
/// implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FSSTScheme;
34
/// Sparse encoding aimed at arrays that consist mostly of nulls.
///
/// Mirrors the integer `SparseScheme`, except that it is only selected when nulls dominate
/// the array.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominatedSparseScheme;
40
/// Dictionary-free Zstd compression (nvCOMP compatible).
///
/// Only compiled when the `zstd` feature is enabled.
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdScheme;
45
/// Buffer-level Zstd compression that keeps the array layout intact so that it can be
/// decompressed on a GPU.
///
/// Only compiled when both the `zstd` and `unstable_encodings` features are enabled.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZstdBuffersScheme;
50
51// Re-export builtin schemes from vortex-compressor.
52pub use vortex_compressor::builtins::StringConstantScheme;
53pub use vortex_compressor::builtins::StringDictScheme;
54pub use vortex_compressor::builtins::is_utf8_string;
55pub use vortex_compressor::stats::StringStats;
56
57impl Scheme for FSSTScheme {
58    fn scheme_name(&self) -> &'static str {
59        "vortex.string.fsst"
60    }
61
62    fn matches(&self, canonical: &Canonical) -> bool {
63        is_utf8_string(canonical)
64    }
65
66    /// Children: lengths=0, code_offsets=1.
67    fn num_children(&self) -> usize {
68        2
69    }
70
71    fn expected_compression_ratio(
72        &self,
73        _data: &mut ArrayAndStats,
74        _ctx: CompressorContext,
75    ) -> CompressionEstimate {
76        CompressionEstimate::Sample
77    }
78
79    fn compress(
80        &self,
81        compressor: &CascadingCompressor,
82        data: &mut ArrayAndStats,
83        ctx: CompressorContext,
84    ) -> VortexResult<ArrayRef> {
85        let utf8 = data.array_as_utf8();
86        let compressor_fsst = fsst_train_compressor(&utf8);
87        let fsst = fsst_compress(&utf8, utf8.len(), utf8.dtype(), &compressor_fsst);
88
89        let compressed_original_lengths = compressor.compress_child(
90            &fsst
91                .uncompressed_lengths()
92                .to_primitive()
93                .narrow()?
94                .into_array(),
95            &ctx,
96            self.id(),
97            0,
98        )?;
99
100        let compressed_codes_offsets = compressor.compress_child(
101            &fsst.codes().offsets().to_primitive().narrow()?.into_array(),
102            &ctx,
103            self.id(),
104            1,
105        )?;
106        let compressed_codes = VarBinArray::try_new(
107            compressed_codes_offsets,
108            fsst.codes().bytes().clone(),
109            fsst.codes().dtype().clone(),
110            fsst.codes().validity()?,
111        )?;
112
113        let fsst = FSST::try_new(
114            fsst.dtype().clone(),
115            fsst.symbols().clone(),
116            fsst.symbol_lengths().clone(),
117            compressed_codes,
118            compressed_original_lengths,
119        )?;
120
121        Ok(fsst.into_array())
122    }
123}
124
125impl Scheme for NullDominatedSparseScheme {
126    fn scheme_name(&self) -> &'static str {
127        "vortex.string.sparse"
128    }
129
130    fn matches(&self, canonical: &Canonical) -> bool {
131        is_utf8_string(canonical)
132    }
133
134    /// Children: indices=0.
135    fn num_children(&self) -> usize {
136        1
137    }
138
139    /// The indices of a null-dominated sparse array should not be sparse-encoded again.
140    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
141        vec![
142            DescendantExclusion {
143                excluded: IntSparseScheme.id(),
144                children: ChildSelection::All,
145            },
146            DescendantExclusion {
147                excluded: IntDictScheme.id(),
148                children: ChildSelection::All,
149            },
150        ]
151    }
152
153    fn expected_compression_ratio(
154        &self,
155        data: &mut ArrayAndStats,
156        _ctx: CompressorContext,
157    ) -> CompressionEstimate {
158        let len = data.array_len() as f64;
159        let stats = data.string_stats();
160        let value_count = stats.value_count();
161
162        // All-null arrays should be compressed as constant instead anyways.
163        if value_count == 0 {
164            return CompressionEstimate::Skip;
165        }
166
167        // If the majority (90%) of values is null, this will compress well.
168        if stats.null_count() as f64 / len > 0.9 {
169            return CompressionEstimate::Ratio(len / value_count as f64);
170        }
171
172        // Otherwise we don't go this route.
173        CompressionEstimate::Skip
174    }
175
176    fn compress(
177        &self,
178        compressor: &CascadingCompressor,
179        data: &mut ArrayAndStats,
180        ctx: CompressorContext,
181    ) -> VortexResult<ArrayRef> {
182        // We pass None as we only run this pathway for NULL-dominated string arrays.
183        let sparse_encoded = Sparse::encode(data.array(), None)?;
184
185        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
186            // Compress the indices only (not the values for strings).
187            let indices = sparse.patches().indices().to_primitive().narrow()?;
188            let compressed_indices =
189                compressor.compress_child(&indices.into_array(), &ctx, self.id(), 0)?;
190
191            Sparse::try_new(
192                compressed_indices,
193                sparse.patches().values().clone(),
194                sparse.len(),
195                sparse.fill_scalar().clone(),
196            )
197            .map(|a| a.into_array())
198        } else {
199            Ok(sparse_encoded)
200        }
201    }
202}
203
#[cfg(feature = "zstd")]
impl Scheme for ZstdScheme {
    /// Stable identifier of this scheme.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd"
    }

    /// The scheme applies only to canonical arrays accepted by `is_utf8_string`.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// Always answers with `CompressionEstimate::Sample`; no ratio is computed up front.
    fn expected_compression_ratio(
        &self,
        _data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        CompressionEstimate::Sample
    }

    /// Compacts the buffers of the UTF-8 input and compresses the result with Zstd, without
    /// a dictionary. Neither the cascading compressor nor the context is used.
    ///
    /// # Errors
    ///
    /// Propagates failures from buffer compaction and from the Zstd encoding.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let utf8 = data.array_as_utf8();
        let compacted = utf8.compact_buffers()?;
        // NOTE(review): 3 and 8192 are presumably the Zstd level and the number of values per
        // frame; confirm against the `vortex_zstd` API.
        let encoded = vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192)?;
        Ok(encoded.into_array())
    }
}
232
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for ZstdBuffersScheme {
    /// Stable identifier of this scheme.
    fn scheme_name(&self) -> &'static str {
        "vortex.string.zstd_buffers"
    }

    /// The scheme applies only to canonical arrays accepted by `is_utf8_string`.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_utf8_string(canonical)
    }

    /// Always answers with `CompressionEstimate::Sample`; no ratio is computed up front.
    fn expected_compression_ratio(
        &self,
        _data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        CompressionEstimate::Sample
    }

    /// Compresses the array with `ZstdBuffers`, using the legacy session. Neither the
    /// cascading compressor nor the context is used.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by `ZstdBuffers::compress`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): 3 is presumably the Zstd compression level; confirm against the
        // `vortex_zstd` API.
        let encoded =
            vortex_zstd::ZstdBuffers::compress(data.array(), 3, &vortex_array::LEGACY_SESSION)?;
        Ok(encoded.into_array())
    }
}
263
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Two distinct strings, each repeated 1024 times, should end up dictionary-encoded.
    #[test]
    fn test_strings() -> VortexResult<()> {
        // First half holds one value, second half the other.
        let mut values = vec![Some("hello-world-1234"); 1024];
        values.resize(2048, Some("hello-world-56789"));

        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 2048);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.dict(utf8, len=2048)");

        Ok(())
    }

    /// Ninety-nine nulls followed by a single string should end up sparse-encoded.
    #[test]
    fn test_sparse_nulls() -> VortexResult<()> {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 100);
        builder.append_nulls(99);
        builder.append_value("one little string");

        let input = builder.finish_into_varbinview().into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 100);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(rendered, "vortex.sparse(utf8?, len=100)");

        Ok(())
    }
}
325
/// Checks that each string compression scheme is selected for the kind of input it targets.
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;

    use crate::BtrBlocksCompressor;

    /// One hundred copies of the same string should compress to a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<Option<&str>> = vec![Some("constant_value"); 100];
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Three distinct strings cycled over 1000 rows should compress to a dictionary array.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = ["apple", "banana", "cherry"];
        let values: Vec<Option<&str>> = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// One thousand unique strings sharing a long prefix and suffix should compress to FSST.
    #[test]
    fn test_fsst_compressed() -> VortexResult<()> {
        let values: Vec<Option<String>> = (0..1000)
            .map(|i| {
                Some(format!(
                    "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern"
                ))
            })
            .collect();
        let input =
            VarBinViewArray::from_iter(values, DType::Utf8(Nullability::NonNullable)).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert!(compressed.is::<FSST>());
        Ok(())
    }
}
374        let compressed = BtrBlocksCompressor::default().compress(&array_ref)?;
375        assert!(compressed.is::<FSST>());
376        Ok(())
377    }
378}