use std::sync::Arc;
use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::varbin::VarBinArrayExt;
use vortex_array::arrays::varbinview::build_views::BinaryView;
use vortex_array::arrays::varbinview::build_views::MAX_BUFFER_LEN;
use vortex_array::arrays::varbinview::build_views::build_views;
use vortex_array::match_each_integer_ptype;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexResult;
use crate::FSST;
use crate::FSSTArrayExt;
/// Canonicalizes an FSST-encoded array into a `VarBinViewArray`.
///
/// The codes are decoded via [`fsst_decode_views`] with a starting buffer index of `0`, and the
/// resulting buffers and views are assembled into a view array that carries the source array's
/// dtype and the validity of its codes child.
///
/// # Errors
///
/// Propagates any error from [`fsst_decode_views`] or from reading the codes' validity.
pub(super) fn canonicalize_fsst(
    array: ArrayView<'_, FSST>,
    ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let (data_buffers, binary_views) = fsst_decode_views(array, 0, ctx)?;
    let dtype = array.dtype().clone();
    let validity = array.codes().validity()?;

    // SAFETY: the views and buffers were produced together by `fsst_decode_views` starting at
    // buffer index 0, so the views are expected to reference exactly these buffers.
    // NOTE(review): the full invariant list of `new_unchecked` is not visible from this file —
    // confirm that decoded FSST bytes also satisfy any dtype (e.g. UTF-8) requirements.
    let canonical = unsafe {
        VarBinViewArray::new_unchecked(binary_views, Arc::from(data_buffers), dtype, validity)
    };
    Ok(canonical.into_array())
}
/// Bulk-decompresses an FSST array and builds binary views over the decoded bytes.
///
/// All compressed codes are decompressed with a single `decompress_into` call. The per-element
/// uncompressed lengths are then passed to `build_views` together with the decoded bytes to
/// produce the data buffers and the views that point into them.
///
/// `start_buf_index` is forwarded unchanged to `build_views`; presumably it is the index the
/// first returned buffer will have in the destination array — confirm against `build_views`.
///
/// # Errors
///
/// Returns an error if the uncompressed-lengths child cannot be executed into a
/// `PrimitiveArray`.
pub(crate) fn fsst_decode_views(
    fsst_array: ArrayView<'_, FSST>,
    start_buf_index: u32,
    ctx: &mut ExecutionCtx,
) -> VortexResult<(Vec<ByteBuffer>, Buffer<BinaryView>)> {
    let compressed = fsst_array.codes().sliced_bytes();
    let lengths = fsst_array
        .uncompressed_lengths()
        .clone()
        .execute::<PrimitiveArray>(ctx)?;

    // Sum of all per-element uncompressed lengths, i.e. the size of the decoded heap.
    #[expect(clippy::cast_possible_truncation)]
    let decoded_size: usize = match_each_integer_ptype!(lengths.ptype(), |P| {
        lengths
            .as_slice::<P>()
            .iter()
            .map(|&n| n as usize)
            .sum::<usize>()
    });

    // Reserve 7 bytes of slack beyond the decoded size.
    // NOTE(review): presumably required because the decompressor writes in wide (8-byte)
    // chunks — confirm against the FSST `decompress_into` documentation.
    let decompressor = fsst_array.decompressor();
    let mut decoded = ByteBufferMut::with_capacity(decoded_size + 7);
    let written = decompressor.decompress_into(compressed.as_slice(), decoded.spare_capacity_mut());
    // SAFETY: `decompress_into` was handed the buffer's spare capacity and `written` is the
    // value it returned, taken here to be the number of bytes it initialized.
    // NOTE(review): confirm that the return value never exceeds the provided capacity.
    unsafe { decoded.set_len(written) };

    // Build the views directly on top of the decoded bytes.
    let buffers_and_views = match_each_integer_ptype!(lengths.ptype(), |P| {
        build_views(
            start_buf_index,
            MAX_BUFFER_LEN,
            decoded,
            lengths.as_slice::<P>(),
        )
    });
    Ok(buffers_and_views)
}
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;
    use rand::RngExt;
    use rand::SeedableRng;
    use rand::prelude::StdRng;
    use vortex_array::ArrayRef;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::VortexSessionExecute;
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::ChunkedArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// Session shared by all tests in this module to create execution contexts.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Builds a deterministic nullable binary array of 1000 entries together with the plain
    /// values it was built from.
    ///
    /// The RNG is seeded with 0. Each entry is `None` with probability 0.9; otherwise it is a
    /// random lowercase ASCII string of 5 to 15 bytes.
    fn make_data() -> (VarBinArray, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);

        let strings: Vec<Option<Vec<u8>>> = (0..STRING_COUNT)
            .map(|_| {
                if rng.random_bool(0.9) {
                    return None;
                }
                // 10 * [50, 150] / 100 => a length in [5, 15].
                let len = 10 * rng.random_range(50..=150) / 100;
                let text: String = (0..len)
                    .map(|_| rng.random_range(b'a'..=b'z') as char)
                    .collect();
                Some(text.into_bytes())
            })
            .collect();

        let array = VarBinArray::from_iter(
            strings
                .iter()
                .cloned()
                .map(|value| value.map(Vec::into_boxed_slice)),
            DType::Binary(Nullability::Nullable),
        );
        (array, strings)
    }

    /// Builds a chunked array of 10 FSST-compressed chunks (each from [`make_data`], with its
    /// own trained compressor) and the flattened expected values across all chunks.
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        let mut chunks: Vec<ArrayRef> = Vec::new();
        let mut expected: Vec<Option<Vec<u8>>> = Vec::new();

        for _ in 0..10 {
            let (array, data) = make_data();
            let compressor = fsst_train_compressor(&array);
            let compressed = fsst_compress(&array, array.len(), array.dtype(), &compressor);
            chunks.push(compressed.into_array());
            expected.extend(data);
        }

        (ChunkedArray::from_iter(chunks), expected)
    }

    /// Checks that both the builder-append path and direct conversion to a view array
    /// reproduce the original values of the FSST-compressed chunks.
    #[test]
    fn test_to_canonical() -> VortexResult<()> {
        let (chunked, expected) = make_data_chunked();

        // Path 1: append the chunked array into a view builder and finish it.
        let mut builder = VarBinViewBuilder::with_capacity(chunked.dtype().clone(), chunked.len());
        let mut ctx = SESSION.create_execution_ctx();
        chunked
            .clone()
            .into_array()
            .append_to_builder(&mut builder, &mut ctx)?;
        let from_builder = builder.finish_into_canonical().into_varbinview();
        let built_values = from_builder.with_iterator(|values| {
            values
                .map(|value| value.map(|bytes| bytes.to_vec()))
                .collect::<Vec<_>>()
        });
        assert_eq!(expected, built_values);

        // Path 2: convert the chunked array to a view array directly.
        let direct = chunked.as_array().to_varbinview();
        let direct_values = direct.with_iterator(|values| {
            values
                .map(|value| value.map(|bytes| bytes.to_vec()))
                .collect::<Vec<_>>()
        });
        assert_eq!(expected, direct_values);

        Ok(())
    }

    /// Appends an FSST array to a builder that already holds a directly appended value and
    /// checks that appending and finishing complete without error.
    #[test]
    fn test_append_after_in_progress_buffer() -> VortexResult<()> {
        let dtype = DType::Binary(Nullability::NonNullable);
        let mut builder = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        // Seed the builder with a value first, so the FSST append happens afterwards.
        builder.append_value(b"long enough!!!");

        let varbin = VarBinArray::from_iter(
            [Some(b"long enough too".to_vec().into_boxed_slice())],
            dtype,
        );
        let compressor = fsst_train_compressor(&varbin);
        let fsst_array =
            fsst_compress(&varbin, varbin.len(), varbin.dtype(), &compressor).into_array();

        let mut ctx = SESSION.create_execution_ctx();
        fsst_array.append_to_builder(&mut builder, &mut ctx)?;

        // Only successful completion is checked; the result itself is not inspected.
        let _finished = builder.finish_into_varbinview();
        Ok(())
    }
}