// astro-run 1.0.0
//
// A highly customizable workflow orchestrator
// Documentation
use crate::{
  Action, JobRunResult, Plugin, RunJobEvent, RunStepEvent, RunWorkflowEvent, Step, StepRunResult,
  UserActionStep, WorkflowLog, WorkflowRunResult, WorkflowStateEvent,
};
use std::sync::Arc;

/// A [`PluginDriver`] wrapped in an [`Arc`], so a single driver can be shared between owners.
pub type SharedPluginDriver = Arc<PluginDriver>;

/// Owns the registered plugins and forwards workflow hooks to each of them.
pub struct PluginDriver {
  /// Plugins in the order they were handed to [`PluginDriver::new`]; every hook walks this
  /// vector from front to back.
  pub(crate) plugins: Vec<Box<dyn Plugin>>,
}

impl PluginDriver {
  pub fn new(plugins: Vec<Box<dyn Plugin>>) -> Self {
    PluginDriver { plugins }
  }

  pub async fn on_state_change(&self, event: WorkflowStateEvent) {
    for plugin in &self.plugins {
      if let Err(err) = plugin.on_state_change(event.clone()).await {
        log::error!(
          "Plugin {} failed to handle state change: {}",
          plugin.name(),
          err
        );
      }
    }
  }

  pub async fn on_log(&self, log: WorkflowLog) {
    for plugin in &self.plugins {
      if let Err(err) = plugin.on_log(log.clone()).await {
        log::error!("Plugin {} failed to handle log: {}", plugin.name(), err);
      }
    }
  }

  pub async fn on_run_workflow(&self, event: RunWorkflowEvent) {
    for plugin in &self.plugins {
      if let Err(err) = plugin.on_run_workflow(event.clone()).await {
        log::error!(
          "Plugin {} failed to handle run workflow: {}",
          plugin.name(),
          err
        );
      }
    }
  }

  pub async fn on_run_job(&self, event: RunJobEvent) {
    for plugin in &self.plugins {
      if let Err(err) = plugin.on_run_job(event.clone()).await {
        log::error!("Plugin {} failed to handle run job: {}", plugin.name(), err);
      }
    }
  }

  pub async fn on_run_step(&self, event: RunStepEvent) {
    for plugin in &self.plugins {
      if let Err(err) = plugin.on_run_step(event.clone()).await {
        log::error!(
          "Plugin {} failed to handle run step: {}",
          plugin.name(),
          err
        );
      }
    }
  }

  pub async fn on_workflow_completed(&self, result: WorkflowRunResult) {
    for plugin in &self.plugins {
      if let Err(err) = plugin.on_workflow_completed(result.clone()).await {
        log::error!(
          "Plugin {} failed to handle workflow completed: {}",
          plugin.name(),
          err
        );
      }
    }
  }

  pub async fn on_job_completed(&self, result: JobRunResult) {
    for plugin in &self.plugins {
      if let Err(err) = plugin.on_job_completed(result.clone()).await {
        log::error!(
          "Plugin {} failed to handle job completed: {}",
          plugin.name(),
          err
        );
      }
    }
  }

  pub async fn on_step_completed(&self, result: StepRunResult) {
    for plugin in &self.plugins {
      if let Err(err) = plugin.on_step_completed(result.clone()).await {
        log::error!(
          "Plugin {} failed to handle step completed: {}",
          plugin.name(),
          err
        );
      }
    }
  }

  pub async fn on_before_run_step(&self, step: Step) -> Step {
    let mut step = step;
    for plugin in &self.plugins {
      match plugin.on_before_run_step(step.clone()).await {
        Ok(new_step) => step = new_step,
        Err(err) => {
          log::error!(
            "Plugin {} failed to handle before run step: {}",
            plugin.name(),
            err
          );
        }
      }
    }

    step
  }

  pub async fn on_resolve_dynamic_action(&self, step: UserActionStep) -> Option<Box<dyn Action>> {
    for plugin in &self.plugins {
      match plugin.on_resolve_dynamic_action(step.clone()).await {
        Ok(Some(action)) => return Some(action),
        Ok(None) => {}
        Err(err) => {
          log::error!(
            "Plugin {} failed to handle resolve dynamic action: {}",
            plugin.name(),
            err
          );
        }
      }
    }

    None
  }
}

#[cfg(test)]
mod tests {
  use super::*;
  use crate::{AstroRunPlugin, Error, WorkflowId, WorkflowState, WorkflowStateEvent};

  /// The driver forwards the state event to a plugin and tolerates a second plugin failing.
  #[astro_run_test::test]
  async fn plugin_driver_on_state_change() {
    let plugin = AstroRunPlugin::builder("test")
      .on_state_change(|event| {
        if let WorkflowStateEvent::WorkflowStateUpdated { id, state } = event {
          assert_eq!(id, WorkflowId::new("test"));
          assert_eq!(state, WorkflowState::Cancelled);

          Ok(())
        } else {
          panic!("Unexpected event type");
        }
      })
      .build();

    let error_plugin = AstroRunPlugin::builder("error")
      .on_state_change(|_| Err(Error::error("test")))
      .build();

    let plugin_driver = PluginDriver::new(vec![Box::new(plugin), Box::new(error_plugin)]);

    plugin_driver
      .on_state_change(WorkflowStateEvent::WorkflowStateUpdated {
        id: WorkflowId::new("test"),
        state: WorkflowState::Cancelled,
      })
      .await;
  }

  /// Step updates from successive plugins accumulate, and a failing plugin in between
  /// neither aborts the chain nor discards the updates made before it.
  #[astro_run_test::test]
  async fn plugin_driver_on_before_run_step() {
    let plugin = AstroRunPlugin::builder("test")
      .on_before_run_step(|step| {
        let mut step = step;
        step.run = "Updated".to_string();

        Ok(step)
      })
      .build();

    let update_name_plugin = AstroRunPlugin::builder("update_name")
      .on_before_run_step(|step| {
        let mut step = step;
        step.name = Some("Updated".to_string());

        Ok(step)
      })
      .build();

    let error_plugin = AstroRunPlugin::builder("error")
      .on_before_run_step(|_| Err(Error::error("test")))
      .build();

    // The failing plugin is deliberately placed between the two updating plugins.
    let plugin_driver = PluginDriver::new(vec![
      Box::new(plugin),
      Box::new(error_plugin),
      Box::new(update_name_plugin),
    ]);

    let step = plugin_driver
      .on_before_run_step(Step {
        ..Default::default()
      })
      .await;

    // The update made before the failing plugin must be kept ...
    assert_eq!(step.run, "Updated");
    // ... and the plugin after the failing one must still have been applied.
    assert_eq!(step.name.as_deref(), Some("Updated"));
  }

  /// The driver forwards the log entry to a plugin and tolerates a second plugin failing.
  #[astro_run_test::test]
  async fn plugin_driver_on_log() {
    let plugin = AstroRunPlugin::builder("test")
      .on_log(|log| {
        assert_eq!(log.message, "test");

        Ok(())
      })
      .build();

    let error_plugin = AstroRunPlugin::builder("error")
      .on_log(|_| Err(Error::error("test")))
      .build();

    let plugin_driver = PluginDriver::new(vec![Box::new(plugin), Box::new(error_plugin)]);

    plugin_driver
      .on_log(WorkflowLog {
        message: "test".to_string(),
        ..Default::default()
      })
      .await;
  }

  /// A plugin that only implements `name` can be driven, and resolves no dynamic action.
  #[astro_run_test::test]
  async fn test_plugin_trait() {
    struct TestPlugin;

    impl Plugin for TestPlugin {
      fn name(&self) -> &'static str {
        "test"
      }
    }

    let plugin_driver = PluginDriver::new(vec![Box::new(TestPlugin)]);

    plugin_driver
      .on_log(WorkflowLog {
        message: "test".to_string(),
        ..Default::default()
      })
      .await;

    let action = plugin_driver
      .on_resolve_dynamic_action(UserActionStep {
        name: Some("test".to_string()),
        ..Default::default()
      })
      .await;

    assert!(action.is_none());
  }
}