use std::any::Any;
use std::fmt;
use async_trait::async_trait;
use tokio::sync::oneshot;
use tracing::Instrument;
use crate::{Actor, ActorContext, ActorExitStatus, Handler};
/// A message addressed to an actor of type `A`, with the concrete message
/// type erased so it can be used as a `dyn Envelope<A>` trait object.
#[async_trait]
pub(crate) trait Envelope<A: Actor>: Send + Sync {
/// Returns a textual rendering of the wrapped message for debugging.
///
/// The implementation in this file returns the message's `Debug` output,
/// or `"<consumed>"` once the message has been taken out of the envelope.
fn debug_msg(&self) -> String;
/// Hands the wrapped message over as a `Box<dyn Any>`.
///
/// The implementation in this file moves the message out of the envelope
/// (leaving it consumed) and returns a boxed `()` if nothing is left.
fn message(&mut self) -> Box<dyn Any>;
/// Lets `actor` process the wrapped message.
///
/// `msg_id` identifies the message; the implementation in this file passes
/// it to `Actor::message_span`. An `Err(ActorExitStatus)` is returned when
/// handling the message fails.
async fn handle_message(
&mut self,
msg_id: u64,
actor: &mut A,
ctx: &ActorContext<A>,
) -> Result<(), ActorExitStatus>;
}
/// Envelope implementation backed by an `Option` holding the reply sender and
/// the message.
///
/// `Some((reply_sender, message))` is a pending envelope; `None` means the
/// message was already taken, either by `message` or by `handle_message`.
#[async_trait]
impl<A, M> Envelope<A> for Option<(oneshot::Sender<A::Reply>, M)>
where
    A: Handler<M>,
    M: 'static + Send + Sync + fmt::Debug,
{
    /// Returns the `Debug` rendering of the pending message, or
    /// `"<consumed>"` if the message was already taken.
    ///
    /// This only borrows the envelope: the message stays in place.
    fn debug_msg(&self) -> String {
        match self {
            Some((_response_tx, msg)) => format!("{msg:?}"),
            None => "<consumed>".to_string(),
        }
    }

    /// Moves the message out of the envelope and returns it type-erased.
    ///
    /// The reply sender is taken out together with the message and dropped,
    /// so no reply is ever sent for this envelope. If the envelope was already
    /// consumed, a boxed `()` is returned instead.
    fn message(&mut self) -> Box<dyn Any> {
        if let Some((_, message)) = self.take() {
            Box::new(message)
        } else {
            Box::new(())
        }
    }

    /// Runs the actor's handler on the message and forwards the reply to the
    /// paired receiver.
    ///
    /// The handler future is instrumented with the span returned by
    /// `actor.message_span(msg_id, &msg)`. If the handler fails, the error is
    /// propagated and no reply is sent.
    ///
    /// # Panics
    ///
    /// Panics if the envelope was already consumed, i.e. `handle_message` or
    /// `message` was called before.
    async fn handle_message(
        &mut self,
        msg_id: u64,
        actor: &mut A,
        ctx: &ActorContext<A>,
    ) -> Result<(), ActorExitStatus> {
        let (response_tx, msg) = self
            .take()
            .expect("handle_message should never be called twice.");
        let message_span = actor.message_span(msg_id, &msg);
        let response = actor.handle(msg, ctx).instrument(message_span).await?;
        // `send` only fails when the receiving half is gone; the result is
        // deliberately discarded so a missing receiver is not an error here.
        let _ = response_tx.send(response);
        Ok(())
    }
}
/// Packs `msg` into a boxed, type-erased [`Envelope`] for actor `A`.
///
/// Returns the envelope together with the receiving half of a fresh oneshot
/// channel; the sending half is stored inside the envelope next to the
/// message.
pub(crate) fn wrap_in_envelope<A, M>(msg: M) -> (Box<dyn Envelope<A>>, oneshot::Receiver<A::Reply>)
where
    A: Handler<M>,
    M: 'static + Send + Sync + fmt::Debug,
{
    let (reply_tx, reply_rx) = oneshot::channel::<A::Reply>();
    let boxed_envelope: Box<dyn Envelope<A>> = Box::new(Some((reply_tx, msg)));
    (boxed_envelope, reply_rx)
}