#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi"), not(miri)))]

use std::time::Duration;

use tokio::runtime::Builder;
use tokio::sync::mpsc;
use tokio::task::JoinSet;

#[test]
fn issue_8056_regression_test() {
    type Senders = Vec<mpsc::Sender<()>>;
    type Handles = JoinSet<()>;

    // Spawns two long-lived blocking tasks that park in `blocking_recv`
    // until their sender is dropped.
    fn make_writer() -> (Senders, Handles) {
        let mut senders = vec![];
        let mut handles = JoinSet::new();
        for _ in 0..2 {
            let (tx, mut rx) = mpsc::channel::<()>(1);
            senders.push(tx);
            handles.spawn_blocking(move || while rx.blocking_recv().is_some() {});
        }
        (senders, handles)
    }

    // Sends one message to each writer, then drops the senders and waits
    // for all of its blocking tasks to finish.
    async fn drive_writer(senders: Senders, mut handles: Handles) {
        for tx in &senders {
            tx.send(()).await.unwrap();
        }
        drop(senders);
        while let Some(res) = handles.join_next().await {
            res.unwrap();
        }
    }

    // Regression test for the lost-spawn race in the blocking pool introduced
    // by #7757. See https://github.com/tokio-rs/tokio/issues/8056 for complete
    // details.
    //
    // The bug was a race condition in `Spawner::spawn_task` - after pushing a
    // task to the queue, it chose between "wake an idle worker" and "spawn a
    // new worker" by reading the `num_idle_threads` `AtomicUsize`. The
    // `num_idle_threads` counter was incremented by a worker before it called
    // `wait_for_task`, and was decremented in `Inner::run`, after
    // `wait_for_task` had already returned a `Task`.
    //
    // In that window, the counter indicates a worker is idle even though it
    // has already claimed a notification and is about to run a task. In the
    // omicron test case that reproduced this, that task was long-lived, which
    // would essentially result in "under-spawning" workers.
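    //
    // A rough sketch of the racy interleaving (simplified; the step names
    // follow the description above, not the exact implementation):
    //
    //   1. Worker: `num_idle_threads += 1`, then blocks in `wait_for_task`.
    //   2. Worker: `wait_for_task` returns a `Task` (notification claimed),
    //      but the counter has not been decremented yet.
    //   3. Spawner: pushes a new task, reads `num_idle_threads > 0`, and
    //      wakes an "idle" worker instead of spawning a new thread.
    //   4. Worker: decrements the counter in `Inner::run` (too late) and
    //      runs its long-lived task; nobody picks up the new task.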
    //
    // This test attempts to reproduce that scenario by doing the following
    // on a fresh runtime:
    // - Two awaited `spawn_blocking` calls to get pool workers cycling
    //   through idle → notified → busy transitions.
    // - `make_writer` spawns two closures that park in
    //   `mpsc::Receiver::blocking_recv` — long-lived blocking tasks that turn
    //   a stranded spawn into a real deadlock instead of a transient stall.
    // - Two more rounds of "create a writer and immediately drop it" churn
    //   the worker pool while those persistent tasks are still blocked, and
    //   then a second persistent writer is created — this `spawn_blocking` is
    //   most likely to trigger the bug, because the prior churn has left a
    //   worker mid-transition with a stale `num_idle_threads`.
    // - Finally, we finish all the tasks -- if any of them never got pulled
    //   from the `spawn_blocking` queue, the test hits the timeout.
    //
    // Run multiple times to make hitting the race condition very likely.
    for _ in 0..512 {
        let rt = Builder::new_current_thread().enable_all().build().unwrap();
        let completed = rt.block_on(async {
            // We use a timeout to turn a hang into a reliable (and fast)
            // failure. On working code this completes in about a millisecond,
            // so any generous timeout will do.
            tokio::time::timeout(Duration::from_secs(1), async {
                let quick = || async {
                    tokio::task::spawn_blocking(|| {}).await.unwrap();
                };
                quick().await;
                quick().await;

                let (persistent_senders, persistent_handles) = make_writer();
                for _ in 0..2 {
                    quick().await;
                    quick().await;
                    let (senders, mut handles) = make_writer();
                    drop(senders);
                    while let Some(res) = handles.join_next().await {
                        res.unwrap();
                    }
                }
                quick().await;
                quick().await;

                let (second_senders, second_handles) = make_writer();
                drive_writer(persistent_senders, persistent_handles).await;
                drive_writer(second_senders, second_handles).await;
            })
            .await
            .is_ok()
        });
        assert!(completed);
    }
}