mf_core/runtime/
async_utils.rs1use std::{future::Future, time::Duration};
6use tokio::runtime::Handle;
7use crate::{error::error_utils, ForgeResult};
8
9pub struct AsyncBridge;
11
12impl AsyncBridge {
13 pub fn run_async_in_sync<F, T>(future: F) -> ForgeResult<T>
25 where
26 F: Future<Output = ForgeResult<T>> + Send + 'static,
27 T: Send + 'static,
28 {
29 if let Ok(handle) = Handle::try_current() {
31 let (tx, rx) = std::sync::mpsc::channel();
33
34 handle.spawn(async move {
35 let result = future.await;
36 let _ = tx.send(result);
37 });
38
39 match rx.recv_timeout(Duration::from_secs(30)) {
41 Ok(result) => result,
42 Err(_) => Err(error_utils::timeout_error(
43 "异步操作在同步上下文中超时".to_string()
44 )),
45 }
46 } else {
47 let rt = tokio::runtime::Runtime::new().map_err(|e| {
49 error_utils::runtime_error(format!("创建临时运行时失败: {}", e))
50 })?;
51
52 rt.block_on(future)
53 }
54 }
55
56 pub fn run_async_in_sync_with_timeout<F, T>(
58 future: F,
59 timeout: Duration,
60 ) -> ForgeResult<T>
61 where
62 F: Future<Output = ForgeResult<T>> + Send + 'static,
63 T: Send + 'static,
64 {
65 Self::run_async_in_sync(async move {
66 tokio::time::timeout(timeout, future)
67 .await
68 .map_err(|_| error_utils::timeout_error("异步操作超时".to_string()))?
69 })
70 }
71
72 pub fn is_in_async_context() -> bool {
74 Handle::try_current().is_ok()
75 }
76
77 pub async fn run_blocking_in_async<F, T>(blocking_op: F) -> ForgeResult<T>
81 where
82 F: FnOnce() -> ForgeResult<T> + Send + 'static,
83 T: Send + 'static,
84 {
85 tokio::task::spawn_blocking(blocking_op)
86 .await
87 .map_err(|e| error_utils::runtime_error(format!("阻塞任务执行失败: {}", e)))?
88 }
89}
90
91pub struct AsyncDropper<T>
95where
96 T: Send + 'static,
97{
98 resource: Option<T>,
99 cleanup_fn: Option<Box<dyn FnOnce(T) -> ForgeResult<()> + Send>>,
100}
101
102impl<T> AsyncDropper<T>
103where
104 T: Send + 'static,
105{
106 pub fn new<F>(resource: T, cleanup_fn: F) -> Self
108 where
109 F: FnOnce(T) -> ForgeResult<()> + Send + 'static,
110 {
111 Self {
112 resource: Some(resource),
113 cleanup_fn: Some(Box::new(cleanup_fn)),
114 }
115 }
116
117 pub async fn cleanup_async(mut self) -> ForgeResult<()> {
119 if let (Some(resource), Some(cleanup_fn)) = (self.resource.take(), self.cleanup_fn.take()) {
120 cleanup_fn(resource)
121 } else {
122 Ok(())
123 }
124 }
125
126 pub fn cleanup_sync(mut self) -> ForgeResult<()> {
128 if let (Some(resource), Some(cleanup_fn)) = (self.resource.take(), self.cleanup_fn.take()) {
129 cleanup_fn(resource)
130 } else {
131 Ok(())
132 }
133 }
134}
135
136impl<T> Drop for AsyncDropper<T>
137where
138 T: Send + 'static,
139{
140 fn drop(&mut self) {
141 if let (Some(resource), Some(cleanup_fn)) = (self.resource.take(), self.cleanup_fn.take()) {
142 if let Err(e) = cleanup_fn(resource) {
144 eprintln!("异步资源清理失败: {}", e);
145 }
146 }
147 }
148}
149
/// Thin owning wrapper around a value of type `T`.
///
/// The generic part only offers access to the wrapped value; specialised
/// `impl` blocks can attach extra behaviour for particular `T`s.
pub struct SyncWrapper<T> {
    /// The wrapped value.
    inner: T,
}

impl<T> SyncWrapper<T> {
    /// Wraps `inner`.
    pub fn new(inner: T) -> Self {
        SyncWrapper { inner }
    }

    /// Borrows the wrapped value.
    pub fn inner(&self) -> &T {
        let SyncWrapper { inner } = self;
        inner
    }

    /// Mutably borrows the wrapped value.
    pub fn inner_mut(&mut self) -> &mut T {
        let SyncWrapper { inner } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into_inner(self) -> T {
        let SyncWrapper { inner } = self;
        inner
    }
}
174
175impl<T: Send + 'static> SyncWrapper<crate::event::EventBus<T>> {
177 pub fn broadcast_auto(&self, event: T) -> ForgeResult<()> {
179 if AsyncBridge::is_in_async_context() {
180 let bus = self.inner.clone();
182 tokio::spawn(async move {
183 if let Err(e) = bus.broadcast(event).await {
184 eprintln!("异步事件广播失败: {}", e);
185 }
186 });
187 Ok(())
188 } else {
189 self.inner.broadcast_blocking(event)
191 }
192 }
193
194 pub fn destroy_auto(&self) -> ForgeResult<()> {
196 if AsyncBridge::is_in_async_context() {
197 let bus = self.inner.clone();
199 AsyncBridge::run_async_in_sync(async move {
200 bus.destroy().await
201 })
202 } else {
203 self.inner.destroy_blocking();
205 Ok(())
206 }
207 }
208}
209
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A plain `#[test]` runs outside any Tokio runtime.
    #[test]
    fn test_sync_context_detection() {
        let in_async = AsyncBridge::is_in_async_context();
        assert!(!in_async);
    }

    /// `#[tokio::test]` runs the body inside a runtime.
    #[tokio::test]
    async fn test_async_context_detection() {
        let in_async = AsyncBridge::is_in_async_context();
        assert!(in_async);
    }

    /// A blocking closure is off-loaded and its value comes back unchanged.
    #[tokio::test]
    async fn test_run_blocking_in_async() {
        let outcome = AsyncBridge::run_blocking_in_async(|| {
            std::thread::sleep(Duration::from_millis(10));
            Ok(42)
        })
        .await;

        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), 42);
    }

    /// From a thread without a runtime, the future is driven to completion.
    #[test]
    fn test_run_async_in_sync() {
        let outcome = AsyncBridge::run_async_in_sync(async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            Ok(42)
        });

        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), 42);
    }

    /// Explicit async cleanup hands the resource to the closure and succeeds.
    #[tokio::test]
    async fn test_async_dropper() {
        let guarded = AsyncDropper::new(42, |value| {
            assert_eq!(value, 42);
            Ok(())
        });

        let cleaned = guarded.cleanup_async().await;
        cleaned.unwrap();
    }
}