Skip to main content

atomr_core/actor/
context.rs

1//! `Context<A>` — the actor's window into the system.
2//! akka.net: `Actor/IActorContext.cs`, `ActorCell.cs` (partial).
3
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::sync::Weak;
6use std::time::Duration;
7
8use std::marker::PhantomData;
9
10use super::actor_cell::{ChildEntry, SystemMsg};
11use super::actor_ref::{ActorRef, UntypedActorRef};
12use super::path::ActorPath;
13use super::props::Props;
14use super::sender::Sender;
15use super::traits::Actor;
16
/// Runtime lifecycle phase of an actor, readable through [`Context::phase`].
///
/// This is the Phase 1.C step of `docs/full-port-plan.md`: a runtime
/// precursor to a phantom-typed `Context<A, Phase>`, introduced additively
/// so that existing signatures keep compiling. [`Context::phased`] compares
/// this value against [`PhaseMarker::PHASE`] to decide whether a
/// phase-typed view may be handed out.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum LifecyclePhase {
    /// Initial phase; assigned by `Context::new`.
    Starting,
    /// The actor is processing messages.
    Running,
    /// The actor is shutting down.
    Stopping,
}
28
29/// Passed to every `Actor::handle` call.
30pub struct Context<A: Actor> {
31    pub(crate) self_ref: ActorRef<A::Msg>,
32    pub(crate) path: ActorPath,
33    pub(crate) system: Weak<super::actor_system::ActorSystemInner>,
34    pub(crate) children: HashMap<String, ChildEntry>,
35    pub(crate) watching: HashSet<ActorPath>,
36    pub(crate) watched_by: HashSet<UntypedActorRef>,
37    pub(crate) stash: VecDeque<A::Msg>,
38    pub(crate) receive_timeout: Option<Duration>,
39    pub(crate) current_sender: Sender,
40    pub(crate) stopping: bool,
41    pub(crate) phase: LifecyclePhase,
42}
43
44impl<A: Actor> Context<A> {
45    pub(crate) fn new(
46        self_ref: ActorRef<A::Msg>,
47        path: ActorPath,
48        system: Weak<super::actor_system::ActorSystemInner>,
49    ) -> Self {
50        Self {
51            self_ref,
52            path,
53            system,
54            children: HashMap::new(),
55            watching: HashSet::new(),
56            watched_by: HashSet::new(),
57            stash: VecDeque::new(),
58            receive_timeout: None,
59            current_sender: Sender::None,
60            stopping: false,
61            phase: LifecyclePhase::Starting,
62        }
63    }
64
65    /// Current lifecycle phase. Phase 1.C marker — useful in
66    /// generic helpers that need to gate calls (e.g. `become`,
67    /// `unstash_all`) without taking a typed-`Context<A, P>`
68    /// parameter.
69    pub fn phase(&self) -> LifecyclePhase {
70        self.phase
71    }
72
73    pub fn self_ref(&self) -> &ActorRef<A::Msg> {
74        &self.self_ref
75    }
76
77    pub fn path(&self) -> &ActorPath {
78        &self.path
79    }
80
81    /// Spawn a child actor under this context.
82    pub fn spawn<B: Actor>(&mut self, props: Props<B>, name: &str) -> Result<ActorRef<B::Msg>, SpawnError> {
83        if self.children.contains_key(name) {
84            return Err(SpawnError::NameTaken(name.into()));
85        }
86        let system = self.system.upgrade().ok_or(SpawnError::SystemTerminated)?;
87        let child_path = self.path.child(name);
88        let r = super::actor_cell::spawn_cell::<B>(system.clone(), props, child_path.clone())?;
89        if let Some(obs) = system.spawn_observer.read().as_ref() {
90            obs.on_spawn(&child_path, Some(&self.path), std::any::type_name::<B>());
91        }
92        self.children.insert(
93            name.to_string(),
94            ChildEntry { path: child_path, untyped: r.as_untyped(), system_tx: r.system_sender() },
95        );
96        Ok(r)
97    }
98
99    /// Stop a specific child.
100    pub fn stop_child(&mut self, name: &str) {
101        if let Some(c) = self.children.get(name) {
102            let _ = c.system_tx.send(SystemMsg::Stop);
103        }
104    }
105
106    /// Watch another actor. The sender is notified with a `SystemMsg::Terminated`
107    /// when the watched actor stops. akka.net: `Context.Watch`.
108    pub fn watch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
109        if self.watching.insert(target.path().clone()) {
110            let _ = target.system_sender().send(SystemMsg::Watch(self.self_ref.as_untyped()));
111        }
112    }
113
114    /// Stop watching.
115    pub fn unwatch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
116        if self.watching.remove(target.path()) {
117            let _ = target.system_sender().send(SystemMsg::Unwatch(self.path.clone()));
118        }
119    }
120
121    /// Stash the currently-processed message for later unstash.
122    /// akka.net: `IStash.Stash()`.
123    pub fn stash(&mut self, msg: A::Msg) {
124        self.stash.push_back(msg);
125    }
126
127    /// Put all stashed messages back at the front of the mailbox.
128    pub fn unstash_all(&mut self) -> Vec<A::Msg> {
129        let mut out = Vec::with_capacity(self.stash.len());
130        while let Some(m) = self.stash.pop_front() {
131            out.push(m);
132        }
133        out
134    }
135
136    /// Stop self. akka.net: `Context.Stop(Self)`.
137    pub fn stop_self(&mut self) {
138        self.stopping = true;
139    }
140
141    /// Set idle-receive timeout (like akka.net `SetReceiveTimeout`).
142    pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
143        self.receive_timeout = d;
144    }
145
146    /// Typed sender of the message currently being processed.
147    ///
148    /// Returns [`Sender::None`] if the sender slot was empty (the
149    /// akka.net analog of `Sender == NoSender`).
150    pub fn sender(&self) -> &Sender {
151        &self.current_sender
152    }
153
154    /// Backwards-compatible alias for [`Context::sender`].
155    #[doc(hidden)]
156    pub fn sender_typed(&self) -> &Sender {
157        &self.current_sender
158    }
159
160    /// Borrow this context as a phase-typed view. The phase parameter is a
161    /// phantom witness only — call sites typically use one of
162    /// [`Context::starting`], [`Context::running`], or [`Context::stopping`]
163    /// to get a view whose method surface matches the phase.
164    pub fn phased<P: PhaseMarker>(&mut self) -> Option<TypedContext<'_, A, P>> {
165        if P::PHASE == self.phase {
166            Some(TypedContext { inner: self, _phase: PhantomData })
167        } else {
168            None
169        }
170    }
171
172    /// Phase-typed view valid only while the actor is in `Starting`.
173    pub fn starting(&mut self) -> Option<TypedContext<'_, A, Starting>> {
174        self.phased::<Starting>()
175    }
176
177    /// Phase-typed view valid only while the actor is in `Running`.
178    pub fn running(&mut self) -> Option<TypedContext<'_, A, Running>> {
179        self.phased::<Running>()
180    }
181
182    /// Phase-typed view valid only while the actor is in `Stopping`.
183    pub fn stopping_view(&mut self) -> Option<TypedContext<'_, A, Stopping>> {
184        self.phased::<Stopping>()
185    }
186}
187
188/// Phase markers for [`TypedContext`]. Each marker type implements
189/// [`PhaseMarker`] with a const [`LifecyclePhase`] discriminant; the
190/// `PhasedContext` view inspects this at runtime to gate phase-only APIs.
191pub trait PhaseMarker: sealed::Sealed {
192    const PHASE: LifecyclePhase;
193}
194
/// Marker for the `Starting` lifecycle phase.
///
/// Zero-sized; used only as a type-level tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Starting;
/// Marker for the `Running` lifecycle phase.
///
/// Zero-sized; used only as a type-level tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Running;
/// Marker for the `Stopping` lifecycle phase.
///
/// Zero-sized; used only as a type-level tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Stopping;
201
202mod sealed {
203    pub trait Sealed {}
204    impl Sealed for super::Starting {}
205    impl Sealed for super::Running {}
206    impl Sealed for super::Stopping {}
207}
208
209impl PhaseMarker for Starting {
210    const PHASE: LifecyclePhase = LifecyclePhase::Starting;
211}
212impl PhaseMarker for Running {
213    const PHASE: LifecyclePhase = LifecyclePhase::Running;
214}
215impl PhaseMarker for Stopping {
216    const PHASE: LifecyclePhase = LifecyclePhase::Stopping;
217}
218
219/// Phase-typed view over a [`Context`]. The phase parameter is a phantom
220/// witness; only methods valid in that phase are exposed.
221///
222/// `Starting`-only: nothing yet (constructor surface).
223/// `Running` exposes `become_`, `unstash_all`, `set_receive_timeout`.
224/// `Stopping` exposes only inspection (no new state changes).
225pub struct TypedContext<'a, A: Actor, P: PhaseMarker> {
226    inner: &'a mut Context<A>,
227    _phase: PhantomData<P>,
228}
229
230impl<'a, A: Actor, P: PhaseMarker> TypedContext<'a, A, P> {
231    pub fn ctx(&self) -> &Context<A> {
232        self.inner
233    }
234    pub fn ctx_mut(&mut self) -> &mut Context<A> {
235        self.inner
236    }
237    pub fn self_ref(&self) -> &ActorRef<A::Msg> {
238        &self.inner.self_ref
239    }
240}
241
242impl<'a, A: Actor> TypedContext<'a, A, Running> {
243    /// Set the receive-idle timeout. Only callable from a `Running` view.
244    pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
245        self.inner.set_receive_timeout(d);
246    }
247
248    /// Drain stashed messages. Only callable from a `Running` view.
249    pub fn unstash_all(&mut self) -> Vec<A::Msg> {
250        self.inner.unstash_all()
251    }
252
253    /// Begin self-stop. Transitions the underlying context to `Stopping`
254    /// once the cell observes the request.
255    pub fn stop_self(&mut self) {
256        self.inner.stop_self();
257    }
258}
259
260#[derive(Debug, thiserror::Error)]
261pub enum SpawnError {
262    #[error("child name `{0}` already taken")]
263    NameTaken(String),
264    #[error("actor system has terminated")]
265    SystemTerminated,
266}