astro_run/execution_context/
mod.rs

1mod builder;
2mod condition_matcher;
3mod context_payload;
4
5pub use self::builder::ExecutionContextBuilder;
6use crate::{
7  AstroRunSignal, Condition, Context, Error, Job, JobId, JobRunResult, Result, RunResult,
8  RunStepEvent, Runner, SharedPluginDriver, Signal, SignalManager, Step, StepRunResult, StreamExt,
9  Workflow, WorkflowLog, WorkflowRunResult, WorkflowState, WorkflowStateEvent,
10};
11pub use context_payload::*;
12use std::sync::Arc;
13use tokio::time;
14
15#[derive(Clone)]
16pub struct ExecutionContext {
17  runner: Arc<Box<dyn Runner>>,
18  plugin_driver: SharedPluginDriver,
19  signal_manager: SignalManager,
20  condition_matcher: condition_matcher::ConditionMatcher,
21  payload: Option<ContextPayload>,
22}
23
24impl ExecutionContext {
25  pub async fn run(&self, step: Step) -> StepRunResult {
26    let step = self.call_on_before_run_step(step).await;
27
28    let step_id = step.id.clone();
29    let timeout = step.timeout;
30
31    let started_at = chrono::Utc::now();
32
33    let event = crate::RunStepEvent {
34      source: step.clone(),
35      trigger_event: self.condition_matcher.event.clone(),
36      payload: self.payload.clone(),
37    };
38
39    self.call_on_run_step(event.clone()).await;
40
41    // Queued
42    let event = WorkflowStateEvent::StepStateUpdated {
43      id: step_id.clone(),
44      state: WorkflowState::Queued,
45    };
46
47    self.call_on_state_change(event).await;
48
49    // Job signal
50    let job_signal = self
51      .signal_manager
52      .get_signal(&step.id.job_id())
53      .expect("Missing job signal");
54
55    // Step signal
56    let signal = AstroRunSignal::new();
57
58    let mut receiver = match self
59      .runner
60      .run(Context {
61        id: step_id.clone(),
62        signal: signal.clone(),
63        command: step.into(),
64        event: self.condition_matcher.event.clone(),
65        payload: self.payload_string(),
66      })
67      .await
68    {
69      Ok(receiver) => receiver,
70      Err(err) => {
71        let completed_at = chrono::Utc::now();
72        let duration = completed_at - started_at;
73        log::error!(
74          "Step {:?} failed with error {:?} in {} seconds",
75          step_id,
76          err,
77          duration.num_seconds()
78        );
79
80        let event = WorkflowStateEvent::StepStateUpdated {
81          id: step_id.clone(),
82          state: WorkflowState::Failed,
83        };
84
85        self.call_on_state_change(event).await;
86
87        let result = StepRunResult {
88          id: step_id,
89          state: WorkflowState::Failed,
90          exit_code: Some(1),
91          started_at: Some(started_at),
92          completed_at: Some(completed_at),
93        };
94
95        self.call_on_step_completed(result.clone()).await;
96
97        return result;
98      }
99    };
100
101    let event = WorkflowStateEvent::StepStateUpdated {
102      id: step_id.clone(),
103      state: WorkflowState::InProgress,
104    };
105
106    self.call_on_state_change(event).await;
107
108    loop {
109      tokio::select! {
110        // Timeout
111        _ = time::sleep(timeout) => {
112          // Ignore error
113          signal.timeout().ok();
114        }
115        s = job_signal.recv() => {
116          match s {
117            Signal::Cancel => {
118              signal.cancel().ok();
119            }
120            Signal::Timeout => {
121              signal.timeout().ok();
122            }
123          }
124        }
125        received = receiver.next() => {
126          if let Some(log) = received {
127            let log = WorkflowLog {
128              step_id: step_id.clone(),
129              log_type: log.log_type,
130              message: log.message,
131              time: chrono::Utc::now(),
132            };
133
134            self.call_on_log(log.clone()).await;
135          } else {
136            break;
137          }
138        }
139      }
140    }
141
142    let res = receiver
143      .result()
144      // NOTE: This should never happen
145      .ok_or(Error::internal_runtime_error(
146        "Missing result from runner. This is a bug in the runner implementation.",
147      ))
148      .unwrap();
149
150    let completed_at = chrono::Utc::now();
151    let duration = completed_at - started_at;
152
153    log::trace!(
154      "Step {:?} finished with result {:?} in {} seconds",
155      step_id,
156      res,
157      duration.num_seconds()
158    );
159
160    let res = match res {
161      RunResult::Succeeded => StepRunResult {
162        id: step_id.clone(),
163        state: WorkflowState::Succeeded,
164        exit_code: None,
165        started_at: Some(started_at),
166        completed_at: Some(completed_at),
167      },
168      RunResult::Failed { exit_code } => StepRunResult {
169        id: step_id.clone(),
170        state: WorkflowState::Failed,
171        exit_code: Some(exit_code),
172        started_at: Some(started_at),
173        completed_at: Some(completed_at),
174      },
175      RunResult::Cancelled => StepRunResult {
176        id: step_id.clone(),
177        state: WorkflowState::Cancelled,
178        exit_code: None,
179        started_at: Some(started_at),
180        completed_at: Some(completed_at),
181      },
182    };
183
184    let event = WorkflowStateEvent::StepStateUpdated {
185      id: step_id.clone(),
186      state: res.state.clone(),
187    };
188
189    self.call_on_state_change(event).await;
190
191    self.call_on_step_completed(res.clone()).await;
192
193    log::trace!("Step {:?} completed", step_id);
194
195    res
196  }
197
198  pub fn cancel_job(&self, job_id: &JobId) -> Result<()> {
199    self.signal_manager.cancel_job(job_id)
200  }
201
202  pub(crate) async fn is_match(&self, condition: &Condition) -> bool {
203    self.condition_matcher.is_match(condition).await
204  }
205
206  pub(crate) async fn call_on_run_workflow(&self, workflow: Workflow) {
207    let event = crate::RunWorkflowEvent {
208      source: workflow,
209      trigger_event: self.condition_matcher.event.clone(),
210      payload: self.payload.clone(),
211    };
212    self.plugin_driver.on_run_workflow(event.clone()).await;
213    if let Err(err) = self.runner.on_run_workflow(event).await {
214      log::error!("Failed to run workflow: {:?}", err);
215    }
216  }
217
218  pub(crate) async fn call_on_run_job(&self, job: Job) {
219    self
220      .signal_manager
221      .register_signal(job.id.clone(), AstroRunSignal::new());
222
223    let event = crate::RunJobEvent {
224      source: job,
225      trigger_event: self.condition_matcher.event.clone(),
226      payload: self.payload.clone(),
227    };
228
229    self.plugin_driver.on_run_job(event.clone()).await;
230    if let Err(err) = self.runner.on_run_job(event).await {
231      log::error!("Failed to run job: {:?}", err);
232    }
233  }
234
235  pub(crate) async fn call_on_state_change(&self, event: WorkflowStateEvent) {
236    self.plugin_driver.on_state_change(event.clone()).await;
237
238    if let Err(err) = self.runner.on_state_change(event).await {
239      log::error!("Failed to handle state change: {:?}", err);
240    }
241  }
242
243  pub(crate) async fn call_on_job_completed(&self, result: JobRunResult) {
244    self.signal_manager.unregister_signal(&result.id);
245    self.plugin_driver.on_job_completed(result.clone()).await;
246
247    if let Err(err) = self.runner.on_job_completed(result).await {
248      log::error!("Failed to handle job completed: {:?}", err);
249    }
250  }
251
252  pub(crate) async fn call_on_run_step(&self, event: RunStepEvent) {
253    self.plugin_driver.on_run_step(event.clone()).await;
254
255    if let Err(err) = self.runner.on_run_step(event).await {
256      log::error!("Failed to run step: {:?}", err);
257    }
258  }
259
260  pub(crate) async fn call_on_step_completed(&self, result: StepRunResult) {
261    self.plugin_driver.on_step_completed(result.clone()).await;
262
263    if let Err(err) = self.runner.on_step_completed(result.clone()).await {
264      log::error!("Failed to handle step completed: {:?}", err);
265    }
266  }
267
268  pub(crate) async fn call_on_before_run_step(&self, step: Step) -> Step {
269    let step = self.plugin_driver.on_before_run_step(step).await;
270
271    if let Ok(step) = self.runner.on_before_run_step(step.clone()).await {
272      step
273    } else {
274      step
275    }
276  }
277
278  pub(crate) async fn call_on_workflow_completed(&self, result: WorkflowRunResult) {
279    self
280      .plugin_driver
281      .on_workflow_completed(result.clone())
282      .await;
283
284    if let Err(err) = self.runner.on_workflow_completed(result).await {
285      log::error!("Failed to handle workflow completed: {:?}", err);
286    }
287  }
288
289  pub(crate) async fn call_on_log(&self, log: WorkflowLog) {
290    self.plugin_driver.on_log(log.clone()).await;
291
292    if let Err(err) = self.runner.on_log(log).await {
293      log::error!("Failed to handle log: {:?}", err);
294    }
295  }
296
297  pub fn builder() -> ExecutionContextBuilder {
298    ExecutionContextBuilder::new()
299  }
300
301  pub fn payload<P>(&self) -> Option<&P>
302  where
303    P: ContextPayloadExt + 'static,
304  {
305    self.payload.as_ref().and_then(|p| p.payload())
306  }
307
308  fn payload_string(&self) -> Option<String> {
309    self
310      .payload
311      .as_ref()
312      .map(|p| serde_json::to_string(&p).expect("Failed to serialize payload"))
313  }
314}