vortex_array/arrays/varbin/
array.rs1use std::fmt::Display;
5use std::fmt::Formatter;
6
7use num_traits::AsPrimitive;
8use vortex_buffer::ByteBuffer;
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_error::vortex_ensure;
12use vortex_error::vortex_err;
13
14use crate::ArrayRef;
15use crate::LEGACY_SESSION;
16use crate::ToCanonical;
17use crate::VortexSessionExecute;
18use crate::array::Array;
19use crate::array::ArrayParts;
20use crate::array::TypedArrayRef;
21use crate::array::child_to_validity;
22use crate::array::validity_to_child;
23use crate::arrays::VarBin;
24use crate::arrays::varbin::builder::VarBinBuilder;
25use crate::buffer::BufferHandle;
26use crate::dtype::DType;
27use crate::dtype::IntegerPType;
28use crate::dtype::Nullability;
29use crate::match_each_integer_ptype;
30use crate::validity::Validity;
31
/// Position of the offsets child in the array's slot list.
pub(super) const OFFSETS_SLOT: usize = 0;
/// Position of the (optional) validity child in the array's slot list.
pub(super) const VALIDITY_SLOT: usize = 1;
/// Number of slots a VarBin array carries.
pub(super) const NUM_SLOTS: usize = 2;
/// Names of the slots, indexed by slot position.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["offsets", "validity"];
38
/// Encoding-specific data of a VarBin array.
///
/// Only the bytes buffer lives here; the constructors in this file store the
/// offsets and validity as array slots instead.
#[derive(Clone, Debug)]
pub struct VarBinData {
    /// Buffer holding the bytes that the offsets index into.
    pub(super) bytes: BufferHandle,
}
43
44impl Display for VarBinData {
45 fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
46 Ok(())
47 }
48}
49
/// Owned components of a VarBin array, as produced by
/// `Array::<VarBin>::into_data_parts`.
pub struct VarBinDataParts {
    /// Logical type of the array.
    pub dtype: DType,
    /// Buffer holding the bytes that the offsets index into.
    pub bytes: BufferHandle,
    /// Offsets child array.
    pub offsets: ArrayRef,
    /// Validity of the array.
    pub validity: Validity,
}
56
57impl VarBinData {
58 pub fn build(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
65 Self::try_build(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
66 }
67
68 pub fn build_from_handle(
75 offset: ArrayRef,
76 bytes: BufferHandle,
77 dtype: DType,
78 validity: Validity,
79 ) -> Self {
80 Self::try_build_from_handle(offset, bytes, dtype, validity).vortex_expect("VarBinArray new")
81 }
82
83 pub(crate) fn make_slots(
84 offsets: ArrayRef,
85 validity: &Validity,
86 len: usize,
87 ) -> Vec<Option<ArrayRef>> {
88 vec![Some(offsets), validity_to_child(validity, len)]
89 }
90
91 pub fn try_build(
100 offsets: ArrayRef,
101 bytes: ByteBuffer,
102 dtype: DType,
103 validity: Validity,
104 ) -> VortexResult<Self> {
105 let bytes = BufferHandle::new_host(bytes);
106 Self::validate(&offsets, &bytes, &dtype, &validity)?;
107
108 Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
110 }
111
112 pub fn try_build_from_handle(
122 offsets: ArrayRef,
123 bytes: BufferHandle,
124 dtype: DType,
125 validity: Validity,
126 ) -> VortexResult<Self> {
127 Self::validate(&offsets, &bytes, &dtype, &validity)?;
128
129 Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
131 }
132
133 pub unsafe fn new_unchecked(bytes: ByteBuffer) -> Self {
162 unsafe { Self::new_unchecked_from_handle(BufferHandle::new_host(bytes)) }
165 }
166
167 pub unsafe fn new_unchecked_from_handle(bytes: BufferHandle) -> Self {
174 Self { bytes }
175 }
176
177 pub fn validate(
181 offsets: &ArrayRef,
182 bytes: &BufferHandle,
183 dtype: &DType,
184 validity: &Validity,
185 ) -> VortexResult<()> {
186 vortex_ensure!(
188 offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
189 MismatchedTypes: "non nullable int", offsets.dtype()
190 );
191
192 vortex_ensure!(
194 matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
195 MismatchedTypes: "utf8 or binary", dtype
196 );
197
198 vortex_ensure!(
200 dtype.is_nullable() != matches!(validity, Validity::NonNullable),
201 InvalidArgument: "incorrect validity {:?} for dtype {}",
202 validity,
203 dtype
204 );
205
206 vortex_ensure!(
208 !offsets.is_empty(),
209 InvalidArgument: "Offsets must have at least one element"
210 );
211
212 if offsets.is_host() && bytes.is_on_host() {
214 let last_offset = offsets
215 .execute_scalar(
216 offsets.len() - 1,
217 &mut LEGACY_SESSION.create_execution_ctx(),
218 )?
219 .as_primitive()
220 .as_::<usize>()
221 .ok_or_else(
222 || vortex_err!(InvalidArgument: "Last offset must be convertible to usize"),
223 )?;
224 vortex_ensure!(
225 last_offset <= bytes.len(),
226 InvalidArgument: "Last offset {} exceeds bytes length {}",
227 last_offset,
228 bytes.len()
229 );
230 }
231
232 if let Some(validity_len) = validity.maybe_len() {
234 vortex_ensure!(
235 validity_len == offsets.len() - 1,
236 "Validity length {} doesn't match array length {}",
237 validity_len,
238 offsets.len() - 1
239 );
240 }
241
242 if offsets.is_host()
244 && bytes.is_on_host()
245 && matches!(dtype, DType::Utf8(_))
246 && let Some(bytes) = bytes.as_host_opt()
247 {
248 let primitive_offsets = offsets.to_primitive();
249 match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
250 let offsets_slice = primitive_offsets.as_slice::<O>();
251 for (i, (start, end)) in offsets_slice
252 .windows(2)
253 .map(|o| (o[0].as_(), o[1].as_()))
254 .enumerate()
255 {
256 if validity.is_null(i)? {
257 continue;
258 }
259
260 let string_bytes = &bytes.as_ref()[start..end];
261 simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
262 #[expect(clippy::unwrap_used)]
263 let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
265 vortex_err!("invalid utf-8: {err} at index {i}")
266 })?;
267 }
268 });
269 }
270
271 Ok(())
272 }
273
274 #[inline]
282 pub fn bytes(&self) -> &ByteBuffer {
283 self.bytes.as_host()
284 }
285
286 #[inline]
288 pub fn bytes_handle(&self) -> &BufferHandle {
289 &self.bytes
290 }
291}
292
293pub trait VarBinArrayExt: TypedArrayRef<VarBin> {
294 fn offsets(&self) -> &ArrayRef {
295 self.as_ref().slots()[OFFSETS_SLOT]
296 .as_ref()
297 .vortex_expect("VarBinArray offsets slot")
298 }
299
300 fn validity_child(&self) -> Option<&ArrayRef> {
301 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
302 }
303
304 fn dtype_parts(&self) -> (bool, Nullability) {
305 match self.as_ref().dtype() {
306 DType::Utf8(nullability) => (true, *nullability),
307 DType::Binary(nullability) => (false, *nullability),
308 _ => unreachable!("VarBinArrayExt requires a utf8 or binary dtype"),
309 }
310 }
311
312 fn is_utf8(&self) -> bool {
313 self.dtype_parts().0
314 }
315
316 fn nullability(&self) -> Nullability {
317 self.dtype_parts().1
318 }
319
320 fn varbin_validity(&self) -> Validity {
321 child_to_validity(&self.as_ref().slots()[VALIDITY_SLOT], self.nullability())
322 }
323
324 fn offset_at(&self, index: usize) -> usize {
325 assert!(
326 index <= self.as_ref().len(),
327 "Index {index} out of bounds 0..={}",
328 self.as_ref().len()
329 );
330
331 (&self
332 .offsets()
333 .execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
334 .vortex_expect("offsets must support execute_scalar"))
335 .try_into()
336 .vortex_expect("Failed to convert offset to usize")
337 }
338
339 fn bytes_at(&self, index: usize) -> ByteBuffer {
340 let start = self.offset_at(index);
341 let end = self.offset_at(index + 1);
342 self.bytes().slice(start..end)
343 }
344
345 fn sliced_bytes(&self) -> ByteBuffer {
346 let first_offset: usize = self.offset_at(0);
347 let last_offset = self.offset_at(self.as_ref().len());
348 self.bytes().slice(first_offset..last_offset)
349 }
350}
351impl<T: TypedArrayRef<VarBin>> VarBinArrayExt for T {}
352
353impl Array<VarBin> {
355 pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
356 let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
357 if size < u32::MAX as usize {
358 Self::from_vec_sized::<u32, T>(vec, dtype)
359 } else {
360 Self::from_vec_sized::<u64, T>(vec, dtype)
361 }
362 }
363
364 #[expect(
365 clippy::same_name_method,
366 reason = "intentionally named from_iter like Iterator::from_iter"
367 )]
368 pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
369 iter: I,
370 dtype: DType,
371 ) -> Self {
372 let iter = iter.into_iter();
373 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
374 for v in iter {
375 builder.append(v.as_ref().map(|o| o.as_ref()));
376 }
377 builder.finish(dtype)
378 }
379
380 pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
381 iter: I,
382 dtype: DType,
383 ) -> Self {
384 let iter = iter.into_iter();
385 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
386 for v in iter {
387 builder.append_value(v);
388 }
389 builder.finish(dtype)
390 }
391
392 fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
393 where
394 O: IntegerPType,
395 T: AsRef<[u8]>,
396 {
397 let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
398 for v in vec {
399 builder.append_value(v.as_ref());
400 }
401 builder.finish(dtype)
402 }
403
404 pub fn from_strs(value: Vec<&str>) -> Self {
406 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
407 }
408
409 pub fn from_nullable_strs(value: Vec<Option<&str>>) -> Self {
411 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
412 }
413
414 pub fn from_bytes(value: Vec<&[u8]>) -> Self {
416 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
417 }
418
419 pub fn from_nullable_bytes(value: Vec<Option<&[u8]>>) -> Self {
421 Self::from_iter(value, DType::Binary(Nullability::Nullable))
422 }
423
424 pub fn into_data_parts(self) -> VarBinDataParts {
425 let dtype = self.dtype().clone();
426 let validity = self.varbin_validity();
427 let offsets = self.offsets().clone();
428 let data = self.into_data();
429 VarBinDataParts {
430 dtype,
431 bytes: data.bytes,
432 offsets,
433 validity,
434 }
435 }
436}
437
438impl Array<VarBin> {
439 pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
441 let len = offsets.len().saturating_sub(1);
442 let slots = VarBinData::make_slots(offsets, &validity, len);
443 let data = VarBinData::build(
444 slots[OFFSETS_SLOT]
445 .as_ref()
446 .vortex_expect("VarBinArray offsets slot")
447 .clone(),
448 bytes,
449 dtype.clone(),
450 validity,
451 );
452 unsafe {
453 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
454 }
455 }
456
457 pub unsafe fn new_unchecked(
463 offsets: ArrayRef,
464 bytes: ByteBuffer,
465 dtype: DType,
466 validity: Validity,
467 ) -> Self {
468 let len = offsets.len().saturating_sub(1);
469 let slots = VarBinData::make_slots(offsets, &validity, len);
470 let data = unsafe { VarBinData::new_unchecked(bytes) };
471 unsafe {
472 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
473 }
474 }
475
476 pub unsafe fn new_unchecked_from_handle(
482 offsets: ArrayRef,
483 bytes: BufferHandle,
484 dtype: DType,
485 validity: Validity,
486 ) -> Self {
487 let len = offsets.len().saturating_sub(1);
488 let slots = VarBinData::make_slots(offsets, &validity, len);
489 let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
490 unsafe {
491 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
492 }
493 }
494
495 pub fn try_new(
497 offsets: ArrayRef,
498 bytes: ByteBuffer,
499 dtype: DType,
500 validity: Validity,
501 ) -> VortexResult<Self> {
502 let len = offsets.len() - 1;
503 let bytes = BufferHandle::new_host(bytes);
504 VarBinData::validate(&offsets, &bytes, &dtype, &validity)?;
505 let slots = VarBinData::make_slots(offsets, &validity, len);
506 let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
508 Ok(unsafe {
509 Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
510 })
511 }
512}
513
514impl From<Vec<&[u8]>> for Array<VarBin> {
515 fn from(value: Vec<&[u8]>) -> Self {
516 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
517 }
518}
519
520impl From<Vec<Vec<u8>>> for Array<VarBin> {
521 fn from(value: Vec<Vec<u8>>) -> Self {
522 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
523 }
524}
525
526impl From<Vec<String>> for Array<VarBin> {
527 fn from(value: Vec<String>) -> Self {
528 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
529 }
530}
531
532impl From<Vec<&str>> for Array<VarBin> {
533 fn from(value: Vec<&str>) -> Self {
534 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
535 }
536}
537
538impl From<Vec<Option<&[u8]>>> for Array<VarBin> {
539 fn from(value: Vec<Option<&[u8]>>) -> Self {
540 Self::from_iter(value, DType::Binary(Nullability::Nullable))
541 }
542}
543
544impl From<Vec<Option<Vec<u8>>>> for Array<VarBin> {
545 fn from(value: Vec<Option<Vec<u8>>>) -> Self {
546 Self::from_iter(value, DType::Binary(Nullability::Nullable))
547 }
548}
549
550impl From<Vec<Option<String>>> for Array<VarBin> {
551 fn from(value: Vec<Option<String>>) -> Self {
552 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
553 }
554}
555
556impl From<Vec<Option<&str>>> for Array<VarBin> {
557 fn from(value: Vec<Option<&str>>) -> Self {
558 Self::from_iter(value, DType::Utf8(Nullability::Nullable))
559 }
560}
561
562impl<'a> FromIterator<Option<&'a [u8]>> for Array<VarBin> {
563 fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
564 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
565 }
566}
567
568impl FromIterator<Option<Vec<u8>>> for Array<VarBin> {
569 fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
570 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
571 }
572}
573
574impl FromIterator<Option<String>> for Array<VarBin> {
575 fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
576 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
577 }
578}
579
580impl<'a> FromIterator<Option<&'a str>> for Array<VarBin> {
581 fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
582 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
583 }
584}