1mod dictionary;
5mod stats;
6
7use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
8use vortex_array::arrays::{ConstantArray, PrimitiveVTable};
9use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10use vortex_dict::DictArray;
11use vortex_dtype::PType;
12use vortex_error::{VortexExpect, VortexResult, vortex_panic};
13
14use self::stats::FloatStats;
15use crate::float::dictionary::dictionary_encode;
16use crate::integer::{IntCompressor, IntegerStats};
17use crate::patches::compress_patches;
18use crate::{
19 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
20 estimate_compression_ratio_with_sampling, integer,
21};
22
23pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
24
25impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
26
27pub struct FloatCompressor;
28
29impl Compressor for FloatCompressor {
30 type ArrayVTable = PrimitiveVTable;
31 type SchemeType = dyn FloatScheme;
32 type StatsType = FloatStats;
33
34 fn schemes() -> &'static [&'static Self::SchemeType] {
35 &[
36 &UncompressedScheme,
37 &ConstantScheme,
38 &ALPScheme,
39 &ALPRDScheme,
40 &DictScheme,
41 ]
42 }
43
44 fn default_scheme() -> &'static Self::SchemeType {
45 &UncompressedScheme
46 }
47
48 fn dict_scheme_code() -> FloatCode {
49 DICT_SCHEME
50 }
51}
52
// Codes identifying each float scheme. Every code wraps a distinct `u8`.

/// Code returned by `UncompressedScheme::code`.
const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
/// Code returned by `ConstantScheme::code`.
const CONSTANT_SCHEME: FloatCode = FloatCode(1);
/// Code returned by `ALPScheme::code`.
const ALP_SCHEME: FloatCode = FloatCode(2);
/// Code returned by `ALPRDScheme::code`.
const ALPRD_SCHEME: FloatCode = FloatCode(3);
/// Code returned by `DictScheme::code` and by `FloatCompressor::dict_scheme_code`.
const DICT_SCHEME: FloatCode = FloatCode(4);
/// Run-end code. No scheme visible here returns it from `code()`; it is only looked up in the
/// `excludes` list by `ALPScheme::compress` to forward the exclusion to the integer compressor.
const RUNEND_SCHEME: FloatCode = FloatCode(5);
59
/// Scheme that returns the source array as-is.
#[derive(Debug, Copy, Clone)]
struct UncompressedScheme;

/// Scheme that replaces the source array with a `ConstantArray`.
#[derive(Debug, Copy, Clone)]
struct ConstantScheme;

/// Scheme that ALP-encodes the floats and compresses the encoded integers with `IntCompressor`.
#[derive(Debug, Copy, Clone)]
struct ALPScheme;

/// Scheme that encodes `f32`/`f64` arrays with `RDEncoder` (ALP-RD).
#[derive(Debug, Copy, Clone)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes the floats and compresses the codes and values separately.
#[derive(Debug, Copy, Clone)]
struct DictScheme;
74
75impl Scheme for UncompressedScheme {
76 type StatsType = FloatStats;
77 type CodeType = FloatCode;
78
79 fn code(&self) -> FloatCode {
80 UNCOMPRESSED_SCHEME
81 }
82
83 fn expected_compression_ratio(
84 &self,
85 _stats: &Self::StatsType,
86 _is_sample: bool,
87 _allowed_cascading: usize,
88 _excludes: &[FloatCode],
89 ) -> VortexResult<f64> {
90 Ok(1.0)
91 }
92
93 fn compress(
94 &self,
95 stats: &Self::StatsType,
96 _is_sample: bool,
97 _allowed_cascading: usize,
98 _excludes: &[FloatCode],
99 ) -> VortexResult<ArrayRef> {
100 Ok(stats.source().to_array())
101 }
102}
103
104impl Scheme for ConstantScheme {
105 type StatsType = FloatStats;
106 type CodeType = FloatCode;
107
108 fn code(&self) -> FloatCode {
109 CONSTANT_SCHEME
110 }
111
112 fn expected_compression_ratio(
113 &self,
114 stats: &Self::StatsType,
115 is_sample: bool,
116 _allowed_cascading: usize,
117 _excludes: &[FloatCode],
118 ) -> VortexResult<f64> {
119 if is_sample {
121 return Ok(0.0);
122 }
123
124 if stats.distinct_values_count > 1 {
126 return Ok(0.0);
127 }
128
129 if stats.null_count > 0 && stats.value_count > 0 {
131 return Ok(0.0);
132 }
133
134 Ok(stats.value_count as f64)
135 }
136
137 fn compress(
138 &self,
139 stats: &Self::StatsType,
140 _is_sample: bool,
141 _allowed_cascading: usize,
142 _excludes: &[FloatCode],
143 ) -> VortexResult<ArrayRef> {
144 let scalar = stats
145 .source()
146 .as_constant()
147 .vortex_expect("must be constant");
148
149 Ok(ConstantArray::new(scalar, stats.source().len()).into_array())
150 }
151}
152
/// Identifier of a float compression scheme, wrapping a `u8`.
///
/// Values are compared for equality when looking a scheme up in an `excludes` list.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct FloatCode(u8);
155
156impl Scheme for ALPScheme {
157 type StatsType = FloatStats;
158 type CodeType = FloatCode;
159
160 fn code(&self) -> FloatCode {
161 ALP_SCHEME
162 }
163
164 fn expected_compression_ratio(
165 &self,
166 stats: &Self::StatsType,
167 is_sample: bool,
168 allowed_cascading: usize,
169 excludes: &[FloatCode],
170 ) -> VortexResult<f64> {
171 if stats.source().ptype() == PType::F16 {
173 return Ok(0.0);
174 }
175
176 if allowed_cascading == 0 {
177 return Ok(0.0);
180 }
181
182 estimate_compression_ratio_with_sampling(
183 self,
184 stats,
185 is_sample,
186 allowed_cascading,
187 excludes,
188 )
189 }
190
191 fn compress(
192 &self,
193 stats: &FloatStats,
194 is_sample: bool,
195 allowed_cascading: usize,
196 excludes: &[FloatCode],
197 ) -> VortexResult<ArrayRef> {
198 let alp_encoded = ALPEncoding
199 .encode(&stats.source().to_canonical()?, None)?
200 .vortex_expect("Input is a supported floating point array");
201 let alp = alp_encoded.as_::<ALPVTable>();
202 let alp_ints = alp.encoded().to_primitive()?;
203
204 let mut int_excludes = Vec::new();
208 if excludes.contains(&DICT_SCHEME) {
209 int_excludes.push(integer::DictScheme.code());
210 }
211 if excludes.contains(&RUNEND_SCHEME) {
212 int_excludes.push(integer::RunEndScheme.code());
213 }
214
215 let compressed_alp_ints =
216 IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
217
218 let patches = alp.patches().map(compress_patches).transpose()?;
219
220 Ok(ALPArray::try_new(compressed_alp_ints, alp.exponents(), patches)?.into_array())
221 }
222}
223
224impl Scheme for ALPRDScheme {
225 type StatsType = FloatStats;
226 type CodeType = FloatCode;
227
228 fn code(&self) -> FloatCode {
229 ALPRD_SCHEME
230 }
231
232 fn expected_compression_ratio(
233 &self,
234 stats: &Self::StatsType,
235 is_sample: bool,
236 allowed_cascading: usize,
237 excludes: &[FloatCode],
238 ) -> VortexResult<f64> {
239 if stats.source().ptype() == PType::F16 {
240 return Ok(0.0);
241 }
242
243 estimate_compression_ratio_with_sampling(
244 self,
245 stats,
246 is_sample,
247 allowed_cascading,
248 excludes,
249 )
250 }
251
252 fn compress(
253 &self,
254 stats: &Self::StatsType,
255 _is_sample: bool,
256 _allowed_cascading: usize,
257 _excludes: &[FloatCode],
258 ) -> VortexResult<ArrayRef> {
259 let encoder = match stats.source().ptype() {
260 PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
261 PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
262 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
263 };
264
265 let mut alp_rd = encoder.encode(stats.source());
266
267 let patches = alp_rd
268 .left_parts_patches()
269 .map(compress_patches)
270 .transpose()?;
271 alp_rd.replace_left_parts_patches(patches);
272
273 Ok(alp_rd.into_array())
274 }
275}
276
277impl Scheme for DictScheme {
278 type StatsType = FloatStats;
279 type CodeType = FloatCode;
280
281 fn code(&self) -> FloatCode {
282 DICT_SCHEME
283 }
284
285 fn expected_compression_ratio(
286 &self,
287 stats: &Self::StatsType,
288 is_sample: bool,
289 allowed_cascading: usize,
290 excludes: &[FloatCode],
291 ) -> VortexResult<f64> {
292 if stats.value_count == 0 {
293 return Ok(0.0);
294 }
295
296 if stats.distinct_values_count > stats.value_count / 2 {
298 return Ok(0.0);
299 }
300
301 estimate_compression_ratio_with_sampling(
303 self,
304 stats,
305 is_sample,
306 allowed_cascading,
307 excludes,
308 )
309 }
310
311 fn compress(
312 &self,
313 stats: &Self::StatsType,
314 is_sample: bool,
315 allowed_cascading: usize,
316 _excludes: &[FloatCode],
317 ) -> VortexResult<ArrayRef> {
318 let dict_array = dictionary_encode(stats)?;
319
320 let codes_stats = IntegerStats::generate_opts(
322 &dict_array.codes().to_primitive()?,
323 GenerateStatsOptions {
324 count_distinct_values: false,
325 },
326 );
327 let codes_scheme = IntCompressor::choose_scheme(
328 &codes_stats,
329 is_sample,
330 allowed_cascading - 1,
331 &[integer::DictScheme.code(), integer::SequenceScheme.code()],
332 )?;
333 let compressed_codes = codes_scheme.compress(
334 &codes_stats,
335 is_sample,
336 allowed_cascading - 1,
337 &[integer::DictScheme.code()],
338 )?;
339
340 let compressed_values = FloatCompressor::compress(
341 &dict_array.values().to_primitive()?,
342 is_sample,
343 allowed_cascading - 1,
344 &[DICT_SCHEME],
345 )?;
346
347 Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
348 }
349}
350
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::FloatCompressor;
    use crate::{Compressor, MAX_CASCADE};

    /// Compressing an empty, non-nullable `f32` array succeeds and yields an empty array.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable);

        let result = FloatCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(result.is_empty());
    }

    /// Compresses 1024 floats cycling through the values `0.0..=49.0` and checks that the
    /// compressed array keeps the input length. The encoding tree is printed for inspection.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        // Overwrite every slot so the data holds 50 distinct values in a repeating cycle.
        for (i, value) in values.iter_mut().enumerate() {
            *value = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive().unwrap();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();

        // Compression must never change the number of elements.
        assert_eq!(compressed.len(), floats.len());
        println!("compressed: {}", compressed.display_tree());
    }
}