1pub(crate) mod dictionary;
5mod stats;
6
7use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
8use vortex_array::arrays::{ConstantArray, PrimitiveVTable};
9use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10use vortex_dict::DictArray;
11use vortex_dtype::PType;
12use vortex_error::{VortexExpect, VortexResult, vortex_panic};
13
14pub use self::stats::FloatStats;
15use crate::float::dictionary::dictionary_encode;
16use crate::integer::{IntCompressor, IntegerStats};
17use crate::patches::compress_patches;
18use crate::{
19 Compressor, CompressorStats, GenerateStatsOptions, Scheme,
20 estimate_compression_ratio_with_sampling, integer,
21};
22
23pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
24
25impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
26
/// Stateless entry point that implements [`Compressor`] for float arrays.
///
/// All behavior lives in the `Compressor` impl; the struct itself carries no data.
pub struct FloatCompressor;
29
30impl Compressor for FloatCompressor {
31 type ArrayVTable = PrimitiveVTable;
32 type SchemeType = dyn FloatScheme;
33 type StatsType = FloatStats;
34
35 fn schemes() -> &'static [&'static Self::SchemeType] {
36 &[
37 &UncompressedScheme,
38 &ConstantScheme,
39 &ALPScheme,
40 &ALPRDScheme,
41 &DictScheme,
42 ]
43 }
44
45 fn default_scheme() -> &'static Self::SchemeType {
46 &UncompressedScheme
47 }
48
49 fn dict_scheme_code() -> FloatCode {
50 DICT_SCHEME
51 }
52}
53
// Identifier codes for the float schemes. Each `Scheme::code` implementation in this file
// returns the constant that matches its scheme.
const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
const CONSTANT_SCHEME: FloatCode = FloatCode(1);
const ALP_SCHEME: FloatCode = FloatCode(2);
const ALPRD_SCHEME: FloatCode = FloatCode(3);
const DICT_SCHEME: FloatCode = FloatCode(4);
// No scheme in this file reports this code and it is not part of `FloatCompressor::schemes()`.
// Within this file it is only looked up in `ALPScheme::compress`, where a matching entry in
// `excludes` is forwarded to the integer compressor as a run-end exclusion.
const RUNEND_SCHEME: FloatCode = FloatCode(5);
60
/// Scheme that returns the source array unchanged.
#[derive(Debug, Copy, Clone)]
struct UncompressedScheme;

/// Scheme that replaces the source array with a `ConstantArray`.
#[derive(Debug, Copy, Clone)]
struct ConstantScheme;

/// Scheme that ALP-encodes the floats and compresses the encoded integers with `IntCompressor`.
#[derive(Debug, Copy, Clone)]
struct ALPScheme;

/// Scheme that encodes `f32`/`f64` arrays with `RDEncoder`.
#[derive(Debug, Copy, Clone)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes the floats and compresses codes and values separately.
#[derive(Debug, Copy, Clone)]
struct DictScheme;
75
76impl Scheme for UncompressedScheme {
77 type StatsType = FloatStats;
78 type CodeType = FloatCode;
79
80 fn code(&self) -> FloatCode {
81 UNCOMPRESSED_SCHEME
82 }
83
84 fn expected_compression_ratio(
85 &self,
86 _stats: &Self::StatsType,
87 _is_sample: bool,
88 _allowed_cascading: usize,
89 _excludes: &[FloatCode],
90 ) -> VortexResult<f64> {
91 Ok(1.0)
92 }
93
94 fn compress(
95 &self,
96 stats: &Self::StatsType,
97 _is_sample: bool,
98 _allowed_cascading: usize,
99 _excludes: &[FloatCode],
100 ) -> VortexResult<ArrayRef> {
101 Ok(stats.source().to_array())
102 }
103}
104
105impl Scheme for ConstantScheme {
106 type StatsType = FloatStats;
107 type CodeType = FloatCode;
108
109 fn code(&self) -> FloatCode {
110 CONSTANT_SCHEME
111 }
112
113 fn expected_compression_ratio(
114 &self,
115 stats: &Self::StatsType,
116 is_sample: bool,
117 _allowed_cascading: usize,
118 _excludes: &[FloatCode],
119 ) -> VortexResult<f64> {
120 if is_sample {
122 return Ok(0.0);
123 }
124
125 if stats.distinct_values_count > 1 {
127 return Ok(0.0);
128 }
129
130 if stats.null_count > 0 && stats.value_count > 0 {
132 return Ok(0.0);
133 }
134
135 Ok(stats.value_count as f64)
136 }
137
138 fn compress(
139 &self,
140 stats: &Self::StatsType,
141 _is_sample: bool,
142 _allowed_cascading: usize,
143 _excludes: &[FloatCode],
144 ) -> VortexResult<ArrayRef> {
145 let scalar = stats
146 .source()
147 .as_constant()
148 .vortex_expect("must be constant");
149
150 Ok(ConstantArray::new(scalar, stats.source().len()).into_array())
151 }
152}
153
/// Identifier of a float compression scheme, stored as a `u8`.
///
/// The wrapped value is private; the codes used in this file are the `*_SCHEME` constants.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct FloatCode(u8);
156
157impl Scheme for ALPScheme {
158 type StatsType = FloatStats;
159 type CodeType = FloatCode;
160
161 fn code(&self) -> FloatCode {
162 ALP_SCHEME
163 }
164
165 fn expected_compression_ratio(
166 &self,
167 stats: &Self::StatsType,
168 is_sample: bool,
169 allowed_cascading: usize,
170 excludes: &[FloatCode],
171 ) -> VortexResult<f64> {
172 if stats.source().ptype() == PType::F16 {
174 return Ok(0.0);
175 }
176
177 if allowed_cascading == 0 {
178 return Ok(0.0);
181 }
182
183 estimate_compression_ratio_with_sampling(
184 self,
185 stats,
186 is_sample,
187 allowed_cascading,
188 excludes,
189 )
190 }
191
192 fn compress(
193 &self,
194 stats: &FloatStats,
195 is_sample: bool,
196 allowed_cascading: usize,
197 excludes: &[FloatCode],
198 ) -> VortexResult<ArrayRef> {
199 let alp_encoded = ALPEncoding
200 .encode(&stats.source().to_canonical(), None)?
201 .vortex_expect("Input is a supported floating point array");
202 let alp = alp_encoded.as_::<ALPVTable>();
203 let alp_ints = alp.encoded().to_primitive();
204
205 let mut int_excludes = Vec::new();
209 if excludes.contains(&DICT_SCHEME) {
210 int_excludes.push(integer::DictScheme.code());
211 }
212 if excludes.contains(&RUNEND_SCHEME) {
213 int_excludes.push(integer::RunEndScheme.code());
214 }
215
216 let compressed_alp_ints =
217 IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
218
219 let patches = alp.patches().map(compress_patches).transpose()?;
220
221 Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array())
222 }
223}
224
225impl Scheme for ALPRDScheme {
226 type StatsType = FloatStats;
227 type CodeType = FloatCode;
228
229 fn code(&self) -> FloatCode {
230 ALPRD_SCHEME
231 }
232
233 fn expected_compression_ratio(
234 &self,
235 stats: &Self::StatsType,
236 is_sample: bool,
237 allowed_cascading: usize,
238 excludes: &[FloatCode],
239 ) -> VortexResult<f64> {
240 if stats.source().ptype() == PType::F16 {
241 return Ok(0.0);
242 }
243
244 estimate_compression_ratio_with_sampling(
245 self,
246 stats,
247 is_sample,
248 allowed_cascading,
249 excludes,
250 )
251 }
252
253 fn compress(
254 &self,
255 stats: &Self::StatsType,
256 _is_sample: bool,
257 _allowed_cascading: usize,
258 _excludes: &[FloatCode],
259 ) -> VortexResult<ArrayRef> {
260 let encoder = match stats.source().ptype() {
261 PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
262 PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
263 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
264 };
265
266 let mut alp_rd = encoder.encode(stats.source());
267
268 let patches = alp_rd
269 .left_parts_patches()
270 .map(compress_patches)
271 .transpose()?;
272 alp_rd.replace_left_parts_patches(patches);
273
274 Ok(alp_rd.into_array())
275 }
276}
277
278impl Scheme for DictScheme {
279 type StatsType = FloatStats;
280 type CodeType = FloatCode;
281
282 fn code(&self) -> FloatCode {
283 DICT_SCHEME
284 }
285
286 fn expected_compression_ratio(
287 &self,
288 stats: &Self::StatsType,
289 is_sample: bool,
290 allowed_cascading: usize,
291 excludes: &[FloatCode],
292 ) -> VortexResult<f64> {
293 if stats.value_count == 0 {
294 return Ok(0.0);
295 }
296
297 if stats.distinct_values_count > stats.value_count / 2 {
299 return Ok(0.0);
300 }
301
302 estimate_compression_ratio_with_sampling(
304 self,
305 stats,
306 is_sample,
307 allowed_cascading,
308 excludes,
309 )
310 }
311
312 fn compress(
313 &self,
314 stats: &Self::StatsType,
315 is_sample: bool,
316 allowed_cascading: usize,
317 _excludes: &[FloatCode],
318 ) -> VortexResult<ArrayRef> {
319 let dict_array = dictionary_encode(stats);
320
321 let codes_stats = IntegerStats::generate_opts(
323 &dict_array.codes().to_primitive().downcast()?,
324 GenerateStatsOptions {
325 count_distinct_values: false,
326 },
327 );
328 let codes_scheme = IntCompressor::choose_scheme(
329 &codes_stats,
330 is_sample,
331 allowed_cascading - 1,
332 &[integer::DictScheme.code(), integer::SequenceScheme.code()],
333 )?;
334 let compressed_codes = codes_scheme.compress(
335 &codes_stats,
336 is_sample,
337 allowed_cascading - 1,
338 &[integer::DictScheme.code()],
339 )?;
340
341 let compressed_values = FloatCompressor::compress(
342 &dict_array.values().to_primitive(),
343 is_sample,
344 allowed_cascading - 1,
345 &[DICT_SCHEME],
346 )?;
347
348 unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
350 }
351}
352
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::FloatCompressor;
    use crate::{Compressor, MAX_CASCADE};

    /// Compressing an empty `f32` array must succeed and produce an empty array.
    #[test]
    fn test_empty() {
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    /// Compressing 1024 values that cycle through `0.0..50.0` must succeed and keep the length.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        for i in 0..1024 {
            // Only 50 distinct values, repeating with period 50.
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();

        // Whatever scheme is chosen, the logical length must be preserved.
        assert_eq!(compressed.len(), 1024);
        println!("compressed: {}", compressed.display_tree());
    }
}