vortex_array/operator/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! This module defines a new way of modelling arrays and expressions in Vortex. To avoid naming
5//! conflicts, we refer to the new model as "operators".
6//!
7//! Operators form a more traditional "logical plan" as might be seen in other query engines.
8//! Each operator supports one primary function which is to produce a canonical representation of
9//! its data, known as `canonicalization`. Operators have the option to produce this canonical
10//! form using different execution models, including batch, pipelined, and GPU.
11//!
12//! Initial designs for this module involved passing masks down through the physical execution
13//! tree as futures, allowing operators to skip computation for rows that are not needed. We
14//! ultimately decided against this approach and instead introduce a `Filter` operator
15//! that can be pushed down in the same way as any other operator.
16//!
17//! On the one hand, this means common subtree elimination is much easier, since we know the mask
18//! or identity of the mask future inside the filter operator up-front. On the other hand, it
19//! means that an operator no longer has a known length. In the end state, we will redefine a
//! Vortex array to be a wrapper around an operator that _does_ have a known length, amongst other
21//! properties (such as non-blocking evaluation).
22//!
23//! We also introduce the idea of an executor that can evaluate an operator tree efficiently. It
24//! supports common subtree elimination, as well as extracting sub-graphs for pipelined and GPU
25//! execution. The executor is also responsible for managing memory and scheduling work across
26//! different execution resources.
27
28pub mod canonical;
29pub mod compare;
30mod display;
31pub mod filter;
32pub mod getitem;
33mod hash;
34pub mod metrics;
35mod optimize;
36pub mod slice;
37
38use std::any::{Any, type_name};
39use std::fmt::Debug;
40use std::sync::Arc;
41
42use arcref::ArcRef;
43use async_trait::async_trait;
44pub use display::*;
45pub use hash::*;
46use vortex_dtype::DType;
47use vortex_error::VortexResult;
48
49use crate::Canonical;
50use crate::pipeline::PipelinedOperator;
51
52pub type OperatorId = ArcRef<str>;
53pub type OperatorRef = Arc<dyn Operator>;
54
55/// An operator represents a node in a logical query plan.
56pub trait Operator: 'static + Send + Sync + Debug + DynOperatorHash + DynOperatorEq {
57 /// The unique identifier for this operator instance.
58 fn id(&self) -> OperatorId;
59
60 /// For downcasting.
61 fn as_any(&self) -> &dyn Any;
62
63 /// Returns the [`DType`] of the array produced by this operator.
64 fn dtype(&self) -> &DType;
65
66 /// Returns the number of rows produced by this operator.
67 fn len(&self) -> usize;
68
69 /// Returns whether this operator produces zero rows.
70 fn is_empty(&self) -> bool {
71 self.len() == 0
72 }
73
74 // TODO(ngates): add StatsSet?
75
76 /// The children of this operator.
77 fn children(&self) -> &[OperatorRef];
78
79 /// The number of children of this operator.
80 fn nchildren(&self) -> usize {
81 self.children().len()
82 }
83
84 /// Override the default formatting of this operator.
85 fn fmt_as(&self, _df: DisplayFormat, f: &mut std::fmt::Formatter) -> std::fmt::Result {
86 write!(f, "{}", type_name::<Self>())
87 }
88
89 /// Create a new instance of this operator with the given children.
90 ///
91 /// ## Panics
92 ///
93 /// Panics if the number or dtypes of children are incorrect.
94 ///
95 fn with_children(self: Arc<Self>, _children: Vec<OperatorRef>) -> VortexResult<OperatorRef>;
96
97 /// Attempt to optimize this node by analyzing its children.
98 ///
99 /// For example, if all the children are constant, this function should perform constant
100 /// folding and return a constant operator.
101 ///
102 /// This function should typically be implemented only for self-contained optimizations based
103 /// on child properties
104 fn reduce_children(&self) -> VortexResult<Option<OperatorRef>> {
105 Ok(None)
106 }
107
108 /// Attempt to push down a parent operator through this node.
109 ///
110 /// The `child_idx` parameter indicates which child of the parent this operator occupies.
111 /// For example, if the parent is a binary operator, and this operator is the left child,
112 /// then `child_idx` will be 0. If this operator is the right child, then `child_idx` will be 1.
113 ///
114 /// The returned operator will replace the parent in the tree.
115 ///
116 /// This function should typically be implemented for cross-operator optimizations where the
117 /// child needs to adapt to the parent's requirements
118 fn reduce_parent(
119 &self,
120 _parent: OperatorRef,
121 _child_idx: usize,
122 ) -> VortexResult<Option<OperatorRef>> {
123 Ok(None)
124 }
125
126 /// Return `true` if the given child is considered to be a selection target.
127 ///
128 /// The definition of this is such that pushing a selection operator down to all selection
129 /// targets will result in the same output as a selection on this operator.
130 ///
131 /// For example, `select(Op, mask) == Op(select(child, mask), ...)` for all children that are
132 /// selection targets.
133 ///
134 /// If any child index returns `None`, then selection pushdown is not possible.
135 /// If all children return `Some(false)`, then selection pushdown is not possible.
136 fn is_selection_target(&self, _child_idx: usize) -> Option<bool> {
137 None
138 }
139
140 /// Returns this operator as a [`BatchOperator`] if it supports batch execution.
141 fn as_batch(&self) -> Option<&dyn BatchOperator> {
142 None
143 }
144
145 /// Returns this operator as a [`PipelinedOperator`] if it supports pipelined execution.
146 ///
147 /// Note that operators that implement [`PipelinedOperator`] *do not need* to implement
148 /// [`BatchOperator`], although they may choose to do so.
149 fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
150 None
151 }
152}
153
154/// The default execution mode for an operator is batch mode.
155pub trait BatchOperator: Operator {
156 fn bind(&self, ctx: &mut dyn BatchBindCtx) -> VortexResult<BatchExecutionRef>;
157}
158
159pub trait BatchBindCtx {
160 /// Returns the execution for the child at the given index, consuming it from the context.
161 /// Each child may be consumed only once.
162 fn child(&mut self, idx: usize) -> VortexResult<BatchExecutionRef>;
163}
164
165/// The primary execution trait for operators.
166///
167/// Alternatively, or additionally, operators may choose to implement [`PipelinedOperator`].
168#[async_trait]
169pub trait BatchExecution: Send {
170 async fn execute(self: Box<Self>) -> VortexResult<Canonical>;
171}
172
173pub type BatchExecutionRef = Box<dyn BatchExecution>;