mf_core/
event.rs

1use std::{
2    fmt::Debug,
3    sync::{
4        Arc,
5        atomic::{AtomicU64, Ordering},
6    },
7};
8
9use async_channel::{Receiver, Sender};
10use mf_state::{debug, state::State, Transaction};
11// 进程信号处理应由应用层负责,不在库层拦截
12use arc_swap::ArcSwap;
13use dashmap::DashMap;
14
15use crate::{
16    config::EventConfig,
17    error::{ForgeResult, error_utils},
18};
19
20// 事件类型定义
21#[derive(Clone)]
22pub enum Event {
23    Create(Arc<State>),
24    TrApply(u64, Arc<Vec<Transaction>>, Arc<State>), // 事务应用后 + 是否成功
25    Destroy,                                         // 销毁事件
26    Stop,                                            // 停止后需要重启
27}
28
29impl Event {
30    pub fn name(&self) -> &'static str {
31        match self {
32            Event::Create(_) => "Create",
33            Event::TrApply(_, _, _) => "TrApply",
34            Event::Destroy => "Destroy",
35            Event::Stop => "Stop",
36        }
37    }
38}
39
40/// 事件处理器 ID 类型
41pub type HandlerId = u64;
42
43/// 高性能事件总线
44///
45/// 使用以下优化策略:
46/// - ArcSwap 实现无锁读取事件处理器列表
47/// - DashMap 用于快速查找和管理事件处理器
48/// - 原子计数器生成唯一 ID
49/// - 批量事件处理优化
50pub struct EventBus<T: Send + Sync + Clone + 'static> {
51    tx: Sender<T>,
52    rt: Receiver<T>,
53    /// 使用 ArcSwap 实现无锁读取的事件处理器列表
54    event_handlers: Arc<ArcSwap<Vec<Arc<dyn EventHandler<T> + Send + Sync>>>>,
55    /// 使用 DashMap 快速查找事件处理器
56    handler_registry:
57        Arc<DashMap<HandlerId, Arc<dyn EventHandler<T> + Send + Sync>>>,
58    /// 原子计数器生成唯一 ID
59    next_handler_id: Arc<AtomicU64>,
60    shutdown: (Sender<()>, Receiver<()>),
61    config: EventConfig,
62    /// 事件统计
63    stats: EventBusStats,
64}
65
66/// 事件总线统计信息
67#[derive(Clone, Debug)]
68pub struct EventBusStats {
69    /// 已处理事件总数
70    pub events_processed: Arc<AtomicU64>,
71    /// 当前活跃处理器数量
72    pub active_handlers: Arc<AtomicU64>,
73    /// 事件处理失败次数
74    pub processing_failures: Arc<AtomicU64>,
75    /// 事件处理超时次数
76    pub processing_timeouts: Arc<AtomicU64>,
77}
78
79impl Default for EventBusStats {
80    fn default() -> Self {
81        Self {
82            events_processed: Arc::new(AtomicU64::new(0)),
83            active_handlers: Arc::new(AtomicU64::new(0)),
84            processing_failures: Arc::new(AtomicU64::new(0)),
85            processing_timeouts: Arc::new(AtomicU64::new(0)),
86        }
87    }
88}
89
90impl<T: Send + Sync + Clone + 'static> Default for EventBus<T> {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
96impl<T: Send + Sync + Clone + 'static> Clone for EventBus<T> {
97    fn clone(&self) -> Self {
98        Self {
99            tx: self.tx.clone(),
100            rt: self.rt.clone(),
101            event_handlers: self.event_handlers.clone(),
102            handler_registry: self.handler_registry.clone(),
103            next_handler_id: self.next_handler_id.clone(),
104            shutdown: (self.shutdown.0.clone(), self.shutdown.1.clone()),
105            config: self.config.clone(),
106            stats: self.stats.clone(),
107        }
108    }
109}
110
111impl<T: Send + Sync + Clone + 'static> EventBus<T> {
112    /// 添加事件处理器,返回处理器 ID
113    pub fn add_event_handler(
114        &self,
115        event_handler: Arc<dyn EventHandler<T> + Send + Sync>,
116    ) -> ForgeResult<HandlerId> {
117        let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
118
119        // 添加到注册表
120        self.handler_registry.insert(handler_id, event_handler.clone());
121
122        // 更新处理器列表(无锁操作)
123        self.update_handler_list();
124
125        // 更新统计
126        self.stats.active_handlers.fetch_add(1, Ordering::Relaxed);
127
128        Ok(handler_id)
129    }
130
131    /// 批量添加事件处理器
132    pub fn add_event_handlers(
133        &self,
134        event_handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>>,
135    ) -> ForgeResult<Vec<HandlerId>> {
136        let mut handler_ids = Vec::with_capacity(event_handlers.len());
137
138        for handler in event_handlers {
139            let handler_id =
140                self.next_handler_id.fetch_add(1, Ordering::Relaxed);
141            self.handler_registry.insert(handler_id, handler);
142            handler_ids.push(handler_id);
143        }
144
145        // 批量更新处理器列表
146        self.update_handler_list();
147
148        // 更新统计
149        self.stats
150            .active_handlers
151            .fetch_add(handler_ids.len() as u64, Ordering::Relaxed);
152
153        Ok(handler_ids)
154    }
155
156    /// 移除事件处理器
157    pub fn remove_event_handler(
158        &self,
159        handler_id: HandlerId,
160    ) -> ForgeResult<bool> {
161        let removed = self.handler_registry.remove(&handler_id).is_some();
162
163        if removed {
164            self.update_handler_list();
165            self.stats.active_handlers.fetch_sub(1, Ordering::Relaxed);
166        }
167
168        Ok(removed)
169    }
170
171    /// 批量移除事件处理器
172    pub fn remove_event_handlers(
173        &self,
174        handler_ids: &[HandlerId],
175    ) -> ForgeResult<usize> {
176        let mut removed_count = 0;
177
178        for &handler_id in handler_ids {
179            if self.handler_registry.remove(&handler_id).is_some() {
180                removed_count += 1;
181            }
182        }
183
184        if removed_count > 0 {
185            self.update_handler_list();
186            self.stats
187                .active_handlers
188                .fetch_sub(removed_count as u64, Ordering::Relaxed);
189        }
190
191        Ok(removed_count)
192    }
193
194    /// 更新处理器列表(内部方法)
195    fn update_handler_list(&self) {
196        let handlers: Vec<Arc<dyn EventHandler<T> + Send + Sync>> = self
197            .handler_registry
198            .iter()
199            .map(|entry| entry.value().clone())
200            .collect();
201
202        self.event_handlers.store(Arc::new(handlers));
203    }
204
205    /// 获取当前活跃的处理器数量
206    pub fn handler_count(&self) -> usize {
207        self.handler_registry.len()
208    }
209
210    /// 清空所有事件处理器
211    pub fn clear_handlers(&self) -> ForgeResult<()> {
212        self.handler_registry.clear();
213        self.event_handlers.store(Arc::new(Vec::new()));
214        self.stats.active_handlers.store(0, Ordering::Relaxed);
215        Ok(())
216    }
217    /// 异步销毁事件总线
218    pub async fn destroy(&self) -> ForgeResult<()> {
219        self.shutdown.0.send(()).await.map_err(|e| {
220            error_utils::event_error(format!("发送关闭信号失败: {}", e))
221        })
222    }
223
224    /// 同步销毁事件总线(仅在非异步上下文中使用)
225    ///
226    /// ⚠️ 警告:此方法可能阻塞,应优先使用 `destroy()` 异步版本
227    pub fn destroy_blocking(&self) {
228        let _ = self.shutdown.0.send_blocking(());
229    }
230    /// 启动事件循环
231    pub fn start_event_loop(&self) {
232        let rx: async_channel::Receiver<T> = self.subscribe();
233        let event_handlers = self.event_handlers.clone();
234        let shutdown_rt = self.shutdown.1.clone();
235        let config = self.config.clone();
236        let stats = self.stats.clone();
237        tokio::spawn(async move {
238            let mut join_set = tokio::task::JoinSet::new();
239
240            // 定义清理函数,确保所有任务都被正确清理
241            let cleanup_timeout = config.handler_timeout;
242            async fn cleanup_tasks(
243                join_set: &mut tokio::task::JoinSet<()>,
244                timeout: std::time::Duration,
245            ) {
246                debug!("开始清理事件处理任务...");
247                // 首先停止接受新任务
248                join_set.shutdown().await;
249                // 然后等待所有现有任务完成,设置超时防止无限等待
250                match tokio::time::timeout(timeout, async {
251                    while let Some(result) = join_set.join_next().await {
252                        if let Err(e) = result {
253                            debug!("事件处理任务错误: {}", e);
254                        }
255                    }
256                })
257                .await
258                {
259                    Ok(_) => debug!("所有事件处理任务已正常清理"),
260                    Err(_) => debug!("事件处理任务清理超时"),
261                }
262            }
263            loop {
264                tokio::select! {
265                    event = rx.recv() => match event {
266                        Ok(event) => {
267                            // 限制并发任务数量,防止无限制spawning
268                            if join_set.len() >= config.max_concurrent_handlers {
269                                debug!("事件处理任务数量达到上限,等待部分任务完成...");
270                                // 等待至少一个任务完成
271                                if let Some(result) = join_set.join_next().await {
272                                    if let Err(e) = result {
273                                        debug!("事件处理任务错误: {}", e);
274                                    }
275                                }
276                            }
277
278                            // 无锁读取事件处理器列表
279                            let handlers = event_handlers.load();
280                            let handler_timeout = config.handler_timeout;
281                            let event_stats = stats.clone();
282
283                            // 更新事件处理统计
284                            event_stats.events_processed.fetch_add(1, Ordering::Relaxed);
285
286                            join_set.spawn(async move {
287                                // 为该事件并发执行所有 handler
288                                let mut handler_set = tokio::task::JoinSet::new();
289                                for handler in handlers.iter().cloned() {
290                                    let event_for_task = event.clone();
291                                    handler_set.spawn(async move {
292                                        // 每个任务持有自己的事件克隆,避免跨任务借用问题
293                                        let e = event_for_task;
294                                        match tokio::time::timeout(handler_timeout, handler.handle(&e)).await {
295                                            Ok(Ok(_)) => (true, false, false),
296                                            Ok(Err(e)) => { debug!("事件处理器执行失败: {}", e); (false, true, false) },
297                                            Err(_) => { debug!("事件处理器执行超时"); (false, false, true) },
298                                        }
299                                    });
300                                }
301
302                                let mut success_count = 0u64;
303                                let mut failure_count = 0u64;
304                                let mut timeout_count = 0u64;
305                                while let Some(res) = handler_set.join_next().await {
306                                    match res {
307                                        Ok((ok, fail, timeout)) => {
308                                            if ok { success_count += 1; }
309                                            if fail { failure_count += 1; }
310                                            if timeout { timeout_count += 1; }
311                                        }
312                                        Err(e) => debug!("事件处理器任务错误: {}", e),
313                                    }
314                                }
315
316                                if failure_count > 0 {
317                                    event_stats.processing_failures.fetch_add(failure_count, Ordering::Relaxed);
318                                }
319                                if timeout_count > 0 {
320                                    event_stats.processing_timeouts.fetch_add(timeout_count, Ordering::Relaxed);
321                                }
322
323                                debug!("事件处理完成: 成功={}, 失败={}, 超时={}", success_count, failure_count, timeout_count);
324                            });
325                        },
326                        Err(e) => {
327                            debug!("事件接收错误: {}", e);
328                            cleanup_tasks(&mut join_set, cleanup_timeout).await;
329                            break;
330                        },
331                    },
332                    _ = shutdown_rt.recv() => {
333                        // 使用统一清理流程,带超时
334                        cleanup_tasks(&mut join_set, cleanup_timeout).await;
335                        debug!("事件管理器接收到关闭信号,正在退出...");
336                        break;
337                    },
338                    // 定期清理已完成的任务,防止JoinSet无限增长
339                    _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
340                        // 非阻塞地清理已完成的任务
341                        while let Some(result) = join_set.try_join_next() {
342                            if let Err(e) = result {
343                                debug!("事件处理任务错误: {}", e);
344                            }
345                        }
346                    },
347                }
348            }
349        });
350    }
351
352    pub fn new() -> Self {
353        Self::with_config(EventConfig::default())
354    }
355
356    pub fn with_config(config: EventConfig) -> Self {
357        let (tx, rt) = async_channel::bounded(config.max_queue_size);
358        let (shutdown_tx, shutdown_rt) = async_channel::bounded(1);
359        Self {
360            tx,
361            rt,
362            event_handlers: Arc::new(ArcSwap::new(Arc::new(Vec::new()))),
363            handler_registry: Arc::new(DashMap::new()),
364            next_handler_id: Arc::new(AtomicU64::new(1)),
365            shutdown: (shutdown_tx, shutdown_rt),
366            config,
367            stats: EventBusStats::default(),
368        }
369    }
370
371    pub fn subscribe(&self) -> Receiver<T> {
372        self.rt.clone()
373    }
374
375    pub async fn broadcast(
376        &self,
377        event: T,
378    ) -> ForgeResult<()> {
379        self.tx.send(event).await.map_err(|e| {
380            error_utils::event_error(format!("广播事件失败: {}", e))
381        })
382    }
383    /// 同步广播事件(仅在非异步上下文中使用)
384    ///
385    /// ⚠️ 警告:此方法可能阻塞当前线程,应优先使用 `broadcast()` 异步版本
386    ///
387    /// # 使用场景
388    /// - 在 Drop 实现中
389    /// - 在同步的测试代码中
390    /// - 在非异步的回调函数中
391    ///
392    /// # 示例
393    /// ```rust,no_run
394    /// // 在异步上下文中,优先使用:
395    /// // event_bus.broadcast(event).await?;
396    ///
397    /// // 仅在必要时使用阻塞版本:
398    /// event_bus.broadcast_blocking(event)?;
399    /// ```
400    pub fn broadcast_blocking(
401        &self,
402        event: T,
403    ) -> ForgeResult<()> {
404        self.tx.send_blocking(event).map_err(|e| {
405            error_utils::event_error(format!("广播事件失败: {}", e))
406        })
407    }
408
409    /// 获取事件配置
410    pub fn get_config(&self) -> &EventConfig {
411        &self.config
412    }
413
414    /// 更新事件配置(注意:某些配置更改需要重启事件循环才能生效)
415    pub fn update_config(
416        &mut self,
417        config: EventConfig,
418    ) {
419        self.config = config;
420    }
421
422    /// 获取事件总线统计信息
423    pub fn get_stats(&self) -> EventBusStats {
424        self.stats.clone()
425    }
426
427    /// 重置统计信息
428    pub fn reset_stats(&self) {
429        self.stats.events_processed.store(0, Ordering::Relaxed);
430        self.stats.processing_failures.store(0, Ordering::Relaxed);
431        self.stats.processing_timeouts.store(0, Ordering::Relaxed);
432        // 注意:active_handlers 不重置,因为它反映当前状态
433    }
434
435    /// 获取详细的性能报告
436    pub fn get_performance_report(&self) -> EventBusPerformanceReport {
437        let stats = &self.stats;
438        EventBusPerformanceReport {
439            total_events_processed: stats
440                .events_processed
441                .load(Ordering::Relaxed),
442            active_handlers_count: stats
443                .active_handlers
444                .load(Ordering::Relaxed),
445            total_processing_failures: stats
446                .processing_failures
447                .load(Ordering::Relaxed),
448            total_processing_timeouts: stats
449                .processing_timeouts
450                .load(Ordering::Relaxed),
451            handler_registry_size: self.handler_registry.len(),
452            success_rate: {
453                let total = stats.events_processed.load(Ordering::Relaxed);
454                let failures =
455                    stats.processing_failures.load(Ordering::Relaxed);
456                if total > 0 {
457                    ((total - failures) as f64 / total as f64) * 100.0
458                } else {
459                    100.0
460                }
461            },
462        }
463    }
464}
465
466/// 事件总线性能报告
467#[derive(Debug, Clone)]
468pub struct EventBusPerformanceReport {
469    /// 已处理事件总数
470    pub total_events_processed: u64,
471    /// 当前活跃处理器数量
472    pub active_handlers_count: u64,
473    /// 处理失败总数
474    pub total_processing_failures: u64,
475    /// 处理超时总数
476    pub total_processing_timeouts: u64,
477    /// 处理器注册表大小
478    pub handler_registry_size: usize,
479    /// 成功率(百分比)
480    pub success_rate: f64,
481}
482
483// 事件处理器特征
484#[async_trait::async_trait]
485pub trait EventHandler<T>: Send + Sync + Debug {
486    async fn handle(
487        &self,
488        event: &T,
489    ) -> ForgeResult<()>;
490}