astro_run/plugins/
plugin_driver.rs1use crate::{
2 Action, JobRunResult, Plugin, RunJobEvent, RunStepEvent, RunWorkflowEvent, Step, StepRunResult,
3 UserActionStep, WorkflowLog, WorkflowRunResult, WorkflowStateEvent,
4};
5use std::sync::Arc;
6
7pub type SharedPluginDriver = Arc<PluginDriver>;
8
9pub struct PluginDriver {
10 pub(crate) plugins: Vec<Box<dyn Plugin>>,
11}
12
13impl PluginDriver {
14 pub fn new(plugins: Vec<Box<dyn Plugin>>) -> Self {
15 PluginDriver { plugins }
16 }
17
18 pub async fn on_state_change(&self, event: WorkflowStateEvent) {
19 for plugin in &self.plugins {
20 if let Err(err) = plugin.on_state_change(event.clone()).await {
21 log::error!(
22 "Plugin {} failed to handle state change: {}",
23 plugin.name(),
24 err
25 );
26 }
27 }
28 }
29
30 pub async fn on_log(&self, log: WorkflowLog) {
31 for plugin in &self.plugins {
32 if let Err(err) = plugin.on_log(log.clone()).await {
33 log::error!("Plugin {} failed to handle log: {}", plugin.name(), err);
34 }
35 }
36 }
37
38 pub async fn on_run_workflow(&self, event: RunWorkflowEvent) {
39 for plugin in &self.plugins {
40 if let Err(err) = plugin.on_run_workflow(event.clone()).await {
41 log::error!(
42 "Plugin {} failed to handle run workflow: {}",
43 plugin.name(),
44 err
45 );
46 }
47 }
48 }
49
50 pub async fn on_run_job(&self, event: RunJobEvent) {
51 for plugin in &self.plugins {
52 if let Err(err) = plugin.on_run_job(event.clone()).await {
53 log::error!("Plugin {} failed to handle run job: {}", plugin.name(), err);
54 }
55 }
56 }
57
58 pub async fn on_run_step(&self, event: RunStepEvent) {
59 for plugin in &self.plugins {
60 if let Err(err) = plugin.on_run_step(event.clone()).await {
61 log::error!(
62 "Plugin {} failed to handle run step: {}",
63 plugin.name(),
64 err
65 );
66 }
67 }
68 }
69
70 pub async fn on_workflow_completed(&self, result: WorkflowRunResult) {
71 for plugin in &self.plugins {
72 if let Err(err) = plugin.on_workflow_completed(result.clone()).await {
73 log::error!(
74 "Plugin {} failed to handle workflow completed: {}",
75 plugin.name(),
76 err
77 );
78 }
79 }
80 }
81
82 pub async fn on_job_completed(&self, result: JobRunResult) {
83 for plugin in &self.plugins {
84 if let Err(err) = plugin.on_job_completed(result.clone()).await {
85 log::error!(
86 "Plugin {} failed to handle job completed: {}",
87 plugin.name(),
88 err
89 );
90 }
91 }
92 }
93
94 pub async fn on_step_completed(&self, result: StepRunResult) {
95 for plugin in &self.plugins {
96 if let Err(err) = plugin.on_step_completed(result.clone()).await {
97 log::error!(
98 "Plugin {} failed to handle step completed: {}",
99 plugin.name(),
100 err
101 );
102 }
103 }
104 }
105
106 pub async fn on_before_run_step(&self, step: Step) -> Step {
107 let mut step = step;
108 for plugin in &self.plugins {
109 match plugin.on_before_run_step(step.clone()).await {
110 Ok(new_step) => step = new_step,
111 Err(err) => {
112 log::error!(
113 "Plugin {} failed to handle before run step: {}",
114 plugin.name(),
115 err
116 );
117 }
118 }
119 }
120
121 step
122 }
123
124 pub async fn on_resolve_dynamic_action(&self, step: UserActionStep) -> Option<Box<dyn Action>> {
125 for plugin in &self.plugins {
126 match plugin.on_resolve_dynamic_action(step.clone()).await {
127 Ok(Some(action)) => return Some(action),
128 Ok(None) => {}
129 Err(err) => {
130 log::error!(
131 "Plugin {} failed to handle resolve dynamic action: {}",
132 plugin.name(),
133 err
134 );
135 }
136 }
137 }
138
139 None
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{AstroRunPlugin, Error, WorkflowId, WorkflowState, WorkflowStateEvent};

    /// A failing plugin must not prevent the event from being dispatched.
    #[astro_run_test::test]
    async fn plugin_driver_on_state_change() {
        let plugin = AstroRunPlugin::builder("test")
            .on_state_change(|event| {
                if let WorkflowStateEvent::WorkflowStateUpdated { id, state } = event {
                    assert_eq!(id, WorkflowId::new("test"));
                    assert_eq!(state, WorkflowState::Cancelled);

                    Ok(())
                } else {
                    panic!("Unexpected event type");
                }
            })
            .build();

        let error_plugin = AstroRunPlugin::builder("error")
            .on_state_change(|_| Err(Error::error("test")))
            .build();

        let plugin_driver = PluginDriver::new(vec![Box::new(plugin), Box::new(error_plugin)]);

        plugin_driver
            .on_state_change(WorkflowStateEvent::WorkflowStateUpdated {
                id: WorkflowId::new("test"),
                state: WorkflowState::Cancelled,
            })
            .await;
    }

    /// Step edits from successful plugins accumulate, and a failing plugin in
    /// the middle of the chain neither aborts it nor discards earlier edits.
    #[astro_run_test::test]
    async fn plugin_driver_on_before_run_step() {
        let plugin = AstroRunPlugin::builder("test")
            .on_before_run_step(|step| {
                let mut step = step;
                step.run = "Updated".to_string();

                Ok(step)
            })
            .build();

        let update_name_plugin = AstroRunPlugin::builder("update_name")
            .on_before_run_step(|step| {
                let mut step = step;
                step.name = Some("Updated".to_string());

                Ok(step)
            })
            .build();

        let error_plugin = AstroRunPlugin::builder("error")
            .on_before_run_step(|_| Err(Error::error("test")))
            .build();

        let plugin_driver = PluginDriver::new(vec![
            Box::new(plugin),
            Box::new(error_plugin),
            Box::new(update_name_plugin),
        ]);

        let step = plugin_driver.on_before_run_step(Step::default()).await;

        // Set by the first plugin; must survive the failing plugin that follows it.
        assert_eq!(step.run, "Updated");
        // Set by the last plugin, which runs after the failure.
        assert_eq!(step.name.as_deref(), Some("Updated"));
    }

    /// A failing plugin must not prevent the log from being dispatched.
    #[astro_run_test::test]
    async fn plugin_driver_on_log() {
        let plugin = AstroRunPlugin::builder("test")
            .on_log(|log| {
                assert_eq!(log.message, "test");

                Ok(())
            })
            .build();

        let error_plugin = AstroRunPlugin::builder("error")
            .on_log(|_| Err(Error::error("test")))
            .build();

        let plugin_driver = PluginDriver::new(vec![Box::new(plugin), Box::new(error_plugin)]);

        plugin_driver
            .on_log(WorkflowLog {
                message: "test".to_string(),
                ..Default::default()
            })
            .await;
    }

    /// A plugin that only provides `name` works with the driver, and resolves
    /// no dynamic action.
    #[astro_run_test::test]
    async fn test_plugin_trait() {
        struct TestPlugin;

        impl Plugin for TestPlugin {
            fn name(&self) -> &'static str {
                "test"
            }
        }

        let plugin_driver = PluginDriver::new(vec![Box::new(TestPlugin)]);

        plugin_driver
            .on_log(WorkflowLog {
                message: "test".to_string(),
                ..Default::default()
            })
            .await;

        let action = plugin_driver
            .on_resolve_dynamic_action(UserActionStep {
                name: Some("test".to_string()),
                ..Default::default()
            })
            .await;

        assert!(action.is_none());
    }
}