vortex_array/arrays/varbin/
mod.rs1use std::fmt::Debug;
5
6pub(crate) use compute::compute_min_max;
7use num_traits::{AsPrimitive, PrimInt};
8use vortex_buffer::ByteBuffer;
9use vortex_dtype::{DType, NativePType, Nullability, match_each_integer_ptype};
10use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap as _, vortex_ensure, vortex_err};
11use vortex_scalar::Scalar;
12
13use crate::arrays::varbin::builder::VarBinBuilder;
14use crate::stats::{ArrayStats, StatsSetRef};
15use crate::validity::Validity;
16use crate::vtable::{
17 ArrayVTable, NotSupported, VTable, ValidityHelper, ValidityVTableFromValidityHelper,
18};
19use crate::{Array, ArrayRef, EncodingId, EncodingRef, ToCanonical, vtable};
20
21mod accessor;
22pub mod builder;
23mod canonical;
24mod compute;
25mod operator;
26mod ops;
27mod serde;
28
29vtable!(VarBin);
30
31impl VTable for VarBinVTable {
32 type Array = VarBinArray;
33 type Encoding = VarBinEncoding;
34 type ArrayVTable = Self;
35 type CanonicalVTable = Self;
36 type OperationsVTable = Self;
37 type ValidityVTable = ValidityVTableFromValidityHelper;
38 type VisitorVTable = Self;
39 type ComputeVTable = NotSupported;
40 type EncodeVTable = NotSupported;
41 type PipelineVTable = NotSupported;
42 type SerdeVTable = Self;
43
44 fn id(_encoding: &Self::Encoding) -> EncodingId {
45 EncodingId::new_ref("vortex.varbin")
46 }
47
48 fn encoding(_array: &Self::Array) -> EncodingRef {
49 EncodingRef::new_ref(VarBinEncoding.as_ref())
50 }
51}
52
53#[derive(Clone, Debug)]
54pub struct VarBinArray {
55 dtype: DType,
56 bytes: ByteBuffer,
57 offsets: ArrayRef,
58 validity: Validity,
59 stats_set: ArrayStats,
60}
61
/// Encoding marker type for [`VarBinArray`]; its encoding id is `"vortex.varbin"`.
#[derive(Debug, Clone)]
pub struct VarBinEncoding;
64
65impl VarBinArray {
66 pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
73 Self::try_new(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
74 }
75
76 pub fn try_new(
85 offsets: ArrayRef,
86 bytes: ByteBuffer,
87 dtype: DType,
88 validity: Validity,
89 ) -> VortexResult<Self> {
90 Self::validate(&offsets, &bytes, &dtype, &validity)?;
91
92 Ok(unsafe { Self::new_unchecked(offsets, bytes, dtype, validity) })
94 }
95
96 pub unsafe fn new_unchecked(
125 offsets: ArrayRef,
126 bytes: ByteBuffer,
127 dtype: DType,
128 validity: Validity,
129 ) -> Self {
130 Self {
131 dtype,
132 bytes,
133 offsets,
134 validity,
135 stats_set: Default::default(),
136 }
137 }
138
139 pub(crate) fn validate(
143 offsets: &dyn Array,
144 bytes: &ByteBuffer,
145 dtype: &DType,
146 validity: &Validity,
147 ) -> VortexResult<()> {
148 vortex_ensure!(
150 offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
151 MismatchedTypes: "non nullable int", offsets.dtype()
152 );
153
154 vortex_ensure!(
156 matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
157 MismatchedTypes: "utf8 or binary", dtype
158 );
159
160 vortex_ensure!(
162 dtype.is_nullable() != (validity == &Validity::NonNullable),
163 "incorrect validity {:?} for dtype {}",
164 validity,
165 dtype
166 );
167
168 vortex_ensure!(
170 !offsets.is_empty(),
171 "Offsets must have at least one element"
172 );
173
174 if let Some(is_sorted) = offsets.statistics().compute_is_sorted() {
176 vortex_ensure!(is_sorted, "offsets must be sorted");
177 }
178
179 let last_offset = offsets
180 .scalar_at(offsets.len() - 1)
181 .as_primitive()
182 .as_::<usize>()
183 .ok_or_else(|| vortex_err!("Last offset must be convertible to usize"))?;
184 vortex_ensure!(
185 last_offset <= bytes.len(),
186 "Last offset {} exceeds bytes length {}",
187 last_offset,
188 bytes.len()
189 );
190
191 if let Some(validity_len) = validity.maybe_len() {
193 vortex_ensure!(
194 validity_len == offsets.len() - 1,
195 "Validity length {} doesn't match array length {}",
196 validity_len,
197 offsets.len() - 1
198 );
199 }
200
201 if matches!(dtype, DType::Utf8(_)) {
203 let primitive_offsets = offsets.to_primitive();
204 match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
205 let offsets_slice = primitive_offsets.as_slice::<O>();
206 for (i, (start, end)) in offsets_slice
207 .windows(2)
208 .map(|o| (o[0].as_(), o[1].as_()))
209 .enumerate()
210 {
211 if validity.is_null(i) {
212 continue;
213 }
214
215 let string_bytes = &bytes.as_ref()[start..end];
216 simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
217 #[allow(clippy::unwrap_used)]
218 let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
220 vortex_err!("invalid utf-8: {err} at index {i}")
221 })?;
222 }
223 });
224 }
225
226 Ok(())
227 }
228
229 #[inline]
230 pub fn offsets(&self) -> &ArrayRef {
231 &self.offsets
232 }
233
234 #[inline]
242 pub fn bytes(&self) -> &ByteBuffer {
243 &self.bytes
244 }
245
246 pub fn sliced_bytes(&self) -> ByteBuffer {
249 let first_offset: usize = self.offset_at(0);
250 let last_offset = self.offset_at(self.len());
251
252 self.bytes().slice(first_offset..last_offset)
253 }
254
255 pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
256 let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
257 if size < u32::MAX as usize {
258 Self::from_vec_sized::<u32, T>(vec, dtype)
259 } else {
260 Self::from_vec_sized::<u64, T>(vec, dtype)
261 }
262 }
263
264 fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
265 where
266 O: NativePType + PrimInt,
267 T: AsRef<[u8]>,
268 {
269 let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
270 for v in vec {
271 builder.append_value(v.as_ref());
272 }
273 builder.finish(dtype)
274 }
275
276 #[allow(clippy::same_name_method)]
277 pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
278 iter: I,
279 dtype: DType,
280 ) -> Self {
281 let iter = iter.into_iter();
282 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
283 for v in iter {
284 builder.append(v.as_ref().map(|o| o.as_ref()));
285 }
286 builder.finish(dtype)
287 }
288
289 pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
290 iter: I,
291 dtype: DType,
292 ) -> Self {
293 let iter = iter.into_iter();
294 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
295 for v in iter {
296 builder.append_value(v);
297 }
298 builder.finish(dtype)
299 }
300
301 pub fn offset_at(&self, index: usize) -> usize {
307 assert!(
308 index <= self.len(),
309 "Index {index} out of bounds 0..={}",
310 self.len()
311 );
312
313 self.offsets()
314 .scalar_at(index)
315 .as_ref()
316 .try_into()
317 .vortex_expect("Failed to convert offset to usize")
318 }
319
320 pub fn bytes_at(&self, index: usize) -> ByteBuffer {
324 let start = self.offset_at(index);
325 let end = self.offset_at(index + 1);
326
327 self.bytes().slice(start..end)
328 }
329
330 pub fn into_parts(self) -> (DType, ByteBuffer, ArrayRef, Validity) {
333 (self.dtype, self.bytes, self.offsets, self.validity)
334 }
335}
336
337impl ValidityHelper for VarBinArray {
338 fn validity(&self) -> &Validity {
339 &self.validity
340 }
341}
342
343impl ArrayVTable<VarBinVTable> for VarBinVTable {
344 fn len(array: &VarBinArray) -> usize {
345 array.offsets().len().saturating_sub(1)
346 }
347
348 fn dtype(array: &VarBinArray) -> &DType {
349 &array.dtype
350 }
351
352 fn stats(array: &VarBinArray) -> StatsSetRef<'_> {
353 array.stats_set.to_ref(array.as_ref())
354 }
355}
356
357impl From<Vec<&[u8]>> for VarBinArray {
358 fn from(value: Vec<&[u8]>) -> Self {
359 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
360 }
361}
362
363impl From<Vec<Vec<u8>>> for VarBinArray {
364 fn from(value: Vec<Vec<u8>>) -> Self {
365 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
366 }
367}
368
369impl From<Vec<String>> for VarBinArray {
370 fn from(value: Vec<String>) -> Self {
371 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
372 }
373}
374
375impl From<Vec<&str>> for VarBinArray {
376 fn from(value: Vec<&str>) -> Self {
377 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
378 }
379}
380
381impl<'a> FromIterator<Option<&'a [u8]>> for VarBinArray {
382 fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
383 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
384 }
385}
386
387impl FromIterator<Option<Vec<u8>>> for VarBinArray {
388 fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
389 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
390 }
391}
392
393impl FromIterator<Option<String>> for VarBinArray {
394 fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
395 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
396 }
397}
398
399impl<'a> FromIterator<Option<&'a str>> for VarBinArray {
400 fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
401 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
402 }
403}
404
405pub fn varbin_scalar(value: ByteBuffer, dtype: &DType) -> Scalar {
406 if matches!(dtype, DType::Utf8(_)) {
407 Scalar::try_utf8(value, dtype.nullability())
408 .map_err(|err| vortex_err!("Failed to create scalar from utf8 buffer: {}", err))
409 .vortex_unwrap()
410 } else {
411 Scalar::binary(value, dtype.nullability())
412 }
413}
414
#[cfg(test)]
mod test {
    use rstest::{fixture, rstest};
    use vortex_buffer::{Buffer, buffer};
    use vortex_dtype::{DType, Nullability};

    use crate::arrays::varbin::VarBinArray;
    use crate::validity::Validity;
    use crate::{Array, ArrayRef, IntoArray};

    /// A non-nullable two-element array. Despite the name its dtype is `Utf8`:
    /// `"hello world"` and `"hello world this is a long string"`.
    #[fixture]
    fn binary_array() -> ArrayRef {
        let values = Buffer::copy_from("hello worldhello world this is a long string".as_bytes());
        let offsets = buffer![0, 11, 44].into_array();

        VarBinArray::try_new(
            offsets,
            values,
            DType::Utf8(Nullability::NonNullable),
            Validity::NonNullable,
        )
        .unwrap()
        .into_array()
    }

    #[rstest]
    pub fn test_scalar_at(binary_array: ArrayRef) {
        assert_eq!(binary_array.len(), 2);
        assert_eq!(binary_array.scalar_at(0), "hello world".into());
        assert_eq!(
            binary_array.scalar_at(1),
            "hello world this is a long string".into()
        );
    }

    #[rstest]
    pub fn slice_array(binary_array: ArrayRef) {
        let binary_arr = binary_array.slice(1..2);
        assert_eq!(
            binary_arr.scalar_at(0),
            "hello world this is a long string".into()
        );
    }

    /// A last offset past the end of the value bytes must be rejected, not accepted.
    #[test]
    pub fn try_new_rejects_offset_past_end() {
        let values = Buffer::copy_from("hello".as_bytes());
        let offsets = buffer![0, 6].into_array();

        let result = VarBinArray::try_new(
            offsets,
            values,
            DType::Utf8(Nullability::NonNullable),
            Validity::NonNullable,
        );
        assert!(result.is_err());
    }
}