Skip to main content

lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
19#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22    buffer::LanceBuffer,
23    compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24    constants::{
25        BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26    },
27    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28    encodings::{
29        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30        physical::{
31            binary::{
32                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33                VariableDecoder, VariableEncoder,
34            },
35            block::{
36                CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37                GeneralBlockDecompressor,
38            },
39            byte_stream_split::{
40                should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
41            },
42            constant::ConstantDecompressor,
43            fsst::{
44                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45                FsstPerValueEncoder,
46            },
47            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48            packed::{
49                PackedStructFixedWidthMiniBlockDecompressor,
50                PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51                PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52                VariablePackedStructFieldKind,
53            },
54            rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
55            value::{ValueDecompressor, ValueEncoder},
56        },
57    },
58    format::{
59        pb21::{compressive_encoding::Compression, CompressiveEncoding},
60        ProtobufUtils21,
61    },
62    statistics::{GetStat, Stat},
63    version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
68use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
69use snafu::location;
70use std::{str::FromStr, sync::Arc};
71
/// Fallback RLE run-ratio threshold used when the user does not supply one.
///
/// The run-ratio guard (`run_count < num_values * threshold`) in `try_rle_for_mini_block`
/// is only applied when the user explicitly provides a threshold, and then the user's value
/// is what gets used. When no threshold is provided, RLE selection relies solely on a
/// type-aware size estimate, so this fallback value does not currently affect which
/// encoding is chosen.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
78
/// Size threshold (32KiB) for automatic general-purpose block compression.
///
/// A block must be strictly larger than this many bytes (and the file version must allow it)
/// before general compression is applied without an explicit user request.
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 << 10;
81
82/// Trait for compression algorithms that compress an entire block of data into one opaque
83/// and self-described chunk.
84///
85/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
86///
87/// This is the most general type of compression.  There are no constraints on the method
88/// of compression it is assumed that the entire block of data will be present at decompression.
89///
90/// This is the least appropriate strategy for random access because we must load the entire
91/// block to access any single value.  This should only be used for cases where random access is never
92/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
93/// mini-block chunks)
94pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
95    /// Compress the data into a single buffer
96    ///
97    /// Also returns a description of the compression that can be used to decompress
98    /// when reading the data back
99    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
100}
101
102/// A trait to pick which compression to use for given data
103///
104/// There are several different kinds of compression.
105///
106/// - Block compression is the most generic, but most difficult to use efficiently
107/// - Per-value compression results in either a fixed width data block or a variable
108///   width data block.  In other words, there is some number of bits per value.
109///   In addition, each value should be independently decompressible.
110/// - Mini-block compression results in a small block of opaque data for chunks
111///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
112///   used for narrow data types (both fixed and variable length) where we can
113///   fit many values into an 16KiB block.
114pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
115    /// Create a block compressor for the given data
116    fn create_block_compressor(
117        &self,
118        field: &Field,
119        data: &DataBlock,
120    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;
121
122    /// Create a per-value compressor for the given data
123    fn create_per_value(
124        &self,
125        field: &Field,
126        data: &DataBlock,
127    ) -> Result<Box<dyn PerValueCompressor>>;
128
129    /// Create a mini-block compressor for the given data
130    fn create_miniblock_compressor(
131        &self,
132        field: &Field,
133        data: &DataBlock,
134    ) -> Result<Box<dyn MiniBlockCompressor>>;
135}
136
/// The built-in [`CompressionStrategy`].
///
/// Compressors are chosen from the user-configured [`CompressionParams`] for a field,
/// overridden by any compression settings found in the field's metadata, and constrained
/// by the target file version.
#[derive(Debug, Default, Clone)]
pub struct DefaultCompressionStrategy {
    /// User-configured compression parameters
    params: CompressionParams,
    /// The lance file version for compatibilities.
    version: LanceFileVersion,
}
144
145fn try_bss_for_mini_block(
146    data: &FixedWidthDataBlock,
147    params: &CompressionFieldParams,
148) -> Option<Box<dyn MiniBlockCompressor>> {
149    // BSS requires general compression to be effective
150    // If compression is not set or explicitly disabled, skip BSS
151    if params.compression.is_none() || params.compression.as_deref() == Some("none") {
152        return None;
153    }
154
155    let mode = params.bss.unwrap_or(BssMode::Auto);
156    // should_use_bss already checks for supported bit widths (32/64)
157    if should_use_bss(data, mode) {
158        return Some(Box::new(ByteStreamSplitEncoder::new(
159            data.bits_per_value as usize,
160        )));
161    }
162    None
163}
164
165fn try_rle_for_mini_block(
166    data: &FixedWidthDataBlock,
167    params: &CompressionFieldParams,
168) -> Option<Box<dyn MiniBlockCompressor>> {
169    let bits = data.bits_per_value;
170    if !matches!(bits, 8 | 16 | 32 | 64) {
171        return None;
172    }
173
174    let type_size = bits / 8;
175    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
176    let threshold = params
177        .rle_threshold
178        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
179
180    // If the user explicitly provided a threshold then honor it as an additional guard.
181    // A lower threshold makes RLE harder to trigger and can be used to avoid CPU overhead.
182    let passes_threshold = match params.rle_threshold {
183        Some(_) => (run_count as f64) < (data.num_values as f64) * threshold,
184        None => true,
185    };
186
187    if !passes_threshold {
188        return None;
189    }
190
191    // Estimate the encoded size.
192    //
193    // RLE stores (value, run_length) pairs. Run lengths are u8 and long runs are split into
194    // multiple entries of up to 255 values. We don't know the run length distribution here,
195    // so we conservatively account for splitting with an upper bound.
196    let num_values = data.num_values;
197    let estimated_pairs = (run_count + (num_values / 255)).min(num_values);
198
199    let raw_bytes = (num_values as u128) * (type_size as u128);
200    let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);
201
202    if rle_bytes < raw_bytes {
203        #[cfg(feature = "bitpacking")]
204        {
205            if let Some(bitpack_bytes) = estimate_inline_bitpacking_bytes(data) {
206                if (bitpack_bytes as u128) < rle_bytes {
207                    return None;
208                }
209            }
210        }
211        return Some(Box::new(RleMiniBlockEncoder::new()));
212    }
213    None
214}
215
216fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
217    #[cfg(feature = "bitpacking")]
218    {
219        let bits = _data.bits_per_value;
220        if estimate_inline_bitpacking_bytes(_data).is_some() {
221            return Some(Box::new(InlineBitpacking::new(bits)));
222        }
223        None
224    }
225    #[cfg(not(feature = "bitpacking"))]
226    {
227        None
228    }
229}
230
/// Estimate the size in bytes of `data` after inline bitpacking.
///
/// Each entry of the `BitWidth` statistic is treated as one chunk. Each chunk is assumed to
/// pack 1024 values at its bit width, plus one extra word per chunk.
///
/// Returns `None` in three cases:
/// - the value width is not 8/16/32/64 bits,
/// - the block is empty,
/// - the estimate is not smaller than the raw data size.
#[cfg(feature = "bitpacking")]
fn estimate_inline_bitpacking_bytes(data: &FixedWidthDataBlock) -> Option<u64> {
    let bits = data.bits_per_value;
    if !matches!(bits, 8 | 16 | 32 | 64) || data.num_values == 0 {
        return None;
    }

    let bit_widths = data.expect_stat(Stat::BitWidth);
    let widths = bit_widths.as_primitive::<UInt64Type>();

    // One extra word is accounted for in every chunk.
    const WORDS_PER_CHUNK: u128 = 1;
    let word_bits = bits as u128;
    let total_words = widths
        .values()
        .iter()
        .fold(0u128, |total, &bit_width| {
            let packed_words = (1024u128 * bit_width as u128) / word_bits;
            total.saturating_add(WORDS_PER_CHUNK.saturating_add(packed_words))
        });

    let estimated_bytes = total_words.saturating_mul(word_bits / 8);
    let raw_bytes = data.data_size() as u128;
    if estimated_bytes >= raw_bytes {
        return None;
    }

    u64::try_from(estimated_bytes).ok()
}
264
265fn try_bitpack_for_block(
266    data: &FixedWidthDataBlock,
267) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
268    let bits = data.bits_per_value;
269    if !matches!(bits, 8 | 16 | 32 | 64) {
270        return None;
271    }
272
273    let bit_widths = data.expect_stat(Stat::BitWidth);
274    let widths = bit_widths.as_primitive::<UInt64Type>();
275    let has_all_zeros = widths.values().contains(&0);
276    let max_bit_width = *widths.values().iter().max().unwrap();
277
278    let too_small =
279        widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
280
281    if has_all_zeros || too_small {
282        return None;
283    }
284
285    if data.num_values <= 1024 {
286        let compressor = Box::new(InlineBitpacking::new(bits));
287        let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
288        Some((compressor, encoding))
289    } else {
290        let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
291        let encoding = ProtobufUtils21::out_of_line_bitpacking(
292            bits,
293            ProtobufUtils21::flat(max_bit_width, None),
294        );
295        Some((compressor, encoding))
296    }
297}
298
299fn maybe_wrap_general_for_mini_block(
300    inner: Box<dyn MiniBlockCompressor>,
301    params: &CompressionFieldParams,
302) -> Result<Box<dyn MiniBlockCompressor>> {
303    match params.compression.as_deref() {
304        None | Some("none") | Some("fsst") => Ok(inner),
305        Some(raw) => {
306            let scheme = CompressionScheme::from_str(raw).map_err(|_| {
307                lance_core::Error::invalid_input(
308                    format!("Unknown compression scheme: {raw}"),
309                    location!(),
310                )
311            })?;
312            let cfg = CompressionConfig::new(scheme, params.compression_level);
313            Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
314        }
315    }
316}
317
318fn try_general_compression(
319    version: LanceFileVersion,
320    field_params: &CompressionFieldParams,
321    data: &DataBlock,
322) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
323    // User-requested compression (unused today but perhaps still used
324    // in the future someday)
325    if let Some(compression_scheme) = &field_params.compression {
326        if compression_scheme != "none" && version >= LanceFileVersion::V2_2 {
327            let scheme: CompressionScheme = compression_scheme.parse()?;
328            let config = CompressionConfig::new(scheme, field_params.compression_level);
329            let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
330            return Ok(Some((compressor, config)));
331        }
332    }
333
334    // Automatic compression for large blocks
335    if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
336        && version >= LanceFileVersion::V2_2
337    {
338        let compressor = Box::new(CompressedBufferEncoder::default());
339        let config = compressor.compressor.config();
340        return Ok(Some((compressor, config)));
341    }
342
343    Ok(None)
344}
345
346impl DefaultCompressionStrategy {
347    /// Create a new compression strategy with default behavior
348    pub fn new() -> Self {
349        Self::default()
350    }
351
352    /// Create a new compression strategy with user-configured parameters
353    pub fn with_params(params: CompressionParams) -> Self {
354        Self {
355            params,
356            version: LanceFileVersion::default(),
357        }
358    }
359
360    /// Override the file version used to make compression decisions
361    pub fn with_version(mut self, version: LanceFileVersion) -> Self {
362        self.version = version;
363        self
364    }
365
366    /// Parse compression parameters from field metadata
367    fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams {
368        let mut params = CompressionFieldParams::default();
369
370        // Parse compression method
371        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
372            params.compression = Some(compression.clone());
373        }
374
375        // Parse compression level
376        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
377            params.compression_level = level.parse().ok();
378        }
379
380        // Parse RLE threshold
381        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
382            params.rle_threshold = threshold.parse().ok();
383        }
384
385        // Parse BSS mode
386        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
387            match BssMode::parse(bss_str) {
388                Some(mode) => params.bss = Some(mode),
389                None => {
390                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
391                }
392            }
393        }
394
395        // Parse minichunk size
396        if let Some(minichunk_size_str) = field
397            .metadata
398            .get(super::constants::MINICHUNK_SIZE_META_KEY)
399        {
400            if let Ok(minichunk_size) = minichunk_size_str.parse::<i64>() {
401                // for lance v2.1, only 32kb or smaller is supported
402                if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 {
403                    log::warn!(
404                        "minichunk_size '{}' too large for version '{}', using default",
405                        minichunk_size,
406                        version
407                    );
408                } else {
409                    params.minichunk_size = Some(minichunk_size);
410                }
411            } else {
412                log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str);
413            }
414        }
415
416        params
417    }
418
419    fn build_fixed_width_compressor(
420        &self,
421        params: &CompressionFieldParams,
422        data: &FixedWidthDataBlock,
423    ) -> Result<Box<dyn MiniBlockCompressor>> {
424        if params.compression.as_deref() == Some("none") {
425            return Ok(Box::new(ValueEncoder::default()));
426        }
427
428        let base = try_bss_for_mini_block(data, params)
429            .or_else(|| try_rle_for_mini_block(data, params))
430            .or_else(|| try_bitpack_for_mini_block(data))
431            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
432
433        maybe_wrap_general_for_mini_block(base, params)
434    }
435
436    /// Build compressor based on parameters for variable-width data
437    fn build_variable_width_compressor(
438        &self,
439        params: &CompressionFieldParams,
440        data: &VariableWidthBlock,
441    ) -> Result<Box<dyn MiniBlockCompressor>> {
442        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
443            return Err(Error::invalid_input(
444                format!(
445                    "Variable width compression not supported for {} bit offsets",
446                    data.bits_per_offset
447                ),
448                location!(),
449            ));
450        }
451
452        // Get statistics
453        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
454        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
455
456        // 1. Check for explicit "none" compression
457        if params.compression.as_deref() == Some("none") {
458            return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)));
459        }
460
461        // 2. Check for explicit "fsst" compression
462        if params.compression.as_deref() == Some("fsst") {
463            return Ok(Box::new(FsstMiniBlockEncoder::new(params.minichunk_size)));
464        }
465
466        // 3. Choose base encoder (FSST or Binary) based on data characteristics
467        let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
468            >= FSST_LEAST_INPUT_MAX_LENGTH
469            && data_size >= FSST_LEAST_INPUT_SIZE as u64
470        {
471            Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))
472        } else {
473            Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))
474        };
475
476        // 4. Apply general compression if configured
477        if let Some(compression_scheme) = &params.compression {
478            if compression_scheme != "none" && compression_scheme != "fsst" {
479                let scheme: CompressionScheme = compression_scheme.parse()?;
480                let config = CompressionConfig::new(scheme, params.compression_level);
481                base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
482            }
483        }
484
485        Ok(base_encoder)
486    }
487
488    /// Merge user-configured parameters with field metadata
489    /// Field metadata has highest priority
490    fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
491        let mut field_params = self
492            .params
493            .get_field_params(&field.name, &field.data_type());
494
495        // Override with field metadata if present (highest priority)
496        let metadata_params = Self::parse_field_metadata(field, &self.version);
497        field_params.merge(&metadata_params);
498
499        field_params
500    }
501}
502
503impl CompressionStrategy for DefaultCompressionStrategy {
504    fn create_miniblock_compressor(
505        &self,
506        field: &Field,
507        data: &DataBlock,
508    ) -> Result<Box<dyn MiniBlockCompressor>> {
509        let field_params = self.get_merged_field_params(field);
510
511        match data {
512            DataBlock::FixedWidth(fixed_width_data) => {
513                self.build_fixed_width_compressor(&field_params, fixed_width_data)
514            }
515            DataBlock::VariableWidth(variable_width_data) => {
516                self.build_variable_width_compressor(&field_params, variable_width_data)
517            }
518            DataBlock::Struct(struct_data_block) => {
519                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
520                // just being cautious here.
521                if struct_data_block.has_variable_width_child() {
522                    return Err(Error::invalid_input(
523                        "Packed struct mini-block encoding supports only fixed-width children",
524                        location!(),
525                    ));
526                }
527                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
528            }
529            DataBlock::FixedSizeList(_) => {
530                // Ideally we would compress the list items but this creates something of a challenge.
531                // We don't want to break lists across chunks and we need to worry about inner validity
532                // layers.  If we try and use a compression scheme then it is unlikely to respect these
533                // constraints.
534                //
535                // For now, we just don't compress.  In the future, we might want to consider a more
536                // sophisticated approach.
537                Ok(Box::new(ValueEncoder::default()))
538            }
539            _ => Err(Error::NotSupported {
540                source: format!(
541                    "Mini-block compression not yet supported for block type {}",
542                    data.name()
543                )
544                .into(),
545                location: location!(),
546            }),
547        }
548    }
549
550    fn create_per_value(
551        &self,
552        field: &Field,
553        data: &DataBlock,
554    ) -> Result<Box<dyn PerValueCompressor>> {
555        let field_params = self.get_merged_field_params(field);
556
557        match data {
558            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
559            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
560            DataBlock::Struct(struct_block) => {
561                if field.children.len() != struct_block.children.len() {
562                    return Err(Error::invalid_input(
563                        "Struct field metadata does not match data block children",
564                        location!(),
565                    ));
566                }
567                let has_variable_child = struct_block.has_variable_width_child();
568                if has_variable_child {
569                    if self.version < LanceFileVersion::V2_2 {
570                        return Err(Error::NotSupported {
571                            source: "Variable packed struct encoding requires Lance file version 2.2 or later".into(),
572                            location: location!(),
573                        });
574                    }
575                    Ok(Box::new(PackedStructVariablePerValueEncoder::new(
576                        self.clone(),
577                        field.children.clone(),
578                    )))
579                } else {
580                    Err(Error::invalid_input(
581                        "Packed struct per-value compression should not be used for fixed-width-only structs",
582                        location!(),
583                    ))
584                }
585            }
586            DataBlock::VariableWidth(variable_width) => {
587                // Check for explicit "none" compression
588                if field_params.compression.as_deref() == Some("none") {
589                    return Ok(Box::new(VariableEncoder::default()));
590                }
591
592                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
593                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
594
595                // If values are very large then use block compression on a per-value basis
596                //
597                // TODO: Could maybe use median here
598
599                let per_value_requested =
600                    if let Some(compression) = field_params.compression.as_deref() {
601                        compression != "fsst"
602                    } else {
603                        false
604                    };
605
606                if (max_len > 32 * 1024 || per_value_requested)
607                    && data_size >= FSST_LEAST_INPUT_SIZE as u64
608                {
609                    return Ok(Box::new(CompressedBufferEncoder::default()));
610                }
611
612                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
613                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
614                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
615
616                    let variable_compression = Box::new(VariableEncoder::default());
617
618                    // Use FSST if explicitly requested or if data characteristics warrant it
619                    if field_params.compression.as_deref() == Some("fsst")
620                        || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
621                            && data_size >= FSST_LEAST_INPUT_SIZE as u64)
622                    {
623                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
624                    } else {
625                        Ok(variable_compression)
626                    }
627                } else {
628                    panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
629                }
630            }
631            _ => unreachable!(
632                "Per-value compression not yet supported for block type: {}",
633                data.name()
634            ),
635        }
636    }
637
638    fn create_block_compressor(
639        &self,
640        field: &Field,
641        data: &DataBlock,
642    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
643        let field_params = self.get_merged_field_params(field);
644
645        match data {
646            DataBlock::FixedWidth(fixed_width) => {
647                if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
648                    return Ok((compressor, encoding));
649                }
650
651                // Try general compression (user-requested or automatic over MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION)
652                if let Some((compressor, config)) =
653                    try_general_compression(self.version, &field_params, data)?
654                {
655                    let encoding = ProtobufUtils21::wrapped(
656                        config,
657                        ProtobufUtils21::flat(fixed_width.bits_per_value, None),
658                    )?;
659                    return Ok((compressor, encoding));
660                }
661
662                let encoder = Box::new(ValueEncoder::default());
663                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
664                Ok((encoder, encoding))
665            }
666            DataBlock::VariableWidth(variable_width) => {
667                // Try general compression
668                if let Some((compressor, config)) =
669                    try_general_compression(self.version, &field_params, data)?
670                {
671                    let encoding = ProtobufUtils21::wrapped(
672                        config,
673                        ProtobufUtils21::variable(
674                            ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
675                            None,
676                        ),
677                    )?;
678                    return Ok((compressor, encoding));
679                }
680
681                let encoder = Box::new(VariableEncoder::default());
682                let encoding = ProtobufUtils21::variable(
683                    ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
684                    None,
685                );
686                Ok((encoder, encoding))
687            }
688            _ => unreachable!(),
689        }
690    }
691}
692
/// Decompresses mini-block data back into a [`DataBlock`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress `data` (the buffers of a mini-block chunk — presumably one chunk per call,
    /// TODO confirm against callers) into a block holding `num_values` values.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
696
/// Decompresses per-value data whose compressed form has a fixed width per value.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// The number of bits in each value
    ///
    /// Currently (and probably long term) this must be a multiple of 8
    fn bits_per_value(&self) -> u64;
}
705
/// Decompresses per-value data whose compressed form is variable width.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
710
/// Decompresses an opaque, fully-loaded block (the counterpart of a [`BlockCompressor`]).
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress `data` into a block holding `num_values` values.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
714
/// Picks a decompressor for a stored [`CompressiveEncoding`] description.
///
/// There is one factory method per kind of compressed data:
/// - mini-block chunks,
/// - fixed-width per-value data,
/// - variable-width per-value data,
/// - opaque blocks.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Create a decompressor for mini-block data described by `description`.
    ///
    /// `decompression_strategy` is the strategy to use for nested encodings. For example,
    /// the default implementation uses it to build the inner decompressor for FSST values.
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Create a decompressor for fixed-width per-value data described by `description`.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Create a decompressor for variable-width per-value data described by `description`.
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Create a decompressor for an opaque block described by `description`.
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
737
/// The built-in [`DecompressionStrategy`], mapping each [`CompressiveEncoding`] variant to
/// the matching built-in decompressor.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
740
741impl DecompressionStrategy for DefaultDecompressionStrategy {
742    fn create_miniblock_decompressor(
743        &self,
744        description: &CompressiveEncoding,
745        decompression_strategy: &dyn DecompressionStrategy,
746    ) -> Result<Box<dyn MiniBlockDecompressor>> {
747        match description.compression.as_ref().unwrap() {
748            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
749            #[cfg(feature = "bitpacking")]
750            Compression::InlineBitpacking(description) => {
751                Ok(Box::new(InlineBitpacking::from_description(description)))
752            }
753            #[cfg(not(feature = "bitpacking"))]
754            Compression::InlineBitpacking(_) => Err(Error::NotSupported {
755                source: "this runtime was not built with bitpacking support".into(),
756                location: location!(),
757            }),
758            Compression::Variable(variable) => {
759                let Compression::Flat(offsets) = variable
760                    .offsets
761                    .as_ref()
762                    .unwrap()
763                    .compression
764                    .as_ref()
765                    .unwrap()
766                else {
767                    panic!("Variable compression only supports flat offsets")
768                };
769                Ok(Box::new(BinaryMiniBlockDecompressor::new(
770                    offsets.bits_per_value as u8,
771                )))
772            }
773            Compression::Fsst(description) => {
774                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
775                    description.values.as_ref().unwrap(),
776                    decompression_strategy,
777                )?;
778                Ok(Box::new(FsstMiniBlockDecompressor::new(
779                    description,
780                    inner_decompressor,
781                )))
782            }
783            Compression::PackedStruct(description) => Ok(Box::new(
784                PackedStructFixedWidthMiniBlockDecompressor::new(description),
785            )),
786            Compression::VariablePackedStruct(_) => Err(Error::NotSupported {
787                source: "variable packed struct decoding is not yet implemented".into(),
788                location: location!(),
789            }),
790            Compression::FixedSizeList(fsl) => {
791                // In the future, we might need to do something more complex here if FSL supports
792                // compression.
793                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
794            }
795            Compression::Rle(rle) => {
796                let Compression::Flat(values) =
797                    rle.values.as_ref().unwrap().compression.as_ref().unwrap()
798                else {
799                    panic!("RLE compression only supports flat values")
800                };
801                let Compression::Flat(run_lengths) = rle
802                    .run_lengths
803                    .as_ref()
804                    .unwrap()
805                    .compression
806                    .as_ref()
807                    .unwrap()
808                else {
809                    panic!("RLE compression only supports flat run lengths")
810                };
811                assert_eq!(
812                    run_lengths.bits_per_value, 8,
813                    "RLE compression only supports 8-bit run lengths"
814                );
815                Ok(Box::new(RleMiniBlockDecompressor::new(
816                    values.bits_per_value,
817                )))
818            }
819            Compression::ByteStreamSplit(bss) => {
820                let Compression::Flat(values) =
821                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
822                else {
823                    panic!("ByteStreamSplit compression only supports flat values")
824                };
825                Ok(Box::new(ByteStreamSplitDecompressor::new(
826                    values.bits_per_value as usize,
827                )))
828            }
829            Compression::General(general) => {
830                // Create inner decompressor
831                let inner_decompressor = self.create_miniblock_decompressor(
832                    general.values.as_ref().ok_or_else(|| {
833                        Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
834                    })?,
835                    decompression_strategy,
836                )?;
837
838                // Parse compression config
839                let compression = general.compression.as_ref().ok_or_else(|| {
840                    Error::invalid_input("GeneralMiniBlock missing compression config", location!())
841                })?;
842
843                let scheme = compression.scheme().try_into()?;
844
845                let compression_config = crate::encodings::physical::block::CompressionConfig::new(
846                    scheme,
847                    compression.level,
848                );
849
850                Ok(Box::new(GeneralMiniBlockDecompressor::new(
851                    inner_decompressor,
852                    compression_config,
853                )))
854            }
855            _ => todo!(),
856        }
857    }
858
859    fn create_fixed_per_value_decompressor(
860        &self,
861        description: &CompressiveEncoding,
862    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
863        match description.compression.as_ref().unwrap() {
864            Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
865                constant
866                    .value
867                    .as_ref()
868                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
869            ))),
870            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
871            Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
872            _ => todo!("fixed-per-value decompressor for {:?}", description),
873        }
874    }
875
876    fn create_variable_per_value_decompressor(
877        &self,
878        description: &CompressiveEncoding,
879    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
880        match description.compression.as_ref().unwrap() {
881            Compression::Variable(variable) => {
882                let Compression::Flat(offsets) = variable
883                    .offsets
884                    .as_ref()
885                    .unwrap()
886                    .compression
887                    .as_ref()
888                    .unwrap()
889                else {
890                    panic!("Variable compression only supports flat offsets")
891                };
892                assert!(offsets.bits_per_value < u8::MAX as u64);
893                Ok(Box::new(VariableDecoder::default()))
894            }
895            Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
896                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
897                Box::new(VariableDecoder::default()),
898            ))),
899            Compression::General(ref general) => {
900                Ok(Box::new(CompressedBufferEncoder::from_scheme(
901                    general.compression.as_ref().expect_ok()?.scheme(),
902                )?))
903            }
904            Compression::VariablePackedStruct(description) => {
905                let mut fields = Vec::with_capacity(description.fields.len());
906                for field in &description.fields {
907                    let value_encoding = field.value.as_ref().ok_or_else(|| {
908                        Error::invalid_input(
909                            "VariablePackedStruct field is missing value encoding",
910                            location!(),
911                        )
912                    })?;
913                    let decoder = match field.layout.as_ref().ok_or_else(|| {
914                        Error::invalid_input(
915                            "VariablePackedStruct field is missing layout details",
916                            location!(),
917                        )
918                    })? {
919                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
920                            bits_per_value,
921                        ) => {
922                            let decompressor =
923                                self.create_fixed_per_value_decompressor(value_encoding)?;
924                            VariablePackedStructFieldDecoder {
925                                kind: VariablePackedStructFieldKind::Fixed {
926                                    bits_per_value: *bits_per_value,
927                                    decompressor: Arc::from(decompressor),
928                                },
929                            }
930                        }
931                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
932                            bits_per_length,
933                        ) => {
934                            let decompressor =
935                                self.create_variable_per_value_decompressor(value_encoding)?;
936                            VariablePackedStructFieldDecoder {
937                                kind: VariablePackedStructFieldKind::Variable {
938                                    bits_per_length: *bits_per_length,
939                                    decompressor: Arc::from(decompressor),
940                                },
941                            }
942                        }
943                    };
944                    fields.push(decoder);
945                }
946                Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
947                    fields,
948                )))
949            }
950            _ => todo!("variable-per-value decompressor for {:?}", description),
951        }
952    }
953
954    fn create_block_decompressor(
955        &self,
956        description: &CompressiveEncoding,
957    ) -> Result<Box<dyn BlockDecompressor>> {
958        match description.compression.as_ref().unwrap() {
959            Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
960                InlineBitpacking::from_description(inline_bitpacking),
961            )),
962            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
963            Compression::Constant(constant) => {
964                let scalar = constant
965                    .value
966                    .as_ref()
967                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
968                Ok(Box::new(ConstantDecompressor::new(scalar)))
969            }
970            Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
971            Compression::FixedSizeList(fsl) => {
972                Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
973            }
974            Compression::OutOfLineBitpacking(out_of_line) => {
975                // Extract the compressed bit width from the values encoding
976                let compressed_bit_width = match out_of_line
977                    .values
978                    .as_ref()
979                    .unwrap()
980                    .compression
981                    .as_ref()
982                    .unwrap()
983                {
984                    Compression::Flat(flat) => flat.bits_per_value,
985                    _ => {
986                        return Err(Error::InvalidInput {
987                            location: location!(),
988                            source: "OutOfLineBitpacking values must use Flat encoding".into(),
989                        })
990                    }
991                };
992                Ok(Box::new(OutOfLineBitpacking::new(
993                    compressed_bit_width,
994                    out_of_line.uncompressed_bits_per_value,
995                )))
996            }
997            Compression::General(general) => {
998                let inner_desc = general
999                    .values
1000                    .as_ref()
1001                    .ok_or_else(|| {
1002                        Error::invalid_input(
1003                            "General compression missing inner encoding",
1004                            location!(),
1005                        )
1006                    })?
1007                    .as_ref();
1008                let inner_decompressor = self.create_block_decompressor(inner_desc)?;
1009
1010                let compression = general.compression.as_ref().ok_or_else(|| {
1011                    Error::invalid_input(
1012                        "General compression missing compression config",
1013                        location!(),
1014                    )
1015                })?;
1016                let scheme = compression.scheme().try_into()?;
1017                let config = CompressionConfig::new(scheme, compression.level);
1018                let general_decompressor =
1019                    GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
1020
1021                Ok(Box::new(general_decompressor))
1022            }
1023            _ => todo!(),
1024        }
1025    }
1026}
1027
1028#[cfg(test)]
1029mod tests {
1030    use super::*;
1031    use crate::buffer::LanceBuffer;
1032    use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
1033    use crate::testing::extract_array_encoding_chain;
1034    use arrow_schema::{DataType, Field as ArrowField};
1035    use std::collections::HashMap;
1036
1037    fn create_test_field(name: &str, data_type: DataType) -> Field {
1038        let arrow_field = ArrowField::new(name, data_type, true);
1039        let mut field = Field::try_from(&arrow_field).unwrap();
1040        field.id = -1;
1041        field
1042    }
1043
1044    fn create_fixed_width_block_with_stats(
1045        bits_per_value: u64,
1046        num_values: u64,
1047        run_count: u64,
1048    ) -> DataBlock {
1049        // Create varied data to avoid low entropy
1050        let bytes_per_value = (bits_per_value / 8) as usize;
1051        let total_bytes = bytes_per_value * num_values as usize;
1052        let mut data = vec![0u8; total_bytes];
1053
1054        // Create data with specified run count
1055        let values_per_run = (num_values / run_count).max(1);
1056        let mut run_value = 0u8;
1057
1058        for i in 0..num_values as usize {
1059            if i % values_per_run as usize == 0 {
1060                run_value = run_value.wrapping_add(17); // Use prime to get varied values
1061            }
1062            // Fill all bytes of the value to create high entropy
1063            for j in 0..bytes_per_value {
1064                let byte_offset = i * bytes_per_value + j;
1065                if byte_offset < data.len() {
1066                    data[byte_offset] = run_value.wrapping_add(j as u8);
1067                }
1068            }
1069        }
1070
1071        let mut block = FixedWidthDataBlock {
1072            bits_per_value,
1073            data: LanceBuffer::reinterpret_vec(data),
1074            num_values,
1075            block_info: BlockInfo::default(),
1076        };
1077
1078        // Compute all statistics including BytePositionEntropy
1079        use crate::statistics::ComputeStat;
1080        block.compute_stat();
1081
1082        DataBlock::FixedWidth(block)
1083    }
1084
1085    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
1086        // Create data with some variety to avoid always triggering BSS
1087        let bytes_per_value = (bits_per_value / 8) as usize;
1088        let total_bytes = bytes_per_value * num_values as usize;
1089        let mut data = vec![0u8; total_bytes];
1090
1091        // Add some variation to the data to make it more realistic
1092        for i in 0..num_values as usize {
1093            let byte_offset = i * bytes_per_value;
1094            if byte_offset < data.len() {
1095                data[byte_offset] = (i % 256) as u8;
1096            }
1097        }
1098
1099        let mut block = FixedWidthDataBlock {
1100            bits_per_value,
1101            data: LanceBuffer::reinterpret_vec(data),
1102            num_values,
1103            block_info: BlockInfo::default(),
1104        };
1105
1106        // Compute all statistics including BytePositionEntropy
1107        use crate::statistics::ComputeStat;
1108        block.compute_stat();
1109
1110        DataBlock::FixedWidth(block)
1111    }
1112
1113    fn create_variable_width_block(
1114        bits_per_offset: u8,
1115        num_values: u64,
1116        avg_value_size: usize,
1117    ) -> DataBlock {
1118        use crate::statistics::ComputeStat;
1119
1120        // Create offsets buffer (num_values + 1 offsets)
1121        let mut offsets = Vec::with_capacity((num_values + 1) as usize);
1122        let mut current_offset = 0i64;
1123        offsets.push(current_offset);
1124
1125        // Generate offsets with varying value sizes
1126        for i in 0..num_values {
1127            let value_size = if avg_value_size == 0 {
1128                1
1129            } else {
1130                ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
1131                    .min(avg_value_size * 2)
1132            };
1133            current_offset += value_size as i64;
1134            offsets.push(current_offset);
1135        }
1136
1137        // Create data buffer with realistic content
1138        let total_data_size = current_offset as usize;
1139        let mut data = vec![0u8; total_data_size];
1140
1141        // Fill data with varied content
1142        for i in 0..num_values {
1143            let start_offset = offsets[i as usize] as usize;
1144            let end_offset = offsets[(i + 1) as usize] as usize;
1145
1146            let content = (i % 256) as u8;
1147            for j in 0..end_offset - start_offset {
1148                data[start_offset + j] = content.wrapping_add(j as u8);
1149            }
1150        }
1151
1152        // Convert offsets to appropriate lance buffer
1153        let offsets_buffer = match bits_per_offset {
1154            32 => {
1155                let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
1156                LanceBuffer::reinterpret_vec(offsets_32)
1157            }
1158            64 => LanceBuffer::reinterpret_vec(offsets),
1159            _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
1160        };
1161
1162        let mut block = VariableWidthBlock {
1163            data: LanceBuffer::from(data),
1164            offsets: offsets_buffer,
1165            bits_per_offset,
1166            num_values,
1167            block_info: BlockInfo::default(),
1168        };
1169
1170        block.compute_stat();
1171        DataBlock::VariableWidth(block)
1172    }
1173
1174    #[test]
1175    fn test_parameter_based_compression() {
1176        let mut params = CompressionParams::new();
1177
1178        // Configure RLE for ID columns with BSS explicitly disabled
1179        params.columns.insert(
1180            "*_id".to_string(),
1181            CompressionFieldParams {
1182                rle_threshold: Some(0.3),
1183                compression: Some("lz4".to_string()),
1184                compression_level: None,
1185                bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE
1186                minichunk_size: None,
1187            },
1188        );
1189
1190        let strategy = DefaultCompressionStrategy::with_params(params);
1191        let field = create_test_field("user_id", DataType::Int32);
1192
1193        // Create data with low run count for RLE
1194        // Use create_fixed_width_block_with_stats which properly sets run count
1195        let data = create_fixed_width_block_with_stats(32, 1000, 100); // 100 runs out of 1000 values
1196
1197        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1198        // Should use RLE due to low threshold (0.3) and low run count (100/1000 = 0.1)
1199        let debug_str = format!("{:?}", compressor);
1200
1201        // The compressor should be RLE wrapped in general compression
1202        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1203        assert!(debug_str.contains("RleMiniBlockEncoder"));
1204    }
1205
1206    #[test]
1207    fn test_type_level_parameters() {
1208        let mut params = CompressionParams::new();
1209
1210        // Configure all Int32 to use specific settings
1211        params.types.insert(
1212            "Int32".to_string(),
1213            CompressionFieldParams {
1214                rle_threshold: Some(0.1), // Very low threshold
1215                compression: Some("zstd".to_string()),
1216                compression_level: Some(3),
1217                bss: Some(BssMode::Off), // Disable BSS to test RLE
1218                minichunk_size: None,
1219            },
1220        );
1221
1222        let strategy = DefaultCompressionStrategy::with_params(params);
1223        let field = create_test_field("some_column", DataType::Int32);
1224        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
1225        let data = create_fixed_width_block_with_stats(32, 1000, 50);
1226
1227        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1228        // Should use RLE due to very low threshold
1229        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
1230    }
1231
1232    #[test]
1233    #[cfg(feature = "bitpacking")]
1234    fn test_low_cardinality_prefers_bitpacking_over_rle() {
1235        let strategy = DefaultCompressionStrategy::new();
1236        let field = create_test_field("int_score", DataType::Int64);
1237
1238        // Low cardinality values (3/4/5) but with moderate run count:
1239        // RLE compresses vs raw, yet bitpacking should be smaller.
1240        let mut values: Vec<u64> = Vec::with_capacity(256);
1241        for run_idx in 0..64 {
1242            let value = match run_idx % 3 {
1243                0 => 3u64,
1244                1 => 4u64,
1245                _ => 5u64,
1246            };
1247            values.extend(std::iter::repeat_n(value, 4));
1248        }
1249
1250        let mut block = FixedWidthDataBlock {
1251            bits_per_value: 64,
1252            data: LanceBuffer::reinterpret_vec(values),
1253            num_values: 256,
1254            block_info: BlockInfo::default(),
1255        };
1256
1257        use crate::statistics::ComputeStat;
1258        block.compute_stat();
1259
1260        let data = DataBlock::FixedWidth(block);
1261        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1262        let debug_str = format!("{:?}", compressor);
1263        assert!(
1264            debug_str.contains("InlineBitpacking"),
1265            "expected InlineBitpacking, got: {debug_str}"
1266        );
1267        assert!(
1268            !debug_str.contains("RleMiniBlockEncoder"),
1269            "expected RLE to be skipped when bitpacking is smaller, got: {debug_str}"
1270        );
1271    }
1272
1273    fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
1274        let chain = extract_array_encoding_chain(encoding);
1275        if variable {
1276            assert_eq!(chain.len(), 2);
1277            assert_eq!(chain.first().unwrap().as_str(), "variable");
1278            assert_eq!(chain.get(1).unwrap().as_str(), "flat");
1279        } else {
1280            assert_eq!(chain.len(), 1);
1281            assert_eq!(chain.first().unwrap().as_str(), "flat");
1282        }
1283    }
1284
1285    #[test]
1286    fn test_none_compression() {
1287        let mut params = CompressionParams::new();
1288
1289        // Disable compression for embeddings
1290        params.columns.insert(
1291            "embeddings".to_string(),
1292            CompressionFieldParams {
1293                compression: Some("none".to_string()),
1294                ..Default::default()
1295            },
1296        );
1297
1298        let strategy = DefaultCompressionStrategy::with_params(params);
1299        let field = create_test_field("embeddings", DataType::Float32);
1300        let fixed_data = create_fixed_width_block(32, 1000);
1301        let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1302
1303        // Test miniblock
1304        let compressor = strategy
1305            .create_miniblock_compressor(&field, &fixed_data)
1306            .unwrap();
1307        let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1308        check_uncompressed_encoding(&encoding, false);
1309        let compressor = strategy
1310            .create_miniblock_compressor(&field, &variable_data)
1311            .unwrap();
1312        let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1313        check_uncompressed_encoding(&encoding, true);
1314
1315        // Test pervalue
1316        let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1317        let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1318        check_uncompressed_encoding(&encoding, false);
1319        let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1320        let (_block, encoding) = compressor.compress(variable_data).unwrap();
1321        check_uncompressed_encoding(&encoding, true);
1322    }
1323
1324    #[test]
1325    fn test_field_metadata_none_compression() {
1326        // Prepare field with metadata for none compression
1327        let mut arrow_field = ArrowField::new("simple_col", DataType::Binary, true);
1328        let mut metadata = HashMap::new();
1329        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1330        arrow_field = arrow_field.with_metadata(metadata);
1331        let field = Field::try_from(&arrow_field).unwrap();
1332
1333        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
1334
1335        // Test miniblock
1336        let fixed_data = create_fixed_width_block(32, 1000);
1337        let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1338
1339        let compressor = strategy
1340            .create_miniblock_compressor(&field, &fixed_data)
1341            .unwrap();
1342        let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1343        check_uncompressed_encoding(&encoding, false);
1344
1345        let compressor = strategy
1346            .create_miniblock_compressor(&field, &variable_data)
1347            .unwrap();
1348        let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1349        check_uncompressed_encoding(&encoding, true);
1350
1351        // Test pervalue
1352        let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1353        let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1354        check_uncompressed_encoding(&encoding, false);
1355
1356        let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1357        let (_block, encoding) = compressor.compress(variable_data).unwrap();
1358        check_uncompressed_encoding(&encoding, true);
1359    }
1360
1361    #[test]
1362    fn test_parameter_merge_priority() {
1363        let mut params = CompressionParams::new();
1364
1365        // Set type-level
1366        params.types.insert(
1367            "Int32".to_string(),
1368            CompressionFieldParams {
1369                rle_threshold: Some(0.5),
1370                compression: Some("lz4".to_string()),
1371                ..Default::default()
1372            },
1373        );
1374
1375        // Set column-level (highest priority)
1376        params.columns.insert(
1377            "user_id".to_string(),
1378            CompressionFieldParams {
1379                rle_threshold: Some(0.2),
1380                compression: Some("zstd".to_string()),
1381                compression_level: Some(6),
1382                bss: None,
1383                minichunk_size: None,
1384            },
1385        );
1386
1387        let strategy = DefaultCompressionStrategy::with_params(params);
1388
1389        // Get merged params
1390        let merged = strategy
1391            .params
1392            .get_field_params("user_id", &DataType::Int32);
1393
1394        // Column params should override type params
1395        assert_eq!(merged.rle_threshold, Some(0.2));
1396        assert_eq!(merged.compression, Some("zstd".to_string()));
1397        assert_eq!(merged.compression_level, Some(6));
1398
1399        // Test field with only type params
1400        let merged = strategy
1401            .params
1402            .get_field_params("other_field", &DataType::Int32);
1403        assert_eq!(merged.rle_threshold, Some(0.5));
1404        assert_eq!(merged.compression, Some("lz4".to_string()));
1405        assert_eq!(merged.compression_level, None);
1406    }
1407
1408    #[test]
1409    fn test_pattern_matching() {
1410        let mut params = CompressionParams::new();
1411
1412        // Configure pattern for log files
1413        params.columns.insert(
1414            "log_*".to_string(),
1415            CompressionFieldParams {
1416                compression: Some("zstd".to_string()),
1417                compression_level: Some(6),
1418                ..Default::default()
1419            },
1420        );
1421
1422        let strategy = DefaultCompressionStrategy::with_params(params);
1423
1424        // Should match pattern
1425        let merged = strategy
1426            .params
1427            .get_field_params("log_messages", &DataType::Utf8);
1428        assert_eq!(merged.compression, Some("zstd".to_string()));
1429        assert_eq!(merged.compression_level, Some(6));
1430
1431        // Should not match
1432        let merged = strategy
1433            .params
1434            .get_field_params("messages_log", &DataType::Utf8);
1435        assert_eq!(merged.compression, None);
1436    }
1437
1438    #[test]
1439    fn test_legacy_metadata_support() {
1440        let params = CompressionParams::new();
1441        let strategy = DefaultCompressionStrategy::with_params(params);
1442
1443        // Test field with "none" compression metadata
1444        let mut metadata = HashMap::new();
1445        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1446        let mut field = create_test_field("some_column", DataType::Int32);
1447        field.metadata = metadata;
1448
1449        let data = create_fixed_width_block(32, 1000);
1450        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1451
1452        // Should respect metadata and use ValueEncoder
1453        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1454    }
1455
1456    #[test]
1457    fn test_default_behavior() {
1458        // Empty params should fall back to default behavior
1459        let params = CompressionParams::new();
1460        let strategy = DefaultCompressionStrategy::with_params(params);
1461
1462        let field = create_test_field("random_column", DataType::Int32);
1463        // Create data with high run count that won't trigger RLE (600 runs for 1000 values = 0.6 ratio)
1464        let data = create_fixed_width_block_with_stats(32, 1000, 600);
1465
1466        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1467        // Should use default strategy's decision
1468        let debug_str = format!("{:?}", compressor);
1469        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
1470    }
1471
1472    #[test]
1473    fn test_field_metadata_compression() {
1474        let params = CompressionParams::new();
1475        let strategy = DefaultCompressionStrategy::with_params(params);
1476
1477        // Test field with compression metadata
1478        let mut metadata = HashMap::new();
1479        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
1480        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
1481        let mut field = create_test_field("test_column", DataType::Int32);
1482        field.metadata = metadata;
1483
1484        let data = create_fixed_width_block(32, 1000);
1485        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1486
1487        // Should use zstd with level 6
1488        let debug_str = format!("{:?}", compressor);
1489        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1490    }
1491
1492    #[test]
1493    fn test_field_metadata_rle_threshold() {
1494        let params = CompressionParams::new();
1495        let strategy = DefaultCompressionStrategy::with_params(params);
1496
1497        // Test field with RLE threshold metadata
1498        let mut metadata = HashMap::new();
1499        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
1500        metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); // Disable BSS to test RLE
1501        let mut field = create_test_field("test_column", DataType::Int32);
1502        field.metadata = metadata;
1503
1504        // Create data with low run count (e.g., 100 runs for 1000 values = 0.1 ratio)
1505        // This ensures run_count (100) < num_values * threshold (1000 * 0.8 = 800)
1506        let data = create_fixed_width_block_with_stats(32, 1000, 100);
1507
1508        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1509
1510        // Should use RLE because run_count (100) < num_values * threshold (800)
1511        let debug_str = format!("{:?}", compressor);
1512        assert!(debug_str.contains("RleMiniBlockEncoder"));
1513    }
1514
1515    #[test]
1516    fn test_field_metadata_override_params() {
1517        // Set up params with one configuration
1518        let mut params = CompressionParams::new();
1519        params.columns.insert(
1520            "test_column".to_string(),
1521            CompressionFieldParams {
1522                rle_threshold: Some(0.3),
1523                compression: Some("lz4".to_string()),
1524                compression_level: None,
1525                bss: None,
1526                minichunk_size: None,
1527            },
1528        );
1529
1530        let strategy = DefaultCompressionStrategy::with_params(params);
1531
1532        // Field metadata should override params
1533        let mut metadata = HashMap::new();
1534        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1535        let mut field = create_test_field("test_column", DataType::Int32);
1536        field.metadata = metadata;
1537
1538        let data = create_fixed_width_block(32, 1000);
1539        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1540
1541        // Should use none compression (from metadata) instead of lz4 (from params)
1542        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1543    }
1544
1545    #[test]
1546    fn test_field_metadata_mixed_configuration() {
1547        // Configure type-level params
1548        let mut params = CompressionParams::new();
1549        params.types.insert(
1550            "Int32".to_string(),
1551            CompressionFieldParams {
1552                rle_threshold: Some(0.5),
1553                compression: Some("lz4".to_string()),
1554                ..Default::default()
1555            },
1556        );
1557
1558        let strategy = DefaultCompressionStrategy::with_params(params);
1559
1560        // Field metadata provides partial override
1561        let mut metadata = HashMap::new();
1562        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1563        let mut field = create_test_field("test_column", DataType::Int32);
1564        field.metadata = metadata;
1565
1566        let data = create_fixed_width_block(32, 1000);
1567        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1568
1569        // Should use lz4 (from type params) with level 3 (from metadata)
1570        let debug_str = format!("{:?}", compressor);
1571        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1572    }
1573
1574    #[test]
1575    fn test_bss_field_metadata() {
1576        let params = CompressionParams::new();
1577        let strategy = DefaultCompressionStrategy::with_params(params);
1578
1579        // Test BSS "on" mode with compression enabled (BSS requires compression to be effective)
1580        let mut metadata = HashMap::new();
1581        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1582        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1583        let arrow_field =
1584            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1585        let field = Field::try_from(&arrow_field).unwrap();
1586
1587        // Create float data
1588        let data = create_fixed_width_block(32, 100);
1589
1590        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1591        let debug_str = format!("{:?}", compressor);
1592        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1593    }
1594
1595    #[test]
1596    fn test_bss_with_compression() {
1597        let params = CompressionParams::new();
1598        let strategy = DefaultCompressionStrategy::with_params(params);
1599
1600        // Test BSS with LZ4 compression
1601        let mut metadata = HashMap::new();
1602        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1603        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1604        let arrow_field =
1605            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1606        let field = Field::try_from(&arrow_field).unwrap();
1607
1608        // Create double data
1609        let data = create_fixed_width_block(64, 100);
1610
1611        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1612        let debug_str = format!("{:?}", compressor);
1613        // Should have BSS wrapped in general compression
1614        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1615        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1616    }
1617
1618    #[test]
1619    #[cfg(any(feature = "lz4", feature = "zstd"))]
1620    fn test_general_block_decompression_fixed_width_v2_2() {
1621        // Request general compression via the write path (2.2 requirement) and ensure the read path mirrors it.
1622        let mut params = CompressionParams::new();
1623        params.columns.insert(
1624            "dict_values".to_string(),
1625            CompressionFieldParams {
1626                compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1627                ..Default::default()
1628            },
1629        );
1630
1631        let mut strategy = DefaultCompressionStrategy::with_params(params);
1632        strategy.version = LanceFileVersion::V2_2;
1633
1634        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1635        let data = create_fixed_width_block(24, 1024);
1636        let DataBlock::FixedWidth(expected_block) = &data else {
1637            panic!("expected fixed width block");
1638        };
1639        let expected_bits = expected_block.bits_per_value;
1640        let expected_num_values = expected_block.num_values;
1641        let num_values = expected_num_values;
1642
1643        let (compressor, encoding) = strategy
1644            .create_block_compressor(&field, &data)
1645            .expect("general compression should be selected");
1646        match encoding.compression.as_ref() {
1647            Some(Compression::General(_)) => {}
1648            other => panic!("expected general compression, got {:?}", other),
1649        }
1650
1651        let compressed_buffer = compressor
1652            .compress(data.clone())
1653            .expect("write path general compression should succeed");
1654
1655        let decompressor = DefaultDecompressionStrategy::default()
1656            .create_block_decompressor(&encoding)
1657            .expect("general block decompressor should be created");
1658
1659        let decoded = decompressor
1660            .decompress(compressed_buffer, num_values)
1661            .expect("decompression should succeed");
1662
1663        match decoded {
1664            DataBlock::FixedWidth(block) => {
1665                assert_eq!(block.bits_per_value, expected_bits);
1666                assert_eq!(block.num_values, expected_num_values);
1667                assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
1668            }
1669            _ => panic!("expected fixed width block"),
1670        }
1671    }
1672}