coodev_runner/runner/
workflow.rs

1use crate::{
2  error,
3  runner::{Job, JobContext, WorkflowRunContext},
4  JobId, JobRunResult, WorkflowRunResult, WorkflowState, WorkflowTriggerEvents,
5};
6use serde::{Deserialize, Serialize};
7use std::{collections::HashMap, path::PathBuf, sync::Arc};
8use tokio::{
9  fs,
10  sync::mpsc::{channel, Sender},
11};
12
/// A workflow definition: an optional name, the events that trigger it, and
/// the set of jobs to run.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Workflow {
  /// Optional human-readable name of the workflow.
  pub name: Option<String>,
  /// Events that should trigger this workflow. NOTE(review): currently not
  /// enforced — event-based skipping is a TODO in `run`.
  pub on: Option<WorkflowTriggerEvents>,
  /// Jobs keyed by job id; execution order is driven by each job's
  /// `depends_on` list, not by map order.
  pub jobs: HashMap<String, Job>,
}
19
/// Channel message pairing a job id with the result of running that job.
#[derive(Debug)]
pub struct RunJobResult(String, JobRunResult);
22
23impl Workflow {
24  pub async fn run(&self, context: WorkflowRunContext) -> crate::Result<WorkflowRunResult> {
25    let started_at = chrono::Utc::now();
26    context
27      .message_sender
28      .update_workflow_state(WorkflowState::InProgress);
29
30    // TODO: Implement workflow skipping
31    // if self.on.is_some() && !self.on.as_ref().unwrap().contains(&context.event) {
32    //   // Skip workflow
33    //   context
34    //     .message_sender
35    //     .update_workflow_state(WorkflowState::Skipped);
36
37    //   return Ok(WorkflowRunResult {
38    //     state: WorkflowState::Skipped,
39    //     started_at: Some(started_at),
40    //     ended_at: Some(chrono::Utc::now()),
41    //     jobs: HashMap::new(),
42    //   });
43    // }
44
45    let mut workflow_state = WorkflowState::InProgress;
46    let (sender, mut receiver) = channel::<RunJobResult>(10);
47
48    let mut waiting_jobs: Vec<(JobId, Job)> = vec![];
49    let mut job_results: HashMap<String, JobRunResult> = HashMap::new();
50
51    let context = Arc::new(context);
52    for (key, job) in self.jobs.iter() {
53      let key = key.clone();
54      let job = job.clone();
55      let context = context.clone();
56
57      if let Some(depends_on) = &job.depends_on {
58        for depends_on_key in depends_on {
59          if !self.jobs.contains_key(depends_on_key) {
60            log::error!(
61              "Job {} depends on job {} which does not exist",
62              key,
63              depends_on_key
64            );
65            workflow_state = WorkflowState::Failed;
66            break;
67          }
68        }
69
70        waiting_jobs.push((key, job));
71        continue;
72      }
73      self.run_job(key.clone(), job.clone(), context.clone(), sender.clone());
74    }
75
76    let total_jobs = self.jobs.len();
77
78    // If there are no jobs to run, we are done
79    while let Some(RunJobResult(key, job_result)) = receiver.recv().await {
80      if job_result.state == WorkflowState::Failed {
81        workflow_state = WorkflowState::Failed;
82      } else if job_result.state == WorkflowState::Cancelled {
83        workflow_state = WorkflowState::Cancelled;
84      }
85
86      job_results.insert(key, job_result);
87
88      if job_results.len() == total_jobs {
89        if workflow_state == WorkflowState::InProgress {
90          workflow_state = WorkflowState::Succeeded;
91        }
92        break;
93      }
94
95      for (job_id, job) in waiting_jobs.iter() {
96        if let Some(depends_on) = &job.depends_on {
97          let mut all_finished = true;
98          for depends_on_key in depends_on {
99            if !job_results.contains_key(depends_on_key) {
100              all_finished = false;
101              break;
102            }
103          }
104
105          if all_finished {
106            self.run_job(job_id.clone(), job.clone(), context.clone(), sender.clone());
107          }
108        }
109      }
110    }
111
112    let ended_at = chrono::Utc::now();
113
114    context
115      .message_sender
116      .update_workflow_state(workflow_state.clone());
117
118    log::info!(
119      "Duration: {:?}ms",
120      ended_at.timestamp_millis() - started_at.timestamp_millis()
121    );
122
123    Ok(WorkflowRunResult {
124      state: workflow_state,
125      started_at: Some(started_at),
126      ended_at: Some(ended_at),
127      jobs: HashMap::new(),
128    })
129  }
130
131  fn run_job(
132    &self,
133    key: String,
134    job: Job,
135    workflow_run_context: Arc<WorkflowRunContext>,
136    sender: Sender<RunJobResult>,
137  ) {
138    tokio::spawn(async move {
139      let workflow_working_dir = workflow_run_context.working_dir.clone();
140      let job_dirname = format!("workflow-job-{}", key.clone());
141      let host_working_dir = workflow_working_dir.join(&job_dirname);
142      let host_user_dir = host_working_dir.join("data");
143
144      // Create host working dir
145      if let Err(err) = fs::create_dir_all(&host_user_dir).await.map_err(|err| {
146        error::Error::io_error(
147          err,
148          format!("Failed to create host working dir on job {}", key.clone()),
149        )
150      }) {
151        log::error!("Job error: {}", err.to_string());
152        let res = JobRunResult {
153          state: WorkflowState::Failed,
154          started_at: None,
155          ended_at: None,
156          steps: vec![],
157        };
158        if let Err(err) = sender.send(RunJobResult(key, res)).await {
159          log::error!("Failed to send job result: {}", err.to_string());
160        }
161        return;
162      };
163
164      // Create working dirs
165      let mut working_dir_maps: Vec<(PathBuf, String)> = vec![];
166      if let Some(working_dirs) = &job.working_dirs {
167        for (index, working_dir) in working_dirs.iter().enumerate() {
168          let host_dir = host_working_dir.join(format!("working-dir-{}", index));
169          if let Err(err) = fs::create_dir_all(&host_dir).await.map_err(|err| {
170            error::Error::io_error(
171              err,
172              format!("Failed to create job working dirs {}", working_dir),
173            )
174          }) {
175            log::error!("Job error: {}", err.to_string());
176            let res = JobRunResult {
177              state: WorkflowState::Failed,
178              started_at: None,
179              ended_at: None,
180              steps: vec![],
181            };
182            if let Err(err) = sender.send(RunJobResult(key, res)).await {
183              log::error!("Failed to send job result: {}", err.to_string());
184            }
185            return;
186          };
187          working_dir_maps.push((host_dir, working_dir.clone()));
188        }
189      }
190
191      let context = JobContext {
192        id: key.clone(),
193        message_sender: workflow_run_context
194          .message_sender
195          .create_job_message_sender(key.clone()),
196        workflow_run_context,
197        host_working_dir: host_working_dir.clone(),
198        host_user_dir,
199        working_dir_maps,
200      };
201      match job.run(context).await {
202        Ok(job_result) => {
203          if let Err(err) = sender.send(RunJobResult(key, job_result)).await {
204            log::error!("Failed to send job result: {}", err.to_string());
205          }
206        }
207        Err(err) => {
208          log::error!("Job error: {}", err.to_string());
209          let res = JobRunResult {
210            state: WorkflowState::Failed,
211            started_at: None,
212            ended_at: None,
213            steps: vec![],
214          };
215          if let Err(err) = sender.send(RunJobResult(key, res)).await {
216            log::error!("Failed to send job result: {}", err.to_string());
217          }
218        }
219      };
220    });
221  }
222}