use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::task;
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{Acquire, Release};
/// Queue of tasks stored as a singly linked list of `task::Header` nodes.
///
/// The list ends live behind a mutex, while the element count is mirrored in
/// an atomic so that `len()` / `is_empty()` can be answered without locking.
pub(crate) struct Inject<T: 'static> {
/// Head/tail pointers and the closed flag; accessed only through this lock.
pointers: Mutex<Pointers>,
/// Number of queued tasks. Every store in this file happens while
/// `pointers` is locked; `len()` reads it without taking the lock.
len: AtomicUsize,
/// Ties the queue to the task type `T` without storing a `T`.
_p: PhantomData<T>,
}
/// State kept inside the `Inject::pointers` mutex.
struct Pointers {
/// Set by `close`; once true, `push` returns without enqueueing its task.
is_closed: bool,
/// Next task to be returned by `pop`, or `None` when the list is empty.
head: Option<NonNull<task::Header>>,
/// Last task in the list (where new tasks are appended), or `None` when
/// the list is empty.
tail: Option<NonNull<task::Header>>,
}
// SAFETY (reviewer note): the raw `NonNull<task::Header>` links are only
// reachable through the `pointers` mutex and `len` is an atomic, so the
// queue's own state is synchronized. This presumably also relies on task
// headers being safe to hand between threads — confirm against `task::Header`.
unsafe impl<T> Send for Inject<T> {}
unsafe impl<T> Sync for Inject<T> {}
impl<T: 'static> Inject<T> {
    /// Creates an empty, open queue.
    pub(crate) fn new() -> Inject<T> {
        Inject {
            pointers: Mutex::new(Pointers {
                is_closed: false,
                head: None,
                tail: None,
            }),
            len: AtomicUsize::new(0),
            _p: PhantomData,
        }
    }

    /// Returns `true` when the length counter reads zero.
    ///
    /// Only the atomic counter is consulted (no lock), so the answer can be
    /// stale by the time the caller acts on it.
    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Marks the queue as closed.
    ///
    /// Returns `true` if this call performed the transition and `false` if
    /// the queue had already been closed.
    pub(crate) fn close(&self) -> bool {
        let mut p = self.pointers.lock();

        if p.is_closed {
            return false;
        }

        p.is_closed = true;
        true
    }

    /// Returns whether `close` has been called. Takes the lock.
    pub(crate) fn is_closed(&self) -> bool {
        self.pointers.lock().is_closed
    }

    /// Returns the number of queued tasks as last published by a writer.
    pub(crate) fn len(&self) -> usize {
        self.len.load(Acquire)
    }

    /// Appends a single task to the back of the queue.
    ///
    /// If the queue is closed the task is not enqueued; it is dropped when
    /// this function returns.
    pub(crate) fn push(&self, task: task::Notified<T>) {
        // Hold the lock for the whole operation.
        let mut p = self.pointers.lock();

        if p.is_closed {
            return;
        }

        // SAFETY: every store to `len` happens while `pointers` is locked
        // (here, in `push_batch_inner` and in `pop`), and we hold that lock,
        // so no store can race with this unsynchronized load.
        let len = unsafe { self.len.unsync_load() };
        let task = task.into_raw();

        // The next pointer should already be cleared.
        debug_assert!(get_next(task).is_none());

        if let Some(tail) = p.tail {
            set_next(tail, Some(task));
        } else {
            p.head = Some(task);
        }

        p.tail = Some(task);

        self.len.store(len + 1, Release);
    }

    /// Appends every task yielded by `iter` to the back of the queue.
    ///
    /// The tasks are first chained together without holding the lock, then
    /// the whole chain is spliced in under the lock. An empty iterator is a
    /// no-op. If the queue is closed, the tasks are released instead of being
    /// enqueued.
    #[inline]
    pub(crate) fn push_batch<I>(&self, mut iter: I)
    where
        I: Iterator<Item = task::Notified<T>>,
    {
        let first = match iter.next() {
            Some(first) => first.into_raw(),
            None => return,
        };

        // Link the remaining tasks behind `first`. These tasks are not yet
        // visible to other users of the queue, so no lock is needed.
        let mut prev = first;
        let mut counter = 1;

        iter.for_each(|next| {
            let next = next.into_raw();
            set_next(prev, Some(next));
            prev = next;
            counter += 1;
        });

        // `prev` is now the last task of the chain.
        self.push_batch_inner(first, prev, counter);
    }

    /// Splices an already-linked chain of `num` tasks onto the queue.
    ///
    /// `batch_head` and `batch_tail` are the first and last nodes of the
    /// chain; `batch_tail` must have no successor.
    #[inline]
    fn push_batch_inner(
        &self,
        batch_head: NonNull<task::Header>,
        batch_tail: NonNull<task::Header>,
        num: usize,
    ) {
        debug_assert!(get_next(batch_tail).is_none());

        let mut p = self.pointers.lock();

        // Mirror `push`: a closed queue must not accept new tasks, otherwise
        // they would sit in the list forever (and trip the emptiness
        // assertion when the queue is dropped).
        if p.is_closed {
            // Release the lock before dropping task handles so that whatever
            // their destructors do cannot run while the queue is locked.
            drop(p);

            let mut curr = Some(batch_head);
            while let Some(task) = curr {
                curr = get_next(task);
                // Unlink the node, as `pop` does, before giving it up.
                set_next(task, None);
                // SAFETY: every node in the chain was produced by
                // `Notified::into_raw` in `push_batch` and has not been
                // turned back into a handle since.
                drop(unsafe { task::Notified::<T>::from_raw(task) });
            }

            return;
        }

        if let Some(tail) = p.tail {
            set_next(tail, Some(batch_head));
        } else {
            p.head = Some(batch_head);
        }

        p.tail = Some(batch_tail);

        // SAFETY: every store to `len` happens while `pointers` is locked,
        // and we hold that lock, so no store can race with this load.
        let len = unsafe { self.len.unsync_load() };

        self.len.store(len + num, Release);
    }

    /// Removes and returns the task at the front of the queue, or `None` if
    /// the queue is empty.
    pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
        // Fast path: skip the lock entirely when the counter reads zero.
        if self.is_empty() {
            return None;
        }

        let mut p = self.pointers.lock();

        // The queue may have been drained between the check above and
        // acquiring the lock.
        let task = p.head?;

        p.head = get_next(task);

        if p.head.is_none() {
            p.tail = None;
        }

        // Clear the link so the task can be enqueued again later.
        set_next(task, None);

        // SAFETY: every store to `len` happens while `pointers` is locked,
        // and we hold that lock, so no store can race with this load.
        self.len
            .store(unsafe { self.len.unsync_load() } - 1, Release);

        // SAFETY: the pointer was produced by `Notified::into_raw` when the
        // task was pushed, and it has just been unlinked from the list.
        Some(unsafe { task::Notified::from_raw(task) })
    }
}
impl<T: 'static> Drop for Inject<T> {
    /// Checks that the queue has been fully drained before it goes away.
    ///
    /// The check is skipped while the current thread is already unwinding, so
    /// a leftover task never turns one panic into two.
    fn drop(&mut self) {
        if std::thread::panicking() {
            return;
        }

        assert!(self.pop().is_none(), "queue not empty");
    }
}
/// Reads the `queue_next` link stored in the given task header.
fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
    // SAFETY (assumed — confirm against callers): `header` points to a live
    // task header. Within this file the link is only touched while the
    // queue lock is held or while the task is not yet linked into the queue.
    unsafe {
        let node = header.as_ref();
        node.queue_next.with(|next| *next)
    }
}
/// Overwrites the next-link of the given task header with `val`.
fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
    // SAFETY (assumed — confirm against callers): `header` points to a live
    // task header. Within this file the link is only touched while the
    // queue lock is held or while the task is not yet linked into the queue.
    unsafe {
        let node = header.as_ref();
        node.set_next(val);
    }
}