Skip to main content

vortex_fsst/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4// Compress a set of values into an Array.
5
6use fsst::Compressor;
7use fsst::Symbol;
8use vortex_array::Array;
9use vortex_array::IntoArray;
10use vortex_array::accessor::ArrayAccessor;
11use vortex_array::arrays::builder::VarBinBuilder;
12use vortex_buffer::Buffer;
13use vortex_buffer::BufferMut;
14use vortex_dtype::DType;
15use vortex_error::VortexExpect;
16
17use crate::FSSTArray;
18
19/// Compress a string array using FSST.
20pub fn fsst_compress<A: ArrayAccessor<[u8]> + AsRef<dyn Array>>(
21    strings: A,
22    compressor: &Compressor,
23) -> FSSTArray {
24    let len = strings.as_ref().len();
25    let dtype = strings.as_ref().dtype().clone();
26    strings.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
27}
28
29/// Train a compressor from an array.
30///
31/// # Panics
32///
33/// If the provided array is not FSST compressible.
34pub fn fsst_train_compressor<A: ArrayAccessor<[u8]>>(array: &A) -> Compressor {
35    array.with_iterator(|iter| fsst_train_compressor_iter(iter))
36}
37
38/// Train a [compressor][Compressor] from an iterator of bytestrings.
39fn fsst_train_compressor_iter<'a, I>(iter: I) -> Compressor
40where
41    I: Iterator<Item = Option<&'a [u8]>>,
42{
43    let mut lines = Vec::with_capacity(8_192);
44
45    for string in iter {
46        match string {
47            None => {}
48            Some(b) => lines.push(b),
49        }
50    }
51
52    Compressor::train(&lines)
53}
54
55/// Compress from an iterator of bytestrings using FSST.
56pub fn fsst_compress_iter<'a, I>(
57    iter: I,
58    len: usize,
59    dtype: DType,
60    compressor: &Compressor,
61) -> FSSTArray
62where
63    I: Iterator<Item = Option<&'a [u8]>>,
64{
65    // TODO(aduffy): this might be too small.
66    let mut buffer = Vec::with_capacity(16 * 1024 * 1024);
67    let mut builder = VarBinBuilder::<i32>::with_capacity(len);
68    let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
69    for string in iter {
70        match string {
71            None => {
72                builder.append_null();
73                uncompressed_lengths.push(0);
74            }
75            Some(s) => {
76                uncompressed_lengths.push(
77                    s.len()
78                        .try_into()
79                        .vortex_expect("string length must fit in i32"),
80                );
81
82                // SAFETY: buffer is large enough
83                unsafe { compressor.compress_into(s, &mut buffer) };
84
85                builder.append_value(&buffer);
86            }
87        }
88    }
89
90    let codes = builder.finish(DType::Binary(dtype.nullability()));
91    let symbols: Buffer<Symbol> = Buffer::copy_from(compressor.symbol_table());
92    let symbol_lengths: Buffer<u8> = Buffer::<u8>::copy_from(compressor.symbol_lengths());
93
94    let uncompressed_lengths = uncompressed_lengths.into_array();
95
96    FSSTArray::try_new(dtype, symbols, symbol_lengths, codes, uncompressed_lengths)
97        .vortex_expect("building FSSTArray from parts")
98}