Skip to main content

atomr_core/actor/
actor_cell.rs

1//! `ActorCell` — the per-actor runtime.
2//! and its partial classes (`.Children.cs`, `.DeathWatch.cs`,
3//! `.DefaultMessages.cs`, `.FaultHandling.cs`, `.ReceiveTimeout.cs`).
4//!
5//! Responsibilities:
6//! * Own the user actor instance `A`
7//! * Poll mailbox (system priority over user)
8//! * Invoke lifecycle hooks (pre_start, post_stop, pre/post_restart)
9//! * Handle supervision decisions on panic
10//! * Track children, watchers, and receive timeout
11
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::sync::mpsc;
16
17use super::actor_ref::{ActorRef, UntypedActorRef};
18use super::context::Context;
19use super::path::ActorPath;
20use super::props::Props;
21use super::traits::{Actor, MessageEnvelope};
22use crate::supervision::Directive;
23
24/// Messages on the actor's system channel.
25#[derive(Debug)]
26pub enum SystemMsg {
27    Stop,
28    Restart(String),
29    Terminated(ActorPath),
30    Watch(UntypedActorRef),
31    Unwatch(ActorPath),
32    ReceiveTimeout,
33    ChildFailed { name: String, error: String },
34}
35
36/// Bookkeeping entry for a child on the parent's side.
37#[derive(Debug)]
38pub struct ChildEntry {
39    /// Reserved for future child introspection APIs.
40    #[allow(dead_code)]
41    pub path: ActorPath,
42    #[allow(dead_code)]
43    pub untyped: UntypedActorRef,
44    pub system_tx: mpsc::UnboundedSender<SystemMsg>,
45}
46
47/// Marker used only for public type references.
48pub struct ActorCell<A: Actor> {
49    _marker: std::marker::PhantomData<A>,
50}
51
52pub(crate) fn spawn_cell<A: Actor>(
53    system: Arc<super::actor_system::ActorSystemInner>,
54    props: Props<A>,
55    path: ActorPath,
56) -> Result<ActorRef<A::Msg>, super::context::SpawnError> {
57    let (user_tx, user_rx) = mpsc::unbounded_channel::<MessageEnvelope<A::Msg>>();
58    let (sys_tx, sys_rx) = mpsc::unbounded_channel::<SystemMsg>();
59    let actor_ref = ActorRef::new(path.clone(), user_tx, sys_tx, Arc::downgrade(&system));
60
61    let cell_ref = actor_ref.clone();
62    let cell_system = Arc::downgrade(&system);
63    let props_clone = props.clone();
64    tokio::spawn(async move {
65        let mut actor = props_clone.new_actor();
66        let mut ctx = Context::<A>::new(cell_ref.clone(), path.clone(), cell_system);
67        run_cell(&mut actor, &mut ctx, user_rx, sys_rx, &props_clone).await;
68    });
69
70    Ok(actor_ref)
71}
72
73async fn run_cell<A: Actor>(
74    actor: &mut A,
75    ctx: &mut Context<A>,
76    mut user_rx: mpsc::UnboundedReceiver<MessageEnvelope<A::Msg>>,
77    mut sys_rx: mpsc::UnboundedReceiver<SystemMsg>,
78    props: &Props<A>,
79) {
80    ctx.phase = super::context::LifecyclePhase::Starting;
81    actor.pre_start(ctx).await;
82    ctx.phase = super::context::LifecyclePhase::Running;
83
84    let supervisor_ref = props.supervisor_strategy.clone();
85
86    loop {
87        while let Ok(sys) = sys_rx.try_recv() {
88            if handle_system(actor, ctx, sys).await {
89                finalize(actor, ctx).await;
90                return;
91            }
92        }
93        if ctx.stopping {
94            finalize(actor, ctx).await;
95            return;
96        }
97
98        let timeout = ctx.receive_timeout;
99        let next: Either<A> = tokio::select! {
100            biased;
101            sys = sys_rx.recv() => Either::<A>::Sys(sys),
102            user = user_rx.recv() => Either::<A>::User(user),
103            _ = maybe_sleep(timeout), if timeout.is_some() => Either::<A>::Timeout,
104        };
105
106        match next {
107            Either::Sys(Some(s)) => {
108                if handle_system(actor, ctx, s).await {
109                    finalize(actor, ctx).await;
110                    return;
111                }
112            }
113            Either::User(Some(env)) => {
114                ctx.current_sender = env.sender;
115                if let Err(panic_msg) = run_handle(actor, ctx, env.message).await {
116                    let directive =
117                        supervisor_ref.as_ref().map(|s| s.decide(&panic_msg)).unwrap_or(Directive::Restart);
118                    match directive {
119                        Directive::Resume => {}
120                        Directive::Restart => {
121                            actor.pre_restart(ctx, &panic_msg).await;
122                            *actor = props.new_actor();
123                            actor.post_restart(ctx, &panic_msg).await;
124                        }
125                        Directive::Stop | Directive::Escalate => {
126                            finalize(actor, ctx).await;
127                            return;
128                        }
129                    }
130                }
131                ctx.current_sender = super::sender::Sender::None;
132            }
133            Either::Timeout => {
134                if handle_system(actor, ctx, SystemMsg::ReceiveTimeout).await {
135                    finalize(actor, ctx).await;
136                    return;
137                }
138            }
139            Either::Sys(None) | Either::User(None) => {
140                finalize(actor, ctx).await;
141                return;
142            }
143        }
144    }
145}
146
147enum Either<A: Actor> {
148    User(Option<MessageEnvelope<A::Msg>>),
149    Sys(Option<SystemMsg>),
150    Timeout,
151}
152
153async fn maybe_sleep(d: Option<Duration>) {
154    if let Some(d) = d {
155        tokio::time::sleep(d).await;
156    } else {
157        futures_util::future::pending::<()>().await;
158    }
159}
160
161async fn handle_system<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: SystemMsg) -> bool {
162    match msg {
163        SystemMsg::Stop => true,
164        SystemMsg::Restart(err) => {
165            actor.pre_restart(ctx, &err).await;
166            actor.post_restart(ctx, &err).await;
167            false
168        }
169        SystemMsg::Terminated(path) => {
170            tracing::debug!(self_path = %ctx.path, watched = %path, "watched actor terminated");
171            ctx.watching.remove(&path);
172            false
173        }
174        SystemMsg::Watch(subscriber) => {
175            ctx.watched_by.insert(subscriber);
176            false
177        }
178        SystemMsg::Unwatch(path) => {
179            ctx.watched_by.retain(|w| w.path() != &path);
180            false
181        }
182        SystemMsg::ReceiveTimeout => false,
183        SystemMsg::ChildFailed { name, error } => {
184            tracing::warn!(path = %ctx.path, child = %name, "child failed: {error}");
185            false
186        }
187    }
188}
189
190async fn run_handle<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: A::Msg) -> Result<(), String> {
191    use futures_util::FutureExt;
192    let fut = actor.handle(ctx, msg);
193    match std::panic::AssertUnwindSafe(fut).catch_unwind().await {
194        Ok(()) => Ok(()),
195        Err(p) => {
196            let s = panic_payload_to_string(p);
197            tracing::error!(path = %ctx.path, "handle panic: {s}");
198            Err(s)
199        }
200    }
201}
202
203fn panic_payload_to_string(p: Box<dyn std::any::Any + Send>) -> String {
204    if let Some(s) = p.downcast_ref::<&str>() {
205        s.to_string()
206    } else if let Some(s) = p.downcast_ref::<String>() {
207        s.clone()
208    } else {
209        "actor panic".to_string()
210    }
211}
212
213async fn finalize<A: Actor>(actor: &mut A, ctx: &mut Context<A>) {
214    ctx.phase = super::context::LifecyclePhase::Stopping;
215    actor.post_stop(ctx).await;
216    for w in ctx.watched_by.drain().collect::<Vec<_>>() {
217        w.notify_watchers(ctx.path.clone());
218    }
219    for (_, child) in std::mem::take(&mut ctx.children) {
220        let _ = child.system_tx.send(SystemMsg::Stop);
221    }
222    if let Some(system) = ctx.system.upgrade() {
223        if let Some(obs) = system.spawn_observer.read().as_ref() {
224            obs.on_stop(&ctx.path);
225        }
226    }
227}