coodev_runner/runner/
job.rs1use crate::{
2 error,
3 runner::{JobMessageSender, Step, StepRunContext, WorkflowRunContext},
4 Id, JobRunResult, StepRunResult, WorkflowState,
5};
6use serde::{Deserialize, Serialize};
7use std::{path::PathBuf, sync::Arc};
8use tokio::fs;
9
10#[derive(Serialize, Deserialize, Debug, Clone)]
11pub struct Job {
12 pub name: Option<String>,
13 pub image: String,
14 pub steps: Vec<Step>,
15 pub depends_on: Option<Vec<String>>,
16 pub working_dirs: Option<Vec<String>>,
17}
18
19pub struct JobContext {
20 pub id: Id,
21 pub host_working_dir: PathBuf,
23 pub host_user_dir: PathBuf,
24 pub workflow_run_context: Arc<WorkflowRunContext>,
25 pub message_sender: JobMessageSender,
26 pub working_dir_maps: Vec<(PathBuf, String)>,
28}
29
30impl Job {
31 pub async fn run(&self, context: JobContext) -> crate::Result<JobRunResult> {
32 let started_at = chrono::Utc::now();
33 let job_id = context.id.clone();
34
35 context
36 .message_sender
37 .update_job_state(WorkflowState::InProgress);
38
39 let workflow_working_dir = context.workflow_run_context.working_dir.clone();
40 let job_dirname = format!("workflow-job-{}", job_id);
41 let container_working_dir = workflow_working_dir.join(&job_dirname);
42 let container_user_dir = container_working_dir.join("data");
43
44 fs::create_dir_all(&container_user_dir)
45 .await
46 .map_err(|err| error::Error::io_error(err, "Failed to create container working dir"))?;
47
48 let mut steps = Vec::new();
49 let mut job_state = WorkflowState::InProgress;
50
51 let context = Arc::new(context);
52 for (index, step) in self.steps.iter().enumerate() {
53 let step_number = (index + 1) as u64;
54
55 let context = StepRunContext {
56 number: step_number,
57 message_sender: context
58 .message_sender
59 .create_step_message_sender(step_number),
60 job_context: context.clone(),
61 };
63
64 let skipped = match job_state {
65 WorkflowState::Failed => !step.continue_on_error,
66 WorkflowState::Cancelled | WorkflowState::Skipped => true,
67 _ => false,
68 };
69
70 if skipped {
71 context
72 .message_sender
73 .update_step_state(WorkflowState::Skipped);
74
75 steps.push(StepRunResult {
76 state: WorkflowState::Skipped,
77 started_at: None,
78 ended_at: None,
79 error: None,
80 });
81
82 continue;
83 }
84
85 let started_at = chrono::Utc::now();
86 match step.run(context).await {
87 Ok(result) => {
88 match result.state {
89 WorkflowState::Failed => {
90 job_state = WorkflowState::Failed;
91 }
92 WorkflowState::Cancelled => {
93 job_state = WorkflowState::Cancelled;
94 }
95 _ => {}
96 }
97
98 steps.push(result);
99 }
100 Err(err) => {
101 log::error!("Step failed: {}", err.to_string());
102 if job_state == WorkflowState::InProgress {
103 job_state = WorkflowState::Failed;
104 }
105
106 steps.push(StepRunResult {
107 state: WorkflowState::Failed,
108 started_at: Some(started_at),
109 ended_at: Some(chrono::Utc::now()),
110 error: Some(err),
111 });
112 break;
113 }
114 }
115 }
116
117 if job_state == WorkflowState::InProgress {
118 job_state = WorkflowState::Succeeded;
119 }
120
121 let ended_at = chrono::Utc::now();
122
123 context.message_sender.update_job_state(job_state.clone());
124
125 Ok(JobRunResult {
126 state: job_state,
127 started_at: Some(started_at),
128 ended_at: Some(ended_at),
129 steps,
130 })
131 }
132}