vortex_array/operator/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! This module defines a new way of modelling arrays and expressions in Vortex. To avoid naming
5//! conflicts, we refer to the new model as "operators".
6//!
7//! Operators form a more traditional "logical plan" as might be seen in other query engines.
8//! Each operator supports one primary function which is to produce a canonical representation of
9//! its data, known as `canonicalization`. Operators have the option to produce this canonical
10//! form using different execution models, including batch, pipelined, and GPU.
11//!
12//! Initial designs for this module involved passing masks down through the physical execution
13//! tree as futures, allowing operators to skip computation for rows that are not needed. We
14//! ultimately decided against this approach and instead introduce a `Filter` operator
15//! that can be pushed down in the same way as any other operator.
16//!
//! On the one hand, this means common subtree elimination is much easier, since we know the mask
//! or identity of the mask future inside the filter operator up-front. On the other hand, it
//! means that an operator no longer has a known length. In the end state, we will redefine a
//! Vortex array to be a wrapper around an operator that _does_ have a known length, amongst other
//! properties (such as non-blocking evaluation).
22//!
23//! We also introduce the idea of an executor that can evaluate an operator tree efficiently. It
24//! supports common subtree elimination, as well as extracting sub-graphs for pipelined and GPU
25//! execution. The executor is also responsible for managing memory and scheduling work across
26//! different execution resources.
27
28pub mod canonical;
29pub mod compare;
30mod display;
31pub mod filter;
32pub mod getitem;
33mod hash;
34pub mod metrics;
35mod optimize;
36pub mod slice;
37
38use std::any::{Any, type_name};
39use std::fmt;
40use std::fmt::{Debug, Formatter};
41use std::ops::BitAnd;
42use std::sync::Arc;
43
44use arcref::ArcRef;
45use async_trait::async_trait;
46pub use display::*;
47pub use hash::*;
48use termtree::Tree;
49use vortex_dtype::DType;
50use vortex_error::VortexResult;
51
52use crate::Canonical;
53use crate::pipeline::PipelinedOperator;
54
55pub type OperatorId = ArcRef<str>;
56pub type OperatorRef = Arc<dyn Operator>;
57
58/// An operator represents a node in a logical query plan.
59pub trait Operator: 'static + Send + Sync + Debug + DynOperatorHash + DynOperatorEq {
60 /// The unique identifier for this operator instance.
61 fn id(&self) -> OperatorId;
62
63 /// For downcasting.
64 fn as_any(&self) -> &dyn Any;
65
66 /// Returns the [`DType`] of the array produced by this operator.
67 fn dtype(&self) -> &DType;
68
69 /// Returns the bounds on the number of rows produced by this operator.
70 fn bounds(&self) -> LengthBounds;
71
72 /// Returns the exact number of rows produced by this operator, if known.
73 fn len(&self) -> Option<usize> {
74 self.bounds().maybe_len()
75 }
76
77 /// Returns if this operator is known to be empty (i.e. max bound is 0).
78 fn is_empty(&self) -> bool {
79 self.bounds().max == 0
80 }
81
82 /// The children of this operator.
83 fn children(&self) -> &[OperatorRef];
84
85 /// The number of children of this operator.
86 fn nchildren(&self) -> usize {
87 self.children().len()
88 }
89
90 /// Override the default formatting of this operator.
91 fn fmt_as(&self, _df: DisplayFormat, f: &mut Formatter) -> fmt::Result {
92 write!(f, "{}", type_name::<Self>())
93 }
94
95 fn fmt_all(&self) -> String {
96 let node_name = TreeNodeDisplay(self).to_string();
97 let child_trees: Vec<_> = self
98 .children()
99 .iter()
100 .map(|child| child.fmt_all())
101 .collect();
102 Tree::new(node_name)
103 .with_leaves(child_trees)
104 .with_multiline(true)
105 .to_string()
106 }
107
108 /// Create a new instance of this operator with the given children.
109 ///
110 /// ## Panics
111 ///
112 /// Panics if the number or dtypes of children are incorrect.
113 ///
114 fn with_children(self: Arc<Self>, _children: Vec<OperatorRef>) -> VortexResult<OperatorRef>;
115
116 /// Attempt to optimize this node by analyzing its children.
117 ///
118 /// For example, if all the children are constant, this function should perform constant
119 /// folding and return a constant operator.
120 ///
121 /// This function should typically be implemented only for self-contained optimizations based
122 /// on child properties
123 fn reduce_children(&self) -> VortexResult<Option<OperatorRef>> {
124 Ok(None)
125 }
126
127 /// Attempt to push down a parent operator through this node.
128 ///
129 /// The `child_idx` parameter indicates which child of the parent this operator occupies.
130 /// For example, if the parent is a binary operator, and this operator is the left child,
131 /// then `child_idx` will be 0. If this operator is the right child, then `child_idx` will be 1.
132 ///
133 /// The returned operator will replace the parent in the tree.
134 ///
135 /// This function should typically be implemented for cross-operator optimizations where the
136 /// child needs to adapt to the parent's requirements
137 fn reduce_parent(
138 &self,
139 _parent: OperatorRef,
140 _child_idx: usize,
141 ) -> VortexResult<Option<OperatorRef>> {
142 Ok(None)
143 }
144
145 /// Return `true` if the given child is considered to be a selection target.
146 ///
147 /// The definition of this is such that pushing a selection operator down to all selection
148 /// targets will result in the same output as a selection on this operator.
149 ///
150 /// For example, `select(Op, mask) == Op(select(child, mask), ...)` for all children that are
151 /// selection targets.
152 ///
153 /// If any child index returns `None`, then selection pushdown is not possible.
154 /// If all children return `Some(false)`, then selection pushdown is not possible.
155 fn is_selection_target(&self, _child_idx: usize) -> Option<bool> {
156 None
157 }
158
159 /// Returns this operator as a [`BatchOperator`] if it supports batch execution.
160 fn as_batch(&self) -> Option<&dyn BatchOperator> {
161 None
162 }
163
164 /// Returns this operator as a [`PipelinedOperator`] if it supports pipelined execution.
165 ///
166 /// Note that operators that implement [`PipelinedOperator`] *do not need* to implement
167 /// [`BatchOperator`], although they may choose to do so.
168 fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
169 None
170 }
171}
172
/// Inclusive lower and upper bounds on the number of rows an operator produces.
///
/// When `min == max` the row count is known exactly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LengthBounds {
    /// Smallest possible row count (inclusive).
    pub min: usize,
    /// Largest possible row count (inclusive).
    pub max: usize,
}
179
180impl LengthBounds {
181 pub fn maybe_len(&self) -> Option<usize> {
182 (self.min == self.max).then_some(self.min)
183 }
184
185 pub fn contains(&self, len: usize) -> bool {
186 self.min <= len && len <= self.max
187 }
188
189 pub fn intersect_all<I: IntoIterator<Item = LengthBounds>>(iters: I) -> Self {
190 let mut min = 0;
191 let mut max = 0;
192 for bounds in iters {
193 min = min.max(bounds.min);
194 max = max.min(bounds.max);
195 }
196 Self { min, max }
197 }
198}
199
200impl BitAnd for LengthBounds {
201 type Output = Self;
202
203 fn bitand(self, rhs: Self) -> Self::Output {
204 Self {
205 min: self.min.max(rhs.min),
206 max: self.max.min(rhs.max),
207 }
208 }
209}
210
211impl From<usize> for LengthBounds {
212 fn from(value: usize) -> Self {
213 Self {
214 min: value,
215 max: value,
216 }
217 }
218}
219
220/// The default execution mode for an operator is batch mode.
221pub trait BatchOperator: Operator {
222 fn bind(&self, ctx: &mut dyn BatchBindCtx) -> VortexResult<BatchExecutionRef>;
223}
224
225pub trait BatchBindCtx {
226 /// Returns the execution for the child at the given index, consuming it from the context.
227 /// Each child may be consumed only once.
228 fn child(&mut self, idx: usize) -> VortexResult<BatchExecutionRef>;
229}
230
231/// The primary execution trait for operators.
232///
233/// Alternatively, or additionally, operators may choose to implement [`PipelinedOperator`].
234#[async_trait]
235pub trait BatchExecution: Send {
236 async fn execute(self: Box<Self>) -> VortexResult<Canonical>;
237}
238
239pub type BatchExecutionRef = Box<dyn BatchExecution>;