use std::collections::VecDeque;
use std::error::Error;
use std::fmt::{self, Debug, Display, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use crate::bi_rc::BiRc;
/// Error returned when a value could not be handed to the channel.
///
/// The value that failed to send is carried in the public field so the
/// caller can recover it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SendError<T>(pub T);

impl<T> Display for SendError<T> {
    /// Writes a fixed message; the carried value is never formatted, which is
    /// why no bound on `T` is needed here.
    fn fmt(&self, out: &mut Formatter<'_>) -> fmt::Result {
        out.write_str("channel disconnected")
    }
}

// `Error` requires `Debug`, and the derived `Debug` impl needs `T: Debug`.
impl<T: Debug> Error for SendError<T> {}
/// State shared by the two halves of a channel.
struct Shared<T> {
    /// Waker registered by a sender that found the buffer full.
    tx: Option<Waker>,
    /// Waker registered by a receiver that found the buffer empty.
    rx: Option<Waker>,
    /// Values that have been sent but not yet received, oldest first.
    buf: VecDeque<T>,
    /// When `true`, the buffer is never considered full.
    unbounded: bool,
}

impl<T> Shared<T> {
    /// Returns `true` when a bounded channel has no room for another value.
    ///
    /// The limit is the buffer's current allocation (`VecDeque::capacity`),
    /// not a separately stored number.
    // NOTE(review): `VecDeque::with_capacity` only promises *at least* the
    // requested capacity, so the effective bound may exceed what the caller
    // asked for — confirm this is acceptable.
    fn at_capacity(&self) -> bool {
        if self.unbounded {
            return false;
        }
        self.buf.len() == self.buf.capacity()
    }
}
/// Sending half of a channel.
pub struct Sender<T> {
    inner: BiRc<Shared<T>>,
}

impl<T> Sender<T> {
    /// Attempts to enqueue `value` without waiting.
    ///
    /// On success the receiver's registered waker, if any, is woken.
    ///
    /// # Errors
    ///
    /// Returns the value back inside [`SendError`] when the other half is no
    /// longer present *or* when a bounded buffer is full. Both cases produce
    /// the same error, so its "channel disconnected" message can also mean
    /// "full".
    pub fn try_send(&mut self, value: T) -> Result<(), SendError<T>> {
        // SAFETY: NOTE(review): soundness rests on the contract of
        // `BiRc::get_mut_unchecked`, which is not visible from this file —
        // presumably no other reference to the shared state is live here;
        // confirm against `BiRc`.
        let (shared, connected) = unsafe { self.inner.get_mut_unchecked() };

        if connected && !shared.at_capacity() {
            shared.buf.push_back(value);
            if let Some(receiver) = shared.rx.as_ref() {
                receiver.wake_by_ref();
            }
            Ok(())
        } else {
            Err(SendError(value))
        }
    }

    /// Enqueues `value`, waiting while a bounded buffer is full.
    ///
    /// # Errors
    ///
    /// Resolves to [`SendError`] carrying the value when the other half is no
    /// longer present.
    pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
        let delivery = Send {
            inner: &self.inner,
            value: Some(value),
        };
        delivery.await
    }
}
/// Future behind [`Sender::send`]: holds the value until it can be enqueued.
struct Send<'a, T> {
    inner: &'a BiRc<Shared<T>>,
    /// `Some` until the future completes; taken exactly once.
    value: Option<T>,
}

impl<'a, T> Future for Send<'a, T> {
    type Output = Result<(), SendError<T>>;

    /// Tries to enqueue the held value.
    ///
    /// * Other half gone: clears the sender waker and returns the value in a
    ///   [`SendError`].
    /// * Buffer full: registers the current waker and returns `Pending`.
    /// * Otherwise: pushes the value, wakes the receiver, returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let fut = self.get_mut();
        // SAFETY: NOTE(review): soundness rests on the contract of
        // `BiRc::get_mut_unchecked`, which is not visible from this file —
        // confirm against `BiRc`.
        let (shared, connected) = unsafe { fut.inner.get_mut_unchecked() };

        if !connected {
            shared.tx = None;
            let rejected = fut.value.take().expect("future already completed");
            return Poll::Ready(Err(SendError(rejected)));
        }

        if shared.at_capacity() {
            // Skip the clone when the stored waker already targets this task.
            let registered = match &shared.tx {
                Some(current) => current.will_wake(cx.waker()),
                None => false,
            };
            if !registered {
                shared.tx = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }

        let item = fut.value.take().expect("future already completed");
        shared.buf.push_back(item);
        if let Some(receiver) = shared.rx.as_ref() {
            receiver.wake_by_ref();
        }
        Poll::Ready(Ok(()))
    }
}

// The held value is never pinned structurally, so the future can be moved
// freely regardless of `T`.
impl<T> Unpin for Send<'_, T> {}
/// Receiving half of a channel.
pub struct Receiver<T> {
    inner: BiRc<Shared<T>>,
}

impl<T> Receiver<T> {
    /// Waits for the next value.
    ///
    /// Resolves to `Some(value)` when a buffered value is available, and to
    /// `None` once the buffer is empty and the other half is no longer
    /// present.
    pub async fn recv(&mut self) -> Option<T> {
        let next = Recv { inner: &self.inner };
        next.await
    }
}
/// Future behind [`Receiver::recv`]: resolves with the next buffered value.
struct Recv<'a, T> {
    inner: &'a BiRc<Shared<T>>,
}

impl<'a, T> Future for Recv<'a, T> {
    type Output = Option<T>;

    /// Tries to take the oldest buffered value.
    ///
    /// * A value is buffered: returns `Some(value)`. Buffered values are
    ///   drained even after the other half is gone.
    /// * Buffer empty and other half gone: clears the receiver waker and
    ///   returns `None`.
    /// * Buffer empty otherwise: registers the current waker, wakes any
    ///   registered sender, and returns `Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = Pin::into_inner(self);
        // SAFETY: NOTE(review): soundness rests on the contract of
        // `BiRc::get_mut_unchecked`, which is not visible from this file —
        // confirm against `BiRc`.
        let (inner, both_present) = unsafe { this.inner.get_mut_unchecked() };

        // NOTE(review): a successful pop does not wake a sender parked on a
        // full buffer; the sender is only woken further down, once the buffer
        // is found empty. Confirm this batching is intended.
        if let Some(value) = inner.buf.pop_front() {
            return Poll::Ready(Some(value));
        }

        if !both_present {
            inner.rx = None;
            return Poll::Ready(None);
        }

        // Store the current waker unless the stored one already wakes this
        // task. The previous check was inverted: it kept a waker belonging to
        // a different task, so a `recv` polled under a new waker was never
        // woken by later sends.
        if !matches!(&inner.rx, Some(w) if w.will_wake(cx.waker())) {
            inner.rx = Some(cx.waker().clone());
        }

        // The buffer is drained, so a sender waiting for room can make
        // progress now.
        if let Some(tx) = &inner.tx {
            tx.wake_by_ref();
        }
        Poll::Pending
    }
}
impl<T> Drop for Sender<T> {
    /// Removes the receiver's registered waker, if any, and wakes it so a
    /// pending receive gets polled again after the sender goes away.
    fn drop(&mut self) {
        // SAFETY: NOTE(review): soundness rests on the contract of
        // `BiRc::get_mut_unchecked`, which is not visible from this file —
        // confirm against `BiRc`.
        let (shared, _) = unsafe { self.inner.get_mut_unchecked() };
        if let Some(receiver) = shared.rx.take() {
            receiver.wake();
        }
    }
}
impl<T> Drop for Receiver<T> {
    /// Removes the sender's registered waker, if any, and wakes it so a
    /// pending send gets polled again after the receiver goes away.
    fn drop(&mut self) {
        // SAFETY: NOTE(review): soundness rests on the contract of
        // `BiRc::get_mut_unchecked`, which is not visible from this file —
        // confirm against `BiRc`.
        let (shared, _) = unsafe { self.inner.get_mut_unchecked() };
        if let Some(sender) = shared.tx.take() {
            sender.wake();
        }
    }
}
/// Creates a bounded channel whose buffer is pre-allocated for `capacity`
/// values.
///
/// Returns the `(Sender, Receiver)` pair sharing one buffer.
///
/// # Panics
///
/// Panics if `capacity` is 0.
// NOTE(review): fullness is judged by `VecDeque::capacity()`, which is only
// guaranteed to be at least `capacity` — confirm the bound need not be exact.
pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    assert!(capacity > 0, "capacity cannot be 0");

    let state = Shared {
        tx: None,
        rx: None,
        buf: VecDeque::with_capacity(capacity),
        unbounded: false,
    };
    let (receiver_half, sender_half) = BiRc::new(state);

    (
        Sender { inner: sender_half },
        Receiver {
            inner: receiver_half,
        },
    )
}
/// Creates a channel with no capacity limit: the buffer starts empty and is
/// never reported as full.
///
/// Returns the `(Sender, Receiver)` pair sharing one buffer.
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    let state = Shared {
        tx: None,
        rx: None,
        buf: VecDeque::new(),
        unbounded: true,
    };
    let (receiver_half, sender_half) = BiRc::new(state);

    (
        Sender { inner: sender_half },
        Receiver {
            inner: receiver_half,
        },
    )
}