1pub(crate) mod logger;
2pub(crate) mod timer;
3
4pub use self::timer::{BasicTimer, Timer};
6
7use std::{
8 fmt,
9 ops::Deref,
10 sync::{Arc, Mutex},
11 time::{Duration, Instant},
12};
13
14use chrono::prelude::*;
15use config::Config;
16use futures::{
17 channel::oneshot,
18 executor::{ThreadPool, ThreadPoolBuilder},
19 future::RemoteHandle,
20 task::{SpawnError, SpawnExt},
21 Future,
22};
23use rand;
24use uuid::Uuid;
25
26use crate::actor_ref::{
27 ActorRef, ActorRefFactory, ActorReference, BasicActorRef, Sender, Tell, TmpActorRefFactory,
28};
29use crate::{
30 actor::{props::ActorFactory, *},
31 kernel::provider::{create_root, Provider},
32 load_config,
33 system::logger::*,
34 system::timer::*,
35 validate::{validate_name, InvalidPath},
36 AnyMessage, Message,
37};
38use slog::{debug, Logger};
39
/// Messages of the system-level protocol.
///
/// These are delivered to an actor's `sys_recv` handler rather than to `recv`
/// (see `WhenTerminatedActor` in this module for an example).
#[derive(Clone, Debug)]
pub enum SystemMsg {
    /// NOTE(review): presumably asks a newly created actor to initialize itself —
    /// confirm against the kernel code that sends it.
    ActorInit,
    /// A control command; see [`SystemCmd`].
    Command(SystemCmd),
    /// A lifecycle notification; see [`SystemEvent`].
    Event(SystemEvent),
    /// NOTE(review): presumably reports that the referenced actor has failed —
    /// confirm against the supervision code.
    Failed(BasicActorRef),
}
47
// SAFETY(review): no justification is recorded for this impl. `SystemMsg` carries
// `BasicActorRef` values (directly in `Failed` and inside the event payloads), so
// this is only sound if that type may be moved across threads — TODO confirm, and
// remove the impl if the compiler can already derive `Send`.
unsafe impl Send for SystemMsg {}
49
/// Control commands carried by [`SystemMsg::Command`].
#[derive(Clone, Debug)]
pub enum SystemCmd {
    /// Asks the receiving actor to stop; this is what `ActorSystem::stop` sends.
    Stop,
    /// NOTE(review): presumably asks the receiving actor to restart — confirm
    /// against the kernel code that handles it.
    Restart,
}
55
56impl Into<SystemMsg> for SystemCmd {
57 fn into(self) -> SystemMsg {
58 SystemMsg::Command(self)
59 }
60}
61
/// Actor lifecycle notifications, published through the `sys_events` channel
/// (see `ActorSystem::publish_event`).
#[derive(Clone, Debug)]
pub enum SystemEvent {
    /// Payload of an actor-created notification.
    ActorCreated(ActorCreated),

    /// Payload of an actor-restarted notification.
    ActorRestarted(ActorRestarted),

    /// Payload of an actor-terminated notification.
    ActorTerminated(ActorTerminated),
}
73
74impl Into<SystemMsg> for SystemEvent {
75 fn into(self) -> SystemMsg {
76 SystemMsg::Event(self)
77 }
78}
79
/// Payload of [`SystemEvent::ActorCreated`].
#[derive(Clone, Debug)]
pub struct ActorCreated {
    /// Reference carried by the event (presumably the actor that was created).
    pub actor: BasicActorRef,
}
84
/// Payload of [`SystemEvent::ActorRestarted`].
#[derive(Clone, Debug)]
pub struct ActorRestarted {
    /// Reference carried by the event (presumably the actor that was restarted).
    pub actor: BasicActorRef,
}
89
/// Payload of [`SystemEvent::ActorTerminated`].
#[derive(Clone, Debug)]
pub struct ActorTerminated {
    /// The terminated actor; `WhenTerminatedActor` compares it with the user root
    /// to detect system shutdown.
    pub actor: BasicActorRef,
}
94
95impl Into<SystemEvent> for ActorCreated {
96 fn into(self) -> SystemEvent {
97 SystemEvent::ActorCreated(self)
98 }
99}
100
101impl Into<SystemEvent> for ActorRestarted {
102 fn into(self) -> SystemEvent {
103 SystemEvent::ActorRestarted(self)
104 }
105}
106
107impl Into<SystemEvent> for ActorTerminated {
108 fn into(self) -> SystemEvent {
109 SystemEvent::ActorTerminated(self)
110 }
111}
112
113impl Into<SystemMsg> for ActorCreated {
114 fn into(self) -> SystemMsg {
115 SystemMsg::Event(SystemEvent::ActorCreated(self))
116 }
117}
118
119impl Into<SystemMsg> for ActorRestarted {
120 fn into(self) -> SystemMsg {
121 SystemMsg::Event(SystemEvent::ActorRestarted(self))
122 }
123}
124
125impl Into<SystemMsg> for ActorTerminated {
126 fn into(self) -> SystemMsg {
127 SystemMsg::Event(SystemEvent::ActorTerminated(self))
128 }
129}
130
/// Payload-free tags whose names mirror the variants of [`SystemEvent`].
///
/// NOTE(review): no use of this type is visible in this module — confirm where
/// (or whether) it is still needed.
#[derive(Clone, Debug)]
pub enum SystemEventType {
    /// Tag corresponding to `SystemEvent::ActorTerminated`.
    ActorTerminated,
    /// Tag corresponding to `SystemEvent::ActorRestarted`.
    ActorRestarted,
    /// Tag corresponding to `SystemEvent::ActorCreated`.
    ActorCreated,
}
137
/// Errors that can occur while creating an actor system.
pub enum SystemError {
    /// A system actor could not be created; holds the name of that actor.
    ModuleFailed(String),
    /// The requested system name failed `validate_name`; holds the rejected name.
    InvalidName(String),
}
142
143impl fmt::Display for SystemError {
144 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
145 match *self {
146 Self::ModuleFailed(ref m) => f.write_str(&format!(
147 "Failed to create actor system. Cause: Sub module failed to start ({})",
148 m
149 )),
150 Self::InvalidName(ref name) => f.write_str(&format!(
151 "Failed to create actor system. Cause: Invalid actor system name ({})",
152 name
153 )),
154 }
155 }
156}
157
158impl fmt::Debug for SystemError {
159 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160 f.write_str(self.to_string().as_str())
161 }
162}
163
/// Data fixed at system creation; `ActorSystem` holds it behind an `Arc`, so
/// clones of the system share one instance.
pub struct ProtoSystem {
    /// Random (v4) UUID generated when the system is created.
    id: Uuid,
    /// Name the system was created with.
    name: String,
    /// Host name; `ActorSystem::create` always sets this to `"localhost"`.
    pub host: Arc<String>,
    /// Copy of the configuration the system was created with.
    config: Config,
    /// Settings extracted from `config` at creation time.
    pub(crate) sys_settings: SystemSettings,
    /// UTC timestamp taken while the system was being created.
    started_at: DateTime<Utc>,
}
175
/// Builder for [`ActorSystem`]; every field is optional and `create` fills in a
/// default for any that is left unset.
#[derive(Default)]
pub struct SystemBuilder {
    /// System name; defaults to `"riker"`.
    name: Option<String>,
    /// Configuration; defaults to the result of `load_config`.
    cfg: Option<Config>,
    /// Logger; defaults to `default_log` applied to the configuration.
    log: Option<Logger>,
    /// Executor; defaults to `default_exec` applied to the configuration.
    exec: Option<ThreadPool>,
}
183
184impl SystemBuilder {
185 #[must_use]
186 pub fn new() -> Self {
187 Self::default()
188 }
189
190 pub fn create(self) -> Result<ActorSystem, SystemError> {
191 let name = self.name.unwrap_or_else(|| "riker".into());
192 let cfg = self.cfg.unwrap_or_else(load_config);
193 let exec = self.exec.unwrap_or_else(|| default_exec(&cfg));
194 let log = self.log.unwrap_or_else(|| default_log(&cfg));
195
196 ActorSystem::create(&name, exec, log, &cfg)
197 }
198
199 #[must_use]
200 pub fn name(self, name: &str) -> Self {
201 Self {
202 name: Some(name.to_string()),
203 ..self
204 }
205 }
206
207 #[must_use]
208 pub fn cfg(self, cfg: Config) -> Self {
209 Self {
210 cfg: Some(cfg),
211 ..self
212 }
213 }
214
215 #[must_use]
216 pub fn exec(self, exec: ThreadPool) -> Self {
217 Self {
218 exec: Some(exec),
219 ..self
220 }
221 }
222
223 #[must_use]
224 pub fn log(self, log: Logger) -> Self {
225 Self {
226 log: Some(log),
227 ..self
228 }
229 }
230}
231
/// Handle to a running actor system.
///
/// The handle is `Clone`; the creation-time data in `proto` is behind an `Arc`
/// and therefore shared between clones.
#[derive(Clone)]
pub struct ActorSystem {
    /// Creation-time data (id, name, host, config, settings, start time).
    proto: Arc<ProtoSystem>,
    /// Root/user/system/temp guardian references; `None` only while `create` is
    /// still running, before `create_root` has returned.
    sys_actors: Option<SysActors>,
    /// Logger handed out by `log()`.
    log: Logger,
    /// Value of the `debug` configuration flag read at creation.
    debug: bool,
    /// Executor used by `Run::run` to spawn futures.
    pub exec: ThreadPool,
    /// Handle used to send scheduling jobs to the timer.
    pub timer: TimerRef,
    /// System channels; `None` only while `create` is still running.
    pub sys_channels: Option<SysChannels>,
    /// Provider through which all actors are created.
    pub(crate) provider: Provider,
}
250
251impl fmt::Display for ActorSystem {
252 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
253 f.write_str(self.get_tree().as_str())
254 }
255}
256
257impl ActorSystem {
258 pub fn new() -> Result<Self, SystemError> {
262 let cfg = load_config();
263 let exec = default_exec(&cfg);
264 let log = default_log(&cfg);
265
266 Self::create("riker", exec, log, &cfg)
267 }
268
269 pub fn with_name(name: &str) -> Result<Self, SystemError> {
273 let cfg = load_config();
274 let exec = default_exec(&cfg);
275 let log = default_log(&cfg);
276
277 Self::create(name, exec, log, &cfg)
278 }
279
280 pub fn with_config(name: &str, cfg: &Config) -> Result<Self, SystemError> {
282 let exec = default_exec(cfg);
283 let log = default_log(cfg);
284
285 Self::create(name, exec, log, cfg)
286 }
287
288 fn create(
289 name: &str,
290 exec: ThreadPool,
291 log: Logger,
292 cfg: &Config,
293 ) -> Result<Self, SystemError> {
294 validate_name(name).map_err(|_| SystemError::InvalidName(name.into()))?;
295 let debug = cfg.get_bool("debug").unwrap();
297
298 if debug {
300 debug!(log, "Starting actor system: System[{}]", name);
301 }
302
303 let prov = Provider::new(log.clone());
304 let timer = BasicTimer::start(cfg);
305
306 let proto = ProtoSystem {
308 id: Uuid::new_v4(),
309 name: name.to_string(),
310 host: Arc::new("localhost".to_string()),
311 config: cfg.clone(),
312 sys_settings: SystemSettings::from(cfg),
313 started_at: Utc::now(),
314 };
315
316 let mut sys = Self {
318 proto: Arc::new(proto),
319 debug,
320 exec,
321 log,
322 timer,
324 sys_channels: None,
325 sys_actors: None,
326 provider: prov.clone(),
327 };
328
329 let sys_actors = create_root(&sys);
331 sys.sys_actors = Some(sys_actors);
332
333 sys.sys_channels = Some(sys_channels(&prov, &sys)?);
335
336 let props = DeadLetterLogger::props(sys.dead_letters(), sys.log());
338 let _dl_logger = sys_actor_of_props(&prov, &sys, props, "dl_logger")?;
339
340 sys.complete_start();
341
342 debug!(sys.log, "Actor system [{}] [{}] started", sys.id(), name);
343
344 Ok(sys)
345 }
346
347 fn complete_start(&self) {
348 self.sys_actors.as_ref().unwrap().user.sys_init(self);
349 }
350
351 fn start_date(&self) -> &DateTime<Utc> {
353 &self.proto.started_at
354 }
355
356 #[allow(clippy::cast_sign_loss)]
358 pub fn uptime(&self) -> u64 {
359 let now = Utc::now();
360 now.time()
361 .signed_duration_since(self.start_date().time())
362 .num_seconds() as u64
363 }
364
365 pub fn host(&self) -> Arc<String> {
371 self.proto.host.clone()
372 }
373
374 pub fn id(&self) -> Uuid {
376 self.proto.id
377 }
378
379 pub fn name(&self) -> String {
381 self.proto.name.clone()
382 }
383
384 pub fn print_tree(&self) {
385 println!("{}", self.get_tree());
386 }
387
388 #[allow(clippy::items_after_statements)]
389 pub fn get_tree(&self) -> String {
390 let mut tree_str: String = String::new();
391 let root = self.sys_actors.as_ref().unwrap().root.clone();
392
393 fn get_node(
394 mut tree_str: &mut String,
395 sys: &ActorSystem,
396 node: &BasicActorRef,
397 indent: &str,
398 ) -> String {
399 if node.is_root() {
400 tree_str.push_str(&format!("{}\n", sys.name()));
401
402 for actor in node.children() {
403 get_node(&mut tree_str, sys, &actor, "");
404 }
405 } else {
406 tree_str.push_str(&format!("{}\u{2514}\u{2500} {}\n", indent, node.name()));
407
408 for actor in node.children() {
409 get_node(tree_str, sys, &actor, &(indent.to_string() + " "));
410 }
411 }
412 (*tree_str).to_string()
413 }
414
415 get_node(&mut tree_str, self, &root, "")
416 }
417
418 pub fn user_root(&self) -> &BasicActorRef {
425 &self.sys_actors.as_ref().unwrap().user
426 }
427
428 pub fn sys_root(&self) -> &BasicActorRef {
430 &self.sys_actors.as_ref().unwrap().sysm
431 }
432
433 pub fn temp_root(&self) -> &BasicActorRef {
435 &self.sys_actors.as_ref().unwrap().temp
436 }
437
438 pub fn sys_events(&self) -> &ActorRef<ChannelMsg<SystemEvent>> {
440 &self.sys_channels.as_ref().unwrap().sys_events
441 }
442
443 pub fn dead_letters(&self) -> &ActorRef<DLChannelMsg> {
445 &self.sys_channels.as_ref().unwrap().dead_letters
446 }
447
448 pub fn publish_event(&self, evt: SystemEvent) {
449 let topic = Topic::from(&evt);
450 self.sys_events().tell(Publish { topic, msg: evt }, None);
451 }
452
453 pub fn config(&self) -> Config {
455 self.proto.config.clone()
456 }
457
458 pub(crate) fn sys_settings(&self) -> &SystemSettings {
459 &self.proto.sys_settings
460 }
461
462 pub fn sys_actor_of_props<A>(
464 &self,
465 props: BoxActorProd<A>,
466 name: &str,
467 ) -> Result<ActorRef<A::Msg>, CreateError>
468 where
469 A: Actor,
470 {
471 self.provider
472 .create_actor(props, name, self.sys_root(), self)
473 }
474
475 pub fn sys_actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
476 where
477 A: ActorFactory,
478 {
479 self.provider
480 .create_actor(Props::new_no_args(A::create), name, self.sys_root(), self)
481 }
482
483 pub fn sys_actor_of_args<A, Args>(
484 &self,
485 name: &str,
486 args: Args,
487 ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
488 where
489 Args: ActorArgs,
490 A: ActorFactoryArgs<Args>,
491 {
492 self.provider.create_actor(
493 Props::new_args(A::create_args, args),
494 name,
495 self.sys_root(),
496 self,
497 )
498 }
499
500 #[inline]
501 pub fn log(&self) -> Logger {
502 self.log.clone()
503 }
504
505 pub fn when_terminated(&self) -> Terminated {
511 let (tx, rx) = oneshot::channel::<()>();
512 let tx = Arc::new(Mutex::new(Some(tx)));
513
514 self.tmp_actor_of_args::<WhenTerminatedActor, _>(tx)
515 .unwrap();
516
517 rx
518 }
519
520 pub fn shutdown(&self) -> Shutdown {
528 let receiver = self.when_terminated();
529
530 self.stop(self.user_root());
541
542 receiver
543 }
544}
545
// SAFETY(review): no justification is recorded for these impls. They assert
// that every field of `ActorSystem` (logger, executor, timer handle, provider,
// actor references) may be moved to and shared between threads — TODO confirm
// for each field type, and remove the impls if the compiler can derive them.
unsafe impl Send for ActorSystem {}
unsafe impl Sync for ActorSystem {}
548
549impl ActorRefFactory for ActorSystem {
550 fn actor_of_props<A>(
551 &self,
552 props: BoxActorProd<A>,
553 name: &str,
554 ) -> Result<ActorRef<A::Msg>, CreateError>
555 where
556 A: Actor,
557 {
558 self.provider
559 .create_actor(props, name, self.user_root(), self)
560 }
561
562 fn actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
563 where
564 A: ActorFactory,
565 {
566 self.provider
567 .create_actor(Props::new_no_args(A::create), name, self.user_root(), self)
568 }
569
570 fn actor_of_args<A, Args>(
571 &self,
572 name: &str,
573 args: Args,
574 ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
575 where
576 Args: ActorArgs,
577 A: ActorFactoryArgs<Args>,
578 {
579 self.provider.create_actor(
580 Props::new_args(A::create_args, args),
581 name,
582 self.user_root(),
583 self,
584 )
585 }
586
587 fn stop(&self, actor: impl ActorReference) {
588 actor.sys_tell(SystemCmd::Stop.into());
589 }
590}
591
592impl TmpActorRefFactory for ActorSystem {
593 fn tmp_actor_of_props<A>(&self, props: BoxActorProd<A>) -> Result<ActorRef<A::Msg>, CreateError>
594 where
595 A: Actor,
596 {
597 let name = format!("{}", rand::random::<u64>());
598 self.provider
599 .create_actor(props, &name, self.temp_root(), self)
600 }
601
602 fn tmp_actor_of<A>(&self) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
603 where
604 A: ActorFactory,
605 {
606 let name = format!("{}", rand::random::<u64>());
607 self.provider
608 .create_actor(Props::new_no_args(A::create), &name, self.temp_root(), self)
609 }
610
611 fn tmp_actor_of_args<A, Args>(
612 &self,
613 args: Args,
614 ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
615 where
616 Args: ActorArgs,
617 A: ActorFactoryArgs<Args>,
618 {
619 let name = format!("{}", rand::random::<u64>());
620 self.provider.create_actor(
621 Props::new_args(A::create_args, args),
622 &name,
623 self.temp_root(),
624 self,
625 )
626 }
627}
628
629impl ActorSelectionFactory for ActorSystem {
630 fn select(&self, path: &str) -> Result<ActorSelection, InvalidPath> {
631 let anchor = self.user_root();
632 let (anchor, path_str) = if path.starts_with('/') {
633 let anchor_path = format!("{}/", anchor.path().deref().clone());
634 let path = path.to_string().replace(&anchor_path, "");
635
636 (anchor, path)
637 } else {
638 (anchor, path.to_string())
639 };
640
641 ActorSelection::new(
642 anchor.clone(),
643 path_str,
645 )
646 }
647}
648
/// Ability to run a future to completion on an executor.
pub trait Run {
    /// Spawns `future` and returns a handle that yields its output.
    ///
    /// # Errors
    ///
    /// Returns a `SpawnError` if the future cannot be spawned.
    fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
    where
        Fut: Future + Send + 'static,
        <Fut as Future>::Output: Send;
}
657
658impl Run for ActorSystem {
659 fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
660 where
661 Fut: Future + Send + 'static,
662 <Fut as Future>::Output: Send,
663 {
664 let mut exec = self.exec.clone();
665 exec.spawn_with_handle(future)
666 }
667}
668
669impl fmt::Debug for ActorSystem {
670 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
671 write!(
672 f,
673 "ActorSystem[Name: {}, Start Time: {}, Uptime: {} seconds]",
674 self.name(),
675 self.start_date(),
676 self.uptime()
677 )
678 }
679}
680
681impl Timer for ActorSystem {
682 fn schedule<T, M>(
683 &self,
684 initial_delay: Duration,
685 interval: Duration,
686 receiver: ActorRef<M>,
687 sender: Sender,
688 msg: T,
689 ) -> Uuid
690 where
691 T: Message + Into<M>,
692 M: Message,
693 {
694 let id = Uuid::new_v4();
695 let msg: M = msg.into();
696
697 let job = RepeatJob {
698 id,
699 send_at: Instant::now() + initial_delay,
700 interval,
701 receiver: receiver.into(),
702 sender,
703 msg: AnyMessage::new(msg, false),
704 };
705
706 let _ = self.timer.send(Job::Repeat(job));
707 id
708 }
709
710 fn schedule_once<T, M>(
711 &self,
712 delay: Duration,
713 receiver: ActorRef<M>,
714 sender: Sender,
715 msg: T,
716 ) -> Uuid
717 where
718 T: Message + Into<M>,
719 M: Message,
720 {
721 let id = Uuid::new_v4();
722 let msg: M = msg.into();
723
724 let job = OnceJob {
725 id,
726 send_at: Instant::now() + delay,
727 receiver: receiver.into(),
728 sender,
729 msg: AnyMessage::new(msg, true),
730 };
731
732 let _ = self.timer.send(Job::Once(job));
733 id
734 }
735
736 fn schedule_at_time<T, M>(
737 &self,
738 time: DateTime<Utc>,
739 receiver: ActorRef<M>,
740 sender: Sender,
741 msg: T,
742 ) -> Uuid
743 where
744 T: Message + Into<M>,
745 M: Message,
746 {
747 let delay = std::cmp::max(time.timestamp() - Utc::now().timestamp(), 0 as i64);
748 #[allow(clippy::cast_sign_loss)]
749 let delay = Duration::from_secs(delay as u64);
750
751 let id = Uuid::new_v4();
752 let msg: M = msg.into();
753
754 let job = OnceJob {
755 id,
756 send_at: Instant::now() + delay,
757 receiver: receiver.into(),
758 sender,
759 msg: AnyMessage::new(msg, true),
760 };
761
762 let _ = self.timer.send(Job::Once(job));
763 id
764 }
765
766 fn cancel_schedule(&self, id: Uuid) {
767 let _ = self.timer.send(Job::Cancel(id));
768 }
769}
770
771fn sys_actor_of_props<A>(
774 prov: &Provider,
775 sys: &ActorSystem,
776 props: BoxActorProd<A>,
777 name: &str,
778) -> Result<ActorRef<A::Msg>, SystemError>
779where
780 A: Actor,
781{
782 prov.create_actor(props, name, sys.sys_root(), sys)
783 .map_err(|_| SystemError::ModuleFailed(name.into()))
784}
785
786fn sys_actor_of<A>(
787 prov: &Provider,
788 sys: &ActorSystem,
789 name: &str,
790) -> Result<ActorRef<<A as Actor>::Msg>, SystemError>
791where
792 A: ActorFactory,
793{
794 prov.create_actor(Props::new_no_args(A::create), name, sys.sys_root(), sys)
795 .map_err(|_| SystemError::ModuleFailed(name.into()))
796}
797
798fn sys_channels(prov: &Provider, sys: &ActorSystem) -> Result<SysChannels, SystemError> {
818 let sys_events = sys_actor_of::<EventsChannel>(prov, sys, "sys_events")?;
819 let dead_letters = sys_actor_of::<Channel<DeadLetter>>(prov, sys, "dead_letters")?;
820
821 Ok(SysChannels {
828 sys_events,
829 dead_letters,
830 })
831}
832
/// Settings read from the configuration when the system is created.
pub struct SystemSettings {
    /// Value of the `mailbox.msg_process_limit` configuration key.
    /// NOTE(review): presumably caps how many messages a mailbox processes per
    /// run — confirm against the mailbox code.
    pub msg_process_limit: u32,
}
836
837impl<'a> From<&'a Config> for SystemSettings {
838 fn from(config: &Config) -> Self {
839 Self {
840 msg_process_limit: config.get::<u32>("mailbox.msg_process_limit").unwrap(),
841 }
842 }
843}
844
/// Executor settings read from the configuration.
struct ThreadPoolConfig {
    /// Value of the `dispatcher.pool_size` configuration key; passed to the
    /// thread-pool builder as its pool size.
    pool_size: usize,
}
848
849impl<'a> From<&'a Config> for ThreadPoolConfig {
850 fn from(config: &Config) -> Self {
851 Self {
852 pool_size: config.get::<usize>("dispatcher.pool_size").unwrap(),
853 }
854 }
855}
856
857fn default_exec(cfg: &Config) -> ThreadPool {
858 let exec_cfg = ThreadPoolConfig::from(cfg);
859 ThreadPoolBuilder::new()
860 .pool_size(exec_cfg.pool_size)
861 .name_prefix("pool-thread-#")
862 .create()
863 .unwrap()
864}
865
/// References to the guardian actors produced by `create_root`.
#[derive(Clone)]
pub struct SysActors {
    /// Root of the actor hierarchy; `get_tree` starts from it.
    pub root: BasicActorRef,
    /// User guardian; parent of actors created through `ActorRefFactory`.
    pub user: BasicActorRef,
    /// System guardian; parent of system actors.
    pub sysm: BasicActorRef,
    /// Temp guardian; parent of actors created through `TmpActorRefFactory`.
    pub temp: BasicActorRef,
}
873
/// References to the channel actors created at system start-up.
#[derive(Clone)]
pub struct SysChannels {
    /// Channel on which `SystemEvent`s are published.
    pub sys_events: ActorRef<ChannelMsg<SystemEvent>>,
    /// Dead-letters channel; the dead-letter logger is created with a reference to it.
    pub dead_letters: ActorRef<DLChannelMsg>,
}
879
/// Receiver returned by `ActorSystem::shutdown`; resolves once the user root
/// has terminated.
pub type Shutdown = oneshot::Receiver<()>;
/// Receiver returned by `ActorSystem::when_terminated`; resolves once the user
/// root has terminated.
pub type Terminated = oneshot::Receiver<()>;
882
/// Temporary actor that subscribes to actor-terminated events and completes a
/// oneshot channel when the user root terminates.
#[derive(Clone)]
struct WhenTerminatedActor {
    /// Sender half of the completion channel; the `Option` lets it be taken out
    /// and used exactly once.
    tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}
887
888impl ActorFactoryArgs<Arc<Mutex<Option<oneshot::Sender<()>>>>> for WhenTerminatedActor {
889 fn create_args(tx: Arc<Mutex<Option<oneshot::Sender<()>>>>) -> Self {
890 Self::new(tx)
891 }
892}
893
894impl WhenTerminatedActor {
895 fn new(tx: Arc<Mutex<Option<oneshot::Sender<()>>>>) -> Self {
896 Self { tx }
897 }
898}
899
900impl Actor for WhenTerminatedActor {
901 type Msg = SystemEvent;
902
903 fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
904 let sub = Subscribe {
905 topic: SysTopic::ActorTerminated.into(),
906 actor: Box::new(ctx.myself.clone()),
907 };
908 ctx.system.sys_events().tell(sub, None);
909 }
910
911 fn recv(&mut self, _: &Context<Self::Msg>, _: Self::Msg, _: Option<BasicActorRef>) {}
912
913 fn sys_recv(
914 &mut self,
915 ctx: &Context<Self::Msg>,
916 msg: SystemMsg,
917 sender: Option<BasicActorRef>,
918 ) {
919 if let SystemMsg::Event(evt) = msg {
920 if let SystemEvent::ActorTerminated(terminated) = evt {
921 self.receive(ctx, terminated, sender);
922 }
923 }
924 }
925}
926
927impl Receive<ActorTerminated> for WhenTerminatedActor {
928 type Msg = SystemEvent;
929
930 fn receive(
932 &mut self,
933 ctx: &Context<Self::Msg>,
934 msg: ActorTerminated,
935 _sender: Option<BasicActorRef>,
936 ) {
937 if &msg.actor == ctx.system.user_root() {
938 if let Ok(ref mut tx) = self.tx.lock() {
939 if let Some(tx) = tx.take() {
940 tx.send(()).unwrap();
941 }
942 }
943 }
944 }
945}