1use ractor::{Actor, ActorRef, ActorProcessingErr};
6use std::sync::Arc;
7use tokio::sync::oneshot;
8
9use crate::{
10 config::EventConfig,
11 debug::debug,
12 error::{error_utils, ForgeResult},
13 event::{Event, EventHandler, HandlerId},
14};
15
16use super::{ActorSystemResult, ActorMetrics};
17
18#[derive(Debug)]
20pub enum EventBusMessage {
21 PublishEvent { event: Event },
23 AddHandler {
25 handler: Arc<dyn EventHandler<Event> + Send + Sync>,
26 reply: oneshot::Sender<HandlerId>,
27 },
28 RemoveHandler {
30 handler_id: HandlerId,
31 reply: oneshot::Sender<ForgeResult<()>>,
32 },
33 GetStats { reply: oneshot::Sender<EventBusStats> },
35 UpdateConfig {
37 config: EventConfig,
38 reply: oneshot::Sender<ForgeResult<()>>,
39 },
40}
41
42#[derive(Debug, Clone)]
46pub struct EventBusStats {
47 pub events_published: u64,
48 pub events_processed: u64,
49 pub event_failures: u64,
50 pub active_handlers: usize,
51 pub avg_processing_time_ms: u64,
52}
53
54pub struct EventBusActorState {
56 handlers: Vec<(HandlerId, Arc<dyn EventHandler<Event> + Send + Sync>)>,
58 next_handler_id: HandlerId,
60 config: EventConfig,
62 metrics: ActorMetrics,
64 stats: EventBusStats,
66}
67
68pub struct EventBusActor;
70
71#[ractor::async_trait]
72impl Actor for EventBusActor {
73 type Msg = EventBusMessage;
74 type State = EventBusActorState;
75 type Arguments = EventConfig;
76
77 async fn pre_start(
78 &self,
79 _myself: ActorRef<Self::Msg>,
80 config: Self::Arguments,
81 ) -> Result<Self::State, ActorProcessingErr> {
82 debug!("启动事件总线Actor");
83
84 Ok(EventBusActorState {
85 handlers: Vec::new(),
86 next_handler_id: 1,
87 config,
88 metrics: ActorMetrics::default(),
89 stats: EventBusStats {
90 events_published: 0,
91 events_processed: 0,
92 event_failures: 0,
93 active_handlers: 0,
94 avg_processing_time_ms: 0,
95 },
96 })
97 }
98
99 async fn post_stop(
100 &self,
101 _myself: ActorRef<Self::Msg>,
102 _state: &mut Self::State,
103 ) -> Result<(), ActorProcessingErr> {
104 debug!("停止事件总线Actor");
105 Ok(())
106 }
107
108 async fn handle(
109 &self,
110 _myself: ActorRef<Self::Msg>,
111 message: Self::Msg,
112 state: &mut Self::State,
113 ) -> Result<(), ActorProcessingErr> {
114 match message {
115 EventBusMessage::PublishEvent { event } => {
116 let start_time = std::time::Instant::now();
117
118 let result = self.broadcast_event_logic(state, event).await;
120
121 let processing_time = start_time.elapsed();
122 state.stats.events_published += 1;
123
124 if result.is_err() {
125 state.stats.event_failures += 1;
126 state.metrics.increment_errors();
127 }
128
129 state.stats.avg_processing_time_ms =
130 processing_time.as_millis() as u64;
131 state
132 .metrics
133 .update_processing_time(processing_time.as_millis() as u64);
134 state.metrics.increment_messages();
135
136 if let Err(e) = result {
138 debug!("事件发布失败: {}", e);
139 }
140 },
141
142 EventBusMessage::AddHandler { handler, reply } => {
143 let handler_id = state.next_handler_id;
144 state.next_handler_id += 1;
145
146 state.handlers.push((handler_id, handler));
147 state.stats.active_handlers = state.handlers.len();
148
149 let _ = reply.send(handler_id);
150 },
151
152 EventBusMessage::RemoveHandler { handler_id, reply } => {
153 let initial_len = state.handlers.len();
154 state.handlers.retain(|(id, _)| *id != handler_id);
155
156 let result = if state.handlers.len() < initial_len {
157 state.stats.active_handlers = state.handlers.len();
158 Ok(())
159 } else {
160 Err(error_utils::event_error(format!(
161 "事件处理器 {handler_id} 不存在"
162 )))
163 };
164
165 let _ = reply.send(result);
166 },
167
168 EventBusMessage::GetStats { reply } => {
169 let _ = reply.send(state.stats.clone());
170 },
171
172 EventBusMessage::UpdateConfig { config, reply } => {
173 state.config = config;
174 let _ = reply.send(Ok(()));
175 },
176 }
177
178 Ok(())
179 }
180}
181
182impl EventBusActor {
183 async fn broadcast_event_logic(
187 &self,
188 actor_state: &mut EventBusActorState,
189 event: Event,
190 ) -> ForgeResult<()> {
191 debug!("广播事件: {}", event.name());
192
193 let mut processing_errors = Vec::new();
194 let event_name = event.name();
195
196 let mut tasks = Vec::new();
198
199 for (handler_id, handler) in &actor_state.handlers {
200 let handler_clone = handler.clone();
201 let event_clone = event.clone();
202 let handler_id = *handler_id;
203
204 let task = tokio::spawn(async move {
206 let result = handler_clone.handle(&event_clone).await;
207 (handler_id, result)
208 });
209
210 tasks.push(task);
211 }
212
213 for task in tasks {
215 match task.await {
216 Ok((handler_id, Ok(()))) => {
217 actor_state.stats.events_processed += 1;
218 debug!(
219 "事件处理器 {} 成功处理事件 {}",
220 handler_id, event_name
221 );
222 },
223 Ok((handler_id, Err(e))) => {
224 processing_errors.push(format!(
225 "处理器 {handler_id} 处理事件 {event_name} 失败: {e}"
226 ));
227 actor_state.stats.event_failures += 1;
228 },
229 Err(e) => {
230 processing_errors
231 .push(format!("事件处理任务执行失败: {e}"));
232 actor_state.stats.event_failures += 1;
233 },
234 }
235 }
236
237 if !processing_errors.is_empty() {
239 let error_summary = processing_errors.join("; ");
240 debug!("事件处理过程中出现错误: {}", error_summary);
241
242 if false {
245 return Err(error_utils::event_error(format!(
247 "事件 {event_name} 处理失败: {error_summary}"
248 )));
249 }
250 }
251
252 Ok(())
253 }
254}
255
256pub struct EventBusActorManager;
258
259impl EventBusActorManager {
260 pub async fn start(
262 config: EventConfig
263 ) -> ActorSystemResult<ActorRef<EventBusMessage>> {
264 let (actor_ref, _handle) = Actor::spawn(
265 Some("EventBusActor".to_string()),
266 EventBusActor,
267 config,
268 )
269 .await
270 .map_err(|e| super::ActorSystemError::ActorStartupFailed {
271 actor_name: "EventBusActor".to_string(),
272 source: e,
273 })?;
274
275 debug!("事件总线Actor启动成功");
276 Ok(actor_ref)
277 }
278
279 pub async fn add_handlers(
281 event_bus: &ActorRef<EventBusMessage>,
282 handlers: Vec<Arc<dyn EventHandler<Event> + Send + Sync>>,
283 ) -> ForgeResult<Vec<HandlerId>> {
284 let mut handler_ids = Vec::new();
285
286 for handler in handlers {
287 let (tx, rx) = oneshot::channel();
288
289 event_bus
290 .send_message(EventBusMessage::AddHandler {
291 handler,
292 reply: tx,
293 })
294 .map_err(|e| {
295 error_utils::event_error(format!(
296 "发送添加处理器消息失败: {e}"
297 ))
298 })?;
299
300 let handler_id = rx.await.map_err(|e| {
301 error_utils::event_error(format!("接收处理器ID失败: {e}"))
302 })?;
303
304 handler_ids.push(handler_id);
305 }
306
307 Ok(handler_ids)
308 }
309}