lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
19use crate::{
20    buffer::LanceBuffer,
21    compression_config::{BssMode, CompressionFieldParams, CompressionParams},
22    constants::{
23        BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
24    },
25    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
26    encodings::{
27        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
28        physical::{
29            binary::{
30                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
31                VariableDecoder, VariableEncoder,
32            },
33            bitpacking::InlineBitpacking,
34            block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
35            byte_stream_split::{
36                should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
37            },
38            constant::ConstantDecompressor,
39            fsst::{
40                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
41                FsstPerValueEncoder,
42            },
43            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
44            packed::{
45                PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
46            },
47            rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
48            value::{ValueDecompressor, ValueEncoder},
49        },
50    },
51    format::{
52        pb21::{compressive_encoding::Compression, CompressiveEncoding},
53        ProtobufUtils21,
54    },
55    statistics::{GetStat, Stat},
56};
57use arrow::{array::AsArray, datatypes::UInt64Type};
58use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
59use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
60use snafu::location;
61use std::str::FromStr;
62
/// Default threshold for RLE compression selection.
///
/// RLE is chosen when the run count is less than this fraction of total values.
/// This value is only used when no `rle_threshold` has been configured for the field.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
66
/// Trait for compression algorithms that compress an entire block of data into one opaque
/// and self-described chunk.
///
/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
///
/// This is the most general type of compression.  There are no constraints on the method
/// of compression; it is assumed that the entire block of data will be present at decompression.
///
/// This is the least appropriate strategy for random access because we must load the entire
/// block to access any single value.  This should only be used for cases where random access is never
/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
/// mini-block chunks)
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compress the data into a single buffer
    ///
    /// Only the compressed buffer is returned.  The description of the compression that is
    /// needed to decompress the data when reading it back is returned, together with the
    /// compressor itself, by [`CompressionStrategy::create_block_compressor`].
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
86
/// A trait to pick which compression to use for given data
///
/// There are several different kinds of compression.
///
/// - Block compression is the most generic, but most difficult to use efficiently
/// - Per-value compression results in either a fixed width data block or a variable
///   width data block.  In other words, there is some number of bits per value.
///   In addition, each value should be independently decompressible.
/// - Mini-block compression results in a small block of opaque data for chunks
///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
///   used for narrow data types (both fixed and variable length) where we can
///   fit many values into a 16KiB block.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Create a block compressor for the given data
    ///
    /// Returns the compressor together with the [`CompressiveEncoding`] describing how the
    /// data will be compressed.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Create a per-value compressor for the given data
    ///
    /// `field` is the field the data belongs to and `data` is the block that will be
    /// compressed; implementations may inspect both to pick a compressor.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Create a mini-block compressor for the given data
    ///
    /// `field` is the field the data belongs to and `data` is the block that will be
    /// compressed; implementations may inspect both to pick a compressor.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
121
/// The default [`CompressionStrategy`] implementation.
///
/// Compressors are chosen from the statistics of the data being compressed, optionally
/// steered by user-configured parameters and by per-field metadata.
#[derive(Debug, Default)]
pub struct DefaultCompressionStrategy {
    /// User-configured compression parameters
    params: CompressionParams,
}
127
128fn try_bss_for_mini_block(
129    data: &FixedWidthDataBlock,
130    params: &CompressionFieldParams,
131) -> Option<Box<dyn MiniBlockCompressor>> {
132    // BSS requires general compression to be effective
133    // If compression is not set or explicitly disabled, skip BSS
134    if params.compression.is_none() || params.compression.as_deref() == Some("none") {
135        return None;
136    }
137
138    let mode = params.bss.unwrap_or(BssMode::Auto);
139    // should_use_bss already checks for supported bit widths (32/64)
140    if should_use_bss(data, mode) {
141        return Some(Box::new(ByteStreamSplitEncoder::new(
142            data.bits_per_value as usize,
143        )));
144    }
145    None
146}
147
148fn try_rle_for_mini_block(
149    data: &FixedWidthDataBlock,
150    params: &CompressionFieldParams,
151) -> Option<Box<dyn MiniBlockCompressor>> {
152    let bits = data.bits_per_value;
153    if !matches!(bits, 8 | 16 | 32 | 64) {
154        return None;
155    }
156
157    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
158    let threshold = params
159        .rle_threshold
160        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
161
162    if (run_count as f64) < (data.num_values as f64) * threshold {
163        return Some(Box::new(RleMiniBlockEncoder::new()));
164    }
165    None
166}
167
168fn try_bitpack_for_mini_block(data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
169    let bits = data.bits_per_value;
170    if !matches!(bits, 8 | 16 | 32 | 64) {
171        return None;
172    }
173
174    let bit_widths = data.expect_stat(Stat::BitWidth);
175    let widths = bit_widths.as_primitive::<UInt64Type>();
176    let has_all_zeros = widths.values().iter().any(|&w| w == 0);
177    let too_small =
178        widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
179
180    if !has_all_zeros && !too_small {
181        return Some(Box::new(InlineBitpacking::new(bits)));
182    }
183    None
184}
185
186fn maybe_wrap_general_for_mini_block(
187    inner: Box<dyn MiniBlockCompressor>,
188    params: &CompressionFieldParams,
189) -> Result<Box<dyn MiniBlockCompressor>> {
190    match params.compression.as_deref() {
191        None | Some("none") | Some("fsst") => Ok(inner),
192        Some(raw) => {
193            let scheme = CompressionScheme::from_str(raw).map_err(|_| {
194                lance_core::Error::invalid_input(
195                    format!("Unknown compression scheme: {raw}"),
196                    location!(),
197                )
198            })?;
199            let cfg = CompressionConfig::new(scheme, params.compression_level);
200            Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
201        }
202    }
203}
204
205impl DefaultCompressionStrategy {
206    /// Create a new compression strategy with default behavior
207    pub fn new() -> Self {
208        Self::default()
209    }
210
211    /// Create a new compression strategy with user-configured parameters
212    pub fn with_params(params: CompressionParams) -> Self {
213        Self { params }
214    }
215
216    /// Parse compression parameters from field metadata
217    fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
218        let mut params = CompressionFieldParams::default();
219
220        // Parse compression method
221        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
222            params.compression = Some(compression.clone());
223        }
224
225        // Parse compression level
226        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
227            params.compression_level = level.parse().ok();
228        }
229
230        // Parse RLE threshold
231        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
232            params.rle_threshold = threshold.parse().ok();
233        }
234
235        // Parse BSS mode
236        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
237            match BssMode::parse(bss_str) {
238                Some(mode) => params.bss = Some(mode),
239                None => {
240                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
241                }
242            }
243        }
244
245        params
246    }
247
248    fn build_fixed_width_compressor(
249        &self,
250        params: &CompressionFieldParams,
251        data: &FixedWidthDataBlock,
252    ) -> Result<Box<dyn MiniBlockCompressor>> {
253        if params.compression.as_deref() == Some("none") {
254            return Ok(Box::new(ValueEncoder::default()));
255        }
256
257        let base = try_bss_for_mini_block(data, params)
258            .or_else(|| try_rle_for_mini_block(data, params))
259            .or_else(|| try_bitpack_for_mini_block(data))
260            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
261
262        maybe_wrap_general_for_mini_block(base, params)
263    }
264
265    /// Build compressor based on parameters for variable-width data
266    fn build_variable_width_compressor(
267        &self,
268        params: &CompressionFieldParams,
269        data: &VariableWidthBlock,
270    ) -> Result<Box<dyn MiniBlockCompressor>> {
271        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
272            return Err(Error::invalid_input(
273                format!(
274                    "Variable width compression not supported for {} bit offsets",
275                    data.bits_per_offset
276                ),
277                location!(),
278            ));
279        }
280
281        // Get statistics
282        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
283        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
284
285        // 1. Check for explicit "none" compression
286        if params.compression.as_deref() == Some("none") {
287            return Ok(Box::new(BinaryMiniBlockEncoder::default()));
288        }
289
290        // 2. Check for explicit "fsst" compression
291        if params.compression.as_deref() == Some("fsst") {
292            return Ok(Box::new(FsstMiniBlockEncoder::default()));
293        }
294
295        // 3. Choose base encoder (FSST or Binary) based on data characteristics
296        let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
297            >= FSST_LEAST_INPUT_MAX_LENGTH
298            && data_size >= FSST_LEAST_INPUT_SIZE as u64
299        {
300            Box::new(FsstMiniBlockEncoder::default())
301        } else {
302            Box::new(BinaryMiniBlockEncoder::default())
303        };
304
305        // 4. Apply general compression if configured
306        if let Some(compression_scheme) = &params.compression {
307            if compression_scheme != "none" && compression_scheme != "fsst" {
308                let scheme: CompressionScheme = compression_scheme.parse()?;
309                let config = CompressionConfig::new(scheme, params.compression_level);
310                base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
311            }
312        }
313
314        Ok(base_encoder)
315    }
316}
317
318impl CompressionStrategy for DefaultCompressionStrategy {
319    fn create_miniblock_compressor(
320        &self,
321        field: &Field,
322        data: &DataBlock,
323    ) -> Result<Box<dyn MiniBlockCompressor>> {
324        let mut field_params = self
325            .params
326            .get_field_params(&field.name, &field.data_type());
327
328        // Override with field metadata if present (highest priority)
329        let metadata_params = Self::parse_field_metadata(field);
330        field_params.merge(&metadata_params);
331
332        match data {
333            DataBlock::FixedWidth(fixed_width_data) => {
334                self.build_fixed_width_compressor(&field_params, fixed_width_data)
335            }
336            DataBlock::VariableWidth(variable_width_data) => {
337                self.build_variable_width_compressor(&field_params, variable_width_data)
338            }
339            DataBlock::Struct(struct_data_block) => {
340                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
341                // just being cautious here.
342                if struct_data_block
343                    .children
344                    .iter()
345                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
346                {
347                    panic!("packed struct encoding currently only supports fixed-width fields.")
348                }
349                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
350            }
351            DataBlock::FixedSizeList(_) => {
352                // Ideally we would compress the list items but this creates something of a challenge.
353                // We don't want to break lists across chunks and we need to worry about inner validity
354                // layers.  If we try and use a compression scheme then it is unlikely to respect these
355                // constraints.
356                //
357                // For now, we just don't compress.  In the future, we might want to consider a more
358                // sophisticated approach.
359                Ok(Box::new(ValueEncoder::default()))
360            }
361            _ => Err(Error::NotSupported {
362                source: format!(
363                    "Mini-block compression not yet supported for block type {}",
364                    data.name()
365                )
366                .into(),
367                location: location!(),
368            }),
369        }
370    }
371
372    fn create_per_value(
373        &self,
374        field: &Field,
375        data: &DataBlock,
376    ) -> Result<Box<dyn PerValueCompressor>> {
377        let field_params = Self::parse_field_metadata(field);
378
379        match data {
380            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
381            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
382            DataBlock::VariableWidth(variable_width) => {
383                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
384                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
385
386                // If values are very large then use block compression on a per-value basis
387                //
388                // TODO: Could maybe use median here
389                if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
390                    return Ok(Box::new(CompressedBufferEncoder::default()));
391                }
392
393                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
394                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
395                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
396
397                    let variable_compression = Box::new(VariableEncoder::default());
398
399                    // Use FSST if explicitly requested or if data characteristics warrant it
400                    if field_params.compression.as_deref() == Some("fsst")
401                        || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
402                            && data_size >= FSST_LEAST_INPUT_SIZE as u64)
403                    {
404                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
405                    } else {
406                        Ok(variable_compression)
407                    }
408                } else {
409                    panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
410                }
411            }
412            _ => unreachable!(
413                "Per-value compression not yet supported for block type: {}",
414                data.name()
415            ),
416        }
417    }
418
419    fn create_block_compressor(
420        &self,
421        _field: &Field,
422        data: &DataBlock,
423    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
424        // TODO: We should actually compress here!
425        match data {
426            // Currently, block compression is used for rep/def (which is fixed width) and for dictionary
427            // encoding (which could be fixed width or variable width).
428            DataBlock::FixedWidth(fixed_width) => {
429                let encoder = Box::new(ValueEncoder::default());
430                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
431                Ok((encoder, encoding))
432            }
433            DataBlock::VariableWidth(variable_width) => {
434                let encoder = Box::new(VariableEncoder::default());
435                let encoding = ProtobufUtils21::variable(
436                    ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
437                    None,
438                );
439                Ok((encoder, encoding))
440            }
441            _ => unreachable!(),
442        }
443    }
444}
445
/// Decompresses the buffers of a mini-block chunk back into a [`DataBlock`].
///
/// This is the read-side counterpart of `MiniBlockCompressor`.
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress `data` into a single data block
    ///
    /// `num_values` is presumably the number of values stored in the chunk
    /// (TODO confirm against the callers).
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
449
/// Decompresses per-value compressed data whose compressed form is fixed-width.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    ///
    /// `num_values` is presumably the number of values contained in `data`
    /// (TODO confirm against the callers).
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// The number of bits in each value
    ///
    /// Currently (and probably long term) this must be a multiple of 8
    fn bits_per_value(&self) -> u64;
}
458
/// Decompresses per-value compressed data whose compressed form is variable-width.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
463
/// Decompresses a single opaque buffer back into a [`DataBlock`].
///
/// This is the read-side counterpart of [`BlockCompressor`].
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress `data` into a data block
    ///
    /// `num_values` is presumably the number of values stored in the buffer
    /// (TODO confirm against the callers).
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
467
/// A trait to create decompressors from the [`CompressiveEncoding`] description that was
/// recorded when the data was written.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Create a mini-block decompressor for the given description
    ///
    /// `decompression_strategy` is the strategy to use when an encoding wraps another
    /// encoding and a decompressor for the inner encoding must be created.
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Create a decompressor for per-value compressed, fixed-width data
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Create a decompressor for per-value compressed, variable-width data
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Create a decompressor for block compressed data
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
490
/// The default [`DecompressionStrategy`] implementation.
///
/// It is stateless: every decompressor is derived purely from the encoding description.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
493
494impl DecompressionStrategy for DefaultDecompressionStrategy {
495    fn create_miniblock_decompressor(
496        &self,
497        description: &CompressiveEncoding,
498        decompression_strategy: &dyn DecompressionStrategy,
499    ) -> Result<Box<dyn MiniBlockDecompressor>> {
500        match description.compression.as_ref().unwrap() {
501            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
502            Compression::InlineBitpacking(description) => {
503                Ok(Box::new(InlineBitpacking::from_description(description)))
504            }
505            Compression::Variable(variable) => {
506                let Compression::Flat(offsets) = variable
507                    .offsets
508                    .as_ref()
509                    .unwrap()
510                    .compression
511                    .as_ref()
512                    .unwrap()
513                else {
514                    panic!("Variable compression only supports flat offsets")
515                };
516                Ok(Box::new(BinaryMiniBlockDecompressor::new(
517                    offsets.bits_per_value as u8,
518                )))
519            }
520            Compression::Fsst(description) => {
521                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
522                    description.values.as_ref().unwrap(),
523                    decompression_strategy,
524                )?;
525                Ok(Box::new(FsstMiniBlockDecompressor::new(
526                    description,
527                    inner_decompressor,
528                )))
529            }
530            Compression::PackedStruct(description) => Ok(Box::new(
531                PackedStructFixedWidthMiniBlockDecompressor::new(description),
532            )),
533            Compression::FixedSizeList(fsl) => {
534                // In the future, we might need to do something more complex here if FSL supports
535                // compression.
536                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
537            }
538            Compression::Rle(rle) => {
539                let Compression::Flat(values) =
540                    rle.values.as_ref().unwrap().compression.as_ref().unwrap()
541                else {
542                    panic!("RLE compression only supports flat values")
543                };
544                let Compression::Flat(run_lengths) = rle
545                    .run_lengths
546                    .as_ref()
547                    .unwrap()
548                    .compression
549                    .as_ref()
550                    .unwrap()
551                else {
552                    panic!("RLE compression only supports flat run lengths")
553                };
554                assert_eq!(
555                    run_lengths.bits_per_value, 8,
556                    "RLE compression only supports 8-bit run lengths"
557                );
558                Ok(Box::new(RleMiniBlockDecompressor::new(
559                    values.bits_per_value,
560                )))
561            }
562            Compression::ByteStreamSplit(bss) => {
563                let Compression::Flat(values) =
564                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
565                else {
566                    panic!("ByteStreamSplit compression only supports flat values")
567                };
568                Ok(Box::new(ByteStreamSplitDecompressor::new(
569                    values.bits_per_value as usize,
570                )))
571            }
572            Compression::General(general) => {
573                // Create inner decompressor
574                let inner_decompressor = self.create_miniblock_decompressor(
575                    general.values.as_ref().ok_or_else(|| {
576                        Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
577                    })?,
578                    decompression_strategy,
579                )?;
580
581                // Parse compression config
582                let compression = general.compression.as_ref().ok_or_else(|| {
583                    Error::invalid_input("GeneralMiniBlock missing compression config", location!())
584                })?;
585
586                let scheme = compression.scheme().try_into()?;
587
588                let compression_config = crate::encodings::physical::block::CompressionConfig::new(
589                    scheme,
590                    compression.level,
591                );
592
593                Ok(Box::new(GeneralMiniBlockDecompressor::new(
594                    inner_decompressor,
595                    compression_config,
596                )))
597            }
598            _ => todo!(),
599        }
600    }
601
602    fn create_fixed_per_value_decompressor(
603        &self,
604        description: &CompressiveEncoding,
605    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
606        match description.compression.as_ref().unwrap() {
607            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
608            Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
609            _ => todo!("fixed-per-value decompressor for {:?}", description),
610        }
611    }
612
613    fn create_variable_per_value_decompressor(
614        &self,
615        description: &CompressiveEncoding,
616    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
617        match description.compression.as_ref().unwrap() {
618            Compression::Variable(variable) => {
619                let Compression::Flat(offsets) = variable
620                    .offsets
621                    .as_ref()
622                    .unwrap()
623                    .compression
624                    .as_ref()
625                    .unwrap()
626                else {
627                    panic!("Variable compression only supports flat offsets")
628                };
629                assert!(offsets.bits_per_value < u8::MAX as u64);
630                Ok(Box::new(VariableDecoder::default()))
631            }
632            Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
633                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
634                Box::new(VariableDecoder::default()),
635            ))),
636            Compression::General(ref general) => {
637                Ok(Box::new(CompressedBufferEncoder::from_scheme(
638                    general.compression.as_ref().expect_ok()?.scheme(),
639                )?))
640            }
641            _ => todo!("variable-per-value decompressor for {:?}", description),
642        }
643    }
644
645    fn create_block_decompressor(
646        &self,
647        description: &CompressiveEncoding,
648    ) -> Result<Box<dyn BlockDecompressor>> {
649        match description.compression.as_ref().unwrap() {
650            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
651            Compression::Constant(constant) => {
652                let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
653                Ok(Box::new(ConstantDecompressor::new(scalar)))
654            }
655            Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
656            _ => todo!(),
657        }
658    }
659}
660
661#[cfg(test)]
662mod tests {
663    use super::*;
664    use crate::buffer::LanceBuffer;
665    use crate::data::{BlockInfo, DataBlock};
666    use arrow::datatypes::{DataType, Field as ArrowField};
667    use std::collections::HashMap;
668
669    fn create_test_field(name: &str, data_type: DataType) -> Field {
670        let arrow_field = ArrowField::new(name, data_type, true);
671        let mut field = Field::try_from(&arrow_field).unwrap();
672        field.id = -1;
673        field
674    }
675
676    fn create_fixed_width_block_with_stats(
677        bits_per_value: u64,
678        num_values: u64,
679        run_count: u64,
680    ) -> DataBlock {
681        // Create varied data to avoid low entropy
682        let bytes_per_value = (bits_per_value / 8) as usize;
683        let total_bytes = bytes_per_value * num_values as usize;
684        let mut data = vec![0u8; total_bytes];
685
686        // Create data with specified run count
687        let values_per_run = (num_values / run_count).max(1);
688        let mut run_value = 0u8;
689
690        for i in 0..num_values as usize {
691            if i % values_per_run as usize == 0 {
692                run_value = run_value.wrapping_add(17); // Use prime to get varied values
693            }
694            // Fill all bytes of the value to create high entropy
695            for j in 0..bytes_per_value {
696                let byte_offset = i * bytes_per_value + j;
697                if byte_offset < data.len() {
698                    data[byte_offset] = run_value.wrapping_add(j as u8);
699                }
700            }
701        }
702
703        let mut block = FixedWidthDataBlock {
704            bits_per_value,
705            data: LanceBuffer::reinterpret_vec(data),
706            num_values,
707            block_info: BlockInfo::default(),
708        };
709
710        // Compute all statistics including BytePositionEntropy
711        use crate::statistics::ComputeStat;
712        block.compute_stat();
713
714        DataBlock::FixedWidth(block)
715    }
716
717    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
718        // Create data with some variety to avoid always triggering BSS
719        let bytes_per_value = (bits_per_value / 8) as usize;
720        let total_bytes = bytes_per_value * num_values as usize;
721        let mut data = vec![0u8; total_bytes];
722
723        // Add some variation to the data to make it more realistic
724        for i in 0..num_values as usize {
725            let byte_offset = i * bytes_per_value;
726            if byte_offset < data.len() {
727                data[byte_offset] = (i % 256) as u8;
728            }
729        }
730
731        let mut block = FixedWidthDataBlock {
732            bits_per_value,
733            data: LanceBuffer::reinterpret_vec(data),
734            num_values,
735            block_info: BlockInfo::default(),
736        };
737
738        // Compute all statistics including BytePositionEntropy
739        use crate::statistics::ComputeStat;
740        block.compute_stat();
741
742        DataBlock::FixedWidth(block)
743    }
744
745    #[test]
746    fn test_parameter_based_compression() {
747        let mut params = CompressionParams::new();
748
749        // Configure RLE for ID columns with BSS explicitly disabled
750        params.columns.insert(
751            "*_id".to_string(),
752            CompressionFieldParams {
753                rle_threshold: Some(0.3),
754                compression: Some("lz4".to_string()),
755                compression_level: None,
756                bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE
757            },
758        );
759
760        let strategy = DefaultCompressionStrategy::with_params(params);
761        let field = create_test_field("user_id", DataType::Int32);
762
763        // Create data with low run count for RLE
764        // Use create_fixed_width_block_with_stats which properly sets run count
765        let data = create_fixed_width_block_with_stats(32, 1000, 100); // 100 runs out of 1000 values
766
767        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
768        // Should use RLE due to low threshold (0.3) and low run count (100/1000 = 0.1)
769        let debug_str = format!("{:?}", compressor);
770
771        // The compressor should be RLE wrapped in general compression
772        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
773        assert!(debug_str.contains("RleMiniBlockEncoder"));
774    }
775
776    #[test]
777    fn test_type_level_parameters() {
778        let mut params = CompressionParams::new();
779
780        // Configure all Int32 to use specific settings
781        params.types.insert(
782            "Int32".to_string(),
783            CompressionFieldParams {
784                rle_threshold: Some(0.1), // Very low threshold
785                compression: Some("zstd".to_string()),
786                compression_level: Some(3),
787                bss: Some(BssMode::Off), // Disable BSS to test RLE
788            },
789        );
790
791        let strategy = DefaultCompressionStrategy::with_params(params);
792        let field = create_test_field("some_column", DataType::Int32);
793        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
794        let data = create_fixed_width_block_with_stats(32, 1000, 50);
795
796        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
797        // Should use RLE due to very low threshold
798        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
799    }
800
801    #[test]
802    fn test_none_compression() {
803        let mut params = CompressionParams::new();
804
805        // Disable compression for embeddings
806        params.columns.insert(
807            "embeddings".to_string(),
808            CompressionFieldParams {
809                compression: Some("none".to_string()),
810                ..Default::default()
811            },
812        );
813
814        let strategy = DefaultCompressionStrategy::with_params(params);
815        let field = create_test_field("embeddings", DataType::Float32);
816        let data = create_fixed_width_block(32, 1000);
817
818        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
819        // Should use ValueEncoder (no compression)
820        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
821    }
822
823    #[test]
824    fn test_parameter_merge_priority() {
825        let mut params = CompressionParams::new();
826
827        // Set type-level
828        params.types.insert(
829            "Int32".to_string(),
830            CompressionFieldParams {
831                rle_threshold: Some(0.5),
832                compression: Some("lz4".to_string()),
833                ..Default::default()
834            },
835        );
836
837        // Set column-level (highest priority)
838        params.columns.insert(
839            "user_id".to_string(),
840            CompressionFieldParams {
841                rle_threshold: Some(0.2),
842                compression: Some("zstd".to_string()),
843                compression_level: Some(6),
844                bss: None,
845            },
846        );
847
848        let strategy = DefaultCompressionStrategy::with_params(params);
849
850        // Get merged params
851        let merged = strategy
852            .params
853            .get_field_params("user_id", &DataType::Int32);
854
855        // Column params should override type params
856        assert_eq!(merged.rle_threshold, Some(0.2));
857        assert_eq!(merged.compression, Some("zstd".to_string()));
858        assert_eq!(merged.compression_level, Some(6));
859
860        // Test field with only type params
861        let merged = strategy
862            .params
863            .get_field_params("other_field", &DataType::Int32);
864        assert_eq!(merged.rle_threshold, Some(0.5));
865        assert_eq!(merged.compression, Some("lz4".to_string()));
866        assert_eq!(merged.compression_level, None);
867    }
868
869    #[test]
870    fn test_pattern_matching() {
871        let mut params = CompressionParams::new();
872
873        // Configure pattern for log files
874        params.columns.insert(
875            "log_*".to_string(),
876            CompressionFieldParams {
877                compression: Some("zstd".to_string()),
878                compression_level: Some(6),
879                ..Default::default()
880            },
881        );
882
883        let strategy = DefaultCompressionStrategy::with_params(params);
884
885        // Should match pattern
886        let merged = strategy
887            .params
888            .get_field_params("log_messages", &DataType::Utf8);
889        assert_eq!(merged.compression, Some("zstd".to_string()));
890        assert_eq!(merged.compression_level, Some(6));
891
892        // Should not match
893        let merged = strategy
894            .params
895            .get_field_params("messages_log", &DataType::Utf8);
896        assert_eq!(merged.compression, None);
897    }
898
899    #[test]
900    fn test_legacy_metadata_support() {
901        let params = CompressionParams::new();
902        let strategy = DefaultCompressionStrategy::with_params(params);
903
904        // Test field with "none" compression metadata
905        let mut metadata = HashMap::new();
906        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
907        let mut field = create_test_field("some_column", DataType::Int32);
908        field.metadata = metadata;
909
910        let data = create_fixed_width_block(32, 1000);
911        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
912
913        // Should respect metadata and use ValueEncoder
914        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
915    }
916
917    #[test]
918    fn test_default_behavior() {
919        // Empty params should fall back to default behavior
920        let params = CompressionParams::new();
921        let strategy = DefaultCompressionStrategy::with_params(params);
922
923        let field = create_test_field("random_column", DataType::Int32);
924        // Create data with high run count that won't trigger RLE (600 runs for 1000 values = 0.6 ratio)
925        let data = create_fixed_width_block_with_stats(32, 1000, 600);
926
927        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
928        // Should use default strategy's decision
929        let debug_str = format!("{:?}", compressor);
930        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
931    }
932
933    #[test]
934    fn test_field_metadata_compression() {
935        let params = CompressionParams::new();
936        let strategy = DefaultCompressionStrategy::with_params(params);
937
938        // Test field with compression metadata
939        let mut metadata = HashMap::new();
940        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
941        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
942        let mut field = create_test_field("test_column", DataType::Int32);
943        field.metadata = metadata;
944
945        let data = create_fixed_width_block(32, 1000);
946        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
947
948        // Should use zstd with level 6
949        let debug_str = format!("{:?}", compressor);
950        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
951    }
952
953    #[test]
954    fn test_field_metadata_rle_threshold() {
955        let params = CompressionParams::new();
956        let strategy = DefaultCompressionStrategy::with_params(params);
957
958        // Test field with RLE threshold metadata
959        let mut metadata = HashMap::new();
960        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
961        metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); // Disable BSS to test RLE
962        let mut field = create_test_field("test_column", DataType::Int32);
963        field.metadata = metadata;
964
965        // Create data with low run count (e.g., 100 runs for 1000 values = 0.1 ratio)
966        // This ensures run_count (100) < num_values * threshold (1000 * 0.8 = 800)
967        let data = create_fixed_width_block_with_stats(32, 1000, 100);
968
969        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
970
971        // Should use RLE because run_count (100) < num_values * threshold (800)
972        let debug_str = format!("{:?}", compressor);
973        assert!(debug_str.contains("RleMiniBlockEncoder"));
974    }
975
976    #[test]
977    fn test_field_metadata_override_params() {
978        // Set up params with one configuration
979        let mut params = CompressionParams::new();
980        params.columns.insert(
981            "test_column".to_string(),
982            CompressionFieldParams {
983                rle_threshold: Some(0.3),
984                compression: Some("lz4".to_string()),
985                compression_level: None,
986                bss: None,
987            },
988        );
989
990        let strategy = DefaultCompressionStrategy::with_params(params);
991
992        // Field metadata should override params
993        let mut metadata = HashMap::new();
994        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
995        let mut field = create_test_field("test_column", DataType::Int32);
996        field.metadata = metadata;
997
998        let data = create_fixed_width_block(32, 1000);
999        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1000
1001        // Should use none compression (from metadata) instead of lz4 (from params)
1002        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1003    }
1004
1005    #[test]
1006    fn test_field_metadata_mixed_configuration() {
1007        // Configure type-level params
1008        let mut params = CompressionParams::new();
1009        params.types.insert(
1010            "Int32".to_string(),
1011            CompressionFieldParams {
1012                rle_threshold: Some(0.5),
1013                compression: Some("lz4".to_string()),
1014                ..Default::default()
1015            },
1016        );
1017
1018        let strategy = DefaultCompressionStrategy::with_params(params);
1019
1020        // Field metadata provides partial override
1021        let mut metadata = HashMap::new();
1022        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1023        let mut field = create_test_field("test_column", DataType::Int32);
1024        field.metadata = metadata;
1025
1026        let data = create_fixed_width_block(32, 1000);
1027        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1028
1029        // Should use lz4 (from type params) with level 3 (from metadata)
1030        let debug_str = format!("{:?}", compressor);
1031        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1032    }
1033
1034    #[test]
1035    fn test_bss_field_metadata() {
1036        let params = CompressionParams::new();
1037        let strategy = DefaultCompressionStrategy::with_params(params);
1038
1039        // Test BSS "on" mode with compression enabled (BSS requires compression to be effective)
1040        let mut metadata = HashMap::new();
1041        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1042        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1043        let arrow_field =
1044            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1045        let field = Field::try_from(&arrow_field).unwrap();
1046
1047        // Create float data
1048        let data = create_fixed_width_block(32, 100);
1049
1050        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1051        let debug_str = format!("{:?}", compressor);
1052        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1053    }
1054
1055    #[test]
1056    fn test_bss_with_compression() {
1057        let params = CompressionParams::new();
1058        let strategy = DefaultCompressionStrategy::with_params(params);
1059
1060        // Test BSS with LZ4 compression
1061        let mut metadata = HashMap::new();
1062        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1063        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1064        let arrow_field =
1065            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1066        let field = Field::try_from(&arrow_field).unwrap();
1067
1068        // Create double data
1069        let data = create_fixed_width_block(64, 100);
1070
1071        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1072        let debug_str = format!("{:?}", compressor);
1073        // Should have BSS wrapped in general compression
1074        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1075        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1076    }
1077}