Skip to main content

atomr_core/actor/
actor_cell.rs

//! `ActorCell` — the per-actor runtime. Counterpart of the upstream C#
//! `ActorCell` and its partial classes (`.Children.cs`, `.DeathWatch.cs`,
//! `.DefaultMessages.cs`, `.FaultHandling.cs`, `.ReceiveTimeout.cs`).
4//!
5//! Responsibilities:
6//! * Own the user actor instance `A`
7//! * Poll mailbox (system priority over user)
8//! * Invoke lifecycle hooks (pre_start, post_stop, pre/post_restart)
9//! * Handle supervision decisions on panic
10//! * Track children, watchers, and receive timeout
11
12use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use tokio::sync::mpsc;
17
18use super::actor_ref::{ActorRef, UntypedActorRef};
19use super::context::Context;
20use super::path::ActorPath;
21use super::props::Props;
22use super::traits::{Actor, MessageEnvelope};
23use crate::supervision::{Directive, PanicPayload};
24
25/// Messages on the actor's system channel.
26#[derive(Debug)]
27pub enum SystemMsg {
28    Stop,
29    Restart(String),
30    Terminated(ActorPath),
31    Watch(UntypedActorRef),
32    Unwatch(ActorPath),
33    ReceiveTimeout,
34    ChildFailed {
35        name: String,
36        error: String,
37    },
38    /// A graded supervisor directive pushed into a running actor (FR-6).
39    /// Delivered out-of-band on the system channel so a circuit breaker can
40    /// throttle / suspend / resume a child without restarting it.
41    Directive(Directive),
42}
43
44/// Bookkeeping entry for a child on the parent's side.
45#[derive(Debug)]
46pub struct ChildEntry {
47    /// Full path of the child. Used by the parent's death-watch
48    /// cleanup to confirm the slot still belongs to the actor that is
49    /// finalizing (a fast respawn could have replaced it).
50    pub path: ActorPath,
51    #[allow(dead_code)]
52    pub untyped: UntypedActorRef,
53    pub system_tx: mpsc::UnboundedSender<SystemMsg>,
54}
55
56/// Marker used only for public type references.
57pub struct ActorCell<A: Actor> {
58    _marker: std::marker::PhantomData<A>,
59}
60
61pub(crate) fn spawn_cell<A: Actor>(
62    system: Arc<super::actor_system::ActorSystemInner>,
63    props: Props<A>,
64    path: ActorPath,
65) -> Result<ActorRef<A::Msg>, super::context::SpawnError> {
66    let (user_tx, user_rx) = mpsc::unbounded_channel::<MessageEnvelope<A::Msg>>();
67    let (sys_tx, sys_rx) = mpsc::unbounded_channel::<SystemMsg>();
68    let actor_ref = ActorRef::new(path.clone(), user_tx, sys_tx, Arc::downgrade(&system));
69
70    let cell_ref = actor_ref.clone();
71    let cell_system = Arc::downgrade(&system);
72    let props_clone = props.clone();
73    tokio::spawn(async move {
74        let mut actor = props_clone.new_actor();
75        let mut ctx = Context::<A>::new(cell_ref.clone(), path.clone(), cell_system);
76        run_cell(&mut actor, &mut ctx, user_rx, sys_rx, &props_clone).await;
77    });
78
79    Ok(actor_ref)
80}
81
82async fn run_cell<A: Actor>(
83    actor: &mut A,
84    ctx: &mut Context<A>,
85    mut user_rx: mpsc::UnboundedReceiver<MessageEnvelope<A::Msg>>,
86    mut sys_rx: mpsc::UnboundedReceiver<SystemMsg>,
87    props: &Props<A>,
88) {
89    ctx.phase = super::context::LifecyclePhase::Starting;
90    actor.pre_start(ctx).await;
91    ctx.phase = super::context::LifecyclePhase::Running;
92
93    let supervisor_ref = props.supervisor_strategy.clone();
94
95    // Sliding-window restart history. A new entry is appended on every
96    // `Directive::Restart` decision; entries older than the strategy's
97    // `within` are pruned before each check. When `max_retries` is set
98    // and the (post-prune) history length plus the imminent restart
99    // would exceed the cap, supervision escalates (currently: stop the
100    // actor — escalation to the parent will land with the parent-cell
101    // reorg in a follow-up).
102    let mut restart_history: VecDeque<Instant> = VecDeque::new();
103
104    loop {
105        while let Ok(sys) = sys_rx.try_recv() {
106            if handle_system(actor, ctx, sys).await {
107                finalize(actor, ctx).await;
108                return;
109            }
110        }
111        if ctx.stopping {
112            finalize(actor, ctx).await;
113            return;
114        }
115
116        let timeout = ctx.receive_timeout;
117        let next: Either<A> = tokio::select! {
118            biased;
119            sys = sys_rx.recv() => Either::<A>::Sys(sys),
120            user = user_rx.recv() => Either::<A>::User(user),
121            _ = maybe_sleep(timeout), if timeout.is_some() => Either::<A>::Timeout,
122        };
123
124        match next {
125            Either::Sys(Some(s)) => {
126                if handle_system(actor, ctx, s).await {
127                    finalize(actor, ctx).await;
128                    return;
129                }
130            }
131            Either::User(Some(env)) => {
132                ctx.current_sender = env.sender;
133                ctx.current_metadata = env.metadata;
134                // FR-10: open an interceptor span for the lifetime of `handle`.
135                let _span_guard = props.interceptor.as_ref().map(|i| i.before_handle(&ctx.current_metadata));
136                let handle_result = run_handle(actor, ctx, env.message).await;
137                drop(_span_guard);
138                if let Err(panic_msg) = handle_result {
139                    let directive =
140                        supervisor_ref.as_ref().map(|s| s.decide(&panic_msg)).unwrap_or(Directive::Restart);
141                    match directive {
142                        Directive::Resume => {}
143                        // FR-6: graded directives degrade a running child without
144                        // restart — push the mode change in via `on_directive`.
145                        Directive::Throttle { .. } | Directive::Suspend { .. } | Directive::ResumeFrom(_) => {
146                            actor.on_directive(ctx, &directive).await;
147                        }
148                        Directive::Restart => {
149                            // Sliding-window retry budget. Only enforced when
150                            // the strategy declares one; without `max_retries`
151                            // the cell behaves exactly as before.
152                            let strategy = supervisor_ref.as_ref();
153                            let max_retries = strategy.and_then(|s| s.max_retries);
154                            if let Some(max) = max_retries {
155                                let now = Instant::now();
156                                if let Some(within) = strategy.and_then(|s| s.within) {
157                                    while let Some(front) = restart_history.front() {
158                                        if now.duration_since(*front) > within {
159                                            restart_history.pop_front();
160                                        } else {
161                                            break;
162                                        }
163                                    }
164                                }
165                                if (restart_history.len() as u32) + 1 > max {
166                                    tracing::warn!(
167                                        path = %ctx.path,
168                                        retries = restart_history.len(),
169                                        max,
170                                        "supervisor max_retries exceeded; escalating (stop)"
171                                    );
172                                    finalize(actor, ctx).await;
173                                    return;
174                                }
175                                restart_history.push_back(now);
176                            }
177                            actor.pre_restart(ctx, &panic_msg).await;
178                            *actor = props.new_actor();
179                            actor.post_restart(ctx, &panic_msg).await;
180                        }
181                        Directive::Stop | Directive::Escalate => {
182                            finalize(actor, ctx).await;
183                            return;
184                        }
185                    }
186                }
187                ctx.current_sender = super::sender::Sender::None;
188                ctx.current_metadata = super::metadata::Metadata::new();
189            }
190            Either::Timeout => {
191                if handle_system(actor, ctx, SystemMsg::ReceiveTimeout).await {
192                    finalize(actor, ctx).await;
193                    return;
194                }
195            }
196            Either::Sys(None) | Either::User(None) => {
197                finalize(actor, ctx).await;
198                return;
199            }
200        }
201    }
202}
203
204enum Either<A: Actor> {
205    User(Option<MessageEnvelope<A::Msg>>),
206    Sys(Option<SystemMsg>),
207    Timeout,
208}
209
210async fn maybe_sleep(d: Option<Duration>) {
211    if let Some(d) = d {
212        tokio::time::sleep(d).await;
213    } else {
214        futures_util::future::pending::<()>().await;
215    }
216}
217
218async fn handle_system<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: SystemMsg) -> bool {
219    match msg {
220        SystemMsg::Stop => true,
221        SystemMsg::Restart(err) => {
222            actor.pre_restart(ctx, &err).await;
223            actor.post_restart(ctx, &err).await;
224            false
225        }
226        SystemMsg::Terminated(path) => {
227            tracing::debug!(self_path = %ctx.path, watched = %path, "watched actor terminated");
228            ctx.watching.remove(&path);
229            actor.on_terminated(ctx, &path).await;
230            false
231        }
232        SystemMsg::Watch(subscriber) => {
233            ctx.watched_by.insert(subscriber);
234            false
235        }
236        SystemMsg::Unwatch(path) => {
237            ctx.watched_by.retain(|w| w.path() != &path);
238            false
239        }
240        SystemMsg::ReceiveTimeout => false,
241        SystemMsg::ChildFailed { name, error } => {
242            tracing::warn!(path = %ctx.path, child = %name, "child failed: {error}");
243            false
244        }
245        SystemMsg::Directive(directive) => {
246            actor.on_directive(ctx, &directive).await;
247            false
248        }
249    }
250}
251
252async fn run_handle<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: A::Msg) -> Result<(), String> {
253    use futures_util::FutureExt;
254    let fut = actor.handle(ctx, msg);
255    match std::panic::AssertUnwindSafe(fut).catch_unwind().await {
256        Ok(()) => Ok(()),
257        Err(p) => {
258            let s = panic_payload_to_string(p);
259            tracing::error!(path = %ctx.path, "handle panic: {s}");
260            Err(s)
261        }
262    }
263}
264
265fn panic_payload_to_string(p: Box<dyn std::any::Any + Send>) -> String {
266    if let Some(payload) = p.downcast_ref::<PanicPayload>() {
267        payload.to_wire()
268    } else if let Some(s) = p.downcast_ref::<&str>() {
269        s.to_string()
270    } else if let Some(s) = p.downcast_ref::<String>() {
271        s.clone()
272    } else {
273        "actor panic".to_string()
274    }
275}
276
277async fn finalize<A: Actor>(actor: &mut A, ctx: &mut Context<A>) {
278    ctx.phase = super::context::LifecyclePhase::Stopping;
279    actor.post_stop(ctx).await;
280    for (_, child) in std::mem::take(&mut ctx.children) {
281        let _ = child.system_tx.send(SystemMsg::Stop);
282    }
283    if let Some(system) = ctx.system.upgrade() {
284        // Free the user_guardian slot for this child name once it has
285        // fully stopped (post_stop returned). Done *before* watcher
286        // notifications so any actor woken by `Terminated(self.path)`
287        // can immediately call `actor_of(name)` and succeed. Child names
288        // are unique among *currently-alive* siblings, not forever.
289        if ctx.path.elements.len() == 2 && ctx.path.elements[0].as_str() == "user" {
290            let name = ctx.path.elements[1].as_str();
291            let mut guardian = system.user_guardian.lock();
292            // Path-guarded removal: only erase the slot if it still
293            // points at *this* actor. Defends against a (currently
294            // forbidden) fast respawn that won the lock first.
295            if guardian.get(name).is_some_and(|c| c.path == ctx.path) {
296                guardian.remove(name);
297            }
298        }
299        if let Some(obs) = system.spawn_observer.read().as_ref() {
300            obs.on_stop(&ctx.path);
301        }
302    }
303    // Notify watchers *after* the user_guardian slot is freed, so a
304    // watcher that immediately re-spawns the same name on `Terminated`
305    // is guaranteed not to race the cleanup.
306    for w in ctx.watched_by.drain().collect::<Vec<_>>() {
307        w.notify_watchers(ctx.path.clone());
308    }
309}