lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
19#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22    buffer::LanceBuffer,
23    compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24    constants::{
25        BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26    },
27    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28    encodings::{
29        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30        physical::{
31            binary::{
32                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33                VariableDecoder, VariableEncoder,
34            },
35            block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
36            byte_stream_split::{
37                should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
38            },
39            constant::ConstantDecompressor,
40            fsst::{
41                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
42                FsstPerValueEncoder,
43            },
44            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
45            packed::{
46                PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
47            },
48            rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
49            value::{ValueDecompressor, ValueEncoder},
50        },
51    },
52    format::{
53        pb21::{compressive_encoding::Compression, CompressiveEncoding},
54        ProtobufUtils21,
55    },
56    statistics::{GetStat, Stat},
57};
58
59use arrow_array::{cast::AsArray, types::UInt64Type};
60use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
61use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
62use snafu::location;
63use std::str::FromStr;
64
/// Fallback RLE selection threshold, used when no per-field threshold is configured.
///
/// RLE is picked when the number of runs is strictly below this fraction of the
/// total number of values.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
68
69/// Trait for compression algorithms that compress an entire block of data into one opaque
70/// and self-described chunk.
71///
72/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
73///
74/// This is the most general type of compression.  There are no constraints on the method
75/// of compression it is assumed that the entire block of data will be present at decompression.
76///
77/// This is the least appropriate strategy for random access because we must load the entire
78/// block to access any single value.  This should only be used for cases where random access is never
79/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
80/// mini-block chunks)
81pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
82    /// Compress the data into a single buffer
83    ///
84    /// Also returns a description of the compression that can be used to decompress
85    /// when reading the data back
86    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
87}
88
89/// A trait to pick which compression to use for given data
90///
91/// There are several different kinds of compression.
92///
93/// - Block compression is the most generic, but most difficult to use efficiently
94/// - Per-value compression results in either a fixed width data block or a variable
95///   width data block.  In other words, there is some number of bits per value.
96///   In addition, each value should be independently decompressible.
97/// - Mini-block compression results in a small block of opaque data for chunks
98///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
99///   used for narrow data types (both fixed and variable length) where we can
100///   fit many values into an 16KiB block.
101pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
102    /// Create a block compressor for the given data
103    fn create_block_compressor(
104        &self,
105        field: &Field,
106        data: &DataBlock,
107    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;
108
109    /// Create a per-value compressor for the given data
110    fn create_per_value(
111        &self,
112        field: &Field,
113        data: &DataBlock,
114    ) -> Result<Box<dyn PerValueCompressor>>;
115
116    /// Create a mini-block compressor for the given data
117    fn create_miniblock_compressor(
118        &self,
119        field: &Field,
120        data: &DataBlock,
121    ) -> Result<Box<dyn MiniBlockCompressor>>;
122}
123
124#[derive(Debug, Default)]
125pub struct DefaultCompressionStrategy {
126    /// User-configured compression parameters
127    params: CompressionParams,
128}
129
130fn try_bss_for_mini_block(
131    data: &FixedWidthDataBlock,
132    params: &CompressionFieldParams,
133) -> Option<Box<dyn MiniBlockCompressor>> {
134    // BSS requires general compression to be effective
135    // If compression is not set or explicitly disabled, skip BSS
136    if params.compression.is_none() || params.compression.as_deref() == Some("none") {
137        return None;
138    }
139
140    let mode = params.bss.unwrap_or(BssMode::Auto);
141    // should_use_bss already checks for supported bit widths (32/64)
142    if should_use_bss(data, mode) {
143        return Some(Box::new(ByteStreamSplitEncoder::new(
144            data.bits_per_value as usize,
145        )));
146    }
147    None
148}
149
150fn try_rle_for_mini_block(
151    data: &FixedWidthDataBlock,
152    params: &CompressionFieldParams,
153) -> Option<Box<dyn MiniBlockCompressor>> {
154    let bits = data.bits_per_value;
155    if !matches!(bits, 8 | 16 | 32 | 64) {
156        return None;
157    }
158
159    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
160    let threshold = params
161        .rle_threshold
162        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
163
164    if (run_count as f64) < (data.num_values as f64) * threshold {
165        return Some(Box::new(RleMiniBlockEncoder::new()));
166    }
167    None
168}
169
170fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
171    #[cfg(feature = "bitpacking")]
172    {
173        use arrow_array::cast::AsArray;
174
175        let bits = _data.bits_per_value;
176        if !matches!(bits, 8 | 16 | 32 | 64) {
177            return None;
178        }
179
180        let bit_widths = _data.expect_stat(Stat::BitWidth);
181        let widths = bit_widths.as_primitive::<UInt64Type>();
182        let too_small = widths.len() == 1
183            && InlineBitpacking::min_size_bytes(widths.value(0)) >= _data.data_size();
184
185        if !too_small {
186            return Some(Box::new(InlineBitpacking::new(bits)));
187        }
188        None
189    }
190    #[cfg(not(feature = "bitpacking"))]
191    {
192        None
193    }
194}
195
196fn try_bitpack_for_block(
197    data: &FixedWidthDataBlock,
198) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
199    let bits = data.bits_per_value;
200    if !matches!(bits, 8 | 16 | 32 | 64) {
201        return None;
202    }
203
204    let bit_widths = data.expect_stat(Stat::BitWidth);
205    let widths = bit_widths.as_primitive::<UInt64Type>();
206    let has_all_zeros = widths.values().contains(&0);
207    let max_bit_width = *widths.values().iter().max().unwrap();
208
209    let too_small =
210        widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
211
212    if has_all_zeros || too_small {
213        return None;
214    }
215
216    if data.num_values <= 1024 {
217        let compressor = Box::new(InlineBitpacking::new(bits));
218        let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
219        Some((compressor, encoding))
220    } else {
221        let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
222        let encoding = ProtobufUtils21::out_of_line_bitpacking(
223            bits,
224            ProtobufUtils21::flat(max_bit_width, None),
225        );
226        Some((compressor, encoding))
227    }
228}
229
230fn maybe_wrap_general_for_mini_block(
231    inner: Box<dyn MiniBlockCompressor>,
232    params: &CompressionFieldParams,
233) -> Result<Box<dyn MiniBlockCompressor>> {
234    match params.compression.as_deref() {
235        None | Some("none") | Some("fsst") => Ok(inner),
236        Some(raw) => {
237            let scheme = CompressionScheme::from_str(raw).map_err(|_| {
238                lance_core::Error::invalid_input(
239                    format!("Unknown compression scheme: {raw}"),
240                    location!(),
241                )
242            })?;
243            let cfg = CompressionConfig::new(scheme, params.compression_level);
244            Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
245        }
246    }
247}
248
249impl DefaultCompressionStrategy {
250    /// Create a new compression strategy with default behavior
251    pub fn new() -> Self {
252        Self::default()
253    }
254
255    /// Create a new compression strategy with user-configured parameters
256    pub fn with_params(params: CompressionParams) -> Self {
257        Self { params }
258    }
259
260    /// Parse compression parameters from field metadata
261    fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
262        let mut params = CompressionFieldParams::default();
263
264        // Parse compression method
265        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
266            params.compression = Some(compression.clone());
267        }
268
269        // Parse compression level
270        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
271            params.compression_level = level.parse().ok();
272        }
273
274        // Parse RLE threshold
275        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
276            params.rle_threshold = threshold.parse().ok();
277        }
278
279        // Parse BSS mode
280        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
281            match BssMode::parse(bss_str) {
282                Some(mode) => params.bss = Some(mode),
283                None => {
284                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
285                }
286            }
287        }
288
289        params
290    }
291
292    fn build_fixed_width_compressor(
293        &self,
294        params: &CompressionFieldParams,
295        data: &FixedWidthDataBlock,
296    ) -> Result<Box<dyn MiniBlockCompressor>> {
297        if params.compression.as_deref() == Some("none") {
298            return Ok(Box::new(ValueEncoder::default()));
299        }
300
301        let base = try_bss_for_mini_block(data, params)
302            .or_else(|| try_rle_for_mini_block(data, params))
303            .or_else(|| try_bitpack_for_mini_block(data))
304            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
305
306        maybe_wrap_general_for_mini_block(base, params)
307    }
308
309    /// Build compressor based on parameters for variable-width data
310    fn build_variable_width_compressor(
311        &self,
312        params: &CompressionFieldParams,
313        data: &VariableWidthBlock,
314    ) -> Result<Box<dyn MiniBlockCompressor>> {
315        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
316            return Err(Error::invalid_input(
317                format!(
318                    "Variable width compression not supported for {} bit offsets",
319                    data.bits_per_offset
320                ),
321                location!(),
322            ));
323        }
324
325        // Get statistics
326        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
327        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
328
329        // 1. Check for explicit "none" compression
330        if params.compression.as_deref() == Some("none") {
331            return Ok(Box::new(BinaryMiniBlockEncoder::default()));
332        }
333
334        // 2. Check for explicit "fsst" compression
335        if params.compression.as_deref() == Some("fsst") {
336            return Ok(Box::new(FsstMiniBlockEncoder::default()));
337        }
338
339        // 3. Choose base encoder (FSST or Binary) based on data characteristics
340        let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
341            >= FSST_LEAST_INPUT_MAX_LENGTH
342            && data_size >= FSST_LEAST_INPUT_SIZE as u64
343        {
344            Box::new(FsstMiniBlockEncoder::default())
345        } else {
346            Box::new(BinaryMiniBlockEncoder::default())
347        };
348
349        // 4. Apply general compression if configured
350        if let Some(compression_scheme) = &params.compression {
351            if compression_scheme != "none" && compression_scheme != "fsst" {
352                let scheme: CompressionScheme = compression_scheme.parse()?;
353                let config = CompressionConfig::new(scheme, params.compression_level);
354                base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
355            }
356        }
357
358        Ok(base_encoder)
359    }
360}
361
362impl CompressionStrategy for DefaultCompressionStrategy {
363    fn create_miniblock_compressor(
364        &self,
365        field: &Field,
366        data: &DataBlock,
367    ) -> Result<Box<dyn MiniBlockCompressor>> {
368        let mut field_params = self
369            .params
370            .get_field_params(&field.name, &field.data_type());
371
372        // Override with field metadata if present (highest priority)
373        let metadata_params = Self::parse_field_metadata(field);
374        field_params.merge(&metadata_params);
375
376        match data {
377            DataBlock::FixedWidth(fixed_width_data) => {
378                self.build_fixed_width_compressor(&field_params, fixed_width_data)
379            }
380            DataBlock::VariableWidth(variable_width_data) => {
381                self.build_variable_width_compressor(&field_params, variable_width_data)
382            }
383            DataBlock::Struct(struct_data_block) => {
384                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
385                // just being cautious here.
386                if struct_data_block
387                    .children
388                    .iter()
389                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
390                {
391                    panic!("packed struct encoding currently only supports fixed-width fields.")
392                }
393                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
394            }
395            DataBlock::FixedSizeList(_) => {
396                // Ideally we would compress the list items but this creates something of a challenge.
397                // We don't want to break lists across chunks and we need to worry about inner validity
398                // layers.  If we try and use a compression scheme then it is unlikely to respect these
399                // constraints.
400                //
401                // For now, we just don't compress.  In the future, we might want to consider a more
402                // sophisticated approach.
403                Ok(Box::new(ValueEncoder::default()))
404            }
405            _ => Err(Error::NotSupported {
406                source: format!(
407                    "Mini-block compression not yet supported for block type {}",
408                    data.name()
409                )
410                .into(),
411                location: location!(),
412            }),
413        }
414    }
415
416    fn create_per_value(
417        &self,
418        field: &Field,
419        data: &DataBlock,
420    ) -> Result<Box<dyn PerValueCompressor>> {
421        let field_params = Self::parse_field_metadata(field);
422
423        match data {
424            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
425            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
426            DataBlock::VariableWidth(variable_width) => {
427                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
428                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
429
430                // If values are very large then use block compression on a per-value basis
431                //
432                // TODO: Could maybe use median here
433
434                let per_value_requested =
435                    if let Some(compression) = field_params.compression.as_deref() {
436                        compression != "none" && compression != "fsst"
437                    } else {
438                        false
439                    };
440
441                if (max_len > 32 * 1024 || per_value_requested)
442                    && data_size >= FSST_LEAST_INPUT_SIZE as u64
443                {
444                    return Ok(Box::new(CompressedBufferEncoder::default()));
445                }
446
447                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
448                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
449                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
450
451                    let variable_compression = Box::new(VariableEncoder::default());
452
453                    // Use FSST if explicitly requested or if data characteristics warrant it
454                    if field_params.compression.as_deref() == Some("fsst")
455                        || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
456                            && data_size >= FSST_LEAST_INPUT_SIZE as u64)
457                    {
458                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
459                    } else {
460                        Ok(variable_compression)
461                    }
462                } else {
463                    panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
464                }
465            }
466            _ => unreachable!(
467                "Per-value compression not yet supported for block type: {}",
468                data.name()
469            ),
470        }
471    }
472
473    fn create_block_compressor(
474        &self,
475        field: &Field,
476        data: &DataBlock,
477    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
478        match data {
479            // Currently, block compression is used for rep/def (which is fixed width) and for dictionary
480            // encoding (which could be fixed width or variable width).
481            DataBlock::FixedWidth(fixed_width) => {
482                if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
483                    return Ok((compressor, encoding));
484                }
485
486                // Default to uncompressed
487                let encoder = Box::new(ValueEncoder::default());
488                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
489                Ok((encoder, encoding))
490            }
491            DataBlock::VariableWidth(variable_width) => {
492                let encoder = Box::new(VariableEncoder::default());
493                let encoding = ProtobufUtils21::variable(
494                    ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
495                    None,
496                );
497                Ok((encoder, encoding))
498            }
499            _ => todo!(
500                "block compressor for field {:?} and block type {:?}",
501                field,
502                data.name()
503            ),
504        }
505    }
506}
507
508pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
509    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
510}
511
512pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
513    /// Decompress one or more values
514    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
515    /// The number of bits in each value
516    ///
517    /// Currently (and probably long term) this must be a multiple of 8
518    fn bits_per_value(&self) -> u64;
519}
520
521pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
522    /// Decompress one or more values
523    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
524}
525
526pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
527    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
528}
529
530pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
531    fn create_miniblock_decompressor(
532        &self,
533        description: &CompressiveEncoding,
534        decompression_strategy: &dyn DecompressionStrategy,
535    ) -> Result<Box<dyn MiniBlockDecompressor>>;
536
537    fn create_fixed_per_value_decompressor(
538        &self,
539        description: &CompressiveEncoding,
540    ) -> Result<Box<dyn FixedPerValueDecompressor>>;
541
542    fn create_variable_per_value_decompressor(
543        &self,
544        description: &CompressiveEncoding,
545    ) -> Result<Box<dyn VariablePerValueDecompressor>>;
546
547    fn create_block_decompressor(
548        &self,
549        description: &CompressiveEncoding,
550    ) -> Result<Box<dyn BlockDecompressor>>;
551}
552
/// The default [`DecompressionStrategy`] implementation.  It carries no state.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
555
556impl DecompressionStrategy for DefaultDecompressionStrategy {
557    fn create_miniblock_decompressor(
558        &self,
559        description: &CompressiveEncoding,
560        decompression_strategy: &dyn DecompressionStrategy,
561    ) -> Result<Box<dyn MiniBlockDecompressor>> {
562        match description.compression.as_ref().unwrap() {
563            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
564            #[cfg(feature = "bitpacking")]
565            Compression::InlineBitpacking(description) => {
566                Ok(Box::new(InlineBitpacking::from_description(description)))
567            }
568            #[cfg(not(feature = "bitpacking"))]
569            Compression::InlineBitpacking(_) => Err(Error::NotSupported {
570                source: "this runtime was not built with bitpacking support".into(),
571                location: location!(),
572            }),
573            Compression::Variable(variable) => {
574                let Compression::Flat(offsets) = variable
575                    .offsets
576                    .as_ref()
577                    .unwrap()
578                    .compression
579                    .as_ref()
580                    .unwrap()
581                else {
582                    panic!("Variable compression only supports flat offsets")
583                };
584                Ok(Box::new(BinaryMiniBlockDecompressor::new(
585                    offsets.bits_per_value as u8,
586                )))
587            }
588            Compression::Fsst(description) => {
589                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
590                    description.values.as_ref().unwrap(),
591                    decompression_strategy,
592                )?;
593                Ok(Box::new(FsstMiniBlockDecompressor::new(
594                    description,
595                    inner_decompressor,
596                )))
597            }
598            Compression::PackedStruct(description) => Ok(Box::new(
599                PackedStructFixedWidthMiniBlockDecompressor::new(description),
600            )),
601            Compression::FixedSizeList(fsl) => {
602                // In the future, we might need to do something more complex here if FSL supports
603                // compression.
604                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
605            }
606            Compression::Rle(rle) => {
607                let Compression::Flat(values) =
608                    rle.values.as_ref().unwrap().compression.as_ref().unwrap()
609                else {
610                    panic!("RLE compression only supports flat values")
611                };
612                let Compression::Flat(run_lengths) = rle
613                    .run_lengths
614                    .as_ref()
615                    .unwrap()
616                    .compression
617                    .as_ref()
618                    .unwrap()
619                else {
620                    panic!("RLE compression only supports flat run lengths")
621                };
622                assert_eq!(
623                    run_lengths.bits_per_value, 8,
624                    "RLE compression only supports 8-bit run lengths"
625                );
626                Ok(Box::new(RleMiniBlockDecompressor::new(
627                    values.bits_per_value,
628                )))
629            }
630            Compression::ByteStreamSplit(bss) => {
631                let Compression::Flat(values) =
632                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
633                else {
634                    panic!("ByteStreamSplit compression only supports flat values")
635                };
636                Ok(Box::new(ByteStreamSplitDecompressor::new(
637                    values.bits_per_value as usize,
638                )))
639            }
640            Compression::General(general) => {
641                // Create inner decompressor
642                let inner_decompressor = self.create_miniblock_decompressor(
643                    general.values.as_ref().ok_or_else(|| {
644                        Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
645                    })?,
646                    decompression_strategy,
647                )?;
648
649                // Parse compression config
650                let compression = general.compression.as_ref().ok_or_else(|| {
651                    Error::invalid_input("GeneralMiniBlock missing compression config", location!())
652                })?;
653
654                let scheme = compression.scheme().try_into()?;
655
656                let compression_config = crate::encodings::physical::block::CompressionConfig::new(
657                    scheme,
658                    compression.level,
659                );
660
661                Ok(Box::new(GeneralMiniBlockDecompressor::new(
662                    inner_decompressor,
663                    compression_config,
664                )))
665            }
666            _ => todo!(),
667        }
668    }
669
670    fn create_fixed_per_value_decompressor(
671        &self,
672        description: &CompressiveEncoding,
673    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
674        match description.compression.as_ref().unwrap() {
675            Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
676                constant
677                    .value
678                    .as_ref()
679                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
680            ))),
681            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
682            Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
683            _ => todo!("fixed-per-value decompressor for {:?}", description),
684        }
685    }
686
687    fn create_variable_per_value_decompressor(
688        &self,
689        description: &CompressiveEncoding,
690    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
691        match description.compression.as_ref().unwrap() {
692            Compression::Variable(variable) => {
693                let Compression::Flat(offsets) = variable
694                    .offsets
695                    .as_ref()
696                    .unwrap()
697                    .compression
698                    .as_ref()
699                    .unwrap()
700                else {
701                    panic!("Variable compression only supports flat offsets")
702                };
703                assert!(offsets.bits_per_value < u8::MAX as u64);
704                Ok(Box::new(VariableDecoder::default()))
705            }
706            Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
707                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
708                Box::new(VariableDecoder::default()),
709            ))),
710            Compression::General(ref general) => {
711                Ok(Box::new(CompressedBufferEncoder::from_scheme(
712                    general.compression.as_ref().expect_ok()?.scheme(),
713                )?))
714            }
715            _ => todo!("variable-per-value decompressor for {:?}", description),
716        }
717    }
718
719    fn create_block_decompressor(
720        &self,
721        description: &CompressiveEncoding,
722    ) -> Result<Box<dyn BlockDecompressor>> {
723        match description.compression.as_ref().unwrap() {
724            Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
725                InlineBitpacking::from_description(inline_bitpacking),
726            )),
727            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
728            Compression::Constant(constant) => {
729                let scalar = constant
730                    .value
731                    .as_ref()
732                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
733                Ok(Box::new(ConstantDecompressor::new(scalar)))
734            }
735            Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
736            Compression::OutOfLineBitpacking(out_of_line) => {
737                // Extract the compressed bit width from the values encoding
738                let compressed_bit_width = match out_of_line
739                    .values
740                    .as_ref()
741                    .unwrap()
742                    .compression
743                    .as_ref()
744                    .unwrap()
745                {
746                    Compression::Flat(flat) => flat.bits_per_value,
747                    _ => {
748                        return Err(Error::InvalidInput {
749                            location: location!(),
750                            source: "OutOfLineBitpacking values must use Flat encoding".into(),
751                        })
752                    }
753                };
754                Ok(Box::new(OutOfLineBitpacking::new(
755                    compressed_bit_width,
756                    out_of_line.uncompressed_bits_per_value,
757                )))
758            }
759            _ => todo!(),
760        }
761    }
762}
763
764#[cfg(test)]
765mod tests {
766    use super::*;
767    use crate::buffer::LanceBuffer;
768    use crate::data::{BlockInfo, DataBlock};
769    use arrow_schema::{DataType, Field as ArrowField};
770    use std::collections::HashMap;
771
772    fn create_test_field(name: &str, data_type: DataType) -> Field {
773        let arrow_field = ArrowField::new(name, data_type, true);
774        let mut field = Field::try_from(&arrow_field).unwrap();
775        field.id = -1;
776        field
777    }
778
779    fn create_fixed_width_block_with_stats(
780        bits_per_value: u64,
781        num_values: u64,
782        run_count: u64,
783    ) -> DataBlock {
784        // Create varied data to avoid low entropy
785        let bytes_per_value = (bits_per_value / 8) as usize;
786        let total_bytes = bytes_per_value * num_values as usize;
787        let mut data = vec![0u8; total_bytes];
788
789        // Create data with specified run count
790        let values_per_run = (num_values / run_count).max(1);
791        let mut run_value = 0u8;
792
793        for i in 0..num_values as usize {
794            if i % values_per_run as usize == 0 {
795                run_value = run_value.wrapping_add(17); // Use prime to get varied values
796            }
797            // Fill all bytes of the value to create high entropy
798            for j in 0..bytes_per_value {
799                let byte_offset = i * bytes_per_value + j;
800                if byte_offset < data.len() {
801                    data[byte_offset] = run_value.wrapping_add(j as u8);
802                }
803            }
804        }
805
806        let mut block = FixedWidthDataBlock {
807            bits_per_value,
808            data: LanceBuffer::reinterpret_vec(data),
809            num_values,
810            block_info: BlockInfo::default(),
811        };
812
813        // Compute all statistics including BytePositionEntropy
814        use crate::statistics::ComputeStat;
815        block.compute_stat();
816
817        DataBlock::FixedWidth(block)
818    }
819
820    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
821        // Create data with some variety to avoid always triggering BSS
822        let bytes_per_value = (bits_per_value / 8) as usize;
823        let total_bytes = bytes_per_value * num_values as usize;
824        let mut data = vec![0u8; total_bytes];
825
826        // Add some variation to the data to make it more realistic
827        for i in 0..num_values as usize {
828            let byte_offset = i * bytes_per_value;
829            if byte_offset < data.len() {
830                data[byte_offset] = (i % 256) as u8;
831            }
832        }
833
834        let mut block = FixedWidthDataBlock {
835            bits_per_value,
836            data: LanceBuffer::reinterpret_vec(data),
837            num_values,
838            block_info: BlockInfo::default(),
839        };
840
841        // Compute all statistics including BytePositionEntropy
842        use crate::statistics::ComputeStat;
843        block.compute_stat();
844
845        DataBlock::FixedWidth(block)
846    }
847
848    #[test]
849    fn test_parameter_based_compression() {
850        let mut params = CompressionParams::new();
851
852        // Configure RLE for ID columns with BSS explicitly disabled
853        params.columns.insert(
854            "*_id".to_string(),
855            CompressionFieldParams {
856                rle_threshold: Some(0.3),
857                compression: Some("lz4".to_string()),
858                compression_level: None,
859                bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE
860            },
861        );
862
863        let strategy = DefaultCompressionStrategy::with_params(params);
864        let field = create_test_field("user_id", DataType::Int32);
865
866        // Create data with low run count for RLE
867        // Use create_fixed_width_block_with_stats which properly sets run count
868        let data = create_fixed_width_block_with_stats(32, 1000, 100); // 100 runs out of 1000 values
869
870        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
871        // Should use RLE due to low threshold (0.3) and low run count (100/1000 = 0.1)
872        let debug_str = format!("{:?}", compressor);
873
874        // The compressor should be RLE wrapped in general compression
875        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
876        assert!(debug_str.contains("RleMiniBlockEncoder"));
877    }
878
879    #[test]
880    fn test_type_level_parameters() {
881        let mut params = CompressionParams::new();
882
883        // Configure all Int32 to use specific settings
884        params.types.insert(
885            "Int32".to_string(),
886            CompressionFieldParams {
887                rle_threshold: Some(0.1), // Very low threshold
888                compression: Some("zstd".to_string()),
889                compression_level: Some(3),
890                bss: Some(BssMode::Off), // Disable BSS to test RLE
891            },
892        );
893
894        let strategy = DefaultCompressionStrategy::with_params(params);
895        let field = create_test_field("some_column", DataType::Int32);
896        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
897        let data = create_fixed_width_block_with_stats(32, 1000, 50);
898
899        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
900        // Should use RLE due to very low threshold
901        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
902    }
903
904    #[test]
905    fn test_none_compression() {
906        let mut params = CompressionParams::new();
907
908        // Disable compression for embeddings
909        params.columns.insert(
910            "embeddings".to_string(),
911            CompressionFieldParams {
912                compression: Some("none".to_string()),
913                ..Default::default()
914            },
915        );
916
917        let strategy = DefaultCompressionStrategy::with_params(params);
918        let field = create_test_field("embeddings", DataType::Float32);
919        let data = create_fixed_width_block(32, 1000);
920
921        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
922        // Should use ValueEncoder (no compression)
923        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
924    }
925
926    #[test]
927    fn test_parameter_merge_priority() {
928        let mut params = CompressionParams::new();
929
930        // Set type-level
931        params.types.insert(
932            "Int32".to_string(),
933            CompressionFieldParams {
934                rle_threshold: Some(0.5),
935                compression: Some("lz4".to_string()),
936                ..Default::default()
937            },
938        );
939
940        // Set column-level (highest priority)
941        params.columns.insert(
942            "user_id".to_string(),
943            CompressionFieldParams {
944                rle_threshold: Some(0.2),
945                compression: Some("zstd".to_string()),
946                compression_level: Some(6),
947                bss: None,
948            },
949        );
950
951        let strategy = DefaultCompressionStrategy::with_params(params);
952
953        // Get merged params
954        let merged = strategy
955            .params
956            .get_field_params("user_id", &DataType::Int32);
957
958        // Column params should override type params
959        assert_eq!(merged.rle_threshold, Some(0.2));
960        assert_eq!(merged.compression, Some("zstd".to_string()));
961        assert_eq!(merged.compression_level, Some(6));
962
963        // Test field with only type params
964        let merged = strategy
965            .params
966            .get_field_params("other_field", &DataType::Int32);
967        assert_eq!(merged.rle_threshold, Some(0.5));
968        assert_eq!(merged.compression, Some("lz4".to_string()));
969        assert_eq!(merged.compression_level, None);
970    }
971
972    #[test]
973    fn test_pattern_matching() {
974        let mut params = CompressionParams::new();
975
976        // Configure pattern for log files
977        params.columns.insert(
978            "log_*".to_string(),
979            CompressionFieldParams {
980                compression: Some("zstd".to_string()),
981                compression_level: Some(6),
982                ..Default::default()
983            },
984        );
985
986        let strategy = DefaultCompressionStrategy::with_params(params);
987
988        // Should match pattern
989        let merged = strategy
990            .params
991            .get_field_params("log_messages", &DataType::Utf8);
992        assert_eq!(merged.compression, Some("zstd".to_string()));
993        assert_eq!(merged.compression_level, Some(6));
994
995        // Should not match
996        let merged = strategy
997            .params
998            .get_field_params("messages_log", &DataType::Utf8);
999        assert_eq!(merged.compression, None);
1000    }
1001
1002    #[test]
1003    fn test_legacy_metadata_support() {
1004        let params = CompressionParams::new();
1005        let strategy = DefaultCompressionStrategy::with_params(params);
1006
1007        // Test field with "none" compression metadata
1008        let mut metadata = HashMap::new();
1009        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1010        let mut field = create_test_field("some_column", DataType::Int32);
1011        field.metadata = metadata;
1012
1013        let data = create_fixed_width_block(32, 1000);
1014        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1015
1016        // Should respect metadata and use ValueEncoder
1017        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1018    }
1019
1020    #[test]
1021    fn test_default_behavior() {
1022        // Empty params should fall back to default behavior
1023        let params = CompressionParams::new();
1024        let strategy = DefaultCompressionStrategy::with_params(params);
1025
1026        let field = create_test_field("random_column", DataType::Int32);
1027        // Create data with high run count that won't trigger RLE (600 runs for 1000 values = 0.6 ratio)
1028        let data = create_fixed_width_block_with_stats(32, 1000, 600);
1029
1030        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1031        // Should use default strategy's decision
1032        let debug_str = format!("{:?}", compressor);
1033        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
1034    }
1035
1036    #[test]
1037    fn test_field_metadata_compression() {
1038        let params = CompressionParams::new();
1039        let strategy = DefaultCompressionStrategy::with_params(params);
1040
1041        // Test field with compression metadata
1042        let mut metadata = HashMap::new();
1043        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
1044        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
1045        let mut field = create_test_field("test_column", DataType::Int32);
1046        field.metadata = metadata;
1047
1048        let data = create_fixed_width_block(32, 1000);
1049        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1050
1051        // Should use zstd with level 6
1052        let debug_str = format!("{:?}", compressor);
1053        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1054    }
1055
1056    #[test]
1057    fn test_field_metadata_rle_threshold() {
1058        let params = CompressionParams::new();
1059        let strategy = DefaultCompressionStrategy::with_params(params);
1060
1061        // Test field with RLE threshold metadata
1062        let mut metadata = HashMap::new();
1063        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
1064        metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); // Disable BSS to test RLE
1065        let mut field = create_test_field("test_column", DataType::Int32);
1066        field.metadata = metadata;
1067
1068        // Create data with low run count (e.g., 100 runs for 1000 values = 0.1 ratio)
1069        // This ensures run_count (100) < num_values * threshold (1000 * 0.8 = 800)
1070        let data = create_fixed_width_block_with_stats(32, 1000, 100);
1071
1072        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1073
1074        // Should use RLE because run_count (100) < num_values * threshold (800)
1075        let debug_str = format!("{:?}", compressor);
1076        assert!(debug_str.contains("RleMiniBlockEncoder"));
1077    }
1078
1079    #[test]
1080    fn test_field_metadata_override_params() {
1081        // Set up params with one configuration
1082        let mut params = CompressionParams::new();
1083        params.columns.insert(
1084            "test_column".to_string(),
1085            CompressionFieldParams {
1086                rle_threshold: Some(0.3),
1087                compression: Some("lz4".to_string()),
1088                compression_level: None,
1089                bss: None,
1090            },
1091        );
1092
1093        let strategy = DefaultCompressionStrategy::with_params(params);
1094
1095        // Field metadata should override params
1096        let mut metadata = HashMap::new();
1097        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1098        let mut field = create_test_field("test_column", DataType::Int32);
1099        field.metadata = metadata;
1100
1101        let data = create_fixed_width_block(32, 1000);
1102        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1103
1104        // Should use none compression (from metadata) instead of lz4 (from params)
1105        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1106    }
1107
1108    #[test]
1109    fn test_field_metadata_mixed_configuration() {
1110        // Configure type-level params
1111        let mut params = CompressionParams::new();
1112        params.types.insert(
1113            "Int32".to_string(),
1114            CompressionFieldParams {
1115                rle_threshold: Some(0.5),
1116                compression: Some("lz4".to_string()),
1117                ..Default::default()
1118            },
1119        );
1120
1121        let strategy = DefaultCompressionStrategy::with_params(params);
1122
1123        // Field metadata provides partial override
1124        let mut metadata = HashMap::new();
1125        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1126        let mut field = create_test_field("test_column", DataType::Int32);
1127        field.metadata = metadata;
1128
1129        let data = create_fixed_width_block(32, 1000);
1130        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1131
1132        // Should use lz4 (from type params) with level 3 (from metadata)
1133        let debug_str = format!("{:?}", compressor);
1134        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1135    }
1136
1137    #[test]
1138    fn test_bss_field_metadata() {
1139        let params = CompressionParams::new();
1140        let strategy = DefaultCompressionStrategy::with_params(params);
1141
1142        // Test BSS "on" mode with compression enabled (BSS requires compression to be effective)
1143        let mut metadata = HashMap::new();
1144        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1145        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1146        let arrow_field =
1147            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1148        let field = Field::try_from(&arrow_field).unwrap();
1149
1150        // Create float data
1151        let data = create_fixed_width_block(32, 100);
1152
1153        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1154        let debug_str = format!("{:?}", compressor);
1155        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1156    }
1157
1158    #[test]
1159    fn test_bss_with_compression() {
1160        let params = CompressionParams::new();
1161        let strategy = DefaultCompressionStrategy::with_params(params);
1162
1163        // Test BSS with LZ4 compression
1164        let mut metadata = HashMap::new();
1165        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1166        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1167        let arrow_field =
1168            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1169        let field = Field::try_from(&arrow_field).unwrap();
1170
1171        // Create double data
1172        let data = create_fixed_width_block(64, 100);
1173
1174        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1175        let debug_str = format!("{:?}", compressor);
1176        // Should have BSS wrapped in general compression
1177        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1178        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1179    }
1180}