Skip to main content

vortex_array/arrays/varbin/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6
7use num_traits::AsPrimitive;
8use vortex_buffer::ByteBuffer;
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_error::vortex_ensure;
12use vortex_error::vortex_err;
13
14use crate::ArrayRef;
15use crate::LEGACY_SESSION;
16#[expect(deprecated)]
17use crate::ToCanonical as _;
18use crate::VortexSessionExecute;
19use crate::array::Array;
20use crate::array::ArrayParts;
21use crate::array::TypedArrayRef;
22use crate::array::child_to_validity;
23use crate::array::validity_to_child;
24use crate::arrays::VarBin;
25use crate::arrays::varbin::builder::VarBinBuilder;
26use crate::buffer::BufferHandle;
27use crate::dtype::DType;
28use crate::dtype::IntegerPType;
29use crate::dtype::Nullability;
30use crate::match_each_integer_ptype;
31use crate::validity::Validity;
32
/// The offsets array defining the start/end of each variable-length binary element.
pub(super) const OFFSETS_SLOT: usize = 0;
/// The validity bitmap indicating which elements are non-null.
pub(super) const VALIDITY_SLOT: usize = 1;
/// Total number of child slots: one for offsets and one for validity.
pub(super) const NUM_SLOTS: usize = 2;
/// Human-readable slot names, indexed by slot position
/// (`OFFSETS_SLOT` -> "offsets", `VALIDITY_SLOT` -> "validity").
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["offsets", "validity"];
39
/// Encoding-specific data of a `VarBin` array.
///
/// Only the value bytes are held here. The offsets and validity children are
/// kept in the array's slots (see `VarBinData::make_slots`), not in this struct.
#[derive(Clone, Debug)]
pub struct VarBinData {
    /// Handle to the buffer holding the concatenated bytes of all values.
    pub(super) bytes: BufferHandle,
}
44
45impl Display for VarBinData {
46    fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
47        Ok(())
48    }
49}
50
/// The owned components of a `VarBinArray`, as returned by
/// `Array::<VarBin>::into_data_parts`.
pub struct VarBinDataParts {
    /// Logical type of the array (`DType::Utf8` or `DType::Binary`).
    pub dtype: DType,
    /// Handle to the buffer holding the concatenated value bytes.
    pub bytes: BufferHandle,
    /// Offsets into `bytes` delimiting each element.
    pub offsets: ArrayRef,
    /// Validity of the array's elements.
    pub validity: Validity,
}
57
58impl VarBinData {
59    /// Creates a new `VarBinArray`.
60    ///
61    /// # Panics
62    ///
63    /// Panics if the provided components do not satisfy the invariants documented
64    /// in `VarBinArray::new_unchecked`.
65    pub fn build(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
66        Self::try_build(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
67    }
68
69    /// Creates a new `VarBinArray`.
70    ///
71    /// # Panics
72    ///
73    /// Panics if the provided components do not satisfy the invariants documented
74    /// in `VarBinArray::new_unchecked`.
75    pub fn build_from_handle(
76        offset: ArrayRef,
77        bytes: BufferHandle,
78        dtype: DType,
79        validity: Validity,
80    ) -> Self {
81        Self::try_build_from_handle(offset, bytes, dtype, validity).vortex_expect("VarBinArray new")
82    }
83
84    pub(crate) fn make_slots(
85        offsets: ArrayRef,
86        validity: &Validity,
87        len: usize,
88    ) -> Vec<Option<ArrayRef>> {
89        vec![Some(offsets), validity_to_child(validity, len)]
90    }
91
92    /// Constructs a new `VarBinArray`.
93    ///
94    /// See `VarBinArray::new_unchecked` for more information.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if the provided components do not satisfy the invariants documented in
99    /// `VarBinArray::new_unchecked`.
100    pub fn try_build(
101        offsets: ArrayRef,
102        bytes: ByteBuffer,
103        dtype: DType,
104        validity: Validity,
105    ) -> VortexResult<Self> {
106        let bytes = BufferHandle::new_host(bytes);
107        Self::validate(&offsets, &bytes, &dtype, &validity)?;
108
109        // SAFETY: validate ensures all invariants are met.
110        Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
111    }
112
113    /// Constructs a new `VarBinArray` from a `BufferHandle` of memory that may exist
114    /// on the CPU or GPU.
115    ///
116    /// See `VarBinArray::new_unchecked` for more information.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the provided components do not satisfy the invariants documented in
121    /// `VarBinArray::new_unchecked`.
122    pub fn try_build_from_handle(
123        offsets: ArrayRef,
124        bytes: BufferHandle,
125        dtype: DType,
126        validity: Validity,
127    ) -> VortexResult<Self> {
128        Self::validate(&offsets, &bytes, &dtype, &validity)?;
129
130        // SAFETY: validate ensures all invariants are met.
131        Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
132    }
133
134    /// Creates a new `VarBinArray` without validation from these components:
135    ///
136    /// * `offsets` is an array of byte offsets into the `bytes` buffer.
137    /// * `bytes` is a buffer containing all the variable-length data concatenated.
138    /// * `dtype` specifies whether this contains UTF-8 strings or binary data.
139    /// * `validity` holds the null values.
140    ///
141    /// # Safety
142    ///
143    /// The caller must ensure all of the following invariants are satisfied:
144    ///
145    /// ## Offsets Requirements
146    ///
147    /// - `offsets` must be a non-nullable integer array.
148    /// - `offsets` must contain at least 1 element (for empty array, it contains \[0\]).
149    /// - All values in `offsets` must be monotonically non-decreasing.
150    /// - The first value in `offsets` must be 0.
151    /// - No offset value may exceed `bytes.len()`.
152    ///
153    /// ## Type Requirements
154    ///
155    /// - `dtype` must be exactly [`DType::Binary`] or [`DType::Utf8`].
156    /// - If `dtype` is [`DType::Utf8`], every byte slice `bytes[offsets[i]..offsets[i+1]]` must be valid UTF-8.
157    /// - `dtype.is_nullable()` must match the nullability of `validity`.
158    ///
159    /// ## Validity Requirements
160    ///
161    /// - If `validity` is [`Validity::Array`], its length must exactly equal `offsets.len() - 1`.
162    pub unsafe fn new_unchecked(bytes: ByteBuffer) -> Self {
163        // SAFETY: `new_unchecked_from_handle` has same invariants which should be checked
164        //  by caller.
165        unsafe { Self::new_unchecked_from_handle(BufferHandle::new_host(bytes)) }
166    }
167
168    /// Creates a new `VarBinArray` without validation from its components, with string data
169    /// stored in a `BufferHandle` (CPU or GPU).
170    ///
171    /// # Safety
172    ///
173    /// The caller must ensure all the invariants documented in `new_unchecked` are satisfied.
174    pub unsafe fn new_unchecked_from_handle(bytes: BufferHandle) -> Self {
175        Self { bytes }
176    }
177
178    /// Validates the components that would be used to create a `VarBinArray`.
179    ///
180    /// This function checks all the invariants required by `VarBinArray::new_unchecked`.
181    pub fn validate(
182        offsets: &ArrayRef,
183        bytes: &BufferHandle,
184        dtype: &DType,
185        validity: &Validity,
186    ) -> VortexResult<()> {
187        // Check offsets are non-nullable integer
188        vortex_ensure!(
189            offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
190            MismatchedTypes: "non nullable int", offsets.dtype()
191        );
192
193        // Check dtype is Binary or Utf8
194        vortex_ensure!(
195            matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
196            MismatchedTypes: "utf8 or binary", dtype
197        );
198
199        // Check nullability matches
200        vortex_ensure!(
201            dtype.is_nullable() != matches!(validity, Validity::NonNullable),
202            InvalidArgument: "incorrect validity {:?} for dtype {}",
203            validity,
204            dtype
205        );
206
207        // Check offsets has at least one element
208        vortex_ensure!(
209            !offsets.is_empty(),
210            InvalidArgument: "Offsets must have at least one element"
211        );
212
213        // Skip host-only validation when offsets/bytes are not host-resident.
214        if offsets.is_host() && bytes.is_on_host() {
215            let last_offset = offsets
216                .execute_scalar(
217                    offsets.len() - 1,
218                    &mut LEGACY_SESSION.create_execution_ctx(),
219                )?
220                .as_primitive()
221                .as_::<usize>()
222                .ok_or_else(
223                    || vortex_err!(InvalidArgument: "Last offset must be convertible to usize"),
224                )?;
225            vortex_ensure!(
226                last_offset <= bytes.len(),
227                InvalidArgument: "Last offset {} exceeds bytes length {}",
228                last_offset,
229                bytes.len()
230            );
231        }
232
233        // Check validity length
234        if let Some(validity_len) = validity.maybe_len() {
235            vortex_ensure!(
236                validity_len == offsets.len() - 1,
237                "Validity length {} doesn't match array length {}",
238                validity_len,
239                offsets.len() - 1
240            );
241        }
242
243        // Validate UTF-8 for Utf8 dtype. Skip when offsets/bytes are not host-resident.
244        if offsets.is_host()
245            && bytes.is_on_host()
246            && matches!(dtype, DType::Utf8(_))
247            && let Some(bytes) = bytes.as_host_opt()
248        {
249            #[expect(deprecated)]
250            let primitive_offsets = offsets.to_primitive();
251            match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
252                let offsets_slice = primitive_offsets.as_slice::<O>();
253                for (i, (start, end)) in offsets_slice
254                    .windows(2)
255                    .map(|o| (o[0].as_(), o[1].as_()))
256                    .enumerate()
257                {
258                    if validity.is_null(i)? {
259                        continue;
260                    }
261
262                    let string_bytes = &bytes.as_ref()[start..end];
263                    simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
264                        #[expect(clippy::unwrap_used)]
265                        // run validation using `compat` package to get more detailed error message
266                        let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
267                        vortex_err!("invalid utf-8: {err} at index {i}")
268                    })?;
269                }
270            });
271        }
272
273        Ok(())
274    }
275
276    /// Access the value bytes child buffer
277    ///
278    /// # Note
279    ///
280    /// Bytes child buffer is never sliced when the array is sliced so this can include values
281    /// that are not logically present in the array. Users should prefer `sliced_bytes`
282    /// unless they're resolving values via the offset child array.
283    #[inline]
284    pub fn bytes(&self) -> &ByteBuffer {
285        self.bytes.as_host()
286    }
287
288    /// Access the value bytes buffer handle.
289    #[inline]
290    pub fn bytes_handle(&self) -> &BufferHandle {
291        &self.bytes
292    }
293}
294
295pub trait VarBinArrayExt: TypedArrayRef<VarBin> {
296    fn offsets(&self) -> &ArrayRef {
297        self.as_ref().slots()[OFFSETS_SLOT]
298            .as_ref()
299            .vortex_expect("VarBinArray offsets slot")
300    }
301
302    fn validity_child(&self) -> Option<&ArrayRef> {
303        self.as_ref().slots()[VALIDITY_SLOT].as_ref()
304    }
305
306    fn dtype_parts(&self) -> (bool, Nullability) {
307        match self.as_ref().dtype() {
308            DType::Utf8(nullability) => (true, *nullability),
309            DType::Binary(nullability) => (false, *nullability),
310            _ => unreachable!("VarBinArrayExt requires a utf8 or binary dtype"),
311        }
312    }
313
314    fn is_utf8(&self) -> bool {
315        self.dtype_parts().0
316    }
317
318    fn nullability(&self) -> Nullability {
319        self.dtype_parts().1
320    }
321
322    fn varbin_validity(&self) -> Validity {
323        child_to_validity(
324            self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
325            self.nullability(),
326        )
327    }
328
329    fn offset_at(&self, index: usize) -> usize {
330        assert!(
331            index <= self.as_ref().len(),
332            "Index {index} out of bounds 0..={}",
333            self.as_ref().len()
334        );
335
336        (&self
337            .offsets()
338            .execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
339            .vortex_expect("offsets must support execute_scalar"))
340            .try_into()
341            .vortex_expect("Failed to convert offset to usize")
342    }
343
344    fn bytes_at(&self, index: usize) -> ByteBuffer {
345        let start = self.offset_at(index);
346        let end = self.offset_at(index + 1);
347        self.bytes().slice(start..end)
348    }
349
350    fn sliced_bytes(&self) -> ByteBuffer {
351        let first_offset: usize = self.offset_at(0);
352        let last_offset = self.offset_at(self.as_ref().len());
353        self.bytes().slice(first_offset..last_offset)
354    }
355}
// Blanket impl: every `TypedArrayRef<VarBin>` gets the `VarBinArrayExt` accessors via the
// trait's default methods.
impl<T: TypedArrayRef<VarBin>> VarBinArrayExt for T {}
357
358/// Forwarding constructors for `VarBinArray` (= `Array<VarBin>`).
359impl Array<VarBin> {
360    pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
361        let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
362        if size < u32::MAX as usize {
363            Self::from_vec_sized::<u32, T>(vec, dtype)
364        } else {
365            Self::from_vec_sized::<u64, T>(vec, dtype)
366        }
367    }
368
369    #[expect(
370        clippy::same_name_method,
371        reason = "intentionally named from_iter like Iterator::from_iter"
372    )]
373    pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
374        iter: I,
375        dtype: DType,
376    ) -> Self {
377        let iter = iter.into_iter();
378        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
379        for v in iter {
380            builder.append(v.as_ref().map(|o| o.as_ref()));
381        }
382        builder.finish(dtype)
383    }
384
385    pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
386        iter: I,
387        dtype: DType,
388    ) -> Self {
389        let iter = iter.into_iter();
390        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
391        for v in iter {
392            builder.append_value(v);
393        }
394        builder.finish(dtype)
395    }
396
397    fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
398    where
399        O: IntegerPType,
400        T: AsRef<[u8]>,
401    {
402        let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
403        for v in vec {
404            builder.append_value(v.as_ref());
405        }
406        builder.finish(dtype)
407    }
408
409    /// Create from a vector of string slices.
410    pub fn from_strs(value: Vec<&str>) -> Self {
411        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
412    }
413
414    /// Create from a vector of optional string slices.
415    pub fn from_nullable_strs(value: Vec<Option<&str>>) -> Self {
416        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
417    }
418
419    /// Create from a vector of byte slices.
420    pub fn from_bytes(value: Vec<&[u8]>) -> Self {
421        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
422    }
423
424    /// Create from a vector of optional byte slices.
425    pub fn from_nullable_bytes(value: Vec<Option<&[u8]>>) -> Self {
426        Self::from_iter(value, DType::Binary(Nullability::Nullable))
427    }
428
429    pub fn into_data_parts(self) -> VarBinDataParts {
430        let dtype = self.dtype().clone();
431        let validity = self.varbin_validity();
432        let offsets = self.offsets().clone();
433        let data = self.into_data();
434        VarBinDataParts {
435            dtype,
436            bytes: data.bytes,
437            offsets,
438            validity,
439        }
440    }
441}
442
443impl Array<VarBin> {
444    /// Creates a new `VarBinArray`.
445    pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
446        let len = offsets.len().saturating_sub(1);
447        let slots = VarBinData::make_slots(offsets, &validity, len);
448        let data = VarBinData::build(
449            slots[OFFSETS_SLOT]
450                .as_ref()
451                .vortex_expect("VarBinArray offsets slot")
452                .clone(),
453            bytes,
454            dtype.clone(),
455            validity,
456        );
457        unsafe {
458            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
459        }
460    }
461
462    /// Creates a new `VarBinArray` without validation.
463    ///
464    /// # Safety
465    ///
466    /// See [`VarBinData::new_unchecked`].
467    pub unsafe fn new_unchecked(
468        offsets: ArrayRef,
469        bytes: ByteBuffer,
470        dtype: DType,
471        validity: Validity,
472    ) -> Self {
473        let len = offsets.len().saturating_sub(1);
474        let slots = VarBinData::make_slots(offsets, &validity, len);
475        let data = unsafe { VarBinData::new_unchecked(bytes) };
476        unsafe {
477            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
478        }
479    }
480
481    /// Creates a new `VarBinArray` without validation from a [`BufferHandle`].
482    ///
483    /// # Safety
484    ///
485    /// See [`VarBinData::new_unchecked_from_handle`].
486    pub unsafe fn new_unchecked_from_handle(
487        offsets: ArrayRef,
488        bytes: BufferHandle,
489        dtype: DType,
490        validity: Validity,
491    ) -> Self {
492        let len = offsets.len().saturating_sub(1);
493        let slots = VarBinData::make_slots(offsets, &validity, len);
494        let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
495        unsafe {
496            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
497        }
498    }
499
500    /// Constructs a new `VarBinArray`.
501    pub fn try_new(
502        offsets: ArrayRef,
503        bytes: ByteBuffer,
504        dtype: DType,
505        validity: Validity,
506    ) -> VortexResult<Self> {
507        let len = offsets.len() - 1;
508        let bytes = BufferHandle::new_host(bytes);
509        VarBinData::validate(&offsets, &bytes, &dtype, &validity)?;
510        let slots = VarBinData::make_slots(offsets, &validity, len);
511        // SAFETY: validate ensures all invariants are met.
512        let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
513        Ok(unsafe {
514            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
515        })
516    }
517}
518
519impl From<Vec<&[u8]>> for Array<VarBin> {
520    fn from(value: Vec<&[u8]>) -> Self {
521        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
522    }
523}
524
525impl From<Vec<Vec<u8>>> for Array<VarBin> {
526    fn from(value: Vec<Vec<u8>>) -> Self {
527        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
528    }
529}
530
531impl From<Vec<String>> for Array<VarBin> {
532    fn from(value: Vec<String>) -> Self {
533        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
534    }
535}
536
537impl From<Vec<&str>> for Array<VarBin> {
538    fn from(value: Vec<&str>) -> Self {
539        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
540    }
541}
542
543impl From<Vec<Option<&[u8]>>> for Array<VarBin> {
544    fn from(value: Vec<Option<&[u8]>>) -> Self {
545        Self::from_iter(value, DType::Binary(Nullability::Nullable))
546    }
547}
548
549impl From<Vec<Option<Vec<u8>>>> for Array<VarBin> {
550    fn from(value: Vec<Option<Vec<u8>>>) -> Self {
551        Self::from_iter(value, DType::Binary(Nullability::Nullable))
552    }
553}
554
555impl From<Vec<Option<String>>> for Array<VarBin> {
556    fn from(value: Vec<Option<String>>) -> Self {
557        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
558    }
559}
560
561impl From<Vec<Option<&str>>> for Array<VarBin> {
562    fn from(value: Vec<Option<&str>>) -> Self {
563        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
564    }
565}
566
567impl<'a> FromIterator<Option<&'a [u8]>> for Array<VarBin> {
568    fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
569        Self::from_iter(iter, DType::Binary(Nullability::Nullable))
570    }
571}
572
573impl FromIterator<Option<Vec<u8>>> for Array<VarBin> {
574    fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
575        Self::from_iter(iter, DType::Binary(Nullability::Nullable))
576    }
577}
578
579impl FromIterator<Option<String>> for Array<VarBin> {
580    fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
581        Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
582    }
583}
584
585impl<'a> FromIterator<Option<&'a str>> for Array<VarBin> {
586    fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
587        Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
588    }
589}