Expand description
A crate for generic semaphore performing asynchronous permit acquisition.
A semaphore maintains a set of permits. Permits are used to synchronize access to a shared resource. A semaphore differs from a mutex in that it can allow more than one concurrent caller to access the shared resource at a time.
When acquire is called and the semaphore has remaining permits, the
function immediately returns a permit. However, if no remaining permits are
available, acquire (asynchronously) waits until an outstanding permit is
dropped. At this point, the freed permit is assigned to the caller.
This Semaphore is fair, and supports both FIFO and LIFO modes.
- In FIFO mode, this fairness means that permits are given out in the order they were requested.
- In LIFO mode, this fairness means that permits are given out in the reverse order they were requested.
This fairness is also applied when acquire with high ‘parameters’ gets
involved, so if a call to acquire at the end of the queue requests
more permits than currently available, this can prevent another call to acquire
from completing, even if the semaphore has enough permits to complete it.
This semaphore is generic, which means you can customise the state. Examples:
- Using two counters, you can immediately remove permits, while there are some still in flight. This might be useful if you want to remove concurrency if failures are detected.
- There might be multiple quantities you want to limit over. Stacking multiple semaphores can be awkward and risk deadlocks. Instead, making the state contain all those quantities combined can simplify the queueing.
§Performance
My performance testing has shown that flag_bearer’s Semaphore is competitive
with tokio::sync::Semaphore.
The only exception is in the case when try_acquire is called with no permits available,
tokio only needs a single atomic read for this case, whereas flag_bearer still needs to acquire a lock.
This is measurable, but mostly in the contended usecase.
§Examples
§A Semaphore like tokio::sync::Semaphore
#[derive(Debug)]
struct SemaphoreCounter(usize);
impl flag_bearer::SemaphoreState for SemaphoreCounter {
/// Number of permits to acquire
type Params = usize;
/// Number of permits that have been acquired
type Permit = usize;
fn acquire(&mut self, params: Self::Params) -> Result<Self::Permit, Self::Params> {
if let Some(available) = self.0.checked_sub(params) {
self.0 = available;
Ok(params)
} else {
Err(params)
}
}
fn release(&mut self, permit: Self::Permit) {
self.0 = self.0.checked_add(permit).unwrap()
}
}
// create a new FIFO semaphore with 20 permits
let semaphore = flag_bearer::new_fifo().closeable().with_state(SemaphoreCounter(20));
// acquire a token
let _permit = semaphore.acquire(1).await.expect("semaphore shouldn't be closed");
// add 20 more permits
semaphore.with_state(|s| s.0 += 20);
// release a token
drop(_permit);
// close a semaphore
semaphore.close();§A more complex usecase with the ability to update limits on demand
tokio’s Semaphore allows adding permits on demand, but it doesn’t support removing permits.
You have to forget the permits that are already acquired, and if you don’t have those on hand,
you will have to spawn a task to acquire them.
With the following construction, we can define a state that allows removing permits at will.
#[derive(Debug)]
struct Utilisation {
taken: usize,
limit: usize
}
impl flag_bearer::SemaphoreState for Utilisation {
type Params = ();
type Permit = ();
fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
if self.taken < self.limit {
self.taken += 1;
Ok(p)
} else {
Err(p)
}
}
fn release(&mut self, _: Self::Permit) {
self.taken -= 1;
}
}
impl Utilisation {
pub fn new(tokens: usize) -> Self {
Self { limit: tokens, taken: 0 }
}
pub fn add_tokens(&mut self, x: usize) {
self.limit += x;
}
pub fn remove_tokens(&mut self, x: usize) {
self.limit -= x;
}
}
// create a new FIFO semaphore with 20 tokens
let semaphore = flag_bearer::new_fifo().with_state(Utilisation::new(20));
// acquire a permit
let _permit = semaphore.must_acquire(()).await;
// remove 10 tokens
semaphore.with_state(|s| s.remove_tokens(10));
// release a permit
drop(_permit);§A LIFO semaphore which tracks multiple values at once
Last-in, first-out is an unfair strategy of queueing, where the latest queued task will be the first one to get the permit. This might seem like a bad strategy, but if high latency is considered a failure in your applications, this can reduce the failure rate. In a FIFO setting, you might have a P50 = 50ms, P99 = 100ms. The same in a LIFO setting could be P50 = 10ms, P99 = 500ms. If 50ms half the time is too slow for your application, then switching to LIFO could help. https://encore.dev/blog/queueing#lifo
Additionally to LIFO, Our SemaphoreState allows us to track multiple fields at once.
Let’s imagine we want to limit in-flight requests, as well as in-flight request body allocations.
We can put the two counters in a single state object.
#[derive(Debug)]
struct Utilisation {
requests: usize,
bytes: u64,
}
struct Request {
bytes: u64,
}
impl flag_bearer::SemaphoreState for Utilisation {
type Params = Request;
type Permit = Request;
fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
if self.requests >= 1 && self.bytes >= p.bytes {
self.requests -= 1;
self.bytes -= p.bytes;
Ok(p)
} else {
Err(p)
}
}
fn release(&mut self, p: Self::Permit) {
self.requests += 1;
self.bytes += p.bytes;
}
}
// create a new LIFO semaphore with support for 1 MB and 20 requests
let semaphore = flag_bearer::new_lifo().with_state(Utilisation {
requests: 20,
bytes: 1024 * 1024,
});
// acquire a permit for a request with 64KB
let _permit = semaphore.must_acquire(Request { bytes: 64 * 1024 }).await;§A connection pool
A connection pool can work quite like a semaphore sometimes. There’s a limited number of connections
and you don’t want too many at one time. Our SemaphoreState::Permits don’t need to be Plain-Old-Data,
so we can use them to hold connection objects too.
This example still allows creating new connections on demand, if they are needed in high load cases, as well as re-creating connections if they fail.
#[derive(Debug)]
struct Connection {
// ...
}
impl Connection {
/// creates a new conn
pub fn new() -> Self { Self {} }
/// checks the connection liveness
pub async fn check(&mut self) -> bool { true }
/// do something
pub async fn query(&mut self) {}
}
#[derive(Debug, Default)]
struct Pool {
conns: Vec<Connection>,
}
impl flag_bearer::SemaphoreState for Pool {
type Params = ();
type Permit = Connection;
fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
self.conns.pop().ok_or(p)
}
fn release(&mut self, p: Self::Permit) {
self.conns.push(p);
}
}
async fn acquire_conn(s: &flag_bearer::Semaphore<Pool>) -> flag_bearer::Permit<'_, Pool> {
let d = std::time::Duration::from_millis(200);
if let Ok(mut permit) = timeout(s.must_acquire(()), d).await {
if permit.check().await {
// We acquired a permit, and the liveness check succeeded.
// Return the permit.
return permit;
}
// do not return this connection to the semaphore, as it is broken.
flag_bearer::Permit::take(permit);
}
// There was a timeout, or the connection liveness check failed.
// Create a new connection and permit.
let c = Connection::new();
flag_bearer::Permit::out_of_thin_air(&s, c)
}
// Create a new LIFO connection pool.
// We choose LIFO here because we create new connections on timeout, and LIFO is
// more likely to have timeouts but on fewer tasks, which ends up improving our
// performance.
let semaphore = flag_bearer::new_lifo().with_state(Pool::default());
let mut conn = acquire_conn(&semaphore).await;
// access the inner conn
conn.query().await;Structs§
- Acquire
Error - The error returned by
Acquireif the semaphore queue was closed. - Builder
- A Builder for
Semaphores. - Owned
Permit - The drop-guard for owned semaphore permits. Will ensure the permit is released when dropped.
- Permit
- The drop-guard for semaphore permits. Will ensure the permit is released when dropped.
- Semaphore
- Generic semaphore performing asynchronous permit acquisition.
Enums§
- Closeable
- Controls whether the
SemaphoreQueueis closeable. - TryAcquire
Error - The error returned by
try_acquire - Uncloseable
- Controls whether the
SemaphoreQueueis not closeable.
Traits§
- IsCloseable
- Whether the semaphore is closeable
- Semaphore
State - The trait defining how semaphores behave.