maniac_runtime/utils/parking.rs
1//! Thread parking and unparking.
2//!
3//! A [`Parker`] is in either the notified or unnotified state. The [`park()`][`Parker::park()`] method blocks
4//! the current thread until the [`Parker`] becomes notified and then puts it back into the unnotified
5//! state. The [`unpark()`][`Unparker::unpark()`] method puts it into the notified state.
6//!
7//! This API is similar to [`thread::park()`] and [`Thread::unpark()`] from the standard library.
8//! The difference is that the state "token" managed by those functions is shared across an entire
9//! thread, and anyone can call [`thread::current()`] to access it. If you use `park` and `unpark`,
10//! but you also call a function that uses `park` and `unpark` internally, that function could
11//! cause a deadlock by consuming a wakeup that was intended for you. The [`Parker`] object in this
12//! crate avoids that problem by managing its own state, which isn't shared with unrelated callers.
13//!
14//! [`thread::park()`]: https://doc.rust-lang.org/std/thread/fn.park.html
15//! [`Thread::unpark()`]: https://doc.rust-lang.org/std/thread/struct.Thread.html#method.unpark
16//! [`thread::current()`]: https://doc.rust-lang.org/std/thread/fn.current.html
17//!
18//! # Examples
19//!
20//! ```
21//! use std::thread;
22//! use std::time::Duration;
23//! use parking::Parker;
24//!
25//! let p = Parker::new();
26//! let u = p.unparker();
27//!
28//! // Notify the parker.
29//! u.unpark();
30//!
31//! // Wakes up immediately because the parker is notified.
32//! p.park();
33//!
34//! thread::spawn(move || {
35//! thread::sleep(Duration::from_millis(500));
36//! u.unpark();
37//! });
38//!
39//! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
40//! p.park();
41//! ```
42
43#![forbid(unsafe_code)]
44#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
45
46// #[cfg(not(all(loom, feature = "loom")))]
47use std::sync;
48
49// #[cfg(all(loom, feature = "loom"))]
50// use loom::sync;
51
52use std::cell::Cell;
53use std::fmt;
54use std::marker::PhantomData;
55use std::sync::Arc;
56use std::task::{Wake, Waker};
57use std::time::Duration;
58
59// #[cfg(not(all(loom, feature = "loom")))]
60use std::time::Instant;
61
62use sync::atomic::AtomicUsize;
63use sync::atomic::Ordering::SeqCst;
64use sync::{Condvar, Mutex};
65
66/// Creates a parker and an associated unparker.
67///
68/// # Examples
69///
70/// ```
71/// let (p, u) = parking::pair();
72/// ```
73pub fn pair() -> (Parker, Unparker) {
74 let p = Parker::new();
75 let u = p.unparker();
76 (p, u)
77}
78
79/// Waits for a notification.
80pub struct Parker {
81 unparker: Unparker,
82 _marker: PhantomData<Cell<()>>,
83}
84
85impl Parker {
86 /// Creates a new parker.
87 ///
88 /// # Examples
89 ///
90 /// ```
91 /// use parking::Parker;
92 ///
93 /// let p = Parker::new();
94 /// ```
95 ///
96 pub fn new() -> Parker {
97 Parker {
98 unparker: Unparker {
99 inner: Arc::new(Inner {
100 state: AtomicUsize::new(EMPTY),
101 lock: Mutex::new(()),
102 cvar: Condvar::new(),
103 }),
104 },
105 _marker: PhantomData,
106 }
107 }
108
109 /// Blocks until notified and then goes back into unnotified state.
110 ///
111 /// # Examples
112 ///
113 /// ```
114 /// use parking::Parker;
115 ///
116 /// let p = Parker::new();
117 /// let u = p.unparker();
118 ///
119 /// // Notify the parker.
120 /// u.unpark();
121 ///
122 /// // Wakes up immediately because the parker is notified.
123 /// p.park();
124 /// ```
125 pub fn park(&self) {
126 self.unparker.inner.park(None);
127 }
128
129 /// Blocks until notified and then goes back into unnotified state, or times out after
130 /// `duration`.
131 ///
132 /// Returns `true` if notified before the timeout.
133 ///
134 /// # Examples
135 ///
136 /// ```
137 /// use std::time::Duration;
138 /// use parking::Parker;
139 ///
140 /// let p = Parker::new();
141 ///
142 /// // Wait for a notification, or time out after 500 ms.
143 /// p.park_timeout(Duration::from_millis(500));
144 /// ```
145 #[cfg(not(loom))]
146 pub fn park_timeout(&self, duration: Duration) -> bool {
147 self.unparker.inner.park(Some(duration))
148 }
149
150 /// Blocks until notified and then goes back into unnotified state, or times out at `instant`.
151 ///
152 /// Returns `true` if notified before the deadline.
153 ///
154 /// # Examples
155 ///
156 /// ```
157 /// use std::time::{Duration, Instant};
158 /// use parking::Parker;
159 ///
160 /// let p = Parker::new();
161 ///
162 /// // Wait for a notification, or time out after 500 ms.
163 /// p.park_deadline(Instant::now() + Duration::from_millis(500));
164 /// ```
165 #[cfg(not(loom))]
166 pub fn park_deadline(&self, instant: Instant) -> bool {
167 self.unparker
168 .inner
169 .park(Some(instant.saturating_duration_since(Instant::now())))
170 }
171
172 /// Notifies the parker.
173 ///
174 /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
175 /// was already notified.
176 ///
177 /// # Examples
178 ///
179 /// ```
180 /// use std::thread;
181 /// use std::time::Duration;
182 /// use parking::Parker;
183 ///
184 /// let p = Parker::new();
185 ///
186 /// assert_eq!(p.unpark(), true);
187 /// assert_eq!(p.unpark(), false);
188 ///
189 /// // Wakes up immediately.
190 /// p.park();
191 /// ```
192 pub fn unpark(&self) -> bool {
193 self.unparker.unpark()
194 }
195
196 /// Returns a handle for unparking.
197 ///
198 /// The returned [`Unparker`] can be cloned and shared among threads.
199 ///
200 /// # Examples
201 ///
202 /// ```
203 /// use parking::Parker;
204 ///
205 /// let p = Parker::new();
206 /// let u = p.unparker();
207 ///
208 /// // Notify the parker.
209 /// u.unpark();
210 ///
211 /// // Wakes up immediately because the parker is notified.
212 /// p.park();
213 /// ```
214 pub fn unparker(&self) -> Unparker {
215 self.unparker.clone()
216 }
217}
218
219impl Default for Parker {
220 fn default() -> Parker {
221 Parker::new()
222 }
223}
224
225impl fmt::Debug for Parker {
226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227 f.pad("Parker { .. }")
228 }
229}
230
231/// Notifies a parker.
232pub struct Unparker {
233 inner: Arc<Inner>,
234}
235
236impl Unparker {
237 /// Notifies the associated parker.
238 ///
239 /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
240 /// was already notified.
241 ///
242 /// # Examples
243 ///
244 /// ```
245 /// use std::thread;
246 /// use std::time::Duration;
247 /// use parking::Parker;
248 ///
249 /// let p = Parker::new();
250 /// let u = p.unparker();
251 ///
252 /// thread::spawn(move || {
253 /// thread::sleep(Duration::from_millis(500));
254 /// u.unpark();
255 /// });
256 ///
257 /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
258 /// p.park();
259 /// ```
260 pub fn unpark(&self) -> bool {
261 self.inner.unpark()
262 }
263
264 /// Indicates whether this unparker will unpark the associated parker.
265 ///
266 /// This can be used to avoid unnecessary work before calling `unpark()`.
267 ///
268 /// # Examples
269 ///
270 /// ```
271 /// use parking::Parker;
272 ///
273 /// let p = Parker::new();
274 /// let u = p.unparker();
275 ///
276 /// assert!(u.will_unpark(&p));
277 /// ```
278 pub fn will_unpark(&self, parker: &Parker) -> bool {
279 Arc::ptr_eq(&self.inner, &parker.unparker.inner)
280 }
281
282 /// Indicates whether two unparkers will unpark the same parker.
283 ///
284 /// # Examples
285 ///
286 /// ```
287 /// use parking::Parker;
288 ///
289 /// let p = Parker::new();
290 /// let u1 = p.unparker();
291 /// let u2 = p.unparker();
292 ///
293 /// assert!(u1.same_parker(&u2));
294 /// ```
295 pub fn same_parker(&self, other: &Unparker) -> bool {
296 Arc::ptr_eq(&self.inner, &other.inner)
297 }
298}
299
300impl fmt::Debug for Unparker {
301 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
302 f.pad("Unparker { .. }")
303 }
304}
305
306impl Clone for Unparker {
307 fn clone(&self) -> Unparker {
308 Unparker {
309 inner: self.inner.clone(),
310 }
311 }
312}
313
314impl From<Unparker> for Waker {
315 fn from(up: Unparker) -> Self {
316 Waker::from(up.inner)
317 }
318}
319
/// No notification is pending and no thread has announced that it is parking.
const EMPTY: usize = 0;
/// A thread has announced that it is going to block (or is blocked) in `Inner::park`.
const PARKED: usize = 1;
/// A notification is pending; the next `Inner::park` consumes it without blocking.
const NOTIFIED: usize = 2;

/// State shared between a parker and all of its unparkers.
struct Inner {
    /// Always one of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Held by the parking thread from before it publishes `PARKED` until it is waiting on
    /// `cvar`, so that `unpark` can tell when the waiter is ready to receive a signal.
    lock: Mutex<()>,
    /// Signalled by `unpark` when it finds `state` set to `PARKED`.
    cvar: Condvar,
}

impl Inner {
    /// Atomically consumes a pending notification, returning whether there was one.
    fn take_notification(&self) -> bool {
        self.state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
    }

    /// Blocks the calling thread until a notification arrives or `timeout` elapses.
    ///
    /// `None` waits indefinitely. Returns `true` if a notification was consumed and `false`
    /// if the timeout expired first. Condition-variable wakeups that do not come with a
    /// notification never end the wait early: the thread goes back to sleep for whatever is
    /// left of `timeout`.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned or if `state` holds a value that the protocol does not
    /// allow at that point. Under `cfg(loom)` a timed wait is unsupported and panics.
    fn park(&self, timeout: Option<Duration>) -> bool {
        // If we were previously notified then we consume this notification and return quickly.
        if self.take_notification() {
            return true;
        }

        // If the timeout is zero, then there is no need to actually block.
        if let Some(dur) = timeout {
            if dur == Duration::from_millis(0) {
                return false;
            }
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that
                // synchronizes with that `unpark` to observe any writes it made before the call
                // to `unpark`. To do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return true;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        match timeout {
            None => {
                loop {
                    // Block the current thread on the condition variable.
                    m = self.cvar.wait(m).unwrap();

                    // Anything other than a notification is a spurious wakeup: wait again.
                    if self.take_notification() {
                        return true;
                    }
                }
            }
            Some(timeout) => {
                #[cfg(not(loom))]
                {
                    // Elapsed time is tracked from here so that a spurious wakeup only
                    // shortens the next wait instead of ending the whole park early.
                    let started = Instant::now();
                    let mut remaining = timeout;

                    loop {
                        let (guard, _timed_out) = self.cvar.wait_timeout(m, remaining).unwrap();
                        m = guard;

                        if self.take_notification() {
                            // got a notification
                            return true;
                        }

                        // No notification: either the wait timed out or the wakeup was
                        // spurious. Keep sleeping only while some of the timeout is left.
                        match timeout.checked_sub(started.elapsed()) {
                            Some(left) if left > Duration::from_millis(0) => remaining = left,
                            _ => break,
                        }
                    }

                    // The timeout has fully elapsed. Unconditionally reset `state` to `EMPTY`,
                    // either consuming a notification that raced with the timeout or
                    // un-flagging ourselves as parked.
                    match self.state.swap(EMPTY, SeqCst) {
                        NOTIFIED => true, // got a notification
                        PARKED => false,  // no notification
                        n => panic!("inconsistent park_timeout state: {}", n),
                    }
                }

                #[cfg(loom)]
                {
                    let _ = timeout;
                    panic!("park_timeout is not supported under loom");
                }
            }
        }
    }

    /// Delivers a notification, waking the parked thread if there is one.
    ///
    /// Returns `true` if this call moved the parker into the notified state and `false` if a
    /// notification was already pending.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned or if `state` holds an unknown value.
    pub fn unpark(&self) -> bool {
        // To ensure the unparked thread will observe any writes we made before this call, we
        // must perform a release operation that `park` can synchronize with. To do that we must
        // write `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a
        // swap rather than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return true,     // no one was waiting
            NOTIFIED => return false, // already unparked
            PARKED => {}              // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on
        // `cvar`. If we were to notify during this period it would be ignored and then when the
        // parked thread went to sleep it would never wake up. Fortunately, it has `lock` locked
        // at this stage so we can acquire `lock` to wait until it is ready to receive the
        // notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread
        // wakes it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
        true
    }
}
432
433impl Wake for Inner {
434 #[inline]
435 fn wake(self: Arc<Self>) {
436 self.unpark();
437 }
438
439 #[inline]
440 fn wake_by_ref(self: &Arc<Self>) {
441 self.unpark();
442 }
443}