Skip to main content

vortex_array/arrays/varbin/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6
7use num_traits::AsPrimitive;
8use smallvec::smallvec;
9use vortex_array::arrays::PrimitiveArray;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_ensure;
14use vortex_error::vortex_err;
15
16use crate::ArrayRef;
17use crate::ArraySlots;
18use crate::LEGACY_SESSION;
19use crate::VortexSessionExecute;
20use crate::array::Array;
21use crate::array::ArrayParts;
22use crate::array::TypedArrayRef;
23use crate::array::child_to_validity;
24use crate::array::validity_to_child;
25use crate::arrays::VarBin;
26use crate::arrays::varbin::builder::VarBinBuilder;
27use crate::buffer::BufferHandle;
28use crate::dtype::DType;
29use crate::dtype::IntegerPType;
30use crate::dtype::Nullability;
31use crate::match_each_integer_ptype;
32use crate::validity::Validity;
33
/// Slot index of the offsets array defining the start/end of each variable-length binary element.
pub(super) const OFFSETS_SLOT: usize = 0;
/// Slot index of the validity bitmap indicating which elements are non-null.
pub(super) const VALIDITY_SLOT: usize = 1;
/// Number of child slots used by a `VarBinArray` (offsets and validity).
pub(super) const NUM_SLOTS: usize = 2;
/// Names of the slots, listed in slot-index order.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["offsets", "validity"];
40
/// Data payload of a `VarBinArray`.
///
/// Only the value bytes are stored here; the offsets and validity children are carried as
/// array slots (see `make_slots`).
#[derive(Clone, Debug)]
pub struct VarBinData {
    /// Handle to the buffer holding the concatenated bytes of all values.
    pub(super) bytes: BufferHandle,
}
45
46impl Display for VarBinData {
47    fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
48        Ok(())
49    }
50}
51
/// The owned components of a `VarBinArray`, as returned by `into_data_parts`.
pub struct VarBinDataParts {
    /// Logical type of the array the parts were taken from.
    pub dtype: DType,
    /// Handle to the buffer holding the concatenated value bytes.
    pub bytes: BufferHandle,
    /// Offsets child array delimiting each value inside `bytes`.
    pub offsets: ArrayRef,
    /// Validity of the array the parts were taken from.
    pub validity: Validity,
}
58
59impl VarBinData {
60    /// Creates a new `VarBinArray`.
61    ///
62    /// # Panics
63    ///
64    /// Panics if the provided components do not satisfy the invariants documented
65    /// in `VarBinArray::new_unchecked`.
66    pub fn build(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
67        Self::try_build(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
68    }
69
70    /// Creates a new `VarBinArray`.
71    ///
72    /// # Panics
73    ///
74    /// Panics if the provided components do not satisfy the invariants documented
75    /// in `VarBinArray::new_unchecked`.
76    pub fn build_from_handle(
77        offset: ArrayRef,
78        bytes: BufferHandle,
79        dtype: DType,
80        validity: Validity,
81    ) -> Self {
82        Self::try_build_from_handle(offset, bytes, dtype, validity).vortex_expect("VarBinArray new")
83    }
84
85    pub(crate) fn make_slots(offsets: ArrayRef, validity: &Validity, len: usize) -> ArraySlots {
86        smallvec![Some(offsets), validity_to_child(validity, len)]
87    }
88
89    /// Constructs a new `VarBinArray`.
90    ///
91    /// See `VarBinArray::new_unchecked` for more information.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the provided components do not satisfy the invariants documented in
96    /// `VarBinArray::new_unchecked`.
97    pub fn try_build(
98        offsets: ArrayRef,
99        bytes: ByteBuffer,
100        dtype: DType,
101        validity: Validity,
102    ) -> VortexResult<Self> {
103        let bytes = BufferHandle::new_host(bytes);
104        Self::validate(&offsets, &bytes, &dtype, &validity)?;
105
106        // SAFETY: validate ensures all invariants are met.
107        Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
108    }
109
110    /// Constructs a new `VarBinArray` from a `BufferHandle` of memory that may exist
111    /// on the CPU or GPU.
112    ///
113    /// See `VarBinArray::new_unchecked` for more information.
114    ///
115    /// # Errors
116    ///
117    /// Returns an error if the provided components do not satisfy the invariants documented in
118    /// `VarBinArray::new_unchecked`.
119    pub fn try_build_from_handle(
120        offsets: ArrayRef,
121        bytes: BufferHandle,
122        dtype: DType,
123        validity: Validity,
124    ) -> VortexResult<Self> {
125        Self::validate(&offsets, &bytes, &dtype, &validity)?;
126
127        // SAFETY: validate ensures all invariants are met.
128        Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
129    }
130
131    /// Creates a new `VarBinArray` without validation from these components:
132    ///
133    /// * `offsets` is an array of byte offsets into the `bytes` buffer.
134    /// * `bytes` is a buffer containing all the variable-length data concatenated.
135    /// * `dtype` specifies whether this contains UTF-8 strings or binary data.
136    /// * `validity` holds the null values.
137    ///
138    /// # Safety
139    ///
140    /// The caller must ensure all of the following invariants are satisfied:
141    ///
142    /// ## Offsets Requirements
143    ///
144    /// - `offsets` must be a non-nullable integer array.
145    /// - `offsets` must contain at least 1 element (for empty array, it contains \[0\]).
146    /// - All values in `offsets` must be monotonically non-decreasing.
147    /// - The first value in `offsets` must be 0.
148    /// - No offset value may exceed `bytes.len()`.
149    ///
150    /// ## Type Requirements
151    ///
152    /// - `dtype` must be exactly [`DType::Binary`] or [`DType::Utf8`].
153    /// - If `dtype` is [`DType::Utf8`], every byte slice `bytes[offsets[i]..offsets[i+1]]` must be valid UTF-8.
154    /// - `dtype.is_nullable()` must match the nullability of `validity`.
155    ///
156    /// ## Validity Requirements
157    ///
158    /// - If `validity` is [`Validity::Array`], its length must exactly equal `offsets.len() - 1`.
159    pub unsafe fn new_unchecked(bytes: ByteBuffer) -> Self {
160        // SAFETY: `new_unchecked_from_handle` has same invariants which should be checked
161        //  by caller.
162        unsafe { Self::new_unchecked_from_handle(BufferHandle::new_host(bytes)) }
163    }
164
165    /// Creates a new `VarBinArray` without validation from its components, with string data
166    /// stored in a `BufferHandle` (CPU or GPU).
167    ///
168    /// # Safety
169    ///
170    /// The caller must ensure all the invariants documented in `new_unchecked` are satisfied.
171    pub unsafe fn new_unchecked_from_handle(bytes: BufferHandle) -> Self {
172        Self { bytes }
173    }
174
175    /// Validates the components that would be used to create a `VarBinArray`.
176    ///
177    /// This function checks all the invariants required by `VarBinArray::new_unchecked`.
178    pub fn validate(
179        offsets: &ArrayRef,
180        bytes: &BufferHandle,
181        dtype: &DType,
182        validity: &Validity,
183    ) -> VortexResult<()> {
184        // Check offsets are non-nullable integer
185        vortex_ensure!(
186            offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
187            MismatchedTypes: "non nullable int", offsets.dtype()
188        );
189
190        // Check dtype is Binary or Utf8
191        vortex_ensure!(
192            matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
193            MismatchedTypes: "utf8 or binary", dtype
194        );
195
196        // Check nullability matches
197        vortex_ensure!(
198            dtype.is_nullable() != matches!(validity, Validity::NonNullable),
199            InvalidArgument: "incorrect validity {:?} for dtype {}",
200            validity,
201            dtype
202        );
203
204        // Check offsets has at least one element
205        vortex_ensure!(
206            !offsets.is_empty(),
207            InvalidArgument: "Offsets must have at least one element"
208        );
209
210        // Check validity length
211        if let Some(validity_len) = validity.maybe_len() {
212            vortex_ensure!(
213                validity_len == offsets.len() - 1,
214                "Validity length {} doesn't match array length {}",
215                validity_len,
216                offsets.len() - 1
217            );
218        }
219
220        // Validate UTF-8 for Utf8 dtype. Skip when offsets/bytes are not host-resident.
221        if offsets.is_host()
222            && bytes.is_on_host()
223            && matches!(dtype, DType::Utf8(_))
224            && let Some(bytes) = bytes.as_host_opt()
225        {
226            Self::validate_utf8(offsets, bytes.as_ref(), validity)?;
227        }
228
229        Ok(())
230    }
231
232    /// Validates that every non-null value is valid UTF-8.
233    fn validate_utf8(offsets: &ArrayRef, bytes: &[u8], validity: &Validity) -> VortexResult<()> {
234        let validate_at = |i: usize, start: usize, end: usize| -> VortexResult<()> {
235            let string_bytes = &bytes[start..end];
236            simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
237                #[expect(clippy::unwrap_used)]
238                // run validation using `compat` package to get more detailed error message
239                let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
240                vortex_err!("invalid utf-8: {err} at index {i}")
241            })?;
242            Ok(())
243        };
244
245        let mut ctx = LEGACY_SESSION.create_execution_ctx();
246        // TODO(joe): update the created VarBin with this decompressed Array.
247        let primitive_offsets = offsets.clone().execute::<PrimitiveArray>(&mut ctx)?;
248
249        // Array-backed validity is the only variant that needs an execution context: execute it into
250        // a mask once. The constant variants resolve null-ness without one. Resolving this before
251        // the per-type dispatch keeps the dtype loop simple.
252        let mask = match validity {
253            Validity::Array(_) => {
254                Some(validity.execute_mask(primitive_offsets.len().saturating_sub(1), &mut ctx)?)
255            }
256            _ => None,
257        };
258        let all_invalid = matches!(validity, Validity::AllInvalid);
259
260        match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
261            let offsets_slice = primitive_offsets.as_slice::<O>();
262
263            let last_offset: usize = offsets_slice[offsets_slice.len() - 1].as_();
264            vortex_ensure!(
265                last_offset <= bytes.len(),
266                InvalidArgument: "Last offset {} exceeds bytes length {}",
267                last_offset,
268                bytes.len()
269            );
270
271            for (i, (start, end)) in offsets_slice
272                .windows(2)
273                .map(|o| (o[0].as_(), o[1].as_()))
274                .enumerate()
275            {
276                let valid = mask.as_ref().map_or(!all_invalid, |mask| mask.value(i));
277                if valid {
278                    validate_at(i, start, end)?;
279                }
280            }
281        });
282        Ok(())
283    }
284
285    /// Access the value bytes child buffer
286    ///
287    /// # Note
288    ///
289    /// Bytes child buffer is never sliced when the array is sliced so this can include values
290    /// that are not logically present in the array. Users should prefer `sliced_bytes`
291    /// unless they're resolving values via the offset child array.
292    #[inline]
293    pub fn bytes(&self) -> &ByteBuffer {
294        self.bytes.as_host()
295    }
296
297    /// Access the value bytes buffer handle.
298    #[inline]
299    pub fn bytes_handle(&self) -> &BufferHandle {
300        &self.bytes
301    }
302}
303
304pub trait VarBinArrayExt: TypedArrayRef<VarBin> {
305    fn offsets(&self) -> &ArrayRef {
306        self.as_ref().slots()[OFFSETS_SLOT]
307            .as_ref()
308            .vortex_expect("VarBinArray offsets slot")
309    }
310
311    fn validity_child(&self) -> Option<&ArrayRef> {
312        self.as_ref().slots()[VALIDITY_SLOT].as_ref()
313    }
314
315    fn dtype_parts(&self) -> (bool, Nullability) {
316        match self.as_ref().dtype() {
317            DType::Utf8(nullability) => (true, *nullability),
318            DType::Binary(nullability) => (false, *nullability),
319            _ => unreachable!("VarBinArrayExt requires a utf8 or binary dtype"),
320        }
321    }
322
323    fn is_utf8(&self) -> bool {
324        self.dtype_parts().0
325    }
326
327    fn nullability(&self) -> Nullability {
328        self.dtype_parts().1
329    }
330
331    fn varbin_validity(&self) -> Validity {
332        child_to_validity(
333            self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
334            self.nullability(),
335        )
336    }
337
338    fn offset_at(&self, index: usize) -> usize {
339        assert!(
340            index <= self.as_ref().len(),
341            "Index {index} out of bounds 0..={}",
342            self.as_ref().len()
343        );
344
345        (&self
346            .offsets()
347            .execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
348            .vortex_expect("offsets must support execute_scalar"))
349            .try_into()
350            .vortex_expect("Failed to convert offset to usize")
351    }
352
353    fn bytes_at(&self, index: usize) -> ByteBuffer {
354        let start = self.offset_at(index);
355        let end = self.offset_at(index + 1);
356        self.bytes().slice(start..end)
357    }
358
359    fn sliced_bytes(&self) -> ByteBuffer {
360        let first_offset: usize = self.offset_at(0);
361        let last_offset = self.offset_at(self.as_ref().len());
362        self.bytes().slice(first_offset..last_offset)
363    }
364}
// Blanket impl: every `TypedArrayRef<VarBin>` gets the `VarBinArrayExt` accessors for free.
impl<T: TypedArrayRef<VarBin>> VarBinArrayExt for T {}
366
367/// Forwarding constructors for `VarBinArray` (= `Array<VarBin>`).
368impl Array<VarBin> {
369    pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
370        let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
371        if size < u32::MAX as usize {
372            Self::from_vec_sized::<u32, T>(vec, dtype)
373        } else {
374            Self::from_vec_sized::<u64, T>(vec, dtype)
375        }
376    }
377
378    #[expect(
379        clippy::same_name_method,
380        reason = "intentionally named from_iter like Iterator::from_iter"
381    )]
382    pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
383        iter: I,
384        dtype: DType,
385    ) -> Self {
386        let iter = iter.into_iter();
387        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
388        for v in iter {
389            builder.append(v.as_ref().map(|o| o.as_ref()));
390        }
391        builder.finish(dtype)
392    }
393
394    pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
395        iter: I,
396        dtype: DType,
397    ) -> Self {
398        let iter = iter.into_iter();
399        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
400        for v in iter {
401            builder.append_value(v);
402        }
403        builder.finish(dtype)
404    }
405
406    fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
407    where
408        O: IntegerPType,
409        T: AsRef<[u8]>,
410    {
411        let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
412        for v in vec {
413            builder.append_value(v.as_ref());
414        }
415        builder.finish(dtype)
416    }
417
418    /// Create from a vector of string slices.
419    pub fn from_strs(value: Vec<&str>) -> Self {
420        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
421    }
422
423    /// Create from a vector of optional string slices.
424    pub fn from_nullable_strs(value: Vec<Option<&str>>) -> Self {
425        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
426    }
427
428    /// Create from a vector of byte slices.
429    pub fn from_bytes(value: Vec<&[u8]>) -> Self {
430        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
431    }
432
433    /// Create from a vector of optional byte slices.
434    pub fn from_nullable_bytes(value: Vec<Option<&[u8]>>) -> Self {
435        Self::from_iter(value, DType::Binary(Nullability::Nullable))
436    }
437
438    pub fn into_data_parts(self) -> VarBinDataParts {
439        let dtype = self.dtype().clone();
440        let validity = self.varbin_validity();
441        let offsets = self.offsets().clone();
442        let data = self.into_data();
443        VarBinDataParts {
444            dtype,
445            bytes: data.bytes,
446            offsets,
447            validity,
448        }
449    }
450}
451
452impl Array<VarBin> {
453    /// Creates a new `VarBinArray`.
454    pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
455        let len = offsets.len().saturating_sub(1);
456        let slots = VarBinData::make_slots(offsets, &validity, len);
457        let data = VarBinData::build(
458            slots[OFFSETS_SLOT]
459                .as_ref()
460                .vortex_expect("VarBinArray offsets slot")
461                .clone(),
462            bytes,
463            dtype.clone(),
464            validity,
465        );
466        unsafe {
467            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
468        }
469    }
470
471    /// Creates a new `VarBinArray` without validation.
472    ///
473    /// # Safety
474    ///
475    /// See [`VarBinData::new_unchecked`].
476    pub unsafe fn new_unchecked(
477        offsets: ArrayRef,
478        bytes: ByteBuffer,
479        dtype: DType,
480        validity: Validity,
481    ) -> Self {
482        let len = offsets.len().saturating_sub(1);
483        let slots = VarBinData::make_slots(offsets, &validity, len);
484        let data = unsafe { VarBinData::new_unchecked(bytes) };
485        unsafe {
486            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
487        }
488    }
489
490    /// Creates a new `VarBinArray` without validation from a [`BufferHandle`].
491    ///
492    /// # Safety
493    ///
494    /// See [`VarBinData::new_unchecked_from_handle`].
495    pub unsafe fn new_unchecked_from_handle(
496        offsets: ArrayRef,
497        bytes: BufferHandle,
498        dtype: DType,
499        validity: Validity,
500    ) -> Self {
501        let len = offsets.len().saturating_sub(1);
502        let slots = VarBinData::make_slots(offsets, &validity, len);
503        let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
504        unsafe {
505            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
506        }
507    }
508
509    /// Constructs a new `VarBinArray`.
510    pub fn try_new(
511        offsets: ArrayRef,
512        bytes: ByteBuffer,
513        dtype: DType,
514        validity: Validity,
515    ) -> VortexResult<Self> {
516        let len = offsets.len() - 1;
517        let bytes = BufferHandle::new_host(bytes);
518        VarBinData::validate(&offsets, &bytes, &dtype, &validity)?;
519        let slots = VarBinData::make_slots(offsets, &validity, len);
520        // SAFETY: validate ensures all invariants are met.
521        let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
522        Ok(unsafe {
523            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
524        })
525    }
526}
527
528impl From<Vec<&[u8]>> for Array<VarBin> {
529    fn from(value: Vec<&[u8]>) -> Self {
530        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
531    }
532}
533
534impl From<Vec<Vec<u8>>> for Array<VarBin> {
535    fn from(value: Vec<Vec<u8>>) -> Self {
536        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
537    }
538}
539
540impl From<Vec<String>> for Array<VarBin> {
541    fn from(value: Vec<String>) -> Self {
542        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
543    }
544}
545
546impl From<Vec<&str>> for Array<VarBin> {
547    fn from(value: Vec<&str>) -> Self {
548        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
549    }
550}
551
552impl From<Vec<Option<&[u8]>>> for Array<VarBin> {
553    fn from(value: Vec<Option<&[u8]>>) -> Self {
554        Self::from_iter(value, DType::Binary(Nullability::Nullable))
555    }
556}
557
558impl From<Vec<Option<Vec<u8>>>> for Array<VarBin> {
559    fn from(value: Vec<Option<Vec<u8>>>) -> Self {
560        Self::from_iter(value, DType::Binary(Nullability::Nullable))
561    }
562}
563
564impl From<Vec<Option<String>>> for Array<VarBin> {
565    fn from(value: Vec<Option<String>>) -> Self {
566        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
567    }
568}
569
570impl From<Vec<Option<&str>>> for Array<VarBin> {
571    fn from(value: Vec<Option<&str>>) -> Self {
572        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
573    }
574}
575
576impl<'a> FromIterator<Option<&'a [u8]>> for Array<VarBin> {
577    fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
578        Self::from_iter(iter, DType::Binary(Nullability::Nullable))
579    }
580}
581
582impl FromIterator<Option<Vec<u8>>> for Array<VarBin> {
583    fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
584        Self::from_iter(iter, DType::Binary(Nullability::Nullable))
585    }
586}
587
588impl FromIterator<Option<String>> for Array<VarBin> {
589    fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
590        Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
591    }
592}
593
594impl<'a> FromIterator<Option<&'a str>> for Array<VarBin> {
595    fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
596        Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
597    }
598}