1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// WaitGroup impl use channel,it's also both support sync and async
/// how to use?
///
/// * on tokio
/// ```rust
/// use dark_std::sync::WaitGroup;
/// use std::time::Duration;
/// use tokio::time::sleep;
/// #[tokio::main]
/// async fn main() {
/// let wg = WaitGroup::new();
/// let wg2 = wg.clone();
/// tokio::spawn(async move {
/// sleep(Duration::from_secs(1)).await;
/// drop(wg2);
/// });
/// wg.wait_async().await;
/// println!("all done");
/// }
/// ```
/// * on thread
/// ```rust
/// use dark_std::sync::WaitGroup;
/// use std::time::Duration;
/// use std::thread::sleep;
///
/// fn main() {
/// let wg = WaitGroup::new();
/// let wg2 = wg.clone();
/// std::thread::spawn(move ||{
/// sleep(Duration::from_secs(1));
/// drop(wg2);
/// });
/// wg.wait();
/// println!("all done");
/// }
/// ```
pub struct WaitGroup {
pub total: Arc<AtomicU64>,
pub recv: Arc<flume::Receiver<u64>>,
pub send: Arc<flume::Sender<u64>>,
}
impl Clone for WaitGroup {
fn clone(&self) -> Self {
self.add(1);
Self {
total: self.total.clone(),
recv: self.recv.clone(),
send: self.send.clone(),
}
}
}
impl WaitGroup {
pub fn new() -> Self {
let (s, r) = flume::unbounded();
Self {
total: Arc::new(AtomicU64::new(0)),
recv: Arc::new(r),
send: Arc::new(s),
}
}
pub fn add(&self, v: u64) {
let current = self.total.fetch_or(0, Ordering::SeqCst);
self.total.store(current + v, Ordering::SeqCst);
}
pub async fn wait_async(&self) {
let mut total;
let mut current = 0;
loop {
match self.recv.recv_async().await {
Ok(v) => {
current += v;
total = self.total.fetch_or(0, Ordering::SeqCst);
if current >= total {
break;
}
}
Err(_) => {
break;
}
}
}
}
pub fn wait(&self) {
let mut total;
let mut current = 0;
loop {
match self.recv.recv() {
Ok(v) => {
current += v;
total = self.total.fetch_or(0, Ordering::SeqCst);
if current >= total {
break;
}
}
Err(_) => {
break;
}
}
}
}
}
impl Drop for WaitGroup {
fn drop(&mut self) {
let _ = self.send.send(1);
}
}