mod err;
mod rx;
mod tx;
use std::{sync::Arc, task::Waker};
use parking_lot::{Condvar, Mutex, MutexGuard};
use rustc_hash::FxHashMap;
use bndpresbufq::BndPresLimBufQ;
pub use {
err::Error,
rx::{MustHandle, Receiver},
tx::Sender
};
/// Collects the optional queue limits used when creating a channel
/// ([`Sender`]/[`Receiver`] pair).
///
/// Both limits are `None` when the builder is created through the derived
/// `Default` implementation.
#[derive(Default)]
pub struct Builder {
/// Value forwarded as the queue's `max_len` limit; `None` until set.
// NOTE(review): presumably the maximum number of queued buffers -- confirm
// against the `BndPresLimBufQ` documentation.
max_len: Option<usize>,
/// Value forwarded as the queue's `max_size` limit; `None` until set.
// NOTE(review): presumably the maximum total number of queued bytes --
// confirm against the `BndPresLimBufQ` documentation.
max_size: Option<usize>
}
impl Builder {
    /// Create a builder with neither `max_len` nor `max_size` configured.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Chaining (by-value) variant of [`Builder::max_len_r()`].
    ///
    /// # Panics
    /// Panics if `n` is `0`.
    #[must_use]
    pub fn max_len(mut self, n: usize) -> Self {
        // Validation and storage live in the by-reference setter.
        let _ = self.max_len_r(n);
        self
    }

    /// Store `n` as the `max_len` limit and return the builder for further
    /// in-place configuration.
    ///
    /// # Panics
    /// Panics if `n` is `0`.
    pub fn max_len_r(&mut self, n: usize) -> &mut Self {
        assert!(n != 0);
        self.max_len = Some(n);
        self
    }

    /// Chaining (by-value) variant of [`Builder::max_size_r()`].
    ///
    /// # Panics
    /// Panics if `n` is `0`.
    #[must_use]
    pub fn max_size(mut self, n: usize) -> Self {
        // Validation and storage live in the by-reference setter.
        let _ = self.max_size_r(n);
        self
    }

    /// Store `n` as the `max_size` limit and return the builder for further
    /// in-place configuration.
    ///
    /// # Panics
    /// Panics if `n` is `0`.
    pub fn max_size_r(&mut self, n: usize) -> &mut Self {
        assert!(n != 0);
        self.max_size = Some(n);
        self
    }

    /// Consume the builder and create a [`Sender`]/[`Receiver`] pair that
    /// share a single reference-counted `Shared` state.
    ///
    /// # Panics
    /// Panics if either configured limit is `Some(0)`.
    #[must_use]
    pub fn build(self) -> (Sender, Receiver) {
        // Re-check the limits here in case a zero was stored by some path
        // other than the public setters.
        assert!(!matches!(self.max_len, Some(0)));
        assert!(!matches!(self.max_size, Some(0)));

        let shared = Arc::new(Shared::new(self.max_len, self.max_size));
        let sender = Sender(Arc::clone(&shared));
        (sender, Receiver(shared))
    }
}
/// Mutable channel state; `Shared` keeps it behind a mutex.
struct Inner {
/// The buffer queue holding the `Vec<u8>` buffers in transit.
q: BndPresLimBufQ,
/// Starts at 1; `Shared::pop()` reports `Error::Closed` once this is 0 and
/// the queue is empty.
// NOTE(review): presumably the number of live `Sender` handles, maintained
// in the `tx` module -- confirm.
tx_count: usize,
/// Starts at 1; the `Shared` push methods report `Error::Closed` once this
/// is 0.
// NOTE(review): presumably the number of live `Receiver` handles,
// maintained in the `rx` module -- confirm.
rx_count: usize,
/// Wakers that `Shared::wake_senders()` drains and wakes.
tx_wakers: FxHashMap<u32, Waker>,
/// Wakers that `Shared::wake_receivers()` drains and wakes.
rx_wakers: FxHashMap<u32, Waker>,
/// Counter that starts at 0.
// NOTE(review): presumably used to generate the `u32` keys of the waker
// maps -- confirm against the `rx`/`tx` modules.
idgen: u32
}
impl Inner {
    /// Build the initial state: a queue created from the two optional
    /// limits, both endpoint counters set to 1, empty waker maps and an id
    /// counter of 0.
    fn new(max_len: Option<usize>, max_size: Option<usize>) -> Self {
        let q = BndPresLimBufQ::new(max_len, max_size);
        Self {
            q,
            tx_count: 1,
            rx_count: 1,
            tx_wakers: FxHashMap::default(),
            rx_wakers: FxHashMap::default(),
            idgen: 0
        }
    }

    /// Forward `buf` to the queue's `try_push()` and return its result
    /// unchanged.
    // NOTE(review): the `Err` payload is presumably the rejected buffer --
    // confirm against the `BndPresLimBufQ` documentation.
    #[inline]
    fn try_push(&mut self, buf: Vec<u8>) -> Result<(), Vec<u8>> {
        let queue = &mut self.q;
        queue.try_push(buf)
    }

    /// Forward `buf` to the queue's `force_push()` and return its result
    /// unchanged.
    // NOTE(review): the `Err` payload is presumably the rejected buffer --
    // confirm against the `BndPresLimBufQ` documentation.
    #[inline]
    fn force_push(&mut self, buf: Vec<u8>) -> Result<(), Vec<u8>> {
        let queue = &mut self.q;
        queue.force_push(buf)
    }

    /// Forward to the queue's `pop()`, returning the buffer it yields, if
    /// any.
    #[inline]
    fn pop(&mut self) -> Option<Vec<u8>> {
        let queue = &mut self.q;
        queue.pop()
    }
}
/// State shared between the `Sender` and `Receiver` handles created by
/// `Builder::build()`.
struct Shared {
/// The mutex-protected channel state.
inner: Mutex<Inner>,
/// Condition variable notified (all waiters) by `wake_senders()` and
/// `wake_receivers()`.
signal: Condvar
}
impl Shared {
    /// Lock the mutex around the channel state and return the guard.
    #[inline]
    fn lock_inner(&self) -> MutexGuard<'_, Inner> {
        self.inner.lock()
    }

    /// Notify every waiter on the condition variable, then remove and wake
    /// all wakers registered in `inner.tx_wakers`.
    #[inline]
    fn wake_senders(&self, inner: &mut Inner) {
        self.signal.notify_all();
        inner
            .tx_wakers
            .drain()
            .for_each(|(_id, waker)| waker.wake());
    }

    /// Notify every waiter on the condition variable, then remove and wake
    /// all wakers registered in `inner.rx_wakers`.
    #[inline]
    fn wake_receivers(&self, inner: &mut Inner) {
        self.signal.notify_all();
        inner
            .rx_wakers
            .drain()
            .for_each(|(_id, waker)| waker.wake());
    }
}
impl Shared {
fn new(max_len: Option<usize>, max_size: Option<usize>) -> Self {
let inner = Inner::new(max_len, max_size);
Self {
inner: Mutex::new(inner),
signal: Condvar::new()
}
}
#[inline]
fn try_push(&self, inner: &mut Inner, buf: Vec<u8>) -> Result<(), Error> {
if inner.rx_count == 0 {
return Err(Error::Closed);
}
inner.try_push(buf).map_err(Error::WontFit)?;
self.wake_receivers(inner);
Ok(())
}
#[inline]
fn force_push(&self, inner: &mut Inner, buf: Vec<u8>) -> Result<(), Error> {
if inner.rx_count == 0 {
return Err(Error::Closed);
}
inner.force_push(buf).map_err(Error::WontFit)?;
self.wake_receivers(inner);
Ok(())
}
#[inline]
fn pop(&self, inner: &mut Inner) -> Result<Option<Vec<u8>>, Error> {
if let Some(buf) = inner.pop() {
self.wake_senders(inner);
Ok(Some(buf))
} else if inner.tx_count == 0 {
Err(Error::Closed)
} else {
Ok(None)
}
}
}