1use std::{
2 fmt::Debug,
3 sync::{
4 Arc,
5 atomic::{AtomicU64, Ordering},
6 },
7};
8
9use async_channel::{Receiver, Sender};
10use mf_state::{debug, state::State, Transaction};
11use arc_swap::ArcSwap;
13use dashmap::DashMap;
14
15use crate::{
16 config::EventConfig,
17 error::{ForgeResult, error_utils},
18};
19
/// Events carried over the bus.
///
/// Payloads are wrapped in `Arc`, so cloning an event only bumps reference
/// counts.
#[derive(Clone)]
pub enum Event {
    /// Carries a shared [`State`].
    /// NOTE(review): presumably the state produced at creation time — confirm
    /// with the emitter.
    Create(Arc<State>),
    /// Carries a `u64`, a shared list of [`Transaction`]s and a shared
    /// [`State`].
    /// NOTE(review): the `u64` looks like a version/sequence id and the state
    /// like the post-apply state — confirm with the emitter.
    TrApply(u64, Arc<Vec<Transaction>>, Arc<State>),
    /// Payload-free variant; reported as `"Destroy"` by `Event::name`.
    Destroy,
    /// Payload-free variant; reported as `"Stop"` by `Event::name`.
    Stop,
}
28
29impl Event {
30 pub fn name(&self) -> &'static str {
31 match self {
32 Event::Create(_) => "Create",
33 Event::TrApply(_, _, _) => "TrApply",
34 Event::Destroy => "Destroy",
35 Event::Stop => "Stop",
36 }
37 }
38}
39
/// Identifier assigned to a handler when it is registered on an `EventBus`;
/// pass it back to the removal methods to unregister that handler.
pub type HandlerId = u64;
42
/// Event bus that queues events of type `T` on a bounded channel and hands
/// each received event to every registered [`EventHandler`].
///
/// Every field except `config` is a shared handle (channel end or `Arc`), so
/// clones of a bus operate on the same channel, handlers and counters.
pub struct EventBus<T: Send + Sync + Clone + 'static> {
    /// Sending side of the event channel; used by the `broadcast*` methods.
    tx: Sender<T>,
    /// Receiving side of the event channel; cloned by `subscribe`.
    rt: Receiver<T>,
    /// Atomically swappable snapshot of the handlers, read by the event loop
    /// for each received event.
    event_handlers: Arc<ArcSwap<Vec<Arc<dyn EventHandler<T> + Send + Sync>>>>,
    /// Handlers keyed by id; the snapshot in `event_handlers` is rebuilt from
    /// this map whenever handlers are added or removed.
    handler_registry:
        Arc<DashMap<HandlerId, Arc<dyn EventHandler<T> + Send + Sync>>>,
    /// Counter from which the next `HandlerId` is taken.
    next_handler_id: Arc<AtomicU64>,
    /// Sender/receiver pair of the shutdown-signal channel.
    shutdown: (Sender<()>, Receiver<()>),
    /// Configuration this instance was built with (or last updated to).
    config: EventConfig,
    /// Shared runtime counters.
    stats: EventBusStats,
}
65
/// Shared runtime counters of an event bus.
///
/// Each counter lives behind an `Arc`, so a clone of this struct observes and
/// mutates the same counters as the original. `Default` yields fresh counters
/// that all start at zero.
#[derive(Clone, Debug, Default)]
pub struct EventBusStats {
    /// Number of events taken off the queue by the event loop (one per event).
    pub events_processed: Arc<AtomicU64>,
    /// Number of currently registered handlers.
    pub active_handlers: Arc<AtomicU64>,
    /// Number of handler invocations that returned an error (one per handler).
    pub processing_failures: Arc<AtomicU64>,
    /// Number of handler invocations that exceeded the handler timeout
    /// (one per handler).
    pub processing_timeouts: Arc<AtomicU64>,
}
89
90impl<T: Send + Sync + Clone + 'static> Default for EventBus<T> {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96impl<T: Send + Sync + Clone + 'static> Clone for EventBus<T> {
97 fn clone(&self) -> Self {
98 Self {
99 tx: self.tx.clone(),
100 rt: self.rt.clone(),
101 event_handlers: self.event_handlers.clone(),
102 handler_registry: self.handler_registry.clone(),
103 next_handler_id: self.next_handler_id.clone(),
104 shutdown: (self.shutdown.0.clone(), self.shutdown.1.clone()),
105 config: self.config.clone(),
106 stats: self.stats.clone(),
107 }
108 }
109}
110
111impl<T: Send + Sync + Clone + 'static> EventBus<T> {
112 pub fn add_event_handler(
114 &self,
115 event_handler: Arc<dyn EventHandler<T> + Send + Sync>,
116 ) -> ForgeResult<HandlerId> {
117 let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
118
119 self.handler_registry.insert(handler_id, event_handler.clone());
121
122 self.update_handler_list();
124
125 self.stats.active_handlers.fetch_add(1, Ordering::Relaxed);
127
128 Ok(handler_id)
129 }
130
131 pub fn add_event_handlers(
133 &self,
134 event_handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>>,
135 ) -> ForgeResult<Vec<HandlerId>> {
136 let mut handler_ids = Vec::with_capacity(event_handlers.len());
137
138 for handler in event_handlers {
139 let handler_id =
140 self.next_handler_id.fetch_add(1, Ordering::Relaxed);
141 self.handler_registry.insert(handler_id, handler);
142 handler_ids.push(handler_id);
143 }
144
145 self.update_handler_list();
147
148 self.stats
150 .active_handlers
151 .fetch_add(handler_ids.len() as u64, Ordering::Relaxed);
152
153 Ok(handler_ids)
154 }
155
156 pub fn remove_event_handler(
158 &self,
159 handler_id: HandlerId,
160 ) -> ForgeResult<bool> {
161 let removed = self.handler_registry.remove(&handler_id).is_some();
162
163 if removed {
164 self.update_handler_list();
165 self.stats.active_handlers.fetch_sub(1, Ordering::Relaxed);
166 }
167
168 Ok(removed)
169 }
170
171 pub fn remove_event_handlers(
173 &self,
174 handler_ids: &[HandlerId],
175 ) -> ForgeResult<usize> {
176 let mut removed_count = 0;
177
178 for &handler_id in handler_ids {
179 if self.handler_registry.remove(&handler_id).is_some() {
180 removed_count += 1;
181 }
182 }
183
184 if removed_count > 0 {
185 self.update_handler_list();
186 self.stats
187 .active_handlers
188 .fetch_sub(removed_count as u64, Ordering::Relaxed);
189 }
190
191 Ok(removed_count)
192 }
193
194 fn update_handler_list(&self) {
196 let handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>> = self
197 .handler_registry
198 .iter()
199 .map(|entry| entry.value().clone())
200 .collect();
201
202 self.event_handlers.store(Arc::new(handlers));
203 }
204
205 pub fn handler_count(&self) -> usize {
207 self.handler_registry.len()
208 }
209
210 pub fn clear_handlers(&self) -> ForgeResult<()> {
212 self.handler_registry.clear();
213 self.event_handlers.store(Arc::new(Vec::new()));
214 self.stats.active_handlers.store(0, Ordering::Relaxed);
215 Ok(())
216 }
217 pub async fn destroy(&self) -> ForgeResult<()> {
219 self.shutdown.0.send(()).await.map_err(|e| {
220 error_utils::event_error(format!("发送关闭信号失败: {}", e))
221 })
222 }
223
224 pub fn destroy_blocking(&self) {
228 let _ = self.shutdown.0.send_blocking(());
229 }
230 pub fn start_event_loop(&self) {
232 let rx: async_channel::Receiver<T> = self.subscribe();
233 let event_handlers = self.event_handlers.clone();
234 let shutdown_rt = self.shutdown.1.clone();
235 let config = self.config.clone();
236 let stats = self.stats.clone();
237 tokio::spawn(async move {
238 let mut join_set = tokio::task::JoinSet::new();
239
240 let cleanup_timeout = config.handler_timeout;
242 async fn cleanup_tasks(
243 join_set: &mut tokio::task::JoinSet<()>,
244 timeout: std::time::Duration,
245 ) {
246 debug!("开始清理事件处理任务...");
247 join_set.shutdown().await;
249 match tokio::time::timeout(timeout, async {
251 while let Some(result) = join_set.join_next().await {
252 if let Err(e) = result {
253 debug!("事件处理任务错误: {}", e);
254 }
255 }
256 })
257 .await
258 {
259 Ok(_) => debug!("所有事件处理任务已正常清理"),
260 Err(_) => debug!("事件处理任务清理超时"),
261 }
262 }
263 loop {
264 tokio::select! {
265 event = rx.recv() => match event {
266 Ok(event) => {
267 if join_set.len() >= config.max_concurrent_handlers {
269 debug!("事件处理任务数量达到上限,等待部分任务完成...");
270 if let Some(result) = join_set.join_next().await {
272 if let Err(e) = result {
273 debug!("事件处理任务错误: {}", e);
274 }
275 }
276 }
277
278 let handlers = event_handlers.load();
280 let handler_timeout = config.handler_timeout;
281 let event_stats = stats.clone();
282
283 event_stats.events_processed.fetch_add(1, Ordering::Relaxed);
285
286 join_set.spawn(async move {
287 let mut handler_set = tokio::task::JoinSet::new();
289 for handler in handlers.iter().cloned() {
290 let event_for_task = event.clone();
291 handler_set.spawn(async move {
292 let e = event_for_task;
294 match tokio::time::timeout(handler_timeout, handler.handle(&e)).await {
295 Ok(Ok(_)) => (true, false, false),
296 Ok(Err(e)) => { debug!("事件处理器执行失败: {}", e); (false, true, false) },
297 Err(_) => { debug!("事件处理器执行超时"); (false, false, true) },
298 }
299 });
300 }
301
302 let mut success_count = 0u64;
303 let mut failure_count = 0u64;
304 let mut timeout_count = 0u64;
305 while let Some(res) = handler_set.join_next().await {
306 match res {
307 Ok((ok, fail, timeout)) => {
308 if ok { success_count += 1; }
309 if fail { failure_count += 1; }
310 if timeout { timeout_count += 1; }
311 }
312 Err(e) => debug!("事件处理器任务错误: {}", e),
313 }
314 }
315
316 if failure_count > 0 {
317 event_stats.processing_failures.fetch_add(failure_count, Ordering::Relaxed);
318 }
319 if timeout_count > 0 {
320 event_stats.processing_timeouts.fetch_add(timeout_count, Ordering::Relaxed);
321 }
322
323 debug!("事件处理完成: 成功={}, 失败={}, 超时={}", success_count, failure_count, timeout_count);
324 });
325 },
326 Err(e) => {
327 debug!("事件接收错误: {}", e);
328 cleanup_tasks(&mut join_set, cleanup_timeout).await;
329 break;
330 },
331 },
332 _ = shutdown_rt.recv() => {
333 cleanup_tasks(&mut join_set, cleanup_timeout).await;
335 debug!("事件管理器接收到关闭信号,正在退出...");
336 break;
337 },
338 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
340 while let Some(result) = join_set.try_join_next() {
342 if let Err(e) = result {
343 debug!("事件处理任务错误: {}", e);
344 }
345 }
346 },
347 }
348 }
349 });
350 }
351
352 pub fn new() -> Self {
353 Self::with_config(EventConfig::default())
354 }
355
356 pub fn with_config(config: EventConfig) -> Self {
357 let (tx, rt) = async_channel::bounded(config.max_queue_size);
358 let (shutdown_tx, shutdown_rt) = async_channel::bounded(1);
359 Self {
360 tx,
361 rt,
362 event_handlers: Arc::new(ArcSwap::new(Arc::new(Vec::new()))),
363 handler_registry: Arc::new(DashMap::new()),
364 next_handler_id: Arc::new(AtomicU64::new(1)),
365 shutdown: (shutdown_tx, shutdown_rt),
366 config,
367 stats: EventBusStats::default(),
368 }
369 }
370
371 pub fn subscribe(&self) -> Receiver<T> {
372 self.rt.clone()
373 }
374
375 pub async fn broadcast(
376 &self,
377 event: T,
378 ) -> ForgeResult<()> {
379 self.tx.send(event).await.map_err(|e| {
380 error_utils::event_error(format!("广播事件失败: {}", e))
381 })
382 }
383 pub fn broadcast_blocking(
401 &self,
402 event: T,
403 ) -> ForgeResult<()> {
404 self.tx.send_blocking(event).map_err(|e| {
405 error_utils::event_error(format!("广播事件失败: {}", e))
406 })
407 }
408
409 pub fn get_config(&self) -> &EventConfig {
411 &self.config
412 }
413
414 pub fn update_config(
416 &mut self,
417 config: EventConfig,
418 ) {
419 self.config = config;
420 }
421
422 pub fn get_stats(&self) -> EventBusStats {
424 self.stats.clone()
425 }
426
427 pub fn reset_stats(&self) {
429 self.stats.events_processed.store(0, Ordering::Relaxed);
430 self.stats.processing_failures.store(0, Ordering::Relaxed);
431 self.stats.processing_timeouts.store(0, Ordering::Relaxed);
432 }
434
435 pub fn get_performance_report(&self) -> EventBusPerformanceReport {
437 let stats = &self.stats;
438 EventBusPerformanceReport {
439 total_events_processed: stats
440 .events_processed
441 .load(Ordering::Relaxed),
442 active_handlers_count: stats
443 .active_handlers
444 .load(Ordering::Relaxed),
445 total_processing_failures: stats
446 .processing_failures
447 .load(Ordering::Relaxed),
448 total_processing_timeouts: stats
449 .processing_timeouts
450 .load(Ordering::Relaxed),
451 handler_registry_size: self.handler_registry.len(),
452 success_rate: {
453 let total = stats.events_processed.load(Ordering::Relaxed);
454 let failures =
455 stats.processing_failures.load(Ordering::Relaxed);
456 if total > 0 {
457 ((total - failures) as f64 / total as f64) * 100.0
458 } else {
459 100.0
460 }
461 },
462 }
463 }
464}
465
/// Plain-value report of the bus counters, as produced by
/// `EventBus::get_performance_report`.
#[derive(Debug, Clone)]
pub struct EventBusPerformanceReport {
    /// Value of the `events_processed` counter.
    pub total_events_processed: u64,
    /// Value of the `active_handlers` counter.
    pub active_handlers_count: u64,
    /// Value of the `processing_failures` counter.
    pub total_processing_failures: u64,
    /// Value of the `processing_timeouts` counter.
    pub total_processing_timeouts: u64,
    /// Number of entries in the handler registry.
    pub handler_registry_size: usize,
    /// Percentage derived from the processed and failure counters; `100.0`
    /// when no event has been processed.
    pub success_rate: f64,
}
482
/// Callback invoked by the bus for events of type `T`.
///
/// Implementations must be `Send + Sync + Debug`. The event loop awaits
/// `handle` under a per-handler timeout and counts an `Err` result as a
/// processing failure.
#[async_trait::async_trait]
pub trait EventHandler<T>: Send + Sync + Debug {
    /// Handles one event; return `Err` to report a failure for this handler.
    async fn handle(
        &self,
        event: &T,
    ) -> ForgeResult<()>;
}