use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use crate::core::{PluginError, PluginResult};
pub type EventId = u64;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum EventPriority {
Low = 1,
Normal = 2,
High = 3,
Critical = 4,
}
impl Default for EventPriority {
fn default() -> Self {
Self::Normal
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
pub id: EventId,
pub event_type: String,
pub source: Option<String>,
pub target: Option<Vec<String>>,
pub data: serde_json::Value,
pub priority: EventPriority,
pub timestamp: SystemTime,
pub metadata: HashMap<String, String>,
}
impl Event {
pub fn new(event_type: impl Into<String>, data: serde_json::Value) -> Self {
Self {
id: Self::generate_id(),
event_type: event_type.into(),
source: None,
target: None,
data,
priority: EventPriority::default(),
timestamp: SystemTime::now(),
metadata: HashMap::new(),
}
}
pub fn from_plugin(
event_type: impl Into<String>,
source: impl Into<String>,
data: serde_json::Value,
) -> Self {
Self {
id: Self::generate_id(),
event_type: event_type.into(),
source: Some(source.into()),
target: None,
data,
priority: EventPriority::default(),
timestamp: SystemTime::now(),
metadata: HashMap::new(),
}
}
pub fn with_priority(mut self, priority: EventPriority) -> Self {
self.priority = priority;
self
}
pub fn with_target(mut self, target: Vec<String>) -> Self {
self.target = Some(target);
self
}
pub fn with_single_target(mut self, target: impl Into<String>) -> Self {
self.target = Some(vec![target.into()]);
self
}
pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
pub fn is_targeted_to(&self, plugin_name: &str) -> bool {
match &self.target {
None => true, Some(targets) => targets.contains(&plugin_name.to_string()),
}
}
fn generate_id() -> EventId {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_nanos() as EventId
}
}
#[async_trait]
pub trait EventHandler: Send + Sync {
async fn handle_event(&self, event: &Event) -> PluginResult<()>;
fn event_types(&self) -> Vec<String>;
fn priority(&self) -> EventPriority {
EventPriority::Normal
}
}
#[derive(Debug, Clone)]
pub struct HookEvent {
pub plugin_name: String,
pub hook_type: HookType,
pub result: Option<Result<(), PluginError>>,
pub context: serde_json::Value,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HookType {
BeforeInitialize,
AfterInitialize,
BeforeExecute,
AfterExecute,
BeforeCleanup,
AfterCleanup,
OnError,
OnSuccess,
}
impl std::fmt::Display for HookType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HookType::BeforeInitialize => write!(f, "before_initialize"),
HookType::AfterInitialize => write!(f, "after_initialize"),
HookType::BeforeExecute => write!(f, "before_execute"),
HookType::AfterExecute => write!(f, "after_execute"),
HookType::BeforeCleanup => write!(f, "before_cleanup"),
HookType::AfterCleanup => write!(f, "after_cleanup"),
HookType::OnError => write!(f, "on_error"),
HookType::OnSuccess => write!(f, "on_success"),
}
}
}
type EventHandlers = Arc<RwLock<HashMap<String, Vec<Arc<dyn EventHandler>>>>>;
pub struct EventBus {
sender: broadcast::Sender<Event>,
handlers: EventHandlers,
history: Arc<RwLock<Vec<Event>>>,
max_history: usize,
}
impl EventBus {
pub fn new() -> Self {
let (sender, _) = broadcast::channel(1000);
Self {
sender,
handlers: Arc::new(RwLock::new(HashMap::new())),
history: Arc::new(RwLock::new(Vec::new())),
max_history: 1000,
}
}
pub fn with_capacity(capacity: usize, max_history: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self {
sender,
handlers: Arc::new(RwLock::new(HashMap::new())),
history: Arc::new(RwLock::new(Vec::new())),
max_history,
}
}
pub async fn publish(&self, event: Event) -> PluginResult<()> {
{
let mut history = self.history.write().unwrap();
history.push(event.clone());
if history.len() > self.max_history {
history.remove(0);
}
}
self.call_handlers(&event).await?;
if let Err(e) = self.sender.send(event) {
return Err(PluginError::EventError(format!(
"Failed to broadcast event: {e}"
)));
}
Ok(())
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}
pub fn register_handler(&self, handler: Arc<dyn EventHandler>) {
let mut handlers = self.handlers.write().unwrap();
for event_type in handler.event_types() {
handlers
.entry(event_type)
.or_default()
.push(handler.clone());
}
}
pub fn unregister_handlers(&self, event_types: &[String]) {
let mut handlers = self.handlers.write().unwrap();
for event_type in event_types {
handlers.remove(event_type);
}
}
pub fn get_history(&self) -> Vec<Event> {
self.history.read().unwrap().clone()
}
pub fn get_history_by_type(&self, event_type: &str) -> Vec<Event> {
self.history
.read()
.unwrap()
.iter()
.filter(|e| e.event_type == event_type)
.cloned()
.collect()
}
pub fn clear_history(&self) {
self.history.write().unwrap().clear();
}
async fn call_handlers(&self, event: &Event) -> PluginResult<()> {
let handlers = {
let handlers_map = self.handlers.read().unwrap();
handlers_map
.get(&event.event_type)
.cloned()
.unwrap_or_default()
};
let mut sorted_handlers = handlers;
sorted_handlers.sort_by_key(|b| std::cmp::Reverse(b.priority()));
for handler in sorted_handlers {
if let Err(e) = handler.handle_event(event).await {
eprintln!("Event handler error for {}: {}", event.event_type, e);
}
}
Ok(())
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
pub mod system_events {
use super::*;
pub const PLUGIN_REGISTERED: &str = "plugin.registered";
pub const PLUGIN_UNREGISTERED: &str = "plugin.unregistered";
pub const PLUGIN_INITIALIZED: &str = "plugin.initialized";
pub const PLUGIN_EXECUTION_STARTED: &str = "plugin.execution.started";
pub const PLUGIN_EXECUTION_COMPLETED: &str = "plugin.execution.completed";
pub const PLUGIN_EXECUTION_FAILED: &str = "plugin.execution.failed";
pub const PLUGIN_CLEANUP_STARTED: &str = "plugin.cleanup.started";
pub const PLUGIN_CLEANUP_COMPLETED: &str = "plugin.cleanup.completed";
pub const PIPELINE_STARTED: &str = "pipeline.started";
pub const PIPELINE_COMPLETED: &str = "pipeline.completed";
pub const PIPELINE_FAILED: &str = "pipeline.failed";
pub fn plugin_registered(plugin_name: impl Into<String>) -> Event {
Event::new(
PLUGIN_REGISTERED,
serde_json::json!({
"plugin_name": plugin_name.into()
}),
)
.with_priority(EventPriority::Normal)
}
pub fn plugin_execution_started(plugin_name: impl Into<String>) -> Event {
Event::from_plugin(
PLUGIN_EXECUTION_STARTED,
plugin_name.into(),
serde_json::json!({}),
)
.with_priority(EventPriority::Normal)
}
pub fn plugin_execution_completed(plugin_name: impl Into<String>, duration: Duration) -> Event {
Event::from_plugin(
PLUGIN_EXECUTION_COMPLETED,
plugin_name.into(),
serde_json::json!({
"duration_ms": duration.as_millis()
}),
)
.with_priority(EventPriority::Normal)
}
pub fn plugin_execution_failed(plugin_name: impl Into<String>, error: &PluginError) -> Event {
Event::from_plugin(
PLUGIN_EXECUTION_FAILED,
plugin_name.into(),
serde_json::json!({
"error": error.to_string()
}),
)
.with_priority(EventPriority::High)
}
pub fn plugin_initialized(plugin_name: impl Into<String>) -> Event {
Event::from_plugin(
PLUGIN_INITIALIZED,
plugin_name.into(),
serde_json::json!({}),
)
.with_priority(EventPriority::Normal)
}
pub fn plugin_cleanup_started(plugin_name: impl Into<String>) -> Event {
Event::from_plugin(
PLUGIN_CLEANUP_STARTED,
plugin_name.into(),
serde_json::json!({}),
)
.with_priority(EventPriority::Normal)
}
pub fn plugin_cleanup_completed(plugin_name: impl Into<String>) -> Event {
Event::from_plugin(
PLUGIN_CLEANUP_COMPLETED,
plugin_name.into(),
serde_json::json!({}),
)
.with_priority(EventPriority::Normal)
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{timeout, Duration};
#[tokio::test]
async fn test_event_creation() {
let event = Event::new("test.event", json!({"key": "value"}));
assert_eq!(event.event_type, "test.event");
assert_eq!(event.data, json!({"key": "value"}));
assert_eq!(event.priority, EventPriority::Normal);
assert!(event.source.is_none());
assert!(event.target.is_none());
}
#[tokio::test]
async fn test_event_from_plugin() {
let event = Event::from_plugin("plugin.event", "test-plugin", json!({"data": "test"}))
.with_priority(EventPriority::High)
.with_single_target("target-plugin")
.with_metadata("key", "value");
assert_eq!(event.event_type, "plugin.event");
assert_eq!(event.source, Some("test-plugin".to_string()));
assert_eq!(event.target, Some(vec!["target-plugin".to_string()]));
assert_eq!(event.priority, EventPriority::High);
assert_eq!(event.metadata.get("key"), Some(&"value".to_string()));
}
#[tokio::test]
async fn test_event_targeting() {
let broadcast_event = Event::new("broadcast", json!({}));
assert!(broadcast_event.is_targeted_to("any-plugin"));
let targeted_event = Event::new("targeted", json!({}))
.with_target(vec!["plugin-a".to_string(), "plugin-b".to_string()]);
assert!(targeted_event.is_targeted_to("plugin-a"));
assert!(targeted_event.is_targeted_to("plugin-b"));
assert!(!targeted_event.is_targeted_to("plugin-c"));
}
struct TestHandler {
call_count: Arc<AtomicUsize>,
event_types: Vec<String>,
priority: EventPriority,
}
impl TestHandler {
fn new(event_types: Vec<String>, priority: EventPriority) -> Self {
Self {
call_count: Arc::new(AtomicUsize::new(0)),
event_types,
priority,
}
}
fn call_count(&self) -> usize {
self.call_count.load(Ordering::Relaxed)
}
}
#[async_trait]
impl EventHandler for TestHandler {
async fn handle_event(&self, _event: &Event) -> PluginResult<()> {
self.call_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn event_types(&self) -> Vec<String> {
self.event_types.clone()
}
fn priority(&self) -> EventPriority {
self.priority
}
}
#[tokio::test]
async fn test_event_bus_basic() {
let bus = EventBus::new();
let _receiver = bus.subscribe(); let event = Event::new("test.event", json!({"test": true}));
bus.publish(event.clone()).await.unwrap();
let history = bus.get_history();
assert_eq!(history.len(), 1);
assert_eq!(history[0].event_type, "test.event");
}
#[tokio::test]
async fn test_event_bus_subscription() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let event = Event::new("subscription.test", json!({"data": "test"}));
bus.publish(event.clone()).await.unwrap();
let received = timeout(Duration::from_millis(100), receiver.recv())
.await
.unwrap()
.unwrap();
assert_eq!(received.event_type, "subscription.test");
assert_eq!(received.data, json!({"data": "test"}));
}
#[tokio::test]
async fn test_event_bus_handlers() {
let bus = EventBus::new();
let _receiver = bus.subscribe();
let handler1 = Arc::new(TestHandler::new(
vec!["test.event".to_string()],
EventPriority::Normal,
));
let handler2 = Arc::new(TestHandler::new(
vec!["test.event".to_string(), "other.event".to_string()],
EventPriority::High,
));
bus.register_handler(handler1.clone());
bus.register_handler(handler2.clone());
let event = Event::new("test.event", json!({}));
bus.publish(event).await.unwrap();
assert_eq!(handler1.call_count(), 1);
assert_eq!(handler2.call_count(), 1);
let event = Event::new("other.event", json!({}));
bus.publish(event).await.unwrap();
assert_eq!(handler1.call_count(), 1);
assert_eq!(handler2.call_count(), 2);
}
#[tokio::test]
async fn test_event_history() {
let bus = EventBus::with_capacity(100, 5); let _receiver = bus.subscribe();
for i in 0..10 {
let event = Event::new(format!("event.{i}"), json!({"index": i}));
bus.publish(event).await.unwrap();
}
let history = bus.get_history();
assert_eq!(history.len(), 5);
for (i, event) in history.iter().enumerate() {
assert_eq!(event.event_type, format!("event.{}", i + 5));
}
}
#[tokio::test]
async fn test_system_events() {
let event = system_events::plugin_registered("test-plugin");
assert_eq!(event.event_type, system_events::PLUGIN_REGISTERED);
assert_eq!(event.data["plugin_name"], "test-plugin");
let duration = Duration::from_millis(100);
let event = system_events::plugin_execution_completed("test-plugin", duration);
assert_eq!(event.event_type, system_events::PLUGIN_EXECUTION_COMPLETED);
assert_eq!(event.data["duration_ms"], 100);
}
#[tokio::test]
async fn test_hook_type_display() {
assert_eq!(HookType::BeforeExecute.to_string(), "before_execute");
assert_eq!(HookType::AfterExecute.to_string(), "after_execute");
assert_eq!(HookType::OnError.to_string(), "on_error");
}
}