use std::any::type_name;
use std::fmt;
use std::sync::Arc;
use async_trait::async_trait;
use thiserror::Error;
use tracing::error;
use crate::{ActorContext, QueueCapacity, SendError};
/// Reason an actor stopped running.
///
/// `Display` (and `std::error::Error`) come from the `thiserror` derive; each
/// variant's message is the string given in its `#[error(...)]` attribute.
#[derive(Clone, Debug, Error)]
pub enum ActorExitStatus {
/// Successful exit. This is the only variant for which `is_success()`
/// returns `true`.
#[error("Success")]
Success,
/// The actor quit.
/// NOTE(review): presumably a deliberate stop, as opposed to `Killed` —
/// confirm against the code that produces this variant.
#[error("Quit")]
Quit,
/// A downstream actor exited. This is the status a `SendError` converts
/// into.
#[error("Downstream actor exited.")]
DownstreamClosed,
/// The actor was killed.
/// NOTE(review): the kill mechanism is not visible in this file.
#[error("Killed")]
Killed,
/// The actor stopped because of an error. The cause is shared through an
/// `Arc`, which keeps this enum `Clone`; the message prints the cause with
/// its `Debug` formatting (`{0:?}`).
#[error("Failure(cause={0:?})")]
Failure(Arc<anyhow::Error>),
/// The actor panicked.
/// NOTE(review): presumably set by the code running the actor after it
/// detects a panic — confirm.
#[error("Panicked")]
Panicked,
}
impl From<anyhow::Error> for ActorExitStatus {
fn from(err: anyhow::Error) -> Self {
ActorExitStatus::Failure(Arc::new(err))
}
}
impl ActorExitStatus {
pub fn is_success(&self) -> bool {
matches!(self, ActorExitStatus::Success)
}
}
impl From<SendError> for ActorExitStatus {
fn from(_: SendError) -> Self {
ActorExitStatus::DownstreamClosed
}
}
/// Behaviour common to every actor.
///
/// Only [`Actor::observable_state`] has to be implemented; every other item
/// comes with a default that implementors may override.
#[async_trait]
pub trait Actor: Send + Sync + Sized + 'static {
    /// Type returned by [`Actor::observable_state`]. It must be cloneable,
    /// serializable with `serde` and printable with `Debug`.
    type ObservableState: Send + Sync + Clone + serde::Serialize + fmt::Debug;

    /// Name of the actor.
    ///
    /// Defaults to the type name of the implementor, as reported by
    /// `std::any::type_name`.
    fn name(&self) -> String {
        String::from(type_name::<Self>())
    }

    /// Handle of the tokio runtime associated with this actor.
    ///
    /// Defaults to `tokio::runtime::Handle::current()`, i.e. the runtime of
    /// the calling context.
    fn runtime_handle(&self) -> tokio::runtime::Handle {
        tokio::runtime::Handle::current()
    }

    /// Whether to yield after each message. Defaults to `true`.
    ///
    /// NOTE(review): presumably read by the message loop to decide whether to
    /// yield to the runtime between two messages — the consumer is not
    /// visible in this file.
    fn yield_after_each_message(&self) -> bool {
        true
    }

    /// Capacity of the actor's message queue. Defaults to
    /// `QueueCapacity::Unbounded`.
    fn queue_capacity(&self) -> QueueCapacity {
        QueueCapacity::Unbounded
    }

    /// Returns a value describing the current state of the actor.
    fn observable_state(&self) -> Self::ObservableState;

    /// Initialization hook. The default does nothing and returns `Ok(())`.
    ///
    /// NOTE(review): presumably called once before any message is handled —
    /// confirm against the caller.
    async fn initialize(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
        Ok(())
    }

    /// Hook for drained messages. The default does nothing and returns
    /// `Ok(())`.
    ///
    /// NOTE(review): presumably called once the pending messages have all been
    /// processed — confirm against the caller.
    async fn on_drained_messages(
        &mut self,
        _ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        Ok(())
    }

    /// Finalization hook, given the status the actor exits with. The default
    /// does nothing and returns `Ok(())`.
    ///
    /// NOTE(review): how an `Err` returned from here is treated is decided by
    /// the caller, which is not visible in this file.
    async fn finalize(
        &mut self,
        _exit_status: &ActorExitStatus,
        _ctx: &ActorContext<Self>,
    ) -> anyhow::Result<()> {
        Ok(())
    }
}
/// Message handler that delivers its reply through a callback instead of
/// through its return value.
///
/// Every type implementing `Handler<M>` gets this trait through the blanket
/// implementation in this file.
/// NOTE(review): the name suggests implementors may invoke the callback later
/// (e.g. after `handle_message` has returned) — confirm with a direct
/// implementor.
#[async_trait::async_trait]
pub trait DeferableReplyHandler<M>: Actor {
/// Type of the value passed to the `reply` callback.
type Reply: Send + 'static;
/// Processes `message`.
///
/// * `message` - the message to process.
/// * `reply` - callback taking the reply value; being `FnOnce`, it can be
///   called at most once.
/// * `ctx` - the context of this actor.
///
/// Returning an `Err` reports an `ActorExitStatus` to the caller.
async fn handle_message(
&mut self,
message: M,
reply: impl FnOnce(Self::Reply) + Send + Sync + 'static,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus>
where
M: Send + Sync + 'static;
}
/// Message handler whose reply is the value returned by `handle`.
///
/// An actor implements `Handler<M>` once per message type `M` it accepts.
/// Implementing it also provides `DeferableReplyHandler<M>` through the
/// blanket implementation in this file.
#[async_trait::async_trait]
pub trait Handler<M>: Actor {
/// Type of the value produced for a message of type `M`.
type Reply: Send + 'static;
/// Processes `message` and returns the reply.
///
/// * `message` - the message to process.
/// * `ctx` - the context of this actor.
///
/// Returning an `Err` reports an `ActorExitStatus` to the caller instead of
/// a reply.
async fn handle(
&mut self,
message: M,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus>;
}
/// Blanket implementation: every `Handler<M>` is also a
/// `DeferableReplyHandler<M>` whose reply is delivered immediately, as soon as
/// `Handler::handle` has produced it.
#[async_trait::async_trait]
impl<H, M> DeferableReplyHandler<M> for H
where H: Handler<M>
{
    type Reply = H::Reply;

    /// Runs `Handler::handle` on `message` and, on success, passes the
    /// returned value to `reply`.
    ///
    /// When `handle` fails, the `ActorExitStatus` is returned unchanged and
    /// `reply` is dropped without being called.
    ///
    /// The bounds on `reply` and `M` are spelled exactly as in the
    /// `DeferableReplyHandler` declaration (`Send + Sync + 'static`), so the
    /// trait and its blanket implementation cannot drift apart.
    async fn handle_message(
        &mut self,
        message: M,
        reply: impl FnOnce(Self::Reply) + Send + Sync + 'static,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus>
    where
        M: Send + Sync + 'static,
    {
        // `Result::map` calls `reply` only in the `Ok` case; since `reply`
        // returns `()`, the mapped result is `Result<(), ActorExitStatus>`.
        self.handle(message, ctx).await.map(reply)
    }
}