mf_core/runtime/
async_utils.rs

1//! 异步工具模块
2//!
3//! 提供异步/同步边界处理工具,帮助避免在异步上下文中使用阻塞调用
4
5use std::{future::Future, time::Duration};
6use tokio::runtime::Handle;
7use crate::{error::error_utils, ForgeResult};
8
9/// 异步/同步边界处理工具
10pub struct AsyncBridge;
11
12impl AsyncBridge {
13    /// 在同步上下文中安全地执行异步操作
14    ///
15    /// 此方法会检查当前是否在 Tokio 运行时中:
16    /// - 如果在运行时中,使用 spawn_blocking 避免阻塞
17    /// - 如果不在运行时中,创建临时运行时执行
18    ///
19    /// # 警告
20    /// 此方法应该谨慎使用,主要用于:
21    /// - Drop 实现
22    /// - 同步的 FFI 边界
23    /// - 测试代码
24    pub fn run_async_in_sync<F, T>(future: F) -> ForgeResult<T>
25    where
26        F: Future<Output = ForgeResult<T>> + Send + 'static,
27        T: Send + 'static,
28    {
29        // 检查是否在 Tokio 运行时中
30        if let Ok(handle) = Handle::try_current() {
31            // 在运行时中,使用 spawn_blocking 避免阻塞
32            let (tx, rx) = std::sync::mpsc::channel();
33
34            handle.spawn(async move {
35                let result = future.await;
36                let _ = tx.send(result);
37            });
38
39            // 等待结果,设置合理的超时
40            match rx.recv_timeout(Duration::from_secs(30)) {
41                Ok(result) => result,
42                Err(_) => Err(error_utils::timeout_error(
43                    "异步操作在同步上下文中超时".to_string(),
44                )),
45            }
46        } else {
47            // 不在运行时中,创建临时运行时
48            let rt = tokio::runtime::Runtime::new().map_err(|e| {
49                error_utils::runtime_error(format!("创建临时运行时失败: {}", e))
50            })?;
51
52            rt.block_on(future)
53        }
54    }
55
56    /// 在同步上下文中安全地执行异步操作(带超时)
57    pub fn run_async_in_sync_with_timeout<F, T>(
58        future: F,
59        timeout: Duration,
60    ) -> ForgeResult<T>
61    where
62        F: Future<Output = ForgeResult<T>> + Send + 'static,
63        T: Send + 'static,
64    {
65        Self::run_async_in_sync(async move {
66            tokio::time::timeout(timeout, future).await.map_err(|_| {
67                error_utils::timeout_error("异步操作超时".to_string())
68            })?
69        })
70    }
71
72    /// 检查当前是否在异步上下文中
73    pub fn is_in_async_context() -> bool {
74        Handle::try_current().is_ok()
75    }
76
77    /// 安全地在异步上下文中执行可能阻塞的操作
78    ///
79    /// 此方法会将阻塞操作移到专用的阻塞线程池中执行
80    pub async fn run_blocking_in_async<F, T>(blocking_op: F) -> ForgeResult<T>
81    where
82        F: FnOnce() -> ForgeResult<T> + Send + 'static,
83        T: Send + 'static,
84    {
85        tokio::task::spawn_blocking(blocking_op).await.map_err(|e| {
86            error_utils::runtime_error(format!("阻塞任务执行失败: {}", e))
87        })?
88    }
89}
90
91/// 异步安全的资源清理器
92///
93/// 用于在 Drop 中安全地执行异步清理操作
94pub struct AsyncDropper<T>
95where
96    T: Send + 'static,
97{
98    resource: Option<T>,
99    cleanup_fn: Option<Box<dyn FnOnce(T) -> ForgeResult<()> + Send>>,
100}
101
102impl<T> AsyncDropper<T>
103where
104    T: Send + 'static,
105{
106    /// 创建新的异步清理器
107    pub fn new<F>(
108        resource: T,
109        cleanup_fn: F,
110    ) -> Self
111    where
112        F: FnOnce(T) -> ForgeResult<()> + Send + 'static,
113    {
114        Self {
115            resource: Some(resource),
116            cleanup_fn: Some(Box::new(cleanup_fn)),
117        }
118    }
119
120    /// 手动执行清理(异步版本)
121    pub async fn cleanup_async(mut self) -> ForgeResult<()> {
122        if let (Some(resource), Some(cleanup_fn)) =
123            (self.resource.take(), self.cleanup_fn.take())
124        {
125            cleanup_fn(resource)
126        } else {
127            Ok(())
128        }
129    }
130
131    /// 手动执行清理(同步版本)
132    pub fn cleanup_sync(mut self) -> ForgeResult<()> {
133        if let (Some(resource), Some(cleanup_fn)) =
134            (self.resource.take(), self.cleanup_fn.take())
135        {
136            cleanup_fn(resource)
137        } else {
138            Ok(())
139        }
140    }
141}
142
143impl<T> Drop for AsyncDropper<T>
144where
145    T: Send + 'static,
146{
147    fn drop(&mut self) {
148        if let (Some(resource), Some(cleanup_fn)) =
149            (self.resource.take(), self.cleanup_fn.take())
150        {
151            // 在 Drop 中只能使用同步清理
152            if let Err(e) = cleanup_fn(resource) {
153                eprintln!("异步资源清理失败: {}", e);
154            }
155        }
156    }
157}
158
159/// 异步操作的同步包装器
160///
161/// 提供一个安全的方式在同步代码中调用异步操作
162pub struct SyncWrapper<T> {
163    inner: T,
164}
165
166impl<T> SyncWrapper<T> {
167    pub fn new(inner: T) -> Self {
168        Self { inner }
169    }
170
171    pub fn inner(&self) -> &T {
172        &self.inner
173    }
174
175    pub fn inner_mut(&mut self) -> &mut T {
176        &mut self.inner
177    }
178
179    pub fn into_inner(self) -> T {
180        self.inner
181    }
182}
183
184/// 为事件总线提供同步包装
185impl<T: Send + Sync + Clone + 'static> SyncWrapper<crate::event::EventBus<T>> {
186    /// 同步广播事件(自动选择最佳方法)
187    pub fn broadcast_auto(
188        &self,
189        event: T,
190    ) -> ForgeResult<()> {
191        if AsyncBridge::is_in_async_context() {
192            // 在异步上下文中,使用 spawn 避免阻塞
193            let bus = self.inner.clone();
194            tokio::spawn(async move {
195                if let Err(e) = bus.broadcast(event).await {
196                    eprintln!("异步事件广播失败: {}", e);
197                }
198            });
199            Ok(())
200        } else {
201            // 在同步上下文中,使用阻塞版本
202            self.inner.broadcast_blocking(event)
203        }
204    }
205
206    /// 同步销毁事件总线(自动选择最佳方法)
207    pub fn destroy_auto(&self) -> ForgeResult<()> {
208        if AsyncBridge::is_in_async_context() {
209            // 在异步上下文中,使用异步版本
210            let bus = self.inner.clone();
211            AsyncBridge::run_async_in_sync(async move { bus.destroy().await })
212        } else {
213            // 在同步上下文中,使用阻塞版本
214            self.inner.destroy_blocking();
215            Ok(())
216        }
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use super::*;
223    use std::time::Duration;
224
225    #[test]
226    fn test_sync_context_detection() {
227        // 在测试中,通常不在异步上下文中
228        assert!(!AsyncBridge::is_in_async_context());
229    }
230
231    #[tokio::test]
232    async fn test_async_context_detection() {
233        // 在 tokio::test 中,应该在异步上下文中
234        assert!(AsyncBridge::is_in_async_context());
235    }
236
237    #[tokio::test]
238    async fn test_run_blocking_in_async() {
239        let result = AsyncBridge::run_blocking_in_async(|| {
240            // 模拟阻塞操作
241            std::thread::sleep(Duration::from_millis(10));
242            Ok(42)
243        })
244        .await;
245
246        assert!(result.is_ok());
247        assert_eq!(result.unwrap(), 42);
248    }
249
250    #[test]
251    fn test_run_async_in_sync() {
252        let result = AsyncBridge::run_async_in_sync(async {
253            tokio::time::sleep(Duration::from_millis(10)).await;
254            Ok(42)
255        });
256
257        assert!(result.is_ok());
258        assert_eq!(result.unwrap(), 42);
259    }
260
261    #[tokio::test]
262    async fn test_async_dropper() {
263        let dropper = AsyncDropper::new(42, |value| {
264            assert_eq!(value, 42);
265            Ok(())
266        });
267
268        // 手动清理
269        dropper.cleanup_async().await.unwrap();
270    }
271}