use std::any::Any;
use std::fmt;
use async_trait::async_trait;
use tokio::sync::oneshot;
use crate::actor::DeferableReplyHandler;
use crate::scheduler::NoAdvanceTimeGuard;
use crate::{Actor, ActorContext, ActorExitStatus};
pub struct Envelope<A> {
handler_envelope: Box<dyn EnvelopeT<A>>,
_no_advance_time_guard: Option<NoAdvanceTimeGuard>,
}
impl<A: Actor> Envelope<A> {
pub fn message(&mut self) -> Box<dyn Any> {
self.handler_envelope.message()
}
pub fn message_typed<M: 'static>(&mut self) -> Option<M> {
if let Ok(boxed_msg) = self.handler_envelope.message().downcast::<M>() {
Some(*boxed_msg)
} else {
None
}
}
pub async fn handle_message(
&mut self,
actor: &mut A,
ctx: &ActorContext<A>,
) -> Result<(), ActorExitStatus> {
self.handler_envelope.handle_message(actor, ctx).await?;
Ok(())
}
}
impl<A: Actor> fmt::Debug for Envelope<A> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let msg_str = self.handler_envelope.debug_msg();
f.debug_tuple("Envelope").field(&msg_str).finish()
}
}
#[async_trait]
trait EnvelopeT<A: Actor>: Send + Sync {
fn debug_msg(&self) -> String;
fn message(&mut self) -> Box<dyn Any>;
async fn handle_message(
&mut self,
actor: &mut A,
ctx: &ActorContext<A>,
) -> Result<(), ActorExitStatus>;
}
#[async_trait]
impl<A, M> EnvelopeT<A> for Option<(oneshot::Sender<A::Reply>, M)>
where
A: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
{
fn debug_msg(&self) -> String {
#[allow(clippy::needless_option_take)]
if let Some((_response_tx, msg)) = self.as_ref().take() {
format!("{msg:?}")
} else {
"<consumed>".to_string()
}
}
fn message(&mut self) -> Box<dyn Any> {
if let Some((_, message)) = self.take() {
Box::new(message)
} else {
Box::new(())
}
}
async fn handle_message(
&mut self,
actor: &mut A,
ctx: &ActorContext<A>,
) -> Result<(), ActorExitStatus> {
let (response_tx, msg) = self
.take()
.expect("handle_message should never be called twice.");
actor
.handle_message(
msg,
|response| {
let _ = response_tx.send(response);
},
ctx,
)
.await?;
Ok(())
}
}
pub(crate) fn wrap_in_envelope<A, M>(
msg: M,
no_advance_time_guard: Option<NoAdvanceTimeGuard>,
) -> (Envelope<A>, oneshot::Receiver<A::Reply>)
where
A: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
{
let (response_tx, response_rx) = oneshot::channel();
let handler_envelope = Some((response_tx, msg));
let envelope = Envelope {
handler_envelope: Box::new(handler_envelope),
_no_advance_time_guard: no_advance_time_guard,
};
(envelope, response_rx)
}