sigq 0.13.5

Queue that signals waiting consumers about node availability
Documentation
//! _sigq_ is a FIFO queue that supports pushing and popping nodes from
//! threads/tasks, crossing sync/async boundaries.  The interface to interact
//! with the queue is a pair of end-points.  The [`Pusher`] is used to add data
//! to the queue, and the [`Puller`] is used to pull data off the queue.
//!
//! The `Pusher` has a [`push()`](Pusher::push) method that is used to push new
//! nodes onto the queue.
//!
//! The `Puller` has a blocking [`pop()`](Puller::pop) and a
//! [`apop()`](Puller::apop) that returns a `Future` for getting the next node
//! off the queue.  These will return immediately with the next node if
//! available, or block and wait for a new node to be pushed onto the queue.
//! [`try_pop()`](Puller::try_pop) can be used as a non-blocking way to get the
//! next node, if available.
//!
//! ```
//! let (pusher, puller) = sigq::new();
//! pusher.push(42).unwrap();
//! assert_eq!(puller.pop(), Ok(42));
//! assert_eq!(puller.try_pop(), Ok(None));
//! ```
//!
//! # Semantics
//! - Dropping the last `Pusher` end-point will cause waiting `Puller`'s to
//!   wake up and return `Err(StaleErr)` if there are no more nodes on the
//!   queue.
//! - Dropping the last `Puller` end-point will:
//!   - Immediately drop all the nodes in the queue.
//!   - Cause the `Pusher`'s to return `Err(StaleErr)` if new nodes are
//!     attempted to be added to the queue.

pub(crate) mod pull;
pub(crate) mod push;

use std::{
  collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker
};

use parking_lot::{Condvar, Mutex};

use indexmap::IndexMap;

pub use pull::{MustHandle, Puller};
pub use push::{Pusher, WeakPusher};

/// Error value used to indicate that there are no remote end-points available.
///
/// If a `Puller` method returns this it means the queue has no more associated
/// `Pusher`'s, which implies that no new nodes can become available.
///
/// If a `Pusher` method returns this it means that the queue has no more
/// associated `Puller`'s, which implies that there's nothing to take nodes off
/// the queue any longer.
///
/// Implements [`std::error::Error`] so it can be propagated with `?` into
/// `Box<dyn Error>` and similar generic error containers.
#[derive(Debug, PartialEq, Eq)]
pub struct StaleErr;

impl std::fmt::Display for StaleErr {
  /// Write a short, human-readable description of the error.
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.write_str("no remote end-points available")
  }
}

// No underlying cause to report, so the default `source()` (returning `None`)
// is sufficient.
impl std::error::Error for StaleErr {}

/// Inner shared data.
///
/// This is read/write data, and hence protected by a mutex (it is stored
/// inside the `Mutex` held by `Shared`).
struct Inner<I> {
  /// The queued nodes.  Created empty by `new()`.
  q: VecDeque<I>,
  /// Number of `Pusher` end-points.  Starts at 1 in `new()`, matching the
  /// single `Pusher` it returns.
  // NOTE(review): presumably updated when pushers are cloned/dropped; that
  // code lives in the `push` module and is not visible here -- confirm.
  npushers: usize,
  /// Number of `Puller` end-points.  Starts at 1 in `new()`, matching the
  /// single `Puller` it returns.
  // NOTE(review): presumably updated when pullers are cloned/dropped; that
  // code lives in the `pull` module and is not visible here -- confirm.
  npullers: usize,
  /// Task wakers, keyed by a `usize` identifier.  Created empty by `new()`.
  // NOTE(review): keys are presumably allocated from `Shared::idgen` and the
  // wakers presumably belong to pending `apop()` futures -- confirm against
  // the `pull` module.
  wakers: IndexMap<usize, Waker>
}

/// State shared between the queue's end-points.
///
/// `new()` wraps a single instance in an `Arc` and hands a reference to both
/// the `Pusher` and the `Puller`.
struct Shared<I> {
  /// Condition variable associated with the queue.
  // NOTE(review): presumably used together with `inner`'s mutex to wake
  // threads blocked in `Puller::pop()` -- confirm against the `pull` module.
  signal: Condvar,
  /// The mutable queue state, guarded by a mutex.
  inner: Mutex<Inner<I>>,
  /// Atomic counter, initialized to 1 by `new()`.
  // NOTE(review): presumably a generator for the `usize` keys used in
  // `Inner::wakers` -- confirm against the `pull` module.
  idgen: AtomicUsize
}

/// Construct an empty queue and hand back its two end-points.
///
/// The returned tuple is `(Pusher, Puller)`; both refer to the same shared
/// queue state.
#[must_use]
pub fn new<T>() -> (Pusher<T>, Puller<T>) {
  // Both end-point counters begin at one, accounting for the single `Pusher`
  // and single `Puller` returned below.
  let state = Arc::new(Shared {
    signal: Condvar::new(),
    inner: Mutex::new(Inner {
      q: VecDeque::new(),
      npushers: 1,
      npullers: 1,
      wakers: IndexMap::new()
    }),
    idgen: AtomicUsize::new(1)
  });

  // The pusher gets a fresh reference; the puller takes over the original.
  (Pusher(Arc::clone(&state)), Puller(state))
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :