use std::task::{Context, Poll};
use futures_util::FutureExt;
use crate::chan::{self, ActorMessage, BroadcastQueue, MessageToOne, Rx};
pub struct WaitingReceiver<A>(catty::Receiver<CtrlMsg<A>>);
pub struct Handle<A>(catty::Sender<CtrlMsg<A>>);
impl<A> Handle<A> {
pub fn notify_channel_shutdown(self) {
let _ = self.0.send(CtrlMsg::Shutdown);
}
pub fn notify_new_broadcast(self) -> Result<(), ()> {
self.0.send(CtrlMsg::NewBroadcast).map_err(|_| ())
}
pub fn notify_new_message(self, msg: MessageToOne<A>) -> Result<(), MessageToOne<A>> {
self.0
.send(CtrlMsg::NewMessage(msg))
.map_err(|reason| match reason {
CtrlMsg::NewMessage(msg) => msg,
_ => unreachable!(),
})
}
}
impl<A> WaitingReceiver<A> {
pub fn new() -> (WaitingReceiver<A>, Handle<A>) {
let (sender, receiver) = catty::oneshot();
(WaitingReceiver(receiver), Handle(sender))
}
pub fn cancel(&mut self) -> Option<MessageToOne<A>> {
match self.0.try_recv() {
Ok(Some(CtrlMsg::NewMessage(msg))) => Some(msg),
_ => None,
}
}
pub fn poll(
&mut self,
chan: &chan::Ptr<A, Rx>,
broadcast_mailbox: &BroadcastQueue<A>,
cx: &mut Context<'_>,
) -> Poll<Option<ActorMessage<A>>> {
let ctrl_msg = match futures_util::ready!(self.0.poll_unpin(cx)) {
Ok(reason) => reason,
Err(_) => return Poll::Ready(None), };
let actor_message = match ctrl_msg {
CtrlMsg::NewMessage(msg) => ActorMessage::ToOneActor(msg),
CtrlMsg::Shutdown => ActorMessage::Shutdown,
CtrlMsg::NewBroadcast => match chan.next_broadcast_message(broadcast_mailbox) {
Some(msg) => ActorMessage::ToAllActors(msg),
None => return Poll::Ready(None),
},
};
Poll::Ready(Some(actor_message))
}
}
enum CtrlMsg<A> {
NewMessage(MessageToOne<A>),
NewBroadcast,
Shutdown,
}