// coodev-runner 0.1.42
//
// A simple runner for coodev
// Documentation
use crate::{
  error,
  runner::{Job, JobContext, WorkflowRunContext},
  JobId, JobRunResult, WorkflowRunResult, WorkflowState, WorkflowTriggerEvents,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::{
  fs,
  sync::mpsc::{channel, Sender},
};

/// A workflow definition: an optional name, optional trigger events and the
/// jobs that make up the workflow.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Workflow {
  /// Optional name of the workflow.
  pub name: Option<String>,
  /// Optional trigger events.
  /// NOTE(review): `run` does not consult this field yet; event-based
  /// skipping is still a TODO there.
  pub on: Option<WorkflowTriggerEvents>,
  /// Jobs keyed by job id. The `depends_on` entries of a job are looked up
  /// among the keys of this map.
  pub jobs: HashMap<String, Job>,
}

/// Message a spawned job task sends back to `Workflow::run` over the result
/// channel: the job key followed by the result of that job.
#[derive(Debug)]
pub struct RunJobResult(String, JobRunResult);

impl Workflow {
  pub async fn run(&self, context: WorkflowRunContext) -> crate::Result<WorkflowRunResult> {
    let started_at = chrono::Utc::now();
    context
      .message_sender
      .update_workflow_state(WorkflowState::InProgress);

    // TODO: Implement workflow skipping
    // if self.on.is_some() && !self.on.as_ref().unwrap().contains(&context.event) {
    //   // Skip workflow
    //   context
    //     .message_sender
    //     .update_workflow_state(WorkflowState::Skipped);

    //   return Ok(WorkflowRunResult {
    //     state: WorkflowState::Skipped,
    //     started_at: Some(started_at),
    //     ended_at: Some(chrono::Utc::now()),
    //     jobs: HashMap::new(),
    //   });
    // }

    let mut workflow_state = WorkflowState::InProgress;
    let (sender, mut receiver) = channel::<RunJobResult>(10);

    let mut waiting_jobs: Vec<(JobId, Job)> = vec![];
    let mut job_results: HashMap<String, JobRunResult> = HashMap::new();

    let context = Arc::new(context);
    for (key, job) in self.jobs.iter() {
      let key = key.clone();
      let job = job.clone();
      let context = context.clone();

      if let Some(depends_on) = &job.depends_on {
        for depends_on_key in depends_on {
          if !self.jobs.contains_key(depends_on_key) {
            log::error!(
              "Job {} depends on job {} which does not exist",
              key,
              depends_on_key
            );
            workflow_state = WorkflowState::Failed;
            break;
          }
        }

        waiting_jobs.push((key, job));
        continue;
      }
      self.run_job(key.clone(), job.clone(), context.clone(), sender.clone());
    }

    let total_jobs = self.jobs.len();

    // If there are no jobs to run, we are done
    while let Some(RunJobResult(key, job_result)) = receiver.recv().await {
      if job_result.state == WorkflowState::Failed {
        workflow_state = WorkflowState::Failed;
      } else if job_result.state == WorkflowState::Cancelled {
        workflow_state = WorkflowState::Cancelled;
      }

      job_results.insert(key, job_result);

      if job_results.len() == total_jobs {
        if workflow_state == WorkflowState::InProgress {
          workflow_state = WorkflowState::Succeeded;
        }
        break;
      }

      for (job_id, job) in waiting_jobs.iter() {
        if let Some(depends_on) = &job.depends_on {
          let mut all_finished = true;
          for depends_on_key in depends_on {
            if !job_results.contains_key(depends_on_key) {
              all_finished = false;
              break;
            }
          }

          if all_finished {
            self.run_job(job_id.clone(), job.clone(), context.clone(), sender.clone());
          }
        }
      }
    }

    let ended_at = chrono::Utc::now();

    context
      .message_sender
      .update_workflow_state(workflow_state.clone());

    log::info!(
      "Duration: {:?}ms",
      ended_at.timestamp_millis() - started_at.timestamp_millis()
    );

    Ok(WorkflowRunResult {
      state: workflow_state,
      started_at: Some(started_at),
      ended_at: Some(ended_at),
      jobs: HashMap::new(),
    })
  }

  fn run_job(
    &self,
    key: String,
    job: Job,
    workflow_run_context: Arc<WorkflowRunContext>,
    sender: Sender<RunJobResult>,
  ) {
    tokio::spawn(async move {
      let workflow_working_dir = workflow_run_context.working_dir.clone();
      let job_dirname = format!("workflow-job-{}", key.clone());
      let host_working_dir = workflow_working_dir.join(&job_dirname);
      let host_user_dir = host_working_dir.join("data");

      // Create host working dir
      if let Err(err) = fs::create_dir_all(&host_user_dir).await.map_err(|err| {
        error::Error::io_error(
          err,
          format!("Failed to create host working dir on job {}", key.clone()),
        )
      }) {
        log::error!("Job error: {}", err.to_string());
        let res = JobRunResult {
          state: WorkflowState::Failed,
          started_at: None,
          ended_at: None,
          steps: vec![],
        };
        if let Err(err) = sender.send(RunJobResult(key, res)).await {
          log::error!("Failed to send job result: {}", err.to_string());
        }
        return;
      };

      // Create working dirs
      let mut working_dir_maps: Vec<(PathBuf, String)> = vec![];
      if let Some(working_dirs) = &job.working_dirs {
        for (index, working_dir) in working_dirs.iter().enumerate() {
          let host_dir = host_working_dir.join(format!("working-dir-{}", index));
          if let Err(err) = fs::create_dir_all(&host_dir).await.map_err(|err| {
            error::Error::io_error(
              err,
              format!("Failed to create job working dirs {}", working_dir),
            )
          }) {
            log::error!("Job error: {}", err.to_string());
            let res = JobRunResult {
              state: WorkflowState::Failed,
              started_at: None,
              ended_at: None,
              steps: vec![],
            };
            if let Err(err) = sender.send(RunJobResult(key, res)).await {
              log::error!("Failed to send job result: {}", err.to_string());
            }
            return;
          };
          working_dir_maps.push((host_dir, working_dir.clone()));
        }
      }

      let context = JobContext {
        id: key.clone(),
        message_sender: workflow_run_context
          .message_sender
          .create_job_message_sender(key.clone()),
        workflow_run_context,
        host_working_dir: host_working_dir.clone(),
        host_user_dir,
        working_dir_maps,
      };
      match job.run(context).await {
        Ok(job_result) => {
          if let Err(err) = sender.send(RunJobResult(key, job_result)).await {
            log::error!("Failed to send job result: {}", err.to_string());
          }
        }
        Err(err) => {
          log::error!("Job error: {}", err.to_string());
          let res = JobRunResult {
            state: WorkflowState::Failed,
            started_at: None,
            ended_at: None,
            steps: vec![],
          };
          if let Err(err) = sender.send(RunJobResult(key, res)).await {
            log::error!("Failed to send job result: {}", err.to_string());
          }
        }
      };
    });
  }
}