vortex_fastlanes/delta/vtable/
mod.rs1use fastlanes::Delta;
5use fastlanes::FastLanes;
6use fastlanes::Transpose;
7use num_traits::WrappingAdd;
8use prost::Message;
9use vortex_array::ArrayRef;
10use vortex_array::ExecutionCtx;
11use vortex_array::ProstMetadata;
12use vortex_array::VectorExecutor;
13use vortex_array::buffer::BufferHandle;
14use vortex_array::serde::ArrayChildren;
15use vortex_array::vtable;
16use vortex_array::vtable::ArrayId;
17use vortex_array::vtable::ArrayVTable;
18use vortex_array::vtable::ArrayVTableExt;
19use vortex_array::vtable::NotSupported;
20use vortex_array::vtable::VTable;
21use vortex_array::vtable::ValidityVTableFromChildSliceHelper;
22use vortex_dtype::DType;
23use vortex_dtype::NativePType;
24use vortex_dtype::PType;
25use vortex_dtype::PTypeDowncastExt;
26use vortex_dtype::match_each_unsigned_integer_ptype;
27use vortex_error::VortexResult;
28use vortex_error::vortex_ensure;
29use vortex_error::vortex_err;
30use vortex_error::vortex_panic;
31use vortex_mask::Mask;
32use vortex_vector::Vector;
33use vortex_vector::primitive::PVector;
34use vortex_vector::primitive::PrimitiveVector;
35
36use crate::DeltaArray;
37use crate::delta::array::delta_decompress::decompress_primitive;
38
39mod array;
40mod canonical;
41mod operations;
42mod validity;
43mod visitor;
44
45vtable!(Delta);
46
47#[derive(Clone, prost::Message)]
48#[repr(C)]
49pub struct DeltaMetadata {
50 #[prost(uint64, tag = "1")]
51 deltas_len: u64,
52 #[prost(uint32, tag = "2")]
53 offset: u32, }
55
56impl VTable for DeltaVTable {
57 type Array = DeltaArray;
58
59 type Metadata = ProstMetadata<DeltaMetadata>;
60
61 type ArrayVTable = Self;
62 type CanonicalVTable = Self;
63 type OperationsVTable = Self;
64 type ValidityVTable = ValidityVTableFromChildSliceHelper;
65 type VisitorVTable = Self;
66 type ComputeVTable = NotSupported;
67 type EncodeVTable = NotSupported;
68
69 fn id(&self) -> ArrayId {
70 ArrayId::new_ref("fastlanes.delta")
71 }
72
73 fn encoding(_array: &Self::Array) -> ArrayVTable {
74 DeltaVTable.as_vtable()
75 }
76
77 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
78 vortex_ensure!(
83 children.len() == 2,
84 "Expected 2 children for Delta encoding, got {}",
85 children.len()
86 );
87
88 array.bases = children[0].clone();
89 array.deltas = children[1].clone();
90
91 Ok(())
92 }
93
94 fn metadata(array: &DeltaArray) -> VortexResult<Self::Metadata> {
95 Ok(ProstMetadata(DeltaMetadata {
96 deltas_len: array.deltas().len() as u64,
97 offset: array.offset() as u32,
98 }))
99 }
100
101 fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
102 Ok(Some(metadata.0.encode_to_vec()))
103 }
104
105 fn deserialize(buffer: &[u8]) -> VortexResult<Self::Metadata> {
106 Ok(ProstMetadata(DeltaMetadata::decode(buffer)?))
107 }
108
109 fn build(
110 &self,
111 dtype: &DType,
112 len: usize,
113 metadata: &Self::Metadata,
114 _buffers: &[BufferHandle],
115 children: &dyn ArrayChildren,
116 ) -> VortexResult<DeltaArray> {
117 assert_eq!(children.len(), 2);
118 let ptype = PType::try_from(dtype)?;
119 let lanes = match_each_unsigned_integer_ptype!(ptype, |T| { <T as FastLanes>::LANES });
120
121 let deltas_len = usize::try_from(metadata.0.deltas_len)
123 .map_err(|_| vortex_err!("deltas_len {} overflowed usize", metadata.0.deltas_len))?;
124 let num_chunks = deltas_len / 1024;
125 let remainder_base_size = if deltas_len % 1024 > 0 { 1 } else { 0 };
126 let bases_len = num_chunks * lanes + remainder_base_size;
127
128 let bases = children.get(0, dtype, bases_len)?;
129 let deltas = children.get(1, dtype, deltas_len)?;
130
131 DeltaArray::try_new(bases, deltas, metadata.0.offset as usize, len)
132 }
133
134 fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
135 let bases = array.bases().execute(ctx)?.into_primitive();
136 let deltas = array.deltas().execute(ctx)?.into_primitive();
137
138 let start = array.offset();
139 let end = start + array.len();
140 let validity = array.deltas().validity_mask().slice(start..end);
141
142 Ok(match bases {
143 PrimitiveVector::U8(pv) => {
144 decompress::<u8, { u8::LANES }>(&pv, &deltas, start, end, validity)
145 }
146 PrimitiveVector::U16(pv) => {
147 decompress::<u16, { u16::LANES }>(&pv, &deltas, start, end, validity)
148 }
149 PrimitiveVector::U32(pv) => {
150 decompress::<u32, { u32::LANES }>(&pv, &deltas, start, end, validity)
151 }
152 PrimitiveVector::U64(pv) => {
153 decompress::<u64, { u64::LANES }>(&pv, &deltas, start, end, validity)
154 }
155 PrimitiveVector::I8(_)
156 | PrimitiveVector::I16(_)
157 | PrimitiveVector::I32(_)
158 | PrimitiveVector::I64(_)
159 | PrimitiveVector::F16(_)
160 | PrimitiveVector::F32(_)
161 | PrimitiveVector::F64(_) => {
162 vortex_panic!("Tried to match a non-unsigned vector in an unsigned match statement")
163 }
164 })
165 }
166}
167
168fn decompress<T, const LANES: usize>(
170 bases: &PVector<T>,
171 deltas: &PrimitiveVector,
172 start: usize,
173 end: usize,
174 validity: Mask,
175) -> Vector
176where
177 T: NativePType + Delta + Transpose + WrappingAdd,
178{
179 let buffer = decompress_primitive::<T, LANES>(bases.as_ref(), deltas.downcast::<T>().as_ref());
180 let buffer = buffer.slice(start..end);
181
182 unsafe { PVector::<T>::new_unchecked(buffer, validity) }.into()
184}
185
186#[derive(Debug)]
187pub struct DeltaVTable;
188
189#[cfg(test)]
190mod tests {
191 use vortex_array::test_harness::check_metadata;
192
193 use super::DeltaMetadata;
194 use super::ProstMetadata;
195
196 #[cfg_attr(miri, ignore)]
197 #[test]
198 fn test_delta_metadata() {
199 check_metadata(
200 "delta.metadata",
201 ProstMetadata(DeltaMetadata {
202 offset: u32::MAX,
203 deltas_len: u64::MAX,
204 }),
205 );
206 }
207}