coodev-runner 0.1.42

A simple runner for coodev
Documentation
use crate::{Id, StepLog, StepNumber, WorkflowMessage, WorkflowState};
use parking_lot::Mutex;
use std::sync::Arc;

/// Boxed callback that receives a reference to each dispatched
/// [`WorkflowMessage`]. The `Send + Sync` bounds are part of the alias, so
/// every handler must satisfy them.
pub type EventHandler = Box<dyn Fn(&WorkflowMessage) + Send + Sync>;

/// Dispatcher that owns the list of registered [`EventHandler`]s.
///
/// The list is stored behind an `Arc<Mutex<..>>`, so cloning a
/// `MessageSender` yields a handle to the same handler list rather than a
/// copy of it.
#[derive(Clone)]
pub struct MessageSender {
  // TODO: improve performance
  /// Registered handlers, shared between all clones of this sender.
  handlers: Arc<Mutex<Vec<EventHandler>>>,
}

impl MessageSender {
  /// Creates a sender with no registered handlers.
  pub fn new() -> Self {
    Self::default()
  }

  /// Appends `handler` to the handler list.
  ///
  /// The list is held behind an `Arc`, so the new handler is also visible to
  /// every clone of this sender.
  pub fn register(&mut self, handler: EventHandler) {
    self.handlers.lock().push(handler);
  }

  /// Calls every registered handler with `message`, in registration order.
  ///
  /// The handler list stays locked until all handlers have returned.
  ///
  /// # Errors
  ///
  /// Currently never fails: handlers return nothing, so this always yields
  /// `Ok(())`.
  ///
  /// NOTE(review): the lock is held while handlers run, so a handler that
  /// calls `register` or `send` on the same sender would presumably deadlock
  /// (assuming `parking_lot::Mutex` is non-reentrant) -- confirm.
  pub fn send(&self, message: &WorkflowMessage) -> anyhow::Result<()> {
    let handlers = self.handlers.lock();

    for handler in handlers.iter() {
      handler(message);
    }

    Ok(())
  }
}

impl Default for MessageSender {
  /// Builds a sender whose handler list is empty; [`MessageSender::new`]
  /// delegates to this.
  fn default() -> Self {
    Self {
      handlers: Arc::new(Mutex::new(Vec::new())),
    }
  }
}

/// Message sender bound to one workflow run.
#[derive(Clone)]
pub struct WorkflowMessageSender {
  /// Id attached to every message emitted through this sender.
  workflow_run_id: Id,
  /// Underlying dispatcher the messages are forwarded to.
  sender: MessageSender,
}

/// Message sender bound to one job within a workflow run.
#[derive(Clone)]
pub struct JobMessageSender {
  /// Id of the workflow run the job belongs to.
  workflow_run_id: Id,
  /// Id of the job attached to every message emitted through this sender.
  job_id: Id,
  /// Underlying dispatcher the messages are forwarded to.
  sender: MessageSender,
}

/// Message sender bound to one step of a job within a workflow run.
#[derive(Clone)]
pub struct StepMessageSender {
  /// Id of the workflow run the step belongs to.
  workflow_run_id: Id,
  /// Id of the job the step belongs to.
  job_id: Id,
  /// Number of the step attached to every message emitted through this sender.
  step_number: StepNumber,
  /// Underlying dispatcher the messages are forwarded to.
  sender: MessageSender,
}

impl WorkflowMessageSender {
  pub fn new(workflow_run_id: Id, sender: MessageSender) -> Self {
    Self {
      workflow_run_id,
      sender,
    }
  }

  pub fn update_workflow_state(&self, state: WorkflowState) {
    self.send(WorkflowMessage::WorkflowStateUpdated {
      id: self.workflow_run_id.clone(),
      state,
    });
  }

  pub fn create_job_message_sender(&self, job_id: Id) -> JobMessageSender {
    JobMessageSender::new(self.workflow_run_id.clone(), job_id, self.sender.clone())
  }

  fn send(&self, message: WorkflowMessage) {
    if let Err(err) = self.sender.send(&message) {
      log::error!("Failed to send workflow message: {}", err.to_string());
    }
  }
}

impl JobMessageSender {
  pub fn new(workflow_run_id: Id, job_id: Id, sender: MessageSender) -> Self {
    Self {
      workflow_run_id,
      job_id,
      sender,
    }
  }

  pub fn update_job_state(&self, state: WorkflowState) {
    self.send(WorkflowMessage::JobStateUpdated {
      workflow_run_id: self.workflow_run_id.clone(),
      job_id: self.job_id.clone(),
      state,
    });
  }

  pub fn create_step_message_sender(&self, step_number: StepNumber) -> StepMessageSender {
    StepMessageSender::new(
      self.workflow_run_id.clone(),
      self.job_id.clone(),
      step_number,
      self.sender.clone(),
    )
  }

  fn send(&self, message: WorkflowMessage) {
    if let Err(err) = self.sender.send(&message) {
      log::error!("Failed to send job message: {}", err.to_string());
    }
  }
}

impl StepMessageSender {
  pub fn new(
    workflow_run_id: Id,
    job_id: Id,
    step_number: StepNumber,
    sender: MessageSender,
  ) -> Self {
    Self {
      workflow_run_id,
      job_id,
      step_number,
      sender,
    }
  }

  pub fn update_step_state(&self, state: WorkflowState) {
    self.send(WorkflowMessage::StepStateUpdated {
      workflow_run_id: self.workflow_run_id.clone(),
      job_id: self.job_id.clone(),
      number: self.step_number,
      state,
    });
  }

  pub fn send_log(&self, log: StepLog) {
    self.send(WorkflowMessage::Log {
      workflow_run_id: self.workflow_run_id.clone(),
      job_id: self.job_id.clone(),
      number: self.step_number,
      message: log.message,
      log_type: log.log_type,
      time: log.time,
    });
  }

  fn send(&self, message: WorkflowMessage) {
    if let Err(err) = self.sender.send(&message) {
      log::error!("Failed to send step message: {}", err.to_string());
    }
  }
}