1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
use shuttle::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use shuttle::sync::mpsc::{channel, sync_channel};
use shuttle::sync::{Barrier, Condvar, Mutex, Once, RwLock};
use shuttle::{check_dfs, check_pct, check_random, current, thread};
use std::collections::HashSet;
use std::sync::Arc;
use test_log::test;
/// Returns the numeric id of the calling thread (main thread is 0).
pub fn me() -> usize {
    let id = thread::current().id();
    id.into()
}
/// Asserts that `f(thread_index, clock_entry)` holds for every entry in the
/// current thread's vector clock, panicking (with caller location) otherwise.
#[track_caller]
pub fn check_clock(f: impl Fn(usize, u32) -> bool) {
    let clock = current::clock();
    for (idx, &entry) in clock.iter().enumerate() {
        if !f(idx, entry) {
            panic!(
                "clock {:?} doesn't satisfy predicate at {}",
                current::clock(),
                idx
            );
        }
    }
}
fn clock_mutex(num_threads: usize) {
    // This test checks that when a thread acquires a lock, it inherits the vector clocks of
    // threads that accessed the lock before it.
    //
    // Test: create a mutex-protected set, initialized with 0 (the id of the main thread),
    // and spawn N threads where each thread does the following:
    // (1) check that its own initial vector clock only has nonzero for the creator (thread 0);
    //     this checks that when a thread is created, it only inherits the clock of the spawner
    // (2) lock the set and add its own thread id to it; let the resulting set be S
    // (3) read its own clock again, call this C
    // (4) check that the only nonzero entries in C are for the threads in S
    // For sanity checking, we also spawn an initial dummy thread (with id 1) and ensure that its
    // clock is always 0.
    let shared = {
        let mut initial = HashSet::new();
        initial.insert(0);
        Arc::new(Mutex::new(initial))
    };
    // Create dummy thread (should have id 1)
    thread::spawn(|| {
        assert_eq!(me(), 1usize);
    });
    let mut handles = Vec::with_capacity(num_threads);
    for _ in 0..num_threads {
        let shared = Arc::clone(&shared);
        handles.push(thread::spawn(move || {
            // Freshly spawned: only the spawner (thread 0) is in our causal past.
            check_clock(|i, c| (c > 0) == (i == 0));
            let mut members = shared.lock().unwrap();
            members.insert(me());
            assert!(!members.contains(&1)); // dummy thread is never in the set
            // Exactly the threads that have held the lock are in our causal past now.
            check_clock(|i, c| (c > 0) == members.contains(&i));
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(shared.lock().unwrap().len(), 1 + num_threads); // +1 because we initialized the set to {0}
}
#[test]
fn clock_mutex_dfs() {
    // Small thread count: exhaustive exploration is feasible.
    let scenario = || clock_mutex(2);
    check_dfs(scenario, None);
}
#[test]
fn clock_mutex_pct() {
    // PCT scales to far more threads than DFS can handle.
    let scenario = || clock_mutex(20);
    check_pct(scenario, 1000, 3);
}
// RWLocks
fn clock_rwlock(num_writers: usize, num_readers: usize) {
    // This test checks that when a thread acquires a RwLock, it inherits the clocks of writers
    // that accessed the lock before it. It's the same as `clock_mutex`, except that readers
    // don't update the set S, and aren't required to appear in the clock for future lock
    // holders.
    //
    // TODO this test is pretty weak. Testing readers is hard because they race with each other;
    // for example, a reader might see the clock update from another reader before that reader
    // has a chance to update the set S. Causality is also pretty fuzzy for readers (see the
    // TODOs in the RwLock implementation). So we don't test very much about them here.
    let set = Arc::new(std::sync::Mutex::new(HashSet::from([0])));
    let lock = Arc::new(RwLock::new(()));
    // Create dummy thread (should have id 1)
    thread::spawn(|| {
        assert_eq!(me(), 1usize);
    });
    // Spawn the writers; handles are intentionally discarded (nobody joins them).
    for _ in 0..num_writers {
        let set = Arc::clone(&set);
        let lock = Arc::clone(&lock);
        thread::spawn(move || {
            check_clock(|i, c| (c > 0) == (i == 0));
            let _guard = lock.write().unwrap();
            let mut members = set.lock().unwrap();
            members.insert(me());
            assert!(!members.contains(&1)); // dummy thread is never in the set
            // Every member of the set must appear in our clock (the converse need not hold).
            check_clock(|i, c| !members.contains(&i) || (c > 0));
        });
    }
    // Spawn the readers
    for _ in 0..num_readers {
        let set = Arc::clone(&set);
        let lock = Arc::clone(&lock);
        thread::spawn(move || {
            check_clock(|i, c| (c > 0) == (i == 0));
            let _guard = lock.read().unwrap();
            let members = set.lock().unwrap();
            assert!(!members.contains(&1)); // dummy thread is never in the set
            check_clock(|i, c| !members.contains(&i) || (c > 0));
        });
    }
}
#[test]
fn clock_rwlock_dfs() {
    // Unfortunately anything larger than this takes > 500k iterations, too slow to be useful :(
    // But the PCT and random tests below buy us a much bigger search.
    let scenario = || clock_rwlock(1, 1);
    check_dfs(scenario, None);
}
#[test]
fn clock_rwlock_pct() {
    // Probabilistic search over a much larger thread count.
    let scenario = || clock_rwlock(4, 4);
    check_pct(scenario, 10_000, 3);
}
#[test]
fn clock_rwlock_random() {
    // Purely random scheduling as an extra sanity pass.
    let scenario = || clock_rwlock(4, 4);
    check_random(scenario, 10_000);
}
// Barrier
fn clock_barrier(n: usize) {
    // This test checks that threads waiting on a barrier inherit the clocks from all the other
    // participants in the barrier.
    //
    // The test creates a barrier with bound n and creates n threads (including the main thread).
    // Each thread initially checks that its clock is nonzero only for the main thread, and then
    // waits on the barrier. When it exits the barrier, each thread checks that its current clock
    // is nonzero for all threads. For sanity checking, we also spawn a dummy thread and check
    // that its clock entry is always 0.
    let barrier = Arc::new(Barrier::new(n));
    // Create dummy thread (should have id 1)
    thread::spawn(|| {
        assert_eq!(me(), 1usize);
    });
    // Main thread is the n-th participant, so spawn only n - 1 helpers.
    for _ in 1..n {
        let barrier = Arc::clone(&barrier);
        thread::spawn(move || {
            check_clock(|i, c| (c > 0) == (i == 0));
            barrier.wait();
            // Since all threads reached the barrier, everyone's clock must be nonzero
            // except the dummy, whose clock must be 0
            check_clock(|i, c| (c > 0) == (i != 1));
        });
    }
    barrier.wait();
    // Since all threads reached the barrier, everyone's clock must be nonzero, except the dummy
    check_clock(|i, c| (c > 0) == (i != 1));
}
#[test]
fn clock_barrier_dfs() {
    // Exhaustive exploration with a small participant count.
    let scenario = || clock_barrier(4);
    check_dfs(scenario, None);
}
#[test]
fn clock_barrier_pct() {
    // PCT handles a much larger barrier.
    let scenario = || clock_barrier(50);
    check_pct(scenario, 1000, 3);
}
// Condvars
#[test]
fn clock_condvar_single() {
    // One notifier, one waiter: waiting on a condvar must inhale the notifier's clock,
    // and must not before the notification happens.
    check_dfs(
        || {
            let lock = Arc::new(Mutex::new(false));
            let cond = Arc::new(Condvar::new());
            let (notifier_lock, notifier_cond) = (Arc::clone(&lock), Arc::clone(&cond));
            thread::spawn(move || {
                assert_eq!(me(), 1);
                *notifier_lock.lock().unwrap() = true;
                notifier_cond.notify_one();
            });
            let mut guard = lock.lock().unwrap();
            while !*guard {
                check_clock(|i, c| (c > 0) == (i == 0)); // spawned thread has not executed notify_one
                guard = cond.wait(guard).unwrap();
            }
            // The flag is set, so the notifier (thread 1) is now in our causal past.
            check_clock(|i, c| (c > 0) == (i == 0 || i == 1));
        },
        None,
    )
}
fn clock_condvar_notify_one(num_notifiers: usize, num_waiters: usize) {
    // The mutex holds the id of the most recent notifier to run (0 = none yet).
    let lock = Arc::new(Mutex::new(0usize));
    let cond = Arc::new(Condvar::new());
    for _ in 0..num_notifiers {
        let lock = Arc::clone(&lock);
        let cond = Arc::clone(&cond);
        thread::spawn(move || {
            // Notifiers are spawned first, so they get ids 1..=num_notifiers.
            assert!(me() <= num_notifiers);
            *lock.lock().unwrap() = me();
            cond.notify_one();
        });
    }
    for _ in 0..num_waiters {
        let lock = Arc::clone(&lock);
        let cond = Arc::clone(&cond);
        thread::spawn(move || {
            let mut guard = lock.lock().unwrap();
            while *guard == 0 {
                // No notifier has gone yet, so no notifier may appear in our clock.
                check_clock(|i, c| !(1..=num_notifiers).contains(&i) || (c == 0));
                guard = cond.wait(guard).unwrap();
            }
            // Note that since all the threads touch the lock, any of them may have preceded this
            // thread. But we know for sure that the thread that unblocked us should causally
            // precede us.
            check_clock(|i, c| (i != *guard) || (c > 0));
        });
    }
}
#[test]
fn clock_condvar_notify_one_dfs() {
    // Minimal configuration so exhaustive search stays tractable.
    let scenario = || clock_condvar_notify_one(1, 1);
    check_dfs(scenario, None);
}
#[test]
fn clock_condvar_notify_one_pct() {
    // Larger configuration under PCT scheduling.
    let scenario = || clock_condvar_notify_one(10, 10);
    check_pct(scenario, 10_000, 3);
}
fn clock_condvar_notify_all(num_waiters: usize) {
let lock = Arc::new(Mutex::new(0usize));
let cond = Arc::new(Condvar::new());
{
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
thread::spawn(move || {
assert_eq!(me(), 1);
*lock.lock().unwrap() = me();
cond.notify_all();
});
}
for _ in 0..num_waiters {
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
thread::spawn(move || {
let mut guard = lock.lock().unwrap();
while *guard == 0 {
check_clock(|i, c| (i != 1) || (c == 0)); // notifier hasn't been scheduled
guard = cond.wait(guard).unwrap();
}
// Note that since all the threads touch the lock, any of them may have preceded this thread.
// But we know for sure that the thread that unblocked us should causally precede us.
check_clock(|i, c| (i != *guard) || (c > 0));
});
}
}
#[test]
fn clock_condvar_notify_all_dfs() {
    // Two waiters is enough to exercise notify_all exhaustively.
    let scenario = || clock_condvar_notify_all(2);
    check_dfs(scenario, None);
}
#[test]
fn clock_condvar_notify_all_pct() {
    // Many waiters under PCT scheduling.
    let scenario = || clock_condvar_notify_all(20);
    check_pct(scenario, 10_000, 3);
}
// MPSC Channels
#[test]
fn clock_mpsc_unbounded() {
    // Receiving on an unbounded channel must inhale the sender's (advanced) clock.
    const NUM_MSG: usize = 3;
    check_dfs(
        || {
            let (tx, rx) = channel::<usize>();
            thread::spawn(move || {
                assert_eq!(me(), 1);
                for i in 0..NUM_MSG {
                    tx.send(i).unwrap();
                }
            });
            for _ in 0..NUM_MSG {
                let sender_clock_before = current::clock().get(1); // save clock of thread 1
                let _ = rx.recv().unwrap();
                // Thread 1's clock increased, and receiving the message told us so.
                check_clock(|i, c| (i != 1) || (c > sender_clock_before));
            }
        },
        None,
    );
}
#[test]
fn clock_mpsc_bounded() {
    // Checks causality through a bounded channel: a send that has to wait for buffer space
    // synchronizes the sender with the receiver, and each recv synchronizes the receiver
    // with the sender.
    const BOUND: usize = 3;
    check_dfs(
        || {
            let (tx, rx) = sync_channel::<()>(BOUND);
            thread::spawn(move || {
                assert_eq!(me(), 1);
                for _ in 0..BOUND {
                    tx.send(()).unwrap();
                }
                // At this point the sender doesn't know about the receiver
                check_clock(|i, c| (c > 0) == (i == 0 || i == 1));
                tx.send(()).unwrap();
                // Here, we know the receiver picked up the 1st message, so its clock is nonzero
                let c1 = current::clock().get(2);
                assert!(c1 > 0);
                tx.send(()).unwrap();
                // Here, we know that the receiver picked up the 2nd message, so its clock has increased
                assert!(current::clock().get(2) > c1);
            });
            thread::spawn(move || {
                assert_eq!(me(), 2);
                // Receiver doesn't know about the sender yet
                check_clock(|i, c| (c > 0) == (i == 0));
                rx.recv().unwrap();
                // The sender has sent a message, so its clock is nonzero
                let c1 = current::clock().get(1);
                assert!(c1 > 0);
                rx.recv().unwrap();
                // The sender has sent another message, so its clock has increased.
                // Fixed: this previously read `get(2)` (the receiver's own entry), which does not
                // test the property stated above — the sender is thread 1 (cf. the symmetric
                // check in `clock_mpsc_rendezvous`).
                assert!(current::clock().get(1) > c1);
                // Receive the remaining messages
                for _ in 0..BOUND {
                    rx.recv().unwrap();
                }
            });
        },
        None,
    );
}
#[test]
fn clock_mpsc_rendezvous() {
    check_dfs(
        || {
            // A zero-capacity channel: every send must rendezvous with a matching recv,
            // so both sides synchronize on each exchange.
            let (tx, rx) = sync_channel::<()>(0);
            thread::spawn(move || {
                assert_eq!(me(), 1);
                // At this point the sender doesn't know about the receiver
                check_clock(|i, c| (c > 0) == (i == 0));
                tx.send(()).unwrap();
                // Since this is a rendezvous channel, and we successfully sent a message,
                // we know about the receiver
                let receiver_clock = current::clock().get(2);
                assert!(receiver_clock > 0);
                tx.send(()).unwrap();
                // After the 2nd rendezvous, the receiver's clock has increased
                assert!(current::clock().get(2) > receiver_clock);
            });
            thread::spawn(move || {
                assert_eq!(me(), 2);
                // At this point the receiver doesn't know about the sender
                check_clock(|i, c| (c > 0) == (i == 0));
                rx.recv().unwrap();
                // Since we received a message, we know about the sender
                let sender_clock = current::clock().get(1);
                assert!(sender_clock > 0);
                rx.recv().unwrap();
                // After the 2nd rendezvous, the sender's clock has increased
                assert!(current::clock().get(1) > sender_clock);
            });
        },
        None,
    );
}
// Threads
// Checks that a freshly spawned thread inherits only its spawner's clock, and that joining a
// thread inhales the joined thread's clock.
fn clock_threads(num_threads: usize) {
    // Use an AtomicBool to create a synchronization point so a thread's clock is incremented.
    let flag = Arc::new(AtomicBool::new(false));
    let handles = (1..num_threads + 1)
        .map(|k| {
            let flag = Arc::clone(&flag);
            thread::spawn(move || {
                // Threads are spawned in order by the main thread (id 0), so this one gets id k.
                assert_eq!(me(), k);
                // Freshly spawned: only the spawner (thread 0) is in our causal past.
                check_clock(|i, c| (c > 0) == (i == 0));
                // Nobody ever stores `true`, so the load always sees the initial value.
                assert!(!flag.load(Ordering::SeqCst));
                // NOTE(review): `==` binds tighter than `||`, so this parses as
                // `((c > 0) == (i == 0)) || (i == k)` — i.e. thread 0 is still the only *other*
                // nonzero entry, while entry i == k (our own) is deliberately unconstrained.
                // Presumably the thread's own entry may or may not have ticked after the atomic
                // load — confirm against shuttle's clock semantics before tightening this.
                check_clock(|i, c| (c > 0) == (i == 0) || (i == k));
                k
            })
        })
        .collect::<Vec<_>>();
    // As each thread joins, we get knowledge of its vector clock.
    for handle in handles {
        let k = handle.join().unwrap();
        // Having joined threads 1..=k (and being thread 0 ourselves), exactly the entries
        // i <= k are nonzero.
        check_clock(move |i, c| (c > 0) == (i <= k));
    }
}
#[test]
fn clock_threads_dfs() {
    // Two spawned threads keeps the exhaustive search small.
    let scenario = || clock_threads(2);
    check_dfs(scenario, None);
}
#[test]
fn clock_threads_pct() {
    // Many threads under PCT scheduling.
    let scenario = || clock_threads(20);
    check_pct(scenario, 10_000, 3);
}
#[test]
fn clock_fetch_update() {
    // Ensure that when a fetch_update fails, the caller does not inherit the clock from the register.
    check_dfs(
        || {
            let register = Arc::new(AtomicU32::new(0));
            let register2 = Arc::clone(&register);
            thread::spawn(move || {
                // The closure always returns None, so this fetch_update always fails and
                // never stores a value.
                let _ = register2.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| None);
            });
            let _ = register.load(Ordering::SeqCst);
            // Note that we are using check_dfs, so there are executions where the fetch_update
            // happens before this load. But the load above never causally depends on the spawned
            // thread's clock, since it never managed to store a value into the register.
            check_clock(|i, c| (c > 0) == (i == 0));
        },
        None,
    );
}
// Checks that passing through a `Once` inhales the clock of whichever thread ran the
// initialization closure.
fn clock_once(num_threads: usize) {
    let once = Arc::new(Once::new());
    // Records the id of the thread that won the race to initialize the Once.
    let init = Arc::new(std::sync::atomic::AtomicUsize::new(0));
    let mut handles = Vec::with_capacity(num_threads);
    for _ in 0..num_threads {
        let once = Arc::clone(&once);
        let init = Arc::clone(&init);
        handles.push(thread::spawn(move || {
            // Freshly spawned: only the spawner (thread 0) is in our causal past.
            check_clock(|i, c| (c > 0) == (i == 0));
            once.call_once(|| init.store(me(), std::sync::atomic::Ordering::SeqCst));
            let who_inited = init.load(std::sync::atomic::Ordering::SeqCst);
            // should have inhaled the clock of the thread that inited the Once, but might also
            // have inhaled the clocks of threads that we were racing with for initialization
            check_clock(|i, c| !(i == who_inited || i == 0 || i == me()) || c > 0);
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
#[test]
fn clock_once_dfs() {
    // Two racing initializers is enough for exhaustive exploration.
    let scenario = || clock_once(2);
    check_dfs(scenario, None);
}
#[test]
fn clock_once_pct() {
    // Many racing initializers under PCT scheduling.
    let scenario = || clock_once(20);
    check_pct(scenario, 10_000, 3);
}