1use log::trace;
5
6use crate::{
7 buffer::LanceBuffer,
8 compression::MiniBlockDecompressor,
9 data::DataBlock,
10 encodings::{
11 logical::primitive::miniblock::{MiniBlockCompressed, MiniBlockCompressor},
12 physical::block::{CompressionConfig, GeneralBufferCompressor},
13 },
14 format::{pb21::CompressiveEncoding, ProtobufUtils21},
15 Result,
16};
17
18#[derive(Debug)]
21pub struct GeneralMiniBlockCompressor {
22 inner: Box<dyn MiniBlockCompressor>,
23 compression: CompressionConfig,
24}
25
26impl GeneralMiniBlockCompressor {
27 pub fn new(inner: Box<dyn MiniBlockCompressor>, compression: CompressionConfig) -> Self {
28 Self { inner, compression }
29 }
30}
31
/// Minimum length, in bytes, of the inner compressor's first buffer before general-purpose
/// compression is attempted; smaller first buffers are returned exactly as the inner
/// compressor produced them.
const MIN_BUFFER_SIZE_FOR_COMPRESSION: usize = 4 * 1024;
34
35use super::super::logical::primitive::miniblock::MiniBlockChunk;
36
37impl MiniBlockCompressor for GeneralMiniBlockCompressor {
38 fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
39 let (inner_compressed, inner_encoding) = self.inner.compress(page)?;
41
42 if inner_compressed.data.is_empty()
45 || inner_compressed.data[0].len() < MIN_BUFFER_SIZE_FOR_COMPRESSION
46 {
47 return Ok((inner_compressed, inner_encoding));
48 }
49
50 let first_buffer = &inner_compressed.data[0];
52 let mut compressed_first_buffer = Vec::new();
53 let mut new_chunks = Vec::with_capacity(inner_compressed.chunks.iter().len());
54 let mut offset = 0usize;
55 let mut total_original_size = 0usize;
56
57 let compressor = GeneralBufferCompressor::get_compressor(self.compression)?;
58
59 for chunk in &inner_compressed.chunks {
60 let chunk_first_buffer_size = chunk.buffer_sizes[0] as usize;
61
62 let chunk_data = &first_buffer.as_ref()[offset..offset + chunk_first_buffer_size];
63 total_original_size += chunk_first_buffer_size;
64
65 let compressed_start = compressed_first_buffer.len();
66 compressor.compress(chunk_data, &mut compressed_first_buffer)?;
67 let compressed_size = compressed_first_buffer.len() - compressed_start;
68
69 let mut new_buffer_sizes = chunk.buffer_sizes.clone();
71 new_buffer_sizes[0] = compressed_size as u16;
72
73 new_chunks.push(MiniBlockChunk {
74 buffer_sizes: new_buffer_sizes,
75 log_num_values: chunk.log_num_values,
76 });
77
78 offset += chunk_first_buffer_size;
79 }
80
81 let compressed_total_size = compressed_first_buffer.len();
83 if compressed_total_size >= total_original_size {
84 return Ok((inner_compressed, inner_encoding));
86 }
87
88 trace!(
89 "First buffer compressed from {} to {} bytes (ratio: {:.2})",
90 total_original_size,
91 compressed_total_size,
92 compressed_total_size as f32 / total_original_size as f32
93 );
94
95 let mut final_buffers = vec![LanceBuffer::from(compressed_first_buffer)];
97 final_buffers.extend(inner_compressed.data.into_iter().skip(1));
98
99 let compressed_result = MiniBlockCompressed {
100 data: final_buffers,
101 chunks: new_chunks,
102 num_values: inner_compressed.num_values,
103 };
104
105 let encoding = ProtobufUtils21::wrapped(self.compression, inner_encoding)?;
107 Ok((compressed_result, encoding))
108 }
109}
110
111#[derive(Debug)]
114pub struct GeneralMiniBlockDecompressor {
115 inner: Box<dyn MiniBlockDecompressor>,
116 compression: CompressionConfig,
117}
118
119impl GeneralMiniBlockDecompressor {
120 pub fn new(inner: Box<dyn MiniBlockDecompressor>, compression: CompressionConfig) -> Self {
121 Self { inner, compression }
122 }
123}
124
125impl MiniBlockDecompressor for GeneralMiniBlockDecompressor {
126 fn decompress(&self, mut data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
127 let mut decompressed_buffer = Vec::new();
128
129 let decompressor = GeneralBufferCompressor::get_compressor(self.compression)?;
130 decompressor.decompress(&data[0], &mut decompressed_buffer)?;
131 data[0] = LanceBuffer::from(decompressed_buffer);
132
133 self.inner.decompress(data, num_values)
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
    use crate::data::{BlockInfo, FixedWidthDataBlock};
    use crate::encodings::physical::block::CompressionScheme;
    use crate::encodings::physical::rle::RleMiniBlockEncoder;
    use crate::encodings::physical::value::ValueEncoder;
    use crate::format::pb21;
    use crate::format::pb21::compressive_encoding::Compression;
    use arrow_array::{Float64Array, Int32Array};

    /// One table-driven round-trip scenario for `GeneralMiniBlockCompressor`.
    #[derive(Debug)]
    struct TestCase {
        name: &'static str,
        inner_encoder: Box<dyn MiniBlockCompressor>,
        compression: CompressionConfig,
        data: DataBlock,
        /// Whether the general compression wrapper is expected to be applied.
        expected_compressed: bool,
        /// When compression is expected: the maximum allowed ratio of
        /// `compressed first buffer length / raw data bytes` (despite the name, an upper bound).
        min_compression_ratio: f32,
    }

    fn create_test_cases() -> Vec<TestCase> {
        vec![
            TestCase {
                name: "small_rle_data",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2]),
                expected_compressed: false,
                min_compression_ratio: 1.0,
            },
            TestCase {
                name: "large_rle_lz4",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_pattern_i32_block(2048, |i| (i / 8) as i32),
                expected_compressed: false,
                min_compression_ratio: 1.0,
            },
            TestCase {
                name: "large_rle_zstd",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Zstd,
                    level: Some(3),
                },
                data: create_pattern_i32_block(8192, |i| (i / 16) as i32),
                expected_compressed: true,
                min_compression_ratio: 0.9,
            },
            TestCase {
                name: "sequential_value_lz4",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_pattern_i32_block(1024, |i| i as i32),
                expected_compressed: false,
                min_compression_ratio: 1.0,
            },
            TestCase {
                name: "float_value_zstd",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Zstd,
                    level: Some(3),
                },
                data: create_pattern_f64_block(1024, |i| i as f64 * 0.1),
                expected_compressed: true,
                min_compression_ratio: 0.9,
            },
        ]
    }

    fn create_repeated_i32_block(values: Vec<i32>) -> DataBlock {
        let array = Int32Array::from(values);
        DataBlock::from_array(array)
    }

    fn create_pattern_i32_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> i32,
    {
        let values: Vec<i32> = (0..size).map(pattern).collect();
        let array = Int32Array::from(values);
        DataBlock::from_array(array)
    }

    fn create_pattern_f64_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> f64,
    {
        let values: Vec<f64> = (0..size).map(pattern).collect();
        let array = Float64Array::from(values);
        DataBlock::from_array(array)
    }

    /// Compresses `test_case.data`, checks the resulting encoding against the expectation,
    /// decompresses every chunk, and checks the decoded size (and compression ratio if
    /// compression was expected).
    fn run_round_trip_test(test_case: TestCase) {
        let compressor =
            GeneralMiniBlockCompressor::new(test_case.inner_encoder, test_case.compression);

        let (compressed, encoding) = compressor.compress(test_case.data).unwrap();

        match &encoding.compression {
            Some(Compression::General(cm)) => {
                assert!(
                    test_case.expected_compressed,
                    "{}: Expected compression to be applied",
                    test_case.name
                );
                assert_eq!(
                    CompressionScheme::try_from(cm.compression.as_ref().unwrap().scheme()).unwrap(),
                    test_case.compression.scheme
                );
            }
            _ => {
                if test_case.expected_compressed {
                    // The wrapper may legitimately decline to compress (e.g. no size win), in
                    // which case the bare inner encoding is accepted.
                    match &encoding.compression {
                        Some(Compression::Rle(_)) => {}
                        Some(Compression::Flat(_)) => {}
                        _ => {
                            panic!(
                                "{}: Expected GeneralMiniBlock but got {:?}",
                                test_case.name, encoding.compression
                            );
                        }
                    }
                }
            }
        }

        assert!(
            !compressed.chunks.is_empty(),
            "{}: No chunks created",
            test_case.name
        );

        let decompressed_data = decompress_miniblock_chunks(&compressed, &encoding);

        // Test-case naming convention: "float" cases use f64 (8 bytes), all others i32.
        let bytes_per_value = if test_case.name.contains("float") {
            8
        } else {
            4
        };
        let expected_bytes = compressed.num_values as usize * bytes_per_value;
        assert_eq!(
            expected_bytes,
            decompressed_data.len(),
            "{}: Data size mismatch",
            test_case.name
        );

        if test_case.expected_compressed {
            let compression_ratio = compressed.data[0].len() as f32 / expected_bytes as f32;
            assert!(
                compression_ratio <= test_case.min_compression_ratio,
                "{}: Compression ratio {} > expected {}",
                test_case.name,
                compression_ratio,
                test_case.min_compression_ratio
            );
        }
    }

    /// Decompresses every chunk of `compressed` using `encoding` and concatenates the
    /// decoded fixed-width bytes.
    fn decompress_miniblock_chunks(
        compressed: &MiniBlockCompressed,
        encoding: &CompressiveEncoding,
    ) -> Vec<u8> {
        let mut decompressed_data = Vec::new();
        // Read cursor into each of the page's buffers.
        let mut offsets = vec![0usize; compressed.data.len()];
        // Number of values produced by the chunks handled so far; this gives the size of the
        // final chunk exactly, without guessing the value width.
        let mut values_seen = 0u64;
        let decompression_strategy = DefaultDecompressionStrategy::default();

        for chunk in &compressed.chunks {
            // Non-final chunks hold 2^log_num_values values; the final chunk
            // (log_num_values == 0) holds whatever remains of the page.
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                compressed.num_values.saturating_sub(values_seen)
            };
            values_seen += chunk_values;

            let mut chunk_buffers = Vec::new();
            for (i, &size) in chunk.buffer_sizes.iter().enumerate() {
                if i < compressed.data.len() {
                    let buffer_data =
                        compressed.data[i].slice_with_length(offsets[i], size as usize);
                    chunk_buffers.push(buffer_data);
                    offsets[i] += size as usize;
                }
            }

            let decompressor = decompression_strategy
                .create_miniblock_decompressor(encoding, &decompression_strategy)
                .unwrap();

            let chunk_decompressed = decompressor
                .decompress(chunk_buffers, chunk_values)
                .unwrap();

            match chunk_decompressed {
                DataBlock::FixedWidth(ref block) => {
                    decompressed_data.extend_from_slice(block.data.as_ref());
                }
                _ => panic!("Expected FixedWidth block"),
            }
        }

        decompressed_data
    }

    #[test]
    fn test_compressed_mini_block_table_driven() {
        for test_case in create_test_cases() {
            run_round_trip_test(test_case);
        }
    }

    #[test]
    fn test_compressed_mini_block_threshold() {
        let small_test = TestCase {
            name: "small_buffer_no_compression",
            inner_encoder: Box::new(RleMiniBlockEncoder),
            compression: CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
            data: create_repeated_i32_block(vec![1, 1, 2, 2]),
            expected_compressed: false,
            min_compression_ratio: 1.0,
        };
        run_round_trip_test(small_test);
    }

    #[test]
    fn test_compressed_mini_block_with_doubles() {
        let test_case = TestCase {
            name: "float_values_with_zstd",
            inner_encoder: Box::new(ValueEncoder {}),
            compression: CompressionConfig {
                scheme: CompressionScheme::Zstd,
                level: Some(3),
            },
            data: create_pattern_f64_block(1024, |i| (i / 10) as f64),
            expected_compressed: true,
            min_compression_ratio: 0.5,
        };

        run_round_trip_test(test_case);
    }

    #[test]
    fn test_compressed_mini_block_large_buffers() {
        let values: Vec<i32> = (0..1024).collect();
        let data = LanceBuffer::from_bytes(
            bytemuck::cast_slice(&values).to_vec().into(),
            std::mem::align_of::<i32>() as u64,
        );
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data,
            num_values: 1024,
            block_info: BlockInfo::new(),
        });

        let inner = Box::new(ValueEncoder {});
        let compression = CompressionConfig {
            scheme: CompressionScheme::Zstd,
            level: Some(3),
        };
        let compressor = GeneralMiniBlockCompressor::new(inner, compression);

        let (compressed, encoding) = compressor.compress(block).unwrap();

        match &encoding.compression {
            Some(Compression::General(cm)) => {
                assert!(cm.values.is_some());
                assert_eq!(
                    cm.compression.as_ref().unwrap().scheme(),
                    pb21::CompressionScheme::CompressionAlgorithmZstd
                );
                assert_eq!(cm.compression.as_ref().unwrap().level, Some(3));

                match &cm.values.as_ref().unwrap().compression {
                    Some(Compression::Flat(flat)) => {
                        assert_eq!(flat.bits_per_value, 32);
                    }
                    _ => panic!("Expected Flat inner encoding"),
                }
            }
            _ => panic!("Expected GeneralMiniBlock encoding"),
        }

        assert_eq!(compressed.num_values, 1024);
        assert_eq!(compressed.data.len(), 1);
    }

    #[test]
    fn test_compressed_mini_block_rle_multiple_buffers() {
        let data = create_repeated_i32_block(vec![1; 100]);
        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (compressed, _) = compressor.compress(data).unwrap();
        assert_eq!(compressed.data.len(), 2);
    }

    #[test]
    fn test_rle_with_general_miniblock_wrapper() {
        let test_32 = TestCase {
            name: "rle_32bit_with_general_wrapper",
            inner_encoder: Box::new(RleMiniBlockEncoder),
            compression: CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
            data: create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]),
            expected_compressed: false,
            min_compression_ratio: 1.0,
        };

        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (_compressed, encoding) = compressor.compress(test_32.data).unwrap();

        match &encoding.compression {
            Some(Compression::General(cm)) => {
                match &cm.values.as_ref().unwrap().compression {
                    Some(Compression::Rle(rle)) => {
                        let Compression::Flat(values) =
                            rle.values.as_ref().unwrap().compression.as_ref().unwrap()
                        else {
                            panic!("Expected flat for RLE values")
                        };
                        let Compression::Flat(run_lengths) = rle
                            .run_lengths
                            .as_ref()
                            .unwrap()
                            .compression
                            .as_ref()
                            .unwrap()
                        else {
                            panic!("Expected flat for RLE run lengths")
                        };
                        assert_eq!(values.bits_per_value, 32);
                        assert_eq!(run_lengths.bits_per_value, 8);
                    }
                    _ => panic!("Expected RLE as inner encoding"),
                }
                assert_eq!(
                    cm.compression.as_ref().unwrap().scheme(),
                    pb21::CompressionScheme::CompressionAlgorithmLz4
                );
            }
            // Too small to be wrapped: the bare RLE encoding is acceptable.
            Some(Compression::Rle(_)) => {}
            _ => panic!("Expected GeneralMiniBlock or Rle encoding"),
        }

        let values_64: Vec<i64> = vec![100i64; 50]
            .into_iter()
            .chain(vec![200i64; 50])
            .chain(vec![300i64; 50])
            .collect();
        let array_64 = arrow_array::Int64Array::from(values_64);
        let block_64 = DataBlock::from_array(array_64);

        let compressor_64 = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (_compressed_64, encoding_64) = compressor_64.compress(block_64).unwrap();

        match &encoding_64.compression {
            Some(Compression::General(cm)) => {
                match &cm.values.as_ref().unwrap().compression {
                    Some(Compression::Rle(rle)) => {
                        let Compression::Flat(values) =
                            rle.values.as_ref().unwrap().compression.as_ref().unwrap()
                        else {
                            panic!("Expected flat for RLE values")
                        };
                        let Compression::Flat(run_lengths) = rle
                            .run_lengths
                            .as_ref()
                            .unwrap()
                            .compression
                            .as_ref()
                            .unwrap()
                        else {
                            panic!("Expected flat for RLE run lengths")
                        };
                        assert_eq!(values.bits_per_value, 64);
                        assert_eq!(run_lengths.bits_per_value, 8);
                    }
                    _ => panic!("Expected RLE as inner encoding for 64-bit"),
                }
                assert_eq!(
                    cm.compression.as_ref().unwrap().scheme(),
                    pb21::CompressionScheme::CompressionAlgorithmLz4
                );
            }
            // Too small to be wrapped: the bare RLE encoding is acceptable.
            Some(Compression::Rle(_)) => {}
            _ => panic!("Expected GeneralMiniBlock or Rle encoding for 64-bit"),
        }
    }

    #[test]
    fn test_compressed_mini_block_empty_data() {
        let empty_array = Int32Array::from(vec![] as Vec<i32>);
        let empty_block = DataBlock::from_array(empty_array);

        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(ValueEncoder {}),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let result = compressor.compress(empty_block);
        match result {
            Ok((compressed, _)) => {
                assert_eq!(compressed.num_values, 0);
            }
            // The inner encoder may reject empty input; the wrapper only needs to avoid
            // panicking in that case.
            Err(_) => {}
        }
    }
}