nexus-acto-rs 0.4.2

A Rust crate for Actors
Documentation
#[cfg(test)]
mod test {
  use async_trait::async_trait;
  use std::any::Any;
  use std::collections::VecDeque;
  use std::env;
  use std::sync::Arc;
  use std::time::Duration;
  use thiserror::Error;
  use tokio::sync::{Mutex, Notify};
  use tokio::time::Instant;
  use tracing_subscriber::EnvFilter;

  use crate::actor::actor::actor::Actor;
  use crate::actor::actor::actor_error::ActorError;
  use crate::actor::actor::actor_inner_error::ActorInnerError;
  use crate::actor::actor::pid::ExtendedPid;
  use crate::actor::actor::props::Props;
  use crate::actor::actor::receiver_middleware::ReceiverMiddleware;
  use crate::actor::actor::receiver_middleware_chain::ReceiverMiddlewareChain;
  use crate::actor::actor::restart_statistics::RestartStatistics;
  use crate::actor::actor_system::ActorSystem;
  use crate::actor::context::context_handle::ContextHandle;
  use crate::actor::context::receiver_context_handle::ReceiverContextHandle;
  use crate::actor::context::{MessagePart, SenderPart, SpawnerPart};
  use crate::actor::message::auto_receive_message::AutoReceiveMessage;
  use crate::actor::message::message::Message;
  use crate::actor::message::message_handle::MessageHandle;
  use crate::actor::supervisor::strategy_one_for_one::OneForOneStrategy;
  use crate::actor::supervisor::supervisor_strategy::{SupervisorHandle, SupervisorStrategy};
  use crate::actor::supervisor::supervisor_strategy_handle::SupervisorStrategyHandle;

  #[tokio::test]
  async fn test_actor_with_own_supervisor_can_handle_failure() {
    let _ = env::set_var("RUST_LOG", "debug");
    let _ = tracing_subscriber::fmt()
      .with_env_filter(EnvFilter::from_default_env())
      .try_init();

    let system = ActorSystem::new().await;
    let mut root = system.get_root_context().await;
    let notify = Arc::new(Notify::new());
    let cloned_notify = notify.clone();
    let props = Props::from_actor_producer(move |_| {
      let cloned_notify = cloned_notify.clone();
      async move {
        ActorWithSupervisor {
          notify: cloned_notify.clone(),
        }
      }
    })
    .await;
    let pid = root.spawn(props).await;
    tracing::info!("pid = {:?}", pid);
    notify.notified().await;
  }

  /// Drives a `FailingChildActor` supervised by a one-for-one strategy
  /// configured with `10` and a 10 second window, and checks the sequence of
  /// messages seen by a recording receiver middleware: ten failures each
  /// followed by `PreRestart`/`PostRestart`, then an eleventh failure followed
  /// by `PreStop`.
  #[tokio::test]
  async fn test_actor_stops_after_x_restarts() {
    env::set_var("RUST_LOG", "debug");
    let _ = tracing_subscriber::fmt()
      .with_env_filter(EnvFilter::from_default_env())
      .try_init();
    let observer = Observer::new();
    let system = ActorSystem::new().await;
    let mut root_context = system.get_root_context().await;

    // Middleware that records every incoming message in the observer before
    // handing it to the next receiver in the chain.
    let tap = observer.clone();
    let recording_middleware = ReceiverMiddleware::new(move |next| {
      let tap = tap.clone();
      ReceiverMiddlewareChain::new(move |ctx, moe| {
        let (next, tap) = (next.clone(), tap.clone());
        async move {
          tracing::debug!("ReceiverMiddleware: moe = {:?}", moe);
          let msg = moe.get_message_handle();
          tracing::debug!(">>>> msg = {:?}", msg);
          // A recording error short-circuits the chain.
          tap.receive(ctx.clone(), msg.clone()).await?;
          next.run(ctx, moe).await
        }
      })
    });

    let strategy = SupervisorStrategyHandle::new(OneForOneStrategy::new(10, Duration::from_secs(10)));
    let props = Props::from_actor_producer_with_opts(
      |_| async { FailingChildActor },
      [
        Props::with_receiver_middlewares([recording_middleware]),
        Props::with_supervisor_strategy(strategy),
      ],
    )
    .await;

    let child = root_context.spawn(props).await;
    let fail = MessageHandle::new(StringMessage("fail".to_string()));
    let wait = Duration::from_secs(10);

    // Consume the two start-up entries; their outcome is deliberately ignored.
    for lifecycle in [AutoReceiveMessage::PreStart, AutoReceiveMessage::PostStart] {
      let _ = observer.expect_message(MessageHandle::new(lifecycle), wait).await;
    }

    for i in 0..10 {
      tracing::debug!("Sending fail message: {}", i);
      root_context.send(child.clone(), fail.clone()).await;
      observer.expect_message(fail.clone(), wait).await.unwrap();
      for lifecycle in [AutoReceiveMessage::PreRestart, AutoReceiveMessage::PostRestart] {
        observer
          .expect_message(MessageHandle::new(lifecycle), wait)
          .await
          .unwrap();
      }
    }

    // One more failure: this time the observer must see `PreStop` instead of a restart.
    root_context.send(child, fail.clone()).await;
    observer.expect_message(fail.clone(), wait).await.unwrap();
    observer
      .expect_message(MessageHandle::new(AutoReceiveMessage::PreStop), wait)
      .await
      .unwrap();
  }

  /// Test actor that also serves as its own supervisor strategy: its `Actor`
  /// impl returns a clone of itself from `get_supervisor_strategy`.
  #[derive(Debug, Clone)]
  struct ActorWithSupervisor {
    /// Signalled via `notify_one` from `handle_child_failure`, so a test can
    /// wait for the failure to reach the strategy.
    notify: Arc<Notify>,
  }

  /// Stateless test actor whose `receive` returns
  /// `ActorError::ReceiveError` whenever the current message is a
  /// `StringMessage`, and `Ok(())` otherwise.
  #[derive(Debug, Clone)]
  struct FailingChildActor;

  /// Plain text message used by the tests in this module.
  ///
  /// Two messages are equal exactly when their wrapped strings are equal,
  /// which is what the derived `PartialEq` provides for a single-field tuple
  /// struct.
  #[derive(Debug, Clone, PartialEq, Eq)]
  struct StringMessage(String);

  impl Message for StringMessage {
    /// Returns `true` only when `other` is also a `StringMessage` carrying the
    /// same string; any other message type compares unequal.
    fn eq_message(&self, other: &dyn Message) -> bool {
      matches!(
        other.as_any().downcast_ref::<StringMessage>(),
        Some(candidate) if self.0 == candidate.0
      )
    }

    /// Exposes `self` as `Any` so that callers can downcast to the concrete type.
    fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
      self
    }
  }

  #[async_trait]
  impl Actor for ActorWithSupervisor {
    /// Spawns a `FailingChildActor` under this actor's context and immediately
    /// sends it a `StringMessage("fail")`.
    async fn post_start(&self, mut ctx: ContextHandle) -> Result<(), ActorError> {
      tracing::debug!("ActorWithSupervisor::post_start");
      let child_props = Props::from_actor_producer(|_| async { FailingChildActor }).await;
      let child_pid = ctx.spawn(child_props).await;
      let failure_trigger = MessageHandle::new(StringMessage("fail".to_string()));
      ctx.send(child_pid, failure_trigger).await;
      Ok(())
    }

    /// Logs the call and accepts every message.
    async fn receive(&mut self, _: ContextHandle) -> Result<(), ActorError> {
      tracing::debug!("ActorWithSupervisor::receive");
      Ok(())
    }

    /// Returns a clone of this actor wrapped as the supervisor strategy.
    async fn get_supervisor_strategy(&self) -> Option<SupervisorStrategyHandle> {
      let own_strategy = SupervisorStrategyHandle::new(self.clone());
      Some(own_strategy)
    }
  }

  #[async_trait]
  impl SupervisorStrategy for ActorWithSupervisor {
    /// Logs the failing child, its restart statistics and the message being
    /// handled, then signals `notify`. Nothing else is done with the child here.
    async fn handle_child_failure(
      &self,
      _: &ActorSystem,
      _: SupervisorHandle,
      child: ExtendedPid,
      rs: RestartStatistics,
      _: ActorInnerError,
      message_handle: MessageHandle,
    ) {
      let failure_signal = &self.notify;
      tracing::debug!(
        "ActorWithSupervisor::handle_failure: child = {}, rs = {}, message = {:?}",
        child,
        rs,
        message_handle
      );
      // Wake the task waiting on this `Notify`.
      failure_signal.notify_one();
    }

    /// Exposes `self` as `Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
      self
    }
  }

  #[async_trait]
  impl Actor for FailingChildActor {
    /// Logs the call; start-up always succeeds.
    async fn post_start(&self, _: ContextHandle) -> Result<(), ActorError> {
      tracing::debug!("FailingChildActor::post_start");
      Ok(())
    }

    /// Fails with `ActorError::ReceiveError` when the current message is a
    /// `StringMessage`; every other message is accepted.
    async fn receive(&mut self, ctx: ContextHandle) -> Result<(), ActorError> {
      let message_handle = ctx.get_message_handle().await;
      tracing::debug!("FailingChildActor::receive: msg = {:?}", message_handle);
      match message_handle.to_typed::<StringMessage>() {
        Some(StringMessage(msg)) => {
          tracing::debug!("FailingChildActor::receive: msg = {:?}", msg);
          Err(ActorError::ReceiveError(ActorInnerError::new("error")))
        }
        None => Ok(()),
      }
    }
  }
  /// Failure modes of `Observer::expect_message`.
  #[derive(Debug, Error)]
  enum TestError {
    /// No message was available in the observer's queue before the timeout elapsed.
    #[error("Timeout")]
    TimeoutError,
    /// A message was available but did not equal the expected one.
    #[error("NoMatch")]
    NoMatch,
  }

  /// Records messages so that tests can assert on the order in which they were seen.
  /// Clones share the same underlying queue through the `Arc`.
  #[derive(Debug, Clone)]
  struct Observer {
    /// Messages in arrival order: `receive` pushes to the back and
    /// `expect_message` pops from the front.
    received: Arc<Mutex<VecDeque<MessageHandle>>>,
  }

  impl Observer {
    /// Creates an observer with an empty message queue.
    fn new() -> Self {
      Observer {
        received: Arc::new(Mutex::new(VecDeque::new())),
      }
    }

    /// Appends `message_handle` to the queue. Always returns `Ok(())`.
    async fn receive(&self, _: ReceiverContextHandle, message_handle: MessageHandle) -> Result<(), ActorError> {
      tracing::debug!(">>> Observer::receive: message_handle = {:?}", message_handle);
      self.received.lock().await.push_back(message_handle);
      Ok(())
    }

    /// Waits for the next recorded message and compares it with `expected`.
    ///
    /// The queue is polled every 10 ms. The first message that becomes
    /// available is removed from the queue whether or not it matches.
    ///
    /// The queue is always inspected before the timeout is evaluated, so an
    /// already-recorded message is found even with a zero timeout, and a
    /// message that arrives during the final sleep is not missed.
    ///
    /// # Errors
    ///
    /// * `TestError::NoMatch` - the next message differs from `expected`
    ///   according to `eq_message`.
    /// * `TestError::TimeoutError` - no message became available within `timeout`.
    async fn expect_message(&self, expected: MessageHandle, timeout: Duration) -> Result<(), TestError> {
      let start = Instant::now();
      // Logged once per call; the literal `1` keeps the log line format unchanged.
      tracing::debug!("expect_message: expected = {:?}, counter = {}", expected, 1);
      loop {
        // Pop in its own statement so the lock guard is released before any sleep.
        let popped = self.received.lock().await.pop_front();
        if let Some(received) = popped {
          tracing::debug!("expected = {:?}, received = {:?}", expected, received);
          return if expected.eq_message(&received) {
            Ok(())
          } else {
            Err(TestError::NoMatch)
          };
        }
        // Only give up after the queue has been checked.
        if start.elapsed() > timeout {
          return Err(TestError::TimeoutError);
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
      }
    }
  }
}