1use log::trace;
5
6use crate::{
7 buffer::LanceBuffer,
8 compression::MiniBlockDecompressor,
9 data::DataBlock,
10 encodings::{
11 logical::primitive::miniblock::{MiniBlockCompressed, MiniBlockCompressor},
12 physical::block::{CompressionConfig, GeneralBufferCompressor},
13 },
14 format::{pb, ProtobufUtils},
15 Result,
16};
17
18#[derive(Debug)]
21pub struct GeneralMiniBlockCompressor {
22 inner: Box<dyn MiniBlockCompressor>,
23 compression: CompressionConfig,
24}
25
26impl GeneralMiniBlockCompressor {
27 pub fn new(inner: Box<dyn MiniBlockCompressor>, compression: CompressionConfig) -> Self {
28 Self { inner, compression }
29 }
30}
31
/// First buffers shorter than this many bytes (4 KiB) are passed through
/// without general-purpose compression.
const MIN_BUFFER_SIZE_FOR_COMPRESSION: usize = 4096;
34
35use super::super::logical::primitive::miniblock::MiniBlockChunk;
36
37impl MiniBlockCompressor for GeneralMiniBlockCompressor {
38 fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
39 let (inner_compressed, inner_encoding) = self.inner.compress(page)?;
41
42 if inner_compressed.data.is_empty()
45 || inner_compressed.data[0].len() < MIN_BUFFER_SIZE_FOR_COMPRESSION
46 {
47 return Ok((inner_compressed, inner_encoding));
48 }
49
50 let first_buffer = &inner_compressed.data[0];
52 let mut compressed_first_buffer = Vec::new();
53 let mut new_chunks = Vec::with_capacity(inner_compressed.chunks.iter().len());
54 let mut offset = 0usize;
55 let mut total_original_size = 0usize;
56
57 let compressor = GeneralBufferCompressor::get_compressor(self.compression);
58
59 for chunk in &inner_compressed.chunks {
60 let chunk_first_buffer_size = chunk.buffer_sizes[0] as usize;
61
62 let chunk_data = &first_buffer.as_ref()[offset..offset + chunk_first_buffer_size];
63 total_original_size += chunk_first_buffer_size;
64
65 let compressed_start = compressed_first_buffer.len();
66 compressor.compress(chunk_data, &mut compressed_first_buffer)?;
67 let compressed_size = compressed_first_buffer.len() - compressed_start;
68
69 let mut new_buffer_sizes = chunk.buffer_sizes.clone();
71 new_buffer_sizes[0] = compressed_size as u16;
72
73 new_chunks.push(MiniBlockChunk {
74 buffer_sizes: new_buffer_sizes,
75 log_num_values: chunk.log_num_values,
76 });
77
78 offset += chunk_first_buffer_size;
79 }
80
81 let compressed_total_size = compressed_first_buffer.len();
83 if compressed_total_size >= total_original_size {
84 return Ok((inner_compressed, inner_encoding));
86 }
87
88 trace!(
89 "First buffer compressed from {} to {} bytes (ratio: {:.2})",
90 total_original_size,
91 compressed_total_size,
92 compressed_total_size as f32 / total_original_size as f32
93 );
94
95 let mut final_buffers = vec![LanceBuffer::from(compressed_first_buffer)];
97 final_buffers.extend(inner_compressed.data.into_iter().skip(1));
98
99 let compressed_result = MiniBlockCompressed {
100 data: final_buffers,
101 chunks: new_chunks,
102 num_values: inner_compressed.num_values,
103 };
104
105 let encoding = ProtobufUtils::general_mini_block(inner_encoding, self.compression);
107 Ok((compressed_result, encoding))
108 }
109}
110
111#[derive(Debug)]
114pub struct GeneralMiniBlockDecompressor {
115 inner: Box<dyn MiniBlockDecompressor>,
116 compression: CompressionConfig,
117}
118
119impl GeneralMiniBlockDecompressor {
120 pub fn new(inner: Box<dyn MiniBlockDecompressor>, compression: CompressionConfig) -> Self {
121 Self { inner, compression }
122 }
123}
124
125impl MiniBlockDecompressor for GeneralMiniBlockDecompressor {
126 fn decompress(&self, mut data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
127 let mut decompressed_buffer = Vec::new();
128
129 let decompressor = GeneralBufferCompressor::get_compressor(self.compression);
130 decompressor.decompress(&data[0], &mut decompressed_buffer)?;
131 data[0] = LanceBuffer::from(decompressed_buffer);
132
133 self.inner.decompress(data, num_values)
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
    use crate::data::{BlockInfo, FixedWidthDataBlock};
    use crate::encodings::physical::block::CompressionScheme;
    use crate::encodings::physical::rle::RleMiniBlockEncoder;
    use crate::encodings::physical::value::ValueEncoder;
    use arrow_array::{Float64Array, Int32Array};

    /// One table-driven round-trip scenario.
    #[derive(Debug)]
    struct TestCase {
        name: &'static str,
        inner_encoder: Box<dyn MiniBlockCompressor>,
        compression: CompressionConfig,
        data: DataBlock,
        /// Whether the general compression layer is expected to be applied.
        /// A plain `Rle`/`Flat` encoding is still accepted, because the wrapper
        /// may skip compression (see `run_round_trip_test`).
        expected_compressed: bool,
        /// Upper bound on `compressed first buffer bytes / raw data bytes`.
        /// Checked only when `expected_compressed` is true.
        max_compression_ratio: f32,
    }

    fn create_test_cases() -> Vec<TestCase> {
        vec![
            TestCase {
                name: "small_rle_data",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2]),
                expected_compressed: false,
                max_compression_ratio: 1.0,
            },
            TestCase {
                name: "large_rle_lz4",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_pattern_i32_block(2048, |i| (i / 8) as i32),
                expected_compressed: false,
                max_compression_ratio: 1.0,
            },
            TestCase {
                name: "large_rle_zstd",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Zstd,
                    level: Some(3),
                },
                data: create_pattern_i32_block(8192, |i| (i / 16) as i32),
                expected_compressed: true,
                max_compression_ratio: 0.9,
            },
            TestCase {
                name: "sequential_value_lz4",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_pattern_i32_block(1024, |i| i as i32),
                expected_compressed: false,
                max_compression_ratio: 1.0,
            },
            TestCase {
                name: "float_value_zstd",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Zstd,
                    level: Some(3),
                },
                data: create_pattern_f64_block(1024, |i| i as f64 * 0.1),
                expected_compressed: true,
                max_compression_ratio: 0.9,
            },
        ]
    }

    fn create_repeated_i32_block(values: Vec<i32>) -> DataBlock {
        let array = Int32Array::from(values);
        DataBlock::from_array(array)
    }

    fn create_pattern_i32_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> i32,
    {
        let values: Vec<i32> = (0..size).map(pattern).collect();
        let array = Int32Array::from(values);
        DataBlock::from_array(array)
    }

    fn create_pattern_f64_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> f64,
    {
        let values: Vec<f64> = (0..size).map(pattern).collect();
        let array = Float64Array::from(values);
        DataBlock::from_array(array)
    }

    /// Returns the value width in bytes and a copy of the raw bytes of a
    /// fixed-width block, so that round trips can be checked byte for byte.
    fn fixed_width_bytes(block: &DataBlock) -> (usize, Vec<u8>) {
        match block {
            DataBlock::FixedWidth(fixed) => {
                let raw: &[u8] = fixed.data.as_ref();
                ((fixed.bits_per_value / 8) as usize, raw.to_vec())
            }
            _ => panic!("round-trip tests expect FixedWidth input"),
        }
    }

    /// Compresses `test_case.data`, checks the produced encoding, decodes every
    /// chunk again and verifies that the original bytes come back.
    fn run_round_trip_test(test_case: TestCase) {
        // Capture the value width and raw bytes before the block is moved into the compressor.
        let (bytes_per_value, original_bytes) = fixed_width_bytes(&test_case.data);

        let compressor =
            GeneralMiniBlockCompressor::new(test_case.inner_encoder, test_case.compression);
        let (compressed, encoding) = compressor.compress(test_case.data).unwrap();

        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(
                    test_case.expected_compressed,
                    "{}: Compression was applied but not expected",
                    test_case.name
                );
                assert_eq!(
                    cm.compression.as_ref().unwrap().scheme,
                    test_case.compression.scheme.to_string()
                );
            }
            // The wrapper may skip compression (first buffer below the size
            // threshold, or no size reduction), leaving the inner encoding as-is.
            Some(pb::array_encoding::ArrayEncoding::Rle(_))
            | Some(pb::array_encoding::ArrayEncoding::Flat(_)) => {}
            other => {
                if test_case.expected_compressed {
                    panic!(
                        "{}: Expected GeneralMiniBlock but got {:?}",
                        test_case.name, other
                    );
                }
            }
        }

        assert!(
            !compressed.chunks.is_empty(),
            "{}: No chunks created",
            test_case.name
        );

        let decompressed_data = decompress_miniblock_chunks(&compressed, &encoding, bytes_per_value);

        let expected_bytes = compressed.num_values as usize * bytes_per_value;
        assert_eq!(
            expected_bytes,
            decompressed_data.len(),
            "{}: Data size mismatch",
            test_case.name
        );
        assert_eq!(
            original_bytes, decompressed_data,
            "{}: Round-trip data mismatch",
            test_case.name
        );

        if test_case.expected_compressed {
            let compression_ratio = compressed.data[0].len() as f32 / expected_bytes as f32;
            assert!(
                compression_ratio <= test_case.max_compression_ratio,
                "{}: Compression ratio {} > expected {}",
                test_case.name,
                compression_ratio,
                test_case.max_compression_ratio
            );
        }
    }

    /// Decodes every chunk of `compressed` independently and concatenates the
    /// resulting fixed-width bytes.
    ///
    /// A chunk with `log_num_values == 0` is given whatever values remain.
    /// `bytes_per_value` is used to count how many values were already decoded.
    fn decompress_miniblock_chunks(
        compressed: &MiniBlockCompressed,
        encoding: &pb::ArrayEncoding,
        bytes_per_value: usize,
    ) -> Vec<u8> {
        // The decompressor does not depend on the chunk, so build it once.
        let decompression_strategy = DefaultDecompressionStrategy::default();
        let decompressor = decompression_strategy
            .create_miniblock_decompressor(encoding, &decompression_strategy)
            .unwrap();

        let mut decompressed_data = Vec::new();
        // Read cursor into each of the page's buffers.
        let mut offsets = vec![0usize; compressed.data.len()];

        for chunk in &compressed.chunks {
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                let decoded_values = (decompressed_data.len() / bytes_per_value) as u64;
                compressed.num_values.saturating_sub(decoded_values)
            };

            let mut chunk_buffers = Vec::new();
            for (i, &size) in chunk.buffer_sizes.iter().enumerate() {
                if i < compressed.data.len() {
                    let buffer_data =
                        compressed.data[i].slice_with_length(offsets[i], size as usize);
                    chunk_buffers.push(buffer_data);
                    offsets[i] += size as usize;
                }
            }

            match decompressor.decompress(chunk_buffers, chunk_values).unwrap() {
                DataBlock::FixedWidth(block) => {
                    decompressed_data.extend_from_slice(block.data.as_ref());
                }
                _ => panic!("Expected FixedWidth block"),
            }
        }

        decompressed_data
    }

    #[test]
    fn test_compressed_mini_block_table_driven() {
        for test_case in create_test_cases() {
            run_round_trip_test(test_case);
        }
    }

    #[test]
    fn test_compressed_mini_block_threshold() {
        let small_test = TestCase {
            name: "small_buffer_no_compression",
            inner_encoder: Box::new(RleMiniBlockEncoder),
            compression: CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
            data: create_repeated_i32_block(vec![1, 1, 2, 2]),
            expected_compressed: false,
            max_compression_ratio: 1.0,
        };
        run_round_trip_test(small_test);
    }

    #[test]
    fn test_compressed_mini_block_with_doubles() {
        let test_case = TestCase {
            name: "float_values_with_zstd",
            inner_encoder: Box::new(ValueEncoder {}),
            compression: CompressionConfig {
                scheme: CompressionScheme::Zstd,
                level: Some(3),
            },
            data: create_pattern_f64_block(1024, |i| (i / 10) as f64),
            expected_compressed: true,
            max_compression_ratio: 0.5,
        };

        run_round_trip_test(test_case);
    }

    #[test]
    fn test_compressed_mini_block_with_zstd() {
        // 512 distinct u16 values, each repeated 16 times.
        let mut values = Vec::with_capacity(8192);
        for i in 0..512 {
            values.extend(vec![i as u16; 16]);
        }
        let data = LanceBuffer::from_bytes(
            bytemuck::cast_slice(&values).to_vec().into(),
            std::mem::align_of::<u16>() as u64,
        );
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 16,
            data,
            num_values: 8192,
            block_info: BlockInfo::new(),
        });

        let inner = Box::new(RleMiniBlockEncoder);
        let compression = CompressionConfig {
            scheme: CompressionScheme::Zstd,
            level: Some(3),
        };
        let compressor = GeneralMiniBlockCompressor::new(inner, compression);

        let (compressed, encoding) = compressor.compress(block).unwrap();

        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(cm.inner.is_some());
                assert_eq!(cm.compression.as_ref().unwrap().scheme, "zstd");
                assert_eq!(cm.compression.as_ref().unwrap().level, Some(3));
            }
            // The RLE output may fall below the compression threshold.
            Some(pb::array_encoding::ArrayEncoding::Rle(_)) => {}
            _ => panic!("Expected GeneralMiniBlock or Rle encoding"),
        }

        assert_eq!(compressed.num_values, 8192);
        // RLE produces two buffers; the wrapper keeps the buffer count.
        assert_eq!(compressed.data.len(), 2);
    }

    #[test]
    fn test_compressed_mini_block_large_buffers() {
        let values: Vec<i32> = (0..1024).collect();
        let data = LanceBuffer::from_bytes(
            bytemuck::cast_slice(&values).to_vec().into(),
            std::mem::align_of::<i32>() as u64,
        );
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data,
            num_values: 1024,
            block_info: BlockInfo::new(),
        });

        let inner = Box::new(ValueEncoder {});
        let compression = CompressionConfig {
            scheme: CompressionScheme::Zstd,
            level: Some(3),
        };
        let compressor = GeneralMiniBlockCompressor::new(inner, compression);

        let (compressed, encoding) = compressor.compress(block).unwrap();

        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(cm.inner.is_some());
                assert_eq!(cm.compression.as_ref().unwrap().scheme, "zstd");
                assert_eq!(cm.compression.as_ref().unwrap().level, Some(3));

                match &cm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Flat(flat)) => {
                        assert_eq!(flat.bits_per_value, 32);
                    }
                    _ => panic!("Expected Flat inner encoding"),
                }
            }
            _ => panic!("Expected GeneralMiniBlock encoding"),
        }

        assert_eq!(compressed.num_values, 1024);
        assert_eq!(compressed.data.len(), 1);
    }

    #[test]
    fn test_compressed_mini_block_with_lz4() {
        // 256 distinct i32 values, each repeated 8 times.
        let mut values = Vec::with_capacity(2048);
        for i in 0..256i32 {
            for _ in 0..8 {
                values.push(i);
            }
        }

        let data = LanceBuffer::from_bytes(
            bytemuck::cast_slice(&values).to_vec().into(),
            std::mem::align_of::<i32>() as u64,
        );
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data,
            num_values: 2048,
            block_info: BlockInfo::new(),
        });

        let inner = Box::new(ValueEncoder {});
        let compression = CompressionConfig {
            scheme: CompressionScheme::Lz4,
            level: None,
        };
        let compressor = GeneralMiniBlockCompressor::new(inner, compression);

        let (compressed, encoding) = compressor.compress(block).unwrap();

        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(cm.inner.is_some());
                assert_eq!(cm.compression.as_ref().unwrap().scheme, "lz4");
                assert_eq!(cm.compression.as_ref().unwrap().level, None);

                match &cm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Flat(flat)) => {
                        assert_eq!(flat.bits_per_value, 32);
                    }
                    _ => panic!("Expected Flat inner encoding"),
                }
            }
            _ => panic!("Expected GeneralMiniBlock encoding"),
        }

        assert_eq!(compressed.num_values, 2048);
        assert_eq!(compressed.data.len(), 1);

        let decompressed_data = decompress_miniblock_chunks(&compressed, &encoding, 4);

        let decompressed_values: &[i32] = bytemuck::cast_slice(&decompressed_data);
        assert_eq!(values.len(), decompressed_values.len());
        assert_eq!(values, decompressed_values);
    }

    #[test]
    fn test_compressed_mini_block_rle_multiple_buffers() {
        let data = create_repeated_i32_block(vec![1; 100]);
        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (compressed, _) = compressor.compress(data).unwrap();
        // RLE emits two buffers regardless of whether the wrapper compresses.
        assert_eq!(compressed.data.len(), 2);
    }

    #[test]
    fn test_rle_with_general_miniblock_wrapper() {
        let data_32 = create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]);

        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (_compressed, encoding) = compressor.compress(data_32).unwrap();

        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(gm)) => {
                match &gm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Rle(rle)) => {
                        assert_eq!(rle.bits_per_value, 32);
                    }
                    _ => panic!("Expected RLE as inner encoding"),
                }
                assert_eq!(gm.compression.as_ref().unwrap().scheme, "lz4");
            }
            // Small inputs stay below the compression threshold.
            Some(pb::array_encoding::ArrayEncoding::Rle(_)) => {}
            _ => panic!("Expected GeneralMiniBlock or Rle encoding"),
        }

        let values_64: Vec<i64> = vec![100i64; 50]
            .into_iter()
            .chain(vec![200i64; 50])
            .chain(vec![300i64; 50])
            .collect();
        let array_64 = arrow_array::Int64Array::from(values_64);
        let block_64 = DataBlock::from_array(array_64);

        let compressor_64 = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (_compressed_64, encoding_64) = compressor_64.compress(block_64).unwrap();

        match &encoding_64.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(gm)) => {
                match &gm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Rle(rle)) => {
                        assert_eq!(rle.bits_per_value, 64);
                    }
                    _ => panic!("Expected RLE as inner encoding for 64-bit"),
                }
                assert_eq!(gm.compression.as_ref().unwrap().scheme, "lz4");
            }
            Some(pb::array_encoding::ArrayEncoding::Rle(_)) => {}
            _ => panic!("Expected GeneralMiniBlock or Rle encoding for 64-bit"),
        }
    }

    #[test]
    fn test_compressed_mini_block_empty_data() {
        let empty_array = Int32Array::from(vec![] as Vec<i32>);
        let empty_block = DataBlock::from_array(empty_array);

        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(ValueEncoder {}),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        // Rejecting an empty page with an error is acceptable. If it succeeds,
        // it must report zero values.
        if let Ok((compressed, _)) = compressor.compress(empty_block) {
            assert_eq!(compressed.num_values, 0);
        }
    }
}