Skip to main content

vortex_fastlanes/delta/vtable/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::hash::Hash;
5use std::hash::Hasher;
6
7use fastlanes::FastLanes;
8use prost::Message;
9use vortex_array::Array;
10use vortex_array::ArrayEq;
11use vortex_array::ArrayHash;
12use vortex_array::ArrayId;
13use vortex_array::ArrayParts;
14use vortex_array::ArrayRef;
15use vortex_array::ArrayView;
16use vortex_array::ExecutionCtx;
17use vortex_array::ExecutionResult;
18use vortex_array::IntoArray;
19use vortex_array::Precision;
20use vortex_array::arrays::PrimitiveArray;
21use vortex_array::buffer::BufferHandle;
22use vortex_array::dtype::DType;
23use vortex_array::dtype::PType;
24use vortex_array::match_each_unsigned_integer_ptype;
25use vortex_array::serde::ArrayChildren;
26use vortex_array::vtable::VTable;
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_error::vortex_ensure;
30use vortex_error::vortex_err;
31use vortex_error::vortex_panic;
32use vortex_session::VortexSession;
33use vortex_session::registry::CachedId;
34
35use crate::DeltaData;
36use crate::delta::array::BASES_SLOT;
37use crate::delta::array::DELTAS_SLOT;
38use crate::delta::array::DeltaArrayExt;
39use crate::delta::array::SLOT_NAMES;
40use crate::delta::array::delta_decompress::delta_decompress;
41use crate::delta::array::lane_count;
42
43mod operations;
44mod rules;
45mod slice;
46mod validity;
47
/// A [`Delta`]-encoded Vortex array.
///
/// This is an alias for the generic [`Array`] wrapper instantiated with the [`Delta`] encoding.
pub type DeltaArray = Array<Delta>;
50
/// Protobuf-encoded metadata written by `serialize` and read back by `deserialize` for a
/// [`DeltaArray`].
#[derive(Clone, prost::Message)]
#[repr(C)]
pub struct DeltaMetadata {
    /// Length of the `deltas` child. On deserialization it is used to size both the `deltas`
    /// child and the `bases` child.
    #[prost(uint64, tag = "1")]
    deltas_len: u64,
    /// Offset that is handed to `DeltaData::try_new` on deserialization.
    #[prost(uint32, tag = "2")]
    offset: u32, // must be <1024
}
59
60impl ArrayHash for DeltaData {
61    fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
62        self.offset.hash(state);
63    }
64}
65
66impl ArrayEq for DeltaData {
67    fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
68        self.offset == other.offset
69    }
70}
71
72impl VTable for Delta {
73    type ArrayData = DeltaData;
74
75    type OperationsVTable = Self;
76    type ValidityVTable = Self;
77
78    fn id(&self) -> ArrayId {
79        static ID: CachedId = CachedId::new("fastlanes.delta");
80        *ID
81    }
82
83    fn validate(
84        &self,
85        data: &Self::ArrayData,
86        dtype: &DType,
87        len: usize,
88        slots: &[Option<ArrayRef>],
89    ) -> VortexResult<()> {
90        let bases = slots[BASES_SLOT]
91            .as_ref()
92            .vortex_expect("DeltaArray bases slot");
93        let deltas = slots[DELTAS_SLOT]
94            .as_ref()
95            .vortex_expect("DeltaArray deltas slot");
96        validate_parts(bases, deltas, data.offset, dtype, len)
97    }
98
99    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
100        0
101    }
102
103    fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
104        vortex_panic!("DeltaArray buffer index {idx} out of bounds")
105    }
106
107    fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option<String> {
108        None
109    }
110
111    fn reduce_parent(
112        array: ArrayView<'_, Self>,
113        parent: &ArrayRef,
114        child_idx: usize,
115    ) -> VortexResult<Option<ArrayRef>> {
116        rules::RULES.evaluate(array, parent, child_idx)
117    }
118
119    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
120        SLOT_NAMES[idx].to_string()
121    }
122
123    fn serialize(
124        array: ArrayView<'_, Self>,
125        _session: &VortexSession,
126    ) -> VortexResult<Option<Vec<u8>>> {
127        Ok(Some(
128            DeltaMetadata {
129                deltas_len: array.deltas().len() as u64,
130                offset: array.offset() as u32,
131            }
132            .encode_to_vec(),
133        ))
134    }
135
136    fn deserialize(
137        &self,
138        dtype: &DType,
139        len: usize,
140        metadata: &[u8],
141        buffers: &[BufferHandle],
142        children: &dyn ArrayChildren,
143        _session: &VortexSession,
144    ) -> VortexResult<ArrayParts<Self>> {
145        vortex_ensure!(
146            buffers.is_empty(),
147            "DeltaArray expects 0 buffers, got {}",
148            buffers.len()
149        );
150        vortex_ensure!(
151            children.len() == 2,
152            "DeltaArray expects 2 children, got {}",
153            children.len()
154        );
155        let metadata = DeltaMetadata::decode(metadata)?;
156        let ptype = PType::try_from(dtype)?;
157        let lanes = match_each_unsigned_integer_ptype!(ptype, |T| { <T as FastLanes>::LANES });
158
159        // Compute the length of the bases array
160        let deltas_len = usize::try_from(metadata.deltas_len)
161            .map_err(|_| vortex_err!("deltas_len {} overflowed usize", metadata.deltas_len))?;
162        let num_chunks = deltas_len / 1024;
163        let remainder_base_size = if deltas_len % 1024 > 0 { 1 } else { 0 };
164        let bases_len = num_chunks * lanes + remainder_base_size;
165
166        let bases = children.get(0, dtype, bases_len)?;
167        let deltas = children.get(1, dtype, deltas_len)?;
168
169        let data = DeltaData::try_new(metadata.offset as usize)?;
170        let slots = vec![Some(bases), Some(deltas)];
171        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
172    }
173
174    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
175        Ok(ExecutionResult::done(
176            delta_decompress(&array, ctx)?.into_array(),
177        ))
178    }
179}
180
181#[derive(Clone, Debug)]
182pub struct Delta;
183
184impl Delta {
185    pub fn try_new(
186        bases: ArrayRef,
187        deltas: ArrayRef,
188        offset: usize,
189        len: usize,
190    ) -> VortexResult<DeltaArray> {
191        let dtype = bases.dtype().with_nullability(deltas.dtype().nullability());
192        let data = DeltaData::try_new(offset)?;
193        let slots = vec![Some(bases), Some(deltas)];
194        Array::try_from_parts(ArrayParts::new(Delta, dtype, len, data).with_slots(slots))
195    }
196
197    /// Compress a primitive array using Delta encoding.
198    pub fn try_from_primitive_array(
199        array: &PrimitiveArray,
200        ctx: &mut ExecutionCtx,
201    ) -> VortexResult<DeltaArray> {
202        let logical_len = array.len();
203        let (bases, deltas) = crate::delta::array::delta_compress::delta_compress(array, ctx)?;
204        Self::try_new(bases.into_array(), deltas.into_array(), 0, logical_len)
205    }
206}
207
208fn validate_parts(
209    bases: &ArrayRef,
210    deltas: &ArrayRef,
211    offset: usize,
212    dtype: &DType,
213    len: usize,
214) -> VortexResult<()> {
215    vortex_ensure!(
216        offset + len <= deltas.len(),
217        "offset + len, {offset} + {len}, must be less than or equal to the size of deltas: {}",
218        deltas.len()
219    );
220    vortex_ensure!(
221        bases.dtype().eq_ignore_nullability(deltas.dtype()),
222        "DeltaArray: bases and deltas must have the same dtype, got {} and {}",
223        bases.dtype(),
224        deltas.dtype()
225    );
226
227    vortex_ensure!(
228        bases.dtype().is_unsigned_int(),
229        "DeltaArray: dtype must be an unsigned integer, got {}",
230        bases.dtype()
231    );
232
233    let expected_dtype = bases.dtype().with_nullability(deltas.dtype().nullability());
234    vortex_ensure!(
235        dtype == &expected_dtype,
236        "DeltaArray dtype mismatch: expected {expected_dtype}, got {dtype}"
237    );
238
239    let lanes = lane_count(bases.dtype().as_ptype());
240
241    vortex_ensure!(
242        deltas.len().is_multiple_of(1024),
243        "deltas length ({}) must be a multiple of 1024",
244        deltas.len(),
245    );
246    vortex_ensure!(
247        bases.len().is_multiple_of(lanes),
248        "bases length ({}) must be a multiple of LANES ({lanes})",
249        bases.len(),
250    );
251    Ok(())
252}
253
#[cfg(test)]
mod tests {
    use prost::Message;
    use vortex_array::test_harness::check_metadata;

    use super::DeltaMetadata;

    /// Passes the encoded form of a [`DeltaMetadata`] with both fields at their maximum value
    /// to `check_metadata` under the name `delta.metadata`.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_delta_metadata() {
        let metadata = DeltaMetadata {
            deltas_len: u64::MAX,
            offset: u32::MAX,
        };
        let encoded = metadata.encode_to_vec();
        check_metadata("delta.metadata", &encoded);
    }
}