Skip to main content

wgp/
lib.rs

1#![deny(
2    clippy::all,
3    clippy::cargo,
4    clippy::missing_inline_in_public_items,
5    clippy::must_use_candidate
6)]
7
8mod waker {
9    #[cfg(feature = "futures-util")]
10    pub use futures_util::task::AtomicWaker;
11
12    #[cfg(all(not(feature = "futures-util"), feature = "atomic-waker"))]
13    pub use atomic_waker::AtomicWaker;
14
15    #[cfg(all(not(feature = "atomic-waker"), not(feature = "futures-util")))]
16    compile_error!("Please select an AtomicWaker implementation: futures-util or atomic-waker");
17}
18
19mod inner;
20use self::inner::InnerPtr;
21
22use std::future::Future;
23use std::pin::Pin;
24use std::task::{Context, Poll};
25
26pub struct WaitGroup(InnerPtr);
27
28#[derive(Clone)]
29pub struct Working(InnerPtr);
30
31impl Working {
32    #[inline]
33    #[must_use]
34    pub fn count(&self) -> usize {
35        self.0.count()
36    }
37}
38
39impl WaitGroup {
40    #[inline]
41    #[must_use]
42    pub fn new() -> Self {
43        Self(InnerPtr::new())
44    }
45
46    #[inline]
47    #[must_use]
48    pub fn working(&self) -> Working {
49        Working(self.0.clone())
50    }
51
52    #[inline]
53    #[must_use]
54    pub fn count(&self) -> usize {
55        self.0.count()
56    }
57
58    #[inline]
59    pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<()> {
60        self.0.poll_wait(cx)
61    }
62
63    #[inline]
64    #[must_use]
65    pub fn wait(&self) -> WaitFuture<'_> {
66        WaitFuture(self)
67    }
68
69    #[inline]
70    #[must_use]
71    pub fn wait_owned(self) -> WaitOwnedFuture {
72        WaitOwnedFuture(self)
73    }
74}
75
76impl Default for WaitGroup {
77    #[inline]
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83pub struct WaitOwnedFuture(WaitGroup);
84
85impl Future for WaitOwnedFuture {
86    type Output = ();
87
88    #[inline]
89    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
90        self.0.poll_wait(cx)
91    }
92}
93
94impl AsRef<WaitGroup> for WaitOwnedFuture {
95    #[inline]
96    fn as_ref(&self) -> &WaitGroup {
97        &self.0
98    }
99}
100
101pub struct WaitFuture<'a>(&'a WaitGroup);
102
103impl Future for WaitFuture<'_> {
104    type Output = ();
105
106    #[inline]
107    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108        self.0.poll_wait(cx)
109    }
110}
111
112impl AsRef<WaitGroup> for WaitFuture<'_> {
113    #[inline]
114    fn as_ref(&self) -> &WaitGroup {
115        self.0
116    }
117}
118
119#[cfg(test)]
120mod tests {
121    use super::WaitGroup;
122
123    use tokio::time::{sleep, Duration};
124
125    #[test]
126    fn simple() {
127        let wg = WaitGroup::new();
128        let n = 100;
129        let working_vec = vec![wg.working(); n];
130        assert_eq!(wg.count(), n);
131        drop(wg);
132        drop(working_vec);
133    }
134
135    #[tokio::test]
136    async fn tokio_test() {
137        let wg = WaitGroup::new();
138        let n = 100;
139
140        assert_eq!(wg.count(), 0);
141        for _ in 0..n {
142            let working = wg.working();
143            tokio::spawn(async move {
144                sleep(Duration::from_millis(50)).await;
145                drop(working);
146            });
147        }
148        assert_eq!(wg.count(), n);
149        wg.wait().await;
150
151        assert_eq!(wg.count(), 0);
152        for _ in 0..n {
153            let working = wg.working();
154            tokio::spawn(async move {
155                sleep(Duration::from_millis(50)).await;
156                drop(working);
157            });
158        }
159        assert_eq!(wg.count(), n);
160        wg.wait_owned().await;
161    }
162}