lance_encoding/encodings/physical/
fsst.rs1use lance_core::{Error, Result};
19
20use crate::{
21 buffer::LanceBuffer,
22 compression::{MiniBlockDecompressor, VariablePerValueDecompressor},
23 data::{BlockInfo, DataBlock, VariableWidthBlock},
24 encodings::logical::primitive::{
25 fullzip::{PerValueCompressor, PerValueDataBlock},
26 miniblock::{MiniBlockCompressed, MiniBlockCompressor},
27 },
28 format::{
29 ProtobufUtils21,
30 pb21::{self, CompressiveEncoding},
31 },
32};
33
34use super::binary::BinaryMiniBlockEncoder;
35
36struct FsstCompressed {
37 data: VariableWidthBlock,
38 symbol_table: Vec<u8>,
39}
40
41impl FsstCompressed {
42 fn fsst_compress(data: DataBlock) -> Result<Self> {
43 match data {
44 DataBlock::VariableWidth(variable_width) => {
45 match variable_width.bits_per_offset {
46 32 => {
47 let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
48 let offsets_slice = offsets.as_ref();
49 let bytes_data = variable_width.data.into_buffer();
50
51 let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
53 let mut dest_values = vec![0_u8; bytes_data.len() * 2];
54 let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
55
56 fsst::fsst::compress(
58 &mut symbol_table,
59 bytes_data.as_slice(),
60 offsets_slice,
61 &mut dest_values,
62 &mut dest_offsets,
63 )?;
64
65 let compressed = VariableWidthBlock {
67 data: LanceBuffer::reinterpret_vec(dest_values),
68 bits_per_offset: 32,
69 offsets: LanceBuffer::reinterpret_vec(dest_offsets),
70 num_values: variable_width.num_values,
71 block_info: BlockInfo::new(),
72 };
73
74 Ok(Self {
75 data: compressed,
76 symbol_table,
77 })
78 }
79 64 => {
80 let offsets = variable_width.offsets.borrow_to_typed_slice::<i64>();
81 let offsets_slice = offsets.as_ref();
82 let bytes_data = variable_width.data.into_buffer();
83
84 let mut dest_offsets = vec![0_i64; offsets_slice.len() * 2];
86 let mut dest_values = vec![0_u8; bytes_data.len() * 2];
87 let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
88
89 fsst::fsst::compress(
91 &mut symbol_table,
92 bytes_data.as_slice(),
93 offsets_slice,
94 &mut dest_values,
95 &mut dest_offsets,
96 )?;
97
98 let compressed = VariableWidthBlock {
100 data: LanceBuffer::reinterpret_vec(dest_values),
101 bits_per_offset: 64,
102 offsets: LanceBuffer::reinterpret_vec(dest_offsets),
103 num_values: variable_width.num_values,
104 block_info: BlockInfo::new(),
105 };
106
107 Ok(Self {
108 data: compressed,
109 symbol_table,
110 })
111 }
112 _ => panic!(
113 "Unsupported offsets type {}",
114 variable_width.bits_per_offset
115 ),
116 }
117 }
118 _ => Err(Error::invalid_input_source(
119 format!(
120 "Cannot compress a data block of type {} with FsstEncoder",
121 data.name()
122 )
123 .into(),
124 )),
125 }
126 }
127}
128
/// Mini-block compressor that FSST-encodes string data and then passes the encoded
/// values to [`BinaryMiniBlockEncoder`] for mini-block layout.
#[derive(Debug, Default)]
pub struct FsstMiniBlockEncoder {
    /// Forwarded unchanged to `BinaryMiniBlockEncoder::new`; see that constructor for
    /// how `None` is interpreted.
    minichunk_size: Option<i64>,
}

impl FsstMiniBlockEncoder {
    /// Creates an encoder that hands `minichunk_size` through to the inner binary encoder.
    pub fn new(minichunk_size: Option<i64>) -> Self {
        FsstMiniBlockEncoder { minichunk_size }
    }
}
139
140impl MiniBlockCompressor for FsstMiniBlockEncoder {
141 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
142 let compressed = FsstCompressed::fsst_compress(data)?;
143
144 let data_block = DataBlock::VariableWidth(compressed.data);
145
146 let binary_compressor = Box::new(BinaryMiniBlockEncoder::new(self.minichunk_size))
148 as Box<dyn MiniBlockCompressor>;
149
150 let (binary_miniblock_compressed, binary_array_encoding) =
151 binary_compressor.compress(data_block)?;
152
153 Ok((
154 binary_miniblock_compressed,
155 ProtobufUtils21::fsst(binary_array_encoding, compressed.symbol_table),
156 ))
157 }
158}
159
160#[derive(Debug)]
161pub struct FsstPerValueEncoder {
162 inner: Box<dyn PerValueCompressor>,
163}
164
165impl FsstPerValueEncoder {
166 pub fn new(inner: Box<dyn PerValueCompressor>) -> Self {
167 Self { inner }
168 }
169}
170
171impl PerValueCompressor for FsstPerValueEncoder {
172 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
173 let compressed = FsstCompressed::fsst_compress(data)?;
174
175 let data_block = DataBlock::VariableWidth(compressed.data);
176
177 let (binary_compressed, binary_array_encoding) = self.inner.compress(data_block)?;
178
179 Ok((
180 binary_compressed,
181 ProtobufUtils21::fsst(binary_array_encoding, compressed.symbol_table),
182 ))
183 }
184}
185
186#[derive(Debug)]
187pub struct FsstPerValueDecompressor {
188 symbol_table: LanceBuffer,
189 inner_decompressor: Box<dyn VariablePerValueDecompressor>,
190}
191
192impl FsstPerValueDecompressor {
193 pub fn new(
194 symbol_table: LanceBuffer,
195 inner_decompressor: Box<dyn VariablePerValueDecompressor>,
196 ) -> Self {
197 Self {
198 symbol_table,
199 inner_decompressor,
200 }
201 }
202}
203
204impl VariablePerValueDecompressor for FsstPerValueDecompressor {
205 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
206 let compressed_variable_data = self
208 .inner_decompressor
209 .decompress(data)?
210 .as_variable_width()
211 .unwrap();
212
213 let bytes = compressed_variable_data.data.borrow_to_typed_slice::<u8>();
215 let bytes = bytes.as_ref();
216
217 match compressed_variable_data.bits_per_offset {
218 32 => {
219 let offsets = compressed_variable_data
220 .offsets
221 .borrow_to_typed_slice::<i32>();
222 let offsets = offsets.as_ref();
223 let num_values = compressed_variable_data.num_values;
224
225 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
228 let mut decompress_offset_buf = vec![0i32; offsets.len()];
229 fsst::fsst::decompress(
230 &self.symbol_table,
231 bytes,
232 offsets,
233 &mut decompress_bytes_buf,
234 &mut decompress_offset_buf,
235 )?;
236
237 decompress_offset_buf.truncate((num_values + 1) as usize);
239
240 Ok(DataBlock::VariableWidth(VariableWidthBlock {
241 data: LanceBuffer::from(decompress_bytes_buf),
242 offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
243 bits_per_offset: 32,
244 num_values,
245 block_info: BlockInfo::new(),
246 }))
247 }
248 64 => {
249 let offsets = compressed_variable_data
250 .offsets
251 .borrow_to_typed_slice::<i64>();
252 let offsets = offsets.as_ref();
253 let num_values = compressed_variable_data.num_values;
254
255 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
258 let mut decompress_offset_buf = vec![0i64; offsets.len()];
259 fsst::fsst::decompress(
260 &self.symbol_table,
261 bytes,
262 offsets,
263 &mut decompress_bytes_buf,
264 &mut decompress_offset_buf,
265 )?;
266
267 decompress_offset_buf.truncate((num_values + 1) as usize);
269
270 Ok(DataBlock::VariableWidth(VariableWidthBlock {
271 data: LanceBuffer::from(decompress_bytes_buf),
272 offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
273 bits_per_offset: 64,
274 num_values,
275 block_info: BlockInfo::new(),
276 }))
277 }
278 _ => panic!(
279 "Unsupported offset type {}",
280 compressed_variable_data.bits_per_offset,
281 ),
282 }
283 }
284}
285
286#[derive(Debug)]
287pub struct FsstMiniBlockDecompressor {
288 symbol_table: LanceBuffer,
289 inner_decompressor: Box<dyn MiniBlockDecompressor>,
290}
291
292impl FsstMiniBlockDecompressor {
293 pub fn new(
294 description: &pb21::Fsst,
295 inner_decompressor: Box<dyn MiniBlockDecompressor>,
296 ) -> Self {
297 Self {
298 symbol_table: LanceBuffer::from_bytes(description.symbol_table.clone(), 1),
299 inner_decompressor,
300 }
301 }
302}
303
304impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
305 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
306 let compressed_data_block = self.inner_decompressor.decompress(data, num_values)?;
309 let DataBlock::VariableWidth(compressed_data_block) = compressed_data_block else {
310 panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
311 };
312
313 let bytes = &compressed_data_block.data;
315 let (decompress_bytes_buf, decompress_offset_buf) =
316 if compressed_data_block.bits_per_offset == 64 {
317 let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i64>();
318 let offsets = offsets.as_ref();
319
320 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
323 let mut decompress_offset_buf = vec![0i64; offsets.len()];
324 fsst::fsst::decompress(
325 &self.symbol_table,
326 bytes.as_ref(),
327 offsets,
328 &mut decompress_bytes_buf,
329 &mut decompress_offset_buf,
330 )?;
331
332 decompress_offset_buf.truncate((num_values + 1) as usize);
334
335 (
336 decompress_bytes_buf,
337 LanceBuffer::reinterpret_vec(decompress_offset_buf),
338 )
339 } else {
340 let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
341 let offsets = offsets.as_ref();
342
343 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
346 let mut decompress_offset_buf = vec![0i32; offsets.len()];
347 fsst::fsst::decompress(
348 &self.symbol_table,
349 bytes.as_ref(),
350 offsets,
351 &mut decompress_bytes_buf,
352 &mut decompress_offset_buf,
353 )?;
354
355 decompress_offset_buf.truncate((num_values + 1) as usize);
357
358 (
359 decompress_bytes_buf,
360 LanceBuffer::reinterpret_vec(decompress_offset_buf),
361 )
362 };
363
364 Ok(DataBlock::VariableWidth(VariableWidthBlock {
365 data: LanceBuffer::from(decompress_bytes_buf),
366 offsets: decompress_offset_buf,
367 bits_per_offset: compressed_data_block.bits_per_offset,
368 num_values,
369 block_info: BlockInfo::new(),
370 }))
371 }
372}
373
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use lance_datagen::{ByteCount, RowCount};

    use crate::{
        testing::{TestCases, check_round_trip_encoding_of_data},
        version::LanceFileVersion,
    };

    /// Round-trips random UTF-8 strings and expects the "fsst" encoding both when it is
    /// requested through field metadata and when no metadata is supplied.
    #[test_log::test(tokio::test)]
    async fn test_fsst() {
        let cases = TestCases::default()
            .with_expected_encoding("fsst")
            .with_min_file_version(LanceFileVersion::V2_1);

        // A single column of 5000 random UTF-8 strings.
        let batch = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(100), false))
            .into_batch_rows(RowCount::from(5000))
            .unwrap();
        let strings = batch.column(0).clone();

        // First pass: FSST is requested explicitly through field metadata.
        let explicit_metadata = HashMap::from([(
            "lance-encoding:compression".to_string(),
            "fsst".to_string(),
        )]);
        check_round_trip_encoding_of_data(vec![strings.clone()], &cases, explicit_metadata)
            .await;

        // Second pass: no metadata; the same expected encoding must still be chosen.
        check_round_trip_encoding_of_data(vec![strings], &cases, HashMap::new()).await;
    }
}