use std::fmt::Debug;
use std::hash::Hash;
use vortex_array::ArrayEq;
use vortex_array::ArrayHash;
use vortex_array::ArrayRef;
use vortex_array::DeserializeMetadata;
use vortex_array::DynArray;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionStep;
use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::SerializeMetadata;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::PType;
use vortex_array::patches::Patches;
use vortex_array::patches::PatchesMetadata;
use vortex_array::serde::ArrayChildren;
use vortex_array::stats::ArrayStats;
use vortex_array::stats::StatsSetRef;
use vortex_array::vtable;
use vortex_array::vtable::ArrayId;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityChild;
use vortex_array::vtable::ValidityVTableFromChild;
use vortex_array::vtable::patches_child;
use vortex_array::vtable::patches_child_name;
use vortex_array::vtable::patches_nchildren;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;
use crate::ALPFloat;
use crate::alp::Exponents;
use crate::alp::decompress::execute_decompress;
use crate::alp::rules::PARENT_KERNELS;
use crate::alp::rules::RULES;
// Generates the boilerplate that registers the `ALP` encoding type with the
// vtable machinery (ties `ALP` to the `VTable` impl below).
vtable!(ALP);

impl VTable for ALP {
    type Array = ALPArray;
    // Metadata is protobuf-encoded via prost (see `ALPMetadata` below).
    type Metadata = ProstMetadata<ALPMetadata>;
    type OperationsVTable = Self;
    // Validity is read from a designated child array; the `ValidityChild`
    // impl in this file selects the `encoded` child.
    type ValidityVTable = ValidityVTableFromChild;

    /// Stable identifier for this encoding.
    fn id(_array: &Self::Array) -> ArrayId {
        Self::ID
    }

    /// The logical length equals the encoded integer child's length.
    fn len(array: &ALPArray) -> usize {
        array.encoded.len()
    }

    /// The logical (float) dtype, derived from the encoded child's integer
    /// dtype at construction time.
    fn dtype(array: &ALPArray) -> &DType {
        &array.dtype
    }

    fn stats(array: &ALPArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }

    /// Hashes dtype, encoded child, exponents, and patches. Must stay
    /// consistent with `array_eq` below (same fields).
    fn array_hash<H: std::hash::Hasher>(array: &ALPArray, state: &mut H, precision: Precision) {
        array.dtype.hash(state);
        array.encoded.array_hash(state, precision);
        array.exponents.hash(state);
        array.patches.array_hash(state, precision);
    }

    /// Structural equality over the same fields used by `array_hash`.
    fn array_eq(array: &ALPArray, other: &ALPArray, precision: Precision) -> bool {
        array.dtype == other.dtype
            && array.encoded.array_eq(&other.encoded, precision)
            && array.exponents == other.exponents
            && array.patches.array_eq(&other.patches, precision)
    }

    /// ALP owns no buffers of its own; all data lives in child arrays.
    fn nbuffers(_array: &ALPArray) -> usize {
        0
    }

    /// Always panics: `nbuffers` is 0, so every index is out of bounds.
    fn buffer(_array: &ALPArray, idx: usize) -> BufferHandle {
        vortex_panic!("ALPArray buffer index {idx} out of bounds")
    }

    fn buffer_name(_array: &ALPArray, _idx: usize) -> Option<String> {
        None
    }

    /// One child for the encoded ints, plus however many children the
    /// optional patches contribute.
    fn nchildren(array: &ALPArray) -> usize {
        1 + array.patches().map_or(0, patches_nchildren)
    }

    /// Child layout: index 0 is the encoded child; indices >= 1 address the
    /// patches' children (shifted down by one).
    fn child(array: &ALPArray, idx: usize) -> ArrayRef {
        match idx {
            0 => array.encoded().clone(),
            _ => {
                let patches = array
                    .patches()
                    .unwrap_or_else(|| vortex_panic!("ALPArray child index {idx} out of bounds"));
                patches_child(patches, idx - 1)
            }
        }
    }

    /// Names mirror the layout used by `child`.
    fn child_name(array: &ALPArray, idx: usize) -> String {
        match idx {
            0 => "encoded".to_string(),
            _ => {
                if array.patches().is_none() {
                    vortex_panic!("ALPArray child_name index {idx} out of bounds");
                }
                patches_child_name(idx - 1).to_string()
            }
        }
    }

    /// Captures the exponents (widened to u32 for protobuf) and, when
    /// present, the patches' metadata.
    fn metadata(array: &ALPArray) -> VortexResult<Self::Metadata> {
        let exponents = array.exponents();
        Ok(ProstMetadata(ALPMetadata {
            exp_e: exponents.e as u32,
            exp_f: exponents.f as u32,
            patches: array
                .patches()
                .map(|p| p.to_metadata(array.len(), array.dtype()))
                .transpose()?,
        }))
    }

    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(metadata.serialize()))
    }

    fn deserialize(
        bytes: &[u8],
        _dtype: &DType,
        _len: usize,
        _buffers: &[BufferHandle],
        _session: &VortexSession,
    ) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(
            <ProstMetadata<ALPMetadata> as DeserializeMetadata>::deserialize(bytes)?,
        ))
    }

    /// Rebuilds an `ALPArray` from deserialized metadata and child arrays.
    ///
    /// The child indices used here (0 = encoded, 1 = patch indices,
    /// 2 = patch values, 3 = optional chunk offsets) must mirror the
    /// flattening performed by `child` above.
    fn build(
        dtype: &DType,
        len: usize,
        metadata: &Self::Metadata,
        _buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
    ) -> VortexResult<ALPArray> {
        // The encoded child holds same-width integers: f32 -> i32, f64 -> i64.
        let encoded_ptype = match &dtype {
            DType::Primitive(PType::F32, n) => DType::Primitive(PType::I32, *n),
            DType::Primitive(PType::F64, n) => DType::Primitive(PType::I64, *n),
            d => vortex_bail!(MismatchedTypes: "f32 or f64", d),
        };
        let encoded = children.get(0, &encoded_ptype, len)?;
        let patches = metadata
            .patches
            .map(|p| {
                let indices = children.get(1, &p.indices_dtype()?, p.len()?)?;
                // Patch values carry the logical (float) dtype of the array.
                let values = children.get(2, dtype, p.len()?)?;
                // Chunk offsets are optional; only fetch the child when the
                // metadata says they were written.
                let chunk_offsets = p
                    .chunk_offsets_dtype()?
                    .map(|dtype| children.get(3, &dtype, usize::try_from(p.chunk_offsets_len())?))
                    .transpose()?;
                Patches::new(len, p.offset()?, indices, values, chunk_offsets)
            })
            .transpose()?;
        ALPArray::try_new(
            encoded,
            Exponents {
                e: u8::try_from(metadata.exp_e)?,
                f: u8::try_from(metadata.exp_f)?,
            },
            patches,
        )
    }

    /// Replaces this array's children in place, preserving the existing
    /// patches' length/offset bookkeeping while swapping the child arrays.
    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
        // Snapshot the current patches' shape so we know how many incoming
        // children to expect and how to rebuild the patches afterwards.
        let patches_info = array
            .patches
            .as_ref()
            .map(|p| (p.array_len(), p.offset(), p.chunk_offsets().is_some()));
        let expected_children = match &patches_info {
            // encoded + indices + values (+ chunk offsets when present).
            Some((_, _, has_chunk_offsets)) => 1 + 2 + if *has_chunk_offsets { 1 } else { 0 },
            None => 1,
        };
        vortex_ensure!(
            children.len() == expected_children,
            "ALPArray expects {} children, got {}",
            expected_children,
            children.len()
        );
        let mut children_iter = children.into_iter();
        array.encoded = children_iter
            .next()
            .ok_or_else(|| vortex_err!("Expected encoded child"))?;
        if let Some((array_len, offset, _has_chunk_offsets)) = patches_info {
            let indices = children_iter
                .next()
                .ok_or_else(|| vortex_err!("Expected patch indices child"))?;
            let values = children_iter
                .next()
                .ok_or_else(|| vortex_err!("Expected patch values child"))?;
            // `None` when chunk offsets were absent (count already validated).
            let chunk_offsets = children_iter.next();
            array.patches = Some(Patches::new(
                array_len,
                offset,
                indices,
                values,
                chunk_offsets,
            )?);
        }
        Ok(())
    }

    /// Execution fully decompresses the array back to floats.
    fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
        Ok(ExecutionStep::Done(
            execute_decompress(array.clone(), ctx)?.into_array(),
        ))
    }

    /// Applies rewrite rules that may push a parent operation down into this
    /// array; returns `None` when no rule fires.
    fn reduce_parent(
        array: &Self::Array,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        RULES.evaluate(array, parent, child_idx)
    }

    /// Tries specialized kernels for executing a parent operation over this
    /// array; returns `None` when no kernel applies.
    fn execute_parent(
        array: &Self::Array,
        parent: &ArrayRef,
        child_idx: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<ArrayRef>> {
        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
    }
}
/// An ALP-encoded floating point array.
///
/// Floats are stored as an integer child (`encoded`); values that the ALP
/// transform cannot represent exactly are stored verbatim in `patches`.
#[derive(Clone, Debug)]
pub struct ALPArray {
    // Integer child: i32 for f32 arrays, i64 for f64 arrays.
    encoded: ArrayRef,
    // Exceptional values that did not encode exactly, if any.
    patches: Option<Patches>,
    // Logical float dtype (F32/F64), derived from `encoded` at construction.
    dtype: DType,
    // ALP exponents used for encoding/decoding.
    exponents: Exponents,
    // Lazily populated statistics.
    stats_set: ArrayStats,
}
/// Marker type for the ALP encoding; the `VTable` impl above hangs off it.
#[derive(Debug)]
pub struct ALP;

impl ALP {
    /// Unique encoding identifier.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.alp");
}
/// Protobuf-serialized metadata for ALP arrays (see `VTable::metadata` /
/// `VTable::build` in this file).
#[derive(Clone, prost::Message)]
pub struct ALPMetadata {
    // Exponent `e`, widened from u8 because protobuf has no u8 scalar.
    #[prost(uint32, tag = "1")]
    pub(crate) exp_e: u32,
    // Exponent `f`, widened from u8 for the same reason.
    #[prost(uint32, tag = "2")]
    pub(crate) exp_f: u32,
    // Present iff the array has patches.
    #[prost(message, optional, tag = "3")]
    pub(crate) patches: Option<PatchesMetadata>,
}
impl ALPArray {
    /// Validates the invariants of an ALP array:
    /// - `encoded` must be an I32 or I64 primitive array;
    /// - both exponents must be within the float type's ALP exponent range;
    /// - patches, when present, must match the encoded child in length and
    ///   carry the corresponding float dtype (checked by `validate_patches`).
    fn validate(
        encoded: &ArrayRef,
        exponents: Exponents,
        patches: Option<&Patches>,
    ) -> VortexResult<()> {
        vortex_ensure!(
            matches!(
                encoded.dtype(),
                DType::Primitive(PType::I32 | PType::I64, _)
            ),
            "ALP encoded ints have invalid DType {}",
            encoded.dtype(),
        );
        let Exponents { e, f } = exponents;
        match encoded.dtype().as_ptype() {
            PType::I32 => {
                // Use the destructured bindings consistently in both arms.
                vortex_ensure!(e <= f32::MAX_EXPONENT, "e out of bounds: {e}");
                vortex_ensure!(f <= f32::MAX_EXPONENT, "f out of bounds: {f}");
                if let Some(patches) = patches {
                    Self::validate_patches::<f32>(patches, encoded)?;
                }
            }
            PType::I64 => {
                vortex_ensure!(e <= f64::MAX_EXPONENT, "e out of bounds: {e}");
                vortex_ensure!(f <= f64::MAX_EXPONENT, "f out of bounds: {f}");
                if let Some(patches) = patches {
                    Self::validate_patches::<f64>(patches, encoded)?;
                }
            }
            // Unreachable: the ensure above restricts the dtype to I32/I64.
            _ => unreachable!(),
        }
        // NOTE: the patches length check lives in `validate_patches`, which
        // runs in every reachable arm whenever patches are present, so it is
        // not repeated here.
        Ok(())
    }

    /// Checks patch invariants against the encoded child: identical logical
    /// length, and a patch dtype of `T` (f32/f64) with the encoded child's
    /// nullability.
    fn validate_patches<T: ALPFloat>(patches: &Patches, encoded: &ArrayRef) -> VortexResult<()> {
        vortex_ensure!(
            patches.array_len() == encoded.len(),
            "patches array_len != encoded len: {} != {}",
            patches.array_len(),
            encoded.len()
        );
        let expected_type = DType::Primitive(T::PTYPE, encoded.dtype().nullability());
        vortex_ensure!(
            patches.dtype() == &expected_type,
            "Expected patches type {expected_type}, actual {}",
            patches.dtype(),
        );
        Ok(())
    }
}
impl ALPArray {
pub fn new(encoded: ArrayRef, exponents: Exponents, patches: Option<Patches>) -> Self {
Self::try_new(encoded, exponents, patches).vortex_expect("ALPArray new")
}
pub fn try_new(
encoded: ArrayRef,
exponents: Exponents,
patches: Option<Patches>,
) -> VortexResult<Self> {
Self::validate(&encoded, exponents, patches.as_ref())?;
let dtype = match encoded.dtype() {
DType::Primitive(PType::I32, nullability) => DType::Primitive(PType::F32, *nullability),
DType::Primitive(PType::I64, nullability) => DType::Primitive(PType::F64, *nullability),
_ => unreachable!(),
};
Ok(Self {
dtype,
encoded,
exponents,
patches,
stats_set: Default::default(),
})
}
pub(crate) unsafe fn new_unchecked(
encoded: ArrayRef,
exponents: Exponents,
patches: Option<Patches>,
dtype: DType,
) -> Self {
Self {
dtype,
encoded,
exponents,
patches,
stats_set: Default::default(),
}
}
pub fn ptype(&self) -> PType {
self.dtype.as_ptype()
}
pub fn encoded(&self) -> &ArrayRef {
&self.encoded
}
#[inline]
pub fn exponents(&self) -> Exponents {
self.exponents
}
pub fn patches(&self) -> Option<&Patches> {
self.patches.as_ref()
}
#[inline]
pub fn into_parts(self) -> (ArrayRef, Exponents, Option<Patches>, DType) {
(self.encoded, self.exponents, self.patches, self.dtype)
}
}
// Validity tracking is delegated to the encoded integer child (which has the
// same length as the ALP array).
impl ValidityChild<ALP> for ALP {
    fn validity_child(array: &ALPArray) -> &ArrayRef {
        array.encoded()
    }
}
#[cfg(test)]
mod tests {
    use std::f64::consts::PI;
    use std::sync::LazyLock;

    use rstest::rstest;
    use vortex_array::Canonical;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::ToCanonical;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::session::ArraySession;
    use vortex_array::vtable::ValidityHelper;
    use vortex_session::VortexSession;

    use super::*;
    use crate::alp_encode;
    use crate::decompress_into_array;

    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Runs `encoded` through the session execution path and asserts the
    /// result matches the legacy decompression path. Shared by the
    /// `test_execute_*` cases below.
    fn assert_execute_matches_legacy(encoded: ALPArray) {
        let result_canonical = {
            let mut ctx = SESSION.create_execution_ctx();
            encoded
                .clone()
                .into_array()
                .execute::<Canonical>(&mut ctx)
                .unwrap()
        };
        let expected =
            decompress_into_array(encoded, &mut LEGACY_SESSION.create_execution_ctx()).unwrap();
        assert_arrays_eq!(result_canonical.into_array(), expected);
    }

    /// Values with periodic nulls (every 5th) and patch-forcing values
    /// (every 4th), shared by the slicing tests.
    fn sliced_test_values(size: usize) -> Vec<Option<f64>> {
        (0..size)
            .map(|i| {
                if i % 5 == 0 {
                    None
                } else if i % 4 == 3 {
                    Some(PI)
                } else {
                    Some(1.0)
                }
            })
            .collect()
    }

    #[rstest]
    #[case(0)]
    #[case(1)]
    #[case(100)]
    #[case(1023)]
    #[case(1024)]
    #[case(1025)]
    #[case(2047)]
    #[case(2048)]
    #[case(2049)]
    fn test_execute_f32(#[case] size: usize) {
        let values = PrimitiveArray::from_iter((0..size).map(|i| i as f32));
        let encoded = alp_encode(&values, None).unwrap();
        assert_execute_matches_legacy(encoded);
    }

    #[rstest]
    #[case(0)]
    #[case(1)]
    #[case(100)]
    #[case(1023)]
    #[case(1024)]
    #[case(1025)]
    #[case(2047)]
    #[case(2048)]
    #[case(2049)]
    fn test_execute_f64(#[case] size: usize) {
        let values = PrimitiveArray::from_iter((0..size).map(|i| i as f64));
        let encoded = alp_encode(&values, None).unwrap();
        assert_execute_matches_legacy(encoded);
    }

    #[rstest]
    #[case(100)]
    #[case(1023)]
    #[case(1024)]
    #[case(1025)]
    #[case(2047)]
    #[case(2048)]
    #[case(2049)]
    fn test_execute_with_patches(#[case] size: usize) {
        // PI does not round-trip through ALP, so every 4th value forces a patch.
        let values: Vec<f64> = (0..size)
            .map(|i| match i % 4 {
                0..=2 => 1.0,
                _ => PI,
            })
            .collect();
        let array = PrimitiveArray::from_iter(values);
        let encoded = alp_encode(&array, None).unwrap();
        assert!(encoded.patches().unwrap().array_len() > 0);
        assert_execute_matches_legacy(encoded);
    }

    #[rstest]
    #[case(0)]
    #[case(1)]
    #[case(100)]
    #[case(1023)]
    #[case(1024)]
    #[case(1025)]
    #[case(2047)]
    #[case(2048)]
    #[case(2049)]
    fn test_execute_with_validity(#[case] size: usize) {
        let values: Vec<Option<f32>> = (0..size)
            .map(|i| if i % 2 == 1 { None } else { Some(1.0) })
            .collect();
        let array = PrimitiveArray::from_option_iter(values);
        let encoded = alp_encode(&array, None).unwrap();
        assert_execute_matches_legacy(encoded);
    }

    #[rstest]
    #[case(100)]
    #[case(1023)]
    #[case(1024)]
    #[case(1025)]
    #[case(2047)]
    #[case(2048)]
    #[case(2049)]
    fn test_execute_with_patches_and_validity(#[case] size: usize) {
        // Mixes nulls (idx % 3 == 1) with patch-forcing values (idx % 3 == 2).
        let values: Vec<Option<f64>> = (0..size)
            .map(|idx| match idx % 3 {
                0 => Some(1.0),
                1 => None,
                _ => Some(PI),
            })
            .collect();
        let array = PrimitiveArray::from_option_iter(values);
        let encoded = alp_encode(&array, None).unwrap();
        assert!(encoded.patches().unwrap().array_len() > 0);
        assert_execute_matches_legacy(encoded);
    }

    #[rstest]
    #[case(500, 100)]
    #[case(1000, 200)]
    #[case(2048, 512)]
    fn test_execute_sliced_vector(#[case] size: usize, #[case] slice_start: usize) {
        let values = sliced_test_values(size);
        let array = PrimitiveArray::from_option_iter(values.clone());
        let encoded = alp_encode(&array, None).unwrap();
        let slice_end = size - slice_start;
        let slice_len = slice_end - slice_start;
        let sliced_encoded = encoded.slice(slice_start..slice_end).unwrap();
        let result_canonical = {
            let mut ctx = SESSION.create_execution_ctx();
            sliced_encoded.execute::<Canonical>(&mut ctx).unwrap()
        };
        let result_primitive = result_canonical.into_primitive();
        // Compare each sliced position against the original values.
        for idx in 0..slice_len {
            let expected_value = values[slice_start + idx];
            let result_valid = result_primitive.validity().is_valid(idx).unwrap();
            assert_eq!(
                result_valid,
                expected_value.is_some(),
                "Validity mismatch at idx={idx}",
            );
            if let Some(expected_val) = expected_value {
                let result_val = result_primitive.as_slice::<f64>()[idx];
                assert_eq!(result_val, expected_val, "Value mismatch at idx={idx}",);
            }
        }
    }

    #[rstest]
    #[case(500, 100)]
    #[case(1000, 200)]
    #[case(2048, 512)]
    fn test_sliced_to_primitive(#[case] size: usize, #[case] slice_start: usize) {
        let values = sliced_test_values(size);
        let array = PrimitiveArray::from_option_iter(values.clone());
        let encoded = alp_encode(&array, None).unwrap();
        let slice_end = size - slice_start;
        let slice_len = slice_end - slice_start;
        let sliced_encoded = encoded.slice(slice_start..slice_end).unwrap();
        // Same scenario as test_execute_sliced_vector, but through to_primitive.
        let result_primitive = sliced_encoded.to_primitive();
        for idx in 0..slice_len {
            let expected_value = values[slice_start + idx];
            let result_valid = result_primitive.validity_mask().unwrap().value(idx);
            assert_eq!(
                result_valid,
                expected_value.is_some(),
                "Validity mismatch at idx={idx}",
            );
            if let Some(expected_val) = expected_value {
                let buf = result_primitive.to_buffer::<f64>();
                let result_val = buf.as_slice()[idx];
                assert_eq!(result_val, expected_val, "Value mismatch at idx={idx}",);
            }
        }
    }

    /// Regression test: patches without chunk offsets must not be dropped by
    /// the execute path (issue 5948).
    #[test]
    fn test_execute_decompress_with_patches_no_chunk_offsets_regression_5948() {
        let values: Vec<f64> = vec![1.0, 2.0, PI, 4.0, 5.0];
        let original = PrimitiveArray::from_iter(values);
        let normally_encoded = alp_encode(&original, None).unwrap();
        assert!(
            normally_encoded.patches().is_some(),
            "Test requires patches to be present"
        );
        let original_patches = normally_encoded.patches().unwrap();
        assert!(
            original_patches.chunk_offsets().is_some(),
            "Normal encoding should have chunk_offsets"
        );
        // Rebuild the patches with chunk_offsets stripped to hit the buggy path.
        let patches_without_chunk_offsets = Patches::new(
            original_patches.array_len(),
            original_patches.offset(),
            original_patches.indices().clone(),
            original_patches.values().clone(),
            None,
        )
        .unwrap();
        let alp_without_chunk_offsets = ALPArray::new(
            normally_encoded.encoded().clone(),
            normally_encoded.exponents(),
            Some(patches_without_chunk_offsets),
        );
        // Legacy path: establishes the expected behavior.
        let result_legacy = decompress_into_array(
            alp_without_chunk_offsets.clone(),
            &mut LEGACY_SESSION.create_execution_ctx(),
        )
        .unwrap();
        let legacy_slice = result_legacy.as_slice::<f64>();
        assert!(
            (legacy_slice[2] - PI).abs() < 1e-10,
            "Legacy path should have PI at index 2, got {}",
            legacy_slice[2]
        );
        // Execute path: must apply the patch even without chunk offsets.
        let result_execute = {
            let mut ctx = SESSION.create_execution_ctx();
            execute_decompress(alp_without_chunk_offsets, &mut ctx).unwrap()
        };
        let execute_slice = result_execute.as_slice::<f64>();
        assert!(
            (execute_slice[2] - PI).abs() < 1e-10,
            "Execute path should have PI at index 2, but got {} (patches were dropped!)",
            execute_slice[2]
        );
    }
}