1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use super::Step;
use crate::{
  ExecutionContext, JobId, JobRunResult, StepRunResult, WorkflowState, WorkflowStateEvent,
  WorkflowTriggerEvents,
};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job {
  pub id: JobId,
  pub name: Option<String>,
  pub steps: Vec<Step>,
  pub on: Option<WorkflowTriggerEvents>,
  /// For workflow run
  pub depends_on: Vec<String>,
  pub working_directories: Vec<String>,
}

impl Job {
  pub async fn run(&self, ctx: ExecutionContext) -> JobRunResult {
    let started_at = chrono::Utc::now();
    let mut job_state = WorkflowState::InProgress;

    // Dispatch run job event
    ctx.on_run_job(self.clone());
    ctx.on_state_change(WorkflowStateEvent::JobStateUpdated {
      id: self.id.clone(),
      state: job_state.clone(),
    });

    let mut steps = Vec::new();

    for step in self.steps.iter().cloned() {
      let skipped = match job_state {
        WorkflowState::Failed => !step.continue_on_error,
        WorkflowState::Cancelled | WorkflowState::Skipped => true,
        _ => false,
      };

      if skipped {
        // TODO: log skipped step & call plugin manager
        steps.push(StepRunResult {
          id: step.id.clone(),
          state: WorkflowState::Skipped,
          exit_code: None,
          started_at: None,
          completed_at: None,
        });
        continue;
      }

      // TODO: inject environment variables
      let result = ctx.run(step).await;

      match result.state {
        WorkflowState::Failed => {
          job_state = WorkflowState::Failed;
        }
        WorkflowState::Cancelled => {
          job_state = WorkflowState::Cancelled;
        }
        _ => {}
      }

      steps.push(result);
    }

    if job_state.is_in_progress() {
      job_state = WorkflowState::Succeeded;
    }

    let completed_at = chrono::Utc::now();

    ctx.on_state_change(WorkflowStateEvent::JobStateUpdated {
      id: self.id.clone(),
      state: job_state.clone(),
    });

    let result = JobRunResult {
      id: self.id.clone(),
      state: job_state,
      started_at: Some(started_at),
      completed_at: Some(completed_at),
      steps,
    };

    ctx.on_job_completed(result.clone());

    result
  }
}