vortex_array/arrays/varbin/
array.rs1use std::fmt::Display;
5use std::fmt::Formatter;
6
7use num_traits::AsPrimitive;
8use vortex_buffer::ByteBuffer;
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_error::vortex_ensure;
12use vortex_error::vortex_err;
13
14use crate::ArrayRef;
15use crate::LEGACY_SESSION;
16#[expect(deprecated)]
17use crate::ToCanonical as _;
18use crate::VortexSessionExecute;
19use crate::array::Array;
20use crate::array::ArrayParts;
21use crate::array::TypedArrayRef;
22use crate::array::child_to_validity;
23use crate::array::validity_to_child;
24use crate::arrays::VarBin;
25use crate::arrays::varbin::builder::VarBinBuilder;
26use crate::buffer::BufferHandle;
27use crate::dtype::DType;
28use crate::dtype::IntegerPType;
29use crate::dtype::Nullability;
30use crate::match_each_integer_ptype;
31use crate::validity::Validity;
32
/// Index of the offsets child within the array's slots.
pub(super) const OFFSETS_SLOT: usize = 0;
/// Index of the (optional) validity child within the array's slots.
pub(super) const VALIDITY_SLOT: usize = 1;
/// Total number of slots used by a `VarBin` array.
pub(super) const NUM_SLOTS: usize = 2;
/// Human-readable slot names, ordered by slot index.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["offsets", "validity"];
39
/// Data held by a `VarBin` array in addition to its slot children.
///
/// Only the byte buffer lives here; the offsets and validity children are stored in the
/// array's slots (see `OFFSETS_SLOT` and `VALIDITY_SLOT`).
#[derive(Clone, Debug)]
pub struct VarBinData {
    /// Buffer that the offsets index into; element `i` occupies the byte range between
    /// offset `i` and offset `i + 1`.
    pub(super) bytes: BufferHandle,
}
44
45impl Display for VarBinData {
46 fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
47 Ok(())
48 }
49}
50
/// The owned components of a `VarBin` array, as produced by `Array::<VarBin>::into_data_parts`.
pub struct VarBinDataParts {
    /// Logical type of the array (`Utf8` or `Binary`).
    pub dtype: DType,
    /// Buffer that the offsets index into.
    pub bytes: BufferHandle,
    /// Offsets child array delimiting each element within `bytes`.
    pub offsets: ArrayRef,
    /// Validity of the array's elements.
    pub validity: Validity,
}
57
58impl VarBinData {
59 pub fn build(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
66 Self::try_build(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
67 }
68
69 pub fn build_from_handle(
76 offset: ArrayRef,
77 bytes: BufferHandle,
78 dtype: DType,
79 validity: Validity,
80 ) -> Self {
81 Self::try_build_from_handle(offset, bytes, dtype, validity).vortex_expect("VarBinArray new")
82 }
83
84 pub(crate) fn make_slots(
85 offsets: ArrayRef,
86 validity: &Validity,
87 len: usize,
88 ) -> Vec<Option<ArrayRef>> {
89 vec![Some(offsets), validity_to_child(validity, len)]
90 }
91
92 pub fn try_build(
101 offsets: ArrayRef,
102 bytes: ByteBuffer,
103 dtype: DType,
104 validity: Validity,
105 ) -> VortexResult<Self> {
106 let bytes = BufferHandle::new_host(bytes);
107 Self::validate(&offsets, &bytes, &dtype, &validity)?;
108
109 Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
111 }
112
113 pub fn try_build_from_handle(
123 offsets: ArrayRef,
124 bytes: BufferHandle,
125 dtype: DType,
126 validity: Validity,
127 ) -> VortexResult<Self> {
128 Self::validate(&offsets, &bytes, &dtype, &validity)?;
129
130 Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
132 }
133
134 pub unsafe fn new_unchecked(bytes: ByteBuffer) -> Self {
163 unsafe { Self::new_unchecked_from_handle(BufferHandle::new_host(bytes)) }
166 }
167
168 pub unsafe fn new_unchecked_from_handle(bytes: BufferHandle) -> Self {
175 Self { bytes }
176 }
177
178 pub fn validate(
182 offsets: &ArrayRef,
183 bytes: &BufferHandle,
184 dtype: &DType,
185 validity: &Validity,
186 ) -> VortexResult<()> {
187 vortex_ensure!(
189 offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
190 MismatchedTypes: "non nullable int", offsets.dtype()
191 );
192
193 vortex_ensure!(
195 matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
196 MismatchedTypes: "utf8 or binary", dtype
197 );
198
199 vortex_ensure!(
201 dtype.is_nullable() != matches!(validity, Validity::NonNullable),
202 InvalidArgument: "incorrect validity {:?} for dtype {}",
203 validity,
204 dtype
205 );
206
207 vortex_ensure!(
209 !offsets.is_empty(),
210 InvalidArgument: "Offsets must have at least one element"
211 );
212
213 if offsets.is_host() && bytes.is_on_host() {
215 let last_offset = offsets
216 .execute_scalar(
217 offsets.len() - 1,
218 &mut LEGACY_SESSION.create_execution_ctx(),
219 )?
220 .as_primitive()
221 .as_::<usize>()
222 .ok_or_else(
223 || vortex_err!(InvalidArgument: "Last offset must be convertible to usize"),
224 )?;
225 vortex_ensure!(
226 last_offset <= bytes.len(),
227 InvalidArgument: "Last offset {} exceeds bytes length {}",
228 last_offset,
229 bytes.len()
230 );
231 }
232
233 if let Some(validity_len) = validity.maybe_len() {
235 vortex_ensure!(
236 validity_len == offsets.len() - 1,
237 "Validity length {} doesn't match array length {}",
238 validity_len,
239 offsets.len() - 1
240 );
241 }
242
243 if offsets.is_host()
245 && bytes.is_on_host()
246 && matches!(dtype, DType::Utf8(_))
247 && let Some(bytes) = bytes.as_host_opt()
248 {
249 #[expect(deprecated)]
250 let primitive_offsets = offsets.to_primitive();
251 match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
252 let offsets_slice = primitive_offsets.as_slice::<O>();
253 for (i, (start, end)) in offsets_slice
254 .windows(2)
255 .map(|o| (o[0].as_(), o[1].as_()))
256 .enumerate()
257 {
258 if validity.is_null(i)? {
259 continue;
260 }
261
262 let string_bytes = &bytes.as_ref()[start..end];
263 simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
264 #[expect(clippy::unwrap_used)]
265 let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
267 vortex_err!("invalid utf-8: {err} at index {i}")
268 })?;
269 }
270 });
271 }
272
273 Ok(())
274 }
275
276 #[inline]
284 pub fn bytes(&self) -> &ByteBuffer {
285 self.bytes.as_host()
286 }
287
288 #[inline]
290 pub fn bytes_handle(&self) -> &BufferHandle {
291 &self.bytes
292 }
293}
294
295pub trait VarBinArrayExt: TypedArrayRef<VarBin> {
296 fn offsets(&self) -> &ArrayRef {
297 self.as_ref().slots()[OFFSETS_SLOT]
298 .as_ref()
299 .vortex_expect("VarBinArray offsets slot")
300 }
301
302 fn validity_child(&self) -> Option<&ArrayRef> {
303 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
304 }
305
306 fn dtype_parts(&self) -> (bool, Nullability) {
307 match self.as_ref().dtype() {
308 DType::Utf8(nullability) => (true, *nullability),
309 DType::Binary(nullability) => (false, *nullability),
310 _ => unreachable!("VarBinArrayExt requires a utf8 or binary dtype"),
311 }
312 }
313
314 fn is_utf8(&self) -> bool {
315 self.dtype_parts().0
316 }
317
318 fn nullability(&self) -> Nullability {
319 self.dtype_parts().1
320 }
321
322 fn varbin_validity(&self) -> Validity {
323 child_to_validity(
324 self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
325 self.nullability(),
326 )
327 }
328
329 fn offset_at(&self, index: usize) -> usize {
330 assert!(
331 index <= self.as_ref().len(),
332 "Index {index} out of bounds 0..={}",
333 self.as_ref().len()
334 );
335
336 (&self
337 .offsets()
338 .execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
339 .vortex_expect("offsets must support execute_scalar"))
340 .try_into()
341 .vortex_expect("Failed to convert offset to usize")
342 }
343
344 fn bytes_at(&self, index: usize) -> ByteBuffer {
345 let start = self.offset_at(index);
346 let end = self.offset_at(index + 1);
347 self.bytes().slice(start..end)
348 }
349
350 fn sliced_bytes(&self) -> ByteBuffer {
351 let first_offset: usize = self.offset_at(0);
352 let last_offset = self.offset_at(self.as_ref().len());
353 self.bytes().slice(first_offset..last_offset)
354 }
355}
356impl<T: TypedArrayRef<VarBin>> VarBinArrayExt for T {}
357
358impl Array<VarBin> {
360 pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
361 let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
362 if size < u32::MAX as usize {
363 Self::from_vec_sized::<u32, T>(vec, dtype)
364 } else {
365 Self::from_vec_sized::<u64, T>(vec, dtype)
366 }
367 }
368
369 #[expect(
370 clippy::same_name_method,
371 reason = "intentionally named from_iter like Iterator::from_iter"
372 )]
373 pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
374 iter: I,
375 dtype: DType,
376 ) -> Self {
377 let iter = iter.into_iter();
378 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
379 for v in iter {
380 builder.append(v.as_ref().map(|o| o.as_ref()));
381 }
382 builder.finish(dtype)
383 }
384
385 pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
386 iter: I,
387 dtype: DType,
388 ) -> Self {
389 let iter = iter.into_iter();
390 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
391 for v in iter {
392 builder.append_value(v);
393 }
394 builder.finish(dtype)
395 }
396
397 fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
398 where
399 O: IntegerPType,
400 T: AsRef<[u8]>,
401 {
402 let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
403 for v in vec {
404 builder.append_value(v.as_ref());
405 }
406 builder.finish(dtype)
407 }
408
409 pub fn from_strs(value: Vec<&str>) -> Self {
411 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
412 }
413
414 pub fn from_nullable_strs(value: Vec<Option<&str>>) -> Self {
416 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
417 }
418
419 pub fn from_bytes(value: Vec<&[u8]>) -> Self {
421 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
422 }
423
424 pub fn from_nullable_bytes(value: Vec<Option<&[u8]>>) -> Self {
426 Self::from_iter(value, DType::Binary(Nullability::Nullable))
427 }
428
429 pub fn into_data_parts(self) -> VarBinDataParts {
430 let dtype = self.dtype().clone();
431 let validity = self.varbin_validity();
432 let offsets = self.offsets().clone();
433 let data = self.into_data();
434 VarBinDataParts {
435 dtype,
436 bytes: data.bytes,
437 offsets,
438 validity,
439 }
440 }
441}
442
443impl Array<VarBin> {
444 pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
446 let len = offsets.len().saturating_sub(1);
447 let slots = VarBinData::make_slots(offsets, &validity, len);
448 let data = VarBinData::build(
449 slots[OFFSETS_SLOT]
450 .as_ref()
451 .vortex_expect("VarBinArray offsets slot")
452 .clone(),
453 bytes,
454 dtype.clone(),
455 validity,
456 );
457 unsafe {
458 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
459 }
460 }
461
462 pub unsafe fn new_unchecked(
468 offsets: ArrayRef,
469 bytes: ByteBuffer,
470 dtype: DType,
471 validity: Validity,
472 ) -> Self {
473 let len = offsets.len().saturating_sub(1);
474 let slots = VarBinData::make_slots(offsets, &validity, len);
475 let data = unsafe { VarBinData::new_unchecked(bytes) };
476 unsafe {
477 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
478 }
479 }
480
481 pub unsafe fn new_unchecked_from_handle(
487 offsets: ArrayRef,
488 bytes: BufferHandle,
489 dtype: DType,
490 validity: Validity,
491 ) -> Self {
492 let len = offsets.len().saturating_sub(1);
493 let slots = VarBinData::make_slots(offsets, &validity, len);
494 let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
495 unsafe {
496 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
497 }
498 }
499
500 pub fn try_new(
502 offsets: ArrayRef,
503 bytes: ByteBuffer,
504 dtype: DType,
505 validity: Validity,
506 ) -> VortexResult<Self> {
507 let len = offsets.len() - 1;
508 let bytes = BufferHandle::new_host(bytes);
509 VarBinData::validate(&offsets, &bytes, &dtype, &validity)?;
510 let slots = VarBinData::make_slots(offsets, &validity, len);
511 let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
513 Ok(unsafe {
514 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
515 })
516 }
517}
518
519impl From<Vec<&[u8]>> for Array<VarBin> {
520 fn from(value: Vec<&[u8]>) -> Self {
521 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
522 }
523}
524
525impl From<Vec<Vec<u8>>> for Array<VarBin> {
526 fn from(value: Vec<Vec<u8>>) -> Self {
527 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
528 }
529}
530
531impl From<Vec<String>> for Array<VarBin> {
532 fn from(value: Vec<String>) -> Self {
533 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
534 }
535}
536
537impl From<Vec<&str>> for Array<VarBin> {
538 fn from(value: Vec<&str>) -> Self {
539 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
540 }
541}
542
543impl From<Vec<Option<&[u8]>>> for Array<VarBin> {
544 fn from(value: Vec<Option<&[u8]>>) -> Self {
545 Self::from_iter(value, DType::Binary(Nullability::Nullable))
546 }
547}
548
549impl From<Vec<Option<Vec<u8>>>> for Array<VarBin> {
550 fn from(value: Vec<Option<Vec<u8>>>) -> Self {
551 Self::from_iter(value, DType::Binary(Nullability::Nullable))
552 }
553}
554
555impl From<Vec<Option<String>>> for Array<VarBin> {
556 fn from(value: Vec<Option<String>>) -> Self {
557 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
558 }
559}
560
561impl From<Vec<Option<&str>>> for Array<VarBin> {
562 fn from(value: Vec<Option<&str>>) -> Self {
563 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
564 }
565}
566
567impl<'a> FromIterator<Option<&'a [u8]>> for Array<VarBin> {
568 fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
569 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
570 }
571}
572
573impl FromIterator<Option<Vec<u8>>> for Array<VarBin> {
574 fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
575 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
576 }
577}
578
579impl FromIterator<Option<String>> for Array<VarBin> {
580 fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
581 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
582 }
583}
584
585impl<'a> FromIterator<Option<&'a str>> for Array<VarBin> {
586 fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
587 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
588 }
589}