lilos_semaphore/lib.rs
1//! A [counting semaphore] for use with [`lilos`].
2//!
3//! See the docs on [`Semaphore`] for more details.
4//!
5//! [`lilos`]: https://docs.rs/lilos/
6//! [counting semaphore]: https://en.wikipedia.org/wiki/Semaphore_(programming)
7
8#![no_std]
9#![warn(
10 elided_lifetimes_in_paths,
11 explicit_outlives_requirements,
12 missing_debug_implementations,
13 missing_docs,
14 semicolon_in_expressions_from_macros,
15 single_use_lifetimes,
16 trivial_casts,
17 trivial_numeric_casts,
18 unreachable_pub,
19 unsafe_op_in_unsafe_fn,
20 unused_qualifications
21)]
22
23use core::pin::Pin;
24use core::sync::atomic::{AtomicUsize, Ordering};
25use lilos_list::List;
26use lilos::atomic::AtomicExt;
27use pin_project::pin_project;
28
/// A [counting semaphore].
///
/// A `Semaphore` gets initialized with a certain number of _permits._
/// Callers can take one permit from the semaphore using the
/// [`Semaphore::acquire`] operation, which will block if there are none
/// available, and wake when one becomes available. A non-blocking variant,
/// [`Semaphore::try_acquire`], fails immediately instead of waiting.
///
/// Permits can be added back to the semaphore one at a time using the
/// [`Semaphore::release`] operation, or in batches using
/// [`Semaphore::release_multiple`].
///
/// Semaphores are useful for restricting concurrent access to something,
/// particularly in cases where you don't want to restrict it to exactly one
/// observer (like with a `Mutex`). Two common use cases are:
///
/// - To ensure that no more than `N` tasks can make it into a critical section
///   simultaneously, you'd create a `Semaphore` with `N` permits. Each task
///   would `acquire` a permit to gain entry, and then `release` it at the end.
///   In this case, a "permit object" might be fine, because `acquire` and
///   `release` are both being called in the same context.
///
/// - To represent data being generated or received (perhaps over a network),
///   you'd create a `Semaphore` with 0 permits. Tasks interested in consuming
///   resources would attempt to `acquire` and block; as data becomes
///   available, the generating task would `release` permits, potentially in
///   batches, to unblock the consumers. In this case, `release` and `acquire`
///   are happening in totally different contexts.
///
/// To support _both_ these uses, the `Semaphore` API is different from a lot of
/// Rust concurrency APIs, and does not return a "permit object" that represents
/// a permit until dropped. If your use case is closer to the first example, and
/// you would like the convenience of a permit object managing calls to
/// `release` for you, have a look at [`ScopedSemaphore`], a thin wrapper that
/// provides a [`Permit`].
///
///
/// # Getting a semaphore
///
/// Like `lilos`'s `Mutex` type, `Semaphore` must be pinned to be useful. So
/// generally you'll wind up writing something like this:
///
/// ```ignore
/// let scooters = pin!(Semaphore::new(5));
/// // Drop &mut:
/// let scooters = scooters.into_ref();
///
/// // Check out one scooter from the pool.
/// scooters.acquire().await;
/// ```
///
/// This crate includes a convenience macro, [`create_semaphore!`], that
/// basically just wraps up the first two lines:
///
/// ```ignore
/// lilos_semaphore::create_semaphore!(scooters, 5);
///
/// // Check out one scooter from the pool.
/// scooters.acquire().await;
/// ```
///
/// (Both snippets are fragments meant to appear inside an `async` context, so
/// they are not compiled as doctests.)
///
///
/// # Fairness
///
/// This semaphore implementation is _fair,_ which in this context means
/// that permits are handed out in the order they're requested. If the
/// semaphore runs out of permits, tasks requesting permits are queued in
/// order and will be issued permits in order as they are returned to the
/// semaphore.
///
/// [counting semaphore]: https://en.wikipedia.org/wiki/Semaphore_(programming)
#[derive(Debug)]
#[pin_project]
pub struct Semaphore {
    // Number of permits that can be taken immediately. `try_acquire`
    // decrements this; `release_multiple` only adds to it the permits that
    // were left over after waking waiters.
    available: AtomicUsize,
    // Tasks currently blocked in `acquire`. Structurally pinned (`#[pin]`) so
    // it is reached through `project_ref()`.
    #[pin]
    waiters: List<()>,
}
106
107impl Semaphore {
108 /// Creates a future that will resolve when it can take a single permit from
109 /// the semaphore. Until then, the future will remain pending (i.e. block).
110 ///
111 /// # Cancellation
112 ///
113 /// Cancel-safe but affects your position in line, to maintain fairness.
114 ///
115 /// If you drop the returned future before it resolves...
116 /// - If it had not successfully acquired a permit, nothing happens.
117 /// - If it had, the permit is released.
118 ///
119 /// Dropping the future and re-calling `acquire` bumps the caller to the
120 /// back of the priority list, to maintain fairness. Otherwise, the result
121 /// is indistinguishable.
122 pub async fn acquire(self: Pin<&Self>) {
123 if self.try_acquire().is_ok() {
124 return;
125 }
126
127 // Add ourselves to the wait list...
128 self.project_ref()
129 .waiters
130 .join_with_cleanup((), || {
131 // This is called when we've been detached from the wait
132 // list, which means a permit was transferred to us, but
133 // we haven't been polled -- and won't ever be polled,
134 // for we are being dropped. This means we need to
135 // release our permit, which might wake another task.
136 self.release();
137 })
138 .await
139 }
140
141 /// Attempts to take a single permit from the semaphore, returning `Ok` if
142 /// one is available immediately, or `Err` if they are all taken.
143 pub fn try_acquire(&self) -> Result<(), NoPermits> {
144 self.available
145 .fetch_update_polyfill(Ordering::Relaxed, Ordering::Relaxed, |a| {
146 a.checked_sub(1)
147 })
148 .map_err(|_| NoPermits)?;
149 Ok(())
150 }
151
152 /// Returns the number of permits available in the semaphore.
153 ///
154 /// Note that this is a _snapshot._ If this returns 4, for instance, it
155 /// doesn't mean you can successfully call `acquire` 4 times without
156 /// blocking, because another acquirer may be racing you.
157 pub fn permits_available(&self) -> usize {
158 self.available.load(Ordering::Relaxed)
159 }
160
161 /// Stuffs one permit back into the semaphore.
162 #[inline(always)]
163 pub fn release(self: Pin<&Self>) {
164 self.release_multiple(1)
165 }
166
167 /// Stuffs one permit back into the semaphore.
168 ///
169 /// Use this if you have called [`core::mem::forget`] on a [`Permit`], when
170 /// you want to restore that permit to the semaphore. Note that this is an
171 /// unusual use case and should only be done with good reason.
172 ///
173 /// It is, however, safe, in the Rust sense.
174 ///
175 /// It's possible to use this operation to increase the total number of
176 /// permits available in the `Semaphore`. That's an even weirder use case,
177 /// so be careful.
178 pub fn release_multiple(self: Pin<&Self>, mut n: usize) {
179 debug_assert!(n > 0);
180
181 let p = self.project_ref();
182 while n > 0 {
183 if !p.waiters.wake_one() {
184 // We have exhausted the list, stop using this strategy.
185 break;
186 }
187 n -= 1;
188 }
189
190 if n > 0 {
191 // Since we're not yielding -- we're not even in an async fn! -- the
192 // only thing concurrent with us is ISRs, which can only wake tasks,
193 // not insert them.
194 //
195 // So the fact that the waiters list was found empty cannot change
196 // during this loop.
197 self.available
198 .fetch_update_polyfill(
199 Ordering::Relaxed,
200 Ordering::Relaxed,
201 // Note that this has a potential overflow on addition. This is
202 // deliberate, and is why we're not using fetch_add here!
203 |a| Some(a + n),
204 )
205 .unwrap();
206 }
207 }
208
209 /// Returns an `Semaphore` initialized with `permits` permits.
210 ///
211 /// The result needs to be pinned to be useful, so you'll usually write:
212 ///
213 /// ```
214 /// let semaphore = pin!(Semaphore::new(permit_count));
215 /// let semaphore = semaphore.into_ref();
216 /// ```
217 ///
218 /// See also the convenience macro [`create_semaphore!`].
219 pub const fn new(permits: usize) -> Self {
220 Semaphore {
221 available: AtomicUsize::new(permits),
222 waiters: List::new(),
223 }
224 }
225}
226
/// Error produced by [`Semaphore::try_acquire`] when no permits were available.
///
/// [`ScopedSemaphore::try_acquire`] reports the same condition with this type.
/// It carries no data: the only information is that the attempt failed.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct NoPermits;
230
/// Convenience macro for creating a [`Semaphore`] on the stack.
///
/// `create_semaphore!(ident, num_permits)` creates a semaphore that initially
/// contains `num_permits` permits, and assigns it to a local variable called
/// `ident`. `ident` will have the type `Pin<&Semaphore>`. A trailing comma
/// after `num_permits` is accepted.
///
/// Yes, it's weird that this macro creates a local variable, but there's no
/// good way around it in current Rust. You can, of course, write the
/// initialization code yourself if you'd prefer --- see the macro source code
/// for details.
#[macro_export]
macro_rules! create_semaphore {
    ($var:ident, $permits:expr $(,)?) => {
        // `::core` (rather than bare `core`) keeps the expansion working even
        // if the call site has its own item named `core` in scope.
        let $var = ::core::pin::pin!($crate::Semaphore::new($permits));
        // Shadow the `Pin<&mut _>` with a shared `Pin<&_>`, which is what the
        // semaphore API takes.
        let $var = $var.into_ref();
    };
}
248
/// A [counting semaphore] that uses resource objects to manage permits,
/// eliminating the need to explicitly call `release` in certain kinds of use
/// cases.
///
/// A `ScopedSemaphore` is almost identical to a [`Semaphore`], but any time a
/// permit is successfully acquired from a `ScopedSemaphore`, it produces a
/// [`Permit`] object that represents the lifetime of that permit. When the
/// `Permit` is dropped, it will be automatically returned to the
/// `ScopedSemaphore`. This makes the API closer to a traditional Rust mutex
/// API, but only works in cases where the permits are being acquired and
/// released in the same context.
///
/// Like [`Semaphore`], a `ScopedSemaphore` must be pinned before it can be
/// used; see [`create_scoped_semaphore!`] for a shorthand.
///
/// See [`Semaphore`] for background and information about fairness.
///
/// [counting semaphore]: https://en.wikipedia.org/wiki/Semaphore_(programming)
#[derive(Debug)]
#[pin_project]
pub struct ScopedSemaphore {
    // The plain semaphore that does all the real work; this type only adds
    // `Permit` bookkeeping on top. Structurally pinned so that its
    // `Pin<&Self>` methods can be reached through `project_ref()`.
    #[pin]
    inner: Semaphore,
}
270
271impl ScopedSemaphore {
272 /// Creates a future that will resolve when it can take a single [`Permit`]
273 /// from the semaphore. Until then, the future will remain pending (i.e.
274 /// block).
275 ///
276 /// # Cancellation
277 ///
278 /// Cancel-safe but affects your position in line, to maintain fairness.
279 ///
280 /// If you drop the returned future before it resolves...
281 /// - If it had not successfully acquired a permit, nothing happens.
282 /// - If it had, the permit is released.
283 ///
284 /// Dropping the future and re-calling `acquire` bumps the caller to the
285 /// back of the priority list, to maintain fairness. Otherwise, the result
286 /// is indistinguishable.
287 pub async fn acquire(self: Pin<&Self>) -> Permit<'_> {
288 self.project_ref().inner.acquire().await;
289
290 Permit { semaphore: self }
291 }
292
293 /// Attempts to take a single [`Permit`] from the semaphore, returning
294 /// `Ok(permit)` on success, or `Err` if they are all taken.
295 pub fn try_acquire(self: Pin<&Self>) -> Result<Permit<'_>, NoPermits> {
296 self.inner.try_acquire()?;
297 Ok(Permit { semaphore: self })
298 }
299
300 /// Returns the number of permits available in the semaphore.
301 ///
302 /// Note that this is a _snapshot._ If this returns 4, for instance, it
303 /// doesn't mean you can successfully call `acquire` 4 times without
304 /// blocking, because another acquirer may be racing you.
305 pub fn permits_available(&self) -> usize {
306 self.inner.permits_available()
307 }
308
309 /// Stuffs `n` permits back into the semaphore.
310 ///
311 /// This operation is useful for either increasing the number of permits
312 /// available in an existing semaphore, or restoring permits that were
313 /// hidden from the compiler's view by calling [`core::mem::forget`] on a
314 /// [`Permit`].
315 ///
316 /// If you find yourself using this operation regularly, it may be a sign
317 /// that you want a plain old [`Semaphore`] instead of a `ScopedSemaphore`.
318 pub fn out_of_band_release(self: Pin<&Self>, n: usize) {
319 self.project_ref().inner.release_multiple(n);
320 }
321
322 /// Returns a `ScopedSemaphore` that initially contains `permits` permits.
323 ///
324 /// The result needs to be pinned to be useful, so you'll usually write:
325 ///
326 /// ```
327 /// let semaphore = pin!(ScopedSemaphore::new(permit_count));
328 /// let semaphore = semaphore.into_ref();
329 /// ```
330 ///
331 /// See also the convenience macro [`create_scoped_semaphore!`].
332 pub const fn new(permits: usize) -> Self {
333 Self {
334 inner: Semaphore::new(permits),
335 }
336 }
337}
338
/// A resource object representing one permit acquired from a
/// [`ScopedSemaphore`].
///
/// Produced by [`ScopedSemaphore::acquire`] and
/// [`ScopedSemaphore::try_acquire`]. When dropped, this will return one permit
/// to its originating semaphore.
#[derive(Debug)]
#[must_use = "dropping the permit will immediately release it"]
pub struct Permit<'a> {
    // The semaphore this permit was taken from, and to which it is returned
    // on drop.
    semaphore: Pin<&'a ScopedSemaphore>,
}
348
349impl Drop for Permit<'_> {
350 fn drop(&mut self) {
351 self.semaphore.out_of_band_release(1)
352 }
353}
354
/// Convenience macro for creating a [`ScopedSemaphore`] on the stack.
///
/// `create_scoped_semaphore!(ident, num_permits)` creates a semaphore that
/// initially contains `num_permits` permits, and assigns it to a local variable
/// called `ident`. `ident` will have the type `Pin<&ScopedSemaphore>`. A
/// trailing comma after `num_permits` is accepted.
///
/// Yes, it's weird that this macro creates a local variable, but there's no
/// good way around it in current Rust. You can, of course, write the
/// initialization code yourself if you'd prefer --- see the macro source code
/// for details.
#[macro_export]
macro_rules! create_scoped_semaphore {
    ($var:ident, $permits:expr $(,)?) => {
        // `::core` (rather than bare `core`) keeps the expansion working even
        // if the call site has its own item named `core` in scope.
        let $var = ::core::pin::pin!($crate::ScopedSemaphore::new($permits));
        // Shadow the `Pin<&mut _>` with a shared `Pin<&_>`, which is what the
        // semaphore API takes.
        let $var = $var.into_ref();
    };
}