1use std::{
2 fmt::Debug,
3 sync::{
4 Arc,
5 atomic::{AtomicU64, Ordering},
6 },
7};
8
9use async_channel::{Receiver, Sender};
10use mf_state::{state::State, Transaction};
11use arc_swap::ArcSwap;
13use dashmap::DashMap;
14
15use crate::{
16 config::EventConfig,
17 debug::debug,
18 error::{ForgeResult, error_utils},
19};
20
21#[derive(Debug, Clone)]
23pub enum Event {
24 Create(Arc<State>),
26
27 TrApply {
30 old_state: Arc<State>,
31 new_state: Arc<State>,
32 transactions: Vec<Arc<Transaction>>,
33 },
34
35 Undo {
38 old_state: Arc<State>,
39 new_state: Arc<State>,
40 transactions: Vec<Arc<Transaction>>,
41 },
42
43 Redo {
46 old_state: Arc<State>,
47 new_state: Arc<State>,
48 transactions: Vec<Arc<Transaction>>,
49 },
50
51 Jump {
55 old_state: Arc<State>,
56 new_state: Arc<State>,
57 transactions: Vec<Arc<Transaction>>,
58 steps: isize,
59 },
60
61 TrFailed { state: Arc<State>, transaction: Transaction, error: String },
64
65 HistoryCleared,
68
69 Destroy,
71
72 Stop,
74}
75
76impl Event {
77 pub fn name(&self) -> &'static str {
78 match self {
79 Event::Create(_) => "Create",
80 Event::TrApply { .. } => "TrApply",
81 Event::Undo { .. } => "Undo",
82 Event::Redo { .. } => "Redo",
83 Event::Jump { .. } => "Jump",
84 Event::TrFailed { .. } => "TrFailed",
85 Event::HistoryCleared => "HistoryCleared",
86 Event::Destroy => "Destroy",
87 Event::Stop => "Stop",
88 }
89 }
90}
91
/// Identifier handed out when a handler is registered on an [`EventBus`];
/// pass it back to the removal methods to unregister that handler.
pub type HandlerId = u64;
94
95pub struct EventBus<T: Send + Sync + Clone + 'static> {
103 tx: Sender<T>,
104 rt: Receiver<T>,
105 event_handlers: Arc<ArcSwap<Vec<Arc<dyn EventHandler<T> + Send + Sync>>>>,
107 handler_registry:
109 Arc<DashMap<HandlerId, Arc<dyn EventHandler<T> + Send + Sync>>>,
110 next_handler_id: Arc<AtomicU64>,
112 shutdown: (Sender<()>, Receiver<()>),
113 config: EventConfig,
114 stats: EventBusStats,
116}
117
/// Counters describing the activity of an event bus.
///
/// Each counter lives behind its own `Arc`, so cloning the struct yields a
/// second view onto the same counters rather than a frozen copy.
///
/// `Default` is derived: every field becomes a fresh `Arc<AtomicU64>` holding
/// zero, which is exactly what a hand-written implementation would build.
#[derive(Clone, Debug, Default)]
pub struct EventBusStats {
    /// Number of events taken off the queue and handed to the handlers.
    pub events_processed: Arc<AtomicU64>,
    /// Number of handlers currently registered.
    pub active_handlers: Arc<AtomicU64>,
    /// Number of handler invocations that returned an error.
    pub processing_failures: Arc<AtomicU64>,
    /// Number of handler invocations that exceeded the handler timeout.
    pub processing_timeouts: Arc<AtomicU64>,
}
141
142impl<T: Send + Sync + Clone + 'static> Default for EventBus<T> {
143 fn default() -> Self {
144 Self::new()
145 }
146}
147
148impl<T: Send + Sync + Clone + 'static> Clone for EventBus<T> {
149 fn clone(&self) -> Self {
150 Self {
151 tx: self.tx.clone(),
152 rt: self.rt.clone(),
153 event_handlers: self.event_handlers.clone(),
154 handler_registry: self.handler_registry.clone(),
155 next_handler_id: self.next_handler_id.clone(),
156 shutdown: (self.shutdown.0.clone(), self.shutdown.1.clone()),
157 config: self.config.clone(),
158 stats: self.stats.clone(),
159 }
160 }
161}
162
163impl<T: Send + Sync + Clone + 'static> EventBus<T> {
164 pub fn add_event_handler(
166 &self,
167 event_handler: Arc<dyn EventHandler<T> + Send + Sync>,
168 ) -> ForgeResult<HandlerId> {
169 let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
170
171 self.handler_registry.insert(handler_id, event_handler.clone());
173
174 self.update_handler_list();
176
177 self.stats.active_handlers.fetch_add(1, Ordering::Relaxed);
179
180 Ok(handler_id)
181 }
182
183 pub fn add_event_handlers(
185 &self,
186 event_handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>>,
187 ) -> ForgeResult<Vec<HandlerId>> {
188 let mut handler_ids = Vec::with_capacity(event_handlers.len());
189
190 for handler in event_handlers {
191 let handler_id =
192 self.next_handler_id.fetch_add(1, Ordering::Relaxed);
193 self.handler_registry.insert(handler_id, handler);
194 handler_ids.push(handler_id);
195 }
196
197 self.update_handler_list();
199
200 self.stats
202 .active_handlers
203 .fetch_add(handler_ids.len() as u64, Ordering::Relaxed);
204
205 Ok(handler_ids)
206 }
207
208 pub fn remove_event_handler(
210 &self,
211 handler_id: HandlerId,
212 ) -> ForgeResult<bool> {
213 let removed = self.handler_registry.remove(&handler_id).is_some();
214
215 if removed {
216 self.update_handler_list();
217 self.stats.active_handlers.fetch_sub(1, Ordering::Relaxed);
218 }
219
220 Ok(removed)
221 }
222
223 pub fn remove_event_handlers(
225 &self,
226 handler_ids: &[HandlerId],
227 ) -> ForgeResult<usize> {
228 let mut removed_count = 0;
229
230 for &handler_id in handler_ids {
231 if self.handler_registry.remove(&handler_id).is_some() {
232 removed_count += 1;
233 }
234 }
235
236 if removed_count > 0 {
237 self.update_handler_list();
238 self.stats
239 .active_handlers
240 .fetch_sub(removed_count as u64, Ordering::Relaxed);
241 }
242
243 Ok(removed_count)
244 }
245
246 fn update_handler_list(&self) {
248 let handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>> = self
249 .handler_registry
250 .iter()
251 .map(|entry| entry.value().clone())
252 .collect();
253
254 self.event_handlers.store(Arc::new(handlers));
255 }
256
257 pub fn handler_count(&self) -> usize {
259 self.handler_registry.len()
260 }
261
262 pub fn clear_handlers(&self) -> ForgeResult<()> {
264 self.handler_registry.clear();
265 self.event_handlers.store(Arc::new(Vec::new()));
266 self.stats.active_handlers.store(0, Ordering::Relaxed);
267 Ok(())
268 }
269 pub async fn destroy(&self) -> ForgeResult<()> {
271 self.shutdown.0.send(()).await.map_err(|e| {
272 error_utils::event_error(format!("发送关闭信号失败: {e}"))
273 })
274 }
275
276 pub fn destroy_blocking(&self) {
280 let _ = self.shutdown.0.send_blocking(());
281 }
282 pub fn start_event_loop(&self) {
284 let rx: async_channel::Receiver<T> = self.subscribe();
285 let event_handlers = self.event_handlers.clone();
286 let shutdown_rt = self.shutdown.1.clone();
287 let config = self.config.clone();
288 let stats = self.stats.clone();
289 tokio::spawn(async move {
290 let mut join_set = tokio::task::JoinSet::new();
291
292 let cleanup_timeout = config.handler_timeout;
294 async fn cleanup_tasks(
295 join_set: &mut tokio::task::JoinSet<()>,
296 timeout: std::time::Duration,
297 ) {
298 debug!("开始清理事件处理任务...");
299 join_set.shutdown().await;
301 match tokio::time::timeout(timeout, async {
303 while let Some(result) = join_set.join_next().await {
304 if let Err(e) = result {
305 debug!("事件处理任务错误: {}", e);
306 }
307 }
308 })
309 .await
310 {
311 Ok(_) => debug!("所有事件处理任务已正常清理"),
312 Err(_) => debug!("事件处理任务清理超时"),
313 }
314 }
315 loop {
316 tokio::select! {
317 event = rx.recv() => match event {
318 Ok(event) => {
319 if join_set.len() >= config.max_concurrent_handlers {
321 debug!("事件处理任务数量达到上限,等待部分任务完成...");
322 if let Some(Err(e)) = join_set.join_next().await {
324 debug!("事件处理任务错误: {}", e);
325 }
326 }
327
328 let handlers = event_handlers.load();
330 let handler_timeout = config.handler_timeout;
331 let event_stats = stats.clone();
332
333 event_stats.events_processed.fetch_add(1, Ordering::Relaxed);
335
336 join_set.spawn(async move {
337 let mut handler_set = tokio::task::JoinSet::new();
339 #[allow(clippy::unnecessary_to_owned)]
340 for handler in handlers.iter().cloned() {
341 let event_for_task = event.clone();
342 handler_set.spawn(async move {
343 let e = event_for_task;
345 match tokio::time::timeout(handler_timeout, handler.handle(&e)).await {
346 Ok(Ok(_)) => (true, false, false),
347 Ok(Err(e)) => { debug!("事件处理器执行失败: {}", e); (false, true, false) },
348 Err(_) => { debug!("事件处理器执行超时"); (false, false, true) },
349 }
350 });
351 }
352
353 let mut success_count = 0u64;
354 let mut failure_count = 0u64;
355 let mut timeout_count = 0u64;
356 while let Some(res) = handler_set.join_next().await {
357 match res {
358 Ok((ok, fail, timeout)) => {
359 if ok { success_count += 1; }
360 if fail { failure_count += 1; }
361 if timeout { timeout_count += 1; }
362 }
363 Err(e) => debug!("事件处理器任务错误: {}", e),
364 }
365 }
366
367 if failure_count > 0 {
368 event_stats.processing_failures.fetch_add(failure_count, Ordering::Relaxed);
369 }
370 if timeout_count > 0 {
371 event_stats.processing_timeouts.fetch_add(timeout_count, Ordering::Relaxed);
372 }
373
374 debug!("事件处理完成: 成功={}, 失败={}, 超时={}", success_count, failure_count, timeout_count);
375 });
376 },
377 Err(e) => {
378 debug!("事件接收错误: {}", e);
379 cleanup_tasks(&mut join_set, cleanup_timeout).await;
380 break;
381 },
382 },
383 _ = shutdown_rt.recv() => {
384 cleanup_tasks(&mut join_set, cleanup_timeout).await;
386 debug!("事件管理器接收到关闭信号,正在退出...");
387 break;
388 },
389 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
391 while let Some(result) = join_set.try_join_next() {
393 if let Err(e) = result {
394 debug!("事件处理任务错误: {}", e);
395 }
396 }
397 },
398 }
399 }
400 });
401 }
402
403 pub fn new() -> Self {
404 Self::with_config(EventConfig::default())
405 }
406
407 pub fn with_config(config: EventConfig) -> Self {
408 let (tx, rt) = async_channel::bounded(config.max_queue_size);
409 let (shutdown_tx, shutdown_rt) = async_channel::bounded(1);
410 Self {
411 tx,
412 rt,
413 event_handlers: Arc::new(ArcSwap::new(Arc::new(Vec::new()))),
414 handler_registry: Arc::new(DashMap::new()),
415 next_handler_id: Arc::new(AtomicU64::new(1)),
416 shutdown: (shutdown_tx, shutdown_rt),
417 config,
418 stats: EventBusStats::default(),
419 }
420 }
421
422 pub fn subscribe(&self) -> Receiver<T> {
423 self.rt.clone()
424 }
425
426 pub async fn broadcast(
427 &self,
428 event: T,
429 ) -> ForgeResult<()> {
430 self.tx
431 .send(event)
432 .await
433 .map_err(|e| error_utils::event_error(format!("广播事件失败: {e}")))
434 }
435 pub fn broadcast_blocking(
453 &self,
454 event: T,
455 ) -> ForgeResult<()> {
456 self.tx
457 .send_blocking(event)
458 .map_err(|e| error_utils::event_error(format!("广播事件失败: {e}")))
459 }
460
461 pub fn get_config(&self) -> &EventConfig {
463 &self.config
464 }
465
466 pub fn update_config(
468 &mut self,
469 config: EventConfig,
470 ) {
471 self.config = config;
472 }
473
474 pub fn get_stats(&self) -> EventBusStats {
476 self.stats.clone()
477 }
478
479 pub fn reset_stats(&self) {
481 self.stats.events_processed.store(0, Ordering::Relaxed);
482 self.stats.processing_failures.store(0, Ordering::Relaxed);
483 self.stats.processing_timeouts.store(0, Ordering::Relaxed);
484 }
486
487 pub fn get_performance_report(&self) -> EventBusPerformanceReport {
489 let stats = &self.stats;
490 EventBusPerformanceReport {
491 total_events_processed: stats
492 .events_processed
493 .load(Ordering::Relaxed),
494 active_handlers_count: stats
495 .active_handlers
496 .load(Ordering::Relaxed),
497 total_processing_failures: stats
498 .processing_failures
499 .load(Ordering::Relaxed),
500 total_processing_timeouts: stats
501 .processing_timeouts
502 .load(Ordering::Relaxed),
503 handler_registry_size: self.handler_registry.len(),
504 success_rate: {
505 let total = stats.events_processed.load(Ordering::Relaxed);
506 let failures =
507 stats.processing_failures.load(Ordering::Relaxed);
508 if total > 0 {
509 ((total - failures) as f64 / total as f64) * 100.0
510 } else {
511 100.0
512 }
513 },
514 }
515 }
516}
517
/// Plain-value snapshot of event bus statistics.
#[derive(Debug, Clone)]
pub struct EventBusPerformanceReport {
    /// Value of the `events_processed` counter.
    pub total_events_processed: u64,
    /// Value of the `active_handlers` counter.
    pub active_handlers_count: u64,
    /// Value of the `processing_failures` counter.
    pub total_processing_failures: u64,
    /// Value of the `processing_timeouts` counter.
    pub total_processing_timeouts: u64,
    /// Number of entries in the handler registry.
    pub handler_registry_size: usize,
    /// Success rate as a percentage.
    pub success_rate: f64,
}
534
535#[async_trait::async_trait]
537pub trait EventHandler<T>: Send + Sync + Debug {
538 async fn handle(
539 &self,
540 event: &T,
541 ) -> ForgeResult<()>;
542}