notify_future/
lib.rs

1use std::future::Future;
2use std::sync::{Mutex, Arc};
3use std::pin::Pin;
4use std::task::{Poll, Context, Waker};
5
6struct NotifyFutureState<RESULT> {
7    waker: Option<Waker>,
8    result: Option<RESULT>,
9    is_canceled: bool,
10}
11
12impl <RESULT> NotifyFutureState<RESULT> {
13    pub fn new() -> Arc<Mutex<NotifyFutureState<RESULT>>> {
14        Arc::new(Mutex::new(NotifyFutureState {
15            waker: None,
16            result: None,
17            is_canceled: false,
18        }))
19    }
20
21    pub fn set_complete(state: &Arc<Mutex<NotifyFutureState<RESULT>>>, result: RESULT) {
22        let mut state = state.lock().unwrap();
23        state.result = Some(result);
24        if state.waker.is_some() {
25            state.waker.take().unwrap().wake();
26        }
27    }
28
29    pub fn is_canceled(&self) -> bool {
30        self.is_canceled
31    }
32}
33
34#[deprecated(
35    since = "0.2.1",
36    note = "Please use Notify instead"
37)]
38pub struct NotifyFuture<RESULT> {
39    state:Arc<Mutex<NotifyFutureState<RESULT>>>
40}
41
42impl<RESULT> Clone for NotifyFuture<RESULT> {
43    fn clone(&self) -> Self {
44        Self {
45            state: self.state.clone()
46        }
47    }
48}
49
50impl <RESULT> NotifyFuture<RESULT> {
51    pub fn new() -> Self {
52        Self{
53            state: NotifyFutureState::new()
54        }
55    }
56
57    pub fn set_complete(&self, result: RESULT) {
58        NotifyFutureState::set_complete(&self.state, result);
59    }
60}
61
62impl <RESULT> Future for NotifyFuture<RESULT> {
63    type Output = RESULT;
64
65    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66        let mut state = self.state.lock().unwrap();
67        if state.result.is_some() {
68            return Poll::Ready(state.result.take().unwrap());
69        }
70
71        if state.waker.is_none() {
72            state.waker = Some(cx.waker().clone());
73        }
74        Poll::Pending
75    }
76}
77
78pub struct Notify<RESULT> {
79    state: Arc<Mutex<NotifyFutureState<RESULT>>>
80}
81
82impl<RESULT> Notify<RESULT> {
83    pub fn new() -> (Self, NotifyWaiter<RESULT>) {
84        let state = NotifyFutureState::new();
85        (Self {
86            state: state.clone()
87        }, NotifyWaiter::new(state))
88    }
89
90    pub fn notify(self, result: RESULT) {
91        NotifyFutureState::set_complete(&self.state, result);
92    }
93
94    pub fn is_canceled(&self) -> bool {
95        self.state.lock().unwrap().is_canceled()
96    }
97}
98
99pub struct NotifyWaiter<RESULT> {
100    state: Arc<Mutex<NotifyFutureState<RESULT>>>
101}
102
103impl<RESULT> NotifyWaiter<RESULT> {
104    pub(crate) fn new(state: Arc<Mutex<NotifyFutureState<RESULT>>>) -> Self {
105        Self {
106            state
107        }
108    }
109}
110
111impl<RESULT> Drop for NotifyWaiter<RESULT> {
112    fn drop(&mut self) {
113        let mut state = self.state.lock().unwrap();
114        state.waker.take();
115        state.is_canceled = true;
116    }
117}
118
119impl <RESULT> Future for NotifyWaiter<RESULT> {
120    type Output = RESULT;
121
122    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
123        let mut state = self.state.lock().unwrap();
124        if state.result.is_some() {
125            return Poll::Ready(state.result.take().unwrap());
126        }
127
128        if state.waker.is_none() || !state.waker.as_ref().unwrap().will_wake(cx.waker()) {
129            state.waker = Some(cx.waker().clone());
130        }
131        Poll::Pending
132    }
133}
134
135#[cfg(test)]
136mod test {
137    use std::time::Duration;
138    use crate::{Notify, NotifyFuture};
139
140    #[test]
141    fn test() {
142        async_std::task::block_on(async {
143            let notify_future = NotifyFuture::<u32>::new();
144            let tmp_future = notify_future.clone();
145            async_std::task::spawn(async move {
146                async_std::task::sleep(Duration::from_secs(3)).await;
147                tmp_future.set_complete(1);
148            });
149            let ret = notify_future.await;
150            assert_eq!(ret, 1);
151        });
152    }
153
154    #[test]
155    fn test2() {
156        async_std::task::block_on(async {
157            let (notify, waiter) = Notify::<u32>::new();
158            async_std::task::spawn(async move {
159                async_std::task::sleep(Duration::from_secs(3)).await;
160                notify.notify(1);
161            });
162            let ret = waiter.await;
163            assert_eq!(ret, 1);
164        });
165    }
166}