vortex_btrblocks/schemes/
float.rs1use vortex_alp::ALP;
7use vortex_alp::ALPArrayExt;
8use vortex_alp::ALPArraySlotsExt;
9use vortex_alp::ALPRDArrayExt;
10use vortex_alp::ALPRDArrayOwnedExt;
11use vortex_alp::RDEncoder;
12use vortex_alp::alp_encode;
13use vortex_array::ArrayRef;
14use vortex_array::Canonical;
15use vortex_array::ExecutionCtx;
16use vortex_array::IntoArray;
17use vortex_array::arrays::Patched;
18use vortex_array::arrays::PrimitiveArray;
19use vortex_array::arrays::patched::use_experimental_patches;
20use vortex_array::arrays::primitive::PrimitiveArrayExt;
21use vortex_array::dtype::PType;
22use vortex_compressor::estimate::CompressionEstimate;
23use vortex_compressor::estimate::DeferredEstimate;
24use vortex_compressor::estimate::EstimateVerdict;
25use vortex_compressor::scheme::ChildSelection;
26use vortex_compressor::scheme::DescendantExclusion;
27use vortex_error::VortexResult;
28use vortex_error::vortex_panic;
29use vortex_sparse::Sparse;
30use vortex_sparse::SparseExt as _;
31
32use super::integer::SparseScheme as IntSparseScheme;
33use crate::ArrayAndStats;
34use crate::CascadingCompressor;
35use crate::CompressorContext;
36use crate::Scheme;
37use crate::SchemeExt;
38use crate::compress_patches;
39use crate::schemes::rle_ancestor_exclusions;
40use crate::schemes::rle_descendant_exclusions;
41
/// Marker type for the float compression scheme named `vortex.float.alp`.
///
/// The type carries no state; all behaviour lives in its [`Scheme`] implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ALPScheme;
45
/// Marker type for the float compression scheme named `vortex.float.alprd`.
///
/// The type carries no state; all behaviour lives in its [`Scheme`] implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ALPRDScheme;
49
/// Marker type for the float compression scheme named `vortex.float.sparse`.
///
/// Its [`Scheme`] implementation only reports a compression ratio when more than 90% of the
/// array's entries are null; every other input is skipped.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominatedSparseScheme;
55
#[cfg(feature = "pco")]
/// Marker type for the float compression scheme named `vortex.float.pco`.
///
/// Only compiled when the `pco` cargo feature is enabled.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PcoScheme;
60
61pub use vortex_compressor::builtins::FloatConstantScheme;
63pub use vortex_compressor::builtins::FloatDictScheme;
64pub use vortex_compressor::builtins::is_float_primitive;
65pub use vortex_compressor::stats::FloatStats;
66
/// Marker type for the float compression scheme named `vortex.float.rle`.
///
/// Its [`Scheme`] implementation forwards the actual encoding work to
/// `super::integer::rle_compress`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FloatRLEScheme;
70
71impl Scheme for ALPScheme {
72 fn scheme_name(&self) -> &'static str {
73 "vortex.float.alp"
74 }
75
76 fn matches(&self, canonical: &Canonical) -> bool {
77 is_float_primitive(canonical)
78 }
79
80 fn num_children(&self) -> usize {
82 1
83 }
84
85 fn expected_compression_ratio(
86 &self,
87 data: &ArrayAndStats,
88 compress_ctx: CompressorContext,
89 _exec_ctx: &mut ExecutionCtx,
90 ) -> CompressionEstimate {
91 if compress_ctx.finished_cascading() {
94 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
95 }
96
97 if data.array_as_primitive().ptype() == PType::F16 {
99 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
100 }
101
102 CompressionEstimate::Deferred(DeferredEstimate::Sample)
103 }
104
105 fn compress(
106 &self,
107 compressor: &CascadingCompressor,
108 data: &ArrayAndStats,
109 compress_ctx: CompressorContext,
110 exec_ctx: &mut ExecutionCtx,
111 ) -> VortexResult<ArrayRef> {
112 let alp_encoded = alp_encode(data.array_as_primitive(), None, exec_ctx)?;
113
114 let compressed_alp_ints = compressor.compress_child(
116 alp_encoded.encoded(),
117 &compress_ctx,
118 self.id(),
119 0,
120 exec_ctx,
121 )?;
122
123 let alp_stats = alp_encoded.as_array().statistics().to_owned();
124 let exponents = alp_encoded.exponents();
125
126 if use_experimental_patches() {
127 let patches = alp_encoded.patches();
128
129 let alp_array = ALP::new(compressed_alp_ints, exponents, None).into_array();
131
132 match patches {
133 None => Ok(alp_array),
134 Some(p) => Ok(Patched::from_array_and_patches(alp_array, &p, exec_ctx)?
135 .with_stats_set(alp_stats)
136 .into_array()),
137 }
138 } else {
139 let patches = alp_encoded
140 .patches()
141 .map(|p| compress_patches(p, exec_ctx))
142 .transpose()?;
143
144 Ok(ALP::new(compressed_alp_ints, exponents, patches).into_array())
145 }
146 }
147}
148
149impl Scheme for ALPRDScheme {
150 fn scheme_name(&self) -> &'static str {
151 "vortex.float.alprd"
152 }
153
154 fn matches(&self, canonical: &Canonical) -> bool {
155 is_float_primitive(canonical)
156 }
157
158 fn expected_compression_ratio(
159 &self,
160 data: &ArrayAndStats,
161 _compress_ctx: CompressorContext,
162 _exec_ctx: &mut ExecutionCtx,
163 ) -> CompressionEstimate {
164 if data.array_as_primitive().ptype() == PType::F16 {
166 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
167 }
168
169 CompressionEstimate::Deferred(DeferredEstimate::Sample)
170 }
171
172 fn compress(
173 &self,
174 _compressor: &CascadingCompressor,
175 data: &ArrayAndStats,
176 _compress_ctx: CompressorContext,
177 exec_ctx: &mut ExecutionCtx,
178 ) -> VortexResult<ArrayRef> {
179 let primitive_array = data.array_as_primitive();
180
181 let encoder = match primitive_array.ptype() {
182 PType::F32 => RDEncoder::new(primitive_array.as_slice::<f32>()),
183 PType::F64 => RDEncoder::new(primitive_array.as_slice::<f64>()),
184 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
185 };
186
187 let alp_rd = encoder.encode(primitive_array, exec_ctx);
188 let dtype = alp_rd.dtype().clone();
189 let right_bit_width = alp_rd.right_bit_width();
190 let mut parts = ALPRDArrayOwnedExt::into_data_parts(alp_rd);
191 parts.left_parts_patches = parts
192 .left_parts_patches
193 .map(|p| compress_patches(p, exec_ctx))
194 .transpose()?;
195
196 Ok(vortex_alp::ALPRD::try_new(
197 dtype,
198 parts.left_parts,
199 parts.left_parts_dictionary,
200 parts.right_parts,
201 right_bit_width,
202 parts.left_parts_patches,
203 exec_ctx,
204 )?
205 .into_array())
206 }
207}
208
209impl Scheme for NullDominatedSparseScheme {
210 fn scheme_name(&self) -> &'static str {
211 "vortex.float.sparse"
212 }
213
214 fn matches(&self, canonical: &Canonical) -> bool {
215 is_float_primitive(canonical)
216 }
217
218 fn num_children(&self) -> usize {
220 1
221 }
222
223 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
225 vec![DescendantExclusion {
226 excluded: IntSparseScheme.id(),
227 children: ChildSelection::All,
228 }]
229 }
230
231 fn expected_compression_ratio(
232 &self,
233 data: &ArrayAndStats,
234 _compress_ctx: CompressorContext,
235 exec_ctx: &mut ExecutionCtx,
236 ) -> CompressionEstimate {
237 let len = data.array_len() as f64;
238 let stats = data.float_stats(exec_ctx);
239 let value_count = stats.value_count();
240
241 if value_count == 0 {
243 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
244 }
245
246 if stats.null_count() as f64 / len > 0.9 {
248 return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
249 }
250
251 CompressionEstimate::Verdict(EstimateVerdict::Skip)
253 }
254
255 fn compress(
256 &self,
257 compressor: &CascadingCompressor,
258 data: &ArrayAndStats,
259 compress_ctx: CompressorContext,
260 exec_ctx: &mut ExecutionCtx,
261 ) -> VortexResult<ArrayRef> {
262 let sparse_encoded = Sparse::encode(data.array(), None, exec_ctx)?;
264
265 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
266 let indices = sparse
267 .patches()
268 .indices()
269 .clone()
270 .execute::<PrimitiveArray>(exec_ctx)?
271 .narrow()?;
272 let compressed_indices = compressor.compress_child(
273 &indices.into_array(),
274 &compress_ctx,
275 self.id(),
276 0,
277 exec_ctx,
278 )?;
279
280 Sparse::try_new(
281 compressed_indices,
282 sparse.patches().values().clone(),
283 sparse.len(),
284 sparse.fill_scalar().clone(),
285 )
286 .map(|a| a.into_array())
287 } else {
288 Ok(sparse_encoded)
289 }
290 }
291}
292
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    /// Identifier under which this scheme is known to the compressor.
    fn scheme_name(&self) -> &'static str {
        "vortex.float.pco"
    }

    /// The scheme applies to canonical arrays accepted by `is_float_primitive`.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_float_primitive(canonical)
    }

    /// Always defers the decision to sampling; none of the arguments are inspected.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Builds a Pco array from the primitive view of `data` using pco's default
    /// compression level.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Pco::from_primitive`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): `8192` is the third positional argument of `from_primitive`;
        // presumably a per-page/chunk value count — confirm against the vortex_pco API.
        let pco_array = vortex_pco::Pco::from_primitive(
            data.array_as_primitive(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
            exec_ctx,
        )?;

        Ok(pco_array.into_array())
    }
}
328
329impl Scheme for FloatRLEScheme {
330 fn scheme_name(&self) -> &'static str {
331 "vortex.float.rle"
332 }
333
334 fn matches(&self, canonical: &Canonical) -> bool {
335 is_float_primitive(canonical)
336 }
337
338 fn num_children(&self) -> usize {
340 3
341 }
342
343 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
344 rle_descendant_exclusions()
345 }
346
347 fn ancestor_exclusions(&self) -> Vec<vortex_compressor::scheme::AncestorExclusion> {
348 rle_ancestor_exclusions()
349 }
350
351 fn expected_compression_ratio(
352 &self,
353 data: &ArrayAndStats,
354 compress_ctx: CompressorContext,
355 exec_ctx: &mut ExecutionCtx,
356 ) -> CompressionEstimate {
357 if compress_ctx.finished_cascading() {
359 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
360 }
361
362 if data.float_stats(exec_ctx).average_run_length() < super::integer::RUN_LENGTH_THRESHOLD {
363 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
364 }
365
366 CompressionEstimate::Deferred(DeferredEstimate::Sample)
367 }
368
369 fn compress(
370 &self,
371 compressor: &CascadingCompressor,
372 data: &ArrayAndStats,
373 compress_ctx: CompressorContext,
374 exec_ctx: &mut ExecutionCtx,
375 ) -> VortexResult<ArrayRef> {
376 super::integer::rle_compress(self, compressor, data, compress_ctx, exec_ctx)
377 }
378}
379
#[cfg(test)]
mod tests {
    use std::iter;
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;
    use crate::schemes::float::FloatRLEScheme;

    /// Session shared by every test in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Compressing an empty `f32` array yields an empty result.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let btr = BtrBlocksCompressor::default();
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable).into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = btr.compress(&empty, &mut ctx)?;

        assert!(compressed.is_empty());
        Ok(())
    }

    /// 1024 values cycling through 50 distinct floats are displayed as a dict array.
    #[test]
    fn test_compress() -> VortexResult<()> {
        let mut cyclic = buffer_mut![1.0f32; 1024];
        for i in 0..1024 {
            cyclic[i] = (i % 50) as f32;
        }

        let input = cyclic.into_array();
        let btr = BtrBlocksCompressor::default();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = btr.compress(&input, &mut ctx)?;
        assert_eq!(compressed.len(), 1024);

        let rendered = compressed.display_as(DisplayOptions::MetadataOnly).to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.dict(f32, len=1024)");

        Ok(())
    }

    /// Three long runs compressed with only `FloatRLEScheme` produce an `RLE` array that
    /// still equals the input.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let values: Vec<f32> = iter::repeat_n(1.5f32, 100)
            .chain(iter::repeat_n(2.7f32, 200))
            .chain(iter::repeat_n(3.15f32, 150))
            .collect();

        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let compressor = CascadingCompressor::new(vec![&FloatRLEScheme]);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = compressor.compress(&input.into_array(), &mut ctx)?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// Six special float values followed by 90 nulls are displayed as a sparse array.
    #[test]
    fn test_sparse_compression() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        let specials = [
            f32::NAN,
            -f32::NAN,
            f32::INFINITY,
            -f32::INFINITY,
            0.0f32,
            -0.0f32,
        ];
        for special in specials {
            builder.append_value(special);
        }
        builder.append_nulls(90);

        let input = builder.finish_into_primitive().into_array();
        let btr = BtrBlocksCompressor::default();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = btr.compress(&input, &mut ctx)?;
        assert_eq!(compressed.len(), 96);

        let rendered = compressed.display_as(DisplayOptions::MetadataOnly).to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.sparse(f32?, len=96)");

        Ok(())
    }
}
483
#[cfg(test)]
mod scheme_selection_tests {
    use std::sync::LazyLock;

    use vortex_alp::ALP;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by every test in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// A hundred copies of one value end up as a `Constant` array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<f64> = vec![42.5; 100];
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = btr.compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Multiples of 0.01 end up as an `ALP` array.
    #[test]
    fn test_alp_compressed() -> VortexResult<()> {
        let values: Vec<f64> = (0..1000).map(|i| f64::from(i) * 0.01).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = btr.compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<ALP>());
        Ok(())
    }

    /// Five distinct values repeated 200 times end up as `ALP` whose first child is a `Dict`.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = [1.1, 2.2, 3.3, 4.4, 5.5];
        let values: Vec<f64> = distinct_values.iter().copied().cycle().take(1000).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = btr.compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<ALP>());
        assert!(compressed.children()[0].is::<Dict>());
        Ok(())
    }

    /// Five values followed by 95 nulls keep their length of 100 after compression.
    #[test]
    fn test_null_dominated_compressed() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f64>::with_capacity(Nullability::Nullable, 100);
        for value in [0.0f64, 1.0, 2.0, 3.0, 4.0] {
            builder.append_value(value);
        }
        builder.append_nulls(95);

        let input = builder.finish_into_primitive();
        let btr = BtrBlocksCompressor::default();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = btr.compress(&input.into_array(), &mut ctx)?;

        assert_eq!(compressed.len(), 100);
        Ok(())
    }
}