lance_encoding/encodings/physical/
block.rs1use arrow_buffer::ArrowNativeType;
25use snafu::location;
26use std::{
27 io::{Cursor, Write},
28 str::FromStr,
29};
30
31use lance_core::{Error, Result};
32
33use crate::{
34 buffer::LanceBuffer,
35 compression::VariablePerValueDecompressor,
36 data::{BlockInfo, DataBlock, VariableWidthBlock},
37 encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock},
38 format::{pb, ProtobufUtils},
39};
40
41#[derive(Debug, Clone, Copy, PartialEq)]
42pub struct CompressionConfig {
43 pub(crate) scheme: CompressionScheme,
44 pub(crate) level: Option<i32>,
45}
46
47impl CompressionConfig {
48 pub(crate) fn new(scheme: CompressionScheme, level: Option<i32>) -> Self {
49 Self { scheme, level }
50 }
51}
52
53impl Default for CompressionConfig {
54 fn default() -> Self {
55 Self {
56 scheme: CompressionScheme::Lz4,
57 level: Some(0),
58 }
59 }
60}
61
/// Compression schemes that a compression configuration can name.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CompressionScheme {
    /// No compression.
    None,
    /// FSST.
    Fsst,
    /// Zstandard.
    Zstd,
    /// LZ4.
    Lz4,
}
69
70impl std::fmt::Display for CompressionScheme {
71 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
72 let scheme_str = match self {
73 Self::Fsst => "fsst",
74 Self::Zstd => "zstd",
75 Self::None => "none",
76 Self::Lz4 => "lz4",
77 };
78 write!(f, "{}", scheme_str)
79 }
80}
81
82impl FromStr for CompressionScheme {
83 type Err = Error;
84
85 fn from_str(s: &str) -> Result<Self> {
86 match s {
87 "none" => Ok(Self::None),
88 "zstd" => Ok(Self::Zstd),
89 _ => Err(Error::invalid_input(
90 format!("Unknown compression scheme: {}", s),
91 location!(),
92 )),
93 }
94 }
95}
96
97pub trait BufferCompressor: std::fmt::Debug + Send + Sync {
98 fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
99 fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
100 fn name(&self) -> &str;
101}
102
/// Buffer compressor that uses zstd at a fixed compression level.
#[derive(Default, Debug)]
pub struct ZstdBufferCompressor {
    /// Level handed to the zstd encoder on every `compress` call.
    compression_level: i32,
}

impl ZstdBufferCompressor {
    /// Creates a compressor that uses `compression_level` as given.
    pub fn new(compression_level: i32) -> Self {
        Self { compression_level }
    }
}
113
114impl BufferCompressor for ZstdBufferCompressor {
115 fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
116 let mut encoder = zstd::Encoder::new(output_buf, self.compression_level)?;
117 encoder.write_all(input_buf)?;
118 match encoder.finish() {
119 Ok(_) => Ok(()),
120 Err(e) => Err(e.into()),
121 }
122 }
123
124 fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
125 let source = Cursor::new(input_buf);
126 zstd::stream::copy_decode(source, output_buf)?;
127 Ok(())
128 }
129
130 fn name(&self) -> &str {
131 "zstd"
132 }
133}
134
135#[derive(Debug, Default)]
136pub struct Lz4BufferCompressor {}
137
138impl BufferCompressor for Lz4BufferCompressor {
139 fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
140 lz4::block::compress_to_buffer(input_buf, None, true, output_buf)
141 .map_err(|err| Error::Internal {
142 message: format!("LZ4 compression error: {}", err),
143 location: location!(),
144 })
145 .map(|_| ())
146 }
147
148 fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
149 lz4::block::decompress_to_buffer(input_buf, None, output_buf)
150 .map_err(|err| Error::Internal {
151 message: format!("LZ4 decompression error: {}", err),
152 location: location!(),
153 })
154 .map(|_| ())
155 }
156
157 fn name(&self) -> &str {
158 "zstd"
159 }
160}
161
162#[derive(Debug, Default)]
163pub struct NoopBufferCompressor {}
164
165impl BufferCompressor for NoopBufferCompressor {
166 fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
167 output_buf.extend_from_slice(input_buf);
168 Ok(())
169 }
170
171 fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
172 output_buf.extend_from_slice(input_buf);
173 Ok(())
174 }
175
176 fn name(&self) -> &str {
177 "none"
178 }
179}
180
181pub struct GeneralBufferCompressor {}
182
183impl GeneralBufferCompressor {
184 pub fn get_compressor(compression_config: CompressionConfig) -> Box<dyn BufferCompressor> {
185 match compression_config.scheme {
186 CompressionScheme::Fsst => unimplemented!(),
188 CompressionScheme::Zstd => Box::new(ZstdBufferCompressor::new(
189 compression_config.level.unwrap_or(0),
190 )),
191 CompressionScheme::Lz4 => Box::new(Lz4BufferCompressor::default()),
192 CompressionScheme::None => Box::new(NoopBufferCompressor {}),
193 }
194 }
195}
196
197#[derive(Debug)]
199pub struct CompressedBufferEncoder {
200 pub(crate) compressor: Box<dyn BufferCompressor>,
201}
202
203impl Default for CompressedBufferEncoder {
204 fn default() -> Self {
205 Self {
206 compressor: GeneralBufferCompressor::get_compressor(CompressionConfig {
207 scheme: CompressionScheme::Zstd,
208 level: Some(0),
209 }),
210 }
211 }
212}
213
214impl CompressedBufferEncoder {
215 pub fn new(compression_config: CompressionConfig) -> Self {
216 let compressor = GeneralBufferCompressor::get_compressor(compression_config);
217 Self { compressor }
218 }
219
220 pub fn from_scheme(scheme: &str) -> Result<Self> {
221 let scheme = CompressionScheme::from_str(scheme)?;
222 Ok(Self {
223 compressor: GeneralBufferCompressor::get_compressor(CompressionConfig {
224 scheme,
225 level: Some(0),
226 }),
227 })
228 }
229}
230
231impl CompressedBufferEncoder {
232 pub fn per_value_compress<T: ArrowNativeType>(
233 &self,
234 data: &[u8],
235 offsets: &[T],
236 compressed: &mut Vec<u8>,
237 ) -> Result<LanceBuffer> {
238 let mut new_offsets: Vec<T> = Vec::with_capacity(offsets.len());
239 new_offsets.push(T::from_usize(0).unwrap());
240
241 for off in offsets.windows(2) {
242 let start = off[0].as_usize();
243 let end = off[1].as_usize();
244 self.compressor.compress(&data[start..end], compressed)?;
245 new_offsets.push(T::from_usize(compressed.len()).unwrap());
246 }
247
248 Ok(LanceBuffer::reinterpret_vec(new_offsets))
249 }
250
251 pub fn per_value_decompress<T: ArrowNativeType>(
252 &self,
253 data: &[u8],
254 offsets: &[T],
255 decompressed: &mut Vec<u8>,
256 ) -> Result<LanceBuffer> {
257 let mut new_offsets: Vec<T> = Vec::with_capacity(offsets.len());
258 new_offsets.push(T::from_usize(0).unwrap());
259
260 for off in offsets.windows(2) {
261 let start = off[0].as_usize();
262 let end = off[1].as_usize();
263 self.compressor
264 .decompress(&data[start..end], decompressed)?;
265 new_offsets.push(T::from_usize(decompressed.len()).unwrap());
266 }
267
268 Ok(LanceBuffer::reinterpret_vec(new_offsets))
269 }
270}
271
272impl PerValueCompressor for CompressedBufferEncoder {
273 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
274 let data_type = data.name();
275 let mut data = data.as_variable_width().ok_or(Error::Internal {
276 message: format!(
277 "Attempt to use CompressedBufferEncoder on data of type {}",
278 data_type
279 ),
280 location: location!(),
281 })?;
282
283 let data_bytes = &data.data;
284 let mut compressed = Vec::with_capacity(data_bytes.len());
285
286 let new_offsets = match data.bits_per_offset {
287 32 => self.per_value_compress::<u32>(
288 data_bytes,
289 &data.offsets.borrow_to_typed_slice::<u32>(),
290 &mut compressed,
291 )?,
292 64 => self.per_value_compress::<u64>(
293 data_bytes,
294 &data.offsets.borrow_to_typed_slice::<u64>(),
295 &mut compressed,
296 )?,
297 _ => unreachable!(),
298 };
299
300 let compressed = PerValueDataBlock::Variable(VariableWidthBlock {
301 bits_per_offset: data.bits_per_offset,
302 data: LanceBuffer::from(compressed),
303 offsets: new_offsets,
304 num_values: data.num_values,
305 block_info: BlockInfo::new(),
306 });
307
308 let encoding = ProtobufUtils::block(self.compressor.name());
309
310 Ok((compressed, encoding))
311 }
312}
313
314impl VariablePerValueDecompressor for CompressedBufferEncoder {
315 fn decompress(&self, mut data: VariableWidthBlock) -> Result<DataBlock> {
316 let data_bytes = &data.data;
317 let mut decompressed = Vec::with_capacity(data_bytes.len() * 2);
318
319 let new_offsets = match data.bits_per_offset {
320 32 => self.per_value_decompress(
321 data_bytes,
322 &data.offsets.borrow_to_typed_slice::<u32>(),
323 &mut decompressed,
324 )?,
325 64 => self.per_value_decompress(
326 data_bytes,
327 &data.offsets.borrow_to_typed_slice::<u32>(),
328 &mut decompressed,
329 )?,
330 _ => unreachable!(),
331 };
332 Ok(DataBlock::VariableWidth(VariableWidthBlock {
333 bits_per_offset: data.bits_per_offset,
334 data: LanceBuffer::from(decompressed),
335 offsets: new_offsets,
336 num_values: data.num_values,
337 block_info: BlockInfo::new(),
338 }))
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Known scheme names parse to their matching variants.
    #[test]
    fn test_compression_scheme_from_str() {
        let cases = [
            ("none", CompressionScheme::None),
            ("zstd", CompressionScheme::Zstd),
        ];
        for (text, expected) in cases {
            assert_eq!(CompressionScheme::from_str(text).unwrap(), expected);
        }
    }

    /// Unknown scheme names are rejected with an error.
    #[test]
    fn test_compression_scheme_from_str_invalid() {
        let parsed = CompressionScheme::from_str("invalid");
        assert!(parsed.is_err());
    }
}