use std::{
future::Future,
mem::ManuallyDrop,
num::NonZeroUsize,
ops::{Deref, DerefMut},
pin::Pin,
sync::atomic::Ordering,
sync::Arc,
task::{Context, Poll}
};
/// Receiving end-point of the queue.
///
/// A thin wrapper around an `Arc` to the state shared with the other
/// end-points. Elements are taken from the front of the shared queue.
#[repr(transparent)]
pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>);
use super::StaleErr;
/// Selects what the `Drop` implementation of `MustHandle` does with the
/// wrapped value.
#[derive(Default)]
enum DropAction {
/// Push the value back onto the front of the shared queue (the default).
#[default]
ReturnToQueue,
/// Drop the wrapped value in place.
Drop,
/// Leave the wrapped value alone; it has already been moved out.
Nothing
}
/// Wrapper around an element taken from the queue.
///
/// Unless the wrapper is consumed with `handled()` or `into_inner()`, dropping
/// it pushes the element back onto the front of the queue.
pub struct MustHandle<T> {
/// Shared queue state the element is returned to if it is dropped unhandled.
sh: Arc<super::Shared<T>>,
/// The wrapped element. Kept in a `ManuallyDrop` so the `Drop`
/// implementation can move it out, drop it, or leave it untouched.
inner: ManuallyDrop<T>,
/// What the `Drop` implementation should do with `inner`.
drop_action: DropAction
}
impl<T> MustHandle<T> {
  /// Wrap `inner`, remembering the shared state `sh` it was taken from.
  ///
  /// A fresh wrapper starts out in the "return to queue" state.
  fn new(sh: Arc<super::Shared<T>>, inner: T) -> Self {
    let inner = ManuallyDrop::new(inner);
    let drop_action = DropAction::default();
    Self {
      sh,
      inner,
      drop_action
    }
  }

  /// Mark the element as handled.
  ///
  /// Consumes the wrapper; the element is dropped instead of being pushed
  /// back onto the queue.
  pub fn handled(mut self) {
    self.drop_action = DropAction::Drop;
    // `self` goes out of scope here, which runs `Drop` with the new action.
    drop(self);
  }

  /// Unwrap and return the element, taking over responsibility for it.
  ///
  /// The element is neither returned to the queue nor dropped by the wrapper.
  pub fn into_inner(mut self) -> T {
    // Tell `Drop` to keep its hands off `inner` before moving it out.
    self.drop_action = DropAction::Nothing;
    // SAFETY: `inner` has not been taken or dropped before (this method and
    // `handled()` both consume the wrapper), and with `DropAction::Nothing`
    // the `Drop` implementation will not touch it again.
    let value = unsafe { ManuallyDrop::take(&mut self.inner) };
    value
  }
}
impl<T> Deref for MustHandle<T> {
  type Target = T;

  /// Borrow the wrapped element.
  fn deref(&self) -> &Self::Target {
    &self.inner
  }
}
impl<T> DerefMut for MustHandle<T> {
  /// Mutably borrow the wrapped element.
  fn deref_mut(&mut self) -> &mut Self::Target {
    &mut self.inner
  }
}
impl<T> Drop for MustHandle<T> {
  /// Dispose of the wrapped element according to the recorded drop action.
  fn drop(&mut self) {
    match self.drop_action {
      // `into_inner()` already moved the element out; nothing left to do.
      DropAction::Nothing => {}

      DropAction::Drop => {
        // SAFETY: only `into_inner()` moves the element out, and it switches
        // the action to `Nothing`; so `inner` is still initialized here and
        // is not used again after this point.
        unsafe { ManuallyDrop::drop(&mut self.inner) }
      }

      DropAction::ReturnToQueue => {
        // SAFETY: as above -- `inner` is still initialized, and `drop()` is
        // the last access to it.
        let unhandled = unsafe { ManuallyDrop::take(&mut self.inner) };

        // The element goes back to the *front* of the queue so it is the
        // next one to be popped.
        //
        // NOTE(review): nothing visible here signals `signal` or wakes a
        // registered waker after the push; a puller that is already waiting
        // presumably won't see the returned element until the next
        // notification -- confirm this is intended.
        self.sh.inner.lock().q.push_front(unhandled);
      }
    }
  }
}
impl<I> Puller<I> {
  /// Take the element at the front of the queue, waiting for one if needed.
  ///
  /// While the queue is empty and the pusher count is non-zero, the call
  /// waits on the shared `signal` and re-checks the queue every time the
  /// wait returns.
  ///
  /// # Errors
  /// [`StaleErr`] is returned when the queue is empty and the pusher count
  /// is zero.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn pop(&self) -> Result<I, StaleErr> {
    let mut guard = self.0.inner.lock();
    loop {
      if let Some(item) = guard.q.pop_front() {
        return Ok(item);
      }
      // Queue is empty: without pushers nothing can ever arrive.
      if guard.npushers == 0 {
        return Err(StaleErr);
      }
      self.0.signal.wait(&mut guard);
    }
  }

  /// Like [`Puller::pop()`], but wraps the element in a [`MustHandle`] which
  /// returns it to the queue unless it is explicitly handled.
  ///
  /// # Errors
  /// Same as [`Puller::pop()`].
  pub fn pop_managed(&self) -> Result<MustHandle<I>, StaleErr> {
    self
      .pop()
      .map(|item| MustHandle::new(Arc::clone(&self.0), item))
  }

  /// Take the element at the front of the queue without waiting.
  ///
  /// Returns `Ok(Some(_))` if an element was available and `Ok(None)` if the
  /// queue is empty but the pusher count is non-zero.
  ///
  /// # Errors
  /// [`StaleErr`] is returned when the queue is empty and the pusher count
  /// is zero.
  #[cfg_attr(feature = "inline-more", inline)]
  #[allow(clippy::option_if_let_else)]
  pub fn try_pop(&self) -> Result<Option<I>, StaleErr> {
    let mut guard = self.0.inner.lock();
    let front = guard.q.pop_front();
    match front {
      Some(item) => Ok(Some(item)),
      None if guard.npushers == 0 => Err(StaleErr),
      None => Ok(None)
    }
  }

  /// Like [`Puller::try_pop()`], but wraps a returned element in a
  /// [`MustHandle`].
  ///
  /// # Errors
  /// Same as [`Puller::try_pop()`].
  pub fn try_pop_managed(&self) -> Result<Option<MustHandle<I>>, StaleErr> {
    let maybe_item = self.try_pop()?;
    Ok(maybe_item.map(|item| MustHandle::new(Arc::clone(&self.0), item)))
  }

  /// Return a future that resolves to the element at the front of the queue.
  ///
  /// The future resolves to [`StaleErr`] if it is polled while the queue is
  /// empty and the pusher count is zero.
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn apop(&self) -> PopFuture<I> {
    let ctx = Arc::clone(&self.0);
    PopFuture { ctx, id: None }
  }

  /// Like [`Puller::apop()`], but the future resolves to the element wrapped
  /// in a [`MustHandle`].
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn apop_managed(&self) -> PopManagedFuture<I> {
    let ctx = Arc::clone(&self.0);
    PopManagedFuture { ctx, id: None }
  }

  /// Report whether the queue was empty at the moment it was inspected.
  ///
  /// The lock is released before returning, so the answer may already be
  /// outdated by the time the caller looks at it.
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn was_empty(&self) -> bool {
    let guard = self.0.inner.lock();
    let empty = guard.q.is_empty();
    drop(guard);
    empty
  }
}
impl<I> Drop for Puller<I> {
  /// Unregister this puller from the shared state.
  ///
  /// When the puller count reaches zero, every element still in the queue is
  /// dropped.
  fn drop(&mut self) {
    let mut guard = self.0.inner.lock();
    guard.npullers -= 1;
    if guard.npullers != 0 {
      return;
    }
    // No pullers remain, so the queued elements are discarded.
    guard.q.clear();
  }
}
/// Future that resolves to the element at the front of the queue, or to
/// `StaleErr` if it is polled while the queue is empty and the pusher count
/// is zero.
#[doc(hidden)]
pub struct PopFuture<I> {
/// Shared queue state.
ctx: Arc<super::Shared<I>>,
/// Key under which this future's waker was stored in the shared `wakers`
/// map; `None` until a poll has returned `Poll::Pending`.
id: Option<NonZeroUsize>
}
impl<I: 'static + Send> Future for PopFuture<I> {
  type Output = Result<I, StaleErr>;

  /// Try to take the element at the front of the queue.
  ///
  /// - Resolves to `Ok(item)` if an element is available.
  /// - Resolves to `Err(StaleErr)` if the queue is empty and the pusher
  ///   count is zero.
  /// - Otherwise stores the task's waker in the shared `wakers` map and
  ///   returns `Poll::Pending`.
  ///
  /// A future that is polled more than once keeps using the id it was
  /// assigned on its first pending poll, so the most recent waker replaces
  /// the previous one instead of piling up stale entries that `Drop` (which
  /// only knows about one id) could never remove.
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.ctx.inner.lock();

    if let Some(node) = inner.q.pop_front() {
      return Poll::Ready(Ok(node));
    }

    // The queue is empty; without pushers nothing can ever arrive.
    if inner.npushers == 0 {
      return Poll::Ready(Err(StaleErr));
    }

    let id = match self.id {
      // Already registered by an earlier poll: reuse the same key.
      Some(existing) => existing,
      // First pending poll: allocate a non-zero id that is not in use.
      None => loop {
        let raw = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
        match NonZeroUsize::new(raw) {
          Some(fresh) if !inner.wakers.contains_key(&raw) => break fresh,
          _ => {}
        }
      }
    };

    // Inserting under an existing key replaces the older waker.
    inner.wakers.insert(id.get(), ctx.waker().clone());

    // The guard borrows `self`, so release it before updating `self.id`.
    drop(inner);
    self.id = Some(id);

    Poll::Pending
  }
}
impl<I> Drop for PopFuture<I> {
fn drop(&mut self) {
if let Some(id) = self.id {
let mut inner = self.ctx.inner.lock();
let _ = inner.wakers.swap_remove(&id.get());
}
}
}
/// Future that resolves to the element at the front of the queue wrapped in
/// a `MustHandle`, or to `StaleErr` if it is polled while the queue is empty
/// and the pusher count is zero.
#[doc(hidden)]
pub struct PopManagedFuture<I> {
/// Shared queue state.
ctx: Arc<super::Shared<I>>,
/// Key under which this future's waker was stored in the shared `wakers`
/// map; `None` until a poll has returned `Poll::Pending`.
id: Option<NonZeroUsize>
}
impl<I: 'static + Send> Future for PopManagedFuture<I> {
  type Output = Result<MustHandle<I>, StaleErr>;

  /// Try to take the element at the front of the queue, wrapped in a
  /// `MustHandle`.
  ///
  /// - Resolves to `Ok(handle)` if an element is available.
  /// - Resolves to `Err(StaleErr)` if the queue is empty and the pusher
  ///   count is zero.
  /// - Otherwise stores the task's waker in the shared `wakers` map and
  ///   returns `Poll::Pending`.
  ///
  /// A future that is polled more than once keeps using the id it was
  /// assigned on its first pending poll, so the most recent waker replaces
  /// the previous one instead of piling up stale entries that `Drop` (which
  /// only knows about one id) could never remove.
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.ctx.inner.lock();

    if let Some(node) = inner.q.pop_front() {
      return Poll::Ready(Ok(MustHandle::new(Arc::clone(&self.ctx), node)));
    }

    // The queue is empty; without pushers nothing can ever arrive.
    if inner.npushers == 0 {
      return Poll::Ready(Err(StaleErr));
    }

    let id = match self.id {
      // Already registered by an earlier poll: reuse the same key.
      Some(existing) => existing,
      // First pending poll: allocate a non-zero id that is not in use.
      None => loop {
        let raw = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
        match NonZeroUsize::new(raw) {
          Some(fresh) if !inner.wakers.contains_key(&raw) => break fresh,
          _ => {}
        }
      }
    };

    // Inserting under an existing key replaces the older waker.
    inner.wakers.insert(id.get(), ctx.waker().clone());

    // The guard borrows `self`, so release it before updating `self.id`.
    drop(inner);
    self.id = Some(id);

    Poll::Pending
  }
}
impl<I> Drop for PopManagedFuture<I> {
fn drop(&mut self) {
if let Some(id) = self.id {
let mut inner = self.ctx.inner.lock();
let _ = inner.wakers.swap_remove(&id.get());
}
}
}