vortex_array/arrays/varbin/
array.rs1use std::fmt::Display;
5use std::fmt::Formatter;
6
7use num_traits::AsPrimitive;
8use vortex_buffer::ByteBuffer;
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_error::vortex_ensure;
12use vortex_error::vortex_err;
13use vortex_mask::Mask;
14
15use crate::ArrayRef;
16use crate::ToCanonical;
17use crate::array::Array;
18use crate::array::ArrayParts;
19use crate::array::TypedArrayRef;
20use crate::array::child_to_validity;
21use crate::array::validity_to_child;
22use crate::arrays::VarBin;
23use crate::arrays::varbin::builder::VarBinBuilder;
24use crate::buffer::BufferHandle;
25use crate::dtype::DType;
26use crate::dtype::IntegerPType;
27use crate::dtype::Nullability;
28use crate::match_each_integer_ptype;
29use crate::validity::Validity;
30
/// Slot index of the offsets child array.
pub(super) const OFFSETS_SLOT: usize = 0;
/// Slot index of the optional validity child array.
pub(super) const VALIDITY_SLOT: usize = 1;
/// Number of slots a `VarBin` array carries.
pub(super) const NUM_SLOTS: usize = 2;
/// Names of the slots, in slot-index order.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["offsets", "validity"];
37
/// Data held directly by a `VarBin` array: the buffer of value bytes.
///
/// Offsets and validity are not stored here; `VarBinData::make_slots` places
/// them in the array's slots instead.
#[derive(Clone, Debug)]
pub struct VarBinData {
    /// Bytes of all values; individual values are delimited by the offsets child.
    pub(super) bytes: BufferHandle,
}
42
43impl Display for VarBinData {
44 fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
45 Ok(())
46 }
47}
48
/// The owned components of a `VarBin` array, as produced by
/// `Array::<VarBin>::into_data_parts`.
pub struct VarBinDataParts {
    /// Logical type of the array.
    pub dtype: DType,
    /// Buffer holding the bytes of all values.
    pub bytes: BufferHandle,
    /// Offsets child array delimiting each value within `bytes`.
    pub offsets: ArrayRef,
    /// Validity of the array's elements.
    pub validity: Validity,
}
55
56impl VarBinData {
57 pub fn build(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
64 Self::try_build(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
65 }
66
67 pub fn build_from_handle(
74 offset: ArrayRef,
75 bytes: BufferHandle,
76 dtype: DType,
77 validity: Validity,
78 ) -> Self {
79 Self::try_build_from_handle(offset, bytes, dtype, validity).vortex_expect("VarBinArray new")
80 }
81
82 pub(crate) fn make_slots(
83 offsets: ArrayRef,
84 validity: &Validity,
85 len: usize,
86 ) -> Vec<Option<ArrayRef>> {
87 vec![Some(offsets), validity_to_child(validity, len)]
88 }
89
90 pub fn try_build(
99 offsets: ArrayRef,
100 bytes: ByteBuffer,
101 dtype: DType,
102 validity: Validity,
103 ) -> VortexResult<Self> {
104 let bytes = BufferHandle::new_host(bytes);
105 Self::validate(&offsets, &bytes, &dtype, &validity)?;
106
107 Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
109 }
110
111 pub fn try_build_from_handle(
121 offsets: ArrayRef,
122 bytes: BufferHandle,
123 dtype: DType,
124 validity: Validity,
125 ) -> VortexResult<Self> {
126 Self::validate(&offsets, &bytes, &dtype, &validity)?;
127
128 Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
130 }
131
132 pub unsafe fn new_unchecked(bytes: ByteBuffer) -> Self {
161 unsafe { Self::new_unchecked_from_handle(BufferHandle::new_host(bytes)) }
164 }
165
166 pub unsafe fn new_unchecked_from_handle(bytes: BufferHandle) -> Self {
173 Self { bytes }
174 }
175
176 pub fn validate(
180 offsets: &ArrayRef,
181 bytes: &BufferHandle,
182 dtype: &DType,
183 validity: &Validity,
184 ) -> VortexResult<()> {
185 vortex_ensure!(
187 offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
188 MismatchedTypes: "non nullable int", offsets.dtype()
189 );
190
191 vortex_ensure!(
193 matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
194 MismatchedTypes: "utf8 or binary", dtype
195 );
196
197 vortex_ensure!(
199 dtype.is_nullable() != matches!(validity, Validity::NonNullable),
200 InvalidArgument: "incorrect validity {:?} for dtype {}",
201 validity,
202 dtype
203 );
204
205 vortex_ensure!(
207 !offsets.is_empty(),
208 InvalidArgument: "Offsets must have at least one element"
209 );
210
211 if offsets.is_host() && bytes.is_on_host() {
213 let last_offset = offsets
214 .scalar_at(offsets.len() - 1)?
215 .as_primitive()
216 .as_::<usize>()
217 .ok_or_else(
218 || vortex_err!(InvalidArgument: "Last offset must be convertible to usize"),
219 )?;
220 vortex_ensure!(
221 last_offset <= bytes.len(),
222 InvalidArgument: "Last offset {} exceeds bytes length {}",
223 last_offset,
224 bytes.len()
225 );
226 }
227
228 if let Some(validity_len) = validity.maybe_len() {
230 vortex_ensure!(
231 validity_len == offsets.len() - 1,
232 "Validity length {} doesn't match array length {}",
233 validity_len,
234 offsets.len() - 1
235 );
236 }
237
238 if offsets.is_host()
240 && bytes.is_on_host()
241 && matches!(dtype, DType::Utf8(_))
242 && let Some(bytes) = bytes.as_host_opt()
243 {
244 let primitive_offsets = offsets.to_primitive();
245 match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
246 let offsets_slice = primitive_offsets.as_slice::<O>();
247 for (i, (start, end)) in offsets_slice
248 .windows(2)
249 .map(|o| (o[0].as_(), o[1].as_()))
250 .enumerate()
251 {
252 if validity.is_null(i)? {
253 continue;
254 }
255
256 let string_bytes = &bytes.as_ref()[start..end];
257 simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
258 #[allow(clippy::unwrap_used)]
259 let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
261 vortex_err!("invalid utf-8: {err} at index {i}")
262 })?;
263 }
264 });
265 }
266
267 Ok(())
268 }
269
270 #[inline]
278 pub fn bytes(&self) -> &ByteBuffer {
279 self.bytes.as_host()
280 }
281
282 #[inline]
284 pub fn bytes_handle(&self) -> &BufferHandle {
285 &self.bytes
286 }
287}
288
289pub trait VarBinArrayExt: TypedArrayRef<VarBin> {
290 fn offsets(&self) -> &ArrayRef {
291 self.as_ref().slots()[OFFSETS_SLOT]
292 .as_ref()
293 .vortex_expect("VarBinArray offsets slot")
294 }
295
296 fn validity_child(&self) -> Option<&ArrayRef> {
297 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
298 }
299
300 fn dtype_parts(&self) -> (bool, Nullability) {
301 match self.as_ref().dtype() {
302 DType::Utf8(nullability) => (true, *nullability),
303 DType::Binary(nullability) => (false, *nullability),
304 _ => unreachable!("VarBinArrayExt requires a utf8 or binary dtype"),
305 }
306 }
307
308 fn is_utf8(&self) -> bool {
309 self.dtype_parts().0
310 }
311
312 fn nullability(&self) -> Nullability {
313 self.dtype_parts().1
314 }
315
316 fn varbin_validity(&self) -> Validity {
317 child_to_validity(&self.as_ref().slots()[VALIDITY_SLOT], self.nullability())
318 }
319
320 fn varbin_validity_mask(&self) -> Mask {
321 self.varbin_validity().to_mask(self.as_ref().len())
322 }
323
324 fn offset_at(&self, index: usize) -> usize {
325 assert!(
326 index <= self.as_ref().len(),
327 "Index {index} out of bounds 0..={}",
328 self.as_ref().len()
329 );
330
331 (&self
332 .offsets()
333 .scalar_at(index)
334 .vortex_expect("offsets must support scalar_at"))
335 .try_into()
336 .vortex_expect("Failed to convert offset to usize")
337 }
338
339 fn bytes_at(&self, index: usize) -> ByteBuffer {
340 let start = self.offset_at(index);
341 let end = self.offset_at(index + 1);
342 self.bytes().slice(start..end)
343 }
344
345 fn sliced_bytes(&self) -> ByteBuffer {
346 let first_offset: usize = self.offset_at(0);
347 let last_offset = self.offset_at(self.as_ref().len());
348 self.bytes().slice(first_offset..last_offset)
349 }
350}
351impl<T: TypedArrayRef<VarBin>> VarBinArrayExt for T {}
352
353impl Array<VarBin> {
355 pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
356 let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
357 if size < u32::MAX as usize {
358 Self::from_vec_sized::<u32, T>(vec, dtype)
359 } else {
360 Self::from_vec_sized::<u64, T>(vec, dtype)
361 }
362 }
363
364 #[expect(
365 clippy::same_name_method,
366 reason = "intentionally named from_iter like Iterator::from_iter"
367 )]
368 pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
369 iter: I,
370 dtype: DType,
371 ) -> Self {
372 let iter = iter.into_iter();
373 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
374 for v in iter {
375 builder.append(v.as_ref().map(|o| o.as_ref()));
376 }
377 builder.finish(dtype)
378 }
379
380 pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
381 iter: I,
382 dtype: DType,
383 ) -> Self {
384 let iter = iter.into_iter();
385 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
386 for v in iter {
387 builder.append_value(v);
388 }
389 builder.finish(dtype)
390 }
391
392 fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
393 where
394 O: IntegerPType,
395 T: AsRef<[u8]>,
396 {
397 let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
398 for v in vec {
399 builder.append_value(v.as_ref());
400 }
401 builder.finish(dtype)
402 }
403
404 pub fn from_strs(value: Vec<&str>) -> Self {
406 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
407 }
408
409 pub fn from_nullable_strs(value: Vec<Option<&str>>) -> Self {
411 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
412 }
413
414 pub fn from_bytes(value: Vec<&[u8]>) -> Self {
416 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
417 }
418
419 pub fn from_nullable_bytes(value: Vec<Option<&[u8]>>) -> Self {
421 Self::from_iter(value, DType::Binary(Nullability::Nullable))
422 }
423
424 pub fn into_data_parts(self) -> VarBinDataParts {
425 let dtype = self.dtype().clone();
426 let validity = self.varbin_validity();
427 let offsets = self.offsets().clone();
428 let data = self.into_data();
429 VarBinDataParts {
430 dtype,
431 bytes: data.bytes,
432 offsets,
433 validity,
434 }
435 }
436}
437
438impl Array<VarBin> {
439 pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
441 let len = offsets.len().saturating_sub(1);
442 let slots = VarBinData::make_slots(offsets, &validity, len);
443 let data = VarBinData::build(
444 slots[OFFSETS_SLOT]
445 .as_ref()
446 .vortex_expect("VarBinArray offsets slot")
447 .clone(),
448 bytes,
449 dtype.clone(),
450 validity,
451 );
452 unsafe {
453 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
454 }
455 }
456
457 pub unsafe fn new_unchecked(
463 offsets: ArrayRef,
464 bytes: ByteBuffer,
465 dtype: DType,
466 validity: Validity,
467 ) -> Self {
468 let len = offsets.len().saturating_sub(1);
469 let slots = VarBinData::make_slots(offsets, &validity, len);
470 let data = unsafe { VarBinData::new_unchecked(bytes) };
471 unsafe {
472 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
473 }
474 }
475
476 pub unsafe fn new_unchecked_from_handle(
482 offsets: ArrayRef,
483 bytes: BufferHandle,
484 dtype: DType,
485 validity: Validity,
486 ) -> Self {
487 let len = offsets.len().saturating_sub(1);
488 let slots = VarBinData::make_slots(offsets, &validity, len);
489 let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
490 unsafe {
491 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
492 }
493 }
494
495 pub fn try_new(
497 offsets: ArrayRef,
498 bytes: ByteBuffer,
499 dtype: DType,
500 validity: Validity,
501 ) -> VortexResult<Self> {
502 let len = offsets.len() - 1;
503 let bytes = BufferHandle::new_host(bytes);
504 VarBinData::validate(&offsets, &bytes, &dtype, &validity)?;
505 let slots = VarBinData::make_slots(offsets, &validity, len);
506 let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
508 Ok(unsafe {
509 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
510 })
511 }
512}
513
514impl From<Vec<&[u8]>> for Array<VarBin> {
515 fn from(value: Vec<&[u8]>) -> Self {
516 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
517 }
518}
519
520impl From<Vec<Vec<u8>>> for Array<VarBin> {
521 fn from(value: Vec<Vec<u8>>) -> Self {
522 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
523 }
524}
525
526impl From<Vec<String>> for Array<VarBin> {
527 fn from(value: Vec<String>) -> Self {
528 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
529 }
530}
531
532impl From<Vec<&str>> for Array<VarBin> {
533 fn from(value: Vec<&str>) -> Self {
534 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
535 }
536}
537
538impl From<Vec<Option<&[u8]>>> for Array<VarBin> {
539 fn from(value: Vec<Option<&[u8]>>) -> Self {
540 Self::from_iter(value, DType::Binary(Nullability::Nullable))
541 }
542}
543
544impl From<Vec<Option<Vec<u8>>>> for Array<VarBin> {
545 fn from(value: Vec<Option<Vec<u8>>>) -> Self {
546 Self::from_iter(value, DType::Binary(Nullability::Nullable))
547 }
548}
549
550impl From<Vec<Option<String>>> for Array<VarBin> {
551 fn from(value: Vec<Option<String>>) -> Self {
552 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
553 }
554}
555
556impl From<Vec<Option<&str>>> for Array<VarBin> {
557 fn from(value: Vec<Option<&str>>) -> Self {
558 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
559 }
560}
561
562impl<'a> FromIterator<Option<&'a [u8]>> for Array<VarBin> {
563 fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
564 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
565 }
566}
567
568impl FromIterator<Option<Vec<u8>>> for Array<VarBin> {
569 fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
570 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
571 }
572}
573
574impl FromIterator<Option<String>> for Array<VarBin> {
575 fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
576 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
577 }
578}
579
580impl<'a> FromIterator<Option<&'a str>> for Array<VarBin> {
581 fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
582 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
583 }
584}