use crate::error::{Error, Result};
use crate::Identity;
use crate::{Actor, ControlSignal, MailboxMessage, MailboxSender, Message};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tracing::warn;
#[cfg(feature = "tracing")]
use tracing::{debug, info};
#[cfg(feature = "metrics")]
use crate::metrics::MetricsCollector;
#[cfg(feature = "metrics")]
use std::sync::Arc;
/// Strong handle to an actor of type `T`.
///
/// Holds the actor's identity together with the sending halves of its
/// message mailbox and of its control-signal channel.
#[derive(Debug)]
pub struct ActorRef<T: Actor> {
/// Identity of the actor this handle addresses.
id: Identity,
/// Sending half of the actor's message mailbox.
sender: MailboxSender<T>,
/// Sending half of the channel that carries `ControlSignal`s (used by `kill`).
pub(crate) terminate_sender: mpsc::Sender<ControlSignal>,
/// Shared metrics collector; only present with the `metrics` feature.
#[cfg(feature = "metrics")]
pub(crate) metrics: Arc<MetricsCollector>,
}
impl<T: Actor> ActorRef<T> {
pub(crate) fn new(
id: Identity,
sender: MailboxSender<T>,
terminate_sender: mpsc::Sender<ControlSignal>,
#[cfg(feature = "metrics")] metrics: Arc<MetricsCollector>,
) -> Self {
ActorRef {
id,
sender,
terminate_sender,
#[cfg(feature = "metrics")]
metrics,
}
}
/// Returns a copy of the identity stored in this handle.
pub fn identity(&self) -> Identity {
self.id
}
/// Reports whether the actor still looks reachable through this handle.
///
/// Returns `true` only while neither the mailbox channel nor the terminate
/// channel reports itself as closed.
#[inline]
pub fn is_alive(&self) -> bool {
    !(self.sender.is_closed() || self.terminate_sender.is_closed())
}
/// Creates an [`ActorWeak`] that refers to the same actor as `this`.
///
/// Both channel senders are downgraded to their weak counterparts; the
/// identity is copied and, with the `metrics` feature, the collector `Arc`
/// is shared.
pub fn downgrade(this: &Self) -> ActorWeak<T> {
    let weak_mailbox = this.sender.downgrade();
    let weak_terminate = this.terminate_sender.downgrade();
    ActorWeak {
        id: this.id,
        sender: weak_mailbox,
        terminate_sender: weak_terminate,
        #[cfg(feature = "metrics")]
        metrics: Arc::clone(&this.metrics),
    }
}
/// Sends `msg` to the actor without requesting a reply.
///
/// The message is boxed into an envelope (with no reply channel) and pushed
/// onto the mailbox, awaiting until the channel accepts it.
///
/// # Errors
///
/// Returns [`Error::Send`] when the mailbox channel is closed; in that case
/// a dead letter with reason `ActorStopped` is recorded first.
#[cfg_attr(feature = "tracing", tracing::instrument(
    level = "debug",
    name = "actor_tell",
    fields(
        actor_id = %self.identity(),
        message_type = %std::any::type_name::<M>()
    ),
    skip(self, msg)
))]
pub async fn tell<M>(&self, msg: M) -> Result<()>
where
    M: Send + 'static,
    T: Message<M>,
{
    let envelope = MailboxMessage::Envelope {
        payload: Box::new(msg),
        reply_channel: None,
        actor_ref: self.clone(),
    };

    #[cfg(feature = "tracing")]
    debug!("Sending tell message (fire-and-forget)");

    // Reduce the send outcome to a flag so the rejected envelope is dropped
    // before the dead letter is recorded.
    let delivered = self.sender.send(envelope).await.is_ok();
    let result = if delivered {
        Ok(())
    } else {
        crate::dead_letter::record::<M>(
            self.identity(),
            crate::dead_letter::DeadLetterReason::ActorStopped,
            "tell",
        );
        Err(Error::Send {
            identity: self.identity(),
            details: "Mailbox channel closed".to_string(),
        })
    };

    #[cfg(feature = "tracing")]
    match &result {
        Err(e) => warn!(error = %e, "Failed to send tell message"),
        Ok(_) => debug!("Tell message sent successfully"),
    }

    result
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "debug",
name = "actor_tell_with_timeout",
fields(
actor_id = %self.identity(),
message_type = %std::any::type_name::<M>(),
timeout_ms = timeout.as_millis()
),
skip(self, msg)
))]
pub async fn tell_with_timeout<M>(&self, msg: M, timeout: Duration) -> Result<()>
where
T: Message<M>,
M: Send + 'static,
{
#[cfg(feature = "tracing")]
debug!(
timeout_ms = timeout.as_millis(),
"Sending tell message with timeout"
);
let result = tokio::time::timeout(timeout, self.tell(msg))
.await
.map_err(|_| {
crate::dead_letter::record::<M>(
self.identity(),
crate::dead_letter::DeadLetterReason::Timeout,
"tell",
);
Error::Timeout {
identity: self.identity(),
timeout,
operation: "tell".to_string(),
}
})?;
#[cfg(feature = "tracing")]
match &result {
Ok(_) => debug!("Tell with timeout completed successfully"),
Err(e) => warn!(error = %e, "Tell with timeout failed"),
}
result
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "debug",
name = "actor_ask",
fields(
actor_id = %self.identity(),
message_type = %std::any::type_name::<M>(),
reply_type = %std::any::type_name::<T::Reply>()
),
skip(self, msg)
))]
pub async fn ask<M>(&self, msg: M) -> Result<T::Reply>
where
T: Message<M>,
M: Send + 'static,
T::Reply: Send + 'static,
{
#[cfg(feature = "deadlock-detection")]
let _guard = {
let caller = crate::CURRENT_ACTOR.try_with(|id| *id).ok();
if let Some(caller) = caller {
let callee = self.identity();
let mut graph = crate::wait_for_graph().lock().unwrap();
if caller.id == callee.id || crate::has_path(&graph, callee.id, caller.id) {
let cycle = crate::format_cycle_path(&graph, caller, callee);
drop(graph);
panic!(
"Deadlock detected: ask cycle {cycle}\n\
This is a design error. Use `tell` to break the cycle, \
or restructure actor dependencies."
);
}
graph.insert(caller.id, callee);
Some(crate::WaitForGuard(caller.id))
} else {
None
}
};
let (reply_tx, reply_rx) = oneshot::channel();
let envelope = MailboxMessage::Envelope {
payload: Box::new(msg),
reply_channel: Some(reply_tx),
actor_ref: self.clone(), };
#[cfg(feature = "tracing")]
debug!("Sending ask message and waiting for reply");
if self.sender.send(envelope).await.is_err() {
crate::dead_letter::record::<M>(
self.identity(),
crate::dead_letter::DeadLetterReason::ActorStopped,
"ask",
);
#[cfg(feature = "tracing")]
warn!("Failed to send ask message: mailbox channel closed");
return Err(Error::Send {
identity: self.identity(),
details: "Mailbox channel closed".to_string(),
});
}
match reply_rx.await {
Ok(reply_any) => {
match reply_any.downcast::<T::Reply>() {
Ok(reply) => {
#[cfg(feature = "tracing")]
debug!("Ask reply received successfully");
Ok(*reply)
}
Err(_) => {
#[cfg(feature = "tracing")]
warn!(
expected_type = %std::any::type_name::<T::Reply>(),
"Ask reply type downcast failed"
);
Err(Error::Downcast {
identity: self.identity(),
expected_type: std::any::type_name::<T::Reply>().to_string(),
})
}
}
}
Err(_recv_err) => {
crate::dead_letter::record::<M>(
self.identity(),
crate::dead_letter::DeadLetterReason::ReplyDropped,
"ask",
);
#[cfg(feature = "tracing")]
warn!("Ask reply channel closed unexpectedly");
Err(Error::Receive {
identity: self.identity(),
details: "Reply channel closed unexpectedly".to_string(),
})
}
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "debug",
name = "actor_ask_with_timeout",
fields(
actor_id = %self.identity(),
message_type = %std::any::type_name::<M>(),
reply_type = %std::any::type_name::<T::Reply>(),
timeout_ms = timeout.as_millis()
),
skip(self, msg)
))]
pub async fn ask_with_timeout<M>(&self, msg: M, timeout: Duration) -> Result<T::Reply>
where
T: Message<M>,
M: Send + 'static,
T::Reply: Send + 'static,
{
#[cfg(feature = "tracing")]
debug!(
timeout_ms = timeout.as_millis(),
"Sending ask message with timeout"
);
let result = tokio::time::timeout(timeout, self.ask(msg))
.await
.map_err(|_| {
crate::dead_letter::record::<M>(
self.identity(),
crate::dead_letter::DeadLetterReason::Timeout,
"ask",
);
Error::Timeout {
identity: self.identity(),
timeout,
operation: "ask".to_string(),
}
})?;
#[cfg(feature = "tracing")]
match &result {
Ok(_) => debug!("Ask with timeout completed successfully"),
Err(e) => warn!(error = %e, "Ask with timeout failed"),
}
result
}
/// Requests immediate termination by pushing `ControlSignal::Terminate`
/// onto the control channel without waiting.
///
/// A full or closed control channel is only logged as a warning, so this
/// method currently always returns `Ok(())`.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "info", name = "actor_kill", skip(self))
)]
pub fn kill(&self) -> Result<()> {
    #[cfg(feature = "tracing")]
    info!(actor_id = %self.identity(), "Killing actor");

    let outcome = self.terminate_sender.try_send(ControlSignal::Terminate);
    match outcome {
        Ok(()) => {
            #[cfg(feature = "tracing")]
            info!("Kill signal sent successfully");
        }
        Err(mpsc::error::TrySendError::Full(_)) => {
            warn!("Failed to send Terminate to actor {}: terminate mailbox is full. Actor is likely already being terminated.", self.identity());
        }
        Err(mpsc::error::TrySendError::Closed(_)) => {
            warn!("Failed to send Terminate to actor {}: terminate mailbox closed. Actor might already be stopped.", self.identity());
        }
    }

    // Every outcome above is treated as success from the caller's view.
    Ok(())
}
/// Requests a graceful stop by enqueueing `MailboxMessage::StopGracefully`
/// on the regular mailbox.
///
/// A closed mailbox is only logged as a warning, so this method currently
/// always returns `Ok(())`.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "info", name = "actor_stop", skip(self))
)]
pub async fn stop(&self) -> Result<()> {
    let stop_request = MailboxMessage::StopGracefully(self.clone());

    if let Err(_undelivered) = self.sender.send(stop_request).await {
        warn!("Failed to send StopGracefully to actor {}: mailbox closed. Actor might already be stopped or stopping.", self.identity());
    } else {
        #[cfg(feature = "tracing")]
        info!(actor_id = %self.identity(), "Actor stop signal sent successfully");
    }

    Ok(())
}
/// Synchronous counterpart of `tell`.
///
/// With `Some(timeout)` the send is bounded by that duration; with `None`
/// the call blocks until the mailbox accepts the message or is closed.
///
/// NOTE(review): the `None` path uses tokio's `blocking_send`, which tokio
/// documents as panicking inside an asynchronous execution context —
/// confirm callers only use it from plain threads.
#[cfg_attr(feature = "tracing", tracing::instrument(
    level = "debug",
    name = "actor_blocking_tell",
    fields(
        actor_id = %self.identity(),
        message_type = %std::any::type_name::<M>(),
        timeout_ms = timeout.map(|t| t.as_millis())
    ),
    skip(self, msg)
))]
pub fn blocking_tell<M>(&self, msg: M, timeout: Option<Duration>) -> Result<()>
where
    M: Send + 'static,
    T: Message<M>,
{
    if let Some(timeout_duration) = timeout {
        self.blocking_tell_with_timeout_impl(msg, timeout_duration)
    } else {
        self.blocking_tell_no_timeout(msg)
    }
}
fn blocking_tell_no_timeout<M>(&self, msg: M) -> Result<()>
where
M: Send + 'static,
T: Message<M>,
{
let envelope = MailboxMessage::Envelope {
payload: Box::new(msg),
reply_channel: None, actor_ref: self.clone(), };
#[cfg(feature = "tracing")]
debug!("Sending blocking tell message (fire-and-forget)");
let result = self.sender.blocking_send(envelope).map_err(|_| {
crate::dead_letter::record::<M>(
self.identity(),
crate::dead_letter::DeadLetterReason::ActorStopped,
"blocking_tell",
);
Error::Send {
identity: self.identity(),
details: "Mailbox channel closed".to_string(),
}
});
#[cfg(feature = "tracing")]
match &result {
Ok(_) => debug!("Blocking tell message sent successfully"),
Err(e) => warn!(error = %e, "Failed to send blocking tell message"),
}
result
}
fn blocking_tell_with_timeout_impl<M>(&self, msg: M, timeout: Duration) -> Result<()>
where
M: Send + 'static,
T: Message<M>,
{
let self_clone = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build();
let result = match rt {
Ok(runtime) => runtime.block_on(async {
tokio::time::timeout(timeout, self_clone.tell(msg))
.await
.map_err(|_| {
crate::dead_letter::record::<M>(
self_clone.identity(),
crate::dead_letter::DeadLetterReason::Timeout,
"blocking_tell",
);
Error::Timeout {
identity: self_clone.identity(),
timeout,
operation: "blocking_tell".to_string(),
}
})?
}),
Err(e) => Err(Error::Send {
identity: self_clone.identity(),
details: format!("Failed to create runtime: {}", e),
}),
};
let _ = tx.send(result);
});
rx.recv().map_err(|_| Error::Send {
identity: self.identity(),
details: "Timeout thread terminated unexpectedly".to_string(),
})?
}
/// Synchronous counterpart of `ask`.
///
/// With `Some(timeout)` the round-trip is bounded by that duration; with
/// `None` the call blocks until a reply arrives or a channel is closed.
///
/// NOTE(review): the `None` path uses tokio's `blocking_send` and
/// `blocking_recv`, which tokio documents as panicking inside an
/// asynchronous execution context — confirm callers only use it from plain
/// threads.
#[cfg_attr(feature = "tracing", tracing::instrument(
    level = "debug",
    name = "actor_blocking_ask",
    fields(
        actor_id = %self.identity(),
        message_type = %std::any::type_name::<M>(),
        reply_type = %std::any::type_name::<T::Reply>(),
        timeout_ms = timeout.map(|t| t.as_millis())
    ),
    skip(self, msg)
))]
pub fn blocking_ask<M>(&self, msg: M, timeout: Option<Duration>) -> Result<T::Reply>
where
    T: Message<M>,
    M: Send + 'static,
    T::Reply: Send + 'static,
{
    if let Some(timeout_duration) = timeout {
        self.blocking_ask_with_timeout_impl(msg, timeout_duration)
    } else {
        self.blocking_ask_no_timeout(msg)
    }
}
fn blocking_ask_no_timeout<M>(&self, msg: M) -> Result<T::Reply>
where
T: Message<M>,
M: Send + 'static,
T::Reply: Send + 'static,
{
let (reply_tx, reply_rx) = oneshot::channel();
let envelope = MailboxMessage::Envelope {
payload: Box::new(msg),
reply_channel: Some(reply_tx),
actor_ref: self.clone(), };
#[cfg(feature = "tracing")]
debug!("Sending blocking ask message and waiting for reply");
self.sender.blocking_send(envelope).map_err(|_| {
crate::dead_letter::record::<M>(
self.identity(),
crate::dead_letter::DeadLetterReason::ActorStopped,
"blocking_ask",
);
#[cfg(feature = "tracing")]
warn!("Failed to send blocking ask message: mailbox channel closed");
Error::Send {
identity: self.identity(),
details: "Mailbox channel closed".to_string(),
}
})?;
match reply_rx.blocking_recv() {
Ok(reply_any) => {
match reply_any.downcast::<T::Reply>() {
Ok(reply) => {
#[cfg(feature = "tracing")]
debug!("Blocking ask reply received successfully");
Ok(*reply)
}
Err(_) => {
#[cfg(feature = "tracing")]
warn!(
expected_type = %std::any::type_name::<T::Reply>(),
"Blocking ask reply type downcast failed"
);
Err(Error::Downcast {
identity: self.identity(),
expected_type: std::any::type_name::<T::Reply>().to_string(),
})
}
}
}
Err(_recv_err) => {
crate::dead_letter::record::<M>(
self.identity(),
crate::dead_letter::DeadLetterReason::ReplyDropped,
"blocking_ask",
);
#[cfg(feature = "tracing")]
warn!("Blocking ask reply channel closed unexpectedly");
Err(Error::Receive {
identity: self.identity(),
details: "Reply channel closed unexpectedly".to_string(),
})
}
}
}
fn blocking_ask_with_timeout_impl<M>(&self, msg: M, timeout: Duration) -> Result<T::Reply>
where
T: Message<M>,
M: Send + 'static,
T::Reply: Send + 'static,
{
let self_clone = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build();
let result = match rt {
Ok(runtime) => runtime.block_on(async {
tokio::time::timeout(timeout, self_clone.ask(msg))
.await
.map_err(|_| {
crate::dead_letter::record::<M>(
self_clone.identity(),
crate::dead_letter::DeadLetterReason::Timeout,
"blocking_ask",
);
Error::Timeout {
identity: self_clone.identity(),
timeout,
operation: "blocking_ask".to_string(),
}
})?
}),
Err(e) => Err(Error::Send {
identity: self_clone.identity(),
details: format!("Failed to create runtime: {}", e),
}),
};
let _ = tx.send(result);
});
rx.recv().map_err(|_| Error::Send {
identity: self.identity(),
details: "Timeout thread terminated unexpectedly".to_string(),
})?
}
/// Deprecated alias for `blocking_tell`.
///
/// The `timeout` argument is accepted for signature compatibility but is
/// not forwarded: the call always delegates to `blocking_tell(msg, None)`.
#[deprecated(
since = "0.10.0",
note = "Use `blocking_tell` instead. Timeout parameter is ignored."
)]
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "debug",
name = "actor_tell_blocking",
fields(
actor_id = %self.identity(),
message_type = %std::any::type_name::<M>(),
timeout_ms = timeout.map(|t| t.as_millis())
),
skip(self, msg)
))]
pub fn tell_blocking<M>(&self, msg: M, timeout: Option<Duration>) -> Result<()>
where
T: Message<M>,
M: Send + 'static,
{
#[cfg(feature = "tracing")]
debug!("Executing deprecated tell_blocking, delegating to blocking_tell");
// Explicitly discard the timeout; outside the tracing attribute it is unused.
let _ = timeout;
self.blocking_tell(msg, None)
}
/// Deprecated alias for `blocking_ask`.
///
/// The `timeout` argument is accepted for signature compatibility but is
/// not forwarded: the call always delegates to `blocking_ask(msg, None)`.
#[deprecated(
since = "0.10.0",
note = "Use `blocking_ask` instead. Timeout parameter is ignored."
)]
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "debug",
name = "actor_ask_blocking",
fields(
actor_id = %self.identity(),
message_type = %std::any::type_name::<M>(),
reply_type = %std::any::type_name::<T::Reply>(),
timeout_ms = timeout.map(|t| t.as_millis())
),
skip(self, msg)
))]
pub fn ask_blocking<M>(&self, msg: M, timeout: Option<Duration>) -> Result<T::Reply>
where
T: Message<M>,
M: Send + 'static,
T::Reply: Send + 'static,
{
#[cfg(feature = "tracing")]
debug!("Executing deprecated ask_blocking, delegating to blocking_ask");
// Explicitly discard the timeout; outside the tracing attribute it is unused.
let _ = timeout;
self.blocking_ask(msg, None)
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "debug",
name = "actor_ask_join",
fields(
actor_id = %self.identity(),
message_type = %std::any::type_name::<M>(),
result_type = %std::any::type_name::<R>()
),
skip(self, msg)
))]
pub async fn ask_join<M, R>(&self, msg: M) -> crate::Result<R>
where
T: Message<M, Reply = tokio::task::JoinHandle<R>>,
M: Send + 'static,
R: Send + 'static,
{
#[cfg(feature = "tracing")]
tracing::debug!("Sending ask_join message and waiting for task completion");
let join_handle = self.ask(msg).await?;
#[cfg(feature = "tracing")]
tracing::debug!("Received JoinHandle, awaiting task completion");
let result = join_handle.await.map_err(|join_error| crate::Error::Join {
identity: self.identity(),
source: join_error,
})?;
#[cfg(feature = "tracing")]
tracing::debug!("Task completed successfully");
Ok(result)
}
/// Returns the snapshot produced by the shared collector's `snapshot()`.
///
/// Only available with the `metrics` feature.
#[cfg(feature = "metrics")]
pub fn metrics(&self) -> crate::MetricsSnapshot {
self.metrics.snapshot()
}
/// Returns the collector's `message_count()`.
///
/// NOTE(review): presumably the number of messages handled so far —
/// confirm against `MetricsCollector`.
#[cfg(feature = "metrics")]
#[inline]
pub fn message_count(&self) -> u64 {
self.metrics.message_count()
}
/// Returns the collector's `avg_processing_time()`.
///
/// NOTE(review): presumably the mean per-message handling time — confirm
/// against `MetricsCollector`.
#[cfg(feature = "metrics")]
#[inline]
pub fn avg_processing_time(&self) -> std::time::Duration {
self.metrics.avg_processing_time()
}
/// Returns the collector's `max_processing_time()`.
///
/// NOTE(review): presumably the longest single-message handling time —
/// confirm against `MetricsCollector`.
#[cfg(feature = "metrics")]
#[inline]
pub fn max_processing_time(&self) -> std::time::Duration {
self.metrics.max_processing_time()
}
/// Returns the collector's `error_count()`.
///
/// NOTE(review): which failures are counted is defined by
/// `MetricsCollector` — confirm there.
#[cfg(feature = "metrics")]
#[inline]
pub fn error_count(&self) -> u64 {
self.metrics.error_count()
}
/// Returns the collector's `uptime()`.
///
/// NOTE(review): the starting point of the measurement is defined by
/// `MetricsCollector` — confirm there.
#[cfg(feature = "metrics")]
#[inline]
pub fn uptime(&self) -> std::time::Duration {
self.metrics.uptime()
}
/// Returns the collector's `last_activity()`, which may be `None`.
///
/// NOTE(review): presumably `None` means no activity has been recorded
/// yet — confirm against `MetricsCollector`.
#[cfg(feature = "metrics")]
#[inline]
pub fn last_activity(&self) -> Option<std::time::SystemTime> {
self.metrics.last_activity()
}
/// Crate-internal borrow of the underlying collector (dereferenced from
/// the shared `Arc`).
#[cfg(feature = "metrics")]
#[inline]
pub(crate) fn metrics_collector(&self) -> &MetricsCollector {
&self.metrics
}
}
/// Cloning produces another strong handle to the same actor: the identity
/// is copied, both senders are cloned and, with the `metrics` feature, the
/// collector `Arc` is shared.
impl<T: Actor> Clone for ActorRef<T> {
    #[inline]
    fn clone(&self) -> Self {
        let mailbox = self.sender.clone();
        let terminate = self.terminate_sender.clone();
        Self {
            id: self.id,
            sender: mailbox,
            terminate_sender: terminate,
            #[cfg(feature = "metrics")]
            metrics: Arc::clone(&self.metrics),
        }
    }
}
/// Weak counterpart of `ActorRef`.
///
/// Holds weak senders for the mailbox and control channels; use `upgrade`
/// to try to obtain a strong `ActorRef` again.
#[derive(Debug)]
pub struct ActorWeak<T: Actor> {
/// Identity of the actor this handle refers to.
id: Identity,
/// Weak sender for the actor's message mailbox.
sender: tokio::sync::mpsc::WeakSender<MailboxMessage<T>>,
/// Weak sender for the actor's control-signal channel.
terminate_sender: tokio::sync::mpsc::WeakSender<ControlSignal>,
/// Shared metrics collector; only present with the `metrics` feature.
#[cfg(feature = "metrics")]
metrics: Arc<MetricsCollector>,
}
impl<T: Actor> ActorWeak<T> {
    /// Tries to turn this weak handle back into a strong [`ActorRef`].
    ///
    /// Returns `None` as soon as either weak sender can no longer be
    /// upgraded; the mailbox sender is tried first.
    #[inline]
    pub fn upgrade(&self) -> Option<ActorRef<T>> {
        self.sender.upgrade().and_then(|sender| {
            self.terminate_sender
                .upgrade()
                .map(|terminate_sender| ActorRef {
                    id: self.id,
                    sender,
                    terminate_sender,
                    #[cfg(feature = "metrics")]
                    metrics: Arc::clone(&self.metrics),
                })
        })
    }

    /// Returns a copy of the identity stored in this handle.
    pub fn identity(&self) -> Identity {
        self.id
    }

    /// Reports whether both channels still have at least one strong sender.
    #[inline]
    pub fn is_alive(&self) -> bool {
        !(self.sender.strong_count() == 0 || self.terminate_sender.strong_count() == 0)
    }
}
/// Cloning produces another weak handle to the same actor: the identity is
/// copied, both weak senders are cloned and, with the `metrics` feature,
/// the collector `Arc` is shared.
impl<T: Actor> Clone for ActorWeak<T> {
    #[inline]
    fn clone(&self) -> Self {
        let weak_mailbox = self.sender.clone();
        let weak_terminate = self.terminate_sender.clone();
        Self {
            id: self.id,
            sender: weak_mailbox,
            terminate_sender: weak_terminate,
            #[cfg(feature = "metrics")]
            metrics: Arc::clone(&self.metrics),
        }
    }
}