use std::marker::PhantomData;
use std::ops::ControlFlow;
use std::sync::Arc;
use catty::{Receiver, Sender};
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use crate::chan::{HasPriority, MessageToAll, MessageToOne, Priority};
use crate::context::Context;
use crate::instrumentation::{Instrumentation, Span};
use crate::{Actor, Handler, Mailbox};
pub trait MessageEnvelope: HasPriority + Send {
type Actor;
fn set_priority(&mut self, new_priority: u32);
fn start_span(&mut self);
fn handle(
self: Box<Self>,
act: &mut Self::Actor,
mailbox: Mailbox<Self::Actor>,
) -> (BoxFuture<ControlFlow<(), ()>>, Span);
}
pub struct ReturningEnvelope<A, M, R> {
message: M,
result_sender: Sender<R>,
priority: u32,
phantom: PhantomData<for<'a> fn(&'a A)>,
instrumentation: Instrumentation,
}
impl<A, M, R: Send + 'static> ReturningEnvelope<A, M, R> {
pub fn new(message: M, priority: u32) -> (Self, Receiver<R>) {
let (tx, rx) = catty::oneshot();
let envelope = ReturningEnvelope {
message,
result_sender: tx,
priority,
phantom: PhantomData,
instrumentation: Instrumentation::empty(),
};
(envelope, rx)
}
}
impl<A, M, R> HasPriority for ReturningEnvelope<A, M, R> {
fn priority(&self) -> Priority {
Priority::Valued(self.priority)
}
}
impl<A> HasPriority for MessageToOne<A> {
fn priority(&self) -> Priority {
self.as_ref().priority()
}
}
impl<A, M, R> MessageEnvelope for ReturningEnvelope<A, M, R>
where
A: Handler<M, Return = R>,
M: Send + 'static,
R: Send + 'static,
{
type Actor = A;
fn set_priority(&mut self, new_priority: u32) {
self.priority = new_priority;
}
fn start_span(&mut self) {
assert!(self.instrumentation.is_parent_none());
self.instrumentation = Instrumentation::started::<A, M>();
}
fn handle(
self: Box<Self>,
act: &mut Self::Actor,
mailbox: Mailbox<Self::Actor>,
) -> (BoxFuture<ControlFlow<(), ()>>, Span) {
let Self {
message,
result_sender,
instrumentation,
..
} = *self;
let fut = async move {
let mut ctx = Context {
running: true,
mailbox,
};
let r = act.handle(message, &mut ctx).await;
if ctx.running {
(r, ControlFlow::Continue(()))
} else {
(r, ControlFlow::Break(()))
}
};
let (fut, span) = instrumentation.apply::<_>(fut);
let fut = Box::pin(fut.map(move |(r, flow)| {
let _ = result_sender.send(r);
flow
}));
(fut, span)
}
}
pub trait BroadcastEnvelope: HasPriority + Send + Sync {
type Actor;
fn set_priority(&mut self, new_priority: u32);
fn start_span(&mut self);
fn handle(
self: Arc<Self>,
act: &mut Self::Actor,
mailbox: Mailbox<Self::Actor>,
) -> (BoxFuture<ControlFlow<()>>, Span);
}
impl<A> HasPriority for MessageToAll<A> {
fn priority(&self) -> Priority {
self.as_ref().priority()
}
}
pub struct BroadcastEnvelopeConcrete<A, M> {
message: M,
priority: u32,
phantom: PhantomData<for<'a> fn(&'a A)>,
instrumentation: Instrumentation,
}
impl<A: Actor, M> BroadcastEnvelopeConcrete<A, M> {
pub fn new(message: M, priority: u32) -> Self {
BroadcastEnvelopeConcrete {
message,
priority,
phantom: PhantomData,
instrumentation: Instrumentation::empty(),
}
}
}
impl<A, M> BroadcastEnvelope for BroadcastEnvelopeConcrete<A, M>
where
A: Handler<M, Return = ()>,
M: Clone + Send + Sync + 'static,
{
type Actor = A;
fn set_priority(&mut self, new_priority: u32) {
self.priority = new_priority;
}
fn start_span(&mut self) {
assert!(self.instrumentation.is_parent_none());
self.instrumentation = Instrumentation::started::<A, M>();
}
fn handle(
self: Arc<Self>,
act: &mut Self::Actor,
mailbox: Mailbox<Self::Actor>,
) -> (BoxFuture<ControlFlow<(), ()>>, Span) {
let (msg, instrumentation) = (self.message.clone(), self.instrumentation.clone());
drop(self); let fut = async move {
let mut ctx = Context {
running: true,
mailbox,
};
act.handle(msg, &mut ctx).await;
if ctx.running {
ControlFlow::Continue(())
} else {
ControlFlow::Break(())
}
};
let (fut, span) = instrumentation.apply::<_>(fut);
(Box::pin(fut), span)
}
}
impl<A, M> HasPriority for BroadcastEnvelopeConcrete<A, M> {
fn priority(&self) -> Priority {
Priority::Valued(self.priority)
}
}
#[derive(Copy, Clone, Default)]
pub struct Shutdown<A>(PhantomData<for<'a> fn(&'a A)>);
impl<A> Shutdown<A> {
pub fn new() -> Self {
Shutdown(PhantomData)
}
pub fn handle() -> (BoxFuture<'static, ControlFlow<()>>, Span) {
let fut = Box::pin(async { ControlFlow::Break(()) });
(fut, Span::none())
}
}
impl<A> HasPriority for Shutdown<A> {
fn priority(&self) -> Priority {
Priority::Shutdown
}
}
impl<A> BroadcastEnvelope for Shutdown<A>
where
A: Actor,
{
type Actor = A;
fn set_priority(&mut self, _: u32) {}
fn start_span(&mut self) {}
fn handle(
self: Arc<Self>,
_act: &mut Self::Actor,
_mailbox: Mailbox<Self::Actor>,
) -> (BoxFuture<ControlFlow<()>>, Span) {
Self::handle()
}
}