nexus-acto-rs 0.4.2

A Rust crate for Actors
Documentation
use std::any::Any;

use async_trait::async_trait;

use crate::actor::actor::pid::ExtendedPid;
use crate::actor::actor_system::ActorSystem;
use crate::actor::context::SenderPart;
use crate::actor::log::P_LOG;
use crate::actor::message::dead_letter_response::DeadLetterResponse;
use crate::actor::message::ignore_dead_letter_logging::IgnoreDeadLetterLogging;
use crate::actor::message::message::Message;
use crate::actor::message::message_handle::MessageHandle;
use crate::actor::message::message_or_envelope::unwrap_envelope;
use crate::actor::message::system_message::SystemMessage;
use crate::actor::message::terminate_info::TerminateInfo;
use crate::actor::message::terminate_reason::TerminateReason;
use crate::actor::process::{Process, ProcessHandle};
use crate::actor::util::throttler::{Throttle, ThrottleCallback, Valve};
use crate::event_stream::event_handler::EventHandler;

/// Process that receives messages which could not be delivered.
///
/// Every user or system message handed to this process is republished on the
/// actor system's event stream wrapped in a [`DeadLetterEvent`].
#[derive(Debug, Clone)]
pub struct DeadLetterProcess {
  // Actor system whose event stream, process registry, root context and
  // configuration this process uses.
  actor_system: ActorSystem,
}

impl DeadLetterProcess {
  /// Creates the dead-letter process for `actor_system` and wires it into the system.
  ///
  /// Construction performs three registrations:
  ///
  /// 1. The process is added to the process registry under the id `"deadletter"`.
  /// 2. An event-stream subscriber answers every [`DeadLetterEvent`] that carries a
  ///    sender with a [`DeadLetterResponse`] and emits a throttled debug log entry
  ///    for the undelivered message.
  /// 3. A second event-stream subscriber replies to dead-lettered
  ///    `SystemMessage::Watch` requests by sending the watcher a
  ///    `SystemMessage::Terminate` with `TerminateReason::NotFound`.
  ///
  /// The throttle limits come from the actor system configuration fields
  /// `dead_letter_throttle_count` and `dead_letter_throttle_interval`.
  ///
  /// Returns the newly created process.
  pub async fn new(actor_system: ActorSystem) -> Self {
    let myself = Self { actor_system };

    // Read both throttle settings from a single configuration fetch; the inner
    // scope releases `config` before the next `.await`.
    let (dead_letter_throttle_count, dead_letter_throttle_interval) = {
      let config = myself.actor_system.get_config().await;
      (
        config.dead_letter_throttle_count.clone(),
        config.dead_letter_throttle_interval.clone(),
      )
    };

    // Callback handed to the throttle; it logs the count it is given.
    let func = ThrottleCallback::new(move |i: usize| async move {
      P_LOG
        .info(&format!("DeadLetterProcess: Throttling dead letters, count: {}", i))
        .await;
    });
    let throttle = Throttle::new(dead_letter_throttle_count, dead_letter_throttle_interval, func).await;

    let cloned_self = myself.clone();
    // The value returned by `add_process` is discarded.
    myself
      .actor_system
      .get_process_registry()
      .await
      .add_process(ProcessHandle::new(myself.clone()), "deadletter");

    // Subscriber 1: answer the sender and log the dead letter.
    myself
      .actor_system
      .get_event_stream()
      .await
      .subscribe(EventHandler::new(move |msg| {
        let cloned_msg = msg.clone();
        let cloned_self = cloned_self.clone();
        let cloned_throttle = throttle.clone();
        async move {
          if let Some(dead_letter) = cloned_msg.to_typed::<DeadLetterEvent>() {
            // Send back a response instead of leaving the sender without an answer.
            if let Some(sender) = &dead_letter.sender {
              cloned_self
                .actor_system
                .get_root_context()
                .await
                .send(sender.clone(), MessageHandle::new(DeadLetterResponse { target: None }))
                .await
            }

            // Bail out when a sender is set and developer supervision logging is
            // disabled (mirrors the protoactor-go dead-letter subscriber).
            if !cloned_self
              .actor_system
              .get_config()
              .await
              .developer_supervision_logging
              && dead_letter.sender.is_some()
            {
              return;
            }

            // Messages marked with `IgnoreDeadLetterLogging` are never logged.
            // The throttle is consulted only for messages that are candidates
            // for logging, so ignored messages do not count against it.
            let is_ignore_dead_letter = dead_letter
              .message_handle
              .to_typed::<IgnoreDeadLetterLogging>()
              .is_some();
            if !is_ignore_dead_letter && cloned_throttle.should_throttle() == Valve::Open {
              // Both sender and target may be absent; render them as "None"
              // rather than panicking on a missing value.
              let sender_text = dead_letter
                .sender
                .as_ref()
                .map(|v| v.to_string())
                .unwrap_or_else(|| "None".to_string());
              let target_text = dead_letter
                .pid
                .as_ref()
                .map(|v| v.to_string())
                .unwrap_or_else(|| "None".to_string());
              P_LOG
                .debug(&format!(
                  "DeadLetterProcess: Message from {} to {} was not delivered, message: {:?}",
                  sender_text, target_text, dead_letter.message_handle,
                ))
                .await
            }
          }
        }
      }))
      .await;

    // Subscriber 2: reply to Watch requests that ended up as dead letters.
    let cloned_self = myself.clone();
    myself
      .actor_system
      .get_event_stream()
      .await
      .subscribe(EventHandler::new(move |msg| {
        let cloned_msg = msg.clone();
        let cloned_self = cloned_self.clone();
        async move {
          if let Some(dle) = cloned_msg.to_typed::<DeadLetterEvent>() {
            if let Some(SystemMessage::Watch(watch)) = dle.message_handle.to_typed::<SystemMessage>() {
              // A Watch without a watcher has nobody to notify; skip it instead
              // of panicking inside the event handler.
              if let Some(pid) = watch.watcher.clone() {
                let actor_system = cloned_self.actor_system.clone();
                let e_pid = ExtendedPid::new(pid.clone(), actor_system.clone());
                // NOTE(review): `who` is the watcher's own pid here, whereas
                // protoactor-go reports the dead-letter target (the watchee) —
                // confirm which one receivers of Terminate expect.
                e_pid
                  .send_system_message(
                    actor_system,
                    MessageHandle::new(SystemMessage::Terminate(TerminateInfo {
                      who: Some(pid),
                      why: TerminateReason::NotFound,
                    })),
                  )
                  .await;
              }
            }
          }
        }
      }))
      .await;

    myself
  }
}

#[async_trait]
impl Process for DeadLetterProcess {
  /// Publishes an undeliverable user message on the event stream.
  ///
  /// The handle is split with `unwrap_envelope`; the resulting message and sender
  /// are stored in a [`DeadLetterEvent`] together with a clone of `pid`, if any.
  /// The original handle is written to the debug log after publishing.
  async fn send_user_message(&self, pid: Option<&ExtendedPid>, message_handle: MessageHandle) {
    // TODO: Metrics

    let (_, payload, origin) = unwrap_envelope(message_handle.clone());
    let event = DeadLetterEvent {
      pid: pid.cloned(),
      message_handle: payload,
      sender: origin,
    };

    let event_stream = self.actor_system.get_event_stream().await;
    event_stream.publish(MessageHandle::new(event)).await;

    tracing::debug!("DeadLetterProcess: send_user_message: msg = {:?}", message_handle);
  }

  /// Publishes an undeliverable system message on the event stream.
  ///
  /// The event carries a clone of `pid` and of the handle; no sender is recorded.
  /// The handle is written to the debug log after publishing.
  async fn send_system_message(&self, pid: &ExtendedPid, message_handle: MessageHandle) {
    let event = DeadLetterEvent {
      pid: Some(pid.clone()),
      message_handle: message_handle.clone(),
      sender: None,
    };

    let event_stream = self.actor_system.get_event_stream().await;
    event_stream.publish(MessageHandle::new(event)).await;

    tracing::debug!("DeadLetterProcess: send_system_message: msg = {:?}", message_handle);
  }

  /// Handles a stop request by routing `SystemMessage::Stop` through
  /// `send_system_message`, so it is published as a dead letter as well.
  async fn stop(&self, pid: &ExtendedPid) {
    let stop_message = MessageHandle::new(SystemMessage::Stop);
    self.send_system_message(pid, stop_message).await
  }

  /// No-op: this implementation keeps no state to update.
  fn set_dead(&self) {}

  /// Exposes the process as `Any` for downcasting.
  fn as_any(&self) -> &dyn std::any::Any {
    self
  }
}

/// Event published on the event stream for a message that could not be delivered.
#[derive(Debug, Clone)]
pub struct DeadLetterEvent {
  /// Pid the message was addressed to, if one was supplied.
  pub pid: Option<ExtendedPid>,
  /// The undelivered message.
  pub message_handle: MessageHandle,
  /// Sender of the message; `None` for system messages and for user messages
  /// whose envelope carried no sender.
  pub sender: Option<ExtendedPid>,
}

impl PartialEq for DeadLetterEvent {
  /// Two events are equal when their pid, message handle and sender all compare equal.
  fn eq(&self, other: &Self) -> bool {
    // Exhaustive destructuring: adding a field to the struct forces this
    // comparison to be revisited.
    let Self {
      pid,
      message_handle,
      sender,
    } = self;
    *pid == other.pid && *message_handle == other.message_handle && *sender == other.sender
  }
}

// Marker impl: `Eq` adds no methods; it declares that the `PartialEq`
// comparison for this type is a full equivalence relation.
// NOTE(review): this relies on the field types comparing reflexively — confirm.
impl Eq for DeadLetterEvent {}

impl Message for DeadLetterEvent {
  /// Compares this event with an arbitrary message.
  ///
  /// Returns `true` only when `other` downcasts to a `DeadLetterEvent` that is
  /// equal to `self`; any other message type yields `false`.
  fn eq_message(&self, other: &dyn Message) -> bool {
    other
      .as_any()
      .downcast_ref::<DeadLetterEvent>()
      .map_or(false, |event| self == event)
  }

  /// Exposes the event as `Any` for downcasting.
  fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
    self
  }
}