use crate::core::service::ServiceError;
use crate::core::skill_manager::SkillDefinition;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, info, warn};
/// Handlers grouped by the event-type key they were registered under
/// (for example `"skill:registered"`). Each key maps to its handlers in
/// registration order.
type EventHandlersMap = HashMap<String, Vec<Arc<dyn EventHandler>>>;
/// Events carried by the [`EventBus`].
///
/// Every variant is cloneable and serde-serializable. When the bus dispatches
/// an event to registered handlers it looks them up by a string key derived
/// from the variant; that key is noted on each variant below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SkillEvent {
/// Published by `EventBus::publish_skill_registered`; carries the skill id
/// and the boxed skill definition. Handler key: `skill:registered`.
SkillRegistered {
skill_id: String,
skill: Box<SkillDefinition>,
},
/// Published by `EventBus::publish_skill_updated`; carries the skill id and
/// a [`SkillUpdate`]. Handler key: `skill:updated`.
SkillUpdated {
skill_id: String,
changes: SkillUpdate,
},
/// Published by `EventBus::publish_skill_unregistered`.
/// Handler key: `skill:unregistered`.
SkillUnregistered { skill_id: String },
/// Published by `EventBus::publish_skill_reloaded`; `success` tells whether
/// the reload worked and `error_message` optionally carries failure detail.
/// Handler key: `skill:reloaded`.
SkillReloaded {
skill_id: String,
success: bool,
error_message: Option<String>,
},
/// Published by `EventBus::publish_skill_validation_failed`; `errors` holds
/// the validation messages. Handler key: `skill:validation:failed`.
SkillValidationFailed {
skill_id: String,
errors: Vec<String>,
},
/// Published by `EventBus::publish_hot_reload_enabled` with the
/// [`HotReloadConfig`] in effect. Handler key: `hot-reload:enabled`.
HotReloadEnabled { config: HotReloadConfig },
/// Published by `EventBus::publish_hot_reload_disabled`.
/// Handler key: `hot-reload:disabled`.
HotReloadDisabled,
/// Published by `EventBus::publish_skill_enabled`.
/// Handler key: `skill:enabled`.
SkillEnabled { skill_id: String },
/// Published by `EventBus::publish_skill_disabled`.
/// Handler key: `skill:disabled`.
SkillDisabled { skill_id: String },
/// Free-form event. The handler key is the `event_type` string itself, and
/// `data` is an arbitrary JSON payload.
Custom {
event_type: String,
data: serde_json::Value,
},
}
/// Payload of [`SkillEvent::SkillUpdated`]: a set of optional skill attributes.
///
/// NOTE(review): presumably `None` means "field not changed" and `Some(v)`
/// means "changed to `v`" — nothing in this file reads these fields, so
/// confirm against the producer/consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SkillUpdate {
pub name: Option<String>,
pub description: Option<String>,
pub version: Option<String>,
pub enabled: Option<bool>,
}
/// Configuration attached to [`SkillEvent::HotReloadEnabled`].
///
/// This file only transports the value (and logs `watch_paths.len()`); the
/// field meanings below are inferred from their names — TODO confirm against
/// the hot-reload implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HotReloadConfig {
// Presumably the filesystem paths being watched.
pub watch_paths: Vec<String>,
// Presumably a debounce interval in milliseconds.
pub debounce_ms: u64,
// Presumably whether detected changes trigger a reload automatically.
pub auto_reload: bool,
// Presumably an upper bound on reloads running at the same time.
pub max_concurrent_reloads: usize,
}
/// Asynchronous receiver of [`SkillEvent`]s.
///
/// Implementations are stored as `Arc<dyn EventHandler>` inside the
/// [`EventBus`], hence the `Send + Sync` bound.
#[async_trait]
pub trait EventHandler: Send + Sync {
/// Handles a single event (passed by value; the bus hands each handler its
/// own clone).
///
/// # Errors
/// Returns a `ServiceError` if the handler could not process the event.
async fn handle_event(&self, event: SkillEvent) -> Result<(), ServiceError>;
}
/// Publishes [`SkillEvent`]s to broadcast subscribers and to handlers
/// registered per event type, and keeps a bounded record of published events.
pub struct EventBus {
// Sending half of the tokio broadcast channel; receivers come from `subscribe`.
sender: broadcast::Sender<SkillEvent>,
// Handlers keyed by event-type string.
handlers: Arc<RwLock<EventHandlersMap>>,
// Published events paired with the instant they were recorded.
event_history: Arc<RwLock<Vec<(SkillEvent, std::time::Instant)>>>,
// Maximum number of entries kept in `event_history`.
max_history_size: usize,
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl EventBus {
    /// Creates a bus with no registered handlers and an empty history.
    ///
    /// The broadcast channel is created with a capacity of 1000 and the event
    /// history is capped at 100 entries.
    pub fn new() -> Self {
        let (sender, _) = broadcast::channel(1000);
        Self {
            sender,
            handlers: Arc::new(RwLock::new(HashMap::new())),
            event_history: Arc::new(RwLock::new(Vec::new())),
            max_history_size: 100,
        }
    }

    /// Returns a new receiver on the bus's broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<SkillEvent> {
        self.sender.subscribe()
    }

    /// Registers `handler` under the `event_type` key.
    ///
    /// Several handlers may share a key; they are invoked in registration
    /// order. The key must match the string the bus derives from an event
    /// (e.g. `"skill:registered"`, or the `event_type` of a `Custom` event).
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    pub async fn register_handler<H: EventHandler + 'static>(
        &self,
        event_type: &str,
        handler: H,
    ) -> Result<(), ServiceError> {
        let handler_arc: Arc<dyn EventHandler> = Arc::new(handler);
        self.handlers
            .write()
            .await
            .entry(event_type.to_string())
            .or_default()
            .push(handler_arc);
        info!("Registered event handler for event type: {}", event_type);
        Ok(())
    }

    /// Removes **all** handlers registered under `event_type`.
    ///
    /// NOTE: `_handler_id` is ignored. Stored handlers carry no identifier, so
    /// a single handler cannot be singled out; the whole list for the key is
    /// cleared (the key itself stays in the map with an empty list). Unknown
    /// keys are a no-op.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    pub async fn unregister_handler(
        &self,
        event_type: &str,
        _handler_id: &str,
    ) -> Result<(), ServiceError> {
        let mut handlers = self.handlers.write().await;
        if let Some(handler_list) = handlers.get_mut(event_type) {
            handler_list.clear();
            info!("Unregistered all handlers for event type: {}", event_type);
        }
        Ok(())
    }

    /// Records `event` in the history, broadcasts it to subscribers, then
    /// awaits every handler registered for its event type.
    ///
    /// Returns the number of broadcast receivers the event was sent to, or 0
    /// when there are none (a send with no receivers is not treated as an
    /// error). Handler failures are logged and do not affect the result.
    ///
    /// # Errors
    /// Currently always returns `Ok(_)`.
    pub async fn publish_event(&self, event: SkillEvent) -> Result<usize, ServiceError> {
        {
            let mut history = self.event_history.write().await;
            history.push((event.clone(), std::time::Instant::now()));
            // Keep the most recent entries: evict from the front (oldest).
            // Truncating the tail would discard the event that was just
            // pushed and freeze the history at its first entries.
            if history.len() > self.max_history_size {
                let excess = history.len() - self.max_history_size;
                history.drain(..excess);
            }
        }
        let subscriber_count = self.sender.send(event.clone()).unwrap_or(0);
        self.notify_handlers(&event).await;
        debug!(
            "Published event: subscriber_count={}, handlers_notified",
            subscriber_count
        );
        Ok(subscriber_count)
    }

    /// Invokes, one after another, every handler registered for `event`'s
    /// type key. Each handler receives its own clone of the event; an `Err`
    /// from a handler is logged at `warn` and dispatch continues.
    async fn notify_handlers(&self, event: &SkillEvent) {
        let key: &str = match event {
            SkillEvent::SkillRegistered { .. } => "skill:registered",
            SkillEvent::SkillUpdated { .. } => "skill:updated",
            SkillEvent::SkillUnregistered { .. } => "skill:unregistered",
            SkillEvent::SkillReloaded { .. } => "skill:reloaded",
            SkillEvent::SkillValidationFailed { .. } => "skill:validation:failed",
            SkillEvent::HotReloadEnabled { .. } => "hot-reload:enabled",
            SkillEvent::HotReloadDisabled => "hot-reload:disabled",
            SkillEvent::SkillEnabled { .. } => "skill:enabled",
            SkillEvent::SkillDisabled { .. } => "skill:disabled",
            SkillEvent::Custom { event_type, .. } => event_type.as_str(),
        };

        // Snapshot the handler list and release the read lock before awaiting
        // any handler. Holding the guard across `handle_event` would let a
        // handler that registers/unregisters handlers (or merely reads the
        // registry while a writer is queued) deadlock the bus.
        let targets: Vec<Arc<dyn EventHandler>> = {
            let handlers = self.handlers.read().await;
            match handlers.get(key) {
                Some(list) => list.clone(),
                None => return,
            }
        };

        for handler in &targets {
            match handler.handle_event(event.clone()).await {
                Ok(_) => {
                    debug!("Event handler processed event successfully");
                }
                Err(e) => {
                    warn!("Event handler failed to process event: {}", e);
                }
            }
        }
    }

    /// Returns a copy of the recorded events, oldest first, each paired with
    /// the instant it was recorded.
    pub async fn get_event_history(&self) -> Vec<(SkillEvent, std::time::Instant)> {
        self.event_history.read().await.clone()
    }

    /// Removes every entry from the event history.
    pub async fn clear_event_history(&self) {
        self.event_history.write().await.clear();
    }

    /// Returns, for each event-type key, how many handlers are registered
    /// under it (keys whose handlers were cleared report 0).
    pub async fn get_registered_handlers(&self) -> HashMap<String, usize> {
        let handlers = self.handlers.read().await;
        handlers.iter().map(|(k, v)| (k.clone(), v.len())).collect()
    }
}
/// Stateless handler type; its `EventHandler` impl reports events through
/// `tracing`.
pub struct LoggingEventHandler;

impl LoggingEventHandler {
    /// Creates the handler. It holds no data, so this is free.
    pub fn new() -> Self {
        LoggingEventHandler
    }
}

impl Default for LoggingEventHandler {
    /// Same as [`LoggingEventHandler::new`].
    fn default() -> Self {
        LoggingEventHandler::new()
    }
}
#[async_trait]
impl EventHandler for LoggingEventHandler {
    /// Emits one `tracing` record describing `event` and returns `Ok(())`.
    ///
    /// Failed reloads and validation failures are logged at `warn`, `Custom`
    /// events at `debug`, and everything else at `info`.
    async fn handle_event(&self, event: SkillEvent) -> Result<(), ServiceError> {
        match &event {
            SkillEvent::SkillRegistered { skill_id, skill } => {
                info!("[OK] Skill registered: {} ({})", skill.name, skill_id)
            }
            SkillEvent::SkillUpdated { skill_id, .. } => info!("Skill updated: {}", skill_id),
            SkillEvent::SkillUnregistered { skill_id } => {
                info!("Skill unregistered: {}", skill_id)
            }
            // A reload is split by outcome directly in the pattern.
            SkillEvent::SkillReloaded {
                skill_id,
                success: true,
                ..
            } => info!("Skill reloaded successfully: {}", skill_id),
            SkillEvent::SkillReloaded {
                skill_id,
                success: false,
                error_message,
            } => warn!(
                "[ERROR] Skill reload failed: {} - {:?}",
                skill_id, error_message
            ),
            SkillEvent::SkillValidationFailed { skill_id, errors } => {
                let error_count = errors.len();
                warn!(
                    "[ERROR] Skill validation failed: {} - {} errors",
                    skill_id, error_count
                )
            }
            SkillEvent::HotReloadEnabled { config } => {
                let path_count = config.watch_paths.len();
                info!("[INFO] Hot reload enabled for {} paths", path_count)
            }
            SkillEvent::HotReloadDisabled => info!("Hot reload disabled"),
            SkillEvent::SkillEnabled { skill_id } => info!("[OK] Skill enabled: {}", skill_id),
            SkillEvent::SkillDisabled { skill_id } => info!("Skill disabled: {}", skill_id),
            SkillEvent::Custom { event_type, data } => {
                debug!("Custom event: {} - {:?}", event_type, data)
            }
        }
        Ok(())
    }
}
/// Event handler that counts how many events of each type it has handled.
pub struct MetricsEventHandler {
// Number of handled events per event-type key (e.g. "skill:registered").
event_counts: Arc<RwLock<HashMap<String, usize>>>,
}
impl Default for MetricsEventHandler {
fn default() -> Self {
Self::new()
}
}
impl MetricsEventHandler {
    /// Creates a handler whose counters are all absent (nothing counted yet).
    pub fn new() -> Self {
        let event_counts = Arc::new(RwLock::new(HashMap::new()));
        Self { event_counts }
    }

    /// Returns a copy of the current per-event-type counts.
    pub async fn get_event_counts(&self) -> HashMap<String, usize> {
        let counts = self.event_counts.read().await;
        (*counts).clone()
    }
}
#[async_trait]
impl EventHandler for MetricsEventHandler {
    /// Increments the counter for `event`'s type key and returns `Ok(())`.
    ///
    /// Built-in variants use fixed keys such as `skill:registered`; a `Custom`
    /// event is counted under its own `event_type` string.
    async fn handle_event(&self, event: SkillEvent) -> Result<(), ServiceError> {
        let key: &str = match &event {
            SkillEvent::SkillRegistered { .. } => "skill:registered",
            SkillEvent::SkillUpdated { .. } => "skill:updated",
            SkillEvent::SkillUnregistered { .. } => "skill:unregistered",
            SkillEvent::SkillReloaded { .. } => "skill:reloaded",
            SkillEvent::SkillValidationFailed { .. } => "skill:validation:failed",
            SkillEvent::HotReloadEnabled { .. } => "hot-reload:enabled",
            SkillEvent::HotReloadDisabled => "hot-reload:disabled",
            SkillEvent::SkillEnabled { .. } => "skill:enabled",
            SkillEvent::SkillDisabled { .. } => "skill:disabled",
            SkillEvent::Custom { event_type, .. } => event_type.as_str(),
        };
        let owned_key = key.to_owned();

        let mut tally = self.event_counts.write().await;
        // A missing key starts from 0 before being bumped.
        *tally.entry(owned_key).or_default() += 1;
        Ok(())
    }
}
/// Convenience publishers: each builds one [`SkillEvent`] variant and forwards
/// it to [`EventBus::publish_event`], returning that call's result unchanged.
impl EventBus {
    /// Publishes `SkillEvent::SkillRegistered`, boxing `skill`.
    pub async fn publish_skill_registered(
        &self,
        skill_id: String,
        skill: SkillDefinition,
    ) -> Result<usize, ServiceError> {
        let event = SkillEvent::SkillRegistered {
            skill_id,
            skill: Box::new(skill),
        };
        self.publish_event(event).await
    }

    /// Publishes `SkillEvent::SkillUpdated` with the given changes.
    pub async fn publish_skill_updated(
        &self,
        skill_id: String,
        changes: SkillUpdate,
    ) -> Result<usize, ServiceError> {
        let event = SkillEvent::SkillUpdated { skill_id, changes };
        self.publish_event(event).await
    }

    /// Publishes `SkillEvent::SkillUnregistered`.
    pub async fn publish_skill_unregistered(
        &self,
        skill_id: String,
    ) -> Result<usize, ServiceError> {
        let event = SkillEvent::SkillUnregistered { skill_id };
        self.publish_event(event).await
    }

    /// Publishes `SkillEvent::SkillReloaded` with the reload outcome and an
    /// optional error message.
    pub async fn publish_skill_reloaded(
        &self,
        skill_id: String,
        success: bool,
        error_message: Option<String>,
    ) -> Result<usize, ServiceError> {
        let event = SkillEvent::SkillReloaded {
            skill_id,
            success,
            error_message,
        };
        self.publish_event(event).await
    }

    /// Publishes `SkillEvent::SkillValidationFailed` with the error list.
    pub async fn publish_skill_validation_failed(
        &self,
        skill_id: String,
        errors: Vec<String>,
    ) -> Result<usize, ServiceError> {
        let event = SkillEvent::SkillValidationFailed { skill_id, errors };
        self.publish_event(event).await
    }

    /// Publishes `SkillEvent::HotReloadEnabled` carrying `config`.
    pub async fn publish_hot_reload_enabled(
        &self,
        config: HotReloadConfig,
    ) -> Result<usize, ServiceError> {
        let event = SkillEvent::HotReloadEnabled { config };
        self.publish_event(event).await
    }

    /// Publishes `SkillEvent::HotReloadDisabled`.
    pub async fn publish_hot_reload_disabled(&self) -> Result<usize, ServiceError> {
        let event = SkillEvent::HotReloadDisabled;
        self.publish_event(event).await
    }

    /// Publishes `SkillEvent::SkillEnabled`.
    pub async fn publish_skill_enabled(&self, skill_id: String) -> Result<usize, ServiceError> {
        let event = SkillEvent::SkillEnabled { skill_id };
        self.publish_event(event).await
    }

    /// Publishes `SkillEvent::SkillDisabled`.
    pub async fn publish_skill_disabled(&self, skill_id: String) -> Result<usize, ServiceError> {
        let event = SkillEvent::SkillDisabled { skill_id };
        self.publish_event(event).await
    }
}