// mountpoint_s3_crt/io/futures.rs
1//! This module provides an interface to use [Future]s on top of the CRT's event loops and event
2//! loop groups.
3
4use std::fmt::Debug;
5use std::future::Future;
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll};
8
9use futures::channel::oneshot;
10use futures::future::BoxFuture;
11use futures::task::ArcWake;
12use futures::{FutureExt, TryFutureExt};
13use thiserror::Error;
14
15use crate::common::allocator::Allocator;
16use crate::common::task_scheduler::{Task, TaskScheduler, TaskStatus};
17
/// Handle to a spawned future. Can be converted into a [Future] that completes when the task finishes.
#[derive(Debug)]
pub struct FutureJoinHandle<T: Send + 'static> {
    // Shared bookkeeping for the spawned task, shared with the waker. `Some` while the
    // future is still running; `None` once it has completed (or been cancelled). Holding a
    // reference here is what lets `cancel` drop the future from the handle side.
    inner: Arc<Mutex<Option<FutureTaskInner<T>>>>,

    // Receives the future's result (or a JoinError) when it finishes.
    receiver: oneshot::Receiver<Result<T, JoinError>>,
}
25
26impl<T> FutureJoinHandle<T>
27where
28    T: Send + 'static,
29{
30    /// Convert this handle into a future that completes when the spawned future does.
31    pub fn into_future(self) -> impl Future<Output = Result<T, JoinError>> {
32        self.receiver
33            .unwrap_or_else(|oneshot::Canceled| Err(JoinError::Canceled))
34    }
35
36    /// Wait for a result, blocking the current thread.
37    pub fn wait(self) -> Result<T, JoinError> {
38        futures::executor::block_on(self.into_future())
39    }
40
41    /// Cancel this Future. This is best-effort: the Future can continue to run in the background
42    /// after this until the next time that it gets woken up (probably by some CRT callback deep
43    /// down, but it could be anything that calls wake).
44    ///
45    /// However, this _does_ synchronously drop the [Future] provided to `EventLoopGroup::spawn_future`.
46    /// This frees any resources associated with that Future before cancel returns.
47    pub fn cancel(self) {
48        let mut locked = self.inner.lock().unwrap();
49
50        // Cancel the task by dropping the [FutureTaskInner] held by the mutex. The next time the
51        // task is woken up, it will look as though the future has already completed and won't
52        // be able to make any progress.
53        if let Some(inner) = locked.take() {
54            std::mem::drop(inner);
55        }
56    }
57}
58
/// Internal bookkeeping about a not-yet-completed future.
struct FutureTaskInner<T: Send + 'static> {
    /// The [Future] from the client. Boxed and pinned so it can be polled repeatedly from
    /// whichever thread the CRT schedules the wake-up task on.
    future: BoxFuture<'static, T>,

    /// A channel to write the result to when the future completes.
    result_channel: oneshot::Sender<Result<T, JoinError>>,
}
67
68/// Manual [Debug] implementation since [BoxFuture] doesn't implement Debug.
69impl<T: Debug + Send + 'static> Debug for FutureTaskInner<T> {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("FutureTaskInner")
72            .field("future", &(&self.future as *const BoxFuture<'static, T>))
73            .field("result_channel", &self.result_channel)
74            .finish()
75    }
76}
77
/// Implements [ArcWake] for Futures spawned on an event loop group.
struct FutureTaskWaker<S: TaskScheduler, T: Send + 'static> {
    /// Inner information about the task if it hasn't completed yet. Held behind a [Mutex] (since
    /// wake can be called from different threads). If this is `Some`, the future is still busy
    /// and we should call poll again when woken. If it is `None`, the future has already
    /// finished executing (or was cancelled) and wake-ups are no-ops.
    inner: Arc<Mutex<Option<FutureTaskInner<T>>>>,

    /// The [TaskScheduler] that knows how to arrange for [Task]s to be run by the CRT.
    scheduler: S,
}
89
90impl<S: TaskScheduler, T: Send + 'static> FutureTaskWaker<S, T> {
91    /// Finish this task with an error. Does not call [Future::poll], and prevents any future
92    /// wake-ups from calling poll either.
93    fn finish_with_error(arc_self: &Arc<Self>, error: JoinError) {
94        let mut locked = arc_self.inner.lock().unwrap();
95
96        if let Some(inner) = locked.take() {
97            // Drop the future before replying, so the client can rely on resources from future
98            // being freed before continuing execution.
99            std::mem::drop(inner.future);
100            let _ = inner.result_channel.send(Err(error));
101        }
102    }
103
104    /// Poll the future associated with this FutureTask. If the future has already completed,
105    /// this does nothing. If it hasn't, then it calls [Future::poll]. If the future is ready, write
106    /// the result back to the synchronous channel. Otherwise wait for someone to poll again.
107    fn poll(arc_self: &Arc<Self>) {
108        // Lock to read the future and call poll on it. Note this will block other tasks if wake was
109        // called multiple times.
110        let mut locked = arc_self.inner.lock().unwrap();
111
112        // Only do anything if there is a future to poll (i.e., it hasn't completed yet).
113        if let Some(mut inner) = locked.take() {
114            // Otherwise poll the client-provided future.
115            let waker = futures::task::waker_ref(arc_self);
116            let context = &mut Context::from_waker(&waker);
117
118            match Future::poll(inner.future.as_mut(), context) {
119                Poll::Ready(value) => {
120                    // Drop the future before replying on the channel. This guarantees that the
121                    // client can rely on values owned by the Future / closure will be dropped
122                    // once the channel has a result on it.
123                    std::mem::drop(inner.future);
124                    let _ = inner.result_channel.send(Ok(value));
125                }
126                Poll::Pending => {
127                    // The future isn't done, so put inner back into the [FutureTask] so that it
128                    // will still be there the next time this is polled.
129                    *locked = Some(inner);
130                }
131            }
132        }
133    }
134}
135
136impl<S: TaskScheduler, T: Send + 'static> ArcWake for FutureTaskWaker<S, T> {
137    /// Wakes the FutureTask by creating a new [Task] to call poll, and scheduling it on the event
138    /// loop group associated with the task.
139    fn wake_by_ref(arc_self: &Arc<Self>) {
140        let task_arc_self = arc_self.clone();
141
142        // Create a [Task] that calls poll. If the CRT tells us that the task is canceled, finishes
143        // the future with an error.
144        let task = Task::init(
145            &Allocator::default(),
146            move |status| match status {
147                TaskStatus::RunReady => FutureTaskWaker::poll(&task_arc_self),
148                TaskStatus::Canceled => FutureTaskWaker::finish_with_error(&task_arc_self, JoinError::Canceled),
149            },
150            "FutureTaskWaker_wake_by_ref",
151        );
152
153        // Schedule the task. If it fails, finish with the error.
154        match arc_self.scheduler.schedule_task_now(task) {
155            Ok(()) => {}
156            Err(err) => FutureTaskWaker::finish_with_error(arc_self, err.into()),
157        }
158    }
159}
160
/// Trait for things that can spawn futures. For now this is just an extension to the [TaskScheduler] trait.
pub trait FutureSpawner: crate::private::Sealed {
    /// Spawn the given [Future] to run asynchronously. This [TaskScheduler] is responsible for
    /// determining how to run [Task]s in the CRT. This returns a [FutureJoinHandle] that can be
    /// used to cancel, block on, or await the result.
    ///
    /// - If the scheduler is an [crate::io::event_loop::EventLoopGroup], then every time the Future is polled, the CRT
    ///   will determine the best [crate::io::event_loop::EventLoop] to run on.
    ///
    /// - If the scheduler is an [crate::io::event_loop::EventLoop],
    ///   the Future will be pinned to the core that [crate::io::event_loop::EventLoop] runs on.
    ///
    /// - If the scheduler is [crate::common::task_scheduler::BlockingTaskScheduler],
    ///   then the thread that calls wake will block on [Future::poll].
    ///   (This is undesirable except in tests,
    ///   and could cause deadlocks or other issues when combined with other CRT functionality.)
    fn spawn_future<T>(&self, future: impl Future<Output = T> + Send + 'static) -> FutureJoinHandle<T>
    where
        T: Send + 'static;
}
181
182impl<S: TaskScheduler + Clone> FutureSpawner for S {
183    fn spawn_future<T>(&self, future: impl Future<Output = T> + Send + 'static) -> FutureJoinHandle<T>
184    where
185        T: Send + 'static,
186    {
187        let future = future.boxed();
188
189        let (tx, rx) = oneshot::channel();
190
191        let task_inner = Arc::new(Mutex::new(Some(FutureTaskInner {
192            future,
193            result_channel: tx,
194        })));
195
196        let waker = futures::task::waker(Arc::new(FutureTaskWaker {
197            inner: task_inner.clone(),
198            scheduler: self.clone(),
199        }));
200
201        // Inject a wake to kick-start the Future's execution. (This internally uses the TaskScheduler
202        // to call poll, so this won't block unless the scheduler does.)
203        waker.wake_by_ref();
204
205        FutureJoinHandle {
206            inner: task_inner,
207            receiver: rx,
208        }
209    }
210}
211
/// Future completion failures
#[derive(Error, Debug)]
pub enum JoinError {
    /// The task was cancelled
    #[error("The task was cancelled")]
    Canceled,

    /// Internal error from the AWS Common Runtime
    // `#[from]` lets scheduling failures (`crate::common::error::Error`) convert with `?`/`into`.
    #[error("Internal CRT error: {0}")]
    InternalError(#[from] crate::common::error::Error),
}
223
224#[cfg(test)]
225mod test {
226    use futures::executor::block_on;
227    use futures::future::join_all;
228    use std::sync::atomic::{AtomicBool, AtomicU64};
229    use std::time::Duration;
230
231    use super::*;
232    use crate::common::allocator::Allocator;
233    use crate::io::event_loop::{EventLoopGroup, EventLoopTimer};
234    use std::sync::atomic::Ordering;
235
236    /// Test that running a small future on an event loop works correctly.
237    #[test]
238    fn test_simple_future() {
239        let allocator = Allocator::default();
240        let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();
241
242        let handle = el_group.spawn_future(async {
243            println!("Hello from the future");
244        });
245
246        handle.wait().unwrap();
247    }
248
249    /// Test that spawns a lot of futures and waits for them all to finish, parameterized by the FutureSpawner.
250    fn test_join_all_futures(scheduler: &impl FutureSpawner) {
251        const NUM_FUTURES: u64 = 50_000;
252
253        let counter = Arc::new(AtomicU64::new(0));
254
255        let mut future_handles = vec![];
256
257        for _ in 0..NUM_FUTURES {
258            let counter = counter.clone();
259            future_handles.push(scheduler.spawn_future(async move {
260                counter.fetch_add(1, Ordering::SeqCst);
261            }))
262        }
263
264        let results = block_on(join_all(future_handles.into_iter().map(FutureJoinHandle::into_future)));
265
266        assert_eq!(
267            Arc::strong_count(&counter),
268            1,
269            "all references to the counter except ours should be dropped"
270        );
271
272        // Check that all Futures completed successfully.
273        let results: Result<(), JoinError> = results.into_iter().collect();
274        results.expect("one or more futures failed");
275
276        assert_eq!(counter.load(Ordering::SeqCst), NUM_FUTURES);
277    }
278
279    /// test_join_all_futures using a pinned EventLoop.
280    #[test]
281    fn test_join_all_futures_event_loop() {
282        let allocator = Allocator::default();
283        let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();
284        let event_loop = el_group.get_next_loop().unwrap();
285
286        test_join_all_futures(&event_loop);
287    }
288
289    /// test_join_all_futures using an EventLoopGroup.
290    #[test]
291    fn test_join_all_futures_event_loop_group() {
292        let allocator = Allocator::default();
293        let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();
294
295        test_join_all_futures(&el_group);
296    }
297
    /// Test that cancelling a future works.
    #[test]
    fn test_cancel_future() {
        let allocator = Allocator::default();
        let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();

        // Create a long timer to delay the future for some time. 20 seconds is far longer
        // than the test runs, so the timer is never expected to actually fire.
        let timer = EventLoopTimer::new(&el_group.get_next_loop().unwrap(), Duration::from_secs(20));

        // Set up a flag that will set to true when the timer is finished.
        let flag = Arc::new(AtomicBool::new(false));

        // Spawn a future that waits for the timer to be done then stores true to the flag.
        let future_handle = {
            let flag = flag.clone();
            el_group.spawn_future(async move {
                timer.await.expect("failed while awaiting timer");
                flag.store(true, Ordering::SeqCst);
            })
        };

        assert_eq!(
            Arc::strong_count(&flag),
            2,
            "there should be 2 references to flag: ours and the Future's"
        );

        // Sleep this thread some amount of time (enough for the timer to start running after the first poll).
        std::thread::sleep(Duration::from_secs(1));

        // Cancel the future
        // NOTE: cancel() documents that it synchronously drops the spawned future, which is
        // what makes the strong_count assertion immediately below reliable.
        future_handle.cancel();

        assert_eq!(
            Arc::strong_count(&flag),
            1,
            "The Future should be dropped at this point"
        );
        assert!(
            !flag.load(Ordering::SeqCst),
            "flag should still be false after cancellation"
        );
    }
341}