Skip to main content

maxim/
system.rs

1//! Implements the [`ActorSystem`] and related types of Maxim.
2//!
3//! When the [`ActorSystem`] starts up, a number of Reactors will be spawned that will iterate over
4//! Actor's inbound messages, processing them asynchronously. Actors will be ran as many times as
5//! they can over a given time slice until they are pending or have no more messages. If the Actor
6//! is Pending, it will be re-queued when the pending future wakes it. If the Actor has no more
7//! messages, it will be returned to the Executor until it has messages again. This process cycles
8//! until the [`ActorSystem`] is shutdown.
9//!
10//! The user should refer to test cases and examples as "how-to" guides for using Maxim.
11
12#[cfg(feature = "actor-pool")]
13use crate::actors::ActorPoolBuilder;
14use crate::actors::{Actor, ActorBuilder, ActorStream};
15use crate::executor::MaximExecutor;
16use crate::prelude::*;
17use crate::system::system_actor::SystemActor;
18use dashmap::DashMap;
19use log::{debug, error, info, trace, warn};
20use once_cell::sync::OnceCell;
21use secc::{SeccReceiver, SeccSender};
22use serde::{Deserialize, Serialize};
23use std::collections::{BinaryHeap, HashSet};
24use std::error::Error;
25use std::fmt;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::{Arc, Condvar, Mutex};
28use std::thread;
29use std::thread::JoinHandle;
30use std::time::{Duration, Instant};
31use uuid::Uuid;
32
33mod system_actor;
34
35// Holds an ActorSystem in a std::thread_local so that the Aid deserializer and other types can
36// obtain a clone if needed at any time. This needs to be set by each Reactor that is processing
37// messages with the actors.
38std::thread_local! {
39    static ACTOR_SYSTEM: OnceCell<ActorSystem> = OnceCell::new();
40}
41
42/// An enum containing messages that are sent to actors by the actor system itself and are
43/// universal to all actors.
44#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
45pub enum SystemMsg {
46    /// A message that is sent by the system and guaranteed to be the first message that the
47    /// actor receives in its lifetime.
48    Start,
49
50    /// A message that instructs an actor to shut down. The actor receiving this message should
51    /// shut down all open file handles and any other resources and return a [`Status::Stop`]
52    /// from the call to the message processor. Regardless of the return from the actor's
53    /// message processor the actor will be shut down by the actor system.
54    Stop,
55
56    /// A message sent to an actor when a monitored actor is stopped and thus not able to
57    /// process additional messages. The value is the `aid` of the actor that stopped.
58    Stopped { aid: Aid, error: Option<String> },
59}
60
61/// A type used for sending messages to other actor systems.
62#[derive(Clone, Serialize, Deserialize)]
63pub enum WireMessage {
64    /// A message sent as a response to another actor system connecting to this actor system.
65    Hello {
66        /// The `aid` for the system actor on the actor system sending the message.
67        system_actor_aid: Aid,
68    },
69    /// A container for a message from one actor on one system to an actor on another system.
70    ActorMessage {
71        /// The UUID of the [`Aid`] that the message is being sent to.
72        actor_uuid: Uuid,
73        /// The UUID of the system that the destination [`Aid`] is local to.
74        system_uuid: Uuid,
75        /// The message to be sent.
76        message: Message,
77    },
78    /// A container for sending a message with a specified duration delay.
79    DelayedActorMessage {
80        /// The duration to use to delay the message.
81        duration: Duration,
82        /// The UUID of the [`Aid`] that the message is being sent to.
83        actor_uuid: Uuid,
84        /// The UUID of the system that the destination [`Aid`] is local to.
85        system_uuid: Uuid,
86        /// The message to be sent.
87        message: Message,
88    },
89}
90
91/// Configuration structure for the Maxim actor system. Note that this configuration implements
92/// serde serialize and deserialize to allow users to read the config from any serde supported
93/// means.
94#[derive(Clone, Debug, Serialize, Deserialize)]
95pub struct ActorSystemConfig {
96    /// The default size for the channel that is created for each actor. This can be overridden on
97    /// a per-actor basis during spawning as well. Making the default channel size bigger allows
98    /// for more bandwidth in sending messages to actors but also takes more memory. Also the
99    /// user should consider if their actor needs a large channel then it might need to be
100    /// refactored or the threads size should be increased because messages aren't being processed
101    /// fast enough. The default value for this is 32.
102    pub message_channel_size: u16,
103    /// Max duration to wait between attempts to send to an actor's message channel. This is used
104    /// to poll a busy channel that is at its capacity limit. The larger this value is, the longer
105    /// `send` will wait for capacity in the channel but the user should be aware that if the
106    /// system is often waiting on capacity that channel may be too small or the actor may need to
107    /// be refactored to process messages faster. The default value is 1 millisecond.
108    pub send_timeout: Duration,
109    /// The size of the thread pool which governs how many worker threads there are in the system.
110    /// The number of threads should be carefully considered to have sufficient parallelism but not
111    /// over-schedule the CPU on the target hardware. The default value is 4 * the number of logical
112    /// CPUs.
113    pub thread_pool_size: u16,
114    /// The threshold at which the dispatcher thread will warn the user that the message took too
115    /// long to process. If this warning is being logged then the user probably should reconsider
116    /// how their message processing works and refactor big tasks into a number of smaller tasks.
117    /// The default value is 1 millisecond.
118    pub warn_threshold: Duration,
119    /// This controls how long a processor will spend working on messages for an actor before
120    /// yielding to work on other actors in the system. The dispatcher will continue to pluck
121    /// messages off the actor's channel and process them until this time slice is exceeded. Note
122    /// that actors themselves can exceed this in processing a single message and if so, only one
123    /// message will be processed before yielding. The default value is 1 millisecond.
124    pub time_slice: Duration,
125    /// While Reactors will constantly attempt to get more work, they may run out. At that point,
126    /// they will idle for this duration, or until they get a wakeup notification. Said
127    /// notifications can be missed, so it's best to not set this too high. The default value is 10
128    /// milliseconds. This implementation is backed by a [`Condvar`].
129    pub thread_wait_time: Duration,
130    /// Determines whether the actor system will immediately start when it is created. The default
131    /// value is true.
132    pub start_on_launch: bool,
133}
134
135impl ActorSystemConfig {
136    /// Return a new config with the changed `message_channel_size`.
137    pub fn message_channel_size(mut self, value: u16) -> Self {
138        self.message_channel_size = value;
139        self
140    }
141
142    /// Return a new config with the changed `send_timeout`.
143    pub fn send_timeout(mut self, value: Duration) -> Self {
144        self.send_timeout = value;
145        self
146    }
147
148    /// Return a new config with the changed `thread_pool_size`.
149    pub fn thread_pool_size(mut self, value: u16) -> Self {
150        self.thread_pool_size = value;
151        self
152    }
153
154    /// Return a new config with the changed `warn_threshold`.
155    pub fn warn_threshold(mut self, value: Duration) -> Self {
156        self.warn_threshold = value;
157        self
158    }
159
160    /// Return a new config with the changed `time_slice`.
161    pub fn time_slice(mut self, value: Duration) -> Self {
162        self.time_slice = value;
163        self
164    }
165
166    /// Return a new config with the changed `thread_wait_time`.
167    pub fn thread_wait_time(mut self, value: Duration) -> Self {
168        self.thread_wait_time = value;
169        self
170    }
171}
172
173impl Default for ActorSystemConfig {
174    /// Create the config with the default values.
175    fn default() -> ActorSystemConfig {
176        ActorSystemConfig {
177            thread_pool_size: (num_cpus::get() * 4) as u16,
178            warn_threshold: Duration::from_millis(1),
179            time_slice: Duration::from_millis(1),
180            thread_wait_time: Duration::from_millis(100),
181            message_channel_size: 32,
182            send_timeout: Duration::from_millis(1),
183            start_on_launch: true,
184        }
185    }
186}
187
188/// Errors produced by the ActorSystem
189#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
190pub enum SystemError {
191    /// An error returned when an actor is already using a local name at the time the user tries
192    /// to register that name for a new actor. The error contains the name that was attempted
193    /// to be registered.
194    NameAlreadyUsed(String),
195}
196
197impl std::fmt::Display for SystemError {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        write!(f, "{:?}", self)
200    }
201}
202
203impl Error for SystemError {}
204
205/// Information for communicating with a remote actor system.
206pub struct RemoteInfo {
207    /// The UUID of the remote system.
208    pub system_uuid: Uuid,
209    /// The channel to use to send messages to the remote system.
210    pub sender: SeccSender<WireMessage>,
211    /// The channel to use to receive messages from the remote system.
212    pub receiver: SeccReceiver<WireMessage>,
213    /// The AID to the system actor for the remote system.
214    pub system_actor_aid: Aid,
215    /// The handle returned by the thread processing remote messages.
216    // FIXME (Issue #76) Add graceful shutdown for threads handling remotes.
217    _handle: JoinHandle<()>,
218}
219
220/// Stores a message that will be sent to an actor with a delay.
221struct DelayedMessage {
222    /// A unique identifier for a message.
223    uuid: Uuid,
224    /// The Aid that the message will be sent to.
225    destination: Aid,
226    /// The minimum instant that the message should be sent.
227    instant: Instant,
228    /// The message to sent.
229    message: Message,
230}
231
232impl std::cmp::PartialEq for DelayedMessage {
233    fn eq(&self, other: &Self) -> bool {
234        self.uuid == other.uuid
235    }
236}
237
238impl std::cmp::Eq for DelayedMessage {}
239
240impl std::cmp::PartialOrd for DelayedMessage {
241    fn partial_cmp(&self, other: &DelayedMessage) -> Option<std::cmp::Ordering> {
242        Some(other.instant.cmp(&self.instant)) // Uses an inverted sort.
243    }
244}
245
246impl std::cmp::Ord for DelayedMessage {
247    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
248        self.partial_cmp(other)
249            .expect("DelayedMessage::partial_cmp() returned None; can't happen")
250    }
251}
252
253/// Contains the inner data used by the actor system.
254pub(crate) struct ActorSystemData {
255    /// Unique version 4 UUID for this actor system.
256    pub(crate) uuid: Uuid,
257    /// The config for the actor system which was passed to it when created.
258    pub(crate) config: ActorSystemConfig,
259    /// Holds handles to the pool of threads processing the work channel.
260    threads: Mutex<Vec<JoinHandle<()>>>,
261    /// The Executor responsible for managing the runtime of the Actors
262    executor: MaximExecutor,
263    /// Whether the ActorSystem has started or not.
264    started: AtomicBool,
265    /// A flag and condvar that can be used to send a signal when the system begins to shutdown.
266    shutdown_triggered: Arc<(Mutex<bool>, Condvar)>,
267    /// Holds the [`Actor`] objects keyed by the [`Aid`].
268    actors_by_aid: Arc<DashMap<Aid, Arc<Actor>>>,
269    /// Holds a map of the actor ids by the UUID in the actor id. UUIDs of actor ids are assigned
270    /// when an actor is spawned using version 4 UUIDs.
271    aids_by_uuid: Arc<DashMap<Uuid, Aid>>,
272    /// Holds a map of user assigned names to actor ids set when the actors were spawned. Note
273    /// that only actors with an assigned name will be in this map.
274    aids_by_name: Arc<DashMap<String, Aid>>,
275    /// Holds a map of monitors where the key is the `aid` of the actor being monitored and
276    /// the value is a vector of `aid`s that are monitoring the actor.
277    monitoring_by_monitored: Arc<DashMap<Aid, HashSet<Aid>>>,
278    /// Holds a map of information objects about links to remote actor systems.
279    remotes: Arc<DashMap<Uuid, RemoteInfo>>,
280    /// Holds the messages that have been enqueued for delayed send.
281    delayed_messages: Arc<(Mutex<BinaryHeap<DelayedMessage>>, Condvar)>,
282}
283
284/// An actor system that contains and manages the actors spawned inside it.
285#[derive(Clone)]
286pub struct ActorSystem {
287    /// This field means the user doesnt have to worry about declaring `Arc<ActorSystem>` all
288    /// over the place but can just use `ActorSystem` instead. Wrapping the data also allows
289    /// `&self` semantics on the methods which feels more ergonomic.
290    pub(crate) data: Arc<ActorSystemData>,
291}
292
293impl ActorSystem {
294    /// Creates an actor system with the given config. The user should benchmark how many slots
295    /// are needed in the work channel, the number of threads they need in the system and and so
296    /// on in order to satisfy the requirements of the software they are creating.
297    pub fn create(config: ActorSystemConfig) -> ActorSystem {
298        let uuid = Uuid::new_v4();
299        let threads = Mutex::new(Vec::with_capacity(config.thread_pool_size as usize));
300        let shutdown_triggered = Arc::new((Mutex::new(false), Condvar::new()));
301
302        let executor = MaximExecutor::new(shutdown_triggered.clone());
303
304        let start_on_launch = config.start_on_launch;
305
306        // Creates the actor system with the thread pools and actor map initialized.
307        let system = ActorSystem {
308            data: Arc::new(ActorSystemData {
309                uuid,
310                config,
311                threads,
312                executor,
313                started: AtomicBool::new(false),
314                shutdown_triggered,
315                actors_by_aid: Arc::new(DashMap::default()),
316                aids_by_uuid: Arc::new(DashMap::default()),
317                aids_by_name: Arc::new(DashMap::default()),
318                monitoring_by_monitored: Arc::new(DashMap::default()),
319                remotes: Arc::new(DashMap::default()),
320                delayed_messages: Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new())),
321            }),
322        };
323
324        // Starts the actor system if configured to do so
325        if start_on_launch {
326            system.start();
327        }
328
329        system
330    }
331
332    /// Starts an unstarted ActorSystem. The function will do nothing if the ActorSystem has already been started.
333    pub fn start(&self) {
334        if !self
335            .data
336            .started
337            .compare_and_swap(false, true, Ordering::Relaxed)
338        {
339            info!("ActorSystem {} has spawned", self.data.uuid);
340            self.data.executor.init(self);
341
342            // We have the thread pool in a mutex to avoid a chicken & egg situation with the actor
343            // system not being created yet but needed by the thread. We put this code in a block to
344            // get around rust borrow constraints without unnecessarily copying things.
345            {
346                let mut guard = self.data.threads.lock().unwrap();
347
348                // Start the thread that reads from the `delayed_messages` queue.
349                // FIXME Put in ability to confirm how many of these to start.
350                for _ in 0..1 {
351                    let thread = self.start_send_after_thread();
352                    guard.push(thread);
353                }
354            }
355
356            // Launch the SystemActor and give it the name "System"
357            self.spawn()
358                .name("System")
359                .with(SystemActor, SystemActor::processor)
360                .unwrap();
361        }
362    }
363
364    /// Starts a thread that monitors the delayed_messages and sends the messages when their
365    /// delays have elapsed.
366    // FIXME Add a graceful shutdown to this thread and notifications.
367    fn start_send_after_thread(&self) -> JoinHandle<()> {
368        let system = self.clone();
369        let delayed_messages = self.data.delayed_messages.clone();
370        thread::spawn(move || {
371            while !*system.data.shutdown_triggered.0.lock().unwrap() {
372                let (ref mutex, ref condvar) = &*delayed_messages;
373                let mut data = mutex.lock().unwrap();
374                match data.peek() {
375                    None => {
376                        // wait to be notified something is added.
377                        let _ = condvar.wait(data).unwrap();
378                    }
379                    Some(msg) => {
380                        let now = Instant::now();
381                        if now >= msg.instant {
382                            trace!("Sending delayed message");
383                            msg.destination
384                                .send(msg.message.clone())
385                                .unwrap_or_else(|error| {
386                                    warn!(
387                                        "Cannot send scheduled message to {}: Error {:?}",
388                                        msg.destination, error
389                                    );
390                                });
391                            data.pop();
392                        } else {
393                            let duration = msg.instant.duration_since(now);
394                            let _result = condvar.wait_timeout(data, duration).unwrap();
395                        }
396                    }
397                }
398            }
399        })
400    }
401
402    /// Returns a reference to the config for this actor system.
403    pub fn config(&self) -> &ActorSystemConfig {
404        &self.data.config
405    }
406
407    /// Locates the sender for the remote actor system with the given Uuid.
408    pub(crate) fn remote_sender(&self, system_uuid: &Uuid) -> Option<SeccSender<WireMessage>> {
409        self.data
410            .remotes
411            .get(system_uuid)
412            .map(|info| info.sender.clone())
413    }
414
415    /// Adds a connection to a remote actor system. When the connection is established the
416    /// actor system will announce itself to the remote system with a [`WireMessage::Hello`].
417    pub fn connect(
418        &self,
419        sender: &SeccSender<WireMessage>,
420        receiver: &SeccReceiver<WireMessage>,
421    ) -> Uuid {
422        // Announce ourselves to the other system and get their info.
423        let hello = WireMessage::Hello {
424            system_actor_aid: self.system_actor_aid(),
425        };
426        sender.send(hello).unwrap();
427        debug!("Sending hello from {}", self.data.uuid);
428
429        // FIXME (Issue #75) Make error handling in ActorSystem::connect more robust.
430        let system_actor_aid =
431            match receiver.receive_await_timeout(self.data.config.thread_wait_time) {
432                Ok(message) => match message {
433                    WireMessage::Hello { system_actor_aid } => system_actor_aid,
434                    _ => panic!("Expected first message to be a Hello"),
435                },
436                Err(e) => panic!("Expected to read a Hello message {:?}", e),
437            };
438
439        // Starts a thread to read incoming wire messages and process them.
440        let system = self.clone();
441        let receiver_clone = receiver.clone();
442        let thread_timeout = self.data.config.thread_wait_time;
443        let sys_uuid = system_actor_aid.system_uuid();
444        let handle = thread::spawn(move || {
445            system.init_current();
446            // FIXME (Issue #76) Add graceful shutdown for threads handling remotes including
447            // informing remote that the system is exiting.
448            while !*system.data.shutdown_triggered.0.lock().unwrap() {
449                match receiver_clone.receive_await_timeout(thread_timeout) {
450                    Err(_) => (), // not an error, just loop and try again.
451                    Ok(wire_msg) => system.process_wire_message(&sys_uuid, &wire_msg),
452                }
453            }
454        });
455
456        // Save the info and thread to the remotes map.
457        let info = RemoteInfo {
458            system_uuid: system_actor_aid.system_uuid(),
459            sender: sender.clone(),
460            receiver: receiver.clone(),
461            _handle: handle,
462            system_actor_aid,
463        };
464
465        let uuid = info.system_uuid;
466        self.data.remotes.insert(uuid.clone(), info);
467        uuid
468    }
469
470    /// Disconnects this actor system from the remote actor system with the given UUID.
471    // FIXME Connectivity management needs a lot of work and testing.
472    pub fn disconnect(&self, system_uuid: Uuid) -> Result<(), AidError> {
473        self.data.remotes.remove(&system_uuid);
474        Ok(())
475    }
476
477    /// Connects two actor systems using two channels directly. This can be used as a utility
478    /// in testing or to link two actor systems directly within the same process.
479    pub fn connect_with_channels(system1: &ActorSystem, system2: &ActorSystem) {
480        let (tx1, rx1) = secc::create::<WireMessage>(32, system1.data.config.thread_wait_time);
481        let (tx2, rx2) = secc::create::<WireMessage>(32, system2.data.config.thread_wait_time);
482
483        // We will do this in 2 threads because one connect would block waiting on a message
484        // from the other actor system, causing a deadlock.
485        let system1_clone = system1.clone();
486        let system2_clone = system2.clone();
487        let h1 = thread::spawn(move || system1_clone.connect(&tx1, &rx2));
488        let h2 = thread::spawn(move || system2_clone.connect(&tx2, &rx1));
489
490        // Wait for the completion of the connection.
491        h1.join().unwrap();
492        h2.join().unwrap();
493    }
494
495    /// A helper function to process a wire message from another actor system. The passed uuid
496    /// is the uuid of the remote that sent the message.
497    // FIXME (Issue #74) Make error handling in ActorSystem::process_wire_message more robust.
498    fn process_wire_message(&self, _uuid: &Uuid, wire_message: &WireMessage) {
499        match wire_message {
500            WireMessage::ActorMessage {
501                actor_uuid,
502                system_uuid,
503                message,
504            } => {
505                if let Some(aid) = self.find_aid(&system_uuid, &actor_uuid) {
506                    aid.send(message.clone()).unwrap_or_else(|error| {
507                        warn!("Could not send wire message to {}. Error: {}", aid, error);
508                    })
509                }
510            }
511            WireMessage::DelayedActorMessage {
512                duration,
513                actor_uuid,
514                system_uuid,
515                message,
516            } => {
517                self.find_aid(&system_uuid, &actor_uuid)
518                    .map(|aid| self.send_after(message.clone(), aid, *duration))
519                    .expect("Error not handled yet");
520            }
521            WireMessage::Hello { system_actor_aid } => {
522                debug!("{:?} Got Hello from {}", self.data.uuid, system_actor_aid);
523            }
524        }
525    }
526
527    /// Initializes this actor system to use for the current thread which is necessary if the
528    /// user wishes to serialize and deserialize [`Aid`]s.
529    ///
530    /// ## Contract
531    /// You must run this exactly once per thread where needed.
532    pub fn init_current(&self) {
533        ACTOR_SYSTEM.with(|actor_system| {
534            actor_system
535                .set(self.clone())
536                .expect("Unable to set ACTOR_SYSTEM.");
537        });
538    }
539
540    /// Fetches a clone of a reference to the actor system for the current thread.
541    #[inline]
542    pub fn current() -> ActorSystem {
543        ACTOR_SYSTEM.with(|actor_system| {
544            actor_system
545                .get()
546                .expect("Thread local ACTOR_SYSTEM not set! See `ActorSystem::init_current()`")
547                .clone()
548        })
549    }
550
551    /// Returns the unique UUID for this actor system.
552    #[inline]
553    pub fn uuid(&self) -> Uuid {
554        self.data.uuid
555    }
556
557    /// Triggers a shutdown but doesn't wait for the Reactors to stop.
558    pub fn trigger_shutdown(&self) {
559        let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
560        *mutex.lock().unwrap() = true;
561        condvar.notify_all();
562    }
563
564    /// Awaits the Executor shutting down all Reactors. This is backed by a barrier that Reactors
565    /// will wait on after [`ActorSystem::trigger_shutdown`] is called, blocking until all Reactors
566    /// have stopped.
567    pub fn await_shutdown(&self, timeout: impl Into<Option<Duration>>) -> ShutdownResult {
568        info!("System awaiting shutdown");
569
570        let start = Instant::now();
571        let timeout = timeout.into();
572
573        let result = match timeout {
574            Some(dur) => self.await_shutdown_trigger_with_timeout(dur),
575            None => self.await_shutdown_trigger_without_timeout(),
576        };
577
578        if let Some(r) = result {
579            return r;
580        }
581
582        let timeout = {
583            match timeout {
584                Some(timeout) => {
585                    let elapsed = Instant::now().duration_since(start);
586                    if let Some(t) = timeout.checked_sub(elapsed) {
587                        Some(t)
588                    } else {
589                        return ShutdownResult::TimedOut;
590                    }
591                }
592                None => None,
593            }
594        };
595
596        // Wait for the executor to finish shutting down
597        self.data.executor.await_shutdown(timeout)
598    }
599
600    fn await_shutdown_trigger_with_timeout(&self, mut dur: Duration) -> Option<ShutdownResult> {
601        let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
602        let mut guard = mutex.lock().unwrap();
603        while !*guard {
604            let started = Instant::now();
605            let (new_guard, timeout) = match condvar.wait_timeout(guard, dur) {
606                Ok(ret) => ret,
607                Err(_) => return Some(ShutdownResult::Panicked),
608            };
609
610            if timeout.timed_out() {
611                return Some(ShutdownResult::TimedOut);
612            }
613
614            guard = new_guard;
615            dur -= started.elapsed();
616        }
617        None
618    }
619
620    fn await_shutdown_trigger_without_timeout(&self) -> Option<ShutdownResult> {
621        let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
622        let mut guard = mutex.lock().unwrap();
623        while !*guard {
624            guard = match condvar.wait(guard) {
625                Ok(ret) => ret,
626                Err(_) => return Some(ShutdownResult::Panicked),
627            };
628        }
629        None
630    }
631
632    /// Triggers a shutdown of the system and returns only when all Reactors have shutdown.
633    pub fn trigger_and_await_shutdown(
634        &self,
635        timeout: impl Into<Option<Duration>>,
636    ) -> ShutdownResult {
637        self.trigger_shutdown();
638        self.await_shutdown(timeout)
639    }
640
641    // An internal helper to register an actor in the actor system.
642    pub(crate) fn register_actor(
643        &self,
644        actor: Arc<Actor>,
645        stream: ActorStream,
646    ) -> Result<Aid, SystemError> {
647        let aids_by_name = &self.data.aids_by_name;
648        let actors_by_aid = &self.data.actors_by_aid;
649        let aids_by_uuid = &self.data.aids_by_uuid;
650        let aid = actor.context.aid.clone();
651        if let Some(name_string) = &aid.name() {
652            if aids_by_name.contains_key(name_string) {
653                return Err(SystemError::NameAlreadyUsed(name_string.clone()));
654            } else {
655                aids_by_name.insert(name_string.clone(), aid.clone());
656            }
657        }
658        actors_by_aid.insert(aid.clone(), actor);
659        aids_by_uuid.insert(aid.uuid(), aid.clone());
660        self.data.executor.register_actor(stream);
661        aid.send(Message::new(SystemMsg::Start)).unwrap(); // Actor was just made
662        Ok(aid)
663    }
664
665    /// Creates a single use builder for this actor system that allows a user to build actors
666    /// using a chained syntax while optionally providing configuration parameters if desired.
667    ///
668    /// # Examples
669    /// ```
670    /// use maxim::prelude::*;
671    ///
672    /// let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
673    ///
674    /// async fn handler(mut count: usize, _: Context, _: Message) -> ActorResult<usize> {
675    ///     count += 1;
676    ///     Ok(Status::done(count))
677    /// }
678    ///
679    /// let state = 0 as usize;
680    ///
681    /// let aid1 = system.spawn().with(state, handler).unwrap();
682    /// let aid2 = system.spawn().name("Foo").with(state, handler).unwrap();
683    /// let aid3 = system.spawn().channel_size(10).with(state, handler).unwrap();
684    /// ```
685    pub fn spawn(&self) -> ActorBuilder {
686        ActorBuilder {
687            system: self.clone(),
688            name: None,
689            channel_size: None,
690        }
691    }
692
693    /// Create a one use actor pool builder that can be used to create a pool of actors.
694    ///
695    /// The state and the handler function will be cloned and the `count` of actors will be spawned
696    /// and their [`Aid`]s added to an [`AidPool`]. With the [`AidPool`] you can send messages to
697    /// the pool to have one of the actors in the pool handle each message. The method that is used
698    /// to select an actor to handle the message depends on the [`AidPool`] implmentation. The most
699    /// common actor pool implementation is [`RandomAidPool`].
700    ///
701    /// # Examples
702    /// ```
703    /// use maxim::prelude::*;
704    ///
705    /// let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
706    ///
707    /// async fn handler(mut count: usize, _: Context, _: Message) -> ActorResult<usize> {
708    ///     // Do something
709    ///     Ok(Status::done(count))
710    /// }
711    ///
712    /// let state = 0 as usize;
713    ///
714    /// let mut aid_pool: RandomAidPool = system.spawn_pool(10).with(state, handler).unwrap();
715    /// // Send a message to one of the actors in the pool
716    /// aid_pool.send_new(()).unwrap();
717    /// ```
718    #[cfg(feature = "actor-pool")]
719    pub fn spawn_pool(&self, count: usize) -> ActorPoolBuilder {
720        ActorPoolBuilder::new(
721            ActorBuilder {
722                system: self.clone(),
723                name: None,
724                channel_size: None,
725            },
726            count,
727        )
728    }
729
730    /// Schedules the `aid` for work. Note that this is the only time that we have to use the
731    /// lookup table. This function gets called when an actor goes from 0 receivable messages to
732    /// 1 receivable message. If the actor has more receivable messages then this will not be
733    /// needed to be called because the dispatcher threads will handle the process of resending
734    /// the actor to the work channel.
735    // TODO Put tests verifying the resend on multiple messages.
736    pub(crate) fn schedule(&self, aid: Aid) {
737        let actors_by_aid = &self.data.actors_by_aid;
738        if actors_by_aid.contains_key(&aid) {
739            self.data.executor.wake(aid);
740        } else {
741            // The actor was removed from the map so ignore the problem and just log
742            // a warning.
743            warn!(
744                "Attempted to schedule actor with aid {:?} on system with node_id {:?} but
745                the actor does not exist.",
746                aid,
747                self.data.uuid.to_string(),
748            );
749        }
750    }
751
752    /// Stops an actor by shutting down its channels and removing it from the actors list and
753    /// telling the [`Aid`] to not allow messages to be sent to the actor since the receiving
754    /// side of the actor is gone.
755    ///
756    /// This is something that should rarely be called from the outside as it is much better to
757    /// send the actor a [`SystemMsg::Stop`] message and allow it to stop gracefully.
758    pub fn stop_actor(&self, aid: &Aid) {
759        self.internal_stop_actor(aid, None);
760    }
761
762    /// Internal implementation of stop_actor, so we have the ability to send an error along with
763    /// the notification of stop.
764    pub(crate) fn internal_stop_actor(&self, aid: &Aid, error: impl Into<Option<StdError>>) {
765        {
766            let actors_by_aid = &self.data.actors_by_aid;
767            let aids_by_uuid = &self.data.aids_by_uuid;
768            let aids_by_name = &self.data.aids_by_name;
769            actors_by_aid.remove(aid);
770            aids_by_uuid.remove(&aid.uuid());
771            if let Some(name_string) = aid.name() {
772                aids_by_name.remove(&name_string);
773            }
774            aid.stop().unwrap();
775        }
776
777        // Notify all of the actors monitoring the actor that is stopped and remove the
778        // actor from the map of monitors.
779        if let Some((_, monitoring)) = self.data.monitoring_by_monitored.remove(&aid) {
780            let error = error.into().map(|e| format!("{}", e));
781            for m_aid in monitoring {
782                let value = SystemMsg::Stopped {
783                    aid: aid.clone(),
784                    error: error.clone(),
785                };
786                m_aid.send(Message::new(value)).unwrap_or_else(|error| {
787                    error!(
788                        "Could not send 'Stopped' to monitoring actor {}: Error: {:?}",
789                        m_aid, error
790                    );
791                });
792            }
793        }
794    }
795
796    /// Checks to see if the actor with the given [`Aid`] is alive within this actor system.
797    pub fn is_actor_alive(&self, aid: &Aid) -> bool {
798        let actors_by_aid = &self.data.actors_by_aid;
799        actors_by_aid.contains_key(aid)
800    }
801
802    /// Look up an [`Aid`] by the unique UUID of the actor and either returns the located
803    /// [`Aid`] in a [`Option::Some`] or [`Option::None`] if not found.
804    pub fn find_aid_by_uuid(&self, uuid: &Uuid) -> Option<Aid> {
805        let aids_by_uuid = &self.data.aids_by_uuid;
806        aids_by_uuid.get(uuid).map(|aid| aid.clone())
807    }
808
809    /// Look up an [`Aid`] by the user assigned name of the actor and either returns the
810    /// located [`Aid`] in a [`Option::Some`] or [`Option::None`] if not found.
811    pub fn find_aid_by_name(&self, name: &str) -> Option<Aid> {
812        let aids_by_name = &self.data.aids_by_name;
813        aids_by_name.get(&name.to_string()).map(|aid| aid.clone())
814    }
815
816    /// A helper that finds an [`Aid`] on this system from the `system_uuid` and `actor_uuid`
817    /// passed to the function. If the `system_uuid` doesn't match this system then a `None` will
818    /// be returned. Also if the `system_uuid` matches but the actor is not found a `None` will
819    /// be returned.
820    fn find_aid(&self, system_uuid: &Uuid, actor_uuid: &Uuid) -> Option<Aid> {
821        if self.uuid() == *system_uuid {
822            self.find_aid_by_uuid(&actor_uuid)
823        } else {
824            None
825        }
826    }
827
828    /// Returns the [`Aid`] to the "System" actor for this actor system.
829    pub fn system_actor_aid(&self) -> Aid {
830        self.find_aid_by_name(&"System").unwrap()
831    }
832
833    /// Adds a monitor so that `monitoring` will be informed if `monitored` stops.
834    pub fn monitor(&self, monitoring: &Aid, monitored: &Aid) {
835        let mut monitoring_by_monitored = self
836            .data
837            .monitoring_by_monitored
838            .get_raw_mut_from_key(&monitored);
839        let monitoring_vec = monitoring_by_monitored
840            .entry(monitored.clone())
841            .or_insert(HashSet::new());
842        monitoring_vec.insert(monitoring.clone());
843    }
844
845    /// Asynchronously send a message to the system actors on all connected actor systems.
846    // FIXME (Issue #72) Add try_send ability.
847    pub fn send_to_system_actors(&self, message: Message) {
848        let remotes = &*self.data.remotes;
849        trace!("Sending message to Remote System Actors");
850        for remote in remotes.iter() {
851            let aid = &remote.value().system_actor_aid;
852            aid.send(message.clone()).unwrap_or_else(|error| {
853                error!("Could not send to system actor {}. Error: {}", aid, error)
854            });
855        }
856    }
857
858    /// Schedules a `message` to be sent to the `destination` [`Aid`] after a `delay`. Note
859    /// That this method makes a best attempt at sending the message on time but the message may
860    /// not be sent on exactly the delay passed. However, the message will never be sent before
861    /// the given delay.
862    pub(crate) fn send_after(&self, message: Message, destination: Aid, delay: Duration) {
863        let instant = Instant::now().checked_add(delay).unwrap();
864        let entry = DelayedMessage {
865            uuid: Uuid::new_v4(),
866            destination,
867            instant,
868            message,
869        };
870        let (ref mutex, ref condvar) = &*self.data.delayed_messages;
871        let mut data = mutex.lock().unwrap();
872        data.push(entry);
873        condvar.notify_all();
874    }
875
876    #[cfg(test)]
877    pub(crate) fn executor(&self) -> &MaximExecutor {
878        &self.data.executor
879    }
880}
881
882impl fmt::Debug for ActorSystem {
883    fn fmt(&self, formatter: &'_ mut fmt::Formatter) -> fmt::Result {
884        write!(
885            formatter,
886            "ActorSystem{{uuid: {}, config: {:?}}}",
887            self.data.uuid.to_string(),
888            self.data.config,
889        )
890    }
891}
892
893#[cfg(test)]
894mod tests {
895    use super::*;
896    use crate::system::system_actor::SystemActorMessage;
897    use crate::tests::*;
898    use futures::future;
899    use std::thread;
900
901    // A helper to start two actor systems and connect them.
902    fn start_and_connect_two_systems() -> (ActorSystem, ActorSystem) {
903        let system1 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
904        let system2 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
905        ActorSystem::connect_with_channels(&system1, &system2);
906        (system1, system2)
907    }
908
909    /// Helper to wait for 2 actor systems to shutdown or panic if they don't do so within
910    /// 2000 milliseconds.
911    fn await_two_system_shutdown(system1: ActorSystem, system2: ActorSystem) {
912        let h1 = thread::spawn(move || {
913            system1.await_shutdown(None);
914        });
915
916        let h2 = thread::spawn(move || {
917            system2.await_shutdown(None);
918        });
919
920        h1.join().unwrap();
921        h2.join().unwrap();
922    }
923
924    /// Test that verifies that the actor system shutdown mechanisms that wait for a specific
925    /// timeout work properly.
926    #[test]
927    fn test_shutdown_await_timeout() {
928        use std::time::Duration;
929
930        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
931        system
932            .spawn()
933            .with((), |_state: (), context: Context, _: Message| {
934                async move {
935                    // Block for enough time so we can test timeout twice
936                    sleep(100);
937                    context.system.trigger_shutdown();
938                    Ok(Status::done(()))
939                }
940            })
941            .unwrap();
942
943        // Expecting to timeout
944        assert_eq!(
945            system.await_shutdown(Duration::from_millis(10)),
946            ShutdownResult::TimedOut
947        );
948
949        // Expecting to NOT timeout
950        assert_eq!(
951            system.await_shutdown(Duration::from_millis(200)),
952            ShutdownResult::Ok
953        );
954
955        // Validate that if the system is already shutdown the method doesn't hang.
956        // FIXME Design a means that this cannot ever hang the test.
957        system.await_shutdown(None);
958    }
959
960    /// This test verifies that an actor can be found by its uuid.
961    #[test]
962    fn test_find_by_uuid() {
963        init_test_log();
964
965        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
966        let aid = system.spawn().with((), simple_handler).unwrap();
967        aid.send_new(11).unwrap();
968        await_received(&aid, 2, 1000).unwrap();
969        let found = system.find_aid_by_uuid(&aid.uuid()).unwrap();
970        assert!(Aid::ptr_eq(&aid, &found));
971
972        assert_eq!(None, system.find_aid_by_uuid(&Uuid::new_v4()));
973
974        system.trigger_and_await_shutdown(None);
975    }
976
977    /// This test verifies that an actor can be found by its name if it has one.
978    #[test]
979    fn test_find_by_name() {
980        init_test_log();
981
982        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
983        let aid = system.spawn().name("A").with((), simple_handler).unwrap();
984        aid.send_new(11).unwrap();
985        await_received(&aid, 2, 1000).unwrap();
986        let found = system.find_aid_by_name(&aid.name().unwrap()).unwrap();
987        assert!(Aid::ptr_eq(&aid, &found));
988
989        assert_eq!(None, system.find_aid_by_name("B"));
990
991        system.trigger_and_await_shutdown(None);
992    }
993
994    /// This tests the find_aid function that takes a system uuid and an actor uuid.
995    #[test]
996    fn test_find_aid() {
997        init_test_log();
998
999        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1000        let aid = system.spawn().name("A").with((), simple_handler).unwrap();
1001        await_received(&aid, 1, 1000).unwrap();
1002        let found = system.find_aid(&aid.system_uuid(), &aid.uuid()).unwrap();
1003        assert!(Aid::ptr_eq(&aid, &found));
1004
1005        assert_eq!(None, system.find_aid(&aid.system_uuid(), &Uuid::new_v4()));
1006        assert_eq!(None, system.find_aid(&Uuid::new_v4(), &aid.uuid()));
1007
1008        system.trigger_and_await_shutdown(None);
1009    }
1010
1011    /// Tests that actors that are stopped are removed from all relevant lookup maps and
1012    /// are reported as not being alive.
1013    #[test]
1014    fn test_stop_actor() {
1015        init_test_log();
1016
1017        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1018        let aid = system.spawn().name("A").with((), simple_handler).unwrap();
1019        aid.send_new(11).unwrap();
1020        await_received(&aid, 2, 1000).unwrap();
1021
1022        // Now we stop the actor.
1023        system.stop_actor(&aid);
1024        assert_eq!(false, system.is_actor_alive(&aid));
1025
1026        // Verify the actor is NOT in the maps.
1027        let sys_clone = system.clone();
1028        let actors_by_aid = &sys_clone.data.actors_by_aid;
1029        assert_eq!(false, actors_by_aid.contains_key(&aid));
1030        let aids_by_uuid = &sys_clone.data.aids_by_uuid;
1031        assert_eq!(false, aids_by_uuid.contains_key(&aid.uuid()));
1032        assert_eq!(None, system.find_aid_by_name("A"));
1033        assert_eq!(None, system.find_aid_by_uuid(&aid.uuid()));
1034
1035        system.trigger_and_await_shutdown(None);
1036    }
1037
1038    /// This test verifies that the system can send a message after a particular delay.
1039    // FIXME need separate test for remotes.
1040    #[test]
1041    fn test_send_after() {
1042        init_test_log();
1043
1044        info!("Preparing test");
1045        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1046        let aid = system.spawn().name("A").with((), simple_handler).unwrap();
1047        await_received(&aid, 1, 1000).unwrap();
1048        info!("Test prepared, sending delayed message");
1049
1050        system.send_after(Message::new(11), aid.clone(), Duration::from_millis(10));
1051        info!("Sleeping for initial check");
1052        sleep(5);
1053        assert_eq!(1, aid.received().unwrap());
1054        info!("Sleeping till we're 100% sure we should have the message");
1055        sleep(10);
1056        assert_eq!(2, aid.received().unwrap());
1057
1058        system.trigger_and_await_shutdown(None);
1059    }
1060
1061    /// Tests that if we execute two send_after calls, one for a longer duration than the
1062    /// second, that the message will be sent for the second one before the first one enqueued
1063    /// and that the second one will still arrive properly.
1064    #[test]
1065    fn test_send_after_before_current() {
1066        init_test_log();
1067
1068        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1069
1070        let aid1 = system.spawn().name("A").with((), simple_handler).unwrap();
1071        await_received(&aid1, 1, 1000).unwrap();
1072        let aid2 = system.spawn().name("B").with((), simple_handler).unwrap();
1073        await_received(&aid2, 1, 1000).unwrap();
1074
1075        aid1.send_after(Message::new(11), Duration::from_millis(50))
1076            .unwrap();
1077
1078        aid2.send_after(Message::new(11), Duration::from_millis(10))
1079            .unwrap();
1080
1081        assert_eq!(1, aid1.received().unwrap());
1082        assert_eq!(1, aid2.received().unwrap());
1083
1084        // We overshoot the timing on the asserts because when the tests are run the CPU is
1085        // busy and the timing can be tricky.
1086        sleep(15);
1087        assert_eq!(1, aid1.received().unwrap());
1088        assert_eq!(2, aid2.received().unwrap());
1089
1090        sleep(50);
1091        assert_eq!(2, aid1.received().unwrap());
1092        assert_eq!(2, aid2.received().unwrap());
1093
1094        system.trigger_and_await_shutdown(None);
1095    }
1096
1097    /// This test verifies that the system does not panic if we schedule to an actor that does
1098    /// not exist in the lookup map. This can happen if a message is sent to an actor after the
1099    /// actor is stopped but before the system notifies the [`Aid`] that the actor has been
1100    /// stopped.
1101    #[test]
1102    fn test_actor_not_in_map() {
1103        init_test_log();
1104
1105        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1106        let aid = system.spawn().with((), simple_handler).unwrap();
1107        await_received(&aid, 1, 1000).unwrap(); // Now it is started for sure.
1108
1109        // We force remove the actor from the system without calling stop so now it cannot
1110        // be scheduled.
1111        let sys_clone = system.clone();
1112        let actors_by_aid = &sys_clone.data.actors_by_aid;
1113        actors_by_aid.remove(&aid);
1114
1115        // Send a message to the actor which should not schedule it but write out a warning.
1116        aid.send_new(11).unwrap();
1117
1118        system.trigger_and_await_shutdown(None);
1119    }
1120
1121    /// Tests connection between two different actor systems using channels.
1122    #[test]
1123    fn test_connect_with_channels() {
1124        let system1 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1125        let system2 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1126        ActorSystem::connect_with_channels(&system1, &system2);
1127        {
1128            system1
1129                .data
1130                .remotes
1131                .get(&system2.data.uuid)
1132                .expect("Unable to find connection with system 2 in system 1");
1133        }
1134        {
1135            system2
1136                .data
1137                .remotes
1138                .get(&system1.data.uuid)
1139                .expect("Unable to find connection with system 1 in system 2");
1140        }
1141    }
1142
1143    // Tests that monitors work in the actor system and send a message to monitoring actors
1144    // when monitored actors stop.
1145    #[test]
1146    fn test_monitors() {
1147        init_test_log();
1148
1149        let tracker = AssertCollect::new();
1150        async fn monitor_handler(
1151            state: (Aid, AssertCollect),
1152            _: Context,
1153            message: Message,
1154        ) -> ActorResult<(Aid, AssertCollect)> {
1155            if let Some(msg) = message.content_as::<SystemMsg>() {
1156                match &*msg {
1157                    SystemMsg::Stopped { aid, error } => {
1158                        state
1159                            .1
1160                            .assert(Aid::ptr_eq(&state.0, aid), "Pointers are not equal!");
1161                        state.1.assert(error.is_none(), "Actor was errored!");
1162                        Ok(Status::done(state))
1163                    }
1164                    SystemMsg::Start => Ok(Status::done(state)),
1165                    _ => state.1.panic("Received some other message!"),
1166                }
1167            } else {
1168                state.1.panic("Received some other message!")
1169            }
1170        }
1171
1172        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1173        let monitored = system.spawn().with((), simple_handler).unwrap();
1174        let not_monitoring = system.spawn().with((), simple_handler).unwrap();
1175        let monitoring1 = system
1176            .spawn()
1177            .with((monitored.clone(), tracker.clone()), monitor_handler)
1178            .unwrap();
1179        let monitoring2 = system
1180            .spawn()
1181            .with((monitored.clone(), tracker.clone()), monitor_handler)
1182            .unwrap();
1183        system.monitor(&monitoring1, &monitored);
1184        system.monitor(&monitoring2, &monitored);
1185
1186        {
1187            // Validate the monitors are there in a block to release mutex afterwards.
1188            let monitoring_by_monitored = &system.data.monitoring_by_monitored;
1189            let m_set = monitoring_by_monitored.get(&monitored).unwrap();
1190            assert!(m_set.contains(&monitoring1));
1191            assert!(m_set.contains(&monitoring2));
1192        }
1193
1194        // Stop the actor and it should be out of the monitors map.
1195        system.stop_actor(&monitored);
1196        await_received(&monitoring1, 2, 1000).unwrap();
1197        await_received(&monitoring2, 2, 1000).unwrap();
1198        await_received(&not_monitoring, 1, 1000).unwrap();
1199
1200        system.trigger_and_await_shutdown(None);
1201        tracker.collect();
1202    }
1203
1204    #[test]
1205    fn test_monitor_gets_panics_errors() {
1206        init_test_log();
1207
1208        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1209        let tracker = AssertCollect::new();
1210        let t = tracker.clone();
1211        let aid = system
1212            .spawn()
1213            .with((), |_: (), _: Context, msg: Message| {
1214                if let Some(_) = msg.content_as::<SystemMsg>() {
1215                    debug!("Not panicking this time");
1216                    return future::ok(Status::done(()));
1217                }
1218
1219                debug!("About to panic");
1220                panic!("I panicked")
1221            })
1222            .unwrap();
1223        let monitor = system
1224            .spawn()
1225            .with(aid.clone(), move |state: Aid, _: Context, msg: Message| {
1226                if let Some(msg) = msg.content_as::<SystemMsg>() {
1227                    match &*msg {
1228                        SystemMsg::Stopped { aid, error } => {
1229                            t.assert(*aid == state, "Aid is not expected Aid");
1230                            t.assert(error.is_some(), "Expected error");
1231                            t.assert(
1232                                error.as_ref().unwrap() == "I panicked",
1233                                "Error message does not match",
1234                            );
1235                            future::ok(Status::stop(state))
1236                        }
1237                        SystemMsg::Start => future::ok(Status::done(state)),
1238                        _ => t.panic("Unexpected message received!"),
1239                    }
1240                } else {
1241                    t.panic("Unexpected message received!")
1242                }
1243            })
1244            .unwrap();
1245        system.monitor(&monitor, &aid);
1246        aid.send_new(()).unwrap();
1247        await_received(&monitor, 2, 1000).unwrap();
1248        system.trigger_and_await_shutdown(Duration::from_millis(1000));
1249        tracker.collect();
1250    }
1251
1252    /// This test verifies that the concept of named actors works properly. When a user wants
1253    /// to declare a named actor they cannot register the same name twice. When the actor using
1254    /// the name currently stops the name should be removed from the registered names and be
1255    /// available again.
1256    #[test]
1257    fn test_named_actor_restrictions() {
1258        init_test_log();
1259        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1260
1261        let aid1 = system.spawn().name("A").with((), simple_handler).unwrap();
1262        await_received(&aid1, 1, 1000).unwrap();
1263
1264        let aid2 = system.spawn().name("B").with((), simple_handler).unwrap();
1265        await_received(&aid2, 1, 1000).unwrap();
1266
1267        // Spawn an actor that attempts to overwrite "A" in the names and make sure the
1268        // attempt returns an error to be handled.
1269        let result = system.spawn().name("A").with((), simple_handler);
1270        assert_eq!(Err(SystemError::NameAlreadyUsed("A".to_string())), result);
1271
1272        // Verify that the same actor has "A" name and is still up.
1273        let found1 = system.find_aid_by_name("A").unwrap();
1274        assert_eq!(true, system.is_actor_alive(&aid1));
1275        assert!(Aid::ptr_eq(&aid1, &found1));
1276
1277        // Stop "B" and verify that the ActorSystem's maps are cleaned up.
1278        system.stop_actor(&aid2);
1279        assert_eq!(None, system.find_aid_by_name("B"));
1280        assert_eq!(None, system.find_aid_by_uuid(&aid2.uuid()));
1281
1282        // Now we should be able to crate a new actor with the name "B".
1283        let aid3 = system.spawn().name("B").with((), simple_handler).unwrap();
1284        await_received(&aid3, 1, 1000).unwrap();
1285        let found2 = system.find_aid_by_name("B").unwrap();
1286        assert!(Aid::ptr_eq(&aid3, &found2));
1287
1288        system.trigger_and_await_shutdown(None);
1289    }
1290
1291    /// Tests that remote actors can send and receive messages between each other.
1292    #[test]
1293    fn test_remote_actors() {
1294        // In this test our messages are just structs.
1295        #[derive(Serialize, Deserialize, Debug)]
1296        struct Request {
1297            reply_to: Aid,
1298        }
1299
1300        #[derive(Serialize, Deserialize, Debug)]
1301        struct Reply {}
1302
1303        init_test_log();
1304        let tracker = AssertCollect::new();
1305        let t = tracker.clone();
1306        let (system1, system2) = start_and_connect_two_systems();
1307
1308        system1.init_current();
1309        let aid = system1
1310            .spawn()
1311            .with((), move |_: (), context: Context, message: Message| {
1312                let t = t.clone();
1313                async move {
1314                    if let Some(msg) = message.content_as::<Request>() {
1315                        msg.reply_to.send_new(Reply {}).unwrap();
1316                        context.system.trigger_shutdown();
1317                        Ok(Status::stop(()))
1318                    } else if let Some(_) = message.content_as::<SystemMsg>() {
1319                        Ok(Status::done(()))
1320                    } else {
1321                        t.panic("Unexpected message received!")
1322                    }
1323                }
1324            })
1325            .unwrap();
1326        await_received(&aid, 1, 1000).unwrap();
1327
1328        let t = tracker.clone();
1329        let serialized = bincode::serialize(&aid).unwrap();
1330        system2
1331            .spawn()
1332            .with((), move |_: (), context: Context, message: Message| {
1333                if let Some(_) = message.content_as::<Reply>() {
1334                    debug!("Received reply, shutting down");
1335                    context.system.trigger_shutdown();
1336                    future::ok(Status::stop(()))
1337                } else if let Some(msg) = message.content_as::<SystemMsg>() {
1338                    match &*msg {
1339                        SystemMsg::Start => {
1340                            debug!("Starting request actor");
1341                            let target_aid: Aid = bincode::deserialize(&serialized).unwrap();
1342                            target_aid
1343                                .send_new(Request {
1344                                    reply_to: context.aid.clone(),
1345                                })
1346                                .unwrap();
1347                            future::ok(Status::done(()))
1348                        }
1349                        _ => future::ok(Status::done(())),
1350                    }
1351                } else {
1352                    t.panic("Unexpected message received!")
1353                }
1354            })
1355            .unwrap();
1356
1357        await_two_system_shutdown(system1, system2);
1358        tracker.collect();
1359    }
1360
1361    /// Tests the ability to find an aid on a remote system by name using a `SystemActor`. This
1362    /// also serves as a test for cross system actor communication as well as testing broadcast
1363    /// to multiple system actors in the cluster.
1364    #[test]
1365    fn test_system_actor_find_by_name() {
1366        init_test_log();
1367        let tracker = AssertCollect::new();
1368        let t = tracker.clone();
1369        let (system1, system2) = start_and_connect_two_systems();
1370
1371        let aid1 = system1
1372            .spawn()
1373            .name("A")
1374            .with((), |_: (), context: Context, message: Message| async move {
1375                if let Some(_) = message.content_as::<bool>() {
1376                    context.system.trigger_shutdown();
1377                    Ok(Status::stop(()))
1378                } else {
1379                    Ok(Status::done(()))
1380                }
1381            })
1382            .unwrap();
1383        await_received(&aid1, 1, 1000).unwrap();
1384
1385        system2
1386            .spawn()
1387            .with((), move |_: (), context: Context, message: Message| {
1388                // We have to do this so each async block future gets its own copy.
1389                let aid1 = aid1.clone();
1390                let t = t.clone();
1391                async move {
1392                    if let Some(msg) = message.content_as::<SystemActorMessage>() {
1393                        match &*msg {
1394                            SystemActorMessage::FindByNameResult { aid: found, .. } => {
1395                                debug!("FindByNameResult received");
1396                                if let Some(target) = found {
1397                                    t.assert(
1398                                        target.uuid() == aid1.uuid(),
1399                                        "Target is not expected Actor",
1400                                    );
1401                                    target.send_new(true).unwrap();
1402                                    context.system.trigger_shutdown();
1403                                    Ok(Status::done(()))
1404                                } else {
1405                                    t.panic("Didn't find AID.")
1406                                }
1407                            }
1408                            _ => t.panic("Unexpected message received!"),
1409                        }
1410                    } else if let Some(msg) = message.content_as::<SystemMsg>() {
1411                        debug!("Actor started, attempting to send FindByName request");
1412                        if let SystemMsg::Start = &*msg {
1413                            context.system.send_to_system_actors(Message::new(
1414                                SystemActorMessage::FindByName {
1415                                    reply_to: context.aid.clone(),
1416                                    name: "A".to_string(),
1417                                },
1418                            ));
1419                            Ok(Status::done(()))
1420                        } else {
1421                            t.panic("Unexpected message received!")
1422                        }
1423                    } else {
1424                        t.panic("Unexpected message received!")
1425                    }
1426                }
1427            })
1428            .unwrap();
1429
1430        await_two_system_shutdown(system1, system2);
1431        tracker.collect();
1432    }
1433
1434    /// Tests the ability create an [`AidPool`] and send messages to the pool.
1435    #[test]
1436    #[cfg(feature = "actor-pool")]
1437    fn test_spawn_pool() {
1438        let tracker = AssertCollect::new();
1439        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
1440
1441        async fn handler(_: (), _: Context, _: Message) -> ActorResult<()> {
1442            Ok(Status::done(()))
1443        }
1444
1445        // Create an actor pool
1446        let mut aid_pool: RandomAidPool = system
1447            .spawn_pool(3)
1448            .name("handler")
1449            .channel_size(100)
1450            .with((), handler)
1451            .unwrap();
1452
1453        // Send a bunch of messages to the pool
1454        for _ in 0..=100 {
1455            aid_pool.send_new(0).unwrap();
1456        }
1457
1458        // Sleep to make sure we get the messages
1459        sleep(10);
1460
1461        // Convert the pool to a `Vec` of `Aid`s
1462        let aids: Vec<Aid> = aid_pool.into();
1463
1464        // Make sure each aid in the pool has received at least one message
1465        for aid in aids {
1466            assert!(aid.received().unwrap() > 1);
1467        }
1468
1469        system.trigger_and_await_shutdown(None);
1470        tracker.collect();
1471    }
1472}