// dark_std/sync/wg.rs
1use std::sync::atomic::{AtomicU64, Ordering};
2use std::sync::Arc;
3
/// A wait group implemented on top of a `flume` channel; it can be waited on
/// from both blocking (thread) code and async code.
///
/// Every [`Clone`] of a `WaitGroup` adds one to the shared `total` counter, and
/// every handle sends `1` on the shared channel when it is dropped.
/// [`WaitGroup::wait`] and [`WaitGroup::wait_async`] sum the values received
/// from the channel and return once that sum reaches `total`.
///
/// # Examples
///
/// * on tokio
/// ```rust
/// use dark_std::sync::WaitGroup;
/// use std::time::Duration;
/// use tokio::time::sleep;
/// #[tokio::main]
/// async fn main() {
///     let wg = WaitGroup::new();
///     let wg2 = wg.clone();
///     tokio::spawn(async move {
///         sleep(Duration::from_secs(1)).await;
///         drop(wg2);
///     });
///     wg.wait_async().await;
///     println!("all done");
/// }
/// ```
/// * on thread
/// ```rust
/// use dark_std::sync::WaitGroup;
/// use std::time::Duration;
/// use std::thread::sleep;
///
/// fn main() {
///     let wg = WaitGroup::new();
///     let wg2 = wg.clone();
///     std::thread::spawn(move ||{
///         sleep(Duration::from_secs(1));
///         drop(wg2);
///     });
///     wg.wait();
///     println!("all done");
/// }
/// ```
pub struct WaitGroup {
    /// Shared counter of expected completions; raised by `add`, which `clone`
    /// calls with `1`.
    pub total: Arc<AtomicU64>,
    /// Shared receiving half of the channel; `wait` and `wait_async` read
    /// completion messages from it.
    pub recv: Arc<flume::Receiver<u64>>,
    /// Shared sending half of the channel; the `Drop` impl sends `1` through it.
    pub send: Arc<flume::Sender<u64>>,
}
46
47impl Clone for WaitGroup {
48 fn clone(&self) -> Self {
49 self.add(1);
50 Self {
51 total: self.total.clone(),
52 recv: self.recv.clone(),
53 send: self.send.clone(),
54 }
55 }
56}
57
58impl WaitGroup {
59 pub fn new() -> Self {
60 let (s, r) = flume::unbounded();
61 Self {
62 total: Arc::new(AtomicU64::new(0)),
63 recv: Arc::new(r),
64 send: Arc::new(s),
65 }
66 }
67
68 pub fn add(&self, v: u64) {
69 let current = self.total.fetch_or(0, Ordering::SeqCst);
70 self.total.store(current + v, Ordering::SeqCst);
71 }
72
73 pub async fn wait_async(&self) {
74 if self.total.load(Ordering::Relaxed)==0{
75 return;
76 }
77 let mut total;
78 let mut current = 0;
79 loop {
80 match self.recv.recv_async().await {
81 Ok(v) => {
82 current += v;
83 total = self.total.fetch_or(0, Ordering::SeqCst);
84 if current >= total {
85 break;
86 }
87 }
88 Err(_) => {
89 break;
90 }
91 }
92 }
93 }
94
95 pub fn wait(&self) {
96 if self.total.load(Ordering::Relaxed)==0{
97 return;
98 }
99 let mut total;
100 let mut current = 0;
101 loop {
102 match self.recv.recv() {
103 Ok(v) => {
104 current += v;
105 total = self.total.fetch_or(0, Ordering::SeqCst);
106 if current >= total {
107 break;
108 }
109 }
110 Err(_) => {
111 break;
112 }
113 }
114 }
115 }
116}
117
118impl Drop for WaitGroup {
119 fn drop(&mut self) {
120 let _ = self.send.send(1);
121 }
122}