vortex_array/arrays/varbin/compute/
take.rs1use arrow_buffer::NullBuffer;
2use num_traits::PrimInt;
3use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
4use vortex_error::{VortexResult, vortex_err, vortex_panic};
5
6use crate::arrays::VarBinEncoding;
7use crate::arrays::varbin::VarBinArray;
8use crate::arrays::varbin::builder::VarBinBuilder;
9use crate::compute::TakeFn;
10use crate::validity::Validity;
11use crate::variants::PrimitiveArrayTrait;
12use crate::{Array, ArrayRef, ToCanonical};
13
14impl TakeFn<&VarBinArray> for VarBinEncoding {
15 fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
16 let offsets = array.offsets().to_primitive()?;
17 let data = array.bytes();
18 let indices = indices.to_primitive()?;
19 match_each_integer_ptype!(offsets.ptype(), |$O| {
20 match_each_integer_ptype!(indices.ptype(), |$I| {
21 Ok(take(
22 array.dtype().clone(),
23 offsets.as_slice::<$O>(),
24 data.as_slice(),
25 indices.as_slice::<$I>(),
26 array.validity().clone(),
27 )?.into_array())
28 })
29 })
30 }
31}
32
33fn take<I: NativePType, O: NativePType + PrimInt>(
34 dtype: DType,
35 offsets: &[O],
36 data: &[u8],
37 indices: &[I],
38 validity: Validity,
39) -> VortexResult<VarBinArray> {
40 let validity_mask = validity.to_logical(offsets.len() - 1)?;
41 if let Some(v) = validity_mask.to_null_buffer() {
42 return Ok(take_nullable(dtype, offsets, data, indices, v));
43 }
44
45 let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
46 for &idx in indices {
47 let idx = idx
48 .to_usize()
49 .ok_or_else(|| vortex_err!("Failed to convert index to usize: {}", idx))?;
50 let start = offsets[idx]
51 .to_usize()
52 .ok_or_else(|| vortex_err!("Failed to convert offset to usize: {}", offsets[idx]))?;
53 let stop = offsets[idx + 1].to_usize().ok_or_else(|| {
54 vortex_err!("Failed to convert offset to usize: {}", offsets[idx + 1])
55 })?;
56 builder.append_value(&data[start..stop]);
57 }
58 Ok(builder.finish(dtype))
59}
60
61fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
62 dtype: DType,
63 offsets: &[O],
64 data: &[u8],
65 indices: &[I],
66 null_buffer: NullBuffer,
67) -> VarBinArray {
68 let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
69 for &idx in indices {
70 let idx = idx
71 .to_usize()
72 .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", idx));
73 if null_buffer.is_valid(idx) {
74 let start = offsets[idx].to_usize().unwrap_or_else(|| {
75 vortex_panic!("Failed to convert offset to usize: {}", offsets[idx])
76 });
77 let stop = offsets[idx + 1].to_usize().unwrap_or_else(|| {
78 vortex_panic!("Failed to convert offset to usize: {}", offsets[idx + 1])
79 });
80 builder.append_value(&data[start..stop]);
81 } else {
82 builder.append_null();
83 }
84 }
85 builder.finish(dtype)
86}