maniac_runtime/utils/
parking.rs

1//! Thread parking and unparking.
2//!
3//! A [`Parker`] is in either the notified or unnotified state. The [`park()`][`Parker::park()`] method blocks
4//! the current thread until the [`Parker`] becomes notified and then puts it back into the unnotified
5//! state. The [`unpark()`][`Unparker::unpark()`] method puts it into the notified state.
6//!
7//! This API is similar to [`thread::park()`] and [`Thread::unpark()`] from the standard library.
8//! The difference is that the state "token" managed by those functions is shared across an entire
9//! thread, and anyone can call [`thread::current()`] to access it. If you use `park` and `unpark`,
10//! but you also call a function that uses `park` and `unpark` internally, that function could
11//! cause a deadlock by consuming a wakeup that was intended for you. The [`Parker`] object in this
12//! crate avoids that problem by managing its own state, which isn't shared with unrelated callers.
13//!
14//! [`thread::park()`]: https://doc.rust-lang.org/std/thread/fn.park.html
15//! [`Thread::unpark()`]: https://doc.rust-lang.org/std/thread/struct.Thread.html#method.unpark
16//! [`thread::current()`]: https://doc.rust-lang.org/std/thread/fn.current.html
17//!
18//! # Examples
19//!
20//! ```
21//! use std::thread;
22//! use std::time::Duration;
23//! use parking::Parker;
24//!
25//! let p = Parker::new();
26//! let u = p.unparker();
27//!
28//! // Notify the parker.
29//! u.unpark();
30//!
31//! // Wakes up immediately because the parker is notified.
32//! p.park();
33//!
34//! thread::spawn(move || {
35//!     thread::sleep(Duration::from_millis(500));
36//!     u.unpark();
37//! });
38//!
39//! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
40//! p.park();
41//! ```
42
43#![forbid(unsafe_code)]
44#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
45
46// #[cfg(not(all(loom, feature = "loom")))]
47use std::sync;
48
49// #[cfg(all(loom, feature = "loom"))]
50// use loom::sync;
51
52use std::cell::Cell;
53use std::fmt;
54use std::marker::PhantomData;
55use std::sync::Arc;
56use std::task::{Wake, Waker};
57use std::time::Duration;
58
59// #[cfg(not(all(loom, feature = "loom")))]
60use std::time::Instant;
61
62use sync::atomic::AtomicUsize;
63use sync::atomic::Ordering::SeqCst;
64use sync::{Condvar, Mutex};
65
66/// Creates a parker and an associated unparker.
67///
68/// # Examples
69///
70/// ```
71/// let (p, u) = parking::pair();
72/// ```
73pub fn pair() -> (Parker, Unparker) {
74    let p = Parker::new();
75    let u = p.unparker();
76    (p, u)
77}
78
79/// Waits for a notification.
80pub struct Parker {
81    unparker: Unparker,
82    _marker: PhantomData<Cell<()>>,
83}
84
85impl Parker {
86    /// Creates a new parker.
87    ///
88    /// # Examples
89    ///
90    /// ```
91    /// use parking::Parker;
92    ///
93    /// let p = Parker::new();
94    /// ```
95    ///
96    pub fn new() -> Parker {
97        Parker {
98            unparker: Unparker {
99                inner: Arc::new(Inner {
100                    state: AtomicUsize::new(EMPTY),
101                    lock: Mutex::new(()),
102                    cvar: Condvar::new(),
103                }),
104            },
105            _marker: PhantomData,
106        }
107    }
108
109    /// Blocks until notified and then goes back into unnotified state.
110    ///
111    /// # Examples
112    ///
113    /// ```
114    /// use parking::Parker;
115    ///
116    /// let p = Parker::new();
117    /// let u = p.unparker();
118    ///
119    /// // Notify the parker.
120    /// u.unpark();
121    ///
122    /// // Wakes up immediately because the parker is notified.
123    /// p.park();
124    /// ```
125    pub fn park(&self) {
126        self.unparker.inner.park(None);
127    }
128
129    /// Blocks until notified and then goes back into unnotified state, or times out after
130    /// `duration`.
131    ///
132    /// Returns `true` if notified before the timeout.
133    ///
134    /// # Examples
135    ///
136    /// ```
137    /// use std::time::Duration;
138    /// use parking::Parker;
139    ///
140    /// let p = Parker::new();
141    ///
142    /// // Wait for a notification, or time out after 500 ms.
143    /// p.park_timeout(Duration::from_millis(500));
144    /// ```
145    #[cfg(not(loom))]
146    pub fn park_timeout(&self, duration: Duration) -> bool {
147        self.unparker.inner.park(Some(duration))
148    }
149
150    /// Blocks until notified and then goes back into unnotified state, or times out at `instant`.
151    ///
152    /// Returns `true` if notified before the deadline.
153    ///
154    /// # Examples
155    ///
156    /// ```
157    /// use std::time::{Duration, Instant};
158    /// use parking::Parker;
159    ///
160    /// let p = Parker::new();
161    ///
162    /// // Wait for a notification, or time out after 500 ms.
163    /// p.park_deadline(Instant::now() + Duration::from_millis(500));
164    /// ```
165    #[cfg(not(loom))]
166    pub fn park_deadline(&self, instant: Instant) -> bool {
167        self.unparker
168            .inner
169            .park(Some(instant.saturating_duration_since(Instant::now())))
170    }
171
172    /// Notifies the parker.
173    ///
174    /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
175    /// was already notified.
176    ///
177    /// # Examples
178    ///
179    /// ```
180    /// use std::thread;
181    /// use std::time::Duration;
182    /// use parking::Parker;
183    ///
184    /// let p = Parker::new();
185    ///
186    /// assert_eq!(p.unpark(), true);
187    /// assert_eq!(p.unpark(), false);
188    ///
189    /// // Wakes up immediately.
190    /// p.park();
191    /// ```
192    pub fn unpark(&self) -> bool {
193        self.unparker.unpark()
194    }
195
196    /// Returns a handle for unparking.
197    ///
198    /// The returned [`Unparker`] can be cloned and shared among threads.
199    ///
200    /// # Examples
201    ///
202    /// ```
203    /// use parking::Parker;
204    ///
205    /// let p = Parker::new();
206    /// let u = p.unparker();
207    ///
208    /// // Notify the parker.
209    /// u.unpark();
210    ///
211    /// // Wakes up immediately because the parker is notified.
212    /// p.park();
213    /// ```
214    pub fn unparker(&self) -> Unparker {
215        self.unparker.clone()
216    }
217}
218
219impl Default for Parker {
220    fn default() -> Parker {
221        Parker::new()
222    }
223}
224
225impl fmt::Debug for Parker {
226    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227        f.pad("Parker { .. }")
228    }
229}
230
231/// Notifies a parker.
232pub struct Unparker {
233    inner: Arc<Inner>,
234}
235
236impl Unparker {
237    /// Notifies the associated parker.
238    ///
239    /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
240    /// was already notified.
241    ///
242    /// # Examples
243    ///
244    /// ```
245    /// use std::thread;
246    /// use std::time::Duration;
247    /// use parking::Parker;
248    ///
249    /// let p = Parker::new();
250    /// let u = p.unparker();
251    ///
252    /// thread::spawn(move || {
253    ///     thread::sleep(Duration::from_millis(500));
254    ///     u.unpark();
255    /// });
256    ///
257    /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
258    /// p.park();
259    /// ```
260    pub fn unpark(&self) -> bool {
261        self.inner.unpark()
262    }
263
264    /// Indicates whether this unparker will unpark the associated parker.
265    ///
266    /// This can be used to avoid unnecessary work before calling `unpark()`.
267    ///
268    /// # Examples
269    ///
270    /// ```
271    /// use parking::Parker;
272    ///
273    /// let p = Parker::new();
274    /// let u = p.unparker();
275    ///
276    /// assert!(u.will_unpark(&p));
277    /// ```
278    pub fn will_unpark(&self, parker: &Parker) -> bool {
279        Arc::ptr_eq(&self.inner, &parker.unparker.inner)
280    }
281
282    /// Indicates whether two unparkers will unpark the same parker.
283    ///
284    /// # Examples
285    ///
286    /// ```
287    /// use parking::Parker;
288    ///
289    /// let p = Parker::new();
290    /// let u1 = p.unparker();
291    /// let u2 = p.unparker();
292    ///
293    /// assert!(u1.same_parker(&u2));
294    /// ```
295    pub fn same_parker(&self, other: &Unparker) -> bool {
296        Arc::ptr_eq(&self.inner, &other.inner)
297    }
298}
299
300impl fmt::Debug for Unparker {
301    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
302        f.pad("Unparker { .. }")
303    }
304}
305
306impl Clone for Unparker {
307    fn clone(&self) -> Unparker {
308        Unparker {
309            inner: self.inner.clone(),
310        }
311    }
312}
313
314impl From<Unparker> for Waker {
315    fn from(up: Unparker) -> Self {
316        Waker::from(up.inner)
317    }
318}
319
/// No notification is pending and no thread is parked.
const EMPTY: usize = 0;
/// The parking thread has announced that it is (or is about to be) blocked on `cvar`.
const PARKED: usize = 1;
/// A notification is pending and has not yet been consumed by `park`.
const NOTIFIED: usize = 2;

/// State shared between a parker and all of its unparker handles.
struct Inner {
    /// One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Held by the parking thread from the moment it publishes `PARKED` until it waits on
    /// `cvar`, so that `unpark` cannot signal before the waiter is ready.
    lock: Mutex<()>,
    /// Signalled by `unpark` when it observes `PARKED`.
    cvar: Condvar,
}

impl Inner {
    /// Blocks until a notification arrives or, if `timeout` is `Some`, until that much time
    /// has elapsed.
    ///
    /// Returns `true` if a notification was consumed and `false` if the timeout elapsed
    /// without one. With `timeout == None` this only ever returns `true`. In every case
    /// `state` is `EMPTY` again once this returns.
    ///
    /// Spurious condition-variable wakeups are absorbed in both the timed and the untimed
    /// case: a timed wait re-arms with the remaining time and never reports `false` early.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned or if `state` holds an unexpected value.
    fn park(&self, timeout: Option<Duration>) -> bool {
        // If we were previously notified then we consume this notification and return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return true;
        }

        // If the timeout is zero, then there is no need to actually block.
        let zero = Duration::from_millis(0);
        if timeout == Some(zero) {
            return false;
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return true;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        match timeout {
            None => {
                loop {
                    // Block the current thread on the condition variable.
                    m = self.cvar.wait(m).unwrap();

                    if self
                        .state
                        .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                        .is_ok()
                    {
                        // got a notification
                        return true;
                    }

                    // Spurious wakeup: `state` is still `PARKED`, go back to sleep.
                }
            }
            Some(timeout) => {
                #[cfg(not(loom))]
                {
                    // `None` means the deadline is not representable; in that case every wait
                    // simply uses the full `timeout` again.
                    let deadline = Instant::now().checked_add(timeout);
                    let mut remaining = timeout;

                    loop {
                        let (guard, _timed_out) = self.cvar.wait_timeout(m, remaining).unwrap();
                        m = guard;

                        if self
                            .state
                            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                            .is_ok()
                        {
                            // got a notification
                            return true;
                        }

                        // Not notified: either the wait timed out or the wakeup was spurious.
                        // Only give up once the deadline has really passed; otherwise wait
                        // again for whatever time is left.
                        remaining = match deadline {
                            Some(deadline) => deadline.saturating_duration_since(Instant::now()),
                            None => timeout,
                        };

                        if remaining == zero {
                            // Unconditionally set `state` back to `EMPTY`, either consuming a
                            // notification that raced with the timeout or un-flagging ourselves
                            // as parked.
                            return match self.state.swap(EMPTY, SeqCst) {
                                NOTIFIED => true, // got a notification
                                PARKED => false,  // no notification
                                n => panic!("inconsistent park_timeout state: {}", n),
                            };
                        }
                    }
                }

                #[cfg(loom)]
                {
                    let _ = timeout;
                    panic!("park_timeout is not supported under loom");
                }
            }
        }
    }

    /// Puts the state into `NOTIFIED` and wakes the parked thread if there is one.
    ///
    /// Returns `true` if this call is the one that made the state notified, or `false` if a
    /// notification was already pending.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned or if `state` holds an unexpected value.
    pub fn unpark(&self) -> bool {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return true,     // no one was waiting
            NOTIFIED => return false, // already unparked
            PARKED => {}              // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
        true
    }
}
432
433impl Wake for Inner {
434    #[inline]
435    fn wake(self: Arc<Self>) {
436        self.unpark();
437    }
438
439    #[inline]
440    fn wake_by_ref(self: &Arc<Self>) {
441        self.unpark();
442    }
443}