lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
//! Mini-block compression is a block-based approach for small (narrow) values.  Since we
//! already accept some read amplification and decompress entire blocks, the compression
//! within a block is allowed to be opaque.
//!
//! Full-zip compression is a per-value approach: values must be compressed transparently
//! (each value independently) so that individual values can still be located later.
18
19use crate::{
20    buffer::LanceBuffer,
21    compression_config::{CompressionFieldParams, CompressionParams},
22    constants::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY},
23    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
24    encodings::{
25        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
26        physical::{
27            binary::{
28                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
29                VariableDecoder, VariableEncoder,
30            },
31            bitpack::InlineBitpacking,
32            block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
33            byte_stream_split::ByteStreamSplitDecompressor,
34            constant::ConstantDecompressor,
35            fsst::{
36                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
37                FsstPerValueEncoder,
38            },
39            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
40            packed::{
41                PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
42            },
43            rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
44            value::{ValueDecompressor, ValueEncoder},
45        },
46    },
47    format::{pb, ProtobufUtils},
48    statistics::{GetStat, Stat},
49};
50use arrow::{array::AsArray, datatypes::UInt64Type};
51use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
52use lance_core::{datatypes::Field, Error, Result};
53use snafu::location;
54
/// Default threshold for RLE compression selection.
///
/// RLE is chosen for byte-aligned (8/16/32/64-bit) fixed-width data when the run count is
/// less than this fraction of the total number of values, i.e. when
/// `run_count < num_values * threshold`.  It can be overridden per field through
/// `CompressionFieldParams::rle_threshold` or the `RLE_THRESHOLD_META_KEY` field metadata.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
58
59/// Trait for compression algorithms that compress an entire block of data into one opaque
60/// and self-described chunk.
61///
62/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
63///
64/// This is the most general type of compression.  There are no constraints on the method
65/// of compression it is assumed that the entire block of data will be present at decompression.
66///
67/// This is the least appropriate strategy for random access because we must load the entire
68/// block to access any single value.  This should only be used for cases where random access is never
69/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
70/// mini-block chunks)
71pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
72    /// Compress the data into a single buffer
73    ///
74    /// Also returns a description of the compression that can be used to decompress
75    /// when reading the data back
76    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
77}
78
/// A trait to pick which compression to use for given data
///
/// There are several different kinds of compression.
///
/// - Block compression is the most generic, but most difficult to use efficiently
/// - Per-value compression results in either a fixed width data block or a variable
///   width data block.  In other words, there is some number of bits per value.
///   In addition, each value should be independently decompressible.
/// - Mini-block compression results in a small block of opaque data for chunks
///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
///   used for narrow data types (both fixed and variable length) where we can
///   fit many values into a 16KiB block.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Create a block compressor for the given data
    ///
    /// Returns the compressor together with the `pb::ArrayEncoding` that describes how the
    /// compressed buffer must be decoded when it is read back.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;

    /// Create a per-value compressor for the given data
    ///
    /// `field` carries the column name, type and metadata an implementation may use to
    /// choose a scheme; `data` is the block that will be compressed.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Create a mini-block compressor for the given data
    ///
    /// `field` carries the column name, type and metadata an implementation may use to
    /// choose a scheme; `data` is the block that will be compressed.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
113
114#[derive(Debug, Default)]
115pub struct DefaultCompressionStrategy {
116    /// User-configured compression parameters
117    params: CompressionParams,
118}
119
120impl DefaultCompressionStrategy {
121    /// Create a new compression strategy with default behavior
122    pub fn new() -> Self {
123        Self::default()
124    }
125
126    /// Create a new compression strategy with user-configured parameters
127    pub fn with_params(params: CompressionParams) -> Self {
128        Self { params }
129    }
130
131    /// Parse compression parameters from field metadata
132    fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
133        let mut params = CompressionFieldParams::default();
134
135        // Parse compression method
136        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
137            params.compression = Some(compression.clone());
138        }
139
140        // Parse compression level
141        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
142            params.compression_level = level.parse().ok();
143        }
144
145        // Parse RLE threshold
146        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
147            params.rle_threshold = threshold.parse().ok();
148        }
149
150        params
151    }
152
153    /// Build compressor based on parameters for fixed-width data
154    fn build_fixed_width_compressor(
155        &self,
156        params: &CompressionFieldParams,
157        data: &FixedWidthDataBlock,
158    ) -> Result<Box<dyn MiniBlockCompressor>> {
159        let bits_per_value = data.bits_per_value;
160        let is_byte_aligned = bits_per_value == 8
161            || bits_per_value == 16
162            || bits_per_value == 32
163            || bits_per_value == 64;
164
165        // Get statistics
166        let bit_widths = data.expect_stat(Stat::BitWidth);
167        let bit_widths = bit_widths.as_primitive::<UInt64Type>();
168        let has_all_zeros = bit_widths.values().iter().any(|v| *v == 0);
169        let too_small = bit_widths.len() == 1
170            && InlineBitpacking::min_size_bytes(bit_widths.value(0)) >= data.data_size();
171
172        // 1. Check for explicit "none" compression
173        if params.compression.as_deref() == Some("none") {
174            return Ok(Box::new(ValueEncoder::default()));
175        }
176
177        // 2. Determine base encoder
178        let mut base_encoder: Box<dyn MiniBlockCompressor> = {
179            // Check if RLE should be used
180            let rle_threshold = params
181                .rle_threshold
182                .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
183
184            let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
185            let num_values = data.num_values;
186
187            if (run_count as f64) < (num_values as f64) * rle_threshold && is_byte_aligned {
188                Box::new(RleMiniBlockEncoder::new())
189            } else if !has_all_zeros && !too_small && is_byte_aligned {
190                // Use bitpacking if appropriate
191                Box::new(InlineBitpacking::new(bits_per_value))
192            } else {
193                // Default to no compression for base layer
194                Box::new(ValueEncoder::default())
195            }
196        };
197
198        // 3. Apply general compression if configured
199        if let Some(compression_scheme) = &params.compression {
200            if compression_scheme != "none" && compression_scheme != "fsst" {
201                let scheme: CompressionScheme = compression_scheme.parse()?;
202                let config = CompressionConfig::new(scheme, params.compression_level);
203                base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
204            }
205        }
206
207        Ok(base_encoder)
208    }
209
210    /// Build compressor based on parameters for variable-width data
211    fn build_variable_width_compressor(
212        &self,
213        params: &CompressionFieldParams,
214        data: &VariableWidthBlock,
215    ) -> Result<Box<dyn MiniBlockCompressor>> {
216        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
217            return Err(Error::invalid_input(
218                format!(
219                    "Variable width compression not supported for {} bit offsets",
220                    data.bits_per_offset
221                ),
222                location!(),
223            ));
224        }
225
226        // Get statistics
227        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
228        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
229
230        // 1. Check for explicit "none" compression
231        if params.compression.as_deref() == Some("none") {
232            return Ok(Box::new(BinaryMiniBlockEncoder::default()));
233        }
234
235        // 2. Check for explicit "fsst" compression
236        if params.compression.as_deref() == Some("fsst") {
237            return Ok(Box::new(FsstMiniBlockEncoder::default()));
238        }
239
240        // 3. Choose base encoder (FSST or Binary) based on data characteristics
241        let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
242            >= FSST_LEAST_INPUT_MAX_LENGTH
243            && data_size >= FSST_LEAST_INPUT_SIZE as u64
244        {
245            Box::new(FsstMiniBlockEncoder::default())
246        } else {
247            Box::new(BinaryMiniBlockEncoder::default())
248        };
249
250        // 4. Apply general compression if configured
251        if let Some(compression_scheme) = &params.compression {
252            if compression_scheme != "none" && compression_scheme != "fsst" {
253                let scheme: CompressionScheme = compression_scheme.parse()?;
254                let config = CompressionConfig::new(scheme, params.compression_level);
255                base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
256            }
257        }
258
259        Ok(base_encoder)
260    }
261}
262
263impl CompressionStrategy for DefaultCompressionStrategy {
264    fn create_miniblock_compressor(
265        &self,
266        field: &Field,
267        data: &DataBlock,
268    ) -> Result<Box<dyn MiniBlockCompressor>> {
269        let mut field_params = self
270            .params
271            .get_field_params(&field.name, &field.data_type());
272
273        // Override with field metadata if present (highest priority)
274        let metadata_params = Self::parse_field_metadata(field);
275        field_params.merge(&metadata_params);
276
277        match data {
278            DataBlock::FixedWidth(fixed_width_data) => {
279                self.build_fixed_width_compressor(&field_params, fixed_width_data)
280            }
281            DataBlock::VariableWidth(variable_width_data) => {
282                self.build_variable_width_compressor(&field_params, variable_width_data)
283            }
284            DataBlock::Struct(struct_data_block) => {
285                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
286                // just being cautious here.
287                if struct_data_block
288                    .children
289                    .iter()
290                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
291                {
292                    panic!("packed struct encoding currently only supports fixed-width fields.")
293                }
294                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
295            }
296            DataBlock::FixedSizeList(_) => {
297                // Ideally we would compress the list items but this creates something of a challenge.
298                // We don't want to break lists across chunks and we need to worry about inner validity
299                // layers.  If we try and use a compression scheme then it is unlikely to respect these
300                // constraints.
301                //
302                // For now, we just don't compress.  In the future, we might want to consider a more
303                // sophisticated approach.
304                Ok(Box::new(ValueEncoder::default()))
305            }
306            _ => Err(Error::NotSupported {
307                source: format!(
308                    "Mini-block compression not yet supported for block type {}",
309                    data.name()
310                )
311                .into(),
312                location: location!(),
313            }),
314        }
315    }
316
317    fn create_per_value(
318        &self,
319        field: &Field,
320        data: &DataBlock,
321    ) -> Result<Box<dyn PerValueCompressor>> {
322        let field_params = Self::parse_field_metadata(field);
323
324        match data {
325            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
326            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
327            DataBlock::VariableWidth(variable_width) => {
328                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
329                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
330
331                // If values are very large then use block compression on a per-value basis
332                //
333                // TODO: Could maybe use median here
334                if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
335                    return Ok(Box::new(CompressedBufferEncoder::default()));
336                }
337
338                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
339                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
340                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
341
342                    let variable_compression = Box::new(VariableEncoder::default());
343
344                    // Use FSST if explicitly requested or if data characteristics warrant it
345                    if field_params.compression.as_deref() == Some("fsst")
346                        || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
347                            && data_size >= FSST_LEAST_INPUT_SIZE as u64)
348                    {
349                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
350                    } else {
351                        Ok(variable_compression)
352                    }
353                } else {
354                    panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
355                }
356            }
357            _ => unreachable!(
358                "Per-value compression not yet supported for block type: {}",
359                data.name()
360            ),
361        }
362    }
363
364    fn create_block_compressor(
365        &self,
366        _field: &Field,
367        data: &DataBlock,
368    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
369        // TODO: We should actually compress here!
370        match data {
371            // Currently, block compression is used for rep/def (which is fixed width) and for dictionary
372            // encoding (which could be fixed width or variable width).
373            DataBlock::FixedWidth(fixed_width) => {
374                let encoder = Box::new(ValueEncoder::default());
375                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
376                Ok((encoder, encoding))
377            }
378            DataBlock::VariableWidth(variable_width) => {
379                let encoder = Box::new(VariableEncoder::default());
380                let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
381                Ok((encoder, encoding))
382            }
383            _ => unreachable!(),
384        }
385    }
386}
387
/// Decompressor for mini-block data.
///
/// Instances are obtained from [`DecompressionStrategy::create_miniblock_decompressor`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress `data` into a block of `num_values` values.
    ///
    /// NOTE(review): `data` presumably holds the buffers of a single mini-block chunk;
    /// confirm against the mini-block reader.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
391
/// Decompressor for fixed-width per-value (full-zip) data.
///
/// Instances are obtained from [`DecompressionStrategy::create_fixed_per_value_decompressor`].
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// The number of bits in each value
    ///
    /// Currently (and probably long term) this must be a multiple of 8
    fn bits_per_value(&self) -> u64;
}
400
/// Decompressor for variable-width per-value (full-zip) data.
///
/// Instances are obtained from
/// [`DecompressionStrategy::create_variable_per_value_decompressor`].
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress one or more values
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
405
/// Decompressor for a buffer that was compressed as a single opaque block.
///
/// Instances are obtained from [`DecompressionStrategy::create_block_decompressor`].
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompress `data` into a block holding `num_values` values.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
409
/// Creates decompressors from protobuf encoding descriptions (`pb::ArrayEncoding`).
///
/// There is one factory method per kind of decompressor: mini-block, fixed-width
/// per-value, variable-width per-value, and block.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Create a decompressor for mini-block data described by `description`.
    ///
    /// `decompression_strategy` is the strategy to use for encodings nested inside
    /// `description` (the default implementation uses it for the inner binary layer of FSST).
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Create a decompressor for fixed-width per-value data described by `description`.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Create a decompressor for variable-width per-value data described by `description`.
    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Create a decompressor for a block-compressed buffer described by `description`.
    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
432
433#[derive(Debug, Default)]
434pub struct DefaultDecompressionStrategy {}
435
436impl DecompressionStrategy for DefaultDecompressionStrategy {
437    fn create_miniblock_decompressor(
438        &self,
439        description: &pb::ArrayEncoding,
440        decompression_strategy: &dyn DecompressionStrategy,
441    ) -> Result<Box<dyn MiniBlockDecompressor>> {
442        match description.array_encoding.as_ref().unwrap() {
443            pb::array_encoding::ArrayEncoding::Flat(flat) => {
444                Ok(Box::new(ValueDecompressor::from_flat(flat)))
445            }
446            pb::array_encoding::ArrayEncoding::InlineBitpacking(description) => {
447                Ok(Box::new(InlineBitpacking::from_description(description)))
448            }
449            pb::array_encoding::ArrayEncoding::Variable(variable) => Ok(Box::new(
450                BinaryMiniBlockDecompressor::new(variable.bits_per_offset as u8),
451            )),
452            pb::array_encoding::ArrayEncoding::Fsst(description) => {
453                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
454                    description.binary.as_ref().unwrap(),
455                    decompression_strategy,
456                )?;
457                Ok(Box::new(FsstMiniBlockDecompressor::new(
458                    description,
459                    inner_decompressor,
460                )))
461            }
462            pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
463                Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
464                    description,
465                )))
466            }
467            pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
468                // In the future, we might need to do something more complex here if FSL supports
469                // compression.
470                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
471            }
472            pb::array_encoding::ArrayEncoding::Rle(rle) => {
473                Ok(Box::new(RleMiniBlockDecompressor::new(rle.bits_per_value)))
474            }
475            pb::array_encoding::ArrayEncoding::ByteStreamSplit(bss) => Ok(Box::new(
476                ByteStreamSplitDecompressor::new(bss.bits_per_value as usize),
477            )),
478            pb::array_encoding::ArrayEncoding::GeneralMiniBlock(general) => {
479                // Create inner decompressor
480                let inner_decompressor = self.create_miniblock_decompressor(
481                    general.inner.as_ref().ok_or_else(|| {
482                        Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
483                    })?,
484                    decompression_strategy,
485                )?;
486
487                // Parse compression config
488                let compression = general.compression.as_ref().ok_or_else(|| {
489                    Error::invalid_input("GeneralMiniBlock missing compression config", location!())
490                })?;
491
492                let scheme = compression.scheme.parse()?;
493
494                let compression_config = crate::encodings::physical::block::CompressionConfig::new(
495                    scheme,
496                    compression.level,
497                );
498
499                Ok(Box::new(GeneralMiniBlockDecompressor::new(
500                    inner_decompressor,
501                    compression_config,
502                )))
503            }
504            _ => todo!(),
505        }
506    }
507
508    fn create_fixed_per_value_decompressor(
509        &self,
510        description: &pb::ArrayEncoding,
511    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
512        match description.array_encoding.as_ref().unwrap() {
513            pb::array_encoding::ArrayEncoding::Flat(flat) => {
514                Ok(Box::new(ValueDecompressor::from_flat(flat)))
515            }
516            pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
517                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
518            }
519            _ => todo!("fixed-per-value decompressor for {:?}", description),
520        }
521    }
522
523    fn create_variable_per_value_decompressor(
524        &self,
525        description: &pb::ArrayEncoding,
526    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
527        match *description.array_encoding.as_ref().unwrap() {
528            pb::array_encoding::ArrayEncoding::Variable(variable) => {
529                assert!(variable.bits_per_offset < u8::MAX as u32);
530                Ok(Box::new(VariableDecoder::default()))
531            }
532            pb::array_encoding::ArrayEncoding::Fsst(ref fsst) => {
533                Ok(Box::new(FsstPerValueDecompressor::new(
534                    LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
535                    Box::new(VariableDecoder::default()),
536                )))
537            }
538            pb::array_encoding::ArrayEncoding::Block(ref block) => Ok(Box::new(
539                CompressedBufferEncoder::from_scheme(&block.scheme)?,
540            )),
541            _ => todo!("variable-per-value decompressor for {:?}", description),
542        }
543    }
544
545    fn create_block_decompressor(
546        &self,
547        description: &pb::ArrayEncoding,
548    ) -> Result<Box<dyn BlockDecompressor>> {
549        match description.array_encoding.as_ref().unwrap() {
550            pb::array_encoding::ArrayEncoding::Flat(flat) => {
551                Ok(Box::new(ValueDecompressor::from_flat(flat)))
552            }
553            pb::array_encoding::ArrayEncoding::Constant(constant) => {
554                let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
555                Ok(Box::new(ConstantDecompressor::new(scalar)))
556            }
557            pb::array_encoding::ArrayEncoding::Variable(_) => {
558                Ok(Box::new(BinaryBlockDecompressor::default()))
559            }
560            _ => todo!(),
561        }
562    }
563}
564
565#[cfg(test)]
566mod tests {
567    use super::*;
568    use crate::buffer::LanceBuffer;
569    use crate::data::{BlockInfo, DataBlock};
570    use arrow::datatypes::{DataType, Field as ArrowField};
571    use std::collections::HashMap;
572
573    fn create_test_field(name: &str, data_type: DataType) -> Field {
574        let arrow_field = ArrowField::new(name, data_type, true);
575        let mut field = Field::try_from(&arrow_field).unwrap();
576        field.id = -1;
577        field
578    }
579
580    fn create_fixed_width_block_with_stats(
581        bits_per_value: u64,
582        num_values: u64,
583        run_count: u64,
584    ) -> DataBlock {
585        let block = FixedWidthDataBlock {
586            bits_per_value,
587            data: LanceBuffer::reinterpret_vec(vec![
588                0u8;
589                (bits_per_value * num_values / 8) as usize
590            ]),
591            num_values,
592            block_info: BlockInfo::default(),
593        };
594
595        // Add required statistics
596        use crate::statistics::Stat;
597        use arrow::array::{ArrayRef, UInt64Array};
598        use std::sync::Arc;
599
600        let bit_widths = Arc::new(UInt64Array::from(vec![bits_per_value])) as ArrayRef;
601        let run_count_stat = Arc::new(UInt64Array::from(vec![run_count])) as ArrayRef;
602
603        block
604            .block_info
605            .0
606            .write()
607            .unwrap()
608            .insert(Stat::BitWidth, bit_widths);
609        block
610            .block_info
611            .0
612            .write()
613            .unwrap()
614            .insert(Stat::RunCount, run_count_stat);
615
616        DataBlock::FixedWidth(block)
617    }
618
619    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
620        // Default run_count is num_values / 4
621        create_fixed_width_block_with_stats(bits_per_value, num_values, num_values / 4)
622    }
623
624    #[test]
625    fn test_parameter_based_compression() {
626        let mut params = CompressionParams::new();
627
628        // Configure RLE for ID columns
629        params.columns.insert(
630            "*_id".to_string(),
631            CompressionFieldParams {
632                rle_threshold: Some(0.3),
633                compression: Some("lz4".to_string()),
634                compression_level: None,
635            },
636        );
637
638        let strategy = DefaultCompressionStrategy::with_params(params);
639        let field = create_test_field("user_id", DataType::Int32);
640        let data = create_fixed_width_block(32, 1000);
641
642        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
643        // Should use RLE due to low threshold
644        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
645    }
646
647    #[test]
648    fn test_type_level_parameters() {
649        let mut params = CompressionParams::new();
650
651        // Configure all Int32 to use specific settings
652        params.types.insert(
653            "Int32".to_string(),
654            CompressionFieldParams {
655                rle_threshold: Some(0.1), // Very low threshold
656                compression: Some("zstd".to_string()),
657                compression_level: Some(3),
658            },
659        );
660
661        let strategy = DefaultCompressionStrategy::with_params(params);
662        let field = create_test_field("some_column", DataType::Int32);
663        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
664        let data = create_fixed_width_block_with_stats(32, 1000, 50);
665
666        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
667        // Should use RLE due to very low threshold
668        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
669    }
670
671    #[test]
672    fn test_none_compression() {
673        let mut params = CompressionParams::new();
674
675        // Disable compression for embeddings
676        params.columns.insert(
677            "embeddings".to_string(),
678            CompressionFieldParams {
679                compression: Some("none".to_string()),
680                ..Default::default()
681            },
682        );
683
684        let strategy = DefaultCompressionStrategy::with_params(params);
685        let field = create_test_field("embeddings", DataType::Float32);
686        let data = create_fixed_width_block(32, 1000);
687
688        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
689        // Should use ValueEncoder (no compression)
690        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
691    }
692
693    #[test]
694    fn test_parameter_merge_priority() {
695        let mut params = CompressionParams::new();
696
697        // Set type-level
698        params.types.insert(
699            "Int32".to_string(),
700            CompressionFieldParams {
701                rle_threshold: Some(0.5),
702                compression: Some("lz4".to_string()),
703                ..Default::default()
704            },
705        );
706
707        // Set column-level (highest priority)
708        params.columns.insert(
709            "user_id".to_string(),
710            CompressionFieldParams {
711                rle_threshold: Some(0.2),
712                compression: Some("zstd".to_string()),
713                compression_level: Some(6),
714            },
715        );
716
717        let strategy = DefaultCompressionStrategy::with_params(params);
718
719        // Get merged params
720        let merged = strategy
721            .params
722            .get_field_params("user_id", &DataType::Int32);
723
724        // Column params should override type params
725        assert_eq!(merged.rle_threshold, Some(0.2));
726        assert_eq!(merged.compression, Some("zstd".to_string()));
727        assert_eq!(merged.compression_level, Some(6));
728
729        // Test field with only type params
730        let merged = strategy
731            .params
732            .get_field_params("other_field", &DataType::Int32);
733        assert_eq!(merged.rle_threshold, Some(0.5));
734        assert_eq!(merged.compression, Some("lz4".to_string()));
735        assert_eq!(merged.compression_level, None);
736    }
737
738    #[test]
739    fn test_pattern_matching() {
740        let mut params = CompressionParams::new();
741
742        // Configure pattern for log files
743        params.columns.insert(
744            "log_*".to_string(),
745            CompressionFieldParams {
746                compression: Some("zstd".to_string()),
747                compression_level: Some(6),
748                ..Default::default()
749            },
750        );
751
752        let strategy = DefaultCompressionStrategy::with_params(params);
753
754        // Should match pattern
755        let merged = strategy
756            .params
757            .get_field_params("log_messages", &DataType::Utf8);
758        assert_eq!(merged.compression, Some("zstd".to_string()));
759        assert_eq!(merged.compression_level, Some(6));
760
761        // Should not match
762        let merged = strategy
763            .params
764            .get_field_params("messages_log", &DataType::Utf8);
765        assert_eq!(merged.compression, None);
766    }
767
768    #[test]
769    fn test_legacy_metadata_support() {
770        let params = CompressionParams::new();
771        let strategy = DefaultCompressionStrategy::with_params(params);
772
773        // Test field with "none" compression metadata
774        let mut metadata = HashMap::new();
775        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
776        let mut field = create_test_field("some_column", DataType::Int32);
777        field.metadata = metadata;
778
779        let data = create_fixed_width_block(32, 1000);
780        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
781
782        // Should respect metadata and use ValueEncoder
783        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
784    }
785
786    #[test]
787    fn test_default_behavior() {
788        // Empty params should fall back to default behavior
789        let params = CompressionParams::new();
790        let strategy = DefaultCompressionStrategy::with_params(params);
791
792        let field = create_test_field("random_column", DataType::Int32);
793        // Create data with high run count that won't trigger RLE (600 runs for 1000 values = 0.6 ratio)
794        let data = create_fixed_width_block_with_stats(32, 1000, 600);
795
796        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
797        // Should use default strategy's decision
798        let debug_str = format!("{:?}", compressor);
799        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
800    }
801
802    #[test]
803    fn test_field_metadata_compression() {
804        let params = CompressionParams::new();
805        let strategy = DefaultCompressionStrategy::with_params(params);
806
807        // Test field with compression metadata
808        let mut metadata = HashMap::new();
809        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
810        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
811        let mut field = create_test_field("test_column", DataType::Int32);
812        field.metadata = metadata;
813
814        let data = create_fixed_width_block(32, 1000);
815        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
816
817        // Should use zstd with level 6
818        let debug_str = format!("{:?}", compressor);
819        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
820    }
821
822    #[test]
823    fn test_field_metadata_rle_threshold() {
824        let params = CompressionParams::new();
825        let strategy = DefaultCompressionStrategy::with_params(params);
826
827        // Test field with RLE threshold metadata
828        let mut metadata = HashMap::new();
829        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
830        let mut field = create_test_field("test_column", DataType::Int32);
831        field.metadata = metadata;
832
833        // Create data with 0.7 run ratio (700 runs for 1000 values)
834        let data = create_fixed_width_block_with_stats(32, 1000, 700);
835        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
836
837        // Should use RLE because 0.7 < 0.8
838        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
839    }
840
841    #[test]
842    fn test_field_metadata_override_params() {
843        // Set up params with one configuration
844        let mut params = CompressionParams::new();
845        params.columns.insert(
846            "test_column".to_string(),
847            CompressionFieldParams {
848                rle_threshold: Some(0.3),
849                compression: Some("lz4".to_string()),
850                compression_level: None,
851            },
852        );
853
854        let strategy = DefaultCompressionStrategy::with_params(params);
855
856        // Field metadata should override params
857        let mut metadata = HashMap::new();
858        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
859        let mut field = create_test_field("test_column", DataType::Int32);
860        field.metadata = metadata;
861
862        let data = create_fixed_width_block(32, 1000);
863        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
864
865        // Should use none compression (from metadata) instead of lz4 (from params)
866        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
867    }
868
869    #[test]
870    fn test_field_metadata_mixed_configuration() {
871        // Configure type-level params
872        let mut params = CompressionParams::new();
873        params.types.insert(
874            "Int32".to_string(),
875            CompressionFieldParams {
876                rle_threshold: Some(0.5),
877                compression: Some("lz4".to_string()),
878                ..Default::default()
879            },
880        );
881
882        let strategy = DefaultCompressionStrategy::with_params(params);
883
884        // Field metadata provides partial override
885        let mut metadata = HashMap::new();
886        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
887        let mut field = create_test_field("test_column", DataType::Int32);
888        field.metadata = metadata;
889
890        let data = create_fixed_width_block(32, 1000);
891        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
892
893        // Should use lz4 (from type params) with level 3 (from metadata)
894        let debug_str = format!("{:?}", compressor);
895        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
896    }
897}