vortex_fsst/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::{Debug, Formatter};
5use std::sync::{Arc, LazyLock};
6
7use fsst::{Compressor, Decompressor, Symbol};
8use vortex_array::arrays::VarBinArray;
9use vortex_array::stats::{ArrayStats, StatsSetRef};
10use vortex_array::vtable::{
11    ArrayVTable, NotSupported, VTable, ValidityChild, ValidityVTableFromChild,
12};
13use vortex_array::{Array, ArrayRef, EncodingId, EncodingRef, vtable};
14use vortex_buffer::Buffer;
15use vortex_dtype::DType;
16use vortex_error::{VortexResult, vortex_bail};
17
18vtable!(FSST);
19
20impl VTable for FSSTVTable {
21    type Array = FSSTArray;
22    type Encoding = FSSTEncoding;
23
24    type ArrayVTable = Self;
25    type CanonicalVTable = Self;
26    type OperationsVTable = Self;
27    type ValidityVTable = ValidityVTableFromChild;
28    type VisitorVTable = Self;
29    type ComputeVTable = NotSupported;
30    type EncodeVTable = Self;
31    type SerdeVTable = Self;
32    type PipelineVTable = NotSupported;
33
34    fn id(_encoding: &Self::Encoding) -> EncodingId {
35        EncodingId::new_ref("vortex.fsst")
36    }
37
38    fn encoding(_array: &Self::Array) -> EncodingRef {
39        EncodingRef::new_ref(FSSTEncoding.as_ref())
40    }
41}
42
43#[derive(Clone)]
44pub struct FSSTArray {
45    dtype: DType,
46    symbols: Buffer<Symbol>,
47    symbol_lengths: Buffer<u8>,
48    codes: VarBinArray,
49    /// Lengths of the original values before compression, can be compressed.
50    uncompressed_lengths: ArrayRef,
51    stats_set: ArrayStats,
52
53    /// Memoized compressor used for push-down of compute by compressing the RHS.
54    compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
55}
56
57impl Debug for FSSTArray {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("FSSTArray")
60            .field("dtype", &self.dtype)
61            .field("symbols", &self.symbols)
62            .field("symbol_lengths", &self.symbol_lengths)
63            .field("codes", &self.codes)
64            .field("uncompressed_lengths", &self.uncompressed_lengths)
65            .finish()
66    }
67}
68
/// Zero-sized marker type for the FSST encoding; its encoding id is `"vortex.fsst"`.
#[derive(Debug, Clone)]
pub struct FSSTEncoding;
71
72impl FSSTArray {
73    /// Build an FSST array from a set of `symbols` and `codes`.
74    ///
75    /// Symbols are 8-bytes and can represent short strings, each of which is assigned
76    /// a code.
77    ///
78    /// The `codes` array is a Binary array where each binary datum is a sequence of 8-bit codes.
79    /// Each code corresponds either to a symbol, or to the "escape code",
80    /// which tells the decoder to emit the following byte without doing a table lookup.
81    pub fn try_new(
82        dtype: DType,
83        symbols: Buffer<Symbol>,
84        symbol_lengths: Buffer<u8>,
85        codes: VarBinArray,
86        uncompressed_lengths: ArrayRef,
87    ) -> VortexResult<Self> {
88        // Check: symbols must not have length > MAX_CODE
89        if symbols.len() > 255 {
90            vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
91        }
92        if symbols.len() != symbol_lengths.len() {
93            vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
94        }
95
96        if uncompressed_lengths.len() != codes.len() {
97            vortex_bail!(InvalidArgument: "uncompressed_lengths must be same len as codes");
98        }
99
100        if !uncompressed_lengths.dtype().is_int() || uncompressed_lengths.dtype().is_nullable() {
101            vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable, found {}", uncompressed_lengths.dtype());
102        }
103
104        // Check: strings must be a Binary array.
105        if !matches!(codes.dtype(), DType::Binary(_)) {
106            vortex_bail!(InvalidArgument: "codes array must be DType::Binary type");
107        }
108
109        // SAFETY: all components validated above
110        unsafe {
111            Ok(Self::new_unchecked(
112                dtype,
113                symbols,
114                symbol_lengths,
115                codes,
116                uncompressed_lengths,
117            ))
118        }
119    }
120
121    pub(crate) unsafe fn new_unchecked(
122        dtype: DType,
123        symbols: Buffer<Symbol>,
124        symbol_lengths: Buffer<u8>,
125        codes: VarBinArray,
126        uncompressed_lengths: ArrayRef,
127    ) -> Self {
128        let symbols2 = symbols.clone();
129        let symbol_lengths2 = symbol_lengths.clone();
130        let compressor = Arc::new(LazyLock::new(Box::new(move || {
131            Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
132        })
133            as Box<dyn Fn() -> Compressor + Send>));
134
135        Self {
136            dtype,
137            symbols,
138            symbol_lengths,
139            codes,
140            uncompressed_lengths,
141            stats_set: Default::default(),
142            compressor,
143        }
144    }
145
146    /// Access the symbol table array
147    pub fn symbols(&self) -> &Buffer<Symbol> {
148        &self.symbols
149    }
150
151    /// Access the symbol table array
152    pub fn symbol_lengths(&self) -> &Buffer<u8> {
153        &self.symbol_lengths
154    }
155
156    /// Access the codes array
157    pub fn codes(&self) -> &VarBinArray {
158        &self.codes
159    }
160
161    /// Get the DType of the codes array
162    #[inline]
163    pub fn codes_dtype(&self) -> &DType {
164        self.codes.dtype()
165    }
166
167    /// Get the uncompressed length for each element in the array.
168    pub fn uncompressed_lengths(&self) -> &ArrayRef {
169        &self.uncompressed_lengths
170    }
171
172    /// Get the DType of the uncompressed lengths array
173    #[inline]
174    pub fn uncompressed_lengths_dtype(&self) -> &DType {
175        self.uncompressed_lengths.dtype()
176    }
177
178    /// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
179    /// this array.
180    ///
181    /// This is private to the crate to avoid leaking `fsst-rs` types as part of the public API.
182    pub(crate) fn decompressor(&self) -> Decompressor<'_> {
183        Decompressor::new(self.symbols().as_slice(), self.symbol_lengths().as_slice())
184    }
185
186    pub(crate) fn compressor(&self) -> &Compressor {
187        self.compressor.as_ref()
188    }
189}
190
191impl ArrayVTable<FSSTVTable> for FSSTVTable {
192    fn len(array: &FSSTArray) -> usize {
193        array.codes().len()
194    }
195
196    fn dtype(array: &FSSTArray) -> &DType {
197        &array.dtype
198    }
199
200    fn stats(array: &FSSTArray) -> StatsSetRef<'_> {
201        array.stats_set.to_ref(array.as_ref())
202    }
203}
204
205impl ValidityChild<FSSTVTable> for FSSTVTable {
206    fn validity_child(array: &FSSTArray) -> &dyn Array {
207        array.codes().as_ref()
208    }
209}