sigq 0.13.5

Queue that signals waiting consumers about node availability
Documentation
use std::{
  future::Future,
  mem::ManuallyDrop,
  num::NonZeroUsize,
  ops::{Deref, DerefMut},
  pin::Pin,
  sync::atomic::Ordering,
  sync::Arc,
  task::{Context, Poll}
};

/// The receiving end-point of queue.
#[repr(transparent)]
pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>);

use super::StaleErr;

#[derive(Default)]
enum DropAction {
  #[default]
  ReturnToQueue,
  Drop,
  Nothing
}

/// Wrapper around elements that must be handled by the application.
pub struct MustHandle<T> {
  sh: Arc<super::Shared<T>>,
  inner: ManuallyDrop<T>,
  drop_action: DropAction
}

impl<T> MustHandle<T> {
  fn new(sh: Arc<super::Shared<T>>, inner: T) -> Self {
    Self {
      sh,
      inner: ManuallyDrop::new(inner),
      drop_action: DropAction::default()
    }
  }

  /// Mark the inner object has handled and then drop it.
  pub fn handled(mut self) {
    self.drop_action = DropAction::Drop;
  }

  /// Remove the inner object from the `MustHandle` and return it.
  pub fn into_inner(mut self) -> T {
    self.drop_action = DropAction::Nothing;
    unsafe { ManuallyDrop::take(&mut self.inner) }
  }
}

impl<T> Deref for MustHandle<T> {
  type Target = T;

  fn deref(&self) -> &T {
    &self.inner
  }
}

impl<T> DerefMut for MustHandle<T> {
  fn deref_mut(&mut self) -> &mut T {
    &mut self.inner
  }
}

impl<T> Drop for MustHandle<T> {
  fn drop(&mut self) {
    match self.drop_action {
      DropAction::ReturnToQueue => {
        let t = unsafe { ManuallyDrop::take(&mut self.inner) };
        let mut inner = self.sh.inner.lock();
        inner.q.push_front(t);
      }
      DropAction::Drop => unsafe { ManuallyDrop::drop(&mut self.inner) },
      DropAction::Nothing => {}
    }
  }
}


impl<I> Puller<I> {
  /// Pull the oldest node off the queue and return it.
  ///
  /// If no nodes are available on the queue, then block and wait for one to
  /// become available.
  ///
  /// # Errors
  /// `StaleErr` means there are no more items in queue and there are no more
  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn pop(&self) -> Result<I, StaleErr> {
    let mut inner = self.0.inner.lock();
    loop {
      if inner.q.is_empty() && inner.npushers == 0 {
        break Err(StaleErr);
      }
      match inner.q.pop_front() {
        Some(node) => {
          break Ok(node);
        }
        None => {
          self.0.signal.wait(&mut inner);
        }
      }
    }
  }

  /// Take an element off the queue that must be handled by the application, or
  /// it will be returned to the queue.
  ///
  /// # Errors
  /// `StaleErr` means there are no more items in queue and there are no more
  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
  pub fn pop_managed(&self) -> Result<MustHandle<I>, StaleErr> {
    let n = self.pop()?;
    Ok(MustHandle::new(Arc::clone(&self.0), n))
  }

  /// Pull the oldest node off the queue and return it.
  ///
  /// If a node is available on the queue then take it off and return it.
  ///
  /// If no nodes are available and there's at least one associated `Pusher`
  /// exists then return `Ok(None)`.
  ///
  /// # Errors
  /// `StaleErr` is returned if no nodes are available and there are no more
  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
  #[cfg_attr(feature = "inline-more", inline)]
  #[allow(clippy::option_if_let_else)]
  pub fn try_pop(&self) -> Result<Option<I>, StaleErr> {
    let mut inner = self.0.inner.lock();
    if let Some(n) = inner.q.pop_front() {
      Ok(Some(n))
    } else if inner.npushers == 0 {
      Err(StaleErr)
    } else {
      Ok(None)
    }
  }

  /// Take an element off the queue that must be handled by the application, or
  /// it will be returned to the queue.
  ///
  /// If a node is available on the queue then take it off and return it.
  ///
  /// If no nodes are available and there's at least one associated `Pusher`
  /// exists then return `Ok(None)`.
  ///
  /// # Errors
  /// `StaleErr` is returned if no nodes are available and there are no more
  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
  pub fn try_pop_managed(&self) -> Result<Option<MustHandle<I>>, StaleErr> {
    Ok(
      self
        .try_pop()?
        .map(|n| MustHandle::new(Arc::clone(&self.0), n))
    )
  }

  /// This method serves the same purpose as the [`pop()`](#method.pop) method,
  /// but rather than block it returns a `Future` to be used to wait for a node
  /// to arrive in an `async` context.
  ///
  /// ```
  /// async fn test() {
  ///   let (tx, rx) = sigq::new();
  ///   tx.push("hello");
  ///   assert_eq!(rx.was_empty(), false);
  ///   let node = rx.apop().await.unwrap();
  ///   assert_eq!(node, "hello");
  ///   assert_eq!(rx.was_empty(), true);
  /// }
  /// ```
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn apop(&self) -> PopFuture<I> {
    PopFuture {
      ctx: Arc::clone(&self.0),
      id: None
    }
  }

  /// This method serves the same purpose as the [`pop()`](#method.pop) method,
  /// but rather than block it returns a `Future` to be used to wait for a node
  /// to arrive in an `async` context.
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn apop_managed(&self) -> PopManagedFuture<I> {
    PopManagedFuture {
      ctx: Arc::clone(&self.0),
      id: None
    }
  }

  /// Returns a boolean indicating whether the queue was empty or not.
  ///
  /// This function is not particularly useful.  If you don't understand why,
  /// then please don't use it.
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn was_empty(&self) -> bool {
    let inner = self.0.inner.lock();
    inner.q.is_empty()
  }
}

impl<I> Drop for Puller<I> {
  /// Drop a `Puller` instance.
  ///
  /// If this is the last `Puller` end-point of a sigq instance, then the inner
  /// queue will be cleared (i.e. all its elements will be immediately
  /// dropped).
  fn drop(&mut self) {
    let mut inner = self.0.inner.lock();
    inner.npullers -= 1;

    // If this is the last puller then remove all thr nodes.
    // The nodes may contain some kind of context that must be notified that
    // the node will never reach its intended destination.
    if inner.npullers == 0 {
      inner.q.clear();
    }
  }
}


#[doc(hidden)]
pub struct PopFuture<I> {
  ctx: Arc<super::Shared<I>>,
  id: Option<NonZeroUsize>
}

impl<I: 'static + Send> Future for PopFuture<I> {
  type Output = Result<I, StaleErr>;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.ctx.inner.lock();
    match inner.q.pop_front() {
      Some(node) => Poll::Ready(Ok(node)),
      None => {
        if inner.q.is_empty() && inner.npushers == 0 {
          // No more nodes and no more pushers, so return None
          Poll::Ready(Err(StaleErr))
        } else {
          // Generate a unique identifier for this waker
          let id = loop {
            let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
            // Make sure it is non-zero and unique
            if id == 0 || inner.wakers.contains_key(&id) {
              continue;
            }
            break id;
          };
          inner.wakers.insert(id, ctx.waker().clone());
          drop(inner);
          self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) });
          Poll::Pending
        }
      }
    }
  }
}

impl<I> Drop for PopFuture<I> {
  fn drop(&mut self) {
    if let Some(id) = self.id {
      let mut inner = self.ctx.inner.lock();
      // Remove this future's waker
      let _ = inner.wakers.swap_remove(&id.get());
    }
  }
}


#[doc(hidden)]
pub struct PopManagedFuture<I> {
  ctx: Arc<super::Shared<I>>,
  id: Option<NonZeroUsize>
}

impl<I: 'static + Send> Future for PopManagedFuture<I> {
  type Output = Result<MustHandle<I>, StaleErr>;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.ctx.inner.lock();
    match inner.q.pop_front() {
      Some(node) => {
        Poll::Ready(Ok(MustHandle::new(Arc::clone(&self.ctx), node)))
      }
      None => {
        if inner.q.is_empty() && inner.npushers == 0 {
          // No more nodes and no more pushers, so return None
          Poll::Ready(Err(StaleErr))
        } else {
          // Generate a unique identifier for this waker
          let id = loop {
            let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
            // Make sure it is non-zero and unique
            if id == 0 || inner.wakers.contains_key(&id) {
              continue;
            }
            break id;
          };
          inner.wakers.insert(id, ctx.waker().clone());
          drop(inner);
          self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) });
          Poll::Pending
        }
      }
    }
  }
}

impl<I> Drop for PopManagedFuture<I> {
  fn drop(&mut self) {
    if let Some(id) = self.id {
      let mut inner = self.ctx.inner.lock();
      // Remove this future's waker
      let _ = inner.wakers.swap_remove(&id.get());
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :