1use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use tokio::sync::mpsc;
17
18use super::actor_ref::{ActorRef, UntypedActorRef};
19use super::context::Context;
20use super::path::ActorPath;
21use super::props::Props;
22use super::traits::{Actor, MessageEnvelope};
23use crate::supervision::{Directive, PanicPayload};
24
25#[derive(Debug)]
27pub enum SystemMsg {
28 Stop,
29 Restart(String),
30 Terminated(ActorPath),
31 Watch(UntypedActorRef),
32 Unwatch(ActorPath),
33 ReceiveTimeout,
34 ChildFailed {
35 name: String,
36 error: String,
37 },
38 Directive(Directive),
42}
43
44#[derive(Debug)]
46pub struct ChildEntry {
47 pub path: ActorPath,
51 #[allow(dead_code)]
52 pub untyped: UntypedActorRef,
53 pub system_tx: mpsc::UnboundedSender<SystemMsg>,
54}
55
56pub struct ActorCell<A: Actor> {
58 _marker: std::marker::PhantomData<A>,
59}
60
61pub(crate) fn spawn_cell<A: Actor>(
62 system: Arc<super::actor_system::ActorSystemInner>,
63 props: Props<A>,
64 path: ActorPath,
65) -> Result<ActorRef<A::Msg>, super::context::SpawnError> {
66 let (user_tx, user_rx) = mpsc::unbounded_channel::<MessageEnvelope<A::Msg>>();
67 let (sys_tx, sys_rx) = mpsc::unbounded_channel::<SystemMsg>();
68 let actor_ref = ActorRef::new(path.clone(), user_tx, sys_tx, Arc::downgrade(&system));
69
70 let cell_ref = actor_ref.clone();
71 let cell_system = Arc::downgrade(&system);
72 let props_clone = props.clone();
73 tokio::spawn(async move {
74 let mut actor = props_clone.new_actor();
75 let mut ctx = Context::<A>::new(cell_ref.clone(), path.clone(), cell_system);
76 run_cell(&mut actor, &mut ctx, user_rx, sys_rx, &props_clone).await;
77 });
78
79 Ok(actor_ref)
80}
81
82async fn run_cell<A: Actor>(
83 actor: &mut A,
84 ctx: &mut Context<A>,
85 mut user_rx: mpsc::UnboundedReceiver<MessageEnvelope<A::Msg>>,
86 mut sys_rx: mpsc::UnboundedReceiver<SystemMsg>,
87 props: &Props<A>,
88) {
89 ctx.phase = super::context::LifecyclePhase::Starting;
90 actor.pre_start(ctx).await;
91 ctx.phase = super::context::LifecyclePhase::Running;
92
93 let supervisor_ref = props.supervisor_strategy.clone();
94
95 let mut restart_history: VecDeque<Instant> = VecDeque::new();
103
104 loop {
105 while let Ok(sys) = sys_rx.try_recv() {
106 if handle_system(actor, ctx, sys).await {
107 finalize(actor, ctx).await;
108 return;
109 }
110 }
111 if ctx.stopping {
112 finalize(actor, ctx).await;
113 return;
114 }
115
116 let timeout = ctx.receive_timeout;
117 let next: Either<A> = tokio::select! {
118 biased;
119 sys = sys_rx.recv() => Either::<A>::Sys(sys),
120 user = user_rx.recv() => Either::<A>::User(user),
121 _ = maybe_sleep(timeout), if timeout.is_some() => Either::<A>::Timeout,
122 };
123
124 match next {
125 Either::Sys(Some(s)) => {
126 if handle_system(actor, ctx, s).await {
127 finalize(actor, ctx).await;
128 return;
129 }
130 }
131 Either::User(Some(env)) => {
132 ctx.current_sender = env.sender;
133 ctx.current_metadata = env.metadata;
134 let _span_guard = props.interceptor.as_ref().map(|i| i.before_handle(&ctx.current_metadata));
136 let handle_result = run_handle(actor, ctx, env.message).await;
137 drop(_span_guard);
138 if let Err(panic_msg) = handle_result {
139 let directive =
140 supervisor_ref.as_ref().map(|s| s.decide(&panic_msg)).unwrap_or(Directive::Restart);
141 match directive {
142 Directive::Resume => {}
143 Directive::Throttle { .. } | Directive::Suspend { .. } | Directive::ResumeFrom(_) => {
146 actor.on_directive(ctx, &directive).await;
147 }
148 Directive::Restart => {
149 let strategy = supervisor_ref.as_ref();
153 let max_retries = strategy.and_then(|s| s.max_retries);
154 if let Some(max) = max_retries {
155 let now = Instant::now();
156 if let Some(within) = strategy.and_then(|s| s.within) {
157 while let Some(front) = restart_history.front() {
158 if now.duration_since(*front) > within {
159 restart_history.pop_front();
160 } else {
161 break;
162 }
163 }
164 }
165 if (restart_history.len() as u32) + 1 > max {
166 tracing::warn!(
167 path = %ctx.path,
168 retries = restart_history.len(),
169 max,
170 "supervisor max_retries exceeded; escalating (stop)"
171 );
172 finalize(actor, ctx).await;
173 return;
174 }
175 restart_history.push_back(now);
176 }
177 actor.pre_restart(ctx, &panic_msg).await;
178 *actor = props.new_actor();
179 actor.post_restart(ctx, &panic_msg).await;
180 }
181 Directive::Stop | Directive::Escalate => {
182 finalize(actor, ctx).await;
183 return;
184 }
185 }
186 }
187 ctx.current_sender = super::sender::Sender::None;
188 ctx.current_metadata = super::metadata::Metadata::new();
189 }
190 Either::Timeout => {
191 if handle_system(actor, ctx, SystemMsg::ReceiveTimeout).await {
192 finalize(actor, ctx).await;
193 return;
194 }
195 }
196 Either::Sys(None) | Either::User(None) => {
197 finalize(actor, ctx).await;
198 return;
199 }
200 }
201 }
202}
203
204enum Either<A: Actor> {
205 User(Option<MessageEnvelope<A::Msg>>),
206 Sys(Option<SystemMsg>),
207 Timeout,
208}
209
210async fn maybe_sleep(d: Option<Duration>) {
211 if let Some(d) = d {
212 tokio::time::sleep(d).await;
213 } else {
214 futures_util::future::pending::<()>().await;
215 }
216}
217
218async fn handle_system<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: SystemMsg) -> bool {
219 match msg {
220 SystemMsg::Stop => true,
221 SystemMsg::Restart(err) => {
222 actor.pre_restart(ctx, &err).await;
223 actor.post_restart(ctx, &err).await;
224 false
225 }
226 SystemMsg::Terminated(path) => {
227 tracing::debug!(self_path = %ctx.path, watched = %path, "watched actor terminated");
228 ctx.watching.remove(&path);
229 actor.on_terminated(ctx, &path).await;
230 false
231 }
232 SystemMsg::Watch(subscriber) => {
233 ctx.watched_by.insert(subscriber);
234 false
235 }
236 SystemMsg::Unwatch(path) => {
237 ctx.watched_by.retain(|w| w.path() != &path);
238 false
239 }
240 SystemMsg::ReceiveTimeout => false,
241 SystemMsg::ChildFailed { name, error } => {
242 tracing::warn!(path = %ctx.path, child = %name, "child failed: {error}");
243 false
244 }
245 SystemMsg::Directive(directive) => {
246 actor.on_directive(ctx, &directive).await;
247 false
248 }
249 }
250}
251
252async fn run_handle<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: A::Msg) -> Result<(), String> {
253 use futures_util::FutureExt;
254 let fut = actor.handle(ctx, msg);
255 match std::panic::AssertUnwindSafe(fut).catch_unwind().await {
256 Ok(()) => Ok(()),
257 Err(p) => {
258 let s = panic_payload_to_string(p);
259 tracing::error!(path = %ctx.path, "handle panic: {s}");
260 Err(s)
261 }
262 }
263}
264
265fn panic_payload_to_string(p: Box<dyn std::any::Any + Send>) -> String {
266 if let Some(payload) = p.downcast_ref::<PanicPayload>() {
267 payload.to_wire()
268 } else if let Some(s) = p.downcast_ref::<&str>() {
269 s.to_string()
270 } else if let Some(s) = p.downcast_ref::<String>() {
271 s.clone()
272 } else {
273 "actor panic".to_string()
274 }
275}
276
277async fn finalize<A: Actor>(actor: &mut A, ctx: &mut Context<A>) {
278 ctx.phase = super::context::LifecyclePhase::Stopping;
279 actor.post_stop(ctx).await;
280 for (_, child) in std::mem::take(&mut ctx.children) {
281 let _ = child.system_tx.send(SystemMsg::Stop);
282 }
283 if let Some(system) = ctx.system.upgrade() {
284 if ctx.path.elements.len() == 2 && ctx.path.elements[0].as_str() == "user" {
290 let name = ctx.path.elements[1].as_str();
291 let mut guardian = system.user_guardian.lock();
292 if guardian.get(name).is_some_and(|c| c.path == ctx.path) {
296 guardian.remove(name);
297 }
298 }
299 if let Some(obs) = system.spawn_observer.read().as_ref() {
300 obs.on_stop(&ctx.path);
301 }
302 }
303 for w in ctx.watched_by.drain().collect::<Vec<_>>() {
307 w.notify_watchers(ctx.path.clone());
308 }
309}