Skip to main content

atomr_core/actor/
actor_cell.rs

//! `ActorCell` — the per-actor runtime.
//!
//! Counterpart of the C# `ActorCell` class and its partial classes
//! (`.Children.cs`, `.DeathWatch.cs`, `.DefaultMessages.cs`,
//! `.FaultHandling.cs`, `.ReceiveTimeout.cs`).
4//!
5//! Responsibilities:
6//! * Own the user actor instance `A`
7//! * Poll mailbox (system priority over user)
8//! * Invoke lifecycle hooks (pre_start, post_stop, pre/post_restart)
9//! * Handle supervision decisions on panic
10//! * Track children, watchers, and receive timeout
11
12use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use tokio::sync::mpsc;
17
18use super::actor_ref::{ActorRef, UntypedActorRef};
19use super::context::Context;
20use super::path::ActorPath;
21use super::props::Props;
22use super::traits::{Actor, MessageEnvelope};
23use crate::supervision::{Directive, PanicPayload};
24
25/// Messages on the actor's system channel.
26#[derive(Debug)]
27pub enum SystemMsg {
28    Stop,
29    Restart(String),
30    Terminated(ActorPath),
31    Watch(UntypedActorRef),
32    Unwatch(ActorPath),
33    ReceiveTimeout,
34    ChildFailed { name: String, error: String },
35}
36
37/// Bookkeeping entry for a child on the parent's side.
38#[derive(Debug)]
39pub struct ChildEntry {
40    /// Full path of the child. Used by the parent's death-watch
41    /// cleanup to confirm the slot still belongs to the actor that is
42    /// finalizing (a fast respawn could have replaced it).
43    pub path: ActorPath,
44    #[allow(dead_code)]
45    pub untyped: UntypedActorRef,
46    pub system_tx: mpsc::UnboundedSender<SystemMsg>,
47}
48
49/// Marker used only for public type references.
50pub struct ActorCell<A: Actor> {
51    _marker: std::marker::PhantomData<A>,
52}
53
54pub(crate) fn spawn_cell<A: Actor>(
55    system: Arc<super::actor_system::ActorSystemInner>,
56    props: Props<A>,
57    path: ActorPath,
58) -> Result<ActorRef<A::Msg>, super::context::SpawnError> {
59    let (user_tx, user_rx) = mpsc::unbounded_channel::<MessageEnvelope<A::Msg>>();
60    let (sys_tx, sys_rx) = mpsc::unbounded_channel::<SystemMsg>();
61    let actor_ref = ActorRef::new(path.clone(), user_tx, sys_tx, Arc::downgrade(&system));
62
63    let cell_ref = actor_ref.clone();
64    let cell_system = Arc::downgrade(&system);
65    let props_clone = props.clone();
66    tokio::spawn(async move {
67        let mut actor = props_clone.new_actor();
68        let mut ctx = Context::<A>::new(cell_ref.clone(), path.clone(), cell_system);
69        run_cell(&mut actor, &mut ctx, user_rx, sys_rx, &props_clone).await;
70    });
71
72    Ok(actor_ref)
73}
74
75async fn run_cell<A: Actor>(
76    actor: &mut A,
77    ctx: &mut Context<A>,
78    mut user_rx: mpsc::UnboundedReceiver<MessageEnvelope<A::Msg>>,
79    mut sys_rx: mpsc::UnboundedReceiver<SystemMsg>,
80    props: &Props<A>,
81) {
82    ctx.phase = super::context::LifecyclePhase::Starting;
83    actor.pre_start(ctx).await;
84    ctx.phase = super::context::LifecyclePhase::Running;
85
86    let supervisor_ref = props.supervisor_strategy.clone();
87
88    // Sliding-window restart history. A new entry is appended on every
89    // `Directive::Restart` decision; entries older than the strategy's
90    // `within` are pruned before each check. When `max_retries` is set
91    // and the (post-prune) history length plus the imminent restart
92    // would exceed the cap, supervision escalates (currently: stop the
93    // actor — escalation to the parent will land with the parent-cell
94    // reorg in a follow-up).
95    let mut restart_history: VecDeque<Instant> = VecDeque::new();
96
97    loop {
98        while let Ok(sys) = sys_rx.try_recv() {
99            if handle_system(actor, ctx, sys).await {
100                finalize(actor, ctx).await;
101                return;
102            }
103        }
104        if ctx.stopping {
105            finalize(actor, ctx).await;
106            return;
107        }
108
109        let timeout = ctx.receive_timeout;
110        let next: Either<A> = tokio::select! {
111            biased;
112            sys = sys_rx.recv() => Either::<A>::Sys(sys),
113            user = user_rx.recv() => Either::<A>::User(user),
114            _ = maybe_sleep(timeout), if timeout.is_some() => Either::<A>::Timeout,
115        };
116
117        match next {
118            Either::Sys(Some(s)) => {
119                if handle_system(actor, ctx, s).await {
120                    finalize(actor, ctx).await;
121                    return;
122                }
123            }
124            Either::User(Some(env)) => {
125                ctx.current_sender = env.sender;
126                if let Err(panic_msg) = run_handle(actor, ctx, env.message).await {
127                    let directive =
128                        supervisor_ref.as_ref().map(|s| s.decide(&panic_msg)).unwrap_or(Directive::Restart);
129                    match directive {
130                        Directive::Resume => {}
131                        Directive::Restart => {
132                            // Sliding-window retry budget. Only enforced when
133                            // the strategy declares one; without `max_retries`
134                            // the cell behaves exactly as before.
135                            let strategy = supervisor_ref.as_ref();
136                            let max_retries = strategy.and_then(|s| s.max_retries);
137                            if let Some(max) = max_retries {
138                                let now = Instant::now();
139                                if let Some(within) = strategy.and_then(|s| s.within) {
140                                    while let Some(front) = restart_history.front() {
141                                        if now.duration_since(*front) > within {
142                                            restart_history.pop_front();
143                                        } else {
144                                            break;
145                                        }
146                                    }
147                                }
148                                if (restart_history.len() as u32) + 1 > max {
149                                    tracing::warn!(
150                                        path = %ctx.path,
151                                        retries = restart_history.len(),
152                                        max,
153                                        "supervisor max_retries exceeded; escalating (stop)"
154                                    );
155                                    finalize(actor, ctx).await;
156                                    return;
157                                }
158                                restart_history.push_back(now);
159                            }
160                            actor.pre_restart(ctx, &panic_msg).await;
161                            *actor = props.new_actor();
162                            actor.post_restart(ctx, &panic_msg).await;
163                        }
164                        Directive::Stop | Directive::Escalate => {
165                            finalize(actor, ctx).await;
166                            return;
167                        }
168                    }
169                }
170                ctx.current_sender = super::sender::Sender::None;
171            }
172            Either::Timeout => {
173                if handle_system(actor, ctx, SystemMsg::ReceiveTimeout).await {
174                    finalize(actor, ctx).await;
175                    return;
176                }
177            }
178            Either::Sys(None) | Either::User(None) => {
179                finalize(actor, ctx).await;
180                return;
181            }
182        }
183    }
184}
185
186enum Either<A: Actor> {
187    User(Option<MessageEnvelope<A::Msg>>),
188    Sys(Option<SystemMsg>),
189    Timeout,
190}
191
192async fn maybe_sleep(d: Option<Duration>) {
193    if let Some(d) = d {
194        tokio::time::sleep(d).await;
195    } else {
196        futures_util::future::pending::<()>().await;
197    }
198}
199
200async fn handle_system<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: SystemMsg) -> bool {
201    match msg {
202        SystemMsg::Stop => true,
203        SystemMsg::Restart(err) => {
204            actor.pre_restart(ctx, &err).await;
205            actor.post_restart(ctx, &err).await;
206            false
207        }
208        SystemMsg::Terminated(path) => {
209            tracing::debug!(self_path = %ctx.path, watched = %path, "watched actor terminated");
210            ctx.watching.remove(&path);
211            actor.on_terminated(ctx, &path).await;
212            false
213        }
214        SystemMsg::Watch(subscriber) => {
215            ctx.watched_by.insert(subscriber);
216            false
217        }
218        SystemMsg::Unwatch(path) => {
219            ctx.watched_by.retain(|w| w.path() != &path);
220            false
221        }
222        SystemMsg::ReceiveTimeout => false,
223        SystemMsg::ChildFailed { name, error } => {
224            tracing::warn!(path = %ctx.path, child = %name, "child failed: {error}");
225            false
226        }
227    }
228}
229
230async fn run_handle<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: A::Msg) -> Result<(), String> {
231    use futures_util::FutureExt;
232    let fut = actor.handle(ctx, msg);
233    match std::panic::AssertUnwindSafe(fut).catch_unwind().await {
234        Ok(()) => Ok(()),
235        Err(p) => {
236            let s = panic_payload_to_string(p);
237            tracing::error!(path = %ctx.path, "handle panic: {s}");
238            Err(s)
239        }
240    }
241}
242
243fn panic_payload_to_string(p: Box<dyn std::any::Any + Send>) -> String {
244    if let Some(payload) = p.downcast_ref::<PanicPayload>() {
245        payload.to_wire()
246    } else if let Some(s) = p.downcast_ref::<&str>() {
247        s.to_string()
248    } else if let Some(s) = p.downcast_ref::<String>() {
249        s.clone()
250    } else {
251        "actor panic".to_string()
252    }
253}
254
255async fn finalize<A: Actor>(actor: &mut A, ctx: &mut Context<A>) {
256    ctx.phase = super::context::LifecyclePhase::Stopping;
257    actor.post_stop(ctx).await;
258    for (_, child) in std::mem::take(&mut ctx.children) {
259        let _ = child.system_tx.send(SystemMsg::Stop);
260    }
261    if let Some(system) = ctx.system.upgrade() {
262        // Free the user_guardian slot for this child name once it has
263        // fully stopped (post_stop returned). Done *before* watcher
264        // notifications so any actor woken by `Terminated(self.path)`
265        // can immediately call `actor_of(name)` and succeed. Child names
266        // are unique among *currently-alive* siblings, not forever.
267        if ctx.path.elements.len() == 2 && ctx.path.elements[0].as_str() == "user" {
268            let name = ctx.path.elements[1].as_str();
269            let mut guardian = system.user_guardian.lock();
270            // Path-guarded removal: only erase the slot if it still
271            // points at *this* actor. Defends against a (currently
272            // forbidden) fast respawn that won the lock first.
273            if guardian.get(name).is_some_and(|c| c.path == ctx.path) {
274                guardian.remove(name);
275            }
276        }
277        if let Some(obs) = system.spawn_observer.read().as_ref() {
278            obs.on_stop(&ctx.path);
279        }
280    }
281    // Notify watchers *after* the user_guardian slot is freed, so a
282    // watcher that immediately re-spawns the same name on `Terminated`
283    // is guaranteed not to race the cleanup.
284    for w in ctx.watched_by.drain().collect::<Vec<_>>() {
285        w.notify_watchers(ctx.path.clone());
286    }
287}