vortex_array/arrays/varbin/compute/
take.rs

1use num_traits::PrimInt;
2use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
3use vortex_error::{VortexResult, vortex_err, vortex_panic};
4use vortex_mask::Mask;
5
6use crate::arrays::VarBinVTable;
7use crate::arrays::varbin::VarBinArray;
8use crate::arrays::varbin::builder::VarBinBuilder;
9use crate::compute::{TakeKernel, TakeKernelAdapter};
10use crate::{Array, ArrayRef, IntoArray, ToCanonical, register_kernel};
11
12impl TakeKernel for VarBinVTable {
13    fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
14        let offsets = array.offsets().to_primitive()?;
15        let data = array.bytes();
16        let indices = indices.to_primitive()?;
17        match_each_integer_ptype!(offsets.ptype(), |O| {
18            match_each_integer_ptype!(indices.ptype(), |I| {
19                Ok(take(
20                    array.dtype().clone(),
21                    offsets.as_slice::<O>(),
22                    data.as_slice(),
23                    indices.as_slice::<I>(),
24                    array.validity_mask()?,
25                    indices.validity_mask()?,
26                )?
27                .into_array())
28            })
29        })
30    }
31}
32
33register_kernel!(TakeKernelAdapter(VarBinVTable).lift());
34
35fn take<I: NativePType, O: NativePType + PrimInt>(
36    dtype: DType,
37    offsets: &[O],
38    data: &[u8],
39    indices: &[I],
40    validity_mask: Mask,
41    indices_validity_mask: Mask,
42) -> VortexResult<VarBinArray> {
43    if !validity_mask.all_true() || !indices_validity_mask.all_true() {
44        return Ok(take_nullable(
45            dtype,
46            offsets,
47            data,
48            indices,
49            validity_mask,
50            indices_validity_mask,
51        ));
52    }
53
54    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
55    for &idx in indices {
56        let idx = idx
57            .to_usize()
58            .ok_or_else(|| vortex_err!("Failed to convert index to usize: {}", idx))?;
59        let start = offsets[idx]
60            .to_usize()
61            .ok_or_else(|| vortex_err!("Failed to convert offset to usize: {}", offsets[idx]))?;
62        let stop = offsets[idx + 1].to_usize().ok_or_else(|| {
63            vortex_err!("Failed to convert offset to usize: {}", offsets[idx + 1])
64        })?;
65        builder.append_value(&data[start..stop]);
66    }
67    Ok(builder.finish(dtype))
68}
69
70fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
71    dtype: DType,
72    offsets: &[O],
73    data: &[u8],
74    indices: &[I],
75    data_validity: Mask,
76    indices_validity: Mask,
77) -> VarBinArray {
78    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
79    for (idx, data_idx) in indices.iter().enumerate() {
80        if !indices_validity.value(idx) {
81            builder.append_null();
82            continue;
83        }
84        let data_idx = data_idx
85            .to_usize()
86            .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", data_idx));
87        if data_validity.value(data_idx) {
88            let start = offsets[data_idx].to_usize().unwrap_or_else(|| {
89                vortex_panic!("Failed to convert offset to usize: {}", offsets[data_idx])
90            });
91            let stop = offsets[data_idx + 1].to_usize().unwrap_or_else(|| {
92                vortex_panic!(
93                    "Failed to convert offset to usize: {}",
94                    offsets[data_idx + 1]
95                )
96            });
97            builder.append_value(&data[start..stop]);
98        } else {
99            builder.append_null();
100        }
101    }
102    builder.finish(dtype)
103}