use std::any::type_name;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::hash::Hasher;
use std::ops::Range;
use std::sync::Arc;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_mask::Mask;
use vortex_session::VortexSession;
use crate::AnyCanonical;
use crate::Array;
use crate::ArrayEq;
use crate::ArrayHash;
use crate::ArrayView;
use crate::Canonical;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::ToCanonical;
use crate::VTable;
use crate::VortexSessionExecute;
use crate::aggregate_fn::fns::sum::sum;
use crate::array::ArrayId;
use crate::array::ArrayInner;
use crate::array::DynArray;
use crate::arrays::Bool;
use crate::arrays::Constant;
use crate::arrays::DictArray;
use crate::arrays::FilterArray;
use crate::arrays::Null;
use crate::arrays::Primitive;
use crate::arrays::SliceArray;
use crate::arrays::VarBin;
use crate::arrays::VarBinView;
use crate::arrays::bool::BoolArrayExt;
use crate::buffer::BufferHandle;
use crate::builders::ArrayBuilder;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::expr::stats::Precision;
use crate::expr::stats::Stat;
use crate::expr::stats::StatsProviderExt;
use crate::matcher::Matcher;
use crate::optimizer::ArrayOptimizer;
use crate::scalar::Scalar;
use crate::stats::StatsSetRef;
use crate::validity::Validity;
/// Iterator over an array and all of its descendants in pre-order, depth-first order.
///
/// The starting array is yielded first, then each child subtree in turn, from the first
/// child to the last.
pub struct DepthFirstArrayIterator {
    // Arrays still to be visited; the last element is yielded next.
    stack: Vec<ArrayRef>,
}
impl Iterator for DepthFirstArrayIterator {
    type Item = ArrayRef;

    /// Yields the next array in pre-order and schedules its children for later visits.
    ///
    /// Returns `None` once every array in the tree has been yielded.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.stack.pop()?;
        // Children are pushed last-to-first so that the first child sits on top of the stack
        // and is therefore visited first.
        self.stack.extend(current.children().into_iter().rev());
        Some(current)
    }
}
/// A reference-counted handle to a type-erased array.
///
/// Cloning only bumps the `Arc` reference count. The underlying array is shared, not copied.
#[derive(Clone)]
pub struct ArrayRef(Arc<dyn DynArray>);
impl ArrayRef {
pub(crate) fn from_inner(inner: Arc<dyn DynArray>) -> Self {
Self(inner)
}
#[doc(hidden)]
pub fn addr(&self) -> usize {
Arc::as_ptr(&self.0).addr()
}
#[inline(always)]
pub(crate) fn inner(&self) -> &Arc<dyn DynArray> {
&self.0
}
#[inline(always)]
pub(crate) fn into_inner(self) -> Arc<dyn DynArray> {
self.0
}
pub fn ptr_eq(this: &ArrayRef, other: &ArrayRef) -> bool {
Arc::ptr_eq(&this.0, &other.0)
}
}
impl Debug for ArrayRef {
    /// Formats the wrapped array by delegating to its own `Debug` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let array = &*self.0;
        Debug::fmt(array, f)
    }
}
impl ArrayHash for ArrayRef {
    /// Hashes the wrapped array at `precision`.
    ///
    /// The generic hasher is erased to `&mut dyn Hasher` so it can be passed through the
    /// object-safe `dyn_array_hash` entry point.
    fn array_hash<H: Hasher>(&self, state: &mut H, precision: crate::Precision) {
        let erased: &mut dyn Hasher = state;
        self.0.dyn_array_hash(erased, precision);
    }
}
impl ArrayEq for ArrayRef {
    /// Compares this array with `other` at `precision` using the dynamic equality entry point.
    fn array_eq(&self, other: &Self, precision: crate::Precision) -> bool {
        let this = &self.0;
        this.dyn_array_eq(other, precision)
    }
}
// NOTE(review): the lint is silenced presumably because some of these inherent methods share
// names with trait methods implemented for `ArrayRef` elsewhere in the crate. Confirm which
// trait, and keep the names aligned deliberately.
#[allow(clippy::same_name_method)]
impl ArrayRef {
    /// Returns the number of elements in the array.
    #[inline]
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns `true` if the array has no elements.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the logical data type of the array.
    #[inline]
    pub fn dtype(&self) -> &DType {
        self.0.dtype()
    }

    /// Returns the identifier of the array's encoding.
    #[inline]
    pub fn encoding_id(&self) -> ArrayId {
        self.0.encoding_id()
    }

    /// Returns the elements in `range` as a new array.
    ///
    /// Special cases:
    /// - A full-length range returns a clone of `self`.
    /// - An empty range returns an empty canonical array of the same dtype.
    ///
    /// Otherwise the array is wrapped in a [`SliceArray`] and optimized. Unless the optimized
    /// result is a constant array, the `IsConstant`, `IsSorted` and `IsStrictSorted`
    /// statistics that are exactly `true` on `self` are copied onto the result. Any
    /// contiguous sub-range of a constant or sorted array is itself constant or sorted.
    ///
    /// # Errors
    ///
    /// Fails if `range.start` or `range.end` exceeds the length, or if
    /// `range.start > range.end`. Also fails if constructing or optimizing the slice fails.
    pub fn slice(&self, range: Range<usize>) -> VortexResult<ArrayRef> {
        let len = self.len();
        let start = range.start;
        let stop = range.end;
        if start == 0 && stop == len {
            return Ok(self.clone());
        }
        vortex_ensure!(start <= len, "OutOfBounds: start {start} > length {}", len);
        vortex_ensure!(stop <= len, "OutOfBounds: stop {stop} > length {}", len);
        vortex_ensure!(start <= stop, "start ({start}) must be <= stop ({stop})");
        if start == stop {
            return Ok(Canonical::empty(self.dtype()).into_array());
        }
        let sliced = SliceArray::try_new(self.clone(), range)?
            .into_array()
            .optimize()?;
        if !sliced.is::<Constant>() {
            self.statistics().with_iter(|iter| {
                sliced.statistics().inherit(iter.filter(|(stat, value)| {
                    matches!(
                        stat,
                        Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted
                    ) && value.as_ref().as_exact().is_some_and(|v| {
                        Scalar::try_new(DType::Bool(Nullability::NonNullable), Some(v.clone()))
                            .vortex_expect("A stat that was expected to be a boolean stat was not")
                            .as_bool()
                            .value()
                            .unwrap_or_default()
                    })
                }));
            });
        }
        Ok(sliced)
    }

    /// Returns the elements selected by `mask`.
    ///
    /// The result is a [`FilterArray`] over `self`, then optimized.
    ///
    /// # Errors
    ///
    /// Fails if the filter array cannot be constructed or optimized. NOTE(review): presumably
    /// this includes a mask whose length differs from the array's.
    pub fn filter(&self, mask: Mask) -> VortexResult<ArrayRef> {
        FilterArray::try_new(self.clone(), mask)?
            .into_array()
            .optimize()
    }

    /// Gathers the elements at `indices`.
    ///
    /// The result is a [`DictArray`] with `indices` as codes and `self` as values, then
    /// optimized.
    pub fn take(&self, indices: ArrayRef) -> VortexResult<ArrayRef> {
        DictArray::try_new(indices, self.clone())?
            .into_array()
            .optimize()
    }

    /// Returns the element at `index` as a [`Scalar`].
    ///
    /// For a null element, this returns a null scalar of the array's dtype without consulting
    /// the encoding.
    ///
    /// # Errors
    ///
    /// Fails if `index` is out of bounds or if the encoding fails. Also fails if the encoding
    /// returns a scalar whose dtype differs from the array's.
    pub fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
        vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
        if self.is_invalid(index)? {
            return Ok(Scalar::null(self.dtype().clone()));
        }
        let scalar = self.0.scalar_at(self, index)?;
        vortex_ensure!(self.dtype() == scalar.dtype(), "Scalar dtype mismatch");
        Ok(scalar)
    }

    /// Returns whether the element at `index` is non-null.
    ///
    /// # Errors
    ///
    /// Fails if `index` is out of bounds or if the validity array's entry is itself null.
    pub fn is_valid(&self, index: usize) -> VortexResult<bool> {
        vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
        match self.validity()? {
            Validity::NonNullable | Validity::AllValid => Ok(true),
            Validity::AllInvalid => Ok(false),
            Validity::Array(a) => a
                .scalar_at(index)?
                .as_bool()
                .value()
                .ok_or_else(|| vortex_err!("validity value at index {} is null", index)),
        }
    }

    /// Returns whether the element at `index` is null. This is the negation of [`Self::is_valid`].
    pub fn is_invalid(&self, index: usize) -> VortexResult<bool> {
        Ok(!self.is_valid(index)?)
    }

    /// Returns `true` if every element is known to be valid.
    ///
    /// For array-backed validity, this uses the `min` statistic of the validity array. If the
    /// statistic cannot be computed, it conservatively returns `false`.
    pub fn all_valid(&self) -> VortexResult<bool> {
        match self.validity()? {
            Validity::NonNullable | Validity::AllValid => Ok(true),
            Validity::AllInvalid => Ok(false),
            Validity::Array(a) => Ok(a.statistics().compute_min::<bool>().unwrap_or(false)),
        }
    }

    /// Returns `true` if every element is known to be null.
    ///
    /// For array-backed validity, this uses the `max` statistic of the validity array. If the
    /// statistic cannot be computed, it conservatively returns `false`.
    pub fn all_invalid(&self) -> VortexResult<bool> {
        match self.validity()? {
            Validity::NonNullable | Validity::AllValid => Ok(false),
            Validity::AllInvalid => Ok(true),
            Validity::Array(a) => Ok(!a.statistics().compute_max::<bool>().unwrap_or(true)),
        }
    }

    /// Returns the number of non-null elements.
    ///
    /// If an exact `NullCount` statistic is cached, it is used directly. Otherwise the count
    /// is derived from the validity, by summing the validity array when there is one. The
    /// result is then cached as an exact `NullCount`.
    ///
    /// # Errors
    ///
    /// Fails if a cached `NullCount` exceeds the length, or if summing the validity array
    /// fails or yields null. Also fails if the computed count exceeds the length.
    pub fn valid_count(&self) -> VortexResult<usize> {
        let len = self.len();
        if let Some(Precision::Exact(null_count)) =
            self.statistics().get_as::<usize>(Stat::NullCount)
        {
            // The cached statistic is not validated anywhere here. Reject an impossible value
            // instead of underflowing (panic in debug, wrap-around in release).
            return len.checked_sub(null_count).ok_or_else(|| {
                vortex_err!(
                    "NullCount statistic {} exceeds array length {}",
                    null_count,
                    len
                )
            });
        }
        let count = match self.validity()? {
            Validity::NonNullable | Validity::AllValid => len,
            Validity::AllInvalid => 0,
            Validity::Array(a) => {
                let mut ctx = LEGACY_SESSION.create_execution_ctx();
                let array_sum = sum(&a, &mut ctx)?;
                array_sum
                    .as_primitive()
                    .as_::<usize>()
                    .ok_or_else(|| vortex_err!("sum of validity array is null"))?
            }
        };
        vortex_ensure!(count <= len, "Valid count exceeds array length");
        self.statistics()
            .set(Stat::NullCount, Precision::exact(len - count));
        Ok(count)
    }

    /// Returns the number of null elements (`len - valid_count`).
    pub fn invalid_count(&self) -> VortexResult<usize> {
        Ok(self.len() - self.valid_count()?)
    }

    /// Returns the validity of the array as reported by its encoding.
    pub fn validity(&self) -> VortexResult<Validity> {
        self.0.validity(self)
    }

    /// Returns the validity as a [`Mask`] with one entry per element (`true` = valid).
    pub fn validity_mask(&self) -> VortexResult<Mask> {
        match self.validity()? {
            Validity::NonNullable | Validity::AllValid => Ok(Mask::new_true(self.len())),
            Validity::AllInvalid => Ok(Mask::new_false(self.len())),
            Validity::Array(a) => Ok(a.to_bool().to_mask()),
        }
    }

    /// Executes the array into its canonical form.
    ///
    /// Uses a fresh execution context from the legacy session.
    pub fn into_canonical(self) -> VortexResult<Canonical> {
        self.execute(&mut LEGACY_SESSION.create_execution_ctx())
    }

    /// Like [`Self::into_canonical`], but borrows `self`. The clone is only an `Arc` bump.
    pub fn to_canonical(&self) -> VortexResult<Canonical> {
        self.clone().into_canonical()
    }

    /// Appends this array's elements to `builder`, delegating to the encoding.
    pub fn append_to_builder(
        &self,
        builder: &mut dyn ArrayBuilder,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<()> {
        self.0.append_to_builder(self, builder, ctx)
    }

    /// Returns a handle to this array's statistics set.
    pub fn statistics(&self) -> StatsSetRef<'_> {
        self.0.statistics().to_ref(self)
    }

    /// Returns `true` if the array matches the matcher `M`.
    pub fn is<M: Matcher>(&self) -> bool {
        M::matches(self)
    }

    /// Returns the match for `M`.
    ///
    /// # Panics
    ///
    /// Panics if the array does not match `M`.
    pub fn as_<M: Matcher>(&self) -> M::Match<'_> {
        self.as_opt::<M>().vortex_expect("Failed to downcast")
    }

    /// Returns the match for `M`, or `None` if the array does not match.
    pub fn as_opt<M: Matcher>(&self) -> Option<M::Match<'_>> {
        M::try_match(self)
    }

    /// Converts into a typed [`Array<V>`], giving back `self` unchanged on failure.
    pub fn try_downcast<V: VTable>(self) -> Result<Array<V>, ArrayRef> {
        Array::<V>::try_from_array_ref(self)
    }

    /// Converts into a typed [`Array<V>`].
    ///
    /// # Panics
    ///
    /// Panics if the array's encoding is not `V`.
    pub fn downcast<V: VTable>(self) -> Array<V> {
        Self::try_downcast(self)
            .unwrap_or_else(|_| vortex_panic!("Failed to downcast to {}", type_name::<V>()))
    }

    /// Borrows the array as a typed [`ArrayView`] of encoding `V`.
    ///
    /// Returns `None` if the array's encoding is not `V`.
    pub fn as_typed<V: VTable>(&self) -> Option<ArrayView<'_, V>> {
        // Reuse the blanket `Matcher` impl for vtables so the unchecked view construction
        // lives in exactly one place.
        self.as_opt::<V>()
    }

    /// Returns the scalar if this is a constant-encoded array, otherwise `None`.
    pub fn as_constant(&self) -> Option<Scalar> {
        self.as_opt::<Constant>().map(|a| a.scalar().clone())
    }

    /// Returns the total byte length of the buffers of this array and all its descendants.
    pub fn nbytes(&self) -> u64 {
        let mut nbytes = 0;
        for array in self.depth_first_traversal() {
            for buffer in array.buffers() {
                nbytes += buffer.len() as u64;
            }
        }
        nbytes
    }

    /// Returns `true` if the top-level encoding is Null, Bool, Primitive, VarBin or VarBinView.
    ///
    /// NOTE(review): presumably these are the encodings with a direct Arrow layout. Confirm.
    pub fn is_arrow(&self) -> bool {
        self.is::<Null>()
            || self.is::<Bool>()
            || self.is::<Primitive>()
            || self.is::<VarBin>()
            || self.is::<VarBinView>()
    }

    /// Returns `true` if the array is in a canonical encoding.
    pub fn is_canonical(&self) -> bool {
        self.is::<AnyCanonical>()
    }

    /// Returns a copy of this array with slot `slot_idx` replaced by `replacement`.
    ///
    /// # Errors
    ///
    /// Fails if `slot_idx` is out of bounds or the slot is absent. Also fails if `replacement`
    /// changes the slot's dtype or length; that check is done by [`Self::with_slots`].
    pub fn with_slot(self, slot_idx: usize, replacement: ArrayRef) -> VortexResult<ArrayRef> {
        let nslots = self.slots().len();
        vortex_ensure!(
            slot_idx < nslots,
            "slot index {} out of bounds for array with {} slots",
            slot_idx,
            nslots
        );
        // An absent slot is a caller error, reported as such rather than panicking.
        vortex_ensure!(
            self.slots()[slot_idx].is_some(),
            "with_slot cannot replace an absent slot (slot {})",
            slot_idx
        );
        let mut slots = self.slots().to_vec();
        slots[slot_idx] = Some(replacement);
        // `with_slots` validates that every slot keeps its presence, dtype and length, and
        // reports violations with the same messages this method used to duplicate.
        self.with_slots(slots)
    }

    /// Returns a copy of this array with all slots replaced by `slots`.
    ///
    /// # Errors
    ///
    /// Fails if the slot count changes, or if any slot's presence, dtype or length changes.
    pub fn with_slots(self, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef> {
        let old_slots = self.slots();
        vortex_ensure!(
            old_slots.len() == slots.len(),
            "slot count changed from {} to {} during physical rewrite",
            old_slots.len(),
            slots.len()
        );
        for (idx, (old_slot, new_slot)) in old_slots.iter().zip(slots.iter()).enumerate() {
            vortex_ensure!(
                old_slot.is_some() == new_slot.is_some(),
                "slot {} presence changed during physical rewrite",
                idx
            );
            if let (Some(old_slot), Some(new_slot)) = (old_slot.as_ref(), new_slot.as_ref()) {
                vortex_ensure!(
                    old_slot.dtype() == new_slot.dtype(),
                    "slot {} dtype changed from {} to {} during physical rewrite",
                    idx,
                    old_slot.dtype(),
                    new_slot.dtype()
                );
                vortex_ensure!(
                    old_slot.len() == new_slot.len(),
                    "slot {} len changed from {} to {} during physical rewrite",
                    idx,
                    old_slot.len(),
                    new_slot.len()
                );
            }
        }
        // Keep our own handle to the inner array, because `self` is moved into the call.
        let inner = Arc::clone(&self.0);
        inner.with_slots(self, slots)
    }

    /// Asks the encoding for a reduced form of this array, if it has one.
    pub fn reduce(&self) -> VortexResult<Option<ArrayRef>> {
        self.0.reduce(self)
    }

    /// Asks this array, as child `child_idx` of `parent`, to reduce the parent.
    pub fn reduce_parent(
        &self,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        self.0.reduce_parent(self, parent, child_idx)
    }

    /// Runs this array's encoding-specific execution step.
    pub(crate) fn execute_encoding(
        self,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<crate::ExecutionResult> {
        // Keep our own handle to the inner array, because `self` is moved into the call.
        let inner = Arc::clone(&self.0);
        inner.execute(self, ctx)
    }

    /// Asks this array, as child `child_idx` of `parent`, to execute the parent.
    pub fn execute_parent(
        &self,
        parent: &ArrayRef,
        child_idx: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<ArrayRef>> {
        self.0.execute_parent(self, parent, child_idx, ctx)
    }

    /// Returns the direct children of this array.
    pub fn children(&self) -> Vec<ArrayRef> {
        self.0.children(self)
    }

    /// Returns the number of direct children.
    pub fn nchildren(&self) -> usize {
        self.0.nchildren(self)
    }

    /// Returns the child at `idx`, if any.
    pub fn nth_child(&self, idx: usize) -> Option<ArrayRef> {
        self.0.nth_child(self, idx)
    }

    /// Returns the names of the direct children.
    pub fn children_names(&self) -> Vec<String> {
        self.0.children_names(self)
    }

    /// Returns the direct children paired with their names.
    pub fn named_children(&self) -> Vec<(String, ArrayRef)> {
        self.0.named_children(self)
    }

    /// Returns this array's own buffers (not its children's) as byte buffers.
    pub fn buffers(&self) -> Vec<ByteBuffer> {
        self.0.buffers(self)
    }

    /// Returns handles to this array's own buffers (not its children's).
    pub fn buffer_handles(&self) -> Vec<BufferHandle> {
        self.0.buffer_handles(self)
    }

    /// Returns the names of this array's own buffers.
    pub fn buffer_names(&self) -> Vec<String> {
        self.0.buffer_names(self)
    }

    /// Returns this array's own buffer handles paired with their names.
    pub fn named_buffers(&self) -> Vec<(String, BufferHandle)> {
        self.0.named_buffers(self)
    }

    /// Returns the number of this array's own buffers.
    pub fn nbuffers(&self) -> usize {
        self.0.nbuffers(self)
    }

    /// Returns the array's slots. An absent slot is `None`.
    pub fn slots(&self) -> &[Option<ArrayRef>] {
        self.0.slots()
    }

    /// Returns the name of slot `idx`.
    pub fn slot_name(&self, idx: usize) -> String {
        self.0.slot_name(self, idx)
    }

    /// Returns the encoding's serialized metadata for this array, if it has any.
    pub fn metadata(&self, session: &VortexSession) -> VortexResult<Option<Vec<u8>>> {
        self.0.metadata(self, session)
    }

    /// Writes a human-readable rendering of the encoding's metadata to `f`.
    pub fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.0.metadata_fmt(f)
    }

    /// Returns `true` if every buffer of this array and its descendants is on the host.
    pub fn is_host(&self) -> bool {
        for array in self.depth_first_traversal() {
            if !array.buffer_handles().iter().all(BufferHandle::is_on_host) {
                return false;
            }
        }
        true
    }

    /// Returns the total number of buffers across this array and all of its descendants.
    pub fn nbuffers_recursive(&self) -> usize {
        // The traversal visits `self` and every descendant once, so this equals the recursive
        // sum without growing the call stack on deep trees.
        self.depth_first_traversal()
            .map(|array| array.nbuffers())
            .sum()
    }

    /// Returns a pre-order, depth-first iterator over this array and all its descendants.
    pub fn depth_first_traversal(&self) -> DepthFirstArrayIterator {
        DepthFirstArrayIterator {
            stack: vec![self.clone()],
        }
    }
}
/// `ArrayRef` is already an array handle, so conversion is the identity.
impl IntoArray for ArrayRef {
    #[inline(always)]
    fn into_array(self) -> ArrayRef {
        self
    }
}
/// Every vtable `V` is a matcher for arrays of its own encoding.
///
/// A successful match borrows the array as a typed [`ArrayView`].
impl<V: VTable> Matcher for V {
    type Match<'a> = ArrayView<'a, V>;

    /// Returns `true` if the type-erased array is an `ArrayInner<V>`.
    fn matches(array: &ArrayRef) -> bool {
        let erased = array.0.as_any();
        erased.is::<ArrayInner<V>>()
    }

    /// Downcasts to `ArrayInner<V>` and wraps its data in a typed view.
    ///
    /// Returns `None` if the array's encoding is not `V`.
    fn try_match<'a>(array: &'a ArrayRef) -> Option<ArrayView<'a, V>> {
        array
            .0
            .as_any()
            .downcast_ref::<ArrayInner<V>>()
            // SAFETY: `inner` was obtained by downcasting `array`'s own allocation, so
            // `inner.data` belongs to `array`. NOTE(review): presumably this pairing is the
            // invariant `ArrayView::new_unchecked` requires. Confirm against its docs.
            .map(|inner| unsafe { ArrayView::new_unchecked(array, &inner.data) })
    }
}