vortex_fsst/canonical.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
use arrow_array::builder::make_view;
use vortex_array::array::{BinaryView, VarBinArray, VarBinViewArray};
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::vtable::CanonicalVTable;
use vortex_array::{Canonical, IntoArrayVariant};
use vortex_buffer::{BufferMut, ByteBuffer, ByteBufferMut};
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;
use crate::{FSSTArray, FSSTEncoding};
impl CanonicalVTable<FSSTArray> for FSSTEncoding {
fn into_canonical(&self, array: FSSTArray) -> VortexResult<Canonical> {
array.with_decompressor(|decompressor| {
// FSSTArray has two child arrays:
//
// 1. A VarBinArray, which holds the string heap of the compressed codes.
// 2. An uncompressed_lengths primitive array, storing the length of each original
// string element.
//
// To speed up canonicalization, we can decompress the entire string-heap in a single
// call. We then turn our uncompressed_lengths into an offsets buffer
// necessary for a VarBinViewArray and construct the canonical array.
let bytes = VarBinArray::try_from(array.codes())?.sliced_bytes();
let uncompressed_lens_array = array.uncompressed_lengths().into_primitive()?;
// Decompres the full dataset.
#[allow(clippy::cast_possible_truncation)]
let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |$P| {
uncompressed_lens_array.as_slice::<$P>().iter().map(|x| *x as usize).sum()
});
// Bulk-decompress the entire array.
let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
// SAFETY: uncompressed bytes is large enough to contain all data + the 7 additional bytes
// of padding required for vectorized decompression. See the docstring for `decompress_into`
// for more details.
unsafe {
let len = decompressor
.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
uncompressed_bytes.set_len(len);
};
// Directly create the binary views.
let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());
match_each_integer_ptype!(uncompressed_lens_array.ptype(), |$P| {
let mut offset = 0;
for len in uncompressed_lens_array.as_slice::<$P>() {
let len = *len as usize;
let view = make_view(
&uncompressed_bytes[offset..][..len],
0u32,
offset as u32,
);
// SAFETY: we reserved the right capacity beforehand
unsafe { views.push_unchecked(view.into()) };
offset += len;
}
});
let views = views.freeze();
let uncompressed_bytes_array = ByteBuffer::from(uncompressed_bytes);
VarBinViewArray::try_new(
views,
vec![uncompressed_bytes_array],
array.dtype().clone(),
array.validity(),
)
.map(Canonical::VarBinView)
})
}
}