vortex_array/arrays/varbin/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5
6pub(crate) use compute::compute_min_max;
7use num_traits::{AsPrimitive, PrimInt};
8use vortex_buffer::ByteBuffer;
9use vortex_dtype::{DType, NativePType, Nullability, match_each_integer_ptype};
10use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap as _, vortex_ensure, vortex_err};
11use vortex_scalar::Scalar;
12
13use crate::arrays::varbin::builder::VarBinBuilder;
14use crate::stats::{ArrayStats, StatsSetRef};
15use crate::validity::Validity;
16use crate::vtable::{
17    ArrayVTable, NotSupported, VTable, ValidityHelper, ValidityVTableFromValidityHelper,
18};
19use crate::{Array, ArrayRef, EncodingId, EncodingRef, ToCanonical, vtable};
20
21mod accessor;
22pub mod builder;
23mod canonical;
24mod compute;
25mod operator;
26mod ops;
27mod serde;
28
29vtable!(VarBin);
30
31impl VTable for VarBinVTable {
32    type Array = VarBinArray;
33    type Encoding = VarBinEncoding;
34    type ArrayVTable = Self;
35    type CanonicalVTable = Self;
36    type OperationsVTable = Self;
37    type ValidityVTable = ValidityVTableFromValidityHelper;
38    type VisitorVTable = Self;
39    type ComputeVTable = NotSupported;
40    type EncodeVTable = NotSupported;
41    type PipelineVTable = NotSupported;
42    type SerdeVTable = Self;
43
44    fn id(_encoding: &Self::Encoding) -> EncodingId {
45        EncodingId::new_ref("vortex.varbin")
46    }
47
48    fn encoding(_array: &Self::Array) -> EncodingRef {
49        EncodingRef::new_ref(VarBinEncoding.as_ref())
50    }
51}
52
53#[derive(Clone, Debug)]
54pub struct VarBinArray {
55    dtype: DType,
56    bytes: ByteBuffer,
57    offsets: ArrayRef,
58    validity: Validity,
59    stats_set: ArrayStats,
60}
61
/// Unit type naming the `VarBin` encoding; it carries no state.
#[derive(Debug, Clone)]
pub struct VarBinEncoding;
64
65impl VarBinArray {
66    /// Creates a new [`VarBinArray`].
67    ///
68    /// # Panics
69    ///
70    /// Panics if the provided components do not satisfy the invariants documented
71    /// in [`VarBinArray::new_unchecked`].
72    pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
73        Self::try_new(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
74    }
75
76    /// Constructs a new `VarBinArray`.
77    ///
78    /// See [`VarBinArray::new_unchecked`] for more information.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the provided components do not satisfy the invariants documented in
83    /// [`VarBinArray::new_unchecked`].
84    pub fn try_new(
85        offsets: ArrayRef,
86        bytes: ByteBuffer,
87        dtype: DType,
88        validity: Validity,
89    ) -> VortexResult<Self> {
90        Self::validate(&offsets, &bytes, &dtype, &validity)?;
91
92        // SAFETY: validate ensures all invariants are met.
93        Ok(unsafe { Self::new_unchecked(offsets, bytes, dtype, validity) })
94    }
95
96    /// Creates a new [`VarBinArray`] without validation from these components:
97    ///
98    /// * `offsets` is an array of byte offsets into the `bytes` buffer.
99    /// * `bytes` is a buffer containing all the variable-length data concatenated.
100    /// * `dtype` specifies whether this contains UTF-8 strings or binary data.
101    /// * `validity` holds the null values.
102    ///
103    /// # Safety
104    ///
105    /// The caller must ensure all of the following invariants are satisfied:
106    ///
107    /// ## Offsets Requirements
108    ///
109    /// - `offsets` must be a non-nullable integer array.
110    /// - `offsets` must contain at least 1 element (for empty array, it contains \[0\]).
111    /// - All values in `offsets` must be monotonically non-decreasing.
112    /// - The first value in `offsets` must be 0.
113    /// - No offset value may exceed `bytes.len()`.
114    ///
115    /// ## Type Requirements
116    ///
117    /// - `dtype` must be exactly [`DType::Binary`] or [`DType::Utf8`].
118    /// - If `dtype` is [`DType::Utf8`], every byte slice `bytes[offsets[i]..offsets[i+1]]` must be valid UTF-8.
119    /// - `dtype.is_nullable()` must match the nullability of `validity`.
120    ///
121    /// ## Validity Requirements
122    ///
123    /// - If `validity` is [`Validity::Array`], its length must exactly equal `offsets.len() - 1`.
124    pub unsafe fn new_unchecked(
125        offsets: ArrayRef,
126        bytes: ByteBuffer,
127        dtype: DType,
128        validity: Validity,
129    ) -> Self {
130        Self {
131            dtype,
132            bytes,
133            offsets,
134            validity,
135            stats_set: Default::default(),
136        }
137    }
138
139    /// Validates the components that would be used to create a [`VarBinArray`].
140    ///
141    /// This function checks all the invariants required by [`VarBinArray::new_unchecked`].
142    pub(crate) fn validate(
143        offsets: &dyn Array,
144        bytes: &ByteBuffer,
145        dtype: &DType,
146        validity: &Validity,
147    ) -> VortexResult<()> {
148        // Check offsets are non-nullable integer
149        vortex_ensure!(
150            offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
151            MismatchedTypes: "non nullable int", offsets.dtype()
152        );
153
154        // Check dtype is Binary or Utf8
155        vortex_ensure!(
156            matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
157            MismatchedTypes: "utf8 or binary", dtype
158        );
159
160        // Check nullability matches
161        vortex_ensure!(
162            dtype.is_nullable() != (validity == &Validity::NonNullable),
163            "incorrect validity {:?} for dtype {}",
164            validity,
165            dtype
166        );
167
168        // Check offsets has at least one element
169        vortex_ensure!(
170            !offsets.is_empty(),
171            "Offsets must have at least one element"
172        );
173
174        // Check offsets are sorted
175        if let Some(is_sorted) = offsets.statistics().compute_is_sorted() {
176            vortex_ensure!(is_sorted, "offsets must be sorted");
177        }
178
179        let last_offset = offsets
180            .scalar_at(offsets.len() - 1)
181            .as_primitive()
182            .as_::<usize>()
183            .ok_or_else(|| vortex_err!("Last offset must be convertible to usize"))?;
184        vortex_ensure!(
185            last_offset <= bytes.len(),
186            "Last offset {} exceeds bytes length {}",
187            last_offset,
188            bytes.len()
189        );
190
191        // Check validity length
192        if let Some(validity_len) = validity.maybe_len() {
193            vortex_ensure!(
194                validity_len == offsets.len() - 1,
195                "Validity length {} doesn't match array length {}",
196                validity_len,
197                offsets.len() - 1
198            );
199        }
200
201        // Validate UTF-8 for Utf8 dtype
202        if matches!(dtype, DType::Utf8(_)) {
203            let primitive_offsets = offsets.to_primitive();
204            match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
205                let offsets_slice = primitive_offsets.as_slice::<O>();
206                for (i, (start, end)) in offsets_slice
207                    .windows(2)
208                    .map(|o| (o[0].as_(), o[1].as_()))
209                    .enumerate()
210                {
211                    if validity.is_null(i) {
212                        continue;
213                    }
214
215                    let string_bytes = &bytes.as_ref()[start..end];
216                    simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
217                        #[allow(clippy::unwrap_used)]
218                        // run validation using `compat` package to get more detailed error message
219                        let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
220                        vortex_err!("invalid utf-8: {err} at index {i}")
221                    })?;
222                }
223            });
224        }
225
226        Ok(())
227    }
228
229    #[inline]
230    pub fn offsets(&self) -> &ArrayRef {
231        &self.offsets
232    }
233
234    /// Access the value bytes child buffer
235    ///
236    /// # Note
237    ///
238    /// Bytes child buffer is never sliced when the array is sliced so this can include values
239    /// that are not logically present in the array. Users should prefer [sliced_bytes][Self::sliced_bytes]
240    /// unless they're resolving values via the offset child array.
241    #[inline]
242    pub fn bytes(&self) -> &ByteBuffer {
243        &self.bytes
244    }
245
246    /// Access value bytes child array limited to values that are logically present in
247    /// the array unlike [bytes][Self::bytes].
248    pub fn sliced_bytes(&self) -> ByteBuffer {
249        let first_offset: usize = self.offset_at(0);
250        let last_offset = self.offset_at(self.len());
251
252        self.bytes().slice(first_offset..last_offset)
253    }
254
255    pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
256        let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
257        if size < u32::MAX as usize {
258            Self::from_vec_sized::<u32, T>(vec, dtype)
259        } else {
260            Self::from_vec_sized::<u64, T>(vec, dtype)
261        }
262    }
263
264    fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
265    where
266        O: NativePType + PrimInt,
267        T: AsRef<[u8]>,
268    {
269        let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
270        for v in vec {
271            builder.append_value(v.as_ref());
272        }
273        builder.finish(dtype)
274    }
275
276    #[allow(clippy::same_name_method)]
277    pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
278        iter: I,
279        dtype: DType,
280    ) -> Self {
281        let iter = iter.into_iter();
282        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
283        for v in iter {
284            builder.append(v.as_ref().map(|o| o.as_ref()));
285        }
286        builder.finish(dtype)
287    }
288
289    pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
290        iter: I,
291        dtype: DType,
292    ) -> Self {
293        let iter = iter.into_iter();
294        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
295        for v in iter {
296            builder.append_value(v);
297        }
298        builder.finish(dtype)
299    }
300
301    /// Get value offset at a given index
302    ///
303    /// Note: There's 1 more offsets than the elements in the array, thus last offset is at array length index
304    ///
305    /// Panics if index is out of bounds
306    pub fn offset_at(&self, index: usize) -> usize {
307        assert!(
308            index <= self.len(),
309            "Index {index} out of bounds 0..={}",
310            self.len()
311        );
312
313        self.offsets()
314            .scalar_at(index)
315            .as_ref()
316            .try_into()
317            .vortex_expect("Failed to convert offset to usize")
318    }
319
320    /// Access value bytes at a given index
321    ///
322    /// Will return buffer referencing underlying data without performing a copy
323    pub fn bytes_at(&self, index: usize) -> ByteBuffer {
324        let start = self.offset_at(index);
325        let end = self.offset_at(index + 1);
326
327        self.bytes().slice(start..end)
328    }
329
330    /// Consumes self, returning a tuple containing the `DType`, the `bytes` array,
331    /// the `offsets` array, and the `validity`.
332    pub fn into_parts(self) -> (DType, ByteBuffer, ArrayRef, Validity) {
333        (self.dtype, self.bytes, self.offsets, self.validity)
334    }
335}
336
337impl ValidityHelper for VarBinArray {
338    fn validity(&self) -> &Validity {
339        &self.validity
340    }
341}
342
343impl ArrayVTable<VarBinVTable> for VarBinVTable {
344    fn len(array: &VarBinArray) -> usize {
345        array.offsets().len().saturating_sub(1)
346    }
347
348    fn dtype(array: &VarBinArray) -> &DType {
349        &array.dtype
350    }
351
352    fn stats(array: &VarBinArray) -> StatsSetRef<'_> {
353        array.stats_set.to_ref(array.as_ref())
354    }
355}
356
357impl From<Vec<&[u8]>> for VarBinArray {
358    fn from(value: Vec<&[u8]>) -> Self {
359        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
360    }
361}
362
363impl From<Vec<Vec<u8>>> for VarBinArray {
364    fn from(value: Vec<Vec<u8>>) -> Self {
365        Self::from_vec(value, DType::Binary(Nullability::NonNullable))
366    }
367}
368
369impl From<Vec<String>> for VarBinArray {
370    fn from(value: Vec<String>) -> Self {
371        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
372    }
373}
374
375impl From<Vec<&str>> for VarBinArray {
376    fn from(value: Vec<&str>) -> Self {
377        Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
378    }
379}
380
381impl<'a> FromIterator<Option<&'a [u8]>> for VarBinArray {
382    fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
383        Self::from_iter(iter, DType::Binary(Nullability::Nullable))
384    }
385}
386
387impl FromIterator<Option<Vec<u8>>> for VarBinArray {
388    fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
389        Self::from_iter(iter, DType::Binary(Nullability::Nullable))
390    }
391}
392
393impl FromIterator<Option<String>> for VarBinArray {
394    fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
395        Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
396    }
397}
398
399impl<'a> FromIterator<Option<&'a str>> for VarBinArray {
400    fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
401        Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
402    }
403}
404
405pub fn varbin_scalar(value: ByteBuffer, dtype: &DType) -> Scalar {
406    if matches!(dtype, DType::Utf8(_)) {
407        Scalar::try_utf8(value, dtype.nullability())
408            .map_err(|err| vortex_err!("Failed to create scalar from utf8 buffer: {}", err))
409            .vortex_unwrap()
410    } else {
411        Scalar::binary(value, dtype.nullability())
412    }
413}
414
#[cfg(test)]
mod test {
    use rstest::{fixture, rstest};
    use vortex_buffer::{Buffer, buffer};
    use vortex_dtype::{DType, Nullability};

    use crate::arrays::varbin::VarBinArray;
    use crate::validity::Validity;
    use crate::{Array, ArrayRef, IntoArray};

    /// Two-value, non-nullable UTF-8 fixture: "hello world" occupies bytes 0..11 and
    /// "hello world this is a long string" occupies bytes 11..44.
    #[fixture]
    fn binary_array() -> ArrayRef {
        let text = "hello worldhello world this is a long string";
        let values = Buffer::copy_from(text.as_bytes());
        let offsets = buffer![0, 11, 44].into_array();
        let dtype = DType::Utf8(Nullability::NonNullable);

        let array = VarBinArray::try_new(
            offsets.into_array(),
            values,
            dtype,
            Validity::NonNullable,
        )
        .unwrap();
        array.into_array()
    }

    /// Every element can be read back as the expected string scalar.
    #[rstest]
    pub fn test_scalar_at(binary_array: ArrayRef) {
        let expected = ["hello world", "hello world this is a long string"];

        assert_eq!(binary_array.len(), expected.len());
        for (index, text) in expected.into_iter().enumerate() {
            assert_eq!(binary_array.scalar_at(index), text.into());
        }
    }

    /// Slicing off the first element leaves the second value readable at index 0.
    #[rstest]
    pub fn slice_array(binary_array: ArrayRef) {
        let tail = binary_array.slice(1..2);
        let first = tail.scalar_at(0);
        assert_eq!(first, "hello world this is a long string".into());
    }
}