vortex_array/arrays/varbin/
array.rs1use std::fmt::Display;
5use std::fmt::Formatter;
6
7use num_traits::AsPrimitive;
8use smallvec::smallvec;
9use vortex_buffer::ByteBuffer;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_ensure;
13use vortex_error::vortex_err;
14
15use crate::ArrayRef;
16use crate::ArraySlots;
17use crate::LEGACY_SESSION;
18#[expect(deprecated)]
19use crate::ToCanonical as _;
20use crate::VortexSessionExecute;
21use crate::array::Array;
22use crate::array::ArrayParts;
23use crate::array::TypedArrayRef;
24use crate::array::child_to_validity;
25use crate::array::validity_to_child;
26use crate::arrays::VarBin;
27use crate::arrays::varbin::builder::VarBinBuilder;
28use crate::buffer::BufferHandle;
29use crate::dtype::DType;
30use crate::dtype::IntegerPType;
31use crate::dtype::Nullability;
32use crate::match_each_integer_ptype;
33use crate::validity::Validity;
34
/// Index of the offsets child within the array's slots.
pub(super) const OFFSETS_SLOT: usize = 0;
/// Index of the (optional) validity child within the array's slots.
pub(super) const VALIDITY_SLOT: usize = 1;
/// Total number of slots a VarBin array carries.
pub(super) const NUM_SLOTS: usize = 2;
/// Names of the slots, positioned to match `OFFSETS_SLOT` and `VALIDITY_SLOT`.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["offsets", "validity"];
41
/// Data owned directly by a VarBin array.
///
/// Only the bytes buffer is stored here; the offsets and validity children are kept in the
/// array's slots (see `OFFSETS_SLOT` and `VALIDITY_SLOT`).
#[derive(Clone, Debug)]
pub struct VarBinData {
    /// Handle to the buffer holding the bytes of all values.
    pub(super) bytes: BufferHandle,
}
46
47impl Display for VarBinData {
48 fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
49 Ok(())
50 }
51}
52
/// The owned components of a VarBin array, as produced by `Array::<VarBin>::into_data_parts`.
pub struct VarBinDataParts {
    /// Logical type of the array.
    pub dtype: DType,
    /// Handle to the buffer holding the bytes of all values.
    pub bytes: BufferHandle,
    /// Offsets child array.
    pub offsets: ArrayRef,
    /// Validity of the array.
    pub validity: Validity,
}
59
60impl VarBinData {
61 pub fn build(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
68 Self::try_build(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
69 }
70
71 pub fn build_from_handle(
78 offset: ArrayRef,
79 bytes: BufferHandle,
80 dtype: DType,
81 validity: Validity,
82 ) -> Self {
83 Self::try_build_from_handle(offset, bytes, dtype, validity).vortex_expect("VarBinArray new")
84 }
85
86 pub(crate) fn make_slots(offsets: ArrayRef, validity: &Validity, len: usize) -> ArraySlots {
87 smallvec![Some(offsets), validity_to_child(validity, len)]
88 }
89
90 pub fn try_build(
99 offsets: ArrayRef,
100 bytes: ByteBuffer,
101 dtype: DType,
102 validity: Validity,
103 ) -> VortexResult<Self> {
104 let bytes = BufferHandle::new_host(bytes);
105 Self::validate(&offsets, &bytes, &dtype, &validity)?;
106
107 Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
109 }
110
111 pub fn try_build_from_handle(
121 offsets: ArrayRef,
122 bytes: BufferHandle,
123 dtype: DType,
124 validity: Validity,
125 ) -> VortexResult<Self> {
126 Self::validate(&offsets, &bytes, &dtype, &validity)?;
127
128 Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
130 }
131
132 pub unsafe fn new_unchecked(bytes: ByteBuffer) -> Self {
161 unsafe { Self::new_unchecked_from_handle(BufferHandle::new_host(bytes)) }
164 }
165
166 pub unsafe fn new_unchecked_from_handle(bytes: BufferHandle) -> Self {
173 Self { bytes }
174 }
175
176 pub fn validate(
180 offsets: &ArrayRef,
181 bytes: &BufferHandle,
182 dtype: &DType,
183 validity: &Validity,
184 ) -> VortexResult<()> {
185 vortex_ensure!(
187 offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
188 MismatchedTypes: "non nullable int", offsets.dtype()
189 );
190
191 vortex_ensure!(
193 matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
194 MismatchedTypes: "utf8 or binary", dtype
195 );
196
197 vortex_ensure!(
199 dtype.is_nullable() != matches!(validity, Validity::NonNullable),
200 InvalidArgument: "incorrect validity {:?} for dtype {}",
201 validity,
202 dtype
203 );
204
205 vortex_ensure!(
207 !offsets.is_empty(),
208 InvalidArgument: "Offsets must have at least one element"
209 );
210
211 if offsets.is_host() && bytes.is_on_host() {
213 let last_offset = offsets
214 .execute_scalar(
215 offsets.len() - 1,
216 &mut LEGACY_SESSION.create_execution_ctx(),
217 )?
218 .as_primitive()
219 .as_::<usize>()
220 .ok_or_else(
221 || vortex_err!(InvalidArgument: "Last offset must be convertible to usize"),
222 )?;
223 vortex_ensure!(
224 last_offset <= bytes.len(),
225 InvalidArgument: "Last offset {} exceeds bytes length {}",
226 last_offset,
227 bytes.len()
228 );
229 }
230
231 if let Some(validity_len) = validity.maybe_len() {
233 vortex_ensure!(
234 validity_len == offsets.len() - 1,
235 "Validity length {} doesn't match array length {}",
236 validity_len,
237 offsets.len() - 1
238 );
239 }
240
241 if offsets.is_host()
243 && bytes.is_on_host()
244 && matches!(dtype, DType::Utf8(_))
245 && let Some(bytes) = bytes.as_host_opt()
246 {
247 #[expect(deprecated)]
248 let primitive_offsets = offsets.to_primitive();
249 match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
250 let offsets_slice = primitive_offsets.as_slice::<O>();
251 for (i, (start, end)) in offsets_slice
252 .windows(2)
253 .map(|o| (o[0].as_(), o[1].as_()))
254 .enumerate()
255 {
256 if validity.is_null(i)? {
257 continue;
258 }
259
260 let string_bytes = &bytes.as_ref()[start..end];
261 simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
262 #[expect(clippy::unwrap_used)]
263 let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
265 vortex_err!("invalid utf-8: {err} at index {i}")
266 })?;
267 }
268 });
269 }
270
271 Ok(())
272 }
273
274 #[inline]
282 pub fn bytes(&self) -> &ByteBuffer {
283 self.bytes.as_host()
284 }
285
286 #[inline]
288 pub fn bytes_handle(&self) -> &BufferHandle {
289 &self.bytes
290 }
291}
292
293pub trait VarBinArrayExt: TypedArrayRef<VarBin> {
294 fn offsets(&self) -> &ArrayRef {
295 self.as_ref().slots()[OFFSETS_SLOT]
296 .as_ref()
297 .vortex_expect("VarBinArray offsets slot")
298 }
299
300 fn validity_child(&self) -> Option<&ArrayRef> {
301 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
302 }
303
304 fn dtype_parts(&self) -> (bool, Nullability) {
305 match self.as_ref().dtype() {
306 DType::Utf8(nullability) => (true, *nullability),
307 DType::Binary(nullability) => (false, *nullability),
308 _ => unreachable!("VarBinArrayExt requires a utf8 or binary dtype"),
309 }
310 }
311
312 fn is_utf8(&self) -> bool {
313 self.dtype_parts().0
314 }
315
316 fn nullability(&self) -> Nullability {
317 self.dtype_parts().1
318 }
319
320 fn varbin_validity(&self) -> Validity {
321 child_to_validity(
322 self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
323 self.nullability(),
324 )
325 }
326
327 fn offset_at(&self, index: usize) -> usize {
328 assert!(
329 index <= self.as_ref().len(),
330 "Index {index} out of bounds 0..={}",
331 self.as_ref().len()
332 );
333
334 (&self
335 .offsets()
336 .execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
337 .vortex_expect("offsets must support execute_scalar"))
338 .try_into()
339 .vortex_expect("Failed to convert offset to usize")
340 }
341
342 fn bytes_at(&self, index: usize) -> ByteBuffer {
343 let start = self.offset_at(index);
344 let end = self.offset_at(index + 1);
345 self.bytes().slice(start..end)
346 }
347
348 fn sliced_bytes(&self) -> ByteBuffer {
349 let first_offset: usize = self.offset_at(0);
350 let last_offset = self.offset_at(self.as_ref().len());
351 self.bytes().slice(first_offset..last_offset)
352 }
353}
// Blanket impl: every `TypedArrayRef<VarBin>` gets the `VarBinArrayExt` accessors.
impl<T: TypedArrayRef<VarBin>> VarBinArrayExt for T {}
355
356impl Array<VarBin> {
358 pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
359 let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
360 if size < u32::MAX as usize {
361 Self::from_vec_sized::<u32, T>(vec, dtype)
362 } else {
363 Self::from_vec_sized::<u64, T>(vec, dtype)
364 }
365 }
366
367 #[expect(
368 clippy::same_name_method,
369 reason = "intentionally named from_iter like Iterator::from_iter"
370 )]
371 pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
372 iter: I,
373 dtype: DType,
374 ) -> Self {
375 let iter = iter.into_iter();
376 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
377 for v in iter {
378 builder.append(v.as_ref().map(|o| o.as_ref()));
379 }
380 builder.finish(dtype)
381 }
382
383 pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
384 iter: I,
385 dtype: DType,
386 ) -> Self {
387 let iter = iter.into_iter();
388 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
389 for v in iter {
390 builder.append_value(v);
391 }
392 builder.finish(dtype)
393 }
394
395 fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
396 where
397 O: IntegerPType,
398 T: AsRef<[u8]>,
399 {
400 let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
401 for v in vec {
402 builder.append_value(v.as_ref());
403 }
404 builder.finish(dtype)
405 }
406
407 pub fn from_strs(value: Vec<&str>) -> Self {
409 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
410 }
411
412 pub fn from_nullable_strs(value: Vec<Option<&str>>) -> Self {
414 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
415 }
416
417 pub fn from_bytes(value: Vec<&[u8]>) -> Self {
419 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
420 }
421
422 pub fn from_nullable_bytes(value: Vec<Option<&[u8]>>) -> Self {
424 Self::from_iter(value, DType::Binary(Nullability::Nullable))
425 }
426
427 pub fn into_data_parts(self) -> VarBinDataParts {
428 let dtype = self.dtype().clone();
429 let validity = self.varbin_validity();
430 let offsets = self.offsets().clone();
431 let data = self.into_data();
432 VarBinDataParts {
433 dtype,
434 bytes: data.bytes,
435 offsets,
436 validity,
437 }
438 }
439}
440
441impl Array<VarBin> {
442 pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
444 let len = offsets.len().saturating_sub(1);
445 let slots = VarBinData::make_slots(offsets, &validity, len);
446 let data = VarBinData::build(
447 slots[OFFSETS_SLOT]
448 .as_ref()
449 .vortex_expect("VarBinArray offsets slot")
450 .clone(),
451 bytes,
452 dtype.clone(),
453 validity,
454 );
455 unsafe {
456 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
457 }
458 }
459
460 pub unsafe fn new_unchecked(
466 offsets: ArrayRef,
467 bytes: ByteBuffer,
468 dtype: DType,
469 validity: Validity,
470 ) -> Self {
471 let len = offsets.len().saturating_sub(1);
472 let slots = VarBinData::make_slots(offsets, &validity, len);
473 let data = unsafe { VarBinData::new_unchecked(bytes) };
474 unsafe {
475 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
476 }
477 }
478
479 pub unsafe fn new_unchecked_from_handle(
485 offsets: ArrayRef,
486 bytes: BufferHandle,
487 dtype: DType,
488 validity: Validity,
489 ) -> Self {
490 let len = offsets.len().saturating_sub(1);
491 let slots = VarBinData::make_slots(offsets, &validity, len);
492 let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
493 unsafe {
494 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
495 }
496 }
497
498 pub fn try_new(
500 offsets: ArrayRef,
501 bytes: ByteBuffer,
502 dtype: DType,
503 validity: Validity,
504 ) -> VortexResult<Self> {
505 let len = offsets.len() - 1;
506 let bytes = BufferHandle::new_host(bytes);
507 VarBinData::validate(&offsets, &bytes, &dtype, &validity)?;
508 let slots = VarBinData::make_slots(offsets, &validity, len);
509 let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
511 Ok(unsafe {
512 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
513 })
514 }
515}
516
517impl From<Vec<&[u8]>> for Array<VarBin> {
518 fn from(value: Vec<&[u8]>) -> Self {
519 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
520 }
521}
522
523impl From<Vec<Vec<u8>>> for Array<VarBin> {
524 fn from(value: Vec<Vec<u8>>) -> Self {
525 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
526 }
527}
528
529impl From<Vec<String>> for Array<VarBin> {
530 fn from(value: Vec<String>) -> Self {
531 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
532 }
533}
534
535impl From<Vec<&str>> for Array<VarBin> {
536 fn from(value: Vec<&str>) -> Self {
537 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
538 }
539}
540
541impl From<Vec<Option<&[u8]>>> for Array<VarBin> {
542 fn from(value: Vec<Option<&[u8]>>) -> Self {
543 Self::from_iter(value, DType::Binary(Nullability::Nullable))
544 }
545}
546
547impl From<Vec<Option<Vec<u8>>>> for Array<VarBin> {
548 fn from(value: Vec<Option<Vec<u8>>>) -> Self {
549 Self::from_iter(value, DType::Binary(Nullability::Nullable))
550 }
551}
552
553impl From<Vec<Option<String>>> for Array<VarBin> {
554 fn from(value: Vec<Option<String>>) -> Self {
555 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
556 }
557}
558
559impl From<Vec<Option<&str>>> for Array<VarBin> {
560 fn from(value: Vec<Option<&str>>) -> Self {
561 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
562 }
563}
564
565impl<'a> FromIterator<Option<&'a [u8]>> for Array<VarBin> {
566 fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
567 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
568 }
569}
570
571impl FromIterator<Option<Vec<u8>>> for Array<VarBin> {
572 fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
573 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
574 }
575}
576
577impl FromIterator<Option<String>> for Array<VarBin> {
578 fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
579 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
580 }
581}
582
583impl<'a> FromIterator<Option<&'a str>> for Array<VarBin> {
584 fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
585 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
586 }
587}