vortex_btrblocks/schemes/string/
fsst.rs1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_array::arrays::VarBinArray;
12use vortex_array::arrays::primitive::PrimitiveArrayExt;
13use vortex_array::arrays::varbin::VarBinArrayExt;
14use vortex_compressor::estimate::CompressionEstimate;
15use vortex_compressor::estimate::DeferredEstimate;
16use vortex_error::VortexResult;
17use vortex_fsst::FSST;
18use vortex_fsst::FSSTArrayExt;
19use vortex_fsst::fsst_compress;
20use vortex_fsst::fsst_train_compressor;
21
22use crate::ArrayAndStats;
23use crate::CascadingCompressor;
24use crate::CompressorContext;
25use crate::Scheme;
26use crate::SchemeExt;
27
/// Stateless marker type for the FSST string compression scheme.
///
/// All behaviour lives in this type's [`Scheme`] implementation, where it is
/// registered under the name `"vortex.string.fsst"`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FSSTScheme;
36
37impl Scheme for FSSTScheme {
38 fn scheme_name(&self) -> &'static str {
39 "vortex.string.fsst"
40 }
41
42 fn matches(&self, canonical: &Canonical) -> bool {
43 canonical.dtype().is_utf8()
44 }
45
46 fn num_children(&self) -> usize {
48 2
49 }
50
51 fn expected_compression_ratio(
52 &self,
53 _data: &ArrayAndStats,
54 _compress_ctx: CompressorContext,
55 _exec_ctx: &mut ExecutionCtx,
56 ) -> CompressionEstimate {
57 CompressionEstimate::Deferred(DeferredEstimate::Sample)
58 }
59
60 fn compress(
61 &self,
62 compressor: &CascadingCompressor,
63 data: &ArrayAndStats,
64 compress_ctx: CompressorContext,
65 exec_ctx: &mut ExecutionCtx,
66 ) -> VortexResult<ArrayRef> {
67 let utf8 = data.array_as_varbinview().into_owned();
68 let compressor_fsst = fsst_train_compressor(&utf8);
69 let fsst = fsst_compress(&utf8, utf8.len(), utf8.dtype(), &compressor_fsst, exec_ctx);
70
71 let uncompressed_lengths_primitive = fsst
72 .uncompressed_lengths()
73 .clone()
74 .execute::<PrimitiveArray>(exec_ctx)?
75 .narrow(exec_ctx)?;
76 let compressed_original_lengths = compressor.compress_child(
77 &uncompressed_lengths_primitive.into_array(),
78 &compress_ctx,
79 self.id(),
80 0,
81 exec_ctx,
82 )?;
83
84 let codes_offsets_primitive = fsst
85 .codes()
86 .offsets()
87 .clone()
88 .execute::<PrimitiveArray>(exec_ctx)?
89 .narrow(exec_ctx)?;
90 let compressed_codes_offsets = compressor.compress_child(
91 &codes_offsets_primitive.into_array(),
92 &compress_ctx,
93 self.id(),
94 1,
95 exec_ctx,
96 )?;
97 let compressed_codes = VarBinArray::try_new(
98 compressed_codes_offsets,
99 fsst.codes().bytes().clone(),
100 fsst.codes().dtype().clone(),
101 fsst.codes().validity()?,
102 )?;
103
104 let fsst = FSST::try_new(
105 fsst.dtype().clone(),
106 fsst.symbols().clone(),
107 fsst.symbol_lengths().clone(),
108 compressed_codes,
109 compressed_original_lengths,
110 exec_ctx,
111 )?;
112
113 Ok(fsst.into_array())
114 }
115}