Skip to main content

atomr_core/actor/
context.rs

1//! `Context<A>` — the actor's window into the system.
2//! (partial).
3
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::sync::{Arc, Weak};
6use std::time::Duration;
7
8use std::marker::PhantomData;
9
10use super::actor_cell::{ChildEntry, SystemMsg};
11use super::actor_ref::{ActorRef, UntypedActorRef};
12use super::path::ActorPath;
13use super::props::Props;
14use super::scheduler::Scheduler;
15use super::sender::Sender;
16use super::traits::Actor;
17
18/// Public, opaque handle to the running [`super::ActorSystem`] that an actor
19/// can use to reach a small, curated subset of the system surface — currently
20/// just [`Self::scheduler`]. The handle holds a `Weak` reference so it does
21/// not keep the system alive; callers must check [`Self::is_alive`] (or
22/// receive `None` from [`Self::scheduler`]) before relying on it.
23///
24/// This type is the canonical way for binding layers (e.g. `pycore`) to
25/// register cancellable timers from inside `Actor::handle` without spawning
26/// detached `tokio::spawn` tasks. The internal `ActorSystemInner` type is
27/// crate-private, so this thin wrapper is the only stable public path.
28#[derive(Clone)]
29pub struct SystemHandle {
30    inner: Weak<super::actor_system::ActorSystemInner>,
31}
32
33impl SystemHandle {
34    pub(crate) fn new(inner: Weak<super::actor_system::ActorSystemInner>) -> Self {
35        Self { inner }
36    }
37
38    /// True if the underlying [`super::ActorSystem`] has not been dropped.
39    pub fn is_alive(&self) -> bool {
40        self.inner.strong_count() > 0
41    }
42
43    /// Borrow the system's [`Scheduler`]. Returns `None` if the system has
44    /// been dropped.
45    pub fn scheduler(&self) -> Option<Arc<dyn Scheduler>> {
46        self.inner.upgrade().map(|s| s.scheduler.clone())
47    }
48}
49
/// Coarse lifecycle phase of an actor, as reported by [`Context::phase`].
///
/// Part of Phase 1.C of `docs/full-port-plan.md`: a runtime precursor to a
/// phantom-typed `Context<A, Phase>`, kept additive so existing signatures do
/// not break. [`Context::phased`] compares against this value to decide
/// whether a phase-typed view may be handed out.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum LifecyclePhase {
    /// Initial phase; a freshly constructed [`Context`] starts here.
    Starting,
    /// Steady-state phase between start-up and shutdown.
    Running,
    /// Shutdown phase.
    Stopping,
}
61
62/// Passed to every `Actor::handle` call.
63pub struct Context<A: Actor> {
64    pub(crate) self_ref: ActorRef<A::Msg>,
65    pub(crate) path: ActorPath,
66    pub(crate) system: Weak<super::actor_system::ActorSystemInner>,
67    pub(crate) children: HashMap<String, ChildEntry>,
68    pub(crate) watching: HashSet<ActorPath>,
69    pub(crate) watched_by: HashSet<UntypedActorRef>,
70    pub(crate) stash: VecDeque<A::Msg>,
71    pub(crate) receive_timeout: Option<Duration>,
72    pub(crate) current_sender: Sender,
73    pub(crate) stopping: bool,
74    pub(crate) phase: LifecyclePhase,
75}
76
77impl<A: Actor> Context<A> {
78    pub(crate) fn new(
79        self_ref: ActorRef<A::Msg>,
80        path: ActorPath,
81        system: Weak<super::actor_system::ActorSystemInner>,
82    ) -> Self {
83        Self {
84            self_ref,
85            path,
86            system,
87            children: HashMap::new(),
88            watching: HashSet::new(),
89            watched_by: HashSet::new(),
90            stash: VecDeque::new(),
91            receive_timeout: None,
92            current_sender: Sender::None,
93            stopping: false,
94            phase: LifecyclePhase::Starting,
95        }
96    }
97
98    /// Current lifecycle phase. Phase 1.C marker — useful in
99    /// generic helpers that need to gate calls (e.g. `become`,
100    /// `unstash_all`) without taking a typed-`Context<A, P>`
101    /// parameter.
102    pub fn phase(&self) -> LifecyclePhase {
103        self.phase
104    }
105
106    pub fn self_ref(&self) -> &ActorRef<A::Msg> {
107        &self.self_ref
108    }
109
110    pub fn path(&self) -> &ActorPath {
111        &self.path
112    }
113
114    /// Opaque handle to the running [`super::ActorSystem`]. Useful in
115    /// binding layers that need to register cancellable timers via the
116    /// system [`Scheduler`] without spawning detached `tokio` tasks.
117    /// Holds a `Weak` reference; see [`SystemHandle`].
118    pub fn system_handle(&self) -> SystemHandle {
119        SystemHandle::new(self.system.clone())
120    }
121
122    /// Spawn a child actor under this context.
123    pub fn spawn<B: Actor>(&mut self, props: Props<B>, name: &str) -> Result<ActorRef<B::Msg>, SpawnError> {
124        if self.children.contains_key(name) {
125            return Err(SpawnError::NameTaken(name.into()));
126        }
127        let system = self.system.upgrade().ok_or(SpawnError::SystemTerminated)?;
128        let child_path = self.path.child(name);
129        let r = super::actor_cell::spawn_cell::<B>(system.clone(), props, child_path.clone())?;
130        if let Some(obs) = system.spawn_observer.read().as_ref() {
131            obs.on_spawn(&child_path, Some(&self.path), std::any::type_name::<B>());
132        }
133        self.children.insert(
134            name.to_string(),
135            ChildEntry { path: child_path, untyped: r.as_untyped(), system_tx: r.system_sender() },
136        );
137        Ok(r)
138    }
139
140    /// Stop a specific child.
141    pub fn stop_child(&mut self, name: &str) {
142        if let Some(c) = self.children.get(name) {
143            let _ = c.system_tx.send(SystemMsg::Stop);
144        }
145    }
146
147    /// Watch another actor. The sender is notified with a `SystemMsg::Terminated`
148    /// when the watched actor stops.
149    pub fn watch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
150        if self.watching.insert(target.path().clone()) {
151            let _ = target.system_sender().send(SystemMsg::Watch(self.self_ref.as_untyped()));
152        }
153    }
154
155    /// Stop watching.
156    pub fn unwatch<M: Send + 'static>(&mut self, target: &ActorRef<M>) {
157        if self.watching.remove(target.path()) {
158            let _ = target.system_sender().send(SystemMsg::Unwatch(self.path.clone()));
159        }
160    }
161
162    /// Stash the currently-processed message for later unstash.
163    pub fn stash(&mut self, msg: A::Msg) {
164        self.stash.push_back(msg);
165    }
166
167    /// Put all stashed messages back at the front of the mailbox.
168    pub fn unstash_all(&mut self) -> Vec<A::Msg> {
169        let mut out = Vec::with_capacity(self.stash.len());
170        while let Some(m) = self.stash.pop_front() {
171            out.push(m);
172        }
173        out
174    }
175
176    /// Stop self.
177    pub fn stop_self(&mut self) {
178        self.stopping = true;
179    }
180
181    /// Set idle-receive timeout (like).
182    pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
183        self.receive_timeout = d;
184    }
185
186    /// Typed sender of the message currently being processed.
187    ///
188    /// Returns [`Sender::None`] if the sender slot was empty (the
189    /// analog of `Sender == NoSender`).
190    pub fn sender(&self) -> &Sender {
191        &self.current_sender
192    }
193
194    /// Backwards-compatible alias for [`Context::sender`].
195    #[doc(hidden)]
196    pub fn sender_typed(&self) -> &Sender {
197        &self.current_sender
198    }
199
200    /// Borrow this context as a phase-typed view. The phase parameter is a
201    /// phantom witness only — call sites typically use one of
202    /// [`Context::starting`], [`Context::running`], or
203    /// [`Context::stopping_view`] to get a view whose method surface
204    /// matches the phase.
205    pub fn phased<P: PhaseMarker>(&mut self) -> Option<TypedContext<'_, A, P>> {
206        if P::PHASE == self.phase {
207            Some(TypedContext { inner: self, _phase: PhantomData })
208        } else {
209            None
210        }
211    }
212
213    /// Phase-typed view valid only while the actor is in `Starting`.
214    pub fn starting(&mut self) -> Option<TypedContext<'_, A, Starting>> {
215        self.phased::<Starting>()
216    }
217
218    /// Phase-typed view valid only while the actor is in `Running`.
219    pub fn running(&mut self) -> Option<TypedContext<'_, A, Running>> {
220        self.phased::<Running>()
221    }
222
223    /// Phase-typed view valid only while the actor is in `Stopping`.
224    pub fn stopping_view(&mut self) -> Option<TypedContext<'_, A, Stopping>> {
225        self.phased::<Stopping>()
226    }
227}
228
229/// Phase markers for [`TypedContext`]. Each marker type implements
230/// [`PhaseMarker`] with a const [`LifecyclePhase`] discriminant; the
231/// `PhasedContext` view inspects this at runtime to gate phase-only APIs.
232pub trait PhaseMarker: sealed::Sealed {
233    const PHASE: LifecyclePhase;
234}
235
/// Marker for the `Starting` lifecycle phase.
///
/// Zero-sized; used only as a type-level tag.
#[derive(Debug, Clone, Copy)]
pub struct Starting;
/// Marker for the `Running` lifecycle phase.
///
/// Zero-sized; used only as a type-level tag.
#[derive(Debug, Clone, Copy)]
pub struct Running;
/// Marker for the `Stopping` lifecycle phase.
///
/// Zero-sized; used only as a type-level tag.
#[derive(Debug, Clone, Copy)]
pub struct Stopping;
242
243mod sealed {
244    pub trait Sealed {}
245    impl Sealed for super::Starting {}
246    impl Sealed for super::Running {}
247    impl Sealed for super::Stopping {}
248}
249
250impl PhaseMarker for Starting {
251    const PHASE: LifecyclePhase = LifecyclePhase::Starting;
252}
253impl PhaseMarker for Running {
254    const PHASE: LifecyclePhase = LifecyclePhase::Running;
255}
256impl PhaseMarker for Stopping {
257    const PHASE: LifecyclePhase = LifecyclePhase::Stopping;
258}
259
260/// Phase-typed view over a [`Context`]. The phase parameter is a phantom
261/// witness; only methods valid in that phase are exposed.
262///
263/// `Starting`-only: nothing yet (constructor surface).
264/// `Running` exposes `become_`, `unstash_all`, `set_receive_timeout`.
265/// `Stopping` exposes only inspection (no new state changes).
266pub struct TypedContext<'a, A: Actor, P: PhaseMarker> {
267    inner: &'a mut Context<A>,
268    _phase: PhantomData<P>,
269}
270
271impl<'a, A: Actor, P: PhaseMarker> TypedContext<'a, A, P> {
272    pub fn ctx(&self) -> &Context<A> {
273        self.inner
274    }
275    pub fn ctx_mut(&mut self) -> &mut Context<A> {
276        self.inner
277    }
278    pub fn self_ref(&self) -> &ActorRef<A::Msg> {
279        &self.inner.self_ref
280    }
281}
282
283impl<'a, A: Actor> TypedContext<'a, A, Running> {
284    /// Set the receive-idle timeout. Only callable from a `Running` view.
285    pub fn set_receive_timeout(&mut self, d: Option<Duration>) {
286        self.inner.set_receive_timeout(d);
287    }
288
289    /// Drain stashed messages. Only callable from a `Running` view.
290    pub fn unstash_all(&mut self) -> Vec<A::Msg> {
291        self.inner.unstash_all()
292    }
293
294    /// Begin self-stop. Transitions the underlying context to `Stopping`
295    /// once the cell observes the request.
296    pub fn stop_self(&mut self) {
297        self.inner.stop_self();
298    }
299}
300
301#[derive(Debug, thiserror::Error)]
302pub enum SpawnError {
303    #[error("child name `{0}` already taken")]
304    NameTaken(String),
305    #[error("actor system has terminated")]
306    SystemTerminated,
307}