use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicUsize, Ordering};
use crate::exec::Notify;
use crate::util::NotSyncMarker;
/// A fixed-capacity queue that keeps its elements in caller-provided storage
/// and is used through the [`Pusher`] and [`Popper`] handles returned by
/// [`Queue::split`].
///
/// The slots from `tail` up to (but not including) `head`, wrapping around at
/// the end of `storage`, hold initialized values. The queue is treated as full
/// when advancing `head` would make it equal to `tail`, so one slot always
/// stays unused and at most `storage.len() - 1` values are stored at once.
#[derive(Debug)]
pub struct Queue<'s, T> {
/// Backing slots. Each one is wrapped in `UnsafeCell` so that it can be
/// written and read through a shared reference to the queue.
storage: &'s mut [UnsafeCell<MaybeUninit<T>>],
/// Index of the next slot to be written; advanced by `Entry::push`.
head: AtomicUsize,
/// Index of the next slot to be read; advanced by `Popper::try_pop`.
tail: AtomicUsize,
/// Notified after every successful pop; `Pusher::reserve` waits on it.
popped: Notify,
/// Notified after every push; `Popper::pop` waits on it.
pushed: Notify,
}
// SAFETY: the slots are only reached through the `Pusher` (which writes the
// slot at `head`) and the `Popper` (which reads the slot at `tail`); the
// atomic `head`/`tail` indices, updated with release stores and read with
// acquire loads, order those accesses. Values of `T` travel from one handle
// to the other, which is why `T: Send` is required.
unsafe impl<T: Send> Sync for Queue<'_, T> {}
impl<'s, T> Queue<'s, T> {
    /// Creates an empty queue that keeps its elements in `storage`.
    ///
    /// One slot is always left unused so that "full" can be told apart from
    /// "empty"; the queue therefore holds at most `storage.len() - 1` values.
    /// Storage of length zero or one produces a queue that never accepts a
    /// value.
    pub fn new(storage: &'s mut [MaybeUninit<T>]) -> Self {
        let raw = storage as *mut [MaybeUninit<T>] as *mut [UnsafeCell<MaybeUninit<T>>];
        // SAFETY: `UnsafeCell<U>` has the same in-memory representation as
        // `U`, so the slice keeps its length and element layout. The pointer
        // was derived from the `&'s mut` slice consumed above, so the new
        // reference is unique and valid for `'s`.
        let storage: &'s mut [UnsafeCell<MaybeUninit<T>>] = unsafe { &mut *raw };
        Self {
            storage,
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            pushed: Notify::new(),
            popped: Notify::new(),
        }
    }

    /// Splits the queue into its pushing and popping handles.
    ///
    /// Both handles borrow the queue for as long as this `&mut self` borrow
    /// lasts, so only one pair can exist at a time.
    pub fn split(&mut self) -> (Pusher<'_, T>, Popper<'_, T>) {
        (
            Pusher { q: self, _marker: NotSyncMarker::default() },
            Popper { q: self, _marker: NotSyncMarker::default() },
        )
    }

    /// Returns the index that follows `i`, wrapping to 0 at the end of the
    /// storage.
    ///
    /// The comparison is `>=` rather than `==` so that zero-length storage
    /// also wraps to 0. With `==`, `next_index(0)` would be 1 for an empty
    /// slice, the queue would look non-full, and the pusher would go on to
    /// index slot 0 of an empty slice without a bounds check. For non-empty
    /// storage `i` is always below the length, so both comparisons agree.
    fn next_index(&self, i: usize) -> usize {
        let ni = i.wrapping_add(1);
        if ni >= self.storage.len() { 0 } else { ni }
    }
}
impl<T> Drop for Queue<'_, T> {
    /// Drops every value that was pushed but never popped.
    fn drop(&mut self) {
        let end = self.head.load(Ordering::SeqCst);
        let mut cursor = self.tail.load(Ordering::SeqCst);
        loop {
            if cursor == end {
                break;
            }
            // SAFETY: every slot from `tail` up to (not including) `head` was
            // written by `Entry::push` and has not been read out by
            // `Popper::try_pop`, so it holds an initialized `T`. `&mut self`
            // gives exclusive access, and each slot is visited exactly once.
            unsafe {
                self.storage[cursor].get_mut().assume_init_drop();
            }
            cursor = self.next_index(cursor);
        }
    }
}
/// The producing half of a [`Queue`], obtained from `Queue::split`.
///
/// Values are added either directly with `try_push` or in two steps by
/// reserving an [`Entry`] and then calling `Entry::push` on it.
#[derive(Debug)]
pub struct Pusher<'a, T> {
/// The queue this handle pushes into.
q: &'a Queue<'a, T>,
/// NOTE(review): presumably opts this handle out of `Sync`, going by the
/// marker's name; confirm against `crate::util::NotSyncMarker`.
_marker: NotSyncMarker,
}
impl<'q, T> Pusher<'q, T> {
pub fn can_push(&self) -> bool {
let h = self.q.head.load(Ordering::Relaxed);
let t = self.q.tail.load(Ordering::Acquire);
self.q.next_index(h) != t
}
pub fn try_reserve(&mut self) -> Option<Entry<'q, T>> {
let h = self.q.head.load(Ordering::Relaxed);
let t = self.q.tail.load(Ordering::Acquire);
let h_next = self.q.next_index(h);
if h_next == t {
return None;
}
let unsafecell = unsafe {
self.q.storage.get_unchecked(h)
};
let unsafecell_ptr = unsafecell.get();
let maybeuninit = unsafe { &mut *unsafecell_ptr };
Some(Entry {
maybeuninit,
head: &self.q.head,
h_next,
pushed: &self.q.pushed,
})
}
pub async fn reserve<'s>(&'s mut self) -> Entry<'s, T> {
self.q.popped.until(|| self.try_reserve()).await
}
pub fn try_push(&mut self, value: T) -> Result<(), T> {
if let Some(entry) = self.try_reserve() {
entry.push(value);
Ok(())
} else {
Err(value)
}
}
}
/// The consuming half of a [`Queue`], obtained from `Queue::split`.
///
/// Values are removed in the order they were pushed, with `try_pop` or the
/// waiting `pop`.
#[derive(Debug)]
pub struct Popper<'a, T> {
/// The queue this handle pops from.
q: &'a Queue<'a, T>,
/// NOTE(review): presumably opts this handle out of `Sync`, going by the
/// marker's name; confirm against `crate::util::NotSyncMarker`.
_marker: NotSyncMarker,
}
impl<T> Popper<'_, T> {
    /// Reports whether at least one value is waiting to be popped.
    pub fn can_pop(&self) -> bool {
        let read_at = self.q.tail.load(Ordering::Relaxed);
        let write_at = self.q.head.load(Ordering::Acquire);
        read_at != write_at
    }

    /// Removes and returns the oldest value, or `None` if the queue is empty.
    ///
    /// After a successful pop the `popped` notification is signalled so that
    /// a pusher waiting in `reserve` can try again.
    pub fn try_pop(&mut self) -> Option<T> {
        let queue = self.q;
        let read_at = queue.tail.load(Ordering::Relaxed);
        // Acquire pairs with the release store in `Entry::push`, making the
        // value written to the slot visible before it is read below.
        if queue.head.load(Ordering::Acquire) == read_at {
            return None;
        }
        // SAFETY: `tail` is either 0 or a value produced by `next_index`, and
        // the queue is non-empty here, so `read_at` is in bounds. Because
        // `head != tail`, the slot was initialized by `Entry::push`. It is
        // read out exactly once, since `tail` moves past it right after.
        let value = unsafe {
            let slot = queue.storage.get_unchecked(read_at);
            (*slot.get()).assume_init_read()
        };
        queue.tail.store(queue.next_index(read_at), Ordering::Release);
        queue.popped.notify();
        Some(value)
    }

    /// Waits until a value is available, then removes and returns it.
    ///
    /// The attempt is repeated through `pushed.until(..)`, the notification
    /// signalled by `Entry::push`.
    pub async fn pop(&mut self) -> T {
        let queue = self.q;
        queue.pushed.until(move || self.try_pop()).await
    }
}
/// A reserved, not yet filled slot at the head of a [`Queue`].
///
/// Calling `push` writes a value into the slot and publishes it. Nothing in
/// this type updates `head` before `push` is called.
#[derive(Debug)]
pub struct Entry<'q, T> {
/// The reserved slot the value will be written into.
maybeuninit: &'q mut MaybeUninit<T>,
/// The queue's `head` index, updated when the value is published.
head: &'q AtomicUsize,
/// The queue's `pushed` notification, signalled after publishing.
pushed: &'q Notify,
/// The value `head` takes once this slot has been filled.
h_next: usize,
}
impl<T> Entry<'_, T> {
    /// Writes `value` into the reserved slot and publishes it.
    ///
    /// The write happens first; `head` is then advanced with a release store
    /// so that a popper loading `head` with acquire ordering also sees the
    /// value. Finally the `pushed` notification is signalled.
    pub fn push(self, value: T) {
        let Self { maybeuninit: slot, head, pushed, h_next: new_head } = self;
        slot.write(value);
        head.store(new_head, Ordering::Release);
        pushed.notify();
    }
}