vortex_sampling_compressor/compressors/
fsst.rs1use std::any::Any;
2use std::fmt::Debug;
3use std::sync::Arc;
4
5use fsst::Compressor;
6use vortex_array::aliases::hash_set::HashSet;
7use vortex_array::array::{VarBinEncoding, VarBinViewEncoding};
8use vortex_array::{Encoding, EncodingId, IntoArray};
9use vortex_dtype::DType;
10use vortex_error::{vortex_bail, VortexResult};
11use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTArray, FSSTEncoding};
12
13use super::bitpacked::BITPACK_WITH_PATCHES;
14use super::delta::DeltaCompressor;
15use super::r#for::FoRCompressor;
16use super::varbin::VarBinCompressor;
17use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor};
18use crate::downscale::downscale_integer_array;
19use crate::{constants, SamplingCompressor};
20
/// Stateless sampling-compressor strategy that encodes UTF-8 string arrays with FSST.
///
/// All behavior lives in its `EncodingCompressor` implementation; the type itself
/// carries no data.
#[derive(Debug)]
pub struct FSSTCompressor;
23
/// Upper bound used when deciding whether an array is large enough to be worth
/// FSST-compressing (see the size check in `FSSTCompressor::compress`).
///
/// NOTE(review): presumably the worst-case symbol table footprint in bytes — up to
/// 255 symbols of at most 8 bytes each, plus one length byte per symbol. Confirm
/// against the `fsst` crate.
const FSST_SYMTAB_MAX_SIZE: usize = 8 * 255 + 255;
26
/// Lets a trained FSST `Compressor` be stored as compression-tree metadata, so it
/// can be recovered later by downcasting through `Any`.
impl EncoderMetadata for Compressor {
    /// Exposes the compressor as `&dyn Any` for downcasting back to `Compressor`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
32
33impl EncodingCompressor for FSSTCompressor {
34 fn id(&self) -> &str {
35 FSSTEncoding::ID.as_ref()
36 }
37
38 fn cost(&self) -> u8 {
39 constants::FSST_COST
40 }
41
42 fn can_compress(&self, array: &vortex_array::Array) -> Option<&dyn EncodingCompressor> {
43 if !matches!(array.dtype(), &DType::Utf8(_)) {
47 return None;
48 }
49
50 if array.is_encoding(VarBinEncoding::ID) || array.is_encoding(VarBinViewEncoding::ID) {
52 return Some(self);
53 }
54
55 None
56 }
57
58 fn compress<'a>(
59 &'a self,
60 array: &vortex_array::Array,
61 like: Option<CompressionTree<'a>>,
63 ctx: SamplingCompressor<'a>,
64 ) -> VortexResult<CompressedArray<'a>> {
65 if array.nbytes() < 5 * FSST_SYMTAB_MAX_SIZE {
70 return Ok(CompressedArray::uncompressed(array.clone()));
71 }
72
73 let compressor = like
74 .clone()
75 .and_then(|mut tree| tree.metadata())
76 .map(VortexResult::Ok)
77 .unwrap_or_else(|| Ok(Arc::new(fsst_train_compressor(array)?)))?;
78
79 let Some(fsst_compressor) = compressor.as_any().downcast_ref::<Compressor>() else {
80 vortex_bail!("Could not downcast metadata as FSST Compressor")
81 };
82
83 let fsst_array =
84 if array.is_encoding(VarBinEncoding::ID) || array.is_encoding(VarBinViewEncoding::ID) {
85 fsst_compress(array, fsst_compressor)?
87 } else {
88 vortex_bail!(
89 "Unsupported encoding for FSSTCompressor: {}",
90 array.encoding()
91 )
92 };
93
94 let codes = fsst_array.codes();
95 let compressed_codes = ctx
96 .auxiliary("fsst_codes")
97 .excluding(self)
98 .including_only(&[
99 &VarBinCompressor,
100 &DeltaCompressor,
101 &FoRCompressor,
102 &BITPACK_WITH_PATCHES,
103 ])
104 .compress(&codes, like.as_ref().and_then(|l| l.child(2)))?;
105
106 let uncompressed_lengths = ctx
108 .auxiliary("uncompressed_lengths")
109 .excluding(self)
110 .compress(
111 &downscale_integer_array(fsst_array.uncompressed_lengths())?,
112 like.as_ref().and_then(|l| l.child(3)),
113 )?;
114
115 Ok(CompressedArray::compressed(
116 FSSTArray::try_new(
117 fsst_array.dtype().clone(),
118 fsst_array.symbols(),
119 fsst_array.symbol_lengths(),
120 compressed_codes.array,
121 uncompressed_lengths.array,
122 )?
123 .into_array(),
124 Some(CompressionTree::new_with_metadata(
125 self,
126 vec![None, None, compressed_codes.path, uncompressed_lengths.path],
127 compressor,
128 )),
129 array,
130 ))
131 }
132
133 fn used_encodings(&self) -> HashSet<EncodingId> {
134 HashSet::from([FSSTEncoding::ID])
135 }
136}