notify_future/
lib.rs

1use std::future::Future;
2use std::sync::{Mutex, Arc};
3use std::pin::Pin;
4use std::task::{Poll, Context, Waker};
5
6struct NotifyFutureState<RESULT> {
7    waker: Option<Waker>,
8    result: Option<RESULT>,
9}
10
11impl <RESULT> NotifyFutureState<RESULT> {
12    pub fn new() -> Arc<Mutex<NotifyFutureState<RESULT>>> {
13        Arc::new(Mutex::new(NotifyFutureState {
14            waker: None,
15            result: None
16        }))
17    }
18
19    pub fn set_complete(state: &Arc<Mutex<NotifyFutureState<RESULT>>>, result: RESULT) {
20        let mut state = state.lock().unwrap();
21        state.result = Some(result);
22        if state.waker.is_some() {
23            state.waker.take().unwrap().wake();
24        }
25    }
26}
27
28pub struct NotifyFuture<RESULT> {
29    state:Arc<Mutex<NotifyFutureState<RESULT>>>
30}
31
32impl<RESULT> Clone for NotifyFuture<RESULT> {
33    fn clone(&self) -> Self {
34        Self {
35            state: self.state.clone()
36        }
37    }
38}
39
40impl <RESULT> NotifyFuture<RESULT> {
41    pub fn new() -> Self {
42        Self{
43            state: NotifyFutureState::new()
44        }
45    }
46
47    pub fn set_complete(&self, result: RESULT) {
48        NotifyFutureState::set_complete(&self.state, result);
49    }
50}
51
52impl <RESULT> Future for NotifyFuture<RESULT> {
53    type Output = RESULT;
54
55    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56        let mut state = self.state.lock().unwrap();
57        if state.result.is_some() {
58            return Poll::Ready(state.result.take().unwrap());
59        }
60
61        if state.waker.is_none() {
62            state.waker = Some(cx.waker().clone());
63        }
64        Poll::Pending
65    }
66}
67
68pub struct Notify<RESULT> {
69    state: Arc<Mutex<NotifyFutureState<RESULT>>>
70}
71
72impl<RESULT> Notify<RESULT> {
73    fn new() -> (Self, NotifyWaiter<RESULT>) {
74        let state = NotifyFutureState::new();
75        (Self {
76            state: state.clone()
77        }, NotifyWaiter::new(state))
78    }
79
80    pub fn notify(self, result: RESULT) {
81        NotifyFutureState::set_complete(&self.state, result);
82    }
83}
84
85pub struct NotifyWaiter<RESULT> {
86    state: Arc<Mutex<NotifyFutureState<RESULT>>>
87}
88
89impl<RESULT> NotifyWaiter<RESULT> {
90    pub(crate) fn new(state: Arc<Mutex<NotifyFutureState<RESULT>>>) -> Self {
91        Self {
92            state
93        }
94    }
95}
96
97impl <RESULT> Future for NotifyWaiter<RESULT> {
98    type Output = RESULT;
99
100    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
101        let mut state = self.state.lock().unwrap();
102        if state.result.is_some() {
103            return Poll::Ready(state.result.take().unwrap());
104        }
105
106        if state.waker.is_none() {
107            state.waker = Some(cx.waker().clone());
108        }
109        Poll::Pending
110    }
111}
112
113#[cfg(test)]
114mod test {
115    use std::time::Duration;
116    use crate::{Notify, NotifyFuture};
117
118    #[test]
119    fn test() {
120        async_std::task::block_on(async {
121            let notify_future = NotifyFuture::<u32>::new();
122            let tmp_future = notify_future.clone();
123            async_std::task::spawn(async move {
124                async_std::task::sleep(Duration::from_secs(3)).await;
125                tmp_future.set_complete(1);
126            });
127            let ret = notify_future.await;
128            assert_eq!(ret, 1);
129        });
130    }
131
132    #[test]
133    fn test2() {
134        async_std::task::block_on(async {
135            let (notify, waiter) = Notify::<u32>::new();
136            async_std::task::spawn(async move {
137                async_std::task::sleep(Duration::from_secs(3)).await;
138                notify.notify(1);
139            });
140            let ret = waiter.await;
141            assert_eq!(ret, 1);
142        });
143    }
144}