lance_encoding/encodings/physical/
fsst.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! FSST encoding
5//!
6//! FSST is a lightweight encoding for variable width data.  This module includes
7//! adapters for both miniblock and per-value encoding.
8//!
9//! FSST encoding creates a small symbol table that is needed for decoding.  Currently
10//! we create one symbol table per disk page and store it in the description.
11//!
12//! TODO: This seems to be potentially limiting.  Perhaps we should create one symbol
13//! table per mini-block chunk?  In the per-value compression it may even make sense to
14//! create multiple symbol tables for a single value!
15//!
16//! FSST encoding is transparent.
17
18use lance_core::{Error, Result};
19use snafu::location;
20
21use crate::{
22    buffer::LanceBuffer,
23    compression::{MiniBlockDecompressor, VariablePerValueDecompressor},
24    data::{BlockInfo, DataBlock, VariableWidthBlock},
25    encodings::logical::primitive::{
26        fullzip::{PerValueCompressor, PerValueDataBlock},
27        miniblock::{MiniBlockCompressed, MiniBlockCompressor},
28    },
29    format::{
30        pb::{self},
31        ProtobufUtils,
32    },
33};
34
35use super::binary::{BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder};
36
37struct FsstCompressed {
38    data: VariableWidthBlock,
39    symbol_table: Vec<u8>,
40}
41
42impl FsstCompressed {
43    fn fsst_compress(data: DataBlock) -> Result<Self> {
44        match data {
45            DataBlock::VariableWidth(mut variable_width) => {
46                let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
47                let offsets_slice = offsets.as_ref();
48                let bytes_data = variable_width.data.into_buffer();
49
50                // prepare compression output buffer
51                let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
52                let mut dest_values = vec![0_u8; bytes_data.len() * 2];
53                let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
54
55                // fsst compression
56                fsst::fsst::compress(
57                    &mut symbol_table,
58                    bytes_data.as_slice(),
59                    offsets_slice,
60                    &mut dest_values,
61                    &mut dest_offsets,
62                )?;
63
64                // construct `DataBlock` for BinaryMiniBlockEncoder, we may want some `DataBlock` construct methods later
65                let compressed = VariableWidthBlock {
66                    data: LanceBuffer::reinterpret_vec(dest_values),
67                    bits_per_offset: 32,
68                    offsets: LanceBuffer::reinterpret_vec(dest_offsets),
69                    num_values: variable_width.num_values,
70                    block_info: BlockInfo::new(),
71                };
72
73                Ok(Self {
74                    data: compressed,
75                    symbol_table,
76                })
77            }
78            _ => Err(Error::InvalidInput {
79                source: format!(
80                    "Cannot compress a data block of type {} with FsstEncoder",
81                    data.name()
82                )
83                .into(),
84                location: location!(),
85            }),
86        }
87    }
88}
89
90#[derive(Debug, Default)]
91pub struct FsstMiniBlockEncoder {}
92
93impl MiniBlockCompressor for FsstMiniBlockEncoder {
94    fn compress(
95        &self,
96        data: DataBlock,
97    ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
98        let compressed = FsstCompressed::fsst_compress(data)?;
99
100        let data_block = DataBlock::VariableWidth(compressed.data);
101
102        // compress the fsst compressed data using `BinaryMiniBlockEncoder`
103        let binary_compressor =
104            Box::new(BinaryMiniBlockEncoder::default()) as Box<dyn MiniBlockCompressor>;
105
106        let (binary_miniblock_compressed, binary_array_encoding) =
107            binary_compressor.compress(data_block)?;
108
109        Ok((
110            binary_miniblock_compressed,
111            ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
112        ))
113    }
114}
115
116#[derive(Debug)]
117pub struct FsstPerValueEncoder {
118    inner: Box<dyn PerValueCompressor>,
119}
120
121impl FsstPerValueEncoder {
122    pub fn new(inner: Box<dyn PerValueCompressor>) -> Self {
123        Self { inner }
124    }
125}
126
127impl PerValueCompressor for FsstPerValueEncoder {
128    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
129        let compressed = FsstCompressed::fsst_compress(data)?;
130
131        let data_block = DataBlock::VariableWidth(compressed.data);
132
133        let (binary_compressed, binary_array_encoding) = self.inner.compress(data_block)?;
134
135        Ok((
136            binary_compressed,
137            ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
138        ))
139    }
140}
141
142#[derive(Debug)]
143pub struct FsstPerValueDecompressor {
144    symbol_table: LanceBuffer,
145    inner_decompressor: Box<dyn VariablePerValueDecompressor>,
146}
147
148impl FsstPerValueDecompressor {
149    pub fn new(
150        symbol_table: LanceBuffer,
151        inner_decompressor: Box<dyn VariablePerValueDecompressor>,
152    ) -> Self {
153        Self {
154            symbol_table,
155            inner_decompressor,
156        }
157    }
158}
159
160impl VariablePerValueDecompressor for FsstPerValueDecompressor {
161    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
162        // Step 1. Run inner decompressor
163        let mut compressed_variable_data = self
164            .inner_decompressor
165            .decompress(data)?
166            .as_variable_width()
167            .unwrap();
168
169        // Step 2. FSST decompress
170        let bytes = compressed_variable_data.data.borrow_to_typed_slice::<u8>();
171        let bytes = bytes.as_ref();
172        let offsets = compressed_variable_data
173            .offsets
174            .borrow_to_typed_slice::<i32>();
175        let offsets = offsets.as_ref();
176        let num_values = compressed_variable_data.num_values;
177
178        // The data will expand at most 8 times
179        // The offsets will be the same size because we have the same # of strings
180        let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
181        let mut decompress_offset_buf = vec![0i32; offsets.len()];
182        fsst::fsst::decompress(
183            &self.symbol_table,
184            bytes,
185            offsets,
186            &mut decompress_bytes_buf,
187            &mut decompress_offset_buf,
188        )?;
189
190        Ok(DataBlock::VariableWidth(VariableWidthBlock {
191            data: LanceBuffer::Owned(decompress_bytes_buf),
192            offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
193            bits_per_offset: 32,
194            num_values,
195            block_info: BlockInfo::new(),
196        }))
197    }
198}
199
200#[derive(Debug)]
201pub struct FsstMiniBlockDecompressor {
202    symbol_table: LanceBuffer,
203}
204
205impl FsstMiniBlockDecompressor {
206    pub fn new(description: &pb::Fsst) -> Self {
207        Self {
208            symbol_table: LanceBuffer::from_bytes(description.symbol_table.clone(), 1),
209        }
210    }
211}
212
213impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
214    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
215        // Step 1. decompress data use `BinaryMiniBlockDecompressor`
216        // TODO: Support 64 bits for FSST compressor
217        let binary_decompressor =
218            Box::new(BinaryMiniBlockDecompressor::new(32)) as Box<dyn MiniBlockDecompressor>;
219        let compressed_data_block = binary_decompressor.decompress(data, num_values)?;
220        let DataBlock::VariableWidth(mut compressed_data_block) = compressed_data_block else {
221            panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
222        };
223
224        // Step 2. FSST decompress
225        let bytes = compressed_data_block.data.borrow_to_typed_slice::<u8>();
226        let bytes = bytes.as_ref();
227        let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
228        let offsets = offsets.as_ref();
229
230        // FSST decompression output buffer, the `MiniBlock` has a size limit of `4 KiB` and
231        // the FSST decompression algorithm output is at most `8 * input_size`
232        // Since `MiniBlock Size` <= 4 KiB and `offsets` are type `i32, it has number of `offsets` <= 1024.
233        let mut decompress_bytes_buf = vec![0u8; 4 * 1024 * 8];
234        let mut decompress_offset_buf = vec![0i32; 1024];
235        fsst::fsst::decompress(
236            &self.symbol_table,
237            bytes,
238            offsets,
239            &mut decompress_bytes_buf,
240            &mut decompress_offset_buf,
241        )?;
242
243        Ok(DataBlock::VariableWidth(VariableWidthBlock {
244            data: LanceBuffer::Owned(decompress_bytes_buf),
245            offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
246            bits_per_offset: 32,
247            num_values,
248            block_info: BlockInfo::new(),
249        }))
250    }
251}
252
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use lance_datagen::{ByteCount, RowCount};

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Round-trips one million random UTF-8 values through the V2.1 file format, once for
    /// each `ByteCount` setting (32 and 64).
    #[test_log::test(tokio::test)]
    async fn test_fsst() {
        for byte_count in [32, 64] {
            let arr = lance_datagen::gen()
                .anon_col(lance_datagen::array::rand_utf8(
                    ByteCount::from(byte_count),
                    false,
                ))
                .into_batch_rows(RowCount::from(1_000_000))
                .unwrap()
                .column(0)
                .clone();
            let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
            check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
        }
    }
}