astro_run/workflow/
job.rs1use super::Step;
2use crate::{
3 Condition, ExecutionContext, JobId, JobRunResult, StepRunResult, WorkflowState,
4 WorkflowStateEvent,
5};
6use serde::{Deserialize, Serialize};
7
8#[derive(Serialize, Deserialize, Debug, Clone)]
9pub struct Job {
10 pub id: JobId,
11 pub name: Option<String>,
12 pub on: Option<Condition>,
13 pub steps: Vec<Step>,
14 pub depends_on: Vec<String>,
16 pub working_directories: Vec<String>,
17}
18
19impl Job {
20 pub async fn run(&self, ctx: ExecutionContext) -> JobRunResult {
21 if self.should_skip(&ctx).await {
22 ctx
23 .call_on_state_change(WorkflowStateEvent::JobStateUpdated {
24 id: self.id.clone(),
25 state: WorkflowState::Skipped,
26 })
27 .await;
28
29 return JobRunResult {
30 id: self.id.clone(),
31 state: WorkflowState::Skipped,
32 started_at: None,
33 completed_at: None,
34 steps: vec![],
35 };
36 }
37
38 let started_at = chrono::Utc::now();
39 let mut job_state = WorkflowState::InProgress;
40
41 ctx.call_on_run_job(self.clone()).await;
43 ctx
44 .call_on_state_change(WorkflowStateEvent::JobStateUpdated {
45 id: self.id.clone(),
46 state: job_state.clone(),
47 })
48 .await;
49
50 let mut steps = Vec::new();
51
52 for step in self.steps.iter().cloned() {
53 let mut skipped = match job_state {
54 WorkflowState::Failed => !step.continue_on_error,
55 WorkflowState::Cancelled | WorkflowState::Skipped => true,
56 _ => false,
57 };
58
59 if !skipped && step.should_skip(&ctx).await {
60 skipped = true;
61 }
62
63 if skipped {
64 log::trace!("Step {} is skipped", step.id.to_string());
65
66 ctx
67 .call_on_state_change(WorkflowStateEvent::StepStateUpdated {
68 id: step.id.clone(),
69 state: WorkflowState::Skipped,
70 })
71 .await;
72
73 steps.push(StepRunResult {
74 id: step.id.clone(),
75 state: WorkflowState::Skipped,
76 exit_code: None,
77 started_at: None,
78 completed_at: None,
79 });
80 continue;
81 }
82
83 let result = ctx.run(step).await;
84
85 match result.state {
86 WorkflowState::Failed => {
87 job_state = WorkflowState::Failed;
88 }
89 WorkflowState::Cancelled => {
90 job_state = WorkflowState::Cancelled;
91 }
92 _ => {}
93 }
94
95 steps.push(result);
96 }
97
98 if job_state.is_in_progress() {
99 job_state = WorkflowState::Succeeded;
100 }
101
102 let completed_at = chrono::Utc::now();
103
104 ctx
105 .call_on_state_change(WorkflowStateEvent::JobStateUpdated {
106 id: self.id.clone(),
107 state: job_state.clone(),
108 })
109 .await;
110
111 let result = JobRunResult {
112 id: self.id.clone(),
113 state: job_state,
114 started_at: Some(started_at),
115 completed_at: Some(completed_at),
116 steps,
117 };
118
119 ctx.call_on_job_completed(result.clone()).await;
120
121 result
122 }
123
124 pub async fn should_skip(&self, ctx: &ExecutionContext) -> bool {
125 if let Some(on) = &self.on {
126 !ctx.is_match(on).await
127 } else {
128 false
129 }
130 }
131}