Skip to main content

vortex_fsst/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hasher;
8use std::sync::Arc;
9use std::sync::LazyLock;
10
11use fsst::Compressor;
12use fsst::Decompressor;
13use fsst::Symbol;
14use prost::Message as _;
15use vortex_array::Array;
16use vortex_array::ArrayEq;
17use vortex_array::ArrayHash;
18use vortex_array::ArrayId;
19use vortex_array::ArrayParts;
20use vortex_array::ArrayRef;
21use vortex_array::ArraySlots;
22use vortex_array::ArrayView;
23use vortex_array::Canonical;
24use vortex_array::ExecutionCtx;
25use vortex_array::ExecutionResult;
26use vortex_array::IntoArray;
27use vortex_array::LEGACY_SESSION;
28use vortex_array::Precision;
29use vortex_array::TypedArrayRef;
30use vortex_array::VortexSessionExecute;
31use vortex_array::arrays::VarBin;
32use vortex_array::arrays::VarBinArray;
33use vortex_array::arrays::varbin::VarBinArrayExt;
34use vortex_array::buffer::BufferHandle;
35use vortex_array::builders::ArrayBuilder;
36use vortex_array::builders::VarBinViewBuilder;
37use vortex_array::dtype::DType;
38use vortex_array::dtype::Nullability;
39use vortex_array::dtype::PType;
40use vortex_array::serde::ArrayChildren;
41use vortex_array::smallvec::smallvec;
42use vortex_array::validity::Validity;
43use vortex_array::vtable::VTable;
44use vortex_array::vtable::ValidityVTable;
45use vortex_array::vtable::child_to_validity;
46use vortex_array::vtable::validity_to_child;
47use vortex_buffer::Buffer;
48use vortex_buffer::ByteBuffer;
49use vortex_error::VortexExpect;
50use vortex_error::VortexResult;
51use vortex_error::vortex_bail;
52use vortex_error::vortex_ensure;
53use vortex_error::vortex_err;
54use vortex_error::vortex_panic;
55use vortex_session::VortexSession;
56use vortex_session::registry::CachedId;
57
58use crate::canonical::canonicalize_fsst;
59use crate::canonical::fsst_decode_views;
60use crate::kernel::PARENT_KERNELS;
61use crate::rules::RULES;
62
/// A Vortex array encoded with the [`FSST`] encoding.
///
/// Strings are stored as FSST codes plus a symbol table; see [`FSSTData`] for the inner
/// data and `FSSTArrayExt` for slot-backed accessors.
pub type FSSTArray = Array<FSST>;
65
/// Protobuf-encoded (via `prost`) metadata persisted alongside a serialized FSST array.
///
/// Both fields hold raw [`PType`] enumeration tags as `i32`, as required by prost's
/// `enumeration` fields.
#[derive(Clone, prost::Message)]
pub struct FSSTMetadata {
    /// Primitive type tag of the `uncompressed_lengths` child.
    #[prost(enumeration = "PType", tag = "1")]
    uncompressed_lengths_ptype: i32,

    /// Primitive type tag of the codes offsets child.
    ///
    /// The legacy (2-buffer) format did not write this field, so it decodes as the protobuf
    /// default of `0` for such arrays.
    #[prost(enumeration = "PType", tag = "2")]
    codes_offsets_ptype: i32,
}
74
75impl FSSTMetadata {
76    pub fn get_uncompressed_lengths_ptype(&self) -> VortexResult<PType> {
77        PType::try_from(self.uncompressed_lengths_ptype)
78            .map_err(|_| vortex_err!("Invalid PType {}", self.uncompressed_lengths_ptype))
79    }
80}
81
82impl ArrayHash for FSSTData {
83    fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
84        self.symbols.array_hash(state, precision);
85        self.symbol_lengths.array_hash(state, precision);
86        self.codes_bytes.as_host().array_hash(state, precision);
87    }
88}
89
90impl ArrayEq for FSSTData {
91    fn array_eq(&self, other: &Self, precision: Precision) -> bool {
92        self.symbols.array_eq(&other.symbols, precision)
93            && self
94                .symbol_lengths
95                .array_eq(&other.symbol_lengths, precision)
96            && self
97                .codes_bytes
98                .as_host()
99                .array_eq(other.codes_bytes.as_host(), precision)
100    }
101}
102
103impl VTable for FSST {
104    type TypedArrayData = FSSTData;
105    type OperationsVTable = Self;
106    type ValidityVTable = Self;
107
108    fn id(&self) -> ArrayId {
109        static ID: CachedId = CachedId::new("vortex.fsst");
110        *ID
111    }
112
113    fn validate(
114        &self,
115        data: &Self::TypedArrayData,
116        dtype: &DType,
117        len: usize,
118        slots: &[Option<ArrayRef>],
119    ) -> VortexResult<()> {
120        // TODO(ctx): trait fixes - VTable::validate has a fixed signature.
121        let mut ctx = LEGACY_SESSION.create_execution_ctx();
122        data.validate(dtype, len, slots, &mut ctx)
123    }
124
125    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
126        3
127    }
128
129    fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
130        match idx {
131            0 => BufferHandle::new_host(array.symbols().clone().into_byte_buffer()),
132            1 => BufferHandle::new_host(array.symbol_lengths().clone().into_byte_buffer()),
133            2 => array.codes_bytes_handle().clone(),
134            _ => vortex_panic!("FSSTArray buffer index {idx} out of bounds"),
135        }
136    }
137
138    fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
139        match idx {
140            0 => Some("symbols".to_string()),
141            1 => Some("symbol_lengths".to_string()),
142            2 => Some("compressed_codes".to_string()),
143            _ => vortex_panic!("FSSTArray buffer_name index {idx} out of bounds"),
144        }
145    }
146
147    fn serialize(
148        array: ArrayView<'_, Self>,
149        _session: &VortexSession,
150    ) -> VortexResult<Option<Vec<u8>>> {
151        let codes_offsets = array.as_ref().slots()[CODES_OFFSETS_SLOT]
152            .as_ref()
153            .vortex_expect("FSSTArray codes_offsets slot");
154        Ok(Some(
155            FSSTMetadata {
156                uncompressed_lengths_ptype: array.uncompressed_lengths().dtype().as_ptype().into(),
157                codes_offsets_ptype: codes_offsets.dtype().as_ptype().into(),
158            }
159            .encode_to_vec(),
160        ))
161    }
162
163    /// Deserializes an FSST array from its serialized components.
164    ///
165    /// Supports two serialization formats:
166    ///
167    /// ## Legacy format (2 buffers, 2 children)
168    ///
169    /// The original FSST layout stored the compressed codes as a full `VarBinArray` child.
170    /// - **Buffers**: `[symbols, symbol_lengths]`
171    /// - **Children**: `[codes (VarBinArray), uncompressed_lengths (Primitive)]`
172    ///
173    /// The codes VarBinArray child is decomposed: its bytes become the `codes_bytes` buffer,
174    /// and its offsets/validity are extracted into slots.
175    /// See `FSST::deserialize_legacy`.
176    ///
177    /// ## Current format (3 buffers, 2-3 children)
178    ///
179    /// The current layout stores the compressed bytes as a raw buffer alongside the symbol
180    /// table, with offsets and validity as separate children.
181    /// - **Buffers**: `[symbols, symbol_lengths, compressed_codes_bytes]`
182    /// - **Children**: `[uncompressed_lengths, codes_offsets, (optional) codes_validity]`
183    ///
184    /// The `codes_bytes` buffer is stored directly in `FSSTData`. A `VarBinArray` for the
185    /// codes can be reconstructed on demand via [`FSSTArrayExt::codes()`] using the bytes
186    /// from `FSSTData` combined with offsets and validity from the array's slots.
187    fn deserialize(
188        &self,
189        dtype: &DType,
190        len: usize,
191        metadata: &[u8],
192        buffers: &[BufferHandle],
193        children: &dyn ArrayChildren,
194        session: &VortexSession,
195    ) -> VortexResult<ArrayParts<Self>> {
196        let metadata = FSSTMetadata::decode(metadata)?;
197        let symbols = Buffer::<Symbol>::from_byte_buffer(buffers[0].clone().try_to_host_sync()?);
198        let symbol_lengths = Buffer::<u8>::from_byte_buffer(buffers[1].clone().try_to_host_sync()?);
199
200        let mut ctx = session.create_execution_ctx();
201        if buffers.len() == 2 {
202            return Self::deserialize_legacy(
203                self,
204                dtype,
205                len,
206                &metadata,
207                &symbols,
208                &symbol_lengths,
209                children,
210                &mut ctx,
211            );
212        }
213
214        if buffers.len() == 3 {
215            let uncompressed_lengths = children.get(
216                0,
217                &DType::Primitive(
218                    metadata.get_uncompressed_lengths_ptype()?,
219                    Nullability::NonNullable,
220                ),
221                len,
222            )?;
223
224            let codes_bytes = buffers[2].clone();
225            let codes_offsets = children.get(
226                1,
227                &DType::Primitive(
228                    PType::try_from(metadata.codes_offsets_ptype)?,
229                    Nullability::NonNullable,
230                ),
231                // VarBin offsets are len + 1
232                len + 1,
233            )?;
234
235            let codes_validity = if children.len() == 2 {
236                Validity::from(dtype.nullability())
237            } else if children.len() == 3 {
238                let validity = children.get(2, &Validity::DTYPE, len)?;
239                Validity::Array(validity)
240            } else {
241                vortex_bail!("Expected 2 or 3 children, got {}", children.len());
242            };
243
244            FSSTData::validate_parts(
245                &symbols,
246                &symbol_lengths,
247                &codes_bytes,
248                &codes_offsets,
249                dtype.nullability(),
250                &uncompressed_lengths,
251                dtype,
252                len,
253                &mut ctx,
254            )?;
255            let slots = smallvec![
256                Some(uncompressed_lengths),
257                Some(codes_offsets),
258                validity_to_child(&codes_validity, len),
259            ];
260            let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?;
261            return Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots));
262        }
263
264        vortex_bail!(
265            "InvalidArgument: Expected 2 or 3 buffers, got {}",
266            buffers.len()
267        );
268    }
269
270    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
271        SLOT_NAMES[idx].to_string()
272    }
273
274    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
275        canonicalize_fsst(array.as_view(), ctx).map(ExecutionResult::done)
276    }
277
278    fn append_to_builder(
279        array: ArrayView<'_, Self>,
280        builder: &mut dyn ArrayBuilder,
281        ctx: &mut ExecutionCtx,
282    ) -> VortexResult<()> {
283        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
284            builder.extend_from_array(
285                &array
286                    .array()
287                    .clone()
288                    .execute::<Canonical>(ctx)?
289                    .into_array(),
290            );
291            return Ok(());
292        };
293
294        // Decompress the whole block of data into a new buffer, and create some views
295        // from it instead. The new buffer lands after any pending in-progress
296        // buffer that push_buffer_and_adjusted_views will flush first.
297        let next_buffer_index = builder.completed_block_count() + u32::from(builder.in_progress());
298        let (buffers, views) = fsst_decode_views(array, next_buffer_index, ctx)?;
299
300        builder.push_buffer_and_adjusted_views(
301            &buffers,
302            &views,
303            array
304                .array()
305                .validity()?
306                .execute_mask(array.array().len(), ctx)?,
307        );
308        Ok(())
309    }
310
311    fn execute_parent(
312        array: ArrayView<'_, Self>,
313        parent: &ArrayRef,
314        child_idx: usize,
315        ctx: &mut ExecutionCtx,
316    ) -> VortexResult<Option<ArrayRef>> {
317        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
318    }
319
320    fn reduce_parent(
321        array: ArrayView<'_, Self>,
322        parent: &ArrayRef,
323        child_idx: usize,
324    ) -> VortexResult<Option<ArrayRef>> {
325        RULES.evaluate(array, parent, child_idx)
326    }
327}
328
/// Slot index of the lengths of the original values before compression. This child array
/// may itself be compressed.
pub(crate) const UNCOMPRESSED_LENGTHS_SLOT: usize = 0;
/// Slot index of the offsets array for the FSST-compressed codes.
pub(crate) const CODES_OFFSETS_SLOT: usize = 1;
/// Slot index of the validity child for the compressed codes. The slot may be `None`, in which
/// case validity is derived from the outer dtype's nullability (see `child_to_validity`).
pub(crate) const CODES_VALIDITY_SLOT: usize = 2;
/// Total number of slots on an FSST array.
pub(crate) const NUM_SLOTS: usize = 3;
/// Display names for each slot, indexed by the `*_SLOT` constants above.
pub(crate) const SLOT_NAMES: [&str; NUM_SLOTS] =
    ["uncompressed_lengths", "codes_offsets", "codes_validity"];
338
/// The inner data for an FSST-compressed array.
///
/// Holds the FSST symbol table (`symbols` + `symbol_lengths`) and the raw compressed
/// codes bytes buffer. The codes offsets and validity live in the outer array's slots
/// (slots 1 and 2 respectively).
///
/// A full [`VarBinArray`] representing the codes can be reconstructed on demand via
/// [`FSSTArrayExt::codes()`], combining this buffer with the offsets/validity from slots.
#[derive(Clone)]
pub struct FSSTData {
    /// Symbol table entries; each [`Symbol`] is 8 bytes wide.
    symbols: Buffer<Symbol>,
    /// Number of meaningful bytes in the corresponding `symbols` entry. Validation requires
    /// this to have the same length as `symbols`.
    symbol_lengths: Buffer<u8>,
    /// The raw compressed codes bytes, equivalent to `VarBinData::bytes`.
    codes_bytes: BufferHandle,
    /// Cached length (number of elements).
    len: usize,

    /// Memoized compressor used for push-down of compute by compressing the RHS.
    ///
    /// Built lazily from the symbol table on first access and shared (via `Arc`) between
    /// clones of this data.
    compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
}
359
360impl Display for FSSTData {
361    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
362        write!(f, "len: {}, nsymbols: {}", self.len, self.symbols.len())
363    }
364}
365
366impl Debug for FSSTData {
367    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
368        f.debug_struct("FSSTArray")
369            .field("symbols", &self.symbols)
370            .field("symbol_lengths", &self.symbol_lengths)
371            .field("codes_bytes_len", &self.codes_bytes.len())
372            .field("len", &self.len)
373            .field("uncompressed_lengths", &"<outer slot>")
374            .field("codes_offsets", &"<outer slot>")
375            .field("codes_validity", &"<outer slot>")
376            .finish()
377    }
378}
379
/// The FSST encoding marker type.
///
/// Implements [`VTable`] (encoding id `"vortex.fsst"`) and [`ValidityVTable`] for
/// FSST-compressed `Binary` / `Utf8` arrays.
#[derive(Clone, Debug)]
pub struct FSST;
382
383impl FSST {
384    /// Build an FSST array from a set of `symbols` and `codes`.
385    ///
386    /// The `codes` VarBinArray is decomposed: its bytes are stored in [`FSSTData`], while
387    /// its offsets and validity become array slots. The codes VarBinArray can be
388    /// reconstructed on demand via [`FSSTArrayExt::codes()`].
389    pub fn try_new(
390        dtype: DType,
391        symbols: Buffer<Symbol>,
392        symbol_lengths: Buffer<u8>,
393        codes: VarBinArray,
394        uncompressed_lengths: ArrayRef,
395        ctx: &mut ExecutionCtx,
396    ) -> VortexResult<FSSTArray> {
397        let len = codes.len();
398        FSSTData::validate_parts_from_codes(
399            &symbols,
400            &symbol_lengths,
401            &codes,
402            &uncompressed_lengths,
403            &dtype,
404            len,
405            ctx,
406        )?;
407        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
408        let codes_bytes = codes.bytes_handle().clone();
409        let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?;
410        Ok(unsafe {
411            Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
412        })
413    }
414
415    /// Legacy deserialization path (2 buffers): the codes were stored as a full
416    /// `VarBinArray` child. We decompose the VarBinArray into its bytes (stored in
417    /// FSSTData) and offsets/validity (stored in slots).
418    #[allow(clippy::too_many_arguments)]
419    fn deserialize_legacy(
420        &self,
421        dtype: &DType,
422        len: usize,
423        metadata: &FSSTMetadata,
424        symbols: &Buffer<Symbol>,
425        symbol_lengths: &Buffer<u8>,
426        children: &dyn ArrayChildren,
427        ctx: &mut ExecutionCtx,
428    ) -> VortexResult<ArrayParts<Self>> {
429        if children.len() != 2 {
430            vortex_bail!(InvalidArgument: "Expected 2 children, got {}", children.len());
431        }
432        let codes = children.get(0, &DType::Binary(dtype.nullability()), len)?;
433        let codes: VarBinArray = codes
434            .as_opt::<VarBin>()
435            .ok_or_else(|| {
436                vortex_err!(
437                    "Expected VarBinArray for codes, got {}",
438                    codes.encoding_id()
439                )
440            })?
441            .into_owned();
442        let uncompressed_lengths = children.get(
443            1,
444            &DType::Primitive(
445                metadata.get_uncompressed_lengths_ptype()?,
446                Nullability::NonNullable,
447            ),
448            len,
449        )?;
450
451        FSSTData::validate_parts_from_codes(
452            symbols,
453            symbol_lengths,
454            &codes,
455            &uncompressed_lengths,
456            dtype,
457            len,
458            ctx,
459        )?;
460        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
461        let codes_bytes = codes.bytes_handle().clone();
462        let data = FSSTData::try_new(symbols.clone(), symbol_lengths.clone(), codes_bytes, len)?;
463        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
464    }
465
466    pub(crate) unsafe fn new_unchecked(
467        dtype: DType,
468        symbols: Buffer<Symbol>,
469        symbol_lengths: Buffer<u8>,
470        codes: VarBinArray,
471        uncompressed_lengths: ArrayRef,
472    ) -> FSSTArray {
473        let len = codes.len();
474        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
475        let codes_bytes = codes.bytes_handle().clone();
476        let data = unsafe { FSSTData::new_unchecked(symbols, symbol_lengths, codes_bytes, len) };
477        unsafe {
478            Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
479        }
480    }
481}
482
483impl FSSTData {
484    fn make_slots(codes: &VarBinArray, uncompressed_lengths: &ArrayRef) -> ArraySlots {
485        smallvec![
486            Some(uncompressed_lengths.clone()),
487            Some(codes.offsets().clone()),
488            validity_to_child(
489                &codes
490                    .validity()
491                    .vortex_expect("FSST codes validity should be derivable"),
492                codes.len(),
493            ),
494        ]
495    }
496
497    /// Build FSST data from a set of `symbols`, `symbol_lengths`, and compressed codes bytes.
498    ///
499    /// Symbols are 8-bytes and can represent short strings, each of which is assigned
500    /// a code.
501    ///
502    /// The `codes_bytes` buffer contains the concatenated compressed bytecodes for all elements.
503    /// Each element's compressed bytecodes are a sequence of 8-bit codes, where each code
504    /// corresponds either to a symbol or to the "escape code" (which tells the decoder to
505    /// emit the following byte without doing a table lookup).
506    ///
507    /// The offsets and validity for the codes are stored in the array's slots, not here.
508    /// Use [`FSSTArrayExt::codes()`] to reconstruct a full `VarBinArray`.
509    pub fn try_new(
510        symbols: Buffer<Symbol>,
511        symbol_lengths: Buffer<u8>,
512        codes_bytes: BufferHandle,
513        len: usize,
514    ) -> VortexResult<Self> {
515        // SAFETY: all components validated above
516        unsafe {
517            Ok(Self::new_unchecked(
518                symbols,
519                symbol_lengths,
520                codes_bytes,
521                len,
522            ))
523        }
524    }
525
526    pub fn validate(
527        &self,
528        dtype: &DType,
529        len: usize,
530        slots: &[Option<ArrayRef>],
531        ctx: &mut ExecutionCtx,
532    ) -> VortexResult<()> {
533        let codes_offsets = slots[CODES_OFFSETS_SLOT]
534            .as_ref()
535            .vortex_expect("FSSTArray codes_offsets slot");
536        Self::validate_parts(
537            &self.symbols,
538            &self.symbol_lengths,
539            &self.codes_bytes,
540            codes_offsets,
541            dtype.nullability(),
542            uncompressed_lengths_from_slots(slots),
543            dtype,
544            len,
545            ctx,
546        )
547    }
548
549    /// Validate using the decomposed components (codes bytes + offsets + nullability).
550    #[expect(clippy::too_many_arguments)]
551    fn validate_parts(
552        symbols: &Buffer<Symbol>,
553        symbol_lengths: &Buffer<u8>,
554        codes_bytes: &BufferHandle,
555        codes_offsets: &ArrayRef,
556        codes_nullability: Nullability,
557        uncompressed_lengths: &ArrayRef,
558        dtype: &DType,
559        len: usize,
560        ctx: &mut ExecutionCtx,
561    ) -> VortexResult<()> {
562        vortex_ensure!(
563            matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
564            "FSST arrays must be Binary or Utf8, found {dtype}"
565        );
566
567        if symbols.len() > 255 {
568            vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
569        }
570        if symbols.len() != symbol_lengths.len() {
571            vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
572        }
573
574        // codes_offsets.len() - 1 == number of elements
575        let codes_len = codes_offsets.len().saturating_sub(1);
576        if codes_len != len {
577            vortex_bail!(InvalidArgument: "codes must have same len as outer array");
578        }
579
580        if uncompressed_lengths.len() != len {
581            vortex_bail!(InvalidArgument: "uncompressed_lengths must be same len as codes");
582        }
583
584        if !uncompressed_lengths.dtype().is_int() || uncompressed_lengths.dtype().is_nullable() {
585            vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable, found {}", uncompressed_lengths.dtype());
586        }
587
588        // Offsets must be non-nullable integer.
589        if !codes_offsets.dtype().is_int() || codes_offsets.dtype().is_nullable() {
590            vortex_bail!(InvalidArgument: "codes offsets must be non-nullable integer type, found {}", codes_offsets.dtype());
591        }
592
593        if codes_nullability != dtype.nullability() {
594            vortex_bail!(InvalidArgument: "codes nullability must match outer dtype nullability");
595        }
596
597        // Validate that last offset doesn't exceed bytes length (when host-resident).
598        if codes_bytes.is_on_host() && codes_offsets.is_host() && !codes_offsets.is_empty() {
599            let last_offset: usize = (&codes_offsets
600                .execute_scalar(codes_offsets.len() - 1, ctx)
601                .vortex_expect("offsets must support scalar_at"))
602                .try_into()
603                .vortex_expect("Failed to convert offset to usize");
604            vortex_ensure!(
605                last_offset <= codes_bytes.len(),
606                InvalidArgument: "Last codes offset {} exceeds codes bytes length {}",
607                last_offset,
608                codes_bytes.len()
609            );
610        }
611
612        Ok(())
613    }
614
615    /// Validate using a VarBinArray for the codes (convenience for construction paths).
616    fn validate_parts_from_codes(
617        symbols: &Buffer<Symbol>,
618        symbol_lengths: &Buffer<u8>,
619        codes: &VarBinArray,
620        uncompressed_lengths: &ArrayRef,
621        dtype: &DType,
622        len: usize,
623        ctx: &mut ExecutionCtx,
624    ) -> VortexResult<()> {
625        Self::validate_parts(
626            symbols,
627            symbol_lengths,
628            codes.bytes_handle(),
629            codes.offsets(),
630            codes.dtype().nullability(),
631            uncompressed_lengths,
632            dtype,
633            len,
634            ctx,
635        )
636    }
637
638    pub(crate) unsafe fn new_unchecked(
639        symbols: Buffer<Symbol>,
640        symbol_lengths: Buffer<u8>,
641        codes_bytes: BufferHandle,
642        len: usize,
643    ) -> Self {
644        let symbols2 = symbols.clone();
645        let symbol_lengths2 = symbol_lengths.clone();
646        let compressor = Arc::new(LazyLock::new(Box::new(move || {
647            Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
648        })
649            as Box<dyn Fn() -> Compressor + Send>));
650        Self {
651            symbols,
652            symbol_lengths,
653            codes_bytes,
654            len,
655            compressor,
656        }
657    }
658
659    /// Returns the number of elements in the array.
660    pub fn len(&self) -> usize {
661        self.len
662    }
663
664    /// Returns `true` if the array contains no elements.
665    pub fn is_empty(&self) -> bool {
666        self.len == 0
667    }
668
669    /// Access the symbol table array.
670    pub fn symbols(&self) -> &Buffer<Symbol> {
671        &self.symbols
672    }
673
674    /// Access the symbol lengths array.
675    pub fn symbol_lengths(&self) -> &Buffer<u8> {
676        &self.symbol_lengths
677    }
678
679    /// Access the compressed codes bytes buffer handle (may be on host or device).
680    pub fn codes_bytes_handle(&self) -> &BufferHandle {
681        &self.codes_bytes
682    }
683
684    /// Access the compressed codes bytes on the host.
685    pub fn codes_bytes(&self) -> &ByteBuffer {
686        self.codes_bytes.as_host()
687    }
688
689    /// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
690    /// this array.
691    pub fn decompressor(&self) -> Decompressor<'_> {
692        Decompressor::new(self.symbols().as_slice(), self.symbol_lengths().as_slice())
693    }
694
695    /// Retrieves the FSST compressor.
696    pub fn compressor(&self) -> &Compressor {
697        self.compressor.as_ref()
698    }
699}
700
701fn uncompressed_lengths_from_slots(slots: &[Option<ArrayRef>]) -> &ArrayRef {
702    slots[UNCOMPRESSED_LENGTHS_SLOT]
703        .as_ref()
704        .vortex_expect("FSSTArray uncompressed_lengths slot")
705}
706
707pub trait FSSTArrayExt: TypedArrayRef<FSST> {
708    fn uncompressed_lengths(&self) -> &ArrayRef {
709        uncompressed_lengths_from_slots(self.as_ref().slots())
710    }
711
712    fn uncompressed_lengths_dtype(&self) -> &DType {
713        self.uncompressed_lengths().dtype()
714    }
715
716    /// Reconstruct a [`VarBinArray`] for the compressed codes by combining the bytes
717    /// from [`FSSTData`] with the offsets and validity stored in the array's slots.
718    fn codes(&self) -> VarBinArray {
719        let offsets = self.as_ref().slots()[CODES_OFFSETS_SLOT]
720            .as_ref()
721            .vortex_expect("FSSTArray codes_offsets slot")
722            .clone();
723        let validity = child_to_validity(
724            self.as_ref().slots()[CODES_VALIDITY_SLOT].as_ref(),
725            self.as_ref().dtype().nullability(),
726        );
727        let codes_bytes = self.codes_bytes_handle().clone();
728        // SAFETY: components were validated at construction time.
729        unsafe {
730            VarBinArray::new_unchecked_from_handle(
731                offsets,
732                codes_bytes,
733                DType::Binary(self.as_ref().dtype().nullability()),
734                validity,
735            )
736        }
737    }
738
739    /// Get the DType of the codes array.
740    fn codes_dtype(&self) -> DType {
741        DType::Binary(self.as_ref().dtype().nullability())
742    }
743}
744
745impl<T: TypedArrayRef<FSST>> FSSTArrayExt for T {}
746
747impl ValidityVTable<FSST> for FSST {
748    fn validity(array: ArrayView<'_, FSST>) -> VortexResult<Validity> {
749        Ok(child_to_validity(
750            array.slots()[CODES_VALIDITY_SLOT].as_ref(),
751            array.dtype().nullability(),
752        ))
753    }
754}
755
#[cfg(test)]
mod test {
    use fsst::Compressor;
    use fsst::Symbol;
    use prost::Message;
    use vortex_array::ArrayPlugin;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::buffer::BufferHandle;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::test_harness::check_metadata;
    use vortex_buffer::Buffer;
    use vortex_error::VortexError;

    use crate::FSST;
    use crate::array::FSSTArrayExt;
    use crate::array::FSSTMetadata;
    use crate::fsst_compress_iter;

    /// Runs the encoded `FSSTMetadata` bytes through the `check_metadata` test harness under
    /// the name `fsst.metadata`. Presumably this compares against a stored fixture so that
    /// wire-format changes are caught; confirm against the harness.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_fsst_metadata() {
        check_metadata(
            "fsst.metadata",
            &FSSTMetadata {
                uncompressed_lengths_ptype: PType::U64 as i32,
                codes_offsets_ptype: PType::I32 as i32,
            }
            .encode_to_vec(),
        );
    }

    /// The original FSST array stored codes as a VarBinArray child and required that the child
    /// have this encoding. Vortex forbids this kind of introspection, therefore we had to fix
    /// the array to store the compressed offsets and compressed data buffer separately, and only
    /// use VarBinArray to delegate behavior.
    ///
    /// This test manually constructs an old-style FSST array and ensures that it can still be
    /// deserialized.
    #[test]
    fn test_back_compat() {
        // A tiny two-symbol table: "abc" (3 bytes used) and "defghijk" (all 8 bytes used).
        let symbols = Buffer::<Symbol>::copy_from([
            Symbol::from_slice(b"abc00000"),
            Symbol::from_slice(b"defghijk"),
        ]);
        let symbol_lengths = Buffer::<u8>::copy_from([3, 8]);

        // Compress two values with that fixed table so the codes are known to reference it.
        let compressor = Compressor::rebuild_from(symbols.as_slice(), symbol_lengths.as_slice());
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let fsst_array = fsst_compress_iter(
            [Some(b"abcabcab".as_ref()), Some(b"defghijk".as_ref())].into_iter(),
            2,
            DType::Utf8(Nullability::NonNullable),
            &compressor,
            &mut ctx,
        );

        let compressed_codes = fsst_array.codes();

        // There were two buffers:
        // 1. The 8 byte symbols
        // 2. The symbol lengths as u8.
        let buffers = [
            BufferHandle::new_host(symbols.into_byte_buffer()),
            BufferHandle::new_host(symbol_lengths.into_byte_buffer()),
        ];

        // There were 2 children:
        // 1. The compressed codes, stored as a VarBinArray.
        // 2. The uncompressed lengths, stored as a Primitive array.
        let children = vec![
            compressed_codes.into_array(),
            fsst_array.uncompressed_lengths().clone(),
        ];

        // Passing only 2 buffers routes deserialization through the legacy path.
        let fsst = ArrayPlugin::deserialize(
            &FSST,
            &DType::Utf8(Nullability::NonNullable),
            2,
            &FSSTMetadata {
                uncompressed_lengths_ptype: fsst_array
                    .uncompressed_lengths()
                    .dtype()
                    .as_ptype()
                    .into(),
                // Legacy array did not store this field, use Protobuf default of 0.
                codes_offsets_ptype: 0,
            }
            .encode_to_vec(),
            &buffers,
            &children.as_slice(),
            &LEGACY_SESSION,
        )
        .unwrap();

        // Round-trip check: decompressing the legacy-deserialized array yields the inputs.
        let decompressed = fsst
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        decompressed
            .with_iterator(|it| {
                assert_eq!(it.next().unwrap(), Some(b"abcabcab".as_ref()));
                assert_eq!(it.next().unwrap(), Some(b"defghijk".as_ref()));
                Ok::<_, VortexError>(())
            })
            .unwrap()
    }
}