1use std::future::Future;
2use std::sync::{Arc, Mutex, MutexGuard};
3use std::pin::Pin;
4use std::task::{Poll, Context, Waker};
5
/// State shared between the completing side and the awaiting side of a notification.
///
/// It is always handled behind an `Arc<Mutex<..>>` (see `NotifyFutureState::new`).
struct NotifyFutureState<RESULT> {
    /// Waker stored by the most recent pending poll; taken and woken by `set_complete`.
    waker: Option<Waker>,
    /// Value stored by `set_complete`; moved out by the poll that observes completion.
    result: Option<RESULT>,
    /// Set to `true` once `set_complete` has stored a result.
    is_completed: bool,
    /// Set to `true` when a `NotifyWaiter` is dropped before completion;
    /// `set_complete` then becomes a no-op.
    is_canceled: bool,
}
12
13impl <RESULT> NotifyFutureState<RESULT> {
14 pub fn new() -> Arc<Mutex<NotifyFutureState<RESULT>>> {
15 Arc::new(Mutex::new(NotifyFutureState {
16 waker: None,
17 result: None,
18 is_completed: false,
19 is_canceled: false,
20 }))
21 }
22
23 pub fn set_complete(state: &Arc<Mutex<NotifyFutureState<RESULT>>>, result: RESULT) {
24 let waker = {
25 let mut state = lock_state(state);
26 if state.is_completed || state.is_canceled {
27 return;
28 }
29
30 state.result = Some(result);
31 state.is_completed = true;
32 state.waker.take()
33 };
34
35 if let Some(waker) = waker {
36 waker.wake();
37 }
38 }
39
40 pub fn is_canceled(&self) -> bool {
41 self.is_canceled
42 }
43}
44
45fn lock_state<RESULT>(state: &Arc<Mutex<NotifyFutureState<RESULT>>>) -> MutexGuard<'_, NotifyFutureState<RESULT>> {
46 state.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
47}
48
/// Cloneable handle that can both complete and await a shared result.
///
/// All clones share one state, which holds a single waker slot and a single result
/// slot; polling panics if the state is completed but the result was already taken.
#[deprecated(
    since = "0.2.1",
    note = "Please use Notify instead"
)]
pub struct NotifyFuture<RESULT> {
    /// State shared by every clone of this handle.
    state:Arc<Mutex<NotifyFutureState<RESULT>>>
}
56
57#[allow(deprecated)]
58impl<RESULT> Clone for NotifyFuture<RESULT> {
59 fn clone(&self) -> Self {
60 Self {
61 state: self.state.clone()
62 }
63 }
64}
65
66#[allow(deprecated)]
67impl <RESULT> NotifyFuture<RESULT> {
68 pub fn new() -> Self {
69 Self{
70 state: NotifyFutureState::new()
71 }
72 }
73
74 pub fn set_complete(&self, result: RESULT) {
75 NotifyFutureState::set_complete(&self.state, result);
76 }
77}
78
79#[allow(deprecated)]
80impl <RESULT> Future for NotifyFuture<RESULT> {
81 type Output = RESULT;
82
83 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
84 let mut state = lock_state(&self.state);
85 if state.is_completed {
86 if let Some(result) = state.result.take() {
87 return Poll::Ready(result);
88 }
89
90 panic!("NotifyFuture was awaited by more than one task. Use Notify::new() instead");
91 }
92
93 if state.waker.is_none() || !state.waker.as_ref().unwrap().will_wake(cx.waker()) {
94 state.waker = Some(cx.waker().clone());
95 }
96 Poll::Pending
97 }
98}
99
/// Completing half of a notify/wait pair created by `Notify::new`.
///
/// `notify` consumes the value, so each `Notify` can deliver at most one result.
///
/// NOTE(review): no `Drop` impl for `Notify` is visible in this file, so a waiter whose
/// `Notify` is dropped without calling `notify` appears to stay pending — confirm.
pub struct Notify<RESULT> {
    /// State shared with the paired `NotifyWaiter`.
    state: Arc<Mutex<NotifyFutureState<RESULT>>>
}
103
104impl<RESULT> Notify<RESULT> {
105 pub fn new() -> (Self, NotifyWaiter<RESULT>) {
106 let state = NotifyFutureState::new();
107 (Self {
108 state: state.clone()
109 }, NotifyWaiter::new(state))
110 }
111
112 pub fn notify(self, result: RESULT) {
113 NotifyFutureState::set_complete(&self.state, result);
114 }
115
116 pub fn is_canceled(&self) -> bool {
117 lock_state(&self.state).is_canceled()
118 }
119}
120
/// Awaiting half of a notify/wait pair: a future that resolves to the value passed
/// to `Notify::notify`.
///
/// Dropping it before completion marks the shared state as canceled.
pub struct NotifyWaiter<RESULT> {
    /// State shared with the paired `Notify`.
    state: Arc<Mutex<NotifyFutureState<RESULT>>>
}
124
125impl<RESULT> NotifyWaiter<RESULT> {
126 pub(crate) fn new(state: Arc<Mutex<NotifyFutureState<RESULT>>>) -> Self {
127 Self {
128 state
129 }
130 }
131}
132
133impl<RESULT> Drop for NotifyWaiter<RESULT> {
134 fn drop(&mut self) {
135 let mut state = lock_state(&self.state);
136 state.waker.take();
137 if !state.is_completed {
138 state.is_canceled = true;
139 }
140 }
141}
142
143impl <RESULT> Future for NotifyWaiter<RESULT> {
144 type Output = RESULT;
145
146 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
147 let mut state = lock_state(&self.state);
148 if state.is_completed {
149 return Poll::Ready(state.result.take().unwrap());
150 }
151
152 if state.waker.is_none() || !state.waker.as_ref().unwrap().will_wake(cx.waker()) {
153 state.waker = Some(cx.waker().clone());
154 }
155 Poll::Pending
156 }
157}
158
#[cfg(test)]
// The deprecated `NotifyFuture` is still exercised here on purpose.
#[allow(deprecated)]
mod test {
    use std::time::Duration;
    use crate::{Notify, NotifyFuture};

    /// Delay before the spawned task delivers its value. It only has to let the
    /// awaiting side get polled first, so it is kept short to keep the suite fast.
    const NOTIFY_DELAY: Duration = Duration::from_millis(50);

    /// A cloned `NotifyFuture` completed from another task resolves the awaiting clone.
    #[test]
    fn test() {
        async_std::task::block_on(async {
            let notify_future = NotifyFuture::<u32>::new();
            let tmp_future = notify_future.clone();
            async_std::task::spawn(async move {
                async_std::task::sleep(NOTIFY_DELAY).await;
                tmp_future.set_complete(1);
            });
            let ret = notify_future.await;
            assert_eq!(ret, 1);
        });
    }

    /// `Notify::notify` called from another task resolves the paired waiter.
    #[test]
    fn test2() {
        async_std::task::block_on(async {
            let (notify, waiter) = Notify::<u32>::new();
            async_std::task::spawn(async move {
                async_std::task::sleep(NOTIFY_DELAY).await;
                notify.notify(1);
            });
            let ret = waiter.await;
            assert_eq!(ret, 1);
        });
    }

    /// Dropping the waiter before any result arrives is reported as cancellation.
    #[test]
    fn notify_waiter_drop_before_ready_is_canceled() {
        let (notify, waiter) = Notify::<u32>::new();
        drop(waiter);
        assert!(notify.is_canceled());
    }

    /// Only the first `set_complete` value is kept; later calls are ignored.
    #[test]
    fn repeated_set_complete_keeps_first_value() {
        async_std::task::block_on(async {
            let notify_future = NotifyFuture::<u32>::new();
            let notifier = notify_future.clone();

            notifier.set_complete(1);
            notifier.set_complete(2);

            let ret = notify_future.await;
            assert_eq!(ret, 1);
        });
    }
}