1use std::fmt::Display;
5use std::fmt::Formatter;
6
7use num_traits::AsPrimitive;
8use smallvec::smallvec;
9use vortex_array::arrays::PrimitiveArray;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_ensure;
14use vortex_error::vortex_err;
15
16use crate::ArrayRef;
17use crate::ArraySlots;
18use crate::LEGACY_SESSION;
19use crate::VortexSessionExecute;
20use crate::array::Array;
21use crate::array::ArrayParts;
22use crate::array::TypedArrayRef;
23use crate::array::child_to_validity;
24use crate::array::validity_to_child;
25use crate::arrays::VarBin;
26use crate::arrays::varbin::builder::VarBinBuilder;
27use crate::buffer::BufferHandle;
28use crate::dtype::DType;
29use crate::dtype::IntegerPType;
30use crate::dtype::Nullability;
31use crate::match_each_integer_ptype;
32use crate::validity::Validity;
33
/// Index of the offsets child within the array's slots.
pub(super) const OFFSETS_SLOT: usize = 0;
/// Index of the optional validity child within the array's slots.
pub(super) const VALIDITY_SLOT: usize = 1;
/// Total number of slots a `VarBin` array carries.
pub(super) const NUM_SLOTS: usize = 2;
/// Slot names, listed in slot-index order.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["offsets", "validity"];
40
/// Encoding-specific data of a `VarBin` array: the buffer holding the bytes of all values.
///
/// Offsets and validity are not stored here; they live in the array's slots
/// (`OFFSETS_SLOT` and `VALIDITY_SLOT`).
#[derive(Clone, Debug)]
pub struct VarBinData {
    /// Handle to the buffer that the offsets index into.
    pub(super) bytes: BufferHandle,
}
45
46impl Display for VarBinData {
47 fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
48 Ok(())
49 }
50}
51
/// The owned components of a `VarBin` array, as produced by `Array::<VarBin>::into_data_parts`.
pub struct VarBinDataParts {
    /// Logical type of the array (utf8 or binary).
    pub dtype: DType,
    /// Handle to the buffer holding the bytes of all values.
    pub bytes: BufferHandle,
    /// Offsets child delimiting each value inside `bytes`.
    pub offsets: ArrayRef,
    /// Validity of the array's elements.
    pub validity: Validity,
}
58
59impl VarBinData {
60 pub fn build(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
67 Self::try_build(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
68 }
69
70 pub fn build_from_handle(
77 offset: ArrayRef,
78 bytes: BufferHandle,
79 dtype: DType,
80 validity: Validity,
81 ) -> Self {
82 Self::try_build_from_handle(offset, bytes, dtype, validity).vortex_expect("VarBinArray new")
83 }
84
85 pub(crate) fn make_slots(offsets: ArrayRef, validity: &Validity, len: usize) -> ArraySlots {
86 smallvec![Some(offsets), validity_to_child(validity, len)]
87 }
88
89 pub fn try_build(
98 offsets: ArrayRef,
99 bytes: ByteBuffer,
100 dtype: DType,
101 validity: Validity,
102 ) -> VortexResult<Self> {
103 let bytes = BufferHandle::new_host(bytes);
104 Self::validate(&offsets, &bytes, &dtype, &validity)?;
105
106 Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
108 }
109
110 pub fn try_build_from_handle(
120 offsets: ArrayRef,
121 bytes: BufferHandle,
122 dtype: DType,
123 validity: Validity,
124 ) -> VortexResult<Self> {
125 Self::validate(&offsets, &bytes, &dtype, &validity)?;
126
127 Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
129 }
130
131 pub unsafe fn new_unchecked(bytes: ByteBuffer) -> Self {
160 unsafe { Self::new_unchecked_from_handle(BufferHandle::new_host(bytes)) }
163 }
164
165 pub unsafe fn new_unchecked_from_handle(bytes: BufferHandle) -> Self {
172 Self { bytes }
173 }
174
175 pub fn validate(
179 offsets: &ArrayRef,
180 bytes: &BufferHandle,
181 dtype: &DType,
182 validity: &Validity,
183 ) -> VortexResult<()> {
184 vortex_ensure!(
186 offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
187 MismatchedTypes: "non nullable int", offsets.dtype()
188 );
189
190 vortex_ensure!(
192 matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
193 MismatchedTypes: "utf8 or binary", dtype
194 );
195
196 vortex_ensure!(
198 dtype.is_nullable() != matches!(validity, Validity::NonNullable),
199 InvalidArgument: "incorrect validity {:?} for dtype {}",
200 validity,
201 dtype
202 );
203
204 vortex_ensure!(
206 !offsets.is_empty(),
207 InvalidArgument: "Offsets must have at least one element"
208 );
209
210 if let Some(validity_len) = validity.maybe_len() {
212 vortex_ensure!(
213 validity_len == offsets.len() - 1,
214 "Validity length {} doesn't match array length {}",
215 validity_len,
216 offsets.len() - 1
217 );
218 }
219
220 if offsets.is_host()
222 && bytes.is_on_host()
223 && matches!(dtype, DType::Utf8(_))
224 && let Some(bytes) = bytes.as_host_opt()
225 {
226 Self::validate_utf8(offsets, bytes.as_ref(), validity)?;
227 }
228
229 Ok(())
230 }
231
232 fn validate_utf8(offsets: &ArrayRef, bytes: &[u8], validity: &Validity) -> VortexResult<()> {
234 let validate_at = |i: usize, start: usize, end: usize| -> VortexResult<()> {
235 let string_bytes = &bytes[start..end];
236 simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
237 #[expect(clippy::unwrap_used)]
238 let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
240 vortex_err!("invalid utf-8: {err} at index {i}")
241 })?;
242 Ok(())
243 };
244
245 let mut ctx = LEGACY_SESSION.create_execution_ctx();
246 let primitive_offsets = offsets.clone().execute::<PrimitiveArray>(&mut ctx)?;
248
249 let mask = match validity {
253 Validity::Array(_) => {
254 Some(validity.execute_mask(primitive_offsets.len().saturating_sub(1), &mut ctx)?)
255 }
256 _ => None,
257 };
258 let all_invalid = matches!(validity, Validity::AllInvalid);
259
260 match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
261 let offsets_slice = primitive_offsets.as_slice::<O>();
262
263 let last_offset: usize = offsets_slice[offsets_slice.len() - 1].as_();
264 vortex_ensure!(
265 last_offset <= bytes.len(),
266 InvalidArgument: "Last offset {} exceeds bytes length {}",
267 last_offset,
268 bytes.len()
269 );
270
271 for (i, (start, end)) in offsets_slice
272 .windows(2)
273 .map(|o| (o[0].as_(), o[1].as_()))
274 .enumerate()
275 {
276 let valid = mask.as_ref().map_or(!all_invalid, |mask| mask.value(i));
277 if valid {
278 validate_at(i, start, end)?;
279 }
280 }
281 });
282 Ok(())
283 }
284
285 #[inline]
293 pub fn bytes(&self) -> &ByteBuffer {
294 self.bytes.as_host()
295 }
296
297 #[inline]
299 pub fn bytes_handle(&self) -> &BufferHandle {
300 &self.bytes
301 }
302}
303
304pub trait VarBinArrayExt: TypedArrayRef<VarBin> {
305 fn offsets(&self) -> &ArrayRef {
306 self.as_ref().slots()[OFFSETS_SLOT]
307 .as_ref()
308 .vortex_expect("VarBinArray offsets slot")
309 }
310
311 fn validity_child(&self) -> Option<&ArrayRef> {
312 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
313 }
314
315 fn dtype_parts(&self) -> (bool, Nullability) {
316 match self.as_ref().dtype() {
317 DType::Utf8(nullability) => (true, *nullability),
318 DType::Binary(nullability) => (false, *nullability),
319 _ => unreachable!("VarBinArrayExt requires a utf8 or binary dtype"),
320 }
321 }
322
323 fn is_utf8(&self) -> bool {
324 self.dtype_parts().0
325 }
326
327 fn nullability(&self) -> Nullability {
328 self.dtype_parts().1
329 }
330
331 fn varbin_validity(&self) -> Validity {
332 child_to_validity(
333 self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
334 self.nullability(),
335 )
336 }
337
338 fn offset_at(&self, index: usize) -> usize {
339 assert!(
340 index <= self.as_ref().len(),
341 "Index {index} out of bounds 0..={}",
342 self.as_ref().len()
343 );
344
345 (&self
346 .offsets()
347 .execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
348 .vortex_expect("offsets must support execute_scalar"))
349 .try_into()
350 .vortex_expect("Failed to convert offset to usize")
351 }
352
353 fn bytes_at(&self, index: usize) -> ByteBuffer {
354 let start = self.offset_at(index);
355 let end = self.offset_at(index + 1);
356 self.bytes().slice(start..end)
357 }
358
359 fn sliced_bytes(&self) -> ByteBuffer {
360 let first_offset: usize = self.offset_at(0);
361 let last_offset = self.offset_at(self.as_ref().len());
362 self.bytes().slice(first_offset..last_offset)
363 }
364}
365impl<T: TypedArrayRef<VarBin>> VarBinArrayExt for T {}
366
367impl Array<VarBin> {
369 pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
370 let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
371 if size < u32::MAX as usize {
372 Self::from_vec_sized::<u32, T>(vec, dtype)
373 } else {
374 Self::from_vec_sized::<u64, T>(vec, dtype)
375 }
376 }
377
378 #[expect(
379 clippy::same_name_method,
380 reason = "intentionally named from_iter like Iterator::from_iter"
381 )]
382 pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
383 iter: I,
384 dtype: DType,
385 ) -> Self {
386 let iter = iter.into_iter();
387 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
388 for v in iter {
389 builder.append(v.as_ref().map(|o| o.as_ref()));
390 }
391 builder.finish(dtype)
392 }
393
394 pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
395 iter: I,
396 dtype: DType,
397 ) -> Self {
398 let iter = iter.into_iter();
399 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
400 for v in iter {
401 builder.append_value(v);
402 }
403 builder.finish(dtype)
404 }
405
406 fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
407 where
408 O: IntegerPType,
409 T: AsRef<[u8]>,
410 {
411 let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
412 for v in vec {
413 builder.append_value(v.as_ref());
414 }
415 builder.finish(dtype)
416 }
417
418 pub fn from_strs(value: Vec<&str>) -> Self {
420 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
421 }
422
423 pub fn from_nullable_strs(value: Vec<Option<&str>>) -> Self {
425 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
426 }
427
428 pub fn from_bytes(value: Vec<&[u8]>) -> Self {
430 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
431 }
432
433 pub fn from_nullable_bytes(value: Vec<Option<&[u8]>>) -> Self {
435 Self::from_iter(value, DType::Binary(Nullability::Nullable))
436 }
437
438 pub fn into_data_parts(self) -> VarBinDataParts {
439 let dtype = self.dtype().clone();
440 let validity = self.varbin_validity();
441 let offsets = self.offsets().clone();
442 let data = self.into_data();
443 VarBinDataParts {
444 dtype,
445 bytes: data.bytes,
446 offsets,
447 validity,
448 }
449 }
450}
451
452impl Array<VarBin> {
453 pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
455 let len = offsets.len().saturating_sub(1);
456 let slots = VarBinData::make_slots(offsets, &validity, len);
457 let data = VarBinData::build(
458 slots[OFFSETS_SLOT]
459 .as_ref()
460 .vortex_expect("VarBinArray offsets slot")
461 .clone(),
462 bytes,
463 dtype.clone(),
464 validity,
465 );
466 unsafe {
467 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
468 }
469 }
470
471 pub unsafe fn new_unchecked(
477 offsets: ArrayRef,
478 bytes: ByteBuffer,
479 dtype: DType,
480 validity: Validity,
481 ) -> Self {
482 let len = offsets.len().saturating_sub(1);
483 let slots = VarBinData::make_slots(offsets, &validity, len);
484 let data = unsafe { VarBinData::new_unchecked(bytes) };
485 unsafe {
486 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
487 }
488 }
489
490 pub unsafe fn new_unchecked_from_handle(
496 offsets: ArrayRef,
497 bytes: BufferHandle,
498 dtype: DType,
499 validity: Validity,
500 ) -> Self {
501 let len = offsets.len().saturating_sub(1);
502 let slots = VarBinData::make_slots(offsets, &validity, len);
503 let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
504 unsafe {
505 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
506 }
507 }
508
509 pub fn try_new(
511 offsets: ArrayRef,
512 bytes: ByteBuffer,
513 dtype: DType,
514 validity: Validity,
515 ) -> VortexResult<Self> {
516 let len = offsets.len() - 1;
517 let bytes = BufferHandle::new_host(bytes);
518 VarBinData::validate(&offsets, &bytes, &dtype, &validity)?;
519 let slots = VarBinData::make_slots(offsets, &validity, len);
520 let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
522 Ok(unsafe {
523 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
524 })
525 }
526}
527
528impl From<Vec<&[u8]>> for Array<VarBin> {
529 fn from(value: Vec<&[u8]>) -> Self {
530 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
531 }
532}
533
534impl From<Vec<Vec<u8>>> for Array<VarBin> {
535 fn from(value: Vec<Vec<u8>>) -> Self {
536 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
537 }
538}
539
540impl From<Vec<String>> for Array<VarBin> {
541 fn from(value: Vec<String>) -> Self {
542 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
543 }
544}
545
546impl From<Vec<&str>> for Array<VarBin> {
547 fn from(value: Vec<&str>) -> Self {
548 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
549 }
550}
551
552impl From<Vec<Option<&[u8]>>> for Array<VarBin> {
553 fn from(value: Vec<Option<&[u8]>>) -> Self {
554 Self::from_iter(value, DType::Binary(Nullability::Nullable))
555 }
556}
557
558impl From<Vec<Option<Vec<u8>>>> for Array<VarBin> {
559 fn from(value: Vec<Option<Vec<u8>>>) -> Self {
560 Self::from_iter(value, DType::Binary(Nullability::Nullable))
561 }
562}
563
564impl From<Vec<Option<String>>> for Array<VarBin> {
565 fn from(value: Vec<Option<String>>) -> Self {
566 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
567 }
568}
569
570impl From<Vec<Option<&str>>> for Array<VarBin> {
571 fn from(value: Vec<Option<&str>>) -> Self {
572 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
573 }
574}
575
576impl<'a> FromIterator<Option<&'a [u8]>> for Array<VarBin> {
577 fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
578 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
579 }
580}
581
582impl FromIterator<Option<Vec<u8>>> for Array<VarBin> {
583 fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
584 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
585 }
586}
587
588impl FromIterator<Option<String>> for Array<VarBin> {
589 fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
590 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
591 }
592}
593
594impl<'a> FromIterator<Option<&'a str>> for Array<VarBin> {
595 fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
596 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
597 }
598}