Skip to main content

mf_core/
event.rs

1use std::{
2    fmt::Debug,
3    sync::{
4        Arc,
5        atomic::{AtomicU64, Ordering},
6    },
7};
8
9use async_channel::{Receiver, Sender};
10use mf_state::{state::State, Transaction};
11// 进程信号处理应由应用层负责,不在库层拦截
12use arc_swap::ArcSwap;
13use dashmap::DashMap;
14
15use crate::{
16    config::EventConfig,
17    debug::debug,
18    error::{ForgeResult, error_utils},
19};
20
21// 事件类型定义
22#[derive(Debug, Clone)]
23pub enum Event {
24    /// 状态创建事件
25    Create(Arc<State>),
26
27    /// 事务应用事件 (old_state, new_state, transactions)
28    /// 统一使用新旧状态模式,与 Undo/Redo 保持一致
29    TrApply {
30        old_state: Arc<State>,
31        new_state: Arc<State>,
32        transactions: Vec<Arc<Transaction>>,
33    },
34
35    /// 撤销事件 (old_state, new_state, undone_transactions)
36    /// 包含被撤销的事务列表,供其他组件(如搜索索引)使用
37    Undo {
38        old_state: Arc<State>,
39        new_state: Arc<State>,
40        transactions: Vec<Arc<Transaction>>,
41    },
42
43    /// 重做事件 (old_state, new_state, redone_transactions)
44    /// 包含重做的事务列表,供其他组件(如搜索索引)使用
45    Redo {
46        old_state: Arc<State>,
47        new_state: Arc<State>,
48        transactions: Vec<Arc<Transaction>>,
49    },
50
51    /// 历史跳转事件 (old_state, new_state, transactions, steps)
52    /// 当用户跳转到历史中的特定位置时触发
53    /// transactions 包含跳转过程中所有被影响的事务
54    Jump {
55        old_state: Arc<State>,
56        new_state: Arc<State>,
57        transactions: Vec<Arc<Transaction>>,
58        steps: isize,
59    },
60
61    /// 事务失败事件
62    /// 当事务应用失败时触发,供错误处理和日志记录使用
63    TrFailed { state: Arc<State>, transaction: Transaction, error: String },
64
65    /// 历史清空事件
66    /// 当历史记录被清空时触发
67    HistoryCleared,
68
69    /// 销毁事件
70    Destroy,
71
72    /// 停止事件(需要重启)
73    Stop,
74}
75
76impl Event {
77    pub fn name(&self) -> &'static str {
78        match self {
79            Event::Create(_) => "Create",
80            Event::TrApply { .. } => "TrApply",
81            Event::Undo { .. } => "Undo",
82            Event::Redo { .. } => "Redo",
83            Event::Jump { .. } => "Jump",
84            Event::TrFailed { .. } => "TrFailed",
85            Event::HistoryCleared => "HistoryCleared",
86            Event::Destroy => "Destroy",
87            Event::Stop => "Stop",
88        }
89    }
90}
91
92/// 事件处理器 ID 类型
93pub type HandlerId = u64;
94
95/// 高性能事件总线
96///
97/// 使用以下优化策略:
98/// - ArcSwap 实现无锁读取事件处理器列表
99/// - DashMap 用于快速查找和管理事件处理器
100/// - 原子计数器生成唯一 ID
101/// - 批量事件处理优化
102pub struct EventBus<T: Send + Sync + Clone + 'static> {
103    tx: Sender<T>,
104    rt: Receiver<T>,
105    /// 使用 ArcSwap 实现无锁读取的事件处理器列表
106    event_handlers: Arc<ArcSwap<Vec<Arc<dyn EventHandler<T> + Send + Sync>>>>,
107    /// 使用 DashMap 快速查找事件处理器
108    handler_registry:
109        Arc<DashMap<HandlerId, Arc<dyn EventHandler<T> + Send + Sync>>>,
110    /// 原子计数器生成唯一 ID
111    next_handler_id: Arc<AtomicU64>,
112    shutdown: (Sender<()>, Receiver<()>),
113    config: EventConfig,
114    /// 事件统计
115    stats: EventBusStats,
116}
117
118/// 事件总线统计信息
119#[derive(Clone, Debug)]
120pub struct EventBusStats {
121    /// 已处理事件总数
122    pub events_processed: Arc<AtomicU64>,
123    /// 当前活跃处理器数量
124    pub active_handlers: Arc<AtomicU64>,
125    /// 事件处理失败次数
126    pub processing_failures: Arc<AtomicU64>,
127    /// 事件处理超时次数
128    pub processing_timeouts: Arc<AtomicU64>,
129}
130
131impl Default for EventBusStats {
132    fn default() -> Self {
133        Self {
134            events_processed: Arc::new(AtomicU64::new(0)),
135            active_handlers: Arc::new(AtomicU64::new(0)),
136            processing_failures: Arc::new(AtomicU64::new(0)),
137            processing_timeouts: Arc::new(AtomicU64::new(0)),
138        }
139    }
140}
141
142impl<T: Send + Sync + Clone + 'static> Default for EventBus<T> {
143    fn default() -> Self {
144        Self::new()
145    }
146}
147
148impl<T: Send + Sync + Clone + 'static> Clone for EventBus<T> {
149    fn clone(&self) -> Self {
150        Self {
151            tx: self.tx.clone(),
152            rt: self.rt.clone(),
153            event_handlers: self.event_handlers.clone(),
154            handler_registry: self.handler_registry.clone(),
155            next_handler_id: self.next_handler_id.clone(),
156            shutdown: (self.shutdown.0.clone(), self.shutdown.1.clone()),
157            config: self.config.clone(),
158            stats: self.stats.clone(),
159        }
160    }
161}
162
163impl<T: Send + Sync + Clone + 'static> EventBus<T> {
164    /// 添加事件处理器,返回处理器 ID
165    pub fn add_event_handler(
166        &self,
167        event_handler: Arc<dyn EventHandler<T> + Send + Sync>,
168    ) -> ForgeResult<HandlerId> {
169        let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
170
171        // 添加到注册表
172        self.handler_registry.insert(handler_id, event_handler.clone());
173
174        // 更新处理器列表(无锁操作)
175        self.update_handler_list();
176
177        // 更新统计
178        self.stats.active_handlers.fetch_add(1, Ordering::Relaxed);
179
180        Ok(handler_id)
181    }
182
183    /// 批量添加事件处理器
184    pub fn add_event_handlers(
185        &self,
186        event_handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>>,
187    ) -> ForgeResult<Vec<HandlerId>> {
188        let mut handler_ids = Vec::with_capacity(event_handlers.len());
189
190        for handler in event_handlers {
191            let handler_id =
192                self.next_handler_id.fetch_add(1, Ordering::Relaxed);
193            self.handler_registry.insert(handler_id, handler);
194            handler_ids.push(handler_id);
195        }
196
197        // 批量更新处理器列表
198        self.update_handler_list();
199
200        // 更新统计
201        self.stats
202            .active_handlers
203            .fetch_add(handler_ids.len() as u64, Ordering::Relaxed);
204
205        Ok(handler_ids)
206    }
207
208    /// 移除事件处理器
209    pub fn remove_event_handler(
210        &self,
211        handler_id: HandlerId,
212    ) -> ForgeResult<bool> {
213        let removed = self.handler_registry.remove(&handler_id).is_some();
214
215        if removed {
216            self.update_handler_list();
217            self.stats.active_handlers.fetch_sub(1, Ordering::Relaxed);
218        }
219
220        Ok(removed)
221    }
222
223    /// 批量移除事件处理器
224    pub fn remove_event_handlers(
225        &self,
226        handler_ids: &[HandlerId],
227    ) -> ForgeResult<usize> {
228        let mut removed_count = 0;
229
230        for &handler_id in handler_ids {
231            if self.handler_registry.remove(&handler_id).is_some() {
232                removed_count += 1;
233            }
234        }
235
236        if removed_count > 0 {
237            self.update_handler_list();
238            self.stats
239                .active_handlers
240                .fetch_sub(removed_count as u64, Ordering::Relaxed);
241        }
242
243        Ok(removed_count)
244    }
245
246    /// 更新处理器列表(内部方法)
247    fn update_handler_list(&self) {
248        let handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>> = self
249            .handler_registry
250            .iter()
251            .map(|entry| entry.value().clone())
252            .collect();
253
254        self.event_handlers.store(Arc::new(handlers));
255    }
256
257    /// 获取当前活跃的处理器数量
258    pub fn handler_count(&self) -> usize {
259        self.handler_registry.len()
260    }
261
262    /// 清空所有事件处理器
263    pub fn clear_handlers(&self) -> ForgeResult<()> {
264        self.handler_registry.clear();
265        self.event_handlers.store(Arc::new(Vec::new()));
266        self.stats.active_handlers.store(0, Ordering::Relaxed);
267        Ok(())
268    }
269    /// 异步销毁事件总线
270    pub async fn destroy(&self) -> ForgeResult<()> {
271        self.shutdown.0.send(()).await.map_err(|e| {
272            error_utils::event_error(format!("发送关闭信号失败: {e}"))
273        })
274    }
275
276    /// 同步销毁事件总线(仅在非异步上下文中使用)
277    ///
278    /// ⚠️ 警告:此方法可能阻塞,应优先使用 `destroy()` 异步版本
279    pub fn destroy_blocking(&self) {
280        let _ = self.shutdown.0.send_blocking(());
281    }
282    /// 启动事件循环
283    pub fn start_event_loop(&self) {
284        let rx: async_channel::Receiver<T> = self.subscribe();
285        let event_handlers = self.event_handlers.clone();
286        let shutdown_rt = self.shutdown.1.clone();
287        let config = self.config.clone();
288        let stats = self.stats.clone();
289        tokio::spawn(async move {
290            let mut join_set = tokio::task::JoinSet::new();
291
292            // 定义清理函数,确保所有任务都被正确清理
293            let cleanup_timeout = config.handler_timeout;
294            async fn cleanup_tasks(
295                join_set: &mut tokio::task::JoinSet<()>,
296                timeout: std::time::Duration,
297            ) {
298                debug!("开始清理事件处理任务...");
299                // 首先停止接受新任务
300                join_set.shutdown().await;
301                // 然后等待所有现有任务完成,设置超时防止无限等待
302                match tokio::time::timeout(timeout, async {
303                    while let Some(result) = join_set.join_next().await {
304                        if let Err(e) = result {
305                            debug!("事件处理任务错误: {}", e);
306                        }
307                    }
308                })
309                .await
310                {
311                    Ok(_) => debug!("所有事件处理任务已正常清理"),
312                    Err(_) => debug!("事件处理任务清理超时"),
313                }
314            }
315            loop {
316                tokio::select! {
317                    event = rx.recv() => match event {
318                        Ok(event) => {
319                            // 限制并发任务数量,防止无限制spawning
320                            if join_set.len() >= config.max_concurrent_handlers {
321                                debug!("事件处理任务数量达到上限,等待部分任务完成...");
322                                // 等待至少一个任务完成
323                                if let Some(Err(e)) = join_set.join_next().await {
324                                    debug!("事件处理任务错误: {}", e);
325                                }
326                            }
327
328                            // 无锁读取事件处理器列表
329                            let handlers = event_handlers.load();
330                            let handler_timeout = config.handler_timeout;
331                            let event_stats = stats.clone();
332
333                            // 更新事件处理统计
334                            event_stats.events_processed.fetch_add(1, Ordering::Relaxed);
335
336                            join_set.spawn(async move {
337                                // 为该事件并发执行所有 handler
338                                let mut handler_set = tokio::task::JoinSet::new();
339                                #[allow(clippy::unnecessary_to_owned)]
340                                for handler in handlers.iter().cloned() {
341                                    let event_for_task = event.clone();
342                                    handler_set.spawn(async move {
343                                        // 每个任务持有自己的事件克隆,避免跨任务借用问题
344                                        let e = event_for_task;
345                                        match tokio::time::timeout(handler_timeout, handler.handle(&e)).await {
346                                            Ok(Ok(_)) => (true, false, false),
347                                            Ok(Err(e)) => { debug!("事件处理器执行失败: {}", e); (false, true, false) },
348                                            Err(_) => { debug!("事件处理器执行超时"); (false, false, true) },
349                                        }
350                                    });
351                                }
352
353                                let mut success_count = 0u64;
354                                let mut failure_count = 0u64;
355                                let mut timeout_count = 0u64;
356                                while let Some(res) = handler_set.join_next().await {
357                                    match res {
358                                        Ok((ok, fail, timeout)) => {
359                                            if ok { success_count += 1; }
360                                            if fail { failure_count += 1; }
361                                            if timeout { timeout_count += 1; }
362                                        }
363                                        Err(e) => debug!("事件处理器任务错误: {}", e),
364                                    }
365                                }
366
367                                if failure_count > 0 {
368                                    event_stats.processing_failures.fetch_add(failure_count, Ordering::Relaxed);
369                                }
370                                if timeout_count > 0 {
371                                    event_stats.processing_timeouts.fetch_add(timeout_count, Ordering::Relaxed);
372                                }
373
374                                debug!("事件处理完成: 成功={}, 失败={}, 超时={}", success_count, failure_count, timeout_count);
375                            });
376                        },
377                        Err(e) => {
378                            debug!("事件接收错误: {}", e);
379                            cleanup_tasks(&mut join_set, cleanup_timeout).await;
380                            break;
381                        },
382                    },
383                    _ = shutdown_rt.recv() => {
384                        // 使用统一清理流程,带超时
385                        cleanup_tasks(&mut join_set, cleanup_timeout).await;
386                        debug!("事件管理器接收到关闭信号,正在退出...");
387                        break;
388                    },
389                    // 定期清理已完成的任务,防止JoinSet无限增长
390                    _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
391                        // 非阻塞地清理已完成的任务
392                        while let Some(result) = join_set.try_join_next() {
393                            if let Err(e) = result {
394                                debug!("事件处理任务错误: {}", e);
395                            }
396                        }
397                    },
398                }
399            }
400        });
401    }
402
403    pub fn new() -> Self {
404        Self::with_config(EventConfig::default())
405    }
406
407    pub fn with_config(config: EventConfig) -> Self {
408        let (tx, rt) = async_channel::bounded(config.max_queue_size);
409        let (shutdown_tx, shutdown_rt) = async_channel::bounded(1);
410        Self {
411            tx,
412            rt,
413            event_handlers: Arc::new(ArcSwap::new(Arc::new(Vec::new()))),
414            handler_registry: Arc::new(DashMap::new()),
415            next_handler_id: Arc::new(AtomicU64::new(1)),
416            shutdown: (shutdown_tx, shutdown_rt),
417            config,
418            stats: EventBusStats::default(),
419        }
420    }
421
422    pub fn subscribe(&self) -> Receiver<T> {
423        self.rt.clone()
424    }
425
426    pub async fn broadcast(
427        &self,
428        event: T,
429    ) -> ForgeResult<()> {
430        self.tx
431            .send(event)
432            .await
433            .map_err(|e| error_utils::event_error(format!("广播事件失败: {e}")))
434    }
435    /// 同步广播事件(仅在非异步上下文中使用)
436    ///
437    /// ⚠️ 警告:此方法可能阻塞当前线程,应优先使用 `broadcast()` 异步版本
438    ///
439    /// # 使用场景
440    /// - 在 Drop 实现中
441    /// - 在同步的测试代码中
442    /// - 在非异步的回调函数中
443    ///
444    /// # 示例
445    /// ```rust,no_run
446    /// // 在异步上下文中,优先使用:
447    /// // event_bus.broadcast(event).await?;
448    ///
449    /// // 仅在必要时使用阻塞版本:
450    /// event_bus.broadcast_blocking(event)?;
451    /// ```
452    pub fn broadcast_blocking(
453        &self,
454        event: T,
455    ) -> ForgeResult<()> {
456        self.tx
457            .send_blocking(event)
458            .map_err(|e| error_utils::event_error(format!("广播事件失败: {e}")))
459    }
460
461    /// 获取事件配置
462    pub fn get_config(&self) -> &EventConfig {
463        &self.config
464    }
465
466    /// 更新事件配置(注意:某些配置更改需要重启事件循环才能生效)
467    pub fn update_config(
468        &mut self,
469        config: EventConfig,
470    ) {
471        self.config = config;
472    }
473
474    /// 获取事件总线统计信息
475    pub fn get_stats(&self) -> EventBusStats {
476        self.stats.clone()
477    }
478
479    /// 重置统计信息
480    pub fn reset_stats(&self) {
481        self.stats.events_processed.store(0, Ordering::Relaxed);
482        self.stats.processing_failures.store(0, Ordering::Relaxed);
483        self.stats.processing_timeouts.store(0, Ordering::Relaxed);
484        // 注意:active_handlers 不重置,因为它反映当前状态
485    }
486
487    /// 获取详细的性能报告
488    pub fn get_performance_report(&self) -> EventBusPerformanceReport {
489        let stats = &self.stats;
490        EventBusPerformanceReport {
491            total_events_processed: stats
492                .events_processed
493                .load(Ordering::Relaxed),
494            active_handlers_count: stats
495                .active_handlers
496                .load(Ordering::Relaxed),
497            total_processing_failures: stats
498                .processing_failures
499                .load(Ordering::Relaxed),
500            total_processing_timeouts: stats
501                .processing_timeouts
502                .load(Ordering::Relaxed),
503            handler_registry_size: self.handler_registry.len(),
504            success_rate: {
505                let total = stats.events_processed.load(Ordering::Relaxed);
506                let failures =
507                    stats.processing_failures.load(Ordering::Relaxed);
508                if total > 0 {
509                    ((total - failures) as f64 / total as f64) * 100.0
510                } else {
511                    100.0
512                }
513            },
514        }
515    }
516}
517
518/// 事件总线性能报告
519#[derive(Debug, Clone)]
520pub struct EventBusPerformanceReport {
521    /// 已处理事件总数
522    pub total_events_processed: u64,
523    /// 当前活跃处理器数量
524    pub active_handlers_count: u64,
525    /// 处理失败总数
526    pub total_processing_failures: u64,
527    /// 处理超时总数
528    pub total_processing_timeouts: u64,
529    /// 处理器注册表大小
530    pub handler_registry_size: usize,
531    /// 成功率(百分比)
532    pub success_rate: f64,
533}
534
535// 事件处理器特征
536#[async_trait::async_trait]
537pub trait EventHandler<T>: Send + Sync + Debug {
538    async fn handle(
539        &self,
540        event: &T,
541    ) -> ForgeResult<()>;
542}