#![warn(missing_debug_implementations, rust_2018_idioms, missing_docs)]
use std::cmp;
use std::collections::hash_map::DefaultHasher;
use std::fmt::{self, Debug, Formatter};
use std::hash::Hasher;
use std::iter::FusedIterator;
use std::mem::{self, MaybeUninit};
use std::ops::Deref;
use std::ptr::{self, NonNull};
#[cfg(not(loom))]
use std::{collections::hash_map::RandomState, hash::BuildHasher};
#[cfg_attr(loom, path = "loom.rs")]
#[cfg_attr(not(loom), path = "std.rs")]
mod facade;
use facade::atomic::{self, AtomicBool, AtomicU16, AtomicU32, AtomicUsize};
use facade::Arc;
use facade::GlobalQueue;
use facade::UnsafeCell;
/// A concurrent work-stealing queue.
///
/// This is a cheaply-cloneable handle: internally it is an `Arc` around the
/// shared state, so all clones refer to the same queue.
#[derive(Debug)]
pub struct Queue<T>(Arc<Shared<T>>);
impl<T> Queue<T> {
    /// Creates a new queue with `local_queues` local queues, each holding up
    /// to `local_queue_size` items.
    ///
    /// # Panics
    ///
    /// Panics if `local_queue_size` is not a power of two.
    pub fn new(local_queues: usize, local_queue_size: u16) -> Self {
        // A power of two has exactly one set bit.
        assert_eq!(
            local_queue_size.count_ones(),
            1,
            "Queue size is not a power of two"
        );
        // Because the capacity is a power of two, `index & mask` performs the
        // ring-buffer wrap-around.
        let mask = local_queue_size - 1;
        Self(Arc::new(Shared {
            local_queues: (0..local_queues)
                .map(|_| LocalQueueInner {
                    heads: AtomicU32::new(0),
                    tail: AtomicU16::new(0),
                    mask,
                    // All slots start uninitialized; only slots in the range
                    // `head..tail` ever hold live items.
                    items: (0..local_queue_size)
                        .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
                        .collect(),
                })
                .collect(),
            global_queue: GlobalQueue::new(),
            stealing_global: AtomicBool::new(false),
            taken_local_queues: AtomicBool::new(false),
            searchers: AtomicUsize::new(0),
        }))
    }
    /// Pushes an item to the global queue, from where any local queue can
    /// later retrieve it.
    pub fn push(&self, item: T) {
        let _ = self.0.global_queue.push(item);
    }
    /// Returns an iterator over handles to this queue's local queues.
    ///
    /// # Panics
    ///
    /// Panics if the local queues have already been taken from this queue
    /// (this method may only be called once per queue).
    pub fn local_queues(&self) -> LocalQueues<'_, T> {
        // `swap` returns the previous value, so a second call sees `true`
        // and the assertion fails.
        assert!(!self
            .0
            .taken_local_queues
            .swap(true, atomic::Ordering::Relaxed));
        LocalQueues {
            shared: self,
            index: 0,
            // The hasher seeds each local queue's RNG (see
            // `LocalQueues::next`). Outside loom it is randomly keyed; under
            // loom a default-keyed hasher is used instead.
            #[cfg(not(loom))]
            hasher: RandomState::new().build_hasher(),
            #[cfg(loom)]
            hasher: DefaultHasher::new(),
        }
    }
    /// The number of local queues that are currently searching for items
    /// (i.e. inside the slow path of `LocalQueue::pop`).
    #[must_use]
    pub fn searchers(&self) -> usize {
        self.0.searchers.load(atomic::Ordering::Relaxed)
    }
}
impl<T> Clone for Queue<T> {
    /// Creates another handle to the same shared queue state; this is just a
    /// reference-count bump.
    fn clone(&self) -> Self {
        let shared = Arc::clone(&self.0);
        Self(shared)
    }
}
/// The state shared by every handle to a queue.
#[derive(Debug)]
struct Shared<T> {
    /// The fixed set of per-owner ring buffers.
    local_queues: Box<[LocalQueueInner<T>]>,
    /// The overflow/injection queue that any handle may push to.
    global_queue: GlobalQueue<T>,
    /// Set while one local queue is popping from (and refilling itself from)
    /// the global queue, so that other local queues skip it meanwhile.
    stealing_global: AtomicBool,
    /// Whether `Queue::local_queues` has been called yet; it may only be
    /// called once.
    taken_local_queues: AtomicBool,
    /// The number of local queues currently searching for items.
    searchers: AtomicUsize,
}
/// The shared half of one local queue: a fixed-capacity ring buffer.
struct LocalQueueInner<T> {
    /// Two `u16` indices packed into one `u32` (see `pack_heads` /
    /// `unpack_heads`): the high half is the "steal" head, which protects
    /// entries a stealer is still copying out; the low half is the real head.
    heads: AtomicU32,
    /// One past the most recently written slot. Stealers only read it; it is
    /// written by the owning `LocalQueue`.
    tail: AtomicU16,
    /// `capacity - 1`. Capacity is a power of two, so `index & mask` wraps.
    mask: u16,
    /// The ring buffer slots; only `head..tail` is initialized.
    items: Box<[UnsafeCell<MaybeUninit<T>>]>,
}
// SAFETY: access to the `UnsafeCell` slots is coordinated through the
// `heads`/`tail` atomics, so sharing `&LocalQueueInner<T>` across threads is
// sound as long as `T` itself can be sent between threads.
unsafe impl<T: Send> Sync for LocalQueueInner<T> {}
impl<T> Debug for LocalQueueInner<T> {
    /// Formats the queue's indices (the `items` buffer itself is omitted,
    /// since its contents are partially uninitialized).
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let packed = self.heads.load(atomic::Ordering::Acquire);
        let (stealer_head, real_head) = unpack_heads(packed);
        let mut s = f.debug_struct("LocalQueueInner");
        s.field("protected_head", &stealer_head);
        s.field("head", &real_head);
        s.field("tail", &self.tail);
        s.field("mask", &format_args!("{:#b}", self.mask));
        s.finish()
    }
}
/// Splits a packed `heads` word into `(stealer_head, real_head)`.
///
/// Inverse of [`pack_heads`].
fn unpack_heads(heads: u32) -> (u16, u16) {
    let stealer = (heads >> 16) as u16;
    let real = (heads & 0xFFFF) as u16;
    (stealer, real)
}
/// Packs the stealer head and real head into a single `u32`, with the
/// stealer head in the high 16 bits.
///
/// Inverse of [`unpack_heads`].
fn pack_heads(stealer: u16, real: u16) -> u32 {
    (u32::from(stealer) << 16) | u32::from(real)
}
/// One of the local queues of a [`Queue`].
///
/// Requires exclusive access (`&mut self`) for pushing and popping; each
/// local queue is meant to be driven by a single owner.
#[derive(Debug)]
pub struct LocalQueue<T> {
    /// A one-item slot that `push` fills and `pop` drains first, giving
    /// last-in-first-out behavior for the most recent item.
    lifo_slot: Option<T>,
    /// Pointer to this queue's ring buffer inside the shared state; kept
    /// valid by the `Arc` held in `shared`.
    local: ValidPtr<LocalQueueInner<T>>,
    /// Handle to the shared queue state (global queue, siblings, counters).
    shared: Queue<T>,
    /// RNG used to pick which sibling queue to try stealing from first.
    rng: Rng,
}
impl<T> LocalQueue<T> {
    /// Loads this queue's tail without atomic synchronization.
    fn local_tail(&mut self) -> u16 {
        // SAFETY: we hold `&mut self`, and the tail is only ever written by
        // the owning `LocalQueue`, so no data race is possible.
        unsafe { facade::atomic_u16_unsync_load(&self.local.tail) }
    }
    /// Pushes an item into the LIFO slot; the item it displaces (if any) is
    /// pushed onto the queue proper via `push_yield`.
    pub fn push(&mut self, item: T) {
        if let Some(previous) = self.lifo_slot.replace(item) {
            self.push_yield(previous);
        }
    }
    /// Pushes an item directly onto the local ring buffer, bypassing the
    /// LIFO slot.
    ///
    /// If the ring is full and no stealer is active, half of it is flushed
    /// to the global queue; an item that cannot be stored locally goes to
    /// the global queue as well.
    pub fn push_yield(&mut self, item: T) {
        let tail = self.local_tail();
        let mut heads = self.local.heads.load(atomic::Ordering::Acquire);
        loop {
            let (steal_head, head) = unpack_heads(heads);
            // Free space is measured from the steal head: slots a stealer is
            // still copying out of must not be overwritten yet.
            if tail.wrapping_sub(steal_head) < self.local.items.len() as u16 {
                let i = tail & self.local.mask;
                self.local.items[usize::from(i)]
                    .with_mut(|slot| unsafe { slot.write(MaybeUninit::new(item)) });
                // Release-store publishes the slot write to stealers.
                self.local
                    .tail
                    .store(tail.wrapping_add(1), atomic::Ordering::Release);
                return;
            }
            // The ring is full. If no stealer is active, try to claim half
            // of the ring and move it to the global queue.
            if steal_head == head {
                let half = self.local.items.len() as u16 / 2;
                let res = self.local.heads.compare_exchange(
                    heads,
                    pack_heads(head.wrapping_add(half), head.wrapping_add(half)),
                    atomic::Ordering::AcqRel,
                    atomic::Ordering::Acquire,
                );
                if let Err(new_heads) = res {
                    // The heads changed under us (e.g. a stealer arrived);
                    // re-evaluate from the updated value.
                    heads = new_heads;
                    continue;
                }
                // The CAS succeeded, so `head..head + half` is now
                // exclusively ours to move out.
                for i in 0..half {
                    let index = head.wrapping_add(i) & self.local.mask;
                    let item = unsafe {
                        self.local.items[usize::from(index)]
                            .with(|slot| slot.read())
                            .assume_init()
                    };
                    let _ = self.shared.0.global_queue.push(item);
                }
            }
            // Either a stealer is active or we just flushed half the ring;
            // in both cases overflow this item to the global queue.
            let _ = self.shared.0.global_queue.push(item);
            return;
        }
    }
    /// Pops an item from the queue.
    ///
    /// Tries, in order: the LIFO slot, the local ring buffer, the global
    /// queue (refilling the local ring from it), stealing from a sibling
    /// local queue, and finally one last attempt on the global queue.
    pub fn pop(&mut self) -> Option<T> {
        // Fast path: the LIFO slot.
        if let Some(item) = self.lifo_slot.take() {
            return Some(item);
        }
        // Next try the local ring, by advancing the real head.
        let tail = self.local_tail();
        let res = atomic_u32_fetch_update(
            &self.local.heads,
            atomic::Ordering::Relaxed,
            atomic::Ordering::Relaxed,
            |heads| {
                let (steal_head, head) = unpack_heads(heads);
                if head == tail {
                    // The ring is empty; abort the update.
                    None
                } else if steal_head == head {
                    // No stealer active: keep both heads in lock-step.
                    Some(pack_heads(head.wrapping_add(1), head.wrapping_add(1)))
                } else {
                    // A stealer is active: leave its head in place.
                    Some(pack_heads(steal_head, head.wrapping_add(1)))
                }
            },
        );
        let heads = match res {
            Ok(heads) => {
                // We claimed the slot at the previous head; read it out.
                let (_, head) = unpack_heads(heads);
                let i = head & self.local.mask;
                return Some(unsafe {
                    self.local.items[usize::from(i)]
                        .with(|ptr| ptr.read())
                        .assume_init()
                });
            }
            Err(heads) => heads,
        };
        // The ring was empty; search elsewhere.
        let (steal_head, head) = unpack_heads(heads);
        assert_eq!(head, tail);
        // Free slots in our ring (slots behind `steal_head` are still
        // protected by an in-flight stealer).
        let space = self.local.items.len() as u16 - head.wrapping_sub(steal_head);
        // Announce that we are searching; the drop guard below undoes this
        // on every exit path out of this function.
        self.shared
            .0
            .searchers
            .fetch_add(1, atomic::Ordering::Relaxed);
        struct DecrementSearchers<'a>(&'a AtomicUsize);
        impl Drop for DecrementSearchers<'_> {
            fn drop(&mut self) {
                self.0.fetch_sub(1, atomic::Ordering::Relaxed);
            }
        }
        let _decrement_searchers = DecrementSearchers(&self.shared.0.searchers);
        // Try the global queue, but only if no other local queue is already
        // draining it.
        if self
            .shared
            .0
            .stealing_global
            .compare_exchange(
                false,
                true,
                atomic::Ordering::Relaxed,
                atomic::Ordering::Relaxed,
            )
            .is_ok()
        {
            let popped_item = self.shared.0.global_queue.pop();
            if popped_item.is_some() {
                // Refill our ring from the global queue with up to half its
                // capacity, bounded by the free space we have.
                let steal = cmp::min(self.local.items.len() as u16 / 2, space);
                let mut tail = head;
                let end_tail = head.wrapping_add(steal);
                u32_acquire_fence(&self.local.heads);
                while tail != end_tail {
                    match self.shared.0.global_queue.pop() {
                        Some(item) => {
                            let i = tail & self.local.mask;
                            self.local.items[usize::from(i)]
                                .with_mut(|slot| unsafe { slot.write(MaybeUninit::new(item)) });
                        }
                        None => break,
                    }
                    tail = tail.wrapping_add(1);
                }
                // Publish however many items we actually transferred.
                self.local.tail.store(tail, atomic::Ordering::Release);
            }
            self.shared
                .0
                .stealing_global
                .store(false, atomic::Ordering::Relaxed);
            if let Some(popped_item) = popped_item {
                return Some(popped_item);
            }
        }
        // The global queue was empty (or taken); try stealing from sibling
        // local queues, starting at a random index so that all queues don't
        // contend on the same sibling.
        let queues = self.shared.0.local_queues.len();
        let start = self.rng.gen_usize_to(queues);
        'sibling_queues: for i in 0..queues {
            let mut i = start + i;
            if i >= queues {
                i -= queues;
            }
            let queue = &self.shared.0.local_queues[i];
            // Don't steal from ourselves.
            if ptr::eq(queue, &*self.local) {
                continue;
            }
            // Reserve a block of the sibling's items by advancing its real
            // head while leaving its steal head where it is.
            let mut queue_heads = queue.heads.load(atomic::Ordering::Acquire);
            let (old_queue_head, mut queue_head, steal) = loop {
                let (queue_steal_head, queue_head) = unpack_heads(queue_heads);
                // Another stealer is already active on this queue; move on.
                if queue_steal_head != queue_head {
                    continue 'sibling_queues;
                }
                let queue_tail = queue.tail.load(atomic::Ordering::Acquire);
                let stealable = queue_tail.wrapping_sub(queue_head);
                // Take half the items (rounded up), bounded by our free
                // space.
                let steal = cmp::min(stealable - stealable / 2, space);
                if steal == 0 {
                    continue 'sibling_queues;
                }
                let new_queue_head = queue_head.wrapping_add(steal);
                let res = queue.heads.compare_exchange_weak(
                    queue_heads,
                    pack_heads(queue_head, new_queue_head),
                    atomic::Ordering::Acquire,
                    atomic::Ordering::Acquire,
                );
                match res {
                    Ok(_) => break (queue_head, new_queue_head, steal),
                    Err(updated_queue_heads) => queue_heads = updated_queue_heads,
                }
            };
            assert_ne!(steal, 0);
            // NOTE(review): this fence is associated with our own
            // `self.local.heads`, not the sibling's `queue.heads`; under a
            // `tsan` build the acquire load therefore targets the local
            // queue's atomic — confirm this is intended.
            u32_acquire_fence(&self.local.heads);
            // The first stolen item is returned directly to the caller...
            let first_item = unsafe {
                queue.items[usize::from(old_queue_head & queue.mask)]
                    .with(|slot| slot.read())
                    .assume_init()
            };
            // ...and the remaining ones are copied into our own ring.
            for i in 1..steal {
                let src = &queue.items[usize::from(old_queue_head.wrapping_add(i) & queue.mask)];
                let dst =
                    &self.local.items[usize::from(head.wrapping_add(i - 1) & self.local.mask)];
                src.with(|src| dst.with_mut(|dst| unsafe { src.copy_to_nonoverlapping(dst, 1) }))
            }
            // Release our claim by advancing the sibling's steal head to its
            // real head. The real head may have moved meanwhile (its owner
            // popped items), so retry with the updated value on failure.
            loop {
                let res = queue.heads.compare_exchange_weak(
                    pack_heads(old_queue_head, queue_head),
                    pack_heads(queue_head, queue_head),
                    atomic::Ordering::Release,
                    atomic::Ordering::Relaxed,
                );
                match res {
                    Ok(_) => break,
                    Err(updated_queue_heads) => {
                        let (updated_queue_steal_head, update_queue_head) =
                            unpack_heads(updated_queue_heads);
                        // Only we may move the steal head right now, so it
                        // must still be where we left it.
                        assert_eq!(updated_queue_steal_head, old_queue_head);
                        queue_head = update_queue_head;
                    }
                }
            }
            // Publish the items we copied into our own ring.
            if steal > 1 {
                self.local
                    .tail
                    .store(tail.wrapping_add(steal - 1), atomic::Ordering::Release);
            }
            return Some(first_item);
        }
        // Nothing anywhere; one final attempt on the global queue.
        self.shared.0.global_queue.pop()
    }
    /// The number of local queues currently searching for items (see
    /// [`Queue::searchers`]).
    #[must_use]
    pub fn searchers(&self) -> usize {
        self.shared.searchers()
    }
    /// A reference to the [`Queue`] this local queue belongs to.
    #[must_use]
    pub fn global(&self) -> &Queue<T> {
        &self.shared
    }
}
/// An iterator over the [`LocalQueue`]s of a [`Queue`], returned by
/// [`Queue::local_queues`].
#[derive(Debug)]
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct LocalQueues<'a, T> {
    shared: &'a Queue<T>,
    /// Index of the next local queue to hand out.
    index: usize,
    /// Hasher used to derive a distinct RNG seed for each local queue.
    hasher: DefaultHasher,
}
impl<T> Iterator for LocalQueues<'_, T> {
    type Item = LocalQueue<T>;
    fn next(&mut self) -> Option<Self::Item> {
        let inner = self.shared.0.local_queues.get(self.index)?;
        self.index += 1;
        Some(LocalQueue {
            lifo_slot: None,
            // SAFETY: `inner` points into the `Arc<Shared<_>>` that the
            // `shared` field below keeps alive, so the pointer stays valid
            // for the lifetime of the `LocalQueue`.
            local: unsafe { ValidPtr::new(inner) },
            shared: self.shared.clone(),
            rng: Rng {
                // Derive a distinct, deterministic seed per local queue by
                // folding the index into the hasher.
                state: {
                    self.hasher.write_usize(self.index);
                    self.hasher.finish()
                },
            },
        })
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Exact: the number of local queues not yet handed out.
        let len = self.len();
        (len, Some(len))
    }
}
impl<T> ExactSizeIterator for LocalQueues<'_, T> {
    /// The number of local queues not yet handed out.
    fn len(&self) -> usize {
        let total = self.shared.0.local_queues.len();
        total - self.index
    }
}
// Once `next` has returned `None` (the index is past the end) it keeps
// returning `None`, so the iterator is fused.
impl<T> FusedIterator for LocalQueues<'_, T> {}
/// A non-null raw pointer that is assumed to remain valid to dereference for
/// as long as the `ValidPtr` exists.
struct ValidPtr<T: ?Sized>(NonNull<T>);
impl<T: ?Sized> ValidPtr<T> {
    /// Creates a `ValidPtr` from a raw pointer.
    ///
    /// # Safety
    ///
    /// `ptr` must be non-null, and must stay valid for reads for the whole
    /// lifetime of the returned `ValidPtr`.
    unsafe fn new(ptr: *const T) -> Self {
        // SAFETY: the caller guarantees `ptr` is non-null.
        Self(NonNull::new_unchecked(ptr as *mut T))
    }
}
impl<T: ?Sized> Clone for ValidPtr<T> {
fn clone(&self) -> Self {
*self
}
}
impl<T: ?Sized> Copy for ValidPtr<T> {}
impl<T: ?Sized> Deref for ValidPtr<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        // SAFETY: `ValidPtr::new`'s contract requires the pointer to remain
        // valid for reads while the `ValidPtr` exists.
        unsafe { self.0.as_ref() }
    }
}
impl<T: ?Sized + Debug> Debug for ValidPtr<T> {
    /// Delegates to the pointee's `Debug` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        (**self).fmt(f)
    }
}
// SAFETY: a `ValidPtr<T>` only ever hands out shared references to `T`
// (via `Deref`), so sending or sharing it across threads is sound exactly
// when `&T` is, i.e. when `T: Sync`.
unsafe impl<T: ?Sized + Sync> Send for ValidPtr<T> {}
unsafe impl<T: ?Sized + Sync> Sync for ValidPtr<T> {}
/// An unsigned integer twice the width of `usize`, used for the widening
/// multiplication in `Rng::gen_usize_to`.
#[cfg(target_pointer_width = "64")]
type DoubleUsize = u128;
#[cfg(target_pointer_width = "32")]
type DoubleUsize = u64;
/// A small, fast, deterministic pseudorandom number generator based on a
/// multiply-xor step over a 64-bit state.
#[derive(Debug)]
struct Rng {
    state: u64,
}
impl Rng {
    /// Advances the state and returns the next pseudorandom `u64`.
    fn gen_u64(&mut self) -> u64 {
        self.state = self.state.wrapping_add(0xA076_1D64_78BD_642F);
        let wide = u128::from(self.state) * u128::from(self.state ^ 0xE703_7ED1_A0B4_28DB);
        // Folding the high half into the low half mixes all 128 bits.
        ((wide >> 64) ^ wide) as u64
    }
    /// Returns the next pseudorandom `usize` (a truncated `gen_u64`).
    fn gen_usize(&mut self) -> usize {
        self.gen_u64() as usize
    }
    /// Returns a uniformly distributed `usize` in `0..to`, using widening
    /// multiplication with rejection sampling to avoid modulo bias.
    fn gen_usize_to(&mut self, to: usize) -> usize {
        const USIZE_BITS: usize = mem::size_of::<usize>() * 8;
        let mut sample = self.gen_usize();
        let mut hi = ((sample as DoubleUsize * to as DoubleUsize) >> USIZE_BITS) as usize;
        let mut lo = sample.wrapping_mul(to);
        if lo < to {
            // Rejection threshold: 2^USIZE_BITS mod `to`, computed without
            // overflow as `(-to) mod to`.
            let threshold = to.wrapping_neg() % to;
            while lo < threshold {
                sample = self.gen_usize();
                hi = ((sample as DoubleUsize * to as DoubleUsize) >> USIZE_BITS) as usize;
                lo = sample.wrapping_mul(to);
            }
        }
        hi
    }
}
/// A free-function equivalent of `AtomicU32::fetch_update`.
///
/// Repeatedly applies `f` to the current value and attempts a compare-and-
/// swap until one succeeds. Returns `Ok(previous)` on a successful update,
/// or `Err(current)` once `f` returns `None`.
fn atomic_u32_fetch_update<F>(
    atomic: &AtomicU32,
    set_order: atomic::Ordering,
    fetch_order: atomic::Ordering,
    mut f: F,
) -> Result<u32, u32>
where
    F: FnMut(u32) -> Option<u32>,
{
    let mut current = atomic.load(fetch_order);
    loop {
        let next = match f(current) {
            Some(next) => next,
            None => return Err(current),
        };
        // A weak CAS may fail spuriously; the loop simply retries.
        match atomic.compare_exchange_weak(current, next, set_order, fetch_order) {
            Ok(previous) => return Ok(previous),
            Err(actual) => current = actual,
        }
    }
}
/// Issues an acquire fence associated with `atomic`.
///
/// Under ThreadSanitizer an acquire load of the atomic is performed instead
/// of a bare fence — presumably because tsan tracks individual atomic
/// accesses rather than free-standing fences.
fn u32_acquire_fence(atomic: &AtomicU32) {
    if !cfg!(tsan) {
        atomic::fence(atomic::Ordering::Acquire);
    } else {
        atomic.load(atomic::Ordering::Acquire);
    }
}
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use std::collections::HashSet;
    // `gen_usize_to(15)` stays in `0..15` and eventually produces every
    // value in that range.
    #[test]
    fn rng() {
        let mut rng = Rng { state: 3493858 };
        let mut remaining: HashSet<_> = (0..15).collect();
        while !remaining.is_empty() {
            let value = rng.gen_usize_to(15);
            assert!(value < 15, "{} is not less than 15!", value);
            remaining.remove(&value);
        }
    }
    // The LIFO slot holds a single item and is drained first by `pop`.
    #[test]
    fn lifo_slot() {
        let queue = Queue::new(1, 2);
        let mut local = queue.local_queues().next().unwrap();
        assert_eq!(local.pop(), None);
        assert_eq!(local.pop(), None);
        local.push(Box::new(5));
        assert_eq!(local.pop(), Some(Box::new(5)));
        assert_eq!(local.pop(), None);
    }
    // Pushing more items than the tiny ring holds overflows to the global
    // queue; every item still comes back out, though not in FIFO order.
    #[test]
    fn push_many() {
        let queue = Queue::new(1, 2);
        let mut local = queue.local_queues().next().unwrap();
        for i in 0..4 {
            local.push(Box::new(i));
        }
        assert_eq!(local.pop(), Some(Box::new(3)));
        assert_eq!(local.pop(), Some(Box::new(1)));
        assert_eq!(local.pop(), Some(Box::new(0)));
        assert_eq!(local.pop(), Some(Box::new(2)));
        assert_eq!(local.pop(), None);
    }
    // Repeated push/pop cycles wrap the ring indices around without losing
    // or duplicating items.
    #[test]
    fn wrapping() {
        let queue = Queue::new(1, 2);
        let mut local = queue.local_queues().next().unwrap();
        local.push_yield(Box::new(0));
        for i in 0..10 {
            local.push_yield(Box::new(i + 1));
            assert_eq!(local.pop(), Some(Box::new(i)));
        }
        assert_eq!(local.pop(), Some(Box::new(10)));
        assert_eq!(local.pop(), None);
        assert_eq!(local.pop(), None);
    }
    // Items pushed to the global queue come out of a local queue in FIFO
    // order, across a range of local queue sizes.
    #[test]
    fn steal_global() {
        for &size in &[2, 4, 8, 16, 32, 64] {
            let queue = Queue::new(4, size);
            for i in 0..16 {
                queue.push(Box::new(i));
            }
            let mut local = queue.local_queues().next().unwrap();
            for i in 0..16 {
                assert_eq!(local.pop(), Some(Box::new(i)));
            }
            assert_eq!(local.pop(), None);
        }
    }
    // One local queue drains items spread over its own slots, the global
    // queue and a sibling's ring.
    #[test]
    fn steal_siblings() {
        let queue = Queue::new(2, 64);
        let mut locals: Vec<_> = queue.local_queues().collect();
        locals[0].push_yield(Box::new(4));
        locals[0].push_yield(Box::new(5));
        locals[1].push(Box::new(1));
        locals[1].push(Box::new(0));
        queue.push(Box::new(2));
        queue.push(Box::new(3));
        for i in 0..6 {
            assert_eq!(locals[1].pop(), Some(Box::new(i)));
        }
    }
    // Creating and immediately dropping many local queues works.
    #[test]
    fn many_locals() {
        let queue = <Queue<()>>::new(10, 128);
        queue.local_queues().for_each(drop);
    }
    // The searcher count is zero when idle and reaches (exactly) one while
    // another thread spins in `pop`. The threaded half is skipped under
    // miri.
    #[test]
    fn searchers() {
        let queue = Queue::new(2, 64);
        let mut locals = queue.local_queues();
        let mut local_a = locals.next().unwrap();
        let mut local_b = locals.next().unwrap();
        assert_eq!(local_a.searchers(), 0);
        assert_eq!(local_b.searchers(), 0);
        local_a.push(());
        local_a.push(());
        local_a.pop().unwrap();
        local_a.pop().unwrap();
        queue.push(());
        local_b.pop().unwrap();
        assert!(local_b.pop().is_none());
        assert_eq!(local_a.searchers(), 0);
        assert_eq!(local_b.searchers(), 0);
        if cfg!(not(miri)) {
            let stop = Arc::new(AtomicBool::new(false));
            let handle = std::thread::spawn({
                let stop = Arc::clone(&stop);
                move || {
                    while !stop.load(atomic::Ordering::Relaxed) {
                        local_b.pop();
                    }
                }
            });
            loop {
                let searchers = local_a.searchers();
                assert!(searchers < 2);
                if searchers == 1 {
                    break;
                }
            }
            stop.store(true, atomic::Ordering::Relaxed);
            handle.join().unwrap();
        }
    }
    // Stress test: each popped number n spawns n copies of n-1 across four
    // worker threads until everything is drained. Smaller numbers are used
    // under miri to keep it fast.
    #[test]
    fn stress() {
        let queue = Queue::new(4, 4);
        if cfg!(miri) {
            for _ in 0..3 {
                queue.push(4);
            }
        } else {
            for _ in 0..32 {
                queue.push(6);
            }
        }
        let threads: Vec<_> = queue
            .local_queues()
            .map(|mut queue| {
                std::thread::spawn(move || {
                    while let Some(num) = queue.pop() {
                        for _ in 0..num {
                            queue.push(num - 1);
                        }
                    }
                })
            })
            .collect();
        for thread in threads {
            thread.join().unwrap();
        }
    }
    // Randomized concurrency testing via the external `cobb` crate: four
    // threads share the local queues, two pushing and two popping.
    #[test]
    fn cobb() {
        use std::cell::UnsafeCell;
        struct State(Box<[UnsafeCell<LocalQueue<Box<i32>>>]>);
        // SAFETY-relevant: cobb hands each thread a distinct index, so each
        // `UnsafeCell` is only accessed by one thread at a time.
        unsafe impl Sync for State {}
        cobb::run_test(cobb::TestCfg {
            threads: 4,
            iterations: if cfg!(miri) { 100 } else { 1000 },
            sub_iterations: if cfg!(miri) { 1 } else { 10 },
            setup: || {
                let queue = Queue::new(4, 4);
                State(
                    queue
                        .local_queues()
                        .map(UnsafeCell::new)
                        .collect::<Box<[_]>>(),
                )
            },
            test: |State(local_queues), tctx| {
                let queue = unsafe { &mut *local_queues[tctx.thread_index()].get() };
                if tctx.thread_index() < 2 {
                    queue.push(Box::new(5));
                } else {
                    queue.pop();
                }
            },
            ..Default::default()
        });
    }
}
#[cfg(all(test, loom))]
mod loom_tests {
    use super::*;
    // Collects exactly `N` local queues from the queue into an array.
    fn locals<T, const N: usize>(queue: &Queue<T>) -> [LocalQueue<T>; N] {
        array_init::from_iter(queue.local_queues()).expect("incorrect number of local queues")
    }
    // Two threads popping from an empty queue both observe `None`.
    #[test]
    fn pop_none() {
        loom::model(|| {
            let queue: Queue<()> = Queue::new(2, 1);
            let [mut local_1, mut local_2] = locals(&queue);
            loom::thread::spawn(move || assert!(local_1.pop().is_none()));
            assert!(local_2.pop().is_none());
        });
    }
    // Two local queues concurrently taking items from the global queue.
    #[test]
    fn concurrent_steal_global() {
        loom::model(|| {
            let queue: Queue<Box<i32>> = Queue::new(2, 1);
            let [mut local_1, mut local_2] = locals(&queue);
            for i in 0..2 {
                queue.push(Box::new(i));
            }
            loom::thread::spawn(move || {
                local_1.pop();
                local_1.pop();
            });
            local_2.pop();
        });
    }
    // Two local queues concurrently stealing from the same sibling.
    #[test]
    fn concurrent_steal_sibling() {
        loom::model(|| {
            let queue: Queue<Box<i32>> = Queue::new(3, 1);
            let [mut local_1, mut local_2, mut local_3] = locals(&queue);
            for i in 0..4 {
                local_1.push(Box::new(i));
            }
            loom::thread::spawn(move || {
                local_2.pop();
                local_2.pop();
            });
            local_3.pop();
        });
    }
    // Single producer pushing to the global queue, single consumer popping
    // through its local queue.
    #[test]
    fn global_spsc() {
        loom::model(|| {
            let queue: Queue<Box<i32>> = Queue::new(1, 4);
            let [mut local] = locals(&queue);
            loom::thread::spawn(move || {
                for i in 0..6 {
                    queue.push(Box::new(i));
                }
            });
            for _ in 0..6 {
                local.pop();
            }
        });
    }
    // Producer fills a sibling queue without overflowing it; the consumer
    // steals from that sibling.
    #[test]
    fn sibling_spsc_few() {
        loom::model(|| {
            let queue: Queue<Box<i32>> = Queue::new(2, 4);
            let [mut local_1, mut local_2] = locals(&queue);
            loom::thread::spawn(move || {
                for i in 0..4 {
                    local_1.push(Box::new(i));
                }
            });
            for _ in 0..4 {
                local_2.pop();
            }
        });
    }
    // Producer pushes more items than the sibling's ring can hold (forcing
    // overflow to the global queue) while the consumer pops concurrently.
    #[test]
    fn sibling_spsc_many() {
        loom::model(|| {
            let queue: Queue<Box<i32>> = Queue::new(2, 4);
            let [mut local_1, mut local_2] = locals(&queue);
            loom::thread::spawn(move || {
                for i in 0..8 {
                    local_1.push(Box::new(i));
                }
            });
            local_2.pop();
        });
    }
}