Skip to main content

lance_encoding/encodings/physical/
fsst.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
//! FSST encoding
//!
//! FSST is a lightweight encoding for variable-width data.  This module includes
//! adapters for both mini-block and per-value encoding.
//!
//! FSST encoding creates a small symbol table that is needed for decoding.  Currently
//! we create one symbol table per disk page and store it in the encoding description.
//!
//! TODO: This may be limiting.  Perhaps we should create one symbol table per
//! mini-block chunk?  In the per-value compression it may even make sense to
//! create multiple symbol tables for a single value!
//!
//! FSST encoding is transparent.
17
18use lance_core::{Error, Result};
19
20use crate::{
21    buffer::LanceBuffer,
22    compression::{MiniBlockDecompressor, VariablePerValueDecompressor},
23    data::{BlockInfo, DataBlock, VariableWidthBlock},
24    encodings::logical::primitive::{
25        fullzip::{PerValueCompressor, PerValueDataBlock},
26        miniblock::{MiniBlockCompressed, MiniBlockCompressor},
27    },
28    format::{
29        ProtobufUtils21,
30        pb21::{self, CompressiveEncoding},
31    },
32};
33
34use super::binary::BinaryMiniBlockEncoder;
35
36struct FsstCompressed {
37    data: VariableWidthBlock,
38    symbol_table: Vec<u8>,
39}
40
41impl FsstCompressed {
42    fn fsst_compress(data: DataBlock) -> Result<Self> {
43        match data {
44            DataBlock::VariableWidth(variable_width) => {
45                match variable_width.bits_per_offset {
46                    32 => {
47                        let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
48                        let offsets_slice = offsets.as_ref();
49                        let bytes_data = variable_width.data.into_buffer();
50
51                        // prepare compression output buffer
52                        let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
53                        let mut dest_values = vec![0_u8; bytes_data.len() * 2];
54                        let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
55
56                        // fsst compression
57                        fsst::fsst::compress(
58                            &mut symbol_table,
59                            bytes_data.as_slice(),
60                            offsets_slice,
61                            &mut dest_values,
62                            &mut dest_offsets,
63                        )?;
64
65                        // construct `DataBlock` for BinaryMiniBlockEncoder, we may want some `DataBlock` construct methods later
66                        let compressed = VariableWidthBlock {
67                            data: LanceBuffer::reinterpret_vec(dest_values),
68                            bits_per_offset: 32,
69                            offsets: LanceBuffer::reinterpret_vec(dest_offsets),
70                            num_values: variable_width.num_values,
71                            block_info: BlockInfo::new(),
72                        };
73
74                        Ok(Self {
75                            data: compressed,
76                            symbol_table,
77                        })
78                    }
79                    64 => {
80                        let offsets = variable_width.offsets.borrow_to_typed_slice::<i64>();
81                        let offsets_slice = offsets.as_ref();
82                        let bytes_data = variable_width.data.into_buffer();
83
84                        // prepare compression output buffer
85                        let mut dest_offsets = vec![0_i64; offsets_slice.len() * 2];
86                        let mut dest_values = vec![0_u8; bytes_data.len() * 2];
87                        let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
88
89                        // fsst compression
90                        fsst::fsst::compress(
91                            &mut symbol_table,
92                            bytes_data.as_slice(),
93                            offsets_slice,
94                            &mut dest_values,
95                            &mut dest_offsets,
96                        )?;
97
98                        // construct `DataBlock` for BinaryMiniBlockEncoder, we may want some `DataBlock` construct methods later
99                        let compressed = VariableWidthBlock {
100                            data: LanceBuffer::reinterpret_vec(dest_values),
101                            bits_per_offset: 64,
102                            offsets: LanceBuffer::reinterpret_vec(dest_offsets),
103                            num_values: variable_width.num_values,
104                            block_info: BlockInfo::new(),
105                        };
106
107                        Ok(Self {
108                            data: compressed,
109                            symbol_table,
110                        })
111                    }
112                    _ => panic!(
113                        "Unsupported offsets type {}",
114                        variable_width.bits_per_offset
115                    ),
116                }
117            }
118            _ => Err(Error::invalid_input_source(
119                format!(
120                    "Cannot compress a data block of type {} with FsstEncoder",
121                    data.name()
122                )
123                .into(),
124            )),
125        }
126    }
127}
128
/// Mini-block compressor that FSST-compresses variable-width data and then
/// lays the compressed strings out with `BinaryMiniBlockEncoder`.
#[derive(Debug, Default)]
pub struct FsstMiniBlockEncoder {
    /// Mini-chunk size hint, forwarded unchanged to `BinaryMiniBlockEncoder::new`.
    minichunk_size: Option<i64>,
}

impl FsstMiniBlockEncoder {
    /// Creates an encoder that uses `minichunk_size` as its mini-chunk size hint.
    pub fn new(minichunk_size: Option<i64>) -> Self {
        FsstMiniBlockEncoder {
            minichunk_size: minichunk_size,
        }
    }
}
139
140impl MiniBlockCompressor for FsstMiniBlockEncoder {
141    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
142        let compressed = FsstCompressed::fsst_compress(data)?;
143
144        let data_block = DataBlock::VariableWidth(compressed.data);
145
146        // compress the fsst compressed data using `BinaryMiniBlockEncoder`
147        let binary_compressor = Box::new(BinaryMiniBlockEncoder::new(self.minichunk_size))
148            as Box<dyn MiniBlockCompressor>;
149
150        let (binary_miniblock_compressed, binary_array_encoding) =
151            binary_compressor.compress(data_block)?;
152
153        Ok((
154            binary_miniblock_compressed,
155            ProtobufUtils21::fsst(binary_array_encoding, compressed.symbol_table),
156        ))
157    }
158}
159
160#[derive(Debug)]
161pub struct FsstPerValueEncoder {
162    inner: Box<dyn PerValueCompressor>,
163}
164
165impl FsstPerValueEncoder {
166    pub fn new(inner: Box<dyn PerValueCompressor>) -> Self {
167        Self { inner }
168    }
169}
170
171impl PerValueCompressor for FsstPerValueEncoder {
172    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
173        let compressed = FsstCompressed::fsst_compress(data)?;
174
175        let data_block = DataBlock::VariableWidth(compressed.data);
176
177        let (binary_compressed, binary_array_encoding) = self.inner.compress(data_block)?;
178
179        Ok((
180            binary_compressed,
181            ProtobufUtils21::fsst(binary_array_encoding, compressed.symbol_table),
182        ))
183    }
184}
185
186#[derive(Debug)]
187pub struct FsstPerValueDecompressor {
188    symbol_table: LanceBuffer,
189    inner_decompressor: Box<dyn VariablePerValueDecompressor>,
190}
191
192impl FsstPerValueDecompressor {
193    pub fn new(
194        symbol_table: LanceBuffer,
195        inner_decompressor: Box<dyn VariablePerValueDecompressor>,
196    ) -> Self {
197        Self {
198            symbol_table,
199            inner_decompressor,
200        }
201    }
202}
203
204impl VariablePerValueDecompressor for FsstPerValueDecompressor {
205    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
206        // Step 1. Run inner decompressor
207        let compressed_variable_data = self
208            .inner_decompressor
209            .decompress(data)?
210            .as_variable_width()
211            .unwrap();
212
213        // Step 2. FSST decompress
214        let bytes = compressed_variable_data.data.borrow_to_typed_slice::<u8>();
215        let bytes = bytes.as_ref();
216
217        match compressed_variable_data.bits_per_offset {
218            32 => {
219                let offsets = compressed_variable_data
220                    .offsets
221                    .borrow_to_typed_slice::<i32>();
222                let offsets = offsets.as_ref();
223                let num_values = compressed_variable_data.num_values;
224
225                // The data will expand at most 8 times
226                // The offsets will be the same size because we have the same # of strings
227                let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
228                let mut decompress_offset_buf = vec![0i32; offsets.len()];
229                fsst::fsst::decompress(
230                    &self.symbol_table,
231                    bytes,
232                    offsets,
233                    &mut decompress_bytes_buf,
234                    &mut decompress_offset_buf,
235                )?;
236
237                // Ensure the offsets array is trimmed to exactly num_values + 1 elements
238                decompress_offset_buf.truncate((num_values + 1) as usize);
239
240                Ok(DataBlock::VariableWidth(VariableWidthBlock {
241                    data: LanceBuffer::from(decompress_bytes_buf),
242                    offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
243                    bits_per_offset: 32,
244                    num_values,
245                    block_info: BlockInfo::new(),
246                }))
247            }
248            64 => {
249                let offsets = compressed_variable_data
250                    .offsets
251                    .borrow_to_typed_slice::<i64>();
252                let offsets = offsets.as_ref();
253                let num_values = compressed_variable_data.num_values;
254
255                // The data will expand at most 8 times
256                // The offsets will be the same size because we have the same # of strings
257                let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
258                let mut decompress_offset_buf = vec![0i64; offsets.len()];
259                fsst::fsst::decompress(
260                    &self.symbol_table,
261                    bytes,
262                    offsets,
263                    &mut decompress_bytes_buf,
264                    &mut decompress_offset_buf,
265                )?;
266
267                // Ensure the offsets array is trimmed to exactly num_values + 1 elements
268                decompress_offset_buf.truncate((num_values + 1) as usize);
269
270                Ok(DataBlock::VariableWidth(VariableWidthBlock {
271                    data: LanceBuffer::from(decompress_bytes_buf),
272                    offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
273                    bits_per_offset: 64,
274                    num_values,
275                    block_info: BlockInfo::new(),
276                }))
277            }
278            _ => panic!(
279                "Unsupported offset type {}",
280                compressed_variable_data.bits_per_offset,
281            ),
282        }
283    }
284}
285
286#[derive(Debug)]
287pub struct FsstMiniBlockDecompressor {
288    symbol_table: LanceBuffer,
289    inner_decompressor: Box<dyn MiniBlockDecompressor>,
290}
291
292impl FsstMiniBlockDecompressor {
293    pub fn new(
294        description: &pb21::Fsst,
295        inner_decompressor: Box<dyn MiniBlockDecompressor>,
296    ) -> Self {
297        Self {
298            symbol_table: LanceBuffer::from_bytes(description.symbol_table.clone(), 1),
299            inner_decompressor,
300        }
301    }
302}
303
304impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
305    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
306        // Step 1. decompress data use `BinaryMiniBlockDecompressor`
307        // Extract the bits_per_offset from the binary encoding
308        let compressed_data_block = self.inner_decompressor.decompress(data, num_values)?;
309        let DataBlock::VariableWidth(compressed_data_block) = compressed_data_block else {
310            panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
311        };
312
313        // Step 2. FSST decompress
314        let bytes = &compressed_data_block.data;
315        let (decompress_bytes_buf, decompress_offset_buf) =
316            if compressed_data_block.bits_per_offset == 64 {
317                let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i64>();
318                let offsets = offsets.as_ref();
319
320                // The data will expand at most 8 times
321                // The offsets will be the same size because we have the same # of strings
322                let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
323                let mut decompress_offset_buf = vec![0i64; offsets.len()];
324                fsst::fsst::decompress(
325                    &self.symbol_table,
326                    bytes.as_ref(),
327                    offsets,
328                    &mut decompress_bytes_buf,
329                    &mut decompress_offset_buf,
330                )?;
331
332                // Ensure the offsets array is trimmed to exactly num_values + 1 elements
333                decompress_offset_buf.truncate((num_values + 1) as usize);
334
335                (
336                    decompress_bytes_buf,
337                    LanceBuffer::reinterpret_vec(decompress_offset_buf),
338                )
339            } else {
340                let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
341                let offsets = offsets.as_ref();
342
343                // The data will expand at most 8 times
344                // The offsets will be the same size because we have the same # of strings
345                let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
346                let mut decompress_offset_buf = vec![0i32; offsets.len()];
347                fsst::fsst::decompress(
348                    &self.symbol_table,
349                    bytes.as_ref(),
350                    offsets,
351                    &mut decompress_bytes_buf,
352                    &mut decompress_offset_buf,
353                )?;
354
355                // Ensure the offsets array is trimmed to exactly num_values + 1 elements
356                decompress_offset_buf.truncate((num_values + 1) as usize);
357
358                (
359                    decompress_bytes_buf,
360                    LanceBuffer::reinterpret_vec(decompress_offset_buf),
361                )
362            };
363
364        Ok(DataBlock::VariableWidth(VariableWidthBlock {
365            data: LanceBuffer::from(decompress_bytes_buf),
366            offsets: decompress_offset_buf,
367            bits_per_offset: compressed_data_block.bits_per_offset,
368            num_values,
369            block_info: BlockInfo::new(),
370        }))
371    }
372}
373
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use lance_datagen::{ByteCount, RowCount};

    use crate::{
        testing::{TestCases, check_round_trip_encoding_of_data},
        version::LanceFileVersion,
    };

    /// Round-trips random UTF-8 strings through FSST, first with FSST requested
    /// explicitly through field metadata and then relying on automatic selection.
    #[test_log::test(tokio::test)]
    async fn test_fsst() {
        let test_cases = TestCases::default()
            .with_expected_encoding("fsst")
            .with_min_file_version(LanceFileVersion::V2_1);

        // 5000 random strings: large enough for FSST to be worthwhile
        // (total size > 32KB).
        let batch = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(100), false))
            .into_batch_rows(RowCount::from(5000))
            .unwrap();
        let strings = batch.column(0).clone();

        // 1. FSST requested explicitly through field metadata.
        let explicit_fsst: HashMap<String, String> = [(
            "lance-encoding:compression".to_string(),
            "fsst".to_string(),
        )]
        .into_iter()
        .collect();
        check_round_trip_encoding_of_data(
            vec![strings.clone()],
            &test_cases,
            explicit_fsst,
        )
        .await;

        // 2. No metadata: FSST should be chosen automatically because
        //    max_len >= 5 and total_size >= 32KB.
        let no_metadata = HashMap::new();
        check_round_trip_encoding_of_data(vec![strings], &test_cases, no_metadata).await;
    }
}