// sigq 0.13.5
//
// Queue that signals waiting consumers about node availability
// Documentation
use std::sync::{Arc, Weak};

/// The transmitting end-point of queue.
///
/// A `Pusher` is a thin wrapper around an [`Arc`] to the queue's shared
/// state.  Every live `Pusher` is accounted for in the shared `npushers`
/// counter: cloning a `Pusher` increments it and dropping one decrements it.
/// When the counter reaches zero all waiting consumers are woken up.
#[repr(transparent)]
pub struct Pusher<I>(pub(crate) Arc<super::Shared<I>>);

use super::StaleErr;

impl<I> Pusher<I> {
  /// Append `item` to the back of the queue and wake up one waiting consumer,
  /// if there is one.
  ///
  /// After the node has been queued, one registered task waker (if any) is
  /// woken and the shared signal is notified once, all while the queue lock
  /// is still held.
  ///
  /// # Errors
  /// `StaleErr` is returned, and `item` is not queued, when there are no
  /// [`Puller`](super::Puller)'s left to receive new nodes.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn push(&self, item: I) -> Result<(), StaleErr> {
    let mut guard = self.0.inner.lock();

    // Nobody is left to consume the node; refuse it.
    if guard.npullers == 0 {
      return Err(StaleErr);
    }

    guard.q.push_back(item);

    // Wake a single registered waker, if any.
    if let Some((_, waker)) = guard.wakers.pop() {
      waker.wake();
    }

    // Also notify one waiter on the shared signal.
    self.0.signal.notify_one();

    // Release the lock explicitly before reporting success.
    drop(guard);
    Ok(())
  }

  /// Report whether the queue contained no nodes at the time of the call.
  ///
  /// The queue lock is released before this function returns, so the answer
  /// may already be outdated when the caller looks at it.  That is why this
  /// function is not particularly useful; if that isn't obvious to you, then
  /// please don't use it.
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn was_empty(&self) -> bool {
    self.0.inner.lock().q.is_empty()
  }

  /// Create a [`WeakPusher`] referring to the same queue as this `Pusher`.
  ///
  /// The pusher counter is left untouched; the weak reference does not count
  /// as a pusher.
  #[must_use]
  pub fn weak(&self) -> WeakPusher<I> {
    let shared = Arc::downgrade(&self.0);
    WeakPusher(shared)
  }
}

impl<I> Clone for Pusher<I> {
  /// Create another `Pusher` for the same queue.
  ///
  /// The new instance is registered in the shared pusher counter before the
  /// underlying [`Arc`] is cloned.
  fn clone(&self) -> Self {
    {
      // Account for the new pusher; the lock is released at the end of this
      // scope.
      let mut state = self.0.inner.lock();
      state.npushers += 1;
    }

    let shared = Arc::clone(&self.0);
    Self(shared)
  }
}

impl<I> Drop for Pusher<I> {
  /// Unregister this `Pusher` from the queue.
  ///
  /// The shared pusher counter is decremented.  If this was the final
  /// `Pusher`, every waiter on the shared signal is notified and every
  /// registered waker is woken, so that waiting `Puller`'s get a chance to
  /// notice that no more nodes will arrive.
  fn drop(&mut self) {
    let mut state = self.0.inner.lock();
    state.npushers -= 1;

    // Other pushers are still around; nothing more to do.
    if state.npushers != 0 {
      return;
    }

    // This was the last pusher: wake everyone that is waiting for new items.
    // (When they discover that no pushers remain they will return None).
    self.0.signal.notify_all();
    for (_key, waker) in state.wakers.drain(..) {
      waker.wake();
    }
  }
}

/// A weak reference to a [`Pusher`].
///
/// Wraps a [`Weak`] pointer to the queue's shared state.  A `WeakPusher` is
/// not counted as a pusher; use [`WeakPusher::upgrade()`] to try to turn it
/// into a real [`Pusher`].
#[repr(transparent)]
pub struct WeakPusher<I>(pub(crate) Weak<super::Shared<I>>);

// Implemented by hand rather than derived, because a derived `Clone` would
// add an `I: Clone` bound that is not needed to clone the weak pointer.
impl<I> Clone for WeakPusher<I> {
  /// Create another weak reference to the same queue.
  fn clone(&self) -> Self {
    let weak = Weak::clone(&self.0);
    Self(weak)
  }
}

impl<I> WeakPusher<I> {
  /// Attempt to upgrade `WeakPusher` to a [`Pusher`].
  ///
  /// On success the shared pusher counter is incremented, so the returned
  /// [`Pusher`] is accounted for exactly like a cloned one.
  ///
  /// Returns `None` if all the strong references to the shared queue state
  /// have been exhausted.
  ///
  /// NOTE(review): the upgrade succeeds as long as *any* strong reference to
  /// the shared state is alive, even if the pusher counter has already
  /// dropped to zero -- confirm that reviving a queue this way is intended.
  #[must_use]
  pub fn upgrade(&self) -> Option<Pusher<I>> {
    self.0.upgrade().map(|strong| {
      // Register the new pusher before handing it out.  The lock guard is a
      // temporary that is released at the end of this statement.
      strong.inner.lock().npushers += 1;

      // `strong` is already an owned `Arc`; move it into the `Pusher` rather
      // than cloning it and dropping the original.
      Pusher(strong)
    })
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :