vortex_btrblocks/schemes/
float.rs1use vortex_alp::ALP;
7use vortex_alp::ALPArrayExt;
8use vortex_alp::ALPArraySlotsExt;
9use vortex_alp::ALPRDArrayExt;
10use vortex_alp::ALPRDArrayOwnedExt;
11use vortex_alp::RDEncoder;
12use vortex_alp::alp_encode;
13use vortex_array::ArrayRef;
14use vortex_array::Canonical;
15use vortex_array::IntoArray;
16use vortex_array::LEGACY_SESSION;
17use vortex_array::ToCanonical;
18use vortex_array::VortexSessionExecute;
19use vortex_array::arrays::Patched;
20use vortex_array::arrays::patched::use_experimental_patches;
21use vortex_array::arrays::primitive::PrimitiveArrayExt;
22use vortex_array::dtype::PType;
23use vortex_compressor::estimate::CompressionEstimate;
24use vortex_compressor::estimate::DeferredEstimate;
25use vortex_compressor::estimate::EstimateVerdict;
26use vortex_compressor::scheme::ChildSelection;
27use vortex_compressor::scheme::DescendantExclusion;
28use vortex_error::VortexResult;
29use vortex_error::vortex_panic;
30use vortex_sparse::Sparse;
31
32use super::integer::SparseScheme as IntSparseScheme;
33use crate::ArrayAndStats;
34use crate::CascadingCompressor;
35use crate::CompressorContext;
36use crate::Scheme;
37use crate::SchemeExt;
38use crate::compress_patches;
39use crate::schemes::rle_ancestor_exclusions;
40use crate::schemes::rle_descendant_exclusions;
41
/// Float compression scheme that encodes values with ALP (`vortex_alp::alp_encode`)
/// and cascades the resulting integer array through the compressor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ALPScheme;

/// Float compression scheme that uses ALP-RD (`vortex_alp::RDEncoder`) for
/// `f32`/`f64` arrays.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ALPRDScheme;

/// Float compression scheme that stores arrays with more than 90% nulls as a
/// `Sparse` array, compressing only the indices of the non-null values.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominatedSparseScheme;

/// Float compression scheme backed by `vortex_pco::Pco`; only available with
/// the `pco` feature.
#[cfg(feature = "pco")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PcoScheme;
60
61pub use vortex_compressor::builtins::FloatConstantScheme;
63pub use vortex_compressor::builtins::FloatDictScheme;
64pub use vortex_compressor::builtins::is_float_primitive;
65pub use vortex_compressor::stats::FloatStats;
66
/// Run-length encoding scheme for float arrays; the actual encoding is shared
/// with the integer RLE scheme (`super::integer::rle_compress`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FloatRLEScheme;
70
71impl Scheme for ALPScheme {
72 fn scheme_name(&self) -> &'static str {
73 "vortex.float.alp"
74 }
75
76 fn matches(&self, canonical: &Canonical) -> bool {
77 is_float_primitive(canonical)
78 }
79
80 fn num_children(&self) -> usize {
82 1
83 }
84
85 fn expected_compression_ratio(
86 &self,
87 data: &mut ArrayAndStats,
88 ctx: CompressorContext,
89 ) -> CompressionEstimate {
90 if ctx.finished_cascading() {
93 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
94 }
95
96 if data.array_as_primitive().ptype() == PType::F16 {
98 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
99 }
100
101 CompressionEstimate::Deferred(DeferredEstimate::Sample)
102 }
103
104 fn compress(
105 &self,
106 compressor: &CascadingCompressor,
107 data: &mut ArrayAndStats,
108 ctx: CompressorContext,
109 ) -> VortexResult<ArrayRef> {
110 let alp_encoded = alp_encode(
111 data.array_as_primitive(),
112 None,
113 &mut LEGACY_SESSION.create_execution_ctx(),
114 )?;
115
116 let compressed_alp_ints =
118 compressor.compress_child(alp_encoded.encoded(), &ctx, self.id(), 0)?;
119
120 let alp_stats = alp_encoded.as_array().statistics().to_owned();
121 let exponents = alp_encoded.exponents();
122
123 if use_experimental_patches() {
124 let patches = alp_encoded.patches();
125
126 let alp_array = ALP::new(compressed_alp_ints, exponents, None).into_array();
128
129 match patches {
130 None => Ok(alp_array),
131 Some(p) => Ok(Patched::from_array_and_patches(
132 alp_array,
133 &p,
134 &mut LEGACY_SESSION.create_execution_ctx(),
135 )?
136 .with_stats_set(alp_stats)
137 .into_array()),
138 }
139 } else {
140 let patches = alp_encoded.patches().map(compress_patches).transpose()?;
141
142 Ok(ALP::new(compressed_alp_ints, exponents, patches).into_array())
143 }
144 }
145}
146
147impl Scheme for ALPRDScheme {
148 fn scheme_name(&self) -> &'static str {
149 "vortex.float.alprd"
150 }
151
152 fn matches(&self, canonical: &Canonical) -> bool {
153 is_float_primitive(canonical)
154 }
155
156 fn expected_compression_ratio(
157 &self,
158 data: &mut ArrayAndStats,
159 _ctx: CompressorContext,
160 ) -> CompressionEstimate {
161 if data.array_as_primitive().ptype() == PType::F16 {
163 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
164 }
165
166 CompressionEstimate::Deferred(DeferredEstimate::Sample)
167 }
168
169 fn compress(
170 &self,
171 _compressor: &CascadingCompressor,
172 data: &mut ArrayAndStats,
173 _ctx: CompressorContext,
174 ) -> VortexResult<ArrayRef> {
175 let primitive_array = data.array_as_primitive();
176
177 let encoder = match primitive_array.ptype() {
178 PType::F32 => RDEncoder::new(primitive_array.as_slice::<f32>()),
179 PType::F64 => RDEncoder::new(primitive_array.as_slice::<f64>()),
180 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
181 };
182
183 let alp_rd = encoder.encode(primitive_array);
184 let dtype = alp_rd.dtype().clone();
185 let right_bit_width = alp_rd.right_bit_width();
186 let mut parts = ALPRDArrayOwnedExt::into_data_parts(alp_rd);
187 parts.left_parts_patches = parts.left_parts_patches.map(compress_patches).transpose()?;
188
189 Ok(vortex_alp::ALPRD::try_new(
190 dtype,
191 parts.left_parts,
192 parts.left_parts_dictionary,
193 parts.right_parts,
194 right_bit_width,
195 parts.left_parts_patches,
196 )?
197 .into_array())
198 }
199}
200
201impl Scheme for NullDominatedSparseScheme {
202 fn scheme_name(&self) -> &'static str {
203 "vortex.float.sparse"
204 }
205
206 fn matches(&self, canonical: &Canonical) -> bool {
207 is_float_primitive(canonical)
208 }
209
210 fn num_children(&self) -> usize {
212 1
213 }
214
215 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
217 vec![DescendantExclusion {
218 excluded: IntSparseScheme.id(),
219 children: ChildSelection::All,
220 }]
221 }
222
223 fn expected_compression_ratio(
224 &self,
225 data: &mut ArrayAndStats,
226 _ctx: CompressorContext,
227 ) -> CompressionEstimate {
228 let len = data.array_len() as f64;
229 let stats = data.float_stats();
230 let value_count = stats.value_count();
231
232 if value_count == 0 {
234 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
235 }
236
237 if stats.null_count() as f64 / len > 0.9 {
239 return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
240 }
241
242 CompressionEstimate::Verdict(EstimateVerdict::Skip)
244 }
245
246 fn compress(
247 &self,
248 compressor: &CascadingCompressor,
249 data: &mut ArrayAndStats,
250 ctx: CompressorContext,
251 ) -> VortexResult<ArrayRef> {
252 let sparse_encoded = Sparse::encode(data.array(), None)?;
254
255 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
256 let indices = sparse.patches().indices().to_primitive().narrow()?;
257 let compressed_indices =
258 compressor.compress_child(&indices.into_array(), &ctx, self.id(), 0)?;
259
260 Sparse::try_new(
261 compressed_indices,
262 sparse.patches().values().clone(),
263 sparse.len(),
264 sparse.fill_scalar().clone(),
265 )
266 .map(|a| a.into_array())
267 } else {
268 Ok(sparse_encoded)
269 }
270 }
271}
272
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.float.pco"
    }

    /// Applies to any canonical float primitive array.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_float_primitive(canonical)
    }

    /// Pco gives no cheap up-front estimate, so the ratio is always sampled.
    fn expected_compression_ratio(
        &self,
        _data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Compresses the primitive array with Pco at the default compression level.
    ///
    /// # Errors
    /// Propagates failures from `vortex_pco::Pco::from_primitive`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let encoded = vortex_pco::Pco::from_primitive(
            data.array_as_primitive(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            // NOTE(review): presumably the number of values per Pco page — confirm
            // against `vortex_pco::Pco::from_primitive`.
            8192,
        )?;
        Ok(encoded.into_array())
    }
}
305
306impl Scheme for FloatRLEScheme {
307 fn scheme_name(&self) -> &'static str {
308 "vortex.float.rle"
309 }
310
311 fn matches(&self, canonical: &Canonical) -> bool {
312 is_float_primitive(canonical)
313 }
314
315 fn num_children(&self) -> usize {
317 3
318 }
319
320 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
321 rle_descendant_exclusions()
322 }
323
324 fn ancestor_exclusions(&self) -> Vec<vortex_compressor::scheme::AncestorExclusion> {
325 rle_ancestor_exclusions()
326 }
327
328 fn expected_compression_ratio(
329 &self,
330 data: &mut ArrayAndStats,
331 ctx: CompressorContext,
332 ) -> CompressionEstimate {
333 if ctx.finished_cascading() {
335 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
336 }
337
338 if data.float_stats().average_run_length() < super::integer::RUN_LENGTH_THRESHOLD {
339 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
340 }
341
342 CompressionEstimate::Deferred(DeferredEstimate::Sample)
343 }
344
345 fn compress(
346 &self,
347 compressor: &CascadingCompressor,
348 data: &mut ArrayAndStats,
349 ctx: CompressorContext,
350 ) -> VortexResult<ArrayRef> {
351 super::integer::rle_compress(self, compressor, data, ctx)
352 }
353}
354
#[cfg(test)]
/// End-to-end tests of float compression through `BtrBlocksCompressor` and of
/// the float RLE scheme used on its own.
mod tests {
    use std::iter;

    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::Nullability;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;

    use crate::BtrBlocksCompressor;
    use crate::schemes::float::FloatRLEScheme;

    /// An empty, non-nullable f32 array compresses to an empty array.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let btr = BtrBlocksCompressor::default();
        let array = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable).into_array();
        let result = btr.compress(&array)?;

        assert!(result.is_empty());
        Ok(())
    }

    /// 1024 f32 values cycling through 50 distinct values: the expected
    /// top-level encoding is a dictionary.
    #[test]
    fn test_compress() -> VortexResult<()> {
        let mut values = buffer_mut![1.0f32; 1024];
        // Every slot is overwritten; the initial 1.0 fill is only a placeholder.
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let array = values.into_array();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array)?;
        assert_eq!(compressed.len(), 1024);

        // Metadata-only display pins the chosen encoding without dumping values.
        let display = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(display, "vortex.dict(f32, len=1024)");

        Ok(())
    }

    /// Three long runs of constant floats, compressed with only the float RLE
    /// scheme available, produce an `RLE` array that round-trips.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(1.5f32, 100));
        values.extend(iter::repeat_n(2.7f32, 200));
        values.extend(iter::repeat_n(3.15f32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressor = CascadingCompressor::new(vec![&FloatRLEScheme]);
        let compressed = compressor.compress(&array.into_array())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// 6 special float values (NaNs, infinities, signed zeros) followed by 90
    /// nulls: with more than 90% nulls the expected encoding is sparse.
    #[test]
    fn test_sparse_compression() -> VortexResult<()> {
        let mut array = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        array.append_value(f32::NAN);
        array.append_value(-f32::NAN);
        array.append_value(f32::INFINITY);
        array.append_value(-f32::INFINITY);
        array.append_value(0.0f32);
        array.append_value(-0.0f32);
        array.append_nulls(90);

        let array = array.finish_into_primitive().into_array();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array)?;
        assert_eq!(compressed.len(), 96);

        let display = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(display, "vortex.sparse(f32?, len=96)");

        Ok(())
    }
}
450
#[cfg(test)]
/// Tests that pin which top-level encoding `BtrBlocksCompressor` selects for
/// representative f64 inputs, and that the selected encoding still decodes to
/// the original data.
mod scheme_selection_tests {
    use vortex_alp::ALP;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::dtype::Nullability;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// 100 copies of one value compress to a `Constant` array that round-trips.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<f64> = vec![42.5; 100];
        let array =
            PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable).into_array();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array)?;
        assert!(compressed.is::<Constant>());
        // Picking the right scheme is only useful if the data survives it.
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// Decimal-like values (multiples of 0.01) compress to `ALP` losslessly.
    #[test]
    fn test_alp_compressed() -> VortexResult<()> {
        let values: Vec<f64> = (0..1000).map(|i| (i as f64) * 0.01).collect();
        let array =
            PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable).into_array();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array)?;
        assert!(compressed.is::<ALP>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// Five distinct decimal values repeated: the top level is `ALP` and its
    /// first child (the encoded integers) is dictionary-encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = [1.1, 2.2, 3.3, 4.4, 5.5];
        let values: Vec<f64> = (0..1000)
            .map(|i| distinct_values[i % distinct_values.len()])
            .collect();
        let array =
            PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable).into_array();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array)?;
        assert!(compressed.is::<ALP>());
        assert!(compressed.children()[0].is::<Dict>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// 5 values followed by 95 nulls: whatever encoding is chosen must keep the
    /// length and reproduce both the values and the nulls.
    #[test]
    fn test_null_dominated_compressed() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f64>::with_capacity(Nullability::Nullable, 100);
        for i in 0..5 {
            builder.append_value(i as f64);
        }
        builder.append_nulls(95);
        let array = builder.finish_into_primitive().into_array();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array)?;
        assert_eq!(compressed.len(), 100);
        assert_arrays_eq!(compressed, array);
        Ok(())
    }
}