vortex_array/arrays/varbin/compute/
take.rs1use arrow_buffer::NullBuffer;
2use num_traits::PrimInt;
3use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
4use vortex_error::{VortexResult, vortex_err, vortex_panic};
5use vortex_mask::Mask;
6
7use crate::arrays::VarBinEncoding;
8use crate::arrays::varbin::VarBinArray;
9use crate::arrays::varbin::builder::VarBinBuilder;
10use crate::compute::TakeFn;
11use crate::variants::PrimitiveArrayTrait;
12use crate::{Array, ArrayRef, ToCanonical};
13
14impl TakeFn<&VarBinArray> for VarBinEncoding {
15 fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
16 let offsets = array.offsets().to_primitive()?;
17 let data = array.bytes();
18 let indices = indices.to_primitive()?;
19 match_each_integer_ptype!(offsets.ptype(), |$O| {
20 match_each_integer_ptype!(indices.ptype(), |$I| {
21 Ok(take(
22 array.dtype().clone(),
23 offsets.as_slice::<$O>(),
24 data.as_slice(),
25 indices.as_slice::<$I>(),
26 array.validity_mask()?,
27 )?.into_array())
28 })
29 })
30 }
31}
32
33fn take<I: NativePType, O: NativePType + PrimInt>(
34 dtype: DType,
35 offsets: &[O],
36 data: &[u8],
37 indices: &[I],
38 validity_mask: Mask,
39) -> VortexResult<VarBinArray> {
40 if let Some(v) = validity_mask.to_null_buffer() {
41 return Ok(take_nullable(dtype, offsets, data, indices, v));
42 }
43
44 let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
45 for &idx in indices {
46 let idx = idx
47 .to_usize()
48 .ok_or_else(|| vortex_err!("Failed to convert index to usize: {}", idx))?;
49 let start = offsets[idx]
50 .to_usize()
51 .ok_or_else(|| vortex_err!("Failed to convert offset to usize: {}", offsets[idx]))?;
52 let stop = offsets[idx + 1].to_usize().ok_or_else(|| {
53 vortex_err!("Failed to convert offset to usize: {}", offsets[idx + 1])
54 })?;
55 builder.append_value(&data[start..stop]);
56 }
57 Ok(builder.finish(dtype))
58}
59
60fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
61 dtype: DType,
62 offsets: &[O],
63 data: &[u8],
64 indices: &[I],
65 null_buffer: NullBuffer,
66) -> VarBinArray {
67 let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
68 for &idx in indices {
69 let idx = idx
70 .to_usize()
71 .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", idx));
72 if null_buffer.is_valid(idx) {
73 let start = offsets[idx].to_usize().unwrap_or_else(|| {
74 vortex_panic!("Failed to convert offset to usize: {}", offsets[idx])
75 });
76 let stop = offsets[idx + 1].to_usize().unwrap_or_else(|| {
77 vortex_panic!("Failed to convert offset to usize: {}", offsets[idx + 1])
78 });
79 builder.append_value(&data[start..stop]);
80 } else {
81 builder.append_null();
82 }
83 }
84 builder.finish(dtype)
85}