use crate::{AllocError, InnerAtomicFlag, FALSE, TRUE};
use core::fmt::Debug;
use core::{
alloc::Layout,
iter::FusedIterator,
ptr::NonNull,
sync::atomic::{AtomicPtr, Ordering},
};
#[cfg(feature = "alloc_api")]
use {alloc::alloc::Global, core::alloc::*};
macro_rules! impl_all {
(impl $(@$tr:path =>)? $target:ident {
$($t:tt)*
}) => {
cfg_if::cfg_if! {
if #[cfg(feature = "alloc_api")] {
impl<T, A: Allocator> $($tr for)? $target <T, A> {
$($t)*
}
} else {
impl<T> $($tr for)? $target <T> {
$($t)*
}
}
}
};
}
struct PrevCell<T> {
init: InnerAtomicFlag,
prev: AtomicPtr<FillQueueNode<T>>,
}
impl<T> PrevCell<T> {
#[inline]
pub const fn new() -> Self {
return Self {
init: InnerAtomicFlag::new(FALSE),
prev: AtomicPtr::new(core::ptr::null_mut()),
};
}
#[inline]
pub fn set(&self, prev: *mut FillQueueNode<T>) {
cfg_if::cfg_if! {
if #[cfg(debug_assertions)] {
assert!(self.prev.swap(prev, Ordering::AcqRel).is_null());
self.init.store(TRUE, Ordering::Release);
} else {
self.prev.store(prev, Ordering::Release);
self.init.store(TRUE, Ordering::Release);
}
}
}
#[inline]
pub fn set_mut(&mut self, prev: *mut FillQueueNode<T>) {
let this_prev = self.prev.get_mut();
debug_assert!(this_prev.is_null());
*this_prev = prev;
*self.init.get_mut() = TRUE;
}
pub fn get(&self) -> *mut FillQueueNode<T> {
while self.init.load(Ordering::Acquire) == FALSE {
core::hint::spin_loop()
}
return self.prev.swap(core::ptr::null_mut(), Ordering::Acquire);
}
}
struct FillQueueNode<T> {
prev: PrevCell<T>,
v: T,
}
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
pub struct FillQueue<T, #[cfg(feature = "alloc_api")] A: Allocator = Global> {
head: AtomicPtr<FillQueueNode<T>>,
#[cfg(feature = "alloc_api")]
alloc: A,
}
impl<T> FillQueue<T> {
#[inline]
pub const fn new() -> Self {
Self {
head: AtomicPtr::new(core::ptr::null_mut()),
#[cfg(feature = "alloc_api")]
alloc: Global,
}
}
}
#[docfg::docfg(feature = "alloc_api")]
impl<T, A: Allocator> FillQueue<T, A> {
#[inline]
pub const fn new_in(alloc: A) -> Self {
Self {
head: AtomicPtr::new(core::ptr::null_mut()),
alloc,
}
}
#[inline]
pub fn allocator(&self) -> &A {
&self.alloc
}
}
impl_all! {
impl FillQueue {
#[inline]
pub fn is_empty (&self) -> bool {
self.head.load(Ordering::Relaxed).is_null()
}
#[inline]
pub fn push (&self, v: T) {
self.try_push(v).unwrap()
}
#[inline]
pub fn push_mut (&mut self, v: T) {
self.try_push_mut(v).unwrap()
}
pub fn try_push (&self, v: T) -> Result<(), AllocError> {
let node = FillQueueNode {
prev: PrevCell::new(),
v
};
let layout = Layout::new::<FillQueueNode<T>>();
#[cfg(feature = "alloc_api")]
let ptr = self.alloc.allocate(layout)?.cast::<FillQueueNode<T>>();
#[cfg(not(feature = "alloc_api"))]
let ptr = match unsafe { NonNull::new(alloc::alloc::alloc(layout)) } {
Some(x) => x.cast::<FillQueueNode<T>>(),
None => return Err(AllocError)
};
unsafe {
ptr.as_ptr().write(node)
}
let prev = self.head.swap(ptr.as_ptr(), Ordering::AcqRel);
unsafe {
let rf = &*ptr.as_ptr();
rf.prev.set(prev);
}
Ok(())
}
pub fn try_push_mut (&mut self, v: T) -> Result<(), AllocError> {
let node = FillQueueNode {
prev: PrevCell::new(),
v
};
let layout = Layout::new::<FillQueueNode<T>>();
#[cfg(feature = "alloc_api")]
let mut ptr = self.alloc.allocate(layout)?.cast::<FillQueueNode<T>>();
#[cfg(not(feature = "alloc_api"))]
let mut ptr = match unsafe { NonNull::new(alloc::alloc::alloc(layout)) } {
Some(x) => x.cast::<FillQueueNode<T>>(),
None => return Err(AllocError)
};
unsafe {
ptr.as_ptr().write(node);
let prev = core::ptr::replace(self.head.get_mut(), ptr.as_ptr());
ptr.as_mut().prev.set_mut(prev);
Ok(())
}
}
}
}
#[cfg(feature = "alloc_api")]
impl<T, A: Allocator> FillQueue<T, A> {
#[inline]
pub fn chop(&self) -> ChopIter<T, A>
where
A: Clone,
{
let ptr = self.head.swap(core::ptr::null_mut(), Ordering::AcqRel);
ChopIter {
ptr: NonNull::new(ptr),
alloc: self.alloc.clone(),
}
}
#[inline]
pub fn chop_mut(&mut self) -> ChopIter<T, A>
where
A: Clone,
{
let ptr = unsafe { core::ptr::replace(self.head.get_mut(), core::ptr::null_mut()) };
ChopIter {
ptr: NonNull::new(ptr),
alloc: self.alloc.clone(),
}
}
}
#[cfg(not(feature = "alloc_api"))]
impl<T> FillQueue<T> {
#[inline]
pub fn chop(&self) -> ChopIter<T> {
let ptr = self.head.swap(core::ptr::null_mut(), Ordering::AcqRel);
ChopIter {
ptr: NonNull::new(ptr),
}
}
#[inline]
pub fn chop_mut(&mut self) -> ChopIter<T> {
let ptr = unsafe { core::ptr::replace(self.head.get_mut(), core::ptr::null_mut()) };
ChopIter {
ptr: NonNull::new(ptr),
}
}
}
cfg_if::cfg_if! {
if #[cfg(feature = "alloc_api")] {
unsafe impl<T: Send, A: Send + Allocator> Send for FillQueue<T, A> {}
unsafe impl<T: Sync, A: Sync + Allocator> Sync for FillQueue<T, A> {}
unsafe impl<T: Send, A: Send + Allocator> Send for ChopIter<T, A> {}
unsafe impl<T: Sync, A: Sync + Allocator> Sync for ChopIter<T, A> {}
} else {
unsafe impl<T: Send> Send for FillQueue<T> {}
unsafe impl<T: Sync> Sync for FillQueue<T> {}
unsafe impl<T: Send> Send for ChopIter<T> {}
unsafe impl<T: Sync> Sync for ChopIter<T> {}
}
}
pub struct ChopIter<T, #[cfg(feature = "alloc_api")] A: Allocator = Global> {
ptr: Option<NonNull<FillQueueNode<T>>>,
#[cfg(feature = "alloc_api")]
alloc: A,
}
impl_all! {
impl @Iterator => ChopIter {
type Item = T;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if let Some(ptr) = self.ptr {
unsafe {
let node = &*ptr.as_ptr();
let value = core::ptr::read(&node.v);
self.ptr = NonNull::new(node.prev.get());
#[cfg(feature = "alloc_api")]
self.alloc.deallocate(ptr.cast(), Layout::new::<FillQueueNode<T>>());
#[cfg(not(feature = "alloc_api"))]
alloc::alloc::dealloc(ptr.as_ptr().cast(), Layout::new::<FillQueueNode<T>>());
return Some(value)
}
}
None
}
}
}
impl_all! {
impl @Drop => ChopIter {
#[inline]
fn drop(&mut self) {
self.for_each(core::mem::drop)
}
}
}
impl_all! {
impl @FusedIterator => ChopIter {}
}
#[cfg(feature = "alloc_api")]
impl<T, A: Debug + Allocator> Debug for FillQueue<T, A> {
#[inline]
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
f.debug_struct("FillQueue")
.field("alloc", &self.alloc)
.finish_non_exhaustive()
}
}
#[cfg(not(feature = "alloc_api"))]
impl<T> Debug for FillQueue<T> {
#[inline]
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
f.debug_struct("FillQueue").finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::FillQueue;
#[test]
fn test_basic_functionality() {
let mut fill_queue = FillQueue::new();
assert!(fill_queue.is_empty());
fill_queue.push(1);
fill_queue.push(2);
fill_queue.push(3);
assert!(!fill_queue.is_empty());
let mut chop_iter = fill_queue.chop_mut();
assert_eq!(chop_iter.next(), Some(3));
assert_eq!(chop_iter.next(), Some(2));
assert_eq!(chop_iter.next(), Some(1));
assert_eq!(chop_iter.next(), None);
fill_queue.push_mut(1);
fill_queue.push_mut(2);
fill_queue.push_mut(3);
let mut chop_iter = fill_queue.chop();
assert_eq!(chop_iter.next(), Some(3));
assert_eq!(chop_iter.next(), Some(2));
assert_eq!(chop_iter.next(), Some(1));
assert_eq!(chop_iter.next(), None);
assert!(fill_queue.is_empty());
}
#[cfg(feature = "std")]
#[test]
fn test_concurrent_fill_queue() {
use core::sync::atomic::{AtomicUsize, Ordering};
let fill_queue = FillQueue::new();
let mut count = AtomicUsize::new(0);
std::thread::scope(|s| {
for _ in 0..10 {
s.spawn(|| {
for i in 1..=10 {
fill_queue.push(i);
}
count.fetch_add(fill_queue.chop().count(), Ordering::Relaxed);
});
}
});
assert_eq!(*count.get_mut(), 100);
}
}