1#![deny(
2 clippy::all,
3 clippy::cargo,
4 clippy::missing_inline_in_public_items,
5 clippy::must_use_candidate
6)]
7
8mod waker {
9 #[cfg(feature = "futures-util")]
10 pub use futures_util::task::AtomicWaker;
11
12 #[cfg(all(not(feature = "futures-util"), feature = "atomic-waker"))]
13 pub use atomic_waker::AtomicWaker;
14
15 #[cfg(all(not(feature = "atomic-waker"), not(feature = "futures-util")))]
16 compile_error!("Please select an AtomicWaker implementation: futures-util or atomic-waker");
17}
18
19mod inner;
20use self::inner::InnerPtr;
21
22use std::future::Future;
23use std::pin::Pin;
24use std::task::{Context, Poll};
25
26pub struct WaitGroup(InnerPtr);
27
28#[derive(Clone)]
29pub struct Working(InnerPtr);
30
31impl Working {
32 #[inline]
33 #[must_use]
34 pub fn count(&self) -> usize {
35 self.0.count()
36 }
37}
38
39impl WaitGroup {
40 #[inline]
41 #[must_use]
42 pub fn new() -> Self {
43 Self(InnerPtr::new())
44 }
45
46 #[inline]
47 #[must_use]
48 pub fn working(&self) -> Working {
49 Working(self.0.clone())
50 }
51
52 #[inline]
53 #[must_use]
54 pub fn count(&self) -> usize {
55 self.0.count()
56 }
57
58 #[inline]
59 pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<()> {
60 self.0.poll_wait(cx)
61 }
62
63 #[inline]
64 #[must_use]
65 pub fn wait(&self) -> WaitFuture<'_> {
66 WaitFuture(self)
67 }
68
69 #[inline]
70 #[must_use]
71 pub fn wait_owned(self) -> WaitOwnedFuture {
72 WaitOwnedFuture(self)
73 }
74}
75
76impl Default for WaitGroup {
77 #[inline]
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83pub struct WaitOwnedFuture(WaitGroup);
84
85impl Future for WaitOwnedFuture {
86 type Output = ();
87
88 #[inline]
89 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
90 self.0.poll_wait(cx)
91 }
92}
93
94impl AsRef<WaitGroup> for WaitOwnedFuture {
95 #[inline]
96 fn as_ref(&self) -> &WaitGroup {
97 &self.0
98 }
99}
100
101pub struct WaitFuture<'a>(&'a WaitGroup);
102
103impl Future for WaitFuture<'_> {
104 type Output = ();
105
106 #[inline]
107 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108 self.0.poll_wait(cx)
109 }
110}
111
112impl AsRef<WaitGroup> for WaitFuture<'_> {
113 #[inline]
114 fn as_ref(&self) -> &WaitGroup {
115 self.0
116 }
117}
118
#[cfg(test)]
mod tests {
    use super::WaitGroup;

    use tokio::time::{sleep, Duration};

    /// Number of `Working` handles created in each scenario.
    const HANDLES: usize = 100;

    /// Spawns `HANDLES` tasks; each takes one handle from `group`, sleeps for
    /// 50 ms and then drops the handle.
    fn spawn_sleepers(group: &WaitGroup) {
        for _ in 0..HANDLES {
            let handle = group.working();
            tokio::spawn(async move {
                sleep(Duration::from_millis(50)).await;
                drop(handle);
            });
        }
    }

    /// Synchronous check: `count()` matches the number of handles held, and
    /// dropping the group before its handles is fine.
    #[test]
    fn simple() {
        let group = WaitGroup::new();
        let handles = vec![group.working(); HANDLES];
        assert_eq!(group.count(), HANDLES);
        drop(group);
        drop(handles);
    }

    /// Async check: the same group is waited on twice, first through the
    /// borrowing future and then through the owning one.
    #[tokio::test]
    async fn tokio_test() {
        let group = WaitGroup::new();

        // Round one: borrowing wait.
        assert_eq!(group.count(), 0);
        spawn_sleepers(&group);
        assert_eq!(group.count(), HANDLES);
        group.wait().await;

        // Round two: the group is reusable once the first wait has completed.
        assert_eq!(group.count(), 0);
        spawn_sleepers(&group);
        assert_eq!(group.count(), HANDLES);
        group.wait_owned().await;
    }
}