vortex_array/arrays/varbinview/vtable/
mod.rs1use std::hash::Hasher;
5use std::mem::size_of;
6use std::sync::Arc;
7
8use kernel::PARENT_KERNELS;
9use vortex_buffer::Buffer;
10use vortex_error::VortexResult;
11use vortex_error::vortex_bail;
12use vortex_error::vortex_ensure;
13use vortex_error::vortex_err;
14use vortex_error::vortex_panic;
15use vortex_session::VortexSession;
16use vortex_session::registry::CachedId;
17
18use crate::ArrayRef;
19use crate::ExecutionCtx;
20use crate::ExecutionResult;
21use crate::Precision;
22use crate::array::Array;
23use crate::array::ArrayId;
24use crate::array::ArrayView;
25use crate::array::VTable;
26use crate::arrays::varbinview::BinaryView;
27use crate::arrays::varbinview::VarBinViewData;
28use crate::arrays::varbinview::array::NUM_SLOTS;
29use crate::arrays::varbinview::array::SLOT_NAMES;
30use crate::arrays::varbinview::compute::rules::PARENT_RULES;
31use crate::buffer::BufferHandle;
32use crate::dtype::DType;
33use crate::hash::ArrayEq;
34use crate::hash::ArrayHash;
35use crate::serde::ArrayChildren;
36use crate::validity::Validity;
37mod kernel;
38mod operations;
39mod validity;
40pub type VarBinViewArray = Array<VarBinView>;
42
43#[derive(Clone, Debug)]
44pub struct VarBinView;
45
46impl ArrayHash for VarBinViewData {
47 fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
48 for buffer in self.buffers.iter() {
49 buffer.array_hash(state, precision);
50 }
51 self.views.array_hash(state, precision);
52 }
53}
54
55impl ArrayEq for VarBinViewData {
56 fn array_eq(&self, other: &Self, precision: Precision) -> bool {
57 self.buffers.len() == other.buffers.len()
58 && self
59 .buffers
60 .iter()
61 .zip(other.buffers.iter())
62 .all(|(a, b)| a.array_eq(b, precision))
63 && self.views.array_eq(&other.views, precision)
64 }
65}
66
67impl VTable for VarBinView {
68 type ArrayData = VarBinViewData;
69
70 type OperationsVTable = Self;
71 type ValidityVTable = Self;
72
73 fn id(&self) -> ArrayId {
74 static ID: CachedId = CachedId::new("vortex.varbinview");
75 *ID
76 }
77
78 fn nbuffers(array: ArrayView<'_, Self>) -> usize {
79 array.data_buffers().len() + 1
80 }
81
82 fn validate(
83 &self,
84 data: &VarBinViewData,
85 dtype: &DType,
86 len: usize,
87 slots: &[Option<ArrayRef>],
88 ) -> VortexResult<()> {
89 vortex_ensure!(
90 slots.len() == NUM_SLOTS,
91 "VarBinViewArray expected {NUM_SLOTS} slots, found {}",
92 slots.len()
93 );
94 vortex_ensure!(
95 data.len() == len,
96 "VarBinViewArray length {} does not match outer length {}",
97 data.len(),
98 len
99 );
100 vortex_ensure!(
101 matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
102 "VarBinViewArray dtype must be binary or utf8, got {dtype}"
103 );
104 Ok(())
105 }
106
107 fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
108 let ndata = array.data_buffers().len();
109 if idx < ndata {
110 array.data_buffers()[idx].clone()
111 } else if idx == ndata {
112 array.views_handle().clone()
113 } else {
114 vortex_panic!("VarBinViewArray buffer index {idx} out of bounds")
115 }
116 }
117
118 fn buffer_name(array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
119 let ndata = array.data_buffers().len();
120 if idx < ndata {
121 Some(format!("buffer_{idx}"))
122 } else if idx == ndata {
123 Some("views".to_string())
124 } else {
125 vortex_panic!("VarBinViewArray buffer_name index {idx} out of bounds")
126 }
127 }
128
129 fn serialize(
130 _array: ArrayView<'_, Self>,
131 _session: &VortexSession,
132 ) -> VortexResult<Option<Vec<u8>>> {
133 Ok(Some(vec![]))
134 }
135
136 fn deserialize(
137 &self,
138 dtype: &DType,
139 len: usize,
140 metadata: &[u8],
141
142 buffers: &[BufferHandle],
143 children: &dyn ArrayChildren,
144 _session: &VortexSession,
145 ) -> VortexResult<crate::array::ArrayParts<Self>> {
146 if !metadata.is_empty() {
147 vortex_bail!(
148 "VarBinViewArray expects empty metadata, got {} bytes",
149 metadata.len()
150 );
151 }
152 let Some((views_handle, data_handles)) = buffers.split_last() else {
153 vortex_bail!("Expected at least 1 buffer, got 0");
154 };
155
156 let validity = if children.is_empty() {
157 Validity::from(dtype.nullability())
158 } else if children.len() == 1 {
159 let validity = children.get(0, &Validity::DTYPE, len)?;
160 Validity::Array(validity)
161 } else {
162 vortex_bail!("Expected 0 or 1 children, got {}", children.len());
163 };
164
165 let views_nbytes = views_handle.len();
166 let expected_views_nbytes = len
167 .checked_mul(size_of::<BinaryView>())
168 .ok_or_else(|| vortex_err!("views byte length overflow for len={len}"))?;
169 if views_nbytes != expected_views_nbytes {
170 vortex_bail!(
171 "Expected views buffer length {} bytes, got {} bytes",
172 expected_views_nbytes,
173 views_nbytes
174 );
175 }
176
177 if buffers.iter().any(|b| b.is_on_device()) {
179 let data = VarBinViewData::try_new_handle(
180 views_handle.clone(),
181 Arc::from(data_handles.to_vec()),
182 dtype.clone(),
183 validity.clone(),
184 )?;
185 let slots = VarBinViewData::make_slots(&validity, len);
186 return Ok(
187 crate::array::ArrayParts::new(self.clone(), dtype.clone(), len, data)
188 .with_slots(slots),
189 );
190 }
191
192 let data_buffers = data_handles
193 .iter()
194 .map(|b| b.as_host().clone())
195 .collect::<Vec<_>>();
196 let views = Buffer::<BinaryView>::from_byte_buffer(views_handle.clone().as_host().clone());
197
198 let data = VarBinViewData::try_new(
199 views,
200 Arc::from(data_buffers),
201 dtype.clone(),
202 validity.clone(),
203 )?;
204 let slots = VarBinViewData::make_slots(&validity, len);
205 Ok(crate::array::ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
206 }
207
208 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
209 SLOT_NAMES[idx].to_string()
210 }
211
212 fn reduce_parent(
213 array: ArrayView<'_, Self>,
214 parent: &ArrayRef,
215 child_idx: usize,
216 ) -> VortexResult<Option<ArrayRef>> {
217 PARENT_RULES.evaluate(array, parent, child_idx)
218 }
219
220 fn execute_parent(
221 array: ArrayView<'_, Self>,
222 parent: &ArrayRef,
223 child_idx: usize,
224 ctx: &mut ExecutionCtx,
225 ) -> VortexResult<Option<ArrayRef>> {
226 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
227 }
228
229 fn execute(array: Array<Self>, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
230 Ok(ExecutionResult::done(array))
231 }
232}
233
#[cfg(test)]
mod tests {
    use vortex_buffer::ByteBufferMut;
    use vortex_session::registry::ReadContext;

    use super::*;
    use crate::ArrayContext;
    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::assert_arrays_eq;
    use crate::serde::SerializeOptions;
    use crate::serde::SerializedArray;

    /// Serializes a nullable `VarBinViewArray`, decodes it again, and checks that the result
    /// equals the original array.
    #[test]
    fn test_nullable_varbinview_serde_roundtrip() {
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("hello"),
            None,
            Some("world"),
            None,
            Some("a moderately long string for testing"),
        ]);
        let (dtype, len) = (original.dtype().clone(), original.len());

        let array_ctx = ArrayContext::empty();
        let segments = original
            .clone()
            .into_array()
            .serialize(&array_ctx, &LEGACY_SESSION, &SerializeOptions::default())
            .unwrap();

        // Stitch the serialized segments into one contiguous byte buffer before parsing.
        let mut flat = ByteBufferMut::empty();
        for segment in segments {
            flat.extend_from_slice(segment.as_ref());
        }

        let read_ctx = ReadContext::new(array_ctx.to_ids());
        let decoded = SerializedArray::try_from(flat.freeze())
            .unwrap()
            .decode(&dtype, len, &read_ctx, &LEGACY_SESSION)
            .unwrap();

        assert_arrays_eq!(decoded, original);
    }
}