mod err;
mod recv;
mod send;
use std::{collections::VecDeque, sync::Arc, task::Waker};
use parking_lot::{Condvar, Mutex};
pub use err::Error;
pub use recv::{Receiver, TryRecv};
pub use send::Sender;
/// Context stored in [`Inner::sctx`] (by its name, the sender end's state).
///
/// NOTE(review): how the fields are consumed is decided in the `send`/`recv`
/// modules, which are not visible from this file.
struct SenderCtx<E> {
/// Optional task waker; `None` in a freshly created context.
waker: Option<Waker>,
/// Optional error slot; `None` in a freshly created context.
error: Option<Error<E>>
}
impl<E> Default for SenderCtx<E> {
    /// Returns an empty context: no waker and no error.
    ///
    /// Implemented by hand rather than derived so that no `E: Default` bound
    /// is imposed on the error type.
    fn default() -> Self {
        let (waker, error) = (None, None);
        Self { waker, error }
    }
}
/// Context stored in [`Inner::rctx`] (by its name, the receiver end's state).
///
/// NOTE(review): how the fields are consumed is decided in the `send`/`recv`
/// modules, which are not visible from this file.
struct ReceiverCtx<E> {
/// Optional task waker; `None` in a freshly created context.
waker: Option<Waker>,
/// Optional error slot; `None` in a freshly created context.
error: Option<Error<E>>
}
impl<E> Default for ReceiverCtx<E> {
    /// Returns an empty context: no waker and no error.
    ///
    /// Implemented by hand rather than derived so that no `E: Default` bound
    /// is imposed on the error type.
    fn default() -> Self {
        let (waker, error) = (None, None);
        Self { waker, error }
    }
}
/// Mutable channel state; wrapped in a `Mutex` by [`Shared`].
struct Inner<T, E> {
/// Queued items; `push` appends at the back.
q: VecDeque<T>,
/// Sender context; `Some` when created by `Inner::new`.
/// NOTE(review): presumably cleared elsewhere when the sender goes away --
/// confirm in the `send` module.
sctx: Option<SenderCtx<E>>,
/// Receiver context; `Some` when created by `Inner::new`.
/// NOTE(review): presumably cleared elsewhere when the receiver goes away --
/// confirm in the `recv` module.
rctx: Option<ReceiverCtx<E>>,
/// Count of pushed items. `None` unless the builder was given a record
/// count, in which case it starts at `Some(0)` and `push` increments it.
sent_recs: Option<usize>,
/// Counterpart of `sent_recs`, initialised the same way by the builder.
/// NOTE(review): it is not updated in this file; presumably the receiver
/// maintains it -- confirm in the `recv` module.
recv_recs: Option<usize>
}
impl<T, E> Inner<T, E> {
fn new() -> Self {
Self {
q: VecDeque::new(),
sctx: Some(SenderCtx::default()),
rctx: Some(ReceiverCtx::default()),
sent_recs: None,
recv_recs: None
}
}
#[inline]
pub(crate) fn push(&mut self, n: T) {
self.q.push_back(n);
if let Some(ref mut nrecs) = self.sent_recs {
*nrecs = nrecs.saturating_add(1);
}
}
}
/// State shared between the two channel ends; `Builder::build` wraps one
/// instance in an `Arc` and hands a handle to both `Sender` and `Receiver`.
struct Shared<T, E> {
/// Mutable channel state, guarded by a mutex.
inner: Mutex<Inner<T, E>>,
/// Condition variable.
/// NOTE(review): presumably paired with `inner` for blocking waits --
/// confirm in the `send`/`recv` modules.
signal: Condvar,
/// Queue length at which `queue_full` starts reporting `true`.
qsize: usize,
/// Record count configured on the builder, if any.
/// NOTE(review): how it is enforced is not visible in this file.
num_records: Option<usize>
}
impl<T, E> Shared<T, E> {
    /// Wraps `inner` in a mutex and pairs it with a fresh condition variable,
    /// recording the queue size and the optional record count as given.
    fn new(inner: Inner<T, E>, qsize: usize, nrecs: Option<usize>) -> Self {
        let inner = Mutex::new(inner);
        let signal = Condvar::new();
        Self {
            inner,
            signal,
            qsize,
            num_records: nrecs
        }
    }

    /// Returns `true` when the queue in `inner` holds at least `qsize` items.
    ///
    /// `inner` is taken by mutable reference although it is only read here.
    pub(crate) fn queue_full(&self, inner: &mut Inner<T, E>) -> bool {
        let queued = inner.q.len();
        self.qsize <= queued
    }
}
/// Builder for a `Sender`/`Receiver` channel pair.
pub struct Builder {
/// Queue size passed on to the shared state; `Builder::new` sets it to 32
/// and `queue_size_r` rejects zero.
qsize: usize,
/// Optional record count; `None` unless set through
/// `num_records`/`num_records_r`.
num_records: Option<usize>
}
impl Builder {
    /// Creates a builder with a queue size of 32 and no record count.
    pub fn new() -> Self {
        let qsize = 32;
        Self {
            qsize,
            num_records: None
        }
    }

    /// Sets the queue size, consuming and returning the builder.
    ///
    /// # Panics
    /// Panics if `qsize` is zero.
    pub fn queue_size(mut self, qsize: usize) -> Self {
        Self::queue_size_r(&mut self, qsize);
        self
    }

    /// Sets the queue size through a mutable reference.
    ///
    /// # Panics
    /// Panics if `qsize` is zero.
    pub fn queue_size_r(&mut self, qsize: usize) -> &mut Self {
        assert!(qsize != 0);
        self.qsize = qsize;
        self
    }

    /// Sets the record count, consuming and returning the builder.
    pub fn num_records(mut self, nrecs: usize) -> Self {
        Self::num_records_r(&mut self, nrecs);
        self
    }

    /// Sets the record count through a mutable reference.
    ///
    /// Setting a record count makes `build` enable the sent/received record
    /// counters in the channel state.
    pub fn num_records_r(&mut self, nrecs: usize) -> &mut Self {
        self.num_records = Some(nrecs);
        self
    }

    /// Consumes the builder and returns the two ends of a new channel.
    ///
    /// Both ends hold an `Arc` to the same shared state.
    pub fn build<T, E>(self) -> (Sender<T, E>, Receiver<T, E>) {
        let Self { qsize, num_records } = self;

        // Counters start at zero only when a record count was configured;
        // otherwise they stay `None`, as `Inner::new` leaves them.
        let counter = num_records.map(|_| 0);
        let mut inner = Inner::new();
        inner.sent_recs = counter;
        inner.recv_recs = counter;

        let shared = Arc::new(Shared::new(inner, qsize, num_records));
        let sender = Sender(Arc::clone(&shared));
        (sender, Receiver(shared))
    }
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}
/// Creates a channel with the given queue size and optional record count.
///
/// Shorthand for configuring a [`Builder`] and calling [`Builder::build`].
///
/// # Panics
/// Panics if `qsize` is zero.
pub fn channel<T, E>(
    qsize: usize,
    num_records: Option<usize>
) -> (Sender<T, E>, Receiver<T, E>) {
    assert!(qsize != 0);
    let sized = Builder::new().queue_size(qsize);
    match num_records {
        Some(limit) => sized.num_records(limit).build(),
        None => sized.build()
    }
}