lance_encoding/encodings/physical/block.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Encodings based on traditional block compression schemes
//!
//! Traditional compressors take in a buffer and return a smaller buffer.  All encoding
//! description is stored inside the compressed buffer, and the entire buffer is needed to
//! decompress any of the data.
//!
//! These encodings are not transparent, which limits our ability to use them.  In addition,
//! they are often quite expensive in CPU terms.
//!
//! However, they are effective and useful for some cases.  For example, when working with large
//! variable-length values (e.g. source code files) they can be very effective.
//!
//! This module introduces the [`BufferCompressor`] trait, which describes the interface for a
//! traditional block compressor.  It is implemented for the most common compression schemes
//! (zstd, lz4, etc.).
//!
//! There is not yet a mini-block variant of this compressor (though one could easily be added),
//! and the full zip variant works by applying compression on a per-value basis (which allows it
//! to be transparent).

24use arrow_buffer::ArrowNativeType;
25use snafu::location;
26use std::{
27    io::{Cursor, Write},
28    str::FromStr,
29};
30
31use lance_core::{Error, Result};
32
33use crate::{
34    buffer::LanceBuffer,
35    compression::VariablePerValueDecompressor,
36    data::{BlockInfo, DataBlock, VariableWidthBlock},
37    encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock},
38    format::{pb, ProtobufUtils},
39};
40
41#[derive(Debug, Clone, Copy, PartialEq)]
42pub struct CompressionConfig {
43    pub(crate) scheme: CompressionScheme,
44    pub(crate) level: Option<i32>,
45}
46
47impl CompressionConfig {
48    pub(crate) fn new(scheme: CompressionScheme, level: Option<i32>) -> Self {
49        Self { scheme, level }
50    }
51}
52
53impl Default for CompressionConfig {
54    fn default() -> Self {
55        Self {
56            scheme: CompressionScheme::Lz4,
57            level: Some(0),
58        }
59    }
60}
61
/// The compression algorithms known to this module.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionScheme {
    /// Store bytes unchanged.
    None,
    /// FSST; it has its own compression path rather than a generic buffer compressor.
    Fsst,
    /// Zstandard.
    Zstd,
    /// LZ4 block format.
    Lz4,
}

impl std::fmt::Display for CompressionScheme {
    /// Writes the lowercase scheme name (`none`, `fsst`, `zstd` or `lz4`).
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let name = match *self {
            CompressionScheme::None => "none",
            CompressionScheme::Fsst => "fsst",
            CompressionScheme::Zstd => "zstd",
            CompressionScheme::Lz4 => "lz4",
        };
        f.write_str(name)
    }
}
81
82impl FromStr for CompressionScheme {
83    type Err = Error;
84
85    fn from_str(s: &str) -> Result<Self> {
86        match s {
87            "none" => Ok(Self::None),
88            "zstd" => Ok(Self::Zstd),
89            _ => Err(Error::invalid_input(
90                format!("Unknown compression scheme: {}", s),
91                location!(),
92            )),
93        }
94    }
95}
96
97pub trait BufferCompressor: std::fmt::Debug + Send + Sync {
98    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
99    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
100    fn name(&self) -> &str;
101}
102
103#[derive(Debug, Default)]
104pub struct ZstdBufferCompressor {
105    compression_level: i32,
106}
107
108impl ZstdBufferCompressor {
109    pub fn new(compression_level: i32) -> Self {
110        Self { compression_level }
111    }
112}
113
114impl BufferCompressor for ZstdBufferCompressor {
115    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
116        let mut encoder = zstd::Encoder::new(output_buf, self.compression_level)?;
117        encoder.write_all(input_buf)?;
118        match encoder.finish() {
119            Ok(_) => Ok(()),
120            Err(e) => Err(e.into()),
121        }
122    }
123
124    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
125        let source = Cursor::new(input_buf);
126        zstd::stream::copy_decode(source, output_buf)?;
127        Ok(())
128    }
129
130    fn name(&self) -> &str {
131        "zstd"
132    }
133}
134
135#[derive(Debug, Default)]
136pub struct Lz4BufferCompressor {}
137
138impl BufferCompressor for Lz4BufferCompressor {
139    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
140        lz4::block::compress_to_buffer(input_buf, None, true, output_buf)
141            .map_err(|err| Error::Internal {
142                message: format!("LZ4 compression error: {}", err),
143                location: location!(),
144            })
145            .map(|_| ())
146    }
147
148    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
149        lz4::block::decompress_to_buffer(input_buf, None, output_buf)
150            .map_err(|err| Error::Internal {
151                message: format!("LZ4 decompression error: {}", err),
152                location: location!(),
153            })
154            .map(|_| ())
155    }
156
157    fn name(&self) -> &str {
158        "zstd"
159    }
160}
161
162#[derive(Debug, Default)]
163pub struct NoopBufferCompressor {}
164
165impl BufferCompressor for NoopBufferCompressor {
166    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
167        output_buf.extend_from_slice(input_buf);
168        Ok(())
169    }
170
171    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
172        output_buf.extend_from_slice(input_buf);
173        Ok(())
174    }
175
176    fn name(&self) -> &str {
177        "none"
178    }
179}
180
181pub struct GeneralBufferCompressor {}
182
183impl GeneralBufferCompressor {
184    pub fn get_compressor(compression_config: CompressionConfig) -> Box<dyn BufferCompressor> {
185        match compression_config.scheme {
186            // FSST has its own compression path and isn't implemented as a generic buffer compressor
187            CompressionScheme::Fsst => unimplemented!(),
188            CompressionScheme::Zstd => Box::new(ZstdBufferCompressor::new(
189                compression_config.level.unwrap_or(0),
190            )),
191            CompressionScheme::Lz4 => Box::new(Lz4BufferCompressor::default()),
192            CompressionScheme::None => Box::new(NoopBufferCompressor {}),
193        }
194    }
195}
196
197// An encoder which uses generic compression, such as zstd/lz4 to encode buffers
198#[derive(Debug)]
199pub struct CompressedBufferEncoder {
200    pub(crate) compressor: Box<dyn BufferCompressor>,
201}
202
203impl Default for CompressedBufferEncoder {
204    fn default() -> Self {
205        Self {
206            compressor: GeneralBufferCompressor::get_compressor(CompressionConfig {
207                scheme: CompressionScheme::Zstd,
208                level: Some(0),
209            }),
210        }
211    }
212}
213
214impl CompressedBufferEncoder {
215    pub fn new(compression_config: CompressionConfig) -> Self {
216        let compressor = GeneralBufferCompressor::get_compressor(compression_config);
217        Self { compressor }
218    }
219
220    pub fn from_scheme(scheme: &str) -> Result<Self> {
221        let scheme = CompressionScheme::from_str(scheme)?;
222        Ok(Self {
223            compressor: GeneralBufferCompressor::get_compressor(CompressionConfig {
224                scheme,
225                level: Some(0),
226            }),
227        })
228    }
229}
230
231impl CompressedBufferEncoder {
232    pub fn per_value_compress<T: ArrowNativeType>(
233        &self,
234        data: &[u8],
235        offsets: &[T],
236        compressed: &mut Vec<u8>,
237    ) -> Result<LanceBuffer> {
238        let mut new_offsets: Vec<T> = Vec::with_capacity(offsets.len());
239        new_offsets.push(T::from_usize(0).unwrap());
240
241        for off in offsets.windows(2) {
242            let start = off[0].as_usize();
243            let end = off[1].as_usize();
244            self.compressor.compress(&data[start..end], compressed)?;
245            new_offsets.push(T::from_usize(compressed.len()).unwrap());
246        }
247
248        Ok(LanceBuffer::reinterpret_vec(new_offsets))
249    }
250
251    pub fn per_value_decompress<T: ArrowNativeType>(
252        &self,
253        data: &[u8],
254        offsets: &[T],
255        decompressed: &mut Vec<u8>,
256    ) -> Result<LanceBuffer> {
257        let mut new_offsets: Vec<T> = Vec::with_capacity(offsets.len());
258        new_offsets.push(T::from_usize(0).unwrap());
259
260        for off in offsets.windows(2) {
261            let start = off[0].as_usize();
262            let end = off[1].as_usize();
263            self.compressor
264                .decompress(&data[start..end], decompressed)?;
265            new_offsets.push(T::from_usize(decompressed.len()).unwrap());
266        }
267
268        Ok(LanceBuffer::reinterpret_vec(new_offsets))
269    }
270}
271
272impl PerValueCompressor for CompressedBufferEncoder {
273    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
274        let data_type = data.name();
275        let mut data = data.as_variable_width().ok_or(Error::Internal {
276            message: format!(
277                "Attempt to use CompressedBufferEncoder on data of type {}",
278                data_type
279            ),
280            location: location!(),
281        })?;
282
283        let data_bytes = &data.data;
284        let mut compressed = Vec::with_capacity(data_bytes.len());
285
286        let new_offsets = match data.bits_per_offset {
287            32 => self.per_value_compress::<u32>(
288                data_bytes,
289                &data.offsets.borrow_to_typed_slice::<u32>(),
290                &mut compressed,
291            )?,
292            64 => self.per_value_compress::<u64>(
293                data_bytes,
294                &data.offsets.borrow_to_typed_slice::<u64>(),
295                &mut compressed,
296            )?,
297            _ => unreachable!(),
298        };
299
300        let compressed = PerValueDataBlock::Variable(VariableWidthBlock {
301            bits_per_offset: data.bits_per_offset,
302            data: LanceBuffer::from(compressed),
303            offsets: new_offsets,
304            num_values: data.num_values,
305            block_info: BlockInfo::new(),
306        });
307
308        let encoding = ProtobufUtils::block(self.compressor.name());
309
310        Ok((compressed, encoding))
311    }
312}
313
314impl VariablePerValueDecompressor for CompressedBufferEncoder {
315    fn decompress(&self, mut data: VariableWidthBlock) -> Result<DataBlock> {
316        let data_bytes = &data.data;
317        let mut decompressed = Vec::with_capacity(data_bytes.len() * 2);
318
319        let new_offsets = match data.bits_per_offset {
320            32 => self.per_value_decompress(
321                data_bytes,
322                &data.offsets.borrow_to_typed_slice::<u32>(),
323                &mut decompressed,
324            )?,
325            64 => self.per_value_decompress(
326                data_bytes,
327                &data.offsets.borrow_to_typed_slice::<u32>(),
328                &mut decompressed,
329            )?,
330            _ => unreachable!(),
331        };
332        Ok(DataBlock::VariableWidth(VariableWidthBlock {
333            bits_per_offset: data.bits_per_offset,
334            data: LanceBuffer::from(decompressed),
335            offsets: new_offsets,
336            num_values: data.num_values,
337            block_info: BlockInfo::new(),
338        }))
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Known scheme names parse to their matching variants.
    #[test]
    fn test_compression_scheme_from_str() {
        let expectations = [
            ("none", CompressionScheme::None),
            ("zstd", CompressionScheme::Zstd),
        ];
        for (name, expected) in expectations.iter() {
            assert_eq!(CompressionScheme::from_str(name).unwrap(), *expected);
        }
    }

    /// Unknown names are reported as errors.
    #[test]
    fn test_compression_scheme_from_str_invalid() {
        let parsed = CompressionScheme::from_str("invalid");
        assert!(parsed.is_err());
    }
}