use arrow_buffer::ArrowNativeType;
use log::trace;
use crate::buffer::LanceBuffer;
use crate::compression::{BlockCompressor, BlockDecompressor, MiniBlockDecompressor};
use crate::data::DataBlock;
use crate::data::{BlockInfo, FixedWidthDataBlock};
use crate::encodings::logical::primitive::miniblock::{
MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES, MiniBlockChunk, MiniBlockCompressed,
MiniBlockCompressor,
};
use crate::format::ProtobufUtils21;
use crate::format::pb21::CompressiveEncoding;
use lance_core::{Error, Result};
/// Run-length encoder for fixed-width (8/16/32/64-bit) values.
///
/// Stateless: every call produces fresh output buffers, so a single instance
/// can be reused across blocks.
#[derive(Debug, Default)]
pub struct RleEncoder;
impl RleEncoder {
    /// Creates a new RLE encoder.
    pub fn new() -> Self {
        Self
    }

    /// Run-length encodes `num_values` fixed-width values from `data`.
    ///
    /// Returns two parallel buffers — run values (native width) and run
    /// lengths (one byte per run segment) — plus the mini-block chunk
    /// metadata describing how the runs are partitioned into chunks.
    fn encode_data(
        &self,
        data: &LanceBuffer,
        num_values: u64,
        bits_per_value: u64,
    ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
        if num_values == 0 {
            return Ok((Vec::new(), Vec::new()));
        }
        let bytes_per_value = (bits_per_value / 8) as usize;
        // Rough guess (average run length ~10) to size the output buffers up
        // front and avoid repeated reallocation.
        let estimated_runs = num_values as usize / 10;
        let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
        let mut all_lengths = Vec::with_capacity(estimated_runs);
        let mut chunks = Vec::new();
        let mut offset = 0usize;
        let mut values_remaining = num_values as usize;
        while values_remaining > 0 {
            let values_start = all_values.len();
            let lengths_start = all_lengths.len();
            let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
                8 => self.encode_chunk_rolling::<u8>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                16 => self.encode_chunk_rolling::<u16>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                32 => self.encode_chunk_rolling::<u32>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                64 => self.encode_chunk_rolling::<u64>(
                    data,
                    offset,
                    values_remaining,
                    &mut all_values,
                    &mut all_lengths,
                ),
                _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
            };
            if values_processed == 0 {
                // Safety valve: nothing could be encoded, stop rather than spin.
                break;
            }
            // Non-last chunks must hold a power-of-two number of values; the
            // last chunk signals "remainder" with log_num_values == 0.
            let log_num_values = if is_last_chunk {
                0
            } else {
                assert!(
                    values_processed.is_power_of_two(),
                    "Non-last chunk must have power-of-2 values"
                );
                values_processed.ilog2() as u8
            };
            let values_size = all_values.len() - values_start;
            let lengths_size = all_lengths.len() - lengths_start;
            let chunk = MiniBlockChunk {
                buffer_sizes: vec![values_size as u32, lengths_size as u32],
                log_num_values,
            };
            chunks.push(chunk);
            offset += values_processed;
            values_remaining -= values_processed;
        }
        Ok((
            vec![
                LanceBuffer::from(all_values),
                LanceBuffer::from(all_lengths),
            ],
            chunks,
        ))
    }

    /// Encodes one chunk's worth of values of type `T` starting at `offset`
    /// (in values), appending runs to `all_values`/`all_lengths`.
    ///
    /// Tracks "checkpoints" at each power-of-two value count so that if the
    /// chunk byte budget (`MAX_MINIBLOCK_BYTES`) is exceeded mid-run, the
    /// output can be rolled back to a valid power-of-two boundary.
    ///
    /// Returns `(num_runs, values_processed, is_last_chunk)`.
    fn encode_chunk_rolling<T>(
        &self,
        data: &LanceBuffer,
        offset: usize,
        values_remaining: usize,
        all_values: &mut Vec<u8>,
        all_lengths: &mut Vec<u8>,
    ) -> (usize, usize, bool)
    where
        T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug + ArrowNativeType,
    {
        let type_size = std::mem::size_of::<T>();
        let chunk_start = offset * type_size;
        let max_by_count = MAX_MINIBLOCK_VALUES as usize;
        let max_values = values_remaining.min(max_by_count);
        let chunk_end = chunk_start + max_values * type_size;
        if chunk_start >= data.len() {
            return (0, 0, false);
        }
        let chunk_len = chunk_end.min(data.len()) - chunk_start;
        let chunk_buffer = data.slice_with_length(chunk_start, chunk_len);
        let typed_data_ref = chunk_buffer.borrow_to_typed_slice::<T>();
        let typed_data: &[T] = typed_data_ref.as_ref();
        if typed_data.is_empty() {
            return (0, 0, false);
        }
        let values_start = all_values.len();
        let mut current_value = typed_data[0];
        let mut current_length = 1u64;
        let mut bytes_used = 0usize;
        let mut total_values_encoded = 0usize;
        // Smallest checkpoint size (log2) worth recording, by value width.
        let min_checkpoint_log2 = match type_size {
            1 => 8,
            2 => 7,
            _ => 6,
        };
        let max_checkpoint_log2 = (values_remaining.min(MAX_MINIBLOCK_VALUES as usize))
            .next_power_of_two()
            .ilog2();
        let mut checkpoint_log2 = min_checkpoint_log2;
        // (values_len, lengths_len, bytes_used, values_encoded) at the most
        // recent power-of-two boundary.
        let mut last_checkpoint_state = None;
        for &value in typed_data[1..].iter() {
            if value == current_value {
                current_length += 1;
            } else {
                // A run is stored as <=255-value segments: one (value, length)
                // pair per segment.
                let run_chunks = current_length.div_ceil(255) as usize;
                let bytes_needed = run_chunks * (type_size + 1);
                if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
                    // Byte budget exceeded: rewind to the last checkpoint so
                    // the chunk keeps a power-of-two value count.
                    if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
                        all_values.truncate(val_pos);
                        all_lengths.truncate(len_pos);
                        let num_runs = (val_pos - values_start) / type_size;
                        return (num_runs, checkpoint_values, false);
                    }
                    break;
                }
                bytes_used += self.add_run(&current_value, current_length, all_values, all_lengths);
                total_values_encoded += current_length as usize;
                current_value = value;
                current_length = 1;
            }
            // Record every power-of-two boundary we have fully passed as a
            // potential rollback point.
            while checkpoint_log2 <= max_checkpoint_log2 {
                let checkpoint_values = 1usize << checkpoint_log2;
                if checkpoint_values > values_remaining || total_values_encoded < checkpoint_values
                {
                    break;
                }
                last_checkpoint_state = Some((
                    all_values.len(),
                    all_lengths.len(),
                    bytes_used,
                    checkpoint_values,
                ));
                checkpoint_log2 += 1;
            }
        }
        // Flush the final open run if it still fits in the byte budget.
        if current_length > 0 {
            let run_chunks = current_length.div_ceil(255) as usize;
            let bytes_needed = run_chunks * (type_size + 1);
            if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
                let _ = self.add_run(&current_value, current_length, all_values, all_lengths);
                total_values_encoded += current_length as usize;
            }
        }
        let is_last_chunk = total_values_encoded == values_remaining;
        if !is_last_chunk {
            if total_values_encoded.is_power_of_two() {
                // Already a valid non-last chunk size; fall through.
            } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
                all_values.truncate(val_pos);
                all_lengths.truncate(len_pos);
                let num_runs = (val_pos - values_start) / type_size;
                return (num_runs, checkpoint_values, false);
            } else {
                return (0, 0, false);
            }
        }
        let num_runs = (all_values.len() - values_start) / type_size;
        (num_runs, total_values_encoded, is_last_chunk)
    }

    /// Appends one run to the output buffers, splitting it into segments of
    /// at most 255 values (run lengths are stored as single bytes).
    ///
    /// Returns the total number of bytes appended across both buffers.
    fn add_run<T>(
        &self,
        value: &T,
        length: u64,
        all_values: &mut Vec<u8>,
        all_lengths: &mut Vec<u8>,
    ) -> usize
    where
        T: bytemuck::Pod,
    {
        let value_bytes = bytemuck::bytes_of(value);
        let type_size = std::mem::size_of::<T>();
        let num_full_chunks = (length / 255) as usize;
        let remainder = (length % 255) as u8;
        let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
        all_values.reserve(total_chunks * type_size);
        all_lengths.reserve(total_chunks);
        for _ in 0..num_full_chunks {
            all_values.extend_from_slice(value_bytes);
            all_lengths.push(255);
        }
        if remainder > 0 {
            all_values.extend_from_slice(value_bytes);
            all_lengths.push(remainder);
        }
        total_chunks * (type_size + 1)
    }
}
impl MiniBlockCompressor for RleEncoder {
fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
match data {
DataBlock::FixedWidth(fixed_width) => {
let num_values = fixed_width.num_values;
let bits_per_value = fixed_width.bits_per_value;
let (all_buffers, chunks) =
self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
let compressed = MiniBlockCompressed {
data: all_buffers,
chunks,
num_values,
};
let encoding = ProtobufUtils21::rle(
ProtobufUtils21::flat(bits_per_value, None),
ProtobufUtils21::flat( 8, None),
);
Ok((compressed, encoding))
}
_ => Err(Error::invalid_input_source(
"RLE encoding only supports FixedWidth data blocks".into(),
)),
}
}
}
impl BlockCompressor for RleEncoder {
fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
match data {
DataBlock::FixedWidth(fixed_width) => {
let num_values = fixed_width.num_values;
let bits_per_value = fixed_width.bits_per_value;
let (all_buffers, _) =
self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
let values_size = all_buffers[0].len() as u64;
let mut combined = Vec::new();
combined.extend_from_slice(&values_size.to_le_bytes());
combined.extend_from_slice(&all_buffers[0]);
combined.extend_from_slice(&all_buffers[1]);
Ok(LanceBuffer::from(combined))
}
_ => Err(Error::invalid_input_source(
"RLE encoding only supports FixedWidth data blocks".into(),
)),
}
}
}
/// Decoder for RLE-compressed fixed-width data.
#[derive(Debug)]
pub struct RleDecompressor {
    /// Width of each decoded value in bits; must be 8, 16, 32 or 64.
    bits_per_value: u64,
}
impl RleDecompressor {
    /// Creates a decompressor for values of the given width in bits.
    pub fn new(bits_per_value: u64) -> Self {
        Self { bits_per_value }
    }

    /// Decodes RLE buffers (run values + run lengths) into a fixed-width
    /// block containing exactly `num_values` values.
    fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        if num_values == 0 {
            // Nothing to decode; return an empty block of the right width.
            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
                bits_per_value: self.bits_per_value,
                data: LanceBuffer::from(vec![]),
                num_values: 0,
                block_info: BlockInfo::default(),
            }));
        }
        if data.len() != 2 {
            return Err(Error::invalid_input_source(
                format!(
                    "RLE decompressor expects exactly 2 buffers, got {}",
                    data.len()
                )
                .into(),
            ));
        }
        let values_buffer = &data[0];
        let lengths_buffer = &data[1];
        let decoded_data = match self.bits_per_value {
            8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
            16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
            32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
            64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
            // Kept in sync with the encoder's supported widths.
            _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32 or 64"),
        };
        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_value,
            data: decoded_data,
            num_values,
            block_info: BlockInfo::default(),
        }))
    }

    /// Expands runs of type `T` into a flat buffer of `num_values` values.
    ///
    /// Validates that the values buffer length is a multiple of the value
    /// size and that both buffers describe the same number of runs. The final
    /// run is clamped once `num_values` values have been produced.
    fn decode_generic<T>(
        &self,
        values_buffer: &LanceBuffer,
        lengths_buffer: &LanceBuffer,
        num_values: u64,
    ) -> Result<LanceBuffer>
    where
        T: bytemuck::Pod + Copy + std::fmt::Debug + ArrowNativeType,
    {
        let type_size = std::mem::size_of::<T>();
        if values_buffer.is_empty() || lengths_buffer.is_empty() {
            if num_values == 0 {
                return Ok(LanceBuffer::empty());
            } else {
                return Err(Error::invalid_input_source(
                    format!("Empty buffers but expected {} values", num_values).into(),
                ));
            }
        }
        // Empty buffers were already rejected above, so only divisibility of
        // the values buffer needs checking here.
        if !values_buffer.len().is_multiple_of(type_size) {
            return Err(Error::invalid_input_source(format!(
                "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
                std::any::type_name::<T>(),
                values_buffer.len(),
                type_size,
                lengths_buffer.len()
            )
            .into()));
        }
        let num_runs = values_buffer.len() / type_size;
        let num_length_entries = lengths_buffer.len();
        if num_runs != num_length_entries {
            return Err(Error::invalid_input_source(
                format!(
                    "Inconsistent RLE buffers: {} runs but {} length entries",
                    num_runs, num_length_entries
                )
                .into(),
            ));
        }
        let values_ref = values_buffer.borrow_to_typed_slice::<T>();
        let values: &[T] = values_ref.as_ref();
        let lengths: &[u8] = lengths_buffer.as_ref();
        let expected_value_count = num_values as usize;
        let mut decoded: Vec<T> = Vec::with_capacity(expected_value_count);
        for (value, &length) in values.iter().zip(lengths.iter()) {
            if decoded.len() == expected_value_count {
                break;
            }
            if length == 0 {
                // The encoder never emits zero-length runs; treat as corruption.
                return Err(Error::invalid_input_source(
                    "RLE decoding encountered a zero run length".into(),
                ));
            }
            // Clamp the final run so we never produce more than requested.
            let remaining = expected_value_count - decoded.len();
            let write_len = (length as usize).min(remaining);
            decoded.resize(decoded.len() + write_len, *value);
        }
        if decoded.len() != expected_value_count {
            return Err(Error::invalid_input_source(
                format!(
                    "RLE decoding produced {} values, expected {}",
                    decoded.len(),
                    expected_value_count
                )
                .into(),
            ));
        }
        trace!(
            "RLE decoded {} {} values",
            num_values,
            std::any::type_name::<T>()
        );
        Ok(LanceBuffer::reinterpret_vec(decoded))
    }
}
impl MiniBlockDecompressor for RleDecompressor {
    /// Decompresses mini-block buffers (run values + run lengths) into a
    /// fixed-width data block; delegates to `decode_data`.
fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
self.decode_data(data, num_values)
}
}
impl BlockDecompressor for RleDecompressor {
    /// Decompresses a single-buffer block produced by `BlockCompressor`.
    ///
    /// Layout: `[values_size: u64 LE][values bytes][length bytes]`.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        const HEADER_SIZE: usize = 8;
        if data.len() < HEADER_SIZE {
            return Err(Error::invalid_input_source(
                format!("Insufficient data size: {}", data.len()).into(),
            ));
        }
        let header: [u8; 8] = data[..HEADER_SIZE]
            .try_into()
            .expect("slice length already checked");
        let raw_values_size = u64::from_le_bytes(header);
        // Reject sizes that do not fit in usize (e.g. on 32-bit targets).
        let values_len: usize = raw_values_size.try_into().map_err(|_| {
            Error::invalid_input_source(
                format!("Invalid values buffer size: {}", raw_values_size).into(),
            )
        })?;
        // checked_add guards against header + size overflowing usize.
        let lengths_start = HEADER_SIZE
            .checked_add(values_len)
            .ok_or_else(|| Error::invalid_input_source("Invalid RLE values buffer size".into()))?;
        if data.len() < lengths_start {
            return Err(Error::invalid_input_source(
                format!("Insufficient data size: {}", data.len()).into(),
            ));
        }
        let values = data.slice_with_length(HEADER_SIZE, values_len);
        let lengths = data.slice_with_length(lengths_start, data.len() - lengths_start);
        self.decode_data(vec![values, lengths], num_values)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data::DataBlock;
use crate::encodings::logical::primitive::miniblock::MAX_MINIBLOCK_VALUES;
use crate::{buffer::LanceBuffer, compression::BlockDecompressor};
use arrow_array::Int32Array;
// Nine values in three runs (1 x 3, 2 x 2, 3 x 4) should fit one chunk with
// three runs: 3 four-byte values and 3 one-byte lengths.
#[test]
fn test_basic_miniblock_rle_encoding() {
    let encoder = RleEncoder::new();
    let block = DataBlock::from_array(Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]));
    let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap();
    assert_eq!(compressed.num_values, 9);
    assert_eq!(compressed.chunks.len(), 1);
    assert_eq!(compressed.data[0].len(), 12);
    assert_eq!(compressed.data[1].len(), 3);
}
// Runs longer than 255 values must be split into 255-value segments:
// 1000 values -> 4 segments and 300 values -> 2 segments, 6 lengths total.
#[test]
fn test_long_run_splitting() {
    let encoder = RleEncoder::new();
    let mut values = vec![42i32; 1000];
    values.extend(&[100i32; 300]);
    let block = DataBlock::from_array(Int32Array::from(values));
    let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap();
    assert_eq!(compressed.data[1].len(), 6);
}
// Verifies encode/decode round trips across all supported value widths.
#[test]
fn test_round_trip_all_types() {
test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);
test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);
test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);
test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
}
// Encodes `data` as a fixed-width block, decodes it, and checks the decoded
// byte length matches (exact values are verified by other tests).
fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
where
T: bytemuck::Pod + PartialEq + std::fmt::Debug,
{
let encoder = RleEncoder::new();
let bytes: Vec<u8> = data
.iter()
.flat_map(|v| bytemuck::bytes_of(v))
.copied()
.collect();
let block = DataBlock::FixedWidth(FixedWidthDataBlock {
bits_per_value,
data: LanceBuffer::from(bytes),
num_values: data.len() as u64,
block_info: BlockInfo::default(),
});
let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap();
let decompressor = RleDecompressor::new(bits_per_value);
let decompressed = MiniBlockDecompressor::decompress(
&decompressor,
compressed.data,
compressed.num_values,
)
.unwrap();
match decompressed {
DataBlock::FixedWidth(ref block) => {
assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
}
_ => panic!("Expected FixedWidth block"),
}
}
// All chunks except the last must advertise a power-of-two value count via
// log_num_values; the last chunk uses 0 to mean "remainder".
#[test]
fn test_power_of_two_chunking() {
    let encoder = RleEncoder::new();
    for size in [1000, 2500, 5000, 10000] {
        // Runs of 50 identical values each.
        let data: Vec<i32> = (0..size).map(|i| i / 50).collect();
        let block = DataBlock::from_array(Int32Array::from(data));
        let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap();
        let last_idx = compressed.chunks.len() - 1;
        for (i, chunk) in compressed.chunks.iter().enumerate() {
            if i == last_idx {
                assert_eq!(chunk.log_num_values, 0);
            } else {
                assert!(chunk.log_num_values > 0);
                let chunk_values = 1u64 << chunk.log_num_values;
                assert!(chunk_values.is_power_of_two());
                assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
            }
        }
    }
}
// Mini-block decompression requires exactly two buffers (values + lengths);
// passing only one must fail with a descriptive error.
#[test]
fn test_invalid_buffer_count() {
let decompressor = RleDecompressor::new(32);
let result = MiniBlockDecompressor::decompress(
&decompressor,
vec![LanceBuffer::from(vec![1, 2, 3, 4])],
10,
);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("expects exactly 2 buffers")
);
}
// One 4-byte run value but two length entries: the decompressor must
// reject the run/length count mismatch.
#[test]
fn test_buffer_consistency() {
    let decompressor = RleDecompressor::new(32);
    let values = LanceBuffer::from(vec![1, 0, 0, 0]);
    let lengths = LanceBuffer::from(vec![5, 10]);
    let result = MiniBlockDecompressor::decompress(&decompressor, vec![values, lengths], 15);
    assert!(result.is_err());
    let message = result.unwrap_err().to_string();
    assert!(message.contains("Inconsistent RLE buffers"));
}
// Zero values must compress to zero buffers and decompress back to an
// empty fixed-width block without error.
#[test]
fn test_empty_data_handling() {
let encoder = RleEncoder::new();
let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
bits_per_value: 32,
data: LanceBuffer::from(vec![]),
num_values: 0,
block_info: BlockInfo::default(),
});
let (compressed, _) = MiniBlockCompressor::compress(&encoder, empty_block).unwrap();
assert_eq!(compressed.num_values, 0);
assert!(compressed.data.is_empty());
let decompressor = RleDecompressor::new(32);
let decompressed = MiniBlockDecompressor::decompress(&decompressor, vec![], 0).unwrap();
match decompressed {
DataBlock::FixedWidth(ref block) => {
assert_eq!(block.num_values, 0);
assert_eq!(block.data.len(), 0);
}
_ => panic!("Expected FixedWidth block"),
}
}
// Decompresses each chunk independently (using buffer_sizes/log_num_values
// to walk the global buffers) and checks the concatenation matches the input.
#[test]
fn test_multi_chunk_round_trip() {
let encoder = RleEncoder::new();
let mut data = Vec::new();
data.extend(vec![999i32; 2000]);
data.extend(0..1000);
data.extend(vec![777i32; 2000]);
let array = Int32Array::from(data.clone());
let (compressed, _) =
MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
let mut reconstructed = Vec::new();
let mut values_offset = 0usize;
let mut lengths_offset = 0usize;
let mut values_processed = 0u64;
assert_eq!(compressed.data.len(), 2);
let global_values = &compressed.data[0];
let global_lengths = &compressed.data[1];
for chunk in &compressed.chunks {
// log_num_values == 0 marks the last chunk, which holds the remainder.
let chunk_values = if chunk.log_num_values > 0 {
1u64 << chunk.log_num_values
} else {
compressed.num_values - values_processed
};
let values_size = chunk.buffer_sizes[0] as usize;
let lengths_size = chunk.buffer_sizes[1] as usize;
let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
let chunk_lengths_buffer =
global_lengths.slice_with_length(lengths_offset, lengths_size);
let decompressor = RleDecompressor::new(32);
let chunk_data = MiniBlockDecompressor::decompress(
&decompressor,
vec![chunk_values_buffer, chunk_lengths_buffer],
chunk_values,
)
.unwrap();
values_offset += values_size;
lengths_offset += lengths_size;
values_processed += chunk_values;
match chunk_data {
DataBlock::FixedWidth(ref block) => {
let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
reconstructed.extend_from_slice(values);
}
_ => panic!("Expected FixedWidth block"),
}
}
assert_eq!(reconstructed, data);
}
// Exercises several 1024-value patterns around chunk-boundary edge cases
// (each named case must round-trip exactly).
#[test]
fn test_1024_boundary_conditions() {
let encoder = RleEncoder::new();
let decompressor = RleDecompressor::new(32);
let test_cases = [
("runs_of_2", {
let mut data = Vec::new();
for i in 0..512 {
data.push(i);
data.push(i);
}
data
}),
("single_run_1024", vec![42i32; 1024]),
("alternating_values", {
let mut data = Vec::new();
for i in 0..1024 {
data.push(i % 2);
}
data
}),
// Runs of exactly 255 hit the one-byte run-length segment limit.
("run_boundary_255s", {
let mut data = Vec::new();
data.extend(vec![1i32; 255]);
data.extend(vec![2i32; 255]);
data.extend(vec![3i32; 255]);
data.extend(vec![4i32; 255]);
data.extend(vec![5i32; 4]);
data
}),
("unique_values_1024", (0..1024).collect::<Vec<_>>()),
("unique_plus_duplicate", {
let mut data = Vec::new();
for i in 0..1023 {
data.push(i);
}
data.push(1022i32); data
}),
("bug_4092_pattern", {
let mut data = Vec::new();
for i in 0..1022 {
data.push(i);
}
data.push(999999i32);
data.push(999999i32);
data
}),
];
for (test_name, data) in test_cases.iter() {
assert_eq!(data.len(), 1024, "Test case {} has wrong length", test_name);
let array = Int32Array::from(data.clone());
let (compressed, _) =
MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
match MiniBlockDecompressor::decompress(
&decompressor,
compressed.data,
compressed.num_values,
) {
Ok(decompressed) => match decompressed {
DataBlock::FixedWidth(ref block) => {
let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
assert_eq!(
values.len(),
1024,
"Test case {} got {} values, expected 1024",
test_name,
values.len()
);
assert_eq!(
block.data.len(),
4096,
"Test case {} got {} bytes, expected 4096",
test_name,
block.data.len()
);
assert_eq!(values, &data[..], "Test case {} data mismatch", test_name);
}
_ => panic!("Test case {} expected FixedWidth block", test_name),
},
Err(e) => {
if e.to_string().contains("4092") {
panic!("Test case {} found bug 4092: {}", test_name, e);
}
panic!("Test case {} failed with error: {}", test_name, e);
}
}
}
}
// Regression test: ~50% repetition data (a random-walk of values) across
// many chunks must decode back to the full value count.
#[test]
fn test_low_repetition_50pct_bug() {
    let encoder = RleEncoder::new();
    let num_values = 1_048_576;
    let mut data = Vec::with_capacity(num_values);
    let mut value = 0i32;
    // Simple LCG keeps the test deterministic without a rand dependency.
    let mut rng = 12345u64;
    for _ in 0..num_values {
        data.push(value);
        rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
        if (rng >> 16) & 1 == 1 {
            value += 1;
        }
    }
    let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
    let block = DataBlock::FixedWidth(FixedWidthDataBlock {
        bits_per_value: 32,
        data: LanceBuffer::from(bytes),
        num_values: num_values as u64,
        block_info: BlockInfo::default(),
    });
    let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap();
    let decompressor = RleDecompressor::new(32);
    match MiniBlockDecompressor::decompress(
        &decompressor,
        compressed.data,
        compressed.num_values,
    ) {
        Ok(decompressed) => match decompressed {
            DataBlock::FixedWidth(ref block) => {
                assert_eq!(
                    block.data.len(),
                    num_values * 4,
                    "Expected {} bytes but got {}",
                    num_values * 4,
                    block.data.len()
                );
            }
            _ => panic!("Expected FixedWidth block"),
        },
        Err(e) => {
            if e.to_string().contains("4092") {
                panic!("Bug reproduced! {}", e);
            } else {
                panic!("Unexpected error: {}", e);
            }
        }
    }
}
// End-to-end check through the file writer: highly repetitive data must be
// encoded with the "rle" encoding in V2.1+ files (BSS disabled so it does
// not take precedence).
#[test_log::test(tokio::test)]
async fn test_rle_encoding_verification() {
use crate::testing::{TestCases, check_round_trip_encoding_of_data};
use crate::version::LanceFileVersion;
use arrow_array::{Array, Int32Array};
use lance_datagen::{ArrayGenerator, RowCount};
use std::collections::HashMap;
use std::sync::Arc;
let test_cases = TestCases::default()
.with_expected_encoding("rle")
.with_min_file_version(LanceFileVersion::V2_1);
// Case 1: explicit RLE threshold in metadata plus a repeating pattern.
let mut metadata_explicit = HashMap::new();
metadata_explicit.insert(
"lance-encoding:rle-threshold".to_string(),
"0.8".to_string(),
);
metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());
let mut generator = RleDataGenerator::new(vec![
i32::MIN,
i32::MIN,
i32::MIN,
i32::MIN,
i32::MIN + 1,
i32::MIN + 1,
i32::MIN + 1,
i32::MIN + 1,
i32::MIN + 2,
i32::MIN + 2,
i32::MIN + 2,
i32::MIN + 2,
]);
let data_explicit = generator.generate_default(RowCount::from(10000)).unwrap();
check_round_trip_encoding_of_data(vec![data_explicit], &test_cases, metadata_explicit)
.await;
// Case 2: default threshold; a long constant run plus short repeats.
let mut metadata = HashMap::new();
metadata.insert("lance-encoding:bss".to_string(), "off".to_string());
let mut values = vec![i32::MIN; 8000]; values.extend(
[
i32::MIN + 1,
i32::MIN + 2,
i32::MIN + 3,
i32::MIN + 4,
i32::MIN + 5,
]
.repeat(400),
); let arr = Arc::new(Int32Array::from(values)) as Arc<dyn Array>;
check_round_trip_encoding_of_data(vec![arr], &test_cases, metadata).await;
}
/// Deterministic test generator that cycles through a fixed pattern of
/// i32 values.
#[derive(Debug)]
struct RleDataGenerator {
pattern: Vec<i32>,
idx: usize,
}
impl RleDataGenerator {
    // Starts cycling from the beginning of `pattern`.
fn new(pattern: Vec<i32>) -> Self {
Self { pattern, idx: 0 }
}
}
impl lance_datagen::ArrayGenerator for RleDataGenerator {
    // Ignores `_length`/`_rng` and always emits 10000 values by cycling the
    // pattern, so output is fully deterministic across calls.
fn generate(
&mut self,
_length: lance_datagen::RowCount,
_rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
) -> std::result::Result<std::sync::Arc<dyn arrow_array::Array>, arrow_schema::ArrowError>
{
use arrow_array::Int32Array;
use std::sync::Arc;
let mut values = Vec::new();
for _ in 0..10000 {
values.push(self.pattern[self.idx]);
self.idx = (self.idx + 1) % self.pattern.len();
}
Ok(Arc::new(Int32Array::from(values)))
}
fn data_type(&self) -> &arrow_schema::DataType {
&arrow_schema::DataType::Int32
}
fn element_size_bytes(&self) -> Option<lance_datagen::ByteCount> {
Some(lance_datagen::ByteCount::from(4))
}
}
// A header claiming u64::MAX bytes of run values must be rejected: adding
// the 8-byte header to that size overflows usize.
#[test]
fn test_block_decompressor_rejects_overflowing_values_size() {
let decompressor = RleDecompressor::new(32);
let mut data = Vec::new();
data.extend_from_slice(&u64::MAX.to_le_bytes());
let result = BlockDecompressor::decompress(&decompressor, LanceBuffer::from(data), 1);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Invalid RLE values buffer size")
);
}
// Input shorter than the 8-byte size header must be rejected up front.
#[test]
fn test_block_decompressor_too_small() {
let decompressor = RleDecompressor::new(32);
let result =
BlockDecompressor::decompress(&decompressor, LanceBuffer::from(vec![1, 2, 3]), 10);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Insufficient data size: 3")
);
}
// Three equal i32s form one run: 8-byte header + 4 value bytes + 1 length
// byte = 13 bytes total, with the header recording 4 value bytes.
#[test]
fn test_block_compressor_header_format() {
let encoder = RleEncoder::new();
let data = vec![1i32, 1, 1];
let array = Int32Array::from(data);
let compressed = BlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
assert!(compressed.len() >= 8);
let values_size_bytes: [u8; 8] = compressed.as_ref()[..8].try_into().unwrap();
let values_size = u64::from_le_bytes(values_size_bytes);
assert_eq!(values_size, 4);
assert_eq!(compressed.len(), 13);
}
// Round-trips a small multi-run payload through the single-buffer
// BlockCompressor/BlockDecompressor path.
#[test]
fn test_block_compressor_round_trip() {
let encoder = RleEncoder::new();
let decompressor = RleDecompressor::new(32);
let data = vec![1i32, 1, 1, 2, 2, 3, 3, 3, 3];
let array = Int32Array::from(data.clone());
let data_block = DataBlock::from_array(array);
let compressed = BlockCompressor::compress(&encoder, data_block).unwrap();
let decompressed =
BlockDecompressor::decompress(&decompressor, compressed, data.len() as u64).unwrap();
match decompressed {
DataBlock::FixedWidth(block) => {
let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
assert_eq!(values, &data[..]);
}
_ => panic!("Expected FixedWidth block"),
}
}
// Round-trips 10k values (three long runs) through the single-buffer
// block compressor path.
#[test]
fn test_block_compressor_large_data() {
    let encoder = RleEncoder::new();
    let decompressor = RleDecompressor::new(32);
    let mut data = Vec::new();
    data.extend(vec![999i32; 3000]);
    data.extend(vec![777i32; 3000]);
    data.extend(vec![555i32; 4000]);
    let total_values = data.len();
    assert_eq!(total_values, 10000);
    let array = Int32Array::from(data.clone());
    let compressed = BlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
    let decompressed =
        BlockDecompressor::decompress(&decompressor, compressed, total_values as u64).unwrap();
    let DataBlock::FixedWidth(block) = decompressed else {
        panic!("Expected FixedWidth block");
    };
    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
    assert_eq!(values.len(), total_values);
    assert_eq!(values, &data[..]);
}
}