actors_rs/actor/
actor_cell.rs

1use std::{
2    collections::HashMap,
3    fmt,
4    ops::Deref,
5    sync::{
6        atomic::{AtomicBool, AtomicUsize, Ordering},
7        Arc, RwLock,
8    },
9    time::{Duration, Instant},
10};
11
12use chrono::prelude::*;
13use futures::{future::RemoteHandle, task::SpawnError, Future};
14use uuid::Uuid;
15
16use rand;
17
18use crate::actor_ref::{
19    ActorRef, ActorRefFactory, ActorReference, BasicActorRef, Tell, TmpActorRefFactory,
20};
21use crate::{
22    actor::{props::ActorFactory, *},
23    kernel::{
24        kernel_ref::{dispatch, dispatch_any, KernelRef},
25        mailbox::{AnySender, MailboxSender},
26    },
27    system::{
28        timer::{Job, OnceJob, RepeatJob, Timer},
29        ActorSystem, Run, SystemCmd, SystemMsg,
30    },
31    validate::InvalidPath,
32    AnyMessage, Envelope, Message,
33};
34
35#[derive(Clone)]
36pub struct ActorCell {
37    inner: Arc<ActorCellInner>,
38}
39
40#[derive(Clone)]
41struct ActorCellInner {
42    uid: ActorId,
43    uri: ActorUri,
44    parent: Option<BasicActorRef>,
45    children: Children,
46    is_remote: bool,
47    is_terminating: Arc<AtomicBool>,
48    is_restarting: Arc<AtomicBool>,
49    // persistence: Persistence,
50    status: Arc<AtomicUsize>,
51    kernel: Option<KernelRef>,
52    system: ActorSystem,
53    mailbox: Arc<dyn AnySender>,
54    sys_mailbox: MailboxSender<SystemMsg>,
55}
56
57impl ActorCell {
58    /// Constructs a new `ActorCell`
59    pub(crate) fn new(
60        uid: ActorId,
61        uri: ActorUri,
62        parent: Option<BasicActorRef>,
63        system: &ActorSystem,
64        // perconf: Option<PersistenceConf>,
65        mailbox: Arc<dyn AnySender>,
66        sys_mailbox: MailboxSender<SystemMsg>,
67    ) -> Self {
68        Self {
69            inner: Arc::new(ActorCellInner {
70                uid,
71                uri,
72                parent,
73                children: Children::new(),
74                is_remote: false,
75                is_terminating: Arc::new(AtomicBool::new(false)),
76                is_restarting: Arc::new(AtomicBool::new(false)),
77                // persistence: Persistence {
78                //     // event_store: system.event_store.clone(),
79                //     is_persisting: Arc::new(AtomicBool::new(false)),
80                //     persistence_conf: perconf,
81                // },
82                status: Arc::new(AtomicUsize::new(0)),
83                kernel: None,
84                system: system.clone(),
85                mailbox,
86                sys_mailbox,
87            }),
88        }
89    }
90
91    pub(crate) fn init(self, kernel: &KernelRef) -> Self {
92        let inner = ActorCellInner {
93            kernel: Some(kernel.clone()),
94            ..self.inner.deref().clone()
95        };
96
97        Self {
98            inner: Arc::new(inner),
99        }
100    }
101
102    pub(crate) fn kernel(&self) -> &KernelRef {
103        self.inner.kernel.as_ref().unwrap()
104    }
105
106    pub(crate) fn myself(&self) -> BasicActorRef {
107        BasicActorRef { cell: self.clone() }
108    }
109
110    pub(crate) fn uri(&self) -> &ActorUri {
111        &self.inner.uri
112    }
113
114    pub(crate) fn parent(&self) -> BasicActorRef {
115        self.inner.parent.as_ref().unwrap().clone()
116    }
117
118    pub fn has_children(&self) -> bool {
119        self.inner.children.len() > 0
120    }
121
122    pub(crate) fn children<'a>(&'a self) -> Box<dyn Iterator<Item = BasicActorRef> + 'a> {
123        Box::new(self.inner.children.iter().clone())
124    }
125
126    pub(crate) fn user_root(&self) -> BasicActorRef {
127        self.inner.system.user_root().clone()
128    }
129
130    pub(crate) fn is_root(&self) -> bool {
131        self.inner.uid == 0
132    }
133
134    pub fn is_user(&self) -> bool {
135        self.inner.system.user_root().is_child(&self.myself())
136    }
137
138    pub(crate) fn send_any_msg(
139        &self,
140        msg: &mut AnyMessage,
141        sender: crate::actor::Sender,
142    ) -> Result<(), ()> {
143        let mb = &self.inner.mailbox;
144        let k = self.kernel();
145
146        dispatch_any(msg, sender, mb, k, &self.inner.system)
147    }
148
149    pub(crate) fn send_sys_msg(&self, msg: Envelope<SystemMsg>) -> MsgResult<Envelope<SystemMsg>> {
150        let mb = &self.inner.sys_mailbox;
151
152        let k = self.kernel();
153        dispatch(msg, mb, k, &self.inner.system)
154    }
155
156    pub(crate) fn is_child(&self, actor: &BasicActorRef) -> bool {
157        self.inner.children.iter().any(|child| child == *actor)
158    }
159
160    #[allow(clippy::unused_self)]
161    pub(crate) fn stop(&self, actor: &BasicActorRef) {
162        actor.sys_tell(SystemCmd::Stop.into());
163    }
164
165    // pub(crate) fn persistence_conf(&self) -> Option<PersistenceConf> {
166    //     self.inner.persistence.persistence_conf.clone()
167    // }
168
169    // pub fn is_persisting(&self) -> bool {
170    //     self.inner.persistence.is_persisting.load(Ordering::Relaxed)
171    // }
172
173    // pub fn set_persisting(&self, b: bool) {
174    //     self.inner.persistence.is_persisting.store(b, Ordering::Relaxed);
175    // }
176
177    pub fn add_child(&self, actor: BasicActorRef) {
178        self.inner.children.add(actor);
179    }
180
181    pub fn remove_child(&self, actor: &BasicActorRef) {
182        self.inner.children.remove(actor)
183    }
184
185    pub fn receive_cmd<A: Actor>(&self, cmd: &SystemCmd, actor: &mut Option<A>) {
186        match cmd {
187            SystemCmd::Stop => self.terminate(actor),
188            SystemCmd::Restart => self.restart(),
189        }
190    }
191
192    pub fn terminate<A: Actor>(&self, actor: &mut Option<A>) {
193        // *1. Suspend non-system mailbox messages
194        // *2. Iterate all children and send Stop to each
195        // *3. Wait for ActorTerminated from each child
196
197        self.inner.is_terminating.store(true, Ordering::Relaxed);
198
199        if self.has_children() {
200            for child in Box::new(self.inner.children.iter().clone()) {
201                self.stop(&child.clone());
202            }
203        } else {
204            self.kernel().terminate(&self.inner.system);
205            post_stop(actor);
206        }
207    }
208
209    pub fn restart(&self) {
210        if self.has_children() {
211            self.inner.is_restarting.store(true, Ordering::Relaxed);
212            for child in Box::new(self.inner.children.iter().clone()) {
213                self.stop(&child.clone());
214            }
215        } else {
216            self.kernel().restart(&self.inner.system);
217        }
218    }
219
220    pub fn death_watch<A: Actor>(&self, terminated: &BasicActorRef, actor: &mut Option<A>) {
221        if self.is_child(terminated) {
222            self.remove_child(terminated);
223
224            if !self.has_children() {
225                // No children exist. Stop this actor's kernel.
226                if self.inner.is_terminating.load(Ordering::Relaxed) {
227                    self.kernel().terminate(&self.inner.system);
228                    post_stop(actor);
229                }
230
231                // No children exist. Restart the actor.
232                if self.inner.is_restarting.load(Ordering::Relaxed) {
233                    self.inner.is_restarting.store(false, Ordering::Relaxed);
234                    self.kernel().restart(&self.inner.system);
235                }
236            }
237        }
238    }
239
240    pub fn handle_failure(&self, failed: &BasicActorRef, strategy: &Strategy) {
241        match strategy {
242            Strategy::Stop => self.stop(failed),
243            Strategy::Restart => self.restart_child(failed),
244            Strategy::Escalate => self.escalate_failure(),
245        }
246    }
247
248    #[allow(clippy::unused_self)]
249    pub fn restart_child(&self, actor: &BasicActorRef) {
250        actor.sys_tell(SystemCmd::Restart.into());
251    }
252
253    pub fn escalate_failure(&self) {
254        self.inner
255            .parent
256            .as_ref()
257            .unwrap()
258            .sys_tell(SystemMsg::Failed(self.myself()));
259    }
260
261    // pub fn load_events<A: Actor>(&self, actor: &mut Option<A>) -> bool {
262    //     let event_store = &self.inner.persistence.event_store;
263    //     let perconf = &self.inner.persistence.persistence_conf;
264
265    //     match (actor, event_store, perconf) {
266    //         (Some(_), Some(es), Some(perconf)) => {
267    //             let myself = self.myself();
268    //             // query(&perconf.id,
269    //             //         &perconf.keyspace,
270    //             //         &es,
271    //             //         self,
272    //             //         myself); // todo implement
273
274    //             false
275    //         }
276    //         (Some(_), None, Some(_)) => {
277    //             warn!("Can't load actor events. No event store configured");
278    //             true
279    //         }
280    //         _ => {
281    //             // anything else either the actor is None or there's no persistence configured
282    //             true
283    //         }
284    //     }
285    //     unimplemented!()
286    // }
287
288    // pub fn replay<A: Actor>(&self,
289    //             ctx: &Context<A::Msg>,
290    //             evts: Vec<A::Msg>,
291    //             actor: &mut Option<A>) {
292    //     if let Some(actor) = actor.as_mut() {
293    //         for event in evts.iter() {
294    //             actor.replay_event(ctx, event.clone());
295    //         }
296    //     }
297    // }
298}
299
300impl<Msg: Message> From<ExtendedCell<Msg>> for ActorCell {
301    fn from(cell: ExtendedCell<Msg>) -> Self {
302        cell.cell
303    }
304}
305
306impl fmt::Debug for ActorCell {
307    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
308        write!(f, "ActorCell[{:?}]", self.uri())
309    }
310}
311
312impl TmpActorRefFactory for ActorCell {
313    fn tmp_actor_of_props<A: Actor>(
314        &self,
315        _props: BoxActorProd<A>,
316    ) -> Result<ActorRef<A::Msg>, CreateError> {
317        let _name = format!("{}", rand::random::<u64>());
318
319        // self.inner
320        //     .kernel
321        //     .create_actor(props, &name, &self.inner.system.temp_root())
322        unimplemented!()
323    }
324
325    fn tmp_actor_of<A: ActorFactory>(&self) -> Result<ActorRef<<A as Actor>::Msg>, CreateError> {
326        let _name = format!("{}", rand::random::<u64>());
327
328        // self.inner
329        //     .kernel
330        //     .create_actor(props, &name, &self.inner.system.temp_root())
331        unimplemented!()
332    }
333
334    fn tmp_actor_of_args<A, Args>(
335        &self,
336        _args: Args,
337    ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
338    where
339        Args: ActorArgs,
340        A: ActorFactoryArgs<Args>,
341    {
342        let _name = format!("{}", rand::random::<u64>());
343
344        // self.inner
345        //     .kernel
346        //     .create_actor(props, &name, &self.inner.system.temp_root())
347        unimplemented!()
348    }
349}
350
351#[derive(Clone)]
352pub struct ExtendedCell<Msg: Message> {
353    cell: ActorCell,
354    mailbox: MailboxSender<Msg>,
355}
356
357impl<Msg> ExtendedCell<Msg>
358where
359    Msg: Message,
360{
361    pub(crate) fn new(
362        uid: ActorId,
363        uri: ActorUri,
364        parent: Option<BasicActorRef>,
365        system: &ActorSystem,
366        // perconf: Option<PersistenceConf>,
367        any_mailbox: Arc<dyn AnySender>,
368        sys_mailbox: MailboxSender<SystemMsg>,
369        mailbox: MailboxSender<Msg>,
370    ) -> Self {
371        let cell = ActorCell {
372            inner: Arc::new(ActorCellInner {
373                uid,
374                uri,
375                parent,
376                children: Children::new(),
377                is_remote: false,
378                is_terminating: Arc::new(AtomicBool::new(false)),
379                is_restarting: Arc::new(AtomicBool::new(false)),
380                // persistence: Persistence {
381                //     // event_store: system.event_store.clone(),
382                //     is_persisting: Arc::new(AtomicBool::new(false)),
383                //     persistence_conf: perconf,
384                // },
385                status: Arc::new(AtomicUsize::new(0)),
386                kernel: None,
387                system: system.clone(),
388                mailbox: any_mailbox,
389                sys_mailbox,
390            }),
391        };
392
393        Self { cell, mailbox }
394    }
395
396    pub(crate) fn init(self, kernel: &KernelRef) -> Self {
397        let cell = self.cell.init(kernel);
398
399        Self { cell, ..self }
400    }
401
402    pub fn myself(&self) -> ActorRef<Msg> {
403        self.cell.myself().typed(self.clone())
404    }
405
406    pub fn uri(&self) -> &ActorUri {
407        self.cell.uri()
408    }
409
410    pub fn parent(&self) -> BasicActorRef {
411        self.cell.parent()
412    }
413
414    pub fn has_children(&self) -> bool {
415        self.cell.has_children()
416    }
417
418    pub(crate) fn is_child(&self, actor: &BasicActorRef) -> bool {
419        self.cell.is_child(actor)
420    }
421
422    pub fn children<'a>(&'a self) -> Box<dyn Iterator<Item = BasicActorRef> + 'a> {
423        self.cell.children()
424    }
425
426    pub fn user_root(&self) -> BasicActorRef {
427        self.cell.user_root()
428    }
429
430    pub fn is_root(&self) -> bool {
431        self.cell.is_root()
432    }
433
434    pub fn is_user(&self) -> bool {
435        self.cell.is_user()
436    }
437
438    pub(crate) fn send_msg(&self, msg: Envelope<Msg>) -> MsgResult<Envelope<Msg>> {
439        let mb = &self.mailbox;
440        let k = self.cell.kernel();
441
442        dispatch(msg, mb, k, self.system()).map_err(|e| {
443            let dl = e.clone(); // clone the failed message and send to dead letters
444            let dl = DeadLetter {
445                msg: format!("{:?}", dl.msg.msg),
446                sender: dl.msg.sender,
447                recipient: self.cell.myself(),
448            };
449
450            self.cell.inner.system.dead_letters().tell(
451                Publish {
452                    topic: "dead_letter".into(),
453                    msg: dl,
454                },
455                None,
456            );
457
458            e
459        })
460    }
461
462    pub(crate) fn send_sys_msg(&self, msg: Envelope<SystemMsg>) -> MsgResult<Envelope<SystemMsg>> {
463        self.cell.send_sys_msg(msg)
464    }
465
466    pub fn system(&self) -> &ActorSystem {
467        &self.cell.inner.system
468    }
469
470    pub(crate) fn handle_failure(&self, failed: &BasicActorRef, strategy: &Strategy) {
471        self.cell.handle_failure(failed, strategy)
472    }
473
474    pub(crate) fn receive_cmd<A: Actor>(&self, cmd: &SystemCmd, actor: &mut Option<A>) {
475        self.cell.receive_cmd(cmd, actor)
476    }
477
478    pub(crate) fn death_watch<A: Actor>(&self, terminated: &BasicActorRef, actor: &mut Option<A>) {
479        self.cell.death_watch(terminated, actor)
480    }
481}
482
483impl<Msg: Message> fmt::Debug for ExtendedCell<Msg> {
484    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
485        write!(f, "ExtendedCell[{:?}]", self.uri())
486    }
487}
488
489fn post_stop<A: Actor>(actor: &mut Option<A>) {
490    // If the actor instance exists we can execute post_stop.
491    // The instance will be None if this is an actor that has failed
492    // and is being terminated by an escalated supervisor.
493    if let Some(act) = actor.as_mut() {
494        act.post_stop();
495    }
496}
497
498/// Provides context, including the actor system during actor execution.
499///
500/// `Context` is passed to an actor's functions, such as
501/// `receive`.
502///
503/// Operations performed are in most cases done so from the
504/// actor's perspective. For example, creating a child actor
505/// using `ctx.actor_of` will create the child under the current
506/// actor within the heirarchy. In a similar manner, persistence
507/// operations such as `persist_event` use the current actor's
508/// persistence configuration.
509///
510/// Since `Context` is specific to an actor and its functions
511/// it is not cloneable.
512#[derive(Debug)]
513pub struct Context<Msg: Message> {
514    pub myself: ActorRef<Msg>,
515    pub system: ActorSystem,
516    // pub persistence: Persistence,
517    pub(crate) kernel: KernelRef,
518}
519
520impl<Msg> Context<Msg>
521where
522    Msg: Message,
523{
524    /// Returns the `ActorRef` of the current actor.
525    #[inline]
526    pub fn myself(&self) -> ActorRef<Msg> {
527        self.myself.clone()
528    }
529
530    /// Return the name of actor. Useful for logging
531    #[inline]
532    pub fn name(&self) -> &str {
533        self.myself.name()
534    }
535}
536
537impl<Msg: Message> ActorRefFactory for Context<Msg> {
538    fn actor_of_props<A>(
539        &self,
540        props: BoxActorProd<A>,
541        name: &str,
542    ) -> Result<ActorRef<A::Msg>, CreateError>
543    where
544        A: Actor,
545    {
546        self.system
547            .provider
548            .create_actor(props, name, &self.myself().into(), &self.system)
549    }
550
551    fn actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
552    where
553        A: ActorFactory,
554    {
555        self.system.provider.create_actor(
556            Props::new_no_args(A::create),
557            name,
558            &self.myself().into(),
559            &self.system,
560        )
561    }
562
563    fn actor_of_args<A, Args>(
564        &self,
565        name: &str,
566        args: Args,
567    ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
568    where
569        Args: ActorArgs,
570        A: ActorFactoryArgs<Args>,
571    {
572        self.system.provider.create_actor(
573            Props::new_args(A::create_args, args),
574            name,
575            &self.myself().into(),
576            &self.system,
577        )
578    }
579
580    fn stop(&self, actor: impl ActorReference) {
581        actor.sys_tell(SystemCmd::Stop.into());
582    }
583}
584
585impl<Msg> ActorSelectionFactory for Context<Msg>
586where
587    Msg: Message,
588{
589    fn select(&self, path: &str) -> Result<ActorSelection, InvalidPath> {
590        let (anchor, path_str) = if path.starts_with('/') {
591            let anchor = self.system.user_root().clone();
592            let anchor_path = format!("{}/", anchor.path().deref().clone());
593            let path = path.to_string().replace(&anchor_path, "");
594
595            (anchor, path)
596        } else {
597            (self.myself.clone().into(), path.to_string())
598        };
599
600        ActorSelection::new(
601            anchor, // self.system.dead_letters(),
602            path_str,
603        )
604    }
605}
606
607impl<Msg> Run for Context<Msg>
608where
609    Msg: Message,
610{
611    fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
612    where
613        Fut: Future + Send + 'static,
614        <Fut as Future>::Output: Send,
615    {
616        self.system.run(future)
617    }
618}
619
620impl<Msg> Timer for Context<Msg>
621where
622    Msg: Message,
623{
624    fn schedule<T, M>(
625        &self,
626        initial_delay: Duration,
627        interval: Duration,
628        receiver: ActorRef<M>,
629        sender: Sender,
630        msg: T,
631    ) -> Uuid
632    where
633        T: Message + Into<M>,
634        M: Message,
635    {
636        let id = Uuid::new_v4();
637        let msg: M = msg.into();
638
639        let job = RepeatJob {
640            id,
641            send_at: Instant::now() + initial_delay,
642            interval,
643            receiver: receiver.into(),
644            sender,
645            msg: AnyMessage::new(msg, false),
646        };
647
648        self.system.timer.send(Job::Repeat(job)).unwrap();
649        id
650    }
651
652    fn schedule_once<T, M>(
653        &self,
654        delay: Duration,
655        receiver: ActorRef<M>,
656        sender: Sender,
657        msg: T,
658    ) -> Uuid
659    where
660        T: Message + Into<M>,
661        M: Message,
662    {
663        let id = Uuid::new_v4();
664        let msg: M = msg.into();
665
666        let job = OnceJob {
667            id,
668            send_at: Instant::now() + delay,
669            receiver: receiver.into(),
670            sender,
671            msg: AnyMessage::new(msg, true),
672        };
673
674        self.system.timer.send(Job::Once(job)).unwrap();
675        id
676    }
677
678    fn schedule_at_time<T, M>(
679        &self,
680        time: DateTime<Utc>,
681        receiver: ActorRef<M>,
682        sender: Sender,
683        msg: T,
684    ) -> Uuid
685    where
686        T: Message + Into<M>,
687        M: Message,
688    {
689        let delay = std::cmp::max(time.timestamp() - Utc::now().timestamp(), 0 as i64);
690        #[allow(clippy::cast_sign_loss)]
691        let delay = Duration::from_secs(delay as u64);
692
693        let id = Uuid::new_v4();
694        let msg: M = msg.into();
695
696        let job = OnceJob {
697            id,
698            send_at: Instant::now() + delay,
699            receiver: receiver.into(),
700            sender,
701            msg: AnyMessage::new(msg, true),
702        };
703
704        self.system.timer.send(Job::Once(job)).unwrap();
705        id
706    }
707
708    fn cancel_schedule(&self, id: Uuid) {
709        let _ = self.system.timer.send(Job::Cancel(id));
710    }
711}
712
713#[derive(Clone)]
714pub struct Children {
715    actors: Arc<RwLock<HashMap<String, BasicActorRef>>>,
716}
717
718impl Children {
719    #[allow(clippy::missing_const_for_fn)]
720    pub fn new() -> Self {
721        Self {
722            actors: Arc::new(RwLock::new(HashMap::new())),
723        }
724    }
725
726    pub fn add(&self, actor: BasicActorRef) {
727        self.actors
728            .write()
729            .unwrap()
730            .insert(actor.name().to_string(), actor);
731    }
732
733    pub fn remove(&self, actor: &BasicActorRef) {
734        self.actors.write().unwrap().remove(actor.name());
735    }
736
737    pub fn len(&self) -> usize {
738        self.actors.read().unwrap().len()
739    }
740
741    pub const fn iter(&self) -> ChildrenIterator {
742        ChildrenIterator {
743            children: self,
744            position: 0,
745        }
746    }
747}
748
749#[derive(Clone)]
750pub struct ChildrenIterator<'a> {
751    children: &'a Children,
752    position: usize,
753}
754
755impl<'a> Iterator for ChildrenIterator<'a> {
756    type Item = BasicActorRef;
757
758    fn next(&mut self) -> Option<Self::Item> {
759        let actors = self.children.actors.read().unwrap();
760        let actor = actors.values().nth(self.position);
761        self.position += 1;
762        actor.cloned()
763    }
764}