use std::fmt::Debug;
#[cfg(not(feature = "async-trait"))]
use std::future::Future;
use std::panic::AssertUnwindSafe;
use actor_properties::MuxedMessage;
use futures::TryFutureExt;
use tracing::Instrument;
use crate::concurrency::JoinHandle;
use crate::ActorId;
pub mod messages;
use messages::*;
pub mod actor_cell;
pub mod actor_id;
pub(crate) mod actor_properties;
pub mod actor_ref;
pub mod derived_actor;
mod supervision;
#[cfg(test)]
mod supervision_tests;
#[cfg(test)]
mod tests;
use actor_cell::ActorCell;
use actor_cell::ActorPortSet;
use actor_cell::ActorStatus;
use actor_ref::ActorRef;
use crate::errors::ActorErr;
use crate::errors::ActorProcessingErr;
use crate::errors::MessagingErr;
use crate::errors::SpawnErr;
use crate::ActorName;
use crate::Message;
use crate::State;
pub(crate) fn get_panic_string(e: Box<dyn std::any::Any + Send>) -> ActorProcessingErr {
match e.downcast::<String>() {
Ok(v) => From::from(*v),
Err(e) => match e.downcast::<&str>() {
Ok(v) => From::from(*v),
_ => From::from("Unknown panic occurred which couldn't be coerced to a string"),
},
}
}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
pub trait Actor: Sized + Sync + Send + 'static {
type Msg: Message;
type State: State;
type Arguments: State;
#[cfg(not(feature = "async-trait"))]
fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> impl Future<Output = Result<Self::State, ActorProcessingErr>> + Send;
#[cfg(feature = "async-trait")]
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr>;
#[allow(unused_variables)]
#[cfg(not(feature = "async-trait"))]
fn post_start(
&self,
myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
#[cfg(feature = "async-trait")]
async fn post_start(
&self,
myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
Ok(())
}
#[allow(unused_variables)]
#[cfg(not(feature = "async-trait"))]
fn post_stop(
&self,
myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
#[cfg(feature = "async-trait")]
async fn post_stop(
&self,
myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
Ok(())
}
#[allow(unused_variables)]
#[cfg(not(feature = "async-trait"))]
fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
#[cfg(feature = "async-trait")]
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
Ok(())
}
#[allow(unused_variables)]
#[cfg(all(feature = "cluster", not(feature = "async-trait")))]
fn handle_serialized(
&self,
myself: ActorRef<Self::Msg>,
message: crate::message::SerializedMessage,
state: &mut Self::State,
) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
#[cfg(all(feature = "cluster", feature = "async-trait"))]
async fn handle_serialized(
&self,
myself: ActorRef<Self::Msg>,
message: crate::message::SerializedMessage,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
Ok(())
}
#[allow(unused_variables)]
#[cfg(not(feature = "async-trait"))]
fn handle_supervisor_evt(
&self,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
state: &mut Self::State,
) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
async move {
match message {
SupervisionEvent::ActorTerminated(who, _, _)
| SupervisionEvent::ActorFailed(who, _) => {
myself.stop(None);
}
_ => {}
}
Ok(())
}
}
#[allow(unused_variables)]
#[cfg(feature = "async-trait")]
async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorTerminated(who, _, _)
| SupervisionEvent::ActorFailed(who, _) => {
myself.stop(None);
}
_ => {}
}
Ok(())
}
#[cfg(not(feature = "async-trait"))]
fn spawn(
name: Option<ActorName>,
handler: Self,
startup_args: Self::Arguments,
) -> impl Future<Output = Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr>> + Send {
ActorRuntime::<Self>::spawn(name, handler, startup_args)
}
#[cfg(feature = "async-trait")]
async fn spawn(
name: Option<ActorName>,
handler: Self,
startup_args: Self::Arguments,
) -> Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr> {
ActorRuntime::<Self>::spawn(name, handler, startup_args).await
}
#[cfg(not(feature = "async-trait"))]
fn spawn_linked(
name: Option<ActorName>,
handler: Self,
startup_args: Self::Arguments,
supervisor: ActorCell,
) -> impl Future<Output = Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr>> + Send {
ActorRuntime::<Self>::spawn_linked(name, handler, startup_args, supervisor)
}
#[cfg(feature = "async-trait")]
async fn spawn_linked(
name: Option<ActorName>,
handler: Self,
startup_args: Self::Arguments,
supervisor: ActorCell,
) -> Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr> {
ActorRuntime::<Self>::spawn_linked(name, handler, startup_args, supervisor).await
}
}
#[doc(hidden)]
pub(crate) struct ActorLoopResult {
pub(crate) should_exit: bool,
pub(crate) exit_reason: Option<String>,
pub(crate) was_killed: bool,
}
impl ActorLoopResult {
pub(crate) fn ok() -> Self {
Self {
should_exit: false,
exit_reason: None,
was_killed: false,
}
}
pub(crate) fn stop(reason: Option<String>) -> Self {
Self {
should_exit: true,
exit_reason: reason,
was_killed: false,
}
}
pub(crate) fn signal(signal_str: Option<String>) -> Self {
Self {
should_exit: true,
exit_reason: signal_str,
was_killed: true,
}
}
}
pub struct ActorRuntime<TActor>
where
TActor: Actor,
{
actor_ref: ActorRef<TActor::Msg>,
handler: TActor,
id: ActorId,
name: Option<String>,
}
impl<TActor: Actor> Debug for ActorRuntime<TActor> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ActorRuntime")
.field("name", &self.name)
.field("id", &self.id)
.finish()
}
}
impl<TActor> ActorRuntime<TActor>
where
TActor: Actor,
{
pub async fn spawn(
name: Option<ActorName>,
handler: TActor,
startup_args: TActor::Arguments,
) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
let (actor, ports) = Self::new(name, handler)?;
actor.start(ports, startup_args, None).await
}
pub async fn spawn_linked(
name: Option<ActorName>,
handler: TActor,
startup_args: TActor::Arguments,
supervisor: ActorCell,
) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
let (actor, ports) = Self::new(name, handler)?;
actor.start(ports, startup_args, Some(supervisor)).await
}
#[allow(clippy::type_complexity)]
pub fn spawn_instant(
name: Option<ActorName>,
handler: TActor,
startup_args: TActor::Arguments,
) -> Result<
(
ActorRef<TActor::Msg>,
JoinHandle<Result<JoinHandle<()>, SpawnErr>>,
),
SpawnErr,
> {
let (actor, ports) = Self::new(name.clone(), handler)?;
let actor_ref = actor.actor_ref.clone();
let join_op = crate::concurrency::spawn_named(name.as_deref(), async move {
let result = actor.start(ports, startup_args, None).await;
let (_, handle) = result?;
Ok(handle)
});
Ok((actor_ref, join_op))
}
#[allow(clippy::type_complexity)]
pub fn spawn_linked_instant(
name: Option<ActorName>,
handler: TActor,
startup_args: TActor::Arguments,
supervisor: ActorCell,
) -> Result<
(
ActorRef<TActor::Msg>,
JoinHandle<Result<JoinHandle<()>, SpawnErr>>,
),
SpawnErr,
> {
let (actor, ports) = Self::new(name.clone(), handler)?;
let actor_ref = actor.actor_ref.clone();
let join_op = crate::concurrency::spawn_named(name.as_deref(), async move {
let result = actor.start(ports, startup_args, Some(supervisor)).await;
let (_, handle) = result?;
Ok(handle)
});
Ok((actor_ref, join_op))
}
#[cfg(feature = "cluster")]
pub async fn spawn_linked_remote(
name: Option<ActorName>,
handler: TActor,
id: ActorId,
startup_args: TActor::Arguments,
supervisor: ActorCell,
) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
if id.is_local() {
Err(SpawnErr::StartupFailed(From::from(
"Cannot spawn a remote actor when the identifier is not remote!",
)))
} else {
let (actor_cell, ports) = actor_cell::ActorCell::new_remote::<TActor>(name, id)?;
let id = actor_cell.get_id();
let name = actor_cell.get_name();
let actor_cell2 = actor_cell.clone();
let (actor, ports) = (
Self {
actor_ref: actor_cell.into(),
handler,
id,
name,
},
ports,
);
let result = actor.start(ports, startup_args, Some(supervisor)).await;
if result.is_err() {
actor_cell2.set_status(ActorStatus::Stopped);
}
result
}
}
fn new(name: Option<ActorName>, handler: TActor) -> Result<(Self, ActorPortSet), SpawnErr> {
let (actor_cell, ports) = actor_cell::ActorCell::new::<TActor>(name)?;
let id = actor_cell.get_id();
let name = actor_cell.get_name();
Ok((
Self {
actor_ref: actor_cell.into(),
handler,
id,
name,
},
ports,
))
}
#[tracing::instrument(name = "Actor", skip(self, ports, startup_args, supervisor), fields(id = self.id.to_string(), name = self.name))]
async fn start(
self,
ports: ActorPortSet,
startup_args: TActor::Arguments,
supervisor: Option<ActorCell>,
) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
if self.actor_ref.get_status() != ActorStatus::Unstarted {
return Err(SpawnErr::ActorAlreadyStarted);
}
let Self {
handler,
actor_ref,
id,
name,
} = self;
actor_ref.set_status(ActorStatus::Starting);
let mut state = match Self::do_pre_start(actor_ref.clone(), &handler, startup_args).await {
Ok(Ok(state)) => state,
Ok(Err(e)) => {
actor_ref.set_status(ActorStatus::Stopped);
return Err(SpawnErr::StartupFailed(e));
}
Err(e) => {
actor_ref.set_status(ActorStatus::Stopped);
return Err(e);
}
};
if let Some(sup) = &supervisor {
actor_ref.link(sup.clone());
}
let myself_ret = actor_ref.clone();
let handle = crate::concurrency::spawn_named(actor_ref.get_name().as_deref(), async move {
let myself = actor_ref.clone();
let evt = match Self::processing_loop(ports, &mut state, &handler, actor_ref, id, name)
.await
{
Ok(exit_reason) => SupervisionEvent::ActorTerminated(
myself.get_cell(),
Some(BoxedState::new(state)),
exit_reason,
),
Err(actor_err) => match actor_err {
ActorErr::Cancelled => SupervisionEvent::ActorTerminated(
myself.get_cell(),
None,
Some("killed".to_string()),
),
ActorErr::Failed(msg) => SupervisionEvent::ActorFailed(myself.get_cell(), msg),
},
};
myself.terminate();
myself.notify_supervisor_and_monitors(evt);
if let Some(sup) = supervisor {
myself.unlink(sup);
}
myself.set_status(ActorStatus::Stopped);
});
Ok((myself_ret, handle))
}
#[tracing::instrument(name = "Actor", skip(ports, state, handler, myself, _id, _name), fields(id = _id.to_string(), name = _name))]
async fn processing_loop(
mut ports: ActorPortSet,
state: &mut TActor::State,
handler: &TActor,
myself: ActorRef<TActor::Msg>,
_id: ActorId,
_name: Option<String>,
) -> Result<Option<String>, ActorErr> {
Self::do_post_start(myself.clone(), handler, state)
.await?
.map_err(ActorErr::Failed)?;
myself.set_status(ActorStatus::Running);
myself.notify_supervisor_and_monitors(SupervisionEvent::ActorStarted(myself.get_cell()));
let myself_clone = myself.clone();
let future = async move {
loop {
let ActorLoopResult {
should_exit,
exit_reason,
was_killed,
} = Box::pin(Self::process_message(&myself, state, handler, &mut ports))
.await
.map_err(ActorErr::Failed)?;
if should_exit {
return Ok((state, exit_reason, was_killed));
}
}
};
let loop_done = futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
.map_err(|err| ActorErr::Failed(get_panic_string(err)))
.await;
myself_clone.set_status(ActorStatus::Stopping);
let (exit_state, exit_reason, was_killed) = loop_done??;
if !was_killed {
Self::do_post_stop(myself_clone, handler, exit_state)
.await?
.map_err(ActorErr::Failed)?;
}
Ok(exit_reason)
}
async fn process_message(
myself: &ActorRef<TActor::Msg>,
state: &mut TActor::State,
handler: &TActor,
ports: &mut ActorPortSet,
) -> Result<ActorLoopResult, ActorProcessingErr> {
match ports.listen_in_priority().await {
Ok(actor_port_message) => match actor_port_message {
actor_cell::ActorPortMessage::Signal(signal) => Ok(ActorLoopResult::signal(
Self::handle_signal(myself.clone(), signal),
)),
actor_cell::ActorPortMessage::Stop(stop_message) => {
let exit_reason = match stop_message {
StopMessage::Stop => {
tracing::trace!("Actor {:?} stopped with no reason", myself.get_id());
None
}
StopMessage::Reason(reason) => {
tracing::trace!(
"Actor {:?} stopped with reason '{reason}'",
myself.get_id(),
);
Some(reason)
}
};
Ok(ActorLoopResult::stop(exit_reason))
}
actor_cell::ActorPortMessage::Supervision(supervision) => {
let future = Self::handle_supervision_message(
myself.clone(),
state,
handler,
supervision,
);
match ports.run_with_signal(future).await {
Ok(Ok(())) => Ok(ActorLoopResult::ok()),
Ok(Err(internal_err)) => Err(internal_err),
Err(signal) => Ok(ActorLoopResult::signal(Self::handle_signal(
myself.clone(),
signal,
))),
}
}
actor_cell::ActorPortMessage::Message(MuxedMessage::Message(msg)) => {
let future = Self::handle_message(myself.clone(), state, handler, msg);
match ports.run_with_signal(future).await {
Ok(Ok(())) => Ok(ActorLoopResult::ok()),
Ok(Err(internal_err)) => Err(internal_err),
Err(signal) => Ok(ActorLoopResult::signal(Self::handle_signal(
myself.clone(),
signal,
))),
}
}
actor_cell::ActorPortMessage::Message(MuxedMessage::Drain) => {
Ok(ActorLoopResult::stop(Some("Drained".to_string())))
}
},
Err(MessagingErr::ChannelClosed) => {
Ok(ActorLoopResult::signal(Self::handle_signal(
myself.clone(),
Signal::Kill,
)))
}
Err(MessagingErr::InvalidActorType) => {
Ok(ActorLoopResult::signal(Self::handle_signal(
myself.clone(),
Signal::Kill,
)))
}
Err(MessagingErr::SendErr(_)) => {
Ok(ActorLoopResult::signal(Self::handle_signal(
myself.clone(),
Signal::Kill,
)))
}
}
}
async fn handle_message(
myself: ActorRef<TActor::Msg>,
state: &mut TActor::State,
handler: &TActor,
mut msg: crate::message::BoxedMessage,
) -> Result<(), ActorProcessingErr> {
#[cfg(feature = "cluster")]
{
if !myself.get_id().is_local() {
match msg.serialized_msg {
Some(serialized_msg) => {
return handler
.handle_serialized(myself, serialized_msg, state)
.await;
}
None => {
return Err(From::from(
"`RemoteActor` failed to read `SerializedMessage` from `BoxedMessage`",
));
}
}
}
}
let current_span_when_message_was_sent = msg.span.take();
let typed_msg = TActor::Msg::from_boxed(msg)?;
if let Some(span) = current_span_when_message_was_sent {
handler
.handle(myself, typed_msg, state)
.instrument(span)
.await
} else {
handler.handle(myself, typed_msg, state).await
}
}
fn handle_signal(myself: ActorRef<TActor::Msg>, signal: Signal) -> Option<String> {
match &signal {
Signal::Kill => {
myself.terminate();
}
}
Some(signal.to_string())
}
async fn handle_supervision_message(
myself: ActorRef<TActor::Msg>,
state: &mut TActor::State,
handler: &TActor,
message: SupervisionEvent,
) -> Result<(), ActorProcessingErr> {
handler.handle_supervisor_evt(myself, message, state).await
}
async fn do_pre_start(
myself: ActorRef<TActor::Msg>,
handler: &TActor,
arguments: TActor::Arguments,
) -> Result<Result<TActor::State, ActorProcessingErr>, SpawnErr> {
let future = handler.pre_start(myself, arguments);
futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
.await
.map_err(|err| SpawnErr::StartupFailed(get_panic_string(err)))
}
async fn do_post_start(
myself: ActorRef<TActor::Msg>,
handler: &TActor,
state: &mut TActor::State,
) -> Result<Result<(), ActorProcessingErr>, ActorErr> {
let future = handler.post_start(myself, state);
futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
.await
.map_err(|err| ActorErr::Failed(get_panic_string(err)))
}
async fn do_post_stop(
myself: ActorRef<TActor::Msg>,
handler: &TActor,
state: &mut TActor::State,
) -> Result<Result<(), ActorProcessingErr>, ActorErr> {
let future = handler.post_stop(myself, state);
futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
.await
.map_err(|err| ActorErr::Failed(get_panic_string(err)))
}
}