lance_encoding/encodings/physical/general.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
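
//! General-purpose compression wrappers for miniblock encodings: a
//! [`GeneralMiniBlockCompressor`] that applies LZ4 or Zstd on top of an inner
//! miniblock compressor, and a matching [`GeneralMiniBlockDecompressor`] for
//! the read path.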

use log::trace;

use crate::{
    buffer::LanceBuffer,
    compression::MiniBlockDecompressor,
    data::DataBlock,
    encodings::{
        logical::primitive::miniblock::{MiniBlockCompressed, MiniBlockCompressor},
        physical::block::{CompressionConfig, GeneralBufferCompressor},
    },
    format::{pb, ProtobufUtils},
    Result,
};

/// A miniblock compressor that wraps another miniblock compressor and applies
/// general-purpose compression (LZ4, Zstd) to the resulting buffers.
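///
/// A minimal usage sketch (illustrative only; assumes a `DataBlock` named
/// `data_block` and that `ValueEncoder`, `CompressionConfig`, and
/// `CompressionScheme` are in scope):
///
/// ```ignore
/// let compressor = GeneralMiniBlockCompressor::new(
///     Box::new(ValueEncoder {}),
///     CompressionConfig {
///         scheme: CompressionScheme::Lz4,
///         level: None,
///     },
/// );
/// let (compressed, encoding) = compressor.compress(data_block)?;
/// ```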
#[derive(Debug)]
pub struct GeneralMiniBlockCompressor {
    inner: Box<dyn MiniBlockCompressor>,
    compression: CompressionConfig,
}

impl GeneralMiniBlockCompressor {
    pub fn new(inner: Box<dyn MiniBlockCompressor>, compression: CompressionConfig) -> Self {
        Self { inner, compression }
    }
}

/// Minimum size, in bytes, that the first buffer must reach before general-purpose compression is attempted
const MIN_BUFFER_SIZE_FOR_COMPRESSION: usize = 4 * 1024;

use super::super::logical::primitive::miniblock::MiniBlockChunk;

impl MiniBlockCompressor for GeneralMiniBlockCompressor {
    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
        // First, compress with the inner compressor
        let (inner_compressed, inner_encoding) = self.inner.compress(page)?;

        // Return the original encoding without compression if there's no data or
        // the first buffer is not large enough
        if inner_compressed.data.is_empty()
            || inner_compressed.data[0].len() < MIN_BUFFER_SIZE_FOR_COMPRESSION
        {
            return Ok((inner_compressed, inner_encoding));
        }

        // Compress each chunk's portion of the first buffer independently so that
        // each chunk can still be located and decompressed on its own at read time
        let first_buffer = &inner_compressed.data[0];
        let mut compressed_first_buffer = Vec::new();
        let mut new_chunks = Vec::with_capacity(inner_compressed.chunks.len());
        let mut offset = 0usize;
        let mut total_original_size = 0usize;

        let compressor = GeneralBufferCompressor::get_compressor(self.compression);

        for chunk in &inner_compressed.chunks {
            let chunk_first_buffer_size = chunk.buffer_sizes[0] as usize;

            let chunk_data = &first_buffer.as_ref()[offset..offset + chunk_first_buffer_size];
            total_original_size += chunk_first_buffer_size;

            let compressed_start = compressed_first_buffer.len();
            compressor.compress(chunk_data, &mut compressed_first_buffer)?;
            let compressed_size = compressed_first_buffer.len() - compressed_start;

            // Create new chunk with updated first buffer size
            let mut new_buffer_sizes = chunk.buffer_sizes.clone();
            new_buffer_sizes[0] = compressed_size as u16;

            new_chunks.push(MiniBlockChunk {
                buffer_sizes: new_buffer_sizes,
                log_num_values: chunk.log_num_values,
            });

            offset += chunk_first_buffer_size;
        }

        // Check if compression was effective
        let compressed_total_size = compressed_first_buffer.len();
        if compressed_total_size >= total_original_size {
            // Compression didn't help, return original
            return Ok((inner_compressed, inner_encoding));
        }

        trace!(
            "First buffer compressed from {} to {} bytes (ratio: {:.2})",
            total_original_size,
            compressed_total_size,
            compressed_total_size as f32 / total_original_size as f32
        );

        // Build final buffers: compressed first buffer + remaining original buffers
        let mut final_buffers = vec![LanceBuffer::from(compressed_first_buffer)];
        final_buffers.extend(inner_compressed.data.into_iter().skip(1));

        let compressed_result = MiniBlockCompressed {
            data: final_buffers,
            chunks: new_chunks,
            num_values: inner_compressed.num_values,
        };

        // Wrap the inner encoding to record that general-purpose compression was applied
        let encoding = ProtobufUtils::general_mini_block(inner_encoding, self.compression);
        Ok((compressed_result, encoding))
    }
}

/// A miniblock decompressor that first decompresses buffers using general-purpose
/// compression (LZ4, Zstd) and then delegates to an inner miniblock decompressor.
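///
/// A minimal usage sketch (illustrative only; assumes `inner_decompressor`,
/// `compression`, and the chunk's `buffers`/`num_values` are already in scope):
///
/// ```ignore
/// let decompressor = GeneralMiniBlockDecompressor::new(inner_decompressor, compression);
/// let block = decompressor.decompress(buffers, num_values)?;
/// ```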
#[derive(Debug)]
pub struct GeneralMiniBlockDecompressor {
    inner: Box<dyn MiniBlockDecompressor>,
    compression: CompressionConfig,
}

impl GeneralMiniBlockDecompressor {
    pub fn new(inner: Box<dyn MiniBlockDecompressor>, compression: CompressionConfig) -> Self {
        Self { inner, compression }
    }
}

impl MiniBlockDecompressor for GeneralMiniBlockDecompressor {
    fn decompress(&self, mut data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        // Only the first buffer is compressed at write time, so only `data[0]` needs
        // general-purpose decompression; any remaining buffers pass through unchanged.
        let mut decompressed_buffer = Vec::new();

        let decompressor = GeneralBufferCompressor::get_compressor(self.compression);
        decompressor.decompress(&data[0], &mut decompressed_buffer)?;
        data[0] = LanceBuffer::from(decompressed_buffer);

        self.inner.decompress(data, num_values)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
    use crate::data::{BlockInfo, FixedWidthDataBlock};
    use crate::encodings::physical::block::CompressionScheme;
    use crate::encodings::physical::rle::RleMiniBlockEncoder;
    use crate::encodings::physical::value::ValueEncoder;
    use arrow_array::{Float64Array, Int32Array};

    #[derive(Debug)]
    struct TestCase {
        name: &'static str,
        inner_encoder: Box<dyn MiniBlockCompressor>,
        compression: CompressionConfig,
        data: DataBlock,
        expected_compressed: bool, // Whether we expect compression to be applied
        min_compression_ratio: f32, // Compressed/original size must not exceed this when compression is applied
    }

    fn create_test_cases() -> Vec<TestCase> {
        vec![
            // Small data with RLE - should not compress due to size threshold
            TestCase {
                name: "small_rle_data",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2]),
                expected_compressed: false,
                min_compression_ratio: 1.0,
            },
            // Large repeated data with RLE + LZ4
            TestCase {
                name: "large_rle_lz4",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_pattern_i32_block(2048, |i| (i / 8) as i32),
                expected_compressed: false, // RLE already compresses well, additional LZ4 may not help
                min_compression_ratio: 1.0,
            },
            // Large repeated data with RLE + Zstd
            TestCase {
                name: "large_rle_zstd",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Zstd,
                    level: Some(3),
                },
                data: create_pattern_i32_block(8192, |i| (i / 16) as i32),
                expected_compressed: true, // Zstd might provide additional compression
                min_compression_ratio: 0.9, // But not as much since RLE already compressed
            },
            // Sequential data with ValueEncoder + LZ4
            TestCase {
                name: "sequential_value_lz4",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_pattern_i32_block(1024, |i| i as i32),
                expected_compressed: false, // Sequential data doesn't compress well
                min_compression_ratio: 1.0,
            },
            // Float data with ValueEncoder + Zstd
            TestCase {
                name: "float_value_zstd",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Zstd,
                    level: Some(3),
                },
                data: create_pattern_f64_block(1024, |i| i as f64 * 0.1),
                expected_compressed: true,
                min_compression_ratio: 0.9,
            },
        ]
    }

    fn create_repeated_i32_block(values: Vec<i32>) -> DataBlock {
        let array = Int32Array::from(values);
        DataBlock::from_array(array)
    }

    fn create_pattern_i32_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> i32,
    {
        let values: Vec<i32> = (0..size).map(pattern).collect();
        let array = Int32Array::from(values);
        DataBlock::from_array(array)
    }

    fn create_pattern_f64_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> f64,
    {
        let values: Vec<f64> = (0..size).map(pattern).collect();
        let array = Float64Array::from(values);
        DataBlock::from_array(array)
    }

    fn run_round_trip_test(test_case: TestCase) {
        let compressor =
            GeneralMiniBlockCompressor::new(test_case.inner_encoder, test_case.compression);

        // Compress the data
        let (compressed, encoding) = compressor.compress(test_case.data).unwrap();

        // Check if compression was applied as expected
        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(
                    test_case.expected_compressed,
                    "{}: Compression was applied but was not expected",
                    test_case.name
                );
                assert_eq!(
                    cm.compression.as_ref().unwrap().scheme,
                    test_case.compression.scheme.to_string()
                );
            }
            _ => {
                // Could be RLE or other encoding if compression didn't help
                if test_case.expected_compressed {
                    // Check if it's RLE encoding (which means compression didn't help)
                    match &encoding.array_encoding {
                        Some(pb::array_encoding::ArrayEncoding::Rle(_)) => {
                            // RLE encoding returned - compression didn't help
                        }
                        Some(pb::array_encoding::ArrayEncoding::Flat(_)) => {
                            // Flat encoding returned - compression didn't help
                        }
                        _ => {
                            panic!(
                                "{}: Expected GeneralMiniBlock but got {:?}",
                                test_case.name, encoding.array_encoding
                            );
                        }
                    }
                }
            }
        }

        // Verify chunks are created correctly
        assert!(
            !compressed.chunks.is_empty(),
            "{}: No chunks created",
            test_case.name
        );

        // Test decompression by simulating the actual miniblock decoding process
        let decompressed_data = decompress_miniblock_chunks(&compressed, &encoding);

        // Verify round trip by checking data size
        // We expect the decompressed data to match the original number of values
        // The bytes per value depends on the test case
        let bytes_per_value = if test_case.name.contains("float") {
            8 // f64
        } else {
            4 // i32
        };
        let expected_bytes = compressed.num_values as usize * bytes_per_value;
        assert_eq!(
            expected_bytes,
            decompressed_data.len(),
            "{}: Data size mismatch",
            test_case.name
        );

        // Check compression ratio if applicable
        if test_case.expected_compressed {
            let compression_ratio = compressed.data[0].len() as f32 / expected_bytes as f32;
            assert!(
                compression_ratio <= test_case.min_compression_ratio,
                "{}: Compression ratio {} exceeds allowed {}",
                test_case.name,
                compression_ratio,
                test_case.min_compression_ratio
            );
        }
    }

    fn decompress_miniblock_chunks(
        compressed: &MiniBlockCompressed,
        encoding: &pb::ArrayEncoding,
    ) -> Vec<u8> {
        let mut decompressed_data = Vec::new();
        let mut offsets = vec![0usize; compressed.data.len()]; // Track offset for each buffer
        let decompression_strategy = DefaultDecompressionStrategy::default();

        for chunk in &compressed.chunks {
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                // Last chunk - calculate remaining values
                let decompressed_values =
                    decompressed_data.len() as u64 / get_bytes_per_value(compressed) as u64;
                compressed.num_values.saturating_sub(decompressed_values)
            };

            // Extract buffers for this chunk
            let mut chunk_buffers = Vec::new();
            for (i, &size) in chunk.buffer_sizes.iter().enumerate() {
                if i < compressed.data.len() {
                    let buffer_data =
                        compressed.data[i].slice_with_length(offsets[i], size as usize);
                    chunk_buffers.push(buffer_data);
                    offsets[i] += size as usize;
                }
            }

            // Create a decompressor for this chunk
            let decompressor = decompression_strategy
                .create_miniblock_decompressor(encoding, &decompression_strategy)
                .unwrap();

            // Decompress the chunk
            let chunk_decompressed = decompressor
                .decompress(chunk_buffers, chunk_values)
                .unwrap();

            match chunk_decompressed {
                DataBlock::FixedWidth(ref block) => {
                    decompressed_data.extend_from_slice(block.data.as_ref());
                }
                _ => panic!("Expected FixedWidth block"),
            }
        }

        decompressed_data
    }

    fn get_bytes_per_value(compressed: &MiniBlockCompressed) -> usize {
        // This is a simplification - in reality we'd need to know the data type
        // For our tests, we mostly use i32 (4 bytes) or f64 (8 bytes)
        // We can try to guess based on the number of values
        if compressed.num_values == 0 {
            return 4; // Default to i32
        }

        // For float tests, the number is usually 1024 and we use f64
        if compressed.num_values == 1024 {
            return 8; // Likely f64
        }

        4 // Default to i32
    }

    #[test]
    fn test_compressed_mini_block_table_driven() {
        for test_case in create_test_cases() {
            run_round_trip_test(test_case);
        }
    }

    #[test]
    fn test_compressed_mini_block_threshold() {
        // Test that small buffers don't get compressed
        let small_test = TestCase {
            name: "small_buffer_no_compression",
            inner_encoder: Box::new(RleMiniBlockEncoder),
            compression: CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
            data: create_repeated_i32_block(vec![1, 1, 2, 2]),
            expected_compressed: false,
            min_compression_ratio: 1.0,
        };
        run_round_trip_test(small_test);
    }

    #[test]
    fn test_compressed_mini_block_with_doubles() {
        // Test with a large block of repetitive doubles that should compress well with Zstd
        // The test focuses on verifying that GeneralMiniBlock works correctly
        // when wrapping a simple ValueEncoder
        let test_case = TestCase {
            name: "float_values_with_zstd",
            inner_encoder: Box::new(ValueEncoder {}),
            compression: CompressionConfig {
                scheme: CompressionScheme::Zstd,
                level: Some(3),
            },
            // Create enough data to ensure compression is applied
            data: create_pattern_f64_block(1024, |i| (i / 10) as f64),
            expected_compressed: true,
            min_compression_ratio: 0.5, // Zstd should achieve good compression on repetitive data
        };

        run_round_trip_test(test_case);
    }

    #[test]
    fn test_compressed_mini_block_with_zstd() {
        // Create test data with many unique values to ensure larger buffers
        // RLE is efficient with runs, so we need more unique values
        let mut values = Vec::with_capacity(8192);
        // Create 512 unique values, each repeated 16 times
        for i in 0..512 {
            values.extend(vec![i as u16; 16]);
        }
        let data = LanceBuffer::from_bytes(
            bytemuck::cast_slice(&values).to_vec().into(),
            std::mem::align_of::<u16>() as u64,
        );
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 16,
            data,
            num_values: 8192,
            block_info: BlockInfo::new(),
        });

        // Create compressor with RLE encoder and Zstd compression
        let inner = Box::new(RleMiniBlockEncoder);
        let compression = CompressionConfig {
            scheme: CompressionScheme::Zstd,
            level: Some(3),
        };
        let compressor = GeneralMiniBlockCompressor::new(inner, compression);

        // Compress the data
        let (compressed, encoding) = compressor.compress(block).unwrap();

        // Verify the encoding structure
        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(cm.inner.is_some());
                assert_eq!(cm.compression.as_ref().unwrap().scheme, "zstd");
                assert_eq!(cm.compression.as_ref().unwrap().level, Some(3));
            }
            Some(pb::array_encoding::ArrayEncoding::Rle(_)) => {}
            _ => panic!("Expected GeneralMiniBlock or Rle encoding"),
        }

        // Verify basic properties
        assert_eq!(compressed.num_values, 8192);
        assert_eq!(compressed.data.len(), 2);
    }

    #[test]
    fn test_compressed_mini_block_large_buffers() {
        // Use value encoding which doesn't compress data, ensuring large buffers
        // Create 1024 i32 values (4KB of data)
        let values: Vec<i32> = (0..1024).collect();
        let data = LanceBuffer::from_bytes(
            bytemuck::cast_slice(&values).to_vec().into(),
            std::mem::align_of::<i32>() as u64,
        );
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data,
            num_values: 1024,
            block_info: BlockInfo::new(),
        });

        // Create compressor with ValueEncoder (no compression) and Zstd wrapper
        let inner = Box::new(ValueEncoder {});
        let compression = CompressionConfig {
            scheme: CompressionScheme::Zstd,
            level: Some(3),
        };
        let compressor = GeneralMiniBlockCompressor::new(inner, compression);

        // Compress the data
        let (compressed, encoding) = compressor.compress(block).unwrap();

        // Should get GeneralMiniBlock encoding since the buffer meets the 4 KiB threshold
        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(cm.inner.is_some());
                assert_eq!(cm.compression.as_ref().unwrap().scheme, "zstd");
                assert_eq!(cm.compression.as_ref().unwrap().level, Some(3));

                // Verify inner encoding is Flat (from ValueEncoder)
                match &cm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Flat(flat)) => {
                        assert_eq!(flat.bits_per_value, 32);
                    }
                    _ => panic!("Expected Flat inner encoding"),
                }
            }
            _ => panic!("Expected GeneralMiniBlock encoding"),
        }

        assert_eq!(compressed.num_values, 1024);
        // ValueEncoder produces 1 buffer, so compressed result also has 1 buffer
        assert_eq!(compressed.data.len(), 1);
    }

    #[test]
    fn test_compressed_mini_block_with_lz4() {
        // Create test data with repeated patterns that LZ4 can compress well
        let mut values = Vec::with_capacity(2048);
        // Create a pattern with some repetition
        for i in 0..256i32 {
            for _ in 0..8 {
                values.push(i);
            }
        }

        let data = LanceBuffer::from_bytes(
            bytemuck::cast_slice(&values).to_vec().into(),
            std::mem::align_of::<i32>() as u64,
        );
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data,
            num_values: 2048,
            block_info: BlockInfo::new(),
        });

        // Create compressor with ValueEncoder and LZ4 compression
        let inner = Box::new(ValueEncoder {});
        let compression = CompressionConfig {
            scheme: CompressionScheme::Lz4,
            level: None,
        };
        let compressor = GeneralMiniBlockCompressor::new(inner, compression);

        // Compress the data
        let (compressed, encoding) = compressor.compress(block).unwrap();

        // Should get GeneralMiniBlock encoding
        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(cm.inner.is_some());
                assert_eq!(cm.compression.as_ref().unwrap().scheme, "lz4");
                assert_eq!(cm.compression.as_ref().unwrap().level, None);

                // Verify inner encoding is Flat (from ValueEncoder)
                match &cm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Flat(flat)) => {
                        assert_eq!(flat.bits_per_value, 32);
                    }
                    _ => panic!("Expected Flat inner encoding"),
                }
            }
            _ => panic!("Expected GeneralMiniBlock encoding"),
        }

        assert_eq!(compressed.num_values, 2048);
        // ValueEncoder produces 1 buffer, so compressed result also has 1 buffer
        assert_eq!(compressed.data.len(), 1);

        // Test decompression
        // In real usage, miniblock format will decompress each chunk separately
        // We need to simulate this by decompressing each chunk and then combining them

        let mut decompressed_data = Vec::new();
        let mut offset = 0usize;

        for chunk in &compressed.chunks {
            let chunk_size = chunk.buffer_sizes[0] as usize;
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                // Last chunk - remaining values (4 bytes per i32)
                compressed.num_values - (decompressed_data.len() as u64 / 4)
            };

            // Extract this chunk's compressed data
            let chunk_compressed_data = compressed.data[0].slice_with_length(offset, chunk_size);

            // Create a decompressor for this chunk
            let decompression_strategy = DefaultDecompressionStrategy::default();
            let decompressor = decompression_strategy
                .create_miniblock_decompressor(&encoding, &decompression_strategy)
                .unwrap();

            // Decompress the chunk
            let chunk_decompressed = decompressor
                .decompress(vec![chunk_compressed_data], chunk_values)
                .unwrap();

            match chunk_decompressed {
                DataBlock::FixedWidth(ref block) => {
                    decompressed_data.extend_from_slice(block.data.as_ref());
                }
                _ => panic!("Expected FixedWidth block"),
            }

            offset += chunk_size;
        }

        // Verify the round trip
        let decompressed_values: &[i32] = bytemuck::cast_slice(&decompressed_data);
        assert_eq!(values.len(), decompressed_values.len());
        assert_eq!(values, decompressed_values);
    }

    // Special test cases that don't fit the table-driven pattern

    #[test]
    fn test_compressed_mini_block_rle_multiple_buffers() {
        // RLE produces 2 buffers (values and lengths), test that both are handled correctly
        let data = create_repeated_i32_block(vec![1; 100]);
        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (compressed, _) = compressor.compress(data).unwrap();
        // RLE produces 2 buffers, but only the first one is compressed
        assert_eq!(compressed.data.len(), 2);
    }

    #[test]
    fn test_rle_with_general_miniblock_wrapper() {
        // Test that RLE data with bits_per_value >= 32 can be wrapped in
        // GeneralMiniBlock with LZ4 compression

        // Test case 1: 32-bit RLE data
        let test_32 = TestCase {
            name: "rle_32bit_with_general_wrapper",
            inner_encoder: Box::new(RleMiniBlockEncoder),
            compression: CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
            data: create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]),
            expected_compressed: false, // RLE already compresses well, LZ4 might not help much
            min_compression_ratio: 1.0,
        };

        // Directly exercise the wrapper with the 32-bit test data
        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (_compressed, encoding) = compressor.compress(test_32.data).unwrap();

        // Verify the encoding structure
        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(gm)) => {
                // Check inner encoding is RLE
                match &gm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Rle(rle)) => {
                        assert_eq!(rle.bits_per_value, 32);
                    }
                    _ => panic!("Expected RLE as inner encoding"),
                }
                // Check compression is LZ4
                assert_eq!(gm.compression.as_ref().unwrap().scheme, "lz4");
            }
            Some(pb::array_encoding::ArrayEncoding::Rle(_)) => {
                // Also acceptable if compression didn't help
            }
            _ => panic!("Expected GeneralMiniBlock or Rle encoding"),
        }

        // Test case 2: 64-bit RLE data
        let values_64: Vec<i64> = vec![100i64; 50]
            .into_iter()
            .chain(vec![200i64; 50])
            .chain(vec![300i64; 50])
            .collect();
        let array_64 = arrow_array::Int64Array::from(values_64);
        let block_64 = DataBlock::from_array(array_64);

        let compressor_64 = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (_compressed_64, encoding_64) = compressor_64.compress(block_64).unwrap();

        // Verify the encoding structure for 64-bit
        match &encoding_64.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(gm)) => {
                // Check inner encoding is RLE
                match &gm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Rle(rle)) => {
                        assert_eq!(rle.bits_per_value, 64);
                    }
                    _ => panic!("Expected RLE as inner encoding for 64-bit"),
                }
                // Check compression is LZ4
                assert_eq!(gm.compression.as_ref().unwrap().scheme, "lz4");
            }
            Some(pb::array_encoding::ArrayEncoding::Rle(_)) => {
                // Also acceptable if compression didn't help
            }
            _ => panic!("Expected GeneralMiniBlock or Rle encoding for 64-bit"),
        }
    }

    #[test]
    fn test_compressed_mini_block_empty_data() {
        let empty_array = Int32Array::from(vec![] as Vec<i32>);
        let empty_block = DataBlock::from_array(empty_array);

        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(ValueEncoder {}),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let result = compressor.compress(empty_block);
        match result {
            Ok((compressed, _)) => {
                assert_eq!(compressed.num_values, 0);
            }
            Err(_) => {
                // Empty data might not be supported by ValueEncoder
            }
        }
    }
}