Skip to main content

lance_encoding/
compression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Compression traits and definitions for Lance 2.1
5//!
6//! In 2.1 the first step of encoding is structural encoding, where we shred inputs into
7//! leaf arrays and take care of the validity / offsets structure.  Then we pick a structural
8//! encoding (mini-block or full-zip) and then we compress the data.
9//!
10//! This module defines the traits for the compression step.  Each structural encoding has its
11//! own compression strategy.
12//!
13//! Miniblock compression is a block based approach for small data.  Since we introduce some read
14//! amplification and decompress entire blocks we are able to use opaque compression.
15//!
16//! Fullzip compression is a per-value approach where we require that values are transparently
17//! compressed so that we can locate them later.
18
19#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22    buffer::LanceBuffer,
23    compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24    constants::{
25        BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26    },
27    data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28    encodings::{
29        logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30        physical::{
31            binary::{
32                BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33                VariableDecoder, VariableEncoder,
34            },
35            block::{
36                CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37                GeneralBlockDecompressor,
38            },
39            byte_stream_split::{
40                ByteStreamSplitDecompressor, ByteStreamSplitEncoder, should_use_bss,
41            },
42            constant::ConstantDecompressor,
43            fsst::{
44                FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45                FsstPerValueEncoder,
46            },
47            general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48            packed::{
49                PackedStructFixedWidthMiniBlockDecompressor,
50                PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51                PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52                VariablePackedStructFieldKind,
53            },
54            rle::{RleDecompressor, RleEncoder},
55            value::{ValueDecompressor, ValueEncoder},
56        },
57    },
58    format::{
59        ProtobufUtils21,
60        pb21::{CompressiveEncoding, compressive_encoding::Compression},
61    },
62    statistics::{GetStat, Stat},
63    version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use arrow_schema::DataType;
68use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
69use lance_core::{Error, Result, datatypes::Field, error::LanceOptionExt};
70use std::{str::FromStr, sync::Arc};
71
/// Run-count ratio used for RLE selection when no `rle_threshold` is configured.
///
/// Block-level selection picks RLE when the run count is below
/// `num_values * threshold` and falls back to this value if the user gave none.
/// Mini-block selection only applies a run-ratio guard when the user supplies a
/// threshold; without one it relies on an encoded-size estimate instead.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;

/// Blocks larger than this many bytes (32 KiB) qualify for automatic general
/// block compression.
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024;
81
82/// Trait for compression algorithms that compress an entire block of data into one opaque
83/// and self-described chunk.
84///
85/// This is actually a _third_ compression strategy used in a few corner cases today (TODO: remove?)
86///
87/// This is the most general type of compression.  There are no constraints on the method
88/// of compression it is assumed that the entire block of data will be present at decompression.
89///
90/// This is the least appropriate strategy for random access because we must load the entire
91/// block to access any single value.  This should only be used for cases where random access is never
92/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
93/// mini-block chunks)
94pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
95    /// Compress the data into a single buffer
96    ///
97    /// Also returns a description of the compression that can be used to decompress
98    /// when reading the data back
99    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
100}
101
102/// A trait to pick which compression to use for given data
103///
104/// There are several different kinds of compression.
105///
106/// - Block compression is the most generic, but most difficult to use efficiently
107/// - Per-value compression results in either a fixed width data block or a variable
108///   width data block.  In other words, there is some number of bits per value.
109///   In addition, each value should be independently decompressible.
110/// - Mini-block compression results in a small block of opaque data for chunks
111///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
112///   used for narrow data types (both fixed and variable length) where we can
113///   fit many values into an 16KiB block.
114pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
115    /// Create a block compressor for the given data
116    fn create_block_compressor(
117        &self,
118        field: &Field,
119        data: &DataBlock,
120    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;
121
122    /// Create a per-value compressor for the given data
123    fn create_per_value(
124        &self,
125        field: &Field,
126        data: &DataBlock,
127    ) -> Result<Box<dyn PerValueCompressor>>;
128
129    /// Create a mini-block compressor for the given data
130    fn create_miniblock_compressor(
131        &self,
132        field: &Field,
133        data: &DataBlock,
134    ) -> Result<Box<dyn MiniBlockCompressor>>;
135}
136
137#[derive(Debug, Default, Clone)]
138pub struct DefaultCompressionStrategy {
139    /// User-configured compression parameters
140    params: CompressionParams,
141    /// The lance file version for compatibilities.
142    version: LanceFileVersion,
143}
144
145fn try_bss_for_mini_block(
146    data: &FixedWidthDataBlock,
147    params: &CompressionFieldParams,
148) -> Option<Box<dyn MiniBlockCompressor>> {
149    // BSS requires general compression to be effective
150    // If compression is not set or explicitly disabled, skip BSS
151    if params.compression.is_none() || params.compression.as_deref() == Some("none") {
152        return None;
153    }
154
155    let mode = params.bss.unwrap_or(BssMode::Auto);
156    // should_use_bss already checks for supported bit widths (32/64)
157    if should_use_bss(data, mode) {
158        return Some(Box::new(ByteStreamSplitEncoder::new(
159            data.bits_per_value as usize,
160        )));
161    }
162    None
163}
164
165fn try_rle_for_mini_block(
166    data: &FixedWidthDataBlock,
167    params: &CompressionFieldParams,
168) -> Option<Box<dyn MiniBlockCompressor>> {
169    let bits = data.bits_per_value;
170    if !matches!(bits, 8 | 16 | 32 | 64) {
171        return None;
172    }
173
174    let type_size = bits / 8;
175    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
176    let threshold = params
177        .rle_threshold
178        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
179
180    // If the user explicitly provided a threshold then honor it as an additional guard.
181    // A lower threshold makes RLE harder to trigger and can be used to avoid CPU overhead.
182    let passes_threshold = match params.rle_threshold {
183        Some(_) => (run_count as f64) < (data.num_values as f64) * threshold,
184        None => true,
185    };
186
187    if !passes_threshold {
188        return None;
189    }
190
191    // Estimate the encoded size.
192    //
193    // RLE stores (value, run_length) pairs. Run lengths are u8 and long runs are split into
194    // multiple entries of up to 255 values. We don't know the run length distribution here,
195    // so we conservatively account for splitting with an upper bound.
196    let num_values = data.num_values;
197    let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values);
198
199    let raw_bytes = (num_values as u128) * (type_size as u128);
200    let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);
201
202    if rle_bytes < raw_bytes {
203        #[cfg(feature = "bitpacking")]
204        {
205            if let Some(bitpack_bytes) = estimate_inline_bitpacking_bytes(data)
206                && (bitpack_bytes as u128) < rle_bytes
207            {
208                return None;
209            }
210        }
211        return Some(Box::new(RleEncoder::new()));
212    }
213    None
214}
215
216fn try_rle_for_block(
217    data: &FixedWidthDataBlock,
218    version: LanceFileVersion,
219    params: &CompressionFieldParams,
220) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
221    if version < LanceFileVersion::V2_2 {
222        return None;
223    }
224
225    let bits = data.bits_per_value;
226    if !matches!(bits, 8 | 16 | 32 | 64) {
227        return None;
228    }
229
230    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
231    let threshold = params
232        .rle_threshold
233        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
234
235    if (run_count as f64) < (data.num_values as f64) * threshold {
236        let compressor = Box::new(RleEncoder::new());
237        let encoding = ProtobufUtils21::rle(
238            ProtobufUtils21::flat(bits, None),
239            ProtobufUtils21::flat(/*bits_per_value=*/ 8, None),
240        );
241        return Some((compressor, encoding));
242    }
243    None
244}
245
246fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
247    #[cfg(feature = "bitpacking")]
248    {
249        let bits = _data.bits_per_value;
250        if estimate_inline_bitpacking_bytes(_data).is_some() {
251            return Some(Box::new(InlineBitpacking::new(bits)));
252        }
253        None
254    }
255    #[cfg(not(feature = "bitpacking"))]
256    {
257        None
258    }
259}
260
/// Estimates the size in bytes of `data` after inline bit packing.
///
/// Each entry of the `BitWidth` statistic is costed as one header word plus
/// `1024 * bit_width / bits_per_value` packed words.
///
/// Returns `None` when the value width is not 8/16/32/64 bits, when the block is empty,
/// when the estimate is not smaller than the raw data size, or when the estimate does
/// not fit in a `u64`.
#[cfg(feature = "bitpacking")]
fn estimate_inline_bitpacking_bytes(data: &FixedWidthDataBlock) -> Option<u64> {
    use arrow_array::cast::AsArray;

    const HEADER_WORDS_PER_CHUNK: u128 = 1;

    let bits = data.bits_per_value;
    if !matches!(bits, 8 | 16 | 32 | 64) || data.num_values == 0 {
        return None;
    }

    let bit_width_stat = data.expect_stat(Stat::BitWidth);
    let chunk_widths = bit_width_stat.as_primitive::<UInt64Type>();

    let word_bits = bits as u128;
    let total_words = chunk_widths
        .values()
        .iter()
        .fold(0u128, |words_so_far, &bit_width| {
            let packed_words = (1024u128 * (bit_width as u128)) / word_bits;
            words_so_far.saturating_add(HEADER_WORDS_PER_CHUNK.saturating_add(packed_words))
        });

    let bytes_per_word = (bits / 8) as u128;
    let estimated_bytes = total_words.saturating_mul(bytes_per_word);

    // Only report an estimate when packing would actually save space.
    if estimated_bytes >= data.data_size() as u128 {
        return None;
    }

    u64::try_from(estimated_bytes).ok()
}
294
295fn try_bitpack_for_block(
296    data: &FixedWidthDataBlock,
297) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
298    let bits = data.bits_per_value;
299    if !matches!(bits, 8 | 16 | 32 | 64) {
300        return None;
301    }
302
303    let bit_widths = data.expect_stat(Stat::BitWidth);
304    let widths = bit_widths.as_primitive::<UInt64Type>();
305    let max_bit_width = *widths.values().iter().max().unwrap();
306
307    let too_small =
308        widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
309
310    if too_small {
311        return None;
312    }
313
314    if data.num_values <= 1024 {
315        let compressor = Box::new(InlineBitpacking::new(bits));
316        let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
317        Some((compressor, encoding))
318    } else {
319        let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
320        let encoding = ProtobufUtils21::out_of_line_bitpacking(
321            bits,
322            ProtobufUtils21::flat(max_bit_width, None),
323        );
324        Some((compressor, encoding))
325    }
326}
327
328fn maybe_wrap_general_for_mini_block(
329    inner: Box<dyn MiniBlockCompressor>,
330    params: &CompressionFieldParams,
331) -> Result<Box<dyn MiniBlockCompressor>> {
332    match params.compression.as_deref() {
333        None | Some("none") | Some("fsst") => Ok(inner),
334        Some(raw) => {
335            let scheme = CompressionScheme::from_str(raw)
336                .map_err(|_| Error::invalid_input(format!("Unknown compression scheme: {raw}")))?;
337            let cfg = CompressionConfig::new(scheme, params.compression_level);
338            Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
339        }
340    }
341}
342
343fn try_general_compression(
344    version: LanceFileVersion,
345    field_params: &CompressionFieldParams,
346    data: &DataBlock,
347) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
348    // Explicitly disable general compression.
349    if field_params.compression.as_deref() == Some("none") {
350        return Ok(None);
351    }
352
353    // User-requested compression (unused today but perhaps still used
354    // in the future someday)
355    if let Some(compression_scheme) = &field_params.compression
356        && version >= LanceFileVersion::V2_2
357    {
358        let scheme: CompressionScheme = compression_scheme.parse()?;
359        let config = CompressionConfig::new(scheme, field_params.compression_level);
360        let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
361        return Ok(Some((compressor, config)));
362    }
363
364    // Automatic compression for large blocks
365    if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
366        && version >= LanceFileVersion::V2_2
367    {
368        let compressor = Box::new(CompressedBufferEncoder::default());
369        let config = compressor.compressor.config();
370        return Ok(Some((compressor, config)));
371    }
372
373    Ok(None)
374}
375
376impl DefaultCompressionStrategy {
377    /// Create a new compression strategy with default behavior
378    pub fn new() -> Self {
379        Self::default()
380    }
381
382    /// Create a new compression strategy with user-configured parameters
383    pub fn with_params(params: CompressionParams) -> Self {
384        Self {
385            params,
386            version: LanceFileVersion::default(),
387        }
388    }
389
390    /// Override the file version used to make compression decisions
391    pub fn with_version(mut self, version: LanceFileVersion) -> Self {
392        self.version = version;
393        self
394    }
395
396    /// Parse compression parameters from field metadata
397    fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams {
398        let mut params = CompressionFieldParams::default();
399
400        // Parse compression method
401        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
402            params.compression = Some(compression.clone());
403        }
404
405        // Parse compression level
406        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
407            params.compression_level = level.parse().ok();
408        }
409
410        // Parse RLE threshold
411        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
412            params.rle_threshold = threshold.parse().ok();
413        }
414
415        // Parse BSS mode
416        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
417            match BssMode::parse(bss_str) {
418                Some(mode) => params.bss = Some(mode),
419                None => {
420                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
421                }
422            }
423        }
424
425        // Parse minichunk size
426        if let Some(minichunk_size_str) = field
427            .metadata
428            .get(super::constants::MINICHUNK_SIZE_META_KEY)
429        {
430            if let Ok(minichunk_size) = minichunk_size_str.parse::<i64>() {
431                // for lance v2.1, only 32kb or smaller is supported
432                if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 {
433                    log::warn!(
434                        "minichunk_size '{}' too large for version '{}', using default",
435                        minichunk_size,
436                        version
437                    );
438                } else {
439                    params.minichunk_size = Some(minichunk_size);
440                }
441            } else {
442                log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str);
443            }
444        }
445
446        params
447    }
448
449    fn build_fixed_width_compressor(
450        &self,
451        params: &CompressionFieldParams,
452        data: &FixedWidthDataBlock,
453    ) -> Result<Box<dyn MiniBlockCompressor>> {
454        if params.compression.as_deref() == Some("none") {
455            return Ok(Box::new(ValueEncoder::default()));
456        }
457
458        let base = try_bss_for_mini_block(data, params)
459            .or_else(|| try_rle_for_mini_block(data, params))
460            .or_else(|| try_bitpack_for_mini_block(data))
461            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
462
463        maybe_wrap_general_for_mini_block(base, params)
464    }
465
466    /// Build compressor based on parameters for variable-width data
467    fn build_variable_width_compressor(
468        &self,
469        field: &Field,
470        data: &VariableWidthBlock,
471    ) -> Result<Box<dyn MiniBlockCompressor>> {
472        let params = self.get_merged_field_params(field);
473        let compression = params.compression.as_deref();
474        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
475            return Err(Error::invalid_input(format!(
476                "Variable width compression not supported for {} bit offsets",
477                data.bits_per_offset
478            )));
479        }
480
481        // Get statistics
482        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
483        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
484
485        // Explicitly disable all compression.
486        if compression == Some("none") {
487            return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)));
488        }
489
490        let use_fsst = compression == Some("fsst")
491            || (compression.is_none()
492                && !matches!(field.data_type(), DataType::Binary | DataType::LargeBinary)
493                && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
494                && data_size >= FSST_LEAST_INPUT_SIZE as u64);
495
496        // Choose base encoder (FSST or Binary) once.
497        let mut base_encoder: Box<dyn MiniBlockCompressor> = if use_fsst {
498            Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))
499        } else {
500            Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))
501        };
502
503        // Wrap with general compression when configured (except FSST / none).
504        if let Some(compression_scheme) = compression.filter(|scheme| *scheme != "fsst") {
505            let scheme: CompressionScheme = compression_scheme.parse()?;
506            let config = CompressionConfig::new(scheme, params.compression_level);
507            base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
508        }
509
510        Ok(base_encoder)
511    }
512
513    /// Merge user-configured parameters with field metadata
514    /// Field metadata has highest priority
515    fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
516        let mut field_params = self
517            .params
518            .get_field_params(&field.name, &field.data_type());
519
520        // Override with field metadata if present (highest priority)
521        let metadata_params = Self::parse_field_metadata(field, &self.version);
522        field_params.merge(&metadata_params);
523
524        field_params
525    }
526}
527
528impl CompressionStrategy for DefaultCompressionStrategy {
529    fn create_miniblock_compressor(
530        &self,
531        field: &Field,
532        data: &DataBlock,
533    ) -> Result<Box<dyn MiniBlockCompressor>> {
534        match data {
535            DataBlock::FixedWidth(fixed_width_data) => {
536                let field_params = self.get_merged_field_params(field);
537                self.build_fixed_width_compressor(&field_params, fixed_width_data)
538            }
539            DataBlock::VariableWidth(variable_width_data) => {
540                self.build_variable_width_compressor(field, variable_width_data)
541            }
542            DataBlock::Struct(struct_data_block) => {
543                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
544                // just being cautious here.
545                if struct_data_block.has_variable_width_child() {
546                    return Err(Error::invalid_input(
547                        "Packed struct mini-block encoding supports only fixed-width children",
548                    ));
549                }
550                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
551            }
552            DataBlock::FixedSizeList(_) => {
553                // Ideally we would compress the list items but this creates something of a challenge.
554                // We don't want to break lists across chunks and we need to worry about inner validity
555                // layers.  If we try and use a compression scheme then it is unlikely to respect these
556                // constraints.
557                //
558                // For now, we just don't compress.  In the future, we might want to consider a more
559                // sophisticated approach.
560                Ok(Box::new(ValueEncoder::default()))
561            }
562            _ => Err(Error::not_supported_source(
563                format!(
564                    "Mini-block compression not yet supported for block type {}",
565                    data.name()
566                )
567                .into(),
568            )),
569        }
570    }
571
572    fn create_per_value(
573        &self,
574        field: &Field,
575        data: &DataBlock,
576    ) -> Result<Box<dyn PerValueCompressor>> {
577        let field_params = self.get_merged_field_params(field);
578
579        match data {
580            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
581            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
582            DataBlock::Struct(struct_block) => {
583                if field.children.len() != struct_block.children.len() {
584                    return Err(Error::invalid_input(
585                        "Struct field metadata does not match data block children",
586                    ));
587                }
588                let has_variable_child = struct_block.has_variable_width_child();
589                if has_variable_child {
590                    if self.version < LanceFileVersion::V2_2 {
591                        return Err(Error::not_supported_source("Variable packed struct encoding requires Lance file version 2.2 or later".into()));
592                    }
593                    Ok(Box::new(PackedStructVariablePerValueEncoder::new(
594                        self.clone(),
595                        field.children.clone(),
596                    )))
597                } else {
598                    Err(Error::invalid_input(
599                        "Packed struct per-value compression should not be used for fixed-width-only structs",
600                    ))
601                }
602            }
603            DataBlock::VariableWidth(variable_width) => {
604                let compression = field_params.compression.as_deref();
605                // Check for explicit "none" compression
606                if compression == Some("none") {
607                    return Ok(Box::new(VariableEncoder::default()));
608                }
609
610                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
611                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
612
613                // If values are very large then use block compression on a per-value basis
614                //
615                // TODO: Could maybe use median here
616
617                let per_value_requested =
618                    compression.is_some_and(|compression| compression != "fsst");
619
620                if (max_len > 32 * 1024 || per_value_requested)
621                    && data_size >= FSST_LEAST_INPUT_SIZE as u64
622                {
623                    return Ok(Box::new(CompressedBufferEncoder::default()));
624                }
625
626                if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
627                    let variable_compression = Box::new(VariableEncoder::default());
628                    let use_fsst = compression == Some("fsst")
629                        || (compression.is_none()
630                            && !matches!(
631                                field.data_type(),
632                                DataType::Binary | DataType::LargeBinary
633                            )
634                            && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
635                            && data_size >= FSST_LEAST_INPUT_SIZE as u64);
636
637                    // Use FSST if explicitly requested or if data characteristics warrant it.
638                    if use_fsst {
639                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
640                    } else {
641                        Ok(variable_compression)
642                    }
643                } else {
644                    panic!(
645                        "Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.",
646                        variable_width.bits_per_offset
647                    );
648                }
649            }
650            _ => unreachable!(
651                "Per-value compression not yet supported for block type: {}",
652                data.name()
653            ),
654        }
655    }
656
657    fn create_block_compressor(
658        &self,
659        field: &Field,
660        data: &DataBlock,
661    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
662        let field_params = self.get_merged_field_params(field);
663
664        match data {
665            DataBlock::FixedWidth(fixed_width) => {
666                if let Some((compressor, encoding)) =
667                    try_rle_for_block(fixed_width, self.version, &field_params)
668                {
669                    return Ok((compressor, encoding));
670                }
671                if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
672                    return Ok((compressor, encoding));
673                }
674
675                // Try general compression (user-requested or automatic over MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION)
676                if let Some((compressor, config)) =
677                    try_general_compression(self.version, &field_params, data)?
678                {
679                    let encoding = ProtobufUtils21::wrapped(
680                        config,
681                        ProtobufUtils21::flat(fixed_width.bits_per_value, None),
682                    )?;
683                    return Ok((compressor, encoding));
684                }
685
686                let encoder = Box::new(ValueEncoder::default());
687                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
688                Ok((encoder, encoding))
689            }
690            DataBlock::VariableWidth(variable_width) => {
691                // Try general compression
692                if let Some((compressor, config)) =
693                    try_general_compression(self.version, &field_params, data)?
694                {
695                    let encoding = ProtobufUtils21::wrapped(
696                        config,
697                        ProtobufUtils21::variable(
698                            ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
699                            None,
700                        ),
701                    )?;
702                    return Ok((compressor, encoding));
703                }
704
705                let encoder = Box::new(VariableEncoder::default());
706                let encoding = ProtobufUtils21::variable(
707                    ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
708                    None,
709                );
710                Ok((encoder, encoding))
711            }
712            _ => unreachable!(),
713        }
714    }
715}
716
717pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
718    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
719}
720
721pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
722    /// Decompress one or more values
723    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
724    /// The number of bits in each value
725    ///
726    /// Currently (and probably long term) this must be a multiple of 8
727    fn bits_per_value(&self) -> u64;
728}
729
730pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
731    /// Decompress one or more values
732    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
733}
734
735pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
736    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
737}
738
739pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
740    fn create_miniblock_decompressor(
741        &self,
742        description: &CompressiveEncoding,
743        decompression_strategy: &dyn DecompressionStrategy,
744    ) -> Result<Box<dyn MiniBlockDecompressor>>;
745
746    fn create_fixed_per_value_decompressor(
747        &self,
748        description: &CompressiveEncoding,
749    ) -> Result<Box<dyn FixedPerValueDecompressor>>;
750
751    fn create_variable_per_value_decompressor(
752        &self,
753        description: &CompressiveEncoding,
754    ) -> Result<Box<dyn VariablePerValueDecompressor>>;
755
756    fn create_block_decompressor(
757        &self,
758        description: &CompressiveEncoding,
759    ) -> Result<Box<dyn BlockDecompressor>>;
760}
761
/// The default [`DecompressionStrategy`]; it carries no configuration
#[derive(Default, Debug)]
pub struct DefaultDecompressionStrategy {}
764
765impl DecompressionStrategy for DefaultDecompressionStrategy {
766    fn create_miniblock_decompressor(
767        &self,
768        description: &CompressiveEncoding,
769        decompression_strategy: &dyn DecompressionStrategy,
770    ) -> Result<Box<dyn MiniBlockDecompressor>> {
771        match description.compression.as_ref().unwrap() {
772            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
773            #[cfg(feature = "bitpacking")]
774            Compression::InlineBitpacking(description) => {
775                Ok(Box::new(InlineBitpacking::from_description(description)))
776            }
777            #[cfg(not(feature = "bitpacking"))]
778            Compression::InlineBitpacking(_) => Err(Error::not_supported_source(
779                "this runtime was not built with bitpacking support".into(),
780            )),
781            Compression::Variable(variable) => {
782                let Compression::Flat(offsets) = variable
783                    .offsets
784                    .as_ref()
785                    .unwrap()
786                    .compression
787                    .as_ref()
788                    .unwrap()
789                else {
790                    panic!("Variable compression only supports flat offsets")
791                };
792                Ok(Box::new(BinaryMiniBlockDecompressor::new(
793                    offsets.bits_per_value as u8,
794                )))
795            }
796            Compression::Fsst(description) => {
797                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
798                    description.values.as_ref().unwrap(),
799                    decompression_strategy,
800                )?;
801                Ok(Box::new(FsstMiniBlockDecompressor::new(
802                    description,
803                    inner_decompressor,
804                )))
805            }
806            Compression::PackedStruct(description) => Ok(Box::new(
807                PackedStructFixedWidthMiniBlockDecompressor::new(description),
808            )),
809            Compression::VariablePackedStruct(_) => Err(Error::not_supported_source(
810                "variable packed struct decoding is not yet implemented".into(),
811            )),
812            Compression::FixedSizeList(fsl) => {
813                // In the future, we might need to do something more complex here if FSL supports
814                // compression.
815                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
816            }
817            Compression::Rle(rle) => {
818                let bits_per_value = validate_rle_compression(rle)?;
819                Ok(Box::new(RleDecompressor::new(bits_per_value)))
820            }
821            Compression::ByteStreamSplit(bss) => {
822                let Compression::Flat(values) =
823                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
824                else {
825                    panic!("ByteStreamSplit compression only supports flat values")
826                };
827                Ok(Box::new(ByteStreamSplitDecompressor::new(
828                    values.bits_per_value as usize,
829                )))
830            }
831            Compression::General(general) => {
832                // Create inner decompressor
833                let inner_decompressor = self.create_miniblock_decompressor(
834                    general.values.as_ref().ok_or_else(|| {
835                        Error::invalid_input("GeneralMiniBlock missing inner encoding")
836                    })?,
837                    decompression_strategy,
838                )?;
839
840                // Parse compression config
841                let compression = general.compression.as_ref().ok_or_else(|| {
842                    Error::invalid_input("GeneralMiniBlock missing compression config")
843                })?;
844
845                let scheme = compression.scheme().try_into()?;
846
847                let compression_config = CompressionConfig::new(scheme, compression.level);
848
849                Ok(Box::new(GeneralMiniBlockDecompressor::new(
850                    inner_decompressor,
851                    compression_config,
852                )))
853            }
854            _ => todo!(),
855        }
856    }
857
858    fn create_fixed_per_value_decompressor(
859        &self,
860        description: &CompressiveEncoding,
861    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
862        match description.compression.as_ref().unwrap() {
863            Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
864                constant
865                    .value
866                    .as_ref()
867                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
868            ))),
869            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
870            Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
871            _ => todo!("fixed-per-value decompressor for {:?}", description),
872        }
873    }
874
875    fn create_variable_per_value_decompressor(
876        &self,
877        description: &CompressiveEncoding,
878    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
879        match description.compression.as_ref().unwrap() {
880            Compression::Variable(variable) => {
881                let Compression::Flat(offsets) = variable
882                    .offsets
883                    .as_ref()
884                    .unwrap()
885                    .compression
886                    .as_ref()
887                    .unwrap()
888                else {
889                    panic!("Variable compression only supports flat offsets")
890                };
891                assert!(offsets.bits_per_value < u8::MAX as u64);
892                Ok(Box::new(VariableDecoder::default()))
893            }
894            Compression::Fsst(fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
895                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
896                Box::new(VariableDecoder::default()),
897            ))),
898            Compression::General(general) => Ok(Box::new(CompressedBufferEncoder::from_scheme(
899                general.compression.as_ref().expect_ok()?.scheme(),
900            )?)),
901            Compression::VariablePackedStruct(description) => {
902                let mut fields = Vec::with_capacity(description.fields.len());
903                for field in &description.fields {
904                    let value_encoding = field.value.as_ref().ok_or_else(|| {
905                        Error::invalid_input("VariablePackedStruct field is missing value encoding")
906                    })?;
907                    let decoder = match field.layout.as_ref().ok_or_else(|| {
908                        Error::invalid_input("VariablePackedStruct field is missing layout details")
909                    })? {
910                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
911                            bits_per_value,
912                        ) => {
913                            let decompressor =
914                                self.create_fixed_per_value_decompressor(value_encoding)?;
915                            VariablePackedStructFieldDecoder {
916                                kind: VariablePackedStructFieldKind::Fixed {
917                                    bits_per_value: *bits_per_value,
918                                    decompressor: Arc::from(decompressor),
919                                },
920                            }
921                        }
922                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
923                            bits_per_length,
924                        ) => {
925                            let decompressor =
926                                self.create_variable_per_value_decompressor(value_encoding)?;
927                            VariablePackedStructFieldDecoder {
928                                kind: VariablePackedStructFieldKind::Variable {
929                                    bits_per_length: *bits_per_length,
930                                    decompressor: Arc::from(decompressor),
931                                },
932                            }
933                        }
934                    };
935                    fields.push(decoder);
936                }
937                Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
938                    fields,
939                )))
940            }
941            _ => todo!("variable-per-value decompressor for {:?}", description),
942        }
943    }
944
945    fn create_block_decompressor(
946        &self,
947        description: &CompressiveEncoding,
948    ) -> Result<Box<dyn BlockDecompressor>> {
949        match description.compression.as_ref().unwrap() {
950            Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
951                InlineBitpacking::from_description(inline_bitpacking),
952            )),
953            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
954            Compression::Constant(constant) => {
955                let scalar = constant
956                    .value
957                    .as_ref()
958                    .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
959                Ok(Box::new(ConstantDecompressor::new(scalar)))
960            }
961            Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
962            Compression::FixedSizeList(fsl) => {
963                Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
964            }
965            Compression::OutOfLineBitpacking(out_of_line) => {
966                // Extract the compressed bit width from the values encoding
967                let compressed_bit_width = match out_of_line
968                    .values
969                    .as_ref()
970                    .unwrap()
971                    .compression
972                    .as_ref()
973                    .unwrap()
974                {
975                    Compression::Flat(flat) => flat.bits_per_value,
976                    _ => {
977                        return Err(Error::invalid_input_source(
978                            "OutOfLineBitpacking values must use Flat encoding".into(),
979                        ));
980                    }
981                };
982                Ok(Box::new(OutOfLineBitpacking::new(
983                    compressed_bit_width,
984                    out_of_line.uncompressed_bits_per_value,
985                )))
986            }
987            Compression::General(general) => {
988                let inner_desc = general
989                    .values
990                    .as_ref()
991                    .ok_or_else(|| {
992                        Error::invalid_input("General compression missing inner encoding")
993                    })?
994                    .as_ref();
995                let inner_decompressor = self.create_block_decompressor(inner_desc)?;
996
997                let compression = general.compression.as_ref().ok_or_else(|| {
998                    Error::invalid_input("General compression missing compression config")
999                })?;
1000                let scheme = compression.scheme().try_into()?;
1001                let config = CompressionConfig::new(scheme, compression.level);
1002                let general_decompressor =
1003                    GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
1004
1005                Ok(Box::new(general_decompressor))
1006            }
1007            Compression::Rle(rle) => {
1008                let bits_per_value = validate_rle_compression(rle)?;
1009                Ok(Box::new(RleDecompressor::new(bits_per_value)))
1010            }
1011            _ => todo!(),
1012        }
1013    }
1014}
1015/// Validates RLE compression format and extracts bits_per_value
1016fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<u64> {
1017    let values = rle
1018        .values
1019        .as_ref()
1020        .ok_or_else(|| Error::invalid_input("RLE compression missing values encoding"))?;
1021    let run_lengths = rle
1022        .run_lengths
1023        .as_ref()
1024        .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths encoding"))?;
1025
1026    let values = values
1027        .compression
1028        .as_ref()
1029        .ok_or_else(|| Error::invalid_input("RLE compression missing values compression"))?;
1030    let Compression::Flat(values) = values else {
1031        return Err(Error::invalid_input(
1032            "RLE compression only supports flat values",
1033        ));
1034    };
1035
1036    let run_lengths = run_lengths
1037        .compression
1038        .as_ref()
1039        .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths compression"))?;
1040    let Compression::Flat(run_lengths) = run_lengths else {
1041        return Err(Error::invalid_input(
1042            "RLE compression only supports flat run lengths",
1043        ));
1044    };
1045
1046    if run_lengths.bits_per_value != 8 {
1047        return Err(Error::invalid_input(format!(
1048            "RLE compression only supports 8-bit run lengths, got {}",
1049            run_lengths.bits_per_value
1050        )));
1051    }
1052
1053    Ok(values.bits_per_value)
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058    use super::*;
1059    use crate::buffer::LanceBuffer;
1060    use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
1061    use crate::statistics::ComputeStat;
1062    use crate::testing::extract_array_encoding_chain;
1063    use arrow_schema::{DataType, Field as ArrowField};
1064    use std::collections::HashMap;
1065
1066    fn create_test_field(name: &str, data_type: DataType) -> Field {
1067        let arrow_field = ArrowField::new(name, data_type, true);
1068        let mut field = Field::try_from(&arrow_field).unwrap();
1069        field.id = -1;
1070        field
1071    }
1072
1073    fn create_fixed_width_block_with_stats(
1074        bits_per_value: u64,
1075        num_values: u64,
1076        run_count: u64,
1077    ) -> DataBlock {
1078        // Create varied data to avoid low entropy
1079        let bytes_per_value = (bits_per_value / 8) as usize;
1080        let total_bytes = bytes_per_value * num_values as usize;
1081        let mut data = vec![0u8; total_bytes];
1082
1083        // Create data with specified run count
1084        let values_per_run = (num_values / run_count).max(1);
1085        let mut run_value = 0u8;
1086
1087        for i in 0..num_values as usize {
1088            if i % values_per_run as usize == 0 {
1089                run_value = run_value.wrapping_add(17); // Use prime to get varied values
1090            }
1091            // Fill all bytes of the value to create high entropy
1092            for j in 0..bytes_per_value {
1093                let byte_offset = i * bytes_per_value + j;
1094                if byte_offset < data.len() {
1095                    data[byte_offset] = run_value.wrapping_add(j as u8);
1096                }
1097            }
1098        }
1099
1100        let mut block = FixedWidthDataBlock {
1101            bits_per_value,
1102            data: LanceBuffer::reinterpret_vec(data),
1103            num_values,
1104            block_info: BlockInfo::default(),
1105        };
1106
1107        // Compute all statistics including BytePositionEntropy
1108        use crate::statistics::ComputeStat;
1109        block.compute_stat();
1110
1111        DataBlock::FixedWidth(block)
1112    }
1113
1114    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
1115        // Create data with some variety to avoid always triggering BSS
1116        let bytes_per_value = (bits_per_value / 8) as usize;
1117        let total_bytes = bytes_per_value * num_values as usize;
1118        let mut data = vec![0u8; total_bytes];
1119
1120        // Add some variation to the data to make it more realistic
1121        for i in 0..num_values as usize {
1122            let byte_offset = i * bytes_per_value;
1123            if byte_offset < data.len() {
1124                data[byte_offset] = (i % 256) as u8;
1125            }
1126        }
1127
1128        let mut block = FixedWidthDataBlock {
1129            bits_per_value,
1130            data: LanceBuffer::reinterpret_vec(data),
1131            num_values,
1132            block_info: BlockInfo::default(),
1133        };
1134
1135        // Compute all statistics including BytePositionEntropy
1136        use crate::statistics::ComputeStat;
1137        block.compute_stat();
1138
1139        DataBlock::FixedWidth(block)
1140    }
1141
1142    fn create_variable_width_block(
1143        bits_per_offset: u8,
1144        num_values: u64,
1145        avg_value_size: usize,
1146    ) -> DataBlock {
1147        use crate::statistics::ComputeStat;
1148
1149        // Create offsets buffer (num_values + 1 offsets)
1150        let mut offsets = Vec::with_capacity((num_values + 1) as usize);
1151        let mut current_offset = 0i64;
1152        offsets.push(current_offset);
1153
1154        // Generate offsets with varying value sizes
1155        for i in 0..num_values {
1156            let value_size = if avg_value_size == 0 {
1157                1
1158            } else {
1159                ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
1160                    .min(avg_value_size * 2)
1161            };
1162            current_offset += value_size as i64;
1163            offsets.push(current_offset);
1164        }
1165
1166        // Create data buffer with realistic content
1167        let total_data_size = current_offset as usize;
1168        let mut data = vec![0u8; total_data_size];
1169
1170        // Fill data with varied content
1171        for i in 0..num_values {
1172            let start_offset = offsets[i as usize] as usize;
1173            let end_offset = offsets[(i + 1) as usize] as usize;
1174
1175            let content = (i % 256) as u8;
1176            for j in 0..end_offset - start_offset {
1177                data[start_offset + j] = content.wrapping_add(j as u8);
1178            }
1179        }
1180
1181        // Convert offsets to appropriate lance buffer
1182        let offsets_buffer = match bits_per_offset {
1183            32 => {
1184                let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
1185                LanceBuffer::reinterpret_vec(offsets_32)
1186            }
1187            64 => LanceBuffer::reinterpret_vec(offsets),
1188            _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
1189        };
1190
1191        let mut block = VariableWidthBlock {
1192            data: LanceBuffer::from(data),
1193            offsets: offsets_buffer,
1194            bits_per_offset,
1195            num_values,
1196            block_info: BlockInfo::default(),
1197        };
1198
1199        block.compute_stat();
1200        DataBlock::VariableWidth(block)
1201    }
1202
1203    fn create_fsst_candidate_variable_width_block() -> DataBlock {
1204        create_variable_width_block(32, 4096, FSST_LEAST_INPUT_MAX_LENGTH as usize + 16)
1205    }
1206
1207    #[test]
1208    fn test_parameter_based_compression() {
1209        let mut params = CompressionParams::new();
1210
1211        // Configure RLE for ID columns with BSS explicitly disabled
1212        params.columns.insert(
1213            "*_id".to_string(),
1214            CompressionFieldParams {
1215                rle_threshold: Some(0.3),
1216                compression: Some("lz4".to_string()),
1217                compression_level: None,
1218                bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE
1219                minichunk_size: None,
1220            },
1221        );
1222
1223        let strategy = DefaultCompressionStrategy::with_params(params);
1224        let field = create_test_field("user_id", DataType::Int32);
1225
1226        // Create data with low run count for RLE
1227        // Use create_fixed_width_block_with_stats which properly sets run count
1228        let data = create_fixed_width_block_with_stats(32, 1000, 100); // 100 runs out of 1000 values
1229
1230        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1231        // Should use RLE due to low threshold (0.3) and low run count (100/1000 = 0.1)
1232        let debug_str = format!("{:?}", compressor);
1233
1234        // The compressor should be RLE wrapped in general compression
1235        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1236        assert!(debug_str.contains("RleEncoder"));
1237    }
1238
1239    #[test]
1240    fn test_type_level_parameters() {
1241        let mut params = CompressionParams::new();
1242
1243        // Configure all Int32 to use specific settings
1244        params.types.insert(
1245            "Int32".to_string(),
1246            CompressionFieldParams {
1247                rle_threshold: Some(0.1), // Very low threshold
1248                compression: Some("zstd".to_string()),
1249                compression_level: Some(3),
1250                bss: Some(BssMode::Off), // Disable BSS to test RLE
1251                minichunk_size: None,
1252            },
1253        );
1254
1255        let strategy = DefaultCompressionStrategy::with_params(params);
1256        let field = create_test_field("some_column", DataType::Int32);
1257        // Create data with very low run count (50 runs for 1000 values = 0.05 ratio)
1258        let data = create_fixed_width_block_with_stats(32, 1000, 50);
1259
1260        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1261        // Should use RLE due to very low threshold
1262        assert!(format!("{:?}", compressor).contains("RleEncoder"));
1263    }
1264
1265    // Regression for #6626: an all-zero stat segment (e.g. rep/def for a long
1266    // run of empty lists) used to disable block bitpacking entirely.
1267    #[test]
1268    #[cfg(feature = "bitpacking")]
1269    fn test_block_bitpacks_with_zero_segment() {
1270        let strategy = DefaultCompressionStrategy::new();
1271        let field = create_test_field("levels", DataType::UInt16);
1272
1273        // First 1024 zeros, then 1024 ones; max bit width is 1.
1274        let mut values: Vec<u16> = vec![0; 1024];
1275        values.extend(std::iter::repeat_n(1u16, 1024));
1276        let mut block = FixedWidthDataBlock {
1277            bits_per_value: 16,
1278            data: LanceBuffer::reinterpret_vec(values),
1279            num_values: 2048,
1280            block_info: BlockInfo::default(),
1281        };
1282        block.compute_stat();
1283        let data = DataBlock::FixedWidth(block);
1284
1285        let (compressor, _encoding) = strategy.create_block_compressor(&field, &data).unwrap();
1286        let debug_str = format!("{:?}", compressor);
1287        assert!(
1288            debug_str.contains("OutOfLineBitpacking"),
1289            "expected OutOfLineBitpacking, got: {debug_str}"
1290        );
1291    }
1292
1293    #[test]
1294    #[cfg(feature = "bitpacking")]
1295    fn test_low_cardinality_prefers_bitpacking_over_rle() {
1296        let strategy = DefaultCompressionStrategy::new();
1297        let field = create_test_field("int_score", DataType::Int64);
1298
1299        // Low cardinality values (3/4/5) but with moderate run count:
1300        // RLE compresses vs raw, yet bitpacking should be smaller.
1301        let mut values: Vec<u64> = Vec::with_capacity(256);
1302        for run_idx in 0..64 {
1303            let value = match run_idx % 3 {
1304                0 => 3u64,
1305                1 => 4u64,
1306                _ => 5u64,
1307            };
1308            values.extend(std::iter::repeat_n(value, 4));
1309        }
1310
1311        let mut block = FixedWidthDataBlock {
1312            bits_per_value: 64,
1313            data: LanceBuffer::reinterpret_vec(values),
1314            num_values: 256,
1315            block_info: BlockInfo::default(),
1316        };
1317
1318        use crate::statistics::ComputeStat;
1319        block.compute_stat();
1320
1321        let data = DataBlock::FixedWidth(block);
1322        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1323        let debug_str = format!("{:?}", compressor);
1324        assert!(
1325            debug_str.contains("InlineBitpacking"),
1326            "expected InlineBitpacking, got: {debug_str}"
1327        );
1328        assert!(
1329            !debug_str.contains("RleEncoder"),
1330            "expected RLE to be skipped when bitpacking is smaller, got: {debug_str}"
1331        );
1332    }
1333
1334    fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
1335        let chain = extract_array_encoding_chain(encoding);
1336        if variable {
1337            assert_eq!(chain.len(), 2);
1338            assert_eq!(chain.first().unwrap().as_str(), "variable");
1339            assert_eq!(chain.get(1).unwrap().as_str(), "flat");
1340        } else {
1341            assert_eq!(chain.len(), 1);
1342            assert_eq!(chain.first().unwrap().as_str(), "flat");
1343        }
1344    }
1345
1346    #[test]
1347    fn test_none_compression() {
1348        let mut params = CompressionParams::new();
1349
1350        // Disable compression for embeddings
1351        params.columns.insert(
1352            "embeddings".to_string(),
1353            CompressionFieldParams {
1354                compression: Some("none".to_string()),
1355                ..Default::default()
1356            },
1357        );
1358
1359        let strategy = DefaultCompressionStrategy::with_params(params);
1360        let field = create_test_field("embeddings", DataType::Float32);
1361        let fixed_data = create_fixed_width_block(32, 1000);
1362        let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1363
1364        // Test miniblock
1365        let compressor = strategy
1366            .create_miniblock_compressor(&field, &fixed_data)
1367            .unwrap();
1368        let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1369        check_uncompressed_encoding(&encoding, false);
1370        let compressor = strategy
1371            .create_miniblock_compressor(&field, &variable_data)
1372            .unwrap();
1373        let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1374        check_uncompressed_encoding(&encoding, true);
1375
1376        // Test pervalue
1377        let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1378        let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1379        check_uncompressed_encoding(&encoding, false);
1380        let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1381        let (_block, encoding) = compressor.compress(variable_data).unwrap();
1382        check_uncompressed_encoding(&encoding, true);
1383    }
1384
1385    #[test]
1386    fn test_field_metadata_none_compression() {
1387        // Prepare field with metadata for none compression
1388        let mut arrow_field = ArrowField::new("simple_col", DataType::Binary, true);
1389        let mut metadata = HashMap::new();
1390        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1391        arrow_field = arrow_field.with_metadata(metadata);
1392        let field = Field::try_from(&arrow_field).unwrap();
1393
1394        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
1395
1396        // Test miniblock
1397        let fixed_data = create_fixed_width_block(32, 1000);
1398        let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1399
1400        let compressor = strategy
1401            .create_miniblock_compressor(&field, &fixed_data)
1402            .unwrap();
1403        let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1404        check_uncompressed_encoding(&encoding, false);
1405
1406        let compressor = strategy
1407            .create_miniblock_compressor(&field, &variable_data)
1408            .unwrap();
1409        let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1410        check_uncompressed_encoding(&encoding, true);
1411
1412        // Test pervalue
1413        let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1414        let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1415        check_uncompressed_encoding(&encoding, false);
1416
1417        let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1418        let (_block, encoding) = compressor.compress(variable_data).unwrap();
1419        check_uncompressed_encoding(&encoding, true);
1420    }
1421
1422    #[test]
1423    fn test_auto_fsst_disabled_for_binary_fields() {
1424        let strategy = DefaultCompressionStrategy::new();
1425        let field = create_test_field("bytes", DataType::Binary);
1426        let variable_data = create_fsst_candidate_variable_width_block();
1427
1428        let miniblock = strategy
1429            .create_miniblock_compressor(&field, &variable_data)
1430            .unwrap();
1431        let miniblock_debug = format!("{:?}", miniblock);
1432        assert!(
1433            miniblock_debug.contains("BinaryMiniBlockEncoder"),
1434            "expected BinaryMiniBlockEncoder, got: {miniblock_debug}"
1435        );
1436        assert!(
1437            !miniblock_debug.contains("FsstMiniBlockEncoder"),
1438            "did not expect FsstMiniBlockEncoder, got: {miniblock_debug}"
1439        );
1440
1441        let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1442        let per_value_debug = format!("{:?}", per_value);
1443        assert!(
1444            per_value_debug.contains("VariableEncoder"),
1445            "expected VariableEncoder, got: {per_value_debug}"
1446        );
1447        assert!(
1448            !per_value_debug.contains("FsstPerValueEncoder"),
1449            "did not expect FsstPerValueEncoder, got: {per_value_debug}"
1450        );
1451    }
1452
1453    #[test]
1454    fn test_auto_fsst_still_enabled_for_utf8_fields() {
1455        let strategy = DefaultCompressionStrategy::new();
1456        let field = create_test_field("text", DataType::Utf8);
1457        let variable_data = create_fsst_candidate_variable_width_block();
1458
1459        let miniblock = strategy
1460            .create_miniblock_compressor(&field, &variable_data)
1461            .unwrap();
1462        let miniblock_debug = format!("{:?}", miniblock);
1463        assert!(
1464            miniblock_debug.contains("FsstMiniBlockEncoder"),
1465            "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
1466        );
1467
1468        let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1469        let per_value_debug = format!("{:?}", per_value);
1470        assert!(
1471            per_value_debug.contains("FsstPerValueEncoder"),
1472            "expected FsstPerValueEncoder, got: {per_value_debug}"
1473        );
1474    }
1475
1476    #[test]
1477    fn test_explicit_fsst_still_supported_for_binary_fields() {
1478        let mut params = CompressionParams::new();
1479        params.columns.insert(
1480            "bytes".to_string(),
1481            CompressionFieldParams {
1482                compression: Some("fsst".to_string()),
1483                ..Default::default()
1484            },
1485        );
1486
1487        let strategy = DefaultCompressionStrategy::with_params(params);
1488        let field = create_test_field("bytes", DataType::Binary);
1489        let variable_data = create_fsst_candidate_variable_width_block();
1490
1491        let miniblock = strategy
1492            .create_miniblock_compressor(&field, &variable_data)
1493            .unwrap();
1494        let miniblock_debug = format!("{:?}", miniblock);
1495        assert!(
1496            miniblock_debug.contains("FsstMiniBlockEncoder"),
1497            "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
1498        );
1499
1500        let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1501        let per_value_debug = format!("{:?}", per_value);
1502        assert!(
1503            per_value_debug.contains("FsstPerValueEncoder"),
1504            "expected FsstPerValueEncoder, got: {per_value_debug}"
1505        );
1506    }
1507
1508    #[test]
1509    fn test_parameter_merge_priority() {
1510        let mut params = CompressionParams::new();
1511
1512        // Set type-level
1513        params.types.insert(
1514            "Int32".to_string(),
1515            CompressionFieldParams {
1516                rle_threshold: Some(0.5),
1517                compression: Some("lz4".to_string()),
1518                ..Default::default()
1519            },
1520        );
1521
1522        // Set column-level (highest priority)
1523        params.columns.insert(
1524            "user_id".to_string(),
1525            CompressionFieldParams {
1526                rle_threshold: Some(0.2),
1527                compression: Some("zstd".to_string()),
1528                compression_level: Some(6),
1529                bss: None,
1530                minichunk_size: None,
1531            },
1532        );
1533
1534        let strategy = DefaultCompressionStrategy::with_params(params);
1535
1536        // Get merged params
1537        let merged = strategy
1538            .params
1539            .get_field_params("user_id", &DataType::Int32);
1540
1541        // Column params should override type params
1542        assert_eq!(merged.rle_threshold, Some(0.2));
1543        assert_eq!(merged.compression, Some("zstd".to_string()));
1544        assert_eq!(merged.compression_level, Some(6));
1545
1546        // Test field with only type params
1547        let merged = strategy
1548            .params
1549            .get_field_params("other_field", &DataType::Int32);
1550        assert_eq!(merged.rle_threshold, Some(0.5));
1551        assert_eq!(merged.compression, Some("lz4".to_string()));
1552        assert_eq!(merged.compression_level, None);
1553    }
1554
1555    #[test]
1556    fn test_pattern_matching() {
1557        let mut params = CompressionParams::new();
1558
1559        // Configure pattern for log files
1560        params.columns.insert(
1561            "log_*".to_string(),
1562            CompressionFieldParams {
1563                compression: Some("zstd".to_string()),
1564                compression_level: Some(6),
1565                ..Default::default()
1566            },
1567        );
1568
1569        let strategy = DefaultCompressionStrategy::with_params(params);
1570
1571        // Should match pattern
1572        let merged = strategy
1573            .params
1574            .get_field_params("log_messages", &DataType::Utf8);
1575        assert_eq!(merged.compression, Some("zstd".to_string()));
1576        assert_eq!(merged.compression_level, Some(6));
1577
1578        // Should not match
1579        let merged = strategy
1580            .params
1581            .get_field_params("messages_log", &DataType::Utf8);
1582        assert_eq!(merged.compression, None);
1583    }
1584
1585    #[test]
1586    fn test_legacy_metadata_support() {
1587        let params = CompressionParams::new();
1588        let strategy = DefaultCompressionStrategy::with_params(params);
1589
1590        // Test field with "none" compression metadata
1591        let mut metadata = HashMap::new();
1592        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1593        let mut field = create_test_field("some_column", DataType::Int32);
1594        field.metadata = metadata;
1595
1596        let data = create_fixed_width_block(32, 1000);
1597        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1598
1599        // Should respect metadata and use ValueEncoder
1600        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1601    }
1602
1603    #[test]
1604    fn test_default_behavior() {
1605        // Empty params should fall back to default behavior
1606        let params = CompressionParams::new();
1607        let strategy = DefaultCompressionStrategy::with_params(params);
1608
1609        let field = create_test_field("random_column", DataType::Int32);
1610        // Create data with high run count that won't trigger RLE (600 runs for 1000 values = 0.6 ratio)
1611        let data = create_fixed_width_block_with_stats(32, 1000, 600);
1612
1613        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1614        // Should use default strategy's decision
1615        let debug_str = format!("{:?}", compressor);
1616        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
1617    }
1618
1619    #[test]
1620    fn test_field_metadata_compression() {
1621        let params = CompressionParams::new();
1622        let strategy = DefaultCompressionStrategy::with_params(params);
1623
1624        // Test field with compression metadata
1625        let mut metadata = HashMap::new();
1626        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
1627        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
1628        let mut field = create_test_field("test_column", DataType::Int32);
1629        field.metadata = metadata;
1630
1631        let data = create_fixed_width_block(32, 1000);
1632        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1633
1634        // Should use zstd with level 6
1635        let debug_str = format!("{:?}", compressor);
1636        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1637    }
1638
1639    #[test]
1640    fn test_field_metadata_rle_threshold() {
1641        let params = CompressionParams::new();
1642        let strategy = DefaultCompressionStrategy::with_params(params);
1643
1644        // Test field with RLE threshold metadata
1645        let mut metadata = HashMap::new();
1646        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
1647        metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); // Disable BSS to test RLE
1648        let mut field = create_test_field("test_column", DataType::Int32);
1649        field.metadata = metadata;
1650
1651        // Create data with low run count (e.g., 100 runs for 1000 values = 0.1 ratio)
1652        // This ensures run_count (100) < num_values * threshold (1000 * 0.8 = 800)
1653        let data = create_fixed_width_block_with_stats(32, 1000, 100);
1654
1655        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1656
1657        // Should use RLE because run_count (100) < num_values * threshold (800)
1658        let debug_str = format!("{:?}", compressor);
1659        assert!(debug_str.contains("RleEncoder"));
1660    }
1661
1662    #[test]
1663    fn test_field_metadata_override_params() {
1664        // Set up params with one configuration
1665        let mut params = CompressionParams::new();
1666        params.columns.insert(
1667            "test_column".to_string(),
1668            CompressionFieldParams {
1669                rle_threshold: Some(0.3),
1670                compression: Some("lz4".to_string()),
1671                compression_level: None,
1672                bss: None,
1673                minichunk_size: None,
1674            },
1675        );
1676
1677        let strategy = DefaultCompressionStrategy::with_params(params);
1678
1679        // Field metadata should override params
1680        let mut metadata = HashMap::new();
1681        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1682        let mut field = create_test_field("test_column", DataType::Int32);
1683        field.metadata = metadata;
1684
1685        let data = create_fixed_width_block(32, 1000);
1686        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1687
1688        // Should use none compression (from metadata) instead of lz4 (from params)
1689        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1690    }
1691
1692    #[test]
1693    fn test_field_metadata_mixed_configuration() {
1694        // Configure type-level params
1695        let mut params = CompressionParams::new();
1696        params.types.insert(
1697            "Int32".to_string(),
1698            CompressionFieldParams {
1699                rle_threshold: Some(0.5),
1700                compression: Some("lz4".to_string()),
1701                ..Default::default()
1702            },
1703        );
1704
1705        let strategy = DefaultCompressionStrategy::with_params(params);
1706
1707        // Field metadata provides partial override
1708        let mut metadata = HashMap::new();
1709        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1710        let mut field = create_test_field("test_column", DataType::Int32);
1711        field.metadata = metadata;
1712
1713        let data = create_fixed_width_block(32, 1000);
1714        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1715
1716        // Should use lz4 (from type params) with level 3 (from metadata)
1717        let debug_str = format!("{:?}", compressor);
1718        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1719    }
1720
1721    #[test]
1722    fn test_bss_field_metadata() {
1723        let params = CompressionParams::new();
1724        let strategy = DefaultCompressionStrategy::with_params(params);
1725
1726        // Test BSS "on" mode with compression enabled (BSS requires compression to be effective)
1727        let mut metadata = HashMap::new();
1728        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1729        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1730        let arrow_field =
1731            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1732        let field = Field::try_from(&arrow_field).unwrap();
1733
1734        // Create float data
1735        let data = create_fixed_width_block(32, 100);
1736
1737        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1738        let debug_str = format!("{:?}", compressor);
1739        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1740    }
1741
1742    #[test]
1743    fn test_bss_with_compression() {
1744        let params = CompressionParams::new();
1745        let strategy = DefaultCompressionStrategy::with_params(params);
1746
1747        // Test BSS with LZ4 compression
1748        let mut metadata = HashMap::new();
1749        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1750        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1751        let arrow_field =
1752            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1753        let field = Field::try_from(&arrow_field).unwrap();
1754
1755        // Create double data
1756        let data = create_fixed_width_block(64, 100);
1757
1758        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1759        let debug_str = format!("{:?}", compressor);
1760        // Should have BSS wrapped in general compression
1761        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1762        assert!(debug_str.contains("ByteStreamSplitEncoder"));
1763    }
1764
1765    #[test]
1766    #[cfg(any(feature = "lz4", feature = "zstd"))]
1767    fn test_general_block_decompression_fixed_width_v2_2() {
1768        // Request general compression via the write path (2.2 requirement) and ensure the read path mirrors it.
1769        let mut params = CompressionParams::new();
1770        params.columns.insert(
1771            "dict_values".to_string(),
1772            CompressionFieldParams {
1773                compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1774                ..Default::default()
1775            },
1776        );
1777
1778        let mut strategy = DefaultCompressionStrategy::with_params(params);
1779        strategy.version = LanceFileVersion::V2_2;
1780
1781        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1782        let data = create_fixed_width_block(24, 1024);
1783        let DataBlock::FixedWidth(expected_block) = &data else {
1784            panic!("expected fixed width block");
1785        };
1786        let expected_bits = expected_block.bits_per_value;
1787        let expected_num_values = expected_block.num_values;
1788        let num_values = expected_num_values;
1789
1790        let (compressor, encoding) = strategy
1791            .create_block_compressor(&field, &data)
1792            .expect("general compression should be selected");
1793        match encoding.compression.as_ref() {
1794            Some(Compression::General(_)) => {}
1795            other => panic!("expected general compression, got {:?}", other),
1796        }
1797
1798        let compressed_buffer = compressor
1799            .compress(data.clone())
1800            .expect("write path general compression should succeed");
1801
1802        let decompressor = DefaultDecompressionStrategy::default()
1803            .create_block_decompressor(&encoding)
1804            .expect("general block decompressor should be created");
1805
1806        let decoded = decompressor
1807            .decompress(compressed_buffer, num_values)
1808            .expect("decompression should succeed");
1809
1810        match decoded {
1811            DataBlock::FixedWidth(block) => {
1812                assert_eq!(block.bits_per_value, expected_bits);
1813                assert_eq!(block.num_values, expected_num_values);
1814                assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
1815            }
1816            _ => panic!("expected fixed width block"),
1817        }
1818    }
1819
1820    #[test]
1821    #[cfg(any(feature = "lz4", feature = "zstd"))]
1822    fn test_general_compression_not_selected_for_v2_1_even_if_requested() {
1823        let mut params = CompressionParams::new();
1824        params.columns.insert(
1825            "dict_values".to_string(),
1826            CompressionFieldParams {
1827                compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1828                ..Default::default()
1829            },
1830        );
1831
1832        let strategy =
1833            DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_1);
1834        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1835        let data = create_fixed_width_block(24, 1024);
1836
1837        let (_compressor, encoding) = strategy
1838            .create_block_compressor(&field, &data)
1839            .expect("block compressor selection should succeed");
1840
1841        assert!(
1842            !matches!(encoding.compression.as_ref(), Some(Compression::General(_))),
1843            "general compression should not be selected for V2.1"
1844        );
1845    }
1846
1847    #[test]
1848    fn test_none_compression_disables_auto_general_block_compression() {
1849        let mut params = CompressionParams::new();
1850        params.columns.insert(
1851            "dict_values".to_string(),
1852            CompressionFieldParams {
1853                compression: Some("none".to_string()),
1854                ..Default::default()
1855            },
1856        );
1857
1858        let strategy =
1859            DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_2);
1860        let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1861        let data = create_fixed_width_block(24, 20_000);
1862
1863        assert!(
1864            data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION,
1865            "test requires block size above automatic general compression threshold"
1866        );
1867
1868        let (_compressor, encoding) = strategy
1869            .create_block_compressor(&field, &data)
1870            .expect("block compressor selection should succeed");
1871
1872        assert!(
1873            !matches!(encoding.compression.as_ref(), Some(Compression::General(_))),
1874            "compression=none should disable automatic block general compression"
1875        );
1876    }
1877
1878    #[test]
1879    fn test_rle_block_used_for_version_v2_2() {
1880        let field = create_test_field("test_repdef", DataType::UInt16);
1881
1882        // Create highly repetitive data
1883        let num_values = 1000u64;
1884        let mut data = Vec::with_capacity(num_values as usize);
1885        for i in 0..10 {
1886            for _ in 0..100 {
1887                data.push(i as u16);
1888            }
1889        }
1890
1891        let mut block = FixedWidthDataBlock {
1892            bits_per_value: 16,
1893            data: LanceBuffer::reinterpret_vec(data),
1894            num_values,
1895            block_info: BlockInfo::default(),
1896        };
1897
1898        block.compute_stat();
1899
1900        let data_block = DataBlock::FixedWidth(block);
1901
1902        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1903            .with_version(LanceFileVersion::V2_2);
1904
1905        let (compressor, _) = strategy
1906            .create_block_compressor(&field, &data_block)
1907            .unwrap();
1908
1909        let debug_str = format!("{:?}", compressor);
1910        assert!(debug_str.contains("RleEncoder"));
1911    }
1912
1913    #[test]
1914    fn test_rle_block_not_used_for_version_v2_1() {
1915        let field = create_test_field("test_repdef", DataType::UInt16);
1916
1917        // Create highly repetitive data
1918        let num_values = 1000u64;
1919        let mut data = Vec::with_capacity(num_values as usize);
1920        for i in 0..10 {
1921            for _ in 0..100 {
1922                data.push(i as u16);
1923            }
1924        }
1925
1926        let mut block = FixedWidthDataBlock {
1927            bits_per_value: 16,
1928            data: LanceBuffer::reinterpret_vec(data),
1929            num_values,
1930            block_info: BlockInfo::default(),
1931        };
1932
1933        block.compute_stat();
1934
1935        let data_block = DataBlock::FixedWidth(block);
1936
1937        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1938            .with_version(LanceFileVersion::V2_1);
1939
1940        let (compressor, _) = strategy
1941            .create_block_compressor(&field, &data_block)
1942            .unwrap();
1943
1944        let debug_str = format!("{:?}", compressor);
1945        assert!(
1946            !debug_str.contains("RleEncoder"),
1947            "RLE should not be used for V2.1"
1948        );
1949    }
1950}