1use fsst::Compressor;
7use fsst::Symbol;
8use vortex_array::Array;
9use vortex_array::IntoArray;
10use vortex_array::accessor::ArrayAccessor;
11use vortex_array::arrays::builder::VarBinBuilder;
12use vortex_array::dtype::DType;
13use vortex_buffer::Buffer;
14use vortex_buffer::BufferMut;
15use vortex_error::VortexExpect;
16
17use crate::FSSTArray;
18
19pub fn fsst_compress<A: ArrayAccessor<[u8]> + AsRef<dyn Array>>(
21 strings: A,
22 compressor: &Compressor,
23) -> FSSTArray {
24 let len = strings.as_ref().len();
25 let dtype = strings.as_ref().dtype().clone();
26 strings.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
27}
28
29pub fn fsst_train_compressor<A: ArrayAccessor<[u8]>>(array: &A) -> Compressor {
35 array.with_iterator(|iter| fsst_train_compressor_iter(iter))
36}
37
38fn fsst_train_compressor_iter<'a, I>(iter: I) -> Compressor
40where
41 I: Iterator<Item = Option<&'a [u8]>>,
42{
43 let mut lines = Vec::with_capacity(8_192);
44
45 for string in iter {
46 match string {
47 None => {}
48 Some(b) => lines.push(b),
49 }
50 }
51
52 Compressor::train(&lines)
53}
54
/// Initial capacity, in bytes, of the scratch buffer that receives compressed output (1 MiB).
const DEFAULT_BUFFER_LEN: usize = 1 << 20;
58
59pub fn fsst_compress_iter<'a, I>(
61 iter: I,
62 len: usize,
63 dtype: DType,
64 compressor: &Compressor,
65) -> FSSTArray
66where
67 I: Iterator<Item = Option<&'a [u8]>>,
68{
69 let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN);
70 let mut builder = VarBinBuilder::<i32>::with_capacity(len);
71 let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
72 for string in iter {
73 match string {
74 None => {
75 builder.append_null();
76 uncompressed_lengths.push(0);
77 }
78 Some(s) => {
79 uncompressed_lengths.push(
80 s.len()
81 .try_into()
82 .vortex_expect("string length must fit in i32"),
83 );
84
85 let target_size = 2 * s.len() + 7;
87 if target_size > buffer.len() {
88 let additional_capacity = target_size - buffer.len();
89 buffer.reserve(additional_capacity);
90 }
91
92 unsafe { compressor.compress_into(s, &mut buffer) };
94
95 builder.append_value(&buffer);
96 }
97 }
98 }
99
100 let codes = builder.finish(DType::Binary(dtype.nullability()));
101 let symbols: Buffer<Symbol> = Buffer::copy_from(compressor.symbol_table());
102 let symbol_lengths: Buffer<u8> = Buffer::<u8>::copy_from(compressor.symbol_lengths());
103
104 let uncompressed_lengths = uncompressed_lengths.into_array();
105
106 FSSTArray::try_new(dtype, symbols, symbol_lengths, codes, uncompressed_lengths)
107 .vortex_expect("building FSSTArray from parts")
108}
109
#[cfg(test)]
mod tests {
    use fsst::CompressorBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::scalar::Scalar;

    use crate::compress::DEFAULT_BUFFER_LEN;
    use crate::fsst_compress_iter;

    /// Round-trips one value ten times longer than the initial scratch buffer, so the
    /// compression loop has to grow the buffer before encoding it.
    #[test]
    fn test_large_string() {
        let char_count = 10 * DEFAULT_BUFFER_LEN;
        let big_string = "abc".chars().cycle().take(char_count).collect::<String>();

        // A default builder yields a compressor with no trained symbols.
        let compressor = CompressorBuilder::default().build();

        let values = std::iter::once(Some(big_string.as_bytes()));
        let non_nullable_utf8 = DType::Utf8(Nullability::NonNullable);
        let compressed = fsst_compress_iter(values, 1, non_nullable_utf8, &compressor);

        let decoded = compressed.scalar_at(0).unwrap();
        let expected = Scalar::utf8(big_string, Nullability::NonNullable);

        assert_eq!(decoded, expected);
    }
}