mofa_foundation/secretary/monitoring/
plugin.rs1use super::event::{Event, EventStatus, EventType};
7use async_trait::async_trait;
8use mofa_kernel::plugin::{
9 AgentPlugin, PluginContext, PluginMetadata, PluginPriority, PluginResult, PluginState,
10 PluginType,
11};
12use serde::{Deserialize, Serialize};
13use std::any::Any;
14use std::collections::HashMap;
15use tokio::sync::RwLock;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct EventResponseConfig {
20 pub enabled: bool,
22 pub priority: PluginPriority,
24 pub handled_event_types: Vec<EventType>,
26 pub max_impact_scope: String, pub rules: HashMap<String, serde_json::Value>,
30}
31
32impl Default for EventResponseConfig {
33 fn default() -> Self {
34 Self {
35 enabled: true,
36 priority: PluginPriority::Normal,
37 handled_event_types: vec![],
38 max_impact_scope: "component".to_string(),
39 rules: HashMap::new(),
40 }
41 }
42}
43
44#[async_trait]
46pub trait EventResponsePlugin: AgentPlugin {
47 fn config(&self) -> &EventResponseConfig;
49
50 async fn update_config(&mut self, config: EventResponseConfig) -> PluginResult<()>;
52
53 fn can_handle(&self, event: &Event) -> bool;
55
56 async fn handle_event(&mut self, event: Event) -> PluginResult<Event>;
58
59 async fn execute_workflow(&self, event: &Event) -> PluginResult<HashMap<String, String>>;
61}
62
63pub struct BaseEventResponsePlugin {
65 metadata: PluginMetadata,
66 state: PluginState,
67 config: RwLock<EventResponseConfig>,
68 handled_event_types: Vec<EventType>,
70 workflow_steps: Vec<String>,
71}
72
73impl BaseEventResponsePlugin {
74 pub fn new(
76 id: &str,
77 name: &str,
78 handled_event_types: Vec<EventType>,
79 workflow_steps: Vec<String>,
80 ) -> Self {
81 let metadata = PluginMetadata::new(id, name, PluginType::Tool)
82 .with_priority(PluginPriority::Normal)
83 .with_capability("event-response");
84
85 let config = EventResponseConfig {
86 handled_event_types: handled_event_types.clone(),
87 ..Default::default()
88 };
89
90 Self {
91 metadata,
92 state: PluginState::Unloaded,
93 config: RwLock::new(config),
94 handled_event_types,
95 workflow_steps,
96 }
97 }
98
99 pub fn with_priority(mut self, priority: PluginPriority) -> Self {
101 self.metadata = self.metadata.with_priority(priority);
102 self
103 }
104
105 pub fn with_max_impact_scope(self, _scope: &str) -> Self {
107 self
109 }
110}
111
112#[async_trait]
113impl AgentPlugin for BaseEventResponsePlugin {
114 fn metadata(&self) -> &PluginMetadata {
115 &self.metadata
116 }
117
118 fn state(&self) -> PluginState {
119 self.state.clone()
120 }
121
122 async fn load(&mut self, _ctx: &PluginContext) -> PluginResult<()> {
123 self.state = PluginState::Loading;
124 self.state = PluginState::Loaded;
126 Ok(())
127 }
128
129 async fn init_plugin(&mut self) -> PluginResult<()> {
130 Ok(())
132 }
133
134 async fn start(&mut self) -> PluginResult<()> {
135 self.state = PluginState::Running;
136 Ok(())
137 }
138
139 async fn stop(&mut self) -> PluginResult<()> {
140 self.state = PluginState::Paused;
141 Ok(())
142 }
143
144 async fn unload(&mut self) -> PluginResult<()> {
145 self.state = PluginState::Unloaded;
146 Ok(())
148 }
149
150 async fn execute(&mut self, input: String) -> PluginResult<String> {
151 let mut event: Event = serde_json::from_str(&input)?;
153
154 if !self.can_handle(&event) {
156 return Err(anyhow::anyhow!("Cannot handle this event type"));
157 }
158
159 event.update_status(EventStatus::Processing);
161 let processed_event = self.handle_event(event).await?;
162
163 processed_event.to_json().map_err(|e| anyhow::anyhow!(e))
165 }
166
167 fn stats(&self) -> HashMap<String, serde_json::Value> {
168 HashMap::new() }
170
171 fn as_any(&self) -> &dyn Any {
172 self
173 }
174
175 fn as_any_mut(&mut self) -> &mut dyn Any {
176 self
177 }
178
179 fn into_any(self: Box<Self>) -> Box<dyn Any> {
180 self
181 }
182}
183
184#[async_trait]
185impl EventResponsePlugin for BaseEventResponsePlugin {
186 fn config(&self) -> &EventResponseConfig {
187 panic!("config() should be implemented by concrete plugin");
188 }
189
190 async fn update_config(&mut self, config: EventResponseConfig) -> PluginResult<()> {
191 let mut current_config = self.config.write().await;
192 *current_config = config;
193 Ok(())
194 }
195
196 fn can_handle(&self, event: &Event) -> bool {
197 self.handled_event_types.contains(&event.event_type)
199 }
200
201 async fn handle_event(&mut self, mut event: Event) -> PluginResult<Event> {
202 let workflow_result = self.execute_workflow(&event).await?;
204
205 event.update_status(EventStatus::Resolved);
207
208 event.data["workflow_result"] = serde_json::json!(workflow_result);
210
211 Ok(event)
212 }
213
214 async fn execute_workflow(&self, event: &Event) -> PluginResult<HashMap<String, String>> {
215 let mut result = HashMap::new();
217 result.insert("status".to_string(), "handled".to_string());
218 result.insert(
219 "message".to_string(),
220 format!("Event handled by default workflow: {:?}", event.event_type),
221 );
222 Ok(result)
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use super::*;
229 use crate::secretary::monitoring::event::*;
230
231 #[test]
232 fn test_base_plugin_creation() {
233 let plugin = BaseEventResponsePlugin::new(
234 "test-plugin",
235 "Test Plugin",
236 vec![EventType::ServerFault],
237 vec!["step1".to_string(), "step2".to_string()],
238 );
239
240 assert_eq!(plugin.metadata().id, "test-plugin");
241 assert_eq!(plugin.metadata().name, "Test Plugin");
242 assert_eq!(plugin.state(), PluginState::Unloaded);
243 }
244
245 #[tokio::test]
246 async fn test_can_handle_event() {
247 let plugin = BaseEventResponsePlugin::new(
248 "test-plugin",
249 "Test Plugin",
250 vec![EventType::ServerFault, EventType::ServiceException],
251 vec![],
252 );
253
254 let server_fault_event = Event::new(
255 EventType::ServerFault,
256 EventPriority::High,
257 ImpactScope::Instance("server-01".to_string()),
258 "monitoring".to_string(),
259 "Server down".to_string(),
260 serde_json::Value::Null,
261 );
262
263 let network_attack_event = Event::new(
264 EventType::NetworkAttack,
265 EventPriority::Emergency,
266 ImpactScope::System,
267 "ids".to_string(),
268 "DDoS attack".to_string(),
269 serde_json::Value::Null,
270 );
271
272 assert!(plugin.can_handle(&server_fault_event));
273 assert!(!plugin.can_handle(&network_attack_event));
274 }
275}