use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread::{Overflow, Stats};
use crate::runtime::task;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
cfg_has_atomic_u64! {
    // Platforms with native 64-bit atomics: queue positions are 32-bit and
    // the `head` word packs two of them (the "steal" position in the high
    // half, the "real" head in the low half — see `pack`/`unpack`).
    type UnsignedShort = u32;
    type UnsignedLong = u64;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
}
cfg_not_has_atomic_u64! {
    // Fallback for targets without 64-bit atomics: 16-bit positions packed
    // into a 32-bit head word.
    type UnsignedShort = u16;
    type UnsignedLong = u32;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
}
/// Producer handle to the local run queue. There is one `Local` per worker;
/// the mutating operations (`push_back`, `pop`, …) take `&mut self`, so only
/// the owning worker pushes and pops from this end.
pub(crate) struct Local<T: 'static> {
    inner: Arc<Inner<T>>,
}

/// Consumer (stealer) handle to the queue. Cloneable and usable from other
/// worker threads to steal batches of tasks.
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);

/// State shared between the producer and the stealers: a fixed-capacity ring
/// buffer plus a single-task LIFO slot.
pub(crate) struct Inner<T: 'static> {
    /// Concurrently updated by many threads.
    ///
    /// Packs two position counters (see `pack`/`unpack`): the "real" head in
    /// the low half and the "steal" position in the high half. The two halves
    /// differ only while a stealer is mid-transfer of the entries in between.
    head: AtomicUnsignedLong,

    /// Written only by the producer; read concurrently by stealers.
    tail: AtomicUnsignedShort,

    /// Slot for the most recently scheduled task, accessed with atomic
    /// `swap`/`take` (see `push_lifo`/`pop_lifo`).
    lifo: task::AtomicNotified<T>,

    /// Ring buffer of slots. A slot is initialized exactly when its position
    /// lies in `[head, tail)`; positions map to indices via `& MASK`.
    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}

// SAFETY(review): cross-thread access to `buffer` is coordinated through the
// atomic `head`/`tail` counters — slots are only read after being claimed via
// CAS on `head`, and only written while unpublished. `T` itself never crosses
// threads except wrapped in `task::Notified`; presumably that type is safe to
// send — confirm against the task module.
unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}
/// Capacity of the local run queue. A power of two, so `MASK` can map a
/// monotonically increasing position to a buffer index.
#[cfg(not(loom))]
const LOCAL_QUEUE_CAPACITY: usize = 256;

/// Much smaller capacity under loom so that model checking stays tractable.
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 4;

/// Bitmask applied (`pos as usize & MASK`) to turn a position counter into an
/// index into `buffer`.
const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;

/// Converts a boxed slice into a boxed fixed-size array without copying the
/// elements.
///
/// # Panics
///
/// Panics if `buffer.len() != LOCAL_QUEUE_CAPACITY`.
fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
    assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);

    // Use the standard-library `TryFrom<Box<[T]>> for Box<[T; N]>` conversion
    // instead of a raw-pointer cast: it performs the same in-place conversion
    // but without an `unsafe` block.
    match buffer.try_into() {
        Ok(array) => array,
        // `try_into` fails only on a length mismatch, ruled out above.
        Err(_) => unreachable!("length verified above"),
    }
}
/// Creates a new local run queue, returning the stealer and producer handles
/// that share it.
pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
    // Allocate every slot up front; the ring buffer never grows.
    let slots: Vec<_> = (0..LOCAL_QUEUE_CAPACITY)
        .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
        .collect();

    let inner = Arc::new(Inner {
        head: AtomicUnsignedLong::new(0),
        tail: AtomicUnsignedShort::new(0),
        lifo: task::AtomicNotified::empty(),
        buffer: make_fixed_size(slots.into_boxed_slice()),
    });

    let stealer = Steal(inner.clone());
    let producer = Local { inner };

    (stealer, producer)
}
impl<T> Local<T> {
    /// Returns the number of entries in the queue, including the task (if
    /// any) held in the LIFO slot.
    pub(crate) fn len(&self) -> usize {
        let (_, head) = unpack(self.inner.head.load(Acquire));
        let lifo = self.inner.lifo.is_some() as usize;
        // SAFETY: `tail` is only ever written by the worker thread that owns
        // this `Local` handle, so an unsynchronized load cannot race a write.
        let tail = unsafe { self.inner.tail.unsync_load() };
        len(head, tail) + lifo
    }

    /// Returns how many more tasks the ring buffer can accept.
    ///
    /// Measured against the `steal` position, so slots still being copied out
    /// by an in-flight steal count as occupied.
    pub(crate) fn remaining_slots(&self) -> usize {
        let (steal, _) = unpack(self.inner.head.load(Acquire));
        // SAFETY: `tail` is only written by the thread owning this handle.
        let tail = unsafe { self.inner.tail.unsync_load() };
        LOCAL_QUEUE_CAPACITY - len(steal, tail)
    }

    /// Returns the fixed capacity of the ring buffer.
    pub(crate) fn max_capacity(&self) -> usize {
        LOCAL_QUEUE_CAPACITY
    }

    /// Returns `true` when the queue (including the LIFO slot) is non-empty.
    pub(crate) fn has_tasks(&self) -> bool {
        self.len() != 0
    }

    /// Pushes a batch of tasks onto the back of the ring buffer.
    ///
    /// # Panics
    ///
    /// Panics if the batch does not fit; callers are expected to check
    /// `remaining_slots` first.
    pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
        let len = tasks.len();
        assert!(len <= LOCAL_QUEUE_CAPACITY);
        if len == 0 {
            // Nothing to push.
            return;
        }
        let head = self.inner.head.load(Acquire);
        let (steal, _) = unpack(head);
        // SAFETY: `tail` is only written by the thread owning this handle.
        let mut tail = unsafe { self.inner.tail.unsync_load() };
        // Capacity check against the `steal` position (in-flight steals keep
        // their slots occupied). The empty-then-panic shape keeps the happy
        // path first; hitting the `else` is a caller bug.
        if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
        } else {
            panic!()
        }
        for task in tasks {
            let idx = tail as usize & MASK;
            self.inner.buffer[idx].with_mut(|ptr| {
                // SAFETY: the capacity check above guarantees this slot is
                // unoccupied, so the write does not clobber a live task.
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
            });
            tail = tail.wrapping_add(1);
        }
        // Release store publishes the slot writes above to stealers.
        self.inner.tail.store(tail, Release);
    }

    /// Pushes a task onto the back of the queue, spilling half the buffer
    /// into `overflow` when the local buffer is full.
    pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
        &mut self,
        mut task: task::Notified<T>,
        overflow: &O,
        stats: &mut Stats,
    ) {
        let tail = loop {
            let head = self.inner.head.load(Acquire);
            let (steal, real) = unpack(head);
            // SAFETY: `tail` is only written by the thread owning this handle.
            let tail = unsafe { self.inner.tail.unsync_load() };
            if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort {
                // There is room for the task.
                break tail;
            } else if steal != real {
                // A steal is in progress and the buffer is full; push the new
                // task straight to the overflow queue.
                overflow.push(task);
                return;
            } else {
                // Buffer full, no steal in flight: try to migrate half of it
                // to the overflow queue.
                match self.push_overflow(task, real, tail, overflow, stats) {
                    Ok(_) => return,
                    // Lost a race with a stealer; take the task back and
                    // retry the whole sequence.
                    Err(v) => {
                        task = v;
                    }
                }
            }
        };
        self.push_back_finish(task, tail);
    }

    /// Second half of `push_back_or_overflow`: writes `task` into the slot at
    /// `tail` and publishes the new tail.
    fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
        let idx = tail as usize & MASK;
        self.inner.buffer[idx].with_mut(|ptr| {
            // SAFETY: the caller verified capacity, so the slot at `idx` is
            // unoccupied.
            unsafe {
                ptr::write((*ptr).as_mut_ptr(), task);
            }
        });
        // Release store makes the task visible to stealers.
        self.inner.tail.store(tail.wrapping_add(1), Release);
    }

    /// Migrates a batch of tasks to the overflow queue when the buffer is
    /// full: claims the whole buffer, re-publishes the older half locally and
    /// pushes the newer half plus `task` onto `overflow`.
    ///
    /// Returns `Err(task)` if a concurrent stealer changed `head` first; the
    /// caller retries.
    #[inline(never)]
    fn push_overflow<O: Overflow<T>>(
        &mut self,
        task: task::Notified<T>,
        head: UnsignedShort,
        tail: UnsignedShort,
        overflow: &O,
        stats: &mut Stats,
    ) -> Result<(), task::Notified<T>> {
        // Half of the buffer is moved to the overflow queue.
        const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort;
        assert_eq!(
            tail.wrapping_sub(head) as usize,
            LOCAL_QUEUE_CAPACITY,
            "queue is not full; tail = {tail}; head = {head}"
        );
        // Claim every task in the buffer by advancing both halves of `head`
        // to `tail` in one CAS. Failure means a stealer raced us.
        if self
            .inner
            .head
            .compare_exchange_weak(pack(head, head), pack(tail, tail), Release, Relaxed)
            .is_err()
        {
            return Err(task);
        }
        // Re-publish the older half: with `head` now equal to `tail`, the
        // positions `tail..tail + NUM_TASKS_TAKEN` map (mod capacity) onto the
        // same physical slots that hold the oldest claimed tasks.
        self.inner
            .tail
            .store(tail.wrapping_add(NUM_TASKS_TAKEN), Release);
        // Lazily yields the newer half of the claimed tasks, reading each
        // slot exactly once.
        struct BatchTaskIter<'a, T: 'static> {
            buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
            head: UnsignedLong,
            i: UnsignedLong,
        }
        impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
            type Item = task::Notified<T>;
            #[inline]
            fn next(&mut self) -> Option<task::Notified<T>> {
                if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) {
                    None
                } else {
                    let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
                    let slot = &self.buffer[i_idx];
                    // SAFETY: these slots were claimed by the CAS above and
                    // each is read at most once, so the read takes ownership
                    // of an initialized task.
                    let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
                    self.i += 1;
                    Some(task)
                }
            }
        }
        // Push the newer half plus the new task as one batch; the lazy
        // iterator avoids an intermediate allocation.
        let batch_iter = BatchTaskIter {
            buffer: &self.inner.buffer,
            head: head.wrapping_add(NUM_TASKS_TAKEN) as UnsignedLong,
            i: 0,
        };
        overflow.push_batch(batch_iter.chain(std::iter::once(task)));
        // Record the overflow in the worker's stats.
        stats.incr_overflow_count();
        Ok(())
    }

    /// Pops a task from the front of the ring buffer; `None` when the buffer
    /// is empty. The LIFO slot is not consulted (see `pop_lifo`).
    pub(crate) fn pop(&mut self) -> Option<task::Notified<T>> {
        let mut head = self.inner.head.load(Acquire);
        let idx = loop {
            let (steal, real) = unpack(head);
            // SAFETY: `tail` is only written by the thread owning this handle.
            let tail = unsafe { self.inner.tail.unsync_load() };
            if real == tail {
                // Ring buffer is empty.
                return None;
            }
            let next_real = real.wrapping_add(1);
            // If a steal is in flight (`steal != real`), only the `real` half
            // advances; otherwise both halves stay equal.
            let next = if steal == real {
                pack(next_real, next_real)
            } else {
                assert_ne!(steal, next_real);
                pack(steal, next_real)
            };
            // Claim the slot at `real`; on CAS failure a stealer updated
            // `head`, so retry from the observed value.
            let res = self
                .inner
                .head
                .compare_exchange_weak(head, next, AcqRel, Acquire);
            match res {
                Ok(_) => break real as usize & MASK,
                Err(actual) => head = actual,
            }
        };
        // SAFETY: the successful CAS transferred ownership of the slot at
        // `idx` to this thread, and the slot is initialized (it was inside
        // `[head, tail)`).
        Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
    }

    /// Stores `task` in the LIFO slot, returning whatever task it displaced.
    pub(crate) fn push_lifo(&self, task: task::Notified<T>) -> Option<task::Notified<T>> {
        self.inner.lifo.swap(Some(task))
    }

    /// Takes the task out of the LIFO slot, if any.
    pub(crate) fn pop_lifo(&self) -> Option<task::Notified<T>> {
        self.inner.lifo.take()
    }
}
impl<T> Steal<T> {
    /// Returns the number of observable entries, including the LIFO slot.
    pub(crate) fn len(&self) -> usize {
        let (_, head) = unpack(self.0.head.load(Acquire));
        let tail = self.0.tail.load(Acquire);
        let lifo = self.0.lifo.is_some() as usize;
        len(head, tail) + lifo
    }

    /// Returns `true` when no entries are observable.
    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Steals roughly half of this queue's tasks into `dst`, returning one of
    /// them (the last copied) for the caller to run immediately. Falls back
    /// to taking the LIFO slot when the ring buffer yields nothing. Returns
    /// `None` if there was nothing to take or `dst` is over half full.
    pub(crate) fn steal_into(
        &self,
        dst: &mut Local<T>,
        dst_stats: &mut Stats,
    ) -> Option<task::Notified<T>> {
        // SAFETY: `dst.tail` is only written by the thread that owns `dst`,
        // which is the calling thread.
        let dst_tail = unsafe { dst.inner.tail.unsync_load() };
        // Refuse to steal when `dst` is more than half full (measured against
        // its `steal` position), since up to half the source queue may be
        // copied in.
        let (steal, _) = unpack(dst.inner.head.load(Acquire));
        if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as UnsignedShort / 2 {
            return None;
        }
        // Copy a batch from the source buffer into `dst`'s buffer.
        let mut n = self.steal_into2(dst, dst_tail);
        if n == 0 {
            // Ring buffer yielded nothing; try the LIFO slot instead.
            let lifo = self.0.lifo.take();
            if lifo.is_some() {
                dst_stats.incr_steal_count(1);
                dst_stats.incr_steal_operations();
            }
            return lifo;
        }
        dst_stats.incr_steal_count(n as u16);
        dst_stats.incr_steal_operations();
        // The last copied task is handed straight to the caller rather than
        // published in `dst`.
        n -= 1;
        let ret_pos = dst_tail.wrapping_add(n);
        let ret_idx = ret_pos as usize & MASK;
        // SAFETY: this slot was just written by `steal_into2` and is not yet
        // visible to other threads (`dst.tail` has not been published).
        let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
        if n == 0 {
            // Exactly one task was stolen; nothing to publish in `dst`.
            return Some(ret);
        }
        // Publish the remaining `n` tasks in `dst`.
        dst.inner.tail.store(dst_tail.wrapping_add(n), Release);
        Some(ret)
    }

    /// Copies half of the source buffer's tasks into `dst`'s buffer starting
    /// at `dst_tail`, returning how many were copied. The copied tasks are
    /// NOT yet published in `dst`; the caller stores `dst.tail`.
    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
        let mut prev_packed = self.0.head.load(Acquire);
        let mut next_packed;
        let n = loop {
            let (src_head_steal, src_head_real) = unpack(prev_packed);
            let src_tail = self.0.tail.load(Acquire);
            // Only one steal may be in flight at a time; differing halves
            // mean another stealer is mid-transfer.
            if src_head_steal != src_head_real {
                return 0;
            }
            // Take half of the available tasks, rounding up.
            let n = src_tail.wrapping_sub(src_head_real);
            let n = n - n / 2;
            if n == 0 {
                // Nothing to steal.
                return 0;
            }
            let steal_to = src_head_real.wrapping_add(n);
            assert_ne!(src_head_steal, steal_to);
            // Claim `[real, steal_to)` by advancing only the `real` half;
            // `steal` stays behind, marking the transfer as in progress.
            next_packed = pack(src_head_steal, steal_to);
            let res = self
                .0
                .head
                .compare_exchange_weak(prev_packed, next_packed, AcqRel, Acquire);
            match res {
                Ok(_) => break n,
                Err(actual) => prev_packed = actual,
            }
        };
        assert!(
            n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2,
            "actual = {n}"
        );
        let (first, _) = unpack(next_packed);
        // Copy the claimed tasks into `dst`'s buffer.
        for i in 0..n {
            let src_pos = first.wrapping_add(i);
            let dst_pos = dst_tail.wrapping_add(i);
            let src_idx = src_pos as usize & MASK;
            let dst_idx = dst_pos as usize & MASK;
            // SAFETY: the CAS above claimed the source slots for this thread,
            // and the destination slots are unoccupied (the caller checked
            // `dst` was at most half full).
            let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
            dst.inner.buffer[dst_idx]
                .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
        }
        let mut prev_packed = next_packed;
        // Finish the steal: set the `steal` half equal to `real`, releasing
        // the transferred range. CAS retries re-read `head` because the
        // producer may concurrently advance `real` via `pop`.
        loop {
            let head = unpack(prev_packed).1;
            next_packed = pack(head, head);
            let res = self
                .0
                .head
                .compare_exchange_weak(prev_packed, next_packed, AcqRel, Acquire);
            match res {
                Ok(_) => return n,
                Err(actual) => prev_packed = actual,
            }
        }
    }
}
impl<T> Clone for Steal<T> {
fn clone(&self) -> Steal<T> {
Steal(self.0.clone())
}
}
impl<T> Drop for Local<T> {
    /// Sanity-checks that the queue was drained before the worker went away.
    fn drop(&mut self) {
        // Skip the checks while unwinding so a panic elsewhere is not turned
        // into a double panic/abort.
        if std::thread::panicking() {
            return;
        }
        assert!(self.pop().is_none(), "queue not empty");
        assert!(self.pop_lifo().is_none(), "LIFO slot not empty");
    }
}
/// Number of entries between `head` and `tail`; wrapping subtraction handles
/// position counters that have wrapped around.
fn len(head: UnsignedShort, tail: UnsignedShort) -> usize {
    let delta = tail.wrapping_sub(head);
    delta as usize
}
/// Splits the packed `head` word into its `(steal, real)` halves: the low
/// half of `n` holds the "real" head and the high half the "steal" position.
/// Inverse of `pack`.
fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
    let half_bits = mem::size_of::<UnsignedShort>() * 8;
    let real = (n & UnsignedShort::MAX as UnsignedLong) as UnsignedShort;
    let steal = (n >> half_bits) as UnsignedShort;
    (steal, real)
}
/// Packs the `steal` and `real` positions into a single `head` word, with
/// `steal` in the high half and `real` in the low half. Inverse of `unpack`.
fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
    let half_bits = mem::size_of::<UnsignedShort>() * 8;
    let high = (steal as UnsignedLong) << half_bits;
    let low = real as UnsignedLong;
    high | low
}
#[test]
fn test_local_queue_capacity() {
    // Every in-buffer index (0..=CAPACITY-1) must fit in a `u8`.
    let max_index = LOCAL_QUEUE_CAPACITY - 1;
    let limit = u8::MAX as usize;
    assert!(max_index <= limit);
}