actors_rs/system/
mod.rs

1pub(crate) mod logger;
2pub(crate) mod timer;
3
4// Public riker::system API (plus the pub data types in this file)
5pub use self::timer::{BasicTimer, Timer};
6
7use std::{
8    fmt,
9    ops::Deref,
10    sync::{Arc, Mutex},
11    time::{Duration, Instant},
12};
13
14use chrono::prelude::*;
15use config::Config;
16use futures::{
17    channel::oneshot,
18    executor::{ThreadPool, ThreadPoolBuilder},
19    future::RemoteHandle,
20    task::{SpawnError, SpawnExt},
21    Future,
22};
23use rand;
24use uuid::Uuid;
25
26use crate::actor_ref::{
27    ActorRef, ActorRefFactory, ActorReference, BasicActorRef, Sender, Tell, TmpActorRefFactory,
28};
29use crate::{
30    actor::{props::ActorFactory, *},
31    kernel::provider::{create_root, Provider},
32    load_config,
33    system::logger::*,
34    system::timer::*,
35    validate::{validate_name, InvalidPath},
36    AnyMessage, Message,
37};
38use slog::{debug, Logger};
39
40#[derive(Clone, Debug)]
41pub enum SystemMsg {
42    ActorInit,
43    Command(SystemCmd),
44    Event(SystemEvent),
45    Failed(BasicActorRef),
46}
47
48unsafe impl Send for SystemMsg {}
49
/// Commands that can be delivered to an actor as a [`SystemMsg::Command`].
#[derive(Debug, Clone)]
pub enum SystemCmd {
    /// Ask the actor to stop.
    Stop,
    /// Ask the actor to restart.
    Restart,
}
55
56impl Into<SystemMsg> for SystemCmd {
57    fn into(self) -> SystemMsg {
58        SystemMsg::Command(self)
59    }
60}
61
62#[derive(Clone, Debug)]
63pub enum SystemEvent {
64    /// An actor was terminated
65    ActorCreated(ActorCreated),
66
67    /// An actor was restarted
68    ActorRestarted(ActorRestarted),
69
70    /// An actor was started
71    ActorTerminated(ActorTerminated),
72}
73
74impl Into<SystemMsg> for SystemEvent {
75    fn into(self) -> SystemMsg {
76        SystemMsg::Event(self)
77    }
78}
79
80#[derive(Clone, Debug)]
81pub struct ActorCreated {
82    pub actor: BasicActorRef,
83}
84
85#[derive(Clone, Debug)]
86pub struct ActorRestarted {
87    pub actor: BasicActorRef,
88}
89
90#[derive(Clone, Debug)]
91pub struct ActorTerminated {
92    pub actor: BasicActorRef,
93}
94
95impl Into<SystemEvent> for ActorCreated {
96    fn into(self) -> SystemEvent {
97        SystemEvent::ActorCreated(self)
98    }
99}
100
101impl Into<SystemEvent> for ActorRestarted {
102    fn into(self) -> SystemEvent {
103        SystemEvent::ActorRestarted(self)
104    }
105}
106
107impl Into<SystemEvent> for ActorTerminated {
108    fn into(self) -> SystemEvent {
109        SystemEvent::ActorTerminated(self)
110    }
111}
112
113impl Into<SystemMsg> for ActorCreated {
114    fn into(self) -> SystemMsg {
115        SystemMsg::Event(SystemEvent::ActorCreated(self))
116    }
117}
118
119impl Into<SystemMsg> for ActorRestarted {
120    fn into(self) -> SystemMsg {
121        SystemMsg::Event(SystemEvent::ActorRestarted(self))
122    }
123}
124
125impl Into<SystemMsg> for ActorTerminated {
126    fn into(self) -> SystemMsg {
127        SystemMsg::Event(SystemEvent::ActorTerminated(self))
128    }
129}
130
/// Payload-free counterpart of [`SystemEvent`]: one variant per kind of event.
#[derive(Debug, Clone)]
pub enum SystemEventType {
    /// Corresponds to [`SystemEvent::ActorTerminated`].
    ActorTerminated,
    /// Corresponds to [`SystemEvent::ActorRestarted`].
    ActorRestarted,
    /// Corresponds to [`SystemEvent::ActorCreated`].
    ActorCreated,
}
137
/// Reasons why creating an actor system can fail.
pub enum SystemError {
    /// A system sub module failed to start; the payload names it.
    ModuleFailed(String),
    /// The requested actor system name was rejected; the payload is that name.
    InvalidName(String),
}

impl fmt::Display for SystemError {
    /// Writes the human-readable failure message straight into the formatter,
    /// without building an intermediate `String`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::ModuleFailed(module) => write!(
                f,
                "Failed to create actor system. Cause: Sub module failed to start ({})",
                module
            ),
            Self::InvalidName(name) => write!(
                f,
                "Failed to create actor system. Cause: Invalid actor system name ({})",
                name
            ),
        }
    }
}

impl fmt::Debug for SystemError {
    /// `Debug` deliberately prints the same text as `Display`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

// Lets callers use `SystemError` with `?` into `Box<dyn Error>` and with
// error-handling crates that require `std::error::Error`.
impl std::error::Error for SystemError {}
163
164// 0. error results on any
165// 1. visibility
166
167pub struct ProtoSystem {
168    id: Uuid,
169    name: String,
170    pub host: Arc<String>,
171    config: Config,
172    pub(crate) sys_settings: SystemSettings,
173    started_at: DateTime<Utc>,
174}
175
176#[derive(Default)]
177pub struct SystemBuilder {
178    name: Option<String>,
179    cfg: Option<Config>,
180    log: Option<Logger>,
181    exec: Option<ThreadPool>,
182}
183
184impl SystemBuilder {
185    #[must_use]
186    pub fn new() -> Self {
187        Self::default()
188    }
189
190    pub fn create(self) -> Result<ActorSystem, SystemError> {
191        let name = self.name.unwrap_or_else(|| "riker".into());
192        let cfg = self.cfg.unwrap_or_else(load_config);
193        let exec = self.exec.unwrap_or_else(|| default_exec(&cfg));
194        let log = self.log.unwrap_or_else(|| default_log(&cfg));
195
196        ActorSystem::create(&name, exec, log, &cfg)
197    }
198
199    #[must_use]
200    pub fn name(self, name: &str) -> Self {
201        Self {
202            name: Some(name.to_string()),
203            ..self
204        }
205    }
206
207    #[must_use]
208    pub fn cfg(self, cfg: Config) -> Self {
209        Self {
210            cfg: Some(cfg),
211            ..self
212        }
213    }
214
215    #[must_use]
216    pub fn exec(self, exec: ThreadPool) -> Self {
217        Self {
218            exec: Some(exec),
219            ..self
220        }
221    }
222
223    #[must_use]
224    pub fn log(self, log: Logger) -> Self {
225        Self {
226            log: Some(log),
227            ..self
228        }
229    }
230}
231
232/// The actor runtime and common services coordinator
233///
234/// The `ActorSystem` provides a runtime on which actors are executed.
235/// It also provides common services such as channels, persistence
236/// and scheduling. The `ActorSystem` is the heart of a Riker application,
237/// starting serveral threads when it is created. Create only one instance
238/// of `ActorSystem` per application.
239#[derive(Clone)]
240pub struct ActorSystem {
241    proto: Arc<ProtoSystem>,
242    sys_actors: Option<SysActors>,
243    log: Logger,
244    debug: bool,
245    pub exec: ThreadPool,
246    pub timer: TimerRef,
247    pub sys_channels: Option<SysChannels>,
248    pub(crate) provider: Provider,
249}
250
251impl fmt::Display for ActorSystem {
252    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
253        f.write_str(self.get_tree().as_str())
254    }
255}
256
257impl ActorSystem {
258    /// Create a new `ActorSystem` instance
259    ///
260    /// Requires a type that implements the `Model` trait.
261    pub fn new() -> Result<Self, SystemError> {
262        let cfg = load_config();
263        let exec = default_exec(&cfg);
264        let log = default_log(&cfg);
265
266        Self::create("riker", exec, log, &cfg)
267    }
268
269    /// Create a new `ActorSystem` instance with provided name
270    ///
271    /// Requires a type that implements the `Model` trait.
272    pub fn with_name(name: &str) -> Result<Self, SystemError> {
273        let cfg = load_config();
274        let exec = default_exec(&cfg);
275        let log = default_log(&cfg);
276
277        Self::create(name, exec, log, &cfg)
278    }
279
280    /// Create a new `ActorSystem` instance bypassing default config behavior
281    pub fn with_config(name: &str, cfg: &Config) -> Result<Self, SystemError> {
282        let exec = default_exec(cfg);
283        let log = default_log(cfg);
284
285        Self::create(name, exec, log, cfg)
286    }
287
288    fn create(
289        name: &str,
290        exec: ThreadPool,
291        log: Logger,
292        cfg: &Config,
293    ) -> Result<Self, SystemError> {
294        validate_name(name).map_err(|_| SystemError::InvalidName(name.into()))?;
295        // Process Configuration
296        let debug = cfg.get_bool("debug").unwrap();
297
298        // Until the logger has started, use println
299        if debug {
300            debug!(log, "Starting actor system: System[{}]", name);
301        }
302
303        let prov = Provider::new(log.clone());
304        let timer = BasicTimer::start(cfg);
305
306        // 1. create proto system
307        let proto = ProtoSystem {
308            id: Uuid::new_v4(),
309            name: name.to_string(),
310            host: Arc::new("localhost".to_string()),
311            config: cfg.clone(),
312            sys_settings: SystemSettings::from(cfg),
313            started_at: Utc::now(),
314        };
315
316        // 2. create uninitialized system
317        let mut sys = Self {
318            proto: Arc::new(proto),
319            debug,
320            exec,
321            log,
322            // event_store: None,
323            timer,
324            sys_channels: None,
325            sys_actors: None,
326            provider: prov.clone(),
327        };
328
329        // 3. create initial actor hierarchy
330        let sys_actors = create_root(&sys);
331        sys.sys_actors = Some(sys_actors);
332
333        // 4. start system channels
334        sys.sys_channels = Some(sys_channels(&prov, &sys)?);
335
336        // 5. start dead letter logger
337        let props = DeadLetterLogger::props(sys.dead_letters(), sys.log());
338        let _dl_logger = sys_actor_of_props(&prov, &sys, props, "dl_logger")?;
339
340        sys.complete_start();
341
342        debug!(sys.log, "Actor system [{}] [{}] started", sys.id(), name);
343
344        Ok(sys)
345    }
346
347    fn complete_start(&self) {
348        self.sys_actors.as_ref().unwrap().user.sys_init(self);
349    }
350
351    /// Returns the system start date
352    fn start_date(&self) -> &DateTime<Utc> {
353        &self.proto.started_at
354    }
355
356    /// Returns the number of seconds since the system started
357    #[allow(clippy::cast_sign_loss)]
358    pub fn uptime(&self) -> u64 {
359        let now = Utc::now();
360        now.time()
361            .signed_duration_since(self.start_date().time())
362            .num_seconds() as u64
363    }
364
365    /// Returns the hostname used when the system started
366    ///
367    /// The host is used in actor addressing.
368    ///
369    /// Currently not used, but will be once system clustering is introduced.
370    pub fn host(&self) -> Arc<String> {
371        self.proto.host.clone()
372    }
373
374    /// Returns the UUID assigned to the system
375    pub fn id(&self) -> Uuid {
376        self.proto.id
377    }
378
379    /// Returns the name of the system
380    pub fn name(&self) -> String {
381        self.proto.name.clone()
382    }
383
384    pub fn print_tree(&self) {
385        println!("{}", self.get_tree());
386    }
387
388    #[allow(clippy::items_after_statements)]
389    pub fn get_tree(&self) -> String {
390        let mut tree_str: String = String::new();
391        let root = self.sys_actors.as_ref().unwrap().root.clone();
392
393        fn get_node(
394            mut tree_str: &mut String,
395            sys: &ActorSystem,
396            node: &BasicActorRef,
397            indent: &str,
398        ) -> String {
399            if node.is_root() {
400                tree_str.push_str(&format!("{}\n", sys.name()));
401
402                for actor in node.children() {
403                    get_node(&mut tree_str, sys, &actor, "");
404                }
405            } else {
406                tree_str.push_str(&format!("{}\u{2514}\u{2500} {}\n", indent, node.name()));
407
408                for actor in node.children() {
409                    get_node(tree_str, sys, &actor, &(indent.to_string() + "   "));
410                }
411            }
412            (*tree_str).to_string()
413        }
414
415        get_node(&mut tree_str, self, &root, "")
416    }
417
418    /// Returns the system root's actor reference
419    // fn root(&self) -> &BasicActorRef {
420    //     &self.sys_actors.as_ref().unwrap().root
421    // }
422
423    /// Returns the user root actor reference
424    pub fn user_root(&self) -> &BasicActorRef {
425        &self.sys_actors.as_ref().unwrap().user
426    }
427
428    /// Returns the system root actor reference
429    pub fn sys_root(&self) -> &BasicActorRef {
430        &self.sys_actors.as_ref().unwrap().sysm
431    }
432
433    /// Reutrns the temp root actor reference
434    pub fn temp_root(&self) -> &BasicActorRef {
435        &self.sys_actors.as_ref().unwrap().temp
436    }
437
438    /// Returns a reference to the system events channel
439    pub fn sys_events(&self) -> &ActorRef<ChannelMsg<SystemEvent>> {
440        &self.sys_channels.as_ref().unwrap().sys_events
441    }
442
443    /// Returns a reference to the dead letters channel
444    pub fn dead_letters(&self) -> &ActorRef<DLChannelMsg> {
445        &self.sys_channels.as_ref().unwrap().dead_letters
446    }
447
448    pub fn publish_event(&self, evt: SystemEvent) {
449        let topic = Topic::from(&evt);
450        self.sys_events().tell(Publish { topic, msg: evt }, None);
451    }
452
453    /// Returns the `Config` used by the system
454    pub fn config(&self) -> Config {
455        self.proto.config.clone()
456    }
457
458    pub(crate) fn sys_settings(&self) -> &SystemSettings {
459        &self.proto.sys_settings
460    }
461
462    /// Create an actor under the system root
463    pub fn sys_actor_of_props<A>(
464        &self,
465        props: BoxActorProd<A>,
466        name: &str,
467    ) -> Result<ActorRef<A::Msg>, CreateError>
468    where
469        A: Actor,
470    {
471        self.provider
472            .create_actor(props, name, self.sys_root(), self)
473    }
474
475    pub fn sys_actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
476    where
477        A: ActorFactory,
478    {
479        self.provider
480            .create_actor(Props::new_no_args(A::create), name, self.sys_root(), self)
481    }
482
483    pub fn sys_actor_of_args<A, Args>(
484        &self,
485        name: &str,
486        args: Args,
487    ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
488    where
489        Args: ActorArgs,
490        A: ActorFactoryArgs<Args>,
491    {
492        self.provider.create_actor(
493            Props::new_args(A::create_args, args),
494            name,
495            self.sys_root(),
496            self,
497        )
498    }
499
500    #[inline]
501    pub fn log(&self) -> Logger {
502        self.log.clone()
503    }
504
505    /// Returns a future which is completed when all
506    /// actors have successfully stopped.
507    ///
508    /// Note! In the current implementation the future will not complete if
509    /// root actor is already terminated.
510    pub fn when_terminated(&self) -> Terminated {
511        let (tx, rx) = oneshot::channel::<()>();
512        let tx = Arc::new(Mutex::new(Some(tx)));
513
514        self.tmp_actor_of_args::<WhenTerminatedActor, _>(tx)
515            .unwrap();
516
517        rx
518    }
519
520    /// Shutdown the actor system
521    ///
522    /// Attempts a graceful shutdown of the system and all actors.
523    /// Actors will receive a stop message, executing `actor.post_stop`.
524    ///
525    /// Does not block. Returns a future which is completed when all
526    /// actors have successfully stopped.
527    pub fn shutdown(&self) -> Shutdown {
528        let receiver = self.when_terminated();
529
530        // todo this is prone to failing since there is no
531        // confirmation that ShutdownActor has subscribed to
532        // the ActorTerminated events yet.
533        // It may be that the user root actor is Sterminated
534        // before the subscription is complete.
535
536        // std::thread::sleep_ms(1000);
537        // send stop to all /user children
538        // self.tmp_actor_of_args::<ShutdownActor, _>(tx).unwrap();
539
540        self.stop(self.user_root());
541
542        receiver
543    }
544}
545
// NOTE(review): thread-safety is asserted by hand here. Nothing in this file
// states the invariant that makes sharing an `ActorSystem` across threads
// sound -- confirm it for every field and record it as a `SAFETY:` comment.
unsafe impl Send for ActorSystem {}
unsafe impl Sync for ActorSystem {}
548
549impl ActorRefFactory for ActorSystem {
550    fn actor_of_props<A>(
551        &self,
552        props: BoxActorProd<A>,
553        name: &str,
554    ) -> Result<ActorRef<A::Msg>, CreateError>
555    where
556        A: Actor,
557    {
558        self.provider
559            .create_actor(props, name, self.user_root(), self)
560    }
561
562    fn actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
563    where
564        A: ActorFactory,
565    {
566        self.provider
567            .create_actor(Props::new_no_args(A::create), name, self.user_root(), self)
568    }
569
570    fn actor_of_args<A, Args>(
571        &self,
572        name: &str,
573        args: Args,
574    ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
575    where
576        Args: ActorArgs,
577        A: ActorFactoryArgs<Args>,
578    {
579        self.provider.create_actor(
580            Props::new_args(A::create_args, args),
581            name,
582            self.user_root(),
583            self,
584        )
585    }
586
587    fn stop(&self, actor: impl ActorReference) {
588        actor.sys_tell(SystemCmd::Stop.into());
589    }
590}
591
592impl TmpActorRefFactory for ActorSystem {
593    fn tmp_actor_of_props<A>(&self, props: BoxActorProd<A>) -> Result<ActorRef<A::Msg>, CreateError>
594    where
595        A: Actor,
596    {
597        let name = format!("{}", rand::random::<u64>());
598        self.provider
599            .create_actor(props, &name, self.temp_root(), self)
600    }
601
602    fn tmp_actor_of<A>(&self) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
603    where
604        A: ActorFactory,
605    {
606        let name = format!("{}", rand::random::<u64>());
607        self.provider
608            .create_actor(Props::new_no_args(A::create), &name, self.temp_root(), self)
609    }
610
611    fn tmp_actor_of_args<A, Args>(
612        &self,
613        args: Args,
614    ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
615    where
616        Args: ActorArgs,
617        A: ActorFactoryArgs<Args>,
618    {
619        let name = format!("{}", rand::random::<u64>());
620        self.provider.create_actor(
621            Props::new_args(A::create_args, args),
622            &name,
623            self.temp_root(),
624            self,
625        )
626    }
627}
628
629impl ActorSelectionFactory for ActorSystem {
630    fn select(&self, path: &str) -> Result<ActorSelection, InvalidPath> {
631        let anchor = self.user_root();
632        let (anchor, path_str) = if path.starts_with('/') {
633            let anchor_path = format!("{}/", anchor.path().deref().clone());
634            let path = path.to_string().replace(&anchor_path, "");
635
636            (anchor, path)
637        } else {
638            (anchor, path.to_string())
639        };
640
641        ActorSelection::new(
642            anchor.clone(),
643            // self.dead_letters(),
644            path_str,
645        )
646    }
647}
648
649// futures::task::Spawn::spawn requires &mut self so
650// we'll create a wrapper trait that requires only &self.
651pub trait Run {
652    fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
653    where
654        Fut: Future + Send + 'static,
655        <Fut as Future>::Output: Send;
656}
657
658impl Run for ActorSystem {
659    fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
660    where
661        Fut: Future + Send + 'static,
662        <Fut as Future>::Output: Send,
663    {
664        let mut exec = self.exec.clone();
665        exec.spawn_with_handle(future)
666    }
667}
668
669impl fmt::Debug for ActorSystem {
670    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
671        write!(
672            f,
673            "ActorSystem[Name: {}, Start Time: {}, Uptime: {} seconds]",
674            self.name(),
675            self.start_date(),
676            self.uptime()
677        )
678    }
679}
680
681impl Timer for ActorSystem {
682    fn schedule<T, M>(
683        &self,
684        initial_delay: Duration,
685        interval: Duration,
686        receiver: ActorRef<M>,
687        sender: Sender,
688        msg: T,
689    ) -> Uuid
690    where
691        T: Message + Into<M>,
692        M: Message,
693    {
694        let id = Uuid::new_v4();
695        let msg: M = msg.into();
696
697        let job = RepeatJob {
698            id,
699            send_at: Instant::now() + initial_delay,
700            interval,
701            receiver: receiver.into(),
702            sender,
703            msg: AnyMessage::new(msg, false),
704        };
705
706        let _ = self.timer.send(Job::Repeat(job));
707        id
708    }
709
710    fn schedule_once<T, M>(
711        &self,
712        delay: Duration,
713        receiver: ActorRef<M>,
714        sender: Sender,
715        msg: T,
716    ) -> Uuid
717    where
718        T: Message + Into<M>,
719        M: Message,
720    {
721        let id = Uuid::new_v4();
722        let msg: M = msg.into();
723
724        let job = OnceJob {
725            id,
726            send_at: Instant::now() + delay,
727            receiver: receiver.into(),
728            sender,
729            msg: AnyMessage::new(msg, true),
730        };
731
732        let _ = self.timer.send(Job::Once(job));
733        id
734    }
735
736    fn schedule_at_time<T, M>(
737        &self,
738        time: DateTime<Utc>,
739        receiver: ActorRef<M>,
740        sender: Sender,
741        msg: T,
742    ) -> Uuid
743    where
744        T: Message + Into<M>,
745        M: Message,
746    {
747        let delay = std::cmp::max(time.timestamp() - Utc::now().timestamp(), 0 as i64);
748        #[allow(clippy::cast_sign_loss)]
749        let delay = Duration::from_secs(delay as u64);
750
751        let id = Uuid::new_v4();
752        let msg: M = msg.into();
753
754        let job = OnceJob {
755            id,
756            send_at: Instant::now() + delay,
757            receiver: receiver.into(),
758            sender,
759            msg: AnyMessage::new(msg, true),
760        };
761
762        let _ = self.timer.send(Job::Once(job));
763        id
764    }
765
766    fn cancel_schedule(&self, id: Uuid) {
767        let _ = self.timer.send(Job::Cancel(id));
768    }
769}
770
771// helper functions
772
773fn sys_actor_of_props<A>(
774    prov: &Provider,
775    sys: &ActorSystem,
776    props: BoxActorProd<A>,
777    name: &str,
778) -> Result<ActorRef<A::Msg>, SystemError>
779where
780    A: Actor,
781{
782    prov.create_actor(props, name, sys.sys_root(), sys)
783        .map_err(|_| SystemError::ModuleFailed(name.into()))
784}
785
786fn sys_actor_of<A>(
787    prov: &Provider,
788    sys: &ActorSystem,
789    name: &str,
790) -> Result<ActorRef<<A as Actor>::Msg>, SystemError>
791where
792    A: ActorFactory,
793{
794    prov.create_actor(Props::new_no_args(A::create), name, sys.sys_root(), sys)
795        .map_err(|_| SystemError::ModuleFailed(name.into()))
796}
797
798// fn sys_actor_of_args<A, Args>(
799//     prov: &Provider,
800//     sys: &ActorSystem,
801//     name: &str,
802//     args: Args,
803// ) -> Result<ActorRef<<A as Actor>::Msg>, SystemError>
804// where
805//     Args: ActorArgs,
806//     A: ActorFactoryArgs<Args>,
807// {
808//     prov.create_actor(
809//         Props::new_args(A::create_args, args),
810//         name,
811//         &sys.sys_root(),
812//         sys,
813//     )
814//     .map_err(|_| SystemError::ModuleFailed(name.into()))
815// }
816
817fn sys_channels(prov: &Provider, sys: &ActorSystem) -> Result<SysChannels, SystemError> {
818    let sys_events = sys_actor_of::<EventsChannel>(prov, sys, "sys_events")?;
819    let dead_letters = sys_actor_of::<Channel<DeadLetter>>(prov, sys, "dead_letters")?;
820
821    // subscribe the dead_letters channel to actor terminated events
822    // so that any future subscribed actors that terminate are automatically
823    // unsubscribed from the dead_letters channel
824    // let msg = ChannelMsg::Subscribe(SysTopic::ActorTerminated.into(), dl.clone());
825    // es.tell(msg, None);
826
827    Ok(SysChannels {
828        sys_events,
829        dead_letters,
830    })
831}
832
833pub struct SystemSettings {
834    pub msg_process_limit: u32,
835}
836
837impl<'a> From<&'a Config> for SystemSettings {
838    fn from(config: &Config) -> Self {
839        Self {
840            msg_process_limit: config.get::<u32>("mailbox.msg_process_limit").unwrap(),
841        }
842    }
843}
844
845struct ThreadPoolConfig {
846    pool_size: usize,
847}
848
849impl<'a> From<&'a Config> for ThreadPoolConfig {
850    fn from(config: &Config) -> Self {
851        Self {
852            pool_size: config.get::<usize>("dispatcher.pool_size").unwrap(),
853        }
854    }
855}
856
857fn default_exec(cfg: &Config) -> ThreadPool {
858    let exec_cfg = ThreadPoolConfig::from(cfg);
859    ThreadPoolBuilder::new()
860        .pool_size(exec_cfg.pool_size)
861        .name_prefix("pool-thread-#")
862        .create()
863        .unwrap()
864}
865
866#[derive(Clone)]
867pub struct SysActors {
868    pub root: BasicActorRef,
869    pub user: BasicActorRef,
870    pub sysm: BasicActorRef,
871    pub temp: BasicActorRef,
872}
873
874#[derive(Clone)]
875pub struct SysChannels {
876    pub sys_events: ActorRef<ChannelMsg<SystemEvent>>,
877    pub dead_letters: ActorRef<DLChannelMsg>,
878}
879
/// Oneshot receiver returned by `ActorSystem::shutdown`.
pub type Shutdown = oneshot::Receiver<()>;
/// Oneshot receiver returned by `ActorSystem::when_terminated`.
pub type Terminated = oneshot::Receiver<()>;
882
883#[derive(Clone)]
884struct WhenTerminatedActor {
885    tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
886}
887
888impl ActorFactoryArgs<Arc<Mutex<Option<oneshot::Sender<()>>>>> for WhenTerminatedActor {
889    fn create_args(tx: Arc<Mutex<Option<oneshot::Sender<()>>>>) -> Self {
890        Self::new(tx)
891    }
892}
893
894impl WhenTerminatedActor {
895    fn new(tx: Arc<Mutex<Option<oneshot::Sender<()>>>>) -> Self {
896        Self { tx }
897    }
898}
899
900impl Actor for WhenTerminatedActor {
901    type Msg = SystemEvent;
902
903    fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
904        let sub = Subscribe {
905            topic: SysTopic::ActorTerminated.into(),
906            actor: Box::new(ctx.myself.clone()),
907        };
908        ctx.system.sys_events().tell(sub, None);
909    }
910
911    fn recv(&mut self, _: &Context<Self::Msg>, _: Self::Msg, _: Option<BasicActorRef>) {}
912
913    fn sys_recv(
914        &mut self,
915        ctx: &Context<Self::Msg>,
916        msg: SystemMsg,
917        sender: Option<BasicActorRef>,
918    ) {
919        if let SystemMsg::Event(evt) = msg {
920            if let SystemEvent::ActorTerminated(terminated) = evt {
921                self.receive(ctx, terminated, sender);
922            }
923        }
924    }
925}
926
927impl Receive<ActorTerminated> for WhenTerminatedActor {
928    type Msg = SystemEvent;
929
930    //noinspection RsBorrowChecker
931    fn receive(
932        &mut self,
933        ctx: &Context<Self::Msg>,
934        msg: ActorTerminated,
935        _sender: Option<BasicActorRef>,
936    ) {
937        if &msg.actor == ctx.system.user_root() {
938            if let Ok(ref mut tx) = self.tx.lock() {
939                if let Some(tx) = tx.take() {
940                    tx.send(()).unwrap();
941                }
942            }
943        }
944    }
945}