mf_core/
event.rs

1use std::{
2    fmt::Debug,
3    sync::{Arc, atomic::{AtomicU64, Ordering}},
4};
5
6use async_channel::{Receiver, Sender};
7use mf_state::{debug, state::State, Transaction};
8use tokio::signal;
9use arc_swap::ArcSwap;
10use dashmap::DashMap;
11
12use crate::{
13    config::EventConfig,
14    error::{ForgeResult, error_utils},
15};
16
17// 事件类型定义
18#[derive(Clone)]
19pub enum Event {
20    Create(Arc<State>),
21    TrApply(u64, Arc<Vec<Transaction>>, Arc<State>), // 事务应用后 + 是否成功
22    Destroy,                                         // 销毁事件
23    Stop,                                            // 停止后需要重启
24}
25
26impl Event {
27    pub fn name(&self) -> &'static str {
28        match self {
29            Event::Create(_) => "Create",
30            Event::TrApply(_, _, _) => "TrApply",
31            Event::Destroy => "Destroy",
32            Event::Stop => "Stop",
33        }
34    }
35}
36
37/// 事件处理器 ID 类型
38pub type HandlerId = u64;
39
40/// 高性能事件总线
41///
42/// 使用以下优化策略:
43/// - ArcSwap 实现无锁读取事件处理器列表
44/// - DashMap 用于快速查找和管理事件处理器
45/// - 原子计数器生成唯一 ID
46/// - 批量事件处理优化
47pub struct EventBus<T: Send + 'static> {
48    tx: Sender<T>,
49    rt: Receiver<T>,
50    /// 使用 ArcSwap 实现无锁读取的事件处理器列表
51    event_handlers: Arc<ArcSwap<Vec<Arc<dyn EventHandler<T>>>>>,
52    /// 使用 DashMap 快速查找事件处理器
53    handler_registry: Arc<DashMap<HandlerId, Arc<dyn EventHandler<T>>>>,
54    /// 原子计数器生成唯一 ID
55    next_handler_id: Arc<AtomicU64>,
56    shutdown: (Sender<()>, Receiver<()>),
57    config: EventConfig,
58    /// 事件统计
59    stats: EventBusStats,
60}
61
62/// 事件总线统计信息
63#[derive(Clone, Debug)]
64pub struct EventBusStats {
65    /// 已处理事件总数
66    pub events_processed: Arc<AtomicU64>,
67    /// 当前活跃处理器数量
68    pub active_handlers: Arc<AtomicU64>,
69    /// 事件处理失败次数
70    pub processing_failures: Arc<AtomicU64>,
71    /// 事件处理超时次数
72    pub processing_timeouts: Arc<AtomicU64>,
73}
74
75impl Default for EventBusStats {
76    fn default() -> Self {
77        Self {
78            events_processed: Arc::new(AtomicU64::new(0)),
79            active_handlers: Arc::new(AtomicU64::new(0)),
80            processing_failures: Arc::new(AtomicU64::new(0)),
81            processing_timeouts: Arc::new(AtomicU64::new(0)),
82        }
83    }
84}
85
86impl<T: Send + 'static> Default for EventBus<T> {
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
92impl<T: Send + 'static> Clone for EventBus<T> {
93    fn clone(&self) -> Self {
94        Self {
95            tx: self.tx.clone(),
96            rt: self.rt.clone(),
97            event_handlers: self.event_handlers.clone(),
98            handler_registry: self.handler_registry.clone(),
99            next_handler_id: self.next_handler_id.clone(),
100            shutdown: (self.shutdown.0.clone(), self.shutdown.1.clone()),
101            config: self.config.clone(),
102            stats: self.stats.clone(),
103        }
104    }
105}
106
107impl<T: Send + 'static> EventBus<T> {
108    /// 添加事件处理器,返回处理器 ID
109    pub fn add_event_handler(
110        &self,
111        event_handler: Arc<dyn EventHandler<T>>,
112    ) -> ForgeResult<HandlerId> {
113        let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
114
115        // 添加到注册表
116        self.handler_registry.insert(handler_id, event_handler.clone());
117
118        // 更新处理器列表(无锁操作)
119        self.update_handler_list();
120
121        // 更新统计
122        self.stats.active_handlers.fetch_add(1, Ordering::Relaxed);
123
124        Ok(handler_id)
125    }
126
127    /// 批量添加事件处理器
128    pub fn add_event_handlers(
129        &self,
130        event_handlers: Vec<Arc<dyn EventHandler<T>>>,
131    ) -> ForgeResult<Vec<HandlerId>> {
132        let mut handler_ids = Vec::with_capacity(event_handlers.len());
133
134        for handler in event_handlers {
135            let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
136            self.handler_registry.insert(handler_id, handler);
137            handler_ids.push(handler_id);
138        }
139
140        // 批量更新处理器列表
141        self.update_handler_list();
142
143        // 更新统计
144        self.stats.active_handlers.fetch_add(handler_ids.len() as u64, Ordering::Relaxed);
145
146        Ok(handler_ids)
147    }
148
149    /// 移除事件处理器
150    pub fn remove_event_handler(&self, handler_id: HandlerId) -> ForgeResult<bool> {
151        let removed = self.handler_registry.remove(&handler_id).is_some();
152
153        if removed {
154            self.update_handler_list();
155            self.stats.active_handlers.fetch_sub(1, Ordering::Relaxed);
156        }
157
158        Ok(removed)
159    }
160
161    /// 批量移除事件处理器
162    pub fn remove_event_handlers(&self, handler_ids: &[HandlerId]) -> ForgeResult<usize> {
163        let mut removed_count = 0;
164
165        for &handler_id in handler_ids {
166            if self.handler_registry.remove(&handler_id).is_some() {
167                removed_count += 1;
168            }
169        }
170
171        if removed_count > 0 {
172            self.update_handler_list();
173            self.stats.active_handlers.fetch_sub(removed_count as u64, Ordering::Relaxed);
174        }
175
176        Ok(removed_count)
177    }
178
179    /// 更新处理器列表(内部方法)
180    fn update_handler_list(&self) {
181        let handlers: Vec<Arc<dyn EventHandler<T>>> = self.handler_registry
182            .iter()
183            .map(|entry| entry.value().clone())
184            .collect();
185
186        self.event_handlers.store(Arc::new(handlers));
187    }
188
189    /// 获取当前活跃的处理器数量
190    pub fn handler_count(&self) -> usize {
191        self.handler_registry.len()
192    }
193
194    /// 清空所有事件处理器
195    pub fn clear_handlers(&self) -> ForgeResult<()> {
196        self.handler_registry.clear();
197        self.event_handlers.store(Arc::new(Vec::new()));
198        self.stats.active_handlers.store(0, Ordering::Relaxed);
199        Ok(())
200    }
201    /// 异步销毁事件总线
202    pub async fn destroy(&self) -> ForgeResult<()> {
203        self.shutdown.0.send(()).await.map_err(|e| {
204            error_utils::event_error(format!("发送关闭信号失败: {}", e))
205        })
206    }
207
208    /// 同步销毁事件总线(仅在非异步上下文中使用)
209    ///
210    /// ⚠️ 警告:此方法可能阻塞,应优先使用 `destroy()` 异步版本
211    pub fn destroy_blocking(&self) {
212        let _ = self.shutdown.0.send_blocking(());
213    }
214    /// 启动事件循环
215    pub fn start_event_loop(&self) {
216        let rx: async_channel::Receiver<T> = self.subscribe();
217        let event_handlers = self.event_handlers.clone();
218        let shutdown_rt = self.shutdown.1.clone();
219        let config = self.config.clone();
220        let stats = self.stats.clone();
221        tokio::spawn(async move {
222            let mut join_set = tokio::task::JoinSet::new();
223
224            // 定义清理函数,确保所有任务都被正确清理
225            let cleanup_timeout = config.handler_timeout;
226            async fn cleanup_tasks(
227                join_set: &mut tokio::task::JoinSet<()>,
228                timeout: std::time::Duration,
229            ) {
230                debug!("开始清理事件处理任务...");
231                // 首先停止接受新任务
232                join_set.shutdown().await;
233                // 然后等待所有现有任务完成,设置超时防止无限等待
234                match tokio::time::timeout(timeout, async {
235                    while let Some(result) = join_set.join_next().await {
236                        if let Err(e) = result {
237                            debug!("事件处理任务错误: {}", e);
238                        }
239                    }
240                })
241                .await
242                {
243                    Ok(_) => debug!("所有事件处理任务已正常清理"),
244                    Err(_) => debug!("事件处理任务清理超时"),
245                }
246            }
247            loop {
248                tokio::select! {
249                    event = rx.recv() => match event {
250                        Ok(event) => {
251                            // 限制并发任务数量,防止无限制spawning
252                            if join_set.len() >= config.max_concurrent_handlers {
253                                debug!("事件处理任务数量达到上限,等待部分任务完成...");
254                                // 等待至少一个任务完成
255                                if let Some(result) = join_set.join_next().await {
256                                    if let Err(e) = result {
257                                        debug!("事件处理任务错误: {}", e);
258                                    }
259                                }
260                            }
261
262                            // 无锁读取事件处理器列表
263                            let handlers = event_handlers.load();
264                            let handler_timeout = config.handler_timeout;
265                            let event_stats = stats.clone();
266
267                            // 更新事件处理统计
268                            event_stats.events_processed.fetch_add(1, Ordering::Relaxed);
269
270                            join_set.spawn(async move {
271                                let mut success_count = 0;
272                                let mut failure_count = 0;
273                                let mut timeout_count = 0;
274
275                                // 并发处理所有handler,添加超时保护
276                                for handler in handlers.iter() {
277                                    match tokio::time::timeout(handler_timeout, handler.handle(&event)).await {
278                                        Ok(Ok(_)) => {
279                                            success_count += 1;
280                                        }, // 处理成功
281                                        Ok(Err(e)) => {
282                                            failure_count += 1;
283                                            debug!("事件处理器执行失败: {}", e);
284                                        },
285                                        Err(_) => {
286                                            timeout_count += 1;
287                                            debug!("事件处理器执行超时");
288                                        },
289                                    }
290                                }
291
292                                // 更新统计信息
293                                if failure_count > 0 {
294                                    event_stats.processing_failures.fetch_add(failure_count, Ordering::Relaxed);
295                                }
296                                if timeout_count > 0 {
297                                    event_stats.processing_timeouts.fetch_add(timeout_count, Ordering::Relaxed);
298                                }
299
300                                debug!("事件处理完成: 成功={}, 失败={}, 超时={}",
301                                    success_count, failure_count, timeout_count);
302                            });
303                        },
304                        Err(e) => {
305                            debug!("事件接收错误: {}", e);
306                            cleanup_tasks(&mut join_set, cleanup_timeout).await;
307                            break;
308                        },
309                    },
310                    _ = shutdown_rt.recv() => {
311                        let _ = join_set.join_all().await;
312                        debug!("事件管理器,接收到关闭信号,正在退出...");
313                        break;
314                    },
315                    shutdown_signal = Box::pin(signal::ctrl_c()) => {
316                        match shutdown_signal {
317                            Ok(()) => {
318                                debug!("事件管理器,接收到关闭信号,正在退出...");
319                                cleanup_tasks(&mut join_set, cleanup_timeout).await;
320                                break;
321                            },
322                            Err(e) => {
323                                debug!("事件管理器,处理关闭信号时出错: {}", e);
324                                cleanup_tasks(&mut join_set, cleanup_timeout).await;
325                                break;
326                            }
327                        }
328                    }
329                    // 定期清理已完成的任务,防止JoinSet无限增长
330                    _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
331                        // 非阻塞地清理已完成的任务
332                        while let Some(result) = join_set.try_join_next() {
333                            if let Err(e) = result {
334                                debug!("事件处理任务错误: {}", e);
335                            }
336                        }
337                    },
338                }
339            }
340        });
341    }
342
343    pub fn new() -> Self {
344        Self::with_config(EventConfig::default())
345    }
346
347    pub fn with_config(config: EventConfig) -> Self {
348        let (tx, rt) = async_channel::bounded(config.max_queue_size);
349        let (shutdown_tx, shutdown_rt) = async_channel::bounded(1);
350        Self {
351            tx,
352            rt,
353            event_handlers: Arc::new(ArcSwap::new(Arc::new(Vec::new()))),
354            handler_registry: Arc::new(DashMap::new()),
355            next_handler_id: Arc::new(AtomicU64::new(1)),
356            shutdown: (shutdown_tx, shutdown_rt),
357            config,
358            stats: EventBusStats::default(),
359        }
360    }
361
362    pub fn subscribe(&self) -> Receiver<T> {
363        self.rt.clone()
364    }
365
366    pub async fn broadcast(
367        &self,
368        event: T,
369    ) -> ForgeResult<()> {
370        self.tx.send(event).await.map_err(|e| {
371            error_utils::event_error(format!("广播事件失败: {}", e))
372        })
373    }
374    /// 同步广播事件(仅在非异步上下文中使用)
375    ///
376    /// ⚠️ 警告:此方法可能阻塞当前线程,应优先使用 `broadcast()` 异步版本
377    ///
378    /// # 使用场景
379    /// - 在 Drop 实现中
380    /// - 在同步的测试代码中
381    /// - 在非异步的回调函数中
382    ///
383    /// # 示例
384    /// ```rust,no_run
385    /// // 在异步上下文中,优先使用:
386    /// // event_bus.broadcast(event).await?;
387    ///
388    /// // 仅在必要时使用阻塞版本:
389    /// event_bus.broadcast_blocking(event)?;
390    /// ```
391    pub fn broadcast_blocking(
392        &self,
393        event: T,
394    ) -> ForgeResult<()> {
395        self.tx.send_blocking(event).map_err(|e| {
396            error_utils::event_error(format!("广播事件失败: {}", e))
397        })
398    }
399
400    /// 获取事件配置
401    pub fn get_config(&self) -> &EventConfig {
402        &self.config
403    }
404
405    /// 更新事件配置(注意:某些配置更改需要重启事件循环才能生效)
406    pub fn update_config(&mut self, config: EventConfig) {
407        self.config = config;
408    }
409
410    /// 获取事件总线统计信息
411    pub fn get_stats(&self) -> EventBusStats {
412        self.stats.clone()
413    }
414
415    /// 重置统计信息
416    pub fn reset_stats(&self) {
417        self.stats.events_processed.store(0, Ordering::Relaxed);
418        self.stats.processing_failures.store(0, Ordering::Relaxed);
419        self.stats.processing_timeouts.store(0, Ordering::Relaxed);
420        // 注意:active_handlers 不重置,因为它反映当前状态
421    }
422
423    /// 获取详细的性能报告
424    pub fn get_performance_report(&self) -> EventBusPerformanceReport {
425        let stats = &self.stats;
426        EventBusPerformanceReport {
427            total_events_processed: stats.events_processed.load(Ordering::Relaxed),
428            active_handlers_count: stats.active_handlers.load(Ordering::Relaxed),
429            total_processing_failures: stats.processing_failures.load(Ordering::Relaxed),
430            total_processing_timeouts: stats.processing_timeouts.load(Ordering::Relaxed),
431            handler_registry_size: self.handler_registry.len(),
432            success_rate: {
433                let total = stats.events_processed.load(Ordering::Relaxed);
434                let failures = stats.processing_failures.load(Ordering::Relaxed);
435                if total > 0 {
436                    ((total - failures) as f64 / total as f64) * 100.0
437                } else {
438                    100.0
439                }
440            },
441        }
442    }
443}
444
445/// 事件总线性能报告
446#[derive(Debug, Clone)]
447pub struct EventBusPerformanceReport {
448    /// 已处理事件总数
449    pub total_events_processed: u64,
450    /// 当前活跃处理器数量
451    pub active_handlers_count: u64,
452    /// 处理失败总数
453    pub total_processing_failures: u64,
454    /// 处理超时总数
455    pub total_processing_timeouts: u64,
456    /// 处理器注册表大小
457    pub handler_registry_size: usize,
458    /// 成功率(百分比)
459    pub success_rate: f64,
460}
461
462// 事件处理器特征
463#[async_trait::async_trait]
464pub trait EventHandler<T>: Send + Sync + Debug {
465    async fn handle(
466        &self,
467        event: &T,
468    ) -> ForgeResult<()>;
469}