kompact/component/
mod.rs

1use hocon::Hocon;
2use std::{
3    cell::{RefCell, UnsafeCell},
4    fmt,
5    ops::DerefMut,
6    panic,
7    sync::{atomic::AtomicU64, Arc, Mutex, Weak},
8    time::Duration,
9};
10use uuid::Uuid;
11
12use super::*;
13use crate::{
14    actors::TypedMsgQueue,
15    net::buffers::EncodeBuffer,
16    supervision::*,
17    timer::timer_manager::{ExecuteAction, ScheduledTimer, Timer, TimerManager, TimerRefFactory},
18};
19use rustc_hash::FxHashMap;
20
21#[cfg(all(nightly, feature = "type_erasure"))]
22use crate::utils::erased::CreateErased;
23use owning_ref::{Erased, OwningRefMut};
24use std::any::Any;
25
26mod context;
27pub use context::*;
28mod actual_component;
29pub(crate) mod lifecycle;
30pub use actual_component::*;
31mod system_handle;
32pub use system_handle::*;
33mod definition;
34pub use definition::*;
35mod core;
36pub use self::core::*;
37mod future_task;
38pub use future_task::*;
39
40/// State transition indication at the end of a message or event handler
41#[must_use = "The Handled value must be returned from a handle or receive function in order to take effect."]
42#[derive(Debug, Default)]
43pub enum Handled {
44    /// Continue as normal
45    #[default]
46    Ok,
47    /// Immediately suspend processing of any messages and events
48    /// until the `BlockingFuture` has completed
49    BlockOn(BlockingFuture),
50    /// Kill the component without handling any further messages
51    DieNow,
52}
53impl Handled {
54    /// Constructs a state transition instruction which causes
55    /// the component to suspend processing of any messages and events
56    /// until the async `fun` (the returned [Future](std::future::Future)) has completed.
57    ///
58    /// Mutable access to the component's internal state is provided via
59    /// the [ComponentDefinitionAccess](ComponentDefinitionAccess) guard object.
60    ///
61    /// Please see the documentation for [ComponentDefinitionAccess](ComponentDefinitionAccess)
62    /// for details on how the internal state may (and may not) be used.
63    ///
64    /// # Example
65    ///
66    /// ```
67    /// # use kompact::prelude::*;
68    ///
69    /// #[derive(ComponentDefinition, Actor)]
70    /// struct AsyncComponent {
71    ///    ctx: ComponentContext<Self>,
72    ///    flag: bool,
73    /// }
74    /// impl AsyncComponent {
75    ///     fn new() -> Self {
76    ///         AsyncComponent {
77    ///             ctx: ComponentContext::uninitialised(),
78    ///             flag: false,    
79    ///         }
80    ///     }   
81    /// }
82    /// impl ComponentLifecycle for AsyncComponent {
83    ///     fn on_start(&mut self) -> Handled {
84    ///         // on nightly you can just write: async move |mut async_self| {...}
85    ///         Handled::block_on(self, move |mut async_self| async move {
86    ///             async_self.flag = true;
87    ///             Handled::Ok
88    ///         })
89    ///     }   
90    /// }
91    /// ```
92    ///
93    /// # See Also
94    ///
95    /// In order to continue processing messages and events in parallel to completing the future
96    /// use [spawn_local](ComponentDefinition::spawn_local).
97    ///
98    /// In order to run a large future which does not need access to component's internal state
99    /// at all or until the very end, consider using [spawn_off](ComponentDefinition::spawn_off).
100    pub fn block_on<CD, F>(
101        component: &mut CD,
102        fun: impl FnOnce(ComponentDefinitionAccess<CD>) -> F,
103    ) -> Self
104    where
105        CD: ComponentDefinition + 'static,
106        F: std::future::Future + Send + 'static,
107    {
108        let blocking = future_task::blocking(component, fun);
109        Handled::BlockOn(blocking)
110    }
111
112    /// Returns true if this instance is an [Handled::Ok](Handled::Ok) variant
113    pub fn is_ok(&self) -> bool {
114        matches!(self, Handled::Ok)
115    }
116}
117
118/// A trait bound alias for the trait required by the
119/// generic parameter of a [Component](Component)
120pub trait ComponentTraits: ComponentDefinition + ActorRaw + Sized + 'static {}
121impl<CD> ComponentTraits for CD where CD: ComponentDefinition + ActorRaw + Sized + 'static {}
122
123/// A trait for abstracting over structures that contain a component core
124///
125/// Used for implementing scheduling and execution logic,
126/// such as [Scheduler](runtime::Scheduler).
127pub trait CoreContainer: Send + Sync {
128    /// Returns the component's unique id
129    fn id(&self) -> Uuid;
130    /// Returns a reference to the actual component core
131    fn core(&self) -> &ComponentCore;
132    /// Executes this component on the current thread
133    fn execute(&self) -> SchedulingDecision;
134    /// Returns this component's system
135    fn system(&self) -> &KompactSystem {
136        self.core().system()
137    }
138    /// Schedules this component on its associated scheduler
139    fn schedule(&self) -> ();
140    /// The descriptive string of the [ComponentDefinition](ComponentDefinition) type wrapped in this container
141    fn type_name(&self) -> &'static str;
142
143    /// Returns the underlying message queue of this component
144    /// without the type information
145    fn dyn_message_queue(&self) -> &dyn DynMsgQueue;
146
147    /// Enqueue an event on the component's control queue
148    ///
149    /// Not usually something you need to manually,
150    /// unless you nned custom supervisor behaviours, for example.
151    fn enqueue_control(&self, event: ControlEvent) -> ();
152}
153
154impl fmt::Debug for dyn CoreContainer {
155    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156        write!(f, "CoreContainer({})", self.id())
157    }
158}
159
160/// A trait for component views that can be used for unique actor registration
161pub trait UniqueRegistrable: DynActorRefFactory {
162    /// Returns the unique id of a component
163    fn component_id(&self) -> Uuid;
164}
165
166/// Anything with this trait can be turned into an [ActorRef](ActorRef)
167/// as long as its behind an Arc or Weak
168pub trait MsgQueueContainer: CoreContainer {
169    /// The message type of the queue
170    type Message: MessageBounds;
171    /// The actual underlying queue
172    fn message_queue(&self) -> &TypedMsgQueue<Self::Message>;
173    /// A weak reference to the component that must be scheduled
174    /// when something is enqueued
175    fn downgrade_dyn(self: Arc<Self>) -> Weak<dyn CoreContainer>;
176}
177
178pub(crate) struct FakeCoreContainer;
179impl CoreContainer for FakeCoreContainer {
180    fn id(&self) -> Uuid {
181        unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
182    }
183
184    fn core(&self) -> &ComponentCore {
185        unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
186    }
187
188    fn execute(&self) -> SchedulingDecision {
189        unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
190    }
191
192    fn system(&self) -> &KompactSystem {
193        unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
194    }
195
196    fn schedule(&self) -> () {
197        unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
198    }
199
200    fn type_name(&self) -> &'static str {
201        unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
202    }
203
204    fn dyn_message_queue(&self) -> &dyn DynMsgQueue {
205        unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
206    }
207
208    fn enqueue_control(&self, _event: ControlEvent) -> () {
209        unreachable!("FakeCoreContainer should only be used as a Sized type for `Weak::new()`!");
210    }
211}
212
/// The mutable part of a component: the user's definition plus a `skip` counter
pub(crate) struct ComponentMutableCore<C> {
    /// The wrapped component definition
    pub(crate) definition: C,
    // NOTE(review): presumably the queue offset used for fairness between executions
    // (cf. `ExecuteResult::new`) — confirm against the execute loop.
    skip: usize,
}

impl<C> ComponentMutableCore<C> {
    /// Wraps `definition`, starting with a `skip` of zero
    fn from(definition: C) -> Self {
        let skip = 0;
        Self { definition, skip }
    }
}
225
/// Statistics describing the most recent invocation of [execute](ComponentDefinition::execute).
pub struct ExecuteResult {
    blocking: bool,
    count: usize,
    skip: usize,
}

impl ExecuteResult {
    /// Builds an execute result from its three parts
    ///
    /// - `blocking` is stored as given (presumably whether the invocation ended in a blocking state — confirm with callers)
    /// - `count` is the total number of events handled during the invocation
    /// - `skip` is the offset from where queues will be checked during the next invocation (used for fairness)
    pub fn new(blocking: bool, count: usize, skip: usize) -> Self {
        Self {
            blocking,
            count,
            skip,
        }
    }
}
246
247/// An abstraction over providers of Kompact loggers
248pub trait ComponentLogging {
249    /// Returns a reference to the component's logger instance
250    ///
251    /// See [log](ComponentContext::log) for more details.
252    fn log(&self) -> &KompactLogger;
253}
254
255impl<CD> ComponentLogging for CD
256where
257    CD: ComponentTraits + ComponentLifecycle,
258{
259    fn log(&self) -> &KompactLogger {
260        self.ctx().log()
261    }
262}
263
264/// A trait implementing handling of provided events of `P`
265///
266/// This is equivalent to a Kompics *Handler* subscribed on a provided port of type `P`.
267pub trait Provide<P: Port + 'static> {
268    /// Handle the port's `event`
269    ///
270    /// # Note
271    ///
272    /// Remember that components usually run on a shared thread pool,
273    /// so you shouldn't ever block in this method unless you know what you are doing.
274    fn handle(&mut self, event: P::Request) -> Handled;
275}
276
277/// A trait implementing handling of required events of `P`
278///
279/// This is equivalent to a Kompics *Handler* subscribed on a required port of type `P`.
280pub trait Require<P: Port + 'static> {
281    /// Handle the port's `event`
282    ///
283    /// # Note
284    ///
285    /// Remember that components usually run on a shared thread pool,
286    /// so you shouldn't ever block in this method unless you know what you are doing.
287    fn handle(&mut self, event: P::Indication) -> Handled;
288}
289
290/// A convenience abstraction over concrete port instance fields
291///
292/// This trait is usually automatically derived when using `#[derive(ComponentDefinition)]`.
293pub trait ProvideRef<P: Port + 'static> {
294    /// Returns a provided reference to this component's port instance of type `P`
295    fn provided_ref(&mut self) -> ProvidedRef<P>;
296
297    /// Connects this component's provided port instance of type `P` to `req`
298    fn connect_to_required(&mut self, req: RequiredRef<P>) -> ();
299
300    /// Disconnects this component's provided port instance of type `P` from `req`
301    fn disconnect(&mut self, req: RequiredRef<P>) -> ();
302}
303
304/// A convenience abstraction over concrete port instance fields
305///
306/// This trait is usually automatically derived when using `#[derive(ComponentDefinition)]`.
307pub trait RequireRef<P: Port + 'static> {
308    /// Returns a required reference to this component's port instance of type `P`
309    fn required_ref(&mut self) -> RequiredRef<P>;
310
311    /// Connects this component's required port instance of type `P` to `prov`
312    fn connect_to_provided(&mut self, prov: ProvidedRef<P>) -> ();
313
314    /// Disconnects this component's required port instance of type `P` from `prov`
315    fn disconnect(&mut self, prov: ProvidedRef<P>) -> ();
316}
317
318/// Same as [ProvideRef](ProvideRef), but for instances that must be locked first
319///
320/// This is used, for example, with an `Arc<Component<_>>`.
321pub trait LockingProvideRef<P, C>
322where
323    P: Port + 'static,
324    C: ComponentDefinition + Sized + 'static + Provide<P> + ProvideRef<P>,
325{
326    /// Returns a required reference to this component's port instance of type `P`
327    fn provided_ref(&self) -> ProvidedRef<P>;
328
329    /// Connects this component's required port instance of type `P` to `prov`
330    fn connect_to_required(&self, req: RequiredRef<P>) -> ProviderChannel<P, C>;
331}
332
333/// Same as [RequireRef](RequireRef), but for instances that must be locked first
334///
335/// This is used, for example, with an `Arc<Component<_>>`.
336pub trait LockingRequireRef<P, C>
337where
338    P: Port + 'static,
339    C: ComponentDefinition + Sized + 'static + Require<P> + RequireRef<P>,
340{
341    /// Returns a required reference to this component's port instance of type `P`
342    fn required_ref(&self) -> RequiredRef<P>;
343
344    /// Connects this component's required port instance of type `P` to `prov`
345    fn connect_to_provided(&self, prov: ProvidedRef<P>) -> RequirerChannel<P, C>;
346}
347
348impl<P, CD> LockingProvideRef<P, CD> for Arc<Component<CD>>
349where
350    P: Port + 'static,
351    CD: ComponentTraits + ComponentLifecycle + Provide<P> + ProvideRef<P>,
352{
353    fn provided_ref(&self) -> ProvidedRef<P> {
354        self.on_definition(|cd| ProvideRef::provided_ref(cd))
355    }
356
357    fn connect_to_required(&self, req: RequiredRef<P>) -> ProviderChannel<P, CD> {
358        self.on_definition(|cd| ProvideRef::connect_to_required(cd, req.clone()));
359        ProviderChannel::new(self, req)
360    }
361}
362
363impl<P, CD> LockingRequireRef<P, CD> for Arc<Component<CD>>
364where
365    P: Port + 'static,
366    CD: ComponentTraits + ComponentLifecycle + Require<P> + RequireRef<P>,
367{
368    fn required_ref(&self) -> RequiredRef<P> {
369        self.on_definition(|cd| RequireRef::required_ref(cd))
370    }
371
372    fn connect_to_provided(&self, prov: ProvidedRef<P>) -> RequirerChannel<P, CD> {
373        self.on_definition(|cd| RequireRef::connect_to_provided(cd, prov.clone()));
374        RequirerChannel::new(self, prov)
375    }
376}
377
/// Indicates whether or not a component should be sent to the [Scheduler](runtime::Scheduler)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingDecision {
    /// Send the component to the [Scheduler](runtime::Scheduler)
    ///
    /// That is, call [schedule](CoreContainer::schedule).
    Schedule,
    /// Don't schedule the component, because it is already scheduled
    AlreadyScheduled,
    /// Don't schedule the component, because it has nothing to do
    NoWork,
    /// Don't schedule the component, because it is blocked
    Blocked,
    /// Immediately execute the component again, as it just came out of blocking
    Resume,
}

impl SchedulingDecision {
    /// Use the current `SchedulingDecision` if it makes a strong decision
    /// (anything but `NoWork`), or fall back to `other` if the current one is `NoWork`.
    ///
    /// In particular this is used to come out of blocking, where the combination
    /// of `NoWork` and anything that indicates work (`Schedule`, `AlreadyScheduled`,
    /// or `Resume`) actually indicates that we want the component to `Resume` immediately.
    /// An `other` of `NoWork` or `Blocked` is returned unchanged.
    pub fn or_use(self, other: SchedulingDecision) -> SchedulingDecision {
        // Same rules as `or_from`, with an already computed fallback.
        self.or_from(|| other)
    }

    /// Use the current `SchedulingDecision` if it makes a strong decision
    /// (anything but `NoWork`), or the one returned by the given function
    /// if the current one is `NoWork`.
    ///
    /// The function is invoked at most once, and only if the current decision is `NoWork`.
    ///
    /// In particular this is used to come out of blocking, where the combination
    /// of `NoWork` and anything that indicates work (`Schedule`, `AlreadyScheduled`,
    /// or `Resume`) actually indicates that we want the component to `Resume` immediately.
    /// A returned `NoWork` or `Blocked` is passed on unchanged.
    pub fn or_from(self, other: impl FnOnce() -> SchedulingDecision) -> SchedulingDecision {
        match self {
            SchedulingDecision::Schedule
            | SchedulingDecision::AlreadyScheduled
            | SchedulingDecision::Blocked
            | SchedulingDecision::Resume => self,
            SchedulingDecision::NoWork => match other() {
                SchedulingDecision::Schedule
                | SchedulingDecision::Resume
                | SchedulingDecision::AlreadyScheduled => SchedulingDecision::Resume,
                SchedulingDecision::NoWork => SchedulingDecision::NoWork,
                SchedulingDecision::Blocked => SchedulingDecision::Blocked,
            },
        }
    }
}
438
439#[cfg(test)]
440mod tests {
441    use crate::{component::AbstractComponent, prelude::*};
442    use futures::channel::oneshot;
443    use std::{sync::Arc, thread, time::Duration};
444
445    use std::ops::Deref;
446
447    const TIMEOUT: Duration = Duration::from_millis(3000);
448
449    #[derive(ComponentDefinition, Actor)]
450    struct TestComponent {
451        ctx: ComponentContext<TestComponent>,
452    }
453
454    impl TestComponent {
455        fn new() -> TestComponent {
456            TestComponent {
457                ctx: ComponentContext::uninitialised(),
458            }
459        }
460    }
461
462    ignore_lifecycle!(TestComponent);
463
464    #[test]
465    fn component_core_send() -> () {
466        let system = KompactConfig::default().build().expect("KompactSystem");
467        let cc = system.create(TestComponent::new);
468        let core = cc.core();
469        is_send(&core.id);
470        is_send(&core.system);
471        is_send(&core.state);
472        // component is clearly not Send, but that's ok
473        is_sync(&core.id);
474        is_sync(&core.system);
475        is_sync(&core.state);
476        // component is clearly not Sync, but that's ok
477    }
478
479    // Just a way to force the compiler to infer Send for T
480    fn is_send<T: Send>(_v: &T) -> () {
481        // ignore
482    }
483
484    // Just a way to force the compiler to infer Sync for T
485    fn is_sync<T: Sync>(_v: &T) -> () {
486        // ignore
487    }
488
489    #[derive(Debug, Copy, Clone)]
490    struct TestMessage;
491    impl Serialisable for TestMessage {
492        fn ser_id(&self) -> SerId {
493            Self::SER_ID
494        }
495
496        fn size_hint(&self) -> Option<usize> {
497            Some(0)
498        }
499
500        fn serialise(&self, _buf: &mut dyn BufMut) -> Result<(), SerError> {
501            Ok(())
502        }
503
504        fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
505            Ok(self)
506        }
507    }
508    impl Deserialiser<TestMessage> for TestMessage {
509        // whatever
510        const SER_ID: SerId = 42;
511
512        fn deserialise(_buf: &mut dyn Buf) -> Result<TestMessage, SerError> {
513            Ok(TestMessage)
514        }
515    }
516
517    #[derive(ComponentDefinition)]
518    struct ChildComponent {
519        ctx: ComponentContext<Self>,
520        got_message: bool,
521    }
522    impl ChildComponent {
523        fn new() -> Self {
524            ChildComponent {
525                ctx: ComponentContext::uninitialised(),
526                got_message: false,
527            }
528        }
529    }
530    ignore_lifecycle!(ChildComponent);
531    impl NetworkActor for ChildComponent {
532        type Deserialiser = TestMessage;
533        type Message = TestMessage;
534
535        fn receive(&mut self, _sender: Option<ActorPath>, _msg: Self::Message) -> Handled {
536            info!(self.log(), "Child got message");
537            self.got_message = true;
538            Handled::Ok
539        }
540    }
541
542    #[derive(Debug)]
543    enum ParentMessage {
544        GetChild(KPromise<Arc<Component<ChildComponent>>>),
545    }
546
547    #[derive(ComponentDefinition)]
548    struct ParentComponent {
549        ctx: ComponentContext<Self>,
550        alias_opt: Option<String>,
551        child: Option<Arc<Component<ChildComponent>>>,
552    }
553    impl ParentComponent {
554        fn unique() -> Self {
555            ParentComponent {
556                ctx: ComponentContext::uninitialised(),
557                alias_opt: None,
558                child: None,
559            }
560        }
561
562        fn alias(s: String) -> Self {
563            ParentComponent {
564                ctx: ComponentContext::uninitialised(),
565                alias_opt: Some(s),
566                child: None,
567            }
568        }
569    }
570
571    impl ComponentLifecycle for ParentComponent {
572        fn on_start(&mut self) -> Handled {
573            let child = self.ctx.system().create(ChildComponent::new);
574            let f = match self.alias_opt.take() {
575                Some(s) => self.ctx.system().register_by_alias(&child, s),
576                None => self.ctx.system().register(&child),
577            };
578            self.child = Some(child);
579            // async move closure syntax is nightly only
580            Handled::block_on(self, move |async_self| async move {
581                let path = f.await.expect("actor path").expect("actor path");
582                info!(async_self.log(), "Child was registered");
583                if let Some(ref child) = async_self.child {
584                    async_self.ctx.system().start(child);
585                    path.tell(TestMessage, async_self.deref());
586                } else {
587                    unreachable!();
588                }
589            })
590        }
591
592        fn on_stop(&mut self) -> Handled {
593            let _ = self.child.take(); // don't hang on to the child
594            Handled::Ok
595        }
596
597        fn on_kill(&mut self) -> Handled {
598            let _ = self.child.take(); // don't hang on to the child
599            Handled::Ok
600        }
601    }
602    impl Actor for ParentComponent {
603        type Message = ParentMessage;
604
605        fn receive_local(&mut self, msg: Self::Message) -> Handled {
606            match msg {
607                ParentMessage::GetChild(promise) => {
608                    if let Some(ref child) = self.child {
609                        promise.fulfil(child.clone()).expect("fulfilled");
610                    } else {
611                        drop(promise); // this will cause an error on the Future side
612                    }
613                }
614            }
615            Handled::Ok
616        }
617
618        fn receive_network(&mut self, _msg: NetMessage) -> Handled {
619            unimplemented!("Shouldn't be used");
620        }
621    }
622
623    #[test]
624    fn child_unique_registration_test() -> () {
625        let mut conf = KompactConfig::default();
626        conf.system_components(DeadletterBox::new, NetworkConfig::default().build());
627        let system = conf.build().expect("system");
628        let parent = system.create(ParentComponent::unique);
629        system.start(&parent);
630        thread::sleep(TIMEOUT);
631        let (p, f) = promise::<Arc<Component<ChildComponent>>>();
632        parent.actor_ref().tell(ParentMessage::GetChild(p));
633        let child = f.wait_timeout(TIMEOUT).expect("child");
634        let stop_f = system.stop_notify(&child);
635        system.stop(&parent);
636
637        stop_f.wait_timeout(TIMEOUT).expect("child didn't stop");
638        child.on_definition(|cd| {
639            assert!(cd.got_message, "child didn't get the message");
640        });
641        system.shutdown().expect("shutdown");
642    }
643
644    const TEST_ALIAS: &str = "test";
645
646    #[test]
647    fn child_alias_registration_test() -> () {
648        let mut conf = KompactConfig::default();
649        conf.system_components(DeadletterBox::new, NetworkConfig::default().build());
650        let system = conf.build().expect("system");
651        let parent = system.create(|| ParentComponent::alias(TEST_ALIAS.into()));
652        system.start(&parent);
653        thread::sleep(TIMEOUT);
654        let (p, f) = promise::<Arc<Component<ChildComponent>>>();
655        parent.actor_ref().tell(ParentMessage::GetChild(p));
656        let child = f.wait_timeout(TIMEOUT).expect("child");
657        let stop_f = system.stop_notify(&child);
658        system.stop(&parent);
659
660        stop_f.wait_timeout(TIMEOUT).expect("child didn't stop");
661        child.on_definition(|cd| {
662            assert!(cd.got_message, "child didn't get the message");
663        });
664        system.shutdown().expect("shutdown");
665    }
666
667    #[test]
668    fn test_dynamic_port_access() -> () {
669        struct A;
670        impl Port for A {
671            type Indication = u64;
672            type Request = String;
673        }
674        struct B;
675        impl Port for B {
676            type Indication = &'static str;
677            type Request = i8;
678        }
679
680        #[derive(ComponentDefinition, Actor)]
681        struct TestComp {
682            ctx: ComponentContext<Self>,
683            req_a: RequiredPort<A>,
684            prov_b: ProvidedPort<B>,
685        }
686
687        impl TestComp {
688            fn new() -> TestComp {
689                TestComp {
690                    ctx: ComponentContext::uninitialised(),
691                    req_a: RequiredPort::uninitialised(),
692                    prov_b: ProvidedPort::uninitialised(),
693                }
694            }
695        }
696        ignore_lifecycle!(TestComp);
697        ignore_requests!(B, TestComp);
698        ignore_indications!(A, TestComp);
699
700        let system = KompactConfig::default().build().expect("System");
701        let comp = system.create(TestComp::new);
702        let dynamic: Arc<dyn AbstractComponent<Message = Never>> = comp;
703        dynamic.on_dyn_definition(|def| {
704            assert!(def.get_required_port::<A>().is_some());
705            assert!(def.get_provided_port::<A>().is_none());
706
707            assert!(def.get_required_port::<B>().is_none());
708            assert!(def.get_provided_port::<B>().is_some());
709        });
710
711        system.shutdown().expect("shutdown");
712    }
713
714    #[derive(Debug)]
715    enum BlockMe {
716        Now,
717        OnChannel(oneshot::Receiver<String>),
718        SpawnOff(String),
719        OnShutdown,
720    }
721
722    #[derive(ComponentDefinition)]
723    struct BlockingComponent {
724        ctx: ComponentContext<Self>,
725        test_string: String,
726        block_on_shutdown: bool,
727    }
728
729    impl BlockingComponent {
730        fn new() -> Self {
731            BlockingComponent {
732                ctx: ComponentContext::uninitialised(),
733                test_string: "started".to_string(),
734                block_on_shutdown: false,
735            }
736        }
737    }
738
739    impl ComponentLifecycle for BlockingComponent {
740        fn on_kill(&mut self) -> Handled {
741            if self.block_on_shutdown {
742                info!(self.log(), "Cleaning up before shutdown");
743                Handled::block_on(self, move |mut async_self| async move {
744                    async_self.test_string = "done".to_string();
745                    info!(async_self.log(), "Ran BlockMe::OnShutdown future");
746                })
747            } else {
748                Handled::Ok
749            }
750        }
751    }
752
753    impl Actor for BlockingComponent {
754        type Message = BlockMe;
755
756        fn receive_local(&mut self, msg: Self::Message) -> Handled {
757            match msg {
758                BlockMe::Now => {
759                    info!(self.log(), "Got BlockMe::Now");
760                    // async move closure syntax is nightly only
761                    Handled::block_on(self, move |mut async_self| async move {
762                        async_self.test_string = "done".to_string();
763                        info!(async_self.log(), "Ran BlockMe::Now future");
764                    })
765                }
766                BlockMe::OnChannel(receiver) => {
767                    info!(self.log(), "Got BlockMe::OnChannel");
768                    // async move closure syntax is nightly only
769                    Handled::block_on(self, move |mut async_self| async move {
770                        info!(async_self.log(), "Started BlockMe::OnChannel future");
771                        let s = receiver.await;
772                        async_self.test_string = s.expect("Some string");
773                        info!(async_self.log(), "Completed BlockMe::OnChannel future");
774                    })
775                }
776                BlockMe::SpawnOff(s) => {
777                    let handle = self.spawn_off(async move { s });
778                    // async move closure syntax is nightly only
779                    Handled::block_on(self, move |mut async_self| async move {
780                        let res = handle.await.expect("result");
781                        async_self.test_string = res;
782                    })
783                }
784                BlockMe::OnShutdown => {
785                    self.block_on_shutdown = true;
786                    Handled::Ok
787                }
788            }
789        }
790
791        fn receive_network(&mut self, _msg: NetMessage) -> Handled {
792            unimplemented!("No networking here!");
793        }
794    }
795
796    #[test]
797    fn test_immediate_blocking() {
798        let system = KompactConfig::default().build().expect("System");
799        let comp = system.create(BlockingComponent::new);
800        system
801            .start_notify(&comp)
802            .wait_timeout(TIMEOUT)
803            .expect("Component didn't start");
804        comp.actor_ref().tell(BlockMe::Now);
805        thread::sleep(TIMEOUT);
806        system
807            .kill_notify(comp.clone())
808            .wait_timeout(TIMEOUT)
809            .expect("Component didn't die");
810        comp.on_definition(|cd| {
811            assert_eq!(cd.test_string, "done");
812        });
813        system.shutdown().expect("shutdown");
814    }
815
816    #[test]
817    fn test_channel_blocking() {
818        let system = KompactConfig::default().build().expect("System");
819        let comp = system.create(BlockingComponent::new);
820        system
821            .start_notify(&comp)
822            .wait_timeout(TIMEOUT)
823            .expect("Component didn't start");
824
825        let (sender, receiver) = oneshot::channel();
826        comp.actor_ref().tell(BlockMe::OnChannel(receiver));
827        thread::sleep(TIMEOUT);
828        sender.send("gotcha".to_string()).expect("Should have sent");
829        thread::sleep(TIMEOUT);
830        system
831            .kill_notify(comp.clone())
832            .wait_timeout(TIMEOUT)
833            .expect("Component didn't die");
834        comp.on_definition(|cd| {
835            assert_eq!(cd.test_string, "gotcha");
836        });
837        system.shutdown().expect("shutdown");
838    }
839
840    #[test]
841    fn test_mixed_blocking() {
842        let system = KompactConfig::default().build().expect("System");
843        let comp = system.create(BlockingComponent::new);
844        system
845            .start_notify(&comp)
846            .wait_timeout(TIMEOUT)
847            .expect("Component didn't start");
848
849        let (sender, receiver) = oneshot::channel();
850        comp.actor_ref().tell(BlockMe::OnChannel(receiver));
851        thread::sleep(TIMEOUT);
852        comp.actor_ref().tell(BlockMe::Now);
853        sender.send("gotcha".to_string()).expect("Should have sent");
854        thread::sleep(TIMEOUT);
855        system
856            .kill_notify(comp.clone())
857            .wait_timeout(TIMEOUT)
858            .expect("Component didn't die");
859        comp.on_definition(|cd| {
860            assert_eq!(cd.test_string, "done");
861        });
862        system.shutdown().expect("shutdown");
863    }
864
865    #[test]
866    fn test_shutdown_blocking() {
867        let system = KompactConfig::default().build().expect("System");
868        let comp = system.create(BlockingComponent::new);
869        system
870            .start_notify(&comp)
871            .wait_timeout(TIMEOUT)
872            .expect("Component didn't start");
873        comp.actor_ref().tell(BlockMe::OnShutdown);
874        thread::sleep(TIMEOUT);
875        system
876            .kill_notify(comp.clone())
877            .wait_timeout(TIMEOUT)
878            .expect("Component didn't die");
879        comp.on_definition(|cd| {
880            assert_eq!(cd.test_string, "done");
881        });
882        system.shutdown().expect("shutdown");
883    }
884
885    #[test]
886    fn test_component_spawn_off() -> () {
887        let system = KompactConfig::default().build().expect("System");
888        let comp = system.create(BlockingComponent::new);
889        system
890            .start_notify(&comp)
891            .wait_timeout(TIMEOUT)
892            .expect("Component didn't start");
893        comp.actor_ref()
894            .tell(BlockMe::SpawnOff("gotcha".to_string()));
895        thread::sleep(TIMEOUT);
896        system
897            .kill_notify(comp.clone())
898            .wait_timeout(TIMEOUT)
899            .expect("Component didn't die");
900        comp.on_definition(|cd| {
901            assert_eq!(cd.test_string, "gotcha");
902        });
903        system.shutdown().expect("shutdown");
904    }
905
    /// Messages understood by `AsyncComponent`'s actor handler.
    #[derive(Debug)]
    enum AsyncMe {
        /// Spawn a local future that sets `test_string` to `"done"`.
        Now,
        /// Spawn a local future that awaits the receiver and stores the received
        /// string in `test_string`.
        OnChannel(oneshot::Receiver<String>),
        /// Spawn a local future that awaits the receiver, asserts that the received
        /// string equals the current `test_string`, and then sets it to `"done"`.
        ConcurrentMessage(oneshot::Receiver<String>),
        /// Overwrite `test_string` with the carried string directly in the handler.
        JustAMessage(String),
    }
913
914    #[derive(ComponentDefinition)]
915    struct AsyncComponent {
916        ctx: ComponentContext<Self>,
917        test_string: String,
918    }
919
920    impl AsyncComponent {
921        fn new() -> Self {
922            AsyncComponent {
923                ctx: ComponentContext::uninitialised(),
924                test_string: "started".to_string(),
925            }
926        }
927    }
928
929    ignore_lifecycle!(AsyncComponent);
930
931    impl Actor for AsyncComponent {
932        type Message = AsyncMe;
933
934        fn receive_local(&mut self, msg: Self::Message) -> Handled {
935            match msg {
936                AsyncMe::Now => {
937                    info!(self.log(), "Got AsyncMe::Now");
938                    // async move closure syntax is nightly only
939                    self.spawn_local(move |mut async_self| async move {
940                        async_self.test_string = "done".to_string();
941                        info!(async_self.log(), "Ran AsyncMe::Now future");
942                        Handled::Ok
943                    });
944                    Handled::Ok
945                }
946                AsyncMe::OnChannel(receiver) => {
947                    info!(self.log(), "Got AsyncMe::OnChannel");
948                    // async move closure syntax is nightly only
949                    self.spawn_local(move |mut async_self| async move {
950                        info!(async_self.log(), "Started AsyncMe::OnChannel future");
951                        let s = receiver.await;
952                        async_self.test_string = s.expect("Some string");
953                        info!(async_self.log(), "Completed Async::OnChannel future");
954                        Handled::Ok
955                    });
956                    Handled::Ok
957                }
958                AsyncMe::ConcurrentMessage(receiver) => {
959                    info!(self.log(), "Got AsyncMe::OnChannel");
960                    // async move closure syntax is nightly only
961                    self.spawn_local(move |mut async_self| async move {
962                        info!(async_self.log(), "Started AsyncMe::ConcurrentMessag future");
963                        let s = receiver.await.expect("Some string");
964                        info!(
965                            async_self.log(),
966                            "Got message {} as state={}", s, async_self.test_string
967                        );
968                        assert_eq!(
969                            s, async_self.test_string,
970                            "Message was not processed before future!"
971                        );
972                        async_self.test_string = "done".to_string();
973                        info!(
974                            async_self.log(),
975                            "Completed AsyncMe::ConcurrentMessage future with state={}",
976                            async_self.test_string
977                        );
978                        Handled::Ok
979                    });
980                    Handled::Ok
981                }
982                AsyncMe::JustAMessage(s) => {
983                    info!(self.log(), "Got AsyncMe::JustAMessage({})", s);
984                    self.test_string = s;
985                    Handled::Ok
986                }
987            }
988        }
989
990        fn receive_network(&mut self, _msg: NetMessage) -> Handled {
991            unimplemented!("No networking here!");
992        }
993    }
994
995    #[test]
996    fn test_immediate_non_blocking() {
997        let system = KompactConfig::default().build().expect("System");
998        let comp = system.create(AsyncComponent::new);
999        system
1000            .start_notify(&comp)
1001            .wait_timeout(TIMEOUT)
1002            .expect("Component didn't start");
1003        comp.actor_ref().tell(AsyncMe::Now);
1004        thread::sleep(TIMEOUT);
1005        system
1006            .kill_notify(comp.clone())
1007            .wait_timeout(TIMEOUT)
1008            .expect("Component didn't die");
1009        comp.on_definition(|cd| {
1010            assert_eq!(cd.test_string, "done");
1011        });
1012        system.shutdown().expect("shutdown");
1013    }
1014
1015    #[test]
1016    fn test_channel_non_blocking() {
1017        let system = KompactConfig::default().build().expect("System");
1018        let comp = system.create(AsyncComponent::new);
1019        system
1020            .start_notify(&comp)
1021            .wait_timeout(TIMEOUT)
1022            .expect("Component didn't start");
1023
1024        let (sender, receiver) = oneshot::channel();
1025        comp.actor_ref().tell(AsyncMe::OnChannel(receiver));
1026        thread::sleep(TIMEOUT);
1027        sender.send("gotcha".to_string()).expect("Should have sent");
1028        thread::sleep(TIMEOUT);
1029        system
1030            .kill_notify(comp.clone())
1031            .wait_timeout(TIMEOUT)
1032            .expect("Component didn't die");
1033        comp.on_definition(|cd| {
1034            assert_eq!(cd.test_string, "gotcha");
1035        });
1036        system.shutdown().expect("shutdown");
1037    }
1038
1039    #[test]
1040    fn test_concurrent_non_blocking() {
1041        let system = KompactConfig::default().build().expect("System");
1042        let comp = system.create(AsyncComponent::new);
1043        system
1044            .start_notify(&comp)
1045            .wait_timeout(TIMEOUT)
1046            .expect("Component didn't start");
1047
1048        let (sender, receiver) = oneshot::channel();
1049        comp.actor_ref().tell(AsyncMe::ConcurrentMessage(receiver));
1050        thread::sleep(TIMEOUT);
1051        let msg = "gotcha";
1052        comp.actor_ref()
1053            .tell(AsyncMe::JustAMessage(msg.to_string()));
1054        thread::sleep(TIMEOUT);
1055        sender.send(msg.to_string()).expect("Should have sent");
1056        thread::sleep(TIMEOUT);
1057        system
1058            .kill_notify(comp.clone())
1059            .wait_timeout(TIMEOUT)
1060            .expect("Component didn't die");
1061        comp.on_definition(|cd| {
1062            assert_eq!(cd.test_string, "done");
1063        });
1064        system.shutdown().expect("shutdown");
1065    }
1066
    /// Indication of `CounterPort`; triggered by `CountSender` and handled by `Counter`.
    #[derive(Debug, Clone, Copy)]
    struct CountMe;
    /// Request of `CounterPort`; triggered by `Counter` after it handled a `CountMe`.
    #[derive(Debug, Clone, Copy)]
    struct Counted;
    /// Actor message that makes `CountSender` trigger a `CountMe` on its port.
    #[derive(Debug, Clone, Copy)]
    struct SendCount;
1073
    /// Port type used by the channel tests: `CountMe` is its indication and
    /// `Counted` is its request.
    struct CounterPort;
    impl Port for CounterPort {
        type Indication = CountMe;
        type Request = Counted;
    }
1079
1080    #[derive(ComponentDefinition)]
1081    struct CountSender {
1082        ctx: ComponentContext<Self>,
1083        count_port: ProvidedPort<CounterPort>,
1084        counted: usize,
1085    }
1086    impl Default for CountSender {
1087        fn default() -> Self {
1088            CountSender {
1089                ctx: ComponentContext::uninitialised(),
1090                count_port: ProvidedPort::uninitialised(),
1091                counted: 0,
1092            }
1093        }
1094    }
1095    ignore_lifecycle!(CountSender);
1096    impl Provide<CounterPort> for CountSender {
1097        fn handle(&mut self, _event: Counted) -> Handled {
1098            self.counted += 1;
1099            Handled::Ok
1100        }
1101    }
1102    impl Actor for CountSender {
1103        type Message = SendCount;
1104
1105        fn receive_local(&mut self, _msg: Self::Message) -> Handled {
1106            self.count_port.trigger(CountMe);
1107            Handled::Ok
1108        }
1109
1110        fn receive_network(&mut self, _msg: NetMessage) -> Handled {
1111            unimplemented!("No networking in this test");
1112        }
1113    }
1114
1115    #[derive(ComponentDefinition)]
1116    struct Counter {
1117        ctx: ComponentContext<Self>,
1118        count_port: RequiredPort<CounterPort>,
1119        count: usize,
1120    }
1121    impl Default for Counter {
1122        fn default() -> Self {
1123            Counter {
1124                ctx: ComponentContext::uninitialised(),
1125                count_port: RequiredPort::uninitialised(),
1126                count: 0,
1127            }
1128        }
1129    }
1130    ignore_lifecycle!(Counter);
1131    impl Require<CounterPort> for Counter {
1132        fn handle(&mut self, _event: CountMe) -> Handled {
1133            self.count += 1;
1134            self.count_port.trigger(Counted);
1135            Handled::Ok
1136        }
1137    }
1138    impl Actor for Counter {
1139        type Message = Never;
1140
1141        fn receive_local(&mut self, _msg: Self::Message) -> Handled {
1142            unreachable!("Never type is empty")
1143        }
1144
1145        fn receive_network(&mut self, _msg: NetMessage) -> Handled {
1146            unimplemented!("No networking in this test");
1147        }
1148    }
1149
1150    #[test]
1151    fn test_channel_disconnection() {
1152        let system = KompactConfig::default().build().expect("System");
1153        let sender = system.create(CountSender::default);
1154
1155        let counter1 = system.create(Counter::default);
1156        let counter2 = system.create(Counter::default);
1157
1158        let channel1: Box<(dyn Channel + Send + 'static)> =
1159            biconnect_components::<CounterPort, _, _>(&sender, &counter1)
1160                .expect("connection")
1161                .boxed();
1162        let channel2 =
1163            biconnect_components::<CounterPort, _, _>(&sender, &counter2).expect("connection");
1164
1165        let start_all = || {
1166            let sender_start_f = system.start_notify(&sender);
1167            let counter1_start_f = system.start_notify(&counter1);
1168            let counter2_start_f = system.start_notify(&counter2);
1169
1170            sender_start_f
1171                .wait_timeout(TIMEOUT)
1172                .expect("sender started");
1173            counter1_start_f
1174                .wait_timeout(TIMEOUT)
1175                .expect("counter1 started");
1176            counter2_start_f
1177                .wait_timeout(TIMEOUT)
1178                .expect("counter2 started");
1179        };
1180        start_all();
1181
1182        sender.actor_ref().tell(SendCount);
1183
1184        thread::sleep(TIMEOUT);
1185
1186        let stop_all = || {
1187            let sender_stop_f = system.stop_notify(&sender);
1188            let counter1_stop_f = system.stop_notify(&counter1);
1189            let counter2_stop_f = system.stop_notify(&counter2);
1190
1191            sender_stop_f.wait_timeout(TIMEOUT).expect("sender stopped");
1192            counter1_stop_f
1193                .wait_timeout(TIMEOUT)
1194                .expect("counter1 stopped");
1195            counter2_stop_f
1196                .wait_timeout(TIMEOUT)
1197                .expect("counter2 stopped");
1198        };
1199        stop_all();
1200
1201        let check_counts = |sender_expected, counter1_expected, counter2_expected| {
1202            let sender_count = sender.on_definition(|cd| cd.counted);
1203            assert_eq!(sender_expected, sender_count);
1204            let counter1_count = counter1.on_definition(|cd| cd.count);
1205            assert_eq!(counter1_expected, counter1_count);
1206            let counter2_count = counter2.on_definition(|cd| cd.count);
1207            assert_eq!(counter2_expected, counter2_count);
1208        };
1209        check_counts(2, 1, 1);
1210
1211        channel2.disconnect().expect("disconnect");
1212
1213        start_all();
1214
1215        sender.actor_ref().tell(SendCount);
1216
1217        thread::sleep(TIMEOUT);
1218
1219        stop_all();
1220
1221        check_counts(3, 2, 1);
1222
1223        channel1.disconnect().expect("disconnect");
1224
1225        start_all();
1226
1227        sender.actor_ref().tell(SendCount);
1228
1229        thread::sleep(TIMEOUT);
1230
1231        stop_all();
1232
1233        check_counts(3, 2, 1);
1234
1235        let sender_port: ProvidedRef<CounterPort> = sender.provided_ref();
1236        let counter1_port: RequiredRef<CounterPort> = counter1.required_ref();
1237        let channel1: Box<(dyn Channel + Send + 'static)> =
1238            sender.connect_to_required(counter1_port).boxed();
1239        let channel2 = counter2.connect_to_provided(sender_port);
1240
1241        start_all();
1242
1243        sender.actor_ref().tell(SendCount);
1244
1245        thread::sleep(TIMEOUT);
1246
1247        stop_all();
1248
1249        check_counts(3, 3, 1);
1250
1251        let counter2_port: RequiredRef<CounterPort> = counter2.required_ref();
1252        let channel3 = sender.connect_to_required(counter2_port);
1253
1254        start_all();
1255
1256        sender.actor_ref().tell(SendCount);
1257
1258        thread::sleep(TIMEOUT);
1259
1260        stop_all();
1261
1262        check_counts(4, 4, 2);
1263
1264        channel1.disconnect().expect("disconnected");
1265        channel2.disconnect().expect("disconnected");
1266        channel3.disconnect().expect("disconnected");
1267
1268        start_all();
1269
1270        sender.actor_ref().tell(SendCount);
1271
1272        thread::sleep(TIMEOUT);
1273
1274        stop_all();
1275
1276        check_counts(4, 4, 2);
1277
1278        system.shutdown().expect("shutdown");
1279    }
1280}