Skip to main content

vortex_fsst/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4// Compress a set of values into an Array.
5
6use fsst::Compressor;
7use fsst::Symbol;
8use vortex_array::IntoArray;
9use vortex_array::accessor::ArrayAccessor;
10use vortex_array::arrays::varbin::builder::VarBinBuilder;
11use vortex_array::dtype::DType;
12use vortex_buffer::Buffer;
13use vortex_buffer::BufferMut;
14use vortex_error::VortexExpect;
15
16/// Compress a string array using FSST.
17use crate::FSST;
18use crate::FSSTArray;
19pub fn fsst_compress<A: ArrayAccessor<[u8]>>(
20    strings: A,
21    len: usize,
22    dtype: &DType,
23    compressor: &Compressor,
24) -> FSSTArray {
25    strings.with_iterator(|iter| fsst_compress_iter(iter, len, dtype.clone(), compressor))
26}
27
28/// Train a compressor from an array.
29///
30/// # Panics
31///
32/// If the provided array is not FSST compressible.
33pub fn fsst_train_compressor<A: ArrayAccessor<[u8]>>(array: &A) -> Compressor {
34    array.with_iterator(|iter| fsst_train_compressor_iter(iter))
35}
36
37/// Train a [compressor][Compressor] from an iterator of bytestrings.
38fn fsst_train_compressor_iter<'a, I>(iter: I) -> Compressor
39where
40    I: Iterator<Item = Option<&'a [u8]>>,
41{
42    let mut lines = Vec::with_capacity(8_192);
43
44    for string in iter {
45        match string {
46            None => {}
47            Some(b) => lines.push(b),
48        }
49    }
50
51    Compressor::train(&lines)
52}
53
/// Initial capacity (1 MiB) of the scratch buffer that compressed values are written into.
///
/// Most strings are small in practice, so this is normally plenty. When a larger string
/// shows up, the buffer is grown so it can hold the worst-case compressed value.
const DEFAULT_BUFFER_LEN: usize = 1 << 20;
57
58/// Compress from an iterator of bytestrings using FSST.
59pub fn fsst_compress_iter<'a, I>(
60    iter: I,
61    len: usize,
62    dtype: DType,
63    compressor: &Compressor,
64) -> FSSTArray
65where
66    I: Iterator<Item = Option<&'a [u8]>>,
67{
68    let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN);
69    let mut builder = VarBinBuilder::<i32>::with_capacity(len);
70    let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
71    for string in iter {
72        match string {
73            None => {
74                builder.append_null();
75                uncompressed_lengths.push(0);
76            }
77            Some(s) => {
78                uncompressed_lengths.push(
79                    s.len()
80                        .try_into()
81                        .vortex_expect("string length must fit in i32"),
82                );
83
84                // make sure the buffer is 2x+7 larger than the input
85                let target_size = 2 * s.len() + 7;
86                if target_size > buffer.len() {
87                    let additional_capacity = target_size - buffer.len();
88                    buffer.reserve(additional_capacity);
89                }
90
91                // SAFETY: buffer is always sized to be large enough
92                unsafe { compressor.compress_into(s, &mut buffer) };
93
94                builder.append_value(&buffer);
95            }
96        }
97    }
98
99    let codes = builder.finish(DType::Binary(dtype.nullability()));
100    let symbols: Buffer<Symbol> = Buffer::copy_from(compressor.symbol_table());
101    let symbol_lengths: Buffer<u8> = Buffer::<u8>::copy_from(compressor.symbol_lengths());
102
103    let uncompressed_lengths = uncompressed_lengths.into_array();
104
105    FSST::try_new(dtype, symbols, symbol_lengths, codes, uncompressed_lengths)
106        .vortex_expect("FSST parts must be valid")
107}
108
#[cfg(test)]
mod tests {
    use fsst::CompressorBuilder;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::scalar::Scalar;

    use crate::compress::DEFAULT_BUFFER_LEN;
    use crate::fsst_compress_iter;

    /// Round-trips one value ten times larger than the default scratch buffer, so the
    /// buffer-growth path in `fsst_compress_iter` is exercised.
    #[test]
    fn test_large_string() {
        let input_len = 10 * DEFAULT_BUFFER_LEN;
        let input: String = "abc".chars().cycle().take(input_len).collect();

        // Built from a default builder; no training step is involved here.
        let compressor = CompressorBuilder::default().build();

        let array = fsst_compress_iter(
            std::iter::once(Some(input.as_bytes())),
            1,
            DType::Utf8(Nullability::NonNullable),
            &compressor,
        );

        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let actual = array.execute_scalar(0, &mut ctx).unwrap();

        assert_eq!(actual, Scalar::utf8(input, Nullability::NonNullable));
    }
}