1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
mod builder;

use self::builder::ExecutionContextBuilder;
use crate::{
  AstroRunSharedState, Command, Context, Error, Job, JobRunResult, RunResult, Runner,
  StepRunResult, StreamExt, Workflow, WorkflowLog, WorkflowRunResult, WorkflowState,
  WorkflowStateEvent,
};
use std::sync::Arc;

#[derive(Clone)]
pub struct ExecutionContext {
  // pub workflow_shared: WorkflowShared,
  runner: Arc<Box<dyn Runner>>,
  shared_state: AstroRunSharedState,
}

impl ExecutionContext {
  pub fn builder() -> ExecutionContextBuilder {
    ExecutionContextBuilder::new()
  }

  pub async fn run(&self, command: Command) -> StepRunResult {
    let step_id = command.id.clone();

    let plugin_manager = self.shared_state.plugins();

    let started_at = chrono::Utc::now();
    plugin_manager.on_state_change(WorkflowStateEvent::StepStateUpdated {
      id: step_id.clone(),
      state: WorkflowState::InProgress,
    });

    let mut receiver = match self.runner.run(Context {
      id: step_id.to_string(),
      command,
    }) {
      Ok(receiver) => receiver,
      Err(err) => {
        let completed_at = chrono::Utc::now();
        let duration = completed_at - started_at;
        log::error!(
          "Step {:?} failed with error {:?} in {} seconds",
          step_id,
          err,
          duration.num_seconds()
        );

        plugin_manager.on_state_change(WorkflowStateEvent::StepStateUpdated {
          id: step_id.clone(),
          state: WorkflowState::Failed,
        });

        return StepRunResult {
          id: step_id,
          state: WorkflowState::Failed,
          exit_code: Some(1),
          started_at: Some(started_at),
          completed_at: Some(completed_at),
        };
      }
    };

    while let Some(log) = receiver.next().await {
      let log = WorkflowLog {
        step_id: step_id.clone(),
        log_type: log.log_type,
        message: log.message,
        time: chrono::Utc::now(),
      };

      plugin_manager.on_log(log.clone());
      self.runner.on_log(log);
    }

    let res = receiver
      .result()
      // NOTE: This should never happen
      .ok_or(Error::internal_runtime_error(
        "Missing result from runner. This is a bug in the runner implementation.",
      ))
      .unwrap();

    let completed_at = chrono::Utc::now();
    let duration = completed_at - started_at;
    log::info!(
      "Step {:?} finished with result {:?} in {} seconds",
      step_id,
      res,
      duration.num_seconds()
    );

    let res = match res {
      RunResult::Succeeded => StepRunResult {
        id: step_id,
        state: WorkflowState::Succeeded,
        exit_code: None,
        started_at: Some(started_at),
        completed_at: Some(completed_at),
      },
      RunResult::Failed { exit_code } => StepRunResult {
        id: step_id,
        state: WorkflowState::Failed,
        exit_code: Some(exit_code),
        started_at: Some(started_at),
        completed_at: Some(completed_at),
      },
      RunResult::Cancelled => StepRunResult {
        id: step_id,
        state: WorkflowState::Cancelled,
        exit_code: None,
        started_at: Some(started_at),
        completed_at: Some(completed_at),
      },
    };

    res
  }

  pub fn on_run_workflow(&self, workflow: Workflow) {
    self.shared_state.on_run_workflow(workflow.clone());
    self.runner.on_run_workflow(workflow);
  }

  pub fn on_run_job(&self, job: Job) {
    self.shared_state.on_run_job(job.clone());
    self.runner.on_run_job(job);
  }

  pub fn on_state_change(&self, event: WorkflowStateEvent) {
    self.shared_state.on_state_change(event.clone());
    self.runner.on_state_change(event);
  }

  pub fn on_job_completed(&self, result: JobRunResult) {
    self.shared_state.on_job_completed(result.clone());
    self.runner.on_job_completed(result);
  }

  pub fn on_workflow_completed(&self, result: WorkflowRunResult) {
    self.shared_state.on_workflow_completed(result.clone());
    self.runner.on_workflow_completed(result);
  }
}