atomr_core/actor/
context.rs1use std::collections::{HashMap, HashSet, VecDeque};
5use std::sync::Weak;
6use std::time::Duration;
7
8use std::marker::PhantomData;
9
10use super::actor_cell::{ChildEntry, SystemMsg};
11use super::actor_ref::{ActorRef, UntypedActorRef};
12use super::path::ActorPath;
13use super::props::Props;
14use super::sender::Sender;
15use super::traits::Actor;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22#[non_exhaustive]
23pub enum LifecyclePhase {
24 Starting,
25 Running,
26 Stopping,
27}
28
29pub struct Context<A: Actor> {
31 pub(crate) self_ref: ActorRef<A::Msg>,
32 pub(crate) path: ActorPath,
33 pub(crate) system: Weak<super::actor_system::ActorSystemInner>,
34 pub(crate) children: HashMap<String, ChildEntry>,
35 pub(crate) watching: HashSet<ActorPath>,
36 pub(crate) watched_by: HashSet<UntypedActorRef>,
37 pub(crate) stash: VecDeque<A::Msg>,
38 pub(crate) receive_timeout: Option<Duration>,
39 pub(crate) current_sender: Sender,
40 pub(crate) stopping: bool,
41 pub(crate) phase: LifecyclePhase,
42}
43
44impl<A: Actor> Context<A> {
45 pub(crate) fn new(
46 self_ref: ActorRef<A::Msg>,
47 path: ActorPath,
48 system: Weak<super::actor_system::ActorSystemInner>,
49 ) -> Self {
50 Self {
51 self_ref,
52 path,
53 system,
54 children: HashMap::new(),
55 watching: HashSet::new(),
56 watched_by: HashSet::new(),
57 stash: VecDeque::new(),
58 receive_timeout: None,
59 current_sender: Sender::None,
60 stopping: false,
61 phase: LifecyclePhase::Starting,
62 }
63 }
64
65 pub fn phase(&self) -> LifecyclePhase {
70 self.phase
71 }
72
73 pub fn self_ref(&self) -> &ActorRef<A::Msg> {
74 &self.self_ref
75 }
76
77 pub fn path(&self) -> &ActorPath {
78 &self.path
79 }
80
81 pub fn spawn<B: Actor>(&mut self, props: Props<B>, name: &str) -> Result<ActorRef<B::Msg>, SpawnError> {
83 if self.children.contains_key(name) {
84 return Err(SpawnError::NameTaken(name.into()));
85 }
86 let system = self.system.upgrade().ok_or(SpawnError::SystemTerminated)?;
87 let child_path = self.path.child(name);
88 let r = super::actor_cell::spawn_cell::<B>(system.clone(), props, child_path.clone())?;
89 if let Some(obs) = system.spawn_observer.read().as_ref() {
90 obs.on_spawn(&child_path, Some(&self.path), std::any::type_name::<B>());
91 }
92 self.children.insert(
93 name.to_string(),
94 ChildEntry { path: child_path, untyped: r.as_untyped(), system_tx: r.system_sender() },
95 );
96 Ok(r)
97 }
98
99 pub fn stop_child(&mut self, name: &str) {
101 if let Some(c) = self.children.get(name) {
102 let _ = c.system_tx.send(SystemMsg::Stop);
103 }
104 }
105
106 pub fn watch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
109 if self.watching.insert(target.path().clone()) {
110 let _ = target.system_sender().send(SystemMsg::Watch(self.self_ref.as_untyped()));
111 }
112 }
113
114 pub fn unwatch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
116 if self.watching.remove(target.path()) {
117 let _ = target.system_sender().send(SystemMsg::Unwatch(self.path.clone()));
118 }
119 }
120
121 pub fn stash(&mut self, msg: A::Msg) {
123 self.stash.push_back(msg);
124 }
125
126 pub fn unstash_all(&mut self) -> Vec<A::Msg> {
128 let mut out = Vec::with_capacity(self.stash.len());
129 while let Some(m) = self.stash.pop_front() {
130 out.push(m);
131 }
132 out
133 }
134
135 pub fn stop_self(&mut self) {
137 self.stopping = true;
138 }
139
140 pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
142 self.receive_timeout = d;
143 }
144
145 pub fn sender(&self) -> &Sender {
150 &self.current_sender
151 }
152
153 #[doc(hidden)]
155 pub fn sender_typed(&self) -> &Sender {
156 &self.current_sender
157 }
158
159 pub fn phased<P: PhaseMarker>(&mut self) -> Option<TypedContext<'_, A, P>> {
164 if P::PHASE == self.phase {
165 Some(TypedContext { inner: self, _phase: PhantomData })
166 } else {
167 None
168 }
169 }
170
171 pub fn starting(&mut self) -> Option<TypedContext<'_, A, Starting>> {
173 self.phased::<Starting>()
174 }
175
176 pub fn running(&mut self) -> Option<TypedContext<'_, A, Running>> {
178 self.phased::<Running>()
179 }
180
181 pub fn stopping_view(&mut self) -> Option<TypedContext<'_, A, Stopping>> {
183 self.phased::<Stopping>()
184 }
185}
186
187pub trait PhaseMarker: sealed::Sealed {
191 const PHASE: LifecyclePhase;
192}
193
194pub struct Starting;
196pub struct Running;
198pub struct Stopping;
200
201mod sealed {
202 pub trait Sealed {}
203 impl Sealed for super::Starting {}
204 impl Sealed for super::Running {}
205 impl Sealed for super::Stopping {}
206}
207
208impl PhaseMarker for Starting {
209 const PHASE: LifecyclePhase = LifecyclePhase::Starting;
210}
211impl PhaseMarker for Running {
212 const PHASE: LifecyclePhase = LifecyclePhase::Running;
213}
214impl PhaseMarker for Stopping {
215 const PHASE: LifecyclePhase = LifecyclePhase::Stopping;
216}
217
218pub struct TypedContext<'a, A: Actor, P: PhaseMarker> {
225 inner: &'a mut Context<A>,
226 _phase: PhantomData<P>,
227}
228
229impl<'a, A: Actor, P: PhaseMarker> TypedContext<'a, A, P> {
230 pub fn ctx(&self) -> &Context<A> {
231 self.inner
232 }
233 pub fn ctx_mut(&mut self) -> &mut Context<A> {
234 self.inner
235 }
236 pub fn self_ref(&self) -> &ActorRef<A::Msg> {
237 &self.inner.self_ref
238 }
239}
240
241impl<'a, A: Actor> TypedContext<'a, A, Running> {
242 pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
244 self.inner.set_receive_timeout(d);
245 }
246
247 pub fn unstash_all(&mut self) -> Vec<A::Msg> {
249 self.inner.unstash_all()
250 }
251
252 pub fn stop_self(&mut self) {
255 self.inner.stop_self();
256 }
257}
258
259#[derive(Debug, thiserror::Error)]
260pub enum SpawnError {
261 #[error("child name `{0}` already taken")]
262 NameTaken(String),
263 #[error("actor system has terminated")]
264 SystemTerminated,
265}