Skip to main content

tonari_actor/
lib.rs

1#![warn(clippy::clone_on_ref_ptr, clippy::todo)]
2
3//! This crate aims to provide a minimalist and high-performance actor framework
4//! for Rust with significantly less complexity than other frameworks like
5//! [Actix](https://docs.rs/actix/).
6//!
7//! In this framework, each `Actor` is its own OS-level thread. This makes debugging
8//! noticeably simpler, and is suitably performant when the number of actors
9//! is less than or equal to the number of CPU threads.
10//!
11//! # Example
12//! ```rust
13//! use tonari_actor::{Actor, Context, System};
14//!
15//! struct TestActor {}
16//! impl Actor for TestActor {
17//!     type Message = usize;
18//!     type Error = String;
19//!     type Context = Context<Self::Message>;
20//!
21//!     fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<(), String> {
22//!         println!("message: {}", message);
23//!
24//!         Ok(())
25//!     }
26//! }
27//!
28//! let mut system = System::new("default");
29//!
30//! // will spin up a new thread running this actor
31//! let addr = system.spawn(TestActor {}).unwrap();
32//!
33//! // send messages to actors to spin off work...
34//! addr.send(1usize).unwrap();
35//!
36//! // ask the actors to finish and join the threads.
37//! system.shutdown().unwrap();
38//! ```
39//!
40//! `tonari-actor` also provides some extensions on top of the basic actor functionality:
41//!
42//! # Timing Message Delivery
43//!
44//! On top of [`Context::set_deadline()`] and [`Actor::deadline_passed()`] building blocks there
45//! is a higher level abstraction for delayed and recurring messages in the [`timed`] module.
46//!
47//! # Publisher/subscriber Event System
48//!
49//! For cases where you want a global propagation of "events",
50//! you can implement the [`Event`] trait for your event type and then use
51//! [`BareContext::subscribe()`] (also accessible on [`Context`] through [`Deref`])
52//! and [`SystemHandle::publish()`] methods.
53//!
54//! Keep in mind that the event system has an additional requirement that the event type needs to be
55//! [`Clone`] and is not intended to be high-throughput. Run the `pub_sub` benchmark to get an idea.
56//!
57//! # Async Actors
58//!
59//! Support of `async` actors exists under the `async` feature flag. See the documentation of the
60//! [`async`] module for more info.
61
62use flume::{Receiver, RecvError, Selector, Sender, select::SelectError};
63use log::*;
64use parking_lot::{Mutex, RwLock};
65use std::{
66    any::{TypeId, type_name},
67    collections::HashMap,
68    fmt,
69    ops::Deref,
70    sync::Arc,
71    thread,
72    time::{Duration, Instant},
73};
74
75#[cfg(feature = "async")]
76pub mod r#async;
77pub mod timed;
78
79// Reexport the often-imported trait to top level, also to let downstream avoid typing r#async.
80#[cfg(feature = "async")]
81pub use r#async::AsyncActor;
82
83/// Capacity of the control channel (used to deliver [Control] messages).
84const CONTROL_CHANNEL_CAPACITY: usize = 5;
85
86#[derive(Debug)]
87pub enum ActorError {
88    /// The system has stopped, and a new actor can not be started.
89    SystemStopped { actor_name: &'static str },
90    /// Failed to spawn an actor thread.
91    SpawnFailed { actor_name: &'static str },
92    /// A panic occurred inside an actor thread.
93    ActorPanic,
94}
95
96impl fmt::Display for ActorError {
97    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98        match self {
99            ActorError::SystemStopped { actor_name } => {
100                write!(f, "the system is not running; the actor {actor_name} can not be started")
101            },
102            ActorError::SpawnFailed { actor_name } => {
103                write!(f, "failed to spawn a thread for the actor {actor_name}")
104            },
105            ActorError::ActorPanic => {
106                write!(f, "panic inside an actor thread; see above for more verbose logs")
107            },
108        }
109    }
110}
111
112impl std::error::Error for ActorError {}
113
114/// Failures that can occur when sending a message to an actor.
115#[derive(Debug, Clone, Copy)]
116pub struct SendError {
117    /// The name of the intended recipient.
118    pub recipient_name: &'static str,
119    /// The priority assigned to the message that could not be sent.
120    pub priority: Priority,
121    /// The reason why sending has failed.
122    pub reason: SendErrorReason,
123}
124
125impl fmt::Display for SendError {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        let recipient_name = self.recipient_name;
128        let priority = self.priority;
129        match self.reason {
130            SendErrorReason::Full => {
131                write!(
132                    f,
133                    "the capacity of {recipient_name}'s {priority:?}-priority channel is full"
134                )
135            },
136            SendErrorReason::Disconnected => DisconnectedError { recipient_name, priority }.fmt(f),
137        }
138    }
139}
140
141impl std::error::Error for SendError {}
142
143/// Error publishing an event.
144#[derive(Debug)]
145pub struct PublishError(pub Vec<SendError>);
146
147impl fmt::Display for PublishError {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        let error_strings: Vec<String> = self.0.iter().map(ToString::to_string).collect();
150        write!(
151            f,
152            "failed to deliver an event to {} subscribers: {}",
153            self.0.len(),
154            error_strings.join(", ")
155        )
156    }
157}
158
159impl std::error::Error for PublishError {}
160
161/// The actor message channel is disconnected.
162#[derive(Debug, Clone, Copy)]
163pub struct DisconnectedError {
164    /// The name of the intended recipient.
165    pub recipient_name: &'static str,
166    /// The priority assigned to the message that could not be sent.
167    pub priority: Priority,
168}
169
170impl fmt::Display for DisconnectedError {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        write!(f, "the recipient of the message ({}) no longer exists", self.recipient_name)
173    }
174}
175
176impl std::error::Error for DisconnectedError {}
177
178/// Reasons why sending a message to an actor can fail.
179#[derive(Debug, Clone, Copy)]
180pub enum SendErrorReason {
181    /// The channel's capacity is full.
182    Full,
183    /// The recipient of the message no longer exists.
184    Disconnected,
185}
186
187impl<M> From<flume::TrySendError<M>> for SendErrorReason {
188    fn from(orig: flume::TrySendError<M>) -> Self {
189        match orig {
190            flume::TrySendError::Full(_) => Self::Full,
191            flume::TrySendError::Disconnected(_) => Self::Disconnected,
192        }
193    }
194}
195
196/// Systems are responsible for keeping track of their spawned actors, and managing
197/// their lifecycles appropriately.
198///
199/// You may run multiple systems in the same application, each system being responsible
200/// for its own pool of actors.
201#[derive(Default)]
202pub struct System {
203    handle: SystemHandle,
204}
205
206type SystemCallback = Box<dyn Fn() -> Result<(), ActorError> + Send + Sync>;
207type EventCallback = Box<dyn Fn(&dyn std::any::Any) -> Result<(), SendError> + Send + Sync>;
208
209#[derive(Default)]
210pub struct SystemCallbacks {
211    pub preshutdown: Option<SystemCallback>,
212    pub postshutdown: Option<SystemCallback>,
213}
214
215#[derive(Debug, Default, PartialEq)]
216enum SystemState {
217    /// The system is running and able to spawn new actors, or be asked to shut down
218    #[default]
219    Running,
220
221    /// The system is in the process of shutting down, actors cannot be spawned
222    /// or request for the system to shut down again
223    ShuttingDown,
224
225    /// The system has finished shutting down and is no longer running.
226    /// All actors have stopped and their threads have been joined. No actors
227    /// may be spawned at this point.
228    Stopped,
229}
230
231impl SystemState {
232    /// Return `true` if the actor system is running (not shutting down, not stopped).
233    fn is_running(&self) -> bool {
234        match self {
235            SystemState::Running => true,
236            SystemState::ShuttingDown | SystemState::Stopped => false,
237        }
238    }
239}
240
241/// A marker trait for types which participate in the publish-subscribe system
242/// of the actor framework.
243pub trait Event: Clone + std::any::Any + Send + Sync {}
244
245#[derive(Default)]
246struct EventSubscribers {
247    events: HashMap<TypeId, Vec<EventCallback>>,
248    /// We cache the last published value of each event type.
249    /// Subscribers can request to receive it upon subscription.
250    /// Use a concurrent map type, benchmarks in #87 have shown that there is a contention on
251    /// updating the cached value otherwise.
252    last_value_cache: dashmap::DashMap<TypeId, Box<dyn std::any::Any + Send + Sync>>,
253}
254
255impl EventSubscribers {
256    fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&mut self, recipient: Recipient<M>) {
257        let subs = self.events.entry(TypeId::of::<E>()).or_default();
258        subs.push(Box::new(move |e| {
259            if let Some(event) = e.downcast_ref::<E>() {
260                let msg = event.clone();
261                recipient.send(msg.into())?;
262            }
263            Ok(())
264        }));
265    }
266}
267
268/// Contains the "metadata" of the system, including information about the registry
269/// of actors currently existing within the system.
270#[derive(Default, Clone)]
271pub struct SystemHandle {
272    name: String,
273    registry: Arc<Mutex<Vec<RegistryEntry>>>,
274    system_state: Arc<RwLock<SystemState>>,
275    callbacks: Arc<SystemCallbacks>,
276
277    event_subscribers: Arc<RwLock<EventSubscribers>>,
278}
279
280/// An execution context for a specific actor. Specifically, this is useful for managing
281/// the lifecycle of itself (through the `myself` field) and other actors via the [`SystemHandle`]
282/// provided using a dereference to [`BareContext`].
283///
284/// A time-based deadline for receiving a message can be set using [`Self::set_deadline()`] and
285/// friends.
286pub struct Context<M> {
287    bare: BareContext<M>,
288    receive_deadline: Option<Instant>,
289}
290
291impl<M> Context<M> {
292    fn new(system_handle: SystemHandle, myself: Recipient<M>) -> Self {
293        Self { bare: BareContext { system_handle, myself }, receive_deadline: None }
294    }
295
296    /// Get the deadline previously set using [`Self::set_deadline()`] or [`Self::set_timeout()`].
297    /// The deadline is cleared just before [`Actor::deadline_passed()`] is called.
298    pub fn deadline(&self) -> &Option<Instant> {
299        &self.receive_deadline
300    }
301
302    /// Schedule a future one-shot call to [`Actor::deadline_passed()`], or cancel the schedule.
303    /// A deadline in the past is considered to expire right in the next iteration (possibly after
304    /// receiving new messages).
305    pub fn set_deadline(&mut self, deadline: Option<Instant>) {
306        self.receive_deadline = deadline;
307    }
308
309    /// Schedule or cancel a call to [`Actor::deadline_passed()`] after `timeout` from now.
310    /// Convenience variant of [`Self::set_deadline()`].
311    pub fn set_timeout(&mut self, timeout: Option<Duration>) {
312        self.set_deadline(timeout.map(|t| Instant::now() + t));
313    }
314}
315
316impl<M> Deref for Context<M> {
317    type Target = BareContext<M>;
318
319    fn deref(&self) -> &BareContext<M> {
320        &self.bare
321    }
322}
323
324/// A [`Context`] without the [`Context::set_deadline()`] functionality. Used by [`timed`] and
325/// [`async`] actors. [`Context`] dereferences to this bare variant for `system_handle` and `myself`
326/// fields.
327pub struct BareContext<M> {
328    pub system_handle: SystemHandle,
329    pub myself: Recipient<M>,
330}
331
332impl<M: 'static> BareContext<M> {
333    /// Subscribe current actor to event of type `E`. This is part of the event system. You don't
334    /// need to call this method to receive direct messages sent using [`Addr`] and [`Recipient`].
335    ///
336    /// Note that subscribing twice to the same event would result in duplicate events -- no
337    /// de-duplication of subscriptions is performed.
338    pub fn subscribe<E: Event + Into<M>>(&self) {
339        self.system_handle.subscribe_recipient::<M, E>(self.myself.clone());
340    }
341
342    /// Subscribe current actor to event of type `E` and send the last cached event to it.
343    /// This is part of the event system. You don't need to call this method to receive
344    /// direct messages sent using [`Addr`] and [`Recipient`].
345    ///
346    /// Note that subscribing twice to the same event would result in duplicate events -- no
347    /// de-duplication of subscriptions is performed.
348    ///
349    /// This method may fail if it is not possible to send the latest event. In this case it is
350    /// guaranteed that the subscription did not take place. You can safely try again.
351    pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError> {
352        self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.clone())
353    }
354}
355
356// #[derive(Clone)] adds Clone bound to M, which is not necessary.
357// https://github.com/rust-lang/rust/issues/26925
358impl<M> Clone for BareContext<M> {
359    fn clone(&self) -> Self {
360        Self { system_handle: self.system_handle.clone(), myself: self.myself.clone() }
361    }
362}
363
364/// Capacity of actor's normal- and high-priority inboxes. Converts from [`usize`].
365#[derive(Clone, Copy, Debug, Default)]
366pub struct Capacity {
367    pub normal: usize,
368    pub high: usize,
369}
370
371/// Set capacity of both normal and high priority channels to the same amount of messages.
372impl From<usize> for Capacity {
373    fn from(capacity: usize) -> Self {
374        Self { normal: capacity, high: capacity }
375    }
376}
377
378/// A builder for configuring [`Actor`] spawning.
379/// You can specify your own [`Addr`] for the Actor, or let the system create
380/// a new address with either provided or default capacity.
381#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to \
382              configure this builder"]
383pub struct SpawnBuilderWithoutAddress<'a, A: Actor, F: FnOnce() -> A> {
384    system: &'a mut System,
385    factory: F,
386}
387
388impl<'a, A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
389    SpawnBuilderWithoutAddress<'a, A, F>
390{
391    /// Specify an existing [`Addr`] to use with this Actor.
392    pub fn with_addr(self, addr: Addr<A::Message>) -> SpawnBuilderWithAddress<'a, A, F> {
393        SpawnBuilderWithAddress { spawn_builder: self, addr }
394    }
395
396    /// Specify a capacity for the actor's receiving channel. Accepts [`Capacity`] or [`usize`].
397    pub fn with_capacity(self, capacity: impl Into<Capacity>) -> SpawnBuilderWithAddress<'a, A, F> {
398        let addr = A::addr_with_capacity(capacity);
399        SpawnBuilderWithAddress { spawn_builder: self, addr }
400    }
401
402    /// Use the default capacity for the actor's receiving channel.
403    pub fn with_default_capacity(self) -> SpawnBuilderWithAddress<'a, A, F> {
404        let addr = A::addr();
405        SpawnBuilderWithAddress { spawn_builder: self, addr }
406    }
407}
408
409/// After having configured the builder with an address
410/// it is possible to create and run the actor either on a new thread with [`Self::spawn()`]
411/// or on the current thread with [`Self::run_and_block()`].
412#[must_use = "You must call .spawn() or .run_and_block() to run the actor"]
413pub struct SpawnBuilderWithAddress<'a, A: Actor, F: FnOnce() -> A> {
414    spawn_builder: SpawnBuilderWithoutAddress<'a, A, F>,
415    addr: Addr<A::Message>,
416}
417
418impl<A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
419    SpawnBuilderWithAddress<'_, A, F>
420{
421    /// Run this Actor on the current calling thread. This is a
422    /// blocking call. This function will return when the Actor
423    /// has stopped.
424    pub fn run_and_block(self) -> Result<(), ActorError> {
425        let factory = self.spawn_builder.factory;
426        self.spawn_builder.system.block_on(factory(), self.addr)
427    }
428}
429
430impl<A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A + Send + 'static>
431    SpawnBuilderWithAddress<'_, A, F>
432{
433    /// Spawn this Actor into a new thread managed by the [`System`].
434    pub fn spawn(self) -> Result<Addr<A::Message>, ActorError> {
435        let builder = self.spawn_builder;
436        builder.system.spawn_fn_with_addr(builder.factory, self.addr.clone())?;
437        Ok(self.addr)
438    }
439}
440
441impl System {
442    /// Creates a new System with a given name.
443    pub fn new(name: &str) -> Self {
444        System::with_callbacks(name, Default::default())
445    }
446
447    pub fn with_callbacks(name: &str, callbacks: SystemCallbacks) -> Self {
448        Self {
449            handle: SystemHandle {
450                name: name.to_owned(),
451                callbacks: Arc::new(callbacks),
452                ..SystemHandle::default()
453            },
454        }
455    }
456
457    /// Prepare an actor to be spawned. Returns a [`SpawnBuilderWithoutAddress`]
458    /// which has to be further configured before spawning the actor.
459    pub fn prepare<A>(
460        &mut self,
461        actor: A,
462    ) -> SpawnBuilderWithoutAddress<'_, A, impl FnOnce() -> A + use<A>>
463    where
464        A: Actor,
465    {
466        SpawnBuilderWithoutAddress { system: self, factory: move || actor }
467    }
468
469    /// Similar to [`Self::prepare()`], but an actor factory is passed instead
470    /// of an [`Actor`] itself. This is used when an actor needs to be
471    /// created on its own thread instead of the calling thread.
472    /// Returns a [`SpawnBuilderWithoutAddress`] which has to be further
473    /// configured before spawning the actor.
474    pub fn prepare_fn<A, F>(&mut self, factory: F) -> SpawnBuilderWithoutAddress<'_, A, F>
475    where
476        A: Actor,
477        F: FnOnce() -> A + Send,
478    {
479        SpawnBuilderWithoutAddress { system: self, factory }
480    }
481
482    /// Spawn a normal [`Actor`] in the system, returning its address when successful.
483    /// This address is created by the system and uses a default capacity.
484    /// If you need to customize the address see [`Self::prepare()`] or [`Self::prepare_fn()`].
485    pub fn spawn<A>(&mut self, actor: A) -> Result<Addr<A::Message>, ActorError>
486    where
487        A: Actor<Context = Context<<A as Actor>::Message>> + Send + 'static,
488    {
489        self.prepare(actor).with_default_capacity().spawn()
490    }
491
492    /// Spawn a normal Actor in the system, using a factory that produces an [`Actor`],
493    /// and an address that will be assigned to the Actor.
494    ///
495    /// This method is useful if you need to model circular dependencies between `Actor`s.
496    fn spawn_fn_with_addr<F, A>(
497        &mut self,
498        factory: F,
499        addr: Addr<A::Message>,
500    ) -> Result<(), ActorError>
501    where
502        F: FnOnce() -> A + Send + 'static,
503        A: Actor<Context = Context<<A as Actor>::Message>>,
504    {
505        // Hold the lock until the end of the function to prevent the race
506        // condition between spawn and shutdown.
507        let system_state_lock = self.handle.system_state.read();
508        if !system_state_lock.is_running() {
509            return Err(ActorError::SystemStopped { actor_name: A::name() });
510        }
511
512        let system_handle = self.handle.clone();
513        let mut context = Context::new(system_handle.clone(), addr.recipient.clone());
514        let control_addr = addr.control_tx.clone();
515
516        let thread_handle = thread::Builder::new()
517            .name(A::name().into())
518            .spawn(move || {
519                let mut actor = factory();
520
521                if let Err(error) = actor.started(&mut context) {
522                    Self::report_error_shutdown(&system_handle, A::name(), "started()", error);
523                    return;
524                }
525                debug!("[{}] started actor: {}", system_handle.name, A::name());
526
527                Self::run_actor_select_loop(actor, addr, &mut context, &system_handle);
528            })
529            .map_err(|_| ActorError::SpawnFailed { actor_name: A::name() })?;
530
531        self.handle
532            .registry
533            .lock()
534            .push(RegistryEntry::BackgroundThread(control_addr, thread_handle));
535
536        Ok(())
537    }
538
539    /// Block the current thread until the system is shutdown.
540    pub fn run(&mut self) -> Result<(), ActorError> {
541        while *self.system_state.read() != SystemState::Stopped {
542            thread::sleep(Duration::from_millis(10));
543        }
544
545        Ok(())
546    }
547
548    /// Takes an actor and its address and runs it on the calling thread. This function
549    /// will exit once the actor has stopped.
550    fn block_on<A>(&mut self, mut actor: A, addr: Addr<A::Message>) -> Result<(), ActorError>
551    where
552        A: Actor<Context = Context<<A as Actor>::Message>>,
553    {
554        // Prevent race condition of spawn and shutdown.
555        if !self.is_running() {
556            return Err(ActorError::SystemStopped { actor_name: A::name() });
557        }
558
559        let system_handle = &self.handle;
560        let mut context = Context::new(system_handle.clone(), addr.recipient.clone());
561
562        self.handle
563            .registry
564            .lock()
565            .push(RegistryEntry::InPlace(addr.control_tx.clone(), thread::current()));
566
567        match actor.started(&mut context) {
568            Ok(()) => {
569                debug!("[{}] started actor: {}", system_handle.name, A::name());
570                Self::run_actor_select_loop(actor, addr, &mut context, system_handle);
571            },
572            Err(error) => Self::report_error_shutdown(system_handle, A::name(), "started()", error),
573        }
574
575        // Wait for the system to shutdown before we exit, otherwise the process
576        // would exit before the system is completely shutdown
577        // TODO(bschwind) - We could possibly use a parking_lot::CondVar here
578        //                  for more efficient waiting
579        while *self.system_state.read() != SystemState::Stopped {
580            thread::sleep(Duration::from_millis(10));
581        }
582
583        Ok(())
584    }
585
586    /// Keep logically in sync with [`Self::run_async_actor_select_loop()`].
587    fn run_actor_select_loop<A>(
588        mut actor: A,
589        addr: Addr<A::Message>,
590        context: &mut Context<A::Message>,
591        system_handle: &SystemHandle,
592    ) where
593        A: Actor<Context = Context<<A as Actor>::Message>>,
594    {
595        /// What can be received during one actor event loop.
596        enum Received<M> {
597            Control(Control),
598            Message(M),
599            Timeout,
600        }
601
602        loop {
603            // We don't handle the messages (control and actor's) directly in .recv(), that would
604            // lead to mutably borrowing actor multiple times. Read into intermediate enum instead.
605            // The order of .recv() calls is significant and determines priority.
606            let selector = Selector::new()
607                .recv(&addr.control_rx, |msg| match msg {
608                    Ok(control) => Received::Control(control),
609                    Err(RecvError::Disconnected) => {
610                        panic!("We keep control_tx alive through addr, should not happen.")
611                    },
612                })
613                .recv(&addr.priority_rx, |msg| match msg {
614                    Ok(msg) => Received::Message(msg),
615                    Err(RecvError::Disconnected) => {
616                        panic!("We keep priority_tx alive through addr, should not happen.")
617                    },
618                })
619                .recv(&addr.message_rx, |msg| match msg {
620                    Ok(msg) => Received::Message(msg),
621                    Err(RecvError::Disconnected) => {
622                        panic!("We keep message_tx alive through addr, should not happen.")
623                    },
624                });
625
626            // Wait for some event to happen, with a timeout if set.
627            let received = if let Some(deadline) = context.receive_deadline {
628                match selector.wait_deadline(deadline) {
629                    Ok(received) => received,
630                    Err(SelectError::Timeout) => Received::Timeout,
631                }
632            } else {
633                selector.wait()
634            };
635
636            // Process the event. Returning ends actor loop, the normal operation is to fall through.
637            match received {
638                Received::Control(Control::Stop) => {
639                    if let Err(error) = actor.stopped(context) {
640                        // FWIW this should always hit the "while shutting down" variant.
641                        Self::report_error_shutdown(system_handle, A::name(), "stopped()", error);
642                    }
643                    debug!("[{}] stopped actor: {}", system_handle.name, A::name());
644                    return;
645                },
646                Received::Message(msg) => {
647                    trace!("[{}] message received by {}", system_handle.name, A::name());
648                    if let Err(error) = actor.handle(context, msg) {
649                        Self::report_error_shutdown(system_handle, A::name(), "handle()", error);
650                        return;
651                    }
652                },
653                Received::Timeout => {
654                    let deadline = context.receive_deadline.take().expect("implied by timeout");
655                    if let Err(error) = actor.deadline_passed(context, deadline) {
656                        Self::report_error_shutdown(
657                            system_handle,
658                            A::name(),
659                            "deadline_passed()",
660                            error,
661                        );
662                        return;
663                    }
664                },
665            }
666        }
667    }
668
669    fn report_error_shutdown(
670        system_handle: &SystemHandle,
671        actor_name: &str,
672        action: &str,
673        error: impl std::fmt::Display,
674    ) {
675        let system_name = &system_handle.name;
676
677        // Note that the system may have transitioned from running to stopping (but not the other
678        // way around) in the mean time. Slightly imprecise log and an extra no-op call is fine.
679        if system_handle.system_state.read().is_running() {
680            error!(
681                "[{system_name}] {actor_name} {action} error: {error:#}. Shutting down the actor \
682                 system."
683            );
684            let _ = system_handle.shutdown();
685        } else {
686            warn!(
687                "[{system_name}] {actor_name} {action} error while shutting down: {error:#}. \
688                 Ignoring."
689            );
690        }
691    }
692}
693
694impl Drop for System {
695    fn drop(&mut self) {
696        self.shutdown().unwrap();
697    }
698}
699
700impl Deref for System {
701    type Target = SystemHandle;
702
703    fn deref(&self) -> &Self::Target {
704        &self.handle
705    }
706}
707
708impl SystemHandle {
709    /// Stops all actors spawned by this system.
710    pub fn shutdown(&self) -> Result<(), ActorError> {
711        let shutdown_start = Instant::now();
712
713        let current_thread = thread::current();
714        let current_thread_name = current_thread.name().unwrap_or("Unknown thread id");
715
716        // Use an inner scope to prevent holding the lock for the duration of shutdown
717        {
718            let mut system_state_lock = self.system_state.write();
719
720            if system_state_lock.is_running() {
721                info!(
722                    "[{}] thread {} shutting down the actor system.",
723                    self.name, current_thread_name,
724                );
725                *system_state_lock = SystemState::ShuttingDown;
726            } else {
727                trace!(
728                    "[{}] thread {} called system.shutdown() but the system is already shutting \
729                     down or stopped.",
730                    self.name, current_thread_name,
731                );
732                return Ok(());
733            }
734        }
735
736        if let Some(callback) = self.callbacks.preshutdown.as_ref() {
737            info!("[{}] calling pre-shutdown callback.", self.name);
738            if let Err(err) = callback() {
739                warn!("[{}] pre-shutdown callback failed, reason: {}", self.name, err);
740            }
741        }
742
743        let err_count = {
744            let mut registry = self.registry.lock();
745            debug!("[{}] joining {} actor threads.", self.name, registry.len());
746
747            // Stop actors in the reverse order in which they were spawned.
748            // Send the Stop control message to all actors first so they can
749            // all shut down in parallel, so actors will be in the process of
750            // stopping when we join the threads below.
751            for entry in registry.iter_mut().rev() {
752                let actor_name = entry.name();
753
754                if let Err(e) = entry.control_addr().send(Control::Stop) {
755                    warn!(
756                        "Couldn't send Control::Stop to {actor_name} to shut it down: {e:#}. \
757                         Ignoring and proceeding."
758                    );
759                }
760            }
761
762            registry
763                .drain(..)
764                .enumerate()
765                .rev()
766                .filter_map(|(i, entry)| {
767                    let actor_name = entry.name();
768
769                    match entry {
770                        RegistryEntry::InPlace(_, _) => {
771                            debug!(
772                                "[{}] [{i}] skipping join of an actor running in-place: \
773                                 {actor_name}",
774                                self.name
775                            );
776                            None
777                        },
778                        RegistryEntry::BackgroundThread(_control_addr, thread_handle) => {
779                            if thread_handle.thread().id() == current_thread.id() {
780                                debug!(
781                                    "[{}] [{i}] skipping join of the actor thread currently \
782                                     executing SystemHandle::shutdown(): {actor_name}",
783                                    self.name,
784                                );
785                                return None;
786                            }
787
788                            debug!("[{}] [{}] joining actor thread: {}", self.name, i, actor_name);
789
790                            let join_result = thread_handle.join().map_err(|e| {
791                                error!("a panic inside actor thread {actor_name}: {e:?}")
792                            });
793
794                            debug!("[{}] [{}] joined actor thread:  {}", self.name, i, actor_name);
795                            join_result.err()
796                        },
797                    }
798                })
799                .count()
800        };
801
802        info!("[{}] system finished shutting down in {:?}.", self.name, shutdown_start.elapsed());
803
804        if let Some(callback) = self.callbacks.postshutdown.as_ref() {
805            info!("[{}] calling post-shutdown callback.", self.name);
806            if let Err(err) = callback() {
807                warn!("[{}] post-shutdown callback failed, reason: {}", self.name, err);
808            }
809        }
810
811        *self.system_state.write() = SystemState::Stopped;
812
813        if err_count > 0 { Err(ActorError::ActorPanic) } else { Ok(()) }
814    }
815
816    /// Subscribe given `recipient` to events of type `E`. See [`BareContext::subscribe()`].
817    pub fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&self, recipient: Recipient<M>) {
818        let mut event_subscribers = self.event_subscribers.write();
819        event_subscribers.subscribe_recipient::<M, E>(recipient);
820    }
821
822    /// Subscribe given `recipient` to events of type `E` and send the last cached event to it.
823    /// See [`BareContext::subscribe_and_receive_latest()`].
824    pub fn subscribe_and_receive_latest<M: 'static, E: Event + Into<M>>(
825        &self,
826        recipient: Recipient<M>,
827    ) -> Result<(), SendError> {
828        let mut event_subscribers = self.event_subscribers.write();
829
830        // Send the last cached value if there is one. The cached event sending and adding ourselves
831        // to the subscriber list needs to be under the same write lock guard to avoid race
832        // conditions between this method and `publish()` (to guarantee exactly-once delivery).
833        if let Some(last_cached_value) = event_subscribers.last_value_cache.get(&TypeId::of::<E>())
834            && let Some(msg) = last_cached_value.downcast_ref::<E>()
835        {
836            recipient.send(msg.clone().into())?;
837        }
838
839        event_subscribers.subscribe_recipient::<M, E>(recipient);
840        Ok(())
841    }
842
843    /// Publish an event. All actors that have previously subscribed to the type will receive it.
844    ///
845    /// The event will be also cached. Actors that will subscribe to the type in future may choose
846    /// to receive the last cached event upon subscription.
847    ///
848    /// When sending to some subscriber fails, others are still tried and vec of errors is returned.
849    /// For direct, non-[`Clone`] or high-throughput messages please use [`Addr`] or [`Recipient`].
850    pub fn publish<E: Event>(&self, event: E) -> Result<(), PublishError> {
851        let event_subscribers = self.event_subscribers.read();
852        let type_id = TypeId::of::<E>();
853
854        // This value update must be under the read lock (even if it would be possible to factor
855        // `last_value_cache` outside of the `RwLock`) to prevent race conditions between this and
856        // `subscribe_and_receive_latest()`.
857        event_subscribers.last_value_cache.insert(type_id, Box::new(event.clone()));
858
859        if let Some(subs) = event_subscribers.events.get(&type_id) {
860            let errors: Vec<SendError> = subs
861                .iter()
862                .filter_map(|subscriber_callback| subscriber_callback(&event).err())
863                .collect();
864            if !errors.is_empty() {
865                return Err(PublishError(errors));
866            }
867        }
868
869        Ok(())
870    }
871
872    pub fn name(&self) -> &str {
873        &self.name
874    }
875
876    /// Return `true` if the actor system is running (not shutting down, not stopped).
877    pub fn is_running(&self) -> bool {
878        self.system_state.read().is_running()
879    }
880}
881
882enum RegistryEntry {
883    InPlace(Sender<Control>, thread::Thread),
884    BackgroundThread(Sender<Control>, thread::JoinHandle<()>),
885}
886
887impl RegistryEntry {
888    fn name(&self) -> String {
889        match self {
890            RegistryEntry::InPlace(_, thread_handle) => {
891                thread_handle.name().unwrap_or("unnamed").to_owned()
892            },
893            RegistryEntry::BackgroundThread(_, join_handle) => {
894                join_handle.thread().name().unwrap_or("unnamed").to_owned()
895            },
896        }
897    }
898
899    fn control_addr(&mut self) -> &mut Sender<Control> {
900        match self {
901            RegistryEntry::InPlace(control_addr, _) => control_addr,
902            RegistryEntry::BackgroundThread(control_addr, _) => control_addr,
903        }
904    }
905}
906
907/// The set of available control messages that all actors respond to.
908pub enum Control {
909    /// Stop the actor
910    Stop,
911}
912
913/// The base actor trait.
914pub trait Actor {
915    /// The expected type of a message to be received.
916    // 'static required to create trait object in Addr, https://stackoverflow.com/q/29740488/4345715
917    type Message: Send + 'static;
918    /// The type to return on error in the handle method.
919    type Error: std::fmt::Display;
920    /// What kind of context this actor accepts. Usually [`Context<Self::Message>`].
921    type Context;
922
923    /// Default capacity of actor's normal-priority inbox unless overridden by `.with_capacity()`.
924    const DEFAULT_CAPACITY_NORMAL: usize = 5;
925    /// Default capacity of actor's high-priority inbox unless overridden by `.with_capacity()`.
926    const DEFAULT_CAPACITY_HIGH: usize = 5;
927
928    /// The name of the Actor. Used by `tonari-actor` for logging/debugging.
929    /// Default implementation uses [`type_name()`].
930    fn name() -> &'static str {
931        type_name::<Self>()
932    }
933
934    /// Determine priority of a `message` before it is sent to this actor.
935    /// Default implementation returns [`Priority::Normal`].
936    fn priority(_message: &Self::Message) -> Priority {
937        Priority::Normal
938    }
939
940    /// An optional callback when the Actor has been started.
941    fn started(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
942        Ok(())
943    }
944
945    /// The primary function of this trait, allowing an actor to handle incoming messages of a certain type.
946    fn handle(
947        &mut self,
948        context: &mut Self::Context,
949        message: Self::Message,
950    ) -> Result<(), Self::Error>;
951
952    /// An optional callback when the Actor has been stopped.
953    fn stopped(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
954        Ok(())
955    }
956
957    /// An optional callback when a deadline has passed.
958    ///
959    /// The deadline has to be set via [`Context::set_deadline()`] or [`Context::set_timeout()`]
960    /// first. The instant to which the deadline was originally set is passed via the `deadline`
961    /// argument; it is normally close to [`Instant::now()`], but can be later if the actor was busy.
962    ///
963    /// # Periodic tick example
964    /// ```
965    /// # use {std::{cmp::max, time::{Duration, Instant}}, tonari_actor::{Actor, Context}};
966    /// # struct TickingActor;
967    /// impl Actor for TickingActor {
968    /// #    type Context = Context<Self::Message>;
969    /// #    type Error = String;
970    /// #    type Message = ();
971    /// #    fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> { Ok(()) }
972    ///     // ...
973    ///
974    ///     fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<(), String> {
975    ///         // do_periodic_housekeeping();
976    ///
977    ///         // A: Schedule one second from now (even if delayed); drifting tick.
978    ///         context.set_timeout(Some(Duration::from_secs(1)));
979    ///
980    ///         // B: Schedule one second from deadline; non-drifting tick.
981    ///         context.set_deadline(Some(deadline + Duration::from_secs(1)));
982    ///
983    ///         // C: Schedule one second from deadline, but don't fire multiple times if delayed.
984    ///         context.set_deadline(Some(max(deadline + Duration::from_secs(1), Instant::now())));
985    ///
986    ///         Ok(())
987    ///     }
988    /// }
989    /// ```
990    fn deadline_passed(
991        &mut self,
992        _context: &mut Self::Context,
993        _deadline: Instant,
994    ) -> Result<(), Self::Error> {
995        Ok(())
996    }
997
998    /// Create address for this actor with default capacities.
999    fn addr() -> Addr<Self::Message> {
1000        let capacity =
1001            Capacity { normal: Self::DEFAULT_CAPACITY_NORMAL, high: Self::DEFAULT_CAPACITY_HIGH };
1002        Self::addr_with_capacity(capacity)
1003    }
1004
1005    /// Create address for this actor, specifying its inbox size. Accepts [`Capacity`] or [`usize`].
1006    fn addr_with_capacity(capacity: impl Into<Capacity>) -> Addr<Self::Message> {
1007        Addr::new(capacity, Self::name(), Self::priority)
1008    }
1009}
1010
1011pub struct Addr<M> {
1012    recipient: Recipient<M>,
1013    priority_rx: Receiver<M>,
1014    message_rx: Receiver<M>,
1015    control_rx: Receiver<Control>,
1016}
1017
1018impl<M> Clone for Addr<M> {
1019    fn clone(&self) -> Self {
1020        Self {
1021            recipient: self.recipient.clone(),
1022            priority_rx: self.priority_rx.clone(),
1023            message_rx: self.message_rx.clone(),
1024            control_rx: self.control_rx.clone(),
1025        }
1026    }
1027}
1028
1029impl<M> Deref for Addr<M> {
1030    type Target = Recipient<M>;
1031
1032    fn deref(&self) -> &Self::Target {
1033        &self.recipient
1034    }
1035}
1036
1037impl<M: Send + 'static> Addr<M> {
1038    fn new(
1039        capacity: impl Into<Capacity>,
1040        name: &'static str,
1041        get_priority: fn(&M) -> Priority,
1042    ) -> Self {
1043        let capacity: Capacity = capacity.into();
1044
1045        let (priority_tx, priority_rx) = flume::bounded::<M>(capacity.high);
1046        let (message_tx, message_rx) = flume::bounded::<M>(capacity.normal);
1047        let (control_tx, control_rx) = flume::bounded(CONTROL_CHANNEL_CAPACITY);
1048
1049        let message_tx =
1050            Arc::new(MessageSender { high: priority_tx, normal: message_tx, get_priority, name });
1051        Self {
1052            recipient: Recipient { message_tx, control_tx },
1053            priority_rx,
1054            message_rx,
1055            control_rx,
1056        }
1057    }
1058}
1059
1060/// Urgency of a given message. All high-priority messages are delivered before normal priority.
1061#[derive(Clone, Copy, Debug)]
1062pub enum Priority {
1063    Normal,
1064    High,
1065}
1066
1067/// Similar to [`Addr`], but rather than pointing to a specific actor,
1068/// it is typed for any actor that handles a given message-response type.
1069pub struct Recipient<M> {
1070    message_tx: Arc<dyn SenderTrait<M>>,
1071    control_tx: Sender<Control>,
1072}
1073
1074// #[derive(Clone)] adds Clone bound to M, which is not necessary.
1075// https://github.com/rust-lang/rust/issues/26925
1076impl<M> Clone for Recipient<M> {
1077    fn clone(&self) -> Self {
1078        Self { message_tx: Arc::clone(&self.message_tx), control_tx: self.control_tx.clone() }
1079    }
1080}
1081
1082impl<M> Recipient<M> {
1083    /// Send a message to an actor. Returns [`SendError`] if the channel is full; does not block.
1084    /// See [`SendResultExt`] trait for convenient handling of errors.
1085    pub fn send(&self, message: M) -> Result<(), SendError> {
1086        self.message_tx.try_send(message)
1087    }
1088}
1089
1090impl<M: 'static> Recipient<M> {
1091    /// Convert a [`Recipient<M>`] (or [`Addr<M>`] through [`Deref`]) into [`Recipient<N>`], where
1092    /// message `N` can be converted into `M`.
1093    pub fn recipient<N: Into<M>>(&self) -> Recipient<N> {
1094        Recipient {
1095            // Each level of boxing adds one .into() call, so box here to convert A::Message to M.
1096            message_tx: Arc::new(Arc::clone(&self.message_tx)),
1097            control_tx: self.control_tx.clone(),
1098        }
1099    }
1100}
1101
1102pub trait SendResultExt {
1103    /// Don't return an `Err` when the recipient is at full capacity, run `func(receiver_name)`
1104    /// in such a case instead. `receiver_name` is the name of the intended recipient.
1105    fn on_full<F: FnOnce(&'static str, Priority)>(self, func: F) -> Result<(), DisconnectedError>;
1106
1107    /// Don't return an `Err` when the recipient is at full capacity.
1108    fn ignore_on_full(self) -> Result<(), DisconnectedError>;
1109}
1110
1111impl SendResultExt for Result<(), SendError> {
1112    fn on_full<F: FnOnce(&'static str, Priority)>(
1113        self,
1114        callback: F,
1115    ) -> Result<(), DisconnectedError> {
1116        self.or_else(|e| match e {
1117            SendError { recipient_name, priority, reason: SendErrorReason::Full } => {
1118                callback(recipient_name, priority);
1119                Ok(())
1120            },
1121            SendError { recipient_name, priority, reason: SendErrorReason::Disconnected } => {
1122                Err(DisconnectedError { recipient_name, priority })
1123            },
1124        })
1125    }
1126
1127    fn ignore_on_full(self) -> Result<(), DisconnectedError> {
1128        self.on_full(|_, _| ())
1129    }
1130}
1131
1132/// Internal struct to encapsulate ability to send message with priority to an actor.
1133struct MessageSender<M> {
1134    high: Sender<M>,
1135    normal: Sender<M>,
1136    get_priority: fn(&M) -> Priority,
1137    /// Name of the actor we're sending to.
1138    name: &'static str,
1139}
1140
1141/// Internal trait to generalize over [`Sender`].
1142trait SenderTrait<M>: Send + Sync {
1143    fn try_send(&self, message: M) -> Result<(), SendError>;
1144}
1145
1146/// [`SenderTrait`] is implemented for our [`MessageSender`].
1147impl<M: Send> SenderTrait<M> for MessageSender<M> {
1148    fn try_send(&self, message: M) -> Result<(), SendError> {
1149        let priority = (self.get_priority)(&message);
1150        let sender = match priority {
1151            Priority::Normal => &self.normal,
1152            Priority::High => &self.high,
1153        };
1154        sender.try_send(message).map_err(|e| SendError {
1155            reason: e.into(),
1156            recipient_name: self.name,
1157            priority,
1158        })
1159    }
1160}
1161
1162/// [`SenderTrait`] is also implemented for boxed version of itself, including M -> N conversion.
1163impl<M: Into<N>, N> SenderTrait<M> for Arc<dyn SenderTrait<N>> {
1164    fn try_send(&self, message: M) -> Result<(), SendError> {
1165        self.deref().try_send(message.into())
1166    }
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171    use std::{
1172        rc::Rc,
1173        sync::atomic::{AtomicU32, Ordering},
1174        time::Duration,
1175    };
1176
1177    use super::*;
1178
1179    struct TestActor;
1180    impl Actor for TestActor {
1181        type Context = Context<Self::Message>;
1182        type Error = String;
1183        type Message = usize;
1184
1185        fn name() -> &'static str {
1186            // The name is asserted against in tests, use a short stable one rather than the default
1187            // implementation (`type_name()`)`, which is documented not to be stable and currently
1188            // produces `tonari_actor::tests::TestActor`.
1189            "TestActor"
1190        }
1191
1192        fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), String> {
1193            println!("message: {message}");
1194            Ok(())
1195        }
1196
1197        fn started(&mut self, _: &mut Self::Context) -> Result<(), String> {
1198            println!("started");
1199            Ok(())
1200        }
1201
1202        fn stopped(&mut self, _: &mut Self::Context) -> Result<(), String> {
1203            println!("stopped");
1204            Ok(())
1205        }
1206    }
1207
1208    #[test]
1209    fn it_works() {
1210        let mut system = System::new("hi");
1211        let address = system.spawn(TestActor).unwrap();
1212        let _ = system.spawn(TestActor).unwrap();
1213        let _ = system.spawn(TestActor).unwrap();
1214        let _ = system.spawn(TestActor).unwrap();
1215        let _ = system.spawn(TestActor).unwrap();
1216        address.send(1337usize).unwrap();
1217        address.send(666usize).unwrap();
1218        address.send(1usize).unwrap();
1219        thread::sleep(Duration::from_millis(100));
1220
1221        system.shutdown().unwrap();
1222        thread::sleep(Duration::from_millis(100));
1223    }
1224
1225    #[test]
1226    fn test_ignore_on_full() {
1227        let mut system = System::new("hi");
1228        let address = system.prepare(TestActor).with_capacity(1).spawn().unwrap();
1229        address.send(1337usize).unwrap();
1230        assert!(address.send(666usize).is_err());
1231        address.send(666usize).ignore_on_full().unwrap();
1232
1233        thread::sleep(Duration::from_millis(100));
1234
1235        system.shutdown().unwrap();
1236        thread::sleep(Duration::from_millis(100));
1237    }
1238
1239    #[test]
1240    fn send_constraints() {
1241        #[derive(Default)]
1242        struct LocalActor {
1243            _ensure_not_send_not_sync: Rc<()>,
1244        }
1245        impl Actor for LocalActor {
1246            type Context = Context<Self::Message>;
1247            type Error = String;
1248            type Message = ();
1249
1250            fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> {
1251                Ok(())
1252            }
1253
1254            /// We just need this test to compile, not run.
1255            fn started(&mut self, ctx: &mut Self::Context) -> Result<(), String> {
1256                ctx.system_handle.shutdown().map_err(|e| e.to_string())
1257            }
1258        }
1259
1260        // Allowable, as the struct will be created on the new thread.
1261        {
1262            let mut system = System::new("send_constraints prepare_fn");
1263            let _ = system.prepare_fn(LocalActor::default).with_default_capacity().spawn().unwrap();
1264        }
1265
1266        // Allowable, as the struct will be run on the current thread.
1267        {
1268            let mut system = System::new("send_constraints run_and_block");
1269            system.prepare(LocalActor::default()).with_default_capacity().run_and_block().unwrap();
1270        }
1271    }
1272
1273    #[test]
1274    fn timeouts() {
1275        struct TimeoutActor {
1276            handle_count: Arc<AtomicU32>,
1277            timeout_count: Arc<AtomicU32>,
1278        }
1279
1280        impl Actor for TimeoutActor {
1281            type Context = Context<Self::Message>;
1282            type Error = String;
1283            type Message = Option<Instant>;
1284
1285            fn handle(
1286                &mut self,
1287                ctx: &mut Self::Context,
1288                msg: Self::Message,
1289            ) -> Result<(), String> {
1290                self.handle_count.fetch_add(1, Ordering::SeqCst);
1291                if msg.is_some() {
1292                    ctx.receive_deadline = msg;
1293                }
1294                Ok(())
1295            }
1296
1297            fn deadline_passed(&mut self, _: &mut Self::Context, _: Instant) -> Result<(), String> {
1298                self.timeout_count.fetch_add(1, Ordering::SeqCst);
1299                Ok(())
1300            }
1301        }
1302
1303        let mut system = System::new("timeouts");
1304        let (handle_count, timeout_count) = (Default::default(), Default::default());
1305        let actor = TimeoutActor {
1306            handle_count: Arc::clone(&handle_count),
1307            timeout_count: Arc::clone(&timeout_count),
1308        };
1309        let addr = system.spawn(actor).unwrap();
1310
1311        // Test that setting deadline to past triggers the deadline immediately.
1312        addr.send(Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())).unwrap();
1313        thread::sleep(Duration::from_millis(10));
1314        assert_eq!(handle_count.load(Ordering::SeqCst), 1);
1315        assert_eq!(timeout_count.load(Ordering::SeqCst), 1);
1316
1317        // Test the normal case.
1318        addr.send(Some(Instant::now() + Duration::from_millis(20))).unwrap();
1319        thread::sleep(Duration::from_millis(10));
1320        assert_eq!(handle_count.load(Ordering::SeqCst), 2);
1321        assert_eq!(timeout_count.load(Ordering::SeqCst), 1);
1322        thread::sleep(Duration::from_millis(20));
1323        assert_eq!(handle_count.load(Ordering::SeqCst), 2);
1324        assert_eq!(timeout_count.load(Ordering::SeqCst), 2);
1325
1326        // Test that receiving a message doesn't reset the deadline.
1327        addr.send(Some(Instant::now() + Duration::from_millis(40))).unwrap();
1328        thread::sleep(Duration::from_millis(20));
1329        assert_eq!(handle_count.load(Ordering::SeqCst), 3);
1330        assert_eq!(timeout_count.load(Ordering::SeqCst), 2);
1331        addr.send(None).unwrap();
1332        thread::sleep(Duration::from_millis(30));
1333        assert_eq!(handle_count.load(Ordering::SeqCst), 4);
1334        assert_eq!(timeout_count.load(Ordering::SeqCst), 3);
1335
1336        system.shutdown().unwrap();
1337    }
1338
1339    #[test]
1340    fn errors() {
1341        let mut system = System::new("hi");
1342        let low_capacity_actor = TestActor::addr_with_capacity(1);
1343        // Convert to `Recipient` so that we don't keep the receiving side of `Addr` alive.
1344        let stopped_actor = system.spawn(TestActor).unwrap().recipient();
1345
1346        low_capacity_actor.send(9).expect("one message should fit");
1347        let error = low_capacity_actor.send(123).unwrap_err();
1348        assert_eq!(
1349            error.to_string(),
1350            "the capacity of TestActor's Normal-priority channel is full"
1351        );
1352        assert_eq!(
1353            format!("{error:?}"),
1354            r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Full }"#
1355        );
1356
1357        system.shutdown().unwrap();
1358
1359        let error = stopped_actor.send(456usize).unwrap_err();
1360        assert_eq!(error.to_string(), "the recipient of the message (TestActor) no longer exists");
1361        assert_eq!(
1362            format!("{error:?}"),
1363            r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Disconnected }"#
1364        );
1365    }
1366
1367    #[test]
1368    fn message_priorities() {
1369        // Logger might have been initialized by another test, so just try on a best-effort basis.
1370        env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace"))
1371            .try_init()
1372            .ok();
1373
1374        struct PriorityActor {
1375            received: Arc<Mutex<Vec<usize>>>,
1376        }
1377
1378        impl Actor for PriorityActor {
1379            type Context = Context<Self::Message>;
1380            type Error = String;
1381            type Message = usize;
1382
1383            fn handle(
1384                &mut self,
1385                context: &mut Self::Context,
1386                message: Self::Message,
1387            ) -> Result<(), Self::Error> {
1388                let mut received = self.received.lock();
1389                received.push(message);
1390                if received.len() >= 20 {
1391                    context.system_handle.shutdown().unwrap();
1392                }
1393                Ok(())
1394            }
1395
1396            fn priority(message: &Self::Message) -> Priority {
1397                if *message >= 10 { Priority::High } else { Priority::Normal }
1398            }
1399        }
1400
1401        let addr = PriorityActor::addr_with_capacity(10);
1402        let received = Arc::new(Mutex::new(Vec::<usize>::new()));
1403
1404        // Send messages before even actor starts.
1405        for message in 0..20usize {
1406            addr.send(message).unwrap();
1407        }
1408
1409        let mut system = System::new("priorities");
1410        system
1411            .prepare(PriorityActor { received: Arc::clone(&received) })
1412            .with_addr(addr)
1413            .run_and_block()
1414            .unwrap();
1415
1416        assert_eq!(
1417            *received.lock(),
1418            [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1419        );
1420    }
1421
1422    impl Event for () {}
1423
1424    #[test]
1425    fn last_cached_event() {
1426        struct Subscriber;
1427        impl Actor for Subscriber {
1428            type Context = Context<Self::Message>;
1429            type Error = String;
1430            type Message = ();
1431
1432            fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
1433                context.subscribe_and_receive_latest::<Self::Message>().map_err(|e| e.to_string())
1434            }
1435
1436            fn handle(
1437                &mut self,
1438                context: &mut Self::Context,
1439                _: Self::Message,
1440            ) -> Result<(), Self::Error> {
1441                println!("Event received!");
1442                context.system_handle.shutdown().unwrap();
1443                Ok(())
1444            }
1445        }
1446
1447        let mut system = System::new("last cached event");
1448        system.publish(()).expect("can publish event");
1449
1450        // This test will block indefinitely if the event isn't delivered.
1451        system
1452            .prepare(Subscriber)
1453            .with_capacity(1)
1454            .run_and_block()
1455            .expect("actor finishes successfully");
1456    }
1457}