use arrow_buffer::ArrowNativeType;
use log::trace;
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::compression::MiniBlockDecompressor;
use crate::data::DataBlock;
use crate::data::{BlockInfo, FixedWidthDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
};
use crate::format::pb21::CompressiveEncoding;
use crate::format::ProtobufUtils21;

use lance_core::{Error, Result};

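/// Run-length encodes fixed-width values (8/16/32/64 bits) for the mini-block
/// format. Each chunk is written as two buffers: the run values and one `u8`
/// run length per run (runs longer than 255 values are split across entries).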
#[derive(Debug, Default)]
pub struct RleMiniBlockEncoder;

impl RleMiniBlockEncoder {
    pub fn new() -> Self {
        Self
    }

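    /// Splits the input into mini-block chunks. Every chunk except the last
    /// covers a power-of-two number of values (recorded as `log_num_values`);
    /// the last chunk uses `log_num_values == 0` to mean "whatever remains".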
    fn encode_data(
        &self,
        data: &LanceBuffer,
        num_values: u64,
        bits_per_value: u64,
    ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
        if num_values == 0 {
            return Ok((Vec::new(), Vec::new()));
        }

        let bytes_per_value = (bits_per_value / 8) as usize;

        let estimated_runs = num_values as usize / 10;
        let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
        let mut all_lengths = Vec::with_capacity(estimated_runs);
        let mut chunks = Vec::new();

        let mut offset = 0usize;
        let mut values_remaining = num_values as usize;

        while values_remaining > 0 {
            let values_start = all_values.len();
            let lengths_start = all_lengths.len();

            let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
                8 => self.encode_chunk_rolling::<u8>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                16 => self.encode_chunk_rolling::<u16>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                32 => self.encode_chunk_rolling::<u32>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                64 => self.encode_chunk_rolling::<u64>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
            };

            if values_processed == 0 {
                break;
            }

            let log_num_values = if is_last_chunk {
                0
            } else {
                assert!(
                    values_processed.is_power_of_two(),
                    "Non-last chunk must have power-of-2 values"
                );
                values_processed.ilog2() as u8
            };

            let values_size = all_values.len() - values_start;
            let lengths_size = all_lengths.len() - lengths_start;

            let chunk = MiniBlockChunk {
                buffer_sizes: vec![values_size as u16, lengths_size as u16],
                log_num_values,
            };

            chunks.push(chunk);

            offset += values_processed;
            values_remaining -= values_processed;
        }

        Ok((
            vec![
                LanceBuffer::from(all_values),
                LanceBuffer::from(all_lengths),
            ],
            chunks,
        ))
    }

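    /// Encodes a single chunk starting at `offset` (measured in values).
    /// Returns `(num_runs, values_processed, is_last_chunk)`. While scanning,
    /// it records "checkpoint" states at power-of-two value counts; if the
    /// chunk would exceed `MAX_MINIBLOCK_BYTES`, the output is rolled back to
    /// the most recent checkpoint so the chunk's value count stays a power of
    /// two.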
    fn encode_chunk_rolling<T>(
        &self,
        data: &LanceBuffer,
        offset: usize,
        values_remaining: usize,
        all_values: &mut Vec<u8>,
        all_lengths: &mut Vec<u8>,
    ) -> (usize, usize, bool)
    where
        T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug + ArrowNativeType,
    {
        let type_size = std::mem::size_of::<T>();

        let chunk_start = offset * type_size;
        let max_by_count = 2048usize;
        let max_values = values_remaining.min(max_by_count);
        let chunk_end = chunk_start + max_values * type_size;

        if chunk_start >= data.len() {
            return (0, 0, false);
        }

        let chunk_len = chunk_end.min(data.len()) - chunk_start;
        let chunk_buffer = data.slice_with_length(chunk_start, chunk_len);
        let typed_data_ref = chunk_buffer.borrow_to_typed_slice::<T>();
        let typed_data: &[T] = typed_data_ref.as_ref();

        if typed_data.is_empty() {
            return (0, 0, false);
        }

        let values_start = all_values.len();

        let mut current_value = typed_data[0];
        let mut current_length = 1u64;
        let mut bytes_used = 0usize;
        let mut total_values_encoded = 0usize;

        // Candidate power-of-two cut points for this chunk; wider types reach
        // the byte budget sooner, so they start with smaller checkpoints.
        let checkpoints = match type_size {
            1 => vec![256, 512, 1024, 2048, 4096],
            2 => vec![128, 256, 512, 1024, 2048, 4096],
            _ => vec![64, 128, 256, 512, 1024, 2048, 4096],
        };
        let valid_checkpoints: Vec<usize> = checkpoints
            .into_iter()
            .filter(|&p| p <= values_remaining)
            .collect();
        let mut checkpoint_idx = 0;

        let mut last_checkpoint_state = None;

        for &value in typed_data[1..].iter() {
            if value == current_value {
                current_length += 1;
            } else {
                let run_chunks = current_length.div_ceil(255) as usize;
                let bytes_needed = run_chunks * (type_size + 1);

                if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
                    if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
                        all_values.truncate(val_pos);
                        all_lengths.truncate(len_pos);
                        let num_runs = (val_pos - values_start) / type_size;
                        return (num_runs, checkpoint_values, false);
                    }
                    break;
                }

                bytes_used += self.add_run(&current_value, current_length, all_values, all_lengths);
                total_values_encoded += current_length as usize;
                current_value = value;
                current_length = 1;
            }

            if checkpoint_idx < valid_checkpoints.len()
                && total_values_encoded >= valid_checkpoints[checkpoint_idx]
            {
                last_checkpoint_state = Some((
                    all_values.len(),
                    all_lengths.len(),
                    bytes_used,
                    valid_checkpoints[checkpoint_idx],
                ));
                checkpoint_idx += 1;
            }
        }

        if current_length > 0 {
            let run_chunks = current_length.div_ceil(255) as usize;
            let bytes_needed = run_chunks * (type_size + 1);

            if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
                let _ = self.add_run(&current_value, current_length, all_values, all_lengths);
                total_values_encoded += current_length as usize;
            }
        }

        let is_last_chunk = total_values_encoded == values_remaining;

        if !is_last_chunk {
            if total_values_encoded.is_power_of_two() {
                // Already at a power-of-two boundary; keep the chunk as encoded.
            } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
                all_values.truncate(val_pos);
                all_lengths.truncate(len_pos);
                let num_runs = (val_pos - values_start) / type_size;
                return (num_runs, checkpoint_values, false);
            } else {
                return (0, 0, false);
            }
        }

        let num_runs = (all_values.len() - values_start) / type_size;
        (num_runs, total_values_encoded, is_last_chunk)
    }

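    /// Appends one run to the output buffers, splitting it into multiple
    /// entries when the run length exceeds 255 (the `u8` length limit).
    /// Returns the number of bytes appended across both buffers.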
    fn add_run<T>(
        &self,
        value: &T,
        length: u64,
        all_values: &mut Vec<u8>,
        all_lengths: &mut Vec<u8>,
    ) -> usize
    where
        T: bytemuck::Pod,
    {
        let value_bytes = bytemuck::bytes_of(value);
        let type_size = std::mem::size_of::<T>();
        let num_full_chunks = (length / 255) as usize;
        let remainder = (length % 255) as u8;

        let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
        all_values.reserve(total_chunks * type_size);
        all_lengths.reserve(total_chunks);

        for _ in 0..num_full_chunks {
            all_values.extend_from_slice(value_bytes);
            all_lengths.push(255);
        }

        if remainder > 0 {
            all_values.extend_from_slice(value_bytes);
            all_lengths.push(remainder);
        }

        total_chunks * (type_size + 1)
    }
}

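// MiniBlockCompressor entry point. Only FixedWidth blocks are supported; the
// returned encoding describes the run values (at the input bit width) and the
// 8-bit run lengths.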
impl MiniBlockCompressor for RleMiniBlockEncoder {
    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match data {
            DataBlock::FixedWidth(fixed_width) => {
                let num_values = fixed_width.num_values;
                let bits_per_value = fixed_width.bits_per_value;

                let (all_buffers, chunks) =
                    self.encode_data(&fixed_width.data, num_values, bits_per_value)?;

                let compressed = MiniBlockCompressed {
                    data: all_buffers,
                    chunks,
                    num_values,
                };

                let encoding = ProtobufUtils21::rle(
                    ProtobufUtils21::flat(bits_per_value, None),
                    ProtobufUtils21::flat(8, None),
                );

                Ok((compressed, encoding))
            }
            _ => Err(Error::InvalidInput {
                location: location!(),
                source: "RLE encoding only supports FixedWidth data blocks".into(),
            }),
        }
    }
}

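/// Decodes RLE mini-block data produced by [`RleMiniBlockEncoder`], expanding
/// each (value, run length) pair back into a flat fixed-width buffer.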
#[derive(Debug)]
pub struct RleMiniBlockDecompressor {
    bits_per_value: u64,
}

impl RleMiniBlockDecompressor {
    pub fn new(bits_per_value: u64) -> Self {
        Self { bits_per_value }
    }

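    /// Dispatches on the configured bit width. Expects exactly two buffers:
    /// run values followed by run lengths.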
    fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        if num_values == 0 {
            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
                bits_per_value: self.bits_per_value,
                data: LanceBuffer::from(vec![]),
                num_values: 0,
                block_info: BlockInfo::default(),
            }));
        }

        assert_eq!(
            data.len(),
            2,
            "RLE decompressor expects exactly 2 buffers, got {}",
            data.len()
        );

        let values_buffer = &data[0];
        let lengths_buffer = &data[1];

        let decoded_data = match self.bits_per_value {
            8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
            16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
            32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
            64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
            _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32 or 64"),
        };

        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_value,
            data: LanceBuffer::from(decoded_data),
            num_values,
            block_info: BlockInfo::default(),
        }))
    }

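    /// Expands runs of `T` values. Run lengths are `u8`, so a logical run that
    /// the encoder split into several entries simply appears here as
    /// consecutive entries with the same value.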
    fn decode_generic<T>(
        &self,
        values_buffer: &LanceBuffer,
        lengths_buffer: &LanceBuffer,
        num_values: u64,
    ) -> Result<Vec<u8>>
    where
        T: bytemuck::Pod + Copy + std::fmt::Debug + ArrowNativeType,
    {
        let type_size = std::mem::size_of::<T>();

        if values_buffer.is_empty() || lengths_buffer.is_empty() {
            if num_values == 0 {
                return Ok(Vec::new());
            } else {
                return Err(Error::InvalidInput {
                    location: location!(),
                    source: format!("Empty buffers but expected {} values", num_values).into(),
                });
            }
        }

        if values_buffer.len() % type_size != 0 || lengths_buffer.is_empty() {
            return Err(Error::InvalidInput {
                location: location!(),
                source: format!(
                    "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
                    std::any::type_name::<T>(),
                    values_buffer.len(),
                    type_size,
                    lengths_buffer.len()
                )
                .into(),
            });
        }

        let num_runs = values_buffer.len() / type_size;
        let num_length_entries = lengths_buffer.len();
        assert_eq!(
            num_runs, num_length_entries,
            "Inconsistent RLE buffers: {} runs but {} length entries",
            num_runs, num_length_entries
        );

        let values_ref = values_buffer.borrow_to_typed_slice::<T>();
        let values: &[T] = values_ref.as_ref();
        let lengths: &[u8] = lengths_buffer.as_ref();

        let expected_byte_count = num_values as usize * type_size;
        let mut decoded = Vec::with_capacity(expected_byte_count);

        for (value, &length) in values.iter().zip(lengths.iter()) {
            let run_length = length as usize;
            let bytes_to_write = run_length * type_size;
            let bytes_of_value = bytemuck::bytes_of(value);

            if decoded.len() + bytes_to_write > expected_byte_count {
                let remaining_bytes = expected_byte_count - decoded.len();
                let remaining_values = remaining_bytes / type_size;

                for _ in 0..remaining_values {
                    decoded.extend_from_slice(bytes_of_value);
                }
                break;
            }

            for _ in 0..run_length {
                decoded.extend_from_slice(bytes_of_value);
            }
        }

        if decoded.len() != expected_byte_count {
            return Err(Error::InvalidInput {
                location: location!(),
                source: format!(
                    "RLE decoding produced {} bytes, expected {}",
                    decoded.len(),
                    expected_byte_count
                )
                .into(),
            });
        }

        trace!(
            "RLE decoded {} {} values",
            num_values,
            std::any::type_name::<T>()
        );
        Ok(decoded)
    }
}

impl MiniBlockDecompressor for RleMiniBlockDecompressor {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        self.decode_data(data, num_values)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::DataBlock;
    use crate::encodings::logical::primitive::miniblock::MAX_MINIBLOCK_VALUES;
    use arrow_array::Int32Array;

    #[test]
    fn test_basic_rle_encoding() {
        let encoder = RleMiniBlockEncoder::new();

        let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]);
        let data_block = DataBlock::from_array(array);

        let (compressed, _) = encoder.compress(data_block).unwrap();

        assert_eq!(compressed.num_values, 9);
        assert_eq!(compressed.chunks.len(), 1);

        // Three runs: 3 i32 values (12 bytes) and 3 one-byte run lengths.
        let values_buffer = &compressed.data[0];
        let lengths_buffer = &compressed.data[1];
        assert_eq!(values_buffer.len(), 12);
        assert_eq!(lengths_buffer.len(), 3);
    }

    #[test]
    fn test_long_run_splitting() {
        let encoder = RleMiniBlockEncoder::new();

        // 1000 identical values split into ceil(1000 / 255) = 4 length entries,
        // and 300 more split into 2, so 6 entries in total.
        let mut data = vec![42i32; 1000];
        data.extend(&[100i32; 300]);
        let array = Int32Array::from(data);
        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

        let lengths_buffer = &compressed.data[1];
        assert_eq!(lengths_buffer.len(), 6);
    }

    #[test]
    fn test_round_trip_all_types() {
        test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);

        test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);

        test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);

        test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
    }

    fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
    where
        T: bytemuck::Pod + PartialEq + std::fmt::Debug,
    {
        let encoder = RleMiniBlockEncoder::new();
        let bytes: Vec<u8> = data
            .iter()
            .flat_map(|v| bytemuck::bytes_of(v))
            .copied()
            .collect();

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::from(bytes),
            num_values: data.len() as u64,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(block).unwrap();
        let decompressor = RleMiniBlockDecompressor::new(bits_per_value);
        let decompressed = decompressor
            .decompress(compressed.data, compressed.num_values)
            .unwrap();

        match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    #[test]
    fn test_power_of_two_chunking() {
        let encoder = RleMiniBlockEncoder::new();

        let test_sizes = vec![1000, 2500, 5000, 10000];

        for size in test_sizes {
            let data: Vec<i32> = (0..size)
                .map(|i| i / 50) // runs of 50 identical values
                .collect();

            let array = Int32Array::from(data);
            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

            for (i, chunk) in compressed.chunks.iter().enumerate() {
                if i < compressed.chunks.len() - 1 {
                    assert!(chunk.log_num_values > 0);
                    let chunk_values = 1u64 << chunk.log_num_values;
                    assert!(chunk_values.is_power_of_two());
                    assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
                } else {
                    assert_eq!(chunk.log_num_values, 0);
                }
            }
        }
    }

    #[test]
    #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")]
    fn test_invalid_buffer_count() {
        let decompressor = RleMiniBlockDecompressor::new(32);
        let _ = decompressor.decompress(vec![LanceBuffer::from(vec![1, 2, 3, 4])], 10);
    }

    #[test]
    #[should_panic(expected = "Inconsistent RLE buffers")]
    fn test_buffer_consistency() {
        let decompressor = RleMiniBlockDecompressor::new(32);
        // One i32 run value but two length entries: buffers are inconsistent.
        let values = LanceBuffer::from(vec![1, 0, 0, 0]);
        let lengths = LanceBuffer::from(vec![5, 10]);
        let _ = decompressor.decompress(vec![values, lengths], 15);
    }

    #[test]
    fn test_empty_data_handling() {
        let encoder = RleMiniBlockEncoder::new();

        let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::from(vec![]),
            num_values: 0,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(empty_block).unwrap();
        assert_eq!(compressed.num_values, 0);
        assert!(compressed.data.is_empty());

        let decompressor = RleMiniBlockDecompressor::new(32);
        let decompressed = decompressor.decompress(vec![], 0).unwrap();

        match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(block.num_values, 0);
                assert_eq!(block.data.len(), 0);
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    #[test]
    fn test_multi_chunk_round_trip() {
        let encoder = RleMiniBlockEncoder::new();

        let mut data = Vec::new();

        data.extend(vec![999i32; 2000]);
        data.extend(0..1000);
        data.extend(vec![777i32; 2000]);

        let array = Int32Array::from(data.clone());
        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

        let mut reconstructed = Vec::new();
        let mut values_offset = 0usize;
        let mut lengths_offset = 0usize;
        let mut values_processed = 0u64;

        assert_eq!(compressed.data.len(), 2);
        let global_values = &compressed.data[0];
        let global_lengths = &compressed.data[1];

        for chunk in &compressed.chunks {
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                compressed.num_values - values_processed
            };

            let values_size = chunk.buffer_sizes[0] as usize;
            let lengths_size = chunk.buffer_sizes[1] as usize;

            let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
            let chunk_lengths_buffer =
                global_lengths.slice_with_length(lengths_offset, lengths_size);

            let decompressor = RleMiniBlockDecompressor::new(32);
            let chunk_data = decompressor
                .decompress(
                    vec![chunk_values_buffer, chunk_lengths_buffer],
                    chunk_values,
                )
                .unwrap();

            values_offset += values_size;
            lengths_offset += lengths_size;
            values_processed += chunk_values;

            match chunk_data {
                DataBlock::FixedWidth(ref block) => {
                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                    reconstructed.extend_from_slice(values);
                }
                _ => panic!("Expected FixedWidth block"),
            }
        }

        assert_eq!(reconstructed, data);
    }

    #[test]
    fn test_1024_boundary_conditions() {
        let encoder = RleMiniBlockEncoder::new();
        let decompressor = RleMiniBlockDecompressor::new(32);

        let test_cases = [
            ("runs_of_2", {
                let mut data = Vec::new();
                for i in 0..512 {
                    data.push(i);
                    data.push(i);
                }
                data
            }),
            ("single_run_1024", vec![42i32; 1024]),
            ("alternating_values", {
                let mut data = Vec::new();
                for i in 0..1024 {
                    data.push(i % 2);
                }
                data
            }),
            ("run_boundary_255s", {
                let mut data = Vec::new();
                data.extend(vec![1i32; 255]);
                data.extend(vec![2i32; 255]);
                data.extend(vec![3i32; 255]);
                data.extend(vec![4i32; 255]);
                data.extend(vec![5i32; 4]);
                data
            }),
            ("unique_values_1024", (0..1024).collect::<Vec<_>>()),
            ("unique_plus_duplicate", {
                let mut data = Vec::new();
                for i in 0..1023 {
                    data.push(i);
                }
                // Repeat the final value so the last run has length 2.
                data.push(1022i32);
                data
            }),
            ("bug_4092_pattern", {
                let mut data = Vec::new();
                for i in 0..1022 {
                    data.push(i);
                }
                data.push(999999i32);
                data.push(999999i32);
                data
            }),
        ];

        for (test_name, data) in test_cases.iter() {
            assert_eq!(data.len(), 1024, "Test case {} has wrong length", test_name);

            let array = Int32Array::from(data.clone());
            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

            match decompressor.decompress(compressed.data, compressed.num_values) {
                Ok(decompressed) => match decompressed {
                    DataBlock::FixedWidth(ref block) => {
                        let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                        assert_eq!(
                            values.len(),
                            1024,
                            "Test case {} got {} values, expected 1024",
                            test_name,
                            values.len()
                        );
                        assert_eq!(
                            block.data.len(),
                            4096,
                            "Test case {} got {} bytes, expected 4096",
                            test_name,
                            block.data.len()
                        );
                        assert_eq!(values, &data[..], "Test case {} data mismatch", test_name);
                    }
                    _ => panic!("Test case {} expected FixedWidth block", test_name),
                },
                Err(e) => {
                    if e.to_string().contains("4092") {
                        panic!("Test case {} found bug 4092: {}", test_name, e);
                    }
                    panic!("Test case {} failed with error: {}", test_name, e);
                }
            }
        }
    }

    #[test]
    fn test_low_repetition_50pct_bug() {
        let encoder = RleMiniBlockEncoder::new();

        // 1M values where the value advances ~50% of the time, giving an
        // average run length of about 2.
        let num_values = 1_048_576;
        let mut data = Vec::with_capacity(num_values);
        let mut value = 0i32;
        // Simple deterministic LCG so the test is reproducible.
        let mut rng = 12345u64;
        for _ in 0..num_values {
            data.push(value);
            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
            if (rng >> 16) & 1 == 1 {
                value += 1;
            }
        }

        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::from(bytes),
            num_values: num_values as u64,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(block).unwrap();

        for (i, chunk) in compressed.chunks.iter().take(5).enumerate() {
            let _chunk_values = if chunk.log_num_values > 0 {
                1 << chunk.log_num_values
            } else {
                let prev_total: usize = compressed.chunks[..i]
                    .iter()
                    .map(|c| 1usize << c.log_num_values)
                    .sum();
                num_values - prev_total
            };
        }

        let decompressor = RleMiniBlockDecompressor::new(32);
        match decompressor.decompress(compressed.data, compressed.num_values) {
            Ok(decompressed) => match decompressed {
                DataBlock::FixedWidth(ref block) => {
                    assert_eq!(
                        block.data.len(),
                        num_values * 4,
                        "Expected {} bytes but got {}",
                        num_values * 4,
                        block.data.len()
                    );
                }
                _ => panic!("Expected FixedWidth block"),
            },
            Err(e) => {
                if e.to_string().contains("4092") {
                    panic!("Bug reproduced! {}", e);
                } else {
                    panic!("Unexpected error: {}", e);
                }
            }
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_rle_encoding_verification() {
        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
        use crate::version::LanceFileVersion;
        use arrow_array::{Array, Int32Array};
        use lance_datagen::{ArrayGenerator, RowCount};
        use std::collections::HashMap;
        use std::sync::Arc;

        let test_cases = TestCases::default()
            .with_expected_encoding("rle")
            .with_min_file_version(LanceFileVersion::V2_1);

        // First case: explicitly set the RLE threshold and disable BSS via
        // encoding metadata.
        let mut metadata_explicit = HashMap::new();
        metadata_explicit.insert(
            "lance-encoding:rle-threshold".to_string(),
            "0.8".to_string(),
        );
        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());

        let mut generator = RleDataGenerator::new(vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]);
        let data_explicit = generator.generate_default(RowCount::from(10000)).unwrap();
        check_round_trip_encoding_of_data(vec![data_explicit], &test_cases, metadata_explicit)
            .await;

        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        // Highly repetitive data: one long run followed by a short repeating pattern.
        let mut values = vec![42i32; 8000];
        values.extend([1i32, 2i32, 3i32, 4i32, 5i32].repeat(400));
        let arr = Arc::new(Int32Array::from(values)) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr], &test_cases, metadata).await;
    }

    /// Test-only generator that repeats a fixed pattern of values, producing
    /// run-length-friendly data.
    #[derive(Debug)]
    struct RleDataGenerator {
        pattern: Vec<i32>,
        idx: usize,
    }

    impl RleDataGenerator {
        fn new(pattern: Vec<i32>) -> Self {
            Self { pattern, idx: 0 }
        }
    }

    impl lance_datagen::ArrayGenerator for RleDataGenerator {
        fn generate(
            &mut self,
            _length: lance_datagen::RowCount,
            _rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
        ) -> std::result::Result<std::sync::Arc<dyn arrow_array::Array>, arrow_schema::ArrowError>
        {
            use arrow_array::Int32Array;
            use std::sync::Arc;

            // Note: ignores `_length` and always emits 10,000 values from the pattern.
            let mut values = Vec::new();
            for _ in 0..10000 {
                values.push(self.pattern[self.idx]);
                self.idx = (self.idx + 1) % self.pattern.len();
            }
            Ok(Arc::new(Int32Array::from(values)))
        }

        fn data_type(&self) -> &arrow_schema::DataType {
            &arrow_schema::DataType::Int32
        }

        fn element_size_bytes(&self) -> Option<lance_datagen::ByteCount> {
            Some(lance_datagen::ByteCount::from(4))
        }
    }
}