vortex_array/arrays/varbin/compute/
take.rs

1use arrow_buffer::NullBuffer;
2use num_traits::PrimInt;
3use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
4use vortex_error::{VortexResult, vortex_err, vortex_panic};
5
6use crate::arrays::VarBinEncoding;
7use crate::arrays::varbin::VarBinArray;
8use crate::arrays::varbin::builder::VarBinBuilder;
9use crate::compute::TakeFn;
10use crate::validity::Validity;
11use crate::variants::PrimitiveArrayTrait;
12use crate::{Array, ArrayRef, ToCanonical};
13
14impl TakeFn<&VarBinArray> for VarBinEncoding {
15    fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
16        let offsets = array.offsets().to_primitive()?;
17        let data = array.bytes();
18        let indices = indices.to_primitive()?;
19        match_each_integer_ptype!(offsets.ptype(), |$O| {
20            match_each_integer_ptype!(indices.ptype(), |$I| {
21                Ok(take(
22                    array.dtype().clone(),
23                    offsets.as_slice::<$O>(),
24                    data.as_slice(),
25                    indices.as_slice::<$I>(),
26                    array.validity().clone(),
27                )?.into_array())
28            })
29        })
30    }
31}
32
33fn take<I: NativePType, O: NativePType + PrimInt>(
34    dtype: DType,
35    offsets: &[O],
36    data: &[u8],
37    indices: &[I],
38    validity: Validity,
39) -> VortexResult<VarBinArray> {
40    let validity_mask = validity.to_logical(offsets.len() - 1)?;
41    if let Some(v) = validity_mask.to_null_buffer() {
42        return Ok(take_nullable(dtype, offsets, data, indices, v));
43    }
44
45    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
46    for &idx in indices {
47        let idx = idx
48            .to_usize()
49            .ok_or_else(|| vortex_err!("Failed to convert index to usize: {}", idx))?;
50        let start = offsets[idx]
51            .to_usize()
52            .ok_or_else(|| vortex_err!("Failed to convert offset to usize: {}", offsets[idx]))?;
53        let stop = offsets[idx + 1].to_usize().ok_or_else(|| {
54            vortex_err!("Failed to convert offset to usize: {}", offsets[idx + 1])
55        })?;
56        builder.append_value(&data[start..stop]);
57    }
58    Ok(builder.finish(dtype))
59}
60
61fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
62    dtype: DType,
63    offsets: &[O],
64    data: &[u8],
65    indices: &[I],
66    null_buffer: NullBuffer,
67) -> VarBinArray {
68    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
69    for &idx in indices {
70        let idx = idx
71            .to_usize()
72            .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", idx));
73        if null_buffer.is_valid(idx) {
74            let start = offsets[idx].to_usize().unwrap_or_else(|| {
75                vortex_panic!("Failed to convert offset to usize: {}", offsets[idx])
76            });
77            let stop = offsets[idx + 1].to_usize().unwrap_or_else(|| {
78                vortex_panic!("Failed to convert offset to usize: {}", offsets[idx + 1])
79            });
80            builder.append_value(&data[start..stop]);
81        } else {
82            builder.append_null();
83        }
84    }
85    builder.finish(dtype)
86}