1use crate::{
2 error,
3 runner::{Job, JobContext, WorkflowRunContext},
4 JobId, JobRunResult, WorkflowRunResult, WorkflowState, WorkflowTriggerEvents,
5};
6use serde::{Deserialize, Serialize};
7use std::{collections::HashMap, path::PathBuf, sync::Arc};
8use tokio::{
9 fs,
10 sync::mpsc::{channel, Sender},
11};
12
/// A workflow definition: an optional display name, optional trigger
/// events, and the set of jobs to run, keyed by job id.
/// Serde-(de)serializable, so it can be loaded from a config file.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Workflow {
    // Optional human-readable workflow name.
    pub name: Option<String>,
    // Events that trigger this workflow, if any.
    pub on: Option<WorkflowTriggerEvents>,
    // Jobs keyed by id; `Job::depends_on` entries refer to these keys.
    pub jobs: HashMap<String, Job>,
}
19
/// Message sent back over the internal job channel: the job's map key (id)
/// paired with the result of running that job.
#[derive(Debug)]
pub struct RunJobResult(String, JobRunResult);
22
23impl Workflow {
24 pub async fn run(&self, context: WorkflowRunContext) -> crate::Result<WorkflowRunResult> {
25 let started_at = chrono::Utc::now();
26 context
27 .message_sender
28 .update_workflow_state(WorkflowState::InProgress);
29
30 let mut workflow_state = WorkflowState::InProgress;
46 let (sender, mut receiver) = channel::<RunJobResult>(10);
47
48 let mut waiting_jobs: Vec<(JobId, Job)> = vec![];
49 let mut job_results: HashMap<String, JobRunResult> = HashMap::new();
50
51 let context = Arc::new(context);
52 for (key, job) in self.jobs.iter() {
53 let key = key.clone();
54 let job = job.clone();
55 let context = context.clone();
56
57 if let Some(depends_on) = &job.depends_on {
58 for depends_on_key in depends_on {
59 if !self.jobs.contains_key(depends_on_key) {
60 log::error!(
61 "Job {} depends on job {} which does not exist",
62 key,
63 depends_on_key
64 );
65 workflow_state = WorkflowState::Failed;
66 break;
67 }
68 }
69
70 waiting_jobs.push((key, job));
71 continue;
72 }
73 self.run_job(key.clone(), job.clone(), context.clone(), sender.clone());
74 }
75
76 let total_jobs = self.jobs.len();
77
78 while let Some(RunJobResult(key, job_result)) = receiver.recv().await {
80 if job_result.state == WorkflowState::Failed {
81 workflow_state = WorkflowState::Failed;
82 } else if job_result.state == WorkflowState::Cancelled {
83 workflow_state = WorkflowState::Cancelled;
84 }
85
86 job_results.insert(key, job_result);
87
88 if job_results.len() == total_jobs {
89 if workflow_state == WorkflowState::InProgress {
90 workflow_state = WorkflowState::Succeeded;
91 }
92 break;
93 }
94
95 for (job_id, job) in waiting_jobs.iter() {
96 if let Some(depends_on) = &job.depends_on {
97 let mut all_finished = true;
98 for depends_on_key in depends_on {
99 if !job_results.contains_key(depends_on_key) {
100 all_finished = false;
101 break;
102 }
103 }
104
105 if all_finished {
106 self.run_job(job_id.clone(), job.clone(), context.clone(), sender.clone());
107 }
108 }
109 }
110 }
111
112 let ended_at = chrono::Utc::now();
113
114 context
115 .message_sender
116 .update_workflow_state(workflow_state.clone());
117
118 log::info!(
119 "Duration: {:?}ms",
120 ended_at.timestamp_millis() - started_at.timestamp_millis()
121 );
122
123 Ok(WorkflowRunResult {
124 state: workflow_state,
125 started_at: Some(started_at),
126 ended_at: Some(ended_at),
127 jobs: HashMap::new(),
128 })
129 }
130
131 fn run_job(
132 &self,
133 key: String,
134 job: Job,
135 workflow_run_context: Arc<WorkflowRunContext>,
136 sender: Sender<RunJobResult>,
137 ) {
138 tokio::spawn(async move {
139 let workflow_working_dir = workflow_run_context.working_dir.clone();
140 let job_dirname = format!("workflow-job-{}", key.clone());
141 let host_working_dir = workflow_working_dir.join(&job_dirname);
142 let host_user_dir = host_working_dir.join("data");
143
144 if let Err(err) = fs::create_dir_all(&host_user_dir).await.map_err(|err| {
146 error::Error::io_error(
147 err,
148 format!("Failed to create host working dir on job {}", key.clone()),
149 )
150 }) {
151 log::error!("Job error: {}", err.to_string());
152 let res = JobRunResult {
153 state: WorkflowState::Failed,
154 started_at: None,
155 ended_at: None,
156 steps: vec![],
157 };
158 if let Err(err) = sender.send(RunJobResult(key, res)).await {
159 log::error!("Failed to send job result: {}", err.to_string());
160 }
161 return;
162 };
163
164 let mut working_dir_maps: Vec<(PathBuf, String)> = vec![];
166 if let Some(working_dirs) = &job.working_dirs {
167 for (index, working_dir) in working_dirs.iter().enumerate() {
168 let host_dir = host_working_dir.join(format!("working-dir-{}", index));
169 if let Err(err) = fs::create_dir_all(&host_dir).await.map_err(|err| {
170 error::Error::io_error(
171 err,
172 format!("Failed to create job working dirs {}", working_dir),
173 )
174 }) {
175 log::error!("Job error: {}", err.to_string());
176 let res = JobRunResult {
177 state: WorkflowState::Failed,
178 started_at: None,
179 ended_at: None,
180 steps: vec![],
181 };
182 if let Err(err) = sender.send(RunJobResult(key, res)).await {
183 log::error!("Failed to send job result: {}", err.to_string());
184 }
185 return;
186 };
187 working_dir_maps.push((host_dir, working_dir.clone()));
188 }
189 }
190
191 let context = JobContext {
192 id: key.clone(),
193 message_sender: workflow_run_context
194 .message_sender
195 .create_job_message_sender(key.clone()),
196 workflow_run_context,
197 host_working_dir: host_working_dir.clone(),
198 host_user_dir,
199 working_dir_maps,
200 };
201 match job.run(context).await {
202 Ok(job_result) => {
203 if let Err(err) = sender.send(RunJobResult(key, job_result)).await {
204 log::error!("Failed to send job result: {}", err.to_string());
205 }
206 }
207 Err(err) => {
208 log::error!("Job error: {}", err.to_string());
209 let res = JobRunResult {
210 state: WorkflowState::Failed,
211 started_at: None,
212 ended_at: None,
213 steps: vec![],
214 };
215 if let Err(err) = sender.send(RunJobResult(key, res)).await {
216 log::error!("Failed to send job result: {}", err.to_string());
217 }
218 }
219 };
220 });
221 }
222}