1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
use event_listener::Event;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Debug)]
pub(crate) struct SemaphoreInner {
count: AtomicUsize,
event: Event,
}
impl SemaphoreInner {
pub const fn new(n: usize) -> Self {
Self {
count: AtomicUsize::new(n),
event: Event::new(),
}
}
pub fn try_acquire(&self) -> bool {
let mut count = self.count.load(Ordering::Acquire);
loop {
if count == 0 {
return false;
}
match self.count.compare_exchange_weak(
count,
count - 1,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return true,
Err(c) => count = c,
}
}
}
pub async fn acquire(&self) {
let mut listener = None;
loop {
if self.try_acquire() {
return;
}
match listener.take() {
None => listener = Some(self.event.listen()),
Some(l) => l.await,
}
}
}
pub fn add_permits(&self, n: usize) {
self.count.fetch_add(n, Ordering::AcqRel);
self.event.notify(n);
}
}
/// A counter for limiting the number of concurrent operations.
#[derive(Debug, Clone)]
pub struct Semaphore {
inner: Arc<SemaphoreInner>,
}
unsafe impl Send for Semaphore {}
impl Semaphore {
/// Creates a new semaphore with a limit of `n` concurrent operations.
///
/// # Examples
///
/// ```
/// use async_sema::Semaphore;
///
/// let s = Semaphore::new(5);
/// ```
pub fn new(n: usize) -> Semaphore {
Semaphore {
inner: Arc::new(SemaphoreInner::new(n)),
}
}
/// Attempts to get a permit for a concurrent operation.
///
/// Return whether permit has been acquired
///
/// # Examples
///
/// ```
/// use async_sema::Semaphore;
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let s = Semaphore::new(2);
///
/// s.acquire().await;
/// s.acquire().await;
///
/// assert!(!s.try_acquire());
/// s.add_permits(1);
/// assert!(s.try_acquire());
/// # });
/// ```
pub fn try_acquire(&self) -> bool {
self.inner.try_acquire()
}
/// Waits for a permit for a concurrent operation.
///
/// # Examples
///
/// ```
/// use async_sema::Semaphore;
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let s = Semaphore::new(2);
///
/// s.acquire().await;
/// # });
/// ```
pub async fn acquire(&self) {
self.inner.acquire().await
}
/// Add permit for a concurrent operations
///
/// # Examples
///
/// ```
/// use async_sema::Semaphore;
///
/// let s = Semaphore::new(0);
///
/// assert!(!s.try_acquire());
/// s.add_permits(1);
/// assert!(s.try_acquire());
/// ```
pub fn add_permits(&self, n: usize) {
self.inner.add_permits(n)
}
}