flag_bearer/
lib.rs

1//! A crate for generic semaphore performing asynchronous permit acquisition.
2//!
3//! A semaphore maintains a set of permits. Permits are used to synchronize
4//! access to a shared resource. A semaphore differs from a mutex in that it
5//! can allow more than one concurrent caller to access the shared resource at a
6//! time.
7//!
8//! When `acquire` is called and the semaphore has remaining permits, the
9//! function immediately returns a permit. However, if no remaining permits are
10//! available, `acquire` (asynchronously) waits until an outstanding permit is
11//! dropped. At this point, the freed permit is assigned to the caller.
12//!
13//! This `Semaphore` is fair, and supports both FIFO and LIFO modes.
14//! * In FIFO mode, this fairness means that permits are given out in the order
15//!   they were requested.
16//! * In LIFO mode, this fairness means that permits are given out in the reverse
17//!   order they were requested.
18//!
19//! This fairness is also applied when `acquire` with high 'parameters' gets
20//! involved, so if a call to `acquire` at the end of the queue requests
21//! more permits than currently available, this can prevent another call to `acquire`
22//! from completing, even if the semaphore has enough permits to complete it.
23//!
24//! This semaphore is generic, which means you can customise the state.
25//! Examples:
26//! * Using two counters, you can immediately remove permits,
27//!   while there are some still in flight. This might be useful
28//!   if you want to remove concurrency if failures are detected.
29//! * There might be multiple quantities you want to limit over.
30//!   Stacking multiple semaphores can be awkward and risk deadlocks.
31//!   Instead, making the state contain all those quantities combined
32//!   can simplify the queueing.
33//!
34//! ## Performance
35//!
36//! My performance testing has shown that `flag_bearer`'s [`Semaphore`] is competitive
37//! with [`tokio::sync::Semaphore`](https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html).
38//!
//! The only exception is when `try_acquire` is called with no permits available:
//! `tokio` only needs a single atomic read for this case, whereas `flag_bearer` still needs to acquire a lock.
//! This is measurable, but mostly in the contended use case.
42//!
43//! ## Examples
44//!
45//! ### A Semaphore like `tokio::sync::Semaphore`
46//!
47//! ```
48//! #[derive(Debug)]
49//! struct SemaphoreCounter(usize);
50//!
51//! impl flag_bearer::SemaphoreState for SemaphoreCounter {
52//!     /// Number of permits to acquire
53//!     type Params = usize;
54//!
55//!     /// Number of permits that have been acquired
56//!     type Permit = usize;
57//!
58//!     fn acquire(&mut self, params: Self::Params) -> Result<Self::Permit, Self::Params> {
59//!         if let Some(available) = self.0.checked_sub(params) {
60//!             self.0 = available;
61//!             Ok(params)
62//!         } else {
63//!             Err(params)
64//!         }
65//!     }
66//!
67//!     fn release(&mut self, permit: Self::Permit) {
68//!         self.0 = self.0.checked_add(permit).unwrap()
69//!     }
70//! }
71//!
72//! # pollster::block_on(async {
73//! // create a new FIFO semaphore with 20 permits
74//! let semaphore = flag_bearer::new_fifo().closeable().with_state(SemaphoreCounter(20));
75//!
76//! // acquire a token
77//! let _permit = semaphore.acquire(1).await.expect("semaphore shouldn't be closed");
78//!
79//! // add 20 more permits
80//! semaphore.with_state(|s| s.0 += 20);
81//!
82//! // release a token
83//! drop(_permit);
84//!
85//! // close a semaphore
86//! semaphore.close();
87//! # })
88//! ```
89//!
90//! ### A more complex usecase with the ability to update limits on demand
91//!
92//! `tokio`'s Semaphore allows adding permits on demand, but it doesn't support removing permits.
93//! You have to `forget` the permits that are already acquired, and if you don't have those on hand,
94//! you will have to spawn a task to acquire them.
95//!
96//! With the following construction, we can define a state that allows removing permits at will.
97//!
98//! ```
99//! #[derive(Debug)]
100//! struct Utilisation {
101//!     taken: usize,
102//!     limit: usize
103//! }
104//!
105//! impl flag_bearer::SemaphoreState for Utilisation {
106//!     type Params = ();
107//!     type Permit = ();
108//!
109//!     fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
110//!         if self.taken < self.limit {
111//!             self.taken += 1;
112//!             Ok(p)
113//!         } else {
114//!             Err(p)
115//!         }
116//!     }
117//!
118//!     fn release(&mut self, _: Self::Permit) {
119//!         self.taken -= 1;
120//!     }
121//! }
122//!
123//! impl Utilisation {
124//!     pub fn new(tokens: usize) -> Self {
125//!         Self { limit: tokens, taken: 0 }
126//!     }
127//!     pub fn add_tokens(&mut self, x: usize) {
128//!         self.limit += x;
129//!     }
130//!     pub fn remove_tokens(&mut self, x: usize) {
131//!         self.limit -= x;
132//!     }
133//! }
134//!
135//! # pollster::block_on(async {
136//! // create a new FIFO semaphore with 20 tokens
137//! let semaphore = flag_bearer::new_fifo().with_state(Utilisation::new(20));
138//!
139//! // acquire a permit
140//! let _permit = semaphore.must_acquire(()).await;
141//!
142//! // remove 10 tokens
143//! semaphore.with_state(|s| s.remove_tokens(10));
144//!
145//! // release a permit
146//! drop(_permit);
147//! # })
148//! ```
149//!
150//! ### A LIFO semaphore which tracks multiple values at once
151//!
152//! Last-in, first-out is an unfair strategy of queueing, where the latest queued task
153//! will be the first one to get the permit. This might seem like a bad strategy, but
154//! if high latency is considered a failure in your applications, this can reduce
155//! the failure rate. In a FIFO setting, you might have a P50 = 50ms, P99 = 100ms.
156//! The same in a LIFO setting could be P50 = 10ms, P99 = 500ms. If 50ms half the time
157//! is too slow for your application, then switching to LIFO could help.
158//! <https://encore.dev/blog/queueing#lifo>
159//!
//! In addition to LIFO, our [`SemaphoreState`] allows us to track multiple fields at once.
161//! Let's imagine we want to limit in-flight requests, as well as in-flight request body allocations.
162//! We can put the two counters in a single state object.
163//!
164//! ```
165//! #[derive(Debug)]
166//! struct Utilisation {
167//!     requests: usize,
168//!     bytes: u64,
169//! }
170//!
171//! struct Request {
172//!     bytes: u64,
173//! }
174//!
175//! impl flag_bearer::SemaphoreState for Utilisation {
176//!     type Params = Request;
177//!     type Permit = Request;
178//!
179//!     fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
180//!         if self.requests >= 1 && self.bytes >= p.bytes {
181//!             self.requests -= 1;
182//!             self.bytes -= p.bytes;
183//!             Ok(p)
184//!         } else {
185//!             Err(p)
186//!         }
187//!     }
188//!
189//!     fn release(&mut self, p: Self::Permit) {
190//!         self.requests += 1;
191//!         self.bytes += p.bytes;
192//!     }
193//! }
194//!
195//! # pollster::block_on(async {
196//! // create a new LIFO semaphore with support for 1 MB and 20 requests
197//! let semaphore = flag_bearer::new_lifo().with_state(Utilisation {
198//!     requests: 20,
199//!     bytes: 1024 * 1024,
200//! });
201//!
202//! // acquire a permit for a request with 64KB
203//! let _permit = semaphore.must_acquire(Request { bytes: 64 * 1024 }).await;
204//! # })
205//! ```
206//!
207//! ### A connection pool
208//!
209//! A connection pool can work quite like a semaphore sometimes. There's a limited number of connections
210//! and you don't want too many at one time. Our [`SemaphoreState::Permit`]s don't need to be Plain-Old-Data,
211//! so we can use them to hold connection objects too.
212//!
213//! This example still allows creating new connections on demand, if they are needed in high load cases, as well as re-creating
214//! connections if they fail.
215//!
216//! ```
217//! #[derive(Debug)]
218//! struct Connection {
219//!     // ...
220//! }
221//!
222//! impl Connection {
223//!     /// creates a new conn
224//!     pub fn new() -> Self { Self {} }
225//!     /// checks the connection liveness
226//!     pub async fn check(&mut self) -> bool { true }
227//!     /// do something
228//!     pub async fn query(&mut self) {}
229//! }
230//!
231//! #[derive(Debug, Default)]
232//! struct Pool {
233//!     conns: Vec<Connection>,
234//! }
235//!
236//! impl flag_bearer::SemaphoreState for Pool {
237//!     type Params = ();
238//!     type Permit = Connection;
239//!
240//!     fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
241//!         self.conns.pop().ok_or(p)
242//!     }
243//!
244//!     fn release(&mut self, p: Self::Permit) {
245//!         self.conns.push(p);
246//!     }
247//! }
248//!
249//! # async fn timeout<F: std::future::Future>(_f: F, _duration: std::time::Duration) -> Result<F::Output, ()> { Err(()) }
250//! async fn acquire_conn(s: &flag_bearer::Semaphore<Pool>) -> flag_bearer::Permit<'_, Pool> {
251//!     let d = std::time::Duration::from_millis(200);
252//!     if let Ok(mut permit) = timeout(s.must_acquire(()), d).await {
253//!         if permit.check().await {
254//!             // We acquired a permit, and the liveness check succeeded.
255//!             // Return the permit.
256//!             return permit;
257//!         }
258//!
259//!         // do not return this connection to the semaphore, as it is broken.
260//!         flag_bearer::Permit::take(permit);
261//!     }
262//!
263//!     // There was a timeout, or the connection liveness check failed.
264//!     // Create a new connection and permit.
265//!     let c = Connection::new();
266//!     flag_bearer::Permit::out_of_thin_air(&s, c)
267//! }
268//!
269//! # pollster::block_on(async {
270//! // Create a new LIFO connection pool.
271//! // We choose LIFO here because we create new connections on timeout, and LIFO is
272//! // more likely to have timeouts but on fewer tasks, which ends up improving our
273//! // performance.
274//! let semaphore = flag_bearer::new_lifo().with_state(Pool::default());
275//!
276//! let mut conn = acquire_conn(&semaphore).await;
277//!
278//! // access the inner conn
279//! conn.query().await;
280//! # })
281//! ```
282#![warn(
283    unsafe_op_in_unsafe_fn,
284    clippy::missing_safety_doc,
285    clippy::multiple_unsafe_ops_per_block,
286    clippy::undocumented_unsafe_blocks,
287    clippy::doc_markdown,
288    missing_docs
289)]
290#![deny(unsafe_code)]
291
292use core::marker::PhantomData;
293
294mod drop_wrapper;
295mod permit;
296
297pub use flag_bearer_core::SemaphoreState;
298
299use flag_bearer_queue::{
300    SemaphoreQueue,
301    acquire::{FairOrder, Fairness},
302};
303
304pub use flag_bearer_queue::acquire::AcquireError;
305/// The error returned by [`try_acquire`](Semaphore::try_acquire)
306///
307/// ### `NoPermits`
308///
309/// ```
310/// struct Counter(usize);
311///
312/// impl flag_bearer::SemaphoreState for Counter {
313///     type Params = ();
314///     type Permit = ();
315///
316///     fn acquire(&mut self, _: Self::Params) -> Result<Self::Permit, Self::Params> {
317///         if self.0 > 0 {
318///             self.0 -= 1;
319///             Ok(())
320///         } else {
321///             Err(())
322///         }
323///     }
324///
325///     fn release(&mut self, _: Self::Permit) {
326///         self.0 += 1;
327///     }
328/// }
329///
330/// let s = flag_bearer::new_fifo().with_state(Counter(0));
331///
332/// match s.try_acquire(()) {
333///     Err(flag_bearer::TryAcquireError::NoPermits(_)) => {},
334///     _ => unreachable!(),
335/// }
336/// ```
337///
338/// ### Closed
339///
340/// ```
341/// struct Counter(usize);
342///
343/// impl flag_bearer::SemaphoreState for Counter {
344///     type Params = ();
345///     type Permit = ();
346///
347///     fn acquire(&mut self, _: Self::Params) -> Result<Self::Permit, Self::Params> {
348///         if self.0 > 0 {
349///             self.0 -= 1;
350///             Ok(())
351///         } else {
352///             Err(())
353///         }
354///     }
355///
356///     fn release(&mut self, _: Self::Permit) {
357///         self.0 += 1;
358///     }
359/// }
360///
361/// let s = flag_bearer::new_fifo().closeable().with_state(Counter(1));
362///
363/// // closing the semaphore makes all current and new acquire() calls return an error.
364/// s.close();
365///
366/// match s.try_acquire(()) {
367///     Err(flag_bearer::TryAcquireError::Closed(_)) => {},
368///     _ => unreachable!(),
369/// }
370/// ```
371pub use flag_bearer_queue::acquire::TryAcquireError;
372pub use flag_bearer_queue::closeable::{Closeable, IsCloseable, Uncloseable};
373
374pub use permit::{OwnedPermit, Permit};
375
376use flag_bearer_mutex::RawMutex as DefaultRawMutex;
377use flag_bearer_mutex::lock_api::Mutex;
378
379/// Create a new first-in-first-out semaphore builder
380#[must_use]
381pub fn new_fifo() -> Builder {
382    Builder::fifo()
383}
384
385/// Create a new last-in-first-out semaphore builder
386#[must_use]
387pub fn new_lifo() -> Builder {
388    Builder::lifo()
389}
390
391/// A Builder for [`Semaphore`]s.
392pub struct Builder<C = Uncloseable>
393where
394    C: IsCloseable,
395{
396    order: FairOrder,
397    closeable: PhantomData<C>,
398}
399
400impl Builder {
401    /// Create a new first-in-first-out semaphore builder
402    #[must_use]
403    pub fn fifo() -> Self {
404        Self {
405            order: FairOrder::Fifo,
406            closeable: PhantomData,
407        }
408    }
409
410    /// Create a new last-in-first-out semaphore builder
411    #[must_use]
412    pub fn lifo() -> Self {
413        Self {
414            order: FairOrder::Lifo,
415            closeable: PhantomData,
416        }
417    }
418
419    /// The semaphore this builder constructs will be closeable.
420    #[must_use]
421    pub fn closeable(self) -> Builder<Closeable> {
422        Builder {
423            order: self.order,
424            closeable: PhantomData,
425        }
426    }
427}
428
429impl<C> Builder<C>
430where
431    C: IsCloseable,
432{
433    /// Build the [`Semaphore`] with the provided initial state.
434    #[must_use]
435    pub fn with_state<S: SemaphoreState>(self, state: S) -> Semaphore<S, C> {
436        Semaphore::new_inner(state, self.order)
437    }
438}
439
440/// Generic semaphore performing asynchronous permit acquisition.
441///
442/// See the [top level docs](crate) for information about semaphores.
443///
444/// See [`Builder`] for methods to construct a [`Semaphore`].
445///
446/// The generics on this semaphore are as follows:
447/// * `S`: The [`SemaphoreState`] value that this semaphore is backed by.
448/// * `C`: A type-safe configuration value for whether [`Semaphore::close`] can be called, and whether [`Semaphore::acquire`] can fail.
449pub struct Semaphore<S, C = Uncloseable>
450where
451    S: SemaphoreState + ?Sized,
452    C: IsCloseable,
453{
454    order: FairOrder,
455    state: Mutex<DefaultRawMutex, SemaphoreQueue<S, C>>,
456}
457
458impl<S: SemaphoreState, C: IsCloseable> Semaphore<S, C> {
459    fn new_inner(state: S, order: FairOrder) -> Self {
460        let state = SemaphoreQueue::new(state);
461        Self {
462            state: Mutex::new(state),
463            order,
464        }
465    }
466}
467
468impl<S: SemaphoreState + core::fmt::Debug> core::fmt::Debug for Semaphore<S> {
469    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
470        let mut d = f.debug_struct("Semaphore");
471        d.field("order", &self.order);
472        match self.state.try_lock() {
473            Some(guard) => {
474                d.field("queue", &*guard);
475            }
476            None => {
477                d.field("queue", &format_args!("<locked>"));
478            }
479        }
480        d.finish()
481    }
482}
483
484impl<S> Semaphore<S, Uncloseable>
485where
486    S: SemaphoreState + ?Sized,
487{
488    /// Acquire a new permit fairly with the given parameters.
489    ///
490    /// If a permit is not immediately available, this task will
491    /// join a queue.
492    pub async fn must_acquire(&self, params: S::Params) -> Permit<'_, S, Uncloseable> {
493        self.acquire(params).await.unwrap_or_else(|e| e.never())
494    }
495}
496
497impl<S, C> Semaphore<S, C>
498where
499    S: SemaphoreState + ?Sized,
500    C: IsCloseable,
501{
502    /// Acquire a new permit fairly with the given parameters.
503    ///
504    /// If a permit is not immediately available, this task will
505    /// join a queue.
506    ///
507    /// # Errors
508    ///
509    /// If this semaphore [`is_closed`](Semaphore::is_closed), then an [`AcquireError`] is returned.
510    pub async fn acquire(
511        &self,
512        params: S::Params,
513    ) -> Result<Permit<'_, S, C>, C::AcquireError<S::Params>> {
514        let acquire = flag_bearer_queue::SemaphoreQueue::acquire(&self.state, params, self.order);
515        Ok(Permit::out_of_thin_air(self, acquire.await?))
516    }
517
518    /// Acquire a new permit fairly with the given parameters.
519    ///
520    /// If this is a LIFO semaphore, and there are other tasks waiting for permits,
521    /// this will still try to acquire the permit - as this task would effectively
522    /// be the last in the queue.
523    ///
524    /// # Errors
525    ///
526    /// If there are currently not enough permits available for the given request,
527    /// then [`TryAcquireError::NoPermits`] is returned.
528    ///
529    /// If this is a FIFO semaphore, and there are other tasks waiting for permits,
530    /// then [`TryAcquireError::NoPermits`] is returned.
531    ///
532    /// If this semaphore [`is_closed`](Semaphore::is_closed), then [`TryAcquireError::Closed`] is returned.
533    pub fn try_acquire(
534        &self,
535        params: S::Params,
536    ) -> Result<Permit<'_, S, C>, TryAcquireError<S::Params, C>> {
537        let mut state = self.state.lock();
538        let permit = state.try_acquire(params, Fairness::Fair(self.order))?;
539        Ok(Permit::out_of_thin_air(self, permit))
540    }
541
542    /// Acquire a new permit, potentially unfairly, with the given parameters.
543    ///
544    /// If this is a FIFO semaphore, and there are other tasks waiting for permits,
545    /// this will still try to acquire the permit.
546    ///
547    /// # Errors
548    ///
549    /// If there are currently not enough permits available for the given request,
550    /// then [`TryAcquireError::NoPermits`] is returned.
551    ///
552    /// If this semaphore [`is_closed`](Semaphore::is_closed), then [`TryAcquireError::Closed`] is returned.
553    pub fn try_acquire_unfair(
554        &self,
555        params: S::Params,
556    ) -> Result<Permit<'_, S, C>, TryAcquireError<S::Params, C>> {
557        let mut state = self.state.lock();
558        let permit = state.try_acquire(params, Fairness::Unfair)?;
559        Ok(Permit::out_of_thin_air(self, permit))
560    }
561}
562
563impl<S, C> Semaphore<S, C>
564where
565    S: SemaphoreState + ?Sized,
566    C: IsCloseable,
567{
568    /// Access the state with mutable access.
569    ///
570    /// This gives direct access to the state, be careful not to
571    /// break any of your own state invariants. You can use this
572    /// to peek at the current state, or to modify it, eg to add or
573    /// remove permits from the semaphore.
574    pub fn with_state<T>(&self, f: impl FnOnce(&mut S) -> T) -> T {
575        self.state.lock().with_state(f)
576    }
577
578    /// Check if the semaphore is closed
579    pub fn is_closed(&self) -> bool {
580        C::CLOSE && self.state.lock().is_closed()
581    }
582}
583
584impl<S> Semaphore<S, Closeable>
585where
586    S: SemaphoreState + ?Sized,
587{
588    /// Close the semaphore.
589    ///
590    /// All tasks currently waiting to acquire a token will immediately stop.
591    /// No new acquire attempts will succeed.
592    pub fn close(&self) {
593        self.state.lock().close();
594    }
595}
596
#[cfg(test)]
mod test {
    use crate::SemaphoreState;

    /// Minimal state for tests: every acquire succeeds and release is a no-op.
    #[derive(Debug)]
    struct Dummy;

    impl SemaphoreState for Dummy {
        type Params = ();
        type Permit = ();

        fn acquire(&mut self, (): Self::Params) -> Result<Self::Permit, Self::Params> {
            Ok(())
        }

        fn release(&mut self, (): Self::Permit) {}
    }

    /// The `Debug` output shows the queue order and the wrapped state.
    #[test]
    fn debug() {
        let semaphore = crate::new_fifo().with_state(Dummy);
        let rendered = std::format!("{semaphore:?}");
        assert_eq!(
            rendered,
            "Semaphore { order: Fifo, queue: SemaphoreQueue { state: Dummy, .. } }"
        );
    }

    /// `is_closed` is false until `close` is called on a closeable semaphore.
    #[test]
    fn is_closed() {
        {
            let uncloseable = crate::new_fifo().with_state(Dummy);
            assert!(!uncloseable.is_closed());
        }

        {
            let closeable = crate::new_fifo().closeable().with_state(Dummy);
            assert!(!closeable.is_closed());
            closeable.close();
            assert!(closeable.is_closed());
        }
    }
}