vortex_array/arrays/varbin/vtable/
operator.rs1use std::marker::PhantomData;
5use std::sync::Arc;
6
7use num_traits::ToPrimitive;
8use vortex_buffer::{Buffer, ByteBuffer};
9use vortex_dtype::{DType, IntegerPType, PTypeDowncastExt, match_each_integer_ptype};
10use vortex_error::{VortexExpect, VortexResult, vortex_ensure};
11use vortex_mask::Mask;
12use vortex_vector::Vector;
13use vortex_vector::binaryview::{
14 BinaryType, BinaryView, BinaryViewType, BinaryViewVector, StringType,
15};
16
17use crate::ArrayRef;
18use crate::arrays::{VarBinArray, VarBinVTable};
19use crate::execution::{BatchKernel, BatchKernelRef, BindCtx, MaskExecution};
20use crate::vtable::{OperatorVTable, ValidityHelper};
21
22impl OperatorVTable<VarBinVTable> for VarBinVTable {
23 fn bind(
24 array: &VarBinArray,
25 selection: Option<&ArrayRef>,
26 ctx: &mut dyn BindCtx,
27 ) -> VortexResult<BatchKernelRef> {
28 let mask = ctx.bind_selection(array.len(), selection)?;
29 let validity = ctx.bind_validity(array.validity(), array.len(), selection)?;
30 let offsets = ctx.bind(array.offsets(), None)?;
31
32 match array.dtype() {
33 DType::Utf8(_) => Ok(Box::new(VarBinKernel::<StringType>::new(
34 offsets,
35 array.bytes().clone(),
36 validity,
37 mask,
38 ))),
39 DType::Binary(_) => Ok(Box::new(VarBinKernel::<BinaryType>::new(
40 offsets,
41 array.bytes().clone(),
42 validity,
43 mask,
44 ))),
45 _ => unreachable!("invalid DType for VarBinArray {}", array.dtype()),
46 }
47 }
48}
49
50struct VarBinKernel<V> {
51 offsets: BatchKernelRef,
52 bytes: ByteBuffer,
53 validity: MaskExecution,
54 selection: MaskExecution,
55 _type: PhantomData<V>,
56}
57
58impl<V> VarBinKernel<V> {
59 fn new(
60 offsets: BatchKernelRef,
61 bytes: ByteBuffer,
62 validity: MaskExecution,
63 selection: MaskExecution,
64 ) -> Self {
65 Self {
66 offsets,
67 bytes,
68 validity,
69 selection,
70 _type: PhantomData,
71 }
72 }
73}
74
75impl<V: BinaryViewType> BatchKernel for VarBinKernel<V> {
76 fn execute(self: Box<Self>) -> VortexResult<Vector> {
77 let offsets = self.offsets.execute()?.into_primitive();
78
79 match_each_integer_ptype!(offsets.ptype(), |T| {
80 let pvec = offsets.downcast::<T>();
81 let (offsets, _) = pvec.into_parts();
83 let first = offsets[0];
84
85 let lens: Buffer<u32> = offsets
86 .iter()
87 .copied()
88 .skip(1)
89 .scan(first, |prev, next| {
90 let len = (next - *prev)
91 .to_u32()
92 .vortex_expect("offset must map to u32");
93 *prev = next;
94 Some(len)
95 })
96 .collect();
97
98 let selection = self.selection.execute()?;
99
100 let views = match selection {
101 Mask::AllFalse(_) => Buffer::empty(),
102 Mask::AllTrue(_) => make_views::<T>(offsets.as_ref(), lens, &self.bytes),
103 Mask::Values(values) => {
104 make_views_filtered::<T>(offsets.as_ref(), lens, values.indices(), &self.bytes)
105 }
106 };
107
108 let validity = self.validity.execute()?;
109
110 vortex_ensure!(
111 validity.len() == views.len(),
112 "mismatched validity and views length"
113 );
114
115 Ok(Vector::from(unsafe {
118 BinaryViewVector::<V>::new_unchecked(
119 views,
120 Arc::new(Box::new([self.bytes.clone()])),
121 validity,
122 )
123 }))
124 })
125 }
126}
127
128fn make_views<OffsetType: IntegerPType>(
130 offsets: &[OffsetType],
131 lens: Buffer<u32>,
132 bytes: &[u8],
133) -> Buffer<BinaryView> {
134 std::iter::zip(offsets, lens)
135 .map(|(offset, len)| {
136 let offset = offset.to_u32().vortex_expect("offset must fit in u32");
137 let bytes = &bytes[offset as usize..(offset + len) as usize];
138 if len as usize <= BinaryView::MAX_INLINED_SIZE {
139 BinaryView::new_inlined(bytes)
140 } else {
141 BinaryView::make_view(bytes, 0, offset)
142 }
143 })
144 .collect()
145}
146
147fn make_views_filtered<OffsetType: IntegerPType>(
149 offsets: &[OffsetType],
150 lens: Buffer<u32>,
151 indices: &[usize],
152 bytes: &[u8],
153) -> Buffer<BinaryView> {
154 indices
155 .iter()
156 .copied()
157 .map(|index| {
158 let offset = offsets[index]
159 .to_u32()
160 .vortex_expect("offset must fit in u32");
161 let len = lens[index];
162 let bytes = &bytes[offset as usize..(offset + len) as usize];
163 if len as usize <= BinaryView::MAX_INLINED_SIZE {
164 BinaryView::new_inlined(bytes)
165 } else {
166 BinaryView::make_view(bytes, 0, offset)
167 }
168 })
169 .collect()
170}
171
#[cfg(test)]
mod tests {
    use rstest::{fixture, rstest};
    use vortex_dtype::{DType, Nullability};

    use crate::IntoArray;
    use crate::arrays::builder::VarBinBuilder;
    use crate::arrays::{BoolArray, VarBinArray};

    /// Five nullable UTF-8 entries: one 7-byte string, one null, and three 14-byte
    /// strings.
    #[fixture]
    fn strings() -> VarBinArray {
        let entries = [
            Some("inlined"),
            None,
            Some("large string 1"),
            Some("large string 2"),
            Some("large string 3"),
        ];

        let mut builder = VarBinBuilder::<u32>::with_capacity(5);
        for entry in entries {
            match entry {
                Some(text) => builder.append_value(text),
                None => builder.append_null(),
            }
        }
        builder.finish(DType::Utf8(Nullability::Nullable))
    }

    /// Binding without a selection yields every entry, nulls included.
    #[rstest]
    fn test_bind(strings: VarBinArray) {
        let kernel = strings.bind(None, &mut ()).unwrap();
        let strings_vec = kernel.execute().unwrap().into_string();

        let expected = [
            Some("inlined"),
            None,
            Some("large string 1"),
            Some("large string 2"),
            Some("large string 3"),
        ];
        for (position, want) in expected.into_iter().enumerate() {
            assert_eq!(strings_vec.get_ref(position), want);
        }
    }

    /// Binding with a selection yields only the selected entries, in order.
    #[rstest]
    fn test_bind_with_selection(strings: VarBinArray) {
        let selection = BoolArray::from_iter([false, true, false, true, true]).into_array();
        let kernel = strings.bind(Some(&selection), &mut ()).unwrap();
        let strings_vec = kernel.execute().unwrap().into_string();

        let expected = [None, Some("large string 2"), Some("large string 3")];
        for (position, want) in expected.into_iter().enumerate() {
            assert_eq!(strings_vec.get_ref(position), want);
        }
    }
}