mf_core/runtime/
async_utils.rs

1//! 异步工具模块
2//! 
3//! 提供异步/同步边界处理工具,帮助避免在异步上下文中使用阻塞调用
4
5use std::{future::Future, time::Duration};
6use tokio::runtime::Handle;
7use crate::{error::error_utils, ForgeResult};
8
9/// 异步/同步边界处理工具
10pub struct AsyncBridge;
11
12impl AsyncBridge {
13    /// 在同步上下文中安全地执行异步操作
14    /// 
15    /// 此方法会检查当前是否在 Tokio 运行时中:
16    /// - 如果在运行时中,使用 spawn_blocking 避免阻塞
17    /// - 如果不在运行时中,创建临时运行时执行
18    /// 
19    /// # 警告
20    /// 此方法应该谨慎使用,主要用于:
21    /// - Drop 实现
22    /// - 同步的 FFI 边界
23    /// - 测试代码
24    pub fn run_async_in_sync<F, T>(future: F) -> ForgeResult<T>
25    where
26        F: Future<Output = ForgeResult<T>> + Send + 'static,
27        T: Send + 'static,
28    {
29        // 检查是否在 Tokio 运行时中
30        if let Ok(handle) = Handle::try_current() {
31            // 在运行时中,使用 spawn_blocking 避免阻塞
32            let (tx, rx) = std::sync::mpsc::channel();
33            
34            handle.spawn(async move {
35                let result = future.await;
36                let _ = tx.send(result);
37            });
38            
39            // 等待结果,设置合理的超时
40            match rx.recv_timeout(Duration::from_secs(30)) {
41                Ok(result) => result,
42                Err(_) => Err(error_utils::timeout_error(
43                    "异步操作在同步上下文中超时".to_string()
44                )),
45            }
46        } else {
47            // 不在运行时中,创建临时运行时
48            let rt = tokio::runtime::Runtime::new().map_err(|e| {
49                error_utils::runtime_error(format!("创建临时运行时失败: {}", e))
50            })?;
51            
52            rt.block_on(future)
53        }
54    }
55
56    /// 在同步上下文中安全地执行异步操作(带超时)
57    pub fn run_async_in_sync_with_timeout<F, T>(
58        future: F,
59        timeout: Duration,
60    ) -> ForgeResult<T>
61    where
62        F: Future<Output = ForgeResult<T>> + Send + 'static,
63        T: Send + 'static,
64    {
65        Self::run_async_in_sync(async move {
66            tokio::time::timeout(timeout, future)
67                .await
68                .map_err(|_| error_utils::timeout_error("异步操作超时".to_string()))?
69        })
70    }
71
72    /// 检查当前是否在异步上下文中
73    pub fn is_in_async_context() -> bool {
74        Handle::try_current().is_ok()
75    }
76
77    /// 安全地在异步上下文中执行可能阻塞的操作
78    /// 
79    /// 此方法会将阻塞操作移到专用的阻塞线程池中执行
80    pub async fn run_blocking_in_async<F, T>(blocking_op: F) -> ForgeResult<T>
81    where
82        F: FnOnce() -> ForgeResult<T> + Send + 'static,
83        T: Send + 'static,
84    {
85        tokio::task::spawn_blocking(blocking_op)
86            .await
87            .map_err(|e| error_utils::runtime_error(format!("阻塞任务执行失败: {}", e)))?
88    }
89}
90
91/// 异步安全的资源清理器
92///
93/// 用于在 Drop 中安全地执行异步清理操作
94pub struct AsyncDropper<T>
95where
96    T: Send + 'static,
97{
98    resource: Option<T>,
99    cleanup_fn: Option<Box<dyn FnOnce(T) -> ForgeResult<()> + Send>>,
100}
101
102impl<T> AsyncDropper<T>
103where
104    T: Send + 'static,
105{
106    /// 创建新的异步清理器
107    pub fn new<F>(resource: T, cleanup_fn: F) -> Self
108    where
109        F: FnOnce(T) -> ForgeResult<()> + Send + 'static,
110    {
111        Self {
112            resource: Some(resource),
113            cleanup_fn: Some(Box::new(cleanup_fn)),
114        }
115    }
116
117    /// 手动执行清理(异步版本)
118    pub async fn cleanup_async(mut self) -> ForgeResult<()> {
119        if let (Some(resource), Some(cleanup_fn)) = (self.resource.take(), self.cleanup_fn.take()) {
120            cleanup_fn(resource)
121        } else {
122            Ok(())
123        }
124    }
125
126    /// 手动执行清理(同步版本)
127    pub fn cleanup_sync(mut self) -> ForgeResult<()> {
128        if let (Some(resource), Some(cleanup_fn)) = (self.resource.take(), self.cleanup_fn.take()) {
129            cleanup_fn(resource)
130        } else {
131            Ok(())
132        }
133    }
134}
135
136impl<T> Drop for AsyncDropper<T>
137where
138    T: Send + 'static,
139{
140    fn drop(&mut self) {
141        if let (Some(resource), Some(cleanup_fn)) = (self.resource.take(), self.cleanup_fn.take()) {
142            // 在 Drop 中只能使用同步清理
143            if let Err(e) = cleanup_fn(resource) {
144                eprintln!("异步资源清理失败: {}", e);
145            }
146        }
147    }
148}
149
150/// 异步操作的同步包装器
151/// 
152/// 提供一个安全的方式在同步代码中调用异步操作
153pub struct SyncWrapper<T> {
154    inner: T,
155}
156
157impl<T> SyncWrapper<T> {
158    pub fn new(inner: T) -> Self {
159        Self { inner }
160    }
161
162    pub fn inner(&self) -> &T {
163        &self.inner
164    }
165
166    pub fn inner_mut(&mut self) -> &mut T {
167        &mut self.inner
168    }
169
170    pub fn into_inner(self) -> T {
171        self.inner
172    }
173}
174
175/// 为事件总线提供同步包装
176impl<T: Send + 'static> SyncWrapper<crate::event::EventBus<T>> {
177    /// 同步广播事件(自动选择最佳方法)
178    pub fn broadcast_auto(&self, event: T) -> ForgeResult<()> {
179        if AsyncBridge::is_in_async_context() {
180            // 在异步上下文中,使用 spawn 避免阻塞
181            let bus = self.inner.clone();
182            tokio::spawn(async move {
183                if let Err(e) = bus.broadcast(event).await {
184                    eprintln!("异步事件广播失败: {}", e);
185                }
186            });
187            Ok(())
188        } else {
189            // 在同步上下文中,使用阻塞版本
190            self.inner.broadcast_blocking(event)
191        }
192    }
193
194    /// 同步销毁事件总线(自动选择最佳方法)
195    pub fn destroy_auto(&self) -> ForgeResult<()> {
196        if AsyncBridge::is_in_async_context() {
197            // 在异步上下文中,使用异步版本
198            let bus = self.inner.clone();
199            AsyncBridge::run_async_in_sync(async move {
200                bus.destroy().await
201            })
202        } else {
203            // 在同步上下文中,使用阻塞版本
204            self.inner.destroy_blocking();
205            Ok(())
206        }
207    }
208}
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213    use std::time::Duration;
214
215    #[test]
216    fn test_sync_context_detection() {
217        // 在测试中,通常不在异步上下文中
218        assert!(!AsyncBridge::is_in_async_context());
219    }
220
221    #[tokio::test]
222    async fn test_async_context_detection() {
223        // 在 tokio::test 中,应该在异步上下文中
224        assert!(AsyncBridge::is_in_async_context());
225    }
226
227    #[tokio::test]
228    async fn test_run_blocking_in_async() {
229        let result = AsyncBridge::run_blocking_in_async(|| {
230            // 模拟阻塞操作
231            std::thread::sleep(Duration::from_millis(10));
232            Ok(42)
233        }).await;
234
235        assert!(result.is_ok());
236        assert_eq!(result.unwrap(), 42);
237    }
238
239    #[test]
240    fn test_run_async_in_sync() {
241        let result = AsyncBridge::run_async_in_sync(async {
242            tokio::time::sleep(Duration::from_millis(10)).await;
243            Ok(42)
244        });
245
246        assert!(result.is_ok());
247        assert_eq!(result.unwrap(), 42);
248    }
249
250    #[tokio::test]
251    async fn test_async_dropper() {
252        let dropper = AsyncDropper::new(42, |value| {
253            assert_eq!(value, 42);
254            Ok(())
255        });
256
257        // 手动清理
258        dropper.cleanup_async().await.unwrap();
259    }
260}