Skip to main content

vortex_array/arrays/varbin/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6
7use num_traits::AsPrimitive;
8use smallvec::smallvec;
9use vortex_buffer::ByteBuffer;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_ensure;
13use vortex_error::vortex_err;
14
15use crate::ArrayRef;
16use crate::ArraySlots;
17use crate::LEGACY_SESSION;
18#[expect(deprecated)]
19use crate::ToCanonical as _;
20use crate::VortexSessionExecute;
21use crate::array::Array;
22use crate::array::ArrayParts;
23use crate::array::TypedArrayRef;
24use crate::array::child_to_validity;
25use crate::array::validity_to_child;
26use crate::arrays::VarBin;
27use crate::arrays::varbin::builder::VarBinBuilder;
28use crate::buffer::BufferHandle;
29use crate::dtype::DType;
30use crate::dtype::IntegerPType;
31use crate::dtype::Nullability;
32use crate::match_each_integer_ptype;
33use crate::validity::Validity;
34
/// The offsets array defining the start/end of each variable-length binary element.
pub(super) const OFFSETS_SLOT: usize = 0;
/// The validity bitmap indicating which elements are non-null.
pub(super) const VALIDITY_SLOT: usize = 1;
/// Total number of child slots; also the length of the `SLOT_NAMES` table.
pub(super) const NUM_SLOTS: usize = 2;
/// Names of the child slots, indexed by `OFFSETS_SLOT` and `VALIDITY_SLOT`.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["offsets", "validity"];
41
/// Array-specific data of a `VarBin` array: the buffer holding the value bytes.
///
/// The offsets and validity children are not stored in this struct; they are placed in the
/// array's slots (see `OFFSETS_SLOT` and `VALIDITY_SLOT`).
#[derive(Clone, Debug)]
pub struct VarBinData {
    /// Handle to the buffer that contains the value bytes of all elements.
    pub(super) bytes: BufferHandle,
}
46
/// `Display` implementation for `VarBinData` that emits no text.
impl Display for VarBinData {
    /// Writes nothing to the formatter and always returns `Ok(())`.
    fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }
}
52
/// The owned components of a `VarBinArray`, as produced by `Array::<VarBin>::into_data_parts`.
pub struct VarBinDataParts {
    /// Logical type of the array.
    pub dtype: DType,
    /// Handle to the buffer holding the value bytes.
    pub bytes: BufferHandle,
    /// Offsets child array.
    pub offsets: ArrayRef,
    /// Validity of the array's elements.
    pub validity: Validity,
}
59
60impl VarBinData {
61    /// Creates a new `VarBinArray`.
62    ///
63    /// # Panics
64    ///
65    /// Panics if the provided components do not satisfy the invariants documented
66    /// in `VarBinArray::new_unchecked`.
67    pub fn build(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
68        Self::try_build(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
69    }
70
71    /// Creates a new `VarBinArray`.
72    ///
73    /// # Panics
74    ///
75    /// Panics if the provided components do not satisfy the invariants documented
76    /// in `VarBinArray::new_unchecked`.
77    pub fn build_from_handle(
78        offset: ArrayRef,
79        bytes: BufferHandle,
80        dtype: DType,
81        validity: Validity,
82    ) -> Self {
83        Self::try_build_from_handle(offset, bytes, dtype, validity).vortex_expect("VarBinArray new")
84    }
85
86    pub(crate) fn make_slots(offsets: ArrayRef, validity: &Validity, len: usize) -> ArraySlots {
87        smallvec![Some(offsets), validity_to_child(validity, len)]
88    }
89
90    /// Constructs a new `VarBinArray`.
91    ///
92    /// See `VarBinArray::new_unchecked` for more information.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the provided components do not satisfy the invariants documented in
97    /// `VarBinArray::new_unchecked`.
98    pub fn try_build(
99        offsets: ArrayRef,
100        bytes: ByteBuffer,
101        dtype: DType,
102        validity: Validity,
103    ) -> VortexResult<Self> {
104        let bytes = BufferHandle::new_host(bytes);
105        Self::validate(&offsets, &bytes, &dtype, &validity)?;
106
107        // SAFETY: validate ensures all invariants are met.
108        Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
109    }
110
111    /// Constructs a new `VarBinArray` from a `BufferHandle` of memory that may exist
112    /// on the CPU or GPU.
113    ///
114    /// See `VarBinArray::new_unchecked` for more information.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the provided components do not satisfy the invariants documented in
119    /// `VarBinArray::new_unchecked`.
120    pub fn try_build_from_handle(
121        offsets: ArrayRef,
122        bytes: BufferHandle,
123        dtype: DType,
124        validity: Validity,
125    ) -> VortexResult<Self> {
126        Self::validate(&offsets, &bytes, &dtype, &validity)?;
127
128        // SAFETY: validate ensures all invariants are met.
129        Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
130    }
131
132    /// Creates a new `VarBinArray` without validation from these components:
133    ///
134    /// * `offsets` is an array of byte offsets into the `bytes` buffer.
135    /// * `bytes` is a buffer containing all the variable-length data concatenated.
136    /// * `dtype` specifies whether this contains UTF-8 strings or binary data.
137    /// * `validity` holds the null values.
138    ///
139    /// # Safety
140    ///
141    /// The caller must ensure all of the following invariants are satisfied:
142    ///
143    /// ## Offsets Requirements
144    ///
145    /// - `offsets` must be a non-nullable integer array.
146    /// - `offsets` must contain at least 1 element (for empty array, it contains \[0\]).
147    /// - All values in `offsets` must be monotonically non-decreasing.
148    /// - The first value in `offsets` must be 0.
149    /// - No offset value may exceed `bytes.len()`.
150    ///
151    /// ## Type Requirements
152    ///
153    /// - `dtype` must be exactly [`DType::Binary`] or [`DType::Utf8`].
154    /// - If `dtype` is [`DType::Utf8`], every byte slice `bytes[offsets[i]..offsets[i+1]]` must be valid UTF-8.
155    /// - `dtype.is_nullable()` must match the nullability of `validity`.
156    ///
157    /// ## Validity Requirements
158    ///
159    /// - If `validity` is [`Validity::Array`], its length must exactly equal `offsets.len() - 1`.
160    pub unsafe fn new_unchecked(bytes: ByteBuffer) -> Self {
161        // SAFETY: `new_unchecked_from_handle` has same invariants which should be checked
162        //  by caller.
163        unsafe { Self::new_unchecked_from_handle(BufferHandle::new_host(bytes)) }
164    }
165
    /// Creates a new `VarBinArray` without validation from its components, with string data
    /// stored in a `BufferHandle` (CPU or GPU).
    ///
    /// The handle is stored as-is; nothing is inspected or copied by this function.
    ///
    /// # Safety
    ///
    /// The caller must ensure all the invariants documented in `new_unchecked` are satisfied.
    pub unsafe fn new_unchecked_from_handle(bytes: BufferHandle) -> Self {
        Self { bytes }
    }
175
176    /// Validates the components that would be used to create a `VarBinArray`.
177    ///
178    /// This function checks all the invariants required by `VarBinArray::new_unchecked`.
179    pub fn validate(
180        offsets: &ArrayRef,
181        bytes: &BufferHandle,
182        dtype: &DType,
183        validity: &Validity,
184    ) -> VortexResult<()> {
185        // Check offsets are non-nullable integer
186        vortex_ensure!(
187            offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
188            MismatchedTypes: "non nullable int", offsets.dtype()
189        );
190
191        // Check dtype is Binary or Utf8
192        vortex_ensure!(
193            matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
194            MismatchedTypes: "utf8 or binary", dtype
195        );
196
197        // Check nullability matches
198        vortex_ensure!(
199            dtype.is_nullable() != matches!(validity, Validity::NonNullable),
200            InvalidArgument: "incorrect validity {:?} for dtype {}",
201            validity,
202            dtype
203        );
204
205        // Check offsets has at least one element
206        vortex_ensure!(
207            !offsets.is_empty(),
208            InvalidArgument: "Offsets must have at least one element"
209        );
210
211        // Skip host-only validation when offsets/bytes are not host-resident.
212        if offsets.is_host() && bytes.is_on_host() {
213            let last_offset = offsets
214                .execute_scalar(
215                    offsets.len() - 1,
216                    &mut LEGACY_SESSION.create_execution_ctx(),
217                )?
218                .as_primitive()
219                .as_::<usize>()
220                .ok_or_else(
221                    || vortex_err!(InvalidArgument: "Last offset must be convertible to usize"),
222                )?;
223            vortex_ensure!(
224                last_offset <= bytes.len(),
225                InvalidArgument: "Last offset {} exceeds bytes length {}",
226                last_offset,
227                bytes.len()
228            );
229        }
230
231        // Check validity length
232        if let Some(validity_len) = validity.maybe_len() {
233            vortex_ensure!(
234                validity_len == offsets.len() - 1,
235                "Validity length {} doesn't match array length {}",
236                validity_len,
237                offsets.len() - 1
238            );
239        }
240
241        // Validate UTF-8 for Utf8 dtype. Skip when offsets/bytes are not host-resident.
242        if offsets.is_host()
243            && bytes.is_on_host()
244            && matches!(dtype, DType::Utf8(_))
245            && let Some(bytes) = bytes.as_host_opt()
246        {
247            #[expect(deprecated)]
248            let primitive_offsets = offsets.to_primitive();
249            match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
250                let offsets_slice = primitive_offsets.as_slice::<O>();
251                for (i, (start, end)) in offsets_slice
252                    .windows(2)
253                    .map(|o| (o[0].as_(), o[1].as_()))
254                    .enumerate()
255                {
256                    if validity.is_null(i)? {
257                        continue;
258                    }
259
260                    let string_bytes = &bytes.as_ref()[start..end];
261                    simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
262                        #[expect(clippy::unwrap_used)]
263                        // run validation using `compat` package to get more detailed error message
264                        let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
265                        vortex_err!("invalid utf-8: {err} at index {i}")
266                    })?;
267                }
268            });
269        }
270
271        Ok(())
272    }
273
    /// Access the value bytes child buffer
    ///
    /// The buffer is obtained via `BufferHandle::as_host`.
    /// NOTE(review): presumably this panics when the bytes are not host-resident — confirm
    /// against `BufferHandle::as_host`; use [`Self::bytes_handle`] when residency is unknown.
    ///
    /// # Note
    ///
    /// Bytes child buffer is never sliced when the array is sliced so this can include values
    /// that are not logically present in the array. Users should prefer `sliced_bytes`
    /// unless they're resolving values via the offset child array.
    #[inline]
    pub fn bytes(&self) -> &ByteBuffer {
        self.bytes.as_host()
    }
285
    /// Access the value bytes buffer handle.
    ///
    /// Returns a reference to the stored handle without converting it to a host buffer.
    #[inline]
    pub fn bytes_handle(&self) -> &BufferHandle {
        &self.bytes
    }
291}
292
293pub trait VarBinArrayExt: TypedArrayRef<VarBin> {
294    fn offsets(&self) -> &ArrayRef {
295        self.as_ref().slots()[OFFSETS_SLOT]
296            .as_ref()
297            .vortex_expect("VarBinArray offsets slot")
298    }
299
300    fn validity_child(&self) -> Option<&ArrayRef> {
301        self.as_ref().slots()[VALIDITY_SLOT].as_ref()
302    }
303
304    fn dtype_parts(&self) -> (bool, Nullability) {
305        match self.as_ref().dtype() {
306            DType::Utf8(nullability) => (true, *nullability),
307            DType::Binary(nullability) => (false, *nullability),
308            _ => unreachable!("VarBinArrayExt requires a utf8 or binary dtype"),
309        }
310    }
311
312    fn is_utf8(&self) -> bool {
313        self.dtype_parts().0
314    }
315
316    fn nullability(&self) -> Nullability {
317        self.dtype_parts().1
318    }
319
320    fn varbin_validity(&self) -> Validity {
321        child_to_validity(
322            self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
323            self.nullability(),
324        )
325    }
326
327    fn offset_at(&self, index: usize) -> usize {
328        assert!(
329            index <= self.as_ref().len(),
330            "Index {index} out of bounds 0..={}",
331            self.as_ref().len()
332        );
333
334        (&self
335            .offsets()
336            .execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
337            .vortex_expect("offsets must support execute_scalar"))
338            .try_into()
339            .vortex_expect("Failed to convert offset to usize")
340    }
341
342    fn bytes_at(&self, index: usize) -> ByteBuffer {
343        let start = self.offset_at(index);
344        let end = self.offset_at(index + 1);
345        self.bytes().slice(start..end)
346    }
347
348    fn sliced_bytes(&self) -> ByteBuffer {
349        let first_offset: usize = self.offset_at(0);
350        let last_offset = self.offset_at(self.as_ref().len());
351        self.bytes().slice(first_offset..last_offset)
352    }
353}
// Blanket implementation: every `TypedArrayRef<VarBin>` gets the `VarBinArrayExt` methods.
impl<T: TypedArrayRef<VarBin>> VarBinArrayExt for T {}
355
356/// Forwarding constructors for `VarBinArray` (= `Array<VarBin>`).
357impl Array<VarBin> {
358    pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
359        let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
360        if size < u32::MAX as usize {
361            Self::from_vec_sized::<u32, T>(vec, dtype)
362        } else {
363            Self::from_vec_sized::<u64, T>(vec, dtype)
364        }
365    }
366
367    #[expect(
368        clippy::same_name_method,
369        reason = "intentionally named from_iter like Iterator::from_iter"
370    )]
371    pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
372        iter: I,
373        dtype: DType,
374    ) -> Self {
375        let iter = iter.into_iter();
376        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
377        for v in iter {
378            builder.append(v.as_ref().map(|o| o.as_ref()));
379        }
380        builder.finish(dtype)
381    }
382
383    pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
384        iter: I,
385        dtype: DType,
386    ) -> Self {
387        let iter = iter.into_iter();
388        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
389        for v in iter {
390            builder.append_value(v);
391        }
392        builder.finish(dtype)
393    }
394
395    fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
396    where
397        O: IntegerPType,
398        T: AsRef<[u8]>,
399    {
400        let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
401        for v in vec {
402            builder.append_value(v.as_ref());
403        }
404        builder.finish(dtype)
405    }
406
407    /// Create from a vector of string slices.
408    pub fn from_strs(value: Vec<&str>) -> Self {
409        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
410    }
411
412    /// Create from a vector of optional string slices.
413    pub fn from_nullable_strs(value: Vec<Option<&str>>) -> Self {
414        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
415    }
416
417    /// Create from a vector of byte slices.
418    pub fn from_bytes(value: Vec<&[u8]>) -> Self {
419        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
420    }
421
422    /// Create from a vector of optional byte slices.
423    pub fn from_nullable_bytes(value: Vec<Option<&[u8]>>) -> Self {
424        Self::from_iter(value, DType::Binary(Nullability::Nullable))
425    }
426
427    pub fn into_data_parts(self) -> VarBinDataParts {
428        let dtype = self.dtype().clone();
429        let validity = self.varbin_validity();
430        let offsets = self.offsets().clone();
431        let data = self.into_data();
432        VarBinDataParts {
433            dtype,
434            bytes: data.bytes,
435            offsets,
436            validity,
437        }
438    }
439}
440
441impl Array<VarBin> {
442    /// Creates a new `VarBinArray`.
443    pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
444        let len = offsets.len().saturating_sub(1);
445        let slots = VarBinData::make_slots(offsets, &validity, len);
446        let data = VarBinData::build(
447            slots[OFFSETS_SLOT]
448                .as_ref()
449                .vortex_expect("VarBinArray offsets slot")
450                .clone(),
451            bytes,
452            dtype.clone(),
453            validity,
454        );
455        unsafe {
456            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
457        }
458    }
459
460    /// Creates a new `VarBinArray` without validation.
461    ///
462    /// # Safety
463    ///
464    /// See [`VarBinData::new_unchecked`].
465    pub unsafe fn new_unchecked(
466        offsets: ArrayRef,
467        bytes: ByteBuffer,
468        dtype: DType,
469        validity: Validity,
470    ) -> Self {
471        let len = offsets.len().saturating_sub(1);
472        let slots = VarBinData::make_slots(offsets, &validity, len);
473        let data = unsafe { VarBinData::new_unchecked(bytes) };
474        unsafe {
475            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
476        }
477    }
478
479    /// Creates a new `VarBinArray` without validation from a [`BufferHandle`].
480    ///
481    /// # Safety
482    ///
483    /// See [`VarBinData::new_unchecked_from_handle`].
484    pub unsafe fn new_unchecked_from_handle(
485        offsets: ArrayRef,
486        bytes: BufferHandle,
487        dtype: DType,
488        validity: Validity,
489    ) -> Self {
490        let len = offsets.len().saturating_sub(1);
491        let slots = VarBinData::make_slots(offsets, &validity, len);
492        let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
493        unsafe {
494            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
495        }
496    }
497
498    /// Constructs a new `VarBinArray`.
499    pub fn try_new(
500        offsets: ArrayRef,
501        bytes: ByteBuffer,
502        dtype: DType,
503        validity: Validity,
504    ) -> VortexResult<Self> {
505        let len = offsets.len() - 1;
506        let bytes = BufferHandle::new_host(bytes);
507        VarBinData::validate(&offsets, &bytes, &dtype, &validity)?;
508        let slots = VarBinData::make_slots(offsets, &validity, len);
509        // SAFETY: validate ensures all invariants are met.
510        let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
511        Ok(unsafe {
512            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
513        })
514    }
515}
516
517impl From<Vec<&[u8]>> for Array<VarBin> {
518    fn from(value: Vec<&[u8]>) -> Self {
519        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
520    }
521}
522
523impl From<Vec<Vec<u8>>> for Array<VarBin> {
524    fn from(value: Vec<Vec<u8>>) -> Self {
525        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
526    }
527}
528
529impl From<Vec<String>> for Array<VarBin> {
530    fn from(value: Vec<String>) -> Self {
531        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
532    }
533}
534
535impl From<Vec<&str>> for Array<VarBin> {
536    fn from(value: Vec<&str>) -> Self {
537        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
538    }
539}
540
541impl From<Vec<Option<&[u8]>>> for Array<VarBin> {
542    fn from(value: Vec<Option<&[u8]>>) -> Self {
543        Self::from_iter(value, DType::Binary(Nullability::Nullable))
544    }
545}
546
547impl From<Vec<Option<Vec<u8>>>> for Array<VarBin> {
548    fn from(value: Vec<Option<Vec<u8>>>) -> Self {
549        Self::from_iter(value, DType::Binary(Nullability::Nullable))
550    }
551}
552
553impl From<Vec<Option<String>>> for Array<VarBin> {
554    fn from(value: Vec<Option<String>>) -> Self {
555        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
556    }
557}
558
559impl From<Vec<Option<&str>>> for Array<VarBin> {
560    fn from(value: Vec<Option<&str>>) -> Self {
561        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
562    }
563}
564
565impl<'a> FromIterator<Option<&'a [u8]>> for Array<VarBin> {
566    fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
567        Self::from_iter(iter, DType::Binary(Nullability::Nullable))
568    }
569}
570
571impl FromIterator<Option<Vec<u8>>> for Array<VarBin> {
572    fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
573        Self::from_iter(iter, DType::Binary(Nullability::Nullable))
574    }
575}
576
577impl FromIterator<Option<String>> for Array<VarBin> {
578    fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
579        Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
580    }
581}
582
583impl<'a> FromIterator<Option<&'a str>> for Array<VarBin> {
584    fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
585        Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
586    }
587}