1#[cfg(feature = "actor-pool")]
13use crate::actors::ActorPoolBuilder;
14use crate::actors::{Actor, ActorBuilder, ActorStream};
15use crate::executor::MaximExecutor;
16use crate::prelude::*;
17use crate::system::system_actor::SystemActor;
18use dashmap::DashMap;
19use log::{debug, error, info, trace, warn};
20use once_cell::sync::OnceCell;
21use secc::{SeccReceiver, SeccSender};
22use serde::{Deserialize, Serialize};
23use std::collections::{BinaryHeap, HashSet};
24use std::error::Error;
25use std::fmt;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::{Arc, Condvar, Mutex};
28use std::thread;
29use std::thread::JoinHandle;
30use std::time::{Duration, Instant};
31use uuid::Uuid;
32
33mod system_actor;
34
35std::thread_local! {
39 static ACTOR_SYSTEM: OnceCell<ActorSystem> = OnceCell::new();
40}
41
42#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
45pub enum SystemMsg {
46 Start,
49
50 Stop,
55
56 Stopped { aid: Aid, error: Option<String> },
59}
60
61#[derive(Clone, Serialize, Deserialize)]
63pub enum WireMessage {
64 Hello {
66 system_actor_aid: Aid,
68 },
69 ActorMessage {
71 actor_uuid: Uuid,
73 system_uuid: Uuid,
75 message: Message,
77 },
78 DelayedActorMessage {
80 duration: Duration,
82 actor_uuid: Uuid,
84 system_uuid: Uuid,
86 message: Message,
88 },
89}
90
91#[derive(Clone, Debug, Serialize, Deserialize)]
95pub struct ActorSystemConfig {
96 pub message_channel_size: u16,
103 pub send_timeout: Duration,
109 pub thread_pool_size: u16,
114 pub warn_threshold: Duration,
119 pub time_slice: Duration,
125 pub thread_wait_time: Duration,
130 pub start_on_launch: bool,
133}
134
135impl ActorSystemConfig {
136 pub fn message_channel_size(mut self, value: u16) -> Self {
138 self.message_channel_size = value;
139 self
140 }
141
142 pub fn send_timeout(mut self, value: Duration) -> Self {
144 self.send_timeout = value;
145 self
146 }
147
148 pub fn thread_pool_size(mut self, value: u16) -> Self {
150 self.thread_pool_size = value;
151 self
152 }
153
154 pub fn warn_threshold(mut self, value: Duration) -> Self {
156 self.warn_threshold = value;
157 self
158 }
159
160 pub fn time_slice(mut self, value: Duration) -> Self {
162 self.time_slice = value;
163 self
164 }
165
166 pub fn thread_wait_time(mut self, value: Duration) -> Self {
168 self.thread_wait_time = value;
169 self
170 }
171}
172
173impl Default for ActorSystemConfig {
174 fn default() -> ActorSystemConfig {
176 ActorSystemConfig {
177 thread_pool_size: (num_cpus::get() * 4) as u16,
178 warn_threshold: Duration::from_millis(1),
179 time_slice: Duration::from_millis(1),
180 thread_wait_time: Duration::from_millis(100),
181 message_channel_size: 32,
182 send_timeout: Duration::from_millis(1),
183 start_on_launch: true,
184 }
185 }
186}
187
188#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
190pub enum SystemError {
191 NameAlreadyUsed(String),
195}
196
197impl std::fmt::Display for SystemError {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 write!(f, "{:?}", self)
200 }
201}
202
203impl Error for SystemError {}
204
205pub struct RemoteInfo {
207 pub system_uuid: Uuid,
209 pub sender: SeccSender<WireMessage>,
211 pub receiver: SeccReceiver<WireMessage>,
213 pub system_actor_aid: Aid,
215 _handle: JoinHandle<()>,
218}
219
220struct DelayedMessage {
222 uuid: Uuid,
224 destination: Aid,
226 instant: Instant,
228 message: Message,
230}
231
232impl std::cmp::PartialEq for DelayedMessage {
233 fn eq(&self, other: &Self) -> bool {
234 self.uuid == other.uuid
235 }
236}
237
238impl std::cmp::Eq for DelayedMessage {}
239
240impl std::cmp::PartialOrd for DelayedMessage {
241 fn partial_cmp(&self, other: &DelayedMessage) -> Option<std::cmp::Ordering> {
242 Some(other.instant.cmp(&self.instant)) }
244}
245
246impl std::cmp::Ord for DelayedMessage {
247 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
248 self.partial_cmp(other)
249 .expect("DelayedMessage::partial_cmp() returned None; can't happen")
250 }
251}
252
253pub(crate) struct ActorSystemData {
255 pub(crate) uuid: Uuid,
257 pub(crate) config: ActorSystemConfig,
259 threads: Mutex<Vec<JoinHandle<()>>>,
261 executor: MaximExecutor,
263 started: AtomicBool,
265 shutdown_triggered: Arc<(Mutex<bool>, Condvar)>,
267 actors_by_aid: Arc<DashMap<Aid, Arc<Actor>>>,
269 aids_by_uuid: Arc<DashMap<Uuid, Aid>>,
272 aids_by_name: Arc<DashMap<String, Aid>>,
275 monitoring_by_monitored: Arc<DashMap<Aid, HashSet<Aid>>>,
278 remotes: Arc<DashMap<Uuid, RemoteInfo>>,
280 delayed_messages: Arc<(Mutex<BinaryHeap<DelayedMessage>>, Condvar)>,
282}
283
284#[derive(Clone)]
286pub struct ActorSystem {
287 pub(crate) data: Arc<ActorSystemData>,
291}
292
293impl ActorSystem {
294 pub fn create(config: ActorSystemConfig) -> ActorSystem {
298 let uuid = Uuid::new_v4();
299 let threads = Mutex::new(Vec::with_capacity(config.thread_pool_size as usize));
300 let shutdown_triggered = Arc::new((Mutex::new(false), Condvar::new()));
301
302 let executor = MaximExecutor::new(shutdown_triggered.clone());
303
304 let start_on_launch = config.start_on_launch;
305
306 let system = ActorSystem {
308 data: Arc::new(ActorSystemData {
309 uuid,
310 config,
311 threads,
312 executor,
313 started: AtomicBool::new(false),
314 shutdown_triggered,
315 actors_by_aid: Arc::new(DashMap::default()),
316 aids_by_uuid: Arc::new(DashMap::default()),
317 aids_by_name: Arc::new(DashMap::default()),
318 monitoring_by_monitored: Arc::new(DashMap::default()),
319 remotes: Arc::new(DashMap::default()),
320 delayed_messages: Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new())),
321 }),
322 };
323
324 if start_on_launch {
326 system.start();
327 }
328
329 system
330 }
331
332 pub fn start(&self) {
334 if !self
335 .data
336 .started
337 .compare_and_swap(false, true, Ordering::Relaxed)
338 {
339 info!("ActorSystem {} has spawned", self.data.uuid);
340 self.data.executor.init(self);
341
342 {
346 let mut guard = self.data.threads.lock().unwrap();
347
348 for _ in 0..1 {
351 let thread = self.start_send_after_thread();
352 guard.push(thread);
353 }
354 }
355
356 self.spawn()
358 .name("System")
359 .with(SystemActor, SystemActor::processor)
360 .unwrap();
361 }
362 }
363
364 fn start_send_after_thread(&self) -> JoinHandle<()> {
368 let system = self.clone();
369 let delayed_messages = self.data.delayed_messages.clone();
370 thread::spawn(move || {
371 while !*system.data.shutdown_triggered.0.lock().unwrap() {
372 let (ref mutex, ref condvar) = &*delayed_messages;
373 let mut data = mutex.lock().unwrap();
374 match data.peek() {
375 None => {
376 let _ = condvar.wait(data).unwrap();
378 }
379 Some(msg) => {
380 let now = Instant::now();
381 if now >= msg.instant {
382 trace!("Sending delayed message");
383 msg.destination
384 .send(msg.message.clone())
385 .unwrap_or_else(|error| {
386 warn!(
387 "Cannot send scheduled message to {}: Error {:?}",
388 msg.destination, error
389 );
390 });
391 data.pop();
392 } else {
393 let duration = msg.instant.duration_since(now);
394 let _result = condvar.wait_timeout(data, duration).unwrap();
395 }
396 }
397 }
398 }
399 })
400 }
401
402 pub fn config(&self) -> &ActorSystemConfig {
404 &self.data.config
405 }
406
407 pub(crate) fn remote_sender(&self, system_uuid: &Uuid) -> Option<SeccSender<WireMessage>> {
409 self.data
410 .remotes
411 .get(system_uuid)
412 .map(|info| info.sender.clone())
413 }
414
415 pub fn connect(
418 &self,
419 sender: &SeccSender<WireMessage>,
420 receiver: &SeccReceiver<WireMessage>,
421 ) -> Uuid {
422 let hello = WireMessage::Hello {
424 system_actor_aid: self.system_actor_aid(),
425 };
426 sender.send(hello).unwrap();
427 debug!("Sending hello from {}", self.data.uuid);
428
429 let system_actor_aid =
431 match receiver.receive_await_timeout(self.data.config.thread_wait_time) {
432 Ok(message) => match message {
433 WireMessage::Hello { system_actor_aid } => system_actor_aid,
434 _ => panic!("Expected first message to be a Hello"),
435 },
436 Err(e) => panic!("Expected to read a Hello message {:?}", e),
437 };
438
439 let system = self.clone();
441 let receiver_clone = receiver.clone();
442 let thread_timeout = self.data.config.thread_wait_time;
443 let sys_uuid = system_actor_aid.system_uuid();
444 let handle = thread::spawn(move || {
445 system.init_current();
446 while !*system.data.shutdown_triggered.0.lock().unwrap() {
449 match receiver_clone.receive_await_timeout(thread_timeout) {
450 Err(_) => (), Ok(wire_msg) => system.process_wire_message(&sys_uuid, &wire_msg),
452 }
453 }
454 });
455
456 let info = RemoteInfo {
458 system_uuid: system_actor_aid.system_uuid(),
459 sender: sender.clone(),
460 receiver: receiver.clone(),
461 _handle: handle,
462 system_actor_aid,
463 };
464
465 let uuid = info.system_uuid;
466 self.data.remotes.insert(uuid.clone(), info);
467 uuid
468 }
469
470 pub fn disconnect(&self, system_uuid: Uuid) -> Result<(), AidError> {
473 self.data.remotes.remove(&system_uuid);
474 Ok(())
475 }
476
477 pub fn connect_with_channels(system1: &ActorSystem, system2: &ActorSystem) {
480 let (tx1, rx1) = secc::create::<WireMessage>(32, system1.data.config.thread_wait_time);
481 let (tx2, rx2) = secc::create::<WireMessage>(32, system2.data.config.thread_wait_time);
482
483 let system1_clone = system1.clone();
486 let system2_clone = system2.clone();
487 let h1 = thread::spawn(move || system1_clone.connect(&tx1, &rx2));
488 let h2 = thread::spawn(move || system2_clone.connect(&tx2, &rx1));
489
490 h1.join().unwrap();
492 h2.join().unwrap();
493 }
494
495 fn process_wire_message(&self, _uuid: &Uuid, wire_message: &WireMessage) {
499 match wire_message {
500 WireMessage::ActorMessage {
501 actor_uuid,
502 system_uuid,
503 message,
504 } => {
505 if let Some(aid) = self.find_aid(&system_uuid, &actor_uuid) {
506 aid.send(message.clone()).unwrap_or_else(|error| {
507 warn!("Could not send wire message to {}. Error: {}", aid, error);
508 })
509 }
510 }
511 WireMessage::DelayedActorMessage {
512 duration,
513 actor_uuid,
514 system_uuid,
515 message,
516 } => {
517 self.find_aid(&system_uuid, &actor_uuid)
518 .map(|aid| self.send_after(message.clone(), aid, *duration))
519 .expect("Error not handled yet");
520 }
521 WireMessage::Hello { system_actor_aid } => {
522 debug!("{:?} Got Hello from {}", self.data.uuid, system_actor_aid);
523 }
524 }
525 }
526
527 pub fn init_current(&self) {
533 ACTOR_SYSTEM.with(|actor_system| {
534 actor_system
535 .set(self.clone())
536 .expect("Unable to set ACTOR_SYSTEM.");
537 });
538 }
539
540 #[inline]
542 pub fn current() -> ActorSystem {
543 ACTOR_SYSTEM.with(|actor_system| {
544 actor_system
545 .get()
546 .expect("Thread local ACTOR_SYSTEM not set! See `ActorSystem::init_current()`")
547 .clone()
548 })
549 }
550
551 #[inline]
553 pub fn uuid(&self) -> Uuid {
554 self.data.uuid
555 }
556
557 pub fn trigger_shutdown(&self) {
559 let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
560 *mutex.lock().unwrap() = true;
561 condvar.notify_all();
562 }
563
564 pub fn await_shutdown(&self, timeout: impl Into<Option<Duration>>) -> ShutdownResult {
568 info!("System awaiting shutdown");
569
570 let start = Instant::now();
571 let timeout = timeout.into();
572
573 let result = match timeout {
574 Some(dur) => self.await_shutdown_trigger_with_timeout(dur),
575 None => self.await_shutdown_trigger_without_timeout(),
576 };
577
578 if let Some(r) = result {
579 return r;
580 }
581
582 let timeout = {
583 match timeout {
584 Some(timeout) => {
585 let elapsed = Instant::now().duration_since(start);
586 if let Some(t) = timeout.checked_sub(elapsed) {
587 Some(t)
588 } else {
589 return ShutdownResult::TimedOut;
590 }
591 }
592 None => None,
593 }
594 };
595
596 self.data.executor.await_shutdown(timeout)
598 }
599
600 fn await_shutdown_trigger_with_timeout(&self, mut dur: Duration) -> Option<ShutdownResult> {
601 let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
602 let mut guard = mutex.lock().unwrap();
603 while !*guard {
604 let started = Instant::now();
605 let (new_guard, timeout) = match condvar.wait_timeout(guard, dur) {
606 Ok(ret) => ret,
607 Err(_) => return Some(ShutdownResult::Panicked),
608 };
609
610 if timeout.timed_out() {
611 return Some(ShutdownResult::TimedOut);
612 }
613
614 guard = new_guard;
615 dur -= started.elapsed();
616 }
617 None
618 }
619
620 fn await_shutdown_trigger_without_timeout(&self) -> Option<ShutdownResult> {
621 let (ref mutex, ref condvar) = &*self.data.shutdown_triggered;
622 let mut guard = mutex.lock().unwrap();
623 while !*guard {
624 guard = match condvar.wait(guard) {
625 Ok(ret) => ret,
626 Err(_) => return Some(ShutdownResult::Panicked),
627 };
628 }
629 None
630 }
631
632 pub fn trigger_and_await_shutdown(
634 &self,
635 timeout: impl Into<Option<Duration>>,
636 ) -> ShutdownResult {
637 self.trigger_shutdown();
638 self.await_shutdown(timeout)
639 }
640
641 pub(crate) fn register_actor(
643 &self,
644 actor: Arc<Actor>,
645 stream: ActorStream,
646 ) -> Result<Aid, SystemError> {
647 let aids_by_name = &self.data.aids_by_name;
648 let actors_by_aid = &self.data.actors_by_aid;
649 let aids_by_uuid = &self.data.aids_by_uuid;
650 let aid = actor.context.aid.clone();
651 if let Some(name_string) = &aid.name() {
652 if aids_by_name.contains_key(name_string) {
653 return Err(SystemError::NameAlreadyUsed(name_string.clone()));
654 } else {
655 aids_by_name.insert(name_string.clone(), aid.clone());
656 }
657 }
658 actors_by_aid.insert(aid.clone(), actor);
659 aids_by_uuid.insert(aid.uuid(), aid.clone());
660 self.data.executor.register_actor(stream);
661 aid.send(Message::new(SystemMsg::Start)).unwrap(); Ok(aid)
663 }
664
665 pub fn spawn(&self) -> ActorBuilder {
686 ActorBuilder {
687 system: self.clone(),
688 name: None,
689 channel_size: None,
690 }
691 }
692
/// Starts building a pool of `count` actors on this system.
///
/// The pool builder wraps the same unnamed, default-sized builder that `spawn()`
/// produces.
#[cfg(feature = "actor-pool")]
pub fn spawn_pool(&self, count: usize) -> ActorPoolBuilder {
    ActorPoolBuilder::new(self.spawn(), count)
}
729
730 pub(crate) fn schedule(&self, aid: Aid) {
737 let actors_by_aid = &self.data.actors_by_aid;
738 if actors_by_aid.contains_key(&aid) {
739 self.data.executor.wake(aid);
740 } else {
741 warn!(
744 "Attempted to schedule actor with aid {:?} on system with node_id {:?} but
745 the actor does not exist.",
746 aid,
747 self.data.uuid.to_string(),
748 );
749 }
750 }
751
752 pub fn stop_actor(&self, aid: &Aid) {
759 self.internal_stop_actor(aid, None);
760 }
761
762 pub(crate) fn internal_stop_actor(&self, aid: &Aid, error: impl Into<Option<StdError>>) {
765 {
766 let actors_by_aid = &self.data.actors_by_aid;
767 let aids_by_uuid = &self.data.aids_by_uuid;
768 let aids_by_name = &self.data.aids_by_name;
769 actors_by_aid.remove(aid);
770 aids_by_uuid.remove(&aid.uuid());
771 if let Some(name_string) = aid.name() {
772 aids_by_name.remove(&name_string);
773 }
774 aid.stop().unwrap();
775 }
776
777 if let Some((_, monitoring)) = self.data.monitoring_by_monitored.remove(&aid) {
780 let error = error.into().map(|e| format!("{}", e));
781 for m_aid in monitoring {
782 let value = SystemMsg::Stopped {
783 aid: aid.clone(),
784 error: error.clone(),
785 };
786 m_aid.send(Message::new(value)).unwrap_or_else(|error| {
787 error!(
788 "Could not send 'Stopped' to monitoring actor {}: Error: {:?}",
789 m_aid, error
790 );
791 });
792 }
793 }
794 }
795
796 pub fn is_actor_alive(&self, aid: &Aid) -> bool {
798 let actors_by_aid = &self.data.actors_by_aid;
799 actors_by_aid.contains_key(aid)
800 }
801
802 pub fn find_aid_by_uuid(&self, uuid: &Uuid) -> Option<Aid> {
805 let aids_by_uuid = &self.data.aids_by_uuid;
806 aids_by_uuid.get(uuid).map(|aid| aid.clone())
807 }
808
809 pub fn find_aid_by_name(&self, name: &str) -> Option<Aid> {
812 let aids_by_name = &self.data.aids_by_name;
813 aids_by_name.get(&name.to_string()).map(|aid| aid.clone())
814 }
815
816 fn find_aid(&self, system_uuid: &Uuid, actor_uuid: &Uuid) -> Option<Aid> {
821 if self.uuid() == *system_uuid {
822 self.find_aid_by_uuid(&actor_uuid)
823 } else {
824 None
825 }
826 }
827
828 pub fn system_actor_aid(&self) -> Aid {
830 self.find_aid_by_name(&"System").unwrap()
831 }
832
833 pub fn monitor(&self, monitoring: &Aid, monitored: &Aid) {
835 let mut monitoring_by_monitored = self
836 .data
837 .monitoring_by_monitored
838 .get_raw_mut_from_key(&monitored);
839 let monitoring_vec = monitoring_by_monitored
840 .entry(monitored.clone())
841 .or_insert(HashSet::new());
842 monitoring_vec.insert(monitoring.clone());
843 }
844
845 pub fn send_to_system_actors(&self, message: Message) {
848 let remotes = &*self.data.remotes;
849 trace!("Sending message to Remote System Actors");
850 for remote in remotes.iter() {
851 let aid = &remote.value().system_actor_aid;
852 aid.send(message.clone()).unwrap_or_else(|error| {
853 error!("Could not send to system actor {}. Error: {}", aid, error)
854 });
855 }
856 }
857
858 pub(crate) fn send_after(&self, message: Message, destination: Aid, delay: Duration) {
863 let instant = Instant::now().checked_add(delay).unwrap();
864 let entry = DelayedMessage {
865 uuid: Uuid::new_v4(),
866 destination,
867 instant,
868 message,
869 };
870 let (ref mutex, ref condvar) = &*self.data.delayed_messages;
871 let mut data = mutex.lock().unwrap();
872 data.push(entry);
873 condvar.notify_all();
874 }
875
/// Test-only access to the system's executor.
#[cfg(test)]
pub(crate) fn executor(&self) -> &MaximExecutor {
    let data: &ActorSystemData = &self.data;
    &data.executor
}
880}
881
882impl fmt::Debug for ActorSystem {
883 fn fmt(&self, formatter: &'_ mut fmt::Formatter) -> fmt::Result {
884 write!(
885 formatter,
886 "ActorSystem{{uuid: {}, config: {:?}}}",
887 self.data.uuid.to_string(),
888 self.data.config,
889 )
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use super::*;
896 use crate::system::system_actor::SystemActorMessage;
897 use crate::tests::*;
898 use futures::future;
899 use std::thread;
900
901 fn start_and_connect_two_systems() -> (ActorSystem, ActorSystem) {
903 let system1 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
904 let system2 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
905 ActorSystem::connect_with_channels(&system1, &system2);
906 (system1, system2)
907 }
908
909 fn await_two_system_shutdown(system1: ActorSystem, system2: ActorSystem) {
912 let h1 = thread::spawn(move || {
913 system1.await_shutdown(None);
914 });
915
916 let h2 = thread::spawn(move || {
917 system2.await_shutdown(None);
918 });
919
920 h1.join().unwrap();
921 h2.join().unwrap();
922 }
923
/// `await_shutdown` must time out while shutdown has not been triggered yet and
/// report `Ok` once it has been.
#[test]
fn test_shutdown_await_timeout() {
    use std::time::Duration;

    let config = ActorSystemConfig::default().thread_pool_size(2);
    let system = ActorSystem::create(config);

    // The actor triggers the shutdown roughly 100 ms after its first message.
    let spawned = system
        .spawn()
        .with((), |_state: (), context: Context, _: Message| async move {
            sleep(100);
            context.system.trigger_shutdown();
            Ok(Status::done(()))
        });
    spawned.unwrap();

    // 10 ms is shorter than the actor's sleep, so this wait must time out.
    let too_short = system.await_shutdown(Duration::from_millis(10));
    assert_eq!(too_short, ShutdownResult::TimedOut);

    // 200 ms is long enough for the trigger to happen.
    let long_enough = system.await_shutdown(Duration::from_millis(200));
    assert_eq!(long_enough, ShutdownResult::Ok);

    // Waiting again after shutdown must return as well.
    system.await_shutdown(None);
}
959
/// An actor can be found by its uuid; an unknown uuid yields `None`.
#[test]
fn test_find_by_uuid() {
    init_test_log();

    let config = ActorSystemConfig::default().thread_pool_size(2);
    let system = ActorSystem::create(config);
    let aid = system.spawn().with((), simple_handler).unwrap();
    aid.send_new(11).unwrap();
    await_received(&aid, 2, 1000).unwrap();

    let looked_up = system.find_aid_by_uuid(&aid.uuid()).unwrap();
    assert!(Aid::ptr_eq(&aid, &looked_up));

    let unknown = Uuid::new_v4();
    assert_eq!(None, system.find_aid_by_uuid(&unknown));

    system.trigger_and_await_shutdown(None);
}
976
/// A named actor can be found by its name; an unused name yields `None`.
#[test]
fn test_find_by_name() {
    init_test_log();

    let config = ActorSystemConfig::default().thread_pool_size(2);
    let system = ActorSystem::create(config);
    let aid = system.spawn().name("A").with((), simple_handler).unwrap();
    aid.send_new(11).unwrap();
    await_received(&aid, 2, 1000).unwrap();

    let registered_name = aid.name().unwrap();
    let looked_up = system.find_aid_by_name(&registered_name).unwrap();
    assert!(Aid::ptr_eq(&aid, &looked_up));

    assert_eq!(None, system.find_aid_by_name("B"));

    system.trigger_and_await_shutdown(None);
}
993
/// `find_aid` needs both the system uuid and the actor uuid to match.
#[test]
fn test_find_aid() {
    init_test_log();

    let config = ActorSystemConfig::default().thread_pool_size(2);
    let system = ActorSystem::create(config);
    let aid = system.spawn().name("A").with((), simple_handler).unwrap();
    await_received(&aid, 1, 1000).unwrap();

    let system_uuid = aid.system_uuid();
    let actor_uuid = aid.uuid();
    let looked_up = system.find_aid(&system_uuid, &actor_uuid).unwrap();
    assert!(Aid::ptr_eq(&aid, &looked_up));

    // Right system, unknown actor.
    assert_eq!(None, system.find_aid(&aid.system_uuid(), &Uuid::new_v4()));
    // Unknown system, right actor.
    assert_eq!(None, system.find_aid(&Uuid::new_v4(), &aid.uuid()));

    system.trigger_and_await_shutdown(None);
}
1010
/// Stopping an actor removes it from every lookup table of the system.
#[test]
fn test_stop_actor() {
    init_test_log();

    let config = ActorSystemConfig::default().thread_pool_size(2);
    let system = ActorSystem::create(config);
    let aid = system.spawn().name("A").with((), simple_handler).unwrap();
    aid.send_new(11).unwrap();
    await_received(&aid, 2, 1000).unwrap();

    system.stop_actor(&aid);
    assert_eq!(false, system.is_actor_alive(&aid));

    // Inspect the tables directly, through a second handle to the same system.
    let sys_clone = system.clone();
    let data = &sys_clone.data;
    assert_eq!(false, data.actors_by_aid.contains_key(&aid));
    assert_eq!(false, data.aids_by_uuid.contains_key(&aid.uuid()));
    assert_eq!(None, system.find_aid_by_name("A"));
    assert_eq!(None, system.find_aid_by_uuid(&aid.uuid()));

    system.trigger_and_await_shutdown(None);
}
1037
/// A message scheduled with `send_after` must not arrive before its delay has
/// passed and must have arrived shortly after.
#[test]
fn test_send_after() {
    init_test_log();

    info!("Preparing test");
    let config = ActorSystemConfig::default().thread_pool_size(2);
    let system = ActorSystem::create(config);
    let aid = system.spawn().name("A").with((), simple_handler).unwrap();
    await_received(&aid, 1, 1000).unwrap();
    info!("Test prepared, sending delayed message");

    let delay = Duration::from_millis(10);
    system.send_after(Message::new(11), aid.clone(), delay);

    info!("Sleeping for initial check");
    sleep(5);
    // Halfway through the delay only the earlier message has been counted.
    assert_eq!(1, aid.received().unwrap());

    info!("Sleeping till we're 100% sure we should have the message");
    sleep(10);
    assert_eq!(2, aid.received().unwrap());

    system.trigger_and_await_shutdown(None);
}
1060
/// A delayed message that is queued later but due earlier must be delivered
/// before the one already at the head of the queue.
#[test]
fn test_send_after_before_current() {
    init_test_log();

    let config = ActorSystemConfig::default().thread_pool_size(2);
    let system = ActorSystem::create(config);

    let slow = system.spawn().name("A").with((), simple_handler).unwrap();
    await_received(&slow, 1, 1000).unwrap();
    let fast = system.spawn().name("B").with((), simple_handler).unwrap();
    await_received(&fast, 1, 1000).unwrap();

    // Queue the 50 ms message first, then the 10 ms one.
    slow.send_after(Message::new(11), Duration::from_millis(50))
        .unwrap();
    fast.send_after(Message::new(11), Duration::from_millis(10))
        .unwrap();

    // Nothing has been delivered yet.
    assert_eq!(1, slow.received().unwrap());
    assert_eq!(1, fast.received().unwrap());

    // After 15 ms only the 10 ms message is due.
    sleep(15);
    assert_eq!(1, slow.received().unwrap());
    assert_eq!(2, fast.received().unwrap());

    // After another 50 ms both have been delivered.
    sleep(50);
    assert_eq!(2, slow.received().unwrap());
    assert_eq!(2, fast.received().unwrap());

    system.trigger_and_await_shutdown(None);
}
1096
/// Sending to an actor that has been removed from the actor table must not fail
/// or panic; scheduling it only produces a warning.
#[test]
fn test_actor_not_in_map() {
    init_test_log();

    let config = ActorSystemConfig::default().thread_pool_size(2);
    let system = ActorSystem::create(config);
    let aid = system.spawn().with((), simple_handler).unwrap();
    await_received(&aid, 1, 1000).unwrap();

    // Remove the actor behind the system's back, through a second handle.
    let sys_clone = system.clone();
    sys_clone.data.actors_by_aid.remove(&aid);

    aid.send_new(11).unwrap();

    system.trigger_and_await_shutdown(None);
}
1120
/// After `connect_with_channels` each system lists the other one as a remote.
#[test]
fn test_connect_with_channels() {
    let make_system = || ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
    let system1 = make_system();
    let system2 = make_system();
    ActorSystem::connect_with_channels(&system1, &system2);

    // Each lookup lives in its own scope so the map entry is released right away.
    {
        let remotes = &system1.data.remotes;
        remotes
            .get(&system2.data.uuid)
            .expect("Unable to find connection with system 2 in system 1");
    }
    {
        let remotes = &system2.data.remotes;
        remotes
            .get(&system1.data.uuid)
            .expect("Unable to find connection with system 1 in system 2");
    }
}
1142
/// Monitoring actors are told when the monitored actor stops; other actors are not.
#[test]
fn test_monitors() {
    init_test_log();

    let tracker = AssertCollect::new();

    /// Handler of a monitoring actor. Its state is the `Aid` it expects to be
    /// told about plus the assertion collector. Only `Start` and a `Stopped`
    /// for that `Aid` without an error are acceptable messages.
    async fn monitor_handler(
        state: (Aid, AssertCollect),
        _: Context,
        message: Message,
    ) -> ActorResult<(Aid, AssertCollect)> {
        if let Some(msg) = message.content_as::<SystemMsg>() {
            match &*msg {
                SystemMsg::Stopped { aid, error } => {
                    state
                        .1
                        .assert(Aid::ptr_eq(&state.0, aid), "Pointers are not equal!");
                    state.1.assert(error.is_none(), "Actor was errored!");
                    Ok(Status::done(state))
                }
                SystemMsg::Start => Ok(Status::done(state)),
                _ => state.1.panic("Received some other message!"),
            }
        } else {
            state.1.panic("Received some other message!")
        }
    }

    let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
    let monitored = system.spawn().with((), simple_handler).unwrap();
    let not_monitoring = system.spawn().with((), simple_handler).unwrap();
    let monitoring1 = system
        .spawn()
        .with((monitored.clone(), tracker.clone()), monitor_handler)
        .unwrap();
    let monitoring2 = system
        .spawn()
        .with((monitored.clone(), tracker.clone()), monitor_handler)
        .unwrap();
    system.monitor(&monitoring1, &monitored);
    system.monitor(&monitoring2, &monitored);

    {
        // Both monitors must be recorded for the monitored actor. The scope
        // releases the map entry before the actor is stopped.
        let monitoring_by_monitored = &system.data.monitoring_by_monitored;
        let m_set = monitoring_by_monitored.get(&monitored).unwrap();
        assert!(m_set.contains(&monitoring1));
        assert!(m_set.contains(&monitoring2));
    }

    system.stop_actor(&monitored);
    // The monitors have received one more message; the bystander has not.
    await_received(&monitoring1, 2, 1000).unwrap();
    await_received(&monitoring2, 2, 1000).unwrap();
    // Restored `&not_monitoring`: the reference had been corrupted into a `¬` character.
    await_received(&not_monitoring, 1, 1000).unwrap();

    system.trigger_and_await_shutdown(None);
    tracker.collect();
}
1203
/// When a monitored actor panics, its monitor receives `Stopped` carrying the
/// panic message as the error.
#[test]
fn test_monitor_gets_panics_errors() {
    init_test_log();

    let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
    let tracker = AssertCollect::new();
    let t = tracker.clone();

    // This actor tolerates system messages and panics on anything else.
    let aid = system
        .spawn()
        .with((), |_: (), _: Context, msg: Message| {
            if msg.content_as::<SystemMsg>().is_some() {
                debug!("Not panicking this time");
                return future::ok(Status::done(()));
            }

            debug!("About to panic");
            panic!("I panicked")
        })
        .unwrap();

    // The monitor's state is the `Aid` it expects to hear about.
    let monitor = system
        .spawn()
        .with(aid.clone(), move |state: Aid, _: Context, msg: Message| {
            if let Some(msg) = msg.content_as::<SystemMsg>() {
                match &*msg {
                    SystemMsg::Stopped { aid, error } => {
                        t.assert(*aid == state, "Aid is not expected Aid");
                        t.assert(error.is_some(), "Expected error");
                        t.assert(
                            error.as_ref().unwrap() == "I panicked",
                            "Error message does not match",
                        );
                        future::ok(Status::stop(state))
                    }
                    SystemMsg::Start => future::ok(Status::done(state)),
                    _ => t.panic("Unexpected message received!"),
                }
            } else {
                t.panic("Unexpected message received!")
            }
        })
        .unwrap();

    system.monitor(&monitor, &aid);
    // Any non-system message makes the monitored actor panic.
    aid.send_new(()).unwrap();
    await_received(&monitor, 2, 1000).unwrap();

    system.trigger_and_await_shutdown(Duration::from_millis(1000));
    tracker.collect();
}
1251
/// Actor names are unique per system while the actor lives and become available
/// again once the actor has been stopped.
#[test]
fn test_named_actor_restrictions() {
    init_test_log();
    let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));

    let first_a = system.spawn().name("A").with((), simple_handler).unwrap();
    await_received(&first_a, 1, 1000).unwrap();

    let first_b = system.spawn().name("B").with((), simple_handler).unwrap();
    await_received(&first_b, 1, 1000).unwrap();

    // A second "A" is rejected ...
    let duplicate = system.spawn().name("A").with((), simple_handler);
    assert_eq!(
        Err(SystemError::NameAlreadyUsed("A".to_string())),
        duplicate
    );

    // ... and the original "A" is untouched by the failed attempt.
    let found_a = system.find_aid_by_name("A").unwrap();
    assert_eq!(true, system.is_actor_alive(&first_a));
    assert!(Aid::ptr_eq(&first_a, &found_a));

    // Stopping "B" frees both its name and its uuid.
    system.stop_actor(&first_b);
    assert_eq!(None, system.find_aid_by_name("B"));
    assert_eq!(None, system.find_aid_by_uuid(&first_b.uuid()));

    // The freed name can be taken by a new actor.
    let second_b = system.spawn().name("B").with((), simple_handler).unwrap();
    await_received(&second_b, 1, 1000).unwrap();
    let found_b = system.find_aid_by_name("B").unwrap();
    assert!(Aid::ptr_eq(&second_b, &found_b));

    system.trigger_and_await_shutdown(None);
}
1290
/// An actor on one system can exchange a request and a reply with an actor on a
/// connected system, using an `Aid` that went through serialization.
#[test]
fn test_remote_actors() {
    /// Request sent across systems; names the actor that wants the reply.
    #[derive(Serialize, Deserialize, Debug)]
    struct Request {
        reply_to: Aid,
    }

    /// Empty reply to a `Request`.
    #[derive(Serialize, Deserialize, Debug)]
    struct Reply {}

    init_test_log();
    let tracker = AssertCollect::new();
    let responder_tracker = tracker.clone();
    let (system1, system2) = start_and_connect_two_systems();

    // Bind system1 to the test thread before the `Aid` is serialized below.
    system1.init_current();

    // Responder on system 1: answers one request, then shuts its system down.
    let responder = system1
        .spawn()
        .with((), move |_: (), context: Context, message: Message| {
            let t = responder_tracker.clone();
            async move {
                if let Some(msg) = message.content_as::<Request>() {
                    msg.reply_to.send_new(Reply {}).unwrap();
                    context.system.trigger_shutdown();
                    Ok(Status::stop(()))
                } else if message.content_as::<SystemMsg>().is_some() {
                    Ok(Status::done(()))
                } else {
                    t.panic("Unexpected message received!")
                }
            }
        })
        .unwrap();
    await_received(&responder, 1, 1000).unwrap();

    let requester_tracker = tracker.clone();
    let serialized = bincode::serialize(&responder).unwrap();

    // Requester on system 2: sends the request when started and shuts its
    // system down once the reply has arrived.
    system2
        .spawn()
        .with((), move |_: (), context: Context, message: Message| {
            if message.content_as::<Reply>().is_some() {
                debug!("Received reply, shutting down");
                context.system.trigger_shutdown();
                future::ok(Status::stop(()))
            } else if let Some(msg) = message.content_as::<SystemMsg>() {
                match &*msg {
                    SystemMsg::Start => {
                        debug!("Starting request actor");
                        let target_aid: Aid = bincode::deserialize(&serialized).unwrap();
                        let request = Request {
                            reply_to: context.aid.clone(),
                        };
                        target_aid.send_new(request).unwrap();
                        future::ok(Status::done(()))
                    }
                    _ => future::ok(Status::done(())),
                }
            } else {
                requester_tracker.panic("Unexpected message received!")
            }
        })
        .unwrap();

    await_two_system_shutdown(system1, system2);
    tracker.collect();
}
1360
/// An actor can locate a named actor on a connected system by asking the remote
/// system actors, and can then message the actor it found.
#[test]
fn test_system_actor_find_by_name() {
    init_test_log();
    let tracker = AssertCollect::new();
    let finder_tracker = tracker.clone();
    let (system1, system2) = start_and_connect_two_systems();

    // Target on system 1: shuts its system down when it receives a `bool`.
    let target_actor = system1
        .spawn()
        .name("A")
        .with((), |_: (), context: Context, message: Message| async move {
            if message.content_as::<bool>().is_some() {
                context.system.trigger_shutdown();
                Ok(Status::stop(()))
            } else {
                Ok(Status::done(()))
            }
        })
        .unwrap();
    await_received(&target_actor, 1, 1000).unwrap();

    // Finder on system 2: asks for "A" when started and messages whatever is found.
    system2
        .spawn()
        .with((), move |_: (), context: Context, message: Message| {
            // Fresh clones per message, because the async block takes ownership.
            let aid1 = target_actor.clone();
            let t = finder_tracker.clone();
            async move {
                if let Some(msg) = message.content_as::<SystemActorMessage>() {
                    match &*msg {
                        SystemActorMessage::FindByNameResult { aid: found, .. } => {
                            debug!("FindByNameResult received");
                            match found {
                                Some(target) => {
                                    t.assert(
                                        target.uuid() == aid1.uuid(),
                                        "Target is not expected Actor",
                                    );
                                    target.send_new(true).unwrap();
                                    context.system.trigger_shutdown();
                                    Ok(Status::done(()))
                                }
                                None => t.panic("Didn't find AID."),
                            }
                        }
                        _ => t.panic("Unexpected message received!"),
                    }
                } else if let Some(msg) = message.content_as::<SystemMsg>() {
                    debug!("Actor started, attempting to send FindByName request");
                    if let SystemMsg::Start = &*msg {
                        let request = SystemActorMessage::FindByName {
                            reply_to: context.aid.clone(),
                            name: "A".to_string(),
                        };
                        context.system.send_to_system_actors(Message::new(request));
                        Ok(Status::done(()))
                    } else {
                        t.panic("Unexpected message received!")
                    }
                } else {
                    t.panic("Unexpected message received!")
                }
            }
        })
        .unwrap();

    await_two_system_shutdown(system1, system2);
    tracker.collect();
}
1433
/// Messages sent through an actor pool reach every member of the pool.
#[test]
#[cfg(feature = "actor-pool")]
fn test_spawn_pool() {
    let tracker = AssertCollect::new();
    let config = ActorSystemConfig::default().thread_pool_size(2);
    let system = ActorSystem::create(config);

    /// Accepts every message and does nothing with it.
    async fn handler(_: (), _: Context, _: Message) -> ActorResult<()> {
        Ok(Status::done(()))
    }

    let mut aid_pool: RandomAidPool = system
        .spawn_pool(3)
        .name("handler")
        .channel_size(100)
        .with((), handler)
        .unwrap();

    // 101 messages spread over 3 actors.
    for _ in 0..=100 {
        aid_pool.send_new(0).unwrap();
    }

    sleep(10);

    let members: Vec<Aid> = aid_pool.into();

    // Every member must have received more than one message.
    for member in members {
        assert!(member.received().unwrap() > 1);
    }

    system.trigger_and_await_shutdown(None);
    tracker.collect();
}
1472}