Skip to main content

atomr_core/actor/
context.rs

1//! `Context<A>` — the actor's window into the system.
2//! (partial).
3
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::sync::{Arc, Weak};
6use std::time::Duration;
7
8use std::marker::PhantomData;
9
10use super::actor_cell::{ChildEntry, SystemMsg};
11use super::actor_ref::{ActorRef, UntypedActorRef};
12use super::path::ActorPath;
13use super::props::Props;
14use super::scheduler::Scheduler;
15use super::sender::Sender;
16use super::traits::Actor;
17
18/// Public, opaque handle to the running [`super::ActorSystem`] that an actor
19/// can use to reach a small, curated subset of the system surface — currently
20/// just [`Self::scheduler`]. The handle holds a `Weak` reference so it does
21/// not keep the system alive; callers must check [`Self::is_alive`] (or
22/// receive `None` from [`Self::scheduler`]) before relying on it.
23///
24/// This type is the canonical way for binding layers (e.g. `pycore`) to
25/// register cancellable timers from inside `Actor::handle` without spawning
26/// detached `tokio::spawn` tasks. The internal `ActorSystemInner` type is
27/// crate-private, so this thin wrapper is the only stable public path.
28#[derive(Clone)]
29pub struct SystemHandle {
30    inner: Weak<super::actor_system::ActorSystemInner>,
31}
32
33impl SystemHandle {
34    pub(crate) fn new(inner: Weak<super::actor_system::ActorSystemInner>) -> Self {
35        Self { inner }
36    }
37
38    /// True if the underlying [`super::ActorSystem`] has not been dropped.
39    pub fn is_alive(&self) -> bool {
40        self.inner.strong_count() > 0
41    }
42
43    /// Borrow the system's [`Scheduler`]. Returns `None` if the system has
44    /// been dropped.
45    pub fn scheduler(&self) -> Option<Arc<dyn Scheduler>> {
46        self.inner.upgrade().map(|s| s.scheduler.clone())
47    }
48}
49
50/// Lifecycle phase exposed via [`Context::phase`]. Phase 1.C of
51/// `docs/full-port-plan.md` — runtime precursor to the phantom-typed
52/// `Context<A, Phase>` (kept additive so it doesn't break existing
53/// signatures). Stage-only APIs assert against this in debug builds.
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55#[non_exhaustive]
56pub enum LifecyclePhase {
57    Starting,
58    Running,
59    Stopping,
60}
61
62/// Passed to every `Actor::handle` call.
63pub struct Context<A: Actor> {
64    pub(crate) self_ref: ActorRef<A::Msg>,
65    pub(crate) path: ActorPath,
66    pub(crate) system: Weak<super::actor_system::ActorSystemInner>,
67    pub(crate) children: HashMap<String, ChildEntry>,
68    pub(crate) watching: HashSet<ActorPath>,
69    pub(crate) watched_by: HashSet<UntypedActorRef>,
70    pub(crate) stash: VecDeque<A::Msg>,
71    pub(crate) receive_timeout: Option<Duration>,
72    pub(crate) current_sender: Sender,
73    pub(crate) current_metadata: super::metadata::Metadata,
74    pub(crate) stopping: bool,
75    pub(crate) phase: LifecyclePhase,
76}
77
78impl<A: Actor> Context<A> {
79    pub(crate) fn new(
80        self_ref: ActorRef<A::Msg>,
81        path: ActorPath,
82        system: Weak<super::actor_system::ActorSystemInner>,
83    ) -> Self {
84        Self {
85            self_ref,
86            path,
87            system,
88            children: HashMap::new(),
89            watching: HashSet::new(),
90            watched_by: HashSet::new(),
91            stash: VecDeque::new(),
92            receive_timeout: None,
93            current_sender: Sender::None,
94            current_metadata: super::metadata::Metadata::new(),
95            stopping: false,
96            phase: LifecyclePhase::Starting,
97        }
98    }
99
100    /// Metadata (trace context + baggage) of the message currently being
101    /// processed (FR-10). Empty unless the sender attached it. Handlers can
102    /// read it to extend trace context onto outgoing sends.
103    pub fn metadata(&self) -> &super::metadata::Metadata {
104        &self.current_metadata
105    }
106
107    /// Current lifecycle phase. Phase 1.C marker — useful in
108    /// generic helpers that need to gate calls (e.g. `become`,
109    /// `unstash_all`) without taking a typed-`Context<A, P>`
110    /// parameter.
111    pub fn phase(&self) -> LifecyclePhase {
112        self.phase
113    }
114
115    pub fn self_ref(&self) -> &ActorRef<A::Msg> {
116        &self.self_ref
117    }
118
119    pub fn path(&self) -> &ActorPath {
120        &self.path
121    }
122
123    /// Opaque handle to the running [`super::ActorSystem`]. Useful in
124    /// binding layers that need to register cancellable timers via the
125    /// system [`Scheduler`] without spawning detached `tokio` tasks.
126    /// Holds a `Weak` reference; see [`SystemHandle`].
127    pub fn system_handle(&self) -> SystemHandle {
128        SystemHandle::new(self.system.clone())
129    }
130
131    /// Spawn a child actor under this context.
132    pub fn spawn<B: Actor>(&mut self, props: Props<B>, name: &str) -> Result<ActorRef<B::Msg>, SpawnError> {
133        if self.children.contains_key(name) {
134            return Err(SpawnError::NameTaken(name.into()));
135        }
136        let system = self.system.upgrade().ok_or(SpawnError::SystemTerminated)?;
137        let child_path = self.path.child(name);
138        let r = super::actor_cell::spawn_cell::<B>(system.clone(), props, child_path.clone())?;
139        if let Some(obs) = system.spawn_observer.read().as_ref() {
140            obs.on_spawn(&child_path, Some(&self.path), std::any::type_name::<B>());
141        }
142        self.children.insert(
143            name.to_string(),
144            ChildEntry { path: child_path, untyped: r.as_untyped(), system_tx: r.system_sender() },
145        );
146        Ok(r)
147    }
148
149    /// Stop a specific child.
150    pub fn stop_child(&mut self, name: &str) {
151        if let Some(c) = self.children.get(name) {
152            let _ = c.system_tx.send(SystemMsg::Stop);
153        }
154    }
155
156    /// Watch another actor. The sender is notified with a `SystemMsg::Terminated`
157    /// when the watched actor stops.
158    pub fn watch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
159        if self.watching.insert(target.path().clone()) {
160            let _ = target.system_sender().send(SystemMsg::Watch(self.self_ref.as_untyped()));
161        }
162    }
163
164    /// Stop watching.
165    pub fn unwatch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
166        if self.watching.remove(target.path()) {
167            let _ = target.system_sender().send(SystemMsg::Unwatch(self.path.clone()));
168        }
169    }
170
171    /// Stash the currently-processed message for later unstash.
172    pub fn stash(&mut self, msg: A::Msg) {
173        self.stash.push_back(msg);
174    }
175
176    /// Put all stashed messages back at the front of the mailbox.
177    pub fn unstash_all(&mut self) -> Vec<A::Msg> {
178        let mut out = Vec::with_capacity(self.stash.len());
179        while let Some(m) = self.stash.pop_front() {
180            out.push(m);
181        }
182        out
183    }
184
185    /// Stop self.
186    pub fn stop_self(&mut self) {
187        self.stopping = true;
188    }
189
190    /// Set idle-receive timeout (like).
191    pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
192        self.receive_timeout = d;
193    }
194
195    /// Typed sender of the message currently being processed.
196    ///
197    /// Returns [`Sender::None`] if the sender slot was empty (the
198    /// analog of `Sender == NoSender`).
199    pub fn sender(&self) -> &Sender {
200        &self.current_sender
201    }
202
203    /// Backwards-compatible alias for [`Context::sender`].
204    #[doc(hidden)]
205    pub fn sender_typed(&self) -> &Sender {
206        &self.current_sender
207    }
208
209    /// Borrow this context as a phase-typed view. The phase parameter is a
210    /// phantom witness only — call sites typically use one of
211    /// [`Context::starting`], [`Context::running`], or
212    /// [`Context::stopping_view`] to get a view whose method surface
213    /// matches the phase.
214    pub fn phased<P: PhaseMarker>(&mut self) -> Option<TypedContext<'_, A, P>> {
215        if P::PHASE == self.phase {
216            Some(TypedContext { inner: self, _phase: PhantomData })
217        } else {
218            None
219        }
220    }
221
222    /// Phase-typed view valid only while the actor is in `Starting`.
223    pub fn starting(&mut self) -> Option<TypedContext<'_, A, Starting>> {
224        self.phased::<Starting>()
225    }
226
227    /// Phase-typed view valid only while the actor is in `Running`.
228    pub fn running(&mut self) -> Option<TypedContext<'_, A, Running>> {
229        self.phased::<Running>()
230    }
231
232    /// Phase-typed view valid only while the actor is in `Stopping`.
233    pub fn stopping_view(&mut self) -> Option<TypedContext<'_, A, Stopping>> {
234        self.phased::<Stopping>()
235    }
236}
237
238/// Phase markers for [`TypedContext`]. Each marker type implements
239/// [`PhaseMarker`] with a const [`LifecyclePhase`] discriminant; the
240/// `PhasedContext` view inspects this at runtime to gate phase-only APIs.
241pub trait PhaseMarker: sealed::Sealed {
242    const PHASE: LifecyclePhase;
243}
244
245/// Marker for the `Starting` lifecycle phase.
246pub struct Starting;
247/// Marker for the `Running` lifecycle phase.
248pub struct Running;
249/// Marker for the `Stopping` lifecycle phase.
250pub struct Stopping;
251
252mod sealed {
253    pub trait Sealed {}
254    impl Sealed for super::Starting {}
255    impl Sealed for super::Running {}
256    impl Sealed for super::Stopping {}
257}
258
259impl PhaseMarker for Starting {
260    const PHASE: LifecyclePhase = LifecyclePhase::Starting;
261}
262impl PhaseMarker for Running {
263    const PHASE: LifecyclePhase = LifecyclePhase::Running;
264}
265impl PhaseMarker for Stopping {
266    const PHASE: LifecyclePhase = LifecyclePhase::Stopping;
267}
268
269/// Phase-typed view over a [`Context`]. The phase parameter is a phantom
270/// witness; only methods valid in that phase are exposed.
271///
272/// `Starting`-only: nothing yet (constructor surface).
273/// `Running` exposes `become_`, `unstash_all`, `set_receive_timeout`.
274/// `Stopping` exposes only inspection (no new state changes).
275pub struct TypedContext<'a, A: Actor, P: PhaseMarker> {
276    inner: &'a mut Context<A>,
277    _phase: PhantomData<P>,
278}
279
280impl<'a, A: Actor, P: PhaseMarker> TypedContext<'a, A, P> {
281    pub fn ctx(&self) -> &Context<A> {
282        self.inner
283    }
284    pub fn ctx_mut(&mut self) -> &mut Context<A> {
285        self.inner
286    }
287    pub fn self_ref(&self) -> &ActorRef<A::Msg> {
288        &self.inner.self_ref
289    }
290}
291
292impl<'a, A: Actor> TypedContext<'a, A, Running> {
293    /// Set the receive-idle timeout. Only callable from a `Running` view.
294    pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
295        self.inner.set_receive_timeout(d);
296    }
297
298    /// Drain stashed messages. Only callable from a `Running` view.
299    pub fn unstash_all(&mut self) -> Vec<A::Msg> {
300        self.inner.unstash_all()
301    }
302
303    /// Begin self-stop. Transitions the underlying context to `Stopping`
304    /// once the cell observes the request.
305    pub fn stop_self(&mut self) {
306        self.inner.stop_self();
307    }
308}
309
310#[derive(Debug, thiserror::Error)]
311pub enum SpawnError {
312    #[error("child name `{0}` already taken")]
313    NameTaken(String),
314    #[error("actor system has terminated")]
315    SystemTerminated,
316}