lance_encoding/encodings/physical/
general.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use log::trace;
5
6use crate::{
7    buffer::LanceBuffer,
8    compression::MiniBlockDecompressor,
9    data::DataBlock,
10    encodings::{
11        logical::primitive::miniblock::{MiniBlockCompressed, MiniBlockCompressor},
12        physical::block::{CompressionConfig, GeneralBufferCompressor},
13    },
14    format::{pb21::CompressiveEncoding, ProtobufUtils21},
15    Result,
16};
17
18/// A miniblock compressor that wraps another miniblock compressor and applies
19/// general-purpose compression (LZ4, Zstd) to the resulting buffers.
20#[derive(Debug)]
21pub struct GeneralMiniBlockCompressor {
22    inner: Box<dyn MiniBlockCompressor>,
23    compression: CompressionConfig,
24}
25
26impl GeneralMiniBlockCompressor {
27    pub fn new(inner: Box<dyn MiniBlockCompressor>, compression: CompressionConfig) -> Self {
28        Self { inner, compression }
29    }
30}
31
/// Smallest first-buffer size, in bytes (4 KiB), for which general-purpose
/// compression is attempted.
const MIN_BUFFER_SIZE_FOR_COMPRESSION: usize = 4096;
34
35use super::super::logical::primitive::miniblock::MiniBlockChunk;
36
37impl MiniBlockCompressor for GeneralMiniBlockCompressor {
38    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
39        // First, compress with the inner compressor
40        let (inner_compressed, inner_encoding) = self.inner.compress(page)?;
41
42        // Return the original encoding without compression if there's no data or
43        // the first buffer is not large enough
44        if inner_compressed.data.is_empty()
45            || inner_compressed.data[0].len() < MIN_BUFFER_SIZE_FOR_COMPRESSION
46        {
47            return Ok((inner_compressed, inner_encoding));
48        }
49
50        // We'll compress each chunk's portion of the first buffer independently
51        let first_buffer = &inner_compressed.data[0];
52        let mut compressed_first_buffer = Vec::new();
53        let mut new_chunks = Vec::with_capacity(inner_compressed.chunks.iter().len());
54        let mut offset = 0usize;
55        let mut total_original_size = 0usize;
56
57        let compressor = GeneralBufferCompressor::get_compressor(self.compression)?;
58
59        for chunk in &inner_compressed.chunks {
60            let chunk_first_buffer_size = chunk.buffer_sizes[0] as usize;
61
62            let chunk_data = &first_buffer.as_ref()[offset..offset + chunk_first_buffer_size];
63            total_original_size += chunk_first_buffer_size;
64
65            let compressed_start = compressed_first_buffer.len();
66            compressor.compress(chunk_data, &mut compressed_first_buffer)?;
67            let compressed_size = compressed_first_buffer.len() - compressed_start;
68
69            // Create new chunk with updated first buffer size
70            let mut new_buffer_sizes = chunk.buffer_sizes.clone();
71            new_buffer_sizes[0] = compressed_size as u16;
72
73            new_chunks.push(MiniBlockChunk {
74                buffer_sizes: new_buffer_sizes,
75                log_num_values: chunk.log_num_values,
76            });
77
78            offset += chunk_first_buffer_size;
79        }
80
81        // Check if compression was effective
82        let compressed_total_size = compressed_first_buffer.len();
83        if compressed_total_size >= total_original_size {
84            // Compression didn't help, return original
85            return Ok((inner_compressed, inner_encoding));
86        }
87
88        trace!(
89            "First buffer compressed from {} to {} bytes (ratio: {:.2})",
90            total_original_size,
91            compressed_total_size,
92            compressed_total_size as f32 / total_original_size as f32
93        );
94
95        // Build final buffers: compressed first buffer + remaining original buffers
96        let mut final_buffers = vec![LanceBuffer::from(compressed_first_buffer)];
97        final_buffers.extend(inner_compressed.data.into_iter().skip(1));
98
99        let compressed_result = MiniBlockCompressed {
100            data: final_buffers,
101            chunks: new_chunks,
102            num_values: inner_compressed.num_values,
103        };
104
105        // Return compressed encoding
106        let encoding = ProtobufUtils21::wrapped(self.compression, inner_encoding)?;
107        Ok((compressed_result, encoding))
108    }
109}
110
111/// A miniblock decompressor that first decompresses buffers using general-purpose
112/// compression (LZ4, Zstd) and then delegates to an inner miniblock decompressor.
113#[derive(Debug)]
114pub struct GeneralMiniBlockDecompressor {
115    inner: Box<dyn MiniBlockDecompressor>,
116    compression: CompressionConfig,
117}
118
119impl GeneralMiniBlockDecompressor {
120    pub fn new(inner: Box<dyn MiniBlockDecompressor>, compression: CompressionConfig) -> Self {
121        Self { inner, compression }
122    }
123}
124
125impl MiniBlockDecompressor for GeneralMiniBlockDecompressor {
126    fn decompress(&self, mut data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
127        let mut decompressed_buffer = Vec::new();
128
129        let decompressor = GeneralBufferCompressor::get_compressor(self.compression)?;
130        decompressor.decompress(&data[0], &mut decompressed_buffer)?;
131        data[0] = LanceBuffer::from(decompressed_buffer);
132
133        self.inner.decompress(data, num_values)
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
    use crate::data::{BlockInfo, FixedWidthDataBlock};
    use crate::encodings::physical::block::CompressionScheme;
    use crate::encodings::physical::rle::RleMiniBlockEncoder;
    use crate::encodings::physical::value::ValueEncoder;
    use crate::format::pb21;
    use crate::format::pb21::compressive_encoding::Compression;
    use arrow_array::{Float64Array, Int32Array};

    /// One round-trip scenario for [`run_round_trip_test`].
    #[derive(Debug)]
    struct TestCase {
        /// Label used in assertion messages.
        name: &'static str,
        /// Compressor wrapped by the general-purpose compressor.
        inner_encoder: Box<dyn MiniBlockCompressor>,
        /// General-purpose codec configuration under test.
        compression: CompressionConfig,
        /// Input block to compress.
        data: DataBlock,
        /// Whether we expect compression to be applied.
        expected_compressed: bool,
        /// Upper bound on (first compressed buffer size / raw input size),
        /// checked only when `expected_compressed` is set.
        min_compression_ratio: f32,
    }

    /// LZ4 with no explicit level.
    fn lz4() -> CompressionConfig {
        CompressionConfig {
            scheme: CompressionScheme::Lz4,
            level: None,
        }
    }

    /// Zstd at level 3.
    fn zstd_level_3() -> CompressionConfig {
        CompressionConfig {
            scheme: CompressionScheme::Zstd,
            level: Some(3),
        }
    }

    fn create_test_cases() -> Vec<TestCase> {
        vec![
            // Small data with RLE - should not compress due to size threshold
            TestCase {
                name: "small_rle_data",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: lz4(),
                data: create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2]),
                expected_compressed: false,
                min_compression_ratio: 1.0,
            },
            // Large repeated data with RLE + LZ4
            TestCase {
                name: "large_rle_lz4",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: lz4(),
                data: create_pattern_i32_block(2048, |i| (i / 8) as i32),
                expected_compressed: false, // RLE already compresses well, additional LZ4 may not help
                min_compression_ratio: 1.0,
            },
            // Large repeated data with RLE + Zstd
            TestCase {
                name: "large_rle_zstd",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: zstd_level_3(),
                data: create_pattern_i32_block(8192, |i| (i / 16) as i32),
                expected_compressed: true, // Zstd might provide additional compression
                min_compression_ratio: 0.9, // But not as much since RLE already compressed
            },
            // Sequential data with ValueEncoder + LZ4
            TestCase {
                name: "sequential_value_lz4",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: lz4(),
                data: create_pattern_i32_block(1024, |i| i as i32),
                expected_compressed: false, // Sequential data doesn't compress well
                min_compression_ratio: 1.0,
            },
            // Float data with ValueEncoder + Zstd
            TestCase {
                name: "float_value_zstd",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: zstd_level_3(),
                data: create_pattern_f64_block(1024, |i| i as f64 * 0.1),
                expected_compressed: true,
                min_compression_ratio: 0.9,
            },
        ]
    }

    /// Builds an i32 data block from explicit values.
    fn create_repeated_i32_block(values: Vec<i32>) -> DataBlock {
        DataBlock::from_array(Int32Array::from(values))
    }

    /// Builds an i32 data block of `size` values where value `i` is `pattern(i)`.
    fn create_pattern_i32_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> i32,
    {
        let values: Vec<i32> = (0..size).map(pattern).collect();
        DataBlock::from_array(Int32Array::from(values))
    }

    /// Builds an f64 data block of `size` values where value `i` is `pattern(i)`.
    fn create_pattern_f64_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> f64,
    {
        let values: Vec<f64> = (0..size).map(pattern).collect();
        DataBlock::from_array(Float64Array::from(values))
    }

    /// Returns the element width in bytes and a copy of the raw bytes of a
    /// fixed-width block. Panics for any other block kind.
    fn fixed_width_bytes(block: &DataBlock) -> (usize, Vec<u8>) {
        match block {
            DataBlock::FixedWidth(fixed) => {
                let bytes: &[u8] = fixed.data.as_ref();
                ((fixed.bits_per_value / 8) as usize, bytes.to_vec())
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    /// Compresses the test case's data, checks the reported encoding against
    /// the expectation, decompresses chunk by chunk and verifies the bytes
    /// match the original input.
    fn run_round_trip_test(test_case: TestCase) {
        // Capture the ground truth before the block is consumed, rather than
        // guessing the element width from the test name.
        let (bytes_per_value, original_bytes) = fixed_width_bytes(&test_case.data);

        let compressor =
            GeneralMiniBlockCompressor::new(test_case.inner_encoder, test_case.compression);

        // Compress the data
        let (compressed, encoding) = compressor.compress(test_case.data).unwrap();

        // Check if compression was applied as expected
        match &encoding.compression {
            Some(Compression::General(cm)) => {
                assert!(
                    test_case.expected_compressed,
                    "{}: Expected compression to be applied",
                    test_case.name
                );
                assert_eq!(
                    CompressionScheme::try_from(cm.compression.as_ref().unwrap().scheme()).unwrap(),
                    test_case.compression.scheme
                );
            }
            Some(Compression::Rle(_)) | Some(Compression::Flat(_)) => {
                // The inner encoding was returned unchanged, which is also
                // accepted when compression was expected: it means the
                // general-purpose codec did not help.
            }
            other => {
                if test_case.expected_compressed {
                    panic!(
                        "{}: Expected GeneralMiniBlock but got {:?}",
                        test_case.name, other
                    );
                }
            }
        }

        // Verify chunks are created correctly
        assert!(
            !compressed.chunks.is_empty(),
            "{}: No chunks created",
            test_case.name
        );

        // Test decompression by simulating the actual miniblock decoding process
        let decompressed_data = decompress_miniblock_chunks(&compressed, &encoding);

        // Verify round trip: first the size, then the exact bytes.
        let expected_bytes = compressed.num_values as usize * bytes_per_value;
        assert_eq!(
            expected_bytes,
            decompressed_data.len(),
            "{}: Data size mismatch",
            test_case.name
        );
        assert!(
            decompressed_data == original_bytes,
            "{}: Decompressed bytes differ from the original input",
            test_case.name
        );

        // Check compression ratio if applicable
        if test_case.expected_compressed {
            let compression_ratio = compressed.data[0].len() as f32 / expected_bytes as f32;
            assert!(
                compression_ratio <= test_case.min_compression_ratio,
                "{}: Compression ratio {} > expected {}",
                test_case.name,
                compression_ratio,
                test_case.min_compression_ratio
            );
        }
    }

    /// Decodes every chunk of `compressed` with the decompressor that the
    /// default strategy builds for `encoding`, and concatenates the raw bytes
    /// of the resulting fixed-width blocks.
    fn decompress_miniblock_chunks(
        compressed: &MiniBlockCompressed,
        encoding: &CompressiveEncoding,
    ) -> Vec<u8> {
        let decompression_strategy = DefaultDecompressionStrategy::default();
        // The decompressor depends only on the encoding, so build it once.
        let decompressor = decompression_strategy
            .create_miniblock_decompressor(encoding, &decompression_strategy)
            .unwrap();

        let mut decompressed_data = Vec::new();
        let mut offsets = vec![0usize; compressed.data.len()]; // Track offset for each buffer
        // Values handed to the decompressor so far; used to size the last chunk
        // without having to guess the element width.
        let mut values_decoded = 0u64;

        for chunk in &compressed.chunks {
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                // Last chunk - whatever has not been decoded yet
                compressed.num_values.saturating_sub(values_decoded)
            };
            values_decoded += chunk_values;

            // Extract buffers for this chunk
            let mut chunk_buffers = Vec::new();
            for (i, &size) in chunk.buffer_sizes.iter().enumerate() {
                if i < compressed.data.len() {
                    let buffer_data =
                        compressed.data[i].slice_with_length(offsets[i], size as usize);
                    chunk_buffers.push(buffer_data);
                    offsets[i] += size as usize;
                }
            }

            // Decompress the chunk
            let chunk_decompressed = decompressor
                .decompress(chunk_buffers, chunk_values)
                .unwrap();

            match chunk_decompressed {
                DataBlock::FixedWidth(ref block) => {
                    decompressed_data.extend_from_slice(block.data.as_ref());
                }
                _ => panic!("Expected FixedWidth block"),
            }
        }

        decompressed_data
    }

    /// Asserts that `$encoding` is either a general-purpose LZ4 wrapper around
    /// an RLE encoding (flat values of `$value_bits` bits, flat 8-bit run
    /// lengths) or a plain RLE encoding, which is what the compressor returns
    /// when it decides not to apply the wrapper.
    macro_rules! assert_lz4_over_rle_or_plain_rle {
        ($encoding:expr, $value_bits:expr) => {{
            match &$encoding.compression {
                Some(Compression::General(cm)) => {
                    // Check inner encoding is RLE
                    match &cm.values.as_ref().unwrap().compression {
                        Some(Compression::Rle(rle)) => {
                            let Compression::Flat(values) =
                                rle.values.as_ref().unwrap().compression.as_ref().unwrap()
                            else {
                                panic!("Expected flat for RLE values")
                            };
                            let Compression::Flat(run_lengths) = rle
                                .run_lengths
                                .as_ref()
                                .unwrap()
                                .compression
                                .as_ref()
                                .unwrap()
                            else {
                                panic!("Expected flat for RLE run lengths")
                            };
                            assert_eq!(values.bits_per_value, $value_bits);
                            assert_eq!(run_lengths.bits_per_value, 8);
                        }
                        _ => panic!("Expected RLE as inner encoding"),
                    }
                    // Check compression is LZ4
                    assert_eq!(
                        cm.compression.as_ref().unwrap().scheme(),
                        pb21::CompressionScheme::CompressionAlgorithmLz4
                    );
                }
                Some(Compression::Rle(_)) => {
                    // Also acceptable if compression was not applied
                }
                _ => panic!("Expected GeneralMiniBlock or Rle encoding"),
            }
        }};
    }

    #[test]
    fn test_compressed_mini_block_table_driven() {
        for test_case in create_test_cases() {
            run_round_trip_test(test_case);
        }
    }

    #[test]
    fn test_compressed_mini_block_threshold() {
        // Test that small buffers don't get compressed
        let small_test = TestCase {
            name: "small_buffer_no_compression",
            inner_encoder: Box::new(RleMiniBlockEncoder),
            compression: lz4(),
            data: create_repeated_i32_block(vec![1, 1, 2, 2]),
            expected_compressed: false,
            min_compression_ratio: 1.0,
        };
        run_round_trip_test(small_test);
    }

    #[test]
    fn test_compressed_mini_block_with_doubles() {
        // Repetitive doubles behind a plain ValueEncoder: verifies that the
        // general-purpose wrapper works when wrapping a simple inner encoder.
        let test_case = TestCase {
            name: "float_values_with_zstd",
            inner_encoder: Box::new(ValueEncoder {}),
            compression: zstd_level_3(),
            // Create enough data to ensure compression is applied
            data: create_pattern_f64_block(1024, |i| (i / 10) as f64),
            expected_compressed: true,
            min_compression_ratio: 0.5, // Zstd should achieve good compression on repetitive data
        };

        run_round_trip_test(test_case);
    }

    #[test]
    fn test_compressed_mini_block_large_buffers() {
        // Use value encoding which doesn't compress data, ensuring large buffers
        // Create 1024 i32 values (4KB of data)
        let values: Vec<i32> = (0..1024).collect();
        let data = LanceBuffer::from_bytes(
            bytemuck::cast_slice(&values).to_vec().into(),
            std::mem::align_of::<i32>() as u64,
        );
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data,
            num_values: 1024,
            block_info: BlockInfo::new(),
        });

        // Create compressor with ValueEncoder (no compression) and Zstd wrapper
        let compressor =
            GeneralMiniBlockCompressor::new(Box::new(ValueEncoder {}), zstd_level_3());

        // Compress the data
        let (compressed, encoding) = compressor.compress(block).unwrap();

        // Should get GeneralMiniBlock encoding since buffer is 4KB
        match &encoding.compression {
            Some(Compression::General(cm)) => {
                assert!(cm.values.is_some());
                assert_eq!(
                    cm.compression.as_ref().unwrap().scheme(),
                    pb21::CompressionScheme::CompressionAlgorithmZstd
                );
                assert_eq!(cm.compression.as_ref().unwrap().level, Some(3));

                // Verify inner encoding is Flat (from ValueEncoder)
                match &cm.values.as_ref().unwrap().compression {
                    Some(Compression::Flat(flat)) => {
                        assert_eq!(flat.bits_per_value, 32);
                    }
                    _ => panic!("Expected Flat inner encoding"),
                }
            }
            _ => panic!("Expected GeneralMiniBlock encoding"),
        }

        assert_eq!(compressed.num_values, 1024);
        // ValueEncoder produces 1 buffer, so compressed result also has 1 buffer
        assert_eq!(compressed.data.len(), 1);
    }

    // Special test cases that don't fit the table-driven pattern

    #[test]
    fn test_compressed_mini_block_rle_multiple_buffers() {
        // RLE produces 2 buffers (values and lengths), test that both are handled correctly
        let data = create_repeated_i32_block(vec![1; 100]);
        let compressor = GeneralMiniBlockCompressor::new(Box::new(RleMiniBlockEncoder), lz4());

        let (compressed, _) = compressor.compress(data).unwrap();
        // The wrapper only ever touches the first buffer, so the buffer count
        // must stay at 2.
        assert_eq!(compressed.data.len(), 2);
    }

    #[test]
    fn test_rle_with_general_miniblock_wrapper() {
        // Drives GeneralMiniBlockCompressor directly over an RLE encoder and
        // checks the shape of the reported encoding. Either an LZ4 wrapper
        // around RLE or the plain RLE encoding (wrapper skipped) is accepted.

        // Case 1: 32-bit RLE data
        let block_32 = create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]);
        let compressor_32 =
            GeneralMiniBlockCompressor::new(Box::new(RleMiniBlockEncoder), lz4());
        let (_compressed_32, encoding_32) = compressor_32.compress(block_32).unwrap();
        assert_lz4_over_rle_or_plain_rle!(encoding_32, 32);

        // Case 2: 64-bit RLE data
        let values_64: Vec<i64> = vec![100i64; 50]
            .into_iter()
            .chain(vec![200i64; 50])
            .chain(vec![300i64; 50])
            .collect();
        let block_64 = DataBlock::from_array(arrow_array::Int64Array::from(values_64));
        let compressor_64 =
            GeneralMiniBlockCompressor::new(Box::new(RleMiniBlockEncoder), lz4());
        let (_compressed_64, encoding_64) = compressor_64.compress(block_64).unwrap();
        assert_lz4_over_rle_or_plain_rle!(encoding_64, 64);
    }

    #[test]
    fn test_compressed_mini_block_empty_data() {
        let empty_array = Int32Array::from(vec![] as Vec<i32>);
        let empty_block = DataBlock::from_array(empty_array);

        let compressor = GeneralMiniBlockCompressor::new(Box::new(ValueEncoder {}), lz4());

        // Empty data might not be supported by ValueEncoder, so an error is
        // tolerated; a successful result must report zero values.
        if let Ok((compressed, _)) = compressor.compress(empty_block) {
            assert_eq!(compressed.num_values, 0);
        }
    }
}