Trait kompact::runtime::SystemHandle

pub trait SystemHandle: Dispatching {
    fn create<C, F>(&self, f: F) -> Arc<Component<C>>
    where
        F: FnOnce() -> C,
        C: ComponentDefinition + 'static
;
fn register<C>(
        &self,
        c: &Arc<Component<C>>,
        reply_to: &dyn Receiver<RegistrationResponse>
    ) -> RegistrationId
    where
        C: ComponentDefinition + 'static
;
fn register_without_response<C>(&self, c: &Arc<Component<C>>)
    where
        C: ComponentDefinition + 'static
;
fn register_by_alias<C, A>(
        &self,
        c: &Arc<Component<C>>,
        alias: A,
        reply_to: &dyn Receiver<RegistrationResponse>
    ) -> RegistrationId
    where
        C: ComponentDefinition + 'static,
        A: Into<String>
;
fn register_by_alias_without_response<C, A>(
        &self,
        c: &Arc<Component<C>>,
        alias: A
    )
    where
        C: ComponentDefinition + 'static,
        A: Into<String>
;
fn update_alias_registration<C, A>(
        &self,
        c: &Arc<Component<C>>,
        alias: A,
        reply_to: &dyn Receiver<RegistrationResponse>
    ) -> RegistrationId
    where
        C: ComponentDefinition + 'static,
        A: Into<String>
;
fn update_alias_registration_without_response<C, A>(
        &self,
        c: &Arc<Component<C>>,
        alias: A
    )
    where
        C: ComponentDefinition + 'static,
        A: Into<String>
;
fn start<C>(&self, c: &Arc<Component<C>>)
    where
        C: ComponentDefinition + 'static
;
fn stop<C>(&self, c: &Arc<Component<C>>)
    where
        C: ComponentDefinition + 'static
;
fn kill<C>(&self, c: Arc<Component<C>>)
    where
        C: ComponentDefinition + 'static
;
fn throughput(&self) -> usize;
fn max_messages(&self) -> usize;
fn shutdown_async(&self);
fn system_path(&self) -> SystemPath;
fn deadletter_ref(&self) -> ActorRef<Never>; }

A limited version of a KompactSystem

This is meant for use from within components, where all the blocking APIs are unacceptable anyway.

Required methods

fn create<C, F>(&self, f: F) -> Arc<Component<C>> where
    F: FnOnce() -> C,
    C: ComponentDefinition + 'static, 

Create a new component

Uses f to create an instance of a ComponentDefinition, which is then initialised to form a Component. Since components are shared between threads, the created component is wrapped into an Arc.

Newly created components are not started automatically. Use start or start_notify to start a newly created component, once it is connected properly.

If you need to address this component via the network, see the register function.

Example

let c = system.create(TestComponent1::new);

fn register<C>(
    &self,
    c: &Arc<Component<C>>,
    reply_to: &dyn Receiver<RegistrationResponse>
) -> RegistrationId where
    C: ComponentDefinition + 'static, 

Attempts to register c with the dispatcher using its unique id

The returned id can be used to match a later response message which will contain the unique id ActorPath for the given component, once the registration is completed by the dispatcher.

Once the response message is received, the component can be addressed via the network, even if it has not been started yet (in which case messages will simply be queued up).

Example

use kompact::prelude::*;
use std::time::Duration;
use std::sync::Arc;

#[derive(ComponentDefinition)]
struct ParentComponent {
    ctx: ComponentContext<Self>,
    child: Option<Arc<Component<TestComponent1>>>,
    reg_id: Option<RegistrationId>,
}
impl ParentComponent {
    fn new() -> Self {
        ParentComponent {
            ctx: ComponentContext::new(),
            child: None,
            reg_id: None,
        }
    }
}
impl Provide<ControlPort> for ParentComponent {
   fn handle(&mut self, event: ControlEvent) -> () {
        match event {
            ControlEvent::Start => {
                let child = self.ctx.system().create(TestComponent1::new);
                let id = self.ctx.system().register(&child, self);
                self.reg_id = Some(id);
                self.child = Some(child);
            }
            ControlEvent::Stop | ControlEvent::Kill => {
                let _ = self.child.take(); // don't hang on to the child
            }
        }
    }
}
impl Actor for ParentComponent {
    type Message = RegistrationResponse;

    fn receive_local(&mut self, msg: Self::Message) -> () {
        assert_eq!(msg.id, self.reg_id.take().unwrap());
        info!(self.log(), "Child was registered");
        if let Some(ref child) = self.child {
            self.ctx.system().start(child);
            let path = msg.result.expect("actor path");
            path.tell((), self); // can send it messages via its path now
        } else {
            unreachable!("Wouldn't have asked for registration without storing the child");
        }
    }

    fn receive_network(&mut self, _msg: NetMessage) -> () {
        unimplemented!("unused");
    }
}

let mut cfg = KompactConfig::new();
cfg.system_components(DeadletterBox::new, NetworkConfig::default().build());
let system = cfg.build().expect("KompactSystem");
let parent = system.create(ParentComponent::new);

fn register_without_response<C>(&self, c: &Arc<Component<C>>) where
    C: ComponentDefinition + 'static, 

Attempts to register c with the dispatcher using its unique id

Same as register except requesting that no response be sent.

fn register_by_alias<C, A>(
    &self,
    c: &Arc<Component<C>>,
    alias: A,
    reply_to: &dyn Receiver<RegistrationResponse>
) -> RegistrationId where
    C: ComponentDefinition + 'static,
    A: Into<String>, 

Attempts to register the provided component with a human-readable alias

The returned id can be used to match a later response message which will contain the named ActorPath for the given alias, once the registration is completed by the dispatcher.

Alias registration will fail if a previous registration already exists. Use update_alias_registration to override an existing registration.

Note

While aliases are easier to read, lookup by unique ids is significantly more efficient. However, named aliases allow services to be taken over by another component when the original registrant failed, something that is not possible with unique paths. Thus, this kind of addressing lends itself to lookup-service style components, for example.

Example

use kompact::prelude::*;
use std::time::Duration;
use std::sync::Arc;

#[derive(ComponentDefinition)]
struct ParentComponent {
    ctx: ComponentContext<Self>,
    child: Option<Arc<Component<TestComponent1>>>,
    reg_id: Option<RegistrationId>,
}
impl ParentComponent {
    fn new() -> Self {
        ParentComponent {
            ctx: ComponentContext::new(),
            child: None,
            reg_id: None,
        }
    }
}
impl Provide<ControlPort> for ParentComponent {
   fn handle(&mut self, event: ControlEvent) -> () {
        match event {
            ControlEvent::Start => {
                let child = self.ctx.system().create(TestComponent1::new);
                let id = self.ctx.system().register_by_alias(&child, "test", self);
                self.reg_id = Some(id);
                self.child = Some(child);
            }
            ControlEvent::Stop | ControlEvent::Kill => {
                let _ = self.child.take(); // don't hang on to the child
            }
        }
    }
}
impl Actor for ParentComponent {
    type Message = RegistrationResponse;

    fn receive_local(&mut self, msg: Self::Message) -> () {
        assert_eq!(msg.id, self.reg_id.take().unwrap());
        info!(self.log(), "Child was registered");
        if let Some(ref child) = self.child {
            self.ctx.system().start(child);
            let path = msg.result.expect("actor path");
            path.tell((), self); // can send it messages via its path now
        } else {
            unreachable!("Wouldn't have asked for registration without storing the child");
        }
    }

    fn receive_network(&mut self, _msg: NetMessage) -> () {
        unimplemented!("unused");
    }
}

let mut cfg = KompactConfig::new();
cfg.system_components(DeadletterBox::new, NetworkConfig::default().build());
let system = cfg.build().expect("KompactSystem");
let parent = system.create(ParentComponent::new);

fn register_by_alias_without_response<C, A>(
    &self,
    c: &Arc<Component<C>>,
    alias: A
) where
    C: ComponentDefinition + 'static,
    A: Into<String>, 

Attempts to register the provided component with a human-readable alias

Same as register_by_alias except requesting that no response be sent.

fn update_alias_registration<C, A>(
    &self,
    c: &Arc<Component<C>>,
    alias: A,
    reply_to: &dyn Receiver<RegistrationResponse>
) -> RegistrationId where
    C: ComponentDefinition + 'static,
    A: Into<String>, 

Attempts to register the provided component with a human-readable alias.

The returned id can be used to match a later response message which will contain the named ActorPath for the given alias, once the registration is completed by the dispatcher.

This registration will replace any previous registration, if it exists.

Note

While aliases are easier to read, lookup by unique ids is significantly more efficient. However, named aliases allow services to be taken over by another component when the original registrant failed, something that is not possible with unique paths. Thus, this kind of addressing lends itself to lookup-service style components, for example.

Example

use kompact::prelude::*;
use std::time::Duration;
use std::sync::Arc;

#[derive(ComponentDefinition)]
struct ParentComponent {
    ctx: ComponentContext<Self>,
    child: Option<Arc<Component<TestComponent1>>>,
    reg_id: Option<RegistrationId>,
}
impl ParentComponent {
    fn new() -> Self {
        ParentComponent {
            ctx: ComponentContext::new(),
            child: None,
            reg_id: None,
        }
    }
}
impl Provide<ControlPort> for ParentComponent {
   fn handle(&mut self, event: ControlEvent) -> () {
        match event {
            ControlEvent::Start => {
                let child = self.ctx.system().create(TestComponent1::new);
                let id = self.ctx.system().update_alias_registration(&child, "test", self);
                self.reg_id = Some(id);
                self.child = Some(child);
            }
            ControlEvent::Stop | ControlEvent::Kill => {
                let _ = self.child.take(); // don't hang on to the child
            }
        }
    }
}
impl Actor for ParentComponent {
    type Message = RegistrationResponse;

    fn receive_local(&mut self, msg: Self::Message) -> () {
        assert_eq!(msg.id, self.reg_id.take().unwrap());
        info!(self.log(), "Child was registered");
        if let Some(ref child) = self.child {
            self.ctx.system().start(child);
            let path = msg.result.expect("actor path");
            path.tell((), self); // can send it messages via its path now
        } else {
            unreachable!("Wouldn't have asked for registration without storing the child");
        }
    }

    fn receive_network(&mut self, _msg: NetMessage) -> () {
        unimplemented!("unused");
    }
}

let mut cfg = KompactConfig::new();
cfg.system_components(DeadletterBox::new, NetworkConfig::default().build());
let system = cfg.build().expect("KompactSystem");
let parent = system.create(ParentComponent::new);

fn update_alias_registration_without_response<C, A>(
    &self,
    c: &Arc<Component<C>>,
    alias: A
) where
    C: ComponentDefinition + 'static,
    A: Into<String>, 

Attempts to register the provided component with a human-readable alias

Same as update_alias_registration except requesting that no response be sent.

fn start<C>(&self, c: &Arc<Component<C>>) where
    C: ComponentDefinition + 'static, 

Start a component

A component only handles events/messages once it is started. In particular, a component that isn't started shouldn't be scheduled and thus access to its definition should always succeed, for example via on_definition.

Example

let c = system.create(TestComponent1::new);
system.start(&c);

fn stop<C>(&self, c: &Arc<Component<C>>) where
    C: ComponentDefinition + 'static, 

Stop a component

A component does not handle any events/messages while it is stopped, but it does not get deallocated either. It can be started again later with start or start_notify.

A component that is stopped shouldn't be scheduled and thus access to its definition should always succeed, for example via on_definition.

Example

use std::time::Duration;
let c = system.create(TestComponent1::new);
system.start_notify(&c)
      .wait_timeout(Duration::from_millis(1000))
      .expect("TestComponent1 never started!");
system.stop(&c);

fn kill<C>(&self, c: Arc<Component<C>>) where
    C: ComponentDefinition + 'static, 

Stop and deallocate a component

The supervisor will attempt to deallocate c once it is stopped. However, if there are still outstanding references somewhere else in the system this will fail, of course. In that case the supervisor leaves a debug message in the logging output, so that this circumstance can be discovered if necessary.

Example

use std::time::Duration;
let c = system.create(TestComponent1::new);
system.start_notify(&c)
      .wait_timeout(Duration::from_millis(1000))
      .expect("TestComponent1 never started!");
system.kill(c);

fn throughput(&self) -> usize

Return the configured throughput value

See also throughput.

fn max_messages(&self) -> usize

Return the configured maximum number of messages per scheduling

This value is based on throughput and msg_priority.

fn shutdown_async(&self)

Shutdown the Kompact system from within a component

Stops all components and then stops the scheduler.

This function may fail to stop in time (or at all), if components hang on to scheduler threads indefinitely.

Example

use kompact::prelude::*;

#[derive(ComponentDefinition, Actor)]
struct Stopper {
   ctx: ComponentContext<Self>,
}
impl Stopper {
    fn new() -> Stopper {
        Stopper {
            ctx: ComponentContext::new(),
        }
    }    
}
impl Provide<ControlPort> for Stopper {
   fn handle(&mut self, event: ControlEvent) -> () {
        match event {
           ControlEvent::Start => {
               self.ctx().system().shutdown_async();
           }
           _ => (), // ignore
       }
    }
}
let system = KompactConfig::default().build().expect("system");
let c = system.create(Stopper::new);
system.start(&c);
system.await_termination();

fn system_path(&self) -> SystemPath

Return the system path of this Kompact system

The system path forms a prefix for every ActorPath.

fn deadletter_ref(&self) -> ActorRef<Never>

Returns a reference to the system's deadletter box

Loading content...

Implementors

Loading content...