#![allow(clippy::cast_possible_truncation)]
pub use array::*;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::patches::Patches;
use vortex_array::validity::Validity;
use vortex_fastlanes::bitpack_compress::bitpack_encode_unchecked;
mod array;
mod compute;
mod kernel;
mod ops;
mod rules;
mod slice;
use std::ops::Shl;
use std::ops::Shr;
use itertools::Itertools;
use num_traits::Float;
use num_traits::One;
use num_traits::PrimInt;
use rustc_hash::FxBuildHasher;
use vortex_array::DynArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::dtype::DType;
use vortex_array::dtype::NativePType;
use vortex_array::match_each_integer_ptype;
use vortex_array::vtable::ValidityHelper;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_panic;
use vortex_utils::aliases::hash_map::HashMap;
use crate::match_each_alp_float_ptype;
/// Number of bits required to represent `$value` (always at least 1).
///
/// Works for any unsigned integer expression with an `ilog2` method.
macro_rules! bit_width {
($value:expr) => {
if $value == 0 {
// `ilog2` is undefined for 0; zero still needs one bit to store.
1
} else {
// Highest set bit position (1-based): ilog2(v) + 1.
// wrapping_add is defensive only — ilog2 of any primitive int is < 128,
// so the +1 can never actually wrap.
$value.ilog2().wrapping_add(1) as usize
}
};
}
/// Maximum width in bits of the left (dictionary-coded) part of each float;
/// enforced by the assertion in `build_left_parts_dictionary` ("left-parts
/// must be <= 16 bits") so left parts always fit in a `u16`.
const CUT_LIMIT: usize = 16;
/// Maximum number of entries in the left-parts dictionary (codes 0..=7,
/// so a packed code needs at most 3 bits).
const MAX_DICT_SIZE: u8 = 8;
/// Sealed-trait pattern: only `f32` and `f64` implement `Sealed`, which
/// prevents code outside this crate from implementing [`ALPRDFloat`]
/// for any other type.
mod private {
pub trait Sealed {}
impl Sealed for f32 {}
impl Sealed for f64 {}
}
/// Float types supported by ALP-RD encoding (sealed to `f32`/`f64`).
///
/// Provides lossless round-tripping between a float and its raw bit
/// pattern, plus narrowing/widening between the bit pattern and the
/// `u16` left-part representation.
pub trait ALPRDFloat: private::Sealed + Float + Copy + NativePType {
/// Unsigned integer type with the same width as `Self` (`u32`/`u64`).
type UINT: NativePType + PrimInt + One + Copy;
/// Total bit width of `Self` (32 for `f32`, 64 for `f64`).
const BITS: usize = size_of::<Self>() * 8;
/// Reinterprets a raw bit pattern as a float (inverse of `to_bits`).
fn from_bits(bits: Self::UINT) -> Self;
/// Returns the raw IEEE-754 bit pattern of `value`.
fn to_bits(value: Self) -> Self::UINT;
/// Narrows `bits` to `u16`, keeping the low 16 bits (truncating).
fn to_u16(bits: Self::UINT) -> u16;
/// Zero-extends a `u16` back to the full-width bit type.
fn from_u16(value: u16) -> Self::UINT;
}
impl ALPRDFloat for f64 {
    type UINT = u64;

    /// Reinterprets a raw `u64` bit pattern as an `f64`.
    fn from_bits(raw: Self::UINT) -> Self {
        f64::from_bits(raw)
    }

    /// Returns the raw IEEE-754 bit pattern of `v`.
    fn to_bits(v: Self) -> Self::UINT {
        f64::to_bits(v)
    }

    /// Truncates to the low 16 bits.
    fn to_u16(raw: Self::UINT) -> u16 {
        raw as u16
    }

    /// Zero-extends a 16-bit left part to the full 64-bit width.
    fn from_u16(v: u16) -> Self::UINT {
        u64::from(v)
    }
}
impl ALPRDFloat for f32 {
    type UINT = u32;

    /// Reinterprets a raw `u32` bit pattern as an `f32`.
    fn from_bits(raw: Self::UINT) -> Self {
        f32::from_bits(raw)
    }

    /// Returns the raw IEEE-754 bit pattern of `v`.
    fn to_bits(v: Self) -> Self::UINT {
        f32::to_bits(v)
    }

    /// Truncates to the low 16 bits.
    fn to_u16(raw: Self::UINT) -> u16 {
        raw as u16
    }

    /// Zero-extends a 16-bit left part to the full 32-bit width.
    fn from_u16(v: u16) -> Self::UINT {
        u32::from(v)
    }
}
/// Encoder for ALP-RD ("real doubles"): splits each float's bit pattern
/// into a dictionary-coded left part and a bit-packed right part.
pub struct RDEncoder {
// Number of low bits of each float kept verbatim in the right part.
right_bit_width: u8,
// Lookup table mapping a dictionary code (index) to the left-part bits.
codes: Vec<u16>,
}
impl RDEncoder {
/// Trains an encoder on `sample`: picks the best left/right bit split and
/// left-parts dictionary by estimated compressed size, then flattens the
/// bits -> code map into a dense code-indexed lookup table.
pub fn new<T>(sample: &[T]) -> Self
where
T: ALPRDFloat + NativePType,
T::UINT: NativePType,
{
let dictionary = find_best_dictionary::<T>(sample);
// Invert the (bits -> code) map into a table indexed by code.
let mut codes = vec![0; dictionary.dictionary.len()];
dictionary.dictionary.into_iter().for_each(|(bits, code)| {
codes[code as usize] = bits
});
Self {
right_bit_width: dictionary.right_bit_width,
codes,
}
}
/// Reconstructs an encoder from previously computed parts (e.g. when
/// re-encoding with an already-trained dictionary).
pub fn from_parts(right_bit_width: u8, codes: Vec<u16>) -> Self {
Self {
right_bit_width,
codes,
}
}
/// Encodes a float `PrimitiveArray` (f32 or f64) into an ALP-RD array,
/// dispatching on the array's ptype.
pub fn encode(&self, array: &PrimitiveArray) -> ALPRDArray {
match_each_alp_float_ptype!(array.ptype(), |P| { self.encode_generic::<P>(array) })
}
/// Monomorphic encode body.
///
/// Splits every float's bit pattern at `right_bit_width`, maps each left
/// part to its dictionary code (left parts not in the dictionary become
/// patch exceptions), and bit-packs both halves.
fn encode_generic<T>(&self, array: &PrimitiveArray) -> ALPRDArray
where
T: ALPRDFloat + NativePType,
T::UINT: NativePType,
{
assert!(
!self.codes.is_empty(),
"codes lookup table must be populated before RD encoding"
);
let doubles = array.as_slice::<T>();
let mut left_parts: BufferMut<u16> = BufferMut::with_capacity(doubles.len());
let mut right_parts: BufferMut<T::UINT> = BufferMut::with_capacity(doubles.len());
// Capacity is a heuristic guess (25% exception rate), not a bound.
let mut exceptions_pos: BufferMut<u64> = BufferMut::with_capacity(doubles.len() / 4);
let mut exceptions: BufferMut<u16> = BufferMut::with_capacity(doubles.len() / 4);
// Mask selecting the low `right_bit_width` bits of each bit pattern.
let right_mask = T::UINT::one().shl(self.right_bit_width as _) - T::UINT::one();
let max_code = self.codes.len() - 1;
let left_bit_width = bit_width!(max_code);
// Pass 1: split each float into (left, right) halves.
for v in doubles.iter().copied() {
right_parts.push(T::to_bits(v) & right_mask);
left_parts.push(<T as ALPRDFloat>::to_u16(
T::to_bits(v).shr(self.right_bit_width as _),
));
}
// Pass 2: replace left parts with their dictionary code; left parts not
// in the dictionary are recorded as exceptions and coded as 0.
for (idx, left) in left_parts.iter_mut().enumerate() {
if let Some(code) = self.codes.iter().position(|v| *v == *left) {
*left = code as u16;
} else {
exceptions.push(*left);
exceptions_pos.push(idx as _);
*left = 0u16;
}
}
// Left parts carry the array's validity; bit-pack the codes.
let primitive_left = PrimitiveArray::new(left_parts, array.validity().clone());
// SAFETY contract of bitpack_encode_unchecked: every code fits in
// `left_bit_width` bits by construction (codes are < codes.len()).
let packed_left = unsafe {
bitpack_encode_unchecked(primitive_left, left_bit_width as _)
.vortex_expect("bitpack_encode_unchecked should succeed for left parts")
.into_array()
};
// Right parts are raw masked bits and are never null.
let primitive_right = PrimitiveArray::new(right_parts, Validity::NonNullable);
// SAFETY contract: values were masked to `right_bit_width` bits above.
let packed_right = unsafe {
bitpack_encode_unchecked(primitive_right, self.right_bit_width as _)
.vortex_expect("bitpack_encode_unchecked should succeed for right parts")
.into_array()
};
// Only materialize Patches when at least one exception occurred.
let exceptions = (!exceptions_pos.is_empty()).then(|| {
// Positions are pushed in ascending order, so the last is the max.
let max_exc_pos = exceptions_pos.last().copied().unwrap_or_default();
let bw = bit_width!(max_exc_pos) as u8;
let exc_pos_array = PrimitiveArray::new(exceptions_pos, Validity::NonNullable);
let packed_pos = unsafe {
bitpack_encode_unchecked(exc_pos_array, bw)
.vortex_expect(
"bitpack_encode_unchecked should succeed for exception positions",
)
.into_array()
};
Patches::new(
doubles.len(),
0,
packed_pos,
exceptions.into_array(),
None,
)
.vortex_expect("Patches construction in encode")
});
ALPRDArray::try_new(
DType::Primitive(T::PTYPE, packed_left.dtype().nullability()),
packed_left,
Buffer::<u16>::copy_from(&self.codes),
packed_right,
self.right_bit_width,
exceptions,
)
.vortex_expect("ALPRDArray construction in encode")
}
}
/// Decodes ALP-RD parts back into floats.
///
/// Each entry of `left_parts` is a dictionary code looked up in
/// `left_parts_dict`; codes whose original left bits were not in the
/// dictionary are repaired from `left_parts_patches`. The repaired left
/// bits are then recombined with `right_parts` via shift-and-or.
///
/// # Panics
/// Panics (via `vortex_panic`, despite the `VortexResult` return) when
/// `left_parts` and `right_parts` differ in length. Also panics if a code
/// indexes past `left_parts_dict` — assumed impossible for well-formed input.
pub fn alp_rd_decode<T: ALPRDFloat>(
left_parts: Buffer<u16>,
left_parts_dict: &[u16],
right_bit_width: u8,
right_parts: Buffer<T::UINT>,
left_parts_patches: Option<Patches>,
ctx: &mut ExecutionCtx,
) -> VortexResult<Buffer<T>> {
if left_parts.len() != right_parts.len() {
vortex_panic!("alp_rd_decode: left_parts.len != right_parts.len");
}
// Dictionary lookup: code -> original left-part bits.
let mut values = BufferMut::<u16>::from_iter(
left_parts
.iter()
.map(|code| left_parts_dict[*code as usize]),
);
// Overwrite exception positions with their stored left-part values.
if let Some(patches) = left_parts_patches {
let indices = patches.indices().clone().execute::<PrimitiveArray>(ctx)?;
let patch_values = patches.values().clone().execute::<PrimitiveArray>(ctx)?;
alp_rd_apply_patches(&mut values, &indices, &patch_values, patches.offset());
}
Ok(alp_rd_decode_core(
left_parts_dict,
right_bit_width,
right_parts,
values,
))
}
/// Overwrites `values` at each patch index (shifted down by `offset`) with
/// the corresponding exception value.
///
/// Dispatches on the integer ptype of `indices`. Indexing panics if a
/// patch index lands outside `values` — assumed guaranteed by the Patches
/// invariants (array_len set at construction); TODO confirm.
fn alp_rd_apply_patches(
values: &mut BufferMut<u16>,
indices: &PrimitiveArray,
patch_values: &PrimitiveArray,
offset: usize,
) {
match_each_integer_ptype!(indices.ptype(), |T| {
indices
.as_slice::<T>()
.iter()
.copied()
// Patch indices are stored relative to `offset`.
.map(|idx| idx - offset as T)
.zip(patch_values.as_slice::<u16>().iter())
.for_each(|(idx, v)| values[idx as usize] = *v);
})
}
/// Recombines left and right parts into floats: `(left << right_bit_width) | right`.
///
/// `_left_parts_dict` is unused here — the dictionary lookup has already
/// been applied by the caller; the parameter is kept for signature parity.
fn alp_rd_decode_core<T: ALPRDFloat>(
_left_parts_dict: &[u16],
right_bit_width: u8,
right_parts: Buffer<T::UINT>,
values: BufferMut<u16>,
) -> Buffer<T> {
// NOTE(review): the captured `index` counter assumes `map_each_in_place`
// visits elements strictly in order, exactly once — confirm against the
// Buffer API's contract.
let mut index = 0;
right_parts
.map_each_in_place(|right| {
let left = values[index];
index += 1;
// Widen the 16-bit left part, reattach it above the right bits,
// then reinterpret the full bit pattern as a float.
let left = <T as ALPRDFloat>::from_u16(left);
T::from_bits((left << (right_bit_width as usize)) | right)
})
.freeze()
}
fn find_best_dictionary<T: ALPRDFloat>(samples: &[T]) -> ALPRDDictionary {
let mut best_est_size = f64::MAX;
let mut best_dict = ALPRDDictionary::default();
for p in 1..=16 {
let candidate_right_bw = (T::BITS - p) as u8;
let (dictionary, exception_count) =
build_left_parts_dictionary::<T>(samples, candidate_right_bw, MAX_DICT_SIZE);
let estimated_size = estimate_compression_size(
dictionary.right_bit_width,
dictionary.left_bit_width,
exception_count,
samples.len(),
);
if estimated_size < best_est_size {
best_est_size = estimated_size;
best_dict = dictionary;
}
}
best_dict
}
/// Builds a dictionary of the `max_dict_size` most frequent left-part bit
/// patterns in `samples` for the given right bit width.
///
/// Returns the dictionary plus the number of sampled values whose left
/// part did not make it into the dictionary (future exceptions).
///
/// # Panics
/// Panics when `right_bw` would leave more than `CUT_LIMIT` (16) left bits,
/// since left parts must fit in a `u16`.
fn build_left_parts_dictionary<T: ALPRDFloat>(
    samples: &[T],
    right_bw: u8,
    max_dict_size: u8,
) -> (ALPRDDictionary, usize) {
    assert!(
        right_bw >= (T::BITS - CUT_LIMIT) as _,
        "left-parts must be <= 16 bits"
    );
    // Histogram of distinct left-part bit patterns in the sample.
    let mut counts = HashMap::new();
    samples
        .iter()
        .copied()
        .map(|v| <T as ALPRDFloat>::to_u16(T::to_bits(v).shr(right_bw as _)))
        .for_each(|item| *counts.entry(item).or_default() += 1);
    // Sort descending by count (wrapping_neg keeps the key an unsigned usize).
    let mut sorted_bit_counts: Vec<(u16, usize)> = counts.into_iter().collect_vec();
    sorted_bit_counts.sort_by_key(|(_, count)| count.wrapping_neg());
    // Assign the smallest codes to the most frequent patterns.
    let mut dictionary = HashMap::with_capacity_and_hasher(max_dict_size as _, FxBuildHasher);
    let mut code = 0u16;
    while code < (max_dict_size as _) && (code as usize) < sorted_bit_counts.len() {
        let (bits, _) = sorted_bit_counts[code as usize];
        dictionary.insert(bits, code);
        code += 1;
    }
    // Everything that did not receive a code will be stored as an exception.
    let exception_count: usize = sorted_bit_counts
        .iter()
        .skip(code as _)
        .map(|(_, count)| *count)
        .sum();
    // saturating_sub: with empty `samples` the dictionary is empty, and the
    // previous `len() - 1` underflowed (debug panic / release wrap). An empty
    // dictionary now yields max_code = 0, and bit_width!(0) == 1.
    let max_code = dictionary.len().saturating_sub(1);
    let left_bw = bit_width!(max_code) as u8;
    (
        ALPRDDictionary {
            dictionary,
            right_bit_width: right_bw,
            left_bit_width: left_bw,
        },
        exception_count,
    )
}
/// Estimates the per-value compressed size (in bits) for a candidate
/// left/right split: the two packed widths plus the exception overhead
/// amortized over the sample.
fn estimate_compression_size(
    right_bw: u8,
    left_bw: u8,
    exception_count: usize,
    sample_n: usize,
) -> f64 {
    // Each exception stores a 16-bit position and a 16-bit left-part value.
    const EXC_POSITION_SIZE: usize = 16;
    const EXC_SIZE: usize = 16;

    let exceptions_size = exception_count * (EXC_POSITION_SIZE + EXC_SIZE);
    let amortized_exceptions = (exceptions_size as f64) / (sample_n as f64);
    f64::from(right_bw) + f64::from(left_bw) + amortized_exceptions
}
/// A trained ALP-RD split: the left-parts dictionary plus the bit widths
/// of the two halves.
#[derive(Debug, Default)]
struct ALPRDDictionary {
// Maps a left-part bit pattern to its assigned dictionary code.
dictionary: HashMap<u16, u16, FxBuildHasher>,
// Bits needed to store the largest dictionary code.
left_bit_width: u8,
// Bits of each float kept verbatim in the right part.
right_bit_width: u8,
}