vortex_fsst/
array.rs

1use std::fmt::{Debug, Formatter};
2use std::sync::{Arc, LazyLock};
3
4use fsst::{Compressor, Decompressor, Symbol};
5use vortex_array::arrays::VarBinArray;
6use vortex_array::stats::{ArrayStats, StatsSetRef};
7use vortex_array::vtable::{
8    ArrayVTable, NotSupported, VTable, ValidityChild, ValidityVTableFromChild,
9};
10use vortex_array::{Array, ArrayRef, EncodingId, EncodingRef, vtable};
11use vortex_buffer::Buffer;
12use vortex_dtype::DType;
13use vortex_error::{VortexResult, vortex_bail};
14
15vtable!(FSST);
16
17impl VTable for FSSTVTable {
18    type Array = FSSTArray;
19    type Encoding = FSSTEncoding;
20
21    type ArrayVTable = Self;
22    type CanonicalVTable = Self;
23    type OperationsVTable = Self;
24    type ValidityVTable = ValidityVTableFromChild;
25    type VisitorVTable = Self;
26    type ComputeVTable = NotSupported;
27    type EncodeVTable = Self;
28    type SerdeVTable = Self;
29
30    fn id(_encoding: &Self::Encoding) -> EncodingId {
31        EncodingId::new_ref("vortex.fsst")
32    }
33
34    fn encoding(_array: &Self::Array) -> EncodingRef {
35        EncodingRef::new_ref(FSSTEncoding.as_ref())
36    }
37}
38
39#[derive(Clone)]
40pub struct FSSTArray {
41    dtype: DType,
42    symbols: Buffer<Symbol>,
43    symbol_lengths: Buffer<u8>,
44    codes: VarBinArray,
45    /// Lengths of the original values before compression, can be compressed.
46    uncompressed_lengths: ArrayRef,
47    stats_set: ArrayStats,
48
49    /// Memoized compressor used for push-down of compute by compressing the RHS.
50    compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
51}
52
53impl Debug for FSSTArray {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("FSSTArray")
56            .field("dtype", &self.dtype)
57            .field("symbols", &self.symbols)
58            .field("symbol_lengths", &self.symbol_lengths)
59            .field("codes", &self.codes)
60            .field("uncompressed_lengths", &self.uncompressed_lengths)
61            .finish()
62    }
63}
64
/// Marker type for the FSST encoding, identified as `"vortex.fsst"`.
#[derive(Debug, Clone)]
pub struct FSSTEncoding;
67
68impl FSSTArray {
69    /// Build an FSST array from a set of `symbols` and `codes`.
70    ///
71    /// Symbols are 8-bytes and can represent short strings, each of which is assigned
72    /// a code.
73    ///
74    /// The `codes` array is a Binary array where each binary datum is a sequence of 8-bit codes.
75    /// Each code corresponds either to a symbol, or to the "escape code",
76    /// which tells the decoder to emit the following byte without doing a table lookup.
77    pub fn try_new(
78        dtype: DType,
79        symbols: Buffer<Symbol>,
80        symbol_lengths: Buffer<u8>,
81        codes: VarBinArray,
82        uncompressed_lengths: ArrayRef,
83    ) -> VortexResult<Self> {
84        // Check: symbols must not have length > MAX_CODE
85        if symbols.len() > 255 {
86            vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
87        }
88        if symbols.len() != symbol_lengths.len() {
89            vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
90        }
91
92        if uncompressed_lengths.len() != codes.len() {
93            vortex_bail!(InvalidArgument: "uncompressed_lengths must be same len as codes");
94        }
95
96        if !uncompressed_lengths.dtype().is_int() || uncompressed_lengths.dtype().is_nullable() {
97            vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable, found {}", uncompressed_lengths.dtype());
98        }
99
100        // Check: strings must be a Binary array.
101        if !matches!(codes.dtype(), DType::Binary(_)) {
102            vortex_bail!(InvalidArgument: "codes array must be DType::Binary type");
103        }
104
105        let symbols2 = symbols.clone();
106        let symbol_lengths2 = symbol_lengths.clone();
107        let compressor = Arc::new(LazyLock::new(Box::new(move || {
108            Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
109        })
110            as Box<dyn Fn() -> Compressor + Send>));
111
112        Ok(Self {
113            dtype,
114            symbols,
115            symbol_lengths,
116            codes,
117            uncompressed_lengths,
118            stats_set: Default::default(),
119            compressor,
120        })
121    }
122
123    /// Access the symbol table array
124    pub fn symbols(&self) -> &Buffer<Symbol> {
125        &self.symbols
126    }
127
128    /// Access the symbol table array
129    pub fn symbol_lengths(&self) -> &Buffer<u8> {
130        &self.symbol_lengths
131    }
132
133    /// Access the codes array
134    pub fn codes(&self) -> &VarBinArray {
135        &self.codes
136    }
137
138    /// Get the DType of the codes array
139    #[inline]
140    pub fn codes_dtype(&self) -> &DType {
141        self.codes.dtype()
142    }
143
144    /// Get the uncompressed length for each element in the array.
145    pub fn uncompressed_lengths(&self) -> &ArrayRef {
146        &self.uncompressed_lengths
147    }
148
149    /// Get the DType of the uncompressed lengths array
150    #[inline]
151    pub fn uncompressed_lengths_dtype(&self) -> &DType {
152        self.uncompressed_lengths.dtype()
153    }
154
155    /// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
156    /// this array.
157    ///
158    /// This is private to the crate to avoid leaking `fsst-rs` types as part of the public API.
159    pub(crate) fn decompressor(&self) -> Decompressor {
160        Decompressor::new(self.symbols().as_slice(), self.symbol_lengths().as_slice())
161    }
162
163    pub(crate) fn compressor(&self) -> &Compressor {
164        self.compressor.as_ref()
165    }
166}
167
168impl ArrayVTable<FSSTVTable> for FSSTVTable {
169    fn len(array: &FSSTArray) -> usize {
170        array.codes().len()
171    }
172
173    fn dtype(array: &FSSTArray) -> &DType {
174        &array.dtype
175    }
176
177    fn stats(array: &FSSTArray) -> StatsSetRef<'_> {
178        array.stats_set.to_ref(array.as_ref())
179    }
180}
181
182impl ValidityChild<FSSTVTable> for FSSTVTable {
183    fn validity_child(array: &FSSTArray) -> &dyn Array {
184        array.codes().as_ref()
185    }
186}