use crate::actor_ref::ActorRef;
use crate::{ActorResult, ActorWeak, ControlSignal, FailurePhase, MailboxMessage};
use std::{fmt::Debug, future::Future};
use tokio::sync::mpsc;
use tracing::{debug, error, Instrument};
/// Awaits a future on behalf of an actor and evaluates to the future's output.
///
/// * `$id` - identity of the actor the future runs for.
/// * `$future` - the future to drive to completion.
///
/// Without the `deadlock-detection` feature this is simply `$future.await`
/// and `$id` is not used. With the feature enabled the future is first passed
/// through `crate::CURRENT_ACTOR.scope($id, ..)` and the result is awaited.
///
/// The expansion contains `.await`, so the macro can only be used inside an
/// async context.
macro_rules! run_with_actor_scope {
    ($id:expr, $future:expr) => {{
        // Exactly one of the two blocks below survives cfg-stripping and
        // becomes the value of the expansion.
        #[cfg(not(feature = "deadlock-detection"))]
        {
            $future.await
        }
        #[cfg(feature = "deadlock-detection")]
        {
            crate::CURRENT_ACTOR.scope($id, $future).await
        }
    }};
}
/// Evaluates to a future for an actor WITHOUT awaiting it.
///
/// * `$id` - identity of the actor the future runs for.
/// * `$future` - the future expression to hand back.
///
/// Without the `deadlock-detection` feature the expansion is just `$future`
/// and `$id` is not used. With the feature enabled it is
/// `crate::CURRENT_ACTOR.scope($id, $future)`.
///
/// Use this form where the future must be given to something else to poll,
/// for example as a `tokio::select!` branch expression.
macro_rules! with_actor_scope {
    ($id:expr, $future:expr) => {{
        // Exactly one of the two blocks below survives cfg-stripping and
        // becomes the value of the expansion.
        #[cfg(not(feature = "deadlock-detection"))]
        {
            $future
        }
        #[cfg(feature = "deadlock-detection")]
        {
            crate::CURRENT_ACTOR.scope($id, $future)
        }
    }};
}
/// Lifecycle contract for a type that is driven by `run_actor_lifecycle`.
///
/// The runtime builds the actor with [`Actor::on_start`], repeatedly polls
/// [`Actor::on_run`] between messages for as long as it keeps returning
/// `Ok(true)`, and calls [`Actor::on_stop`] once before the lifecycle ends.
pub trait Actor: Sized + Send + 'static {
    /// Input consumed by [`Actor::on_start`] to build the actor.
    type Args: Send;

    /// Error produced by any of the lifecycle hooks.
    type Error: Send + Debug;

    /// Builds the actor from `args`.
    ///
    /// `actor_ref` is a strong reference to the actor being started.
    /// Returning `Err` aborts the lifecycle before the processing loop is
    /// entered; no actor instance exists in that case.
    fn on_start(
        args: Self::Args,
        actor_ref: &ActorRef<Self>,
    ) -> impl Future<Output = std::result::Result<Self, Self::Error>> + Send;

    /// Idle hook raced against the termination channel and the mailbox in
    /// the processing loop.
    ///
    /// * `Ok(true)` - keep calling `on_run` on later loop iterations.
    /// * `Ok(false)` - stop calling `on_run` for the rest of the lifecycle.
    /// * `Err(_)` - the runtime calls `on_stop(.., false)` and ends the
    ///   lifecycle with a failure result.
    ///
    /// The default implementation is immediately ready with `Ok(false)`.
    // The default body ignores its arguments.
    #[allow(unused_variables)]
    fn on_run(
        &mut self,
        actor_weak: &ActorWeak<Self>,
    ) -> impl Future<Output = std::result::Result<bool, Self::Error>> + Send {
        std::future::ready(Ok(false))
    }

    /// Cleanup hook called once when the lifecycle is ending.
    ///
    /// `killed` is `true` only when shutdown was triggered by a signal
    /// received on the termination channel. It is `false` for a graceful
    /// stop, a closed mailbox or termination channel, and cleanup after an
    /// `on_run` error.
    ///
    /// The default implementation is immediately ready with `Ok(())`.
    // The default body ignores its arguments.
    #[allow(unused_variables)]
    fn on_stop(
        &mut self,
        actor_weak: &ActorWeak<Self>,
        killed: bool,
    ) -> impl Future<Output = std::result::Result<(), Self::Error>> + Send {
        std::future::ready(Ok(()))
    }
}
/// Implemented by an [`Actor`] for every message type `T` it can handle.
pub trait Message<T>: Actor
where
    T: Send + 'static,
{
    /// Value produced by [`Message::handle`] for a message of type `T`.
    type Reply: Send + 'static;

    /// Processes `msg` with exclusive access to the actor state.
    ///
    /// `actor_ref` is a reference to the actor handling the message.
    fn handle(
        &mut self,
        msg: T,
        actor_ref: &ActorRef<Self>,
    ) -> impl Future<Output = Self::Reply> + Send;

    /// Hook that receives a handler's reply by reference; the default does
    /// nothing.
    ///
    /// NOTE(review): the call site is outside this view. The name suggests it
    /// runs for fire-and-forget (`tell`) sends whose reply would otherwise be
    /// discarded - confirm against the caller.
    fn on_tell_result(_result: &Self::Reply, _actor_ref: &ActorRef<Self>) {}
}
/// Runs the complete lifecycle of a single actor and reports how it ended.
///
/// Sequence of events:
/// 1. `T::on_start(args, &actor_ref)` builds the actor. On error the function
///    returns `ActorResult::Failed` with `actor: None` and
///    `FailurePhase::OnStart`.
/// 2. The strong `actor_ref` is downgraded to an `ActorWeak` and dropped, so
///    the loop itself only holds a weak handle.
/// 3. A `biased` `tokio::select!` loop waits on, in this order: the
///    termination channel, the mailbox, and (while still enabled)
///    `Actor::on_run`.
/// 4. Every path that leaves the loop first calls `Actor::on_stop`. If the
///    loop is left via `break`, both receivers are closed and
///    `ActorResult::Completed { actor, killed }` is returned.
///
/// # Parameters
/// - `args`: forwarded to `T::on_start`.
/// - `actor_ref`: strong reference used for `on_start` and as the source of
///   the actor identity used in log messages.
/// - `receiver`: mailbox delivering `MailboxMessage<T>` values.
/// - `terminate_receiver`: channel delivering `ControlSignal` values; any
///   received signal is treated as a kill request.
///
/// # Returns
/// - `ActorResult::Completed { actor, killed }` when `on_stop` succeeded;
///   `killed` is `true` only if a signal was received on `terminate_receiver`.
/// - `ActorResult::Failed { .. }` when `on_start`, `on_run` or `on_stop`
///   returned an error; `phase` records which hook(s) failed.
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "debug",
name = "actor_lifecycle",
fields(
actor_id = %actor_ref.identity(),
actor_type = %std::any::type_name::<T>()
),
skip_all
))]
pub(crate) async fn run_actor_lifecycle<T: Actor>(
args: T::Args,
actor_ref: ActorRef<T>,
mut receiver: mpsc::Receiver<MailboxMessage<T>>,
mut terminate_receiver: mpsc::Receiver<ControlSignal>,
) -> ActorResult<T> {
let actor_id = actor_ref.identity();
// Spans are only real when the `tracing` feature is on; otherwise a disabled
// span is used so the `.instrument(..)` call sites stay identical.
#[cfg(feature = "tracing")]
let on_start_span = tracing::debug_span!("actor_on_start");
#[cfg(not(feature = "tracing"))]
let on_start_span = tracing::Span::none();
// Phase 1: construct the actor. A failure here means there is no actor
// instance to hand back, hence `actor: None`.
let mut actor = match run_with_actor_scope!(
actor_id,
T::on_start(args, &actor_ref).instrument(on_start_span)
) {
Ok(actor) => {
debug!("Actor {actor_id} on_start completed successfully.");
actor
}
Err(e) => {
error!("Actor {actor_id} on_start failed: {e:?}");
return ActorResult::Failed {
actor: None,
error: e,
phase: FailurePhase::OnStart,
killed: false,
};
}
};
debug!("Actor {actor_id} runtime starting - entering main processing loop.");
// From here on only a weak handle is kept.
// NOTE(review): presumably this is what lets the "all references dropped"
// shutdown paths below fire while the loop is running - confirm.
let actor_weak = ActorRef::downgrade(&actor_ref);
drop(actor_ref);
// `killed` becomes true only when a signal arrives on `terminate_receiver`.
let mut killed = false;
// Cleared once `on_run` returns `Ok(false)`; disables the `on_run` branch.
let mut idle_enabled = true;
loop {
#[cfg(feature = "tracing")]
let on_run_span = tracing::debug_span!("actor_on_run");
#[cfg(not(feature = "tracing"))]
let on_run_span = tracing::Span::none();
tokio::select! {
// `biased` makes select! poll the branches in declaration order, so a
// termination signal is checked before the mailbox and before `on_run`.
biased;
// Branch 1: termination channel.
maybe_terminate = terminate_receiver.recv() => {
match maybe_terminate {
Some(_) => {
#[cfg(feature = "tracing")]
debug!("Actor termination via kill() method");
killed = true;
}
None => {
// Channel closed without a signal; not treated as a kill.
#[cfg(feature = "tracing")]
debug!("Actor termination due to all actor_ref instances being dropped");
killed = false;
}
}
#[cfg(feature = "tracing")]
let on_stop_span = tracing::debug_span!("actor_on_stop", killed);
#[cfg(not(feature = "tracing"))]
let on_stop_span = tracing::Span::none();
// Give the actor a chance to clean up; an error here ends the
// lifecycle as a failure but still returns the actor instance.
if let Err(e) = run_with_actor_scope!(
actor_id,
actor.on_stop(&actor_weak, killed).instrument(on_stop_span)
) {
error!("Actor {actor_id} on_stop failed during termination: {e:?}");
return ActorResult::Failed {
actor: Some(actor),
error: e,
phase: FailurePhase::OnStop,
killed,
};
}
break; }
// Branch 2: mailbox.
maybe_message = receiver.recv() => {
match maybe_message {
// The envelope carries its own `actor_ref`, which is handed to the handler.
Some(MailboxMessage::Envelope { payload, reply_channel, actor_ref }) => {
#[cfg(feature = "tracing")]
let msg_span = tracing::debug_span!("actor_process_message");
#[cfg(not(feature = "tracing"))]
let msg_span = tracing::Span::none();
#[cfg(feature = "tracing")]
let start_time = std::time::Instant::now();
// `actor_ref` is moved into `handle_message` below, so a clone is
// kept for the metrics guard, which lives until the end of this arm.
// NOTE(review): presumably the guard records processing metrics
// when dropped - confirm in `crate::metrics`.
#[cfg(feature = "metrics")]
let metrics_ref = actor_ref.clone();
#[cfg(feature = "metrics")]
let _metrics_guard = crate::metrics::collector::MessageProcessingGuard::new(
metrics_ref.metrics_collector()
);
run_with_actor_scope!(
actor_id,
payload.handle_message(&mut actor, actor_ref, reply_channel)
.instrument(msg_span)
);
#[cfg(feature = "tracing")]
debug!("Actor {} processed message in {:?}", actor_id, start_time.elapsed());
}
// An explicit graceful-stop request and a closed mailbox are handled
// the same way: run `on_stop` with `killed = false`, then leave the loop.
Some(MailboxMessage::StopGracefully(_)) | None => {
#[cfg(feature = "tracing")]
debug!("Actor termination due to graceful stop");
#[cfg(feature = "tracing")]
let on_stop_span = tracing::debug_span!("actor_on_stop", killed = false);
#[cfg(not(feature = "tracing"))]
let on_stop_span = tracing::Span::none();
if let Err(e) = run_with_actor_scope!(
actor_id,
actor.on_stop(&actor_weak, false).instrument(on_stop_span)
) {
error!("Actor {actor_id} on_stop failed during graceful stop: {e:?}");
return ActorResult::Failed {
actor: Some(actor),
error: e,
phase: FailurePhase::OnStop,
killed: false,
};
}
break;
}
}
}
// Branch 3: idle hook, only considered while `idle_enabled` is true.
// `with_actor_scope!` (no `.await`) is used because select! polls the future.
// Per the tokio `select!` documentation, a false precondition disables the
// branch: the expression is still evaluated but the future is never polled.
maybe_result = with_actor_scope!(
actor_id,
actor.on_run(&actor_weak).instrument(on_run_span)
), if idle_enabled => {
match maybe_result {
// Keep calling `on_run` on the next iteration.
Ok(true) => {
}
// The actor asked not to be polled again; messages and
// termination are still processed.
Ok(false) => {
idle_enabled = false;
}
Err(e) => {
let error_msg = format!("Actor {actor_id} on_run error: {e:?}");
error!("{error_msg}");
#[cfg(feature = "tracing")]
let on_stop_span = tracing::debug_span!("actor_on_stop", killed = false);
#[cfg(not(feature = "tracing"))]
let on_stop_span = tracing::Span::none();
// Still run `on_stop` for cleanup. The original `on_run` error is
// the one reported; a failing `on_stop` only changes the phase.
let phase = if let Err(stop_err) = run_with_actor_scope!(
actor_id,
actor.on_stop(&actor_weak, false).instrument(on_stop_span)
) {
error!(
"Actor {actor_id} on_stop failed during on_run error cleanup: {stop_err:?}"
);
FailurePhase::OnRunThenOnStop
} else {
FailurePhase::OnRun
};
// `killed` is always false here: it is only set to true in the
// termination branch, which never continues the loop.
return ActorResult::Failed {
actor: Some(actor),
error: e,
phase,
killed,
};
}
}
}
}
}
// Reached only via `break`, i.e. after `on_stop` succeeded. Close both
// channels before reporting completion.
receiver.close(); terminate_receiver.close();
debug!("Actor {actor_id} lifecycle completed - exiting runtime loop.");
ActorResult::Completed { actor, killed }
}