pub struct ShardedQueue<Item> { /* private fields */ }

Implementations§

source§

impl<Item> ShardedQueue<Item>

ShardedQueue

ShardedQueue is used by some schedulers and by [NonBlockingMutex] to store and retrieve data concurrently as efficiently as possible

ShardedQueue is a light-weight concurrent queue, which uses spin locking and fights lock contention with sharding

Note that FIFO order is not guaranteed, even though it may seem to be. For example, multiple consumers or producers can each trigger a long resize of a very large shard, on every shard but the last; enough time then passes for those resizes to finish; then one consumer or producer triggers a long resize of the last shard, while all other threads keep consuming or producing and eventually start spinning on the last shard, with no guarantee as to which of them will acquire the spin lock first. So we can’t even guarantee that ShardedQueue::pop_front_or_spin will acquire the lock before ShardedQueue::push_back on the first attempt

Note that this queue doesn’t track its length, because the logic for incrementing/decrementing the length may differ between use cases, as may the logic for the transition from 1 to 0 or back (in some cases, like [NonBlockingMutex], we don’t even add the action to the queue when the count reaches 1, but run it immediately in the same thread), or even for letting the count go negative (to optimize some hot paths, as in some schedulers, where it is cheaper to restore the count to a correct state than to enforce that it can never go negative)

Examples

use sharded_queue::ShardedQueue;
use std::cell::UnsafeCell;
use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Mutex-like container that runs submitted actions against `State` one at a time,
/// queueing an action instead of blocking when another thread is already running actions.
///
/// `'captured_variables` bounds everything the queued actions are allowed to borrow.
pub struct NonBlockingMutex<'captured_variables, State>
where
    State: ?Sized,
{
    /// Incremented once for every submitted action and decremented once after each
    /// action has been run; a previous value of `0` on increment marks the submitting
    /// thread as the one that runs actions
    task_count: AtomicUsize,
    /// Boxed actions submitted while another thread was already running actions
    task_queue: ShardedQueue<Box<dyn FnOnce(MutexGuard<State>) + Send + 'captured_variables>>,
    /// The protected state; it is only reached through [MutexGuard]'s `Deref`/`DerefMut`
    unsafe_state: UnsafeCell<State>,
}

/// [NonBlockingMutex] is needed to run actions atomically without thread blocking, or context
/// switch, or spin lock contention, or rescheduling on some scheduler
///
/// Notice that it uses [ShardedQueue] which doesn't guarantee order of retrieval, hence
/// [NonBlockingMutex] doesn't guarantee order of execution too, even of already added
/// items
impl<'captured_variables, State> NonBlockingMutex<'captured_variables, State> {
    #[inline]
    pub fn new(max_concurrent_thread_count: usize, state: State) -> Self {
        Self {
            task_count: AtomicUsize::new(0),
            task_queue: ShardedQueue::new(max_concurrent_thread_count),
            unsafe_state: UnsafeCell::new(state),
        }
    }

    /// Please don't forget that order of execution is not guaranteed. Atomicity of operations is guaranteed,
    /// but order can be random
    #[inline]
    pub fn run_if_first_or_schedule_on_first(
        &self,
        run_with_state: impl FnOnce(MutexGuard<State>) + Send + 'captured_variables,
    ) {
        if self.task_count.fetch_add(1, Ordering::Acquire) != 0 {
            self.task_queue.push_back(Box::new(run_with_state));
        } else {
            // If we acquired first lock, run should be executed immediately and run loop started
            run_with_state(unsafe { MutexGuard::new(self) });
            /// Note that if [`fetch_sub`] != 1
            /// => some thread entered first if block in method
            /// => [ShardedQueue::push_back] is guaranteed to be called
            /// => [ShardedQueue::pop_front_or_spin] will not deadlock while spins until it gets item
            ///
            /// Notice that we run action first, and only then decrement count
            /// with releasing(pushing) memory changes, even if it looks otherwise
            while self.task_count.fetch_sub(1, Ordering::Release) != 1 {
                self.task_queue.pop_front_or_spin()(unsafe { MutexGuard::new(self) });
            }
        }
    }
}

/// The [Send] and [Sync] impls below, like the [MutexGuard] logic, follow
/// [std::sync::Mutex] and [std::sync::MutexGuard]
///
/// These impls are the only places where `State: Send` matters; everything else
/// works fine on a single thread.
unsafe impl<'captured_variables, State> Send for NonBlockingMutex<'captured_variables, State>
where
    State: ?Sized + Send,
{
}

// `State: Send` (not `State: Sync`) is the bound here as well, matching the bound on `Send`
unsafe impl<'captured_variables, State> Sync for NonBlockingMutex<'captured_variables, State>
where
    State: ?Sized + Send,
{
}

/// Handle passed to every action; it dereferences to the `State` stored in the
/// [NonBlockingMutex] it was created from.
///
/// Modelled mostly on [std::sync::MutexGuard]; it is expected to keep `State` from being
/// moved out of the synchronized execution loop
pub struct MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
where
    State: ?Sized + 'non_blocking_mutex_ref,
{
    /// The mutex whose state this guard gives access to
    non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
    /// Marker field that gives [MutexGuard] the same [Send]/[Sync] behaviour as
    /// [std::sync::MutexGuard], so that `State` cannot leave the synchronized
    /// execution loop
    ///
    /// todo remove once this compiler error no longer applies
    ///  negative trait bounds are not yet fully implemented; use marker types for now [E0658]
    _phantom_unsend: PhantomData<std::sync::MutexGuard<'non_blocking_mutex_ref, State>>,
}

// todo switch the `PhantomData` marker for this negative impl once the compiler stops reporting
//  negative trait bounds are not yet fully implemented; use marker types for now [E0658]
// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> !Send
//     for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
// {
// }

// A shared reference to the guard only hands out `&State` (through `Deref`), so the guard
// is declared `Sync` whenever `State` is `Sync`
unsafe impl<'captured_variables, 'non_blocking_mutex_ref, State> Sync
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
where
    State: ?Sized + Sync,
{
}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized>
    MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    unsafe fn new(
        non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
    ) -> Self {
        Self {
            non_blocking_mutex,
            _phantom_unsend: PhantomData,
        }
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State> Deref
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
where
    State: ?Sized,
{
    type Target = State;

    /// Borrows the state stored in the mutex this guard was created from.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let state_ptr = self.non_blocking_mutex.unsafe_state.get();
        // Relies on the contract of the unsafe constructor of the guard: whoever created
        // this guard is responsible for it being the only one in use
        unsafe { &*state_ptr }
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State> DerefMut
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
where
    State: ?Sized,
{
    /// Mutably borrows the state stored in the mutex this guard was created from.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let state_ptr = self.non_blocking_mutex.unsafe_state.get();
        // Relies on the contract of the unsafe constructor of the guard: whoever created
        // this guard is responsible for it being the only one in use
        unsafe { &mut *state_ptr }
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State> Debug
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
where
    State: ?Sized + Debug,
{
    /// Formats the guarded state with its own [Debug] implementation.
    #[inline]
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Deref coercion turns `&MutexGuard` into `&State`
        let state: &State = self;
        Debug::fmt(state, f)
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State> Display
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
where
    State: ?Sized + Display,
{
    /// Formats the guarded state with its own [Display] implementation.
    #[inline]
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Deref coercion turns `&MutexGuard` into `&State`
        let state: &State = self;
        Display::fmt(state, f)
    }
}
use sharded_queue::ShardedQueue;

// Accepts a value only if its type is `Send`; the value itself is not used
fn is_send<T: Send>(_value: T) {}

// `usize` is `Send`, which is what `ShardedQueue<Item>: Send` requires of `Item`,
// so this snippet is expected to compile
let queue: ShardedQueue<usize> = ShardedQueue::new(1);

is_send(queue);
use sharded_queue::ShardedQueue;

// Accepts a value only if its type is `Sync`; the value itself is not used
fn is_sync<T: Sync>(_value: T) {}

// `usize` is `Send`, which is what `ShardedQueue<Item>: Sync` requires of `Item`,
// so this snippet is expected to compile
let queue: ShardedQueue<usize> = ShardedQueue::new(1);

is_sync(queue);
use std::rc::Rc;
use sharded_queue::ShardedQueue;

// Accepts a value only if its type is `Send`; the value itself is not used
fn is_send<T: Send>(_value: T) {}

// `Rc<usize>` is not `Send`, so `ShardedQueue<Rc<usize>>` is not `Send` either;
// the `is_send` call below is expected to be rejected by the compiler
let queue: ShardedQueue<Rc<usize>> = ShardedQueue::new(1);

is_send(queue);
use std::rc::Rc;
use sharded_queue::ShardedQueue;

// Accepts a value only if its type is `Sync`; the value itself is not used
fn is_sync<T: Sync>(_value: T) {}

// `ShardedQueue<Item>: Sync` requires `Item: Send` and `Rc<usize>` is not `Send`;
// the `is_sync` call below is expected to be rejected by the compiler
let queue: ShardedQueue<Rc<usize>> = ShardedQueue::new(1);

is_sync(queue);
use std::collections::VecDeque;

// Accepts a value only if its type is `Send`; the value itself is not used
fn is_send<T: Send>(_value: T) {}

// `VecDeque<usize>` is `Send`, so this snippet is expected to compile
// (presumably a baseline to compare `ShardedQueue` against)
let deque: VecDeque<usize> = VecDeque::new();

is_send(deque);
use std::collections::VecDeque;

// Accepts a value only if its type is `Sync`; the value itself is not used
fn is_sync<T: Sync>(_value: T) {}

// `VecDeque<usize>` is `Sync`, so this snippet is expected to compile
// (presumably a baseline to compare `ShardedQueue` against)
let deque: VecDeque<usize> = VecDeque::new();

is_sync(deque);
use std::collections::VecDeque;
use std::rc::Rc;

// Accepts a value only if its type is `Send`; the value itself is not used
fn is_send<T: Send>(_value: T) {}

// `Rc<usize>` is not `Send`, so `VecDeque<Rc<usize>>` is not `Send` either;
// the `is_send` call below is expected to be rejected by the compiler
let deque: VecDeque<Rc<usize>> = VecDeque::new();

is_send(deque);
use std::collections::VecDeque;
use std::rc::Rc;

// Accepts a value only if its type is `Sync`; the value itself is not used
fn is_sync<T: Sync>(_value: T) {}

// `Rc<usize>` is not `Sync`, so `VecDeque<Rc<usize>>` is not `Sync` either;
// the `is_sync` call below is expected to be rejected by the compiler
let deque: VecDeque<Rc<usize>> = VecDeque::new();

is_sync(deque);
source

pub fn new(max_concurrent_thread_count: usize) -> Self

source

pub fn pop_front_or_spin(&self) -> Item

Note that it will spin until an item is added, so it can spin for a very long time if ShardedQueue::pop_front_or_spin is called without a guarantee that ShardedQueue::push_back will be called

source

pub fn push_back(&self, item: Item)

Auto Trait Implementations§

§

impl<Item> RefUnwindSafe for ShardedQueue<Item>

§

impl<Item> Send for ShardedQueue<Item> where Item: Send,

§

impl<Item> Sync for ShardedQueue<Item> where Item: Send,

§

impl<Item> Unpin for ShardedQueue<Item> where Item: Unpin,

§

impl<Item> UnwindSafe for ShardedQueue<Item>

Blanket Implementations§

source§

impl<T> Any for T where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.