Skip to main content

mofa_foundation/secretary/monitoring/
plugin.rs

//! Event response plugin system
//!
//! This module defines the event response plugin interface and a basic
//! implementation for handling various operational events.
5
6use super::event::{Event, EventStatus, EventType};
7use async_trait::async_trait;
8use mofa_kernel::plugin::{
9    AgentPlugin, PluginContext, PluginMetadata, PluginPriority, PluginResult, PluginState,
10    PluginType,
11};
12use serde::{Deserialize, Serialize};
13use std::any::Any;
14use std::collections::HashMap;
15use tokio::sync::RwLock;
16
17/// Event response plugin configuration
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct EventResponseConfig {
20    /// Whether the plugin is enabled
21    pub enabled: bool,
22    /// Plugin priority
23    pub priority: PluginPriority,
24    /// Event types handled by this plugin
25    pub handled_event_types: Vec<EventType>,
26    /// Maximum impact scope this plugin can handle
27    pub max_impact_scope: String, // Using string for flexibility
28    /// Rule configuration
29    pub rules: HashMap<String, serde_json::Value>,
30}
31
32impl Default for EventResponseConfig {
33    fn default() -> Self {
34        Self {
35            enabled: true,
36            priority: PluginPriority::Normal,
37            handled_event_types: vec![],
38            max_impact_scope: "component".to_string(),
39            rules: HashMap::new(),
40        }
41    }
42}
43
44/// Event response plugin trait
45#[async_trait]
46pub trait EventResponsePlugin: AgentPlugin {
47    /// Get the plugin configuration
48    fn config(&self) -> &EventResponseConfig;
49
50    /// Update the plugin configuration at runtime
51    async fn update_config(&mut self, config: EventResponseConfig) -> PluginResult<()>;
52
53    /// Check if this plugin can handle the given event
54    fn can_handle(&self, event: &Event) -> bool;
55
56    /// Handle the event
57    async fn handle_event(&mut self, event: Event) -> PluginResult<Event>;
58
59    /// Execute the response workflow for the event
60    async fn execute_workflow(&self, event: &Event) -> PluginResult<HashMap<String, String>>;
61}
62
63/// Base implementation of EventResponsePlugin
64pub struct BaseEventResponsePlugin {
65    metadata: PluginMetadata,
66    state: PluginState,
67    config: RwLock<EventResponseConfig>,
68    /// Cached handled event types for synchronous access
69    handled_event_types: Vec<EventType>,
70    workflow_steps: Vec<String>,
71}
72
73impl BaseEventResponsePlugin {
74    /// Create a new base event response plugin
75    pub fn new(
76        id: &str,
77        name: &str,
78        handled_event_types: Vec<EventType>,
79        workflow_steps: Vec<String>,
80    ) -> Self {
81        let metadata = PluginMetadata::new(id, name, PluginType::Tool)
82            .with_priority(PluginPriority::Normal)
83            .with_capability("event-response");
84
85        let config = EventResponseConfig {
86            handled_event_types: handled_event_types.clone(),
87            ..Default::default()
88        };
89
90        Self {
91            metadata,
92            state: PluginState::Unloaded,
93            config: RwLock::new(config),
94            handled_event_types,
95            workflow_steps,
96        }
97    }
98
99    /// Set the plugin priority
100    pub fn with_priority(mut self, priority: PluginPriority) -> Self {
101        self.metadata = self.metadata.with_priority(priority);
102        self
103    }
104
105    /// Set the max impact scope
106    pub fn with_max_impact_scope(self, _scope: &str) -> Self {
107        // Max impact scope is now stored in config, update via update_config
108        self
109    }
110}
111
112#[async_trait]
113impl AgentPlugin for BaseEventResponsePlugin {
114    fn metadata(&self) -> &PluginMetadata {
115        &self.metadata
116    }
117
118    fn state(&self) -> PluginState {
119        self.state.clone()
120    }
121
122    async fn load(&mut self, _ctx: &PluginContext) -> PluginResult<()> {
123        self.state = PluginState::Loading;
124        // Perform resource allocation if needed
125        self.state = PluginState::Loaded;
126        Ok(())
127    }
128
129    async fn init_plugin(&mut self) -> PluginResult<()> {
130        // Perform initialization logic
131        Ok(())
132    }
133
134    async fn start(&mut self) -> PluginResult<()> {
135        self.state = PluginState::Running;
136        Ok(())
137    }
138
139    async fn stop(&mut self) -> PluginResult<()> {
140        self.state = PluginState::Paused;
141        Ok(())
142    }
143
144    async fn unload(&mut self) -> PluginResult<()> {
145        self.state = PluginState::Unloaded;
146        // Release resources if needed
147        Ok(())
148    }
149
150    async fn execute(&mut self, input: String) -> PluginResult<String> {
151        // Parse input as Event
152        let mut event: Event = serde_json::from_str(&input)?;
153
154        // Check if we can handle this event
155        if !self.can_handle(&event) {
156            return Err(anyhow::anyhow!("Cannot handle this event type"));
157        }
158
159        // Handle the event
160        event.update_status(EventStatus::Processing);
161        let processed_event = self.handle_event(event).await?;
162
163        // Return the result as JSON
164        processed_event.to_json().map_err(|e| anyhow::anyhow!(e))
165    }
166
167    fn stats(&self) -> HashMap<String, serde_json::Value> {
168        HashMap::new() // Implement stats collection if needed
169    }
170
171    fn as_any(&self) -> &dyn Any {
172        self
173    }
174
175    fn as_any_mut(&mut self) -> &mut dyn Any {
176        self
177    }
178
179    fn into_any(self: Box<Self>) -> Box<dyn Any> {
180        self
181    }
182}
183
184#[async_trait]
185impl EventResponsePlugin for BaseEventResponsePlugin {
186    fn config(&self) -> &EventResponseConfig {
187        panic!("config() should be implemented by concrete plugin");
188    }
189
190    async fn update_config(&mut self, config: EventResponseConfig) -> PluginResult<()> {
191        let mut current_config = self.config.write().await;
192        *current_config = config;
193        Ok(())
194    }
195
196    fn can_handle(&self, event: &Event) -> bool {
197        // Use cached handled_event_types for synchronous access
198        self.handled_event_types.contains(&event.event_type)
199    }
200
201    async fn handle_event(&mut self, mut event: Event) -> PluginResult<Event> {
202        // Execute the response workflow
203        let workflow_result = self.execute_workflow(&event).await?;
204
205        // Update event status based on workflow result
206        event.update_status(EventStatus::Resolved);
207
208        // Add workflow result to event data
209        event.data["workflow_result"] = serde_json::json!(workflow_result);
210
211        Ok(event)
212    }
213
214    async fn execute_workflow(&self, event: &Event) -> PluginResult<HashMap<String, String>> {
215        // Default workflow implementation - to be overridden by concrete plugins
216        let mut result = HashMap::new();
217        result.insert("status".to_string(), "handled".to_string());
218        result.insert(
219            "message".to_string(),
220            format!("Event handled by default workflow: {:?}", event.event_type),
221        );
222        Ok(result)
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use crate::secretary::monitoring::event::*;

    /// A freshly built plugin reports the given id and name and starts unloaded.
    #[test]
    fn test_base_plugin_creation() {
        let handled = vec![EventType::ServerFault];
        let steps = vec!["step1".to_string(), "step2".to_string()];
        let plugin = BaseEventResponsePlugin::new("test-plugin", "Test Plugin", handled, steps);

        let meta = plugin.metadata();
        assert_eq!(meta.id, "test-plugin");
        assert_eq!(meta.name, "Test Plugin");
        assert_eq!(plugin.state(), PluginState::Unloaded);
    }

    /// Only events whose type was registered at construction are accepted.
    #[tokio::test]
    async fn test_can_handle_event() {
        let handled = vec![EventType::ServerFault, EventType::ServiceException];
        let plugin = BaseEventResponsePlugin::new("test-plugin", "Test Plugin", handled, vec![]);

        // Registered type: must be accepted.
        let accepted = Event::new(
            EventType::ServerFault,
            EventPriority::High,
            ImpactScope::Instance("server-01".to_string()),
            "monitoring".to_string(),
            "Server down".to_string(),
            serde_json::Value::Null,
        );

        // Unregistered type: must be rejected.
        let rejected = Event::new(
            EventType::NetworkAttack,
            EventPriority::Emergency,
            ImpactScope::System,
            "ids".to_string(),
            "DDoS attack".to_string(),
            serde_json::Value::Null,
        );

        assert!(plugin.can_handle(&accepted));
        assert!(!plugin.can_handle(&rejected));
    }
}