recstrm 0.0.1

Special purpose flow-controlled channel used to stream records.
Documentation
use std::{
  collections::VecDeque,
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use parking_lot::{Mutex, MutexGuard};

use super::Inner;

use crate::err::Error;


/// Sending end-point of the record channel.
///
/// `T` is the record type passed to the send methods and `E` is the
/// application-defined error type carried by `Error::App`.  The sender is a
/// thin wrapper around a reference-counted handle to the `Shared` channel
/// state.
pub struct Sender<T, E>(pub(super) Arc<super::Shared<T, E>>);

impl<T, E> Sender<T, E> {
  #[inline]
  fn lock_inner(&self) -> MutexGuard<'_, Inner<T, E>> {
    self.0.inner.lock()
  }

  #[inline]
  fn signal_receiver(&self, inner: &mut Inner<T, E>) {
    self.0.signal.notify_all();
    if let Some(ref mut rctx) = inner.rctx {
      if let Some(waker) = rctx.waker.take() {
        waker.wake();
      }
    }
  }

  #[inline]
  fn check_error(&self, inner: &mut Inner<T, E>) -> Result<(), Error<E>> {
    if let Some(ref mut sctx) = inner.sctx {
      if let Some(err) = sctx.error.take() {
        return Err(err);
      }
    }

    // Presumably this method is called when the application has requested to
    // send data.  If the receiver has been dropped, then sending should return
    // Error::ReceiverDisappeared.
    //
    // If check_error() is ever made to be called in situations other than if a
    // send has been requested, then this check should not be called (refactor
    // out to a separate error checking function that is only used in send
    // methods?).
    if inner.rctx.is_none() {
      return Err(Error::ReceiverDisappeared);
    }

    Ok(())
  }


  #[inline]
  fn queue_full(&self, inner: &mut Inner<T, E>) -> bool {
    self.0.queue_full(inner)
  }

  #[inline]
  fn blocking_wait(&self, inner: &mut MutexGuard<'_, Inner<T, E>>) {
    self.0.signal.wait(inner);
  }

  /// Called when the queue is full and we need to wait for the receiver to
  /// take nodes off the queue.
  fn wait_for_space(
    &self,
    inner: &mut MutexGuard<'_, Inner<T, E>>
  ) -> Result<(), Error<E>> {
    self.check_error(inner)?;

    loop {
      // Break if there's room for another node in the queue, otherwise wait
      // for a notification that the queue state has changed.
      if !self.queue_full(inner) {
        break;
      }
      self.blocking_wait(inner);

      // If an error was raised while the mutex was locked then abort
      // immediately
      self.check_error(inner)?;
    }
    Ok(())
  }

  #[inline]
  fn expected(&self, inner: &mut Inner<T, E>) -> Option<(usize, usize)> {
    if let (Some(expected), Some(sent)) = (self.0.num_records, inner.sent_recs)
    {
      Some((expected, sent))
    } else {
      None
    }
  }
}


impl<T, E> Sender<T, E> {
  /// Send a single record to the receiver.
  ///
  /// If the queue is currently full this call will block and wait for space to
  /// become available.
  ///
  /// # Panic
  /// If this function has already returned an application-defined error, then
  /// calling this method again will panic.  Handle errors on first
  /// opportunity.
  pub fn send(&self, n: T) -> Result<(), Error<E>> {
    let mut inner = self.lock_inner();

    // If queue is full, block and wait until space becomes available.
    self.wait_for_space(&mut inner)?;

    // Push node onto queue and signal anyone waiting for new nodess
    inner.push(n);
    self.signal_receiver(&mut inner);

    Ok(())
  }

  /// Send a batch of records.
  ///
  /// All records will be transferred.  If the queue is full, this call will
  /// block until space becomes available.
  pub fn send_batch<I>(&self, it: I) -> Result<(), Error<E>>
  where
    I: Iterator<Item = T>
  {
    //let mut it = it.into_iter();

    let mut inner = self.lock_inner();

    // Always checking for an error as soon as lock has been released
    self.check_error(&mut inner)?;

    let mut did_push = false;

    //while let Some(n) = it.next() {
    for n in it {
      // We have a new node -- make sure there's room in the queue for it
      //
      // If the queue is full, then wait for the receiver to take nodes off
      // the queue.
      while self.queue_full(&mut inner) {
        // If nodes have been pushed to the receiver as we're about to wait
        // for space to become available, signal the receiver that queue has
        // received new nodes.
        if did_push {
          self.signal_receiver(&mut inner);
          did_push = false;
        }
        self.wait_for_space(&mut inner)?;
      }

      inner.push(n);
      did_push = true;
    }

    if did_push {
      self.signal_receiver(&mut inner);
    }

    Ok(())
  }

  /// Non-blocking send.
  ///
  /// Attempt to send a node.
  // ToDo: If the queue is full, return `Err(Error::QueueFull(T))`, returning
  // the node that could not be added to the queue.
  pub fn try_send(&self, n: T) -> Result<(), Error<E>> {
    let mut inner = self.lock_inner();

    self.check_error(&mut inner)?;

    if self.queue_full(&mut inner) {
      //Err(Error::QueueFull(n))
      Err(Error::QueueFull)
    } else {
      inner.push(n);
      self.signal_receiver(&mut inner);
      Ok(())
    }
  }

  /// Return a `Future` that will send a record to the receiver or wait until
  /// space becomes available in the queue.
  ///
  /// # Cancel safety
  /// The returned `SendFuture` is _not_ cancel safe.
  pub fn send_async(&self, n: T) -> SendFuture<T, E> {
    let mut q = VecDeque::new();
    q.push_back(n);

    SendFuture {
      snd: self,
      outq: Mutex::new(q)
    }
  }

  /// Return a `Future` that will send records to the receiver or wait until
  /// space becomes available in the queue.
  ///
  /// # Cancel safety
  /// The returned `SendFuture` is _not_ cancel safe.
  pub fn send_batch_async<I>(&self, it: I) -> SendFuture<T, E>
  where
    I: Iterator<Item = T>
  {
    let q = VecDeque::from_iter(it);
    SendFuture {
      snd: self,
      outq: Mutex::new(q)
    }
  }

  /// Terminate the channel with an application-defined error.
  ///
  /// This will cause the [`Receiver`](super::Receiver) to return an
  /// `Error::App(E)` error.
  pub fn fail(self, e: E) {
    let mut inner = self.lock_inner();
    if let Some(ref mut rctx) = inner.rctx {
      rctx.error = Some(Error::App(e));
      self.signal_receiver(&mut inner);
    }
  }
}

impl<T, E> Drop for Sender<T, E> {
  fn drop(&mut self) {
    let mut inner = self.lock_inner();

    let _ = inner.sctx.take();

    // If the number of expected records have not been sent, then make the
    // receiver return an underflow error.
    if let Some((expected, sent)) = self.expected(&mut inner) {
      if sent < expected {
        if let Some(ref mut rctx) = inner.rctx {
          rctx.error = Some(Error::RecordsUnderflow);
        }
      }
    }

    self.signal_receiver(&mut inner);
  }
}


/// Future returned by `Sender::send_async()` and
/// `Sender::send_batch_async()`.
///
/// Holds the records that have not yet been moved onto the shared queue;
/// polling it transfers records until its own queue is empty or the shared
/// queue is full.
pub struct SendFuture<'a, T, E> {
  /// The sender this future was created from.
  snd: &'a Sender<T, E>,
  /// Records still waiting to be pushed onto the shared queue.
  outq: Mutex<VecDeque<T>>
}

impl<'a, T, E> Future for SendFuture<'a, T, E> {
  type Output = Result<(), Error<E>>;

  /// Move as many buffered records as possible onto the shared queue.
  ///
  /// Resolves to `Ok(())` once every buffered record has been pushed (at once
  /// if there is nothing buffered), or to an error if `check_error()` reports
  /// one.  Returns `Poll::Pending` -- after storing the task's waker in the
  /// sender context -- when records remain but the shared queue is full.
  fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut inner = self.snd.lock_inner();

    if let Err(e) = self.snd.check_error(&mut inner) {
      return Poll::Ready(Err(e));
    }

    let mut outq = self.outq.lock();
    let mut did_push = false;

    // Keep moving nodes from self's queue to the shared queue until either
    // self's queue is empty or the shared queue is full.
    let ret = loop {
      // Nothing (more) to send means we're done.  Checking this before the
      // queue-full test makes an empty batch complete immediately rather than
      // wait for space it has no use for.
      if outq.is_empty() {
        break Poll::Ready(Ok(()));
      }

      // There are nodes left, but no room for them: arrange to be woken up
      // and wait until space is freed by the receiver.
      if self.snd.queue_full(&mut inner) {
        break match inner.sctx {
          Some(ref mut sctx) => {
            sctx.waker = Some(ctx.waker().clone());
            Poll::Pending
          }
          None => {
            // The sender context is gone even though this future still
            // borrows the Sender.  This is not expected to happen.
            //
            // ToDo: Should probably return an appropriate error
            Poll::Ready(Ok(()))
          }
        };
      }

      // The out-queue is non-empty and the shared queue has room, so move
      // one node over.
      if let Some(n) = outq.pop_front() {
        inner.push(n);
        did_push = true;
      }
    };

    // Let the receiver know about any nodes added during this poll.
    if did_push {
      self.snd.signal_receiver(&mut inner);
    }

    ret
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :