lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
/// Fallback threshold used when deciding whether to pick RLE compression.
///
/// RLE is selected when the number of runs is smaller than this fraction of the
/// total number of values.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5_f64;
22
23use crate::{
24    buffer::LanceBuffer,
25    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
26    encodings::{
27        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
28        physical::{
29            binary::{
30                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
31                VariableDecoder, VariableEncoder,
32            },
33            bitpack::InlineBitpacking,
34            block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
35            byte_stream_split::ByteStreamSplitDecompressor,
36            constant::ConstantDecompressor,
37            fsst::{
38                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
39                FsstPerValueEncoder,
40            },
41            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
42            packed::{
43                PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
44            },
45            rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
46            value::{ValueDecompressor, ValueEncoder},
47        },
48    },
49    format::{pb, ProtobufUtils},
50    statistics::{GetStat, Stat},
51};
52
53use arrow::{array::AsArray, datatypes::UInt64Type};
54use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
55use lance_core::{
56    datatypes::{Field, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY},
57    Error, Result,
58};
59use snafu::location;
60
61/// Trait for compression algorithms that compress an entire block of data into one opaque
62/// and self-described chunk.
63///
64/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
65///
66/// This is the most general type of compression.  There are no constraints on the method
67/// of compression it is assumed that the entire block of data will be present at decompression.
68///
69/// This is the least appropriate strategy for random access because we must load the entire
70/// block to access any single value.  This should only be used for cases where random access is never
71/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
72/// mini-block chunks)
73pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
74    /// Compress the data into a single buffer
75    ///
76    /// Also returns a description of the compression that can be used to decompress
77    /// when reading the data back
78    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
79}
80
81/// A trait to pick which compression to use for given data
82///
83/// There are several different kinds of compression.
84///
85/// - Block compression is the most generic, but most difficult to use efficiently
86/// - Per-value compression results in either a fixed width data block or a variable
87///   width data block.  In other words, there is some number of bits per value.
88///   In addition, each value should be independently decompressible.
89/// - Mini-block compression results in a small block of opaque data for chunks
90///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
91///   used for narrow data types (both fixed and variable length) where we can
92///   fit many values into an 16KiB block.
93pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
94    /// Create a block compressor for the given data
95    fn create_block_compressor(
96        &self,
97        field: &Field,
98        data: &DataBlock,
99    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;
100
101    /// Create a per-value compressor for the given data
102    fn create_per_value(
103        &self,
104        field: &Field,
105        data: &DataBlock,
106    ) -> Result<Box<dyn PerValueCompressor>>;
107
108    /// Create a mini-block compressor for the given data
109    fn create_miniblock_compressor(
110        &self,
111        field: &Field,
112        data: &DataBlock,
113    ) -> Result<Box<dyn MiniBlockCompressor>>;
114}
115
/// The default compression strategy.
///
/// It carries no state; all decisions are made from the arguments passed to
/// each method of its `CompressionStrategy` implementation.
#[derive(Default, Debug)]
pub struct DefaultCompressionStrategy;
118
119impl CompressionStrategy for DefaultCompressionStrategy {
120    fn create_miniblock_compressor(
121        &self,
122        field: &Field,
123        data: &DataBlock,
124    ) -> Result<Box<dyn MiniBlockCompressor>> {
125        match data {
126            DataBlock::FixedWidth(fixed_width_data) => {
127                let is_byte_width_aligned = fixed_width_data.bits_per_value == 8
128                    || fixed_width_data.bits_per_value == 16
129                    || fixed_width_data.bits_per_value == 32
130                    || fixed_width_data.bits_per_value == 64;
131                let bit_widths = data.expect_stat(Stat::BitWidth);
132                let bit_widths = bit_widths.as_primitive::<UInt64Type>();
133                // Temporary hack to work around https://github.com/lancedb/lance/issues/3102
134                // Ideally we should still be able to bit-pack here (either to 0 or 1 bit per value)
135                let has_all_zeros = bit_widths.values().iter().any(|v| *v == 0);
136                // The minimum bit packing size is a block of 1024 values.  For very small pages the uncompressed
137                // size might be smaller than the compressed size.
138                let too_small = bit_widths.len() == 1
139                    && InlineBitpacking::min_size_bytes(bit_widths.value(0)) >= data.data_size();
140
141                if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
142                    if compression.as_str() == "none" {
143                        return Ok(Box::new(ValueEncoder::default()));
144                    }
145                }
146
147                let rle_threshold: f64 = if let Some(value) =
148                    field.metadata.get(RLE_THRESHOLD_META_KEY)
149                {
150                    value.as_str().parse().map_err(|_| {
151                        Error::invalid_input("rle threshold is not a valid float64", location!())
152                    })?
153                } else {
154                    DEFAULT_RLE_COMPRESSION_THRESHOLD
155                };
156
157                // Check if RLE would be beneficial
158                let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
159                let num_values = fixed_width_data.num_values;
160
161                // Use RLE if the run count is less than the threshold
162                if (run_count as f64) < (num_values as f64) * rle_threshold && is_byte_width_aligned
163                {
164                    if fixed_width_data.bits_per_value >= 32 {
165                        return Ok(Box::new(GeneralMiniBlockCompressor::new(
166                            Box::new(RleMiniBlockEncoder::new()),
167                            CompressionConfig::new(CompressionScheme::Lz4, None),
168                        )));
169                    }
170                    return Ok(Box::new(RleMiniBlockEncoder::new()));
171                }
172
173                if !has_all_zeros && !too_small && is_byte_width_aligned {
174                    Ok(Box::new(InlineBitpacking::new(
175                        fixed_width_data.bits_per_value,
176                    )))
177                } else {
178                    Ok(Box::new(ValueEncoder::default()))
179                }
180            }
181            DataBlock::VariableWidth(variable_width_data) => {
182                if variable_width_data.bits_per_offset == 32
183                    || variable_width_data.bits_per_offset == 64
184                {
185                    let data_size =
186                        variable_width_data.expect_single_stat::<UInt64Type>(Stat::DataSize);
187                    let max_len =
188                        variable_width_data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
189
190                    if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
191                        && data_size >= FSST_LEAST_INPUT_SIZE as u64
192                    {
193                        Ok(Box::new(FsstMiniBlockEncoder::default()))
194                    } else {
195                        Ok(Box::new(BinaryMiniBlockEncoder::default()))
196                    }
197                } else {
198                    todo!(
199                        "Implement MiniBlockCompression for VariableWidth DataBlock with {} bit offsets.",
200                        variable_width_data.bits_per_offset
201                    )
202                }
203            }
204            DataBlock::Struct(struct_data_block) => {
205                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
206                // just being cautious here.
207                if struct_data_block
208                    .children
209                    .iter()
210                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
211                {
212                    panic!("packed struct encoding currently only supports fixed-width fields.")
213                }
214                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
215            }
216            DataBlock::FixedSizeList(_) => {
217                // Ideally we would compress the list items but this creates something of a challenge.
218                // We don't want to break lists across chunks and we need to worry about inner validity
219                // layers.  If we try and use a compression scheme then it is unlikely to respect these
220                // constraints.
221                //
222                // For now, we just don't compress.  In the future, we might want to consider a more
223                // sophisticated approach.
224                Ok(Box::new(ValueEncoder::default()))
225            }
226            _ => Err(Error::NotSupported {
227                source: format!(
228                    "Mini-block compression not yet supported for block type {}",
229                    data.name()
230                )
231                .into(),
232                location: location!(),
233            }),
234        }
235    }
236
237    fn create_per_value(
238        &self,
239        _field: &Field,
240        data: &DataBlock,
241    ) -> Result<Box<dyn PerValueCompressor>> {
242        match data {
243            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
244            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
245            DataBlock::VariableWidth(variable_width) => {
246                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
247                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
248
249                // If values are very large then use block compression on a per-value basis
250                //
251                // TODO: Could maybe use median here
252                if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
253                    return Ok(Box::new(CompressedBufferEncoder::default()));
254                }
255
256                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
257                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
258                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
259
260                    let variable_compression = Box::new(VariableEncoder::default());
261
262                    if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
263                        && data_size >= FSST_LEAST_INPUT_SIZE as u64
264                    {
265                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
266                    } else {
267                        Ok(variable_compression)
268                    }
269                } else {
270                    panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
271                }
272            }
273            _ => unreachable!(
274                "Per-value compression not yet supported for block type: {}",
275                data.name()
276            ),
277        }
278    }
279
280    fn create_block_compressor(
281        &self,
282        _field: &Field,
283        data: &DataBlock,
284    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
285        // TODO: We should actually compress here!
286        match data {
287            // Currently, block compression is used for rep/def (which is fixed width) and for dictionary
288            // encoding (which could be fixed width or variable width).
289            DataBlock::FixedWidth(fixed_width) => {
290                let encoder = Box::new(ValueEncoder::default());
291                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
292                Ok((encoder, encoding))
293            }
294            DataBlock::VariableWidth(variable_width) => {
295                let encoder = Box::new(VariableEncoder::default());
296                let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
297                Ok((encoder, encoding))
298            }
299            _ => unreachable!(),
300        }
301    }
302}
303
304pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
305    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
306}
307
308pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
309    /// Decompress one or more values
310    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
311    /// The number of bits in each value
312    ///
313    /// Currently (and probably long term) this must be a multiple of 8
314    fn bits_per_value(&self) -> u64;
315}
316
317pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
318    /// Decompress one or more values
319    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
320}
321
322pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
323    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
324}
325
326pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
327    fn create_miniblock_decompressor(
328        &self,
329        description: &pb::ArrayEncoding,
330        decompression_strategy: &dyn DecompressionStrategy,
331    ) -> Result<Box<dyn MiniBlockDecompressor>>;
332
333    fn create_fixed_per_value_decompressor(
334        &self,
335        description: &pb::ArrayEncoding,
336    ) -> Result<Box<dyn FixedPerValueDecompressor>>;
337
338    fn create_variable_per_value_decompressor(
339        &self,
340        description: &pb::ArrayEncoding,
341    ) -> Result<Box<dyn VariablePerValueDecompressor>>;
342
343    fn create_block_decompressor(
344        &self,
345        description: &pb::ArrayEncoding,
346    ) -> Result<Box<dyn BlockDecompressor>>;
347}
348
/// The default decompression strategy.
///
/// It has no fields; decompressors are chosen purely from the encoding
/// description passed to each method.
#[derive(Default, Debug)]
pub struct DefaultDecompressionStrategy {}
351
352impl DecompressionStrategy for DefaultDecompressionStrategy {
353    fn create_miniblock_decompressor(
354        &self,
355        description: &pb::ArrayEncoding,
356        decompression_strategy: &dyn DecompressionStrategy,
357    ) -> Result<Box<dyn MiniBlockDecompressor>> {
358        match description.array_encoding.as_ref().unwrap() {
359            pb::array_encoding::ArrayEncoding::Flat(flat) => {
360                Ok(Box::new(ValueDecompressor::from_flat(flat)))
361            }
362            pb::array_encoding::ArrayEncoding::InlineBitpacking(description) => {
363                Ok(Box::new(InlineBitpacking::from_description(description)))
364            }
365            pb::array_encoding::ArrayEncoding::Variable(variable) => Ok(Box::new(
366                BinaryMiniBlockDecompressor::new(variable.bits_per_offset as u8),
367            )),
368            pb::array_encoding::ArrayEncoding::Fsst(description) => {
369                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
370                    description.binary.as_ref().unwrap(),
371                    decompression_strategy,
372                )?;
373                Ok(Box::new(FsstMiniBlockDecompressor::new(
374                    description,
375                    inner_decompressor,
376                )))
377            }
378            pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
379                Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
380                    description,
381                )))
382            }
383            pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
384                // In the future, we might need to do something more complex here if FSL supports
385                // compression.
386                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
387            }
388            pb::array_encoding::ArrayEncoding::Rle(rle) => {
389                Ok(Box::new(RleMiniBlockDecompressor::new(rle.bits_per_value)))
390            }
391            pb::array_encoding::ArrayEncoding::ByteStreamSplit(bss) => Ok(Box::new(
392                ByteStreamSplitDecompressor::new(bss.bits_per_value as usize),
393            )),
394            pb::array_encoding::ArrayEncoding::GeneralMiniBlock(general) => {
395                // Create inner decompressor
396                let inner_decompressor = self.create_miniblock_decompressor(
397                    general.inner.as_ref().ok_or_else(|| {
398                        Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
399                    })?,
400                    decompression_strategy,
401                )?;
402
403                // Parse compression config
404                let compression = general.compression.as_ref().ok_or_else(|| {
405                    Error::invalid_input("GeneralMiniBlock missing compression config", location!())
406                })?;
407
408                let scheme = compression.scheme.parse()?;
409
410                let compression_config = crate::encodings::physical::block::CompressionConfig::new(
411                    scheme,
412                    compression.level,
413                );
414
415                Ok(Box::new(GeneralMiniBlockDecompressor::new(
416                    inner_decompressor,
417                    compression_config,
418                )))
419            }
420            _ => todo!(),
421        }
422    }
423
424    fn create_fixed_per_value_decompressor(
425        &self,
426        description: &pb::ArrayEncoding,
427    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
428        match description.array_encoding.as_ref().unwrap() {
429            pb::array_encoding::ArrayEncoding::Flat(flat) => {
430                Ok(Box::new(ValueDecompressor::from_flat(flat)))
431            }
432            pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
433                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
434            }
435            _ => todo!("fixed-per-value decompressor for {:?}", description),
436        }
437    }
438
439    fn create_variable_per_value_decompressor(
440        &self,
441        description: &pb::ArrayEncoding,
442    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
443        match *description.array_encoding.as_ref().unwrap() {
444            pb::array_encoding::ArrayEncoding::Variable(variable) => {
445                assert!(variable.bits_per_offset < u8::MAX as u32);
446                Ok(Box::new(VariableDecoder::default()))
447            }
448            pb::array_encoding::ArrayEncoding::Fsst(ref fsst) => {
449                Ok(Box::new(FsstPerValueDecompressor::new(
450                    LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
451                    Box::new(VariableDecoder::default()),
452                )))
453            }
454            pb::array_encoding::ArrayEncoding::Block(ref block) => Ok(Box::new(
455                CompressedBufferEncoder::from_scheme(&block.scheme)?,
456            )),
457            _ => todo!("variable-per-value decompressor for {:?}", description),
458        }
459    }
460
461    fn create_block_decompressor(
462        &self,
463        description: &pb::ArrayEncoding,
464    ) -> Result<Box<dyn BlockDecompressor>> {
465        match description.array_encoding.as_ref().unwrap() {
466            pb::array_encoding::ArrayEncoding::Flat(flat) => {
467                Ok(Box::new(ValueDecompressor::from_flat(flat)))
468            }
469            pb::array_encoding::ArrayEncoding::Constant(constant) => {
470                let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
471                Ok(Box::new(ConstantDecompressor::new(scalar)))
472            }
473            pb::array_encoding::ArrayEncoding::Variable(_) => {
474                Ok(Box::new(BinaryBlockDecompressor::default()))
475            }
476            _ => todo!(),
477        }
478    }
479}