1use std::ops::ControlFlow;
2use std::sync::Arc;
3
4use tokio::runtime::Handle;
5use tokio::sync::mpsc;
6use tokio::task::JoinHandle;
7
8use crate::actor::supervision::{
9 evaluate_strategy, StrategyOutcome, SupervisionConfig, SupervisionState,
10};
11use crate::actor::{context::ActorContext, handle::ActorHandle, Actor, ActorEnvelope};
12use crate::error::SpawnError;
13use crate::system::{ActorSystem, RegistryGuard};
14use crate::types::{
15 ActorId, ActorStatus, ActorStatusInfo, ChildEvent, ChildStoppedInternal, Envelope, Shutdown,
16 StopReason, SupervisionAction, SystemMessage,
17};
18
/// Sizing options for an actor's mailbox.
#[derive(Clone, Debug)]
pub struct MailboxConfig {
    /// Number of messages the mailbox channel can buffer.
    pub capacity: usize,
}

impl MailboxConfig {
    /// Capacity used by [`MailboxConfig::default`].
    const DEFAULT_CAPACITY: usize = 64;

    /// Returns this configuration with `capacity` as the mailbox size.
    pub fn with_capacity(mut self, capacity: usize) -> Self {
        self.capacity = capacity;
        self
    }
}

impl Default for MailboxConfig {
    /// A mailbox that buffers 64 messages.
    fn default() -> Self {
        Self {
            capacity: Self::DEFAULT_CAPACITY,
        }
    }
}
39
40#[derive(Debug, Clone, Default)]
42pub struct ActorConfig {
43 pub mailbox: MailboxConfig,
45 pub supervision: Option<SupervisionConfig>,
47}
48
49impl<'a> From<&'a ActorConfig> for ActorConfig {
50 fn from(value: &'a ActorConfig) -> Self {
51 value.clone()
52 }
53}
54
55impl ActorConfig {
56 pub fn with_mailbox_capacity(mut self, capacity: usize) -> Self {
58 self.mailbox.capacity = capacity;
59 self
60 }
61
62 pub fn with_mailbox(mut self, mailbox: MailboxConfig) -> Self {
64 self.mailbox = mailbox;
65 self
66 }
67
68 pub fn supervised(mut self) -> Self {
70 self.supervision = Some(SupervisionConfig::default());
71 self
72 }
73
74 pub fn with_supervision(mut self, config: SupervisionConfig) -> Self {
76 self.supervision = Some(config);
77 self
78 }
79}
80
81pub(crate) fn into_actor<A: Actor>(
86 id: impl Into<ActorId>,
87 actor: A,
88 config: impl Into<ActorConfig>,
89 name: Option<String>,
90 system: Option<Arc<ActorSystem>>,
91) -> Result<ActorHandle<A>, SpawnError> {
92 let (handle, _join) = spawn_actor(id.into(), actor, config.into(), name, system, None)?;
93 Ok(handle)
94}
95
96pub(crate) fn spawn_actor<A: Actor>(
101 id: ActorId,
102 actor: A,
103 config: ActorConfig,
104 name: Option<String>,
105 system: Option<Arc<ActorSystem>>,
106 parent_system_tx: Option<mpsc::Sender<SystemMessage>>,
107) -> Result<(ActorHandle<A>, JoinHandle<()>), SpawnError> {
108 let handle = Handle::try_current().map_err(|_| SpawnError::MissingRuntime)?;
109 let mailbox_capacity = config.mailbox.capacity;
110 let (tx, rx) = mpsc::channel(mailbox_capacity);
111 let (system_tx, system_rx) = mpsc::channel::<SystemMessage>(64);
112 let actor_handle = ActorHandle::new(id.clone(), tx, system_tx.clone(), mailbox_capacity);
113
114 let supervision = config.supervision.map(SupervisionState::new);
115 let context = ActorContext::new(
116 id.clone(),
117 actor_handle.clone(),
118 handle.clone(),
119 system_tx,
120 system.clone(),
121 name.clone(),
122 supervision,
123 );
124
125 let guard = if name.is_some() || system.is_some() {
127 let target = system.unwrap_or_else(ActorSystem::default);
128 target.register_actor::<A>(&id, name.as_deref(), &actor_handle)?;
129 Some(RegistryGuard::new(target, id, name))
130 } else {
131 None
132 };
133
134 let join = handle.spawn(run_actor(
135 actor,
136 context,
137 rx,
138 system_rx,
139 parent_system_tx,
140 guard,
141 ));
142
143 Ok((actor_handle, join))
144}
145
146async fn run_actor<A: Actor>(
151 mut actor: A,
152 mut ctx: ActorContext<A>,
153 mut mailbox: mpsc::Receiver<ActorEnvelope<A>>,
154 mut system_rx: mpsc::Receiver<SystemMessage>,
155 parent_system_tx: Option<mpsc::Sender<SystemMessage>>,
156 registry_guard: Option<RegistryGuard>,
157) {
158 if let Err(err) = actor.pre_start(&mut ctx).await {
160 ctx.record_failure(err.clone());
161 ctx.set_status(ActorStatus::Stopped);
162 notify_parent(&parent_system_tx, ctx.actor_id(), StopReason::Failure(err));
163 return;
164 }
165
166 let mut stop_reason = match actor.on_started(&mut ctx).await {
168 Ok(()) => {
169 ctx.set_status(ActorStatus::Running);
170 StopReason::Graceful
171 }
172 Err(err) => {
173 ctx.record_failure(err.clone());
174 ctx.set_status(ActorStatus::Stopped);
175 notify_parent(&parent_system_tx, ctx.actor_id(), StopReason::Failure(err));
176 return;
177 }
178 };
179
180 loop {
182 tokio::select! {
183 biased;
184
185 sys_msg = system_rx.recv() => {
187 match sys_msg {
188 Some(SystemMessage::Stop(reason)) => {
189 if matches!(reason, StopReason::Kill) {
191 ctx.set_status(ActorStatus::Stopped);
192 stop_all_children(&mut ctx).await;
193 notify_parent(&parent_system_tx, ctx.actor_id(), reason);
194 drop(registry_guard);
195 return;
196 }
197 if matches!(reason, StopReason::Graceful | StopReason::ParentRequest)
199 && !actor.pre_stop(&reason, &mut ctx).await
200 {
201 continue; }
203 stop_reason = reason;
205 break;
206 }
207 Some(SystemMessage::GetStatus(reply_tx)) => {
208 let info = build_status_info(&ctx);
209 let _ = reply_tx.send(info);
210 }
211 Some(SystemMessage::ChildStopped(event)) => {
212 let action = handle_child_stopped(&mut actor, &mut ctx, &event).await;
213 if matches!(action, SupervisionAction::BudgetExhausted) {
214 stop_reason = StopReason::Failure(
215 crate::error::SupervisionError::BudgetExhausted.into()
216 );
217 break;
218 }
219 }
220 Some(SystemMessage::RestartComplete { seq, child_id, new_system_tx, new_join_handle }) => {
221 if let Some(sup) = ctx.supervision_mut() {
222 sup.registry.update_restarted(&child_id, seq, new_system_tx, new_join_handle);
223 }
224 }
225 None => break, }
227 }
228
229 envelope = mailbox.recv() => {
231 match envelope {
232 Some(env) => {
233 match dispatch(&mut actor, &mut ctx, env).await {
234 ControlFlow::Continue(()) => {}
235 ControlFlow::Break(reason) => {
236 stop_reason = reason;
237 break;
238 }
239 }
240 }
241 None => break, }
243 }
244 }
245 }
246
247 ctx.set_status(ActorStatus::Stopping);
249 if let Err(err) = actor.on_stopped(&stop_reason, &mut ctx).await {
250 stop_reason = StopReason::Failure(err);
251 }
252
253 stop_all_children(&mut ctx).await;
255
256 ctx.set_status(ActorStatus::Stopped);
257
258 notify_parent(&parent_system_tx, ctx.actor_id(), stop_reason.clone());
260
261 #[cfg(feature = "tracing")]
262 tracing::info!(
263 actor_id = %ctx.actor_id(),
264 reason = %stop_reason,
265 "Actor stopped"
266 );
267
268 #[cfg(not(feature = "tracing"))]
269 let _ = stop_reason;
270
271 drop(registry_guard);
273}
274
275async fn dispatch<A: Actor>(
280 actor: &mut A,
281 ctx: &mut ActorContext<A>,
282 envelope: ActorEnvelope<A>,
283) -> ControlFlow<StopReason> {
284 match envelope {
285 Envelope::Message { payload, responder } => {
286 let outcome = actor.handle(payload, ctx).await;
287 match outcome {
288 Ok(response) => {
289 if let Some(tx) = responder {
290 let _ = tx.send(Ok(response));
291 }
292 ControlFlow::Continue(())
293 }
294 Err(err) => {
295 if let Some(tx) = responder {
296 let _ = tx.send(Err(err.clone()));
298 ControlFlow::Break(StopReason::Failure(err))
299 } else {
300 actor.handle_failure(err);
302 ControlFlow::Continue(())
303 }
304 }
305 }
306 }
307 }
308}
309
310async fn handle_child_stopped<A: Actor>(
316 actor: &mut A,
317 ctx: &mut ActorContext<A>,
318 event: &ChildStoppedInternal,
319) -> SupervisionAction {
320 let child_name = if let Some(sup) = ctx.supervision_mut() {
322 if let Some(child) = sup.registry.get_mut(&event.child_id) {
323 child.is_alive = false;
324 child.name.clone()
325 } else {
326 None
327 }
328 } else {
329 None
330 };
331
332 let action = if let Some(sup) = ctx.supervision_mut() {
334 match evaluate_strategy(sup, &event.child_id, &event.reason) {
335 StrategyOutcome::Restart(to_restart) => {
336 for id in &to_restart {
338 initiate_restart(ctx, id);
339 }
340 SupervisionAction::Restarted
341 }
342 StrategyOutcome::Remove => {
343 if let Some(sup) = ctx.supervision_mut() {
344 if matches!(
346 sup.config.strategy,
347 crate::types::RestartStrategy::SimpleOneForOne
348 ) {
349 sup.registry.remove(&event.child_id);
350 }
351 }
352 SupervisionAction::Removed
353 }
354 StrategyOutcome::BudgetExhausted => SupervisionAction::BudgetExhausted,
355 }
356 } else {
357 SupervisionAction::NotSupervised
358 };
359
360 let child_event = ChildEvent {
362 child_id: event.child_id.clone(),
363 child_name,
364 reason: event.reason.clone(),
365 action,
366 };
367 let _ = actor.on_child_stopped(&child_event, ctx).await;
368
369 action
370}
371
372fn initiate_restart<A: Actor>(ctx: &mut ActorContext<A>, child_id: &ActorId) {
374 let sup = match ctx.supervision_mut() {
375 Some(s) => s,
376 None => return,
377 };
378
379 let seq = sup.registry.next_seq();
381
382 let child = match sup.registry.get_mut(child_id) {
383 Some(c) => c,
384 None => return,
385 };
386
387 child.pending_restart_seq = Some(seq);
388 child.is_alive = false;
389 let child_name = child.name.clone();
390
391 if let Some(restart_fn) = sup.restart_fns.get(child_id) {
393 let fut = restart_fn(seq, child_name);
394 tokio::spawn(fut);
395 }
396}
397
398async fn stop_all_children<A: Actor>(ctx: &mut ActorContext<A>) {
400 let children = match ctx.supervision_mut() {
401 Some(sup) => sup.registry.drain_all(),
402 None => return,
403 };
404
405 for child in children.into_iter().rev() {
407 if !child.is_alive {
408 continue;
409 }
410
411 match child.spec.shutdown {
412 Shutdown::Kill => {
413 let _ = child
414 .system_tx
415 .send(SystemMessage::Stop(StopReason::Kill))
416 .await;
417 }
418 Shutdown::Timeout(duration) => {
419 let _ = child
420 .system_tx
421 .send(SystemMessage::Stop(StopReason::ParentRequest))
422 .await;
423 let _ = tokio::time::timeout(duration, child.join_handle).await;
424 }
425 Shutdown::Infinity => {
426 let _ = child
427 .system_tx
428 .send(SystemMessage::Stop(StopReason::ParentRequest))
429 .await;
430 let _ = child.join_handle.await;
431 }
432 }
433 }
434}
435
436fn notify_parent(
441 parent_tx: &Option<mpsc::Sender<SystemMessage>>,
442 actor_id: &ActorId,
443 reason: StopReason,
444) {
445 if let Some(tx) = parent_tx {
446 let event = ChildStoppedInternal {
447 child_id: actor_id.clone(),
448 reason,
449 };
450 let _ = tx.try_send(SystemMessage::ChildStopped(event));
451 }
452}
453
454fn build_status_info<A: Actor>(ctx: &ActorContext<A>) -> ActorStatusInfo {
455 let (child_count, name) = ctx
456 .supervision_ref()
457 .map(|sup| (sup.registry.len(), ctx.actor_name().cloned()))
458 .unwrap_or((0, ctx.actor_name().cloned()));
459
460 ActorStatusInfo {
461 id: ctx.actor_id().clone(),
462 name,
463 status: ctx.status(),
464 mailbox_len: ctx.self_handle().mailbox_len(),
465 mailbox_capacity: ctx.self_handle().mailbox_capacity(),
466 child_count,
467 timer_count: ctx.active_timer_count(),
468 stream_count: ctx.active_stream_count(),
469 }
470}