use crate::{
error,
runner::{Job, JobContext, WorkflowRunContext},
JobId, JobRunResult, WorkflowRunResult, WorkflowState, WorkflowTriggerEvents,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::{
fs,
sync::mpsc::{channel, Sender},
};
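/// A workflow definition: an optional display name, optional trigger events,
/// and a map of named jobs that may declare dependencies on each other.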
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Workflow {
pub name: Option<String>,
pub on: Option<WorkflowTriggerEvents>,
pub jobs: HashMap<String, Job>,
}
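/// Message sent from a spawned job task back to the workflow run loop: the
/// job key paired with that job's result.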
#[derive(Debug)]
pub struct RunJobResult(String, JobRunResult);
impl Workflow {
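    /// Runs the workflow: jobs without dependencies are spawned immediately,
    /// while dependent jobs wait until every job they depend on has reported
    /// a result. Returns the aggregated result once all jobs have finished.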
pub async fn run(&self, context: WorkflowRunContext) -> crate::Result<WorkflowRunResult> {
let started_at = chrono::Utc::now();
context
.message_sender
.update_workflow_state(WorkflowState::InProgress);
let mut workflow_state = WorkflowState::InProgress;
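        // Channel over which spawned job tasks report their results back to
        // this run loop.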
let (sender, mut receiver) = channel::<RunJobResult>(10);
let mut waiting_jobs: Vec<(JobId, Job)> = vec![];
let mut job_results: HashMap<String, JobRunResult> = HashMap::new();
let context = Arc::new(context);
        // Every dependency must refer to an existing job; otherwise the
        // workflow is failed up front instead of waiting forever on a job
        // that can never run.
        for (key, job) in self.jobs.iter() {
            if let Some(depends_on) = &job.depends_on {
                for depends_on_key in depends_on {
                    if !self.jobs.contains_key(depends_on_key) {
                        log::error!(
                            "Job {} depends on job {} which does not exist",
                            key,
                            depends_on_key
                        );
                        workflow_state = WorkflowState::Failed;
                    }
                }
            }
        }
        if workflow_state == WorkflowState::Failed {
            context
                .message_sender
                .update_workflow_state(workflow_state.clone());
            return Ok(WorkflowRunResult {
                state: workflow_state,
                started_at: Some(started_at),
                ended_at: Some(chrono::Utc::now()),
                jobs: HashMap::new(),
            });
        }
        // Jobs without dependencies start immediately; the rest wait until all
        // of their dependencies have reported a result.
        for (key, job) in self.jobs.iter() {
            if job.depends_on.is_some() {
                waiting_jobs.push((key.clone(), job.clone()));
            } else {
                self.run_job(key.clone(), job.clone(), context.clone(), sender.clone());
            }
        }
let total_jobs = self.jobs.len();
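        // Collect results as jobs finish, releasing waiting jobs whose
        // dependencies are complete, until every job has reported.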
while let Some(RunJobResult(key, job_result)) = receiver.recv().await {
if job_result.state == WorkflowState::Failed {
workflow_state = WorkflowState::Failed;
} else if job_result.state == WorkflowState::Cancelled {
workflow_state = WorkflowState::Cancelled;
}
job_results.insert(key, job_result);
if job_results.len() == total_jobs {
if workflow_state == WorkflowState::InProgress {
workflow_state = WorkflowState::Succeeded;
}
break;
}
            // Release any waiting job whose dependencies have all reported a
            // result, removing it from the waiting list so it is spawned only
            // once.
            waiting_jobs.retain(|(job_id, job)| {
                let ready = job
                    .depends_on
                    .as_ref()
                    .map(|deps| deps.iter().all(|dep| job_results.contains_key(dep)))
                    .unwrap_or(true);
                if ready {
                    self.run_job(job_id.clone(), job.clone(), context.clone(), sender.clone());
                }
                !ready
            });
}
let ended_at = chrono::Utc::now();
context
.message_sender
.update_workflow_state(workflow_state.clone());
        log::info!(
            "Duration: {}ms",
            (ended_at - started_at).num_milliseconds()
        );
Ok(WorkflowRunResult {
state: workflow_state,
started_at: Some(started_at),
ended_at: Some(ended_at),
            jobs: job_results,
})
}
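    /// Spawns `job` on the tokio runtime: prepares its working directories on
    /// the host, runs it, and sends the result (or a failed placeholder result
    /// if setup or execution errors out) back over `sender`.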
fn run_job(
&self,
key: String,
job: Job,
workflow_run_context: Arc<WorkflowRunContext>,
sender: Sender<RunJobResult>,
) {
tokio::spawn(async move {
            // Each job gets its own directory under the workflow working dir,
            // with a `data` subdirectory used as the job's user dir.
            let job_dirname = format!("workflow-job-{}", key);
            let host_working_dir = workflow_run_context.working_dir.join(&job_dirname);
            let host_user_dir = host_working_dir.join("data");
if let Err(err) = fs::create_dir_all(&host_user_dir).await.map_err(|err| {
error::Error::io_error(
err,
format!("Failed to create host working dir on job {}", key.clone()),
)
}) {
log::error!("Job error: {}", err.to_string());
let res = JobRunResult {
state: WorkflowState::Failed,
started_at: None,
ended_at: None,
steps: vec![],
};
if let Err(err) = sender.send(RunJobResult(key, res)).await {
log::error!("Failed to send job result: {}", err.to_string());
}
return;
};
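            // Create one host directory per configured job working dir and
            // record the mapping from host path to configured path.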
let mut working_dir_maps: Vec<(PathBuf, String)> = vec![];
if let Some(working_dirs) = &job.working_dirs {
for (index, working_dir) in working_dirs.iter().enumerate() {
let host_dir = host_working_dir.join(format!("working-dir-{}", index));
if let Err(err) = fs::create_dir_all(&host_dir).await.map_err(|err| {
error::Error::io_error(
err,
format!("Failed to create job working dirs {}", working_dir),
)
}) {
log::error!("Job error: {}", err.to_string());
let res = JobRunResult {
state: WorkflowState::Failed,
started_at: None,
ended_at: None,
steps: vec![],
};
if let Err(err) = sender.send(RunJobResult(key, res)).await {
log::error!("Failed to send job result: {}", err.to_string());
}
return;
};
working_dir_maps.push((host_dir, working_dir.clone()));
}
}
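            // Build the per-job context, including a job-scoped message sender
            // derived from the workflow's message sender.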
let context = JobContext {
id: key.clone(),
message_sender: workflow_run_context
.message_sender
.create_job_message_sender(key.clone()),
workflow_run_context,
host_working_dir: host_working_dir.clone(),
host_user_dir,
working_dir_maps,
};
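            // Run the job and forward either its result or a failed placeholder
            // result to the workflow run loop.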
match job.run(context).await {
Ok(job_result) => {
if let Err(err) = sender.send(RunJobResult(key, job_result)).await {
log::error!("Failed to send job result: {}", err.to_string());
}
}
Err(err) => {
log::error!("Job error: {}", err.to_string());
let res = JobRunResult {
state: WorkflowState::Failed,
started_at: None,
ended_at: None,
steps: vec![],
};
if let Err(err) = sender.send(RunJobResult(key, res)).await {
log::error!("Failed to send job result: {}", err.to_string());
}
}
};
});
}
}