1use std::future::Future;
2use std::sync::{Mutex, Arc};
3use std::pin::Pin;
4use std::task::{Poll, Context, Waker};
5
/// State shared between the completing side and the polling side of a
/// notification. It is always handled behind `Arc<Mutex<..>>`, which is what
/// [`NotifyFutureState::new`] returns.
struct NotifyFutureState<RESULT> {
    /// Waker stored by a pending poll; taken and woken by `set_complete`.
    waker: Option<Waker>,
    /// Value stored by `set_complete` until a poll takes it out.
    result: Option<RESULT>,
    /// Cancellation flag; starts out `false`.
    is_canceled: bool,
}

impl<RESULT> NotifyFutureState<RESULT> {
    /// Creates an empty state (no waker, no result, not canceled) wrapped in
    /// `Arc<Mutex<..>>` so it can be shared between both sides.
    pub fn new() -> Arc<Mutex<NotifyFutureState<RESULT>>> {
        Arc::new(Mutex::new(NotifyFutureState {
            waker: None,
            result: None,
            is_canceled: false,
        }))
    }

    /// Stores `result` in the shared state and wakes the registered waker, if
    /// there is one.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn set_complete(state: &Arc<Mutex<NotifyFutureState<RESULT>>>, result: RESULT) {
        // Keep the critical section to the bookkeeping only: publish the
        // result and detach the waker, then release the guard.
        let pending_waker = {
            let mut guard = state.lock().unwrap();
            guard.result = Some(result);
            guard.waker.take()
        };

        // Wake after the lock has been released. A waker may run arbitrary
        // code (including code that locks this same state), so invoking it
        // with the guard held risks contention or a self-deadlock.
        if let Some(waker) = pending_waker {
            waker.wake();
        }
    }

    /// Returns the current value of the cancellation flag.
    pub fn is_canceled(&self) -> bool {
        self.is_canceled
    }
}
33
34#[deprecated(
35 since = "0.2.1",
36 note = "Please use Notify instead"
37)]
38pub struct NotifyFuture<RESULT> {
39 state:Arc<Mutex<NotifyFutureState<RESULT>>>
40}
41
42impl<RESULT> Clone for NotifyFuture<RESULT> {
43 fn clone(&self) -> Self {
44 Self {
45 state: self.state.clone()
46 }
47 }
48}
49
50impl <RESULT> NotifyFuture<RESULT> {
51 pub fn new() -> Self {
52 Self{
53 state: NotifyFutureState::new()
54 }
55 }
56
57 pub fn set_complete(&self, result: RESULT) {
58 NotifyFutureState::set_complete(&self.state, result);
59 }
60}
61
62impl <RESULT> Future for NotifyFuture<RESULT> {
63 type Output = RESULT;
64
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 let mut state = self.state.lock().unwrap();
67 if state.result.is_some() {
68 return Poll::Ready(state.result.take().unwrap());
69 }
70
71 if state.waker.is_none() {
72 state.waker = Some(cx.waker().clone());
73 }
74 Poll::Pending
75 }
76}
77
78pub struct Notify<RESULT> {
79 state: Arc<Mutex<NotifyFutureState<RESULT>>>
80}
81
82impl<RESULT> Notify<RESULT> {
83 pub fn new() -> (Self, NotifyWaiter<RESULT>) {
84 let state = NotifyFutureState::new();
85 (Self {
86 state: state.clone()
87 }, NotifyWaiter::new(state))
88 }
89
90 pub fn notify(self, result: RESULT) {
91 NotifyFutureState::set_complete(&self.state, result);
92 }
93
94 pub fn is_canceled(&self) -> bool {
95 self.state.lock().unwrap().is_canceled()
96 }
97}
98
99pub struct NotifyWaiter<RESULT> {
100 state: Arc<Mutex<NotifyFutureState<RESULT>>>
101}
102
103impl<RESULT> NotifyWaiter<RESULT> {
104 pub(crate) fn new(state: Arc<Mutex<NotifyFutureState<RESULT>>>) -> Self {
105 Self {
106 state
107 }
108 }
109}
110
111impl<RESULT> Drop for NotifyWaiter<RESULT> {
112 fn drop(&mut self) {
113 let mut state = self.state.lock().unwrap();
114 state.waker.take();
115 state.is_canceled = true;
116 }
117}
118
119impl <RESULT> Future for NotifyWaiter<RESULT> {
120 type Output = RESULT;
121
122 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
123 let mut state = self.state.lock().unwrap();
124 if state.result.is_some() {
125 return Poll::Ready(state.result.take().unwrap());
126 }
127
128 if state.waker.is_none() || !state.waker.as_ref().unwrap().will_wake(cx.waker()) {
129 state.waker = Some(cx.waker().clone());
130 }
131 Poll::Pending
132 }
133}
134
#[cfg(test)]
mod test {
    use std::time::Duration;
    // `NotifyFuture` is deprecated but is still exercised here on purpose.
    #[allow(deprecated)]
    use crate::{Notify, NotifyFuture};

    /// How long the spawned task waits before delivering the value. It only
    /// needs to be long enough for the awaiting side to be polled first, so a
    /// short delay keeps the suite fast.
    const NOTIFY_DELAY: Duration = Duration::from_millis(100);

    /// A clone of a `NotifyFuture` completes the original from another task.
    #[test]
    #[allow(deprecated)]
    fn test() {
        async_std::task::block_on(async {
            let notify_future = NotifyFuture::<u32>::new();
            let tmp_future = notify_future.clone();
            async_std::task::spawn(async move {
                async_std::task::sleep(NOTIFY_DELAY).await;
                tmp_future.set_complete(1);
            });
            let ret = notify_future.await;
            assert_eq!(ret, 1);
        });
    }

    /// `Notify::notify` from another task resolves the paired waiter.
    #[test]
    fn test2() {
        async_std::task::block_on(async {
            let (notify, waiter) = Notify::<u32>::new();
            async_std::task::spawn(async move {
                async_std::task::sleep(NOTIFY_DELAY).await;
                notify.notify(1);
            });
            let ret = waiter.await;
            assert_eq!(ret, 1);
        });
    }
}