async_foundation/common/
ready_future.rs

1use futures::future::FusedFuture;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6
7use crate::common::ready_future_state::ReadyFutureState;
8
9use super::ready_future_state::ReadyFutureResult;
10
11pub type ReadyFutureStateSafe<T> = Arc<Mutex<ReadyFutureState<T>>>;
12
13/// A manually-completable future used for coordination between async tasks.
14///
15/// `ReadyFuture<T>` can be created in a pending state and later completed,
16/// aborted, or timed out from external code via its underlying
17/// [`ReadyFutureState`]. Once completed, awaiting it yields a
18/// [`ReadyFutureResult<T>`] describing the outcome.
19pub struct ReadyFuture<T> {
20    shared_state: ReadyFutureStateSafe<T>,
21}
22
23impl<T> ReadyFuture<T> {
24    pub fn new() -> Self {
25        Self::with_shared_state(Arc::new(Mutex::new(ReadyFutureState::new())))
26    }
27
28    pub fn new_completed(value: T) -> Self {
29        Self::with_shared_state(Arc::new(Mutex::new(ReadyFutureState::new_completed(value))))
30    }
31
32    pub fn with_shared_state(shared_state: ReadyFutureStateSafe<T>) -> Self {
33        ReadyFuture { shared_state }
34    }
35
36    pub fn clone_state(&self) -> ReadyFutureStateSafe<T> {
37        self.shared_state.clone()
38    }
39
40    pub(crate) fn get_state(&self) -> std::sync::MutexGuard<'_, ReadyFutureState<T>> {
41        self.shared_state.lock().unwrap()
42    }
43
44    pub fn new_resolved(val: T) -> Self {
45        let result = Self::new();
46        result.get_state().complete(val);
47        result
48    }
49
50    pub fn complete(&self, val: T) {
51        self.get_state().complete(val)
52    }
53
54    pub fn terminate(&self) {
55        self.get_state().terminate()
56    }
57
58    pub fn is_pending(&self) -> bool {
59        self.get_state().is_pending()
60    }
61
62    pub fn is_fulfilled(&self) -> bool {
63        self.get_state().is_fulfilled()
64    }
65
66    pub fn is_completed(&self) -> bool {
67        self.get_state().is_completed()
68    }
69
70    pub fn is_aborted(&self) -> bool {
71        self.get_state().is_aborted()
72    }
73
74    pub fn is_timeouted(&self) -> bool {
75        self.get_state().is_timeouted()
76    }
77
78    pub fn is_terminated(&self) -> bool {
79        self.get_state().is_terminated()
80    }
81}
82
83impl<T> Future for ReadyFuture<T> {
84    type Output = ReadyFutureResult<T>;
85    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86        let mut shared_state = self.shared_state.lock().unwrap();
87        if shared_state.is_fulfilled() {
88            let mut result = ReadyFutureResult::Terminated;
89            std::mem::swap(&mut result, &mut shared_state.result);
90            Poll::Ready(result)
91        } else {
92            if let None = shared_state.waker {
93                shared_state.waker = Some(cx.waker().clone());
94            }
95            Poll::Pending
96        }
97    }
98}
99
100impl<T> Clone for ReadyFuture<T> {
101    fn clone(&self) -> Self {
102        ReadyFuture {
103            shared_state: self.clone_state(),
104        }
105    }
106}
107
108impl<T> FusedFuture for ReadyFuture<T> {
109    fn is_terminated(&self) -> bool {
110        self.shared_state.lock().unwrap().is_terminated()
111    }
112}
113
114#[cfg(test)]
115mod test {
116    use super::*;
117    use futures::{executor::block_on, select};
118
119    use std::pin::Pin;
120    use std::sync::Arc;
121    use std::sync::atomic::{AtomicBool, Ordering};
122    use std::task::{Context, Poll, Wake};
123
124    use super::{ReadyFuture, ReadyFutureResult};
125
126    struct TestWaker {
127        woken: Arc<AtomicBool>,
128    }
129
130    impl Wake for TestWaker {
131        fn wake(self: Arc<Self>) {
132            self.woken.store(true, Ordering::SeqCst);
133        }
134    }
135
136    #[test]
137    fn test_new_fulfilled() {
138        let f = ReadyFuture::new_resolved(42_usize);
139        let result = block_on(f.clone());
140        assert!(matches!(result, ReadyFutureResult::Completed(42)));
141        match result {
142            ReadyFutureResult::Completed(val) => assert_eq!(val, 42),
143            _ => unreachable!(),
144        }
145
146        // Test cloning and re-polling
147        let result = block_on(f);
148        assert!(matches!(result, ReadyFutureResult::Terminated));
149    }
150
151    #[test]
152    fn test_pending_state() {
153        let f = ReadyFuture::<usize>::new();
154        assert!(f.is_pending(), "Future should be pending initially");
155
156        let state = f.get_state();
157        assert!(!state.is_fulfilled(), "State should not be fulfilled");
158        assert!(!state.is_aborted(), "State should not be aborted");
159        assert!(!state.is_timeouted(), "State should not be timed out");
160        assert!(!state.is_terminated(), "State should not be terminated");
161    }
162
163    #[test]
164    fn test_abort() {
165        let f = ReadyFuture::<usize>::new();
166        {
167            let mut state = f.get_state();
168            state.abort();
169        }
170        let result = block_on(f);
171        assert!(matches!(result, ReadyFutureResult::Aborted));
172    }
173
174    #[test]
175    fn test_timeout() {
176        let f = ReadyFuture::<usize>::new();
177        {
178            let mut state = f.get_state();
179            state.timeout();
180        }
181        let result = block_on(f);
182        assert!(matches!(result, ReadyFutureResult::Timeout));
183    }
184
185    #[test]
186    fn test_terminated() {
187        let mut f = ReadyFuture::new_resolved(1_usize);
188        let mut f_clone = f.clone();
189        block_on(async {
190            let result = select! {
191                _ = f_clone => 0,
192                complete => 100_usize,
193            };
194            assert_eq!(result, 0, "Should resolve immediately");
195        });
196
197        f.terminate();
198        let result = block_on(async {
199            select! {
200                _ = f => { 100 },
201                complete => 200_usize,
202            }
203        });
204        assert_eq!(result, 200, "Terminated future should not resolve");
205        assert!(f.is_terminated(), "Future should be terminated");
206    }
207
208    #[test]
209    fn test_clone_concurrent_access() {
210        let f = ReadyFuture::new();
211        let f_clone1 = f.clone();
212        let f_clone2 = f.clone();
213
214        f.get_state().complete(99_usize);
215
216        let result1 = block_on(f_clone1);
217        assert!(matches!(result1, ReadyFutureResult::Completed(99)));
218        let result2 = block_on(f_clone2);
219        assert!(matches!(result2, ReadyFutureResult::Terminated));
220        let result3 = block_on(f);
221        assert!(matches!(result3, ReadyFutureResult::Terminated));
222    }
223
224    #[test]
225    fn test_waker_invocation() {
226        let f = ReadyFuture::<usize>::new();
227        let woken = Arc::new(AtomicBool::new(false));
228        let waker = Arc::new(TestWaker {
229            woken: woken.clone(),
230        });
231        let waker = std::task::Waker::from(waker);
232        let mut cx = Context::from_waker(&waker);
233
234        // Poll while pending, should store waker
235        let mut f_clone = f.clone();
236        let pinned = Pin::new(&mut f_clone);
237        let result = pinned.poll(&mut cx);
238        assert!(matches!(result, Poll::Pending));
239        assert!(
240            !woken.load(Ordering::SeqCst),
241            "Waker should not be invoked yet"
242        );
243
244        // Fulfill the future, should invoke waker
245        f.get_state().complete(42);
246        assert!(
247            woken.load(Ordering::SeqCst),
248            "Waker should be invoked after fulfill"
249        );
250
251        // Poll again, should return result
252        let result = block_on(f);
253        assert!(matches!(result, ReadyFutureResult::Completed(42)));
254    }
255
256    #[test]
257    fn test_multiple_polls_pending() {
258        let f = ReadyFuture::<usize>::new();
259        let woken = Arc::new(AtomicBool::new(false));
260        let waker = Arc::new(TestWaker {
261            woken: woken.clone(),
262        });
263        let waker = std::task::Waker::from(waker);
264        let mut cx = Context::from_waker(&waker);
265
266        let mut f_clone = f.clone();
267        let pinned = Pin::new(&mut f_clone);
268        assert!(matches!(pinned.poll(&mut cx), Poll::Pending));
269        let mut f_clone = f.clone();
270        let pinned = Pin::new(&mut f_clone);
271        assert!(matches!(pinned.poll(&mut cx), Poll::Pending));
272        assert!(
273            !woken.load(Ordering::SeqCst),
274            "Waker should not be invoked during pending polls"
275        );
276
277        f.get_state().complete(42);
278        assert!(
279            woken.load(Ordering::SeqCst),
280            "Waker should be invoked after fulfill"
281        );
282
283        let result = block_on(f);
284        assert!(matches!(result, ReadyFutureResult::Completed(42)));
285    }
286
287    #[test]
288    fn test_terminated_no_waker() {
289        let f = ReadyFuture::<usize>::new();
290        f.terminate();
291        assert!(f.is_terminated(), "Future should be terminated");
292
293        let woken = Arc::new(AtomicBool::new(false));
294        let waker = Arc::new(TestWaker {
295            woken: woken.clone(),
296        });
297        let waker = std::task::Waker::from(waker);
298        let mut cx = Context::from_waker(&waker);
299
300        let mut f_clone = f.clone();
301        let pinned = Pin::new(&mut f_clone);
302        assert!(matches!(
303            pinned.poll(&mut cx),
304            Poll::Ready(ReadyFutureResult::Terminated)
305        ));
306        assert!(
307            !woken.load(Ordering::SeqCst),
308            "Waker should not be invoked after termination"
309        );
310
311        // Attempt to fulfill, should not invoke waker
312        f.get_state().complete(42);
313        assert!(
314            !woken.load(Ordering::SeqCst),
315            "Waker should not be invoked after termination"
316        );
317    }
318
319    #[test]
320    fn test_completed_no_waker() {
321        let f = ReadyFuture::<usize>::new();
322        f.get_state().complete(1);
323        assert!(f.is_completed(), "Future should be completed");
324        block_on(f);
325    }
326}