bndpresbufch 0.1.2

Bounds-preserving channel for passing buffers.
Documentation
use std::{
  future::Future,
  mem::ManuallyDrop,
  ops::{Deref, DerefMut},
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use crate::err::Error;


#[derive(Default)]
enum DropAction {
  #[default]
  ReturnToQueue,
  Drop,
  Nothing
}

/// Wrapper around elements that must be handled by the application.
pub struct MustHandle {
  sh: Arc<super::Shared>,
  inner: ManuallyDrop<Vec<u8>>,
  drop_action: DropAction
}

impl MustHandle {
  fn new(sh: Arc<super::Shared>, inner: Vec<u8>) -> Self {
    Self {
      sh,
      inner: ManuallyDrop::new(inner),
      drop_action: DropAction::default()
    }
  }

  /// Mark the inner object has handled and then drop it.
  pub fn handled(mut self) {
    self.drop_action = DropAction::Drop;
  }

  /// Remove the inner object from the `MustHandle` and return it.
  #[must_use]
  pub fn into_inner(mut self) -> Vec<u8> {
    self.drop_action = DropAction::Nothing;
    unsafe { ManuallyDrop::take(&mut self.inner) }
  }
}

impl Deref for MustHandle {
  type Target = [u8];

  fn deref(&self) -> &[u8] {
    &self.inner
  }
}

impl DerefMut for MustHandle {
  fn deref_mut(&mut self) -> &mut [u8] {
    &mut self.inner
  }
}

impl Drop for MustHandle {
  fn drop(&mut self) {
    match self.drop_action {
      DropAction::ReturnToQueue => {
        let t = unsafe { ManuallyDrop::take(&mut self.inner) };
        let mut inner = self.sh.inner.lock();
        let _ = inner.q.try_return(t);
      }
      DropAction::Drop => unsafe { ManuallyDrop::drop(&mut self.inner) },
      DropAction::Nothing => {}
    }
  }
}

/// Receiving end-point used to receive bounds-preserved buffers from a
/// [`Sender`](super::Sender) end-point.
#[repr(transparent)]
pub struct Receiver(pub(super) Arc<super::Shared>);

impl Receiver {
  /// Get next buffer in queue.
  ///
  /// If there are no buffers in queue, then block and wait for a
  /// [`Sender`](super::Sender) to push a buffer onto the queue.
  ///
  /// Returns `None` if the queue is empty and there are no more `Sender`
  /// objects associated with the queue.
  #[allow(clippy::significant_drop_tightening)]
  #[must_use]
  pub fn pop(&self) -> Option<Vec<u8>> {
    let mut inner = self.0.inner.lock();
    loop {
      if let Some(buf) = inner.pop() {
        self.0.wake_senders(&mut inner);
        break Some(buf);
      }
      if inner.tx_count == 0 {
        break None;
      }
      self.0.signal.wait(&mut inner);
    }
  }

  /// Take an element off the queue that must be handled by the application, or
  /// it will be returned to the queue.
  ///
  /// If no elements are available on the queue, then block and wait for one to
  /// be added.  If the queue us empty and the sender has been dropped this
  /// `None` will returned.
  #[must_use]
  pub fn pop_managed(&self) -> Option<MustHandle> {
    let n = self.pop()?;
    Some(MustHandle::new(Arc::clone(&self.0), n))
  }

  /// Attempt to get next buffer in queue.
  ///
  /// # Errors
  /// [`Error::Closed`] will be returned if the queue is empty and all
  /// sender end-points have been dropped.
  pub fn try_pop(&self) -> Result<Option<Vec<u8>>, Error> {
    let mut inner = self.0.inner.lock();
    self.0.pop(&mut inner)
  }


  /// Attempt to get next buffer in queue, wrapped in a [`MustHandle`] wrapper.
  ///
  /// # Errors
  /// [`Error::Closed`] will be returned if the queue is empty and all
  /// sender end-points have been dropped.
  pub fn try_pop_managed(&self) -> Result<Option<MustHandle>, Error> {
    let mut inner = self.0.inner.lock();
    Ok(
      self
        .0
        .pop(&mut inner)?
        .map(|n| MustHandle::new(Arc::clone(&self.0), n))
    )
  }

  /// Return a [`Future`] that will return a buffer from the queue or wait for
  /// a buffer to become available.
  ///
  /// The `Future` will resolve to `Ok(Some(Vec<u8>))` on success.
  ///
  /// # Errors
  /// [`Error::Closed`] will be returned if the queue is empty and all
  /// sender end-points have been dropped.
  #[must_use]
  pub fn apop(&self) -> RecvFuture {
    RecvFuture {
      sh: Arc::clone(&self.0),
      waker_id: None
    }
  }


  /// Return a [`Future`] that will return a buffer from the queue or wait for
  /// a buffer to become available.
  ///
  /// The `Future` will resolve to `Ok(Some(Vec<u8>))` on success.
  ///
  /// # Errors
  /// [`Error::Closed`] will be returned if the queue is empty and all
  /// sender end-points have been dropped.
  #[must_use]
  pub fn apop_managed(&self) -> RecvManagedFuture {
    RecvManagedFuture {
      sh: Arc::clone(&self.0),
      waker_id: None
    }
  }
}

impl Drop for Receiver {
  fn drop(&mut self) {
    let mut inner = self.0.inner.lock();
    inner.rx_count -= 1;

    // If the number of receivers has reached zero, wake all the transmitters
    // (so they can return Error::Closed, if appropriate)
    if inner.rx_count == 0 {
      self.0.wake_senders(&mut inner);
    }
  }
}


pub struct RecvFuture {
  sh: Arc<super::Shared>,
  waker_id: Option<u32>
}

impl Future for RecvFuture {
  type Output = Result<Vec<u8>, Error>;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.sh.lock_inner();
    match self.sh.pop(&mut inner) {
      Ok(Some(buf)) => Poll::Ready(Ok(buf)),
      Ok(None) => {
        // Queue is empty -- add this future to the collection of wakers and
        // return pending
        // ToDo: exhaust-deadlock
        let id = loop {
          inner.idgen = inner.idgen.wrapping_add(1);
          if !inner.rx_wakers.contains_key(&inner.idgen) {
            break inner.idgen;
          }
        };
        inner.rx_wakers.insert(id, ctx.waker().clone());

        drop(inner);

        self.waker_id = Some(id);

        Poll::Pending
      }
      Err(e) => Poll::Ready(Err(e))
    }
  }
}

impl Drop for RecvFuture {
  /// When a [`RecvFuture`] is dropped, make sure to deregister its waker [if
  /// registered].
  fn drop(&mut self) {
    if let Some(id) = self.waker_id.take() {
      let mut inner = self.sh.lock_inner();
      inner.rx_wakers.remove(&id);
    }
  }
}


pub struct RecvManagedFuture {
  sh: Arc<super::Shared>,
  waker_id: Option<u32>
}

impl Future for RecvManagedFuture {
  type Output = Result<MustHandle, Error>;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.sh.lock_inner();
    match self.sh.pop(&mut inner) {
      Ok(Some(buf)) => {
        let ret = MustHandle::new(Arc::clone(&self.sh), buf);
        Poll::Ready(Ok(ret))
      }
      Ok(None) => {
        // Queue is empty -- add this future to the collection of wakers and
        // return pending
        // ToDo: exhaust-deadlock
        let id = loop {
          inner.idgen = inner.idgen.wrapping_add(1);
          if !inner.rx_wakers.contains_key(&inner.idgen) {
            break inner.idgen;
          }
        };
        inner.rx_wakers.insert(id, ctx.waker().clone());

        drop(inner);

        self.waker_id = Some(id);

        Poll::Pending
      }
      Err(e) => Poll::Ready(Err(e))
    }
  }
}

impl Drop for RecvManagedFuture {
  /// When a [`RecvFuture`] is dropped, make sure to deregister its waker [if
  /// registered].
  fn drop(&mut self) {
    if let Some(id) = self.waker_id.take() {
      let mut inner = self.sh.lock_inner();
      inner.rx_wakers.remove(&id);
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :