Skip to main content

vortex_fsst/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hasher;
8use std::sync::Arc;
9use std::sync::LazyLock;
10
11use fsst::Compressor;
12use fsst::Decompressor;
13use fsst::Symbol;
14use prost::Message as _;
15use vortex_array::Array;
16use vortex_array::ArrayEq;
17use vortex_array::ArrayHash;
18use vortex_array::ArrayId;
19use vortex_array::ArrayParts;
20use vortex_array::ArrayRef;
21use vortex_array::ArrayView;
22use vortex_array::Canonical;
23use vortex_array::ExecutionCtx;
24use vortex_array::ExecutionResult;
25use vortex_array::IntoArray;
26use vortex_array::LEGACY_SESSION;
27use vortex_array::Precision;
28use vortex_array::TypedArrayRef;
29use vortex_array::VortexSessionExecute;
30use vortex_array::arrays::VarBin;
31use vortex_array::arrays::VarBinArray;
32use vortex_array::arrays::varbin::VarBinArrayExt;
33use vortex_array::buffer::BufferHandle;
34use vortex_array::builders::ArrayBuilder;
35use vortex_array::builders::VarBinViewBuilder;
36use vortex_array::dtype::DType;
37use vortex_array::dtype::Nullability;
38use vortex_array::dtype::PType;
39use vortex_array::serde::ArrayChildren;
40use vortex_array::validity::Validity;
41use vortex_array::vtable::VTable;
42use vortex_array::vtable::ValidityVTable;
43use vortex_array::vtable::child_to_validity;
44use vortex_array::vtable::validity_to_child;
45use vortex_buffer::Buffer;
46use vortex_buffer::ByteBuffer;
47use vortex_error::VortexExpect;
48use vortex_error::VortexResult;
49use vortex_error::vortex_bail;
50use vortex_error::vortex_ensure;
51use vortex_error::vortex_err;
52use vortex_error::vortex_panic;
53use vortex_session::VortexSession;
54use vortex_session::registry::CachedId;
55
56use crate::canonical::canonicalize_fsst;
57use crate::canonical::fsst_decode_views;
58use crate::kernel::PARENT_KERNELS;
59use crate::rules::RULES;
60
61/// A [`FSST`]-encoded Vortex array.
62pub type FSSTArray = Array<FSST>;
63
64#[derive(Clone, prost::Message)]
65pub struct FSSTMetadata {
66    #[prost(enumeration = "PType", tag = "1")]
67    uncompressed_lengths_ptype: i32,
68
69    #[prost(enumeration = "PType", tag = "2")]
70    codes_offsets_ptype: i32,
71}
72
73impl FSSTMetadata {
74    pub fn get_uncompressed_lengths_ptype(&self) -> VortexResult<PType> {
75        PType::try_from(self.uncompressed_lengths_ptype)
76            .map_err(|_| vortex_err!("Invalid PType {}", self.uncompressed_lengths_ptype))
77    }
78}
79
80impl ArrayHash for FSSTData {
81    fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
82        self.symbols.array_hash(state, precision);
83        self.symbol_lengths.array_hash(state, precision);
84        self.codes_bytes.as_host().array_hash(state, precision);
85    }
86}
87
88impl ArrayEq for FSSTData {
89    fn array_eq(&self, other: &Self, precision: Precision) -> bool {
90        self.symbols.array_eq(&other.symbols, precision)
91            && self
92                .symbol_lengths
93                .array_eq(&other.symbol_lengths, precision)
94            && self
95                .codes_bytes
96                .as_host()
97                .array_eq(other.codes_bytes.as_host(), precision)
98    }
99}
100
101impl VTable for FSST {
102    type ArrayData = FSSTData;
103    type OperationsVTable = Self;
104    type ValidityVTable = Self;
105
106    fn id(&self) -> ArrayId {
107        static ID: CachedId = CachedId::new("vortex.fsst");
108        *ID
109    }
110
111    fn validate(
112        &self,
113        data: &Self::ArrayData,
114        dtype: &DType,
115        len: usize,
116        slots: &[Option<ArrayRef>],
117    ) -> VortexResult<()> {
118        data.validate(dtype, len, slots)
119    }
120
121    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
122        3
123    }
124
125    fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
126        match idx {
127            0 => BufferHandle::new_host(array.symbols().clone().into_byte_buffer()),
128            1 => BufferHandle::new_host(array.symbol_lengths().clone().into_byte_buffer()),
129            2 => array.codes_bytes_handle().clone(),
130            _ => vortex_panic!("FSSTArray buffer index {idx} out of bounds"),
131        }
132    }
133
134    fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
135        match idx {
136            0 => Some("symbols".to_string()),
137            1 => Some("symbol_lengths".to_string()),
138            2 => Some("compressed_codes".to_string()),
139            _ => vortex_panic!("FSSTArray buffer_name index {idx} out of bounds"),
140        }
141    }
142
143    fn serialize(
144        array: ArrayView<'_, Self>,
145        _session: &VortexSession,
146    ) -> VortexResult<Option<Vec<u8>>> {
147        let codes_offsets = array.as_ref().slots()[CODES_OFFSETS_SLOT]
148            .as_ref()
149            .vortex_expect("FSSTArray codes_offsets slot");
150        Ok(Some(
151            FSSTMetadata {
152                uncompressed_lengths_ptype: array.uncompressed_lengths().dtype().as_ptype().into(),
153                codes_offsets_ptype: codes_offsets.dtype().as_ptype().into(),
154            }
155            .encode_to_vec(),
156        ))
157    }
158
159    /// Deserializes an FSST array from its serialized components.
160    ///
161    /// Supports two serialization formats:
162    ///
163    /// ## Legacy format (2 buffers, 2 children)
164    ///
165    /// The original FSST layout stored the compressed codes as a full `VarBinArray` child.
166    /// - **Buffers**: `[symbols, symbol_lengths]`
167    /// - **Children**: `[codes (VarBinArray), uncompressed_lengths (Primitive)]`
168    ///
169    /// The codes VarBinArray child is decomposed: its bytes become the `codes_bytes` buffer,
170    /// and its offsets/validity are extracted into slots.
171    /// See `FSST::deserialize_legacy`.
172    ///
173    /// ## Current format (3 buffers, 2-3 children)
174    ///
175    /// The current layout stores the compressed bytes as a raw buffer alongside the symbol
176    /// table, with offsets and validity as separate children.
177    /// - **Buffers**: `[symbols, symbol_lengths, compressed_codes_bytes]`
178    /// - **Children**: `[uncompressed_lengths, codes_offsets, (optional) codes_validity]`
179    ///
180    /// The `codes_bytes` buffer is stored directly in `FSSTData`. A `VarBinArray` for the
181    /// codes can be reconstructed on demand via [`FSSTArrayExt::codes()`] using the bytes
182    /// from `FSSTData` combined with offsets and validity from the array's slots.
183    fn deserialize(
184        &self,
185        dtype: &DType,
186        len: usize,
187        metadata: &[u8],
188        buffers: &[BufferHandle],
189        children: &dyn ArrayChildren,
190        _session: &VortexSession,
191    ) -> VortexResult<ArrayParts<Self>> {
192        let metadata = FSSTMetadata::decode(metadata)?;
193        let symbols = Buffer::<Symbol>::from_byte_buffer(buffers[0].clone().try_to_host_sync()?);
194        let symbol_lengths = Buffer::<u8>::from_byte_buffer(buffers[1].clone().try_to_host_sync()?);
195
196        if buffers.len() == 2 {
197            return Self::deserialize_legacy(
198                self,
199                dtype,
200                len,
201                &metadata,
202                &symbols,
203                &symbol_lengths,
204                children,
205            );
206        }
207
208        if buffers.len() == 3 {
209            let uncompressed_lengths = children.get(
210                0,
211                &DType::Primitive(
212                    metadata.get_uncompressed_lengths_ptype()?,
213                    Nullability::NonNullable,
214                ),
215                len,
216            )?;
217
218            let codes_bytes = buffers[2].clone();
219            let codes_offsets = children.get(
220                1,
221                &DType::Primitive(
222                    PType::try_from(metadata.codes_offsets_ptype)?,
223                    Nullability::NonNullable,
224                ),
225                // VarBin offsets are len + 1
226                len + 1,
227            )?;
228
229            let codes_validity = if children.len() == 2 {
230                Validity::from(dtype.nullability())
231            } else if children.len() == 3 {
232                let validity = children.get(2, &Validity::DTYPE, len)?;
233                Validity::Array(validity)
234            } else {
235                vortex_bail!("Expected 2 or 3 children, got {}", children.len());
236            };
237
238            FSSTData::validate_parts(
239                &symbols,
240                &symbol_lengths,
241                &codes_bytes,
242                &codes_offsets,
243                dtype.nullability(),
244                &uncompressed_lengths,
245                dtype,
246                len,
247            )?;
248            let slots = vec![
249                Some(uncompressed_lengths),
250                Some(codes_offsets),
251                validity_to_child(&codes_validity, len),
252            ];
253            let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?;
254            return Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots));
255        }
256
257        vortex_bail!(
258            "InvalidArgument: Expected 2 or 3 buffers, got {}",
259            buffers.len()
260        );
261    }
262
263    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
264        SLOT_NAMES[idx].to_string()
265    }
266
267    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
268        canonicalize_fsst(array.as_view(), ctx).map(ExecutionResult::done)
269    }
270
271    fn append_to_builder(
272        array: ArrayView<'_, Self>,
273        builder: &mut dyn ArrayBuilder,
274        ctx: &mut ExecutionCtx,
275    ) -> VortexResult<()> {
276        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
277            builder.extend_from_array(
278                &array
279                    .array()
280                    .clone()
281                    .execute::<Canonical>(ctx)?
282                    .into_array(),
283            );
284            return Ok(());
285        };
286
287        // Decompress the whole block of data into a new buffer, and create some views
288        // from it instead. The new buffer lands after any pending in-progress
289        // buffer that push_buffer_and_adjusted_views will flush first.
290        let next_buffer_index = builder.completed_block_count() + u32::from(builder.in_progress());
291        let (buffers, views) = fsst_decode_views(array, next_buffer_index, ctx)?;
292
293        builder.push_buffer_and_adjusted_views(
294            &buffers,
295            &views,
296            array
297                .array()
298                .validity()?
299                .to_mask(array.array().len(), ctx)?,
300        );
301        Ok(())
302    }
303
304    fn execute_parent(
305        array: ArrayView<'_, Self>,
306        parent: &ArrayRef,
307        child_idx: usize,
308        ctx: &mut ExecutionCtx,
309    ) -> VortexResult<Option<ArrayRef>> {
310        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
311    }
312
313    fn reduce_parent(
314        array: ArrayView<'_, Self>,
315        parent: &ArrayRef,
316        child_idx: usize,
317    ) -> VortexResult<Option<ArrayRef>> {
318        RULES.evaluate(array, parent, child_idx)
319    }
320}
321
/// Slot index of the lengths of the original values before compression (this child may
/// itself be compressed).
pub(crate) const UNCOMPRESSED_LENGTHS_SLOT: usize = 0;
/// Slot index of the offsets array for the FSST-compressed codes.
pub(crate) const CODES_OFFSETS_SLOT: usize = 1;
/// Slot index of the validity bitmap for the compressed codes.
pub(crate) const CODES_VALIDITY_SLOT: usize = 2;
/// Total number of slots carried by an FSST array.
pub(crate) const NUM_SLOTS: usize = 3;
/// Human-readable slot names, positioned by the `*_SLOT` indices so the two cannot drift.
pub(crate) const SLOT_NAMES: [&str; NUM_SLOTS] = {
    let mut names = [""; NUM_SLOTS];
    names[UNCOMPRESSED_LENGTHS_SLOT] = "uncompressed_lengths";
    names[CODES_OFFSETS_SLOT] = "codes_offsets";
    names[CODES_VALIDITY_SLOT] = "codes_validity";
    names
};
331
332/// The inner data for an FSST-compressed array.
333///
334/// Holds the FSST symbol table (`symbols` + `symbol_lengths`) and the raw compressed
335/// codes bytes buffer. The codes offsets and validity live in the outer array's slots
336/// (slots 1 and 2 respectively).
337///
338/// A full [`VarBinArray`] representing the codes can be reconstructed on demand via
339/// [`FSSTArrayExt::codes()`], combining this buffer with the offsets/validity from slots.
340#[derive(Clone)]
341pub struct FSSTData {
342    symbols: Buffer<Symbol>,
343    symbol_lengths: Buffer<u8>,
344    /// The raw compressed codes bytes, equivalent to `VarBinData::bytes`.
345    codes_bytes: BufferHandle,
346    /// Cached length (number of elements).
347    len: usize,
348
349    /// Memoized compressor used for push-down of compute by compressing the RHS.
350    compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
351}
352
353impl Display for FSSTData {
354    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
355        write!(f, "len: {}, nsymbols: {}", self.len, self.symbols.len())
356    }
357}
358
359impl Debug for FSSTData {
360    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
361        f.debug_struct("FSSTArray")
362            .field("symbols", &self.symbols)
363            .field("symbol_lengths", &self.symbol_lengths)
364            .field("codes_bytes_len", &self.codes_bytes.len())
365            .field("len", &self.len)
366            .field("uncompressed_lengths", &"<outer slot>")
367            .field("codes_offsets", &"<outer slot>")
368            .field("codes_validity", &"<outer slot>")
369            .finish()
370    }
371}
372
/// Marker type for the FSST encoding; [`FSSTArray`] is `Array<FSST>`.
#[derive(Clone, Debug)]
pub struct FSST;
375
376impl FSST {
377    /// Build an FSST array from a set of `symbols` and `codes`.
378    ///
379    /// The `codes` VarBinArray is decomposed: its bytes are stored in [`FSSTData`], while
380    /// its offsets and validity become array slots. The codes VarBinArray can be
381    /// reconstructed on demand via [`FSSTArrayExt::codes()`].
382    pub fn try_new(
383        dtype: DType,
384        symbols: Buffer<Symbol>,
385        symbol_lengths: Buffer<u8>,
386        codes: VarBinArray,
387        uncompressed_lengths: ArrayRef,
388    ) -> VortexResult<FSSTArray> {
389        let len = codes.len();
390        FSSTData::validate_parts_from_codes(
391            &symbols,
392            &symbol_lengths,
393            &codes,
394            &uncompressed_lengths,
395            &dtype,
396            len,
397        )?;
398        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
399        let codes_bytes = codes.bytes_handle().clone();
400        let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?;
401        Ok(unsafe {
402            Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
403        })
404    }
405
406    /// Legacy deserialization path (2 buffers): the codes were stored as a full
407    /// `VarBinArray` child. We decompose the VarBinArray into its bytes (stored in
408    /// FSSTData) and offsets/validity (stored in slots).
409    fn deserialize_legacy(
410        &self,
411        dtype: &DType,
412        len: usize,
413        metadata: &FSSTMetadata,
414        symbols: &Buffer<Symbol>,
415        symbol_lengths: &Buffer<u8>,
416        children: &dyn ArrayChildren,
417    ) -> VortexResult<ArrayParts<Self>> {
418        if children.len() != 2 {
419            vortex_bail!(InvalidArgument: "Expected 2 children, got {}", children.len());
420        }
421        let codes = children.get(0, &DType::Binary(dtype.nullability()), len)?;
422        let codes: VarBinArray = codes
423            .as_opt::<VarBin>()
424            .ok_or_else(|| {
425                vortex_err!(
426                    "Expected VarBinArray for codes, got {}",
427                    codes.encoding_id()
428                )
429            })?
430            .into_owned();
431        let uncompressed_lengths = children.get(
432            1,
433            &DType::Primitive(
434                metadata.get_uncompressed_lengths_ptype()?,
435                Nullability::NonNullable,
436            ),
437            len,
438        )?;
439
440        FSSTData::validate_parts_from_codes(
441            symbols,
442            symbol_lengths,
443            &codes,
444            &uncompressed_lengths,
445            dtype,
446            len,
447        )?;
448        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
449        let codes_bytes = codes.bytes_handle().clone();
450        let data = FSSTData::try_new(symbols.clone(), symbol_lengths.clone(), codes_bytes, len)?;
451        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
452    }
453
454    pub(crate) unsafe fn new_unchecked(
455        dtype: DType,
456        symbols: Buffer<Symbol>,
457        symbol_lengths: Buffer<u8>,
458        codes: VarBinArray,
459        uncompressed_lengths: ArrayRef,
460    ) -> FSSTArray {
461        let len = codes.len();
462        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
463        let codes_bytes = codes.bytes_handle().clone();
464        let data = unsafe { FSSTData::new_unchecked(symbols, symbol_lengths, codes_bytes, len) };
465        unsafe {
466            Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
467        }
468    }
469}
470
471impl FSSTData {
472    fn make_slots(codes: &VarBinArray, uncompressed_lengths: &ArrayRef) -> Vec<Option<ArrayRef>> {
473        vec![
474            Some(uncompressed_lengths.clone()),
475            Some(codes.offsets().clone()),
476            validity_to_child(
477                &codes
478                    .validity()
479                    .vortex_expect("FSST codes validity should be derivable"),
480                codes.len(),
481            ),
482        ]
483    }
484
485    /// Build FSST data from a set of `symbols`, `symbol_lengths`, and compressed codes bytes.
486    ///
487    /// Symbols are 8-bytes and can represent short strings, each of which is assigned
488    /// a code.
489    ///
490    /// The `codes_bytes` buffer contains the concatenated compressed bytecodes for all elements.
491    /// Each element's compressed bytecodes are a sequence of 8-bit codes, where each code
492    /// corresponds either to a symbol or to the "escape code" (which tells the decoder to
493    /// emit the following byte without doing a table lookup).
494    ///
495    /// The offsets and validity for the codes are stored in the array's slots, not here.
496    /// Use [`FSSTArrayExt::codes()`] to reconstruct a full `VarBinArray`.
497    pub fn try_new(
498        symbols: Buffer<Symbol>,
499        symbol_lengths: Buffer<u8>,
500        codes_bytes: BufferHandle,
501        len: usize,
502    ) -> VortexResult<Self> {
503        // SAFETY: all components validated above
504        unsafe {
505            Ok(Self::new_unchecked(
506                symbols,
507                symbol_lengths,
508                codes_bytes,
509                len,
510            ))
511        }
512    }
513
514    pub fn validate(
515        &self,
516        dtype: &DType,
517        len: usize,
518        slots: &[Option<ArrayRef>],
519    ) -> VortexResult<()> {
520        let codes_offsets = slots[CODES_OFFSETS_SLOT]
521            .as_ref()
522            .vortex_expect("FSSTArray codes_offsets slot");
523        Self::validate_parts(
524            &self.symbols,
525            &self.symbol_lengths,
526            &self.codes_bytes,
527            codes_offsets,
528            dtype.nullability(),
529            uncompressed_lengths_from_slots(slots),
530            dtype,
531            len,
532        )
533    }
534
535    /// Validate using the decomposed components (codes bytes + offsets + nullability).
536    #[expect(clippy::too_many_arguments)]
537    fn validate_parts(
538        symbols: &Buffer<Symbol>,
539        symbol_lengths: &Buffer<u8>,
540        codes_bytes: &BufferHandle,
541        codes_offsets: &ArrayRef,
542        codes_nullability: Nullability,
543        uncompressed_lengths: &ArrayRef,
544        dtype: &DType,
545        len: usize,
546    ) -> VortexResult<()> {
547        vortex_ensure!(
548            matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
549            "FSST arrays must be Binary or Utf8, found {dtype}"
550        );
551
552        if symbols.len() > 255 {
553            vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
554        }
555        if symbols.len() != symbol_lengths.len() {
556            vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
557        }
558
559        // codes_offsets.len() - 1 == number of elements
560        let codes_len = codes_offsets.len().saturating_sub(1);
561        if codes_len != len {
562            vortex_bail!(InvalidArgument: "codes must have same len as outer array");
563        }
564
565        if uncompressed_lengths.len() != len {
566            vortex_bail!(InvalidArgument: "uncompressed_lengths must be same len as codes");
567        }
568
569        if !uncompressed_lengths.dtype().is_int() || uncompressed_lengths.dtype().is_nullable() {
570            vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable, found {}", uncompressed_lengths.dtype());
571        }
572
573        // Offsets must be non-nullable integer.
574        if !codes_offsets.dtype().is_int() || codes_offsets.dtype().is_nullable() {
575            vortex_bail!(InvalidArgument: "codes offsets must be non-nullable integer type, found {}", codes_offsets.dtype());
576        }
577
578        if codes_nullability != dtype.nullability() {
579            vortex_bail!(InvalidArgument: "codes nullability must match outer dtype nullability");
580        }
581
582        // Validate that last offset doesn't exceed bytes length (when host-resident).
583        if codes_bytes.is_on_host() && codes_offsets.is_host() && !codes_offsets.is_empty() {
584            let last_offset: usize = (&codes_offsets
585                .execute_scalar(
586                    codes_offsets.len() - 1,
587                    &mut LEGACY_SESSION.create_execution_ctx(),
588                )
589                .vortex_expect("offsets must support scalar_at"))
590                .try_into()
591                .vortex_expect("Failed to convert offset to usize");
592            vortex_ensure!(
593                last_offset <= codes_bytes.len(),
594                InvalidArgument: "Last codes offset {} exceeds codes bytes length {}",
595                last_offset,
596                codes_bytes.len()
597            );
598        }
599
600        Ok(())
601    }
602
603    /// Validate using a VarBinArray for the codes (convenience for construction paths).
604    fn validate_parts_from_codes(
605        symbols: &Buffer<Symbol>,
606        symbol_lengths: &Buffer<u8>,
607        codes: &VarBinArray,
608        uncompressed_lengths: &ArrayRef,
609        dtype: &DType,
610        len: usize,
611    ) -> VortexResult<()> {
612        Self::validate_parts(
613            symbols,
614            symbol_lengths,
615            codes.bytes_handle(),
616            codes.offsets(),
617            codes.dtype().nullability(),
618            uncompressed_lengths,
619            dtype,
620            len,
621        )
622    }
623
624    pub(crate) unsafe fn new_unchecked(
625        symbols: Buffer<Symbol>,
626        symbol_lengths: Buffer<u8>,
627        codes_bytes: BufferHandle,
628        len: usize,
629    ) -> Self {
630        let symbols2 = symbols.clone();
631        let symbol_lengths2 = symbol_lengths.clone();
632        let compressor = Arc::new(LazyLock::new(Box::new(move || {
633            Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
634        })
635            as Box<dyn Fn() -> Compressor + Send>));
636        Self {
637            symbols,
638            symbol_lengths,
639            codes_bytes,
640            len,
641            compressor,
642        }
643    }
644
645    /// Returns the number of elements in the array.
646    pub fn len(&self) -> usize {
647        self.len
648    }
649
650    /// Returns `true` if the array contains no elements.
651    pub fn is_empty(&self) -> bool {
652        self.len == 0
653    }
654
655    /// Access the symbol table array.
656    pub fn symbols(&self) -> &Buffer<Symbol> {
657        &self.symbols
658    }
659
660    /// Access the symbol lengths array.
661    pub fn symbol_lengths(&self) -> &Buffer<u8> {
662        &self.symbol_lengths
663    }
664
665    /// Access the compressed codes bytes buffer handle (may be on host or device).
666    pub fn codes_bytes_handle(&self) -> &BufferHandle {
667        &self.codes_bytes
668    }
669
670    /// Access the compressed codes bytes on the host.
671    pub fn codes_bytes(&self) -> &ByteBuffer {
672        self.codes_bytes.as_host()
673    }
674
675    /// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
676    /// this array.
677    pub fn decompressor(&self) -> Decompressor<'_> {
678        Decompressor::new(self.symbols().as_slice(), self.symbol_lengths().as_slice())
679    }
680
681    /// Retrieves the FSST compressor.
682    pub fn compressor(&self) -> &Compressor {
683        self.compressor.as_ref()
684    }
685}
686
687fn uncompressed_lengths_from_slots(slots: &[Option<ArrayRef>]) -> &ArrayRef {
688    slots[UNCOMPRESSED_LENGTHS_SLOT]
689        .as_ref()
690        .vortex_expect("FSSTArray uncompressed_lengths slot")
691}
692
693pub trait FSSTArrayExt: TypedArrayRef<FSST> {
694    fn uncompressed_lengths(&self) -> &ArrayRef {
695        uncompressed_lengths_from_slots(self.as_ref().slots())
696    }
697
698    fn uncompressed_lengths_dtype(&self) -> &DType {
699        self.uncompressed_lengths().dtype()
700    }
701
702    /// Reconstruct a [`VarBinArray`] for the compressed codes by combining the bytes
703    /// from [`FSSTData`] with the offsets and validity stored in the array's slots.
704    fn codes(&self) -> VarBinArray {
705        let offsets = self.as_ref().slots()[CODES_OFFSETS_SLOT]
706            .as_ref()
707            .vortex_expect("FSSTArray codes_offsets slot")
708            .clone();
709        let validity = child_to_validity(
710            &self.as_ref().slots()[CODES_VALIDITY_SLOT],
711            self.as_ref().dtype().nullability(),
712        );
713        let codes_bytes = self.codes_bytes_handle().clone();
714        // SAFETY: components were validated at construction time.
715        unsafe {
716            VarBinArray::new_unchecked_from_handle(
717                offsets,
718                codes_bytes,
719                DType::Binary(self.as_ref().dtype().nullability()),
720                validity,
721            )
722        }
723    }
724
725    /// Get the DType of the codes array.
726    fn codes_dtype(&self) -> DType {
727        DType::Binary(self.as_ref().dtype().nullability())
728    }
729}
730
731impl<T: TypedArrayRef<FSST>> FSSTArrayExt for T {}
732
733impl ValidityVTable<FSST> for FSST {
734    fn validity(array: ArrayView<'_, FSST>) -> VortexResult<Validity> {
735        Ok(child_to_validity(
736            &array.slots()[CODES_VALIDITY_SLOT],
737            array.dtype().nullability(),
738        ))
739    }
740}
741
#[cfg(test)]
mod test {
    use fsst::Compressor;
    use fsst::Symbol;
    use prost::Message;
    use vortex_array::ArrayPlugin;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::buffer::BufferHandle;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::test_harness::check_metadata;
    use vortex_buffer::Buffer;
    use vortex_error::VortexError;

    use crate::FSST;
    use crate::array::FSSTArrayExt;
    use crate::array::FSSTMetadata;
    use crate::fsst_compress_iter;

    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_fsst_metadata() {
        let metadata = FSSTMetadata {
            uncompressed_lengths_ptype: PType::U64 as i32,
            codes_offsets_ptype: PType::I32 as i32,
        };
        check_metadata("fsst.metadata", &metadata.encode_to_vec());
    }

    /// Early FSST arrays kept their codes as a `VarBinArray` child and relied on that child
    /// having the VarBin encoding. Vortex forbids that kind of introspection, so the layout
    /// moved to storing the compressed offsets and bytes separately, using `VarBinArray`
    /// only to delegate behavior.
    ///
    /// This test hand-builds an old-style array and checks it still deserializes and
    /// decodes correctly.
    #[test]
    fn test_back_compat() {
        const VALUES: [&[u8]; 2] = [b"abcabcab", b"defghijk"];
        let dtype = DType::Utf8(Nullability::NonNullable);

        let symbols = Buffer::<Symbol>::copy_from([
            Symbol::from_slice(b"abc00000"),
            Symbol::from_slice(b"defghijk"),
        ]);
        let symbol_lengths = Buffer::<u8>::copy_from([3, 8]);

        let compressor = Compressor::rebuild_from(symbols.as_slice(), symbol_lengths.as_slice());
        let fsst_array =
            fsst_compress_iter(VALUES.map(Some).into_iter(), 2, dtype.clone(), &compressor);

        // Legacy layout, buffers: the 8-byte symbols and the u8 symbol lengths.
        let legacy_buffers = [
            BufferHandle::new_host(symbols.into_byte_buffer()),
            BufferHandle::new_host(symbol_lengths.into_byte_buffer()),
        ];

        // Legacy layout, children: the codes as a VarBinArray, then the uncompressed lengths
        // as a Primitive array.
        let legacy_children = vec![
            fsst_array.codes().into_array(),
            fsst_array.uncompressed_lengths().clone(),
        ];

        let legacy_metadata = FSSTMetadata {
            uncompressed_lengths_ptype: fsst_array
                .uncompressed_lengths()
                .dtype()
                .as_ptype()
                .into(),
            // Legacy arrays never wrote this field, so it decodes as the Protobuf default 0.
            codes_offsets_ptype: 0,
        }
        .encode_to_vec();

        let fsst = ArrayPlugin::deserialize(
            &FSST,
            &dtype,
            VALUES.len(),
            &legacy_metadata,
            &legacy_buffers,
            &legacy_children.as_slice(),
            &LEGACY_SESSION,
        )
        .unwrap();

        let decompressed = fsst
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        decompressed
            .with_iterator(|values| {
                for expected in VALUES {
                    assert_eq!(values.next().unwrap(), Some(expected));
                }
                Ok::<_, VortexError>(())
            })
            .unwrap()
    }
}