1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use super::Step;
use crate::{
  Condition, ExecutionContext, JobId, JobRunResult, StepRunResult, WorkflowState,
  WorkflowStateEvent,
};
use serde::{Deserialize, Serialize};

/// A job: an identified, optionally conditional, ordered list of steps.
///
/// The type is serializable/deserializable via serde and cheaply inspectable
/// via `Debug`; `Job::run` clones the whole value when dispatching the
/// "run job" event, hence the `Clone` derive.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job {
  /// Identifier attached to every job state event and to the job's run result.
  pub id: JobId,
  /// Optional name. Not read by `Job::run`; presumably a human-readable
  /// label — confirm against the code that renders jobs.
  pub name: Option<String>,
  /// Optional condition. When present and the execution context reports that
  /// it does not match, `Job::run` marks the job as skipped and runs no step.
  pub on: Option<Condition>,
  /// Steps of the job, visited sequentially in this order by `Job::run`.
  pub steps: Vec<Step>,
  /// Used for workflow runs.
  /// NOTE(review): presumably the identifiers of jobs that must finish before
  /// this one; `Job::run` does not read it — confirm against the scheduler.
  pub depends_on: Vec<String>,
  /// NOTE(review): not read by `Job::run`; presumably the directories the
  /// job's steps operate in — confirm against the step runner.
  pub working_directories: Vec<String>,
}

impl Job {
  /// Runs the job's steps one after another, reporting progress through `ctx`.
  ///
  /// Flow:
  /// 1. If the job has an `on` condition that `ctx` does not match, a
  ///    `Skipped` job state event is emitted and a `Skipped` result (no
  ///    timestamps, no steps) is returned right away. Neither `on_run_job`
  ///    nor `on_job_completed` is called on this path.
  /// 2. Otherwise `on_run_job` is dispatched and the job is announced as
  ///    `InProgress`.
  /// 3. Steps are visited in declaration order. A step is skipped when the
  ///    job has already failed and the step does not set `continue_on_error`,
  ///    when the job is cancelled/skipped, or when the step's own `on`
  ///    condition does not match. Skipped steps still produce a state event
  ///    and a `Skipped` entry in the result.
  /// 4. A step finishing as `Failed` or `Cancelled` moves the job into that
  ///    state; any other step outcome leaves the job state untouched.
  /// 5. A job that is still in progress after the last step becomes
  ///    `Succeeded`. The final state is emitted, `on_job_completed` receives
  ///    a copy of the result, and the result is returned.
  pub async fn run(&self, ctx: ExecutionContext) -> JobRunResult {
    // Job-level gate: a job without a condition always runs.
    let job_matches = match &self.on {
      Some(condition) => ctx.is_match(condition).await,
      None => true,
    };

    if !job_matches {
      ctx.on_state_change(WorkflowStateEvent::JobStateUpdated {
        id: self.id.clone(),
        state: WorkflowState::Skipped,
      });

      return JobRunResult {
        id: self.id.clone(),
        state: WorkflowState::Skipped,
        started_at: None,
        completed_at: None,
        steps: vec![],
      };
    }

    let started_at = chrono::Utc::now();
    let mut job_state = WorkflowState::InProgress;

    // Announce the job before touching any step.
    ctx.on_run_job(self.clone());
    ctx.on_state_change(WorkflowStateEvent::JobStateUpdated {
      id: self.id.clone(),
      state: job_state.clone(),
    });

    let mut step_results = Vec::new();

    for step in &self.steps {
      // First decide from the job state alone; the step's own condition is
      // only evaluated when the job state does not already rule the step out.
      let blocked_by_job_state = match job_state {
        WorkflowState::Failed => !step.continue_on_error,
        WorkflowState::Cancelled | WorkflowState::Skipped => true,
        _ => false,
      };

      let should_run = if blocked_by_job_state {
        false
      } else {
        match &step.on {
          Some(condition) => ctx.is_match(condition).await,
          None => true,
        }
      };

      let step_result = if should_run {
        let outcome = ctx.run(step.clone()).await;

        // Only failure and cancellation propagate to the job state.
        match outcome.state {
          WorkflowState::Failed => job_state = WorkflowState::Failed,
          WorkflowState::Cancelled => job_state = WorkflowState::Cancelled,
          _ => {}
        }

        outcome
      } else {
        log::trace!("Step {} is skipped", step.id.to_string());

        ctx.on_state_change(WorkflowStateEvent::StepStateUpdated {
          id: step.id.clone(),
          state: WorkflowState::Skipped,
        });

        StepRunResult {
          id: step.id.clone(),
          state: WorkflowState::Skipped,
          exit_code: None,
          started_at: None,
          completed_at: None,
        }
      };

      step_results.push(step_result);
    }

    // No step failed or was cancelled: the job succeeded.
    let final_state = if job_state.is_in_progress() {
      WorkflowState::Succeeded
    } else {
      job_state
    };

    let completed_at = chrono::Utc::now();

    ctx.on_state_change(WorkflowStateEvent::JobStateUpdated {
      id: self.id.clone(),
      state: final_state.clone(),
    });

    let summary = JobRunResult {
      id: self.id.clone(),
      state: final_state,
      started_at: Some(started_at),
      completed_at: Some(completed_at),
      steps: step_results,
    };

    ctx.on_job_completed(summary.clone());

    summary
  }
}