Skip to main content

vortex_array/arrays/varbin/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6
7use num_traits::AsPrimitive;
8use vortex_buffer::ByteBuffer;
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_error::vortex_ensure;
12use vortex_error::vortex_err;
13
14use crate::ArrayRef;
15use crate::LEGACY_SESSION;
16use crate::ToCanonical;
17use crate::VortexSessionExecute;
18use crate::array::Array;
19use crate::array::ArrayParts;
20use crate::array::TypedArrayRef;
21use crate::array::child_to_validity;
22use crate::array::validity_to_child;
23use crate::arrays::VarBin;
24use crate::arrays::varbin::builder::VarBinBuilder;
25use crate::buffer::BufferHandle;
26use crate::dtype::DType;
27use crate::dtype::IntegerPType;
28use crate::dtype::Nullability;
29use crate::match_each_integer_ptype;
30use crate::validity::Validity;
31
32/// The offsets array defining the start/end of each variable-length binary element.
33pub(super) const OFFSETS_SLOT: usize = 0;
34/// The validity bitmap indicating which elements are non-null.
35pub(super) const VALIDITY_SLOT: usize = 1;
36pub(super) const NUM_SLOTS: usize = 2;
37pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["offsets", "validity"];
38
39#[derive(Clone, Debug)]
40pub struct VarBinData {
41    pub(super) bytes: BufferHandle,
42}
43
44impl Display for VarBinData {
45    fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
46        Ok(())
47    }
48}
49
50pub struct VarBinDataParts {
51    pub dtype: DType,
52    pub bytes: BufferHandle,
53    pub offsets: ArrayRef,
54    pub validity: Validity,
55}
56
57impl VarBinData {
58    /// Creates a new `VarBinArray`.
59    ///
60    /// # Panics
61    ///
62    /// Panics if the provided components do not satisfy the invariants documented
63    /// in `VarBinArray::new_unchecked`.
64    pub fn build(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
65        Self::try_build(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
66    }
67
68    /// Creates a new `VarBinArray`.
69    ///
70    /// # Panics
71    ///
72    /// Panics if the provided components do not satisfy the invariants documented
73    /// in `VarBinArray::new_unchecked`.
74    pub fn build_from_handle(
75        offset: ArrayRef,
76        bytes: BufferHandle,
77        dtype: DType,
78        validity: Validity,
79    ) -> Self {
80        Self::try_build_from_handle(offset, bytes, dtype, validity).vortex_expect("VarBinArray new")
81    }
82
83    pub(crate) fn make_slots(
84        offsets: ArrayRef,
85        validity: &Validity,
86        len: usize,
87    ) -> Vec<Option<ArrayRef>> {
88        vec![Some(offsets), validity_to_child(validity, len)]
89    }
90
91    /// Constructs a new `VarBinArray`.
92    ///
93    /// See `VarBinArray::new_unchecked` for more information.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the provided components do not satisfy the invariants documented in
98    /// `VarBinArray::new_unchecked`.
99    pub fn try_build(
100        offsets: ArrayRef,
101        bytes: ByteBuffer,
102        dtype: DType,
103        validity: Validity,
104    ) -> VortexResult<Self> {
105        let bytes = BufferHandle::new_host(bytes);
106        Self::validate(&offsets, &bytes, &dtype, &validity)?;
107
108        // SAFETY: validate ensures all invariants are met.
109        Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
110    }
111
112    /// Constructs a new `VarBinArray` from a `BufferHandle` of memory that may exist
113    /// on the CPU or GPU.
114    ///
115    /// See `VarBinArray::new_unchecked` for more information.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the provided components do not satisfy the invariants documented in
120    /// `VarBinArray::new_unchecked`.
121    pub fn try_build_from_handle(
122        offsets: ArrayRef,
123        bytes: BufferHandle,
124        dtype: DType,
125        validity: Validity,
126    ) -> VortexResult<Self> {
127        Self::validate(&offsets, &bytes, &dtype, &validity)?;
128
129        // SAFETY: validate ensures all invariants are met.
130        Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
131    }
132
133    /// Creates a new `VarBinArray` without validation from these components:
134    ///
135    /// * `offsets` is an array of byte offsets into the `bytes` buffer.
136    /// * `bytes` is a buffer containing all the variable-length data concatenated.
137    /// * `dtype` specifies whether this contains UTF-8 strings or binary data.
138    /// * `validity` holds the null values.
139    ///
140    /// # Safety
141    ///
142    /// The caller must ensure all of the following invariants are satisfied:
143    ///
144    /// ## Offsets Requirements
145    ///
146    /// - `offsets` must be a non-nullable integer array.
147    /// - `offsets` must contain at least 1 element (for empty array, it contains \[0\]).
148    /// - All values in `offsets` must be monotonically non-decreasing.
149    /// - The first value in `offsets` must be 0.
150    /// - No offset value may exceed `bytes.len()`.
151    ///
152    /// ## Type Requirements
153    ///
154    /// - `dtype` must be exactly [`DType::Binary`] or [`DType::Utf8`].
155    /// - If `dtype` is [`DType::Utf8`], every byte slice `bytes[offsets[i]..offsets[i+1]]` must be valid UTF-8.
156    /// - `dtype.is_nullable()` must match the nullability of `validity`.
157    ///
158    /// ## Validity Requirements
159    ///
160    /// - If `validity` is [`Validity::Array`], its length must exactly equal `offsets.len() - 1`.
161    pub unsafe fn new_unchecked(bytes: ByteBuffer) -> Self {
162        // SAFETY: `new_unchecked_from_handle` has same invariants which should be checked
163        //  by caller.
164        unsafe { Self::new_unchecked_from_handle(BufferHandle::new_host(bytes)) }
165    }
166
167    /// Creates a new `VarBinArray` without validation from its components, with string data
168    /// stored in a `BufferHandle` (CPU or GPU).
169    ///
170    /// # Safety
171    ///
172    /// The caller must ensure all the invariants documented in `new_unchecked` are satisfied.
173    pub unsafe fn new_unchecked_from_handle(bytes: BufferHandle) -> Self {
174        Self { bytes }
175    }
176
177    /// Validates the components that would be used to create a `VarBinArray`.
178    ///
179    /// This function checks all the invariants required by `VarBinArray::new_unchecked`.
180    pub fn validate(
181        offsets: &ArrayRef,
182        bytes: &BufferHandle,
183        dtype: &DType,
184        validity: &Validity,
185    ) -> VortexResult<()> {
186        // Check offsets are non-nullable integer
187        vortex_ensure!(
188            offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
189            MismatchedTypes: "non nullable int", offsets.dtype()
190        );
191
192        // Check dtype is Binary or Utf8
193        vortex_ensure!(
194            matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
195            MismatchedTypes: "utf8 or binary", dtype
196        );
197
198        // Check nullability matches
199        vortex_ensure!(
200            dtype.is_nullable() != matches!(validity, Validity::NonNullable),
201            InvalidArgument: "incorrect validity {:?} for dtype {}",
202            validity,
203            dtype
204        );
205
206        // Check offsets has at least one element
207        vortex_ensure!(
208            !offsets.is_empty(),
209            InvalidArgument: "Offsets must have at least one element"
210        );
211
212        // Skip host-only validation when offsets/bytes are not host-resident.
213        if offsets.is_host() && bytes.is_on_host() {
214            let last_offset = offsets
215                .execute_scalar(
216                    offsets.len() - 1,
217                    &mut LEGACY_SESSION.create_execution_ctx(),
218                )?
219                .as_primitive()
220                .as_::<usize>()
221                .ok_or_else(
222                    || vortex_err!(InvalidArgument: "Last offset must be convertible to usize"),
223                )?;
224            vortex_ensure!(
225                last_offset <= bytes.len(),
226                InvalidArgument: "Last offset {} exceeds bytes length {}",
227                last_offset,
228                bytes.len()
229            );
230        }
231
232        // Check validity length
233        if let Some(validity_len) = validity.maybe_len() {
234            vortex_ensure!(
235                validity_len == offsets.len() - 1,
236                "Validity length {} doesn't match array length {}",
237                validity_len,
238                offsets.len() - 1
239            );
240        }
241
242        // Validate UTF-8 for Utf8 dtype. Skip when offsets/bytes are not host-resident.
243        if offsets.is_host()
244            && bytes.is_on_host()
245            && matches!(dtype, DType::Utf8(_))
246            && let Some(bytes) = bytes.as_host_opt()
247        {
248            let primitive_offsets = offsets.to_primitive();
249            match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
250                let offsets_slice = primitive_offsets.as_slice::<O>();
251                for (i, (start, end)) in offsets_slice
252                    .windows(2)
253                    .map(|o| (o[0].as_(), o[1].as_()))
254                    .enumerate()
255                {
256                    if validity.is_null(i)? {
257                        continue;
258                    }
259
260                    let string_bytes = &bytes.as_ref()[start..end];
261                    simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
262                        #[expect(clippy::unwrap_used)]
263                        // run validation using `compat` package to get more detailed error message
264                        let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
265                        vortex_err!("invalid utf-8: {err} at index {i}")
266                    })?;
267                }
268            });
269        }
270
271        Ok(())
272    }
273
274    /// Access the value bytes child buffer
275    ///
276    /// # Note
277    ///
278    /// Bytes child buffer is never sliced when the array is sliced so this can include values
279    /// that are not logically present in the array. Users should prefer `sliced_bytes`
280    /// unless they're resolving values via the offset child array.
281    #[inline]
282    pub fn bytes(&self) -> &ByteBuffer {
283        self.bytes.as_host()
284    }
285
286    /// Access the value bytes buffer handle.
287    #[inline]
288    pub fn bytes_handle(&self) -> &BufferHandle {
289        &self.bytes
290    }
291}
292
293pub trait VarBinArrayExt: TypedArrayRef<VarBin> {
294    fn offsets(&self) -> &ArrayRef {
295        self.as_ref().slots()[OFFSETS_SLOT]
296            .as_ref()
297            .vortex_expect("VarBinArray offsets slot")
298    }
299
300    fn validity_child(&self) -> Option<&ArrayRef> {
301        self.as_ref().slots()[VALIDITY_SLOT].as_ref()
302    }
303
304    fn dtype_parts(&self) -> (bool, Nullability) {
305        match self.as_ref().dtype() {
306            DType::Utf8(nullability) => (true, *nullability),
307            DType::Binary(nullability) => (false, *nullability),
308            _ => unreachable!("VarBinArrayExt requires a utf8 or binary dtype"),
309        }
310    }
311
312    fn is_utf8(&self) -> bool {
313        self.dtype_parts().0
314    }
315
316    fn nullability(&self) -> Nullability {
317        self.dtype_parts().1
318    }
319
320    fn varbin_validity(&self) -> Validity {
321        child_to_validity(&self.as_ref().slots()[VALIDITY_SLOT], self.nullability())
322    }
323
324    fn offset_at(&self, index: usize) -> usize {
325        assert!(
326            index <= self.as_ref().len(),
327            "Index {index} out of bounds 0..={}",
328            self.as_ref().len()
329        );
330
331        (&self
332            .offsets()
333            .execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
334            .vortex_expect("offsets must support execute_scalar"))
335            .try_into()
336            .vortex_expect("Failed to convert offset to usize")
337    }
338
339    fn bytes_at(&self, index: usize) -> ByteBuffer {
340        let start = self.offset_at(index);
341        let end = self.offset_at(index + 1);
342        self.bytes().slice(start..end)
343    }
344
345    fn sliced_bytes(&self) -> ByteBuffer {
346        let first_offset: usize = self.offset_at(0);
347        let last_offset = self.offset_at(self.as_ref().len());
348        self.bytes().slice(first_offset..last_offset)
349    }
350}
351impl<T: TypedArrayRef<VarBin>> VarBinArrayExt for T {}
352
353/// Forwarding constructors for `VarBinArray` (= `Array<VarBin>`).
354impl Array<VarBin> {
355    pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
356        let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
357        if size < u32::MAX as usize {
358            Self::from_vec_sized::<u32, T>(vec, dtype)
359        } else {
360            Self::from_vec_sized::<u64, T>(vec, dtype)
361        }
362    }
363
364    #[expect(
365        clippy::same_name_method,
366        reason = "intentionally named from_iter like Iterator::from_iter"
367    )]
368    pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
369        iter: I,
370        dtype: DType,
371    ) -> Self {
372        let iter = iter.into_iter();
373        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
374        for v in iter {
375            builder.append(v.as_ref().map(|o| o.as_ref()));
376        }
377        builder.finish(dtype)
378    }
379
380    pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
381        iter: I,
382        dtype: DType,
383    ) -> Self {
384        let iter = iter.into_iter();
385        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
386        for v in iter {
387            builder.append_value(v);
388        }
389        builder.finish(dtype)
390    }
391
392    fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
393    where
394        O: IntegerPType,
395        T: AsRef<[u8]>,
396    {
397        let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
398        for v in vec {
399            builder.append_value(v.as_ref());
400        }
401        builder.finish(dtype)
402    }
403
404    /// Create from a vector of string slices.
405    pub fn from_strs(value: Vec<&str>) -> Self {
406        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
407    }
408
409    /// Create from a vector of optional string slices.
410    pub fn from_nullable_strs(value: Vec<Option<&str>>) -> Self {
411        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
412    }
413
414    /// Create from a vector of byte slices.
415    pub fn from_bytes(value: Vec<&[u8]>) -> Self {
416        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
417    }
418
419    /// Create from a vector of optional byte slices.
420    pub fn from_nullable_bytes(value: Vec<Option<&[u8]>>) -> Self {
421        Self::from_iter(value, DType::Binary(Nullability::Nullable))
422    }
423
424    pub fn into_data_parts(self) -> VarBinDataParts {
425        let dtype = self.dtype().clone();
426        let validity = self.varbin_validity();
427        let offsets = self.offsets().clone();
428        let data = self.into_data();
429        VarBinDataParts {
430            dtype,
431            bytes: data.bytes,
432            offsets,
433            validity,
434        }
435    }
436}
437
438impl Array<VarBin> {
439    /// Creates a new `VarBinArray`.
440    pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
441        let len = offsets.len().saturating_sub(1);
442        let slots = VarBinData::make_slots(offsets, &validity, len);
443        let data = VarBinData::build(
444            slots[OFFSETS_SLOT]
445                .as_ref()
446                .vortex_expect("VarBinArray offsets slot")
447                .clone(),
448            bytes,
449            dtype.clone(),
450            validity,
451        );
452        unsafe {
453            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
454        }
455    }
456
457    /// Creates a new `VarBinArray` without validation.
458    ///
459    /// # Safety
460    ///
461    /// See [`VarBinData::new_unchecked`].
462    pub unsafe fn new_unchecked(
463        offsets: ArrayRef,
464        bytes: ByteBuffer,
465        dtype: DType,
466        validity: Validity,
467    ) -> Self {
468        let len = offsets.len().saturating_sub(1);
469        let slots = VarBinData::make_slots(offsets, &validity, len);
470        let data = unsafe { VarBinData::new_unchecked(bytes) };
471        unsafe {
472            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
473        }
474    }
475
476    /// Creates a new `VarBinArray` without validation from a [`BufferHandle`].
477    ///
478    /// # Safety
479    ///
480    /// See [`VarBinData::new_unchecked_from_handle`].
481    pub unsafe fn new_unchecked_from_handle(
482        offsets: ArrayRef,
483        bytes: BufferHandle,
484        dtype: DType,
485        validity: Validity,
486    ) -> Self {
487        let len = offsets.len().saturating_sub(1);
488        let slots = VarBinData::make_slots(offsets, &validity, len);
489        let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
490        unsafe {
491            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
492        }
493    }
494
495    /// Constructs a new `VarBinArray`.
496    pub fn try_new(
497        offsets: ArrayRef,
498        bytes: ByteBuffer,
499        dtype: DType,
500        validity: Validity,
501    ) -> VortexResult<Self> {
502        let len = offsets.len() - 1;
503        let bytes = BufferHandle::new_host(bytes);
504        VarBinData::validate(&offsets, &bytes, &dtype, &validity)?;
505        let slots = VarBinData::make_slots(offsets, &validity, len);
506        // SAFETY: validate ensures all invariants are met.
507        let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
508        Ok(unsafe {
509            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
510        })
511    }
512}
513
514impl From<Vec<&[u8]>> for Array<VarBin> {
515    fn from(value: Vec<&[u8]>) -> Self {
516        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
517    }
518}
519
520impl From<Vec<Vec<u8>>> for Array<VarBin> {
521    fn from(value: Vec<Vec<u8>>) -> Self {
522        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
523    }
524}
525
526impl From<Vec<String>> for Array<VarBin> {
527    fn from(value: Vec<String>) -> Self {
528        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
529    }
530}
531
532impl From<Vec<&str>> for Array<VarBin> {
533    fn from(value: Vec<&str>) -> Self {
534        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
535    }
536}
537
538impl From<Vec<Option<&[u8]>>> for Array<VarBin> {
539    fn from(value: Vec<Option<&[u8]>>) -> Self {
540        Self::from_iter(value, DType::Binary(Nullability::Nullable))
541    }
542}
543
544impl From<Vec<Option<Vec<u8>>>> for Array<VarBin> {
545    fn from(value: Vec<Option<Vec<u8>>>) -> Self {
546        Self::from_iter(value, DType::Binary(Nullability::Nullable))
547    }
548}
549
550impl From<Vec<Option<String>>> for Array<VarBin> {
551    fn from(value: Vec<Option<String>>) -> Self {
552        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
553    }
554}
555
556impl From<Vec<Option<&str>>> for Array<VarBin> {
557    fn from(value: Vec<Option<&str>>) -> Self {
558        Self::from_iter(value, DType::Utf8(Nullability::Nullable))
559    }
560}
561
562impl<'a> FromIterator<Option<&'a [u8]>> for Array<VarBin> {
563    fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
564        Self::from_iter(iter, DType::Binary(Nullability::Nullable))
565    }
566}
567
568impl FromIterator<Option<Vec<u8>>> for Array<VarBin> {
569    fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
570        Self::from_iter(iter, DType::Binary(Nullability::Nullable))
571    }
572}
573
574impl FromIterator<Option<String>> for Array<VarBin> {
575    fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
576        Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
577    }
578}
579
580impl<'a> FromIterator<Option<&'a str>> for Array<VarBin> {
581    fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
582        Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
583    }
584}