vortex_btrblocks/schemes/
float.rs1use vortex_alp::ALP;
7use vortex_alp::ALPArrayExt;
8use vortex_alp::ALPArraySlotsExt;
9use vortex_alp::ALPRDArrayExt;
10use vortex_alp::ALPRDArrayOwnedExt;
11use vortex_alp::RDEncoder;
12use vortex_alp::alp_encode;
13use vortex_array::ArrayRef;
14use vortex_array::Canonical;
15use vortex_array::IntoArray;
16use vortex_array::ToCanonical;
17use vortex_array::arrays::primitive::PrimitiveArrayExt;
18use vortex_array::dtype::PType;
19use vortex_compressor::estimate::CompressionEstimate;
20use vortex_compressor::scheme::ChildSelection;
21use vortex_compressor::scheme::DescendantExclusion;
22use vortex_error::VortexResult;
23use vortex_error::vortex_panic;
24use vortex_sparse::Sparse;
25
26use super::integer::SparseScheme as IntSparseScheme;
27use crate::ArrayAndStats;
28use crate::CascadingCompressor;
29use crate::CompressorContext;
30use crate::Scheme;
31use crate::SchemeExt;
32use crate::compress_patches;
33use crate::schemes::rle_ancestor_exclusions;
34use crate::schemes::rle_descendant_exclusions;
35
/// Float compression scheme reported under the name `vortex.float.alp`.
///
/// Its `Scheme` implementation in this file encodes the input with `alp_encode`,
/// then compresses the encoded child array and any patches.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ALPScheme;
39
/// Float compression scheme reported under the name `vortex.float.alprd`.
///
/// Its `Scheme` implementation in this file builds an `RDEncoder` from the `f32`
/// or `f64` values and re-assembles the result with compressed left-part patches.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ALPRDScheme;
43
/// Float compression scheme reported under the name `vortex.float.sparse`.
///
/// Its `Scheme` implementation in this file only proposes itself when more than
/// 90% of the array is null, and encodes the array with `Sparse::encode`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominatedSparseScheme;
49
/// Float compression scheme reported under the name `vortex.float.pco`.
///
/// Only compiled when the `pco` feature is enabled. Its `Scheme` implementation
/// in this file delegates to `vortex_pco::Pco::from_primitive`.
#[cfg(feature = "pco")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PcoScheme;
54
55pub use vortex_compressor::builtins::FloatConstantScheme;
57pub use vortex_compressor::builtins::FloatDictScheme;
58pub use vortex_compressor::builtins::is_float_primitive;
59pub use vortex_compressor::stats::FloatStats;
60
/// Float compression scheme reported under the name `vortex.float.rle`.
///
/// Its `Scheme` implementation in this file delegates the actual encoding to
/// `super::integer::rle_compress`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FloatRLEScheme;
64
65impl Scheme for ALPScheme {
66 fn scheme_name(&self) -> &'static str {
67 "vortex.float.alp"
68 }
69
70 fn matches(&self, canonical: &Canonical) -> bool {
71 is_float_primitive(canonical)
72 }
73
74 fn num_children(&self) -> usize {
76 1
77 }
78
79 fn expected_compression_ratio(
80 &self,
81 data: &mut ArrayAndStats,
82 ctx: CompressorContext,
83 ) -> CompressionEstimate {
84 if ctx.finished_cascading() {
87 return CompressionEstimate::Skip;
88 }
89
90 if data.array_as_primitive().ptype() == PType::F16 {
92 return CompressionEstimate::Skip;
93 }
94
95 CompressionEstimate::Sample
96 }
97
98 fn compress(
99 &self,
100 compressor: &CascadingCompressor,
101 data: &mut ArrayAndStats,
102 ctx: CompressorContext,
103 ) -> VortexResult<ArrayRef> {
104 let alp_encoded = alp_encode(&data.array_as_primitive(), None)?;
105
106 let compressed_alp_ints =
108 compressor.compress_child(alp_encoded.encoded(), &ctx, self.id(), 0)?;
109
110 let patches = alp_encoded.patches().map(compress_patches).transpose()?;
113
114 Ok(ALP::new(compressed_alp_ints, alp_encoded.exponents(), patches).into_array())
115 }
116}
117
118impl Scheme for ALPRDScheme {
119 fn scheme_name(&self) -> &'static str {
120 "vortex.float.alprd"
121 }
122
123 fn matches(&self, canonical: &Canonical) -> bool {
124 is_float_primitive(canonical)
125 }
126
127 fn expected_compression_ratio(
128 &self,
129 data: &mut ArrayAndStats,
130 _ctx: CompressorContext,
131 ) -> CompressionEstimate {
132 if data.array_as_primitive().ptype() == PType::F16 {
134 return CompressionEstimate::Skip;
135 }
136
137 CompressionEstimate::Sample
138 }
139
140 fn compress(
141 &self,
142 _compressor: &CascadingCompressor,
143 data: &mut ArrayAndStats,
144 _ctx: CompressorContext,
145 ) -> VortexResult<ArrayRef> {
146 let primitive_array = data.array_as_primitive();
147
148 let encoder = match primitive_array.ptype() {
149 PType::F32 => RDEncoder::new(primitive_array.as_slice::<f32>()),
150 PType::F64 => RDEncoder::new(primitive_array.as_slice::<f64>()),
151 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
152 };
153
154 let alp_rd = encoder.encode(&primitive_array);
155 let dtype = alp_rd.dtype().clone();
156 let right_bit_width = alp_rd.right_bit_width();
157 let mut parts = ALPRDArrayOwnedExt::into_data_parts(alp_rd);
158 parts.left_parts_patches = parts.left_parts_patches.map(compress_patches).transpose()?;
159
160 Ok(vortex_alp::ALPRD::try_new(
161 dtype,
162 parts.left_parts,
163 parts.left_parts_dictionary,
164 parts.right_parts,
165 right_bit_width,
166 parts.left_parts_patches,
167 )?
168 .into_array())
169 }
170}
171
172impl Scheme for NullDominatedSparseScheme {
173 fn scheme_name(&self) -> &'static str {
174 "vortex.float.sparse"
175 }
176
177 fn matches(&self, canonical: &Canonical) -> bool {
178 is_float_primitive(canonical)
179 }
180
181 fn num_children(&self) -> usize {
183 1
184 }
185
186 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
188 vec![DescendantExclusion {
189 excluded: IntSparseScheme.id(),
190 children: ChildSelection::All,
191 }]
192 }
193
194 fn expected_compression_ratio(
195 &self,
196 data: &mut ArrayAndStats,
197 _ctx: CompressorContext,
198 ) -> CompressionEstimate {
199 let len = data.array_len() as f64;
200 let stats = data.float_stats();
201 let value_count = stats.value_count();
202
203 if value_count == 0 {
205 return CompressionEstimate::Skip;
206 }
207
208 if stats.null_count() as f64 / len > 0.9 {
210 return CompressionEstimate::Ratio(len / value_count as f64);
211 }
212
213 CompressionEstimate::Skip
215 }
216
217 fn compress(
218 &self,
219 compressor: &CascadingCompressor,
220 data: &mut ArrayAndStats,
221 ctx: CompressorContext,
222 ) -> VortexResult<ArrayRef> {
223 let sparse_encoded = Sparse::encode(data.array(), None)?;
225
226 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
227 let indices = sparse.patches().indices().to_primitive().narrow()?;
228 let compressed_indices =
229 compressor.compress_child(&indices.into_array(), &ctx, self.id(), 0)?;
230
231 Sparse::try_new(
232 compressed_indices,
233 sparse.patches().values().clone(),
234 sparse.len(),
235 sparse.fill_scalar().clone(),
236 )
237 .map(|a| a.into_array())
238 } else {
239 Ok(sparse_encoded)
240 }
241 }
242}
243
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    /// Name reported for this scheme.
    fn scheme_name(&self) -> &'static str {
        "vortex.float.pco"
    }

    /// Applies to whatever `is_float_primitive` accepts.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_float_primitive(canonical)
    }

    /// Always estimated by sampling; neither the data nor the context is consulted.
    fn expected_compression_ratio(
        &self,
        _data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        CompressionEstimate::Sample
    }

    /// Encodes the primitive array with `Pco::from_primitive` at pco's default
    /// compression level.
    ///
    /// # Errors
    ///
    /// Propagates any failure from `Pco::from_primitive`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let primitive = data.array_as_primitive();

        // NOTE(review): 8192 is the third argument of `from_primitive`; presumably the
        // number of values per page — confirm against the vortex-pco API.
        let encoded = vortex_pco::Pco::from_primitive(
            &primitive,
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
        )?;

        Ok(encoded.into_array())
    }
}
276
277impl Scheme for FloatRLEScheme {
278 fn scheme_name(&self) -> &'static str {
279 "vortex.float.rle"
280 }
281
282 fn matches(&self, canonical: &Canonical) -> bool {
283 is_float_primitive(canonical)
284 }
285
286 fn num_children(&self) -> usize {
288 3
289 }
290
291 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
292 rle_descendant_exclusions()
293 }
294
295 fn ancestor_exclusions(&self) -> Vec<vortex_compressor::scheme::AncestorExclusion> {
296 rle_ancestor_exclusions()
297 }
298
299 fn expected_compression_ratio(
300 &self,
301 data: &mut ArrayAndStats,
302 ctx: CompressorContext,
303 ) -> CompressionEstimate {
304 if ctx.finished_cascading() {
306 return CompressionEstimate::Skip;
307 }
308
309 if data.float_stats().average_run_length() < super::integer::RUN_LENGTH_THRESHOLD {
310 return CompressionEstimate::Skip;
311 }
312
313 CompressionEstimate::Sample
314 }
315
316 fn compress(
317 &self,
318 compressor: &CascadingCompressor,
319 data: &mut ArrayAndStats,
320 ctx: CompressorContext,
321 ) -> VortexResult<ArrayRef> {
322 super::integer::rle_compress(self, compressor, data, ctx)
323 }
324}
325
#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::Nullability;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;

    use crate::BtrBlocksCompressor;
    use crate::schemes::float::FloatRLEScheme;

    /// Compressing an empty `f32` array yields an empty result.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable);
        let compressed = BtrBlocksCompressor::default().compress(&empty.into_array())?;

        assert!(compressed.is_empty());
        Ok(())
    }

    /// 1024 values cycling through 50 distinct floats are displayed as a dict array.
    #[test]
    fn test_compress() -> VortexResult<()> {
        let mut values = buffer_mut![1.0f32; 1024];
        for idx in 0..1024 {
            values[idx] = (idx % 50) as f32;
        }

        let compressed = BtrBlocksCompressor::default().compress(&values.into_array())?;
        assert_eq!(compressed.len(), 1024);

        let rendered = compressed.display_as(DisplayOptions::MetadataOnly).to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.dict(f32, len=1024)");

        Ok(())
    }

    /// Three long runs compressed with only `FloatRLEScheme` produce an `RLE` array that
    /// compares equal to the input values.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let values: Vec<f32> = iter::repeat_n(1.5f32, 100)
            .chain(iter::repeat_n(2.7f32, 200))
            .chain(iter::repeat_n(3.15f32, 150))
            .collect();

        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let rle_only = CascadingCompressor::new(vec![&FloatRLEScheme]);
        let compressed = rle_only.compress(&input.into_array())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// Six special float values followed by 90 nulls are displayed as a sparse array.
    #[test]
    fn test_sparse_compression() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        let specials = [
            f32::NAN,
            -f32::NAN,
            f32::INFINITY,
            -f32::INFINITY,
            0.0f32,
            -0.0f32,
        ];
        for special in specials {
            builder.append_value(special);
        }
        builder.append_nulls(90);

        let input = builder.finish_into_primitive().into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 96);

        let rendered = compressed.display_as(DisplayOptions::MetadataOnly).to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.sparse(f32?, len=96)");

        Ok(())
    }
}
421
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_alp::ALP;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::dtype::Nullability;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// A single repeated `f64` value compresses to a `Constant` array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values = vec![42.5f64; 100];
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressed = BtrBlocksCompressor::default().compress(&input.into_array())?;

        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Multiples of 0.01 compress to an `ALP` array.
    #[test]
    fn test_alp_compressed() -> VortexResult<()> {
        let values: Vec<f64> = (0..1000).map(|step| (step as f64) * 0.01).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressed = BtrBlocksCompressor::default().compress(&input.into_array())?;

        assert!(compressed.is::<ALP>());
        Ok(())
    }

    /// Five distinct values repeated 200 times compress to `ALP` whose first child is `Dict`.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = [1.1, 2.2, 3.3, 4.4, 5.5];
        let values: Vec<f64> = distinct_values.iter().copied().cycle().take(1000).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressed = BtrBlocksCompressor::default().compress(&input.into_array())?;

        assert!(compressed.is::<ALP>());
        assert!(compressed.children()[0].is::<Dict>());
        Ok(())
    }

    /// Five values followed by 95 nulls compress to an array that keeps the length of 100.
    #[test]
    fn test_null_dominated_compressed() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f64>::with_capacity(Nullability::Nullable, 100);
        (0..5).for_each(|i| builder.append_value(i as f64));
        builder.append_nulls(95);
        let input = builder.finish_into_primitive();

        let compressed = BtrBlocksCompressor::default().compress(&input.into_array())?;

        assert_eq!(compressed.len(), 100);
        Ok(())
    }
}