vortex_fastlanes/delta/vtable/
mod.rs1use std::hash::Hash;
5use std::hash::Hasher;
6
7use fastlanes::FastLanes;
8use prost::Message;
9use vortex_array::Array;
10use vortex_array::ArrayEq;
11use vortex_array::ArrayHash;
12use vortex_array::ArrayId;
13use vortex_array::ArrayParts;
14use vortex_array::ArrayRef;
15use vortex_array::ArrayView;
16use vortex_array::ExecutionCtx;
17use vortex_array::ExecutionResult;
18use vortex_array::IntoArray;
19use vortex_array::Precision;
20use vortex_array::arrays::PrimitiveArray;
21use vortex_array::buffer::BufferHandle;
22use vortex_array::dtype::DType;
23use vortex_array::dtype::PType;
24use vortex_array::match_each_unsigned_integer_ptype;
25use vortex_array::serde::ArrayChildren;
26use vortex_array::vtable::VTable;
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_error::vortex_ensure;
30use vortex_error::vortex_err;
31use vortex_error::vortex_panic;
32use vortex_session::VortexSession;
33use vortex_session::registry::CachedId;
34
35use crate::DeltaData;
36use crate::delta::array::BASES_SLOT;
37use crate::delta::array::DELTAS_SLOT;
38use crate::delta::array::DeltaArrayExt;
39use crate::delta::array::SLOT_NAMES;
40use crate::delta::array::delta_decompress::delta_decompress;
41use crate::delta::array::lane_count;
42
43mod operations;
44mod rules;
45mod slice;
46mod validity;
47
/// A Vortex [`Array`] whose encoding is [`Delta`] (FastLanes delta encoding).
pub type DeltaArray = Array<Delta>;
50
/// Protobuf-encoded metadata persisted for a [`Delta`] array by `VTable::serialize` and read
/// back by `VTable::deserialize`.
#[derive(Clone, prost::Message)]
#[repr(C)]
pub struct DeltaMetadata {
    /// Length of the deltas child. `deserialize` derives the expected bases length from it.
    #[prost(uint64, tag = "1")]
    deltas_len: u64,
    /// The array's offset into the deltas child, stored as a `u32`. Validation requires
    /// `offset + len <= deltas_len`.
    #[prost(uint32, tag = "2")]
    offset: u32,
}
59
60impl ArrayHash for DeltaData {
61 fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
62 self.offset.hash(state);
63 }
64}
65
66impl ArrayEq for DeltaData {
67 fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
68 self.offset == other.offset
69 }
70}
71
72impl VTable for Delta {
73 type ArrayData = DeltaData;
74
75 type OperationsVTable = Self;
76 type ValidityVTable = Self;
77
78 fn id(&self) -> ArrayId {
79 static ID: CachedId = CachedId::new("fastlanes.delta");
80 *ID
81 }
82
83 fn validate(
84 &self,
85 data: &Self::ArrayData,
86 dtype: &DType,
87 len: usize,
88 slots: &[Option<ArrayRef>],
89 ) -> VortexResult<()> {
90 let bases = slots[BASES_SLOT]
91 .as_ref()
92 .vortex_expect("DeltaArray bases slot");
93 let deltas = slots[DELTAS_SLOT]
94 .as_ref()
95 .vortex_expect("DeltaArray deltas slot");
96 validate_parts(bases, deltas, data.offset, dtype, len)
97 }
98
99 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
100 0
101 }
102
103 fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
104 vortex_panic!("DeltaArray buffer index {idx} out of bounds")
105 }
106
107 fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option<String> {
108 None
109 }
110
111 fn reduce_parent(
112 array: ArrayView<'_, Self>,
113 parent: &ArrayRef,
114 child_idx: usize,
115 ) -> VortexResult<Option<ArrayRef>> {
116 rules::RULES.evaluate(array, parent, child_idx)
117 }
118
119 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
120 SLOT_NAMES[idx].to_string()
121 }
122
123 fn serialize(
124 array: ArrayView<'_, Self>,
125 _session: &VortexSession,
126 ) -> VortexResult<Option<Vec<u8>>> {
127 Ok(Some(
128 DeltaMetadata {
129 deltas_len: array.deltas().len() as u64,
130 offset: array.offset() as u32,
131 }
132 .encode_to_vec(),
133 ))
134 }
135
136 fn deserialize(
137 &self,
138 dtype: &DType,
139 len: usize,
140 metadata: &[u8],
141 buffers: &[BufferHandle],
142 children: &dyn ArrayChildren,
143 _session: &VortexSession,
144 ) -> VortexResult<ArrayParts<Self>> {
145 vortex_ensure!(
146 buffers.is_empty(),
147 "DeltaArray expects 0 buffers, got {}",
148 buffers.len()
149 );
150 vortex_ensure!(
151 children.len() == 2,
152 "DeltaArray expects 2 children, got {}",
153 children.len()
154 );
155 let metadata = DeltaMetadata::decode(metadata)?;
156 let ptype = PType::try_from(dtype)?;
157 let lanes = match_each_unsigned_integer_ptype!(ptype, |T| { <T as FastLanes>::LANES });
158
159 let deltas_len = usize::try_from(metadata.deltas_len)
161 .map_err(|_| vortex_err!("deltas_len {} overflowed usize", metadata.deltas_len))?;
162 let num_chunks = deltas_len / 1024;
163 let remainder_base_size = if deltas_len % 1024 > 0 { 1 } else { 0 };
164 let bases_len = num_chunks * lanes + remainder_base_size;
165
166 let bases = children.get(0, dtype, bases_len)?;
167 let deltas = children.get(1, dtype, deltas_len)?;
168
169 let data = DeltaData::try_new(metadata.offset as usize)?;
170 let slots = vec![Some(bases), Some(deltas)];
171 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
172 }
173
174 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
175 Ok(ExecutionResult::done(
176 delta_decompress(&array, ctx)?.into_array(),
177 ))
178 }
179}
180
/// Encoding marker for FastLanes delta-encoded arrays.
///
/// Implements [`VTable`] under the id `"fastlanes.delta"`.
#[derive(Clone, Debug)]
pub struct Delta;
183
184impl Delta {
185 pub fn try_new(
186 bases: ArrayRef,
187 deltas: ArrayRef,
188 offset: usize,
189 len: usize,
190 ) -> VortexResult<DeltaArray> {
191 let dtype = bases.dtype().with_nullability(deltas.dtype().nullability());
192 let data = DeltaData::try_new(offset)?;
193 let slots = vec![Some(bases), Some(deltas)];
194 Array::try_from_parts(ArrayParts::new(Delta, dtype, len, data).with_slots(slots))
195 }
196
197 pub fn try_from_primitive_array(
199 array: &PrimitiveArray,
200 ctx: &mut ExecutionCtx,
201 ) -> VortexResult<DeltaArray> {
202 let logical_len = array.len();
203 let (bases, deltas) = crate::delta::array::delta_compress::delta_compress(array, ctx)?;
204 Self::try_new(bases.into_array(), deltas.into_array(), 0, logical_len)
205 }
206}
207
208fn validate_parts(
209 bases: &ArrayRef,
210 deltas: &ArrayRef,
211 offset: usize,
212 dtype: &DType,
213 len: usize,
214) -> VortexResult<()> {
215 vortex_ensure!(
216 offset + len <= deltas.len(),
217 "offset + len, {offset} + {len}, must be less than or equal to the size of deltas: {}",
218 deltas.len()
219 );
220 vortex_ensure!(
221 bases.dtype().eq_ignore_nullability(deltas.dtype()),
222 "DeltaArray: bases and deltas must have the same dtype, got {} and {}",
223 bases.dtype(),
224 deltas.dtype()
225 );
226
227 vortex_ensure!(
228 bases.dtype().is_unsigned_int(),
229 "DeltaArray: dtype must be an unsigned integer, got {}",
230 bases.dtype()
231 );
232
233 let expected_dtype = bases.dtype().with_nullability(deltas.dtype().nullability());
234 vortex_ensure!(
235 dtype == &expected_dtype,
236 "DeltaArray dtype mismatch: expected {expected_dtype}, got {dtype}"
237 );
238
239 let lanes = lane_count(bases.dtype().as_ptype());
240
241 vortex_ensure!(
242 deltas.len().is_multiple_of(1024),
243 "deltas length ({}) must be a multiple of 1024",
244 deltas.len(),
245 );
246 vortex_ensure!(
247 bases.len().is_multiple_of(lanes),
248 "bases length ({}) must be a multiple of LANES ({lanes})",
249 bases.len(),
250 );
251 Ok(())
252}
253
#[cfg(test)]
mod tests {
    use prost::Message;
    use vortex_array::test_harness::check_metadata;

    use super::DeltaMetadata;

    /// Runs `check_metadata` on a `DeltaMetadata` whose fields are both at their maximum values.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_delta_metadata() {
        let metadata = DeltaMetadata {
            deltas_len: u64::MAX,
            offset: u32::MAX,
        };
        let encoded = metadata.encode_to_vec();
        check_metadata("delta.metadata", &encoded);
    }
}