1use log::trace;
52use snafu::location;
53
54use crate::buffer::LanceBuffer;
55use crate::compression::MiniBlockDecompressor;
56use crate::data::DataBlock;
57use crate::data::{BlockInfo, FixedWidthDataBlock};
58use crate::encodings::logical::primitive::miniblock::{
59 MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
60 MAX_MINIBLOCK_VALUES,
61};
62use crate::format::{pb, ProtobufUtils};
63
64use lance_core::{Error, Result};
65
/// Miniblock compressor that run-length encodes fixed-width values.
///
/// Output is a pair of parallel buffers: one holding the run values and one
/// holding `u8` run lengths (runs longer than 255 values are split across
/// multiple entries).
#[derive(Debug, Default)]
pub struct RleMiniBlockEncoder;
69
70impl RleMiniBlockEncoder {
71 pub fn new() -> Self {
72 Self
73 }
74
75 fn encode_data(
76 &self,
77 data: &LanceBuffer,
78 num_values: u64,
79 bits_per_value: u64,
80 ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
81 if num_values == 0 {
82 return Ok((Vec::new(), Vec::new()));
83 }
84
85 let bytes_per_value = (bits_per_value / 8) as usize;
86
87 let estimated_runs = num_values as usize / 10;
90 let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
91 let mut all_lengths = Vec::with_capacity(estimated_runs);
92 let mut chunks = Vec::new();
93
94 let mut offset = 0usize;
95 let mut values_remaining = num_values as usize;
96
97 while values_remaining > 0 {
98 let values_start = all_values.len();
99 let lengths_start = all_lengths.len();
100
101 let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
102 8 => self.encode_chunk_rolling::<u8>(
103 data,
104 offset,
105 values_remaining,
106 &mut all_values,
107 &mut all_lengths,
108 ),
109 16 => self.encode_chunk_rolling::<u16>(
110 data,
111 offset,
112 values_remaining,
113 &mut all_values,
114 &mut all_lengths,
115 ),
116 32 => self.encode_chunk_rolling::<u32>(
117 data,
118 offset,
119 values_remaining,
120 &mut all_values,
121 &mut all_lengths,
122 ),
123 64 => self.encode_chunk_rolling::<u64>(
124 data,
125 offset,
126 values_remaining,
127 &mut all_values,
128 &mut all_lengths,
129 ),
130 _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
131 };
132
133 if values_processed == 0 {
134 break;
135 }
136
137 let log_num_values = if is_last_chunk {
138 0
139 } else {
140 assert!(
141 values_processed.is_power_of_two(),
142 "Non-last chunk must have power-of-2 values"
143 );
144 values_processed.ilog2() as u8
145 };
146
147 let values_size = all_values.len() - values_start;
148 let lengths_size = all_lengths.len() - lengths_start;
149
150 let chunk = MiniBlockChunk {
151 buffer_sizes: vec![values_size as u16, lengths_size as u16],
152 log_num_values,
153 };
154
155 chunks.push(chunk);
156
157 offset += values_processed;
158 values_remaining -= values_processed;
159 }
160
161 Ok((
163 vec![
164 LanceBuffer::Owned(all_values),
165 LanceBuffer::Owned(all_lengths),
166 ],
167 chunks,
168 ))
169 }
170
171 fn encode_chunk_rolling<T>(
188 &self,
189 data: &LanceBuffer,
190 offset: usize,
191 values_remaining: usize,
192 all_values: &mut Vec<u8>,
193 all_lengths: &mut Vec<u8>,
194 ) -> (usize, usize, bool)
195 where
196 T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug,
197 {
198 let type_size = std::mem::size_of::<T>();
199 let data_slice = data.as_ref();
200
201 let chunk_start = offset * type_size;
202 let max_by_count = MAX_MINIBLOCK_VALUES as usize;
203 let max_values = values_remaining.min(max_by_count);
204 let chunk_end = chunk_start + max_values * type_size;
205
206 if chunk_start >= data_slice.len() {
207 return (0, 0, false);
208 }
209
210 let chunk_data = &data_slice[chunk_start..chunk_end.min(data_slice.len())];
211 let typed_data: &[T] = bytemuck::cast_slice(chunk_data);
212
213 if typed_data.is_empty() {
214 return (0, 0, false);
215 }
216
217 let values_start = all_values.len();
219
220 let mut current_value = typed_data[0];
221 let mut current_length = 1u64;
222 let mut bytes_used = 0usize;
223 let mut total_values_encoded = 0usize; let checkpoints = match type_size {
229 1 => vec![256, 512, 1024, 2048, 4096], 2 => vec![128, 256, 512, 1024, 2048, 4096], _ => vec![64, 128, 256, 512, 1024, 2048, 4096], };
233 let valid_checkpoints: Vec<usize> = checkpoints
234 .into_iter()
235 .filter(|&p| p <= values_remaining)
236 .collect();
237 let mut checkpoint_idx = 0;
238
239 let mut last_checkpoint_state = None;
241
242 for &value in typed_data[1..].iter() {
243 if value == current_value {
244 current_length += 1;
245 } else {
246 let run_chunks = current_length.div_ceil(255) as usize;
248 let bytes_needed = run_chunks * (type_size + 1);
249
250 if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
252 if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
253 all_values.truncate(val_pos);
255 all_lengths.truncate(len_pos);
256 let num_runs = (val_pos - values_start) / type_size;
257 return (num_runs, checkpoint_values, false);
258 }
259 break;
260 }
261
262 bytes_used += self.add_run(¤t_value, current_length, all_values, all_lengths);
263 total_values_encoded += current_length as usize;
264 current_value = value;
265 current_length = 1;
266 }
267
268 if checkpoint_idx < valid_checkpoints.len()
270 && total_values_encoded >= valid_checkpoints[checkpoint_idx]
271 {
272 last_checkpoint_state = Some((
273 all_values.len(),
274 all_lengths.len(),
275 bytes_used,
276 valid_checkpoints[checkpoint_idx],
277 ));
278 checkpoint_idx += 1;
279 }
280 }
281
282 if current_length > 0 {
285 let run_chunks = current_length.div_ceil(255) as usize;
286 let bytes_needed = run_chunks * (type_size + 1);
287
288 if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
289 let _ = self.add_run(¤t_value, current_length, all_values, all_lengths);
290 total_values_encoded += current_length as usize;
291 }
292 }
293
294 let is_last_chunk = total_values_encoded == values_remaining;
296
297 if !is_last_chunk {
299 if total_values_encoded.is_power_of_two() {
300 } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
302 all_values.truncate(val_pos);
304 all_lengths.truncate(len_pos);
305 let num_runs = (val_pos - values_start) / type_size;
306 return (num_runs, checkpoint_values, false);
307 } else {
308 return (0, 0, false);
310 }
311 }
312
313 let num_runs = (all_values.len() - values_start) / type_size;
314 (num_runs, total_values_encoded, is_last_chunk)
315 }
316
317 fn add_run<T>(
318 &self,
319 value: &T,
320 length: u64,
321 all_values: &mut Vec<u8>,
322 all_lengths: &mut Vec<u8>,
323 ) -> usize
324 where
325 T: bytemuck::Pod,
326 {
327 let value_bytes = bytemuck::bytes_of(value);
328 let type_size = std::mem::size_of::<T>();
329 let num_full_chunks = (length / 255) as usize;
330 let remainder = (length % 255) as u8;
331
332 let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
333 all_values.reserve(total_chunks * type_size);
334 all_lengths.reserve(total_chunks);
335
336 for _ in 0..num_full_chunks {
337 all_values.extend_from_slice(value_bytes);
338 all_lengths.push(255);
339 }
340
341 if remainder > 0 {
342 all_values.extend_from_slice(value_bytes);
343 all_lengths.push(remainder);
344 }
345
346 total_chunks * (type_size + 1)
347 }
348}
349
350impl MiniBlockCompressor for RleMiniBlockEncoder {
351 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
352 match data {
353 DataBlock::FixedWidth(fixed_width) => {
354 let num_values = fixed_width.num_values;
355 let bits_per_value = fixed_width.bits_per_value;
356
357 let (all_buffers, chunks) =
358 self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
359
360 let compressed = MiniBlockCompressed {
361 data: all_buffers,
362 chunks,
363 num_values,
364 };
365
366 let encoding = ProtobufUtils::rle(bits_per_value);
367
368 Ok((compressed, encoding))
369 }
370 _ => Err(Error::InvalidInput {
371 location: location!(),
372 source: "RLE encoding only supports FixedWidth data blocks".into(),
373 }),
374 }
375 }
376}
377
/// Miniblock decompressor for RLE-encoded fixed-width data; the counterpart to
/// `RleMiniBlockEncoder`.
#[derive(Debug)]
pub struct RleMiniBlockDecompressor {
    /// Bit width of each decoded value (8, 16, 32, or 64).
    bits_per_value: u64,
}
383
384impl RleMiniBlockDecompressor {
385 pub fn new(bits_per_value: u64) -> Self {
386 Self { bits_per_value }
387 }
388
389 fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
390 if num_values == 0 {
391 return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
392 bits_per_value: self.bits_per_value,
393 data: LanceBuffer::Owned(vec![]),
394 num_values: 0,
395 block_info: BlockInfo::default(),
396 }));
397 }
398
399 assert_eq!(
400 data.len(),
401 2,
402 "RLE decompressor expects exactly 2 buffers, got {}",
403 data.len()
404 );
405
406 let values_buffer = &data[0];
407 let lengths_buffer = &data[1];
408
409 let decoded_data = match self.bits_per_value {
410 8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
411 16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
412 32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
413 64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
414 _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32, 64, or 128"),
415 };
416
417 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
418 bits_per_value: self.bits_per_value,
419 data: LanceBuffer::Owned(decoded_data),
420 num_values,
421 block_info: BlockInfo::default(),
422 }))
423 }
424
425 fn decode_generic<T>(
426 &self,
427 values_buffer: &LanceBuffer,
428 lengths_buffer: &LanceBuffer,
429 num_values: u64,
430 ) -> Result<Vec<u8>>
431 where
432 T: bytemuck::Pod + Copy + std::fmt::Debug,
433 {
434 let values_bytes = values_buffer.as_ref();
435 let lengths_bytes = lengths_buffer.as_ref();
436
437 let type_size = std::mem::size_of::<T>();
438
439 if values_bytes.is_empty() || lengths_bytes.is_empty() {
440 if num_values == 0 {
441 return Ok(Vec::new());
442 } else {
443 return Err(Error::InvalidInput {
444 location: location!(),
445 source: format!("Empty buffers but expected {} values", num_values).into(),
446 });
447 }
448 }
449
450 if values_bytes.len() % type_size != 0 || lengths_bytes.is_empty() {
451 return Err(Error::InvalidInput {
452 location: location!(),
453 source: format!(
454 "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
455 std::any::type_name::<T>(),
456 values_bytes.len(),
457 type_size,
458 lengths_bytes.len()
459 )
460 .into(),
461 });
462 }
463
464 let num_runs = values_bytes.len() / type_size;
465 let num_length_entries = lengths_bytes.len();
466 assert_eq!(
467 num_runs, num_length_entries,
468 "Inconsistent RLE buffers: {} runs but {} length entries",
469 num_runs, num_length_entries
470 );
471
472 let values: &[T] = bytemuck::cast_slice(values_bytes);
473 let lengths: &[u8] = lengths_bytes;
474
475 let expected_byte_count = num_values as usize * type_size;
476 let mut decoded = Vec::with_capacity(expected_byte_count);
477
478 for (value, &length) in values.iter().zip(lengths.iter()) {
479 let run_length = length as usize;
480 let bytes_to_write = run_length * type_size;
481 let bytes_of_value = bytemuck::bytes_of(value);
482
483 if decoded.len() + bytes_to_write > expected_byte_count {
484 let remaining_bytes = expected_byte_count - decoded.len();
485 let remaining_values = remaining_bytes / type_size;
486
487 for _ in 0..remaining_values {
488 decoded.extend_from_slice(bytes_of_value);
489 }
490 break;
491 }
492
493 for _ in 0..run_length {
494 decoded.extend_from_slice(bytes_of_value);
495 }
496 }
497
498 if decoded.len() != expected_byte_count {
499 return Err(Error::InvalidInput {
500 location: location!(),
501 source: format!(
502 "RLE decoding produced {} bytes, expected {}",
503 decoded.len(),
504 expected_byte_count
505 )
506 .into(),
507 });
508 }
509
510 trace!(
511 "RLE decoded {} {} values",
512 num_values,
513 std::any::type_name::<T>()
514 );
515 Ok(decoded)
516 }
517}
518
impl MiniBlockDecompressor for RleMiniBlockDecompressor {
    /// Decompress RLE buffers (run values + run lengths) into a `FixedWidth`
    /// data block containing `num_values` values. Delegates to `decode_data`.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        self.decode_data(data, num_values)
    }
}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
    use crate::data::DataBlock;
    use arrow_array::Int32Array;
    use lance_core::datatypes::Field;

    // 9 values in 3 runs: [1 x3, 2 x2, 3 x4].
    #[test]
    fn test_basic_rle_encoding() {
        let encoder = RleMiniBlockEncoder::new();

        let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]);
        let data_block = DataBlock::from_array(array);

        let (compressed, _) = encoder.compress(data_block).unwrap();

        assert_eq!(compressed.num_values, 9);
        assert_eq!(compressed.chunks.len(), 1);

        let values_buffer = &compressed.data[0];
        let lengths_buffer = &compressed.data[1];
        // 3 runs: 3 i32 values (12 bytes) and 3 u8 length entries.
        assert_eq!(values_buffer.len(), 12); assert_eq!(lengths_buffer.len(), 3); }

    #[test]
    fn test_long_run_splitting() {
        let encoder = RleMiniBlockEncoder::new();

        // Runs longer than 255 split: 1000 -> 4 length entries, 300 -> 2, total 6.
        let mut data = vec![42i32; 1000]; data.extend(&[100i32; 300]); let array = Int32Array::from(data);
        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

        let lengths_buffer = &compressed.data[1];
        assert_eq!(lengths_buffer.len(), 6);
    }

    // Strategy should pick RLE for highly repetitive data and avoid it for unique data.
    #[test]
    fn test_compression_strategy_selection() {
        let strategy = DefaultCompressionStrategy::new();
        let field = Field::new_arrow("test", arrow_schema::DataType::Int32, false).unwrap();

        let repetitive_array = Int32Array::from(vec![1; 1000]);
        let repetitive_block = DataBlock::from_array(repetitive_array);

        let compressor = strategy
            .create_miniblock_compressor(&field, &repetitive_block)
            .unwrap();
        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));

        let unique_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
        let unique_block = DataBlock::from_array(unique_array);

        let compressor = strategy
            .create_miniblock_compressor(&field, &unique_block)
            .unwrap();
        assert!(!format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
    }

    // Round trip one sample per supported width (8/16/32/64 bits).
    #[test]
    fn test_round_trip_all_types() {
        test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);

        test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);

        test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);

        test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
    }

    // Compress then decompress `data` and check the decoded byte count matches.
    fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
    where
        T: bytemuck::Pod + PartialEq + std::fmt::Debug,
    {
        let encoder = RleMiniBlockEncoder::new();
        let bytes: Vec<u8> = data
            .iter()
            .flat_map(|v| bytemuck::bytes_of(v))
            .copied()
            .collect();

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::Owned(bytes),
            num_values: data.len() as u64,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(block).unwrap();
        let decompressor = RleMiniBlockDecompressor::new(bits_per_value);
        let decompressed = decompressor
            .decompress(compressed.data, compressed.num_values)
            .unwrap();

        match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    // Every chunk except the last must report a power-of-two value count.
    #[test]
    fn test_power_of_two_chunking() {
        let encoder = RleMiniBlockEncoder::new();

        let test_sizes = vec![1000, 2500, 5000, 10000];

        for size in test_sizes {
            let data: Vec<i32> = (0..size)
                .map(|i| i / 50) .collect();

            let array = Int32Array::from(data);
            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

            for (i, chunk) in compressed.chunks.iter().enumerate() {
                if i < compressed.chunks.len() - 1 {
                    assert!(chunk.log_num_values > 0);
                    let chunk_values = 1u64 << chunk.log_num_values;
                    assert!(chunk_values.is_power_of_two());
                    assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
                } else {
                    assert_eq!(chunk.log_num_values, 0);
                }
            }
        }
    }

    #[test]
    #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")]
    fn test_invalid_buffer_count() {
        let decompressor = RleMiniBlockDecompressor::new(32);
        let _ = decompressor.decompress(vec![LanceBuffer::Owned(vec![1, 2, 3, 4])], 10);
    }

    #[test]
    #[should_panic(expected = "Inconsistent RLE buffers")]
    fn test_buffer_consistency() {
        let decompressor = RleMiniBlockDecompressor::new(32);
        // 1 run value (4 bytes) but 2 length entries -> must panic.
        let values = LanceBuffer::Owned(vec![1, 0, 0, 0]); let lengths = LanceBuffer::Owned(vec![5, 10]); let _ = decompressor.decompress(vec![values, lengths], 15);
    }

    #[test]
    fn test_empty_data_handling() {
        let encoder = RleMiniBlockEncoder::new();

        let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::Owned(vec![]),
            num_values: 0,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(empty_block).unwrap();
        assert_eq!(compressed.num_values, 0);
        assert!(compressed.data.is_empty());

        let decompressor = RleMiniBlockDecompressor::new(32);
        let decompressed = decompressor.decompress(vec![], 0).unwrap();

        match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(block.num_values, 0);
                assert_eq!(block.data.len(), 0);
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    // Decompress each chunk independently using its buffer_sizes offsets and
    // verify the concatenation reproduces the input exactly.
    #[test]
    fn test_multi_chunk_round_trip() {
        let encoder = RleMiniBlockEncoder::new();

        let mut data = Vec::new();

        data.extend(vec![999i32; 2000]);
        data.extend(0..1000);
        data.extend(vec![777i32; 2000]);

        let array = Int32Array::from(data.clone());
        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

        let mut reconstructed = Vec::new();
        let mut values_offset = 0usize;
        let mut lengths_offset = 0usize;
        let mut values_processed = 0u64;

        assert_eq!(compressed.data.len(), 2);
        let global_values = &compressed.data[0];
        let global_lengths = &compressed.data[1];

        for chunk in &compressed.chunks {
            // log_num_values == 0 marks the last chunk; it holds the remainder.
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                compressed.num_values - values_processed
            };

            let values_size = chunk.buffer_sizes[0] as usize;
            let lengths_size = chunk.buffer_sizes[1] as usize;

            let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
            let chunk_lengths_buffer =
                global_lengths.slice_with_length(lengths_offset, lengths_size);

            let decompressor = RleMiniBlockDecompressor::new(32);
            let chunk_data = decompressor
                .decompress(
                    vec![chunk_values_buffer, chunk_lengths_buffer],
                    chunk_values,
                )
                .unwrap();

            values_offset += values_size;
            lengths_offset += lengths_size;
            values_processed += chunk_values;

            match chunk_data {
                DataBlock::FixedWidth(ref block) => {
                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                    reconstructed.extend_from_slice(values);
                }
                _ => panic!("Expected FixedWidth block"),
            }
        }

        assert_eq!(reconstructed, data);
    }

    // Exercise patterns of exactly 1024 values around chunk boundary behavior.
    #[test]
    fn test_1024_boundary_conditions() {
        let encoder = RleMiniBlockEncoder::new();
        let decompressor = RleMiniBlockDecompressor::new(32);

        let test_cases = [
            ("runs_of_2", {
                let mut data = Vec::new();
                for i in 0..512 {
                    data.push(i);
                    data.push(i);
                }
                data
            }),
            ("single_run_1024", vec![42i32; 1024]),
            ("alternating_values", {
                let mut data = Vec::new();
                for i in 0..1024 {
                    data.push(i % 2);
                }
                data
            }),
            ("run_boundary_255s", {
                let mut data = Vec::new();
                data.extend(vec![1i32; 255]);
                data.extend(vec![2i32; 255]);
                data.extend(vec![3i32; 255]);
                data.extend(vec![4i32; 255]);
                data.extend(vec![5i32; 4]);
                data
            }),
            ("unique_values_1024", (0..1024).collect::<Vec<_>>()),
            ("unique_plus_duplicate", {
                let mut data = Vec::new();
                for i in 0..1023 {
                    data.push(i);
                }
                data.push(1022i32); data
            }),
            ("bug_4092_pattern", {
                let mut data = Vec::new();
                for i in 0..1022 {
                    data.push(i);
                }
                data.push(999999i32);
                data.push(999999i32);
                data
            }),
        ];

        for (test_name, data) in test_cases.iter() {
            assert_eq!(data.len(), 1024, "Test case {} has wrong length", test_name);

            let array = Int32Array::from(data.clone());
            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

            match decompressor.decompress(compressed.data, compressed.num_values) {
                Ok(decompressed) => match decompressed {
                    DataBlock::FixedWidth(ref block) => {
                        let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                        assert_eq!(
                            values.len(),
                            1024,
                            "Test case {} got {} values, expected 1024",
                            test_name,
                            values.len()
                        );
                        assert_eq!(
                            block.data.len(),
                            4096,
                            "Test case {} got {} bytes, expected 4096",
                            test_name,
                            block.data.len()
                        );
                        assert_eq!(values, &data[..], "Test case {} data mismatch", test_name);
                    }
                    _ => panic!("Test case {} expected FixedWidth block", test_name),
                },
                Err(e) => {
                    if e.to_string().contains("4092") {
                        panic!("Test case {} found bug 4092: {}", test_name, e);
                    }
                    panic!("Test case {} failed with error: {}", test_name, e);
                }
            }
        }
    }

    // ~50% repetition over ~1M values generated by a deterministic LCG; checks
    // the decoded byte count survives many chunk rollbacks.
    #[test]
    fn test_low_repetition_50pct_bug() {
        let encoder = RleMiniBlockEncoder::new();

        let num_values = 1_048_576; let mut data = Vec::with_capacity(num_values);
        let mut value = 0i32;
        let mut rng = 12345u64; for _ in 0..num_values {
            data.push(value);
            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
            if (rng >> 16) & 1 == 1 {
                value += 1;
            }
        }

        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::Owned(bytes),
            num_values: num_values as u64,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(block).unwrap();

        for (i, chunk) in compressed.chunks.iter().take(5).enumerate() {
            let _chunk_values = if chunk.log_num_values > 0 {
                1 << chunk.log_num_values
            } else {
                let prev_total: usize = compressed.chunks[..i]
                    .iter()
                    .map(|c| 1usize << c.log_num_values)
                    .sum();
                num_values - prev_total
            };
        }

        let decompressor = RleMiniBlockDecompressor::new(32);
        match decompressor.decompress(compressed.data, compressed.num_values) {
            Ok(decompressed) => match decompressed {
                DataBlock::FixedWidth(ref block) => {
                    assert_eq!(
                        block.data.len(),
                        num_values * 4,
                        "Expected {} bytes but got {}",
                        num_values * 4,
                        block.data.len()
                    );
                }
                _ => panic!("Expected FixedWidth block"),
            },
            Err(e) => {
                if e.to_string().contains("4092") {
                    panic!("Bug reproduced! {}", e);
                } else {
                    panic!("Unexpected error: {}", e);
                }
            }
        }
    }

    // End-to-end file round trip asserting the "rle" encoding is actually chosen.
    #[test_log::test(tokio::test)]
    async fn test_rle_encoding_verification() {
        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
        use crate::version::LanceFileVersion;
        use arrow_array::{Array, Int32Array};
        use lance_datagen::{ArrayGenerator, RowCount};
        use std::collections::HashMap;
        use std::sync::Arc;

        let test_cases = TestCases::default()
            .with_expected_encoding("rle")
            .with_file_version(LanceFileVersion::V2_1);

        // Explicit metadata lowers the RLE selection threshold for this field.
        let metadata_explicit = HashMap::from([(
            "lance-encoding:rle-threshold".to_string(),
            "0.8".to_string(),
        )]);
        let mut generator = RleDataGenerator::new(vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]);
        let data_explicit = generator.generate_default(RowCount::from(10000)).unwrap();
        check_round_trip_encoding_of_data(vec![data_explicit], &test_cases, metadata_explicit)
            .await;

        let mut values = vec![42i32; 8000]; values.extend([1i32, 2i32, 3i32, 4i32, 5i32].repeat(400)); let arr = Arc::new(Int32Array::from(values)) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
    }

    // Deterministic generator that cycles through a fixed pattern of i32s.
    #[derive(Debug)]
    struct RleDataGenerator {
        pattern: Vec<i32>,
        idx: usize,
    }

    impl RleDataGenerator {
        fn new(pattern: Vec<i32>) -> Self {
            Self { pattern, idx: 0 }
        }
    }

    impl lance_datagen::ArrayGenerator for RleDataGenerator {
        fn generate(
            &mut self,
            _length: lance_datagen::RowCount,
            _rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
        ) -> std::result::Result<std::sync::Arc<dyn arrow_array::Array>, arrow_schema::ArrowError>
        {
            use arrow_array::Int32Array;
            use std::sync::Arc;

            // NOTE(review): ignores `_length` and always emits 10000 values — confirm intended.
            let mut values = Vec::new();
            for _ in 0..10000 {
                values.push(self.pattern[self.idx]);
                self.idx = (self.idx + 1) % self.pattern.len();
            }
            Ok(Arc::new(Int32Array::from(values)))
        }

        fn data_type(&self) -> &arrow_schema::DataType {
            &arrow_schema::DataType::Int32
        }

        fn element_size_bytes(&self) -> Option<lance_datagen::ByteCount> {
            Some(lance_datagen::ByteCount::from(4))
        }
    }
}