use crate::prelude::AppResult;
use anyhow::Context;
use std::future::Future;
use std::sync::OnceLock;
use tokio::runtime::Runtime;
use tokio::task::{JoinHandle, spawn_blocking};
static RUNTIME: OnceLock<Runtime> = OnceLock::new();
fn runtime() -> &'static Runtime {
RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime"))
}
pub fn blk<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
spawn_blocking(f)
}
pub async fn block<F, R>(f: F) -> AppResult<R>
where
F: FnOnce() -> AppResult<R> + Send + Sync + 'static,
R: Send + 'static,
{
if tokio::runtime::Handle::try_current().is_ok() {
tracing::debug!("Using existing tokio runtime for blocking task");
spawn_blocking(f)
.await
.context("Failed to spawn blocking task")
.flatten()
} else {
tracing::debug!("Using Foxtive's dedicated tokio runtime for blocking task");
runtime()
.spawn_blocking(f)
.await
.map_err(crate::Error::from)
.flatten()
}
}
pub fn run_async<F: Future>(fut: F) -> F::Output {
runtime().block_on(fut)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[test]
fn test_run_async_returns_value() {
let result = run_async(async { 42 });
assert_eq!(result, 42);
}
#[test]
fn test_run_async_returns_string() {
let result = run_async(async { "hello world".to_string() });
assert_eq!(result, "hello world");
}
#[test]
fn test_run_async_with_computation() {
let result = run_async(async {
let a = 10;
let b = 20;
a + b
});
assert_eq!(result, 30);
}
#[test]
fn test_run_async_with_await() {
let result = run_async(async {
tokio::time::sleep(Duration::from_millis(10)).await;
"completed"
});
assert_eq!(result, "completed");
}
#[test]
fn test_run_async_nested_calls() {
let result1 = run_async(async { 1 });
let result2 = run_async(async { 2 });
assert_eq!(result1, 1);
assert_eq!(result2, 2);
}
#[test]
fn test_run_async_with_existing_runtime() {
#[allow(clippy::redundant_async_block)]
let result = run_async(async {
async { 99 }.await
});
assert_eq!(result, 99);
}
#[test]
fn test_run_async_with_result_type() {
let result: Result<i32, &str> = run_async(async { Ok(42) });
assert_eq!(result, Ok(42));
let error: Result<i32, &str> = run_async(async { Err("failed") });
assert_eq!(error, Err("failed"));
}
#[test]
fn test_blk_returns_value() {
run_async(async {
let handle = blk(|| 42);
let result = handle.await.unwrap();
assert_eq!(result, 42);
});
}
#[test]
fn test_blk_with_computation() {
run_async(async {
let handle = blk(|| {
let mut sum = 0;
for i in 1..=10 {
sum += i;
}
sum
});
let result = handle.await.unwrap();
assert_eq!(result, 55);
});
}
#[test]
fn test_blk_with_string() {
run_async(async {
let handle = blk(|| "blocking result".to_string());
let result = handle.await.unwrap();
assert_eq!(result, "blocking result");
});
}
#[test]
fn test_blk_multiple_tasks() {
run_async(async {
let handle1 = blk(|| 1);
let handle2 = blk(|| 2);
let handle3 = blk(|| 3);
let r1 = handle1.await.unwrap();
let r2 = handle2.await.unwrap();
let r3 = handle3.await.unwrap();
let result = r1 + r2 + r3;
assert_eq!(result, 6);
});
}
#[test]
fn test_blk_with_sleep() {
use std::thread;
run_async(async {
let handle = blk(|| {
thread::sleep(Duration::from_millis(10));
"done"
});
let result = handle.await.unwrap();
assert_eq!(result, "done");
});
}
#[test]
fn test_blk_captures_variables() {
run_async(async {
let value = 100;
let handle = blk(move || value * 2);
let result = handle.await.unwrap();
assert_eq!(result, 200);
});
}
#[test]
fn test_blk_with_shared_state() {
run_async(async {
let counter = Arc::new(Mutex::new(0));
let counter_clone = counter.clone();
let handle = blk(move || {
let mut count = counter_clone.lock().unwrap();
*count += 1;
*count
});
let result = handle.await.unwrap();
assert_eq!(result, 1);
assert_eq!(*counter.lock().unwrap(), 1);
});
}
#[test]
fn test_blk_concurrent_execution() {
run_async(async {
let handles: Vec<_> = (0..5).map(|i| blk(move || i * 2)).collect();
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap());
}
assert_eq!(results, vec![0, 2, 4, 6, 8]);
});
}
#[test]
fn test_run_async_and_blk_integration() {
let result = run_async(async {
let blocking_result = blk(|| {
std::thread::sleep(Duration::from_millis(10));
42
})
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
blocking_result + 8
});
assert_eq!(result, 50);
}
#[test]
fn test_blk_with_result_type() {
run_async(async {
let handle = blk(|| -> Result<i32, String> { Ok(42) });
let result = handle.await.unwrap();
assert_eq!(result, Ok(42));
});
}
#[test]
fn test_blk_with_panic_recovery() {
run_async(async {
let handle = blk(|| {
panic!("intentional panic");
});
let result = handle.await;
assert!(result.is_err());
});
}
#[tokio::test]
async fn test_block_with_runtime() {
let result = block(|| Ok(42)).await.unwrap();
assert_eq!(result, 42);
}
#[tokio::test]
async fn test_block_with_computation() {
let result = block(|| {
let mut sum = 0;
for i in 1..=100 {
sum += i;
}
Ok(sum)
})
.await
.unwrap();
assert_eq!(result, 5050);
}
#[tokio::test]
async fn test_block_with_string() {
let result = block(|| Ok("blocking result".to_string())).await.unwrap();
assert_eq!(result, "blocking result");
}
#[tokio::test]
async fn test_block_with_sleep() {
use std::thread;
let result = block(|| {
thread::sleep(Duration::from_millis(10));
Ok("done")
})
.await
.unwrap();
assert_eq!(result, "done");
}
#[tokio::test]
async fn test_block_captures_variables() {
let value = 100;
let result = block(move || Ok(value * 2)).await.unwrap();
assert_eq!(result, 200);
}
#[tokio::test]
async fn test_block_with_shared_state() {
let counter = Arc::new(Mutex::new(0));
let counter_clone = counter.clone();
let result = block(move || {
let mut count = counter_clone.lock().unwrap();
*count += 1;
Ok(*count)
})
.await
.unwrap();
assert_eq!(result, 1);
assert_eq!(*counter.lock().unwrap(), 1);
}
#[tokio::test]
async fn test_block_concurrent_execution() {
let handles: Vec<_> = (0..5)
.map(|i| tokio::spawn(block(move || Ok(i * 2))))
.collect();
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap().unwrap());
}
assert_eq!(results, vec![0, 2, 4, 6, 8]);
}
#[tokio::test]
async fn test_block_with_result_type() {
let result: AppResult<i32> = block(|| Ok(42)).await;
assert_eq!(result.unwrap(), 42);
}
#[tokio::test]
async fn test_block_with_nested_spawn() {
let result = block(|| {
tokio::runtime::Handle::current().block_on(async {
tokio::time::sleep(Duration::from_millis(1)).await;
Ok(42)
})
})
.await
.unwrap();
assert_eq!(result, 42);
}
#[test]
fn test_block_without_runtime() {
let result = futures::executor::block_on(block(|| Ok(99))).unwrap();
assert_eq!(result, 99);
}
#[test]
fn test_block_creates_runtime_when_needed() {
let result = futures::executor::block_on(block(|| {
std::thread::sleep(Duration::from_millis(10));
Ok(42)
}))
.unwrap();
assert_eq!(result, 42);
}
#[tokio::test]
async fn test_block_integration_with_async() {
let blocking_result = block(|| {
std::thread::sleep(Duration::from_millis(10));
Ok(42)
})
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
let final_result = blocking_result + 8;
assert_eq!(final_result, 50);
}
#[tokio::test]
async fn test_block_multiple_calls() {
let result1 = block(|| Ok(1)).await.unwrap();
let result2 = block(|| Ok(2)).await.unwrap();
assert_eq!(result1, 1);
assert_eq!(result2, 2);
}
#[test]
fn test_block_nested_calls_without_runtime() {
let result1 = futures::executor::block_on(block(|| Ok(1))).unwrap();
let result2 = futures::executor::block_on(block(|| Ok(2))).unwrap();
assert_eq!(result1, 1);
assert_eq!(result2, 2);
}
#[tokio::test]
async fn test_block_with_panic_recovery() {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
tokio::runtime::Handle::current().block_on(async {
block::<_, crate::Error>(|| {
panic!("intentional panic");
})
.await
})
}));
assert!(result.is_err());
}
}