moduforge_runtime/
event.rs

1use std::{fmt::Debug, sync::Arc, time::Duration};
2
3use async_channel::{Receiver, Sender};
4use futures::future::join_all;
5use moduforge_state::{debug, state::State, Transaction};
6use tokio::{signal, sync::RwLock, time::timeout};
7
8use crate::error::{EditorResult, error_utils};
9
10// 事件类型定义
11#[derive(Clone)]
12pub enum Event {
13    Create(Arc<State>),
14    TrApply(Arc<Vec<Transaction>>, Arc<State>), // 事务应用后 + 是否成功
15    Destroy,                                    // 销毁事件
16    Stop,                                       // 停止后需要重启
17}
18/// 事件总线
19#[derive(Clone)]
20pub struct EventBus {
21    tx: Sender<Event>,
22    rt: Receiver<Event>,
23    event_handlers: Arc<RwLock<Vec<Arc<dyn EventHandler>>>>,
24}
25
26impl Default for EventBus {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl EventBus {
33    pub async fn restart(&self) -> EditorResult<()> {
34        self.broadcast(Event::Stop).await?;
35        //由于是异步的 延迟50毫秒启动
36        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
37        self.start_event_loop();
38        Ok(())
39    }
40    pub async fn add_event_handler(
41        &mut self,
42        event_handler: Arc<dyn EventHandler>,
43    ) -> EditorResult<()> {
44        let mut write = self.event_handlers.write().await;
45        write.push(event_handler);
46        Ok(())
47    }
48    pub async fn add_event_handlers(
49        &mut self,
50        event_handlers: Vec<Arc<dyn EventHandler>>,
51    ) -> EditorResult<()> {
52        let mut write = self.event_handlers.write().await;
53        write.extend(event_handlers);
54        Ok(())
55    }
56    /// 启动事件循环
57    pub fn start_event_loop(&self) {
58        let rx: async_channel::Receiver<Event> = self.subscribe();
59        let event_handlers = self.event_handlers.clone();
60        tokio::spawn(async move {
61            let handlers_clone = {
62                let handlers = event_handlers.read().await;
63                handlers.clone()
64            };
65            loop {
66                tokio::select! {
67                    event = rx.recv() => match event {
68                        Ok(Event::Stop) => {
69                            debug!("接收到停止事件,等待所有处理器完成...");
70                            // 等待所有正在进行的处理完成
71                            let mut pending_handles = Vec::new();
72                            for handler in &handlers_clone {
73                                let handle = handler.handle(&Event::Stop);
74                                pending_handles.push(handle);
75                            }
76                            // 设置超时时间为5秒
77                            if let Err(e) = timeout(Duration::from_secs(5), join_all(pending_handles)).await {
78                                debug!("等待处理器完成超时: {}", e);
79                            }
80                            break;
81                        },
82                        Ok(event) => {
83                            // 并发处理所有handler
84                            let mut handles = Vec::new();
85                            for handler in &handlers_clone {
86                                let handle = handler.handle(&event);
87                                handles.push(handle);
88                            }
89
90                            // 设置每个handler的超时时间为3秒
91                            let results = join_all(handles.into_iter().map(|handle| {
92                                timeout(Duration::from_secs(3), handle)
93                            })).await;
94
95                            // 处理结果
96                            for result in results {
97                                match result {
98                                    Ok(Ok(())) => continue,
99                                    Ok(Err(e)) => debug!("事件处理错误: {}", e),
100                                    Err(e) => debug!("事件处理超时: {}", e),
101                                }
102                            }
103                        },
104                        Err(e) => {
105                            debug!("事件接收错误: {}", e);
106                            break;
107                        },
108                    },
109                    shutdown_signal = Box::pin(signal::ctrl_c()) => {
110                        match shutdown_signal {
111                            Ok(()) => {
112                                debug!("事件管理器,接收到关闭信号,正在退出...");
113                                break;
114                            },
115                            Err(e) => {
116                                debug!("事件管理器,处理关闭信号时出错: {}", e);
117                                break;
118                            }
119                        }
120                    },
121                }
122            }
123        });
124    }
125
126    pub fn new() -> Self {
127        let (tx, rt) = async_channel::bounded(100);
128        Self { tx, rt, event_handlers: Arc::new(RwLock::new(vec![])) }
129    }
130
131    pub fn subscribe(&self) -> Receiver<Event> {
132        self.rt.clone()
133    }
134
135    pub async fn broadcast(
136        &self,
137        event: Event,
138    ) -> EditorResult<()> {
139        self.tx.send(event).await.map_err(|e| {
140            error_utils::event_error(format!(
141                "Failed to broadcast event: {}",
142                e
143            ))
144        })
145    }
146    pub fn broadcast_blocking(
147        &self,
148        event: Event,
149    ) -> EditorResult<()> {
150        self.tx.send_blocking(event).map_err(|e| {
151            error_utils::event_error(format!(
152                "Failed to broadcast event: {}",
153                e
154            ))
155        })
156    }
157}
158
159impl Drop for EventBus {
160    fn drop(&mut self) {
161        // Create a new runtime to handle async operations during drop
162        let rt = tokio::runtime::Runtime::new().unwrap();
163        rt.block_on(async {
164            // Broadcast Stop event to signal handlers to complete
165            if let Err(e) = self.broadcast_blocking(Event::Stop) {
166                debug!("Failed to broadcast stop event during drop: {}", e);
167            }
168
169            // Wait for handlers to complete with a timeout
170            let handlers = self.event_handlers.read().await;
171            let mut pending_handles = Vec::new();
172            for handler in handlers.iter() {
173                let handle = handler.handle(&Event::Stop);
174                pending_handles.push(handle);
175            }
176
177            // Wait up to 5 seconds for all handlers to complete
178            if let Err(e) =
179                timeout(Duration::from_secs(5), join_all(pending_handles)).await
180            {
181                debug!(
182                    "Timeout waiting for handlers to complete during drop: {}",
183                    e
184                );
185            }
186        });
187    }
188}
189
190// 事件处理器特征
191#[async_trait::async_trait]
192pub trait EventHandler: Send + Sync + Debug {
193    async fn handle(
194        &self,
195        event: &Event,
196    ) -> EditorResult<()>;
197}
198
199// 事件上下文
200pub struct EventContext {
201    pub state: Arc<State>,
202}