rustfs_workers/
workers.rs

1// Copyright 2024 RustFS Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use tokio::sync::{Mutex, Notify};
17use tracing::info;
18
19pub struct Workers {
20    available: Mutex<usize>, // Available working slots
21    notify: Notify,          // Used to notify waiting tasks
22    limit: usize,            // Maximum number of concurrent jobs
23}
24
25impl Workers {
26    // Create a Workers object that allows up to n jobs to execute concurrently
27    pub fn new(n: usize) -> Result<Arc<Workers>, &'static str> {
28        if n == 0 {
29            return Err("n must be > 0");
30        }
31
32        Ok(Arc::new(Workers {
33            available: Mutex::new(n),
34            notify: Notify::new(),
35            limit: n,
36        }))
37    }
38
39    // Give a job a chance to be executed
40    pub async fn take(&self) {
41        loop {
42            let mut available = self.available.lock().await;
43            info!("worker take, {}", *available);
44            if *available == 0 {
45                drop(available);
46                self.notify.notified().await;
47            } else {
48                *available -= 1;
49                break;
50            }
51        }
52    }
53
54    // Release a job's slot
55    pub async fn give(&self) {
56        let mut available = self.available.lock().await;
57        info!("worker give, {}", *available);
58        *available += 1; // Increase available slots
59        self.notify.notify_one(); // Notify a waiting task
60    }
61
62    // Wait for all concurrent jobs to complete
63    pub async fn wait(&self) {
64        loop {
65            {
66                let available = self.available.lock().await;
67                if *available == self.limit {
68                    break;
69                }
70            }
71            // Wait until all slots are freed
72            self.notify.notified().await;
73        }
74        info!("worker wait end");
75    }
76
77    pub async fn available(&self) -> usize {
78        *self.available.lock().await
79    }
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::time::sleep;

    /// A limit of zero is rejected by the constructor.
    #[test]
    fn test_new_rejects_zero() {
        assert!(Workers::new(0).is_err());
    }

    /// Every slot that is taken and later given back is observed by `wait()`.
    #[tokio::test]
    async fn test_workers() {
        const LIMIT: usize = 5;

        // `Workers::new` already returns an `Arc`, so no extra wrapping is needed.
        let workers = Workers::new(LIMIT).unwrap();

        // Occupy every slot up front so that `wait()` below cannot return
        // before the spawned jobs have released them.
        for _ in 0..LIMIT {
            workers.take().await;
        }
        assert_eq!(workers.available().await, 0);

        let mut jobs = Vec::with_capacity(LIMIT);
        for _ in 0..LIMIT {
            let workers = Arc::clone(&workers);
            jobs.push(tokio::spawn(async move {
                // Simulate a short piece of work before releasing the slot.
                sleep(Duration::from_millis(20)).await;
                workers.give().await;
            }));
        }

        workers.wait().await;
        assert_eq!(workers.available().await, workers.limit);

        for job in jobs {
            job.await.unwrap();
        }
    }
}