atomr_core/actor/
context.rs1use std::collections::{HashMap, HashSet, VecDeque};
5use std::sync::{Arc, Weak};
6use std::time::Duration;
7
8use std::marker::PhantomData;
9
10use super::actor_cell::{ChildEntry, SystemMsg};
11use super::actor_ref::{ActorRef, UntypedActorRef};
12use super::path::ActorPath;
13use super::props::Props;
14use super::scheduler::Scheduler;
15use super::sender::Sender;
16use super::traits::Actor;
17
18#[derive(Clone)]
29pub struct SystemHandle {
30 inner: Weak<super::actor_system::ActorSystemInner>,
31}
32
33impl SystemHandle {
34 pub(crate) fn new(inner: Weak<super::actor_system::ActorSystemInner>) -> Self {
35 Self { inner }
36 }
37
38 pub fn is_alive(&self) -> bool {
40 self.inner.strong_count() > 0
41 }
42
43 pub fn scheduler(&self) -> Option<Arc<dyn Scheduler>> {
46 self.inner.upgrade().map(|s| s.scheduler.clone())
47 }
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55#[non_exhaustive]
56pub enum LifecyclePhase {
57 Starting,
58 Running,
59 Stopping,
60}
61
62pub struct Context<A: Actor> {
64 pub(crate) self_ref: ActorRef<A::Msg>,
65 pub(crate) path: ActorPath,
66 pub(crate) system: Weak<super::actor_system::ActorSystemInner>,
67 pub(crate) children: HashMap<String, ChildEntry>,
68 pub(crate) watching: HashSet<ActorPath>,
69 pub(crate) watched_by: HashSet<UntypedActorRef>,
70 pub(crate) stash: VecDeque<A::Msg>,
71 pub(crate) receive_timeout: Option<Duration>,
72 pub(crate) current_sender: Sender,
73 pub(crate) stopping: bool,
74 pub(crate) phase: LifecyclePhase,
75}
76
77impl<A: Actor> Context<A> {
78 pub(crate) fn new(
79 self_ref: ActorRef<A::Msg>,
80 path: ActorPath,
81 system: Weak<super::actor_system::ActorSystemInner>,
82 ) -> Self {
83 Self {
84 self_ref,
85 path,
86 system,
87 children: HashMap::new(),
88 watching: HashSet::new(),
89 watched_by: HashSet::new(),
90 stash: VecDeque::new(),
91 receive_timeout: None,
92 current_sender: Sender::None,
93 stopping: false,
94 phase: LifecyclePhase::Starting,
95 }
96 }
97
98 pub fn phase(&self) -> LifecyclePhase {
103 self.phase
104 }
105
106 pub fn self_ref(&self) -> &ActorRef<A::Msg> {
107 &self.self_ref
108 }
109
110 pub fn path(&self) -> &ActorPath {
111 &self.path
112 }
113
114 pub fn system_handle(&self) -> SystemHandle {
119 SystemHandle::new(self.system.clone())
120 }
121
122 pub fn spawn<B: Actor>(&mut self, props: Props<B>, name: &str) -> Result<ActorRef<B::Msg>, SpawnError> {
124 if self.children.contains_key(name) {
125 return Err(SpawnError::NameTaken(name.into()));
126 }
127 let system = self.system.upgrade().ok_or(SpawnError::SystemTerminated)?;
128 let child_path = self.path.child(name);
129 let r = super::actor_cell::spawn_cell::<B>(system.clone(), props, child_path.clone())?;
130 if let Some(obs) = system.spawn_observer.read().as_ref() {
131 obs.on_spawn(&child_path, Some(&self.path), std::any::type_name::<B>());
132 }
133 self.children.insert(
134 name.to_string(),
135 ChildEntry { path: child_path, untyped: r.as_untyped(), system_tx: r.system_sender() },
136 );
137 Ok(r)
138 }
139
140 pub fn stop_child(&mut self, name: &str) {
142 if let Some(c) = self.children.get(name) {
143 let _ = c.system_tx.send(SystemMsg::Stop);
144 }
145 }
146
147 pub fn watch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
150 if self.watching.insert(target.path().clone()) {
151 let _ = target.system_sender().send(SystemMsg::Watch(self.self_ref.as_untyped()));
152 }
153 }
154
155 pub fn unwatch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
157 if self.watching.remove(target.path()) {
158 let _ = target.system_sender().send(SystemMsg::Unwatch(self.path.clone()));
159 }
160 }
161
162 pub fn stash(&mut self, msg: A::Msg) {
164 self.stash.push_back(msg);
165 }
166
167 pub fn unstash_all(&mut self) -> Vec<A::Msg> {
169 let mut out = Vec::with_capacity(self.stash.len());
170 while let Some(m) = self.stash.pop_front() {
171 out.push(m);
172 }
173 out
174 }
175
176 pub fn stop_self(&mut self) {
178 self.stopping = true;
179 }
180
181 pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
183 self.receive_timeout = d;
184 }
185
186 pub fn sender(&self) -> &Sender {
191 &self.current_sender
192 }
193
194 #[doc(hidden)]
196 pub fn sender_typed(&self) -> &Sender {
197 &self.current_sender
198 }
199
200 pub fn phased<P: PhaseMarker>(&mut self) -> Option<TypedContext<'_, A, P>> {
206 if P::PHASE == self.phase {
207 Some(TypedContext { inner: self, _phase: PhantomData })
208 } else {
209 None
210 }
211 }
212
213 pub fn starting(&mut self) -> Option<TypedContext<'_, A, Starting>> {
215 self.phased::<Starting>()
216 }
217
218 pub fn running(&mut self) -> Option<TypedContext<'_, A, Running>> {
220 self.phased::<Running>()
221 }
222
223 pub fn stopping_view(&mut self) -> Option<TypedContext<'_, A, Stopping>> {
225 self.phased::<Stopping>()
226 }
227}
228
229pub trait PhaseMarker: sealed::Sealed {
233 const PHASE: LifecyclePhase;
234}
235
236pub struct Starting;
238pub struct Running;
240pub struct Stopping;
242
243mod sealed {
244 pub trait Sealed {}
245 impl Sealed for super::Starting {}
246 impl Sealed for super::Running {}
247 impl Sealed for super::Stopping {}
248}
249
250impl PhaseMarker for Starting {
251 const PHASE: LifecyclePhase = LifecyclePhase::Starting;
252}
253impl PhaseMarker for Running {
254 const PHASE: LifecyclePhase = LifecyclePhase::Running;
255}
256impl PhaseMarker for Stopping {
257 const PHASE: LifecyclePhase = LifecyclePhase::Stopping;
258}
259
260pub struct TypedContext<'a, A: Actor, P: PhaseMarker> {
267 inner: &'a mut Context<A>,
268 _phase: PhantomData<P>,
269}
270
271impl<'a, A: Actor, P: PhaseMarker> TypedContext<'a, A, P> {
272 pub fn ctx(&self) -> &Context<A> {
273 self.inner
274 }
275 pub fn ctx_mut(&mut self) -> &mut Context<A> {
276 self.inner
277 }
278 pub fn self_ref(&self) -> &ActorRef<A::Msg> {
279 &self.inner.self_ref
280 }
281}
282
283impl<'a, A: Actor> TypedContext<'a, A, Running> {
284 pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
286 self.inner.set_receive_timeout(d);
287 }
288
289 pub fn unstash_all(&mut self) -> Vec<A::Msg> {
291 self.inner.unstash_all()
292 }
293
294 pub fn stop_self(&mut self) {
297 self.inner.stop_self();
298 }
299}
300
301#[derive(Debug, thiserror::Error)]
302pub enum SpawnError {
303 #[error("child name `{0}` already taken")]
304 NameTaken(String),
305 #[error("actor system has terminated")]
306 SystemTerminated,
307}