maxim 0.1.0-alpha.0

Implements a highly-scalable and ergonomic actor system for Rust based on the best of Erlang / Elixir and Akka. A fork of the Axiom actor framework.
Documentation
//! Implements the [`ActorSystem`] and related types of Maxim.
//!
//! When the [`ActorSystem`] starts up, a number of Reactors will be spawned that will iterate over
//! Actors' inbound messages, processing them asynchronously. Actors will be run as many times as
//! they can over a given time slice until they are pending or have no more messages. If the Actor
//! is Pending, it will be re-queued when the pending future wakes it. If the Actor has no more
//! messages, it will be returned to the Executor until it has messages again. This process cycles
//! until the [`ActorSystem`] is shut down.
//!
//! The user should refer to test cases and examples as "how-to" guides for using Maxim.

#[cfg(feature = "actor-pool")]
use crate::actors::ActorPoolBuilder;
use crate::actors::{Actor, ActorBuilder, ActorStream};
use crate::executor::MaximExecutor;
use crate::prelude::*;
use crate::system::system_actor::SystemActor;
use dashmap::DashMap;
use log::{debug, error, info, trace, warn};
use once_cell::sync::OnceCell;
use secc::{SeccReceiver, SeccSender};
use serde::{Deserialize, Serialize};
use std::collections::{BinaryHeap, HashSet};
use std::error::Error;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use uuid::Uuid;

mod system_actor;

// Holds an ActorSystem in a std::thread_local so that the Aid deserializer and other types can
// obtain a clone if needed at any time. This needs to be set by each Reactor that is processing
// messages with the actors.
//
// The cell is written by `ActorSystem::init_current()` and read by `ActorSystem::current()`.
// Because it is a `OnceCell`, a second `set` on the same thread fails, which is why
// `init_current()` must be called only once per thread.
std::thread_local! {
    static ACTOR_SYSTEM: OnceCell<ActorSystem> = OnceCell::new();
}

/// An enum containing messages that are sent to actors by the actor system itself and are
/// universal to all actors.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SystemMsg {
    /// A message that is sent by the system and guaranteed to be the first message that the
    /// actor receives in its lifetime. It is sent when the actor is registered with the system.
    Start,

    /// A message that instructs an actor to shut down. The actor receiving this message should
    /// shut down all open file handles and any other resources and return a [`Status::Stop`]
    /// from the call to the message processor. Regardless of the return from the actor's
    /// message processor the actor will be shut down by the actor system.
    Stop,

    /// A message sent to an actor when a monitored actor is stopped and thus not able to
    /// process additional messages. `aid` is the [`Aid`] of the actor that stopped and `error`
    /// is the `Display` rendering of the error that accompanied the stop, or `None` when the
    /// actor was stopped without an error.
    Stopped { aid: Aid, error: Option<String> },
}

/// A type used for sending messages to other actor systems.
///
/// Values of this type travel over the `SeccSender<WireMessage>` / `SeccReceiver<WireMessage>`
/// pair that is handed to [`ActorSystem::connect`].
#[derive(Clone, Serialize, Deserialize)]
pub enum WireMessage {
    /// A message that announces an actor system to the system on the other end of a
    /// connection. [`ActorSystem::connect`] sends one and expects one as the first message it
    /// receives.
    Hello {
        /// The `aid` for the system actor on the actor system sending the message.
        system_actor_aid: Aid,
    },
    /// A container for a message from one actor on one system to an actor on another system.
    ActorMessage {
        /// The UUID of the [`Aid`] that the message is being sent to.
        actor_uuid: Uuid,
        /// The UUID of the system that the destination [`Aid`] is local to.
        system_uuid: Uuid,
        /// The message to be sent.
        message: Message,
    },
    /// A container for sending a message with a specified duration delay. The receiving system
    /// schedules the delivery itself once the message arrives.
    DelayedActorMessage {
        /// The duration to use to delay the message.
        duration: Duration,
        /// The UUID of the [`Aid`] that the message is being sent to.
        actor_uuid: Uuid,
        /// The UUID of the system that the destination [`Aid`] is local to.
        system_uuid: Uuid,
        /// The message to be sent.
        message: Message,
    },
}

/// Configuration structure for the Maxim actor system. Note that this configuration implements
/// serde serialize and deserialize to allow users to read the config from any serde supported
/// means.
///
/// The defaults quoted on each field are the values produced by the `Default` implementation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ActorSystemConfig {
    /// The default size for the channel that is created for each actor. This can be overridden on
    /// a per-actor basis during spawning as well. Making the default channel size bigger allows
    /// for more bandwidth in sending messages to actors but also takes more memory. Also the
    /// user should consider if their actor needs a large channel then it might need to be
    /// refactored or the threads size should be increased because messages aren't being processed
    /// fast enough. The default value for this is 32.
    pub message_channel_size: u16,
    /// Max duration to wait between attempts to send to an actor's message channel. This is used
    /// to poll a busy channel that is at its capacity limit. The larger this value is, the longer
    /// `send` will wait for capacity in the channel but the user should be aware that if the
    /// system is often waiting on capacity that channel may be too small or the actor may need to
    /// be refactored to process messages faster. The default value is 1 millisecond.
    pub send_timeout: Duration,
    /// The size of the thread pool which governs how many worker threads there are in the system.
    /// The number of threads should be carefully considered to have sufficient parallelism but not
    /// over-schedule the CPU on the target hardware. The default value is 4 * the number of logical
    /// CPUs.
    pub thread_pool_size: u16,
    /// The threshold at which the dispatcher thread will warn the user that the message took too
    /// long to process. If this warning is being logged then the user probably should reconsider
    /// how their message processing works and refactor big tasks into a number of smaller tasks.
    /// The default value is 1 millisecond.
    pub warn_threshold: Duration,
    /// This controls how long a processor will spend working on messages for an actor before
    /// yielding to work on other actors in the system. The dispatcher will continue to pluck
    /// messages off the actor's channel and process them until this time slice is exceeded. Note
    /// that actors themselves can exceed this in processing a single message and if so, only one
    /// message will be processed before yielding. The default value is 1 millisecond.
    pub time_slice: Duration,
    /// While Reactors will constantly attempt to get more work, they may run out. At that point,
    /// they will idle for this duration, or until they get a wakeup notification. Said
    /// notifications can be missed, so it's best to not set this too high. The default value is
    /// 100 milliseconds. This implementation is backed by a [`Condvar`].
    ///
    /// The same duration is used by [`ActorSystem::connect`] as the timeout when waiting for the
    /// remote `Hello` and when polling the remote receiver.
    pub thread_wait_time: Duration,
    /// Determines whether the actor system will immediately start when it is created. The default
    /// value is true. When false, [`ActorSystem::create`] returns an unstarted system and
    /// [`ActorSystem::start`] has to be called explicitly.
    pub start_on_launch: bool,
}

/// Chainable setters for [`ActorSystemConfig`]. Each method consumes the config and returns it
/// with exactly one field replaced, so calls can be strung together starting from
/// `ActorSystemConfig::default()`.
impl ActorSystemConfig {
    /// Return a new config with the changed `message_channel_size`.
    pub fn message_channel_size(mut self, value: u16) -> Self {
        self.message_channel_size = value;
        self
    }

    /// Return a new config with the changed `send_timeout`.
    pub fn send_timeout(mut self, value: Duration) -> Self {
        self.send_timeout = value;
        self
    }

    /// Return a new config with the changed `thread_pool_size`.
    pub fn thread_pool_size(mut self, value: u16) -> Self {
        self.thread_pool_size = value;
        self
    }

    /// Return a new config with the changed `warn_threshold`.
    pub fn warn_threshold(mut self, value: Duration) -> Self {
        self.warn_threshold = value;
        self
    }

    /// Return a new config with the changed `time_slice`.
    pub fn time_slice(mut self, value: Duration) -> Self {
        self.time_slice = value;
        self
    }

    /// Return a new config with the changed `thread_wait_time`.
    pub fn thread_wait_time(mut self, value: Duration) -> Self {
        self.thread_wait_time = value;
        self
    }

    /// Return a new config with the changed `start_on_launch`.
    ///
    /// Passing `false` makes `ActorSystem::create` return an unstarted system, which then has
    /// to be started explicitly with `ActorSystem::start`.
    pub fn start_on_launch(mut self, value: bool) -> Self {
        self.start_on_launch = value;
        self
    }
}

impl Default for ActorSystemConfig {
    /// Create the config with the default values.
    ///
    /// The thread pool is sized at four threads per logical CPU. The product is clamped to
    /// `u16::max_value()` rather than cast directly, so that a very large CPU count cannot wrap
    /// around to a tiny (or zero) pool size.
    fn default() -> ActorSystemConfig {
        let thread_pool_size = num_cpus::get()
            .saturating_mul(4)
            .min(usize::from(u16::max_value())) as u16; // lossless after the clamp

        ActorSystemConfig {
            thread_pool_size,
            warn_threshold: Duration::from_millis(1),
            time_slice: Duration::from_millis(1),
            thread_wait_time: Duration::from_millis(100),
            message_channel_size: 32,
            send_timeout: Duration::from_millis(1),
            start_on_launch: true,
        }
    }
}

/// Errors produced by the ActorSystem.
///
/// The `Display` implementation for this type prints the same text as its derived `Debug`
/// representation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum SystemError {
    /// An error returned when an actor is already using a local name at the time the user tries
    /// to register that name for a new actor. The error contains the name that was attempted
    /// to be registered.
    NameAlreadyUsed(String),
}

impl std::fmt::Display for SystemError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl Error for SystemError {}

/// Information for communicating with a remote actor system.
///
/// One of these is built by [`ActorSystem::connect`] after the `Hello` exchange and stored in
/// the system's map of remotes, keyed by `system_uuid`.
pub struct RemoteInfo {
    /// The UUID of the remote system.
    pub system_uuid: Uuid,
    /// The channel to use to send messages to the remote system.
    pub sender: SeccSender<WireMessage>,
    /// The channel to use to receive messages from the remote system.
    pub receiver: SeccReceiver<WireMessage>,
    /// The AID to the system actor for the remote system.
    pub system_actor_aid: Aid,
    /// The handle returned by the thread processing remote messages. It is only stored, never
    /// joined, in the code visible in this module.
    // FIXME (Issue #76) Add graceful shutdown for threads handling remotes.
    _handle: JoinHandle<()>,
}

/// Stores a message that will be sent to an actor with a delay.
///
/// Instances are kept in a `BinaryHeap`. Equality is based on `uuid` only, while ordering is
/// based on `instant` only (inverted, so the earliest instant is the greatest element).
struct DelayedMessage {
    /// A unique identifier for a message.
    uuid: Uuid,
    /// The Aid that the message will be sent to.
    destination: Aid,
    /// The minimum instant that the message should be sent.
    instant: Instant,
    /// The message to send.
    message: Message,
}

/// Two delayed messages are equal when they carry the same `uuid`; no other field is compared.
impl std::cmp::PartialEq for DelayedMessage {
    fn eq(&self, rhs: &Self) -> bool {
        self.uuid.eq(&rhs.uuid)
    }
}

impl std::cmp::Eq for DelayedMessage {}

/// Always returns `Some`, delegating to the total order defined by `Ord`.
impl std::cmp::PartialOrd for DelayedMessage {
    fn partial_cmp(&self, rhs: &DelayedMessage) -> Option<std::cmp::Ordering> {
        Some(self.cmp(rhs))
    }
}

/// Orders messages by `instant` in reverse, so that a max-heap yields the message with the
/// earliest instant first.
impl std::cmp::Ord for DelayedMessage {
    fn cmp(&self, rhs: &Self) -> std::cmp::Ordering {
        self.instant.cmp(&rhs.instant).reverse()
    }
}

/// Contains the inner data used by the actor system.
///
/// A single instance is shared behind an `Arc` by every clone of [`ActorSystem`].
pub(crate) struct ActorSystemData {
    /// Unique version 4 UUID for this actor system.
    pub(crate) uuid: Uuid,
    /// The config for the actor system which was passed to it when created.
    pub(crate) config: ActorSystemConfig,
    /// Holds handles to the pool of threads processing the work channel. When the system is
    /// started, the handle of the thread that delivers delayed messages is pushed here.
    threads: Mutex<Vec<JoinHandle<()>>>,
    /// The Executor responsible for managing the runtime of the Actors
    executor: MaximExecutor,
    /// Whether the ActorSystem has started or not.
    started: AtomicBool,
    /// A flag and condvar that can be used to send a signal when the system begins to shutdown.
    /// The flag is set to `true` by `ActorSystem::trigger_shutdown`.
    shutdown_triggered: Arc<(Mutex<bool>, Condvar)>,
    /// Holds the [`Actor`] objects keyed by the [`Aid`].
    actors_by_aid: Arc<DashMap<Aid, Arc<Actor>>>,
    /// Holds a map of the actor ids by the UUID in the actor id. UUIDs of actor ids are assigned
    /// when an actor is spawned using version 4 UUIDs.
    aids_by_uuid: Arc<DashMap<Uuid, Aid>>,
    /// Holds a map of user assigned names to actor ids set when the actors were spawned. Note
    /// that only actors with an assigned name will be in this map.
    aids_by_name: Arc<DashMap<String, Aid>>,
    /// Holds a map of monitors where the key is the `aid` of the actor being monitored and
    /// the value is the set of `aid`s that are monitoring the actor.
    monitoring_by_monitored: Arc<DashMap<Aid, HashSet<Aid>>>,
    /// Holds a map of information objects about links to remote actor systems, keyed by the
    /// UUID of the remote system.
    remotes: Arc<DashMap<Uuid, RemoteInfo>>,
    /// Holds the messages that have been enqueued for delayed send, together with the condvar
    /// the delivery thread waits on.
    delayed_messages: Arc<(Mutex<BinaryHeap<DelayedMessage>>, Condvar)>,
}

/// An actor system that contains and manages the actors spawned inside it.
///
/// Cloning an `ActorSystem` is cheap: it clones the inner `Arc`, so every clone refers to the
/// same underlying system.
#[derive(Clone)]
pub struct ActorSystem {
    /// This field means the user doesn't have to worry about declaring `Arc<ActorSystem>` all
    /// over the place but can just use `ActorSystem` instead. Wrapping the data also allows
    /// `&self` semantics on the methods which feels more ergonomic.
    pub(crate) data: Arc<ActorSystemData>,
}

impl ActorSystem {
    /// Builds a new actor system from `config`, starting it immediately when
    /// `config.start_on_launch` is set.
    ///
    /// The user should benchmark how many slots are needed in the work channel, how many
    /// threads the system needs and so on in order to satisfy the requirements of the software
    /// being built.
    pub fn create(config: ActorSystemConfig) -> ActorSystem {
        // Read what we need from the config before it is moved into the shared data.
        let launch_now = config.start_on_launch;
        let pool_capacity = usize::from(config.thread_pool_size);

        // The executor gets its own handle to the shutdown flag and condvar.
        let shutdown_triggered = Arc::new((Mutex::new(false), Condvar::new()));
        let executor = MaximExecutor::new(shutdown_triggered.clone());

        let data = ActorSystemData {
            uuid: Uuid::new_v4(),
            config,
            threads: Mutex::new(Vec::with_capacity(pool_capacity)),
            executor,
            started: AtomicBool::new(false),
            shutdown_triggered,
            actors_by_aid: Arc::new(DashMap::default()),
            aids_by_uuid: Arc::new(DashMap::default()),
            aids_by_name: Arc::new(DashMap::default()),
            monitoring_by_monitored: Arc::new(DashMap::default()),
            remotes: Arc::new(DashMap::default()),
            delayed_messages: Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new())),
        };
        let system = ActorSystem {
            data: Arc::new(data),
        };

        if launch_now {
            system.start();
        }
        system
    }

    /// Starts an unstarted ActorSystem. The function will do nothing if the ActorSystem has
    /// already been started.
    ///
    /// Starting initializes the executor, launches the thread that delivers delayed messages
    /// and spawns the system actor under the name "System".
    ///
    /// # Panics
    /// Panics if the thread list mutex is poisoned or if the system actor cannot be spawned.
    pub fn start(&self) {
        // Atomically flip `started` from false to true; only the caller that wins the exchange
        // performs the start-up work. (`compare_and_swap` is deprecated in favor of this.)
        let first_start = self
            .data
            .started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok();
        if !first_start {
            return;
        }

        info!("ActorSystem {} has spawned", self.data.uuid);
        self.data.executor.init(self);

        // We have the thread pool in a mutex to avoid a chicken & egg situation with the actor
        // system not being created yet but needed by the thread. We put this code in a block to
        // release the lock before the system actor is spawned.
        {
            let mut guard = self.data.threads.lock().unwrap();

            // Start the thread that reads from the `delayed_messages` queue.
            // FIXME Put in ability to confirm how many of these to start.
            for _ in 0..1 {
                let thread = self.start_send_after_thread();
                guard.push(thread);
            }
        }

        // Launch the SystemActor and give it the name "System"
        self.spawn()
            .name("System")
            .with(SystemActor, SystemActor::processor)
            .unwrap();
    }

    /// Starts a thread that monitors the delayed_messages and sends the messages when their
    /// delays have elapsed.
    // FIXME Add a graceful shutdown to this thread and notifications.
    fn start_send_after_thread(&self) -> JoinHandle<()> {
        let system = self.clone();
        let delayed_messages = self.data.delayed_messages.clone();
        thread::spawn(move || {
            while !*system.data.shutdown_triggered.0.lock().unwrap() {
                let (ref mutex, ref condvar) = &*delayed_messages;
                let mut data = mutex.lock().unwrap();
                match data.peek() {
                    None => {
                        // wait to be notified something is added.
                        let _ = condvar.wait(data).unwrap();
                    }
                    Some(msg) => {
                        let now = Instant::now();
                        if now >= msg.instant {
                            trace!("Sending delayed message");
                            msg.destination
                                .send(msg.message.clone())
                                .unwrap_or_else(|error| {
                                    warn!(
                                        "Cannot send scheduled message to {}: Error {:?}",
                                        msg.destination, error
                                    );
                                });
                            data.pop();
                        } else {
                            let duration = msg.instant.duration_since(now);
                            let _result = condvar.wait_timeout(data, duration).unwrap();
                        }
                    }
                }
            }
        })
    }

    /// Returns a reference to the config for this actor system.
    pub fn config(&self) -> &ActorSystemConfig {
        &self.data.config
    }

    /// Looks up the remote actor system registered under `system_uuid` and returns a clone of
    /// the sender used to reach it, or `None` when no such remote is known.
    pub(crate) fn remote_sender(&self, system_uuid: &Uuid) -> Option<SeccSender<WireMessage>> {
        let info = self.data.remotes.get(system_uuid)?;
        Some(info.sender.clone())
    }

    /// Adds a connection to a remote actor system over the given channel pair and returns the
    /// UUID of that remote system.
    ///
    /// The method first sends a [`WireMessage::Hello`] carrying this system's system actor aid,
    /// then waits up to `thread_wait_time` for the remote's `Hello`. After that it spawns a
    /// thread that keeps reading wire messages from `receiver` until shutdown is triggered, and
    /// records the connection in the map of remotes.
    ///
    /// # Panics
    /// Panics if the `Hello` cannot be sent, if nothing is received in time, or if the first
    /// message received is not a `Hello`.
    pub fn connect(
        &self,
        sender: &SeccSender<WireMessage>,
        receiver: &SeccReceiver<WireMessage>,
    ) -> Uuid {
        // Announce ourselves to the other system.
        sender
            .send(WireMessage::Hello {
                system_actor_aid: self.system_actor_aid(),
            })
            .unwrap();
        debug!("Sending hello from {}", self.data.uuid);

        // FIXME (Issue #75) Make error handling in ActorSystem::connect more robust.
        let wait = self.data.config.thread_wait_time;
        let system_actor_aid = match receiver.receive_await_timeout(wait) {
            Ok(WireMessage::Hello { system_actor_aid }) => system_actor_aid,
            Ok(_) => panic!("Expected first message to be a Hello"),
            Err(e) => panic!("Expected to read a Hello message {:?}", e),
        };
        let remote_uuid = system_actor_aid.system_uuid();

        // Reader thread: pulls incoming wire messages and hands them to the system.
        let handle = {
            let system = self.clone();
            let inbound = receiver.clone();
            thread::spawn(move || {
                system.init_current();
                // FIXME (Issue #76) Add graceful shutdown for threads handling remotes including
                // informing remote that the system is exiting.
                while !*system.data.shutdown_triggered.0.lock().unwrap() {
                    // A receive error is not fatal here; just loop and try again.
                    if let Ok(wire_msg) = inbound.receive_await_timeout(wait) {
                        system.process_wire_message(&remote_uuid, &wire_msg);
                    }
                }
            })
        };

        // Save the info and thread to the remotes map.
        let info = RemoteInfo {
            system_uuid: remote_uuid,
            sender: sender.clone(),
            receiver: receiver.clone(),
            _handle: handle,
            system_actor_aid,
        };
        self.data.remotes.insert(remote_uuid, info);
        remote_uuid
    }

    /// Forgets the remote actor system registered under `system_uuid`. The call always returns
    /// `Ok(())`, whether or not such a remote was known.
    // FIXME Connectivity management needs a lot of work and testing.
    pub fn disconnect(&self, system_uuid: Uuid) -> Result<(), AidError> {
        // Any removed entry is dropped right away.
        drop(self.data.remotes.remove(&system_uuid));
        Ok(())
    }

    /// Links two actor systems through a pair of in-process channels. Useful in tests or when
    /// two actor systems live in the same process.
    ///
    /// # Panics
    /// Panics if either connecting thread panics.
    pub fn connect_with_channels(system1: &ActorSystem, system2: &ActorSystem) {
        // One channel per direction, each created with the wait time of the sending system.
        let (s1_to_s2_tx, s1_to_s2_rx) =
            secc::create::<WireMessage>(32, system1.data.config.thread_wait_time);
        let (s2_to_s1_tx, s2_to_s1_rx) =
            secc::create::<WireMessage>(32, system2.data.config.thread_wait_time);

        // Each `connect` blocks until it receives the other side's Hello, so both must run
        // concurrently; doing them one after the other would deadlock.
        let first = system1.clone();
        let second = system2.clone();
        let handles = vec![
            thread::spawn(move || first.connect(&s1_to_s2_tx, &s2_to_s1_rx)),
            thread::spawn(move || second.connect(&s2_to_s1_tx, &s1_to_s2_rx)),
        ];

        // Wait for the completion of the connection.
        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// A helper function to process a wire message from another actor system. The passed uuid
    /// is the uuid of the remote that sent the message.
    ///
    /// Actor messages are forwarded to the addressed local actor, either immediately or via
    /// `send_after` for delayed ones. A message addressed to an actor that cannot be found is
    /// logged and dropped; it must not panic, because this runs on the thread that services the
    /// remote connection. A `Hello` is only logged.
    // FIXME (Issue #74) Make error handling in ActorSystem::process_wire_message more robust.
    fn process_wire_message(&self, _uuid: &Uuid, wire_message: &WireMessage) {
        match wire_message {
            WireMessage::ActorMessage {
                actor_uuid,
                system_uuid,
                message,
            } => match self.find_aid(system_uuid, actor_uuid) {
                Some(aid) => aid.send(message.clone()).unwrap_or_else(|error| {
                    warn!("Could not send wire message to {}. Error: {}", aid, error);
                }),
                None => warn!(
                    "Dropping wire message for unknown actor {} on system {}",
                    actor_uuid, system_uuid
                ),
            },
            WireMessage::DelayedActorMessage {
                duration,
                actor_uuid,
                system_uuid,
                message,
            } => match self.find_aid(system_uuid, actor_uuid) {
                Some(aid) => {
                    self.send_after(message.clone(), aid, *duration);
                }
                None => warn!(
                    "Dropping delayed wire message for unknown actor {} on system {}",
                    actor_uuid, system_uuid
                ),
            },
            WireMessage::Hello { system_actor_aid } => {
                debug!("{:?} Got Hello from {}", self.data.uuid, system_actor_aid);
            }
        }
    }

    /// Installs a clone of this actor system as the current thread's actor system, which is
    /// required on any thread that serializes or deserializes [`Aid`]s.
    ///
    /// ## Contract
    /// You must run this exactly once per thread where needed.
    ///
    /// # Panics
    /// Panics if an actor system has already been installed on the current thread.
    pub fn init_current(&self) {
        ACTOR_SYSTEM.with(|cell| {
            let outcome = cell.set(self.clone());
            outcome.expect("Unable to set ACTOR_SYSTEM.");
        });
    }

    /// Returns a clone of the actor system installed on the current thread.
    ///
    /// # Panics
    /// Panics if [`ActorSystem::init_current`] has not been called on this thread.
    #[inline]
    pub fn current() -> ActorSystem {
        ACTOR_SYSTEM.with(|cell| {
            let installed = cell
                .get()
                .expect("Thread local ACTOR_SYSTEM not set! See `ActorSystem::init_current()`");
            installed.clone()
        })
    }

    /// Returns the unique UUID for this actor system.
    #[inline]
    pub fn uuid(&self) -> Uuid {
        self.data.uuid
    }

    /// Triggers a shutdown but doesn't wait for the Reactors to stop.
    pub fn trigger_shutdown(&self) {
        let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
        *mutex.lock().unwrap() = true;
        condvar.notify_all();
    }

    /// Blocks until the system has shut down or the optional `timeout` runs out.
    ///
    /// The wait has two phases. First it waits for the shutdown flag set by
    /// [`ActorSystem::trigger_shutdown`]. Then it asks the executor to finish shutting down,
    /// giving it whatever is left of the timeout. `None` means wait without a limit.
    ///
    /// Returns `ShutdownResult::TimedOut` when the timeout is used up in either phase; otherwise
    /// the result comes from the first phase failing or from the executor.
    pub fn await_shutdown(&self, timeout: impl Into<Option<Duration>>) -> ShutdownResult {
        info!("System awaiting shutdown");

        let began = Instant::now();
        let limit: Option<Duration> = timeout.into();

        // Phase one: wait for the shutdown trigger.
        let early_exit = if let Some(dur) = limit {
            self.await_shutdown_trigger_with_timeout(dur)
        } else {
            self.await_shutdown_trigger_without_timeout()
        };
        if let Some(outcome) = early_exit {
            return outcome;
        }

        // Work out how much of the budget is left for the executor.
        let remaining = match limit {
            None => None,
            Some(total) => match total.checked_sub(began.elapsed()) {
                Some(left) => Some(left),
                None => return ShutdownResult::TimedOut,
            },
        };

        // Phase two: wait for the executor to finish shutting down.
        self.data.executor.await_shutdown(remaining)
    }

    fn await_shutdown_trigger_with_timeout(&self, mut dur: Duration) -> Option<ShutdownResult> {
        let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
        let mut guard = mutex.lock().unwrap();
        while !*guard {
            let started = Instant::now();
            let (new_guard, timeout) = match condvar.wait_timeout(guard, dur) {
                Ok(ret) => ret,
                Err(_) => return Some(ShutdownResult::Panicked),
            };

            if timeout.timed_out() {
                return Some(ShutdownResult::TimedOut);
            }

            guard = new_guard;
            dur -= started.elapsed();
        }
        None
    }

    fn await_shutdown_trigger_without_timeout(&self) -> Option<ShutdownResult> {
        let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
        let mut guard = mutex.lock().unwrap();
        while !*guard {
            guard = match condvar.wait(guard) {
                Ok(ret) => ret,
                Err(_) => return Some(ShutdownResult::Panicked),
            };
        }
        None
    }

    /// Triggers a shutdown and then blocks until the system has shut down or the optional
    /// `timeout` runs out. Equivalent to calling [`ActorSystem::trigger_shutdown`] followed by
    /// [`ActorSystem::await_shutdown`].
    pub fn trigger_and_await_shutdown(
        &self,
        timeout: impl Into<Option<Duration>>,
    ) -> ShutdownResult {
        self.trigger_shutdown();
        let limit: Option<Duration> = timeout.into();
        self.await_shutdown(limit)
    }

    // Internal helper that records a freshly built actor in the system's lookup maps, hands its
    // stream to the executor and sends it the initial `SystemMsg::Start`.
    //
    // Fails with `SystemError::NameAlreadyUsed` when the actor has a name that is already
    // registered; in that case nothing is recorded.
    pub(crate) fn register_actor(
        &self,
        actor: Arc<Actor>,
        stream: ActorStream,
    ) -> Result<Aid, SystemError> {
        let data = &self.data;
        let aid = actor.context.aid.clone();

        // Named actors must have a name that is not yet taken.
        // NOTE(review): the check and the insert are two separate map operations, so two
        // concurrent registrations of the same name could both pass the check.
        if let Some(name) = aid.name() {
            if data.aids_by_name.contains_key(&name) {
                return Err(SystemError::NameAlreadyUsed(name));
            }
            data.aids_by_name.insert(name, aid.clone());
        }

        data.actors_by_aid.insert(aid.clone(), actor);
        data.aids_by_uuid.insert(aid.uuid(), aid.clone());
        data.executor.register_actor(stream);

        // The actor was just made, so sending to it is not expected to fail.
        aid.send(Message::new(SystemMsg::Start)).unwrap();
        Ok(aid)
    }

    /// Returns a single-use builder bound to this actor system. The builder starts with no name
    /// and no channel size override; both can be set through its chained methods before the
    /// actor is created.
    ///
    /// # Examples
    /// ```
    /// use maxim::prelude::*;
    ///
    /// let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
    ///
    /// async fn handler(mut count: usize, _: Context, _: Message) -> ActorResult<usize> {
    ///     count += 1;
    ///     Ok(Status::done(count))
    /// }
    ///
    /// let state = 0 as usize;
    ///
    /// let aid1 = system.spawn().with(state, handler).unwrap();
    /// let aid2 = system.spawn().name("Foo").with(state, handler).unwrap();
    /// let aid3 = system.spawn().channel_size(10).with(state, handler).unwrap();
    /// ```
    pub fn spawn(&self) -> ActorBuilder {
        let system = self.clone();
        ActorBuilder {
            system,
            name: None,
            channel_size: None,
        }
    }

    /// Returns a single-use builder for a pool of `count` actors.
    ///
    /// The state and the handler function will be cloned and the `count` of actors will be spawned
    /// and their [`Aid`]s added to an [`AidPool`]. With the [`AidPool`] you can send messages to
    /// the pool to have one of the actors in the pool handle each message. The method that is used
    /// to select an actor to handle the message depends on the [`AidPool`] implementation. The
    /// most common actor pool implementation is [`RandomAidPool`].
    ///
    /// # Examples
    /// ```
    /// use maxim::prelude::*;
    ///
    /// let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
    ///
    /// async fn handler(mut count: usize, _: Context, _: Message) -> ActorResult<usize> {
    ///     // Do something
    ///     Ok(Status::done(count))
    /// }
    ///
    /// let state = 0 as usize;
    ///
    /// let mut aid_pool: RandomAidPool = system.spawn_pool(10).with(state, handler).unwrap();
    /// // Send a message to one of the actors in the pool
    /// aid_pool.send_new(()).unwrap();
    /// ```
    #[cfg(feature = "actor-pool")]
    pub fn spawn_pool(&self, count: usize) -> ActorPoolBuilder {
        // The pool wraps the same unnamed, default-sized builder that `spawn` hands out.
        let single = self.spawn();
        ActorPoolBuilder::new(single, count)
    }

    /// Schedules the `aid` for work. Note that this is the only time that we have to use the
    /// lookup table. This function gets called when an actor goes from 0 receivable messages to
    /// 1 receivable message. If the actor has more receivable messages then this will not be
    /// needed to be called because the dispatcher threads will handle the process of resending
    /// the actor to the work channel.
    ///
    /// If the actor is no longer registered with this system, the request is dropped and a
    /// warning is logged.
    // TODO Put tests verifying the resend on multiple messages.
    pub(crate) fn schedule(&self, aid: Aid) {
        let actors_by_aid = &self.data.actors_by_aid;
        if actors_by_aid.contains_key(&aid) {
            self.data.executor.wake(aid);
        } else {
            // The actor was removed from the map so ignore the problem and just log
            // a warning. The trailing backslash keeps the message on a single line; without it
            // the literal would embed a newline and the source indentation.
            warn!(
                "Attempted to schedule actor with aid {:?} on system with node_id {:?} but \
                 the actor does not exist.",
                aid,
                self.data.uuid.to_string(),
            );
        }
    }

    /// Stops an actor by shutting down its channels and removing it from the actors list and
    /// telling the [`Aid`] to not allow messages to be sent to the actor since the receiving
    /// side of the actor is gone.
    ///
    /// This is something that should rarely be called from the outside as it is much better to
    /// send the actor a [`SystemMsg::Stop`] message and allow it to stop gracefully.
    pub fn stop_actor(&self, aid: &Aid) {
        self.internal_stop_actor(aid, None);
    }

    /// Shared implementation behind [`ActorSystem::stop_actor`] that can additionally carry an
    /// `error` to be reported alongside the stop notification.
    ///
    /// The actor is removed from the aid, uuid and (if named) name lookup maps, the [`Aid`] is
    /// marked as stopped, and every actor registered as monitoring it is sent a
    /// [`SystemMsg::Stopped`] holding the stopped `aid` and the error rendered as a string.
    ///
    /// # Panics
    ///
    /// Panics if `aid.stop()` returns an error.
    pub(crate) fn internal_stop_actor(&self, aid: &Aid, error: impl Into<Option<StdError>>) {
        let data = &self.data;

        // Unregister the actor from every lookup table before flagging the Aid as stopped.
        data.actors_by_aid.remove(aid);
        data.aids_by_uuid.remove(&aid.uuid());
        if let Some(name_string) = aid.name() {
            data.aids_by_name.remove(&name_string);
        }
        aid.stop().unwrap();

        // Taking the entry out of the monitors map both yields the actors to notify and drops
        // the stopped actor from the map. Without monitors there is nothing left to do.
        let monitoring = match data.monitoring_by_monitored.remove(aid) {
            Some((_, monitoring)) => monitoring,
            None => return,
        };

        let error = error.into().map(|e| format!("{}", e));
        for m_aid in monitoring {
            let stopped = SystemMsg::Stopped {
                aid: aid.clone(),
                error: error.clone(),
            };
            // A monitor that cannot be reached is only logged; the rest are still notified.
            if let Err(send_error) = m_aid.send(Message::new(stopped)) {
                error!(
                    "Could not send 'Stopped' to monitoring actor {}: Error: {:?}",
                    m_aid, send_error
                );
            }
        }
    }

    /// Reports whether the actor with the given [`Aid`] is currently registered in this actor
    /// system's map of actors.
    pub fn is_actor_alive(&self, aid: &Aid) -> bool {
        self.data.actors_by_aid.contains_key(aid)
    }

    /// Finds the [`Aid`] registered under the actor's unique `uuid`, returning a clone of it
    /// in [`Option::Some`], or [`Option::None`] when no actor with that UUID is registered.
    pub fn find_aid_by_uuid(&self, uuid: &Uuid) -> Option<Aid> {
        let entry = self.data.aids_by_uuid.get(uuid)?;
        Some(entry.clone())
    }

    /// Finds the [`Aid`] registered under the user assigned `name`, returning a clone of it in
    /// [`Option::Some`], or [`Option::None`] when no actor with that name is registered.
    pub fn find_aid_by_name(&self, name: &str) -> Option<Aid> {
        // The lookup is performed with an owned `String` key.
        let key = name.to_string();
        let entry = self.data.aids_by_name.get(&key)?;
        Some(entry.clone())
    }

    /// Resolves an [`Aid`] from a `system_uuid` / `actor_uuid` pair.
    ///
    /// Returns `None` when `system_uuid` does not match this system, and also when it matches
    /// but no actor with `actor_uuid` is found here.
    fn find_aid(&self, system_uuid: &Uuid, actor_uuid: &Uuid) -> Option<Aid> {
        // An id that names another system can never be resolved locally.
        if self.uuid() != *system_uuid {
            return None;
        }
        self.find_aid_by_uuid(actor_uuid)
    }

    /// Returns the [`Aid`] of the actor registered under the name "System" in this actor
    /// system.
    ///
    /// # Panics
    ///
    /// Panics if no actor is registered under the name "System".
    pub fn system_actor_aid(&self) -> Aid {
        let system_actor = self.find_aid_by_name("System");
        system_actor.unwrap()
    }

    /// Registers `monitoring` as a monitor of `monitored`, so that `monitoring` will be
    /// informed if `monitored` stops.
    pub fn monitor(&self, monitoring: &Aid, monitored: &Aid) {
        // Work on the raw map that holds the entry for `monitored`.
        let mut raw_map = self
            .data
            .monitoring_by_monitored
            .get_raw_mut_from_key(monitored);
        // Create the set of monitors on first use, then add the new monitor to it.
        raw_map
            .entry(monitored.clone())
            .or_insert_with(HashSet::new)
            .insert(monitoring.clone());
    }

    /// Sends a clone of `message` to the system actor of every remote actor system currently
    /// recorded in this system's remotes.
    ///
    /// A failed send is logged as an error and does not stop delivery to the other remotes.
    // FIXME (Issue #72) Add try_send ability.
    pub fn send_to_system_actors(&self, message: Message) {
        trace!("Sending message to Remote System Actors");
        for remote in self.data.remotes.iter() {
            let aid = &remote.value().system_actor_aid;
            if let Err(error) = aid.send(message.clone()) {
                error!("Could not send to system actor {}. Error: {}", aid, error)
            }
        }
    }

    /// Schedules a `message` to be sent to the `destination` [`Aid`] after a `delay`. Note
    /// That this method makes a best attempt at sending the message on time but the message may
    /// not be sent on exactly the delay passed. However, the message will never be sent before
    /// the given delay.
    pub(crate) fn send_after(&self, message: Message, destination: Aid, delay: Duration) {
        let instant = Instant::now().checked_add(delay).unwrap();
        let entry = DelayedMessage {
            uuid: Uuid::new_v4(),
            destination,
            instant,
            message,
        };
        let (ref mutex, ref condvar) = &*self.data.delayed_messages;
        let mut data = mutex.lock().unwrap();
        data.push(entry);
        condvar.notify_all();
    }

    // Test-only accessor that exposes the system's executor.
    #[cfg(test)]
    pub(crate) fn executor(&self) -> &MaximExecutor {
        let data = &self.data;
        &data.executor
    }
}

impl fmt::Debug for ActorSystem {
    /// Writes the system as `ActorSystem{uuid: <uuid>, config: <config>}`, using the uuid's
    /// string form and the `Debug` rendering of the configuration.
    fn fmt(&self, formatter: &'_ mut fmt::Formatter) -> fmt::Result {
        let data = &self.data;
        let uuid = data.uuid.to_string();
        formatter.write_fmt(format_args!(
            "ActorSystem{{uuid: {}, config: {:?}}}",
            uuid, data.config,
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::system::system_actor::SystemActorMessage;
    use crate::tests::*;
    use futures::future;
    use std::thread;

    // Creates two actor systems, each configured with a thread pool size of 2, connects them
    // to each other with channels and returns the pair.
    fn start_and_connect_two_systems() -> (ActorSystem, ActorSystem) {
        let new_system = || ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let pair = (new_system(), new_system());
        ActorSystem::connect_with_channels(&pair.0, &pair.1);
        pair
    }

    /// Helper that blocks until both actor systems report shutdown. Each system is awaited on
    /// its own thread and `None` is passed as the timeout, so this helper itself applies no
    /// time limit. It panics only if one of the waiting threads panicked.
    fn await_two_system_shutdown(system1: ActorSystem, system2: ActorSystem) {
        // Start both waiters before joining either so the systems are awaited concurrently.
        let waiters = vec![
            thread::spawn(move || {
                system1.await_shutdown(None);
            }),
            thread::spawn(move || {
                system2.await_shutdown(None);
            }),
        ];

        for waiter in waiters {
            waiter.join().unwrap();
        }
    }

    /// Verifies the timeout handling of `await_shutdown`: a wait shorter than the time the
    /// actor needs to trigger shutdown reports `TimedOut`, while a sufficiently long wait
    /// reports `Ok`.
    #[test]
    fn test_shutdown_await_timeout() {
        use std::time::Duration;

        let config = ActorSystemConfig::default().thread_pool_size(2);
        let system = ActorSystem::create(config);
        system
            .spawn()
            .with((), |_state: (), context: Context, _: Message| async move {
                // Stall long enough that the first, short wait below expires before the
                // shutdown is triggered.
                sleep(100);
                context.system.trigger_shutdown();
                Ok(Status::done(()))
            })
            .unwrap();

        let too_short = Duration::from_millis(10);
        let long_enough = Duration::from_millis(200);

        // The short wait is expected to time out.
        assert_eq!(system.await_shutdown(too_short), ShutdownResult::TimedOut);

        // The longer wait is expected to see the shutdown complete.
        assert_eq!(system.await_shutdown(long_enough), ShutdownResult::Ok);

        // Awaiting an already shut down system must return rather than hang.
        // FIXME Design a means that this cannot ever hang the test.
        system.await_shutdown(None);
    }

    /// Checks that `find_aid_by_uuid` returns the spawned actor's [`Aid`] for its uuid and
    /// `None` for a uuid that was never registered.
    #[test]
    fn test_find_by_uuid() {
        init_test_log();

        let config = ActorSystemConfig::default().thread_pool_size(2);
        let system = ActorSystem::create(config);
        let aid = system.spawn().with((), simple_handler).unwrap();
        aid.send_new(11).unwrap();
        await_received(&aid, 2, 1000).unwrap();

        // The actor's own uuid must resolve to the same Aid, as judged by `Aid::ptr_eq`.
        let actor_uuid = aid.uuid();
        let found = system.find_aid_by_uuid(&actor_uuid).unwrap();
        assert!(Aid::ptr_eq(&aid, &found));

        // A freshly generated uuid is not registered with the system.
        let unknown_uuid = Uuid::new_v4();
        assert_eq!(None, system.find_aid_by_uuid(&unknown_uuid));

        system.trigger_and_await_shutdown(None);
    }

    /// Checks that `find_aid_by_name` returns the spawned actor's [`Aid`] for its name and
    /// `None` for a name that was never registered.
    #[test]
    fn test_find_by_name() {
        init_test_log();

        let config = ActorSystemConfig::default().thread_pool_size(2);
        let system = ActorSystem::create(config);
        let aid = system.spawn().name("A").with((), simple_handler).unwrap();
        aid.send_new(11).unwrap();
        await_received(&aid, 2, 1000).unwrap();

        // Looking up the name reported by the Aid must give back the same Aid.
        let actor_name = aid.name().unwrap();
        let found = system.find_aid_by_name(&actor_name).unwrap();
        assert!(Aid::ptr_eq(&aid, &found));

        // No actor was ever named "B".
        assert_eq!(None, system.find_aid_by_name("B"));

        system.trigger_and_await_shutdown(None);
    }

    /// Exercises `find_aid`, which resolves an actor from a system uuid plus an actor uuid:
    /// the matching pair finds the actor, while an unknown actor uuid or a foreign system uuid
    /// yields `None`.
    #[test]
    fn test_find_aid() {
        init_test_log();

        let config = ActorSystemConfig::default().thread_pool_size(2);
        let system = ActorSystem::create(config);
        let aid = system.spawn().name("A").with((), simple_handler).unwrap();
        await_received(&aid, 1, 1000).unwrap();

        let system_uuid = aid.system_uuid();
        let actor_uuid = aid.uuid();

        // Both uuids match: the very same Aid is returned.
        let found = system.find_aid(&system_uuid, &actor_uuid).unwrap();
        assert!(Aid::ptr_eq(&aid, &found));

        // Right system, unknown actor.
        assert_eq!(None, system.find_aid(&system_uuid, &Uuid::new_v4()));
        // Known actor uuid, but a system uuid that is not this system's.
        assert_eq!(None, system.find_aid(&Uuid::new_v4(), &actor_uuid));

        system.trigger_and_await_shutdown(None);
    }

    /// Verifies that stopping an actor makes the system report it as not alive and removes it
    /// from the aid, uuid and name lookups.
    #[test]
    fn test_stop_actor() {
        init_test_log();

        let config = ActorSystemConfig::default().thread_pool_size(2);
        let system = ActorSystem::create(config);
        let aid = system.spawn().name("A").with((), simple_handler).unwrap();
        aid.send_new(11).unwrap();
        await_received(&aid, 2, 1000).unwrap();

        // Stop the actor directly through the system.
        system.stop_actor(&aid);
        assert_eq!(false, system.is_actor_alive(&aid));

        // Inspect the raw maps through a second handle, then the public lookups.
        let handle = system.clone();
        let stopped_uuid = aid.uuid();
        assert_eq!(false, handle.data.actors_by_aid.contains_key(&aid));
        assert_eq!(false, handle.data.aids_by_uuid.contains_key(&stopped_uuid));
        assert_eq!(None, system.find_aid_by_name("A"));
        assert_eq!(None, system.find_aid_by_uuid(&stopped_uuid));

        system.trigger_and_await_shutdown(None);
    }

    /// Verifies that `send_after` holds a message back until its delay has passed: the
    /// received count is unchanged part-way through the delay and has grown by one afterwards.
    // FIXME need separate test for remotes.
    #[test]
    fn test_send_after() {
        init_test_log();

        info!("Preparing test");
        let config = ActorSystemConfig::default().thread_pool_size(2);
        let system = ActorSystem::create(config);
        let aid = system.spawn().name("A").with((), simple_handler).unwrap();
        await_received(&aid, 1, 1000).unwrap();
        info!("Test prepared, sending delayed message");

        let delay = Duration::from_millis(10);
        system.send_after(Message::new(11), aid.clone(), delay);

        // Half-way through the delay nothing new may have arrived.
        info!("Sleeping for initial check");
        sleep(5);
        assert_eq!(1, aid.received().unwrap());

        // After sleeping past the delay the message must have been delivered.
        info!("Sleeping till we're 100% sure we should have the message");
        sleep(10);
        assert_eq!(2, aid.received().unwrap());

        system.trigger_and_await_shutdown(None);
    }

    /// Verifies the ordering of two delayed sends: a message scheduled later but with a
    /// shorter delay is delivered first, and the earlier scheduled, longer delayed message
    /// still arrives afterwards.
    #[test]
    fn test_send_after_before_current() {
        init_test_log();

        let config = ActorSystemConfig::default().thread_pool_size(2);
        let system = ActorSystem::create(config);

        let slow_target = system.spawn().name("A").with((), simple_handler).unwrap();
        await_received(&slow_target, 1, 1000).unwrap();
        let fast_target = system.spawn().name("B").with((), simple_handler).unwrap();
        await_received(&fast_target, 1, 1000).unwrap();

        // Enqueue the long delay first and the short delay second.
        slow_target
            .send_after(Message::new(11), Duration::from_millis(50))
            .unwrap();
        fast_target
            .send_after(Message::new(11), Duration::from_millis(10))
            .unwrap();

        // Immediately after scheduling neither delayed message has been delivered.
        assert_eq!(1, slow_target.received().unwrap());
        assert_eq!(1, fast_target.received().unwrap());

        // We overshoot the timing on the asserts because when the tests are run the CPU is
        // busy and the timing can be tricky. Only the short delay is due at this point.
        sleep(15);
        assert_eq!(1, slow_target.received().unwrap());
        assert_eq!(2, fast_target.received().unwrap());

        // Now the long delay has passed as well.
        sleep(50);
        assert_eq!(2, slow_target.received().unwrap());
        assert_eq!(2, fast_target.received().unwrap());

        system.trigger_and_await_shutdown(None);
    }

    /// Verifies that sending to an actor that is missing from the system's lookup map does not
    /// panic. This situation can arise when a message is sent after the actor has been stopped
    /// but before the system has told the [`Aid`] that the actor is stopped.
    #[test]
    fn test_actor_not_in_map() {
        init_test_log();

        let config = ActorSystemConfig::default().thread_pool_size(2);
        let system = ActorSystem::create(config);
        let aid = system.spawn().with((), simple_handler).unwrap();
        // Once the first message is received the actor is started for sure.
        await_received(&aid, 1, 1000).unwrap();

        // Pull the actor out of the map behind the system's back, without calling stop, so
        // that it can no longer be scheduled.
        let handle = system.clone();
        handle.data.actors_by_aid.remove(&aid);

        // The send must still succeed; scheduling should only write out a warning.
        aid.send_new(11).unwrap();

        system.trigger_and_await_shutdown(None);
    }

    /// Verifies that connecting two actor systems with channels records each system in the
    /// other's remotes, keyed by the peer's uuid.
    #[test]
    fn test_connect_with_channels() {
        let new_system = || ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let system1 = new_system();
        let system2 = new_system();
        ActorSystem::connect_with_channels(&system1, &system2);

        // Each lookup result is a temporary that is dropped at the end of its statement.
        let uuid1 = &system1.data.uuid;
        let uuid2 = &system2.data.uuid;
        system1
            .data
            .remotes
            .get(uuid2)
            .expect("Unable to find connection with system 2 in system 1");
        system2
            .data
            .remotes
            .get(uuid1)
            .expect("Unable to find connection with system 1 in system 2");
    }

    // Tests that monitors work in the actor system and send a message to monitoring actors
    // when monitored actors stop. Two actors monitor a third one; a fourth actor that monitors
    // nothing is used to check that it receives no additional message.
    #[test]
    fn test_monitors() {
        init_test_log();

        let tracker = AssertCollect::new();
        // Handler for the monitoring actors. Its state is the Aid it expects to be told about
        // plus the shared assertion collector. It accepts only `Start` and `Stopped` system
        // messages; anything else is reported through the collector's `panic`.
        async fn monitor_handler(
            state: (Aid, AssertCollect),
            _: Context,
            message: Message,
        ) -> ActorResult<(Aid, AssertCollect)> {
            if let Some(msg) = message.content_as::<SystemMsg>() {
                match &*msg {
                    SystemMsg::Stopped { aid, error } => {
                        // The notification must name the monitored actor and, because the
                        // actor is stopped explicitly below, must not carry an error.
                        state
                            .1
                            .assert(Aid::ptr_eq(&state.0, aid), "Pointers are not equal!");
                        state.1.assert(error.is_none(), "Actor was errored!");
                        Ok(Status::done(state))
                    }
                    SystemMsg::Start => Ok(Status::done(state)),
                    _ => state.1.panic("Received some other message!"),
                }
            } else {
                state.1.panic("Received some other message!")
            }
        }

        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let monitored = system.spawn().with((), simple_handler).unwrap();
        let not_monitoring = system.spawn().with((), simple_handler).unwrap();
        let monitoring1 = system
            .spawn()
            .with((monitored.clone(), tracker.clone()), monitor_handler)
            .unwrap();
        let monitoring2 = system
            .spawn()
            .with((monitored.clone(), tracker.clone()), monitor_handler)
            .unwrap();
        system.monitor(&monitoring1, &monitored);
        system.monitor(&monitoring2, &monitored);

        {
            // Validate that both monitors are registered. This is done in its own block so
            // that the handle obtained from the map is released before the actor is stopped.
            let monitoring_by_monitored = &system.data.monitoring_by_monitored;
            let m_set = monitoring_by_monitored.get(&monitored).unwrap();
            assert!(m_set.contains(&monitoring1));
            assert!(m_set.contains(&monitoring2));
        }

        // Stop the monitored actor. Each monitoring actor is then expected to reach a received
        // count of 2, while the unrelated actor is only awaited at a count of 1.
        system.stop_actor(&monitored);
        await_received(&monitoring1, 2, 1000).unwrap();
        await_received(&monitoring2, 2, 1000).unwrap();
        await_received(&not_monitoring, 1, 1000).unwrap();

        system.trigger_and_await_shutdown(None);
        tracker.collect();
    }

    /// Tests that a monitoring actor is told about the error when the monitored actor panics:
    /// the monitor expects a [`SystemMsg::Stopped`] for the monitored [`Aid`] whose error text
    /// is the panic message.
    #[test]
    fn test_monitor_gets_panics_errors() {
        init_test_log();

        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let tracker = AssertCollect::new();
        let t = tracker.clone();
        // The monitored actor: system messages are handled normally, any other message makes
        // the handler panic with "I panicked".
        let aid = system
            .spawn()
            .with((), |_: (), _: Context, msg: Message| {
                if let Some(_) = msg.content_as::<SystemMsg>() {
                    debug!("Not panicking this time");
                    return future::ok(Status::done(()));
                }

                debug!("About to panic");
                panic!("I panicked")
            })
            .unwrap();
        // The monitoring actor: its state is the Aid it expects to be notified about. It
        // accepts only `Start` and `Stopped`; everything else is reported through the tracker.
        let monitor = system
            .spawn()
            .with(aid.clone(), move |state: Aid, _: Context, msg: Message| {
                if let Some(msg) = msg.content_as::<SystemMsg>() {
                    match &*msg {
                        SystemMsg::Stopped { aid, error } => {
                            // The notification must name the monitored actor and carry the
                            // panic message as its error.
                            t.assert(*aid == state, "Aid is not expected Aid");
                            t.assert(error.is_some(), "Expected error");
                            t.assert(
                                error.as_ref().unwrap() == "I panicked",
                                "Error message does not match",
                            );
                            future::ok(Status::stop(state))
                        }
                        SystemMsg::Start => future::ok(Status::done(state)),
                        _ => t.panic("Unexpected message received!"),
                    }
                } else {
                    t.panic("Unexpected message received!")
                }
            })
            .unwrap();
        system.monitor(&monitor, &aid);
        // A unit message is not a `SystemMsg`, so this send triggers the panic above.
        aid.send_new(()).unwrap();
        // Wait for the monitor to reach a received count of 2.
        await_received(&monitor, 2, 1000).unwrap();
        system.trigger_and_await_shutdown(Duration::from_millis(1000));
        tracker.collect();
    }

    /// Verifies the rules for named actors: a name that is in use cannot be registered a
    /// second time, the original holder is unaffected by the failed attempt, and once the
    /// holder of a name stops the name becomes available again.
    #[test]
    fn test_named_actor_restrictions() {
        init_test_log();
        let config = ActorSystemConfig::default().thread_pool_size(2);
        let system = ActorSystem::create(config);

        let actor_a = system.spawn().name("A").with((), simple_handler).unwrap();
        await_received(&actor_a, 1, 1000).unwrap();

        let actor_b = system.spawn().name("B").with((), simple_handler).unwrap();
        await_received(&actor_b, 1, 1000).unwrap();

        // A second actor asking for the name "A" must be rejected with an error the caller
        // can handle.
        let result = system.spawn().name("A").with((), simple_handler);
        assert_eq!(Err(SystemError::NameAlreadyUsed("A".to_string())), result);

        // The original "A" still owns the name and is still alive.
        let found_a = system.find_aid_by_name("A").unwrap();
        assert_eq!(true, system.is_actor_alive(&actor_a));
        assert!(Aid::ptr_eq(&actor_a, &found_a));

        // Stopping "B" must clean up both the name and the uuid lookups.
        system.stop_actor(&actor_b);
        assert_eq!(None, system.find_aid_by_name("B"));
        assert_eq!(None, system.find_aid_by_uuid(&actor_b.uuid()));

        // The name "B" is free again, so a new actor can be created with it.
        let new_b = system.spawn().name("B").with((), simple_handler).unwrap();
        await_received(&new_b, 1, 1000).unwrap();
        let found_b = system.find_aid_by_name("B").unwrap();
        assert!(Aid::ptr_eq(&new_b, &found_b));

        system.trigger_and_await_shutdown(None);
    }

    /// Tests that remote actors can send and receive messages between each other. An actor on
    /// system 2 sends a `Request` to an actor on system 1, which answers with a `Reply`; each
    /// actor triggers the shutdown of its own system once its part is done.
    #[test]
    fn test_remote_actors() {
        // In this test our messages are just structs.
        #[derive(Serialize, Deserialize, Debug)]
        struct Request {
            // Where the responder should send its `Reply`.
            reply_to: Aid,
        }

        #[derive(Serialize, Deserialize, Debug)]
        struct Reply {}

        init_test_log();
        let tracker = AssertCollect::new();
        let t = tracker.clone();
        let (system1, system2) = start_and_connect_two_systems();

        // NOTE(review): presumably registers system1 as the current system for this thread so
        // that the Aid can be serialized below — confirm against `init_current`.
        system1.init_current();
        // Responder on system 1: answers a `Request` with a `Reply`, then shuts system 1 down.
        let aid = system1
            .spawn()
            .with((), move |_: (), context: Context, message: Message| {
                // Each invocation gets its own clone to move into the async block.
                let t = t.clone();
                async move {
                    if let Some(msg) = message.content_as::<Request>() {
                        msg.reply_to.send_new(Reply {}).unwrap();
                        context.system.trigger_shutdown();
                        Ok(Status::stop(()))
                    } else if let Some(_) = message.content_as::<SystemMsg>() {
                        Ok(Status::done(()))
                    } else {
                        t.panic("Unexpected message received!")
                    }
                }
            })
            .unwrap();
        await_received(&aid, 1, 1000).unwrap();

        let t = tracker.clone();
        // The responder's Aid travels to the requester in serialized form and is
        // deserialized inside the requester's handler.
        let serialized = bincode::serialize(&aid).unwrap();
        // Requester on system 2: on `Start` it sends the `Request`; on `Reply` it shuts
        // system 2 down.
        system2
            .spawn()
            .with((), move |_: (), context: Context, message: Message| {
                if let Some(_) = message.content_as::<Reply>() {
                    debug!("Received reply, shutting down");
                    context.system.trigger_shutdown();
                    future::ok(Status::stop(()))
                } else if let Some(msg) = message.content_as::<SystemMsg>() {
                    match &*msg {
                        SystemMsg::Start => {
                            debug!("Starting request actor");
                            let target_aid: Aid = bincode::deserialize(&serialized).unwrap();
                            target_aid
                                .send_new(Request {
                                    reply_to: context.aid.clone(),
                                })
                                .unwrap();
                            future::ok(Status::done(()))
                        }
                        _ => future::ok(Status::done(())),
                    }
                } else {
                    t.panic("Unexpected message received!")
                }
            })
            .unwrap();

        // Both actors trigger shutdown of their own system, so wait for both to finish.
        await_two_system_shutdown(system1, system2);
        tracker.collect();
    }

    /// Tests the ability to find an aid on a remote system by name using a `SystemActor`. This
    /// also serves as a test for cross system actor communication as well as testing broadcast
    /// to multiple system actors in the cluster.
    ///
    /// An actor named "A" lives on system 1. An actor on system 2 asks the connected system
    /// actors to find "A", checks the answer and then sends `true` to the found actor.
    #[test]
    fn test_system_actor_find_by_name() {
        init_test_log();
        let tracker = AssertCollect::new();
        let t = tracker.clone();
        let (system1, system2) = start_and_connect_two_systems();

        // The lookup target: it shuts system 1 down as soon as it receives a `bool` and
        // otherwise accepts every message.
        let aid1 = system1
            .spawn()
            .name("A")
            .with((), |_: (), context: Context, message: Message| async move {
                if let Some(_) = message.content_as::<bool>() {
                    context.system.trigger_shutdown();
                    Ok(Status::stop(()))
                } else {
                    Ok(Status::done(()))
                }
            })
            .unwrap();
        await_received(&aid1, 1, 1000).unwrap();

        // The searching actor on system 2.
        system2
            .spawn()
            .with((), move |_: (), context: Context, message: Message| {
                // We have to do this so each async block future gets its own copy.
                let aid1 = aid1.clone();
                let t = t.clone();
                async move {
                    if let Some(msg) = message.content_as::<SystemActorMessage>() {
                        match &*msg {
                            SystemActorMessage::FindByNameResult { aid: found, .. } => {
                                debug!("FindByNameResult received");
                                if let Some(target) = found {
                                    // The found Aid must identify the actor spawned on
                                    // system 1; it is compared by uuid.
                                    t.assert(
                                        target.uuid() == aid1.uuid(),
                                        "Target is not expected Actor",
                                    );
                                    // Sending `true` makes the target shut system 1 down;
                                    // this actor shuts system 2 down itself.
                                    target.send_new(true).unwrap();
                                    context.system.trigger_shutdown();
                                    Ok(Status::done(()))
                                } else {
                                    t.panic("Didn't find AID.")
                                }
                            }
                            _ => t.panic("Unexpected message received!"),
                        }
                    } else if let Some(msg) = message.content_as::<SystemMsg>() {
                        debug!("Actor started, attempting to send FindByName request");
                        if let SystemMsg::Start = &*msg {
                            // On start, ask the connected system actors to look up "A" and
                            // send the result back to this actor.
                            context.system.send_to_system_actors(Message::new(
                                SystemActorMessage::FindByName {
                                    reply_to: context.aid.clone(),
                                    name: "A".to_string(),
                                },
                            ));
                            Ok(Status::done(()))
                        } else {
                            t.panic("Unexpected message received!")
                        }
                    } else {
                        t.panic("Unexpected message received!")
                    }
                }
            })
            .unwrap();

        // Both systems are shut down by their actors, so wait for both to finish.
        await_two_system_shutdown(system1, system2);
        tracker.collect();
    }

    /// Tests the ability to create an [`AidPool`] through `spawn_pool` and to send messages
    /// to the pool.
    #[test]
    #[cfg(feature = "actor-pool")]
    fn test_spawn_pool() {
        let tracker = AssertCollect::new();
        let config = ActorSystemConfig::default().thread_pool_size(2);
        let system = ActorSystem::create(config);

        // Handler that accepts every message and does nothing with it.
        async fn handler(_: (), _: Context, _: Message) -> ActorResult<()> {
            Ok(Status::done(()))
        }

        // Build a pool of three actors sharing the handler.
        let pool_builder = system.spawn_pool(3).name("handler").channel_size(100);
        let mut aid_pool: RandomAidPool = pool_builder.with((), handler).unwrap();

        // Send a bunch of messages to the pool.
        for _ in 0..=100 {
            aid_pool.send_new(0).unwrap();
        }

        // Give the actors a moment to process what was sent.
        sleep(10);

        // Turn the pool into its individual `Aid`s and check that every one of them reports
        // a received count greater than one.
        let aids: Vec<Aid> = aid_pool.into();
        aids.into_iter()
            .for_each(|aid| assert!(aid.received().unwrap() > 1));

        system.trigger_and_await_shutdown(None);
        tracker.collect();
    }
}