ShardedQueue
ShardedQueue is a concurrent queue that is highly specialized for the needs of some schedulers and of NonBlockingMutex
ShardedQueue is a lightweight concurrent queue that uses spin locking and reduces lock contention through sharding
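The idea of sharding with spin locking can be illustrated with a minimal sketch. This is not the crate's implementation: the type SketchShardedQueue, its fields, the power-of-two shard count, and the use of std's Mutex::try_lock in a spin loop as a stand-in for a real spin lock are all assumptions made for illustration. Only the method names push_back and pop_front_or_spin follow the names mentioned in this document.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard, TryLockError};

/// Hypothetical illustration type, NOT the crate's `ShardedQueue`.
pub struct SketchShardedQueue<T> {
    /// `shard_count - 1`; valid as a mask because the shard count is a power of two.
    mask: usize,
    /// Counts pops; selects the shard the next pop goes to.
    head: AtomicUsize,
    /// Counts pushes; selects the shard the next push goes to.
    tail: AtomicUsize,
    shards: Vec<Mutex<VecDeque<T>>>,
}

impl<T> SketchShardedQueue<T> {
    pub fn new(max_concurrent_thread_count: usize) -> Self {
        // Assumption: round up to a power of two so `index & mask` picks a shard.
        let shard_count = max_concurrent_thread_count.max(1).next_power_of_two();
        Self {
            mask: shard_count - 1,
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            shards: (0..shard_count).map(|_| Mutex::new(VecDeque::new())).collect(),
        }
    }

    /// Spins (never parks the thread) until the shard lock is acquired.
    fn spin_lock(&self, index: usize) -> MutexGuard<'_, VecDeque<T>> {
        loop {
            match self.shards[index].try_lock() {
                Ok(guard) => return guard,
                Err(TryLockError::Poisoned(poisoned)) => return poisoned.into_inner(),
                Err(TryLockError::WouldBlock) => std::hint::spin_loop(),
            }
        }
    }

    pub fn push_back(&self, item: T) {
        // Consecutive pushes land on different shards, so they rarely contend.
        let index = self.tail.fetch_add(1, Ordering::Relaxed) & self.mask;
        self.spin_lock(index).push_back(item);
    }

    /// The caller must guarantee that a matching push has happened or will
    /// happen; otherwise this spins forever.
    pub fn pop_front_or_spin(&self) -> T {
        let index = self.head.fetch_add(1, Ordering::Relaxed) & self.mask;
        loop {
            if let Some(item) = self.spin_lock(index).pop_front() {
                return item;
            }
            // The shard is still empty: the matching push has not landed yet.
            std::hint::spin_loop();
        }
    }
}
```

In this sketch each push and each pop touches exactly one shard, so with at least as many shards as concurrent threads, two threads contend only when their counters map to the same shard at the same moment.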
Note that although it may seem that FIFO order is guaranteed, it is not. Consider the following situation: multiple producers trigger a long resize of every very large shard except the last one, and enough time passes for those resizes to finish. Then one producer triggers a long resize of the last shard, while all other threads start to consume or produce and eventually start spinning on the last shard. There is no guarantee which of them will acquire the spin lock first, so we can't even guarantee that ShardedQueue::pop_front_or_spin will acquire the lock before ShardedQueue::push_back on the first attempt
Note that this queue doesn't track its length, because the increment/decrement logic for a length counter may change depending on the use case, as may the logic for when the count goes from 1 to 0 or back. In some cases, like NonBlockingMutex, we don't even add an action to the queue when the count reaches 1, but run it immediately in the same thread. The count may even be allowed to go negative to optimize some hot paths: in some schedulers it is cheaper to restore the count to a correct state than to enforce that it can never go negative
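The "run immediately when the count reaches 1" pattern can be sketched as follows. This is an illustration under stated assumptions, not the NonBlockingMutex implementation: CountedRunner, run_or_enqueue, and task_count are hypothetical names, a Mutex-protected VecDeque stands in for the queue, and tasks are required to be 'static to keep the sketch short.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

type Task = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical illustration type: the counter lives outside the queue,
/// so the owner decides what each count transition means.
pub struct CountedRunner {
    task_count: AtomicUsize,
    /// Stand-in for a queue that does not track its own length.
    task_queue: Mutex<VecDeque<Task>>,
}

impl CountedRunner {
    pub fn new() -> Self {
        Self {
            task_count: AtomicUsize::new(0),
            task_queue: Mutex::new(VecDeque::new()),
        }
    }

    pub fn task_count(&self) -> usize {
        self.task_count.load(Ordering::Acquire)
    }

    pub fn run_or_enqueue(&self, task: impl FnOnce() + Send + 'static) {
        // Previous count was not 0: another thread is already running tasks,
        // so hand the task over through the queue and return.
        if self.task_count.fetch_add(1, Ordering::AcqRel) != 0 {
            self.task_queue.lock().unwrap().push_back(Box::new(task));
            return;
        }

        // Count went 0 -> 1: run in this thread without touching the queue.
        task();

        // Previous count 1 means we just finished the last task. Anything
        // larger means a task was pushed or is about to be pushed.
        while self.task_count.fetch_sub(1, Ordering::AcqRel) != 1 {
            let next = loop {
                if let Some(queued) = self.task_queue.lock().unwrap().pop_front() {
                    break queued;
                }
                // The producer incremented the count but has not pushed yet.
                std::hint::spin_loop();
            };
            // The queue lock is released here, so tasks may enqueue more tasks.
            next();
        }
    }
}
```

Because the counter is owned by the caller rather than by the queue, the 0 to 1 transition can skip the queue entirely, which is the kind of use-case-specific logic a built-in length counter would get in the way of.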
Examples
use sharded_queue::ShardedQueue;
use std::cell::UnsafeCell;
use ;
use std::marker::PhantomData;
use ;
use ;
/// [Send], [Sync], and [MutexGuard] logic was taken from [std::sync::Mutex]
/// and [std::sync::MutexGuard]
///
/// these are the only places where `T: Send` matters; all other
/// functionality works fine on a single thread.
unsafe
unsafe
/// Code was mostly taken from [std::sync::MutexGuard]; it is expected to protect [State]
/// from being moved out of the synchronized loop
// TODO: uncomment once the following error no longer applies:
// negative trait bounds are not yet fully implemented; use marker types for now [E0658]
// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> !Send
// for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
// {
// }
unsafe
Benchmarks
| benchmark_name | operation_count_per_thread | concurrent_thread_count | average_time |
|---|---|---|---|
| sharded_queue_push_and_pop_concurrently | 1_000 | 24 | 3.1980 ms |
| crossbeam_queue_push_and_pop_concurrently | 1_000 | 24 | 5.3154 ms |
| queue_mutex_push_and_pop_concurrently | 1_000 | 24 | 6.4846 ms |
| sharded_queue_push_and_pop_concurrently | 10_000 | 24 | 37.245 ms |
| crossbeam_queue_push_and_pop_concurrently | 10_000 | 24 | 49.234 ms |
| queue_mutex_push_and_pop_concurrently | 10_000 | 24 | 69.207 ms |
| sharded_queue_push_and_pop_concurrently | 100_000 | 24 | 395.12 ms |
| crossbeam_queue_push_and_pop_concurrently | 100_000 | 24 | 434.00 ms |
| queue_mutex_push_and_pop_concurrently | 100_000 | 24 | 476.59 ms |