lance_encoding/encodings/physical/general.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use log::trace;
5
6use crate::{
7    buffer::LanceBuffer,
8    compression::MiniBlockDecompressor,
9    data::DataBlock,
10    encodings::{
11        logical::primitive::miniblock::{MiniBlockCompressed, MiniBlockCompressor},
12        physical::block::{CompressionConfig, GeneralBufferCompressor},
13    },
14    format::{pb, ProtobufUtils},
15    Result,
16};
17
18/// A miniblock compressor that wraps another miniblock compressor and applies
19/// general-purpose compression (LZ4, Zstd) to the resulting buffers.
20#[derive(Debug)]
21pub struct GeneralMiniBlockCompressor {
22    inner: Box<dyn MiniBlockCompressor>,
23    compression: CompressionConfig,
24}
25
26impl GeneralMiniBlockCompressor {
27    pub fn new(inner: Box<dyn MiniBlockCompressor>, compression: CompressionConfig) -> Self {
28        Self { inner, compression }
29    }
30}
31
/// Smallest size, in bytes, that the inner compressor's first buffer must reach before
/// general-purpose compression is attempted; smaller outputs are passed through as-is.
const MIN_BUFFER_SIZE_FOR_COMPRESSION: usize = 4 * 1024;
34
35use super::super::logical::primitive::miniblock::MiniBlockChunk;
36
37impl MiniBlockCompressor for GeneralMiniBlockCompressor {
38    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
39        // First, compress with the inner compressor
40        let (inner_compressed, inner_encoding) = self.inner.compress(page)?;
41
42        // Return the original encoding without compression if there's no data or
43        // the first buffer is not large enough
44        if inner_compressed.data.is_empty()
45            || inner_compressed.data[0].len() < MIN_BUFFER_SIZE_FOR_COMPRESSION
46        {
47            return Ok((inner_compressed, inner_encoding));
48        }
49
50        // We'll compress each chunk's portion of the first buffer independently
51        let first_buffer = &inner_compressed.data[0];
52        let mut compressed_first_buffer = Vec::new();
53        let mut new_chunks = Vec::with_capacity(inner_compressed.chunks.iter().len());
54        let mut offset = 0usize;
55        let mut total_original_size = 0usize;
56
57        let compressor = GeneralBufferCompressor::get_compressor(self.compression);
58
59        for chunk in &inner_compressed.chunks {
60            let chunk_first_buffer_size = chunk.buffer_sizes[0] as usize;
61
62            let chunk_data = &first_buffer.as_ref()[offset..offset + chunk_first_buffer_size];
63            total_original_size += chunk_first_buffer_size;
64
65            let compressed_start = compressed_first_buffer.len();
66            compressor.compress(chunk_data, &mut compressed_first_buffer)?;
67            let compressed_size = compressed_first_buffer.len() - compressed_start;
68
69            // Create new chunk with updated first buffer size
70            let mut new_buffer_sizes = chunk.buffer_sizes.clone();
71            new_buffer_sizes[0] = compressed_size as u16;
72
73            new_chunks.push(MiniBlockChunk {
74                buffer_sizes: new_buffer_sizes,
75                log_num_values: chunk.log_num_values,
76            });
77
78            offset += chunk_first_buffer_size;
79        }
80
81        // Check if compression was effective
82        let compressed_total_size = compressed_first_buffer.len();
83        if compressed_total_size >= total_original_size {
84            // Compression didn't help, return original
85            return Ok((inner_compressed, inner_encoding));
86        }
87
88        trace!(
89            "First buffer compressed from {} to {} bytes (ratio: {:.2})",
90            total_original_size,
91            compressed_total_size,
92            compressed_total_size as f32 / total_original_size as f32
93        );
94
95        // Build final buffers: compressed first buffer + remaining original buffers
96        let mut final_buffers = vec![LanceBuffer::from(compressed_first_buffer)];
97        final_buffers.extend(inner_compressed.data.into_iter().skip(1));
98
99        let compressed_result = MiniBlockCompressed {
100            data: final_buffers,
101            chunks: new_chunks,
102            num_values: inner_compressed.num_values,
103        };
104
105        // Return compressed encoding
106        let encoding = ProtobufUtils::general_mini_block(inner_encoding, self.compression);
107        Ok((compressed_result, encoding))
108    }
109}
110
111/// A miniblock decompressor that first decompresses buffers using general-purpose
112/// compression (LZ4, Zstd) and then delegates to an inner miniblock decompressor.
113#[derive(Debug)]
114pub struct GeneralMiniBlockDecompressor {
115    inner: Box<dyn MiniBlockDecompressor>,
116    compression: CompressionConfig,
117}
118
119impl GeneralMiniBlockDecompressor {
120    pub fn new(inner: Box<dyn MiniBlockDecompressor>, compression: CompressionConfig) -> Self {
121        Self { inner, compression }
122    }
123}
124
125impl MiniBlockDecompressor for GeneralMiniBlockDecompressor {
126    fn decompress(&self, mut data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
127        let mut decompressed_buffer = Vec::new();
128
129        let decompressor = GeneralBufferCompressor::get_compressor(self.compression);
130        decompressor.decompress(&data[0], &mut decompressed_buffer)?;
131        data[0] = LanceBuffer::from(decompressed_buffer);
132
133        self.inner.decompress(data, num_values)
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
    use crate::data::{BlockInfo, FixedWidthDataBlock};
    use crate::encodings::physical::block::CompressionScheme;
    use crate::encodings::physical::rle::RleMiniBlockEncoder;
    use crate::encodings::physical::value::ValueEncoder;
    use arrow_array::{Float64Array, Int32Array};

    /// One table-driven round-trip scenario for `GeneralMiniBlockCompressor`.
    #[derive(Debug)]
    struct TestCase {
        name: &'static str,
        inner_encoder: Box<dyn MiniBlockCompressor>,
        compression: CompressionConfig,
        data: DataBlock,
        /// Whether we expect general-purpose compression to be applied.
        expected_compressed: bool,
        /// Upper bound on `compressed first-buffer bytes / uncompressed value bytes`,
        /// checked only when `expected_compressed` is true.
        max_compression_ratio: f32,
    }

    fn create_test_cases() -> Vec<TestCase> {
        vec![
            // Small data with RLE - should not compress due to size threshold
            TestCase {
                name: "small_rle_data",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2]),
                expected_compressed: false,
                max_compression_ratio: 1.0,
            },
            // Large repeated data with RLE + LZ4
            TestCase {
                name: "large_rle_lz4",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_pattern_i32_block(2048, |i| (i / 8) as i32),
                expected_compressed: false, // RLE already compresses well, additional LZ4 may not help
                max_compression_ratio: 1.0,
            },
            // Large repeated data with RLE + Zstd
            TestCase {
                name: "large_rle_zstd",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Zstd,
                    level: Some(3),
                },
                data: create_pattern_i32_block(8192, |i| (i / 16) as i32),
                expected_compressed: true, // Zstd might provide additional compression
                max_compression_ratio: 0.9, // But not as much since RLE already compressed
            },
            // Sequential data with ValueEncoder + LZ4
            TestCase {
                name: "sequential_value_lz4",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_pattern_i32_block(1024, |i| i as i32),
                expected_compressed: false, // Sequential data doesn't compress well
                max_compression_ratio: 1.0,
            },
            // Float data with ValueEncoder + Zstd
            TestCase {
                name: "float_value_zstd",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Zstd,
                    level: Some(3),
                },
                data: create_pattern_f64_block(1024, |i| i as f64 * 0.1),
                expected_compressed: true,
                max_compression_ratio: 0.9,
            },
        ]
    }

    fn create_repeated_i32_block(values: Vec<i32>) -> DataBlock {
        let array = Int32Array::from(values);
        DataBlock::from_array(array)
    }

    fn create_pattern_i32_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> i32,
    {
        let values: Vec<i32> = (0..size).map(pattern).collect();
        let array = Int32Array::from(values);
        DataBlock::from_array(array)
    }

    fn create_pattern_f64_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> f64,
    {
        let values: Vec<f64> = (0..size).map(pattern).collect();
        let array = Float64Array::from(values);
        DataBlock::from_array(array)
    }

    /// Compresses `test_case.data`, checks the chosen encoding against the expectation,
    /// decodes every chunk again and verifies that the original bytes come back.
    fn run_round_trip_test(test_case: TestCase) {
        let TestCase {
            name,
            inner_encoder,
            compression,
            data,
            expected_compressed,
            max_compression_ratio,
        } = test_case;

        // Capture the value width and raw bytes of the input so the round trip can be
        // checked exactly instead of guessing the width from the test name.
        let (bytes_per_value, original_bytes) = match &data {
            DataBlock::FixedWidth(block) => {
                let bytes: &[u8] = block.data.as_ref();
                ((block.bits_per_value / 8) as usize, bytes.to_vec())
            }
            _ => panic!("{}: Test data must be a FixedWidth block", name),
        };

        let compressor = GeneralMiniBlockCompressor::new(inner_encoder, compression);
        let (compressed, encoding) = compressor.compress(data).unwrap();

        // Check if compression was applied as expected
        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(
                    expected_compressed,
                    "{}: Compression was applied but not expected",
                    name
                );
                assert_eq!(
                    cm.compression.as_ref().unwrap().scheme,
                    compression.scheme.to_string()
                );
            }
            Some(pb::array_encoding::ArrayEncoding::Rle(_))
            | Some(pb::array_encoding::ArrayEncoding::Flat(_)) => {
                // The wrapper fell back to the inner encoding because compression was
                // skipped or did not help.
            }
            other => {
                assert!(
                    !expected_compressed,
                    "{}: Expected GeneralMiniBlock but got {:?}",
                    name, other
                );
            }
        }

        assert!(!compressed.chunks.is_empty(), "{}: No chunks created", name);

        // Decode chunk by chunk, as the miniblock read path does.
        let (decompressed_data, values_decoded) =
            decompress_miniblock_chunks(&compressed, &encoding);

        assert_eq!(
            compressed.num_values, values_decoded,
            "{}: Value count mismatch",
            name
        );
        let expected_bytes = compressed.num_values as usize * bytes_per_value;
        assert_eq!(
            expected_bytes,
            decompressed_data.len(),
            "{}: Data size mismatch",
            name
        );
        assert_eq!(
            original_bytes, decompressed_data,
            "{}: Round-trip data mismatch",
            name
        );

        // Check compression ratio if applicable
        if expected_compressed {
            let compression_ratio = compressed.data[0].len() as f32 / expected_bytes as f32;
            assert!(
                compression_ratio <= max_compression_ratio,
                "{}: Compression ratio {} > expected {}",
                name,
                compression_ratio,
                max_compression_ratio
            );
        }
    }

    /// Decodes every chunk of `compressed` with the decompressor described by `encoding`.
    ///
    /// Returns the concatenated value bytes and the total number of values decoded.
    /// The final chunk (marked by `log_num_values == 0`) has its size derived from the
    /// page's total value count minus the values already decoded.
    fn decompress_miniblock_chunks(
        compressed: &MiniBlockCompressed,
        encoding: &pb::ArrayEncoding,
    ) -> (Vec<u8>, u64) {
        let decompression_strategy = DefaultDecompressionStrategy::default();
        let decompressor = decompression_strategy
            .create_miniblock_decompressor(encoding, &decompression_strategy)
            .unwrap();

        let mut decompressed_data = Vec::new();
        let mut values_decoded = 0u64;
        // Read position within each page buffer.
        let mut offsets = vec![0usize; compressed.data.len()];

        for chunk in &compressed.chunks {
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                compressed.num_values.saturating_sub(values_decoded)
            };

            // Slice this chunk's portion out of every page buffer.
            let chunk_buffers = chunk
                .buffer_sizes
                .iter()
                .zip(&compressed.data)
                .zip(offsets.iter_mut())
                .map(|((&size, buffer), offset)| {
                    let size = size as usize;
                    let slice = buffer.slice_with_length(*offset, size);
                    *offset += size;
                    slice
                })
                .collect::<Vec<_>>();

            let chunk_decompressed = decompressor
                .decompress(chunk_buffers, chunk_values)
                .unwrap();

            match chunk_decompressed {
                DataBlock::FixedWidth(block) => {
                    values_decoded += block.num_values;
                    decompressed_data.extend_from_slice(block.data.as_ref());
                }
                _ => panic!("Expected FixedWidth block"),
            }
        }

        (decompressed_data, values_decoded)
    }

    #[test]
    fn test_compressed_mini_block_table_driven() {
        for test_case in create_test_cases() {
            run_round_trip_test(test_case);
        }
    }

    #[test]
    fn test_compressed_mini_block_threshold() {
        // Test that small buffers don't get compressed
        let small_test = TestCase {
            name: "small_buffer_no_compression",
            inner_encoder: Box::new(RleMiniBlockEncoder),
            compression: CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
            data: create_repeated_i32_block(vec![1, 1, 2, 2]),
            expected_compressed: false,
            max_compression_ratio: 1.0,
        };
        run_round_trip_test(small_test);
    }

    #[test]
    fn test_compressed_mini_block_with_doubles() {
        // Test with large repetitive doubles that should compress well with Zstd.
        // The test focuses on verifying that GeneralMiniBlock works correctly
        // when wrapping a simple ValueEncoder.
        let test_case = TestCase {
            name: "float_values_with_zstd",
            inner_encoder: Box::new(ValueEncoder {}),
            compression: CompressionConfig {
                scheme: CompressionScheme::Zstd,
                level: Some(3),
            },
            // Create enough data to ensure compression is applied
            data: create_pattern_f64_block(1024, |i| (i / 10) as f64),
            expected_compressed: true,
            max_compression_ratio: 0.5, // Zstd should achieve good compression on repetitive data
        };

        run_round_trip_test(test_case);
    }

    #[test]
    fn test_compressed_mini_block_large_buffers() {
        // Use value encoding which doesn't compress data, ensuring large buffers
        // Create 1024 i32 values (4KB of data)
        let values: Vec<i32> = (0..1024).collect();
        let data = LanceBuffer::from_bytes(
            bytemuck::cast_slice(&values).to_vec().into(),
            std::mem::align_of::<i32>() as u64,
        );
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data,
            num_values: 1024,
            block_info: BlockInfo::new(),
        });

        // Create compressor with ValueEncoder (no compression) and Zstd wrapper
        let inner = Box::new(ValueEncoder {});
        let compression = CompressionConfig {
            scheme: CompressionScheme::Zstd,
            level: Some(3),
        };
        let compressor = GeneralMiniBlockCompressor::new(inner, compression);

        // Compress the data
        let (compressed, encoding) = compressor.compress(block).unwrap();

        // Should get GeneralMiniBlock encoding since buffer is 4KB
        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(cm.inner.is_some());
                assert_eq!(cm.compression.as_ref().unwrap().scheme, "zstd");
                assert_eq!(cm.compression.as_ref().unwrap().level, Some(3));

                // Verify inner encoding is Flat (from ValueEncoder)
                match &cm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Flat(flat)) => {
                        assert_eq!(flat.bits_per_value, 32);
                    }
                    _ => panic!("Expected Flat inner encoding"),
                }
            }
            _ => panic!("Expected GeneralMiniBlock encoding"),
        }

        assert_eq!(compressed.num_values, 1024);
        // ValueEncoder produces 1 buffer, so compressed result also has 1 buffer
        assert_eq!(compressed.data.len(), 1);
    }

    // Special test cases that don't fit the table-driven pattern

    #[test]
    fn test_compressed_mini_block_rle_multiple_buffers() {
        // RLE produces 2 buffers (values and lengths). Whether or not the wrapper
        // decides to compress (only the first buffer is ever compressed), the number
        // of buffers must be preserved.
        let data = create_repeated_i32_block(vec![1; 100]);
        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (compressed, _) = compressor.compress(data).unwrap();
        assert_eq!(compressed.data.len(), 2);
    }

    /// Asserts that `encoding` is either a `GeneralMiniBlock` with LZ4 compression
    /// wrapping an RLE encoding of `bits_per_value` bits, or a bare RLE encoding
    /// (the wrapper declined to compress).
    fn assert_lz4_wrapped_rle_or_rle(encoding: &pb::ArrayEncoding, bits_per_value: u64, label: &str) {
        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(gm)) => {
                match &gm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Rle(rle)) => {
                        assert_eq!(u64::from(rle.bits_per_value), bits_per_value);
                    }
                    _ => panic!("{}: Expected RLE as inner encoding", label),
                }
                assert_eq!(gm.compression.as_ref().unwrap().scheme, "lz4");
            }
            Some(pb::array_encoding::ArrayEncoding::Rle(_)) => {
                // Also acceptable if compression was skipped or didn't help
            }
            _ => panic!("{}: Expected GeneralMiniBlock or Rle encoding", label),
        }
    }

    #[test]
    fn test_rle_with_general_miniblock_wrapper() {
        // Wrap RLE directly in a GeneralMiniBlock LZ4 compressor for 32-bit and 64-bit
        // data and check the resulting encoding structure.
        // NOTE(review): this constructs the wrapper by hand; it does not exercise any
        // strategy that decides to wrap RLE automatically.

        // Case 1: 32-bit RLE data
        let compressor_32 = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );
        let data_32 = create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]);
        let (_compressed_32, encoding_32) = compressor_32.compress(data_32).unwrap();
        assert_lz4_wrapped_rle_or_rle(&encoding_32, 32, "32-bit");

        // Case 2: 64-bit RLE data
        let values_64: Vec<i64> = vec![100i64; 50]
            .into_iter()
            .chain(vec![200i64; 50])
            .chain(vec![300i64; 50])
            .collect();
        let array_64 = arrow_array::Int64Array::from(values_64);
        let block_64 = DataBlock::from_array(array_64);

        let compressor_64 = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );
        let (_compressed_64, encoding_64) = compressor_64.compress(block_64).unwrap();
        assert_lz4_wrapped_rle_or_rle(&encoding_64, 64, "64-bit");
    }

    #[test]
    fn test_compressed_mini_block_empty_data() {
        let empty_array = Int32Array::from(vec![] as Vec<i32>);
        let empty_block = DataBlock::from_array(empty_array);

        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(ValueEncoder {}),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let result = compressor.compress(empty_block);
        match result {
            Ok((compressed, _)) => {
                assert_eq!(compressed.num_values, 0);
            }
            Err(_) => {
                // Empty data might not be supported by ValueEncoder
            }
        }
    }
}
620}