async_foundation/common/
ready_future.rs1use futures::future::FusedFuture;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6
7use crate::common::ready_future_state::ReadyFutureState;
8
9use super::ready_future_state::ReadyFutureResult;
10
11pub type ReadyFutureStateSafe<T> = Arc<Mutex<ReadyFutureState<T>>>;
12
13pub struct ReadyFuture<T> {
20 shared_state: ReadyFutureStateSafe<T>,
21}
22
23impl<T> ReadyFuture<T> {
24 pub fn new() -> Self {
25 Self::with_shared_state(Arc::new(Mutex::new(ReadyFutureState::new())))
26 }
27
28 pub fn new_completed(value: T) -> Self {
29 Self::with_shared_state(Arc::new(Mutex::new(ReadyFutureState::new_completed(value))))
30 }
31
32 pub fn with_shared_state(shared_state: ReadyFutureStateSafe<T>) -> Self {
33 ReadyFuture { shared_state }
34 }
35
36 pub fn clone_state(&self) -> ReadyFutureStateSafe<T> {
37 self.shared_state.clone()
38 }
39
40 pub(crate) fn get_state(&self) -> std::sync::MutexGuard<'_, ReadyFutureState<T>> {
41 self.shared_state.lock().unwrap()
42 }
43
44 pub fn new_resolved(val: T) -> Self {
45 let result = Self::new();
46 result.get_state().complete(val);
47 result
48 }
49
50 pub fn complete(&self, val: T) {
51 self.get_state().complete(val)
52 }
53
54 pub fn terminate(&self) {
55 self.get_state().terminate()
56 }
57
58 pub fn is_pending(&self) -> bool {
59 self.get_state().is_pending()
60 }
61
62 pub fn is_fulfilled(&self) -> bool {
63 self.get_state().is_fulfilled()
64 }
65
66 pub fn is_completed(&self) -> bool {
67 self.get_state().is_completed()
68 }
69
70 pub fn is_aborted(&self) -> bool {
71 self.get_state().is_aborted()
72 }
73
74 pub fn is_timeouted(&self) -> bool {
75 self.get_state().is_timeouted()
76 }
77
78 pub fn is_terminated(&self) -> bool {
79 self.get_state().is_terminated()
80 }
81}
82
83impl<T> Future for ReadyFuture<T> {
84 type Output = ReadyFutureResult<T>;
85 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86 let mut shared_state = self.shared_state.lock().unwrap();
87 if shared_state.is_fulfilled() {
88 let mut result = ReadyFutureResult::Terminated;
89 std::mem::swap(&mut result, &mut shared_state.result);
90 Poll::Ready(result)
91 } else {
92 if let None = shared_state.waker {
93 shared_state.waker = Some(cx.waker().clone());
94 }
95 Poll::Pending
96 }
97 }
98}
99
100impl<T> Clone for ReadyFuture<T> {
101 fn clone(&self) -> Self {
102 ReadyFuture {
103 shared_state: self.clone_state(),
104 }
105 }
106}
107
108impl<T> FusedFuture for ReadyFuture<T> {
109 fn is_terminated(&self) -> bool {
110 self.shared_state.lock().unwrap().is_terminated()
111 }
112}
113
114#[cfg(test)]
115mod test {
116 use super::*;
117 use futures::{executor::block_on, select};
118
119 use std::pin::Pin;
120 use std::sync::Arc;
121 use std::sync::atomic::{AtomicBool, Ordering};
122 use std::task::{Context, Poll, Wake};
123
124 use super::{ReadyFuture, ReadyFutureResult};
125
126 struct TestWaker {
127 woken: Arc<AtomicBool>,
128 }
129
130 impl Wake for TestWaker {
131 fn wake(self: Arc<Self>) {
132 self.woken.store(true, Ordering::SeqCst);
133 }
134 }
135
136 #[test]
137 fn test_new_fulfilled() {
138 let f = ReadyFuture::new_resolved(42_usize);
139 let result = block_on(f.clone());
140 assert!(matches!(result, ReadyFutureResult::Completed(42)));
141 match result {
142 ReadyFutureResult::Completed(val) => assert_eq!(val, 42),
143 _ => unreachable!(),
144 }
145
146 let result = block_on(f);
148 assert!(matches!(result, ReadyFutureResult::Terminated));
149 }
150
151 #[test]
152 fn test_pending_state() {
153 let f = ReadyFuture::<usize>::new();
154 assert!(f.is_pending(), "Future should be pending initially");
155
156 let state = f.get_state();
157 assert!(!state.is_fulfilled(), "State should not be fulfilled");
158 assert!(!state.is_aborted(), "State should not be aborted");
159 assert!(!state.is_timeouted(), "State should not be timed out");
160 assert!(!state.is_terminated(), "State should not be terminated");
161 }
162
163 #[test]
164 fn test_abort() {
165 let f = ReadyFuture::<usize>::new();
166 {
167 let mut state = f.get_state();
168 state.abort();
169 }
170 let result = block_on(f);
171 assert!(matches!(result, ReadyFutureResult::Aborted));
172 }
173
174 #[test]
175 fn test_timeout() {
176 let f = ReadyFuture::<usize>::new();
177 {
178 let mut state = f.get_state();
179 state.timeout();
180 }
181 let result = block_on(f);
182 assert!(matches!(result, ReadyFutureResult::Timeout));
183 }
184
185 #[test]
186 fn test_terminated() {
187 let mut f = ReadyFuture::new_resolved(1_usize);
188 let mut f_clone = f.clone();
189 block_on(async {
190 let result = select! {
191 _ = f_clone => 0,
192 complete => 100_usize,
193 };
194 assert_eq!(result, 0, "Should resolve immediately");
195 });
196
197 f.terminate();
198 let result = block_on(async {
199 select! {
200 _ = f => { 100 },
201 complete => 200_usize,
202 }
203 });
204 assert_eq!(result, 200, "Terminated future should not resolve");
205 assert!(f.is_terminated(), "Future should be terminated");
206 }
207
208 #[test]
209 fn test_clone_concurrent_access() {
210 let f = ReadyFuture::new();
211 let f_clone1 = f.clone();
212 let f_clone2 = f.clone();
213
214 f.get_state().complete(99_usize);
215
216 let result1 = block_on(f_clone1);
217 assert!(matches!(result1, ReadyFutureResult::Completed(99)));
218 let result2 = block_on(f_clone2);
219 assert!(matches!(result2, ReadyFutureResult::Terminated));
220 let result3 = block_on(f);
221 assert!(matches!(result3, ReadyFutureResult::Terminated));
222 }
223
224 #[test]
225 fn test_waker_invocation() {
226 let f = ReadyFuture::<usize>::new();
227 let woken = Arc::new(AtomicBool::new(false));
228 let waker = Arc::new(TestWaker {
229 woken: woken.clone(),
230 });
231 let waker = std::task::Waker::from(waker);
232 let mut cx = Context::from_waker(&waker);
233
234 let mut f_clone = f.clone();
236 let pinned = Pin::new(&mut f_clone);
237 let result = pinned.poll(&mut cx);
238 assert!(matches!(result, Poll::Pending));
239 assert!(
240 !woken.load(Ordering::SeqCst),
241 "Waker should not be invoked yet"
242 );
243
244 f.get_state().complete(42);
246 assert!(
247 woken.load(Ordering::SeqCst),
248 "Waker should be invoked after fulfill"
249 );
250
251 let result = block_on(f);
253 assert!(matches!(result, ReadyFutureResult::Completed(42)));
254 }
255
256 #[test]
257 fn test_multiple_polls_pending() {
258 let f = ReadyFuture::<usize>::new();
259 let woken = Arc::new(AtomicBool::new(false));
260 let waker = Arc::new(TestWaker {
261 woken: woken.clone(),
262 });
263 let waker = std::task::Waker::from(waker);
264 let mut cx = Context::from_waker(&waker);
265
266 let mut f_clone = f.clone();
267 let pinned = Pin::new(&mut f_clone);
268 assert!(matches!(pinned.poll(&mut cx), Poll::Pending));
269 let mut f_clone = f.clone();
270 let pinned = Pin::new(&mut f_clone);
271 assert!(matches!(pinned.poll(&mut cx), Poll::Pending));
272 assert!(
273 !woken.load(Ordering::SeqCst),
274 "Waker should not be invoked during pending polls"
275 );
276
277 f.get_state().complete(42);
278 assert!(
279 woken.load(Ordering::SeqCst),
280 "Waker should be invoked after fulfill"
281 );
282
283 let result = block_on(f);
284 assert!(matches!(result, ReadyFutureResult::Completed(42)));
285 }
286
287 #[test]
288 fn test_terminated_no_waker() {
289 let f = ReadyFuture::<usize>::new();
290 f.terminate();
291 assert!(f.is_terminated(), "Future should be terminated");
292
293 let woken = Arc::new(AtomicBool::new(false));
294 let waker = Arc::new(TestWaker {
295 woken: woken.clone(),
296 });
297 let waker = std::task::Waker::from(waker);
298 let mut cx = Context::from_waker(&waker);
299
300 let mut f_clone = f.clone();
301 let pinned = Pin::new(&mut f_clone);
302 assert!(matches!(
303 pinned.poll(&mut cx),
304 Poll::Ready(ReadyFutureResult::Terminated)
305 ));
306 assert!(
307 !woken.load(Ordering::SeqCst),
308 "Waker should not be invoked after termination"
309 );
310
311 f.get_state().complete(42);
313 assert!(
314 !woken.load(Ordering::SeqCst),
315 "Waker should not be invoked after termination"
316 );
317 }
318
319 #[test]
320 fn test_completed_no_waker() {
321 let f = ReadyFuture::<usize>::new();
322 f.get_state().complete(1);
323 assert!(f.is_completed(), "Future should be completed");
324 block_on(f);
325 }
326}