Struct rsevents_extra::Semaphore
pub struct Semaphore { /* private fields */ }
A concurrency-limiting synchronization primitive, used to limit the number of threads performing a certain operation or accessing a particular resource at the same time.
A Semaphore is created with a maximum concurrency count that can never be exceeded, and an initial concurrency count that determines the available concurrency at creation. Threads attempting to access a limited-concurrency resource or perform a concurrency-limited operation wait on the Semaphore, an operation which either immediately grants access to the calling thread if the available concurrency has not been saturated or blocks, sleeping the thread until another thread completes its concurrency-limited operation or the available concurrency limit is further increased.
While the available concurrency count may be modified (decremented to zero or incremented up to the maximum specified at the time of its instantiation), the maximum concurrency limit cannot be changed once the Semaphore has been created.
Example:
use rsevents_extra::{Semaphore};
use std::sync::atomic::{AtomicU32, Ordering};
// Limit maximum number of simultaneous network requests to 4, but start
// with only 1 simultaneous network request allowed.
const MAX_REQUESTS: u16 = 4;
const START_REQUESTS: u16 = 1;
static HTTP_SEM: Semaphore = Semaphore::new(START_REQUESTS, MAX_REQUESTS);
static TASKS_LEFT: AtomicU32 = AtomicU32::new(42);
fn download_file(url: &str) -> Result<Vec<u8>, std::io::Error> {
    // Make sure we never exceed the maximum number of simultaneous
    // network connections allowed.
    let sem_guard = HTTP_SEM.wait();
    // <download the file here>
    // When `sem_guard` is dropped at the end of the scope, we give up our
    // network access slot letting another thread through.
    return Ok(unimplemented!());
}
fn get_file_from_cache(url: &str) -> Result<Vec<u8>, ()> { todo!() }
fn do_work() -> Result<(), std::io::Error> {
    loop {
        let mut file_in_cache = false;
        // Do some stuff that takes time here...
        // ...
        let url = "https://some-url/some/path/";
        let file = get_file_from_cache(url).or_else(|_| download_file(url))?;
        // Do something with the file...
        // ...
        TASKS_LEFT.fetch_sub(1, Ordering::Relaxed);
    }
}
fn main() {
    // Start a thread to read control messages from the user
    std::thread::spawn(|| {
        let mut network_limit = START_REQUESTS;
        loop {
            println!("Press f to go faster or s to go slower");
            let mut input = String::new();
            std::io::stdin().read_line(&mut input).unwrap();
            match input.trim() {
                "f" if network_limit < MAX_REQUESTS => {
                    HTTP_SEM.release(1);
                    network_limit += 1;
                }
                "s" if network_limit > 0 => {
                    HTTP_SEM.wait().forget();
                    network_limit -= 1;
                }
                _ => eprintln!("Invalid request!"),
            }
        }
    });
    // Start 8 worker threads and wait for them to finish
    std::thread::scope(|scope| {
        for _ in 0..8 {
            scope.spawn(do_work);
        }
    });
}
Implementations
impl Semaphore
pub const fn new(initial_count: u16, max_count: u16) -> Self
Create a new Semaphore with a maximum available concurrency count of max_count and an initial available concurrency count of initial_count.
pub fn wait<'a>(&'a self) -> SemaphoreGuard<'a>
Attempts to obtain access to the resource or code protected by the Semaphore, subject to the available concurrency count. Returns immediately if the Semaphore’s internal concurrency count is non-zero or blocks sleeping until the Semaphore becomes available (via another thread completing its access to the controlled-concurrency region or if the semaphore’s concurrency limit is raised).
A successful wait against the semaphore decrements its internal available concurrency count (possibly preventing other threads from obtaining the semaphore) until Semaphore::release() is called (which happens automatically when the SemaphoreGuard concurrency token is dropped).
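The decrement-on-wait, increment-on-drop behavior described above can be modeled with the standard library alone. The following is a minimal sketch, not the crate's implementation: ToySemaphore, ToyGuard, and peak_concurrency are hypothetical names invented for illustration, and the Mutex plus Condvar design is an assumption chosen for readability.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Illustrative stand-in for a counting semaphore (hypothetical type,
/// NOT how rsevents_extra implements `Semaphore`).
pub struct ToySemaphore {
    available: Mutex<u16>,
    freed: Condvar,
}

/// Hypothetical guard: gives its slot back when dropped.
pub struct ToyGuard<'a> {
    sem: &'a ToySemaphore,
}

impl ToySemaphore {
    pub fn new(initial_count: u16) -> Self {
        ToySemaphore { available: Mutex::new(initial_count), freed: Condvar::new() }
    }

    /// Blocks until a slot is free, then takes it.
    pub fn wait(&self) -> ToyGuard<'_> {
        let mut available = self.available.lock().unwrap();
        while *available == 0 {
            available = self.freed.wait(available).unwrap();
        }
        *available -= 1;
        ToyGuard { sem: self }
    }
}

impl Drop for ToyGuard<'_> {
    fn drop(&mut self) {
        // Return the slot and wake one sleeping waiter, if any.
        *self.sem.available.lock().unwrap() += 1;
        self.sem.freed.notify_one();
    }
}

/// Runs `workers` threads through a semaphore with `limit` slots and returns
/// the highest number of threads observed inside the guarded region at once.
pub fn peak_concurrency(limit: u16, workers: usize) -> usize {
    let sem = ToySemaphore::new(limit);
    let inside = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);
    thread::scope(|scope| {
        for _ in 0..workers {
            scope.spawn(|| {
                let _guard = sem.wait();
                let now = inside.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5));
                inside.fetch_sub(1, Ordering::SeqCst);
                // `_guard` is dropped here, handing the slot to a waiter.
            });
        }
    });
    peak.load(Ordering::SeqCst)
}
```

Calling peak_concurrency(2, 6) from a main function should never report more than 2, because each thread holds its guard for the whole time it is counted as inside.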
pub fn wait_for<'a>(
&'a self,
limit: Duration
) -> Result<SemaphoreGuard<'a>, TimeoutError>
Attempts a time-bounded wait against the Semaphore, returning a SemaphoreGuard wrapped in Ok if and when the semaphore becomes available or a TimeoutError if the specified time limit elapses without the semaphore becoming available to the calling thread.
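A time-bounded wait of this shape can be sketched with Condvar::wait_timeout_while from the standard library. This is an illustrative model only: ToySemaphore, ToyGuard, and ToyTimeout are hypothetical names, and nothing here is taken from the crate's internals.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical error type standing in for a timeout error.
#[derive(Debug, PartialEq)]
pub struct ToyTimeout;

/// Illustrative stand-in for a counting semaphore (not the crate's type).
pub struct ToySemaphore {
    available: Mutex<u16>,
    freed: Condvar,
}

/// Hypothetical guard: gives its slot back when dropped.
pub struct ToyGuard<'a> {
    sem: &'a ToySemaphore,
}

impl ToySemaphore {
    pub fn new(initial_count: u16) -> Self {
        ToySemaphore { available: Mutex::new(initial_count), freed: Condvar::new() }
    }

    /// Takes a slot if one becomes free within `limit`; otherwise reports a
    /// timeout and leaves the count untouched.
    pub fn wait_for(&self, limit: Duration) -> Result<ToyGuard<'_>, ToyTimeout> {
        let lock = self.available.lock().unwrap();
        // Sleep while the count is zero, but never longer than `limit`.
        let (mut available, outcome) = self
            .freed
            .wait_timeout_while(lock, limit, |count| *count == 0)
            .unwrap();
        if outcome.timed_out() {
            return Err(ToyTimeout);
        }
        *available -= 1;
        Ok(ToyGuard { sem: self })
    }
}

impl Drop for ToyGuard<'_> {
    fn drop(&mut self) {
        *self.sem.available.lock().unwrap() += 1;
        self.sem.freed.notify_one();
    }
}
```

With a single slot, a first wait_for succeeds at once, a second one times out while the first guard is alive, and a third succeeds again after that guard is dropped.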
pub fn modify(&mut self, count: i16)
Directly increments or decrements the current availability limit for a Semaphore without blocking. This is only possible when the semaphore is not currently borrowed or being waited on. Panics if the change will result in an available concurrency limit of less than zero or greater than the semaphore’s maximum. See Semaphore::try_modify() for a non-panicking alternative.
To increment the semaphore’s concurrency limit without an &mut Semaphore reference, call Semaphore::release() instead. To decrement the concurrency limit, wait on the semaphore then call forget() on the returned SemaphoreGuard:
use rsevents_extra::Semaphore;

fn adjust_sem(sem: &Semaphore, count: i16) {
    if count >= 0 {
        sem.release(count as u16);
    } else {
        // Note: this will block if the semaphore isn't available!
        // `unsigned_abs()` avoids the overflow that negating `i16::MIN` would cause.
        for _ in 0..count.unsigned_abs() {
            let guard = sem.wait();
            guard.forget();
        }
    }
}
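Why forgetting a guard lowers the available count can be seen in a small model: if the guard's Drop is what returns the slot, then consuming the guard without running Drop leaves the slot permanently taken. The sketch below assumes that design; ToySemaphore, ToyGuard, try_take, and available are hypothetical names, and the crate's own forget() may be implemented differently.

```rust
use std::sync::atomic::{AtomicU16, Ordering};

/// Illustrative counter-only semaphore (hypothetical; it never blocks).
pub struct ToySemaphore {
    available: AtomicU16,
}

/// Hypothetical guard: returns its slot on drop unless it is forgotten.
pub struct ToyGuard<'a> {
    sem: &'a ToySemaphore,
}

impl ToySemaphore {
    pub const fn new(initial_count: u16) -> Self {
        ToySemaphore { available: AtomicU16::new(initial_count) }
    }

    /// Takes a slot if one is free, without blocking.
    pub fn try_take(&self) -> Option<ToyGuard<'_>> {
        self.available
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| n.checked_sub(1))
            .ok()
            .map(|_| ToyGuard { sem: self })
    }

    /// Current number of free slots.
    pub fn available(&self) -> u16 {
        self.available.load(Ordering::Acquire)
    }
}

impl ToyGuard<'_> {
    /// Consumes the guard without running `Drop`, so the slot is never returned.
    pub fn forget(self) {
        std::mem::forget(self);
    }
}

impl Drop for ToyGuard<'_> {
    fn drop(&mut self) {
        self.sem.available.fetch_add(1, Ordering::Release);
    }
}
```

Starting from two slots, dropping a guard restores the count to 2, while calling forget() on a guard leaves it at 1 until something releases a slot explicitly.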
pub fn try_modify(&mut self, count: i16) -> bool
Directly increments or decrements the current availability limit for a Semaphore without blocking. This is only possible when the semaphore is not currently borrowed or being waited on. Returns false if the change will result in an available concurrency limit of less than zero or greater than the semaphore’s maximum.
See Semaphore::modify() for more info.
pub fn release(&self, count: u16)
Increments the available concurrency by count, and panics if this results in a count that exceeds the max_count the Semaphore was created with (see Semaphore::new()). Unlike Semaphore::modify(), this can be called with a non-mutable reference to the semaphore, but can only increment the concurrency level.
See try_release for a non-panicking version of this function.
See the documentation for modify() for info on decrementing the available concurrency level.
pub fn try_release(&self, count: u16) -> bool
Attempts to increment the available concurrency counter by count, and returns false if this operation would result in a count that exceeds the max_count the Semaphore was created with (see Semaphore::new()).
If you can guarantee that the count cannot exceed the maximum allowed, you may want to use Semaphore::release() instead as it is both lock-free and wait-free, whereas try_release() is only lock-free and may spin internally in case of contention.
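One common way such a wait-free versus lock-free split arises is sketched below: an unconditional atomic add completes in a bounded number of steps, while a checked increment needs a compare-and-swap loop that may retry under contention. This is an assumption about the general technique, not a description of the crate's internals; ToyCounter and its methods are hypothetical.

```rust
use std::sync::atomic::{AtomicU16, Ordering};

/// Illustrative bounded counter (hypothetical; not the crate's `Semaphore`).
pub struct ToyCounter {
    available: AtomicU16,
    max: u16,
}

impl ToyCounter {
    pub const fn new(initial_count: u16, max_count: u16) -> Self {
        ToyCounter { available: AtomicU16::new(initial_count), max: max_count }
    }

    /// Wait-free style: a single unconditional atomic add. The bound can only
    /// be checked after the fact, hence the panic on overflow.
    pub fn release(&self, count: u16) {
        let previous = self.available.fetch_add(count, Ordering::Release);
        assert!(
            previous as u32 + count as u32 <= self.max as u32,
            "release would exceed the maximum count"
        );
    }

    /// Lock-free style: validate first, then commit with a compare-and-swap.
    /// Another thread changing the counter in between forces a retry.
    pub fn try_release(&self, count: u16) -> bool {
        let mut current = self.available.load(Ordering::Relaxed);
        loop {
            let next = match current.checked_add(count) {
                Some(n) if n <= self.max => n,
                _ => return false,
            };
            match self.available.compare_exchange_weak(
                current,
                next,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(observed) => current = observed,
            }
        }
    }

    /// Current value of the counter.
    pub fn available(&self) -> u16 {
        self.available.load(Ordering::Acquire)
    }
}
```

The trade-off in this model matches the advice above: the checked version never exceeds the bound but has no upper limit on retries, whereas the unchecked version always finishes in one step and relies on the caller to stay within the maximum.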
Trait Implementations
impl<'a> Awaitable<'a> for Semaphore
fn try_wait(&'a self) -> Result<SemaphoreGuard<'a>, Infallible>
Attempts to obtain access to the resource or code protected by the Semaphore, subject to the available concurrency count. Returns immediately if the Semaphore’s internal concurrency count is non-zero or blocks sleeping until the Semaphore becomes available (via another thread completing its access to the controlled-concurrency region or if the semaphore’s concurrency limit is raised).
A successful wait against the semaphore decrements its internal available concurrency count (possibly preventing other threads from obtaining the semaphore) until Semaphore::release() is called.
fn try_wait_for(
&'a self,
limit: Duration
) -> Result<SemaphoreGuard<'a>, TimeoutError>
Attempts a time-bounded wait against the Semaphore, returning a SemaphoreGuard wrapped in Ok if and when the semaphore becomes available or a TimeoutError if the specified time limit elapses without the semaphore becoming available to the calling thread.
fn try_wait0(&'a self) -> Result<SemaphoreGuard<'a>, TimeoutError>
Attempts to obtain the Semaphore without waiting, returning a SemaphoreGuard wrapped in Ok if the semaphore is immediately available or a TimeoutError otherwise.
type T = SemaphoreGuard<'a>
type Error = TimeoutError
Awaitable::wait() returns any error at all.