Skip to main content

vortex_fsst/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hasher;
8use std::sync::Arc;
9use std::sync::LazyLock;
10
11use fsst::Compressor;
12use fsst::Decompressor;
13use fsst::Symbol;
14use prost::Message as _;
15use vortex_array::Array;
16use vortex_array::ArrayEq;
17use vortex_array::ArrayHash;
18use vortex_array::ArrayId;
19use vortex_array::ArrayParts;
20use vortex_array::ArrayRef;
21use vortex_array::ArrayView;
22use vortex_array::Canonical;
23use vortex_array::ExecutionCtx;
24use vortex_array::ExecutionResult;
25use vortex_array::IntoArray;
26use vortex_array::Precision;
27use vortex_array::TypedArrayRef;
28use vortex_array::arrays::VarBin;
29use vortex_array::arrays::VarBinArray;
30use vortex_array::arrays::varbin::VarBinArrayExt;
31use vortex_array::buffer::BufferHandle;
32use vortex_array::builders::ArrayBuilder;
33use vortex_array::builders::VarBinViewBuilder;
34use vortex_array::dtype::DType;
35use vortex_array::dtype::Nullability;
36use vortex_array::dtype::PType;
37use vortex_array::serde::ArrayChildren;
38use vortex_array::validity::Validity;
39use vortex_array::vtable::VTable;
40use vortex_array::vtable::ValidityVTable;
41use vortex_array::vtable::child_to_validity;
42use vortex_array::vtable::validity_to_child;
43use vortex_buffer::Buffer;
44use vortex_buffer::ByteBuffer;
45use vortex_error::VortexExpect;
46use vortex_error::VortexResult;
47use vortex_error::vortex_bail;
48use vortex_error::vortex_ensure;
49use vortex_error::vortex_err;
50use vortex_error::vortex_panic;
51use vortex_session::VortexSession;
52
53use crate::canonical::canonicalize_fsst;
54use crate::canonical::fsst_decode_views;
55use crate::kernel::PARENT_KERNELS;
56use crate::rules::RULES;
57
58/// A [`FSST`]-encoded Vortex array.
59pub type FSSTArray = Array<FSST>;
60
/// Protobuf-encoded metadata written alongside a serialized FSST array.
///
/// Both fields hold the `i32` wire value of a [`PType`] enumeration.
#[derive(Clone, prost::Message)]
pub struct FSSTMetadata {
    /// [`PType`] of the `uncompressed_lengths` child array.
    #[prost(enumeration = "PType", tag = "1")]
    uncompressed_lengths_ptype: i32,

    /// [`PType`] of the `codes_offsets` child array.
    ///
    /// Only read on the 3-buffer (current) deserialization path; the legacy
    /// path does not consult it.
    #[prost(enumeration = "PType", tag = "2")]
    codes_offsets_ptype: i32,
}
69
70impl FSSTMetadata {
71    pub fn get_uncompressed_lengths_ptype(&self) -> VortexResult<PType> {
72        PType::try_from(self.uncompressed_lengths_ptype)
73            .map_err(|_| vortex_err!("Invalid PType {}", self.uncompressed_lengths_ptype))
74    }
75}
76
77impl ArrayHash for FSSTData {
78    fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
79        self.symbols.array_hash(state, precision);
80        self.symbol_lengths.array_hash(state, precision);
81        self.codes_bytes.as_host().array_hash(state, precision);
82    }
83}
84
85impl ArrayEq for FSSTData {
86    fn array_eq(&self, other: &Self, precision: Precision) -> bool {
87        self.symbols.array_eq(&other.symbols, precision)
88            && self
89                .symbol_lengths
90                .array_eq(&other.symbol_lengths, precision)
91            && self
92                .codes_bytes
93                .as_host()
94                .array_eq(other.codes_bytes.as_host(), precision)
95    }
96}
97
98impl VTable for FSST {
99    type ArrayData = FSSTData;
100    type OperationsVTable = Self;
101    type ValidityVTable = Self;
102
103    fn id(&self) -> ArrayId {
104        Self::ID
105    }
106
107    fn validate(
108        &self,
109        data: &Self::ArrayData,
110        dtype: &DType,
111        len: usize,
112        slots: &[Option<ArrayRef>],
113    ) -> VortexResult<()> {
114        data.validate(dtype, len, slots)
115    }
116
117    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
118        3
119    }
120
121    fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
122        match idx {
123            0 => BufferHandle::new_host(array.symbols().clone().into_byte_buffer()),
124            1 => BufferHandle::new_host(array.symbol_lengths().clone().into_byte_buffer()),
125            2 => array.codes_bytes_handle().clone(),
126            _ => vortex_panic!("FSSTArray buffer index {idx} out of bounds"),
127        }
128    }
129
130    fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
131        match idx {
132            0 => Some("symbols".to_string()),
133            1 => Some("symbol_lengths".to_string()),
134            2 => Some("compressed_codes".to_string()),
135            _ => vortex_panic!("FSSTArray buffer_name index {idx} out of bounds"),
136        }
137    }
138
139    fn serialize(
140        array: ArrayView<'_, Self>,
141        _session: &VortexSession,
142    ) -> VortexResult<Option<Vec<u8>>> {
143        let codes_offsets = array.as_ref().slots()[CODES_OFFSETS_SLOT]
144            .as_ref()
145            .vortex_expect("FSSTArray codes_offsets slot");
146        Ok(Some(
147            FSSTMetadata {
148                uncompressed_lengths_ptype: array.uncompressed_lengths().dtype().as_ptype().into(),
149                codes_offsets_ptype: codes_offsets.dtype().as_ptype().into(),
150            }
151            .encode_to_vec(),
152        ))
153    }
154
155    /// Deserializes an FSST array from its serialized components.
156    ///
157    /// Supports two serialization formats:
158    ///
159    /// ## Legacy format (2 buffers, 2 children)
160    ///
161    /// The original FSST layout stored the compressed codes as a full `VarBinArray` child.
162    /// - **Buffers**: `[symbols, symbol_lengths]`
163    /// - **Children**: `[codes (VarBinArray), uncompressed_lengths (Primitive)]`
164    ///
165    /// The codes VarBinArray child is decomposed: its bytes become the `codes_bytes` buffer,
166    /// and its offsets/validity are extracted into slots.
167    /// See `FSST::deserialize_legacy`.
168    ///
169    /// ## Current format (3 buffers, 2-3 children)
170    ///
171    /// The current layout stores the compressed bytes as a raw buffer alongside the symbol
172    /// table, with offsets and validity as separate children.
173    /// - **Buffers**: `[symbols, symbol_lengths, compressed_codes_bytes]`
174    /// - **Children**: `[uncompressed_lengths, codes_offsets, (optional) codes_validity]`
175    ///
176    /// The `codes_bytes` buffer is stored directly in `FSSTData`. A `VarBinArray` for the
177    /// codes can be reconstructed on demand via [`FSSTArrayExt::codes()`] using the bytes
178    /// from `FSSTData` combined with offsets and validity from the array's slots.
179    fn deserialize(
180        &self,
181        dtype: &DType,
182        len: usize,
183        metadata: &[u8],
184        buffers: &[BufferHandle],
185        children: &dyn ArrayChildren,
186        _session: &VortexSession,
187    ) -> VortexResult<ArrayParts<Self>> {
188        let metadata = FSSTMetadata::decode(metadata)?;
189        let symbols = Buffer::<Symbol>::from_byte_buffer(buffers[0].clone().try_to_host_sync()?);
190        let symbol_lengths = Buffer::<u8>::from_byte_buffer(buffers[1].clone().try_to_host_sync()?);
191
192        if buffers.len() == 2 {
193            return Self::deserialize_legacy(
194                self,
195                dtype,
196                len,
197                &metadata,
198                &symbols,
199                &symbol_lengths,
200                children,
201            );
202        }
203
204        if buffers.len() == 3 {
205            let uncompressed_lengths = children.get(
206                0,
207                &DType::Primitive(
208                    metadata.get_uncompressed_lengths_ptype()?,
209                    Nullability::NonNullable,
210                ),
211                len,
212            )?;
213
214            let codes_bytes = buffers[2].clone();
215            let codes_offsets = children.get(
216                1,
217                &DType::Primitive(
218                    PType::try_from(metadata.codes_offsets_ptype)?,
219                    Nullability::NonNullable,
220                ),
221                // VarBin offsets are len + 1
222                len + 1,
223            )?;
224
225            let codes_validity = if children.len() == 2 {
226                Validity::from(dtype.nullability())
227            } else if children.len() == 3 {
228                let validity = children.get(2, &Validity::DTYPE, len)?;
229                Validity::Array(validity)
230            } else {
231                vortex_bail!("Expected 2 or 3 children, got {}", children.len());
232            };
233
234            FSSTData::validate_parts(
235                &symbols,
236                &symbol_lengths,
237                &codes_bytes,
238                &codes_offsets,
239                dtype.nullability(),
240                &uncompressed_lengths,
241                dtype,
242                len,
243            )?;
244            let slots = vec![
245                Some(uncompressed_lengths),
246                Some(codes_offsets),
247                validity_to_child(&codes_validity, len),
248            ];
249            let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?;
250            return Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots));
251        }
252
253        vortex_bail!(
254            "InvalidArgument: Expected 2 or 3 buffers, got {}",
255            buffers.len()
256        );
257    }
258
259    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
260        SLOT_NAMES[idx].to_string()
261    }
262
263    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
264        canonicalize_fsst(array.as_view(), ctx).map(ExecutionResult::done)
265    }
266
267    fn append_to_builder(
268        array: ArrayView<'_, Self>,
269        builder: &mut dyn ArrayBuilder,
270        ctx: &mut ExecutionCtx,
271    ) -> VortexResult<()> {
272        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
273            builder.extend_from_array(
274                &array
275                    .array()
276                    .clone()
277                    .execute::<Canonical>(ctx)?
278                    .into_array(),
279            );
280            return Ok(());
281        };
282
283        // Decompress the whole block of data into a new buffer, and create some views
284        // from it instead. The new buffer lands after any pending in-progress
285        // buffer that push_buffer_and_adjusted_views will flush first.
286        let next_buffer_index = builder.completed_block_count() + u32::from(builder.in_progress());
287        let (buffers, views) = fsst_decode_views(array, next_buffer_index, ctx)?;
288
289        builder.push_buffer_and_adjusted_views(&buffers, &views, array.array().validity_mask()?);
290        Ok(())
291    }
292
293    fn execute_parent(
294        array: ArrayView<'_, Self>,
295        parent: &ArrayRef,
296        child_idx: usize,
297        ctx: &mut ExecutionCtx,
298    ) -> VortexResult<Option<ArrayRef>> {
299        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
300    }
301
302    fn reduce_parent(
303        array: ArrayView<'_, Self>,
304        parent: &ArrayRef,
305        child_idx: usize,
306    ) -> VortexResult<Option<ArrayRef>> {
307        RULES.evaluate(array, parent, child_idx)
308    }
309}
310
/// Lengths of the original values before compression, can be compressed.
pub(crate) const UNCOMPRESSED_LENGTHS_SLOT: usize = 0;
/// The offsets array for the FSST-compressed codes.
pub(crate) const CODES_OFFSETS_SLOT: usize = 1;
/// The validity bitmap for the compressed codes.
pub(crate) const CODES_VALIDITY_SLOT: usize = 2;
/// Total number of slots carried by an FSST array.
pub(crate) const NUM_SLOTS: usize = 3;
/// Slot names, indexed by the `*_SLOT` constants above.
pub(crate) const SLOT_NAMES: [&str; NUM_SLOTS] =
    ["uncompressed_lengths", "codes_offsets", "codes_validity"];
322
/// The inner data for an FSST-compressed array.
///
/// Holds the FSST symbol table (`symbols` + `symbol_lengths`) and the raw compressed
/// codes bytes buffer. The codes offsets and validity live in the outer array's slots
/// (slots 1 and 2 respectively).
///
/// A full [`VarBinArray`] representing the codes can be reconstructed on demand via
/// [`FSSTArrayExt::codes()`], combining this buffer with the offsets/validity from slots.
#[derive(Clone)]
pub struct FSSTData {
    /// The symbol table entries; validation requires at most 255 of them.
    symbols: Buffer<Symbol>,
    /// One length per entry of `symbols`; validation requires equal lengths.
    symbol_lengths: Buffer<u8>,
    /// The raw compressed codes bytes, equivalent to `VarBinData::bytes`.
    codes_bytes: BufferHandle,
    /// Cached length (number of elements).
    len: usize,

    /// Memoized compressor used for push-down of compute by compressing the RHS.
    ///
    /// Rebuilt lazily from `symbols` and `symbol_lengths` on first access.
    compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
}
343
344impl Display for FSSTData {
345    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
346        write!(f, "len: {}, nsymbols: {}", self.len, self.symbols.len())
347    }
348}
349
350impl Debug for FSSTData {
351    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
352        f.debug_struct("FSSTArray")
353            .field("symbols", &self.symbols)
354            .field("symbol_lengths", &self.symbol_lengths)
355            .field("codes_bytes_len", &self.codes_bytes.len())
356            .field("len", &self.len)
357            .field("uncompressed_lengths", &"<outer slot>")
358            .field("codes_offsets", &"<outer slot>")
359            .field("codes_validity", &"<outer slot>")
360            .finish()
361    }
362}
363
/// Marker type for the FSST encoding; it implements the array vtable, and
/// [`FSSTArray`] is an alias for `Array<FSST>`.
#[derive(Clone, Debug)]
pub struct FSST;
366
367impl FSST {
368    pub const ID: ArrayId = ArrayId::new_ref("vortex.fsst");
369
370    /// Build an FSST array from a set of `symbols` and `codes`.
371    ///
372    /// The `codes` VarBinArray is decomposed: its bytes are stored in [`FSSTData`], while
373    /// its offsets and validity become array slots. The codes VarBinArray can be
374    /// reconstructed on demand via [`FSSTArrayExt::codes()`].
375    pub fn try_new(
376        dtype: DType,
377        symbols: Buffer<Symbol>,
378        symbol_lengths: Buffer<u8>,
379        codes: VarBinArray,
380        uncompressed_lengths: ArrayRef,
381    ) -> VortexResult<FSSTArray> {
382        let len = codes.len();
383        FSSTData::validate_parts_from_codes(
384            &symbols,
385            &symbol_lengths,
386            &codes,
387            &uncompressed_lengths,
388            &dtype,
389            len,
390        )?;
391        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
392        let codes_bytes = codes.bytes_handle().clone();
393        let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?;
394        Ok(unsafe {
395            Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
396        })
397    }
398
399    /// Legacy deserialization path (2 buffers): the codes were stored as a full
400    /// `VarBinArray` child. We decompose the VarBinArray into its bytes (stored in
401    /// FSSTData) and offsets/validity (stored in slots).
402    fn deserialize_legacy(
403        &self,
404        dtype: &DType,
405        len: usize,
406        metadata: &FSSTMetadata,
407        symbols: &Buffer<Symbol>,
408        symbol_lengths: &Buffer<u8>,
409        children: &dyn ArrayChildren,
410    ) -> VortexResult<ArrayParts<Self>> {
411        if children.len() != 2 {
412            vortex_bail!(InvalidArgument: "Expected 2 children, got {}", children.len());
413        }
414        let codes = children.get(0, &DType::Binary(dtype.nullability()), len)?;
415        let codes: VarBinArray = codes
416            .as_opt::<VarBin>()
417            .ok_or_else(|| {
418                vortex_err!(
419                    "Expected VarBinArray for codes, got {}",
420                    codes.encoding_id()
421                )
422            })?
423            .into_owned();
424        let uncompressed_lengths = children.get(
425            1,
426            &DType::Primitive(
427                metadata.get_uncompressed_lengths_ptype()?,
428                Nullability::NonNullable,
429            ),
430            len,
431        )?;
432
433        FSSTData::validate_parts_from_codes(
434            symbols,
435            symbol_lengths,
436            &codes,
437            &uncompressed_lengths,
438            dtype,
439            len,
440        )?;
441        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
442        let codes_bytes = codes.bytes_handle().clone();
443        let data = FSSTData::try_new(symbols.clone(), symbol_lengths.clone(), codes_bytes, len)?;
444        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
445    }
446
447    pub(crate) unsafe fn new_unchecked(
448        dtype: DType,
449        symbols: Buffer<Symbol>,
450        symbol_lengths: Buffer<u8>,
451        codes: VarBinArray,
452        uncompressed_lengths: ArrayRef,
453    ) -> FSSTArray {
454        let len = codes.len();
455        let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
456        let codes_bytes = codes.bytes_handle().clone();
457        let data = unsafe { FSSTData::new_unchecked(symbols, symbol_lengths, codes_bytes, len) };
458        unsafe {
459            Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
460        }
461    }
462}
463
464impl FSSTData {
465    fn make_slots(codes: &VarBinArray, uncompressed_lengths: &ArrayRef) -> Vec<Option<ArrayRef>> {
466        vec![
467            Some(uncompressed_lengths.clone()),
468            Some(codes.offsets().clone()),
469            validity_to_child(
470                &codes
471                    .validity()
472                    .vortex_expect("FSST codes validity should be derivable"),
473                codes.len(),
474            ),
475        ]
476    }
477
478    /// Build FSST data from a set of `symbols`, `symbol_lengths`, and compressed codes bytes.
479    ///
480    /// Symbols are 8-bytes and can represent short strings, each of which is assigned
481    /// a code.
482    ///
483    /// The `codes_bytes` buffer contains the concatenated compressed bytecodes for all elements.
484    /// Each element's compressed bytecodes are a sequence of 8-bit codes, where each code
485    /// corresponds either to a symbol or to the "escape code" (which tells the decoder to
486    /// emit the following byte without doing a table lookup).
487    ///
488    /// The offsets and validity for the codes are stored in the array's slots, not here.
489    /// Use [`FSSTArrayExt::codes()`] to reconstruct a full `VarBinArray`.
490    pub fn try_new(
491        symbols: Buffer<Symbol>,
492        symbol_lengths: Buffer<u8>,
493        codes_bytes: BufferHandle,
494        len: usize,
495    ) -> VortexResult<Self> {
496        // SAFETY: all components validated above
497        unsafe {
498            Ok(Self::new_unchecked(
499                symbols,
500                symbol_lengths,
501                codes_bytes,
502                len,
503            ))
504        }
505    }
506
507    pub fn validate(
508        &self,
509        dtype: &DType,
510        len: usize,
511        slots: &[Option<ArrayRef>],
512    ) -> VortexResult<()> {
513        let codes_offsets = slots[CODES_OFFSETS_SLOT]
514            .as_ref()
515            .vortex_expect("FSSTArray codes_offsets slot");
516        Self::validate_parts(
517            &self.symbols,
518            &self.symbol_lengths,
519            &self.codes_bytes,
520            codes_offsets,
521            dtype.nullability(),
522            uncompressed_lengths_from_slots(slots),
523            dtype,
524            len,
525        )
526    }
527
528    /// Validate using the decomposed components (codes bytes + offsets + nullability).
529    #[allow(clippy::too_many_arguments)]
530    fn validate_parts(
531        symbols: &Buffer<Symbol>,
532        symbol_lengths: &Buffer<u8>,
533        codes_bytes: &BufferHandle,
534        codes_offsets: &ArrayRef,
535        codes_nullability: Nullability,
536        uncompressed_lengths: &ArrayRef,
537        dtype: &DType,
538        len: usize,
539    ) -> VortexResult<()> {
540        vortex_ensure!(
541            matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
542            "FSST arrays must be Binary or Utf8, found {dtype}"
543        );
544
545        if symbols.len() > 255 {
546            vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
547        }
548        if symbols.len() != symbol_lengths.len() {
549            vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
550        }
551
552        // codes_offsets.len() - 1 == number of elements
553        let codes_len = codes_offsets.len().saturating_sub(1);
554        if codes_len != len {
555            vortex_bail!(InvalidArgument: "codes must have same len as outer array");
556        }
557
558        if uncompressed_lengths.len() != len {
559            vortex_bail!(InvalidArgument: "uncompressed_lengths must be same len as codes");
560        }
561
562        if !uncompressed_lengths.dtype().is_int() || uncompressed_lengths.dtype().is_nullable() {
563            vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable, found {}", uncompressed_lengths.dtype());
564        }
565
566        // Offsets must be non-nullable integer.
567        if !codes_offsets.dtype().is_int() || codes_offsets.dtype().is_nullable() {
568            vortex_bail!(InvalidArgument: "codes offsets must be non-nullable integer type, found {}", codes_offsets.dtype());
569        }
570
571        if codes_nullability != dtype.nullability() {
572            vortex_bail!(InvalidArgument: "codes nullability must match outer dtype nullability");
573        }
574
575        // Validate that last offset doesn't exceed bytes length (when host-resident).
576        if codes_bytes.is_on_host() && codes_offsets.is_host() && !codes_offsets.is_empty() {
577            let last_offset: usize = (&codes_offsets
578                .scalar_at(codes_offsets.len() - 1)
579                .vortex_expect("offsets must support scalar_at"))
580                .try_into()
581                .vortex_expect("Failed to convert offset to usize");
582            vortex_ensure!(
583                last_offset <= codes_bytes.len(),
584                InvalidArgument: "Last codes offset {} exceeds codes bytes length {}",
585                last_offset,
586                codes_bytes.len()
587            );
588        }
589
590        Ok(())
591    }
592
593    /// Validate using a VarBinArray for the codes (convenience for construction paths).
594    fn validate_parts_from_codes(
595        symbols: &Buffer<Symbol>,
596        symbol_lengths: &Buffer<u8>,
597        codes: &VarBinArray,
598        uncompressed_lengths: &ArrayRef,
599        dtype: &DType,
600        len: usize,
601    ) -> VortexResult<()> {
602        Self::validate_parts(
603            symbols,
604            symbol_lengths,
605            codes.bytes_handle(),
606            codes.offsets(),
607            codes.dtype().nullability(),
608            uncompressed_lengths,
609            dtype,
610            len,
611        )
612    }
613
614    pub(crate) unsafe fn new_unchecked(
615        symbols: Buffer<Symbol>,
616        symbol_lengths: Buffer<u8>,
617        codes_bytes: BufferHandle,
618        len: usize,
619    ) -> Self {
620        let symbols2 = symbols.clone();
621        let symbol_lengths2 = symbol_lengths.clone();
622        let compressor = Arc::new(LazyLock::new(Box::new(move || {
623            Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
624        })
625            as Box<dyn Fn() -> Compressor + Send>));
626        Self {
627            symbols,
628            symbol_lengths,
629            codes_bytes,
630            len,
631            compressor,
632        }
633    }
634
635    /// Returns the number of elements in the array.
636    pub fn len(&self) -> usize {
637        self.len
638    }
639
640    /// Returns `true` if the array contains no elements.
641    pub fn is_empty(&self) -> bool {
642        self.len == 0
643    }
644
645    /// Access the symbol table array.
646    pub fn symbols(&self) -> &Buffer<Symbol> {
647        &self.symbols
648    }
649
650    /// Access the symbol lengths array.
651    pub fn symbol_lengths(&self) -> &Buffer<u8> {
652        &self.symbol_lengths
653    }
654
655    /// Access the compressed codes bytes buffer handle (may be on host or device).
656    pub fn codes_bytes_handle(&self) -> &BufferHandle {
657        &self.codes_bytes
658    }
659
660    /// Access the compressed codes bytes on the host.
661    pub fn codes_bytes(&self) -> &ByteBuffer {
662        self.codes_bytes.as_host()
663    }
664
665    /// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
666    /// this array.
667    pub fn decompressor(&self) -> Decompressor<'_> {
668        Decompressor::new(self.symbols().as_slice(), self.symbol_lengths().as_slice())
669    }
670
671    /// Retrieves the FSST compressor.
672    pub fn compressor(&self) -> &Compressor {
673        self.compressor.as_ref()
674    }
675}
676
677fn uncompressed_lengths_from_slots(slots: &[Option<ArrayRef>]) -> &ArrayRef {
678    slots[UNCOMPRESSED_LENGTHS_SLOT]
679        .as_ref()
680        .vortex_expect("FSSTArray uncompressed_lengths slot")
681}
682
683pub trait FSSTArrayExt: TypedArrayRef<FSST> {
684    fn uncompressed_lengths(&self) -> &ArrayRef {
685        uncompressed_lengths_from_slots(self.as_ref().slots())
686    }
687
688    fn uncompressed_lengths_dtype(&self) -> &DType {
689        self.uncompressed_lengths().dtype()
690    }
691
692    /// Reconstruct a [`VarBinArray`] for the compressed codes by combining the bytes
693    /// from [`FSSTData`] with the offsets and validity stored in the array's slots.
694    fn codes(&self) -> VarBinArray {
695        let offsets = self.as_ref().slots()[CODES_OFFSETS_SLOT]
696            .as_ref()
697            .vortex_expect("FSSTArray codes_offsets slot")
698            .clone();
699        let validity = child_to_validity(
700            &self.as_ref().slots()[CODES_VALIDITY_SLOT],
701            self.as_ref().dtype().nullability(),
702        );
703        let codes_bytes = self.codes_bytes_handle().clone();
704        // SAFETY: components were validated at construction time.
705        unsafe {
706            VarBinArray::new_unchecked_from_handle(
707                offsets,
708                codes_bytes,
709                DType::Binary(self.as_ref().dtype().nullability()),
710                validity,
711            )
712        }
713    }
714
715    /// Get the DType of the codes array.
716    fn codes_dtype(&self) -> DType {
717        DType::Binary(self.as_ref().dtype().nullability())
718    }
719}
720
/// Blanket implementation: every `TypedArrayRef<FSST>` gets the [`FSSTArrayExt`] accessors.
impl<T: TypedArrayRef<FSST>> FSSTArrayExt for T {}
722
723impl ValidityVTable<FSST> for FSST {
724    fn validity(array: ArrayView<'_, FSST>) -> VortexResult<Validity> {
725        Ok(child_to_validity(
726            &array.slots()[CODES_VALIDITY_SLOT],
727            array.dtype().nullability(),
728        ))
729    }
730}
731
#[cfg(test)]
mod test {
    use fsst::Compressor;
    use fsst::Symbol;
    use prost::Message;
    use vortex_array::ArrayPlugin;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::buffer::BufferHandle;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::test_harness::check_metadata;
    use vortex_buffer::Buffer;
    use vortex_error::VortexError;

    use crate::FSST;
    use crate::array::FSSTArrayExt;
    use crate::array::FSSTMetadata;
    use crate::fsst_compress_iter;

    /// Passes an encoded [`FSSTMetadata`] to `check_metadata` under the name
    /// `"fsst.metadata"`.
    // NOTE(review): `check_metadata` presumably compares against a stored golden
    // encoding — confirm in the test harness.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_fsst_metadata() {
        check_metadata(
            "fsst.metadata",
            &FSSTMetadata {
                uncompressed_lengths_ptype: PType::U64 as i32,
                codes_offsets_ptype: PType::I32 as i32,
            }
            .encode_to_vec(),
        );
    }

    /// The original FSST array stored codes as a VarBinArray child and required that the child
    /// have this encoding. Vortex forbids this kind of introspection, therefore we had to fix
    /// the array to store the compressed offsets and compressed data buffer separately, and only
    /// use VarBinArray to delegate behavior.
    ///
    /// This test manually constructs an old-style FSST array and ensures that it can still be
    /// deserialized.
    #[test]
    fn test_back_compat() {
        // A two-entry symbol table: "abc" (length 3) and "defghijk" (length 8).
        let symbols = Buffer::<Symbol>::copy_from([
            Symbol::from_slice(b"abc00000"),
            Symbol::from_slice(b"defghijk"),
        ]);
        let symbol_lengths = Buffer::<u8>::copy_from([3, 8]);

        let compressor = Compressor::rebuild_from(symbols.as_slice(), symbol_lengths.as_slice());
        let fsst_array = fsst_compress_iter(
            [Some(b"abcabcab".as_ref()), Some(b"defghijk".as_ref())].into_iter(),
            2,
            DType::Utf8(Nullability::NonNullable),
            &compressor,
        );

        // Rebuild the codes as a VarBinArray, which is what the legacy layout stored.
        let compressed_codes = fsst_array.codes();

        // There were two buffers:
        // 1. The 8 byte symbols
        // 2. The symbol lengths as u8.
        let buffers = [
            BufferHandle::new_host(symbols.into_byte_buffer()),
            BufferHandle::new_host(symbol_lengths.into_byte_buffer()),
        ];

        // There were 2 children:
        // 1. The compressed codes, stored as a VarBinArray.
        // 2. The uncompressed lengths, stored as a Primitive array.
        let children = vec![
            compressed_codes.into_array(),
            fsst_array.uncompressed_lengths().clone(),
        ];

        // Two buffers select the legacy deserialization path.
        let fsst = ArrayPlugin::deserialize(
            &FSST,
            &DType::Utf8(Nullability::NonNullable),
            2,
            &FSSTMetadata {
                uncompressed_lengths_ptype: fsst_array
                    .uncompressed_lengths()
                    .dtype()
                    .as_ptype()
                    .into(),
                // Legacy array did not store this field, use Protobuf default of 0.
                codes_offsets_ptype: 0,
            }
            .encode_to_vec(),
            &buffers,
            &children.as_slice(),
            &LEGACY_SESSION,
        )
        .unwrap();

        // The deserialized array must decode back to the original two values.
        let decompressed = fsst
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        decompressed
            .with_iterator(|it| {
                assert_eq!(it.next().unwrap(), Some(b"abcabcab".as_ref()));
                assert_eq!(it.next().unwrap(), Some(b"defghijk".as_ref()));
                Ok::<_, VortexError>(())
            })
            .unwrap()
    }
}