Skip to main content

ntex_util/
task.rs

1//! A synchronization primitive for task wakeup.
2use std::{cell::Cell, fmt, marker::PhantomData, rc, task::Waker};
3
/// A synchronization primitive for task wakeup.
///
/// Sometimes the task interested in a given event will change over time.
/// A `LocalWaker` can coordinate concurrent notifications with the consumer
/// potentially "updating" the underlying task to wake up. This is useful in
/// scenarios where a computation completes in another task and wants to
/// notify the consumer, but the consumer is in the process of being migrated to
/// a new logical task.
///
/// Consumers should call `register` before checking the result of a computation
/// and producers should call `wake` after producing the computation (this
/// differs from the usual `thread::park` pattern). It is also permitted for
/// `wake` to be called **before** `register`. This results in a no-op.
///
/// A single `LocalWaker` may be reused for any number of calls to `register` or
/// `wake`.
#[derive(Default)]
pub struct LocalWaker {
    /// Most recently registered waker; `None` when nothing is registered or
    /// the stored waker has been taken out.
    waker: Cell<Option<Waker>>,
    /// Zero-sized marker. `Rc` is neither `Send` nor `Sync`, so carrying it
    /// here keeps `LocalWaker` itself from being sent to another thread.
    _t: PhantomData<rc::Rc<()>>,
}
25
26impl LocalWaker {
27    /// Create an `LocalWaker`.
28    pub fn new() -> Self {
29        LocalWaker::with(None)
30    }
31
32    /// Create an `LocalWaker`.
33    pub fn with(waker: Option<Waker>) -> Self {
34        LocalWaker {
35            waker: Cell::new(waker),
36            _t: PhantomData,
37        }
38    }
39
40    #[inline]
41    /// Registers the waker to be notified on calls to `wake`.
42    ///
43    /// Returns `true` if waker was registered before.
44    pub fn register(&self, waker: &Waker) -> bool {
45        self.waker.replace(Some(waker.clone())).is_some()
46    }
47
48    #[inline]
49    /// Calls `wake` on the last `Waker` passed to `register`.
50    ///
51    /// If `register` has not been called yet, then this does nothing.
52    pub fn wake(&self) {
53        if let Some(waker) = self.take() {
54            waker.wake();
55        }
56    }
57
58    #[inline]
59    /// Calls `wake` on the last `Waker` passed to `register`.
60    ///
61    /// If `register` has not been called yet, then this returns `false`.
62    pub fn wake_checked(&self) -> bool {
63        if let Some(waker) = self.take() {
64            waker.wake();
65            true
66        } else {
67            false
68        }
69    }
70
71    /// Returns the last `Waker` passed to `register`, so that the user can wake it.
72    ///
73    /// If a waker has not been registered, this returns `None`.
74    pub fn take(&self) -> Option<Waker> {
75        self.waker.take()
76    }
77
78    #[doc(hidden)]
79    /// Check if waker is set
80    pub fn is_set(&self) -> bool {
81        let waker = self.waker.take();
82        let set = waker.is_some();
83        self.waker.set(waker);
84        set
85    }
86}
87
88impl Clone for LocalWaker {
89    fn clone(&self) -> Self {
90        LocalWaker::new()
91    }
92}
93
94impl fmt::Debug for LocalWaker {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        write!(f, "LocalWaker")
97    }
98}
99
/// Yields execution back to the current runtime.
///
/// The awaited future reports `Pending` exactly once, after asking the
/// task's own waker to schedule it again, and completes on the next poll.
pub async fn yield_to() {
    use std::{future::Future, pin::Pin, task::Context, task::Poll};

    /// One-shot future; the flag records whether it has already yielded.
    struct YieldOnce(bool);

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Flip the flag unconditionally and branch on its previous value.
            let already_yielded = std::mem::replace(&mut self.0, true);
            if already_yielded {
                Poll::Ready(())
            } else {
                // Request a re-poll before giving up the slot; otherwise the
                // task would stay pending with nothing left to wake it.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    YieldOnce(false).await;
}
125
#[cfg(test)]
mod test {
    use super::*;

    // Smoke test: awaits `yield_to` once. There are no assertions, so the
    // test passes as long as the await returns.
    #[ntex::test]
    async fn yield_test() {
        yield_to().await;
    }
}