lance_encoding/encodings/physical/
fsst.rs1use lance_core::{Error, Result};
19use snafu::location;
20
21use crate::{
22 buffer::LanceBuffer,
23 compression::{MiniBlockDecompressor, VariablePerValueDecompressor},
24 data::{BlockInfo, DataBlock, VariableWidthBlock},
25 encodings::logical::primitive::{
26 fullzip::{PerValueCompressor, PerValueDataBlock},
27 miniblock::{MiniBlockCompressed, MiniBlockCompressor},
28 },
29 format::{
30 pb21::{self, CompressiveEncoding},
31 ProtobufUtils21,
32 },
33};
34
35use super::binary::BinaryMiniBlockEncoder;
36
37struct FsstCompressed {
38 data: VariableWidthBlock,
39 symbol_table: Vec<u8>,
40}
41
42impl FsstCompressed {
43 fn fsst_compress(data: DataBlock) -> Result<Self> {
44 match data {
45 DataBlock::VariableWidth(variable_width) => {
46 match variable_width.bits_per_offset {
47 32 => {
48 let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
49 let offsets_slice = offsets.as_ref();
50 let bytes_data = variable_width.data.into_buffer();
51
52 let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
54 let mut dest_values = vec![0_u8; bytes_data.len() * 2];
55 let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
56
57 fsst::fsst::compress(
59 &mut symbol_table,
60 bytes_data.as_slice(),
61 offsets_slice,
62 &mut dest_values,
63 &mut dest_offsets,
64 )?;
65
66 let compressed = VariableWidthBlock {
68 data: LanceBuffer::reinterpret_vec(dest_values),
69 bits_per_offset: 32,
70 offsets: LanceBuffer::reinterpret_vec(dest_offsets),
71 num_values: variable_width.num_values,
72 block_info: BlockInfo::new(),
73 };
74
75 Ok(Self {
76 data: compressed,
77 symbol_table,
78 })
79 }
80 64 => {
81 let offsets = variable_width.offsets.borrow_to_typed_slice::<i64>();
82 let offsets_slice = offsets.as_ref();
83 let bytes_data = variable_width.data.into_buffer();
84
85 let mut dest_offsets = vec![0_i64; offsets_slice.len() * 2];
87 let mut dest_values = vec![0_u8; bytes_data.len() * 2];
88 let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
89
90 fsst::fsst::compress(
92 &mut symbol_table,
93 bytes_data.as_slice(),
94 offsets_slice,
95 &mut dest_values,
96 &mut dest_offsets,
97 )?;
98
99 let compressed = VariableWidthBlock {
101 data: LanceBuffer::reinterpret_vec(dest_values),
102 bits_per_offset: 64,
103 offsets: LanceBuffer::reinterpret_vec(dest_offsets),
104 num_values: variable_width.num_values,
105 block_info: BlockInfo::new(),
106 };
107
108 Ok(Self {
109 data: compressed,
110 symbol_table,
111 })
112 }
113 _ => panic!(
114 "Unsupported offsets type {}",
115 variable_width.bits_per_offset
116 ),
117 }
118 }
119 _ => Err(Error::InvalidInput {
120 source: format!(
121 "Cannot compress a data block of type {} with FsstEncoder",
122 data.name()
123 )
124 .into(),
125 location: location!(),
126 }),
127 }
128 }
129}
130
/// Mini-block compressor that FSST-compresses a variable-width block and then hands the
/// compressed strings to `BinaryMiniBlockEncoder` for chunking.
#[derive(Debug, Default)]
pub struct FsstMiniBlockEncoder {
    /// Chunk size passed through unchanged to the inner `BinaryMiniBlockEncoder`.
    minichunk_size: Option<i64>,
}

impl FsstMiniBlockEncoder {
    /// Creates an encoder whose inner binary encoder is built with `minichunk_size`.
    pub fn new(minichunk_size: Option<i64>) -> Self {
        FsstMiniBlockEncoder { minichunk_size }
    }
}
141
142impl MiniBlockCompressor for FsstMiniBlockEncoder {
143 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
144 let compressed = FsstCompressed::fsst_compress(data)?;
145
146 let data_block = DataBlock::VariableWidth(compressed.data);
147
148 let binary_compressor = Box::new(BinaryMiniBlockEncoder::new(self.minichunk_size))
150 as Box<dyn MiniBlockCompressor>;
151
152 let (binary_miniblock_compressed, binary_array_encoding) =
153 binary_compressor.compress(data_block)?;
154
155 Ok((
156 binary_miniblock_compressed,
157 ProtobufUtils21::fsst(binary_array_encoding, compressed.symbol_table),
158 ))
159 }
160}
161
162#[derive(Debug)]
163pub struct FsstPerValueEncoder {
164 inner: Box<dyn PerValueCompressor>,
165}
166
167impl FsstPerValueEncoder {
168 pub fn new(inner: Box<dyn PerValueCompressor>) -> Self {
169 Self { inner }
170 }
171}
172
173impl PerValueCompressor for FsstPerValueEncoder {
174 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
175 let compressed = FsstCompressed::fsst_compress(data)?;
176
177 let data_block = DataBlock::VariableWidth(compressed.data);
178
179 let (binary_compressed, binary_array_encoding) = self.inner.compress(data_block)?;
180
181 Ok((
182 binary_compressed,
183 ProtobufUtils21::fsst(binary_array_encoding, compressed.symbol_table),
184 ))
185 }
186}
187
188#[derive(Debug)]
189pub struct FsstPerValueDecompressor {
190 symbol_table: LanceBuffer,
191 inner_decompressor: Box<dyn VariablePerValueDecompressor>,
192}
193
194impl FsstPerValueDecompressor {
195 pub fn new(
196 symbol_table: LanceBuffer,
197 inner_decompressor: Box<dyn VariablePerValueDecompressor>,
198 ) -> Self {
199 Self {
200 symbol_table,
201 inner_decompressor,
202 }
203 }
204}
205
206impl VariablePerValueDecompressor for FsstPerValueDecompressor {
207 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
208 let compressed_variable_data = self
210 .inner_decompressor
211 .decompress(data)?
212 .as_variable_width()
213 .unwrap();
214
215 let bytes = compressed_variable_data.data.borrow_to_typed_slice::<u8>();
217 let bytes = bytes.as_ref();
218
219 match compressed_variable_data.bits_per_offset {
220 32 => {
221 let offsets = compressed_variable_data
222 .offsets
223 .borrow_to_typed_slice::<i32>();
224 let offsets = offsets.as_ref();
225 let num_values = compressed_variable_data.num_values;
226
227 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
230 let mut decompress_offset_buf = vec![0i32; offsets.len()];
231 fsst::fsst::decompress(
232 &self.symbol_table,
233 bytes,
234 offsets,
235 &mut decompress_bytes_buf,
236 &mut decompress_offset_buf,
237 )?;
238
239 decompress_offset_buf.truncate((num_values + 1) as usize);
241
242 Ok(DataBlock::VariableWidth(VariableWidthBlock {
243 data: LanceBuffer::from(decompress_bytes_buf),
244 offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
245 bits_per_offset: 32,
246 num_values,
247 block_info: BlockInfo::new(),
248 }))
249 }
250 64 => {
251 let offsets = compressed_variable_data
252 .offsets
253 .borrow_to_typed_slice::<i64>();
254 let offsets = offsets.as_ref();
255 let num_values = compressed_variable_data.num_values;
256
257 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
260 let mut decompress_offset_buf = vec![0i64; offsets.len()];
261 fsst::fsst::decompress(
262 &self.symbol_table,
263 bytes,
264 offsets,
265 &mut decompress_bytes_buf,
266 &mut decompress_offset_buf,
267 )?;
268
269 decompress_offset_buf.truncate((num_values + 1) as usize);
271
272 Ok(DataBlock::VariableWidth(VariableWidthBlock {
273 data: LanceBuffer::from(decompress_bytes_buf),
274 offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
275 bits_per_offset: 64,
276 num_values,
277 block_info: BlockInfo::new(),
278 }))
279 }
280 _ => panic!(
281 "Unsupported offset type {}",
282 compressed_variable_data.bits_per_offset,
283 ),
284 }
285 }
286}
287
288#[derive(Debug)]
289pub struct FsstMiniBlockDecompressor {
290 symbol_table: LanceBuffer,
291 inner_decompressor: Box<dyn MiniBlockDecompressor>,
292}
293
294impl FsstMiniBlockDecompressor {
295 pub fn new(
296 description: &pb21::Fsst,
297 inner_decompressor: Box<dyn MiniBlockDecompressor>,
298 ) -> Self {
299 Self {
300 symbol_table: LanceBuffer::from_bytes(description.symbol_table.clone(), 1),
301 inner_decompressor,
302 }
303 }
304}
305
306impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
307 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
308 let compressed_data_block = self.inner_decompressor.decompress(data, num_values)?;
311 let DataBlock::VariableWidth(compressed_data_block) = compressed_data_block else {
312 panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
313 };
314
315 let bytes = &compressed_data_block.data;
317 let (decompress_bytes_buf, decompress_offset_buf) =
318 if compressed_data_block.bits_per_offset == 64 {
319 let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i64>();
320 let offsets = offsets.as_ref();
321
322 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
325 let mut decompress_offset_buf = vec![0i64; offsets.len()];
326 fsst::fsst::decompress(
327 &self.symbol_table,
328 bytes.as_ref(),
329 offsets,
330 &mut decompress_bytes_buf,
331 &mut decompress_offset_buf,
332 )?;
333
334 decompress_offset_buf.truncate((num_values + 1) as usize);
336
337 (
338 decompress_bytes_buf,
339 LanceBuffer::reinterpret_vec(decompress_offset_buf),
340 )
341 } else {
342 let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
343 let offsets = offsets.as_ref();
344
345 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
348 let mut decompress_offset_buf = vec![0i32; offsets.len()];
349 fsst::fsst::decompress(
350 &self.symbol_table,
351 bytes.as_ref(),
352 offsets,
353 &mut decompress_bytes_buf,
354 &mut decompress_offset_buf,
355 )?;
356
357 decompress_offset_buf.truncate((num_values + 1) as usize);
359
360 (
361 decompress_bytes_buf,
362 LanceBuffer::reinterpret_vec(decompress_offset_buf),
363 )
364 };
365
366 Ok(DataBlock::VariableWidth(VariableWidthBlock {
367 data: LanceBuffer::from(decompress_bytes_buf),
368 offsets: decompress_offset_buf,
369 bits_per_offset: compressed_data_block.bits_per_offset,
370 num_values,
371 block_info: BlockInfo::new(),
372 }))
373 }
374}
375
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use lance_datagen::{ByteCount, RowCount};

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Round-trips random UTF-8 strings and expects the "fsst" encoding (file version 2.1+).
    /// The test runs twice: once with FSST requested through field metadata, and once with
    /// no compression metadata at all.
    #[test_log::test(tokio::test)]
    async fn test_fsst() {
        let test_cases = TestCases::default()
            .with_expected_encoding("fsst")
            .with_min_file_version(LanceFileVersion::V2_1);

        let batch = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(100), false))
            .into_batch_rows(RowCount::from(5000))
            .unwrap();
        let strings = batch.column(0).clone();

        let explicit_fsst = HashMap::from([(
            "lance-encoding:compression".to_string(),
            "fsst".to_string(),
        )]);
        check_round_trip_encoding_of_data(vec![strings.clone()], &test_cases, explicit_fsst).await;

        check_round_trip_encoding_of_data(vec![strings], &test_cases, HashMap::new()).await;
    }
}