lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
19#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::InlineBitpacking;
21use crate::{
22    buffer::LanceBuffer,
23    compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24    constants::{
25        BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26    },
27    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28    encodings::{
29        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30        physical::{
31            binary::{
32                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33                VariableDecoder, VariableEncoder,
34            },
35            block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
36            byte_stream_split::{
37                should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
38            },
39            constant::ConstantDecompressor,
40            fsst::{
41                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
42                FsstPerValueEncoder,
43            },
44            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
45            packed::{
46                PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
47            },
48            rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
49            value::{ValueDecompressor, ValueEncoder},
50        },
51    },
52    format::{
53        pb21::{compressive_encoding::Compression, CompressiveEncoding},
54        ProtobufUtils21,
55    },
56    statistics::{GetStat, Stat},
57};
58
59use arrow_array::types::UInt64Type;
60use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
61use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
62use snafu::location;
63use std::str::FromStr;
64
65/// Default threshold for RLE compression selection.
66/// RLE is chosen when the run count is less than this fraction of total values.
67const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
68
69/// Trait for compression algorithms that compress an entire block of data into one opaque
70/// and self-described chunk.
71///
72/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
73///
74/// This is the most general type of compression.  There are no constraints on the method
75/// of compression it is assumed that the entire block of data will be present at decompression.
76///
77/// This is the least appropriate strategy for random access because we must load the entire
78/// block to access any single value.  This should only be used for cases where random access is never
79/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
80/// mini-block chunks)
81pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
82    /// Compress the data into a single buffer
83    ///
84    /// Also returns a description of the compression that can be used to decompress
85    /// when reading the data back
86    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
87}
88
89/// A trait to pick which compression to use for given data
90///
91/// There are several different kinds of compression.
92///
93/// - Block compression is the most generic, but most difficult to use efficiently
94/// - Per-value compression results in either a fixed width data block or a variable
95///   width data block.  In other words, there is some number of bits per value.
96///   In addition, each value should be independently decompressible.
97/// - Mini-block compression results in a small block of opaque data for chunks
98///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
99///   used for narrow data types (both fixed and variable length) where we can
100///   fit many values into an 16KiB block.
101pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
102    /// Create a block compressor for the given data
103    fn create_block_compressor(
104        &self,
105        field: &Field,
106        data: &DataBlock,
107    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;
108
109    /// Create a per-value compressor for the given data
110    fn create_per_value(
111        &self,
112        field: &Field,
113        data: &DataBlock,
114    ) -> Result<Box<dyn PerValueCompressor>>;
115
116    /// Create a mini-block compressor for the given data
117    fn create_miniblock_compressor(
118        &self,
119        field: &Field,
120        data: &DataBlock,
121    ) -> Result<Box<dyn MiniBlockCompressor>>;
122}
123
124#[derive(Debug, Default)]
125pub struct DefaultCompressionStrategy {
126    /// User-configured compression parameters
127    params: CompressionParams,
128}
129
130fn try_bss_for_mini_block(
131    data: &FixedWidthDataBlock,
132    params: &CompressionFieldParams,
133) -> Option<Box<dyn MiniBlockCompressor>> {
134    // BSS requires general compression to be effective
135    // If compression is not set or explicitly disabled, skip BSS
136    if params.compression.is_none() || params.compression.as_deref() == Some("none") {
137        return None;
138    }
139
140    let mode = params.bss.unwrap_or(BssMode::Auto);
141    // should_use_bss already checks for supported bit widths (32/64)
142    if should_use_bss(data, mode) {
143        return Some(Box::new(ByteStreamSplitEncoder::new(
144            data.bits_per_value as usize,
145        )));
146    }
147    None
148}
149
150fn try_rle_for_mini_block(
151    data: &FixedWidthDataBlock,
152    params: &CompressionFieldParams,
153) -> Option<Box<dyn MiniBlockCompressor>> {
154    let bits = data.bits_per_value;
155    if !matches!(bits, 8 | 16 | 32 | 64) {
156        return None;
157    }
158
159    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
160    let threshold = params
161        .rle_threshold
162        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
163
164    if (run_count as f64) < (data.num_values as f64) * threshold {
165        return Some(Box::new(RleMiniBlockEncoder::new()));
166    }
167    None
168}
169
170fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
171    #[cfg(feature = "bitpacking")]
172    {
173        use arrow_array::cast::AsArray;
174
175        let bits = _data.bits_per_value;
176        if !matches!(bits, 8 | 16 | 32 | 64) {
177            return None;
178        }
179
180        let bit_widths = _data.expect_stat(Stat::BitWidth);
181        let widths = bit_widths.as_primitive::<UInt64Type>();
182        let too_small = widths.len() == 1
183            && InlineBitpacking::min_size_bytes(widths.value(0)) >= _data.data_size();
184
185        if !too_small {
186            return Some(Box::new(InlineBitpacking::new(bits)));
187        }
188        None
189    }
190    #[cfg(not(feature = "bitpacking"))]
191    {
192        None
193    }
194}
195
196fn maybe_wrap_general_for_mini_block(
197    inner: Box<dyn MiniBlockCompressor>,
198    params: &CompressionFieldParams,
199) -> Result<Box<dyn MiniBlockCompressor>> {
200    match params.compression.as_deref() {
201        None | Some("none") | Some("fsst") => Ok(inner),
202        Some(raw) => {
203            let scheme = CompressionScheme::from_str(raw).map_err(|_| {
204                lance_core::Error::invalid_input(
205                    format!("Unknown compression scheme: {raw}"),
206                    location!(),
207                )
208            })?;
209            let cfg = CompressionConfig::new(scheme, params.compression_level);
210            Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
211        }
212    }
213}
214
215impl DefaultCompressionStrategy {
216    /// Create a new compression strategy with default behavior
217    pub fn new() -> Self {
218        Self::default()
219    }
220
221    /// Create a new compression strategy with user-configured parameters
222    pub fn with_params(params: CompressionParams) -> Self {
223        Self { params }
224    }
225
226    /// Parse compression parameters from field metadata
227    fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
228        let mut params = CompressionFieldParams::default();
229
230        // Parse compression method
231        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
232            params.compression = Some(compression.clone());
233        }
234
235        // Parse compression level
236        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
237            params.compression_level = level.parse().ok();
238        }
239
240        // Parse RLE threshold
241        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
242            params.rle_threshold = threshold.parse().ok();
243        }
244
245        // Parse BSS mode
246        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
247            match BssMode::parse(bss_str) {
248                Some(mode) => params.bss = Some(mode),
249                None => {
250                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
251                }
252            }
253        }
254
255        params
256    }
257
258    fn build_fixed_width_compressor(
259        &self,
260        params: &CompressionFieldParams,
261        data: &FixedWidthDataBlock,
262    ) -> Result<Box<dyn MiniBlockCompressor>> {
263        if params.compression.as_deref() == Some("none") {
264            return Ok(Box::new(ValueEncoder::default()));
265        }
266
267        let base = try_bss_for_mini_block(data, params)
268            .or_else(|| try_rle_for_mini_block(data, params))
269            .or_else(|| try_bitpack_for_mini_block(data))
270            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
271
272        maybe_wrap_general_for_mini_block(base, params)
273    }
274
275    /// Build compressor based on parameters for variable-width data
276    fn build_variable_width_compressor(
277        &self,
278        params: &CompressionFieldParams,
279        data: &VariableWidthBlock,
280    ) -> Result<Box<dyn MiniBlockCompressor>> {
281        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
282            return Err(Error::invalid_input(
283                format!(
284                    "Variable width compression not supported for {} bit offsets",
285                    data.bits_per_offset
286                ),
287                location!(),
288            ));
289        }
290
291        // Get statistics
292        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
293        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
294
295        // 1. Check for explicit "none" compression
296        if params.compression.as_deref() == Some("none") {
297            return Ok(Box::new(BinaryMiniBlockEncoder::default()));
298        }
299
300        // 2. Check for explicit "fsst" compression
301        if params.compression.as_deref() == Some("fsst") {
302            return Ok(Box::new(FsstMiniBlockEncoder::default()));
303        }
304
305        // 3. Choose base encoder (FSST or Binary) based on data characteristics
306        let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
307            >= FSST_LEAST_INPUT_MAX_LENGTH
308            && data_size >= FSST_LEAST_INPUT_SIZE as u64
309        {
310            Box::new(FsstMiniBlockEncoder::default())
311        } else {
312            Box::new(BinaryMiniBlockEncoder::default())
313        };
314
315        // 4. Apply general compression if configured
316        if let Some(compression_scheme) = &params.compression {
317            if compression_scheme != "none" && compression_scheme != "fsst" {
318                let scheme: CompressionScheme = compression_scheme.parse()?;
319                let config = CompressionConfig::new(scheme, params.compression_level);
320                base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
321            }
322        }
323
324        Ok(base_encoder)
325    }
326}
327
328impl CompressionStrategy for DefaultCompressionStrategy {
329    fn create_miniblock_compressor(
330        &self,
331        field: &Field,
332        data: &DataBlock,
333    ) -> Result<Box<dyn MiniBlockCompressor>> {
334        let mut field_params = self
335            .params
336            .get_field_params(&field.name, &field.data_type());
337
338        // Override with field metadata if present (highest priority)
339        let metadata_params = Self::parse_field_metadata(field);
340        field_params.merge(&metadata_params);
341
342        match data {
343            DataBlock::FixedWidth(fixed_width_data) => {
344                self.build_fixed_width_compressor(&field_params, fixed_width_data)
345            }
346            DataBlock::VariableWidth(variable_width_data) => {
347                self.build_variable_width_compressor(&field_params, variable_width_data)
348            }
349            DataBlock::Struct(struct_data_block) => {
350                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
351                // just being cautious here.
352                if struct_data_block
353                    .children
354                    .iter()
355                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
356                {
357                    panic!("packed struct encoding currently only supports fixed-width fields.")
358                }
359                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
360            }
361            DataBlock::FixedSizeList(_) => {
362                // Ideally we would compress the list items but this creates something of a challenge.
363                // We don't want to break lists across chunks and we need to worry about inner validity
364                // layers.  If we try and use a compression scheme then it is unlikely to respect these
365                // constraints.
366                //
367                // For now, we just don't compress.  In the future, we might want to consider a more
368                // sophisticated approach.
369                Ok(Box::new(ValueEncoder::default()))
370            }
371            _ => Err(Error::NotSupported {
372                source: format!(
373                    "Mini-block compression not yet supported for block type {}",
374                    data.name()
375                )
376                .into(),
377                location: location!(),
378            }),
379        }
380    }
381
382    fn create_per_value(
383        &self,
384        field: &Field,
385        data: &DataBlock,
386    ) -> Result<Box<dyn PerValueCompressor>> {
387        let field_params = Self::parse_field_metadata(field);
388
389        match data {
390            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
391            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
392            DataBlock::VariableWidth(variable_width) => {
393                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
394                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
395
396                // If values are very large then use block compression on a per-value basis
397                //
398                // TODO: Could maybe use median here
399                if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
400                    return Ok(Box::new(CompressedBufferEncoder::default()));
401                }
402
403                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
404                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
405                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
406
407                    let variable_compression = Box::new(VariableEncoder::default());
408
409                    // Use FSST if explicitly requested or if data characteristics warrant it
410                    if field_params.compression.as_deref() == Some("fsst")
411                        || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
412                            && data_size >= FSST_LEAST_INPUT_SIZE as u64)
413                    {
414                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
415                    } else {
416                        Ok(variable_compression)
417                    }
418                } else {
419                    panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
420                }
421            }
422            _ => unreachable!(
423                "Per-value compression not yet supported for block type: {}",
424                data.name()
425            ),
426        }
427    }
428
429    fn create_block_compressor(
430        &self,
431        _field: &Field,
432        data: &DataBlock,
433    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
434        // TODO: We should actually compress here!
435        match data {
436            // Currently, block compression is used for rep/def (which is fixed width) and for dictionary
437            // encoding (which could be fixed width or variable width).
438            DataBlock::FixedWidth(fixed_width) => {
439                let encoder = Box::new(ValueEncoder::default());
440                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
441                Ok((encoder, encoding))
442            }
443            DataBlock::VariableWidth(variable_width) => {
444                let encoder = Box::new(VariableEncoder::default());
445                let encoding = ProtobufUtils21::variable(
446                    ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
447                    None,
448                );
449                Ok((encoder, encoding))
450            }
451            _ => unreachable!(),
452        }
453    }
454}
455
456pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
457    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
458}
459
460pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
461    /// Decompress one or more values
462    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
463    /// The number of bits in each value
464    ///
465    /// Currently (and probably long term) this must be a multiple of 8
466    fn bits_per_value(&self) -> u64;
467}
468
469pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
470    /// Decompress one or more values
471    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
472}
473
474pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
475    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
476}
477
478pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
479    fn create_miniblock_decompressor(
480        &self,
481        description: &CompressiveEncoding,
482        decompression_strategy: &dyn DecompressionStrategy,
483    ) -> Result<Box<dyn MiniBlockDecompressor>>;
484
485    fn create_fixed_per_value_decompressor(
486        &self,
487        description: &CompressiveEncoding,
488    ) -> Result<Box<dyn FixedPerValueDecompressor>>;
489
490    fn create_variable_per_value_decompressor(
491        &self,
492        description: &CompressiveEncoding,
493    ) -> Result<Box<dyn VariablePerValueDecompressor>>;
494
495    fn create_block_decompressor(
496        &self,
497        description: &CompressiveEncoding,
498    ) -> Result<Box<dyn BlockDecompressor>>;
499}
500
/// The stock [`DecompressionStrategy`]; it carries no state of its own.
#[derive(Default, Debug)]
pub struct DefaultDecompressionStrategy {}
503
504impl DecompressionStrategy for DefaultDecompressionStrategy {
505    fn create_miniblock_decompressor(
506        &self,
507        description: &CompressiveEncoding,
508        decompression_strategy: &dyn DecompressionStrategy,
509    ) -> Result<Box<dyn MiniBlockDecompressor>> {
510        match description.compression.as_ref().unwrap() {
511            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
512            #[cfg(feature = "bitpacking")]
513            Compression::InlineBitpacking(description) => {
514                Ok(Box::new(InlineBitpacking::from_description(description)))
515            }
516            #[cfg(not(feature = "bitpacking"))]
517            Compression::InlineBitpacking(_) => Err(Error::NotSupported {
518                source: "this runtime was not built with bitpacking support".into(),
519                location: location!(),
520            }),
521            Compression::Variable(variable) => {
522                let Compression::Flat(offsets) = variable
523                    .offsets
524                    .as_ref()
525                    .unwrap()
526                    .compression
527                    .as_ref()
528                    .unwrap()
529                else {
530                    panic!("Variable compression only supports flat offsets")
531                };
532                Ok(Box::new(BinaryMiniBlockDecompressor::new(
533                    offsets.bits_per_value as u8,
534                )))
535            }
536            Compression::Fsst(description) => {
537                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
538                    description.values.as_ref().unwrap(),
539                    decompression_strategy,
540                )?;
541                Ok(Box::new(FsstMiniBlockDecompressor::new(
542                    description,
543                    inner_decompressor,
544                )))
545            }
546            Compression::PackedStruct(description) => Ok(Box::new(
547                PackedStructFixedWidthMiniBlockDecompressor::new(description),
548            )),
549            Compression::FixedSizeList(fsl) => {
550                // In the future, we might need to do something more complex here if FSL supports
551                // compression.
552                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
553            }
554            Compression::Rle(rle) => {
555                let Compression::Flat(values) =
556                    rle.values.as_ref().unwrap().compression.as_ref().unwrap()
557                else {
558                    panic!("RLE compression only supports flat values")
559                };
560                let Compression::Flat(run_lengths) = rle
561                    .run_lengths
562                    .as_ref()
563                    .unwrap()
564                    .compression
565                    .as_ref()
566                    .unwrap()
567                else {
568                    panic!("RLE compression only supports flat run lengths")
569                };
570                assert_eq!(
571                    run_lengths.bits_per_value, 8,
572                    "RLE compression only supports 8-bit run lengths"
573                );
574                Ok(Box::new(RleMiniBlockDecompressor::new(
575                    values.bits_per_value,
576                )))
577            }
578            Compression::ByteStreamSplit(bss) => {
579                let Compression::Flat(values) =
580                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
581                else {
582                    panic!("ByteStreamSplit compression only supports flat values")
583                };
584                Ok(Box::new(ByteStreamSplitDecompressor::new(
585                    values.bits_per_value as usize,
586                )))
587            }
588            Compression::General(general) => {
589                // Create inner decompressor
590                let inner_decompressor = self.create_miniblock_decompressor(
591                    general.values.as_ref().ok_or_else(|| {
592                        Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
593                    })?,
594                    decompression_strategy,
595                )?;
596
597                // Parse compression config
598                let compression = general.compression.as_ref().ok_or_else(|| {
599                    Error::invalid_input("GeneralMiniBlock missing compression config", location!())
600                })?;
601
602                let scheme = compression.scheme().try_into()?;
603
604                let compression_config = crate::encodings::physical::block::CompressionConfig::new(
605                    scheme,
606                    compression.level,
607                );
608
609                Ok(Box::new(GeneralMiniBlockDecompressor::new(
610                    inner_decompressor,
611                    compression_config,
612                )))
613            }
614            _ => todo!(),
615        }
616    }
617
618    fn create_fixed_per_value_decompressor(
619        &self,
620        description: &CompressiveEncoding,
621    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
622        match description.compression.as_ref().unwrap() {
623            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
624            Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
625            _ => todo!("fixed-per-value decompressor for {:?}", description),
626        }
627    }
628
629    fn create_variable_per_value_decompressor(
630        &self,
631        description: &CompressiveEncoding,
632    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
633        match description.compression.as_ref().unwrap() {
634            Compression::Variable(variable) => {
635                let Compression::Flat(offsets) = variable
636                    .offsets
637                    .as_ref()
638                    .unwrap()
639                    .compression
640                    .as_ref()
641                    .unwrap()
642                else {
643                    panic!("Variable compression only supports flat offsets")
644                };
645                assert!(offsets.bits_per_value < u8::MAX as u64);
646                Ok(Box::new(VariableDecoder::default()))
647            }
648            Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
649                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
650                Box::new(VariableDecoder::default()),
651            ))),
652            Compression::General(ref general) => {
653                Ok(Box::new(CompressedBufferEncoder::from_scheme(
654                    general.compression.as_ref().expect_ok()?.scheme(),
655                )?))
656            }
657            _ => todo!("variable-per-value decompressor for {:?}", description),
658        }
659    }
660
661    fn create_block_decompressor(
662        &self,
663        description: &CompressiveEncoding,
664    ) -> Result<Box<dyn BlockDecompressor>> {
665        match description.compression.as_ref().unwrap() {
666            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
667            Compression::Constant(constant) => {
668                let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
669                Ok(Box::new(ConstantDecompressor::new(scalar)))
670            }
671            Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
672            _ => todo!(),
673        }
674    }
675}
676
677#[cfg(test)]
678mod tests {
679    use super::*;
680    use crate::buffer::LanceBuffer;
681    use crate::data::{BlockInfo, DataBlock};
682    use arrow_schema::{DataType, Field as ArrowField};
683    use std::collections::HashMap;
684
685    fn create_test_field(name: &str, data_type: DataType) -> Field {
686        let arrow_field = ArrowField::new(name, data_type, true);
687        let mut field = Field::try_from(&arrow_field).unwrap();
688        field.id = -1;
689        field
690    }
691
692    fn create_fixed_width_block_with_stats(
693        bits_per_value: u64,
694        num_values: u64,
695        run_count: u64,
696    ) -> DataBlock {
697        // Create varied data to avoid low entropy
698        let bytes_per_value = (bits_per_value / 8) as usize;
699        let total_bytes = bytes_per_value * num_values as usize;
700        let mut data = vec![0u8; total_bytes];
701
702        // Create data with specified run count
703        let values_per_run = (num_values / run_count).max(1);
704        let mut run_value = 0u8;
705
706        for i in 0..num_values as usize {
707            if i % values_per_run as usize == 0 {
708                run_value = run_value.wrapping_add(17); // Use prime to get varied values
709            }
710            // Fill all bytes of the value to create high entropy
711            for j in 0..bytes_per_value {
712                let byte_offset = i * bytes_per_value + j;
713                if byte_offset < data.len() {
714                    data[byte_offset] = run_value.wrapping_add(j as u8);
715                }
716            }
717        }
718
719        let mut block = FixedWidthDataBlock {
720            bits_per_value,
721            data: LanceBuffer::reinterpret_vec(data),
722            num_values,
723            block_info: BlockInfo::default(),
724        };
725
726        // Compute all statistics including BytePositionEntropy
727        use crate::statistics::ComputeStat;
728        block.compute_stat();
729
730        DataBlock::FixedWidth(block)
731    }
732
733    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
734        // Create data with some variety to avoid always triggering BSS
735        let bytes_per_value = (bits_per_value / 8) as usize;
736        let total_bytes = bytes_per_value * num_values as usize;
737        let mut data = vec![0u8; total_bytes];
738
739        // Add some variation to the data to make it more realistic
740        for i in 0..num_values as usize {
741            let byte_offset = i * bytes_per_value;
742            if byte_offset < data.len() {
743                data[byte_offset] = (i % 256) as u8;
744            }
745        }
746
747        let mut block = FixedWidthDataBlock {
748            bits_per_value,
749            data: LanceBuffer::reinterpret_vec(data),
750            num_values,
751            block_info: BlockInfo::default(),
752        };
753
754        // Compute all statistics including BytePositionEntropy
755        use crate::statistics::ComputeStat;
756        block.compute_stat();
757
758        DataBlock::FixedWidth(block)
759    }
760
761    #[test]
762    fn test_parameter_based_compression() {
763        let mut params = CompressionParams::new();
764
765        // Configure RLE for ID columns with BSS explicitly disabled
766        params.columns.insert(
767            "*_id".to_string(),
768            CompressionFieldParams {
769                rle_threshold: Some(0.3),
770                compression: Some("lz4".to_string()),
771                compression_level: None,
772                bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE
773            },
774        );
775
776        let strategy = DefaultCompressionStrategy::with_params(params);
777        let field = create_test_field("user_id", DataType::Int32);
778
779        // Create data with low run count for RLE
780        // Use create_fixed_width_block_with_stats which properly sets run count
781        let data = create_fixed_width_block_with_stats(32, 1000, 100); // 100 runs out of 1000 values
782
783        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
784        // Should use RLE due to low threshold (0.3) and low run count (100/1000 = 0.1)
785        let debug_str = format!("{:?}", compressor);
786
787        // The compressor should be RLE wrapped in general compression
788        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
789        assert!(debug_str.contains("RleMiniBlockEncoder"));
790    }
791
792    #[test]
793    fn test_type_level_parameters() {
794        let mut params = CompressionParams::new();
795
796        // Configure all Int32 to use specific settings
797        params.types.insert(
798            "Int32".to_string(),
799            CompressionFieldParams {
800                rle_threshold: Some(0.1), // Very low threshold
801                compression: Some("zstd".to_string()),
802                compression_level: Some(3),
803                bss: Some(BssMode::Off), // Disable BSS to test RLE
804            },
805        );
806
807        let strategy = DefaultCompressionStrategy::with_params(params);
808        let field = create_test_field("some_column", DataType::Int32);
809        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
810        let data = create_fixed_width_block_with_stats(32, 1000, 50);
811
812        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
813        // Should use RLE due to very low threshold
814        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
815    }
816
817    #[test]
818    fn test_none_compression() {
819        let mut params = CompressionParams::new();
820
821        // Disable compression for embeddings
822        params.columns.insert(
823            "embeddings".to_string(),
824            CompressionFieldParams {
825                compression: Some("none".to_string()),
826                ..Default::default()
827            },
828        );
829
830        let strategy = DefaultCompressionStrategy::with_params(params);
831        let field = create_test_field("embeddings", DataType::Float32);
832        let data = create_fixed_width_block(32, 1000);
833
834        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
835        // Should use ValueEncoder (no compression)
836        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
837    }
838
839    #[test]
840    fn test_parameter_merge_priority() {
841        let mut params = CompressionParams::new();
842
843        // Set type-level
844        params.types.insert(
845            "Int32".to_string(),
846            CompressionFieldParams {
847                rle_threshold: Some(0.5),
848                compression: Some("lz4".to_string()),
849                ..Default::default()
850            },
851        );
852
853        // Set column-level (highest priority)
854        params.columns.insert(
855            "user_id".to_string(),
856            CompressionFieldParams {
857                rle_threshold: Some(0.2),
858                compression: Some("zstd".to_string()),
859                compression_level: Some(6),
860                bss: None,
861            },
862        );
863
864        let strategy = DefaultCompressionStrategy::with_params(params);
865
866        // Get merged params
867        let merged = strategy
868            .params
869            .get_field_params("user_id", &DataType::Int32);
870
871        // Column params should override type params
872        assert_eq!(merged.rle_threshold, Some(0.2));
873        assert_eq!(merged.compression, Some("zstd".to_string()));
874        assert_eq!(merged.compression_level, Some(6));
875
876        // Test field with only type params
877        let merged = strategy
878            .params
879            .get_field_params("other_field", &DataType::Int32);
880        assert_eq!(merged.rle_threshold, Some(0.5));
881        assert_eq!(merged.compression, Some("lz4".to_string()));
882        assert_eq!(merged.compression_level, None);
883    }
884
885    #[test]
886    fn test_pattern_matching() {
887        let mut params = CompressionParams::new();
888
889        // Configure pattern for log files
890        params.columns.insert(
891            "log_*".to_string(),
892            CompressionFieldParams {
893                compression: Some("zstd".to_string()),
894                compression_level: Some(6),
895                ..Default::default()
896            },
897        );
898
899        let strategy = DefaultCompressionStrategy::with_params(params);
900
901        // Should match pattern
902        let merged = strategy
903            .params
904            .get_field_params("log_messages", &DataType::Utf8);
905        assert_eq!(merged.compression, Some("zstd".to_string()));
906        assert_eq!(merged.compression_level, Some(6));
907
908        // Should not match
909        let merged = strategy
910            .params
911            .get_field_params("messages_log", &DataType::Utf8);
912        assert_eq!(merged.compression, None);
913    }
914
915    #[test]
916    fn test_legacy_metadata_support() {
917        let params = CompressionParams::new();
918        let strategy = DefaultCompressionStrategy::with_params(params);
919
920        // Test field with "none" compression metadata
921        let mut metadata = HashMap::new();
922        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
923        let mut field = create_test_field("some_column", DataType::Int32);
924        field.metadata = metadata;
925
926        let data = create_fixed_width_block(32, 1000);
927        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
928
929        // Should respect metadata and use ValueEncoder
930        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
931    }
932
933    #[test]
934    fn test_default_behavior() {
935        // Empty params should fall back to default behavior
936        let params = CompressionParams::new();
937        let strategy = DefaultCompressionStrategy::with_params(params);
938
939        let field = create_test_field("random_column", DataType::Int32);
940        // Create data with high run count that won't trigger RLE (600 runs for 1000 values = 0.6 ratio)
941        let data = create_fixed_width_block_with_stats(32, 1000, 600);
942
943        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
944        // Should use default strategy's decision
945        let debug_str = format!("{:?}", compressor);
946        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
947    }
948
949    #[test]
950    fn test_field_metadata_compression() {
951        let params = CompressionParams::new();
952        let strategy = DefaultCompressionStrategy::with_params(params);
953
954        // Test field with compression metadata
955        let mut metadata = HashMap::new();
956        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
957        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
958        let mut field = create_test_field("test_column", DataType::Int32);
959        field.metadata = metadata;
960
961        let data = create_fixed_width_block(32, 1000);
962        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
963
964        // Should use zstd with level 6
965        let debug_str = format!("{:?}", compressor);
966        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
967    }
968
969    #[test]
970    fn test_field_metadata_rle_threshold() {
971        let params = CompressionParams::new();
972        let strategy = DefaultCompressionStrategy::with_params(params);
973
974        // Test field with RLE threshold metadata
975        let mut metadata = HashMap::new();
976        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
977        metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); // Disable BSS to test RLE
978        let mut field = create_test_field("test_column", DataType::Int32);
979        field.metadata = metadata;
980
981        // Create data with low run count (e.g., 100 runs for 1000 values = 0.1 ratio)
982        // This ensures run_count (100) < num_values * threshold (1000 * 0.8 = 800)
983        let data = create_fixed_width_block_with_stats(32, 1000, 100);
984
985        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
986
987        // Should use RLE because run_count (100) < num_values * threshold (800)
988        let debug_str = format!("{:?}", compressor);
989        assert!(debug_str.contains("RleMiniBlockEncoder"));
990    }
991
992    #[test]
993    fn test_field_metadata_override_params() {
994        // Set up params with one configuration
995        let mut params = CompressionParams::new();
996        params.columns.insert(
997            "test_column".to_string(),
998            CompressionFieldParams {
999                rle_threshold: Some(0.3),
1000                compression: Some("lz4".to_string()),
1001                compression_level: None,
1002                bss: None,
1003            },
1004        );
1005
1006        let strategy = DefaultCompressionStrategy::with_params(params);
1007
1008        // Field metadata should override params
1009        let mut metadata = HashMap::new();
1010        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1011        let mut field = create_test_field("test_column", DataType::Int32);
1012        field.metadata = metadata;
1013
1014        let data = create_fixed_width_block(32, 1000);
1015        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1016
1017        // Should use none compression (from metadata) instead of lz4 (from params)
1018        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1019    }
1020
1021    #[test]
1022    fn test_field_metadata_mixed_configuration() {
1023        // Configure type-level params
1024        let mut params = CompressionParams::new();
1025        params.types.insert(
1026            "Int32".to_string(),
1027            CompressionFieldParams {
1028                rle_threshold: Some(0.5),
1029                compression: Some("lz4".to_string()),
1030                ..Default::default()
1031            },
1032        );
1033
1034        let strategy = DefaultCompressionStrategy::with_params(params);
1035
1036        // Field metadata provides partial override
1037        let mut metadata = HashMap::new();
1038        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1039        let mut field = create_test_field("test_column", DataType::Int32);
1040        field.metadata = metadata;
1041
1042        let data = create_fixed_width_block(32, 1000);
1043        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1044
1045        // Should use lz4 (from type params) with level 3 (from metadata)
1046        let debug_str = format!("{:?}", compressor);
1047        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1048    }
1049
1050    #[test]
1051    fn test_bss_field_metadata() {
1052        let params = CompressionParams::new();
1053        let strategy = DefaultCompressionStrategy::with_params(params);
1054
1055        // Test BSS "on" mode with compression enabled (BSS requires compression to be effective)
1056        let mut metadata = HashMap::new();
1057        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1058        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1059        let arrow_field =
1060            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1061        let field = Field::try_from(&arrow_field).unwrap();
1062
1063        // Create float data
1064        let data = create_fixed_width_block(32, 100);
1065
1066        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1067        let debug_str = format!("{:?}", compressor);
1068        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1069    }
1070
1071    #[test]
1072    fn test_bss_with_compression() {
1073        let params = CompressionParams::new();
1074        let strategy = DefaultCompressionStrategy::with_params(params);
1075
1076        // Test BSS with LZ4 compression
1077        let mut metadata = HashMap::new();
1078        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1079        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1080        let arrow_field =
1081            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1082        let field = Field::try_from(&arrow_field).unwrap();
1083
1084        // Create double data
1085        let data = create_fixed_width_block(64, 100);
1086
1087        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1088        let debug_str = format!("{:?}", compressor);
1089        // Should have BSS wrapped in general compression
1090        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1091        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1092    }
1093}