use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::task::{Header, Task};
use std::marker::PhantomData;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;
pub(super) struct Queue<T: 'static> {
pointers: Mutex<Pointers>,
len: AtomicUsize,
_p: PhantomData<T>,
}
struct Pointers {
head: *const Header,
tail: *const Header,
}
const CLOSED: usize = 1;
const MAX_LEN: usize = usize::MAX >> 1;
impl<T: 'static> Queue<T> {
pub(super) fn new() -> Queue<T> {
Queue {
pointers: Mutex::new(Pointers {
head: ptr::null(),
tail: ptr::null(),
}),
len: AtomicUsize::new(0),
_p: PhantomData,
}
}
pub(super) fn is_empty(&self) -> bool {
self.len() == 0
}
pub(super) fn is_closed(&self) -> bool {
self.len.load(Acquire) & CLOSED == CLOSED
}
pub(super) fn close(&self) -> bool {
let p = self.pointers.lock().unwrap();
let len = unsafe {
self.len.unsync_load()
};
let ret = len & CLOSED == 0;
self.len.store(len | CLOSED, Release);
drop(p);
ret
}
fn len(&self) -> usize {
self.len.load(Acquire) >> 1
}
pub(super) fn wait_for_unlocked(&self) {
drop(self.pointers.lock().unwrap());
}
pub(super) fn push<F>(&self, task: Task<T>, f: F)
where
F: FnOnce(Result<(), Task<T>>),
{
unsafe {
let mut p = self.pointers.lock().unwrap();
let len = self.len.unsync_load();
if len & CLOSED == CLOSED {
drop(p);
f(Err(task));
return;
}
let task = task.into_raw();
debug_assert!(get_next(task).is_null());
if let Some(tail) = NonNull::new(p.tail as *mut _) {
set_next(tail, task.as_ptr());
} else {
p.head = task.as_ptr();
}
p.tail = task.as_ptr();
if (len >> 1) == MAX_LEN {
eprintln!("[ERROR] overflowed task counter. This is a bug and should be reported.");
std::process::abort();
}
self.len.store(len + 2, Release);
f(Ok(()));
drop(p);
}
}
pub(super) fn push_batch(&self, batch_head: Task<T>, batch_tail: Task<T>, num: usize) {
unsafe {
let batch_head = batch_head.into_raw().as_ptr();
let batch_tail = batch_tail.into_raw();
debug_assert!(get_next(batch_tail).is_null());
let mut p = self.pointers.lock().unwrap();
if let Some(tail) = NonNull::new(p.tail as *mut _) {
set_next(tail, batch_head);
} else {
p.head = batch_head;
}
p.tail = batch_tail.as_ptr();
let len = self.len.unsync_load();
if (len >> 1) >= (MAX_LEN - num) {
std::process::abort();
}
self.len.store(len + (num << 1), Release);
drop(p);
}
}
pub(super) fn pop(&self) -> Option<Task<T>> {
if self.is_empty() {
return None;
}
unsafe {
let mut p = self.pointers.lock().unwrap();
let task = NonNull::new(p.head as *mut _)?;
p.head = get_next(task);
if p.head.is_null() {
p.tail = ptr::null();
}
set_next(task, ptr::null());
self.len.store(self.len.unsync_load() - 2, Release);
drop(p);
Some(Task::from_raw(task))
}
}
}
unsafe fn get_next(meta: NonNull<Header>) -> *const Header {
*meta.as_ref().queue_next.get()
}
unsafe fn set_next(meta: NonNull<Header>, val: *const Header) {
*meta.as_ref().queue_next.get() = val;
}