lance_encoding/encodings/physical/
fsst.rs1use lance_core::{Error, Result};
19use snafu::location;
20
21use crate::{
22 buffer::LanceBuffer,
23 compression::{MiniBlockDecompressor, VariablePerValueDecompressor},
24 data::{BlockInfo, DataBlock, VariableWidthBlock},
25 encodings::logical::primitive::{
26 fullzip::{PerValueCompressor, PerValueDataBlock},
27 miniblock::{MiniBlockCompressed, MiniBlockCompressor},
28 },
29 format::{
30 pb::{self},
31 ProtobufUtils,
32 },
33};
34
35use super::binary::BinaryMiniBlockEncoder;
36
37struct FsstCompressed {
38 data: VariableWidthBlock,
39 symbol_table: Vec<u8>,
40}
41
42impl FsstCompressed {
43 fn fsst_compress(data: DataBlock) -> Result<Self> {
44 match data {
45 DataBlock::VariableWidth(mut variable_width) => {
46 match variable_width.bits_per_offset {
47 32 => {
48 let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
49 let offsets_slice = offsets.as_ref();
50 let bytes_data = variable_width.data.into_buffer();
51
52 let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
54 let mut dest_values = vec![0_u8; bytes_data.len() * 2];
55 let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
56
57 fsst::fsst::compress(
59 &mut symbol_table,
60 bytes_data.as_slice(),
61 offsets_slice,
62 &mut dest_values,
63 &mut dest_offsets,
64 )?;
65
66 let compressed = VariableWidthBlock {
68 data: LanceBuffer::reinterpret_vec(dest_values),
69 bits_per_offset: 32,
70 offsets: LanceBuffer::reinterpret_vec(dest_offsets),
71 num_values: variable_width.num_values,
72 block_info: BlockInfo::new(),
73 };
74
75 Ok(Self {
76 data: compressed,
77 symbol_table,
78 })
79 }
80 64 => {
81 let offsets = variable_width.offsets.borrow_to_typed_slice::<i64>();
82 let offsets_slice = offsets.as_ref();
83 let bytes_data = variable_width.data.into_buffer();
84
85 let mut dest_offsets = vec![0_i64; offsets_slice.len() * 2];
87 let mut dest_values = vec![0_u8; bytes_data.len() * 2];
88 let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
89
90 fsst::fsst::compress(
92 &mut symbol_table,
93 bytes_data.as_slice(),
94 offsets_slice,
95 &mut dest_values,
96 &mut dest_offsets,
97 )?;
98
99 let compressed = VariableWidthBlock {
101 data: LanceBuffer::reinterpret_vec(dest_values),
102 bits_per_offset: 64,
103 offsets: LanceBuffer::reinterpret_vec(dest_offsets),
104 num_values: variable_width.num_values,
105 block_info: BlockInfo::new(),
106 };
107
108 Ok(Self {
109 data: compressed,
110 symbol_table,
111 })
112 }
113 _ => panic!(
114 "Unsupported offsets type {}",
115 variable_width.bits_per_offset
116 ),
117 }
118 }
119 _ => Err(Error::InvalidInput {
120 source: format!(
121 "Cannot compress a data block of type {} with FsstEncoder",
122 data.name()
123 )
124 .into(),
125 location: location!(),
126 }),
127 }
128 }
129}
130
131#[derive(Debug, Default)]
132pub struct FsstMiniBlockEncoder {}
133
134impl MiniBlockCompressor for FsstMiniBlockEncoder {
135 fn compress(
136 &self,
137 data: DataBlock,
138 ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
139 let compressed = FsstCompressed::fsst_compress(data)?;
140
141 let data_block = DataBlock::VariableWidth(compressed.data);
142
143 let binary_compressor =
145 Box::new(BinaryMiniBlockEncoder::default()) as Box<dyn MiniBlockCompressor>;
146
147 let (binary_miniblock_compressed, binary_array_encoding) =
148 binary_compressor.compress(data_block)?;
149
150 Ok((
151 binary_miniblock_compressed,
152 ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
153 ))
154 }
155}
156
157#[derive(Debug)]
158pub struct FsstPerValueEncoder {
159 inner: Box<dyn PerValueCompressor>,
160}
161
162impl FsstPerValueEncoder {
163 pub fn new(inner: Box<dyn PerValueCompressor>) -> Self {
164 Self { inner }
165 }
166}
167
168impl PerValueCompressor for FsstPerValueEncoder {
169 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
170 let compressed = FsstCompressed::fsst_compress(data)?;
171
172 let data_block = DataBlock::VariableWidth(compressed.data);
173
174 let (binary_compressed, binary_array_encoding) = self.inner.compress(data_block)?;
175
176 Ok((
177 binary_compressed,
178 ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
179 ))
180 }
181}
182
183#[derive(Debug)]
184pub struct FsstPerValueDecompressor {
185 symbol_table: LanceBuffer,
186 inner_decompressor: Box<dyn VariablePerValueDecompressor>,
187}
188
189impl FsstPerValueDecompressor {
190 pub fn new(
191 symbol_table: LanceBuffer,
192 inner_decompressor: Box<dyn VariablePerValueDecompressor>,
193 ) -> Self {
194 Self {
195 symbol_table,
196 inner_decompressor,
197 }
198 }
199}
200
201impl VariablePerValueDecompressor for FsstPerValueDecompressor {
202 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
203 let mut compressed_variable_data = self
205 .inner_decompressor
206 .decompress(data)?
207 .as_variable_width()
208 .unwrap();
209
210 let bytes = compressed_variable_data.data.borrow_to_typed_slice::<u8>();
212 let bytes = bytes.as_ref();
213
214 match compressed_variable_data.bits_per_offset {
215 32 => {
216 let offsets = compressed_variable_data
217 .offsets
218 .borrow_to_typed_slice::<i32>();
219 let offsets = offsets.as_ref();
220 let num_values = compressed_variable_data.num_values;
221
222 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
225 let mut decompress_offset_buf = vec![0i32; offsets.len()];
226 fsst::fsst::decompress(
227 &self.symbol_table,
228 bytes,
229 offsets,
230 &mut decompress_bytes_buf,
231 &mut decompress_offset_buf,
232 )?;
233
234 decompress_offset_buf.truncate((num_values + 1) as usize);
236
237 Ok(DataBlock::VariableWidth(VariableWidthBlock {
238 data: LanceBuffer::Owned(decompress_bytes_buf),
239 offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
240 bits_per_offset: 32,
241 num_values,
242 block_info: BlockInfo::new(),
243 }))
244 }
245 64 => {
246 let offsets = compressed_variable_data
247 .offsets
248 .borrow_to_typed_slice::<i64>();
249 let offsets = offsets.as_ref();
250 let num_values = compressed_variable_data.num_values;
251
252 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
255 let mut decompress_offset_buf = vec![0i64; offsets.len()];
256 fsst::fsst::decompress(
257 &self.symbol_table,
258 bytes,
259 offsets,
260 &mut decompress_bytes_buf,
261 &mut decompress_offset_buf,
262 )?;
263
264 decompress_offset_buf.truncate((num_values + 1) as usize);
266
267 Ok(DataBlock::VariableWidth(VariableWidthBlock {
268 data: LanceBuffer::Owned(decompress_bytes_buf),
269 offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
270 bits_per_offset: 64,
271 num_values,
272 block_info: BlockInfo::new(),
273 }))
274 }
275 _ => panic!(
276 "Unsupported offset type {}",
277 compressed_variable_data.bits_per_offset,
278 ),
279 }
280 }
281}
282
283#[derive(Debug)]
284pub struct FsstMiniBlockDecompressor {
285 symbol_table: LanceBuffer,
286 inner_decompressor: Box<dyn MiniBlockDecompressor>,
287}
288
289impl FsstMiniBlockDecompressor {
290 pub fn new(description: &pb::Fsst, inner_decompressor: Box<dyn MiniBlockDecompressor>) -> Self {
291 Self {
292 symbol_table: LanceBuffer::from_bytes(description.symbol_table.clone(), 1),
293 inner_decompressor,
294 }
295 }
296}
297
298impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
299 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
300 let compressed_data_block = self.inner_decompressor.decompress(data, num_values)?;
303 let DataBlock::VariableWidth(mut compressed_data_block) = compressed_data_block else {
304 panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
305 };
306
307 let bytes = compressed_data_block.data.borrow_and_clone();
309 let (decompress_bytes_buf, decompress_offset_buf) =
310 if compressed_data_block.bits_per_offset == 64 {
311 let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i64>();
312 let offsets = offsets.as_ref();
313
314 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
317 let mut decompress_offset_buf = vec![0i64; offsets.len()];
318 fsst::fsst::decompress(
319 &self.symbol_table,
320 bytes.as_ref(),
321 offsets,
322 &mut decompress_bytes_buf,
323 &mut decompress_offset_buf,
324 )?;
325
326 decompress_offset_buf.truncate((num_values + 1) as usize);
328
329 (
330 decompress_bytes_buf,
331 LanceBuffer::reinterpret_vec(decompress_offset_buf),
332 )
333 } else {
334 let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
335 let offsets = offsets.as_ref();
336
337 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
340 let mut decompress_offset_buf = vec![0i32; offsets.len()];
341 fsst::fsst::decompress(
342 &self.symbol_table,
343 bytes.as_ref(),
344 offsets,
345 &mut decompress_bytes_buf,
346 &mut decompress_offset_buf,
347 )?;
348
349 decompress_offset_buf.truncate((num_values + 1) as usize);
351
352 (
353 decompress_bytes_buf,
354 LanceBuffer::reinterpret_vec(decompress_offset_buf),
355 )
356 };
357
358 Ok(DataBlock::VariableWidth(VariableWidthBlock {
359 data: LanceBuffer::Owned(decompress_bytes_buf),
360 offsets: decompress_offset_buf,
361 bits_per_offset: compressed_data_block.bits_per_offset,
362 num_values,
363 block_info: BlockInfo::new(),
364 }))
365 }
366}
367
#[cfg(test)]
mod tests {

    use std::collections::HashMap;

    use lance_datagen::{ByteCount, RowCount};

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Round-trips 1,000,000 random UTF-8 values, generated with `ByteCount` 32
    /// and then 64, through the encoder under `LanceFileVersion::V2_1`.
    #[test_log::test(tokio::test)]
    async fn test_fsst() {
        for byte_count in [32, 64] {
            let generated = lance_datagen::gen()
                .anon_col(lance_datagen::array::rand_utf8(
                    ByteCount::from(byte_count),
                    false,
                ))
                .into_batch_rows(RowCount::from(1_000_000))
                .unwrap();
            let column = generated.column(0).clone();

            let cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
            check_round_trip_encoding_of_data(vec![column], &cases, HashMap::new()).await;
        }
    }
}