use std::sync::Arc;
use num_traits::AsPrimitive;
use onpair::Parts;
use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::varbinview::build_views::BinaryView;
use vortex_array::arrays::varbinview::build_views::MAX_BUFFER_LEN;
use vortex_array::arrays::varbinview::build_views::build_views;
use vortex_array::match_each_integer_ptype;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexResult;
use vortex_error::vortex_ensure;
use crate::OnPair;
use crate::OnPairArraySlotsExt;
use crate::decode::code_boundary_at;
use crate::decode::collect_widened;
/// Canonicalizes an OnPair-encoded array into a [`VarBinViewArray`].
///
/// All values are decoded via [`onpair_decode_views`] with a starting buffer
/// index of `0`; the resulting buffers and views are then combined with the
/// source array's dtype and validity.
///
/// # Errors
///
/// Propagates any error from decoding the views or from reading the array's
/// validity.
pub(super) fn canonicalize_onpair(
    array: ArrayView<'_, OnPair>,
    ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let (data_buffers, string_views) = onpair_decode_views(array, 0, ctx)?;
    let validity = array.array().validity()?;
    let dtype = array.dtype().clone();

    // SAFETY: `string_views` and `data_buffers` were produced together by
    // `onpair_decode_views` with a starting buffer index of 0, so the views are
    // expected to reference exactly these buffers.
    // NOTE(review): confirm this covers the full contract of `new_unchecked`.
    let canonical = unsafe {
        VarBinViewArray::new_unchecked(string_views, Arc::from(data_buffers), dtype, validity)
    };

    Ok(canonical.into_array())
}
/// Decodes every value of an OnPair array and builds binary views over the
/// decoded bytes.
///
/// The uncompressed lengths are summed to size a single output buffer, the
/// codes covered by `codes_offsets[0]..codes_offsets[len]` are decompressed
/// into it, and `build_views` then produces the final buffers and views from
/// the decoded bytes and the per-value lengths.
///
/// `start_buf_index` is forwarded unchanged to `build_views`.
///
/// # Errors
///
/// Returns an error if:
/// - executing the lengths array, resolving a code boundary, slicing the codes,
///   or widening codes / dictionary offsets fails;
/// - the first code boundary is greater than the last one;
/// - the last code boundary exceeds the number of codes;
/// - the number of bytes reported by the decoder differs from the sum of the
///   uncompressed lengths.
pub(crate) fn onpair_decode_views(
    array: ArrayView<'_, OnPair>,
    start_buf_index: u32,
    ctx: &mut ExecutionCtx,
) -> VortexResult<(Vec<ByteBuffer>, Buffer<BinaryView>)> {
    let lengths = array
        .uncompressed_lengths()
        .clone()
        .execute::<PrimitiveArray>(ctx)?;

    // Total number of decoded bytes; used to size the output buffer up front.
    let total_size: usize = match_each_integer_ptype!(lengths.ptype(), |P| {
        lengths
            .as_slice::<P>()
            .iter()
            .map(|&l| AsPrimitive::<usize>::as_(l))
            .sum()
    });

    // Only the codes between the first and the last boundary belong to this array.
    let codes_offsets = array.codes_offsets();
    let code_start = code_boundary_at(codes_offsets, 0, ctx)?;
    let code_end = code_boundary_at(codes_offsets, array.len(), ctx)?;
    vortex_ensure!(
        code_start <= code_end,
        "OnPair codes_offsets must be nondecreasing"
    );
    vortex_ensure!(
        code_end <= array.codes().len(),
        "OnPair codes_offsets end {} exceeds codes len {}",
        code_end,
        array.codes().len()
    );

    let codes = collect_widened::<u16>(&array.codes().slice(code_start..code_end)?, ctx)?;
    let dict_offsets = collect_widened::<u32>(array.dict_offsets(), ctx)?;

    let mut out_bytes = ByteBufferMut::with_capacity(total_size);
    let written = onpair::decompress_into(
        Parts {
            dict_bytes: array.dict_bytes().as_slice(),
            dict_offsets: dict_offsets.as_slice(),
            bits: array.bits(),
            codes: codes.as_slice(),
        },
        out_bytes.spare_capacity_mut(),
    );

    // This must hold in release builds too: the views below are built from
    // `lengths`, so a mismatch would make them cover bytes that were never
    // decoded. Report inconsistent input as an error instead.
    vortex_ensure!(
        written == total_size,
        "OnPair decoded {} bytes but uncompressed lengths sum to {}",
        written,
        total_size
    );

    // SAFETY: `written == total_size`, and the buffer was allocated with a
    // capacity of at least `total_size`, so the new length is within capacity.
    // NOTE(review): this relies on `decompress_into` returning the number of
    // leading bytes it initialized — confirm against the `onpair` crate docs.
    unsafe { out_bytes.set_len(written) };

    match_each_integer_ptype!(lengths.ptype(), |P| {
        Ok(build_views(
            start_buf_index,
            MAX_BUFFER_LEN,
            out_bytes,
            lengths.as_slice::<P>(),
        ))
    })
}