use std::fmt;
use serde::Serialize;
use tokio::sync::{oneshot, watch};
use tracing::error;
use crate::actor_state::ActorState;
use crate::command::Observe;
use crate::mailbox::Priority;
use crate::observation::ObservationType;
use crate::registry::ActorJoinHandle;
use crate::{Actor, ActorContext, ActorExitStatus, Command, Mailbox, Observation};
pub struct ActorHandle<A: Actor> {
actor_context: ActorContext<A>,
last_state: watch::Receiver<A::ObservableState>,
join_handle: ActorJoinHandle,
}
#[derive(Clone, Eq, PartialEq, Debug, Hash, Serialize)]
pub enum Health {
Healthy,
FailureOrUnhealthy,
Success,
}
#[derive(Clone, Debug)]
pub struct Healthz;
impl<A: Actor> fmt::Debug for ActorHandle<A> {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter
.debug_struct("ActorHandle")
.field("name", &self.actor_context.actor_instance_id())
.finish()
}
}
pub trait Supervisable {
fn name(&self) -> &str;
fn harvest_health(&self) -> Health;
}
impl<A: Actor> Supervisable for ActorHandle<A> {
fn name(&self) -> &str {
self.actor_context.actor_instance_id()
}
fn harvest_health(&self) -> Health {
let actor_state = self.state();
if actor_state == ActorState::Success {
Health::Success
} else if actor_state == ActorState::Failure {
error!(actor = self.name(), "actor-exit-without-success");
Health::FailureOrUnhealthy
} else if self
.actor_context
.progress()
.registered_activity_since_last_call()
{
Health::Healthy
} else {
error!(actor = self.name(), "actor-timeout");
Health::FailureOrUnhealthy
}
}
}
impl<A: Actor> ActorHandle<A> {
pub(crate) fn new(
last_state: watch::Receiver<A::ObservableState>,
join_handle: ActorJoinHandle,
actor_context: ActorContext<A>,
) -> Self {
ActorHandle {
actor_context,
last_state,
join_handle,
}
}
pub fn state(&self) -> ActorState {
self.actor_context.state()
}
pub async fn process_pending_and_observe(&self) -> Observation<A::ObservableState> {
self.observe_with_priority(Priority::Low).await
}
pub async fn observe(&self) -> Observation<A::ObservableState> {
self.observe_with_priority(Priority::High).await
}
async fn observe_with_priority(&self, priority: Priority) -> Observation<A::ObservableState> {
if !self.actor_context.state().is_exit() {
if let Ok(oneshot_rx) = self
.actor_context
.mailbox()
.send_message_with_priority(Observe, priority)
.await
{
return self.wait_for_observable_state_callback(oneshot_rx).await;
} else {
error!(
actor_id = self.actor_context.actor_instance_id(),
"Failed to send observe message"
);
}
}
let state = self.last_observation();
Observation {
obs_type: ObservationType::PostMortem,
state,
}
}
pub fn pause(&self) {
let _ = self
.actor_context
.mailbox()
.send_message_with_high_priority(Command::Pause);
}
pub fn resume(&self) {
let _ = self
.actor_context
.mailbox()
.send_message_with_high_priority(Command::Resume);
}
pub async fn kill(self) -> (ActorExitStatus, A::ObservableState) {
self.actor_context.kill_switch().kill();
let _ = self
.actor_context
.mailbox()
.send_message_with_high_priority(Command::Nudge);
self.join().await
}
pub async fn quit(self) -> (ActorExitStatus, A::ObservableState) {
let _ = self
.actor_context
.mailbox()
.send_message_with_high_priority(Command::Quit);
self.join().await
}
pub async fn join(self) -> (ActorExitStatus, A::ObservableState) {
let exit_status = self.join_handle.join().await;
let observation = self.last_state.borrow().clone();
(exit_status, observation)
}
pub fn last_observation(&self) -> A::ObservableState {
self.last_state.borrow().clone()
}
async fn wait_for_observable_state_callback(
&self,
rx: oneshot::Receiver<A::ObservableState>,
) -> Observation<A::ObservableState> {
let scheduler_client = &self.actor_context.spawn_ctx().scheduler_client;
let observable_state_or_timeout =
scheduler_client.timeout(crate::OBSERVE_TIMEOUT, rx).await;
match observable_state_or_timeout {
Ok(Ok(state)) => {
let obs_type = ObservationType::Alive;
Observation { obs_type, state }
}
Ok(Err(_)) => {
let state = self.last_observation();
let obs_type = ObservationType::PostMortem;
Observation { obs_type, state }
}
Err(_) => {
let state = self.last_observation();
let obs_type = if self.actor_context.state().is_exit() {
ObservationType::PostMortem
} else {
ObservationType::Timeout
};
Observation { obs_type, state }
}
}
}
pub fn mailbox(&self) -> &Mailbox<A> {
self.actor_context.mailbox()
}
}
#[cfg(test)]
mod tests {
use async_trait::async_trait;
use super::*;
use crate::{Handler, Universe};
#[derive(Default)]
struct PanickingActor {
count: usize,
}
impl Actor for PanickingActor {
type ObservableState = usize;
fn observable_state(&self) -> usize {
self.count
}
}
#[derive(Debug)]
struct Panic;
#[async_trait]
impl Handler<Panic> for PanickingActor {
type Reply = ();
async fn handle(
&mut self,
_message: Panic,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.count += 1;
panic!("Oops");
}
}
#[derive(Default)]
struct ExitActor {
count: usize,
}
impl Actor for ExitActor {
type ObservableState = usize;
fn observable_state(&self) -> usize {
self.count
}
}
#[derive(Debug)]
struct Exit;
#[async_trait]
impl Handler<Exit> for ExitActor {
type Reply = ();
async fn handle(
&mut self,
_msg: Exit,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.count += 1;
Err(ActorExitStatus::DownstreamClosed)
}
}
#[tokio::test]
async fn test_panic_in_actor() -> anyhow::Result<()> {
let universe = Universe::with_accelerated_time();
let (mailbox, handle) = universe.spawn_builder().spawn(PanickingActor::default());
mailbox.send_message(Panic).await?;
let (exit_status, count) = handle.join().await;
assert!(matches!(exit_status, ActorExitStatus::Panicked));
assert!(matches!(count, 1)); Ok(())
}
#[tokio::test]
async fn test_exit() -> anyhow::Result<()> {
let universe = Universe::with_accelerated_time();
let (mailbox, handle) = universe.spawn_builder().spawn(ExitActor::default());
mailbox.send_message(Exit).await?;
let (exit_status, count) = handle.join().await;
assert!(matches!(exit_status, ActorExitStatus::DownstreamClosed));
assert!(matches!(count, 1)); Ok(())
}
}