lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
19#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::InlineBitpacking;
21use crate::{
22    buffer::LanceBuffer,
23    compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24    constants::{
25        BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26    },
27    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28    encodings::{
29        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30        physical::{
31            binary::{
32                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33                VariableDecoder, VariableEncoder,
34            },
35            block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
36            byte_stream_split::{
37                should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
38            },
39            constant::ConstantDecompressor,
40            fsst::{
41                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
42                FsstPerValueEncoder,
43            },
44            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
45            packed::{
46                PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
47            },
48            rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
49            value::{ValueDecompressor, ValueEncoder},
50        },
51    },
52    format::{
53        pb21::{compressive_encoding::Compression, CompressiveEncoding},
54        ProtobufUtils21,
55    },
56    statistics::{GetStat, Stat},
57};
58
59use arrow_array::types::UInt64Type;
60use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
61use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
62use snafu::location;
63use std::str::FromStr;
64
/// Default threshold for RLE compression selection.
///
/// RLE is chosen when the run count is less than this fraction of the total number of
/// values.  This value is used when no `rle_threshold` has been configured for the field.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
68
/// Trait for compression algorithms that compress an entire block of data into one opaque
/// and self-described chunk.
///
/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
///
/// This is the most general type of compression.  There are no constraints on the method
/// of compression; it is assumed that the entire block of data will be present at decompression.
///
/// This is the least appropriate strategy for random access because we must load the entire
/// block to access any single value.  This should only be used for cases where random access is never
/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
/// mini-block chunks)
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compress the data into a single buffer
    ///
    /// Only the compressed buffer is returned.  The description of the compression, which is
    /// needed to decompress the data when reading it back, is returned alongside the
    /// compressor by [`CompressionStrategy::create_block_compressor`].
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
88
/// A trait to pick which compression to use for given data
///
/// There are several different kinds of compression.
///
/// - Block compression is the most generic, but most difficult to use efficiently
/// - Per-value compression results in either a fixed width data block or a variable
///   width data block.  In other words, there is some number of bits per value.
///   In addition, each value should be independently decompressible.
/// - Mini-block compression results in a small block of opaque data for chunks
///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
///   used for narrow data types (both fixed and variable length) where we can
///   fit many values into a 16KiB block.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Create a block compressor for the given data
    ///
    /// Returns the compressor together with the [`CompressiveEncoding`] that describes it.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Create a per-value compressor for the given data
    ///
    /// `field` is the field the data belongs to and `data` is the block to be compressed.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Create a mini-block compressor for the given data
    ///
    /// `field` is the field the data belongs to and `data` is the block to be compressed.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
123
/// The default [`CompressionStrategy`] implementation.
///
/// Compressors are selected from the statistics of the data block, the user-configured
/// parameters stored here, and any compression settings found in the field metadata.
#[derive(Debug, Default)]
pub struct DefaultCompressionStrategy {
    /// User-configured compression parameters
    params: CompressionParams,
}
129
130fn try_bss_for_mini_block(
131    data: &FixedWidthDataBlock,
132    params: &CompressionFieldParams,
133) -> Option<Box<dyn MiniBlockCompressor>> {
134    // BSS requires general compression to be effective
135    // If compression is not set or explicitly disabled, skip BSS
136    if params.compression.is_none() || params.compression.as_deref() == Some("none") {
137        return None;
138    }
139
140    let mode = params.bss.unwrap_or(BssMode::Auto);
141    // should_use_bss already checks for supported bit widths (32/64)
142    if should_use_bss(data, mode) {
143        return Some(Box::new(ByteStreamSplitEncoder::new(
144            data.bits_per_value as usize,
145        )));
146    }
147    None
148}
149
150fn try_rle_for_mini_block(
151    data: &FixedWidthDataBlock,
152    params: &CompressionFieldParams,
153) -> Option<Box<dyn MiniBlockCompressor>> {
154    let bits = data.bits_per_value;
155    if !matches!(bits, 8 | 16 | 32 | 64) {
156        return None;
157    }
158
159    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
160    let threshold = params
161        .rle_threshold
162        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
163
164    if (run_count as f64) < (data.num_values as f64) * threshold {
165        return Some(Box::new(RleMiniBlockEncoder::new()));
166    }
167    None
168}
169
170fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
171    #[cfg(feature = "bitpacking")]
172    {
173        use arrow_array::cast::AsArray;
174
175        let bits = _data.bits_per_value;
176        if !matches!(bits, 8 | 16 | 32 | 64) {
177            return None;
178        }
179
180        let bit_widths = _data.expect_stat(Stat::BitWidth);
181        let widths = bit_widths.as_primitive::<UInt64Type>();
182        let has_all_zeros = widths.values().contains(&0);
183        let too_small = widths.len() == 1
184            && InlineBitpacking::min_size_bytes(widths.value(0)) >= _data.data_size();
185
186        if !has_all_zeros && !too_small {
187            return Some(Box::new(InlineBitpacking::new(bits)));
188        }
189        None
190    }
191    #[cfg(not(feature = "bitpacking"))]
192    {
193        None
194    }
195}
196
197fn maybe_wrap_general_for_mini_block(
198    inner: Box<dyn MiniBlockCompressor>,
199    params: &CompressionFieldParams,
200) -> Result<Box<dyn MiniBlockCompressor>> {
201    match params.compression.as_deref() {
202        None | Some("none") | Some("fsst") => Ok(inner),
203        Some(raw) => {
204            let scheme = CompressionScheme::from_str(raw).map_err(|_| {
205                lance_core::Error::invalid_input(
206                    format!("Unknown compression scheme: {raw}"),
207                    location!(),
208                )
209            })?;
210            let cfg = CompressionConfig::new(scheme, params.compression_level);
211            Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
212        }
213    }
214}
215
216impl DefaultCompressionStrategy {
217    /// Create a new compression strategy with default behavior
218    pub fn new() -> Self {
219        Self::default()
220    }
221
    /// Create a new compression strategy with user-configured parameters
    ///
    /// The parameters are stored as-is and consulted when compressors are created.
    pub fn with_params(params: CompressionParams) -> Self {
        Self { params }
    }
226
227    /// Parse compression parameters from field metadata
228    fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
229        let mut params = CompressionFieldParams::default();
230
231        // Parse compression method
232        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
233            params.compression = Some(compression.clone());
234        }
235
236        // Parse compression level
237        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
238            params.compression_level = level.parse().ok();
239        }
240
241        // Parse RLE threshold
242        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
243            params.rle_threshold = threshold.parse().ok();
244        }
245
246        // Parse BSS mode
247        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
248            match BssMode::parse(bss_str) {
249                Some(mode) => params.bss = Some(mode),
250                None => {
251                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
252                }
253            }
254        }
255
256        params
257    }
258
259    fn build_fixed_width_compressor(
260        &self,
261        params: &CompressionFieldParams,
262        data: &FixedWidthDataBlock,
263    ) -> Result<Box<dyn MiniBlockCompressor>> {
264        if params.compression.as_deref() == Some("none") {
265            return Ok(Box::new(ValueEncoder::default()));
266        }
267
268        let base = try_bss_for_mini_block(data, params)
269            .or_else(|| try_rle_for_mini_block(data, params))
270            .or_else(|| try_bitpack_for_mini_block(data))
271            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
272
273        maybe_wrap_general_for_mini_block(base, params)
274    }
275
276    /// Build compressor based on parameters for variable-width data
277    fn build_variable_width_compressor(
278        &self,
279        params: &CompressionFieldParams,
280        data: &VariableWidthBlock,
281    ) -> Result<Box<dyn MiniBlockCompressor>> {
282        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
283            return Err(Error::invalid_input(
284                format!(
285                    "Variable width compression not supported for {} bit offsets",
286                    data.bits_per_offset
287                ),
288                location!(),
289            ));
290        }
291
292        // Get statistics
293        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
294        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
295
296        // 1. Check for explicit "none" compression
297        if params.compression.as_deref() == Some("none") {
298            return Ok(Box::new(BinaryMiniBlockEncoder::default()));
299        }
300
301        // 2. Check for explicit "fsst" compression
302        if params.compression.as_deref() == Some("fsst") {
303            return Ok(Box::new(FsstMiniBlockEncoder::default()));
304        }
305
306        // 3. Choose base encoder (FSST or Binary) based on data characteristics
307        let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
308            >= FSST_LEAST_INPUT_MAX_LENGTH
309            && data_size >= FSST_LEAST_INPUT_SIZE as u64
310        {
311            Box::new(FsstMiniBlockEncoder::default())
312        } else {
313            Box::new(BinaryMiniBlockEncoder::default())
314        };
315
316        // 4. Apply general compression if configured
317        if let Some(compression_scheme) = &params.compression {
318            if compression_scheme != "none" && compression_scheme != "fsst" {
319                let scheme: CompressionScheme = compression_scheme.parse()?;
320                let config = CompressionConfig::new(scheme, params.compression_level);
321                base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
322            }
323        }
324
325        Ok(base_encoder)
326    }
327}
328
329impl CompressionStrategy for DefaultCompressionStrategy {
330    fn create_miniblock_compressor(
331        &self,
332        field: &Field,
333        data: &DataBlock,
334    ) -> Result<Box<dyn MiniBlockCompressor>> {
335        let mut field_params = self
336            .params
337            .get_field_params(&field.name, &field.data_type());
338
339        // Override with field metadata if present (highest priority)
340        let metadata_params = Self::parse_field_metadata(field);
341        field_params.merge(&metadata_params);
342
343        match data {
344            DataBlock::FixedWidth(fixed_width_data) => {
345                self.build_fixed_width_compressor(&field_params, fixed_width_data)
346            }
347            DataBlock::VariableWidth(variable_width_data) => {
348                self.build_variable_width_compressor(&field_params, variable_width_data)
349            }
350            DataBlock::Struct(struct_data_block) => {
351                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
352                // just being cautious here.
353                if struct_data_block
354                    .children
355                    .iter()
356                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
357                {
358                    panic!("packed struct encoding currently only supports fixed-width fields.")
359                }
360                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
361            }
362            DataBlock::FixedSizeList(_) => {
363                // Ideally we would compress the list items but this creates something of a challenge.
364                // We don't want to break lists across chunks and we need to worry about inner validity
365                // layers.  If we try and use a compression scheme then it is unlikely to respect these
366                // constraints.
367                //
368                // For now, we just don't compress.  In the future, we might want to consider a more
369                // sophisticated approach.
370                Ok(Box::new(ValueEncoder::default()))
371            }
372            _ => Err(Error::NotSupported {
373                source: format!(
374                    "Mini-block compression not yet supported for block type {}",
375                    data.name()
376                )
377                .into(),
378                location: location!(),
379            }),
380        }
381    }
382
383    fn create_per_value(
384        &self,
385        field: &Field,
386        data: &DataBlock,
387    ) -> Result<Box<dyn PerValueCompressor>> {
388        let field_params = Self::parse_field_metadata(field);
389
390        match data {
391            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
392            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
393            DataBlock::VariableWidth(variable_width) => {
394                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
395                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
396
397                // If values are very large then use block compression on a per-value basis
398                //
399                // TODO: Could maybe use median here
400                if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
401                    return Ok(Box::new(CompressedBufferEncoder::default()));
402                }
403
404                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
405                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
406                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
407
408                    let variable_compression = Box::new(VariableEncoder::default());
409
410                    // Use FSST if explicitly requested or if data characteristics warrant it
411                    if field_params.compression.as_deref() == Some("fsst")
412                        || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
413                            && data_size >= FSST_LEAST_INPUT_SIZE as u64)
414                    {
415                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
416                    } else {
417                        Ok(variable_compression)
418                    }
419                } else {
420                    panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
421                }
422            }
423            _ => unreachable!(
424                "Per-value compression not yet supported for block type: {}",
425                data.name()
426            ),
427        }
428    }
429
430    fn create_block_compressor(
431        &self,
432        _field: &Field,
433        data: &DataBlock,
434    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
435        // TODO: We should actually compress here!
436        match data {
437            // Currently, block compression is used for rep/def (which is fixed width) and for dictionary
438            // encoding (which could be fixed width or variable width).
439            DataBlock::FixedWidth(fixed_width) => {
440                let encoder = Box::new(ValueEncoder::default());
441                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
442                Ok((encoder, encoding))
443            }
444            DataBlock::VariableWidth(variable_width) => {
445                let encoder = Box::new(VariableEncoder::default());
446                let encoding = ProtobufUtils21::variable(
447                    ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
448                    None,
449                );
450                Ok((encoder, encoding))
451            }
452            _ => unreachable!(),
453        }
454    }
455}
456
/// Decompresses data that was written by a mini-block compressor.
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress the given buffers into a data block
    ///
    /// `num_values` is presumably the number of values stored in the buffers (confirm
    /// against the mini-block reader).
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
460
/// Decompresses per-value compressed data whose compressed form is fixed-width.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// The number of bits in each value
    ///
    /// Currently (and probably long term) this must be a multiple of 8
    fn bits_per_value(&self) -> u64;
}
469
/// Decompresses per-value compressed data whose compressed form is variable-width.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
474
/// Decompresses data that was written by a [`BlockCompressor`].
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress a single buffer into a data block
    ///
    /// `num_values` is presumably the number of values stored in the buffer (confirm
    /// against callers).
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
478
/// A trait to create decompressors from encoding descriptions
///
/// There is one factory method for each kind of decompressor.  Each method receives the
/// [`CompressiveEncoding`] that describes how the data was compressed.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Create a mini-block decompressor for the given description
    ///
    /// `decompression_strategy` is the strategy to use when the description contains nested
    /// encodings that need decompressors of their own.
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Create a fixed-width per-value decompressor for the given description
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Create a variable-width per-value decompressor for the given description
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Create a block decompressor for the given description
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
501
/// The default [`DecompressionStrategy`] implementation.
///
/// It carries no state; decompressors are chosen purely from the encoding description.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
504
505impl DecompressionStrategy for DefaultDecompressionStrategy {
506    fn create_miniblock_decompressor(
507        &self,
508        description: &CompressiveEncoding,
509        decompression_strategy: &dyn DecompressionStrategy,
510    ) -> Result<Box<dyn MiniBlockDecompressor>> {
511        match description.compression.as_ref().unwrap() {
512            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
513            #[cfg(feature = "bitpacking")]
514            Compression::InlineBitpacking(description) => {
515                Ok(Box::new(InlineBitpacking::from_description(description)))
516            }
517            #[cfg(not(feature = "bitpacking"))]
518            Compression::InlineBitpacking(_) => Err(Error::NotSupported {
519                source: "this runtime was not built with bitpacking support".into(),
520                location: location!(),
521            }),
522            Compression::Variable(variable) => {
523                let Compression::Flat(offsets) = variable
524                    .offsets
525                    .as_ref()
526                    .unwrap()
527                    .compression
528                    .as_ref()
529                    .unwrap()
530                else {
531                    panic!("Variable compression only supports flat offsets")
532                };
533                Ok(Box::new(BinaryMiniBlockDecompressor::new(
534                    offsets.bits_per_value as u8,
535                )))
536            }
537            Compression::Fsst(description) => {
538                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
539                    description.values.as_ref().unwrap(),
540                    decompression_strategy,
541                )?;
542                Ok(Box::new(FsstMiniBlockDecompressor::new(
543                    description,
544                    inner_decompressor,
545                )))
546            }
547            Compression::PackedStruct(description) => Ok(Box::new(
548                PackedStructFixedWidthMiniBlockDecompressor::new(description),
549            )),
550            Compression::FixedSizeList(fsl) => {
551                // In the future, we might need to do something more complex here if FSL supports
552                // compression.
553                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
554            }
555            Compression::Rle(rle) => {
556                let Compression::Flat(values) =
557                    rle.values.as_ref().unwrap().compression.as_ref().unwrap()
558                else {
559                    panic!("RLE compression only supports flat values")
560                };
561                let Compression::Flat(run_lengths) = rle
562                    .run_lengths
563                    .as_ref()
564                    .unwrap()
565                    .compression
566                    .as_ref()
567                    .unwrap()
568                else {
569                    panic!("RLE compression only supports flat run lengths")
570                };
571                assert_eq!(
572                    run_lengths.bits_per_value, 8,
573                    "RLE compression only supports 8-bit run lengths"
574                );
575                Ok(Box::new(RleMiniBlockDecompressor::new(
576                    values.bits_per_value,
577                )))
578            }
579            Compression::ByteStreamSplit(bss) => {
580                let Compression::Flat(values) =
581                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
582                else {
583                    panic!("ByteStreamSplit compression only supports flat values")
584                };
585                Ok(Box::new(ByteStreamSplitDecompressor::new(
586                    values.bits_per_value as usize,
587                )))
588            }
589            Compression::General(general) => {
590                // Create inner decompressor
591                let inner_decompressor = self.create_miniblock_decompressor(
592                    general.values.as_ref().ok_or_else(|| {
593                        Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
594                    })?,
595                    decompression_strategy,
596                )?;
597
598                // Parse compression config
599                let compression = general.compression.as_ref().ok_or_else(|| {
600                    Error::invalid_input("GeneralMiniBlock missing compression config", location!())
601                })?;
602
603                let scheme = compression.scheme().try_into()?;
604
605                let compression_config = crate::encodings::physical::block::CompressionConfig::new(
606                    scheme,
607                    compression.level,
608                );
609
610                Ok(Box::new(GeneralMiniBlockDecompressor::new(
611                    inner_decompressor,
612                    compression_config,
613                )))
614            }
615            _ => todo!(),
616        }
617    }
618
619    fn create_fixed_per_value_decompressor(
620        &self,
621        description: &CompressiveEncoding,
622    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
623        match description.compression.as_ref().unwrap() {
624            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
625            Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
626            _ => todo!("fixed-per-value decompressor for {:?}", description),
627        }
628    }
629
630    fn create_variable_per_value_decompressor(
631        &self,
632        description: &CompressiveEncoding,
633    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
634        match description.compression.as_ref().unwrap() {
635            Compression::Variable(variable) => {
636                let Compression::Flat(offsets) = variable
637                    .offsets
638                    .as_ref()
639                    .unwrap()
640                    .compression
641                    .as_ref()
642                    .unwrap()
643                else {
644                    panic!("Variable compression only supports flat offsets")
645                };
646                assert!(offsets.bits_per_value < u8::MAX as u64);
647                Ok(Box::new(VariableDecoder::default()))
648            }
649            Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
650                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
651                Box::new(VariableDecoder::default()),
652            ))),
653            Compression::General(ref general) => {
654                Ok(Box::new(CompressedBufferEncoder::from_scheme(
655                    general.compression.as_ref().expect_ok()?.scheme(),
656                )?))
657            }
658            _ => todo!("variable-per-value decompressor for {:?}", description),
659        }
660    }
661
662    fn create_block_decompressor(
663        &self,
664        description: &CompressiveEncoding,
665    ) -> Result<Box<dyn BlockDecompressor>> {
666        match description.compression.as_ref().unwrap() {
667            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
668            Compression::Constant(constant) => {
669                let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
670                Ok(Box::new(ConstantDecompressor::new(scalar)))
671            }
672            Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
673            _ => todo!(),
674        }
675    }
676}
677
678#[cfg(test)]
679mod tests {
680    use super::*;
681    use crate::buffer::LanceBuffer;
682    use crate::data::{BlockInfo, DataBlock};
683    use arrow_schema::{DataType, Field as ArrowField};
684    use std::collections::HashMap;
685
686    fn create_test_field(name: &str, data_type: DataType) -> Field {
687        let arrow_field = ArrowField::new(name, data_type, true);
688        let mut field = Field::try_from(&arrow_field).unwrap();
689        field.id = -1;
690        field
691    }
692
693    fn create_fixed_width_block_with_stats(
694        bits_per_value: u64,
695        num_values: u64,
696        run_count: u64,
697    ) -> DataBlock {
698        // Create varied data to avoid low entropy
699        let bytes_per_value = (bits_per_value / 8) as usize;
700        let total_bytes = bytes_per_value * num_values as usize;
701        let mut data = vec![0u8; total_bytes];
702
703        // Create data with specified run count
704        let values_per_run = (num_values / run_count).max(1);
705        let mut run_value = 0u8;
706
707        for i in 0..num_values as usize {
708            if i % values_per_run as usize == 0 {
709                run_value = run_value.wrapping_add(17); // Use prime to get varied values
710            }
711            // Fill all bytes of the value to create high entropy
712            for j in 0..bytes_per_value {
713                let byte_offset = i * bytes_per_value + j;
714                if byte_offset < data.len() {
715                    data[byte_offset] = run_value.wrapping_add(j as u8);
716                }
717            }
718        }
719
720        let mut block = FixedWidthDataBlock {
721            bits_per_value,
722            data: LanceBuffer::reinterpret_vec(data),
723            num_values,
724            block_info: BlockInfo::default(),
725        };
726
727        // Compute all statistics including BytePositionEntropy
728        use crate::statistics::ComputeStat;
729        block.compute_stat();
730
731        DataBlock::FixedWidth(block)
732    }
733
734    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
735        // Create data with some variety to avoid always triggering BSS
736        let bytes_per_value = (bits_per_value / 8) as usize;
737        let total_bytes = bytes_per_value * num_values as usize;
738        let mut data = vec![0u8; total_bytes];
739
740        // Add some variation to the data to make it more realistic
741        for i in 0..num_values as usize {
742            let byte_offset = i * bytes_per_value;
743            if byte_offset < data.len() {
744                data[byte_offset] = (i % 256) as u8;
745            }
746        }
747
748        let mut block = FixedWidthDataBlock {
749            bits_per_value,
750            data: LanceBuffer::reinterpret_vec(data),
751            num_values,
752            block_info: BlockInfo::default(),
753        };
754
755        // Compute all statistics including BytePositionEntropy
756        use crate::statistics::ComputeStat;
757        block.compute_stat();
758
759        DataBlock::FixedWidth(block)
760    }
761
762    #[test]
763    fn test_parameter_based_compression() {
764        let mut params = CompressionParams::new();
765
766        // Configure RLE for ID columns with BSS explicitly disabled
767        params.columns.insert(
768            "*_id".to_string(),
769            CompressionFieldParams {
770                rle_threshold: Some(0.3),
771                compression: Some("lz4".to_string()),
772                compression_level: None,
773                bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE
774            },
775        );
776
777        let strategy = DefaultCompressionStrategy::with_params(params);
778        let field = create_test_field("user_id", DataType::Int32);
779
780        // Create data with low run count for RLE
781        // Use create_fixed_width_block_with_stats which properly sets run count
782        let data = create_fixed_width_block_with_stats(32, 1000, 100); // 100 runs out of 1000 values
783
784        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
785        // Should use RLE due to low threshold (0.3) and low run count (100/1000 = 0.1)
786        let debug_str = format!("{:?}", compressor);
787
788        // The compressor should be RLE wrapped in general compression
789        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
790        assert!(debug_str.contains("RleMiniBlockEncoder"));
791    }
792
793    #[test]
794    fn test_type_level_parameters() {
795        let mut params = CompressionParams::new();
796
797        // Configure all Int32 to use specific settings
798        params.types.insert(
799            "Int32".to_string(),
800            CompressionFieldParams {
801                rle_threshold: Some(0.1), // Very low threshold
802                compression: Some("zstd".to_string()),
803                compression_level: Some(3),
804                bss: Some(BssMode::Off), // Disable BSS to test RLE
805            },
806        );
807
808        let strategy = DefaultCompressionStrategy::with_params(params);
809        let field = create_test_field("some_column", DataType::Int32);
810        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
811        let data = create_fixed_width_block_with_stats(32, 1000, 50);
812
813        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
814        // Should use RLE due to very low threshold
815        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
816    }
817
818    #[test]
819    fn test_none_compression() {
820        let mut params = CompressionParams::new();
821
822        // Disable compression for embeddings
823        params.columns.insert(
824            "embeddings".to_string(),
825            CompressionFieldParams {
826                compression: Some("none".to_string()),
827                ..Default::default()
828            },
829        );
830
831        let strategy = DefaultCompressionStrategy::with_params(params);
832        let field = create_test_field("embeddings", DataType::Float32);
833        let data = create_fixed_width_block(32, 1000);
834
835        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
836        // Should use ValueEncoder (no compression)
837        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
838    }
839
840    #[test]
841    fn test_parameter_merge_priority() {
842        let mut params = CompressionParams::new();
843
844        // Set type-level
845        params.types.insert(
846            "Int32".to_string(),
847            CompressionFieldParams {
848                rle_threshold: Some(0.5),
849                compression: Some("lz4".to_string()),
850                ..Default::default()
851            },
852        );
853
854        // Set column-level (highest priority)
855        params.columns.insert(
856            "user_id".to_string(),
857            CompressionFieldParams {
858                rle_threshold: Some(0.2),
859                compression: Some("zstd".to_string()),
860                compression_level: Some(6),
861                bss: None,
862            },
863        );
864
865        let strategy = DefaultCompressionStrategy::with_params(params);
866
867        // Get merged params
868        let merged = strategy
869            .params
870            .get_field_params("user_id", &DataType::Int32);
871
872        // Column params should override type params
873        assert_eq!(merged.rle_threshold, Some(0.2));
874        assert_eq!(merged.compression, Some("zstd".to_string()));
875        assert_eq!(merged.compression_level, Some(6));
876
877        // Test field with only type params
878        let merged = strategy
879            .params
880            .get_field_params("other_field", &DataType::Int32);
881        assert_eq!(merged.rle_threshold, Some(0.5));
882        assert_eq!(merged.compression, Some("lz4".to_string()));
883        assert_eq!(merged.compression_level, None);
884    }
885
886    #[test]
887    fn test_pattern_matching() {
888        let mut params = CompressionParams::new();
889
890        // Configure pattern for log files
891        params.columns.insert(
892            "log_*".to_string(),
893            CompressionFieldParams {
894                compression: Some("zstd".to_string()),
895                compression_level: Some(6),
896                ..Default::default()
897            },
898        );
899
900        let strategy = DefaultCompressionStrategy::with_params(params);
901
902        // Should match pattern
903        let merged = strategy
904            .params
905            .get_field_params("log_messages", &DataType::Utf8);
906        assert_eq!(merged.compression, Some("zstd".to_string()));
907        assert_eq!(merged.compression_level, Some(6));
908
909        // Should not match
910        let merged = strategy
911            .params
912            .get_field_params("messages_log", &DataType::Utf8);
913        assert_eq!(merged.compression, None);
914    }
915
916    #[test]
917    fn test_legacy_metadata_support() {
918        let params = CompressionParams::new();
919        let strategy = DefaultCompressionStrategy::with_params(params);
920
921        // Test field with "none" compression metadata
922        let mut metadata = HashMap::new();
923        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
924        let mut field = create_test_field("some_column", DataType::Int32);
925        field.metadata = metadata;
926
927        let data = create_fixed_width_block(32, 1000);
928        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
929
930        // Should respect metadata and use ValueEncoder
931        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
932    }
933
934    #[test]
935    fn test_default_behavior() {
936        // Empty params should fall back to default behavior
937        let params = CompressionParams::new();
938        let strategy = DefaultCompressionStrategy::with_params(params);
939
940        let field = create_test_field("random_column", DataType::Int32);
941        // Create data with high run count that won't trigger RLE (600 runs for 1000 values = 0.6 ratio)
942        let data = create_fixed_width_block_with_stats(32, 1000, 600);
943
944        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
945        // Should use default strategy's decision
946        let debug_str = format!("{:?}", compressor);
947        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
948    }
949
950    #[test]
951    fn test_field_metadata_compression() {
952        let params = CompressionParams::new();
953        let strategy = DefaultCompressionStrategy::with_params(params);
954
955        // Test field with compression metadata
956        let mut metadata = HashMap::new();
957        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
958        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
959        let mut field = create_test_field("test_column", DataType::Int32);
960        field.metadata = metadata;
961
962        let data = create_fixed_width_block(32, 1000);
963        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
964
965        // Should use zstd with level 6
966        let debug_str = format!("{:?}", compressor);
967        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
968    }
969
970    #[test]
971    fn test_field_metadata_rle_threshold() {
972        let params = CompressionParams::new();
973        let strategy = DefaultCompressionStrategy::with_params(params);
974
975        // Test field with RLE threshold metadata
976        let mut metadata = HashMap::new();
977        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
978        metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); // Disable BSS to test RLE
979        let mut field = create_test_field("test_column", DataType::Int32);
980        field.metadata = metadata;
981
982        // Create data with low run count (e.g., 100 runs for 1000 values = 0.1 ratio)
983        // This ensures run_count (100) < num_values * threshold (1000 * 0.8 = 800)
984        let data = create_fixed_width_block_with_stats(32, 1000, 100);
985
986        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
987
988        // Should use RLE because run_count (100) < num_values * threshold (800)
989        let debug_str = format!("{:?}", compressor);
990        assert!(debug_str.contains("RleMiniBlockEncoder"));
991    }
992
993    #[test]
994    fn test_field_metadata_override_params() {
995        // Set up params with one configuration
996        let mut params = CompressionParams::new();
997        params.columns.insert(
998            "test_column".to_string(),
999            CompressionFieldParams {
1000                rle_threshold: Some(0.3),
1001                compression: Some("lz4".to_string()),
1002                compression_level: None,
1003                bss: None,
1004            },
1005        );
1006
1007        let strategy = DefaultCompressionStrategy::with_params(params);
1008
1009        // Field metadata should override params
1010        let mut metadata = HashMap::new();
1011        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1012        let mut field = create_test_field("test_column", DataType::Int32);
1013        field.metadata = metadata;
1014
1015        let data = create_fixed_width_block(32, 1000);
1016        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1017
1018        // Should use none compression (from metadata) instead of lz4 (from params)
1019        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1020    }
1021
1022    #[test]
1023    fn test_field_metadata_mixed_configuration() {
1024        // Configure type-level params
1025        let mut params = CompressionParams::new();
1026        params.types.insert(
1027            "Int32".to_string(),
1028            CompressionFieldParams {
1029                rle_threshold: Some(0.5),
1030                compression: Some("lz4".to_string()),
1031                ..Default::default()
1032            },
1033        );
1034
1035        let strategy = DefaultCompressionStrategy::with_params(params);
1036
1037        // Field metadata provides partial override
1038        let mut metadata = HashMap::new();
1039        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1040        let mut field = create_test_field("test_column", DataType::Int32);
1041        field.metadata = metadata;
1042
1043        let data = create_fixed_width_block(32, 1000);
1044        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1045
1046        // Should use lz4 (from type params) with level 3 (from metadata)
1047        let debug_str = format!("{:?}", compressor);
1048        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1049    }
1050
1051    #[test]
1052    fn test_bss_field_metadata() {
1053        let params = CompressionParams::new();
1054        let strategy = DefaultCompressionStrategy::with_params(params);
1055
1056        // Test BSS "on" mode with compression enabled (BSS requires compression to be effective)
1057        let mut metadata = HashMap::new();
1058        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1059        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1060        let arrow_field =
1061            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1062        let field = Field::try_from(&arrow_field).unwrap();
1063
1064        // Create float data
1065        let data = create_fixed_width_block(32, 100);
1066
1067        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1068        let debug_str = format!("{:?}", compressor);
1069        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1070    }
1071
1072    #[test]
1073    fn test_bss_with_compression() {
1074        let params = CompressionParams::new();
1075        let strategy = DefaultCompressionStrategy::with_params(params);
1076
1077        // Test BSS with LZ4 compression
1078        let mut metadata = HashMap::new();
1079        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1080        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1081        let arrow_field =
1082            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1083        let field = Field::try_from(&arrow_field).unwrap();
1084
1085        // Create double data
1086        let data = create_fixed_width_block(64, 100);
1087
1088        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1089        let debug_str = format!("{:?}", compressor);
1090        // Should have BSS wrapped in general compression
1091        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1092        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1093    }
1094}