vortex_array/arrays/varbin/compute/
take.rs

1use num_traits::PrimInt;
2use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
3use vortex_error::{VortexResult, vortex_err, vortex_panic};
4use vortex_mask::Mask;
5
6use crate::arrays::VarBinEncoding;
7use crate::arrays::varbin::VarBinArray;
8use crate::arrays::varbin::builder::VarBinBuilder;
9use crate::compute::TakeFn;
10use crate::variants::PrimitiveArrayTrait;
11use crate::{Array, ArrayRef, ToCanonical};
12
13impl TakeFn<&VarBinArray> for VarBinEncoding {
14    fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
15        let offsets = array.offsets().to_primitive()?;
16        let data = array.bytes();
17        let indices = indices.to_primitive()?;
18        match_each_integer_ptype!(offsets.ptype(), |$O| {
19            match_each_integer_ptype!(indices.ptype(), |$I| {
20                Ok(take(
21                    array.dtype().clone(),
22                    offsets.as_slice::<$O>(),
23                    data.as_slice(),
24                    indices.as_slice::<$I>(),
25                    array.validity_mask()?,
26                    indices.validity_mask()?,
27                )?.into_array())
28            })
29        })
30    }
31}
32
33fn take<I: NativePType, O: NativePType + PrimInt>(
34    dtype: DType,
35    offsets: &[O],
36    data: &[u8],
37    indices: &[I],
38    validity_mask: Mask,
39    indices_validity_mask: Mask,
40) -> VortexResult<VarBinArray> {
41    if !validity_mask.all_true() || !indices_validity_mask.all_true() {
42        return Ok(take_nullable(
43            dtype,
44            offsets,
45            data,
46            indices,
47            validity_mask,
48            indices_validity_mask,
49        ));
50    }
51
52    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
53    for &idx in indices {
54        let idx = idx
55            .to_usize()
56            .ok_or_else(|| vortex_err!("Failed to convert index to usize: {}", idx))?;
57        let start = offsets[idx]
58            .to_usize()
59            .ok_or_else(|| vortex_err!("Failed to convert offset to usize: {}", offsets[idx]))?;
60        let stop = offsets[idx + 1].to_usize().ok_or_else(|| {
61            vortex_err!("Failed to convert offset to usize: {}", offsets[idx + 1])
62        })?;
63        builder.append_value(&data[start..stop]);
64    }
65    Ok(builder.finish(dtype))
66}
67
68fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
69    dtype: DType,
70    offsets: &[O],
71    data: &[u8],
72    indices: &[I],
73    data_validity: Mask,
74    indices_validity: Mask,
75) -> VarBinArray {
76    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
77    for (idx, data_idx) in indices.iter().enumerate() {
78        if !indices_validity.value(idx) {
79            builder.append_null();
80            continue;
81        }
82        let data_idx = data_idx
83            .to_usize()
84            .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", data_idx));
85        if data_validity.value(data_idx) {
86            let start = offsets[data_idx].to_usize().unwrap_or_else(|| {
87                vortex_panic!("Failed to convert offset to usize: {}", offsets[data_idx])
88            });
89            let stop = offsets[data_idx + 1].to_usize().unwrap_or_else(|| {
90                vortex_panic!(
91                    "Failed to convert offset to usize: {}",
92                    offsets[data_idx + 1]
93                )
94            });
95            builder.append_value(&data[start..stop]);
96        } else {
97            builder.append_null();
98        }
99    }
100    builder.finish(dtype)
101}