use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Weak;
use std::time::Duration;
use std::marker::PhantomData;
use super::actor_cell::{ChildEntry, SystemMsg};
use super::actor_ref::{ActorRef, UntypedActorRef};
use super::path::ActorPath;
use super::props::Props;
use super::sender::Sender;
use super::traits::Actor;
/// Coarse lifecycle phase recorded on a `Context`.
///
/// A context created by `Context::new` starts in `Starting`. The code that
/// moves a context to the other phases is not part of this section.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate
/// need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum LifecyclePhase {
/// Initial phase assigned by `Context::new`; selected by the `Starting` marker.
Starting,
/// Phase selected by the `Running` marker (`Context::running`).
Running,
/// Phase selected by the `Stopping` marker (`Context::stopping_view`).
Stopping,
}
/// Per-actor state handed to an actor of type `A`.
///
/// Holds the actor's own reference and path, its children, watch bookkeeping,
/// the message stash, and lifecycle flags. All fields are crate-visible; the
/// public surface is the set of methods on `impl Context`.
pub struct Context<A: Actor> {
/// Typed reference returned by `self_ref()`; its untyped form is what
/// `watch()` sends to the watched actor.
pub(crate) self_ref: ActorRef<A::Msg>,
/// Path returned by `path()`; child paths are derived from it in `spawn()`.
pub(crate) path: ActorPath,
/// Weak handle to the actor system internals. `spawn()` fails with
/// `SpawnError::SystemTerminated` when it can no longer be upgraded.
pub(crate) system: Weak<super::actor_system::ActorSystemInner>,
/// Children registered by `spawn()`, keyed by the name passed to it.
pub(crate) children: HashMap<String, ChildEntry>,
/// Paths of the actors this context is currently watching
/// (maintained by `watch()` / `unwatch()`).
pub(crate) watching: HashSet<ActorPath>,
/// Only initialised (empty) here; presumably the set of actors watching
/// this one, maintained by the actor cell — TODO confirm.
pub(crate) watched_by: HashSet<UntypedActorRef>,
/// Messages deferred via `stash()`, oldest at the front.
pub(crate) stash: VecDeque<A::Msg>,
/// Value last passed to `set_receive_timeout()`; `None` initially.
pub(crate) receive_timeout: Option<Duration>,
/// Value returned by `sender()`; starts as `Sender::None`. Presumably
/// updated by the actor cell for each delivered message — TODO confirm.
pub(crate) current_sender: Sender,
/// Set to `true` by `stop_self()`; starts as `false`.
pub(crate) stopping: bool,
/// Current lifecycle phase, returned by `phase()`; starts as `Starting`.
/// No method shown here changes it.
pub(crate) phase: LifecyclePhase,
}
impl<A: Actor> Context<A> {
    /// Builds the context for a newly created actor.
    ///
    /// All collections start empty, no receive timeout is set, the current
    /// sender is `Sender::None`, `stopping` is `false`, and the phase is
    /// `LifecyclePhase::Starting`.
    pub(crate) fn new(
        self_ref: ActorRef<A::Msg>,
        path: ActorPath,
        system: Weak<super::actor_system::ActorSystemInner>,
    ) -> Self {
        Self {
            self_ref,
            path,
            system,
            children: HashMap::new(),
            watching: HashSet::new(),
            watched_by: HashSet::new(),
            stash: VecDeque::new(),
            receive_timeout: None,
            current_sender: Sender::None,
            stopping: false,
            phase: LifecyclePhase::Starting,
        }
    }

    /// Returns the lifecycle phase currently recorded on this context.
    pub fn phase(&self) -> LifecyclePhase {
        self.phase
    }

    /// Returns the typed reference stored for this actor.
    pub fn self_ref(&self) -> &ActorRef<A::Msg> {
        &self.self_ref
    }

    /// Returns the path stored for this actor.
    pub fn path(&self) -> &ActorPath {
        &self.path
    }

    /// Spawns a child actor of type `B` under this context's path.
    ///
    /// The child's path is `self.path.child(name)`. After the cell has been
    /// created, the system's spawn observer (if one is installed) is notified
    /// with the child path, this context's path as parent, and the type name
    /// of `B`. The child is then registered in `children` under `name`.
    ///
    /// # Errors
    ///
    /// * `SpawnError::NameTaken` if a child is already registered under `name`.
    /// * `SpawnError::SystemTerminated` if the actor system handle can no
    ///   longer be upgraded.
    /// * Any error returned by `spawn_cell` is propagated unchanged.
    pub fn spawn<B: Actor>(
        &mut self,
        props: Props<B>,
        name: &str,
    ) -> Result<ActorRef<B::Msg>, SpawnError> {
        if self.children.contains_key(name) {
            return Err(SpawnError::NameTaken(name.to_owned()));
        }
        let system = self.system.upgrade().ok_or(SpawnError::SystemTerminated)?;
        let child_path = self.path.child(name);
        let child = super::actor_cell::spawn_cell::<B>(system.clone(), props, child_path.clone())?;
        // The observer is only told about children whose cell was created successfully.
        if let Some(observer) = system.spawn_observer.read().as_ref() {
            observer.on_spawn(&child_path, Some(&self.path), std::any::type_name::<B>());
        }
        self.children.insert(
            name.to_owned(),
            ChildEntry {
                path: child_path,
                untyped: child.as_untyped(),
                system_tx: child.system_sender(),
            },
        );
        Ok(child)
    }

    /// Sends `SystemMsg::Stop` to the child registered under `name`.
    ///
    /// Does nothing when no such child exists. The child entry is not removed
    /// here, and the result of the send is deliberately discarded.
    pub fn stop_child(&mut self, name: &str) {
        if let Some(child) = self.children.get(name) {
            let _ = child.system_tx.send(SystemMsg::Stop);
        }
    }

    /// Starts watching `target`.
    ///
    /// Records the target's path in `watching`; only when it was not already
    /// present is a `SystemMsg::Watch` carrying this actor's untyped reference
    /// sent to the target. The result of the send is discarded.
    pub fn watch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
        if self.watching.insert(target.path().clone()) {
            let _ = target
                .system_sender()
                .send(SystemMsg::Watch(self.self_ref.as_untyped()));
        }
    }

    /// Stops watching `target`.
    ///
    /// Removes the target's path from `watching`; only when it was present is
    /// a `SystemMsg::Unwatch` carrying this context's path sent to the target.
    /// The result of the send is discarded.
    pub fn unwatch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
        if self.watching.remove(target.path()) {
            let _ = target
                .system_sender()
                .send(SystemMsg::Unwatch(self.path.clone()));
        }
    }

    /// Appends `msg` to the back of the stash.
    pub fn stash(&mut self, msg: A::Msg) {
        self.stash.push_back(msg);
    }

    /// Empties the stash and returns its messages, oldest first.
    pub fn unstash_all(&mut self) -> Vec<A::Msg> {
        // `drain(..)` yields front-to-back, matching repeated `pop_front`.
        self.stash.drain(..).collect()
    }

    /// Sets the `stopping` flag on this context.
    ///
    /// This method sends no message and does not change `phase`; presumably
    /// the owning actor cell acts on the flag — TODO confirm in `actor_cell`.
    pub fn stop_self(&mut self) {
        self.stopping = true;
    }

    /// Stores the receive timeout; `None` clears it.
    pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
        self.receive_timeout = d;
    }

    /// Returns the sender currently recorded on this context.
    pub fn sender(&self) -> &Sender {
        &self.current_sender
    }

    /// Hidden alias of [`Context::sender`]; returns the same value.
    #[doc(hidden)]
    pub fn sender_typed(&self) -> &Sender {
        self.sender()
    }

    /// Returns a view tagged with phase marker `P` when the current phase
    /// equals `P::PHASE`, and `None` otherwise.
    pub fn phased<P: PhaseMarker>(&mut self) -> Option<TypedContext<'_, A, P>> {
        if P::PHASE != self.phase {
            return None;
        }
        Some(TypedContext {
            inner: self,
            _phase: PhantomData,
        })
    }

    /// Shorthand for `phased::<Starting>()`.
    pub fn starting(&mut self) -> Option<TypedContext<'_, A, Starting>> {
        self.phased::<Starting>()
    }

    /// Shorthand for `phased::<Running>()`.
    pub fn running(&mut self) -> Option<TypedContext<'_, A, Running>> {
        self.phased::<Running>()
    }

    /// Shorthand for `phased::<Stopping>()`.
    pub fn stopping_view(&mut self) -> Option<TypedContext<'_, A, Stopping>> {
        self.phased::<Stopping>()
    }
}
/// Type-level tag for a `LifecyclePhase`, used as the `P` parameter of
/// `TypedContext` and `Context::phased`.
///
/// The `sealed::Sealed` supertrait is declared in a private module, so the
/// trait cannot be implemented by code that cannot name that module.
pub trait PhaseMarker: sealed::Sealed {
/// Runtime phase this marker stands for; `Context::phased` compares it with
/// the context's current phase.
const PHASE: LifecyclePhase;
}
/// Marker type selecting `LifecyclePhase::Starting`.
///
/// Zero-sized; exists only as a type parameter. The common traits are derived
/// so the marker can be printed, copied, compared and used as a key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct Starting;

/// Marker type selecting `LifecyclePhase::Running`.
///
/// Zero-sized; exists only as a type parameter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct Running;

/// Marker type selecting `LifecyclePhase::Stopping`.
///
/// Zero-sized; exists only as a type parameter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct Stopping;
// Private module holding the supertrait of `PhaseMarker`. Because the module
// is not `pub`, code that cannot name `sealed::Sealed` cannot add new
// `PhaseMarker` implementations; only the three marker types below have one.
mod sealed {
/// Supertrait required by `PhaseMarker`; carries no items of its own.
pub trait Sealed {}
impl Sealed for super::Starting {}
impl Sealed for super::Running {}
impl Sealed for super::Stopping {}
}
/// Ties the `Starting` marker to `LifecyclePhase::Starting`.
impl PhaseMarker for Starting {
const PHASE: LifecyclePhase = LifecyclePhase::Starting;
}
/// Ties the `Running` marker to `LifecyclePhase::Running`.
impl PhaseMarker for Running {
const PHASE: LifecyclePhase = LifecyclePhase::Running;
}
/// Ties the `Stopping` marker to `LifecyclePhase::Stopping`.
impl PhaseMarker for Stopping {
const PHASE: LifecyclePhase = LifecyclePhase::Stopping;
}
/// Mutable view of a `Context<A>` tagged with a phase marker `P`.
///
/// Values are created by `Context::phased` (and its `starting` / `running` /
/// `stopping_view` shorthands), which only return one when the context's
/// phase equals `P::PHASE` at the time of the call.
pub struct TypedContext<'a, A: Actor, P: PhaseMarker> {
/// Exclusive borrow of the underlying context for the lifetime of the view.
inner: &'a mut Context<A>,
/// Carries the phase marker type; stores no data.
_phase: PhantomData<P>,
}
impl<A: Actor, P: PhaseMarker> TypedContext<'_, A, P> {
    /// Shared access to the wrapped context.
    pub fn ctx(&self) -> &Context<A> {
        self.inner
    }

    /// Exclusive access to the wrapped context.
    pub fn ctx_mut(&mut self) -> &mut Context<A> {
        self.inner
    }

    /// Typed reference of the actor owning the wrapped context.
    pub fn self_ref(&self) -> &ActorRef<A::Msg> {
        self.inner.self_ref()
    }
}
impl<A: Actor> TypedContext<'_, A, Running> {
    /// Forwards to `Context::set_receive_timeout`; `None` clears the timeout.
    pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
        self.ctx_mut().set_receive_timeout(d);
    }

    /// Forwards to `Context::unstash_all`, returning the stashed messages.
    pub fn unstash_all(&mut self) -> Vec<A::Msg> {
        self.ctx_mut().unstash_all()
    }

    /// Forwards to `Context::stop_self`.
    pub fn stop_self(&mut self) {
        self.ctx_mut().stop_self();
    }
}
/// Errors returned by `Context::spawn`.
#[derive(Debug, thiserror::Error)]
pub enum SpawnError {
/// A child is already registered under the requested name; carries that name.
#[error("child name `{0}` already taken")]
NameTaken(String),
/// The weak handle to the actor system could not be upgraded.
#[error("actor system has terminated")]
SystemTerminated,
}