Skip to main content

mofa_foundation/secretary/monitoring/
engine.rs

1//! Event handling engine
2//!
3//! This module provides the main event handling engine that manages plugins
4//! and dispatches events to the appropriate handlers.
5
6use super::event::{Event, EventStatus};
7use super::plugin::EventResponsePlugin;
8use super::rule_manager::{RuleAdjustmentStrategy, RuleManager};
9use mofa_kernel::plugin::{AgentPlugin, PluginContext, PluginResult};
10use std::collections::{HashMap, VecDeque};
11use std::sync::Arc;
12use tokio::sync::{RwLock, Semaphore};
13
14/// Event handling engine
15pub struct EventHandlingEngine {
16    /// Rule manager for runtime rule adjustment
17    rule_manager: Arc<RuleManager>,
18    /// Event queue
19    event_queue: Arc<RwLock<VecDeque<Event>>>,
20    /// Plugins registered with the engine
21    plugins: Arc<RwLock<HashMap<String, Box<dyn EventResponsePlugin + Send + Sync>>>>,
22    /// Plugin context
23    plugin_context: PluginContext,
24    /// Maximum concurrent event handlers
25    max_concurrent_handlers: usize,
26    /// Semaphore to control concurrent handlers
27    semaphore: Arc<Semaphore>,
28}
29
30impl EventHandlingEngine {
31    /// Create a new event handling engine
32    pub fn new() -> Self {
33        Self {
34            rule_manager: Arc::new(RuleManager::new()),
35            event_queue: Arc::new(RwLock::new(VecDeque::new())),
36            plugins: Arc::new(RwLock::new(HashMap::new())),
37            plugin_context: PluginContext::new("event-handling-engine"),
38            max_concurrent_handlers: 10,
39            semaphore: Arc::new(Semaphore::new(10)),
40        }
41    }
42
43    /// Create a new event handling engine with custom concurrent handlers limit
44    pub fn with_max_concurrent_handlers(mut self, limit: usize) -> Self {
45        self.max_concurrent_handlers = limit;
46        self.semaphore = Arc::new(Semaphore::new(limit));
47        self
48    }
49
50    /// Set rule adjustment strategy
51    pub async fn set_rule_strategy(&self, strategy: RuleAdjustmentStrategy) {
52        self.rule_manager.set_strategy(strategy).await;
53    }
54
55    /// Register an event response plugin
56    pub async fn register_plugin(&self, plugin: Box<dyn EventResponsePlugin + Send + Sync>) {
57        // Register with the engine
58        let plugin_id = plugin.metadata().id.to_string();
59        let mut plugins = self.plugins.write().await;
60
61        // Ownership of the plugin is transferred to the engine
62        plugins.insert(plugin_id.clone(), plugin);
63
64        // Note: Removed adding to rule manager due to trait object cloning restrictions
65        // We can modify the rule manager to use shared ownership if needed
66    }
67
68    /// Register multiple plugins at once
69    pub async fn register_plugins(&self, plugins: Vec<Box<dyn EventResponsePlugin + Send + Sync>>) {
70        for plugin in plugins {
71            self.register_plugin(plugin).await;
72        }
73    }
74
75    /// Submit an event to be handled
76    pub async fn submit_event(&self, event: Event) {
77        println!(
78            "Submitted new event: [{}] {} - {}",
79            event.priority, event.source, event.description
80        );
81
82        // Lock the queue and add the event
83        let mut queue = self.event_queue.write().await;
84        queue.push_back(event);
85    }
86
87    /// Process the next event in the queue
88    pub async fn process_next_event(&self) -> PluginResult<Option<Event>> {
89        // Acquire a semaphore permit
90        let semaphore = self.semaphore.clone();
91        let _permit = semaphore.acquire().await.unwrap();
92
93        // Get the next event from the queue
94        let mut queue = self.event_queue.write().await;
95        let mut event = match queue.pop_front() {
96            Some(event) => event,
97            None => return Ok(None),
98        };
99
100        // Update event status
101        event.update_status(EventStatus::Processing);
102
103        // Adjust rules based on the event
104        self.rule_manager.adjust_rules(&event).await?;
105
106        // Lock plugins for reading
107        let mut plugins = self.plugins.write().await;
108
109        // Find the first plugin that can handle the event
110        for (_plugin_id, plugin) in plugins.iter_mut() {
111            if plugin.can_handle(&event) {
112                println!(
113                    "Processing event {} with plugin: {}",
114                    event.id,
115                    plugin.metadata().name
116                );
117
118                // Process the event - call handle_event directly on the trait object
119                let processed_event = plugin.handle_event(event).await?;
120
121                println!(
122                    "Event {} processed successfully by plugin {}",
123                    processed_event.id,
124                    plugin.metadata().name
125                );
126
127                return Ok(Some(processed_event));
128            }
129        }
130
131        // No plugin found to handle the event
132        println!("No plugin found to handle event: {}", event.id);
133        event.update_status(EventStatus::ManualInterventionNeeded);
134
135        Ok(Some(event))
136    }
137
138    /// Start the engine and process events continuously
139    pub async fn start(&self) -> PluginResult<()> {
140        println!(
141            "Starting event handling engine with {} concurrent handlers...",
142            self.max_concurrent_handlers
143        );
144
145        // Keep processing events forever
146        loop {
147            match self.process_next_event().await {
148                Ok(Some(event)) => {
149                    // Event processed, do any post-processing if needed
150                    if event.status == EventStatus::Resolved {
151                        println!("Event resolved: {}", event.id);
152                    } else {
153                        println!("Event {} status: {:?}", event.id, event.status);
154                    }
155                }
156                Ok(None) => {
157                    // No events in queue, wait a bit before checking again
158                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
159                }
160                Err(err) => {
161                    // Handle error
162                    println!("Error processing event: {}", err);
163                    // Continue processing
164                    tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
165                }
166            }
167        }
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::secretary::monitoring::event::*;
    use crate::secretary::monitoring::plugins::{
        NetworkAttackResponsePlugin, ServerFaultResponsePlugin,
    };

    /// Registers two plugins, queues two events and checks that the first
    /// queued event (the server fault) comes back resolved.
    #[tokio::test]
    async fn test_event_handling() {
        // Create event engine
        let engine = EventHandlingEngine::new();

        // Create plugins
        let server_fault_plugin = ServerFaultResponsePlugin::new();
        let network_attack_plugin = NetworkAttackResponsePlugin::new();

        // Register plugins with explicit boxing
        engine
            .register_plugins(vec![
                Box::new(server_fault_plugin) as Box<dyn EventResponsePlugin + Send + Sync>,
                Box::new(network_attack_plugin) as Box<dyn EventResponsePlugin + Send + Sync>,
            ])
            .await;

        // Create server fault event
        let server_fault_event = Event::new(
            EventType::ServerFault,
            EventPriority::High,
            ImpactScope::Instance("web-server-01".to_string()),
            "monitoring-agent".to_string(),
            "Server unresponsive".to_string(),
            serde_json::json!({ "server": "web-server-01" }),
        );

        // Create network attack event
        let network_attack_event = Event::new(
            EventType::NetworkAttack,
            EventPriority::Emergency,
            ImpactScope::Service("api-gateway".to_string()),
            "ids".to_string(),
            "DDoS attack".to_string(),
            serde_json::json!({ "source_ip": "10.0.0.1" }),
        );

        // Submit both events
        engine.submit_event(server_fault_event).await;
        engine.submit_event(network_attack_event).await;

        // Process the first event
        let result = engine.process_next_event().await;
        assert!(result.is_ok());

        // Two events were queued, so an event must come back. Failing here
        // keeps the status assertion from being skipped on `Ok(None)`.
        let event = result
            .ok()
            .flatten()
            .expect("an event was queued, so one must be returned");
        assert!(matches!(event.status, EventStatus::Resolved));
    }
}