nexus-acto-rs 0.4.2

A Rust crate for Actors
Documentation
use async_trait::async_trait;
use futures::future::BoxFuture;
use once_cell::sync::Lazy;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

use crate::actor::actor::actor_inner_error::ActorInnerError;
use crate::actor::actor::pid::ExtendedPid;
use crate::actor::actor::restart_statistics::RestartStatistics;
use crate::actor::actor_system::ActorSystem;
use crate::actor::message::message_handle::MessageHandle;
use crate::actor::supervisor::directive::Directive;
use crate::actor::supervisor::strategy_one_for_one::OneForOneStrategy;
use crate::actor::supervisor::strategy_restarting::RestartingStrategy;
use crate::actor::supervisor::supervision_event::SupervisorEvent;
use crate::actor::supervisor::supervisor_strategy_handle::SupervisorStrategyHandle;

/// Shared, cloneable handle to an asynchronous decision function that maps an
/// [`ActorInnerError`] to a [`Directive`].
///
/// The function is stored behind an `Arc`, so cloning a `Decider` produces
/// another handle to the same function rather than a copy of it.
#[derive(Clone)]
pub struct Decider(Arc<dyn Fn(ActorInnerError) -> BoxFuture<'static, Directive> + Send + Sync>);

impl Decider {
  pub fn new<F, Fut>(f: F) -> Self
  where
    F: Fn(ActorInnerError) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Directive> + Send + 'static, {
    Decider(Arc::new(move |error| Box::pin(f(error))))
  }

  pub async fn run(&self, reason: ActorInnerError) -> Directive {
    (self.0)(reason).await
  }
}

impl Debug for Decider {
  /// Writes the fixed placeholder `DeciderFunc`; the wrapped closure itself
  /// has no printable representation.
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.write_str("DeciderFunc")
  }
}

impl PartialEq for Decider {
  fn eq(&self, other: &Self) -> bool {
    Arc::ptr_eq(&self.0, &other.0)
  }
}

// Marker impl: the `PartialEq` impl for `Decider` compares `Arc` pointers, so equality is total.
impl Eq for Decider {}

impl std::hash::Hash for Decider {
  fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
    (self.0.as_ref() as *const dyn Fn(ActorInnerError) -> BoxFuture<'static, Directive>).hash(state);
  }
}

/// Policy that is handed a failed child and everything known about the
/// failure.
///
/// Implementations must be `Debug + Send + Sync`.
#[async_trait]
pub trait SupervisorStrategy: Debug + Send + Sync {
  /// Handles the failure of `child`.
  ///
  /// * `actor_system` - the actor system passed in by the caller.
  /// * `supervisor` - handle to the supervisor of the failed child.
  /// * `child` - pid of the failed child.
  /// * `rs` - restart statistics supplied for the child.
  /// * `reason` - the error that caused the failure.
  /// * `message_handle` - presumably the message being processed when the
  ///   failure occurred; confirm against callers.
  async fn handle_child_failure(
    &self,
    actor_system: &ActorSystem,
    supervisor: SupervisorHandle,
    child: ExtendedPid,
    rs: RestartStatistics,
    reason: ActorInnerError,
    message_handle: MessageHandle,
  );

  /// Returns `self` as `&dyn Any` (presumably so callers can downcast to the
  /// concrete strategy type).
  fn as_any(&self) -> &dyn std::any::Any;
}

/// Operations a supervisor exposes over its children.
///
/// Implementations must be `Debug + Send + Sync + 'static`.
#[async_trait]
pub trait Supervisor: Debug + Send + Sync + 'static {
  /// Returns the pids of this supervisor's children.
  async fn get_children(&self) -> Vec<ExtendedPid>;
  /// Escalates a failure, passing along its `reason` and the associated
  /// `message_handle`.
  async fn escalate_failure(&self, reason: ActorInnerError, message_handle: MessageHandle);
  /// Restarts the children identified by `pids`.
  async fn restart_children(&self, pids: &[ExtendedPid]);
  /// Stops the children identified by `pids`.
  async fn stop_children(&self, pids: &[ExtendedPid]);
  /// Resumes the children identified by `pids`.
  async fn resume_children(&self, pids: &[ExtendedPid]);
}

/// Cloneable, type-erased handle to a [`Supervisor`].
///
/// The supervisor is stored as `Arc<Mutex<dyn Supervisor>>`, so clones share
/// the same underlying supervisor and access to it goes through the mutex.
#[derive(Debug, Clone)]
pub struct SupervisorHandle(Arc<Mutex<dyn Supervisor>>);

impl SupervisorHandle {
  /// Wraps an already shared, mutex-protected supervisor without allocating
  /// a new `Arc`.
  pub fn new_arc(s: Arc<Mutex<dyn Supervisor>>) -> Self {
    Self(s)
  }

  /// Takes ownership of a concrete supervisor and places it behind a fresh
  /// `Arc<Mutex<_>>`.
  pub fn new(s: impl Supervisor + 'static) -> Self {
    Self::new_arc(Arc::new(Mutex::new(s)))
  }
}

impl PartialEq for SupervisorHandle {
  fn eq(&self, other: &Self) -> bool {
    Arc::ptr_eq(&self.0, &other.0)
  }
}

// Marker impl: the `PartialEq` impl for `SupervisorHandle` compares `Arc` pointers, so equality is total.
impl Eq for SupervisorHandle {}

impl std::hash::Hash for SupervisorHandle {
  fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
    (self.0.as_ref() as *const Mutex<dyn Supervisor>).hash(state);
  }
}

/// Forwards every [`Supervisor`] operation to the wrapped supervisor.
///
/// Each method acquires the handle's async mutex and keeps it locked until
/// the delegated call has finished.
#[async_trait]
impl Supervisor for SupervisorHandle {
  /// Locks the wrapped supervisor and returns its children.
  async fn get_children(&self) -> Vec<ExtendedPid> {
    let supervisor = self.0.lock().await;
    supervisor.get_children().await
  }

  /// Locks the wrapped supervisor and escalates the failure through it.
  async fn escalate_failure(&self, reason: ActorInnerError, message_handle: MessageHandle) {
    self.0.lock().await.escalate_failure(reason, message_handle).await;
  }

  /// Locks the wrapped supervisor and asks it to restart `pids`.
  async fn restart_children(&self, pids: &[ExtendedPid]) {
    self.0.lock().await.restart_children(pids).await;
  }

  /// Locks the wrapped supervisor and asks it to stop `pids`.
  async fn stop_children(&self, pids: &[ExtendedPid]) {
    self.0.lock().await.stop_children(pids).await;
  }

  /// Locks the wrapped supervisor and asks it to resume `pids`.
  async fn resume_children(&self, pids: &[ExtendedPid]) {
    self.0.lock().await.resume_children(pids).await;
  }
}

/// Publishes a [`SupervisorEvent`] describing a child failure on the actor
/// system's event stream.
///
/// The event carries a clone of `child` together with the failure `reason`
/// and the `directive` that was chosen for it. This function only publishes
/// the event; any actual logging is left to the stream's subscribers.
pub async fn log_failure(
  actor_system: &ActorSystem,
  child: &ExtendedPid,
  reason: ActorInnerError,
  directive: Directive,
) {
  let event_stream = actor_system.get_event_stream().await;
  let event = SupervisorEvent {
    child: child.clone(),
    reason,
    directive,
  };
  event_stream.publish(MessageHandle::new(event)).await;
}

/// Lazily initialised default supervision strategy: a [`OneForOneStrategy`]
/// built with `10` and a ten-second [`Duration`].
///
/// NOTE(review): presumably this means at most 10 restarts within a 10-second
/// window; confirm against `OneForOneStrategy::new`.
pub static DEFAULT_SUPERVISION_STRATEGY: Lazy<SupervisorStrategyHandle> = Lazy::new(|| {
  let one_for_one = OneForOneStrategy::new(10, Duration::from_secs(10));
  SupervisorStrategyHandle::new(one_for_one)
});

/// Lazily initialised supervision strategy backed by a default-constructed
/// [`RestartingStrategy`].
pub static RESTARTING_SUPERVISION_STRATEGY: Lazy<SupervisorStrategyHandle> = Lazy::new(|| {
  let restarting = RestartingStrategy::new();
  SupervisorStrategyHandle::new(restarting)
});