pub mod display;
mod visitor;
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::ops::Range;
use std::sync::Arc;
pub use visitor::*;
use vortex_buffer::ByteBuffer;
use vortex_dtype::{DType, Nullability};
use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic};
use vortex_mask::Mask;
use vortex_scalar::Scalar;
use crate::arrays::{
BoolEncoding, ConstantVTable, DecimalEncoding, ExtensionEncoding, FixedSizeListEncoding,
ListViewEncoding, NullEncoding, PrimitiveEncoding, StructEncoding, VarBinEncoding,
VarBinViewEncoding,
};
use crate::builders::ArrayBuilder;
use crate::compute::{ComputeFn, Cost, InvocationArgs, IsConstantOpts, Output, is_constant_opts};
use crate::operator::OperatorRef;
use crate::serde::ArrayChildren;
use crate::stats::{Precision, Stat, StatsProviderExt, StatsSetRef};
use crate::vtable::{
ArrayVTable, CanonicalVTable, ComputeVTable, OperationsVTable, PipelineVTable, SerdeVTable,
VTable, ValidityVTable, VisitorVTable,
};
use crate::{Canonical, EncodingId, EncodingRef, SerializeMetadata};
/// The dynamically dispatched interface shared by every array in this module.
///
/// The trait is sealed through the private `Sealed` supertrait. In this file it is
/// implemented for [`ArrayAdapter`] and for `Arc<dyn Array>`.
pub trait Array: 'static + private::Sealed + Send + Sync + Debug + ArrayVisitor {
    /// Returns the array as a [`dyn Any`](Any), which is what the downcasting helpers on
    /// `dyn Array` use.
    fn as_any(&self) -> &dyn Any;

    /// Returns the array as a reference-counted [`ArrayRef`].
    fn to_array(&self) -> ArrayRef;

    /// Returns the number of elements in the array.
    fn len(&self) -> usize;

    /// Returns `true` when [`Array::len`] is zero.
    fn is_empty(&self) -> bool {
        let len = self.len();
        len == 0
    }

    /// Returns the logical data type of the array.
    fn dtype(&self) -> &DType;

    /// Returns the encoding of the array.
    fn encoding(&self) -> EncodingRef;

    /// Returns the identifier of the array's encoding.
    fn encoding_id(&self) -> EncodingId;

    /// Returns the elements in `range` as a new array.
    ///
    /// The [`ArrayAdapter`] implementation panics when the range is out of bounds or when
    /// `range.start > range.end`.
    fn slice(&self, range: Range<usize>) -> ArrayRef;

    /// Returns the element at `index` as a [`Scalar`].
    ///
    /// The [`ArrayAdapter`] implementation panics when `index >= len()`.
    fn scalar_at(&self, index: usize) -> Scalar;

    /// Returns `true` when this array's encoding id equals `encoding`.
    fn is_encoding(&self, encoding: EncodingId) -> bool {
        let own_id = self.encoding_id();
        own_id == encoding
    }

    /// Returns `true` when the encoding is one of Null, Bool, Primitive, VarBin or
    /// VarBinView.
    // NOTE(review): presumably these are the encodings with a direct Arrow counterpart —
    // confirm against the Arrow conversion code.
    fn is_arrow(&self) -> bool {
        [
            NullEncoding.id(),
            BoolEncoding.id(),
            PrimitiveEncoding.id(),
            VarBinEncoding.id(),
            VarBinViewEncoding.id(),
        ]
        .into_iter()
        .any(|id| self.is_encoding(id))
    }

    /// Returns `true` when the encoding is one of Null, Bool, Primitive, Decimal, Struct,
    /// ListView, FixedSizeList, VarBinView or Extension.
    fn is_canonical(&self) -> bool {
        [
            NullEncoding.id(),
            BoolEncoding.id(),
            PrimitiveEncoding.id(),
            DecimalEncoding.id(),
            StructEncoding.id(),
            ListViewEncoding.id(),
            FixedSizeListEncoding.id(),
            VarBinViewEncoding.id(),
            ExtensionEncoding.id(),
        ]
        .into_iter()
        .any(|id| self.is_encoding(id))
    }

    /// Returns whether the element at `index` is valid (non-null).
    ///
    /// The [`ArrayAdapter`] implementation panics when `index >= len()`.
    fn is_valid(&self, index: usize) -> bool;

    /// Returns whether the element at `index` is invalid (null); the negation of
    /// [`Array::is_valid`] in the [`ArrayAdapter`] implementation.
    fn is_invalid(&self, index: usize) -> bool;

    /// Returns whether every element is valid.
    fn all_valid(&self) -> bool;

    /// Returns whether every element is invalid.
    fn all_invalid(&self) -> bool;

    /// Returns the number of valid elements.
    fn valid_count(&self) -> usize;

    /// Returns the number of invalid elements.
    fn invalid_count(&self) -> usize;

    /// Returns the validity of the array as a [`Mask`].
    fn validity_mask(&self) -> Mask;

    /// Converts the array into its [`Canonical`] form.
    fn to_canonical(&self) -> Canonical;

    /// Appends the contents of this array to `builder`.
    ///
    /// The [`ArrayAdapter`] implementation panics when the builder's dtype differs from
    /// the array's dtype.
    fn append_to_builder(&self, builder: &mut dyn ArrayBuilder);

    /// Returns a handle to the statistics attached to the array.
    fn statistics(&self) -> StatsSetRef<'_>;

    /// Builds a new array of the same encoding whose children are replaced by `children`.
    ///
    /// # Errors
    ///
    /// The [`ArrayAdapter`] implementation fails when the array exposes no serialization
    /// metadata, or when a requested child is missing or has an unexpected length or dtype.
    fn with_children(&self, children: &[ArrayRef]) -> VortexResult<ArrayRef>;

    /// Invokes `compute_fn` on this array with `args`.
    // NOTE(review): `Ok(None)` presumably means the encoding provides no implementation
    // for this function — confirm against `ComputeVTable`.
    fn invoke(&self, compute_fn: &ComputeFn, args: &InvocationArgs)
    -> VortexResult<Option<Output>>;

    /// Returns the array as an operator, if the encoding provides one.
    fn to_operator(&self) -> VortexResult<Option<OperatorRef>>;
}
/// Lets a shared [`ArrayRef`] be used wherever an [`Array`] is expected.
///
/// Every method dereferences the `Arc` and forwards to the wrapped `dyn Array`; the only
/// exception is [`Array::to_array`], which clones the `Arc` itself.
impl Array for Arc<dyn Array> {
    #[inline]
    fn as_any(&self) -> &dyn Any {
        (**self).as_any()
    }

    /// Clones the handle (a reference-count bump) rather than asking the inner array.
    #[inline]
    fn to_array(&self) -> ArrayRef {
        Arc::clone(self)
    }

    #[inline]
    fn len(&self) -> usize {
        (**self).len()
    }

    #[inline]
    fn dtype(&self) -> &DType {
        (**self).dtype()
    }

    #[inline]
    fn encoding(&self) -> EncodingRef {
        (**self).encoding()
    }

    #[inline]
    fn encoding_id(&self) -> EncodingId {
        (**self).encoding_id()
    }

    #[inline]
    fn slice(&self, range: Range<usize>) -> ArrayRef {
        (**self).slice(range)
    }

    #[inline]
    fn scalar_at(&self, index: usize) -> Scalar {
        (**self).scalar_at(index)
    }

    #[inline]
    fn is_valid(&self, index: usize) -> bool {
        (**self).is_valid(index)
    }

    #[inline]
    fn is_invalid(&self, index: usize) -> bool {
        (**self).is_invalid(index)
    }

    #[inline]
    fn all_valid(&self) -> bool {
        (**self).all_valid()
    }

    #[inline]
    fn all_invalid(&self) -> bool {
        (**self).all_invalid()
    }

    #[inline]
    fn valid_count(&self) -> usize {
        (**self).valid_count()
    }

    #[inline]
    fn invalid_count(&self) -> usize {
        (**self).invalid_count()
    }

    #[inline]
    fn validity_mask(&self) -> Mask {
        (**self).validity_mask()
    }

    fn to_canonical(&self) -> Canonical {
        (**self).to_canonical()
    }

    fn append_to_builder(&self, builder: &mut dyn ArrayBuilder) {
        (**self).append_to_builder(builder)
    }

    fn statistics(&self) -> StatsSetRef<'_> {
        (**self).statistics()
    }

    fn with_children(&self, children: &[ArrayRef]) -> VortexResult<ArrayRef> {
        (**self).with_children(children)
    }

    fn invoke(
        &self,
        compute_fn: &ComputeFn,
        args: &InvocationArgs,
    ) -> VortexResult<Option<Output>> {
        (**self).invoke(compute_fn, args)
    }

    fn to_operator(&self) -> VortexResult<Option<OperatorRef>> {
        (**self).to_operator()
    }
}
/// A shared, reference-counted handle to a dynamically typed [`Array`].
pub type ArrayRef = Arc<dyn Array>;
impl ToOwned for dyn Array {
type Owned = ArrayRef;
fn to_owned(&self) -> Self::Owned {
self.to_array()
}
}
impl dyn Array + '_ {
pub fn as_<V: VTable>(&self) -> &V::Array {
self.as_opt::<V>().vortex_expect("Failed to downcast")
}
pub fn as_opt<V: VTable>(&self) -> Option<&V::Array> {
self.as_any()
.downcast_ref::<ArrayAdapter<V>>()
.map(|array_adapter| &array_adapter.0)
}
pub fn is<V: VTable>(&self) -> bool {
self.as_opt::<V>().is_some()
}
pub fn is_constant(&self) -> bool {
let opts = IsConstantOpts {
cost: Cost::Specialized,
};
is_constant_opts(self, &opts)
.inspect_err(|e| log::warn!("Failed to compute IsConstant: {e}"))
.ok()
.flatten()
.unwrap_or_default()
}
pub fn is_constant_opts(&self, cost: Cost) -> bool {
let opts = IsConstantOpts { cost };
is_constant_opts(self, &opts)
.inspect_err(|e| log::warn!("Failed to compute IsConstant: {e}"))
.ok()
.flatten()
.unwrap_or_default()
}
pub fn as_constant(&self) -> Option<Scalar> {
self.is_constant().then(|| self.scalar_at(0))
}
pub fn nbytes(&self) -> u64 {
let mut nbytes = 0;
for array in self.depth_first_traversal() {
for buffer in array.buffers() {
nbytes += buffer.len() as u64;
}
}
nbytes
}
}
/// Conversion of a value into an [`ArrayRef`], consuming the value.
pub trait IntoArray {
/// Consumes `self` and returns it as an [`ArrayRef`].
fn into_array(self) -> ArrayRef;
}
/// An [`ArrayRef`] is already in the target form, so the conversion is the identity.
impl IntoArray for ArrayRef {
/// Returns `self` unchanged.
fn into_array(self) -> ArrayRef {
self
}
}
// Holds the `Sealed` marker that `Array` lists as a supertrait. The module is not `pub`,
// so code that cannot name `private::Sealed` cannot implement `Array` either.
mod private {
use super::*;
/// Marker supertrait of [`Array`]; it has no methods of its own.
pub trait Sealed {}
// The implementations of `Sealed` visible here: the vtable-backed adapter and the
// shared handle type.
impl<V: VTable> Sealed for ArrayAdapter<V> {}
impl Sealed for Arc<dyn Array> {}
}
/// Newtype around the concrete array type of vtable `V`.
///
/// The [`Array`] and [`ArrayVisitor`] implementations for this type forward to the
/// vtables associated with `V`. `#[repr(transparent)]` gives the wrapper the same layout
/// as the wrapped `V::Array`.
#[repr(transparent)]
pub struct ArrayAdapter<V: VTable>(V::Array);
impl<V: VTable> ArrayAdapter<V> {
    /// Borrows the wrapped `V::Array`.
    pub fn as_inner(&self) -> &V::Array {
        let Self(inner) = self;
        inner
    }

    /// Consumes the adapter and returns the wrapped `V::Array`.
    pub fn into_inner(self) -> V::Array {
        let Self(inner) = self;
        inner
    }
}
/// Formats the adapter exactly like the wrapped `V::Array`; the wrapper itself adds
/// nothing to the output.
impl<V: VTable> Debug for ArrayAdapter<V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(inner) = self;
        Debug::fmt(inner, f)
    }
}
/// Implements the dynamic [`Array`] interface by forwarding to the vtables of `V`.
///
/// Besides forwarding, the methods check the results handed back by the vtables (lengths,
/// dtypes, counts) and panic when an encoding returns something inconsistent.
impl<V: VTable> Array for ArrayAdapter<V> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Clones the wrapped array into a fresh adapter behind a new `Arc`.
    fn to_array(&self) -> ArrayRef {
        let inner = self.0.clone();
        Arc::new(Self(inner))
    }

    fn len(&self) -> usize {
        <V::ArrayVTable as ArrayVTable<V>>::len(&self.0)
    }

    fn dtype(&self) -> &DType {
        <V::ArrayVTable as ArrayVTable<V>>::dtype(&self.0)
    }

    fn encoding(&self) -> EncodingRef {
        V::encoding(&self.0)
    }

    fn encoding_id(&self) -> EncodingId {
        self.encoding().id()
    }

    /// Slices the array to `range`.
    ///
    /// The full range returns a copy of the whole array and an empty range returns an
    /// empty canonical array of the same dtype; everything else goes to the encoding.
    ///
    /// # Panics
    ///
    /// Panics when `range.start` or `range.end` exceeds the length, when
    /// `range.start > range.end`, or when the encoding returns a slice of the wrong length
    /// (or, in debug builds, of the wrong dtype).
    fn slice(&self, range: Range<usize>) -> ArrayRef {
        let len = self.len();
        let (start, stop) = (range.start, range.end);

        // Whole-array slice: nothing to cut, so skip the encoding entirely.
        if (start, stop) == (0, len) {
            return self.to_array();
        }

        assert!(start <= len, "OutOfBounds: start {start} > length {}", len);
        assert!(stop <= len, "OutOfBounds: stop {stop} > length {}", len);
        assert!(start <= stop, "start ({start}) must be <= stop ({stop})");

        if start == stop {
            return Canonical::empty(self.dtype()).into_array();
        }

        let sliced = <V::OperationsVTable as OperationsVTable<V>>::slice(&self.0, range);

        assert_eq!(
            sliced.len(),
            stop - start,
            "Slice length mismatch {}",
            self.encoding_id()
        );
        // The dtype comparison is only performed in debug builds.
        debug_assert_eq!(
            sliced.dtype(),
            self.dtype(),
            "Slice dtype mismatch {}",
            self.encoding_id()
        );

        // Hand the IsConstant / IsSorted / IsStrictSorted stats down to the slice, but
        // only when they are recorded as exactly `true`. Constant results are skipped.
        if !sliced.is::<ConstantVTable>() {
            self.statistics().with_iter(|stats| {
                let exact_true_flags = stats.filter(|(stat, value)| {
                    let inheritable = matches!(
                        stat,
                        Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted
                    );
                    inheritable
                        && value.as_ref().as_exact().is_some_and(|v| {
                            Scalar::new(DType::Bool(Nullability::NonNullable), v.clone())
                                .as_bool()
                                .value()
                                .unwrap_or_default()
                        })
                });
                sliced.statistics().inherit(exact_true_flags);
            });
        }

        sliced
    }

    /// Returns the scalar at `index`, or a null scalar of the array's dtype when the
    /// position is invalid.
    ///
    /// # Panics
    ///
    /// Panics when `index >= len()` or when the encoding returns a scalar whose dtype
    /// differs from the array's.
    fn scalar_at(&self, index: usize) -> Scalar {
        assert!(index < self.len(), "index {index} out of bounds");

        if !self.is_valid(index) {
            return Scalar::null(self.dtype().clone());
        }

        let scalar = <V::OperationsVTable as OperationsVTable<V>>::scalar_at(&self.0, index);
        assert_eq!(self.dtype(), scalar.dtype(), "Scalar dtype mismatch");
        scalar
    }

    /// Returns whether the element at `index` is valid.
    ///
    /// # Panics
    ///
    /// Panics when `index >= len()`.
    fn is_valid(&self, index: usize) -> bool {
        if index >= self.len() {
            vortex_panic!(OutOfBounds: index, 0, self.len());
        }
        <V::ValidityVTable as ValidityVTable<V>>::is_valid(&self.0, index)
    }

    fn is_invalid(&self, index: usize) -> bool {
        let valid = self.is_valid(index);
        !valid
    }

    fn all_valid(&self) -> bool {
        <V::ValidityVTable as ValidityVTable<V>>::all_valid(&self.0)
    }

    fn all_invalid(&self) -> bool {
        <V::ValidityVTable as ValidityVTable<V>>::all_invalid(&self.0)
    }

    /// Returns the number of valid elements.
    ///
    /// An exact `NullCount` statistic answers the question directly; otherwise the
    /// encoding is asked and the derived null count is stored as an exact statistic.
    ///
    /// # Panics
    ///
    /// Panics when the encoding reports more valid elements than the array has.
    fn valid_count(&self) -> usize {
        if let Some(Precision::Exact(nulls)) =
            self.statistics().get_as::<usize>(Stat::NullCount)
        {
            return self.len() - nulls;
        }

        let valid = <V::ValidityVTable as ValidityVTable<V>>::valid_count(&self.0);
        assert!(valid <= self.len(), "Valid count exceeds array length");

        let nulls = self.len() - valid;
        self.statistics()
            .set(Stat::NullCount, Precision::exact(nulls));
        valid
    }

    /// Returns the number of invalid elements.
    ///
    /// An exact `NullCount` statistic is returned as is; otherwise the encoding is asked
    /// and the answer is stored as an exact statistic.
    ///
    /// # Panics
    ///
    /// Panics when the encoding reports more invalid elements than the array has.
    fn invalid_count(&self) -> usize {
        if let Some(Precision::Exact(nulls)) =
            self.statistics().get_as::<usize>(Stat::NullCount)
        {
            return nulls;
        }

        let nulls = <V::ValidityVTable as ValidityVTable<V>>::invalid_count(&self.0);
        assert!(nulls <= self.len(), "Invalid count exceeds array length");

        self.statistics()
            .set(Stat::NullCount, Precision::exact(nulls));
        nulls
    }

    /// Returns the validity mask.
    ///
    /// # Panics
    ///
    /// Panics when the mask returned by the encoding is not as long as the array.
    fn validity_mask(&self) -> Mask {
        let validity = <V::ValidityVTable as ValidityVTable<V>>::validity_mask(&self.0);
        assert_eq!(validity.len(), self.len(), "Validity mask length mismatch");
        validity
    }

    /// Canonicalizes the array and lets the result inherit this array's statistics.
    ///
    /// # Panics
    ///
    /// Panics when the canonical array differs from this array in length or dtype.
    fn to_canonical(&self) -> Canonical {
        let canonical = <V::CanonicalVTable as CanonicalVTable<V>>::canonicalize(&self.0);
        let decoded = canonical.as_ref();

        assert_eq!(
            self.len(),
            decoded.len(),
            "Canonical length mismatch {}. Expected {} but encoded into {}.",
            self.encoding_id(),
            self.len(),
            decoded.len()
        );
        assert_eq!(
            self.dtype(),
            decoded.dtype(),
            "Canonical dtype mismatch {}. Expected {} but encoded into {}.",
            self.encoding_id(),
            self.dtype(),
            decoded.dtype()
        );

        decoded.statistics().inherit_from(self.statistics());
        canonical
    }

    /// Appends this array to `builder`.
    ///
    /// # Panics
    ///
    /// Panics when the builder's dtype differs from the array's, or when the builder did
    /// not grow by exactly `len()` elements.
    fn append_to_builder(&self, builder: &mut dyn ArrayBuilder) {
        if builder.dtype() != self.dtype() {
            vortex_panic!(
                "Builder dtype mismatch: expected {}, got {}",
                self.dtype(),
                builder.dtype(),
            );
        }

        let len_before = builder.len();
        <V::CanonicalVTable as CanonicalVTable<V>>::append_to_builder(&self.0, builder);

        assert_eq!(
            len_before + self.len(),
            builder.len(),
            "Builder length mismatch after writing array for encoding {}",
            self.encoding_id(),
        );
    }

    fn statistics(&self) -> StatsSetRef<'_> {
        <V::ArrayVTable as ArrayVTable<V>>::stats(&self.0)
    }

    /// Rebuilds the array through its encoding with the same dtype, length, metadata and
    /// buffers, but with `children` in place of the current children.
    ///
    /// # Errors
    ///
    /// Fails when the array has no serialization metadata, when the encoding asks for a
    /// child index that `children` does not contain, or when a supplied child does not
    /// have the length or dtype the encoding asks for.
    fn with_children(&self, children: &[ArrayRef]) -> VortexResult<ArrayRef> {
        /// Serves the replacement children to the encoding, validating each request.
        struct ReplacementChildren<'a> {
            children: &'a [ArrayRef],
        }

        impl ArrayChildren for ReplacementChildren<'_> {
            fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
                let Some(child) = self.children.get(index) else {
                    vortex_bail!(OutOfBounds: index, 0, self.children.len());
                };
                if child.len() != len {
                    vortex_bail!(
                        "Child length mismatch: expected {}, got {}",
                        len,
                        child.len()
                    );
                }
                if child.dtype() != dtype {
                    vortex_bail!(
                        "Child dtype mismatch: expected {}, got {}",
                        dtype,
                        child.dtype()
                    );
                }
                Ok(Arc::clone(child))
            }

            fn len(&self) -> usize {
                self.children.len()
            }
        }

        let metadata = self.metadata()?.ok_or_else(|| {
            vortex_err!("Cannot replace children for arrays that do not support serialization")
        })?;
        let replacement = ReplacementChildren { children };

        self.encoding().build(
            self.dtype(),
            self.len(),
            &metadata,
            &self.buffers(),
            &replacement,
        )
    }

    fn invoke(
        &self,
        compute_fn: &ComputeFn,
        args: &InvocationArgs,
    ) -> VortexResult<Option<Output>> {
        <V::ComputeVTable as ComputeVTable<V>>::invoke(&self.0, compute_fn, args)
    }

    fn to_operator(&self) -> VortexResult<Option<OperatorRef>> {
        <V::PipelineVTable as PipelineVTable<V>>::to_operator(&self.0)
    }
}
/// Implements [`ArrayVisitor`] on top of the visitor and serde vtables of `V`.
///
/// The collecting methods each define a small local visitor that records what the vtable
/// reports while it walks the wrapped array.
impl<V: VTable> ArrayVisitor for ArrayAdapter<V> {
    /// Returns every child array, in the order the vtable visits them.
    fn children(&self) -> Vec<ArrayRef> {
        struct Children(Vec<ArrayRef>);

        impl ArrayChildVisitor for Children {
            fn visit_child(&mut self, _name: &str, array: &dyn Array) {
                self.0.push(array.to_array());
            }
        }

        let mut visitor = Children(Vec::new());
        <V::VisitorVTable as VisitorVTable<V>>::visit_children(&self.0, &mut visitor);
        visitor.0
    }

    /// Returns the number of children as reported by the vtable.
    fn nchildren(&self) -> usize {
        <V::VisitorVTable as VisitorVTable<V>>::nchildren(&self.0)
    }

    /// Returns the name of every child, in the order the vtable visits them.
    fn children_names(&self) -> Vec<String> {
        struct Names(Vec<String>);

        impl ArrayChildVisitor for Names {
            fn visit_child(&mut self, name: &str, _array: &dyn Array) {
                self.0.push(String::from(name));
            }
        }

        let mut visitor = Names(Vec::new());
        <V::VisitorVTable as VisitorVTable<V>>::visit_children(&self.0, &mut visitor);
        visitor.0
    }

    /// Returns every child together with its name, in the order the vtable visits them.
    fn named_children(&self) -> Vec<(String, ArrayRef)> {
        struct NamedChildren(Vec<(String, ArrayRef)>);

        impl ArrayChildVisitor for NamedChildren {
            fn visit_child(&mut self, name: &str, array: &dyn Array) {
                let entry = (String::from(name), array.to_array());
                self.0.push(entry);
            }
        }

        let mut visitor = NamedChildren(Vec::new());
        <V::VisitorVTable as VisitorVTable<V>>::visit_children(&self.0, &mut visitor);
        visitor.0
    }

    /// Returns a clone of every buffer the vtable visits, in visiting order.
    fn buffers(&self) -> Vec<ByteBuffer> {
        struct Buffers(Vec<ByteBuffer>);

        impl ArrayBufferVisitor for Buffers {
            fn visit_buffer(&mut self, buffer: &ByteBuffer) {
                self.0.push(buffer.clone());
            }
        }

        let mut visitor = Buffers(Vec::new());
        <V::VisitorVTable as VisitorVTable<V>>::visit_buffers(&self.0, &mut visitor);
        visitor.0
    }

    /// Returns the number of buffers as reported by the vtable.
    fn nbuffers(&self) -> usize {
        <V::VisitorVTable as VisitorVTable<V>>::nbuffers(&self.0)
    }

    /// Returns the serialized metadata, or `None` when the serde vtable yields none.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the serde vtable.
    fn metadata(&self) -> VortexResult<Option<Vec<u8>>> {
        let metadata = <V::SerdeVTable as SerdeVTable<V>>::metadata(&self.0)?;
        Ok(metadata.map(|m| m.serialize()))
    }

    /// Writes a debug rendering of the metadata, or a placeholder when the metadata is
    /// unavailable or could not be produced.
    fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match <V::SerdeVTable as SerdeVTable<V>>::metadata(&self.0) {
            Ok(Some(metadata)) => Debug::fmt(&metadata, f),
            Ok(None) => write!(f, "<serde not supported>"),
            Err(e) => write!(f, "<serde error: {e}>"),
        }
    }
}