dark_std/sync/
wg.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2use std::sync::Arc;
3
4/// WaitGroup impl use channel,it's also both support sync and async
5/// how to use?
6///
7/// * on tokio
8/// ```rust
9/// use dark_std::sync::WaitGroup;
10/// use std::time::Duration;
11/// use tokio::time::sleep;
12/// #[tokio::main]
13/// async fn main() {
14///     let wg = WaitGroup::new();
15///     let wg2 = wg.clone();
16///     tokio::spawn(async move {
17///         sleep(Duration::from_secs(1)).await;
18///         drop(wg2);
19///     });
20///     wg.wait_async().await;
21///     println!("all done");
22/// }
23/// ```
24/// * on thread
25/// ```rust
26/// use dark_std::sync::WaitGroup;
27/// use std::time::Duration;
28/// use std::thread::sleep;
29///
30/// fn main() {
31///     let wg = WaitGroup::new();
32///     let wg2 = wg.clone();
33///     std::thread::spawn(move ||{
34///         sleep(Duration::from_secs(1));
35///         drop(wg2);
36///     });
37///     wg.wait();
38///     println!("all done");
39/// }
40/// ```
41pub struct WaitGroup {
42    pub total: Arc<AtomicU64>,
43    pub recv: Arc<flume::Receiver<u64>>,
44    pub send: Arc<flume::Sender<u64>>,
45}
46
47impl Clone for WaitGroup {
48    fn clone(&self) -> Self {
49        self.add(1);
50        Self {
51            total: self.total.clone(),
52            recv: self.recv.clone(),
53            send: self.send.clone(),
54        }
55    }
56}
57
58impl WaitGroup {
59    pub fn new() -> Self {
60        let (s, r) = flume::unbounded();
61        Self {
62            total: Arc::new(AtomicU64::new(0)),
63            recv: Arc::new(r),
64            send: Arc::new(s),
65        }
66    }
67
68    pub fn add(&self, v: u64) {
69        let current = self.total.fetch_or(0, Ordering::SeqCst);
70        self.total.store(current + v, Ordering::SeqCst);
71    }
72
73    pub async fn wait_async(&self) {
74        if self.total.load(Ordering::Relaxed)==0{
75            return;
76        }
77        let mut total;
78        let mut current = 0;
79        loop {
80            match self.recv.recv_async().await {
81                Ok(v) => {
82                    current += v;
83                    total = self.total.fetch_or(0, Ordering::SeqCst);
84                    if current >= total {
85                        break;
86                    }
87                }
88                Err(_) => {
89                    break;
90                }
91            }
92        }
93    }
94
95    pub fn wait(&self) {
96        if self.total.load(Ordering::Relaxed)==0{
97            return;
98        }
99        let mut total;
100        let mut current = 0;
101        loop {
102            match self.recv.recv() {
103                Ok(v) => {
104                    current += v;
105                    total = self.total.fetch_or(0, Ordering::SeqCst);
106                    if current >= total {
107                        break;
108                    }
109                }
110                Err(_) => {
111                    break;
112                }
113            }
114        }
115    }
116}
117
118impl Drop for WaitGroup {
119    fn drop(&mut self) {
120        let _ = self.send.send(1);
121    }
122}