moduforge_runtime/
event.rs1use std::{fmt::Debug, sync::Arc, time::Duration};
2
3use async_channel::{Receiver, Sender};
4use futures::future::join_all;
5use moduforge_state::{debug, state::State, Transaction};
6use tokio::{signal, sync::RwLock, time::timeout};
7
8use crate::error::{EditorResult, error_utils};
9
10#[derive(Clone)]
12pub enum Event {
13 Create(Arc<State>),
14 TrApply(Arc<Vec<Transaction>>, Arc<State>), Destroy, Stop, }
18#[derive(Clone)]
20pub struct EventBus {
21 tx: Sender<Event>,
22 rt: Receiver<Event>,
23 event_handlers: Arc<RwLock<Vec<Arc<dyn EventHandler>>>>,
24}
25
26impl Default for EventBus {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl EventBus {
33 pub async fn restart(&self) -> EditorResult<()> {
34 self.broadcast(Event::Stop).await?;
35 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
37 self.start_event_loop();
38 Ok(())
39 }
40 pub async fn add_event_handler(
41 &mut self,
42 event_handler: Arc<dyn EventHandler>,
43 ) -> EditorResult<()> {
44 let mut write = self.event_handlers.write().await;
45 write.push(event_handler);
46 Ok(())
47 }
48 pub async fn add_event_handlers(
49 &mut self,
50 event_handlers: Vec<Arc<dyn EventHandler>>,
51 ) -> EditorResult<()> {
52 let mut write = self.event_handlers.write().await;
53 write.extend(event_handlers);
54 Ok(())
55 }
56 pub fn start_event_loop(&self) {
58 let rx: async_channel::Receiver<Event> = self.subscribe();
59 let event_handlers = self.event_handlers.clone();
60 tokio::spawn(async move {
61 let handlers_clone = {
62 let handlers = event_handlers.read().await;
63 handlers.clone()
64 };
65 loop {
66 tokio::select! {
67 event = rx.recv() => match event {
68 Ok(Event::Stop) => {
69 debug!("接收到停止事件,等待所有处理器完成...");
70 let mut pending_handles = Vec::new();
72 for handler in &handlers_clone {
73 let handle = handler.handle(&Event::Stop);
74 pending_handles.push(handle);
75 }
76 if let Err(e) = timeout(Duration::from_secs(5), join_all(pending_handles)).await {
78 debug!("等待处理器完成超时: {}", e);
79 }
80 break;
81 },
82 Ok(event) => {
83 let mut handles = Vec::new();
85 for handler in &handlers_clone {
86 let handle = handler.handle(&event);
87 handles.push(handle);
88 }
89
90 let results = join_all(handles.into_iter().map(|handle| {
92 timeout(Duration::from_secs(3), handle)
93 })).await;
94
95 for result in results {
97 match result {
98 Ok(Ok(())) => continue,
99 Ok(Err(e)) => debug!("事件处理错误: {}", e),
100 Err(e) => debug!("事件处理超时: {}", e),
101 }
102 }
103 },
104 Err(e) => {
105 debug!("事件接收错误: {}", e);
106 break;
107 },
108 },
109 shutdown_signal = Box::pin(signal::ctrl_c()) => {
110 match shutdown_signal {
111 Ok(()) => {
112 debug!("事件管理器,接收到关闭信号,正在退出...");
113 break;
114 },
115 Err(e) => {
116 debug!("事件管理器,处理关闭信号时出错: {}", e);
117 break;
118 }
119 }
120 },
121 }
122 }
123 });
124 }
125
126 pub fn new() -> Self {
127 let (tx, rt) = async_channel::bounded(100);
128 Self { tx, rt, event_handlers: Arc::new(RwLock::new(vec![])) }
129 }
130
131 pub fn subscribe(&self) -> Receiver<Event> {
132 self.rt.clone()
133 }
134
135 pub async fn broadcast(
136 &self,
137 event: Event,
138 ) -> EditorResult<()> {
139 self.tx.send(event).await.map_err(|e| {
140 error_utils::event_error(format!(
141 "Failed to broadcast event: {}",
142 e
143 ))
144 })
145 }
146 pub fn broadcast_blocking(
147 &self,
148 event: Event,
149 ) -> EditorResult<()> {
150 self.tx.send_blocking(event).map_err(|e| {
151 error_utils::event_error(format!(
152 "Failed to broadcast event: {}",
153 e
154 ))
155 })
156 }
157}
158
159impl Drop for EventBus {
160 fn drop(&mut self) {
161 let rt = tokio::runtime::Runtime::new().unwrap();
163 rt.block_on(async {
164 if let Err(e) = self.broadcast_blocking(Event::Stop) {
166 debug!("Failed to broadcast stop event during drop: {}", e);
167 }
168
169 let handlers = self.event_handlers.read().await;
171 let mut pending_handles = Vec::new();
172 for handler in handlers.iter() {
173 let handle = handler.handle(&Event::Stop);
174 pending_handles.push(handle);
175 }
176
177 if let Err(e) =
179 timeout(Duration::from_secs(5), join_all(pending_handles)).await
180 {
181 debug!(
182 "Timeout waiting for handlers to complete during drop: {}",
183 e
184 );
185 }
186 });
187 }
188}
189
190#[async_trait::async_trait]
192pub trait EventHandler: Send + Sync + Debug {
193 async fn handle(
194 &self,
195 event: &Event,
196 ) -> EditorResult<()>;
197}
198
199pub struct EventContext {
201 pub state: Arc<State>,
202}