async_weighted_semaphore/semaphore.rs
1use std::{fmt, mem};
2use std::panic::{RefUnwindSafe, UnwindSafe};
3use std::sync::atomic::Ordering::{Relaxed, Acquire};
4use crate::state::ReleaseState::Unlocked;
5use crate::state::AcquireState::{Available, Queued};
6use std::fmt::{Debug, Formatter};
7use crate::state::{AcquireStep, Waiter, Permits, AcquireState, ReleaseState};
8use std::cell::UnsafeCell;
9use crate::{AcquireFuture, TryAcquireError, SemaphoreGuard, AcquireFutureArc, SemaphoreGuardArc};
10use std::marker::{PhantomPinned, PhantomData};
11use crate::waker::AtomicWaker;
12use std::ptr::null;
13use std::sync::Arc;
14use crate::atomic::Atomic;
15use std::mem::size_of;
16use crate::release::ReleaseAction;
17#[allow(unused_imports)] // used by docs
18use crate::errors::PoisonError;
19
20/// An async weighted semaphore. See [crate documentation](index.html) for usage.
21// This implementation encodes state (the available counter, acquire queue, and cancel queue) into
22// multiple atomic variables and linked lists. Concurrent acquires (and concurrent cancels) synchronize
23// by pushing onto a stack with an atomic swap. Releases synchronize with other operations by attempting
24// to acquire a lock. If the lock is successfully acquired, the release can proceed. Otherwise
25// the lock is marked dirty to indicate that there is additional work for the lock owner to do.
26pub struct Semaphore {
27 // The number of available permits or the back of the queue (without next edges).
28 pub(crate) acquire: Atomic<AcquireState>,
29 // A number of releasable permits, and the state of the current release lock.
30 pub(crate) release: Atomic<ReleaseState>,
31 // The front of the queue (with next edges).
32 pub(crate) front: UnsafeCell<*const Waiter>,
33 // The last node swapped from AcquireState (with next edges).
34 pub(crate) middle: UnsafeCell<*const Waiter>,
35 // A stack of nodes that are cancelling.
36 pub(crate) next_cancel: Atomic<*const Waiter>,
37}
38
39unsafe impl Sync for Semaphore {}
40
41unsafe impl Send for Semaphore {}
42
43impl UnwindSafe for Semaphore {}
44
45impl RefUnwindSafe for Semaphore {}
46
47impl Debug for Semaphore {
48 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
49 match self.acquire.load(Relaxed) {
50 Available(available) => write!(f, "Semaphore::Ready({:?})", available)?,
51 Queued(_) => match self.release.load(Relaxed) {
52 Unlocked(available) => write!(f, "Semaphore::Blocked({:?})", available)?,
53 _ => write!(f, "Semaphore::Unknown")?,
54 },
55 };
56 Ok(())
57 }
58}
59
60
61impl Semaphore {
62 /// The maximum number of permits that can be made available. This is slightly smaller than
63 /// [`usize::MAX`]. If the number of available permits exceeds this number, it may poison the
64 /// semaphore.
65 /// # Examples
66 /// ```
67 /// # use async_weighted_semaphore::{Semaphore, SemaphoreGuard};
68 /// struct ReadWriteLock(Semaphore);
69 /// impl ReadWriteLock {
70 /// fn new() -> Self {
71 /// ReadWriteLock(Semaphore::new(Semaphore::MAX_AVAILABLE))
72 /// }
73 /// // Acquire one permit, allowing up to MAX_AVAILABLE concurrent readers.
74 /// async fn read(&self) -> SemaphoreGuard<'_> {
75 /// self.0.acquire(1).await.unwrap()
76 /// }
77 /// // The writer acquires all the permits, prevent any concurrent writers or readers. The
78 /// // first-in-first-out priority policy prevents writer starvation.
79 /// async fn write(&self) -> SemaphoreGuard<'_> {
80 /// self.0.acquire(Semaphore::MAX_AVAILABLE).await.unwrap()
81 /// }
82 /// }
83 /// ```
84 pub const MAX_AVAILABLE: usize = (1 << (size_of::<usize>() * 8 - 3)) - 1;
85
86 /// Create a new semaphore with an initial number of permits.
87 /// # Examples
88 /// ```
89 /// use async_weighted_semaphore::Semaphore;
90 /// let semaphore = Semaphore::new(1024);
91 /// ```
92 pub fn new(initial: usize) -> Self {
93 Semaphore {
94 acquire: Atomic::new(Available(Permits::new(initial))),
95 release: Atomic::new(Unlocked(Permits::new(0))),
96 front: UnsafeCell::new(null()),
97 middle: UnsafeCell::new(null()),
98 next_cancel: Atomic::new(null()),
99 }
100 }
101
102 /// Wait until there are no older pending calls to [acquire](#method.acquire) and at least `amount` permits available.
103 /// Then consume the requested permits and return a [`SemaphoreGuard`].
104 /// # Errors
105 /// Returns [`PoisonError`] is the semaphore is poisoned.
106 /// # Examples
107 /// ```
108 /// # use futures::executor::block_on;
109 /// # use std::future::Future;
110 /// use async_weighted_semaphore::Semaphore;
111 /// async fn limit_concurrency(semaphore: &Semaphore, future: impl Future<Output=()>) {
112 /// let guard = semaphore.acquire(1).await.unwrap();
113 /// future.await
114 /// }
115 /// ```
116 pub fn acquire(&self, amount: usize) -> AcquireFuture {
117 AcquireFuture(UnsafeCell::new(Waiter {
118 semaphore: self,
119 step: UnsafeCell::new(AcquireStep::Entering),
120 waker: unsafe { AtomicWaker::new() },
121 amount,
122 next: UnsafeCell::new(null()),
123 prev: UnsafeCell::new(null()),
124 next_cancel: UnsafeCell::new(null()),
125 }), PhantomData, PhantomPinned)
126 }
127
128 /// Like [acquire](#method.acquire), but fails if the call would block.
129 /// # Errors
130 /// * Returns [`TryAcquireError::Poisoned`] is the semaphore is poisoned.
131 /// * Returns [`TryAcquireError::WouldBlock`] if a call to `acquire` would have blocked. This can
132 /// occur if there are insufficient available permits or if there is another pending call to acquire.
133 /// # Examples
134 /// ```
135 /// # use futures::executor::block_on;
136 /// # use std::future::Future;
137 /// use async_weighted_semaphore::Semaphore;
138 /// async fn run_if_safe(semaphore: &Semaphore, future: impl Future<Output=()>) {
139 /// if semaphore.try_acquire(1).is_ok() {
140 /// future.await
141 /// }
142 /// }
143 /// ```
144 pub fn try_acquire(&self, amount: usize) -> Result<SemaphoreGuard, TryAcquireError> {
145 let mut current = self.acquire.load(Acquire);
146 loop {
147 match current {
148 Queued(_) => return Err(TryAcquireError::WouldBlock),
149 Available(available) => {
150 let available = available.into_usize().ok_or(TryAcquireError::Poisoned)?;
151 if available < amount {
152 return Err(TryAcquireError::WouldBlock);
153 }
154 if self.acquire.cmpxchg_weak_acqrel(&mut current, Available(Permits::new(available - amount))) {
155 return Ok(SemaphoreGuard::new(self, amount));
156 }
157 }
158 }
159 }
160 }
161
162 /// Like [acquire](#method.acquire), but takes an [`Arc`] `<Semaphore>` and returns a guard that is `'static`, [`Send`] and [`Sync`].
163 /// # Examples
164 /// ```
165 /// # use async_weighted_semaphore::{Semaphore, PoisonError, SemaphoreGuardArc};
166 /// # use std::sync::Arc;
167 /// use async_channel::{Sender, SendError};
168 /// // Limit size of a producer-consumer queue
169 /// async fn send<T>(semaphore: &Arc<Semaphore>,
170 /// sender: &Sender<(SemaphoreGuardArc, T)>,
171 /// message: T
172 /// ) -> Result<(), SendError<T>>{
173 /// match semaphore.acquire_arc(1).await {
174 /// // A semaphore can be poisoned to prevent deadlock when a channel closes.
175 /// Err(PoisonError) => Err(SendError(message)),
176 /// Ok(guard) => match sender.send((guard, message)).await{
177 /// Err(SendError((guard, message))) => Err(SendError(message)),
178 /// Ok(()) => Ok(())
179 /// }
180 /// }
181 /// }
182 /// ```
183 pub fn acquire_arc(self: &Arc<Self>, amount: usize) -> AcquireFutureArc {
184 AcquireFutureArc {
185 arc: self.clone(),
186 inner: unsafe { mem::transmute::<AcquireFuture, AcquireFuture>(self.acquire(amount)) },
187 }
188 }
189
190
191 /// Like [try_acquire](#method.try_acquire), but takes an [`Arc`] `<Semaphore>`, and returns a guard that is `'static`,
192 /// [`Send`] and [`Sync`].
193 /// # Examples
194 /// ```
195 /// # use async_weighted_semaphore::{Semaphore, TryAcquireError, SemaphoreGuardArc};
196 /// # use std::sync::Arc;
197 /// use async_channel::{Sender, TrySendError};
198 /// // Limit size of a producer-consumer queue
199 /// async fn try_send<T>(semaphore: &Arc<Semaphore>,
200 /// sender: &Sender<(SemaphoreGuardArc, T)>,
201 /// message: T
202 /// ) -> Result<(), TrySendError<T>>{
203 /// match semaphore.try_acquire_arc(1) {
204 /// Err(TryAcquireError::WouldBlock) => Err(TrySendError::Full(message)),
205 /// // A semaphore can be poisoned to prevent deadlock when a channel closes.
206 /// Err(TryAcquireError::Poisoned) => Err(TrySendError::Closed(message)),
207 /// Ok(guard) => match sender.try_send((guard, message)) {
208 /// Err(TrySendError::Closed((guard, message))) => Err(TrySendError::Closed(message)),
209 /// Err(TrySendError::Full((guard, message))) => Err(TrySendError::Full(message)),
210 /// Ok(()) => Ok(())
211 /// }
212 /// }
213 /// }
214 /// ```
215 pub fn try_acquire_arc(self: &Arc<Self>, amount: usize) -> Result<SemaphoreGuardArc, TryAcquireError> {
216 let guard = self.try_acquire(amount)?;
217 let result = SemaphoreGuardArc::new(self.clone(), amount);
218 guard.forget();
219 Ok(result)
220 }
221
222 /// Return `amount` permits to the semaphore. This will eventually wake any calls to [acquire](#method.acquire)
223 /// that can succeed with the additional permits. Calling `release` often makes sense after calling
224 /// [`SemaphoreGuard::forget`] or when using the semaphore to signal the number of elements that
225 /// are available for processing.
226 /// # Examples
227 /// ```
228 /// # use async_weighted_semaphore::{Semaphore, TryAcquireError};
229 /// use async_channel::{Receiver, RecvError};
230 /// // Limit size of a producer-consumer queue
231 /// async fn recv<T>(semaphore: &Semaphore, recv: &Receiver<T>) -> Result<T, RecvError>{
232 /// let result = recv.recv().await?;
233 /// // Note that this only guards elements in the queue, not those being processed after the
234 /// // queue.
235 /// semaphore.release(1);
236 /// Ok(result)
237 /// }
238 /// ```
239 pub fn release(&self, amount: usize) {
240 unsafe {
241 ReleaseAction { sem: self, releasable: Permits::new(amount) }.release();
242 }
243 }
244
245 /// Poison the semaphore, causing all pending and future calls to `acquire` to fail immediately.
246 /// This can be used to unblock pending acquires when the guarded operation would fail anyway.
247 /// # Examples
248 /// ```
249 /// # use async_weighted_semaphore::{Semaphore, TryAcquireError};
250 /// # use std::sync::Arc;
251 /// # use async_std::sync::Mutex;
252 /// use async_channel::{Receiver, RecvError};
253 /// async fn consume(semaphore: &Semaphore, receiver: Receiver<usize>){
254 /// while let Ok(x) = receiver.recv().await {
255 /// println!("{:?}", x);
256 /// semaphore.release(1);
257 /// }
258 /// // There will be no more calls to recv, so unblock all senders.
259 /// semaphore.poison();
260 /// }
261 /// ```
262 pub fn poison(&self) {
263 unsafe {
264 ReleaseAction { sem: self, releasable: Permits::poison() }.release();
265 }
266 }
267}