pub struct Semaphore { /* private fields */ }
Expand description

A concurrency-limiting synchronization primitive, used to limit the number of threads performing a certain operation or accessing a particular resource at the same time.

A Semaphore is created with a maximum concurrency count that can never be exceeded, and an initial concurrency count that determines the available concurrency at creation. Threads attempting to access a limited-concurrency resource or perform a concurrency-limited operation wait on the Semaphore, an operation which either immediately grants access to the calling thread if the available concurrency has not been saturated or blocks, sleeping the thread until another thread completes its concurrency-limited operation or the available concurrency limit is further increased.

While the available concurrency count may be modified (decremented to zero or incremented up to the maximum specified at the time of its instantiation), the maximum concurrency limit cannot be changed once the Semaphore has been created.

Example:

use rsevents_extra::{Semaphore};
use std::sync::atomic::{AtomicU32, Ordering};

// Limit maximum number of simultaneous network requests to 4, but start
// with only 1 simultaneous network request allowed.
const MAX_REQUESTS: u16 = 4;
const START_REQUESTS: u16 = 1;
static HTTP_SEM: Semaphore = Semaphore::new(START_REQUESTS, MAX_REQUESTS);
static TASKS_LEFT: AtomicU32 = AtomicU32::new(42);

fn download_file(url: &str) -> Result<Vec<u8>, std::io::Error> {
    // Make sure we never exceed the maximum number of simultaneous
    // network connections allowed.
    let sem_guard = HTTP_SEM.wait();

    // <download the file here>

    // When `sem_guard` is dropped at the end of the scope, we give up our
    // network access slot letting another thread through.
    return Ok(unimplemented!());
}

fn get_file_from_cache(url: &str) -> Result<Vec<u8>, ()> { todo!() }

fn do_work() -> Result<(), std::io::Error> {
    loop {
        let mut file_in_cache = false;
        // Do some stuff that takes time here...
        // ...
        let url = "https://some-url/some/path/";
        let file = get_file_from_cache(url).or_else(|_| download_file(url))?;
        // Do something with the file...
        // ...
        TASKS_LEFT.fetch_sub(1, Ordering::Relaxed);
    }
}

fn main() {
    // Start a thread to read control messages from the user
    std::thread::spawn(|| {
        let mut network_limit = START_REQUESTS;
        loop {
            println!("Press f to go faster or s to go slower");
            let mut input = String::new();
            std::io::stdin().read_line(&mut input).unwrap();
            match input.trim() {
                "f" if network_limit < MAX_REQUESTS => {
                    HTTP_SEM.release(1);
                    network_limit += 1;
                }
                "s" if network_limit > 0 => {
                    HTTP_SEM.wait().forget();
                    network_limit -= 1;
                }
                _ => eprintln!("Invalid request!"),
            }
        }
    });

    // Start 8 worker threads and wait for them to finish
    std::thread::scope(|scope| {
        for _ in 0..8 {
            scope.spawn(do_work);
        }
    });
}

Implementations

Create a new Semaphore with a maximum available concurrency count of max_count and an initial available concurrency count of initial_count.

Attempts to obtain access to the resource or code protected by the Semaphore, subject to the available concurrency count. Returns immediately if the Semaphore’s internal concurrency count is non-zero or blocks sleeping until the Semaphore becomes available (via another thread completing its access to the controlled-concurrency region or if the semaphore’s concurrency limit is raised).

A successful wait against the semaphore decrements its internal available concurrency count (possibly preventing other threads from obtaining the semaphore) until Semaphore::release() is called (which happens automatically when the SemaphoreGuard concurrency token is dropped).

Attempts a time-bounded wait against the Semaphore, returning Ok(()) if and when the semaphore becomes available or a TimeoutError if the specified time limit elapses without the semaphore becoming available to the calling thread.

Directly increments or decrements the current availability limit for a Semaphore without blocking. This is only possible when the semaphore is not currently borrowed or being waited on. Panics if the change will result in an available concurrency limit of less than zero or greater than the semaphore’s maximum. See Semaphore::try_modify() for a non-panicking alternative.

To increment the semaphore’s concurrency limit without an &mut Semaphore reference, call Semaphore::release() instead. To decrement the concurrency limit, wait on the semaphore then call forget() on the returned SemaphoreGuard:

use rsevents_extra::Semaphore;

fn adjust_sem(sem: &Semaphore, count: i16) {
    if count >= 0 {
       sem.release(count as u16);
    } else {
        // Note: this will block if the semaphore isn't available!
        for _ in 0..(-1 * count) {
            let guard = sem.wait();
            guard.forget();
        }
    }
}

Directly increments or decrements the current availability limit for a Semaphore without blocking. This is only possible when the semaphore is not currently borrowed or being waited on. Returns false if the change will result in an available concurrency limit of less than zero or greater than the semaphore’s maximum.

See Semaphore::modify() for more info.

Increments the available concurrency by count, and panics if this results in a count that exceeds the max_count the Semaphore was created with (see Semaphore::new()). Unlike Semaphore::modify(), this can be called with a non-mutable reference to the semaphore, but can only increment the concurrency level.

See try_release for a non-panicking version of this function. See the documentation for modify() for info on decrementing the available concurrency level.

Attempts to increment the available concurrency counter by count, and returns false if this operation would result in a count that exceeds the max_count the Semaphore was created with (see Semaphore::new()).

If you can guarantee that the count cannot exceed the maximum allowed, you may want to use Semaphore::release() instead as it is both lock-free and wait-free, whereas try_release() is only lock-free and may spin internally in case of contention.

Returns the currently available count of the semaphore.

Note that this may race with other calls such as release() or wait().

Trait Implementations

Attempts to obtain access to the resource or code protected by the Semaphore, subject to the available concurrency count. Returns immediately if the Semaphore’s internal concurrency count is non-zero or blocks sleeping until the Semaphore becomes available (via another thread completing its access to the controlled-concurrency region or if the semaphore’s concurrency limit is raised).

A successful wait against the semaphore decrements its internal available concurrency count (possibly preventing other threads from obtaining the semaphore) until Semaphore::release() is called.

Attempts a time-bounded wait against the Semaphore, returning Ok(()) if and when the semaphore becomes available or a TimeoutError if the specified time limit elapses without the semaphore becoming available to the calling thread.

Attempts to obtain the Semaphore without waiting, returning Ok(()) if the semaphore is immediately available or a TimeoutError otherwise.

The type yielded by the Awaitable type on a successful wait
The type yielded by the Awaitable type in case of an error, also specifying whether or not an unbounded Awaitable::wait() returns any error at all. Read more
Blocks until the Awaitable type and its associated type T become available. Like try_wait() but bypasses error handling. Read more

Auto Trait Implementations

Blanket Implementations

Gets the TypeId of self. Read more
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more

Returns the argument unchanged.

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.