use crate::cx::Cx;
use crate::runtime::blocking_pool::{BlockingPoolHandle, BlockingTaskHandle};
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;
use std::thread;
const MAX_FALLBACK_THREADS: usize = 256;
static FALLBACK_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);
struct CancelOnDrop {
handle: BlockingTaskHandle,
done: bool,
}
impl CancelOnDrop {
fn new(handle: BlockingTaskHandle) -> Self {
Self {
handle,
done: false,
}
}
fn mark_done(&mut self) {
self.done = true;
}
}
impl Drop for CancelOnDrop {
fn drop(&mut self) {
if !self.done {
self.handle.cancel();
}
}
}
struct BlockingOneshotState<T> {
result: Option<std::thread::Result<T>>,
waker: Option<Waker>,
done: bool,
closed_without_result: bool,
}
struct BlockingOneshot<T> {
state: Arc<Mutex<BlockingOneshotState<T>>>,
sent: bool,
}
impl<T> BlockingOneshot<T> {
fn new() -> (Self, BlockingOneshotReceiver<T>) {
let state = Arc::new(Mutex::new(BlockingOneshotState {
result: None,
waker: None,
done: false,
closed_without_result: false,
}));
(
Self {
state: state.clone(),
sent: false,
},
BlockingOneshotReceiver {
state,
completed: false,
},
)
}
fn send(mut self, val: std::thread::Result<T>) {
let waker = {
let mut guard = self.state.lock();
guard.result = Some(val);
guard.done = true;
guard.closed_without_result = false;
guard.waker.take()
};
self.sent = true;
if let Some(waker) = waker {
waker.wake();
}
}
}
impl<T> Drop for BlockingOneshot<T> {
fn drop(&mut self) {
if self.sent {
return;
}
let waker = {
let mut guard = self.state.lock();
if guard.done {
return;
}
guard.done = true;
guard.closed_without_result = true;
guard.waker.take()
};
if let Some(waker) = waker {
waker.wake();
}
}
}
struct BlockingOneshotReceiver<T> {
state: Arc<Mutex<BlockingOneshotState<T>>>,
completed: bool,
}
impl<T> Drop for BlockingOneshotReceiver<T> {
fn drop(&mut self) {
self.state.lock().waker = None;
}
}
impl<T> std::future::Future for BlockingOneshotReceiver<T> {
type Output = T;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.get_mut();
assert!(
!this.completed,
"blocking operation polled after completion"
);
let mut guard = this.state.lock();
if guard.done {
this.completed = true;
let result = guard.result.take();
let closed_without_result = guard.closed_without_result;
drop(guard);
result.map_or_else(
|| {
if closed_without_result {
panic!("blocking operation ended without producing a result"); } else {
panic!("blocking operation polled after completion"); }
},
|result| match result {
Ok(val) => std::task::Poll::Ready(val),
Err(payload) => std::panic::resume_unwind(payload),
},
)
} else {
if !guard
.waker
.as_ref()
.is_some_and(|w| w.will_wake(cx.waker()))
{
guard.waker = Some(cx.waker().clone());
}
std::task::Poll::Pending
}
}
}
pub async fn spawn_blocking<F, T>(f: F) -> T
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
if let Some(cx) = Cx::current() {
if let Some(pool) = cx.blocking_pool_handle() {
return spawn_blocking_on_pool(pool, f).await;
}
return f();
}
spawn_blocking_on_thread(f).await
}
pub async fn spawn_blocking_io<F, T>(f: F) -> std::io::Result<T>
where
F: FnOnce() -> std::io::Result<T> + Send + 'static,
T: Send + 'static,
{
spawn_blocking(f).await
}
pub(crate) async fn spawn_blocking_on_pool<F, T>(pool: BlockingPoolHandle, f: F) -> T
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = BlockingOneshot::new();
let handle = pool.spawn(move || {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
tx.send(result);
});
let mut guard = CancelOnDrop::new(handle);
let result = rx.await;
guard.mark_done();
result
}
struct FallbackGuard;
impl Drop for FallbackGuard {
fn drop(&mut self) {
FALLBACK_THREAD_COUNT.fetch_sub(1, Ordering::Release);
}
}
pub(crate) async fn spawn_blocking_on_thread<F, T>(f: F) -> T
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
loop {
let current = FALLBACK_THREAD_COUNT.load(Ordering::Relaxed);
if current < MAX_FALLBACK_THREADS {
if FALLBACK_THREAD_COUNT
.compare_exchange_weak(current, current + 1, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
break;
}
} else {
crate::runtime::yield_now::yield_now().await;
}
}
let (tx, rx) = BlockingOneshot::new();
let f_cell = Arc::new(Mutex::new(Some(f)));
let f_for_thread = Arc::clone(&f_cell);
let thread_result = thread::Builder::new()
.name("asupersync-blocking".to_string())
.spawn(move || {
let _guard = FallbackGuard;
let f = f_for_thread
.lock()
.take()
.expect("spawn_blocking_on_thread fn missing");
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
tx.send(result);
});
match thread_result {
Ok(_) => rx.await,
Err(_err) => {
FALLBACK_THREAD_COUNT.fetch_sub(1, Ordering::Release);
let f = f_cell
.lock()
.take()
.expect("spawn_blocking_on_thread fn missing");
f()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::conformance::{ConformanceTarget, LabRuntimeTarget, TestConfig};
use crate::runtime::yield_now::yield_now;
use crate::types::{Budget, RegionId, TaskId};
use futures_lite::future;
use serde_json::Value;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Condvar, Mutex as StdMutex};
use std::time::Duration;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
#[test]
fn spawn_blocking_returns_result() {
init_test("spawn_blocking_returns_result");
future::block_on(async {
let result = spawn_blocking(|| 42).await;
crate::assert_with_log!(result == 42, "result", 42, result);
});
crate::test_complete!("spawn_blocking_returns_result");
}
#[test]
fn spawn_blocking_io_returns_result() {
init_test("spawn_blocking_io_returns_result");
future::block_on(async {
let result = spawn_blocking_io(|| Ok::<_, std::io::Error>(42))
.await
.unwrap();
crate::assert_with_log!(result == 42, "result", 42, result);
});
crate::test_complete!("spawn_blocking_io_returns_result");
}
#[test]
fn spawn_blocking_io_propagates_error() {
init_test("spawn_blocking_io_propagates_error");
future::block_on(async {
let result: std::io::Result<()> = spawn_blocking_io(|| {
Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"test error",
))
})
.await;
crate::assert_with_log!(result.is_err(), "is error", true, result.is_err());
});
crate::test_complete!("spawn_blocking_io_propagates_error");
}
#[test]
fn spawn_blocking_captures_closure() {
init_test("spawn_blocking_captures_closure");
future::block_on(async {
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = Arc::clone(&counter);
spawn_blocking(move || {
counter_clone.fetch_add(1, Ordering::Relaxed);
})
.await;
let count = counter.load(Ordering::Relaxed);
crate::assert_with_log!(count == 1, "counter incremented", 1u32, count);
});
crate::test_complete!("spawn_blocking_captures_closure");
}
#[test]
fn spawn_blocking_uses_pool_when_current() {
init_test("spawn_blocking_uses_pool_when_current");
let pool = crate::runtime::BlockingPool::new(1, 1);
let cx = Cx::new_with_drivers(
RegionId::new_for_test(0, 0),
TaskId::new_for_test(0, 0),
Budget::INFINITE,
None,
None,
None,
None,
None,
)
.with_blocking_pool_handle(Some(pool.handle()));
let _guard = Cx::set_current(Some(cx));
let thread_name = future::block_on(async {
spawn_blocking(|| {
std::thread::current()
.name()
.unwrap_or("unnamed")
.to_string()
})
.await
});
crate::assert_with_log!(
thread_name.contains("-blocking-"),
"thread name uses pool",
true,
thread_name.contains("-blocking-")
);
crate::test_complete!("spawn_blocking_uses_pool_when_current");
}
#[test]
fn spawn_blocking_inline_when_no_pool() {
init_test("spawn_blocking_inline_when_no_pool");
let cx: Cx = Cx::for_testing();
let _guard = Cx::set_current(Some(cx));
let current_id = std::thread::current().id();
let thread_id =
future::block_on(async { spawn_blocking(|| std::thread::current().id()).await });
crate::assert_with_log!(
thread_id == current_id,
"same thread",
current_id,
thread_id
);
crate::test_complete!("spawn_blocking_inline_when_no_pool");
}
#[test]
fn spawn_blocking_runs_in_parallel() {
init_test("spawn_blocking_runs_in_parallel");
future::block_on(async {
let counter = Arc::new(AtomicU32::new(0));
let c1 = Arc::clone(&counter);
let h1 = spawn_blocking(move || {
thread::sleep(Duration::from_millis(10));
c1.fetch_add(1, Ordering::Relaxed);
1
});
let c2 = Arc::clone(&counter);
let h2 = spawn_blocking(move || {
thread::sleep(Duration::from_millis(10));
c2.fetch_add(1, Ordering::Relaxed);
2
});
let (r1, r2) = future::zip(h1, h2).await;
let count = counter.load(Ordering::Relaxed);
crate::assert_with_log!(count == 2, "both completed", 2u32, count);
crate::assert_with_log!(r1 == 1, "first result", 1, r1);
crate::assert_with_log!(r2 == 2, "second result", 2, r2);
});
crate::test_complete!("spawn_blocking_runs_in_parallel");
}
#[test]
fn spawn_blocking_pool_overflow_queues_under_lab_runtime() {
init_test("spawn_blocking_pool_overflow_queues_under_lab_runtime");
let config = TestConfig::new()
.with_seed(0x5A0B_B10C)
.with_tracing(true)
.with_max_steps(20_000);
let mut runtime = LabRuntimeTarget::create_runtime(config);
let pool = crate::runtime::BlockingPool::new(1, 1);
let pool_handle = pool.handle();
let checkpoints = Arc::new(StdMutex::new(Vec::<Value>::new()));
let gate = Arc::new((StdMutex::new(false), Condvar::new()));
let first_started = Arc::new(std::sync::atomic::AtomicBool::new(false));
let second_started = Arc::new(std::sync::atomic::AtomicBool::new(false));
let (first_value, second_value, queued_before_release, second_started_after, checkpoints) =
LabRuntimeTarget::block_on(&mut runtime, async move {
let cx = Cx::current().expect("lab runtime should install a current Cx");
let first_spawn_cx = cx.clone();
let second_spawn_cx = cx.clone();
let first_task = LabRuntimeTarget::spawn(&first_spawn_cx, Budget::INFINITE, {
let pool_handle = pool_handle.clone();
let checkpoints = Arc::clone(&checkpoints);
let gate = Arc::clone(&gate);
let first_started = Arc::clone(&first_started);
async move {
spawn_blocking_on_pool(pool_handle, move || {
first_started.store(true, Ordering::SeqCst);
let started = serde_json::json!({
"phase": "first_started",
});
tracing::info!(event = %started, "spawn_blocking_lab_checkpoint");
checkpoints.lock().unwrap().push(started);
let (lock, cvar) = &*gate;
let mut released = lock.lock().unwrap();
while !*released {
released = cvar.wait(released).expect("gate wait should succeed");
}
let completed = serde_json::json!({
"phase": "first_completed",
"value": 11,
});
tracing::info!(event = %completed, "spawn_blocking_lab_checkpoint");
checkpoints.lock().unwrap().push(completed);
11
})
.await
}
});
while !first_started.load(Ordering::SeqCst) {
yield_now().await;
}
let second_task = LabRuntimeTarget::spawn(&second_spawn_cx, Budget::INFINITE, {
let pool_handle = pool_handle.clone();
let checkpoints = Arc::clone(&checkpoints);
let second_started = Arc::clone(&second_started);
async move {
spawn_blocking_on_pool(pool_handle, move || {
second_started.store(true, Ordering::SeqCst);
let started = serde_json::json!({
"phase": "second_started",
});
tracing::info!(event = %started, "spawn_blocking_lab_checkpoint");
checkpoints.lock().unwrap().push(started);
22
})
.await
}
});
yield_now().await;
yield_now().await;
let queued_before_release = !second_started.load(Ordering::SeqCst);
let queued = serde_json::json!({
"phase": "queue_observed",
"second_started": second_started.load(Ordering::SeqCst),
});
tracing::info!(event = %queued, "spawn_blocking_lab_checkpoint");
checkpoints.lock().unwrap().push(queued);
{
let (lock, cvar) = &*gate;
let mut released = lock.lock().unwrap();
*released = true;
cvar.notify_all();
}
let first_outcome = first_task.await;
crate::assert_with_log!(
matches!(first_outcome, crate::types::Outcome::Ok(11)),
"first blocking task completes successfully",
true,
matches!(first_outcome, crate::types::Outcome::Ok(11))
);
let crate::types::Outcome::Ok(first_value) = first_outcome else {
panic!("first blocking task should finish successfully");
};
let second_outcome = second_task.await;
crate::assert_with_log!(
matches!(second_outcome, crate::types::Outcome::Ok(22)),
"second blocking task completes successfully",
true,
matches!(second_outcome, crate::types::Outcome::Ok(22))
);
let crate::types::Outcome::Ok(second_value) = second_outcome else {
panic!("second blocking task should finish successfully");
};
(
first_value,
second_value,
queued_before_release,
second_started.load(Ordering::SeqCst),
checkpoints.lock().unwrap().clone(),
)
});
assert_eq!(first_value, 11);
assert_eq!(second_value, 22);
assert!(
queued_before_release,
"second blocking task should remain queued while the single worker is occupied"
);
assert!(
second_started_after,
"second blocking task should eventually start after the first releases the worker"
);
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "first_started"),
"first task start checkpoint should be recorded"
);
assert!(
checkpoints.iter().any(|event| {
event["phase"] == "queue_observed" && event["second_started"] == false
}),
"queue observation checkpoint should record that the second task was still queued"
);
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "second_started"),
"second task start checkpoint should be recorded"
);
let violations = runtime.oracles.check_all(runtime.now());
assert!(
violations.is_empty(),
"spawn_blocking lab-runtime overflow test should leave runtime invariants clean: {violations:?}"
);
assert!(
pool.shutdown_and_wait(Duration::from_secs(1)),
"blocking pool should shut down cleanly after the test"
);
}
#[test]
fn blocking_oneshot_sender_drop_fails_closed() {
init_test("blocking_oneshot_sender_drop_fails_closed");
let (tx, rx) = BlockingOneshot::<u32>::new();
drop(tx);
let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
future::block_on(rx);
}));
let payload = panic.expect_err("receiver should fail closed when sender drops");
let message = payload
.downcast_ref::<&str>()
.map(ToString::to_string)
.or_else(|| payload.downcast_ref::<String>().cloned())
.unwrap_or_default();
crate::assert_with_log!(
message.contains("without producing a result"),
"receiver panic message",
true,
message.contains("without producing a result")
);
crate::test_complete!("blocking_oneshot_sender_drop_fails_closed");
}
#[test]
fn blocking_oneshot_success_repoll_fails_closed() {
init_test("blocking_oneshot_success_repoll_fails_closed");
let (tx, rx) = BlockingOneshot::<u32>::new();
tx.send(Ok(42));
let mut rx = Box::pin(rx);
let first = future::block_on(std::future::poll_fn(|cx| rx.as_mut().poll(cx)));
crate::assert_with_log!(first == 42, "first result", 42u32, first);
let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
future::block_on(std::future::poll_fn(|cx| rx.as_mut().poll(cx)));
}));
let payload = panic.expect_err("second poll should fail closed");
let message = payload
.downcast_ref::<&str>()
.map(ToString::to_string)
.or_else(|| payload.downcast_ref::<String>().cloned())
.unwrap_or_default();
crate::assert_with_log!(
message.contains("polled after completion"),
"repoll panic message",
true,
message.contains("polled after completion")
);
crate::test_complete!("blocking_oneshot_success_repoll_fails_closed");
}
#[test]
fn blocking_oneshot_sender_drop_repoll_fails_closed() {
init_test("blocking_oneshot_sender_drop_repoll_fails_closed");
let (tx, rx) = BlockingOneshot::<u32>::new();
drop(tx);
let mut rx = Box::pin(rx);
let first = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
future::block_on(std::future::poll_fn(|cx| rx.as_mut().poll(cx)));
}));
let first_payload = first.expect_err("first poll should fail closed on sender drop");
let first_message = first_payload
.downcast_ref::<&str>()
.map(ToString::to_string)
.or_else(|| first_payload.downcast_ref::<String>().cloned())
.unwrap_or_default();
crate::assert_with_log!(
first_message.contains("without producing a result"),
"first sender-drop panic message",
true,
first_message.contains("without producing a result")
);
let second = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
future::block_on(std::future::poll_fn(|cx| rx.as_mut().poll(cx)));
}));
let second_payload = second.expect_err("second poll should fail closed");
let second_message = second_payload
.downcast_ref::<&str>()
.map(ToString::to_string)
.or_else(|| second_payload.downcast_ref::<String>().cloned())
.unwrap_or_default();
crate::assert_with_log!(
second_message.contains("polled after completion"),
"second sender-drop panic message",
true,
second_message.contains("polled after completion")
);
crate::test_complete!("blocking_oneshot_sender_drop_repoll_fails_closed");
}
#[test]
#[should_panic(expected = "test panic")]
fn spawn_blocking_propagates_panic() {
future::block_on(async {
spawn_blocking(|| std::panic::panic_any("test panic")).await;
});
}
}