lance_encoding/encodings/physical/
fsst.rs1use lance_core::{Error, Result};
19use snafu::location;
20
21use crate::{
22 buffer::LanceBuffer,
23 compression::{MiniBlockDecompressor, VariablePerValueDecompressor},
24 data::{BlockInfo, DataBlock, VariableWidthBlock},
25 encodings::logical::primitive::{
26 fullzip::{PerValueCompressor, PerValueDataBlock},
27 miniblock::{MiniBlockCompressed, MiniBlockCompressor},
28 },
29 format::{
30 pb::{self},
31 ProtobufUtils,
32 },
33};
34
35use super::binary::{BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder};
36
37struct FsstCompressed {
38 data: VariableWidthBlock,
39 symbol_table: Vec<u8>,
40}
41
42impl FsstCompressed {
43 fn fsst_compress(data: DataBlock) -> Result<Self> {
44 match data {
45 DataBlock::VariableWidth(mut variable_width) => {
46 let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
47 let offsets_slice = offsets.as_ref();
48 let bytes_data = variable_width.data.into_buffer();
49
50 let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
52 let mut dest_values = vec![0_u8; bytes_data.len() * 2];
53 let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
54
55 fsst::fsst::compress(
57 &mut symbol_table,
58 bytes_data.as_slice(),
59 offsets_slice,
60 &mut dest_values,
61 &mut dest_offsets,
62 )?;
63
64 let compressed = VariableWidthBlock {
66 data: LanceBuffer::reinterpret_vec(dest_values),
67 bits_per_offset: 32,
68 offsets: LanceBuffer::reinterpret_vec(dest_offsets),
69 num_values: variable_width.num_values,
70 block_info: BlockInfo::new(),
71 };
72
73 Ok(Self {
74 data: compressed,
75 symbol_table,
76 })
77 }
78 _ => Err(Error::InvalidInput {
79 source: format!(
80 "Cannot compress a data block of type {} with FsstEncoder",
81 data.name()
82 )
83 .into(),
84 location: location!(),
85 }),
86 }
87 }
88}
89
/// Mini-block compressor that applies FSST to the values and then passes the
/// compressed bytes to the binary mini-block encoder.
#[derive(Default, Debug)]
pub struct FsstMiniBlockEncoder {}
92
93impl MiniBlockCompressor for FsstMiniBlockEncoder {
94 fn compress(
95 &self,
96 data: DataBlock,
97 ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
98 let compressed = FsstCompressed::fsst_compress(data)?;
99
100 let data_block = DataBlock::VariableWidth(compressed.data);
101
102 let binary_compressor =
104 Box::new(BinaryMiniBlockEncoder::default()) as Box<dyn MiniBlockCompressor>;
105
106 let (binary_miniblock_compressed, binary_array_encoding) =
107 binary_compressor.compress(data_block)?;
108
109 Ok((
110 binary_miniblock_compressed,
111 ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
112 ))
113 }
114}
115
116#[derive(Debug)]
117pub struct FsstPerValueEncoder {
118 inner: Box<dyn PerValueCompressor>,
119}
120
121impl FsstPerValueEncoder {
122 pub fn new(inner: Box<dyn PerValueCompressor>) -> Self {
123 Self { inner }
124 }
125}
126
127impl PerValueCompressor for FsstPerValueEncoder {
128 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
129 let compressed = FsstCompressed::fsst_compress(data)?;
130
131 let data_block = DataBlock::VariableWidth(compressed.data);
132
133 let (binary_compressed, binary_array_encoding) = self.inner.compress(data_block)?;
134
135 Ok((
136 binary_compressed,
137 ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
138 ))
139 }
140}
141
142#[derive(Debug)]
143pub struct FsstPerValueDecompressor {
144 symbol_table: LanceBuffer,
145 inner_decompressor: Box<dyn VariablePerValueDecompressor>,
146}
147
148impl FsstPerValueDecompressor {
149 pub fn new(
150 symbol_table: LanceBuffer,
151 inner_decompressor: Box<dyn VariablePerValueDecompressor>,
152 ) -> Self {
153 Self {
154 symbol_table,
155 inner_decompressor,
156 }
157 }
158}
159
160impl VariablePerValueDecompressor for FsstPerValueDecompressor {
161 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
162 let mut compressed_variable_data = self
164 .inner_decompressor
165 .decompress(data)?
166 .as_variable_width()
167 .unwrap();
168
169 let bytes = compressed_variable_data.data.borrow_to_typed_slice::<u8>();
171 let bytes = bytes.as_ref();
172 let offsets = compressed_variable_data
173 .offsets
174 .borrow_to_typed_slice::<i32>();
175 let offsets = offsets.as_ref();
176 let num_values = compressed_variable_data.num_values;
177
178 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
181 let mut decompress_offset_buf = vec![0i32; offsets.len()];
182 fsst::fsst::decompress(
183 &self.symbol_table,
184 bytes,
185 offsets,
186 &mut decompress_bytes_buf,
187 &mut decompress_offset_buf,
188 )?;
189
190 Ok(DataBlock::VariableWidth(VariableWidthBlock {
191 data: LanceBuffer::Owned(decompress_bytes_buf),
192 offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
193 bits_per_offset: 32,
194 num_values,
195 block_info: BlockInfo::new(),
196 }))
197 }
198}
199
200#[derive(Debug)]
201pub struct FsstMiniBlockDecompressor {
202 symbol_table: LanceBuffer,
203}
204
205impl FsstMiniBlockDecompressor {
206 pub fn new(description: &pb::Fsst) -> Self {
207 Self {
208 symbol_table: LanceBuffer::from_bytes(description.symbol_table.clone(), 1),
209 }
210 }
211}
212
213impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
214 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
215 let binary_decompressor =
218 Box::new(BinaryMiniBlockDecompressor::new(32)) as Box<dyn MiniBlockDecompressor>;
219 let compressed_data_block = binary_decompressor.decompress(data, num_values)?;
220 let DataBlock::VariableWidth(mut compressed_data_block) = compressed_data_block else {
221 panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
222 };
223
224 let bytes = compressed_data_block.data.borrow_to_typed_slice::<u8>();
226 let bytes = bytes.as_ref();
227 let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
228 let offsets = offsets.as_ref();
229
230 let mut decompress_bytes_buf = vec![0u8; 4 * 1024 * 8];
234 let mut decompress_offset_buf = vec![0i32; 1024];
235 fsst::fsst::decompress(
236 &self.symbol_table,
237 bytes,
238 offsets,
239 &mut decompress_bytes_buf,
240 &mut decompress_offset_buf,
241 )?;
242
243 Ok(DataBlock::VariableWidth(VariableWidthBlock {
244 data: LanceBuffer::Owned(decompress_bytes_buf),
245 offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
246 bits_per_offset: 32,
247 num_values,
248 block_info: BlockInfo::new(),
249 }))
250 }
251}
252
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use lance_datagen::{ByteCount, RowCount};

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Round-trips one million random UTF-8 values through the 2.1 file
    /// format, first generated with a `ByteCount` of 32 and then of 64.
    #[test_log::test(tokio::test)]
    async fn test_fsst() {
        for byte_count in [32, 64] {
            let batch = lance_datagen::gen()
                .anon_col(lance_datagen::array::rand_utf8(
                    ByteCount::from(byte_count),
                    false,
                ))
                .into_batch_rows(RowCount::from(1_000_000))
                .unwrap();
            let arr = batch.column(0).clone();

            let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
            check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
        }
    }
}