use std::{any, fmt};
use futures::{Future, FutureExt, future::BoxFuture};
use crate::{
Actor,
actor::ActorRef,
error::{self, PanicError, PanicReason, SendError},
reply::{BoxReplySender, DelegatedReply, ForwardedReply, Reply, ReplyError, ReplySender},
};
/// A boxed, type-erased message (`dyn DynMessage<A>`) that can be handled by actor `A`.
pub type BoxMessage<A> = Box<dyn DynMessage<A>>;
/// A boxed, type-erased `Any + Send` value.
///
/// NOTE(review): by its name this carries a reply value; it is not used in this
/// part of the file — confirm against callers.
pub type BoxReply = Box<dyn any::Any + Send>;
/// Implemented by an actor (`Self: Actor`) for every message type `T` it handles.
///
/// `T` must be `Send + 'static`.
pub trait Message<T: Send + 'static>: Actor {
/// The reply type produced by [`Message::handle`] for this message type.
type Reply: Reply;
/// Returns a name for the message; the default is the type name of `T` as
/// reported by [`std::any::type_name`].
#[inline]
fn name() -> &'static str {
any::type_name::<T>()
}
/// Handles `msg` with mutable access to the actor.
///
/// `ctx` exposes the actor's own [`ActorRef`], lets the handler request a stop,
/// and allows the reply to be sent early, delegated to a spawned task, or
/// forwarded to another actor instead of being returned directly.
///
/// The returned future must be `Send` and resolves to [`Self::Reply`](Message::Reply).
fn handle(
&mut self,
msg: T,
ctx: &mut Context<Self, Self::Reply>,
) -> impl Future<Output = Self::Reply> + Send;
}
/// A message that wraps one of three payloads: `Next`, `Started` or `Finished`.
///
/// NOTE(review): the variant names suggest stream lifecycle events delivered to
/// an actor; the code that produces these values is not in this part of the
/// file — confirm against the stream attachment code.
#[derive(Clone, Debug)]
pub enum StreamMessage<T, S, F> {
/// Carries a value of type `T` (presumably the next item of the stream).
Next(T),
/// Carries a value of type `S` (presumably emitted when the stream starts).
Started(S),
/// Carries a value of type `F` (presumably emitted when the stream finishes).
Finished(F),
}
/// Per-message context handed to [`Message::handle`].
///
/// `A` is the actor handling the message and `R` is the handler's reply type.
pub struct Context<A, R>
where
A: Actor,
R: Reply + ?Sized,
{
/// Reference to the actor that is handling the message.
actor_ref: ActorRef<A>,
/// Sender for the reply value, if one was supplied. It is taken (left as
/// `None`) when the reply is sent early, delegated, or forwarded.
reply: Option<ReplySender<R::Value>>,
/// Stop flag; set to `true` by [`Context::stop`].
stop: bool,
}
impl<A, R> Context<A, R>
where
A: Actor,
R: Reply + ?Sized,
{
pub(crate) fn new(
actor_ref: ActorRef<A>,
reply: Option<ReplySender<R::Value>>,
stop: bool,
) -> Self {
Context {
actor_ref,
reply,
stop,
}
}
pub fn actor_ref(&self) -> &ActorRef<A> {
&self.actor_ref
}
pub fn stop(&mut self) {
self.stop = true;
}
#[must_use = "the reply must be sent to the ReplySender"]
pub fn reply_sender(&mut self) -> (DelegatedReply<R::Value>, Option<ReplySender<R::Value>>) {
(DelegatedReply::new(), self.reply.take())
}
pub fn reply(&mut self, reply: R::Value) -> DelegatedReply<R::Value> {
if let Some(reply_sender) = self.reply.take() {
reply_sender.send(reply);
}
DelegatedReply::new()
}
pub fn spawn<F>(&mut self, future: F) -> DelegatedReply<R::Value>
where
F: Future<Output = R::Value> + Send + 'static,
{
let (delegated_reply, reply_sender) = self.reply_sender();
tokio::spawn(async move {
let reply = future.await;
match reply_sender {
Some(tx) => {
tx.send(reply);
}
None => {
if let Some(err) = reply.into_any_err() {
error::invoke_actor_error_hook(&PanicError::new(
err,
PanicReason::OnMessage,
));
}
}
}
});
delegated_reply
}
pub async fn forward<B, M>(
&mut self,
actor_ref: &ActorRef<B>,
message: M,
) -> ForwardedReply<M, <B as Message<M>>::Reply>
where
B: Message<M>,
M: Send + 'static,
{
match self.reply.take() {
Some(tx) => {
let res = actor_ref
.ask(message)
.forward(tx.cast())
.await
.map_err(|err| {
err.map_msg(|(msg, tx)| {
self.reply = Some(tx.cast());
msg
})
});
ForwardedReply::new(res)
}
None => {
let res = actor_ref
.tell(message)
.send()
.await
.map_err(SendError::reset_err_infallible);
ForwardedReply::new(res)
}
}
}
pub fn try_forward<B, M>(
&mut self,
actor_ref: &ActorRef<B>,
message: M,
) -> ForwardedReply<M, <B as Message<M>>::Reply>
where
B: Message<M>,
M: Send + 'static,
{
match self.reply.take() {
Some(tx) => {
let res = actor_ref
.ask(message)
.try_forward(tx.cast())
.map_err(|err| {
err.map_msg(|(msg, tx)| {
self.reply = Some(tx.cast());
msg
})
});
ForwardedReply::new(res)
}
None => {
let res = actor_ref
.tell(message)
.try_send()
.map_err(SendError::reset_err_infallible);
ForwardedReply::new(res)
}
}
}
pub fn blocking_forward<B, M>(
&mut self,
actor_ref: &ActorRef<B>,
message: M,
) -> ForwardedReply<M, <B as Message<M>>::Reply>
where
B: Message<M>,
M: Send + 'static,
{
match self.reply.take() {
Some(tx) => {
let res = actor_ref
.ask(message)
.blocking_forward(tx.cast())
.map_err(|err| {
err.map_msg(|(msg, tx)| {
self.reply = Some(tx.cast());
msg
})
});
ForwardedReply::new(res)
}
None => {
let res = actor_ref
.tell(message)
.blocking_send()
.map_err(SendError::reset_err_infallible);
ForwardedReply::new(res)
}
}
}
}
impl<A, R> fmt::Debug for Context<A, R>
where
    A: Actor,
    R: Reply + ?Sized,
{
    /// Formats the context as a `Context { .. }` struct listing the actor
    /// reference, the (possibly already taken) reply sender and the stop flag.
    ///
    /// Implemented by hand so that no `Debug` bound is placed on `A` or `R`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Context")
            .field("actor_ref", &self.actor_ref)
            .field("reply", &self.reply)
            // Include the stop flag so a pending stop request shows up in diagnostics.
            .field("stop", &self.stop)
            .finish()
    }
}
/// Object-safe, type-erased view of a message that actor `A` can handle.
///
/// A blanket implementation covers every `T: Send + 'static` for which
/// `A: Message<T>`.
pub trait DynMessage<A>
where
Self: Send,
A: Actor,
{
/// Consumes the boxed message and handles it against `state`.
///
/// * `actor_ref` - reference to the actor, made available to the handler.
/// * `tx` - optional boxed reply sender for the handler's reply.
/// * `stop` - stop flag; read as the initial value and written back after
///   the handler has run (see the blanket implementation).
///
/// The returned future resolves to `Err` with a boxed [`ReplyError`] when, in
/// the blanket implementation, no reply sender remains and the handler's
/// reply converts into an error; otherwise it resolves to `Ok(())`.
fn handle_dyn<'a>(
self: Box<Self>,
state: &'a mut A,
actor_ref: ActorRef<A>,
tx: Option<BoxReplySender>,
stop: &'a mut bool,
) -> BoxFuture<'a, Result<(), Box<dyn ReplyError>>>;
/// Converts the boxed message into a `Box<dyn Any>`.
fn as_any(self: Box<Self>) -> Box<dyn any::Any>;
}
impl<A, T> DynMessage<A> for T
where
    A: Actor + Message<T>,
    T: Send + 'static,
{
    /// Unboxes the message, runs the actor's [`Message::handle`] for it and
    /// delivers the reply.
    ///
    /// The stop flag is copied into the handler context before the call and
    /// written back to `stop` afterwards. If the reply sender is still held by
    /// the context once the handler returns, the reply value is sent through
    /// it. Otherwise a reply that converts into an error is returned as `Err`.
    fn handle_dyn<'a>(
        self: Box<Self>,
        state: &'a mut A,
        actor_ref: ActorRef<A>,
        tx: Option<BoxReplySender>,
        stop: &'a mut bool,
    ) -> BoxFuture<'a, Result<(), Box<dyn ReplyError>>> {
        let fut = async move {
            let mut ctx = Context::<A, <A as Message<T>>::Reply>::new(
                actor_ref,
                tx.map(ReplySender::new),
                *stop,
            );

            let outcome = Message::handle(state, *self, &mut ctx).await;
            // Propagate a stop request made by the handler back to the caller.
            *stop = ctx.stop;

            if let Some(sender) = ctx.reply.take() {
                sender.send(outcome.into_value());
                Ok(())
            } else if let Some(err) = outcome.into_any_err() {
                // No one to reply to: surface the error to the caller instead.
                Err(err)
            } else {
                Ok(())
            }
        };
        fut.boxed()
    }

    /// Returns the boxed message as a `Box<dyn Any>`.
    fn as_any(self: Box<Self>) -> Box<dyn any::Any> {
        self
    }
}