ractor/actor.rs
1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
6//! This module contains the basic building blocks of an actor.
7//!
8//! They are:
9//! [Actor]: The behavior definition for an actor's internal processing logic + state management
10//! [ActorRuntime]: Management structure processing the message handler, signals, and supervision events in a loop
11//!
12//! ## Actor Supervision
13//!
14//! Supervision is a special notion of "ownership" over actors by a parent (supervisor).
15//! Supervisors are responsible for the lifecycle of a child actor such that they get notified
16//! when a child actor starts, stops, or panics (when possible). The supervisor can then decide
17//! how to handle the event. Should it restart the actor, leave it dead, potentially die itself
18//! notifying the supervisor's supervisor? That's up to the implementation of the [super::Actor]
19//!
20//! This is currently an initial implementation of [Erlang supervisors](https://www.erlang.org/doc/man/supervisor.html)
21//!
22//! An example supervision tree may look like:
23//!
24//! ```text
25//! Root/
26//! ├─ Actor A/
27//! │ ├─ Actor A_1/
28//! │ ├─ Actor A_2/
29//! ├─ Actor B/
30//! ├─ Actor C/
31//! │ ├─ Actor C_1/
32//! │ │ ├─ Actor C_1_1/
33//! │ │ │ ├─ Actor C_1_1_1/
34//! ```
35//!
36//! To link actors together in the supervision tree, there are 2 choices.
37//!
38//! 1. [Actor::spawn_linked] which requires supplying the supervisor to the actor upon spawning a child.
39//! This call will link the supervision tree as early as possible in the lifecycle of the actors,
40//! such that a failure or panic in `post_start` will be captured and notify the supervisor
41//! 2. `ActorCell::link` will link actors together after-the-fact, once already spawned. This is helpful
42//! for actors which are originally created independently but have some latent relationship to each
43//! other. However due to startup routines and asynchronous processing, it's unlikely that failures
44//! in `post_start` and any other asynchronous handling will be captured in the supervision tree.
45//!
46//! ## Handling panics
47//!
48//! Another point to consider in actor frameworks are `panic!`s. The actor runtime captures and transforms
49//! a panic in an actor into the string message equivalent upon exit. However the traditional panic will still
50//! log to `stderr` for tracing. You can additionally setup a [panic hook](https://doc.rust-lang.org/std/panic/fn.set_hook.html)
51//! to do things like capturing backtraces on the unwinding panic.
52
53use std::fmt::Debug;
54#[cfg(not(feature = "async-trait"))]
55use std::future::Future;
56use std::panic::AssertUnwindSafe;
57
58use actor_properties::MuxedMessage;
59use futures::TryFutureExt;
60use tracing::Instrument;
61
62use crate::concurrency::JoinHandle;
63use crate::ActorId;
64pub mod messages;
65use messages::*;
66
67pub mod actor_cell;
68pub mod actor_id;
69pub(crate) mod actor_properties;
70pub mod actor_ref;
71pub mod derived_actor;
72mod supervision;
73
74#[cfg(test)]
75mod supervision_tests;
76#[cfg(test)]
77mod tests;
78
79use actor_cell::ActorCell;
80use actor_cell::ActorPortSet;
81use actor_cell::ActorStatus;
82use actor_ref::ActorRef;
83
84use crate::errors::ActorErr;
85use crate::errors::ActorProcessingErr;
86use crate::errors::MessagingErr;
87use crate::errors::SpawnErr;
88use crate::ActorName;
89use crate::Message;
90use crate::State;
91
92pub(crate) fn get_panic_string(e: Box<dyn std::any::Any + Send>) -> ActorProcessingErr {
93 match e.downcast::<String>() {
94 Ok(v) => From::from(*v),
95 Err(e) => match e.downcast::<&str>() {
96 Ok(v) => From::from(*v),
97 _ => From::from("Unknown panic occurred which couldn't be coerced to a string"),
98 },
99 }
100}
101
102/// [Actor] defines the behavior of an Actor. It specifies the
103/// Message type, State type, and all processing logic for the actor
104///
105/// Additionally it aliases the calls for `spawn` and `spawn_linked` from
106/// [ActorRuntime] for convenient startup + lifecycle management
107///
108/// NOTE: All of the implemented trait functions
109///
110/// * `pre_start`
111/// * `post_start`
112/// * `post_stop`
113/// * `handle`
114/// * `handle_serialized` (Available with `cluster` feature only)
115/// * `handle_supervisor_evt`
116///
117/// return a [Result<_, ActorProcessingError>] where the error type is an
118/// alias of [Box<dyn std::error::Error + Send + Sync + 'static>]. This is treated
119/// as an "unhandled" error and will terminate the actor + execute necessary supervision
120/// patterns. Panics are also captured from the inner functions and wrapped into an Error
121/// type, however should an [Err(_)] result from any of these functions the **actor will
122/// terminate** and cleanup.
123#[cfg_attr(feature = "async-trait", crate::async_trait)]
124pub trait Actor: Sized + Sync + Send + 'static {
125 /// The message type for this actor
126 type Msg: Message;
127
128 /// The type of state this actor manages internally
129 type State: State;
130
131 /// Initialization arguments
132 type Arguments: State;
133
134 /// Invoked when an actor is being started by the system.
135 ///
136 /// Any initialization inherent to the actor's role should be
137 /// performed here hence why it returns the initial state.
138 ///
139 /// Panics in `pre_start` do not invoke the
140 /// supervision strategy and the actor won't be started. [Actor]::`spawn`
141 /// will return an error to the caller
142 ///
143 /// * `myself` - A handle to the [ActorCell] representing this actor
144 /// * `args` - Arguments that are passed in the spawning of the actor which might
145 /// be necessary to construct the initial state
146 ///
147 /// Returns an initial [Actor::State] to bootstrap the actor
148 #[cfg(not(feature = "async-trait"))]
149 fn pre_start(
150 &self,
151 myself: ActorRef<Self::Msg>,
152 args: Self::Arguments,
153 ) -> impl Future<Output = Result<Self::State, ActorProcessingErr>> + Send;
154 /// Invoked when an actor is being started by the system.
155 ///
156 /// Any initialization inherent to the actor's role should be
157 /// performed here hence why it returns the initial state.
158 ///
159 /// Panics in `pre_start` do not invoke the
160 /// supervision strategy and the actor won't be started. [Actor]::`spawn`
161 /// will return an error to the caller
162 ///
163 /// * `myself` - A handle to the [ActorCell] representing this actor
164 /// * `args` - Arguments that are passed in the spawning of the actor which might
165 /// be necessary to construct the initial state
166 ///
167 /// Returns an initial [Actor::State] to bootstrap the actor
168 #[cfg(feature = "async-trait")]
169 async fn pre_start(
170 &self,
171 myself: ActorRef<Self::Msg>,
172 args: Self::Arguments,
173 ) -> Result<Self::State, ActorProcessingErr>;
174
175 /// Invoked after an actor has started.
176 ///
177 /// Any post initialization can be performed here, such as writing
178 /// to a log file, emitting metrics.
179 ///
180 /// Panics in `post_start` follow the supervision strategy.
181 ///
182 /// * `myself` - A handle to the [ActorCell] representing this actor
183 /// * `state` - A mutable reference to the internal actor's state
184 #[allow(unused_variables)]
185 #[cfg(not(feature = "async-trait"))]
186 fn post_start(
187 &self,
188 myself: ActorRef<Self::Msg>,
189 state: &mut Self::State,
190 ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
191 async { Ok(()) }
192 }
193
194 /// Invoked after an actor has started.
195 ///
196 /// Any post initialization can be performed here, such as writing
197 /// to a log file, emitting metrics.
198 ///
199 /// Panics in `post_start` follow the supervision strategy.
200 ///
201 /// * `myself` - A handle to the [ActorCell] representing this actor
202 /// * `state` - A mutable reference to the internal actor's state
203 #[allow(unused_variables)]
204 #[cfg(feature = "async-trait")]
205 async fn post_start(
206 &self,
207 myself: ActorRef<Self::Msg>,
208 state: &mut Self::State,
209 ) -> Result<(), ActorProcessingErr> {
210 Ok(())
211 }
212
213 /// Invoked after an actor has been stopped to perform final cleanup. In the
214 /// event the actor is terminated with [Signal::Kill] or has self-panicked,
215 /// `post_stop` won't be called.
216 ///
217 /// Panics in `post_stop` follow the supervision strategy.
218 ///
219 /// * `myself` - A handle to the [ActorCell] representing this actor
220 /// * `state` - A mutable reference to the internal actor's last known state
221 #[allow(unused_variables)]
222 #[cfg(not(feature = "async-trait"))]
223 fn post_stop(
224 &self,
225 myself: ActorRef<Self::Msg>,
226 state: &mut Self::State,
227 ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
228 async { Ok(()) }
229 }
230 /// Invoked after an actor has been stopped to perform final cleanup. In the
231 /// event the actor is terminated with [Signal::Kill] or has self-panicked,
232 /// `post_stop` won't be called.
233 ///
234 /// Panics in `post_stop` follow the supervision strategy.
235 ///
236 /// * `myself` - A handle to the [ActorCell] representing this actor
237 /// * `state` - A mutable reference to the internal actor's last known state
238 #[allow(unused_variables)]
239 #[cfg(feature = "async-trait")]
240 async fn post_stop(
241 &self,
242 myself: ActorRef<Self::Msg>,
243 state: &mut Self::State,
244 ) -> Result<(), ActorProcessingErr> {
245 Ok(())
246 }
247
248 /// Handle the incoming message from the event processing loop. Unhandled panickes will be
249 /// captured and sent to the supervisor(s)
250 ///
251 /// * `myself` - A handle to the [ActorCell] representing this actor
252 /// * `message` - The message to process
253 /// * `state` - A mutable reference to the internal actor's state
254 #[allow(unused_variables)]
255 #[cfg(not(feature = "async-trait"))]
256 fn handle(
257 &self,
258 myself: ActorRef<Self::Msg>,
259 message: Self::Msg,
260 state: &mut Self::State,
261 ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
262 async { Ok(()) }
263 }
264 /// Handle the incoming message from the event processing loop. Unhandled panickes will be
265 /// captured and sent to the supervisor(s)
266 ///
267 /// * `myself` - A handle to the [ActorCell] representing this actor
268 /// * `message` - The message to process
269 /// * `state` - A mutable reference to the internal actor's state
270 #[allow(unused_variables)]
271 #[cfg(feature = "async-trait")]
272 async fn handle(
273 &self,
274 myself: ActorRef<Self::Msg>,
275 message: Self::Msg,
276 state: &mut Self::State,
277 ) -> Result<(), ActorProcessingErr> {
278 Ok(())
279 }
280
281 /// Handle the remote incoming message from the event processing loop. Unhandled panickes will be
282 /// captured and sent to the supervisor(s)
283 ///
284 /// * `myself` - A handle to the [ActorCell] representing this actor
285 /// * `message` - The serialized message to handle
286 /// * `state` - A mutable reference to the internal actor's state
287 #[allow(unused_variables)]
288 #[cfg(all(feature = "cluster", not(feature = "async-trait")))]
289 fn handle_serialized(
290 &self,
291 myself: ActorRef<Self::Msg>,
292 message: crate::message::SerializedMessage,
293 state: &mut Self::State,
294 ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
295 async { Ok(()) }
296 }
297 /// Handle the remote incoming message from the event processing loop. Unhandled panickes will be
298 /// captured and sent to the supervisor(s)
299 ///
300 /// * `myself` - A handle to the [ActorCell] representing this actor
301 /// * `message` - The serialized message to handle
302 /// * `state` - A mutable reference to the internal actor's state
303 #[allow(unused_variables)]
304 #[cfg(all(feature = "cluster", feature = "async-trait"))]
305 async fn handle_serialized(
306 &self,
307 myself: ActorRef<Self::Msg>,
308 message: crate::message::SerializedMessage,
309 state: &mut Self::State,
310 ) -> Result<(), ActorProcessingErr> {
311 Ok(())
312 }
313
314 /// Handle the incoming supervision event. Unhandled panics will be captured and
315 /// sent the the supervisor(s). The default supervision behavior is to exit the
316 /// supervisor on any child exit. To override this behavior, implement this function.
317 ///
318 /// * `myself` - A handle to the [ActorCell] representing this actor
319 /// * `message` - The message to process
320 /// * `state` - A mutable reference to the internal actor's state
321 #[allow(unused_variables)]
322 #[cfg(not(feature = "async-trait"))]
323 fn handle_supervisor_evt(
324 &self,
325 myself: ActorRef<Self::Msg>,
326 message: SupervisionEvent,
327 state: &mut Self::State,
328 ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
329 async move {
330 match message {
331 SupervisionEvent::ActorTerminated(who, _, _)
332 | SupervisionEvent::ActorFailed(who, _) => {
333 myself.stop(None);
334 }
335 _ => {}
336 }
337 Ok(())
338 }
339 }
340 /// Handle the incoming supervision event. Unhandled panics will be captured and
341 /// sent the the supervisor(s). The default supervision behavior is to exit the
342 /// supervisor on any child exit. To override this behavior, implement this function.
343 ///
344 /// * `myself` - A handle to the [ActorCell] representing this actor
345 /// * `message` - The message to process
346 /// * `state` - A mutable reference to the internal actor's state
347 #[allow(unused_variables)]
348 #[cfg(feature = "async-trait")]
349 async fn handle_supervisor_evt(
350 &self,
351 myself: ActorRef<Self::Msg>,
352 message: SupervisionEvent,
353 state: &mut Self::State,
354 ) -> Result<(), ActorProcessingErr> {
355 match message {
356 SupervisionEvent::ActorTerminated(who, _, _)
357 | SupervisionEvent::ActorFailed(who, _) => {
358 myself.stop(None);
359 }
360 _ => {}
361 }
362 Ok(())
363 }
364
365 /// Spawn an actor of this type, which is unsupervised, automatically starting
366 ///
367 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
368 /// * `handler` The implementation of Self
369 /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
370 /// initial state creation
371 ///
372 /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
373 /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
374 /// the actor failed to start
375 #[cfg(not(feature = "async-trait"))]
376 fn spawn(
377 name: Option<ActorName>,
378 handler: Self,
379 startup_args: Self::Arguments,
380 ) -> impl Future<Output = Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr>> + Send {
381 ActorRuntime::<Self>::spawn(name, handler, startup_args)
382 }
383 /// Spawn an actor of this type, which is unsupervised, automatically starting
384 ///
385 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
386 /// * `handler` The implementation of Self
387 /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
388 /// initial state creation
389 ///
390 /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
391 /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
392 /// the actor failed to start
393 #[cfg(feature = "async-trait")]
394 async fn spawn(
395 name: Option<ActorName>,
396 handler: Self,
397 startup_args: Self::Arguments,
398 ) -> Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr> {
399 ActorRuntime::<Self>::spawn(name, handler, startup_args).await
400 }
401
402 /// Spawn an actor of this type with a supervisor, automatically starting the actor
403 ///
404 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
405 /// * `handler` The implementation of Self
406 /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
407 /// initial state creation
408 /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
409 ///
410 /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
411 /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
412 /// the actor failed to start
413 #[cfg(not(feature = "async-trait"))]
414 fn spawn_linked(
415 name: Option<ActorName>,
416 handler: Self,
417 startup_args: Self::Arguments,
418 supervisor: ActorCell,
419 ) -> impl Future<Output = Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr>> + Send {
420 ActorRuntime::<Self>::spawn_linked(name, handler, startup_args, supervisor)
421 }
422 /// Spawn an actor of this type with a supervisor, automatically starting the actor
423 ///
424 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
425 /// * `handler` The implementation of Self
426 /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
427 /// initial state creation
428 /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
429 ///
430 /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
431 /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
432 /// the actor failed to start
433 #[cfg(feature = "async-trait")]
434 async fn spawn_linked(
435 name: Option<ActorName>,
436 handler: Self,
437 startup_args: Self::Arguments,
438 supervisor: ActorCell,
439 ) -> Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr> {
440 ActorRuntime::<Self>::spawn_linked(name, handler, startup_args, supervisor).await
441 }
442}
443
444/// Helper struct for tracking the results from actor processing loops
445#[doc(hidden)]
446pub(crate) struct ActorLoopResult {
447 pub(crate) should_exit: bool,
448 pub(crate) exit_reason: Option<String>,
449 pub(crate) was_killed: bool,
450}
451
452impl ActorLoopResult {
453 pub(crate) fn ok() -> Self {
454 Self {
455 should_exit: false,
456 exit_reason: None,
457 was_killed: false,
458 }
459 }
460
461 pub(crate) fn stop(reason: Option<String>) -> Self {
462 Self {
463 should_exit: true,
464 exit_reason: reason,
465 was_killed: false,
466 }
467 }
468
469 pub(crate) fn signal(signal_str: Option<String>) -> Self {
470 Self {
471 should_exit: true,
472 exit_reason: signal_str,
473 was_killed: true,
474 }
475 }
476}
477
478/// [ActorRuntime] is a struct which represents the processing actor.
479///
480/// This struct is consumed by the `start` operation, but results in an
481/// [ActorRef] to communicate and operate with along with the [JoinHandle]
482/// representing the actor's async processing loop.
483pub struct ActorRuntime<TActor>
484where
485 TActor: Actor,
486{
487 actor_ref: ActorRef<TActor::Msg>,
488 handler: TActor,
489 id: ActorId,
490 name: Option<String>,
491}
492
493impl<TActor: Actor> Debug for ActorRuntime<TActor> {
494 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
495 f.debug_struct("ActorRuntime")
496 .field("name", &self.name)
497 .field("id", &self.id)
498 .finish()
499 }
500}
501
502impl<TActor> ActorRuntime<TActor>
503where
504 TActor: Actor,
505{
506 /// Spawn an actor, which is unsupervised, automatically starting the actor
507 ///
508 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
509 /// * `handler` The [Actor] defining the logic for this actor
510 /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
511 /// initial state creation
512 ///
513 /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
514 /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
515 /// the actor failed to start
516 pub async fn spawn(
517 name: Option<ActorName>,
518 handler: TActor,
519 startup_args: TActor::Arguments,
520 ) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
521 let (actor, ports) = Self::new(name, handler)?;
522 let aref = actor.actor_ref.clone();
523 let result = actor.start(ports, startup_args, None).await;
524 if result.is_err() {
525 aref.set_status(ActorStatus::Stopped);
526 }
527 result
528 }
529
530 /// Spawn an actor with a supervisor, automatically starting the actor
531 ///
532 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
533 /// * `handler` The [Actor] defining the logic for this actor
534 /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
535 /// initial state creation
536 /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
537 ///
538 /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
539 /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
540 /// the actor failed to start
541 pub async fn spawn_linked(
542 name: Option<ActorName>,
543 handler: TActor,
544 startup_args: TActor::Arguments,
545 supervisor: ActorCell,
546 ) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
547 let (actor, ports) = Self::new(name, handler)?;
548 let aref = actor.actor_ref.clone();
549 let result = actor.start(ports, startup_args, Some(supervisor)).await;
550 if result.is_err() {
551 aref.set_status(ActorStatus::Stopped);
552 }
553 result
554 }
555
556 /// Spawn an actor instantly, not waiting on the actor's `pre_start` routine. This is helpful
557 /// for actors where you want access to the send messages into the actor's message queue
558 /// without waiting on an asynchronous context.
559 ///
560 /// **WARNING** Failures in the pre_start routine need to be waited on in the join handle
561 /// since they will NOT fail the spawn operation in this context
562 ///
563 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
564 /// * `handler` The [Actor] defining the logic for this actor
565 /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
566 /// initial state creation
567 ///
568 /// Returns a [Ok((ActorRef, JoinHandle<Result<JoinHandle<()>, SpawnErr>>))] upon successful creation of the
569 /// message queues, so you can begin sending messages. However the associated [JoinHandle] contains the inner
570 /// information around if the actor successfully started or not in it's `pre_start` routine. Returns [Err(SpawnErr)] if
571 /// the actor name is already allocated
572 #[allow(clippy::type_complexity)]
573 pub fn spawn_instant(
574 name: Option<ActorName>,
575 handler: TActor,
576 startup_args: TActor::Arguments,
577 ) -> Result<
578 (
579 ActorRef<TActor::Msg>,
580 JoinHandle<Result<JoinHandle<()>, SpawnErr>>,
581 ),
582 SpawnErr,
583 > {
584 let (actor, ports) = Self::new(name.clone(), handler)?;
585 let actor_ref = actor.actor_ref.clone();
586 let actor_ref2 = actor_ref.clone();
587 let join_op = crate::concurrency::spawn_named(name.as_deref(), async move {
588 let result = actor.start(ports, startup_args, None).await;
589 if result.is_err() {
590 actor_ref2.set_status(ActorStatus::Stopped);
591 }
592 let (_, handle) = result?;
593 Ok(handle)
594 });
595 Ok((actor_ref, join_op))
596 }
597
598 /// Spawn an actor instantly with supervision, not waiting on the actor's `pre_start` routine.
599 /// This is helpful for actors where you want access to the send messages into the actor's
600 /// message queue without waiting on an asynchronous context.
601 ///
602 /// **WARNING** Failures in the pre_start routine need to be waited on in the join handle
603 /// since they will NOT fail the spawn operation in this context. Additionally the supervision
604 /// tree will **NOT** be linked until the `pre_start` completes so there is a chance an actor
605 /// is lost during `pre_start` and not successfully started unless it's specifically handled
606 /// by the caller by awaiting later.
607 ///
608 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
609 /// * `handler` The [Actor] defining the logic for this actor
610 /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
611 /// initial state creation
612 /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
613 ///
614 /// Returns a [Ok((ActorRef, JoinHandle<Result<JoinHandle<()>, SpawnErr>>))] upon successful creation of the
615 /// message queues, so you can begin sending messages. However the associated [JoinHandle] contains the inner
616 /// information around if the actor successfully started or not in it's `pre_start` routine. Returns [Err(SpawnErr)] if
617 /// the actor name is already allocated
618 #[allow(clippy::type_complexity)]
619 pub fn spawn_linked_instant(
620 name: Option<ActorName>,
621 handler: TActor,
622 startup_args: TActor::Arguments,
623 supervisor: ActorCell,
624 ) -> Result<
625 (
626 ActorRef<TActor::Msg>,
627 JoinHandle<Result<JoinHandle<()>, SpawnErr>>,
628 ),
629 SpawnErr,
630 > {
631 let (actor, ports) = Self::new(name.clone(), handler)?;
632 let actor_ref = actor.actor_ref.clone();
633 let actor_ref2 = actor_ref.clone();
634 let join_op = crate::concurrency::spawn_named(name.as_deref(), async move {
635 let result = actor.start(ports, startup_args, Some(supervisor)).await;
636 if result.is_err() {
637 actor_ref2.set_status(ActorStatus::Stopped);
638 }
639 let (_, handle) = result?;
640 Ok(handle)
641 });
642 Ok((actor_ref, join_op))
643 }
644
645 /// Spawn a REMOTE actor with a supervisor, automatically starting the actor. Only for use
646 /// by `ractor_cluster::node::NodeSession`
647 ///
648 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
649 /// * `handler`: The [Actor] defining the logic for this actor
650 /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
651 /// initial state creation
652 /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
653 ///
654 /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
655 /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
656 /// the actor failed to start
657 #[cfg(feature = "cluster")]
658 pub async fn spawn_linked_remote(
659 name: Option<ActorName>,
660 handler: TActor,
661 id: ActorId,
662 startup_args: TActor::Arguments,
663 supervisor: ActorCell,
664 ) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
665 if id.is_local() {
666 Err(SpawnErr::StartupFailed(From::from(
667 "Cannot spawn a remote actor when the identifier is not remote!",
668 )))
669 } else {
670 let (actor_cell, ports) = actor_cell::ActorCell::new_remote::<TActor>(name, id)?;
671 let id = actor_cell.get_id();
672 let name = actor_cell.get_name();
673 let actor_cell2 = actor_cell.clone();
674 let (actor, ports) = (
675 Self {
676 actor_ref: actor_cell.into(),
677 handler,
678 id,
679 name,
680 },
681 ports,
682 );
683 let result = actor.start(ports, startup_args, Some(supervisor)).await;
684 if result.is_err() {
685 actor_cell2.set_status(ActorStatus::Stopped);
686 }
687 result
688 }
689 }
690
691 /// Create a new actor with some handler implementation and initial state
692 ///
693 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
694 /// * `handler` The [Actor] defining the logic for this actor
695 ///
696 /// Returns A tuple [(Actor, ActorPortSet)] to be passed to the `start` function of [Actor]
697 fn new(name: Option<ActorName>, handler: TActor) -> Result<(Self, ActorPortSet), SpawnErr> {
698 let (actor_cell, ports) = actor_cell::ActorCell::new::<TActor>(name)?;
699 let id = actor_cell.get_id();
700 let name = actor_cell.get_name();
701 Ok((
702 Self {
703 actor_ref: actor_cell.into(),
704 handler,
705 id,
706 name,
707 },
708 ports,
709 ))
710 }
711
712 /// Start the actor immediately, optionally linking to a parent actor (supervision tree)
713 ///
714 /// NOTE: This returned [crate::concurrency::JoinHandle] is guaranteed to not panic (unless the runtime is shutting down perhaps).
715 /// An inner join handle is capturing panic results from any part of the inner tasks, so therefore
716 /// we can safely ignore it, or wait on it to block on the actor's progress
717 ///
718 /// * `ports` - The [ActorPortSet] for this actor
719 /// * `supervisor` - The optional [ActorCell] representing the supervisor of this actor
720 ///
721 /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
722 /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
723 /// the actor failed to start
724 #[tracing::instrument(name = "Actor", skip(self, ports, startup_args, supervisor), fields(id = self.id.to_string(), name = self.name))]
725 async fn start(
726 self,
727 ports: ActorPortSet,
728 startup_args: TActor::Arguments,
729 supervisor: Option<ActorCell>,
730 ) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
731 // cannot start an actor more than once
732 if self.actor_ref.get_status() != ActorStatus::Unstarted {
733 return Err(SpawnErr::ActorAlreadyStarted);
734 }
735
736 let Self {
737 handler,
738 actor_ref,
739 id,
740 name,
741 } = self;
742
743 actor_ref.set_status(ActorStatus::Starting);
744
745 // Perform the pre-start routine, crashing immediately if we fail to start
746 let mut state = Self::do_pre_start(actor_ref.clone(), &handler, startup_args)
747 .await?
748 .map_err(SpawnErr::StartupFailed)?;
749
750 // setup supervision
751 if let Some(sup) = &supervisor {
752 actor_ref.link(sup.clone());
753 }
754
755 // Generate the ActorRef which will be returned
756 let myself_ret = actor_ref.clone();
757
758 // run the processing loop, backgrounding the work
759 let handle = crate::concurrency::spawn_named(actor_ref.get_name().as_deref(), async move {
760 let myself = actor_ref.clone();
761 let evt = match Self::processing_loop(ports, &mut state, &handler, actor_ref, id, name)
762 .await
763 {
764 Ok(exit_reason) => SupervisionEvent::ActorTerminated(
765 myself.get_cell(),
766 Some(BoxedState::new(state)),
767 exit_reason,
768 ),
769 Err(actor_err) => match actor_err {
770 ActorErr::Cancelled => SupervisionEvent::ActorTerminated(
771 myself.get_cell(),
772 None,
773 Some("killed".to_string()),
774 ),
775 ActorErr::Failed(msg) => SupervisionEvent::ActorFailed(myself.get_cell(), msg),
776 },
777 };
778
779 // terminate children
780 myself.terminate();
781
782 // notify supervisors of the actor's death
783 myself.notify_supervisor_and_monitors(evt);
784
785 // unlink superisors
786 if let Some(sup) = supervisor {
787 myself.unlink(sup);
788 }
789
790 // set status to stopped
791 myself.set_status(ActorStatus::Stopped);
792 });
793
794 Ok((myself_ret, handle))
795 }
796
797 #[tracing::instrument(name = "Actor", skip(ports, state, handler, myself, _id, _name), fields(id = _id.to_string(), name = _name))]
798 async fn processing_loop(
799 mut ports: ActorPortSet,
800 state: &mut TActor::State,
801 handler: &TActor,
802 myself: ActorRef<TActor::Msg>,
803 _id: ActorId,
804 _name: Option<String>,
805 ) -> Result<Option<String>, ActorErr> {
806 // perform the post-start, with supervision enabled
807 Self::do_post_start(myself.clone(), handler, state)
808 .await?
809 .map_err(ActorErr::Failed)?;
810
811 myself.set_status(ActorStatus::Running);
812 myself.notify_supervisor_and_monitors(SupervisionEvent::ActorStarted(myself.get_cell()));
813
814 let myself_clone = myself.clone();
815
816 let future = async move {
817 // the message processing loop. If we get an exit flag, try and capture the exit reason if there
818 // is one
819 loop {
820 let ActorLoopResult {
821 should_exit,
822 exit_reason,
823 was_killed,
824 } = Self::process_message(myself.clone(), state, handler, &mut ports)
825 .await
826 .map_err(ActorErr::Failed)?;
827 // processing loop exit
828 if should_exit {
829 return Ok((state, exit_reason, was_killed));
830 }
831 }
832 };
833
834 // capture any panics in this future and convert to an ActorErr
835 let loop_done = futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
836 .map_err(|err| ActorErr::Failed(get_panic_string(err)))
837 .await;
838
839 // set status to stopping
840 myself_clone.set_status(ActorStatus::Stopping);
841
842 let (exit_state, exit_reason, was_killed) = loop_done??;
843
844 // if we didn't exit in error mode, call `post_stop`
845 if !was_killed {
846 Self::do_post_stop(myself_clone, handler, exit_state)
847 .await?
848 .map_err(ActorErr::Failed)?;
849 }
850
851 Ok(exit_reason)
852 }
853
854 /// Process a message, returning the "new" state (if changed)
855 /// along with optionally whether we were signaled mid-processing or not
856 ///
857 /// * `myself` - The current [ActorRef]
858 /// * `state` - The current [Actor::State] object
859 /// * `handler` - Pointer to the [Actor] definition
860 /// * `ports` - The mutable [ActorPortSet] which are the message ports for this actor
861 ///
862 /// Returns a tuple of the next [Actor::State] and a flag to denote if the processing
863 /// loop is done
864 async fn process_message(
865 myself: ActorRef<TActor::Msg>,
866 state: &mut TActor::State,
867 handler: &TActor,
868 ports: &mut ActorPortSet,
869 ) -> Result<ActorLoopResult, ActorProcessingErr> {
870 match ports.listen_in_priority().await {
871 Ok(actor_port_message) => match actor_port_message {
872 actor_cell::ActorPortMessage::Signal(signal) => {
873 Ok(ActorLoopResult::signal(Self::handle_signal(myself, signal)))
874 }
875 actor_cell::ActorPortMessage::Stop(stop_message) => {
876 let exit_reason = match stop_message {
877 StopMessage::Stop => {
878 tracing::trace!("Actor {:?} stopped with no reason", myself.get_id());
879 None
880 }
881 StopMessage::Reason(reason) => {
882 tracing::trace!(
883 "Actor {:?} stopped with reason '{reason}'",
884 myself.get_id(),
885 );
886 Some(reason)
887 }
888 };
889 Ok(ActorLoopResult::stop(exit_reason))
890 }
891 actor_cell::ActorPortMessage::Supervision(supervision) => {
892 let future = Self::handle_supervision_message(
893 myself.clone(),
894 state,
895 handler,
896 supervision,
897 );
898 match ports.run_with_signal(future).await {
899 Ok(Ok(())) => Ok(ActorLoopResult::ok()),
900 Ok(Err(internal_err)) => Err(internal_err),
901 Err(signal) => {
902 Ok(ActorLoopResult::signal(Self::handle_signal(myself, signal)))
903 }
904 }
905 }
906 actor_cell::ActorPortMessage::Message(MuxedMessage::Message(msg)) => {
907 let future = Self::handle_message(myself.clone(), state, handler, msg);
908 match ports.run_with_signal(future).await {
909 Ok(Ok(())) => Ok(ActorLoopResult::ok()),
910 Ok(Err(internal_err)) => Err(internal_err),
911 Err(signal) => {
912 Ok(ActorLoopResult::signal(Self::handle_signal(myself, signal)))
913 }
914 }
915 }
916 actor_cell::ActorPortMessage::Message(MuxedMessage::Drain) => {
917 // Drain is a stub marker that the actor should now stop, we've processed
918 // all the messages and we want the actor to die now
919 Ok(ActorLoopResult::stop(Some("Drained".to_string())))
920 }
921 },
922 Err(MessagingErr::ChannelClosed) => {
923 // one of the channels is closed, this means
924 // the receiver was dropped and in this case
925 // we should always die. Therefore we flag
926 // to terminate
927 Ok(ActorLoopResult::signal(Self::handle_signal(
928 myself,
929 Signal::Kill,
930 )))
931 }
932 Err(MessagingErr::InvalidActorType) => {
933 // not possible. Treat like a channel closed
934 Ok(ActorLoopResult::signal(Self::handle_signal(
935 myself,
936 Signal::Kill,
937 )))
938 }
939 Err(MessagingErr::SendErr(_)) => {
940 // not possible. Treat like a channel closed
941 Ok(ActorLoopResult::signal(Self::handle_signal(
942 myself,
943 Signal::Kill,
944 )))
945 }
946 }
947 }
948
949 async fn handle_message(
950 myself: ActorRef<TActor::Msg>,
951 state: &mut TActor::State,
952 handler: &TActor,
953 mut msg: crate::message::BoxedMessage,
954 ) -> Result<(), ActorProcessingErr> {
955 // panic in order to kill the actor
956 #[cfg(feature = "cluster")]
957 {
958 // A `RemoteActor` will handle serialized messages, without decoding them, forwarding them
959 // to the remote system for decoding + handling by the real implementation. Therefore `RemoteActor`s
960 // can be thought of as a "shim" to a real actor on a remote system
961 if !myself.get_id().is_local() {
962 match msg.serialized_msg {
963 Some(serialized_msg) => {
964 return handler
965 .handle_serialized(myself, serialized_msg, state)
966 .await;
967 }
968 None => {
969 return Err(From::from(
970 "`RemoteActor` failed to read `SerializedMessage` from `BoxedMessage`",
971 ));
972 }
973 }
974 }
975 }
976
977 // The current [tracing::Span] is retrieved, boxed, and included in every
978 // `BoxedMessage` during the conversion of this `TActor::Msg`. It is used
979 // to automatically continue tracing span nesting when sending messages to Actors.
980 let current_span_when_message_was_sent = msg.span.take();
981
982 // An error here will bubble up to terminate the actor
983 let typed_msg = TActor::Msg::from_boxed(msg)?;
984
985 if let Some(span) = current_span_when_message_was_sent {
986 handler
987 .handle(myself, typed_msg, state)
988 .instrument(span)
989 .await
990 } else {
991 handler.handle(myself, typed_msg, state).await
992 }
993 }
994
995 fn handle_signal(myself: ActorRef<TActor::Msg>, signal: Signal) -> Option<String> {
996 match &signal {
997 Signal::Kill => {
998 myself.terminate();
999 }
1000 }
1001 Some(signal.to_string())
1002 }
1003
1004 async fn handle_supervision_message(
1005 myself: ActorRef<TActor::Msg>,
1006 state: &mut TActor::State,
1007 handler: &TActor,
1008 message: SupervisionEvent,
1009 ) -> Result<(), ActorProcessingErr> {
1010 handler.handle_supervisor_evt(myself, message, state).await
1011 }
1012
1013 async fn do_pre_start(
1014 myself: ActorRef<TActor::Msg>,
1015 handler: &TActor,
1016 arguments: TActor::Arguments,
1017 ) -> Result<Result<TActor::State, ActorProcessingErr>, SpawnErr> {
1018 let future = handler.pre_start(myself, arguments);
1019 futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
1020 .await
1021 .map_err(|err| SpawnErr::StartupFailed(get_panic_string(err)))
1022 }
1023
1024 async fn do_post_start(
1025 myself: ActorRef<TActor::Msg>,
1026 handler: &TActor,
1027 state: &mut TActor::State,
1028 ) -> Result<Result<(), ActorProcessingErr>, ActorErr> {
1029 let future = handler.post_start(myself, state);
1030 futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
1031 .await
1032 .map_err(|err| ActorErr::Failed(get_panic_string(err)))
1033 }
1034
1035 async fn do_post_stop(
1036 myself: ActorRef<TActor::Msg>,
1037 handler: &TActor,
1038 state: &mut TActor::State,
1039 ) -> Result<Result<(), ActorProcessingErr>, ActorErr> {
1040 let future = handler.post_stop(myself, state);
1041 futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
1042 .await
1043 .map_err(|err| ActorErr::Failed(get_panic_string(err)))
1044 }
1045}