pub struct NonBlockingMutex<'captured_variables, State, TNonBlockingMutexTask: NonBlockingMutexTask<State> + 'captured_variables> { /* private fields */ }

Implementations§

source§

impl<'captured_variables, State> NonBlockingMutex<'captured_variables, State, DynamicNonBlockingMutexTask<'captured_variables, State>>

source

pub fn run_fn_once_if_first_or_schedule_on_first( &self, run_with_state: impl FnOnce(MutexGuard<'_, State>) + Send + 'captured_variables )

Please don’t forget that order of execution is not guaranteed. Atomicity of operations is guaranteed, but order can be random

source§

impl<'captured_variables, State, TNonBlockingMutexTask: NonBlockingMutexTask<State> + 'captured_variables> NonBlockingMutex<'captured_variables, State, TNonBlockingMutexTask>

source

pub fn has_running_tasks(&self, ordering: Ordering) -> bool

source

pub fn has_no_running_tasks(&self, ordering: Ordering) -> bool

source§

impl<'captured_variables, State: Send + 'captured_variables> NonBlockingMutex<'captured_variables, State, DynamicNonBlockingMutexTask<'captured_variables, State>>

source

pub unsafe fn run_if_first_or_schedule_to_run_last( non_blocking_mutex_arc: Arc<Self>, run_with_state: impl FnOnce(MutexGuard<'_, State>) + Send + 'captured_variables )

NonBlockingMutex::run_if_first_or_schedule_to_run_last should be called only if there won’t be other calls to NonBlockingMutex, to be sure that it is run last, like inside Drop

source§

impl<'captured_variables, State, TNonBlockingMutexTask: NonBlockingMutexTask<State>> NonBlockingMutex<'captured_variables, State, TNonBlockingMutexTask>

NonBlockingMutex

Why you should use NonBlockingMutex

NonBlockingMutex is currently the fastest way to do expensive calculations under lock, or do cheap calculations under lock when concurrency/load/contention is very high - see benchmarks in directory benches and run them with

cargo bench
Examples
Optimized for 1 type of NonBlockingMutexTask
use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use std::thread::{available_parallelism};

/// How many threads can physically access [NonBlockingMutex]
/// simultaneously, needed for computing `shard_count` of [ShardedQueue],
/// used to store queue of tasks
let max_concurrent_thread_count = available_parallelism().unwrap().get();

let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0);
/// Will infer exact type and size(0) of this [FnOnce] and
/// make sized [NonBlockingMutex] which takes only this exact [FnOnce]
/// without ever requiring [Box]-ing or dynamic dispatch
non_blocking_mutex.run_if_first_or_schedule_on_first(|mut state: MutexGuard<usize>| {
    *state += 1;
});
Easy to use with any FnOnce, but may Box tasks and use dynamic dispatch when can’t acquire lock on first try
use non_blocking_mutex::dynamic_non_blocking_mutex::DynamicNonBlockingMutex;
use std::thread::{available_parallelism, scope};

let mut state_snapshot_before_increment = 0;
let mut state_snapshot_after_increment = 0;

let mut state_snapshot_before_decrement = 0;
let mut state_snapshot_after_decrement = 0;

{
    /// How many threads can physically access [NonBlockingMutex]
    /// simultaneously, needed for computing `shard_count` of [ShardedQueue],
    /// used to store queue of tasks
    let max_concurrent_thread_count = available_parallelism().unwrap().get();

    /// Will work with any [FnOnce] and is easy to use,
    /// but will [Box] tasks and use dynamic dispatch
    /// when can't acquire lock on first try
    let non_blocking_mutex = DynamicNonBlockingMutex::new(max_concurrent_thread_count, 0);

    scope(|scope| {
        scope.spawn(|| {
            non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(|mut state| {
                *(&mut state_snapshot_before_increment) = *state;
                *state += 1;
                *(&mut state_snapshot_after_increment) = *state;
            });
            non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(|mut state| {
                *(&mut state_snapshot_before_decrement) = *state;
                *state -= 1;
                *(&mut state_snapshot_after_decrement) = *state;
            });
        });
    });
}

assert_eq!(state_snapshot_before_increment, 0);
assert_eq!(state_snapshot_after_increment, 1);

assert_eq!(state_snapshot_before_decrement, 1);
assert_eq!(state_snapshot_after_decrement, 0);
Optimized for multiple known types of NonBlockingMutexTask which capture variables
use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use non_blocking_mutex::non_blocking_mutex_task::NonBlockingMutexTask;
use std::thread::{available_parallelism, scope};

let mut state_snapshot_before_increment = 0;
let mut state_snapshot_after_increment = 0;

let mut state_snapshot_before_decrement = 0;
let mut state_snapshot_after_decrement = 0;

{
    /// How many threads can physically access [NonBlockingMutex]
    /// simultaneously, needed for computing `shard_count` of [ShardedQueue],
    /// used to store queue of tasks
    let max_concurrent_thread_count = available_parallelism().unwrap().get();

    /// Will infer exact type and size of struct [Task] and
    /// make sized [NonBlockingMutex] which takes only [Task]
    /// without ever requiring [Box]-ing or dynamic dispatch
    let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0);

    scope(|scope| {
        scope.spawn(|| {
            non_blocking_mutex.run_if_first_or_schedule_on_first(
                Task::new_increment_and_store_snapshots(
                    &mut state_snapshot_before_increment,
                    &mut state_snapshot_after_increment,
                ),
            );
            non_blocking_mutex.run_if_first_or_schedule_on_first(
                Task::new_decrement_and_store_snapshots(
                    &mut state_snapshot_before_decrement,
                    &mut state_snapshot_after_decrement,
                ),
            );
        });
    });
}

assert_eq!(state_snapshot_before_increment, 0);
assert_eq!(state_snapshot_after_increment, 1);

assert_eq!(state_snapshot_before_decrement, 1);
assert_eq!(state_snapshot_after_decrement, 0);

struct SnapshotsBeforeAndAfterChangeRefs<
    'snapshot_before_change_ref,
    'snapshot_after_change_ref,
> {
    /// Where to write snapshot of `State` before applying function to `State`
    snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
    /// Where to write snapshot of `State` after applying function to `State
    snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
}

enum TaskType<'snapshot_before_change_ref, 'snapshot_after_change_ref> {
    IncrementAndStoreSnapshots(
        SnapshotsBeforeAndAfterChangeRefs<
            'snapshot_before_change_ref,
            'snapshot_after_change_ref,
        >,
    ),
    DecrementAndStoreSnapshots(
        SnapshotsBeforeAndAfterChangeRefs<
            'snapshot_before_change_ref,
            'snapshot_after_change_ref,
        >,
    ),
}

struct Task<'snapshot_before_change_ref, 'snapshot_after_change_ref> {
    task_type: TaskType<'snapshot_before_change_ref, 'snapshot_after_change_ref>,
}

impl<'snapshot_before_change_ref, 'snapshot_after_change_ref>
    Task<'snapshot_before_change_ref, 'snapshot_after_change_ref>
{
    fn new_increment_and_store_snapshots(
        // Where to write snapshot of `State` before applying function to `State`
        snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
        // Where to write snapshot of `State` after applying function to `State
        snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
    ) -> Self {
        Self {
            task_type: TaskType::IncrementAndStoreSnapshots(
                SnapshotsBeforeAndAfterChangeRefs {
                    /// Where to write snapshot of `State` before applying function to `State`
                    snapshot_before_change_ref,
                    /// Where to write snapshot of `State` after applying function to `State
                    snapshot_after_change_ref,
                },
            ),
        }
    }

    fn new_decrement_and_store_snapshots(
        // Where to write snapshot of `State` before applying function to `State`
        snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
        // Where to write snapshot of `State` after applying function to `State
        snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
    ) -> Self {
        Self {
            task_type: TaskType::DecrementAndStoreSnapshots(
                SnapshotsBeforeAndAfterChangeRefs {
                    /// Where to write snapshot of `State` before applying function to `State`
                    snapshot_before_change_ref,
                    /// Where to write snapshot of `State` after applying function to `State
                    snapshot_after_change_ref,
                },
            ),
        }
    }
}

impl<'snapshot_before_change_ref, 'snapshot_after_change_ref> NonBlockingMutexTask<usize>
    for Task<'snapshot_before_change_ref, 'snapshot_after_change_ref>
{
    fn run_with_state(self, mut state: MutexGuard<usize>) {
        match self.task_type {
            TaskType::IncrementAndStoreSnapshots(SnapshotsBeforeAndAfterChangeRefs {
                snapshot_before_change_ref,
                snapshot_after_change_ref,
            }) => {
                *snapshot_before_change_ref = *state;
                *state += 1;
                *snapshot_after_change_ref = *state;
            }
            TaskType::DecrementAndStoreSnapshots(SnapshotsBeforeAndAfterChangeRefs {
                snapshot_before_change_ref,
                snapshot_after_change_ref,
            }) => {
                *snapshot_before_change_ref = *state;
                *state -= 1;
                *snapshot_after_change_ref = *state;
            }
        }
    }
}
Why you may want to not use NonBlockingMutex
  • NonBlockingMutex forces first thread to enter synchronized block to do all tasks(including added while it is running, potentially running forever if tasks are being added forever)

  • It is more difficult to continue execution on same thread after synchronized logic is run, you need to schedule continuation on some scheduler when you want to continue after end of synchronized logic in new thread or introduce other synchronization primitives, like channels, or WaitGroup-s, or similar

  • NonBlockingMutex performs worse than std::sync::Mutex when concurrency/load/contention is low

  • Similar to std::sync::Mutex, NonBlockingMutex doesn’t guarantee order of execution, only atomicity of operations is guaranteed

Design explanation

First thread, which calls NonBlockingMutex::run_if_first_or_schedule_on_first, atomically increments task_count, and, if thread was first to increment task_count from 0 to 1, first thread immediately executes first task, and then atomically decrements task_count and checks if task_count changed from 1 to 0. If task_count changed from 1 to 0 - there are no more tasks and first thread can finish execution loop, otherwise first thread gets next task from task_queue and runs task, then decrements tasks count after it was run and repeats check if task_count changed from 1 to 0 and running tasks until there are no more tasks left.

Not first threads also atomically increment task_count, do check if they are first, Box task and push task Box to task_queue

This design allows us to avoid lock contention, but adds ~constant time of Box-ing task and putting task Box into concurrent task_queue, and incrementing and decrementing task_count, so when lock contention is low, NonBlockingMutex performs worse than std::sync::Mutex, but when contention is high (because we have more CPU-s or because we want to do expensive calculations under lock), NonBlockingMutex performs better than std::sync::Mutex

source

pub fn new(max_concurrent_thread_count: usize, state: State) -> Self

Arguments
  • max_concurrent_thread_count - how many threads can physically access NonBlockingMutex simultaneously, needed for computing shard_count of ShardedQueue, used to store queue of tasks
source

pub fn run_if_first_or_schedule_on_first(&self, task: TNonBlockingMutexTask)

Please don’t forget that order of execution is not guaranteed. Atomicity of operations is guaranteed, but order can be random

Trait Implementations§

source§

impl<'captured_variables, State: Send, TNonBlockingMutexTask: NonBlockingMutexTask<State>> Send for NonBlockingMutex<'captured_variables, State, TNonBlockingMutexTask>

Send and Sync logic was taken from std::sync::Mutex

source§

impl<'captured_variables, State: Send, TNonBlockingMutexTask: NonBlockingMutexTask<State>> Sync for NonBlockingMutex<'captured_variables, State, TNonBlockingMutexTask>

Auto Trait Implementations§

§

impl<'captured_variables, State, TNonBlockingMutexTask> !RefUnwindSafe for NonBlockingMutex<'captured_variables, State, TNonBlockingMutexTask>

§

impl<'captured_variables, State, TNonBlockingMutexTask> Unpin for NonBlockingMutex<'captured_variables, State, TNonBlockingMutexTask>where State: Unpin, TNonBlockingMutexTask: Unpin,

§

impl<'captured_variables, State, TNonBlockingMutexTask> !UnwindSafe for NonBlockingMutex<'captured_variables, State, TNonBlockingMutexTask>

Blanket Implementations§

source§

impl<T> Any for Twhere T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for Twhere T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for Twhere T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for Twhere U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for Twhere U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for Twhere U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.