use std::mem;
use fastlanes::RLE as FastLanesRLE;
use vortex_array::ArrayView;
use vortex_array::IntoArray;
use vortex_array::ToCanonical;
use vortex_array::arrays::Primitive;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::bool::BoolArrayExt;
use vortex_array::arrays::primitive::NativeValue;
use vortex_array::dtype::NativePType;
use vortex_array::match_each_native_ptype;
use vortex_array::validity::Validity;
use vortex_buffer::BitBufferMut;
use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use crate::FL_CHUNK_SIZE;
use crate::RLE;
use crate::RLEArray;
use crate::RLEData;
use crate::fill_forward_nulls;
impl RLEData {
    /// Run-length encodes a primitive array into an [`RLEArray`].
    ///
    /// The view is first materialized as an owned [`PrimitiveArray`], then the encoding is
    /// dispatched on its physical type to a monomorphized FastLanes RLE kernel.
    ///
    /// # Errors
    ///
    /// Propagates any error from computing the input's validity or from assembling the
    /// resulting [`RLEArray`].
    pub fn encode(array: ArrayView<'_, Primitive>) -> VortexResult<RLEArray> {
        let owned = array.into_owned();
        let ptype = owned.ptype();
        match_each_native_ptype!(ptype, |T| { rle_encode_typed::<T>(&owned) })
    }
}
/// Run-length encodes `array` chunk-by-chunk with the FastLanes RLE kernel for `T`.
///
/// Null slots are first rewritten by `fill_forward_nulls`. Presumably each null takes the
/// preceding valid value so that nulls extend existing runs; confirm against that helper.
/// The input is then split into chunks of [`FL_CHUNK_SIZE`] elements. A trailing partial chunk
/// is padded by repeating the last input value. For each chunk the kernel produces:
///
/// * the chunk's run values, appended to one shared values buffer;
/// * one `u16` index per element, so the indices buffer has the padded length;
/// * the chunk's starting offset into the shared values buffer, pushed to `values_idx_offsets`.
///
/// The indices array carries the input's validity, extended to the padded length by
/// `padded_validity`. The returned array has offset `0` and the unpadded input length.
///
/// # Errors
///
/// Returns an error if the input's validity cannot be computed or if `RLE::try_new` rejects
/// the assembled components.
fn rle_encode_typed<T>(array: &PrimitiveArray) -> VortexResult<RLEArray>
where
    T: NativePType + FastLanesRLE,
    NativeValue<T>: FastLanesRLE,
{
    let values = fill_forward_nulls(array.to_buffer::<T>(), &array.validity()?);
    let len = values.len();
    // Indices are emitted for whole FastLanes chunks, so buffers are sized for the padded length.
    let padded_len = len.next_multiple_of(FL_CHUNK_SIZE);
    // Each chunk writes into an FL_CHUNK_SIZE-wide window of this buffer. Presumably it emits at
    // most FL_CHUNK_SIZE values, which keeps every window within `padded_len`.
    let mut values_buf = BufferMut::<NativeValue<T>>::with_capacity(padded_len);
    let mut indices_buf = BufferMut::<u16>::with_capacity(padded_len);
    // One offset per chunk, including a trailing partial chunk.
    let mut values_idx_offsets = BufferMut::<u64>::with_capacity(len.div_ceil(FL_CHUNK_SIZE));
    // Both buffers are filled through their uninitialized spare capacity. Their lengths are only
    // set once every chunk has been encoded (see the `set_len` block below).
    let values_uninit = values_buf.spare_capacity_mut();
    let (indices_uninit, _) = indices_buf
        .spare_capacity_mut()
        .as_chunks_mut::<FL_CHUNK_SIZE>();
    // Running count of values emitted so far. It is also the write offset for the next chunk.
    let mut value_count_acc = 0;
    let (chunks, remainder) = values.as_chunks::<FL_CHUNK_SIZE>();
    let mut process_chunk =
        |input: &[T; FL_CHUNK_SIZE], rle_idxs: &mut [mem::MaybeUninit<u16>; FL_CHUNK_SIZE]| {
            // SAFETY: reinterprets `[T; N]` as `[NativeValue<T>; N]`.
            // NOTE(review): this assumes `NativeValue<T>` is layout-compatible with `T` (e.g.
            // `#[repr(transparent)]`); confirm against its definition.
            let input: &[NativeValue<T>; FL_CHUNK_SIZE] = unsafe { mem::transmute(input) };
            // SAFETY: `MaybeUninit<u16>` has the same layout as `u16`. The slots are only written
            // by the kernel call below. The final `set_len(padded_len)` relies on the kernel
            // filling all FL_CHUNK_SIZE indices; TODO confirm for the fastlanes `RLE::encode`.
            let rle_idxs: &mut [u16; FL_CHUNK_SIZE] = unsafe { mem::transmute(rle_idxs) };
            // SAFETY: `MaybeUninit<NativeValue<T>>` has the same layout as `NativeValue<T>`. The
            // window starts at the first not-yet-written value slot. The slicing is
            // bounds-checked, so an out-of-range window panics rather than causing UB.
            let rle_vals: &mut [NativeValue<T>] =
                unsafe { mem::transmute(&mut values_uninit[value_count_acc..][..FL_CHUNK_SIZE]) };
            // Record where this chunk's values begin in the shared values buffer.
            values_idx_offsets.push(value_count_acc as u64);
            let value_count = NativeValue::<T>::encode(
                input,
                // SAFETY: `rle_vals` was sliced to exactly FL_CHUNK_SIZE elements above, so it
                // can be viewed as a fixed-size array of that length.
                unsafe { &mut *(rle_vals.as_mut_ptr() as *mut [_; FL_CHUNK_SIZE]) },
                rle_idxs,
            );
            value_count_acc += value_count;
        };
    for (chunk_slice, rle_idxs) in chunks.iter().zip(indices_uninit.iter_mut()) {
        process_chunk(chunk_slice, rle_idxs);
    }
    if !remainder.is_empty() {
        // Pad the trailing partial chunk by repeating the last value. `len >= 1` holds here
        // because the remainder is non-empty.
        let mut padded_chunk = [values[len - 1]; FL_CHUNK_SIZE];
        padded_chunk[..remainder.len()].copy_from_slice(remainder);
        let last_idx_chunk = &mut indices_uninit[chunks.len()];
        process_chunk(&padded_chunk, last_idx_chunk);
    }
    // SAFETY (values): assumes each kernel call wrote exactly `value_count` values to the front
    // of its window, which makes the first `value_count_acc` value slots contiguous and
    // initialized.
    // SAFETY (indices): every one of the `padded_len / FL_CHUNK_SIZE` index chunks was handed to
    // the kernel: the full chunks in the loop, plus the padded remainder chunk if there is one.
    unsafe {
        values_buf.set_len(value_count_acc);
        indices_buf.set_len(padded_len);
    }
    // SAFETY: relies on the same `NativeValue<T>` / `T` layout compatibility noted above.
    let values_buf = unsafe { values_buf.transmute::<T>().freeze() };
    RLE::try_new(
        values_buf.into_array(),
        PrimitiveArray::new(indices_buf.freeze(), padded_validity(array)).into_array(),
        values_idx_offsets.into_array(),
        0,
        array.len(),
    )
}
/// Extends `array`'s validity to cover the FastLanes-padded index buffer.
///
/// The RLE indices are padded up to a multiple of [`FL_CHUNK_SIZE`]. When validity is backed by
/// an explicit mask, the extra padding positions are appended as invalid (`false`). The
/// length-independent validities (`NonNullable`, `AllValid`, `AllInvalid`) are returned as-is.
/// A mask that is already chunk-aligned is also returned unchanged.
///
/// # Panics
///
/// Panics if the array's validity cannot be computed.
fn padded_validity(array: &PrimitiveArray) -> Validity {
    let validity = array
        .validity()
        .vortex_expect("RLE validity should be derivable");

    let mask = match validity {
        Validity::Array(mask) => mask,
        constant @ (Validity::NonNullable | Validity::AllValid | Validity::AllInvalid) => {
            return constant;
        }
    };

    let original_len = array.len();
    let target_len = original_len.next_multiple_of(FL_CHUNK_SIZE);
    let padding = target_len - original_len;
    if padding == 0 {
        return Validity::Array(mask);
    }

    let mut bits = BitBufferMut::with_capacity(target_len);
    bits.append_buffer(&mask.to_bool().to_bit_buffer());
    bits.append_n(false, padding);
    Validity::from(bits.freeze())
}
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::ConstantArray;
    use vortex_array::arrays::MaskedArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::half::f16;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use super::*;
    use crate::rle::array::RLEArrayExt;

    #[test]
    fn test_encode_decode() {
        let array_u8: Buffer<u8> = buffer![1, 1, 2, 2, 3, 3];
        let encoded_u8 =
            RLEData::encode(PrimitiveArray::new(array_u8, Validity::NonNullable).as_view())
                .unwrap();
        let decoded_u8 = encoded_u8.as_array().to_primitive();
        let expected_u8 = PrimitiveArray::from_iter(vec![1u8, 1, 2, 2, 3, 3]);
        assert_arrays_eq!(decoded_u8, expected_u8);

        let array_u16: Buffer<u16> = buffer![100, 100, 200, 200];
        let encoded_u16 =
            RLEData::encode(PrimitiveArray::new(array_u16, Validity::NonNullable).as_view())
                .unwrap();
        let decoded_u16 = encoded_u16.as_array().to_primitive();
        let expected_u16 = PrimitiveArray::from_iter(vec![100u16, 100, 200, 200]);
        assert_arrays_eq!(decoded_u16, expected_u16);

        let array_u64: Buffer<u64> = buffer![1000, 1000, 2000];
        let encoded_u64 =
            RLEData::encode(PrimitiveArray::new(array_u64, Validity::NonNullable).as_view())
                .unwrap();
        let decoded_u64 = encoded_u64.as_array().to_primitive();
        let expected_u64 = PrimitiveArray::from_iter(vec![1000u64, 1000, 2000]);
        assert_arrays_eq!(decoded_u64, expected_u64);
    }

    #[test]
    fn test_length() {
        let values: Buffer<u32> = buffer![1, 1, 2, 2, 2, 3];
        let encoded =
            RLEData::encode(PrimitiveArray::new(values, Validity::NonNullable).as_view()).unwrap();
        assert_eq!(encoded.len(), 6);
    }

    #[test]
    fn test_empty_length() {
        let values: Buffer<u32> = Buffer::empty();
        let encoded =
            RLEData::encode(PrimitiveArray::new(values, Validity::NonNullable).as_view()).unwrap();
        assert_eq!(encoded.len(), 0);
        assert_eq!(encoded.values().len(), 0);
    }

    #[test]
    fn test_single_value() {
        let values: Buffer<u16> = vec![42; 2000].into_iter().collect();
        let encoded =
            RLEData::encode(PrimitiveArray::new(values, Validity::NonNullable).as_view()).unwrap();
        // The input spans two chunks, and each chunk collapses to a single run value.
        assert_eq!(encoded.values().len(), 2);

        let decoded = encoded.as_array().to_primitive();
        let expected = PrimitiveArray::from_iter(vec![42u16; 2000]);
        assert_arrays_eq!(decoded, expected);
    }

    #[test]
    fn test_all_different() {
        let values: Buffer<u8> = (0u8..=255).collect();
        let encoded =
            RLEData::encode(PrimitiveArray::new(values, Validity::NonNullable).as_view()).unwrap();
        // Every element starts a new run, so every value is emitted.
        assert_eq!(encoded.values().len(), 256);

        let decoded = encoded.as_array().to_primitive();
        let expected = PrimitiveArray::from_iter((0u8..=255).collect::<Vec<_>>());
        assert_arrays_eq!(decoded, expected);
    }

    #[test]
    fn test_partial_last_chunk() {
        let values: Buffer<u32> = (0..1500).map(|i| (i / 100) as u32).collect();
        let array = PrimitiveArray::new(values, Validity::NonNullable);
        let encoded = RLEData::encode(array.as_view()).unwrap();
        assert_eq!(encoded.len(), 1500);
        assert_arrays_eq!(encoded, array);
        // One full chunk plus one padded partial chunk.
        assert_eq!(encoded.values_idx_offsets().len(), 2);
    }

    #[test]
    fn test_two_full_chunks() {
        let values: Buffer<u32> = (0..2048).map(|i| (i / 100) as u32).collect();
        let array = PrimitiveArray::new(values, Validity::NonNullable);
        let encoded = RLEData::encode(array.as_view()).unwrap();
        assert_eq!(encoded.len(), 2048);
        assert_arrays_eq!(encoded, array);
        assert_eq!(encoded.values_idx_offsets().len(), 2);
    }

    #[rstest]
    #[case::u8((0u8..100).collect::<Buffer<u8>>())]
    #[case::u16((0u16..2000).collect::<Buffer<u16>>())]
    #[case::u32((0u32..2000).collect::<Buffer<u32>>())]
    #[case::u64((0u64..2000).collect::<Buffer<u64>>())]
    #[case::i8((-100i8..100).collect::<Buffer<i8>>())]
    #[case::i16((-2000i16..2000).collect::<Buffer<i16>>())]
    #[case::i32((-2000i32..2000).collect::<Buffer<i32>>())]
    #[case::i64((-2000i64..2000).collect::<Buffer<i64>>())]
    #[case::f16((-2000..2000).map(|i| f16::from_f32(i as f32)).collect::<Buffer<f16>>())]
    #[case::f32((-2000..2000).map(|i| i as f32).collect::<Buffer<f32>>())]
    #[case::f64((-2000..2000).map(|i| i as f64).collect::<Buffer<f64>>())]
    fn test_roundtrip_primitive_types<T: NativePType>(#[case] values: Buffer<T>) {
        let primitive = values.clone().into_array().to_primitive();
        let result = RLEData::encode(primitive.as_view()).unwrap();
        let decoded = result.as_array().to_primitive();
        let expected = PrimitiveArray::new(
            values,
            primitive
                .validity()
                .vortex_expect("primitive validity should be derivable"),
        );
        assert_arrays_eq!(decoded, expected);
    }

    /// Rebuilds `rle` with its indices replaced by a constant `1u16`, masked by the original
    /// index validity.
    ///
    /// Valid positions are rewritten to index `1` as well. The inputs used with this helper are
    /// therefore chosen so that each valid element is (presumably) the second value emitted for
    /// its chunk.
    fn with_masked_constant_indices(rle: &RLEArray) -> VortexResult<RLEArray> {
        let indices_prim = rle.indices().to_primitive();
        let masked_indices = MaskedArray::try_new(
            ConstantArray::new(1u16, indices_prim.len()).into_array(),
            indices_prim.validity()?,
        )?
        .into_array();
        RLE::try_new(
            rle.values().clone(),
            masked_indices,
            rle.values_idx_offsets().clone(),
            rle.offset(),
            rle.len(),
        )
    }

    #[test]
    fn test_encode_all_null_chunk() -> VortexResult<()> {
        let values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
        let original = PrimitiveArray::from_option_iter(values);
        let rle = RLEData::encode(original.as_view())?;
        let decoded = with_masked_constant_indices(&rle)?;
        assert_arrays_eq!(decoded, original);
        Ok(())
    }

    #[test]
    fn test_encode_all_null_chunk_then_value_chunk() -> VortexResult<()> {
        let mut values: Vec<Option<u32>> = vec![None; 2 * FL_CHUNK_SIZE];
        values[FL_CHUNK_SIZE + 100] = Some(42);
        let original = PrimitiveArray::from_option_iter(values);
        let rle = RLEData::encode(original.as_view())?;
        let decoded = with_masked_constant_indices(&rle)?;
        assert_arrays_eq!(decoded, original);
        Ok(())
    }

    #[test]
    fn test_encode_one_value_near_end() -> VortexResult<()> {
        let mut values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
        values[1000] = Some(42);
        let original = PrimitiveArray::from_option_iter(values);
        let rle = RLEData::encode(original.as_view())?;
        let decoded = with_masked_constant_indices(&rle)?;
        assert_arrays_eq!(decoded, original);
        Ok(())
    }

    #[test]
    fn test_encode_value_chunk_then_all_null_remainder() -> VortexResult<()> {
        const NEG1_POSITIONS: &[usize] = &[
            273, 276, 277, 278, 279, 281, 282, 284, 285, 286, 287, 288, 289, 291, 292, 293, 296,
            298, 299, 302, 304, 308, 310, 311, 313, 314, 315, 317, 318, 322, 324, 325, 334, 335,
            336, 337, 338, 339, 340, 341, 342, 343, 344, 346, 347, 348, 350, 352, 353, 355, 358,
            359, 362, 363, 364, 366,
        ];
        let mut values: Vec<Option<i16>> = vec![None; 1085];
        for &pos in NEG1_POSITIONS {
            values[pos] = Some(-1);
        }
        let original = PrimitiveArray::from_option_iter(values);
        let rle = RLEData::encode(original.as_view())?;
        let decoded = with_masked_constant_indices(&rle)?;
        assert_arrays_eq!(decoded, original);
        Ok(())
    }

    /// Rebuilds `rle` with every index at an invalid position overwritten by a pseudo-random
    /// `u16` from a fixed-seed xorshift32 generator. Valid indices are left untouched, and
    /// positions whose validity lookup errors are treated as valid.
    fn with_random_invalid_indices(rle: &RLEArray) -> VortexResult<RLEArray> {
        let indices_prim = rle.indices().to_primitive();
        let mut indices_data: Vec<u16> = indices_prim.as_slice::<u16>().to_vec();
        let mut rng_state: u32 = 0xDEAD_BEEF;
        let validity = indices_prim.validity()?;
        for (i, idx) in indices_data.iter_mut().enumerate() {
            if !validity.is_valid(i).unwrap_or(true) {
                rng_state ^= rng_state << 13;
                rng_state ^= rng_state >> 17;
                rng_state ^= rng_state << 5;
                *idx = rng_state as u16;
            }
        }
        // Reuse the validity computed above instead of deriving it a second time.
        let clobbered_indices =
            PrimitiveArray::new(Buffer::from(indices_data), validity).into_array();
        RLE::try_new(
            rle.values().clone(),
            clobbered_indices,
            rle.values_idx_offsets().clone(),
            rle.offset(),
            rle.len(),
        )
    }

    #[test]
    fn test_random_invalid_indices_all_null_chunk() -> VortexResult<()> {
        let values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
        let original = PrimitiveArray::from_option_iter(values);
        let rle = RLEData::encode(original.as_view())?;
        let clobbered = with_random_invalid_indices(&rle)?;
        assert_arrays_eq!(clobbered, original);
        Ok(())
    }

    #[test]
    fn test_random_invalid_indices_sparse_values() -> VortexResult<()> {
        let mut values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
        values[0] = Some(10);
        values[500] = Some(20);
        values[1000] = Some(30);
        let original = PrimitiveArray::from_option_iter(values);
        let rle = RLEData::encode(original.as_view())?;
        let clobbered = with_random_invalid_indices(&rle)?;
        assert_arrays_eq!(clobbered, original);
        Ok(())
    }

    #[test]
    fn test_random_invalid_indices_multi_chunk() -> VortexResult<()> {
        let mut values: Vec<Option<i16>> = vec![None; 2 * FL_CHUNK_SIZE];
        values[0] = Some(10);
        values[500] = Some(20);
        values[FL_CHUNK_SIZE + 100] = Some(42);
        let original = PrimitiveArray::from_option_iter(values);
        let rle = RLEData::encode(original.as_view())?;
        let clobbered = with_random_invalid_indices(&rle)?;
        assert_arrays_eq!(clobbered, original);
        Ok(())
    }

    #[test]
    fn test_random_invalid_indices_partial_last_chunk() -> VortexResult<()> {
        let mut values: Vec<Option<u32>> = vec![None; 1085];
        for i in (100..200).step_by(7) {
            values[i] = Some(i as u32);
        }
        let original = PrimitiveArray::from_option_iter(values);
        let rle = RLEData::encode(original.as_view())?;
        let clobbered = with_random_invalid_indices(&rle)?;
        assert_arrays_eq!(clobbered, original);
        Ok(())
    }

    #[test]
    fn test_random_invalid_indices_mostly_valid() -> VortexResult<()> {
        let mut values: Vec<Option<u64>> =
            (0..FL_CHUNK_SIZE).map(|i| Some((i / 100) as u64)).collect();
        for i in (0..FL_CHUNK_SIZE).step_by(37) {
            values[i] = None;
        }
        let original = PrimitiveArray::from_option_iter(values);
        let rle = RLEData::encode(original.as_view())?;
        let clobbered = with_random_invalid_indices(&rle)?;
        assert_arrays_eq!(clobbered, original);
        Ok(())
    }

    #[rstest]
    #[case(vec![f16::ZERO, f16::NEG_ZERO])]
    #[case(vec![0f32, -0f32])]
    #[case(vec![0f64, -0f64])]
    fn test_float_zeros<T: NativePType + fastlanes::RLE>(#[case] values: Vec<T>) {
        let primitive = PrimitiveArray::from_iter(values);
        let rle = RLEData::encode(primitive.as_view()).unwrap();
        let decoded = rle.as_array().to_primitive();
        assert_arrays_eq!(primitive, decoded);
    }
}