Skip to main content

vortex_btrblocks/schemes/integer/
rle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Run-length integer encoding and shared RLE compression helpers.
5
6use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_array::arrays::primitive::PrimitiveArrayExt;
12use vortex_compressor::estimate::CompressionEstimate;
13use vortex_compressor::estimate::DeferredEstimate;
14use vortex_compressor::estimate::EstimateVerdict;
15use vortex_compressor::scheme::AncestorExclusion;
16use vortex_compressor::scheme::DescendantExclusion;
17#[cfg(feature = "unstable_encodings")]
18use vortex_compressor::scheme::SchemeId;
19use vortex_error::VortexResult;
20#[cfg(feature = "unstable_encodings")]
21use vortex_fastlanes::Delta;
22use vortex_fastlanes::RLE;
23use vortex_fastlanes::RLEArrayExt;
24
25use super::RUN_LENGTH_THRESHOLD;
26use crate::ArrayAndStats;
27use crate::CascadingCompressor;
28use crate::CompressorContext;
29use crate::Scheme;
30use crate::SchemeExt;
31use crate::schemes::rle_ancestor_exclusions;
32use crate::schemes::rle_descendant_exclusions;
33
/// Compression scheme that run-length encodes integer arrays.
///
/// This is a stateless marker type; all behaviour lives in its [`Scheme`] implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct IntRLEScheme;
37
38/// Shared compression logic for RLE schemes.
39pub(crate) fn rle_compress(
40    scheme: &dyn Scheme,
41    compressor: &CascadingCompressor,
42    data: &ArrayAndStats,
43    compress_ctx: CompressorContext,
44    exec_ctx: &mut ExecutionCtx,
45) -> VortexResult<ArrayRef> {
46    let rle_array = RLE::encode(data.array_as_primitive(), exec_ctx)?;
47
48    let rle_values_primitive = rle_array
49        .values()
50        .clone()
51        .execute::<PrimitiveArray>(exec_ctx)?;
52    let compressed_values = compressor.compress_child(
53        &rle_values_primitive.into_array(),
54        &compress_ctx,
55        scheme.id(),
56        0,
57        exec_ctx,
58    )?;
59
60    // Delta is an unstable encoding, once we deem it stable we can switch over to this always.
61    #[cfg(feature = "unstable_encodings")]
62    let compressed_indices = {
63        let rle_indices_primitive = rle_array
64            .indices()
65            .clone()
66            .execute::<PrimitiveArray>(exec_ctx)?
67            .narrow(exec_ctx)?;
68        try_compress_delta(
69            compressor,
70            &rle_indices_primitive.into_array(),
71            &compress_ctx,
72            scheme.id(),
73            1,
74            exec_ctx,
75        )?
76    };
77
78    #[cfg(not(feature = "unstable_encodings"))]
79    let compressed_indices = {
80        let rle_indices_primitive = rle_array
81            .indices()
82            .clone()
83            .execute::<PrimitiveArray>(exec_ctx)?
84            .narrow(exec_ctx)?;
85        compressor.compress_child(
86            &rle_indices_primitive.into_array(),
87            &compress_ctx,
88            scheme.id(),
89            1,
90            exec_ctx,
91        )?
92    };
93
94    let rle_offsets_primitive = rle_array
95        .values_idx_offsets()
96        .clone()
97        .execute::<PrimitiveArray>(exec_ctx)?
98        .narrow(exec_ctx)?;
99    let compressed_offsets = compressor.compress_child(
100        &rle_offsets_primitive.into_array(),
101        &compress_ctx,
102        scheme.id(),
103        2,
104        exec_ctx,
105    )?;
106
107    // SAFETY: Recursive compression doesn't affect the invariants.
108    unsafe {
109        Ok(RLE::new_unchecked(
110            compressed_values,
111            compressed_indices,
112            compressed_offsets,
113            rle_array.offset(),
114            rle_array.len(),
115        )
116        .into_array())
117    }
118}
119
/// Delta-encodes `child` and recursively compresses the resulting bases and deltas.
///
/// `child` is first executed into a [`PrimitiveArray`] and split into bases and deltas with
/// `vortex_fastlanes::delta_compress`. Both halves are then compressed as children of the scheme
/// identified by `parent_id`, using the same `parent_ctx` and the same `child_index`, and wrapped
/// in a [`Delta`] array with offset `0` and the length of `child`.
///
/// # Errors
///
/// Propagates any error from executing `child`, from delta compression, from compressing either
/// half, or from constructing the [`Delta`] array.
#[cfg(feature = "unstable_encodings")]
pub(crate) fn try_compress_delta(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let primitive = child.clone().execute::<PrimitiveArray>(exec_ctx)?;
    let (bases, deltas) = vortex_fastlanes::delta_compress(&primitive, exec_ctx)?;

    // Bases are compressed before deltas; both are attributed to the same parent child slot.
    let compressed_bases = compressor.compress_child(
        &bases.into_array(),
        parent_ctx,
        parent_id,
        child_index,
        exec_ctx,
    )?;
    let compressed_deltas = compressor.compress_child(
        &deltas.into_array(),
        parent_ctx,
        parent_id,
        child_index,
        exec_ctx,
    )?;

    let delta = Delta::try_new(compressed_bases, compressed_deltas, 0, child.len())?;
    Ok(delta.into_array())
}
149
150impl Scheme for IntRLEScheme {
151    fn scheme_name(&self) -> &'static str {
152        "vortex.int.rle"
153    }
154
155    fn matches(&self, canonical: &Canonical) -> bool {
156        canonical.dtype().is_int()
157    }
158
159    /// Children: values=0, indices=1, offsets=2.
160    fn num_children(&self) -> usize {
161        3
162    }
163
164    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
165        rle_descendant_exclusions()
166    }
167
168    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
169        rle_ancestor_exclusions()
170    }
171
172    fn expected_compression_ratio(
173        &self,
174        data: &ArrayAndStats,
175        compress_ctx: CompressorContext,
176        exec_ctx: &mut ExecutionCtx,
177    ) -> CompressionEstimate {
178        // RLE is only useful when we cascade it with another encoding.
179        if compress_ctx.finished_cascading() {
180            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
181        }
182        if data.integer_stats(exec_ctx).average_run_length() < RUN_LENGTH_THRESHOLD {
183            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
184        }
185
186        CompressionEstimate::Deferred(DeferredEstimate::Sample)
187    }
188
189    fn compress(
190        &self,
191        compressor: &CascadingCompressor,
192        data: &ArrayAndStats,
193        compress_ctx: CompressorContext,
194        exec_ctx: &mut ExecutionCtx,
195    ) -> VortexResult<ArrayRef> {
196        rle_compress(self, compressor, data, compress_ctx, exec_ctx)
197    }
198}