Skip to main content

vortex_btrblocks/schemes/integer/
for_.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Frame of Reference integer encoding.
5
6use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_compressor::builtins::BinaryDictScheme;
12use vortex_compressor::builtins::FloatDictScheme;
13use vortex_compressor::builtins::IntDictScheme;
14use vortex_compressor::builtins::StringDictScheme;
15use vortex_compressor::estimate::CompressionEstimate;
16use vortex_compressor::estimate::EstimateVerdict;
17use vortex_compressor::scheme::AncestorExclusion;
18use vortex_compressor::scheme::ChildSelection;
19use vortex_error::VortexExpect;
20use vortex_error::VortexResult;
21use vortex_fastlanes::FoR;
22use vortex_fastlanes::FoRArrayExt;
23
24use super::BitPackingScheme;
25use crate::ArrayAndStats;
26use crate::CascadingCompressor;
27use crate::CompressorContext;
28use crate::Scheme;
29use crate::SchemeExt;
30
/// Compression scheme marker for Frame of Reference (FoR) integer encoding.
///
/// The type is a stateless unit struct; all behaviour lives in its `Scheme` implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FoRScheme;
34
35impl Scheme for FoRScheme {
36    fn scheme_name(&self) -> &'static str {
37        "vortex.int.for"
38    }
39
40    fn matches(&self, canonical: &Canonical) -> bool {
41        canonical.dtype().is_int()
42    }
43
44    /// Dict codes always start at 0, so FoR (which subtracts the min) is a no-op.
45    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
46        vec![
47            AncestorExclusion {
48                ancestor: IntDictScheme.id(),
49                children: ChildSelection::One(1),
50            },
51            AncestorExclusion {
52                ancestor: FloatDictScheme.id(),
53                children: ChildSelection::One(1),
54            },
55            AncestorExclusion {
56                ancestor: StringDictScheme.id(),
57                children: ChildSelection::One(1),
58            },
59            AncestorExclusion {
60                ancestor: BinaryDictScheme.id(),
61                children: ChildSelection::One(1),
62            },
63        ]
64    }
65
66    fn expected_compression_ratio(
67        &self,
68        data: &ArrayAndStats,
69        compress_ctx: CompressorContext,
70        exec_ctx: &mut ExecutionCtx,
71    ) -> CompressionEstimate {
72        // FoR only subtracts the min. Without further compression (e.g. BitPacking), the output is
73        // the same size.
74        if compress_ctx.finished_cascading() {
75            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
76        }
77        let stats = data.integer_stats(exec_ctx);
78
79        // Only apply when the min is not already zero.
80        if stats.erased().min_is_zero() {
81            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
82        }
83
84        // Difference between max and min.
85        let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
86            Some(l) => l + 1,
87            // If max-min == 0, the we should be compressing this as a constant array.
88            None => return CompressionEstimate::Verdict(EstimateVerdict::Skip),
89        };
90
91        // If BitPacking can be applied (only non-negative values) and FoR doesn't reduce bit width
92        // compared to BitPacking, don't use FoR since it has a small amount of overhead (storing
93        // the reference) for effectively no benefits.
94        if let Some(max_log) = stats
95            .erased()
96            .max_ilog2()
97            // Only skip FoR when min >= 0, otherwise BitPacking can't be applied without ZigZag.
98            .filter(|_| !stats.erased().min_is_negative())
99        {
100            let bitpack_bitwidth = max_log + 1;
101            if for_bitwidth >= bitpack_bitwidth {
102                return CompressionEstimate::Verdict(EstimateVerdict::Skip);
103            }
104        }
105
106        let full_width: u32 = data
107            .array_as_primitive()
108            .ptype()
109            .bit_width()
110            .try_into()
111            .vortex_expect("bit width must fit in u32");
112
113        CompressionEstimate::Verdict(EstimateVerdict::Ratio(
114            full_width as f64 / for_bitwidth as f64,
115        ))
116    }
117
118    fn compress(
119        &self,
120        compressor: &CascadingCompressor,
121        data: &ArrayAndStats,
122        compress_ctx: CompressorContext,
123        exec_ctx: &mut ExecutionCtx,
124    ) -> VortexResult<ArrayRef> {
125        let primitive = data.array().clone().execute::<PrimitiveArray>(exec_ctx)?;
126        let for_array = FoR::encode(primitive)?;
127        let biased = for_array
128            .encoded()
129            .clone()
130            .execute::<PrimitiveArray>(exec_ctx)?;
131
132        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
133        // of bitpacking.
134        // NOTE: we could delegate in the future if we had another downstream codec that performs
135        //  as well.
136        let leaf_ctx = compress_ctx.clone().as_leaf();
137        let biased_data =
138            ArrayAndStats::new(biased.into_array(), compress_ctx.merged_stats_options());
139        let compressed = BitPackingScheme.compress(compressor, &biased_data, leaf_ctx, exec_ctx)?;
140
141        // TODO(connor): This should really be `new_unchecked`.
142        let for_compressed = FoR::try_new(compressed, for_array.reference_scalar().clone())?;
143        for_compressed
144            .as_ref()
145            .statistics()
146            .inherit_from(for_array.as_ref().statistics());
147
148        Ok(for_compressed.into_array())
149    }
150}