Skip to main content

lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
19#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22    buffer::LanceBuffer,
23    compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24    constants::{
25        BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26    },
27    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28    encodings::{
29        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30        physical::{
31            binary::{
32                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33                VariableDecoder, VariableEncoder,
34            },
35            block::{
36                CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37                GeneralBlockDecompressor,
38            },
39            byte_stream_split::{
40                ByteStreamSplitDecompressor, ByteStreamSplitEncoder, should_use_bss,
41            },
42            constant::ConstantDecompressor,
43            fsst::{
44                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45                FsstPerValueEncoder,
46            },
47            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48            packed::{
49                PackedStructFixedWidthMiniBlockDecompressor,
50                PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51                PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52                VariablePackedStructFieldKind,
53            },
54            rle::{RleDecompressor, RleEncoder},
55            value::{ValueDecompressor, ValueEncoder},
56        },
57    },
58    format::{
59        ProtobufUtils21,
60        pb21::{CompressiveEncoding, compressive_encoding::Compression},
61    },
62    statistics::{GetStat, Stat},
63    version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use arrow_schema::DataType;
68use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
69use lance_core::{Error, Result, datatypes::Field, error::LanceOptionExt};
70use std::{str::FromStr, sync::Arc};
71
/// Default run-count ratio used when selecting RLE compression.
///
/// Block compression (`try_rle_for_block`) falls back to this value when the user did not
/// provide a threshold and picks RLE when `run_count < num_values * threshold`.
///
/// Mini-block compression (`try_rle_for_mini_block`) only applies a ratio check when the
/// user explicitly provides a threshold.
/// If no threshold is provided, we use a size model instead of a fixed run ratio.
/// This preserves existing behavior for users relying on the default, while making
/// the default selection more type-aware.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
78
/// Size threshold (32 KiB) for automatic general block compression.
///
/// A block must be strictly larger than this many bytes to trigger automatic general
/// compression when the user did not request a scheme.
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024;
81
/// Trait for compression algorithms that compress an entire block of data into one opaque
/// and self-described chunk.
///
/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
///
/// This is the most general type of compression.  There are no constraints on the method
/// of compression; it is assumed that the entire block of data will be present at decompression.
///
/// This is the least appropriate strategy for random access because we must load the entire
/// block to access any single value.  This should only be used for cases where random access is never
/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
/// mini-block chunks)
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compress the data into a single buffer
    ///
    /// Only the compressed buffer is returned.  The [`CompressiveEncoding`] description needed
    /// to decompress the data when reading it back is produced alongside the compressor by
    /// [`CompressionStrategy::create_block_compressor`].
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
101
/// A trait to pick which compression to use for given data
///
/// There are several different kinds of compression.
///
/// - Block compression is the most generic, but most difficult to use efficiently
/// - Per-value compression results in either a fixed width data block or a variable
///   width data block.  In other words, there is some number of bits per value.
///   In addition, each value should be independently decompressible.
/// - Mini-block compression results in a small block of opaque data for chunks
///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
///   used for narrow data types (both fixed and variable length) where we can
///   fit many values into a 16KiB block.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Create a block compressor for the given data
    ///
    /// Returns the compressor together with the [`CompressiveEncoding`] that describes
    /// the chosen compression.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Create a per-value compressor for the given data
    ///
    /// Returns an error if no per-value compressor can be created for `field` / `data`.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Create a mini-block compressor for the given data
    ///
    /// Returns an error if no mini-block compressor can be created for `field` / `data`.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
136
/// The default [`CompressionStrategy`].
///
/// Compressors are chosen from the user-configured parameters, the per-field metadata,
/// the statistics attached to the data block and the target file version.
#[derive(Debug, Default, Clone)]
pub struct DefaultCompressionStrategy {
    /// User-configured compression parameters
    params: CompressionParams,
    /// The Lance file version being written; some encodings are only selected for
    /// sufficiently new versions.
    version: LanceFileVersion,
}
144
145fn try_bss_for_mini_block(
146    data: &FixedWidthDataBlock,
147    params: &CompressionFieldParams,
148) -> Option<Box<dyn MiniBlockCompressor>> {
149    // BSS requires general compression to be effective
150    // If compression is not set or explicitly disabled, skip BSS
151    if params.compression.is_none() || params.compression.as_deref() == Some("none") {
152        return None;
153    }
154
155    let mode = params.bss.unwrap_or(BssMode::Auto);
156    // should_use_bss already checks for supported bit widths (32/64)
157    if should_use_bss(data, mode) {
158        return Some(Box::new(ByteStreamSplitEncoder::new(
159            data.bits_per_value as usize,
160        )));
161    }
162    None
163}
164
165fn try_rle_for_mini_block(
166    data: &FixedWidthDataBlock,
167    params: &CompressionFieldParams,
168) -> Option<Box<dyn MiniBlockCompressor>> {
169    let bits = data.bits_per_value;
170    if !matches!(bits, 8 | 16 | 32 | 64) {
171        return None;
172    }
173
174    let type_size = bits / 8;
175    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
176    let threshold = params
177        .rle_threshold
178        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
179
180    // If the user explicitly provided a threshold then honor it as an additional guard.
181    // A lower threshold makes RLE harder to trigger and can be used to avoid CPU overhead.
182    let passes_threshold = match params.rle_threshold {
183        Some(_) => (run_count as f64) < (data.num_values as f64) * threshold,
184        None => true,
185    };
186
187    if !passes_threshold {
188        return None;
189    }
190
191    // Estimate the encoded size.
192    //
193    // RLE stores (value, run_length) pairs. Run lengths are u8 and long runs are split into
194    // multiple entries of up to 255 values. We don't know the run length distribution here,
195    // so we conservatively account for splitting with an upper bound.
196    let num_values = data.num_values;
197    let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values);
198
199    let raw_bytes = (num_values as u128) * (type_size as u128);
200    let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);
201
202    if rle_bytes < raw_bytes {
203        #[cfg(feature = "bitpacking")]
204        {
205            if let Some(bitpack_bytes) = estimate_inline_bitpacking_bytes(data)
206                && (bitpack_bytes as u128) < rle_bytes
207            {
208                return None;
209            }
210        }
211        return Some(Box::new(RleEncoder::new()));
212    }
213    None
214}
215
216fn try_rle_for_block(
217    data: &FixedWidthDataBlock,
218    version: LanceFileVersion,
219    params: &CompressionFieldParams,
220) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
221    if version < LanceFileVersion::V2_2 {
222        return None;
223    }
224
225    let bits = data.bits_per_value;
226    if !matches!(bits, 8 | 16 | 32 | 64) {
227        return None;
228    }
229
230    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
231    let threshold = params
232        .rle_threshold
233        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
234
235    if (run_count as f64) < (data.num_values as f64) * threshold {
236        let compressor = Box::new(RleEncoder::new());
237        let encoding = ProtobufUtils21::rle(
238            ProtobufUtils21::flat(bits, None),
239            ProtobufUtils21::flat(/*bits_per_value=*/ 8, None),
240        );
241        return Some((compressor, encoding));
242    }
243    None
244}
245
246fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
247    #[cfg(feature = "bitpacking")]
248    {
249        let bits = _data.bits_per_value;
250        if estimate_inline_bitpacking_bytes(_data).is_some() {
251            return Some(Box::new(InlineBitpacking::new(bits)));
252        }
253        None
254    }
255    #[cfg(not(feature = "bitpacking"))]
256    {
257        None
258    }
259}
260
/// Estimates the size in bytes of `data` after inline bitpacking.
///
/// Returns `None` when the bit width is not one of 8/16/32/64, when the block is empty,
/// when the estimate is not smaller than the raw data size, or when the estimate does not
/// fit in a `u64`.
#[cfg(feature = "bitpacking")]
fn estimate_inline_bitpacking_bytes(data: &FixedWidthDataBlock) -> Option<u64> {
    use arrow_array::cast::AsArray;

    let bits_per_value = data.bits_per_value;
    if !matches!(bits_per_value, 8 | 16 | 32 | 64) || data.num_values == 0 {
        return None;
    }

    let bit_width_stat = data.expect_stat(Stat::BitWidth);
    let chunk_widths = bit_width_stat.as_primitive::<UInt64Type>();

    // Every entry of the BitWidth stat contributes one extra word plus the words needed to
    // hold 1024 values at that bit width.
    // NOTE(review): presumably one stat entry per 1024-value chunk, with the extra word
    // being the chunk header -- confirm against the bitpacking encoder.
    let overhead_words_per_chunk: u128 = 1;
    let bytes_per_word = (bits_per_value / 8) as u128;
    let word_count = chunk_widths
        .values()
        .iter()
        .fold(0u128, |running_total, &width| {
            let payload_words = (1024u128 * (width as u128)) / (bits_per_value as u128);
            running_total.saturating_add(overhead_words_per_chunk.saturating_add(payload_words))
        });

    let packed_size = word_count.saturating_mul(bytes_per_word);
    let unpacked_size = data.data_size() as u128;

    if packed_size < unpacked_size {
        u64::try_from(packed_size).ok()
    } else {
        None
    }
}
294
295fn try_bitpack_for_block(
296    data: &FixedWidthDataBlock,
297) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
298    let bits = data.bits_per_value;
299    if !matches!(bits, 8 | 16 | 32 | 64) {
300        return None;
301    }
302
303    let bit_widths = data.expect_stat(Stat::BitWidth);
304    let widths = bit_widths.as_primitive::<UInt64Type>();
305    let has_all_zeros = widths.values().contains(&0);
306    let max_bit_width = *widths.values().iter().max().unwrap();
307
308    let too_small =
309        widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
310
311    if has_all_zeros || too_small {
312        return None;
313    }
314
315    if data.num_values <= 1024 {
316        let compressor = Box::new(InlineBitpacking::new(bits));
317        let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
318        Some((compressor, encoding))
319    } else {
320        let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
321        let encoding = ProtobufUtils21::out_of_line_bitpacking(
322            bits,
323            ProtobufUtils21::flat(max_bit_width, None),
324        );
325        Some((compressor, encoding))
326    }
327}
328
329fn maybe_wrap_general_for_mini_block(
330    inner: Box<dyn MiniBlockCompressor>,
331    params: &CompressionFieldParams,
332) -> Result<Box<dyn MiniBlockCompressor>> {
333    match params.compression.as_deref() {
334        None | Some("none") | Some("fsst") => Ok(inner),
335        Some(raw) => {
336            let scheme = CompressionScheme::from_str(raw)
337                .map_err(|_| Error::invalid_input(format!("Unknown compression scheme: {raw}")))?;
338            let cfg = CompressionConfig::new(scheme, params.compression_level);
339            Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
340        }
341    }
342}
343
344fn try_general_compression(
345    version: LanceFileVersion,
346    field_params: &CompressionFieldParams,
347    data: &DataBlock,
348) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
349    // Explicitly disable general compression.
350    if field_params.compression.as_deref() == Some("none") {
351        return Ok(None);
352    }
353
354    // User-requested compression (unused today but perhaps still used
355    // in the future someday)
356    if let Some(compression_scheme) = &field_params.compression
357        && version >= LanceFileVersion::V2_2
358    {
359        let scheme: CompressionScheme = compression_scheme.parse()?;
360        let config = CompressionConfig::new(scheme, field_params.compression_level);
361        let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
362        return Ok(Some((compressor, config)));
363    }
364
365    // Automatic compression for large blocks
366    if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
367        && version >= LanceFileVersion::V2_2
368    {
369        let compressor = Box::new(CompressedBufferEncoder::default());
370        let config = compressor.compressor.config();
371        return Ok(Some((compressor, config)));
372    }
373
374    Ok(None)
375}
376
377impl DefaultCompressionStrategy {
378    /// Create a new compression strategy with default behavior
379    pub fn new() -> Self {
380        Self::default()
381    }
382
383    /// Create a new compression strategy with user-configured parameters
384    pub fn with_params(params: CompressionParams) -> Self {
385        Self {
386            params,
387            version: LanceFileVersion::default(),
388        }
389    }
390
391    /// Override the file version used to make compression decisions
392    pub fn with_version(mut self, version: LanceFileVersion) -> Self {
393        self.version = version;
394        self
395    }
396
397    /// Parse compression parameters from field metadata
398    fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams {
399        let mut params = CompressionFieldParams::default();
400
401        // Parse compression method
402        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
403            params.compression = Some(compression.clone());
404        }
405
406        // Parse compression level
407        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
408            params.compression_level = level.parse().ok();
409        }
410
411        // Parse RLE threshold
412        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
413            params.rle_threshold = threshold.parse().ok();
414        }
415
416        // Parse BSS mode
417        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
418            match BssMode::parse(bss_str) {
419                Some(mode) => params.bss = Some(mode),
420                None => {
421                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
422                }
423            }
424        }
425
426        // Parse minichunk size
427        if let Some(minichunk_size_str) = field
428            .metadata
429            .get(super::constants::MINICHUNK_SIZE_META_KEY)
430        {
431            if let Ok(minichunk_size) = minichunk_size_str.parse::<i64>() {
432                // for lance v2.1, only 32kb or smaller is supported
433                if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 {
434                    log::warn!(
435                        "minichunk_size '{}' too large for version '{}', using default",
436                        minichunk_size,
437                        version
438                    );
439                } else {
440                    params.minichunk_size = Some(minichunk_size);
441                }
442            } else {
443                log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str);
444            }
445        }
446
447        params
448    }
449
450    fn build_fixed_width_compressor(
451        &self,
452        params: &CompressionFieldParams,
453        data: &FixedWidthDataBlock,
454    ) -> Result<Box<dyn MiniBlockCompressor>> {
455        if params.compression.as_deref() == Some("none") {
456            return Ok(Box::new(ValueEncoder::default()));
457        }
458
459        let base = try_bss_for_mini_block(data, params)
460            .or_else(|| try_rle_for_mini_block(data, params))
461            .or_else(|| try_bitpack_for_mini_block(data))
462            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
463
464        maybe_wrap_general_for_mini_block(base, params)
465    }
466
467    /// Build compressor based on parameters for variable-width data
468    fn build_variable_width_compressor(
469        &self,
470        field: &Field,
471        data: &VariableWidthBlock,
472    ) -> Result<Box<dyn MiniBlockCompressor>> {
473        let params = self.get_merged_field_params(field);
474        let compression = params.compression.as_deref();
475        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
476            return Err(Error::invalid_input(format!(
477                "Variable width compression not supported for {} bit offsets",
478                data.bits_per_offset
479            )));
480        }
481
482        // Get statistics
483        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
484        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
485
486        // Explicitly disable all compression.
487        if compression == Some("none") {
488            return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)));
489        }
490
491        let use_fsst = compression == Some("fsst")
492            || (!matches!(field.data_type(), DataType::Binary | DataType::LargeBinary)
493                && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
494                && data_size >= FSST_LEAST_INPUT_SIZE as u64);
495
496        // Choose base encoder (FSST or Binary) once.
497        let mut base_encoder: Box<dyn MiniBlockCompressor> = if use_fsst {
498            Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))
499        } else {
500            Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))
501        };
502
503        // Wrap with general compression when configured (except FSST / none).
504        if let Some(compression_scheme) = compression.filter(|scheme| *scheme != "fsst") {
505            let scheme: CompressionScheme = compression_scheme.parse()?;
506            let config = CompressionConfig::new(scheme, params.compression_level);
507            base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
508        }
509
510        Ok(base_encoder)
511    }
512
513    /// Merge user-configured parameters with field metadata
514    /// Field metadata has highest priority
515    fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
516        let mut field_params = self
517            .params
518            .get_field_params(&field.name, &field.data_type());
519
520        // Override with field metadata if present (highest priority)
521        let metadata_params = Self::parse_field_metadata(field, &self.version);
522        field_params.merge(&metadata_params);
523
524        field_params
525    }
526}
527
528impl CompressionStrategy for DefaultCompressionStrategy {
529    fn create_miniblock_compressor(
530        &self,
531        field: &Field,
532        data: &DataBlock,
533    ) -> Result<Box<dyn MiniBlockCompressor>> {
534        match data {
535            DataBlock::FixedWidth(fixed_width_data) => {
536                let field_params = self.get_merged_field_params(field);
537                self.build_fixed_width_compressor(&field_params, fixed_width_data)
538            }
539            DataBlock::VariableWidth(variable_width_data) => {
540                self.build_variable_width_compressor(field, variable_width_data)
541            }
542            DataBlock::Struct(struct_data_block) => {
543                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
544                // just being cautious here.
545                if struct_data_block.has_variable_width_child() {
546                    return Err(Error::invalid_input(
547                        "Packed struct mini-block encoding supports only fixed-width children",
548                    ));
549                }
550                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
551            }
552            DataBlock::FixedSizeList(_) => {
553                // Ideally we would compress the list items but this creates something of a challenge.
554                // We don't want to break lists across chunks and we need to worry about inner validity
555                // layers.  If we try and use a compression scheme then it is unlikely to respect these
556                // constraints.
557                //
558                // For now, we just don't compress.  In the future, we might want to consider a more
559                // sophisticated approach.
560                Ok(Box::new(ValueEncoder::default()))
561            }
562            _ => Err(Error::not_supported_source(
563                format!(
564                    "Mini-block compression not yet supported for block type {}",
565                    data.name()
566                )
567                .into(),
568            )),
569        }
570    }
571
572    fn create_per_value(
573        &self,
574        field: &Field,
575        data: &DataBlock,
576    ) -> Result<Box<dyn PerValueCompressor>> {
577        let field_params = self.get_merged_field_params(field);
578
579        match data {
580            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
581            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
582            DataBlock::Struct(struct_block) => {
583                if field.children.len() != struct_block.children.len() {
584                    return Err(Error::invalid_input(
585                        "Struct field metadata does not match data block children",
586                    ));
587                }
588                let has_variable_child = struct_block.has_variable_width_child();
589                if has_variable_child {
590                    if self.version < LanceFileVersion::V2_2 {
591                        return Err(Error::not_supported_source("Variable packed struct encoding requires Lance file version 2.2 or later".into()));
592                    }
593                    Ok(Box::new(PackedStructVariablePerValueEncoder::new(
594                        self.clone(),
595                        field.children.clone(),
596                    )))
597                } else {
598                    Err(Error::invalid_input(
599                        "Packed struct per-value compression should not be used for fixed-width-only structs",
600                    ))
601                }
602            }
603            DataBlock::VariableWidth(variable_width) => {
604                let compression = field_params.compression.as_deref();
605                // Check for explicit "none" compression
606                if compression == Some("none") {
607                    return Ok(Box::new(VariableEncoder::default()));
608                }
609
610                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
611                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
612
613                // If values are very large then use block compression on a per-value basis
614                //
615                // TODO: Could maybe use median here
616
617                let per_value_requested =
618                    compression.is_some_and(|compression| compression != "fsst");
619
620                if (max_len > 32 * 1024 || per_value_requested)
621                    && data_size >= FSST_LEAST_INPUT_SIZE as u64
622                {
623                    return Ok(Box::new(CompressedBufferEncoder::default()));
624                }
625
626                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
627                    let variable_compression = Box::new(VariableEncoder::default());
628                    let use_fsst = compression == Some("fsst")
629                        || (!matches!(field.data_type(), DataType::Binary | DataType::LargeBinary)
630                            && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
631                            && data_size >= FSST_LEAST_INPUT_SIZE as u64);
632
633                    // Use FSST if explicitly requested or if data characteristics warrant it.
634                    if use_fsst {
635                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
636                    } else {
637                        Ok(variable_compression)
638                    }
639                } else {
640                    panic!(
641                        "Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.",
642                        variable_width.bits_per_offset
643                    );
644                }
645            }
646            _ => unreachable!(
647                "Per-value compression not yet supported for block type: {}",
648                data.name()
649            ),
650        }
651    }
652
653    fn create_block_compressor(
654        &self,
655        field: &Field,
656        data: &DataBlock,
657    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
658        let field_params = self.get_merged_field_params(field);
659
660        match data {
661            DataBlock::FixedWidth(fixed_width) => {
662                if let Some((compressor, encoding)) =
663                    try_rle_for_block(fixed_width, self.version, &field_params)
664                {
665                    return Ok((compressor, encoding));
666                }
667                if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
668                    return Ok((compressor, encoding));
669                }
670
671                // Try general compression (user-requested or automatic over MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION)
672                if let Some((compressor, config)) =
673                    try_general_compression(self.version, &field_params, data)?
674                {
675                    let encoding = ProtobufUtils21::wrapped(
676                        config,
677                        ProtobufUtils21::flat(fixed_width.bits_per_value, None),
678                    )?;
679                    return Ok((compressor, encoding));
680                }
681
682                let encoder = Box::new(ValueEncoder::default());
683                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
684                Ok((encoder, encoding))
685            }
686            DataBlock::VariableWidth(variable_width) => {
687                // Try general compression
688                if let Some((compressor, config)) =
689                    try_general_compression(self.version, &field_params, data)?
690                {
691                    let encoding = ProtobufUtils21::wrapped(
692                        config,
693                        ProtobufUtils21::variable(
694                            ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
695                            None,
696                        ),
697                    )?;
698                    return Ok((compressor, encoding));
699                }
700
701                let encoder = Box::new(VariableEncoder::default());
702                let encoding = ProtobufUtils21::variable(
703                    ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
704                    None,
705                );
706                Ok((encoder, encoding))
707            }
708            _ => unreachable!(),
709        }
710    }
711}
712
/// Trait for decompressing data that was compressed with mini-block compression.
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress the given buffers into a data block
    ///
    /// NOTE(review): `num_values` is presumably the number of values encoded in `data` --
    /// confirm against the implementations.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
716
/// Trait for decompressing per-value compressed data whose compressed form is a
/// fixed-width data block.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// The number of bits in each value
    ///
    /// Currently (and probably long term) this must be a multiple of 8
    fn bits_per_value(&self) -> u64;
}
725
/// Trait for decompressing per-value compressed data whose compressed form is a
/// variable-width data block.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
730
/// Trait for decompressing a single buffer into a data block; the read-side counterpart
/// of [`BlockCompressor`].
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress the buffer into a data block
    ///
    /// NOTE(review): `num_values` is presumably the number of values encoded in `data` --
    /// confirm against the implementations.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
734
/// A trait to create decompressors from a [`CompressiveEncoding`] description.
///
/// There is one factory method for each kind of decompressor defined in this module.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Create a mini-block decompressor for the described encoding
    ///
    /// `decompression_strategy` is the strategy used to create decompressors for nested
    /// encodings (e.g. the values wrapped by an FSST encoding).
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Create a fixed-width per-value decompressor for the described encoding
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Create a variable-width per-value decompressor for the described encoding
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Create a block decompressor for the described encoding
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
757
/// The default [`DecompressionStrategy`].
///
/// It holds no state; decompressors are chosen solely from the encoding description.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
760
761impl DecompressionStrategy for DefaultDecompressionStrategy {
762    fn create_miniblock_decompressor(
763        &self,
764        description: &CompressiveEncoding,
765        decompression_strategy: &dyn DecompressionStrategy,
766    ) -> Result<Box<dyn MiniBlockDecompressor>> {
767        match description.compression.as_ref().unwrap() {
768            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
769            #[cfg(feature = "bitpacking")]
770            Compression::InlineBitpacking(description) => {
771                Ok(Box::new(InlineBitpacking::from_description(description)))
772            }
773            #[cfg(not(feature = "bitpacking"))]
774            Compression::InlineBitpacking(_) => Err(Error::not_supported_source(
775                "this runtime was not built with bitpacking support".into(),
776            )),
777            Compression::Variable(variable) => {
778                let Compression::Flat(offsets) = variable
779                    .offsets
780                    .as_ref()
781                    .unwrap()
782                    .compression
783                    .as_ref()
784                    .unwrap()
785                else {
786                    panic!("Variable compression only supports flat offsets")
787                };
788                Ok(Box::new(BinaryMiniBlockDecompressor::new(
789                    offsets.bits_per_value as u8,
790                )))
791            }
792            Compression::Fsst(description) => {
793                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
794                    description.values.as_ref().unwrap(),
795                    decompression_strategy,
796                )?;
797                Ok(Box::new(FsstMiniBlockDecompressor::new(
798                    description,
799                    inner_decompressor,
800                )))
801            }
802            Compression::PackedStruct(description) => Ok(Box::new(
803                PackedStructFixedWidthMiniBlockDecompressor::new(description),
804            )),
805            Compression::VariablePackedStruct(_) => Err(Error::not_supported_source(
806                "variable packed struct decoding is not yet implemented".into(),
807            )),
808            Compression::FixedSizeList(fsl) => {
809                // In the future, we might need to do something more complex here if FSL supports
810                // compression.
811                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
812            }
813            Compression::Rle(rle) => {
814                let bits_per_value = validate_rle_compression(rle)?;
815                Ok(Box::new(RleDecompressor::new(bits_per_value)))
816            }
817            Compression::ByteStreamSplit(bss) => {
818                let Compression::Flat(values) =
819                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
820                else {
821                    panic!("ByteStreamSplit compression only supports flat values")
822                };
823                Ok(Box::new(ByteStreamSplitDecompressor::new(
824                    values.bits_per_value as usize,
825                )))
826            }
827            Compression::General(general) => {
828                // Create inner decompressor
829                let inner_decompressor = self.create_miniblock_decompressor(
830                    general.values.as_ref().ok_or_else(|| {
831                        Error::invalid_input("GeneralMiniBlock missing inner encoding")
832                    })?,
833                    decompression_strategy,
834                )?;
835
836                // Parse compression config
837                let compression = general.compression.as_ref().ok_or_else(|| {
838                    Error::invalid_input("GeneralMiniBlock missing compression config")
839                })?;
840
841                let scheme = compression.scheme().try_into()?;
842
843                let compression_config = CompressionConfig::new(scheme, compression.level);
844
845                Ok(Box::new(GeneralMiniBlockDecompressor::new(
846                    inner_decompressor,
847                    compression_config,
848                )))
849            }
850            _ => todo!(),
851        }
852    }
853
854    fn create_fixed_per_value_decompressor(
855        &self,
856        description: &CompressiveEncoding,
857    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
858        match description.compression.as_ref().unwrap() {
859            Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
860                constant
861                    .value
862                    .as_ref()
863                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
864            ))),
865            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
866            Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
867            _ => todo!("fixed-per-value decompressor for {:?}", description),
868        }
869    }
870
871    fn create_variable_per_value_decompressor(
872        &self,
873        description: &CompressiveEncoding,
874    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
875        match description.compression.as_ref().unwrap() {
876            Compression::Variable(variable) => {
877                let Compression::Flat(offsets) = variable
878                    .offsets
879                    .as_ref()
880                    .unwrap()
881                    .compression
882                    .as_ref()
883                    .unwrap()
884                else {
885                    panic!("Variable compression only supports flat offsets")
886                };
887                assert!(offsets.bits_per_value < u8::MAX as u64);
888                Ok(Box::new(VariableDecoder::default()))
889            }
890            Compression::Fsst(fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
891                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
892                Box::new(VariableDecoder::default()),
893            ))),
894            Compression::General(general) => Ok(Box::new(CompressedBufferEncoder::from_scheme(
895                general.compression.as_ref().expect_ok()?.scheme(),
896            )?)),
897            Compression::VariablePackedStruct(description) => {
898                let mut fields = Vec::with_capacity(description.fields.len());
899                for field in &description.fields {
900                    let value_encoding = field.value.as_ref().ok_or_else(|| {
901                        Error::invalid_input("VariablePackedStruct field is missing value encoding")
902                    })?;
903                    let decoder = match field.layout.as_ref().ok_or_else(|| {
904                        Error::invalid_input("VariablePackedStruct field is missing layout details")
905                    })? {
906                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
907                            bits_per_value,
908                        ) => {
909                            let decompressor =
910                                self.create_fixed_per_value_decompressor(value_encoding)?;
911                            VariablePackedStructFieldDecoder {
912                                kind: VariablePackedStructFieldKind::Fixed {
913                                    bits_per_value: *bits_per_value,
914                                    decompressor: Arc::from(decompressor),
915                                },
916                            }
917                        }
918                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
919                            bits_per_length,
920                        ) => {
921                            let decompressor =
922                                self.create_variable_per_value_decompressor(value_encoding)?;
923                            VariablePackedStructFieldDecoder {
924                                kind: VariablePackedStructFieldKind::Variable {
925                                    bits_per_length: *bits_per_length,
926                                    decompressor: Arc::from(decompressor),
927                                },
928                            }
929                        }
930                    };
931                    fields.push(decoder);
932                }
933                Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
934                    fields,
935                )))
936            }
937            _ => todo!("variable-per-value decompressor for {:?}", description),
938        }
939    }
940
941    fn create_block_decompressor(
942        &self,
943        description: &CompressiveEncoding,
944    ) -> Result<Box<dyn BlockDecompressor>> {
945        match description.compression.as_ref().unwrap() {
946            Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
947                InlineBitpacking::from_description(inline_bitpacking),
948            )),
949            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
950            Compression::Constant(constant) => {
951                let scalar = constant
952                    .value
953                    .as_ref()
954                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
955                Ok(Box::new(ConstantDecompressor::new(scalar)))
956            }
957            Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
958            Compression::FixedSizeList(fsl) => {
959                Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
960            }
961            Compression::OutOfLineBitpacking(out_of_line) => {
962                // Extract the compressed bit width from the values encoding
963                let compressed_bit_width = match out_of_line
964                    .values
965                    .as_ref()
966                    .unwrap()
967                    .compression
968                    .as_ref()
969                    .unwrap()
970                {
971                    Compression::Flat(flat) => flat.bits_per_value,
972                    _ => {
973                        return Err(Error::invalid_input_source(
974                            "OutOfLineBitpacking values must use Flat encoding".into(),
975                        ));
976                    }
977                };
978                Ok(Box::new(OutOfLineBitpacking::new(
979                    compressed_bit_width,
980                    out_of_line.uncompressed_bits_per_value,
981                )))
982            }
983            Compression::General(general) => {
984                let inner_desc = general
985                    .values
986                    .as_ref()
987                    .ok_or_else(|| {
988                        Error::invalid_input("General compression missing inner encoding")
989                    })?
990                    .as_ref();
991                let inner_decompressor = self.create_block_decompressor(inner_desc)?;
992
993                let compression = general.compression.as_ref().ok_or_else(|| {
994                    Error::invalid_input("General compression missing compression config")
995                })?;
996                let scheme = compression.scheme().try_into()?;
997                let config = CompressionConfig::new(scheme, compression.level);
998                let general_decompressor =
999                    GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
1000
1001                Ok(Box::new(general_decompressor))
1002            }
1003            Compression::Rle(rle) => {
1004                let bits_per_value = validate_rle_compression(rle)?;
1005                Ok(Box::new(RleDecompressor::new(bits_per_value)))
1006            }
1007            _ => todo!(),
1008        }
1009    }
1010}
1011/// Validates RLE compression format and extracts bits_per_value
1012fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<u64> {
1013    let values = rle
1014        .values
1015        .as_ref()
1016        .ok_or_else(|| Error::invalid_input("RLE compression missing values encoding"))?;
1017    let run_lengths = rle
1018        .run_lengths
1019        .as_ref()
1020        .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths encoding"))?;
1021
1022    let values = values
1023        .compression
1024        .as_ref()
1025        .ok_or_else(|| Error::invalid_input("RLE compression missing values compression"))?;
1026    let Compression::Flat(values) = values else {
1027        return Err(Error::invalid_input(
1028            "RLE compression only supports flat values",
1029        ));
1030    };
1031
1032    let run_lengths = run_lengths
1033        .compression
1034        .as_ref()
1035        .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths compression"))?;
1036    let Compression::Flat(run_lengths) = run_lengths else {
1037        return Err(Error::invalid_input(
1038            "RLE compression only supports flat run lengths",
1039        ));
1040    };
1041
1042    if run_lengths.bits_per_value != 8 {
1043        return Err(Error::invalid_input(format!(
1044            "RLE compression only supports 8-bit run lengths, got {}",
1045            run_lengths.bits_per_value
1046        )));
1047    }
1048
1049    Ok(values.bits_per_value)
1050}
1051
1052#[cfg(test)]
1053mod tests {
1054    use super::*;
1055    use crate::buffer::LanceBuffer;
1056    use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
1057    use crate::statistics::ComputeStat;
1058    use crate::testing::extract_array_encoding_chain;
1059    use arrow_schema::{DataType, Field as ArrowField};
1060    use std::collections::HashMap;
1061
1062    fn create_test_field(name: &str, data_type: DataType) -> Field {
1063        let arrow_field = ArrowField::new(name, data_type, true);
1064        let mut field = Field::try_from(&arrow_field).unwrap();
1065        field.id = -1;
1066        field
1067    }
1068
1069    fn create_fixed_width_block_with_stats(
1070        bits_per_value: u64,
1071        num_values: u64,
1072        run_count: u64,
1073    ) -> DataBlock {
1074        // Create varied data to avoid low entropy
1075        let bytes_per_value = (bits_per_value / 8) as usize;
1076        let total_bytes = bytes_per_value * num_values as usize;
1077        let mut data = vec![0u8; total_bytes];
1078
1079        // Create data with specified run count
1080        let values_per_run = (num_values / run_count).max(1);
1081        let mut run_value = 0u8;
1082
1083        for i in 0..num_values as usize {
1084            if i % values_per_run as usize == 0 {
1085                run_value = run_value.wrapping_add(17); // Use prime to get varied values
1086            }
1087            // Fill all bytes of the value to create high entropy
1088            for j in 0..bytes_per_value {
1089                let byte_offset = i * bytes_per_value + j;
1090                if byte_offset < data.len() {
1091                    data[byte_offset] = run_value.wrapping_add(j as u8);
1092                }
1093            }
1094        }
1095
1096        let mut block = FixedWidthDataBlock {
1097            bits_per_value,
1098            data: LanceBuffer::reinterpret_vec(data),
1099            num_values,
1100            block_info: BlockInfo::default(),
1101        };
1102
1103        // Compute all statistics including BytePositionEntropy
1104        use crate::statistics::ComputeStat;
1105        block.compute_stat();
1106
1107        DataBlock::FixedWidth(block)
1108    }
1109
1110    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
1111        // Create data with some variety to avoid always triggering BSS
1112        let bytes_per_value = (bits_per_value / 8) as usize;
1113        let total_bytes = bytes_per_value * num_values as usize;
1114        let mut data = vec![0u8; total_bytes];
1115
1116        // Add some variation to the data to make it more realistic
1117        for i in 0..num_values as usize {
1118            let byte_offset = i * bytes_per_value;
1119            if byte_offset < data.len() {
1120                data[byte_offset] = (i % 256) as u8;
1121            }
1122        }
1123
1124        let mut block = FixedWidthDataBlock {
1125            bits_per_value,
1126            data: LanceBuffer::reinterpret_vec(data),
1127            num_values,
1128            block_info: BlockInfo::default(),
1129        };
1130
1131        // Compute all statistics including BytePositionEntropy
1132        use crate::statistics::ComputeStat;
1133        block.compute_stat();
1134
1135        DataBlock::FixedWidth(block)
1136    }
1137
1138    fn create_variable_width_block(
1139        bits_per_offset: u8,
1140        num_values: u64,
1141        avg_value_size: usize,
1142    ) -> DataBlock {
1143        use crate::statistics::ComputeStat;
1144
1145        // Create offsets buffer (num_values + 1 offsets)
1146        let mut offsets = Vec::with_capacity((num_values + 1) as usize);
1147        let mut current_offset = 0i64;
1148        offsets.push(current_offset);
1149
1150        // Generate offsets with varying value sizes
1151        for i in 0..num_values {
1152            let value_size = if avg_value_size == 0 {
1153                1
1154            } else {
1155                ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
1156                    .min(avg_value_size * 2)
1157            };
1158            current_offset += value_size as i64;
1159            offsets.push(current_offset);
1160        }
1161
1162        // Create data buffer with realistic content
1163        let total_data_size = current_offset as usize;
1164        let mut data = vec![0u8; total_data_size];
1165
1166        // Fill data with varied content
1167        for i in 0..num_values {
1168            let start_offset = offsets[i as usize] as usize;
1169            let end_offset = offsets[(i + 1) as usize] as usize;
1170
1171            let content = (i % 256) as u8;
1172            for j in 0..end_offset - start_offset {
1173                data[start_offset + j] = content.wrapping_add(j as u8);
1174            }
1175        }
1176
1177        // Convert offsets to appropriate lance buffer
1178        let offsets_buffer = match bits_per_offset {
1179            32 => {
1180                let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
1181                LanceBuffer::reinterpret_vec(offsets_32)
1182            }
1183            64 => LanceBuffer::reinterpret_vec(offsets),
1184            _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
1185        };
1186
1187        let mut block = VariableWidthBlock {
1188            data: LanceBuffer::from(data),
1189            offsets: offsets_buffer,
1190            bits_per_offset,
1191            num_values,
1192            block_info: BlockInfo::default(),
1193        };
1194
1195        block.compute_stat();
1196        DataBlock::VariableWidth(block)
1197    }
1198
1199    fn create_fsst_candidate_variable_width_block() -> DataBlock {
1200        create_variable_width_block(32, 4096, FSST_LEAST_INPUT_MAX_LENGTH as usize + 16)
1201    }
1202
1203    #[test]
1204    fn test_parameter_based_compression() {
1205        let mut params = CompressionParams::new();
1206
1207        // Configure RLE for ID columns with BSS explicitly disabled
1208        params.columns.insert(
1209            "*_id".to_string(),
1210            CompressionFieldParams {
1211                rle_threshold: Some(0.3),
1212                compression: Some("lz4".to_string()),
1213                compression_level: None,
1214                bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE
1215                minichunk_size: None,
1216            },
1217        );
1218
1219        let strategy = DefaultCompressionStrategy::with_params(params);
1220        let field = create_test_field("user_id", DataType::Int32);
1221
1222        // Create data with low run count for RLE
1223        // Use create_fixed_width_block_with_stats which properly sets run count
1224        let data = create_fixed_width_block_with_stats(32, 1000, 100); // 100 runs out of 1000 values
1225
1226        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1227        // Should use RLE due to low threshold (0.3) and low run count (100/1000 = 0.1)
1228        let debug_str = format!("{:?}", compressor);
1229
1230        // The compressor should be RLE wrapped in general compression
1231        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1232        assert!(debug_str.contains("RleEncoder"));
1233    }
1234
1235    #[test]
1236    fn test_type_level_parameters() {
1237        let mut params = CompressionParams::new();
1238
1239        // Configure all Int32 to use specific settings
1240        params.types.insert(
1241            "Int32".to_string(),
1242            CompressionFieldParams {
1243                rle_threshold: Some(0.1), // Very low threshold
1244                compression: Some("zstd".to_string()),
1245                compression_level: Some(3),
1246                bss: Some(BssMode::Off), // Disable BSS to test RLE
1247                minichunk_size: None,
1248            },
1249        );
1250
1251        let strategy = DefaultCompressionStrategy::with_params(params);
1252        let field = create_test_field("some_column", DataType::Int32);
1253        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
1254        let data = create_fixed_width_block_with_stats(32, 1000, 50);
1255
1256        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1257        // Should use RLE due to very low threshold
1258        assert!(format!("{:?}", compressor).contains("RleEncoder"));
1259    }
1260
1261    #[test]
1262    #[cfg(feature = "bitpacking")]
1263    fn test_low_cardinality_prefers_bitpacking_over_rle() {
1264        let strategy = DefaultCompressionStrategy::new();
1265        let field = create_test_field("int_score", DataType::Int64);
1266
1267        // Low cardinality values (3/4/5) but with moderate run count:
1268        // RLE compresses vs raw, yet bitpacking should be smaller.
1269        let mut values: Vec<u64> = Vec::with_capacity(256);
1270        for run_idx in 0..64 {
1271            let value = match run_idx % 3 {
1272                0 => 3u64,
1273                1 => 4u64,
1274                _ => 5u64,
1275            };
1276            values.extend(std::iter::repeat_n(value, 4));
1277        }
1278
1279        let mut block = FixedWidthDataBlock {
1280            bits_per_value: 64,
1281            data: LanceBuffer::reinterpret_vec(values),
1282            num_values: 256,
1283            block_info: BlockInfo::default(),
1284        };
1285
1286        use crate::statistics::ComputeStat;
1287        block.compute_stat();
1288
1289        let data = DataBlock::FixedWidth(block);
1290        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1291        let debug_str = format!("{:?}", compressor);
1292        assert!(
1293            debug_str.contains("InlineBitpacking"),
1294            "expected InlineBitpacking, got: {debug_str}"
1295        );
1296        assert!(
1297            !debug_str.contains("RleEncoder"),
1298            "expected RLE to be skipped when bitpacking is smaller, got: {debug_str}"
1299        );
1300    }
1301
1302    fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
1303        let chain = extract_array_encoding_chain(encoding);
1304        if variable {
1305            assert_eq!(chain.len(), 2);
1306            assert_eq!(chain.first().unwrap().as_str(), "variable");
1307            assert_eq!(chain.get(1).unwrap().as_str(), "flat");
1308        } else {
1309            assert_eq!(chain.len(), 1);
1310            assert_eq!(chain.first().unwrap().as_str(), "flat");
1311        }
1312    }
1313
1314    #[test]
1315    fn test_none_compression() {
1316        let mut params = CompressionParams::new();
1317
1318        // Disable compression for embeddings
1319        params.columns.insert(
1320            "embeddings".to_string(),
1321            CompressionFieldParams {
1322                compression: Some("none".to_string()),
1323                ..Default::default()
1324            },
1325        );
1326
1327        let strategy = DefaultCompressionStrategy::with_params(params);
1328        let field = create_test_field("embeddings", DataType::Float32);
1329        let fixed_data = create_fixed_width_block(32, 1000);
1330        let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1331
1332        // Test miniblock
1333        let compressor = strategy
1334            .create_miniblock_compressor(&field, &fixed_data)
1335            .unwrap();
1336        let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1337        check_uncompressed_encoding(&encoding, false);
1338        let compressor = strategy
1339            .create_miniblock_compressor(&field, &variable_data)
1340            .unwrap();
1341        let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1342        check_uncompressed_encoding(&encoding, true);
1343
1344        // Test pervalue
1345        let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1346        let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1347        check_uncompressed_encoding(&encoding, false);
1348        let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1349        let (_block, encoding) = compressor.compress(variable_data).unwrap();
1350        check_uncompressed_encoding(&encoding, true);
1351    }
1352
1353    #[test]
1354    fn test_field_metadata_none_compression() {
1355        // Prepare field with metadata for none compression
1356        let mut arrow_field = ArrowField::new("simple_col", DataType::Binary, true);
1357        let mut metadata = HashMap::new();
1358        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1359        arrow_field = arrow_field.with_metadata(metadata);
1360        let field = Field::try_from(&arrow_field).unwrap();
1361
1362        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
1363
1364        // Test miniblock
1365        let fixed_data = create_fixed_width_block(32, 1000);
1366        let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1367
1368        let compressor = strategy
1369            .create_miniblock_compressor(&field, &fixed_data)
1370            .unwrap();
1371        let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1372        check_uncompressed_encoding(&encoding, false);
1373
1374        let compressor = strategy
1375            .create_miniblock_compressor(&field, &variable_data)
1376            .unwrap();
1377        let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1378        check_uncompressed_encoding(&encoding, true);
1379
1380        // Test pervalue
1381        let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1382        let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1383        check_uncompressed_encoding(&encoding, false);
1384
1385        let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1386        let (_block, encoding) = compressor.compress(variable_data).unwrap();
1387        check_uncompressed_encoding(&encoding, true);
1388    }
1389
1390    #[test]
1391    fn test_auto_fsst_disabled_for_binary_fields() {
1392        let strategy = DefaultCompressionStrategy::new();
1393        let field = create_test_field("bytes", DataType::Binary);
1394        let variable_data = create_fsst_candidate_variable_width_block();
1395
1396        let miniblock = strategy
1397            .create_miniblock_compressor(&field, &variable_data)
1398            .unwrap();
1399        let miniblock_debug = format!("{:?}", miniblock);
1400        assert!(
1401            miniblock_debug.contains("BinaryMiniBlockEncoder"),
1402            "expected BinaryMiniBlockEncoder, got: {miniblock_debug}"
1403        );
1404        assert!(
1405            !miniblock_debug.contains("FsstMiniBlockEncoder"),
1406            "did not expect FsstMiniBlockEncoder, got: {miniblock_debug}"
1407        );
1408
1409        let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1410        let per_value_debug = format!("{:?}", per_value);
1411        assert!(
1412            per_value_debug.contains("VariableEncoder"),
1413            "expected VariableEncoder, got: {per_value_debug}"
1414        );
1415        assert!(
1416            !per_value_debug.contains("FsstPerValueEncoder"),
1417            "did not expect FsstPerValueEncoder, got: {per_value_debug}"
1418        );
1419    }
1420
1421    #[test]
1422    fn test_auto_fsst_still_enabled_for_utf8_fields() {
1423        let strategy = DefaultCompressionStrategy::new();
1424        let field = create_test_field("text", DataType::Utf8);
1425        let variable_data = create_fsst_candidate_variable_width_block();
1426
1427        let miniblock = strategy
1428            .create_miniblock_compressor(&field, &variable_data)
1429            .unwrap();
1430        let miniblock_debug = format!("{:?}", miniblock);
1431        assert!(
1432            miniblock_debug.contains("FsstMiniBlockEncoder"),
1433            "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
1434        );
1435
1436        let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1437        let per_value_debug = format!("{:?}", per_value);
1438        assert!(
1439            per_value_debug.contains("FsstPerValueEncoder"),
1440            "expected FsstPerValueEncoder, got: {per_value_debug}"
1441        );
1442    }
1443
1444    #[test]
1445    fn test_explicit_fsst_still_supported_for_binary_fields() {
1446        let mut params = CompressionParams::new();
1447        params.columns.insert(
1448            "bytes".to_string(),
1449            CompressionFieldParams {
1450                compression: Some("fsst".to_string()),
1451                ..Default::default()
1452            },
1453        );
1454
1455        let strategy = DefaultCompressionStrategy::with_params(params);
1456        let field = create_test_field("bytes", DataType::Binary);
1457        let variable_data = create_fsst_candidate_variable_width_block();
1458
1459        let miniblock = strategy
1460            .create_miniblock_compressor(&field, &variable_data)
1461            .unwrap();
1462        let miniblock_debug = format!("{:?}", miniblock);
1463        assert!(
1464            miniblock_debug.contains("FsstMiniBlockEncoder"),
1465            "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
1466        );
1467
1468        let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1469        let per_value_debug = format!("{:?}", per_value);
1470        assert!(
1471            per_value_debug.contains("FsstPerValueEncoder"),
1472            "expected FsstPerValueEncoder, got: {per_value_debug}"
1473        );
1474    }
1475
1476    #[test]
1477    fn test_parameter_merge_priority() {
1478        let mut params = CompressionParams::new();
1479
1480        // Set type-level
1481        params.types.insert(
1482            "Int32".to_string(),
1483            CompressionFieldParams {
1484                rle_threshold: Some(0.5),
1485                compression: Some("lz4".to_string()),
1486                ..Default::default()
1487            },
1488        );
1489
1490        // Set column-level (highest priority)
1491        params.columns.insert(
1492            "user_id".to_string(),
1493            CompressionFieldParams {
1494                rle_threshold: Some(0.2),
1495                compression: Some("zstd".to_string()),
1496                compression_level: Some(6),
1497                bss: None,
1498                minichunk_size: None,
1499            },
1500        );
1501
1502        let strategy = DefaultCompressionStrategy::with_params(params);
1503
1504        // Get merged params
1505        let merged = strategy
1506            .params
1507            .get_field_params("user_id", &DataType::Int32);
1508
1509        // Column params should override type params
1510        assert_eq!(merged.rle_threshold, Some(0.2));
1511        assert_eq!(merged.compression, Some("zstd".to_string()));
1512        assert_eq!(merged.compression_level, Some(6));
1513
1514        // Test field with only type params
1515        let merged = strategy
1516            .params
1517            .get_field_params("other_field", &DataType::Int32);
1518        assert_eq!(merged.rle_threshold, Some(0.5));
1519        assert_eq!(merged.compression, Some("lz4".to_string()));
1520        assert_eq!(merged.compression_level, None);
1521    }
1522
1523    #[test]
1524    fn test_pattern_matching() {
1525        let mut params = CompressionParams::new();
1526
1527        // Configure pattern for log files
1528        params.columns.insert(
1529            "log_*".to_string(),
1530            CompressionFieldParams {
1531                compression: Some("zstd".to_string()),
1532                compression_level: Some(6),
1533                ..Default::default()
1534            },
1535        );
1536
1537        let strategy = DefaultCompressionStrategy::with_params(params);
1538
1539        // Should match pattern
1540        let merged = strategy
1541            .params
1542            .get_field_params("log_messages", &DataType::Utf8);
1543        assert_eq!(merged.compression, Some("zstd".to_string()));
1544        assert_eq!(merged.compression_level, Some(6));
1545
1546        // Should not match
1547        let merged = strategy
1548            .params
1549            .get_field_params("messages_log", &DataType::Utf8);
1550        assert_eq!(merged.compression, None);
1551    }
1552
1553    #[test]
1554    fn test_legacy_metadata_support() {
1555        let params = CompressionParams::new();
1556        let strategy = DefaultCompressionStrategy::with_params(params);
1557
1558        // Test field with "none" compression metadata
1559        let mut metadata = HashMap::new();
1560        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1561        let mut field = create_test_field("some_column", DataType::Int32);
1562        field.metadata = metadata;
1563
1564        let data = create_fixed_width_block(32, 1000);
1565        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1566
1567        // Should respect metadata and use ValueEncoder
1568        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1569    }
1570
1571    #[test]
1572    fn test_default_behavior() {
1573        // Empty params should fall back to default behavior
1574        let params = CompressionParams::new();
1575        let strategy = DefaultCompressionStrategy::with_params(params);
1576
1577        let field = create_test_field("random_column", DataType::Int32);
1578        // Create data with high run count that won't trigger RLE (600 runs for 1000 values = 0.6 ratio)
1579        let data = create_fixed_width_block_with_stats(32, 1000, 600);
1580
1581        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1582        // Should use default strategy's decision
1583        let debug_str = format!("{:?}", compressor);
1584        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
1585    }
1586
1587    #[test]
1588    fn test_field_metadata_compression() {
1589        let params = CompressionParams::new();
1590        let strategy = DefaultCompressionStrategy::with_params(params);
1591
1592        // Test field with compression metadata
1593        let mut metadata = HashMap::new();
1594        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
1595        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
1596        let mut field = create_test_field("test_column", DataType::Int32);
1597        field.metadata = metadata;
1598
1599        let data = create_fixed_width_block(32, 1000);
1600        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1601
1602        // Should use zstd with level 6
1603        let debug_str = format!("{:?}", compressor);
1604        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1605    }
1606
1607    #[test]
1608    fn test_field_metadata_rle_threshold() {
1609        let params = CompressionParams::new();
1610        let strategy = DefaultCompressionStrategy::with_params(params);
1611
1612        // Test field with RLE threshold metadata
1613        let mut metadata = HashMap::new();
1614        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
1615        metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); // Disable BSS to test RLE
1616        let mut field = create_test_field("test_column", DataType::Int32);
1617        field.metadata = metadata;
1618
1619        // Create data with low run count (e.g., 100 runs for 1000 values = 0.1 ratio)
1620        // This ensures run_count (100) < num_values * threshold (1000 * 0.8 = 800)
1621        let data = create_fixed_width_block_with_stats(32, 1000, 100);
1622
1623        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1624
1625        // Should use RLE because run_count (100) < num_values * threshold (800)
1626        let debug_str = format!("{:?}", compressor);
1627        assert!(debug_str.contains("RleEncoder"));
1628    }
1629
1630    #[test]
1631    fn test_field_metadata_override_params() {
1632        // Set up params with one configuration
1633        let mut params = CompressionParams::new();
1634        params.columns.insert(
1635            "test_column".to_string(),
1636            CompressionFieldParams {
1637                rle_threshold: Some(0.3),
1638                compression: Some("lz4".to_string()),
1639                compression_level: None,
1640                bss: None,
1641                minichunk_size: None,
1642            },
1643        );
1644
1645        let strategy = DefaultCompressionStrategy::with_params(params);
1646
1647        // Field metadata should override params
1648        let mut metadata = HashMap::new();
1649        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1650        let mut field = create_test_field("test_column", DataType::Int32);
1651        field.metadata = metadata;
1652
1653        let data = create_fixed_width_block(32, 1000);
1654        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1655
1656        // Should use none compression (from metadata) instead of lz4 (from params)
1657        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1658    }
1659
1660    #[test]
1661    fn test_field_metadata_mixed_configuration() {
1662        // Configure type-level params
1663        let mut params = CompressionParams::new();
1664        params.types.insert(
1665            "Int32".to_string(),
1666            CompressionFieldParams {
1667                rle_threshold: Some(0.5),
1668                compression: Some("lz4".to_string()),
1669                ..Default::default()
1670            },
1671        );
1672
1673        let strategy = DefaultCompressionStrategy::with_params(params);
1674
1675        // Field metadata provides partial override
1676        let mut metadata = HashMap::new();
1677        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1678        let mut field = create_test_field("test_column", DataType::Int32);
1679        field.metadata = metadata;
1680
1681        let data = create_fixed_width_block(32, 1000);
1682        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1683
1684        // Should use lz4 (from type params) with level 3 (from metadata)
1685        let debug_str = format!("{:?}", compressor);
1686        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1687    }
1688
1689    #[test]
1690    fn test_bss_field_metadata() {
1691        let params = CompressionParams::new();
1692        let strategy = DefaultCompressionStrategy::with_params(params);
1693
1694        // Test BSS "on" mode with compression enabled (BSS requires compression to be effective)
1695        let mut metadata = HashMap::new();
1696        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1697        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1698        let arrow_field =
1699            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1700        let field = Field::try_from(&arrow_field).unwrap();
1701
1702        // Create float data
1703        let data = create_fixed_width_block(32, 100);
1704
1705        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1706        let debug_str = format!("{:?}", compressor);
1707        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1708    }
1709
1710    #[test]
1711    fn test_bss_with_compression() {
1712        let params = CompressionParams::new();
1713        let strategy = DefaultCompressionStrategy::with_params(params);
1714
1715        // Test BSS with LZ4 compression
1716        let mut metadata = HashMap::new();
1717        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1718        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1719        let arrow_field =
1720            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1721        let field = Field::try_from(&arrow_field).unwrap();
1722
1723        // Create double data
1724        let data = create_fixed_width_block(64, 100);
1725
1726        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1727        let debug_str = format!("{:?}", compressor);
1728        // Should have BSS wrapped in general compression
1729        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1730        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1731    }
1732
1733    #[test]
1734    #[cfg(any(feature = "lz4", feature = "zstd"))]
1735    fn test_general_block_decompression_fixed_width_v2_2() {
1736        // Request general compression via the write path (2.2 requirement) and ensure the read path mirrors it.
1737        let mut params = CompressionParams::new();
1738        params.columns.insert(
1739            "dict_values".to_string(),
1740            CompressionFieldParams {
1741                compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1742                ..Default::default()
1743            },
1744        );
1745
1746        let mut strategy = DefaultCompressionStrategy::with_params(params);
1747        strategy.version = LanceFileVersion::V2_2;
1748
1749        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1750        let data = create_fixed_width_block(24, 1024);
1751        let DataBlock::FixedWidth(expected_block) = &data else {
1752            panic!("expected fixed width block");
1753        };
1754        let expected_bits = expected_block.bits_per_value;
1755        let expected_num_values = expected_block.num_values;
1756        let num_values = expected_num_values;
1757
1758        let (compressor, encoding) = strategy
1759            .create_block_compressor(&field, &data)
1760            .expect("general compression should be selected");
1761        match encoding.compression.as_ref() {
1762            Some(Compression::General(_)) => {}
1763            other => panic!("expected general compression, got {:?}", other),
1764        }
1765
1766        let compressed_buffer = compressor
1767            .compress(data.clone())
1768            .expect("write path general compression should succeed");
1769
1770        let decompressor = DefaultDecompressionStrategy::default()
1771            .create_block_decompressor(&encoding)
1772            .expect("general block decompressor should be created");
1773
1774        let decoded = decompressor
1775            .decompress(compressed_buffer, num_values)
1776            .expect("decompression should succeed");
1777
1778        match decoded {
1779            DataBlock::FixedWidth(block) => {
1780                assert_eq!(block.bits_per_value, expected_bits);
1781                assert_eq!(block.num_values, expected_num_values);
1782                assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
1783            }
1784            _ => panic!("expected fixed width block"),
1785        }
1786    }
1787
1788    #[test]
1789    #[cfg(any(feature = "lz4", feature = "zstd"))]
1790    fn test_general_compression_not_selected_for_v2_1_even_if_requested() {
1791        let mut params = CompressionParams::new();
1792        params.columns.insert(
1793            "dict_values".to_string(),
1794            CompressionFieldParams {
1795                compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1796                ..Default::default()
1797            },
1798        );
1799
1800        let strategy =
1801            DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_1);
1802        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1803        let data = create_fixed_width_block(24, 1024);
1804
1805        let (_compressor, encoding) = strategy
1806            .create_block_compressor(&field, &data)
1807            .expect("block compressor selection should succeed");
1808
1809        assert!(
1810            !matches!(encoding.compression.as_ref(), Some(Compression::General(_))),
1811            "general compression should not be selected for V2.1"
1812        );
1813    }
1814
1815    #[test]
1816    fn test_none_compression_disables_auto_general_block_compression() {
1817        let mut params = CompressionParams::new();
1818        params.columns.insert(
1819            "dict_values".to_string(),
1820            CompressionFieldParams {
1821                compression: Some("none".to_string()),
1822                ..Default::default()
1823            },
1824        );
1825
1826        let strategy =
1827            DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_2);
1828        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1829        let data = create_fixed_width_block(24, 20_000);
1830
1831        assert!(
1832            data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION,
1833            "test requires block size above automatic general compression threshold"
1834        );
1835
1836        let (_compressor, encoding) = strategy
1837            .create_block_compressor(&field, &data)
1838            .expect("block compressor selection should succeed");
1839
1840        assert!(
1841            !matches!(encoding.compression.as_ref(), Some(Compression::General(_))),
1842            "compression=none should disable automatic block general compression"
1843        );
1844    }
1845
1846    #[test]
1847    fn test_rle_block_used_for_version_v2_2() {
1848        let field = create_test_field("test_repdef", DataType::UInt16);
1849
1850        // Create highly repetitive data
1851        let num_values = 1000u64;
1852        let mut data = Vec::with_capacity(num_values as usize);
1853        for i in 0..10 {
1854            for _ in 0..100 {
1855                data.push(i as u16);
1856            }
1857        }
1858
1859        let mut block = FixedWidthDataBlock {
1860            bits_per_value: 16,
1861            data: LanceBuffer::reinterpret_vec(data),
1862            num_values,
1863            block_info: BlockInfo::default(),
1864        };
1865
1866        block.compute_stat();
1867
1868        let data_block = DataBlock::FixedWidth(block);
1869
1870        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1871            .with_version(LanceFileVersion::V2_2);
1872
1873        let (compressor, _) = strategy
1874            .create_block_compressor(&field, &data_block)
1875            .unwrap();
1876
1877        let debug_str = format!("{:?}", compressor);
1878        assert!(debug_str.contains("RleEncoder"));
1879    }
1880
1881    #[test]
1882    fn test_rle_block_not_used_for_version_v2_1() {
1883        let field = create_test_field("test_repdef", DataType::UInt16);
1884
1885        // Create highly repetitive data
1886        let num_values = 1000u64;
1887        let mut data = Vec::with_capacity(num_values as usize);
1888        for i in 0..10 {
1889            for _ in 0..100 {
1890                data.push(i as u16);
1891            }
1892        }
1893
1894        let mut block = FixedWidthDataBlock {
1895            bits_per_value: 16,
1896            data: LanceBuffer::reinterpret_vec(data),
1897            num_values,
1898            block_info: BlockInfo::default(),
1899        };
1900
1901        block.compute_stat();
1902
1903        let data_block = DataBlock::FixedWidth(block);
1904
1905        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1906            .with_version(LanceFileVersion::V2_1);
1907
1908        let (compressor, _) = strategy
1909            .create_block_compressor(&field, &data_block)
1910            .unwrap();
1911
1912        let debug_str = format!("{:?}", compressor);
1913        assert!(
1914            !debug_str.contains("RleEncoder"),
1915            "RLE should not be used for V2.1"
1916        );
1917    }
1918}