use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_core::FusedFuture;
use futures_util::FutureExt;
use crate::chan::{self, ActorMessage, BroadcastQueue, Rx, WaitingReceiver};
#[must_use = "Futures do nothing unless polled"]
pub struct ReceiveFuture<A>(Receiving<A>);
pub struct Message<A> {
pub(crate) inner: ActorMessage<A>,
pub(crate) channel: chan::Ptr<A, Rx>,
pub(crate) broadcast_mailbox: Arc<BroadcastQueue<A>>,
}
impl<A> ReceiveFuture<A> {
pub(crate) fn new(
channel: chan::Ptr<A, Rx>,
broadcast_mailbox: Arc<BroadcastQueue<A>>,
) -> Self {
Self(Receiving::New {
channel,
broadcast_mailbox,
})
}
}
impl<A> Future for ReceiveFuture<A> {
type Output = Message<A>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_unpin(cx)
}
}
#[must_use = "Futures do nothing unless polled"]
enum Receiving<A> {
New {
channel: chan::Ptr<A, Rx>,
broadcast_mailbox: Arc<BroadcastQueue<A>>,
},
Waiting(Waiting<A>),
Done,
}
#[must_use = "Futures do nothing unless polled"]
pub struct Waiting<A> {
channel: Option<chan::Ptr<A, Rx>>,
broadcast_mailbox: Option<Arc<BroadcastQueue<A>>>,
waiting_receiver: WaitingReceiver<A>,
}
impl<A> Future for Waiting<A> {
type Output = Result<
(ActorMessage<A>, chan::Ptr<A, Rx>, Arc<BroadcastQueue<A>>),
(chan::Ptr<A, Rx>, Arc<BroadcastQueue<A>>),
>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let maybe_message = futures_util::ready!(this.waiting_receiver.poll(
this.channel
.as_mut()
.expect("to not be polled after completion"),
this.broadcast_mailbox
.as_mut()
.expect("to not be polled after completion"),
cx
));
let channel = this
.channel
.take()
.expect("to not be polled after completion");
let mailbox = this
.broadcast_mailbox
.take()
.expect("to not be polled after completion");
let result = match maybe_message {
None => Err((channel, mailbox)),
Some(msg) => Ok((msg, channel, mailbox)),
};
Poll::Ready(result)
}
}
impl<A> Drop for Waiting<A> {
fn drop(&mut self) {
if let Some(msg) = self.waiting_receiver.cancel() {
self.channel
.as_mut()
.expect("to not have message on drop but channel is gone")
.requeue_message(msg);
}
}
}
impl<A> Future for Receiving<A> {
type Output = Message<A>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Message<A>> {
let this = self.get_mut();
loop {
match mem::replace(this, Receiving::Done) {
Receiving::New {
channel,
broadcast_mailbox,
} => match channel.try_recv(broadcast_mailbox.as_ref()) {
Ok(inner) => {
return Poll::Ready(Message {
inner,
channel,
broadcast_mailbox,
})
}
Err(waiting) => {
*this = Receiving::Waiting(Waiting {
channel: Some(channel),
broadcast_mailbox: Some(broadcast_mailbox),
waiting_receiver: waiting,
});
}
},
Receiving::Waiting(mut inner) => match inner.poll_unpin(cx) {
Poll::Ready(Ok((msg, channel, broadcast_mailbox))) => {
return Poll::Ready(Message {
inner: msg,
channel,
broadcast_mailbox,
})
}
Poll::Ready(Err((channel, broadcast_mailbox))) => {
*this = Receiving::New {
channel,
broadcast_mailbox,
};
}
Poll::Pending => {
*this = Receiving::Waiting(inner);
return Poll::Pending;
}
},
Receiving::Done => panic!("polled after completion"),
}
}
}
}
impl<A> FusedFuture for Receiving<A> {
fn is_terminated(&self) -> bool {
matches!(self, Receiving::Done)
}
}
impl<A> FusedFuture for ReceiveFuture<A> {
fn is_terminated(&self) -> bool {
self.0.is_terminated()
}
}