use arrayref::array_mut_ref;
use fastlanes::RLE;
use vortex_array::IntoArray;
use vortex_array::ToCanonical;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::primitive::NativeValue;
use vortex_array::dtype::NativePType;
use vortex_array::match_each_native_ptype;
use vortex_array::validity::Validity;
use vortex_array::vtable::ValidityHelper;
use vortex_buffer::BitBufferMut;
use vortex_buffer::BufferMut;
use vortex_error::VortexResult;
use crate::FL_CHUNK_SIZE;
use crate::RLEArray;
use crate::fill_forward_nulls;
impl RLEArray {
/// Encodes a primitive array into an [`RLEArray`].
///
/// Dispatches on `array.ptype()` via `match_each_native_ptype!` and forwards to
/// `rle_encode_typed::<T>` for the matching native element type `T`.
///
/// # Errors
///
/// Propagates any error returned by `rle_encode_typed`.
pub fn encode(array: &PrimitiveArray) -> VortexResult<Self> {
match_each_native_ptype!(array.ptype(), |T| { rle_encode_typed::<T>(array) })
}
}
/// Run-length encodes `array`, read as elements of type `T`, one `FL_CHUNK_SIZE` chunk at a time.
///
/// Each chunk is encoded independently with `NativeValue::<T>::encode`. The values produced
/// for all chunks are packed back to back in one buffer, the per-element indices are written at
/// the element's own position, and one offset per chunk records where that chunk's values start.
/// When the input length is not a multiple of `FL_CHUNK_SIZE`, the final chunk is padded by
/// repeating the last input value, so the indices buffer always has `padded_len` entries.
///
/// # Errors
///
/// Propagates any error returned by `RLEArray::try_new`.
fn rle_encode_typed<T>(array: &PrimitiveArray) -> VortexResult<RLEArray>
where
T: NativePType + RLE,
NativeValue<T>: RLE,
{
// Pre-process the raw values together with the validity before encoding.
// NOTE(review): presumably this overwrites null slots with the preceding value so that
// they do not break up runs — `fill_forward_nulls` is defined elsewhere; confirm.
let values = fill_forward_nulls(array.to_buffer::<T>(), array.validity());
let len = values.len();
// Length rounded up to a whole number of chunks.
let padded_len = len.next_multiple_of(FL_CHUNK_SIZE);
// Both output buffers are sized for the padded length; they are filled through their
// spare capacity below and only get a length once encoding is finished.
let mut values_buf = BufferMut::<NativeValue<T>>::with_capacity(padded_len);
let mut indices_buf = BufferMut::<u16>::with_capacity(padded_len);
// One offset per chunk (including a partial final chunk).
let mut values_idx_offsets = BufferMut::<u64>::with_capacity(len.div_ceil(FL_CHUNK_SIZE));
let values_uninit = values_buf.spare_capacity_mut();
let indices_uninit = indices_buf.spare_capacity_mut();
// Running total of values emitted by all chunks processed so far.
let mut value_count_acc = 0;
// Split the input into full chunks plus a (possibly empty) shorter tail.
let (chunks, remainder) = values.as_chunks::<FL_CHUNK_SIZE>();
// Encodes one full-size chunk whose first element sits at `chunk_start_idx` in the input.
let mut process_chunk = |chunk_start_idx: usize, input: &[T; FL_CHUNK_SIZE]| {
// SAFETY: relies on `NativeValue<T>` having the same layout as `T` (presumably
// `#[repr(transparent)]`; its definition is outside this file — confirm).
let input: &[NativeValue<T>; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(input) };
// Output window for this chunk's values: it starts right after the values of the
// previous chunks, so values end up densely packed. The slicing is bounds-checked.
// SAFETY: reinterprets the spare-capacity slice (presumably `[MaybeUninit<_>]`, as with
// `Vec::spare_capacity_mut`) as a slice of the element type; `MaybeUninit<X>` has the
// same layout as `X`.
// NOTE(review): this forms a `&mut [_]` over memory that has not been written yet; the
// encoder is expected to only write to it.
let rle_vals: &mut [NativeValue<T>] =
unsafe { std::mem::transmute(&mut values_uninit[value_count_acc..][..FL_CHUNK_SIZE]) };
// Output window for this chunk's indices: same position as the chunk in the input.
// SAFETY: same reasoning as for `rle_vals`, with `u16` as the element type.
let rle_idxs: &mut [u16] =
unsafe { std::mem::transmute(&mut indices_uninit[chunk_start_idx..][..FL_CHUNK_SIZE]) };
// Record where this chunk's values begin in the packed values buffer.
values_idx_offsets.push(value_count_acc as u64);
// The returned count is how far the packed values buffer advances for this chunk.
let value_count = NativeValue::<T>::encode(
input,
array_mut_ref![rle_vals, 0, FL_CHUNK_SIZE],
array_mut_ref![rle_idxs, 0, FL_CHUNK_SIZE],
);
value_count_acc += value_count;
};
for (chunk_idx, chunk_slice) in chunks.iter().enumerate() {
process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice);
}
if !remainder.is_empty() {
// Pad the tail up to a full chunk by repeating the last input value, so the padding
// continues the final run rather than adding a new value. `len >= 1` here because the
// remainder is non-empty.
let mut padded_chunk = [values[len - 1]; FL_CHUNK_SIZE];
padded_chunk[..remainder.len()].copy_from_slice(remainder);
// The tail chunk starts right after the last full chunk.
process_chunk((len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE, &padded_chunk);
}
// SAFETY: assumes `encode` wrote `value_count` values and all `FL_CHUNK_SIZE` indices for
// every chunk processed, so the first `value_count_acc` values and the first `padded_len`
// indices are initialized — TODO confirm against the `fastlanes::RLE` contract. For empty
// input no chunk is processed and both lengths are 0.
unsafe {
values_buf.set_len(value_count_acc);
indices_buf.set_len(padded_len);
}
// SAFETY: same `NativeValue<T>` / `T` layout assumption as for the input transmute above.
let values_buf = unsafe { values_buf.transmute::<T>().freeze() };
// The indices carry the input validity, padded to `padded_len`.
// NOTE(review): the trailing `0` and `array.len()` are presumably the offset into the
// encoded data and the logical (unpadded) length — confirm against `RLEArray::try_new`.
RLEArray::try_new(
values_buf.into_array(),
PrimitiveArray::new(indices_buf.freeze(), padded_validity(array)).into_array(),
values_idx_offsets.into_array(),
0,
array.len(),
)
}
/// Returns the validity of `array`, extended to the next multiple of `FL_CHUNK_SIZE`.
///
/// The three array-free variants are returned unchanged. An explicit validity array is
/// returned as-is when the length is already chunk-aligned; otherwise its bits are copied
/// and `false` bits are appended for the padding positions.
fn padded_validity(array: &PrimitiveArray) -> Validity {
    // Variants without a backing array carry no length, so they need no padding.
    let validity_array = match array.validity() {
        Validity::NonNullable => return Validity::NonNullable,
        Validity::AllValid => return Validity::AllValid,
        Validity::AllInvalid => return Validity::AllInvalid,
        Validity::Array(validity_array) => validity_array,
    };

    let unpadded_len = array.len();
    let target_len = unpadded_len.next_multiple_of(FL_CHUNK_SIZE);
    if unpadded_len == target_len {
        // Already chunk-aligned: reuse the existing validity array.
        return Validity::Array(validity_array.clone());
    }

    // Copy the existing bits, then mark every padding slot as invalid.
    let mut bits = BitBufferMut::with_capacity(target_len);
    let existing = validity_array.to_bool();
    bits.append_buffer(&existing.to_bit_buffer());
    bits.append_n(false, target_len - unpadded_len);
    Validity::from(bits.freeze())
}
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::half::f16;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer;

    use super::*;

    /// Short inputs of several unsigned widths survive an encode/decode round trip.
    #[test]
    fn test_encode_decode() {
        // u8
        let input_u8: Buffer<u8> = buffer![1, 1, 2, 2, 3, 3];
        let primitive_u8 = PrimitiveArray::new(input_u8, Validity::NonNullable);
        let roundtrip_u8 = RLEArray::encode(&primitive_u8).unwrap().to_primitive();
        let want_u8 = PrimitiveArray::from_iter(vec![1u8, 1, 2, 2, 3, 3]);
        assert_arrays_eq!(roundtrip_u8, want_u8);

        // u16
        let input_u16: Buffer<u16> = buffer![100, 100, 200, 200];
        let primitive_u16 = PrimitiveArray::new(input_u16, Validity::NonNullable);
        let roundtrip_u16 = RLEArray::encode(&primitive_u16).unwrap().to_primitive();
        let want_u16 = PrimitiveArray::from_iter(vec![100u16, 100, 200, 200]);
        assert_arrays_eq!(roundtrip_u16, want_u16);

        // u64
        let input_u64: Buffer<u64> = buffer![1000, 1000, 2000];
        let primitive_u64 = PrimitiveArray::new(input_u64, Validity::NonNullable);
        let roundtrip_u64 = RLEArray::encode(&primitive_u64).unwrap().to_primitive();
        let want_u64 = PrimitiveArray::from_iter(vec![1000u64, 1000, 2000]);
        assert_arrays_eq!(roundtrip_u64, want_u64);
    }

    /// The encoded array reports the logical (unpadded) length.
    #[test]
    fn test_length() {
        let input: Buffer<u32> = buffer![1, 1, 2, 2, 2, 3];
        let primitive = PrimitiveArray::new(input, Validity::NonNullable);
        let rle = RLEArray::encode(&primitive).unwrap();
        assert_eq!(rle.len(), 6);
    }

    /// Empty input yields an empty encoding with no stored values.
    #[test]
    fn test_empty_length() {
        let input: Buffer<u32> = Buffer::empty();
        let primitive = PrimitiveArray::new(input, Validity::NonNullable);
        let rle = RLEArray::encode(&primitive).unwrap();
        assert_eq!(rle.len(), 0);
        assert_eq!(rle.values().len(), 0);
    }

    /// A constant input of 2000 elements stores two values in total and decodes unchanged.
    #[test]
    fn test_single_value() {
        let constant: Buffer<u16> = vec![42; 2000].into_iter().collect();
        let primitive = PrimitiveArray::new(constant, Validity::NonNullable);
        let rle = RLEArray::encode(&primitive).unwrap();
        assert_eq!(rle.values().len(), 2);

        let decoded = rle.to_primitive();
        let expected = PrimitiveArray::from_iter(vec![42u16; 2000]);
        assert_arrays_eq!(decoded, expected);
    }

    /// 256 pairwise distinct values produce exactly 256 stored values.
    #[test]
    fn test_all_different() {
        let distinct: Buffer<u8> = (0u8..=255).collect();
        let primitive = PrimitiveArray::new(distinct, Validity::NonNullable);
        let rle = RLEArray::encode(&primitive).unwrap();
        assert_eq!(rle.values().len(), 256);

        let decoded = rle.to_primitive();
        let expected = PrimitiveArray::from_iter((0u8..=255).collect::<Vec<_>>());
        assert_arrays_eq!(decoded, expected);
    }

    /// 1500 elements are encoded with two chunk offsets and compare equal to the input.
    #[test]
    fn test_partial_last_chunk() {
        let input: Buffer<u32> = (0..1500).map(|i| (i / 100) as u32).collect();
        let source = PrimitiveArray::new(input, Validity::NonNullable);
        let rle = RLEArray::encode(&source).unwrap();

        assert_eq!(rle.len(), 1500);
        assert_arrays_eq!(rle, source);
        assert_eq!(rle.values_idx_offsets().len(), 2);
    }

    /// 2048 elements are encoded with two chunk offsets and compare equal to the input.
    #[test]
    fn test_two_full_chunks() {
        let input: Buffer<u32> = (0..2048).map(|i| (i / 100) as u32).collect();
        let source = PrimitiveArray::new(input, Validity::NonNullable);
        let rle = RLEArray::encode(&source).unwrap();

        assert_eq!(rle.len(), 2048);
        assert_arrays_eq!(rle, source);
        assert_eq!(rle.values_idx_offsets().len(), 2);
    }

    /// Every listed integer and float element type round-trips through encode/decode.
    #[rstest]
    #[case::u8((0u8..100).collect::<Buffer<u8>>())]
    #[case::u16((0u16..2000).collect::<Buffer<u16>>())]
    #[case::u32((0u32..2000).collect::<Buffer<u32>>())]
    #[case::u64((0u64..2000).collect::<Buffer<u64>>())]
    #[case::i8((-100i8..100).collect::<Buffer<i8>>())]
    #[case::i16((-2000i16..2000).collect::<Buffer<i16>>())]
    #[case::i32((-2000i32..2000).collect::<Buffer<i32>>())]
    #[case::i64((-2000i64..2000).collect::<Buffer<i64>>())]
    #[case::f16((-2000..2000).map(|i| f16::from_f32(i as f32)).collect::<Buffer<f16>>())]
    #[case::f32((-2000..2000).map(|i| i as f32).collect::<Buffer<f32>>())]
    #[case::f64((-2000..2000).map(|i| i as f64).collect::<Buffer<f64>>())]
    fn test_roundtrip_primitive_types<T: NativePType>(#[case] values: Buffer<T>) {
        let source = values.clone().into_array().to_primitive();
        let rle = RLEArray::encode(&source).unwrap();
        let decoded = rle.to_primitive();

        let expected = PrimitiveArray::new(values, source.validity().clone());
        assert_arrays_eq!(decoded, expected);
    }

    /// Positive and negative zero round-trip for each float width.
    #[rstest]
    #[case(vec![f16::ZERO, f16::NEG_ZERO])]
    #[case(vec![0f32, -0f32])]
    #[case(vec![0f64, -0f64])]
    fn test_float_zeros<T: NativePType + RLE>(#[case] values: Vec<T>) {
        let primitive = PrimitiveArray::from_iter(values);
        let encoded = RLEArray::encode(&primitive).unwrap();
        let decoded = encoded.to_primitive();
        assert_arrays_eq!(primitive, decoded);
    }
}