vortex_array/operator/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! This module defines a new way of modelling arrays and expressions in Vortex. To avoid naming
5//! conflicts, we refer to the new model as "operators".
6//!
7//! Operators form a more traditional "logical plan" as might be seen in other query engines.
8//! Each operator supports one primary function which is to produce a canonical representation of
9//! its data, known as `canonicalization`. Operators have the option to produce this canonical
10//! form using different execution models, including batch, pipelined, and GPU.
11//!
12//! Initial designs for this module involved passing masks down through the physical execution
13//! tree as futures, allowing operators to skip computation for rows that are not needed. We
14//! ultimately decided against this approach and instead introduce a `Filter` operator
15//! that can be pushed down in the same way as any other operator.
16//!
17//! On the one hand, this means common subtree elimination is much easier, since we know the mask
18//! or identity of the mask future inside the filter operator up-front. On the other hand, it
19//! means that an operator no longer has a known length. In the end state, we will redefine a
//! Vortex array to be a wrapper around an operator that _does_ have a known length, amongst other
21//! properties (such as non-blocking evaluation).
22//!
23//! We also introduce the idea of an executor that can evaluate an operator tree efficiently. It
24//! supports common subtree elimination, as well as extracting sub-graphs for pipelined and GPU
25//! execution. The executor is also responsible for managing memory and scheduling work across
26//! different execution resources.
27
28pub mod canonical;
29pub mod compare;
30mod display;
31pub mod filter;
32pub mod getitem;
33mod hash;
34pub mod metrics;
35mod optimize;
36pub mod slice;
37
38use std::any::{Any, type_name};
39use std::fmt::Debug;
40use std::sync::Arc;
41
42use arcref::ArcRef;
43use async_trait::async_trait;
44pub use display::*;
45pub use hash::*;
46use vortex_dtype::DType;
47use vortex_error::VortexResult;
48
49use crate::Canonical;
50use crate::pipeline::PipelinedOperator;
51
/// Identifier type returned by [`Operator::id`]: an [`ArcRef`] over a `str`.
pub type OperatorId = ArcRef<str>;
/// A reference-counted handle ([`Arc`]) to a dynamically typed [`Operator`].
pub type OperatorRef = Arc<dyn Operator>;
54
55/// An operator represents a node in a logical query plan.
56pub trait Operator: 'static + Send + Sync + Debug + DynOperatorHash + DynOperatorEq {
57    /// The unique identifier for this operator instance.
58    fn id(&self) -> OperatorId;
59
60    /// For downcasting.
61    fn as_any(&self) -> &dyn Any;
62
63    /// Returns the [`DType`] of the array produced by this operator.
64    fn dtype(&self) -> &DType;
65
66    /// Returns the number of rows produced by this operator.
67    fn len(&self) -> usize;
68
69    /// Returns whether this operator produces zero rows.
70    fn is_empty(&self) -> bool {
71        self.len() == 0
72    }
73
74    // TODO(ngates): add StatsSet?
75
76    /// The children of this operator.
77    fn children(&self) -> &[OperatorRef];
78
79    /// The number of children of this operator.
80    fn nchildren(&self) -> usize {
81        self.children().len()
82    }
83
84    /// Override the default formatting of this operator.
85    fn fmt_as(&self, _df: DisplayFormat, f: &mut std::fmt::Formatter) -> std::fmt::Result {
86        write!(f, "{}", type_name::<Self>())
87    }
88
89    /// Create a new instance of this operator with the given children.
90    ///
91    /// ## Panics
92    ///
93    /// Panics if the number or dtypes of children are incorrect.
94    ///
95    fn with_children(self: Arc<Self>, _children: Vec<OperatorRef>) -> VortexResult<OperatorRef>;
96
97    /// Attempt to optimize this node by analyzing its children.
98    ///
99    /// For example, if all the children are constant, this function should perform constant
100    /// folding and return a constant operator.
101    ///
102    /// This function should typically be implemented only for self-contained optimizations based
103    /// on child properties
104    fn reduce_children(&self) -> VortexResult<Option<OperatorRef>> {
105        Ok(None)
106    }
107
108    /// Attempt to push down a parent operator through this node.
109    ///
110    /// The `child_idx` parameter indicates which child of the parent this operator occupies.
111    /// For example, if the parent is a binary operator, and this operator is the left child,
112    /// then `child_idx` will be 0. If this operator is the right child, then `child_idx` will be 1.
113    ///
114    /// The returned operator will replace the parent in the tree.
115    ///
116    /// This function should typically be implemented for cross-operator optimizations where the
117    /// child needs to adapt to the parent's requirements
118    fn reduce_parent(
119        &self,
120        _parent: OperatorRef,
121        _child_idx: usize,
122    ) -> VortexResult<Option<OperatorRef>> {
123        Ok(None)
124    }
125
126    /// Return `true` if the given child is considered to be a selection target.
127    ///
128    /// The definition of this is such that pushing a selection operator down to all selection
129    /// targets will result in the same output as a selection on this operator.
130    ///
131    /// For example, `select(Op, mask) == Op(select(child, mask), ...)` for all children that are
132    /// selection targets.
133    ///
134    /// If any child index returns `None`, then selection pushdown is not possible.
135    /// If all children return `Some(false)`, then selection pushdown is not possible.
136    fn is_selection_target(&self, _child_idx: usize) -> Option<bool> {
137        None
138    }
139
140    /// Returns this operator as a [`BatchOperator`] if it supports batch execution.
141    fn as_batch(&self) -> Option<&dyn BatchOperator> {
142        None
143    }
144
145    /// Returns this operator as a [`PipelinedOperator`] if it supports pipelined execution.
146    ///
147    /// Note that operators that implement [`PipelinedOperator`] *do not need* to implement
148    /// [`BatchOperator`], although they may choose to do so.
149    fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
150        None
151    }
152}
153
/// The default execution mode for an operator is batch mode.
///
/// An [`Operator`] exposes its batch implementation through [`Operator::as_batch`].
pub trait BatchOperator: Operator {
    /// Binds this operator against `ctx`, producing a [`BatchExecutionRef`] or an error.
    ///
    /// The context's [`BatchBindCtx::child`] method returns child executions by index;
    /// presumably these correspond to this operator's children — confirm against
    /// implementations.
    fn bind(&self, ctx: &mut dyn BatchBindCtx) -> VortexResult<BatchExecutionRef>;
}
158
/// Context handed to [`BatchOperator::bind`], giving access to child executions.
pub trait BatchBindCtx {
    /// Returns the execution for the child at the given index, consuming it from the context.
    /// Each child may be consumed only once.
    ///
    /// NOTE(review): what happens for an out-of-range `idx`, or when the same child is
    /// requested twice, is not visible here; presumably it is reported through the returned
    /// `VortexResult` — confirm against implementations.
    fn child(&mut self, idx: usize) -> VortexResult<BatchExecutionRef>;
}
164
/// The primary execution trait for operators.
///
/// Alternatively, or additionally, operators may choose to implement [`PipelinedOperator`].
#[async_trait]
pub trait BatchExecution: Send {
    /// Consumes this boxed execution and asynchronously produces its [`Canonical`] result,
    /// or an error.
    async fn execute(self: Box<Self>) -> VortexResult<Canonical>;
}
172
/// An owned, boxed [`BatchExecution`] trait object, as returned by [`BatchOperator::bind`].
pub type BatchExecutionRef = Box<dyn BatchExecution>;