#![forbid(unsafe_code, future_incompatible, rust_2018_idioms)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, rustdoc::missing_doc_code_examples, unreachable_pub)]
#[cfg(doctest)]
mod doctests {
doc_comment::doctest!("../README.md");
}
use std::collections::VecDeque;
use std::convert::TryInto;
use std::error;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use event_listener::{Event, EventListener};
use futures_core::{ready, stream::Stream};
use parking_lot::RwLock;
pub fn broadcast<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
assert!(cap > 0, "capacity cannot be zero");
let inner = Arc::new(RwLock::new(Inner {
queue: VecDeque::with_capacity(cap),
capacity: cap,
overflow: false,
await_active: true,
receiver_count: 1,
inactive_receiver_count: 0,
sender_count: 1,
head_pos: 0,
is_closed: false,
send_ops: Event::new(),
recv_ops: Event::new(),
}));
let s = Sender {
inner: inner.clone(),
};
let r = Receiver {
inner,
pos: 0,
listener: None,
};
(s, r)
}
#[derive(Debug)]
struct Inner<T> {
queue: VecDeque<(T, usize)>,
capacity: usize,
receiver_count: usize,
inactive_receiver_count: usize,
sender_count: usize,
head_pos: u64,
overflow: bool,
await_active: bool,
is_closed: bool,
send_ops: Event,
recv_ops: Event,
}
impl<T> Inner<T> {
fn try_recv_at(&mut self, pos: &mut u64) -> Result<Result<T, &T>, TryRecvError> {
let i = match pos.checked_sub(self.head_pos) {
Some(i) => i
.try_into()
.expect("Head position more than usize::MAX behind a receiver"),
None => {
let count = self.head_pos - *pos;
*pos = self.head_pos;
return Err(TryRecvError::Overflowed(count));
}
};
let last_waiter;
if let Some((_elt, waiters)) = self.queue.get_mut(i) {
*pos += 1;
*waiters -= 1;
last_waiter = *waiters == 0;
} else {
debug_assert_eq!(i, self.queue.len());
if self.is_closed {
return Err(TryRecvError::Closed);
} else {
return Err(TryRecvError::Empty);
}
}
if last_waiter {
assert_eq!(i, 0);
let elt = self.queue.pop_front().unwrap().0;
self.head_pos += 1;
if !self.overflow {
self.send_ops.notify(1);
}
Ok(Ok(elt))
} else {
Ok(Err(&self.queue[i].0))
}
}
fn close(&mut self) -> bool {
if self.is_closed {
return false;
}
self.is_closed = true;
self.send_ops.notify(usize::MAX);
self.recv_ops.notify(usize::MAX);
true
}
fn set_capacity(&mut self, new_cap: usize) {
self.capacity = new_cap;
if new_cap > self.queue.capacity() {
let diff = new_cap - self.queue.capacity();
self.queue.reserve(diff);
}
if new_cap < self.queue.len() {
let diff = self.queue.len() - new_cap;
self.queue.drain(0..diff);
self.head_pos += diff as u64;
}
}
fn close_channel(&mut self) {
if self.receiver_count == 0 && self.inactive_receiver_count == 0 {
self.close();
}
}
}
#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<RwLock<Inner<T>>>,
}
impl<T> Sender<T> {
pub fn capacity(&self) -> usize {
self.inner.read().capacity
}
pub fn set_capacity(&mut self, new_cap: usize) {
self.inner.write().set_capacity(new_cap);
}
pub fn overflow(&self) -> bool {
self.inner.read().overflow
}
pub fn set_overflow(&mut self, overflow: bool) {
self.inner.write().overflow = overflow;
}
pub fn await_active(&self) -> bool {
self.inner.read().await_active
}
pub fn set_await_active(&mut self, await_active: bool) {
self.inner.write().await_active = await_active;
}
pub fn close(&self) -> bool {
self.inner.write().close()
}
pub fn is_closed(&self) -> bool {
self.inner.read().is_closed
}
pub fn is_empty(&self) -> bool {
self.inner.read().queue.is_empty()
}
pub fn is_full(&self) -> bool {
let inner = self.inner.read();
inner.queue.len() == inner.capacity
}
pub fn len(&self) -> usize {
self.inner.read().queue.len()
}
pub fn receiver_count(&self) -> usize {
self.inner.read().receiver_count
}
pub fn inactive_receiver_count(&self) -> usize {
self.inner.read().inactive_receiver_count
}
pub fn sender_count(&self) -> usize {
self.inner.read().sender_count
}
pub fn new_receiver(&self) -> Receiver<T> {
let mut inner = self.inner.write();
inner.receiver_count += 1;
Receiver {
inner: self.inner.clone(),
pos: inner.head_pos + inner.queue.len() as u64,
listener: None,
}
}
}
impl<T: Clone> Sender<T> {
pub fn broadcast(&self, msg: T) -> Send<'_, T> {
Send {
sender: self,
listener: None,
msg: Some(msg),
}
}
pub fn try_broadcast(&self, msg: T) -> Result<Option<T>, TrySendError<T>> {
let mut ret = None;
let mut inner = self.inner.write();
if inner.is_closed {
return Err(TrySendError::Closed(msg));
} else if inner.receiver_count == 0 {
assert!(inner.inactive_receiver_count != 0);
return Err(TrySendError::Inactive(msg));
} else if inner.queue.len() == inner.capacity {
if inner.overflow {
ret = inner.queue.pop_front().map(|(m, _)| m);
} else {
return Err(TrySendError::Full(msg));
}
}
let receiver_count = inner.receiver_count;
inner.queue.push_back((msg, receiver_count));
if ret.is_some() {
inner.head_pos += 1;
}
inner.recv_ops.notify(usize::MAX);
Ok(ret)
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let mut inner = self.inner.write();
inner.sender_count -= 1;
if inner.sender_count == 0 {
inner.close();
}
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.inner.write().sender_count += 1;
Sender {
inner: self.inner.clone(),
}
}
}
#[derive(Debug)]
pub struct Receiver<T> {
inner: Arc<RwLock<Inner<T>>>,
pos: u64,
listener: Option<EventListener>,
}
impl<T> Receiver<T> {
pub fn capacity(&self) -> usize {
self.inner.read().capacity
}
pub fn set_capacity(&mut self, new_cap: usize) {
self.inner.write().set_capacity(new_cap);
}
pub fn overflow(&self) -> bool {
self.inner.read().overflow
}
pub fn set_overflow(&mut self, overflow: bool) {
self.inner.write().overflow = overflow;
}
pub fn await_active(&self) -> bool {
self.inner.read().await_active
}
pub fn set_await_active(&mut self, await_active: bool) {
self.inner.write().await_active = await_active;
}
pub fn close(&self) -> bool {
self.inner.write().close()
}
pub fn is_closed(&self) -> bool {
self.inner.read().is_closed
}
pub fn is_empty(&self) -> bool {
self.inner.read().queue.is_empty()
}
pub fn is_full(&self) -> bool {
let inner = self.inner.read();
inner.queue.len() == inner.capacity
}
pub fn len(&self) -> usize {
self.inner.read().queue.len()
}
pub fn receiver_count(&self) -> usize {
self.inner.read().receiver_count
}
pub fn inactive_receiver_count(&self) -> usize {
self.inner.read().inactive_receiver_count
}
pub fn sender_count(&self) -> usize {
self.inner.read().sender_count
}
pub fn deactivate(self) -> InactiveReceiver<T> {
self.inner.write().inactive_receiver_count += 1;
InactiveReceiver {
inner: self.inner.clone(),
}
}
}
impl<T: Clone> Receiver<T> {
pub fn recv(&mut self) -> Recv<'_, T> {
Recv {
receiver: self,
listener: None,
}
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.inner
.write()
.try_recv_at(&mut self.pos)
.map(|cow| cow.unwrap_or_else(T::clone))
}
pub fn new_sender(&self) -> Sender<T> {
self.inner.write().sender_count += 1;
Sender {
inner: self.inner.clone(),
}
}
pub fn new_receiver(&self) -> Self {
let mut inner = self.inner.write();
inner.receiver_count += 1;
Receiver {
inner: self.inner.clone(),
pos: inner.head_pos + inner.queue.len() as u64,
listener: None,
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut inner = self.inner.write();
loop {
match inner.try_recv_at(&mut self.pos) {
Ok(_) => continue,
Err(TryRecvError::Overflowed(_)) => continue,
Err(TryRecvError::Closed) => break,
Err(TryRecvError::Empty) => break,
}
}
inner.receiver_count -= 1;
inner.close_channel();
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let mut inner = self.inner.write();
inner.receiver_count += 1;
let n = self.pos.saturating_sub(inner.head_pos) as usize;
for (_elt, waiters) in inner.queue.iter_mut().skip(n) {
*waiters += 1;
}
Receiver {
inner: self.inner.clone(),
pos: self.pos,
listener: None,
}
}
}
impl<T: Clone> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(listener) = self.listener.as_mut() {
ready!(Pin::new(listener).poll(cx));
self.listener = None;
}
loop {
match self.try_recv() {
Ok(msg) => {
self.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
self.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Overflowed(_)) => continue,
Err(TryRecvError::Empty) => {}
}
match self.listener.as_mut() {
None => {
self.listener = {
let inner = self.inner.write();
Some(inner.recv_ops.listen())
};
}
Some(_) => {
break;
}
}
}
}
}
}
impl<T: Clone> futures_core::stream::FusedStream for Receiver<T> {
fn is_terminated(&self) -> bool {
let inner = self.inner.read();
inner.is_closed && inner.queue.is_empty()
}
}
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);
impl<T> SendError<T> {
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> error::Error for SendError<T> {}
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SendError(..)")
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "sending into a closed channel")
}
}
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
Full(T),
Closed(T),
Inactive(T),
}
impl<T> TrySendError<T> {
pub fn into_inner(self) -> T {
match self {
TrySendError::Full(t) => t,
TrySendError::Closed(t) => t,
TrySendError::Inactive(t) => t,
}
}
pub fn is_full(&self) -> bool {
match self {
TrySendError::Full(_) => true,
TrySendError::Closed(_) | TrySendError::Inactive(_) => false,
}
}
pub fn is_closed(&self) -> bool {
match self {
TrySendError::Full(_) | TrySendError::Inactive(_) => false,
TrySendError::Closed(_) => true,
}
}
pub fn is_disconnected(&self) -> bool {
match self {
TrySendError::Full(_) | TrySendError::Closed(_) => false,
TrySendError::Inactive(_) => true,
}
}
}
impl<T> error::Error for TrySendError<T> {}
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TrySendError::Full(..) => write!(f, "Full(..)"),
TrySendError::Closed(..) => write!(f, "Closed(..)"),
TrySendError::Inactive(..) => write!(f, "Inactive(..)"),
}
}
}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TrySendError::Full(..) => write!(f, "sending into a full channel"),
TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
TrySendError::Inactive(..) => write!(f, "sending into the void (no active receivers)"),
}
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RecvError {
Overflowed(u64),
Closed,
}
impl error::Error for RecvError {}
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Overflowed(n) => write!(f, "receiving skipped {} messages", n),
Self::Closed => write!(f, "receiving from an empty and closed channel"),
}
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
Overflowed(u64),
Empty,
Closed,
}
impl TryRecvError {
pub fn is_empty(&self) -> bool {
match self {
TryRecvError::Empty => true,
TryRecvError::Closed => false,
TryRecvError::Overflowed(_) => false,
}
}
pub fn is_closed(&self) -> bool {
match self {
TryRecvError::Empty => false,
TryRecvError::Closed => true,
TryRecvError::Overflowed(_) => false,
}
}
pub fn is_overflowed(&self) -> bool {
match self {
TryRecvError::Empty => false,
TryRecvError::Closed => false,
TryRecvError::Overflowed(_) => true,
}
}
}
impl error::Error for TryRecvError {}
impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TryRecvError::Empty => write!(f, "receiving from an empty channel"),
TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
TryRecvError::Overflowed(n) => {
write!(f, "receiving operation observed {} lost messages", n)
}
}
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Send<'a, T> {
sender: &'a Sender<T>,
listener: Option<EventListener>,
msg: Option<T>,
}
impl<'a, T> Unpin for Send<'a, T> {}
impl<'a, T: Clone> Future for Send<'a, T> {
type Output = Result<Option<T>, SendError<T>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = Pin::new(self);
loop {
let msg = this.msg.take().unwrap();
let inner = &this.sender.inner;
match this.sender.try_broadcast(msg) {
Ok(msg) => {
let inner = inner.write();
if inner.queue.len() < inner.capacity {
inner.send_ops.notify(1);
}
return Poll::Ready(Ok(msg));
}
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => this.msg = Some(m),
Err(TrySendError::Inactive(m)) if inner.read().await_active => this.msg = Some(m),
Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))),
}
match &mut this.listener {
None => {
let inner = inner.write();
this.listener = Some(inner.send_ops.listen());
}
Some(l) => {
ready!(Pin::new(l).poll(cx));
this.listener = None;
}
}
}
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Recv<'a, T> {
receiver: &'a mut Receiver<T>,
listener: Option<EventListener>,
}
impl<'a, T> Unpin for Recv<'a, T> {}
impl<'a, T: Clone> Future for Recv<'a, T> {
type Output = Result<T, RecvError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = Pin::new(self);
loop {
match this.receiver.try_recv() {
Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
Err(TryRecvError::Overflowed(n)) => {
return Poll::Ready(Err(RecvError::Overflowed(n)))
}
Err(TryRecvError::Empty) => {}
}
match &mut this.listener {
None => {
this.listener = {
let inner = this.receiver.inner.write();
Some(inner.recv_ops.listen())
};
}
Some(l) => {
ready!(Pin::new(l).poll(cx));
this.listener = None;
}
}
}
}
}
#[derive(Debug)]
pub struct InactiveReceiver<T> {
inner: Arc<RwLock<Inner<T>>>,
}
impl<T> InactiveReceiver<T> {
pub fn activate(self) -> Receiver<T> {
self.activate_cloned()
}
pub fn activate_cloned(&self) -> Receiver<T> {
let mut inner = self.inner.write();
inner.receiver_count += 1;
if inner.receiver_count == 1 {
inner.send_ops.notify(1);
}
Receiver {
inner: self.inner.clone(),
pos: inner.head_pos + inner.queue.len() as u64,
listener: None,
}
}
pub fn capacity(&self) -> usize {
self.inner.read().capacity
}
pub fn set_capacity(&mut self, new_cap: usize) {
self.inner.write().set_capacity(new_cap);
}
pub fn overflow(&self) -> bool {
self.inner.read().overflow
}
pub fn set_overflow(&mut self, overflow: bool) {
self.inner.write().overflow = overflow;
}
pub fn await_active(&self) -> bool {
self.inner.read().await_active
}
pub fn set_await_active(&mut self, await_active: bool) {
self.inner.write().await_active = await_active;
}
pub fn close(&self) -> bool {
self.inner.write().close()
}
pub fn is_closed(&self) -> bool {
self.inner.read().is_closed
}
pub fn is_empty(&self) -> bool {
self.inner.read().queue.is_empty()
}
pub fn is_full(&self) -> bool {
let inner = self.inner.read();
inner.queue.len() == inner.capacity
}
pub fn len(&self) -> usize {
self.inner.read().queue.len()
}
pub fn receiver_count(&self) -> usize {
self.inner.read().receiver_count
}
pub fn inactive_receiver_count(&self) -> usize {
self.inner.read().inactive_receiver_count
}
pub fn sender_count(&self) -> usize {
self.inner.read().sender_count
}
}
impl<T> Clone for InactiveReceiver<T> {
fn clone(&self) -> Self {
self.inner.write().inactive_receiver_count += 1;
InactiveReceiver {
inner: self.inner.clone(),
}
}
}
impl<T> Drop for InactiveReceiver<T> {
fn drop(&mut self) {
let mut inner = self.inner.write();
inner.inactive_receiver_count -= 1;
inner.close_channel();
}
}