lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
19#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22    buffer::LanceBuffer,
23    compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24    constants::{
25        BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26    },
27    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28    encodings::{
29        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30        physical::{
31            binary::{
32                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33                VariableDecoder, VariableEncoder,
34            },
35            block::{
36                CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37                GeneralBlockDecompressor,
38            },
39            byte_stream_split::{
40                should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
41            },
42            constant::ConstantDecompressor,
43            fsst::{
44                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45                FsstPerValueEncoder,
46            },
47            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48            packed::{
49                PackedStructFixedWidthMiniBlockDecompressor,
50                PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51                PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52                VariablePackedStructFieldKind,
53            },
54            rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
55            value::{ValueDecompressor, ValueEncoder},
56        },
57    },
58    format::{
59        pb21::{compressive_encoding::Compression, CompressiveEncoding},
60        ProtobufUtils21,
61    },
62    statistics::{GetStat, Stat},
63    version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
68use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
69use snafu::location;
70use std::{str::FromStr, sync::Arc};
71
/// Default threshold for RLE compression selection.
///
/// RLE is chosen for fixed-width mini-blocks when the run count is strictly less than this
/// fraction of the total number of values.  A per-field `rle_threshold` parameter, when set,
/// takes precedence over this default.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
75
/// Size threshold (32 KiB) for automatic general block compression.
///
/// Blocks whose data size is strictly greater than this value are eligible for automatic
/// general compression (see `try_general_compression`).
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024;
78
/// Trait for compression algorithms that compress an entire block of data into one opaque
/// and self-described chunk.
///
/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
///
/// This is the most general type of compression.  There are no constraints on the method
/// of compression; it is assumed that the entire block of data will be present at decompression.
///
/// This is the least appropriate strategy for random access because we must load the entire
/// block to access any single value.  This should only be used for cases where random access is never
/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
/// mini-block chunks)
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compress the data into a single buffer
    ///
    /// Only the compressed buffer is returned.  The description needed to decompress the
    /// data when reading it back is produced separately, alongside the compressor, by
    /// [`CompressionStrategy::create_block_compressor`].
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
98
/// A trait to pick which compression to use for given data
///
/// There are several different kinds of compression.
///
/// - Block compression is the most generic, but most difficult to use efficiently
/// - Per-value compression results in either a fixed width data block or a variable
///   width data block.  In other words, there is some number of bits per value.
///   In addition, each value should be independently decompressible.
/// - Mini-block compression results in a small block of opaque data for chunks
///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
///   used for narrow data types (both fixed and variable length) where we can
///   fit many values into a 16KiB block.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Create a block compressor for the given data
    ///
    /// Returns the compressor together with the [`CompressiveEncoding`] that describes
    /// the chosen compression.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Create a per-value compressor for the given data
    ///
    /// Unlike [`Self::create_block_compressor`] no encoding description is returned here.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Create a mini-block compressor for the given data
    ///
    /// Unlike [`Self::create_block_compressor`] no encoding description is returned here.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
133
/// The default [`CompressionStrategy`].
///
/// Compressors are picked from user-configured parameters, per-field metadata, data block
/// statistics, and the target file version.
#[derive(Debug, Default, Clone)]
pub struct DefaultCompressionStrategy {
    /// User-configured compression parameters
    params: CompressionParams,
    /// The Lance file version, used to gate features that older versions do not support.
    version: LanceFileVersion,
}
141
142fn try_bss_for_mini_block(
143    data: &FixedWidthDataBlock,
144    params: &CompressionFieldParams,
145) -> Option<Box<dyn MiniBlockCompressor>> {
146    // BSS requires general compression to be effective
147    // If compression is not set or explicitly disabled, skip BSS
148    if params.compression.is_none() || params.compression.as_deref() == Some("none") {
149        return None;
150    }
151
152    let mode = params.bss.unwrap_or(BssMode::Auto);
153    // should_use_bss already checks for supported bit widths (32/64)
154    if should_use_bss(data, mode) {
155        return Some(Box::new(ByteStreamSplitEncoder::new(
156            data.bits_per_value as usize,
157        )));
158    }
159    None
160}
161
162fn try_rle_for_mini_block(
163    data: &FixedWidthDataBlock,
164    params: &CompressionFieldParams,
165) -> Option<Box<dyn MiniBlockCompressor>> {
166    let bits = data.bits_per_value;
167    if !matches!(bits, 8 | 16 | 32 | 64) {
168        return None;
169    }
170
171    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
172    let threshold = params
173        .rle_threshold
174        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
175
176    if (run_count as f64) < (data.num_values as f64) * threshold {
177        return Some(Box::new(RleMiniBlockEncoder::new()));
178    }
179    None
180}
181
182fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
183    #[cfg(feature = "bitpacking")]
184    {
185        use arrow_array::cast::AsArray;
186
187        let bits = _data.bits_per_value;
188        if !matches!(bits, 8 | 16 | 32 | 64) {
189            return None;
190        }
191
192        let bit_widths = _data.expect_stat(Stat::BitWidth);
193        let widths = bit_widths.as_primitive::<UInt64Type>();
194        let too_small = widths.len() == 1
195            && InlineBitpacking::min_size_bytes(widths.value(0)) >= _data.data_size();
196
197        if !too_small {
198            return Some(Box::new(InlineBitpacking::new(bits)));
199        }
200        None
201    }
202    #[cfg(not(feature = "bitpacking"))]
203    {
204        None
205    }
206}
207
208fn try_bitpack_for_block(
209    data: &FixedWidthDataBlock,
210) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
211    let bits = data.bits_per_value;
212    if !matches!(bits, 8 | 16 | 32 | 64) {
213        return None;
214    }
215
216    let bit_widths = data.expect_stat(Stat::BitWidth);
217    let widths = bit_widths.as_primitive::<UInt64Type>();
218    let has_all_zeros = widths.values().contains(&0);
219    let max_bit_width = *widths.values().iter().max().unwrap();
220
221    let too_small =
222        widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
223
224    if has_all_zeros || too_small {
225        return None;
226    }
227
228    if data.num_values <= 1024 {
229        let compressor = Box::new(InlineBitpacking::new(bits));
230        let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
231        Some((compressor, encoding))
232    } else {
233        let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
234        let encoding = ProtobufUtils21::out_of_line_bitpacking(
235            bits,
236            ProtobufUtils21::flat(max_bit_width, None),
237        );
238        Some((compressor, encoding))
239    }
240}
241
242fn maybe_wrap_general_for_mini_block(
243    inner: Box<dyn MiniBlockCompressor>,
244    params: &CompressionFieldParams,
245) -> Result<Box<dyn MiniBlockCompressor>> {
246    match params.compression.as_deref() {
247        None | Some("none") | Some("fsst") => Ok(inner),
248        Some(raw) => {
249            let scheme = CompressionScheme::from_str(raw).map_err(|_| {
250                lance_core::Error::invalid_input(
251                    format!("Unknown compression scheme: {raw}"),
252                    location!(),
253                )
254            })?;
255            let cfg = CompressionConfig::new(scheme, params.compression_level);
256            Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
257        }
258    }
259}
260
261fn try_general_compression(
262    version: LanceFileVersion,
263    field_params: &CompressionFieldParams,
264    data: &DataBlock,
265) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
266    // User-requested compression (unused today but perhaps still used
267    // in the future someday)
268    if let Some(compression_scheme) = &field_params.compression {
269        if compression_scheme != "none" && version >= LanceFileVersion::V2_2 {
270            let scheme: CompressionScheme = compression_scheme.parse()?;
271            let config = CompressionConfig::new(scheme, field_params.compression_level);
272            let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
273            return Ok(Some((compressor, config)));
274        }
275    }
276
277    // Automatic compression for large blocks
278    if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
279        && version >= LanceFileVersion::V2_2
280    {
281        let compressor = Box::new(CompressedBufferEncoder::default());
282        let config = compressor.compressor.config();
283        return Ok(Some((compressor, config)));
284    }
285
286    Ok(None)
287}
288
289impl DefaultCompressionStrategy {
290    /// Create a new compression strategy with default behavior
291    pub fn new() -> Self {
292        Self::default()
293    }
294
295    /// Create a new compression strategy with user-configured parameters
296    pub fn with_params(params: CompressionParams) -> Self {
297        Self {
298            params,
299            version: LanceFileVersion::default(),
300        }
301    }
302
303    /// Override the file version used to make compression decisions
304    pub fn with_version(mut self, version: LanceFileVersion) -> Self {
305        self.version = version;
306        self
307    }
308
309    /// Parse compression parameters from field metadata
310    fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
311        let mut params = CompressionFieldParams::default();
312
313        // Parse compression method
314        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
315            params.compression = Some(compression.clone());
316        }
317
318        // Parse compression level
319        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
320            params.compression_level = level.parse().ok();
321        }
322
323        // Parse RLE threshold
324        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
325            params.rle_threshold = threshold.parse().ok();
326        }
327
328        // Parse BSS mode
329        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
330            match BssMode::parse(bss_str) {
331                Some(mode) => params.bss = Some(mode),
332                None => {
333                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
334                }
335            }
336        }
337
338        params
339    }
340
341    fn build_fixed_width_compressor(
342        &self,
343        params: &CompressionFieldParams,
344        data: &FixedWidthDataBlock,
345    ) -> Result<Box<dyn MiniBlockCompressor>> {
346        if params.compression.as_deref() == Some("none") {
347            return Ok(Box::new(ValueEncoder::default()));
348        }
349
350        let base = try_bss_for_mini_block(data, params)
351            .or_else(|| try_rle_for_mini_block(data, params))
352            .or_else(|| try_bitpack_for_mini_block(data))
353            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
354
355        maybe_wrap_general_for_mini_block(base, params)
356    }
357
358    /// Build compressor based on parameters for variable-width data
359    fn build_variable_width_compressor(
360        &self,
361        params: &CompressionFieldParams,
362        data: &VariableWidthBlock,
363    ) -> Result<Box<dyn MiniBlockCompressor>> {
364        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
365            return Err(Error::invalid_input(
366                format!(
367                    "Variable width compression not supported for {} bit offsets",
368                    data.bits_per_offset
369                ),
370                location!(),
371            ));
372        }
373
374        // Get statistics
375        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
376        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
377
378        // 1. Check for explicit "none" compression
379        if params.compression.as_deref() == Some("none") {
380            return Ok(Box::new(BinaryMiniBlockEncoder::default()));
381        }
382
383        // 2. Check for explicit "fsst" compression
384        if params.compression.as_deref() == Some("fsst") {
385            return Ok(Box::new(FsstMiniBlockEncoder::default()));
386        }
387
388        // 3. Choose base encoder (FSST or Binary) based on data characteristics
389        let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
390            >= FSST_LEAST_INPUT_MAX_LENGTH
391            && data_size >= FSST_LEAST_INPUT_SIZE as u64
392        {
393            Box::new(FsstMiniBlockEncoder::default())
394        } else {
395            Box::new(BinaryMiniBlockEncoder::default())
396        };
397
398        // 4. Apply general compression if configured
399        if let Some(compression_scheme) = &params.compression {
400            if compression_scheme != "none" && compression_scheme != "fsst" {
401                let scheme: CompressionScheme = compression_scheme.parse()?;
402                let config = CompressionConfig::new(scheme, params.compression_level);
403                base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
404            }
405        }
406
407        Ok(base_encoder)
408    }
409
410    /// Merge user-configured parameters with field metadata
411    /// Field metadata has highest priority
412    fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
413        let mut field_params = self
414            .params
415            .get_field_params(&field.name, &field.data_type());
416
417        // Override with field metadata if present (highest priority)
418        let metadata_params = Self::parse_field_metadata(field);
419        field_params.merge(&metadata_params);
420
421        field_params
422    }
423}
424
425impl CompressionStrategy for DefaultCompressionStrategy {
426    fn create_miniblock_compressor(
427        &self,
428        field: &Field,
429        data: &DataBlock,
430    ) -> Result<Box<dyn MiniBlockCompressor>> {
431        let field_params = self.get_merged_field_params(field);
432
433        match data {
434            DataBlock::FixedWidth(fixed_width_data) => {
435                self.build_fixed_width_compressor(&field_params, fixed_width_data)
436            }
437            DataBlock::VariableWidth(variable_width_data) => {
438                self.build_variable_width_compressor(&field_params, variable_width_data)
439            }
440            DataBlock::Struct(struct_data_block) => {
441                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
442                // just being cautious here.
443                if struct_data_block.has_variable_width_child() {
444                    return Err(Error::invalid_input(
445                        "Packed struct mini-block encoding supports only fixed-width children",
446                        location!(),
447                    ));
448                }
449                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
450            }
451            DataBlock::FixedSizeList(_) => {
452                // Ideally we would compress the list items but this creates something of a challenge.
453                // We don't want to break lists across chunks and we need to worry about inner validity
454                // layers.  If we try and use a compression scheme then it is unlikely to respect these
455                // constraints.
456                //
457                // For now, we just don't compress.  In the future, we might want to consider a more
458                // sophisticated approach.
459                Ok(Box::new(ValueEncoder::default()))
460            }
461            _ => Err(Error::NotSupported {
462                source: format!(
463                    "Mini-block compression not yet supported for block type {}",
464                    data.name()
465                )
466                .into(),
467                location: location!(),
468            }),
469        }
470    }
471
472    fn create_per_value(
473        &self,
474        field: &Field,
475        data: &DataBlock,
476    ) -> Result<Box<dyn PerValueCompressor>> {
477        let field_params = self.get_merged_field_params(field);
478
479        match data {
480            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
481            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
482            DataBlock::Struct(struct_block) => {
483                if field.children.len() != struct_block.children.len() {
484                    return Err(Error::invalid_input(
485                        "Struct field metadata does not match data block children",
486                        location!(),
487                    ));
488                }
489                let has_variable_child = struct_block.has_variable_width_child();
490                if has_variable_child {
491                    if self.version < LanceFileVersion::V2_2 {
492                        return Err(Error::NotSupported {
493                            source: "Variable packed struct encoding requires Lance file version 2.2 or later".into(),
494                            location: location!(),
495                        });
496                    }
497                    Ok(Box::new(PackedStructVariablePerValueEncoder::new(
498                        self.clone(),
499                        field.children.clone(),
500                    )))
501                } else {
502                    Err(Error::invalid_input(
503                        "Packed struct per-value compression should not be used for fixed-width-only structs",
504                        location!(),
505                    ))
506                }
507            }
508            DataBlock::VariableWidth(variable_width) => {
509                // Check for explicit "none" compression
510                if field_params.compression.as_deref() == Some("none") {
511                    return Ok(Box::new(VariableEncoder::default()));
512                }
513
514                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
515                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
516
517                // If values are very large then use block compression on a per-value basis
518                //
519                // TODO: Could maybe use median here
520
521                let per_value_requested =
522                    if let Some(compression) = field_params.compression.as_deref() {
523                        compression != "fsst"
524                    } else {
525                        false
526                    };
527
528                if (max_len > 32 * 1024 || per_value_requested)
529                    && data_size >= FSST_LEAST_INPUT_SIZE as u64
530                {
531                    return Ok(Box::new(CompressedBufferEncoder::default()));
532                }
533
534                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
535                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
536                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
537
538                    let variable_compression = Box::new(VariableEncoder::default());
539
540                    // Use FSST if explicitly requested or if data characteristics warrant it
541                    if field_params.compression.as_deref() == Some("fsst")
542                        || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
543                            && data_size >= FSST_LEAST_INPUT_SIZE as u64)
544                    {
545                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
546                    } else {
547                        Ok(variable_compression)
548                    }
549                } else {
550                    panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
551                }
552            }
553            _ => unreachable!(
554                "Per-value compression not yet supported for block type: {}",
555                data.name()
556            ),
557        }
558    }
559
560    fn create_block_compressor(
561        &self,
562        field: &Field,
563        data: &DataBlock,
564    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
565        let field_params = self.get_merged_field_params(field);
566
567        match data {
568            DataBlock::FixedWidth(fixed_width) => {
569                if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
570                    return Ok((compressor, encoding));
571                }
572
573                // Try general compression (user-requested or automatic over MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION)
574                if let Some((compressor, config)) =
575                    try_general_compression(self.version, &field_params, data)?
576                {
577                    let encoding = ProtobufUtils21::wrapped(
578                        config,
579                        ProtobufUtils21::flat(fixed_width.bits_per_value, None),
580                    )?;
581                    return Ok((compressor, encoding));
582                }
583
584                let encoder = Box::new(ValueEncoder::default());
585                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
586                Ok((encoder, encoding))
587            }
588            DataBlock::VariableWidth(variable_width) => {
589                // Try general compression
590                if let Some((compressor, config)) =
591                    try_general_compression(self.version, &field_params, data)?
592                {
593                    let encoding = ProtobufUtils21::wrapped(
594                        config,
595                        ProtobufUtils21::variable(
596                            ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
597                            None,
598                        ),
599                    )?;
600                    return Ok((compressor, encoding));
601                }
602
603                let encoder = Box::new(VariableEncoder::default());
604                let encoding = ProtobufUtils21::variable(
605                    ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
606                    None,
607                );
608                Ok((encoder, encoding))
609            }
610            _ => unreachable!(),
611        }
612    }
613}
614
/// Trait for decompressing data that was compressed with a mini-block compressor
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress the given buffers into a data block
    ///
    /// `num_values` is presumably the number of values held by the buffers — confirm against
    /// implementations.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
618
/// Trait for decompressing per-value compressed data whose compressed form is fixed-width
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// The number of bits in each value
    ///
    /// Currently (and probably long term) this must be a multiple of 8
    fn bits_per_value(&self) -> u64;
}
627
/// Trait for decompressing per-value compressed data whose compressed form is variable-width
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
632
/// Trait for decompressing data that was compressed as a single buffer (the counterpart of
/// [`BlockCompressor`])
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress a single buffer into a data block
    ///
    /// `num_values` is presumably the number of values held by the buffer — confirm against
    /// implementations.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
636
/// A trait to create decompressors from an encoding description
///
/// Each method builds the decompressor for one kind of compression (mini-block, fixed-width
/// per-value, variable-width per-value, or block) from a [`CompressiveEncoding`].
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Create a mini-block decompressor for the given encoding description
    ///
    /// `decompression_strategy` is the strategy available for building decompressors of nested
    /// encodings (the default implementation uses it for the values of an FSST encoding).
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Create a fixed-width per-value decompressor for the given encoding description
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Create a variable-width per-value decompressor for the given encoding description
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Create a block decompressor for the given encoding description
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
659
/// The default [`DecompressionStrategy`]
///
/// This type carries no state; decompressors are chosen purely from the encoding description.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
662
663impl DecompressionStrategy for DefaultDecompressionStrategy {
664    fn create_miniblock_decompressor(
665        &self,
666        description: &CompressiveEncoding,
667        decompression_strategy: &dyn DecompressionStrategy,
668    ) -> Result<Box<dyn MiniBlockDecompressor>> {
669        match description.compression.as_ref().unwrap() {
670            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
671            #[cfg(feature = "bitpacking")]
672            Compression::InlineBitpacking(description) => {
673                Ok(Box::new(InlineBitpacking::from_description(description)))
674            }
675            #[cfg(not(feature = "bitpacking"))]
676            Compression::InlineBitpacking(_) => Err(Error::NotSupported {
677                source: "this runtime was not built with bitpacking support".into(),
678                location: location!(),
679            }),
680            Compression::Variable(variable) => {
681                let Compression::Flat(offsets) = variable
682                    .offsets
683                    .as_ref()
684                    .unwrap()
685                    .compression
686                    .as_ref()
687                    .unwrap()
688                else {
689                    panic!("Variable compression only supports flat offsets")
690                };
691                Ok(Box::new(BinaryMiniBlockDecompressor::new(
692                    offsets.bits_per_value as u8,
693                )))
694            }
695            Compression::Fsst(description) => {
696                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
697                    description.values.as_ref().unwrap(),
698                    decompression_strategy,
699                )?;
700                Ok(Box::new(FsstMiniBlockDecompressor::new(
701                    description,
702                    inner_decompressor,
703                )))
704            }
705            Compression::PackedStruct(description) => Ok(Box::new(
706                PackedStructFixedWidthMiniBlockDecompressor::new(description),
707            )),
708            Compression::VariablePackedStruct(_) => Err(Error::NotSupported {
709                source: "variable packed struct decoding is not yet implemented".into(),
710                location: location!(),
711            }),
712            Compression::FixedSizeList(fsl) => {
713                // In the future, we might need to do something more complex here if FSL supports
714                // compression.
715                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
716            }
717            Compression::Rle(rle) => {
718                let Compression::Flat(values) =
719                    rle.values.as_ref().unwrap().compression.as_ref().unwrap()
720                else {
721                    panic!("RLE compression only supports flat values")
722                };
723                let Compression::Flat(run_lengths) = rle
724                    .run_lengths
725                    .as_ref()
726                    .unwrap()
727                    .compression
728                    .as_ref()
729                    .unwrap()
730                else {
731                    panic!("RLE compression only supports flat run lengths")
732                };
733                assert_eq!(
734                    run_lengths.bits_per_value, 8,
735                    "RLE compression only supports 8-bit run lengths"
736                );
737                Ok(Box::new(RleMiniBlockDecompressor::new(
738                    values.bits_per_value,
739                )))
740            }
741            Compression::ByteStreamSplit(bss) => {
742                let Compression::Flat(values) =
743                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
744                else {
745                    panic!("ByteStreamSplit compression only supports flat values")
746                };
747                Ok(Box::new(ByteStreamSplitDecompressor::new(
748                    values.bits_per_value as usize,
749                )))
750            }
751            Compression::General(general) => {
752                // Create inner decompressor
753                let inner_decompressor = self.create_miniblock_decompressor(
754                    general.values.as_ref().ok_or_else(|| {
755                        Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
756                    })?,
757                    decompression_strategy,
758                )?;
759
760                // Parse compression config
761                let compression = general.compression.as_ref().ok_or_else(|| {
762                    Error::invalid_input("GeneralMiniBlock missing compression config", location!())
763                })?;
764
765                let scheme = compression.scheme().try_into()?;
766
767                let compression_config = crate::encodings::physical::block::CompressionConfig::new(
768                    scheme,
769                    compression.level,
770                );
771
772                Ok(Box::new(GeneralMiniBlockDecompressor::new(
773                    inner_decompressor,
774                    compression_config,
775                )))
776            }
777            _ => todo!(),
778        }
779    }
780
781    fn create_fixed_per_value_decompressor(
782        &self,
783        description: &CompressiveEncoding,
784    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
785        match description.compression.as_ref().unwrap() {
786            Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
787                constant
788                    .value
789                    .as_ref()
790                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
791            ))),
792            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
793            Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
794            _ => todo!("fixed-per-value decompressor for {:?}", description),
795        }
796    }
797
798    fn create_variable_per_value_decompressor(
799        &self,
800        description: &CompressiveEncoding,
801    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
802        match description.compression.as_ref().unwrap() {
803            Compression::Variable(variable) => {
804                let Compression::Flat(offsets) = variable
805                    .offsets
806                    .as_ref()
807                    .unwrap()
808                    .compression
809                    .as_ref()
810                    .unwrap()
811                else {
812                    panic!("Variable compression only supports flat offsets")
813                };
814                assert!(offsets.bits_per_value < u8::MAX as u64);
815                Ok(Box::new(VariableDecoder::default()))
816            }
817            Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
818                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
819                Box::new(VariableDecoder::default()),
820            ))),
821            Compression::General(ref general) => {
822                Ok(Box::new(CompressedBufferEncoder::from_scheme(
823                    general.compression.as_ref().expect_ok()?.scheme(),
824                )?))
825            }
826            Compression::VariablePackedStruct(description) => {
827                let mut fields = Vec::with_capacity(description.fields.len());
828                for field in &description.fields {
829                    let value_encoding = field.value.as_ref().ok_or_else(|| {
830                        Error::invalid_input(
831                            "VariablePackedStruct field is missing value encoding",
832                            location!(),
833                        )
834                    })?;
835                    let decoder = match field.layout.as_ref().ok_or_else(|| {
836                        Error::invalid_input(
837                            "VariablePackedStruct field is missing layout details",
838                            location!(),
839                        )
840                    })? {
841                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
842                            bits_per_value,
843                        ) => {
844                            let decompressor =
845                                self.create_fixed_per_value_decompressor(value_encoding)?;
846                            VariablePackedStructFieldDecoder {
847                                kind: VariablePackedStructFieldKind::Fixed {
848                                    bits_per_value: *bits_per_value,
849                                    decompressor: Arc::from(decompressor),
850                                },
851                            }
852                        }
853                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
854                            bits_per_length,
855                        ) => {
856                            let decompressor =
857                                self.create_variable_per_value_decompressor(value_encoding)?;
858                            VariablePackedStructFieldDecoder {
859                                kind: VariablePackedStructFieldKind::Variable {
860                                    bits_per_length: *bits_per_length,
861                                    decompressor: Arc::from(decompressor),
862                                },
863                            }
864                        }
865                    };
866                    fields.push(decoder);
867                }
868                Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
869                    fields,
870                )))
871            }
872            _ => todo!("variable-per-value decompressor for {:?}", description),
873        }
874    }
875
876    fn create_block_decompressor(
877        &self,
878        description: &CompressiveEncoding,
879    ) -> Result<Box<dyn BlockDecompressor>> {
880        match description.compression.as_ref().unwrap() {
881            Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
882                InlineBitpacking::from_description(inline_bitpacking),
883            )),
884            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
885            Compression::Constant(constant) => {
886                let scalar = constant
887                    .value
888                    .as_ref()
889                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
890                Ok(Box::new(ConstantDecompressor::new(scalar)))
891            }
892            Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
893            Compression::FixedSizeList(fsl) => {
894                Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
895            }
896            Compression::OutOfLineBitpacking(out_of_line) => {
897                // Extract the compressed bit width from the values encoding
898                let compressed_bit_width = match out_of_line
899                    .values
900                    .as_ref()
901                    .unwrap()
902                    .compression
903                    .as_ref()
904                    .unwrap()
905                {
906                    Compression::Flat(flat) => flat.bits_per_value,
907                    _ => {
908                        return Err(Error::InvalidInput {
909                            location: location!(),
910                            source: "OutOfLineBitpacking values must use Flat encoding".into(),
911                        })
912                    }
913                };
914                Ok(Box::new(OutOfLineBitpacking::new(
915                    compressed_bit_width,
916                    out_of_line.uncompressed_bits_per_value,
917                )))
918            }
919            Compression::General(general) => {
920                let inner_desc = general
921                    .values
922                    .as_ref()
923                    .ok_or_else(|| {
924                        Error::invalid_input(
925                            "General compression missing inner encoding",
926                            location!(),
927                        )
928                    })?
929                    .as_ref();
930                let inner_decompressor = self.create_block_decompressor(inner_desc)?;
931
932                let compression = general.compression.as_ref().ok_or_else(|| {
933                    Error::invalid_input(
934                        "General compression missing compression config",
935                        location!(),
936                    )
937                })?;
938                let scheme = compression.scheme().try_into()?;
939                let config = CompressionConfig::new(scheme, compression.level);
940                let general_decompressor =
941                    GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
942
943                Ok(Box::new(general_decompressor))
944            }
945            _ => todo!(),
946        }
947    }
948}
949
950#[cfg(test)]
951mod tests {
952    use super::*;
953    use crate::buffer::LanceBuffer;
954    use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
955    use crate::testing::extract_array_encoding_chain;
956    use arrow_schema::{DataType, Field as ArrowField};
957    use std::collections::HashMap;
958
959    fn create_test_field(name: &str, data_type: DataType) -> Field {
960        let arrow_field = ArrowField::new(name, data_type, true);
961        let mut field = Field::try_from(&arrow_field).unwrap();
962        field.id = -1;
963        field
964    }
965
966    fn create_fixed_width_block_with_stats(
967        bits_per_value: u64,
968        num_values: u64,
969        run_count: u64,
970    ) -> DataBlock {
971        // Create varied data to avoid low entropy
972        let bytes_per_value = (bits_per_value / 8) as usize;
973        let total_bytes = bytes_per_value * num_values as usize;
974        let mut data = vec![0u8; total_bytes];
975
976        // Create data with specified run count
977        let values_per_run = (num_values / run_count).max(1);
978        let mut run_value = 0u8;
979
980        for i in 0..num_values as usize {
981            if i % values_per_run as usize == 0 {
982                run_value = run_value.wrapping_add(17); // Use prime to get varied values
983            }
984            // Fill all bytes of the value to create high entropy
985            for j in 0..bytes_per_value {
986                let byte_offset = i * bytes_per_value + j;
987                if byte_offset < data.len() {
988                    data[byte_offset] = run_value.wrapping_add(j as u8);
989                }
990            }
991        }
992
993        let mut block = FixedWidthDataBlock {
994            bits_per_value,
995            data: LanceBuffer::reinterpret_vec(data),
996            num_values,
997            block_info: BlockInfo::default(),
998        };
999
1000        // Compute all statistics including BytePositionEntropy
1001        use crate::statistics::ComputeStat;
1002        block.compute_stat();
1003
1004        DataBlock::FixedWidth(block)
1005    }
1006
1007    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
1008        // Create data with some variety to avoid always triggering BSS
1009        let bytes_per_value = (bits_per_value / 8) as usize;
1010        let total_bytes = bytes_per_value * num_values as usize;
1011        let mut data = vec![0u8; total_bytes];
1012
1013        // Add some variation to the data to make it more realistic
1014        for i in 0..num_values as usize {
1015            let byte_offset = i * bytes_per_value;
1016            if byte_offset < data.len() {
1017                data[byte_offset] = (i % 256) as u8;
1018            }
1019        }
1020
1021        let mut block = FixedWidthDataBlock {
1022            bits_per_value,
1023            data: LanceBuffer::reinterpret_vec(data),
1024            num_values,
1025            block_info: BlockInfo::default(),
1026        };
1027
1028        // Compute all statistics including BytePositionEntropy
1029        use crate::statistics::ComputeStat;
1030        block.compute_stat();
1031
1032        DataBlock::FixedWidth(block)
1033    }
1034
1035    fn create_variable_width_block(
1036        bits_per_offset: u8,
1037        num_values: u64,
1038        avg_value_size: usize,
1039    ) -> DataBlock {
1040        use crate::statistics::ComputeStat;
1041
1042        // Create offsets buffer (num_values + 1 offsets)
1043        let mut offsets = Vec::with_capacity((num_values + 1) as usize);
1044        let mut current_offset = 0i64;
1045        offsets.push(current_offset);
1046
1047        // Generate offsets with varying value sizes
1048        for i in 0..num_values {
1049            let value_size = if avg_value_size == 0 {
1050                1
1051            } else {
1052                ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
1053                    .min(avg_value_size * 2)
1054            };
1055            current_offset += value_size as i64;
1056            offsets.push(current_offset);
1057        }
1058
1059        // Create data buffer with realistic content
1060        let total_data_size = current_offset as usize;
1061        let mut data = vec![0u8; total_data_size];
1062
1063        // Fill data with varied content
1064        for i in 0..num_values {
1065            let start_offset = offsets[i as usize] as usize;
1066            let end_offset = offsets[(i + 1) as usize] as usize;
1067
1068            let content = (i % 256) as u8;
1069            for j in 0..end_offset - start_offset {
1070                data[start_offset + j] = content.wrapping_add(j as u8);
1071            }
1072        }
1073
1074        // Convert offsets to appropriate lance buffer
1075        let offsets_buffer = match bits_per_offset {
1076            32 => {
1077                let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
1078                LanceBuffer::reinterpret_vec(offsets_32)
1079            }
1080            64 => LanceBuffer::reinterpret_vec(offsets),
1081            _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
1082        };
1083
1084        let mut block = VariableWidthBlock {
1085            data: LanceBuffer::from(data),
1086            offsets: offsets_buffer,
1087            bits_per_offset,
1088            num_values,
1089            block_info: BlockInfo::default(),
1090        };
1091
1092        block.compute_stat();
1093        DataBlock::VariableWidth(block)
1094    }
1095
1096    #[test]
1097    fn test_parameter_based_compression() {
1098        let mut params = CompressionParams::new();
1099
1100        // Configure RLE for ID columns with BSS explicitly disabled
1101        params.columns.insert(
1102            "*_id".to_string(),
1103            CompressionFieldParams {
1104                rle_threshold: Some(0.3),
1105                compression: Some("lz4".to_string()),
1106                compression_level: None,
1107                bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE
1108            },
1109        );
1110
1111        let strategy = DefaultCompressionStrategy::with_params(params);
1112        let field = create_test_field("user_id", DataType::Int32);
1113
1114        // Create data with low run count for RLE
1115        // Use create_fixed_width_block_with_stats which properly sets run count
1116        let data = create_fixed_width_block_with_stats(32, 1000, 100); // 100 runs out of 1000 values
1117
1118        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1119        // Should use RLE due to low threshold (0.3) and low run count (100/1000 = 0.1)
1120        let debug_str = format!("{:?}", compressor);
1121
1122        // The compressor should be RLE wrapped in general compression
1123        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1124        assert!(debug_str.contains("RleMiniBlockEncoder"));
1125    }
1126
1127    #[test]
1128    fn test_type_level_parameters() {
1129        let mut params = CompressionParams::new();
1130
1131        // Configure all Int32 to use specific settings
1132        params.types.insert(
1133            "Int32".to_string(),
1134            CompressionFieldParams {
1135                rle_threshold: Some(0.1), // Very low threshold
1136                compression: Some("zstd".to_string()),
1137                compression_level: Some(3),
1138                bss: Some(BssMode::Off), // Disable BSS to test RLE
1139            },
1140        );
1141
1142        let strategy = DefaultCompressionStrategy::with_params(params);
1143        let field = create_test_field("some_column", DataType::Int32);
1144        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
1145        let data = create_fixed_width_block_with_stats(32, 1000, 50);
1146
1147        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1148        // Should use RLE due to very low threshold
1149        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
1150    }
1151
1152    fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
1153        let chain = extract_array_encoding_chain(encoding);
1154        if variable {
1155            assert_eq!(chain.len(), 2);
1156            assert_eq!(chain.first().unwrap().as_str(), "variable");
1157            assert_eq!(chain.get(1).unwrap().as_str(), "flat");
1158        } else {
1159            assert_eq!(chain.len(), 1);
1160            assert_eq!(chain.first().unwrap().as_str(), "flat");
1161        }
1162    }
1163
1164    #[test]
1165    fn test_none_compression() {
1166        let mut params = CompressionParams::new();
1167
1168        // Disable compression for embeddings
1169        params.columns.insert(
1170            "embeddings".to_string(),
1171            CompressionFieldParams {
1172                compression: Some("none".to_string()),
1173                ..Default::default()
1174            },
1175        );
1176
1177        let strategy = DefaultCompressionStrategy::with_params(params);
1178        let field = create_test_field("embeddings", DataType::Float32);
1179        let fixed_data = create_fixed_width_block(32, 1000);
1180        let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1181
1182        // Test miniblock
1183        let compressor = strategy
1184            .create_miniblock_compressor(&field, &fixed_data)
1185            .unwrap();
1186        let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1187        check_uncompressed_encoding(&encoding, false);
1188        let compressor = strategy
1189            .create_miniblock_compressor(&field, &variable_data)
1190            .unwrap();
1191        let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1192        check_uncompressed_encoding(&encoding, true);
1193
1194        // Test pervalue
1195        let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1196        let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1197        check_uncompressed_encoding(&encoding, false);
1198        let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1199        let (_block, encoding) = compressor.compress(variable_data).unwrap();
1200        check_uncompressed_encoding(&encoding, true);
1201    }
1202
1203    #[test]
1204    fn test_field_metadata_none_compression() {
1205        // Prepare field with metadata for none compression
1206        let mut arrow_field = ArrowField::new("simple_col", DataType::Binary, true);
1207        let mut metadata = HashMap::new();
1208        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1209        arrow_field = arrow_field.with_metadata(metadata);
1210        let field = Field::try_from(&arrow_field).unwrap();
1211
1212        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
1213
1214        // Test miniblock
1215        let fixed_data = create_fixed_width_block(32, 1000);
1216        let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1217
1218        let compressor = strategy
1219            .create_miniblock_compressor(&field, &fixed_data)
1220            .unwrap();
1221        let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1222        check_uncompressed_encoding(&encoding, false);
1223
1224        let compressor = strategy
1225            .create_miniblock_compressor(&field, &variable_data)
1226            .unwrap();
1227        let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1228        check_uncompressed_encoding(&encoding, true);
1229
1230        // Test pervalue
1231        let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1232        let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1233        check_uncompressed_encoding(&encoding, false);
1234
1235        let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1236        let (_block, encoding) = compressor.compress(variable_data).unwrap();
1237        check_uncompressed_encoding(&encoding, true);
1238    }
1239
1240    #[test]
1241    fn test_parameter_merge_priority() {
1242        let mut params = CompressionParams::new();
1243
1244        // Set type-level
1245        params.types.insert(
1246            "Int32".to_string(),
1247            CompressionFieldParams {
1248                rle_threshold: Some(0.5),
1249                compression: Some("lz4".to_string()),
1250                ..Default::default()
1251            },
1252        );
1253
1254        // Set column-level (highest priority)
1255        params.columns.insert(
1256            "user_id".to_string(),
1257            CompressionFieldParams {
1258                rle_threshold: Some(0.2),
1259                compression: Some("zstd".to_string()),
1260                compression_level: Some(6),
1261                bss: None,
1262            },
1263        );
1264
1265        let strategy = DefaultCompressionStrategy::with_params(params);
1266
1267        // Get merged params
1268        let merged = strategy
1269            .params
1270            .get_field_params("user_id", &DataType::Int32);
1271
1272        // Column params should override type params
1273        assert_eq!(merged.rle_threshold, Some(0.2));
1274        assert_eq!(merged.compression, Some("zstd".to_string()));
1275        assert_eq!(merged.compression_level, Some(6));
1276
1277        // Test field with only type params
1278        let merged = strategy
1279            .params
1280            .get_field_params("other_field", &DataType::Int32);
1281        assert_eq!(merged.rle_threshold, Some(0.5));
1282        assert_eq!(merged.compression, Some("lz4".to_string()));
1283        assert_eq!(merged.compression_level, None);
1284    }
1285
1286    #[test]
1287    fn test_pattern_matching() {
1288        let mut params = CompressionParams::new();
1289
1290        // Configure pattern for log files
1291        params.columns.insert(
1292            "log_*".to_string(),
1293            CompressionFieldParams {
1294                compression: Some("zstd".to_string()),
1295                compression_level: Some(6),
1296                ..Default::default()
1297            },
1298        );
1299
1300        let strategy = DefaultCompressionStrategy::with_params(params);
1301
1302        // Should match pattern
1303        let merged = strategy
1304            .params
1305            .get_field_params("log_messages", &DataType::Utf8);
1306        assert_eq!(merged.compression, Some("zstd".to_string()));
1307        assert_eq!(merged.compression_level, Some(6));
1308
1309        // Should not match
1310        let merged = strategy
1311            .params
1312            .get_field_params("messages_log", &DataType::Utf8);
1313        assert_eq!(merged.compression, None);
1314    }
1315
1316    #[test]
1317    fn test_legacy_metadata_support() {
1318        let params = CompressionParams::new();
1319        let strategy = DefaultCompressionStrategy::with_params(params);
1320
1321        // Test field with "none" compression metadata
1322        let mut metadata = HashMap::new();
1323        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1324        let mut field = create_test_field("some_column", DataType::Int32);
1325        field.metadata = metadata;
1326
1327        let data = create_fixed_width_block(32, 1000);
1328        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1329
1330        // Should respect metadata and use ValueEncoder
1331        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1332    }
1333
1334    #[test]
1335    fn test_default_behavior() {
1336        // Empty params should fall back to default behavior
1337        let params = CompressionParams::new();
1338        let strategy = DefaultCompressionStrategy::with_params(params);
1339
1340        let field = create_test_field("random_column", DataType::Int32);
1341        // Create data with high run count that won't trigger RLE (600 runs for 1000 values = 0.6 ratio)
1342        let data = create_fixed_width_block_with_stats(32, 1000, 600);
1343
1344        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1345        // Should use default strategy's decision
1346        let debug_str = format!("{:?}", compressor);
1347        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
1348    }
1349
1350    #[test]
1351    fn test_field_metadata_compression() {
1352        let params = CompressionParams::new();
1353        let strategy = DefaultCompressionStrategy::with_params(params);
1354
1355        // Test field with compression metadata
1356        let mut metadata = HashMap::new();
1357        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
1358        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
1359        let mut field = create_test_field("test_column", DataType::Int32);
1360        field.metadata = metadata;
1361
1362        let data = create_fixed_width_block(32, 1000);
1363        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1364
1365        // Should use zstd with level 6
1366        let debug_str = format!("{:?}", compressor);
1367        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1368    }
1369
1370    #[test]
1371    fn test_field_metadata_rle_threshold() {
1372        let params = CompressionParams::new();
1373        let strategy = DefaultCompressionStrategy::with_params(params);
1374
1375        // Test field with RLE threshold metadata
1376        let mut metadata = HashMap::new();
1377        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
1378        metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); // Disable BSS to test RLE
1379        let mut field = create_test_field("test_column", DataType::Int32);
1380        field.metadata = metadata;
1381
1382        // Create data with low run count (e.g., 100 runs for 1000 values = 0.1 ratio)
1383        // This ensures run_count (100) < num_values * threshold (1000 * 0.8 = 800)
1384        let data = create_fixed_width_block_with_stats(32, 1000, 100);
1385
1386        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1387
1388        // Should use RLE because run_count (100) < num_values * threshold (800)
1389        let debug_str = format!("{:?}", compressor);
1390        assert!(debug_str.contains("RleMiniBlockEncoder"));
1391    }
1392
1393    #[test]
1394    fn test_field_metadata_override_params() {
1395        // Set up params with one configuration
1396        let mut params = CompressionParams::new();
1397        params.columns.insert(
1398            "test_column".to_string(),
1399            CompressionFieldParams {
1400                rle_threshold: Some(0.3),
1401                compression: Some("lz4".to_string()),
1402                compression_level: None,
1403                bss: None,
1404            },
1405        );
1406
1407        let strategy = DefaultCompressionStrategy::with_params(params);
1408
1409        // Field metadata should override params
1410        let mut metadata = HashMap::new();
1411        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1412        let mut field = create_test_field("test_column", DataType::Int32);
1413        field.metadata = metadata;
1414
1415        let data = create_fixed_width_block(32, 1000);
1416        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1417
1418        // Should use none compression (from metadata) instead of lz4 (from params)
1419        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1420    }
1421
1422    #[test]
1423    fn test_field_metadata_mixed_configuration() {
1424        // Configure type-level params
1425        let mut params = CompressionParams::new();
1426        params.types.insert(
1427            "Int32".to_string(),
1428            CompressionFieldParams {
1429                rle_threshold: Some(0.5),
1430                compression: Some("lz4".to_string()),
1431                ..Default::default()
1432            },
1433        );
1434
1435        let strategy = DefaultCompressionStrategy::with_params(params);
1436
1437        // Field metadata provides partial override
1438        let mut metadata = HashMap::new();
1439        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1440        let mut field = create_test_field("test_column", DataType::Int32);
1441        field.metadata = metadata;
1442
1443        let data = create_fixed_width_block(32, 1000);
1444        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1445
1446        // Should use lz4 (from type params) with level 3 (from metadata)
1447        let debug_str = format!("{:?}", compressor);
1448        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1449    }
1450
1451    #[test]
1452    fn test_bss_field_metadata() {
1453        let params = CompressionParams::new();
1454        let strategy = DefaultCompressionStrategy::with_params(params);
1455
1456        // Test BSS "on" mode with compression enabled (BSS requires compression to be effective)
1457        let mut metadata = HashMap::new();
1458        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1459        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1460        let arrow_field =
1461            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1462        let field = Field::try_from(&arrow_field).unwrap();
1463
1464        // Create float data
1465        let data = create_fixed_width_block(32, 100);
1466
1467        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1468        let debug_str = format!("{:?}", compressor);
1469        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1470    }
1471
1472    #[test]
1473    fn test_bss_with_compression() {
1474        let params = CompressionParams::new();
1475        let strategy = DefaultCompressionStrategy::with_params(params);
1476
1477        // Test BSS with LZ4 compression
1478        let mut metadata = HashMap::new();
1479        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1480        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1481        let arrow_field =
1482            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1483        let field = Field::try_from(&arrow_field).unwrap();
1484
1485        // Create double data
1486        let data = create_fixed_width_block(64, 100);
1487
1488        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1489        let debug_str = format!("{:?}", compressor);
1490        // Should have BSS wrapped in general compression
1491        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1492        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1493    }
1494
1495    #[test]
1496    #[cfg(any(feature = "lz4", feature = "zstd"))]
1497    fn test_general_block_decompression_fixed_width_v2_2() {
1498        // Request general compression via the write path (2.2 requirement) and ensure the read path mirrors it.
1499        let mut params = CompressionParams::new();
1500        params.columns.insert(
1501            "dict_values".to_string(),
1502            CompressionFieldParams {
1503                compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1504                ..Default::default()
1505            },
1506        );
1507
1508        let mut strategy = DefaultCompressionStrategy::with_params(params);
1509        strategy.version = LanceFileVersion::V2_2;
1510
1511        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1512        let data = create_fixed_width_block(24, 1024);
1513        let DataBlock::FixedWidth(expected_block) = &data else {
1514            panic!("expected fixed width block");
1515        };
1516        let expected_bits = expected_block.bits_per_value;
1517        let expected_num_values = expected_block.num_values;
1518        let num_values = expected_num_values;
1519
1520        let (compressor, encoding) = strategy
1521            .create_block_compressor(&field, &data)
1522            .expect("general compression should be selected");
1523        match encoding.compression.as_ref() {
1524            Some(Compression::General(_)) => {}
1525            other => panic!("expected general compression, got {:?}", other),
1526        }
1527
1528        let compressed_buffer = compressor
1529            .compress(data.clone())
1530            .expect("write path general compression should succeed");
1531
1532        let decompressor = DefaultDecompressionStrategy::default()
1533            .create_block_decompressor(&encoding)
1534            .expect("general block decompressor should be created");
1535
1536        let decoded = decompressor
1537            .decompress(compressed_buffer, num_values)
1538            .expect("decompression should succeed");
1539
1540        match decoded {
1541            DataBlock::FixedWidth(block) => {
1542                assert_eq!(block.bits_per_value, expected_bits);
1543                assert_eq!(block.num_values, expected_num_values);
1544                assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
1545            }
1546            _ => panic!("expected fixed width block"),
1547        }
1548    }
1549}