1use std::fmt::{Debug, Formatter};
5use std::sync::{Arc, LazyLock};
6
7use fsst::{Compressor, Decompressor, Symbol};
8use vortex_array::arrays::VarBinArray;
9use vortex_array::stats::{ArrayStats, StatsSetRef};
10use vortex_array::vtable::{
11 ArrayVTable, NotSupported, VTable, ValidityChild, ValidityVTableFromChild,
12};
13use vortex_array::{Array, ArrayRef, EncodingId, EncodingRef, vtable};
14use vortex_buffer::Buffer;
15use vortex_dtype::DType;
16use vortex_error::{VortexResult, vortex_bail};
17
// Macro invocation for the FSST encoding.
// NOTE(review): `FSSTVTable` is used by the impls below but is not declared anywhere in this
// section, so it is presumably generated here — confirm against the `vtable!` macro definition.
vtable!(FSST);
19
20impl VTable for FSSTVTable {
21 type Array = FSSTArray;
22 type Encoding = FSSTEncoding;
23
24 type ArrayVTable = Self;
25 type CanonicalVTable = Self;
26 type OperationsVTable = Self;
27 type ValidityVTable = ValidityVTableFromChild;
28 type VisitorVTable = Self;
29 type ComputeVTable = NotSupported;
30 type EncodeVTable = Self;
31 type SerdeVTable = Self;
32 type PipelineVTable = NotSupported;
33
34 fn id(_encoding: &Self::Encoding) -> EncodingId {
35 EncodingId::new_ref("vortex.fsst")
36 }
37
38 fn encoding(_array: &Self::Array) -> EncodingRef {
39 EncodingRef::new_ref(FSSTEncoding.as_ref())
40 }
41}
42
43#[derive(Clone)]
44pub struct FSSTArray {
45 dtype: DType,
46 symbols: Buffer<Symbol>,
47 symbol_lengths: Buffer<u8>,
48 codes: VarBinArray,
49 uncompressed_lengths: ArrayRef,
51 stats_set: ArrayStats,
52
53 compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
55}
56
57impl Debug for FSSTArray {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("FSSTArray")
60 .field("dtype", &self.dtype)
61 .field("symbols", &self.symbols)
62 .field("symbol_lengths", &self.symbol_lengths)
63 .field("codes", &self.codes)
64 .field("uncompressed_lengths", &self.uncompressed_lengths)
65 .finish()
66 }
67}
68
/// Unit marker type for the FSST encoding; it is the `Encoding` type named by the `VTable`
/// implementation for `FSSTVTable`.
#[derive(Clone, Debug)]
pub struct FSSTEncoding;
71
72impl FSSTArray {
73 pub fn try_new(
82 dtype: DType,
83 symbols: Buffer<Symbol>,
84 symbol_lengths: Buffer<u8>,
85 codes: VarBinArray,
86 uncompressed_lengths: ArrayRef,
87 ) -> VortexResult<Self> {
88 if symbols.len() > 255 {
90 vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
91 }
92 if symbols.len() != symbol_lengths.len() {
93 vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
94 }
95
96 if uncompressed_lengths.len() != codes.len() {
97 vortex_bail!(InvalidArgument: "uncompressed_lengths must be same len as codes");
98 }
99
100 if !uncompressed_lengths.dtype().is_int() || uncompressed_lengths.dtype().is_nullable() {
101 vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable, found {}", uncompressed_lengths.dtype());
102 }
103
104 if !matches!(codes.dtype(), DType::Binary(_)) {
106 vortex_bail!(InvalidArgument: "codes array must be DType::Binary type");
107 }
108
109 unsafe {
111 Ok(Self::new_unchecked(
112 dtype,
113 symbols,
114 symbol_lengths,
115 codes,
116 uncompressed_lengths,
117 ))
118 }
119 }
120
121 pub(crate) unsafe fn new_unchecked(
122 dtype: DType,
123 symbols: Buffer<Symbol>,
124 symbol_lengths: Buffer<u8>,
125 codes: VarBinArray,
126 uncompressed_lengths: ArrayRef,
127 ) -> Self {
128 let symbols2 = symbols.clone();
129 let symbol_lengths2 = symbol_lengths.clone();
130 let compressor = Arc::new(LazyLock::new(Box::new(move || {
131 Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
132 })
133 as Box<dyn Fn() -> Compressor + Send>));
134
135 Self {
136 dtype,
137 symbols,
138 symbol_lengths,
139 codes,
140 uncompressed_lengths,
141 stats_set: Default::default(),
142 compressor,
143 }
144 }
145
146 pub fn symbols(&self) -> &Buffer<Symbol> {
148 &self.symbols
149 }
150
151 pub fn symbol_lengths(&self) -> &Buffer<u8> {
153 &self.symbol_lengths
154 }
155
156 pub fn codes(&self) -> &VarBinArray {
158 &self.codes
159 }
160
161 #[inline]
163 pub fn codes_dtype(&self) -> &DType {
164 self.codes.dtype()
165 }
166
167 pub fn uncompressed_lengths(&self) -> &ArrayRef {
169 &self.uncompressed_lengths
170 }
171
172 #[inline]
174 pub fn uncompressed_lengths_dtype(&self) -> &DType {
175 self.uncompressed_lengths.dtype()
176 }
177
178 pub(crate) fn decompressor(&self) -> Decompressor<'_> {
183 Decompressor::new(self.symbols().as_slice(), self.symbol_lengths().as_slice())
184 }
185
186 pub(crate) fn compressor(&self) -> &Compressor {
187 self.compressor.as_ref()
188 }
189}
190
191impl ArrayVTable<FSSTVTable> for FSSTVTable {
192 fn len(array: &FSSTArray) -> usize {
193 array.codes().len()
194 }
195
196 fn dtype(array: &FSSTArray) -> &DType {
197 &array.dtype
198 }
199
200 fn stats(array: &FSSTArray) -> StatsSetRef<'_> {
201 array.stats_set.to_ref(array.as_ref())
202 }
203}
204
205impl ValidityChild<FSSTVTable> for FSSTVTable {
206 fn validity_child(array: &FSSTArray) -> &dyn Array {
207 array.codes().as_ref()
208 }
209}