// recstrm 0.0.1
//
// Special purpose flow-controlled channel used to stream records.
// Documentation
use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use parking_lot::MutexGuard;

use super::Inner;

use crate::err::Error;

/// Outcome of a non-blocking receive attempt (see `Receiver::try_recv()`).
///
/// `Debug` is derived so that values can be logged and used with
/// `assert_eq!`; `Eq` is derived alongside `PartialEq` for record types that
/// support it.
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecv<T> {
  /// A record was taken off the queue.
  Some(T),

  /// The queue was empty, but the stream has not ended yet; a later attempt
  /// may yield a record.
  None,

  /// The queue was empty and the stream has ended; no more records will
  /// arrive.
  EOF
}

/// Receiving end-point of the channel.
///
/// Thin handle around the reference-counted state shared with the sending
/// end-point.
pub struct Receiver<T, E>(pub(super) Arc<super::Shared<T, E>>);

impl<T, E> Receiver<T, E> {
  /// Acquire the mutex protecting the shared channel state.
  #[inline]
  fn lock_inner(&self) -> MutexGuard<'_, Inner<T, E>> {
    self.0.inner.lock()
  }

  /// Let the sender know that the channel state has changed.
  ///
  /// Notifies every thread waiting on the shared signal and, if the sender
  /// context holds a task waker, takes it out and wakes it.
  #[inline]
  fn signal_sender(&self, inner: &mut Inner<T, E>) {
    self.0.signal.notify_all();

    let pending = inner.sctx.as_mut().and_then(|sctx| sctx.waker.take());
    if let Some(waker) = pending {
      waker.wake();
    }
  }

  /// Return (and clear) the error stored in the receiver context, if any.
  ///
  /// The error is taken out of the context, so it is reported only once.
  #[inline]
  fn check_error(&self, inner: &mut Inner<T, E>) -> Result<(), Error<E>> {
    match inner.rctx.as_mut().and_then(|rctx| rctx.error.take()) {
      Some(err) => Err(err),
      None => Ok(())
    }
  }

  /// Wait on the shared signal; `inner` is the guard of the mutex that is
  /// handed to the wait call.
  #[inline]
  fn blocking_wait(&self, inner: &mut MutexGuard<'_, Inner<T, E>>) {
    self.0.signal.wait(inner);
  }
}

impl<T, E> Receiver<T, E> {
  /// Receive a single record from the sender end-point.
  ///
  /// If the sender has reported an error, it will be returned.
  pub fn recv(&self) -> Result<Option<T>, Error<E>> {
    let mut inner = self.lock_inner();

    // Wait for data to become available (or error to be set by sender) if
    // needed.
    loop {
      // If an error has triggered which invalidates the receiver, then handle
      // it here.
      self.check_error(&mut inner)?;

      if let Some(n) = inner.q.pop_front() {
        if let Some(ref mut nrecs) = inner.recv_recs {
          *nrecs = nrecs.saturating_add(1);
        }

        // Signal to the sender that a node has been taken off the queue
        self.signal_sender(&mut inner);

        break Ok(Some(n));
      } else if inner.sctx.is_none() {
        // No nodes in the queue and the stream has reached its end.
        // Return Ok(None) to signal that stream has ended.

        // ToDo: If a number of records has been set when the channel was
        // created, then make sure the number of received nodes has reached the
        // expected number of records.

        return Ok(None);
      }

      self.blocking_wait(&mut inner);
    }
  }

  /// Receive all records in the queue.
  ///
  /// Blocks and waits for nodes to arrive if the queue was empty.
  ///
  /// If records were extracted from the queue `Ok(Some(Vec<T>))` is returned,
  /// where the length of the `Vec` will be guaranteed to be at least one.
  ///
  /// If the queue is empty and the sender was dropped, then Ok(None) is
  /// returned.
  pub fn recv_all(&self) -> Result<Option<Vec<T>>, Error<E>> {
    let mut inner = self.lock_inner();

    self.check_error(&mut inner)?;

    // Wait for data to become available, the sender to drop or an error to be
    // set.
    while inner.q.is_empty() {
      if inner.sctx.is_none() {
        return Ok(None);
      }
      self.blocking_wait(&mut inner);

      // Re-check error after reacquiring the mutex
      self.check_error(&mut inner)?;
    }
    let ret = Vec::from_iter(inner.q.drain(..));
    if let Some(ref mut nrecs) = inner.recv_recs {
      *nrecs = nrecs.saturating_add(ret.len());
    }

    self.signal_sender(&mut inner);
    Ok(Some(ret))
  }

  /// Receiver at most `lim` records from queue.
  ///
  /// # Panic
  /// Passing a `lim` value of `0` will cause a panic.
  pub fn recv_atmost(&self, lim: usize) -> Result<Option<Vec<T>>, Error<E>> {
    assert!(lim != 0);

    let mut inner = self.lock_inner();

    self.check_error(&mut inner)?;

    // Wait for data to become available, the sender to drop or an error to be
    // set.
    while inner.q.is_empty() {
      if inner.sctx.is_none() {
        return Ok(None);
      }
      self.blocking_wait(&mut inner);

      // Re-check error after reacquiring the mutex
      self.check_error(&mut inner)?;
    }

    let n = std::cmp::min(inner.q.len(), lim);
    let ret = Vec::from_iter(inner.q.drain(..n));
    if let Some(ref mut nrecs) = inner.recv_recs {
      *nrecs = nrecs.saturating_add(ret.len());
    }
    self.signal_sender(&mut inner);
    Ok(Some(ret))
  }

  /// Attempt to get a node off the queue.
  ///
  /// Will not block, and thus is safe to call from async contexts.
  pub fn try_recv(&self) -> Result<TryRecv<T>, Error<E>> {
    let mut inner = self.lock_inner();

    self.check_error(&mut inner)?;

    if let Some(n) = inner.q.pop_front() {
      if let Some(ref mut nrecs) = inner.recv_recs {
        *nrecs = nrecs.saturating_add(1);
      }
      self.signal_sender(&mut inner);
      Ok(TryRecv::Some(n))
    } else if inner.sctx.is_none() {
      Ok(TryRecv::EOF)
    } else {
      Ok(TryRecv::None)
    }
  }

  /// Receive a single record from the queue.
  ///
  /// If there are no entries in the queue, block and wait for the sender to
  /// send one.
  pub fn recv_async(&self) -> RecvFuture<T, E> {
    RecvFuture(self)
  }

  pub fn recv_all_async(&self) -> RecvAllFuture<T, E> {
    RecvAllFuture(self)
  }

  /// # Panic
  /// Passing a `lim` value of `0` will cause a panic.
  pub fn recv_atmost_async(&self, lim: usize) -> RecvAtMostFuture<T, E> {
    assert!(lim != 0);
    RecvAtMostFuture { rcv: self, lim }
  }

  /// Terminate the channel with an application-defined error.
  ///
  /// This will cause the [`Sender`](super::Sender) to return an
  /// `Error::App(E)` error.
  pub fn fail(self, e: E) {
    let mut inner = self.lock_inner();
    if let Some(ref mut sctx) = inner.sctx {
      sctx.error = Some(Error::App(e));
      self.signal_sender(&mut inner);
    }
  }
}

impl<T, E> Drop for Receiver<T, E> {
  /// Detach the receiver from the channel and notify the sender.
  fn drop(&mut self) {
    let mut guard = self.lock_inner();

    // An absent receiver context is what tells the sender that the receiver
    // has been dropped, so clear it before signalling.
    guard.rctx = None;

    self.signal_sender(&mut guard);
  }
}

pub struct RecvFuture<'a, T, E>(pub(super) &'a Receiver<T, E>);

impl<'a, T, E> Future for RecvFuture<'a, T, E> {
  type Output = Result<Option<T>, Error<E>>;

  fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut inner = self.0.lock_inner();

    if let Err(e) = self.0.check_error(&mut inner) {
      return Poll::Ready(Err(e));
    }

    if let Some(n) = inner.q.pop_front() {
      if let Some(ref mut nrecs) = inner.recv_recs {
        *nrecs = nrecs.saturating_add(1);
      }
      self.0.signal_sender(&mut inner);
      Poll::Ready(Ok(Some(n)))
    } else if inner.sctx.is_none() {
      Poll::Ready(Ok(None))
    } else {
      let Some(ref mut rctx) = inner.rctx else {
        // This should not be possible, because the future holds a reference
        // to the Receiver, and it's the Receiver's Drop implementation that
        // clears inner.rctx.
        panic!("Internal error; Receiver has been dropped");
      };
      rctx.waker = Some(ctx.waker().clone());
      Poll::Pending
    }
  }
}

pub struct RecvAllFuture<'a, T, E>(pub(super) &'a Receiver<T, E>);

impl<'a, T, E> Future for RecvAllFuture<'a, T, E> {
  type Output = Result<Option<Vec<T>>, Error<E>>;

  fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut inner = self.0.lock_inner();

    if let Err(e) = self.0.check_error(&mut inner) {
      return Poll::Ready(Err(e));
    }

    if inner.q.is_empty() {
      if inner.sctx.is_none() {
        Poll::Ready(Ok(None))
      } else {
        let Some(ref mut rctx) = inner.rctx else {
          // This should not be possible, because the future holds a reference
          // to the Receiver, and it's the Receiver's Drop implementation that
          // clears inner.rctx.
          panic!("Internal error; Receiver has been dropped");
        };
        rctx.waker = Some(ctx.waker().clone());
        Poll::Pending
      }
    } else {
      let ret = Vec::from_iter(inner.q.drain(..));
      if let Some(ref mut nrecs) = inner.recv_recs {
        *nrecs = nrecs.saturating_add(ret.len());
      }
      self.0.signal_sender(&mut inner);
      Poll::Ready(Ok(Some(ret)))
    }
  }
}

pub struct RecvAtMostFuture<'a, T, E> {
  rcv: &'a Receiver<T, E>,
  lim: usize
}

impl<'a, T, E> Future for RecvAtMostFuture<'a, T, E> {
  type Output = Result<Option<Vec<T>>, Error<E>>;

  fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut inner = self.rcv.lock_inner();

    if let Err(e) = self.rcv.check_error(&mut inner) {
      return Poll::Ready(Err(e));
    }

    if inner.q.is_empty() {
      if inner.sctx.is_none() {
        Poll::Ready(Ok(None))
      } else {
        let Some(ref mut rctx) = inner.rctx else {
          // This should not be possible, because the future holds a reference
          // to the Receiver, and it's the Receiver's Drop implementation that
          // clears inner.rctx.
          panic!("Internal error; Receiver has been dropped");
        };
        rctx.waker = Some(ctx.waker().clone());
        Poll::Pending
      }
    } else {
      let n = std::cmp::min(inner.q.len(), self.lim);
      let ret = Vec::from_iter(inner.q.drain(..n));
      if let Some(ref mut nrecs) = inner.recv_recs {
        *nrecs = nrecs.saturating_add(ret.len());
      }
      self.rcv.signal_sender(&mut inner);
      Poll::Ready(Ok(Some(ret)))
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :