use crate::{
error,
runner::{JobMessageSender, Step, StepRunContext, WorkflowRunContext},
Id, JobRunResult, StepRunResult, WorkflowState,
};
use serde::{Deserialize, Serialize};
use std::{path::PathBuf, sync::Arc};
use tokio::fs;
/// Declarative description of a single job inside a workflow.
///
/// The type is (de)serializable through serde; `Job::run` executes the
/// `steps` in the order they are listed.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job {
/// Optional name for the job.
pub name: Option<String>,
/// Image identifier for the job.
// NOTE(review): presumably the container image the steps run in — confirm
// against the step runner; it is not read in this file.
pub image: String,
/// Steps executed sequentially by `Job::run`.
pub steps: Vec<Step>,
/// Optional list of strings naming this job's dependencies.
// NOTE(review): presumably names of jobs that must finish first; the
// scheduling logic is not visible here — confirm.
pub depends_on: Option<Vec<String>>,
/// Optional list of working directories for the job.
// NOTE(review): not read in this file; presumably related to
// `JobContext::working_dir_maps` — confirm.
pub working_dirs: Option<Vec<String>>,
}
/// Runtime context handed to `Job::run` for one execution of a job.
///
/// `Job::run` wraps the context in an `Arc` and shares it with every step
/// through `StepRunContext::job_context`.
pub struct JobContext {
/// Identifier of this job run; `Job::run` uses it to build the
/// `workflow-job-{id}` directory name.
pub id: Id,
/// Working directory path on the host.
// NOTE(review): not read in this file; presumably the host-side counterpart
// of the per-job directory created in `Job::run` — confirm.
pub host_working_dir: PathBuf,
/// User directory path on the host.
// NOTE(review): not read in this file; presumably the host-side counterpart
// of the per-job `data` directory — confirm.
pub host_user_dir: PathBuf,
/// Shared context of the enclosing workflow run; its `working_dir` is the
/// parent of the per-job directory created in `Job::run`.
pub workflow_run_context: Arc<WorkflowRunContext>,
/// Sender used to report job state changes and to create per-step senders.
pub message_sender: JobMessageSender,
/// Pairs of a path and a string describing working directory mappings.
// NOTE(review): presumably (host path, container path) pairs — confirm
// against the code that consumes them.
pub working_dir_maps: Vec<(PathBuf, String)>,
}
impl Job {
pub async fn run(&self, context: JobContext) -> crate::Result<JobRunResult> {
let started_at = chrono::Utc::now();
let job_id = context.id.clone();
context
.message_sender
.update_job_state(WorkflowState::InProgress);
let workflow_working_dir = context.workflow_run_context.working_dir.clone();
let job_dirname = format!("workflow-job-{}", job_id);
let container_working_dir = workflow_working_dir.join(&job_dirname);
let container_user_dir = container_working_dir.join("data");
fs::create_dir_all(&container_user_dir)
.await
.map_err(|err| error::Error::io_error(err, "Failed to create container working dir"))?;
let mut steps = Vec::new();
let mut job_state = WorkflowState::InProgress;
let context = Arc::new(context);
for (index, step) in self.steps.iter().enumerate() {
let step_number = (index + 1) as u64;
let context = StepRunContext {
number: step_number,
message_sender: context
.message_sender
.create_step_message_sender(step_number),
job_context: context.clone(),
};
let skipped = match job_state {
WorkflowState::Failed => !step.continue_on_error,
WorkflowState::Cancelled | WorkflowState::Skipped => true,
_ => false,
};
if skipped {
context
.message_sender
.update_step_state(WorkflowState::Skipped);
steps.push(StepRunResult {
state: WorkflowState::Skipped,
started_at: None,
ended_at: None,
error: None,
});
continue;
}
let started_at = chrono::Utc::now();
match step.run(context).await {
Ok(result) => {
match result.state {
WorkflowState::Failed => {
job_state = WorkflowState::Failed;
}
WorkflowState::Cancelled => {
job_state = WorkflowState::Cancelled;
}
_ => {}
}
steps.push(result);
}
Err(err) => {
log::error!("Step failed: {}", err.to_string());
if job_state == WorkflowState::InProgress {
job_state = WorkflowState::Failed;
}
steps.push(StepRunResult {
state: WorkflowState::Failed,
started_at: Some(started_at),
ended_at: Some(chrono::Utc::now()),
error: Some(err),
});
break;
}
}
}
if job_state == WorkflowState::InProgress {
job_state = WorkflowState::Succeeded;
}
let ended_at = chrono::Utc::now();
context.message_sender.update_job_state(job_state.clone());
Ok(JobRunResult {
state: job_state,
started_at: Some(started_at),
ended_at: Some(ended_at),
steps,
})
}
}