// vortex_btrblocks/schemes/integer/rle.rs

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_compressor::scheme::AncestorExclusion;
use vortex_compressor::scheme::DescendantExclusion;
#[cfg(feature = "unstable_encodings")]
use vortex_compressor::scheme::SchemeId;
use vortex_error::VortexResult;
#[cfg(feature = "unstable_encodings")]
use vortex_fastlanes::Delta;
use vortex_fastlanes::RLE;
use vortex_fastlanes::RLEArrayExt;

use super::RUN_LENGTH_THRESHOLD;
use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::Scheme;
use crate::SchemeExt;
use crate::schemes::rle_ancestor_exclusions;
use crate::schemes::rle_descendant_exclusions;

/// Integer compression scheme based on FastLanes run-length encoding ([`RLE`]).
///
/// It only applies to integer-typed arrays. Compression delegates to `rle_compress`,
/// which encodes the array and then recursively compresses the resulting children.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct IntRLEScheme;

38pub(crate) fn rle_compress(
40 scheme: &dyn Scheme,
41 compressor: &CascadingCompressor,
42 data: &ArrayAndStats,
43 compress_ctx: CompressorContext,
44 exec_ctx: &mut ExecutionCtx,
45) -> VortexResult<ArrayRef> {
46 let rle_array = RLE::encode(data.array_as_primitive(), exec_ctx)?;
47
48 let rle_values_primitive = rle_array
49 .values()
50 .clone()
51 .execute::<PrimitiveArray>(exec_ctx)?;
52 let compressed_values = compressor.compress_child(
53 &rle_values_primitive.into_array(),
54 &compress_ctx,
55 scheme.id(),
56 0,
57 exec_ctx,
58 )?;
59
60 #[cfg(feature = "unstable_encodings")]
62 let compressed_indices = {
63 let rle_indices_primitive = rle_array
64 .indices()
65 .clone()
66 .execute::<PrimitiveArray>(exec_ctx)?
67 .narrow(exec_ctx)?;
68 try_compress_delta(
69 compressor,
70 &rle_indices_primitive.into_array(),
71 &compress_ctx,
72 scheme.id(),
73 1,
74 exec_ctx,
75 )?
76 };
77
78 #[cfg(not(feature = "unstable_encodings"))]
79 let compressed_indices = {
80 let rle_indices_primitive = rle_array
81 .indices()
82 .clone()
83 .execute::<PrimitiveArray>(exec_ctx)?
84 .narrow(exec_ctx)?;
85 compressor.compress_child(
86 &rle_indices_primitive.into_array(),
87 &compress_ctx,
88 scheme.id(),
89 1,
90 exec_ctx,
91 )?
92 };
93
94 let rle_offsets_primitive = rle_array
95 .values_idx_offsets()
96 .clone()
97 .execute::<PrimitiveArray>(exec_ctx)?
98 .narrow(exec_ctx)?;
99 let compressed_offsets = compressor.compress_child(
100 &rle_offsets_primitive.into_array(),
101 &compress_ctx,
102 scheme.id(),
103 2,
104 exec_ctx,
105 )?;
106
107 unsafe {
109 Ok(RLE::new_unchecked(
110 compressed_values,
111 compressed_indices,
112 compressed_offsets,
113 rle_array.offset(),
114 rle_array.len(),
115 )
116 .into_array())
117 }
118}
119
/// Delta-encodes `child` with FastLanes and compresses both resulting parts.
///
/// `child` is executed to a [`PrimitiveArray`] and split by
/// `vortex_fastlanes::delta_compress` into bases and deltas. Each part is compressed
/// through `compressor` with the caller's `parent_ctx`, `parent_id` and `child_index`.
/// Both parts share that one slot; no separate slot is allocated for them. The two
/// compressed parts are reassembled into a [`Delta`] array with offset `0` spanning
/// `child.len()` elements.
///
/// Despite the name, there is no fallback to the uncompressed `child`: every failure is
/// propagated to the caller.
///
/// # Errors
///
/// Propagates failures from executing `child`, delta encoding, compressing either part,
/// or `Delta::try_new`.
#[cfg(feature = "unstable_encodings")]
pub(crate) fn try_compress_delta(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let primitive = child.clone().execute::<PrimitiveArray>(exec_ctx)?;
    let (bases, deltas) = vortex_fastlanes::delta_compress(&primitive, exec_ctx)?;

    let bases = bases.into_array();
    let bases = compressor.compress_child(&bases, parent_ctx, parent_id, child_index, exec_ctx)?;

    let deltas = deltas.into_array();
    let deltas =
        compressor.compress_child(&deltas, parent_ctx, parent_id, child_index, exec_ctx)?;

    let delta = Delta::try_new(bases, deltas, 0, child.len())?;
    Ok(delta.into_array())
}

150impl Scheme for IntRLEScheme {
151 fn scheme_name(&self) -> &'static str {
152 "vortex.int.rle"
153 }
154
155 fn matches(&self, canonical: &Canonical) -> bool {
156 canonical.dtype().is_int()
157 }
158
159 fn num_children(&self) -> usize {
161 3
162 }
163
164 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
165 rle_descendant_exclusions()
166 }
167
168 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
169 rle_ancestor_exclusions()
170 }
171
172 fn expected_compression_ratio(
173 &self,
174 data: &ArrayAndStats,
175 compress_ctx: CompressorContext,
176 exec_ctx: &mut ExecutionCtx,
177 ) -> CompressionEstimate {
178 if compress_ctx.finished_cascading() {
180 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
181 }
182 if data.integer_stats(exec_ctx).average_run_length() < RUN_LENGTH_THRESHOLD {
183 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
184 }
185
186 CompressionEstimate::Deferred(DeferredEstimate::Sample)
187 }
188
189 fn compress(
190 &self,
191 compressor: &CascadingCompressor,
192 data: &ArrayAndStats,
193 compress_ctx: CompressorContext,
194 exec_ctx: &mut ExecutionCtx,
195 ) -> VortexResult<ArrayRef> {
196 rle_compress(self, compressor, data, compress_ctx, exec_ctx)
197 }
198}