1pub(crate) mod logger;
2pub(crate) mod timer;
3
4use std::{error::Error, fmt};
5
6use crate::actor::BasicActorRef;
7
8pub use self::timer::{BasicTimer, ScheduleId, Timer};
10
11#[derive(Clone, Debug)]
12pub enum SystemMsg {
13 ActorInit,
14 Command(SystemCmd),
15 Event(SystemEvent),
16 Failed(BasicActorRef),
17}
18
19unsafe impl Send for SystemMsg {}
20
21#[derive(Clone, Debug)]
22pub enum SystemCmd {
23 Stop,
24 Restart,
25}
26
27impl Into<SystemMsg> for SystemCmd {
28 fn into(self) -> SystemMsg {
29 SystemMsg::Command(self)
30 }
31}
32
33#[derive(Clone, Debug)]
34pub enum SystemEvent {
35 ActorCreated(ActorCreated),
37
38 ActorRestarted(ActorRestarted),
40
41 ActorTerminated(ActorTerminated),
43}
44
45impl Into<SystemMsg> for SystemEvent {
46 fn into(self) -> SystemMsg {
47 SystemMsg::Event(self)
48 }
49}
50
51#[derive(Clone, Debug)]
52pub struct ActorCreated {
53 pub actor: BasicActorRef,
54}
55
56#[derive(Clone, Debug)]
57pub struct ActorRestarted {
58 pub actor: BasicActorRef,
59}
60
61#[derive(Clone, Debug)]
62pub struct ActorTerminated {
63 pub actor: BasicActorRef,
64}
65
66impl Into<SystemEvent> for ActorCreated {
67 fn into(self) -> SystemEvent {
68 SystemEvent::ActorCreated(self)
69 }
70}
71
72impl Into<SystemEvent> for ActorRestarted {
73 fn into(self) -> SystemEvent {
74 SystemEvent::ActorRestarted(self)
75 }
76}
77
78impl Into<SystemEvent> for ActorTerminated {
79 fn into(self) -> SystemEvent {
80 SystemEvent::ActorTerminated(self)
81 }
82}
83
84impl Into<SystemMsg> for ActorCreated {
85 fn into(self) -> SystemMsg {
86 SystemMsg::Event(SystemEvent::ActorCreated(self))
87 }
88}
89
90impl Into<SystemMsg> for ActorRestarted {
91 fn into(self) -> SystemMsg {
92 SystemMsg::Event(SystemEvent::ActorRestarted(self))
93 }
94}
95
96impl Into<SystemMsg> for ActorTerminated {
97 fn into(self) -> SystemMsg {
98 SystemMsg::Event(SystemEvent::ActorTerminated(self))
99 }
100}
101
102#[derive(Clone, Debug)]
103pub enum SystemEventType {
104 ActorTerminated,
105 ActorRestarted,
106 ActorCreated,
107}
108
109pub enum SystemError {
110 ModuleFailed(String),
111 InvalidName(String),
112}
113
114impl Error for SystemError {
115 fn description(&self) -> &str {
116 match *self {
117 SystemError::ModuleFailed(_) => {
118 "Failed to create actor system. Cause: Sub module failed to start"
119 }
120 SystemError::InvalidName(_) => {
121 "Failed to create actor system. Cause: Invalid actor system name"
122 }
123 }
124 }
125}
126
127impl fmt::Display for SystemError {
128 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
129 match *self {
130 SystemError::ModuleFailed(ref m) => {
131 f.write_str(&format!("{} ({})", self.to_string(), m))
132 }
133 SystemError::InvalidName(ref name) => {
134 f.write_str(&format!("{} ({})", self.to_string(), name))
135 }
136 }
137 }
138}
139
140impl fmt::Debug for SystemError {
141 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142 f.write_str(&self.to_string())
143 }
144}
145use std::{
146 ops::Deref,
147 sync::{Arc, Mutex},
148 time::{Duration, Instant},
149};
150
151use chrono::prelude::*;
152use config::Config;
153use futures::{
154 channel::oneshot,
155 executor::{ThreadPool, ThreadPoolBuilder},
156 future::RemoteHandle,
157 task::{SpawnError, SpawnExt},
158 Future,
159};
160
161use uuid::Uuid;
162
163use crate::{
164 actor::{props::ActorFactory, *},
165 kernel::provider::{create_root, Provider},
166 load_config,
167 system::logger::*,
168 system::timer::*,
169 validate::{validate_name, InvalidPath},
170 AnyMessage, Message,
171};
172use slog::{debug, Logger};
173
174pub struct ProtoSystem {
178 id: Uuid,
179 name: String,
180 pub host: Arc<str>,
181 config: Config,
182 pub(crate) sys_settings: SystemSettings,
183 started_at: DateTime<Utc>,
184}
185
186#[derive(Default)]
187pub struct SystemBuilder {
188 name: Option<String>,
189 cfg: Option<Config>,
190 log: Option<Logger>,
191 exec: Option<ThreadPool>,
192}
193
194impl SystemBuilder {
195 pub fn new() -> Self {
196 SystemBuilder::default()
197 }
198
199 pub fn create(self) -> Result<ActorSystem, SystemError> {
200 let name = self.name.unwrap_or_else(|| "riker".to_string());
201 let cfg = self.cfg.unwrap_or_else(load_config);
202 let exec = self.exec.unwrap_or_else(|| default_exec(&cfg));
203 let log = self
204 .log
205 .map(|log| LoggingSystem::new(log, None))
206 .unwrap_or_else(|| default_log(&cfg));
207
208 ActorSystem::create(name.as_ref(), exec, log, cfg)
209 }
210
211 pub fn name(self, name: &str) -> Self {
212 SystemBuilder {
213 name: Some(name.to_string()),
214 ..self
215 }
216 }
217
218 pub fn cfg(self, cfg: Config) -> Self {
219 SystemBuilder {
220 cfg: Some(cfg),
221 ..self
222 }
223 }
224
225 pub fn exec(self, exec: ThreadPool) -> Self {
226 SystemBuilder {
227 exec: Some(exec),
228 ..self
229 }
230 }
231
232 pub fn log(self, log: Logger) -> Self {
233 SystemBuilder {
234 log: Some(log),
235 ..self
236 }
237 }
238}
239
240#[derive(Clone)]
242pub struct LoggingSystem {
243 log: Logger,
245 global_logger_guard: Option<GlobalLoggerGuard>,
247}
248
249impl LoggingSystem {
250 pub(crate) fn new(log: Logger, global_logger_guard: Option<GlobalLoggerGuard>) -> Self {
251 Self {
252 log,
253 global_logger_guard,
254 }
255 }
256}
257
258impl Deref for LoggingSystem {
259 type Target = Logger;
260
261 fn deref(&self) -> &Self::Target {
262 &self.log
263 }
264}
265
266#[allow(dead_code)]
274#[derive(Clone)]
275pub struct ActorSystem {
276 proto: Arc<ProtoSystem>,
277 sys_actors: Option<SysActors>,
278 log: LoggingSystem,
279 debug: bool,
280 pub exec: ThreadPool,
281 pub timer: TimerRef,
282 pub sys_channels: Option<SysChannels>,
283 pub(crate) provider: Provider,
284}
285
286impl ActorSystem {
287 pub fn new() -> Result<ActorSystem, SystemError> {
291 let cfg = load_config();
292 let exec = default_exec(&cfg);
293 let log = default_log(&cfg);
294
295 ActorSystem::create("riker", exec, log, cfg)
296 }
297
298 pub fn with_name(name: &str) -> Result<ActorSystem, SystemError> {
302 let cfg = load_config();
303 let exec = default_exec(&cfg);
304 let log = default_log(&cfg);
305
306 ActorSystem::create(name, exec, log, cfg)
307 }
308
309 pub fn with_config(name: &str, cfg: Config) -> Result<ActorSystem, SystemError> {
311 let exec = default_exec(&cfg);
312 let log = default_log(&cfg);
313
314 ActorSystem::create(name, exec, log, cfg)
315 }
316
317 fn create(
318 name: &str,
319 exec: ThreadPool,
320 log: LoggingSystem,
321 cfg: Config,
322 ) -> Result<ActorSystem, SystemError> {
323 validate_name(name).map_err(|_| SystemError::InvalidName(name.into()))?;
324 let debug = cfg.get_bool("debug").unwrap();
326
327 if debug {
329 debug!(log, "Starting actor system: System[{}]", name);
330 }
331
332 let prov = Provider::new(log.clone());
333 let timer = BasicTimer::start(&cfg);
334
335 let proto = ProtoSystem {
337 id: Uuid::new_v4(),
338 name: name.to_string(),
339 host: Arc::from("localhost"),
340 config: cfg.clone(),
341 sys_settings: SystemSettings::from(&cfg),
342 started_at: Utc::now(),
343 };
344
345 let mut sys = ActorSystem {
347 proto: Arc::new(proto),
348 debug,
349 exec,
350 log,
351 timer,
353 sys_channels: None,
354 sys_actors: None,
355 provider: prov.clone(),
356 };
357
358 let sys_actors = create_root(&sys);
360 sys.sys_actors = Some(sys_actors);
361
362 sys.sys_channels = Some(sys_channels(&prov, &sys)?);
364
365 let _dl_logger = sys_actor_of_args::<DeadLetterLogger, _>(
367 &prov,
368 &sys,
369 "dl_logger",
370 (sys.dead_letters().clone(), sys.log()),
371 )?;
372
373 sys.complete_start();
374
375 debug!(sys.log, "Actor system [{}] [{}] started", sys.id(), name);
376
377 Ok(sys)
378 }
379
380 fn complete_start(&self) {
381 self.sys_actors.as_ref().unwrap().user.sys_init(self);
382 }
383
384 pub fn start_date(&self) -> &DateTime<Utc> {
386 &self.proto.started_at
387 }
388
389 pub fn uptime(&self) -> u64 {
391 let now = Utc::now();
392 now.time()
393 .signed_duration_since(self.start_date().time())
394 .num_seconds() as u64
395 }
396
397 pub fn host(&self) -> Arc<str> {
403 self.proto.host.clone()
404 }
405
406 pub fn id(&self) -> Uuid {
408 self.proto.id
409 }
410
411 pub fn name(&self) -> String {
413 self.proto.name.clone()
414 }
415
416 pub fn print_tree(&self) {
417 fn print_node(sys: &ActorSystem, node: &BasicActorRef, indent: &str) {
418 if node.is_root() {
419 println!("{}", sys.name());
420
421 for actor in node.children() {
422 print_node(sys, &actor, "");
423 }
424 } else {
425 println!("{}└─ {}", indent, node.name());
426
427 for actor in node.children() {
428 print_node(sys, &actor, &(indent.to_string() + " "));
429 }
430 }
431 }
432
433 let root = &self.sys_actors.as_ref().unwrap().root;
434 print_node(self, &root, "");
435 }
436
437 #[allow(dead_code)]
439 fn root(&self) -> &BasicActorRef {
440 &self.sys_actors.as_ref().unwrap().root
441 }
442
443 pub fn user_root(&self) -> &BasicActorRef {
445 &self.sys_actors.as_ref().unwrap().user
446 }
447
448 pub fn sys_root(&self) -> &BasicActorRef {
450 &self.sys_actors.as_ref().unwrap().sysm
451 }
452
453 pub fn temp_root(&self) -> &BasicActorRef {
455 &self.sys_actors.as_ref().unwrap().temp
456 }
457
458 pub fn sys_events(&self) -> &ActorRef<ChannelMsg<SystemEvent>> {
460 &self.sys_channels.as_ref().unwrap().sys_events
461 }
462
463 pub fn dead_letters(&self) -> &ActorRef<DLChannelMsg> {
465 &self.sys_channels.as_ref().unwrap().dead_letters
466 }
467
468 pub fn publish_event(&self, evt: SystemEvent) {
469 let topic = Topic::from(&evt);
470 self.sys_events().tell(Publish { topic, msg: evt }, None);
471 }
472
473 pub fn config(&self) -> &Config {
475 &self.proto.config
476 }
477
478 pub(crate) fn sys_settings(&self) -> &SystemSettings {
479 &self.proto.sys_settings
480 }
481
482 pub fn sys_actor_of_props<A>(
484 &self,
485 name: &str,
486 props: BoxActorProd<A>,
487 ) -> Result<ActorRef<A::Msg>, CreateError>
488 where
489 A: Actor,
490 {
491 self.provider
492 .create_actor(props, name, &self.sys_root(), self)
493 }
494
495 pub fn sys_actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
496 where
497 A: ActorFactory,
498 {
499 self.provider
500 .create_actor(Props::new::<A>(), name, &self.sys_root(), self)
501 }
502
503 pub fn sys_actor_of_args<A, Args>(
504 &self,
505 name: &str,
506 args: Args,
507 ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
508 where
509 Args: ActorArgs,
510 A: ActorFactoryArgs<Args>,
511 {
512 self.provider
513 .create_actor(Props::new_args::<A, _>(args), name, &self.sys_root(), self)
514 }
515
516 #[inline]
517 pub fn log(&self) -> LoggingSystem {
518 self.log.clone()
519 }
520
521 pub fn shutdown(&self) -> Shutdown {
529 let (tx, rx) = oneshot::channel::<()>();
530 let tx = Arc::new(Mutex::new(Some(tx)));
531
532 self.tmp_actor_of_args::<ShutdownActor, _>(tx).unwrap();
533
534 rx
535 }
536}
537
538unsafe impl Send for ActorSystem {}
539unsafe impl Sync for ActorSystem {}
540
541impl ActorRefFactory for ActorSystem {
542 fn actor_of_props<A>(
543 &self,
544 name: &str,
545 props: BoxActorProd<A>,
546 ) -> Result<ActorRef<A::Msg>, CreateError>
547 where
548 A: Actor,
549 {
550 self.provider
551 .create_actor(props, name, &self.user_root(), self)
552 }
553
554 fn actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
555 where
556 A: ActorFactory,
557 {
558 self.provider
559 .create_actor(Props::new::<A>(), name, &self.user_root(), self)
560 }
561
562 fn actor_of_args<A, Args>(
563 &self,
564 name: &str,
565 args: Args,
566 ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
567 where
568 Args: ActorArgs,
569 A: ActorFactoryArgs<Args>,
570 {
571 self.provider
572 .create_actor(Props::new_args::<A, _>(args), name, &self.user_root(), self)
573 }
574
575 fn stop(&self, actor: impl ActorReference) {
576 actor.sys_tell(SystemCmd::Stop.into());
577 }
578}
579
580impl ActorRefFactory for &ActorSystem {
581 fn actor_of_props<A>(
582 &self,
583 name: &str,
584 props: BoxActorProd<A>,
585 ) -> Result<ActorRef<A::Msg>, CreateError>
586 where
587 A: Actor,
588 {
589 self.provider
590 .create_actor(props, name, &self.user_root(), self)
591 }
592
593 fn actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
594 where
595 A: ActorFactory,
596 {
597 self.provider
598 .create_actor(Props::new::<A>(), name, &self.user_root(), self)
599 }
600
601 fn actor_of_args<A, Args>(
602 &self,
603 name: &str,
604 args: Args,
605 ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
606 where
607 Args: ActorArgs,
608 A: ActorFactoryArgs<Args>,
609 {
610 self.provider
611 .create_actor(Props::new_args::<A, _>(args), name, &self.user_root(), self)
612 }
613
614 fn stop(&self, actor: impl ActorReference) {
615 actor.sys_tell(SystemCmd::Stop.into());
616 }
617}
618
619impl TmpActorRefFactory for ActorSystem {
620 fn tmp_actor_of_props<A>(&self, props: BoxActorProd<A>) -> Result<ActorRef<A::Msg>, CreateError>
621 where
622 A: Actor,
623 {
624 let name = format!("{}", rand::random::<u64>());
625 self.provider
626 .create_actor(props, &name, &self.temp_root(), self)
627 }
628
629 fn tmp_actor_of<A>(&self) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
630 where
631 A: ActorFactory,
632 {
633 let name = format!("{}", rand::random::<u64>());
634 self.provider
635 .create_actor(Props::new::<A>(), &name, &self.temp_root(), self)
636 }
637
638 fn tmp_actor_of_args<A, Args>(
639 &self,
640 args: Args,
641 ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
642 where
643 Args: ActorArgs,
644 A: ActorFactoryArgs<Args>,
645 {
646 let name = format!("{}", rand::random::<u64>());
647 self.provider.create_actor(
648 Props::new_args::<A, _>(args),
649 &name,
650 &self.temp_root(),
651 self,
652 )
653 }
654}
655
656impl ActorSelectionFactory for ActorSystem {
657 fn select(&self, path: &str) -> Result<ActorSelection, InvalidPath> {
658 let anchor = self.user_root();
659 let (anchor, path_str) = if path.starts_with('/') {
660 let anchor = self.user_root();
661 let anchor_path = format!("{}/", anchor.path());
662 let path = path.to_string().replace(&anchor_path, "");
663
664 (anchor, path)
665 } else {
666 (anchor, path.to_string())
667 };
668
669 ActorSelection::new(
670 anchor.clone(),
671 path_str,
673 )
674 }
675}
676
677pub trait Run {
680 fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
681 where
682 Fut: Future + Send + 'static,
683 <Fut as Future>::Output: Send;
684}
685
686impl Run for ActorSystem {
687 fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
688 where
689 Fut: Future + Send + 'static,
690 <Fut as Future>::Output: Send,
691 {
692 self.exec.spawn_with_handle(future)
693 }
694}
695
696impl fmt::Debug for ActorSystem {
697 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
698 write!(
699 f,
700 "ActorSystem[Name: {}, Start Time: {}, Uptime: {} seconds]",
701 self.name(),
702 self.start_date(),
703 self.uptime()
704 )
705 }
706}
707
708impl Timer for ActorSystem {
709 fn schedule<T, M>(
710 &self,
711 initial_delay: Duration,
712 interval: Duration,
713 receiver: ActorRef<M>,
714 sender: Sender,
715 msg: T,
716 ) -> ScheduleId
717 where
718 T: Message + Into<M>,
719 M: Message,
720 {
721 let id = Uuid::new_v4();
722 let msg: M = msg.into();
723
724 let job = RepeatJob {
725 id,
726 send_at: Instant::now() + initial_delay,
727 interval,
728 receiver: receiver.into(),
729 sender,
730 msg: AnyMessage::new(msg, false),
731 };
732
733 let _ = self.timer.send(Job::Repeat(job));
734 id
735 }
736
737 fn schedule_once<T, M>(
738 &self,
739 delay: Duration,
740 receiver: ActorRef<M>,
741 sender: Sender,
742 msg: T,
743 ) -> ScheduleId
744 where
745 T: Message + Into<M>,
746 M: Message,
747 {
748 let id = Uuid::new_v4();
749 let msg: M = msg.into();
750
751 let job = OnceJob {
752 id,
753 send_at: Instant::now() + delay,
754 receiver: receiver.into(),
755 sender,
756 msg: AnyMessage::new(msg, true),
757 };
758
759 let _ = self.timer.send(Job::Once(job));
760 id
761 }
762
763 fn schedule_at_time<T, M>(
764 &self,
765 time: DateTime<Utc>,
766 receiver: ActorRef<M>,
767 sender: Sender,
768 msg: T,
769 ) -> ScheduleId
770 where
771 T: Message + Into<M>,
772 M: Message,
773 {
774 let delay = std::cmp::max(time.timestamp() - Utc::now().timestamp(), 0 as i64);
775 let delay = Duration::from_secs(delay as u64);
776
777 let id = Uuid::new_v4();
778 let msg: M = msg.into();
779
780 let job = OnceJob {
781 id,
782 send_at: Instant::now() + delay,
783 receiver: receiver.into(),
784 sender,
785 msg: AnyMessage::new(msg, true),
786 };
787
788 let _ = self.timer.send(Job::Once(job));
789 id
790 }
791
792 fn cancel_schedule(&self, id: Uuid) {
793 let _ = self.timer.send(Job::Cancel(id));
794 }
795}
796
797#[allow(unused)]
799fn sys_actor_of_props<A>(
800 prov: &Provider,
801 sys: &ActorSystem,
802 name: &str,
803 props: BoxActorProd<A>,
804) -> Result<ActorRef<A::Msg>, SystemError>
805where
806 A: Actor,
807{
808 prov.create_actor(props, name, &sys.sys_root(), sys)
809 .map_err(|_| SystemError::ModuleFailed(name.into()))
810}
811
812fn sys_actor_of<A>(
813 prov: &Provider,
814 sys: &ActorSystem,
815 name: &str,
816) -> Result<ActorRef<<A as Actor>::Msg>, SystemError>
817where
818 A: ActorFactory,
819{
820 prov.create_actor(Props::new::<A>(), name, &sys.sys_root(), sys)
821 .map_err(|_| SystemError::ModuleFailed(name.into()))
822}
823
824#[allow(dead_code)]
825fn sys_actor_of_args<A, Args>(
826 prov: &Provider,
827 sys: &ActorSystem,
828 name: &str,
829 args: Args,
830) -> Result<ActorRef<<A as Actor>::Msg>, SystemError>
831where
832 Args: ActorArgs,
833 A: ActorFactoryArgs<Args>,
834{
835 prov.create_actor(Props::new_args::<A, _>(args), name, &sys.sys_root(), sys)
836 .map_err(|_| SystemError::ModuleFailed(name.into()))
837}
838
839fn sys_channels(prov: &Provider, sys: &ActorSystem) -> Result<SysChannels, SystemError> {
840 let sys_events = sys_actor_of::<EventsChannel>(prov, sys, "sys_events")?;
841 let dead_letters = sys_actor_of::<Channel<DeadLetter>>(prov, sys, "dead_letters")?;
842
843 Ok(SysChannels {
850 sys_events,
851 dead_letters,
852 })
853}
854
855pub struct SystemSettings {
856 pub msg_process_limit: u32,
857}
858
859impl<'a> From<&'a Config> for SystemSettings {
860 fn from(config: &Config) -> Self {
861 SystemSettings {
862 msg_process_limit: config.get_int("mailbox.msg_process_limit").unwrap() as u32,
863 }
864 }
865}
866
867struct ThreadPoolConfig {
868 pool_size: usize,
869 stack_size: usize,
870}
871
872impl<'a> From<&'a Config> for ThreadPoolConfig {
873 fn from(config: &Config) -> Self {
874 ThreadPoolConfig {
875 pool_size: config.get_int("dispatcher.pool_size").unwrap() as usize,
876 stack_size: config.get_int("dispatcher.stack_size").unwrap() as usize,
877 }
878 }
879}
880
881fn default_exec(cfg: &Config) -> ThreadPool {
882 let exec_cfg = ThreadPoolConfig::from(cfg);
883 ThreadPoolBuilder::new()
884 .pool_size(exec_cfg.pool_size)
885 .stack_size(exec_cfg.stack_size)
886 .name_prefix("pool-thread-#")
887 .create()
888 .unwrap()
889}
890
891#[derive(Clone)]
892pub struct SysActors {
893 pub root: BasicActorRef,
894 pub user: BasicActorRef,
895 pub sysm: BasicActorRef,
896 pub temp: BasicActorRef,
897}
898
899#[derive(Clone)]
900pub struct SysChannels {
901 pub sys_events: ActorRef<ChannelMsg<SystemEvent>>,
902 pub dead_letters: ActorRef<DLChannelMsg>,
903}
904
905pub type Shutdown = oneshot::Receiver<()>;
906
907#[derive(Clone)]
908struct ShutdownActor {
909 tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
910}
911
912impl ActorFactoryArgs<Arc<Mutex<Option<oneshot::Sender<()>>>>> for ShutdownActor {
913 fn create_args(tx: Arc<Mutex<Option<oneshot::Sender<()>>>>) -> Self {
914 ShutdownActor::new(tx)
915 }
916}
917
918impl ShutdownActor {
919 fn new(tx: Arc<Mutex<Option<oneshot::Sender<()>>>>) -> Self {
920 ShutdownActor { tx }
921 }
922}
923
924impl Actor for ShutdownActor {
925 type Msg = SystemEvent;
926
927 fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
928 let sub = Subscribe {
929 topic: SysTopic::ActorTerminated.into(),
930 actor: Box::new(ctx.myself.clone()),
931 };
932 ctx.system.sys_events().tell(sub, None);
933
934 ctx.system.stop(ctx.system.user_root());
943 }
944
945 fn sys_recv(
946 &mut self,
947 ctx: &Context<Self::Msg>,
948 msg: SystemMsg,
949 sender: Option<BasicActorRef>,
950 ) {
951 if let SystemMsg::Event(evt) = msg {
952 if let SystemEvent::ActorTerminated(terminated) = evt {
953 self.receive(ctx, terminated, sender);
954 }
955 }
956 }
957
958 fn recv(&mut self, _: &Context<Self::Msg>, _: Self::Msg, _: Option<BasicActorRef>) {}
959}
960
961impl Receive<ActorTerminated> for ShutdownActor {
962 type Msg = SystemEvent;
963
964 fn receive(
965 &mut self,
966 ctx: &Context<Self::Msg>,
967 msg: ActorTerminated,
968 _sender: Option<BasicActorRef>,
969 ) {
970 if &msg.actor == ctx.system.user_root() {
971 if let Ok(ref mut tx) = self.tx.lock() {
972 if let Some(tx) = tx.take() {
973 tx.send(()).unwrap();
974 }
975 }
976 }
977 }
978}