vortex_array/arrays/varbin/compute/
take.rs1use num_traits::PrimInt;
2use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
3use vortex_error::{VortexResult, vortex_err, vortex_panic};
4use vortex_mask::Mask;
5
6use crate::arrays::VarBinVTable;
7use crate::arrays::varbin::VarBinArray;
8use crate::arrays::varbin::builder::VarBinBuilder;
9use crate::compute::{TakeKernel, TakeKernelAdapter};
10use crate::{Array, ArrayRef, IntoArray, ToCanonical, register_kernel};
11
12impl TakeKernel for VarBinVTable {
13 fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
14 let offsets = array.offsets().to_primitive()?;
15 let data = array.bytes();
16 let indices = indices.to_primitive()?;
17 match_each_integer_ptype!(offsets.ptype(), |$O| {
18 match_each_integer_ptype!(indices.ptype(), |$I| {
19 Ok(take(
20 array.dtype().clone(),
21 offsets.as_slice::<$O>(),
22 data.as_slice(),
23 indices.as_slice::<$I>(),
24 array.validity_mask()?,
25 indices.validity_mask()?,
26 )?.into_array())
27 })
28 })
29 }
30}
31
32register_kernel!(TakeKernelAdapter(VarBinVTable).lift());
33
34fn take<I: NativePType, O: NativePType + PrimInt>(
35 dtype: DType,
36 offsets: &[O],
37 data: &[u8],
38 indices: &[I],
39 validity_mask: Mask,
40 indices_validity_mask: Mask,
41) -> VortexResult<VarBinArray> {
42 if !validity_mask.all_true() || !indices_validity_mask.all_true() {
43 return Ok(take_nullable(
44 dtype,
45 offsets,
46 data,
47 indices,
48 validity_mask,
49 indices_validity_mask,
50 ));
51 }
52
53 let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
54 for &idx in indices {
55 let idx = idx
56 .to_usize()
57 .ok_or_else(|| vortex_err!("Failed to convert index to usize: {}", idx))?;
58 let start = offsets[idx]
59 .to_usize()
60 .ok_or_else(|| vortex_err!("Failed to convert offset to usize: {}", offsets[idx]))?;
61 let stop = offsets[idx + 1].to_usize().ok_or_else(|| {
62 vortex_err!("Failed to convert offset to usize: {}", offsets[idx + 1])
63 })?;
64 builder.append_value(&data[start..stop]);
65 }
66 Ok(builder.finish(dtype))
67}
68
69fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
70 dtype: DType,
71 offsets: &[O],
72 data: &[u8],
73 indices: &[I],
74 data_validity: Mask,
75 indices_validity: Mask,
76) -> VarBinArray {
77 let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
78 for (idx, data_idx) in indices.iter().enumerate() {
79 if !indices_validity.value(idx) {
80 builder.append_null();
81 continue;
82 }
83 let data_idx = data_idx
84 .to_usize()
85 .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", data_idx));
86 if data_validity.value(data_idx) {
87 let start = offsets[data_idx].to_usize().unwrap_or_else(|| {
88 vortex_panic!("Failed to convert offset to usize: {}", offsets[data_idx])
89 });
90 let stop = offsets[data_idx + 1].to_usize().unwrap_or_else(|| {
91 vortex_panic!(
92 "Failed to convert offset to usize: {}",
93 offsets[data_idx + 1]
94 )
95 });
96 builder.append_value(&data[start..stop]);
97 } else {
98 builder.append_null();
99 }
100 }
101 builder.finish(dtype)
102}