flag_bearer/lib.rs
1//! A crate for generic semaphore performing asynchronous permit acquisition.
2//!
3//! A semaphore maintains a set of permits. Permits are used to synchronize
4//! access to a shared resource. A semaphore differs from a mutex in that it
5//! can allow more than one concurrent caller to access the shared resource at a
6//! time.
7//!
8//! When `acquire` is called and the semaphore has remaining permits, the
9//! function immediately returns a permit. However, if no remaining permits are
10//! available, `acquire` (asynchronously) waits until an outstanding permit is
11//! dropped. At this point, the freed permit is assigned to the caller.
12//!
13//! This `Semaphore` is fair, and supports both FIFO and LIFO modes.
14//! * In FIFO mode, this fairness means that permits are given out in the order
15//! they were requested.
16//! * In LIFO mode, this fairness means that permits are given out in the reverse
17//! order they were requested.
18//!
19//! This fairness is also applied when `acquire` with high 'parameters' gets
20//! involved, so if a call to `acquire` at the end of the queue requests
21//! more permits than currently available, this can prevent another call to `acquire`
22//! from completing, even if the semaphore has enough permits to complete it.
23//!
24//! This semaphore is generic, which means you can customise the state.
25//! Examples:
26//! * Using two counters, you can immediately remove permits,
27//! while there are some still in flight. This might be useful
28//! if you want to remove concurrency if failures are detected.
29//! * There might be multiple quantities you want to limit over.
30//! Stacking multiple semaphores can be awkward and risk deadlocks.
31//! Instead, making the state contain all those quantities combined
32//! can simplify the queueing.
33//!
34//! ## Performance
35//!
36//! My performance testing has shown that `flag_bearer`'s [`Semaphore`] is competitive
37//! with [`tokio::sync::Semaphore`](https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html).
38//!
39//! The only exception is in the case when `try_acquire` is called with no permits available,
40//! `tokio` only needs a single atomic read for this case, whereas `flag_bearer` still needs to acquire a lock.
41//! This is measurable, but mostly in the contended usecase.
42//!
43//! ## Examples
44//!
45//! ### A Semaphore like `tokio::sync::Semaphore`
46//!
47//! ```
48//! #[derive(Debug)]
49//! struct SemaphoreCounter(usize);
50//!
51//! impl flag_bearer::SemaphoreState for SemaphoreCounter {
52//! /// Number of permits to acquire
53//! type Params = usize;
54//!
55//! /// Number of permits that have been acquired
56//! type Permit = usize;
57//!
58//! fn acquire(&mut self, params: Self::Params) -> Result<Self::Permit, Self::Params> {
59//! if let Some(available) = self.0.checked_sub(params) {
60//! self.0 = available;
61//! Ok(params)
62//! } else {
63//! Err(params)
64//! }
65//! }
66//!
67//! fn release(&mut self, permit: Self::Permit) {
68//! self.0 = self.0.checked_add(permit).unwrap()
69//! }
70//! }
71//!
72//! # pollster::block_on(async {
73//! // create a new FIFO semaphore with 20 permits
74//! let semaphore = flag_bearer::new_fifo().closeable().with_state(SemaphoreCounter(20));
75//!
76//! // acquire a token
77//! let _permit = semaphore.acquire(1).await.expect("semaphore shouldn't be closed");
78//!
79//! // add 20 more permits
80//! semaphore.with_state(|s| s.0 += 20);
81//!
82//! // release a token
83//! drop(_permit);
84//!
85//! // close a semaphore
86//! semaphore.close();
87//! # })
88//! ```
89//!
90//! ### A more complex usecase with the ability to update limits on demand
91//!
92//! `tokio`'s Semaphore allows adding permits on demand, but it doesn't support removing permits.
93//! You have to `forget` the permits that are already acquired, and if you don't have those on hand,
94//! you will have to spawn a task to acquire them.
95//!
96//! With the following construction, we can define a state that allows removing permits at will.
97//!
98//! ```
99//! #[derive(Debug)]
100//! struct Utilisation {
101//! taken: usize,
102//! limit: usize
103//! }
104//!
105//! impl flag_bearer::SemaphoreState for Utilisation {
106//! type Params = ();
107//! type Permit = ();
108//!
109//! fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
110//! if self.taken < self.limit {
111//! self.taken += 1;
112//! Ok(p)
113//! } else {
114//! Err(p)
115//! }
116//! }
117//!
118//! fn release(&mut self, _: Self::Permit) {
119//! self.taken -= 1;
120//! }
121//! }
122//!
123//! impl Utilisation {
124//! pub fn new(tokens: usize) -> Self {
125//! Self { limit: tokens, taken: 0 }
126//! }
127//! pub fn add_tokens(&mut self, x: usize) {
128//! self.limit += x;
129//! }
130//! pub fn remove_tokens(&mut self, x: usize) {
131//! self.limit -= x;
132//! }
133//! }
134//!
135//! # pollster::block_on(async {
136//! // create a new FIFO semaphore with 20 tokens
137//! let semaphore = flag_bearer::new_fifo().with_state(Utilisation::new(20));
138//!
139//! // acquire a permit
140//! let _permit = semaphore.must_acquire(()).await;
141//!
142//! // remove 10 tokens
143//! semaphore.with_state(|s| s.remove_tokens(10));
144//!
145//! // release a permit
146//! drop(_permit);
147//! # })
148//! ```
149//!
150//! ### A LIFO semaphore which tracks multiple values at once
151//!
152//! Last-in, first-out is an unfair strategy of queueing, where the latest queued task
153//! will be the first one to get the permit. This might seem like a bad strategy, but
154//! if high latency is considered a failure in your applications, this can reduce
155//! the failure rate. In a FIFO setting, you might have a P50 = 50ms, P99 = 100ms.
156//! The same in a LIFO setting could be P50 = 10ms, P99 = 500ms. If 50ms half the time
157//! is too slow for your application, then switching to LIFO could help.
158//! <https://encore.dev/blog/queueing#lifo>
159//!
160//! Additionally to LIFO, Our [`SemaphoreState`] allows us to track multiple fields at once.
161//! Let's imagine we want to limit in-flight requests, as well as in-flight request body allocations.
162//! We can put the two counters in a single state object.
163//!
164//! ```
165//! #[derive(Debug)]
166//! struct Utilisation {
167//! requests: usize,
168//! bytes: u64,
169//! }
170//!
171//! struct Request {
172//! bytes: u64,
173//! }
174//!
175//! impl flag_bearer::SemaphoreState for Utilisation {
176//! type Params = Request;
177//! type Permit = Request;
178//!
179//! fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
180//! if self.requests >= 1 && self.bytes >= p.bytes {
181//! self.requests -= 1;
182//! self.bytes -= p.bytes;
183//! Ok(p)
184//! } else {
185//! Err(p)
186//! }
187//! }
188//!
189//! fn release(&mut self, p: Self::Permit) {
190//! self.requests += 1;
191//! self.bytes += p.bytes;
192//! }
193//! }
194//!
195//! # pollster::block_on(async {
196//! // create a new LIFO semaphore with support for 1 MB and 20 requests
197//! let semaphore = flag_bearer::new_lifo().with_state(Utilisation {
198//! requests: 20,
199//! bytes: 1024 * 1024,
200//! });
201//!
202//! // acquire a permit for a request with 64KB
203//! let _permit = semaphore.must_acquire(Request { bytes: 64 * 1024 }).await;
204//! # })
205//! ```
206//!
207//! ### A connection pool
208//!
209//! A connection pool can work quite like a semaphore sometimes. There's a limited number of connections
210//! and you don't want too many at one time. Our [`SemaphoreState::Permit`]s don't need to be Plain-Old-Data,
211//! so we can use them to hold connection objects too.
212//!
213//! This example still allows creating new connections on demand, if they are needed in high load cases, as well as re-creating
214//! connections if they fail.
215//!
216//! ```
217//! #[derive(Debug)]
218//! struct Connection {
219//! // ...
220//! }
221//!
222//! impl Connection {
223//! /// creates a new conn
224//! pub fn new() -> Self { Self {} }
225//! /// checks the connection liveness
226//! pub async fn check(&mut self) -> bool { true }
227//! /// do something
228//! pub async fn query(&mut self) {}
229//! }
230//!
231//! #[derive(Debug, Default)]
232//! struct Pool {
233//! conns: Vec<Connection>,
234//! }
235//!
236//! impl flag_bearer::SemaphoreState for Pool {
237//! type Params = ();
238//! type Permit = Connection;
239//!
240//! fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
241//! self.conns.pop().ok_or(p)
242//! }
243//!
244//! fn release(&mut self, p: Self::Permit) {
245//! self.conns.push(p);
246//! }
247//! }
248//!
249//! # async fn timeout<F: std::future::Future>(_f: F, _duration: std::time::Duration) -> Result<F::Output, ()> { Err(()) }
250//! async fn acquire_conn(s: &flag_bearer::Semaphore<Pool>) -> flag_bearer::Permit<'_, Pool> {
251//! let d = std::time::Duration::from_millis(200);
252//! if let Ok(mut permit) = timeout(s.must_acquire(()), d).await {
253//! if permit.check().await {
254//! // We acquired a permit, and the liveness check succeeded.
255//! // Return the permit.
256//! return permit;
257//! }
258//!
259//! // do not return this connection to the semaphore, as it is broken.
260//! flag_bearer::Permit::take(permit);
261//! }
262//!
263//! // There was a timeout, or the connection liveness check failed.
264//! // Create a new connection and permit.
265//! let c = Connection::new();
266//! flag_bearer::Permit::out_of_thin_air(&s, c)
267//! }
268//!
269//! # pollster::block_on(async {
270//! // Create a new LIFO connection pool.
271//! // We choose LIFO here because we create new connections on timeout, and LIFO is
272//! // more likely to have timeouts but on fewer tasks, which ends up improving our
273//! // performance.
274//! let semaphore = flag_bearer::new_lifo().with_state(Pool::default());
275//!
276//! let mut conn = acquire_conn(&semaphore).await;
277//!
278//! // access the inner conn
279//! conn.query().await;
280//! # })
281//! ```
282#![warn(
283 unsafe_op_in_unsafe_fn,
284 clippy::missing_safety_doc,
285 clippy::multiple_unsafe_ops_per_block,
286 clippy::undocumented_unsafe_blocks,
287 clippy::doc_markdown,
288 missing_docs
289)]
290#![deny(unsafe_code)]
291
292use core::marker::PhantomData;
293
294mod drop_wrapper;
295mod permit;
296
297pub use flag_bearer_core::SemaphoreState;
298
299use flag_bearer_queue::{
300 SemaphoreQueue,
301 acquire::{FairOrder, Fairness},
302};
303
304pub use flag_bearer_queue::acquire::AcquireError;
305/// The error returned by [`try_acquire`](Semaphore::try_acquire)
306///
307/// ### `NoPermits`
308///
309/// ```
310/// struct Counter(usize);
311///
312/// impl flag_bearer::SemaphoreState for Counter {
313/// type Params = ();
314/// type Permit = ();
315///
316/// fn acquire(&mut self, _: Self::Params) -> Result<Self::Permit, Self::Params> {
317/// if self.0 > 0 {
318/// self.0 -= 1;
319/// Ok(())
320/// } else {
321/// Err(())
322/// }
323/// }
324///
325/// fn release(&mut self, _: Self::Permit) {
326/// self.0 += 1;
327/// }
328/// }
329///
330/// let s = flag_bearer::new_fifo().with_state(Counter(0));
331///
332/// match s.try_acquire(()) {
333/// Err(flag_bearer::TryAcquireError::NoPermits(_)) => {},
334/// _ => unreachable!(),
335/// }
336/// ```
337///
338/// ### Closed
339///
340/// ```
341/// struct Counter(usize);
342///
343/// impl flag_bearer::SemaphoreState for Counter {
344/// type Params = ();
345/// type Permit = ();
346///
347/// fn acquire(&mut self, _: Self::Params) -> Result<Self::Permit, Self::Params> {
348/// if self.0 > 0 {
349/// self.0 -= 1;
350/// Ok(())
351/// } else {
352/// Err(())
353/// }
354/// }
355///
356/// fn release(&mut self, _: Self::Permit) {
357/// self.0 += 1;
358/// }
359/// }
360///
361/// let s = flag_bearer::new_fifo().closeable().with_state(Counter(1));
362///
363/// // closing the semaphore makes all current and new acquire() calls return an error.
364/// s.close();
365///
366/// match s.try_acquire(()) {
367/// Err(flag_bearer::TryAcquireError::Closed(_)) => {},
368/// _ => unreachable!(),
369/// }
370/// ```
371pub use flag_bearer_queue::acquire::TryAcquireError;
372pub use flag_bearer_queue::closeable::{Closeable, IsCloseable, Uncloseable};
373
374pub use permit::{OwnedPermit, Permit};
375
376use flag_bearer_mutex::RawMutex as DefaultRawMutex;
377use flag_bearer_mutex::lock_api::Mutex;
378
379/// Create a new first-in-first-out semaphore builder
380#[must_use]
381pub fn new_fifo() -> Builder {
382 Builder::fifo()
383}
384
385/// Create a new last-in-first-out semaphore builder
386#[must_use]
387pub fn new_lifo() -> Builder {
388 Builder::lifo()
389}
390
391/// A Builder for [`Semaphore`]s.
392pub struct Builder<C = Uncloseable>
393where
394 C: IsCloseable,
395{
396 order: FairOrder,
397 closeable: PhantomData<C>,
398}
399
400impl Builder {
401 /// Create a new first-in-first-out semaphore builder
402 #[must_use]
403 pub fn fifo() -> Self {
404 Self {
405 order: FairOrder::Fifo,
406 closeable: PhantomData,
407 }
408 }
409
410 /// Create a new last-in-first-out semaphore builder
411 #[must_use]
412 pub fn lifo() -> Self {
413 Self {
414 order: FairOrder::Lifo,
415 closeable: PhantomData,
416 }
417 }
418
419 /// The semaphore this builder constructs will be closeable.
420 #[must_use]
421 pub fn closeable(self) -> Builder<Closeable> {
422 Builder {
423 order: self.order,
424 closeable: PhantomData,
425 }
426 }
427}
428
429impl<C> Builder<C>
430where
431 C: IsCloseable,
432{
433 /// Build the [`Semaphore`] with the provided initial state.
434 #[must_use]
435 pub fn with_state<S: SemaphoreState>(self, state: S) -> Semaphore<S, C> {
436 Semaphore::new_inner(state, self.order)
437 }
438}
439
440/// Generic semaphore performing asynchronous permit acquisition.
441///
442/// See the [top level docs](crate) for information about semaphores.
443///
444/// See [`Builder`] for methods to construct a [`Semaphore`].
445///
446/// The generics on this semaphore are as follows:
447/// * `S`: The [`SemaphoreState`] value that this semaphore is backed by.
448/// * `C`: A type-safe configuration value for whether [`Semaphore::close`] can be called, and whether [`Semaphore::acquire`] can fail.
449pub struct Semaphore<S, C = Uncloseable>
450where
451 S: SemaphoreState + ?Sized,
452 C: IsCloseable,
453{
454 order: FairOrder,
455 state: Mutex<DefaultRawMutex, SemaphoreQueue<S, C>>,
456}
457
458impl<S: SemaphoreState, C: IsCloseable> Semaphore<S, C> {
459 fn new_inner(state: S, order: FairOrder) -> Self {
460 let state = SemaphoreQueue::new(state);
461 Self {
462 state: Mutex::new(state),
463 order,
464 }
465 }
466}
467
468impl<S: SemaphoreState + core::fmt::Debug> core::fmt::Debug for Semaphore<S> {
469 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
470 let mut d = f.debug_struct("Semaphore");
471 d.field("order", &self.order);
472 match self.state.try_lock() {
473 Some(guard) => {
474 d.field("queue", &*guard);
475 }
476 None => {
477 d.field("queue", &format_args!("<locked>"));
478 }
479 }
480 d.finish()
481 }
482}
483
484impl<S> Semaphore<S, Uncloseable>
485where
486 S: SemaphoreState + ?Sized,
487{
488 /// Acquire a new permit fairly with the given parameters.
489 ///
490 /// If a permit is not immediately available, this task will
491 /// join a queue.
492 pub async fn must_acquire(&self, params: S::Params) -> Permit<'_, S, Uncloseable> {
493 self.acquire(params).await.unwrap_or_else(|e| e.never())
494 }
495}
496
497impl<S, C> Semaphore<S, C>
498where
499 S: SemaphoreState + ?Sized,
500 C: IsCloseable,
501{
502 /// Acquire a new permit fairly with the given parameters.
503 ///
504 /// If a permit is not immediately available, this task will
505 /// join a queue.
506 ///
507 /// # Errors
508 ///
509 /// If this semaphore [`is_closed`](Semaphore::is_closed), then an [`AcquireError`] is returned.
510 pub async fn acquire(
511 &self,
512 params: S::Params,
513 ) -> Result<Permit<'_, S, C>, C::AcquireError<S::Params>> {
514 let acquire = flag_bearer_queue::SemaphoreQueue::acquire(&self.state, params, self.order);
515 Ok(Permit::out_of_thin_air(self, acquire.await?))
516 }
517
518 /// Acquire a new permit fairly with the given parameters.
519 ///
520 /// If this is a LIFO semaphore, and there are other tasks waiting for permits,
521 /// this will still try to acquire the permit - as this task would effectively
522 /// be the last in the queue.
523 ///
524 /// # Errors
525 ///
526 /// If there are currently not enough permits available for the given request,
527 /// then [`TryAcquireError::NoPermits`] is returned.
528 ///
529 /// If this is a FIFO semaphore, and there are other tasks waiting for permits,
530 /// then [`TryAcquireError::NoPermits`] is returned.
531 ///
532 /// If this semaphore [`is_closed`](Semaphore::is_closed), then [`TryAcquireError::Closed`] is returned.
533 pub fn try_acquire(
534 &self,
535 params: S::Params,
536 ) -> Result<Permit<'_, S, C>, TryAcquireError<S::Params, C>> {
537 let mut state = self.state.lock();
538 let permit = state.try_acquire(params, Fairness::Fair(self.order))?;
539 Ok(Permit::out_of_thin_air(self, permit))
540 }
541
542 /// Acquire a new permit, potentially unfairly, with the given parameters.
543 ///
544 /// If this is a FIFO semaphore, and there are other tasks waiting for permits,
545 /// this will still try to acquire the permit.
546 ///
547 /// # Errors
548 ///
549 /// If there are currently not enough permits available for the given request,
550 /// then [`TryAcquireError::NoPermits`] is returned.
551 ///
552 /// If this semaphore [`is_closed`](Semaphore::is_closed), then [`TryAcquireError::Closed`] is returned.
553 pub fn try_acquire_unfair(
554 &self,
555 params: S::Params,
556 ) -> Result<Permit<'_, S, C>, TryAcquireError<S::Params, C>> {
557 let mut state = self.state.lock();
558 let permit = state.try_acquire(params, Fairness::Unfair)?;
559 Ok(Permit::out_of_thin_air(self, permit))
560 }
561}
562
563impl<S, C> Semaphore<S, C>
564where
565 S: SemaphoreState + ?Sized,
566 C: IsCloseable,
567{
568 /// Access the state with mutable access.
569 ///
570 /// This gives direct access to the state, be careful not to
571 /// break any of your own state invariants. You can use this
572 /// to peek at the current state, or to modify it, eg to add or
573 /// remove permits from the semaphore.
574 pub fn with_state<T>(&self, f: impl FnOnce(&mut S) -> T) -> T {
575 self.state.lock().with_state(f)
576 }
577
578 /// Check if the semaphore is closed
579 pub fn is_closed(&self) -> bool {
580 C::CLOSE && self.state.lock().is_closed()
581 }
582}
583
584impl<S> Semaphore<S, Closeable>
585where
586 S: SemaphoreState + ?Sized,
587{
588 /// Close the semaphore.
589 ///
590 /// All tasks currently waiting to acquire a token will immediately stop.
591 /// No new acquire attempts will succeed.
592 pub fn close(&self) {
593 self.state.lock().close();
594 }
595}
596
597#[cfg(test)]
598mod test {
599 #[derive(Debug)]
600 struct Dummy;
601
602 impl crate::SemaphoreState for Dummy {
603 type Params = ();
604 type Permit = ();
605
606 fn acquire(&mut self, _params: Self::Params) -> Result<Self::Permit, Self::Params> {
607 Ok(())
608 }
609
610 fn release(&mut self, _permit: Self::Permit) {}
611 }
612
613 #[test]
614 fn debug() {
615 let s = crate::new_fifo().with_state(Dummy);
616 let s = std::format!("{s:?}");
617 assert_eq!(
618 s,
619 "Semaphore { order: Fifo, queue: SemaphoreQueue { state: Dummy, .. } }"
620 );
621 }
622
623 #[test]
624 fn is_closed() {
625 let s = crate::new_fifo().with_state(Dummy);
626 assert!(!s.is_closed());
627
628 let s = crate::new_fifo().closeable().with_state(Dummy);
629 assert!(!s.is_closed());
630 s.close();
631 assert!(s.is_closed());
632 }
633}