Skip to main content

vortex_fsst/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hasher;
8use std::sync::Arc;
9use std::sync::LazyLock;
10
11use fsst::Compressor;
12use fsst::Decompressor;
13use fsst::Symbol;
14use prost::Message as _;
15use vortex_array::Array;
16use vortex_array::ArrayEq;
17use vortex_array::ArrayHash;
18use vortex_array::ArrayId;
19use vortex_array::ArrayParts;
20use vortex_array::ArrayRef;
21use vortex_array::ArrayView;
22use vortex_array::Canonical;
23use vortex_array::ExecutionCtx;
24use vortex_array::ExecutionResult;
25use vortex_array::IntoArray;
26use vortex_array::LEGACY_SESSION;
27use vortex_array::Precision;
28use vortex_array::TypedArrayRef;
29use vortex_array::VortexSessionExecute;
30use vortex_array::arrays::VarBin;
31use vortex_array::arrays::VarBinArray;
32use vortex_array::arrays::varbin::VarBinArrayExt;
33use vortex_array::buffer::BufferHandle;
34use vortex_array::builders::ArrayBuilder;
35use vortex_array::builders::VarBinViewBuilder;
36use vortex_array::dtype::DType;
37use vortex_array::dtype::Nullability;
38use vortex_array::dtype::PType;
39use vortex_array::serde::ArrayChildren;
40use vortex_array::validity::Validity;
41use vortex_array::vtable::VTable;
42use vortex_array::vtable::ValidityVTable;
43use vortex_array::vtable::child_to_validity;
44use vortex_array::vtable::validity_to_child;
45use vortex_buffer::Buffer;
46use vortex_buffer::ByteBuffer;
47use vortex_error::VortexExpect;
48use vortex_error::VortexResult;
49use vortex_error::vortex_bail;
50use vortex_error::vortex_ensure;
51use vortex_error::vortex_err;
52use vortex_error::vortex_panic;
53use vortex_session::VortexSession;
54use vortex_session::registry::CachedId;
55
56use crate::canonical::canonicalize_fsst;
57use crate::canonical::fsst_decode_views;
58use crate::kernel::PARENT_KERNELS;
59use crate::rules::RULES;
60
/// A [`FSST`]-encoded Vortex array.
///
/// This is the generic [`Array`] container specialised to the [`FSST`] encoding.
pub type FSSTArray = Array<FSST>;
63
/// Protobuf-encoded metadata written alongside a serialized FSST array.
///
/// Both fields store a [`PType`] as its raw `i32` protobuf enumeration value.
#[derive(Clone, prost::Message)]
pub struct FSSTMetadata {
    /// Primitive type of the `uncompressed_lengths` child array.
    #[prost(enumeration = "PType", tag = "1")]
    uncompressed_lengths_ptype: i32,

    /// Primitive type of the `codes_offsets` child array.
    ///
    /// Arrays written in the legacy two-buffer layout did not store this field, so it
    /// decodes as the protobuf default of `0` for them.
    #[prost(enumeration = "PType", tag = "2")]
    codes_offsets_ptype: i32,
}
72
73impl FSSTMetadata {
74    pub fn get_uncompressed_lengths_ptype(&self) -> VortexResult<PType> {
75        PType::try_from(self.uncompressed_lengths_ptype)
76            .map_err(|_| vortex_err!("Invalid PType {}", self.uncompressed_lengths_ptype))
77    }
78}
79
80impl ArrayHash for FSSTData {
81    fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
82        self.symbols.array_hash(state, precision);
83        self.symbol_lengths.array_hash(state, precision);
84        self.codes_bytes.as_host().array_hash(state, precision);
85    }
86}
87
88impl ArrayEq for FSSTData {
89    fn array_eq(&self, other: &Self, precision: Precision) -> bool {
90        self.symbols.array_eq(&other.symbols, precision)
91            && self
92                .symbol_lengths
93                .array_eq(&other.symbol_lengths, precision)
94            && self
95                .codes_bytes
96                .as_host()
97                .array_eq(other.codes_bytes.as_host(), precision)
98    }
99}
100
101impl VTable for FSST {
102    type ArrayData = FSSTData;
103    type OperationsVTable = Self;
104    type ValidityVTable = Self;
105
106    fn id(&self) -> ArrayId {
107        static ID: CachedId = CachedId::new("vortex.fsst");
108        *ID
109    }
110
111    fn validate(
112        &self,
113        data: &Self::ArrayData,
114        dtype: &DType,
115        len: usize,
116        slots: &[Option<ArrayRef>],
117    ) -> VortexResult<()> {
118        // TODO(ctx): trait fixes - VTable::validate has a fixed signature.
119        let mut ctx = LEGACY_SESSION.create_execution_ctx();
120        data.validate(dtype, len, slots, &mut ctx)
121    }
122
123    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
124        3
125    }
126
127    fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
128        match idx {
129            0 => BufferHandle::new_host(array.symbols().clone().into_byte_buffer()),
130            1 => BufferHandle::new_host(array.symbol_lengths().clone().into_byte_buffer()),
131            2 => array.codes_bytes_handle().clone(),
132            _ => vortex_panic!("FSSTArray buffer index {idx} out of bounds"),
133        }
134    }
135
136    fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
137        match idx {
138            0 => Some("symbols".to_string()),
139            1 => Some("symbol_lengths".to_string()),
140            2 => Some("compressed_codes".to_string()),
141            _ => vortex_panic!("FSSTArray buffer_name index {idx} out of bounds"),
142        }
143    }
144
145    fn serialize(
146        array: ArrayView<'_, Self>,
147        _session: &VortexSession,
148    ) -> VortexResult<Option<Vec<u8>>> {
149        let codes_offsets = array.as_ref().slots()[CODES_OFFSETS_SLOT]
150            .as_ref()
151            .vortex_expect("FSSTArray codes_offsets slot");
152        Ok(Some(
153            FSSTMetadata {
154                uncompressed_lengths_ptype: array.uncompressed_lengths().dtype().as_ptype().into(),
155                codes_offsets_ptype: codes_offsets.dtype().as_ptype().into(),
156            }
157            .encode_to_vec(),
158        ))
159    }
160
161    /// Deserializes an FSST array from its serialized components.
162    ///
163    /// Supports two serialization formats:
164    ///
165    /// ## Legacy format (2 buffers, 2 children)
166    ///
167    /// The original FSST layout stored the compressed codes as a full `VarBinArray` child.
168    /// - **Buffers**: `[symbols, symbol_lengths]`
169    /// - **Children**: `[codes (VarBinArray), uncompressed_lengths (Primitive)]`
170    ///
171    /// The codes VarBinArray child is decomposed: its bytes become the `codes_bytes` buffer,
172    /// and its offsets/validity are extracted into slots.
173    /// See `FSST::deserialize_legacy`.
174    ///
175    /// ## Current format (3 buffers, 2-3 children)
176    ///
177    /// The current layout stores the compressed bytes as a raw buffer alongside the symbol
178    /// table, with offsets and validity as separate children.
179    /// - **Buffers**: `[symbols, symbol_lengths, compressed_codes_bytes]`
180    /// - **Children**: `[uncompressed_lengths, codes_offsets, (optional) codes_validity]`
181    ///
182    /// The `codes_bytes` buffer is stored directly in `FSSTData`. A `VarBinArray` for the
183    /// codes can be reconstructed on demand via [`FSSTArrayExt::codes()`] using the bytes
184    /// from `FSSTData` combined with offsets and validity from the array's slots.
185    fn deserialize(
186        &self,
187        dtype: &DType,
188        len: usize,
189        metadata: &[u8],
190        buffers: &[BufferHandle],
191        children: &dyn ArrayChildren,
192        session: &VortexSession,
193    ) -> VortexResult<ArrayParts<Self>> {
194        let metadata = FSSTMetadata::decode(metadata)?;
195        let symbols = Buffer::<Symbol>::from_byte_buffer(buffers[0].clone().try_to_host_sync()?);
196        let symbol_lengths = Buffer::<u8>::from_byte_buffer(buffers[1].clone().try_to_host_sync()?);
197
198        let mut ctx = session.create_execution_ctx();
199        if buffers.len() == 2 {
200            return Self::deserialize_legacy(
201                self,
202                dtype,
203                len,
204                &metadata,
205                &symbols,
206                &symbol_lengths,
207                children,
208                &mut ctx,
209            );
210        }
211
212        if buffers.len() == 3 {
213            let uncompressed_lengths = children.get(
214                0,
215                &DType::Primitive(
216                    metadata.get_uncompressed_lengths_ptype()?,
217                    Nullability::NonNullable,
218                ),
219                len,
220            )?;
221
222            let codes_bytes = buffers[2].clone();
223            let codes_offsets = children.get(
224                1,
225                &DType::Primitive(
226                    PType::try_from(metadata.codes_offsets_ptype)?,
227                    Nullability::NonNullable,
228                ),
229                // VarBin offsets are len + 1
230                len + 1,
231            )?;
232
233            let codes_validity = if children.len() == 2 {
234                Validity::from(dtype.nullability())
235            } else if children.len() == 3 {
236                let validity = children.get(2, &Validity::DTYPE, len)?;
237                Validity::Array(validity)
238            } else {
239                vortex_bail!("Expected 2 or 3 children, got {}", children.len());
240            };
241
242            FSSTData::validate_parts(
243                &symbols,
244                &symbol_lengths,
245                &codes_bytes,
246                &codes_offsets,
247                dtype.nullability(),
248                &uncompressed_lengths,
249                dtype,
250                len,
251                &mut ctx,
252            )?;
253            let slots = vec![
254                Some(uncompressed_lengths),
255                Some(codes_offsets),
256                validity_to_child(&codes_validity, len),
257            ];
258            let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?;
259            return Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots));
260        }
261
262        vortex_bail!(
263            "InvalidArgument: Expected 2 or 3 buffers, got {}",
264            buffers.len()
265        );
266    }
267
268    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
269        SLOT_NAMES[idx].to_string()
270    }
271
272    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
273        canonicalize_fsst(array.as_view(), ctx).map(ExecutionResult::done)
274    }
275
276    fn append_to_builder(
277        array: ArrayView<'_, Self>,
278        builder: &mut dyn ArrayBuilder,
279        ctx: &mut ExecutionCtx,
280    ) -> VortexResult<()> {
281        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
282            builder.extend_from_array(
283                &array
284                    .array()
285                    .clone()
286                    .execute::<Canonical>(ctx)?
287                    .into_array(),
288            );
289            return Ok(());
290        };
291
292        // Decompress the whole block of data into a new buffer, and create some views
293        // from it instead. The new buffer lands after any pending in-progress
294        // buffer that push_buffer_and_adjusted_views will flush first.
295        let next_buffer_index = builder.completed_block_count() + u32::from(builder.in_progress());
296        let (buffers, views) = fsst_decode_views(array, next_buffer_index, ctx)?;
297
298        builder.push_buffer_and_adjusted_views(
299            &buffers,
300            &views,
301            array
302                .array()
303                .validity()?
304                .execute_mask(array.array().len(), ctx)?,
305        );
306        Ok(())
307    }
308
309    fn execute_parent(
310        array: ArrayView<'_, Self>,
311        parent: &ArrayRef,
312        child_idx: usize,
313        ctx: &mut ExecutionCtx,
314    ) -> VortexResult<Option<ArrayRef>> {
315        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
316    }
317
318    fn reduce_parent(
319        array: ArrayView<'_, Self>,
320        parent: &ArrayRef,
321        child_idx: usize,
322    ) -> VortexResult<Option<ArrayRef>> {
323        RULES.evaluate(array, parent, child_idx)
324    }
325}
326
/// Slot index of the original (pre-compression) value lengths; this child can itself be
/// compressed.
pub(crate) const UNCOMPRESSED_LENGTHS_SLOT: usize = 0;
/// Slot index of the offsets into the FSST-compressed codes bytes.
pub(crate) const CODES_OFFSETS_SLOT: usize = 1;
/// Slot index of the validity bitmap of the compressed codes.
pub(crate) const CODES_VALIDITY_SLOT: usize = 2;
/// Total number of slots of an FSST array.
pub(crate) const NUM_SLOTS: usize = 3;
/// Human-readable slot names, indexed by the slot constants above.
pub(crate) const SLOT_NAMES: [&str; NUM_SLOTS] = [
    "uncompressed_lengths",
    "codes_offsets",
    "codes_validity",
];
336
/// The inner data for an FSST-compressed array.
///
/// Holds the FSST symbol table (`symbols` + `symbol_lengths`) and the raw compressed
/// codes bytes buffer. The codes offsets and validity live in the outer array's slots
/// (slots 1 and 2 respectively).
///
/// A full [`VarBinArray`] representing the codes can be reconstructed on demand via
/// [`FSSTArrayExt::codes()`], combining this buffer with the offsets/validity from slots.
#[derive(Clone)]
pub struct FSSTData {
    /// The symbol table entries; validation allows at most 255 of them.
    symbols: Buffer<Symbol>,
    /// One length per entry of `symbols`; both buffers must have the same length.
    symbol_lengths: Buffer<u8>,
    /// The raw compressed codes bytes, equivalent to `VarBinData::bytes`.
    codes_bytes: BufferHandle,
    /// Cached length (number of elements).
    len: usize,

    /// Memoized compressor used for push-down of compute by compressing the RHS.
    ///
    /// Built lazily from `symbols` and `symbol_lengths` on first access.
    compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
}
357
358impl Display for FSSTData {
359    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
360        write!(f, "len: {}, nsymbols: {}", self.len, self.symbols.len())
361    }
362}
363
364impl Debug for FSSTData {
365    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
366        f.debug_struct("FSSTArray")
367            .field("symbols", &self.symbols)
368            .field("symbol_lengths", &self.symbol_lengths)
369            .field("codes_bytes_len", &self.codes_bytes.len())
370            .field("len", &self.len)
371            .field("uncompressed_lengths", &"<outer slot>")
372            .field("codes_offsets", &"<outer slot>")
373            .field("codes_validity", &"<outer slot>")
374            .finish()
375    }
376}
377
/// Marker type for the FSST encoding.
///
/// It carries no state; the per-array payload is [`FSSTData`].
#[derive(Clone, Debug)]
pub struct FSST;
380
381impl FSST {
382    /// Build an FSST array from a set of `symbols` and `codes`.
383    ///
384    /// The `codes` VarBinArray is decomposed: its bytes are stored in [`FSSTData`], while
385    /// its offsets and validity become array slots. The codes VarBinArray can be
386    /// reconstructed on demand via [`FSSTArrayExt::codes()`].
387    pub fn try_new(
388        dtype: DType,
389        symbols: Buffer<Symbol>,
390        symbol_lengths: Buffer<u8>,
391        codes: VarBinArray,
392        uncompressed_lengths: ArrayRef,
393        ctx: &mut ExecutionCtx,
394    ) -> VortexResult<FSSTArray> {
395        let len = codes.len();
396        FSSTData::validate_parts_from_codes(
397            &symbols,
398            &symbol_lengths,
399            &codes,
400            &uncompressed_lengths,
401            &dtype,
402            len,
403            ctx,
404        )?;
405        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
406        let codes_bytes = codes.bytes_handle().clone();
407        let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?;
408        Ok(unsafe {
409            Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
410        })
411    }
412
413    /// Legacy deserialization path (2 buffers): the codes were stored as a full
414    /// `VarBinArray` child. We decompose the VarBinArray into its bytes (stored in
415    /// FSSTData) and offsets/validity (stored in slots).
416    #[allow(clippy::too_many_arguments)]
417    fn deserialize_legacy(
418        &self,
419        dtype: &DType,
420        len: usize,
421        metadata: &FSSTMetadata,
422        symbols: &Buffer<Symbol>,
423        symbol_lengths: &Buffer<u8>,
424        children: &dyn ArrayChildren,
425        ctx: &mut ExecutionCtx,
426    ) -> VortexResult<ArrayParts<Self>> {
427        if children.len() != 2 {
428            vortex_bail!(InvalidArgument: "Expected 2 children, got {}", children.len());
429        }
430        let codes = children.get(0, &DType::Binary(dtype.nullability()), len)?;
431        let codes: VarBinArray = codes
432            .as_opt::<VarBin>()
433            .ok_or_else(|| {
434                vortex_err!(
435                    "Expected VarBinArray for codes, got {}",
436                    codes.encoding_id()
437                )
438            })?
439            .into_owned();
440        let uncompressed_lengths = children.get(
441            1,
442            &DType::Primitive(
443                metadata.get_uncompressed_lengths_ptype()?,
444                Nullability::NonNullable,
445            ),
446            len,
447        )?;
448
449        FSSTData::validate_parts_from_codes(
450            symbols,
451            symbol_lengths,
452            &codes,
453            &uncompressed_lengths,
454            dtype,
455            len,
456            ctx,
457        )?;
458        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
459        let codes_bytes = codes.bytes_handle().clone();
460        let data = FSSTData::try_new(symbols.clone(), symbol_lengths.clone(), codes_bytes, len)?;
461        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
462    }
463
464    pub(crate) unsafe fn new_unchecked(
465        dtype: DType,
466        symbols: Buffer<Symbol>,
467        symbol_lengths: Buffer<u8>,
468        codes: VarBinArray,
469        uncompressed_lengths: ArrayRef,
470    ) -> FSSTArray {
471        let len = codes.len();
472        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
473        let codes_bytes = codes.bytes_handle().clone();
474        let data = unsafe { FSSTData::new_unchecked(symbols, symbol_lengths, codes_bytes, len) };
475        unsafe {
476            Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
477        }
478    }
479}
480
481impl FSSTData {
482    fn make_slots(codes: &VarBinArray, uncompressed_lengths: &ArrayRef) -> Vec<Option<ArrayRef>> {
483        vec![
484            Some(uncompressed_lengths.clone()),
485            Some(codes.offsets().clone()),
486            validity_to_child(
487                &codes
488                    .validity()
489                    .vortex_expect("FSST codes validity should be derivable"),
490                codes.len(),
491            ),
492        ]
493    }
494
495    /// Build FSST data from a set of `symbols`, `symbol_lengths`, and compressed codes bytes.
496    ///
497    /// Symbols are 8-bytes and can represent short strings, each of which is assigned
498    /// a code.
499    ///
500    /// The `codes_bytes` buffer contains the concatenated compressed bytecodes for all elements.
501    /// Each element's compressed bytecodes are a sequence of 8-bit codes, where each code
502    /// corresponds either to a symbol or to the "escape code" (which tells the decoder to
503    /// emit the following byte without doing a table lookup).
504    ///
505    /// The offsets and validity for the codes are stored in the array's slots, not here.
506    /// Use [`FSSTArrayExt::codes()`] to reconstruct a full `VarBinArray`.
507    pub fn try_new(
508        symbols: Buffer<Symbol>,
509        symbol_lengths: Buffer<u8>,
510        codes_bytes: BufferHandle,
511        len: usize,
512    ) -> VortexResult<Self> {
513        // SAFETY: all components validated above
514        unsafe {
515            Ok(Self::new_unchecked(
516                symbols,
517                symbol_lengths,
518                codes_bytes,
519                len,
520            ))
521        }
522    }
523
524    pub fn validate(
525        &self,
526        dtype: &DType,
527        len: usize,
528        slots: &[Option<ArrayRef>],
529        ctx: &mut ExecutionCtx,
530    ) -> VortexResult<()> {
531        let codes_offsets = slots[CODES_OFFSETS_SLOT]
532            .as_ref()
533            .vortex_expect("FSSTArray codes_offsets slot");
534        Self::validate_parts(
535            &self.symbols,
536            &self.symbol_lengths,
537            &self.codes_bytes,
538            codes_offsets,
539            dtype.nullability(),
540            uncompressed_lengths_from_slots(slots),
541            dtype,
542            len,
543            ctx,
544        )
545    }
546
547    /// Validate using the decomposed components (codes bytes + offsets + nullability).
548    #[expect(clippy::too_many_arguments)]
549    fn validate_parts(
550        symbols: &Buffer<Symbol>,
551        symbol_lengths: &Buffer<u8>,
552        codes_bytes: &BufferHandle,
553        codes_offsets: &ArrayRef,
554        codes_nullability: Nullability,
555        uncompressed_lengths: &ArrayRef,
556        dtype: &DType,
557        len: usize,
558        ctx: &mut ExecutionCtx,
559    ) -> VortexResult<()> {
560        vortex_ensure!(
561            matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
562            "FSST arrays must be Binary or Utf8, found {dtype}"
563        );
564
565        if symbols.len() > 255 {
566            vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
567        }
568        if symbols.len() != symbol_lengths.len() {
569            vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
570        }
571
572        // codes_offsets.len() - 1 == number of elements
573        let codes_len = codes_offsets.len().saturating_sub(1);
574        if codes_len != len {
575            vortex_bail!(InvalidArgument: "codes must have same len as outer array");
576        }
577
578        if uncompressed_lengths.len() != len {
579            vortex_bail!(InvalidArgument: "uncompressed_lengths must be same len as codes");
580        }
581
582        if !uncompressed_lengths.dtype().is_int() || uncompressed_lengths.dtype().is_nullable() {
583            vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable, found {}", uncompressed_lengths.dtype());
584        }
585
586        // Offsets must be non-nullable integer.
587        if !codes_offsets.dtype().is_int() || codes_offsets.dtype().is_nullable() {
588            vortex_bail!(InvalidArgument: "codes offsets must be non-nullable integer type, found {}", codes_offsets.dtype());
589        }
590
591        if codes_nullability != dtype.nullability() {
592            vortex_bail!(InvalidArgument: "codes nullability must match outer dtype nullability");
593        }
594
595        // Validate that last offset doesn't exceed bytes length (when host-resident).
596        if codes_bytes.is_on_host() && codes_offsets.is_host() && !codes_offsets.is_empty() {
597            let last_offset: usize = (&codes_offsets
598                .execute_scalar(codes_offsets.len() - 1, ctx)
599                .vortex_expect("offsets must support scalar_at"))
600                .try_into()
601                .vortex_expect("Failed to convert offset to usize");
602            vortex_ensure!(
603                last_offset <= codes_bytes.len(),
604                InvalidArgument: "Last codes offset {} exceeds codes bytes length {}",
605                last_offset,
606                codes_bytes.len()
607            );
608        }
609
610        Ok(())
611    }
612
613    /// Validate using a VarBinArray for the codes (convenience for construction paths).
614    fn validate_parts_from_codes(
615        symbols: &Buffer<Symbol>,
616        symbol_lengths: &Buffer<u8>,
617        codes: &VarBinArray,
618        uncompressed_lengths: &ArrayRef,
619        dtype: &DType,
620        len: usize,
621        ctx: &mut ExecutionCtx,
622    ) -> VortexResult<()> {
623        Self::validate_parts(
624            symbols,
625            symbol_lengths,
626            codes.bytes_handle(),
627            codes.offsets(),
628            codes.dtype().nullability(),
629            uncompressed_lengths,
630            dtype,
631            len,
632            ctx,
633        )
634    }
635
636    pub(crate) unsafe fn new_unchecked(
637        symbols: Buffer<Symbol>,
638        symbol_lengths: Buffer<u8>,
639        codes_bytes: BufferHandle,
640        len: usize,
641    ) -> Self {
642        let symbols2 = symbols.clone();
643        let symbol_lengths2 = symbol_lengths.clone();
644        let compressor = Arc::new(LazyLock::new(Box::new(move || {
645            Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
646        })
647            as Box<dyn Fn() -> Compressor + Send>));
648        Self {
649            symbols,
650            symbol_lengths,
651            codes_bytes,
652            len,
653            compressor,
654        }
655    }
656
657    /// Returns the number of elements in the array.
658    pub fn len(&self) -> usize {
659        self.len
660    }
661
662    /// Returns `true` if the array contains no elements.
663    pub fn is_empty(&self) -> bool {
664        self.len == 0
665    }
666
667    /// Access the symbol table array.
668    pub fn symbols(&self) -> &Buffer<Symbol> {
669        &self.symbols
670    }
671
672    /// Access the symbol lengths array.
673    pub fn symbol_lengths(&self) -> &Buffer<u8> {
674        &self.symbol_lengths
675    }
676
677    /// Access the compressed codes bytes buffer handle (may be on host or device).
678    pub fn codes_bytes_handle(&self) -> &BufferHandle {
679        &self.codes_bytes
680    }
681
682    /// Access the compressed codes bytes on the host.
683    pub fn codes_bytes(&self) -> &ByteBuffer {
684        self.codes_bytes.as_host()
685    }
686
687    /// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
688    /// this array.
689    pub fn decompressor(&self) -> Decompressor<'_> {
690        Decompressor::new(self.symbols().as_slice(), self.symbol_lengths().as_slice())
691    }
692
693    /// Retrieves the FSST compressor.
694    pub fn compressor(&self) -> &Compressor {
695        self.compressor.as_ref()
696    }
697}
698
699fn uncompressed_lengths_from_slots(slots: &[Option<ArrayRef>]) -> &ArrayRef {
700    slots[UNCOMPRESSED_LENGTHS_SLOT]
701        .as_ref()
702        .vortex_expect("FSSTArray uncompressed_lengths slot")
703}
704
705pub trait FSSTArrayExt: TypedArrayRef<FSST> {
706    fn uncompressed_lengths(&self) -> &ArrayRef {
707        uncompressed_lengths_from_slots(self.as_ref().slots())
708    }
709
710    fn uncompressed_lengths_dtype(&self) -> &DType {
711        self.uncompressed_lengths().dtype()
712    }
713
714    /// Reconstruct a [`VarBinArray`] for the compressed codes by combining the bytes
715    /// from [`FSSTData`] with the offsets and validity stored in the array's slots.
716    fn codes(&self) -> VarBinArray {
717        let offsets = self.as_ref().slots()[CODES_OFFSETS_SLOT]
718            .as_ref()
719            .vortex_expect("FSSTArray codes_offsets slot")
720            .clone();
721        let validity = child_to_validity(
722            self.as_ref().slots()[CODES_VALIDITY_SLOT].as_ref(),
723            self.as_ref().dtype().nullability(),
724        );
725        let codes_bytes = self.codes_bytes_handle().clone();
726        // SAFETY: components were validated at construction time.
727        unsafe {
728            VarBinArray::new_unchecked_from_handle(
729                offsets,
730                codes_bytes,
731                DType::Binary(self.as_ref().dtype().nullability()),
732                validity,
733            )
734        }
735    }
736
737    /// Get the DType of the codes array.
738    fn codes_dtype(&self) -> DType {
739        DType::Binary(self.as_ref().dtype().nullability())
740    }
741}
742
743impl<T: TypedArrayRef<FSST>> FSSTArrayExt for T {}
744
745impl ValidityVTable<FSST> for FSST {
746    fn validity(array: ArrayView<'_, FSST>) -> VortexResult<Validity> {
747        Ok(child_to_validity(
748            array.slots()[CODES_VALIDITY_SLOT].as_ref(),
749            array.dtype().nullability(),
750        ))
751    }
752}
753
#[cfg(test)]
mod test {
    use fsst::Compressor;
    use fsst::Symbol;
    use prost::Message;
    use vortex_array::ArrayPlugin;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::buffer::BufferHandle;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::test_harness::check_metadata;
    use vortex_buffer::Buffer;
    use vortex_error::VortexError;

    use crate::FSST;
    use crate::array::FSSTArrayExt;
    use crate::array::FSSTMetadata;
    use crate::fsst_compress_iter;

    /// Checks the encoded metadata bytes against the stored `fsst.metadata` fixture.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_fsst_metadata() {
        let metadata = FSSTMetadata {
            uncompressed_lengths_ptype: PType::U64 as i32,
            codes_offsets_ptype: PType::I32 as i32,
        };
        check_metadata("fsst.metadata", &metadata.encode_to_vec());
    }

    /// Originally an FSST array kept its codes as a VarBinArray child and insisted on that
    /// child's encoding. Vortex does not allow that kind of introspection, so the array now
    /// stores the compressed offsets and the compressed data buffer separately and only uses
    /// VarBinArray to delegate behavior.
    ///
    /// This test hand-builds an array in the old layout and verifies that it still
    /// deserializes and decompresses correctly.
    #[test]
    fn test_back_compat() {
        let symbols = Buffer::<Symbol>::copy_from([
            Symbol::from_slice(b"abc00000"),
            Symbol::from_slice(b"defghijk"),
        ]);
        let symbol_lengths = Buffer::<u8>::copy_from([3, 8]);

        let compressor = Compressor::rebuild_from(symbols.as_slice(), symbol_lengths.as_slice());
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let fsst_array = fsst_compress_iter(
            [Some(b"abcabcab".as_ref()), Some(b"defghijk".as_ref())].into_iter(),
            2,
            DType::Utf8(Nullability::NonNullable),
            &compressor,
            &mut ctx,
        );

        // The legacy layout had two buffers: the 8-byte symbols, then the symbol
        // lengths as u8.
        let buffers = [
            BufferHandle::new_host(symbols.into_byte_buffer()),
            BufferHandle::new_host(symbol_lengths.into_byte_buffer()),
        ];

        // ...and two children: the compressed codes as a VarBinArray, then the
        // uncompressed lengths as a Primitive array.
        let codes_child = fsst_array.codes().into_array();
        let lengths_child = fsst_array.uncompressed_lengths().clone();
        let children = vec![codes_child, lengths_child];

        let legacy_metadata = FSSTMetadata {
            uncompressed_lengths_ptype: fsst_array
                .uncompressed_lengths()
                .dtype()
                .as_ptype()
                .into(),
            // Legacy array did not store this field, use Protobuf default of 0.
            codes_offsets_ptype: 0,
        }
        .encode_to_vec();

        let fsst = ArrayPlugin::deserialize(
            &FSST,
            &DType::Utf8(Nullability::NonNullable),
            2,
            &legacy_metadata,
            &buffers,
            &children.as_slice(),
            &LEGACY_SESSION,
        )
        .unwrap();

        let mut decode_ctx = LEGACY_SESSION.create_execution_ctx();
        let decompressed = fsst.execute::<VarBinViewArray>(&mut decode_ctx).unwrap();
        decompressed
            .with_iterator(|it| {
                assert_eq!(it.next().unwrap(), Some(b"abcabcab".as_ref()));
                assert_eq!(it.next().unwrap(), Some(b"defghijk".as_ref()));
                Ok::<_, VortexError>(())
            })
            .unwrap()
    }
}