ractor/
actor.rs

1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
6//! This module contains the basic building blocks of an actor.
7//!
8//! They are:
9//! [Actor]: The behavior definition for an actor's internal processing logic + state management
10//! [ActorRuntime]: Management structure processing the message handler, signals, and supervision events in a loop
11//!
12//! ## Actor Supervision
13//!
14//! Supervision is a special notion of "ownership" over actors by a parent (supervisor).
15//! Supervisors are responsible for the lifecycle of a child actor such that they get notified
16//! when a child actor starts, stops, or panics (when possible). The supervisor can then decide
17//! how to handle the event. Should it restart the actor, leave it dead, potentially die itself
18//! notifying the supervisor's supervisor? That's up to the implementation of the [super::Actor]
19//!
20//! This is currently an initial implementation of [Erlang supervisors](https://www.erlang.org/doc/man/supervisor.html)
21//!
22//! An example supervision tree may look like:
23//!
24//! ```text
25//! Root/
26//! ├─ Actor A/
27//! │  ├─ Actor A_1/
28//! │  ├─ Actor A_2/
29//! ├─ Actor B/
30//! ├─ Actor C/
31//! │  ├─ Actor C_1/
32//! │  │  ├─ Actor C_1_1/
33//! │  │  │  ├─ Actor C_1_1_1/
34//! ```
35//!
36//! To link actors together in the supervision tree, there are 2 choices.
37//!
38//! 1. [Actor::spawn_linked] which requires supplying the supervisor to the actor upon spawning a child.
39//!    This call will link the supervision tree as early as possible in the lifecycle of the actors,
40//!    such that a failure or panic in `post_start` will be captured and notify the supervisor
41//! 2. `ActorCell::link` will link actors together after-the-fact, once already spawned. This is helpful
42//!    for actors which are originally created independently but have some latent relationship to each
43//!    other. However due to startup routines and asynchronous processing, it's unlikely that failures
44//!    in `post_start` and any other asynchronous handling will be captured in the supervision tree.
45//!
46//! ## Handling panics
47//!
48//! Another point to consider in actor frameworks are `panic!`s. The actor runtime captures and transforms
49//! a panic in an actor into the string message equivalent upon exit. However the traditional panic will still
50//! log to `stderr` for tracing. You can additionally setup a [panic hook](https://doc.rust-lang.org/std/panic/fn.set_hook.html)
51//! to do things like capturing backtraces on the unwinding panic.
52
53use std::fmt::Debug;
54#[cfg(not(feature = "async-trait"))]
55use std::future::Future;
56use std::panic::AssertUnwindSafe;
57
58use actor_properties::MuxedMessage;
59use futures::TryFutureExt;
60use tracing::Instrument;
61
62use crate::concurrency::JoinHandle;
63use crate::ActorId;
64pub mod messages;
65use messages::*;
66
67pub mod actor_cell;
68pub mod actor_id;
69pub(crate) mod actor_properties;
70pub mod actor_ref;
71pub mod derived_actor;
72mod supervision;
73
74#[cfg(test)]
75mod supervision_tests;
76#[cfg(test)]
77mod tests;
78
79use actor_cell::ActorCell;
80use actor_cell::ActorPortSet;
81use actor_cell::ActorStatus;
82use actor_ref::ActorRef;
83
84use crate::errors::ActorErr;
85use crate::errors::ActorProcessingErr;
86use crate::errors::MessagingErr;
87use crate::errors::SpawnErr;
88use crate::ActorName;
89use crate::Message;
90use crate::State;
91
92pub(crate) fn get_panic_string(e: Box<dyn std::any::Any + Send>) -> ActorProcessingErr {
93    match e.downcast::<String>() {
94        Ok(v) => From::from(*v),
95        Err(e) => match e.downcast::<&str>() {
96            Ok(v) => From::from(*v),
97            _ => From::from("Unknown panic occurred which couldn't be coerced to a string"),
98        },
99    }
100}
101
102/// [Actor] defines the behavior of an Actor. It specifies the
103/// Message type, State type, and all processing logic for the actor
104///
105/// Additionally it aliases the calls for `spawn` and `spawn_linked` from
106/// [ActorRuntime] for convenient startup + lifecycle management
107///
108/// NOTE: All of the implemented trait functions
109///
110/// * `pre_start`
111/// * `post_start`
112/// * `post_stop`
113/// * `handle`
114/// * `handle_serialized` (Available with `cluster` feature only)
115/// * `handle_supervisor_evt`
116///
117/// return a [Result<_, ActorProcessingError>] where the error type is an
118/// alias of [Box<dyn std::error::Error + Send + Sync + 'static>]. This is treated
119/// as an "unhandled" error and will terminate the actor + execute necessary supervision
120/// patterns. Panics are also captured from the inner functions and wrapped into an Error
121/// type, however should an [Err(_)] result from any of these functions the **actor will
122/// terminate** and cleanup.
123#[cfg_attr(feature = "async-trait", crate::async_trait)]
124pub trait Actor: Sized + Sync + Send + 'static {
125    /// The message type for this actor
126    type Msg: Message;
127
128    /// The type of state this actor manages internally
129    type State: State;
130
131    /// Initialization arguments
132    type Arguments: State;
133
134    /// Invoked when an actor is being started by the system.
135    ///
136    /// Any initialization inherent to the actor's role should be
137    /// performed here hence why it returns the initial state.
138    ///
139    /// Panics in `pre_start` do not invoke the
140    /// supervision strategy and the actor won't be started. [Actor]::`spawn`
141    /// will return an error to the caller
142    ///
143    /// * `myself` - A handle to the [ActorCell] representing this actor
144    /// * `args` - Arguments that are passed in the spawning of the actor which might
145    ///   be necessary to construct the initial state
146    ///
147    /// Returns an initial [Actor::State] to bootstrap the actor
148    #[cfg(not(feature = "async-trait"))]
149    fn pre_start(
150        &self,
151        myself: ActorRef<Self::Msg>,
152        args: Self::Arguments,
153    ) -> impl Future<Output = Result<Self::State, ActorProcessingErr>> + Send;
154    /// Invoked when an actor is being started by the system.
155    ///
156    /// Any initialization inherent to the actor's role should be
157    /// performed here hence why it returns the initial state.
158    ///
159    /// Panics in `pre_start` do not invoke the
160    /// supervision strategy and the actor won't be started. [Actor]::`spawn`
161    /// will return an error to the caller
162    ///
163    /// * `myself` - A handle to the [ActorCell] representing this actor
164    /// * `args` - Arguments that are passed in the spawning of the actor which might
165    /// be necessary to construct the initial state
166    ///
167    /// Returns an initial [Actor::State] to bootstrap the actor
168    #[cfg(feature = "async-trait")]
169    async fn pre_start(
170        &self,
171        myself: ActorRef<Self::Msg>,
172        args: Self::Arguments,
173    ) -> Result<Self::State, ActorProcessingErr>;
174
175    /// Invoked after an actor has started.
176    ///
177    /// Any post initialization can be performed here, such as writing
178    /// to a log file, emitting metrics.
179    ///
180    /// Panics in `post_start` follow the supervision strategy.
181    ///
182    /// * `myself` - A handle to the [ActorCell] representing this actor
183    /// * `state` - A mutable reference to the internal actor's state
184    #[allow(unused_variables)]
185    #[cfg(not(feature = "async-trait"))]
186    fn post_start(
187        &self,
188        myself: ActorRef<Self::Msg>,
189        state: &mut Self::State,
190    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
191        async { Ok(()) }
192    }
193
194    /// Invoked after an actor has started.
195    ///
196    /// Any post initialization can be performed here, such as writing
197    /// to a log file, emitting metrics.
198    ///
199    /// Panics in `post_start` follow the supervision strategy.
200    ///
201    /// * `myself` - A handle to the [ActorCell] representing this actor
202    /// * `state` - A mutable reference to the internal actor's state
203    #[allow(unused_variables)]
204    #[cfg(feature = "async-trait")]
205    async fn post_start(
206        &self,
207        myself: ActorRef<Self::Msg>,
208        state: &mut Self::State,
209    ) -> Result<(), ActorProcessingErr> {
210        Ok(())
211    }
212
213    /// Invoked after an actor has been stopped to perform final cleanup. In the
214    /// event the actor is terminated with [Signal::Kill] or has self-panicked,
215    /// `post_stop` won't be called.
216    ///
217    /// Panics in `post_stop` follow the supervision strategy.
218    ///
219    /// * `myself` - A handle to the [ActorCell] representing this actor
220    /// * `state` - A mutable reference to the internal actor's last known state
221    #[allow(unused_variables)]
222    #[cfg(not(feature = "async-trait"))]
223    fn post_stop(
224        &self,
225        myself: ActorRef<Self::Msg>,
226        state: &mut Self::State,
227    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
228        async { Ok(()) }
229    }
230    /// Invoked after an actor has been stopped to perform final cleanup. In the
231    /// event the actor is terminated with [Signal::Kill] or has self-panicked,
232    /// `post_stop` won't be called.
233    ///
234    /// Panics in `post_stop` follow the supervision strategy.
235    ///
236    /// * `myself` - A handle to the [ActorCell] representing this actor
237    /// * `state` - A mutable reference to the internal actor's last known state
238    #[allow(unused_variables)]
239    #[cfg(feature = "async-trait")]
240    async fn post_stop(
241        &self,
242        myself: ActorRef<Self::Msg>,
243        state: &mut Self::State,
244    ) -> Result<(), ActorProcessingErr> {
245        Ok(())
246    }
247
248    /// Handle the incoming message from the event processing loop. Unhandled panickes will be
249    /// captured and sent to the supervisor(s)
250    ///
251    /// * `myself` - A handle to the [ActorCell] representing this actor
252    /// * `message` - The message to process
253    /// * `state` - A mutable reference to the internal actor's state
254    #[allow(unused_variables)]
255    #[cfg(not(feature = "async-trait"))]
256    fn handle(
257        &self,
258        myself: ActorRef<Self::Msg>,
259        message: Self::Msg,
260        state: &mut Self::State,
261    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
262        async { Ok(()) }
263    }
264    /// Handle the incoming message from the event processing loop. Unhandled panickes will be
265    /// captured and sent to the supervisor(s)
266    ///
267    /// * `myself` - A handle to the [ActorCell] representing this actor
268    /// * `message` - The message to process
269    /// * `state` - A mutable reference to the internal actor's state
270    #[allow(unused_variables)]
271    #[cfg(feature = "async-trait")]
272    async fn handle(
273        &self,
274        myself: ActorRef<Self::Msg>,
275        message: Self::Msg,
276        state: &mut Self::State,
277    ) -> Result<(), ActorProcessingErr> {
278        Ok(())
279    }
280
281    /// Handle the remote incoming message from the event processing loop. Unhandled panickes will be
282    /// captured and sent to the supervisor(s)
283    ///
284    /// * `myself` - A handle to the [ActorCell] representing this actor
285    /// * `message` - The serialized message to handle
286    /// * `state` - A mutable reference to the internal actor's state
287    #[allow(unused_variables)]
288    #[cfg(all(feature = "cluster", not(feature = "async-trait")))]
289    fn handle_serialized(
290        &self,
291        myself: ActorRef<Self::Msg>,
292        message: crate::message::SerializedMessage,
293        state: &mut Self::State,
294    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
295        async { Ok(()) }
296    }
297    /// Handle the remote incoming message from the event processing loop. Unhandled panickes will be
298    /// captured and sent to the supervisor(s)
299    ///
300    /// * `myself` - A handle to the [ActorCell] representing this actor
301    /// * `message` - The serialized message to handle
302    /// * `state` - A mutable reference to the internal actor's state
303    #[allow(unused_variables)]
304    #[cfg(all(feature = "cluster", feature = "async-trait"))]
305    async fn handle_serialized(
306        &self,
307        myself: ActorRef<Self::Msg>,
308        message: crate::message::SerializedMessage,
309        state: &mut Self::State,
310    ) -> Result<(), ActorProcessingErr> {
311        Ok(())
312    }
313
314    /// Handle the incoming supervision event. Unhandled panics will be captured and
315    /// sent the the supervisor(s). The default supervision behavior is to exit the
316    /// supervisor on any child exit. To override this behavior, implement this function.
317    ///
318    /// * `myself` - A handle to the [ActorCell] representing this actor
319    /// * `message` - The message to process
320    /// * `state` - A mutable reference to the internal actor's state
321    #[allow(unused_variables)]
322    #[cfg(not(feature = "async-trait"))]
323    fn handle_supervisor_evt(
324        &self,
325        myself: ActorRef<Self::Msg>,
326        message: SupervisionEvent,
327        state: &mut Self::State,
328    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
329        async move {
330            match message {
331                SupervisionEvent::ActorTerminated(who, _, _)
332                | SupervisionEvent::ActorFailed(who, _) => {
333                    myself.stop(None);
334                }
335                _ => {}
336            }
337            Ok(())
338        }
339    }
340    /// Handle the incoming supervision event. Unhandled panics will be captured and
341    /// sent the the supervisor(s). The default supervision behavior is to exit the
342    /// supervisor on any child exit. To override this behavior, implement this function.
343    ///
344    /// * `myself` - A handle to the [ActorCell] representing this actor
345    /// * `message` - The message to process
346    /// * `state` - A mutable reference to the internal actor's state
347    #[allow(unused_variables)]
348    #[cfg(feature = "async-trait")]
349    async fn handle_supervisor_evt(
350        &self,
351        myself: ActorRef<Self::Msg>,
352        message: SupervisionEvent,
353        state: &mut Self::State,
354    ) -> Result<(), ActorProcessingErr> {
355        match message {
356            SupervisionEvent::ActorTerminated(who, _, _)
357            | SupervisionEvent::ActorFailed(who, _) => {
358                myself.stop(None);
359            }
360            _ => {}
361        }
362        Ok(())
363    }
364
365    /// Spawn an actor of this type, which is unsupervised, automatically starting
366    ///
367    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
368    /// * `handler` The implementation of Self
369    /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
370    ///   initial state creation
371    ///
372    /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
373    /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
374    /// the actor failed to start
375    #[cfg(not(feature = "async-trait"))]
376    fn spawn(
377        name: Option<ActorName>,
378        handler: Self,
379        startup_args: Self::Arguments,
380    ) -> impl Future<Output = Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr>> + Send {
381        ActorRuntime::<Self>::spawn(name, handler, startup_args)
382    }
383    /// Spawn an actor of this type, which is unsupervised, automatically starting
384    ///
385    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
386    /// * `handler` The implementation of Self
387    /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
388    ///   initial state creation
389    ///
390    /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
391    /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
392    /// the actor failed to start
393    #[cfg(feature = "async-trait")]
394    async fn spawn(
395        name: Option<ActorName>,
396        handler: Self,
397        startup_args: Self::Arguments,
398    ) -> Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr> {
399        ActorRuntime::<Self>::spawn(name, handler, startup_args).await
400    }
401
402    /// Spawn an actor of this type with a supervisor, automatically starting the actor
403    ///
404    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
405    /// * `handler` The implementation of Self
406    /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
407    ///   initial state creation
408    /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
409    ///
410    /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
411    /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
412    /// the actor failed to start
413    #[cfg(not(feature = "async-trait"))]
414    fn spawn_linked(
415        name: Option<ActorName>,
416        handler: Self,
417        startup_args: Self::Arguments,
418        supervisor: ActorCell,
419    ) -> impl Future<Output = Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr>> + Send {
420        ActorRuntime::<Self>::spawn_linked(name, handler, startup_args, supervisor)
421    }
422    /// Spawn an actor of this type with a supervisor, automatically starting the actor
423    ///
424    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
425    /// * `handler` The implementation of Self
426    /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
427    /// initial state creation
428    /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
429    ///
430    /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
431    /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
432    /// the actor failed to start
433    #[cfg(feature = "async-trait")]
434    async fn spawn_linked(
435        name: Option<ActorName>,
436        handler: Self,
437        startup_args: Self::Arguments,
438        supervisor: ActorCell,
439    ) -> Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr> {
440        ActorRuntime::<Self>::spawn_linked(name, handler, startup_args, supervisor).await
441    }
442}
443
444/// Helper struct for tracking the results from actor processing loops
445#[doc(hidden)]
446pub(crate) struct ActorLoopResult {
447    pub(crate) should_exit: bool,
448    pub(crate) exit_reason: Option<String>,
449    pub(crate) was_killed: bool,
450}
451
452impl ActorLoopResult {
453    pub(crate) fn ok() -> Self {
454        Self {
455            should_exit: false,
456            exit_reason: None,
457            was_killed: false,
458        }
459    }
460
461    pub(crate) fn stop(reason: Option<String>) -> Self {
462        Self {
463            should_exit: true,
464            exit_reason: reason,
465            was_killed: false,
466        }
467    }
468
469    pub(crate) fn signal(signal_str: Option<String>) -> Self {
470        Self {
471            should_exit: true,
472            exit_reason: signal_str,
473            was_killed: true,
474        }
475    }
476}
477
478/// [ActorRuntime] is a struct which represents the processing actor.
479///
480///  This struct is consumed by the `start` operation, but results in an
481/// [ActorRef] to communicate and operate with along with the [JoinHandle]
482/// representing the actor's async processing loop.
483pub struct ActorRuntime<TActor>
484where
485    TActor: Actor,
486{
487    actor_ref: ActorRef<TActor::Msg>,
488    handler: TActor,
489    id: ActorId,
490    name: Option<String>,
491}
492
493impl<TActor: Actor> Debug for ActorRuntime<TActor> {
494    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
495        f.debug_struct("ActorRuntime")
496            .field("name", &self.name)
497            .field("id", &self.id)
498            .finish()
499    }
500}
501
502impl<TActor> ActorRuntime<TActor>
503where
504    TActor: Actor,
505{
506    /// Spawn an actor, which is unsupervised, automatically starting the actor
507    ///
508    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
509    /// * `handler` The [Actor] defining the logic for this actor
510    /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
511    ///   initial state creation
512    ///
513    /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
514    /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
515    /// the actor failed to start
516    pub async fn spawn(
517        name: Option<ActorName>,
518        handler: TActor,
519        startup_args: TActor::Arguments,
520    ) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
521        let (actor, ports) = Self::new(name, handler)?;
522        let aref = actor.actor_ref.clone();
523        let result = actor.start(ports, startup_args, None).await;
524        if result.is_err() {
525            aref.set_status(ActorStatus::Stopped);
526        }
527        result
528    }
529
530    /// Spawn an actor with a supervisor, automatically starting the actor
531    ///
532    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
533    /// * `handler` The [Actor] defining the logic for this actor
534    /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
535    ///   initial state creation
536    /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
537    ///
538    /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
539    /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
540    /// the actor failed to start
541    pub async fn spawn_linked(
542        name: Option<ActorName>,
543        handler: TActor,
544        startup_args: TActor::Arguments,
545        supervisor: ActorCell,
546    ) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
547        let (actor, ports) = Self::new(name, handler)?;
548        let aref = actor.actor_ref.clone();
549        let result = actor.start(ports, startup_args, Some(supervisor)).await;
550        if result.is_err() {
551            aref.set_status(ActorStatus::Stopped);
552        }
553        result
554    }
555
556    /// Spawn an actor instantly, not waiting on the actor's `pre_start` routine. This is helpful
557    /// for actors where you want access to the send messages into the actor's message queue
558    /// without waiting on an asynchronous context.
559    ///
560    /// **WARNING** Failures in the pre_start routine need to be waited on in the join handle
561    /// since they will NOT fail the spawn operation in this context
562    ///
563    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
564    /// * `handler` The [Actor] defining the logic for this actor
565    /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
566    ///   initial state creation
567    ///
568    /// Returns a [Ok((ActorRef, JoinHandle<Result<JoinHandle<()>, SpawnErr>>))] upon successful creation of the
569    /// message queues, so you can begin sending messages. However the associated [JoinHandle] contains the inner
570    /// information around if the actor successfully started or not in it's `pre_start` routine. Returns [Err(SpawnErr)] if
571    /// the actor name is already allocated
572    #[allow(clippy::type_complexity)]
573    pub fn spawn_instant(
574        name: Option<ActorName>,
575        handler: TActor,
576        startup_args: TActor::Arguments,
577    ) -> Result<
578        (
579            ActorRef<TActor::Msg>,
580            JoinHandle<Result<JoinHandle<()>, SpawnErr>>,
581        ),
582        SpawnErr,
583    > {
584        let (actor, ports) = Self::new(name.clone(), handler)?;
585        let actor_ref = actor.actor_ref.clone();
586        let actor_ref2 = actor_ref.clone();
587        let join_op = crate::concurrency::spawn_named(name.as_deref(), async move {
588            let result = actor.start(ports, startup_args, None).await;
589            if result.is_err() {
590                actor_ref2.set_status(ActorStatus::Stopped);
591            }
592            let (_, handle) = result?;
593            Ok(handle)
594        });
595        Ok((actor_ref, join_op))
596    }
597
598    /// Spawn an actor instantly with supervision, not waiting on the actor's `pre_start` routine.
599    /// This is helpful for actors where you want access to the send messages into the actor's
600    /// message queue without waiting on an asynchronous context.
601    ///
602    /// **WARNING** Failures in the pre_start routine need to be waited on in the join handle
603    /// since they will NOT fail the spawn operation in this context. Additionally the supervision
604    /// tree will **NOT** be linked until the `pre_start` completes so there is a chance an actor
605    /// is lost during `pre_start` and not successfully started unless it's specifically handled
606    /// by the caller by awaiting later.
607    ///
608    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
609    /// * `handler` The [Actor] defining the logic for this actor
610    /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
611    ///   initial state creation
612    /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
613    ///
614    /// Returns a [Ok((ActorRef, JoinHandle<Result<JoinHandle<()>, SpawnErr>>))] upon successful creation of the
615    /// message queues, so you can begin sending messages. However the associated [JoinHandle] contains the inner
616    /// information around if the actor successfully started or not in it's `pre_start` routine. Returns [Err(SpawnErr)] if
617    /// the actor name is already allocated
618    #[allow(clippy::type_complexity)]
619    pub fn spawn_linked_instant(
620        name: Option<ActorName>,
621        handler: TActor,
622        startup_args: TActor::Arguments,
623        supervisor: ActorCell,
624    ) -> Result<
625        (
626            ActorRef<TActor::Msg>,
627            JoinHandle<Result<JoinHandle<()>, SpawnErr>>,
628        ),
629        SpawnErr,
630    > {
631        let (actor, ports) = Self::new(name.clone(), handler)?;
632        let actor_ref = actor.actor_ref.clone();
633        let actor_ref2 = actor_ref.clone();
634        let join_op = crate::concurrency::spawn_named(name.as_deref(), async move {
635            let result = actor.start(ports, startup_args, Some(supervisor)).await;
636            if result.is_err() {
637                actor_ref2.set_status(ActorStatus::Stopped);
638            }
639            let (_, handle) = result?;
640            Ok(handle)
641        });
642        Ok((actor_ref, join_op))
643    }
644
645    /// Spawn a REMOTE actor with a supervisor, automatically starting the actor. Only for use
646    /// by `ractor_cluster::node::NodeSession`
647    ///
648    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
649    /// * `handler`: The [Actor] defining the logic for this actor
650    /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
651    ///   initial state creation
652    /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
653    ///
654    /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
655    /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
656    /// the actor failed to start
657    #[cfg(feature = "cluster")]
658    pub async fn spawn_linked_remote(
659        name: Option<ActorName>,
660        handler: TActor,
661        id: ActorId,
662        startup_args: TActor::Arguments,
663        supervisor: ActorCell,
664    ) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
665        if id.is_local() {
666            Err(SpawnErr::StartupFailed(From::from(
667                "Cannot spawn a remote actor when the identifier is not remote!",
668            )))
669        } else {
670            let (actor_cell, ports) = actor_cell::ActorCell::new_remote::<TActor>(name, id)?;
671            let id = actor_cell.get_id();
672            let name = actor_cell.get_name();
673            let actor_cell2 = actor_cell.clone();
674            let (actor, ports) = (
675                Self {
676                    actor_ref: actor_cell.into(),
677                    handler,
678                    id,
679                    name,
680                },
681                ports,
682            );
683            let result = actor.start(ports, startup_args, Some(supervisor)).await;
684            if result.is_err() {
685                actor_cell2.set_status(ActorStatus::Stopped);
686            }
687            result
688        }
689    }
690
691    /// Create a new actor with some handler implementation and initial state
692    ///
693    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
694    /// * `handler` The [Actor] defining the logic for this actor
695    ///
696    /// Returns A tuple [(Actor, ActorPortSet)] to be passed to the `start` function of [Actor]
697    fn new(name: Option<ActorName>, handler: TActor) -> Result<(Self, ActorPortSet), SpawnErr> {
698        let (actor_cell, ports) = actor_cell::ActorCell::new::<TActor>(name)?;
699        let id = actor_cell.get_id();
700        let name = actor_cell.get_name();
701        Ok((
702            Self {
703                actor_ref: actor_cell.into(),
704                handler,
705                id,
706                name,
707            },
708            ports,
709        ))
710    }
711
712    /// Start the actor immediately, optionally linking to a parent actor (supervision tree)
713    ///
714    /// NOTE: This returned [crate::concurrency::JoinHandle] is guaranteed to not panic (unless the runtime is shutting down perhaps).
715    /// An inner join handle is capturing panic results from any part of the inner tasks, so therefore
716    /// we can safely ignore it, or wait on it to block on the actor's progress
717    ///
718    /// * `ports` - The [ActorPortSet] for this actor
719    /// * `supervisor` - The optional [ActorCell] representing the supervisor of this actor
720    ///
721    /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
722    /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
723    /// the actor failed to start
724    #[tracing::instrument(name = "Actor", skip(self, ports, startup_args, supervisor), fields(id = self.id.to_string(), name = self.name))]
725    async fn start(
726        self,
727        ports: ActorPortSet,
728        startup_args: TActor::Arguments,
729        supervisor: Option<ActorCell>,
730    ) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
731        // cannot start an actor more than once
732        if self.actor_ref.get_status() != ActorStatus::Unstarted {
733            return Err(SpawnErr::ActorAlreadyStarted);
734        }
735
736        let Self {
737            handler,
738            actor_ref,
739            id,
740            name,
741        } = self;
742
743        actor_ref.set_status(ActorStatus::Starting);
744
745        // Perform the pre-start routine, crashing immediately if we fail to start
746        let mut state = Self::do_pre_start(actor_ref.clone(), &handler, startup_args)
747            .await?
748            .map_err(SpawnErr::StartupFailed)?;
749
750        // setup supervision
751        if let Some(sup) = &supervisor {
752            actor_ref.link(sup.clone());
753        }
754
755        // Generate the ActorRef which will be returned
756        let myself_ret = actor_ref.clone();
757
758        // run the processing loop, backgrounding the work
759        let handle = crate::concurrency::spawn_named(actor_ref.get_name().as_deref(), async move {
760            let myself = actor_ref.clone();
761            let evt = match Self::processing_loop(ports, &mut state, &handler, actor_ref, id, name)
762                .await
763            {
764                Ok(exit_reason) => SupervisionEvent::ActorTerminated(
765                    myself.get_cell(),
766                    Some(BoxedState::new(state)),
767                    exit_reason,
768                ),
769                Err(actor_err) => match actor_err {
770                    ActorErr::Cancelled => SupervisionEvent::ActorTerminated(
771                        myself.get_cell(),
772                        None,
773                        Some("killed".to_string()),
774                    ),
775                    ActorErr::Failed(msg) => SupervisionEvent::ActorFailed(myself.get_cell(), msg),
776                },
777            };
778
779            // terminate children
780            myself.terminate();
781
782            // notify supervisors of the actor's death
783            myself.notify_supervisor_and_monitors(evt);
784
785            // unlink superisors
786            if let Some(sup) = supervisor {
787                myself.unlink(sup);
788            }
789
790            // set status to stopped
791            myself.set_status(ActorStatus::Stopped);
792        });
793
794        Ok((myself_ret, handle))
795    }
796
797    #[tracing::instrument(name = "Actor", skip(ports, state, handler, myself, _id, _name), fields(id = _id.to_string(), name = _name))]
798    async fn processing_loop(
799        mut ports: ActorPortSet,
800        state: &mut TActor::State,
801        handler: &TActor,
802        myself: ActorRef<TActor::Msg>,
803        _id: ActorId,
804        _name: Option<String>,
805    ) -> Result<Option<String>, ActorErr> {
806        // perform the post-start, with supervision enabled
807        Self::do_post_start(myself.clone(), handler, state)
808            .await?
809            .map_err(ActorErr::Failed)?;
810
811        myself.set_status(ActorStatus::Running);
812        myself.notify_supervisor_and_monitors(SupervisionEvent::ActorStarted(myself.get_cell()));
813
814        let myself_clone = myself.clone();
815
816        let future = async move {
817            // the message processing loop. If we get an exit flag, try and capture the exit reason if there
818            // is one
819            loop {
820                let ActorLoopResult {
821                    should_exit,
822                    exit_reason,
823                    was_killed,
824                } = Self::process_message(myself.clone(), state, handler, &mut ports)
825                    .await
826                    .map_err(ActorErr::Failed)?;
827                // processing loop exit
828                if should_exit {
829                    return Ok((state, exit_reason, was_killed));
830                }
831            }
832        };
833
834        // capture any panics in this future and convert to an ActorErr
835        let loop_done = futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
836            .map_err(|err| ActorErr::Failed(get_panic_string(err)))
837            .await;
838
839        // set status to stopping
840        myself_clone.set_status(ActorStatus::Stopping);
841
842        let (exit_state, exit_reason, was_killed) = loop_done??;
843
844        // if we didn't exit in error mode, call `post_stop`
845        if !was_killed {
846            Self::do_post_stop(myself_clone, handler, exit_state)
847                .await?
848                .map_err(ActorErr::Failed)?;
849        }
850
851        Ok(exit_reason)
852    }
853
854    /// Process a message, returning the "new" state (if changed)
855    /// along with optionally whether we were signaled mid-processing or not
856    ///
857    /// * `myself` - The current [ActorRef]
858    /// * `state` - The current [Actor::State] object
859    /// * `handler` - Pointer to the [Actor] definition
860    /// * `ports` - The mutable [ActorPortSet] which are the message ports for this actor
861    ///
862    /// Returns a tuple of the next [Actor::State] and a flag to denote if the processing
863    /// loop is done
864    async fn process_message(
865        myself: ActorRef<TActor::Msg>,
866        state: &mut TActor::State,
867        handler: &TActor,
868        ports: &mut ActorPortSet,
869    ) -> Result<ActorLoopResult, ActorProcessingErr> {
870        match ports.listen_in_priority().await {
871            Ok(actor_port_message) => match actor_port_message {
872                actor_cell::ActorPortMessage::Signal(signal) => {
873                    Ok(ActorLoopResult::signal(Self::handle_signal(myself, signal)))
874                }
875                actor_cell::ActorPortMessage::Stop(stop_message) => {
876                    let exit_reason = match stop_message {
877                        StopMessage::Stop => {
878                            tracing::trace!("Actor {:?} stopped with no reason", myself.get_id());
879                            None
880                        }
881                        StopMessage::Reason(reason) => {
882                            tracing::trace!(
883                                "Actor {:?} stopped with reason '{reason}'",
884                                myself.get_id(),
885                            );
886                            Some(reason)
887                        }
888                    };
889                    Ok(ActorLoopResult::stop(exit_reason))
890                }
891                actor_cell::ActorPortMessage::Supervision(supervision) => {
892                    let future = Self::handle_supervision_message(
893                        myself.clone(),
894                        state,
895                        handler,
896                        supervision,
897                    );
898                    match ports.run_with_signal(future).await {
899                        Ok(Ok(())) => Ok(ActorLoopResult::ok()),
900                        Ok(Err(internal_err)) => Err(internal_err),
901                        Err(signal) => {
902                            Ok(ActorLoopResult::signal(Self::handle_signal(myself, signal)))
903                        }
904                    }
905                }
906                actor_cell::ActorPortMessage::Message(MuxedMessage::Message(msg)) => {
907                    let future = Self::handle_message(myself.clone(), state, handler, msg);
908                    match ports.run_with_signal(future).await {
909                        Ok(Ok(())) => Ok(ActorLoopResult::ok()),
910                        Ok(Err(internal_err)) => Err(internal_err),
911                        Err(signal) => {
912                            Ok(ActorLoopResult::signal(Self::handle_signal(myself, signal)))
913                        }
914                    }
915                }
916                actor_cell::ActorPortMessage::Message(MuxedMessage::Drain) => {
917                    // Drain is a stub marker that the actor should now stop, we've processed
918                    // all the messages and we want the actor to die now
919                    Ok(ActorLoopResult::stop(Some("Drained".to_string())))
920                }
921            },
922            Err(MessagingErr::ChannelClosed) => {
923                // one of the channels is closed, this means
924                // the receiver was dropped and in this case
925                // we should always die. Therefore we flag
926                // to terminate
927                Ok(ActorLoopResult::signal(Self::handle_signal(
928                    myself,
929                    Signal::Kill,
930                )))
931            }
932            Err(MessagingErr::InvalidActorType) => {
933                // not possible. Treat like a channel closed
934                Ok(ActorLoopResult::signal(Self::handle_signal(
935                    myself,
936                    Signal::Kill,
937                )))
938            }
939            Err(MessagingErr::SendErr(_)) => {
940                // not possible. Treat like a channel closed
941                Ok(ActorLoopResult::signal(Self::handle_signal(
942                    myself,
943                    Signal::Kill,
944                )))
945            }
946        }
947    }
948
949    async fn handle_message(
950        myself: ActorRef<TActor::Msg>,
951        state: &mut TActor::State,
952        handler: &TActor,
953        mut msg: crate::message::BoxedMessage,
954    ) -> Result<(), ActorProcessingErr> {
955        // panic in order to kill the actor
956        #[cfg(feature = "cluster")]
957        {
958            // A `RemoteActor` will handle serialized messages, without decoding them, forwarding them
959            // to the remote system for decoding + handling by the real implementation. Therefore `RemoteActor`s
960            // can be thought of as a "shim" to a real actor on a remote system
961            if !myself.get_id().is_local() {
962                match msg.serialized_msg {
963                    Some(serialized_msg) => {
964                        return handler
965                            .handle_serialized(myself, serialized_msg, state)
966                            .await;
967                    }
968                    None => {
969                        return Err(From::from(
970                            "`RemoteActor` failed to read `SerializedMessage` from `BoxedMessage`",
971                        ));
972                    }
973                }
974            }
975        }
976
977        // The current [tracing::Span] is retrieved, boxed, and included in every
978        // `BoxedMessage` during the conversion of this `TActor::Msg`. It is used
979        // to automatically continue tracing span nesting when sending messages to Actors.
980        let current_span_when_message_was_sent = msg.span.take();
981
982        // An error here will bubble up to terminate the actor
983        let typed_msg = TActor::Msg::from_boxed(msg)?;
984
985        if let Some(span) = current_span_when_message_was_sent {
986            handler
987                .handle(myself, typed_msg, state)
988                .instrument(span)
989                .await
990        } else {
991            handler.handle(myself, typed_msg, state).await
992        }
993    }
994
995    fn handle_signal(myself: ActorRef<TActor::Msg>, signal: Signal) -> Option<String> {
996        match &signal {
997            Signal::Kill => {
998                myself.terminate();
999            }
1000        }
1001        Some(signal.to_string())
1002    }
1003
1004    async fn handle_supervision_message(
1005        myself: ActorRef<TActor::Msg>,
1006        state: &mut TActor::State,
1007        handler: &TActor,
1008        message: SupervisionEvent,
1009    ) -> Result<(), ActorProcessingErr> {
1010        handler.handle_supervisor_evt(myself, message, state).await
1011    }
1012
1013    async fn do_pre_start(
1014        myself: ActorRef<TActor::Msg>,
1015        handler: &TActor,
1016        arguments: TActor::Arguments,
1017    ) -> Result<Result<TActor::State, ActorProcessingErr>, SpawnErr> {
1018        let future = handler.pre_start(myself, arguments);
1019        futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
1020            .await
1021            .map_err(|err| SpawnErr::StartupFailed(get_panic_string(err)))
1022    }
1023
1024    async fn do_post_start(
1025        myself: ActorRef<TActor::Msg>,
1026        handler: &TActor,
1027        state: &mut TActor::State,
1028    ) -> Result<Result<(), ActorProcessingErr>, ActorErr> {
1029        let future = handler.post_start(myself, state);
1030        futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
1031            .await
1032            .map_err(|err| ActorErr::Failed(get_panic_string(err)))
1033    }
1034
1035    async fn do_post_stop(
1036        myself: ActorRef<TActor::Msg>,
1037        handler: &TActor,
1038        state: &mut TActor::State,
1039    ) -> Result<Result<(), ActorProcessingErr>, ActorErr> {
1040        let future = handler.post_stop(myself, state);
1041        futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
1042            .await
1043            .map_err(|err| ActorErr::Failed(get_panic_string(err)))
1044    }
1045}