tonari_actor/
lib.rs

1#![warn(clippy::clone_on_ref_ptr, clippy::todo)]
2
3//! This crate aims to provide a minimalist and high-performance actor framework
4//! for Rust with significantly less complexity than other frameworks like
5//! [Actix](https://docs.rs/actix/).
6//!
7//! In this framework, each `Actor` is its own OS-level thread. This makes debugging
8//! noticeably simpler, and is suitably performant when the number of actors
9//! is less than or equal to the number of CPU threads.
10//!
11//! # Example
12//! ```rust
13//! use tonari_actor::{Actor, Context, System};
14//!
15//! struct TestActor {}
16//! impl Actor for TestActor {
17//!     type Message = usize;
18//!     type Error = String;
19//!     type Context = Context<Self::Message>;
20//!
21//!     fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<(), String> {
22//!         println!("message: {}", message);
23//!
24//!         Ok(())
25//!     }
26//! }
27//!
28//! let mut system = System::new("default");
29//!
30//! // will spin up a new thread running this actor
31//! let addr = system.spawn(TestActor {}).unwrap();
32//!
33//! // send messages to actors to spin off work...
34//! addr.send(1usize).unwrap();
35//!
36//! // ask the actors to finish and join the threads.
37//! system.shutdown().unwrap();
38//! ```
39//!
40//! `tonari-actor` also provides some extensions on top of the basic actor functionality:
41//!
42//! # Timing Message Delivery
43//!
44//! On top of [`Context::set_deadline()`] and [`Actor::deadline_passed()`] building blocks there
45//! is a higher level abstraction for delayed and recurring messages in the [`timed`] module.
46//!
47//! # Publisher/subscriber Event System
48//!
49//! For cases where you want a global propagation of "events",
50//! you can implement the [`Event`] trait for your event type and then use [`Context::subscribe()`]
51//! and [`SystemHandle::publish()`] methods.
52//!
53//! Keep in mind that the event system has an additional requirement that the event type needs to be
54//! [`Clone`] and is not intended to be high-throughput. Run the `pub_sub` benchmark to get an idea.
55
56use flume::{select::SelectError, Receiver, RecvError, Selector, Sender};
57use log::*;
58use parking_lot::{Mutex, RwLock};
59use std::{
60    any::{type_name, TypeId},
61    collections::HashMap,
62    fmt,
63    ops::Deref,
64    sync::Arc,
65    thread,
66    time::{Duration, Instant},
67};
68
69pub mod timed;
70
71/// Capacity of the control channel (used to deliver [Control] messages).
72const CONTROL_CHANNEL_CAPACITY: usize = 5;
73
74#[derive(Debug)]
75pub enum ActorError {
76    /// The system has stopped, and a new actor can not be started.
77    SystemStopped { actor_name: &'static str },
78    /// Failed to spawn an actor thread.
79    SpawnFailed { actor_name: &'static str },
80    /// A panic occurred inside an actor thread.
81    ActorPanic,
82}
83
84impl fmt::Display for ActorError {
85    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
86        match self {
87            ActorError::SystemStopped { actor_name } => {
88                write!(f, "the system is not running; the actor {actor_name} can not be started")
89            },
90            ActorError::SpawnFailed { actor_name } => {
91                write!(f, "failed to spawn a thread for the actor {actor_name}")
92            },
93            ActorError::ActorPanic => {
94                write!(f, "panic inside an actor thread; see above for more verbose logs")
95            },
96        }
97    }
98}
99
100impl std::error::Error for ActorError {}
101
102/// Failures that can occur when sending a message to an actor.
103#[derive(Debug, Clone, Copy)]
104pub struct SendError {
105    /// The name of the intended recipient.
106    pub recipient_name: &'static str,
107    /// The priority assigned to the message that could not be sent.
108    pub priority: Priority,
109    /// The reason why sending has failed.
110    pub reason: SendErrorReason,
111}
112
113impl fmt::Display for SendError {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        let recipient_name = self.recipient_name;
116        let priority = self.priority;
117        match self.reason {
118            SendErrorReason::Full => {
119                write!(
120                    f,
121                    "the capacity of {recipient_name}'s {priority:?}-priority channel is full"
122                )
123            },
124            SendErrorReason::Disconnected => DisconnectedError { recipient_name, priority }.fmt(f),
125        }
126    }
127}
128
129impl std::error::Error for SendError {}
130
131/// Error publishing an event.
132#[derive(Debug)]
133pub struct PublishError(pub Vec<SendError>);
134
135impl fmt::Display for PublishError {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        let error_strings: Vec<String> = self.0.iter().map(ToString::to_string).collect();
138        write!(
139            f,
140            "failed to deliver an event to {} subscribers: {}",
141            self.0.len(),
142            error_strings.join(", ")
143        )
144    }
145}
146
147impl std::error::Error for PublishError {}
148
149/// The actor message channel is disconnected.
150#[derive(Debug, Clone, Copy)]
151pub struct DisconnectedError {
152    /// The name of the intended recipient.
153    pub recipient_name: &'static str,
154    /// The priority assigned to the message that could not be sent.
155    pub priority: Priority,
156}
157
158impl fmt::Display for DisconnectedError {
159    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160        write!(f, "the recipient of the message ({}) no longer exists", self.recipient_name)
161    }
162}
163
164impl std::error::Error for DisconnectedError {}
165
166/// Reasons why sending a message to an actor can fail.
167#[derive(Debug, Clone, Copy)]
168pub enum SendErrorReason {
169    /// The channel's capacity is full.
170    Full,
171    /// The recipient of the message no longer exists.
172    Disconnected,
173}
174
175impl<M> From<flume::TrySendError<M>> for SendErrorReason {
176    fn from(orig: flume::TrySendError<M>) -> Self {
177        match orig {
178            flume::TrySendError::Full(_) => Self::Full,
179            flume::TrySendError::Disconnected(_) => Self::Disconnected,
180        }
181    }
182}
183
184/// Systems are responsible for keeping track of their spawned actors, and managing
185/// their lifecycles appropriately.
186///
187/// You may run multiple systems in the same application, each system being responsible
188/// for its own pool of actors.
189#[derive(Default)]
190pub struct System {
191    handle: SystemHandle,
192}
193
194type SystemCallback = Box<dyn Fn() -> Result<(), ActorError> + Send + Sync>;
195type EventCallback = Box<dyn Fn(&dyn std::any::Any) -> Result<(), SendError> + Send + Sync>;
196
197#[derive(Default)]
198pub struct SystemCallbacks {
199    pub preshutdown: Option<SystemCallback>,
200    pub postshutdown: Option<SystemCallback>,
201}
202
203#[derive(Debug, Default, PartialEq)]
204enum SystemState {
205    /// The system is running and able to spawn new actors, or be asked to shut down
206    #[default]
207    Running,
208
209    /// The system is in the process of shutting down, actors cannot be spawned
210    /// or request for the system to shut down again
211    ShuttingDown,
212
213    /// The system has finished shutting down and is no longer running.
214    /// All actors have stopped and their threads have been joined. No actors
215    /// may be spawned at this point.
216    Stopped,
217}
218
219/// A marker trait for types which participate in the publish-subscribe system
220/// of the actor framework.
221pub trait Event: Clone + std::any::Any + Send + Sync {}
222
223#[derive(Default)]
224struct EventSubscribers {
225    events: HashMap<TypeId, Vec<EventCallback>>,
226    /// We cache the last published value of each event type.
227    /// Subscribers can request to receive it upon subscription.
228    /// Use a concurrent map type, benchmarks in #87 have shown that there is a contention on
229    /// updating the cached value otherwise.
230    last_value_cache: dashmap::DashMap<TypeId, Box<dyn std::any::Any + Send + Sync>>,
231}
232
233impl EventSubscribers {
234    fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&mut self, recipient: Recipient<M>) {
235        let subs = self.events.entry(TypeId::of::<E>()).or_default();
236        subs.push(Box::new(move |e| {
237            if let Some(event) = e.downcast_ref::<E>() {
238                let msg = event.clone();
239                recipient.send(msg.into())?;
240            }
241            Ok(())
242        }));
243    }
244}
245
246/// Contains the "metadata" of the system, including information about the registry
247/// of actors currently existing within the system.
248#[derive(Default, Clone)]
249pub struct SystemHandle {
250    name: String,
251    registry: Arc<Mutex<Vec<RegistryEntry>>>,
252    system_state: Arc<RwLock<SystemState>>,
253    callbacks: Arc<SystemCallbacks>,
254
255    event_subscribers: Arc<RwLock<EventSubscribers>>,
256}
257
258/// An execution context for a specific actor. Specifically, this is useful for managing
259/// the lifecycle of itself (through the `myself` field) and other actors via the `SystemHandle`
260/// provided. A time-based deadline for receiving a message can be set using
261/// [`Self::set_deadline()`] and friends.
262pub struct Context<M> {
263    pub system_handle: SystemHandle,
264    pub myself: Recipient<M>,
265    receive_deadline: Option<Instant>,
266}
267
268impl<M> Context<M> {
269    fn new(system_handle: SystemHandle, myself: Recipient<M>) -> Self {
270        Self { system_handle, myself, receive_deadline: None }
271    }
272
273    /// Get the deadline previously set using [`Self::set_deadline()`] or [`Self::set_timeout()`].
274    /// The deadline is cleared just before [`Actor::deadline_passed()`] is called.
275    pub fn deadline(&self) -> &Option<Instant> {
276        &self.receive_deadline
277    }
278
279    /// Schedule a future one-shot call to [`Actor::deadline_passed()`], or cancel the schedule.
280    /// A deadline in the past is considered to expire right in the next iteration (possibly after
281    /// receiving new messages).
282    pub fn set_deadline(&mut self, deadline: Option<Instant>) {
283        self.receive_deadline = deadline;
284    }
285
286    /// Schedule or cancel a call to [`Actor::deadline_passed()`] after `timeout` from now.
287    /// Convenience variant of [`Self::set_deadline()`].
288    pub fn set_timeout(&mut self, timeout: Option<Duration>) {
289        self.set_deadline(timeout.map(|t| Instant::now() + t));
290    }
291
292    /// Subscribe current actor to event of type `E`. This is part of the event system. You don't
293    /// need to call this method to receive direct messages sent using [`Addr`] and [`Recipient`].
294    ///
295    /// Note that subscribing twice to the same event would result in duplicate events -- no
296    /// de-duplication of subscriptions is performed.
297    pub fn subscribe<E: Event + Into<M>>(&self)
298    where
299        M: 'static,
300    {
301        self.system_handle.subscribe_recipient::<M, E>(self.myself.clone());
302    }
303
304    /// Subscribe current actor to event of type `E` and send the last cached event to it.
305    /// This is part of the event system. You don't need to call this method to receive
306    /// direct messages sent using [`Addr`] and [`Recipient`].
307    ///
308    /// Note that subscribing twice to the same event would result in duplicate events -- no
309    /// de-duplication of subscriptions is performed.
310    ///
311    /// This method may fail if it is not possible to send the latest event. In this case it is
312    /// guaranteed that the subscription did not take place. You can safely try again.
313    pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError>
314    where
315        M: 'static,
316    {
317        self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.clone())
318    }
319}
320
321/// Capacity of actor's normal- and high-priority inboxes.
322/// For each inbox type, `None` signifies default capacity of given actor. Converts from [`usize`].
323#[derive(Clone, Copy, Debug, Default)]
324pub struct Capacity {
325    pub normal: Option<usize>,
326    pub high: Option<usize>,
327}
328
329impl Capacity {
330    /// Set capacity of the normal priority channel, and use default for the high priority one.
331    pub fn of_normal_priority(capacity: usize) -> Self {
332        Self { normal: Some(capacity), ..Default::default() }
333    }
334
335    /// Set capacity of the high priority channel, and use default for the normal priority one.
336    pub fn of_high_priority(capacity: usize) -> Self {
337        Self { high: Some(capacity), ..Default::default() }
338    }
339}
340
341/// Set capacity of both normal and high priority channels to the same amount of messages.
342impl From<usize> for Capacity {
343    fn from(capacity: usize) -> Self {
344        Self { normal: Some(capacity), high: Some(capacity) }
345    }
346}
347
348/// A builder for configuring [`Actor`] spawning.
349/// You can specify your own [`Addr`] for the Actor, or let the system create
350/// a new address with either provided or default capacity.
351#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to \
352              configure this builder"]
353pub struct SpawnBuilderWithoutAddress<'a, A: Actor, F: FnOnce() -> A> {
354    system: &'a mut System,
355    factory: F,
356}
357
358impl<'a, A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
359    SpawnBuilderWithoutAddress<'a, A, F>
360{
361    /// Specify an existing [`Addr`] to use with this Actor.
362    pub fn with_addr(self, addr: Addr<A>) -> SpawnBuilderWithAddress<'a, A, F> {
363        SpawnBuilderWithAddress { spawn_builder: self, addr }
364    }
365
366    /// Specify a capacity for the actor's receiving channel. Accepts [`Capacity`] or [`usize`].
367    pub fn with_capacity(self, capacity: impl Into<Capacity>) -> SpawnBuilderWithAddress<'a, A, F> {
368        let addr = Addr::with_capacity(capacity);
369        SpawnBuilderWithAddress { spawn_builder: self, addr }
370    }
371
372    /// Use the default capacity for the actor's receiving channel.
373    pub fn with_default_capacity(self) -> SpawnBuilderWithAddress<'a, A, F> {
374        let addr = Addr::with_capacity(Capacity::default());
375        SpawnBuilderWithAddress { spawn_builder: self, addr }
376    }
377}
378
379#[must_use = "You must call .spawn() or .run_and_block() to run an actor"]
380/// After having configured the builder with an address
381/// it is possible to create and run the actor either on a new thread with `spawn()`
382/// or on the current thread with `run_and_block()`.
383pub struct SpawnBuilderWithAddress<'a, A: Actor, F: FnOnce() -> A> {
384    spawn_builder: SpawnBuilderWithoutAddress<'a, A, F>,
385    addr: Addr<A>,
386}
387
388impl<A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
389    SpawnBuilderWithAddress<'_, A, F>
390{
391    /// Run this Actor on the current calling thread. This is a
392    /// blocking call. This function will return when the Actor
393    /// has stopped.
394    pub fn run_and_block(self) -> Result<(), ActorError> {
395        let factory = self.spawn_builder.factory;
396        self.spawn_builder.system.block_on(factory(), self.addr)
397    }
398}
399
400impl<
401        A: 'static + Actor<Context = Context<<A as Actor>::Message>>,
402        F: FnOnce() -> A + Send + 'static,
403    > SpawnBuilderWithAddress<'_, A, F>
404{
405    /// Spawn this Actor into a new thread managed by the [`System`].
406    pub fn spawn(self) -> Result<Addr<A>, ActorError> {
407        let builder = self.spawn_builder;
408        builder.system.spawn_fn_with_addr(builder.factory, self.addr.clone())?;
409        Ok(self.addr)
410    }
411}
412
413impl System {
414    /// Creates a new System with a given name.
415    pub fn new(name: &str) -> Self {
416        System::with_callbacks(name, Default::default())
417    }
418
419    pub fn with_callbacks(name: &str, callbacks: SystemCallbacks) -> Self {
420        Self {
421            handle: SystemHandle {
422                name: name.to_owned(),
423                callbacks: Arc::new(callbacks),
424                ..SystemHandle::default()
425            },
426        }
427    }
428
429    /// Prepare an actor to be spawned. Returns a [`SpawnBuilderWithoutAddress`]
430    /// which has to be further configured before spawning the actor.
431    pub fn prepare<A>(&mut self, actor: A) -> SpawnBuilderWithoutAddress<A, impl FnOnce() -> A>
432    where
433        A: Actor,
434    {
435        SpawnBuilderWithoutAddress { system: self, factory: move || actor }
436    }
437
438    /// Similar to `prepare`, but an actor factory is passed instead
439    /// of an [`Actor`] itself. This is used when an actor needs to be
440    /// created on its own thread instead of the calling thread.
441    /// Returns a [`SpawnBuilderWithoutAddress`] which has to be further
442    /// configured before spawning the actor.
443    pub fn prepare_fn<A, F>(&mut self, factory: F) -> SpawnBuilderWithoutAddress<A, F>
444    where
445        A: Actor,
446        F: FnOnce() -> A + Send,
447    {
448        SpawnBuilderWithoutAddress { system: self, factory }
449    }
450
451    /// Spawn a normal [`Actor`] in the system, returning its address when successful.
452    /// This address is created by the system and uses a default capacity.
453    /// If you need to customize the address see [`prepare`] or [`prepare_fn`] above.
454    pub fn spawn<A>(&mut self, actor: A) -> Result<Addr<A>, ActorError>
455    where
456        A: Actor<Context = Context<<A as Actor>::Message>> + Send + 'static,
457    {
458        self.prepare(actor).with_default_capacity().spawn()
459    }
460
461    /// Spawn a normal Actor in the system, using a factory that produces an [`Actor`],
462    /// and an address that will be assigned to the Actor.
463    ///
464    /// This method is useful if you need to model circular dependencies between `Actor`s.
465    fn spawn_fn_with_addr<F, A>(&mut self, factory: F, addr: Addr<A>) -> Result<(), ActorError>
466    where
467        F: FnOnce() -> A + Send + 'static,
468        A: Actor<Context = Context<<A as Actor>::Message>> + 'static,
469    {
470        // Hold the lock until the end of the function to prevent the race
471        // condition between spawn and shutdown.
472        let system_state_lock = self.handle.system_state.read();
473        match *system_state_lock {
474            SystemState::ShuttingDown | SystemState::Stopped => {
475                return Err(ActorError::SystemStopped { actor_name: A::name() });
476            },
477            SystemState::Running => {},
478        }
479
480        let system_handle = self.handle.clone();
481        let mut context = Context::new(system_handle.clone(), addr.recipient.clone());
482        let control_addr = addr.control_tx.clone();
483
484        let thread_handle = thread::Builder::new()
485            .name(A::name().into())
486            .spawn(move || {
487                let mut actor = factory();
488
489                if let Err(error) = actor.started(&mut context) {
490                    Self::report_error_shutdown(&system_handle, A::name(), "started", error);
491                    return;
492                }
493                debug!("[{}] started actor: {}", system_handle.name, A::name());
494
495                Self::run_actor_select_loop(actor, addr, &mut context, &system_handle);
496            })
497            .map_err(|_| ActorError::SpawnFailed { actor_name: A::name() })?;
498
499        self.handle
500            .registry
501            .lock()
502            .push(RegistryEntry::BackgroundThread(control_addr, thread_handle));
503
504        Ok(())
505    }
506
507    /// Block the current thread until the system is shutdown.
508    pub fn run(&mut self) -> Result<(), ActorError> {
509        while *self.system_state.read() != SystemState::Stopped {
510            thread::sleep(Duration::from_millis(10));
511        }
512
513        Ok(())
514    }
515
516    /// Takes an actor and its address and runs it on the calling thread. This function
517    /// will exit once the actor has stopped.
518    fn block_on<A>(&mut self, mut actor: A, addr: Addr<A>) -> Result<(), ActorError>
519    where
520        A: Actor<Context = Context<<A as Actor>::Message>>,
521    {
522        // Prevent race condition of spawn and shutdown.
523        if !self.is_running() {
524            return Err(ActorError::SystemStopped { actor_name: A::name() });
525        }
526
527        let system_handle = &self.handle;
528        let mut context = Context::new(system_handle.clone(), addr.recipient.clone());
529
530        self.handle
531            .registry
532            .lock()
533            .push(RegistryEntry::InPlace(addr.control_tx.clone(), thread::current()));
534
535        match actor.started(&mut context) {
536            Ok(()) => {
537                debug!("[{}] started actor: {}", system_handle.name, A::name());
538                Self::run_actor_select_loop(actor, addr, &mut context, system_handle);
539            },
540            Err(error) => Self::report_error_shutdown(system_handle, A::name(), "started", error),
541        }
542
543        // Wait for the system to shutdown before we exit, otherwise the process
544        // would exit before the system is completely shutdown
545        // TODO(bschwind) - We could possibly use a parking_lot::CondVar here
546        //                  for more efficient waiting
547        while *self.system_state.read() != SystemState::Stopped {
548            thread::sleep(Duration::from_millis(10));
549        }
550
551        Ok(())
552    }
553
554    fn run_actor_select_loop<A>(
555        mut actor: A,
556        addr: Addr<A>,
557        context: &mut Context<A::Message>,
558        system_handle: &SystemHandle,
559    ) where
560        A: Actor<Context = Context<<A as Actor>::Message>>,
561    {
562        /// What can be received during one actor event loop.
563        enum Received<M> {
564            Control(Control),
565            Message(M),
566            Timeout,
567        }
568
569        loop {
570            // We don't handle the messages (control and actor's) directly in .recv(), that would
571            // lead to mutably borrowing actor multiple times. Read into intermediate enum instead.
572            // The order of .recv() calls is significant and determines priority.
573            let selector = Selector::new()
574                .recv(&addr.control_rx, |msg| match msg {
575                    Ok(control) => Received::Control(control),
576                    Err(RecvError::Disconnected) => {
577                        panic!("We keep control_tx alive through addr, should not happen.")
578                    },
579                })
580                .recv(&addr.priority_rx, |msg| match msg {
581                    Ok(msg) => Received::Message(msg),
582                    Err(RecvError::Disconnected) => {
583                        panic!("We keep priority_tx alive through addr, should not happen.")
584                    },
585                })
586                .recv(&addr.message_rx, |msg| match msg {
587                    Ok(msg) => Received::Message(msg),
588                    Err(RecvError::Disconnected) => {
589                        panic!("We keep message_tx alive through addr, should not happen.")
590                    },
591                });
592
593            // Wait for some event to happen, with a timeout if set.
594            let received = if let Some(deadline) = context.receive_deadline {
595                match selector.wait_deadline(deadline) {
596                    Ok(received) => received,
597                    Err(SelectError::Timeout) => Received::Timeout,
598                }
599            } else {
600                selector.wait()
601            };
602
603            // Process the event. Returning ends actor loop, the normal operation is to fall through.
604            match received {
605                Received::Control(Control::Stop) => {
606                    if let Err(error) = actor.stopped(context) {
607                        // FWIW this should always hit the "while shutting down" variant.
608                        Self::report_error_shutdown(system_handle, A::name(), "stopped", error);
609                    }
610                    debug!("[{}] stopped actor: {}", system_handle.name, A::name());
611                    return;
612                },
613                Received::Message(msg) => {
614                    trace!("[{}] message received by {}", system_handle.name, A::name());
615                    if let Err(error) = actor.handle(context, msg) {
616                        Self::report_error_shutdown(system_handle, A::name(), "handle", error);
617                        return;
618                    }
619                },
620                Received::Timeout => {
621                    let deadline = context.receive_deadline.take().expect("implied by timeout");
622                    if let Err(error) = actor.deadline_passed(context, deadline) {
623                        Self::report_error_shutdown(
624                            system_handle,
625                            A::name(),
626                            "deadline_passed",
627                            error,
628                        );
629                        return;
630                    }
631                },
632            }
633        }
634    }
635
636    fn report_error_shutdown(
637        system_handle: &SystemHandle,
638        actor_name: &str,
639        method_name: &str,
640        error: impl std::fmt::Display,
641    ) {
642        let is_running = match *system_handle.system_state.read() {
643            SystemState::Running => true,
644            SystemState::ShuttingDown | SystemState::Stopped => false,
645        };
646
647        let system_name = &system_handle.name;
648
649        // Note that the system may have transitioned from running to stopping (but not the other
650        // way around) in the mean time. Slightly imprecise log and an extra no-op call is fine.
651        if is_running {
652            error!(
653                "[{system_name}] {actor_name} {method_name}() error: {error:#}. Shutting down the \
654                 actor system."
655            );
656            let _ = system_handle.shutdown();
657        } else {
658            warn!(
659                "[{system_name}] {actor_name} {method_name}() error while shutting down: \
660                 {error:#}. Ignoring."
661            );
662        }
663    }
664}
665
666impl Drop for System {
667    fn drop(&mut self) {
668        self.shutdown().unwrap();
669    }
670}
671
672impl Deref for System {
673    type Target = SystemHandle;
674
675    fn deref(&self) -> &Self::Target {
676        &self.handle
677    }
678}
679
680impl SystemHandle {
681    /// Stops all actors spawned by this system.
682    pub fn shutdown(&self) -> Result<(), ActorError> {
683        let shutdown_start = Instant::now();
684
685        let current_thread = thread::current();
686        let current_thread_name = current_thread.name().unwrap_or("Unknown thread id");
687
688        // Use an inner scope to prevent holding the lock for the duration of shutdown
689        {
690            let mut system_state_lock = self.system_state.write();
691
692            match *system_state_lock {
693                SystemState::ShuttingDown | SystemState::Stopped => {
694                    debug!(
695                        "[{}] thread {} called system.shutdown() but the system is already \
696                         shutting down or stopped.",
697                        self.name, current_thread_name,
698                    );
699                    return Ok(());
700                },
701                SystemState::Running => {
702                    info!(
703                        "[{}] thread {} shutting down the actor system.",
704                        self.name, current_thread_name,
705                    );
706                    *system_state_lock = SystemState::ShuttingDown;
707                },
708            }
709        }
710
711        if let Some(callback) = self.callbacks.preshutdown.as_ref() {
712            info!("[{}] calling pre-shutdown callback.", self.name);
713            if let Err(err) = callback() {
714                warn!("[{}] pre-shutdown callback failed, reason: {}", self.name, err);
715            }
716        }
717
718        let err_count = {
719            let mut registry = self.registry.lock();
720            debug!("[{}] joining {} actor threads.", self.name, registry.len());
721
722            // Stop actors in the reverse order in which they were spawned.
723            // Send the Stop control message to all actors first so they can
724            // all shut down in parallel, so actors will be in the process of
725            // stopping when we join the threads below.
726            for entry in registry.iter_mut().rev() {
727                let actor_name = entry.name();
728
729                if let Err(e) = entry.control_addr().send(Control::Stop) {
730                    warn!(
731                        "Couldn't send Control::Stop to {actor_name} to shut it down: {e:#}. \
732                         Ignoring and proceeding."
733                    );
734                }
735            }
736
737            registry
738                .drain(..)
739                .enumerate()
740                .rev()
741                .filter_map(|(i, entry)| {
742                    let actor_name = entry.name();
743
744                    match entry {
745                        RegistryEntry::InPlace(_, _) => {
746                            debug!(
747                                "[{}] [{i}] skipping join of an actor running in-place: \
748                                 {actor_name}",
749                                self.name
750                            );
751                            None
752                        },
753                        RegistryEntry::BackgroundThread(_control_addr, thread_handle) => {
754                            if thread_handle.thread().id() == current_thread.id() {
755                                debug!(
756                                    "[{}] [{i}] skipping join of the actor thread currently \
757                                     executing SystemHandle::shutdown(): {actor_name}",
758                                    self.name,
759                                );
760                                return None;
761                            }
762
763                            debug!("[{}] [{}] joining actor thread: {}", self.name, i, actor_name);
764
765                            let join_result = thread_handle.join().map_err(|e| {
766                                error!("a panic inside actor thread {actor_name}: {e:?}")
767                            });
768
769                            debug!("[{}] [{}] joined actor thread:  {}", self.name, i, actor_name);
770                            join_result.err()
771                        },
772                    }
773                })
774                .count()
775        };
776
777        info!("[{}] system finished shutting down in {:?}.", self.name, shutdown_start.elapsed());
778
779        if let Some(callback) = self.callbacks.postshutdown.as_ref() {
780            info!("[{}] calling post-shutdown callback.", self.name);
781            if let Err(err) = callback() {
782                warn!("[{}] post-shutdown callback failed, reason: {}", self.name, err);
783            }
784        }
785
786        *self.system_state.write() = SystemState::Stopped;
787
788        if err_count > 0 {
789            Err(ActorError::ActorPanic)
790        } else {
791            Ok(())
792        }
793    }
794
795    /// Subscribe given `recipient` to events of type `E`. See [`Context::subscribe()`].
796    pub fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&self, recipient: Recipient<M>) {
797        let mut event_subscribers = self.event_subscribers.write();
798        event_subscribers.subscribe_recipient::<M, E>(recipient);
799    }
800
801    /// Subscribe given `recipient` to events of type `E` and send the last cached event to it.
802    /// See [`Context::subscribe_and_receive_latest()`].
803    pub fn subscribe_and_receive_latest<M: 'static, E: Event + Into<M>>(
804        &self,
805        recipient: Recipient<M>,
806    ) -> Result<(), SendError> {
807        let mut event_subscribers = self.event_subscribers.write();
808
809        // Send the last cached value if there is one. The cached event sending and adding ourselves
810        // to the subscriber list needs to be under the same write lock guard to avoid race
811        // conditions between this method and `publish()` (to guarantee exactly-once delivery).
812        if let Some(last_cached_value) = event_subscribers.last_value_cache.get(&TypeId::of::<E>())
813        {
814            if let Some(msg) = last_cached_value.downcast_ref::<E>() {
815                recipient.send(msg.clone().into())?;
816            }
817        }
818
819        event_subscribers.subscribe_recipient::<M, E>(recipient);
820        Ok(())
821    }
822
823    /// Publish an event. All actors that have previously subscribed to the type will receive it.
824    ///
825    /// The event will be also cached. Actors that will subscribe to the type in future may choose
826    /// to receive the last cached event upon subscription.
827    ///
828    /// When sending to some subscriber fails, others are still tried and vec of errors is returned.
829    /// For direct, non-[`Clone`] or high-throughput messages please use [`Addr`] or [`Recipient`].
830    pub fn publish<E: Event>(&self, event: E) -> Result<(), PublishError> {
831        let event_subscribers = self.event_subscribers.read();
832        let type_id = TypeId::of::<E>();
833
834        // This value update must be under the read lock (even if it would be possible to factor
835        // `last_value_cache` outside of the `RwLock`) to prevent race conditions between this and
836        // `subscribe_and_receive_latest()`.
837        event_subscribers.last_value_cache.insert(type_id, Box::new(event.clone()));
838
839        if let Some(subs) = event_subscribers.events.get(&type_id) {
840            let errors: Vec<SendError> = subs
841                .iter()
842                .filter_map(|subscriber_callback| subscriber_callback(&event).err())
843                .collect();
844            if !errors.is_empty() {
845                return Err(PublishError(errors));
846            }
847        }
848
849        Ok(())
850    }
851
852    pub fn is_running(&self) -> bool {
853        *self.system_state.read() == SystemState::Running
854    }
855}
856
857enum RegistryEntry {
858    InPlace(Sender<Control>, thread::Thread),
859    BackgroundThread(Sender<Control>, thread::JoinHandle<()>),
860}
861
862impl RegistryEntry {
863    fn name(&self) -> String {
864        match self {
865            RegistryEntry::InPlace(_, thread_handle) => {
866                thread_handle.name().unwrap_or("unnamed").to_owned()
867            },
868            RegistryEntry::BackgroundThread(_, join_handle) => {
869                join_handle.thread().name().unwrap_or("unnamed").to_owned()
870            },
871        }
872    }
873
874    fn control_addr(&mut self) -> &mut Sender<Control> {
875        match self {
876            RegistryEntry::InPlace(control_addr, _) => control_addr,
877            RegistryEntry::BackgroundThread(control_addr, _) => control_addr,
878        }
879    }
880}
881
882/// The set of available control messages that all actors respond to.
883pub enum Control {
884    /// Stop the actor
885    Stop,
886}
887
888/// The base actor trait.
889pub trait Actor {
890    /// The expected type of a message to be received.
891    // 'static required to create trait object in Addr, https://stackoverflow.com/q/29740488/4345715
892    type Message: Send + 'static;
893    /// The type to return on error in the handle method.
894    type Error: std::fmt::Display;
895    /// What kind of context this actor accepts. Usually [`Context<Self::Message>`].
896    type Context;
897
898    /// Default capacity of actor's normal-priority inbox unless overridden by `.with_capacity()`.
899    const DEFAULT_CAPACITY_NORMAL: usize = 5;
900    /// Default capacity of actor's high-priority inbox unless overridden by `.with_capacity()`.
901    const DEFAULT_CAPACITY_HIGH: usize = 5;
902
903    /// The name of the Actor. Used by `tonari-actor` for logging/debugging.
904    /// Default implementation uses [`type_name()`].
905    fn name() -> &'static str {
906        type_name::<Self>()
907    }
908
909    /// Determine priority of a `message` before it is sent to this actor.
910    /// Default implementation returns [`Priority::Normal`].
911    fn priority(_message: &Self::Message) -> Priority {
912        Priority::Normal
913    }
914
915    /// An optional callback when the Actor has been started.
916    fn started(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
917        Ok(())
918    }
919
920    /// The primary function of this trait, allowing an actor to handle incoming messages of a certain type.
921    fn handle(
922        &mut self,
923        context: &mut Self::Context,
924        message: Self::Message,
925    ) -> Result<(), Self::Error>;
926
927    /// An optional callback when the Actor has been stopped.
928    fn stopped(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
929        Ok(())
930    }
931
932    /// An optional callback when a deadline has passed.
933    ///
934    /// The deadline has to be set via [`Context::set_deadline()`] or [`Context::set_timeout()`]
935    /// first. The instant to which the deadline was originally set is passed via the `deadline`
936    /// argument; it is normally close to [`Instant::now()`], but can be later if the actor was busy.
937    ///
938    /// # Periodic tick example
939    /// ```
940    /// # use {std::{cmp::max, time::{Duration, Instant}}, tonari_actor::{Actor, Context}};
941    /// # struct TickingActor;
942    /// impl Actor for TickingActor {
943    /// #    type Context = Context<Self::Message>;
944    /// #    type Error = String;
945    /// #    type Message = ();
946    /// #    fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> { Ok(()) }
947    ///     // ...
948    ///
949    ///     fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<(), String> {
950    ///         // do_periodic_housekeeping();
951    ///
952    ///         // A: Schedule one second from now (even if delayed); drifting tick.
953    ///         context.set_timeout(Some(Duration::from_secs(1)));
954    ///
955    ///         // B: Schedule one second from deadline; non-drifting tick.
956    ///         context.set_deadline(Some(deadline + Duration::from_secs(1)));
957    ///
958    ///         // C: Schedule one second from deadline, but don't fire multiple times if delayed.
959    ///         context.set_deadline(Some(max(deadline + Duration::from_secs(1), Instant::now())));
960    ///
961    ///         Ok(())
962    ///     }
963    /// }
964    /// ```
965    fn deadline_passed(
966        &mut self,
967        _context: &mut Self::Context,
968        _deadline: Instant,
969    ) -> Result<(), Self::Error> {
970        Ok(())
971    }
972}
973
974pub struct Addr<A: Actor + ?Sized> {
975    recipient: Recipient<A::Message>,
976    priority_rx: Receiver<A::Message>,
977    message_rx: Receiver<A::Message>,
978    control_rx: Receiver<Control>,
979}
980
981impl<A: Actor> Default for Addr<A> {
982    fn default() -> Self {
983        Self::with_capacity(Capacity::default())
984    }
985}
986
987impl<A: Actor> Clone for Addr<A> {
988    fn clone(&self) -> Self {
989        Self {
990            recipient: self.recipient.clone(),
991            priority_rx: self.priority_rx.clone(),
992            message_rx: self.message_rx.clone(),
993            control_rx: self.control_rx.clone(),
994        }
995    }
996}
997
998impl<A, M> Deref for Addr<A>
999where
1000    A: Actor<Message = M>,
1001{
1002    type Target = Recipient<M>;
1003
1004    fn deref(&self) -> &Self::Target {
1005        &self.recipient
1006    }
1007}
1008
1009impl<A: Actor> Addr<A> {
1010    /// Create address for an actor, specifying its inbox size. Accepts [`Capacity`] or [`usize`].
1011    pub fn with_capacity(capacity: impl Into<Capacity>) -> Self {
1012        let capacity: Capacity = capacity.into();
1013        let prio_capacity = capacity.high.unwrap_or(A::DEFAULT_CAPACITY_HIGH);
1014        let normal_capacity = capacity.normal.unwrap_or(A::DEFAULT_CAPACITY_NORMAL);
1015
1016        let (priority_tx, priority_rx) = flume::bounded::<A::Message>(prio_capacity);
1017        let (message_tx, message_rx) = flume::bounded::<A::Message>(normal_capacity);
1018        let (control_tx, control_rx) = flume::bounded(CONTROL_CHANNEL_CAPACITY);
1019
1020        let name = A::name();
1021        let message_tx = Arc::new(MessageSender {
1022            high: priority_tx,
1023            normal: message_tx,
1024            get_priority: A::priority,
1025            name,
1026        });
1027        Self {
1028            recipient: Recipient { message_tx, control_tx },
1029            priority_rx,
1030            message_rx,
1031            control_rx,
1032        }
1033    }
1034}
1035
1036/// Urgency of a given message. All high-priority messages are delivered before normal priority.
1037#[derive(Clone, Copy, Debug)]
1038pub enum Priority {
1039    Normal,
1040    High,
1041}
1042
1043/// Similar to [`Addr`], but rather than pointing to a specific actor,
1044/// it is typed for any actor that handles a given message-response type.
1045pub struct Recipient<M> {
1046    message_tx: Arc<dyn SenderTrait<M>>,
1047    control_tx: Sender<Control>,
1048}
1049
1050// #[derive(Clone)] adds Clone bound to M, which is not necessary.
1051// https://github.com/rust-lang/rust/issues/26925
1052impl<M> Clone for Recipient<M> {
1053    fn clone(&self) -> Self {
1054        Self { message_tx: Arc::clone(&self.message_tx), control_tx: self.control_tx.clone() }
1055    }
1056}
1057
1058impl<M> Recipient<M> {
1059    /// Send a message to an actor. Returns [`SendError`] if the channel is full; does not block.
1060    /// See [`SendResultExt`] trait for convenient handling of errors.
1061    pub fn send(&self, message: M) -> Result<(), SendError> {
1062        self.message_tx.try_send(message)
1063    }
1064}
1065
1066impl<M: 'static> Recipient<M> {
1067    /// Convert a [`Recipient<M>`] (or [`Addr<A>`] through [`Deref`], where `A::Message = M`) into
1068    /// [`Recipient<N>`], where message `N` can be converted into `M`.
1069    ///
1070    /// In case of converting from [`Addr`], this erases the type of the actor and only preserves
1071    /// type of the message, allowing you to make actors more independent of each other.
1072    pub fn recipient<N: Into<M>>(&self) -> Recipient<N> {
1073        Recipient {
1074            // Each level of boxing adds one .into() call, so box here to convert A::Message to M.
1075            message_tx: Arc::new(Arc::clone(&self.message_tx)),
1076            control_tx: self.control_tx.clone(),
1077        }
1078    }
1079}
1080
1081pub trait SendResultExt {
1082    /// Don't return an `Err` when the recipient is at full capacity, run `func(receiver_name)`
1083    /// in such a case instead. `receiver_name` is the name of the intended recipient.
1084    fn on_full<F: FnOnce(&'static str, Priority)>(self, func: F) -> Result<(), DisconnectedError>;
1085
1086    /// Don't return an `Err` when the recipient is at full capacity.
1087    fn ignore_on_full(self) -> Result<(), DisconnectedError>;
1088}
1089
1090impl SendResultExt for Result<(), SendError> {
1091    fn on_full<F: FnOnce(&'static str, Priority)>(
1092        self,
1093        callback: F,
1094    ) -> Result<(), DisconnectedError> {
1095        self.or_else(|e| match e {
1096            SendError { recipient_name, priority, reason: SendErrorReason::Full } => {
1097                callback(recipient_name, priority);
1098                Ok(())
1099            },
1100            SendError { recipient_name, priority, reason: SendErrorReason::Disconnected } => {
1101                Err(DisconnectedError { recipient_name, priority })
1102            },
1103        })
1104    }
1105
1106    fn ignore_on_full(self) -> Result<(), DisconnectedError> {
1107        self.on_full(|_, _| ())
1108    }
1109}
1110
1111/// Internal struct to encapsulate ability to send message with priority to an actor.
1112struct MessageSender<M> {
1113    high: Sender<M>,
1114    normal: Sender<M>,
1115    get_priority: fn(&M) -> Priority,
1116    /// Name of the actor we're sending to.
1117    name: &'static str,
1118}
1119
1120/// Internal trait to generalize over [`Sender`].
1121trait SenderTrait<M>: Send + Sync {
1122    fn try_send(&self, message: M) -> Result<(), SendError>;
1123}
1124
1125/// [`SenderTrait`] is implemented for our [`MessageSender`].
1126impl<M: Send> SenderTrait<M> for MessageSender<M> {
1127    fn try_send(&self, message: M) -> Result<(), SendError> {
1128        let priority = (self.get_priority)(&message);
1129        let sender = match priority {
1130            Priority::Normal => &self.normal,
1131            Priority::High => &self.high,
1132        };
1133        sender.try_send(message).map_err(|e| SendError {
1134            reason: e.into(),
1135            recipient_name: self.name,
1136            priority,
1137        })
1138    }
1139}
1140
1141/// [`SenderTrait`] is also implemented for boxed version of itself, including M -> N conversion.
1142impl<M: Into<N>, N> SenderTrait<M> for Arc<dyn SenderTrait<N>> {
1143    fn try_send(&self, message: M) -> Result<(), SendError> {
1144        self.deref().try_send(message.into())
1145    }
1146}
1147
1148#[cfg(test)]
1149mod tests {
1150    use std::{
1151        rc::Rc,
1152        sync::atomic::{AtomicU32, Ordering},
1153        time::Duration,
1154    };
1155
1156    use super::*;
1157
1158    struct TestActor;
1159    impl Actor for TestActor {
1160        type Context = Context<Self::Message>;
1161        type Error = String;
1162        type Message = usize;
1163
1164        fn name() -> &'static str {
1165            // The name is asserted against in tests, use a short stable one rather than the default
1166            // implementation (`type_name()`)`, which is documented not to be stable and currently
1167            // produces `tonari_actor::tests::TestActor`.
1168            "TestActor"
1169        }
1170
1171        fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), String> {
1172            println!("message: {message}");
1173            Ok(())
1174        }
1175
1176        fn started(&mut self, _: &mut Self::Context) -> Result<(), String> {
1177            println!("started");
1178            Ok(())
1179        }
1180
1181        fn stopped(&mut self, _: &mut Self::Context) -> Result<(), String> {
1182            println!("stopped");
1183            Ok(())
1184        }
1185    }
1186
1187    #[test]
1188    fn it_works() {
1189        let mut system = System::new("hi");
1190        let address = system.spawn(TestActor).unwrap();
1191        let _ = system.spawn(TestActor).unwrap();
1192        let _ = system.spawn(TestActor).unwrap();
1193        let _ = system.spawn(TestActor).unwrap();
1194        let _ = system.spawn(TestActor).unwrap();
1195        address.send(1337usize).unwrap();
1196        address.send(666usize).unwrap();
1197        address.send(1usize).unwrap();
1198        thread::sleep(Duration::from_millis(100));
1199
1200        system.shutdown().unwrap();
1201        thread::sleep(Duration::from_millis(100));
1202    }
1203
1204    #[test]
1205    fn test_ignore_on_full() {
1206        let mut system = System::new("hi");
1207        let address = system.prepare(TestActor).with_capacity(1).spawn().unwrap();
1208        address.send(1337usize).unwrap();
1209        assert!(address.send(666usize).is_err());
1210        address.send(666usize).ignore_on_full().unwrap();
1211
1212        thread::sleep(Duration::from_millis(100));
1213
1214        system.shutdown().unwrap();
1215        thread::sleep(Duration::from_millis(100));
1216    }
1217
1218    #[test]
1219    fn send_constraints() {
1220        #[derive(Default)]
1221        struct LocalActor {
1222            _ensure_not_send_not_sync: Rc<()>,
1223        }
1224        impl Actor for LocalActor {
1225            type Context = Context<Self::Message>;
1226            type Error = String;
1227            type Message = ();
1228
1229            fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> {
1230                Ok(())
1231            }
1232
1233            /// We just need this test to compile, not run.
1234            fn started(&mut self, ctx: &mut Self::Context) -> Result<(), String> {
1235                ctx.system_handle.shutdown().map_err(|e| e.to_string())
1236            }
1237        }
1238
1239        let mut system = System::new("main");
1240
1241        // Allowable, as the struct will be created on the new thread.
1242        let _ = system.prepare_fn(LocalActor::default).with_default_capacity().spawn().unwrap();
1243
1244        // Allowable, as the struct will be run on the current thread.
1245        system.prepare(LocalActor::default()).with_default_capacity().run_and_block().unwrap();
1246
1247        system.shutdown().unwrap();
1248    }
1249
1250    #[test]
1251    fn timeouts() {
1252        struct TimeoutActor {
1253            handle_count: Arc<AtomicU32>,
1254            timeout_count: Arc<AtomicU32>,
1255        }
1256
1257        impl Actor for TimeoutActor {
1258            type Context = Context<Self::Message>;
1259            type Error = String;
1260            type Message = Option<Instant>;
1261
1262            fn handle(
1263                &mut self,
1264                ctx: &mut Self::Context,
1265                msg: Self::Message,
1266            ) -> Result<(), String> {
1267                self.handle_count.fetch_add(1, Ordering::SeqCst);
1268                if msg.is_some() {
1269                    ctx.receive_deadline = msg;
1270                }
1271                Ok(())
1272            }
1273
1274            fn deadline_passed(&mut self, _: &mut Self::Context, _: Instant) -> Result<(), String> {
1275                self.timeout_count.fetch_add(1, Ordering::SeqCst);
1276                Ok(())
1277            }
1278        }
1279
1280        let mut system = System::new("timeouts");
1281        let (handle_count, timeout_count) = (Default::default(), Default::default());
1282        let actor = TimeoutActor {
1283            handle_count: Arc::clone(&handle_count),
1284            timeout_count: Arc::clone(&timeout_count),
1285        };
1286        let addr = system.spawn(actor).unwrap();
1287
1288        // Test that setting deadline to past triggers the deadline immediately.
1289        addr.send(Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())).unwrap();
1290        thread::sleep(Duration::from_millis(10));
1291        assert_eq!(handle_count.load(Ordering::SeqCst), 1);
1292        assert_eq!(timeout_count.load(Ordering::SeqCst), 1);
1293
1294        // Test the normal case.
1295        addr.send(Some(Instant::now() + Duration::from_millis(20))).unwrap();
1296        thread::sleep(Duration::from_millis(10));
1297        assert_eq!(handle_count.load(Ordering::SeqCst), 2);
1298        assert_eq!(timeout_count.load(Ordering::SeqCst), 1);
1299        thread::sleep(Duration::from_millis(20));
1300        assert_eq!(handle_count.load(Ordering::SeqCst), 2);
1301        assert_eq!(timeout_count.load(Ordering::SeqCst), 2);
1302
1303        // Test that receiving a message doesn't reset the deadline.
1304        addr.send(Some(Instant::now() + Duration::from_millis(40))).unwrap();
1305        thread::sleep(Duration::from_millis(20));
1306        assert_eq!(handle_count.load(Ordering::SeqCst), 3);
1307        assert_eq!(timeout_count.load(Ordering::SeqCst), 2);
1308        addr.send(None).unwrap();
1309        thread::sleep(Duration::from_millis(30));
1310        assert_eq!(handle_count.load(Ordering::SeqCst), 4);
1311        assert_eq!(timeout_count.load(Ordering::SeqCst), 3);
1312
1313        system.shutdown().unwrap();
1314    }
1315
1316    #[test]
1317    fn errors() {
1318        let mut system = System::new("hi");
1319        let low_capacity_actor: Addr<TestActor> = Addr::with_capacity(1);
1320        // Convert to `Recipient` so that we don't keep the receiving side of `Addr` alive.
1321        let stopped_actor = system.spawn(TestActor).unwrap().recipient();
1322
1323        low_capacity_actor.send(9).expect("one message should fit");
1324        let error = low_capacity_actor.send(123).unwrap_err();
1325        assert_eq!(
1326            error.to_string(),
1327            "the capacity of TestActor's Normal-priority channel is full"
1328        );
1329        assert_eq!(
1330            format!("{error:?}"),
1331            r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Full }"#
1332        );
1333
1334        system.shutdown().unwrap();
1335
1336        let error = stopped_actor.send(456usize).unwrap_err();
1337        assert_eq!(error.to_string(), "the recipient of the message (TestActor) no longer exists");
1338        assert_eq!(
1339            format!("{error:?}"),
1340            r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Disconnected }"#
1341        );
1342    }
1343
1344    #[test]
1345    fn message_priorities() {
1346        env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace")).init();
1347
1348        struct PriorityActor {
1349            received: Arc<Mutex<Vec<usize>>>,
1350        }
1351
1352        impl Actor for PriorityActor {
1353            type Context = Context<Self::Message>;
1354            type Error = String;
1355            type Message = usize;
1356
1357            fn handle(
1358                &mut self,
1359                context: &mut Self::Context,
1360                message: Self::Message,
1361            ) -> Result<(), Self::Error> {
1362                let mut received = self.received.lock();
1363                received.push(message);
1364                if received.len() >= 20 {
1365                    context.system_handle.shutdown().unwrap();
1366                }
1367                Ok(())
1368            }
1369
1370            fn priority(message: &Self::Message) -> Priority {
1371                if *message >= 10 {
1372                    Priority::High
1373                } else {
1374                    Priority::Normal
1375                }
1376            }
1377        }
1378
1379        let addr = Addr::with_capacity(10);
1380        let received = Arc::new(Mutex::new(Vec::<usize>::new()));
1381
1382        // Send messages before even actor starts.
1383        for message in 0..20usize {
1384            addr.send(message).unwrap();
1385        }
1386
1387        let mut system = System::new("priorities");
1388        system
1389            .prepare(PriorityActor { received: Arc::clone(&received) })
1390            .with_addr(addr)
1391            .run_and_block()
1392            .unwrap();
1393
1394        assert_eq!(
1395            *received.lock(),
1396            [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1397        );
1398    }
1399
1400    impl Event for () {}
1401
1402    #[test]
1403    fn last_cached_event() {
1404        struct Subscriber;
1405        impl Actor for Subscriber {
1406            type Context = Context<Self::Message>;
1407            type Error = String;
1408            type Message = ();
1409
1410            fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
1411                context.subscribe_and_receive_latest::<Self::Message>().map_err(|e| e.to_string())
1412            }
1413
1414            fn handle(
1415                &mut self,
1416                context: &mut Self::Context,
1417                _: Self::Message,
1418            ) -> Result<(), Self::Error> {
1419                println!("Event received!");
1420                context.system_handle.shutdown().unwrap();
1421                Ok(())
1422            }
1423        }
1424
1425        let mut system = System::new("last cached event");
1426        system.publish(()).expect("can publish event");
1427
1428        // This test will block indefinitely if the event isn't delivered.
1429        system
1430            .prepare(Subscriber)
1431            .with_addr(Addr::with_capacity(1))
1432            .run_and_block()
1433            .expect("actor finishes successfully");
1434    }
1435}