Skip to main content

mf_core/actors/
event_bus.rs

1//! 事件总线Actor - 基于ractor框架实现
2//!
3//! 此Actor负责事件的发布和订阅,保持与原始EventBus完全相同的行为。
4
5use ractor::{Actor, ActorRef, ActorProcessingErr};
6use std::sync::Arc;
7use tokio::sync::oneshot;
8
9use crate::{
10    config::EventConfig,
11    debug::debug,
12    error::{error_utils, ForgeResult},
13    event::{Event, EventHandler, HandlerId},
14};
15
16use super::{ActorSystemResult, ActorMetrics};
17
/// Messages accepted by the event bus actor.
///
/// Variants that expect an answer carry a `tokio::sync::oneshot` sender on
/// which the actor sends its reply.
#[derive(Debug)]
pub enum EventBusMessage {
    /// Publish an event to the registered handlers. Carries no reply channel.
    PublishEvent { event: Event },
    /// Register an event handler; the id assigned to it is sent on `reply`.
    AddHandler {
        handler: Arc<dyn EventHandler<Event> + Send + Sync>,
        reply: oneshot::Sender<HandlerId>,
    },
    /// Remove the handler registered under `handler_id`; `reply` receives
    /// `Ok(())` when a handler was removed and an error when none matched.
    RemoveHandler {
        handler_id: HandlerId,
        reply: oneshot::Sender<ForgeResult<()>>,
    },
    /// Request a snapshot of the event bus statistics.
    GetStats { reply: oneshot::Sender<EventBusStats> },
    /// Replace the actor's configuration with `config`.
    UpdateConfig {
        config: EventConfig,
        reply: oneshot::Sender<ForgeResult<()>>,
    },
}
41
// EventBusMessage gets its ractor::Message implementation automatically (it is Debug + Send + 'static).
43
/// Statistics reported by the event bus actor.
///
/// `Default` yields an all-zero snapshot, and `PartialEq`/`Eq` let two
/// snapshots be compared directly (for example in tests).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EventBusStats {
    /// Number of `PublishEvent` messages handled.
    pub events_published: u64,
    /// Number of successful handler invocations.
    pub events_processed: u64,
    /// Number of failed handler invocations or failed publishes.
    pub event_failures: u64,
    /// Number of handlers currently registered.
    pub active_handlers: usize,
    /// Time spent handling `PublishEvent` messages, in milliseconds, as
    /// maintained by the actor.
    pub avg_processing_time_ms: u64,
}
53
/// Mutable state owned by the event bus actor.
///
/// Built in `pre_start` and handed to every `handle` call.
pub struct EventBusActorState {
    /// Registered handlers, each paired with the id assigned at registration.
    handlers: Vec<(HandlerId, Arc<dyn EventHandler<Event> + Send + Sync>)>,
    /// Id to hand to the next registered handler; starts at 1 and is
    /// incremented on every `AddHandler`.
    next_handler_id: HandlerId,
    /// Current configuration; replaced wholesale by `UpdateConfig`.
    config: EventConfig,
    /// Actor metrics, updated while handling `PublishEvent`.
    metrics: ActorMetrics,
    /// Statistics; a clone is returned in response to `GetStats`.
    stats: EventBusStats,
}
67
68/// 事件总线Actor
69pub struct EventBusActor;
70
71#[ractor::async_trait]
72impl Actor for EventBusActor {
73    type Msg = EventBusMessage;
74    type State = EventBusActorState;
75    type Arguments = EventConfig;
76
77    async fn pre_start(
78        &self,
79        _myself: ActorRef<Self::Msg>,
80        config: Self::Arguments,
81    ) -> Result<Self::State, ActorProcessingErr> {
82        debug!("启动事件总线Actor");
83
84        Ok(EventBusActorState {
85            handlers: Vec::new(),
86            next_handler_id: 1,
87            config,
88            metrics: ActorMetrics::default(),
89            stats: EventBusStats {
90                events_published: 0,
91                events_processed: 0,
92                event_failures: 0,
93                active_handlers: 0,
94                avg_processing_time_ms: 0,
95            },
96        })
97    }
98
99    async fn post_stop(
100        &self,
101        _myself: ActorRef<Self::Msg>,
102        _state: &mut Self::State,
103    ) -> Result<(), ActorProcessingErr> {
104        debug!("停止事件总线Actor");
105        Ok(())
106    }
107
108    async fn handle(
109        &self,
110        _myself: ActorRef<Self::Msg>,
111        message: Self::Msg,
112        state: &mut Self::State,
113    ) -> Result<(), ActorProcessingErr> {
114        match message {
115            EventBusMessage::PublishEvent { event } => {
116                let start_time = std::time::Instant::now();
117
118                // 🎯 与原始事件广播逻辑完全相同
119                let result = self.broadcast_event_logic(state, event).await;
120
121                let processing_time = start_time.elapsed();
122                state.stats.events_published += 1;
123
124                if result.is_err() {
125                    state.stats.event_failures += 1;
126                    state.metrics.increment_errors();
127                }
128
129                state.stats.avg_processing_time_ms =
130                    processing_time.as_millis() as u64;
131                state
132                    .metrics
133                    .update_processing_time(processing_time.as_millis() as u64);
134                state.metrics.increment_messages();
135
136                // 注意:PublishEvent通常不需要回复,因为它是"fire and forget"模式
137                if let Err(e) = result {
138                    debug!("事件发布失败: {}", e);
139                }
140            },
141
142            EventBusMessage::AddHandler { handler, reply } => {
143                let handler_id = state.next_handler_id;
144                state.next_handler_id += 1;
145
146                state.handlers.push((handler_id, handler));
147                state.stats.active_handlers = state.handlers.len();
148
149                let _ = reply.send(handler_id);
150            },
151
152            EventBusMessage::RemoveHandler { handler_id, reply } => {
153                let initial_len = state.handlers.len();
154                state.handlers.retain(|(id, _)| *id != handler_id);
155
156                let result = if state.handlers.len() < initial_len {
157                    state.stats.active_handlers = state.handlers.len();
158                    Ok(())
159                } else {
160                    Err(error_utils::event_error(format!(
161                        "事件处理器 {handler_id} 不存在"
162                    )))
163                };
164
165                let _ = reply.send(result);
166            },
167
168            EventBusMessage::GetStats { reply } => {
169                let _ = reply.send(state.stats.clone());
170            },
171
172            EventBusMessage::UpdateConfig { config, reply } => {
173                state.config = config;
174                let _ = reply.send(Ok(()));
175            },
176        }
177
178        Ok(())
179    }
180}
181
182impl EventBusActor {
183    /// 🎯 与原始事件广播逻辑完全相同
184    ///
185    /// 对应原始EventBus::broadcast的逻辑
186    async fn broadcast_event_logic(
187        &self,
188        actor_state: &mut EventBusActorState,
189        event: Event,
190    ) -> ForgeResult<()> {
191        debug!("广播事件: {}", event.name());
192
193        let mut processing_errors = Vec::new();
194        let event_name = event.name();
195
196        // 并行处理所有事件处理器(与原始实现相同)
197        let mut tasks = Vec::new();
198
199        for (handler_id, handler) in &actor_state.handlers {
200            let handler_clone = handler.clone();
201            let event_clone = event.clone();
202            let handler_id = *handler_id;
203
204            // 创建处理任务
205            let task = tokio::spawn(async move {
206                let result = handler_clone.handle(&event_clone).await;
207                (handler_id, result)
208            });
209
210            tasks.push(task);
211        }
212
213        // 等待所有处理器完成
214        for task in tasks {
215            match task.await {
216                Ok((handler_id, Ok(()))) => {
217                    actor_state.stats.events_processed += 1;
218                    debug!(
219                        "事件处理器 {} 成功处理事件 {}",
220                        handler_id, event_name
221                    );
222                },
223                Ok((handler_id, Err(e))) => {
224                    processing_errors.push(format!(
225                        "处理器 {handler_id} 处理事件 {event_name} 失败: {e}"
226                    ));
227                    actor_state.stats.event_failures += 1;
228                },
229                Err(e) => {
230                    processing_errors
231                        .push(format!("事件处理任务执行失败: {e}"));
232                    actor_state.stats.event_failures += 1;
233                },
234            }
235        }
236
237        // 错误处理策略(与原始实现相同)
238        if !processing_errors.is_empty() {
239            let error_summary = processing_errors.join("; ");
240            debug!("事件处理过程中出现错误: {}", error_summary);
241
242            // 根据配置决定是否抛出错误
243            // 如果处理失败,记录错误但继续处理其他handlers
244            if false {
245                // TODO: 可以考虑添加fail_on_handler_error配置
246                return Err(error_utils::event_error(format!(
247                    "事件 {event_name} 处理失败: {error_summary}"
248                )));
249            }
250        }
251
252        Ok(())
253    }
254}
255
256/// 事件总线Actor管理器
257pub struct EventBusActorManager;
258
259impl EventBusActorManager {
260    /// 启动事件总线Actor
261    pub async fn start(
262        config: EventConfig
263    ) -> ActorSystemResult<ActorRef<EventBusMessage>> {
264        let (actor_ref, _handle) = Actor::spawn(
265            Some("EventBusActor".to_string()),
266            EventBusActor,
267            config,
268        )
269        .await
270        .map_err(|e| super::ActorSystemError::ActorStartupFailed {
271            actor_name: "EventBusActor".to_string(),
272            source: e,
273        })?;
274
275        debug!("事件总线Actor启动成功");
276        Ok(actor_ref)
277    }
278
279    /// 向事件总线添加处理器(便捷方法)
280    pub async fn add_handlers(
281        event_bus: &ActorRef<EventBusMessage>,
282        handlers: Vec<Arc<dyn EventHandler<Event> + Send + Sync>>,
283    ) -> ForgeResult<Vec<HandlerId>> {
284        let mut handler_ids = Vec::new();
285
286        for handler in handlers {
287            let (tx, rx) = oneshot::channel();
288
289            event_bus
290                .send_message(EventBusMessage::AddHandler {
291                    handler,
292                    reply: tx,
293                })
294                .map_err(|e| {
295                    error_utils::event_error(format!(
296                        "发送添加处理器消息失败: {e}"
297                    ))
298                })?;
299
300            let handler_id = rx.await.map_err(|e| {
301                error_utils::event_error(format!("接收处理器ID失败: {e}"))
302            })?;
303
304            handler_ids.push(handler_id);
305        }
306
307        Ok(handler_ids)
308    }
309}