1use std::{
2 fmt::Debug,
3 sync::{Arc, atomic::{AtomicU64, Ordering}},
4};
5
6use async_channel::{Receiver, Sender};
7use mf_state::{debug, state::State, Transaction};
8use tokio::signal;
9use arc_swap::ArcSwap;
10use dashmap::DashMap;
11
12use crate::{
13 config::EventConfig,
14 error::{ForgeResult, error_utils},
15};
16
17#[derive(Clone)]
19pub enum Event {
20 Create(Arc<State>),
21 TrApply(u64, Arc<Vec<Transaction>>, Arc<State>), Destroy, Stop, }
25
26impl Event {
27 pub fn name(&self) -> &'static str {
28 match self {
29 Event::Create(_) => "Create",
30 Event::TrApply(_, _, _) => "TrApply",
31 Event::Destroy => "Destroy",
32 Event::Stop => "Stop",
33 }
34 }
35}
36
/// Identifier returned when a handler is registered on an [`EventBus`] and
/// later used to remove that handler again.
pub type HandlerId = u64;
39
40pub struct EventBus<T: Send + 'static> {
48 tx: Sender<T>,
49 rt: Receiver<T>,
50 event_handlers: Arc<ArcSwap<Vec<Arc<dyn EventHandler<T>>>>>,
52 handler_registry: Arc<DashMap<HandlerId, Arc<dyn EventHandler<T>>>>,
54 next_handler_id: Arc<AtomicU64>,
56 shutdown: (Sender<()>, Receiver<()>),
57 config: EventConfig,
58 stats: EventBusStats,
60}
61
/// Shared, atomically updated counters describing the activity of an
/// [`EventBus`].
///
/// Cloning is cheap: every clone points at the same underlying counters.
#[derive(Debug, Clone)]
pub struct EventBusStats {
    /// Number of events the event loop has taken from the queue for dispatch.
    pub events_processed: Arc<AtomicU64>,
    /// Number of handlers currently registered.
    pub active_handlers: Arc<AtomicU64>,
    /// Number of handler invocations that returned an error.
    pub processing_failures: Arc<AtomicU64>,
    /// Number of handler invocations that exceeded the handler timeout.
    pub processing_timeouts: Arc<AtomicU64>,
}
74
75impl Default for EventBusStats {
76 fn default() -> Self {
77 Self {
78 events_processed: Arc::new(AtomicU64::new(0)),
79 active_handlers: Arc::new(AtomicU64::new(0)),
80 processing_failures: Arc::new(AtomicU64::new(0)),
81 processing_timeouts: Arc::new(AtomicU64::new(0)),
82 }
83 }
84}
85
86impl<T: Send + 'static> Default for EventBus<T> {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl<T: Send + 'static> Clone for EventBus<T> {
93 fn clone(&self) -> Self {
94 Self {
95 tx: self.tx.clone(),
96 rt: self.rt.clone(),
97 event_handlers: self.event_handlers.clone(),
98 handler_registry: self.handler_registry.clone(),
99 next_handler_id: self.next_handler_id.clone(),
100 shutdown: (self.shutdown.0.clone(), self.shutdown.1.clone()),
101 config: self.config.clone(),
102 stats: self.stats.clone(),
103 }
104 }
105}
106
107impl<T: Send + 'static> EventBus<T> {
108 pub fn add_event_handler(
110 &self,
111 event_handler: Arc<dyn EventHandler<T>>,
112 ) -> ForgeResult<HandlerId> {
113 let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
114
115 self.handler_registry.insert(handler_id, event_handler.clone());
117
118 self.update_handler_list();
120
121 self.stats.active_handlers.fetch_add(1, Ordering::Relaxed);
123
124 Ok(handler_id)
125 }
126
127 pub fn add_event_handlers(
129 &self,
130 event_handlers: Vec<Arc<dyn EventHandler<T>>>,
131 ) -> ForgeResult<Vec<HandlerId>> {
132 let mut handler_ids = Vec::with_capacity(event_handlers.len());
133
134 for handler in event_handlers {
135 let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
136 self.handler_registry.insert(handler_id, handler);
137 handler_ids.push(handler_id);
138 }
139
140 self.update_handler_list();
142
143 self.stats.active_handlers.fetch_add(handler_ids.len() as u64, Ordering::Relaxed);
145
146 Ok(handler_ids)
147 }
148
149 pub fn remove_event_handler(&self, handler_id: HandlerId) -> ForgeResult<bool> {
151 let removed = self.handler_registry.remove(&handler_id).is_some();
152
153 if removed {
154 self.update_handler_list();
155 self.stats.active_handlers.fetch_sub(1, Ordering::Relaxed);
156 }
157
158 Ok(removed)
159 }
160
161 pub fn remove_event_handlers(&self, handler_ids: &[HandlerId]) -> ForgeResult<usize> {
163 let mut removed_count = 0;
164
165 for &handler_id in handler_ids {
166 if self.handler_registry.remove(&handler_id).is_some() {
167 removed_count += 1;
168 }
169 }
170
171 if removed_count > 0 {
172 self.update_handler_list();
173 self.stats.active_handlers.fetch_sub(removed_count as u64, Ordering::Relaxed);
174 }
175
176 Ok(removed_count)
177 }
178
179 fn update_handler_list(&self) {
181 let handlers: Vec<Arc<dyn EventHandler<T>>> = self.handler_registry
182 .iter()
183 .map(|entry| entry.value().clone())
184 .collect();
185
186 self.event_handlers.store(Arc::new(handlers));
187 }
188
189 pub fn handler_count(&self) -> usize {
191 self.handler_registry.len()
192 }
193
194 pub fn clear_handlers(&self) -> ForgeResult<()> {
196 self.handler_registry.clear();
197 self.event_handlers.store(Arc::new(Vec::new()));
198 self.stats.active_handlers.store(0, Ordering::Relaxed);
199 Ok(())
200 }
201 pub async fn destroy(&self) -> ForgeResult<()> {
203 self.shutdown.0.send(()).await.map_err(|e| {
204 error_utils::event_error(format!("发送关闭信号失败: {}", e))
205 })
206 }
207
208 pub fn destroy_blocking(&self) {
212 let _ = self.shutdown.0.send_blocking(());
213 }
214 pub fn start_event_loop(&self) {
216 let rx: async_channel::Receiver<T> = self.subscribe();
217 let event_handlers = self.event_handlers.clone();
218 let shutdown_rt = self.shutdown.1.clone();
219 let config = self.config.clone();
220 let stats = self.stats.clone();
221 tokio::spawn(async move {
222 let mut join_set = tokio::task::JoinSet::new();
223
224 let cleanup_timeout = config.handler_timeout;
226 async fn cleanup_tasks(
227 join_set: &mut tokio::task::JoinSet<()>,
228 timeout: std::time::Duration,
229 ) {
230 debug!("开始清理事件处理任务...");
231 join_set.shutdown().await;
233 match tokio::time::timeout(timeout, async {
235 while let Some(result) = join_set.join_next().await {
236 if let Err(e) = result {
237 debug!("事件处理任务错误: {}", e);
238 }
239 }
240 })
241 .await
242 {
243 Ok(_) => debug!("所有事件处理任务已正常清理"),
244 Err(_) => debug!("事件处理任务清理超时"),
245 }
246 }
247 loop {
248 tokio::select! {
249 event = rx.recv() => match event {
250 Ok(event) => {
251 if join_set.len() >= config.max_concurrent_handlers {
253 debug!("事件处理任务数量达到上限,等待部分任务完成...");
254 if let Some(result) = join_set.join_next().await {
256 if let Err(e) = result {
257 debug!("事件处理任务错误: {}", e);
258 }
259 }
260 }
261
262 let handlers = event_handlers.load();
264 let handler_timeout = config.handler_timeout;
265 let event_stats = stats.clone();
266
267 event_stats.events_processed.fetch_add(1, Ordering::Relaxed);
269
270 join_set.spawn(async move {
271 let mut success_count = 0;
272 let mut failure_count = 0;
273 let mut timeout_count = 0;
274
275 for handler in handlers.iter() {
277 match tokio::time::timeout(handler_timeout, handler.handle(&event)).await {
278 Ok(Ok(_)) => {
279 success_count += 1;
280 }, Ok(Err(e)) => {
282 failure_count += 1;
283 debug!("事件处理器执行失败: {}", e);
284 },
285 Err(_) => {
286 timeout_count += 1;
287 debug!("事件处理器执行超时");
288 },
289 }
290 }
291
292 if failure_count > 0 {
294 event_stats.processing_failures.fetch_add(failure_count, Ordering::Relaxed);
295 }
296 if timeout_count > 0 {
297 event_stats.processing_timeouts.fetch_add(timeout_count, Ordering::Relaxed);
298 }
299
300 debug!("事件处理完成: 成功={}, 失败={}, 超时={}",
301 success_count, failure_count, timeout_count);
302 });
303 },
304 Err(e) => {
305 debug!("事件接收错误: {}", e);
306 cleanup_tasks(&mut join_set, cleanup_timeout).await;
307 break;
308 },
309 },
310 _ = shutdown_rt.recv() => {
311 let _ = join_set.join_all().await;
312 debug!("事件管理器,接收到关闭信号,正在退出...");
313 break;
314 },
315 shutdown_signal = Box::pin(signal::ctrl_c()) => {
316 match shutdown_signal {
317 Ok(()) => {
318 debug!("事件管理器,接收到关闭信号,正在退出...");
319 cleanup_tasks(&mut join_set, cleanup_timeout).await;
320 break;
321 },
322 Err(e) => {
323 debug!("事件管理器,处理关闭信号时出错: {}", e);
324 cleanup_tasks(&mut join_set, cleanup_timeout).await;
325 break;
326 }
327 }
328 }
329 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
331 while let Some(result) = join_set.try_join_next() {
333 if let Err(e) = result {
334 debug!("事件处理任务错误: {}", e);
335 }
336 }
337 },
338 }
339 }
340 });
341 }
342
343 pub fn new() -> Self {
344 Self::with_config(EventConfig::default())
345 }
346
347 pub fn with_config(config: EventConfig) -> Self {
348 let (tx, rt) = async_channel::bounded(config.max_queue_size);
349 let (shutdown_tx, shutdown_rt) = async_channel::bounded(1);
350 Self {
351 tx,
352 rt,
353 event_handlers: Arc::new(ArcSwap::new(Arc::new(Vec::new()))),
354 handler_registry: Arc::new(DashMap::new()),
355 next_handler_id: Arc::new(AtomicU64::new(1)),
356 shutdown: (shutdown_tx, shutdown_rt),
357 config,
358 stats: EventBusStats::default(),
359 }
360 }
361
362 pub fn subscribe(&self) -> Receiver<T> {
363 self.rt.clone()
364 }
365
366 pub async fn broadcast(
367 &self,
368 event: T,
369 ) -> ForgeResult<()> {
370 self.tx.send(event).await.map_err(|e| {
371 error_utils::event_error(format!("广播事件失败: {}", e))
372 })
373 }
374 pub fn broadcast_blocking(
392 &self,
393 event: T,
394 ) -> ForgeResult<()> {
395 self.tx.send_blocking(event).map_err(|e| {
396 error_utils::event_error(format!("广播事件失败: {}", e))
397 })
398 }
399
400 pub fn get_config(&self) -> &EventConfig {
402 &self.config
403 }
404
405 pub fn update_config(&mut self, config: EventConfig) {
407 self.config = config;
408 }
409
410 pub fn get_stats(&self) -> EventBusStats {
412 self.stats.clone()
413 }
414
415 pub fn reset_stats(&self) {
417 self.stats.events_processed.store(0, Ordering::Relaxed);
418 self.stats.processing_failures.store(0, Ordering::Relaxed);
419 self.stats.processing_timeouts.store(0, Ordering::Relaxed);
420 }
422
423 pub fn get_performance_report(&self) -> EventBusPerformanceReport {
425 let stats = &self.stats;
426 EventBusPerformanceReport {
427 total_events_processed: stats.events_processed.load(Ordering::Relaxed),
428 active_handlers_count: stats.active_handlers.load(Ordering::Relaxed),
429 total_processing_failures: stats.processing_failures.load(Ordering::Relaxed),
430 total_processing_timeouts: stats.processing_timeouts.load(Ordering::Relaxed),
431 handler_registry_size: self.handler_registry.len(),
432 success_rate: {
433 let total = stats.events_processed.load(Ordering::Relaxed);
434 let failures = stats.processing_failures.load(Ordering::Relaxed);
435 if total > 0 {
436 ((total - failures) as f64 / total as f64) * 100.0
437 } else {
438 100.0
439 }
440 },
441 }
442 }
443}
444
/// Plain-value snapshot of an [`EventBus`]'s statistics, as produced by
/// `EventBus::get_performance_report`.
#[derive(Clone, Debug)]
pub struct EventBusPerformanceReport {
    /// Value of the `events_processed` counter.
    pub total_events_processed: u64,
    /// Value of the `active_handlers` counter.
    pub active_handlers_count: u64,
    /// Value of the `processing_failures` counter.
    pub total_processing_failures: u64,
    /// Value of the `processing_timeouts` counter.
    pub total_processing_timeouts: u64,
    /// Number of entries in the handler registry.
    pub handler_registry_size: usize,
    /// Percentage derived from the event and failure counters; `100.0` when
    /// no event has been processed.
    pub success_rate: f64,
}
461
462#[async_trait::async_trait]
464pub trait EventHandler<T>: Send + Sync + Debug {
465 async fn handle(
466 &self,
467 event: &T,
468 ) -> ForgeResult<()>;
469}