use onpair::Config;
use onpair::Offset;
use vortex_array::ArrayRef;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::accessor::ArrayAccessor;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::validity::Validity;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use crate::OnPair;
use crate::OnPairArray;
/// Default OnPair compressor configuration; an alias for `onpair::DEFAULT_CONFIG`.
///
/// NOTE(review): the `DICT12` in the name presumably refers to the dictionary code width
/// used by the default configuration — confirm against the `onpair` crate.
pub const DEFAULT_DICT12_CONFIG: Config = onpair::DEFAULT_CONFIG;
/// Compresses an iterator of optional byte strings into an [`OnPairArray`].
///
/// Row boundaries handed to the OnPair compressor are tracked as `u64` offsets, so the
/// concatenated input may exceed `u32::MAX` bytes. `len` is only a capacity hint, `dtype`
/// becomes the logical type of the result, and `config` is forwarded to the compressor.
///
/// # Errors
///
/// Propagates any error raised while compressing or while assembling the resulting array.
pub fn onpair_compress_iter<'a, I>(
    iter: I,
    len: usize,
    dtype: DType,
    config: Config,
) -> VortexResult<OnPairArray>
where
    I: Iterator<Item = Option<&'a [u8]>>,
{
    type RowOffset = u64;
    onpair_compress_iter_with_offsets::<RowOffset, I>(iter, len, dtype, config)
}
/// Compresses an iterator of optional byte strings into an [`OnPairArray`], using `O` as the
/// type of the row-boundary offsets handed to `onpair::compress`.
///
/// `len` is only used to pre-size the intermediate buffers; the number of rows produced is
/// the number of items yielded by `iter`. A `None` item is laid out as a zero-length row with
/// an uncompressed length of `0`; its null-ness is only recorded in the output validity when
/// `dtype` is nullable.
///
/// # Panics
///
/// Panics if a single value is longer than `i32::MAX` bytes.
///
/// # Errors
///
/// Returns an error if `onpair::compress` fails, if the per-row code boundaries cannot be
/// derived, or if `OnPair::try_new` rejects the assembled parts.
fn onpair_compress_iter_with_offsets<'a, O, I>(
    iter: I,
    len: usize,
    dtype: DType,
    config: Config,
) -> VortexResult<OnPairArray>
where
    O: Offset,
    I: Iterator<Item = Option<&'a [u8]>>,
{
    // Lay every row out back-to-back in one byte buffer and remember where each one ends.
    let mut row_bytes: Vec<u8> = Vec::with_capacity(len * 16);
    let mut row_ends: Vec<O> = Vec::with_capacity(len + 1);
    let mut row_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
    let mut row_is_valid: Vec<bool> = Vec::with_capacity(len);
    row_ends.push(<O as Offset>::from_usize(0));

    for entry in iter {
        row_is_valid.push(entry.is_some());
        // A null row contributes no bytes, exactly like an empty string.
        let value: &[u8] = entry.unwrap_or_default();
        row_bytes.extend_from_slice(value);
        row_ends.push(<O as Offset>::from_usize(row_bytes.len()));
        row_lengths.push(
            i32::try_from(value.len()).vortex_expect("string length must fit in i32"),
        );
    }

    let column = onpair::compress(&row_bytes, &row_ends, config)
        .map_err(|e| vortex_err!("OnPair compress failed: {e}"))?;

    // Borrow the code stream before its fields are moved into Vortex buffers below.
    let codes_offsets = build_codes_offsets(&column.codes, &column.dict_offsets, &row_ends)?;
    let bits = column.bits;
    let dict_bytes = dict_bytes_to_buffer(column.dict_bytes);

    let validity = match dtype.nullability() {
        Nullability::Nullable => Validity::from_iter(row_is_valid),
        Nullability::NonNullable => Validity::NonNullable,
    };

    OnPair::try_new(
        dtype,
        dict_bytes,
        Buffer::from(column.dict_offsets).into_array(),
        Buffer::from(column.codes).into_array(),
        Buffer::from(codes_offsets).into_array(),
        row_lengths.into_array(),
        validity,
        bits,
    )
}
/// Wraps the OnPair dictionary bytes in a host [`BufferHandle`].
///
/// The bytes are followed by `onpair::MAX_TOKEN_SIZE` zero bytes, and the resulting buffer is
/// aligned to 8 bytes.
///
/// NOTE(review): the zero padding presumably lets a decoder read a fixed `MAX_TOKEN_SIZE`
/// bytes for any token without running past the end of the dictionary — confirm.
fn dict_bytes_to_buffer(mut dict_bytes: Vec<u8>) -> BufferHandle {
    // The Vec is owned, so pad its allocation in place instead of copying it into a new one.
    dict_bytes.reserve_exact(onpair::MAX_TOKEN_SIZE);
    let padded_len = dict_bytes.len() + onpair::MAX_TOKEN_SIZE;
    dict_bytes.resize(padded_len, 0);
    BufferHandle::new_host(
        ByteBuffer::from(dict_bytes).aligned(vortex_buffer::Alignment::new(8)),
    )
}
/// Computes the per-row boundaries into the flat OnPair `codes` stream.
///
/// Each code indexes a dictionary token whose byte length is
/// `dict_offsets[code + 1] - dict_offsets[code]`. The function walks `codes` in order, summing
/// token lengths. For each row end in `row_byte_offsets` (every entry after the first), it
/// records the code index at which the decoded byte count reaches that offset.
///
/// The result starts with `0` and has one further entry per row. It therefore has
/// `row_byte_offsets.len()` entries, or a single `0` when `row_byte_offsets` is empty.
///
/// # Errors
///
/// Returns an error, rather than panicking, when the inputs are inconsistent:
/// - a row byte offset does not fit in `usize`;
/// - `codes` runs out before a row's end offset is reached;
/// - a code has no `[code, code + 1]` pair in `dict_offsets`, or that pair is decreasing;
/// - a token's bytes extend past a row's end offset (the token would straddle two rows);
/// - a code index does not fit in `u32`.
fn build_codes_offsets<O: Offset>(
    codes: &[u16],
    dict_offsets: &[u32],
    row_byte_offsets: &[O],
) -> VortexResult<Vec<u32>> {
    let mut codes_offsets = Vec::with_capacity(row_byte_offsets.len().max(1));
    codes_offsets.push(0u32);
    let mut decoded_bytes: u64 = 0;
    let mut code_idx: usize = 0;

    // The first entry is the start of row 0; every later entry is the end of a row.
    for row_end in row_byte_offsets.iter().skip(1) {
        let target = row_end
            .to_usize()
            .ok_or_else(|| vortex_err!("OnPair row byte offset does not fit usize"))?
            as u64;

        while decoded_bytes < target {
            let code = codes
                .get(code_idx)
                .copied()
                .map(usize::from)
                .ok_or_else(|| {
                    vortex_err!(
                        "OnPair: code stream ended at index {code_idx} before reaching row byte offset {target}"
                    )
                })?;
            let token_len = match (dict_offsets.get(code), dict_offsets.get(code + 1)) {
                (Some(&start), Some(&end)) if end >= start => end - start,
                _ => {
                    return Err(vortex_err!(
                        "OnPair: invalid dictionary entry for code {code}"
                    ));
                }
            };
            decoded_bytes += u64::from(token_len);
            code_idx += 1;
        }

        // Tokens must end exactly on row boundaries; otherwise the per-row code range would
        // decode bytes that belong to a neighbouring row.
        if decoded_bytes != target {
            return Err(vortex_err!(
                "OnPair: token ending at byte {decoded_bytes} crosses row boundary at byte {target}"
            ));
        }

        codes_offsets.push(
            u32::try_from(code_idx)
                .map_err(|_| vortex_err!("OnPair: code boundary {code_idx} does not fit u32"))?,
        );
    }

    Ok(codes_offsets)
}
/// OnPair-compresses every value exposed by a byte-slice [`ArrayAccessor`].
///
/// The accessor's iterator is passed to [`onpair_compress_iter`] together with `len`, an
/// owned copy of `dtype`, and `config`.
///
/// # Errors
///
/// Propagates any error returned by [`onpair_compress_iter`].
pub fn onpair_compress<A: ArrayAccessor<[u8]>>(
    array: A,
    len: usize,
    dtype: &DType,
    config: Config,
) -> VortexResult<OnPairArray> {
    array.with_iterator(|values| {
        let owned_dtype = dtype.clone();
        onpair_compress_iter(values, len, owned_dtype, config)
    })
}
/// Executes `array` into a [`VarBinViewArray`] using `ctx`, then OnPair-compresses it with
/// `config`.
///
/// # Errors
///
/// Returns an error if `array` cannot be executed into a [`VarBinViewArray`], or if
/// compression fails.
pub fn onpair_compress_array(
    array: &ArrayRef,
    config: Config,
    ctx: &mut ExecutionCtx,
) -> VortexResult<OnPairArray> {
    let strings = array.clone().execute::<VarBinViewArray>(ctx)?;
    onpair_compress(&strings, strings.len(), strings.dtype(), config)
}
/// Same as [`onpair_compress_array`], but builds its own execution context from
/// `LEGACY_SESSION` instead of borrowing one from the caller.
///
/// # Errors
///
/// Propagates any error returned by [`onpair_compress_array`].
pub fn onpair_compress_array_default(
    array: &ArrayRef,
    config: Config,
) -> VortexResult<OnPairArray> {
    onpair_compress_array(array, config, &mut LEGACY_SESSION.create_execution_ctx())
}