Skip to main content

lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
19#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22    buffer::LanceBuffer,
23    compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24    constants::{
25        BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26    },
27    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28    encodings::{
29        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30        physical::{
31            binary::{
32                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33                VariableDecoder, VariableEncoder,
34            },
35            block::{
36                CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37                GeneralBlockDecompressor,
38            },
39            byte_stream_split::{
40                ByteStreamSplitDecompressor, ByteStreamSplitEncoder, should_use_bss,
41            },
42            constant::ConstantDecompressor,
43            fsst::{
44                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45                FsstPerValueEncoder,
46            },
47            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48            packed::{
49                PackedStructFixedWidthMiniBlockDecompressor,
50                PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51                PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52                VariablePackedStructFieldKind,
53            },
54            rle::{RleDecompressor, RleEncoder},
55            value::{ValueDecompressor, ValueEncoder},
56        },
57    },
58    format::{
59        ProtobufUtils21,
60        pb21::{CompressiveEncoding, compressive_encoding::Compression},
61    },
62    statistics::{GetStat, Stat},
63    version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use arrow_schema::DataType;
68use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
69use lance_core::{Error, Result, datatypes::Field, error::LanceOptionExt};
70use std::{str::FromStr, sync::Arc};
71
/// Run-ratio threshold used for RLE selection when the user has not configured one.
///
/// The threshold is a fraction of the value count: RLE is only considered when
/// `run_count < num_values * threshold`.
///
/// - Block compression (`try_rle_for_block`) always applies a threshold and falls back to
///   this default when none is configured.
/// - Mini-block compression (`try_rle_for_mini_block`) only applies a threshold when the user
///   explicitly provides one; otherwise it relies solely on an encoded-size estimate, so this
///   default is never consulted there.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;

/// Block size (32 KiB) that a block must exceed before general-purpose block compression is
/// applied automatically (Lance file version 2.2+), even when the user did not request it.
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024;
81
82/// Trait for compression algorithms that compress an entire block of data into one opaque
83/// and self-described chunk.
84///
85/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
86///
87/// This is the most general type of compression.  There are no constraints on the method
88/// of compression it is assumed that the entire block of data will be present at decompression.
89///
90/// This is the least appropriate strategy for random access because we must load the entire
91/// block to access any single value.  This should only be used for cases where random access is never
92/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
93/// mini-block chunks)
94pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
95    /// Compress the data into a single buffer
96    ///
97    /// Also returns a description of the compression that can be used to decompress
98    /// when reading the data back
99    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
100}
101
102/// A trait to pick which compression to use for given data
103///
104/// There are several different kinds of compression.
105///
106/// - Block compression is the most generic, but most difficult to use efficiently
107/// - Per-value compression results in either a fixed width data block or a variable
108///   width data block.  In other words, there is some number of bits per value.
109///   In addition, each value should be independently decompressible.
110/// - Mini-block compression results in a small block of opaque data for chunks
111///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
112///   used for narrow data types (both fixed and variable length) where we can
113///   fit many values into an 16KiB block.
114pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
115    /// Create a block compressor for the given data
116    fn create_block_compressor(
117        &self,
118        field: &Field,
119        data: &DataBlock,
120    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;
121
122    /// Create a per-value compressor for the given data
123    fn create_per_value(
124        &self,
125        field: &Field,
126        data: &DataBlock,
127    ) -> Result<Box<dyn PerValueCompressor>>;
128
129    /// Create a mini-block compressor for the given data
130    fn create_miniblock_compressor(
131        &self,
132        field: &Field,
133        data: &DataBlock,
134    ) -> Result<Box<dyn MiniBlockCompressor>>;
135}
136
137#[derive(Debug, Default, Clone)]
138pub struct DefaultCompressionStrategy {
139    /// User-configured compression parameters
140    params: CompressionParams,
141    /// The lance file version for compatibilities.
142    version: LanceFileVersion,
143}
144
145fn try_bss_for_mini_block(
146    data: &FixedWidthDataBlock,
147    params: &CompressionFieldParams,
148) -> Option<Box<dyn MiniBlockCompressor>> {
149    // BSS requires general compression to be effective
150    // If compression is not set or explicitly disabled, skip BSS
151    if params.compression.is_none() || params.compression.as_deref() == Some("none") {
152        return None;
153    }
154
155    let mode = params.bss.unwrap_or(BssMode::Auto);
156    // should_use_bss already checks for supported bit widths (32/64)
157    if should_use_bss(data, mode) {
158        return Some(Box::new(ByteStreamSplitEncoder::new(
159            data.bits_per_value as usize,
160        )));
161    }
162    None
163}
164
165fn try_rle_for_mini_block(
166    data: &FixedWidthDataBlock,
167    params: &CompressionFieldParams,
168) -> Option<Box<dyn MiniBlockCompressor>> {
169    let bits = data.bits_per_value;
170    if !matches!(bits, 8 | 16 | 32 | 64) {
171        return None;
172    }
173
174    let type_size = bits / 8;
175    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
176    let threshold = params
177        .rle_threshold
178        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
179
180    // If the user explicitly provided a threshold then honor it as an additional guard.
181    // A lower threshold makes RLE harder to trigger and can be used to avoid CPU overhead.
182    let passes_threshold = match params.rle_threshold {
183        Some(_) => (run_count as f64) < (data.num_values as f64) * threshold,
184        None => true,
185    };
186
187    if !passes_threshold {
188        return None;
189    }
190
191    // Estimate the encoded size.
192    //
193    // RLE stores (value, run_length) pairs. Run lengths are u8 and long runs are split into
194    // multiple entries of up to 255 values. We don't know the run length distribution here,
195    // so we conservatively account for splitting with an upper bound.
196    let num_values = data.num_values;
197    let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values);
198
199    let raw_bytes = (num_values as u128) * (type_size as u128);
200    let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);
201
202    if rle_bytes < raw_bytes {
203        #[cfg(feature = "bitpacking")]
204        {
205            if let Some(bitpack_bytes) = estimate_inline_bitpacking_bytes(data)
206                && (bitpack_bytes as u128) < rle_bytes
207            {
208                return None;
209            }
210        }
211        return Some(Box::new(RleEncoder::new()));
212    }
213    None
214}
215
216fn try_rle_for_block(
217    data: &FixedWidthDataBlock,
218    version: LanceFileVersion,
219    params: &CompressionFieldParams,
220) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
221    if version < LanceFileVersion::V2_2 {
222        return None;
223    }
224
225    let bits = data.bits_per_value;
226    if !matches!(bits, 8 | 16 | 32 | 64) {
227        return None;
228    }
229
230    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
231    let threshold = params
232        .rle_threshold
233        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
234
235    if (run_count as f64) < (data.num_values as f64) * threshold {
236        let compressor = Box::new(RleEncoder::new());
237        let encoding = ProtobufUtils21::rle(
238            ProtobufUtils21::flat(bits, None),
239            ProtobufUtils21::flat(/*bits_per_value=*/ 8, None),
240        );
241        return Some((compressor, encoding));
242    }
243    None
244}
245
246fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
247    #[cfg(feature = "bitpacking")]
248    {
249        let bits = _data.bits_per_value;
250        if estimate_inline_bitpacking_bytes(_data).is_some() {
251            return Some(Box::new(InlineBitpacking::new(bits)));
252        }
253        None
254    }
255    #[cfg(not(feature = "bitpacking"))]
256    {
257        None
258    }
259}
260
/// Estimate the byte size of inline-bitpacking `data`.
///
/// Each `BitWidth` statistic entry is treated as one chunk of 1024 values costing one
/// header word plus `1024 * width / bits` packed words.  Returns `None` for unsupported
/// widths, empty blocks, or when the estimate is not strictly smaller than the raw data.
#[cfg(feature = "bitpacking")]
fn estimate_inline_bitpacking_bytes(data: &FixedWidthDataBlock) -> Option<u64> {
    let bits = data.bits_per_value;
    if data.num_values == 0 || !matches!(bits, 8 | 16 | 32 | 64) {
        return None;
    }

    let word_bits = u128::from(bits);
    let word_bytes = word_bits / 8;
    let bit_widths = data.expect_stat(Stat::BitWidth);
    let total_words = bit_widths
        .as_primitive::<UInt64Type>()
        .values()
        .iter()
        .map(|&width| 1 + (1024 * u128::from(width)) / word_bits)
        .fold(0u128, u128::saturating_add);

    let estimated_bytes = total_words.saturating_mul(word_bytes);
    if estimated_bytes >= data.data_size() as u128 {
        return None;
    }
    u64::try_from(estimated_bytes).ok()
}
294
295fn try_bitpack_for_block(
296    data: &FixedWidthDataBlock,
297) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
298    let bits = data.bits_per_value;
299    if !matches!(bits, 8 | 16 | 32 | 64) {
300        return None;
301    }
302
303    let bit_widths = data.expect_stat(Stat::BitWidth);
304    let widths = bit_widths.as_primitive::<UInt64Type>();
305    let has_all_zeros = widths.values().contains(&0);
306    let max_bit_width = *widths.values().iter().max().unwrap();
307
308    let too_small =
309        widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
310
311    if has_all_zeros || too_small {
312        return None;
313    }
314
315    if data.num_values <= 1024 {
316        let compressor = Box::new(InlineBitpacking::new(bits));
317        let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
318        Some((compressor, encoding))
319    } else {
320        let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
321        let encoding = ProtobufUtils21::out_of_line_bitpacking(
322            bits,
323            ProtobufUtils21::flat(max_bit_width, None),
324        );
325        Some((compressor, encoding))
326    }
327}
328
329fn maybe_wrap_general_for_mini_block(
330    inner: Box<dyn MiniBlockCompressor>,
331    params: &CompressionFieldParams,
332) -> Result<Box<dyn MiniBlockCompressor>> {
333    match params.compression.as_deref() {
334        None | Some("none") | Some("fsst") => Ok(inner),
335        Some(raw) => {
336            let scheme = CompressionScheme::from_str(raw)
337                .map_err(|_| Error::invalid_input(format!("Unknown compression scheme: {raw}")))?;
338            let cfg = CompressionConfig::new(scheme, params.compression_level);
339            Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
340        }
341    }
342}
343
344fn try_general_compression(
345    version: LanceFileVersion,
346    field_params: &CompressionFieldParams,
347    data: &DataBlock,
348) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
349    // Explicitly disable general compression.
350    if field_params.compression.as_deref() == Some("none") {
351        return Ok(None);
352    }
353
354    // User-requested compression (unused today but perhaps still used
355    // in the future someday)
356    if let Some(compression_scheme) = &field_params.compression
357        && version >= LanceFileVersion::V2_2
358    {
359        let scheme: CompressionScheme = compression_scheme.parse()?;
360        let config = CompressionConfig::new(scheme, field_params.compression_level);
361        let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
362        return Ok(Some((compressor, config)));
363    }
364
365    // Automatic compression for large blocks
366    if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
367        && version >= LanceFileVersion::V2_2
368    {
369        let compressor = Box::new(CompressedBufferEncoder::default());
370        let config = compressor.compressor.config();
371        return Ok(Some((compressor, config)));
372    }
373
374    Ok(None)
375}
376
377impl DefaultCompressionStrategy {
378    /// Create a new compression strategy with default behavior
379    pub fn new() -> Self {
380        Self::default()
381    }
382
383    /// Create a new compression strategy with user-configured parameters
384    pub fn with_params(params: CompressionParams) -> Self {
385        Self {
386            params,
387            version: LanceFileVersion::default(),
388        }
389    }
390
391    /// Override the file version used to make compression decisions
392    pub fn with_version(mut self, version: LanceFileVersion) -> Self {
393        self.version = version;
394        self
395    }
396
397    /// Parse compression parameters from field metadata
398    fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams {
399        let mut params = CompressionFieldParams::default();
400
401        // Parse compression method
402        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
403            params.compression = Some(compression.clone());
404        }
405
406        // Parse compression level
407        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
408            params.compression_level = level.parse().ok();
409        }
410
411        // Parse RLE threshold
412        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
413            params.rle_threshold = threshold.parse().ok();
414        }
415
416        // Parse BSS mode
417        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
418            match BssMode::parse(bss_str) {
419                Some(mode) => params.bss = Some(mode),
420                None => {
421                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
422                }
423            }
424        }
425
426        // Parse minichunk size
427        if let Some(minichunk_size_str) = field
428            .metadata
429            .get(super::constants::MINICHUNK_SIZE_META_KEY)
430        {
431            if let Ok(minichunk_size) = minichunk_size_str.parse::<i64>() {
432                // for lance v2.1, only 32kb or smaller is supported
433                if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 {
434                    log::warn!(
435                        "minichunk_size '{}' too large for version '{}', using default",
436                        minichunk_size,
437                        version
438                    );
439                } else {
440                    params.minichunk_size = Some(minichunk_size);
441                }
442            } else {
443                log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str);
444            }
445        }
446
447        params
448    }
449
450    fn build_fixed_width_compressor(
451        &self,
452        params: &CompressionFieldParams,
453        data: &FixedWidthDataBlock,
454    ) -> Result<Box<dyn MiniBlockCompressor>> {
455        if params.compression.as_deref() == Some("none") {
456            return Ok(Box::new(ValueEncoder::default()));
457        }
458
459        let base = try_bss_for_mini_block(data, params)
460            .or_else(|| try_rle_for_mini_block(data, params))
461            .or_else(|| try_bitpack_for_mini_block(data))
462            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
463
464        maybe_wrap_general_for_mini_block(base, params)
465    }
466
467    /// Build compressor based on parameters for variable-width data
468    fn build_variable_width_compressor(
469        &self,
470        field: &Field,
471        data: &VariableWidthBlock,
472    ) -> Result<Box<dyn MiniBlockCompressor>> {
473        let params = self.get_merged_field_params(field);
474        let compression = params.compression.as_deref();
475        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
476            return Err(Error::invalid_input(format!(
477                "Variable width compression not supported for {} bit offsets",
478                data.bits_per_offset
479            )));
480        }
481
482        // Get statistics
483        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
484        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
485
486        // Explicitly disable all compression.
487        if compression == Some("none") {
488            return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)));
489        }
490
491        let use_fsst = compression == Some("fsst")
492            || (compression.is_none()
493                && !matches!(field.data_type(), DataType::Binary | DataType::LargeBinary)
494                && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
495                && data_size >= FSST_LEAST_INPUT_SIZE as u64);
496
497        // Choose base encoder (FSST or Binary) once.
498        let mut base_encoder: Box<dyn MiniBlockCompressor> = if use_fsst {
499            Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))
500        } else {
501            Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))
502        };
503
504        // Wrap with general compression when configured (except FSST / none).
505        if let Some(compression_scheme) = compression.filter(|scheme| *scheme != "fsst") {
506            let scheme: CompressionScheme = compression_scheme.parse()?;
507            let config = CompressionConfig::new(scheme, params.compression_level);
508            base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
509        }
510
511        Ok(base_encoder)
512    }
513
514    /// Merge user-configured parameters with field metadata
515    /// Field metadata has highest priority
516    fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
517        let mut field_params = self
518            .params
519            .get_field_params(&field.name, &field.data_type());
520
521        // Override with field metadata if present (highest priority)
522        let metadata_params = Self::parse_field_metadata(field, &self.version);
523        field_params.merge(&metadata_params);
524
525        field_params
526    }
527}
528
529impl CompressionStrategy for DefaultCompressionStrategy {
530    fn create_miniblock_compressor(
531        &self,
532        field: &Field,
533        data: &DataBlock,
534    ) -> Result<Box<dyn MiniBlockCompressor>> {
535        match data {
536            DataBlock::FixedWidth(fixed_width_data) => {
537                let field_params = self.get_merged_field_params(field);
538                self.build_fixed_width_compressor(&field_params, fixed_width_data)
539            }
540            DataBlock::VariableWidth(variable_width_data) => {
541                self.build_variable_width_compressor(field, variable_width_data)
542            }
543            DataBlock::Struct(struct_data_block) => {
544                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
545                // just being cautious here.
546                if struct_data_block.has_variable_width_child() {
547                    return Err(Error::invalid_input(
548                        "Packed struct mini-block encoding supports only fixed-width children",
549                    ));
550                }
551                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
552            }
553            DataBlock::FixedSizeList(_) => {
554                // Ideally we would compress the list items but this creates something of a challenge.
555                // We don't want to break lists across chunks and we need to worry about inner validity
556                // layers.  If we try and use a compression scheme then it is unlikely to respect these
557                // constraints.
558                //
559                // For now, we just don't compress.  In the future, we might want to consider a more
560                // sophisticated approach.
561                Ok(Box::new(ValueEncoder::default()))
562            }
563            _ => Err(Error::not_supported_source(
564                format!(
565                    "Mini-block compression not yet supported for block type {}",
566                    data.name()
567                )
568                .into(),
569            )),
570        }
571    }
572
573    fn create_per_value(
574        &self,
575        field: &Field,
576        data: &DataBlock,
577    ) -> Result<Box<dyn PerValueCompressor>> {
578        let field_params = self.get_merged_field_params(field);
579
580        match data {
581            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
582            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
583            DataBlock::Struct(struct_block) => {
584                if field.children.len() != struct_block.children.len() {
585                    return Err(Error::invalid_input(
586                        "Struct field metadata does not match data block children",
587                    ));
588                }
589                let has_variable_child = struct_block.has_variable_width_child();
590                if has_variable_child {
591                    if self.version < LanceFileVersion::V2_2 {
592                        return Err(Error::not_supported_source("Variable packed struct encoding requires Lance file version 2.2 or later".into()));
593                    }
594                    Ok(Box::new(PackedStructVariablePerValueEncoder::new(
595                        self.clone(),
596                        field.children.clone(),
597                    )))
598                } else {
599                    Err(Error::invalid_input(
600                        "Packed struct per-value compression should not be used for fixed-width-only structs",
601                    ))
602                }
603            }
604            DataBlock::VariableWidth(variable_width) => {
605                let compression = field_params.compression.as_deref();
606                // Check for explicit "none" compression
607                if compression == Some("none") {
608                    return Ok(Box::new(VariableEncoder::default()));
609                }
610
611                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
612                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
613
614                // If values are very large then use block compression on a per-value basis
615                //
616                // TODO: Could maybe use median here
617
618                let per_value_requested =
619                    compression.is_some_and(|compression| compression != "fsst");
620
621                if (max_len > 32 * 1024 || per_value_requested)
622                    && data_size >= FSST_LEAST_INPUT_SIZE as u64
623                {
624                    return Ok(Box::new(CompressedBufferEncoder::default()));
625                }
626
627                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
628                    let variable_compression = Box::new(VariableEncoder::default());
629                    let use_fsst = compression == Some("fsst")
630                        || (compression.is_none()
631                            && !matches!(
632                                field.data_type(),
633                                DataType::Binary | DataType::LargeBinary
634                            )
635                            && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
636                            && data_size >= FSST_LEAST_INPUT_SIZE as u64);
637
638                    // Use FSST if explicitly requested or if data characteristics warrant it.
639                    if use_fsst {
640                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
641                    } else {
642                        Ok(variable_compression)
643                    }
644                } else {
645                    panic!(
646                        "Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.",
647                        variable_width.bits_per_offset
648                    );
649                }
650            }
651            _ => unreachable!(
652                "Per-value compression not yet supported for block type: {}",
653                data.name()
654            ),
655        }
656    }
657
658    fn create_block_compressor(
659        &self,
660        field: &Field,
661        data: &DataBlock,
662    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
663        let field_params = self.get_merged_field_params(field);
664
665        match data {
666            DataBlock::FixedWidth(fixed_width) => {
667                if let Some((compressor, encoding)) =
668                    try_rle_for_block(fixed_width, self.version, &field_params)
669                {
670                    return Ok((compressor, encoding));
671                }
672                if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
673                    return Ok((compressor, encoding));
674                }
675
676                // Try general compression (user-requested or automatic over MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION)
677                if let Some((compressor, config)) =
678                    try_general_compression(self.version, &field_params, data)?
679                {
680                    let encoding = ProtobufUtils21::wrapped(
681                        config,
682                        ProtobufUtils21::flat(fixed_width.bits_per_value, None),
683                    )?;
684                    return Ok((compressor, encoding));
685                }
686
687                let encoder = Box::new(ValueEncoder::default());
688                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
689                Ok((encoder, encoding))
690            }
691            DataBlock::VariableWidth(variable_width) => {
692                // Try general compression
693                if let Some((compressor, config)) =
694                    try_general_compression(self.version, &field_params, data)?
695                {
696                    let encoding = ProtobufUtils21::wrapped(
697                        config,
698                        ProtobufUtils21::variable(
699                            ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
700                            None,
701                        ),
702                    )?;
703                    return Ok((compressor, encoding));
704                }
705
706                let encoder = Box::new(VariableEncoder::default());
707                let encoding = ProtobufUtils21::variable(
708                    ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
709                    None,
710                );
711                Ok((encoder, encoding))
712            }
713            _ => unreachable!(),
714        }
715    }
716}
717
718pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
719    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
720}
721
722pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
723    /// Decompress one or more values
724    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
725    /// The number of bits in each value
726    ///
727    /// Currently (and probably long term) this must be a multiple of 8
728    fn bits_per_value(&self) -> u64;
729}
730
731pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
732    /// Decompress one or more values
733    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
734}
735
736pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
737    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
738}
739
740pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
741    fn create_miniblock_decompressor(
742        &self,
743        description: &CompressiveEncoding,
744        decompression_strategy: &dyn DecompressionStrategy,
745    ) -> Result<Box<dyn MiniBlockDecompressor>>;
746
747    fn create_fixed_per_value_decompressor(
748        &self,
749        description: &CompressiveEncoding,
750    ) -> Result<Box<dyn FixedPerValueDecompressor>>;
751
752    fn create_variable_per_value_decompressor(
753        &self,
754        description: &CompressiveEncoding,
755    ) -> Result<Box<dyn VariablePerValueDecompressor>>;
756
757    fn create_block_decompressor(
758        &self,
759        description: &CompressiveEncoding,
760    ) -> Result<Box<dyn BlockDecompressor>>;
761}
762
763#[derive(Debug, Default)]
764pub struct DefaultDecompressionStrategy {}
765
766impl DecompressionStrategy for DefaultDecompressionStrategy {
767    fn create_miniblock_decompressor(
768        &self,
769        description: &CompressiveEncoding,
770        decompression_strategy: &dyn DecompressionStrategy,
771    ) -> Result<Box<dyn MiniBlockDecompressor>> {
772        match description.compression.as_ref().unwrap() {
773            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
774            #[cfg(feature = "bitpacking")]
775            Compression::InlineBitpacking(description) => {
776                Ok(Box::new(InlineBitpacking::from_description(description)))
777            }
778            #[cfg(not(feature = "bitpacking"))]
779            Compression::InlineBitpacking(_) => Err(Error::not_supported_source(
780                "this runtime was not built with bitpacking support".into(),
781            )),
782            Compression::Variable(variable) => {
783                let Compression::Flat(offsets) = variable
784                    .offsets
785                    .as_ref()
786                    .unwrap()
787                    .compression
788                    .as_ref()
789                    .unwrap()
790                else {
791                    panic!("Variable compression only supports flat offsets")
792                };
793                Ok(Box::new(BinaryMiniBlockDecompressor::new(
794                    offsets.bits_per_value as u8,
795                )))
796            }
797            Compression::Fsst(description) => {
798                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
799                    description.values.as_ref().unwrap(),
800                    decompression_strategy,
801                )?;
802                Ok(Box::new(FsstMiniBlockDecompressor::new(
803                    description,
804                    inner_decompressor,
805                )))
806            }
807            Compression::PackedStruct(description) => Ok(Box::new(
808                PackedStructFixedWidthMiniBlockDecompressor::new(description),
809            )),
810            Compression::VariablePackedStruct(_) => Err(Error::not_supported_source(
811                "variable packed struct decoding is not yet implemented".into(),
812            )),
813            Compression::FixedSizeList(fsl) => {
814                // In the future, we might need to do something more complex here if FSL supports
815                // compression.
816                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
817            }
818            Compression::Rle(rle) => {
819                let bits_per_value = validate_rle_compression(rle)?;
820                Ok(Box::new(RleDecompressor::new(bits_per_value)))
821            }
822            Compression::ByteStreamSplit(bss) => {
823                let Compression::Flat(values) =
824                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
825                else {
826                    panic!("ByteStreamSplit compression only supports flat values")
827                };
828                Ok(Box::new(ByteStreamSplitDecompressor::new(
829                    values.bits_per_value as usize,
830                )))
831            }
832            Compression::General(general) => {
833                // Create inner decompressor
834                let inner_decompressor = self.create_miniblock_decompressor(
835                    general.values.as_ref().ok_or_else(|| {
836                        Error::invalid_input("GeneralMiniBlock missing inner encoding")
837                    })?,
838                    decompression_strategy,
839                )?;
840
841                // Parse compression config
842                let compression = general.compression.as_ref().ok_or_else(|| {
843                    Error::invalid_input("GeneralMiniBlock missing compression config")
844                })?;
845
846                let scheme = compression.scheme().try_into()?;
847
848                let compression_config = CompressionConfig::new(scheme, compression.level);
849
850                Ok(Box::new(GeneralMiniBlockDecompressor::new(
851                    inner_decompressor,
852                    compression_config,
853                )))
854            }
855            _ => todo!(),
856        }
857    }
858
859    fn create_fixed_per_value_decompressor(
860        &self,
861        description: &CompressiveEncoding,
862    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
863        match description.compression.as_ref().unwrap() {
864            Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
865                constant
866                    .value
867                    .as_ref()
868                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
869            ))),
870            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
871            Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
872            _ => todo!("fixed-per-value decompressor for {:?}", description),
873        }
874    }
875
876    fn create_variable_per_value_decompressor(
877        &self,
878        description: &CompressiveEncoding,
879    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
880        match description.compression.as_ref().unwrap() {
881            Compression::Variable(variable) => {
882                let Compression::Flat(offsets) = variable
883                    .offsets
884                    .as_ref()
885                    .unwrap()
886                    .compression
887                    .as_ref()
888                    .unwrap()
889                else {
890                    panic!("Variable compression only supports flat offsets")
891                };
892                assert!(offsets.bits_per_value < u8::MAX as u64);
893                Ok(Box::new(VariableDecoder::default()))
894            }
895            Compression::Fsst(fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
896                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
897                Box::new(VariableDecoder::default()),
898            ))),
899            Compression::General(general) => Ok(Box::new(CompressedBufferEncoder::from_scheme(
900                general.compression.as_ref().expect_ok()?.scheme(),
901            )?)),
902            Compression::VariablePackedStruct(description) => {
903                let mut fields = Vec::with_capacity(description.fields.len());
904                for field in &description.fields {
905                    let value_encoding = field.value.as_ref().ok_or_else(|| {
906                        Error::invalid_input("VariablePackedStruct field is missing value encoding")
907                    })?;
908                    let decoder = match field.layout.as_ref().ok_or_else(|| {
909                        Error::invalid_input("VariablePackedStruct field is missing layout details")
910                    })? {
911                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
912                            bits_per_value,
913                        ) => {
914                            let decompressor =
915                                self.create_fixed_per_value_decompressor(value_encoding)?;
916                            VariablePackedStructFieldDecoder {
917                                kind: VariablePackedStructFieldKind::Fixed {
918                                    bits_per_value: *bits_per_value,
919                                    decompressor: Arc::from(decompressor),
920                                },
921                            }
922                        }
923                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
924                            bits_per_length,
925                        ) => {
926                            let decompressor =
927                                self.create_variable_per_value_decompressor(value_encoding)?;
928                            VariablePackedStructFieldDecoder {
929                                kind: VariablePackedStructFieldKind::Variable {
930                                    bits_per_length: *bits_per_length,
931                                    decompressor: Arc::from(decompressor),
932                                },
933                            }
934                        }
935                    };
936                    fields.push(decoder);
937                }
938                Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
939                    fields,
940                )))
941            }
942            _ => todo!("variable-per-value decompressor for {:?}", description),
943        }
944    }
945
946    fn create_block_decompressor(
947        &self,
948        description: &CompressiveEncoding,
949    ) -> Result<Box<dyn BlockDecompressor>> {
950        match description.compression.as_ref().unwrap() {
951            Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
952                InlineBitpacking::from_description(inline_bitpacking),
953            )),
954            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
955            Compression::Constant(constant) => {
956                let scalar = constant
957                    .value
958                    .as_ref()
959                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
960                Ok(Box::new(ConstantDecompressor::new(scalar)))
961            }
962            Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
963            Compression::FixedSizeList(fsl) => {
964                Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
965            }
966            Compression::OutOfLineBitpacking(out_of_line) => {
967                // Extract the compressed bit width from the values encoding
968                let compressed_bit_width = match out_of_line
969                    .values
970                    .as_ref()
971                    .unwrap()
972                    .compression
973                    .as_ref()
974                    .unwrap()
975                {
976                    Compression::Flat(flat) => flat.bits_per_value,
977                    _ => {
978                        return Err(Error::invalid_input_source(
979                            "OutOfLineBitpacking values must use Flat encoding".into(),
980                        ));
981                    }
982                };
983                Ok(Box::new(OutOfLineBitpacking::new(
984                    compressed_bit_width,
985                    out_of_line.uncompressed_bits_per_value,
986                )))
987            }
988            Compression::General(general) => {
989                let inner_desc = general
990                    .values
991                    .as_ref()
992                    .ok_or_else(|| {
993                        Error::invalid_input("General compression missing inner encoding")
994                    })?
995                    .as_ref();
996                let inner_decompressor = self.create_block_decompressor(inner_desc)?;
997
998                let compression = general.compression.as_ref().ok_or_else(|| {
999                    Error::invalid_input("General compression missing compression config")
1000                })?;
1001                let scheme = compression.scheme().try_into()?;
1002                let config = CompressionConfig::new(scheme, compression.level);
1003                let general_decompressor =
1004                    GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
1005
1006                Ok(Box::new(general_decompressor))
1007            }
1008            Compression::Rle(rle) => {
1009                let bits_per_value = validate_rle_compression(rle)?;
1010                Ok(Box::new(RleDecompressor::new(bits_per_value)))
1011            }
1012            _ => todo!(),
1013        }
1014    }
1015}
1016/// Validates RLE compression format and extracts bits_per_value
1017fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<u64> {
1018    let values = rle
1019        .values
1020        .as_ref()
1021        .ok_or_else(|| Error::invalid_input("RLE compression missing values encoding"))?;
1022    let run_lengths = rle
1023        .run_lengths
1024        .as_ref()
1025        .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths encoding"))?;
1026
1027    let values = values
1028        .compression
1029        .as_ref()
1030        .ok_or_else(|| Error::invalid_input("RLE compression missing values compression"))?;
1031    let Compression::Flat(values) = values else {
1032        return Err(Error::invalid_input(
1033            "RLE compression only supports flat values",
1034        ));
1035    };
1036
1037    let run_lengths = run_lengths
1038        .compression
1039        .as_ref()
1040        .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths compression"))?;
1041    let Compression::Flat(run_lengths) = run_lengths else {
1042        return Err(Error::invalid_input(
1043            "RLE compression only supports flat run lengths",
1044        ));
1045    };
1046
1047    if run_lengths.bits_per_value != 8 {
1048        return Err(Error::invalid_input(format!(
1049            "RLE compression only supports 8-bit run lengths, got {}",
1050            run_lengths.bits_per_value
1051        )));
1052    }
1053
1054    Ok(values.bits_per_value)
1055}
1056
1057#[cfg(test)]
1058mod tests {
1059    use super::*;
1060    use crate::buffer::LanceBuffer;
1061    use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
1062    use crate::statistics::ComputeStat;
1063    use crate::testing::extract_array_encoding_chain;
1064    use arrow_schema::{DataType, Field as ArrowField};
1065    use std::collections::HashMap;
1066
1067    fn create_test_field(name: &str, data_type: DataType) -> Field {
1068        let arrow_field = ArrowField::new(name, data_type, true);
1069        let mut field = Field::try_from(&arrow_field).unwrap();
1070        field.id = -1;
1071        field
1072    }
1073
1074    fn create_fixed_width_block_with_stats(
1075        bits_per_value: u64,
1076        num_values: u64,
1077        run_count: u64,
1078    ) -> DataBlock {
1079        // Create varied data to avoid low entropy
1080        let bytes_per_value = (bits_per_value / 8) as usize;
1081        let total_bytes = bytes_per_value * num_values as usize;
1082        let mut data = vec![0u8; total_bytes];
1083
1084        // Create data with specified run count
1085        let values_per_run = (num_values / run_count).max(1);
1086        let mut run_value = 0u8;
1087
1088        for i in 0..num_values as usize {
1089            if i % values_per_run as usize == 0 {
1090                run_value = run_value.wrapping_add(17); // Use prime to get varied values
1091            }
1092            // Fill all bytes of the value to create high entropy
1093            for j in 0..bytes_per_value {
1094                let byte_offset = i * bytes_per_value + j;
1095                if byte_offset < data.len() {
1096                    data[byte_offset] = run_value.wrapping_add(j as u8);
1097                }
1098            }
1099        }
1100
1101        let mut block = FixedWidthDataBlock {
1102            bits_per_value,
1103            data: LanceBuffer::reinterpret_vec(data),
1104            num_values,
1105            block_info: BlockInfo::default(),
1106        };
1107
1108        // Compute all statistics including BytePositionEntropy
1109        use crate::statistics::ComputeStat;
1110        block.compute_stat();
1111
1112        DataBlock::FixedWidth(block)
1113    }
1114
1115    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
1116        // Create data with some variety to avoid always triggering BSS
1117        let bytes_per_value = (bits_per_value / 8) as usize;
1118        let total_bytes = bytes_per_value * num_values as usize;
1119        let mut data = vec![0u8; total_bytes];
1120
1121        // Add some variation to the data to make it more realistic
1122        for i in 0..num_values as usize {
1123            let byte_offset = i * bytes_per_value;
1124            if byte_offset < data.len() {
1125                data[byte_offset] = (i % 256) as u8;
1126            }
1127        }
1128
1129        let mut block = FixedWidthDataBlock {
1130            bits_per_value,
1131            data: LanceBuffer::reinterpret_vec(data),
1132            num_values,
1133            block_info: BlockInfo::default(),
1134        };
1135
1136        // Compute all statistics including BytePositionEntropy
1137        use crate::statistics::ComputeStat;
1138        block.compute_stat();
1139
1140        DataBlock::FixedWidth(block)
1141    }
1142
1143    fn create_variable_width_block(
1144        bits_per_offset: u8,
1145        num_values: u64,
1146        avg_value_size: usize,
1147    ) -> DataBlock {
1148        use crate::statistics::ComputeStat;
1149
1150        // Create offsets buffer (num_values + 1 offsets)
1151        let mut offsets = Vec::with_capacity((num_values + 1) as usize);
1152        let mut current_offset = 0i64;
1153        offsets.push(current_offset);
1154
1155        // Generate offsets with varying value sizes
1156        for i in 0..num_values {
1157            let value_size = if avg_value_size == 0 {
1158                1
1159            } else {
1160                ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
1161                    .min(avg_value_size * 2)
1162            };
1163            current_offset += value_size as i64;
1164            offsets.push(current_offset);
1165        }
1166
1167        // Create data buffer with realistic content
1168        let total_data_size = current_offset as usize;
1169        let mut data = vec![0u8; total_data_size];
1170
1171        // Fill data with varied content
1172        for i in 0..num_values {
1173            let start_offset = offsets[i as usize] as usize;
1174            let end_offset = offsets[(i + 1) as usize] as usize;
1175
1176            let content = (i % 256) as u8;
1177            for j in 0..end_offset - start_offset {
1178                data[start_offset + j] = content.wrapping_add(j as u8);
1179            }
1180        }
1181
1182        // Convert offsets to appropriate lance buffer
1183        let offsets_buffer = match bits_per_offset {
1184            32 => {
1185                let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
1186                LanceBuffer::reinterpret_vec(offsets_32)
1187            }
1188            64 => LanceBuffer::reinterpret_vec(offsets),
1189            _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
1190        };
1191
1192        let mut block = VariableWidthBlock {
1193            data: LanceBuffer::from(data),
1194            offsets: offsets_buffer,
1195            bits_per_offset,
1196            num_values,
1197            block_info: BlockInfo::default(),
1198        };
1199
1200        block.compute_stat();
1201        DataBlock::VariableWidth(block)
1202    }
1203
1204    fn create_fsst_candidate_variable_width_block() -> DataBlock {
1205        create_variable_width_block(32, 4096, FSST_LEAST_INPUT_MAX_LENGTH as usize + 16)
1206    }
1207
1208    #[test]
1209    fn test_parameter_based_compression() {
1210        let mut params = CompressionParams::new();
1211
1212        // Configure RLE for ID columns with BSS explicitly disabled
1213        params.columns.insert(
1214            "*_id".to_string(),
1215            CompressionFieldParams {
1216                rle_threshold: Some(0.3),
1217                compression: Some("lz4".to_string()),
1218                compression_level: None,
1219                bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE
1220                minichunk_size: None,
1221            },
1222        );
1223
1224        let strategy = DefaultCompressionStrategy::with_params(params);
1225        let field = create_test_field("user_id", DataType::Int32);
1226
1227        // Create data with low run count for RLE
1228        // Use create_fixed_width_block_with_stats which properly sets run count
1229        let data = create_fixed_width_block_with_stats(32, 1000, 100); // 100 runs out of 1000 values
1230
1231        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1232        // Should use RLE due to low threshold (0.3) and low run count (100/1000 = 0.1)
1233        let debug_str = format!("{:?}", compressor);
1234
1235        // The compressor should be RLE wrapped in general compression
1236        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1237        assert!(debug_str.contains("RleEncoder"));
1238    }
1239
1240    #[test]
1241    fn test_type_level_parameters() {
1242        let mut params = CompressionParams::new();
1243
1244        // Configure all Int32 to use specific settings
1245        params.types.insert(
1246            "Int32".to_string(),
1247            CompressionFieldParams {
1248                rle_threshold: Some(0.1), // Very low threshold
1249                compression: Some("zstd".to_string()),
1250                compression_level: Some(3),
1251                bss: Some(BssMode::Off), // Disable BSS to test RLE
1252                minichunk_size: None,
1253            },
1254        );
1255
1256        let strategy = DefaultCompressionStrategy::with_params(params);
1257        let field = create_test_field("some_column", DataType::Int32);
1258        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
1259        let data = create_fixed_width_block_with_stats(32, 1000, 50);
1260
1261        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1262        // Should use RLE due to very low threshold
1263        assert!(format!("{:?}", compressor).contains("RleEncoder"));
1264    }
1265
1266    #[test]
1267    #[cfg(feature = "bitpacking")]
1268    fn test_low_cardinality_prefers_bitpacking_over_rle() {
1269        let strategy = DefaultCompressionStrategy::new();
1270        let field = create_test_field("int_score", DataType::Int64);
1271
1272        // Low cardinality values (3/4/5) but with moderate run count:
1273        // RLE compresses vs raw, yet bitpacking should be smaller.
1274        let mut values: Vec<u64> = Vec::with_capacity(256);
1275        for run_idx in 0..64 {
1276            let value = match run_idx % 3 {
1277                0 => 3u64,
1278                1 => 4u64,
1279                _ => 5u64,
1280            };
1281            values.extend(std::iter::repeat_n(value, 4));
1282        }
1283
1284        let mut block = FixedWidthDataBlock {
1285            bits_per_value: 64,
1286            data: LanceBuffer::reinterpret_vec(values),
1287            num_values: 256,
1288            block_info: BlockInfo::default(),
1289        };
1290
1291        use crate::statistics::ComputeStat;
1292        block.compute_stat();
1293
1294        let data = DataBlock::FixedWidth(block);
1295        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1296        let debug_str = format!("{:?}", compressor);
1297        assert!(
1298            debug_str.contains("InlineBitpacking"),
1299            "expected InlineBitpacking, got: {debug_str}"
1300        );
1301        assert!(
1302            !debug_str.contains("RleEncoder"),
1303            "expected RLE to be skipped when bitpacking is smaller, got: {debug_str}"
1304        );
1305    }
1306
1307    fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
1308        let chain = extract_array_encoding_chain(encoding);
1309        if variable {
1310            assert_eq!(chain.len(), 2);
1311            assert_eq!(chain.first().unwrap().as_str(), "variable");
1312            assert_eq!(chain.get(1).unwrap().as_str(), "flat");
1313        } else {
1314            assert_eq!(chain.len(), 1);
1315            assert_eq!(chain.first().unwrap().as_str(), "flat");
1316        }
1317    }
1318
1319    #[test]
1320    fn test_none_compression() {
1321        let mut params = CompressionParams::new();
1322
1323        // Disable compression for embeddings
1324        params.columns.insert(
1325            "embeddings".to_string(),
1326            CompressionFieldParams {
1327                compression: Some("none".to_string()),
1328                ..Default::default()
1329            },
1330        );
1331
1332        let strategy = DefaultCompressionStrategy::with_params(params);
1333        let field = create_test_field("embeddings", DataType::Float32);
1334        let fixed_data = create_fixed_width_block(32, 1000);
1335        let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1336
1337        // Test miniblock
1338        let compressor = strategy
1339            .create_miniblock_compressor(&field, &fixed_data)
1340            .unwrap();
1341        let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1342        check_uncompressed_encoding(&encoding, false);
1343        let compressor = strategy
1344            .create_miniblock_compressor(&field, &variable_data)
1345            .unwrap();
1346        let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1347        check_uncompressed_encoding(&encoding, true);
1348
1349        // Test pervalue
1350        let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1351        let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1352        check_uncompressed_encoding(&encoding, false);
1353        let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1354        let (_block, encoding) = compressor.compress(variable_data).unwrap();
1355        check_uncompressed_encoding(&encoding, true);
1356    }
1357
1358    #[test]
1359    fn test_field_metadata_none_compression() {
1360        // Prepare field with metadata for none compression
1361        let mut arrow_field = ArrowField::new("simple_col", DataType::Binary, true);
1362        let mut metadata = HashMap::new();
1363        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1364        arrow_field = arrow_field.with_metadata(metadata);
1365        let field = Field::try_from(&arrow_field).unwrap();
1366
1367        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
1368
1369        // Test miniblock
1370        let fixed_data = create_fixed_width_block(32, 1000);
1371        let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1372
1373        let compressor = strategy
1374            .create_miniblock_compressor(&field, &fixed_data)
1375            .unwrap();
1376        let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1377        check_uncompressed_encoding(&encoding, false);
1378
1379        let compressor = strategy
1380            .create_miniblock_compressor(&field, &variable_data)
1381            .unwrap();
1382        let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1383        check_uncompressed_encoding(&encoding, true);
1384
1385        // Test pervalue
1386        let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1387        let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1388        check_uncompressed_encoding(&encoding, false);
1389
1390        let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1391        let (_block, encoding) = compressor.compress(variable_data).unwrap();
1392        check_uncompressed_encoding(&encoding, true);
1393    }
1394
1395    #[test]
1396    fn test_auto_fsst_disabled_for_binary_fields() {
1397        let strategy = DefaultCompressionStrategy::new();
1398        let field = create_test_field("bytes", DataType::Binary);
1399        let variable_data = create_fsst_candidate_variable_width_block();
1400
1401        let miniblock = strategy
1402            .create_miniblock_compressor(&field, &variable_data)
1403            .unwrap();
1404        let miniblock_debug = format!("{:?}", miniblock);
1405        assert!(
1406            miniblock_debug.contains("BinaryMiniBlockEncoder"),
1407            "expected BinaryMiniBlockEncoder, got: {miniblock_debug}"
1408        );
1409        assert!(
1410            !miniblock_debug.contains("FsstMiniBlockEncoder"),
1411            "did not expect FsstMiniBlockEncoder, got: {miniblock_debug}"
1412        );
1413
1414        let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1415        let per_value_debug = format!("{:?}", per_value);
1416        assert!(
1417            per_value_debug.contains("VariableEncoder"),
1418            "expected VariableEncoder, got: {per_value_debug}"
1419        );
1420        assert!(
1421            !per_value_debug.contains("FsstPerValueEncoder"),
1422            "did not expect FsstPerValueEncoder, got: {per_value_debug}"
1423        );
1424    }
1425
1426    #[test]
1427    fn test_auto_fsst_still_enabled_for_utf8_fields() {
1428        let strategy = DefaultCompressionStrategy::new();
1429        let field = create_test_field("text", DataType::Utf8);
1430        let variable_data = create_fsst_candidate_variable_width_block();
1431
1432        let miniblock = strategy
1433            .create_miniblock_compressor(&field, &variable_data)
1434            .unwrap();
1435        let miniblock_debug = format!("{:?}", miniblock);
1436        assert!(
1437            miniblock_debug.contains("FsstMiniBlockEncoder"),
1438            "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
1439        );
1440
1441        let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1442        let per_value_debug = format!("{:?}", per_value);
1443        assert!(
1444            per_value_debug.contains("FsstPerValueEncoder"),
1445            "expected FsstPerValueEncoder, got: {per_value_debug}"
1446        );
1447    }
1448
1449    #[test]
1450    fn test_explicit_fsst_still_supported_for_binary_fields() {
1451        let mut params = CompressionParams::new();
1452        params.columns.insert(
1453            "bytes".to_string(),
1454            CompressionFieldParams {
1455                compression: Some("fsst".to_string()),
1456                ..Default::default()
1457            },
1458        );
1459
1460        let strategy = DefaultCompressionStrategy::with_params(params);
1461        let field = create_test_field("bytes", DataType::Binary);
1462        let variable_data = create_fsst_candidate_variable_width_block();
1463
1464        let miniblock = strategy
1465            .create_miniblock_compressor(&field, &variable_data)
1466            .unwrap();
1467        let miniblock_debug = format!("{:?}", miniblock);
1468        assert!(
1469            miniblock_debug.contains("FsstMiniBlockEncoder"),
1470            "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
1471        );
1472
1473        let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1474        let per_value_debug = format!("{:?}", per_value);
1475        assert!(
1476            per_value_debug.contains("FsstPerValueEncoder"),
1477            "expected FsstPerValueEncoder, got: {per_value_debug}"
1478        );
1479    }
1480
1481    #[test]
1482    fn test_parameter_merge_priority() {
1483        let mut params = CompressionParams::new();
1484
1485        // Set type-level
1486        params.types.insert(
1487            "Int32".to_string(),
1488            CompressionFieldParams {
1489                rle_threshold: Some(0.5),
1490                compression: Some("lz4".to_string()),
1491                ..Default::default()
1492            },
1493        );
1494
1495        // Set column-level (highest priority)
1496        params.columns.insert(
1497            "user_id".to_string(),
1498            CompressionFieldParams {
1499                rle_threshold: Some(0.2),
1500                compression: Some("zstd".to_string()),
1501                compression_level: Some(6),
1502                bss: None,
1503                minichunk_size: None,
1504            },
1505        );
1506
1507        let strategy = DefaultCompressionStrategy::with_params(params);
1508
1509        // Get merged params
1510        let merged = strategy
1511            .params
1512            .get_field_params("user_id", &DataType::Int32);
1513
1514        // Column params should override type params
1515        assert_eq!(merged.rle_threshold, Some(0.2));
1516        assert_eq!(merged.compression, Some("zstd".to_string()));
1517        assert_eq!(merged.compression_level, Some(6));
1518
1519        // Test field with only type params
1520        let merged = strategy
1521            .params
1522            .get_field_params("other_field", &DataType::Int32);
1523        assert_eq!(merged.rle_threshold, Some(0.5));
1524        assert_eq!(merged.compression, Some("lz4".to_string()));
1525        assert_eq!(merged.compression_level, None);
1526    }
1527
1528    #[test]
1529    fn test_pattern_matching() {
1530        let mut params = CompressionParams::new();
1531
1532        // Configure pattern for log files
1533        params.columns.insert(
1534            "log_*".to_string(),
1535            CompressionFieldParams {
1536                compression: Some("zstd".to_string()),
1537                compression_level: Some(6),
1538                ..Default::default()
1539            },
1540        );
1541
1542        let strategy = DefaultCompressionStrategy::with_params(params);
1543
1544        // Should match pattern
1545        let merged = strategy
1546            .params
1547            .get_field_params("log_messages", &DataType::Utf8);
1548        assert_eq!(merged.compression, Some("zstd".to_string()));
1549        assert_eq!(merged.compression_level, Some(6));
1550
1551        // Should not match
1552        let merged = strategy
1553            .params
1554            .get_field_params("messages_log", &DataType::Utf8);
1555        assert_eq!(merged.compression, None);
1556    }
1557
1558    #[test]
1559    fn test_legacy_metadata_support() {
1560        let params = CompressionParams::new();
1561        let strategy = DefaultCompressionStrategy::with_params(params);
1562
1563        // Test field with "none" compression metadata
1564        let mut metadata = HashMap::new();
1565        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1566        let mut field = create_test_field("some_column", DataType::Int32);
1567        field.metadata = metadata;
1568
1569        let data = create_fixed_width_block(32, 1000);
1570        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1571
1572        // Should respect metadata and use ValueEncoder
1573        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1574    }
1575
1576    #[test]
1577    fn test_default_behavior() {
1578        // Empty params should fall back to default behavior
1579        let params = CompressionParams::new();
1580        let strategy = DefaultCompressionStrategy::with_params(params);
1581
1582        let field = create_test_field("random_column", DataType::Int32);
1583        // Create data with high run count that won't trigger RLE (600 runs for 1000 values = 0.6 ratio)
1584        let data = create_fixed_width_block_with_stats(32, 1000, 600);
1585
1586        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1587        // Should use default strategy's decision
1588        let debug_str = format!("{:?}", compressor);
1589        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
1590    }
1591
1592    #[test]
1593    fn test_field_metadata_compression() {
1594        let params = CompressionParams::new();
1595        let strategy = DefaultCompressionStrategy::with_params(params);
1596
1597        // Test field with compression metadata
1598        let mut metadata = HashMap::new();
1599        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
1600        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
1601        let mut field = create_test_field("test_column", DataType::Int32);
1602        field.metadata = metadata;
1603
1604        let data = create_fixed_width_block(32, 1000);
1605        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1606
1607        // Should use zstd with level 6
1608        let debug_str = format!("{:?}", compressor);
1609        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1610    }
1611
1612    #[test]
1613    fn test_field_metadata_rle_threshold() {
1614        let params = CompressionParams::new();
1615        let strategy = DefaultCompressionStrategy::with_params(params);
1616
1617        // Test field with RLE threshold metadata
1618        let mut metadata = HashMap::new();
1619        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
1620        metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); // Disable BSS to test RLE
1621        let mut field = create_test_field("test_column", DataType::Int32);
1622        field.metadata = metadata;
1623
1624        // Create data with low run count (e.g., 100 runs for 1000 values = 0.1 ratio)
1625        // This ensures run_count (100) < num_values * threshold (1000 * 0.8 = 800)
1626        let data = create_fixed_width_block_with_stats(32, 1000, 100);
1627
1628        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1629
1630        // Should use RLE because run_count (100) < num_values * threshold (800)
1631        let debug_str = format!("{:?}", compressor);
1632        assert!(debug_str.contains("RleEncoder"));
1633    }
1634
1635    #[test]
1636    fn test_field_metadata_override_params() {
1637        // Set up params with one configuration
1638        let mut params = CompressionParams::new();
1639        params.columns.insert(
1640            "test_column".to_string(),
1641            CompressionFieldParams {
1642                rle_threshold: Some(0.3),
1643                compression: Some("lz4".to_string()),
1644                compression_level: None,
1645                bss: None,
1646                minichunk_size: None,
1647            },
1648        );
1649
1650        let strategy = DefaultCompressionStrategy::with_params(params);
1651
1652        // Field metadata should override params
1653        let mut metadata = HashMap::new();
1654        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1655        let mut field = create_test_field("test_column", DataType::Int32);
1656        field.metadata = metadata;
1657
1658        let data = create_fixed_width_block(32, 1000);
1659        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1660
1661        // Should use none compression (from metadata) instead of lz4 (from params)
1662        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1663    }
1664
1665    #[test]
1666    fn test_field_metadata_mixed_configuration() {
1667        // Configure type-level params
1668        let mut params = CompressionParams::new();
1669        params.types.insert(
1670            "Int32".to_string(),
1671            CompressionFieldParams {
1672                rle_threshold: Some(0.5),
1673                compression: Some("lz4".to_string()),
1674                ..Default::default()
1675            },
1676        );
1677
1678        let strategy = DefaultCompressionStrategy::with_params(params);
1679
1680        // Field metadata provides partial override
1681        let mut metadata = HashMap::new();
1682        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1683        let mut field = create_test_field("test_column", DataType::Int32);
1684        field.metadata = metadata;
1685
1686        let data = create_fixed_width_block(32, 1000);
1687        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1688
1689        // Should use lz4 (from type params) with level 3 (from metadata)
1690        let debug_str = format!("{:?}", compressor);
1691        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1692    }
1693
1694    #[test]
1695    fn test_bss_field_metadata() {
1696        let params = CompressionParams::new();
1697        let strategy = DefaultCompressionStrategy::with_params(params);
1698
1699        // Test BSS "on" mode with compression enabled (BSS requires compression to be effective)
1700        let mut metadata = HashMap::new();
1701        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1702        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1703        let arrow_field =
1704            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1705        let field = Field::try_from(&arrow_field).unwrap();
1706
1707        // Create float data
1708        let data = create_fixed_width_block(32, 100);
1709
1710        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1711        let debug_str = format!("{:?}", compressor);
1712        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1713    }
1714
1715    #[test]
1716    fn test_bss_with_compression() {
1717        let params = CompressionParams::new();
1718        let strategy = DefaultCompressionStrategy::with_params(params);
1719
1720        // Test BSS with LZ4 compression
1721        let mut metadata = HashMap::new();
1722        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1723        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1724        let arrow_field =
1725            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1726        let field = Field::try_from(&arrow_field).unwrap();
1727
1728        // Create double data
1729        let data = create_fixed_width_block(64, 100);
1730
1731        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1732        let debug_str = format!("{:?}", compressor);
1733        // Should have BSS wrapped in general compression
1734        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1735        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1736    }
1737
1738    #[test]
1739    #[cfg(any(feature = "lz4", feature = "zstd"))]
1740    fn test_general_block_decompression_fixed_width_v2_2() {
1741        // Request general compression via the write path (2.2 requirement) and ensure the read path mirrors it.
1742        let mut params = CompressionParams::new();
1743        params.columns.insert(
1744            "dict_values".to_string(),
1745            CompressionFieldParams {
1746                compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1747                ..Default::default()
1748            },
1749        );
1750
1751        let mut strategy = DefaultCompressionStrategy::with_params(params);
1752        strategy.version = LanceFileVersion::V2_2;
1753
1754        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1755        let data = create_fixed_width_block(24, 1024);
1756        let DataBlock::FixedWidth(expected_block) = &data else {
1757            panic!("expected fixed width block");
1758        };
1759        let expected_bits = expected_block.bits_per_value;
1760        let expected_num_values = expected_block.num_values;
1761        let num_values = expected_num_values;
1762
1763        let (compressor, encoding) = strategy
1764            .create_block_compressor(&field, &data)
1765            .expect("general compression should be selected");
1766        match encoding.compression.as_ref() {
1767            Some(Compression::General(_)) => {}
1768            other => panic!("expected general compression, got {:?}", other),
1769        }
1770
1771        let compressed_buffer = compressor
1772            .compress(data.clone())
1773            .expect("write path general compression should succeed");
1774
1775        let decompressor = DefaultDecompressionStrategy::default()
1776            .create_block_decompressor(&encoding)
1777            .expect("general block decompressor should be created");
1778
1779        let decoded = decompressor
1780            .decompress(compressed_buffer, num_values)
1781            .expect("decompression should succeed");
1782
1783        match decoded {
1784            DataBlock::FixedWidth(block) => {
1785                assert_eq!(block.bits_per_value, expected_bits);
1786                assert_eq!(block.num_values, expected_num_values);
1787                assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
1788            }
1789            _ => panic!("expected fixed width block"),
1790        }
1791    }
1792
1793    #[test]
1794    #[cfg(any(feature = "lz4", feature = "zstd"))]
1795    fn test_general_compression_not_selected_for_v2_1_even_if_requested() {
1796        let mut params = CompressionParams::new();
1797        params.columns.insert(
1798            "dict_values".to_string(),
1799            CompressionFieldParams {
1800                compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1801                ..Default::default()
1802            },
1803        );
1804
1805        let strategy =
1806            DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_1);
1807        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1808        let data = create_fixed_width_block(24, 1024);
1809
1810        let (_compressor, encoding) = strategy
1811            .create_block_compressor(&field, &data)
1812            .expect("block compressor selection should succeed");
1813
1814        assert!(
1815            !matches!(encoding.compression.as_ref(), Some(Compression::General(_))),
1816            "general compression should not be selected for V2.1"
1817        );
1818    }
1819
1820    #[test]
1821    fn test_none_compression_disables_auto_general_block_compression() {
1822        let mut params = CompressionParams::new();
1823        params.columns.insert(
1824            "dict_values".to_string(),
1825            CompressionFieldParams {
1826                compression: Some("none".to_string()),
1827                ..Default::default()
1828            },
1829        );
1830
1831        let strategy =
1832            DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_2);
1833        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1834        let data = create_fixed_width_block(24, 20_000);
1835
1836        assert!(
1837            data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION,
1838            "test requires block size above automatic general compression threshold"
1839        );
1840
1841        let (_compressor, encoding) = strategy
1842            .create_block_compressor(&field, &data)
1843            .expect("block compressor selection should succeed");
1844
1845        assert!(
1846            !matches!(encoding.compression.as_ref(), Some(Compression::General(_))),
1847            "compression=none should disable automatic block general compression"
1848        );
1849    }
1850
1851    #[test]
1852    fn test_rle_block_used_for_version_v2_2() {
1853        let field = create_test_field("test_repdef", DataType::UInt16);
1854
1855        // Create highly repetitive data
1856        let num_values = 1000u64;
1857        let mut data = Vec::with_capacity(num_values as usize);
1858        for i in 0..10 {
1859            for _ in 0..100 {
1860                data.push(i as u16);
1861            }
1862        }
1863
1864        let mut block = FixedWidthDataBlock {
1865            bits_per_value: 16,
1866            data: LanceBuffer::reinterpret_vec(data),
1867            num_values,
1868            block_info: BlockInfo::default(),
1869        };
1870
1871        block.compute_stat();
1872
1873        let data_block = DataBlock::FixedWidth(block);
1874
1875        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1876            .with_version(LanceFileVersion::V2_2);
1877
1878        let (compressor, _) = strategy
1879            .create_block_compressor(&field, &data_block)
1880            .unwrap();
1881
1882        let debug_str = format!("{:?}", compressor);
1883        assert!(debug_str.contains("RleEncoder"));
1884    }
1885
1886    #[test]
1887    fn test_rle_block_not_used_for_version_v2_1() {
1888        let field = create_test_field("test_repdef", DataType::UInt16);
1889
1890        // Create highly repetitive data
1891        let num_values = 1000u64;
1892        let mut data = Vec::with_capacity(num_values as usize);
1893        for i in 0..10 {
1894            for _ in 0..100 {
1895                data.push(i as u16);
1896            }
1897        }
1898
1899        let mut block = FixedWidthDataBlock {
1900            bits_per_value: 16,
1901            data: LanceBuffer::reinterpret_vec(data),
1902            num_values,
1903            block_info: BlockInfo::default(),
1904        };
1905
1906        block.compute_stat();
1907
1908        let data_block = DataBlock::FixedWidth(block);
1909
1910        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1911            .with_version(LanceFileVersion::V2_1);
1912
1913        let (compressor, _) = strategy
1914            .create_block_compressor(&field, &data_block)
1915            .unwrap();
1916
1917        let debug_str = format!("{:?}", compressor);
1918        assert!(
1919            !debug_str.contains("RleEncoder"),
1920            "RLE should not be used for V2.1"
1921        );
1922    }
1923}