nexus-acto-rs 0.4.2

A Rust crate for Actors
Documentation
use std::any::Any;
use std::time::Duration;

use async_trait::async_trait;
use rand::Rng;

use crate::actor::actor::actor_inner_error::ActorInnerError;
use crate::actor::actor::pid::ExtendedPid;
use crate::actor::actor::restart_statistics::RestartStatistics;
use crate::actor::actor_system::ActorSystem;
use crate::actor::message::message_handle::MessageHandle;
use crate::actor::supervisor::directive::Directive;
use crate::actor::supervisor::supervisor_strategy::{log_failure, Supervisor, SupervisorHandle, SupervisorStrategy};

#[derive(Debug)]
pub struct ExponentialBackoffStrategy {
  backoff_window: Duration,
  initial_backoff: Option<Duration>,
}

impl ExponentialBackoffStrategy {
  pub fn new(backoff_window: Duration) -> Self {
    Self {
      backoff_window,
      initial_backoff: None,
    }
  }

  pub fn with_initial_backoff(mut self, initial_backoff: Duration) -> Self {
    self.initial_backoff = Some(initial_backoff);
    self
  }

  pub(crate) async fn set_failure_count(&self, rs: &mut RestartStatistics) {
    if rs.number_of_failures(self.backoff_window).await == 0 {
      rs.reset().await;
    }
    rs.fail().await;
  }
}

#[async_trait]
impl SupervisorStrategy for ExponentialBackoffStrategy {
  async fn handle_child_failure(
    &self,
    actor_system: &ActorSystem,
    supervisor: SupervisorHandle,
    child: ExtendedPid,
    mut rs: RestartStatistics,
    reason: ActorInnerError,
    _: MessageHandle,
  ) {
    self.set_failure_count(&mut rs).await;

    let backoff = rs.failure_count().await as u64 * self.initial_backoff.map(|v| v.as_nanos()).unwrap_or(0) as u64;
    let noise = rand::thread_rng().gen_range(0..500);
    let dur = Duration::from_nanos(backoff + noise);

    let actor_system = actor_system.clone();
    let supervisor = supervisor.clone();
    let child = child.clone();
    let reason = reason.clone();

    tokio::spawn(async move {
      tokio::time::sleep(dur).await;
      log_failure(&actor_system, &child, reason.clone(), Directive::Restart).await;
      supervisor.restart_children(&[child]).await;
    });
  }

  fn as_any(&self) -> &dyn Any {
    self
  }
}