1use fsst::Compressor;
7use fsst::Symbol;
8use vortex_array::IntoArray;
9use vortex_array::accessor::ArrayAccessor;
10use vortex_array::arrays::varbin::builder::VarBinBuilder;
11use vortex_array::dtype::DType;
12use vortex_buffer::Buffer;
13use vortex_buffer::BufferMut;
14use vortex_error::VortexExpect;
15
16use crate::FSST;
18use crate::FSSTArray;
19pub fn fsst_compress<A: ArrayAccessor<[u8]>>(
20 strings: A,
21 len: usize,
22 dtype: &DType,
23 compressor: &Compressor,
24) -> FSSTArray {
25 strings.with_iterator(|iter| fsst_compress_iter(iter, len, dtype.clone(), compressor))
26}
27
28pub fn fsst_train_compressor<A: ArrayAccessor<[u8]>>(array: &A) -> Compressor {
34 array.with_iterator(|iter| fsst_train_compressor_iter(iter))
35}
36
37fn fsst_train_compressor_iter<'a, I>(iter: I) -> Compressor
39where
40 I: Iterator<Item = Option<&'a [u8]>>,
41{
42 let mut lines = Vec::with_capacity(8_192);
43
44 for string in iter {
45 match string {
46 None => {}
47 Some(b) => lines.push(b),
48 }
49 }
50
51 Compressor::train(&lines)
52}
53
/// Initial capacity (1 MiB) of the scratch buffer that receives each value's compressed codes
/// in `fsst_compress_iter`; the buffer is grown beyond this when a value needs more room.
const DEFAULT_BUFFER_LEN: usize = 1 << 20;
57
58pub fn fsst_compress_iter<'a, I>(
60 iter: I,
61 len: usize,
62 dtype: DType,
63 compressor: &Compressor,
64) -> FSSTArray
65where
66 I: Iterator<Item = Option<&'a [u8]>>,
67{
68 let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN);
69 let mut builder = VarBinBuilder::<i32>::with_capacity(len);
70 let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
71 for string in iter {
72 match string {
73 None => {
74 builder.append_null();
75 uncompressed_lengths.push(0);
76 }
77 Some(s) => {
78 uncompressed_lengths.push(
79 s.len()
80 .try_into()
81 .vortex_expect("string length must fit in i32"),
82 );
83
84 let target_size = 2 * s.len() + 7;
86 if target_size > buffer.len() {
87 let additional_capacity = target_size - buffer.len();
88 buffer.reserve(additional_capacity);
89 }
90
91 unsafe { compressor.compress_into(s, &mut buffer) };
93
94 builder.append_value(&buffer);
95 }
96 }
97 }
98
99 let codes = builder.finish(DType::Binary(dtype.nullability()));
100 let symbols: Buffer<Symbol> = Buffer::copy_from(compressor.symbol_table());
101 let symbol_lengths: Buffer<u8> = Buffer::<u8>::copy_from(compressor.symbol_lengths());
102
103 let uncompressed_lengths = uncompressed_lengths.into_array();
104
105 FSST::try_new(dtype, symbols, symbol_lengths, codes, uncompressed_lengths)
106 .vortex_expect("FSST parts must be valid")
107}
108
#[cfg(test)]
mod tests {
    use fsst::CompressorBuilder;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::scalar::Scalar;

    use crate::compress::DEFAULT_BUFFER_LEN;
    use crate::fsst_compress_iter;

    /// A single value ten times larger than the initial scratch buffer must survive a
    /// compress / decode round trip, which exercises the buffer-growth path.
    #[test]
    fn test_large_string() {
        let char_count = 10 * DEFAULT_BUFFER_LEN;
        let big_string = "abc".chars().cycle().take(char_count).collect::<String>();

        // Built without inserting any symbols. Presumably this yields an empty symbol table,
        // so every byte is escaped (worst-case output size). TODO confirm against fsst docs.
        let compressor = CompressorBuilder::default().build();

        let input = [Some(big_string.as_bytes())];
        let compressed = fsst_compress_iter(
            input.into_iter(),
            1,
            DType::Utf8(Nullability::NonNullable),
            &compressor,
        );

        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let decoded = compressed.execute_scalar(0, &mut ctx).unwrap();

        assert_eq!(decoded, Scalar::utf8(big_string, Nullability::NonNullable));
    }
}