use tracing::{debug, warn};
use crate::actor::{Actor, ActorContext, ActorId, ActorState, Stopping};
use crate::address::{Address, Mailbox, Recipient, SenderInfo};
use crate::channel::mpsc;
use crate::envelope::EnvelopeProxy;
use crate::supervisor::SupervisionEvent;
pub const DEFAULT_MAILBOX_CAPACITY: usize = 8;
#[derive(Debug)]
pub struct Context<A>
where
A: Actor<Context = Self>,
{
label: String,
state: ActorState,
doorplate: Address<A>,
mailbox: Option<Mailbox<A>>,
drain_mailbox: bool,
error: Option<A::Error>, supervisor: Option<Recipient<SupervisionEvent<A>>>,
}
impl<A> Context<A>
where
A: Actor<Context = Self>,
{
pub fn with_capacity(label: String, capacity: usize) -> Self {
let (tx, rx) = mpsc::channel(capacity);
Self {
label,
state: ActorState::Unstarted,
doorplate: Address::new(tx),
mailbox: Some(Mailbox::new(rx)),
drain_mailbox: false,
supervisor: None,
error: None,
}
}
pub fn save_error(&mut self, error: A::Error) {
self.error = Some(error);
}
pub fn drain_mailbox(&mut self) {
self.drain_mailbox = true;
}
fn take_error(&mut self) -> Result<(), A::Error> {
match self.error.take() {
Some(e) => Err(e),
None => Ok(()),
}
}
}
impl<A> ActorContext<A> for Context<A>
where
A: Actor<Context = Self>,
{
fn new(label: String) -> Self {
Self::with_capacity(label, DEFAULT_MAILBOX_CAPACITY)
}
fn index(&self) -> ActorId {
self.doorplate.index()
}
fn label(&self) -> &str {
self.label.as_str()
}
fn address(&self) -> Address<A> {
self.doorplate.clone()
}
fn take_mailbox(&mut self) -> Option<Mailbox<A>> {
self.mailbox.take()
}
fn state(&self) -> ActorState {
self.state
}
fn set_state(&mut self, state: ActorState) {
self.state = state;
self.try_notify_supervisor(SupervisionEvent::State(self.address(), state));
}
async fn run_loop(&mut self, actor: &mut A, mailbox: &mut Mailbox<A>) -> Result<(), A::Error> {
while self.state() == ActorState::Running {
if self.drain_mailbox {
let count = mailbox.len();
for _ in 0..count {
let _ = mailbox.try_recv();
}
self.drain_mailbox = false;
}
match mailbox.recv().await {
Ok(mut envelope) => {
envelope.handle(actor, self).await;
if self.error.is_some() && self.state() == ActorState::Running {
self.set_state(ActorState::Stopping);
}
}
Err(_) => {
warn!("Mailbox is dropped, terminate the actor");
self.set_state(ActorState::Stopped);
}
};
match self.state() {
ActorState::Stopping => {
let result = self.take_error();
match actor.stopping(self).await {
Ok(Stopping::Stop) => return result,
Ok(Stopping::Continue) => {
if let Err(e) = result {
self.try_notify_supervisor(SupervisionEvent::Warn(
self.address(),
e,
))
};
self.set_state(ActorState::Running);
}
Err(e) => return result.or(Err(e)),
}
}
ActorState::Stopped => {
return self.take_error();
}
_ => {}
}
}
Ok(())
}
fn supervisor(&self) -> Option<&Recipient<SupervisionEvent<A>>> {
self.supervisor.as_ref()
}
fn set_supervisor(&mut self, supervisor: Option<Recipient<SupervisionEvent<A>>>) {
match supervisor {
Some(supervisor) => {
if supervisor.index() == self.index() {
warn!("Could not set the actor itself as its supervisor");
return;
}
debug!("Set actor {} as supervisor", supervisor.index());
self.supervisor = Some(supervisor);
}
None => {
if self.supervisor.take().is_some() {
debug!("Unset supervisor");
}
}
}
}
}