use arrow_buffer::ArrowNativeType;
use log::trace;
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::compression::MiniBlockDecompressor;
use crate::data::DataBlock;
use crate::data::{BlockInfo, FixedWidthDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
    MAX_MINIBLOCK_VALUES,
};
use crate::format::pb21::CompressiveEncoding;
use crate::format::ProtobufUtils21;

use lance_core::{Error, Result};

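/// Run-length encodes fixed-width values (8/16/32/64 bits per value) into
/// miniblock chunks.
///
/// Each chunk is written as two buffers: the run values (one fixed-width value
/// per run) and the run lengths (one `u8` per run, 1..=255). Runs longer than
/// 255 values are split across several entries by `add_run`. A rough
/// illustration for `i32` input (not an excerpt from the format spec):
///
/// ```text
/// input:   [7, 7, 7, 9, 9]
/// values:  7i32, 9i32    -> 8 bytes
/// lengths: [3, 2]        -> 2 bytes
/// ```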
#[derive(Debug, Default)]
pub struct RleMiniBlockEncoder;

impl RleMiniBlockEncoder {
    pub fn new() -> Self {
        Self
    }

    fn encode_data(
        &self,
        data: &LanceBuffer,
        num_values: u64,
        bits_per_value: u64,
    ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
        if num_values == 0 {
            return Ok((Vec::new(), Vec::new()));
        }

        let bytes_per_value = (bits_per_value / 8) as usize;

        // Rough pre-allocation: assume an average run length of ~10 values.
        let estimated_runs = num_values as usize / 10;
        let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
        let mut all_lengths = Vec::with_capacity(estimated_runs);
        let mut chunks = Vec::new();

        let mut offset = 0usize;
        let mut values_remaining = num_values as usize;

        while values_remaining > 0 {
            let values_start = all_values.len();
            let lengths_start = all_lengths.len();

            let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
                8 => self.encode_chunk_rolling::<u8>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                16 => self.encode_chunk_rolling::<u16>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                32 => self.encode_chunk_rolling::<u32>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                64 => self.encode_chunk_rolling::<u64>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
            };

            if values_processed == 0 {
                break;
            }

            // A chunk's value count is stored as log2; only the last chunk may
            // hold a non-power-of-two count, and it is marked with 0.
            let log_num_values = if is_last_chunk {
                0
            } else {
                assert!(
                    values_processed.is_power_of_two(),
                    "Non-last chunk must have power-of-2 values"
                );
                values_processed.ilog2() as u8
            };

            let values_size = all_values.len() - values_start;
            let lengths_size = all_lengths.len() - lengths_start;

            let chunk = MiniBlockChunk {
                buffer_sizes: vec![values_size as u32, lengths_size as u32],
                log_num_values,
            };

            chunks.push(chunk);

            offset += values_processed;
            values_remaining -= values_processed;
        }

        Ok((
            vec![
                LanceBuffer::from(all_values),
                LanceBuffer::from(all_lengths),
            ],
            chunks,
        ))
    }

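    /// Encodes a single miniblock chunk of `T` values, rolling back to the
    /// last power-of-two checkpoint if the chunk overflows.
    ///
    /// Returns `(num_runs, values_processed, is_last_chunk)`. Non-last chunks
    /// must cover a power-of-two number of values, so every time the encoded
    /// value count crosses the next power of two a checkpoint (buffer
    /// positions, bytes used, value count) is recorded. If a later run would
    /// push the chunk past `MAX_MINIBLOCK_BYTES`, the buffers are truncated
    /// back to that checkpoint and the chunk ends there.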
    fn encode_chunk_rolling<T>(
        &self,
        data: &LanceBuffer,
        offset: usize,
        values_remaining: usize,
        all_values: &mut Vec<u8>,
        all_lengths: &mut Vec<u8>,
    ) -> (usize, usize, bool)
    where
        T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug + ArrowNativeType,
    {
        let type_size = std::mem::size_of::<T>();

        let chunk_start = offset * type_size;
        let max_by_count = MAX_MINIBLOCK_VALUES as usize;
        let max_values = values_remaining.min(max_by_count);
        let chunk_end = chunk_start + max_values * type_size;

        if chunk_start >= data.len() {
            return (0, 0, false);
        }

        let chunk_len = chunk_end.min(data.len()) - chunk_start;
        let chunk_buffer = data.slice_with_length(chunk_start, chunk_len);
        let typed_data_ref = chunk_buffer.borrow_to_typed_slice::<T>();
        let typed_data: &[T] = typed_data_ref.as_ref();

        if typed_data.is_empty() {
            return (0, 0, false);
        }

        let values_start = all_values.len();

        let mut current_value = typed_data[0];
        let mut current_length = 1u64;
        let mut bytes_used = 0usize;
        let mut total_values_encoded = 0usize;

        // Smallest checkpoint size (as log2): 256 values for 1-byte types,
        // 128 for 2-byte types, 64 for wider types.
        let min_checkpoint_log2 = match type_size {
            1 => 8,
            2 => 7,
            _ => 6,
        };
        let max_checkpoint_log2 = (values_remaining.min(MAX_MINIBLOCK_VALUES as usize))
            .next_power_of_two()
            .ilog2();
        let mut checkpoint_log2 = min_checkpoint_log2;

        let mut last_checkpoint_state = None;

        for &value in typed_data[1..].iter() {
            if value == current_value {
                current_length += 1;
            } else {
                // Each sub-run of up to 255 values costs one value plus one length byte.
                let run_chunks = current_length.div_ceil(255) as usize;
                let bytes_needed = run_chunks * (type_size + 1);

                if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
                    if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
                        all_values.truncate(val_pos);
                        all_lengths.truncate(len_pos);
                        let num_runs = (val_pos - values_start) / type_size;
                        return (num_runs, checkpoint_values, false);
                    }
                    break;
                }

                bytes_used +=
                    self.add_run(&current_value, current_length, all_values, all_lengths);
                total_values_encoded += current_length as usize;
                current_value = value;
                current_length = 1;
            }

            // Record a checkpoint each time the encoded count crosses the next power of two.
            while checkpoint_log2 <= max_checkpoint_log2 {
                let checkpoint_values = 1usize << checkpoint_log2;
                if checkpoint_values > values_remaining || total_values_encoded < checkpoint_values
                {
                    break;
                }
                last_checkpoint_state = Some((
                    all_values.len(),
                    all_lengths.len(),
                    bytes_used,
                    checkpoint_values,
                ));
                checkpoint_log2 += 1;
            }
        }

        // Flush the final run if it still fits within the byte budget.
        if current_length > 0 {
            let run_chunks = current_length.div_ceil(255) as usize;
            let bytes_needed = run_chunks * (type_size + 1);

            if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
                let _ = self.add_run(&current_value, current_length, all_values, all_lengths);
                total_values_encoded += current_length as usize;
            }
        }

        let is_last_chunk = total_values_encoded == values_remaining;

        if !is_last_chunk {
            if total_values_encoded.is_power_of_two() {
                // Already a power of two; the chunk can stand as encoded.
            } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
                all_values.truncate(val_pos);
                all_lengths.truncate(len_pos);
                let num_runs = (val_pos - values_start) / type_size;
                return (num_runs, checkpoint_values, false);
            } else {
                return (0, 0, false);
            }
        }

        let num_runs = (all_values.len() - values_start) / type_size;
        (num_runs, total_values_encoded, is_last_chunk)
    }

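    /// Appends one logical run to the output buffers, splitting it into
    /// sub-runs of at most 255 values because each run length is stored in a
    /// single `u8`. Returns the number of bytes appended across both buffers.
    ///
    /// Worked example (illustrative): a run of 600 `u32` values becomes three
    /// entries with lengths 255, 255 and 90, i.e. 3 * (4 + 1) = 15 bytes.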
    fn add_run<T>(
        &self,
        value: &T,
        length: u64,
        all_values: &mut Vec<u8>,
        all_lengths: &mut Vec<u8>,
    ) -> usize
    where
        T: bytemuck::Pod,
    {
        let value_bytes = bytemuck::bytes_of(value);
        let type_size = std::mem::size_of::<T>();
        let num_full_chunks = (length / 255) as usize;
        let remainder = (length % 255) as u8;

        let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
        all_values.reserve(total_chunks * type_size);
        all_lengths.reserve(total_chunks);

        for _ in 0..num_full_chunks {
            all_values.extend_from_slice(value_bytes);
            all_lengths.push(255);
        }

        if remainder > 0 {
            all_values.extend_from_slice(value_bytes);
            all_lengths.push(remainder);
        }

        total_chunks * (type_size + 1)
    }
}

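// The compressor describes its output as `rle(flat(bits_per_value), flat(8))`:
// fixed-width run values paired with one-byte run lengths.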
impl MiniBlockCompressor for RleMiniBlockEncoder {
    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match data {
            DataBlock::FixedWidth(fixed_width) => {
                let num_values = fixed_width.num_values;
                let bits_per_value = fixed_width.bits_per_value;

                let (all_buffers, chunks) =
                    self.encode_data(&fixed_width.data, num_values, bits_per_value)?;

                let compressed = MiniBlockCompressed {
                    data: all_buffers,
                    chunks,
                    num_values,
                };

                let encoding = ProtobufUtils21::rle(
                    ProtobufUtils21::flat(bits_per_value, None),
                    ProtobufUtils21::flat(8, None),
                );

                Ok((compressed, encoding))
            }
            _ => Err(Error::InvalidInput {
                location: location!(),
                source: "RLE encoding only supports FixedWidth data blocks".into(),
            }),
        }
    }
}

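/// Decompresses miniblock chunks produced by [`RleMiniBlockEncoder`].
///
/// Expects exactly two buffers per chunk (run values and run lengths) and
/// expands them back into a flat `FixedWidthDataBlock`.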
#[derive(Debug)]
pub struct RleMiniBlockDecompressor {
    bits_per_value: u64,
}

impl RleMiniBlockDecompressor {
    pub fn new(bits_per_value: u64) -> Self {
        Self { bits_per_value }
    }

    fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        if num_values == 0 {
            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
                bits_per_value: self.bits_per_value,
                data: LanceBuffer::from(vec![]),
                num_values: 0,
                block_info: BlockInfo::default(),
            }));
        }

        assert_eq!(
            data.len(),
            2,
            "RLE decompressor expects exactly 2 buffers, got {}",
            data.len()
        );

        let values_buffer = &data[0];
        let lengths_buffer = &data[1];

        let decoded_data = match self.bits_per_value {
            8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
            16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
            32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
            64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
            _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32 or 64"),
        };

        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_value,
            data: decoded_data,
            num_values,
            block_info: BlockInfo::default(),
        }))
    }

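    /// Expands one (values, lengths) buffer pair into exactly `num_values`
    /// decoded values of type `T`. Values in the final run beyond `num_values`
    /// are clipped, which is how a partially consumed last chunk is handled.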
    fn decode_generic<T>(
        &self,
        values_buffer: &LanceBuffer,
        lengths_buffer: &LanceBuffer,
        num_values: u64,
    ) -> Result<LanceBuffer>
    where
        T: bytemuck::Pod + Copy + std::fmt::Debug + ArrowNativeType,
    {
        let type_size = std::mem::size_of::<T>();

        if values_buffer.is_empty() || lengths_buffer.is_empty() {
            if num_values == 0 {
                return Ok(LanceBuffer::empty());
            } else {
                return Err(Error::InvalidInput {
                    location: location!(),
                    source: format!("Empty buffers but expected {} values", num_values).into(),
                });
            }
        }

        if values_buffer.len() % type_size != 0 || lengths_buffer.is_empty() {
            return Err(Error::InvalidInput {
                location: location!(),
                source: format!(
                    "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
                    std::any::type_name::<T>(),
                    values_buffer.len(),
                    type_size,
                    lengths_buffer.len()
                )
                .into(),
            });
        }

        let num_runs = values_buffer.len() / type_size;
        let num_length_entries = lengths_buffer.len();
        assert_eq!(
            num_runs, num_length_entries,
            "Inconsistent RLE buffers: {} runs but {} length entries",
            num_runs, num_length_entries
        );

        let values_ref = values_buffer.borrow_to_typed_slice::<T>();
        let values: &[T] = values_ref.as_ref();
        let lengths: &[u8] = lengths_buffer.as_ref();

        let expected_value_count = num_values as usize;
        let mut decoded: Vec<T> = Vec::with_capacity(expected_value_count);

        for (value, &length) in values.iter().zip(lengths.iter()) {
            if decoded.len() == expected_value_count {
                break;
            }

            if length == 0 {
                return Err(Error::InvalidInput {
                    location: location!(),
                    source: "RLE decoding encountered a zero run length".into(),
                });
            }

            let remaining = expected_value_count - decoded.len();
            let write_len = (length as usize).min(remaining);

            decoded.resize(decoded.len() + write_len, *value);
        }

        if decoded.len() != expected_value_count {
            return Err(Error::InvalidInput {
                location: location!(),
                source: format!(
                    "RLE decoding produced {} values, expected {}",
                    decoded.len(),
                    expected_value_count
                )
                .into(),
            });
        }

        trace!(
            "RLE decoded {} {} values",
            num_values,
            std::any::type_name::<T>()
        );
        Ok(LanceBuffer::reinterpret_vec(decoded))
    }
}

impl MiniBlockDecompressor for RleMiniBlockDecompressor {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        self.decode_data(data, num_values)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::DataBlock;
    use crate::encodings::logical::primitive::miniblock::MAX_MINIBLOCK_VALUES;
    use arrow_array::Int32Array;

    #[test]
    fn test_basic_rle_encoding() {
        let encoder = RleMiniBlockEncoder::new();

        let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]);
        let data_block = DataBlock::from_array(array);

        let (compressed, _) = encoder.compress(data_block).unwrap();

        assert_eq!(compressed.num_values, 9);
        assert_eq!(compressed.chunks.len(), 1);

        let values_buffer = &compressed.data[0];
        let lengths_buffer = &compressed.data[1];
        // 3 runs: 3 run values * 4 bytes and 3 one-byte lengths.
        assert_eq!(values_buffer.len(), 12);
        assert_eq!(lengths_buffer.len(), 3);
    }

    #[test]
    fn test_long_run_splitting() {
        let encoder = RleMiniBlockEncoder::new();

        let mut data = vec![42i32; 1000];
        data.extend(&[100i32; 300]);
        let array = Int32Array::from(data);
        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

        let lengths_buffer = &compressed.data[1];
        // 1000 splits into 255 + 255 + 255 + 235 and 300 into 255 + 45 -> 6 run entries.
        assert_eq!(lengths_buffer.len(), 6);
    }

    #[test]
    fn test_round_trip_all_types() {
        test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);

        test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);

        test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);

        test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
    }

    fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
    where
        T: bytemuck::Pod + PartialEq + std::fmt::Debug,
    {
        let encoder = RleMiniBlockEncoder::new();
        let bytes: Vec<u8> = data
            .iter()
            .flat_map(|v| bytemuck::bytes_of(v))
            .copied()
            .collect();

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::from(bytes),
            num_values: data.len() as u64,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(block).unwrap();
        let decompressor = RleMiniBlockDecompressor::new(bits_per_value);
        let decompressed = decompressor
            .decompress(compressed.data, compressed.num_values)
            .unwrap();

        match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    #[test]
    fn test_power_of_two_chunking() {
        let encoder = RleMiniBlockEncoder::new();

        let test_sizes = vec![1000, 2500, 5000, 10000];

        for size in test_sizes {
            // Runs of 50 identical values each.
            let data: Vec<i32> = (0..size).map(|i| i / 50).collect();

            let array = Int32Array::from(data);
            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

            for (i, chunk) in compressed.chunks.iter().enumerate() {
                if i < compressed.chunks.len() - 1 {
                    assert!(chunk.log_num_values > 0);
                    let chunk_values = 1u64 << chunk.log_num_values;
                    assert!(chunk_values.is_power_of_two());
                    assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
                } else {
                    assert_eq!(chunk.log_num_values, 0);
                }
            }
        }
    }

    #[test]
    #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")]
    fn test_invalid_buffer_count() {
        let decompressor = RleMiniBlockDecompressor::new(32);
        let _ = decompressor.decompress(vec![LanceBuffer::from(vec![1, 2, 3, 4])], 10);
    }

    #[test]
    #[should_panic(expected = "Inconsistent RLE buffers")]
    fn test_buffer_consistency() {
        let decompressor = RleMiniBlockDecompressor::new(32);
        // One i32 run value but two length entries.
        let values = LanceBuffer::from(vec![1, 0, 0, 0]);
        let lengths = LanceBuffer::from(vec![5, 10]);
        let _ = decompressor.decompress(vec![values, lengths], 15);
    }

    #[test]
    fn test_empty_data_handling() {
        let encoder = RleMiniBlockEncoder::new();

        let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::from(vec![]),
            num_values: 0,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(empty_block).unwrap();
        assert_eq!(compressed.num_values, 0);
        assert!(compressed.data.is_empty());

        let decompressor = RleMiniBlockDecompressor::new(32);
        let decompressed = decompressor.decompress(vec![], 0).unwrap();

        match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(block.num_values, 0);
                assert_eq!(block.data.len(), 0);
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    #[test]
    fn test_multi_chunk_round_trip() {
        let encoder = RleMiniBlockEncoder::new();

        let mut data = Vec::new();
        // 2000 repeated values, then 1000 unique values, then 2000 more repeated values.
        data.extend(vec![999i32; 2000]);
        data.extend(0..1000);
        data.extend(vec![777i32; 2000]);

        let array = Int32Array::from(data.clone());
        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

        // Decompress chunk by chunk, slicing each chunk's region out of the two global buffers.
        let mut reconstructed = Vec::new();
        let mut values_offset = 0usize;
        let mut lengths_offset = 0usize;
        let mut values_processed = 0u64;

        assert_eq!(compressed.data.len(), 2);
        let global_values = &compressed.data[0];
        let global_lengths = &compressed.data[1];

        for chunk in &compressed.chunks {
            // log_num_values == 0 marks the last chunk, which holds the remainder.
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                compressed.num_values - values_processed
            };

            let values_size = chunk.buffer_sizes[0] as usize;
            let lengths_size = chunk.buffer_sizes[1] as usize;

            let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
            let chunk_lengths_buffer =
                global_lengths.slice_with_length(lengths_offset, lengths_size);

            let decompressor = RleMiniBlockDecompressor::new(32);
            let chunk_data = decompressor
                .decompress(
                    vec![chunk_values_buffer, chunk_lengths_buffer],
                    chunk_values,
                )
                .unwrap();

            values_offset += values_size;
            lengths_offset += lengths_size;
            values_processed += chunk_values;

            match chunk_data {
                DataBlock::FixedWidth(ref block) => {
                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                    reconstructed.extend_from_slice(values);
                }
                _ => panic!("Expected FixedWidth block"),
            }
        }

        assert_eq!(reconstructed, data);
    }

    #[test]
    fn test_1024_boundary_conditions() {
        let encoder = RleMiniBlockEncoder::new();
        let decompressor = RleMiniBlockDecompressor::new(32);

        let test_cases = [
            ("runs_of_2", {
                let mut data = Vec::new();
                for i in 0..512 {
                    data.push(i);
                    data.push(i);
                }
                data
            }),
            ("single_run_1024", vec![42i32; 1024]),
            ("alternating_values", {
                let mut data = Vec::new();
                for i in 0..1024 {
                    data.push(i % 2);
                }
                data
            }),
            ("run_boundary_255s", {
                let mut data = Vec::new();
                data.extend(vec![1i32; 255]);
                data.extend(vec![2i32; 255]);
                data.extend(vec![3i32; 255]);
                data.extend(vec![4i32; 255]);
                data.extend(vec![5i32; 4]);
                data
            }),
            ("unique_values_1024", (0..1024).collect::<Vec<_>>()),
            ("unique_plus_duplicate", {
                let mut data = Vec::new();
                for i in 0..1023 {
                    data.push(i);
                }
                // Repeat the final value so the last two entries form a run.
                data.push(1022i32);
                data
            }),
            ("bug_4092_pattern", {
                let mut data = Vec::new();
                for i in 0..1022 {
                    data.push(i);
                }
                data.push(999999i32);
                data.push(999999i32);
                data
            }),
        ];

        for (test_name, data) in test_cases.iter() {
            assert_eq!(data.len(), 1024, "Test case {} has wrong length", test_name);

            let array = Int32Array::from(data.clone());
            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();

            match decompressor.decompress(compressed.data, compressed.num_values) {
                Ok(decompressed) => match decompressed {
                    DataBlock::FixedWidth(ref block) => {
                        let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                        assert_eq!(
                            values.len(),
                            1024,
                            "Test case {} got {} values, expected 1024",
                            test_name,
                            values.len()
                        );
                        assert_eq!(
                            block.data.len(),
                            4096,
                            "Test case {} got {} bytes, expected 4096",
                            test_name,
                            block.data.len()
                        );
                        assert_eq!(values, &data[..], "Test case {} data mismatch", test_name);
                    }
                    _ => panic!("Test case {} expected FixedWidth block", test_name),
                },
                Err(e) => {
                    if e.to_string().contains("4092") {
                        panic!("Test case {} found bug 4092: {}", test_name, e);
                    }
                    panic!("Test case {} failed with error: {}", test_name, e);
                }
            }
        }
    }

    #[test]
    fn test_low_repetition_50pct_bug() {
        let encoder = RleMiniBlockEncoder::new();

        // ~1M values where the value advances roughly 50% of the time,
        // generated with a simple deterministic LCG.
        let num_values = 1_048_576;
        let mut data = Vec::with_capacity(num_values);
        let mut value = 0i32;
        let mut rng = 12345u64;
        for _ in 0..num_values {
            data.push(value);
            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
            if (rng >> 16) & 1 == 1 {
                value += 1;
            }
        }

        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::from(bytes),
            num_values: num_values as u64,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = encoder.compress(block).unwrap();

        // Sanity-walk the value counts of the first few chunks.
        for (i, chunk) in compressed.chunks.iter().take(5).enumerate() {
            let _chunk_values = if chunk.log_num_values > 0 {
                1 << chunk.log_num_values
            } else {
                let prev_total: usize = compressed.chunks[..i]
                    .iter()
                    .map(|c| 1usize << c.log_num_values)
                    .sum();
                num_values - prev_total
            };
        }

        let decompressor = RleMiniBlockDecompressor::new(32);
        match decompressor.decompress(compressed.data, compressed.num_values) {
            Ok(decompressed) => match decompressed {
                DataBlock::FixedWidth(ref block) => {
                    assert_eq!(
                        block.data.len(),
                        num_values * 4,
                        "Expected {} bytes but got {}",
                        num_values * 4,
                        block.data.len()
                    );
                }
                _ => panic!("Expected FixedWidth block"),
            },
            Err(e) => {
                if e.to_string().contains("4092") {
                    panic!("Bug reproduced! {}", e);
                } else {
                    panic!("Unexpected error: {}", e);
                }
            }
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_rle_encoding_verification() {
        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
        use crate::version::LanceFileVersion;
        use arrow_array::{Array, Int32Array};
        use lance_datagen::{ArrayGenerator, RowCount};
        use std::collections::HashMap;
        use std::sync::Arc;

        let test_cases = TestCases::default()
            .with_expected_encoding("rle")
            .with_min_file_version(LanceFileVersion::V2_1);

        // Explicitly set the RLE threshold and disable BSS.
        let mut metadata_explicit = HashMap::new();
        metadata_explicit.insert(
            "lance-encoding:rle-threshold".to_string(),
            "0.8".to_string(),
        );
        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());

        let mut generator = RleDataGenerator::new(vec![
            i32::MIN,
            i32::MIN,
            i32::MIN,
            i32::MIN,
            i32::MIN + 1,
            i32::MIN + 1,
            i32::MIN + 1,
            i32::MIN + 1,
            i32::MIN + 2,
            i32::MIN + 2,
            i32::MIN + 2,
            i32::MIN + 2,
        ]);
        let data_explicit = generator.generate_default(RowCount::from(10000)).unwrap();
        check_round_trip_encoding_of_data(vec![data_explicit], &test_cases, metadata_explicit)
            .await;

        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        let mut values = vec![i32::MIN; 8000];
        values.extend(
            [
                i32::MIN + 1,
                i32::MIN + 2,
                i32::MIN + 3,
                i32::MIN + 4,
                i32::MIN + 5,
            ]
            .repeat(400),
        );
        let arr = Arc::new(Int32Array::from(values)) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr], &test_cases, metadata).await;
    }

    /// Test data generator that cycles through a fixed repeating pattern.
    #[derive(Debug)]
    struct RleDataGenerator {
        pattern: Vec<i32>,
        idx: usize,
    }

    impl RleDataGenerator {
        fn new(pattern: Vec<i32>) -> Self {
            Self { pattern, idx: 0 }
        }
    }

    impl lance_datagen::ArrayGenerator for RleDataGenerator {
        fn generate(
            &mut self,
            _length: lance_datagen::RowCount,
            _rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
        ) -> std::result::Result<std::sync::Arc<dyn arrow_array::Array>, arrow_schema::ArrowError>
        {
            use arrow_array::Int32Array;
            use std::sync::Arc;

            let mut values = Vec::new();
            for _ in 0..10000 {
                values.push(self.pattern[self.idx]);
                self.idx = (self.idx + 1) % self.pattern.len();
            }
            Ok(Arc::new(Int32Array::from(values)))
        }

        fn data_type(&self) -> &arrow_schema::DataType {
            &arrow_schema::DataType::Int32
        }

        fn element_size_bytes(&self) -> Option<lance_datagen::ByteCount> {
            Some(lance_datagen::ByteCount::from(4))
        }
    }
}