lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
19#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22    buffer::LanceBuffer,
23    compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24    constants::{
25        BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26    },
27    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28    encodings::{
29        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30        physical::{
31            binary::{
32                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33                VariableDecoder, VariableEncoder,
34            },
35            block::{
36                CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37                GeneralBlockDecompressor,
38            },
39            byte_stream_split::{
40                should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
41            },
42            constant::ConstantDecompressor,
43            fsst::{
44                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45                FsstPerValueEncoder,
46            },
47            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48            packed::{
49                PackedStructFixedWidthMiniBlockDecompressor,
50                PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51                PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52                VariablePackedStructFieldKind,
53            },
54            rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
55            value::{ValueDecompressor, ValueEncoder},
56        },
57    },
58    format::{
59        pb21::{compressive_encoding::Compression, CompressiveEncoding},
60        ProtobufUtils21,
61    },
62    statistics::{GetStat, Stat},
63    version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
68use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
69use snafu::location;
70use std::{str::FromStr, sync::Arc};
71
/// Fallback RLE selection threshold, used when a field does not configure its own.
///
/// RLE is picked when the number of runs is below this fraction of the number of values.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 1.0 / 2.0;

/// Size in bytes (32 KiB) that a block must exceed before general-purpose block
/// compression is applied automatically.
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 1 << 15;
78
79/// Trait for compression algorithms that compress an entire block of data into one opaque
80/// and self-described chunk.
81///
82/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
83///
84/// This is the most general type of compression.  There are no constraints on the method
85/// of compression it is assumed that the entire block of data will be present at decompression.
86///
87/// This is the least appropriate strategy for random access because we must load the entire
88/// block to access any single value.  This should only be used for cases where random access is never
89/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
90/// mini-block chunks)
91pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
92    /// Compress the data into a single buffer
93    ///
94    /// Also returns a description of the compression that can be used to decompress
95    /// when reading the data back
96    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
97}
98
99/// A trait to pick which compression to use for given data
100///
101/// There are several different kinds of compression.
102///
103/// - Block compression is the most generic, but most difficult to use efficiently
104/// - Per-value compression results in either a fixed width data block or a variable
105///   width data block.  In other words, there is some number of bits per value.
106///   In addition, each value should be independently decompressible.
107/// - Mini-block compression results in a small block of opaque data for chunks
108///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
109///   used for narrow data types (both fixed and variable length) where we can
110///   fit many values into an 16KiB block.
111pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
112    /// Create a block compressor for the given data
113    fn create_block_compressor(
114        &self,
115        field: &Field,
116        data: &DataBlock,
117    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;
118
119    /// Create a per-value compressor for the given data
120    fn create_per_value(
121        &self,
122        field: &Field,
123        data: &DataBlock,
124    ) -> Result<Box<dyn PerValueCompressor>>;
125
126    /// Create a mini-block compressor for the given data
127    fn create_miniblock_compressor(
128        &self,
129        field: &Field,
130        data: &DataBlock,
131    ) -> Result<Box<dyn MiniBlockCompressor>>;
132}
133
134#[derive(Debug, Default, Clone)]
135pub struct DefaultCompressionStrategy {
136    /// User-configured compression parameters
137    params: CompressionParams,
138    /// The lance file version for compatibilities.
139    version: LanceFileVersion,
140}
141
142fn try_bss_for_mini_block(
143    data: &FixedWidthDataBlock,
144    params: &CompressionFieldParams,
145) -> Option<Box<dyn MiniBlockCompressor>> {
146    // BSS requires general compression to be effective
147    // If compression is not set or explicitly disabled, skip BSS
148    if params.compression.is_none() || params.compression.as_deref() == Some("none") {
149        return None;
150    }
151
152    let mode = params.bss.unwrap_or(BssMode::Auto);
153    // should_use_bss already checks for supported bit widths (32/64)
154    if should_use_bss(data, mode) {
155        return Some(Box::new(ByteStreamSplitEncoder::new(
156            data.bits_per_value as usize,
157        )));
158    }
159    None
160}
161
162fn try_rle_for_mini_block(
163    data: &FixedWidthDataBlock,
164    params: &CompressionFieldParams,
165) -> Option<Box<dyn MiniBlockCompressor>> {
166    let bits = data.bits_per_value;
167    if !matches!(bits, 8 | 16 | 32 | 64) {
168        return None;
169    }
170
171    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
172    let threshold = params
173        .rle_threshold
174        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
175
176    if (run_count as f64) < (data.num_values as f64) * threshold {
177        return Some(Box::new(RleMiniBlockEncoder::new()));
178    }
179    None
180}
181
182fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
183    #[cfg(feature = "bitpacking")]
184    {
185        use arrow_array::cast::AsArray;
186
187        let bits = _data.bits_per_value;
188        if !matches!(bits, 8 | 16 | 32 | 64) {
189            return None;
190        }
191
192        let bit_widths = _data.expect_stat(Stat::BitWidth);
193        let widths = bit_widths.as_primitive::<UInt64Type>();
194        let too_small = widths.len() == 1
195            && InlineBitpacking::min_size_bytes(widths.value(0)) >= _data.data_size();
196
197        if !too_small {
198            return Some(Box::new(InlineBitpacking::new(bits)));
199        }
200        None
201    }
202    #[cfg(not(feature = "bitpacking"))]
203    {
204        None
205    }
206}
207
208fn try_bitpack_for_block(
209    data: &FixedWidthDataBlock,
210) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
211    let bits = data.bits_per_value;
212    if !matches!(bits, 8 | 16 | 32 | 64) {
213        return None;
214    }
215
216    let bit_widths = data.expect_stat(Stat::BitWidth);
217    let widths = bit_widths.as_primitive::<UInt64Type>();
218    let has_all_zeros = widths.values().contains(&0);
219    let max_bit_width = *widths.values().iter().max().unwrap();
220
221    let too_small =
222        widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
223
224    if has_all_zeros || too_small {
225        return None;
226    }
227
228    if data.num_values <= 1024 {
229        let compressor = Box::new(InlineBitpacking::new(bits));
230        let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
231        Some((compressor, encoding))
232    } else {
233        let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
234        let encoding = ProtobufUtils21::out_of_line_bitpacking(
235            bits,
236            ProtobufUtils21::flat(max_bit_width, None),
237        );
238        Some((compressor, encoding))
239    }
240}
241
242fn maybe_wrap_general_for_mini_block(
243    inner: Box<dyn MiniBlockCompressor>,
244    params: &CompressionFieldParams,
245) -> Result<Box<dyn MiniBlockCompressor>> {
246    match params.compression.as_deref() {
247        None | Some("none") | Some("fsst") => Ok(inner),
248        Some(raw) => {
249            let scheme = CompressionScheme::from_str(raw).map_err(|_| {
250                lance_core::Error::invalid_input(
251                    format!("Unknown compression scheme: {raw}"),
252                    location!(),
253                )
254            })?;
255            let cfg = CompressionConfig::new(scheme, params.compression_level);
256            Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
257        }
258    }
259}
260
261fn try_general_compression(
262    version: LanceFileVersion,
263    field_params: &CompressionFieldParams,
264    data: &DataBlock,
265) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
266    // User-requested compression (unused today but perhaps still used
267    // in the future someday)
268    if let Some(compression_scheme) = &field_params.compression {
269        if compression_scheme != "none" && version >= LanceFileVersion::V2_2 {
270            let scheme: CompressionScheme = compression_scheme.parse()?;
271            let config = CompressionConfig::new(scheme, field_params.compression_level);
272            let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
273            return Ok(Some((compressor, config)));
274        }
275    }
276
277    // Automatic compression for large blocks
278    if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
279        && version >= LanceFileVersion::V2_2
280    {
281        let compressor = Box::new(CompressedBufferEncoder::default());
282        let config = compressor.compressor.config();
283        return Ok(Some((compressor, config)));
284    }
285
286    Ok(None)
287}
288
289impl DefaultCompressionStrategy {
290    /// Create a new compression strategy with default behavior
291    pub fn new() -> Self {
292        Self::default()
293    }
294
295    /// Create a new compression strategy with user-configured parameters
296    pub fn with_params(params: CompressionParams) -> Self {
297        Self {
298            params,
299            version: LanceFileVersion::default(),
300        }
301    }
302
303    /// Override the file version used to make compression decisions
304    pub fn with_version(mut self, version: LanceFileVersion) -> Self {
305        self.version = version;
306        self
307    }
308
309    /// Parse compression parameters from field metadata
310    fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
311        let mut params = CompressionFieldParams::default();
312
313        // Parse compression method
314        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
315            params.compression = Some(compression.clone());
316        }
317
318        // Parse compression level
319        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
320            params.compression_level = level.parse().ok();
321        }
322
323        // Parse RLE threshold
324        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
325            params.rle_threshold = threshold.parse().ok();
326        }
327
328        // Parse BSS mode
329        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
330            match BssMode::parse(bss_str) {
331                Some(mode) => params.bss = Some(mode),
332                None => {
333                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
334                }
335            }
336        }
337
338        params
339    }
340
341    fn build_fixed_width_compressor(
342        &self,
343        params: &CompressionFieldParams,
344        data: &FixedWidthDataBlock,
345    ) -> Result<Box<dyn MiniBlockCompressor>> {
346        if params.compression.as_deref() == Some("none") {
347            return Ok(Box::new(ValueEncoder::default()));
348        }
349
350        let base = try_bss_for_mini_block(data, params)
351            .or_else(|| try_rle_for_mini_block(data, params))
352            .or_else(|| try_bitpack_for_mini_block(data))
353            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
354
355        maybe_wrap_general_for_mini_block(base, params)
356    }
357
358    /// Build compressor based on parameters for variable-width data
359    fn build_variable_width_compressor(
360        &self,
361        params: &CompressionFieldParams,
362        data: &VariableWidthBlock,
363    ) -> Result<Box<dyn MiniBlockCompressor>> {
364        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
365            return Err(Error::invalid_input(
366                format!(
367                    "Variable width compression not supported for {} bit offsets",
368                    data.bits_per_offset
369                ),
370                location!(),
371            ));
372        }
373
374        // Get statistics
375        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
376        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
377
378        // 1. Check for explicit "none" compression
379        if params.compression.as_deref() == Some("none") {
380            return Ok(Box::new(BinaryMiniBlockEncoder::default()));
381        }
382
383        // 2. Check for explicit "fsst" compression
384        if params.compression.as_deref() == Some("fsst") {
385            return Ok(Box::new(FsstMiniBlockEncoder::default()));
386        }
387
388        // 3. Choose base encoder (FSST or Binary) based on data characteristics
389        let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
390            >= FSST_LEAST_INPUT_MAX_LENGTH
391            && data_size >= FSST_LEAST_INPUT_SIZE as u64
392        {
393            Box::new(FsstMiniBlockEncoder::default())
394        } else {
395            Box::new(BinaryMiniBlockEncoder::default())
396        };
397
398        // 4. Apply general compression if configured
399        if let Some(compression_scheme) = &params.compression {
400            if compression_scheme != "none" && compression_scheme != "fsst" {
401                let scheme: CompressionScheme = compression_scheme.parse()?;
402                let config = CompressionConfig::new(scheme, params.compression_level);
403                base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
404            }
405        }
406
407        Ok(base_encoder)
408    }
409
410    /// Merge user-configured parameters with field metadata
411    /// Field metadata has highest priority
412    fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
413        let mut field_params = self
414            .params
415            .get_field_params(&field.name, &field.data_type());
416
417        // Override with field metadata if present (highest priority)
418        let metadata_params = Self::parse_field_metadata(field);
419        field_params.merge(&metadata_params);
420
421        field_params
422    }
423}
424
425impl CompressionStrategy for DefaultCompressionStrategy {
426    fn create_miniblock_compressor(
427        &self,
428        field: &Field,
429        data: &DataBlock,
430    ) -> Result<Box<dyn MiniBlockCompressor>> {
431        let field_params = self.get_merged_field_params(field);
432
433        match data {
434            DataBlock::FixedWidth(fixed_width_data) => {
435                self.build_fixed_width_compressor(&field_params, fixed_width_data)
436            }
437            DataBlock::VariableWidth(variable_width_data) => {
438                self.build_variable_width_compressor(&field_params, variable_width_data)
439            }
440            DataBlock::Struct(struct_data_block) => {
441                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
442                // just being cautious here.
443                if struct_data_block.has_variable_width_child() {
444                    return Err(Error::invalid_input(
445                        "Packed struct mini-block encoding supports only fixed-width children",
446                        location!(),
447                    ));
448                }
449                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
450            }
451            DataBlock::FixedSizeList(_) => {
452                // Ideally we would compress the list items but this creates something of a challenge.
453                // We don't want to break lists across chunks and we need to worry about inner validity
454                // layers.  If we try and use a compression scheme then it is unlikely to respect these
455                // constraints.
456                //
457                // For now, we just don't compress.  In the future, we might want to consider a more
458                // sophisticated approach.
459                Ok(Box::new(ValueEncoder::default()))
460            }
461            _ => Err(Error::NotSupported {
462                source: format!(
463                    "Mini-block compression not yet supported for block type {}",
464                    data.name()
465                )
466                .into(),
467                location: location!(),
468            }),
469        }
470    }
471
472    fn create_per_value(
473        &self,
474        field: &Field,
475        data: &DataBlock,
476    ) -> Result<Box<dyn PerValueCompressor>> {
477        let field_params = Self::parse_field_metadata(field);
478
479        match data {
480            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
481            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
482            DataBlock::Struct(struct_block) => {
483                if field.children.len() != struct_block.children.len() {
484                    return Err(Error::invalid_input(
485                        "Struct field metadata does not match data block children",
486                        location!(),
487                    ));
488                }
489                let has_variable_child = struct_block.has_variable_width_child();
490                if has_variable_child {
491                    if self.version < LanceFileVersion::V2_2 {
492                        return Err(Error::NotSupported {
493                            source: "Variable packed struct encoding requires Lance file version 2.2 or later".into(),
494                            location: location!(),
495                        });
496                    }
497                    Ok(Box::new(PackedStructVariablePerValueEncoder::new(
498                        self.clone(),
499                        field.children.clone(),
500                    )))
501                } else {
502                    Err(Error::invalid_input(
503                        "Packed struct per-value compression should not be used for fixed-width-only structs",
504                        location!(),
505                    ))
506                }
507            }
508            DataBlock::VariableWidth(variable_width) => {
509                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
510                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
511
512                // If values are very large then use block compression on a per-value basis
513                //
514                // TODO: Could maybe use median here
515
516                let per_value_requested =
517                    if let Some(compression) = field_params.compression.as_deref() {
518                        compression != "none" && compression != "fsst"
519                    } else {
520                        false
521                    };
522
523                if (max_len > 32 * 1024 || per_value_requested)
524                    && data_size >= FSST_LEAST_INPUT_SIZE as u64
525                {
526                    return Ok(Box::new(CompressedBufferEncoder::default()));
527                }
528
529                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
530                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
531                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
532
533                    let variable_compression = Box::new(VariableEncoder::default());
534
535                    // Use FSST if explicitly requested or if data characteristics warrant it
536                    if field_params.compression.as_deref() == Some("fsst")
537                        || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
538                            && data_size >= FSST_LEAST_INPUT_SIZE as u64)
539                    {
540                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
541                    } else {
542                        Ok(variable_compression)
543                    }
544                } else {
545                    panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
546                }
547            }
548            _ => unreachable!(
549                "Per-value compression not yet supported for block type: {}",
550                data.name()
551            ),
552        }
553    }
554
555    fn create_block_compressor(
556        &self,
557        field: &Field,
558        data: &DataBlock,
559    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
560        let field_params = self.get_merged_field_params(field);
561
562        match data {
563            DataBlock::FixedWidth(fixed_width) => {
564                if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
565                    return Ok((compressor, encoding));
566                }
567
568                // Try general compression (user-requested or automatic over MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION)
569                if let Some((compressor, config)) =
570                    try_general_compression(self.version, &field_params, data)?
571                {
572                    let encoding = ProtobufUtils21::wrapped(
573                        config,
574                        ProtobufUtils21::flat(fixed_width.bits_per_value, None),
575                    )?;
576                    return Ok((compressor, encoding));
577                }
578
579                let encoder = Box::new(ValueEncoder::default());
580                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
581                Ok((encoder, encoding))
582            }
583            DataBlock::VariableWidth(variable_width) => {
584                // Try general compression
585                if let Some((compressor, config)) =
586                    try_general_compression(self.version, &field_params, data)?
587                {
588                    let encoding = ProtobufUtils21::wrapped(
589                        config,
590                        ProtobufUtils21::variable(
591                            ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
592                            None,
593                        ),
594                    )?;
595                    return Ok((compressor, encoding));
596                }
597
598                let encoder = Box::new(VariableEncoder::default());
599                let encoding = ProtobufUtils21::variable(
600                    ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
601                    None,
602                );
603                Ok((encoder, encoding))
604            }
605            _ => unreachable!(),
606        }
607    }
608}
609
610pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
611    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
612}
613
614pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
615    /// Decompress one or more values
616    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
617    /// The number of bits in each value
618    ///
619    /// Currently (and probably long term) this must be a multiple of 8
620    fn bits_per_value(&self) -> u64;
621}
622
623pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
624    /// Decompress one or more values
625    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
626}
627
628pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
629    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
630}
631
632pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
633    fn create_miniblock_decompressor(
634        &self,
635        description: &CompressiveEncoding,
636        decompression_strategy: &dyn DecompressionStrategy,
637    ) -> Result<Box<dyn MiniBlockDecompressor>>;
638
639    fn create_fixed_per_value_decompressor(
640        &self,
641        description: &CompressiveEncoding,
642    ) -> Result<Box<dyn FixedPerValueDecompressor>>;
643
644    fn create_variable_per_value_decompressor(
645        &self,
646        description: &CompressiveEncoding,
647    ) -> Result<Box<dyn VariablePerValueDecompressor>>;
648
649    fn create_block_decompressor(
650        &self,
651        description: &CompressiveEncoding,
652    ) -> Result<Box<dyn BlockDecompressor>>;
653}
654
/// Stateless [`DecompressionStrategy`] that understands the built-in encodings.
#[derive(Default, Debug)]
pub struct DefaultDecompressionStrategy {}
657
658impl DecompressionStrategy for DefaultDecompressionStrategy {
659    fn create_miniblock_decompressor(
660        &self,
661        description: &CompressiveEncoding,
662        decompression_strategy: &dyn DecompressionStrategy,
663    ) -> Result<Box<dyn MiniBlockDecompressor>> {
664        match description.compression.as_ref().unwrap() {
665            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
666            #[cfg(feature = "bitpacking")]
667            Compression::InlineBitpacking(description) => {
668                Ok(Box::new(InlineBitpacking::from_description(description)))
669            }
670            #[cfg(not(feature = "bitpacking"))]
671            Compression::InlineBitpacking(_) => Err(Error::NotSupported {
672                source: "this runtime was not built with bitpacking support".into(),
673                location: location!(),
674            }),
675            Compression::Variable(variable) => {
676                let Compression::Flat(offsets) = variable
677                    .offsets
678                    .as_ref()
679                    .unwrap()
680                    .compression
681                    .as_ref()
682                    .unwrap()
683                else {
684                    panic!("Variable compression only supports flat offsets")
685                };
686                Ok(Box::new(BinaryMiniBlockDecompressor::new(
687                    offsets.bits_per_value as u8,
688                )))
689            }
690            Compression::Fsst(description) => {
691                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
692                    description.values.as_ref().unwrap(),
693                    decompression_strategy,
694                )?;
695                Ok(Box::new(FsstMiniBlockDecompressor::new(
696                    description,
697                    inner_decompressor,
698                )))
699            }
700            Compression::PackedStruct(description) => Ok(Box::new(
701                PackedStructFixedWidthMiniBlockDecompressor::new(description),
702            )),
703            Compression::VariablePackedStruct(_) => Err(Error::NotSupported {
704                source: "variable packed struct decoding is not yet implemented".into(),
705                location: location!(),
706            }),
707            Compression::FixedSizeList(fsl) => {
708                // In the future, we might need to do something more complex here if FSL supports
709                // compression.
710                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
711            }
712            Compression::Rle(rle) => {
713                let Compression::Flat(values) =
714                    rle.values.as_ref().unwrap().compression.as_ref().unwrap()
715                else {
716                    panic!("RLE compression only supports flat values")
717                };
718                let Compression::Flat(run_lengths) = rle
719                    .run_lengths
720                    .as_ref()
721                    .unwrap()
722                    .compression
723                    .as_ref()
724                    .unwrap()
725                else {
726                    panic!("RLE compression only supports flat run lengths")
727                };
728                assert_eq!(
729                    run_lengths.bits_per_value, 8,
730                    "RLE compression only supports 8-bit run lengths"
731                );
732                Ok(Box::new(RleMiniBlockDecompressor::new(
733                    values.bits_per_value,
734                )))
735            }
736            Compression::ByteStreamSplit(bss) => {
737                let Compression::Flat(values) =
738                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
739                else {
740                    panic!("ByteStreamSplit compression only supports flat values")
741                };
742                Ok(Box::new(ByteStreamSplitDecompressor::new(
743                    values.bits_per_value as usize,
744                )))
745            }
746            Compression::General(general) => {
747                // Create inner decompressor
748                let inner_decompressor = self.create_miniblock_decompressor(
749                    general.values.as_ref().ok_or_else(|| {
750                        Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
751                    })?,
752                    decompression_strategy,
753                )?;
754
755                // Parse compression config
756                let compression = general.compression.as_ref().ok_or_else(|| {
757                    Error::invalid_input("GeneralMiniBlock missing compression config", location!())
758                })?;
759
760                let scheme = compression.scheme().try_into()?;
761
762                let compression_config = crate::encodings::physical::block::CompressionConfig::new(
763                    scheme,
764                    compression.level,
765                );
766
767                Ok(Box::new(GeneralMiniBlockDecompressor::new(
768                    inner_decompressor,
769                    compression_config,
770                )))
771            }
772            _ => todo!(),
773        }
774    }
775
776    fn create_fixed_per_value_decompressor(
777        &self,
778        description: &CompressiveEncoding,
779    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
780        match description.compression.as_ref().unwrap() {
781            Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
782                constant
783                    .value
784                    .as_ref()
785                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
786            ))),
787            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
788            Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
789            _ => todo!("fixed-per-value decompressor for {:?}", description),
790        }
791    }
792
793    fn create_variable_per_value_decompressor(
794        &self,
795        description: &CompressiveEncoding,
796    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
797        match description.compression.as_ref().unwrap() {
798            Compression::Variable(variable) => {
799                let Compression::Flat(offsets) = variable
800                    .offsets
801                    .as_ref()
802                    .unwrap()
803                    .compression
804                    .as_ref()
805                    .unwrap()
806                else {
807                    panic!("Variable compression only supports flat offsets")
808                };
809                assert!(offsets.bits_per_value < u8::MAX as u64);
810                Ok(Box::new(VariableDecoder::default()))
811            }
812            Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
813                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
814                Box::new(VariableDecoder::default()),
815            ))),
816            Compression::General(ref general) => {
817                Ok(Box::new(CompressedBufferEncoder::from_scheme(
818                    general.compression.as_ref().expect_ok()?.scheme(),
819                )?))
820            }
821            Compression::VariablePackedStruct(description) => {
822                let mut fields = Vec::with_capacity(description.fields.len());
823                for field in &description.fields {
824                    let value_encoding = field.value.as_ref().ok_or_else(|| {
825                        Error::invalid_input(
826                            "VariablePackedStruct field is missing value encoding",
827                            location!(),
828                        )
829                    })?;
830                    let decoder = match field.layout.as_ref().ok_or_else(|| {
831                        Error::invalid_input(
832                            "VariablePackedStruct field is missing layout details",
833                            location!(),
834                        )
835                    })? {
836                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
837                            bits_per_value,
838                        ) => {
839                            let decompressor =
840                                self.create_fixed_per_value_decompressor(value_encoding)?;
841                            VariablePackedStructFieldDecoder {
842                                kind: VariablePackedStructFieldKind::Fixed {
843                                    bits_per_value: *bits_per_value,
844                                    decompressor: Arc::from(decompressor),
845                                },
846                            }
847                        }
848                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
849                            bits_per_length,
850                        ) => {
851                            let decompressor =
852                                self.create_variable_per_value_decompressor(value_encoding)?;
853                            VariablePackedStructFieldDecoder {
854                                kind: VariablePackedStructFieldKind::Variable {
855                                    bits_per_length: *bits_per_length,
856                                    decompressor: Arc::from(decompressor),
857                                },
858                            }
859                        }
860                    };
861                    fields.push(decoder);
862                }
863                Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
864                    fields,
865                )))
866            }
867            _ => todo!("variable-per-value decompressor for {:?}", description),
868        }
869    }
870
871    fn create_block_decompressor(
872        &self,
873        description: &CompressiveEncoding,
874    ) -> Result<Box<dyn BlockDecompressor>> {
875        match description.compression.as_ref().unwrap() {
876            Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
877                InlineBitpacking::from_description(inline_bitpacking),
878            )),
879            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
880            Compression::Constant(constant) => {
881                let scalar = constant
882                    .value
883                    .as_ref()
884                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
885                Ok(Box::new(ConstantDecompressor::new(scalar)))
886            }
887            Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
888            Compression::FixedSizeList(fsl) => {
889                Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
890            }
891            Compression::OutOfLineBitpacking(out_of_line) => {
892                // Extract the compressed bit width from the values encoding
893                let compressed_bit_width = match out_of_line
894                    .values
895                    .as_ref()
896                    .unwrap()
897                    .compression
898                    .as_ref()
899                    .unwrap()
900                {
901                    Compression::Flat(flat) => flat.bits_per_value,
902                    _ => {
903                        return Err(Error::InvalidInput {
904                            location: location!(),
905                            source: "OutOfLineBitpacking values must use Flat encoding".into(),
906                        })
907                    }
908                };
909                Ok(Box::new(OutOfLineBitpacking::new(
910                    compressed_bit_width,
911                    out_of_line.uncompressed_bits_per_value,
912                )))
913            }
914            Compression::General(general) => {
915                let inner_desc = general
916                    .values
917                    .as_ref()
918                    .ok_or_else(|| {
919                        Error::invalid_input(
920                            "General compression missing inner encoding",
921                            location!(),
922                        )
923                    })?
924                    .as_ref();
925                let inner_decompressor = self.create_block_decompressor(inner_desc)?;
926
927                let compression = general.compression.as_ref().ok_or_else(|| {
928                    Error::invalid_input(
929                        "General compression missing compression config",
930                        location!(),
931                    )
932                })?;
933                let scheme = compression.scheme().try_into()?;
934                let config = CompressionConfig::new(scheme, compression.level);
935                let general_decompressor =
936                    GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
937
938                Ok(Box::new(general_decompressor))
939            }
940            _ => todo!(),
941        }
942    }
943}
944
#[cfg(test)]
mod tests {
    // Tests for how `DefaultCompressionStrategy` turns `CompressionParams` and field metadata
    // into miniblock/block compressors, plus a general-compression block round trip.
    //
    // Most tests only check the compressor's `Debug` output for an expected encoder name,
    // so they depend on how those encoders format themselves.
    use super::*;
    use crate::buffer::LanceBuffer;
    use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
    use arrow_schema::{DataType, Field as ArrowField};
    use std::collections::HashMap;

    /// Builds a nullable Lance `Field` named `name` with the given Arrow type.
    ///
    /// The field id is overwritten with -1. NOTE(review): presumably this marks the id as
    /// unassigned; confirm against the `Field` conversion if it matters.
    fn create_test_field(name: &str, data_type: DataType) -> Field {
        let arrow_field = ArrowField::new(name, data_type, true);
        let mut field = Field::try_from(&arrow_field).unwrap();
        field.id = -1;
        field
    }

    /// Builds a fixed-width block of `num_values` values grouped into runs, then computes
    /// its statistics.
    ///
    /// Runs are formed as follows:
    /// * Consecutive groups of `values_per_run = max(num_values / run_count, 1)` values share
    ///   a run value, which advances by 17 (wrapping) for each new group.
    /// * Byte `j` of every value is `run_value + j`, so neighbouring groups always differ.
    /// * The resulting run count is the number of groups. It equals `run_count` only when
    ///   `run_count` divides `num_values`; for example, 1000 values with `run_count = 600`
    ///   gives 1 value per run, i.e. 1000 runs.
    ///
    /// `bits_per_value` is truncated to whole bytes, so it is expected to be a multiple of 8.
    ///
    /// # Panics
    ///
    /// Panics if `run_count` is 0 (integer division by zero).
    fn create_fixed_width_block_with_stats(
        bits_per_value: u64,
        num_values: u64,
        run_count: u64,
    ) -> DataBlock {
        // Create varied data to avoid low entropy
        let bytes_per_value = (bits_per_value / 8) as usize;
        let total_bytes = bytes_per_value * num_values as usize;
        let mut data = vec![0u8; total_bytes];

        // Create data with (approximately) the specified run count; see the doc comment for
        // how the integer division rounds.
        let values_per_run = (num_values / run_count).max(1);
        let mut run_value = 0u8;

        for i in 0..num_values as usize {
            if i % values_per_run as usize == 0 {
                run_value = run_value.wrapping_add(17); // Use prime to get varied values
            }
            // Fill all bytes of the value to create high entropy
            for j in 0..bytes_per_value {
                let byte_offset = i * bytes_per_value + j;
                // Always true given the loop bounds (byte_offset < num_values * bytes_per_value);
                // kept as a defensive check.
                if byte_offset < data.len() {
                    data[byte_offset] = run_value.wrapping_add(j as u8);
                }
            }
        }

        let mut block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(data),
            num_values,
            block_info: BlockInfo::default(),
        };

        // Compute all statistics including BytePositionEntropy
        use crate::statistics::ComputeStat;
        block.compute_stat();

        DataBlock::FixedWidth(block)
    }

    /// Builds a fixed-width block of `num_values` values and computes its statistics.
    ///
    /// Byte 0 of value `i` is `i % 256`; all other bytes are zero. `bits_per_value` is
    /// truncated to whole bytes.
    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
        // Create data with some variety to avoid always triggering BSS
        let bytes_per_value = (bits_per_value / 8) as usize;
        let total_bytes = bytes_per_value * num_values as usize;
        let mut data = vec![0u8; total_bytes];

        // Add some variation to the data to make it more realistic
        for i in 0..num_values as usize {
            let byte_offset = i * bytes_per_value;
            if byte_offset < data.len() {
                data[byte_offset] = (i % 256) as u8;
            }
        }

        let mut block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(data),
            num_values,
            block_info: BlockInfo::default(),
        };

        // Compute all statistics including BytePositionEntropy
        use crate::statistics::ComputeStat;
        block.compute_stat();

        DataBlock::FixedWidth(block)
    }

    /// A `*_id` column pattern with an RLE threshold, lz4 and BSS off should produce RLE
    /// wrapped in general compression for `user_id`.
    #[test]
    fn test_parameter_based_compression() {
        let mut params = CompressionParams::new();

        // Configure RLE for ID columns with BSS explicitly disabled
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.3),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("user_id", DataType::Int32);

        // Create data with low run count for RLE
        // Use create_fixed_width_block_with_stats which properly sets run count
        let data = create_fixed_width_block_with_stats(32, 1000, 100); // 100 runs out of 1000 values

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        // Should use RLE due to low threshold (0.3) and low run count (100/1000 = 0.1)
        let debug_str = format!("{:?}", compressor);

        // The compressor should be RLE wrapped in general compression
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
        assert!(debug_str.contains("RleMiniBlockEncoder"));
    }

    /// Type-level `Int32` parameters apply to a column that has no column-level entry.
    #[test]
    fn test_type_level_parameters() {
        let mut params = CompressionParams::new();

        // Configure all Int32 to use specific settings
        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.1), // Very low threshold
                compression: Some("zstd".to_string()),
                compression_level: Some(3),
                bss: Some(BssMode::Off), // Disable BSS to test RLE
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("some_column", DataType::Int32);
        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
        let data = create_fixed_width_block_with_stats(32, 1000, 50);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        // Should use RLE due to very low threshold
        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
    }

    /// A column configured with `compression = "none"` gets a plain `ValueEncoder`.
    #[test]
    fn test_none_compression() {
        let mut params = CompressionParams::new();

        // Disable compression for embeddings
        params.columns.insert(
            "embeddings".to_string(),
            CompressionFieldParams {
                compression: Some("none".to_string()),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("embeddings", DataType::Float32);
        let data = create_fixed_width_block(32, 1000);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        // Should use ValueEncoder (no compression)
        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
    }

    /// Column-level parameters override type-level ones. A field without column-level
    /// parameters gets only the type-level values.
    #[test]
    fn test_parameter_merge_priority() {
        let mut params = CompressionParams::new();

        // Set type-level
        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                ..Default::default()
            },
        );

        // Set column-level (highest priority)
        params.columns.insert(
            "user_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.2),
                compression: Some("zstd".to_string()),
                compression_level: Some(6),
                bss: None,
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        // Get merged params
        let merged = strategy
            .params
            .get_field_params("user_id", &DataType::Int32);

        // Column params should override type params
        assert_eq!(merged.rle_threshold, Some(0.2));
        assert_eq!(merged.compression, Some("zstd".to_string()));
        assert_eq!(merged.compression_level, Some(6));

        // Test field with only type params
        let merged = strategy
            .params
            .get_field_params("other_field", &DataType::Int32);
        assert_eq!(merged.rle_threshold, Some(0.5));
        assert_eq!(merged.compression, Some("lz4".to_string()));
        assert_eq!(merged.compression_level, None);
    }

    /// A trailing-wildcard column pattern (`log_*`) matches `log_messages` but not
    /// `messages_log`.
    #[test]
    fn test_pattern_matching() {
        let mut params = CompressionParams::new();

        // Configure pattern for log files
        params.columns.insert(
            "log_*".to_string(),
            CompressionFieldParams {
                compression: Some("zstd".to_string()),
                compression_level: Some(6),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        // Should match pattern
        let merged = strategy
            .params
            .get_field_params("log_messages", &DataType::Utf8);
        assert_eq!(merged.compression, Some("zstd".to_string()));
        assert_eq!(merged.compression_level, Some(6));

        // Should not match
        let merged = strategy
            .params
            .get_field_params("messages_log", &DataType::Utf8);
        assert_eq!(merged.compression, None);
    }

    /// With empty params, the `COMPRESSION_META_KEY = "none"` field metadata is honoured and
    /// yields a `ValueEncoder`.
    #[test]
    fn test_legacy_metadata_support() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        // Test field with "none" compression metadata
        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
        let mut field = create_test_field("some_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        // Should respect metadata and use ValueEncoder
        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
    }

    /// With no params or metadata and high-run-count data, the default strategy picks
    /// either a `ValueEncoder` or `InlineBitpacking`.
    #[test]
    fn test_default_behavior() {
        // Empty params should fall back to default behavior
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        let field = create_test_field("random_column", DataType::Int32);
        // Create data with high run count that won't trigger RLE. Note that
        // 1000 / 600 == 1 value per run, so every value starts a new run (~1000 runs, not 600).
        let data = create_fixed_width_block_with_stats(32, 1000, 600);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        // Should use default strategy's decision
        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
    }

    /// Compression/level field metadata (zstd, 6) results in a general miniblock compressor.
    /// Only the wrapper type is checked, not the scheme or level.
    #[test]
    fn test_field_metadata_compression() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        // Test field with compression metadata
        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        // Should use zstd with level 6
        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    }

    /// An RLE threshold supplied through field metadata (with BSS off) enables RLE.
    #[test]
    fn test_field_metadata_rle_threshold() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        // Test field with RLE threshold metadata
        let mut metadata = HashMap::new();
        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
        metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); // Disable BSS to test RLE
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        // Create data with low run count (e.g., 100 runs for 1000 values = 0.1 ratio)
        // This ensures run_count (100) < num_values * threshold (1000 * 0.8 = 800)
        let data = create_fixed_width_block_with_stats(32, 1000, 100);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        // Should use RLE because run_count (100) < num_values * threshold (800)
        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("RleMiniBlockEncoder"));
    }

    /// Field metadata (`none`) takes precedence over column-level params (`lz4`) for the
    /// same column.
    #[test]
    fn test_field_metadata_override_params() {
        // Set up params with one configuration
        let mut params = CompressionParams::new();
        params.columns.insert(
            "test_column".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.3),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: None,
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        // Field metadata should override params
        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        // Should use none compression (from metadata) instead of lz4 (from params)
        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
    }

    /// Type-level params combined with a metadata-only compression level still produce
    /// general compression. Only the wrapper type is asserted.
    #[test]
    fn test_field_metadata_mixed_configuration() {
        // Configure type-level params
        let mut params = CompressionParams::new();
        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        // Field metadata provides partial override
        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        // Should use lz4 (from type params) with level 3 (from metadata)
        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    }

    /// `BSS_META_KEY = "on"` on a Float32 field (with lz4) selects the byte-stream-split
    /// encoder.
    #[test]
    fn test_bss_field_metadata() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        // Test BSS "on" mode with compression enabled (BSS requires compression to be effective)
        let mut metadata = HashMap::new();
        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
        let arrow_field =
            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
        let field = Field::try_from(&arrow_field).unwrap();

        // Create float data
        let data = create_fixed_width_block(32, 100);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("ByteStreamSplitEncoder"));
    }

    /// On a Float64 field, BSS plus lz4 yields BSS wrapped in general compression.
    #[test]
    fn test_bss_with_compression() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        // Test BSS with LZ4 compression
        let mut metadata = HashMap::new();
        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
        let arrow_field =
            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
        let field = Field::try_from(&arrow_field).unwrap();

        // Create double data
        let data = create_fixed_width_block(64, 100);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);
        // Should have BSS wrapped in general compression
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
        assert!(debug_str.contains("ByteStreamSplitEncoder"));
    }

    /// Round trip for V2_2: the write path selects `General` block compression for a 24-bit
    /// fixed-width column, and the read-path decompressor must restore the exact bytes.
    #[test]
    #[cfg(any(feature = "lz4", feature = "zstd"))]
    fn test_general_block_decompression_fixed_width_v2_2() {
        // Request general compression via the write path (2.2 requirement) and ensure the read path mirrors it.
        let mut params = CompressionParams::new();
        params.columns.insert(
            "dict_values".to_string(),
            CompressionFieldParams {
                compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
                ..Default::default()
            },
        );

        let mut strategy = DefaultCompressionStrategy::with_params(params);
        strategy.version = LanceFileVersion::V2_2;

        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
        let data = create_fixed_width_block(24, 1024);
        let DataBlock::FixedWidth(expected_block) = &data else {
            panic!("expected fixed width block");
        };
        let expected_bits = expected_block.bits_per_value;
        let expected_num_values = expected_block.num_values;
        // Alias used for the decompress call below.
        let num_values = expected_num_values;

        let (compressor, encoding) = strategy
            .create_block_compressor(&field, &data)
            .expect("general compression should be selected");
        match encoding.compression.as_ref() {
            Some(Compression::General(_)) => {}
            other => panic!("expected general compression, got {:?}", other),
        }

        let compressed_buffer = compressor
            .compress(data.clone())
            .expect("write path general compression should succeed");

        let decompressor = DefaultDecompressionStrategy::default()
            .create_block_decompressor(&encoding)
            .expect("general block decompressor should be created");

        let decoded = decompressor
            .decompress(compressed_buffer, num_values)
            .expect("decompression should succeed");

        match decoded {
            DataBlock::FixedWidth(block) => {
                assert_eq!(block.bits_per_value, expected_bits);
                assert_eq!(block.num_values, expected_num_values);
                assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
            }
            _ => panic!("expected fixed width block"),
        }
    }
}