use std::{
    fmt::Debug,
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
};

use arc_swap::ArcSwap;
use async_channel::{Receiver, Sender};
use dashmap::DashMap;
use mf_state::{state::State, Transaction};

use crate::{
    config::EventConfig,
    debug::debug,
    error::{ForgeResult, error_utils},
};

/// Lifecycle events broadcast on the [`EventBus`].
#[derive(Debug, Clone)]
pub enum Event {
    /// A new state was created.
    Create(Arc<State>),
    /// Transactions were applied, yielding the resulting state.
    TrApply(u64, Vec<Arc<Transaction>>, Arc<State>),
    /// The owner of the bus is being destroyed.
    Destroy,
    /// The event loop should stop.
    Stop,
}

impl Event {
    /// Returns the name of the event variant.
    pub fn name(&self) -> &'static str {
        match self {
            Event::Create(_) => "Create",
            Event::TrApply(_, _, _) => "TrApply",
            Event::Destroy => "Destroy",
            Event::Stop => "Stop",
        }
    }
}

/// Identifier assigned to a registered event handler.
pub type HandlerId = u64;

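/// Asynchronous event bus. Events sent via [`Self::broadcast`] are queued on a
/// bounded channel and dispatched by the loop started with
/// [`Self::start_event_loop`] to every registered [`EventHandler`].
///
/// Minimal registration sketch (not compiled; `MyHandler` is a hypothetical
/// type implementing [`EventHandler`] for the chosen event type):
///
/// ```rust,ignore
/// let bus: EventBus<Event> = EventBus::new();
/// let id = bus.add_event_handler(Arc::new(MyHandler))?;
/// bus.start_event_loop();              // requires a running Tokio runtime
/// bus.broadcast(Event::Stop).await?;   // dispatched to MyHandler
/// bus.remove_event_handler(id)?;
/// ```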
pub struct EventBus<T: Send + Sync + Clone + 'static> {
    /// Sending half of the bounded event channel.
    tx: Sender<T>,
    /// Receiving half of the bounded event channel.
    rt: Receiver<T>,
    /// Lock-free snapshot of the registered handlers, rebuilt on every change.
    event_handlers: Arc<ArcSwap<Vec<Arc<dyn EventHandler<T> + Send + Sync>>>>,
    /// Registry mapping handler ids to their handlers.
    handler_registry:
        Arc<DashMap<HandlerId, Arc<dyn EventHandler<T> + Send + Sync>>>,
    /// Next handler id to hand out.
    next_handler_id: Arc<AtomicU64>,
    /// Shutdown signal channel (sender, receiver).
    shutdown: (Sender<()>, Receiver<()>),
    /// Bus configuration.
    config: EventConfig,
    /// Shared runtime statistics.
    stats: EventBusStats,
}

/// Shared counters tracking event bus activity.
#[derive(Clone, Debug)]
pub struct EventBusStats {
    /// Total number of events dispatched to handlers.
    pub events_processed: Arc<AtomicU64>,
    /// Number of currently registered handlers.
    pub active_handlers: Arc<AtomicU64>,
    /// Total number of handler executions that returned an error.
    pub processing_failures: Arc<AtomicU64>,
    /// Total number of handler executions that timed out.
    pub processing_timeouts: Arc<AtomicU64>,
}

impl Default for EventBusStats {
    fn default() -> Self {
        Self {
            events_processed: Arc::new(AtomicU64::new(0)),
            active_handlers: Arc::new(AtomicU64::new(0)),
            processing_failures: Arc::new(AtomicU64::new(0)),
            processing_timeouts: Arc::new(AtomicU64::new(0)),
        }
    }
}

impl<T: Send + Sync + Clone + 'static> Default for EventBus<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Send + Sync + Clone + 'static> Clone for EventBus<T> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
            rt: self.rt.clone(),
            event_handlers: self.event_handlers.clone(),
            handler_registry: self.handler_registry.clone(),
            next_handler_id: self.next_handler_id.clone(),
            shutdown: (self.shutdown.0.clone(), self.shutdown.1.clone()),
            config: self.config.clone(),
            stats: self.stats.clone(),
        }
    }
}

impl<T: Send + Sync + Clone + 'static> EventBus<T> {
    /// Registers a single event handler and returns its id.
    pub fn add_event_handler(
        &self,
        event_handler: Arc<dyn EventHandler<T> + Send + Sync>,
    ) -> ForgeResult<HandlerId> {
        let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);

        self.handler_registry.insert(handler_id, event_handler);

        self.update_handler_list();

        self.stats.active_handlers.fetch_add(1, Ordering::Relaxed);

        Ok(handler_id)
    }

    /// Registers several event handlers at once and returns their ids.
    pub fn add_event_handlers(
        &self,
        event_handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>>,
    ) -> ForgeResult<Vec<HandlerId>> {
        let mut handler_ids = Vec::with_capacity(event_handlers.len());

        for handler in event_handlers {
            let handler_id =
                self.next_handler_id.fetch_add(1, Ordering::Relaxed);
            self.handler_registry.insert(handler_id, handler);
            handler_ids.push(handler_id);
        }

        self.update_handler_list();

        self.stats
            .active_handlers
            .fetch_add(handler_ids.len() as u64, Ordering::Relaxed);

        Ok(handler_ids)
    }

    /// Removes the handler with the given id. Returns `true` if it existed.
    pub fn remove_event_handler(
        &self,
        handler_id: HandlerId,
    ) -> ForgeResult<bool> {
        let removed = self.handler_registry.remove(&handler_id).is_some();

        if removed {
            self.update_handler_list();
            self.stats.active_handlers.fetch_sub(1, Ordering::Relaxed);
        }

        Ok(removed)
    }

    /// Removes multiple handlers and returns how many were actually removed.
    pub fn remove_event_handlers(
        &self,
        handler_ids: &[HandlerId],
    ) -> ForgeResult<usize> {
        let mut removed_count = 0;

        for &handler_id in handler_ids {
            if self.handler_registry.remove(&handler_id).is_some() {
                removed_count += 1;
            }
        }

        if removed_count > 0 {
            self.update_handler_list();
            self.stats
                .active_handlers
                .fetch_sub(removed_count as u64, Ordering::Relaxed);
        }

        Ok(removed_count)
    }

    /// Rebuilds the atomically swapped handler snapshot from the registry.
    fn update_handler_list(&self) {
        let handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>> = self
            .handler_registry
            .iter()
            .map(|entry| entry.value().clone())
            .collect();

        self.event_handlers.store(Arc::new(handlers));
    }

    /// Returns the number of registered handlers.
    pub fn handler_count(&self) -> usize {
        self.handler_registry.len()
    }

    /// Removes all handlers and resets the active handler counter.
    pub fn clear_handlers(&self) -> ForgeResult<()> {
        self.handler_registry.clear();
        self.event_handlers.store(Arc::new(Vec::new()));
        self.stats.active_handlers.store(0, Ordering::Relaxed);
        Ok(())
    }

    /// Sends the shutdown signal to the event loop.
    pub async fn destroy(&self) -> ForgeResult<()> {
        self.shutdown.0.send(()).await.map_err(|e| {
            error_utils::event_error(format!("failed to send shutdown signal: {e}"))
        })
    }

    /// Blocking variant of [`Self::destroy`]; send errors are ignored.
    pub fn destroy_blocking(&self) {
        let _ = self.shutdown.0.send_blocking(());
    }

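    /// Starts the background dispatch loop on the current Tokio runtime.
    /// Each received event is fanned out to every registered handler, with
    /// concurrency capped by `max_concurrent_handlers` and each handler call
    /// bounded by `handler_timeout` from the [`EventConfig`].
    ///
    /// Typical lifecycle, as a sketch (not compiled; `event` stands for any
    /// value of the bus' event type):
    ///
    /// ```rust,ignore
    /// let bus: EventBus<Event> = EventBus::new();
    /// bus.start_event_loop();          // spawn the dispatcher task
    /// bus.broadcast(event).await?;     // handlers run concurrently, with timeout
    /// bus.destroy().await?;            // signal shutdown and drain handler tasks
    /// ```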
    pub fn start_event_loop(&self) {
        let rx: async_channel::Receiver<T> = self.subscribe();
        let event_handlers = self.event_handlers.clone();
        let shutdown_rt = self.shutdown.1.clone();
        let config = self.config.clone();
        let stats = self.stats.clone();
        tokio::spawn(async move {
            let mut join_set = tokio::task::JoinSet::new();

            let cleanup_timeout = config.handler_timeout;
            // Drains outstanding handler tasks, aborting whatever is still
            // running once the grace period has elapsed.
            async fn cleanup_tasks(
                join_set: &mut tokio::task::JoinSet<()>,
                timeout: std::time::Duration,
            ) {
                debug!("Cleaning up event handling tasks...");
                // Give in-flight tasks a grace period to finish on their own.
                match tokio::time::timeout(timeout, async {
                    while let Some(result) = join_set.join_next().await {
                        if let Err(e) = result {
                            debug!("Event handling task error: {}", e);
                        }
                    }
                })
                .await
                {
                    Ok(_) => debug!("All event handling tasks cleaned up"),
                    Err(_) => debug!("Event handling task cleanup timed out"),
                }
                // Abort and reap anything still running after the grace period.
                join_set.shutdown().await;
            }
            loop {
                tokio::select! {
                    event = rx.recv() => match event {
                        Ok(event) => {
                            // Back-pressure: when the concurrent task limit is
                            // reached, wait for one task to finish first.
                            if join_set.len() >= config.max_concurrent_handlers {
                                debug!("Handler task limit reached, waiting for a task to finish...");
                                if let Some(Err(e)) = join_set.join_next().await {
                                    debug!("Event handling task error: {}", e);
                                }
                            }

                            let handlers = event_handlers.load();
                            let handler_timeout = config.handler_timeout;
                            let event_stats = stats.clone();

                            event_stats.events_processed.fetch_add(1, Ordering::Relaxed);

                            join_set.spawn(async move {
                                // Run every handler concurrently, each bounded
                                // by the configured timeout.
                                let mut handler_set = tokio::task::JoinSet::new();
                                #[allow(clippy::unnecessary_to_owned)]
                                for handler in handlers.iter().cloned() {
                                    let event_for_task = event.clone();
                                    handler_set.spawn(async move {
                                        match tokio::time::timeout(handler_timeout, handler.handle(&event_for_task)).await {
                                            Ok(Ok(_)) => (true, false, false),
                                            Ok(Err(e)) => { debug!("Event handler failed: {}", e); (false, true, false) },
                                            Err(_) => { debug!("Event handler timed out"); (false, false, true) },
                                        }
                                    });
                                }

                                // Aggregate per-handler outcomes for the statistics.
                                let mut success_count = 0u64;
                                let mut failure_count = 0u64;
                                let mut timeout_count = 0u64;
                                while let Some(res) = handler_set.join_next().await {
                                    match res {
                                        Ok((ok, fail, timeout)) => {
                                            if ok { success_count += 1; }
                                            if fail { failure_count += 1; }
                                            if timeout { timeout_count += 1; }
                                        }
                                        Err(e) => debug!("Event handler task error: {}", e),
                                    }
                                }

                                if failure_count > 0 {
                                    event_stats.processing_failures.fetch_add(failure_count, Ordering::Relaxed);
                                }
                                if timeout_count > 0 {
                                    event_stats.processing_timeouts.fetch_add(timeout_count, Ordering::Relaxed);
                                }

                                debug!("Event handled: succeeded={}, failed={}, timed out={}", success_count, failure_count, timeout_count);
                            });
                        },
                        Err(e) => {
                            debug!("Event receive error: {}", e);
                            cleanup_tasks(&mut join_set, cleanup_timeout).await;
                            break;
                        },
                    },
                    _ = shutdown_rt.recv() => {
                        debug!("Event manager received shutdown signal, exiting...");
                        cleanup_tasks(&mut join_set, cleanup_timeout).await;
                        break;
                    },
                    _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
                        // Periodically reap finished handler tasks so the join
                        // set does not accumulate completed entries.
                        while let Some(result) = join_set.try_join_next() {
                            if let Err(e) = result {
                                debug!("Event handling task error: {}", e);
                            }
                        }
                    },
                }
            }
        });
    }

    /// Creates an event bus with the default [`EventConfig`].
    pub fn new() -> Self {
        Self::with_config(EventConfig::default())
    }

    /// Creates an event bus with the given configuration. The event channel
    /// is bounded by `config.max_queue_size`.
    pub fn with_config(config: EventConfig) -> Self {
        let (tx, rt) = async_channel::bounded(config.max_queue_size);
        let (shutdown_tx, shutdown_rt) = async_channel::bounded(1);
        Self {
            tx,
            rt,
            event_handlers: Arc::new(ArcSwap::new(Arc::new(Vec::new()))),
            handler_registry: Arc::new(DashMap::new()),
            next_handler_id: Arc::new(AtomicU64::new(1)),
            shutdown: (shutdown_tx, shutdown_rt),
            config,
            stats: EventBusStats::default(),
        }
    }

    /// Returns a receiver subscribed to the event channel.
    pub fn subscribe(&self) -> Receiver<T> {
        self.rt.clone()
    }

    /// Sends an event onto the bus; the event loop dispatches it to every
    /// registered handler.
    pub async fn broadcast(
        &self,
        event: T,
    ) -> ForgeResult<()> {
        self.tx
            .send(event)
            .await
            .map_err(|e| error_utils::event_error(format!("failed to broadcast event: {e}")))
    }

    /// Blocking variant of [`Self::broadcast`].
    pub fn broadcast_blocking(
        &self,
        event: T,
    ) -> ForgeResult<()> {
        self.tx
            .send_blocking(event)
            .map_err(|e| error_utils::event_error(format!("failed to broadcast event: {e}")))
    }

    /// Returns the current configuration.
    pub fn get_config(&self) -> &EventConfig {
        &self.config
    }

    /// Replaces the configuration; an already running event loop keeps the
    /// configuration it was started with.
    pub fn update_config(
        &mut self,
        config: EventConfig,
    ) {
        self.config = config;
    }

    /// Returns a handle to the shared statistics counters.
    pub fn get_stats(&self) -> EventBusStats {
        self.stats.clone()
    }

    /// Resets the cumulative counters; `active_handlers` tracks current
    /// registrations and is left untouched.
    pub fn reset_stats(&self) {
        self.stats.events_processed.store(0, Ordering::Relaxed);
        self.stats.processing_failures.store(0, Ordering::Relaxed);
        self.stats.processing_timeouts.store(0, Ordering::Relaxed);
    }

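    /// Builds a point-in-time snapshot of the statistics counters.
    ///
    /// A small usage sketch (not compiled; assumes some events have already
    /// been broadcast and handled):
    ///
    /// ```rust,ignore
    /// let report = bus.get_performance_report();
    /// println!(
    ///     "events={} failures={} success_rate={:.1}%",
    ///     report.total_events_processed,
    ///     report.total_processing_failures,
    ///     report.success_rate,
    /// );
    /// ```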
    pub fn get_performance_report(&self) -> EventBusPerformanceReport {
        let stats = &self.stats;
        EventBusPerformanceReport {
            total_events_processed: stats
                .events_processed
                .load(Ordering::Relaxed),
            active_handlers_count: stats
                .active_handlers
                .load(Ordering::Relaxed),
            total_processing_failures: stats
                .processing_failures
                .load(Ordering::Relaxed),
            total_processing_timeouts: stats
                .processing_timeouts
                .load(Ordering::Relaxed),
            handler_registry_size: self.handler_registry.len(),
            success_rate: {
                // Failures are counted per handler and can exceed the event
                // count, so saturate instead of underflowing.
                let total = stats.events_processed.load(Ordering::Relaxed);
                let failures =
                    stats.processing_failures.load(Ordering::Relaxed);
                if total > 0 {
                    (total.saturating_sub(failures) as f64 / total as f64)
                        * 100.0
                } else {
                    100.0
                }
            },
        }
    }
}

/// Point-in-time snapshot of the event bus statistics.
#[derive(Debug, Clone)]
pub struct EventBusPerformanceReport {
    /// Total number of events dispatched.
    pub total_events_processed: u64,
    /// Number of currently registered handlers.
    pub active_handlers_count: u64,
    /// Total number of handler executions that failed.
    pub total_processing_failures: u64,
    /// Total number of handler executions that timed out.
    pub total_processing_timeouts: u64,
    /// Number of entries in the handler registry at report time.
    pub handler_registry_size: usize,
    /// Percentage of events processed without a recorded failure.
    pub success_rate: f64,
}

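/// Handler invoked by the [`EventBus`] for every dispatched event.
///
/// Sketch of an implementation (not compiled; `LoggingHandler` is a
/// hypothetical type, and `async_trait` is the same attribute macro used on
/// the trait below):
///
/// ```rust,ignore
/// #[derive(Debug)]
/// struct LoggingHandler;
///
/// #[async_trait::async_trait]
/// impl EventHandler<Event> for LoggingHandler {
///     async fn handle(&self, event: &Event) -> ForgeResult<()> {
///         // React to the event; returning Err counts as a processing failure.
///         println!("received event: {}", event.name());
///         Ok(())
///     }
/// }
/// ```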
#[async_trait::async_trait]
pub trait EventHandler<T>: Send + Sync + Debug {
    /// Handles a single event. Errors and timeouts are recorded in the bus
    /// statistics but do not stop other handlers.
    async fn handle(
        &self,
        event: &T,
    ) -> ForgeResult<()>;
}