Skip to main content

lance_encoding/encodings/physical/
fsst.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! FSST encoding
5//!
6//! FSST is a lightweight encoding for variable width data.  This module includes
7//! adapters for both miniblock and per-value encoding.
8//!
9//! FSST encoding creates a small symbol table that is needed for decoding.  Currently
10//! we create one symbol table per disk page and store it in the description.
11//!
12//! TODO: This seems to be potentially limiting.  Perhaps we should create one symbol
13//! table per mini-block chunk?  In the per-value compression it may even make sense to
14//! create multiple symbol tables for a single value!
15//!
16//! FSST encoding is transparent.
17
18use lance_core::{Error, Result};
19use snafu::location;
20
21use crate::{
22    buffer::LanceBuffer,
23    compression::{MiniBlockDecompressor, VariablePerValueDecompressor},
24    data::{BlockInfo, DataBlock, VariableWidthBlock},
25    encodings::logical::primitive::{
26        fullzip::{PerValueCompressor, PerValueDataBlock},
27        miniblock::{MiniBlockCompressed, MiniBlockCompressor},
28    },
29    format::{
30        pb21::{self, CompressiveEncoding},
31        ProtobufUtils21,
32    },
33};
34
35use super::binary::BinaryMiniBlockEncoder;
36
37struct FsstCompressed {
38    data: VariableWidthBlock,
39    symbol_table: Vec<u8>,
40}
41
42impl FsstCompressed {
43    fn fsst_compress(data: DataBlock) -> Result<Self> {
44        match data {
45            DataBlock::VariableWidth(variable_width) => {
46                match variable_width.bits_per_offset {
47                    32 => {
48                        let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
49                        let offsets_slice = offsets.as_ref();
50                        let bytes_data = variable_width.data.into_buffer();
51
52                        // prepare compression output buffer
53                        let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
54                        let mut dest_values = vec![0_u8; bytes_data.len() * 2];
55                        let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
56
57                        // fsst compression
58                        fsst::fsst::compress(
59                            &mut symbol_table,
60                            bytes_data.as_slice(),
61                            offsets_slice,
62                            &mut dest_values,
63                            &mut dest_offsets,
64                        )?;
65
66                        // construct `DataBlock` for BinaryMiniBlockEncoder, we may want some `DataBlock` construct methods later
67                        let compressed = VariableWidthBlock {
68                            data: LanceBuffer::reinterpret_vec(dest_values),
69                            bits_per_offset: 32,
70                            offsets: LanceBuffer::reinterpret_vec(dest_offsets),
71                            num_values: variable_width.num_values,
72                            block_info: BlockInfo::new(),
73                        };
74
75                        Ok(Self {
76                            data: compressed,
77                            symbol_table,
78                        })
79                    }
80                    64 => {
81                        let offsets = variable_width.offsets.borrow_to_typed_slice::<i64>();
82                        let offsets_slice = offsets.as_ref();
83                        let bytes_data = variable_width.data.into_buffer();
84
85                        // prepare compression output buffer
86                        let mut dest_offsets = vec![0_i64; offsets_slice.len() * 2];
87                        let mut dest_values = vec![0_u8; bytes_data.len() * 2];
88                        let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
89
90                        // fsst compression
91                        fsst::fsst::compress(
92                            &mut symbol_table,
93                            bytes_data.as_slice(),
94                            offsets_slice,
95                            &mut dest_values,
96                            &mut dest_offsets,
97                        )?;
98
99                        // construct `DataBlock` for BinaryMiniBlockEncoder, we may want some `DataBlock` construct methods later
100                        let compressed = VariableWidthBlock {
101                            data: LanceBuffer::reinterpret_vec(dest_values),
102                            bits_per_offset: 64,
103                            offsets: LanceBuffer::reinterpret_vec(dest_offsets),
104                            num_values: variable_width.num_values,
105                            block_info: BlockInfo::new(),
106                        };
107
108                        Ok(Self {
109                            data: compressed,
110                            symbol_table,
111                        })
112                    }
113                    _ => panic!(
114                        "Unsupported offsets type {}",
115                        variable_width.bits_per_offset
116                    ),
117                }
118            }
119            _ => Err(Error::InvalidInput {
120                source: format!(
121                    "Cannot compress a data block of type {} with FsstEncoder",
122                    data.name()
123                )
124                .into(),
125                location: location!(),
126            }),
127        }
128    }
129}
130
/// Mini-block compressor that FSST-compresses variable-width data and then hands the
/// result to `BinaryMiniBlockEncoder`.
#[derive(Debug, Default)]
pub struct FsstMiniBlockEncoder {
    /// Optional mini-chunk size, passed unchanged to `BinaryMiniBlockEncoder::new`.
    minichunk_size: Option<i64>,
}

impl FsstMiniBlockEncoder {
    /// Builds an encoder that uses the given (optional) mini-chunk size.
    pub fn new(minichunk_size: Option<i64>) -> Self {
        FsstMiniBlockEncoder { minichunk_size }
    }
}
141
142impl MiniBlockCompressor for FsstMiniBlockEncoder {
143    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
144        let compressed = FsstCompressed::fsst_compress(data)?;
145
146        let data_block = DataBlock::VariableWidth(compressed.data);
147
148        // compress the fsst compressed data using `BinaryMiniBlockEncoder`
149        let binary_compressor = Box::new(BinaryMiniBlockEncoder::new(self.minichunk_size))
150            as Box<dyn MiniBlockCompressor>;
151
152        let (binary_miniblock_compressed, binary_array_encoding) =
153            binary_compressor.compress(data_block)?;
154
155        Ok((
156            binary_miniblock_compressed,
157            ProtobufUtils21::fsst(binary_array_encoding, compressed.symbol_table),
158        ))
159    }
160}
161
162#[derive(Debug)]
163pub struct FsstPerValueEncoder {
164    inner: Box<dyn PerValueCompressor>,
165}
166
167impl FsstPerValueEncoder {
168    pub fn new(inner: Box<dyn PerValueCompressor>) -> Self {
169        Self { inner }
170    }
171}
172
173impl PerValueCompressor for FsstPerValueEncoder {
174    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
175        let compressed = FsstCompressed::fsst_compress(data)?;
176
177        let data_block = DataBlock::VariableWidth(compressed.data);
178
179        let (binary_compressed, binary_array_encoding) = self.inner.compress(data_block)?;
180
181        Ok((
182            binary_compressed,
183            ProtobufUtils21::fsst(binary_array_encoding, compressed.symbol_table),
184        ))
185    }
186}
187
188#[derive(Debug)]
189pub struct FsstPerValueDecompressor {
190    symbol_table: LanceBuffer,
191    inner_decompressor: Box<dyn VariablePerValueDecompressor>,
192}
193
194impl FsstPerValueDecompressor {
195    pub fn new(
196        symbol_table: LanceBuffer,
197        inner_decompressor: Box<dyn VariablePerValueDecompressor>,
198    ) -> Self {
199        Self {
200            symbol_table,
201            inner_decompressor,
202        }
203    }
204}
205
206impl VariablePerValueDecompressor for FsstPerValueDecompressor {
207    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
208        // Step 1. Run inner decompressor
209        let compressed_variable_data = self
210            .inner_decompressor
211            .decompress(data)?
212            .as_variable_width()
213            .unwrap();
214
215        // Step 2. FSST decompress
216        let bytes = compressed_variable_data.data.borrow_to_typed_slice::<u8>();
217        let bytes = bytes.as_ref();
218
219        match compressed_variable_data.bits_per_offset {
220            32 => {
221                let offsets = compressed_variable_data
222                    .offsets
223                    .borrow_to_typed_slice::<i32>();
224                let offsets = offsets.as_ref();
225                let num_values = compressed_variable_data.num_values;
226
227                // The data will expand at most 8 times
228                // The offsets will be the same size because we have the same # of strings
229                let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
230                let mut decompress_offset_buf = vec![0i32; offsets.len()];
231                fsst::fsst::decompress(
232                    &self.symbol_table,
233                    bytes,
234                    offsets,
235                    &mut decompress_bytes_buf,
236                    &mut decompress_offset_buf,
237                )?;
238
239                // Ensure the offsets array is trimmed to exactly num_values + 1 elements
240                decompress_offset_buf.truncate((num_values + 1) as usize);
241
242                Ok(DataBlock::VariableWidth(VariableWidthBlock {
243                    data: LanceBuffer::from(decompress_bytes_buf),
244                    offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
245                    bits_per_offset: 32,
246                    num_values,
247                    block_info: BlockInfo::new(),
248                }))
249            }
250            64 => {
251                let offsets = compressed_variable_data
252                    .offsets
253                    .borrow_to_typed_slice::<i64>();
254                let offsets = offsets.as_ref();
255                let num_values = compressed_variable_data.num_values;
256
257                // The data will expand at most 8 times
258                // The offsets will be the same size because we have the same # of strings
259                let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
260                let mut decompress_offset_buf = vec![0i64; offsets.len()];
261                fsst::fsst::decompress(
262                    &self.symbol_table,
263                    bytes,
264                    offsets,
265                    &mut decompress_bytes_buf,
266                    &mut decompress_offset_buf,
267                )?;
268
269                // Ensure the offsets array is trimmed to exactly num_values + 1 elements
270                decompress_offset_buf.truncate((num_values + 1) as usize);
271
272                Ok(DataBlock::VariableWidth(VariableWidthBlock {
273                    data: LanceBuffer::from(decompress_bytes_buf),
274                    offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
275                    bits_per_offset: 64,
276                    num_values,
277                    block_info: BlockInfo::new(),
278                }))
279            }
280            _ => panic!(
281                "Unsupported offset type {}",
282                compressed_variable_data.bits_per_offset,
283            ),
284        }
285    }
286}
287
288#[derive(Debug)]
289pub struct FsstMiniBlockDecompressor {
290    symbol_table: LanceBuffer,
291    inner_decompressor: Box<dyn MiniBlockDecompressor>,
292}
293
294impl FsstMiniBlockDecompressor {
295    pub fn new(
296        description: &pb21::Fsst,
297        inner_decompressor: Box<dyn MiniBlockDecompressor>,
298    ) -> Self {
299        Self {
300            symbol_table: LanceBuffer::from_bytes(description.symbol_table.clone(), 1),
301            inner_decompressor,
302        }
303    }
304}
305
306impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
307    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
308        // Step 1. decompress data use `BinaryMiniBlockDecompressor`
309        // Extract the bits_per_offset from the binary encoding
310        let compressed_data_block = self.inner_decompressor.decompress(data, num_values)?;
311        let DataBlock::VariableWidth(compressed_data_block) = compressed_data_block else {
312            panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
313        };
314
315        // Step 2. FSST decompress
316        let bytes = &compressed_data_block.data;
317        let (decompress_bytes_buf, decompress_offset_buf) =
318            if compressed_data_block.bits_per_offset == 64 {
319                let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i64>();
320                let offsets = offsets.as_ref();
321
322                // The data will expand at most 8 times
323                // The offsets will be the same size because we have the same # of strings
324                let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
325                let mut decompress_offset_buf = vec![0i64; offsets.len()];
326                fsst::fsst::decompress(
327                    &self.symbol_table,
328                    bytes.as_ref(),
329                    offsets,
330                    &mut decompress_bytes_buf,
331                    &mut decompress_offset_buf,
332                )?;
333
334                // Ensure the offsets array is trimmed to exactly num_values + 1 elements
335                decompress_offset_buf.truncate((num_values + 1) as usize);
336
337                (
338                    decompress_bytes_buf,
339                    LanceBuffer::reinterpret_vec(decompress_offset_buf),
340                )
341            } else {
342                let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
343                let offsets = offsets.as_ref();
344
345                // The data will expand at most 8 times
346                // The offsets will be the same size because we have the same # of strings
347                let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
348                let mut decompress_offset_buf = vec![0i32; offsets.len()];
349                fsst::fsst::decompress(
350                    &self.symbol_table,
351                    bytes.as_ref(),
352                    offsets,
353                    &mut decompress_bytes_buf,
354                    &mut decompress_offset_buf,
355                )?;
356
357                // Ensure the offsets array is trimmed to exactly num_values + 1 elements
358                decompress_offset_buf.truncate((num_values + 1) as usize);
359
360                (
361                    decompress_bytes_buf,
362                    LanceBuffer::reinterpret_vec(decompress_offset_buf),
363                )
364            };
365
366        Ok(DataBlock::VariableWidth(VariableWidthBlock {
367            data: LanceBuffer::from(decompress_bytes_buf),
368            offsets: decompress_offset_buf,
369            bits_per_offset: compressed_data_block.bits_per_offset,
370            num_values,
371            block_info: BlockInfo::new(),
372        }))
373    }
374}
375
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use lance_datagen::{ByteCount, RowCount};

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Round-trips random UTF-8 strings and expects the "fsst" encoding to be used, both
    /// when it is requested through field metadata and when no metadata is given.
    #[test_log::test(tokio::test)]
    async fn test_fsst() {
        let cases = TestCases::default()
            .with_expected_encoding("fsst")
            .with_min_file_version(LanceFileVersion::V2_1);

        // Generate data suitable for FSST (large strings, total size > 32KB)
        let batch = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(100), false))
            .into_batch_rows(RowCount::from(5000))
            .unwrap();
        let strings = batch.column(0).clone();

        // Case 1: FSST requested explicitly through the compression metadata key.
        let explicit_fsst =
            HashMap::from([("lance-encoding:compression".to_string(), "fsst".to_string())]);
        check_round_trip_encoding_of_data(vec![strings.clone()], &cases, explicit_fsst).await;

        // Case 2: no metadata at all; FSST is expected to be picked automatically from the
        // data characteristics (max_len >= 5 and total_size >= 32KB).
        let no_metadata = HashMap::new();
        check_round_trip_encoding_of_data(vec![strings], &cases, no_metadata).await;
    }
}