use std::{future::Future, time::Duration};
use tokio::runtime::Handle;
use crate::{error::error_utils, ForgeResult};
pub struct AsyncBridge;
impl AsyncBridge {
pub fn run_async_in_sync<F, T>(future: F) -> ForgeResult<T>
where
F: Future<Output = ForgeResult<T>> + Send + 'static,
T: Send + 'static,
{
if let Ok(handle) = Handle::try_current() {
let (tx, rx) = std::sync::mpsc::channel();
handle.spawn(async move {
let result = future.await;
let _ = tx.send(result);
});
match rx.recv_timeout(Duration::from_secs(30)) {
Ok(result) => result,
Err(_) => Err(error_utils::timeout_error(
"异步操作在同步上下文中超时".to_string(),
)),
}
} else {
let rt = tokio::runtime::Runtime::new().map_err(|e| {
error_utils::runtime_error(format!("创建临时运行时失败: {e}"))
})?;
rt.block_on(future)
}
}
pub fn run_async_in_sync_with_timeout<F, T>(
future: F,
timeout: Duration,
) -> ForgeResult<T>
where
F: Future<Output = ForgeResult<T>> + Send + 'static,
T: Send + 'static,
{
Self::run_async_in_sync(async move {
tokio::time::timeout(timeout, future).await.map_err(|_| {
error_utils::timeout_error("异步操作超时".to_string())
})?
})
}
pub fn is_in_async_context() -> bool {
Handle::try_current().is_ok()
}
pub async fn run_blocking_in_async<F, T>(blocking_op: F) -> ForgeResult<T>
where
F: FnOnce() -> ForgeResult<T> + Send + 'static,
T: Send + 'static,
{
tokio::task::spawn_blocking(blocking_op).await.map_err(|e| {
error_utils::runtime_error(format!("阻塞任务执行失败: {e}"))
})?
}
}
pub struct AsyncDropper<T>
where
T: Send + 'static,
{
resource: Option<T>,
cleanup_fn: Option<Box<dyn FnOnce(T) -> ForgeResult<()> + Send>>,
}
impl<T> AsyncDropper<T>
where
T: Send + 'static,
{
pub fn new<F>(
resource: T,
cleanup_fn: F,
) -> Self
where
F: FnOnce(T) -> ForgeResult<()> + Send + 'static,
{
Self {
resource: Some(resource),
cleanup_fn: Some(Box::new(cleanup_fn)),
}
}
pub async fn cleanup_async(mut self) -> ForgeResult<()> {
if let (Some(resource), Some(cleanup_fn)) =
(self.resource.take(), self.cleanup_fn.take())
{
cleanup_fn(resource)
} else {
Ok(())
}
}
pub fn cleanup_sync(mut self) -> ForgeResult<()> {
if let (Some(resource), Some(cleanup_fn)) =
(self.resource.take(), self.cleanup_fn.take())
{
cleanup_fn(resource)
} else {
Ok(())
}
}
}
impl<T> Drop for AsyncDropper<T>
where
T: Send + 'static,
{
fn drop(&mut self) {
if let (Some(resource), Some(cleanup_fn)) =
(self.resource.take(), self.cleanup_fn.take())
{
if let Err(e) = cleanup_fn(resource) {
eprintln!("异步资源清理失败: {e}");
}
}
}
}
pub struct SyncWrapper<T> {
inner: T,
}
impl<T> SyncWrapper<T> {
pub fn new(inner: T) -> Self {
Self { inner }
}
pub fn inner(&self) -> &T {
&self.inner
}
pub fn inner_mut(&mut self) -> &mut T {
&mut self.inner
}
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T: Send + Sync + Clone + 'static> SyncWrapper<crate::event::EventBus<T>> {
pub fn broadcast_auto(
&self,
event: T,
) -> ForgeResult<()> {
if AsyncBridge::is_in_async_context() {
let bus = self.inner.clone();
tokio::spawn(async move {
if let Err(e) = bus.broadcast(event).await {
eprintln!("异步事件广播失败: {e}");
}
});
Ok(())
} else {
self.inner.broadcast_blocking(event)
}
}
pub fn destroy_auto(&self) -> ForgeResult<()> {
if AsyncBridge::is_in_async_context() {
let bus = self.inner.clone();
AsyncBridge::run_async_in_sync(async move { bus.destroy().await })
} else {
self.inner.destroy_blocking();
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[test]
fn test_sync_context_detection() {
assert!(!AsyncBridge::is_in_async_context());
}
#[tokio::test]
async fn test_async_context_detection() {
assert!(AsyncBridge::is_in_async_context());
}
#[tokio::test]
async fn test_run_blocking_in_async() {
let result = AsyncBridge::run_blocking_in_async(|| {
std::thread::sleep(Duration::from_millis(10));
Ok(42)
})
.await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
}
#[test]
fn test_run_async_in_sync() {
let result = AsyncBridge::run_async_in_sync(async {
tokio::time::sleep(Duration::from_millis(10)).await;
Ok(42)
});
assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
}
#[tokio::test]
async fn test_async_dropper() {
let dropper = AsyncDropper::new(42, |value| {
assert_eq!(value, 42);
Ok(())
});
dropper.cleanup_async().await.unwrap();
}
}