1mod dictionary;
2mod stats;
3
4use vortex_alp::{ALPArray, RDEncoder, alp_encode};
5use vortex_array::arrays::{ConstantArray, PrimitiveArray};
6use vortex_array::variants::PrimitiveArrayTrait;
7use vortex_array::{Array, ArrayRef, ArrayStatistics, ToCanonical};
8use vortex_dict::DictArray;
9use vortex_dtype::PType;
10use vortex_error::{VortexExpect, VortexResult, vortex_panic};
11use vortex_runend::RunEndArray;
12use vortex_runend::compress::runend_encode;
13
14use self::stats::FloatStats;
15use crate::float::dictionary::dictionary_encode;
16use crate::integer::{IntCompressor, IntegerStats};
17use crate::patches::compress_patches;
18use crate::{
19 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
20 estimate_compression_ratio_with_sampling, integer,
21};
22
23const RUN_END_THRESHOLD: u32 = 3;
25
26pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
27
28impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
29
30pub struct FloatCompressor;
31
32impl Compressor for FloatCompressor {
33 type ArrayType = PrimitiveArray;
34 type SchemeType = dyn FloatScheme;
35 type StatsType = FloatStats;
36
37 fn schemes() -> &'static [&'static Self::SchemeType] {
38 &[
39 &UncompressedScheme,
40 &ConstantScheme,
41 &ALPScheme,
42 &ALPRDScheme,
43 &DictScheme,
44 ]
45 }
46
47 fn default_scheme() -> &'static Self::SchemeType {
48 &UncompressedScheme
49 }
50
51 fn dict_scheme_code() -> FloatCode {
52 DICT_SCHEME
53 }
54}
55
56const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
57const CONSTANT_SCHEME: FloatCode = FloatCode(1);
58const ALP_SCHEME: FloatCode = FloatCode(2);
59const ALPRD_SCHEME: FloatCode = FloatCode(3);
60const DICT_SCHEME: FloatCode = FloatCode(4);
61const RUNEND_SCHEME: FloatCode = FloatCode(5);
62
63#[derive(Debug, Copy, Clone)]
64struct UncompressedScheme;
65
66#[derive(Debug, Copy, Clone)]
67struct ConstantScheme;
68
69#[derive(Debug, Copy, Clone)]
70struct ALPScheme;
71
72#[derive(Debug, Copy, Clone)]
73struct ALPRDScheme;
74
75#[derive(Debug, Copy, Clone)]
76struct DictScheme;
77
78#[derive(Debug, Copy, Clone)]
79struct RunEndScheme;
80
81impl Scheme for UncompressedScheme {
82 type StatsType = FloatStats;
83 type CodeType = FloatCode;
84
85 fn code(&self) -> FloatCode {
86 UNCOMPRESSED_SCHEME
87 }
88
89 fn expected_compression_ratio(
90 &self,
91 _stats: &Self::StatsType,
92 _is_sample: bool,
93 _allowed_cascading: usize,
94 _excludes: &[FloatCode],
95 ) -> VortexResult<f64> {
96 Ok(1.0)
97 }
98
99 fn compress(
100 &self,
101 stats: &Self::StatsType,
102 _is_sample: bool,
103 _allowed_cascading: usize,
104 _excludes: &[FloatCode],
105 ) -> VortexResult<ArrayRef> {
106 Ok(stats.source().to_array())
107 }
108}
109
110impl Scheme for ConstantScheme {
111 type StatsType = FloatStats;
112 type CodeType = FloatCode;
113
114 fn code(&self) -> FloatCode {
115 CONSTANT_SCHEME
116 }
117
118 fn expected_compression_ratio(
119 &self,
120 stats: &Self::StatsType,
121 is_sample: bool,
122 _allowed_cascading: usize,
123 _excludes: &[FloatCode],
124 ) -> VortexResult<f64> {
125 if is_sample {
127 return Ok(0.0);
128 }
129
130 if stats.distinct_values_count > 1 {
132 return Ok(0.0);
133 }
134
135 if stats.null_count > 0 && stats.value_count > 0 {
137 return Ok(0.0);
138 }
139
140 Ok(stats.value_count as f64)
141 }
142
143 fn compress(
144 &self,
145 stats: &Self::StatsType,
146 _is_sample: bool,
147 _allowed_cascading: usize,
148 _excludes: &[FloatCode],
149 ) -> VortexResult<ArrayRef> {
150 let scalar = stats
151 .source()
152 .as_constant()
153 .vortex_expect("must be constant");
154
155 Ok(ConstantArray::new(scalar, stats.source().len()).into_array())
156 }
157}
158
/// Opaque identifier of a float compression scheme, as stored in exclusion lists.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct FloatCode(u8);
161
162impl Scheme for ALPScheme {
163 type StatsType = FloatStats;
164 type CodeType = FloatCode;
165
166 fn code(&self) -> FloatCode {
167 ALP_SCHEME
168 }
169
170 fn expected_compression_ratio(
171 &self,
172 stats: &Self::StatsType,
173 is_sample: bool,
174 allowed_cascading: usize,
175 excludes: &[FloatCode],
176 ) -> VortexResult<f64> {
177 if stats.source().ptype() == PType::F16 {
179 return Ok(0.0);
180 }
181
182 if allowed_cascading == 0 {
183 return Ok(0.0);
186 }
187
188 estimate_compression_ratio_with_sampling(
189 self,
190 stats,
191 is_sample,
192 allowed_cascading,
193 excludes,
194 )
195 }
196
197 fn compress(
198 &self,
199 stats: &FloatStats,
200 is_sample: bool,
201 allowed_cascading: usize,
202 excludes: &[FloatCode],
203 ) -> VortexResult<ArrayRef> {
204 let alp = alp_encode(stats.source())?;
205 let alp_ints = alp.encoded().to_primitive()?;
206
207 let mut int_excludes = Vec::new();
211 if excludes.contains(&DICT_SCHEME) {
212 int_excludes.push(integer::DictScheme.code());
213 }
214 if excludes.contains(&RUNEND_SCHEME) {
215 int_excludes.push(integer::RunEndScheme.code());
216 }
217
218 let compressed_alp_ints =
219 IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
220
221 let patches = alp.patches().map(compress_patches).transpose()?;
222
223 Ok(ALPArray::try_new(compressed_alp_ints, alp.exponents(), patches)?.into_array())
224 }
225}
226
227impl Scheme for ALPRDScheme {
228 type StatsType = FloatStats;
229 type CodeType = FloatCode;
230
231 fn code(&self) -> FloatCode {
232 ALPRD_SCHEME
233 }
234
235 fn expected_compression_ratio(
236 &self,
237 stats: &Self::StatsType,
238 is_sample: bool,
239 allowed_cascading: usize,
240 excludes: &[FloatCode],
241 ) -> VortexResult<f64> {
242 if stats.source().ptype() == PType::F16 {
243 return Ok(0.0);
244 }
245
246 estimate_compression_ratio_with_sampling(
247 self,
248 stats,
249 is_sample,
250 allowed_cascading,
251 excludes,
252 )
253 }
254
255 fn compress(
256 &self,
257 stats: &Self::StatsType,
258 _is_sample: bool,
259 _allowed_cascading: usize,
260 _excludes: &[FloatCode],
261 ) -> VortexResult<ArrayRef> {
262 let encoder = match stats.source().ptype() {
263 PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
264 PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
265 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
266 };
267
268 let mut alp_rd = encoder.encode(stats.source());
269
270 let patches = alp_rd
271 .left_parts_patches()
272 .map(compress_patches)
273 .transpose()?;
274 alp_rd.replace_left_parts_patches(patches);
275
276 Ok(alp_rd.into_array())
277 }
278}
279
280impl Scheme for DictScheme {
281 type StatsType = FloatStats;
282 type CodeType = FloatCode;
283
284 fn code(&self) -> FloatCode {
285 DICT_SCHEME
286 }
287
288 fn expected_compression_ratio(
289 &self,
290 stats: &Self::StatsType,
291 is_sample: bool,
292 allowed_cascading: usize,
293 excludes: &[FloatCode],
294 ) -> VortexResult<f64> {
295 if stats.value_count == 0 {
296 return Ok(0.0);
297 }
298
299 if stats.distinct_values_count > stats.value_count / 2 {
301 return Ok(0.0);
302 }
303
304 estimate_compression_ratio_with_sampling(
306 self,
307 stats,
308 is_sample,
309 allowed_cascading,
310 excludes,
311 )
312 }
313
314 fn compress(
315 &self,
316 stats: &Self::StatsType,
317 is_sample: bool,
318 allowed_cascading: usize,
319 _excludes: &[FloatCode],
320 ) -> VortexResult<ArrayRef> {
321 let dict_array = dictionary_encode(stats)?;
322
323 let codes_stats = IntegerStats::generate_opts(
325 &dict_array.codes().to_primitive()?,
326 GenerateStatsOptions {
327 count_distinct_values: false,
328 },
329 );
330 let codes_scheme = IntCompressor::choose_scheme(
331 &codes_stats,
332 is_sample,
333 allowed_cascading - 1,
334 &[integer::DictScheme.code()],
335 )?;
336 let compressed_codes = codes_scheme.compress(
337 &codes_stats,
338 is_sample,
339 allowed_cascading - 1,
340 &[integer::DictScheme.code()],
341 )?;
342
343 let compressed_values = FloatCompressor::compress(
344 &dict_array.values().to_primitive()?,
345 is_sample,
346 allowed_cascading - 1,
347 &[DICT_SCHEME],
348 )?;
349
350 Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
351 }
352}
353
354impl Scheme for RunEndScheme {
355 type StatsType = FloatStats;
356 type CodeType = FloatCode;
357
358 fn code(&self) -> FloatCode {
359 RUNEND_SCHEME
360 }
361
362 fn expected_compression_ratio(
363 &self,
364 stats: &Self::StatsType,
365 is_sample: bool,
366 allowed_cascading: usize,
367 excludes: &[FloatCode],
368 ) -> VortexResult<f64> {
369 if stats.average_run_length < RUN_END_THRESHOLD {
370 return Ok(0.0);
371 }
372
373 estimate_compression_ratio_with_sampling(
374 self,
375 stats,
376 is_sample,
377 allowed_cascading,
378 excludes,
379 )
380 }
381
382 fn compress(
383 &self,
384 stats: &FloatStats,
385 is_sample: bool,
386 allowed_cascading: usize,
387 _excludes: &[FloatCode],
388 ) -> VortexResult<ArrayRef> {
389 let (ends, values) = runend_encode(stats.source())?;
390 let compressed_ends = IntCompressor::compress(
392 &ends,
393 is_sample,
394 allowed_cascading - 1,
395 &[
396 integer::RunEndScheme.code(),
397 integer::DictScheme.code(),
398 integer::SparseScheme.code(),
399 ],
400 )?;
401
402 Ok(RunEndArray::try_new(compressed_ends, values)?.into_array())
403 }
404}
405
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::FloatCompressor;
    use crate::{Compressor, MAX_CASCADE};

    /// Compressing an empty array yields an empty array.
    #[test]
    fn test_empty() {
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    /// A low-cardinality float array compresses and round-trips losslessly.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive().unwrap();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.tree_display());

        // Previously this test asserted nothing. Check that length is preserved and that
        // decompression reproduces the input exactly.
        assert_eq!(compressed.len(), floats.len());
        let decompressed = compressed.to_primitive().unwrap();
        assert_eq!(decompressed.as_slice::<f32>(), floats.as_slice::<f32>());
    }
}
445}