astro_run/plugins/
plugin_driver.rs

1use crate::{
2  Action, JobRunResult, Plugin, RunJobEvent, RunStepEvent, RunWorkflowEvent, Step, StepRunResult,
3  UserActionStep, WorkflowLog, WorkflowRunResult, WorkflowStateEvent,
4};
5use std::sync::Arc;
6
7pub type SharedPluginDriver = Arc<PluginDriver>;
8
9pub struct PluginDriver {
10  pub(crate) plugins: Vec<Box<dyn Plugin>>,
11}
12
13impl PluginDriver {
14  pub fn new(plugins: Vec<Box<dyn Plugin>>) -> Self {
15    PluginDriver { plugins }
16  }
17
18  pub async fn on_state_change(&self, event: WorkflowStateEvent) {
19    for plugin in &self.plugins {
20      if let Err(err) = plugin.on_state_change(event.clone()).await {
21        log::error!(
22          "Plugin {} failed to handle state change: {}",
23          plugin.name(),
24          err
25        );
26      }
27    }
28  }
29
30  pub async fn on_log(&self, log: WorkflowLog) {
31    for plugin in &self.plugins {
32      if let Err(err) = plugin.on_log(log.clone()).await {
33        log::error!("Plugin {} failed to handle log: {}", plugin.name(), err);
34      }
35    }
36  }
37
38  pub async fn on_run_workflow(&self, event: RunWorkflowEvent) {
39    for plugin in &self.plugins {
40      if let Err(err) = plugin.on_run_workflow(event.clone()).await {
41        log::error!(
42          "Plugin {} failed to handle run workflow: {}",
43          plugin.name(),
44          err
45        );
46      }
47    }
48  }
49
50  pub async fn on_run_job(&self, event: RunJobEvent) {
51    for plugin in &self.plugins {
52      if let Err(err) = plugin.on_run_job(event.clone()).await {
53        log::error!("Plugin {} failed to handle run job: {}", plugin.name(), err);
54      }
55    }
56  }
57
58  pub async fn on_run_step(&self, event: RunStepEvent) {
59    for plugin in &self.plugins {
60      if let Err(err) = plugin.on_run_step(event.clone()).await {
61        log::error!(
62          "Plugin {} failed to handle run step: {}",
63          plugin.name(),
64          err
65        );
66      }
67    }
68  }
69
70  pub async fn on_workflow_completed(&self, result: WorkflowRunResult) {
71    for plugin in &self.plugins {
72      if let Err(err) = plugin.on_workflow_completed(result.clone()).await {
73        log::error!(
74          "Plugin {} failed to handle workflow completed: {}",
75          plugin.name(),
76          err
77        );
78      }
79    }
80  }
81
82  pub async fn on_job_completed(&self, result: JobRunResult) {
83    for plugin in &self.plugins {
84      if let Err(err) = plugin.on_job_completed(result.clone()).await {
85        log::error!(
86          "Plugin {} failed to handle job completed: {}",
87          plugin.name(),
88          err
89        );
90      }
91    }
92  }
93
94  pub async fn on_step_completed(&self, result: StepRunResult) {
95    for plugin in &self.plugins {
96      if let Err(err) = plugin.on_step_completed(result.clone()).await {
97        log::error!(
98          "Plugin {} failed to handle step completed: {}",
99          plugin.name(),
100          err
101        );
102      }
103    }
104  }
105
106  pub async fn on_before_run_step(&self, step: Step) -> Step {
107    let mut step = step;
108    for plugin in &self.plugins {
109      match plugin.on_before_run_step(step.clone()).await {
110        Ok(new_step) => step = new_step,
111        Err(err) => {
112          log::error!(
113            "Plugin {} failed to handle before run step: {}",
114            plugin.name(),
115            err
116          );
117        }
118      }
119    }
120
121    step
122  }
123
124  pub async fn on_resolve_dynamic_action(&self, step: UserActionStep) -> Option<Box<dyn Action>> {
125    for plugin in &self.plugins {
126      match plugin.on_resolve_dynamic_action(step.clone()).await {
127        Ok(Some(action)) => return Some(action),
128        Ok(None) => {}
129        Err(err) => {
130          log::error!(
131            "Plugin {} failed to handle resolve dynamic action: {}",
132            plugin.name(),
133            err
134          );
135        }
136      }
137    }
138
139    None
140  }
141}
142
143#[cfg(test)]
144mod tests {
145  use super::*;
146  use crate::{AstroRunPlugin, Error, WorkflowId, WorkflowState, WorkflowStateEvent};
147
148  #[astro_run_test::test]
149  async fn plugin_driver_on_state_change() {
150    let plugin = AstroRunPlugin::builder("test")
151      .on_state_change(|event| {
152        if let WorkflowStateEvent::WorkflowStateUpdated { id, state } = event {
153          assert_eq!(id, WorkflowId::new("test"));
154          assert_eq!(state, WorkflowState::Cancelled);
155
156          Ok(())
157        } else {
158          panic!("Unexpected event type");
159        }
160      })
161      .build();
162
163    let error_plugin = AstroRunPlugin::builder("error")
164      .on_state_change(|_| Err(Error::error("test")))
165      .build();
166
167    let plugin_driver = PluginDriver::new(vec![Box::new(plugin), Box::new(error_plugin)]);
168
169    plugin_driver
170      .on_state_change(WorkflowStateEvent::WorkflowStateUpdated {
171        id: WorkflowId::new("test"),
172        state: WorkflowState::Cancelled,
173      })
174      .await;
175  }
176
177  #[astro_run_test::test]
178  async fn plugin_driver_on_before_run_step() {
179    let plugin = AstroRunPlugin::builder("test")
180      .on_before_run_step(|step| {
181        let mut step = step;
182        step.run = "Updated".to_string();
183
184        Ok(step)
185      })
186      .build();
187
188    let update_name_plugin = AstroRunPlugin::builder("update_name")
189      .on_before_run_step(|step| {
190        let mut step = step;
191        step.name = Some("Updated".to_string());
192
193        Ok(step)
194      })
195      .build();
196
197    let error_plugin = AstroRunPlugin::builder("error")
198      .on_before_run_step(|_| Err(Error::error("test")))
199      .build();
200
201    let plugin_driver = PluginDriver::new(vec![
202      Box::new(plugin),
203      Box::new(error_plugin),
204      Box::new(update_name_plugin),
205    ]);
206
207    plugin_driver
208      .on_before_run_step(Step {
209        ..Default::default()
210      })
211      .await;
212  }
213
214  #[astro_run_test::test]
215  async fn plugin_driver_on_log() {
216    let plugin = AstroRunPlugin::builder("test")
217      .on_log(|log| {
218        assert_eq!(log.message, "test");
219
220        Ok(())
221      })
222      .build();
223
224    let error_plugin = AstroRunPlugin::builder("error")
225      .on_log(|_| Err(Error::error("test")))
226      .build();
227
228    let plugin_driver = PluginDriver::new(vec![Box::new(plugin), Box::new(error_plugin)]);
229
230    plugin_driver
231      .on_log(WorkflowLog {
232        message: "test".to_string(),
233        ..Default::default()
234      })
235      .await;
236  }
237
238  #[astro_run_test::test]
239  async fn test_plugin_trait() {
240    struct TestPlugin;
241
242    impl Plugin for TestPlugin {
243      fn name(&self) -> &'static str {
244        "test"
245      }
246    }
247
248    let plugin_driver = PluginDriver::new(vec![Box::new(TestPlugin)]);
249
250    plugin_driver
251      .on_log(WorkflowLog {
252        message: "test".to_string(),
253        ..Default::default()
254      })
255      .await;
256
257    let action = plugin_driver
258      .on_resolve_dynamic_action(UserActionStep {
259        name: Some("test".to_string()),
260        ..Default::default()
261      })
262      .await;
263
264    assert!(action.is_none());
265  }
266}