use std::sync::Arc;
use tokio::sync::{Mutex, Notify};
use tracing::info;
/// Counter-based pool of worker slots shared between async tasks.
///
/// Instances are built through `Workers::new`, which hands back the pool
/// wrapped in an `Arc`.
pub struct Workers {
    /// Number of slots that are currently free, guarded by an async mutex.
    available: Mutex<usize>,
    /// Signalled by `give`; awaited by `take` and `wait` while they are parked.
    notify: Notify,
    /// Slot count the pool was created with; `wait` compares against it.
    limit: usize,
}
impl Workers {
    /// Creates a pool with `n` free slots and returns it behind an [`Arc`].
    ///
    /// # Errors
    ///
    /// Returns `Err("n must be > 0")` when `n` is zero.
    pub fn new(n: usize) -> Result<Arc<Workers>, &'static str> {
        if n == 0 {
            return Err("n must be > 0");
        }
        Ok(Arc::new(Workers {
            available: Mutex::new(n),
            notify: Notify::new(),
            limit: n,
        }))
    }

    /// Claims one slot, parking the calling task until a slot is free.
    pub async fn take(&self) {
        loop {
            // Create the `Notified` future *before* looking at the counter.
            // A future created here observes every later `notify_waiters()`
            // call, even when it has not been polled yet, so a `give` that
            // runs between the check below and the `.await` cannot be lost.
            let notified = self.notify.notified();
            {
                let mut available = self.available.lock().await;
                info!("worker take, {}", *available);
                if *available > 0 {
                    *available -= 1;
                    return;
                }
                // The guard is released at the end of this scope so that
                // `give` can make progress while this task is parked.
            }
            notified.await;
        }
    }

    /// Returns one slot to the pool and wakes every parked task.
    ///
    /// The counter is not capped at the pool's limit: each call adds one
    /// slot regardless of how many were taken.
    pub async fn give(&self) {
        {
            let mut available = self.available.lock().await;
            info!("worker give, {}", *available);
            *available += 1;
        }
        // Wake all waiters instead of a single one. `take` and `wait` share
        // this `Notify`, so a single wake-up could be consumed by a task that
        // cannot make progress while another task stays asleep with a slot
        // free. Every woken task re-checks the counter under the lock.
        self.notify.notify_waiters();
    }

    /// Parks the calling task until the free-slot count equals the limit.
    ///
    /// The comparison is an exact equality, so the call keeps waiting while
    /// the counter sits above the limit.
    pub async fn wait(&self) {
        loop {
            // Same ordering as in `take`: register interest first, then check.
            let notified = self.notify.notified();
            let idle = *self.available.lock().await == self.limit;
            if idle {
                break;
            }
            notified.await;
        }
        info!("worker wait end");
    }

    /// Returns the number of slots that are free at the time of the call.
    pub async fn available(&self) -> usize {
        *self.available.lock().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::time::sleep;

    /// Five tasks each take a slot while the test gives five slots back;
    /// afterwards the pool must report exactly `limit` free slots.
    #[tokio::test]
    async fn test_workers() {
        // `Workers::new` already returns an `Arc`, so it is used directly
        // instead of being wrapped in a second one.
        let workers = Workers::new(5).expect("5 is a valid pool size");
        for _ in 0..5 {
            let workers = Arc::clone(&workers);
            tokio::spawn(async move {
                workers.take().await;
                sleep(Duration::from_secs(3)).await;
            });
        }
        for _ in 0..5 {
            workers.give().await;
        }
        // Give the spawned tasks time to run their `take` calls.
        sleep(Duration::from_secs(1)).await;
        workers.wait().await;
        // `assert_eq!` reports both values on failure, unlike `unreachable!()`.
        assert_eq!(workers.available().await, workers.limit);
    }
}