use std::fmt::Display;
use std::fmt::Formatter;
use num_traits::AsPrimitive;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_mask::Mask;
use crate::ArrayRef;
use crate::ToCanonical;
use crate::array::Array;
use crate::array::ArrayParts;
use crate::array::TypedArrayRef;
use crate::array::child_to_validity;
use crate::array::validity_to_child;
use crate::arrays::VarBin;
use crate::arrays::varbin::builder::VarBinBuilder;
use crate::buffer::BufferHandle;
use crate::dtype::DType;
use crate::dtype::IntegerPType;
use crate::dtype::Nullability;
use crate::match_each_integer_ptype;
use crate::validity::Validity;
/// Index of the offsets child in the array's slot list.
pub(super) const OFFSETS_SLOT: usize = 0;
/// Index of the (optional) validity child in the array's slot list.
pub(super) const VALIDITY_SLOT: usize = 1;
/// Total number of slots a `VarBin` array carries.
pub(super) const NUM_SLOTS: usize = 2;
/// Names of the slots, in slot-index order.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["offsets", "validity"];
/// Data owned directly by a `VarBin` array: the values buffer.
///
/// Offsets and validity are not stored here; `VarBinData::make_slots` places
/// them in the array's slots instead.
#[derive(Clone, Debug)]
pub struct VarBinData {
/// Buffer handle holding the bytes of all values.
pub(super) bytes: BufferHandle,
}
/// `Display` implementation that writes nothing to the formatter.
impl Display for VarBinData {
/// Always succeeds without emitting any output.
// NOTE(review): the empty output looks intentional — confirm with the callers that format this type.
fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}
/// The owned components of a `VarBin` array, as produced by
/// `Array::<VarBin>::into_data_parts`.
pub struct VarBinDataParts {
/// Logical dtype of the array.
pub dtype: DType,
/// Buffer handle holding the bytes of all values.
pub bytes: BufferHandle,
/// Offsets child array.
pub offsets: ArrayRef,
/// Validity of the array's elements.
pub validity: Validity,
}
impl VarBinData {
pub fn build(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
Self::try_build(offsets, bytes, dtype, validity).vortex_expect("VarBinArray new")
}
pub fn build_from_handle(
offset: ArrayRef,
bytes: BufferHandle,
dtype: DType,
validity: Validity,
) -> Self {
Self::try_build_from_handle(offset, bytes, dtype, validity).vortex_expect("VarBinArray new")
}
pub(crate) fn make_slots(
offsets: ArrayRef,
validity: &Validity,
len: usize,
) -> Vec<Option<ArrayRef>> {
vec![Some(offsets), validity_to_child(validity, len)]
}
pub fn try_build(
offsets: ArrayRef,
bytes: ByteBuffer,
dtype: DType,
validity: Validity,
) -> VortexResult<Self> {
let bytes = BufferHandle::new_host(bytes);
Self::validate(&offsets, &bytes, &dtype, &validity)?;
Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
}
pub fn try_build_from_handle(
offsets: ArrayRef,
bytes: BufferHandle,
dtype: DType,
validity: Validity,
) -> VortexResult<Self> {
Self::validate(&offsets, &bytes, &dtype, &validity)?;
Ok(unsafe { Self::new_unchecked_from_handle(bytes) })
}
pub unsafe fn new_unchecked(bytes: ByteBuffer) -> Self {
unsafe { Self::new_unchecked_from_handle(BufferHandle::new_host(bytes)) }
}
pub unsafe fn new_unchecked_from_handle(bytes: BufferHandle) -> Self {
Self { bytes }
}
pub fn validate(
offsets: &ArrayRef,
bytes: &BufferHandle,
dtype: &DType,
validity: &Validity,
) -> VortexResult<()> {
vortex_ensure!(
offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
MismatchedTypes: "non nullable int", offsets.dtype()
);
vortex_ensure!(
matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
MismatchedTypes: "utf8 or binary", dtype
);
vortex_ensure!(
dtype.is_nullable() != matches!(validity, Validity::NonNullable),
InvalidArgument: "incorrect validity {:?} for dtype {}",
validity,
dtype
);
vortex_ensure!(
!offsets.is_empty(),
InvalidArgument: "Offsets must have at least one element"
);
if offsets.is_host() && bytes.is_on_host() {
let last_offset = offsets
.scalar_at(offsets.len() - 1)?
.as_primitive()
.as_::<usize>()
.ok_or_else(
|| vortex_err!(InvalidArgument: "Last offset must be convertible to usize"),
)?;
vortex_ensure!(
last_offset <= bytes.len(),
InvalidArgument: "Last offset {} exceeds bytes length {}",
last_offset,
bytes.len()
);
}
if let Some(validity_len) = validity.maybe_len() {
vortex_ensure!(
validity_len == offsets.len() - 1,
"Validity length {} doesn't match array length {}",
validity_len,
offsets.len() - 1
);
}
if offsets.is_host()
&& bytes.is_on_host()
&& matches!(dtype, DType::Utf8(_))
&& let Some(bytes) = bytes.as_host_opt()
{
let primitive_offsets = offsets.to_primitive();
match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
let offsets_slice = primitive_offsets.as_slice::<O>();
for (i, (start, end)) in offsets_slice
.windows(2)
.map(|o| (o[0].as_(), o[1].as_()))
.enumerate()
{
if validity.is_null(i)? {
continue;
}
let string_bytes = &bytes.as_ref()[start..end];
simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
#[allow(clippy::unwrap_used)]
let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
vortex_err!("invalid utf-8: {err} at index {i}")
})?;
}
});
}
Ok(())
}
#[inline]
pub fn bytes(&self) -> &ByteBuffer {
self.bytes.as_host()
}
#[inline]
pub fn bytes_handle(&self) -> &BufferHandle {
&self.bytes
}
}
/// Accessor methods available on every typed reference to a `VarBin` array.
pub trait VarBinArrayExt: TypedArrayRef<VarBin> {
    /// Returns the offsets child array.
    ///
    /// # Panics
    ///
    /// Panics if the offsets slot is unset.
    fn offsets(&self) -> &ArrayRef {
        let offsets_slot = &self.as_ref().slots()[OFFSETS_SLOT];
        offsets_slot
            .as_ref()
            .vortex_expect("VarBinArray offsets slot")
    }

    /// Returns the validity child array, if the validity slot holds one.
    fn validity_child(&self) -> Option<&ArrayRef> {
        let validity_slot = &self.as_ref().slots()[VALIDITY_SLOT];
        validity_slot.as_ref()
    }

    /// Splits the dtype into `(is_utf8, nullability)`.
    ///
    /// # Panics
    ///
    /// Panics if the dtype is neither `Utf8` nor `Binary`.
    fn dtype_parts(&self) -> (bool, Nullability) {
        let (utf8, nullability) = match self.as_ref().dtype() {
            DType::Binary(n) => (false, n),
            DType::Utf8(n) => (true, n),
            _ => unreachable!("VarBinArrayExt requires a utf8 or binary dtype"),
        };
        (utf8, *nullability)
    }

    /// Whether the array's dtype is `Utf8` (as opposed to `Binary`).
    fn is_utf8(&self) -> bool {
        let (utf8, _) = self.dtype_parts();
        utf8
    }

    /// The nullability carried by the array's dtype.
    fn nullability(&self) -> Nullability {
        let (_, nullability) = self.dtype_parts();
        nullability
    }

    /// Reconstructs the `Validity` from the validity slot and the dtype's nullability.
    fn varbin_validity(&self) -> Validity {
        let validity_slot = &self.as_ref().slots()[VALIDITY_SLOT];
        child_to_validity(validity_slot, self.nullability())
    }

    /// Converts the array's validity into a `Mask` sized to the array length.
    fn varbin_validity_mask(&self) -> Mask {
        let validity = self.varbin_validity();
        validity.to_mask(self.as_ref().len())
    }

    /// Reads the offset stored at `index` as a `usize`.
    ///
    /// `index` may equal the array length, which addresses the final offset.
    ///
    /// # Panics
    ///
    /// Panics if `index` is greater than the array length, if the offsets
    /// array fails `scalar_at`, or if the offset cannot be converted to `usize`.
    fn offset_at(&self, index: usize) -> usize {
        let len = self.as_ref().len();
        assert!(index <= len, "Index {index} out of bounds 0..={}", len);
        let offset_scalar = self
            .offsets()
            .scalar_at(index)
            .vortex_expect("offsets must support scalar_at");
        (&offset_scalar)
            .try_into()
            .vortex_expect("Failed to convert offset to usize")
    }

    /// Returns the bytes of the value at `index`, i.e. the range between the
    /// offsets at `index` and `index + 1`.
    fn bytes_at(&self, index: usize) -> ByteBuffer {
        let begin = self.offset_at(index);
        let finish = self.offset_at(index + 1);
        let range = begin..finish;
        self.bytes().slice(range)
    }

    /// Returns the bytes spanned by the array: from the first offset to the
    /// offset at the array length.
    fn sliced_bytes(&self) -> ByteBuffer {
        let begin: usize = self.offset_at(0);
        let finish = self.offset_at(self.as_ref().len());
        let range = begin..finish;
        self.bytes().slice(range)
    }
}
// Blanket impl: every `TypedArrayRef<VarBin>` gets the `VarBinArrayExt` accessors.
impl<T: TypedArrayRef<VarBin>> VarBinArrayExt for T {}
/// Convenience constructors that build a `VarBin` array from in-memory values,
/// plus decomposition back into parts.
impl Array<VarBin> {
    /// Builds a non-null array from `vec`, choosing `u32` offsets when the
    /// total byte length is below `u32::MAX` and `u64` offsets otherwise.
    pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
        let total_bytes = vec.iter().map(|item| item.as_ref().len()).sum::<usize>();
        let needs_wide_offsets = total_bytes >= u32::MAX as usize;
        if needs_wide_offsets {
            Self::from_vec_sized::<u64, T>(vec, dtype)
        } else {
            Self::from_vec_sized::<u32, T>(vec, dtype)
        }
    }

    /// Builds an array from an iterator of optional values (`None` is appended
    /// as-is to the builder), using `u32` offsets.
    #[expect(
        clippy::same_name_method,
        reason = "intentionally named from_iter like Iterator::from_iter"
    )]
    pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
        iter: I,
        dtype: DType,
    ) -> Self {
        let items = iter.into_iter();
        let (lower_bound, _) = items.size_hint();
        let mut builder = VarBinBuilder::<u32>::with_capacity(lower_bound);
        items.for_each(|item| {
            builder.append(item.as_ref().map(|value| value.as_ref()));
        });
        builder.finish(dtype)
    }

    /// Builds an array from an iterator of values that are all present, using
    /// `u32` offsets.
    pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
        iter: I,
        dtype: DType,
    ) -> Self {
        let items = iter.into_iter();
        let (lower_bound, _) = items.size_hint();
        let mut builder = VarBinBuilder::<u32>::with_capacity(lower_bound);
        items.for_each(|item| {
            builder.append_value(item);
        });
        builder.finish(dtype)
    }

    /// Builds an array from `vec` with offsets of integer type `O`.
    fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
    where
        O: IntegerPType,
        T: AsRef<[u8]>,
    {
        let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
        vec.into_iter().for_each(|item| {
            builder.append_value(item.as_ref());
        });
        builder.finish(dtype)
    }

    /// Builds a non-nullable `Utf8` array from string slices.
    pub fn from_strs(value: Vec<&str>) -> Self {
        let dtype = DType::Utf8(Nullability::NonNullable);
        Self::from_vec(value, dtype)
    }

    /// Builds a nullable `Utf8` array from optional string slices.
    pub fn from_nullable_strs(value: Vec<Option<&str>>) -> Self {
        let dtype = DType::Utf8(Nullability::Nullable);
        Self::from_iter(value, dtype)
    }

    /// Builds a non-nullable `Binary` array from byte slices.
    pub fn from_bytes(value: Vec<&[u8]>) -> Self {
        let dtype = DType::Binary(Nullability::NonNullable);
        Self::from_vec(value, dtype)
    }

    /// Builds a nullable `Binary` array from optional byte slices.
    pub fn from_nullable_bytes(value: Vec<Option<&[u8]>>) -> Self {
        let dtype = DType::Binary(Nullability::Nullable);
        Self::from_iter(value, dtype)
    }

    /// Consumes the array and returns its dtype, bytes, offsets and validity.
    pub fn into_data_parts(self) -> VarBinDataParts {
        // Everything borrowed from `self` is captured before `into_data` consumes it.
        let offsets = self.offsets().clone();
        let validity = self.varbin_validity();
        let dtype = self.dtype().clone();
        let bytes = self.into_data().bytes;
        VarBinDataParts {
            dtype,
            bytes,
            offsets,
            validity,
        }
    }
}
/// Constructors that assemble a `VarBin` array from offsets, bytes, dtype and validity.
impl Array<VarBin> {
    /// Creates a validated array from host bytes.
    ///
    /// The array length is `offsets.len() - 1` (saturating at zero).
    ///
    /// # Panics
    ///
    /// Panics if `VarBinData::build` rejects the components.
    pub fn new(offsets: ArrayRef, bytes: ByteBuffer, dtype: DType, validity: Validity) -> Self {
        let len = offsets.len().saturating_sub(1);
        let slots = VarBinData::make_slots(offsets, &validity, len);
        // `offsets` was moved into `slots`; clone it back out for validation.
        let data = VarBinData::build(
            slots[OFFSETS_SLOT]
                .as_ref()
                .vortex_expect("VarBinArray offsets slot")
                .clone(),
            bytes,
            dtype.clone(),
            validity,
        );
        // SAFETY: `VarBinData::build` validated the components above.
        unsafe {
            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
        }
    }

    /// Creates an array from host bytes without validating the components.
    ///
    /// # Safety
    ///
    /// No validation is performed. Callers are expected to supply components
    /// that would pass `VarBinData::validate`.
    pub unsafe fn new_unchecked(
        offsets: ArrayRef,
        bytes: ByteBuffer,
        dtype: DType,
        validity: Validity,
    ) -> Self {
        let len = offsets.len().saturating_sub(1);
        let slots = VarBinData::make_slots(offsets, &validity, len);
        // SAFETY: the obligation is forwarded unchanged to the caller.
        let data = unsafe { VarBinData::new_unchecked(bytes) };
        // SAFETY: the obligation is forwarded unchanged to the caller.
        unsafe {
            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
        }
    }

    /// Creates an array from a buffer handle without validating the components.
    ///
    /// # Safety
    ///
    /// No validation is performed. Callers are expected to supply components
    /// that would pass `VarBinData::validate`.
    pub unsafe fn new_unchecked_from_handle(
        offsets: ArrayRef,
        bytes: BufferHandle,
        dtype: DType,
        validity: Validity,
    ) -> Self {
        let len = offsets.len().saturating_sub(1);
        let slots = VarBinData::make_slots(offsets, &validity, len);
        // SAFETY: the obligation is forwarded unchanged to the caller.
        let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
        // SAFETY: the obligation is forwarded unchanged to the caller.
        unsafe {
            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
        }
    }

    /// Creates a validated array from host bytes, returning an error instead of
    /// panicking when the components are inconsistent.
    ///
    /// # Errors
    ///
    /// Returns any error reported by `VarBinData::validate`, including the
    /// case where `offsets` is empty.
    pub fn try_new(
        offsets: ArrayRef,
        bytes: ByteBuffer,
        dtype: DType,
        validity: Validity,
    ) -> VortexResult<Self> {
        let bytes = BufferHandle::new_host(bytes);
        VarBinData::validate(&offsets, &bytes, &dtype, &validity)?;
        // Compute the length only after validation: `validate` rejects empty
        // offsets, so this subtraction cannot underflow. Doing it earlier would
        // panic on empty offsets in overflow-checked builds instead of
        // returning the validation error.
        let len = offsets.len() - 1;
        let slots = VarBinData::make_slots(offsets, &validity, len);
        // SAFETY: the components were checked by `validate` above.
        let data = unsafe { VarBinData::new_unchecked_from_handle(bytes) };
        // SAFETY: the components were checked by `validate` above.
        Ok(unsafe {
            Array::from_parts_unchecked(ArrayParts::new(VarBin, dtype, len, data).with_slots(slots))
        })
    }
}
/// Converts byte slices into a non-nullable `Binary` array.
impl From<Vec<&[u8]>> for Array<VarBin> {
    fn from(value: Vec<&[u8]>) -> Self {
        let dtype = DType::Binary(Nullability::NonNullable);
        Self::from_vec(value, dtype)
    }
}
/// Converts owned byte vectors into a non-nullable `Binary` array.
impl From<Vec<Vec<u8>>> for Array<VarBin> {
    fn from(value: Vec<Vec<u8>>) -> Self {
        let dtype = DType::Binary(Nullability::NonNullable);
        Self::from_vec(value, dtype)
    }
}
/// Converts owned strings into a non-nullable `Utf8` array.
impl From<Vec<String>> for Array<VarBin> {
    fn from(value: Vec<String>) -> Self {
        let dtype = DType::Utf8(Nullability::NonNullable);
        Self::from_vec(value, dtype)
    }
}
/// Converts string slices into a non-nullable `Utf8` array.
impl From<Vec<&str>> for Array<VarBin> {
    fn from(value: Vec<&str>) -> Self {
        let dtype = DType::Utf8(Nullability::NonNullable);
        Self::from_vec(value, dtype)
    }
}
/// Converts optional byte slices into a nullable `Binary` array.
impl From<Vec<Option<&[u8]>>> for Array<VarBin> {
    fn from(value: Vec<Option<&[u8]>>) -> Self {
        let dtype = DType::Binary(Nullability::Nullable);
        Self::from_iter(value, dtype)
    }
}
/// Converts optional owned byte vectors into a nullable `Binary` array.
impl From<Vec<Option<Vec<u8>>>> for Array<VarBin> {
    fn from(value: Vec<Option<Vec<u8>>>) -> Self {
        let dtype = DType::Binary(Nullability::Nullable);
        Self::from_iter(value, dtype)
    }
}
/// Converts optional owned strings into a nullable `Utf8` array.
impl From<Vec<Option<String>>> for Array<VarBin> {
    fn from(value: Vec<Option<String>>) -> Self {
        let dtype = DType::Utf8(Nullability::Nullable);
        Self::from_iter(value, dtype)
    }
}
/// Converts optional string slices into a nullable `Utf8` array.
impl From<Vec<Option<&str>>> for Array<VarBin> {
    fn from(value: Vec<Option<&str>>) -> Self {
        let dtype = DType::Utf8(Nullability::Nullable);
        Self::from_iter(value, dtype)
    }
}
/// Collects optional byte slices into a nullable `Binary` array.
impl<'a> FromIterator<Option<&'a [u8]>> for Array<VarBin> {
    fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
        // Two-argument call: delegates to the inherent `from_iter` constructor.
        let dtype = DType::Binary(Nullability::Nullable);
        Self::from_iter(iter, dtype)
    }
}
/// Collects optional owned byte vectors into a nullable `Binary` array.
impl FromIterator<Option<Vec<u8>>> for Array<VarBin> {
    fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
        // Two-argument call: delegates to the inherent `from_iter` constructor.
        let dtype = DType::Binary(Nullability::Nullable);
        Self::from_iter(iter, dtype)
    }
}
/// Collects optional owned strings into a nullable `Utf8` array.
impl FromIterator<Option<String>> for Array<VarBin> {
    fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
        // Two-argument call: delegates to the inherent `from_iter` constructor.
        let dtype = DType::Utf8(Nullability::Nullable);
        Self::from_iter(iter, dtype)
    }
}
/// Collects optional string slices into a nullable `Utf8` array.
impl<'a> FromIterator<Option<&'a str>> for Array<VarBin> {
    fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
        // Two-argument call: delegates to the inherent `from_iter` constructor.
        let dtype = DType::Utf8(Nullability::Nullable);
        Self::from_iter(iter, dtype)
    }
}