vortex_array/arrays/varbin/compute/
take.rs

1use arrow_buffer::NullBuffer;
2use num_traits::PrimInt;
3use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
4use vortex_error::{VortexResult, vortex_err, vortex_panic};
5use vortex_mask::Mask;
6
7use crate::arrays::VarBinEncoding;
8use crate::arrays::varbin::VarBinArray;
9use crate::arrays::varbin::builder::VarBinBuilder;
10use crate::compute::TakeFn;
11use crate::variants::PrimitiveArrayTrait;
12use crate::{Array, ArrayRef, ToCanonical};
13
14impl TakeFn<&VarBinArray> for VarBinEncoding {
15    fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
16        let offsets = array.offsets().to_primitive()?;
17        let data = array.bytes();
18        let indices = indices.to_primitive()?;
19        match_each_integer_ptype!(offsets.ptype(), |$O| {
20            match_each_integer_ptype!(indices.ptype(), |$I| {
21                Ok(take(
22                    array.dtype().clone(),
23                    offsets.as_slice::<$O>(),
24                    data.as_slice(),
25                    indices.as_slice::<$I>(),
26                    array.validity_mask()?,
27                )?.into_array())
28            })
29        })
30    }
31}
32
33fn take<I: NativePType, O: NativePType + PrimInt>(
34    dtype: DType,
35    offsets: &[O],
36    data: &[u8],
37    indices: &[I],
38    validity_mask: Mask,
39) -> VortexResult<VarBinArray> {
40    if let Some(v) = validity_mask.to_null_buffer() {
41        return Ok(take_nullable(dtype, offsets, data, indices, v));
42    }
43
44    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
45    for &idx in indices {
46        let idx = idx
47            .to_usize()
48            .ok_or_else(|| vortex_err!("Failed to convert index to usize: {}", idx))?;
49        let start = offsets[idx]
50            .to_usize()
51            .ok_or_else(|| vortex_err!("Failed to convert offset to usize: {}", offsets[idx]))?;
52        let stop = offsets[idx + 1].to_usize().ok_or_else(|| {
53            vortex_err!("Failed to convert offset to usize: {}", offsets[idx + 1])
54        })?;
55        builder.append_value(&data[start..stop]);
56    }
57    Ok(builder.finish(dtype))
58}
59
60fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
61    dtype: DType,
62    offsets: &[O],
63    data: &[u8],
64    indices: &[I],
65    null_buffer: NullBuffer,
66) -> VarBinArray {
67    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
68    for &idx in indices {
69        let idx = idx
70            .to_usize()
71            .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", idx));
72        if null_buffer.is_valid(idx) {
73            let start = offsets[idx].to_usize().unwrap_or_else(|| {
74                vortex_panic!("Failed to convert offset to usize: {}", offsets[idx])
75            });
76            let stop = offsets[idx + 1].to_usize().unwrap_or_else(|| {
77                vortex_panic!("Failed to convert offset to usize: {}", offsets[idx + 1])
78            });
79            builder.append_value(&data[start..stop]);
80        } else {
81            builder.append_null();
82        }
83    }
84    builder.finish(dtype)
85}