1use hocon::Hocon;
2use std::{
3 cell::{RefCell, UnsafeCell},
4 fmt,
5 ops::DerefMut,
6 panic,
7 sync::{atomic::AtomicU64, Arc, Mutex, Weak},
8 time::Duration,
9};
10use uuid::Uuid;
11
12use super::*;
13use crate::{
14 actors::TypedMsgQueue,
15 net::buffers::EncodeBuffer,
16 supervision::*,
17 timer::timer_manager::{ExecuteAction, ScheduledTimer, Timer, TimerManager, TimerRefFactory},
18};
19use rustc_hash::FxHashMap;
20
21#[cfg(all(nightly, feature = "type_erasure"))]
22use crate::utils::erased::CreateErased;
23use owning_ref::{Erased, OwningRefMut};
24use std::any::Any;
25
26mod context;
27pub use context::*;
28mod actual_component;
29pub(crate) mod lifecycle;
30pub use actual_component::*;
31mod system_handle;
32pub use system_handle::*;
33mod definition;
34pub use definition::*;
35mod core;
36pub use self::core::*;
37mod future_task;
38pub use future_task::*;
39
40#[must_use = "The Handled value must be returned from a handle or receive function in order to take effect."]
42#[derive(Debug, Default)]
43pub enum Handled {
44 #[default]
46 Ok,
47 BlockOn(BlockingFuture),
50 DieNow,
52}
53impl Handled {
54 pub fn block_on<CD, F>(
101 component: &mut CD,
102 fun: impl FnOnce(ComponentDefinitionAccess<CD>) -> F,
103 ) -> Self
104 where
105 CD: ComponentDefinition + 'static,
106 F: std::future::Future + Send + 'static,
107 {
108 let blocking = future_task::blocking(component, fun);
109 Handled::BlockOn(blocking)
110 }
111
112 pub fn is_ok(&self) -> bool {
114 matches!(self, Handled::Ok)
115 }
116}
117
118pub trait ComponentTraits: ComponentDefinition + ActorRaw + Sized + 'static {}
121impl<CD> ComponentTraits for CD where CD: ComponentDefinition + ActorRaw + Sized + 'static {}
122
123pub trait CoreContainer: Send + Sync {
128 fn id(&self) -> Uuid;
130 fn core(&self) -> &ComponentCore;
132 fn execute(&self) -> SchedulingDecision;
134 fn system(&self) -> &KompactSystem {
136 self.core().system()
137 }
138 fn schedule(&self) -> ();
140 fn type_name(&self) -> &'static str;
142
143 fn dyn_message_queue(&self) -> &dyn DynMsgQueue;
146
147 fn enqueue_control(&self, event: ControlEvent) -> ();
152}
153
154impl fmt::Debug for dyn CoreContainer {
155 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156 write!(f, "CoreContainer({})", self.id())
157 }
158}
159
160pub trait UniqueRegistrable: DynActorRefFactory {
162 fn component_id(&self) -> Uuid;
164}
165
166pub trait MsgQueueContainer: CoreContainer {
169 type Message: MessageBounds;
171 fn message_queue(&self) -> &TypedMsgQueue<Self::Message>;
173 fn downgrade_dyn(self: Arc<Self>) -> Weak<dyn CoreContainer>;
176}
177
178pub(crate) struct FakeCoreContainer;
179impl CoreContainer for FakeCoreContainer {
180 fn id(&self) -> Uuid {
181 unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
182 }
183
184 fn core(&self) -> &ComponentCore {
185 unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
186 }
187
188 fn execute(&self) -> SchedulingDecision {
189 unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
190 }
191
192 fn system(&self) -> &KompactSystem {
193 unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
194 }
195
196 fn schedule(&self) -> () {
197 unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
198 }
199
200 fn type_name(&self) -> &'static str {
201 unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
202 }
203
204 fn dyn_message_queue(&self) -> &dyn DynMsgQueue {
205 unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
206 }
207
208 fn enqueue_control(&self, _event: ControlEvent) -> () {
209 unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
210 }
211}
212
213pub(crate) struct ComponentMutableCore<C> {
214 pub(crate) definition: C,
215 skip: usize,
216}
217impl<C> ComponentMutableCore<C> {
218 fn from(definition: C) -> Self {
219 ComponentMutableCore {
220 definition,
221 skip: 0,
222 }
223 }
224}
225
226pub struct ExecuteResult {
228 blocking: bool,
229 count: usize,
230 skip: usize,
231}
232
233impl ExecuteResult {
234 pub fn new(blocking: bool, count: usize, skip: usize) -> ExecuteResult {
239 ExecuteResult {
240 blocking,
241 count,
242 skip,
243 }
244 }
245}
246
247pub trait ComponentLogging {
249 fn log(&self) -> &KompactLogger;
253}
254
255impl<CD> ComponentLogging for CD
256where
257 CD: ComponentTraits + ComponentLifecycle,
258{
259 fn log(&self) -> &KompactLogger {
260 self.ctx().log()
261 }
262}
263
264pub trait Provide<P: Port + 'static> {
268 fn handle(&mut self, event: P::Request) -> Handled;
275}
276
277pub trait Require<P: Port + 'static> {
281 fn handle(&mut self, event: P::Indication) -> Handled;
288}
289
290pub trait ProvideRef<P: Port + 'static> {
294 fn provided_ref(&mut self) -> ProvidedRef<P>;
296
297 fn connect_to_required(&mut self, req: RequiredRef<P>) -> ();
299
300 fn disconnect(&mut self, req: RequiredRef<P>) -> ();
302}
303
304pub trait RequireRef<P: Port + 'static> {
308 fn required_ref(&mut self) -> RequiredRef<P>;
310
311 fn connect_to_provided(&mut self, prov: ProvidedRef<P>) -> ();
313
314 fn disconnect(&mut self, prov: ProvidedRef<P>) -> ();
316}
317
318pub trait LockingProvideRef<P, C>
322where
323 P: Port + 'static,
324 C: ComponentDefinition + Sized + 'static + Provide<P> + ProvideRef<P>,
325{
326 fn provided_ref(&self) -> ProvidedRef<P>;
328
329 fn connect_to_required(&self, req: RequiredRef<P>) -> ProviderChannel<P, C>;
331}
332
333pub trait LockingRequireRef<P, C>
337where
338 P: Port + 'static,
339 C: ComponentDefinition + Sized + 'static + Require<P> + RequireRef<P>,
340{
341 fn required_ref(&self) -> RequiredRef<P>;
343
344 fn connect_to_provided(&self, prov: ProvidedRef<P>) -> RequirerChannel<P, C>;
346}
347
348impl<P, CD> LockingProvideRef<P, CD> for Arc<Component<CD>>
349where
350 P: Port + 'static,
351 CD: ComponentTraits + ComponentLifecycle + Provide<P> + ProvideRef<P>,
352{
353 fn provided_ref(&self) -> ProvidedRef<P> {
354 self.on_definition(|cd| ProvideRef::provided_ref(cd))
355 }
356
357 fn connect_to_required(&self, req: RequiredRef<P>) -> ProviderChannel<P, CD> {
358 self.on_definition(|cd| ProvideRef::connect_to_required(cd, req.clone()));
359 ProviderChannel::new(self, req)
360 }
361}
362
363impl<P, CD> LockingRequireRef<P, CD> for Arc<Component<CD>>
364where
365 P: Port + 'static,
366 CD: ComponentTraits + ComponentLifecycle + Require<P> + RequireRef<P>,
367{
368 fn required_ref(&self) -> RequiredRef<P> {
369 self.on_definition(|cd| RequireRef::required_ref(cd))
370 }
371
372 fn connect_to_provided(&self, prov: ProvidedRef<P>) -> RequirerChannel<P, CD> {
373 self.on_definition(|cd| RequireRef::connect_to_provided(cd, prov.clone()));
374 RequirerChannel::new(self, prov)
375 }
376}
377
378#[derive(Debug, Clone, Copy, PartialEq, Eq)]
380pub enum SchedulingDecision {
381 Schedule,
385 AlreadyScheduled,
387 NoWork,
389 Blocked,
391 Resume,
393}
394
395impl SchedulingDecision {
396 pub fn or_use(self, other: SchedulingDecision) -> SchedulingDecision {
403 match self {
404 SchedulingDecision::Schedule
405 | SchedulingDecision::AlreadyScheduled
406 | SchedulingDecision::Blocked
407 | SchedulingDecision::Resume => self,
408 SchedulingDecision::NoWork => match other {
409 SchedulingDecision::Schedule
410 | SchedulingDecision::Resume
411 | SchedulingDecision::AlreadyScheduled => SchedulingDecision::Resume,
412 x => x,
413 },
414 }
415 }
416
417 pub fn or_from(self, other: impl Fn() -> SchedulingDecision) -> SchedulingDecision {
424 match self {
425 SchedulingDecision::Schedule
426 | SchedulingDecision::AlreadyScheduled
427 | SchedulingDecision::Blocked
428 | SchedulingDecision::Resume => self,
429 SchedulingDecision::NoWork => match other() {
430 SchedulingDecision::Schedule
431 | SchedulingDecision::Resume
432 | SchedulingDecision::AlreadyScheduled => SchedulingDecision::Resume,
433 x => x,
434 },
435 }
436 }
437}
438
439#[cfg(test)]
440mod tests {
441 use crate::{component::AbstractComponent, prelude::*};
442 use futures::channel::oneshot;
443 use std::{sync::Arc, thread, time::Duration};
444
445 use std::ops::Deref;
446
447 const TIMEOUT: Duration = Duration::from_millis(3000);
448
449 #[derive(ComponentDefinition, Actor)]
450 struct TestComponent {
451 ctx: ComponentContext<TestComponent>,
452 }
453
454 impl TestComponent {
455 fn new() -> TestComponent {
456 TestComponent {
457 ctx: ComponentContext::uninitialised(),
458 }
459 }
460 }
461
462 ignore_lifecycle!(TestComponent);
463
464 #[test]
465 fn component_core_send() -> () {
466 let system = KompactConfig::default().build().expect("KompactSystem");
467 let cc = system.create(TestComponent::new);
468 let core = cc.core();
469 is_send(&core.id);
470 is_send(&core.system);
471 is_send(&core.state);
472 is_sync(&core.id);
474 is_sync(&core.system);
475 is_sync(&core.state);
476 }
478
479 fn is_send<T: Send>(_v: &T) -> () {
481 }
483
484 fn is_sync<T: Sync>(_v: &T) -> () {
486 }
488
489 #[derive(Debug, Copy, Clone)]
490 struct TestMessage;
491 impl Serialisable for TestMessage {
492 fn ser_id(&self) -> SerId {
493 Self::SER_ID
494 }
495
496 fn size_hint(&self) -> Option<usize> {
497 Some(0)
498 }
499
500 fn serialise(&self, _buf: &mut dyn BufMut) -> Result<(), SerError> {
501 Ok(())
502 }
503
504 fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
505 Ok(self)
506 }
507 }
508 impl Deserialiser<TestMessage> for TestMessage {
509 const SER_ID: SerId = 42;
511
512 fn deserialise(_buf: &mut dyn Buf) -> Result<TestMessage, SerError> {
513 Ok(TestMessage)
514 }
515 }
516
517 #[derive(ComponentDefinition)]
518 struct ChildComponent {
519 ctx: ComponentContext<Self>,
520 got_message: bool,
521 }
522 impl ChildComponent {
523 fn new() -> Self {
524 ChildComponent {
525 ctx: ComponentContext::uninitialised(),
526 got_message: false,
527 }
528 }
529 }
530 ignore_lifecycle!(ChildComponent);
531 impl NetworkActor for ChildComponent {
532 type Deserialiser = TestMessage;
533 type Message = TestMessage;
534
535 fn receive(&mut self, _sender: Option<ActorPath>, _msg: Self::Message) -> Handled {
536 info!(self.log(), "Child got message");
537 self.got_message = true;
538 Handled::Ok
539 }
540 }
541
542 #[derive(Debug)]
543 enum ParentMessage {
544 GetChild(KPromise<Arc<Component<ChildComponent>>>),
545 }
546
547 #[derive(ComponentDefinition)]
548 struct ParentComponent {
549 ctx: ComponentContext<Self>,
550 alias_opt: Option<String>,
551 child: Option<Arc<Component<ChildComponent>>>,
552 }
553 impl ParentComponent {
554 fn unique() -> Self {
555 ParentComponent {
556 ctx: ComponentContext::uninitialised(),
557 alias_opt: None,
558 child: None,
559 }
560 }
561
562 fn alias(s: String) -> Self {
563 ParentComponent {
564 ctx: ComponentContext::uninitialised(),
565 alias_opt: Some(s),
566 child: None,
567 }
568 }
569 }
570
571 impl ComponentLifecycle for ParentComponent {
572 fn on_start(&mut self) -> Handled {
573 let child = self.ctx.system().create(ChildComponent::new);
574 let f = match self.alias_opt.take() {
575 Some(s) => self.ctx.system().register_by_alias(&child, s),
576 None => self.ctx.system().register(&child),
577 };
578 self.child = Some(child);
579 Handled::block_on(self, move |async_self| async move {
581 let path = f.await.expect("actor path").expect("actor path");
582 info!(async_self.log(), "Child was registered");
583 if let Some(ref child) = async_self.child {
584 async_self.ctx.system().start(child);
585 path.tell(TestMessage, async_self.deref());
586 } else {
587 unreachable!();
588 }
589 })
590 }
591
592 fn on_stop(&mut self) -> Handled {
593 let _ = self.child.take(); Handled::Ok
595 }
596
597 fn on_kill(&mut self) -> Handled {
598 let _ = self.child.take(); Handled::Ok
600 }
601 }
602 impl Actor for ParentComponent {
603 type Message = ParentMessage;
604
605 fn receive_local(&mut self, msg: Self::Message) -> Handled {
606 match msg {
607 ParentMessage::GetChild(promise) => {
608 if let Some(ref child) = self.child {
609 promise.fulfil(child.clone()).expect("fulfilled");
610 } else {
611 drop(promise); }
613 }
614 }
615 Handled::Ok
616 }
617
618 fn receive_network(&mut self, _msg: NetMessage) -> Handled {
619 unimplemented!("Shouldn't be used");
620 }
621 }
622
623 #[test]
624 fn child_unique_registration_test() -> () {
625 let mut conf = KompactConfig::default();
626 conf.system_components(DeadletterBox::new, NetworkConfig::default().build());
627 let system = conf.build().expect("system");
628 let parent = system.create(ParentComponent::unique);
629 system.start(&parent);
630 thread::sleep(TIMEOUT);
631 let (p, f) = promise::<Arc<Component<ChildComponent>>>();
632 parent.actor_ref().tell(ParentMessage::GetChild(p));
633 let child = f.wait_timeout(TIMEOUT).expect("child");
634 let stop_f = system.stop_notify(&child);
635 system.stop(&parent);
636
637 stop_f.wait_timeout(TIMEOUT).expect("child didn't stop");
638 child.on_definition(|cd| {
639 assert!(cd.got_message, "child didn't get the message");
640 });
641 system.shutdown().expect("shutdown");
642 }
643
644 const TEST_ALIAS: &str = "test";
645
646 #[test]
647 fn child_alias_registration_test() -> () {
648 let mut conf = KompactConfig::default();
649 conf.system_components(DeadletterBox::new, NetworkConfig::default().build());
650 let system = conf.build().expect("system");
651 let parent = system.create(|| ParentComponent::alias(TEST_ALIAS.into()));
652 system.start(&parent);
653 thread::sleep(TIMEOUT);
654 let (p, f) = promise::<Arc<Component<ChildComponent>>>();
655 parent.actor_ref().tell(ParentMessage::GetChild(p));
656 let child = f.wait_timeout(TIMEOUT).expect("child");
657 let stop_f = system.stop_notify(&child);
658 system.stop(&parent);
659
660 stop_f.wait_timeout(TIMEOUT).expect("child didn't stop");
661 child.on_definition(|cd| {
662 assert!(cd.got_message, "child didn't get the message");
663 });
664 system.shutdown().expect("shutdown");
665 }
666
667 #[test]
668 fn test_dynamic_port_access() -> () {
669 struct A;
670 impl Port for A {
671 type Indication = u64;
672 type Request = String;
673 }
674 struct B;
675 impl Port for B {
676 type Indication = &'static str;
677 type Request = i8;
678 }
679
680 #[derive(ComponentDefinition, Actor)]
681 struct TestComp {
682 ctx: ComponentContext<Self>,
683 req_a: RequiredPort<A>,
684 prov_b: ProvidedPort<B>,
685 }
686
687 impl TestComp {
688 fn new() -> TestComp {
689 TestComp {
690 ctx: ComponentContext::uninitialised(),
691 req_a: RequiredPort::uninitialised(),
692 prov_b: ProvidedPort::uninitialised(),
693 }
694 }
695 }
696 ignore_lifecycle!(TestComp);
697 ignore_requests!(B, TestComp);
698 ignore_indications!(A, TestComp);
699
700 let system = KompactConfig::default().build().expect("System");
701 let comp = system.create(TestComp::new);
702 let dynamic: Arc<dyn AbstractComponent<Message = Never>> = comp;
703 dynamic.on_dyn_definition(|def| {
704 assert!(def.get_required_port::<A>().is_some());
705 assert!(def.get_provided_port::<A>().is_none());
706
707 assert!(def.get_required_port::<B>().is_none());
708 assert!(def.get_provided_port::<B>().is_some());
709 });
710
711 system.shutdown().expect("shutdown");
712 }
713
714 #[derive(Debug)]
715 enum BlockMe {
716 Now,
717 OnChannel(oneshot::Receiver<String>),
718 SpawnOff(String),
719 OnShutdown,
720 }
721
722 #[derive(ComponentDefinition)]
723 struct BlockingComponent {
724 ctx: ComponentContext<Self>,
725 test_string: String,
726 block_on_shutdown: bool,
727 }
728
729 impl BlockingComponent {
730 fn new() -> Self {
731 BlockingComponent {
732 ctx: ComponentContext::uninitialised(),
733 test_string: "started".to_string(),
734 block_on_shutdown: false,
735 }
736 }
737 }
738
739 impl ComponentLifecycle for BlockingComponent {
740 fn on_kill(&mut self) -> Handled {
741 if self.block_on_shutdown {
742 info!(self.log(), "Cleaning up before shutdown");
743 Handled::block_on(self, move |mut async_self| async move {
744 async_self.test_string = "done".to_string();
745 info!(async_self.log(), "Ran BlockMe::OnShutdown future");
746 })
747 } else {
748 Handled::Ok
749 }
750 }
751 }
752
753 impl Actor for BlockingComponent {
754 type Message = BlockMe;
755
756 fn receive_local(&mut self, msg: Self::Message) -> Handled {
757 match msg {
758 BlockMe::Now => {
759 info!(self.log(), "Got BlockMe::Now");
760 Handled::block_on(self, move |mut async_self| async move {
762 async_self.test_string = "done".to_string();
763 info!(async_self.log(), "Ran BlockMe::Now future");
764 })
765 }
766 BlockMe::OnChannel(receiver) => {
767 info!(self.log(), "Got BlockMe::OnChannel");
768 Handled::block_on(self, move |mut async_self| async move {
770 info!(async_self.log(), "Started BlockMe::OnChannel future");
771 let s = receiver.await;
772 async_self.test_string = s.expect("Some string");
773 info!(async_self.log(), "Completed BlockMe::OnChannel future");
774 })
775 }
776 BlockMe::SpawnOff(s) => {
777 let handle = self.spawn_off(async move { s });
778 Handled::block_on(self, move |mut async_self| async move {
780 let res = handle.await.expect("result");
781 async_self.test_string = res;
782 })
783 }
784 BlockMe::OnShutdown => {
785 self.block_on_shutdown = true;
786 Handled::Ok
787 }
788 }
789 }
790
791 fn receive_network(&mut self, _msg: NetMessage) -> Handled {
792 unimplemented!("No networking here!");
793 }
794 }
795
796 #[test]
797 fn test_immediate_blocking() {
798 let system = KompactConfig::default().build().expect("System");
799 let comp = system.create(BlockingComponent::new);
800 system
801 .start_notify(&comp)
802 .wait_timeout(TIMEOUT)
803 .expect("Component didn't start");
804 comp.actor_ref().tell(BlockMe::Now);
805 thread::sleep(TIMEOUT);
806 system
807 .kill_notify(comp.clone())
808 .wait_timeout(TIMEOUT)
809 .expect("Component didn't die");
810 comp.on_definition(|cd| {
811 assert_eq!(cd.test_string, "done");
812 });
813 system.shutdown().expect("shutdown");
814 }
815
816 #[test]
817 fn test_channel_blocking() {
818 let system = KompactConfig::default().build().expect("System");
819 let comp = system.create(BlockingComponent::new);
820 system
821 .start_notify(&comp)
822 .wait_timeout(TIMEOUT)
823 .expect("Component didn't start");
824
825 let (sender, receiver) = oneshot::channel();
826 comp.actor_ref().tell(BlockMe::OnChannel(receiver));
827 thread::sleep(TIMEOUT);
828 sender.send("gotcha".to_string()).expect("Should have sent");
829 thread::sleep(TIMEOUT);
830 system
831 .kill_notify(comp.clone())
832 .wait_timeout(TIMEOUT)
833 .expect("Component didn't die");
834 comp.on_definition(|cd| {
835 assert_eq!(cd.test_string, "gotcha");
836 });
837 system.shutdown().expect("shutdown");
838 }
839
840 #[test]
841 fn test_mixed_blocking() {
842 let system = KompactConfig::default().build().expect("System");
843 let comp = system.create(BlockingComponent::new);
844 system
845 .start_notify(&comp)
846 .wait_timeout(TIMEOUT)
847 .expect("Component didn't start");
848
849 let (sender, receiver) = oneshot::channel();
850 comp.actor_ref().tell(BlockMe::OnChannel(receiver));
851 thread::sleep(TIMEOUT);
852 comp.actor_ref().tell(BlockMe::Now);
853 sender.send("gotcha".to_string()).expect("Should have sent");
854 thread::sleep(TIMEOUT);
855 system
856 .kill_notify(comp.clone())
857 .wait_timeout(TIMEOUT)
858 .expect("Component didn't die");
859 comp.on_definition(|cd| {
860 assert_eq!(cd.test_string, "done");
861 });
862 system.shutdown().expect("shutdown");
863 }
864
865 #[test]
866 fn test_shutdown_blocking() {
867 let system = KompactConfig::default().build().expect("System");
868 let comp = system.create(BlockingComponent::new);
869 system
870 .start_notify(&comp)
871 .wait_timeout(TIMEOUT)
872 .expect("Component didn't start");
873 comp.actor_ref().tell(BlockMe::OnShutdown);
874 thread::sleep(TIMEOUT);
875 system
876 .kill_notify(comp.clone())
877 .wait_timeout(TIMEOUT)
878 .expect("Component didn't die");
879 comp.on_definition(|cd| {
880 assert_eq!(cd.test_string, "done");
881 });
882 system.shutdown().expect("shutdown");
883 }
884
885 #[test]
886 fn test_component_spawn_off() -> () {
887 let system = KompactConfig::default().build().expect("System");
888 let comp = system.create(BlockingComponent::new);
889 system
890 .start_notify(&comp)
891 .wait_timeout(TIMEOUT)
892 .expect("Component didn't start");
893 comp.actor_ref()
894 .tell(BlockMe::SpawnOff("gotcha".to_string()));
895 thread::sleep(TIMEOUT);
896 system
897 .kill_notify(comp.clone())
898 .wait_timeout(TIMEOUT)
899 .expect("Component didn't die");
900 comp.on_definition(|cd| {
901 assert_eq!(cd.test_string, "gotcha");
902 });
903 system.shutdown().expect("shutdown");
904 }
905
906 #[derive(Debug)]
907 enum AsyncMe {
908 Now,
909 OnChannel(oneshot::Receiver<String>),
910 ConcurrentMessage(oneshot::Receiver<String>),
911 JustAMessage(String),
912 }
913
914 #[derive(ComponentDefinition)]
915 struct AsyncComponent {
916 ctx: ComponentContext<Self>,
917 test_string: String,
918 }
919
920 impl AsyncComponent {
921 fn new() -> Self {
922 AsyncComponent {
923 ctx: ComponentContext::uninitialised(),
924 test_string: "started".to_string(),
925 }
926 }
927 }
928
929 ignore_lifecycle!(AsyncComponent);
930
931 impl Actor for AsyncComponent {
932 type Message = AsyncMe;
933
934 fn receive_local(&mut self, msg: Self::Message) -> Handled {
935 match msg {
936 AsyncMe::Now => {
937 info!(self.log(), "Got AsyncMe::Now");
938 self.spawn_local(move |mut async_self| async move {
940 async_self.test_string = "done".to_string();
941 info!(async_self.log(), "Ran AsyncMe::Now future");
942 Handled::Ok
943 });
944 Handled::Ok
945 }
946 AsyncMe::OnChannel(receiver) => {
947 info!(self.log(), "Got AsyncMe::OnChannel");
948 self.spawn_local(move |mut async_self| async move {
950 info!(async_self.log(), "Started AsyncMe::OnChannel future");
951 let s = receiver.await;
952 async_self.test_string = s.expect("Some string");
953 info!(async_self.log(), "Completed Async::OnChannel future");
954 Handled::Ok
955 });
956 Handled::Ok
957 }
958 AsyncMe::ConcurrentMessage(receiver) => {
959 info!(self.log(), "Got AsyncMe::OnChannel");
960 self.spawn_local(move |mut async_self| async move {
962 info!(async_self.log(), "Started AsyncMe::ConcurrentMessag future");
963 let s = receiver.await.expect("Some string");
964 info!(
965 async_self.log(),
966 "Got message {} as state={}", s, async_self.test_string
967 );
968 assert_eq!(
969 s, async_self.test_string,
970 "Message was not processed before future!"
971 );
972 async_self.test_string = "done".to_string();
973 info!(
974 async_self.log(),
975 "Completed AsyncMe::ConcurrentMessage future with state={}",
976 async_self.test_string
977 );
978 Handled::Ok
979 });
980 Handled::Ok
981 }
982 AsyncMe::JustAMessage(s) => {
983 info!(self.log(), "Got AsyncMe::JustAMessage({})", s);
984 self.test_string = s;
985 Handled::Ok
986 }
987 }
988 }
989
990 fn receive_network(&mut self, _msg: NetMessage) -> Handled {
991 unimplemented!("No networking here!");
992 }
993 }
994
995 #[test]
996 fn test_immediate_non_blocking() {
997 let system = KompactConfig::default().build().expect("System");
998 let comp = system.create(AsyncComponent::new);
999 system
1000 .start_notify(&comp)
1001 .wait_timeout(TIMEOUT)
1002 .expect("Component didn't start");
1003 comp.actor_ref().tell(AsyncMe::Now);
1004 thread::sleep(TIMEOUT);
1005 system
1006 .kill_notify(comp.clone())
1007 .wait_timeout(TIMEOUT)
1008 .expect("Component didn't die");
1009 comp.on_definition(|cd| {
1010 assert_eq!(cd.test_string, "done");
1011 });
1012 system.shutdown().expect("shutdown");
1013 }
1014
1015 #[test]
1016 fn test_channel_non_blocking() {
1017 let system = KompactConfig::default().build().expect("System");
1018 let comp = system.create(AsyncComponent::new);
1019 system
1020 .start_notify(&comp)
1021 .wait_timeout(TIMEOUT)
1022 .expect("Component didn't start");
1023
1024 let (sender, receiver) = oneshot::channel();
1025 comp.actor_ref().tell(AsyncMe::OnChannel(receiver));
1026 thread::sleep(TIMEOUT);
1027 sender.send("gotcha".to_string()).expect("Should have sent");
1028 thread::sleep(TIMEOUT);
1029 system
1030 .kill_notify(comp.clone())
1031 .wait_timeout(TIMEOUT)
1032 .expect("Component didn't die");
1033 comp.on_definition(|cd| {
1034 assert_eq!(cd.test_string, "gotcha");
1035 });
1036 system.shutdown().expect("shutdown");
1037 }
1038
1039 #[test]
1040 fn test_concurrent_non_blocking() {
1041 let system = KompactConfig::default().build().expect("System");
1042 let comp = system.create(AsyncComponent::new);
1043 system
1044 .start_notify(&comp)
1045 .wait_timeout(TIMEOUT)
1046 .expect("Component didn't start");
1047
1048 let (sender, receiver) = oneshot::channel();
1049 comp.actor_ref().tell(AsyncMe::ConcurrentMessage(receiver));
1050 thread::sleep(TIMEOUT);
1051 let msg = "gotcha";
1052 comp.actor_ref()
1053 .tell(AsyncMe::JustAMessage(msg.to_string()));
1054 thread::sleep(TIMEOUT);
1055 sender.send(msg.to_string()).expect("Should have sent");
1056 thread::sleep(TIMEOUT);
1057 system
1058 .kill_notify(comp.clone())
1059 .wait_timeout(TIMEOUT)
1060 .expect("Component didn't die");
1061 comp.on_definition(|cd| {
1062 assert_eq!(cd.test_string, "done");
1063 });
1064 system.shutdown().expect("shutdown");
1065 }
1066
1067 #[derive(Debug, Clone, Copy)]
1068 struct CountMe;
1069 #[derive(Debug, Clone, Copy)]
1070 struct Counted;
1071 #[derive(Debug, Clone, Copy)]
1072 struct SendCount;
1073
1074 struct CounterPort;
1075 impl Port for CounterPort {
1076 type Indication = CountMe;
1077 type Request = Counted;
1078 }
1079
1080 #[derive(ComponentDefinition)]
1081 struct CountSender {
1082 ctx: ComponentContext<Self>,
1083 count_port: ProvidedPort<CounterPort>,
1084 counted: usize,
1085 }
1086 impl Default for CountSender {
1087 fn default() -> Self {
1088 CountSender {
1089 ctx: ComponentContext::uninitialised(),
1090 count_port: ProvidedPort::uninitialised(),
1091 counted: 0,
1092 }
1093 }
1094 }
1095 ignore_lifecycle!(CountSender);
1096 impl Provide<CounterPort> for CountSender {
1097 fn handle(&mut self, _event: Counted) -> Handled {
1098 self.counted += 1;
1099 Handled::Ok
1100 }
1101 }
1102 impl Actor for CountSender {
1103 type Message = SendCount;
1104
1105 fn receive_local(&mut self, _msg: Self::Message) -> Handled {
1106 self.count_port.trigger(CountMe);
1107 Handled::Ok
1108 }
1109
1110 fn receive_network(&mut self, _msg: NetMessage) -> Handled {
1111 unimplemented!("No networking in this test");
1112 }
1113 }
1114
1115 #[derive(ComponentDefinition)]
1116 struct Counter {
1117 ctx: ComponentContext<Self>,
1118 count_port: RequiredPort<CounterPort>,
1119 count: usize,
1120 }
1121 impl Default for Counter {
1122 fn default() -> Self {
1123 Counter {
1124 ctx: ComponentContext::uninitialised(),
1125 count_port: RequiredPort::uninitialised(),
1126 count: 0,
1127 }
1128 }
1129 }
1130 ignore_lifecycle!(Counter);
1131 impl Require<CounterPort> for Counter {
1132 fn handle(&mut self, _event: CountMe) -> Handled {
1133 self.count += 1;
1134 self.count_port.trigger(Counted);
1135 Handled::Ok
1136 }
1137 }
1138 impl Actor for Counter {
1139 type Message = Never;
1140
1141 fn receive_local(&mut self, _msg: Self::Message) -> Handled {
1142 unreachable!("Never type is empty")
1143 }
1144
1145 fn receive_network(&mut self, _msg: NetMessage) -> Handled {
1146 unimplemented!("No networking in this test");
1147 }
1148 }
1149
1150 #[test]
1151 fn test_channel_disconnection() {
1152 let system = KompactConfig::default().build().expect("System");
1153 let sender = system.create(CountSender::default);
1154
1155 let counter1 = system.create(Counter::default);
1156 let counter2 = system.create(Counter::default);
1157
1158 let channel1: Box<(dyn Channel + Send + 'static)> =
1159 biconnect_components::<CounterPort, _, _>(&sender, &counter1)
1160 .expect("connection")
1161 .boxed();
1162 let channel2 =
1163 biconnect_components::<CounterPort, _, _>(&sender, &counter2).expect("connection");
1164
1165 let start_all = || {
1166 let sender_start_f = system.start_notify(&sender);
1167 let counter1_start_f = system.start_notify(&counter1);
1168 let counter2_start_f = system.start_notify(&counter2);
1169
1170 sender_start_f
1171 .wait_timeout(TIMEOUT)
1172 .expect("sender started");
1173 counter1_start_f
1174 .wait_timeout(TIMEOUT)
1175 .expect("counter1 started");
1176 counter2_start_f
1177 .wait_timeout(TIMEOUT)
1178 .expect("counter2 started");
1179 };
1180 start_all();
1181
1182 sender.actor_ref().tell(SendCount);
1183
1184 thread::sleep(TIMEOUT);
1185
1186 let stop_all = || {
1187 let sender_stop_f = system.stop_notify(&sender);
1188 let counter1_stop_f = system.stop_notify(&counter1);
1189 let counter2_stop_f = system.stop_notify(&counter2);
1190
1191 sender_stop_f.wait_timeout(TIMEOUT).expect("sender stopped");
1192 counter1_stop_f
1193 .wait_timeout(TIMEOUT)
1194 .expect("counter1 stopped");
1195 counter2_stop_f
1196 .wait_timeout(TIMEOUT)
1197 .expect("counter2 stopped");
1198 };
1199 stop_all();
1200
1201 let check_counts = |sender_expected, counter1_expected, counter2_expected| {
1202 let sender_count = sender.on_definition(|cd| cd.counted);
1203 assert_eq!(sender_expected, sender_count);
1204 let counter1_count = counter1.on_definition(|cd| cd.count);
1205 assert_eq!(counter1_expected, counter1_count);
1206 let counter2_count = counter2.on_definition(|cd| cd.count);
1207 assert_eq!(counter2_expected, counter2_count);
1208 };
1209 check_counts(2, 1, 1);
1210
1211 channel2.disconnect().expect("disconnect");
1212
1213 start_all();
1214
1215 sender.actor_ref().tell(SendCount);
1216
1217 thread::sleep(TIMEOUT);
1218
1219 stop_all();
1220
1221 check_counts(3, 2, 1);
1222
1223 channel1.disconnect().expect("disconnect");
1224
1225 start_all();
1226
1227 sender.actor_ref().tell(SendCount);
1228
1229 thread::sleep(TIMEOUT);
1230
1231 stop_all();
1232
1233 check_counts(3, 2, 1);
1234
1235 let sender_port: ProvidedRef<CounterPort> = sender.provided_ref();
1236 let counter1_port: RequiredRef<CounterPort> = counter1.required_ref();
1237 let channel1: Box<(dyn Channel + Send + 'static)> =
1238 sender.connect_to_required(counter1_port).boxed();
1239 let channel2 = counter2.connect_to_provided(sender_port);
1240
1241 start_all();
1242
1243 sender.actor_ref().tell(SendCount);
1244
1245 thread::sleep(TIMEOUT);
1246
1247 stop_all();
1248
1249 check_counts(3, 3, 1);
1250
1251 let counter2_port: RequiredRef<CounterPort> = counter2.required_ref();
1252 let channel3 = sender.connect_to_required(counter2_port);
1253
1254 start_all();
1255
1256 sender.actor_ref().tell(SendCount);
1257
1258 thread::sleep(TIMEOUT);
1259
1260 stop_all();
1261
1262 check_counts(4, 4, 2);
1263
1264 channel1.disconnect().expect("disconnected");
1265 channel2.disconnect().expect("disconnected");
1266 channel3.disconnect().expect("disconnected");
1267
1268 start_all();
1269
1270 sender.actor_ref().tell(SendCount);
1271
1272 thread::sleep(TIMEOUT);
1273
1274 stop_all();
1275
1276 check_counts(4, 4, 2);
1277
1278 system.shutdown().expect("shutdown");
1279 }
1280}