use std::fmt::Debug;
use std::fmt::Formatter;
use std::hash::Hash;
use std::sync::Arc;
use std::sync::LazyLock;
use fsst::Compressor;
use fsst::Decompressor;
use fsst::Symbol;
use vortex_array::ArrayEq;
use vortex_array::ArrayHash;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::DeserializeMetadata;
use vortex_array::DynArray;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionResult;
use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::SerializeMetadata;
use vortex_array::arrays::VarBin;
use vortex_array::arrays::VarBinArray;
use vortex_array::buffer::BufferHandle;
use vortex_array::builders::ArrayBuilder;
use vortex_array::builders::VarBinViewBuilder;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::dtype::PType;
use vortex_array::serde::ArrayChildren;
use vortex_array::stats::ArrayStats;
use vortex_array::stats::StatsSetRef;
use vortex_array::validity::Validity;
use vortex_array::vtable;
use vortex_array::vtable::Array;
use vortex_array::vtable::ArrayId;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityChild;
use vortex_array::vtable::ValidityHelper;
use vortex_array::vtable::ValidityVTableFromChild;
use vortex_array::vtable::validity_nchildren;
use vortex_array::vtable::validity_to_child;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;
use crate::canonical::canonicalize_fsst;
use crate::canonical::fsst_decode_views;
use crate::kernel::PARENT_KERNELS;
use crate::rules::RULES;
// Invokes `vortex_array::vtable!` for the `FSST` encoding. The generated items are defined by
// that macro (not visible in this file) — presumably the glue between `FSST` and `FSSTArray`;
// TODO confirm against the macro definition.
vtable!(FSST);
/// Serialized (protobuf) metadata of an FSST array.
///
/// Both fields hold the raw `i32` protobuf enum value of a [`PType`].
#[derive(Clone, prost::Message)]
pub struct FSSTMetadata {
    /// `PType` of the `uncompressed_lengths` child array.
    #[prost(enumeration = "PType", tag = "1")]
    uncompressed_lengths_ptype: i32,
    /// `PType` of the offsets child of the compressed codes array.
    #[prost(enumeration = "PType", tag = "2")]
    codes_offsets_ptype: i32,
}
impl FSSTMetadata {
    /// Decodes the stored `uncompressed_lengths_ptype` value into a [`PType`].
    ///
    /// # Errors
    ///
    /// Returns an error if the stored integer does not correspond to a `PType` variant.
    pub fn get_uncompressed_lengths_ptype(&self) -> VortexResult<PType> {
        let raw = self.uncompressed_lengths_ptype;
        match PType::try_from(raw) {
            Ok(ptype) => Ok(ptype),
            Err(_) => Err(vortex_err!("Invalid PType {}", raw)),
        }
    }
}
impl VTable for FSST {
type Array = FSSTArray;
type Metadata = ProstMetadata<FSSTMetadata>;
type OperationsVTable = Self;
type ValidityVTable = ValidityVTableFromChild;
fn vtable(_array: &Self::Array) -> &Self {
&FSST
}
fn id(&self) -> ArrayId {
Self::ID
}
fn len(array: &FSSTArray) -> usize {
array.codes().len()
}
fn dtype(array: &FSSTArray) -> &DType {
&array.dtype
}
fn stats(array: &FSSTArray) -> StatsSetRef<'_> {
array.stats_set.to_ref(array.as_ref())
}
fn array_hash<H: std::hash::Hasher>(array: &FSSTArray, state: &mut H, precision: Precision) {
array.dtype.hash(state);
array.symbols.array_hash(state, precision);
array.symbol_lengths.array_hash(state, precision);
array.codes.as_ref().array_hash(state, precision);
array.uncompressed_lengths.array_hash(state, precision);
}
fn array_eq(array: &FSSTArray, other: &FSSTArray, precision: Precision) -> bool {
array.dtype == other.dtype
&& array.symbols.array_eq(&other.symbols, precision)
&& array
.symbol_lengths
.array_eq(&other.symbol_lengths, precision)
&& array
.codes
.as_ref()
.array_eq(other.codes.as_ref(), precision)
&& array
.uncompressed_lengths
.array_eq(&other.uncompressed_lengths, precision)
}
fn nbuffers(_array: &FSSTArray) -> usize {
3
}
fn buffer(array: &FSSTArray, idx: usize) -> BufferHandle {
match idx {
0 => BufferHandle::new_host(array.symbols().clone().into_byte_buffer()),
1 => BufferHandle::new_host(array.symbol_lengths().clone().into_byte_buffer()),
2 => array.codes.bytes_handle().clone(),
_ => vortex_panic!("FSSTArray buffer index {idx} out of bounds"),
}
}
fn buffer_name(_array: &FSSTArray, idx: usize) -> Option<String> {
match idx {
0 => Some("symbols".to_string()),
1 => Some("symbol_lengths".to_string()),
2 => Some("compressed_codes".to_string()),
_ => vortex_panic!("FSSTArray buffer_name index {idx} out of bounds"),
}
}
fn nchildren(array: &FSSTArray) -> usize {
2 + validity_nchildren(array.codes.validity())
}
fn child(array: &FSSTArray, idx: usize) -> ArrayRef {
match idx {
0 => array.uncompressed_lengths().clone(),
1 => array.codes.offsets().clone(),
2 => validity_to_child(array.codes.validity(), array.codes.len())
.unwrap_or_else(|| vortex_panic!("FSSTArray child index {idx} out of bounds")),
_ => vortex_panic!("FSSTArray child index {idx} out of bounds"),
}
}
fn child_name(_array: &FSSTArray, idx: usize) -> String {
match idx {
0 => "uncompressed_lengths".to_string(),
1 => "codes_offsets".to_string(),
2 => "validity".to_string(),
_ => vortex_panic!("FSSTArray child_name index {idx} out of bounds"),
}
}
fn metadata(array: &FSSTArray) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(FSSTMetadata {
uncompressed_lengths_ptype: array.uncompressed_lengths().dtype().as_ptype().into(),
codes_offsets_ptype: array.codes.offsets().dtype().as_ptype().into(),
}))
}
fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(metadata.serialize()))
}
fn deserialize(
bytes: &[u8],
_dtype: &DType,
_len: usize,
_buffers: &[BufferHandle],
_session: &VortexSession,
) -> VortexResult<Self::Metadata> {
Ok(ProstMetadata(
<ProstMetadata<FSSTMetadata> as DeserializeMetadata>::deserialize(bytes)?,
))
}
fn append_to_builder(
array: &FSSTArray,
builder: &mut dyn ArrayBuilder,
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
builder.extend_from_array(
&array
.clone()
.into_array()
.execute::<Canonical>(ctx)?
.into_array(),
);
return Ok(());
};
let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?;
builder.push_buffer_and_adjusted_views(&buffers, &views, array.validity_mask()?);
Ok(())
}
fn build(
dtype: &DType,
len: usize,
metadata: &Self::Metadata,
buffers: &[BufferHandle],
children: &dyn ArrayChildren,
) -> VortexResult<FSSTArray> {
let symbols = Buffer::<Symbol>::from_byte_buffer(buffers[0].clone().try_to_host_sync()?);
let symbol_lengths = Buffer::<u8>::from_byte_buffer(buffers[1].clone().try_to_host_sync()?);
if buffers.len() == 2 {
if children.len() != 2 {
vortex_bail!(InvalidArgument: "Expected 2 children, got {}", children.len());
}
let codes = children.get(0, &DType::Binary(dtype.nullability()), len)?;
let codes = codes
.as_opt::<VarBin>()
.ok_or_else(|| {
vortex_err!(
"Expected VarBinArray for codes, got {}",
codes.encoding_id()
)
})?
.clone();
let uncompressed_lengths = children.get(
1,
&DType::Primitive(
metadata.0.get_uncompressed_lengths_ptype()?,
Nullability::NonNullable,
),
len,
)?;
return FSSTArray::try_new(
dtype.clone(),
symbols,
symbol_lengths,
codes,
uncompressed_lengths,
);
}
if buffers.len() == 3 {
let uncompressed_lengths = children.get(
0,
&DType::Primitive(
metadata.0.get_uncompressed_lengths_ptype()?,
Nullability::NonNullable,
),
len,
)?;
let codes_buffer = ByteBuffer::from_byte_buffer(buffers[2].clone().try_to_host_sync()?);
let codes_offsets = children.get(
1,
&DType::Primitive(
PType::try_from(metadata.codes_offsets_ptype)?,
Nullability::NonNullable,
),
len + 1,
)?;
let codes_validity = if children.len() == 2 {
Validity::from(dtype.nullability())
} else if children.len() == 3 {
let validity = children.get(2, &Validity::DTYPE, len)?;
Validity::Array(validity)
} else {
vortex_bail!("Expected 0 or 1 child, got {}", children.len());
};
let codes = VarBinArray::try_new(
codes_offsets,
codes_buffer,
DType::Binary(dtype.nullability()),
codes_validity,
)?;
return FSSTArray::try_new(
dtype.clone(),
symbols,
symbol_lengths,
codes,
uncompressed_lengths,
);
}
vortex_bail!(
"InvalidArgument: Expected 2 or 3 buffers, got {}",
buffers.len()
);
}
fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
vortex_ensure!(
children.len() == 2,
"FSSTArray expects 2 children, got {}",
children.len()
);
let mut children_iter = children.into_iter();
let codes = children_iter
.next()
.ok_or_else(|| vortex_err!("FSSTArray with_children missing codes"))?;
let codes = codes
.as_opt::<VarBin>()
.ok_or_else(|| {
vortex_err!(
"Expected VarBinArray for codes, got {}",
codes.encoding_id()
)
})?
.clone();
let uncompressed_lengths = children_iter
.next()
.ok_or_else(|| vortex_err!("FSSTArray with_children missing uncompressed_lengths"))?;
array.codes = codes;
array.uncompressed_lengths = uncompressed_lengths;
Ok(())
}
fn execute(array: Arc<Array<Self>>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
canonicalize_fsst(&array, ctx).map(ExecutionResult::done)
}
fn execute_parent(
array: &Array<Self>,
parent: &ArrayRef,
child_idx: usize,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
PARENT_KERNELS.execute(array, parent, child_idx, ctx)
}
fn reduce_parent(
array: &Array<Self>,
parent: &ArrayRef,
child_idx: usize,
) -> VortexResult<Option<ArrayRef>> {
RULES.evaluate(array, parent, child_idx)
}
}
/// An array of strings/bytes stored as FSST-compressed codes plus the symbol table needed to
/// decode them.
#[derive(Clone)]
pub struct FSSTArray {
    /// Logical dtype reported for this array.
    dtype: DType,
    /// Symbol table; `try_new` requires at most 255 entries.
    symbols: Buffer<Symbol>,
    /// Lengths accompanying `symbols`; `try_new` requires the same length as `symbols`.
    symbol_lengths: Buffer<u8>,
    /// Compressed codes, stored as a `VarBinArray` of `DType::Binary`.
    codes: VarBinArray,
    /// `codes` converted to an `ArrayRef`, kept so that a reference can be handed out as the
    /// validity child.
    codes_array: ArrayRef,
    /// Non-nullable integer array with the same length as `codes`
    /// (presumably the uncompressed byte length of each element — TODO confirm).
    uncompressed_lengths: ArrayRef,
    /// Statistics attached to this array.
    stats_set: ArrayStats,
    /// Compressor rebuilt lazily from `symbols` and `symbol_lengths` on first use.
    compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
}
impl Debug for FSSTArray {
    /// Formats the dtype, symbol table, codes and uncompressed lengths; the cached codes
    /// array, the stats set and the lazy compressor are intentionally left out.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("FSSTArray");
        out.field("dtype", &self.dtype);
        out.field("symbols", &self.symbols);
        out.field("symbol_lengths", &self.symbol_lengths);
        out.field("codes", &self.codes);
        out.field("uncompressed_lengths", &self.uncompressed_lengths);
        out.finish()
    }
}
/// Marker type for the FSST encoding; the `VTable` implementation in this file is attached to it.
#[derive(Clone, Debug)]
pub struct FSST;
impl FSST {
    /// Identifier of the FSST encoding, returned by the vtable's `id` method.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.fsst");
}
impl FSSTArray {
    /// Builds an FSST array after validating its components.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidArgument` error when:
    /// * `symbols` has more than 255 entries,
    /// * `symbols` and `symbol_lengths` differ in length,
    /// * `uncompressed_lengths` and `codes` differ in length,
    /// * `uncompressed_lengths` is not a non-nullable integer array, or
    /// * `codes` is not of `DType::Binary`.
    pub fn try_new(
        dtype: DType,
        symbols: Buffer<Symbol>,
        symbol_lengths: Buffer<u8>,
        codes: VarBinArray,
        uncompressed_lengths: ArrayRef,
    ) -> VortexResult<Self> {
        let symbol_count = symbols.len();
        if symbol_count > 255 {
            vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
        }
        if symbol_count != symbol_lengths.len() {
            vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
        }
        if uncompressed_lengths.len() != codes.len() {
            vortex_bail!(InvalidArgument: "uncompressed_lengths must be same len as codes");
        }

        let lengths_dtype = uncompressed_lengths.dtype();
        let lengths_are_plain_ints = lengths_dtype.is_int() && !lengths_dtype.is_nullable();
        if !lengths_are_plain_ints {
            vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable, found {}", uncompressed_lengths.dtype());
        }

        match codes.dtype() {
            DType::Binary(_) => {}
            _ => vortex_bail!(InvalidArgument: "codes array must be DType::Binary type"),
        }

        // SAFETY: `new_unchecked` performs no validation of its own; every check that
        // `try_new` promises has passed by this point.
        let array = unsafe {
            Self::new_unchecked(dtype, symbols, symbol_lengths, codes, uncompressed_lengths)
        };
        Ok(array)
    }

    /// Assembles an FSST array without validating any of its components.
    ///
    /// The compressor is not built here; a lazy cell is set up that rebuilds it from copies of
    /// `symbols` and `symbol_lengths` the first time it is needed.
    ///
    /// # Safety
    ///
    /// No checks are performed. Callers are expected to uphold the conditions enforced by
    /// [`FSSTArray::try_new`].
    pub(crate) unsafe fn new_unchecked(
        dtype: DType,
        symbols: Buffer<Symbol>,
        symbol_lengths: Buffer<u8>,
        codes: VarBinArray,
        uncompressed_lengths: ArrayRef,
    ) -> Self {
        // The closure owns its own handles to the symbol table so it can outlive this call.
        let rebuild: Box<dyn Fn() -> Compressor + Send> = {
            let table = symbols.clone();
            let table_lengths = symbol_lengths.clone();
            Box::new(move || Compressor::rebuild_from(table.as_slice(), table_lengths.as_slice()))
        };
        let compressor = Arc::new(LazyLock::new(rebuild));
        let codes_array = codes.clone().into_array();

        Self {
            dtype,
            symbols,
            symbol_lengths,
            codes,
            codes_array,
            uncompressed_lengths,
            stats_set: Default::default(),
            compressor,
        }
    }

    /// The symbol table.
    pub fn symbols(&self) -> &Buffer<Symbol> {
        &self.symbols
    }

    /// The lengths accompanying each entry of the symbol table.
    pub fn symbol_lengths(&self) -> &Buffer<u8> {
        &self.symbol_lengths
    }

    /// The compressed codes.
    pub fn codes(&self) -> &VarBinArray {
        &self.codes
    }

    /// The dtype of the compressed codes array.
    #[inline]
    pub fn codes_dtype(&self) -> &DType {
        self.codes().dtype()
    }

    /// The uncompressed lengths array.
    pub fn uncompressed_lengths(&self) -> &ArrayRef {
        &self.uncompressed_lengths
    }

    /// The dtype of the uncompressed lengths array.
    #[inline]
    pub fn uncompressed_lengths_dtype(&self) -> &DType {
        self.uncompressed_lengths().dtype()
    }

    /// Creates a decompressor that borrows this array's symbol table.
    pub fn decompressor(&self) -> Decompressor<'_> {
        let table = self.symbols.as_slice();
        let table_lengths = self.symbol_lengths.as_slice();
        Decompressor::new(table, table_lengths)
    }

    /// Returns the compressor for this array's symbol table, building it on first access.
    pub fn compressor(&self) -> &Compressor {
        // First deref goes through the `Arc`, the second forces the `LazyLock`.
        &**self.compressor
    }
}
// Names the codes array as the validity child of an FSST array (the vtable's `ValidityVTable`
// is `ValidityVTableFromChild`, which presumably consults this — TODO confirm).
impl ValidityChild<FSST> for FSST {
    /// Returns the cached `ArrayRef` form of the compressed codes.
    fn validity_child(array: &FSSTArray) -> &ArrayRef {
        &array.codes_array
    }
}
#[cfg(test)]
mod test {
    use fsst::Compressor;
    use fsst::Symbol;
    use vortex_array::DynArray;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::ProstMetadata;
    use vortex_array::VortexSessionExecute;
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::buffer::BufferHandle;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::test_harness::check_metadata;
    use vortex_array::vtable::VTable;
    use vortex_buffer::Buffer;
    use vortex_error::VortexError;
    use crate::FSST;
    use crate::array::FSSTMetadata;
    use crate::fsst_compress_iter;

    /// Checks the FSST metadata through the shared `check_metadata` harness, under the
    /// name "fsst.metadata".
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_fsst_metadata() {
        let metadata = ProstMetadata(FSSTMetadata {
            uncompressed_lengths_ptype: PType::U64 as i32,
            codes_offsets_ptype: PType::I32 as i32,
        });
        check_metadata("fsst.metadata", metadata);
    }

    /// Builds an array through the two-buffer layout (symbols and symbol lengths as buffers,
    /// codes and uncompressed lengths as children) and checks it decodes to the input strings.
    #[test]
    fn test_back_compat() {
        let expected: [&[u8]; 2] = [b"abcabcab", b"defghijk"];
        let dtype = DType::Utf8(Nullability::NonNullable);

        let symbols = Buffer::<Symbol>::copy_from([
            Symbol::from_slice(b"abc00000"),
            Symbol::from_slice(b"defghijk"),
        ]);
        let symbol_lengths = Buffer::<u8>::copy_from([3, 8]);
        let compressor = Compressor::rebuild_from(symbols.as_slice(), symbol_lengths.as_slice());

        let fsst_array = fsst_compress_iter(
            expected.iter().copied().map(Some),
            expected.len(),
            dtype.clone(),
            &compressor,
        );

        let lengths_ptype: i32 = fsst_array
            .uncompressed_lengths()
            .dtype()
            .as_ptype()
            .into();
        let metadata = ProstMetadata(FSSTMetadata {
            uncompressed_lengths_ptype: lengths_ptype,
            codes_offsets_ptype: 0,
        });

        let buffers = [
            BufferHandle::new_host(symbols.into_byte_buffer()),
            BufferHandle::new_host(symbol_lengths.into_byte_buffer()),
        ];
        let children = vec![
            fsst_array.codes().clone().into_array(),
            fsst_array.uncompressed_lengths().clone(),
        ];

        let rebuilt = FSST::build(
            &dtype,
            expected.len(),
            &metadata,
            &buffers,
            &children.as_slice(),
        )
        .unwrap();

        let decompressed = rebuilt
            .into_array()
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        decompressed
            .with_iterator(|it| {
                for want in expected.iter().copied() {
                    assert_eq!(it.next().unwrap(), Some(want));
                }
                Ok::<_, VortexError>(())
            })
            .unwrap()
    }
}