1use arrow_buffer::ArrowNativeType;
58use log::trace;
59
60use crate::buffer::LanceBuffer;
61use crate::compression::{BlockCompressor, BlockDecompressor, MiniBlockDecompressor};
62use crate::data::DataBlock;
63use crate::data::{BlockInfo, FixedWidthDataBlock};
64use crate::encodings::logical::primitive::miniblock::{
65 MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES, MiniBlockChunk, MiniBlockCompressed,
66 MiniBlockCompressor,
67};
68use crate::format::ProtobufUtils21;
69use crate::format::pb21::CompressiveEncoding;
70
71use lance_core::{Error, Result};
72
/// Run-length encoder for fixed-width data (8/16/32/64-bit values).
///
/// Produces two parallel output buffers: run values (at the input width) and
/// run lengths (one `u8` per run, so each stored run covers at most 255
/// values).
#[derive(Debug, Default)]
pub struct RleEncoder;
76
77impl RleEncoder {
78 pub fn new() -> Self {
79 Self
80 }
81
82 fn encode_data(
83 &self,
84 data: &LanceBuffer,
85 num_values: u64,
86 bits_per_value: u64,
87 ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
88 if num_values == 0 {
89 return Ok((Vec::new(), Vec::new()));
90 }
91
92 let bytes_per_value = (bits_per_value / 8) as usize;
93
94 let estimated_runs = num_values as usize / 10;
97 let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
98 let mut all_lengths = Vec::with_capacity(estimated_runs);
99 let mut chunks = Vec::new();
100
101 let mut offset = 0usize;
102 let mut values_remaining = num_values as usize;
103
104 while values_remaining > 0 {
105 let values_start = all_values.len();
106 let lengths_start = all_lengths.len();
107
108 let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
109 8 => self.encode_chunk_rolling::<u8>(
110 data,
111 offset,
112 values_remaining,
113 &mut all_values,
114 &mut all_lengths,
115 ),
116 16 => self.encode_chunk_rolling::<u16>(
117 data,
118 offset,
119 values_remaining,
120 &mut all_values,
121 &mut all_lengths,
122 ),
123 32 => self.encode_chunk_rolling::<u32>(
124 data,
125 offset,
126 values_remaining,
127 &mut all_values,
128 &mut all_lengths,
129 ),
130 64 => self.encode_chunk_rolling::<u64>(
131 data,
132 offset,
133 values_remaining,
134 &mut all_values,
135 &mut all_lengths,
136 ),
137 _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
138 };
139
140 if values_processed == 0 {
141 break;
142 }
143
144 let log_num_values = if is_last_chunk {
145 0
146 } else {
147 assert!(
148 values_processed.is_power_of_two(),
149 "Non-last chunk must have power-of-2 values"
150 );
151 values_processed.ilog2() as u8
152 };
153
154 let values_size = all_values.len() - values_start;
155 let lengths_size = all_lengths.len() - lengths_start;
156
157 let chunk = MiniBlockChunk {
158 buffer_sizes: vec![values_size as u32, lengths_size as u32],
159 log_num_values,
160 };
161
162 chunks.push(chunk);
163
164 offset += values_processed;
165 values_remaining -= values_processed;
166 }
167
168 Ok((
170 vec![
171 LanceBuffer::from(all_values),
172 LanceBuffer::from(all_lengths),
173 ],
174 chunks,
175 ))
176 }
177
178 fn encode_chunk_rolling<T>(
195 &self,
196 data: &LanceBuffer,
197 offset: usize,
198 values_remaining: usize,
199 all_values: &mut Vec<u8>,
200 all_lengths: &mut Vec<u8>,
201 ) -> (usize, usize, bool)
202 where
203 T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug + ArrowNativeType,
204 {
205 let type_size = std::mem::size_of::<T>();
206
207 let chunk_start = offset * type_size;
208 let max_by_count = MAX_MINIBLOCK_VALUES as usize;
209 let max_values = values_remaining.min(max_by_count);
210 let chunk_end = chunk_start + max_values * type_size;
211
212 if chunk_start >= data.len() {
213 return (0, 0, false);
214 }
215
216 let chunk_len = chunk_end.min(data.len()) - chunk_start;
217 let chunk_buffer = data.slice_with_length(chunk_start, chunk_len);
218 let typed_data_ref = chunk_buffer.borrow_to_typed_slice::<T>();
219 let typed_data: &[T] = typed_data_ref.as_ref();
220
221 if typed_data.is_empty() {
222 return (0, 0, false);
223 }
224
225 let values_start = all_values.len();
227
228 let mut current_value = typed_data[0];
229 let mut current_length = 1u64;
230 let mut bytes_used = 0usize;
231 let mut total_values_encoded = 0usize; let min_checkpoint_log2 = match type_size {
238 1 => 8, 2 => 7, _ => 6, };
242 let max_checkpoint_log2 = (values_remaining.min(MAX_MINIBLOCK_VALUES as usize))
243 .next_power_of_two()
244 .ilog2();
245 let mut checkpoint_log2 = min_checkpoint_log2;
246
247 let mut last_checkpoint_state = None;
249
250 for &value in typed_data[1..].iter() {
251 if value == current_value {
252 current_length += 1;
253 } else {
254 let run_chunks = current_length.div_ceil(255) as usize;
256 let bytes_needed = run_chunks * (type_size + 1);
257
258 if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
260 if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
261 all_values.truncate(val_pos);
263 all_lengths.truncate(len_pos);
264 let num_runs = (val_pos - values_start) / type_size;
265 return (num_runs, checkpoint_values, false);
266 }
267 break;
268 }
269
270 bytes_used += self.add_run(¤t_value, current_length, all_values, all_lengths);
271 total_values_encoded += current_length as usize;
272 current_value = value;
273 current_length = 1;
274 }
275
276 while checkpoint_log2 <= max_checkpoint_log2 {
278 let checkpoint_values = 1usize << checkpoint_log2;
279 if checkpoint_values > values_remaining || total_values_encoded < checkpoint_values
280 {
281 break;
282 }
283 last_checkpoint_state = Some((
284 all_values.len(),
285 all_lengths.len(),
286 bytes_used,
287 checkpoint_values,
288 ));
289 checkpoint_log2 += 1;
290 }
291 }
292
293 if current_length > 0 {
296 let run_chunks = current_length.div_ceil(255) as usize;
297 let bytes_needed = run_chunks * (type_size + 1);
298
299 if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
300 let _ = self.add_run(¤t_value, current_length, all_values, all_lengths);
301 total_values_encoded += current_length as usize;
302 }
303 }
304
305 let is_last_chunk = total_values_encoded == values_remaining;
307
308 if !is_last_chunk {
310 if total_values_encoded.is_power_of_two() {
311 } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
313 all_values.truncate(val_pos);
315 all_lengths.truncate(len_pos);
316 let num_runs = (val_pos - values_start) / type_size;
317 return (num_runs, checkpoint_values, false);
318 } else {
319 return (0, 0, false);
321 }
322 }
323
324 let num_runs = (all_values.len() - values_start) / type_size;
325 (num_runs, total_values_encoded, is_last_chunk)
326 }
327
328 fn add_run<T>(
329 &self,
330 value: &T,
331 length: u64,
332 all_values: &mut Vec<u8>,
333 all_lengths: &mut Vec<u8>,
334 ) -> usize
335 where
336 T: bytemuck::Pod,
337 {
338 let value_bytes = bytemuck::bytes_of(value);
339 let type_size = std::mem::size_of::<T>();
340 let num_full_chunks = (length / 255) as usize;
341 let remainder = (length % 255) as u8;
342
343 let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
344 all_values.reserve(total_chunks * type_size);
345 all_lengths.reserve(total_chunks);
346
347 for _ in 0..num_full_chunks {
348 all_values.extend_from_slice(value_bytes);
349 all_lengths.push(255);
350 }
351
352 if remainder > 0 {
353 all_values.extend_from_slice(value_bytes);
354 all_lengths.push(remainder);
355 }
356
357 total_chunks * (type_size + 1)
358 }
359}
360
361impl MiniBlockCompressor for RleEncoder {
362 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
363 match data {
364 DataBlock::FixedWidth(fixed_width) => {
365 let num_values = fixed_width.num_values;
366 let bits_per_value = fixed_width.bits_per_value;
367
368 let (all_buffers, chunks) =
369 self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
370
371 let compressed = MiniBlockCompressed {
372 data: all_buffers,
373 chunks,
374 num_values,
375 };
376
377 let encoding = ProtobufUtils21::rle(
378 ProtobufUtils21::flat(bits_per_value, None),
379 ProtobufUtils21::flat(8, None),
380 );
381
382 Ok((compressed, encoding))
383 }
384 _ => Err(Error::invalid_input_source(
385 "RLE encoding only supports FixedWidth data blocks".into(),
386 )),
387 }
388 }
389}
390
391impl BlockCompressor for RleEncoder {
392 fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
394 match data {
395 DataBlock::FixedWidth(fixed_width) => {
396 let num_values = fixed_width.num_values;
397 let bits_per_value = fixed_width.bits_per_value;
398
399 let (all_buffers, _) =
400 self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
401
402 let values_size = all_buffers[0].len() as u64;
403
404 let mut combined = Vec::new();
405 combined.extend_from_slice(&values_size.to_le_bytes());
406 combined.extend_from_slice(&all_buffers[0]);
407 combined.extend_from_slice(&all_buffers[1]);
408 Ok(LanceBuffer::from(combined))
409 }
410 _ => Err(Error::invalid_input_source(
411 "RLE encoding only supports FixedWidth data blocks".into(),
412 )),
413 }
414 }
415}
416
/// Decompressor for RLE-encoded fixed-width data.
#[derive(Debug)]
pub struct RleDecompressor {
    // Width of each decoded value in bits (8, 16, 32, or 64).
    bits_per_value: u64,
}
422
423impl RleDecompressor {
424 pub fn new(bits_per_value: u64) -> Self {
425 Self { bits_per_value }
426 }
427
428 fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
429 if num_values == 0 {
430 return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
431 bits_per_value: self.bits_per_value,
432 data: LanceBuffer::from(vec![]),
433 num_values: 0,
434 block_info: BlockInfo::default(),
435 }));
436 }
437
438 if data.len() != 2 {
439 return Err(Error::invalid_input_source(
440 format!(
441 "RLE decompressor expects exactly 2 buffers, got {}",
442 data.len()
443 )
444 .into(),
445 ));
446 }
447
448 let values_buffer = &data[0];
449 let lengths_buffer = &data[1];
450
451 let decoded_data = match self.bits_per_value {
452 8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
453 16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
454 32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
455 64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
456 _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32, 64, or 128"),
457 };
458
459 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
460 bits_per_value: self.bits_per_value,
461 data: decoded_data,
462 num_values,
463 block_info: BlockInfo::default(),
464 }))
465 }
466
467 fn decode_generic<T>(
468 &self,
469 values_buffer: &LanceBuffer,
470 lengths_buffer: &LanceBuffer,
471 num_values: u64,
472 ) -> Result<LanceBuffer>
473 where
474 T: bytemuck::Pod + Copy + std::fmt::Debug + ArrowNativeType,
475 {
476 let type_size = std::mem::size_of::<T>();
477
478 if values_buffer.is_empty() || lengths_buffer.is_empty() {
479 if num_values == 0 {
480 return Ok(LanceBuffer::empty());
481 } else {
482 return Err(Error::invalid_input_source(
483 format!("Empty buffers but expected {} values", num_values).into(),
484 ));
485 }
486 }
487
488 if !values_buffer.len().is_multiple_of(type_size) || lengths_buffer.is_empty() {
489 return Err(Error::invalid_input_source(format!(
490 "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
491 std::any::type_name::<T>(),
492 values_buffer.len(),
493 type_size,
494 lengths_buffer.len()
495 )
496 .into()));
497 }
498
499 let num_runs = values_buffer.len() / type_size;
500 let num_length_entries = lengths_buffer.len();
501 if num_runs != num_length_entries {
502 return Err(Error::invalid_input_source(
503 format!(
504 "Inconsistent RLE buffers: {} runs but {} length entries",
505 num_runs, num_length_entries
506 )
507 .into(),
508 ));
509 }
510
511 let values_ref = values_buffer.borrow_to_typed_slice::<T>();
512 let values: &[T] = values_ref.as_ref();
513 let lengths: &[u8] = lengths_buffer.as_ref();
514
515 let expected_value_count = num_values as usize;
516 let mut decoded: Vec<T> = Vec::with_capacity(expected_value_count);
517
518 for (value, &length) in values.iter().zip(lengths.iter()) {
519 if decoded.len() == expected_value_count {
520 break;
521 }
522
523 if length == 0 {
524 return Err(Error::invalid_input_source(
525 "RLE decoding encountered a zero run length".into(),
526 ));
527 }
528
529 let remaining = expected_value_count - decoded.len();
530 let write_len = (length as usize).min(remaining);
531
532 decoded.resize(decoded.len() + write_len, *value);
533 }
534
535 if decoded.len() != expected_value_count {
536 return Err(Error::invalid_input_source(
537 format!(
538 "RLE decoding produced {} values, expected {}",
539 decoded.len(),
540 expected_value_count
541 )
542 .into(),
543 ));
544 }
545
546 trace!(
547 "RLE decoded {} {} values",
548 num_values,
549 std::any::type_name::<T>()
550 );
551 Ok(LanceBuffer::reinterpret_vec(decoded))
552 }
553}
554
impl MiniBlockDecompressor for RleDecompressor {
    /// Decompresses one miniblock chunk; `data` must hold exactly the two RLE
    /// buffers (run values, run lengths) for that chunk.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        self.decode_data(data, num_values)
    }
}
560
561impl BlockDecompressor for RleDecompressor {
562 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
563 if data.len() < 8 {
565 return Err(Error::invalid_input_source(
566 format!("Insufficient data size: {}", data.len()).into(),
567 ));
568 }
569
570 let values_size_bytes: [u8; 8] =
571 data[..8].try_into().expect("slice length already checked");
572 let values_size: u64 = u64::from_le_bytes(values_size_bytes);
573
574 let values_start: usize = 8;
576 let values_size: usize = values_size.try_into().map_err(|_| {
577 Error::invalid_input_source(
578 format!("Invalid values buffer size: {}", values_size).into(),
579 )
580 })?;
581 let lengths_start = values_start
582 .checked_add(values_size)
583 .ok_or_else(|| Error::invalid_input_source("Invalid RLE values buffer size".into()))?;
584
585 if data.len() < lengths_start {
586 return Err(Error::invalid_input_source(
587 format!("Insufficient data size: {}", data.len()).into(),
588 ));
589 }
590
591 let values_buffer = data.slice_with_length(values_start, values_size);
592 let lengths_buffer = data.slice_with_length(lengths_start, data.len() - lengths_start);
593
594 self.decode_data(vec![values_buffer, lengths_buffer], num_values)
595 }
596}
597
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::DataBlock;
    use crate::encodings::logical::primitive::miniblock::MAX_MINIBLOCK_VALUES;
    use crate::{buffer::LanceBuffer, compression::BlockDecompressor};
    use arrow_array::Int32Array;

    // Smoke test: a small array with three runs encodes into one chunk.
    #[test]
    fn test_basic_miniblock_rle_encoding() {
        let encoder = RleEncoder::new();

        let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]);
        let data_block = DataBlock::from_array(array);

        let (compressed, _) = MiniBlockCompressor::compress(&encoder, data_block).unwrap();

        assert_eq!(compressed.num_values, 9);
        assert_eq!(compressed.chunks.len(), 1);

        let values_buffer = &compressed.data[0];
        let lengths_buffer = &compressed.data[1];
        // 3 runs of i32 values -> 12 value bytes and 3 one-byte lengths.
        assert_eq!(values_buffer.len(), 12);
        assert_eq!(lengths_buffer.len(), 3);
    }

    // Runs longer than 255 must be split into multiple length entries:
    // 1000 -> 4 entries (255*3 + 235), 300 -> 2 entries (255 + 45).
    #[test]
    fn test_long_run_splitting() {
        let encoder = RleEncoder::new();

        let mut data = vec![42i32; 1000];
        data.extend(&[100i32; 300]);
        let array = Int32Array::from(data);
        let (compressed, _) =
            MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();

        let lengths_buffer = &compressed.data[1];
        assert_eq!(lengths_buffer.len(), 6);
    }

    // Round trips each supported element width (8/16/32/64 bits).
    #[test]
    fn test_round_trip_all_types() {
        test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);

        test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);

        test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);

        test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
    }

    // Compresses `data`, decompresses it, and checks the decoded byte length.
    fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
    where
        T: bytemuck::Pod + PartialEq + std::fmt::Debug,
    {
        let encoder = RleEncoder::new();
        let bytes: Vec<u8> = data
            .iter()
            .flat_map(|v| bytemuck::bytes_of(v))
            .copied()
            .collect();

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::from(bytes),
            num_values: data.len() as u64,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap();
        let decompressor = RleDecompressor::new(bits_per_value);
        let decompressed = MiniBlockDecompressor::decompress(
            &decompressor,
            compressed.data,
            compressed.num_values,
        )
        .unwrap();

        match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    // Every chunk except the last must cover an exact power-of-two count.
    #[test]
    fn test_power_of_two_chunking() {
        let encoder = RleEncoder::new();

        let test_sizes = vec![1000, 2500, 5000, 10000];

        for size in test_sizes {
            // Runs of 50 equal values each.
            let data: Vec<i32> = (0..size)
                .map(|i| i / 50)
                .collect();

            let array = Int32Array::from(data);
            let (compressed, _) =
                MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();

            for (i, chunk) in compressed.chunks.iter().enumerate() {
                if i < compressed.chunks.len() - 1 {
                    assert!(chunk.log_num_values > 0);
                    let chunk_values = 1u64 << chunk.log_num_values;
                    assert!(chunk_values.is_power_of_two());
                    assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
                } else {
                    // log_num_values == 0 marks the final chunk.
                    assert_eq!(chunk.log_num_values, 0);
                }
            }
        }
    }

    // The miniblock decompressor requires exactly two buffers.
    #[test]
    fn test_invalid_buffer_count() {
        let decompressor = RleDecompressor::new(32);
        let result = MiniBlockDecompressor::decompress(
            &decompressor,
            vec![LanceBuffer::from(vec![1, 2, 3, 4])],
            10,
        );
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("expects exactly 2 buffers")
        );
    }

    // Mismatched run/length entry counts must be rejected.
    #[test]
    fn test_buffer_consistency() {
        let decompressor = RleDecompressor::new(32);
        // One i32 run value but two length entries.
        let values = LanceBuffer::from(vec![1, 0, 0, 0]);
        let lengths = LanceBuffer::from(vec![5, 10]);
        let result = MiniBlockDecompressor::decompress(&decompressor, vec![values, lengths], 15);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Inconsistent RLE buffers")
        );
    }

    // Empty input produces no buffers and decodes back to an empty block.
    #[test]
    fn test_empty_data_handling() {
        let encoder = RleEncoder::new();

        let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::from(vec![]),
            num_values: 0,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = MiniBlockCompressor::compress(&encoder, empty_block).unwrap();
        assert_eq!(compressed.num_values, 0);
        assert!(compressed.data.is_empty());

        let decompressor = RleDecompressor::new(32);
        let decompressed = MiniBlockDecompressor::decompress(&decompressor, vec![], 0).unwrap();

        match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(block.num_values, 0);
                assert_eq!(block.data.len(), 0);
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    // Decodes chunk-by-chunk using chunk metadata and verifies the
    // reassembled data matches the input exactly.
    #[test]
    fn test_multi_chunk_round_trip() {
        let encoder = RleEncoder::new();

        let mut data = Vec::new();

        // Long run, then 1000 unique values, then another long run.
        data.extend(vec![999i32; 2000]);

        data.extend(0..1000);

        data.extend(vec![777i32; 2000]);

        let array = Int32Array::from(data.clone());
        let (compressed, _) =
            MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();

        let mut reconstructed = Vec::new();
        let mut values_offset = 0usize;
        let mut lengths_offset = 0usize;
        let mut values_processed = 0u64;

        assert_eq!(compressed.data.len(), 2);

        let global_values = &compressed.data[0];
        let global_lengths = &compressed.data[1];

        for chunk in &compressed.chunks {
            // Final chunk (log_num_values == 0) holds the remainder.
            let chunk_values = if chunk.log_num_values > 0 {
                1u64 << chunk.log_num_values
            } else {
                compressed.num_values - values_processed
            };

            let values_size = chunk.buffer_sizes[0] as usize;
            let lengths_size = chunk.buffer_sizes[1] as usize;

            let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
            let chunk_lengths_buffer =
                global_lengths.slice_with_length(lengths_offset, lengths_size);

            let decompressor = RleDecompressor::new(32);
            let chunk_data = MiniBlockDecompressor::decompress(
                &decompressor,
                vec![chunk_values_buffer, chunk_lengths_buffer],
                chunk_values,
            )
            .unwrap();

            values_offset += values_size;
            lengths_offset += lengths_size;
            values_processed += chunk_values;

            match chunk_data {
                DataBlock::FixedWidth(ref block) => {
                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                    reconstructed.extend_from_slice(values);
                }
                _ => panic!("Expected FixedWidth block"),
            }
        }

        assert_eq!(reconstructed, data);
    }

    // Edge cases at exactly 1024 values (a power-of-two chunk boundary).
    #[test]
    fn test_1024_boundary_conditions() {
        let encoder = RleEncoder::new();
        let decompressor = RleDecompressor::new(32);

        let test_cases = [
            ("runs_of_2", {
                let mut data = Vec::new();
                for i in 0..512 {
                    data.push(i);
                    data.push(i);
                }
                data
            }),
            ("single_run_1024", vec![42i32; 1024]),
            ("alternating_values", {
                let mut data = Vec::new();
                for i in 0..1024 {
                    data.push(i % 2);
                }
                data
            }),
            ("run_boundary_255s", {
                let mut data = Vec::new();
                data.extend(vec![1i32; 255]);
                data.extend(vec![2i32; 255]);
                data.extend(vec![3i32; 255]);
                data.extend(vec![4i32; 255]);
                data.extend(vec![5i32; 4]);
                data
            }),
            ("unique_values_1024", (0..1024).collect::<Vec<_>>()),
            ("unique_plus_duplicate", {
                let mut data = Vec::new();
                for i in 0..1023 {
                    data.push(i);
                }
                // Duplicate the final value to end on a 2-value run.
                data.push(1022i32);
                data
            }),
            ("bug_4092_pattern", {
                let mut data = Vec::new();
                for i in 0..1022 {
                    data.push(i);
                }
                data.push(999999i32);
                data.push(999999i32);
                data
            }),
        ];

        for (test_name, data) in test_cases.iter() {
            assert_eq!(data.len(), 1024, "Test case {} has wrong length", test_name);

            let array = Int32Array::from(data.clone());
            let (compressed, _) =
                MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();

            match MiniBlockDecompressor::decompress(
                &decompressor,
                compressed.data,
                compressed.num_values,
            ) {
                Ok(decompressed) => match decompressed {
                    DataBlock::FixedWidth(ref block) => {
                        let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                        assert_eq!(
                            values.len(),
                            1024,
                            "Test case {} got {} values, expected 1024",
                            test_name,
                            values.len()
                        );
                        assert_eq!(
                            block.data.len(),
                            4096,
                            "Test case {} got {} bytes, expected 4096",
                            test_name,
                            block.data.len()
                        );
                        assert_eq!(values, &data[..], "Test case {} data mismatch", test_name);
                    }
                    _ => panic!("Test case {} expected FixedWidth block", test_name),
                },
                Err(e) => {
                    if e.to_string().contains("4092") {
                        panic!("Test case {} found bug 4092: {}", test_name, e);
                    }
                    panic!("Test case {} failed with error: {}", test_name, e);
                }
            }
        }
    }

    // Regression test: ~50% repetition (average run length ~2) over a large
    // input, exercising many chunk-boundary rollbacks.
    #[test]
    fn test_low_repetition_50pct_bug() {
        let encoder = RleEncoder::new();

        let num_values = 1_048_576;
        let mut data = Vec::with_capacity(num_values);
        let mut value = 0i32;
        // Simple LCG so the test is deterministic.
        let mut rng = 12345u64;
        for _ in 0..num_values {
            data.push(value);
            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
            // ~50% chance of starting a new run.
            if (rng >> 16) & 1 == 1 {
                value += 1;
            }
        }

        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: LanceBuffer::from(bytes),
            num_values: num_values as u64,
            block_info: BlockInfo::default(),
        });

        let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap();

        // Walk the first few chunks to exercise chunk-size bookkeeping.
        for (i, chunk) in compressed.chunks.iter().take(5).enumerate() {
            let _chunk_values = if chunk.log_num_values > 0 {
                1 << chunk.log_num_values
            } else {
                let prev_total: usize = compressed.chunks[..i]
                    .iter()
                    .map(|c| 1usize << c.log_num_values)
                    .sum();
                num_values - prev_total
            };
        }

        let decompressor = RleDecompressor::new(32);
        match MiniBlockDecompressor::decompress(
            &decompressor,
            compressed.data,
            compressed.num_values,
        ) {
            Ok(decompressed) => match decompressed {
                DataBlock::FixedWidth(ref block) => {
                    assert_eq!(
                        block.data.len(),
                        num_values * 4,
                        "Expected {} bytes but got {}",
                        num_values * 4,
                        block.data.len()
                    );
                }
                _ => panic!("Expected FixedWidth block"),
            },
            Err(e) => {
                if e.to_string().contains("4092") {
                    panic!("Bug reproduced! {}", e);
                } else {
                    panic!("Unexpected error: {}", e);
                }
            }
        }
    }

    // End-to-end file round trip verifying the "rle" encoding is selected.
    #[test_log::test(tokio::test)]
    async fn test_rle_encoding_verification() {
        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
        use crate::version::LanceFileVersion;
        use arrow_array::{Array, Int32Array};
        use lance_datagen::{ArrayGenerator, RowCount};
        use std::collections::HashMap;
        use std::sync::Arc;

        let test_cases = TestCases::default()
            .with_expected_encoding("rle")
            .with_min_file_version(LanceFileVersion::V2_1);

        // Explicitly opt in to RLE via the threshold metadata key and
        // disable BSS so it cannot be chosen instead.
        let mut metadata_explicit = HashMap::new();
        metadata_explicit.insert(
            "lance-encoding:rle-threshold".to_string(),
            "0.8".to_string(),
        );
        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());

        let mut generator = RleDataGenerator::new(vec![
            i32::MIN,
            i32::MIN,
            i32::MIN,
            i32::MIN,
            i32::MIN + 1,
            i32::MIN + 1,
            i32::MIN + 1,
            i32::MIN + 1,
            i32::MIN + 2,
            i32::MIN + 2,
            i32::MIN + 2,
            i32::MIN + 2,
        ]);
        let data_explicit = generator.generate_default(RowCount::from(10000)).unwrap();
        check_round_trip_encoding_of_data(vec![data_explicit], &test_cases, metadata_explicit)
            .await;

        // Second round: rely on default threshold, only disabling BSS.
        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        // One huge run followed by a repeated 5-value pattern.
        let mut values = vec![i32::MIN; 8000];
        values.extend(
            [
                i32::MIN + 1,
                i32::MIN + 2,
                i32::MIN + 3,
                i32::MIN + 4,
                i32::MIN + 5,
            ]
            .repeat(400),
        );
        let arr = Arc::new(Int32Array::from(values)) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr], &test_cases, metadata).await;
    }

    // Deterministic generator that cycles through a fixed pattern of values.
    #[derive(Debug)]
    struct RleDataGenerator {
        pattern: Vec<i32>,
        idx: usize,
    }

    impl RleDataGenerator {
        fn new(pattern: Vec<i32>) -> Self {
            Self { pattern, idx: 0 }
        }
    }

    impl lance_datagen::ArrayGenerator for RleDataGenerator {
        fn generate(
            &mut self,
            _length: lance_datagen::RowCount,
            _rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
        ) -> std::result::Result<std::sync::Arc<dyn arrow_array::Array>, arrow_schema::ArrowError>
        {
            use arrow_array::Int32Array;
            use std::sync::Arc;

            // Emits a fixed 10000 values regardless of the requested length.
            let mut values = Vec::new();
            for _ in 0..10000 {
                values.push(self.pattern[self.idx]);
                self.idx = (self.idx + 1) % self.pattern.len();
            }
            Ok(Arc::new(Int32Array::from(values)))
        }

        fn data_type(&self) -> &arrow_schema::DataType {
            &arrow_schema::DataType::Int32
        }

        fn element_size_bytes(&self) -> Option<lance_datagen::ByteCount> {
            Some(lance_datagen::ByteCount::from(4))
        }
    }

    // A u64::MAX size header must be rejected (8 + usize::MAX overflows).
    #[test]
    fn test_block_decompressor_rejects_overflowing_values_size() {
        let decompressor = RleDecompressor::new(32);

        let mut data = Vec::new();
        data.extend_from_slice(&u64::MAX.to_le_bytes());
        let result = BlockDecompressor::decompress(&decompressor, LanceBuffer::from(data), 1);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Invalid RLE values buffer size")
        );
    }

    // Buffers shorter than the 8-byte header must be rejected.
    #[test]
    fn test_block_decompressor_too_small() {
        let decompressor = RleDecompressor::new(32);
        let result =
            BlockDecompressor::decompress(&decompressor, LanceBuffer::from(vec![1, 2, 3]), 10);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Insufficient data size: 3")
        );
    }

    // Header layout: [values_size: u64 LE][values][lengths].
    #[test]
    fn test_block_compressor_header_format() {
        let encoder = RleEncoder::new();

        let data = vec![1i32, 1, 1];
        let array = Int32Array::from(data);
        let compressed = BlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();

        assert!(compressed.len() >= 8);

        let values_size_bytes: [u8; 8] = compressed.as_ref()[..8].try_into().unwrap();
        let values_size = u64::from_le_bytes(values_size_bytes);

        // One i32 run -> 4 value bytes.
        assert_eq!(values_size, 4);

        // 8-byte header + 4 value bytes + 1 length byte.
        assert_eq!(compressed.len(), 13);
    }

    // Full round trip through the block (non-miniblock) code path.
    #[test]
    fn test_block_compressor_round_trip() {
        let encoder = RleEncoder::new();
        let decompressor = RleDecompressor::new(32);

        let data = vec![1i32, 1, 1, 2, 2, 3, 3, 3, 3];
        let array = Int32Array::from(data.clone());
        let data_block = DataBlock::from_array(array);

        let compressed = BlockCompressor::compress(&encoder, data_block).unwrap();
        let decompressed =
            BlockDecompressor::decompress(&decompressor, compressed, data.len() as u64).unwrap();

        match decompressed {
            DataBlock::FixedWidth(block) => {
                let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                assert_eq!(values, &data[..]);
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }

    // Block round trip with input spanning multiple long runs.
    #[test]
    fn test_block_compressor_large_data() {
        let encoder = RleEncoder::new();
        let decompressor = RleDecompressor::new(32);

        let mut data = Vec::new();
        data.extend(vec![999i32; 3000]);
        data.extend(vec![777i32; 3000]);
        data.extend(vec![555i32; 4000]);
        let total_values = data.len();
        assert_eq!(total_values, 10000);

        let array = Int32Array::from(data.clone());
        let compressed = BlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
        let decompressed =
            BlockDecompressor::decompress(&decompressor, compressed, total_values as u64).unwrap();

        match decompressed {
            DataBlock::FixedWidth(block) => {
                let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
                assert_eq!(values.len(), total_values);
                assert_eq!(values, &data[..]);
            }
            _ => panic!("Expected FixedWidth block"),
        }
    }
}