rustfs_workers/
workers.rs1use std::sync::Arc;
16use tokio::sync::{Mutex, Notify};
17use tracing::info;
18
19pub struct Workers {
20 available: Mutex<usize>, notify: Notify, limit: usize, }
24
25impl Workers {
26 pub fn new(n: usize) -> Result<Arc<Workers>, &'static str> {
28 if n == 0 {
29 return Err("n must be > 0");
30 }
31
32 Ok(Arc::new(Workers {
33 available: Mutex::new(n),
34 notify: Notify::new(),
35 limit: n,
36 }))
37 }
38
39 pub async fn take(&self) {
41 loop {
42 let mut available = self.available.lock().await;
43 info!("worker take, {}", *available);
44 if *available == 0 {
45 drop(available);
46 self.notify.notified().await;
47 } else {
48 *available -= 1;
49 break;
50 }
51 }
52 }
53
54 pub async fn give(&self) {
56 let mut available = self.available.lock().await;
57 info!("worker give, {}", *available);
58 *available += 1; self.notify.notify_one(); }
61
62 pub async fn wait(&self) {
64 loop {
65 {
66 let available = self.available.lock().await;
67 if *available == self.limit {
68 break;
69 }
70 }
71 self.notify.notified().await;
73 }
74 info!("worker wait end");
75 }
76
77 pub async fn available(&self) -> usize {
78 *self.available.lock().await
79 }
80}
81
82#[cfg(test)]
83mod tests {
84 use super::*;
85 use std::time::Duration;
86 use tokio::time::sleep;
87
88 #[tokio::test]
89 async fn test_workers() {
90 let workers = Arc::new(Workers::new(5).unwrap());
91
92 for _ in 0..5 {
93 let workers = workers.clone();
94 tokio::spawn(async move {
95 workers.take().await;
96 sleep(Duration::from_secs(3)).await;
97 });
98 }
99
100 for _ in 0..5 {
101 workers.give().await;
102 }
103 sleep(Duration::from_secs(1)).await;
105 workers.wait().await;
106 if workers.available().await != workers.limit {
107 unreachable!();
108 }
109 }
110}