vortex_array/operator/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! This module defines a new way of modelling arrays and expressions in Vortex. To avoid naming
5//! conflicts, we refer to the new model as "operators".
6//!
7//! Operators form a more traditional "logical plan" as might be seen in other query engines.
8//! Each operator supports one primary function which is to produce a canonical representation of
9//! its data, known as `canonicalization`. Operators have the option to produce this canonical
10//! form using different execution models, including batch, pipelined, and GPU.
11//!
12//! Initial designs for this module involved passing masks down through the physical execution
13//! tree as futures, allowing operators to skip computation for rows that are not needed. We
14//! ultimately decided against this approach and instead introduce a `Filter` operator
15//! that can be pushed down in the same way as any other operator.
16//!
17//! On the one hand, this means common subtree elimination is much easier, since we know the mask
18//! or identity of the mask future inside the filter operator up-front. On the other hand, it
19//! means that an operator no longer has a known length. In the end state, we will redefine a
20//! Vortex array to be a wrapper around an operator that _does_ have a known length, amongst other
21//! properties (such as non-blocking evaluation).
22//!
23//! We also introduce the idea of an executor that can evaluate an operator tree efficiently. It
24//! supports common subtree elimination, as well as extracting sub-graphs for pipelined and GPU
25//! execution. The executor is also responsible for managing memory and scheduling work across
26//! different execution resources.
27
28pub mod canonical;
29pub mod compare;
30mod display;
31pub mod filter;
32pub mod getitem;
33mod hash;
34pub mod metrics;
35mod optimize;
36pub mod slice;
37
38use std::any::{Any, type_name};
39use std::fmt;
40use std::fmt::{Debug, Formatter};
41use std::ops::BitAnd;
42use std::sync::Arc;
43
44use arcref::ArcRef;
45use async_trait::async_trait;
46pub use display::*;
47pub use hash::*;
48use termtree::Tree;
49use vortex_dtype::DType;
50use vortex_error::VortexResult;
51
52use crate::Canonical;
53use crate::pipeline::PipelinedOperator;
54
/// Identifier type returned by `Operator::id`: a string held in an `ArcRef`.
55pub type OperatorId = ArcRef<str>;
/// Reference-counted handle to a dynamically dispatched `Operator`.
56pub type OperatorRef = Arc<dyn Operator>;
57
58/// An operator represents a node in a logical query plan.
59pub trait Operator: 'static + Send + Sync + Debug + DynOperatorHash + DynOperatorEq {
60    /// The unique identifier for this operator instance.
61    fn id(&self) -> OperatorId;
62
63    /// For downcasting.
64    fn as_any(&self) -> &dyn Any;
65
66    /// Returns the [`DType`] of the array produced by this operator.
67    fn dtype(&self) -> &DType;
68
69    /// Returns the bounds on the number of rows produced by this operator.
70    fn bounds(&self) -> LengthBounds;
71
72    /// Returns the exact number of rows produced by this operator, if known.
73    fn len(&self) -> Option<usize> {
74        self.bounds().maybe_len()
75    }
76
77    /// Returns if this operator is known to be empty (i.e. max bound is 0).
78    fn is_empty(&self) -> bool {
79        self.bounds().max == 0
80    }
81
82    /// The children of this operator.
83    fn children(&self) -> &[OperatorRef];
84
85    /// The number of children of this operator.
86    fn nchildren(&self) -> usize {
87        self.children().len()
88    }
89
90    /// Override the default formatting of this operator.
91    fn fmt_as(&self, _df: DisplayFormat, f: &mut Formatter) -> fmt::Result {
92        write!(f, "{}", type_name::<Self>())
93    }
94
95    fn fmt_all(&self) -> String {
96        let node_name = TreeNodeDisplay(self).to_string();
97        let child_trees: Vec<_> = self
98            .children()
99            .iter()
100            .map(|child| child.fmt_all())
101            .collect();
102        Tree::new(node_name)
103            .with_leaves(child_trees)
104            .with_multiline(true)
105            .to_string()
106    }
107
108    /// Create a new instance of this operator with the given children.
109    ///
110    /// ## Panics
111    ///
112    /// Panics if the number or dtypes of children are incorrect.
113    ///
114    fn with_children(self: Arc<Self>, _children: Vec<OperatorRef>) -> VortexResult<OperatorRef>;
115
116    /// Attempt to optimize this node by analyzing its children.
117    ///
118    /// For example, if all the children are constant, this function should perform constant
119    /// folding and return a constant operator.
120    ///
121    /// This function should typically be implemented only for self-contained optimizations based
122    /// on child properties
123    fn reduce_children(&self) -> VortexResult<Option<OperatorRef>> {
124        Ok(None)
125    }
126
127    /// Attempt to push down a parent operator through this node.
128    ///
129    /// The `child_idx` parameter indicates which child of the parent this operator occupies.
130    /// For example, if the parent is a binary operator, and this operator is the left child,
131    /// then `child_idx` will be 0. If this operator is the right child, then `child_idx` will be 1.
132    ///
133    /// The returned operator will replace the parent in the tree.
134    ///
135    /// This function should typically be implemented for cross-operator optimizations where the
136    /// child needs to adapt to the parent's requirements
137    fn reduce_parent(
138        &self,
139        _parent: OperatorRef,
140        _child_idx: usize,
141    ) -> VortexResult<Option<OperatorRef>> {
142        Ok(None)
143    }
144
145    /// Return `true` if the given child is considered to be a selection target.
146    ///
147    /// The definition of this is such that pushing a selection operator down to all selection
148    /// targets will result in the same output as a selection on this operator.
149    ///
150    /// For example, `select(Op, mask) == Op(select(child, mask), ...)` for all children that are
151    /// selection targets.
152    ///
153    /// If any child index returns `None`, then selection pushdown is not possible.
154    /// If all children return `Some(false)`, then selection pushdown is not possible.
155    fn is_selection_target(&self, _child_idx: usize) -> Option<bool> {
156        None
157    }
158
159    /// Returns this operator as a [`BatchOperator`] if it supports batch execution.
160    fn as_batch(&self) -> Option<&dyn BatchOperator> {
161        None
162    }
163
164    /// Returns this operator as a [`PipelinedOperator`] if it supports pipelined execution.
165    ///
166    /// Note that operators that implement [`PipelinedOperator`] *do not need* to implement
167    /// [`BatchOperator`], although they may choose to do so.
168    fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
169        None
170    }
171}
172
/// Represents the known row count bounds of an operator.
///
/// Both ends are inclusive. `min == max` means the row count is known exactly. The intersection
/// operations do not normalize their result, so disjoint inputs yield `min > max`, a range
/// that contains no length.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct LengthBounds {
    /// Smallest possible row count (inclusive).
    pub min: usize,
    /// Largest possible row count (inclusive).
    pub max: usize,
}

impl LengthBounds {
    /// Returns the exact length when `min == max`, otherwise `None`.
    pub fn maybe_len(&self) -> Option<usize> {
        (self.min == self.max).then_some(self.min)
    }

    /// Returns `true` if `len` lies within `min..=max`.
    pub fn contains(&self, len: usize) -> bool {
        (self.min..=self.max).contains(&len)
    }

    /// Intersects all of the bounds yielded by `iters`.
    ///
    /// The fold starts from the unconstrained range `0..=usize::MAX`, the identity element of
    /// intersection, so each input can only narrow the result. Starting the upper bound at 0
    /// instead would pin `max` to 0 regardless of the inputs.
    ///
    /// An empty iterator therefore returns the unconstrained range.
    pub fn intersect_all<I: IntoIterator<Item = LengthBounds>>(iters: I) -> Self {
        let unconstrained = Self {
            min: 0,
            max: usize::MAX,
        };
        iters
            .into_iter()
            .fold(unconstrained, |acc, bounds| acc & bounds)
    }
}

/// `a & b` is the intersection of the two ranges: the larger `min` and the smaller `max`.
impl BitAnd for LengthBounds {
    type Output = Self;

    fn bitand(self, rhs: Self) -> Self::Output {
        Self {
            min: self.min.max(rhs.min),
            max: self.max.min(rhs.max),
        }
    }
}

/// An exact length converts to bounds whose `min` and `max` are both that length.
impl From<usize> for LengthBounds {
    fn from(value: usize) -> Self {
        Self {
            min: value,
            max: value,
        }
    }
}
219
220/// The default execution mode for an operator is batch mode.
///
/// An operator that supports batch execution exposes this trait through `Operator::as_batch`.
221pub trait BatchOperator: Operator {
    /// Builds the [`BatchExecution`] for this operator.
    ///
    /// `ctx` provides the executions of this operator's children; see [`BatchBindCtx::child`].
222    fn bind(&self, ctx: &mut dyn BatchBindCtx) -> VortexResult<BatchExecutionRef>;
223}
224
/// Context passed to [`BatchOperator::bind`], giving access to the executions of an operator's
/// children.
225pub trait BatchBindCtx {
226    /// Returns the execution for the child at the given index, consuming it from the context.
227    /// Each child may be consumed only once.
    ///
    /// NOTE(review): presumably an out-of-range `idx`, or a child that was already consumed,
    /// is reported through the returned `VortexResult`; confirm against implementations.
228    fn child(&mut self, idx: usize) -> VortexResult<BatchExecutionRef>;
229}
230
231/// The primary execution trait for operators.
232///
233/// Alternatively, or additionally, operators may choose to implement [`PipelinedOperator`].
234#[async_trait]
235pub trait BatchExecution: Send {
    /// Consumes this execution and asynchronously produces its [`Canonical`] result, or an
    /// error.
236    async fn execute(self: Box<Self>) -> VortexResult<Canonical>;
237}
238
/// Boxed [`BatchExecution`] trait object, as returned by [`BatchOperator::bind`] and
/// [`BatchBindCtx::child`].
239pub type BatchExecutionRef = Box<dyn BatchExecution>;