1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
/// Creates a connected parker/unparker pair.
///
/// The returned [`Unparker`] is obtained from the returned [`Parker`], so both refer to the same
/// underlying state: notifying through the unparker is what the parker waits for.
pub fn pair() -> (Parker, Unparker) {
    let parker = Parker::new();
    let unparker = parker.unparker();
    (parker, unparker)
}
pub struct Parker {
unparker: Unparker,
}
impl Parker {
pub fn new() -> Parker {
Parker {
unparker: Unparker {
inner: Arc::new(Inner {
state: AtomicUsize::new(0),
lock: Mutex::new(()),
cvar: Condvar::new(),
}),
},
}
}
pub fn park(&self) -> bool {
self.unparker.inner.park(None)
}
pub fn park_timeout(&self, timeout: Option<Duration>) -> bool {
self.unparker.inner.park(timeout)
}
pub fn unparker(&self) -> Unparker {
self.unparker.clone()
}
}
impl Unparker {
pub fn unpark(&self) -> bool {
self.inner.unpark()
}
}
impl Clone for Unparker {
fn clone(&self) -> Unparker {
Unparker {
inner: self.inner.clone(),
}
}
}
pub struct Unparker {
inner: Arc<Inner>,
}
/// `state` value: no thread is parked and no notification is pending.
const EMPTY: usize = 0;
/// `state` value: the parking thread has flagged itself as waiting (or about to wait) on `cvar`.
const PARKED: usize = 1;
/// `state` value: a notification has been stored and not consumed yet.
const NOTIFIED: usize = 2;

/// Parking state shared, behind an `Arc`, by a parker and its unparkers.
struct Inner {
    /// Always one of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Guards no data; it only orders "flag as parked, then wait" against `unpark`'s wake-up.
    lock: Mutex<()>,
    /// The parking thread sleeps on this condition variable.
    cvar: Condvar,
}

impl Inner {
    /// Atomically turns a pending `NOTIFIED` into `EMPTY`.
    ///
    /// Returns whether a notification was actually consumed.
    fn take_notification(&self) -> bool {
        self.state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
    }

    /// Waits for a notification, for at most `timeout` (`None` means no limit).
    ///
    /// Returns `true` if a notification was consumed. Returns `false` if the timeout is zero and
    /// nothing was pending, or if the timed wait ended (timeout or spurious wake-up) without a
    /// notification.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned or if `state` holds a value that the protocol does not allow
    /// at that point.
    fn park(&self, timeout: Option<Duration>) -> bool {
        // Fast path: a notification is already waiting for us, no locking required.
        if self.take_notification() {
            return true;
        }

        // A zero timeout must not block at all.
        if timeout == Some(Duration::from_secs(0)) {
            return false;
        }

        // Slow path: hold `lock` from before we flag ourselves as parked until we are waiting on
        // `cvar`, so that `unpark` cannot signal in between.
        let mut guard = self.lock.lock().unwrap();

        if let Err(actual) = self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            if actual != NOTIFIED {
                panic!("invalid park state");
            }
            // A notification arrived after the fast path. Consume it with a swap, rather than
            // relying on the value reported by the failed compare-exchange, and check that it is
            // still `NOTIFIED`.
            let old = self.state.swap(EMPTY, SeqCst);
            assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
            return true;
        }

        if let Some(limit) = timeout {
            // Timed wait: whatever woke us up (notification, timeout or spurious wake-up), reset
            // `state` to `EMPTY` unconditionally. That either consumes a notification or removes
            // our `PARKED` flag. The guard stays alive until after the swap.
            let (_guard, _status) = self.cvar.wait_timeout(guard, limit).unwrap();
            return match self.state.swap(EMPTY, SeqCst) {
                NOTIFIED => true, // got a notification
                PARKED => false,  // no notification
                n => panic!("inconsistent park_timeout state: {}", n),
            };
        }

        // Untimed wait: go back to sleep after every wake-up that did not come with a
        // notification.
        loop {
            guard = self.cvar.wait(guard).unwrap();
            if self.take_notification() {
                return true;
            }
        }
    }

    /// Stores a notification and wakes the parked thread, if there is one.
    ///
    /// Returns `false` if a notification was already pending, `true` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `state` held an unknown value, or if `lock` is poisoned.
    fn unpark(&self) -> bool {
        // This has to be an unconditional swap, not a compare-and-swap that bails out when it
        // reads `NOTIFIED`: writing `NOTIFIED` even when it is already there is the release
        // operation `park` synchronizes with, which lets the woken thread observe every write
        // made before this call.
        let previous = self.state.swap(NOTIFIED, SeqCst);

        match previous {
            PARKED => {
                // The parked thread may be between setting `PARKED` (or its last look at `state`
                // after a spurious wake-up) and actually waiting on `cvar`. A signal sent in that
                // window would be lost and the thread would sleep forever. It holds `lock` during
                // that window, so acquiring `lock` here waits until it is ready to be signalled.
                let handshake = self.lock.lock().unwrap();
                // Release before signalling so the woken thread does not immediately block on a
                // lock we still hold.
                drop(handshake);
                self.cvar.notify_one();
                true
            }
            EMPTY => true,     // no one was waiting
            NOTIFIED => false, // already unparked
            _ => panic!("inconsistent state in unpark"),
        }
    }
}