koper/thread/park.rs
1use crate::either::Either;
2
3use std::sync::atomic::AtomicUsize;
4use std::sync::{Arc, Condvar, Mutex};
5use std::sync::atomic::Ordering::SeqCst;
6use std::time::Duration;
7
8
9/// Block the current thread.
10///
11/// See [module documentation][mod] for more details.
12///
13/// [mod]: ../index.html
14pub trait Park {
15 /// Unpark handle type for the `Park` implementation.
16 type Unpark: Unpark;
17
18 /// Error returned by `park`
19 type Error;
20
21 /// Get a new `Unpark` handle associated with this `Park` instance.
22 fn unpark(&self) -> Self::Unpark;
23
24 /// Block the current thread unless or until the token is available.
25 ///
26 /// A call to `park` does not guarantee that the thread will remain blocked
27 /// forever, and callers should be prepared for this possibility. This
28 /// function may wakeup spuriously for any reason.
29 ///
30 /// See [module documentation][mod] for more details.
31 ///
32 /// # Panics
33 ///
34 /// This function **should** not panic, but ultimately, panics are left as
35 /// an implementation detail. Refer to the documentation for the specific
36 /// `Park` implementation
37 ///
38 /// [mod]: ../index.html
39 fn park(&mut self) -> Result<(), Self::Error>;
40
41 /// Park the current thread for at most `duration`.
42 ///
43 /// This function is the same as `park` but allows specifying a maximum time
44 /// to block the thread for.
45 ///
46 /// Same as `park`, there is no guarantee that the thread will remain
47 /// blocked for any amount of time. Spurious wakeups are permitted for any
48 /// reason.
49 ///
50 /// See [module documentation][mod] for more details.
51 ///
52 /// # Panics
53 ///
54 /// This function **should** not panic, but ultimately, panics are left as
55 /// an implementation detail. Refer to the documentation for the specific
56 /// `Park` implementation
57 ///
58 /// [mod]: ../index.html
59 fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;
60}
61
/// Wakes a thread that was blocked by the associated [`Park`] instance.
///
/// See [module documentation][mod] for more details.
///
/// [mod]: ../index.html
/// [`Park`]: trait.Park.html
pub trait Unpark: Sync + Send + 'static {
    /// Unblocks a thread that is blocked by the associated `Park` handle.
    ///
    /// A call to `unpark` atomically makes the unpark token available if it
    /// is not available already.
    ///
    /// See [module documentation][mod] for more details.
    ///
    /// # Panics
    ///
    /// This function **should** not panic, but panics are ultimately left as
    /// an implementation detail. Refer to the documentation of the specific
    /// `Unpark` implementation.
    ///
    /// [mod]: ../index.html
    fn unpark(&self);
}
85
86impl Unpark for Box<dyn Unpark> {
87 fn unpark(&self) {
88 (**self).unpark()
89 }
90}
91
92impl Unpark for Arc<dyn Unpark> {
93 fn unpark(&self) {
94 (**self).unpark()
95 }
96}
97
98
99#[derive(Debug)]
100pub struct ParkThread {
101 inner: Arc<Inner>,
102}
103
/// Error type of the [`ParkThread`] parker.
///
/// No code path currently produces this error; it exists so that one can be
/// reported at some point in the future.
///
/// [`ParkThread`]: struct.ParkThread.html
#[derive(Debug)]
pub struct ParkError {
    /// Private unit field: keeps the struct from being constructed outside
    /// this module.
    _p: (),
}
113
114/// Unblocks a thread that was blocked by `ParkThread`.
115#[derive(Clone, Debug)]
116pub struct UnparkThread {
117 inner: Arc<Inner>,
118}
119
/// State shared between a parker and its unpark handles.
#[derive(Debug)]
struct Inner {
    /// Current parking state: one of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Held by the parking thread from the moment it marks itself `PARKED`
    /// until it is waiting on `condvar`; `unpark` acquires it before
    /// notifying so the wake-up cannot be missed.
    mutex: Mutex<()>,
    /// Condition variable the parked thread sleeps on.
    condvar: Condvar,
}
126
/// No thread is parked and no notification is pending.
const EMPTY: usize = 0;
/// A thread has announced that it is (about to be) waiting on the condvar.
const PARKED: usize = 1;
/// `unpark` was called; the next `park`/`park_timeout` consumes this and
/// returns.
const NOTIFIED: usize = 2;
130
131thread_local! {
132 static CURRENT_PARKER: ParkThread = ParkThread::new();
133}
134
135// ==== impl ParkThread ====
136
137impl ParkThread {
138 pub fn new() -> Self {
139 Self {
140 inner: Arc::new(Inner {
141 state: AtomicUsize::new(EMPTY),
142 mutex: Mutex::new(()),
143 condvar: Condvar::new(),
144 }),
145 }
146 }
147}
148
149impl Park for ParkThread {
150 type Unpark = UnparkThread;
151 type Error = ParkError;
152
153 fn unpark(&self) -> Self::Unpark {
154 let inner = self.inner.clone();
155 UnparkThread { inner }
156 }
157
158 fn park(&mut self) -> Result<(), Self::Error> {
159 self.inner.park();
160 Ok(())
161 }
162
163 fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
164 self.inner.park_timeout(duration);
165 Ok(())
166 }
167}
168
169// ==== impl Inner ====
170
171impl Inner {
172 /// Park the current thread for at most `dur`.
173 fn park(&self) {
174 // If we were previously notified then we consume this notification and
175 // return quickly.
176 if self
177 .state
178 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
179 .is_ok()
180 {
181 return;
182 }
183
184 // Otherwise we need to coordinate going to sleep
185 let mut m = self.mutex.lock().unwrap();
186
187 match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
188 Ok(_) => {}
189 Err(NOTIFIED) => {
190 // We must read here, even though we know it will be `NOTIFIED`.
191 // This is because `unpark` may have been called again since we read
192 // `NOTIFIED` in the `compare_exchange` above. We must perform an
193 // acquire operation that synchronizes with that `unpark` to observe
194 // any writes it made before the call to unpark. To do that we must
195 // read from the write it made to `state`.
196 let old = self.state.swap(EMPTY, SeqCst);
197 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
198
199 return;
200 }
201 Err(actual) => panic!("inconsistent park state; actual = {}", actual),
202 }
203
204 loop {
205 m = self.condvar.wait(m).unwrap();
206
207 if self
208 .state
209 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
210 .is_ok()
211 {
212 // got a notification
213 return;
214 }
215
216 // spurious wakeup, go back to sleep
217 }
218 }
219
220 fn park_timeout(&self, dur: Duration) {
221 // Like `park` above we have a fast path for an already-notified thread,
222 // and afterwards we start coordinating for a sleep. Return quickly.
223 if self
224 .state
225 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
226 .is_ok()
227 {
228 return;
229 }
230
231 let m = self.mutex.lock().unwrap();
232
233 match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
234 Ok(_) => {}
235 Err(NOTIFIED) => {
236 // We must read again here, see `park`.
237 let old = self.state.swap(EMPTY, SeqCst);
238 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
239
240 return;
241 }
242 Err(actual) => panic!("inconsistent park_timeout state; actual = {}", actual),
243 }
244
245 // Wait with a timeout, and if we spuriously wake up or otherwise wake up
246 // from a notification, we just want to unconditionally set the state back to
247 // empty, either consuming a notification or un-flagging ourselves as
248 // parked.
249 let (_m, _result) = self.condvar.wait_timeout(m, dur).unwrap();
250
251 match self.state.swap(EMPTY, SeqCst) {
252 NOTIFIED => {} // got a notification, hurray!
253 PARKED => {} // no notification, alas
254 n => panic!("inconsistent park_timeout state: {}", n),
255 }
256 }
257
258 fn unpark(&self) {
259 // To ensure the unparked thread will observe any writes we made before
260 // this call, we must perform a release operation that `park` can
261 // synchronize with. To do that we must write `NOTIFIED` even if `state`
262 // is already `NOTIFIED`. That is why this must be a swap rather than a
263 // compare-and-swap that returns if it reads `NOTIFIED` on failure.
264 match self.state.swap(NOTIFIED, SeqCst) {
265 EMPTY => return, // no one was waiting
266 NOTIFIED => return, // already unparked
267 PARKED => {} // gotta go wake someone up
268 _ => panic!("inconsistent state in unpark"),
269 }
270
271 // There is a period between when the parked thread sets `state` to
272 // `PARKED` (or last checked `state` in the case of a spurious wake
273 // up) and when it actually waits on `cvar`. If we were to notify
274 // during this period it would be ignored and then when the parked
275 // thread went to sleep it would never wake up. Fortunately, it has
276 // `lock` locked at this stage so we can acquire `lock` to wait until
277 // it is ready to receive the notification.
278 //
279 // Releasing `lock` before the call to `notify_one` means that when the
280 // parked thread wakes it doesn't get woken only to have to wait for us
281 // to release `lock`.
282 drop(self.mutex.lock().unwrap());
283
284 self.condvar.notify_one()
285 }
286}
287
288impl Default for ParkThread {
289 fn default() -> Self {
290 Self::new()
291 }
292}
293
294// ===== impl UnparkThread =====
295
296impl Unpark for UnparkThread {
297 fn unpark(&self) {
298 self.inner.unpark();
299 }
300}
301
302 use std::marker::PhantomData;
303 use std::rc::Rc;
304
305 use std::mem;
306 use std::task::{RawWaker, RawWakerVTable, Waker};
307
/// Blocks the current thread using a condition variable.
///
/// The value holds no parking state of its own; it parks through the
/// thread-local `CURRENT_PARKER`.
#[derive(Debug)]
pub struct CachedParkThread {
    /// `Rc` is neither `Send` nor `Sync`, so this marker keeps the handle on
    /// the thread that created it.
    _anchor: PhantomData<Rc<()>>,
}
313
314 impl CachedParkThread {
315 /// Create a new `ParkThread` handle for the current thread.
316 ///
317 /// This type cannot be moved to other threads, so it should be created on
318 /// the thread that the caller intends to park.
319 pub fn new() -> CachedParkThread {
320 CachedParkThread {
321 _anchor: PhantomData,
322 }
323 }
324
325 /// Get a reference to the `ParkThread` handle for this thread.
326 fn with_current<F, R>(&self, f: F) -> R
327 where
328 F: FnOnce(&ParkThread) -> R,
329 {
330 CURRENT_PARKER.with(|inner| f(inner))
331 }
332 }
333
334 impl Park for CachedParkThread {
335 type Unpark = UnparkThread;
336 type Error = ParkError;
337
338 fn unpark(&self) -> Self::Unpark {
339 self.with_current(|park_thread| park_thread.unpark())
340 }
341
342 fn park(&mut self) -> Result<(), Self::Error> {
343 self.with_current(|park_thread| park_thread.inner.park());
344 Ok(())
345 }
346
347 fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
348 self.with_current(|park_thread| park_thread.inner.park_timeout(duration));
349 Ok(())
350 }
351 }
352
353
354 impl UnparkThread {
355 pub fn into_waker(self) -> Waker {
356 unsafe {
357 let raw = unparker_to_raw_waker(self.inner);
358 Waker::from_raw(raw)
359 }
360 }
361 }
362
363 impl Inner {
364 #[allow(clippy::wrong_self_convention)]
365 fn into_raw(this: Arc<Inner>) -> *const () {
366 Arc::into_raw(this) as *const ()
367 }
368
369 unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
370 Arc::from_raw(ptr as *const Inner)
371 }
372 }
373
374 unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
375 RawWaker::new(
376 Inner::into_raw(unparker),
377 &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
378 )
379 }
380
381 unsafe fn clone(raw: *const ()) -> RawWaker {
382 let unparker = Inner::from_raw(raw);
383
384 // Increment the ref count
385 mem::forget(unparker.clone());
386
387 unparker_to_raw_waker(unparker)
388 }
389
390 unsafe fn drop_waker(raw: *const ()) {
391 let _ = Inner::from_raw(raw);
392 }
393
394 unsafe fn wake(raw: *const ()) {
395 let unparker = Inner::from_raw(raw);
396 unparker.unpark();
397 }
398
399 unsafe fn wake_by_ref(raw: *const ()) {
400 let unparker = Inner::from_raw(raw);
401 unparker.unpark();
402
403 // We don't actually own a reference to the unparker
404 mem::forget(unparker);
405 }
406
407 impl<A, B> Park for Either<A, B>
408 where
409 A: Park,
410 B: Park,
411 {
412 type Unpark = Either<A::Unpark, B::Unpark>;
413 type Error = Either<A::Error, B::Error>;
414
415 fn unpark(&self) -> Self::Unpark {
416 match self {
417 Either::A(a) => Either::A(a.unpark()),
418 Either::B(b) => Either::B(b.unpark()),
419 }
420 }
421
422 fn park(&mut self) -> Result<(), Self::Error> {
423 match self {
424 Either::A(a) => a.park().map_err(Either::A),
425 Either::B(b) => b.park().map_err(Either::B),
426 }
427 }
428
429 fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
430 match self {
431 Either::A(a) => a.park_timeout(duration).map_err(Either::A),
432 Either::B(b) => b.park_timeout(duration).map_err(Either::B),
433 }
434 }
435 }
436
437 impl<A, B> Unpark for Either<A, B>
438 where
439 A: Unpark,
440 B: Unpark,
441 {
442 fn unpark(&self) {
443 match self {
444 Either::A(a) => a.unpark(),
445 Either::B(b) => b.unpark(),
446 }
447 }
448 }