Skip to main content

notify_future/
lib.rs

1use std::future::Future;
2use std::sync::{Arc, Mutex, MutexGuard};
3use std::pin::Pin;
4use std::task::{Poll, Context, Waker};
5
6struct NotifyFutureState<RESULT> {
7    waker: Option<Waker>,
8    result: Option<RESULT>,
9    is_completed: bool,
10    is_canceled: bool,
11}
12
13impl <RESULT> NotifyFutureState<RESULT> {
14    pub fn new() -> Arc<Mutex<NotifyFutureState<RESULT>>> {
15        Arc::new(Mutex::new(NotifyFutureState {
16            waker: None,
17            result: None,
18            is_completed: false,
19            is_canceled: false,
20        }))
21    }
22
23    pub fn set_complete(state: &Arc<Mutex<NotifyFutureState<RESULT>>>, result: RESULT) {
24        let waker = {
25            let mut state = lock_state(state);
26            if state.is_completed || state.is_canceled {
27                return;
28            }
29
30            state.result = Some(result);
31            state.is_completed = true;
32            state.waker.take()
33        };
34
35        if let Some(waker) = waker {
36            waker.wake();
37        }
38    }
39
40    pub fn is_canceled(&self) -> bool {
41        self.is_canceled
42    }
43}
44
45fn lock_state<RESULT>(state: &Arc<Mutex<NotifyFutureState<RESULT>>>) -> MutexGuard<'_, NotifyFutureState<RESULT>> {
46    state.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
47}
48
49#[deprecated(
50    since = "0.2.1",
51    note = "Please use Notify instead"
52)]
53pub struct NotifyFuture<RESULT> {
54    state:Arc<Mutex<NotifyFutureState<RESULT>>>
55}
56
57#[allow(deprecated)]
58impl<RESULT> Clone for NotifyFuture<RESULT> {
59    fn clone(&self) -> Self {
60        Self {
61            state: self.state.clone()
62        }
63    }
64}
65
66#[allow(deprecated)]
67impl <RESULT> NotifyFuture<RESULT> {
68    pub fn new() -> Self {
69        Self{
70            state: NotifyFutureState::new()
71        }
72    }
73
74    pub fn set_complete(&self, result: RESULT) {
75        NotifyFutureState::set_complete(&self.state, result);
76    }
77}
78
79#[allow(deprecated)]
80impl <RESULT> Future for NotifyFuture<RESULT> {
81    type Output = RESULT;
82
83    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
84        let mut state = lock_state(&self.state);
85        if state.is_completed {
86            if let Some(result) = state.result.take() {
87                return Poll::Ready(result);
88            }
89
90            panic!("NotifyFuture was awaited by more than one task. Use Notify::new() instead");
91        }
92
93        if state.waker.is_none() || !state.waker.as_ref().unwrap().will_wake(cx.waker()) {
94            state.waker = Some(cx.waker().clone());
95        }
96        Poll::Pending
97    }
98}
99
100pub struct Notify<RESULT> {
101    state: Arc<Mutex<NotifyFutureState<RESULT>>>
102}
103
104impl<RESULT> Notify<RESULT> {
105    pub fn new() -> (Self, NotifyWaiter<RESULT>) {
106        let state = NotifyFutureState::new();
107        (Self {
108            state: state.clone()
109        }, NotifyWaiter::new(state))
110    }
111
112    pub fn notify(self, result: RESULT) {
113        NotifyFutureState::set_complete(&self.state, result);
114    }
115
116    pub fn is_canceled(&self) -> bool {
117        lock_state(&self.state).is_canceled()
118    }
119}
120
121pub struct NotifyWaiter<RESULT> {
122    state: Arc<Mutex<NotifyFutureState<RESULT>>>
123}
124
125impl<RESULT> NotifyWaiter<RESULT> {
126    pub(crate) fn new(state: Arc<Mutex<NotifyFutureState<RESULT>>>) -> Self {
127        Self {
128            state
129        }
130    }
131}
132
133impl<RESULT> Drop for NotifyWaiter<RESULT> {
134    fn drop(&mut self) {
135        let mut state = lock_state(&self.state);
136        state.waker.take();
137        if !state.is_completed {
138            state.is_canceled = true;
139        }
140    }
141}
142
143impl <RESULT> Future for NotifyWaiter<RESULT> {
144    type Output = RESULT;
145
146    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
147        let mut state = lock_state(&self.state);
148        if state.is_completed {
149            return Poll::Ready(state.result.take().unwrap());
150        }
151
152        if state.waker.is_none() || !state.waker.as_ref().unwrap().will_wake(cx.waker()) {
153            state.waker = Some(cx.waker().clone());
154        }
155        Poll::Pending
156    }
157}
158
159#[cfg(test)]
160#[allow(deprecated)]
161mod test {
162    use std::time::Duration;
163    use crate::{Notify, NotifyFuture};
164
165    #[test]
166    fn test() {
167        async_std::task::block_on(async {
168            let notify_future = NotifyFuture::<u32>::new();
169            let tmp_future = notify_future.clone();
170            async_std::task::spawn(async move {
171                async_std::task::sleep(Duration::from_millis(2000)).await;
172                tmp_future.set_complete(1);
173            });
174            let ret = notify_future.await;
175            assert_eq!(ret, 1);
176        });
177    }
178
179    #[test]
180    fn test2() {
181        async_std::task::block_on(async {
182            let (notify, waiter) = Notify::<u32>::new();
183            async_std::task::spawn(async move {
184                async_std::task::sleep(Duration::from_millis(2000)).await;
185                notify.notify(1);
186            });
187            let ret = waiter.await;
188            assert_eq!(ret, 1);
189        });
190    }
191
192    #[test]
193    fn notify_waiter_drop_before_ready_is_canceled() {
194        let (notify, waiter) = Notify::<u32>::new();
195        drop(waiter);
196        assert!(notify.is_canceled());
197    }
198
199    #[test]
200    fn repeated_set_complete_keeps_first_value() {
201        async_std::task::block_on(async {
202            let notify_future = NotifyFuture::<u32>::new();
203            let notifier = notify_future.clone();
204
205            notifier.set_complete(1);
206            notifier.set_complete(2);
207
208            let ret = notify_future.await;
209            assert_eq!(ret, 1);
210        });
211    }
212}