vortex_fastlanes/delta/vtable/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use fastlanes::Delta;
5use fastlanes::FastLanes;
6use fastlanes::Transpose;
7use num_traits::WrappingAdd;
8use prost::Message;
9use vortex_array::ArrayRef;
10use vortex_array::ExecutionCtx;
11use vortex_array::ProstMetadata;
12use vortex_array::VectorExecutor;
13use vortex_array::buffer::BufferHandle;
14use vortex_array::serde::ArrayChildren;
15use vortex_array::vtable;
16use vortex_array::vtable::ArrayId;
17use vortex_array::vtable::ArrayVTable;
18use vortex_array::vtable::ArrayVTableExt;
19use vortex_array::vtable::NotSupported;
20use vortex_array::vtable::VTable;
21use vortex_array::vtable::ValidityVTableFromChildSliceHelper;
22use vortex_dtype::DType;
23use vortex_dtype::NativePType;
24use vortex_dtype::PType;
25use vortex_dtype::PTypeDowncastExt;
26use vortex_dtype::match_each_unsigned_integer_ptype;
27use vortex_error::VortexResult;
28use vortex_error::vortex_ensure;
29use vortex_error::vortex_err;
30use vortex_error::vortex_panic;
31use vortex_mask::Mask;
32use vortex_vector::Vector;
33use vortex_vector::primitive::PVector;
34use vortex_vector::primitive::PrimitiveVector;
35
36use crate::DeltaArray;
37use crate::delta::array::delta_decompress::decompress_primitive;
38
39mod array;
40mod canonical;
41mod operations;
42mod validity;
43mod visitor;
44
45vtable!(Delta);
46
47#[derive(Clone, prost::Message)]
48#[repr(C)]
49pub struct DeltaMetadata {
50    #[prost(uint64, tag = "1")]
51    deltas_len: u64,
52    #[prost(uint32, tag = "2")]
53    offset: u32, // must be <1024
54}
55
56impl VTable for DeltaVTable {
57    type Array = DeltaArray;
58
59    type Metadata = ProstMetadata<DeltaMetadata>;
60
61    type ArrayVTable = Self;
62    type CanonicalVTable = Self;
63    type OperationsVTable = Self;
64    type ValidityVTable = ValidityVTableFromChildSliceHelper;
65    type VisitorVTable = Self;
66    type ComputeVTable = NotSupported;
67    type EncodeVTable = NotSupported;
68
69    fn id(&self) -> ArrayId {
70        ArrayId::new_ref("fastlanes.delta")
71    }
72
73    fn encoding(_array: &Self::Array) -> ArrayVTable {
74        DeltaVTable.as_vtable()
75    }
76
77    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
78        // DeltaArray children order (from visit_children):
79        // 1. bases
80        // 2. deltas
81
82        vortex_ensure!(
83            children.len() == 2,
84            "Expected 2 children for Delta encoding, got {}",
85            children.len()
86        );
87
88        array.bases = children[0].clone();
89        array.deltas = children[1].clone();
90
91        Ok(())
92    }
93
94    fn metadata(array: &DeltaArray) -> VortexResult<Self::Metadata> {
95        Ok(ProstMetadata(DeltaMetadata {
96            deltas_len: array.deltas().len() as u64,
97            offset: array.offset() as u32,
98        }))
99    }
100
101    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
102        Ok(Some(metadata.0.encode_to_vec()))
103    }
104
105    fn deserialize(buffer: &[u8]) -> VortexResult<Self::Metadata> {
106        Ok(ProstMetadata(DeltaMetadata::decode(buffer)?))
107    }
108
109    fn build(
110        &self,
111        dtype: &DType,
112        len: usize,
113        metadata: &Self::Metadata,
114        _buffers: &[BufferHandle],
115        children: &dyn ArrayChildren,
116    ) -> VortexResult<DeltaArray> {
117        assert_eq!(children.len(), 2);
118        let ptype = PType::try_from(dtype)?;
119        let lanes = match_each_unsigned_integer_ptype!(ptype, |T| { <T as FastLanes>::LANES });
120
121        // Compute the length of the bases array
122        let deltas_len = usize::try_from(metadata.0.deltas_len)
123            .map_err(|_| vortex_err!("deltas_len {} overflowed usize", metadata.0.deltas_len))?;
124        let num_chunks = deltas_len / 1024;
125        let remainder_base_size = if deltas_len % 1024 > 0 { 1 } else { 0 };
126        let bases_len = num_chunks * lanes + remainder_base_size;
127
128        let bases = children.get(0, dtype, bases_len)?;
129        let deltas = children.get(1, dtype, deltas_len)?;
130
131        DeltaArray::try_new(bases, deltas, metadata.0.offset as usize, len)
132    }
133
134    fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
135        let bases = array.bases().execute(ctx)?.into_primitive();
136        let deltas = array.deltas().execute(ctx)?.into_primitive();
137
138        let start = array.offset();
139        let end = start + array.len();
140        let validity = array.deltas().validity_mask().slice(start..end);
141
142        Ok(match bases {
143            PrimitiveVector::U8(pv) => {
144                decompress::<u8, { u8::LANES }>(&pv, &deltas, start, end, validity)
145            }
146            PrimitiveVector::U16(pv) => {
147                decompress::<u16, { u16::LANES }>(&pv, &deltas, start, end, validity)
148            }
149            PrimitiveVector::U32(pv) => {
150                decompress::<u32, { u32::LANES }>(&pv, &deltas, start, end, validity)
151            }
152            PrimitiveVector::U64(pv) => {
153                decompress::<u64, { u64::LANES }>(&pv, &deltas, start, end, validity)
154            }
155            PrimitiveVector::I8(_)
156            | PrimitiveVector::I16(_)
157            | PrimitiveVector::I32(_)
158            | PrimitiveVector::I64(_)
159            | PrimitiveVector::F16(_)
160            | PrimitiveVector::F32(_)
161            | PrimitiveVector::F64(_) => {
162                vortex_panic!("Tried to match a non-unsigned vector in an unsigned match statement")
163            }
164        })
165    }
166}
167
168/// Decompresses delta-encoded data for a specific primitive type.
169fn decompress<T, const LANES: usize>(
170    bases: &PVector<T>,
171    deltas: &PrimitiveVector,
172    start: usize,
173    end: usize,
174    validity: Mask,
175) -> Vector
176where
177    T: NativePType + Delta + Transpose + WrappingAdd,
178{
179    let buffer = decompress_primitive::<T, LANES>(bases.as_ref(), deltas.downcast::<T>().as_ref());
180    let buffer = buffer.slice(start..end);
181
182    // SAFETY: We slice the buffer and the validity by the same range.
183    unsafe { PVector::<T>::new_unchecked(buffer, validity) }.into()
184}
185
/// Unit type that carries the [`VTable`] implementation for the `fastlanes.delta` encoding.
#[derive(Debug)]
pub struct DeltaVTable;
188
#[cfg(test)]
mod tests {
    use vortex_array::test_harness::check_metadata;

    use super::DeltaMetadata;
    use super::ProstMetadata;

    /// Runs `check_metadata` for `delta.metadata` with both fields set to their maximum
    /// values. Skipped under Miri.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_delta_metadata() {
        let metadata = DeltaMetadata {
            deltas_len: u64::MAX,
            offset: u32::MAX,
        };

        check_metadata("delta.metadata", ProstMetadata(metadata));
    }
}