vortex_array/arrays/varbin/compute/
take.rs

1use num_traits::PrimInt;
2use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
3use vortex_error::{VortexResult, vortex_err, vortex_panic};
4use vortex_mask::Mask;
5
6use crate::arrays::VarBinVTable;
7use crate::arrays::varbin::VarBinArray;
8use crate::arrays::varbin::builder::VarBinBuilder;
9use crate::compute::{TakeKernel, TakeKernelAdapter};
10use crate::{Array, ArrayRef, IntoArray, ToCanonical, register_kernel};
11
12impl TakeKernel for VarBinVTable {
13    fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
14        let offsets = array.offsets().to_primitive()?;
15        let data = array.bytes();
16        let indices = indices.to_primitive()?;
17        match_each_integer_ptype!(offsets.ptype(), |$O| {
18            match_each_integer_ptype!(indices.ptype(), |$I| {
19                Ok(take(
20                    array.dtype().clone(),
21                    offsets.as_slice::<$O>(),
22                    data.as_slice(),
23                    indices.as_slice::<$I>(),
24                    array.validity_mask()?,
25                    indices.validity_mask()?,
26                )?.into_array())
27            })
28        })
29    }
30}
31
32register_kernel!(TakeKernelAdapter(VarBinVTable).lift());
33
34fn take<I: NativePType, O: NativePType + PrimInt>(
35    dtype: DType,
36    offsets: &[O],
37    data: &[u8],
38    indices: &[I],
39    validity_mask: Mask,
40    indices_validity_mask: Mask,
41) -> VortexResult<VarBinArray> {
42    if !validity_mask.all_true() || !indices_validity_mask.all_true() {
43        return Ok(take_nullable(
44            dtype,
45            offsets,
46            data,
47            indices,
48            validity_mask,
49            indices_validity_mask,
50        ));
51    }
52
53    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
54    for &idx in indices {
55        let idx = idx
56            .to_usize()
57            .ok_or_else(|| vortex_err!("Failed to convert index to usize: {}", idx))?;
58        let start = offsets[idx]
59            .to_usize()
60            .ok_or_else(|| vortex_err!("Failed to convert offset to usize: {}", offsets[idx]))?;
61        let stop = offsets[idx + 1].to_usize().ok_or_else(|| {
62            vortex_err!("Failed to convert offset to usize: {}", offsets[idx + 1])
63        })?;
64        builder.append_value(&data[start..stop]);
65    }
66    Ok(builder.finish(dtype))
67}
68
69fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
70    dtype: DType,
71    offsets: &[O],
72    data: &[u8],
73    indices: &[I],
74    data_validity: Mask,
75    indices_validity: Mask,
76) -> VarBinArray {
77    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
78    for (idx, data_idx) in indices.iter().enumerate() {
79        if !indices_validity.value(idx) {
80            builder.append_null();
81            continue;
82        }
83        let data_idx = data_idx
84            .to_usize()
85            .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", data_idx));
86        if data_validity.value(data_idx) {
87            let start = offsets[data_idx].to_usize().unwrap_or_else(|| {
88                vortex_panic!("Failed to convert offset to usize: {}", offsets[data_idx])
89            });
90            let stop = offsets[data_idx + 1].to_usize().unwrap_or_else(|| {
91                vortex_panic!(
92                    "Failed to convert offset to usize: {}",
93                    offsets[data_idx + 1]
94                )
95            });
96            builder.append_value(&data[start..stop]);
97        } else {
98            builder.append_null();
99        }
100    }
101    builder.finish(dtype)
102}