vortex_btrblocks/schemes/integer/
runend.rs1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_compressor::builtins::BinaryDictScheme;
12use vortex_compressor::builtins::FloatDictScheme;
13use vortex_compressor::builtins::IntDictScheme;
14use vortex_compressor::builtins::StringDictScheme;
15use vortex_compressor::estimate::CompressionEstimate;
16use vortex_compressor::estimate::DeferredEstimate;
17use vortex_compressor::estimate::EstimateVerdict;
18use vortex_compressor::scheme::AncestorExclusion;
19use vortex_compressor::scheme::ChildSelection;
20use vortex_compressor::scheme::DescendantExclusion;
21use vortex_error::VortexResult;
22use vortex_runend::RunEnd;
23use vortex_runend::compress::runend_encode;
24
25use super::IntRLEScheme;
26use super::SparseScheme;
27use crate::ArrayAndStats;
28use crate::CascadingCompressor;
29use crate::CompressorContext;
30use crate::Scheme;
31use crate::SchemeExt;
32
/// Minimum average run length an integer array must have for run-end encoding
/// to be considered. `expected_compression_ratio` skips the scheme when the
/// array's average run length is below this value.
const RUN_END_THRESHOLD: u32 = 4;
35
/// Compression scheme that run-end encodes integer arrays.
///
/// This is a stateless unit struct; all behavior lives in its `Scheme`
/// implementation.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;
39
40impl Scheme for RunEndScheme {
41 fn scheme_name(&self) -> &'static str {
42 "vortex.int.runend"
43 }
44
45 fn matches(&self, canonical: &Canonical) -> bool {
46 canonical.dtype().is_int()
47 }
48
49 fn num_children(&self) -> usize {
51 2
52 }
53
54 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
57 vec![
58 DescendantExclusion {
59 excluded: IntDictScheme.id(),
60 children: ChildSelection::One(1),
61 },
62 DescendantExclusion {
63 excluded: RunEndScheme.id(),
64 children: ChildSelection::One(1),
65 },
66 DescendantExclusion {
67 excluded: IntRLEScheme.id(),
68 children: ChildSelection::One(1),
69 },
70 DescendantExclusion {
71 excluded: SparseScheme.id(),
72 children: ChildSelection::One(1),
73 },
74 ]
75 }
76
77 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
80 vec![
81 AncestorExclusion {
82 ancestor: IntDictScheme.id(),
83 children: ChildSelection::One(0),
84 },
85 AncestorExclusion {
86 ancestor: FloatDictScheme.id(),
87 children: ChildSelection::One(0),
88 },
89 AncestorExclusion {
90 ancestor: StringDictScheme.id(),
91 children: ChildSelection::One(0),
92 },
93 AncestorExclusion {
94 ancestor: BinaryDictScheme.id(),
95 children: ChildSelection::One(0),
96 },
97 ]
98 }
99
100 fn expected_compression_ratio(
101 &self,
102 data: &ArrayAndStats,
103 _compress_ctx: CompressorContext,
104 exec_ctx: &mut ExecutionCtx,
105 ) -> CompressionEstimate {
106 if data.integer_stats(exec_ctx).average_run_length() < RUN_END_THRESHOLD {
108 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
109 }
110
111 CompressionEstimate::Deferred(DeferredEstimate::Sample)
112 }
113
114 fn compress(
115 &self,
116 compressor: &CascadingCompressor,
117 data: &ArrayAndStats,
118 compress_ctx: CompressorContext,
119 exec_ctx: &mut ExecutionCtx,
120 ) -> VortexResult<ArrayRef> {
121 let (ends, values) = runend_encode(data.array_as_primitive(), exec_ctx);
123
124 let values_primitive = values.execute::<PrimitiveArray>(exec_ctx)?;
125 let compressed_values = compressor.compress_child(
126 &values_primitive.into_array(),
127 &compress_ctx,
128 self.id(),
129 0,
130 exec_ctx,
131 )?;
132
133 let compressed_ends =
134 compressor.compress_child(&ends.into_array(), &compress_ctx, self.id(), 1, exec_ctx)?;
135
136 Ok(unsafe {
138 RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
139 .into_array()
140 })
141 }
142}