1use log::trace;
52use snafu::location;
53
54use crate::buffer::LanceBuffer;
55use crate::compression::MiniBlockDecompressor;
56use crate::data::DataBlock;
57use crate::data::{BlockInfo, FixedWidthDataBlock};
58use crate::encodings::logical::primitive::miniblock::{
59 MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
60 MAX_MINIBLOCK_VALUES,
61};
62use crate::format::{pb, ProtobufUtils};
63
64use lance_core::{Error, Result};
65
/// Mini-block compressor that run-length encodes fixed-width values.
///
/// Emits two buffers per block: one holding the run values and one holding
/// the run lengths (one byte per run; longer runs are split into multiple
/// entries). Stateless, so a single instance can be reused freely.
#[derive(Debug, Default)]
pub struct RleMiniBlockEncoder;
69
70impl RleMiniBlockEncoder {
71 pub fn new() -> Self {
72 Self
73 }
74
75 fn encode_data(
76 &self,
77 data: &LanceBuffer,
78 num_values: u64,
79 bits_per_value: u64,
80 ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
81 if num_values == 0 {
82 return Ok((Vec::new(), Vec::new()));
83 }
84
85 let bytes_per_value = (bits_per_value / 8) as usize;
86
87 let estimated_runs = num_values as usize / 10;
90 let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
91 let mut all_lengths = Vec::with_capacity(estimated_runs);
92 let mut chunks = Vec::new();
93
94 let mut offset = 0usize;
95 let mut values_remaining = num_values as usize;
96
97 while values_remaining > 0 {
98 let values_start = all_values.len();
99 let lengths_start = all_lengths.len();
100
101 let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
102 8 => self.encode_chunk_rolling::<u8>(
103 data,
104 offset,
105 values_remaining,
106 &mut all_values,
107 &mut all_lengths,
108 ),
109 16 => self.encode_chunk_rolling::<u16>(
110 data,
111 offset,
112 values_remaining,
113 &mut all_values,
114 &mut all_lengths,
115 ),
116 32 => self.encode_chunk_rolling::<u32>(
117 data,
118 offset,
119 values_remaining,
120 &mut all_values,
121 &mut all_lengths,
122 ),
123 64 => self.encode_chunk_rolling::<u64>(
124 data,
125 offset,
126 values_remaining,
127 &mut all_values,
128 &mut all_lengths,
129 ),
130 _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
131 };
132
133 if values_processed == 0 {
134 break;
135 }
136
137 let log_num_values = if is_last_chunk {
138 0
139 } else {
140 assert!(
141 values_processed.is_power_of_two(),
142 "Non-last chunk must have power-of-2 values"
143 );
144 values_processed.ilog2() as u8
145 };
146
147 let values_size = all_values.len() - values_start;
148 let lengths_size = all_lengths.len() - lengths_start;
149
150 let chunk = MiniBlockChunk {
151 buffer_sizes: vec![values_size as u16, lengths_size as u16],
152 log_num_values,
153 };
154
155 chunks.push(chunk);
156
157 offset += values_processed;
158 values_remaining -= values_processed;
159 }
160
161 Ok((
163 vec![
164 LanceBuffer::Owned(all_values),
165 LanceBuffer::Owned(all_lengths),
166 ],
167 chunks,
168 ))
169 }
170
171 fn encode_chunk_rolling<T>(
188 &self,
189 data: &LanceBuffer,
190 offset: usize,
191 values_remaining: usize,
192 all_values: &mut Vec<u8>,
193 all_lengths: &mut Vec<u8>,
194 ) -> (usize, usize, bool)
195 where
196 T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug,
197 {
198 let type_size = std::mem::size_of::<T>();
199 let data_slice = data.as_ref();
200
201 let chunk_start = offset * type_size;
202 let max_by_count = MAX_MINIBLOCK_VALUES as usize;
203 let max_values = values_remaining.min(max_by_count);
204 let chunk_end = chunk_start + max_values * type_size;
205
206 if chunk_start >= data_slice.len() {
207 return (0, 0, false);
208 }
209
210 let chunk_data = &data_slice[chunk_start..chunk_end.min(data_slice.len())];
211 let typed_data: &[T] = bytemuck::cast_slice(chunk_data);
212
213 if typed_data.is_empty() {
214 return (0, 0, false);
215 }
216
217 let values_start = all_values.len();
219
220 let mut current_value = typed_data[0];
221 let mut current_length = 1u64;
222 let mut bytes_used = 0usize;
223 let mut total_values_encoded = 0usize; let checkpoints = match type_size {
229 1 => vec![256, 512, 1024, 2048, 4096], 2 => vec![128, 256, 512, 1024, 2048, 4096], _ => vec![64, 128, 256, 512, 1024, 2048, 4096], };
233 let valid_checkpoints: Vec<usize> = checkpoints
234 .into_iter()
235 .filter(|&p| p <= values_remaining)
236 .collect();
237 let mut checkpoint_idx = 0;
238
239 let mut last_checkpoint_state = None;
241
242 for &value in typed_data[1..].iter() {
243 if value == current_value {
244 current_length += 1;
245 } else {
246 let run_chunks = current_length.div_ceil(255) as usize;
248 let bytes_needed = run_chunks * (type_size + 1);
249
250 if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
252 if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
253 all_values.truncate(val_pos);
255 all_lengths.truncate(len_pos);
256 let num_runs = (val_pos - values_start) / type_size;
257 return (num_runs, checkpoint_values, false);
258 }
259 break;
260 }
261
262 bytes_used += self.add_run(¤t_value, current_length, all_values, all_lengths);
263 total_values_encoded += current_length as usize;
264 current_value = value;
265 current_length = 1;
266 }
267
268 if checkpoint_idx < valid_checkpoints.len()
270 && total_values_encoded >= valid_checkpoints[checkpoint_idx]
271 {
272 last_checkpoint_state = Some((
273 all_values.len(),
274 all_lengths.len(),
275 bytes_used,
276 valid_checkpoints[checkpoint_idx],
277 ));
278 checkpoint_idx += 1;
279 }
280 }
281
282 if current_length > 0 {
285 let run_chunks = current_length.div_ceil(255) as usize;
286 let bytes_needed = run_chunks * (type_size + 1);
287
288 if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
289 let _ = self.add_run(¤t_value, current_length, all_values, all_lengths);
290 total_values_encoded += current_length as usize;
291 }
292 }
293
294 let is_last_chunk = total_values_encoded == values_remaining;
296
297 if !is_last_chunk {
299 if total_values_encoded.is_power_of_two() {
300 } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
302 all_values.truncate(val_pos);
304 all_lengths.truncate(len_pos);
305 let num_runs = (val_pos - values_start) / type_size;
306 return (num_runs, checkpoint_values, false);
307 } else {
308 return (0, 0, false);
310 }
311 }
312
313 let num_runs = (all_values.len() - values_start) / type_size;
314 (num_runs, total_values_encoded, is_last_chunk)
315 }
316
317 fn add_run<T>(
318 &self,
319 value: &T,
320 length: u64,
321 all_values: &mut Vec<u8>,
322 all_lengths: &mut Vec<u8>,
323 ) -> usize
324 where
325 T: bytemuck::Pod,
326 {
327 let value_bytes = bytemuck::bytes_of(value);
328 let type_size = std::mem::size_of::<T>();
329 let num_full_chunks = (length / 255) as usize;
330 let remainder = (length % 255) as u8;
331
332 let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
333 all_values.reserve(total_chunks * type_size);
334 all_lengths.reserve(total_chunks);
335
336 for _ in 0..num_full_chunks {
337 all_values.extend_from_slice(value_bytes);
338 all_lengths.push(255);
339 }
340
341 if remainder > 0 {
342 all_values.extend_from_slice(value_bytes);
343 all_lengths.push(remainder);
344 }
345
346 total_chunks * (type_size + 1)
347 }
348}
349
350impl MiniBlockCompressor for RleMiniBlockEncoder {
351 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
352 match data {
353 DataBlock::FixedWidth(fixed_width) => {
354 let num_values = fixed_width.num_values;
355 let bits_per_value = fixed_width.bits_per_value;
356
357 let (all_buffers, chunks) =
358 self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
359
360 let compressed = MiniBlockCompressed {
361 data: all_buffers,
362 chunks,
363 num_values,
364 };
365
366 let encoding = ProtobufUtils::rle(bits_per_value);
367
368 Ok((compressed, encoding))
369 }
370 _ => Err(Error::InvalidInput {
371 location: location!(),
372 source: "RLE encoding only supports FixedWidth data blocks".into(),
373 }),
374 }
375 }
376}
377
/// Mini-block decompressor for RLE-encoded fixed-width values.
#[derive(Debug)]
pub struct RleMiniBlockDecompressor {
    // Width of each decoded value in bits; decode_data dispatches on
    // 8/16/32/64.
    bits_per_value: u64,
}
383
384impl RleMiniBlockDecompressor {
385 pub fn new(bits_per_value: u64) -> Self {
386 Self { bits_per_value }
387 }
388
389 fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
390 if num_values == 0 {
391 return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
392 bits_per_value: self.bits_per_value,
393 data: LanceBuffer::Owned(vec![]),
394 num_values: 0,
395 block_info: BlockInfo::default(),
396 }));
397 }
398
399 assert_eq!(
400 data.len(),
401 2,
402 "RLE decompressor expects exactly 2 buffers, got {}",
403 data.len()
404 );
405
406 let values_buffer = &data[0];
407 let lengths_buffer = &data[1];
408
409 let decoded_data = match self.bits_per_value {
410 8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
411 16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
412 32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
413 64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
414 _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32, 64, or 128"),
415 };
416
417 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
418 bits_per_value: self.bits_per_value,
419 data: LanceBuffer::Owned(decoded_data),
420 num_values,
421 block_info: BlockInfo::default(),
422 }))
423 }
424
425 fn decode_generic<T>(
426 &self,
427 values_buffer: &LanceBuffer,
428 lengths_buffer: &LanceBuffer,
429 num_values: u64,
430 ) -> Result<Vec<u8>>
431 where
432 T: bytemuck::Pod + Copy + std::fmt::Debug,
433 {
434 let values_bytes = values_buffer.as_ref();
435 let lengths_bytes = lengths_buffer.as_ref();
436
437 let type_size = std::mem::size_of::<T>();
438
439 if values_bytes.is_empty() || lengths_bytes.is_empty() {
440 if num_values == 0 {
441 return Ok(Vec::new());
442 } else {
443 return Err(Error::InvalidInput {
444 location: location!(),
445 source: format!("Empty buffers but expected {} values", num_values).into(),
446 });
447 }
448 }
449
450 if values_bytes.len() % type_size != 0 || lengths_bytes.is_empty() {
451 return Err(Error::InvalidInput {
452 location: location!(),
453 source: format!(
454 "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
455 std::any::type_name::<T>(),
456 values_bytes.len(),
457 type_size,
458 lengths_bytes.len()
459 )
460 .into(),
461 });
462 }
463
464 let num_runs = values_bytes.len() / type_size;
465 let num_length_entries = lengths_bytes.len();
466 assert_eq!(
467 num_runs, num_length_entries,
468 "Inconsistent RLE buffers: {} runs but {} length entries",
469 num_runs, num_length_entries
470 );
471
472 let values: &[T] = bytemuck::cast_slice(values_bytes);
473 let lengths: &[u8] = lengths_bytes;
474
475 let expected_byte_count = num_values as usize * type_size;
476 let mut decoded = Vec::with_capacity(expected_byte_count);
477
478 for (value, &length) in values.iter().zip(lengths.iter()) {
479 let run_length = length as usize;
480 let bytes_to_write = run_length * type_size;
481 let bytes_of_value = bytemuck::bytes_of(value);
482
483 if decoded.len() + bytes_to_write > expected_byte_count {
484 let remaining_bytes = expected_byte_count - decoded.len();
485 let remaining_values = remaining_bytes / type_size;
486
487 for _ in 0..remaining_values {
488 decoded.extend_from_slice(bytes_of_value);
489 }
490 break;
491 }
492
493 for _ in 0..run_length {
494 decoded.extend_from_slice(bytes_of_value);
495 }
496 }
497
498 if decoded.len() != expected_byte_count {
499 return Err(Error::InvalidInput {
500 location: location!(),
501 source: format!(
502 "RLE decoding produced {} bytes, expected {}",
503 decoded.len(),
504 expected_byte_count
505 )
506 .into(),
507 });
508 }
509
510 trace!(
511 "RLE decoded {} {} values",
512 num_values,
513 std::any::type_name::<T>()
514 );
515 Ok(decoded)
516 }
517}
518
impl MiniBlockDecompressor for RleMiniBlockDecompressor {
    /// Decompresses RLE buffers into a `FixedWidth` data block.
    ///
    /// `data` must hold the run-values buffer followed by the run-lengths
    /// buffer; all validation lives in `decode_data`.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        self.decode_data(data, num_values)
    }
}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
    use crate::data::DataBlock;
    use arrow_array::Int32Array;
    use lance_core::datatypes::Field;

    // Three runs of i32s should produce 3 run values (12 bytes) and 3
    // one-byte run lengths in a single chunk.
    #[test]
    fn test_basic_rle_encoding() {
        let encoder = RleMiniBlockEncoder::new();

        let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]);
        let data_block = DataBlock::from_array(array);

        let (compressed, _) = encoder.compress(data_block).unwrap();

        assert_eq!(compressed.num_values, 9);
        assert_eq!(compressed.chunks.len(), 1);

        let values_buffer = &compressed.data[0];
        let lengths_buffer = &compressed.data[1];
        // 3 runs * 4 bytes per i32
        assert_eq!(values_buffer.len(), 12);
        // one length byte per run
        assert_eq!(lengths_buffer.len(), 3);
    }

    // Runs longer than 255 must be split into multiple length bytes:
    // 1000 -> 4 entries (255+255+255+235), 300 -> 2 entries (255+45).
    #[test]
    fn test_long_run_splitting() {
        let encoder = RleMiniBlockEncoder::new();

        let mut data = vec![42i32; 1000];
        data.extend(&[100i32; 300]);
        let array = Int32Array::from(data);
        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

        let lengths_buffer = &compressed.data[1];
        assert_eq!(lengths_buffer.len(), 6);
    }

    // The default strategy should pick RLE for highly repetitive data and
    // something else for all-unique data.
    #[test]
    fn test_compression_strategy_selection() {
        let strategy = DefaultCompressionStrategy;
        let field = Field::new_arrow("test", arrow_schema::DataType::Int32, false).unwrap();

        let repetitive_array = Int32Array::from(vec![1; 1000]);
        let repetitive_block = DataBlock::from_array(repetitive_array);

        let compressor = strategy
            .create_miniblock_compressor(&field, &repetitive_block)
            .unwrap();
        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));

        let unique_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
        let unique_block = DataBlock::from_array(unique_array);

        let compressor = strategy
            .create_miniblock_compressor(&field, &unique_block)
            .unwrap();
        assert!(!format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
    }

    // Round-trips each supported element width (8/16/32/64 bits).
    #[test]
    fn test_round_trip_all_types() {
        test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);

        test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);

        test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);

        test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
    }

    // Compresses `data`, decompresses it, and checks the decoded byte count
    // (content equality is covered by the multi-chunk/pattern tests).
    fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
    where
        T: bytemuck::Pod + PartialEq + std::fmt::Debug,
    {
        let encoder = RleMiniBlockEncoder::new();
        let bytes: Vec<u8> = data
            .iter()
            .flat_map(|v| bytemuck::bytes_of(v))
            .copied()
            .collect();

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::Owned(bytes),
            num_values: data.len() as u64,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(block).unwrap();
        let decompressor = RleMiniBlockDecompressor::new(bits_per_value);
        let decompressed = decompressor
            .decompress(compressed.data, compressed.num_values)
            .unwrap();

        match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    // Every chunk except the last must declare a power-of-two value count
    // via log_num_values; the last chunk uses log_num_values == 0.
    #[test]
    fn test_power_of_two_chunking() {
        let encoder = RleMiniBlockEncoder::new();

        let test_sizes = vec![1000, 2500, 5000, 10000];

        for size in test_sizes {
            // runs of 50 identical values each
            let data: Vec<i32> = (0..size)
                .map(|i| i / 50)
                .collect();

            let array = Int32Array::from(data);
            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

            for (i, chunk) in compressed.chunks.iter().enumerate() {
                if i < compressed.chunks.len() - 1 {
                    assert!(chunk.log_num_values > 0);
                    let chunk_values = 1u64 << chunk.log_num_values;
                    assert!(chunk_values.is_power_of_two());
                    assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
                } else {
                    assert_eq!(chunk.log_num_values, 0);
                }
            }
        }
    }

    // The decompressor asserts on receiving anything other than exactly two
    // buffers for non-empty input.
    #[test]
    #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")]
    fn test_invalid_buffer_count() {
        let decompressor = RleMiniBlockDecompressor::new(32);
        let _ = decompressor.decompress(vec![LanceBuffer::Owned(vec![1, 2, 3, 4])], 10);
    }

    // One i32 run value but two length bytes must trip the consistency
    // assertion in decode_generic.
    #[test]
    #[should_panic(expected = "Inconsistent RLE buffers")]
    fn test_buffer_consistency() {
        let decompressor = RleMiniBlockDecompressor::new(32);
        let values = LanceBuffer::Owned(vec![1, 0, 0, 0]);
        let lengths = LanceBuffer::Owned(vec![5, 10]);
        let _ = decompressor.decompress(vec![values, lengths], 15);
    }

    // Zero values should round-trip as empty buffers and an empty block.
    #[test]
    fn test_empty_data_handling() {
        let encoder = RleMiniBlockEncoder::new();

        let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::Owned(vec![]),
            num_values: 0,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(empty_block).unwrap();
        assert_eq!(compressed.num_values, 0);
        assert!(compressed.data.is_empty());

        let decompressor = RleMiniBlockDecompressor::new(32);
        let decompressed = decompressor.decompress(vec![], 0).unwrap();

        match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(block.num_values, 0);
                assert_eq!(block.data.len(), 0);
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    // Decodes each chunk independently using the per-chunk buffer sizes and
    // verifies that concatenating the chunk outputs reproduces the input.
    #[test]
    fn test_multi_chunk_round_trip() {
        let encoder = RleMiniBlockEncoder::new();

        let mut data = Vec::new();

        // long run, then unique values, then another long run — forces
        // multiple chunks with mixed run shapes
        data.extend(vec![999i32; 2000]);
        data.extend(0..1000);
        data.extend(vec![777i32; 2000]);

        let array = Int32Array::from(data.clone());
        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

        let mut reconstructed = Vec::new();
        let mut values_offset = 0usize;
        let mut lengths_offset = 0usize;
        let mut values_processed = 0u64;

        assert_eq!(compressed.data.len(), 2);
        let global_values = &compressed.data[0];
        let global_lengths = &compressed.data[1];

        for chunk in &compressed.chunks {
            // log_num_values == 0 marks the last chunk, which holds all
            // remaining values.
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                compressed.num_values - values_processed
            };

            let values_size = chunk.buffer_sizes[0] as usize;
            let lengths_size = chunk.buffer_sizes[1] as usize;

            let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
            let chunk_lengths_buffer =
                global_lengths.slice_with_length(lengths_offset, lengths_size);

            let decompressor = RleMiniBlockDecompressor::new(32);
            let chunk_data = decompressor
                .decompress(
                    vec![chunk_values_buffer, chunk_lengths_buffer],
                    chunk_values,
                )
                .unwrap();

            values_offset += values_size;
            lengths_offset += lengths_size;
            values_processed += chunk_values;

            match chunk_data {
                DataBlock::FixedWidth(ref block) => {
                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                    reconstructed.extend_from_slice(values);
                }
                _ => panic!("Expected FixedWidth block"),
            }
        }

        assert_eq!(reconstructed, data);
    }

    // Regression patterns for inputs of exactly 1024 values (a chunk
    // boundary) with varying run structure.
    #[test]
    fn test_exact_1024_values_bug() {
        let encoder = RleMiniBlockEncoder::new();

        let test_patterns = [
            // 512 runs of length 2
            {
                let mut data = Vec::new();
                for i in 0..512 {
                    data.push(i);
                    data.push(i);
                }
                data
            },
            // one run of length 1024
            vec![42i32; 1024],
            // alternating values: 1024 runs of length 1
            {
                let mut data = Vec::new();
                for i in 0..1024 {
                    data.push(i % 2);
                }
                data
            },
            // runs of exactly 255 (the per-entry length limit) plus a tail
            {
                let mut data = Vec::new();
                data.extend(vec![1i32; 255]);
                data.extend(vec![2i32; 255]);
                data.extend(vec![3i32; 255]);
                data.extend(vec![4i32; 255]);
                data.extend(vec![5i32; 4]);
                data
            },
            // all unique values
            { (0..1024).collect::<Vec<_>>() },
        ];

        for (idx, data) in test_patterns.iter().enumerate() {
            assert_eq!(data.len(), 1024);

            let array = Int32Array::from(data.clone());
            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

            let decompressor = RleMiniBlockDecompressor::new(32);
            match decompressor.decompress(compressed.data, compressed.num_values) {
                Ok(decompressed) => match decompressed {
                    DataBlock::FixedWidth(ref block) => {
                        let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                        assert_eq!(
                            values.len(),
                            1024,
                            "Pattern {} failed: got {} values",
                            idx,
                            values.len()
                        );
                        assert_eq!(values, &data[..], "Pattern {} data mismatch", idx);
                    }
                    _ => panic!("Expected FixedWidth block"),
                },
                Err(e) => {
                    panic!("Pattern {} failed with error: {}", idx, e);
                }
            }
        }
    }

    // 1023 unique values plus one duplicate at the very end: the only run
    // of length 2 sits exactly at the 1024-value boundary.
    #[test]
    fn test_unique_values_at_boundary() {
        let encoder = RleMiniBlockEncoder::new();

        let mut data = Vec::new();
        for i in 0..1023 {
            data.push(i);
        }
        // duplicate the final value
        data.push(1022i32);
        assert_eq!(data.len(), 1024);

        let array = Int32Array::from(data.clone());
        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

        let decompressor = RleMiniBlockDecompressor::new(32);
        match decompressor.decompress(compressed.data, compressed.num_values) {
            Ok(decompressed) => match decompressed {
                DataBlock::FixedWidth(ref block) => {
                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                    assert_eq!(
                        values.len(),
                        1024,
                        "Got {} values, expected 1024",
                        values.len()
                    );
                    assert_eq!(values, &data[..]);
                }
                _ => panic!("Expected FixedWidth block"),
            },
            Err(e) => {
                panic!("Decompression failed: {}", e);
            }
        }
    }

    // Regression test for a historical bug where decoding produced 4092
    // bytes instead of 4096 (one value short of 1024 i32s).
    #[test]
    fn test_bug_4092_bytes() {
        let encoder = RleMiniBlockEncoder::new();

        // 1022 unique values followed by one run of length 2
        let mut data = Vec::new();
        for i in 0..1022 {
            data.push(i);
        }
        data.push(999999i32);
        data.push(999999i32);
        assert_eq!(data.len(), 1024);

        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
        assert_eq!(bytes.len(), 4096);

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::Owned(bytes),
            num_values: 1024,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(block).unwrap();

        let decompressor = RleMiniBlockDecompressor::new(32);
        match decompressor.decompress(compressed.data, compressed.num_values) {
            Ok(decompressed) => match decompressed {
                DataBlock::FixedWidth(ref block) => {
                    assert_eq!(
                        block.data.len(),
                        4096,
                        "Expected 4096 bytes but got {}",
                        block.data.len()
                    );
                }
                _ => panic!("Expected FixedWidth block"),
            },
            Err(e) => {
                // Only the specific 4092-byte failure counts as this bug.
                if e.to_string().contains("4092") {
                    panic!("Found the bug! {}", e);
                }
            }
        }
    }

    // Large input (~1M values) with ~50% repetition driven by a
    // deterministic LCG; stresses multi-chunk encoding with short runs.
    #[test]
    fn test_low_repetition_50pct_bug() {
        let encoder = RleMiniBlockEncoder::new();

        // 1M values
        let num_values = 1_048_576;
        let mut data = Vec::with_capacity(num_values);
        let mut value = 0i32;
        // fixed seed for reproducibility
        let mut rng = 12345u64;
        for _ in 0..num_values {
            data.push(value);
            // linear congruential generator step; advance `value` on
            // roughly half the iterations
            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
            if (rng >> 16) & 1 == 1 {
                value += 1;
            }
        }

        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::Owned(bytes),
            num_values: num_values as u64,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(block).unwrap();

        // Walk the first few chunks; the per-chunk value count is computed
        // but intentionally unused (exercises the metadata arithmetic).
        for (i, chunk) in compressed.chunks.iter().take(5).enumerate() {
            let _chunk_values = if chunk.log_num_values > 0 {
                1 << chunk.log_num_values
            } else {
                let prev_total: usize = compressed.chunks[..i]
                    .iter()
                    .map(|c| 1usize << c.log_num_values)
                    .sum();
                num_values - prev_total
            };
        }

        let decompressor = RleMiniBlockDecompressor::new(32);
        match decompressor.decompress(compressed.data, compressed.num_values) {
            Ok(decompressed) => match decompressed {
                DataBlock::FixedWidth(ref block) => {
                    assert_eq!(
                        block.data.len(),
                        num_values * 4,
                        "Expected {} bytes but got {}",
                        num_values * 4,
                        block.data.len()
                    );
                }
                _ => panic!("Expected FixedWidth block"),
            },
            Err(e) => {
                if e.to_string().contains("4092") {
                    panic!("Bug reproduced! {}", e);
                } else {
                    panic!("Unexpected error: {}", e);
                }
            }
        }
    }
}