async_foundation/common/
ready_future.rs1use futures::future::FusedFuture;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6
7use crate::common::ready_future_state::ReadyFutureState;
8
9use super::ready_future_state::ReadyFutureResult;
10
11pub type ReadyFutureStateSafe<T> = Arc<Mutex<ReadyFutureState<T>>>;
12
13pub struct ReadyFuture<T> {
14 shared_state: ReadyFutureStateSafe<T>,
15}
16
17impl<T> ReadyFuture<T> {
18 pub fn new() -> Self {
19 Self::with_shared_state(Arc::new(Mutex::new(ReadyFutureState::new())))
20 }
21
22 pub fn new_completed(value: T) -> Self {
23 Self::with_shared_state(Arc::new(Mutex::new(ReadyFutureState::new_completed(value))))
24 }
25
26 pub fn with_shared_state(shared_state: ReadyFutureStateSafe<T>) -> Self {
27 ReadyFuture { shared_state }
28 }
29
30 pub fn clone_state(&self) -> ReadyFutureStateSafe<T> {
31 self.shared_state.clone()
32 }
33
34 pub(crate) fn get_state(&self) -> std::sync::MutexGuard<'_, ReadyFutureState<T>> {
35 self.shared_state.lock().unwrap()
36 }
37
38 pub fn new_resolved(val: T) -> Self {
39 let result = Self::new();
40 result.get_state().complete(val);
41 result
42 }
43
44 pub fn complete(&self, val: T) {
45 self.get_state().complete(val)
46 }
47
48 pub fn terminate(&self) {
49 self.get_state().terminate()
50 }
51
52 pub fn is_pending(&self) -> bool {
53 self.get_state().is_pending()
54 }
55
56 pub fn is_fulfilled(&self) -> bool {
57 self.get_state().is_fulfilled()
58 }
59
60 pub fn is_completed(&self) -> bool {
61 self.get_state().is_completed()
62 }
63
64 pub fn is_aborted(&self) -> bool {
65 self.get_state().is_aborted()
66 }
67
68 pub fn is_timeouted(&self) -> bool {
69 self.get_state().is_timeouted()
70 }
71
72 pub fn is_terminated(&self) -> bool {
73 self.get_state().is_terminated()
74 }
75}
76
77impl<T> Future for ReadyFuture<T> {
78 type Output = ReadyFutureResult<T>;
79 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80 let mut shared_state = self.shared_state.lock().unwrap();
81 if shared_state.is_fulfilled() {
82 let mut result = ReadyFutureResult::Terminated;
83 std::mem::swap(&mut result, &mut shared_state.result);
84 Poll::Ready(result)
85 } else {
86 if let None = shared_state.waker {
87 shared_state.waker = Some(cx.waker().clone());
88 }
89 Poll::Pending
90 }
91 }
92}
93
94impl<T> Clone for ReadyFuture<T> {
95 fn clone(&self) -> Self {
96 ReadyFuture {
97 shared_state: self.clone_state(),
98 }
99 }
100}
101
102impl<T> FusedFuture for ReadyFuture<T> {
103 fn is_terminated(&self) -> bool {
104 self.shared_state.lock().unwrap().is_terminated()
105 }
106}
107
#[cfg(test)]
mod test {
    use super::*;
    use futures::{executor::block_on, select};

    use std::pin::Pin;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake};

    use super::{ReadyFuture, ReadyFutureResult};

    /// Waker that records whether it has been woken.
    struct TestWaker {
        woken: Arc<AtomicBool>,
    }

    impl Wake for TestWaker {
        fn wake(self: Arc<Self>) {
            self.woken.store(true, Ordering::SeqCst);
        }
    }

    /// Builds a [`TestWaker`]-backed waker together with the flag it sets.
    fn flag_waker() -> (Arc<AtomicBool>, std::task::Waker) {
        let woken = Arc::new(AtomicBool::new(false));
        let inner = Arc::new(TestWaker {
            woken: woken.clone(),
        });
        (woken, std::task::Waker::from(inner))
    }

    // A resolved future yields its value once; afterwards it reads as terminated.
    #[test]
    fn test_new_fulfilled() {
        let original = ReadyFuture::new_resolved(42_usize);

        let first = block_on(original.clone());
        assert!(matches!(first, ReadyFutureResult::Completed(42)));
        if let ReadyFutureResult::Completed(val) = first {
            assert_eq!(val, 42);
        } else {
            unreachable!();
        }

        let second = block_on(original);
        assert!(matches!(second, ReadyFutureResult::Terminated));
    }

    // A freshly created future is pending and nothing else.
    #[test]
    fn test_pending_state() {
        let fresh = ReadyFuture::<usize>::new();
        assert!(fresh.is_pending(), "Future should be pending initially");

        let guard = fresh.get_state();
        assert!(!guard.is_fulfilled(), "State should not be fulfilled");
        assert!(!guard.is_aborted(), "State should not be aborted");
        assert!(!guard.is_timeouted(), "State should not be timed out");
        assert!(!guard.is_terminated(), "State should not be terminated");
    }

    // Aborting the state makes the future resolve to `Aborted`.
    #[test]
    fn test_abort() {
        let fut = ReadyFuture::<usize>::new();
        // The guard is a temporary, so the lock is released before `block_on`.
        fut.get_state().abort();
        assert!(matches!(block_on(fut), ReadyFutureResult::Aborted));
    }

    // Timing out the state makes the future resolve to `Timeout`.
    #[test]
    fn test_timeout() {
        let fut = ReadyFuture::<usize>::new();
        // The guard is a temporary, so the lock is released before `block_on`.
        fut.get_state().timeout();
        assert!(matches!(block_on(fut), ReadyFutureResult::Timeout));
    }

    // `select!` skips a future that reports itself as terminated.
    #[test]
    fn test_terminated() {
        let mut f = ReadyFuture::new_resolved(1_usize);
        let mut handle = f.clone();
        block_on(async {
            let picked = select! {
                _ = handle => 0,
                complete => 100_usize,
            };
            assert_eq!(picked, 0, "Should resolve immediately");
        });

        f.terminate();
        let picked = block_on(async {
            select! {
                _ = f => { 100 },
                complete => 200_usize,
            }
        });
        assert_eq!(picked, 200, "Terminated future should not resolve");
        assert!(f.is_terminated(), "Future should be terminated");
    }

    // Only the first clone to be awaited receives the value.
    #[test]
    fn test_clone_concurrent_access() {
        let source = ReadyFuture::new();
        let first = source.clone();
        let second = source.clone();

        source.get_state().complete(99_usize);

        let from_first = block_on(first);
        assert!(matches!(from_first, ReadyFutureResult::Completed(99)));
        let from_second = block_on(second);
        assert!(matches!(from_second, ReadyFutureResult::Terminated));
        let from_source = block_on(source);
        assert!(matches!(from_source, ReadyFutureResult::Terminated));
    }

    // Completing the state wakes the waker registered by a pending poll.
    #[test]
    fn test_waker_invocation() {
        let f = ReadyFuture::<usize>::new();
        let (woken, waker) = flag_waker();
        let mut cx = Context::from_waker(&waker);

        let mut handle = f.clone();
        let outcome = Pin::new(&mut handle).poll(&mut cx);
        assert!(matches!(outcome, Poll::Pending));
        assert!(
            !woken.load(Ordering::SeqCst),
            "Waker should not be invoked yet"
        );

        f.get_state().complete(42);
        assert!(
            woken.load(Ordering::SeqCst),
            "Waker should be invoked after fulfill"
        );

        let outcome = block_on(f);
        assert!(matches!(outcome, ReadyFutureResult::Completed(42)));
    }

    // Repeated pending polls do not wake; completion still does.
    #[test]
    fn test_multiple_polls_pending() {
        let f = ReadyFuture::<usize>::new();
        let (woken, waker) = flag_waker();
        let mut cx = Context::from_waker(&waker);

        let mut first = f.clone();
        assert!(matches!(Pin::new(&mut first).poll(&mut cx), Poll::Pending));
        let mut second = f.clone();
        assert!(matches!(Pin::new(&mut second).poll(&mut cx), Poll::Pending));
        assert!(
            !woken.load(Ordering::SeqCst),
            "Waker should not be invoked during pending polls"
        );

        f.get_state().complete(42);
        assert!(
            woken.load(Ordering::SeqCst),
            "Waker should be invoked after fulfill"
        );

        let outcome = block_on(f);
        assert!(matches!(outcome, ReadyFutureResult::Completed(42)));
    }

    // A terminated future resolves at once and never triggers the waker.
    #[test]
    fn test_terminated_no_waker() {
        let f = ReadyFuture::<usize>::new();
        f.terminate();
        assert!(f.is_terminated(), "Future should be terminated");

        let (woken, waker) = flag_waker();
        let mut cx = Context::from_waker(&waker);

        let mut handle = f.clone();
        assert!(matches!(
            Pin::new(&mut handle).poll(&mut cx),
            Poll::Ready(ReadyFutureResult::Terminated)
        ));
        assert!(
            !woken.load(Ordering::SeqCst),
            "Waker should not be invoked after termination"
        );

        f.get_state().complete(42);
        assert!(
            !woken.load(Ordering::SeqCst),
            "Waker should not be invoked after termination"
        );
    }

    // Completing before any poll works without a registered waker.
    #[test]
    fn test_completed_no_waker() {
        let f = ReadyFuture::<usize>::new();
        f.get_state().complete(1);
        assert!(f.is_completed(), "Future should be completed");
        block_on(f);
    }
}