coodev_runner/runner/
job.rs

1use crate::{
2  error,
3  runner::{JobMessageSender, Step, StepRunContext, WorkflowRunContext},
4  Id, JobRunResult, StepRunResult, WorkflowState,
5};
6use serde::{Deserialize, Serialize};
7use std::{path::PathBuf, sync::Arc};
8use tokio::fs;
9
10#[derive(Serialize, Deserialize, Debug, Clone)]
11pub struct Job {
12  pub name: Option<String>,
13  pub image: String,
14  pub steps: Vec<Step>,
15  pub depends_on: Option<Vec<String>>,
16  pub working_dirs: Option<Vec<String>>,
17}
18
19pub struct JobContext {
20  pub id: Id,
21  // pub container_working_dir: String,
22  pub host_working_dir: PathBuf,
23  pub host_user_dir: PathBuf,
24  pub workflow_run_context: Arc<WorkflowRunContext>,
25  pub message_sender: JobMessageSender,
26  // Host working dir -> container working dir
27  pub working_dir_maps: Vec<(PathBuf, String)>,
28}
29
30impl Job {
31  pub async fn run(&self, context: JobContext) -> crate::Result<JobRunResult> {
32    let started_at = chrono::Utc::now();
33    let job_id = context.id.clone();
34
35    context
36      .message_sender
37      .update_job_state(WorkflowState::InProgress);
38
39    let workflow_working_dir = context.workflow_run_context.working_dir.clone();
40    let job_dirname = format!("workflow-job-{}", job_id);
41    let container_working_dir = workflow_working_dir.join(&job_dirname);
42    let container_user_dir = container_working_dir.join("data");
43
44    fs::create_dir_all(&container_user_dir)
45      .await
46      .map_err(|err| error::Error::io_error(err, "Failed to create container working dir"))?;
47
48    let mut steps = Vec::new();
49    let mut job_state = WorkflowState::InProgress;
50
51    let context = Arc::new(context);
52    for (index, step) in self.steps.iter().enumerate() {
53      let step_number = (index + 1) as u64;
54
55      let context = StepRunContext {
56        number: step_number,
57        message_sender: context
58          .message_sender
59          .create_step_message_sender(step_number),
60        job_context: context.clone(),
61        // workflow_run_context: context.workflow_run_context.clone(),
62      };
63
64      let skipped = match job_state {
65        WorkflowState::Failed => !step.continue_on_error,
66        WorkflowState::Cancelled | WorkflowState::Skipped => true,
67        _ => false,
68      };
69
70      if skipped {
71        context
72          .message_sender
73          .update_step_state(WorkflowState::Skipped);
74
75        steps.push(StepRunResult {
76          state: WorkflowState::Skipped,
77          started_at: None,
78          ended_at: None,
79          error: None,
80        });
81
82        continue;
83      }
84
85      let started_at = chrono::Utc::now();
86      match step.run(context).await {
87        Ok(result) => {
88          match result.state {
89            WorkflowState::Failed => {
90              job_state = WorkflowState::Failed;
91            }
92            WorkflowState::Cancelled => {
93              job_state = WorkflowState::Cancelled;
94            }
95            _ => {}
96          }
97
98          steps.push(result);
99        }
100        Err(err) => {
101          log::error!("Step failed: {}", err.to_string());
102          if job_state == WorkflowState::InProgress {
103            job_state = WorkflowState::Failed;
104          }
105
106          steps.push(StepRunResult {
107            state: WorkflowState::Failed,
108            started_at: Some(started_at),
109            ended_at: Some(chrono::Utc::now()),
110            error: Some(err),
111          });
112          break;
113        }
114      }
115    }
116
117    if job_state == WorkflowState::InProgress {
118      job_state = WorkflowState::Succeeded;
119    }
120
121    let ended_at = chrono::Utc::now();
122
123    context.message_sender.update_job_state(job_state.clone());
124
125    Ok(JobRunResult {
126      state: job_state,
127      started_at: Some(started_at),
128      ended_at: Some(ended_at),
129      steps,
130    })
131  }
132}