koper/thread/
park.rs

1use crate::either::Either;
2
3use std::sync::atomic::AtomicUsize;
4use std::sync::{Arc, Condvar, Mutex};
5use std::sync::atomic::Ordering::SeqCst;
6use std::time::Duration;
7
8
/// Block the current thread.
///
/// A `Park` implementation is paired with an [`Unpark`] handle type: `park`
/// and `park_timeout` block the calling thread, and a handle obtained from
/// `unpark` is used to wake it up again.
///
/// See [module documentation][mod] for more details.
///
/// [mod]: ../index.html
/// [`Unpark`]: trait.Unpark.html
pub trait Park {
    /// Unpark handle type for the `Park` implementation.
    type Unpark: Unpark;

    /// Error returned by `park` and `park_timeout`.
    type Error;

    /// Get a new `Unpark` handle associated with this `Park` instance.
    fn unpark(&self) -> Self::Unpark;

    /// Block the current thread unless or until the token is available.
    ///
    /// A call to `park` does not guarantee that the thread will remain blocked
    /// forever, and callers should be prepared for this possibility. This
    /// function may wake up spuriously for any reason.
    ///
    /// See [module documentation][mod] for more details.
    ///
    /// # Panics
    ///
    /// This function **should** not panic, but ultimately, panics are left as
    /// an implementation detail. Refer to the documentation for the specific
    /// `Park` implementation.
    ///
    /// [mod]: ../index.html
    fn park(&mut self) -> Result<(), Self::Error>;

    /// Park the current thread for at most `duration`.
    ///
    /// This function is the same as `park` but allows specifying a maximum time
    /// to block the thread for.
    ///
    /// Same as `park`, there is no guarantee that the thread will remain
    /// blocked for any amount of time. Spurious wakeups are permitted for any
    /// reason.
    ///
    /// See [module documentation][mod] for more details.
    ///
    /// # Panics
    ///
    /// This function **should** not panic, but ultimately, panics are left as
    /// an implementation detail. Refer to the documentation for the specific
    /// `Park` implementation.
    ///
    /// [mod]: ../index.html
    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;
}
61
/// Unblock a thread blocked by the associated [`Park`] instance.
///
/// Handles must be `Sync + Send + 'static`, so they can be handed to and
/// shared with threads other than the one that parks.
///
/// See [module documentation][mod] for more details.
///
/// [mod]: ../index.html
/// [`Park`]: trait.Park.html
pub trait Unpark: Sync + Send + 'static {
    /// Unblock a thread that is blocked by the associated `Park` handle.
    ///
    /// Calling `unpark` atomically makes available the unpark token, if it is
    /// not already available.
    ///
    /// See [module documentation][mod] for more details.
    ///
    /// # Panics
    ///
    /// This function **should** not panic, but ultimately, panics are left as
    /// an implementation detail. Refer to the documentation for the specific
    /// `Unpark` implementation.
    ///
    /// [mod]: ../index.html
    fn unpark(&self);
}
85
86impl Unpark for Box<dyn Unpark> {
87    fn unpark(&self) {
88        (**self).unpark()
89    }
90}
91
92impl Unpark for Arc<dyn Unpark> {
93    fn unpark(&self) {
94        (**self).unpark()
95    }
96}
97
98
/// Blocks the current thread using a condition variable.
///
/// The parking state lives in a reference-counted `Inner`, which is shared
/// with every `UnparkThread` handle created from this value.
#[derive(Debug)]
pub struct ParkThread {
    inner: Arc<Inner>,
}
103
/// Error returned by [`ParkThread`]
///
/// This currently is never returned, but might at some point in the future.
///
/// [`ParkThread`]: struct.ParkThread.html
#[derive(Debug)]
pub struct ParkError {
    // Private unit field: keeps the struct from being constructed outside
    // this module.
    _p: (),
}
113
/// Unblocks a thread that was blocked by `ParkThread`.
///
/// Cloning the handle only clones the `Arc`, so every clone refers to the
/// same parking state.
#[derive(Clone, Debug)]
pub struct UnparkThread {
    inner: Arc<Inner>,
}
119
/// Parking state shared between a `ParkThread` and its `UnparkThread` handles.
#[derive(Debug)]
struct Inner {
    /// Current state: one of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Held by the parking thread from the moment it marks itself `PARKED`
    /// until it waits on `condvar`; `unpark` briefly acquires it before
    /// notifying.
    mutex: Mutex<()>,
    /// Signalled by `unpark` to wake the parked thread.
    condvar: Condvar,
}
126
// Values stored in `Inner::state`.

/// No thread is parked and no notification is pending (the initial state).
const EMPTY: usize = 0;
/// A thread has marked itself as parked and is waiting, or about to wait, on
/// the condition variable.
const PARKED: usize = 1;
/// `unpark` was called and its notification has not been consumed yet.
const NOTIFIED: usize = 2;
130
thread_local! {
    // Each thread gets its own `ParkThread`; `CachedParkThread` parks and
    // unparks through this instance instead of owning one itself.
    static CURRENT_PARKER: ParkThread = ParkThread::new();
}
134
135// ==== impl ParkThread ====
136
137impl ParkThread {
138    pub fn new() -> Self {
139        Self {
140            inner: Arc::new(Inner {
141                state: AtomicUsize::new(EMPTY),
142                mutex: Mutex::new(()),
143                condvar: Condvar::new(),
144            }),
145        }
146    }
147}
148
149impl Park for ParkThread {
150    type Unpark = UnparkThread;
151    type Error = ParkError;
152
153    fn unpark(&self) -> Self::Unpark {
154        let inner = self.inner.clone();
155        UnparkThread { inner }
156    }
157
158    fn park(&mut self) -> Result<(), Self::Error> {
159        self.inner.park();
160        Ok(())
161    }
162
163    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
164        self.inner.park_timeout(duration);
165        Ok(())
166    }
167}
168
169// ==== impl Inner ====
170
171impl Inner {
172    /// Park the current thread for at most `dur`.
173    fn park(&self) {
174        // If we were previously notified then we consume this notification and
175        // return quickly.
176        if self
177            .state
178            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
179            .is_ok()
180        {
181            return;
182        }
183
184        // Otherwise we need to coordinate going to sleep
185        let mut m = self.mutex.lock().unwrap();
186
187        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
188            Ok(_) => {}
189            Err(NOTIFIED) => {
190                // We must read here, even though we know it will be `NOTIFIED`.
191                // This is because `unpark` may have been called again since we read
192                // `NOTIFIED` in the `compare_exchange` above. We must perform an
193                // acquire operation that synchronizes with that `unpark` to observe
194                // any writes it made before the call to unpark. To do that we must
195                // read from the write it made to `state`.
196                let old = self.state.swap(EMPTY, SeqCst);
197                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
198
199                return;
200            }
201            Err(actual) => panic!("inconsistent park state; actual = {}", actual),
202        }
203
204        loop {
205            m = self.condvar.wait(m).unwrap();
206
207            if self
208                .state
209                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
210                .is_ok()
211            {
212                // got a notification
213                return;
214            }
215
216            // spurious wakeup, go back to sleep
217        }
218    }
219
220    fn park_timeout(&self, dur: Duration) {
221        // Like `park` above we have a fast path for an already-notified thread,
222        // and afterwards we start coordinating for a sleep. Return quickly.
223        if self
224            .state
225            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
226            .is_ok()
227        {
228            return;
229        }
230
231        let m = self.mutex.lock().unwrap();
232
233        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
234            Ok(_) => {}
235            Err(NOTIFIED) => {
236                // We must read again here, see `park`.
237                let old = self.state.swap(EMPTY, SeqCst);
238                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
239
240                return;
241            }
242            Err(actual) => panic!("inconsistent park_timeout state; actual = {}", actual),
243        }
244
245        // Wait with a timeout, and if we spuriously wake up or otherwise wake up
246        // from a notification, we just want to unconditionally set the state back to
247        // empty, either consuming a notification or un-flagging ourselves as
248        // parked.
249        let (_m, _result) = self.condvar.wait_timeout(m, dur).unwrap();
250
251        match self.state.swap(EMPTY, SeqCst) {
252            NOTIFIED => {} // got a notification, hurray!
253            PARKED => {}   // no notification, alas
254            n => panic!("inconsistent park_timeout state: {}", n),
255        }
256    }
257
258    fn unpark(&self) {
259        // To ensure the unparked thread will observe any writes we made before
260        // this call, we must perform a release operation that `park` can
261        // synchronize with. To do that we must write `NOTIFIED` even if `state`
262        // is already `NOTIFIED`. That is why this must be a swap rather than a
263        // compare-and-swap that returns if it reads `NOTIFIED` on failure.
264        match self.state.swap(NOTIFIED, SeqCst) {
265            EMPTY => return,    // no one was waiting
266            NOTIFIED => return, // already unparked
267            PARKED => {}        // gotta go wake someone up
268            _ => panic!("inconsistent state in unpark"),
269        }
270
271        // There is a period between when the parked thread sets `state` to
272        // `PARKED` (or last checked `state` in the case of a spurious wake
273        // up) and when it actually waits on `cvar`. If we were to notify
274        // during this period it would be ignored and then when the parked
275        // thread went to sleep it would never wake up. Fortunately, it has
276        // `lock` locked at this stage so we can acquire `lock` to wait until
277        // it is ready to receive the notification.
278        //
279        // Releasing `lock` before the call to `notify_one` means that when the
280        // parked thread wakes it doesn't get woken only to have to wait for us
281        // to release `lock`.
282        drop(self.mutex.lock().unwrap());
283
284        self.condvar.notify_one()
285    }
286}
287
288impl Default for ParkThread {
289    fn default() -> Self {
290        Self::new()
291    }
292}
293
294// ===== impl UnparkThread =====
295
296impl Unpark for UnparkThread {
297    fn unpark(&self) {
298        self.inner.unpark();
299    }
300}
301
302    use std::marker::PhantomData;
303    use std::rc::Rc;
304
305    use std::mem;
306    use std::task::{RawWaker, RawWakerVTable, Waker};
307
    /// Blocks the current thread using a condition variable.
    ///
    /// Unlike `ParkThread`, this type holds no state of its own: it parks
    /// through the `ParkThread` stored in the `CURRENT_PARKER` thread-local of
    /// whichever thread calls it.
    #[derive(Debug)]
    pub struct CachedParkThread {
        // `Rc` is neither `Send` nor `Sync`, so this marker keeps the handle on
        // the thread that created it.
        _anchor: PhantomData<Rc<()>>,
    }
313
314    impl CachedParkThread {
315        /// Create a new `ParkThread` handle for the current thread.
316        ///
317        /// This type cannot be moved to other threads, so it should be created on
318        /// the thread that the caller intends to park.
319        pub fn new() -> CachedParkThread {
320            CachedParkThread {
321                _anchor: PhantomData,
322            }
323        }
324
325        /// Get a reference to the `ParkThread` handle for this thread.
326        fn with_current<F, R>(&self, f: F) -> R
327        where
328            F: FnOnce(&ParkThread) -> R,
329        {
330            CURRENT_PARKER.with(|inner| f(inner))
331        }
332    }
333
334    impl Park for CachedParkThread {
335        type Unpark = UnparkThread;
336        type Error = ParkError;
337
338        fn unpark(&self) -> Self::Unpark {
339            self.with_current(|park_thread| park_thread.unpark())
340        }
341
342        fn park(&mut self) -> Result<(), Self::Error> {
343            self.with_current(|park_thread| park_thread.inner.park());
344            Ok(())
345        }
346
347        fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
348            self.with_current(|park_thread| park_thread.inner.park_timeout(duration));
349            Ok(())
350        }
351    }
352
353
354    impl UnparkThread {
355        pub fn into_waker(self) -> Waker {
356            unsafe {
357                let raw = unparker_to_raw_waker(self.inner);
358                Waker::from_raw(raw)
359            }
360        }
361    }
362
363    impl Inner {
364        #[allow(clippy::wrong_self_convention)]
365        fn into_raw(this: Arc<Inner>) -> *const () {
366            Arc::into_raw(this) as *const ()
367        }
368
369        unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
370            Arc::from_raw(ptr as *const Inner)
371        }
372    }
373
374    unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
375        RawWaker::new(
376            Inner::into_raw(unparker),
377            &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
378        )
379    }
380
381    unsafe fn clone(raw: *const ()) -> RawWaker {
382        let unparker = Inner::from_raw(raw);
383
384        // Increment the ref count
385        mem::forget(unparker.clone());
386
387        unparker_to_raw_waker(unparker)
388    }
389
390    unsafe fn drop_waker(raw: *const ()) {
391        let _ = Inner::from_raw(raw);
392    }
393
394    unsafe fn wake(raw: *const ()) {
395        let unparker = Inner::from_raw(raw);
396        unparker.unpark();
397    }
398
399    unsafe fn wake_by_ref(raw: *const ()) {
400        let unparker = Inner::from_raw(raw);
401        unparker.unpark();
402
403        // We don't actually own a reference to the unparker
404        mem::forget(unparker);
405    }
406
407    impl<A, B> Park for Either<A, B>
408    where
409        A: Park,
410        B: Park,
411    {
412        type Unpark = Either<A::Unpark, B::Unpark>;
413        type Error = Either<A::Error, B::Error>;
414    
415        fn unpark(&self) -> Self::Unpark {
416            match self {
417                Either::A(a) => Either::A(a.unpark()),
418                Either::B(b) => Either::B(b.unpark()),
419            }
420        }
421    
422        fn park(&mut self) -> Result<(), Self::Error> {
423            match self {
424                Either::A(a) => a.park().map_err(Either::A),
425                Either::B(b) => b.park().map_err(Either::B),
426            }
427        }
428    
429        fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
430            match self {
431                Either::A(a) => a.park_timeout(duration).map_err(Either::A),
432                Either::B(b) => b.park_timeout(duration).map_err(Either::B),
433            }
434        }
435    }
436    
437    impl<A, B> Unpark for Either<A, B>
438    where
439        A: Unpark,
440        B: Unpark,
441    {
442        fn unpark(&self) {
443            match self {
444                Either::A(a) => a.unpark(),
445                Either::B(b) => b.unpark(),
446            }
447        }
448    }