bndpresbufch 0.1.2

Bounds-preserving channel for passing buffers.
Documentation
use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use crate::err::Error;

/// Transmitting end-point used to transfer bounds-preserved buffers to a
/// [`Receiver`](super::Receiver) end-point.
///
/// This is a thin wrapper around a reference-counted handle to the channel's
/// shared state; every operation on a `Sender` locks that shared state before
/// touching the queue.
///
/// Dropping a `Sender` decrements the transmitter count kept in the shared
/// state.
#[repr(transparent)]
pub struct Sender(pub(super) Arc<super::Shared>);

impl Sender {
  /// Push buffer onto queue.
  ///
  /// If the queue is full (either by length or by size), block and wait for
  /// space to become available.
  ///
  /// # Errors
  /// [`Error::WontFit`] means the queue has been configured with a
  /// maximum total queue size and the size of the input `buf` exceeds the
  /// entire queue size limit.
  pub fn push(&self, mut buf: Vec<u8>) -> Result<(), Error> {
    let mut inner = self.0.inner.lock();
    loop {
      match self.0.try_push(&mut inner, buf) {
        Ok(()) => break Ok(()),
        Err(Error::WontFit(b)) => {
          buf = b;
          self.0.signal.wait(&mut inner);
        }
        Err(e) => break Err(e)
      }
    }
  }

  /// Push buffer onto queue, for `async` contexts.
  ///
  /// If the queue is full (either by length or by size), block and wait for
  /// space to become available.
  ///
  /// # Errors
  /// [`Error::WontFit`] means the queue has been configured with a
  /// maximum total queue size and the length of `buf` exceeds it.
  pub async fn push_async(&self, buf: Vec<u8>) -> Result<(), Error> {
    {
      let inner = self.0.inner.lock();

      // First make sure the buffer isn't too large for the queue (even if the
      // queue is empty).
      if let Some(max_size) = inner.q.max_size() {
        if buf.len() > max_size {
          return Err(Error::WontFit(buf));
        }
      }
    }

    let mut buf2 = buf;
    loop {
      // Try to push the buffer onto the queue
      match self.try_push(buf2) {
        Ok(()) => break Ok(()),
        Err(Error::WontFit(buf)) => {
          // Wait for space to become available ..
          let wsf = WaitSpaceFuture {
            sender: self,
            size: buf.len(),
            waker_id: None
          };
          wsf.await;
          buf2 = buf;
        }
        Err(e) => break Err(e)
      }
    }
  }

  /// Forcibly push buffer onto the buffer queue.
  ///
  /// # Errors
  /// If the queue is configured with a queue buffer limit size, and `buf`
  /// exceeds this lmit, this method will return [`Error::WontFit`].
  ///
  /// If the pushing `buf` onto the queue would either exceed a configured
  /// maximum number of elements in the queue, or a configured maximum queue
  /// size, the oldest node(s) will be removed from the queue until the new
  /// node fits.
  pub fn force_push(&self, buf: Vec<u8>) -> Result<(), Error> {
    let mut inner = self.0.inner.lock();
    self.0.force_push(&mut inner, buf)
  }

  /// Attempt to push buffer onto the queue.
  ///
  /// # Errors
  /// Returns [`Error::WontFit`] if the queue's capacity can (currently) not
  /// hold the input buffer.
  ///
  /// # Panics
  /// If the queue is configured to have an upper total queue buffer size
  /// limit, then the caller must have ensured that the input `buf` does not
  /// exceed this limit.
  pub fn try_push(&self, buf: Vec<u8>) -> Result<(), Error> {
    let mut inner = self.0.inner.lock();
    self.0.try_push(&mut inner, buf)
  }
}

impl Drop for Sender {
  /// Unregister this transmitter from the shared channel state.
  fn drop(&mut self) {
    let mut state = self.0.inner.lock();
    state.tx_count -= 1;

    // Other transmitters are still alive -- nothing more to do.
    if state.tx_count != 0 {
      return;
    }

    // This was the last transmitter.  The stated intent is to wake the
    // receivers (so they can return Error::Closed, if appropriate).
    // NOTE(review): the helper invoked here is named `wake_senders`; confirm
    // that it is the one that reaches the waiting receivers.
    self.0.wake_senders(&mut state);
  }
}

/// Future used to make sure that there's room for a new buffer in the queue.
///
/// It resolves once the queue is below its configured length limit (if any)
/// and a buffer of `size` bytes fits within its configured size limit (if
/// any).  It does not push anything itself.
pub struct WaitSpaceFuture<'snd> {
  /// Sender whose shared queue state is inspected.
  sender: &'snd Sender,
  /// Length, in bytes, of the buffer that is waiting to be pushed.
  size: usize,
  /// Key under which this future's waker is stored in the shared `tx_wakers`
  /// collection; `None` until the future has returned `Poll::Pending`.
  waker_id: Option<u32>
}

impl<'snd> Future for WaitSpaceFuture<'snd> {
  type Output = ();

  /// Check whether a buffer of `self.size` bytes currently fits in the queue.
  ///
  /// Returns `Poll::Ready(())` when neither the length limit nor the size
  /// limit would be exceeded.  Otherwise the task's waker is stored in the
  /// shared `tx_wakers` collection and `Poll::Pending` is returned.
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    // Copy what is needed out of `self` before locking, so the lock guard
    // does not have to coexist with a mutable borrow of `self`.
    let sender = self.sender;
    let size = self.size;
    let stale_id = self.waker_id.take();

    let mut inner = sender.0.lock_inner();

    // A previous poll may have left a registration behind.  Remove it so that
    // repeated polls do not pile up outdated wakers; only the waker from the
    // most recent poll should remain registered.
    if let Some(id) = stale_id {
      inner.tx_wakers.remove(&id);
    }

    let is_full_len = inner
      .q
      .max_len()
      .map_or(false, |max_len| inner.q.len() >= max_len);

    let is_full_size = inner
      .q
      .max_size()
      .map_or(false, |max_size| inner.q.size() + size > max_size);

    if !(is_full_len || is_full_size) {
      return Poll::Ready(());
    }

    // Queue is full -- add this future to the collection of wakers and
    // return pending.  Pick the next id that is not already in use.
    // ToDo: exhaust-deadlock
    let id = loop {
      inner.idgen = inner.idgen.wrapping_add(1);
      if !inner.tx_wakers.contains_key(&inner.idgen) {
        break inner.idgen;
      }
    };
    inner.tx_wakers.insert(id, ctx.waker().clone());

    drop(inner);

    // Remember the id so the registration can be removed again later.
    self.waker_id = Some(id);

    Poll::Pending
  }
}

impl<'snd> Drop for WaitSpaceFuture<'snd> {
  /// Remove this future's waker registration, if it has one.
  fn drop(&mut self) {
    // `take()` leaves `None` behind, so the id is only ever removed once.
    let registered = self.waker_id.take();

    if let Some(key) = registered {
      let mut shared = self.sender.0.lock_inner();
      shared.tx_wakers.remove(&key);
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :