Skip to main content

simple_semaphore/
lib.rs

1use std::{
2    sync::{
3        Arc, Condvar, Mutex,
4        atomic::{AtomicUsize, Ordering},
5    },
6    thread::available_parallelism,
7};
8
9/// A Semaphore maintains the number of permits it is still allowed to give.
10///
11/// * When `acquire()` is called and the semaphore still has permits to give, it will return a `Permit`. If there are no permits that can be given, it will wait for one permit to be given back from a thread so that it can return a new `Permit`.
12/// * When `try_acquire()` is called and the semaphore still has permits to give, it will return `Some(Permit)`. If there are no permits that can be given, it will return `None`.
13#[derive(Debug)]
14pub struct Semaphore {
15    permits: Arc<AtomicUsize>,
16    condvar: Condvar,
17    mutex: Mutex<()>,
18}
19
20impl Semaphore {
21    /// Returns a new `Arc<Semaphore>` with the limit of permits chosen by you.
22    pub fn new(permits: usize) -> Arc<Self> {
23        Arc::new(Semaphore {
24            permits: Arc::new(AtomicUsize::new(permits)),
25            condvar: Condvar::new(),
26            mutex: Mutex::new(()),
27        })
28    }
29
30    /// Returns a new `Arc<Semaphore>` with the limit of permits set to the machine's parallelism value, usually CPU cores.
31    pub fn new_available_parallelism() -> Result<Arc<Self>, String> {
32        match available_parallelism() {
33            Ok(parallelism) => Ok(Arc::new(Semaphore {
34                permits: Arc::new(AtomicUsize::new(parallelism.get())),
35                condvar: Condvar::new(),
36                mutex: Mutex::new(()),
37            })),
38            Err(err) => Err(err.to_string()),
39        }
40    }
41
42    /// Returns the number of available permits
43    pub fn available_permits(self: &Arc<Self>) -> usize {
44        self.permits.load(Ordering::Relaxed)
45    }
46
47    /// Tries to get a `Permit`. If no more permits can be given, it will wait for one permit to be given back from a thread so that it can return a new `Permit`.
48    #[allow(unused_must_use)]
49    pub fn acquire(self: &Arc<Self>) -> Permit {
50        loop {
51            if self.permits.load(Ordering::Acquire) != 0 {
52                self.permits.fetch_sub(1, Ordering::AcqRel);
53                return Permit {
54                    semaphore: Arc::clone(self),
55                };
56            }
57            let guard = self.mutex.lock().unwrap();
58            self.condvar.wait(guard).unwrap();
59        }
60    }
61
62    /// Tries to get a `Option<Permit>`. If no more permits can be given, it will return `None`.
63    pub fn try_acquire(self: &Arc<Self>) -> Option<Permit> {
64        if self.permits.load(Ordering::Acquire) != 0 {
65            self.permits.fetch_sub(1, Ordering::Release);
66            return Some(Permit {
67                semaphore: Arc::clone(self),
68            });
69        }
70        None
71    }
72
73    /// Releases a permit. This is what `drop()` on `Permit` calls, ideally use `drop(permit)`.
74    pub fn release(&self) {
75        self.permits.fetch_add(1, Ordering::Release);
76        self.condvar.notify_one();
77    }
78}
79
80/// A permit that holds the Semaphore, so that `drop(permit)` can be called.
81#[derive(Debug)]
82pub struct Permit {
83    semaphore: Arc<Semaphore>,
84}
85
86impl Drop for Permit {
87    /// Releases the permit.
88    fn drop(&mut self) {
89        self.semaphore.release();
90    }
91}