vortex_array/arrays/varbin/compute/
take.rs1use num_traits::PrimInt;
2use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
3use vortex_error::{VortexResult, vortex_err, vortex_panic};
4use vortex_mask::Mask;
5
6use crate::arrays::VarBinEncoding;
7use crate::arrays::varbin::VarBinArray;
8use crate::arrays::varbin::builder::VarBinBuilder;
9use crate::compute::TakeFn;
10use crate::variants::PrimitiveArrayTrait;
11use crate::{Array, ArrayRef, ToCanonical};
12
13impl TakeFn<&VarBinArray> for VarBinEncoding {
14 fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
15 let offsets = array.offsets().to_primitive()?;
16 let data = array.bytes();
17 let indices = indices.to_primitive()?;
18 match_each_integer_ptype!(offsets.ptype(), |$O| {
19 match_each_integer_ptype!(indices.ptype(), |$I| {
20 Ok(take(
21 array.dtype().clone(),
22 offsets.as_slice::<$O>(),
23 data.as_slice(),
24 indices.as_slice::<$I>(),
25 array.validity_mask()?,
26 indices.validity_mask()?,
27 )?.into_array())
28 })
29 })
30 }
31}
32
33fn take<I: NativePType, O: NativePType + PrimInt>(
34 dtype: DType,
35 offsets: &[O],
36 data: &[u8],
37 indices: &[I],
38 validity_mask: Mask,
39 indices_validity_mask: Mask,
40) -> VortexResult<VarBinArray> {
41 if !validity_mask.all_true() || !indices_validity_mask.all_true() {
42 return Ok(take_nullable(
43 dtype,
44 offsets,
45 data,
46 indices,
47 validity_mask,
48 indices_validity_mask,
49 ));
50 }
51
52 let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
53 for &idx in indices {
54 let idx = idx
55 .to_usize()
56 .ok_or_else(|| vortex_err!("Failed to convert index to usize: {}", idx))?;
57 let start = offsets[idx]
58 .to_usize()
59 .ok_or_else(|| vortex_err!("Failed to convert offset to usize: {}", offsets[idx]))?;
60 let stop = offsets[idx + 1].to_usize().ok_or_else(|| {
61 vortex_err!("Failed to convert offset to usize: {}", offsets[idx + 1])
62 })?;
63 builder.append_value(&data[start..stop]);
64 }
65 Ok(builder.finish(dtype))
66}
67
68fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
69 dtype: DType,
70 offsets: &[O],
71 data: &[u8],
72 indices: &[I],
73 data_validity: Mask,
74 indices_validity: Mask,
75) -> VarBinArray {
76 let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
77 for (idx, data_idx) in indices.iter().enumerate() {
78 if !indices_validity.value(idx) {
79 builder.append_null();
80 continue;
81 }
82 let data_idx = data_idx
83 .to_usize()
84 .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", data_idx));
85 if data_validity.value(data_idx) {
86 let start = offsets[data_idx].to_usize().unwrap_or_else(|| {
87 vortex_panic!("Failed to convert offset to usize: {}", offsets[data_idx])
88 });
89 let stop = offsets[data_idx + 1].to_usize().unwrap_or_else(|| {
90 vortex_panic!(
91 "Failed to convert offset to usize: {}",
92 offsets[data_idx + 1]
93 )
94 });
95 builder.append_value(&data[start..stop]);
96 } else {
97 builder.append_null();
98 }
99 }
100 builder.finish(dtype)
101}