Skip to main content

fastskill_core/events/
event_bus.rs

1//! Event bus for skill lifecycle events
2
3use crate::core::service::ServiceError;
4use crate::core::skill_manager::SkillDefinition;
5use async_trait::async_trait;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::{broadcast, RwLock};
10use tracing::{debug, info, warn};
11
12/// Type alias for event handlers map to reduce complexity
13type EventHandlersMap = HashMap<String, Vec<Arc<dyn EventHandler>>>;
14
15/// Skill event types
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub enum SkillEvent {
18    /// Skill registered
19    SkillRegistered {
20        skill_id: String,
21        skill: Box<SkillDefinition>,
22    },
23
24    /// Skill updated
25    SkillUpdated {
26        skill_id: String,
27        changes: SkillUpdate,
28    },
29
30    /// Skill unregistered
31    SkillUnregistered { skill_id: String },
32
33    /// Skill reloaded
34    SkillReloaded {
35        skill_id: String,
36        success: bool,
37        error_message: Option<String>,
38    },
39
40    /// Skill validation failed
41    SkillValidationFailed {
42        skill_id: String,
43        errors: Vec<String>,
44    },
45
46    /// Hot reload enabled
47    HotReloadEnabled { config: HotReloadConfig },
48
49    /// Hot reload disabled
50    HotReloadDisabled,
51
52    /// Skill enabled
53    SkillEnabled { skill_id: String },
54
55    /// Skill disabled
56    SkillDisabled { skill_id: String },
57
58    /// Custom event
59    Custom {
60        event_type: String,
61        data: serde_json::Value,
62    },
63}
64
65/// Skill update information
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct SkillUpdate {
68    pub name: Option<String>,
69    pub description: Option<String>,
70    pub version: Option<String>,
71    pub enabled: Option<bool>,
72}
73
74/// Hot reload configuration
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct HotReloadConfig {
77    pub watch_paths: Vec<String>,
78    pub debounce_ms: u64,
79    pub auto_reload: bool,
80    pub max_concurrent_reloads: usize,
81}
82
83/// Event handler trait
84#[async_trait]
85pub trait EventHandler: Send + Sync {
86    async fn handle_event(&self, event: SkillEvent) -> Result<(), ServiceError>;
87}
88
89/// Event bus for managing skill lifecycle events
90pub struct EventBus {
91    /// Broadcast sender for events
92    sender: broadcast::Sender<SkillEvent>,
93
94    /// Event handlers registry
95    handlers: Arc<RwLock<EventHandlersMap>>,
96
97    /// Event history for debugging
98    event_history: Arc<RwLock<Vec<(SkillEvent, std::time::Instant)>>>,
99
100    /// Maximum history size
101    max_history_size: usize,
102}
103
104impl Default for EventBus {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110impl EventBus {
111    /// Create a new event bus
112    pub fn new() -> Self {
113        let (sender, _) = broadcast::channel(1000); // Buffer for 1000 events
114
115        Self {
116            sender,
117            handlers: Arc::new(RwLock::new(HashMap::new())),
118            event_history: Arc::new(RwLock::new(Vec::new())),
119            max_history_size: 100,
120        }
121    }
122
123    /// Subscribe to events
124    pub fn subscribe(&self) -> broadcast::Receiver<SkillEvent> {
125        self.sender.subscribe()
126    }
127
128    /// Register an event handler for a specific event type
129    pub async fn register_handler<H: EventHandler + 'static>(
130        &self,
131        event_type: &str,
132        handler: H,
133    ) -> Result<(), ServiceError> {
134        let mut handlers = self.handlers.write().await;
135
136        let handler_arc = Arc::new(handler) as Arc<dyn EventHandler>;
137
138        handlers
139            .entry(event_type.to_string())
140            .or_insert_with(Vec::new)
141            .push(handler_arc);
142
143        info!("Registered event handler for event type: {}", event_type);
144
145        Ok(())
146    }
147
148    /// Unregister an event handler
149    pub async fn unregister_handler(
150        &self,
151        event_type: &str,
152        _handler_id: &str,
153    ) -> Result<(), ServiceError> {
154        let mut handlers = self.handlers.write().await;
155
156        if let Some(handler_list) = handlers.get_mut(event_type) {
157            // For now, just remove all handlers with this event type
158            // In a more sophisticated implementation, we'd track handler IDs
159            handler_list.clear();
160            info!("Unregistered all handlers for event type: {}", event_type);
161        }
162
163        Ok(())
164    }
165
166    /// Publish an event
167    pub async fn publish_event(&self, event: SkillEvent) -> Result<usize, ServiceError> {
168        // Add to history
169        {
170            let mut history = self.event_history.write().await;
171
172            history.push((event.clone(), std::time::Instant::now()));
173
174            // Trim history if too large
175            if history.len() > self.max_history_size {
176                history.truncate(self.max_history_size);
177            }
178        }
179
180        // Send to all subscribers
181        let subscriber_count = self.sender.send(event.clone()).unwrap_or(0);
182
183        // Notify registered handlers
184        self.notify_handlers(&event).await;
185
186        debug!(
187            "Published event: subscriber_count={}, handlers_notified",
188            subscriber_count
189        );
190
191        Ok(subscriber_count)
192    }
193
194    /// Notify registered handlers about an event
195    async fn notify_handlers(&self, event: &SkillEvent) {
196        let handlers = self.handlers.read().await;
197
198        // Determine event type for handler lookup
199        let event_type = match event {
200            SkillEvent::SkillRegistered { .. } => "skill:registered",
201            SkillEvent::SkillUpdated { .. } => "skill:updated",
202            SkillEvent::SkillUnregistered { .. } => "skill:unregistered",
203            SkillEvent::SkillReloaded { .. } => "skill:reloaded",
204            SkillEvent::SkillValidationFailed { .. } => "skill:validation:failed",
205            SkillEvent::HotReloadEnabled { .. } => "hot-reload:enabled",
206            SkillEvent::HotReloadDisabled => "hot-reload:disabled",
207            SkillEvent::SkillEnabled { .. } => "skill:enabled",
208            SkillEvent::SkillDisabled { .. } => "skill:disabled",
209            SkillEvent::Custom { event_type, .. } => event_type.as_str(),
210        }
211        .to_string();
212
213        if let Some(event_handlers) = handlers.get(&event_type) {
214            for handler in event_handlers {
215                match handler.handle_event(event.clone()).await {
216                    Ok(_) => {
217                        debug!("Event handler processed event successfully");
218                    }
219                    Err(e) => {
220                        warn!("Event handler failed to process event: {}", e);
221                    }
222                }
223            }
224        }
225    }
226
227    /// Get event history
228    pub async fn get_event_history(&self) -> Vec<(SkillEvent, std::time::Instant)> {
229        self.event_history.read().await.clone()
230    }
231
232    /// Clear event history
233    pub async fn clear_event_history(&self) {
234        self.event_history.write().await.clear();
235    }
236
237    /// Get registered event handlers
238    pub async fn get_registered_handlers(&self) -> HashMap<String, usize> {
239        let handlers = self.handlers.read().await;
240        handlers.iter().map(|(k, v)| (k.clone(), v.len())).collect()
241    }
242}
243
244/// Default event handler implementations
245/// Logging event handler - logs all events
246pub struct LoggingEventHandler;
247
248impl Default for LoggingEventHandler {
249    fn default() -> Self {
250        Self::new()
251    }
252}
253
254impl LoggingEventHandler {
255    pub fn new() -> Self {
256        Self
257    }
258}
259
260#[async_trait]
261impl EventHandler for LoggingEventHandler {
262    async fn handle_event(&self, event: SkillEvent) -> Result<(), ServiceError> {
263        match event {
264            SkillEvent::SkillRegistered { skill_id, skill } => {
265                info!("[OK] Skill registered: {} ({})", skill.name, skill_id);
266            }
267            SkillEvent::SkillUpdated { skill_id, .. } => {
268                info!("Skill updated: {}", skill_id);
269            }
270            SkillEvent::SkillUnregistered { skill_id } => {
271                info!("Skill unregistered: {}", skill_id);
272            }
273            SkillEvent::SkillReloaded {
274                skill_id,
275                success,
276                error_message,
277            } => {
278                if success {
279                    info!("Skill reloaded successfully: {}", skill_id);
280                } else {
281                    warn!(
282                        "[ERROR] Skill reload failed: {} - {:?}",
283                        skill_id, error_message
284                    );
285                }
286            }
287            SkillEvent::SkillValidationFailed { skill_id, errors } => {
288                warn!(
289                    "[ERROR] Skill validation failed: {} - {} errors",
290                    skill_id,
291                    errors.len()
292                );
293            }
294            SkillEvent::HotReloadEnabled { config } => {
295                info!(
296                    "[INFO] Hot reload enabled for {} paths",
297                    config.watch_paths.len()
298                );
299            }
300            SkillEvent::HotReloadDisabled => {
301                info!("Hot reload disabled");
302            }
303            SkillEvent::SkillEnabled { skill_id } => {
304                info!("[OK] Skill enabled: {}", skill_id);
305            }
306            SkillEvent::SkillDisabled { skill_id } => {
307                info!("Skill disabled: {}", skill_id);
308            }
309            SkillEvent::Custom { event_type, data } => {
310                debug!("Custom event: {} - {:?}", event_type, data);
311            }
312        }
313
314        Ok(())
315    }
316}
317
318/// Metrics event handler - tracks event statistics
319pub struct MetricsEventHandler {
320    event_counts: Arc<RwLock<HashMap<String, usize>>>,
321}
322
323impl Default for MetricsEventHandler {
324    fn default() -> Self {
325        Self::new()
326    }
327}
328
329impl MetricsEventHandler {
330    pub fn new() -> Self {
331        Self {
332            event_counts: Arc::new(RwLock::new(HashMap::new())),
333        }
334    }
335
336    /// Get event statistics
337    pub async fn get_event_counts(&self) -> HashMap<String, usize> {
338        self.event_counts.read().await.clone()
339    }
340}
341
342#[async_trait]
343impl EventHandler for MetricsEventHandler {
344    async fn handle_event(&self, event: SkillEvent) -> Result<(), ServiceError> {
345        let event_type = match event {
346            SkillEvent::SkillRegistered { .. } => "skill:registered".to_string(),
347            SkillEvent::SkillUpdated { .. } => "skill:updated".to_string(),
348            SkillEvent::SkillUnregistered { .. } => "skill:unregistered".to_string(),
349            SkillEvent::SkillReloaded { .. } => "skill:reloaded".to_string(),
350            SkillEvent::SkillValidationFailed { .. } => "skill:validation:failed".to_string(),
351            SkillEvent::HotReloadEnabled { .. } => "hot-reload:enabled".to_string(),
352            SkillEvent::HotReloadDisabled => "hot-reload:disabled".to_string(),
353            SkillEvent::SkillEnabled { .. } => "skill:enabled".to_string(),
354            SkillEvent::SkillDisabled { .. } => "skill:disabled".to_string(),
355            SkillEvent::Custom { event_type, .. } => event_type.clone(),
356        };
357
358        let mut counts = self.event_counts.write().await;
359        *counts.entry(event_type).or_insert(0) += 1;
360
361        Ok(())
362    }
363}
364
365/// Convenience methods for publishing common events
366impl EventBus {
367    /// Publish skill registered event
368    pub async fn publish_skill_registered(
369        &self,
370        skill_id: String,
371        skill: SkillDefinition,
372    ) -> Result<usize, ServiceError> {
373        self.publish_event(SkillEvent::SkillRegistered {
374            skill_id,
375            skill: Box::new(skill),
376        })
377        .await
378    }
379
380    /// Publish skill updated event
381    pub async fn publish_skill_updated(
382        &self,
383        skill_id: String,
384        changes: SkillUpdate,
385    ) -> Result<usize, ServiceError> {
386        self.publish_event(SkillEvent::SkillUpdated { skill_id, changes })
387            .await
388    }
389
390    /// Publish skill unregistered event
391    pub async fn publish_skill_unregistered(
392        &self,
393        skill_id: String,
394    ) -> Result<usize, ServiceError> {
395        self.publish_event(SkillEvent::SkillUnregistered { skill_id })
396            .await
397    }
398
399    /// Publish skill reloaded event
400    pub async fn publish_skill_reloaded(
401        &self,
402        skill_id: String,
403        success: bool,
404        error_message: Option<String>,
405    ) -> Result<usize, ServiceError> {
406        self.publish_event(SkillEvent::SkillReloaded {
407            skill_id,
408            success,
409            error_message,
410        })
411        .await
412    }
413
414    /// Publish skill validation failed event
415    pub async fn publish_skill_validation_failed(
416        &self,
417        skill_id: String,
418        errors: Vec<String>,
419    ) -> Result<usize, ServiceError> {
420        self.publish_event(SkillEvent::SkillValidationFailed { skill_id, errors })
421            .await
422    }
423
424    /// Publish hot reload enabled event
425    pub async fn publish_hot_reload_enabled(
426        &self,
427        config: HotReloadConfig,
428    ) -> Result<usize, ServiceError> {
429        self.publish_event(SkillEvent::HotReloadEnabled { config })
430            .await
431    }
432
433    /// Publish hot reload disabled event
434    pub async fn publish_hot_reload_disabled(&self) -> Result<usize, ServiceError> {
435        self.publish_event(SkillEvent::HotReloadDisabled).await
436    }
437
438    /// Publish skill enabled event
439    pub async fn publish_skill_enabled(&self, skill_id: String) -> Result<usize, ServiceError> {
440        self.publish_event(SkillEvent::SkillEnabled { skill_id })
441            .await
442    }
443
444    /// Publish skill disabled event
445    pub async fn publish_skill_disabled(&self, skill_id: String) -> Result<usize, ServiceError> {
446        self.publish_event(SkillEvent::SkillDisabled { skill_id })
447            .await
448    }
449}