Skip to main content

riker/
system.rs

1pub(crate) mod logger;
2pub(crate) mod timer;
3
4use std::{error::Error, fmt};
5
6use crate::actor::BasicActorRef;
7
8// Public riker::system API (plus the pub data types in this file)
9pub use self::timer::{BasicTimer, ScheduleId, Timer};
10
11#[derive(Clone, Debug)]
12pub enum SystemMsg {
13    ActorInit,
14    Command(SystemCmd),
15    Event(SystemEvent),
16    Failed(BasicActorRef),
17}
18
19unsafe impl Send for SystemMsg {}
20
21#[derive(Clone, Debug)]
22pub enum SystemCmd {
23    Stop,
24    Restart,
25}
26
27impl Into<SystemMsg> for SystemCmd {
28    fn into(self) -> SystemMsg {
29        SystemMsg::Command(self)
30    }
31}
32
33#[derive(Clone, Debug)]
34pub enum SystemEvent {
35    /// An actor was terminated
36    ActorCreated(ActorCreated),
37
38    /// An actor was restarted
39    ActorRestarted(ActorRestarted),
40
41    /// An actor was started
42    ActorTerminated(ActorTerminated),
43}
44
45impl Into<SystemMsg> for SystemEvent {
46    fn into(self) -> SystemMsg {
47        SystemMsg::Event(self)
48    }
49}
50
51#[derive(Clone, Debug)]
52pub struct ActorCreated {
53    pub actor: BasicActorRef,
54}
55
56#[derive(Clone, Debug)]
57pub struct ActorRestarted {
58    pub actor: BasicActorRef,
59}
60
61#[derive(Clone, Debug)]
62pub struct ActorTerminated {
63    pub actor: BasicActorRef,
64}
65
66impl Into<SystemEvent> for ActorCreated {
67    fn into(self) -> SystemEvent {
68        SystemEvent::ActorCreated(self)
69    }
70}
71
72impl Into<SystemEvent> for ActorRestarted {
73    fn into(self) -> SystemEvent {
74        SystemEvent::ActorRestarted(self)
75    }
76}
77
78impl Into<SystemEvent> for ActorTerminated {
79    fn into(self) -> SystemEvent {
80        SystemEvent::ActorTerminated(self)
81    }
82}
83
84impl Into<SystemMsg> for ActorCreated {
85    fn into(self) -> SystemMsg {
86        SystemMsg::Event(SystemEvent::ActorCreated(self))
87    }
88}
89
90impl Into<SystemMsg> for ActorRestarted {
91    fn into(self) -> SystemMsg {
92        SystemMsg::Event(SystemEvent::ActorRestarted(self))
93    }
94}
95
96impl Into<SystemMsg> for ActorTerminated {
97    fn into(self) -> SystemMsg {
98        SystemMsg::Event(SystemEvent::ActorTerminated(self))
99    }
100}
101
102#[derive(Clone, Debug)]
103pub enum SystemEventType {
104    ActorTerminated,
105    ActorRestarted,
106    ActorCreated,
107}
108
109pub enum SystemError {
110    ModuleFailed(String),
111    InvalidName(String),
112}
113
114impl Error for SystemError {
115    fn description(&self) -> &str {
116        match *self {
117            SystemError::ModuleFailed(_) => {
118                "Failed to create actor system. Cause: Sub module failed to start"
119            }
120            SystemError::InvalidName(_) => {
121                "Failed to create actor system. Cause: Invalid actor system name"
122            }
123        }
124    }
125}
126
127impl fmt::Display for SystemError {
128    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
129        match *self {
130            SystemError::ModuleFailed(ref m) => {
131                f.write_str(&format!("{} ({})", self.to_string(), m))
132            }
133            SystemError::InvalidName(ref name) => {
134                f.write_str(&format!("{} ({})", self.to_string(), name))
135            }
136        }
137    }
138}
139
140impl fmt::Debug for SystemError {
141    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142        f.write_str(&self.to_string())
143    }
144}
145use std::{
146    ops::Deref,
147    sync::{Arc, Mutex},
148    time::{Duration, Instant},
149};
150
151use chrono::prelude::*;
152use config::Config;
153use futures::{
154    channel::oneshot,
155    executor::{ThreadPool, ThreadPoolBuilder},
156    future::RemoteHandle,
157    task::{SpawnError, SpawnExt},
158    Future,
159};
160
161use uuid::Uuid;
162
163use crate::{
164    actor::{props::ActorFactory, *},
165    kernel::provider::{create_root, Provider},
166    load_config,
167    system::logger::*,
168    system::timer::*,
169    validate::{validate_name, InvalidPath},
170    AnyMessage, Message,
171};
172use slog::{debug, Logger};
173
174// 0. error results on any
175// 1. visibility
176
177pub struct ProtoSystem {
178    id: Uuid,
179    name: String,
180    pub host: Arc<str>,
181    config: Config,
182    pub(crate) sys_settings: SystemSettings,
183    started_at: DateTime<Utc>,
184}
185
186#[derive(Default)]
187pub struct SystemBuilder {
188    name: Option<String>,
189    cfg: Option<Config>,
190    log: Option<Logger>,
191    exec: Option<ThreadPool>,
192}
193
194impl SystemBuilder {
195    pub fn new() -> Self {
196        SystemBuilder::default()
197    }
198
199    pub fn create(self) -> Result<ActorSystem, SystemError> {
200        let name = self.name.unwrap_or_else(|| "riker".to_string());
201        let cfg = self.cfg.unwrap_or_else(load_config);
202        let exec = self.exec.unwrap_or_else(|| default_exec(&cfg));
203        let log = self
204            .log
205            .map(|log| LoggingSystem::new(log, None))
206            .unwrap_or_else(|| default_log(&cfg));
207
208        ActorSystem::create(name.as_ref(), exec, log, cfg)
209    }
210
211    pub fn name(self, name: &str) -> Self {
212        SystemBuilder {
213            name: Some(name.to_string()),
214            ..self
215        }
216    }
217
218    pub fn cfg(self, cfg: Config) -> Self {
219        SystemBuilder {
220            cfg: Some(cfg),
221            ..self
222        }
223    }
224
225    pub fn exec(self, exec: ThreadPool) -> Self {
226        SystemBuilder {
227            exec: Some(exec),
228            ..self
229        }
230    }
231
232    pub fn log(self, log: Logger) -> Self {
233        SystemBuilder {
234            log: Some(log),
235            ..self
236        }
237    }
238}
239
240/// Holds fields related to logging system.
241#[derive(Clone)]
242pub struct LoggingSystem {
243    /// Logger
244    log: Logger,
245    /// Global logger guard
246    global_logger_guard: Option<GlobalLoggerGuard>,
247}
248
249impl LoggingSystem {
250    pub(crate) fn new(log: Logger, global_logger_guard: Option<GlobalLoggerGuard>) -> Self {
251        Self {
252            log,
253            global_logger_guard,
254        }
255    }
256}
257
258impl Deref for LoggingSystem {
259    type Target = Logger;
260
261    fn deref(&self) -> &Self::Target {
262        &self.log
263    }
264}
265
266/// The actor runtime and common services coordinator
267///
268/// The `ActorSystem` provides a runtime on which actors are executed.
269/// It also provides common services such as channels and scheduling.
270/// The `ActorSystem` is the heart of a Riker application,
271/// starting several threads when it is created. Create only one instance
272/// of `ActorSystem` per application.
273#[allow(dead_code)]
274#[derive(Clone)]
275pub struct ActorSystem {
276    proto: Arc<ProtoSystem>,
277    sys_actors: Option<SysActors>,
278    log: LoggingSystem,
279    debug: bool,
280    pub exec: ThreadPool,
281    pub timer: TimerRef,
282    pub sys_channels: Option<SysChannels>,
283    pub(crate) provider: Provider,
284}
285
286impl ActorSystem {
287    /// Create a new `ActorSystem` instance
288    ///
289    /// Requires a type that implements the `Model` trait.
290    pub fn new() -> Result<ActorSystem, SystemError> {
291        let cfg = load_config();
292        let exec = default_exec(&cfg);
293        let log = default_log(&cfg);
294
295        ActorSystem::create("riker", exec, log, cfg)
296    }
297
298    /// Create a new `ActorSystem` instance with provided name
299    ///
300    /// Requires a type that implements the `Model` trait.
301    pub fn with_name(name: &str) -> Result<ActorSystem, SystemError> {
302        let cfg = load_config();
303        let exec = default_exec(&cfg);
304        let log = default_log(&cfg);
305
306        ActorSystem::create(name, exec, log, cfg)
307    }
308
309    /// Create a new `ActorSystem` instance bypassing default config behavior
310    pub fn with_config(name: &str, cfg: Config) -> Result<ActorSystem, SystemError> {
311        let exec = default_exec(&cfg);
312        let log = default_log(&cfg);
313
314        ActorSystem::create(name, exec, log, cfg)
315    }
316
317    fn create(
318        name: &str,
319        exec: ThreadPool,
320        log: LoggingSystem,
321        cfg: Config,
322    ) -> Result<ActorSystem, SystemError> {
323        validate_name(name).map_err(|_| SystemError::InvalidName(name.into()))?;
324        // Process Configuration
325        let debug = cfg.get_bool("debug").unwrap();
326
327        // Until the logger has started, use println
328        if debug {
329            debug!(log, "Starting actor system: System[{}]", name);
330        }
331
332        let prov = Provider::new(log.clone());
333        let timer = BasicTimer::start(&cfg);
334
335        // 1. create proto system
336        let proto = ProtoSystem {
337            id: Uuid::new_v4(),
338            name: name.to_string(),
339            host: Arc::from("localhost"),
340            config: cfg.clone(),
341            sys_settings: SystemSettings::from(&cfg),
342            started_at: Utc::now(),
343        };
344
345        // 2. create uninitialized system
346        let mut sys = ActorSystem {
347            proto: Arc::new(proto),
348            debug,
349            exec,
350            log,
351            // event_store: None,
352            timer,
353            sys_channels: None,
354            sys_actors: None,
355            provider: prov.clone(),
356        };
357
358        // 3. create initial actor hierarchy
359        let sys_actors = create_root(&sys);
360        sys.sys_actors = Some(sys_actors);
361
362        // 4. start system channels
363        sys.sys_channels = Some(sys_channels(&prov, &sys)?);
364
365        // 5. start dead letter logger
366        let _dl_logger = sys_actor_of_args::<DeadLetterLogger, _>(
367            &prov,
368            &sys,
369            "dl_logger",
370            (sys.dead_letters().clone(), sys.log()),
371        )?;
372
373        sys.complete_start();
374
375        debug!(sys.log, "Actor system [{}] [{}] started", sys.id(), name);
376
377        Ok(sys)
378    }
379
380    fn complete_start(&self) {
381        self.sys_actors.as_ref().unwrap().user.sys_init(self);
382    }
383
384    /// Returns the system start date
385    pub fn start_date(&self) -> &DateTime<Utc> {
386        &self.proto.started_at
387    }
388
389    /// Returns the number of seconds since the system started
390    pub fn uptime(&self) -> u64 {
391        let now = Utc::now();
392        now.time()
393            .signed_duration_since(self.start_date().time())
394            .num_seconds() as u64
395    }
396
397    /// Returns the hostname used when the system started
398    ///
399    /// The host is used in actor addressing.
400    ///
401    /// Currently not used, but will be once system clustering is introduced.
402    pub fn host(&self) -> Arc<str> {
403        self.proto.host.clone()
404    }
405
406    /// Returns the UUID assigned to the system
407    pub fn id(&self) -> Uuid {
408        self.proto.id
409    }
410
411    /// Returns the name of the system
412    pub fn name(&self) -> String {
413        self.proto.name.clone()
414    }
415
416    pub fn print_tree(&self) {
417        fn print_node(sys: &ActorSystem, node: &BasicActorRef, indent: &str) {
418            if node.is_root() {
419                println!("{}", sys.name());
420
421                for actor in node.children() {
422                    print_node(sys, &actor, "");
423                }
424            } else {
425                println!("{}└─ {}", indent, node.name());
426
427                for actor in node.children() {
428                    print_node(sys, &actor, &(indent.to_string() + "   "));
429                }
430            }
431        }
432
433        let root = &self.sys_actors.as_ref().unwrap().root;
434        print_node(self, &root, "");
435    }
436
437    /// Returns the system root's actor reference
438    #[allow(dead_code)]
439    fn root(&self) -> &BasicActorRef {
440        &self.sys_actors.as_ref().unwrap().root
441    }
442
443    /// Returns the user root actor reference
444    pub fn user_root(&self) -> &BasicActorRef {
445        &self.sys_actors.as_ref().unwrap().user
446    }
447
448    /// Returns the system root actor reference
449    pub fn sys_root(&self) -> &BasicActorRef {
450        &self.sys_actors.as_ref().unwrap().sysm
451    }
452
453    /// Reutrns the temp root actor reference
454    pub fn temp_root(&self) -> &BasicActorRef {
455        &self.sys_actors.as_ref().unwrap().temp
456    }
457
458    /// Returns a reference to the system events channel
459    pub fn sys_events(&self) -> &ActorRef<ChannelMsg<SystemEvent>> {
460        &self.sys_channels.as_ref().unwrap().sys_events
461    }
462
463    /// Returns a reference to the dead letters channel
464    pub fn dead_letters(&self) -> &ActorRef<DLChannelMsg> {
465        &self.sys_channels.as_ref().unwrap().dead_letters
466    }
467
468    pub fn publish_event(&self, evt: SystemEvent) {
469        let topic = Topic::from(&evt);
470        self.sys_events().tell(Publish { topic, msg: evt }, None);
471    }
472
473    /// Returns the `Config` used by the system
474    pub fn config(&self) -> &Config {
475        &self.proto.config
476    }
477
478    pub(crate) fn sys_settings(&self) -> &SystemSettings {
479        &self.proto.sys_settings
480    }
481
482    /// Create an actor under the system root
483    pub fn sys_actor_of_props<A>(
484        &self,
485        name: &str,
486        props: BoxActorProd<A>,
487    ) -> Result<ActorRef<A::Msg>, CreateError>
488    where
489        A: Actor,
490    {
491        self.provider
492            .create_actor(props, name, &self.sys_root(), self)
493    }
494
495    pub fn sys_actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
496    where
497        A: ActorFactory,
498    {
499        self.provider
500            .create_actor(Props::new::<A>(), name, &self.sys_root(), self)
501    }
502
503    pub fn sys_actor_of_args<A, Args>(
504        &self,
505        name: &str,
506        args: Args,
507    ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
508    where
509        Args: ActorArgs,
510        A: ActorFactoryArgs<Args>,
511    {
512        self.provider
513            .create_actor(Props::new_args::<A, _>(args), name, &self.sys_root(), self)
514    }
515
516    #[inline]
517    pub fn log(&self) -> LoggingSystem {
518        self.log.clone()
519    }
520
521    /// Shutdown the actor system
522    ///
523    /// Attempts a graceful shutdown of the system and all actors.
524    /// Actors will receive a stop message, executing `actor.post_stop`.
525    ///
526    /// Does not block. Returns a future which is completed when all
527    /// actors have successfully stopped.
528    pub fn shutdown(&self) -> Shutdown {
529        let (tx, rx) = oneshot::channel::<()>();
530        let tx = Arc::new(Mutex::new(Some(tx)));
531
532        self.tmp_actor_of_args::<ShutdownActor, _>(tx).unwrap();
533
534        rx
535    }
536}
537
538unsafe impl Send for ActorSystem {}
539unsafe impl Sync for ActorSystem {}
540
541impl ActorRefFactory for ActorSystem {
542    fn actor_of_props<A>(
543        &self,
544        name: &str,
545        props: BoxActorProd<A>,
546    ) -> Result<ActorRef<A::Msg>, CreateError>
547    where
548        A: Actor,
549    {
550        self.provider
551            .create_actor(props, name, &self.user_root(), self)
552    }
553
554    fn actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
555    where
556        A: ActorFactory,
557    {
558        self.provider
559            .create_actor(Props::new::<A>(), name, &self.user_root(), self)
560    }
561
562    fn actor_of_args<A, Args>(
563        &self,
564        name: &str,
565        args: Args,
566    ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
567    where
568        Args: ActorArgs,
569        A: ActorFactoryArgs<Args>,
570    {
571        self.provider
572            .create_actor(Props::new_args::<A, _>(args), name, &self.user_root(), self)
573    }
574
575    fn stop(&self, actor: impl ActorReference) {
576        actor.sys_tell(SystemCmd::Stop.into());
577    }
578}
579
580impl ActorRefFactory for &ActorSystem {
581    fn actor_of_props<A>(
582        &self,
583        name: &str,
584        props: BoxActorProd<A>,
585    ) -> Result<ActorRef<A::Msg>, CreateError>
586    where
587        A: Actor,
588    {
589        self.provider
590            .create_actor(props, name, &self.user_root(), self)
591    }
592
593    fn actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
594    where
595        A: ActorFactory,
596    {
597        self.provider
598            .create_actor(Props::new::<A>(), name, &self.user_root(), self)
599    }
600
601    fn actor_of_args<A, Args>(
602        &self,
603        name: &str,
604        args: Args,
605    ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
606    where
607        Args: ActorArgs,
608        A: ActorFactoryArgs<Args>,
609    {
610        self.provider
611            .create_actor(Props::new_args::<A, _>(args), name, &self.user_root(), self)
612    }
613
614    fn stop(&self, actor: impl ActorReference) {
615        actor.sys_tell(SystemCmd::Stop.into());
616    }
617}
618
619impl TmpActorRefFactory for ActorSystem {
620    fn tmp_actor_of_props<A>(&self, props: BoxActorProd<A>) -> Result<ActorRef<A::Msg>, CreateError>
621    where
622        A: Actor,
623    {
624        let name = format!("{}", rand::random::<u64>());
625        self.provider
626            .create_actor(props, &name, &self.temp_root(), self)
627    }
628
629    fn tmp_actor_of<A>(&self) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
630    where
631        A: ActorFactory,
632    {
633        let name = format!("{}", rand::random::<u64>());
634        self.provider
635            .create_actor(Props::new::<A>(), &name, &self.temp_root(), self)
636    }
637
638    fn tmp_actor_of_args<A, Args>(
639        &self,
640        args: Args,
641    ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
642    where
643        Args: ActorArgs,
644        A: ActorFactoryArgs<Args>,
645    {
646        let name = format!("{}", rand::random::<u64>());
647        self.provider.create_actor(
648            Props::new_args::<A, _>(args),
649            &name,
650            &self.temp_root(),
651            self,
652        )
653    }
654}
655
656impl ActorSelectionFactory for ActorSystem {
657    fn select(&self, path: &str) -> Result<ActorSelection, InvalidPath> {
658        let anchor = self.user_root();
659        let (anchor, path_str) = if path.starts_with('/') {
660            let anchor = self.user_root();
661            let anchor_path = format!("{}/", anchor.path());
662            let path = path.to_string().replace(&anchor_path, "");
663
664            (anchor, path)
665        } else {
666            (anchor, path.to_string())
667        };
668
669        ActorSelection::new(
670            anchor.clone(),
671            // self.dead_letters(),
672            path_str,
673        )
674    }
675}
676
677// futures::task::Spawn::spawn requires &mut self so
678// we'll create a wrapper trait that requires only &self.
679pub trait Run {
680    fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
681    where
682        Fut: Future + Send + 'static,
683        <Fut as Future>::Output: Send;
684}
685
686impl Run for ActorSystem {
687    fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
688    where
689        Fut: Future + Send + 'static,
690        <Fut as Future>::Output: Send,
691    {
692        self.exec.spawn_with_handle(future)
693    }
694}
695
696impl fmt::Debug for ActorSystem {
697    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
698        write!(
699            f,
700            "ActorSystem[Name: {}, Start Time: {}, Uptime: {} seconds]",
701            self.name(),
702            self.start_date(),
703            self.uptime()
704        )
705    }
706}
707
708impl Timer for ActorSystem {
709    fn schedule<T, M>(
710        &self,
711        initial_delay: Duration,
712        interval: Duration,
713        receiver: ActorRef<M>,
714        sender: Sender,
715        msg: T,
716    ) -> ScheduleId
717    where
718        T: Message + Into<M>,
719        M: Message,
720    {
721        let id = Uuid::new_v4();
722        let msg: M = msg.into();
723
724        let job = RepeatJob {
725            id,
726            send_at: Instant::now() + initial_delay,
727            interval,
728            receiver: receiver.into(),
729            sender,
730            msg: AnyMessage::new(msg, false),
731        };
732
733        let _ = self.timer.send(Job::Repeat(job));
734        id
735    }
736
737    fn schedule_once<T, M>(
738        &self,
739        delay: Duration,
740        receiver: ActorRef<M>,
741        sender: Sender,
742        msg: T,
743    ) -> ScheduleId
744    where
745        T: Message + Into<M>,
746        M: Message,
747    {
748        let id = Uuid::new_v4();
749        let msg: M = msg.into();
750
751        let job = OnceJob {
752            id,
753            send_at: Instant::now() + delay,
754            receiver: receiver.into(),
755            sender,
756            msg: AnyMessage::new(msg, true),
757        };
758
759        let _ = self.timer.send(Job::Once(job));
760        id
761    }
762
763    fn schedule_at_time<T, M>(
764        &self,
765        time: DateTime<Utc>,
766        receiver: ActorRef<M>,
767        sender: Sender,
768        msg: T,
769    ) -> ScheduleId
770    where
771        T: Message + Into<M>,
772        M: Message,
773    {
774        let delay = std::cmp::max(time.timestamp() - Utc::now().timestamp(), 0 as i64);
775        let delay = Duration::from_secs(delay as u64);
776
777        let id = Uuid::new_v4();
778        let msg: M = msg.into();
779
780        let job = OnceJob {
781            id,
782            send_at: Instant::now() + delay,
783            receiver: receiver.into(),
784            sender,
785            msg: AnyMessage::new(msg, true),
786        };
787
788        let _ = self.timer.send(Job::Once(job));
789        id
790    }
791
792    fn cancel_schedule(&self, id: Uuid) {
793        let _ = self.timer.send(Job::Cancel(id));
794    }
795}
796
797// helper functions
798#[allow(unused)]
799fn sys_actor_of_props<A>(
800    prov: &Provider,
801    sys: &ActorSystem,
802    name: &str,
803    props: BoxActorProd<A>,
804) -> Result<ActorRef<A::Msg>, SystemError>
805where
806    A: Actor,
807{
808    prov.create_actor(props, name, &sys.sys_root(), sys)
809        .map_err(|_| SystemError::ModuleFailed(name.into()))
810}
811
812fn sys_actor_of<A>(
813    prov: &Provider,
814    sys: &ActorSystem,
815    name: &str,
816) -> Result<ActorRef<<A as Actor>::Msg>, SystemError>
817where
818    A: ActorFactory,
819{
820    prov.create_actor(Props::new::<A>(), name, &sys.sys_root(), sys)
821        .map_err(|_| SystemError::ModuleFailed(name.into()))
822}
823
824#[allow(dead_code)]
825fn sys_actor_of_args<A, Args>(
826    prov: &Provider,
827    sys: &ActorSystem,
828    name: &str,
829    args: Args,
830) -> Result<ActorRef<<A as Actor>::Msg>, SystemError>
831where
832    Args: ActorArgs,
833    A: ActorFactoryArgs<Args>,
834{
835    prov.create_actor(Props::new_args::<A, _>(args), name, &sys.sys_root(), sys)
836        .map_err(|_| SystemError::ModuleFailed(name.into()))
837}
838
839fn sys_channels(prov: &Provider, sys: &ActorSystem) -> Result<SysChannels, SystemError> {
840    let sys_events = sys_actor_of::<EventsChannel>(prov, sys, "sys_events")?;
841    let dead_letters = sys_actor_of::<Channel<DeadLetter>>(prov, sys, "dead_letters")?;
842
843    // subscribe the dead_letters channel to actor terminated events
844    // so that any future subscribed actors that terminate are automatically
845    // unsubscribed from the dead_letters channel
846    // let msg = ChannelMsg::Subscribe(SysTopic::ActorTerminated.into(), dl.clone());
847    // es.tell(msg, None);
848
849    Ok(SysChannels {
850        sys_events,
851        dead_letters,
852    })
853}
854
855pub struct SystemSettings {
856    pub msg_process_limit: u32,
857}
858
859impl<'a> From<&'a Config> for SystemSettings {
860    fn from(config: &Config) -> Self {
861        SystemSettings {
862            msg_process_limit: config.get_int("mailbox.msg_process_limit").unwrap() as u32,
863        }
864    }
865}
866
867struct ThreadPoolConfig {
868    pool_size: usize,
869    stack_size: usize,
870}
871
872impl<'a> From<&'a Config> for ThreadPoolConfig {
873    fn from(config: &Config) -> Self {
874        ThreadPoolConfig {
875            pool_size: config.get_int("dispatcher.pool_size").unwrap() as usize,
876            stack_size: config.get_int("dispatcher.stack_size").unwrap() as usize,
877        }
878    }
879}
880
881fn default_exec(cfg: &Config) -> ThreadPool {
882    let exec_cfg = ThreadPoolConfig::from(cfg);
883    ThreadPoolBuilder::new()
884        .pool_size(exec_cfg.pool_size)
885        .stack_size(exec_cfg.stack_size)
886        .name_prefix("pool-thread-#")
887        .create()
888        .unwrap()
889}
890
891#[derive(Clone)]
892pub struct SysActors {
893    pub root: BasicActorRef,
894    pub user: BasicActorRef,
895    pub sysm: BasicActorRef,
896    pub temp: BasicActorRef,
897}
898
899#[derive(Clone)]
900pub struct SysChannels {
901    pub sys_events: ActorRef<ChannelMsg<SystemEvent>>,
902    pub dead_letters: ActorRef<DLChannelMsg>,
903}
904
905pub type Shutdown = oneshot::Receiver<()>;
906
907#[derive(Clone)]
908struct ShutdownActor {
909    tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
910}
911
912impl ActorFactoryArgs<Arc<Mutex<Option<oneshot::Sender<()>>>>> for ShutdownActor {
913    fn create_args(tx: Arc<Mutex<Option<oneshot::Sender<()>>>>) -> Self {
914        ShutdownActor::new(tx)
915    }
916}
917
918impl ShutdownActor {
919    fn new(tx: Arc<Mutex<Option<oneshot::Sender<()>>>>) -> Self {
920        ShutdownActor { tx }
921    }
922}
923
924impl Actor for ShutdownActor {
925    type Msg = SystemEvent;
926
927    fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
928        let sub = Subscribe {
929            topic: SysTopic::ActorTerminated.into(),
930            actor: Box::new(ctx.myself.clone()),
931        };
932        ctx.system.sys_events().tell(sub, None);
933
934        // todo this is prone to failing since there is no
935        // confirmation that ShutdownActor has subscribed to
936        // the ActorTerminated events yet.
937        // It may be that the user root actor is Sterminated
938        // before the subscription is complete.
939
940        // std::thread::sleep_ms(1000);
941        // send stop to all /user children
942        ctx.system.stop(ctx.system.user_root());
943    }
944
945    fn sys_recv(
946        &mut self,
947        ctx: &Context<Self::Msg>,
948        msg: SystemMsg,
949        sender: Option<BasicActorRef>,
950    ) {
951        if let SystemMsg::Event(evt) = msg {
952            if let SystemEvent::ActorTerminated(terminated) = evt {
953                self.receive(ctx, terminated, sender);
954            }
955        }
956    }
957
958    fn recv(&mut self, _: &Context<Self::Msg>, _: Self::Msg, _: Option<BasicActorRef>) {}
959}
960
961impl Receive<ActorTerminated> for ShutdownActor {
962    type Msg = SystemEvent;
963
964    fn receive(
965        &mut self,
966        ctx: &Context<Self::Msg>,
967        msg: ActorTerminated,
968        _sender: Option<BasicActorRef>,
969    ) {
970        if &msg.actor == ctx.system.user_root() {
971            if let Ok(ref mut tx) = self.tx.lock() {
972                if let Some(tx) = tx.take() {
973                    tx.send(()).unwrap();
974                }
975            }
976        }
977    }
978}