vortex_sampling_compressor/compressors/fsst.rs

1use std::any::Any;
2use std::fmt::Debug;
3use std::sync::Arc;
4
5use fsst::Compressor;
6use vortex_array::aliases::hash_set::HashSet;
7use vortex_array::array::{VarBinEncoding, VarBinViewEncoding};
8use vortex_array::{Encoding, EncodingId, IntoArray};
9use vortex_dtype::DType;
10use vortex_error::{vortex_bail, VortexResult};
11use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTArray, FSSTEncoding};
12
13use super::bitpacked::BITPACK_WITH_PATCHES;
14use super::delta::DeltaCompressor;
15use super::r#for::FoRCompressor;
16use super::varbin::VarBinCompressor;
17use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor};
18use crate::downscale::downscale_integer_array;
19use crate::{constants, SamplingCompressor};
20
/// Sampling-compressor stage that encodes UTF-8 string arrays with FSST
/// (Fast Static Symbol Table) compression.
#[derive(Debug)]
pub struct FSSTCompressor;

/// Upper bound, in bytes, on the size of an FSST symbol table.
///
/// Computed as 255 entries of (8 + 1) bytes each. This presumably corresponds to at most
/// 255 symbols of up to 8 bytes, plus one length byte per symbol.
const FSST_SYMTAB_MAX_SIZE: usize = 255 * (8 + 1);
26
27impl EncoderMetadata for Compressor {
28    fn as_any(&self) -> &dyn Any {
29        self
30    }
31}
32
33impl EncodingCompressor for FSSTCompressor {
34    fn id(&self) -> &str {
35        FSSTEncoding::ID.as_ref()
36    }
37
38    fn cost(&self) -> u8 {
39        constants::FSST_COST
40    }
41
42    fn can_compress(&self, array: &vortex_array::Array) -> Option<&dyn EncodingCompressor> {
43        // FSST arrays must have DType::Utf8.
44        //
45        // Note that while it can accept binary data, it is unlikely to perform well.
46        if !matches!(array.dtype(), &DType::Utf8(_)) {
47            return None;
48        }
49
50        // FSST can be applied on top of VarBin and VarBinView
51        if array.is_encoding(VarBinEncoding::ID) || array.is_encoding(VarBinViewEncoding::ID) {
52            return Some(self);
53        }
54
55        None
56    }
57
58    fn compress<'a>(
59        &'a self,
60        array: &vortex_array::Array,
61        // TODO(aduffy): reuse compressor from sample run if we have saved it off.
62        like: Option<CompressionTree<'a>>,
63        ctx: SamplingCompressor<'a>,
64    ) -> VortexResult<CompressedArray<'a>> {
65        // Size-check: FSST has a builtin 2KB overhead due to the symbol table, and usually compresses
66        // between 2-3x depending on the text quality.
67        //
68        // It's not worth running a full compression step unless the array is large enough.
69        if array.nbytes() < 5 * FSST_SYMTAB_MAX_SIZE {
70            return Ok(CompressedArray::uncompressed(array.clone()));
71        }
72
73        let compressor = like
74            .clone()
75            .and_then(|mut tree| tree.metadata())
76            .map(VortexResult::Ok)
77            .unwrap_or_else(|| Ok(Arc::new(fsst_train_compressor(array)?)))?;
78
79        let Some(fsst_compressor) = compressor.as_any().downcast_ref::<Compressor>() else {
80            vortex_bail!("Could not downcast metadata as FSST Compressor")
81        };
82
83        let fsst_array =
84            if array.is_encoding(VarBinEncoding::ID) || array.is_encoding(VarBinViewEncoding::ID) {
85                // For a VarBinArray or VarBinViewArray, compress directly.
86                fsst_compress(array, fsst_compressor)?
87            } else {
88                vortex_bail!(
89                    "Unsupported encoding for FSSTCompressor: {}",
90                    array.encoding()
91                )
92            };
93
94        let codes = fsst_array.codes();
95        let compressed_codes = ctx
96            .auxiliary("fsst_codes")
97            .excluding(self)
98            .including_only(&[
99                &VarBinCompressor,
100                &DeltaCompressor,
101                &FoRCompressor,
102                &BITPACK_WITH_PATCHES,
103            ])
104            .compress(&codes, like.as_ref().and_then(|l| l.child(2)))?;
105
106        // Compress the uncompressed_lengths array.
107        let uncompressed_lengths = ctx
108            .auxiliary("uncompressed_lengths")
109            .excluding(self)
110            .compress(
111                &downscale_integer_array(fsst_array.uncompressed_lengths())?,
112                like.as_ref().and_then(|l| l.child(3)),
113            )?;
114
115        Ok(CompressedArray::compressed(
116            FSSTArray::try_new(
117                fsst_array.dtype().clone(),
118                fsst_array.symbols(),
119                fsst_array.symbol_lengths(),
120                compressed_codes.array,
121                uncompressed_lengths.array,
122            )?
123            .into_array(),
124            Some(CompressionTree::new_with_metadata(
125                self,
126                vec![None, None, compressed_codes.path, uncompressed_lengths.path],
127                compressor,
128            )),
129            array,
130        ))
131    }
132
133    fn used_encodings(&self) -> HashSet<EncodingId> {
134        HashSet::from([FSSTEncoding::ID])
135    }
136}