1mod dictionary;
2mod stats;
3
4use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
5use vortex_array::arrays::{ConstantArray, PrimitiveVTable};
6use vortex_array::{ArrayRef, IntoArray, ToCanonical};
7use vortex_dict::DictArray;
8use vortex_dtype::PType;
9use vortex_error::{VortexExpect, VortexResult, vortex_panic};
10use vortex_runend::RunEndArray;
11use vortex_runend::compress::runend_encode;
12
13use self::stats::FloatStats;
14use crate::float::dictionary::dictionary_encode;
15use crate::integer::{IntCompressor, IntegerStats};
16use crate::patches::compress_patches;
17use crate::{
18 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
19 estimate_compression_ratio_with_sampling, integer,
20};
21
/// Cutoff compared against `FloatStats::average_run_length` by `RunEndScheme`: arrays whose
/// average run length is below this value are reported with a compression ratio of 0.0.
const RUN_END_THRESHOLD: u32 = 3;
24
/// A [`Scheme`] specialised for float input: its statistics type is [`FloatStats`] and its
/// identifying code type is [`FloatCode`].
pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
26
// Blanket implementation: every `Scheme` with the matching associated types is automatically
// a `FloatScheme`, so scheme types never implement `FloatScheme` by hand.
impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
28
/// Unit type that carries the [`Compressor`] implementation operating on [`FloatStats`].
pub struct FloatCompressor;
30
impl Compressor for FloatCompressor {
    type ArrayVTable = PrimitiveVTable;
    type SchemeType = dyn FloatScheme;
    type StatsType = FloatStats;

    /// The candidate schemes exposed by this compressor.
    ///
    /// NOTE(review): `RunEndScheme` is defined in this file but is not part of this list —
    /// confirm whether that omission is intentional.
    fn schemes() -> &'static [&'static Self::SchemeType] {
        &[
            &UncompressedScheme,
            &ConstantScheme,
            &ALPScheme,
            &ALPRDScheme,
            &DictScheme,
        ]
    }

    /// The scheme designated as the default: leave the data uncompressed.
    fn default_scheme() -> &'static Self::SchemeType {
        &UncompressedScheme
    }

    /// The code that identifies this compressor's dictionary scheme.
    fn dict_scheme_code() -> FloatCode {
        DICT_SCHEME
    }
}
54
// Distinct codes identifying each float scheme. Each constant is returned by the `Scheme::code`
// implementation of the scheme it is named after, and the codes are also used in `excludes`
// lists to rule schemes out.
const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
const CONSTANT_SCHEME: FloatCode = FloatCode(1);
const ALP_SCHEME: FloatCode = FloatCode(2);
const ALPRD_SCHEME: FloatCode = FloatCode(3);
const DICT_SCHEME: FloatCode = FloatCode(4);
const RUNEND_SCHEME: FloatCode = FloatCode(5);
61
/// Scheme that returns the source array unchanged.
#[derive(Debug, Copy, Clone)]
struct UncompressedScheme;

/// Scheme that replaces the source with a `ConstantArray`.
#[derive(Debug, Copy, Clone)]
struct ConstantScheme;

/// Scheme that ALP-encodes the source and integer-compresses the encoded values.
#[derive(Debug, Copy, Clone)]
struct ALPScheme;

/// Scheme that encodes the source with the ALP `RDEncoder`.
#[derive(Debug, Copy, Clone)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes the source and compresses codes and values separately.
#[derive(Debug, Copy, Clone)]
struct DictScheme;

/// Scheme that run-end encodes the source and integer-compresses the run ends.
#[derive(Debug, Copy, Clone)]
struct RunEndScheme;
79
80impl Scheme for UncompressedScheme {
81 type StatsType = FloatStats;
82 type CodeType = FloatCode;
83
84 fn code(&self) -> FloatCode {
85 UNCOMPRESSED_SCHEME
86 }
87
88 fn expected_compression_ratio(
89 &self,
90 _stats: &Self::StatsType,
91 _is_sample: bool,
92 _allowed_cascading: usize,
93 _excludes: &[FloatCode],
94 ) -> VortexResult<f64> {
95 Ok(1.0)
96 }
97
98 fn compress(
99 &self,
100 stats: &Self::StatsType,
101 _is_sample: bool,
102 _allowed_cascading: usize,
103 _excludes: &[FloatCode],
104 ) -> VortexResult<ArrayRef> {
105 Ok(stats.source().to_array())
106 }
107}
108
109impl Scheme for ConstantScheme {
110 type StatsType = FloatStats;
111 type CodeType = FloatCode;
112
113 fn code(&self) -> FloatCode {
114 CONSTANT_SCHEME
115 }
116
117 fn expected_compression_ratio(
118 &self,
119 stats: &Self::StatsType,
120 is_sample: bool,
121 _allowed_cascading: usize,
122 _excludes: &[FloatCode],
123 ) -> VortexResult<f64> {
124 if is_sample {
126 return Ok(0.0);
127 }
128
129 if stats.distinct_values_count > 1 {
131 return Ok(0.0);
132 }
133
134 if stats.null_count > 0 && stats.value_count > 0 {
136 return Ok(0.0);
137 }
138
139 Ok(stats.value_count as f64)
140 }
141
142 fn compress(
143 &self,
144 stats: &Self::StatsType,
145 _is_sample: bool,
146 _allowed_cascading: usize,
147 _excludes: &[FloatCode],
148 ) -> VortexResult<ArrayRef> {
149 let scalar = stats
150 .source()
151 .as_constant()
152 .vortex_expect("must be constant");
153
154 Ok(ConstantArray::new(scalar, stats.source().len()).into_array())
155 }
156}
157
/// Opaque identifier of a float compression scheme, wrapping a `u8`.
///
/// Values are compared for equality when checking `excludes` lists.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct FloatCode(u8);
160
161impl Scheme for ALPScheme {
162 type StatsType = FloatStats;
163 type CodeType = FloatCode;
164
165 fn code(&self) -> FloatCode {
166 ALP_SCHEME
167 }
168
169 fn expected_compression_ratio(
170 &self,
171 stats: &Self::StatsType,
172 is_sample: bool,
173 allowed_cascading: usize,
174 excludes: &[FloatCode],
175 ) -> VortexResult<f64> {
176 if stats.source().ptype() == PType::F16 {
178 return Ok(0.0);
179 }
180
181 if allowed_cascading == 0 {
182 return Ok(0.0);
185 }
186
187 estimate_compression_ratio_with_sampling(
188 self,
189 stats,
190 is_sample,
191 allowed_cascading,
192 excludes,
193 )
194 }
195
196 fn compress(
197 &self,
198 stats: &FloatStats,
199 is_sample: bool,
200 allowed_cascading: usize,
201 excludes: &[FloatCode],
202 ) -> VortexResult<ArrayRef> {
203 let alp_encoded = ALPEncoding
204 .encode(&stats.source().to_canonical()?, None)?
205 .vortex_expect("Input is a supported floating point array");
206 let alp = alp_encoded.as_::<ALPVTable>();
207 let alp_ints = alp.encoded().to_primitive()?;
208
209 let mut int_excludes = Vec::new();
213 if excludes.contains(&DICT_SCHEME) {
214 int_excludes.push(integer::DictScheme.code());
215 }
216 if excludes.contains(&RUNEND_SCHEME) {
217 int_excludes.push(integer::RunEndScheme.code());
218 }
219
220 let compressed_alp_ints =
221 IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
222
223 let patches = alp.patches().map(compress_patches).transpose()?;
224
225 Ok(ALPArray::try_new(compressed_alp_ints, alp.exponents(), patches)?.into_array())
226 }
227}
228
229impl Scheme for ALPRDScheme {
230 type StatsType = FloatStats;
231 type CodeType = FloatCode;
232
233 fn code(&self) -> FloatCode {
234 ALPRD_SCHEME
235 }
236
237 fn expected_compression_ratio(
238 &self,
239 stats: &Self::StatsType,
240 is_sample: bool,
241 allowed_cascading: usize,
242 excludes: &[FloatCode],
243 ) -> VortexResult<f64> {
244 if stats.source().ptype() == PType::F16 {
245 return Ok(0.0);
246 }
247
248 estimate_compression_ratio_with_sampling(
249 self,
250 stats,
251 is_sample,
252 allowed_cascading,
253 excludes,
254 )
255 }
256
257 fn compress(
258 &self,
259 stats: &Self::StatsType,
260 _is_sample: bool,
261 _allowed_cascading: usize,
262 _excludes: &[FloatCode],
263 ) -> VortexResult<ArrayRef> {
264 let encoder = match stats.source().ptype() {
265 PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
266 PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
267 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
268 };
269
270 let mut alp_rd = encoder.encode(stats.source());
271
272 let patches = alp_rd
273 .left_parts_patches()
274 .map(compress_patches)
275 .transpose()?;
276 alp_rd.replace_left_parts_patches(patches);
277
278 Ok(alp_rd.into_array())
279 }
280}
281
282impl Scheme for DictScheme {
283 type StatsType = FloatStats;
284 type CodeType = FloatCode;
285
286 fn code(&self) -> FloatCode {
287 DICT_SCHEME
288 }
289
290 fn expected_compression_ratio(
291 &self,
292 stats: &Self::StatsType,
293 is_sample: bool,
294 allowed_cascading: usize,
295 excludes: &[FloatCode],
296 ) -> VortexResult<f64> {
297 if stats.value_count == 0 {
298 return Ok(0.0);
299 }
300
301 if stats.distinct_values_count > stats.value_count / 2 {
303 return Ok(0.0);
304 }
305
306 estimate_compression_ratio_with_sampling(
308 self,
309 stats,
310 is_sample,
311 allowed_cascading,
312 excludes,
313 )
314 }
315
316 fn compress(
317 &self,
318 stats: &Self::StatsType,
319 is_sample: bool,
320 allowed_cascading: usize,
321 _excludes: &[FloatCode],
322 ) -> VortexResult<ArrayRef> {
323 let dict_array = dictionary_encode(stats)?;
324
325 let codes_stats = IntegerStats::generate_opts(
327 &dict_array.codes().to_primitive()?,
328 GenerateStatsOptions {
329 count_distinct_values: false,
330 },
331 );
332 let codes_scheme = IntCompressor::choose_scheme(
333 &codes_stats,
334 is_sample,
335 allowed_cascading - 1,
336 &[integer::DictScheme.code()],
337 )?;
338 let compressed_codes = codes_scheme.compress(
339 &codes_stats,
340 is_sample,
341 allowed_cascading - 1,
342 &[integer::DictScheme.code()],
343 )?;
344
345 let compressed_values = FloatCompressor::compress(
346 &dict_array.values().to_primitive()?,
347 is_sample,
348 allowed_cascading - 1,
349 &[DICT_SCHEME],
350 )?;
351
352 Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
353 }
354}
355
356impl Scheme for RunEndScheme {
357 type StatsType = FloatStats;
358 type CodeType = FloatCode;
359
360 fn code(&self) -> FloatCode {
361 RUNEND_SCHEME
362 }
363
364 fn expected_compression_ratio(
365 &self,
366 stats: &Self::StatsType,
367 is_sample: bool,
368 allowed_cascading: usize,
369 excludes: &[FloatCode],
370 ) -> VortexResult<f64> {
371 if stats.average_run_length < RUN_END_THRESHOLD {
372 return Ok(0.0);
373 }
374
375 estimate_compression_ratio_with_sampling(
376 self,
377 stats,
378 is_sample,
379 allowed_cascading,
380 excludes,
381 )
382 }
383
384 fn compress(
385 &self,
386 stats: &FloatStats,
387 is_sample: bool,
388 allowed_cascading: usize,
389 _excludes: &[FloatCode],
390 ) -> VortexResult<ArrayRef> {
391 let (ends, values) = runend_encode(stats.source())?;
392 let compressed_ends = IntCompressor::compress(
394 &ends,
395 is_sample,
396 allowed_cascading - 1,
397 &[
398 integer::RunEndScheme.code(),
399 integer::DictScheme.code(),
400 integer::SparseScheme.code(),
401 ],
402 )?;
403
404 Ok(RunEndArray::try_new(compressed_ends, values)?.into_array())
405 }
406}
407
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::FloatCompressor;
    use crate::{Compressor, MAX_CASCADE};

    /// Compressing an empty `f32` array must succeed and yield an empty result.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable);

        let result = FloatCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(result.is_empty());
    }

    /// Compresses 1024 floats that cycle through 50 distinct values and checks that the
    /// compressed array still has the same length as the input.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive().unwrap();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.tree_display());

        // Previously this test only printed the tree and asserted nothing.
        assert_eq!(compressed.len(), floats.len());
    }
}