Skip to main content

tokio_actors/actor/
runtime.rs

1use std::ops::ControlFlow;
2use std::sync::Arc;
3
4use tokio::runtime::Handle;
5use tokio::sync::mpsc;
6use tokio::task::JoinHandle;
7
8use crate::actor::supervision::{
9    evaluate_strategy, StrategyOutcome, SupervisionConfig, SupervisionState,
10};
11use crate::actor::{context::ActorContext, handle::ActorHandle, Actor, ActorEnvelope};
12use crate::error::SpawnError;
13use crate::system::{ActorSystem, RegistryGuard};
14use crate::types::{
15    ActorId, ActorStatus, ActorStatusInfo, ChildEvent, ChildStoppedInternal, Envelope, Shutdown,
16    StopReason, SupervisionAction, SystemMessage,
17};
18
/// Sizing options for an actor's mailbox.
#[derive(Debug, Clone)]
pub struct MailboxConfig {
    /// Upper bound on the number of messages the mailbox can hold.
    pub capacity: usize,
}

impl Default for MailboxConfig {
    /// Builds a configuration whose `capacity` is 64.
    fn default() -> Self {
        MailboxConfig { capacity: 64 }
    }
}

impl MailboxConfig {
    /// Consumes the configuration and returns it with `capacity` replaced
    /// by the given value.
    pub fn with_capacity(self, capacity: usize) -> Self {
        let mut updated = self;
        updated.capacity = capacity;
        updated
    }
}
39
40/// Configuration for spawning an actor.
41#[derive(Debug, Clone, Default)]
42pub struct ActorConfig {
43    /// Mailbox configuration.
44    pub mailbox: MailboxConfig,
45    /// Supervision configuration. `Some` if this actor is a supervisor.
46    pub supervision: Option<SupervisionConfig>,
47}
48
49impl<'a> From<&'a ActorConfig> for ActorConfig {
50    fn from(value: &'a ActorConfig) -> Self {
51        value.clone()
52    }
53}
54
55impl ActorConfig {
56    /// Sets the mailbox capacity.
57    pub fn with_mailbox_capacity(mut self, capacity: usize) -> Self {
58        self.mailbox.capacity = capacity;
59        self
60    }
61
62    /// Sets the complete mailbox configuration.
63    pub fn with_mailbox(mut self, mailbox: MailboxConfig) -> Self {
64        self.mailbox = mailbox;
65        self
66    }
67
68    /// Enables supervision with default configuration (OneForOne, 3 restarts / 5s).
69    pub fn supervised(mut self) -> Self {
70        self.supervision = Some(SupervisionConfig::default());
71        self
72    }
73
74    /// Enables supervision with a custom configuration.
75    pub fn with_supervision(mut self, config: SupervisionConfig) -> Self {
76        self.supervision = Some(config);
77        self
78    }
79}
80
81// ---------------------------------------------------------------------------
82// Spawn
83// ---------------------------------------------------------------------------
84
85pub(crate) fn into_actor<A: Actor>(
86    id: impl Into<ActorId>,
87    actor: A,
88    config: impl Into<ActorConfig>,
89    name: Option<String>,
90    system: Option<Arc<ActorSystem>>,
91) -> Result<ActorHandle<A>, SpawnError> {
92    let (handle, _join) = spawn_actor(id.into(), actor, config.into(), name, system, None)?;
93    Ok(handle)
94}
95
96/// Spawns an actor, returning both the handle and the JoinHandle for the task.
97///
98/// `parent_system_tx` is set when this actor is spawned as a supervised child. The
99/// runtime will send `ChildStopped` events through this channel when the actor stops.
100pub(crate) fn spawn_actor<A: Actor>(
101    id: ActorId,
102    actor: A,
103    config: ActorConfig,
104    name: Option<String>,
105    system: Option<Arc<ActorSystem>>,
106    parent_system_tx: Option<mpsc::Sender<SystemMessage>>,
107) -> Result<(ActorHandle<A>, JoinHandle<()>), SpawnError> {
108    let handle = Handle::try_current().map_err(|_| SpawnError::MissingRuntime)?;
109    let mailbox_capacity = config.mailbox.capacity;
110    let (tx, rx) = mpsc::channel(mailbox_capacity);
111    let (system_tx, system_rx) = mpsc::channel::<SystemMessage>(64);
112    let actor_handle = ActorHandle::new(id.clone(), tx, system_tx.clone(), mailbox_capacity);
113
114    let supervision = config.supervision.map(SupervisionState::new);
115    let context = ActorContext::new(
116        id.clone(),
117        actor_handle.clone(),
118        handle.clone(),
119        system_tx,
120        system.clone(),
121        name.clone(),
122        supervision,
123    );
124
125    // Register in the target system when a name or explicit system is provided.
126    let guard = if name.is_some() || system.is_some() {
127        let target = system.unwrap_or_else(ActorSystem::default);
128        target.register_actor::<A>(&id, name.as_deref(), &actor_handle)?;
129        Some(RegistryGuard::new(target, id, name))
130    } else {
131        None
132    };
133
134    let join = handle.spawn(run_actor(
135        actor,
136        context,
137        rx,
138        system_rx,
139        parent_system_tx,
140        guard,
141    ));
142
143    Ok((actor_handle, join))
144}
145
146// ---------------------------------------------------------------------------
147// Actor run loop
148// ---------------------------------------------------------------------------
149
150async fn run_actor<A: Actor>(
151    mut actor: A,
152    mut ctx: ActorContext<A>,
153    mut mailbox: mpsc::Receiver<ActorEnvelope<A>>,
154    mut system_rx: mpsc::Receiver<SystemMessage>,
155    parent_system_tx: Option<mpsc::Sender<SystemMessage>>,
156    registry_guard: Option<RegistryGuard>,
157) {
158    // Phase 1: pre_start validation (fail-fast gate)
159    if let Err(err) = actor.pre_start(&mut ctx).await {
160        ctx.record_failure(err.clone());
161        ctx.set_status(ActorStatus::Stopped);
162        notify_parent(&parent_system_tx, ctx.actor_id(), StopReason::Failure(err));
163        return;
164    }
165
166    // Phase 2: on_started initialization
167    let mut stop_reason = match actor.on_started(&mut ctx).await {
168        Ok(()) => {
169            ctx.set_status(ActorStatus::Running);
170            StopReason::Graceful
171        }
172        Err(err) => {
173            ctx.record_failure(err.clone());
174            ctx.set_status(ActorStatus::Stopped);
175            notify_parent(&parent_system_tx, ctx.actor_id(), StopReason::Failure(err));
176            return;
177        }
178    };
179
180    // Phase 3: Message loop - biased select! gives system messages priority
181    loop {
182        tokio::select! {
183            biased;
184
185            // System channel - priority over user messages
186            sys_msg = system_rx.recv() => {
187                match sys_msg {
188                    Some(SystemMessage::Stop(reason)) => {
189                        // Tier 3 (Kill): bypass ALL callbacks, stop children, return
190                        if matches!(reason, StopReason::Kill) {
191                            ctx.set_status(ActorStatus::Stopped);
192                            stop_all_children(&mut ctx).await;
193                            notify_parent(&parent_system_tx, ctx.actor_id(), reason);
194                            drop(registry_guard);
195                            return;
196                        }
197                        // Tier 1 (Graceful/ParentRequest): pre_stop gate, vetoable
198                        if matches!(reason, StopReason::Graceful | StopReason::ParentRequest)
199                            && !actor.pre_stop(&reason, &mut ctx).await
200                        {
201                            continue; // Actor rejected the stop
202                        }
203                        // Tier 2 (Failure/Cancelled): non-vetoable, on_stopped still runs
204                        stop_reason = reason;
205                        break;
206                    }
207                    Some(SystemMessage::GetStatus(reply_tx)) => {
208                        let info = build_status_info(&ctx);
209                        let _ = reply_tx.send(info);
210                    }
211                    Some(SystemMessage::ChildStopped(event)) => {
212                        let action = handle_child_stopped(&mut actor, &mut ctx, &event).await;
213                        if matches!(action, SupervisionAction::BudgetExhausted) {
214                            stop_reason = StopReason::Failure(
215                                crate::error::SupervisionError::BudgetExhausted.into()
216                            );
217                            break;
218                        }
219                    }
220                    Some(SystemMessage::RestartComplete { seq, child_id, new_system_tx, new_join_handle }) => {
221                        if let Some(sup) = ctx.supervision_mut() {
222                            sup.registry.update_restarted(&child_id, seq, new_system_tx, new_join_handle);
223                        }
224                    }
225                    None => break, // system channel closed
226                }
227            }
228
229            // User mailbox
230            envelope = mailbox.recv() => {
231                match envelope {
232                    Some(env) => {
233                        match dispatch(&mut actor, &mut ctx, env).await {
234                            ControlFlow::Continue(()) => {}
235                            ControlFlow::Break(reason) => {
236                                stop_reason = reason;
237                                break;
238                            }
239                        }
240                    }
241                    None => break, // mailbox closed
242                }
243            }
244        }
245    }
246
247    // Phase 4: on_stopped
248    ctx.set_status(ActorStatus::Stopping);
249    if let Err(err) = actor.on_stopped(&stop_reason, &mut ctx).await {
250        stop_reason = StopReason::Failure(err);
251    }
252
253    // Phase 5: Stop all children (reverse start order)
254    stop_all_children(&mut ctx).await;
255
256    ctx.set_status(ActorStatus::Stopped);
257
258    // Phase 6: Notify parent
259    notify_parent(&parent_system_tx, ctx.actor_id(), stop_reason.clone());
260
261    #[cfg(feature = "tracing")]
262    tracing::info!(
263        actor_id = %ctx.actor_id(),
264        reason = %stop_reason,
265        "Actor stopped"
266    );
267
268    #[cfg(not(feature = "tracing"))]
269    let _ = stop_reason;
270
271    // Held for its Drop impl. RegistryGuard::drop unregisters the actor.
272    drop(registry_guard);
273}
274
275// ---------------------------------------------------------------------------
276// Dispatch (user messages only - Stop moved to system channel)
277// ---------------------------------------------------------------------------
278
279async fn dispatch<A: Actor>(
280    actor: &mut A,
281    ctx: &mut ActorContext<A>,
282    envelope: ActorEnvelope<A>,
283) -> ControlFlow<StopReason> {
284    match envelope {
285        Envelope::Message { payload, responder } => {
286            let outcome = actor.handle(payload, ctx).await;
287            match outcome {
288                Ok(response) => {
289                    if let Some(tx) = responder {
290                        let _ = tx.send(Ok(response));
291                    }
292                    ControlFlow::Continue(())
293                }
294                Err(err) => {
295                    if let Some(tx) = responder {
296                        // For send (request-response), return error and stop actor
297                        let _ = tx.send(Err(err.clone()));
298                        ControlFlow::Break(StopReason::Failure(err))
299                    } else {
300                        // For notify (fire-and-forget), call error handler but continue
301                        actor.handle_failure(err);
302                        ControlFlow::Continue(())
303                    }
304                }
305            }
306        }
307    }
308}
309
310// ---------------------------------------------------------------------------
311// Child supervision helpers
312// ---------------------------------------------------------------------------
313
314/// Handles a ChildStopped event: evaluates the strategy and notifies the actor.
315async fn handle_child_stopped<A: Actor>(
316    actor: &mut A,
317    ctx: &mut ActorContext<A>,
318    event: &ChildStoppedInternal,
319) -> SupervisionAction {
320    // Mark child as dead
321    let child_name = if let Some(sup) = ctx.supervision_mut() {
322        if let Some(child) = sup.registry.get_mut(&event.child_id) {
323            child.is_alive = false;
324            child.name.clone()
325        } else {
326            None
327        }
328    } else {
329        None
330    };
331
332    // Evaluate strategy
333    let action = if let Some(sup) = ctx.supervision_mut() {
334        match evaluate_strategy(sup, &event.child_id, &event.reason) {
335            StrategyOutcome::Restart(to_restart) => {
336                // Initiate restarts for all children in the list
337                for id in &to_restart {
338                    initiate_restart(ctx, id);
339                }
340                SupervisionAction::Restarted
341            }
342            StrategyOutcome::Remove => {
343                if let Some(sup) = ctx.supervision_mut() {
344                    // For SimpleOneForOne, remove the child spec entirely
345                    if matches!(
346                        sup.config.strategy,
347                        crate::types::RestartStrategy::SimpleOneForOne
348                    ) {
349                        sup.registry.remove(&event.child_id);
350                    }
351                }
352                SupervisionAction::Removed
353            }
354            StrategyOutcome::BudgetExhausted => SupervisionAction::BudgetExhausted,
355        }
356    } else {
357        SupervisionAction::NotSupervised
358    };
359
360    // Notify actor via on_child_stopped
361    let child_event = ChildEvent {
362        child_id: event.child_id.clone(),
363        child_name,
364        reason: event.reason.clone(),
365        action,
366    };
367    let _ = actor.on_child_stopped(&child_event, ctx).await;
368
369    action
370}
371
372/// Initiates a non-blocking restart for a child.
373fn initiate_restart<A: Actor>(ctx: &mut ActorContext<A>, child_id: &ActorId) {
374    let sup = match ctx.supervision_mut() {
375        Some(s) => s,
376        None => return,
377    };
378
379    // Get seq first, then mutate child
380    let seq = sup.registry.next_seq();
381
382    let child = match sup.registry.get_mut(child_id) {
383        Some(c) => c,
384        None => return,
385    };
386
387    child.pending_restart_seq = Some(seq);
388    child.is_alive = false;
389    let child_name = child.name.clone();
390
391    // Look up the type-erased restart function
392    if let Some(restart_fn) = sup.restart_fns.get(child_id) {
393        let fut = restart_fn(seq, child_name);
394        tokio::spawn(fut);
395    }
396}
397
398/// Stops all children in reverse start order, respecting their Shutdown policies.
399async fn stop_all_children<A: Actor>(ctx: &mut ActorContext<A>) {
400    let children = match ctx.supervision_mut() {
401        Some(sup) => sup.registry.drain_all(),
402        None => return,
403    };
404
405    // Stop in reverse order (last started = first stopped)
406    for child in children.into_iter().rev() {
407        if !child.is_alive {
408            continue;
409        }
410
411        match child.spec.shutdown {
412            Shutdown::Kill => {
413                let _ = child
414                    .system_tx
415                    .send(SystemMessage::Stop(StopReason::Kill))
416                    .await;
417            }
418            Shutdown::Timeout(duration) => {
419                let _ = child
420                    .system_tx
421                    .send(SystemMessage::Stop(StopReason::ParentRequest))
422                    .await;
423                let _ = tokio::time::timeout(duration, child.join_handle).await;
424            }
425            Shutdown::Infinity => {
426                let _ = child
427                    .system_tx
428                    .send(SystemMessage::Stop(StopReason::ParentRequest))
429                    .await;
430                let _ = child.join_handle.await;
431            }
432        }
433    }
434}
435
436// ---------------------------------------------------------------------------
437// Helper utilities
438// ---------------------------------------------------------------------------
439
440fn notify_parent(
441    parent_tx: &Option<mpsc::Sender<SystemMessage>>,
442    actor_id: &ActorId,
443    reason: StopReason,
444) {
445    if let Some(tx) = parent_tx {
446        let event = ChildStoppedInternal {
447            child_id: actor_id.clone(),
448            reason,
449        };
450        let _ = tx.try_send(SystemMessage::ChildStopped(event));
451    }
452}
453
454fn build_status_info<A: Actor>(ctx: &ActorContext<A>) -> ActorStatusInfo {
455    let (child_count, name) = ctx
456        .supervision_ref()
457        .map(|sup| (sup.registry.len(), ctx.actor_name().cloned()))
458        .unwrap_or((0, ctx.actor_name().cloned()));
459
460    ActorStatusInfo {
461        id: ctx.actor_id().clone(),
462        name,
463        status: ctx.status(),
464        mailbox_len: ctx.self_handle().mailbox_len(),
465        mailbox_capacity: ctx.self_handle().mailbox_capacity(),
466        child_count,
467        timer_count: ctx.active_timer_count(),
468        stream_count: ctx.active_stream_count(),
469    }
470}