Skip to main content

vortex_fsst/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4// Compress a set of values into an Array.
5
6use fsst::Compressor;
7use fsst::Symbol;
8use vortex_array::Array;
9use vortex_array::IntoArray;
10use vortex_array::accessor::ArrayAccessor;
11use vortex_array::arrays::builder::VarBinBuilder;
12use vortex_array::dtype::DType;
13use vortex_buffer::Buffer;
14use vortex_buffer::BufferMut;
15use vortex_error::VortexExpect;
16
17use crate::FSSTArray;
18
19/// Compress a string array using FSST.
20pub fn fsst_compress<A: ArrayAccessor<[u8]> + AsRef<dyn Array>>(
21    strings: A,
22    compressor: &Compressor,
23) -> FSSTArray {
24    let len = strings.as_ref().len();
25    let dtype = strings.as_ref().dtype().clone();
26    strings.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
27}
28
29/// Train a compressor from an array.
30///
31/// # Panics
32///
33/// If the provided array is not FSST compressible.
34pub fn fsst_train_compressor<A: ArrayAccessor<[u8]>>(array: &A) -> Compressor {
35    array.with_iterator(|iter| fsst_train_compressor_iter(iter))
36}
37
38/// Train a [compressor][Compressor] from an iterator of bytestrings.
39fn fsst_train_compressor_iter<'a, I>(iter: I) -> Compressor
40where
41    I: Iterator<Item = Option<&'a [u8]>>,
42{
43    let mut lines = Vec::with_capacity(8_192);
44
45    for string in iter {
46        match string {
47            None => {}
48            Some(b) => lines.push(b),
49        }
50    }
51
52    Compressor::train(&lines)
53}
54
/// Initial capacity (1 MiB) of the scratch buffer that each value is compressed into.
///
/// Most strings are small in practice, so this is normally plenty. When a larger
/// string shows up, the buffer is grown so it can hold that string's worst-case
/// compressed size.
const DEFAULT_BUFFER_LEN: usize = 1 << 20;
58
59/// Compress from an iterator of bytestrings using FSST.
60pub fn fsst_compress_iter<'a, I>(
61    iter: I,
62    len: usize,
63    dtype: DType,
64    compressor: &Compressor,
65) -> FSSTArray
66where
67    I: Iterator<Item = Option<&'a [u8]>>,
68{
69    let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN);
70    let mut builder = VarBinBuilder::<i32>::with_capacity(len);
71    let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
72    for string in iter {
73        match string {
74            None => {
75                builder.append_null();
76                uncompressed_lengths.push(0);
77            }
78            Some(s) => {
79                uncompressed_lengths.push(
80                    s.len()
81                        .try_into()
82                        .vortex_expect("string length must fit in i32"),
83                );
84
85                // make sure the buffer is 2x+7 larger than the input
86                let target_size = 2 * s.len() + 7;
87                if target_size > buffer.len() {
88                    let additional_capacity = target_size - buffer.len();
89                    buffer.reserve(additional_capacity);
90                }
91
92                // SAFETY: buffer is always sized to be large enough
93                unsafe { compressor.compress_into(s, &mut buffer) };
94
95                builder.append_value(&buffer);
96            }
97        }
98    }
99
100    let codes = builder.finish(DType::Binary(dtype.nullability()));
101    let symbols: Buffer<Symbol> = Buffer::copy_from(compressor.symbol_table());
102    let symbol_lengths: Buffer<u8> = Buffer::<u8>::copy_from(compressor.symbol_lengths());
103
104    let uncompressed_lengths = uncompressed_lengths.into_array();
105
106    FSSTArray::try_new(dtype, symbols, symbol_lengths, codes, uncompressed_lengths)
107        .vortex_expect("building FSSTArray from parts")
108}
109
#[cfg(test)]
mod tests {
    use fsst::CompressorBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::scalar::Scalar;

    use super::DEFAULT_BUFFER_LEN;
    use super::fsst_compress_iter;

    /// A single value ten times larger than the initial scratch buffer must still
    /// round-trip, which exercises the buffer-growth path.
    #[test]
    fn test_large_string() {
        // "abcabc..." cut off at exactly `target_len` bytes (all ASCII, so
        // truncating at any byte offset is a valid char boundary).
        let target_len = 10 * DEFAULT_BUFFER_LEN;
        let mut big_string = "abc".repeat(target_len / 3 + 1);
        big_string.truncate(target_len);

        let compressor = CompressorBuilder::default().build();

        let values = std::iter::once(Some(big_string.as_bytes()));
        let compressed = fsst_compress_iter(
            values,
            1,
            DType::Utf8(Nullability::NonNullable),
            &compressor,
        );

        let decoded = compressed.scalar_at(0).unwrap();
        let expected = Scalar::utf8(big_string, Nullability::NonNullable);

        assert_eq!(decoded, expected);
    }
}