mofa_foundation/secretary/monitoring/
engine.rs1use super::event::{Event, EventStatus};
7use super::plugin::EventResponsePlugin;
8use super::rule_manager::{RuleAdjustmentStrategy, RuleManager};
9use mofa_kernel::plugin::{AgentPlugin, PluginContext, PluginResult};
10use std::collections::{HashMap, VecDeque};
11use std::sync::Arc;
12use tokio::sync::{RwLock, Semaphore};
13
14pub struct EventHandlingEngine {
16 rule_manager: Arc<RuleManager>,
18 event_queue: Arc<RwLock<VecDeque<Event>>>,
20 plugins: Arc<RwLock<HashMap<String, Box<dyn EventResponsePlugin + Send + Sync>>>>,
22 plugin_context: PluginContext,
24 max_concurrent_handlers: usize,
26 semaphore: Arc<Semaphore>,
28}
29
30impl EventHandlingEngine {
31 pub fn new() -> Self {
33 Self {
34 rule_manager: Arc::new(RuleManager::new()),
35 event_queue: Arc::new(RwLock::new(VecDeque::new())),
36 plugins: Arc::new(RwLock::new(HashMap::new())),
37 plugin_context: PluginContext::new("event-handling-engine"),
38 max_concurrent_handlers: 10,
39 semaphore: Arc::new(Semaphore::new(10)),
40 }
41 }
42
43 pub fn with_max_concurrent_handlers(mut self, limit: usize) -> Self {
45 self.max_concurrent_handlers = limit;
46 self.semaphore = Arc::new(Semaphore::new(limit));
47 self
48 }
49
50 pub async fn set_rule_strategy(&self, strategy: RuleAdjustmentStrategy) {
52 self.rule_manager.set_strategy(strategy).await;
53 }
54
55 pub async fn register_plugin(&self, plugin: Box<dyn EventResponsePlugin + Send + Sync>) {
57 let plugin_id = plugin.metadata().id.to_string();
59 let mut plugins = self.plugins.write().await;
60
61 plugins.insert(plugin_id.clone(), plugin);
63
64 }
67
68 pub async fn register_plugins(&self, plugins: Vec<Box<dyn EventResponsePlugin + Send + Sync>>) {
70 for plugin in plugins {
71 self.register_plugin(plugin).await;
72 }
73 }
74
75 pub async fn submit_event(&self, event: Event) {
77 println!(
78 "Submitted new event: [{}] {} - {}",
79 event.priority, event.source, event.description
80 );
81
82 let mut queue = self.event_queue.write().await;
84 queue.push_back(event);
85 }
86
87 pub async fn process_next_event(&self) -> PluginResult<Option<Event>> {
89 let semaphore = self.semaphore.clone();
91 let _permit = semaphore.acquire().await.unwrap();
92
93 let mut queue = self.event_queue.write().await;
95 let mut event = match queue.pop_front() {
96 Some(event) => event,
97 None => return Ok(None),
98 };
99
100 event.update_status(EventStatus::Processing);
102
103 self.rule_manager.adjust_rules(&event).await?;
105
106 let mut plugins = self.plugins.write().await;
108
109 for (_plugin_id, plugin) in plugins.iter_mut() {
111 if plugin.can_handle(&event) {
112 println!(
113 "Processing event {} with plugin: {}",
114 event.id,
115 plugin.metadata().name
116 );
117
118 let processed_event = plugin.handle_event(event).await?;
120
121 println!(
122 "Event {} processed successfully by plugin {}",
123 processed_event.id,
124 plugin.metadata().name
125 );
126
127 return Ok(Some(processed_event));
128 }
129 }
130
131 println!("No plugin found to handle event: {}", event.id);
133 event.update_status(EventStatus::ManualInterventionNeeded);
134
135 Ok(Some(event))
136 }
137
138 pub async fn start(&self) -> PluginResult<()> {
140 println!(
141 "Starting event handling engine with {} concurrent handlers...",
142 self.max_concurrent_handlers
143 );
144
145 loop {
147 match self.process_next_event().await {
148 Ok(Some(event)) => {
149 if event.status == EventStatus::Resolved {
151 println!("Event resolved: {}", event.id);
152 } else {
153 println!("Event {} status: {:?}", event.id, event.status);
154 }
155 }
156 Ok(None) => {
157 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
159 }
160 Err(err) => {
161 println!("Error processing event: {}", err);
163 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
165 }
166 }
167 }
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::secretary::monitoring::event::*;
    use crate::secretary::monitoring::plugins::{
        NetworkAttackResponsePlugin, ServerFaultResponsePlugin,
    };

    /// Registers two plugins, submits two events and checks that the first
    /// queued event comes back resolved.
    #[tokio::test]
    async fn test_event_handling() {
        let engine = EventHandlingEngine::new();

        let server_fault_plugin = ServerFaultResponsePlugin::new();
        let network_attack_plugin = NetworkAttackResponsePlugin::new();

        engine
            .register_plugins(vec![
                Box::new(server_fault_plugin) as Box<dyn EventResponsePlugin + Send + Sync>,
                Box::new(network_attack_plugin) as Box<dyn EventResponsePlugin + Send + Sync>,
            ])
            .await;

        let server_fault_event = Event::new(
            EventType::ServerFault,
            EventPriority::High,
            ImpactScope::Instance("web-server-01".to_string()),
            "monitoring-agent".to_string(),
            "Server unresponsive".to_string(),
            serde_json::json!({ "server": "web-server-01" }),
        );

        let network_attack_event = Event::new(
            EventType::NetworkAttack,
            EventPriority::Emergency,
            ImpactScope::Service("api-gateway".to_string()),
            "ids".to_string(),
            "DDoS attack".to_string(),
            serde_json::json!({ "source_ip": "10.0.0.1" }),
        );

        engine.submit_event(server_fault_event).await;
        engine.submit_event(network_attack_event).await;

        // Two events were queued, so anything other than `Ok(Some(_))` is a
        // failure. The previous `if let` let `Ok(None)` pass without ever
        // checking the status.
        let event = match engine.process_next_event().await {
            Ok(Some(event)) => event,
            Ok(None) => panic!("expected a queued event, but the queue was empty"),
            Err(err) => panic!("processing the first queued event failed: {}", err),
        };

        assert_eq!(event.status, EventStatus::Resolved);
    }
}