1use crate::core::service::ServiceError;
4use crate::core::skill_manager::SkillDefinition;
5use async_trait::async_trait;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::{broadcast, RwLock};
10use tracing::{debug, info, warn};
11
12type EventHandlersMap = HashMap<String, Vec<Arc<dyn EventHandler>>>;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub enum SkillEvent {
18 SkillRegistered {
20 skill_id: String,
21 skill: Box<SkillDefinition>,
22 },
23
24 SkillUpdated {
26 skill_id: String,
27 changes: SkillUpdate,
28 },
29
30 SkillUnregistered { skill_id: String },
32
33 SkillReloaded {
35 skill_id: String,
36 success: bool,
37 error_message: Option<String>,
38 },
39
40 SkillValidationFailed {
42 skill_id: String,
43 errors: Vec<String>,
44 },
45
46 HotReloadEnabled { config: HotReloadConfig },
48
49 HotReloadDisabled,
51
52 SkillEnabled { skill_id: String },
54
55 SkillDisabled { skill_id: String },
57
58 Custom {
60 event_type: String,
61 data: serde_json::Value,
62 },
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct SkillUpdate {
68 pub name: Option<String>,
69 pub description: Option<String>,
70 pub version: Option<String>,
71 pub enabled: Option<bool>,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct HotReloadConfig {
77 pub watch_paths: Vec<String>,
78 pub debounce_ms: u64,
79 pub auto_reload: bool,
80 pub max_concurrent_reloads: usize,
81}
82
83#[async_trait]
85pub trait EventHandler: Send + Sync {
86 async fn handle_event(&self, event: SkillEvent) -> Result<(), ServiceError>;
87}
88
89pub struct EventBus {
91 sender: broadcast::Sender<SkillEvent>,
93
94 handlers: Arc<RwLock<EventHandlersMap>>,
96
97 event_history: Arc<RwLock<Vec<(SkillEvent, std::time::Instant)>>>,
99
100 max_history_size: usize,
102}
103
104impl Default for EventBus {
105 fn default() -> Self {
106 Self::new()
107 }
108}
109
110impl EventBus {
111 pub fn new() -> Self {
113 let (sender, _) = broadcast::channel(1000); Self {
116 sender,
117 handlers: Arc::new(RwLock::new(HashMap::new())),
118 event_history: Arc::new(RwLock::new(Vec::new())),
119 max_history_size: 100,
120 }
121 }
122
123 pub fn subscribe(&self) -> broadcast::Receiver<SkillEvent> {
125 self.sender.subscribe()
126 }
127
128 pub async fn register_handler<H: EventHandler + 'static>(
130 &self,
131 event_type: &str,
132 handler: H,
133 ) -> Result<(), ServiceError> {
134 let mut handlers = self.handlers.write().await;
135
136 let handler_arc = Arc::new(handler) as Arc<dyn EventHandler>;
137
138 handlers
139 .entry(event_type.to_string())
140 .or_insert_with(Vec::new)
141 .push(handler_arc);
142
143 info!("Registered event handler for event type: {}", event_type);
144
145 Ok(())
146 }
147
148 pub async fn unregister_handler(
150 &self,
151 event_type: &str,
152 _handler_id: &str,
153 ) -> Result<(), ServiceError> {
154 let mut handlers = self.handlers.write().await;
155
156 if let Some(handler_list) = handlers.get_mut(event_type) {
157 handler_list.clear();
160 info!("Unregistered all handlers for event type: {}", event_type);
161 }
162
163 Ok(())
164 }
165
166 pub async fn publish_event(&self, event: SkillEvent) -> Result<usize, ServiceError> {
168 {
170 let mut history = self.event_history.write().await;
171
172 history.push((event.clone(), std::time::Instant::now()));
173
174 if history.len() > self.max_history_size {
176 history.truncate(self.max_history_size);
177 }
178 }
179
180 let subscriber_count = self.sender.send(event.clone()).unwrap_or(0);
182
183 self.notify_handlers(&event).await;
185
186 debug!(
187 "Published event: subscriber_count={}, handlers_notified",
188 subscriber_count
189 );
190
191 Ok(subscriber_count)
192 }
193
194 async fn notify_handlers(&self, event: &SkillEvent) {
196 let handlers = self.handlers.read().await;
197
198 let event_type = match event {
200 SkillEvent::SkillRegistered { .. } => "skill:registered",
201 SkillEvent::SkillUpdated { .. } => "skill:updated",
202 SkillEvent::SkillUnregistered { .. } => "skill:unregistered",
203 SkillEvent::SkillReloaded { .. } => "skill:reloaded",
204 SkillEvent::SkillValidationFailed { .. } => "skill:validation:failed",
205 SkillEvent::HotReloadEnabled { .. } => "hot-reload:enabled",
206 SkillEvent::HotReloadDisabled => "hot-reload:disabled",
207 SkillEvent::SkillEnabled { .. } => "skill:enabled",
208 SkillEvent::SkillDisabled { .. } => "skill:disabled",
209 SkillEvent::Custom { event_type, .. } => event_type.as_str(),
210 }
211 .to_string();
212
213 if let Some(event_handlers) = handlers.get(&event_type) {
214 for handler in event_handlers {
215 match handler.handle_event(event.clone()).await {
216 Ok(_) => {
217 debug!("Event handler processed event successfully");
218 }
219 Err(e) => {
220 warn!("Event handler failed to process event: {}", e);
221 }
222 }
223 }
224 }
225 }
226
227 pub async fn get_event_history(&self) -> Vec<(SkillEvent, std::time::Instant)> {
229 self.event_history.read().await.clone()
230 }
231
232 pub async fn clear_event_history(&self) {
234 self.event_history.write().await.clear();
235 }
236
237 pub async fn get_registered_handlers(&self) -> HashMap<String, usize> {
239 let handlers = self.handlers.read().await;
240 handlers.iter().map(|(k, v)| (k.clone(), v.len())).collect()
241 }
242}
243
244pub struct LoggingEventHandler;
247
248impl Default for LoggingEventHandler {
249 fn default() -> Self {
250 Self::new()
251 }
252}
253
254impl LoggingEventHandler {
255 pub fn new() -> Self {
256 Self
257 }
258}
259
260#[async_trait]
261impl EventHandler for LoggingEventHandler {
262 async fn handle_event(&self, event: SkillEvent) -> Result<(), ServiceError> {
263 match event {
264 SkillEvent::SkillRegistered { skill_id, skill } => {
265 info!("[OK] Skill registered: {} ({})", skill.name, skill_id);
266 }
267 SkillEvent::SkillUpdated { skill_id, .. } => {
268 info!("Skill updated: {}", skill_id);
269 }
270 SkillEvent::SkillUnregistered { skill_id } => {
271 info!("Skill unregistered: {}", skill_id);
272 }
273 SkillEvent::SkillReloaded {
274 skill_id,
275 success,
276 error_message,
277 } => {
278 if success {
279 info!("Skill reloaded successfully: {}", skill_id);
280 } else {
281 warn!(
282 "[ERROR] Skill reload failed: {} - {:?}",
283 skill_id, error_message
284 );
285 }
286 }
287 SkillEvent::SkillValidationFailed { skill_id, errors } => {
288 warn!(
289 "[ERROR] Skill validation failed: {} - {} errors",
290 skill_id,
291 errors.len()
292 );
293 }
294 SkillEvent::HotReloadEnabled { config } => {
295 info!(
296 "[INFO] Hot reload enabled for {} paths",
297 config.watch_paths.len()
298 );
299 }
300 SkillEvent::HotReloadDisabled => {
301 info!("Hot reload disabled");
302 }
303 SkillEvent::SkillEnabled { skill_id } => {
304 info!("[OK] Skill enabled: {}", skill_id);
305 }
306 SkillEvent::SkillDisabled { skill_id } => {
307 info!("Skill disabled: {}", skill_id);
308 }
309 SkillEvent::Custom { event_type, data } => {
310 debug!("Custom event: {} - {:?}", event_type, data);
311 }
312 }
313
314 Ok(())
315 }
316}
317
318pub struct MetricsEventHandler {
320 event_counts: Arc<RwLock<HashMap<String, usize>>>,
321}
322
323impl Default for MetricsEventHandler {
324 fn default() -> Self {
325 Self::new()
326 }
327}
328
329impl MetricsEventHandler {
330 pub fn new() -> Self {
331 Self {
332 event_counts: Arc::new(RwLock::new(HashMap::new())),
333 }
334 }
335
336 pub async fn get_event_counts(&self) -> HashMap<String, usize> {
338 self.event_counts.read().await.clone()
339 }
340}
341
342#[async_trait]
343impl EventHandler for MetricsEventHandler {
344 async fn handle_event(&self, event: SkillEvent) -> Result<(), ServiceError> {
345 let event_type = match event {
346 SkillEvent::SkillRegistered { .. } => "skill:registered".to_string(),
347 SkillEvent::SkillUpdated { .. } => "skill:updated".to_string(),
348 SkillEvent::SkillUnregistered { .. } => "skill:unregistered".to_string(),
349 SkillEvent::SkillReloaded { .. } => "skill:reloaded".to_string(),
350 SkillEvent::SkillValidationFailed { .. } => "skill:validation:failed".to_string(),
351 SkillEvent::HotReloadEnabled { .. } => "hot-reload:enabled".to_string(),
352 SkillEvent::HotReloadDisabled => "hot-reload:disabled".to_string(),
353 SkillEvent::SkillEnabled { .. } => "skill:enabled".to_string(),
354 SkillEvent::SkillDisabled { .. } => "skill:disabled".to_string(),
355 SkillEvent::Custom { event_type, .. } => event_type.clone(),
356 };
357
358 let mut counts = self.event_counts.write().await;
359 *counts.entry(event_type).or_insert(0) += 1;
360
361 Ok(())
362 }
363}
364
365impl EventBus {
367 pub async fn publish_skill_registered(
369 &self,
370 skill_id: String,
371 skill: SkillDefinition,
372 ) -> Result<usize, ServiceError> {
373 self.publish_event(SkillEvent::SkillRegistered {
374 skill_id,
375 skill: Box::new(skill),
376 })
377 .await
378 }
379
380 pub async fn publish_skill_updated(
382 &self,
383 skill_id: String,
384 changes: SkillUpdate,
385 ) -> Result<usize, ServiceError> {
386 self.publish_event(SkillEvent::SkillUpdated { skill_id, changes })
387 .await
388 }
389
390 pub async fn publish_skill_unregistered(
392 &self,
393 skill_id: String,
394 ) -> Result<usize, ServiceError> {
395 self.publish_event(SkillEvent::SkillUnregistered { skill_id })
396 .await
397 }
398
399 pub async fn publish_skill_reloaded(
401 &self,
402 skill_id: String,
403 success: bool,
404 error_message: Option<String>,
405 ) -> Result<usize, ServiceError> {
406 self.publish_event(SkillEvent::SkillReloaded {
407 skill_id,
408 success,
409 error_message,
410 })
411 .await
412 }
413
414 pub async fn publish_skill_validation_failed(
416 &self,
417 skill_id: String,
418 errors: Vec<String>,
419 ) -> Result<usize, ServiceError> {
420 self.publish_event(SkillEvent::SkillValidationFailed { skill_id, errors })
421 .await
422 }
423
424 pub async fn publish_hot_reload_enabled(
426 &self,
427 config: HotReloadConfig,
428 ) -> Result<usize, ServiceError> {
429 self.publish_event(SkillEvent::HotReloadEnabled { config })
430 .await
431 }
432
433 pub async fn publish_hot_reload_disabled(&self) -> Result<usize, ServiceError> {
435 self.publish_event(SkillEvent::HotReloadDisabled).await
436 }
437
438 pub async fn publish_skill_enabled(&self, skill_id: String) -> Result<usize, ServiceError> {
440 self.publish_event(SkillEvent::SkillEnabled { skill_id })
441 .await
442 }
443
444 pub async fn publish_skill_disabled(&self, skill_id: String) -> Result<usize, ServiceError> {
446 self.publish_event(SkillEvent::SkillDisabled { skill_id })
447 .await
448 }
449}