vortex_array/arrays/varbin/vtable/
operator.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7use num_traits::ToPrimitive;
8use vortex_buffer::{Buffer, ByteBuffer};
9use vortex_dtype::{DType, IntegerPType, PTypeDowncastExt, match_each_integer_ptype};
10use vortex_error::{VortexExpect, VortexResult, vortex_ensure};
11use vortex_mask::Mask;
12use vortex_vector::Vector;
13use vortex_vector::binaryview::{
14    BinaryType, BinaryView, BinaryViewType, BinaryViewVector, StringType,
15};
16
17use crate::ArrayRef;
18use crate::arrays::{VarBinArray, VarBinVTable};
19use crate::execution::{BatchKernel, BatchKernelRef, BindCtx, MaskExecution};
20use crate::vtable::{OperatorVTable, ValidityHelper};
21
22impl OperatorVTable<VarBinVTable> for VarBinVTable {
23    fn bind(
24        array: &VarBinArray,
25        selection: Option<&ArrayRef>,
26        ctx: &mut dyn BindCtx,
27    ) -> VortexResult<BatchKernelRef> {
28        let mask = ctx.bind_selection(array.len(), selection)?;
29        let validity = ctx.bind_validity(array.validity(), array.len(), selection)?;
30        let offsets = ctx.bind(array.offsets(), None)?;
31
32        match array.dtype() {
33            DType::Utf8(_) => Ok(Box::new(VarBinKernel::<StringType>::new(
34                offsets,
35                array.bytes().clone(),
36                validity,
37                mask,
38            ))),
39            DType::Binary(_) => Ok(Box::new(VarBinKernel::<BinaryType>::new(
40                offsets,
41                array.bytes().clone(),
42                validity,
43                mask,
44            ))),
45            _ => unreachable!("invalid DType for VarBinArray {}", array.dtype()),
46        }
47    }
48}
49
50struct VarBinKernel<V> {
51    offsets: BatchKernelRef,
52    bytes: ByteBuffer,
53    validity: MaskExecution,
54    selection: MaskExecution,
55    _type: PhantomData<V>,
56}
57
58impl<V> VarBinKernel<V> {
59    fn new(
60        offsets: BatchKernelRef,
61        bytes: ByteBuffer,
62        validity: MaskExecution,
63        selection: MaskExecution,
64    ) -> Self {
65        Self {
66            offsets,
67            bytes,
68            validity,
69            selection,
70            _type: PhantomData,
71        }
72    }
73}
74
75impl<V: BinaryViewType> BatchKernel for VarBinKernel<V> {
76    fn execute(self: Box<Self>) -> VortexResult<Vector> {
77        let offsets = self.offsets.execute()?.into_primitive();
78
79        match_each_integer_ptype!(offsets.ptype(), |T| {
80            let pvec = offsets.downcast::<T>();
81            // NOTE: discard the validity because offsets must be non-nullable
82            let (offsets, _) = pvec.into_parts();
83            let first = offsets[0];
84
85            let lens: Buffer<u32> = offsets
86                .iter()
87                .copied()
88                .skip(1)
89                .scan(first, |prev, next| {
90                    let len = (next - *prev)
91                        .to_u32()
92                        .vortex_expect("offset must map to u32");
93                    *prev = next;
94                    Some(len)
95                })
96                .collect();
97
98            let selection = self.selection.execute()?;
99
100            let views = match selection {
101                Mask::AllFalse(_) => Buffer::empty(),
102                Mask::AllTrue(_) => make_views::<T>(offsets.as_ref(), lens, &self.bytes),
103                Mask::Values(values) => {
104                    make_views_filtered::<T>(offsets.as_ref(), lens, values.indices(), &self.bytes)
105                }
106            };
107
108            let validity = self.validity.execute()?;
109
110            vortex_ensure!(
111                validity.len() == views.len(),
112                "mismatched validity and views length"
113            );
114
115            // SAFETY: views were constructed in the loop above to point at valid data from
116            //  the buffer. Validity was checked immediately above to be of the appropriate length.
117            Ok(Vector::from(unsafe {
118                BinaryViewVector::<V>::new_unchecked(
119                    views,
120                    Arc::new(Box::new([self.bytes.clone()])),
121                    validity,
122                )
123            }))
124        })
125    }
126}
127
128// Returns a set of views
129fn make_views<OffsetType: IntegerPType>(
130    offsets: &[OffsetType],
131    lens: Buffer<u32>,
132    bytes: &[u8],
133) -> Buffer<BinaryView> {
134    std::iter::zip(offsets, lens)
135        .map(|(offset, len)| {
136            let offset = offset.to_u32().vortex_expect("offset must fit in u32");
137            let bytes = &bytes[offset as usize..(offset + len) as usize];
138            if len as usize <= BinaryView::MAX_INLINED_SIZE {
139                BinaryView::new_inlined(bytes)
140            } else {
141                BinaryView::make_view(bytes, 0, offset)
142            }
143        })
144        .collect()
145}
146
147/// Only make views for values at the given `indices`
148fn make_views_filtered<OffsetType: IntegerPType>(
149    offsets: &[OffsetType],
150    lens: Buffer<u32>,
151    indices: &[usize],
152    bytes: &[u8],
153) -> Buffer<BinaryView> {
154    indices
155        .iter()
156        .copied()
157        .map(|index| {
158            let offset = offsets[index]
159                .to_u32()
160                .vortex_expect("offset must fit in u32");
161            let len = lens[index];
162            let bytes = &bytes[offset as usize..(offset + len) as usize];
163            if len as usize <= BinaryView::MAX_INLINED_SIZE {
164                BinaryView::new_inlined(bytes)
165            } else {
166                BinaryView::make_view(bytes, 0, offset)
167            }
168        })
169        .collect()
170}
171
#[cfg(test)]
mod tests {
    use rstest::{fixture, rstest};
    use vortex_dtype::{DType, Nullability};

    use crate::IntoArray;
    use crate::arrays::builder::VarBinBuilder;
    use crate::arrays::{BoolArray, VarBinArray};

    /// Five nullable UTF-8 rows: one short value, one null, and three longer values.
    #[fixture]
    fn strings() -> VarBinArray {
        let mut builder = VarBinBuilder::<u32>::with_capacity(5);
        builder.append_value("inlined");
        builder.append_null();
        for value in ["large string 1", "large string 2", "large string 3"] {
            builder.append_value(value);
        }
        builder.finish(DType::Utf8(Nullability::Nullable))
    }

    /// Binding without a selection must yield every row unchanged.
    #[rstest]
    fn test_bind(strings: VarBinArray) {
        let kernel = strings.bind(None, &mut ()).unwrap();
        let strings_vec = kernel.execute().unwrap().into_string();

        let expected = [
            Some("inlined"),
            None,
            Some("large string 1"),
            Some("large string 2"),
            Some("large string 3"),
        ];
        for (row, want) in expected.into_iter().enumerate() {
            assert_eq!(strings_vec.get_ref(row), want);
        }
    }

    /// Binding with a selection must yield only the selected rows, in order.
    #[rstest]
    fn test_bind_with_selection(strings: VarBinArray) {
        // Keeps rows 1 (the null), 3 and 4 of the fixture.
        let selection = BoolArray::from_iter([false, true, false, true, true]).into_array();
        let kernel = strings.bind(Some(&selection), &mut ()).unwrap();
        let strings_vec = kernel.execute().unwrap().into_string();

        let expected = [None, Some("large string 2"), Some("large string 3")];
        for (row, want) in expected.into_iter().enumerate() {
            assert_eq!(strings_vec.get_ref(row), want);
        }
    }
}