vortex_btrblocks/schemes/integer/
sparse.rs1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::ConstantArray;
11use vortex_array::arrays::PrimitiveArray;
12use vortex_array::arrays::primitive::PrimitiveArrayExt;
13use vortex_array::scalar::Scalar;
14use vortex_compressor::builtins::IntDictScheme;
15use vortex_compressor::estimate::CompressionEstimate;
16use vortex_compressor::estimate::EstimateVerdict;
17use vortex_compressor::scheme::ChildSelection;
18use vortex_compressor::scheme::DescendantExclusion;
19use vortex_error::VortexExpect;
20use vortex_error::VortexResult;
21use vortex_sparse::Sparse;
22use vortex_sparse::SparseExt as _;
23
24use super::IntRLEScheme;
25use super::RunEndScheme;
26use crate::ArrayAndStats;
27use crate::CascadingCompressor;
28use crate::CompressorContext;
29use crate::GenerateStatsOptions;
30use crate::Scheme;
31use crate::SchemeExt;
32
/// Integer compression scheme registered under the name `"vortex.int.sparse"`.
///
/// The scheme carries no state. Its `Scheme` implementation encodes an integer
/// array with the `Sparse` encoding, using the array's most frequent value as
/// the fill value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SparseScheme;
36
37impl Scheme for SparseScheme {
38 fn scheme_name(&self) -> &'static str {
39 "vortex.int.sparse"
40 }
41
42 fn matches(&self, canonical: &Canonical) -> bool {
43 canonical.dtype().is_int()
44 }
45
46 fn stats_options(&self) -> GenerateStatsOptions {
47 GenerateStatsOptions {
48 count_distinct_values: true,
49 }
50 }
51
52 fn num_children(&self) -> usize {
54 2
55 }
56
57 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
60 vec![
61 DescendantExclusion {
62 excluded: IntDictScheme.id(),
63 children: ChildSelection::One(1),
64 },
65 DescendantExclusion {
66 excluded: RunEndScheme.id(),
67 children: ChildSelection::One(1),
68 },
69 DescendantExclusion {
70 excluded: IntRLEScheme.id(),
71 children: ChildSelection::One(1),
72 },
73 DescendantExclusion {
74 excluded: SparseScheme.id(),
75 children: ChildSelection::One(1),
76 },
77 ]
78 }
79
80 fn expected_compression_ratio(
81 &self,
82 data: &ArrayAndStats,
83 _compress_ctx: CompressorContext,
84 exec_ctx: &mut ExecutionCtx,
85 ) -> CompressionEstimate {
86 let len = data.array_len() as f64;
87 let stats = data.integer_stats(exec_ctx);
88 let value_count = stats.value_count();
89
90 if value_count == 0 {
92 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
93 }
94
95 if stats.null_count() as f64 / len > 0.9 {
97 return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
98 }
99
100 let (_, most_frequent_count) = stats
101 .erased()
102 .most_frequent_value_and_count()
103 .vortex_expect(
104 "this must be present since `SparseScheme` declared that we need distinct values",
105 );
106
107 if most_frequent_count == value_count {
109 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
110 }
111 debug_assert!(value_count > most_frequent_count);
112
113 let freq = most_frequent_count as f64 / value_count as f64;
115 if freq < 0.9 {
116 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
117 }
118
119 CompressionEstimate::Verdict(EstimateVerdict::Ratio(
121 value_count as f64 / (value_count - most_frequent_count) as f64,
122 ))
123 }
124
125 fn compress(
126 &self,
127 compressor: &CascadingCompressor,
128 data: &ArrayAndStats,
129 compress_ctx: CompressorContext,
130 exec_ctx: &mut ExecutionCtx,
131 ) -> VortexResult<ArrayRef> {
132 let len = data.array_len();
133 let stats = data.integer_stats(exec_ctx);
134 let array = data.array();
135
136 let (most_frequent_value, most_frequent_count) = stats
137 .erased()
138 .most_frequent_value_and_count()
139 .vortex_expect(
140 "this must be present since `SparseScheme` declared that we need distinct values",
141 );
142
143 if most_frequent_count as usize == len {
144 return Ok(ConstantArray::new(
146 Scalar::primitive_value(
147 most_frequent_value,
148 most_frequent_value.ptype(),
149 array.dtype().nullability(),
150 ),
151 len,
152 )
153 .into_array());
154 }
155
156 let sparse_encoded = Sparse::encode(
157 array,
158 Some(Scalar::primitive_value(
159 most_frequent_value,
160 most_frequent_value.ptype(),
161 array.dtype().nullability(),
162 )),
163 exec_ctx,
164 )?;
165
166 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
167 let sparse_values_primitive = sparse
168 .patches()
169 .values()
170 .clone()
171 .execute::<PrimitiveArray>(exec_ctx)?;
172 let compressed_values = compressor.compress_child(
173 &sparse_values_primitive.into_array(),
174 &compress_ctx,
175 self.id(),
176 0,
177 exec_ctx,
178 )?;
179
180 let indices = sparse
181 .patches()
182 .indices()
183 .clone()
184 .execute::<PrimitiveArray>(exec_ctx)?
185 .narrow(exec_ctx)?;
186
187 let compressed_indices = compressor.compress_child(
188 &indices.into_array(),
189 &compress_ctx,
190 self.id(),
191 1,
192 exec_ctx,
193 )?;
194
195 Sparse::try_new(
196 compressed_indices,
197 compressed_values,
198 sparse.len(),
199 sparse.fill_scalar().clone(),
200 )
201 .map(|a| a.into_array())
202 } else {
203 Ok(sparse_encoded)
204 }
205 }
206}