futwaiter/
lib.rs

1mod waitable;
2
3#[cfg(feature = "global")]
4mod global;
5
6#[cfg(test)]
7mod tests;
8
9pub use crate::waitable::{Waitable, Waiter};
10
11#[cfg(feature = "global")]
12pub use crate::{
13    global::{push, set_global, take},
14    waitable::GlobalWaiter,
15};
16
17use futures::{
18    future::{BoxFuture, JoinAll},
19    FutureExt,
20};
21use parking_lot::Mutex;
22use pin_project_lite::pin_project;
23use std::{
24    future::Future,
25    pin::Pin,
26    sync::Arc,
27    task::{Context, Poll},
28};
29
pin_project! {
    /// A container of [`Future`]s that can be driven to completion together by
    /// awaiting the container itself.
    ///
    /// Futures are queued with [`FutWaiter::push`]. Polling the waiter takes
    /// everything queued so far, joins it into a single batch and polls that
    /// batch until every future in it has finished.
    #[derive(Default)]
    pub struct FutWaiter {
        // Futures queued by `push` that have not yet been handed to a batch.
        // `None` until the first push and again after `poll` has taken them.
        futs: Arc<Mutex<Option<Vec<BoxFuture<'static, ()>>>>>,
        // The batch currently being driven: `Some` while a previous poll
        // returned `Pending`, `None` otherwise.
        // NOTE(review): `Arc<T>` is always `Unpin`, so `#[pin]` here looks like
        // a no-op; `poll` only calls `.lock()` through it. Confirm before
        // removing.
        #[pin]
        join_all: Arc<Mutex<Option<JoinAll<BoxFuture<'static, ()>>>>>,
    }
}
39
40impl FutWaiter {
41    pub fn new() -> Self {
42        Default::default()
43    }
44
45    /// Add [`Future`] to container
46    pub fn push<F>(&self, fut: F)
47    where
48        F: Future<Output = ()> + Send + 'static,
49    {
50        let fut = fut.boxed();
51        let mut lock = self.futs.lock();
52        match *lock {
53            Some(ref mut futs) => futs.push(fut),
54            None => *lock = Some(vec![fut]),
55        };
56    }
57}
58
59impl Future for FutWaiter {
60    type Output = ();
61
62    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63        let proj = self.project();
64
65        let mut lock = proj.join_all.lock();
66
67        match lock.take() {
68            Some(mut join_all) => match join_all.poll_unpin(cx).map(|_| ()) {
69                Poll::Ready(_) => {
70                    // If JoinAll was previously registered, then set its value to `None`
71                    *lock = None;
72                    Poll::Ready(())
73                }
74                Poll::Pending => {
75                    // Put the value back
76                    *lock = Some(join_all);
77                    Poll::Pending
78                }
79            },
80            None => {
81                // JoinAll was not previously set, so we take the Futures and try to complete them
82                match proj.futs.lock().take() {
83                    Some(futs) => {
84                        let mut join_all = futures::future::join_all(futs);
85                        match join_all.poll_unpin(cx) {
86                            Poll::Ready(_) => Poll::Ready(()),
87                            Poll::Pending => {
88                                *lock = Some(join_all);
89                                Poll::Pending
90                            }
91                        }
92                    }
93                    None => Poll::Ready(()),
94                }
95            }
96        }
97    }
98}