1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
//! Tests using the Shuttle testing framework.
use ThreadPool;
use black_box;
use Level;
use Subscriber;
// -----------------------------------------------------------------------------
// Infrastructure
/*
fn trace<F>(f: F)
where
F: Fn() + Send + Sync + 'static,
{
let subscriber = Subscriber::builder()
.compact()
.with_max_level(Level::TRACE)
.without_time()
.with_thread_names(false)
.finish();
tracing::subscriber::with_default(subscriber, f);
}
*/
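// NOTE(review): the descriptions in this section were `///` doc comments, but
// no item follows them in this part of the file (the helper and the test they
// describe are not defined here), and a doc comment must be attached to an
// item. They are kept as plain comments until those items are restored.
//
// Provides access to a thread pool which can be treated as static for the
// purposes of testing.
// -----------------------------------------------------------------------------
// Pool resizing
// Tests for concurrency issues within the `with_thread_pool` helper function.
// This spins up a thread pool with a single thread, then spins it back down.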
// -----------------------------------------------------------------------------
// Core API
/*
/// Tests for concurrency issues when spawning a static closure.
#[test]
pub fn spawn_closure() {
fn spawn(thread_pool: &'static ThreadPool) {
thread_pool.spawn(|worker| {
black_box(worker);
});
}
shuttle::check_pct(|| with_thread_pool(spawn), 200, 200);
}
/// Tests for concurrency issues when spawning a static future.
#[test]
pub fn spawn_future() {
model(|| {
with_thread_pool(|_, worker| {
let complete = Box::leak(Box::new(AtomicBool::new(false)));
let task = worker.spawn_future(async {
complete.store(true, Ordering::Release);
});
task.detach();
worker.run_until(&complete);
});
});
}
/// Tests for concurrency issues in join operations.
#[test]
pub fn join() {
model(|| {
with_thread_pool(|_, worker| {
worker.join(|_| black_box(()), |_| black_box(()));
});
});
}
/// Tests for concurrency issues when blocking on a future.
#[test]
pub fn block_on() {
model(|| {
with_thread_pool(|_, worker| {
worker.block_on(async {
black_box(());
});
});
});
}
/// Tests for concurrency issues when spawning a future and then blocking on the
/// resulting task.
#[test]
pub fn spawn_and_block() {
model(|| {
with_thread_pool(|_, worker| {
let task = worker.spawn_future(async {
black_box(());
});
worker.block_on(task);
});
});
}
// -----------------------------------------------------------------------------
// Scoped API
/// Tests for concurrency issues when creating a scope.
#[test]
pub fn scope_empty() {
model(|| {
with_thread_pool(|_, worker| {
worker.scope(|_| {});
});
});
}
/// Tests for concurrency issues when returning a value from a scope.
#[test]
fn scope_result() {
model(|| {
with_thread_pool(|_, worker| {
let result = worker.scope(|_| 22);
assert_eq!(result, 22);
});
});
}
/// Tests for concurrency issues when spawning a scoped closure.
#[test]
pub fn scope_spawn() {
model(|| {
with_thread_pool(|_, worker| {
let complete = AtomicBool::new(false);
worker.scope(|scope| {
scope.spawn(|_| {
complete.store(true, Ordering::Release);
});
});
worker.run_until(&complete);
});
});
}
/// Tests for concurrency issues when spawning multiple scoped closures.
#[test]
pub fn scope_two() {
model(|| {
with_thread_pool(|_, worker| {
let counter = &AtomicUsize::new(0);
worker.scope(|scope| {
scope.spawn(|_| {
counter.fetch_add(1, Ordering::SeqCst);
});
scope.spawn(|_| {
counter.fetch_add(10, Ordering::SeqCst);
});
});
let v = counter.load(Ordering::SeqCst);
assert_eq!(v, 11);
});
});
}
/// Tests for concurrency issues when spawning a scoped future, and blocking on
/// it.
#[test]
pub fn scope_future() {
model(|| {
with_thread_pool(|_, worker| {
let vec = vec![1, 2, 3];
let task = worker.scope(|scope| scope.spawn_future(async { black_box(vec.len()) }));
let len = worker.block_on(task);
assert_eq!(len, vec.len());
});
});
}
*/