vortex_array/executor.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use async_trait::async_trait;
use futures::future::{BoxFuture, Shared, WeakShared};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use vortex_error::{
    SharedVortexResult, VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err,
};
use vortex_utils::aliases::hash_map::HashMap;

use crate::Canonical;
use crate::operator::{BatchBindCtx, BatchExecution, BatchExecutionRef, OperatorKey, OperatorRef};
use crate::pipeline::operator::PipelineOperator;

/// An executor that runs an operator tree.
///
/// The executor performs common subtree elimination by creating BatchExecution nodes that hold
/// shared futures to the underlying execution.
///
/// It also finds sub-graphs of operators that can be pipelined and executes them as a single
/// pipeline operator.
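///
/// A minimal usage sketch (assuming an `OperatorRef` built elsewhere; `block_on` is used only
/// for illustration):
///
/// ```ignore
/// let mut executor = Executor::default();
/// let result = futures::executor::block_on(executor.execute(operator))?;
/// ```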
#[derive(Default)]
pub struct Executor {
    /// Cache of shared futures for common subtree elimination.
    /// We use WeakShared to allow futures to be dropped when no longer needed.
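    /// (A `WeakShared` upgrades back to a `Shared` handle only while at least one strong clone
    /// of the underlying future is still alive.)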
    execution_cache: HashMap<
        OperatorKey<OperatorRef>,
        WeakShared<BoxFuture<'static, SharedVortexResult<Canonical>>>,
    >,
}

impl Executor {
    /// Returns an execution future for the given operator.
    pub fn execute(
        &mut self,
        operator: OperatorRef,
    ) -> BoxFuture<'static, VortexResult<Canonical>> {
        let execution = self.batch_execution(&operator);
        async move { execution?.execute().await }.boxed()
    }

    fn batch_execution(&mut self, operator: &OperatorRef) -> VortexResult<BatchExecutionRef> {
        // FIXME(ngates): we should have a separate optimize call that turns the operator tree
        //  into a DAG by inserting shared CSE nodes, each of which has the ability to construct
        //  a shared execution future... somehow...

        // Check if we already have a shared future for this operator
        let key = OperatorKey(operator.clone());
        if let Some(weak_shared) = self.execution_cache.get(&key) {
            if let Some(shared) = weak_shared.upgrade() {
                // Return a SharedBatchExecution that references the existing shared future
                return Ok(Box::new(SharedBatchExecution(shared)));
            } else {
                // If the weak reference is dead, remove it from the cache
                self.execution_cache.remove(&key);
            }
        }

        // Attempt to convert the operator into a pipeline operator; if so, we use that to execute.
        //
        // The construction of this operator pulls the largest subgraph of nodes that can be
        // executed in a pipelined fashion.
        let operator = match PipelineOperator::new(operator.clone()) {
            None => operator.clone(),
            Some(pipeline_op) => Arc::new(pipeline_op),
        };

        log::info!("Executing operator: {}", operator.display_tree());

        // For each child, create a batch execution that uses the executor to compute it.
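        // Children are wrapped in `Option` so that the bind context below can hand each
        // execution to the operator exactly once via `take()`.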
        let mut children: Vec<_> = operator
            .children()
            .iter()
            .map(|child| self.batch_execution(child))
            .map_ok(Some)
            .try_collect()?;

        let execution = operator
            .as_batch()
            .ok_or_else(|| {
                vortex_err!(
                    "Operator does not support batch execution OR pipelined execution: {:?}",
                    operator
                )
            })?
            .bind(&mut children)?;

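        // Errors are wrapped in an `Arc` so the shared future's result stays cloneable for
        // every consumer; only a weak handle is cached, so the cache alone never keeps an
        // execution alive.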
        let shared_future = execution.execute().map_err(Arc::new).boxed().shared();
        self.execution_cache.insert(
            OperatorKey(operator),
            shared_future.downgrade().vortex_expect("just created"),
        );
        Ok(Box::new(SharedBatchExecution(shared_future)))
    }
}

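// The bind context hands each child execution to the operator at most once: `child` moves the
// execution out of its slot, so asking for the same index twice is an error.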
impl BatchBindCtx for Vec<Option<BatchExecutionRef>> {
    fn child(&mut self, idx: usize) -> VortexResult<BatchExecutionRef> {
        if idx >= self.len() {
            vortex_bail!("Child index {} out of bounds", idx);
        }
        self[idx]
            .take()
            .ok_or_else(|| vortex_err!("Child already consumed"))
    }
}

/// A wrapper around a batch execution that makes it available for sharing across nodes within
/// common subtree elimination.
///
// TODO(ngates): I think we could turn this into a full operator so that the tree display
//  makes more sense? Currently we just perform CSE during execution, rather than an up-front
//  optimization.
struct SharedBatchExecution(Shared<BoxFuture<'static, SharedVortexResult<Canonical>>>);

#[async_trait]
impl BatchExecution for SharedBatchExecution {
    async fn execute(self: Box<Self>) -> VortexResult<Canonical> {
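        // Awaiting the `Shared` handle clones the stored result; the `Arc`-wrapped error is
        // converted back into a plain `VortexError` for the caller.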
        self.0.await.map_err(VortexError::from)
    }
}

#[cfg(test)]
mod tests {
    use futures::executor::block_on;
    use vortex_buffer::buffer;
    use vortex_metrics::VortexMetrics;

    use super::*;
    use crate::compute::Operator as Op;
    use crate::operator::compare::CompareOperator;
    use crate::operator::metrics::MetricsOperator;
    use crate::{IntoArray, ToCanonical};

    #[test]
    fn test_basic_execution() {
        let array = buffer![1i32, 2, 3, 4].into_array().to_primitive();

        let mut executor = Executor::default();
        let result = block_on(executor.execute(Arc::new(array.clone()))).unwrap();
        assert_eq!(
            result.into_primitive().as_slice::<i32>(),
            array.as_slice::<i32>()
        );
    }

    #[test]
    fn test_pipelined_execution() {
        let lhs = buffer![1i32, 2, 3].into_array().to_primitive();
        let rhs = buffer![3i32, 2, 1].into_array().to_primitive();

        // The CompareOperator uses pipelined execution
        let compare =
            Arc::new(CompareOperator::try_new(Arc::new(lhs), Arc::new(rhs), Op::Gt).unwrap());

        let mut executor = Executor::default();
        let result = block_on(executor.execute(compare)).unwrap();
        assert_eq!(
            result.into_bool().bool_vec().unwrap(),
            vec![false, false, true]
        );
    }

    #[test]
    fn test_common_subtree_elimination() {
        // We use the same array for lhs and rhs to check we eliminate the common subtree
        let array = buffer![1i32, 2, 3, 4].into_array().to_primitive();
        let array = Arc::new(MetricsOperator::new(
            Arc::new(array),
            VortexMetrics::default(),
        ));

        let compare =
            Arc::new(CompareOperator::try_new(array.clone(), array.clone(), Op::Gt).unwrap());
        let compare = Arc::new(MetricsOperator::new(compare, VortexMetrics::default()));

        let mut executor = Executor::default();
        let result = block_on(executor.execute(compare.clone())).unwrap();
        assert_eq!(
            result.into_bool().bool_vec().unwrap(),
            vec![false, false, false, false]
        );

        // The comparison operator is pipelined; it also only gets executed once.
        assert_eq!(compare.metrics().timer("operator.operator.step").count(), 1);
        // The array only gets executed once due to common subtree elimination
        assert_eq!(array.metrics().timer("operator.batch.execute").count(), 1);
    }
}