//! recstrm 0.0.1
//!
//! Special purpose flow-controlled channel used to stream records.
mod err;
mod recv;
mod send;

use std::{collections::VecDeque, sync::Arc, task::Waker};

use parking_lot::{Condvar, Mutex};

pub use err::Error;
pub use recv::{Receiver, TryRecv};
pub use send::Sender;

/// Sender end-point specific state, stored in `Inner::sctx`.
///
/// `E` is the application-defined error type carried by `Error<E>`.
struct SenderCtx<E> {
  /// Optional task waker; starts out as `None`.
  ///
  /// NOTE(review): presumably registered by a sender that is waiting for
  /// space in the queue -- confirm against the `send` module.
  waker: Option<Waker>,

  /// Optional error slot; starts out as `None`.
  ///
  /// NOTE(review): who writes and who consumes this error is not visible in
  /// this file -- confirm against the `send`/`recv` modules.
  error: Option<Error<E>>
}

// Written by hand rather than `#[derive(Default)]`: the derive would add an
// `E: Default` bound, which is not needed to produce two `None` values.
impl<E> Default for SenderCtx<E> {
  /// Create a sender context with no waker and no error set.
  fn default() -> Self {
    let (waker, error) = (None, None);
    Self { waker, error }
  }
}

/// Receiver end-point specific state, stored in `Inner::rctx`.
///
/// `E` is the application-defined error type carried by `Error<E>`.
struct ReceiverCtx<E> {
  /// Optional task waker; starts out as `None`.
  ///
  /// NOTE(review): presumably registered by a receiver that is waiting for
  /// records to arrive -- confirm against the `recv` module.
  waker: Option<Waker>,

  /// Optional error slot; starts out as `None`.
  ///
  /// NOTE(review): who writes and who consumes this error is not visible in
  /// this file -- confirm against the `send`/`recv` modules.
  error: Option<Error<E>>
}

// Written by hand rather than `#[derive(Default)]`: the derive would add an
// `E: Default` bound, which is not needed to produce two `None` values.
impl<E> Default for ReceiverCtx<E> {
  /// Create a receiver context with no waker and no error set.
  fn default() -> Self {
    let (waker, error) = (None, None);
    Self { waker, error }
  }
}


// ToDo: By virtue of how this library should work, could the wakers be made
// into one?  It should not be possible for both sender and receiver to wait at
// the same time?
/// Mutable channel state.  `Shared` keeps one instance of this behind its
/// mutex.
struct Inner<T, E> {
  /// Queue of records.  `push()` appends new records at the back.
  q: VecDeque<T>,

  /// Used to store sender end-point specific information.
  ///
  /// This is set to `None` by the `Sender`'s `Drop` implementation, which
  /// implicitly means (when the queue is empty) end-of-stream.
  sctx: Option<SenderCtx<E>>,

  /// Used to store receiver end-point specific information.
  ///
  /// This is set to `None` by the `Receiver`'s `Drop` implementation.
  rctx: Option<ReceiverCtx<E>>,

  /// Number of records pushed so far.  `push()` increments it (saturating)
  /// while it is `Some`.  `Builder::build()` sets it to `Some(0)` when an
  /// expected number of records was configured; otherwise it stays `None`
  /// and nothing is counted.
  sent_recs: Option<usize>,

  /// Set to `Some(0)` by `Builder::build()` under the same condition as
  /// `sent_recs`; otherwise `None`.
  ///
  /// NOTE(review): presumably the number of records taken off the queue by
  /// the receiver; it is not updated anywhere in this file -- confirm
  /// against the `recv` module.
  recv_recs: Option<usize>
}

impl<T, E> Inner<T, E> {
  fn new() -> Self {
    Self {
      q: VecDeque::new(),
      sctx: Some(SenderCtx::default()),
      rctx: Some(ReceiverCtx::default()),
      sent_recs: None,
      recv_recs: None
    }
  }

  #[inline]
  pub(crate) fn push(&mut self, n: T) {
    self.q.push_back(n);
    if let Some(ref mut nrecs) = self.sent_recs {
      *nrecs = nrecs.saturating_add(1);
    }
  }
}

/// State shared by the two end-points.  `Builder::build()` wraps one instance
/// in an `Arc` and hands a reference to both the `Sender` and the `Receiver`.
struct Shared<T, E> {
  /// Mutable channel state, protected by a mutex.
  inner: Mutex<Inner<T, E>>,

  /// Condition variable created alongside `inner`.
  ///
  /// NOTE(review): presumably used to wake threads blocked on the queue
  /// being full or empty -- confirm against the `send`/`recv` modules.
  signal: Condvar,

  /// Maximum number of records in queue.  Sender will block, waiting for
  /// records to be pulled off queue, if full.
  qsize: usize,

  /// Expected number of records.  If this is set, a limit has been set.  If
  /// this is `None`, no limit has been set.
  num_records: Option<usize>
}

impl<T, E> Shared<T, E> {
  /// Bundle the channel state with its synchronization primitives.
  ///
  /// `inner` is moved into a fresh mutex, `qsize` is the queue capacity and
  /// `nrecs` is the optional expected number of records.
  fn new(inner: Inner<T, E>, qsize: usize, nrecs: Option<usize>) -> Self {
    let inner = Mutex::new(inner);
    let signal = Condvar::new();

    Self {
      inner,
      signal,
      qsize,
      num_records: nrecs
    }
  }

  /// Return `true` if the queue in `inner` currently holds at least `qsize`
  /// records.
  ///
  /// The state is passed in by reference instead of being locked here.
  pub(crate) fn queue_full(&self, inner: &mut Inner<T, E>) -> bool {
    let queued = inner.q.len();
    queued >= self.qsize
  }
}


/// Builder used to configure and create a `Sender`/`Receiver` pair.
pub struct Builder {
  /// Maximum number of records the queue may hold.  `Builder::new()` sets
  /// this to 32; the setters reject `0`.
  qsize: usize,

  /// Expected number of records, or `None` if no expectation has been set.
  num_records: Option<usize>
}


impl Builder {
  /// Construct a new recstrm builder.
  ///
  /// The queue size will default to 32 records, and there's no expected
  /// number of records.
  #[must_use]
  pub fn new() -> Self {
    Self {
      qsize: 32,
      num_records: None
    }
  }

  /// Set the maximum number of records the queue may hold.
  ///
  /// Consumes the builder and returns it, so calls can be chained.  The
  /// returned builder must be used; dropping it discards the setting.
  ///
  /// # Panics
  /// Panics if `qsize` is `0`.
  #[must_use]
  pub fn queue_size(mut self, qsize: usize) -> Self {
    self.queue_size_r(qsize);
    self
  }

  /// Set the maximum number of records the queue may hold.
  ///
  /// Same as [`Builder::queue_size()`], but modifies the builder in place
  /// through a mutable reference.
  ///
  /// # Panics
  /// Panics if `qsize` is `0`.
  pub fn queue_size_r(&mut self, qsize: usize) -> &mut Self {
    // A queue without capacity could never accept a record.
    assert!(qsize != 0);
    self.qsize = qsize;
    self
  }

  /// Set the expected number of records.
  ///
  /// Consumes the builder and returns it, so calls can be chained.  The
  /// returned builder must be used; dropping it discards the setting.
  #[must_use]
  pub fn num_records(mut self, nrecs: usize) -> Self {
    self.num_records_r(nrecs);
    self
  }

  /// Set the expected number of records.
  ///
  /// Same as [`Builder::num_records()`], but modifies the builder in place
  /// through a mutable reference.
  pub fn num_records_r(&mut self, nrecs: usize) -> &mut Self {
    self.num_records = Some(nrecs);
    self
  }

  /// Consume the builder and create the paired channel end-points.
  ///
  /// Both end-points refer to the same reference-counted shared state.
  #[must_use]
  pub fn build<T, E>(self) -> (Sender<T, E>, Receiver<T, E>) {
    let mut inner = Inner::new();

    // The record counters are only enabled when an expected number of
    // records has been configured; otherwise they stay `None`.
    if self.num_records.is_some() {
      inner.sent_recs = Some(0);
      inner.recv_recs = Some(0);
    }

    let shared = Shared::new(inner, self.qsize, self.num_records);
    let shared = Arc::new(shared);

    (Sender(Arc::clone(&shared)), Receiver(shared))
  }
}

impl Default for Builder {
  fn default() -> Self {
    Self::new()
  }
}


/// Create a connected `Sender`/`Receiver` pair used to pass records.
///
/// `qsize` is the maximum number of records the queue may hold.
/// `num_records` is the expected number of records; pass `None` if no
/// expectation should be set.
///
/// This is a convenience wrapper around [`Builder`].
///
/// # Examples
/// ```
/// use std::thread;
///
/// let (tx, rx) = recstrm::channel::<_, ()>(16, None);
///
/// let jh = thread::spawn(move || {
///   if let Some(node) = rx.recv().unwrap() {
///     assert_eq!(node, "hello");
///   }
/// });
///
/// tx.send("hello").unwrap();
///
/// jh.join().unwrap();
/// ```
///
/// # Panics
/// Passing `0` to `qsize` will cause this function to panic.
pub fn channel<T, E>(
  qsize: usize,
  num_records: Option<usize>
) -> (Sender<T, E>, Receiver<T, E>) {
  assert!(qsize != 0);

  let bldr = Builder::new().queue_size(qsize);
  let bldr = match num_records {
    Some(nrecs) => bldr.num_records(nrecs),
    None => bldr
  };

  bldr.build()
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :