use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll}
};
use parking_lot::MutexGuard;
use super::Inner;
use crate::err::Error;
/// Outcome of a non-blocking receive attempt (see [`Receiver::try_recv`]).
///
/// `Debug` and `Eq` are derived in addition to `PartialEq` so values can be
/// used with `assert_eq!`, `{:?}` formatting and `Result::unwrap()`.
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecv<T> {
    /// A node was available and has been removed from the queue.
    Some(T),
    /// The queue is currently empty, but the sender-side context still
    /// exists.
    None,
    /// The queue is empty and the sender-side context no longer exists
    /// (presumably because the sender end-point has been dropped).
    EOF
}
/// Receiving end-point of the queue.
///
/// Wraps a reference-counted handle to the state it shares with the sending
/// side.
pub struct Receiver<T, E>(pub(super) Arc<super::Shared<T, E>>);

/// Private helpers used by the blocking, non-blocking and asynchronous
/// receive paths.
impl<T, E> Receiver<T, E> {
    /// Lock the shared state and return the guard.
    #[inline]
    fn lock_inner(&self) -> MutexGuard<'_, Inner<T, E>> {
        let shared = &self.0;
        shared.inner.lock()
    }

    /// Tell the sending side that the shared state has changed.
    ///
    /// Everyone blocked on the shared `signal` is notified first; afterwards
    /// a waker stored in the sender-side context (if there is one) is taken
    /// out and woken.
    #[inline]
    fn signal_sender(&self, inner: &mut Inner<T, E>) {
        self.0.signal.notify_all();
        let pending = inner.sctx.as_mut().and_then(|sctx| sctx.waker.take());
        if let Some(waker) = pending {
            waker.wake();
        }
    }

    /// Take a pending error out of the receiver-side context.
    ///
    /// Returns `Err` with that error if one was stored, clearing it in the
    /// process; returns `Ok(())` when there is no receiver-side context or no
    /// stored error.
    #[inline]
    fn check_error(&self, inner: &mut Inner<T, E>) -> Result<(), Error<E>> {
        match inner.rctx.as_mut().and_then(|rctx| rctx.error.take()) {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }

    /// Wait on the shared `signal`, handing it the held lock guard for the
    /// duration of the wait.
    #[inline]
    fn blocking_wait(&self, inner: &mut MutexGuard<'_, Inner<T, E>>) {
        let signal = &self.0.signal;
        signal.wait(inner);
    }
}
impl<T, E> Receiver<T, E> {
pub fn recv(&self) -> Result<Option<T>, Error<E>> {
let mut inner = self.lock_inner();
loop {
self.check_error(&mut inner)?;
if let Some(n) = inner.q.pop_front() {
if let Some(ref mut nrecs) = inner.recv_recs {
*nrecs = nrecs.saturating_add(1);
}
self.signal_sender(&mut inner);
break Ok(Some(n));
} else if inner.sctx.is_none() {
return Ok(None);
}
self.blocking_wait(&mut inner);
}
}
pub fn recv_all(&self) -> Result<Option<Vec<T>>, Error<E>> {
let mut inner = self.lock_inner();
self.check_error(&mut inner)?;
while inner.q.is_empty() {
if inner.sctx.is_none() {
return Ok(None);
}
self.blocking_wait(&mut inner);
self.check_error(&mut inner)?;
}
let ret = Vec::from_iter(inner.q.drain(..));
if let Some(ref mut nrecs) = inner.recv_recs {
*nrecs = nrecs.saturating_add(ret.len());
}
self.signal_sender(&mut inner);
Ok(Some(ret))
}
pub fn recv_atmost(&self, lim: usize) -> Result<Option<Vec<T>>, Error<E>> {
assert!(lim != 0);
let mut inner = self.lock_inner();
self.check_error(&mut inner)?;
while inner.q.is_empty() {
if inner.sctx.is_none() {
return Ok(None);
}
self.blocking_wait(&mut inner);
self.check_error(&mut inner)?;
}
let n = std::cmp::min(inner.q.len(), lim);
let ret = Vec::from_iter(inner.q.drain(..n));
if let Some(ref mut nrecs) = inner.recv_recs {
*nrecs = nrecs.saturating_add(ret.len());
}
self.signal_sender(&mut inner);
Ok(Some(ret))
}
pub fn try_recv(&self) -> Result<TryRecv<T>, Error<E>> {
let mut inner = self.lock_inner();
self.check_error(&mut inner)?;
if let Some(n) = inner.q.pop_front() {
if let Some(ref mut nrecs) = inner.recv_recs {
*nrecs = nrecs.saturating_add(1);
}
self.signal_sender(&mut inner);
Ok(TryRecv::Some(n))
} else if inner.sctx.is_none() {
Ok(TryRecv::EOF)
} else {
Ok(TryRecv::None)
}
}
pub fn recv_async(&self) -> RecvFuture<T, E> {
RecvFuture(self)
}
pub fn recv_all_async(&self) -> RecvAllFuture<T, E> {
RecvAllFuture(self)
}
pub fn recv_atmost_async(&self, lim: usize) -> RecvAtMostFuture<T, E> {
assert!(lim != 0);
RecvAtMostFuture { rcv: self, lim }
}
pub fn fail(self, e: E) {
let mut inner = self.lock_inner();
if let Some(ref mut sctx) = inner.sctx {
sctx.error = Some(Error::App(e));
self.signal_sender(&mut inner);
}
}
}
/// Tear-down of the receiving end-point.
impl<T, E> Drop for Receiver<T, E> {
    /// Discard the receiver-side context while holding the lock, then signal
    /// the sending side so it can observe the change.
    fn drop(&mut self) {
        let mut guard = self.lock_inner();
        // Overwriting with `None` drops whatever context was stored.
        guard.rctx = None;
        self.signal_sender(&mut guard);
    }
}
/// Future returned by [`Receiver::recv_async`].
///
/// Resolves to `Ok(Some(node))` once a node can be dequeued, to `Ok(None)`
/// when the queue is empty and the sender-side context no longer exists, or
/// to `Err` when an error is stored in the receiver-side context.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvFuture<'a, T, E>(pub(super) &'a Receiver<T, E>);

impl<'a, T, E> Future for RecvFuture<'a, T, E> {
    type Output = Result<Option<T>, Error<E>>;

    /// Attempt to dequeue one node; if none is available and the sender-side
    /// context still exists, register the task's waker in the receiver-side
    /// context and return `Poll::Pending`.
    ///
    /// # Panics
    /// Panics if the receiver-side context is missing when the waker needs to
    /// be registered.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut inner = self.0.lock_inner();

        // A stored error takes precedence over queued nodes.
        if let Err(e) = self.0.check_error(&mut inner) {
            return Poll::Ready(Err(e));
        }

        if let Some(n) = inner.q.pop_front() {
            // Bump the optional received-nodes counter.
            if let Some(ref mut nrecs) = inner.recv_recs {
                *nrecs = nrecs.saturating_add(1);
            }
            self.0.signal_sender(&mut inner);
            Poll::Ready(Ok(Some(n)))
        } else if inner.sctx.is_none() {
            Poll::Ready(Ok(None))
        } else {
            let Some(ref mut rctx) = inner.rctx else {
                panic!("Internal error; Receiver has been dropped");
            };
            // Skip the clone when the stored waker already wakes this task;
            // repeated polls while pending are then free of refcount traffic.
            let registered =
                matches!(&rctx.waker, Some(w) if w.will_wake(ctx.waker()));
            if !registered {
                rctx.waker = Some(ctx.waker().clone());
            }
            Poll::Pending
        }
    }
}
/// Future returned by [`Receiver::recv_all_async`].
///
/// Resolves to `Ok(Some(nodes))` with every queued node once the queue is
/// non-empty, to `Ok(None)` when the queue is empty and the sender-side
/// context no longer exists, or to `Err` when an error is stored in the
/// receiver-side context.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvAllFuture<'a, T, E>(pub(super) &'a Receiver<T, E>);

impl<'a, T, E> Future for RecvAllFuture<'a, T, E> {
    type Output = Result<Option<Vec<T>>, Error<E>>;

    /// Attempt to drain the queue; if it is empty and the sender-side context
    /// still exists, register the task's waker in the receiver-side context
    /// and return `Poll::Pending`.
    ///
    /// # Panics
    /// Panics if the receiver-side context is missing when the waker needs to
    /// be registered.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut inner = self.0.lock_inner();

        // A stored error takes precedence over queued nodes.
        if let Err(e) = self.0.check_error(&mut inner) {
            return Poll::Ready(Err(e));
        }

        if inner.q.is_empty() {
            if inner.sctx.is_none() {
                Poll::Ready(Ok(None))
            } else {
                let Some(ref mut rctx) = inner.rctx else {
                    panic!("Internal error; Receiver has been dropped");
                };
                // Skip the clone when the stored waker already wakes this
                // task.
                let registered =
                    matches!(&rctx.waker, Some(w) if w.will_wake(ctx.waker()));
                if !registered {
                    rctx.waker = Some(ctx.waker().clone());
                }
                Poll::Pending
            }
        } else {
            let ret = Vec::from_iter(inner.q.drain(..));
            // Add the batch size to the optional received-nodes counter.
            if let Some(ref mut nrecs) = inner.recv_recs {
                *nrecs = nrecs.saturating_add(ret.len());
            }
            self.0.signal_sender(&mut inner);
            Poll::Ready(Ok(Some(ret)))
        }
    }
}
/// Future returned by [`Receiver::recv_atmost_async`].
///
/// Resolves to `Ok(Some(nodes))` with between one and `lim` nodes once the
/// queue is non-empty, to `Ok(None)` when the queue is empty and the
/// sender-side context no longer exists, or to `Err` when an error is stored
/// in the receiver-side context.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvAtMostFuture<'a, T, E> {
    /// Receiver whose queue is polled.
    rcv: &'a Receiver<T, E>,
    /// Upper bound on the number of nodes returned by one completion.
    lim: usize
}

impl<'a, T, E> Future for RecvAtMostFuture<'a, T, E> {
    type Output = Result<Option<Vec<T>>, Error<E>>;

    /// Attempt to take up to `lim` nodes from the front of the queue; if it
    /// is empty and the sender-side context still exists, register the task's
    /// waker in the receiver-side context and return `Poll::Pending`.
    ///
    /// # Panics
    /// Panics if the receiver-side context is missing when the waker needs to
    /// be registered.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut inner = self.rcv.lock_inner();

        // A stored error takes precedence over queued nodes.
        if let Err(e) = self.rcv.check_error(&mut inner) {
            return Poll::Ready(Err(e));
        }

        if inner.q.is_empty() {
            if inner.sctx.is_none() {
                Poll::Ready(Ok(None))
            } else {
                let Some(ref mut rctx) = inner.rctx else {
                    panic!("Internal error; Receiver has been dropped");
                };
                // Skip the clone when the stored waker already wakes this
                // task.
                let registered =
                    matches!(&rctx.waker, Some(w) if w.will_wake(ctx.waker()));
                if !registered {
                    rctx.waker = Some(ctx.waker().clone());
                }
                Poll::Pending
            }
        } else {
            // Never take more than what is queued or what the caller allows.
            let n = std::cmp::min(inner.q.len(), self.lim);
            let ret = Vec::from_iter(inner.q.drain(..n));
            // Add the batch size to the optional received-nodes counter.
            if let Some(ref mut nrecs) = inner.recv_recs {
                *nrecs = nrecs.saturating_add(ret.len());
            }
            self.rcv.signal_sender(&mut inner);
            Poll::Ready(Ok(Some(ret)))
        }
    }
}