use std::any::type_name;
use std::convert::Infallible;
use std::fmt;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use thiserror::Error;
use tokio::sync::oneshot;
use tokio::sync::watch::Sender;
use tracing::{debug, error, info_span, Span};
use crate::actor_state::{ActorState, AtomicState};
use crate::channel_with_priority::Priority;
use crate::envelope::wrap_in_envelope;
use crate::mailbox::{Command, CommandOrMessage};
use crate::progress::{Progress, ProtectedZoneGuard};
use crate::scheduler::{Callback, ScheduleEvent, Scheduler};
use crate::spawn_builder::SpawnBuilder;
use crate::{ActorRunner, AskError, KillSwitch, Mailbox, QueueCapacity, SendError};
/// Outcome reported when an actor stops running.
///
/// The `Display` text of each variant is defined by its `#[error(...)]`
/// attribute. `should_activate_kill_switch` in this file decides which
/// variants trigger the kill switch when passed to `ActorContext::exit`.
#[derive(Error, Debug, Clone)]
pub enum ActorExitStatus {
/// Exit status returned by `process_command` for `Command::ExitWithSuccess`.
/// This is the only variant for which `is_success()` returns `true`.
#[error("Success")]
Success,
/// Exit status returned by `process_command` for `Command::Quit`.
/// Does not activate the kill switch.
#[error("Quit")]
Quit,
/// Exit status obtained when converting a `SendError` (see the
/// `From<SendError>` impl). Activates the kill switch.
#[error("Downstream actor exited.")]
DownstreamClosed,
/// Exit status returned by `process_command` for `Command::Kill`.
/// Does not activate the kill switch.
#[error("Killed")]
Killed,
/// The actor stopped because of an error. The cause is kept behind an `Arc`,
/// which allows this enum to derive `Clone`. Activates the kill switch.
#[error("Failure(cause={0:?})")]
Failure(Arc<anyhow::Error>),
/// Activates the kill switch.
/// NOTE(review): presumably set when the actor code panicked; the code that
/// produces this variant is not in this file — confirm.
#[error("Panicked")]
Panicked,
}
/// Any `anyhow::Error` converts into `ActorExitStatus::Failure`, which lets
/// code returning `Result<_, ActorExitStatus>` use `?` on `anyhow` results.
impl From<anyhow::Error> for ActorExitStatus {
    fn from(err: anyhow::Error) -> Self {
        // The error is shared behind an `Arc`, as required by the variant.
        let cause = Arc::new(err);
        Self::Failure(cause)
    }
}
impl ActorExitStatus {
pub fn is_success(&self) -> bool {
matches!(self, ActorExitStatus::Success)
}
}
impl From<SendError> for ActorExitStatus {
fn from(_: SendError) -> Self {
ActorExitStatus::DownstreamClosed
}
}
/// Behavior shared by all actors.
///
/// Implementors must provide `ObservableState` and `observable_state`; every
/// other method has a default implementation.
#[async_trait]
pub trait Actor: Send + Sync + Sized + 'static {
/// Snapshot type returned by `observable_state`. `process_command` publishes
/// it on a watch channel and hands it to the observer on `Command::Observe`.
type ObservableState: Send + Sync + Clone + fmt::Debug;
/// Name of the actor. Defaults to the type name of the implementing type.
fn name(&self) -> String {
type_name::<Self>().to_string()
}
/// Runner used for this actor. Defaults to `ActorRunner::GlobalRuntime`.
fn runner(&self) -> ActorRunner {
ActorRunner::GlobalRuntime
}
/// Capacity of the actor's message queue. Defaults to `QueueCapacity::Unbounded`.
fn queue_capacity(&self) -> QueueCapacity {
QueueCapacity::Unbounded
}
/// Returns a snapshot of the actor's current observable state.
fn observable_state(&self) -> Self::ObservableState;
/// Tracing span associated with the actor. The default is an info-level span
/// with an empty name and an `actor` field set to `self.name()`.
fn span(&self, _ctx: &ActorContext<Self>) -> Span {
info_span!("", actor = %self.name())
}
/// Initialization hook. The default implementation does nothing and returns `Ok(())`.
/// NOTE(review): presumably called once before any message is handled, with
/// an `Err` stopping the actor — the caller is not in this file; confirm.
async fn initialize(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
Ok(())
}
/// Finalization hook receiving the actor's exit status. The default
/// implementation does nothing and returns `Ok(())`.
/// NOTE(review): presumably called once when the actor exits — the caller is
/// not in this file; confirm.
async fn finalize(
&mut self,
_exit_status: &ActorExitStatus,
_ctx: &ActorContext<Self>,
) -> anyhow::Result<()> {
Ok(())
}
}
/// Handle passed to an actor's hooks and handlers.
///
/// The actual data lives in an `ActorContextInner` behind an `Arc`, and the
/// context dereferences to it.
pub struct ActorContext<A: Actor> {
// Shared state; cloning the context only clones this `Arc`.
inner: Arc<ActorContextInner<A>>,
// Zero-sized marker tying the context to the actor type `A`.
phantom_data: PhantomData<A>,
}
impl<A: Actor> Clone for ActorContext<A> {
fn clone(&self) -> Self {
ActorContext {
inner: self.inner.clone(),
phantom_data: PhantomData,
}
}
}
/// Gives direct access to the fields of the shared `ActorContextInner`.
impl<A: Actor> Deref for ActorContext<A> {
    type Target = ActorContextInner<A>;

    fn deref(&self) -> &Self::Target {
        // Go through the `Arc` to borrow the inner value.
        &*self.inner
    }
}
/// State shared by all clones of an `ActorContext`.
pub struct ActorContextInner<A: Actor> {
// Mailbox of the actor owning this context (returned by `ActorContext::mailbox`).
self_mailbox: Mailbox<A>,
// Progress tracker; also hands out the guards returned by `protect_zone`.
progress: Progress,
// Kill switch; handed over to actors spawned through `spawn_actor` and
// triggered by `exit` for some exit statuses.
kill_switch: KillSwitch,
// Mailbox of the scheduler, used by `schedule_self_msg` and `spawn_actor`.
scheduler_mailbox: Mailbox<Scheduler>,
// State of the actor, updated by `process`, `idle`, `pause`, `resume` and `exit`.
actor_state: AtomicState,
}
impl<A: Actor> ActorContext<A> {
pub(crate) fn new(
self_mailbox: Mailbox<A>,
kill_switch: KillSwitch,
scheduler_mailbox: Mailbox<Scheduler>,
) -> Self {
ActorContext {
inner: ActorContextInner {
self_mailbox,
progress: Progress::default(),
kill_switch,
scheduler_mailbox,
actor_state: AtomicState::default(),
}
.into(),
phantom_data: PhantomData,
}
}
pub fn mailbox(&self) -> &Mailbox<A> {
&self.self_mailbox
}
pub fn actor_instance_id(&self) -> &str {
self.mailbox().actor_instance_id()
}
pub fn protect_zone(&self) -> ProtectedZoneGuard {
self.progress.protect_zone()
}
pub fn kill_switch(&self) -> &KillSwitch {
&self.kill_switch
}
pub fn progress(&self) -> &Progress {
&self.progress
}
pub fn spawn_actor<SpawnedActor: Actor>(
&self,
actor: SpawnedActor,
) -> SpawnBuilder<SpawnedActor> {
SpawnBuilder::new(
actor,
self.scheduler_mailbox.clone(),
self.kill_switch.clone(),
)
}
pub fn record_progress(&self) {
self.progress.record_progress();
}
pub(crate) fn state(&self) -> ActorState {
self.actor_state.get_state()
}
pub(crate) fn process(&self) {
self.actor_state.process();
}
pub(crate) fn idle(&self) {
self.actor_state.idle();
}
pub(crate) fn pause(&self) {
self.actor_state.pause();
}
pub(crate) fn resume(&self) {
self.actor_state.resume();
}
pub(crate) fn exit(&self, exit_status: &ActorExitStatus) {
if !exit_status.is_success() {
error!(actor_name=self.actor_instance_id(), actor_exit_status=?exit_status, "actor-failure");
}
self.actor_state.exit(exit_status.is_success());
if should_activate_kill_switch(exit_status) {
error!(actor=%self.actor_instance_id(), exit_status=?exit_status, "exit activating-kill-switch");
self.kill_switch().kill();
}
}
}
/// Tells whether an actor exiting with `exit_status` must trigger the kill switch.
///
/// `DownstreamClosed`, `Failure` and `Panicked` do; `Success`, `Quit` and
/// `Killed` do not.
fn should_activate_kill_switch(exit_status: &ActorExitStatus) -> bool {
    // No wildcard arm: a new variant must be classified explicitly.
    match exit_status {
        ActorExitStatus::DownstreamClosed
        | ActorExitStatus::Failure(_)
        | ActorExitStatus::Panicked => true,
        ActorExitStatus::Success | ActorExitStatus::Quit | ActorExitStatus::Killed => false,
    }
}
impl<A: Actor> ActorContext<A> {
    /// Sends `msg` to the actor behind `mailbox`.
    ///
    /// On success, returns the receiver on which the destination's reply can
    /// be awaited. A protected-zone guard is held until the send completes.
    /// NOTE(review): presumably the guard prevents a blocked send from being
    /// mistaken for a stalled actor — confirm in `Progress`.
    pub async fn send_message<DestActor: Actor, M>(
        &self,
        mailbox: &Mailbox<DestActor>,
        msg: M,
    ) -> Result<oneshot::Receiver<DestActor::Reply>, crate::SendError>
    where
        DestActor: Handler<M>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let _protected_zone = self.protect_zone();
        debug!(from=%self.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg);
        mailbox.send_message(msg).await
    }

    /// Sends `msg` to the actor behind `mailbox` and waits for its reply.
    ///
    /// A protected-zone guard is held for the whole exchange.
    pub async fn ask<DestActor: Actor, M, T>(
        &self,
        mailbox: &Mailbox<DestActor>,
        msg: M,
    ) -> Result<T, AskError<Infallible>>
    where
        DestActor: Handler<M, Reply = T>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let _protected_zone = self.protect_zone();
        debug!(from=%self.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
        mailbox.ask(msg).await
    }

    /// Variant of `ask` for handlers whose reply is itself a `Result<T, E>`;
    /// the outcome is returned as a `Result<T, AskError<E>>`.
    ///
    /// A protected-zone guard is held for the whole exchange.
    pub async fn ask_for_res<DestActor: Actor, M, T, E: fmt::Debug>(
        &self,
        mailbox: &Mailbox<DestActor>,
        msg: M,
    ) -> Result<T, AskError<E>>
    where
        DestActor: Handler<M, Reply = Result<T, E>>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let _protected_zone = self.protect_zone();
        debug!(from=%self.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
        mailbox.ask_for_res(msg).await
    }

    /// Sends the `ExitWithSuccess` command to the actor behind `mailbox`.
    ///
    /// The command is sent with `Priority::Low`.
    pub async fn send_exit_with_success<Dest: Actor>(
        &self,
        mailbox: &Mailbox<Dest>,
    ) -> Result<(), crate::SendError> {
        let _protected_zone = self.protect_zone();
        debug!(from=%self.actor_instance_id(), to=%mailbox.actor_instance_id(), "success");
        let exit_command = CommandOrMessage::Command(Command::ExitWithSuccess);
        mailbox
            .send_with_priority(exit_command, Priority::Low)
            .await
    }

    /// Sends `msg` to this actor's own mailbox.
    ///
    /// Unlike the other send helpers of this type, no protected-zone guard is
    /// taken here.
    pub async fn send_self_message<M>(
        &self,
        msg: M,
    ) -> Result<oneshot::Receiver<A::Reply>, crate::SendError>
    where
        A: Handler<M>,
        M: 'static + Sync + Send + fmt::Debug,
    {
        debug!(self=%self.actor_instance_id(), msg=?msg, "self_send");
        self.mailbox().send_message(msg).await
    }

    /// Asks the scheduler to deliver `msg` to this actor after `after_duration`.
    ///
    /// The scheduled callback sends the message with `Priority::High`. Both the
    /// reply to the message and any error met while registering the event or
    /// delivering the message are ignored.
    pub async fn schedule_self_msg<M>(&self, after_duration: Duration, msg: M)
    where
        A: Handler<M>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let self_mailbox = self.inner.self_mailbox.clone();
        let (envelope, _response_rx) = wrap_in_envelope(msg);
        // The future handed to the scheduler owns both the mailbox clone and
        // the envelope.
        let deliver_to_self = async move {
            let high_priority_msg = CommandOrMessage::Message(envelope);
            let _ = self_mailbox
                .send_with_priority(high_priority_msg, Priority::High)
                .await;
        };
        let scheduler_msg = ScheduleEvent {
            timeout: after_duration,
            callback: Callback(Box::pin(deliver_to_self)),
        };
        let scheduler_mailbox = &self.inner.scheduler_mailbox;
        let _ = self.send_message(scheduler_mailbox, scheduler_msg).await;
    }
}
/// Applies `command` to the actor.
///
/// Returns `Some(exit_status)` when the command requires the actor to stop
/// (`ExitWithSuccess`, `Quit`, `Kill`) and `None` otherwise:
/// - `Pause` / `Resume` update the actor state through `ctx`;
/// - `Observe` publishes the current observable state on `state_tx` and also
///   sends it to the observer's callback channel.
pub(crate) fn process_command<A: Actor>(
    actor: &mut A,
    command: Command,
    ctx: &ActorContext<A>,
    state_tx: &Sender<A::ObservableState>,
) -> Option<ActorExitStatus> {
    let exit_status = match command {
        Command::ExitWithSuccess => ActorExitStatus::Success,
        Command::Quit => ActorExitStatus::Quit,
        Command::Kill => ActorExitStatus::Killed,
        Command::Pause => {
            ctx.pause();
            return None;
        }
        Command::Resume => {
            ctx.resume();
            return None;
        }
        Command::Observe(observer_tx) => {
            let snapshot = actor.observable_state();
            // Send failures are deliberately ignored on both channels.
            let _ = state_tx.send(snapshot.clone());
            let _ = observer_tx.send(Box::new(snapshot));
            return None;
        }
    };
    Some(exit_status)
}
/// Implemented by an actor for each message type `M` it is able to handle.
#[async_trait::async_trait]
pub trait Handler<M>: Actor {
/// Type of the value produced by `handle` for a message of type `M`.
type Reply: 'static + Send;
/// Tracing span associated with a message. The default is an info-level span
/// with an empty name and a `msg_id` field; the message itself is not recorded.
fn message_span(&self, msg_id: u64, _msg: &M) -> Span {
info_span!("", msg_id = &msg_id)
}
/// Processes one message.
///
/// Returns the reply on success, or an `ActorExitStatus` on error.
/// NOTE(review): presumably an `Err` makes the actor exit with that status —
/// the actor loop is not in this file; confirm.
async fn handle(
&mut self,
message: M,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus>;
}