mf_core/runtime/
async_utils.rs1use std::{future::Future, time::Duration};
6use tokio::runtime::Handle;
7use crate::{error::error_utils, ForgeResult};
8
9pub struct AsyncBridge;
11
12impl AsyncBridge {
13 pub fn run_async_in_sync<F, T>(future: F) -> ForgeResult<T>
25 where
26 F: Future<Output = ForgeResult<T>> + Send + 'static,
27 T: Send + 'static,
28 {
29 if let Ok(handle) = Handle::try_current() {
31 let (tx, rx) = std::sync::mpsc::channel();
33
34 handle.spawn(async move {
35 let result = future.await;
36 let _ = tx.send(result);
37 });
38
39 match rx.recv_timeout(Duration::from_secs(30)) {
41 Ok(result) => result,
42 Err(_) => Err(error_utils::timeout_error(
43 "异步操作在同步上下文中超时".to_string(),
44 )),
45 }
46 } else {
47 let rt = tokio::runtime::Runtime::new().map_err(|e| {
49 error_utils::runtime_error(format!("创建临时运行时失败: {}", e))
50 })?;
51
52 rt.block_on(future)
53 }
54 }
55
56 pub fn run_async_in_sync_with_timeout<F, T>(
58 future: F,
59 timeout: Duration,
60 ) -> ForgeResult<T>
61 where
62 F: Future<Output = ForgeResult<T>> + Send + 'static,
63 T: Send + 'static,
64 {
65 Self::run_async_in_sync(async move {
66 tokio::time::timeout(timeout, future).await.map_err(|_| {
67 error_utils::timeout_error("异步操作超时".to_string())
68 })?
69 })
70 }
71
72 pub fn is_in_async_context() -> bool {
74 Handle::try_current().is_ok()
75 }
76
77 pub async fn run_blocking_in_async<F, T>(blocking_op: F) -> ForgeResult<T>
81 where
82 F: FnOnce() -> ForgeResult<T> + Send + 'static,
83 T: Send + 'static,
84 {
85 tokio::task::spawn_blocking(blocking_op).await.map_err(|e| {
86 error_utils::runtime_error(format!("阻塞任务执行失败: {}", e))
87 })?
88 }
89}
90
91pub struct AsyncDropper<T>
95where
96 T: Send + 'static,
97{
98 resource: Option<T>,
99 cleanup_fn: Option<Box<dyn FnOnce(T) -> ForgeResult<()> + Send>>,
100}
101
102impl<T> AsyncDropper<T>
103where
104 T: Send + 'static,
105{
106 pub fn new<F>(
108 resource: T,
109 cleanup_fn: F,
110 ) -> Self
111 where
112 F: FnOnce(T) -> ForgeResult<()> + Send + 'static,
113 {
114 Self {
115 resource: Some(resource),
116 cleanup_fn: Some(Box::new(cleanup_fn)),
117 }
118 }
119
120 pub async fn cleanup_async(mut self) -> ForgeResult<()> {
122 if let (Some(resource), Some(cleanup_fn)) =
123 (self.resource.take(), self.cleanup_fn.take())
124 {
125 cleanup_fn(resource)
126 } else {
127 Ok(())
128 }
129 }
130
131 pub fn cleanup_sync(mut self) -> ForgeResult<()> {
133 if let (Some(resource), Some(cleanup_fn)) =
134 (self.resource.take(), self.cleanup_fn.take())
135 {
136 cleanup_fn(resource)
137 } else {
138 Ok(())
139 }
140 }
141}
142
143impl<T> Drop for AsyncDropper<T>
144where
145 T: Send + 'static,
146{
147 fn drop(&mut self) {
148 if let (Some(resource), Some(cleanup_fn)) =
149 (self.resource.take(), self.cleanup_fn.take())
150 {
151 if let Err(e) = cleanup_fn(resource) {
153 eprintln!("异步资源清理失败: {}", e);
154 }
155 }
156 }
157}
158
159pub struct SyncWrapper<T> {
163 inner: T,
164}
165
166impl<T> SyncWrapper<T> {
167 pub fn new(inner: T) -> Self {
168 Self { inner }
169 }
170
171 pub fn inner(&self) -> &T {
172 &self.inner
173 }
174
175 pub fn inner_mut(&mut self) -> &mut T {
176 &mut self.inner
177 }
178
179 pub fn into_inner(self) -> T {
180 self.inner
181 }
182}
183
184impl<T: Send + Sync + Clone + 'static> SyncWrapper<crate::event::EventBus<T>> {
186 pub fn broadcast_auto(
188 &self,
189 event: T,
190 ) -> ForgeResult<()> {
191 if AsyncBridge::is_in_async_context() {
192 let bus = self.inner.clone();
194 tokio::spawn(async move {
195 if let Err(e) = bus.broadcast(event).await {
196 eprintln!("异步事件广播失败: {}", e);
197 }
198 });
199 Ok(())
200 } else {
201 self.inner.broadcast_blocking(event)
203 }
204 }
205
206 pub fn destroy_auto(&self) -> ForgeResult<()> {
208 if AsyncBridge::is_in_async_context() {
209 let bus = self.inner.clone();
211 AsyncBridge::run_async_in_sync(async move { bus.destroy().await })
212 } else {
213 self.inner.destroy_blocking();
215 Ok(())
216 }
217 }
218}
219
220#[cfg(test)]
221mod tests {
222 use super::*;
223 use std::time::Duration;
224
225 #[test]
226 fn test_sync_context_detection() {
227 assert!(!AsyncBridge::is_in_async_context());
229 }
230
231 #[tokio::test]
232 async fn test_async_context_detection() {
233 assert!(AsyncBridge::is_in_async_context());
235 }
236
237 #[tokio::test]
238 async fn test_run_blocking_in_async() {
239 let result = AsyncBridge::run_blocking_in_async(|| {
240 std::thread::sleep(Duration::from_millis(10));
242 Ok(42)
243 })
244 .await;
245
246 assert!(result.is_ok());
247 assert_eq!(result.unwrap(), 42);
248 }
249
250 #[test]
251 fn test_run_async_in_sync() {
252 let result = AsyncBridge::run_async_in_sync(async {
253 tokio::time::sleep(Duration::from_millis(10)).await;
254 Ok(42)
255 });
256
257 assert!(result.is_ok());
258 assert_eq!(result.unwrap(), 42);
259 }
260
261 #[tokio::test]
262 async fn test_async_dropper() {
263 let dropper = AsyncDropper::new(42, |value| {
264 assert_eq!(value, 42);
265 Ok(())
266 });
267
268 dropper.cleanup_async().await.unwrap();
270 }
271}