1mod waitable;
2
3#[cfg(feature = "global")]
4mod global;
5
6#[cfg(test)]
7mod tests;
8
9pub use crate::waitable::{Waitable, Waiter};
10
11#[cfg(feature = "global")]
12pub use crate::{
13 global::{push, set_global, take},
14 waitable::GlobalWaiter,
15};
16
17use futures::{
18 future::{BoxFuture, JoinAll},
19 FutureExt,
20};
21use parking_lot::Mutex;
22use pin_project_lite::pin_project;
23use std::{
24 future::Future,
25 pin::Pin,
26 sync::Arc,
27 task::{Context, Poll},
28};
29
30pin_project! {
31 #[derive(Default)]
33 pub struct FutWaiter {
34 futs: Arc<Mutex<Option<Vec<BoxFuture<'static, ()>>>>>,
35 #[pin]
36 join_all: Arc<Mutex<Option<JoinAll<BoxFuture<'static, ()>>>>>,
37 }
38}
39
40impl FutWaiter {
41 pub fn new() -> Self {
42 Default::default()
43 }
44
45 pub fn push<F>(&self, fut: F)
47 where
48 F: Future<Output = ()> + Send + 'static,
49 {
50 let fut = fut.boxed();
51 let mut lock = self.futs.lock();
52 match *lock {
53 Some(ref mut futs) => futs.push(fut),
54 None => *lock = Some(vec![fut]),
55 };
56 }
57}
58
59impl Future for FutWaiter {
60 type Output = ();
61
62 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63 let proj = self.project();
64
65 let mut lock = proj.join_all.lock();
66
67 match lock.take() {
68 Some(mut join_all) => match join_all.poll_unpin(cx).map(|_| ()) {
69 Poll::Ready(_) => {
70 *lock = None;
72 Poll::Ready(())
73 }
74 Poll::Pending => {
75 *lock = Some(join_all);
77 Poll::Pending
78 }
79 },
80 None => {
81 match proj.futs.lock().take() {
83 Some(futs) => {
84 let mut join_all = futures::future::join_all(futs);
85 match join_all.poll_unpin(cx) {
86 Poll::Ready(_) => Poll::Ready(()),
87 Poll::Pending => {
88 *lock = Some(join_all);
89 Poll::Pending
90 }
91 }
92 }
93 None => Poll::Ready(()),
94 }
95 }
96 }
97 }
98}