#[cfg(feature = "actor-pool")]
use crate::actors::ActorPoolBuilder;
use crate::actors::{Actor, ActorBuilder, ActorStream};
use crate::executor::MaximExecutor;
use crate::prelude::*;
use crate::system::system_actor::SystemActor;
use dashmap::DashMap;
use log::{debug, error, info, trace, warn};
use once_cell::sync::OnceCell;
use secc::{SeccReceiver, SeccSender};
use serde::{Deserialize, Serialize};
use std::collections::{BinaryHeap, HashSet};
use std::error::Error;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use uuid::Uuid;
mod system_actor;
std::thread_local! {
static ACTOR_SYSTEM: OnceCell<ActorSystem> = OnceCell::new();
}
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SystemMsg {
Start,
Stop,
Stopped { aid: Aid, error: Option<String> },
}
#[derive(Clone, Serialize, Deserialize)]
pub enum WireMessage {
Hello {
system_actor_aid: Aid,
},
ActorMessage {
actor_uuid: Uuid,
system_uuid: Uuid,
message: Message,
},
DelayedActorMessage {
duration: Duration,
actor_uuid: Uuid,
system_uuid: Uuid,
message: Message,
},
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ActorSystemConfig {
pub message_channel_size: u16,
pub send_timeout: Duration,
pub thread_pool_size: u16,
pub warn_threshold: Duration,
pub time_slice: Duration,
pub thread_wait_time: Duration,
pub start_on_launch: bool,
}
impl ActorSystemConfig {
pub fn message_channel_size(mut self, value: u16) -> Self {
self.message_channel_size = value;
self
}
pub fn send_timeout(mut self, value: Duration) -> Self {
self.send_timeout = value;
self
}
pub fn thread_pool_size(mut self, value: u16) -> Self {
self.thread_pool_size = value;
self
}
pub fn warn_threshold(mut self, value: Duration) -> Self {
self.warn_threshold = value;
self
}
pub fn time_slice(mut self, value: Duration) -> Self {
self.time_slice = value;
self
}
pub fn thread_wait_time(mut self, value: Duration) -> Self {
self.thread_wait_time = value;
self
}
}
impl Default for ActorSystemConfig {
fn default() -> ActorSystemConfig {
ActorSystemConfig {
thread_pool_size: (num_cpus::get() * 4) as u16,
warn_threshold: Duration::from_millis(1),
time_slice: Duration::from_millis(1),
thread_wait_time: Duration::from_millis(100),
message_channel_size: 32,
send_timeout: Duration::from_millis(1),
start_on_launch: true,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum SystemError {
NameAlreadyUsed(String),
}
impl std::fmt::Display for SystemError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl Error for SystemError {}
pub struct RemoteInfo {
pub system_uuid: Uuid,
pub sender: SeccSender<WireMessage>,
pub receiver: SeccReceiver<WireMessage>,
pub system_actor_aid: Aid,
_handle: JoinHandle<()>,
}
struct DelayedMessage {
uuid: Uuid,
destination: Aid,
instant: Instant,
message: Message,
}
impl std::cmp::PartialEq for DelayedMessage {
fn eq(&self, other: &Self) -> bool {
self.uuid == other.uuid
}
}
impl std::cmp::Eq for DelayedMessage {}
impl std::cmp::PartialOrd for DelayedMessage {
fn partial_cmp(&self, other: &DelayedMessage) -> Option<std::cmp::Ordering> {
Some(other.instant.cmp(&self.instant))
}
}
impl std::cmp::Ord for DelayedMessage {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.partial_cmp(other)
.expect("DelayedMessage::partial_cmp() returned None; can't happen")
}
}
pub(crate) struct ActorSystemData {
pub(crate) uuid: Uuid,
pub(crate) config: ActorSystemConfig,
threads: Mutex<Vec<JoinHandle<()>>>,
executor: MaximExecutor,
started: AtomicBool,
shutdown_triggered: Arc<(Mutex<bool>, Condvar)>,
actors_by_aid: Arc<DashMap<Aid, Arc<Actor>>>,
aids_by_uuid: Arc<DashMap<Uuid, Aid>>,
aids_by_name: Arc<DashMap<String, Aid>>,
monitoring_by_monitored: Arc<DashMap<Aid, HashSet<Aid>>>,
remotes: Arc<DashMap<Uuid, RemoteInfo>>,
delayed_messages: Arc<(Mutex<BinaryHeap<DelayedMessage>>, Condvar)>,
}
#[derive(Clone)]
pub struct ActorSystem {
pub(crate) data: Arc<ActorSystemData>,
}
impl ActorSystem {
pub fn create(config: ActorSystemConfig) -> ActorSystem {
let uuid = Uuid::new_v4();
let threads = Mutex::new(Vec::with_capacity(config.thread_pool_size as usize));
let shutdown_triggered = Arc::new((Mutex::new(false), Condvar::new()));
let executor = MaximExecutor::new(shutdown_triggered.clone());
let start_on_launch = config.start_on_launch;
let system = ActorSystem {
data: Arc::new(ActorSystemData {
uuid,
config,
threads,
executor,
started: AtomicBool::new(false),
shutdown_triggered,
actors_by_aid: Arc::new(DashMap::default()),
aids_by_uuid: Arc::new(DashMap::default()),
aids_by_name: Arc::new(DashMap::default()),
monitoring_by_monitored: Arc::new(DashMap::default()),
remotes: Arc::new(DashMap::default()),
delayed_messages: Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new())),
}),
};
if start_on_launch {
system.start();
}
system
}
pub fn start(&self) {
if !self
.data
.started
.compare_and_swap(false, true, Ordering::Relaxed)
{
info!("ActorSystem {} has spawned", self.data.uuid);
self.data.executor.init(self);
{
let mut guard = self.data.threads.lock().unwrap();
for _ in 0..1 {
let thread = self.start_send_after_thread();
guard.push(thread);
}
}
self.spawn()
.name("System")
.with(SystemActor, SystemActor::processor)
.unwrap();
}
}
fn start_send_after_thread(&self) -> JoinHandle<()> {
let system = self.clone();
let delayed_messages = self.data.delayed_messages.clone();
thread::spawn(move || {
while !*system.data.shutdown_triggered.0.lock().unwrap() {
let (ref mutex, ref condvar) = &*delayed_messages;
let mut data = mutex.lock().unwrap();
match data.peek() {
None => {
let _ = condvar.wait(data).unwrap();
}
Some(msg) => {
let now = Instant::now();
if now >= msg.instant {
trace!("Sending delayed message");
msg.destination
.send(msg.message.clone())
.unwrap_or_else(|error| {
warn!(
"Cannot send scheduled message to {}: Error {:?}",
msg.destination, error
);
});
data.pop();
} else {
let duration = msg.instant.duration_since(now);
let _result = condvar.wait_timeout(data, duration).unwrap();
}
}
}
}
})
}
pub fn config(&self) -> &ActorSystemConfig {
&self.data.config
}
pub(crate) fn remote_sender(&self, system_uuid: &Uuid) -> Option<SeccSender<WireMessage>> {
self.data
.remotes
.get(system_uuid)
.map(|info| info.sender.clone())
}
pub fn connect(
&self,
sender: &SeccSender<WireMessage>,
receiver: &SeccReceiver<WireMessage>,
) -> Uuid {
let hello = WireMessage::Hello {
system_actor_aid: self.system_actor_aid(),
};
sender.send(hello).unwrap();
debug!("Sending hello from {}", self.data.uuid);
let system_actor_aid =
match receiver.receive_await_timeout(self.data.config.thread_wait_time) {
Ok(message) => match message {
WireMessage::Hello { system_actor_aid } => system_actor_aid,
_ => panic!("Expected first message to be a Hello"),
},
Err(e) => panic!("Expected to read a Hello message {:?}", e),
};
let system = self.clone();
let receiver_clone = receiver.clone();
let thread_timeout = self.data.config.thread_wait_time;
let sys_uuid = system_actor_aid.system_uuid();
let handle = thread::spawn(move || {
system.init_current();
while !*system.data.shutdown_triggered.0.lock().unwrap() {
match receiver_clone.receive_await_timeout(thread_timeout) {
Err(_) => (),
Ok(wire_msg) => system.process_wire_message(&sys_uuid, &wire_msg),
}
}
});
let info = RemoteInfo {
system_uuid: system_actor_aid.system_uuid(),
sender: sender.clone(),
receiver: receiver.clone(),
_handle: handle,
system_actor_aid,
};
let uuid = info.system_uuid;
self.data.remotes.insert(uuid.clone(), info);
uuid
}
pub fn disconnect(&self, system_uuid: Uuid) -> Result<(), AidError> {
self.data.remotes.remove(&system_uuid);
Ok(())
}
pub fn connect_with_channels(system1: &ActorSystem, system2: &ActorSystem) {
let (tx1, rx1) = secc::create::<WireMessage>(32, system1.data.config.thread_wait_time);
let (tx2, rx2) = secc::create::<WireMessage>(32, system2.data.config.thread_wait_time);
let system1_clone = system1.clone();
let system2_clone = system2.clone();
let h1 = thread::spawn(move || system1_clone.connect(&tx1, &rx2));
let h2 = thread::spawn(move || system2_clone.connect(&tx2, &rx1));
h1.join().unwrap();
h2.join().unwrap();
}
fn process_wire_message(&self, _uuid: &Uuid, wire_message: &WireMessage) {
match wire_message {
WireMessage::ActorMessage {
actor_uuid,
system_uuid,
message,
} => {
if let Some(aid) = self.find_aid(&system_uuid, &actor_uuid) {
aid.send(message.clone()).unwrap_or_else(|error| {
warn!("Could not send wire message to {}. Error: {}", aid, error);
})
}
}
WireMessage::DelayedActorMessage {
duration,
actor_uuid,
system_uuid,
message,
} => {
self.find_aid(&system_uuid, &actor_uuid)
.map(|aid| self.send_after(message.clone(), aid, *duration))
.expect("Error not handled yet");
}
WireMessage::Hello { system_actor_aid } => {
debug!("{:?} Got Hello from {}", self.data.uuid, system_actor_aid);
}
}
}
pub fn init_current(&self) {
ACTOR_SYSTEM.with(|actor_system| {
actor_system
.set(self.clone())
.expect("Unable to set ACTOR_SYSTEM.");
});
}
#[inline]
pub fn current() -> ActorSystem {
ACTOR_SYSTEM.with(|actor_system| {
actor_system
.get()
.expect("Thread local ACTOR_SYSTEM not set! See `ActorSystem::init_current()`")
.clone()
})
}
#[inline]
pub fn uuid(&self) -> Uuid {
self.data.uuid
}
pub fn trigger_shutdown(&self) {
let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
*mutex.lock().unwrap() = true;
condvar.notify_all();
}
pub fn await_shutdown(&self, timeout: impl Into<Option<Duration>>) -> ShutdownResult {
info!("System awaiting shutdown");
let start = Instant::now();
let timeout = timeout.into();
let result = match timeout {
Some(dur) => self.await_shutdown_trigger_with_timeout(dur),
None => self.await_shutdown_trigger_without_timeout(),
};
if let Some(r) = result {
return r;
}
let timeout = {
match timeout {
Some(timeout) => {
let elapsed = Instant::now().duration_since(start);
if let Some(t) = timeout.checked_sub(elapsed) {
Some(t)
} else {
return ShutdownResult::TimedOut;
}
}
None => None,
}
};
self.data.executor.await_shutdown(timeout)
}
fn await_shutdown_trigger_with_timeout(&self, mut dur: Duration) -> Option<ShutdownResult> {
let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
let mut guard = mutex.lock().unwrap();
while !*guard {
let started = Instant::now();
let (new_guard, timeout) = match condvar.wait_timeout(guard, dur) {
Ok(ret) => ret,
Err(_) => return Some(ShutdownResult::Panicked),
};
if timeout.timed_out() {
return Some(ShutdownResult::TimedOut);
}
guard = new_guard;
dur -= started.elapsed();
}
None
}
fn await_shutdown_trigger_without_timeout(&self) -> Option<ShutdownResult> {
let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
let mut guard = mutex.lock().unwrap();
while !*guard {
guard = match condvar.wait(guard) {
Ok(ret) => ret,
Err(_) => return Some(ShutdownResult::Panicked),
};
}
None
}
pub fn trigger_and_await_shutdown(
&self,
timeout: impl Into<Option<Duration>>,
) -> ShutdownResult {
self.trigger_shutdown();
self.await_shutdown(timeout)
}
pub(crate) fn register_actor(
&self,
actor: Arc<Actor>,
stream: ActorStream,
) -> Result<Aid, SystemError> {
let aids_by_name = &self.data.aids_by_name;
let actors_by_aid = &self.data.actors_by_aid;
let aids_by_uuid = &self.data.aids_by_uuid;
let aid = actor.context.aid.clone();
if let Some(name_string) = &aid.name() {
if aids_by_name.contains_key(name_string) {
return Err(SystemError::NameAlreadyUsed(name_string.clone()));
} else {
aids_by_name.insert(name_string.clone(), aid.clone());
}
}
actors_by_aid.insert(aid.clone(), actor);
aids_by_uuid.insert(aid.uuid(), aid.clone());
self.data.executor.register_actor(stream);
aid.send(Message::new(SystemMsg::Start)).unwrap();
Ok(aid)
}
pub fn spawn(&self) -> ActorBuilder {
ActorBuilder {
system: self.clone(),
name: None,
channel_size: None,
}
}
#[cfg(feature = "actor-pool")]
pub fn spawn_pool(&self, count: usize) -> ActorPoolBuilder {
ActorPoolBuilder::new(
ActorBuilder {
system: self.clone(),
name: None,
channel_size: None,
},
count,
)
}
pub(crate) fn schedule(&self, aid: Aid) {
let actors_by_aid = &self.data.actors_by_aid;
if actors_by_aid.contains_key(&aid) {
self.data.executor.wake(aid);
} else {
warn!(
"Attempted to schedule actor with aid {:?} on system with node_id {:?} but
the actor does not exist.",
aid,
self.data.uuid.to_string(),
);
}
}
pub fn stop_actor(&self, aid: &Aid) {
self.internal_stop_actor(aid, None);
}
pub(crate) fn internal_stop_actor(&self, aid: &Aid, error: impl Into<Option<StdError>>) {
{
let actors_by_aid = &self.data.actors_by_aid;
let aids_by_uuid = &self.data.aids_by_uuid;
let aids_by_name = &self.data.aids_by_name;
actors_by_aid.remove(aid);
aids_by_uuid.remove(&aid.uuid());
if let Some(name_string) = aid.name() {
aids_by_name.remove(&name_string);
}
aid.stop().unwrap();
}
if let Some((_, monitoring)) = self.data.monitoring_by_monitored.remove(&aid) {
let error = error.into().map(|e| format!("{}", e));
for m_aid in monitoring {
let value = SystemMsg::Stopped {
aid: aid.clone(),
error: error.clone(),
};
m_aid.send(Message::new(value)).unwrap_or_else(|error| {
error!(
"Could not send 'Stopped' to monitoring actor {}: Error: {:?}",
m_aid, error
);
});
}
}
}
pub fn is_actor_alive(&self, aid: &Aid) -> bool {
let actors_by_aid = &self.data.actors_by_aid;
actors_by_aid.contains_key(aid)
}
pub fn find_aid_by_uuid(&self, uuid: &Uuid) -> Option<Aid> {
let aids_by_uuid = &self.data.aids_by_uuid;
aids_by_uuid.get(uuid).map(|aid| aid.clone())
}
pub fn find_aid_by_name(&self, name: &str) -> Option<Aid> {
let aids_by_name = &self.data.aids_by_name;
aids_by_name.get(&name.to_string()).map(|aid| aid.clone())
}
fn find_aid(&self, system_uuid: &Uuid, actor_uuid: &Uuid) -> Option<Aid> {
if self.uuid() == *system_uuid {
self.find_aid_by_uuid(&actor_uuid)
} else {
None
}
}
pub fn system_actor_aid(&self) -> Aid {
self.find_aid_by_name(&"System").unwrap()
}
pub fn monitor(&self, monitoring: &Aid, monitored: &Aid) {
let mut monitoring_by_monitored = self
.data
.monitoring_by_monitored
.get_raw_mut_from_key(&monitored);
let monitoring_vec = monitoring_by_monitored
.entry(monitored.clone())
.or_insert(HashSet::new());
monitoring_vec.insert(monitoring.clone());
}
pub fn send_to_system_actors(&self, message: Message) {
let remotes = &*self.data.remotes;
trace!("Sending message to Remote System Actors");
for remote in remotes.iter() {
let aid = &remote.value().system_actor_aid;
aid.send(message.clone()).unwrap_or_else(|error| {
error!("Could not send to system actor {}. Error: {}", aid, error)
});
}
}
pub(crate) fn send_after(&self, message: Message, destination: Aid, delay: Duration) {
let instant = Instant::now().checked_add(delay).unwrap();
let entry = DelayedMessage {
uuid: Uuid::new_v4(),
destination,
instant,
message,
};
let (ref mutex, ref condvar) = &*self.data.delayed_messages;
let mut data = mutex.lock().unwrap();
data.push(entry);
condvar.notify_all();
}
#[cfg(test)]
pub(crate) fn executor(&self) -> &MaximExecutor {
&self.data.executor
}
}
impl fmt::Debug for ActorSystem {
fn fmt(&self, formatter: &'_ mut fmt::Formatter) -> fmt::Result {
write!(
formatter,
"ActorSystem{{uuid: {}, config: {:?}}}",
self.data.uuid.to_string(),
self.data.config,
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::system::system_actor::SystemActorMessage;
use crate::tests::*;
use futures::future;
use std::thread;
fn start_and_connect_two_systems() -> (ActorSystem, ActorSystem) {
let system1 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let system2 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
ActorSystem::connect_with_channels(&system1, &system2);
(system1, system2)
}
fn await_two_system_shutdown(system1: ActorSystem, system2: ActorSystem) {
let h1 = thread::spawn(move || {
system1.await_shutdown(None);
});
let h2 = thread::spawn(move || {
system2.await_shutdown(None);
});
h1.join().unwrap();
h2.join().unwrap();
}
#[test]
fn test_shutdown_await_timeout() {
use std::time::Duration;
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
system
.spawn()
.with((), |_state: (), context: Context, _: Message| {
async move {
sleep(100);
context.system.trigger_shutdown();
Ok(Status::done(()))
}
})
.unwrap();
assert_eq!(
system.await_shutdown(Duration::from_millis(10)),
ShutdownResult::TimedOut
);
assert_eq!(
system.await_shutdown(Duration::from_millis(200)),
ShutdownResult::Ok
);
system.await_shutdown(None);
}
#[test]
fn test_find_by_uuid() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let aid = system.spawn().with((), simple_handler).unwrap();
aid.send_new(11).unwrap();
await_received(&aid, 2, 1000).unwrap();
let found = system.find_aid_by_uuid(&aid.uuid()).unwrap();
assert!(Aid::ptr_eq(&aid, &found));
assert_eq!(None, system.find_aid_by_uuid(&Uuid::new_v4()));
system.trigger_and_await_shutdown(None);
}
#[test]
fn test_find_by_name() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let aid = system.spawn().name("A").with((), simple_handler).unwrap();
aid.send_new(11).unwrap();
await_received(&aid, 2, 1000).unwrap();
let found = system.find_aid_by_name(&aid.name().unwrap()).unwrap();
assert!(Aid::ptr_eq(&aid, &found));
assert_eq!(None, system.find_aid_by_name("B"));
system.trigger_and_await_shutdown(None);
}
#[test]
fn test_find_aid() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let aid = system.spawn().name("A").with((), simple_handler).unwrap();
await_received(&aid, 1, 1000).unwrap();
let found = system.find_aid(&aid.system_uuid(), &aid.uuid()).unwrap();
assert!(Aid::ptr_eq(&aid, &found));
assert_eq!(None, system.find_aid(&aid.system_uuid(), &Uuid::new_v4()));
assert_eq!(None, system.find_aid(&Uuid::new_v4(), &aid.uuid()));
system.trigger_and_await_shutdown(None);
}
#[test]
fn test_stop_actor() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let aid = system.spawn().name("A").with((), simple_handler).unwrap();
aid.send_new(11).unwrap();
await_received(&aid, 2, 1000).unwrap();
system.stop_actor(&aid);
assert_eq!(false, system.is_actor_alive(&aid));
let sys_clone = system.clone();
let actors_by_aid = &sys_clone.data.actors_by_aid;
assert_eq!(false, actors_by_aid.contains_key(&aid));
let aids_by_uuid = &sys_clone.data.aids_by_uuid;
assert_eq!(false, aids_by_uuid.contains_key(&aid.uuid()));
assert_eq!(None, system.find_aid_by_name("A"));
assert_eq!(None, system.find_aid_by_uuid(&aid.uuid()));
system.trigger_and_await_shutdown(None);
}
#[test]
fn test_send_after() {
init_test_log();
info!("Preparing test");
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let aid = system.spawn().name("A").with((), simple_handler).unwrap();
await_received(&aid, 1, 1000).unwrap();
info!("Test prepared, sending delayed message");
system.send_after(Message::new(11), aid.clone(), Duration::from_millis(10));
info!("Sleeping for initial check");
sleep(5);
assert_eq!(1, aid.received().unwrap());
info!("Sleeping till we're 100% sure we should have the message");
sleep(10);
assert_eq!(2, aid.received().unwrap());
system.trigger_and_await_shutdown(None);
}
#[test]
fn test_send_after_before_current() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let aid1 = system.spawn().name("A").with((), simple_handler).unwrap();
await_received(&aid1, 1, 1000).unwrap();
let aid2 = system.spawn().name("B").with((), simple_handler).unwrap();
await_received(&aid2, 1, 1000).unwrap();
aid1.send_after(Message::new(11), Duration::from_millis(50))
.unwrap();
aid2.send_after(Message::new(11), Duration::from_millis(10))
.unwrap();
assert_eq!(1, aid1.received().unwrap());
assert_eq!(1, aid2.received().unwrap());
sleep(15);
assert_eq!(1, aid1.received().unwrap());
assert_eq!(2, aid2.received().unwrap());
sleep(50);
assert_eq!(2, aid1.received().unwrap());
assert_eq!(2, aid2.received().unwrap());
system.trigger_and_await_shutdown(None);
}
#[test]
fn test_actor_not_in_map() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let aid = system.spawn().with((), simple_handler).unwrap();
await_received(&aid, 1, 1000).unwrap();
let sys_clone = system.clone();
let actors_by_aid = &sys_clone.data.actors_by_aid;
actors_by_aid.remove(&aid);
aid.send_new(11).unwrap();
system.trigger_and_await_shutdown(None);
}
#[test]
fn test_connect_with_channels() {
let system1 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let system2 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
ActorSystem::connect_with_channels(&system1, &system2);
{
system1
.data
.remotes
.get(&system2.data.uuid)
.expect("Unable to find connection with system 2 in system 1");
}
{
system2
.data
.remotes
.get(&system1.data.uuid)
.expect("Unable to find connection with system 1 in system 2");
}
}
#[test]
fn test_monitors() {
init_test_log();
let tracker = AssertCollect::new();
async fn monitor_handler(
state: (Aid, AssertCollect),
_: Context,
message: Message,
) -> ActorResult<(Aid, AssertCollect)> {
if let Some(msg) = message.content_as::<SystemMsg>() {
match &*msg {
SystemMsg::Stopped { aid, error } => {
state
.1
.assert(Aid::ptr_eq(&state.0, aid), "Pointers are not equal!");
state.1.assert(error.is_none(), "Actor was errored!");
Ok(Status::done(state))
}
SystemMsg::Start => Ok(Status::done(state)),
_ => state.1.panic("Received some other message!"),
}
} else {
state.1.panic("Received some other message!")
}
}
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let monitored = system.spawn().with((), simple_handler).unwrap();
let not_monitoring = system.spawn().with((), simple_handler).unwrap();
let monitoring1 = system
.spawn()
.with((monitored.clone(), tracker.clone()), monitor_handler)
.unwrap();
let monitoring2 = system
.spawn()
.with((monitored.clone(), tracker.clone()), monitor_handler)
.unwrap();
system.monitor(&monitoring1, &monitored);
system.monitor(&monitoring2, &monitored);
{
let monitoring_by_monitored = &system.data.monitoring_by_monitored;
let m_set = monitoring_by_monitored.get(&monitored).unwrap();
assert!(m_set.contains(&monitoring1));
assert!(m_set.contains(&monitoring2));
}
system.stop_actor(&monitored);
await_received(&monitoring1, 2, 1000).unwrap();
await_received(&monitoring2, 2, 1000).unwrap();
await_received(¬_monitoring, 1, 1000).unwrap();
system.trigger_and_await_shutdown(None);
tracker.collect();
}
#[test]
fn test_monitor_gets_panics_errors() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let tracker = AssertCollect::new();
let t = tracker.clone();
let aid = system
.spawn()
.with((), |_: (), _: Context, msg: Message| {
if let Some(_) = msg.content_as::<SystemMsg>() {
debug!("Not panicking this time");
return future::ok(Status::done(()));
}
debug!("About to panic");
panic!("I panicked")
})
.unwrap();
let monitor = system
.spawn()
.with(aid.clone(), move |state: Aid, _: Context, msg: Message| {
if let Some(msg) = msg.content_as::<SystemMsg>() {
match &*msg {
SystemMsg::Stopped { aid, error } => {
t.assert(*aid == state, "Aid is not expected Aid");
t.assert(error.is_some(), "Expected error");
t.assert(
error.as_ref().unwrap() == "I panicked",
"Error message does not match",
);
future::ok(Status::stop(state))
}
SystemMsg::Start => future::ok(Status::done(state)),
_ => t.panic("Unexpected message received!"),
}
} else {
t.panic("Unexpected message received!")
}
})
.unwrap();
system.monitor(&monitor, &aid);
aid.send_new(()).unwrap();
await_received(&monitor, 2, 1000).unwrap();
system.trigger_and_await_shutdown(Duration::from_millis(1000));
tracker.collect();
}
#[test]
fn test_named_actor_restrictions() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let aid1 = system.spawn().name("A").with((), simple_handler).unwrap();
await_received(&aid1, 1, 1000).unwrap();
let aid2 = system.spawn().name("B").with((), simple_handler).unwrap();
await_received(&aid2, 1, 1000).unwrap();
let result = system.spawn().name("A").with((), simple_handler);
assert_eq!(Err(SystemError::NameAlreadyUsed("A".to_string())), result);
let found1 = system.find_aid_by_name("A").unwrap();
assert_eq!(true, system.is_actor_alive(&aid1));
assert!(Aid::ptr_eq(&aid1, &found1));
system.stop_actor(&aid2);
assert_eq!(None, system.find_aid_by_name("B"));
assert_eq!(None, system.find_aid_by_uuid(&aid2.uuid()));
let aid3 = system.spawn().name("B").with((), simple_handler).unwrap();
await_received(&aid3, 1, 1000).unwrap();
let found2 = system.find_aid_by_name("B").unwrap();
assert!(Aid::ptr_eq(&aid3, &found2));
system.trigger_and_await_shutdown(None);
}
#[test]
fn test_remote_actors() {
#[derive(Serialize, Deserialize, Debug)]
struct Request {
reply_to: Aid,
}
#[derive(Serialize, Deserialize, Debug)]
struct Reply {}
init_test_log();
let tracker = AssertCollect::new();
let t = tracker.clone();
let (system1, system2) = start_and_connect_two_systems();
system1.init_current();
let aid = system1
.spawn()
.with((), move |_: (), context: Context, message: Message| {
let t = t.clone();
async move {
if let Some(msg) = message.content_as::<Request>() {
msg.reply_to.send_new(Reply {}).unwrap();
context.system.trigger_shutdown();
Ok(Status::stop(()))
} else if let Some(_) = message.content_as::<SystemMsg>() {
Ok(Status::done(()))
} else {
t.panic("Unexpected message received!")
}
}
})
.unwrap();
await_received(&aid, 1, 1000).unwrap();
let t = tracker.clone();
let serialized = bincode::serialize(&aid).unwrap();
system2
.spawn()
.with((), move |_: (), context: Context, message: Message| {
if let Some(_) = message.content_as::<Reply>() {
debug!("Received reply, shutting down");
context.system.trigger_shutdown();
future::ok(Status::stop(()))
} else if let Some(msg) = message.content_as::<SystemMsg>() {
match &*msg {
SystemMsg::Start => {
debug!("Starting request actor");
let target_aid: Aid = bincode::deserialize(&serialized).unwrap();
target_aid
.send_new(Request {
reply_to: context.aid.clone(),
})
.unwrap();
future::ok(Status::done(()))
}
_ => future::ok(Status::done(())),
}
} else {
t.panic("Unexpected message received!")
}
})
.unwrap();
await_two_system_shutdown(system1, system2);
tracker.collect();
}
#[test]
fn test_system_actor_find_by_name() {
init_test_log();
let tracker = AssertCollect::new();
let t = tracker.clone();
let (system1, system2) = start_and_connect_two_systems();
let aid1 = system1
.spawn()
.name("A")
.with((), |_: (), context: Context, message: Message| async move {
if let Some(_) = message.content_as::<bool>() {
context.system.trigger_shutdown();
Ok(Status::stop(()))
} else {
Ok(Status::done(()))
}
})
.unwrap();
await_received(&aid1, 1, 1000).unwrap();
system2
.spawn()
.with((), move |_: (), context: Context, message: Message| {
let aid1 = aid1.clone();
let t = t.clone();
async move {
if let Some(msg) = message.content_as::<SystemActorMessage>() {
match &*msg {
SystemActorMessage::FindByNameResult { aid: found, .. } => {
debug!("FindByNameResult received");
if let Some(target) = found {
t.assert(
target.uuid() == aid1.uuid(),
"Target is not expected Actor",
);
target.send_new(true).unwrap();
context.system.trigger_shutdown();
Ok(Status::done(()))
} else {
t.panic("Didn't find AID.")
}
}
_ => t.panic("Unexpected message received!"),
}
} else if let Some(msg) = message.content_as::<SystemMsg>() {
debug!("Actor started, attempting to send FindByName request");
if let SystemMsg::Start = &*msg {
context.system.send_to_system_actors(Message::new(
SystemActorMessage::FindByName {
reply_to: context.aid.clone(),
name: "A".to_string(),
},
));
Ok(Status::done(()))
} else {
t.panic("Unexpected message received!")
}
} else {
t.panic("Unexpected message received!")
}
}
})
.unwrap();
await_two_system_shutdown(system1, system2);
tracker.collect();
}
#[test]
#[cfg(feature = "actor-pool")]
fn test_spawn_pool() {
let tracker = AssertCollect::new();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
async fn handler(_: (), _: Context, _: Message) -> ActorResult<()> {
Ok(Status::done(()))
}
let mut aid_pool: RandomAidPool = system
.spawn_pool(3)
.name("handler")
.channel_size(100)
.with((), handler)
.unwrap();
for _ in 0..=100 {
aid_pool.send_new(0).unwrap();
}
sleep(10);
let aids: Vec<Aid> = aid_pool.into();
for aid in aids {
assert!(aid.received().unwrap() > 1);
}
system.trigger_and_await_shutdown(None);
tracker.collect();
}
}