mf_core/
event.rs

1use std::{
2    fmt::Debug,
3    sync::{
4        Arc,
5        atomic::{AtomicU64, Ordering},
6    },
7};
8
9use async_channel::{Receiver, Sender};
10use mf_state::{state::State, Transaction};
11// 进程信号处理应由应用层负责,不在库层拦截
12use arc_swap::ArcSwap;
13use dashmap::DashMap;
14
15use crate::{
16    config::EventConfig,
17    debug::debug,
18    error::{ForgeResult, error_utils},
19};
20
21// 事件类型定义
22#[derive(Debug, Clone)]
23pub enum Event {
24    Create(Arc<State>),
25    TrApply(u64, Vec<Arc<Transaction>>, Arc<State>), // 事务应用后 + 是否成功
26    Destroy,                                         // 销毁事件
27    Stop,                                            // 停止后需要重启
28}
29
30impl Event {
31    pub fn name(&self) -> &'static str {
32        match self {
33            Event::Create(_) => "Create",
34            Event::TrApply(_, _, _) => "TrApply",
35            Event::Destroy => "Destroy",
36            Event::Stop => "Stop",
37        }
38    }
39}
40
41/// 事件处理器 ID 类型
42pub type HandlerId = u64;
43
44/// 高性能事件总线
45///
46/// 使用以下优化策略:
47/// - ArcSwap 实现无锁读取事件处理器列表
48/// - DashMap 用于快速查找和管理事件处理器
49/// - 原子计数器生成唯一 ID
50/// - 批量事件处理优化
51pub struct EventBus<T: Send + Sync + Clone + 'static> {
52    tx: Sender<T>,
53    rt: Receiver<T>,
54    /// 使用 ArcSwap 实现无锁读取的事件处理器列表
55    event_handlers: Arc<ArcSwap<Vec<Arc<dyn EventHandler<T> + Send + Sync>>>>,
56    /// 使用 DashMap 快速查找事件处理器
57    handler_registry:
58        Arc<DashMap<HandlerId, Arc<dyn EventHandler<T> + Send + Sync>>>,
59    /// 原子计数器生成唯一 ID
60    next_handler_id: Arc<AtomicU64>,
61    shutdown: (Sender<()>, Receiver<()>),
62    config: EventConfig,
63    /// 事件统计
64    stats: EventBusStats,
65}
66
67/// 事件总线统计信息
68#[derive(Clone, Debug)]
69pub struct EventBusStats {
70    /// 已处理事件总数
71    pub events_processed: Arc<AtomicU64>,
72    /// 当前活跃处理器数量
73    pub active_handlers: Arc<AtomicU64>,
74    /// 事件处理失败次数
75    pub processing_failures: Arc<AtomicU64>,
76    /// 事件处理超时次数
77    pub processing_timeouts: Arc<AtomicU64>,
78}
79
80impl Default for EventBusStats {
81    fn default() -> Self {
82        Self {
83            events_processed: Arc::new(AtomicU64::new(0)),
84            active_handlers: Arc::new(AtomicU64::new(0)),
85            processing_failures: Arc::new(AtomicU64::new(0)),
86            processing_timeouts: Arc::new(AtomicU64::new(0)),
87        }
88    }
89}
90
91impl<T: Send + Sync + Clone + 'static> Default for EventBus<T> {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl<T: Send + Sync + Clone + 'static> Clone for EventBus<T> {
98    fn clone(&self) -> Self {
99        Self {
100            tx: self.tx.clone(),
101            rt: self.rt.clone(),
102            event_handlers: self.event_handlers.clone(),
103            handler_registry: self.handler_registry.clone(),
104            next_handler_id: self.next_handler_id.clone(),
105            shutdown: (self.shutdown.0.clone(), self.shutdown.1.clone()),
106            config: self.config.clone(),
107            stats: self.stats.clone(),
108        }
109    }
110}
111
112impl<T: Send + Sync + Clone + 'static> EventBus<T> {
113    /// 添加事件处理器,返回处理器 ID
114    pub fn add_event_handler(
115        &self,
116        event_handler: Arc<dyn EventHandler<T> + Send + Sync>,
117    ) -> ForgeResult<HandlerId> {
118        let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
119
120        // 添加到注册表
121        self.handler_registry.insert(handler_id, event_handler.clone());
122
123        // 更新处理器列表(无锁操作)
124        self.update_handler_list();
125
126        // 更新统计
127        self.stats.active_handlers.fetch_add(1, Ordering::Relaxed);
128
129        Ok(handler_id)
130    }
131
132    /// 批量添加事件处理器
133    pub fn add_event_handlers(
134        &self,
135        event_handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>>,
136    ) -> ForgeResult<Vec<HandlerId>> {
137        let mut handler_ids = Vec::with_capacity(event_handlers.len());
138
139        for handler in event_handlers {
140            let handler_id =
141                self.next_handler_id.fetch_add(1, Ordering::Relaxed);
142            self.handler_registry.insert(handler_id, handler);
143            handler_ids.push(handler_id);
144        }
145
146        // 批量更新处理器列表
147        self.update_handler_list();
148
149        // 更新统计
150        self.stats
151            .active_handlers
152            .fetch_add(handler_ids.len() as u64, Ordering::Relaxed);
153
154        Ok(handler_ids)
155    }
156
157    /// 移除事件处理器
158    pub fn remove_event_handler(
159        &self,
160        handler_id: HandlerId,
161    ) -> ForgeResult<bool> {
162        let removed = self.handler_registry.remove(&handler_id).is_some();
163
164        if removed {
165            self.update_handler_list();
166            self.stats.active_handlers.fetch_sub(1, Ordering::Relaxed);
167        }
168
169        Ok(removed)
170    }
171
172    /// 批量移除事件处理器
173    pub fn remove_event_handlers(
174        &self,
175        handler_ids: &[HandlerId],
176    ) -> ForgeResult<usize> {
177        let mut removed_count = 0;
178
179        for &handler_id in handler_ids {
180            if self.handler_registry.remove(&handler_id).is_some() {
181                removed_count += 1;
182            }
183        }
184
185        if removed_count > 0 {
186            self.update_handler_list();
187            self.stats
188                .active_handlers
189                .fetch_sub(removed_count as u64, Ordering::Relaxed);
190        }
191
192        Ok(removed_count)
193    }
194
195    /// 更新处理器列表(内部方法)
196    fn update_handler_list(&self) {
197        let handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>> = self
198            .handler_registry
199            .iter()
200            .map(|entry| entry.value().clone())
201            .collect();
202
203        self.event_handlers.store(Arc::new(handlers));
204    }
205
206    /// 获取当前活跃的处理器数量
207    pub fn handler_count(&self) -> usize {
208        self.handler_registry.len()
209    }
210
211    /// 清空所有事件处理器
212    pub fn clear_handlers(&self) -> ForgeResult<()> {
213        self.handler_registry.clear();
214        self.event_handlers.store(Arc::new(Vec::new()));
215        self.stats.active_handlers.store(0, Ordering::Relaxed);
216        Ok(())
217    }
218    /// 异步销毁事件总线
219    pub async fn destroy(&self) -> ForgeResult<()> {
220        self.shutdown.0.send(()).await.map_err(|e| {
221            error_utils::event_error(format!("发送关闭信号失败: {e}"))
222        })
223    }
224
225    /// 同步销毁事件总线(仅在非异步上下文中使用)
226    ///
227    /// ⚠️ 警告:此方法可能阻塞,应优先使用 `destroy()` 异步版本
228    pub fn destroy_blocking(&self) {
229        let _ = self.shutdown.0.send_blocking(());
230    }
231    /// 启动事件循环
232    pub fn start_event_loop(&self) {
233        let rx: async_channel::Receiver<T> = self.subscribe();
234        let event_handlers = self.event_handlers.clone();
235        let shutdown_rt = self.shutdown.1.clone();
236        let config = self.config.clone();
237        let stats = self.stats.clone();
238        tokio::spawn(async move {
239            let mut join_set = tokio::task::JoinSet::new();
240
241            // 定义清理函数,确保所有任务都被正确清理
242            let cleanup_timeout = config.handler_timeout;
243            async fn cleanup_tasks(
244                join_set: &mut tokio::task::JoinSet<()>,
245                timeout: std::time::Duration,
246            ) {
247                debug!("开始清理事件处理任务...");
248                // 首先停止接受新任务
249                join_set.shutdown().await;
250                // 然后等待所有现有任务完成,设置超时防止无限等待
251                match tokio::time::timeout(timeout, async {
252                    while let Some(result) = join_set.join_next().await {
253                        if let Err(e) = result {
254                            debug!("事件处理任务错误: {}", e);
255                        }
256                    }
257                })
258                .await
259                {
260                    Ok(_) => debug!("所有事件处理任务已正常清理"),
261                    Err(_) => debug!("事件处理任务清理超时"),
262                }
263            }
264            loop {
265                tokio::select! {
266                    event = rx.recv() => match event {
267                        Ok(event) => {
268                            // 限制并发任务数量,防止无限制spawning
269                            if join_set.len() >= config.max_concurrent_handlers {
270                                debug!("事件处理任务数量达到上限,等待部分任务完成...");
271                                // 等待至少一个任务完成
272                                if let Some(Err(e)) = join_set.join_next().await {
273                                    debug!("事件处理任务错误: {}", e);
274                                }
275                            }
276
277                            // 无锁读取事件处理器列表
278                            let handlers = event_handlers.load();
279                            let handler_timeout = config.handler_timeout;
280                            let event_stats = stats.clone();
281
282                            // 更新事件处理统计
283                            event_stats.events_processed.fetch_add(1, Ordering::Relaxed);
284
285                            join_set.spawn(async move {
286                                // 为该事件并发执行所有 handler
287                                let mut handler_set = tokio::task::JoinSet::new();
288                                #[allow(clippy::unnecessary_to_owned)]
289                                for handler in handlers.iter().cloned() {
290                                    let event_for_task = event.clone();
291                                    handler_set.spawn(async move {
292                                        // 每个任务持有自己的事件克隆,避免跨任务借用问题
293                                        let e = event_for_task;
294                                        match tokio::time::timeout(handler_timeout, handler.handle(&e)).await {
295                                            Ok(Ok(_)) => (true, false, false),
296                                            Ok(Err(e)) => { debug!("事件处理器执行失败: {}", e); (false, true, false) },
297                                            Err(_) => { debug!("事件处理器执行超时"); (false, false, true) },
298                                        }
299                                    });
300                                }
301
302                                let mut success_count = 0u64;
303                                let mut failure_count = 0u64;
304                                let mut timeout_count = 0u64;
305                                while let Some(res) = handler_set.join_next().await {
306                                    match res {
307                                        Ok((ok, fail, timeout)) => {
308                                            if ok { success_count += 1; }
309                                            if fail { failure_count += 1; }
310                                            if timeout { timeout_count += 1; }
311                                        }
312                                        Err(e) => debug!("事件处理器任务错误: {}", e),
313                                    }
314                                }
315
316                                if failure_count > 0 {
317                                    event_stats.processing_failures.fetch_add(failure_count, Ordering::Relaxed);
318                                }
319                                if timeout_count > 0 {
320                                    event_stats.processing_timeouts.fetch_add(timeout_count, Ordering::Relaxed);
321                                }
322
323                                debug!("事件处理完成: 成功={}, 失败={}, 超时={}", success_count, failure_count, timeout_count);
324                            });
325                        },
326                        Err(e) => {
327                            debug!("事件接收错误: {}", e);
328                            cleanup_tasks(&mut join_set, cleanup_timeout).await;
329                            break;
330                        },
331                    },
332                    _ = shutdown_rt.recv() => {
333                        // 使用统一清理流程,带超时
334                        cleanup_tasks(&mut join_set, cleanup_timeout).await;
335                        debug!("事件管理器接收到关闭信号,正在退出...");
336                        break;
337                    },
338                    // 定期清理已完成的任务,防止JoinSet无限增长
339                    _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
340                        // 非阻塞地清理已完成的任务
341                        while let Some(result) = join_set.try_join_next() {
342                            if let Err(e) = result {
343                                debug!("事件处理任务错误: {}", e);
344                            }
345                        }
346                    },
347                }
348            }
349        });
350    }
351
352    pub fn new() -> Self {
353        Self::with_config(EventConfig::default())
354    }
355
356    pub fn with_config(config: EventConfig) -> Self {
357        let (tx, rt) = async_channel::bounded(config.max_queue_size);
358        let (shutdown_tx, shutdown_rt) = async_channel::bounded(1);
359        Self {
360            tx,
361            rt,
362            event_handlers: Arc::new(ArcSwap::new(Arc::new(Vec::new()))),
363            handler_registry: Arc::new(DashMap::new()),
364            next_handler_id: Arc::new(AtomicU64::new(1)),
365            shutdown: (shutdown_tx, shutdown_rt),
366            config,
367            stats: EventBusStats::default(),
368        }
369    }
370
371    pub fn subscribe(&self) -> Receiver<T> {
372        self.rt.clone()
373    }
374
375    pub async fn broadcast(
376        &self,
377        event: T,
378    ) -> ForgeResult<()> {
379        self.tx
380            .send(event)
381            .await
382            .map_err(|e| error_utils::event_error(format!("广播事件失败: {e}")))
383    }
384    /// 同步广播事件(仅在非异步上下文中使用)
385    ///
386    /// ⚠️ 警告:此方法可能阻塞当前线程,应优先使用 `broadcast()` 异步版本
387    ///
388    /// # 使用场景
389    /// - 在 Drop 实现中
390    /// - 在同步的测试代码中
391    /// - 在非异步的回调函数中
392    ///
393    /// # 示例
394    /// ```rust,no_run
395    /// // 在异步上下文中,优先使用:
396    /// // event_bus.broadcast(event).await?;
397    ///
398    /// // 仅在必要时使用阻塞版本:
399    /// event_bus.broadcast_blocking(event)?;
400    /// ```
401    pub fn broadcast_blocking(
402        &self,
403        event: T,
404    ) -> ForgeResult<()> {
405        self.tx
406            .send_blocking(event)
407            .map_err(|e| error_utils::event_error(format!("广播事件失败: {e}")))
408    }
409
410    /// 获取事件配置
411    pub fn get_config(&self) -> &EventConfig {
412        &self.config
413    }
414
415    /// 更新事件配置(注意:某些配置更改需要重启事件循环才能生效)
416    pub fn update_config(
417        &mut self,
418        config: EventConfig,
419    ) {
420        self.config = config;
421    }
422
423    /// 获取事件总线统计信息
424    pub fn get_stats(&self) -> EventBusStats {
425        self.stats.clone()
426    }
427
428    /// 重置统计信息
429    pub fn reset_stats(&self) {
430        self.stats.events_processed.store(0, Ordering::Relaxed);
431        self.stats.processing_failures.store(0, Ordering::Relaxed);
432        self.stats.processing_timeouts.store(0, Ordering::Relaxed);
433        // 注意:active_handlers 不重置,因为它反映当前状态
434    }
435
436    /// 获取详细的性能报告
437    pub fn get_performance_report(&self) -> EventBusPerformanceReport {
438        let stats = &self.stats;
439        EventBusPerformanceReport {
440            total_events_processed: stats
441                .events_processed
442                .load(Ordering::Relaxed),
443            active_handlers_count: stats
444                .active_handlers
445                .load(Ordering::Relaxed),
446            total_processing_failures: stats
447                .processing_failures
448                .load(Ordering::Relaxed),
449            total_processing_timeouts: stats
450                .processing_timeouts
451                .load(Ordering::Relaxed),
452            handler_registry_size: self.handler_registry.len(),
453            success_rate: {
454                let total = stats.events_processed.load(Ordering::Relaxed);
455                let failures =
456                    stats.processing_failures.load(Ordering::Relaxed);
457                if total > 0 {
458                    ((total - failures) as f64 / total as f64) * 100.0
459                } else {
460                    100.0
461                }
462            },
463        }
464    }
465}
466
467/// 事件总线性能报告
468#[derive(Debug, Clone)]
469pub struct EventBusPerformanceReport {
470    /// 已处理事件总数
471    pub total_events_processed: u64,
472    /// 当前活跃处理器数量
473    pub active_handlers_count: u64,
474    /// 处理失败总数
475    pub total_processing_failures: u64,
476    /// 处理超时总数
477    pub total_processing_timeouts: u64,
478    /// 处理器注册表大小
479    pub handler_registry_size: usize,
480    /// 成功率(百分比)
481    pub success_rate: f64,
482}
483
484// 事件处理器特征
485#[async_trait::async_trait]
486pub trait EventHandler<T>: Send + Sync + Debug {
487    async fn handle(
488        &self,
489        event: &T,
490    ) -> ForgeResult<()>;
491}