use rayon::{ThreadPool, ThreadPoolBuilder};
use std::sync::LazyLock;
use tokio::sync::oneshot;
static WRITE_POOL: LazyLock<ThreadPool> =
LazyLock::new(|| ThreadPoolBuilder::new().build().unwrap());
static COMPUTE_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
ThreadPoolBuilder::new()
.stack_size(16 * 1024 * 1024)
.build()
.unwrap()
});
pub async fn blocking_compute<R: Send + 'static, F: FnOnce() -> R + Send + 'static>(
closure: F,
) -> R {
let (send, recv) = oneshot::channel();
COMPUTE_POOL.spawn_fifo(move || {
let _ = send.send(closure()); });
recv.await.expect("Function panicked in rayon::spawn")
}
pub async fn blocking_write<R: Send + 'static, F: FnOnce() -> R + Send + 'static>(closure: F) -> R {
let (send, recv) = oneshot::channel();
WRITE_POOL.spawn(move || {
let _ = send.send(closure()); });
recv.await.expect("Function panicked in rayon::spawn")
}
#[cfg(test)]
mod deadlock_tests {
use parking_lot::Mutex;
use reqwest::{Client, StatusCode};
use std::{sync::Arc, time::Duration};
use tempfile::TempDir;
use crate::{
rayon::{COMPUTE_POOL, WRITE_POOL},
routes::Health,
GraphServer,
};
#[tokio::test]
async fn test_deadlock_in_read_pool() {
test_pool_lock(43871, |lock| {
COMPUTE_POOL.spawn_broadcast(move |_| {
let _guard = lock.lock();
});
})
.await;
}
#[tokio::test]
async fn test_deadlock_in_write_pool() {
test_pool_lock(43872, |lock| {
WRITE_POOL.spawn_broadcast(move |_| {
let _guard = lock.lock();
});
})
.await;
}
async fn test_pool_lock(port: u16, pool_lock: impl FnOnce(Arc<Mutex<()>>)) {
let tempdir = TempDir::new().unwrap();
let server = GraphServer::new(tempdir.path().to_path_buf(), None, None)
.await
.unwrap();
let _running = server.start_with_port(port).await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await; let lock = Arc::new(Mutex::new(()));
let _guard = lock.lock();
let lock_clone = lock.clone();
pool_lock(lock_clone);
let client = Client::new();
let req = client.get(format!("http://localhost:{port}/health"));
let response = req.timeout(Duration::from_secs(100)).send().await.unwrap();
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
let health: Health = response.json().await.unwrap();
assert_eq!(health.healthy, false);
let req = client.get(format!("http://localhost:{port}/health?timeout=5"));
let response = req.timeout(Duration::from_secs(100)).send().await.unwrap();
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
let health: Health = response.json().await.unwrap();
assert_eq!(health.healthy, false);
}
}