// Asynchronous counting semaphore built on `event_listener::Event`, with optional `tokio` timeouts.
use event_listener::Event;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::time as tktime;
/// Shared state behind a [`Semaphore`]: the permit counter and the event used to wake waiters.
#[derive(Debug)]
pub(crate) struct SemaphoreInner {
// Number of permits currently available; decremented by `try_acquire`, incremented by
// `add_permits`.
count: AtomicUsize,
// Notified by `add_permits`; tasks parked in `acquire` listen on it and retry when woken.
event: Event,
}
impl SemaphoreInner {
    /// Creates the shared state with `n` permits initially available.
    pub const fn new(n: usize) -> Self {
        Self {
            count: AtomicUsize::new(n),
            event: Event::new(),
        }
    }

    /// Takes one permit without waiting.
    ///
    /// Returns `true` when a permit was taken and `false` when the counter was zero.
    pub fn try_acquire(&self) -> bool {
        // `fetch_update` retries the compare-exchange until it succeeds or the closure
        // returns `None`; `checked_sub` yields `None` exactly when no permit is left.
        self.count
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |permits| {
                permits.checked_sub(1)
            })
            .is_ok()
    }

    /// Waits until a permit can be taken, then takes it.
    pub async fn acquire(&self) {
        let mut listener = None;
        loop {
            if self.try_acquire() {
                return;
            }
            match listener.take() {
                // Register for a wake-up first and loop to re-check the counter, so that a
                // permit added between the failed attempt and `listen()` is not missed.
                None => listener = Some(self.event.listen()),
                // Already registered and still no permit: sleep until notified.
                Some(l) => l.await,
            }
        }
    }

    /// Waits for a permit for at most `dur`.
    ///
    /// Returns `true` if a permit was taken before the timeout elapsed, `false` otherwise.
    pub async fn acquire_timeout(&self, dur: Duration) -> bool {
        // One-shot flag recording which side (acquire or timeout) settled the outcome first.
        // It is claimed with `swap`, which cannot fail spuriously. A weak compare-exchange is
        // allowed to fail even when the flag is still `false`, which could report a permit
        // that was never taken or hand back one that was.
        let processed = AtomicBool::new(false);
        let fut = async {
            self.acquire().await;
            // `swap` returns the previous value: `true` means the timeout side already
            // claimed the outcome, so the permit just taken must be handed back.
            if processed.swap(true, Ordering::AcqRel) {
                self.add_permits(1);
            }
        };
        match tktime::timeout(dur, fut).await {
            Ok(_) => true,
            // Timed out: claim the outcome. The previous value is `true` only if the
            // acquiring side had already claimed it, in which case a permit is held.
            // NOTE(review): `tokio::time::timeout` cancels the inner future on expiry, so
            // this is expected to be `false` in practice — kept as a defensive guard.
            Err(_) => processed.swap(true, Ordering::AcqRel),
        }
    }

    /// Makes `n` more permits available and notifies the event for `n` listeners.
    pub fn add_permits(&self, n: usize) {
        // Publish the permits before notifying so that woken tasks observe them.
        self.count.fetch_add(n, Ordering::AcqRel);
        self.event.notify(n);
    }
}
/// A counter for limiting the number of concurrent operations.
///
/// Clones share the same underlying permit counter. Acquiring does not return a guard:
/// a permit taken with `acquire`, `try_acquire` or `acquire_timeout` is only given back
/// by calling `add_permits`.
#[derive(Debug, Clone)]
pub struct Semaphore {
// Reference-counted shared state (permit counter and wake-up event).
inner: Arc<SemaphoreInner>,
}
// SAFETY: the only field is `Arc<SemaphoreInner>`, and `SemaphoreInner` holds just an
// `AtomicUsize` and an `event_listener::Event`.
// NOTE(review): `Event` is presumably `Send + Sync`; if so the compiler already derives
// `Send` for `Semaphore` and this manual impl is redundant — confirm and consider removing it,
// since it would silently keep asserting `Send` if a non-`Send` field were added later.
unsafe impl Send for Semaphore {}
impl Semaphore {
    /// Builds a semaphore that starts with `n` available permits.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_sema::Semaphore;
    ///
    /// let s = Semaphore::new(5);
    /// ```
    pub fn new(n: usize) -> Semaphore {
        let inner = Arc::new(SemaphoreInner::new(n));
        Self { inner }
    }

    /// Tries to take a permit without waiting.
    ///
    /// Returns `true` if a permit was taken and `false` if none was available.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_sema::Semaphore;
    ///
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// let s = Semaphore::new(2);
    ///
    /// s.acquire().await;
    /// s.acquire().await;
    ///
    /// assert!(!s.try_acquire());
    /// s.add_permits(1);
    /// assert!(s.try_acquire());
    /// # });
    /// ```
    pub fn try_acquire(&self) -> bool {
        SemaphoreInner::try_acquire(&self.inner)
    }

    /// Waits until a permit is available and takes it.
    ///
    /// The permit is not released automatically; give it back with
    /// [`add_permits`](Semaphore::add_permits).
    ///
    /// # Examples
    ///
    /// ```
    /// use async_sema::Semaphore;
    ///
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// let s = Semaphore::new(2);
    ///
    /// s.acquire().await;
    /// # });
    /// ```
    pub async fn acquire(&self) {
        SemaphoreInner::acquire(&self.inner).await
    }

    /// Waits for a permit, giving up once `dur` has elapsed.
    ///
    /// Returns `true` if a permit was taken within `dur` and `false` if the wait timed out.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_sema::Semaphore;
    /// use std::time::Duration;
    ///
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// let s = Semaphore::new(1);
    ///
    /// assert!(s.acquire_timeout(Duration::from_millis(10)).await);
    /// assert!(!s.acquire_timeout(Duration::from_millis(10)).await);
    /// # });
    /// ```
    pub async fn acquire_timeout(&self, dur: Duration) -> bool {
        SemaphoreInner::acquire_timeout(&self.inner, dur).await
    }

    /// Adds `n` permits, allowing that many more concurrent operations.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_sema::Semaphore;
    ///
    /// let s = Semaphore::new(0);
    ///
    /// assert!(!s.try_acquire());
    /// s.add_permits(1);
    /// assert!(s.try_acquire());
    /// ```
    pub fn add_permits(&self, n: usize) {
        SemaphoreInner::add_permits(&self.inner, n)
    }
}