1use std::{
2 collections::HashMap,
3 fmt,
4 ops::Deref,
5 sync::{
6 atomic::{AtomicBool, AtomicUsize, Ordering},
7 Arc, RwLock,
8 },
9 time::{Duration, Instant},
10};
11
12use chrono::prelude::*;
13use futures::{future::RemoteHandle, task::SpawnError, Future};
14use uuid::Uuid;
15
16use rand;
17
18use crate::actor_ref::{
19 ActorRef, ActorRefFactory, ActorReference, BasicActorRef, Tell, TmpActorRefFactory,
20};
21use crate::{
22 actor::{props::ActorFactory, *},
23 kernel::{
24 kernel_ref::{dispatch, dispatch_any, KernelRef},
25 mailbox::{AnySender, MailboxSender},
26 },
27 system::{
28 timer::{Job, OnceJob, RepeatJob, Timer},
29 ActorSystem, Run, SystemCmd, SystemMsg,
30 },
31 validate::InvalidPath,
32 AnyMessage, Envelope, Message,
33};
34
35#[derive(Clone)]
36pub struct ActorCell {
37 inner: Arc<ActorCellInner>,
38}
39
40#[derive(Clone)]
41struct ActorCellInner {
42 uid: ActorId,
43 uri: ActorUri,
44 parent: Option<BasicActorRef>,
45 children: Children,
46 is_remote: bool,
47 is_terminating: Arc<AtomicBool>,
48 is_restarting: Arc<AtomicBool>,
49 status: Arc<AtomicUsize>,
51 kernel: Option<KernelRef>,
52 system: ActorSystem,
53 mailbox: Arc<dyn AnySender>,
54 sys_mailbox: MailboxSender<SystemMsg>,
55}
56
57impl ActorCell {
58 pub(crate) fn new(
60 uid: ActorId,
61 uri: ActorUri,
62 parent: Option<BasicActorRef>,
63 system: &ActorSystem,
64 mailbox: Arc<dyn AnySender>,
66 sys_mailbox: MailboxSender<SystemMsg>,
67 ) -> Self {
68 Self {
69 inner: Arc::new(ActorCellInner {
70 uid,
71 uri,
72 parent,
73 children: Children::new(),
74 is_remote: false,
75 is_terminating: Arc::new(AtomicBool::new(false)),
76 is_restarting: Arc::new(AtomicBool::new(false)),
77 status: Arc::new(AtomicUsize::new(0)),
83 kernel: None,
84 system: system.clone(),
85 mailbox,
86 sys_mailbox,
87 }),
88 }
89 }
90
91 pub(crate) fn init(self, kernel: &KernelRef) -> Self {
92 let inner = ActorCellInner {
93 kernel: Some(kernel.clone()),
94 ..self.inner.deref().clone()
95 };
96
97 Self {
98 inner: Arc::new(inner),
99 }
100 }
101
102 pub(crate) fn kernel(&self) -> &KernelRef {
103 self.inner.kernel.as_ref().unwrap()
104 }
105
106 pub(crate) fn myself(&self) -> BasicActorRef {
107 BasicActorRef { cell: self.clone() }
108 }
109
110 pub(crate) fn uri(&self) -> &ActorUri {
111 &self.inner.uri
112 }
113
114 pub(crate) fn parent(&self) -> BasicActorRef {
115 self.inner.parent.as_ref().unwrap().clone()
116 }
117
118 pub fn has_children(&self) -> bool {
119 self.inner.children.len() > 0
120 }
121
122 pub(crate) fn children<'a>(&'a self) -> Box<dyn Iterator<Item = BasicActorRef> + 'a> {
123 Box::new(self.inner.children.iter().clone())
124 }
125
126 pub(crate) fn user_root(&self) -> BasicActorRef {
127 self.inner.system.user_root().clone()
128 }
129
130 pub(crate) fn is_root(&self) -> bool {
131 self.inner.uid == 0
132 }
133
134 pub fn is_user(&self) -> bool {
135 self.inner.system.user_root().is_child(&self.myself())
136 }
137
138 pub(crate) fn send_any_msg(
139 &self,
140 msg: &mut AnyMessage,
141 sender: crate::actor::Sender,
142 ) -> Result<(), ()> {
143 let mb = &self.inner.mailbox;
144 let k = self.kernel();
145
146 dispatch_any(msg, sender, mb, k, &self.inner.system)
147 }
148
149 pub(crate) fn send_sys_msg(&self, msg: Envelope<SystemMsg>) -> MsgResult<Envelope<SystemMsg>> {
150 let mb = &self.inner.sys_mailbox;
151
152 let k = self.kernel();
153 dispatch(msg, mb, k, &self.inner.system)
154 }
155
156 pub(crate) fn is_child(&self, actor: &BasicActorRef) -> bool {
157 self.inner.children.iter().any(|child| child == *actor)
158 }
159
160 #[allow(clippy::unused_self)]
161 pub(crate) fn stop(&self, actor: &BasicActorRef) {
162 actor.sys_tell(SystemCmd::Stop.into());
163 }
164
165 pub fn add_child(&self, actor: BasicActorRef) {
178 self.inner.children.add(actor);
179 }
180
181 pub fn remove_child(&self, actor: &BasicActorRef) {
182 self.inner.children.remove(actor)
183 }
184
185 pub fn receive_cmd<A: Actor>(&self, cmd: &SystemCmd, actor: &mut Option<A>) {
186 match cmd {
187 SystemCmd::Stop => self.terminate(actor),
188 SystemCmd::Restart => self.restart(),
189 }
190 }
191
192 pub fn terminate<A: Actor>(&self, actor: &mut Option<A>) {
193 self.inner.is_terminating.store(true, Ordering::Relaxed);
198
199 if self.has_children() {
200 for child in Box::new(self.inner.children.iter().clone()) {
201 self.stop(&child.clone());
202 }
203 } else {
204 self.kernel().terminate(&self.inner.system);
205 post_stop(actor);
206 }
207 }
208
209 pub fn restart(&self) {
210 if self.has_children() {
211 self.inner.is_restarting.store(true, Ordering::Relaxed);
212 for child in Box::new(self.inner.children.iter().clone()) {
213 self.stop(&child.clone());
214 }
215 } else {
216 self.kernel().restart(&self.inner.system);
217 }
218 }
219
220 pub fn death_watch<A: Actor>(&self, terminated: &BasicActorRef, actor: &mut Option<A>) {
221 if self.is_child(terminated) {
222 self.remove_child(terminated);
223
224 if !self.has_children() {
225 if self.inner.is_terminating.load(Ordering::Relaxed) {
227 self.kernel().terminate(&self.inner.system);
228 post_stop(actor);
229 }
230
231 if self.inner.is_restarting.load(Ordering::Relaxed) {
233 self.inner.is_restarting.store(false, Ordering::Relaxed);
234 self.kernel().restart(&self.inner.system);
235 }
236 }
237 }
238 }
239
240 pub fn handle_failure(&self, failed: &BasicActorRef, strategy: &Strategy) {
241 match strategy {
242 Strategy::Stop => self.stop(failed),
243 Strategy::Restart => self.restart_child(failed),
244 Strategy::Escalate => self.escalate_failure(),
245 }
246 }
247
248 #[allow(clippy::unused_self)]
249 pub fn restart_child(&self, actor: &BasicActorRef) {
250 actor.sys_tell(SystemCmd::Restart.into());
251 }
252
253 pub fn escalate_failure(&self) {
254 self.inner
255 .parent
256 .as_ref()
257 .unwrap()
258 .sys_tell(SystemMsg::Failed(self.myself()));
259 }
260
261 }
299
300impl<Msg: Message> From<ExtendedCell<Msg>> for ActorCell {
301 fn from(cell: ExtendedCell<Msg>) -> Self {
302 cell.cell
303 }
304}
305
306impl fmt::Debug for ActorCell {
307 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
308 write!(f, "ActorCell[{:?}]", self.uri())
309 }
310}
311
312impl TmpActorRefFactory for ActorCell {
313 fn tmp_actor_of_props<A: Actor>(
314 &self,
315 _props: BoxActorProd<A>,
316 ) -> Result<ActorRef<A::Msg>, CreateError> {
317 let _name = format!("{}", rand::random::<u64>());
318
319 unimplemented!()
323 }
324
325 fn tmp_actor_of<A: ActorFactory>(&self) -> Result<ActorRef<<A as Actor>::Msg>, CreateError> {
326 let _name = format!("{}", rand::random::<u64>());
327
328 unimplemented!()
332 }
333
334 fn tmp_actor_of_args<A, Args>(
335 &self,
336 _args: Args,
337 ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
338 where
339 Args: ActorArgs,
340 A: ActorFactoryArgs<Args>,
341 {
342 let _name = format!("{}", rand::random::<u64>());
343
344 unimplemented!()
348 }
349}
350
351#[derive(Clone)]
352pub struct ExtendedCell<Msg: Message> {
353 cell: ActorCell,
354 mailbox: MailboxSender<Msg>,
355}
356
357impl<Msg> ExtendedCell<Msg>
358where
359 Msg: Message,
360{
361 pub(crate) fn new(
362 uid: ActorId,
363 uri: ActorUri,
364 parent: Option<BasicActorRef>,
365 system: &ActorSystem,
366 any_mailbox: Arc<dyn AnySender>,
368 sys_mailbox: MailboxSender<SystemMsg>,
369 mailbox: MailboxSender<Msg>,
370 ) -> Self {
371 let cell = ActorCell {
372 inner: Arc::new(ActorCellInner {
373 uid,
374 uri,
375 parent,
376 children: Children::new(),
377 is_remote: false,
378 is_terminating: Arc::new(AtomicBool::new(false)),
379 is_restarting: Arc::new(AtomicBool::new(false)),
380 status: Arc::new(AtomicUsize::new(0)),
386 kernel: None,
387 system: system.clone(),
388 mailbox: any_mailbox,
389 sys_mailbox,
390 }),
391 };
392
393 Self { cell, mailbox }
394 }
395
396 pub(crate) fn init(self, kernel: &KernelRef) -> Self {
397 let cell = self.cell.init(kernel);
398
399 Self { cell, ..self }
400 }
401
402 pub fn myself(&self) -> ActorRef<Msg> {
403 self.cell.myself().typed(self.clone())
404 }
405
406 pub fn uri(&self) -> &ActorUri {
407 self.cell.uri()
408 }
409
410 pub fn parent(&self) -> BasicActorRef {
411 self.cell.parent()
412 }
413
414 pub fn has_children(&self) -> bool {
415 self.cell.has_children()
416 }
417
418 pub(crate) fn is_child(&self, actor: &BasicActorRef) -> bool {
419 self.cell.is_child(actor)
420 }
421
422 pub fn children<'a>(&'a self) -> Box<dyn Iterator<Item = BasicActorRef> + 'a> {
423 self.cell.children()
424 }
425
426 pub fn user_root(&self) -> BasicActorRef {
427 self.cell.user_root()
428 }
429
430 pub fn is_root(&self) -> bool {
431 self.cell.is_root()
432 }
433
434 pub fn is_user(&self) -> bool {
435 self.cell.is_user()
436 }
437
438 pub(crate) fn send_msg(&self, msg: Envelope<Msg>) -> MsgResult<Envelope<Msg>> {
439 let mb = &self.mailbox;
440 let k = self.cell.kernel();
441
442 dispatch(msg, mb, k, self.system()).map_err(|e| {
443 let dl = e.clone(); let dl = DeadLetter {
445 msg: format!("{:?}", dl.msg.msg),
446 sender: dl.msg.sender,
447 recipient: self.cell.myself(),
448 };
449
450 self.cell.inner.system.dead_letters().tell(
451 Publish {
452 topic: "dead_letter".into(),
453 msg: dl,
454 },
455 None,
456 );
457
458 e
459 })
460 }
461
462 pub(crate) fn send_sys_msg(&self, msg: Envelope<SystemMsg>) -> MsgResult<Envelope<SystemMsg>> {
463 self.cell.send_sys_msg(msg)
464 }
465
466 pub fn system(&self) -> &ActorSystem {
467 &self.cell.inner.system
468 }
469
470 pub(crate) fn handle_failure(&self, failed: &BasicActorRef, strategy: &Strategy) {
471 self.cell.handle_failure(failed, strategy)
472 }
473
474 pub(crate) fn receive_cmd<A: Actor>(&self, cmd: &SystemCmd, actor: &mut Option<A>) {
475 self.cell.receive_cmd(cmd, actor)
476 }
477
478 pub(crate) fn death_watch<A: Actor>(&self, terminated: &BasicActorRef, actor: &mut Option<A>) {
479 self.cell.death_watch(terminated, actor)
480 }
481}
482
483impl<Msg: Message> fmt::Debug for ExtendedCell<Msg> {
484 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
485 write!(f, "ExtendedCell[{:?}]", self.uri())
486 }
487}
488
489fn post_stop<A: Actor>(actor: &mut Option<A>) {
490 if let Some(act) = actor.as_mut() {
494 act.post_stop();
495 }
496}
497
498#[derive(Debug)]
513pub struct Context<Msg: Message> {
514 pub myself: ActorRef<Msg>,
515 pub system: ActorSystem,
516 pub(crate) kernel: KernelRef,
518}
519
520impl<Msg> Context<Msg>
521where
522 Msg: Message,
523{
524 #[inline]
526 pub fn myself(&self) -> ActorRef<Msg> {
527 self.myself.clone()
528 }
529
530 #[inline]
532 pub fn name(&self) -> &str {
533 self.myself.name()
534 }
535}
536
537impl<Msg: Message> ActorRefFactory for Context<Msg> {
538 fn actor_of_props<A>(
539 &self,
540 props: BoxActorProd<A>,
541 name: &str,
542 ) -> Result<ActorRef<A::Msg>, CreateError>
543 where
544 A: Actor,
545 {
546 self.system
547 .provider
548 .create_actor(props, name, &self.myself().into(), &self.system)
549 }
550
551 fn actor_of<A>(&self, name: &str) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
552 where
553 A: ActorFactory,
554 {
555 self.system.provider.create_actor(
556 Props::new_no_args(A::create),
557 name,
558 &self.myself().into(),
559 &self.system,
560 )
561 }
562
563 fn actor_of_args<A, Args>(
564 &self,
565 name: &str,
566 args: Args,
567 ) -> Result<ActorRef<<A as Actor>::Msg>, CreateError>
568 where
569 Args: ActorArgs,
570 A: ActorFactoryArgs<Args>,
571 {
572 self.system.provider.create_actor(
573 Props::new_args(A::create_args, args),
574 name,
575 &self.myself().into(),
576 &self.system,
577 )
578 }
579
580 fn stop(&self, actor: impl ActorReference) {
581 actor.sys_tell(SystemCmd::Stop.into());
582 }
583}
584
585impl<Msg> ActorSelectionFactory for Context<Msg>
586where
587 Msg: Message,
588{
589 fn select(&self, path: &str) -> Result<ActorSelection, InvalidPath> {
590 let (anchor, path_str) = if path.starts_with('/') {
591 let anchor = self.system.user_root().clone();
592 let anchor_path = format!("{}/", anchor.path().deref().clone());
593 let path = path.to_string().replace(&anchor_path, "");
594
595 (anchor, path)
596 } else {
597 (self.myself.clone().into(), path.to_string())
598 };
599
600 ActorSelection::new(
601 anchor, path_str,
603 )
604 }
605}
606
607impl<Msg> Run for Context<Msg>
608where
609 Msg: Message,
610{
611 fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
612 where
613 Fut: Future + Send + 'static,
614 <Fut as Future>::Output: Send,
615 {
616 self.system.run(future)
617 }
618}
619
620impl<Msg> Timer for Context<Msg>
621where
622 Msg: Message,
623{
624 fn schedule<T, M>(
625 &self,
626 initial_delay: Duration,
627 interval: Duration,
628 receiver: ActorRef<M>,
629 sender: Sender,
630 msg: T,
631 ) -> Uuid
632 where
633 T: Message + Into<M>,
634 M: Message,
635 {
636 let id = Uuid::new_v4();
637 let msg: M = msg.into();
638
639 let job = RepeatJob {
640 id,
641 send_at: Instant::now() + initial_delay,
642 interval,
643 receiver: receiver.into(),
644 sender,
645 msg: AnyMessage::new(msg, false),
646 };
647
648 self.system.timer.send(Job::Repeat(job)).unwrap();
649 id
650 }
651
652 fn schedule_once<T, M>(
653 &self,
654 delay: Duration,
655 receiver: ActorRef<M>,
656 sender: Sender,
657 msg: T,
658 ) -> Uuid
659 where
660 T: Message + Into<M>,
661 M: Message,
662 {
663 let id = Uuid::new_v4();
664 let msg: M = msg.into();
665
666 let job = OnceJob {
667 id,
668 send_at: Instant::now() + delay,
669 receiver: receiver.into(),
670 sender,
671 msg: AnyMessage::new(msg, true),
672 };
673
674 self.system.timer.send(Job::Once(job)).unwrap();
675 id
676 }
677
678 fn schedule_at_time<T, M>(
679 &self,
680 time: DateTime<Utc>,
681 receiver: ActorRef<M>,
682 sender: Sender,
683 msg: T,
684 ) -> Uuid
685 where
686 T: Message + Into<M>,
687 M: Message,
688 {
689 let delay = std::cmp::max(time.timestamp() - Utc::now().timestamp(), 0 as i64);
690 #[allow(clippy::cast_sign_loss)]
691 let delay = Duration::from_secs(delay as u64);
692
693 let id = Uuid::new_v4();
694 let msg: M = msg.into();
695
696 let job = OnceJob {
697 id,
698 send_at: Instant::now() + delay,
699 receiver: receiver.into(),
700 sender,
701 msg: AnyMessage::new(msg, true),
702 };
703
704 self.system.timer.send(Job::Once(job)).unwrap();
705 id
706 }
707
708 fn cancel_schedule(&self, id: Uuid) {
709 let _ = self.system.timer.send(Job::Cancel(id));
710 }
711}
712
713#[derive(Clone)]
714pub struct Children {
715 actors: Arc<RwLock<HashMap<String, BasicActorRef>>>,
716}
717
718impl Children {
719 #[allow(clippy::missing_const_for_fn)]
720 pub fn new() -> Self {
721 Self {
722 actors: Arc::new(RwLock::new(HashMap::new())),
723 }
724 }
725
726 pub fn add(&self, actor: BasicActorRef) {
727 self.actors
728 .write()
729 .unwrap()
730 .insert(actor.name().to_string(), actor);
731 }
732
733 pub fn remove(&self, actor: &BasicActorRef) {
734 self.actors.write().unwrap().remove(actor.name());
735 }
736
737 pub fn len(&self) -> usize {
738 self.actors.read().unwrap().len()
739 }
740
741 pub const fn iter(&self) -> ChildrenIterator {
742 ChildrenIterator {
743 children: self,
744 position: 0,
745 }
746 }
747}
748
749#[derive(Clone)]
750pub struct ChildrenIterator<'a> {
751 children: &'a Children,
752 position: usize,
753}
754
755impl<'a> Iterator for ChildrenIterator<'a> {
756 type Item = BasicActorRef;
757
758 fn next(&mut self) -> Option<Self::Item> {
759 let actors = self.children.actors.read().unwrap();
760 let actor = actors.values().nth(self.position);
761 self.position += 1;
762 actor.cloned()
763 }
764}