use async_trait::async_trait;
use std::{
fmt::{Debug, Display},
time::{Duration, Instant},
};
use crate::{
error::{
actor::{ActorHandleErrorFailure, ActorInitFailure, ActorRuntimeError, ActorStopFailure},
handler::BaseHandlerError,
},
handler::ActorMessageHandlerTrait,
log,
messaging::Receiver,
};
pub enum CommandMessage {
StopActor,
ForceStopActor,
RestartActor,
}
#[doc(hidden)]
pub enum ActorMessage<M> {
CommandMessage(CommandMessage),
RegularMessage {
msg: M,
sent_at: Instant,
ttl: Option<Duration>,
},
}
#[async_trait]
pub trait ActorTrait<E>
where
E: Send + Display + Debug + 'static,
{
async fn init(&mut self) -> Result<(), ActorInitFailure> {
Ok(())
}
#[allow(unused_variables, unused_mut)]
async fn on_stop(&mut self) -> Result<(), ActorStopFailure> {
Ok(())
}
#[allow(unused_variables, unused_mut)]
async fn on_error(
&mut self,
error: BaseHandlerError<E>,
) -> Result<Option<CommandMessage>, ActorHandleErrorFailure> {
Ok(Some(CommandMessage::StopActor))
}
}
pub async fn run<A, M, E>(actor: A, rx: Receiver<M>)
where
M: Send + 'static,
A: ActorMessageHandlerTrait<M, E> + ActorTrait<E> + Send + Sync + 'static,
E: Send + Debug + Display + 'static,
{
if let Err(error) = run_actor_loop(actor, rx.rx).await {
log::error(format!("Actor runtime error: {error}"));
}
log::info("Actor task finished - channel closed".to_string());
}
async fn run_actor_loop<A, M, E>(
mut actor: A,
rx: async_channel::Receiver<ActorMessage<M>>,
) -> Result<(), ActorRuntimeError>
where
M: Send + 'static,
A: ActorMessageHandlerTrait<M, E> + ActorTrait<E> + Send + Sync + 'static,
E: Send + Debug + Display + 'static,
{
actor.init().await?;
loop {
let msg = rx.recv().await?;
let command_result = handle_message(&mut actor, msg).await?;
if let Some(command) = command_result {
match command {
CommandMessage::StopActor => {
rx.close();
}
CommandMessage::ForceStopActor => {
return Ok(());
}
CommandMessage::RestartActor => {
actor.on_stop().await?;
actor.init().await?;
}
};
}
if rx.is_closed() {
break;
}
}
actor.on_stop().await?;
Ok(())
}
async fn handle_message<A, M, E>(
actor: &mut A,
msg: ActorMessage<M>,
) -> Result<Option<CommandMessage>, ActorHandleErrorFailure>
where
M: Send + 'static,
A: Send + Sync + ActorMessageHandlerTrait<M, E> + ActorTrait<E> + 'static,
E: Send + Debug + Display + 'static,
{
let result = match msg {
ActorMessage::CommandMessage(command) => Some(command),
ActorMessage::RegularMessage { msg, sent_at, ttl } => {
if match ttl {
Some(ttl) => sent_at.elapsed() > ttl,
None => false,
} {
return Ok(None);
}
let result = actor
.__handle(msg)
.await
.inspect_err(|err| log::error(format!("{err}")));
match result {
Ok(_) => None,
Err(err) => actor.on_error(err).await?,
}
}
};
Ok(result)
}