1use std::future::Future;
2use std::sync::{Mutex, Arc};
3use std::pin::Pin;
4use std::task::{Poll, Context, Waker};
5
6struct NotifyFutureState<RESULT> {
7 waker: Option<Waker>,
8 result: Option<RESULT>,
9}
10
11impl <RESULT> NotifyFutureState<RESULT> {
12 pub fn new() -> Arc<Mutex<NotifyFutureState<RESULT>>> {
13 Arc::new(Mutex::new(NotifyFutureState {
14 waker: None,
15 result: None
16 }))
17 }
18
19 pub fn set_complete(state: &Arc<Mutex<NotifyFutureState<RESULT>>>, result: RESULT) {
20 let mut state = state.lock().unwrap();
21 state.result = Some(result);
22 if state.waker.is_some() {
23 state.waker.take().unwrap().wake();
24 }
25 }
26}
27
28#[deprecated(
29 since = "0.2.1",
30 note = "Please use Notify instead"
31)]
32pub struct NotifyFuture<RESULT> {
33 state:Arc<Mutex<NotifyFutureState<RESULT>>>
34}
35
36impl<RESULT> Clone for NotifyFuture<RESULT> {
37 fn clone(&self) -> Self {
38 Self {
39 state: self.state.clone()
40 }
41 }
42}
43
44impl <RESULT> NotifyFuture<RESULT> {
45 pub fn new() -> Self {
46 Self{
47 state: NotifyFutureState::new()
48 }
49 }
50
51 pub fn set_complete(&self, result: RESULT) {
52 NotifyFutureState::set_complete(&self.state, result);
53 }
54}
55
56impl <RESULT> Future for NotifyFuture<RESULT> {
57 type Output = RESULT;
58
59 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
60 let mut state = self.state.lock().unwrap();
61 if state.result.is_some() {
62 return Poll::Ready(state.result.take().unwrap());
63 }
64
65 if state.waker.is_none() {
66 state.waker = Some(cx.waker().clone());
67 }
68 Poll::Pending
69 }
70}
71
72pub struct Notify<RESULT> {
73 state: Arc<Mutex<NotifyFutureState<RESULT>>>
74}
75
76impl<RESULT> Notify<RESULT> {
77 pub fn new() -> (Self, NotifyWaiter<RESULT>) {
78 let state = NotifyFutureState::new();
79 (Self {
80 state: state.clone()
81 }, NotifyWaiter::new(state))
82 }
83
84 pub fn notify(self, result: RESULT) {
85 NotifyFutureState::set_complete(&self.state, result);
86 }
87}
88
89pub struct NotifyWaiter<RESULT> {
90 state: Arc<Mutex<NotifyFutureState<RESULT>>>
91}
92
93impl<RESULT> NotifyWaiter<RESULT> {
94 pub(crate) fn new(state: Arc<Mutex<NotifyFutureState<RESULT>>>) -> Self {
95 Self {
96 state
97 }
98 }
99}
100
101impl <RESULT> Future for NotifyWaiter<RESULT> {
102 type Output = RESULT;
103
104 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105 let mut state = self.state.lock().unwrap();
106 if state.result.is_some() {
107 return Poll::Ready(state.result.take().unwrap());
108 }
109
110 if state.waker.is_none() || !state.waker.as_ref().unwrap().will_wake(cx.waker()) {
111 state.waker = Some(cx.waker().clone());
112 }
113 Poll::Pending
114 }
115}
116
117#[cfg(test)]
118mod test {
119 use std::time::Duration;
120 use crate::{Notify, NotifyFuture};
121
122 #[test]
123 fn test() {
124 async_std::task::block_on(async {
125 let notify_future = NotifyFuture::<u32>::new();
126 let tmp_future = notify_future.clone();
127 async_std::task::spawn(async move {
128 async_std::task::sleep(Duration::from_secs(3)).await;
129 tmp_future.set_complete(1);
130 });
131 let ret = notify_future.await;
132 assert_eq!(ret, 1);
133 });
134 }
135
136 #[test]
137 fn test2() {
138 async_std::task::block_on(async {
139 let (notify, waiter) = Notify::<u32>::new();
140 async_std::task::spawn(async move {
141 async_std::task::sleep(Duration::from_secs(3)).await;
142 notify.notify(1);
143 });
144 let ret = waiter.await;
145 assert_eq!(ret, 1);
146 });
147 }
148}