astro_run/workflow/
job.rs

1use super::Step;
2use crate::{
3  Condition, ExecutionContext, JobId, JobRunResult, StepRunResult, WorkflowState,
4  WorkflowStateEvent,
5};
6use serde::{Deserialize, Serialize};
7
8#[derive(Serialize, Deserialize, Debug, Clone)]
9pub struct Job {
10  pub id: JobId,
11  pub name: Option<String>,
12  pub on: Option<Condition>,
13  pub steps: Vec<Step>,
14  /// For workflow run
15  pub depends_on: Vec<String>,
16  pub working_directories: Vec<String>,
17}
18
19impl Job {
20  pub async fn run(&self, ctx: ExecutionContext) -> JobRunResult {
21    if self.should_skip(&ctx).await {
22      ctx
23        .call_on_state_change(WorkflowStateEvent::JobStateUpdated {
24          id: self.id.clone(),
25          state: WorkflowState::Skipped,
26        })
27        .await;
28
29      return JobRunResult {
30        id: self.id.clone(),
31        state: WorkflowState::Skipped,
32        started_at: None,
33        completed_at: None,
34        steps: vec![],
35      };
36    }
37
38    let started_at = chrono::Utc::now();
39    let mut job_state = WorkflowState::InProgress;
40
41    // Dispatch run job event
42    ctx.call_on_run_job(self.clone()).await;
43    ctx
44      .call_on_state_change(WorkflowStateEvent::JobStateUpdated {
45        id: self.id.clone(),
46        state: job_state.clone(),
47      })
48      .await;
49
50    let mut steps = Vec::new();
51
52    for step in self.steps.iter().cloned() {
53      let mut skipped = match job_state {
54        WorkflowState::Failed => !step.continue_on_error,
55        WorkflowState::Cancelled | WorkflowState::Skipped => true,
56        _ => false,
57      };
58
59      if !skipped && step.should_skip(&ctx).await {
60        skipped = true;
61      }
62
63      if skipped {
64        log::trace!("Step {} is skipped", step.id.to_string());
65
66        ctx
67          .call_on_state_change(WorkflowStateEvent::StepStateUpdated {
68            id: step.id.clone(),
69            state: WorkflowState::Skipped,
70          })
71          .await;
72
73        steps.push(StepRunResult {
74          id: step.id.clone(),
75          state: WorkflowState::Skipped,
76          exit_code: None,
77          started_at: None,
78          completed_at: None,
79        });
80        continue;
81      }
82
83      let result = ctx.run(step).await;
84
85      match result.state {
86        WorkflowState::Failed => {
87          job_state = WorkflowState::Failed;
88        }
89        WorkflowState::Cancelled => {
90          job_state = WorkflowState::Cancelled;
91        }
92        _ => {}
93      }
94
95      steps.push(result);
96    }
97
98    if job_state.is_in_progress() {
99      job_state = WorkflowState::Succeeded;
100    }
101
102    let completed_at = chrono::Utc::now();
103
104    ctx
105      .call_on_state_change(WorkflowStateEvent::JobStateUpdated {
106        id: self.id.clone(),
107        state: job_state.clone(),
108      })
109      .await;
110
111    let result = JobRunResult {
112      id: self.id.clone(),
113      state: job_state,
114      started_at: Some(started_at),
115      completed_at: Some(completed_at),
116      steps,
117    };
118
119    ctx.call_on_job_completed(result.clone()).await;
120
121    result
122  }
123
124  pub async fn should_skip(&self, ctx: &ExecutionContext) -> bool {
125    if let Some(on) = &self.on {
126      !ctx.is_match(on).await
127    } else {
128      false
129    }
130  }
131}