use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_core::FusedFuture;
use futures_util::FutureExt;
use crate::chan::{MailboxFull, MessageToAll, MessageToOne, RefCounter, WaitingSender};
use crate::envelope::{BroadcastEnvelopeConcrete, ReturningEnvelope};
use crate::{chan, Error, Handler};
#[must_use = "Futures do nothing unless polled"]
pub struct SendFuture<F, S> {
sending: F,
state: S,
}
pub struct ResolveToHandlerReturn<R>(Receiver<R>);
pub struct ResolveToReceiver<R>(Option<Receiver<R>>);
pub struct Broadcast(());
impl<F, R> SendFuture<F, ResolveToHandlerReturn<R>>
where
F: Future,
{
pub fn detach(self) -> SendFuture<F, ResolveToReceiver<R>> {
SendFuture {
sending: self.sending,
state: self.state.resolve_to_receiver(),
}
}
}
impl<F, S> SendFuture<F, S>
where
F: private::SetPriority,
{
pub fn priority(mut self, new_priority: u32) -> Self {
self.sending.set_priority(new_priority);
self
}
}
#[must_use = "Futures do nothing unless polled"]
pub struct ActorNamedSending<A, Rc: RefCounter>(Sending<A, MessageToOne<A>, Rc>);
#[must_use = "Futures do nothing unless polled"]
pub struct ActorNamedBroadcasting<A, Rc: RefCounter>(Sending<A, MessageToAll<A>, Rc>);
#[must_use = "Futures do nothing unless polled"]
pub struct ActorErasedSending(Box<dyn private::ErasedSending>);
impl<A, R, Rc> SendFuture<ActorNamedSending<A, Rc>, ResolveToHandlerReturn<R>>
where
R: Send + 'static,
Rc: RefCounter,
{
pub(crate) fn sending_named<M>(message: M, sender: chan::Ptr<A, Rc>) -> Self
where
A: Handler<M, Return = R>,
M: Send + 'static,
{
let (envelope, receiver) = ReturningEnvelope::<A, M, R>::new(message, 0);
Self {
sending: ActorNamedSending(Sending::New {
msg: Box::new(envelope) as MessageToOne<A>,
sender,
}),
state: ResolveToHandlerReturn::new(receiver),
}
}
}
impl<R> SendFuture<ActorErasedSending, ResolveToHandlerReturn<R>> {
pub(crate) fn sending_erased<A, M, Rc>(message: M, sender: chan::Ptr<A, Rc>) -> Self
where
Rc: RefCounter,
A: Handler<M, Return = R>,
M: Send + 'static,
R: Send + 'static,
{
let (envelope, receiver) = ReturningEnvelope::<A, M, R>::new(message, 0);
Self {
sending: ActorErasedSending(Box::new(Sending::New {
msg: Box::new(envelope) as MessageToOne<A>,
sender,
})),
state: ResolveToHandlerReturn::new(receiver),
}
}
}
impl<A, Rc> SendFuture<ActorNamedBroadcasting<A, Rc>, Broadcast>
where
Rc: RefCounter,
{
pub(crate) fn broadcast_named<M>(msg: M, sender: chan::Ptr<A, Rc>) -> Self
where
A: Handler<M, Return = ()>,
M: Clone + Send + Sync + 'static,
{
let envelope = BroadcastEnvelopeConcrete::new(msg, 0);
Self {
sending: ActorNamedBroadcasting(Sending::New {
msg: Arc::new(envelope) as MessageToAll<A>,
sender,
}),
state: Broadcast(()),
}
}
}
#[allow(dead_code)] impl SendFuture<ActorErasedSending, Broadcast> {
pub(crate) fn broadcast_erased<A, M, Rc>(msg: M, sender: chan::Ptr<A, Rc>) -> Self
where
Rc: RefCounter,
A: Handler<M, Return = ()>,
M: Clone + Send + Sync + 'static,
{
let envelope = BroadcastEnvelopeConcrete::new(msg, 0);
Self {
sending: ActorErasedSending(Box::new(Sending::New {
msg: Arc::new(envelope) as MessageToAll<A>,
sender,
})),
state: Broadcast(()),
}
}
}
#[must_use = "Futures do nothing unless polled"]
enum Sending<A, M, Rc: RefCounter> {
New { msg: M, sender: chan::Ptr<A, Rc> },
WaitingToSend(WaitingSender<M>),
Done,
}
impl<A, Rc> Future for Sending<A, MessageToOne<A>, Rc>
where
Rc: RefCounter,
{
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match mem::replace(this, Sending::Done) {
Sending::New { msg, sender } => match sender.try_send_to_one(msg)? {
Ok(()) => return Poll::Ready(Ok(())),
Err(MailboxFull(waiting)) => {
*this = Sending::WaitingToSend(waiting);
}
},
Sending::WaitingToSend(mut waiting) => {
return match waiting.poll_unpin(cx)? {
Poll::Ready(()) => Poll::Ready(Ok(())),
Poll::Pending => {
*this = Sending::WaitingToSend(waiting);
Poll::Pending
}
};
}
Sending::Done => panic!("Polled after completion"),
}
}
}
}
impl<A, Rc> Future for Sending<A, MessageToAll<A>, Rc>
where
Rc: RefCounter,
{
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match mem::replace(this, Sending::Done) {
Sending::New { msg, sender } => match sender.try_send_to_all(msg)? {
Ok(()) => return Poll::Ready(Ok(())),
Err(MailboxFull(waiting)) => {
*this = Sending::WaitingToSend(waiting);
}
},
Sending::WaitingToSend(mut waiting) => {
return match waiting.poll_unpin(cx)? {
Poll::Ready(()) => Poll::Ready(Ok(())),
Poll::Pending => {
*this = Sending::WaitingToSend(waiting);
Poll::Pending
}
};
}
Sending::Done => panic!("Polled after completion"),
}
}
}
}
impl<A, M, Rc> FusedFuture for Sending<A, M, Rc>
where
Self: Future,
Rc: RefCounter,
{
fn is_terminated(&self) -> bool {
matches!(self, Sending::Done)
}
}
impl<A, Rc: RefCounter> Future for ActorNamedSending<A, Rc> {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut().0.poll_unpin(cx)
}
}
impl<A, Rc> FusedFuture for ActorNamedSending<A, Rc>
where
Self: Future,
Rc: RefCounter,
{
fn is_terminated(&self) -> bool {
self.0.is_terminated()
}
}
impl<A, Rc: RefCounter> Future for ActorNamedBroadcasting<A, Rc> {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut().0.poll_unpin(cx)
}
}
impl<A, Rc> FusedFuture for ActorNamedBroadcasting<A, Rc>
where
Self: Future,
Rc: RefCounter,
{
fn is_terminated(&self) -> bool {
self.0.is_terminated()
}
}
impl Future for ActorErasedSending {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut().0.poll_unpin(cx)
}
}
impl FusedFuture for ActorErasedSending {
fn is_terminated(&self) -> bool {
self.0.is_terminated()
}
}
impl<R, F> Future for SendFuture<F, ResolveToReceiver<R>>
where
F: Future<Output = Result<(), Error>> + FusedFuture + Unpin,
{
type Output = Result<Receiver<R>, Error>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
if !this.sending.is_terminated() {
futures_util::ready!(this.sending.poll_unpin(ctx))?;
}
let receiver = this.state.0.take().expect("polled after completion");
Poll::Ready(Ok(receiver))
}
}
impl<R, F> Future for SendFuture<F, ResolveToHandlerReturn<R>>
where
F: Future<Output = Result<(), Error>> + FusedFuture + Unpin,
{
type Output = Result<R, Error>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
if !this.sending.is_terminated() {
futures_util::ready!(this.sending.poll_unpin(ctx))?;
}
this.state.0.poll_unpin(ctx)
}
}
impl<F> Future for SendFuture<F, Broadcast>
where
F: Future<Output = Result<(), Error>> + Unpin,
{
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
self.get_mut().sending.poll_unpin(ctx)
}
}
#[must_use = "Futures do nothing unless polled"]
pub struct Receiver<R>(catty::Receiver<R>);
impl<R> Future for Receiver<R> {
type Output = Result<R, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut()
.0
.poll_unpin(cx)
.map_err(|_| Error::Interrupted)
}
}
impl<R> ResolveToHandlerReturn<R> {
fn new(receiver: catty::Receiver<R>) -> Self {
Self(Receiver(receiver))
}
fn resolve_to_receiver(self) -> ResolveToReceiver<R> {
ResolveToReceiver(Some(self.0))
}
}
mod private {
use super::*;
pub trait SetPriority {
fn set_priority(&mut self, priority: u32);
}
impl<A, Rc> SetPriority for Sending<A, MessageToOne<A>, Rc>
where
Rc: RefCounter,
{
fn set_priority(&mut self, new_priority: u32) {
match self {
Sending::New { msg, .. } => msg.set_priority(new_priority),
_ => panic!("Cannot set priority after first poll"),
}
}
}
impl<A, Rc> SetPriority for Sending<A, MessageToAll<A>, Rc>
where
Rc: RefCounter,
{
fn set_priority(&mut self, new_priority: u32) {
match self {
Sending::New { msg, .. } => Arc::get_mut(msg)
.expect("envelope is not cloned until here")
.set_priority(new_priority),
_ => panic!("Cannot set priority after first poll"),
}
}
}
impl<A, Rc> SetPriority for ActorNamedSending<A, Rc>
where
Rc: RefCounter,
{
fn set_priority(&mut self, priority: u32) {
self.0.set_priority(priority)
}
}
impl<A, Rc> SetPriority for ActorNamedBroadcasting<A, Rc>
where
Rc: RefCounter,
{
fn set_priority(&mut self, priority: u32) {
self.0.set_priority(priority)
}
}
impl SetPriority for ActorErasedSending {
fn set_priority(&mut self, priority: u32) {
self.0.set_priority(priority)
}
}
pub trait ErasedSending:
Future<Output = Result<(), Error>> + FusedFuture + SetPriority + Send + 'static + Unpin
{
}
impl<F> ErasedSending for F where
F: Future<Output = Result<(), Error>> + FusedFuture + SetPriority + Send + 'static + Unpin
{
}
}