futures_executor/
unpark_mutex.rs

1use std::cell::UnsafeCell;
2use std::sync::atomic::AtomicUsize;
3use std::sync::atomic::Ordering::SeqCst;
4
5/// A "lock" around data `D`, which employs a *helping* strategy.
6///
7/// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being
8/// invoked on only a single thread at a time (2) `poll` being invoked at least
9/// once after each `unpark` (unless the future has completed).
10pub(crate) struct UnparkMutex<D> {
11    // The state of task execution (state machine described below)
12    status: AtomicUsize,
13
14    // The actual task data, accessible only in the POLLING state
15    inner: UnsafeCell<Option<D>>,
16}
17
18// `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on
19// acquisition failure, the current lock holder performs the desired work --
20// re-polling.
21//
22// As such, these impls mirror those for `Mutex<D>`. In particular, a reference
23// to `UnparkMutex` can be used to gain `&mut` access to the inner data, which
24// must therefore be `Send`.
25unsafe impl<D: Send> Send for UnparkMutex<D> {}
26unsafe impl<D: Send> Sync for UnparkMutex<D> {}
27
28// There are four possible task states, listed below with their possible
29// transitions:
30
31// The task is blocked, waiting on an event
32const WAITING: usize = 0;       // --> POLLING
33
34// The task is actively being polled by a thread; arrival of additional events
35// of interest should move it to the REPOLL state
36const POLLING: usize = 1;       // --> WAITING, REPOLL, or COMPLETE
37
38// The task is actively being polled, but will need to be re-polled upon
39// completion to ensure that all events were observed.
40const REPOLL: usize = 2;        // --> POLLING
41
42// The task has finished executing (either successfully or with an error/panic)
43const COMPLETE: usize = 3;      // No transitions out
44
45impl<D> UnparkMutex<D> {
46    pub(crate) fn new() -> UnparkMutex<D> {
47        UnparkMutex {
48            status: AtomicUsize::new(WAITING),
49            inner: UnsafeCell::new(None),
50        }
51    }
52
53    /// Attempt to "notify" the mutex that a poll should occur.
54    ///
55    /// An `Ok` result indicates that the `POLLING` state has been entered, and
56    /// the caller can proceed to poll the future. An `Err` result indicates
57    /// that polling is not necessary (because the task is finished or the
58    /// polling has been delegated).
59    pub(crate) fn notify(&self) -> Result<D, ()> {
60        let mut status = self.status.load(SeqCst);
61        loop {
62            match status {
63                // The task is idle, so try to run it immediately.
64                WAITING => {
65                    match self.status.compare_exchange(WAITING, POLLING,
66                                                       SeqCst, SeqCst) {
67                        Ok(_) => {
68                            let data = unsafe {
69                                // SAFETY: we've ensured mutual exclusion via
70                                // the status protocol; we are the only thread
71                                // that has transitioned to the POLLING state,
72                                // and we won't transition back to QUEUED until
73                                // the lock is "released" by this thread. See
74                                // the protocol diagram above.
75                                (*self.inner.get()).take().unwrap()
76                            };
77                            return Ok(data);
78                        }
79                        Err(cur) => status = cur,
80                    }
81                }
82
83                // The task is being polled, so we need to record that it should
84                // be *repolled* when complete.
85                POLLING => {
86                    match self.status.compare_exchange(POLLING, REPOLL,
87                                                       SeqCst, SeqCst) {
88                        Ok(_) => return Err(()),
89                        Err(cur) => status = cur,
90                    }
91                }
92
93                // The task is already scheduled for polling, or is complete, so
94                // we've got nothing to do.
95                _ => return Err(()),
96            }
97        }
98    }
99
100    /// Alert the mutex that polling is about to begin, clearing any accumulated
101    /// re-poll requests.
102    ///
103    /// # Safety
104    ///
105    /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
106    /// successful calls to `notify` and `wait`/`complete`.
107    pub(crate) unsafe fn start_poll(&self) {
108        self.status.store(POLLING, SeqCst);
109    }
110
111    /// Alert the mutex that polling completed with NotReady.
112    ///
113    /// # Safety
114    ///
115    /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
116    /// successful calls to `notify` and `wait`/`complete`.
117    pub(crate) unsafe fn wait(&self, data: D) -> Result<(), D> {
118        *self.inner.get() = Some(data);
119
120        match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) {
121            // no unparks came in while we were running
122            Ok(_) => Ok(()),
123
124            // guaranteed to be in REPOLL state; just clobber the
125            // state and run again.
126            Err(status) => {
127                assert_eq!(status, REPOLL);
128                self.status.store(POLLING, SeqCst);
129                Err((*self.inner.get()).take().unwrap())
130            }
131        }
132    }
133
134    /// Alert the mutex that the task has completed execution and should not be
135    /// notified again.
136    ///
137    /// # Safety
138    ///
139    /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
140    /// successful calls to `notify` and `wait`/`complete`.
141    pub(crate) unsafe fn complete(&self) {
142        self.status.store(COMPLETE, SeqCst);
143    }
144}