use crate::*;
use event_listener as el;
use futures::{Future, FutureExt, Stream};
use std::{
fmt::Debug,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
pub struct Inbox<T> {
channel: Arc<Channel<T>>,
listener: Option<el::EventListener>,
signaled_halt: bool,
}
impl<T> Inbox<T> {
pub(crate) fn from_channel(channel: Arc<Channel<T>>) -> Self {
Inbox {
channel,
listener: None,
signaled_halt: false,
}
}
pub(crate) fn try_from_channel(channel: Arc<Channel<T>>) -> Option<Self> {
match channel.try_add_inbox() {
Ok(()) => Some(Self {
channel,
listener: None,
signaled_halt: false,
}),
Err(()) => None,
}
}
pub fn try_recv(&mut self) -> Result<Option<T>, RecvError> {
if !self.signaled_halt && self.channel.inbox_should_halt() {
self.signaled_halt = true;
Err(RecvError::Halted)
} else {
self.channel
.take_next_msg()
.map_err(|()| RecvError::ClosedAndEmpty)
}
}
pub fn recv(&mut self) -> Rcv<'_, T> {
Rcv { inbox: self }
}
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
try_send(&self.channel, msg)
}
pub fn send_now(&self, msg: T) -> Result<(), TrySendError<T>> {
send_now(&self.channel, msg)
}
pub fn send(&self, msg: T) -> Snd<'_, T> {
send(&self.channel, msg)
}
pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
send_blocking(&self.channel, msg)
}
pub fn recv_blocking(&mut self) -> Result<T, RecvError> {
loop {
match self.try_recv() {
Ok(None) => (),
Ok(Some(msg)) => {
self.listener = None;
return Ok(msg);
}
Err(signal) => {
self.listener = None;
match signal {
RecvError::Halted => return Err(RecvError::Halted),
RecvError::ClosedAndEmpty => return Err(RecvError::ClosedAndEmpty),
}
}
}
self.channel.recv_listener().wait();
}
}
pub fn close(&self) -> bool {
self.channel.close()
}
pub fn halt(&self) {
self.channel.halt_n(u32::MAX)
}
pub fn halt_some(&self, n: u32) {
self.channel.halt_n(n)
}
pub fn inbox_count(&self) -> usize {
self.channel.inbox_count()
}
pub fn message_count(&self) -> usize {
self.channel.msg_count()
}
pub fn address_count(&self) -> usize {
self.channel.address_count()
}
pub fn is_closed(&self) -> bool {
self.channel.is_closed()
}
pub fn capacity(&self) -> &Capacity {
self.channel.capacity()
}
pub(crate) fn clone_inbox(&self) -> Self {
self.channel.add_inbox();
Self {
channel: self.channel.clone(),
listener: None,
signaled_halt: self.signaled_halt.clone(),
}
}
}
impl<T> Stream for Inbox<T> {
type Item = Result<T, Halted>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
loop {
match self.try_recv() {
Ok(None) => (),
Ok(Some(msg)) => {
self.listener = None;
return Poll::Ready(Some(Ok(msg)));
}
Err(signal) => {
self.listener = None;
match signal {
RecvError::Halted => return Poll::Ready(Some(Err(Halted))),
RecvError::ClosedAndEmpty => return Poll::Ready(None),
}
}
}
if self.listener.is_none() {
self.listener = Some(self.channel.recv_listener())
}
match self.listener.as_mut().unwrap().poll_unpin(cx) {
Poll::Ready(()) => {}
Poll::Pending => return Poll::Pending,
}
}
}
}
impl<T> Drop for Inbox<T> {
fn drop(&mut self) {
self.channel.remove_inbox()
}
}
impl<T> Debug for Inbox<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Inbox")
.field("listener", &self.listener)
.field("signaled_halt", &self.signaled_halt)
.finish()
}
}
pub struct Rcv<'a, T> {
inbox: &'a mut Inbox<T>,
}
impl<'a, T> Unpin for Rcv<'a, T> {}
impl<'a, T> Future for Rcv<'a, T> {
type Output = Result<T, RecvError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.inbox.try_recv() {
Ok(None) => (),
Ok(Some(msg)) => {
return Poll::Ready(Ok(msg));
}
Err(signal) => match signal {
RecvError::Halted => return Poll::Ready(Err(RecvError::Halted)),
RecvError::ClosedAndEmpty => {
return Poll::Ready(Err(RecvError::ClosedAndEmpty))
}
},
}
if self.inbox.listener.is_none() {
self.inbox.listener = Some(self.inbox.channel.recv_listener())
}
match self.inbox.listener.as_mut().unwrap().poll_unpin(cx) {
Poll::Ready(()) => self.inbox.listener = None,
Poll::Pending => return Poll::Pending,
}
}
}
}
impl<'a, T> Debug for Rcv<'a, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Rcv").field("inbox", &self.inbox).finish()
}
}
#[derive(Debug, thiserror::Error)]
#[error("This inbox has been halted")]
pub struct Halted;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RecvError {
Halted,
ClosedAndEmpty,
}