use std::fmt::Display;
use std::fmt::Formatter;
use std::iter;
use vortex_buffer::Alignment;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use crate::LEGACY_SESSION;
use crate::ToCanonical;
use crate::VortexSessionExecute;
use crate::array::Array;
use crate::array::ArrayParts;
use crate::array::TypedArrayRef;
use crate::arrays::Primitive;
use crate::arrays::PrimitiveArray;
use crate::dtype::DType;
use crate::dtype::NativePType;
use crate::dtype::Nullability;
use crate::dtype::PType;
use crate::match_each_native_ptype;
use crate::validity::Validity;
mod accessor;
mod cast;
mod conversion;
mod patch;
mod top_value;
pub use patch::chunk_range;
pub use patch::patch_chunk;
use crate::ArrayRef;
use crate::aggregate_fn::fns::min_max::min_max;
use crate::array::child_to_validity;
use crate::array::validity_to_child;
use crate::arrays::bool::BoolArrayExt;
use crate::buffer::BufferHandle;
use crate::builtins::ArrayBuiltins;
/// Position of the validity child within a primitive array's slot vector.
pub(super) const VALIDITY_SLOT: usize = 0;
/// Number of child slots carried by a primitive array (only the validity slot).
pub(super) const NUM_SLOTS: usize = 1;
/// Name of each slot, indexed by slot position.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
/// Encoding-specific payload of a primitive array: the element type plus the raw value bytes.
///
/// Validity is not stored here. The array constructors in this file place it in the array's
/// validity slot via `PrimitiveData::make_slots`.
#[derive(Clone, Debug)]
pub struct PrimitiveData {
    /// Element type of the values held in `buffer`.
    pub(super) ptype: PType,
    /// Raw value bytes. The element count is derived as `buffer.len() / ptype.byte_width()`.
    /// NOTE(review): may not be host-resident; accessors go through `to_host_sync`.
    pub(super) buffer: BufferHandle,
}
impl Display for PrimitiveData {
    /// Renders the data as `ptype: <element type>`; the buffer contents are not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let element_type = self.ptype;
        write!(f, "ptype: {}", element_type)
    }
}
/// Owned decomposition of a primitive array, as produced by `Array::<Primitive>::into_data_parts`.
pub struct PrimitiveDataParts {
    /// Element type of the values.
    pub ptype: PType,
    /// Handle to the raw value bytes.
    pub buffer: BufferHandle,
    /// Validity reconstructed from the array's validity slot.
    pub validity: Validity,
}
pub trait PrimitiveArrayExt: TypedArrayRef<Primitive> {
fn ptype(&self) -> PType {
match self.as_ref().dtype() {
DType::Primitive(ptype, _) => *ptype,
_ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
}
}
fn nullability(&self) -> Nullability {
match self.as_ref().dtype() {
DType::Primitive(_, nullability) => *nullability,
_ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
}
}
fn validity_child(&self) -> Option<&ArrayRef> {
self.as_ref().slots()[VALIDITY_SLOT].as_ref()
}
fn validity(&self) -> Validity {
child_to_validity(&self.as_ref().slots()[VALIDITY_SLOT], self.nullability())
}
fn validity_mask(&self) -> vortex_mask::Mask {
self.validity().to_mask(self.as_ref().len())
}
fn buffer_handle(&self) -> &BufferHandle {
&self.buffer
}
fn reinterpret_cast(&self, ptype: PType) -> PrimitiveArray {
if self.ptype() == ptype {
return self.to_owned();
}
assert_eq!(
self.ptype().byte_width(),
ptype.byte_width(),
"can't reinterpret cast between integers of two different widths"
);
PrimitiveArray::from_buffer_handle(self.buffer_handle().clone(), ptype, self.validity())
}
fn narrow(&self) -> VortexResult<PrimitiveArray> {
if !self.ptype().is_int() {
return Ok(self.to_owned());
}
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let Some(min_max) = min_max(self.as_ref(), &mut ctx)? else {
return Ok(PrimitiveArray::new(
Buffer::<u8>::zeroed(self.len()),
self.validity(),
));
};
let Ok(min) = min_max
.min
.cast(&PType::I64.into())
.and_then(|s| i64::try_from(&s))
else {
return Ok(self.to_owned());
};
let Ok(max) = min_max
.max
.cast(&PType::I64.into())
.and_then(|s| i64::try_from(&s))
else {
return Ok(self.to_owned());
};
let nullability = self.as_ref().dtype().nullability();
if min < 0 || max < 0 {
if min >= i8::MIN as i64 && max <= i8::MAX as i64 {
return Ok(self
.as_ref()
.cast(DType::Primitive(PType::I8, nullability))?
.to_primitive());
}
if min >= i16::MIN as i64 && max <= i16::MAX as i64 {
return Ok(self
.as_ref()
.cast(DType::Primitive(PType::I16, nullability))?
.to_primitive());
}
if min >= i32::MIN as i64 && max <= i32::MAX as i64 {
return Ok(self
.as_ref()
.cast(DType::Primitive(PType::I32, nullability))?
.to_primitive());
}
} else {
if max <= u8::MAX as i64 {
return Ok(self
.as_ref()
.cast(DType::Primitive(PType::U8, nullability))?
.to_primitive());
}
if max <= u16::MAX as i64 {
return Ok(self
.as_ref()
.cast(DType::Primitive(PType::U16, nullability))?
.to_primitive());
}
if max <= u32::MAX as i64 {
return Ok(self
.as_ref()
.cast(DType::Primitive(PType::U32, nullability))?
.to_primitive());
}
}
Ok(self.to_owned())
}
}
impl<T: TypedArrayRef<Primitive>> PrimitiveArrayExt for T {}
impl PrimitiveData {
    /// Builds the slot vector for a primitive array, which holds exactly one entry: the
    /// validity child produced by `validity_to_child` for `len` elements.
    pub(super) fn make_slots(validity: &Validity, len: usize) -> Vec<Option<ArrayRef>> {
        let validity_child = validity_to_child(validity, len);
        vec![validity_child]
    }

    /// Wraps an existing buffer handle as values of `ptype` without performing any checks.
    ///
    /// `_validity` is accepted for signature parity with the array constructor and is not
    /// stored here.
    ///
    /// # Safety
    ///
    /// Nothing is validated. The caller must ensure that:
    /// - `handle` contains values of type `ptype`, i.e. its byte length is a multiple of
    ///   `ptype.byte_width()`;
    /// - `_validity`, if it has a length, agrees with the resulting element count.
    ///
    /// NOTE(review): alignment for `ptype` is presumably also the caller's responsibility.
    pub unsafe fn new_unchecked_from_handle(
        handle: BufferHandle,
        ptype: PType,
        _validity: Validity,
    ) -> Self {
        Self {
            buffer: handle,
            ptype,
        }
    }

    /// Creates primitive data from anything convertible into a typed buffer.
    ///
    /// # Panics
    ///
    /// Panics if `validity` has a length that differs from the buffer length.
    pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
        Self::try_new(buffer.into(), validity).vortex_expect("PrimitiveArray construction failed")
    }

    /// Fallible constructor that checks `buffer` against `validity` before building.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidArgument` error when the buffer and validity lengths differ.
    #[inline]
    pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
        Self::validate(&buffer, &validity)?;
        // SAFETY: `validate` just confirmed that the buffer and validity lengths agree.
        let data = unsafe { Self::new_unchecked(buffer, validity) };
        Ok(data)
    }

    /// Creates primitive data from a typed buffer, skipping validation in release builds.
    ///
    /// Debug builds still run [`Self::validate`] and panic on failure. `_validity` is only
    /// consulted for that debug check and is not stored.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `_validity`, if it has a length, matches `buffer.len()`.
    #[inline]
    pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, _validity: Validity) -> Self {
        #[cfg(debug_assertions)]
        Self::validate(&buffer, &_validity)
            .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");

        let host_bytes = buffer.into_byte_buffer();
        Self {
            ptype: T::PTYPE,
            buffer: BufferHandle::new_host(host_bytes),
        }
    }

    /// Checks that `validity`, when it carries a length, matches the number of values.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidArgument` error describing both lengths on mismatch.
    #[inline]
    pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
        match validity.maybe_len() {
            Some(len) if len != buffer.len() => Err(vortex_err!(
                InvalidArgument:
                "Buffer and validity length mismatch: buffer={}, validity={}",
                buffer.len(),
                len
            )),
            _ => Ok(()),
        }
    }

    /// Creates zero-length data of element type `T` whose validity is derived from
    /// `nullability`.
    pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
        let no_values = Buffer::<T>::empty();
        let validity: Validity = nullability.into();
        Self::new(no_values, validity)
    }
}
impl Array<Primitive> {
pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
let dtype = DType::Primitive(T::PTYPE, nullability);
let len = 0;
let data = PrimitiveData::empty::<T>(nullability);
let slots = PrimitiveData::make_slots(&Validity::from(nullability), len);
unsafe {
Array::from_parts_unchecked(
ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
)
}
}
pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
let buffer = buffer.into();
let dtype = DType::Primitive(T::PTYPE, validity.nullability());
let len = buffer.len();
let slots = PrimitiveData::make_slots(&validity, len);
let data = PrimitiveData::new(buffer, validity);
unsafe {
Array::from_parts_unchecked(
ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
)
}
}
pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
let dtype = DType::Primitive(T::PTYPE, validity.nullability());
let len = buffer.len();
let slots = PrimitiveData::make_slots(&validity, len);
let data = PrimitiveData::try_new(buffer, validity)?;
Ok(unsafe {
Array::from_parts_unchecked(
ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
)
})
}
pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
let dtype = DType::Primitive(T::PTYPE, validity.nullability());
let len = buffer.len();
let slots = PrimitiveData::make_slots(&validity, len);
let data = unsafe { PrimitiveData::new_unchecked(buffer, validity) };
unsafe {
Array::from_parts_unchecked(
ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
)
}
}
pub unsafe fn new_unchecked_from_handle(
handle: BufferHandle,
ptype: PType,
validity: Validity,
) -> Self {
let dtype = DType::Primitive(ptype, validity.nullability());
let len = handle.len() / ptype.byte_width();
let slots = PrimitiveData::make_slots(&validity, len);
let data = unsafe { PrimitiveData::new_unchecked_from_handle(handle, ptype, validity) };
unsafe {
Array::from_parts_unchecked(
ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
)
}
}
pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, validity: Validity) -> Self {
let dtype = DType::Primitive(ptype, validity.nullability());
let len = handle.len() / ptype.byte_width();
let slots = PrimitiveData::make_slots(&validity, len);
let data = PrimitiveData::from_buffer_handle(handle, ptype, validity);
Array::try_from_parts(ArrayParts::new(Primitive, dtype, len, data).with_slots(slots))
.vortex_expect("PrimitiveData is always valid")
}
pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
let dtype = DType::Primitive(ptype, validity.nullability());
let len = buffer.len() / ptype.byte_width();
let slots = PrimitiveData::make_slots(&validity, len);
let data = PrimitiveData::from_byte_buffer(buffer, ptype, validity);
unsafe {
Array::from_parts_unchecked(
ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
)
}
}
pub fn from_values_byte_buffer(
valid_elems_buffer: ByteBuffer,
ptype: PType,
validity: Validity,
n_rows: usize,
) -> Self {
let dtype = DType::Primitive(ptype, validity.nullability());
let len = n_rows;
let slots = PrimitiveData::make_slots(&validity, len);
let data =
PrimitiveData::from_values_byte_buffer(valid_elems_buffer, ptype, validity, n_rows);
unsafe {
Array::from_parts_unchecked(
ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
)
}
}
pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
PrimitiveData::validate(buffer, validity)
}
pub fn into_data_parts(self) -> PrimitiveDataParts {
let validity = PrimitiveArrayExt::validity(&self);
let ptype = PrimitiveArrayExt::ptype(&self);
let data = self.into_data();
PrimitiveDataParts {
ptype,
buffer: data.buffer,
validity,
}
}
pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<Self>
where
T: NativePType,
R: NativePType,
F: FnMut((T, bool)) -> R,
{
let validity = PrimitiveArrayExt::validity(&self);
let data = self.into_data();
let buf_iter = data.to_buffer::<T>().into_iter();
let buffer = match &validity {
Validity::NonNullable | Validity::AllValid => {
BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
}
Validity::AllInvalid => {
BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
}
Validity::Array(val) => {
let val = val.to_bool().into_bit_buffer();
BufferMut::<R>::from_iter(buf_iter.zip(val.iter()).map(f))
}
};
Ok(PrimitiveArray::new(buffer.freeze(), validity))
}
}
impl PrimitiveData {
    /// Returns the number of elements: the buffer's byte length divided by the element width.
    pub fn len(&self) -> usize {
        self.buffer.len() / self.ptype.byte_width()
    }

    /// Returns `true` if the underlying buffer contains no bytes.
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Returns the element type of the stored values.
    pub fn ptype(&self) -> PType {
        self.ptype
    }

    /// Returns the handle to the underlying value buffer.
    pub fn buffer_handle(&self) -> &BufferHandle {
        &self.buffer
    }

    /// Wraps `handle` as values of `ptype` without any validation.
    ///
    /// `_validity` is accepted for signature parity with the array constructors and is not
    /// stored here.
    pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, _validity: Validity) -> Self {
        Self {
            ptype,
            buffer: handle,
        }
    }

    /// Interprets `buffer` as values of `ptype` and validates them against `validity`.
    ///
    /// # Panics
    ///
    /// Panics if `validity` has a length that differs from the number of values.
    pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
        match_each_native_ptype!(ptype, |T| {
            Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
        })
    }

    /// Builds data for `n_rows` elements from a buffer holding only the valid values, packed
    /// back to back.
    ///
    /// How `valid_elems_buffer` is used depends on `validity`:
    /// - `AllValid` / `NonNullable`: used as-is (re-aligned to the element width).
    /// - `AllInvalid`: ignored; the result is `n_rows` zeroed elements.
    /// - `Array`: the k-th packed value is written to the k-th valid position, and null
    ///   positions are zero-filled.
    ///
    /// # Panics
    ///
    /// - For `Validity::Array`, panics if `valid_elems_buffer` holds fewer whole values than
    ///   the mask has valid positions.
    /// - Panics if the resulting length disagrees with `validity`.
    pub fn from_values_byte_buffer(
        valid_elems_buffer: ByteBuffer,
        ptype: PType,
        validity: Validity,
        n_rows: usize,
    ) -> Self {
        let byte_width = ptype.byte_width();
        let alignment = Alignment::new(byte_width);
        let buffer = match &validity {
            Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
            Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
            Validity::Array(is_valid) => {
                let bool_array = is_valid.to_bool();
                let bool_buffer = bool_array.to_bit_buffer();
                let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
                // Walk the packed values in step with the valid positions. An explicit panic
                // replaces the opaque slice-range panic that a short values buffer used to hit.
                let mut packed_values = valid_elems_buffer.chunks_exact(byte_width);
                for valid_i in bool_buffer.set_indices() {
                    let Some(value) = packed_values.next() else {
                        vortex_panic!(
                            "Values buffer of {} bytes holds fewer elements of width {} than the validity mask has valid positions",
                            valid_elems_buffer.len(),
                            byte_width
                        )
                    };
                    bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
                        .copy_from_slice(value);
                }
                bytes.freeze()
            }
        };
        Self::from_byte_buffer(buffer, ptype, validity)
    }

    /// Returns the values as a typed buffer, materialized on the host via `to_host_sync`.
    ///
    /// # Panics
    ///
    /// Panics if `T` is not the stored element type.
    pub fn to_buffer<T: NativePType>(&self) -> Buffer<T> {
        assert_requested_ptype(T::PTYPE, self.ptype(), "buffer");
        Buffer::from_byte_buffer(self.buffer_handle().to_host_sync())
    }

    /// Consumes the data and returns the values as a typed host buffer.
    ///
    /// # Panics
    ///
    /// Panics if `T` is not the stored element type.
    pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
        assert_requested_ptype(T::PTYPE, self.ptype(), "buffer");
        Buffer::from_byte_buffer(self.buffer.into_host_sync())
    }

    /// Consumes the data and returns a mutable typed buffer.
    ///
    /// The buffer is reused in place when possible; otherwise the values are copied.
    ///
    /// # Panics
    ///
    /// Panics if `T` is not the stored element type.
    pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
        self.try_into_buffer_mut()
            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
    }

    /// Consumes the data and tries to convert the values into a mutable buffer without
    /// copying.
    ///
    /// On failure, the immutable typed buffer is returned in `Err`.
    ///
    /// # Panics
    ///
    /// Panics if `T` is not the stored element type.
    pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, Buffer<T>> {
        assert_requested_ptype(T::PTYPE, self.ptype(), "buffer_mut");
        let buffer = Buffer::<T>::from_byte_buffer(self.buffer.into_host_sync());
        buffer.try_into_mut()
    }
}

/// Panics unless the element type requested by a typed accessor equals the stored one.
///
/// `what` names the accessor (e.g. `"buffer"` or `"buffer_mut"`) in the panic message.
/// `#[track_caller]` keeps the reported panic location at the calling accessor.
#[track_caller]
fn assert_requested_ptype(requested: PType, actual: PType, what: &str) {
    if requested != actual {
        vortex_panic!(
            "Attempted to get {} of type {} from array of type {}",
            what,
            requested,
            actual
        )
    }
}