atomr_core/actor/
actor_cell.rs1use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use tokio::sync::mpsc;
17
18use super::actor_ref::{ActorRef, UntypedActorRef};
19use super::context::Context;
20use super::path::ActorPath;
21use super::props::Props;
22use super::traits::{Actor, MessageEnvelope};
23use crate::supervision::{Directive, PanicPayload};
24
25#[derive(Debug)]
27pub enum SystemMsg {
28 Stop,
29 Restart(String),
30 Terminated(ActorPath),
31 Watch(UntypedActorRef),
32 Unwatch(ActorPath),
33 ReceiveTimeout,
34 ChildFailed { name: String, error: String },
35}
36
37#[derive(Debug)]
39pub struct ChildEntry {
40 pub path: ActorPath,
44 #[allow(dead_code)]
45 pub untyped: UntypedActorRef,
46 pub system_tx: mpsc::UnboundedSender<SystemMsg>,
47}
48
49pub struct ActorCell<A: Actor> {
51 _marker: std::marker::PhantomData<A>,
52}
53
54pub(crate) fn spawn_cell<A: Actor>(
55 system: Arc<super::actor_system::ActorSystemInner>,
56 props: Props<A>,
57 path: ActorPath,
58) -> Result<ActorRef<A::Msg>, super::context::SpawnError> {
59 let (user_tx, user_rx) = mpsc::unbounded_channel::<MessageEnvelope<A::Msg>>();
60 let (sys_tx, sys_rx) = mpsc::unbounded_channel::<SystemMsg>();
61 let actor_ref = ActorRef::new(path.clone(), user_tx, sys_tx, Arc::downgrade(&system));
62
63 let cell_ref = actor_ref.clone();
64 let cell_system = Arc::downgrade(&system);
65 let props_clone = props.clone();
66 tokio::spawn(async move {
67 let mut actor = props_clone.new_actor();
68 let mut ctx = Context::<A>::new(cell_ref.clone(), path.clone(), cell_system);
69 run_cell(&mut actor, &mut ctx, user_rx, sys_rx, &props_clone).await;
70 });
71
72 Ok(actor_ref)
73}
74
75async fn run_cell<A: Actor>(
76 actor: &mut A,
77 ctx: &mut Context<A>,
78 mut user_rx: mpsc::UnboundedReceiver<MessageEnvelope<A::Msg>>,
79 mut sys_rx: mpsc::UnboundedReceiver<SystemMsg>,
80 props: &Props<A>,
81) {
82 ctx.phase = super::context::LifecyclePhase::Starting;
83 actor.pre_start(ctx).await;
84 ctx.phase = super::context::LifecyclePhase::Running;
85
86 let supervisor_ref = props.supervisor_strategy.clone();
87
88 let mut restart_history: VecDeque<Instant> = VecDeque::new();
96
97 loop {
98 while let Ok(sys) = sys_rx.try_recv() {
99 if handle_system(actor, ctx, sys).await {
100 finalize(actor, ctx).await;
101 return;
102 }
103 }
104 if ctx.stopping {
105 finalize(actor, ctx).await;
106 return;
107 }
108
109 let timeout = ctx.receive_timeout;
110 let next: Either<A> = tokio::select! {
111 biased;
112 sys = sys_rx.recv() => Either::<A>::Sys(sys),
113 user = user_rx.recv() => Either::<A>::User(user),
114 _ = maybe_sleep(timeout), if timeout.is_some() => Either::<A>::Timeout,
115 };
116
117 match next {
118 Either::Sys(Some(s)) => {
119 if handle_system(actor, ctx, s).await {
120 finalize(actor, ctx).await;
121 return;
122 }
123 }
124 Either::User(Some(env)) => {
125 ctx.current_sender = env.sender;
126 if let Err(panic_msg) = run_handle(actor, ctx, env.message).await {
127 let directive =
128 supervisor_ref.as_ref().map(|s| s.decide(&panic_msg)).unwrap_or(Directive::Restart);
129 match directive {
130 Directive::Resume => {}
131 Directive::Restart => {
132 let strategy = supervisor_ref.as_ref();
136 let max_retries = strategy.and_then(|s| s.max_retries);
137 if let Some(max) = max_retries {
138 let now = Instant::now();
139 if let Some(within) = strategy.and_then(|s| s.within) {
140 while let Some(front) = restart_history.front() {
141 if now.duration_since(*front) > within {
142 restart_history.pop_front();
143 } else {
144 break;
145 }
146 }
147 }
148 if (restart_history.len() as u32) + 1 > max {
149 tracing::warn!(
150 path = %ctx.path,
151 retries = restart_history.len(),
152 max,
153 "supervisor max_retries exceeded; escalating (stop)"
154 );
155 finalize(actor, ctx).await;
156 return;
157 }
158 restart_history.push_back(now);
159 }
160 actor.pre_restart(ctx, &panic_msg).await;
161 *actor = props.new_actor();
162 actor.post_restart(ctx, &panic_msg).await;
163 }
164 Directive::Stop | Directive::Escalate => {
165 finalize(actor, ctx).await;
166 return;
167 }
168 }
169 }
170 ctx.current_sender = super::sender::Sender::None;
171 }
172 Either::Timeout => {
173 if handle_system(actor, ctx, SystemMsg::ReceiveTimeout).await {
174 finalize(actor, ctx).await;
175 return;
176 }
177 }
178 Either::Sys(None) | Either::User(None) => {
179 finalize(actor, ctx).await;
180 return;
181 }
182 }
183 }
184}
185
186enum Either<A: Actor> {
187 User(Option<MessageEnvelope<A::Msg>>),
188 Sys(Option<SystemMsg>),
189 Timeout,
190}
191
192async fn maybe_sleep(d: Option<Duration>) {
193 if let Some(d) = d {
194 tokio::time::sleep(d).await;
195 } else {
196 futures_util::future::pending::<()>().await;
197 }
198}
199
200async fn handle_system<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: SystemMsg) -> bool {
201 match msg {
202 SystemMsg::Stop => true,
203 SystemMsg::Restart(err) => {
204 actor.pre_restart(ctx, &err).await;
205 actor.post_restart(ctx, &err).await;
206 false
207 }
208 SystemMsg::Terminated(path) => {
209 tracing::debug!(self_path = %ctx.path, watched = %path, "watched actor terminated");
210 ctx.watching.remove(&path);
211 actor.on_terminated(ctx, &path).await;
212 false
213 }
214 SystemMsg::Watch(subscriber) => {
215 ctx.watched_by.insert(subscriber);
216 false
217 }
218 SystemMsg::Unwatch(path) => {
219 ctx.watched_by.retain(|w| w.path() != &path);
220 false
221 }
222 SystemMsg::ReceiveTimeout => false,
223 SystemMsg::ChildFailed { name, error } => {
224 tracing::warn!(path = %ctx.path, child = %name, "child failed: {error}");
225 false
226 }
227 }
228}
229
230async fn run_handle<A: Actor>(actor: &mut A, ctx: &mut Context<A>, msg: A::Msg) -> Result<(), String> {
231 use futures_util::FutureExt;
232 let fut = actor.handle(ctx, msg);
233 match std::panic::AssertUnwindSafe(fut).catch_unwind().await {
234 Ok(()) => Ok(()),
235 Err(p) => {
236 let s = panic_payload_to_string(p);
237 tracing::error!(path = %ctx.path, "handle panic: {s}");
238 Err(s)
239 }
240 }
241}
242
243fn panic_payload_to_string(p: Box<dyn std::any::Any + Send>) -> String {
244 if let Some(payload) = p.downcast_ref::<PanicPayload>() {
245 payload.to_wire()
246 } else if let Some(s) = p.downcast_ref::<&str>() {
247 s.to_string()
248 } else if let Some(s) = p.downcast_ref::<String>() {
249 s.clone()
250 } else {
251 "actor panic".to_string()
252 }
253}
254
255async fn finalize<A: Actor>(actor: &mut A, ctx: &mut Context<A>) {
256 ctx.phase = super::context::LifecyclePhase::Stopping;
257 actor.post_stop(ctx).await;
258 for (_, child) in std::mem::take(&mut ctx.children) {
259 let _ = child.system_tx.send(SystemMsg::Stop);
260 }
261 if let Some(system) = ctx.system.upgrade() {
262 if ctx.path.elements.len() == 2 && ctx.path.elements[0].as_str() == "user" {
268 let name = ctx.path.elements[1].as_str();
269 let mut guardian = system.user_guardian.lock();
270 if guardian.get(name).is_some_and(|c| c.path == ctx.path) {
274 guardian.remove(name);
275 }
276 }
277 if let Some(obs) = system.spawn_observer.read().as_ref() {
278 obs.on_stop(&ctx.path);
279 }
280 }
281 for w in ctx.watched_by.drain().collect::<Vec<_>>() {
285 w.notify_watchers(ctx.path.clone());
286 }
287}