1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::future::{BoxFuture, Shared, WeakShared};
8use futures::{FutureExt, TryFutureExt};
9use itertools::Itertools;
10use vortex_error::{
11 SharedVortexResult, VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err,
12};
13use vortex_utils::aliases::hash_map::HashMap;
14
15use crate::Canonical;
16use crate::operator::{BatchBindCtx, BatchExecution, BatchExecutionRef, OperatorKey, OperatorRef};
17use crate::pipeline::operator::PipelineOperator;
18
/// Executes operator trees, sharing the execution of operators it has already bound.
///
/// Constructed through [`Default`], which starts with an empty cache.
#[derive(Default)]
pub struct Executor {
    /// Cache of result futures, keyed by operator.
    ///
    /// Only weak handles are stored: an entry can be upgraded to a usable future only while
    /// some strong handle to that shared future still exists.
    execution_cache: HashMap<
        OperatorKey<OperatorRef>,
        WeakShared<BoxFuture<'static, SharedVortexResult<Canonical>>>,
    >,
}
34
35impl Executor {
36 pub fn execute(
38 &mut self,
39 operator: OperatorRef,
40 ) -> BoxFuture<'static, VortexResult<Canonical>> {
41 let execution = self.batch_execution(&operator);
42 async move { execution?.execute().await }.boxed()
43 }
44
45 fn batch_execution(&mut self, operator: &OperatorRef) -> VortexResult<BatchExecutionRef> {
46 let key = OperatorKey(operator.clone());
52 if let Some(weak_shared) = self.execution_cache.get(&key) {
53 if let Some(shared) = weak_shared.upgrade() {
54 return Ok(Box::new(SharedBatchExecution(shared)));
56 } else {
57 self.execution_cache.remove(&key);
59 }
60 }
61
62 let operator = match PipelineOperator::new(operator.clone()) {
67 None => operator.clone(),
68 Some(pipeline_op) => Arc::new(pipeline_op),
69 };
70
71 log::info!("Executing operator: {}", operator.display_tree());
72 println!("Executing operator: {}", operator.display_tree());
73
74 let mut children: Vec<_> = operator
76 .children()
77 .iter()
78 .map(|child| self.batch_execution(child))
79 .map_ok(Some)
80 .try_collect()?;
81
82 let execution = operator
83 .as_batch()
84 .ok_or_else(|| {
85 vortex_err!(
86 "Operator does not support batch execution OR pipelined execution: {:?}",
87 operator
88 )
89 })?
90 .bind(&mut children)?;
91
92 let shared_future = execution.execute().map_err(Arc::new).boxed().shared();
93 self.execution_cache.insert(
94 OperatorKey(operator),
95 shared_future.downgrade().vortex_expect("just created"),
96 );
97 Ok(Box::new(SharedBatchExecution(shared_future)))
98 }
99}
100
101impl BatchBindCtx for Vec<Option<BatchExecutionRef>> {
102 fn child(&mut self, idx: usize) -> VortexResult<BatchExecutionRef> {
103 if idx >= self.len() {
104 vortex_bail!("Child index {} out of bounds", idx);
105 }
106 self[idx]
107 .take()
108 .ok_or_else(|| vortex_err!("Child already consumed"))
109 }
110}
111
/// A [`BatchExecution`] backed by a [`Shared`] future that yields the canonical result.
struct SharedBatchExecution(Shared<BoxFuture<'static, SharedVortexResult<Canonical>>>);
119
120#[async_trait]
121impl BatchExecution for SharedBatchExecution {
122 async fn execute(self: Box<Self>) -> VortexResult<Canonical> {
123 self.0.await.map_err(VortexError::from)
124 }
125}
126
#[cfg(test)]
mod tests {
    use futures::executor::block_on;
    use vortex_buffer::buffer;
    use vortex_metrics::VortexMetrics;

    use super::*;
    use crate::compute::Operator as Op;
    use crate::operator::compare::CompareOperator;
    use crate::operator::metrics::MetricsOperator;
    use crate::{IntoArray, ToCanonical};

    /// Executing a primitive array on its own yields the same values back.
    #[test]
    fn test_basic_execution() {
        let input = buffer![1i32, 2, 3, 4].into_array().to_primitive();
        let mut executor = Executor::default();

        let canonical = block_on(executor.execute(Arc::new(input.clone()))).unwrap();
        let output = canonical.into_primitive();

        assert_eq!(output.as_slice::<i32>(), input.as_slice::<i32>());
    }

    /// Executing a `Gt` comparison of two primitive arrays yields the element-wise result.
    #[test]
    fn test_pipelined_execution() {
        let left = buffer![1i32, 2, 3].into_array().to_primitive();
        let right = buffer![3i32, 2, 1].into_array().to_primitive();
        let greater_than =
            Arc::new(CompareOperator::try_new(Arc::new(left), Arc::new(right), Op::Gt).unwrap());

        let mut executor = Executor::default();
        let canonical = block_on(executor.execute(greater_than)).unwrap();

        let mask = canonical.into_bool().bool_vec().unwrap();
        assert_eq!(mask, vec![false, false, true]);
    }

    /// An operator that appears twice in the tree records a single batch execution.
    #[test]
    fn test_common_subtree_elimination() {
        let values = buffer![1i32, 2, 3, 4].into_array().to_primitive();
        // Instrument the input so its executions can be counted afterwards.
        let shared_input = Arc::new(MetricsOperator::new(
            Arc::new(values),
            VortexMetrics::default(),
        ));

        // The same instrumented input feeds both sides of the comparison.
        let comparison = Arc::new(
            CompareOperator::try_new(shared_input.clone(), shared_input.clone(), Op::Gt).unwrap(),
        );
        let root = Arc::new(MetricsOperator::new(comparison, VortexMetrics::default()));

        let mut executor = Executor::default();
        let canonical = block_on(executor.execute(root.clone())).unwrap();

        let mask = canonical.into_bool().bool_vec().unwrap();
        assert_eq!(mask, vec![false; 4]);

        let root_steps = root.metrics().timer("operator.operator.step").count();
        assert_eq!(root_steps, 1);

        let input_executions = shared_input
            .metrics()
            .timer("operator.batch.execute")
            .count();
        assert_eq!(input_executions, 1);
    }
}