async_foundation/common/
ready_future.rs

1use futures::future::FusedFuture;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6
7use crate::common::ready_future_state::ReadyFutureState;
8
9use super::ready_future_state::ReadyFutureResult;
10
11pub type ReadyFutureStateSafe<T> = Arc<Mutex<ReadyFutureState<T>>>;
12
13pub struct ReadyFuture<T> {
14    shared_state: ReadyFutureStateSafe<T>,
15}
16
17impl<T> ReadyFuture<T> {
18    pub fn new() -> Self {
19        Self::with_shared_state(Arc::new(Mutex::new(ReadyFutureState::new())))
20    }
21
22    pub fn new_completed(value: T) -> Self {
23        Self::with_shared_state(Arc::new(Mutex::new(ReadyFutureState::new_completed(value))))
24    }
25
26    pub fn with_shared_state(shared_state: ReadyFutureStateSafe<T>) -> Self {
27        ReadyFuture { shared_state }
28    }
29
30    pub fn clone_state(&self) -> ReadyFutureStateSafe<T> {
31        self.shared_state.clone()
32    }
33
34    pub(crate) fn get_state(&self) -> std::sync::MutexGuard<'_, ReadyFutureState<T>> {
35        self.shared_state.lock().unwrap()
36    }
37
38    pub fn new_resolved(val: T) -> Self {
39        let result = Self::new();
40        result.get_state().complete(val);
41        result
42    }
43
44    pub fn complete(&self, val: T) {
45        self.get_state().complete(val)
46    }
47
48    pub fn terminate(&self) {
49        self.get_state().terminate()
50    }
51
52    pub fn is_pending(&self) -> bool {
53        self.get_state().is_pending()
54    }
55
56    pub fn is_fulfilled(&self) -> bool {
57        self.get_state().is_fulfilled()
58    }
59
60    pub fn is_completed(&self) -> bool {
61        self.get_state().is_completed()
62    }
63
64    pub fn is_aborted(&self) -> bool {
65        self.get_state().is_aborted()
66    }
67
68    pub fn is_timeouted(&self) -> bool {
69        self.get_state().is_timeouted()
70    }
71
72    pub fn is_terminated(&self) -> bool {
73        self.get_state().is_terminated()
74    }
75}
76
77impl<T> Future for ReadyFuture<T> {
78    type Output = ReadyFutureResult<T>;
79    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80        let mut shared_state = self.shared_state.lock().unwrap();
81        if shared_state.is_fulfilled() {
82            let mut result = ReadyFutureResult::Terminated;
83            std::mem::swap(&mut result, &mut shared_state.result);
84            Poll::Ready(result)
85        } else {
86            if let None = shared_state.waker {
87                shared_state.waker = Some(cx.waker().clone());
88            }
89            Poll::Pending
90        }
91    }
92}
93
94impl<T> Clone for ReadyFuture<T> {
95    fn clone(&self) -> Self {
96        ReadyFuture {
97            shared_state: self.clone_state(),
98        }
99    }
100}
101
102impl<T> FusedFuture for ReadyFuture<T> {
103    fn is_terminated(&self) -> bool {
104        self.shared_state.lock().unwrap().is_terminated()
105    }
106}
107
108#[cfg(test)]
109mod test {
110    use super::*;
111    use futures::{executor::block_on, select};
112
113    use std::pin::Pin;
114    use std::sync::Arc;
115    use std::sync::atomic::{AtomicBool, Ordering};
116    use std::task::{Context, Poll, Wake};
117
118    use super::{ReadyFuture, ReadyFutureResult};
119
120    struct TestWaker {
121        woken: Arc<AtomicBool>,
122    }
123
124    impl Wake for TestWaker {
125        fn wake(self: Arc<Self>) {
126            self.woken.store(true, Ordering::SeqCst);
127        }
128    }
129
130    #[test]
131    fn test_new_fulfilled() {
132        let f = ReadyFuture::new_resolved(42_usize);
133        let result = block_on(f.clone());
134        assert!(matches!(result, ReadyFutureResult::Completed(42)));
135        match result {
136            ReadyFutureResult::Completed(val) => assert_eq!(val, 42),
137            _ => unreachable!(),
138        }
139
140        // Test cloning and re-polling
141        let result = block_on(f);
142        assert!(matches!(result, ReadyFutureResult::Terminated));
143    }
144
145    #[test]
146    fn test_pending_state() {
147        let f = ReadyFuture::<usize>::new();
148        assert!(f.is_pending(), "Future should be pending initially");
149
150        let state = f.get_state();
151        assert!(!state.is_fulfilled(), "State should not be fulfilled");
152        assert!(!state.is_aborted(), "State should not be aborted");
153        assert!(!state.is_timeouted(), "State should not be timed out");
154        assert!(!state.is_terminated(), "State should not be terminated");
155    }
156
157    #[test]
158    fn test_abort() {
159        let f = ReadyFuture::<usize>::new();
160        {
161            let mut state = f.get_state();
162            state.abort();
163        }
164        let result = block_on(f);
165        assert!(matches!(result, ReadyFutureResult::Aborted));
166    }
167
168    #[test]
169    fn test_timeout() {
170        let f = ReadyFuture::<usize>::new();
171        {
172            let mut state = f.get_state();
173            state.timeout();
174        }
175        let result = block_on(f);
176        assert!(matches!(result, ReadyFutureResult::Timeout));
177    }
178
179    #[test]
180    fn test_terminated() {
181        let mut f = ReadyFuture::new_resolved(1_usize);
182        let mut f_clone = f.clone();
183        block_on(async {
184            let result = select! {
185                _ = f_clone => 0,
186                complete => 100_usize,
187            };
188            assert_eq!(result, 0, "Should resolve immediately");
189        });
190
191        f.terminate();
192        let result = block_on(async {
193            select! {
194                _ = f => { 100 },
195                complete => 200_usize,
196            }
197        });
198        assert_eq!(result, 200, "Terminated future should not resolve");
199        assert!(f.is_terminated(), "Future should be terminated");
200    }
201
202    #[test]
203    fn test_clone_concurrent_access() {
204        let f = ReadyFuture::new();
205        let f_clone1 = f.clone();
206        let f_clone2 = f.clone();
207
208        f.get_state().complete(99_usize);
209
210        let result1 = block_on(f_clone1);
211        assert!(matches!(result1, ReadyFutureResult::Completed(99)));
212        let result2 = block_on(f_clone2);
213        assert!(matches!(result2, ReadyFutureResult::Terminated));
214        let result3 = block_on(f);
215        assert!(matches!(result3, ReadyFutureResult::Terminated));
216    }
217
218    #[test]
219    fn test_waker_invocation() {
220        let f = ReadyFuture::<usize>::new();
221        let woken = Arc::new(AtomicBool::new(false));
222        let waker = Arc::new(TestWaker {
223            woken: woken.clone(),
224        });
225        let waker = std::task::Waker::from(waker);
226        let mut cx = Context::from_waker(&waker);
227
228        // Poll while pending, should store waker
229        let mut f_clone = f.clone();
230        let pinned = Pin::new(&mut f_clone);
231        let result = pinned.poll(&mut cx);
232        assert!(matches!(result, Poll::Pending));
233        assert!(
234            !woken.load(Ordering::SeqCst),
235            "Waker should not be invoked yet"
236        );
237
238        // Fulfill the future, should invoke waker
239        f.get_state().complete(42);
240        assert!(
241            woken.load(Ordering::SeqCst),
242            "Waker should be invoked after fulfill"
243        );
244
245        // Poll again, should return result
246        let result = block_on(f);
247        assert!(matches!(result, ReadyFutureResult::Completed(42)));
248    }
249
250    #[test]
251    fn test_multiple_polls_pending() {
252        let f = ReadyFuture::<usize>::new();
253        let woken = Arc::new(AtomicBool::new(false));
254        let waker = Arc::new(TestWaker {
255            woken: woken.clone(),
256        });
257        let waker = std::task::Waker::from(waker);
258        let mut cx = Context::from_waker(&waker);
259
260        let mut f_clone = f.clone();
261        let pinned = Pin::new(&mut f_clone);
262        assert!(matches!(pinned.poll(&mut cx), Poll::Pending));
263        let mut f_clone = f.clone();
264        let pinned = Pin::new(&mut f_clone);
265        assert!(matches!(pinned.poll(&mut cx), Poll::Pending));
266        assert!(
267            !woken.load(Ordering::SeqCst),
268            "Waker should not be invoked during pending polls"
269        );
270
271        f.get_state().complete(42);
272        assert!(
273            woken.load(Ordering::SeqCst),
274            "Waker should be invoked after fulfill"
275        );
276
277        let result = block_on(f);
278        assert!(matches!(result, ReadyFutureResult::Completed(42)));
279    }
280
281    #[test]
282    fn test_terminated_no_waker() {
283        let f = ReadyFuture::<usize>::new();
284        f.terminate();
285        assert!(f.is_terminated(), "Future should be terminated");
286
287        let woken = Arc::new(AtomicBool::new(false));
288        let waker = Arc::new(TestWaker {
289            woken: woken.clone(),
290        });
291        let waker = std::task::Waker::from(waker);
292        let mut cx = Context::from_waker(&waker);
293
294        let mut f_clone = f.clone();
295        let pinned = Pin::new(&mut f_clone);
296        assert!(matches!(
297            pinned.poll(&mut cx),
298            Poll::Ready(ReadyFutureResult::Terminated)
299        ));
300        assert!(
301            !woken.load(Ordering::SeqCst),
302            "Waker should not be invoked after termination"
303        );
304
305        // Attempt to fulfill, should not invoke waker
306        f.get_state().complete(42);
307        assert!(
308            !woken.load(Ordering::SeqCst),
309            "Waker should not be invoked after termination"
310        );
311    }
312
313    #[test]
314    fn test_completed_no_waker() {
315        let f = ReadyFuture::<usize>::new();
316        f.get_state().complete(1);
317        assert!(f.is_completed(), "Future should be completed");
318        block_on(f);
319    }
320}