1use std::future::Future;
2use std::sync::{Mutex, Arc};
3use std::pin::Pin;
4use std::task::{Poll, Context, Waker};
5
6struct NotifyFutureState<RESULT> {
7 waker: Option<Waker>,
8 result: Option<RESULT>,
9}
10
11impl <RESULT> NotifyFutureState<RESULT> {
12 pub fn new() -> Arc<Mutex<NotifyFutureState<RESULT>>> {
13 Arc::new(Mutex::new(NotifyFutureState {
14 waker: None,
15 result: None
16 }))
17 }
18
19 pub fn set_complete(state: &Arc<Mutex<NotifyFutureState<RESULT>>>, result: RESULT) {
20 let mut state = state.lock().unwrap();
21 state.result = Some(result);
22 if state.waker.is_some() {
23 state.waker.take().unwrap().wake();
24 }
25 }
26}
27
28pub struct NotifyFuture<RESULT> {
29 state:Arc<Mutex<NotifyFutureState<RESULT>>>
30}
31
32impl<RESULT> Clone for NotifyFuture<RESULT> {
33 fn clone(&self) -> Self {
34 Self {
35 state: self.state.clone()
36 }
37 }
38}
39
40impl <RESULT> NotifyFuture<RESULT> {
41 pub fn new() -> Self {
42 Self{
43 state: NotifyFutureState::new()
44 }
45 }
46
47 pub fn set_complete(&self, result: RESULT) {
48 NotifyFutureState::set_complete(&self.state, result);
49 }
50}
51
52impl <RESULT> Future for NotifyFuture<RESULT> {
53 type Output = RESULT;
54
55 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56 let mut state = self.state.lock().unwrap();
57 if state.result.is_some() {
58 return Poll::Ready(state.result.take().unwrap());
59 }
60
61 if state.waker.is_none() {
62 state.waker = Some(cx.waker().clone());
63 }
64 Poll::Pending
65 }
66}
67
68pub struct Notify<RESULT> {
69 state: Arc<Mutex<NotifyFutureState<RESULT>>>
70}
71
72impl<RESULT> Notify<RESULT> {
73 fn new() -> (Self, NotifyWaiter<RESULT>) {
74 let state = NotifyFutureState::new();
75 (Self {
76 state: state.clone()
77 }, NotifyWaiter::new(state))
78 }
79
80 pub fn notify(self, result: RESULT) {
81 NotifyFutureState::set_complete(&self.state, result);
82 }
83}
84
85pub struct NotifyWaiter<RESULT> {
86 state: Arc<Mutex<NotifyFutureState<RESULT>>>
87}
88
89impl<RESULT> NotifyWaiter<RESULT> {
90 pub(crate) fn new(state: Arc<Mutex<NotifyFutureState<RESULT>>>) -> Self {
91 Self {
92 state
93 }
94 }
95}
96
97impl <RESULT> Future for NotifyWaiter<RESULT> {
98 type Output = RESULT;
99
100 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
101 let mut state = self.state.lock().unwrap();
102 if state.result.is_some() {
103 return Poll::Ready(state.result.take().unwrap());
104 }
105
106 if state.waker.is_none() {
107 state.waker = Some(cx.waker().clone());
108 }
109 Poll::Pending
110 }
111}
112
113#[cfg(test)]
114mod test {
115 use std::time::Duration;
116 use crate::{Notify, NotifyFuture};
117
118 #[test]
119 fn test() {
120 async_std::task::block_on(async {
121 let notify_future = NotifyFuture::<u32>::new();
122 let tmp_future = notify_future.clone();
123 async_std::task::spawn(async move {
124 async_std::task::sleep(Duration::from_secs(3)).await;
125 tmp_future.set_complete(1);
126 });
127 let ret = notify_future.await;
128 assert_eq!(ret, 1);
129 });
130 }
131
132 #[test]
133 fn test2() {
134 async_std::task::block_on(async {
135 let (notify, waiter) = Notify::<u32>::new();
136 async_std::task::spawn(async move {
137 async_std::task::sleep(Duration::from_secs(3)).await;
138 notify.notify(1);
139 });
140 let ret = waiter.await;
141 assert_eq!(ret, 1);
142 });
143 }
144}