vortex-fastlanes 0.74.0

Vortex fastlanes arrays
Documentation
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use fastlanes::RLE;
use num_traits::AsPrimitive;
use num_traits::NumCast;
use vortex_array::ExecutionCtx;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::dtype::NativePType;
use vortex_array::match_each_native_ptype;
use vortex_array::match_each_unsigned_integer_ptype;
use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_panic;

use crate::FL_CHUNK_SIZE;
use crate::RLEArray;
use crate::rle::RLEArrayExt;

/// Decompresses an RLE array back into a primitive array.
pub fn rle_decompress(array: &RLEArray, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
    // The per-chunk value-index offsets are tiny (one entry per 1024-element chunk), so cast them
    // to `u64` once here instead of monomorphizing the whole decode loop over the offset width.
    let values_idx_offsets = array
        .values_idx_offsets()
        .clone()
        .execute::<PrimitiveArray>(ctx)?;
    let values_idx_offsets: Vec<u64> =
        match_each_unsigned_integer_ptype!(values_idx_offsets.ptype(), |O| {
            values_idx_offsets
                .as_slice::<O>()
                .iter()
                .map(|&o| o.as_())
                .collect()
        });

    match_each_native_ptype!(array.values().dtype().as_ptype(), |V| {
        // RLE indices are always u16 (or u8 if downcasted).
        match array.indices().dtype().as_ptype() {
            PType::U8 => rle_decode_typed::<V, u8>(array, &values_idx_offsets, ctx),
            PType::U16 => rle_decode_typed::<V, u16>(array, &values_idx_offsets, ctx),
            _ => vortex_panic!(
                "Unsupported index type for RLE decoding: {}",
                array.indices().dtype().as_ptype()
            ),
        }
    })
}

/// Decompresses an `RLEArray` into to a primitive array of unsigned integers.
fn rle_decode_typed<V, I>(
    array: &RLEArray,
    values_idx_offsets: &[u64],
    ctx: &mut ExecutionCtx,
) -> VortexResult<PrimitiveArray>
where
    V: NativePType + RLE + Clone + Copy,
    I: NativePType + Into<usize>,
{
    let values = array.values().clone().execute::<PrimitiveArray>(ctx)?;
    let values = values.as_slice::<V>();

    let indices = array.indices().clone().execute::<PrimitiveArray>(ctx)?;
    assert!(indices.len().is_multiple_of(FL_CHUNK_SIZE));
    let has_invalid = !indices.all_valid(ctx)?;
    let (indices_sl, _) = indices.as_slice::<I>().as_chunks::<FL_CHUNK_SIZE>();

    let chunk_start_idx = array.offset() / FL_CHUNK_SIZE;
    let chunk_end_idx = (array.offset() + array.len()).div_ceil(FL_CHUNK_SIZE);
    let num_chunks = chunk_end_idx - chunk_start_idx;

    let mut buffer = BufferMut::<V>::with_capacity(num_chunks * FL_CHUNK_SIZE);
    let (out_buf, _) = buffer.spare_capacity_mut().as_chunks_mut::<FL_CHUNK_SIZE>();

    for (chunk_idx, (chunk_indices, chunk_out)) in
        indices_sl.iter().zip(out_buf.iter_mut()).enumerate()
    {
        // Offsets in `values_idx_offsets` are absolute and need to be shifted
        // by the offset of the first chunk, respective of the current slice,
        // to make them relative.
        let value_idx_offset = (values_idx_offsets[chunk_idx] - values_idx_offsets[0]) as usize;

        let next_value_idx_offset = if chunk_idx + 1 < num_chunks {
            (values_idx_offsets[chunk_idx + 1] - values_idx_offsets[0]) as usize
        } else {
            values.len()
        };
        let num_chunk_values = u16::try_from(next_value_idx_offset - value_idx_offset)
            .vortex_expect("There can be at most 1024 values in RLE chunk");

        // SAFETY: `MaybeUninit<T>` and `T` have the same layout.
        let buffer_values: &mut [V; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(chunk_out) };
        let chunk_values = &values[value_idx_offset..];
        if num_chunk_values == 1 {
            // Single-value chunk: fill directly to avoid out-of-bounds index
            // access. The indices may contain values other than 0 when they
            // have been further compressed (e.g., as a masked constant).
            buffer_values.fill(chunk_values[0]);
        } else if has_invalid {
            // When the indices array has invalid (null) positions, those
            // positions may contain arbitrary garbage values after further
            // compression. Clamp all indices into [0, num_chunk_values) to
            // prevent out-of-bounds access in the fastlanes decoder.
            let mut sanitized: [u16; FL_CHUNK_SIZE] = [0; FL_CHUNK_SIZE];
            for (idx_out, idx) in sanitized.iter_mut().zip(chunk_indices) {
                let idx: u16 =
                    NumCast::from(*idx).vortex_expect("RLE indices are always less than u16");
                *idx_out = idx.min(num_chunk_values - 1);
            }
            V::decode(chunk_values, &sanitized, buffer_values);
        } else {
            V::decode(chunk_values, chunk_indices, buffer_values);
        }
    }

    unsafe {
        buffer.set_len(num_chunks * FL_CHUNK_SIZE);
    }

    let offset_within_chunk = array.offset();

    Ok(PrimitiveArray::new(
        buffer
            .freeze()
            .slice(offset_within_chunk..(offset_within_chunk + array.len())),
        array.validity()?,
    ))
}