bndpresbufch 0.1.2

Bounds-preserving channel for passing buffers.
Documentation
//! Bounds-preserving, optionally explicitly limited/lossy, buffer channel.
//!
//! ```
//! use bndpresbufch::{Builder, Error};
//!
//! // Create a buffer queue that can hold at most two buffers, with a total of
//! // 4 bytes of data.
//! let (tx, rx) = Builder::new()
//!   .max_len(2)
//!   .max_size(4)
//!   .build();
//!
//! // Fail to push a single buffer that is larger than maximum allowed total
//! // queue size.
//! assert_eq!(tx.force_push(vec![1, 2, 3, 4, 5]),
//!     Err(Error::WontFit(vec![1, 2, 3, 4, 5])));
//!
//! // Fill up queue
//! tx.try_push(vec![1, 2]).unwrap();
//! tx.force_push(vec![3, 4]).unwrap();
//!
//! // Fail to add more data
//! assert_eq!(tx.try_push(vec![5]), Err(Error::WontFit(vec![5])));
//!
//! // Force push data to the queue, ejecting the oldest buffer
//! tx.force_push(vec![6]).unwrap();
//!
//! // Pull off a buffer that must be handled.
//! // Then drop the managed node before marking it as handled, which should
//! // put it back onto the channel.
//! let n = rx.pop_managed().unwrap();
//! assert_eq!(*n, [3, 4]);
//! drop(n);
//!
//! assert_eq!(rx.pop(), Some(vec![3, 4]));
//! assert_eq!(rx.try_pop(), Ok(Some(vec![6])));
//! assert_eq!(rx.try_pop(), Ok(None));
//! ```

mod err;
mod rx;
mod tx;

use std::{sync::Arc, task::Waker};

use parking_lot::{Condvar, Mutex, MutexGuard};

use rustc_hash::FxHashMap;

use bndpresbufq::BndPresLimBufQ;

pub use {
  err::Error,
  rx::{MustHandle, Receiver},
  tx::Sender
};

/// Builder for a bounds-preserving buffer channel.
///
/// Collects the optional queue limits and hands them to the shared channel
/// state when [`Builder::build()`] is called.
#[derive(Default)]
pub struct Builder {
  /// Maximum number of buffers the queue may hold; `None` when no length
  /// limit has been configured.
  max_len: Option<usize>,
  /// Maximum total size of the buffers in the queue; `None` when no size
  /// limit has been configured.
  max_size: Option<usize>
}

impl Builder {
  /// Create a builder with neither a length limit nor a size limit set.
  #[must_use]
  pub fn new() -> Self {
    Self {
      max_len: None,
      max_size: None
    }
  }

  /// Limit the number of buffers the queue may hold.
  ///
  /// Consumes the builder and returns it, so calls can be chained by value.
  ///
  /// # Panics
  /// Panics if `n` is zero.
  #[must_use]
  pub fn max_len(self, n: usize) -> Self {
    assert!(n != 0);
    Self {
      max_len: Some(n),
      ..self
    }
  }

  /// Limit the number of buffers the queue may hold.
  ///
  /// In-place variant of [`Builder::max_len()`] for callers that hold the
  /// builder by mutable reference.
  ///
  /// # Panics
  /// Panics if `n` is zero.
  pub fn max_len_r(&mut self, n: usize) -> &mut Self {
    assert!(n != 0);
    self.max_len = Some(n);
    self
  }

  /// Limit the total size of the buffers the queue may hold.
  ///
  /// Consumes the builder and returns it, so calls can be chained by value.
  ///
  /// # Panics
  /// Panics if `n` is zero.
  #[must_use]
  pub fn max_size(self, n: usize) -> Self {
    assert!(n != 0);
    Self {
      max_size: Some(n),
      ..self
    }
  }

  /// Limit the total size of the buffers the queue may hold.
  ///
  /// In-place variant of [`Builder::max_size()`] for callers that hold the
  /// builder by mutable reference.
  ///
  /// # Panics
  /// Panics if `n` is zero.
  pub fn max_size_r(&mut self, n: usize) -> &mut Self {
    assert!(n != 0);
    self.max_size = Some(n);
    self
  }

  /// Consume the builder and create the two channel end-points.
  ///
  /// Both end-points refer to the same reference-counted shared state, which
  /// is created with the limits configured on this builder.
  ///
  /// # Panics
  /// Panics if the maximum length or the maximum size is zero.
  #[must_use]
  pub fn build(self) -> (Sender, Receiver) {
    // The setters already reject zero; these checks guard the same invariant
    // at the point where the limits are actually put to use.
    assert!(!matches!(self.max_len, Some(0)));
    assert!(!matches!(self.max_size, Some(0)));

    let shared = Arc::new(Shared::new(self.max_len, self.max_size));

    (Sender(Arc::clone(&shared)), Receiver(shared))
  }
}


/// Mutable channel state; stored behind the mutex in `Shared`.
struct Inner {
  /// The underlying buffer queue, created with the configured limits.
  q: BndPresLimBufQ,
  /// Sender end-point counter; starts at 1. `Shared::pop()` reports the
  /// channel as closed once the queue is empty and this is zero.
  tx_count: usize,
  /// Receiver end-point counter; starts at 1. The `Shared` push methods
  /// refuse buffers once this is zero.
  rx_count: usize,
  /// Wakers drained and woken by `Shared::wake_senders()`, keyed by id.
  tx_wakers: FxHashMap<u32, Waker>,
  /// Wakers drained and woken by `Shared::wake_receivers()`, keyed by id.
  rx_wakers: FxHashMap<u32, Waker>,
  /// Starts at 0. NOTE(review): presumably the source of the waker map
  /// keys; nothing in this file's visible code uses it, so confirm against
  /// the `rx`/`tx` modules.
  idgen: u32
}

impl Inner {
  /// Create the state for a freshly built channel.
  ///
  /// The queue is created with the supplied limits, both end-point counters
  /// start at one, no wakers are registered and the id generator starts at
  /// zero.
  fn new(max_len: Option<usize>, max_size: Option<usize>) -> Self {
    let q = BndPresLimBufQ::new(max_len, max_size);
    Self {
      q,
      tx_count: 1,
      rx_count: 1,
      tx_wakers: FxHashMap::default(),
      rx_wakers: FxHashMap::default(),
      idgen: 0
    }
  }

  /// Push buffer onto queue.
  ///
  /// The caller must ensure that pushing `buf` onto the queue would not
  /// exceed any configured queue size or length constraints.
  ///
  /// The result is whatever the underlying queue's `try_push()` returns; an
  /// `Err` carries a buffer, which `Shared::try_push()` wraps in
  /// `Error::WontFit`.
  #[inline]
  fn try_push(&mut self, buf: Vec<u8>) -> Result<(), Vec<u8>> {
    let queue = &mut self.q;
    queue.try_push(buf)
  }

  /// Forward `buf` to the underlying queue's `force_push()`.
  ///
  /// The result is passed through unchanged; an `Err` carries a buffer,
  /// which `Shared::force_push()` wraps in `Error::WontFit`.
  #[inline]
  fn force_push(&mut self, buf: Vec<u8>) -> Result<(), Vec<u8>> {
    let queue = &mut self.q;
    queue.force_push(buf)
  }

  /// Take the next buffer off the underlying queue, if it yields one.
  #[inline]
  fn pop(&mut self) -> Option<Vec<u8>> {
    let queue = &mut self.q;
    queue.pop()
  }
}


/// State shared by the `Sender` and `Receiver` end-points, which each hold
/// it through an `Arc`.
struct Shared {
  /// Queue, end-point counters and wakers, guarded by a mutex.
  inner: Mutex<Inner>,
  /// Condition variable notified by both `wake_senders()` and
  /// `wake_receivers()`.
  signal: Condvar
}

impl Shared {
  /// Acquire the mutex guarding the channel state and return its guard.
  #[inline]
  fn lock_inner(&self) -> MutexGuard<'_, Inner> {
    self.inner.lock()
  }

  /// Wake up threads/tasks waiting to push buffers onto the queue.
  ///
  /// Notifies everything waiting on the condition variable, then drains the
  /// sender waker map and wakes each waker taken from it.
  #[inline]
  fn wake_senders(&self, inner: &mut Inner) {
    self.signal.notify_all();
    inner.tx_wakers.drain().for_each(|(_, waker)| waker.wake());
  }

  /// Wake up threads/tasks waiting to receive buffers from the queue.
  ///
  /// Notifies everything waiting on the condition variable, then drains the
  /// receiver waker map and wakes each waker taken from it.
  #[inline]
  fn wake_receivers(&self, inner: &mut Inner) {
    self.signal.notify_all();
    inner.rx_wakers.drain().for_each(|(_, waker)| waker.wake());
  }
}

impl Shared {
  fn new(max_len: Option<usize>, max_size: Option<usize>) -> Self {
    let inner = Inner::new(max_len, max_size);
    Self {
      inner: Mutex::new(inner),
      signal: Condvar::new()
    }
  }

  /// Attempt to push a buffer onto the queue.
  ///
  /// Returns `Err(buf)` if the queue currently can not fit this buffer.
  ///
  /// The caller must have validated that the size of `buf` does not exceed the
  /// total size limit before calling this method.
  #[inline]
  fn try_push(&self, inner: &mut Inner, buf: Vec<u8>) -> Result<(), Error> {
    if inner.rx_count == 0 {
      return Err(Error::Closed);
    }
    inner.try_push(buf).map_err(Error::WontFit)?;
    self.wake_receivers(inner);
    Ok(())
  }

  /// Push a buffer onto the queue.
  ///
  /// Returns `Err()` if a maximum buffer size has been configured and the
  /// input exceeds the maximum size of the queue.
  ///
  /// If the new buffer causes the buffer queue to exceed any of its limits,
  /// the _oldest_ node will be removed until there's room for the buffer.
  #[inline]
  fn force_push(&self, inner: &mut Inner, buf: Vec<u8>) -> Result<(), Error> {
    if inner.rx_count == 0 {
      return Err(Error::Closed);
    }
    inner.force_push(buf).map_err(Error::WontFit)?;
    self.wake_receivers(inner);
    Ok(())
  }

  #[inline]
  fn pop(&self, inner: &mut Inner) -> Result<Option<Vec<u8>>, Error> {
    if let Some(buf) = inner.pop() {
      self.wake_senders(inner);
      Ok(Some(buf))
    } else if inner.tx_count == 0 {
      // No nodes in queue, and there are no transmitters remaining
      Err(Error::Closed)
    } else {
      // No nodes in queue, but there may be some in the future
      Ok(None)
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :