coodev-runner 0.1.42

A simple runner for coodev
Documentation
use crate::{
  error,
  runner::{JobMessageSender, Step, StepRunContext, WorkflowRunContext},
  Id, JobRunResult, StepRunResult, WorkflowState,
};
use serde::{Deserialize, Serialize};
use std::{path::PathBuf, sync::Arc};
use tokio::fs;

/// Declarative description of a single job: an image plus an ordered list of
/// steps. Executed via [`Job::run`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job {
  /// Optional name of the job.
  pub name: Option<String>,
  /// Image associated with the job.
  // NOTE(review): presumably the container image the steps run in; it is not
  // read in this file — confirm against the step runner.
  pub image: String,
  /// Steps of the job; `run` iterates them in this order.
  pub steps: Vec<Step>,
  /// Optional list of dependencies.
  // NOTE(review): presumably names/keys of jobs that must finish first; not
  // read in this file — confirm against the workflow scheduler.
  pub depends_on: Option<Vec<String>>,
  /// Optional list of working directories.
  // NOTE(review): not read in this file; semantics to be confirmed.
  pub working_dirs: Option<Vec<String>>,
}

/// Runtime context handed to [`Job::run`] for one execution of a job.
pub struct JobContext {
  /// Identifier of this job run; `run` uses it to name the job directory
  /// (`workflow-job-<id>`).
  pub id: Id,
  // pub container_working_dir: String,
  /// Working directory on the host.
  // NOTE(review): not read in this file; confirm how it relates to the
  // directory `run` creates under the workflow working dir.
  pub host_working_dir: PathBuf,
  /// User directory on the host.
  // NOTE(review): not read in this file; semantics to be confirmed.
  pub host_user_dir: PathBuf,
  /// Shared context of the enclosing workflow run; `run` reads its
  /// `working_dir` as the parent of the job directory.
  pub workflow_run_context: Arc<WorkflowRunContext>,
  /// Sender used by `run` to publish job state changes and to create the
  /// per-step message senders.
  pub message_sender: JobMessageSender,
  // Host working dir -> container working dir
  pub working_dir_maps: Vec<(PathBuf, String)>,
}

impl Job {
  pub async fn run(&self, context: JobContext) -> crate::Result<JobRunResult> {
    let started_at = chrono::Utc::now();
    let job_id = context.id.clone();

    context
      .message_sender
      .update_job_state(WorkflowState::InProgress);

    let workflow_working_dir = context.workflow_run_context.working_dir.clone();
    let job_dirname = format!("workflow-job-{}", job_id);
    let container_working_dir = workflow_working_dir.join(&job_dirname);
    let container_user_dir = container_working_dir.join("data");

    fs::create_dir_all(&container_user_dir)
      .await
      .map_err(|err| error::Error::io_error(err, "Failed to create container working dir"))?;

    let mut steps = Vec::new();
    let mut job_state = WorkflowState::InProgress;

    let context = Arc::new(context);
    for (index, step) in self.steps.iter().enumerate() {
      let step_number = (index + 1) as u64;

      let context = StepRunContext {
        number: step_number,
        message_sender: context
          .message_sender
          .create_step_message_sender(step_number),
        job_context: context.clone(),
        // workflow_run_context: context.workflow_run_context.clone(),
      };

      let skipped = match job_state {
        WorkflowState::Failed => !step.continue_on_error,
        WorkflowState::Cancelled | WorkflowState::Skipped => true,
        _ => false,
      };

      if skipped {
        context
          .message_sender
          .update_step_state(WorkflowState::Skipped);

        steps.push(StepRunResult {
          state: WorkflowState::Skipped,
          started_at: None,
          ended_at: None,
          error: None,
        });

        continue;
      }

      let started_at = chrono::Utc::now();
      match step.run(context).await {
        Ok(result) => {
          match result.state {
            WorkflowState::Failed => {
              job_state = WorkflowState::Failed;
            }
            WorkflowState::Cancelled => {
              job_state = WorkflowState::Cancelled;
            }
            _ => {}
          }

          steps.push(result);
        }
        Err(err) => {
          log::error!("Step failed: {}", err.to_string());
          if job_state == WorkflowState::InProgress {
            job_state = WorkflowState::Failed;
          }

          steps.push(StepRunResult {
            state: WorkflowState::Failed,
            started_at: Some(started_at),
            ended_at: Some(chrono::Utc::now()),
            error: Some(err),
          });
          break;
        }
      }
    }

    if job_state == WorkflowState::InProgress {
      job_state = WorkflowState::Succeeded;
    }

    let ended_at = chrono::Utc::now();

    context.message_sender.update_job_state(job_state.clone());

    Ok(JobRunResult {
      state: job_state,
      started_at: Some(started_at),
      ended_at: Some(ended_at),
      steps,
    })
  }
}