use crate::{
ActorPath, Error,
actor::{Actor, ActorContext, Handler, Message},
};
use async_trait::async_trait;
use tokio::sync::{mpsc, oneshot};
use tracing::error;
use std::marker::PhantomData;
/// A type-erased envelope that can be executed against an actor of type `A`.
///
/// Boxed implementors are what travels through an actor's mailbox (see
/// [`BoxedMessageHandler`]); implementors must be `Send + Sync`.
#[async_trait]
pub trait MessageHandler<A>: Send + Sync
where
    A: Actor,
{
    /// Processes the wrapped message using `actor` and its context `ctx`.
    async fn handle(&mut self, actor: &mut A, ctx: &mut ActorContext<A>);

    /// Reports whether the wrapped message is flagged as critical.
    fn is_critical(&self) -> bool;

    /// Informs a caller that may still be waiting for a reply that the actor
    /// has stopped, instead of handling the message.
    fn respond_stopped(&mut self);
}
/// Envelope bundling a message with its sender path and an optional reply
/// channel, ready to be queued in the mailbox of an actor of type `A`.
struct ActorMessage<A: Actor + Handler<A>> {
    /// Payload that is passed to the actor's message handler.
    message: A::Message,
    /// Path identifying who sent the message.
    sender: ActorPath,
    /// One-shot channel used to return the handler's result; `None` when the
    /// caller does not wait for a response.
    rsvp: Option<oneshot::Sender<Result<A::Response, Error>>>,
    /// Binds the envelope to the actor type `A` without storing an `A`.
    _phantom_actor: PhantomData<A>,
}
impl<A> ActorMessage<A>
where
A: Actor + Handler<A>,
{
pub const fn new(
message: A::Message,
sender: ActorPath,
rsvp: Option<oneshot::Sender<Result<A::Response, Error>>>,
) -> Self {
Self {
message,
sender,
rsvp,
_phantom_actor: PhantomData,
}
}
}
#[async_trait]
impl<A> MessageHandler<A> for ActorMessage<A>
where
A: Actor + Handler<A>,
{
async fn handle(&mut self, actor: &mut A, ctx: &mut ActorContext<A>) {
let result = actor
.handle_message(self.sender.clone(), self.message.clone(), ctx)
.await;
if let Some(rsvp) = self.rsvp.take()
&& rsvp.send(result).is_err()
{
error!("Failed to send response back to caller");
}
}
fn is_critical(&self) -> bool {
self.message.is_critical()
}
fn respond_stopped(&mut self) {
if let Some(rsvp) = self.rsvp.take()
&& rsvp.send(Err(crate::Error::ActorStopped)).is_err()
{
error!("Failed to send ActorStopped response to caller");
}
}
}
/// Heap-allocated, type-erased message envelope for an actor of type `A`.
pub type BoxedMessageHandler<A> = Box<dyn MessageHandler<A>>;
/// Receiving half of an actor mailbox.
pub type MailboxReceiver<A> = mpsc::Receiver<BoxedMessageHandler<A>>;
/// Sending half of an actor mailbox.
pub type MailboxSender<A> = mpsc::Sender<BoxedMessageHandler<A>>;
/// Sender/receiver pair making up a mailbox, as returned by [`mailbox`].
pub type Mailbox<A> = (MailboxSender<A>, MailboxReceiver<A>);
/// Creates a new bounded mailbox and returns its sender and receiver halves.
pub fn mailbox<A>() -> Mailbox<A> {
    // Capacity passed to the bounded channel backing the mailbox.
    const MAILBOX_CAPACITY: usize = 1024;

    let (tx, rx) = mpsc::channel(MAILBOX_CAPACITY);
    (tx, rx)
}
/// Cloneable wrapper around a mailbox sender, used to deliver messages to an
/// actor of type `A`.
pub struct HandleHelper<A> {
// Sending half of the target actor's mailbox.
sender: MailboxSender<A>,
}
impl<A> HandleHelper<A>
where
A: Actor + Handler<A>,
{
pub(crate) fn new(sender: MailboxSender<A>) -> Self {
Self { sender }
}
pub(crate) async fn tell(
&self,
sender: ActorPath,
message: A::Message,
) -> Result<(), Error> {
let msg = ActorMessage::new(message, sender, None);
self.sender
.send(Box::new(msg))
.await
.map_err(|_| Error::ActorStopped)
}
pub(crate) async fn ask(
&self,
sender: ActorPath,
message: A::Message,
) -> Result<A::Response, Error> {
let (response_sender, response_receiver) = oneshot::channel();
let msg = ActorMessage::new(message, sender, Some(response_sender));
self.sender
.send(Box::new(msg))
.await
.map_err(|_| Error::ActorStopped)?;
response_receiver.await.map_err(|_| Error::ActorStopped)?
}
pub async fn close(&self) {
self.sender.closed().await;
}
pub fn is_closed(&self) -> bool {
self.sender.is_closed()
}
}
// Implemented by hand rather than derived: `#[derive(Clone)]` would add an
// `A: Clone` bound, while only the inner sender needs to be cloned.
impl<A> Clone for HandleHelper<A> {
    /// Returns a helper that feeds the same mailbox.
    fn clone(&self) -> Self {
        let Self { sender } = self;
        Self {
            sender: sender.clone(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created mailbox must have both halves open.
    #[test]
    fn test_mailbox() {
        let (sender, receiver) = mailbox::<()>();
        // `assert!(!..)` instead of comparing a boolean against `false`.
        assert!(!sender.is_closed());
        assert!(!receiver.is_closed());
    }
}