// atomr_core/actor/context.rs
use std::collections::{HashMap, HashSet, VecDeque};
5use std::sync::{Arc, Weak};
6use std::time::Duration;
7
8use std::marker::PhantomData;
9
10use super::actor_cell::{ChildEntry, SystemMsg};
11use super::actor_ref::{ActorRef, UntypedActorRef};
12use super::path::ActorPath;
13use super::props::Props;
14use super::scheduler::Scheduler;
15use super::sender::Sender;
16use super::traits::Actor;
17
/// Cloneable, non-owning handle to the actor system.
///
/// The handle stores only a `Weak` pointer to the system internals, so
/// holding one does not by itself keep the system alive.
#[derive(Clone)]
pub struct SystemHandle {
    // Weak on purpose: a handle must never extend the system's lifetime.
    inner: Weak<super::actor_system::ActorSystemInner>,
}
32
33impl SystemHandle {
34 pub(crate) fn new(inner: Weak<super::actor_system::ActorSystemInner>) -> Self {
35 Self { inner }
36 }
37
38 pub fn is_alive(&self) -> bool {
40 self.inner.strong_count() > 0
41 }
42
43 pub fn scheduler(&self) -> Option<Arc<dyn Scheduler>> {
46 self.inner.upgrade().map(|s| s.scheduler.clone())
47 }
48}
49
/// Lifecycle phase recorded on an actor's context.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate
/// need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum LifecyclePhase {
    /// Initial phase; a freshly constructed context starts here.
    Starting,
    /// NOTE(review): presumably set while the actor processes messages —
    /// the transition is not visible in this file; confirm in the actor cell.
    Running,
    /// NOTE(review): presumably set while the actor shuts down — the
    /// transition is not visible in this file; confirm in the actor cell.
    Stopping,
}
61
/// Per-actor state handed to an actor of type `A`.
///
/// All fields are crate-visible; user code goes through the accessor
/// methods on the `impl` block.
pub struct Context<A: Actor> {
    /// Typed reference to this actor itself.
    pub(crate) self_ref: ActorRef<A::Msg>,
    /// Path of this actor; child paths are derived from it when spawning.
    pub(crate) path: ActorPath,
    /// Weak pointer to the owning system's internals (does not keep it alive).
    pub(crate) system: Weak<super::actor_system::ActorSystemInner>,
    /// Children spawned through this context, keyed by child name.
    pub(crate) children: HashMap<String, ChildEntry>,
    /// Paths of the actors this actor has registered a watch on.
    pub(crate) watching: HashSet<ActorPath>,
    /// NOTE(review): presumably the actors watching this one; nothing in
    /// this file writes to it — confirm against the actor cell.
    pub(crate) watched_by: HashSet<UntypedActorRef>,
    /// Messages set aside via `stash`, oldest at the front.
    pub(crate) stash: VecDeque<A::Msg>,
    /// Value last given to `set_receive_timeout`; `None` initially.
    pub(crate) receive_timeout: Option<Duration>,
    /// Sender returned by `sender()`; starts as `Sender::None`.
    /// NOTE(review): presumably refreshed per message by the cell — confirm.
    pub(crate) current_sender: Sender,
    /// Metadata returned by `metadata()`.
    pub(crate) current_metadata: super::metadata::Metadata,
    /// Set to `true` by `stop_self`; starts as `false`.
    pub(crate) stopping: bool,
    /// Current lifecycle phase; starts as `LifecyclePhase::Starting`.
    pub(crate) phase: LifecyclePhase,
}
77
78impl<A: Actor> Context<A> {
79 pub(crate) fn new(
80 self_ref: ActorRef<A::Msg>,
81 path: ActorPath,
82 system: Weak<super::actor_system::ActorSystemInner>,
83 ) -> Self {
84 Self {
85 self_ref,
86 path,
87 system,
88 children: HashMap::new(),
89 watching: HashSet::new(),
90 watched_by: HashSet::new(),
91 stash: VecDeque::new(),
92 receive_timeout: None,
93 current_sender: Sender::None,
94 current_metadata: super::metadata::Metadata::new(),
95 stopping: false,
96 phase: LifecyclePhase::Starting,
97 }
98 }
99
100 pub fn metadata(&self) -> &super::metadata::Metadata {
104 &self.current_metadata
105 }
106
107 pub fn phase(&self) -> LifecyclePhase {
112 self.phase
113 }
114
115 pub fn self_ref(&self) -> &ActorRef<A::Msg> {
116 &self.self_ref
117 }
118
119 pub fn path(&self) -> &ActorPath {
120 &self.path
121 }
122
123 pub fn system_handle(&self) -> SystemHandle {
128 SystemHandle::new(self.system.clone())
129 }
130
131 pub fn spawn<B: Actor>(&mut self, props: Props<B>, name: &str) -> Result<ActorRef<B::Msg>, SpawnError> {
133 if self.children.contains_key(name) {
134 return Err(SpawnError::NameTaken(name.into()));
135 }
136 let system = self.system.upgrade().ok_or(SpawnError::SystemTerminated)?;
137 let child_path = self.path.child(name);
138 let r = super::actor_cell::spawn_cell::<B>(system.clone(), props, child_path.clone())?;
139 if let Some(obs) = system.spawn_observer.read().as_ref() {
140 obs.on_spawn(&child_path, Some(&self.path), std::any::type_name::<B>());
141 }
142 self.children.insert(
143 name.to_string(),
144 ChildEntry { path: child_path, untyped: r.as_untyped(), system_tx: r.system_sender() },
145 );
146 Ok(r)
147 }
148
149 pub fn stop_child(&mut self, name: &str) {
151 if let Some(c) = self.children.get(name) {
152 let _ = c.system_tx.send(SystemMsg::Stop);
153 }
154 }
155
156 pub fn watch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
159 if self.watching.insert(target.path().clone()) {
160 let _ = target.system_sender().send(SystemMsg::Watch(self.self_ref.as_untyped()));
161 }
162 }
163
164 pub fn unwatch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
166 if self.watching.remove(target.path()) {
167 let _ = target.system_sender().send(SystemMsg::Unwatch(self.path.clone()));
168 }
169 }
170
171 pub fn stash(&mut self, msg: A::Msg) {
173 self.stash.push_back(msg);
174 }
175
176 pub fn unstash_all(&mut self) -> Vec<A::Msg> {
178 let mut out = Vec::with_capacity(self.stash.len());
179 while let Some(m) = self.stash.pop_front() {
180 out.push(m);
181 }
182 out
183 }
184
185 pub fn stop_self(&mut self) {
187 self.stopping = true;
188 }
189
190 pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
192 self.receive_timeout = d;
193 }
194
195 pub fn sender(&self) -> &Sender {
200 &self.current_sender
201 }
202
203 #[doc(hidden)]
205 pub fn sender_typed(&self) -> &Sender {
206 &self.current_sender
207 }
208
209 pub fn phased<P: PhaseMarker>(&mut self) -> Option<TypedContext<'_, A, P>> {
215 if P::PHASE == self.phase {
216 Some(TypedContext { inner: self, _phase: PhantomData })
217 } else {
218 None
219 }
220 }
221
222 pub fn starting(&mut self) -> Option<TypedContext<'_, A, Starting>> {
224 self.phased::<Starting>()
225 }
226
227 pub fn running(&mut self) -> Option<TypedContext<'_, A, Running>> {
229 self.phased::<Running>()
230 }
231
232 pub fn stopping_view(&mut self) -> Option<TypedContext<'_, A, Stopping>> {
234 self.phased::<Stopping>()
235 }
236}
237
/// Type-level marker that names one [`LifecyclePhase`].
///
/// The supertrait `sealed::Sealed` lives in a private module, so only types
/// in this module's scope can implement `PhaseMarker`.
pub trait PhaseMarker: sealed::Sealed {
    /// The runtime phase this marker type stands for.
    const PHASE: LifecyclePhase;
}
244
/// Marker type for `LifecyclePhase::Starting`.
pub struct Starting;
/// Marker type for `LifecyclePhase::Running`.
pub struct Running;
/// Marker type for `LifecyclePhase::Stopping`.
pub struct Stopping;
251
// Private module: `Sealed` is `pub` inside it, but the module itself is not
// exported, so code outside this file cannot name (or implement) the trait.
mod sealed {
    /// Supertrait used to restrict who may implement `PhaseMarker`.
    pub trait Sealed {}
    // The three phase marker types are the only implementors.
    impl Sealed for super::Starting {}
    impl Sealed for super::Running {}
    impl Sealed for super::Stopping {}
}
258
// Ties the `Starting` marker type to the `Starting` runtime phase.
impl PhaseMarker for Starting {
    const PHASE: LifecyclePhase = LifecyclePhase::Starting;
}
// Ties the `Running` marker type to the `Running` runtime phase.
impl PhaseMarker for Running {
    const PHASE: LifecyclePhase = LifecyclePhase::Running;
}
// Ties the `Stopping` marker type to the `Stopping` runtime phase.
impl PhaseMarker for Stopping {
    const PHASE: LifecyclePhase = LifecyclePhase::Stopping;
}
268
/// Mutable view of a [`Context`] tagged with a lifecycle phase `P`.
///
/// Both fields are private, so values are only built inside this module.
pub struct TypedContext<'a, A: Actor, P: PhaseMarker> {
    /// The underlying context, exclusively borrowed for `'a`.
    inner: &'a mut Context<A>,
    /// Zero-sized tag carrying the phase type; stores no data.
    _phase: PhantomData<P>,
}
279
280impl<'a, A: Actor, P: PhaseMarker> TypedContext<'a, A, P> {
281 pub fn ctx(&self) -> &Context<A> {
282 self.inner
283 }
284 pub fn ctx_mut(&mut self) -> &mut Context<A> {
285 self.inner
286 }
287 pub fn self_ref(&self) -> &ActorRef<A::Msg> {
288 &self.inner.self_ref
289 }
290}
291
292impl<'a, A: Actor> TypedContext<'a, A, Running> {
293 pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
295 self.inner.set_receive_timeout(d);
296 }
297
298 pub fn unstash_all(&mut self) -> Vec<A::Msg> {
300 self.inner.unstash_all()
301 }
302
303 pub fn stop_self(&mut self) {
306 self.inner.stop_self();
307 }
308}
309
/// Errors returned by `Context::spawn`.
#[derive(Debug, thiserror::Error)]
pub enum SpawnError {
    /// A child with the given name is already registered on the parent context.
    #[error("child name `{0}` already taken")]
    NameTaken(String),
    /// The weak pointer to the actor system could not be upgraded.
    #[error("actor system has terminated")]
    SystemTerminated,
}