atomr_core/actor/
context.rs1use std::collections::{HashMap, HashSet, VecDeque};
5use std::sync::Weak;
6use std::time::Duration;
7
8use std::marker::PhantomData;
9
10use super::actor_cell::{ChildEntry, SystemMsg};
11use super::actor_ref::{ActorRef, UntypedActorRef};
12use super::path::ActorPath;
13use super::props::Props;
14use super::sender::Sender;
15use super::traits::Actor;
16
/// Coarse lifecycle state recorded on an actor's [`Context`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum LifecyclePhase {
    /// Initial phase; a freshly constructed context reports this value.
    Starting,
    /// Phase matched by the [`Running`] marker type.
    Running,
    /// Phase matched by the [`Stopping`] marker type.
    Stopping,
}
28
29pub struct Context<A: Actor> {
31 pub(crate) self_ref: ActorRef<A::Msg>,
32 pub(crate) path: ActorPath,
33 pub(crate) system: Weak<super::actor_system::ActorSystemInner>,
34 pub(crate) children: HashMap<String, ChildEntry>,
35 pub(crate) watching: HashSet<ActorPath>,
36 pub(crate) watched_by: HashSet<UntypedActorRef>,
37 pub(crate) stash: VecDeque<A::Msg>,
38 pub(crate) receive_timeout: Option<Duration>,
39 pub(crate) current_sender: Sender,
40 pub(crate) stopping: bool,
41 pub(crate) phase: LifecyclePhase,
42}
43
44impl<A: Actor> Context<A> {
45 pub(crate) fn new(
46 self_ref: ActorRef<A::Msg>,
47 path: ActorPath,
48 system: Weak<super::actor_system::ActorSystemInner>,
49 ) -> Self {
50 Self {
51 self_ref,
52 path,
53 system,
54 children: HashMap::new(),
55 watching: HashSet::new(),
56 watched_by: HashSet::new(),
57 stash: VecDeque::new(),
58 receive_timeout: None,
59 current_sender: Sender::None,
60 stopping: false,
61 phase: LifecyclePhase::Starting,
62 }
63 }
64
65 pub fn phase(&self) -> LifecyclePhase {
70 self.phase
71 }
72
73 pub fn self_ref(&self) -> &ActorRef<A::Msg> {
74 &self.self_ref
75 }
76
77 pub fn path(&self) -> &ActorPath {
78 &self.path
79 }
80
81 pub fn spawn<B: Actor>(&mut self, props: Props<B>, name: &str) -> Result<ActorRef<B::Msg>, SpawnError> {
83 if self.children.contains_key(name) {
84 return Err(SpawnError::NameTaken(name.into()));
85 }
86 let system = self.system.upgrade().ok_or(SpawnError::SystemTerminated)?;
87 let child_path = self.path.child(name);
88 let r = super::actor_cell::spawn_cell::<B>(system.clone(), props, child_path.clone())?;
89 if let Some(obs) = system.spawn_observer.read().as_ref() {
90 obs.on_spawn(&child_path, Some(&self.path), std::any::type_name::<B>());
91 }
92 self.children.insert(
93 name.to_string(),
94 ChildEntry { path: child_path, untyped: r.as_untyped(), system_tx: r.system_sender() },
95 );
96 Ok(r)
97 }
98
99 pub fn stop_child(&mut self, name: &str) {
101 if let Some(c) = self.children.get(name) {
102 let _ = c.system_tx.send(SystemMsg::Stop);
103 }
104 }
105
106 pub fn watch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
109 if self.watching.insert(target.path().clone()) {
110 let _ = target.system_sender().send(SystemMsg::Watch(self.self_ref.as_untyped()));
111 }
112 }
113
114 pub fn unwatch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
116 if self.watching.remove(target.path()) {
117 let _ = target.system_sender().send(SystemMsg::Unwatch(self.path.clone()));
118 }
119 }
120
121 pub fn stash(&mut self, msg: A::Msg) {
124 self.stash.push_back(msg);
125 }
126
127 pub fn unstash_all(&mut self) -> Vec<A::Msg> {
129 let mut out = Vec::with_capacity(self.stash.len());
130 while let Some(m) = self.stash.pop_front() {
131 out.push(m);
132 }
133 out
134 }
135
136 pub fn stop_self(&mut self) {
138 self.stopping = true;
139 }
140
141 pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
143 self.receive_timeout = d;
144 }
145
146 pub fn sender(&self) -> &Sender {
151 &self.current_sender
152 }
153
154 #[doc(hidden)]
156 pub fn sender_typed(&self) -> &Sender {
157 &self.current_sender
158 }
159
160 pub fn phased<P: PhaseMarker>(&mut self) -> Option<TypedContext<'_, A, P>> {
165 if P::PHASE == self.phase {
166 Some(TypedContext { inner: self, _phase: PhantomData })
167 } else {
168 None
169 }
170 }
171
172 pub fn starting(&mut self) -> Option<TypedContext<'_, A, Starting>> {
174 self.phased::<Starting>()
175 }
176
177 pub fn running(&mut self) -> Option<TypedContext<'_, A, Running>> {
179 self.phased::<Running>()
180 }
181
182 pub fn stopping_view(&mut self) -> Option<TypedContext<'_, A, Stopping>> {
184 self.phased::<Stopping>()
185 }
186}
187
188pub trait PhaseMarker: sealed::Sealed {
192 const PHASE: LifecyclePhase;
193}
194
/// Marker type for [`LifecyclePhase::Starting`].
///
/// Zero-sized; it exists only as a type parameter. The standard derives make
/// the marker printable and freely copyable, which public types are
/// expected to be.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Starting;

/// Marker type for [`LifecyclePhase::Running`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Running;

/// Marker type for [`LifecyclePhase::Stopping`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Stopping;
201
202mod sealed {
203 pub trait Sealed {}
204 impl Sealed for super::Starting {}
205 impl Sealed for super::Running {}
206 impl Sealed for super::Stopping {}
207}
208
209impl PhaseMarker for Starting {
210 const PHASE: LifecyclePhase = LifecyclePhase::Starting;
211}
212impl PhaseMarker for Running {
213 const PHASE: LifecyclePhase = LifecyclePhase::Running;
214}
215impl PhaseMarker for Stopping {
216 const PHASE: LifecyclePhase = LifecyclePhase::Stopping;
217}
218
219pub struct TypedContext<'a, A: Actor, P: PhaseMarker> {
226 inner: &'a mut Context<A>,
227 _phase: PhantomData<P>,
228}
229
230impl<'a, A: Actor, P: PhaseMarker> TypedContext<'a, A, P> {
231 pub fn ctx(&self) -> &Context<A> {
232 self.inner
233 }
234 pub fn ctx_mut(&mut self) -> &mut Context<A> {
235 self.inner
236 }
237 pub fn self_ref(&self) -> &ActorRef<A::Msg> {
238 &self.inner.self_ref
239 }
240}
241
242impl<'a, A: Actor> TypedContext<'a, A, Running> {
243 pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
245 self.inner.set_receive_timeout(d);
246 }
247
248 pub fn unstash_all(&mut self) -> Vec<A::Msg> {
250 self.inner.unstash_all()
251 }
252
253 pub fn stop_self(&mut self) {
256 self.inner.stop_self();
257 }
258}
259
260#[derive(Debug, thiserror::Error)]
261pub enum SpawnError {
262 #[error("child name `{0}` already taken")]
263 NameTaken(String),
264 #[error("actor system has terminated")]
265 SystemTerminated,
266}