lilos_semaphore/
lib.rs

1//! A [counting semaphore] for use with [`lilos`].
2//!
3//! See the docs on [`Semaphore`] for more details.
4//!
5//! [`lilos`]: https://docs.rs/lilos/
6//! [counting semaphore]: https://en.wikipedia.org/wiki/Semaphore_(programming)
7
8#![no_std]
9#![warn(
10    elided_lifetimes_in_paths,
11    explicit_outlives_requirements,
12    missing_debug_implementations,
13    missing_docs,
14    semicolon_in_expressions_from_macros,
15    single_use_lifetimes,
16    trivial_casts,
17    trivial_numeric_casts,
18    unreachable_pub,
19    unsafe_op_in_unsafe_fn,
20    unused_qualifications
21)]
22
23use core::pin::Pin;
24use core::sync::atomic::{AtomicUsize, Ordering};
25use lilos_list::List;
26use lilos::atomic::AtomicExt;
27use pin_project::pin_project;
28
29/// A [counting semaphore].
30///
31/// A `Semaphore` gets initialized with a certain number of _permits._
32/// Callers can take one permit from the semaphore using the
33/// [`Semaphore::acquire`] operation, which will block if there are none
34/// available, and wake when one becomes available.
35///
36/// Permits can be added back to the semaphore one at a time using the
37/// [`Semaphore::release`] operation, or in batches using
38/// [`Semaphore::release_multiple`].
39///
40/// Semaphores are useful for restricting concurrent access to something,
41/// particularly in cases where you don't want to restrict it to exactly one
42/// observer (like with a `Mutex`). Two common use cases are:
43///
44/// - To ensure that no more than `N` tasks can make it into a critical section
45/// simultaneously, you'd create a `Semaphore` with `N` permits. Each task would
46/// `acquire` a permit to gain entry, and then `release` it at the end. In this
47/// case, a "permit object" might be fine, because `acquire` and `release` are
48/// both being called in the same context.
49///
50/// - To represent data being generated or received (perhaps over a network),
51/// you'd create a `Semaphore` with 0 permits. Tasks interested in consuming
52/// resources would attempt to `acquire` and block; as data becomes available,
53/// the generating task would `release` permits, potentially in batches, to
54/// unblock the consumers. In this case, `release` and `acquire` are happening
55/// in totally different contexts.
56///
57/// To support _both_ these uses, the `Semaphore` API is different from a lot of
58/// Rust concurrency APIs, and does not return a "permit object" that represents
59/// a permit until dropped. If your use case is closer to the first example, and
60/// you would like the convenience of a permit object managing calls to
61/// `release` for you, have a look at [`ScopedSemaphore`], a thin wrapper that
62/// provides a [`Permit`].
63///
64///
65/// # Getting a semaphore
66///
67/// Like `lilos`'s `Mutex` type, `Semaphore` must be pinned to be useful. So
68/// generally you'll wind up writing something like this:
69///
70/// ```
71/// let scooters = pin!(Semaphore::new(5));
72/// // Drop &mut:
73/// let scooters = scooters.into_ref();
74///
75/// // Check out one scooter from the pool.
76/// scooters.acquire().await;
77/// ```
78///
79/// This crate includes a convenience macro, [`create_semaphore!`], that
80/// basically just wraps up the first two lines:
81///
82/// ```
83/// lilos_semaphore::create_semaphore!(scooters, 5);
84///
85/// // Check out one scooter from the pool.
86/// scooters.acquire().await;
87/// ```
88///
89///
90/// # Fairness
91///
92/// This semaphore implementation is _fair,_ which in this context means
93/// that permits are handed out in the order they're requested. If the
94/// semaphore runs out of permits, tasks requesting permits are queued in
95/// order and will be issued permits in order as they are returned to the
96/// semaphore.
97///
98/// [counting semaphore]: https://en.wikipedia.org/wiki/Semaphore_(programming)
99#[derive(Debug)]
100#[pin_project]
101pub struct Semaphore {
102    available: AtomicUsize,
103    #[pin]
104    waiters: List<()>,
105}
106
107impl Semaphore {
108    /// Creates a future that will resolve when it can take a single permit from
109    /// the semaphore. Until then, the future will remain pending (i.e. block).
110    ///
111    /// # Cancellation
112    ///
113    /// Cancel-safe but affects your position in line, to maintain fairness.
114    ///
115    /// If you drop the returned future before it resolves...
116    /// - If it had not successfully acquired a permit, nothing happens.
117    /// - If it had, the permit is released.
118    ///
119    /// Dropping the future and re-calling `acquire` bumps the caller to the
120    /// back of the priority list, to maintain fairness. Otherwise, the result
121    /// is indistinguishable.
122    pub async fn acquire(self: Pin<&Self>) {
123        if self.try_acquire().is_ok() {
124            return;
125        }
126
127        // Add ourselves to the wait list...
128        self.project_ref()
129            .waiters
130            .join_with_cleanup((), || {
131                // This is called when we've been detached from the wait
132                // list, which means a permit was transferred to us, but
133                // we haven't been polled -- and won't ever be polled,
134                // for we are being dropped. This means we need to
135                // release our permit, which might wake another task.
136                self.release();
137            })
138            .await
139    }
140
141    /// Attempts to take a single permit from the semaphore, returning `Ok` if
142    /// one is available immediately, or `Err` if they are all taken.
143    pub fn try_acquire(&self) -> Result<(), NoPermits> {
144        self.available
145            .fetch_update_polyfill(Ordering::Relaxed, Ordering::Relaxed, |a| {
146                a.checked_sub(1)
147            })
148            .map_err(|_| NoPermits)?;
149        Ok(())
150    }
151
152    /// Returns the number of permits available in the semaphore.
153    ///
154    /// Note that this is a _snapshot._ If this returns 4, for instance, it
155    /// doesn't mean you can successfully call `acquire` 4 times without
156    /// blocking, because another acquirer may be racing you.
157    pub fn permits_available(&self) -> usize {
158        self.available.load(Ordering::Relaxed)
159    }
160
161    /// Stuffs one permit back into the semaphore.
162    #[inline(always)]
163    pub fn release(self: Pin<&Self>) {
164        self.release_multiple(1)
165    }
166
167    /// Stuffs one permit back into the semaphore.
168    ///
169    /// Use this if you have called [`core::mem::forget`] on a [`Permit`], when
170    /// you want to restore that permit to the semaphore. Note that this is an
171    /// unusual use case and should only be done with good reason.
172    ///
173    /// It is, however, safe, in the Rust sense.
174    ///
175    /// It's possible to use this operation to increase the total number of
176    /// permits available in the `Semaphore`. That's an even weirder use case,
177    /// so be careful.
178    pub fn release_multiple(self: Pin<&Self>, mut n: usize) {
179        debug_assert!(n > 0);
180
181        let p = self.project_ref();
182        while n > 0 {
183            if !p.waiters.wake_one() {
184                // We have exhausted the list, stop using this strategy.
185                break;
186            }
187            n -= 1;
188        }
189
190        if n > 0 {
191            // Since we're not yielding -- we're not even in an async fn! -- the
192            // only thing concurrent with us is ISRs, which can only wake tasks,
193            // not insert them.
194            //
195            // So the fact that the waiters list was found empty cannot change
196            // during this loop.
197            self.available
198                .fetch_update_polyfill(
199                    Ordering::Relaxed,
200                    Ordering::Relaxed,
201                    // Note that this has a potential overflow on addition. This is
202                    // deliberate, and is why we're not using fetch_add here!
203                    |a| Some(a + n),
204                )
205                .unwrap();
206        }
207    }
208
209    /// Returns an `Semaphore` initialized with `permits` permits.
210    ///
211    /// The result needs to be pinned to be useful, so you'll usually write:
212    ///
213    /// ```
214    /// let semaphore = pin!(Semaphore::new(permit_count));
215    /// let semaphore = semaphore.into_ref();
216    /// ```
217    ///
218    /// See also the convenience macro [`create_semaphore!`].
219    pub const fn new(permits: usize) -> Self {
220        Semaphore {
221            available: AtomicUsize::new(permits),
222            waiters: List::new(),
223        }
224    }
225}
226
/// Error produced by [`Semaphore::try_acquire`] and
/// [`ScopedSemaphore::try_acquire`] when no permits were available.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct NoPermits;

/// Human-readable rendering, so the error can be logged or reported without
/// falling back to its `Debug` form.
impl core::fmt::Display for NoPermits {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("no permits available")
    }
}
230
/// Convenience macro for creating a [`Semaphore`] on the stack.
///
/// `create_semaphore!(ident, num_permits)` creates a semaphore that initially
/// contains `num_permits` permits, and assigns it to a local variable called
/// `ident`. `ident` will have the type `Pin<&Semaphore>`.
///
/// Yes, it's weird that this macro creates a local variable, but there's no
/// good way around it in current Rust. You can, of course, write the
/// initialization code yourself if you'd prefer --- see the macro source code
/// for details.
#[macro_export]
macro_rules! create_semaphore {
    ($var:ident, $permits:expr) => {
        // `::core` (rather than a bare `core`) so the path names the `core`
        // crate even if the caller has a local item called `core` in scope.
        let $var = ::core::pin::pin!($crate::Semaphore::new($permits));
        // Shadow the `Pin<&mut _>` with a shared pinned reference, which is
        // what the semaphore operations take.
        let $var = $var.into_ref();
    };
}
248
249/// A [counting semaphore] that uses resource objects to manage permits,
250/// eliminating the need to explicitly call `release` in certain kinds of use
251/// cases.
252///
253/// A `ScopedSemaphore` is almost identical to a [`Semaphore`], but any time a
254/// permit is successfully acquired from a `ScopedSemaphore`, it produces a
255/// [`Permit`] object that represents the lifetime of that permit. When the
256/// `Permit` is dropped, it will be automatically returned to the
257/// `ScopedSemaphore`. This makes the API closer to a traditional Rust mutex
258/// API, but only works in cases where the permits are being acquired and
259/// released in the same context.
260///
261/// See [`Semaphore`] for background and information about fairness.
262///
263/// [counting semaphore]: https://en.wikipedia.org/wiki/Semaphore_(programming)
264#[derive(Debug)]
265#[pin_project]
266pub struct ScopedSemaphore {
267    #[pin]
268    inner: Semaphore,
269}
270
271impl ScopedSemaphore {
272    /// Creates a future that will resolve when it can take a single [`Permit`]
273    /// from the semaphore. Until then, the future will remain pending (i.e.
274    /// block).
275    ///
276    /// # Cancellation
277    ///
278    /// Cancel-safe but affects your position in line, to maintain fairness.
279    ///
280    /// If you drop the returned future before it resolves...
281    /// - If it had not successfully acquired a permit, nothing happens.
282    /// - If it had, the permit is released.
283    ///
284    /// Dropping the future and re-calling `acquire` bumps the caller to the
285    /// back of the priority list, to maintain fairness. Otherwise, the result
286    /// is indistinguishable.
287    pub async fn acquire(self: Pin<&Self>) -> Permit<'_> {
288        self.project_ref().inner.acquire().await;
289
290        Permit { semaphore: self }
291    }
292
293    /// Attempts to take a single [`Permit`] from the semaphore, returning
294    /// `Ok(permit)` on success, or `Err` if they are all taken.
295    pub fn try_acquire(self: Pin<&Self>) -> Result<Permit<'_>, NoPermits> {
296        self.inner.try_acquire()?;
297        Ok(Permit { semaphore: self })
298    }
299
300    /// Returns the number of permits available in the semaphore.
301    ///
302    /// Note that this is a _snapshot._ If this returns 4, for instance, it
303    /// doesn't mean you can successfully call `acquire` 4 times without
304    /// blocking, because another acquirer may be racing you.
305    pub fn permits_available(&self) -> usize {
306        self.inner.permits_available()
307    }
308
309    /// Stuffs `n` permits back into the semaphore.
310    ///
311    /// This operation is useful for either increasing the number of permits
312    /// available in an existing semaphore, or restoring permits that were
313    /// hidden from the compiler's view by calling [`core::mem::forget`] on a
314    /// [`Permit`].
315    ///
316    /// If you find yourself using this operation regularly, it may be a sign
317    /// that you want a plain old [`Semaphore`] instead of a `ScopedSemaphore`.
318    pub fn out_of_band_release(self: Pin<&Self>, n: usize) {
319        self.project_ref().inner.release_multiple(n);
320    }
321
322    /// Returns a `ScopedSemaphore` that initially contains `permits` permits.
323    ///
324    /// The result needs to be pinned to be useful, so you'll usually write:
325    ///
326    /// ```
327    /// let semaphore = pin!(ScopedSemaphore::new(permit_count));
328    /// let semaphore = semaphore.into_ref();
329    /// ```
330    ///
331    /// See also the convenience macro [`create_scoped_semaphore!`].
332    pub const fn new(permits: usize) -> Self {
333        Self {
334            inner: Semaphore::new(permits),
335        }
336    }
337}
338
339/// A resource object representing one permit acquired from a
340/// [`ScopedSemaphore`].
341///
342/// When dropped, this will return one permit to its originating semaphore.
343#[derive(Debug)]
344#[must_use = "dropping the permit will immediately release it"]
345pub struct Permit<'a> {
346    semaphore: Pin<&'a ScopedSemaphore>,
347}
348
349impl Drop for Permit<'_> {
350    fn drop(&mut self) {
351        self.semaphore.out_of_band_release(1)
352    }
353}
354
/// Convenience macro for creating a [`ScopedSemaphore`] on the stack.
///
/// `create_scoped_semaphore!(ident, num_permits)` creates a semaphore that
/// initially contains `num_permits` permits, and assigns it to a local variable
/// called `ident`. `ident` will have the type `Pin<&ScopedSemaphore>`.
///
/// Yes, it's weird that this macro creates a local variable, but there's no
/// good way around it in current Rust. You can, of course, write the
/// initialization code yourself if you'd prefer --- see the macro source code
/// for details.
#[macro_export]
macro_rules! create_scoped_semaphore {
    ($var:ident, $permits:expr) => {
        // `::core` (rather than a bare `core`) so the path names the `core`
        // crate even if the caller has a local item called `core` in scope.
        let $var = ::core::pin::pin!($crate::ScopedSemaphore::new($permits));
        // Shadow the `Pin<&mut _>` with a shared pinned reference, which is
        // what the semaphore operations take.
        let $var = $var.into_ref();
    };
}