async_weighted_semaphore/
semaphore.rs

1use std::{fmt, mem};
2use std::panic::{RefUnwindSafe, UnwindSafe};
3use std::sync::atomic::Ordering::{Relaxed, Acquire};
4use crate::state::ReleaseState::Unlocked;
5use crate::state::AcquireState::{Available, Queued};
6use std::fmt::{Debug, Formatter};
7use crate::state::{AcquireStep, Waiter, Permits, AcquireState, ReleaseState};
8use std::cell::UnsafeCell;
9use crate::{AcquireFuture, TryAcquireError, SemaphoreGuard, AcquireFutureArc, SemaphoreGuardArc};
10use std::marker::{PhantomPinned, PhantomData};
11use crate::waker::AtomicWaker;
12use std::ptr::null;
13use std::sync::Arc;
14use crate::atomic::Atomic;
15use std::mem::size_of;
16use crate::release::ReleaseAction;
17#[allow(unused_imports)] // used by docs
18use crate::errors::PoisonError;
19
20/// An async weighted semaphore. See [crate documentation](index.html) for usage.
21// This implementation encodes state (the available counter, acquire queue, and cancel queue) into
22// multiple atomic variables and linked lists. Concurrent acquires (and concurrent cancels) synchronize
23// by pushing onto a stack with an atomic swap. Releases synchronize with other operations by attempting
24// to acquire a lock. If the lock is successfully acquired, the release can proceed. Otherwise
25// the lock is marked dirty to indicate that there is additional work for the lock owner to do.
26pub struct Semaphore {
27    // The number of available permits or the back of the queue (without next edges).
28    pub(crate) acquire: Atomic<AcquireState>,
29    // A number of releasable permits, and the state of the current release lock.
30    pub(crate) release: Atomic<ReleaseState>,
31    // The front of the queue (with next edges).
32    pub(crate) front: UnsafeCell<*const Waiter>,
33    // The last node swapped from AcquireState (with next edges).
34    pub(crate) middle: UnsafeCell<*const Waiter>,
35    // A stack of nodes that are cancelling.
36    pub(crate) next_cancel: Atomic<*const Waiter>,
37}
38
39unsafe impl Sync for Semaphore {}
40
41unsafe impl Send for Semaphore {}
42
43impl UnwindSafe for Semaphore {}
44
45impl RefUnwindSafe for Semaphore {}
46
47impl Debug for Semaphore {
48    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
49        match self.acquire.load(Relaxed) {
50            Available(available) => write!(f, "Semaphore::Ready({:?})", available)?,
51            Queued(_) => match self.release.load(Relaxed) {
52                Unlocked(available) => write!(f, "Semaphore::Blocked({:?})", available)?,
53                _ => write!(f, "Semaphore::Unknown")?,
54            },
55        };
56        Ok(())
57    }
58}
59
60
61impl Semaphore {
62    /// The maximum number of permits that can be made available. This is slightly smaller than
63    /// [`usize::MAX`]. If the number of available permits exceeds this number, it may poison the
64    /// semaphore.
65    /// # Examples
66    /// ```
67    /// # use async_weighted_semaphore::{Semaphore, SemaphoreGuard};
68    /// struct ReadWriteLock(Semaphore);
69    /// impl ReadWriteLock {
70    ///     fn new() -> Self {
71    ///         ReadWriteLock(Semaphore::new(Semaphore::MAX_AVAILABLE))
72    ///     }
73    ///     // Acquire one permit, allowing up to MAX_AVAILABLE concurrent readers.
74    ///     async fn read(&self) -> SemaphoreGuard<'_> {
75    ///         self.0.acquire(1).await.unwrap()
76    ///     }
77    ///     // The writer acquires all the permits, prevent any concurrent writers or readers. The
78    ///     // first-in-first-out priority policy prevents writer starvation.
79    ///     async fn write(&self) -> SemaphoreGuard<'_> {
80    ///         self.0.acquire(Semaphore::MAX_AVAILABLE).await.unwrap()
81    ///     }
82    /// }
83    /// ```
84    pub const MAX_AVAILABLE: usize = (1 << (size_of::<usize>() * 8 - 3)) - 1;
85
86    /// Create a new semaphore with an initial number of permits.
87    /// # Examples
88    /// ```
89    /// use async_weighted_semaphore::Semaphore;
90    /// let semaphore = Semaphore::new(1024);
91    /// ```
92    pub fn new(initial: usize) -> Self {
93        Semaphore {
94            acquire: Atomic::new(Available(Permits::new(initial))),
95            release: Atomic::new(Unlocked(Permits::new(0))),
96            front: UnsafeCell::new(null()),
97            middle: UnsafeCell::new(null()),
98            next_cancel: Atomic::new(null()),
99        }
100    }
101
102    /// Wait until there are no older pending calls to [acquire](#method.acquire) and at least `amount` permits available.
103    /// Then consume the requested permits and return a [`SemaphoreGuard`].
104    /// # Errors
105    /// Returns [`PoisonError`] is the semaphore is poisoned.
106    /// # Examples
107    /// ```
108    /// # use futures::executor::block_on;
109    /// # use std::future::Future;
110    /// use async_weighted_semaphore::Semaphore;
111    /// async fn limit_concurrency(semaphore: &Semaphore, future: impl Future<Output=()>) {
112    ///     let guard = semaphore.acquire(1).await.unwrap();
113    ///     future.await
114    /// }
115    /// ```
116    pub fn acquire(&self, amount: usize) -> AcquireFuture {
117        AcquireFuture(UnsafeCell::new(Waiter {
118            semaphore: self,
119            step: UnsafeCell::new(AcquireStep::Entering),
120            waker: unsafe { AtomicWaker::new() },
121            amount,
122            next: UnsafeCell::new(null()),
123            prev: UnsafeCell::new(null()),
124            next_cancel: UnsafeCell::new(null()),
125        }), PhantomData, PhantomPinned)
126    }
127
128    /// Like [acquire](#method.acquire), but fails if the call would block.
129    /// # Errors
130    /// * Returns [`TryAcquireError::Poisoned`] is the semaphore is poisoned.
131    /// * Returns [`TryAcquireError::WouldBlock`] if a call to `acquire` would have blocked. This can
132    /// occur if there are insufficient available permits or if there is another pending call to acquire.
133    /// # Examples
134    /// ```
135    /// # use futures::executor::block_on;
136    /// # use std::future::Future;
137    /// use async_weighted_semaphore::Semaphore;
138    /// async fn run_if_safe(semaphore: &Semaphore, future: impl Future<Output=()>) {
139    ///     if semaphore.try_acquire(1).is_ok() {
140    ///         future.await
141    ///     }
142    /// }
143    /// ```
144    pub fn try_acquire(&self, amount: usize) -> Result<SemaphoreGuard, TryAcquireError> {
145        let mut current = self.acquire.load(Acquire);
146        loop {
147            match current {
148                Queued(_) => return Err(TryAcquireError::WouldBlock),
149                Available(available) => {
150                    let available = available.into_usize().ok_or(TryAcquireError::Poisoned)?;
151                    if available < amount {
152                        return Err(TryAcquireError::WouldBlock);
153                    }
154                    if self.acquire.cmpxchg_weak_acqrel(&mut current, Available(Permits::new(available - amount))) {
155                        return Ok(SemaphoreGuard::new(self, amount));
156                    }
157                }
158            }
159        }
160    }
161
162    /// Like [acquire](#method.acquire), but takes an [`Arc`] `<Semaphore>` and returns a guard that is `'static`, [`Send`] and [`Sync`].
163    /// # Examples
164    /// ```
165    /// # use async_weighted_semaphore::{Semaphore, PoisonError, SemaphoreGuardArc};
166    /// # use std::sync::Arc;
167    /// use async_channel::{Sender, SendError};
168    /// // Limit size of a producer-consumer queue
169    /// async fn send<T>(semaphore: &Arc<Semaphore>,
170    ///                  sender: &Sender<(SemaphoreGuardArc, T)>,
171    ///                  message: T
172    ///         ) -> Result<(), SendError<T>>{
173    ///     match semaphore.acquire_arc(1).await {
174    ///         // A semaphore can be poisoned to prevent deadlock when a channel closes.
175    ///         Err(PoisonError) => Err(SendError(message)),
176    ///         Ok(guard) => match sender.send((guard, message)).await{
177    ///             Err(SendError((guard, message))) => Err(SendError(message)),
178    ///             Ok(()) => Ok(())
179    ///         }
180    ///     }
181    /// }
182    /// ```
183    pub fn acquire_arc(self: &Arc<Self>, amount: usize) -> AcquireFutureArc {
184        AcquireFutureArc {
185            arc: self.clone(),
186            inner: unsafe { mem::transmute::<AcquireFuture, AcquireFuture>(self.acquire(amount)) },
187        }
188    }
189
190
191    /// Like [try_acquire](#method.try_acquire), but takes an [`Arc`] `<Semaphore>`, and returns a guard that is `'static`,
192    /// [`Send`] and [`Sync`].
193    /// # Examples
194    /// ```
195    /// # use async_weighted_semaphore::{Semaphore, TryAcquireError, SemaphoreGuardArc};
196    /// # use std::sync::Arc;
197    /// use async_channel::{Sender, TrySendError};
198    /// // Limit size of a producer-consumer queue
199    /// async fn try_send<T>(semaphore: &Arc<Semaphore>,
200    ///                  sender: &Sender<(SemaphoreGuardArc, T)>,
201    ///                  message: T
202    ///         ) -> Result<(), TrySendError<T>>{
203    ///     match semaphore.try_acquire_arc(1) {
204    ///         Err(TryAcquireError::WouldBlock) => Err(TrySendError::Full(message)),
205    ///         // A semaphore can be poisoned to prevent deadlock when a channel closes.
206    ///         Err(TryAcquireError::Poisoned) => Err(TrySendError::Closed(message)),
207    ///         Ok(guard) => match sender.try_send((guard, message)) {
208    ///             Err(TrySendError::Closed((guard, message))) => Err(TrySendError::Closed(message)),
209    ///             Err(TrySendError::Full((guard, message))) => Err(TrySendError::Full(message)),
210    ///             Ok(()) => Ok(())
211    ///         }
212    ///     }
213    /// }
214    /// ```
215    pub fn try_acquire_arc(self: &Arc<Self>, amount: usize) -> Result<SemaphoreGuardArc, TryAcquireError> {
216        let guard = self.try_acquire(amount)?;
217        let result = SemaphoreGuardArc::new(self.clone(), amount);
218        guard.forget();
219        Ok(result)
220    }
221
222    /// Return `amount` permits to the semaphore. This will eventually wake any calls to [acquire](#method.acquire)
223    /// that can succeed with the additional permits. Calling `release` often makes sense after calling
224    /// [`SemaphoreGuard::forget`] or when using the semaphore to signal the number of elements that
225    /// are available for processing.
226    /// # Examples
227    /// ```
228    /// # use async_weighted_semaphore::{Semaphore, TryAcquireError};
229    /// use async_channel::{Receiver, RecvError};
230    /// // Limit size of a producer-consumer queue
231    /// async fn recv<T>(semaphore: &Semaphore, recv: &Receiver<T>) -> Result<T, RecvError>{
232    ///     let result = recv.recv().await?;
233    ///     // Note that this only guards elements in the queue, not those being processed after the
234    ///     // queue.
235    ///     semaphore.release(1);
236    ///     Ok(result)
237    /// }
238    /// ```
239    pub fn release(&self, amount: usize) {
240        unsafe {
241            ReleaseAction { sem: self, releasable: Permits::new(amount) }.release();
242        }
243    }
244
245    /// Poison the semaphore, causing all pending and future calls to `acquire` to fail immediately.
246    /// This can be used to unblock pending acquires when the guarded operation would fail anyway.
247    /// # Examples
248    /// ```
249    /// # use async_weighted_semaphore::{Semaphore, TryAcquireError};
250    /// # use std::sync::Arc;
251    /// # use async_std::sync::Mutex;
252    /// use async_channel::{Receiver, RecvError};
253    /// async fn consume(semaphore: &Semaphore, receiver: Receiver<usize>){
254    ///     while let Ok(x) = receiver.recv().await {
255    ///         println!("{:?}", x);
256    ///         semaphore.release(1);
257    ///     }
258    ///     // There will be no more calls to recv, so unblock all senders.
259    ///     semaphore.poison();
260    /// }
261    /// ```
262    pub fn poison(&self) {
263        unsafe {
264            ReleaseAction { sem: self, releasable: Permits::poison() }.release();
265        }
266    }
267}