1use log::trace;
5
6use crate::{
7 buffer::LanceBuffer,
8 compression::MiniBlockDecompressor,
9 data::DataBlock,
10 encodings::{
11 logical::primitive::miniblock::{MiniBlockCompressed, MiniBlockCompressor},
12 physical::block::{CompressionConfig, GeneralBufferCompressor},
13 },
14 format::{pb, ProtobufUtils},
15 Result,
16};
17
18#[derive(Debug)]
21pub struct GeneralMiniBlockCompressor {
22 inner: Box<dyn MiniBlockCompressor>,
23 compression: CompressionConfig,
24}
25
26impl GeneralMiniBlockCompressor {
27 pub fn new(inner: Box<dyn MiniBlockCompressor>, compression: CompressionConfig) -> Self {
28 Self { inner, compression }
29 }
30}
31
/// First buffers shorter than this many bytes (4 KiB) are left uncompressed by
/// `GeneralMiniBlockCompressor`.
const MIN_BUFFER_SIZE_FOR_COMPRESSION: usize = 4096;
34
35use super::super::logical::primitive::miniblock::MiniBlockChunk;
36
37impl MiniBlockCompressor for GeneralMiniBlockCompressor {
38 fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
39 let (inner_compressed, inner_encoding) = self.inner.compress(page)?;
41
42 if inner_compressed.data.is_empty()
45 || inner_compressed.data[0].len() < MIN_BUFFER_SIZE_FOR_COMPRESSION
46 {
47 return Ok((inner_compressed, inner_encoding));
48 }
49
50 let first_buffer = &inner_compressed.data[0];
52 let mut compressed_first_buffer = Vec::new();
53 let mut new_chunks = Vec::with_capacity(inner_compressed.chunks.iter().len());
54 let mut offset = 0usize;
55 let mut total_original_size = 0usize;
56
57 let compressor = GeneralBufferCompressor::get_compressor(self.compression);
58
59 for chunk in &inner_compressed.chunks {
60 let chunk_first_buffer_size = chunk.buffer_sizes[0] as usize;
61
62 let chunk_data = &first_buffer.as_ref()[offset..offset + chunk_first_buffer_size];
63 total_original_size += chunk_first_buffer_size;
64
65 let compressed_start = compressed_first_buffer.len();
66 compressor.compress(chunk_data, &mut compressed_first_buffer)?;
67 let compressed_size = compressed_first_buffer.len() - compressed_start;
68
69 let mut new_buffer_sizes = chunk.buffer_sizes.clone();
71 new_buffer_sizes[0] = compressed_size as u16;
72
73 new_chunks.push(MiniBlockChunk {
74 buffer_sizes: new_buffer_sizes,
75 log_num_values: chunk.log_num_values,
76 });
77
78 offset += chunk_first_buffer_size;
79 }
80
81 let compressed_total_size = compressed_first_buffer.len();
83 if compressed_total_size >= total_original_size {
84 return Ok((inner_compressed, inner_encoding));
86 }
87
88 trace!(
89 "First buffer compressed from {} to {} bytes (ratio: {:.2})",
90 total_original_size,
91 compressed_total_size,
92 compressed_total_size as f32 / total_original_size as f32
93 );
94
95 let mut final_buffers = vec![LanceBuffer::from(compressed_first_buffer)];
97 final_buffers.extend(inner_compressed.data.into_iter().skip(1));
98
99 let compressed_result = MiniBlockCompressed {
100 data: final_buffers,
101 chunks: new_chunks,
102 num_values: inner_compressed.num_values,
103 };
104
105 let encoding = ProtobufUtils::general_mini_block(inner_encoding, self.compression);
107 Ok((compressed_result, encoding))
108 }
109}
110
111#[derive(Debug)]
114pub struct GeneralMiniBlockDecompressor {
115 inner: Box<dyn MiniBlockDecompressor>,
116 compression: CompressionConfig,
117}
118
119impl GeneralMiniBlockDecompressor {
120 pub fn new(inner: Box<dyn MiniBlockDecompressor>, compression: CompressionConfig) -> Self {
121 Self { inner, compression }
122 }
123}
124
125impl MiniBlockDecompressor for GeneralMiniBlockDecompressor {
126 fn decompress(&self, mut data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
127 let mut decompressed_buffer = Vec::new();
128
129 let decompressor = GeneralBufferCompressor::get_compressor(self.compression);
130 decompressor.decompress(&data[0], &mut decompressed_buffer)?;
131 data[0] = LanceBuffer::from(decompressed_buffer);
132
133 self.inner.decompress(data, num_values)
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
    use crate::data::{BlockInfo, FixedWidthDataBlock};
    use crate::encodings::physical::block::CompressionScheme;
    use crate::encodings::physical::rle::RleMiniBlockEncoder;
    use crate::encodings::physical::value::ValueEncoder;
    use arrow_array::{Float64Array, Int32Array};

    /// A single scenario executed by `run_round_trip_test`.
    #[derive(Debug)]
    struct TestCase {
        name: &'static str,
        inner_encoder: Box<dyn MiniBlockCompressor>,
        compression: CompressionConfig,
        data: DataBlock,
        /// `false`: the wrapper must NOT emit a `GeneralMiniBlock` encoding.
        /// `true`: a `GeneralMiniBlock` encoding is allowed. Falling back to the inner `Rle`
        /// or `Flat` encoding is also accepted, e.g. when the first buffer is below
        /// `MIN_BUFFER_SIZE_FOR_COMPRESSION`.
        expected_compressed: bool,
        /// Upper bound on `first buffer length / uncompressed size`. Only checked when
        /// `expected_compressed` is `true`.
        max_compression_ratio: f32,
    }

    fn create_test_cases() -> Vec<TestCase> {
        vec![
            // 8 i32 values: the first buffer is far below the compression threshold.
            TestCase {
                name: "small_rle_data",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2]),
                expected_compressed: false,
                max_compression_ratio: 1.0,
            },
            // 2048 values in runs of 8, RLE + LZ4: no general compression expected.
            TestCase {
                name: "large_rle_lz4",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_pattern_i32_block(2048, |i| (i / 8) as i32),
                expected_compressed: false,
                max_compression_ratio: 1.0,
            },
            // 8192 values in runs of 16, RLE + Zstd.
            TestCase {
                name: "large_rle_zstd",
                inner_encoder: Box::new(RleMiniBlockEncoder),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Zstd,
                    level: Some(3),
                },
                data: create_pattern_i32_block(8192, |i| (i / 16) as i32),
                expected_compressed: true,
                max_compression_ratio: 0.9,
            },
            // 1024 sequential i32 values, flat + LZ4: no general compression expected.
            TestCase {
                name: "sequential_value_lz4",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Lz4,
                    level: None,
                },
                data: create_pattern_i32_block(1024, |i| i as i32),
                expected_compressed: false,
                max_compression_ratio: 1.0,
            },
            // 1024 f64 values, flat + Zstd.
            TestCase {
                name: "float_value_zstd",
                inner_encoder: Box::new(ValueEncoder {}),
                compression: CompressionConfig {
                    scheme: CompressionScheme::Zstd,
                    level: Some(3),
                },
                data: create_pattern_f64_block(1024, |i| i as f64 * 0.1),
                expected_compressed: true,
                max_compression_ratio: 0.9,
            },
        ]
    }

    fn create_repeated_i32_block(values: Vec<i32>) -> DataBlock {
        let array = Int32Array::from(values);
        DataBlock::from_array(array)
    }

    fn create_pattern_i32_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> i32,
    {
        let values: Vec<i32> = (0..size).map(pattern).collect();
        let array = Int32Array::from(values);
        DataBlock::from_array(array)
    }

    fn create_pattern_f64_block<F>(size: usize, pattern: F) -> DataBlock
    where
        F: Fn(usize) -> f64,
    {
        let values: Vec<f64> = (0..size).map(pattern).collect();
        let array = Float64Array::from(values);
        DataBlock::from_array(array)
    }

    /// Compresses `test_case.data` with a `GeneralMiniBlockCompressor` and checks the chosen
    /// encoding against the expectations. It then verifies that decompressing every chunk
    /// reproduces the original bytes exactly.
    fn run_round_trip_test(test_case: TestCase) {
        // Capture the value width and raw bytes before `data` is consumed by `compress`.
        let (bytes_per_value, original_bytes) = match &test_case.data {
            DataBlock::FixedWidth(block) => {
                let bytes: &[u8] = block.data.as_ref();
                ((block.bits_per_value / 8) as usize, bytes.to_vec())
            }
            _ => panic!("{}: expected a fixed-width input block", test_case.name),
        };

        let compressor =
            GeneralMiniBlockCompressor::new(test_case.inner_encoder, test_case.compression);
        let (compressed, encoding) = compressor.compress(test_case.data).unwrap();

        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(
                    test_case.expected_compressed,
                    "{}: Compression was applied but not expected",
                    test_case.name
                );
                assert_eq!(
                    cm.compression.as_ref().unwrap().scheme,
                    test_case.compression.scheme.to_string()
                );
            }
            // Falling back to the inner encoding is always acceptable.
            Some(pb::array_encoding::ArrayEncoding::Rle(_))
            | Some(pb::array_encoding::ArrayEncoding::Flat(_)) => {}
            other => {
                if test_case.expected_compressed {
                    panic!(
                        "{}: Expected GeneralMiniBlock but got {:?}",
                        test_case.name, other
                    );
                }
            }
        }

        assert!(
            !compressed.chunks.is_empty(),
            "{}: No chunks created",
            test_case.name
        );

        let decompressed_data = decompress_miniblock_chunks(&compressed, &encoding);

        let expected_bytes = compressed.num_values as usize * bytes_per_value;
        assert_eq!(
            expected_bytes,
            decompressed_data.len(),
            "{}: Data size mismatch",
            test_case.name
        );
        // `assert!` rather than `assert_eq!` to avoid dumping whole buffers on failure.
        assert!(
            original_bytes == decompressed_data,
            "{}: Decompressed data does not match the input",
            test_case.name
        );

        if test_case.expected_compressed {
            let compression_ratio = compressed.data[0].len() as f32 / expected_bytes as f32;
            assert!(
                compression_ratio <= test_case.max_compression_ratio,
                "{}: Compression ratio {} > expected {}",
                test_case.name,
                compression_ratio,
                test_case.max_compression_ratio
            );
        }
    }

    /// Decompresses every chunk of `compressed` on its own and concatenates the resulting
    /// fixed-width bytes.
    fn decompress_miniblock_chunks(
        compressed: &MiniBlockCompressed,
        encoding: &pb::ArrayEncoding,
    ) -> Vec<u8> {
        let decompression_strategy = DefaultDecompressionStrategy::default();
        // The encoding is the same for every chunk, so build the decompressor once.
        let decompressor = decompression_strategy
            .create_miniblock_decompressor(encoding, &decompression_strategy)
            .unwrap();

        let mut decompressed_data = Vec::new();
        // Read cursor into each buffer; a chunk's slices follow the previous chunk's.
        let mut offsets = vec![0usize; compressed.data.len()];
        let mut values_decoded = 0u64;

        for chunk in &compressed.chunks {
            // A chunk with `log_num_values == 0` is treated as holding all remaining values.
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                compressed.num_values.saturating_sub(values_decoded)
            };

            let mut chunk_buffers = Vec::with_capacity(compressed.data.len());
            for ((buffer, offset), &size) in compressed
                .data
                .iter()
                .zip(offsets.iter_mut())
                .zip(&chunk.buffer_sizes)
            {
                chunk_buffers.push(buffer.slice_with_length(*offset, size as usize));
                *offset += size as usize;
            }

            let chunk_decompressed = decompressor
                .decompress(chunk_buffers, chunk_values)
                .unwrap();

            match chunk_decompressed {
                DataBlock::FixedWidth(ref block) => {
                    decompressed_data.extend_from_slice(block.data.as_ref());
                }
                _ => panic!("Expected FixedWidth block"),
            }
            values_decoded += chunk_values;
        }

        decompressed_data
    }

    #[test]
    fn test_compressed_mini_block_table_driven() {
        for test_case in create_test_cases() {
            run_round_trip_test(test_case);
        }
    }

    #[test]
    fn test_compressed_mini_block_threshold() {
        // Input this small must pass through without general compression.
        let small_test = TestCase {
            name: "small_buffer_no_compression",
            inner_encoder: Box::new(RleMiniBlockEncoder),
            compression: CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
            data: create_repeated_i32_block(vec![1, 1, 2, 2]),
            expected_compressed: false,
            max_compression_ratio: 1.0,
        };
        run_round_trip_test(small_test);
    }

    #[test]
    fn test_compressed_mini_block_with_doubles() {
        // Each value repeats 10 times, which gives Zstd plenty of redundancy.
        let test_case = TestCase {
            name: "float_values_with_zstd",
            inner_encoder: Box::new(ValueEncoder {}),
            compression: CompressionConfig {
                scheme: CompressionScheme::Zstd,
                level: Some(3),
            },
            data: create_pattern_f64_block(1024, |i| (i / 10) as f64),
            expected_compressed: true,
            max_compression_ratio: 0.5,
        };

        run_round_trip_test(test_case);
    }

    #[test]
    fn test_compressed_mini_block_large_buffers() {
        let values: Vec<i32> = (0..1024).collect();
        let data = LanceBuffer::from_bytes(
            bytemuck::cast_slice(&values).to_vec().into(),
            std::mem::align_of::<i32>() as u64,
        );
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data,
            num_values: 1024,
            block_info: BlockInfo::new(),
        });

        let inner = Box::new(ValueEncoder {});
        let compression = CompressionConfig {
            scheme: CompressionScheme::Zstd,
            level: Some(3),
        };
        let compressor = GeneralMiniBlockCompressor::new(inner, compression);

        let (compressed, encoding) = compressor.compress(block).unwrap();

        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(cm)) => {
                assert!(cm.inner.is_some());
                assert_eq!(cm.compression.as_ref().unwrap().scheme, "zstd");
                assert_eq!(cm.compression.as_ref().unwrap().level, Some(3));

                match &cm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Flat(flat)) => {
                        assert_eq!(flat.bits_per_value, 32);
                    }
                    _ => panic!("Expected Flat inner encoding"),
                }
            }
            _ => panic!("Expected GeneralMiniBlock encoding"),
        }

        assert_eq!(compressed.num_values, 1024);
        assert_eq!(compressed.data.len(), 1);
    }

    #[test]
    fn test_compressed_mini_block_rle_multiple_buffers() {
        // Only the first buffer is ever compressed; the buffer count must be preserved.
        let data = create_repeated_i32_block(vec![1; 100]);
        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (compressed, _) = compressor.compress(data).unwrap();
        assert_eq!(compressed.data.len(), 2);
    }

    #[test]
    fn test_rle_with_general_miniblock_wrapper() {
        // 32-bit values: accept either the wrapped encoding or a plain RLE fallback.
        let data_32 = create_repeated_i32_block(vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]);
        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (_compressed, encoding) = compressor.compress(data_32).unwrap();

        match &encoding.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(gm)) => {
                match &gm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Rle(rle)) => {
                        assert_eq!(rle.bits_per_value, 32);
                    }
                    _ => panic!("Expected RLE as inner encoding"),
                }
                assert_eq!(gm.compression.as_ref().unwrap().scheme, "lz4");
            }
            Some(pb::array_encoding::ArrayEncoding::Rle(_)) => {}
            _ => panic!("Expected GeneralMiniBlock or Rle encoding"),
        }

        // 64-bit values: three runs of 50.
        let values_64: Vec<i64> = [100i64, 200, 300]
            .iter()
            .flat_map(|&value| std::iter::repeat(value).take(50))
            .collect();
        let array_64 = arrow_array::Int64Array::from(values_64);
        let block_64 = DataBlock::from_array(array_64);

        let compressor_64 = GeneralMiniBlockCompressor::new(
            Box::new(RleMiniBlockEncoder),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        let (_compressed_64, encoding_64) = compressor_64.compress(block_64).unwrap();

        match &encoding_64.array_encoding {
            Some(pb::array_encoding::ArrayEncoding::GeneralMiniBlock(gm)) => {
                match &gm.inner.as_ref().unwrap().array_encoding {
                    Some(pb::array_encoding::ArrayEncoding::Rle(rle)) => {
                        assert_eq!(rle.bits_per_value, 64);
                    }
                    _ => panic!("Expected RLE as inner encoding for 64-bit"),
                }
                assert_eq!(gm.compression.as_ref().unwrap().scheme, "lz4");
            }
            Some(pb::array_encoding::ArrayEncoding::Rle(_)) => {}
            _ => panic!("Expected GeneralMiniBlock or Rle encoding for 64-bit"),
        }
    }

    #[test]
    fn test_compressed_mini_block_empty_data() {
        let empty_array = Int32Array::from(vec![] as Vec<i32>);
        let empty_block = DataBlock::from_array(empty_array);

        let compressor = GeneralMiniBlockCompressor::new(
            Box::new(ValueEncoder {}),
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            },
        );

        // Empty input may legitimately be rejected by the inner encoder. This test only
        // requires that it does not panic and, if accepted, reports zero values.
        let result = compressor.compress(empty_block);
        match result {
            Ok((compressed, _)) => {
                assert_eq!(compressed.num_values, 0);
            }
            Err(_) => {}
        }
    }
}