sharded_queue/lib.rs
use crossbeam_utils::CachePadded;

use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

pub struct ShardedQueue<Item> {
    modulo_number: usize,
    unsafe_queue_and_is_locked_list: Vec<CachePadded<(UnsafeCell<VecDeque<Item>>, AtomicBool)>>,

    head_index: CachePadded<AtomicUsize>,
    tail_index: CachePadded<AtomicUsize>,
}

// SAFETY: `ShardedQueue` is `Send` and `Sync` for `Item` in the same cases as
// `std::sync::Mutex` is for `Item`: when `Item` can be sent between threads,
// `ShardedQueue` can be sent and shared between threads
unsafe impl<Item: Send> Send for ShardedQueue<Item> {}
unsafe impl<Item: Send> Sync for ShardedQueue<Item> {}

/// # [ShardedQueue]
///
/// ## Why you should use [ShardedQueue]
///
/// [ShardedQueue] is currently the fastest concurrent collection under the highest
/// concurrency and load among the most popular solutions (like `concurrent-queue`) -
/// see the benchmarks in the `benches` directory and run them with
/// ```bash
/// cargo bench
/// ```
///
/// ## Example
///
/// ```rust
/// use std::thread::available_parallelism;
/// use sharded_queue::ShardedQueue;
///
/// // How many threads can physically access `ShardedQueue`
/// // simultaneously, needed for computing `shard_count`
/// let max_concurrent_thread_count = available_parallelism().unwrap().get();
///
/// let sharded_queue = ShardedQueue::new(max_concurrent_thread_count);
///
/// sharded_queue.push_back(1);
/// let item = sharded_queue.pop_front_or_spin_wait_item();
/// ```
///
/// ## Why you may want to not use [ShardedQueue]
///
/// - Unlike other concurrent queues, [ShardedQueue] does not guarantee FIFO order.
/// While FIFO order may seem guaranteed, it is not: multiple consumers or producers
/// can trigger a long resize on every very large shard except the last one, then,
/// after enough time has passed for those resizes to finish, one consumer or producer
/// can trigger a long resize of the last shard while all other threads start to consume
/// or produce and eventually begin spinning on that last shard, with no guarantee of
/// which thread acquires the spin lock first, so we can't even guarantee that
/// [ShardedQueue::pop_front_or_spin_wait_item] will acquire the lock before
/// [ShardedQueue::push_back] on the first attempt
///
/// - [ShardedQueue] doesn't track its length, since the length increment/decrement logic
/// may change depending on the use case, as well as the logic for when it goes from 1 to 0
/// or back (in some cases, like `non_blocking_mutex::NonBlockingMutex`, we don't even add
/// an action to the queue when the count reaches 1, but run it immediately on the same
/// thread), or even goes negative (to optimize some hot paths, like in some schedulers,
/// since it is cheaper to restore the count to a correct state than to enforce that it
/// can not go negative)
///
/// - [ShardedQueue] doesn't have many features: only the necessary methods
/// [ShardedQueue::pop_front_or_spin_wait_item] and
/// [ShardedQueue::push_back] are implemented
///
/// ## Design explanation
///
/// [ShardedQueue] is designed to be used in some schedulers and in `NonBlockingMutex`
/// (see the complex example below) as the most efficient collection under the highest
/// concurrency and load
/// (a concurrent stack can't outperform it, because, unlike a queue, which
/// spreads `pop` and `push` contention between `front` and `back`,
/// a stack `pop`-s from `back` and `push`-es to `back`,
/// which doubles the contention compared to a queue, while the number of atomic increments
/// per `pop` or `push` is the same as in a queue)
///
/// [ShardedQueue] uses an array of queues (shards), each protected by its own spin lock,
/// atomically increments `head_index` or `tail_index` when a `pop` or `push` happens,
/// and computes the shard index for the current operation by applying a modulo operation to
/// `head_index` or `tail_index`
///
/// The modulo operation is optimized, knowing that
/// ```ignore
/// x % 2^n == x & (2^n - 1)
/// ```
/// so, as long as the count of queues (shards) is a power of two,
/// we can compute the modulo very efficiently using the formula
/// ```ignore
/// operation_number % shard_count == operation_number & (shard_count - 1)
/// ```
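///
/// For example, with `shard_count == 8`:
/// ```rust
/// assert_eq!(13 % 8, 13 & (8 - 1));
/// ```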
///
/// As long as the count of queues (shards) is a power of two,
/// is greater than or equal to the number of CPU-s,
/// and the CPU-s spend ~the same time in `push`/`pop` (the time is ~the same,
/// since both are amortized O(1)),
/// multiple CPU-s physically can't access the same shards
/// simultaneously and we have the best possible performance.
/// Synchronizing the underlying non-concurrent queue costs only
/// - 1 additional atomic increment per `push` or `pop`
/// (incrementing `head_index` or `tail_index`)
/// - 1 additional `compare_exchange` and 1 atomic store
/// (uncontended lock acquire and release)
/// - 1 cheap bit operation (to get the modulo)
/// - 1 get from the queue (shard) list by index
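///
/// A minimal sketch of an uncontended `push_back` illustrating the costs listed above
/// (illustrative names, not the exact internals):
/// ```ignore
/// // 1 atomic increment + 1 cheap bit operation to pick the shard
/// let shard_index = tail_index.fetch_add(1, Ordering::Relaxed) & (shard_count - 1);
/// // 1 get from the shard list by index
/// let (unsafe_queue, is_locked) = &shards[shard_index];
/// // 1 `compare_exchange` to acquire the lock and 1 atomic store to release it
/// while is_locked
///     .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
///     .is_err()
/// {}
/// unsafe { &mut *unsafe_queue.get() }.push_back(item);
/// is_locked.store(false, Ordering::Release);
/// ```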
///
/// ## Complex example
///
/// ```rust
/// use sharded_queue::ShardedQueue;
/// use std::cell::UnsafeCell;
/// use std::fmt::{Debug, Display, Formatter};
/// use std::marker::PhantomData;
/// use std::ops::{Deref, DerefMut};
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// pub struct NonBlockingMutex<'captured_variables, State: ?Sized> {
///     task_count: AtomicUsize,
///     task_queue: ShardedQueue<Box<dyn FnOnce(MutexGuard<State>) + Send + 'captured_variables>>,
///     unsafe_state: UnsafeCell<State>,
/// }
///
/// /// [NonBlockingMutex] is needed to run actions atomically without thread blocking, or context
/// /// switch, or spin lock contention, or rescheduling on some scheduler
/// ///
/// /// Notice that it uses [ShardedQueue], which doesn't guarantee the order of retrieval, hence
/// /// [NonBlockingMutex] doesn't guarantee the order of execution either, even of already added
/// /// items
/// impl<'captured_variables, State> NonBlockingMutex<'captured_variables, State> {
///     pub fn new(max_concurrent_thread_count: usize, state: State) -> Self {
///         Self {
///             task_count: AtomicUsize::new(0),
///             task_queue: ShardedQueue::new(max_concurrent_thread_count),
///             unsafe_state: UnsafeCell::new(state),
///         }
///     }
///
///     /// Please don't forget that the order of execution is not guaranteed. Atomicity of
///     /// operations is guaranteed, but the order can be random
///     pub fn run_if_first_or_schedule_on_first(
///         &self,
///         run_with_state: impl FnOnce(MutexGuard<State>) + Send + 'captured_variables,
///     ) {
///         if self.task_count.fetch_add(1, Ordering::Acquire) != 0 {
///             self.task_queue.push_back(Box::new(run_with_state));
///         } else {
///             // If we acquired the first lock, the run should be executed immediately
///             // and the run loop started
///             run_with_state(unsafe { MutexGuard::new(self) });
///             // Note that if `fetch_sub` != 1
///             // => some thread entered the first `if` branch of this method
///             // => [ShardedQueue::push_back] is guaranteed to be called
///             // => [ShardedQueue::pop_front_or_spin_wait_item] will not deadlock while it spins until it gets an item
///             //
///             // Notice that we run the action first, and only then decrement the count
///             // with releasing (pushing) memory changes, even if it looks otherwise
///             while self.task_count.fetch_sub(1, Ordering::Release) != 1 {
///                 self.task_queue.pop_front_or_spin_wait_item()(unsafe { MutexGuard::new(self) });
///             }
///         }
///     }
/// }
///
/// /// [Send], [Sync], and [MutexGuard] logic was taken from [std::sync::Mutex]
/// /// and [std::sync::MutexGuard]
/// ///
/// /// These are the only places where `T: Send` matters; all other
/// /// functionality works fine on a single thread
/// unsafe impl<'captured_variables, State: ?Sized + Send> Send
///     for NonBlockingMutex<'captured_variables, State>
/// {
/// }
/// unsafe impl<'captured_variables, State: ?Sized + Send> Sync
///     for NonBlockingMutex<'captured_variables, State>
/// {
/// }
///
/// /// Code was mostly taken from [std::sync::MutexGuard]; it is expected to protect [State]
/// /// from moving out of the synchronized loop
/// pub struct MutexGuard<
///     'captured_variables,
///     'non_blocking_mutex_ref,
///     State: ?Sized + 'non_blocking_mutex_ref,
/// > {
///     non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
///     /// Adding it to ensure that [MutexGuard] implements [Send] and [Sync] in the same cases
///     /// as [std::sync::MutexGuard] and protects [State] from going out of the synchronized
///     /// execution loop
///     ///
///     /// todo remove when this error is no longer actual
///     /// negative trait bounds are not yet fully implemented; use marker types for now [E0658]
///     _phantom_unsend: PhantomData<std::sync::MutexGuard<'non_blocking_mutex_ref, State>>,
/// }
///
/// // todo uncomment when this error is no longer actual
/// // negative trait bounds are not yet fully implemented; use marker types for now [E0658]
/// // impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> !Send
/// //     for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// // {
/// // }
/// unsafe impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Sync> Sync
///     for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
/// }
///
/// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized>
///     MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
///     unsafe fn new(
///         non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
///     ) -> Self {
///         Self {
///             non_blocking_mutex,
///             _phantom_unsend: PhantomData,
///         }
///     }
/// }
///
/// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> Deref
///     for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
///     type Target = State;
///
///     fn deref(&self) -> &State {
///         unsafe { &*self.non_blocking_mutex.unsafe_state.get() }
///     }
/// }
///
/// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> DerefMut
///     for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
///     fn deref_mut(&mut self) -> &mut State {
///         unsafe { &mut *self.non_blocking_mutex.unsafe_state.get() }
///     }
/// }
///
/// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Debug> Debug
///     for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
///     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
///         Debug::fmt(&**self, f)
///     }
/// }
///
/// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Display> Display
///     for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
///     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
///         (**self).fmt(f)
///     }
/// }
/// ```
impl<Item> ShardedQueue<Item> {
    /// # Arguments
    ///
    /// * `max_concurrent_thread_count` - how many threads can physically access [ShardedQueue]
    /// simultaneously, needed for computing `shard_count`
    pub fn new(max_concurrent_thread_count: usize) -> Self {
        // Computing `shard_count` and `modulo_number` to optimize modulo operation
        // performance, knowing that:
        // x % 2^n == x & (2^n - 1)
        //
        // Substituting `x` with `operation_number` and `2^n` with `shard_count`:
        // operation_number % shard_count == operation_number & (shard_count - 1)
        //
        // So, to get the best modulo performance, `shard_count` needs to
        // be a power of 2.
        // Also, `shard_count` should be greater than or equal to `max_concurrent_thread_count`,
        // so that threads physically can't contend if operations are fast
        // (`VecDeque` operations are amortized O(1),
        // and take O(n) only when a resize needs to happen)
        //
        // Computing the lowest `shard_count` which
        // - Is greater than or equal to `max_concurrent_thread_count`
        // - And is a power of 2
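        //
        // For example (illustrative values), `max_concurrent_thread_count == 6`
        // gives `shard_count == 8` and `modulo_number == 7 == 0b111`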
        let shard_count = (2usize).pow((max_concurrent_thread_count as f64).log2().ceil() as u32);
        Self {
            modulo_number: shard_count - 1,
            unsafe_queue_and_is_locked_list: (0..shard_count)
                .map(|_| {
                    CachePadded::new((
                        UnsafeCell::new(VecDeque::<Item>::new()),
                        AtomicBool::new(false),
                    ))
                })
                .collect(),
            head_index: CachePadded::new(AtomicUsize::new(0)),
            tail_index: CachePadded::new(AtomicUsize::new(0)),
        }
    }

    /// Note that [ShardedQueue::pop_front_or_spin_wait_item] will spin until an [Item]
    /// is added => it can spin for a very long time if it is called without a guarantee
    /// that [ShardedQueue::push_back] will be called soon
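    ///
    /// A minimal usage example (we push before popping so that the call below
    /// returns immediately instead of spinning):
    /// ```rust
    /// use sharded_queue::ShardedQueue;
    ///
    /// let sharded_queue: ShardedQueue<usize> = ShardedQueue::new(1);
    /// sharded_queue.push_back(42);
    /// assert_eq!(sharded_queue.pop_front_or_spin_wait_item(), 42);
    /// ```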
    pub fn pop_front_or_spin_wait_item(&self) -> Item {
        let queue_index = self.head_index.fetch_add(1, Ordering::Relaxed) & self.modulo_number;
        let unsafe_queue_and_is_locked = unsafe {
            // SAFETY: `queue_index` can not be out of bounds,
            // because it is masked with `modulo_number == shard_count - 1`,
            // which is the last index of the shard list
            self.unsafe_queue_and_is_locked_list
                .get_unchecked(queue_index)
        };

        // Note that since we already incremented `head_index`,
        // it is important to complete the operation. If we tried
        // to implement `try_remove_first`, we would need to somehow
        // rebalance `head_index`, complicating the logic and
        // introducing overhead, and even if we needed that,
        // we could just check the length outside before calling this method
        let is_locked = &unsafe_queue_and_is_locked.1;
        loop {
            // We don't use `std::sync::Mutex::lock`,
            // because `std::sync::Mutex::lock` may block the thread, which we want to avoid,
            // and queue operations under the lock have complexity
            // amortized O(1), which is very fast and has little
            // chance to perform better with blocking versus spinning
            //
            // We don't use `std::sync::Mutex::try_lock`,
            // because replacing `std::sync::Mutex::try_lock`
            // with a pure atomic gives ~2 times better performance
            if is_locked
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                let queue = unsafe {
                    // SAFETY: We do operations on `unsafe_queue_and_is_locked` only between
                    // acquiring the `AtomicBool` with `Ordering::Acquire` and releasing it with
                    // `Ordering::Release`, so access is synchronized
                    //
                    // `ShardedQueue` is `Send` only when `Item` is `Send`, so it is safe
                    // to send an `Item` to another thread when `ShardedQueue` can be accessed
                    // from another thread
                    &mut *unsafe_queue_and_is_locked.0.get()
                };

                // If we acquired the lock after [ShardedQueue::push_back] and have an `Item`,
                // we can return the `Item`, otherwise we should unlock and restart
                // spinning, giving [ShardedQueue::push_back] a chance to acquire the lock
                match queue.pop_front() {
                    None => {
                        is_locked.store(false, Ordering::Release);
                    }
                    Some(item) => {
                        is_locked.store(false, Ordering::Release);
                        return item;
                    }
                }
            }
        }
    }

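    /// Appends `item` to the back of one of the shards. This method never blocks
    /// the thread: it spins on the shard's `AtomicBool` lock, which is held only
    /// for the duration of an amortized O(1) [VecDeque::push_back]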
    pub fn push_back(&self, item: Item) {
        let queue_index = self.tail_index.fetch_add(1, Ordering::Relaxed) & self.modulo_number;
        let unsafe_queue_and_is_locked = unsafe {
            // SAFETY: `queue_index` can not be out of bounds,
            // because it is masked with `modulo_number == shard_count - 1`,
            // which is the last index of the shard list
            self.unsafe_queue_and_is_locked_list
                .get_unchecked(queue_index)
        };

        let is_locked = &unsafe_queue_and_is_locked.1;
        loop {
            // We don't use `std::sync::Mutex::lock`,
            // because it may block the thread, which we want to avoid,
            // and the `VecDeque::push_back` operation under the lock has complexity
            // amortized O(1), which is very fast
            //
            // We don't use `std::sync::Mutex::try_lock`,
            // because we achieved ~2 times better performance in benchmarks by using a raw
            // `AtomicBool`
            if is_locked
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                let queue = unsafe {
                    // SAFETY: We do operations on `unsafe_queue_and_is_locked` only between
                    // acquiring the `AtomicBool` with `Ordering::Acquire` and releasing it with
                    // `Ordering::Release`, so access is synchronized
                    //
                    // `ShardedQueue` is `Send` only when `Item` is `Send`, so it is safe
                    // to send an `Item` to another thread when `ShardedQueue` can be accessed
                    // from another thread
                    &mut *unsafe_queue_and_is_locked.0.get()
                };
                queue.push_back(item);
                is_locked.store(false, Ordering::Release);

                break;
            }
        }
    }
}

/// ```
/// use sharded_queue::ShardedQueue;
///
/// fn is_send<T: Send>(_t: T) {}
///
/// let sharded_queue: ShardedQueue<usize> = ShardedQueue::new(1);
///
/// is_send(sharded_queue);
/// ```
///
/// ```
/// use sharded_queue::ShardedQueue;
///
/// fn is_sync<T: Sync>(_t: T) {}
///
/// let sharded_queue: ShardedQueue<usize> = ShardedQueue::new(1);
///
/// is_sync(sharded_queue);
/// ```
///
/// ```compile_fail
/// use std::rc::Rc;
/// use sharded_queue::ShardedQueue;
///
/// fn is_send<T: Send>(_t: T) {}
///
/// let sharded_queue: ShardedQueue<Rc<usize>> = ShardedQueue::new(1);
///
/// is_send(sharded_queue);
/// ```
///
/// ```compile_fail
/// use std::rc::Rc;
/// use sharded_queue::ShardedQueue;
///
/// fn is_sync<T: Sync>(_t: T) {}
///
/// let sharded_queue: ShardedQueue<Rc<usize>> = ShardedQueue::new(1);
///
/// is_sync(sharded_queue);
/// ```
///
/// ```
/// use std::collections::VecDeque;
///
/// fn is_send<T: Send>(_t: T) {}
///
/// let vec_deque: VecDeque<usize> = VecDeque::new();
///
/// is_send(vec_deque);
/// ```
///
/// ```
/// use std::collections::VecDeque;
///
/// fn is_sync<T: Sync>(_t: T) {}
///
/// let vec_deque: VecDeque<usize> = VecDeque::new();
///
/// is_sync(vec_deque);
/// ```
///
/// ```compile_fail
/// use std::collections::VecDeque;
/// use std::rc::Rc;
///
/// fn is_send<T: Send>(_t: T) {}
///
/// let vec_deque: VecDeque<Rc<usize>> = VecDeque::new();
///
/// is_send(vec_deque);
/// ```
///
/// ```compile_fail
/// use std::collections::VecDeque;
/// use std::rc::Rc;
///
/// fn is_sync<T: Sync>(_t: T) {}
///
/// let vec_deque: VecDeque<Rc<usize>> = VecDeque::new();
///
/// is_sync(vec_deque);
/// ```
struct ShardedQueueSendSyncImplementationsDocTests {}