vortex_array/arrays/varbin/compute/
take.rs1use num_traits::PrimInt;
2use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
3use vortex_error::{VortexResult, vortex_err, vortex_panic};
4use vortex_mask::Mask;
5
6use crate::arrays::VarBinVTable;
7use crate::arrays::varbin::VarBinArray;
8use crate::arrays::varbin::builder::VarBinBuilder;
9use crate::compute::{TakeKernel, TakeKernelAdapter};
10use crate::{Array, ArrayRef, IntoArray, ToCanonical, register_kernel};
11
12impl TakeKernel for VarBinVTable {
13 fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
14 let offsets = array.offsets().to_primitive()?;
15 let data = array.bytes();
16 let indices = indices.to_primitive()?;
17 match_each_integer_ptype!(offsets.ptype(), |O| {
18 match_each_integer_ptype!(indices.ptype(), |I| {
19 Ok(take(
20 array.dtype().clone(),
21 offsets.as_slice::<O>(),
22 data.as_slice(),
23 indices.as_slice::<I>(),
24 array.validity_mask()?,
25 indices.validity_mask()?,
26 )?
27 .into_array())
28 })
29 })
30 }
31}
32
33register_kernel!(TakeKernelAdapter(VarBinVTable).lift());
34
35fn take<I: NativePType, O: NativePType + PrimInt>(
36 dtype: DType,
37 offsets: &[O],
38 data: &[u8],
39 indices: &[I],
40 validity_mask: Mask,
41 indices_validity_mask: Mask,
42) -> VortexResult<VarBinArray> {
43 if !validity_mask.all_true() || !indices_validity_mask.all_true() {
44 return Ok(take_nullable(
45 dtype,
46 offsets,
47 data,
48 indices,
49 validity_mask,
50 indices_validity_mask,
51 ));
52 }
53
54 let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
55 for &idx in indices {
56 let idx = idx
57 .to_usize()
58 .ok_or_else(|| vortex_err!("Failed to convert index to usize: {}", idx))?;
59 let start = offsets[idx]
60 .to_usize()
61 .ok_or_else(|| vortex_err!("Failed to convert offset to usize: {}", offsets[idx]))?;
62 let stop = offsets[idx + 1].to_usize().ok_or_else(|| {
63 vortex_err!("Failed to convert offset to usize: {}", offsets[idx + 1])
64 })?;
65 builder.append_value(&data[start..stop]);
66 }
67 Ok(builder.finish(dtype))
68}
69
70fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
71 dtype: DType,
72 offsets: &[O],
73 data: &[u8],
74 indices: &[I],
75 data_validity: Mask,
76 indices_validity: Mask,
77) -> VarBinArray {
78 let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
79 for (idx, data_idx) in indices.iter().enumerate() {
80 if !indices_validity.value(idx) {
81 builder.append_null();
82 continue;
83 }
84 let data_idx = data_idx
85 .to_usize()
86 .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", data_idx));
87 if data_validity.value(data_idx) {
88 let start = offsets[data_idx].to_usize().unwrap_or_else(|| {
89 vortex_panic!("Failed to convert offset to usize: {}", offsets[data_idx])
90 });
91 let stop = offsets[data_idx + 1].to_usize().unwrap_or_else(|| {
92 vortex_panic!(
93 "Failed to convert offset to usize: {}",
94 offsets[data_idx + 1]
95 )
96 });
97 builder.append_value(&data[start..stop]);
98 } else {
99 builder.append_null();
100 }
101 }
102 builder.finish(dtype)
103}