atomr_core/actor/
actor_cell.rs1use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::sync::mpsc;
16
17use super::actor_ref::{ActorRef, UntypedActorRef};
18use super::context::Context;
19use super::path::ActorPath;
20use super::props::Props;
21use super::traits::{Actor, MessageEnvelope};
22use crate::supervision::Directive;
23
24#[derive(Debug)]
26pub enum SystemMsg {
27 Stop,
28 Restart(String),
29 Terminated(ActorPath),
30 Watch(UntypedActorRef),
31 Unwatch(ActorPath),
32 ReceiveTimeout,
33 ChildFailed { name: String, error: String },
34}
35
36#[derive(Debug)]
38pub struct ChildEntry {
39 #[allow(dead_code)]
41 pub path: ActorPath,
42 #[allow(dead_code)]
43 pub untyped: UntypedActorRef,
44 pub system_tx: mpsc::UnboundedSender<SystemMsg>,
45}
46
47pub struct ActorCell<A: Actor> {
49 _marker: std::marker::PhantomData<A>,
50}
51
52pub(crate) fn spawn_cell<A: Actor>(
53 system: Arc<super::actor_system::ActorSystemInner>,
54 props: Props<A>,
55 path: ActorPath,
56) -> Result<ActorRef<A::Msg>, super::context::SpawnError> {
57 let (user_tx, user_rx) = mpsc::unbounded_channel::<MessageEnvelope<A::Msg>>();
58 let (sys_tx, sys_rx) = mpsc::unbounded_channel::<SystemMsg>();
59 let actor_ref = ActorRef::new(path.clone(), user_tx, sys_tx, Arc::downgrade(&system));
60
61 let cell_ref = actor_ref.clone();
62 let cell_system = Arc::downgrade(&system);
63 let props_clone = props.clone();
64 tokio::spawn(async move {
65 let mut actor = props_clone.new_actor();
66 let mut ctx = Context::<A>::new(cell_ref.clone(), path.clone(), cell_system);
67 run_cell(&mut actor, &mut ctx, user_rx, sys_rx, &props_clone).await;
68 });
69
70 Ok(actor_ref)
71}
72
73async fn run_cell<A: Actor>(
74 actor: &mut A,
75 ctx: &mut Context<A>,
76 mut user_rx: mpsc::UnboundedReceiver<MessageEnvelope<A::Msg>>,
77 mut sys_rx: mpsc::UnboundedReceiver<SystemMsg>,
78 props: &Props<A>,
79) {
80 ctx.phase = super::context::LifecyclePhase::Starting;
81 actor.pre_start(ctx).await;
82 ctx.phase = super::context::LifecyclePhase::Running;
83
84 let supervisor_ref = props.supervisor_strategy.clone();
85
86 loop {
87 while let Ok(sys) = sys_rx.try_recv() {
88 if handle_system(actor, ctx, sys).await {
89 finalize(actor, ctx).await;
90 return;
91 }
92 }
93 if ctx.stopping {
94 finalize(actor, ctx).await;
95 return;
96 }
97
98 let timeout = ctx.receive_timeout;
99 let next: Either<A> = tokio::select! {
100 biased;
101 sys = sys_rx.recv() => Either::<A>::Sys(sys),
102 user = user_rx.recv() => Either::<A>::User(user),
103 _ = maybe_sleep(timeout), if timeout.is_some() => Either::<A>::Timeout,
104 };
105
106 match next {
107 Either::Sys(Some(s)) => {
108 if handle_system(actor, ctx, s).await {
109 finalize(actor, ctx).await;
110 return;
111 }
112 }
113 Either::User(Some(env)) => {
114 ctx.current_sender = env.sender;
115 if let Err(panic_msg) = run_handle(actor, ctx, env.message).await {
116 let directive =
117 supervisor_ref.as_ref().map(|s| s.decide(&panic_msg)).unwrap_or(Directive::Restart);
118 match directive {
119 Directive::Resume => {}
120 Directive::Restart => {
121 actor.pre_restart(ctx, &panic_msg).await;
122 *actor = props.new_actor();
123 actor.post_restart(ctx, &panic_msg).await;
124 }
125 Directive::Stop | Directive::Escalate => {
126 finalize(actor, ctx).await;
127 return;
128 }
129 }
130 }
131 ctx.current_sender = super::sender::Sender::None;
132 }
133 Either::Timeout => {
134 if handle_system(actor, ctx, SystemMsg::ReceiveTimeout).await {
135 finalize(actor, ctx).await;
136 return;
137 }
138 }
139 Either::Sys(None) | Either::User(None) => {
140 finalize(actor, ctx).await;
141 return;
142 }
143 }
144 }
145}
146
147enum Either<A: Actor> {
148 User(Option<MessageEnvelope<A::Msg>>),
149 Sys(Option<SystemMsg>),
150 Timeout,
151}
152
153async fn maybe_sleep(d: Option<Duration>) {
154 if let Some(d) = d {
155 tokio::time::sleep(d).await;
156 } else {
157 futures_util::future::pending::<()>().await;
158 }
159}
160
161async fn handle_system<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: SystemMsg) -> bool {
162 match msg {
163 SystemMsg::Stop => true,
164 SystemMsg::Restart(err) => {
165 actor.pre_restart(ctx, &err).await;
166 actor.post_restart(ctx, &err).await;
167 false
168 }
169 SystemMsg::Terminated(path) => {
170 tracing::debug!(self_path = %ctx.path, watched = %path, "watched actor terminated");
171 ctx.watching.remove(&path);
172 false
173 }
174 SystemMsg::Watch(subscriber) => {
175 ctx.watched_by.insert(subscriber);
176 false
177 }
178 SystemMsg::Unwatch(path) => {
179 ctx.watched_by.retain(|w| w.path() != &path);
180 false
181 }
182 SystemMsg::ReceiveTimeout => false,
183 SystemMsg::ChildFailed { name, error } => {
184 tracing::warn!(path = %ctx.path, child = %name, "child failed: {error}");
185 false
186 }
187 }
188}
189
190async fn run_handle<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: A::Msg) -> Result<(), String> {
191 use futures_util::FutureExt;
192 let fut = actor.handle(ctx, msg);
193 match std::panic::AssertUnwindSafe(fut).catch_unwind().await {
194 Ok(()) => Ok(()),
195 Err(p) => {
196 let s = panic_payload_to_string(p);
197 tracing::error!(path = %ctx.path, "handle panic: {s}");
198 Err(s)
199 }
200 }
201}
202
203fn panic_payload_to_string(p: Box<dyn std::any::Any + Send>) -> String {
204 if let Some(s) = p.downcast_ref::<&str>() {
205 s.to_string()
206 } else if let Some(s) = p.downcast_ref::<String>() {
207 s.clone()
208 } else {
209 "actor panic".to_string()
210 }
211}
212
213async fn finalize<A: Actor>(actor: &mut A, ctx: &mut Context<A>) {
214 ctx.phase = super::context::LifecyclePhase::Stopping;
215 actor.post_stop(ctx).await;
216 for w in ctx.watched_by.drain().collect::<Vec<_>>() {
217 w.notify_watchers(ctx.path.clone());
218 }
219 for (_, child) in std::mem::take(&mut ctx.children) {
220 let _ = child.system_tx.send(SystemMsg::Stop);
221 }
222 if let Some(system) = ctx.system.upgrade() {
223 if let Some(obs) = system.spawn_observer.read().as_ref() {
224 obs.on_stop(&ctx.path);
225 }
226 }
227}