use crate::Pid;
use crate::mailbox::Mailbox;
use crate::message::{Envelope, ExitReason, Message, Signal};
use async_trait::async_trait;
/// Behaviour contract implemented by every actor.
///
/// Only [`Actor::handle_message`] is mandatory; the other hooks ship with
/// default bodies that implementors may override.
#[async_trait]
pub trait Actor: Send + 'static {
    /// Start-up hook; the default body does nothing.
    ///
    /// NOTE(review): presumably invoked by the runtime before the first
    /// message is delivered — the call site is not in this file.
    async fn started(&mut self, _ctx: &mut ActorContext) {}

    /// Processes a single user-level [`Message`].
    async fn handle_message(&mut self, msg: Message, ctx: &mut ActorContext);

    /// Reacts to a system-level [`Signal`].
    ///
    /// Default behaviour per variant:
    /// - `Stop`: requests a stop with [`ExitReason::Normal`].
    /// - `Kill`: requests a stop with [`ExitReason::Killed`].
    /// - `Down`: only emits a debug log line.
    /// - `Exit`: ignored when the context is trapping exits or the reason is
    ///   normal; otherwise a warning is logged and a stop is requested with
    ///   the received reason.
    async fn handle_signal(&mut self, signal: Signal, ctx: &mut ActorContext) {
        match signal {
            Signal::Stop => ctx.stop(ExitReason::Normal),
            Signal::Kill => ctx.stop(ExitReason::Killed),
            Signal::Down { pid, reason, .. } => {
                tracing::debug!("Actor {} received DOWN for {}: {}", ctx.pid(), pid, reason);
            }
            Signal::Exit { from, reason } => {
                // An EXIT is tolerated when exits are trapped or the peer
                // finished normally; anything else takes this actor down too.
                let tolerated = ctx.is_trapping_exits() || reason.is_normal();
                if !tolerated {
                    tracing::warn!(
                        "Actor {} received EXIT from {}: {}",
                        ctx.pid(),
                        from,
                        reason
                    );
                    ctx.stop(reason);
                }
            }
        }
    }

    /// Shutdown hook receiving the exit reason; the default body does nothing.
    ///
    /// NOTE(review): presumably invoked by the runtime once the actor has
    /// stopped — the call site is not in this file.
    async fn stopped(&mut self, _reason: &ExitReason, _ctx: &mut ActorContext) {}
}
/// Per-actor state handed to every [`Actor`] callback.
///
/// Bundles the actor's identity, its mailbox, the exit-trapping flag, the
/// pending stop request and an optional weak handle to the actor system.
pub struct ActorContext {
/// Identifier of the actor owning this context; returned by `pid()`.
pid: Pid,
/// Mailbox that `recv`, `receive`, `receive_timeout` and `try_receive` read from.
mailbox: Mailbox,
/// Whether exits are trapped; starts as `false` and is toggled via `trap_exit()`.
trap_exit: bool,
/// Set to `true` by `stop()`; starts as `false`.
should_stop: bool,
/// Reason recorded by the most recent `stop()` call; `None` until then.
stop_reason: Option<ExitReason>,
/// Weak handle to the actor system; `None` until `set_system()` is called.
/// `send`, `whereis` and `send_after` need it to be present and upgradable.
system: Option<std::sync::Weak<crate::ActorSystem>>,
}
impl ActorContext {
    /// Builds a context for `pid` that reads from `mailbox`.
    ///
    /// The new context does not trap exits, has no stop requested and is not
    /// yet attached to an actor system.
    pub(crate) fn new(pid: Pid, mailbox: Mailbox) -> Self {
        Self {
            pid,
            mailbox,
            system: None,
            stop_reason: None,
            should_stop: false,
            trap_exit: false,
        }
    }

    /// Attaches the (weakly referenced) actor system used by `send`,
    /// `whereis` and `send_after`, replacing any previous handle.
    pub(crate) fn set_system(&mut self, system: std::sync::Weak<crate::ActorSystem>) {
        self.system = Some(system);
    }

    /// Returns the pid this context was created with.
    pub fn pid(&self) -> Pid {
        self.pid
    }

    /// Reports whether exit trapping is currently enabled.
    pub fn is_trapping_exits(&self) -> bool {
        self.trap_exit
    }

    /// Enables (`true`) or disables (`false`) exit trapping.
    ///
    /// While enabled, the default [`Actor::handle_signal`] ignores EXIT
    /// signals instead of stopping the actor.
    pub fn trap_exit(&mut self, trap: bool) {
        self.trap_exit = trap;
    }

    /// Requests that the actor stop, recording `reason`.
    ///
    /// A later call overwrites the reason stored by an earlier one.
    pub fn stop(&mut self, reason: ExitReason) {
        self.stop_reason = Some(reason);
        self.should_stop = true;
    }

    /// Returns `true` once `stop` has been called.
    pub(crate) fn should_stop(&self) -> bool {
        self.should_stop
    }

    /// Returns the reason passed to the latest `stop` call, if any.
    pub(crate) fn stop_reason(&self) -> Option<&ExitReason> {
        self.stop_reason.as_ref()
    }

    /// Awaits the next envelope from the mailbox (delegates to `Mailbox::recv`).
    pub(crate) async fn recv(&mut self) -> Option<Envelope> {
        self.mailbox.recv().await
    }

    /// Closes the underlying mailbox (delegates to `Mailbox::close`).
    #[allow(dead_code)]
    pub(crate) fn close_mailbox(&mut self) {
        self.mailbox.close();
    }

    /// Sends `msg` to the actor identified by `to` through the actor system.
    ///
    /// # Errors
    ///
    /// Returns `ActorError::SendFailed(to)` when no system is attached or the
    /// system has already been dropped; otherwise forwards whatever the
    /// system's own `send` returns.
    pub async fn send(&self, to: Pid, msg: Message) -> crate::Result<()> {
        let Some(system) = self.system.as_ref().and_then(|weak| weak.upgrade()) else {
            return Err(crate::ActorError::SendFailed(to));
        };
        system.send(to, msg).await
    }

    /// Looks up `name` via the actor system's `whereis`.
    ///
    /// Returns `None` when no system is attached, when it has been dropped,
    /// or when the system itself returns `None`.
    pub fn whereis(&self, name: &str) -> Option<Pid> {
        let system = self.system.as_ref()?.upgrade()?;
        system.whereis(name)
    }

    /// Asks the actor system to deliver `msg` to `dest` after `duration`.
    ///
    /// Returns the timer reference produced by the system, or `None` when no
    /// system is attached or it has been dropped.
    pub fn send_after(
        &self,
        dest: crate::scheduler::Destination,
        msg: Message,
        duration: std::time::Duration,
    ) -> Option<crate::scheduler::TimerRef> {
        let system = self.system.as_ref()?.upgrade()?;
        Some(system.send_after(dest, msg, duration))
    }

    /// Receives via `Mailbox::recv_matching` with `predicate` and no timeout.
    ///
    /// NOTE(review): the exact matching and `None` semantics are defined by
    /// the mailbox, which is not visible here.
    pub async fn receive<F, T>(&mut self, predicate: F) -> Option<T>
    where
        F: FnMut(&Message) -> Option<T>,
    {
        let no_timeout = None;
        self.mailbox.recv_matching(predicate, no_timeout).await
    }

    /// Same as [`ActorContext::receive`], but passes `timeout` to
    /// `Mailbox::recv_matching`.
    pub async fn receive_timeout<F, T>(
        &mut self,
        predicate: F,
        timeout: std::time::Duration,
    ) -> Option<T>
    where
        F: FnMut(&Message) -> Option<T>,
    {
        let limit = Some(timeout);
        self.mailbox.recv_matching(predicate, limit).await
    }

    /// Synchronous counterpart that delegates to `Mailbox::try_recv_matching`.
    pub fn try_receive<F, T>(&mut self, predicate: F) -> Option<T>
    where
        F: FnMut(&Message) -> Option<T>,
    {
        self.mailbox.try_recv_matching(predicate)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailbox::{DEFAULT_MAILBOX_CAPACITY, Mailbox};

    /// Minimal actor that only counts the messages it is handed.
    struct TestActor {
        messages_received: usize,
    }

    #[async_trait]
    impl Actor for TestActor {
        async fn handle_message(&mut self, _msg: Message, _ctx: &mut ActorContext) {
            self.messages_received += 1;
        }
    }

    /// A `TestActor` that has not seen any message yet.
    fn idle_actor() -> TestActor {
        TestActor {
            messages_received: 0,
        }
    }

    /// An abnormal EXIT signal originating from a freshly created pid.
    fn panic_exit() -> Signal {
        Signal::Exit {
            from: Pid::new(),
            reason: ExitReason::Panic("error".to_string()),
        }
    }

    /// The context reports the pid it was constructed with.
    #[tokio::test]
    async fn test_actor_context_pid() {
        let expected = Pid::new();
        let (mailbox, _sender) = Mailbox::new(DEFAULT_MAILBOX_CAPACITY);
        let ctx = ActorContext::new(expected, mailbox);

        assert_eq!(ctx.pid(), expected);
    }

    /// Exit trapping is off by default and can be switched on.
    #[tokio::test]
    async fn test_actor_context_trap_exit() {
        let (mailbox, _sender) = Mailbox::new(DEFAULT_MAILBOX_CAPACITY);
        let mut ctx = ActorContext::new(Pid::new(), mailbox);

        assert!(!ctx.is_trapping_exits());
        ctx.trap_exit(true);
        assert!(ctx.is_trapping_exits());
    }

    /// `stop` raises the stop flag and records the reason.
    #[tokio::test]
    async fn test_actor_context_stop() {
        let (mailbox, _sender) = Mailbox::new(DEFAULT_MAILBOX_CAPACITY);
        let mut ctx = ActorContext::new(Pid::new(), mailbox);

        assert!(!ctx.should_stop());
        ctx.stop(ExitReason::Normal);
        assert!(ctx.should_stop());
        assert_eq!(ctx.stop_reason(), Some(&ExitReason::Normal));
    }

    /// An abnormal EXIT stops an actor that is not trapping exits.
    #[tokio::test]
    async fn test_actor_handle_exit_signal() {
        let (mailbox, _sender) = Mailbox::new(DEFAULT_MAILBOX_CAPACITY);
        let mut ctx = ActorContext::new(Pid::new(), mailbox);
        let mut actor = idle_actor();

        actor.handle_signal(panic_exit(), &mut ctx).await;

        assert!(ctx.should_stop());
    }

    /// An abnormal EXIT is ignored while exits are being trapped.
    #[tokio::test]
    async fn test_actor_trap_exit_signal() {
        let (mailbox, _sender) = Mailbox::new(DEFAULT_MAILBOX_CAPACITY);
        let mut ctx = ActorContext::new(Pid::new(), mailbox);
        ctx.trap_exit(true);
        let mut actor = idle_actor();

        actor.handle_signal(panic_exit(), &mut ctx).await;

        assert!(!ctx.should_stop());
    }

    /// `Signal::Stop` requests a stop with the normal exit reason.
    #[tokio::test]
    async fn test_actor_stop_signal() {
        let (mailbox, _sender) = Mailbox::new(DEFAULT_MAILBOX_CAPACITY);
        let mut ctx = ActorContext::new(Pid::new(), mailbox);
        let mut actor = idle_actor();

        actor.handle_signal(Signal::Stop, &mut ctx).await;

        assert!(ctx.should_stop());
        assert_eq!(ctx.stop_reason(), Some(&ExitReason::Normal));
    }

    /// `Signal::Kill` requests a stop with the killed exit reason.
    #[tokio::test]
    async fn test_actor_kill_signal() {
        let (mailbox, _sender) = Mailbox::new(DEFAULT_MAILBOX_CAPACITY);
        let mut ctx = ActorContext::new(Pid::new(), mailbox);
        let mut actor = idle_actor();

        actor.handle_signal(Signal::Kill, &mut ctx).await;

        assert!(ctx.should_stop());
        assert_eq!(ctx.stop_reason(), Some(&ExitReason::Killed));
    }
}