vortex_btrblocks/schemes/
float.rs1use vortex_alp::ALP;
7use vortex_alp::ALPArrayExt;
8use vortex_alp::ALPArraySlotsExt;
9use vortex_alp::ALPRDArrayExt;
10use vortex_alp::ALPRDArrayOwnedExt;
11use vortex_alp::RDEncoder;
12use vortex_alp::alp_encode;
13use vortex_array::ArrayRef;
14use vortex_array::Canonical;
15use vortex_array::ExecutionCtx;
16use vortex_array::IntoArray;
17use vortex_array::arrays::Patched;
18use vortex_array::arrays::PrimitiveArray;
19use vortex_array::arrays::patched::use_experimental_patches;
20use vortex_array::arrays::primitive::PrimitiveArrayExt;
21use vortex_array::dtype::PType;
22use vortex_compressor::estimate::CompressionEstimate;
23use vortex_compressor::estimate::DeferredEstimate;
24use vortex_compressor::estimate::EstimateVerdict;
25use vortex_compressor::scheme::ChildSelection;
26use vortex_compressor::scheme::DescendantExclusion;
27use vortex_error::VortexResult;
28use vortex_error::vortex_panic;
29use vortex_sparse::Sparse;
30
31use super::integer::SparseScheme as IntSparseScheme;
32use crate::ArrayAndStats;
33use crate::CascadingCompressor;
34use crate::CompressorContext;
35use crate::Scheme;
36use crate::SchemeExt;
37use crate::compress_patches;
38use crate::schemes::rle_ancestor_exclusions;
39use crate::schemes::rle_descendant_exclusions;
40
/// Zero-sized marker for the ALP float compression scheme.
///
/// Its [`Scheme`] implementation in this file registers under the name `vortex.float.alp`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ALPScheme;
44
/// Zero-sized marker for the ALP-RD float compression scheme.
///
/// Its [`Scheme`] implementation in this file registers under the name `vortex.float.alprd`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ALPRDScheme;
48
/// Zero-sized marker for the sparse float scheme aimed at null-dominated arrays.
///
/// Its [`Scheme`] implementation in this file registers under the name `vortex.float.sparse`
/// and only reports a compression ratio when more than 90% of the entries are null.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NullDominatedSparseScheme;
54
/// Zero-sized marker for the Pco float compression scheme.
///
/// Only compiled when the `pco` feature is enabled. Its [`Scheme`] implementation in this file
/// registers under the name `vortex.float.pco`.
#[cfg(feature = "pco")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PcoScheme;
59
60pub use vortex_compressor::builtins::FloatConstantScheme;
62pub use vortex_compressor::builtins::FloatDictScheme;
63pub use vortex_compressor::builtins::is_float_primitive;
64pub use vortex_compressor::stats::FloatStats;
65
/// Zero-sized marker for the run-length float compression scheme.
///
/// Its [`Scheme`] implementation in this file registers under the name `vortex.float.rle` and
/// delegates the actual encoding to `super::integer::rle_compress`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FloatRLEScheme;
69
70impl Scheme for ALPScheme {
71 fn scheme_name(&self) -> &'static str {
72 "vortex.float.alp"
73 }
74
75 fn matches(&self, canonical: &Canonical) -> bool {
76 is_float_primitive(canonical)
77 }
78
79 fn num_children(&self) -> usize {
81 1
82 }
83
84 fn expected_compression_ratio(
85 &self,
86 data: &ArrayAndStats,
87 compress_ctx: CompressorContext,
88 _exec_ctx: &mut ExecutionCtx,
89 ) -> CompressionEstimate {
90 if compress_ctx.finished_cascading() {
93 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
94 }
95
96 if data.array_as_primitive().ptype() == PType::F16 {
98 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
99 }
100
101 CompressionEstimate::Deferred(DeferredEstimate::Sample)
102 }
103
104 fn compress(
105 &self,
106 compressor: &CascadingCompressor,
107 data: &ArrayAndStats,
108 compress_ctx: CompressorContext,
109 exec_ctx: &mut ExecutionCtx,
110 ) -> VortexResult<ArrayRef> {
111 let alp_encoded = alp_encode(data.array_as_primitive(), None, exec_ctx)?;
112
113 let compressed_alp_ints = compressor.compress_child(
115 alp_encoded.encoded(),
116 &compress_ctx,
117 self.id(),
118 0,
119 exec_ctx,
120 )?;
121
122 let alp_stats = alp_encoded.as_array().statistics().to_owned();
123 let exponents = alp_encoded.exponents();
124
125 if use_experimental_patches() {
126 let patches = alp_encoded.patches();
127
128 let alp_array = ALP::new(compressed_alp_ints, exponents, None).into_array();
130
131 match patches {
132 None => Ok(alp_array),
133 Some(p) => Ok(Patched::from_array_and_patches(alp_array, &p, exec_ctx)?
134 .with_stats_set(alp_stats)
135 .into_array()),
136 }
137 } else {
138 let patches = alp_encoded
139 .patches()
140 .map(|p| compress_patches(p, exec_ctx))
141 .transpose()?;
142
143 Ok(ALP::new(compressed_alp_ints, exponents, patches).into_array())
144 }
145 }
146}
147
148impl Scheme for ALPRDScheme {
149 fn scheme_name(&self) -> &'static str {
150 "vortex.float.alprd"
151 }
152
153 fn matches(&self, canonical: &Canonical) -> bool {
154 is_float_primitive(canonical)
155 }
156
157 fn expected_compression_ratio(
158 &self,
159 data: &ArrayAndStats,
160 _compress_ctx: CompressorContext,
161 _exec_ctx: &mut ExecutionCtx,
162 ) -> CompressionEstimate {
163 if data.array_as_primitive().ptype() == PType::F16 {
165 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
166 }
167
168 CompressionEstimate::Deferred(DeferredEstimate::Sample)
169 }
170
171 fn compress(
172 &self,
173 _compressor: &CascadingCompressor,
174 data: &ArrayAndStats,
175 _compress_ctx: CompressorContext,
176 exec_ctx: &mut ExecutionCtx,
177 ) -> VortexResult<ArrayRef> {
178 let primitive_array = data.array_as_primitive();
179
180 let encoder = match primitive_array.ptype() {
181 PType::F32 => RDEncoder::new(primitive_array.as_slice::<f32>()),
182 PType::F64 => RDEncoder::new(primitive_array.as_slice::<f64>()),
183 ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
184 };
185
186 let alp_rd = encoder.encode(primitive_array, exec_ctx);
187 let dtype = alp_rd.dtype().clone();
188 let right_bit_width = alp_rd.right_bit_width();
189 let mut parts = ALPRDArrayOwnedExt::into_data_parts(alp_rd);
190 parts.left_parts_patches = parts
191 .left_parts_patches
192 .map(|p| compress_patches(p, exec_ctx))
193 .transpose()?;
194
195 Ok(vortex_alp::ALPRD::try_new(
196 dtype,
197 parts.left_parts,
198 parts.left_parts_dictionary,
199 parts.right_parts,
200 right_bit_width,
201 parts.left_parts_patches,
202 exec_ctx,
203 )?
204 .into_array())
205 }
206}
207
208impl Scheme for NullDominatedSparseScheme {
209 fn scheme_name(&self) -> &'static str {
210 "vortex.float.sparse"
211 }
212
213 fn matches(&self, canonical: &Canonical) -> bool {
214 is_float_primitive(canonical)
215 }
216
217 fn num_children(&self) -> usize {
219 1
220 }
221
222 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
224 vec![DescendantExclusion {
225 excluded: IntSparseScheme.id(),
226 children: ChildSelection::All,
227 }]
228 }
229
230 fn expected_compression_ratio(
231 &self,
232 data: &ArrayAndStats,
233 _compress_ctx: CompressorContext,
234 exec_ctx: &mut ExecutionCtx,
235 ) -> CompressionEstimate {
236 let len = data.array_len() as f64;
237 let stats = data.float_stats(exec_ctx);
238 let value_count = stats.value_count();
239
240 if value_count == 0 {
242 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
243 }
244
245 if stats.null_count() as f64 / len > 0.9 {
247 return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
248 }
249
250 CompressionEstimate::Verdict(EstimateVerdict::Skip)
252 }
253
254 fn compress(
255 &self,
256 compressor: &CascadingCompressor,
257 data: &ArrayAndStats,
258 compress_ctx: CompressorContext,
259 exec_ctx: &mut ExecutionCtx,
260 ) -> VortexResult<ArrayRef> {
261 let sparse_encoded = Sparse::encode(data.array(), None, exec_ctx)?;
263
264 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
265 let indices = sparse
266 .patches()
267 .indices()
268 .clone()
269 .execute::<PrimitiveArray>(exec_ctx)?
270 .narrow()?;
271 let compressed_indices = compressor.compress_child(
272 &indices.into_array(),
273 &compress_ctx,
274 self.id(),
275 0,
276 exec_ctx,
277 )?;
278
279 Sparse::try_new(
280 compressed_indices,
281 sparse.patches().values().clone(),
282 sparse.len(),
283 sparse.fill_scalar().clone(),
284 )
285 .map(|a| a.into_array())
286 } else {
287 Ok(sparse_encoded)
288 }
289 }
290}
291
/// [`Scheme`] implementation that compresses float primitives with Pco.
///
/// Only compiled when the `pco` feature is enabled.
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    /// Identifier under which this scheme is known.
    fn scheme_name(&self) -> &'static str {
        "vortex.float.pco"
    }

    /// Applies to every canonical array accepted by [`is_float_primitive`].
    fn matches(&self, canonical: &Canonical) -> bool {
        is_float_primitive(canonical)
    }

    /// Always defers the estimate to sampling; no input is rejected up front.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let estimate = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(estimate)
    }

    /// Compresses `data` with `Pco::from_primitive` at pco's default compression level.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Pco::from_primitive`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let input = data.array_as_primitive();

        // NOTE(review): 8192 is the third argument of `Pco::from_primitive`; its meaning
        // (likely a per-page value count) is not visible here — confirm against vortex_pco.
        let encoded = vortex_pco::Pco::from_primitive(
            input,
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
            exec_ctx,
        )?;

        Ok(encoded.into_array())
    }
}
327
328impl Scheme for FloatRLEScheme {
329 fn scheme_name(&self) -> &'static str {
330 "vortex.float.rle"
331 }
332
333 fn matches(&self, canonical: &Canonical) -> bool {
334 is_float_primitive(canonical)
335 }
336
337 fn num_children(&self) -> usize {
339 3
340 }
341
342 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
343 rle_descendant_exclusions()
344 }
345
346 fn ancestor_exclusions(&self) -> Vec<vortex_compressor::scheme::AncestorExclusion> {
347 rle_ancestor_exclusions()
348 }
349
350 fn expected_compression_ratio(
351 &self,
352 data: &ArrayAndStats,
353 compress_ctx: CompressorContext,
354 exec_ctx: &mut ExecutionCtx,
355 ) -> CompressionEstimate {
356 if compress_ctx.finished_cascading() {
358 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
359 }
360
361 if data.float_stats(exec_ctx).average_run_length() < super::integer::RUN_LENGTH_THRESHOLD {
362 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
363 }
364
365 CompressionEstimate::Deferred(DeferredEstimate::Sample)
366 }
367
368 fn compress(
369 &self,
370 compressor: &CascadingCompressor,
371 data: &ArrayAndStats,
372 compress_ctx: CompressorContext,
373 exec_ctx: &mut ExecutionCtx,
374 ) -> VortexResult<ArrayRef> {
375 super::integer::rle_compress(self, compressor, data, compress_ctx, exec_ctx)
376 }
377}
378
/// End-to-end tests that run float arrays through the compressors and inspect the result.
#[cfg(test)]
mod tests {
    use std::iter;
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;
    use crate::schemes::float::FloatRLEScheme;

    /// Session shared by every test in this module: an empty session with `ArraySession`
    /// attached, created lazily on first use.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Compressing an empty `f32` array yields an empty array.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let compressor = BtrBlocksCompressor::default();
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable).into_array();

        let compressed = compressor.compress(&empty, &mut SESSION.create_execution_ctx())?;

        assert!(compressed.is_empty());
        Ok(())
    }

    /// 1024 values cycling through 50 distinct floats end up dictionary-encoded.
    #[test]
    fn test_compress() -> VortexResult<()> {
        let mut values = buffer_mut![1.0f32; 1024];
        for (idx, slot) in values.iter_mut().enumerate() {
            *slot = (idx % 50) as f32;
        }

        let input = values.into_array();
        let compressor = BtrBlocksCompressor::default();
        let compressed = compressor.compress(&input, &mut SESSION.create_execution_ctx())?;
        assert_eq!(compressed.len(), 1024);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        let display = rendered.to_lowercase();
        assert_eq!(display, "vortex.dict(f32, len=1024)");

        Ok(())
    }

    /// Three long runs compressed with only `FloatRLEScheme` produce an `RLE` array that
    /// round-trips to the original values.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let runs = [(1.5f32, 100usize), (2.7f32, 200), (3.15f32, 150)];
        let values: Vec<f32> = runs
            .into_iter()
            .flat_map(|(value, count)| iter::repeat_n(value, count))
            .collect();

        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressor = CascadingCompressor::new(vec![&FloatRLEScheme]);
        let compressed =
            compressor.compress(&input.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// Six special float values followed by 90 nulls are compressed to a sparse array.
    #[test]
    fn test_sparse_compression() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        let special_values = [
            f32::NAN,
            -f32::NAN,
            f32::INFINITY,
            -f32::INFINITY,
            0.0f32,
            -0.0f32,
        ];
        for value in special_values {
            builder.append_value(value);
        }
        builder.append_nulls(90);

        let input = builder.finish_into_primitive().into_array();
        let compressor = BtrBlocksCompressor::default();
        let compressed = compressor.compress(&input, &mut SESSION.create_execution_ctx())?;
        assert_eq!(compressed.len(), 96);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        let display = rendered.to_lowercase();
        assert_eq!(display, "vortex.sparse(f32?, len=96)");

        Ok(())
    }
}
482
/// Tests checking which encoding `BtrBlocksCompressor` picks for characteristic float inputs.
#[cfg(test)]
mod scheme_selection_tests {
    use std::sync::LazyLock;

    use vortex_alp::ALP;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by every test in this module: an empty session with `ArraySession`
    /// attached, created lazily on first use.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// One hundred copies of the same `f64` compress to a `Constant` array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<f64> = vec![42.5; 100];
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressor = BtrBlocksCompressor::default();
        let compressed =
            compressor.compress(&input.into_array(), &mut SESSION.create_execution_ctx())?;

        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// A ramp of two-decimal values compresses to an `ALP` array.
    #[test]
    fn test_alp_compressed() -> VortexResult<()> {
        let values: Vec<f64> = (0..1000).map(|step| (step as f64) * 0.01).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressor = BtrBlocksCompressor::default();
        let compressed =
            compressor.compress(&input.into_array(), &mut SESSION.create_execution_ctx())?;

        assert!(compressed.is::<ALP>());
        Ok(())
    }

    /// Five distinct values repeated cyclically compress to `ALP` whose first child is a
    /// `Dict` array.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = [1.1, 2.2, 3.3, 4.4, 5.5];
        let values: Vec<f64> = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressor = BtrBlocksCompressor::default();
        let compressed =
            compressor.compress(&input.into_array(), &mut SESSION.create_execution_ctx())?;

        assert!(compressed.is::<ALP>());
        assert!(compressed.children()[0].is::<Dict>());
        Ok(())
    }

    /// Five values followed by 95 nulls compress without changing the array length.
    ///
    /// Only the length is asserted; the chosen encoding is not checked here.
    #[test]
    fn test_null_dominated_compressed() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f64>::with_capacity(Nullability::Nullable, 100);
        for value in [0.0, 1.0, 2.0, 3.0, 4.0] {
            builder.append_value(value);
        }
        builder.append_nulls(95);
        let input = builder.finish_into_primitive();

        let compressor = BtrBlocksCompressor::default();
        let compressed =
            compressor.compress(&input.into_array(), &mut SESSION.create_execution_ctx())?;

        assert_eq!(compressed.len(), 100);
        Ok(())
    }
}
556}