use std::{
fmt::Debug,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use async_channel::{Receiver, Sender};
use mf_state::{state::State, Transaction};
use arc_swap::ArcSwap;
use dashmap::DashMap;
use crate::{
config::EventConfig,
debug::debug,
error::{ForgeResult, error_utils},
};
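/// Built-in events dispatched through the [`EventBus`]: initial state creation,
/// transaction application, teardown, and stop.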
#[derive(Debug, Clone)]
pub enum Event {
Create(Arc<State>),
TrApply(u64, Vec<Arc<Transaction>>, Arc<State>),
Destroy,
Stop,
}
impl Event {
pub fn name(&self) -> &'static str {
match self {
Event::Create(_) => "Create",
Event::TrApply(_, _, _) => "TrApply",
Event::Destroy => "Destroy",
Event::Stop => "Stop",
}
}
}
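/// Identifier returned when a handler is registered; pass it back to
/// `remove_event_handler` to unregister that handler.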
pub type HandlerId = u64;
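/// A clonable, multi-producer event bus.
///
/// Handlers live in a [`DashMap`] registry keyed by [`HandlerId`]; a
/// read-optimised snapshot of the handler list is published through
/// [`ArcSwap`] so the event loop can load it without locking. Events travel
/// over a bounded [`async_channel`] whose capacity comes from [`EventConfig`].
///
/// A minimal usage sketch (the surrounding setup and error handling are
/// assumptions, not taken from this crate):
///
/// ```rust,ignore
/// let bus: EventBus<Event> = EventBus::new();
/// bus.start_event_loop();
/// bus.broadcast(Event::Destroy).await?;
/// bus.destroy().await?;
/// ```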
pub struct EventBus<T: Send + Sync + Clone + 'static> {
tx: Sender<T>,
rt: Receiver<T>,
event_handlers: Arc<ArcSwap<Vec<Arc<dyn EventHandler<T> + Send + Sync>>>>,
handler_registry:
Arc<DashMap<HandlerId, Arc<dyn EventHandler<T> + Send + Sync>>>,
next_handler_id: Arc<AtomicU64>,
shutdown: (Sender<()>, Receiver<()>),
config: EventConfig,
stats: EventBusStats,
}
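/// Shared atomic counters tracking event-bus activity. The counters are
/// wrapped in `Arc` so every clone of the bus reports into the same set.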
#[derive(Clone, Debug)]
pub struct EventBusStats {
pub events_processed: Arc<AtomicU64>,
pub active_handlers: Arc<AtomicU64>,
pub processing_failures: Arc<AtomicU64>,
pub processing_timeouts: Arc<AtomicU64>,
}
impl Default for EventBusStats {
fn default() -> Self {
Self {
events_processed: Arc::new(AtomicU64::new(0)),
active_handlers: Arc::new(AtomicU64::new(0)),
processing_failures: Arc::new(AtomicU64::new(0)),
processing_timeouts: Arc::new(AtomicU64::new(0)),
}
}
}
impl<T: Send + Sync + Clone + 'static> Default for EventBus<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Send + Sync + Clone + 'static> Clone for EventBus<T> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
rt: self.rt.clone(),
event_handlers: self.event_handlers.clone(),
handler_registry: self.handler_registry.clone(),
next_handler_id: self.next_handler_id.clone(),
shutdown: (self.shutdown.0.clone(), self.shutdown.1.clone()),
config: self.config.clone(),
stats: self.stats.clone(),
}
}
}
impl<T: Send + Sync + Clone + 'static> EventBus<T> {
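/// Registers a handler and returns the [`HandlerId`] assigned to it.
/// The handler becomes visible to the event loop once this call refreshes
/// the published handler snapshot.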
pub fn add_event_handler(
&self,
event_handler: Arc<dyn EventHandler<T> + Send + Sync>,
) -> ForgeResult<HandlerId> {
let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
self.handler_registry.insert(handler_id, event_handler.clone());
self.update_handler_list();
self.stats.active_handlers.fetch_add(1, Ordering::Relaxed);
Ok(handler_id)
}
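/// Registers several handlers at once and returns their ids in the same
/// order; the handler snapshot is refreshed once at the end.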
pub fn add_event_handlers(
&self,
event_handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>>,
) -> ForgeResult<Vec<HandlerId>> {
let mut handler_ids = Vec::with_capacity(event_handlers.len());
for handler in event_handlers {
let handler_id =
self.next_handler_id.fetch_add(1, Ordering::Relaxed);
self.handler_registry.insert(handler_id, handler);
handler_ids.push(handler_id);
}
self.update_handler_list();
self.stats
.active_handlers
.fetch_add(handler_ids.len() as u64, Ordering::Relaxed);
Ok(handler_ids)
}
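/// Removes a handler by id. Returns `Ok(true)` if a handler was registered
/// under that id, `Ok(false)` otherwise.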
pub fn remove_event_handler(
&self,
handler_id: HandlerId,
) -> ForgeResult<bool> {
let removed = self.handler_registry.remove(&handler_id).is_some();
if removed {
self.update_handler_list();
self.stats.active_handlers.fetch_sub(1, Ordering::Relaxed);
}
Ok(removed)
}
pub fn remove_event_handlers(
&self,
handler_ids: &[HandlerId],
) -> ForgeResult<usize> {
let mut removed_count = 0;
for &handler_id in handler_ids {
if self.handler_registry.remove(&handler_id).is_some() {
removed_count += 1;
}
}
if removed_count > 0 {
self.update_handler_list();
self.stats
.active_handlers
.fetch_sub(removed_count as u64, Ordering::Relaxed);
}
Ok(removed_count)
}
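/// Rebuilds the read-only handler snapshot from the registry and publishes it
/// via `ArcSwap`, so the event loop picks up registration changes without
/// taking a lock.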
fn update_handler_list(&self) {
let handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>> = self
.handler_registry
.iter()
.map(|entry| entry.value().clone())
.collect();
self.event_handlers.store(Arc::new(handlers));
}
pub fn handler_count(&self) -> usize {
self.handler_registry.len()
}
pub fn clear_handlers(&self) -> ForgeResult<()> {
self.handler_registry.clear();
self.event_handlers.store(Arc::new(Vec::new()));
self.stats.active_handlers.store(0, Ordering::Relaxed);
Ok(())
}
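/// Signals the event loop to shut down and clean up outstanding tasks.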
pub async fn destroy(&self) -> ForgeResult<()> {
self.shutdown.0.send(()).await.map_err(|e| {
error_utils::event_error(format!("failed to send shutdown signal: {e}"))
})
}
pub fn destroy_blocking(&self) {
let _ = self.shutdown.0.send_blocking(());
}
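/// Spawns the event loop on the current Tokio runtime.
///
/// The loop receives events from the bus channel and, for each event, spawns
/// a task that runs every registered handler concurrently, bounding each
/// handler call by `config.handler_timeout` and the number of concurrently
/// processed events by `config.max_concurrent_handlers`. Failures and
/// timeouts are recorded in [`EventBusStats`]; a shutdown signal or a closed
/// channel drains the remaining tasks and exits the loop.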
pub fn start_event_loop(&self) {
let rx: async_channel::Receiver<T> = self.subscribe();
let event_handlers = self.event_handlers.clone();
let shutdown_rt = self.shutdown.1.clone();
let config = self.config.clone();
let stats = self.stats.clone();
tokio::spawn(async move {
let mut join_set = tokio::task::JoinSet::new();
let cleanup_timeout = config.handler_timeout;
async fn cleanup_tasks(
join_set: &mut tokio::task::JoinSet<()>,
timeout: std::time::Duration,
) {
debug!("开始清理事件处理任务...");
join_set.shutdown().await;
match tokio::time::timeout(timeout, async {
while let Some(result) = join_set.join_next().await {
if let Err(e) = result {
debug!("事件处理任务错误: {}", e);
}
}
})
.await
{
Ok(_) => debug!("所有事件处理任务已正常清理"),
Err(_) => debug!("事件处理任务清理超时"),
}
}
loop {
tokio::select! {
event = rx.recv() => match event {
Ok(event) => {
if join_set.len() >= config.max_concurrent_handlers {
debug!("事件处理任务数量达到上限,等待部分任务完成...");
if let Some(Err(e)) = join_set.join_next().await {
debug!("事件处理任务错误: {}", e);
}
}
// Take a full Arc snapshot rather than an ArcSwap guard, since it is
// moved into a potentially long-lived task.
let handlers = event_handlers.load_full();
let handler_timeout = config.handler_timeout;
let event_stats = stats.clone();
event_stats.events_processed.fetch_add(1, Ordering::Relaxed);
join_set.spawn(async move {
let mut handler_set = tokio::task::JoinSet::new();
#[allow(clippy::unnecessary_to_owned)]
for handler in handlers.iter().cloned() {
let event_for_task = event.clone();
handler_set.spawn(async move {
let e = event_for_task;
match tokio::time::timeout(handler_timeout, handler.handle(&e)).await {
Ok(Ok(_)) => (true, false, false),
Ok(Err(e)) => { debug!("event handler failed: {}", e); (false, true, false) },
Err(_) => { debug!("event handler timed out"); (false, false, true) },
}
});
}
let mut success_count = 0u64;
let mut failure_count = 0u64;
let mut timeout_count = 0u64;
while let Some(res) = handler_set.join_next().await {
match res {
Ok((ok, fail, timeout)) => {
if ok { success_count += 1; }
if fail { failure_count += 1; }
if timeout { timeout_count += 1; }
}
Err(e) => debug!("event-handler task error: {}", e),
}
}
if failure_count > 0 {
event_stats.processing_failures.fetch_add(failure_count, Ordering::Relaxed);
}
if timeout_count > 0 {
event_stats.processing_timeouts.fetch_add(timeout_count, Ordering::Relaxed);
}
debug!("事件处理完成: 成功={}, 失败={}, 超时={}", success_count, failure_count, timeout_count);
});
},
Err(e) => {
debug!("事件接收错误: {}", e);
cleanup_tasks(&mut join_set, cleanup_timeout).await;
break;
},
},
_ = shutdown_rt.recv() => {
debug!("event bus received shutdown signal, shutting down...");
cleanup_tasks(&mut join_set, cleanup_timeout).await;
break;
},
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
while let Some(result) = join_set.try_join_next() {
if let Err(e) = result {
debug!("事件处理任务错误: {}", e);
}
}
},
}
}
});
}
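/// Creates a bus with the default [`EventConfig`].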
pub fn new() -> Self {
Self::with_config(EventConfig::default())
}
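/// Creates a bus with the given configuration; the event channel is bounded
/// by `config.max_queue_size`.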
pub fn with_config(config: EventConfig) -> Self {
let (tx, rt) = async_channel::bounded(config.max_queue_size);
let (shutdown_tx, shutdown_rt) = async_channel::bounded(1);
Self {
tx,
rt,
event_handlers: Arc::new(ArcSwap::new(Arc::new(Vec::new()))),
handler_registry: Arc::new(DashMap::new()),
next_handler_id: Arc::new(AtomicU64::new(1)),
shutdown: (shutdown_tx, shutdown_rt),
config,
stats: EventBusStats::default(),
}
}
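/// Returns a clone of the receiving side of the event channel. The channel is
/// MPMC, so each event is delivered to exactly one receiver.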
pub fn subscribe(&self) -> Receiver<T> {
self.rt.clone()
}
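/// Sends an event onto the bus, waiting if the bounded queue is full.
///
/// A hedged sketch of forwarding an applied transaction, assuming an
/// `EventBus<Event>` and that `tr_id`, `transactions`, and `state` are
/// already in scope with the matching types:
///
/// ```rust,ignore
/// bus.broadcast(Event::TrApply(tr_id, transactions, state)).await?;
/// ```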
pub async fn broadcast(
&self,
event: T,
) -> ForgeResult<()> {
self.tx
.send(event)
.await
.map_err(|e| error_utils::event_error(format!("failed to broadcast event: {e}")))
}
pub fn broadcast_blocking(
&self,
event: T,
) -> ForgeResult<()> {
self.tx
.send_blocking(event)
.map_err(|e| error_utils::event_error(format!("failed to broadcast event: {e}")))
}
pub fn get_config(&self) -> &EventConfig {
&self.config
}
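/// Replaces the configuration on this instance. Event loops that are already
/// running keep the configuration they were started with.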
pub fn update_config(
&mut self,
config: EventConfig,
) {
self.config = config;
}
pub fn get_stats(&self) -> EventBusStats {
self.stats.clone()
}
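/// Resets the processed/failure/timeout counters. `active_handlers` is left
/// untouched because it mirrors the current registry size.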
pub fn reset_stats(&self) {
self.stats.events_processed.store(0, Ordering::Relaxed);
self.stats.processing_failures.store(0, Ordering::Relaxed);
self.stats.processing_timeouts.store(0, Ordering::Relaxed);
}
pub fn get_performance_report(&self) -> EventBusPerformanceReport {
let stats = &self.stats;
EventBusPerformanceReport {
total_events_processed: stats
.events_processed
.load(Ordering::Relaxed),
active_handlers_count: stats
.active_handlers
.load(Ordering::Relaxed),
total_processing_failures: stats
.processing_failures
.load(Ordering::Relaxed),
total_processing_timeouts: stats
.processing_timeouts
.load(Ordering::Relaxed),
handler_registry_size: self.handler_registry.len(),
success_rate: {
let total = stats.events_processed.load(Ordering::Relaxed);
let failures = stats.processing_failures.load(Ordering::Relaxed);
if total > 0 {
// `processing_failures` counts per-handler failures and can exceed the
// number of events, so clamp to avoid underflow before computing the rate.
(total.saturating_sub(failures) as f64 / total as f64) * 100.0
} else {
100.0
}
},
}
}
}
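/// Point-in-time snapshot of the counters in [`EventBusStats`], plus a
/// success rate derived from processed events and handler failures.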
#[derive(Debug, Clone)]
pub struct EventBusPerformanceReport {
pub total_events_processed: u64,
pub active_handlers_count: u64,
pub total_processing_failures: u64,
pub total_processing_timeouts: u64,
pub handler_registry_size: usize,
pub success_rate: f64,
}
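/// An asynchronous event handler invoked by the event loop for every event.
///
/// A minimal sketch of a handler for the built-in [`Event`] type (the struct
/// name and log text are illustrative, not part of this crate):
///
/// ```rust,ignore
/// #[derive(Debug)]
/// struct LoggingHandler;
///
/// #[async_trait::async_trait]
/// impl EventHandler<Event> for LoggingHandler {
///     async fn handle(&self, event: &Event) -> ForgeResult<()> {
///         println!("event: {}", event.name());
///         Ok(())
///     }
/// }
/// ```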
#[async_trait::async_trait]
pub trait EventHandler<T>: Send + Sync + Debug {
async fn handle(
&self,
event: &T,
) -> ForgeResult<()>;
}