pub mod canonical;
pub mod compare;
mod display;
pub mod filter;
pub mod getitem;
mod hash;
pub mod metrics;
mod optimize;
pub mod slice;
use std::any::{Any, type_name};
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::ops::BitAnd;
use std::sync::Arc;
use arcref::ArcRef;
use async_trait::async_trait;
pub use display::*;
pub use hash::*;
use termtree::Tree;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use crate::Canonical;
use crate::pipeline::PipelinedOperator;
/// Identifier type returned by [`Operator::id`]; an alias for `ArcRef<str>`.
pub type OperatorId = ArcRef<str>;
/// Shared, reference-counted handle to a type-erased [`Operator`].
pub type OperatorRef = Arc<dyn Operator>;
/// A node in a tree of operators.
///
/// Every operator exposes an identifier, a [`DType`], bounds on its length and
/// a list of child operators. Implementors must be `'static + Send + Sync + Debug`
/// and must also satisfy the `DynOperatorHash` and `DynOperatorEq` supertraits.
///
/// Only `id`, `as_any`, `dtype`, `bounds`, `children` and `with_children` are
/// required; every other method has a default implementation.
pub trait Operator: 'static + Send + Sync + Debug + DynOperatorHash + DynOperatorEq {
/// Returns the identifier of this operator.
fn id(&self) -> OperatorId;
/// Returns this operator as `&dyn Any`.
///
/// NOTE(review): presumably used by callers to downcast to the concrete
/// operator type — confirm against call sites.
fn as_any(&self) -> &dyn Any;
/// Returns a reference to the [`DType`] associated with this operator.
fn dtype(&self) -> &DType;
/// Returns the lower and upper bounds on this operator's length.
fn bounds(&self) -> LengthBounds;
/// Returns the exact length if it is known, i.e. when the lower and upper
/// bounds reported by [`Operator::bounds`] coincide; otherwise `None`.
fn len(&self) -> Option<usize> {
self.bounds().maybe_len()
}
/// Returns `true` when the upper length bound is zero.
fn is_empty(&self) -> bool {
self.bounds().max == 0
}
/// Returns the child operators of this node.
fn children(&self) -> &[OperatorRef];
/// Returns the number of child operators.
fn nchildren(&self) -> usize {
self.children().len()
}
/// Writes a description of this node (without its children) to `f`.
///
/// The default implementation ignores the requested display format and
/// writes the Rust type name of the implementing type.
fn fmt_as(&self, _df: DisplayFormat, f: &mut Formatter) -> fmt::Result {
write!(f, "{}", type_name::<Self>())
}
/// Renders this operator and, recursively, all of its children as a
/// multi-line tree and returns the result as a `String`.
fn fmt_all(&self) -> String {
let node_name = TreeNodeDisplay(self).to_string();
// Each child is rendered to its own (possibly multi-line) string first and
// then attached as a leaf of this node.
let child_trees: Vec<_> = self
.children()
.iter()
.map(|child| child.fmt_all())
.collect();
Tree::new(node_name)
.with_leaves(child_trees)
.with_multiline(true)
.to_string()
}
/// Consumes this operator and the given children and returns an operator.
///
/// NOTE(review): presumably the result is this operator with its children
/// replaced by `_children` — confirm against implementors.
fn with_children(self: Arc<Self>, _children: Vec<OperatorRef>) -> VortexResult<OperatorRef>;
/// Optionally returns a replacement operator; the default returns `Ok(None)`.
///
/// NOTE(review): looks like an optimisation hook that rewrites this node
/// based on its children — confirm against the optimizer.
fn reduce_children(&self) -> VortexResult<Option<OperatorRef>> {
Ok(None)
}
/// Optionally returns a replacement operator given a parent and the index
/// of a child within it; the default returns `Ok(None)`.
///
/// NOTE(review): presumably `self` is the child at `_child_idx` of
/// `_parent` and the result replaces the parent — confirm against the optimizer.
fn reduce_parent(
&self,
_parent: OperatorRef,
_child_idx: usize,
) -> VortexResult<Option<OperatorRef>> {
Ok(None)
}
/// Returns an optional flag for the child at `_child_idx`; the default
/// returns `None`.
///
/// NOTE(review): the meaning of `Some(true)` / `Some(false)` / `None` is not
/// visible here — confirm against implementors.
fn is_selection_target(&self, _child_idx: usize) -> Option<bool> {
None
}
/// Returns this operator as a [`BatchOperator`] if available; the default
/// returns `None`.
fn as_batch(&self) -> Option<&dyn BatchOperator> {
None
}
/// Returns this operator as a [`PipelinedOperator`] if available; the
/// default returns `None`.
fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
None
}
}
/// Inclusive lower and upper bounds on a length.
///
/// When `min == max` the length is known exactly.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct LengthBounds {
    /// Smallest possible length (inclusive).
    pub min: usize,
    /// Largest possible length (inclusive).
    pub max: usize,
}

impl LengthBounds {
    /// Returns the exact length when the bounds pin it down (`min == max`),
    /// otherwise `None`.
    pub fn maybe_len(&self) -> Option<usize> {
        (self.min == self.max).then_some(self.min)
    }

    /// Returns `true` when `len` lies within `min..=max`.
    pub fn contains(&self, len: usize) -> bool {
        self.min <= len && len <= self.max
    }

    /// Intersects every bound yielded by `iters`.
    ///
    /// The result takes the largest `min` and the smallest `max` seen. An empty
    /// iterator yields the unconstrained bounds `0..=usize::MAX`. If the inputs
    /// do not overlap, the result has `min > max` and contains no length.
    pub fn intersect_all<I: IntoIterator<Item = LengthBounds>>(iters: I) -> Self {
        // Start from the identity of intersection: the upper bound must begin at
        // `usize::MAX`, otherwise `min()` would clamp every result to zero.
        let unconstrained = Self {
            min: 0,
            max: usize::MAX,
        };
        iters.into_iter().fold(unconstrained, |acc, bounds| Self {
            min: acc.min.max(bounds.min),
            max: acc.max.min(bounds.max),
        })
    }
}
/// `a & b` intersects two bounds: the result keeps the larger lower bound and
/// the smaller upper bound of the two operands.
impl BitAnd for LengthBounds {
    type Output = Self;

    fn bitand(self, rhs: Self) -> Self::Output {
        let min = usize::max(self.min, rhs.min);
        let max = usize::min(self.max, rhs.max);
        Self { min, max }
    }
}
/// Builds bounds for an exactly known length: both `min` and `max` are set to
/// `value`.
impl From<usize> for LengthBounds {
    fn from(value: usize) -> Self {
        let (min, max) = (value, value);
        Self { min, max }
    }
}
/// An [`Operator`] that can be bound into a [`BatchExecution`].
pub trait BatchOperator: Operator {
/// Binds this operator using the supplied context and returns the resulting
/// boxed execution, or an error if binding fails.
fn bind(&self, ctx: &mut dyn BatchBindCtx) -> VortexResult<BatchExecutionRef>;
}
/// Context passed to [`BatchOperator::bind`].
pub trait BatchBindCtx {
/// Returns a boxed execution for the child at index `idx`.
///
/// NOTE(review): presumably `idx` indexes the children of the operator being
/// bound — confirm against implementors.
fn child(&mut self, idx: usize) -> VortexResult<BatchExecutionRef>;
}
/// A `Send` unit of work that asynchronously produces a [`Canonical`] value.
#[async_trait]
pub trait BatchExecution: Send {
/// Consumes the boxed execution and runs it, returning the resulting
/// [`Canonical`] or an error.
async fn execute(self: Box<Self>) -> VortexResult<Canonical>;
}
/// Owned, boxed handle to a type-erased [`BatchExecution`].
pub type BatchExecutionRef = Box<dyn BatchExecution>;