Struct kompact::runtime::KompactSystem
A Kompact system is a collection of components and services.
An instance of KompactSystem is created from a KompactConfig via the build function.
It is possible to run more than one Kompact system in a single process. This allows different settings to be used for different component groups, for example. It can also be used for testing communication in unit or integration tests.
Note
With some schedulers, components may switch from one system's scheduler to the other's when multiple systems run in the same process and communicate with each other via channels or actor references.
Generally, this shouldn't be an issue, but it can invalidate assumptions about thread assignments, so it's important to be aware of. If this behaviour must be avoided at all costs, either use a scheduler that doesn't use thread-local variables to determine the target queue (e.g., crossbeam_channel_pool), or limit cross-system communication to the network only, incurring the associated serialisation/deserialisation costs.
Example
Build a system with default settings with:
    use kompact::prelude::*;

    let system = KompactConfig::default().build().expect("system");
Methods
impl KompactSystem
pub fn logger(&self) -> &KompactLogger
Get a reference to the system-wide Kompact logger
Example
info!(system.logger(), "Hello World from the system logger!");
pub fn config(&self) -> &Hocon
Get a reference to the system configuration
Use load_config_str or load_config_file to load values into the config object.
Example
    use kompact::prelude::*;

    let default_values = r#"{ a = 7 }"#;
    let mut conf = KompactConfig::default();
    conf.load_config_str(default_values);
    let system = conf.build().expect("system");
    assert_eq!(Some(7i64), system.config()["a"].as_i64());
pub fn config_owned(&self) -> Arc<Hocon>
Get an owned reference to the system configuration
pub fn create<C, F>(&self, f: F) -> Arc<Component<C>> where
F: FnOnce() -> C,
C: ComponentDefinition + 'static,
Create a new component
Uses f to create an instance of a ComponentDefinition, which is then initialised to form a Component. Since components are shared between threads, the created component is wrapped into an Arc.
Newly created components are not started automatically. Use start or start_notify to start a newly created component once it is connected properly.
If you need to address this component via the network, see the register function.
Example
let c = system.create(TestComponent1::new);
pub fn create_unsupervised<C, F>(&self, f: F) -> Arc<Component<C>> where
F: FnOnce() -> C,
C: ComponentDefinition + 'static,
Create a new system component
You must use this instead of create to create a new system component. During system initialisation the supervisor is not yet available, so normal create calls will panic!
pub fn create_dedicated<C, F>(&self, f: F) -> Arc<Component<C>> where
F: FnOnce() -> C,
C: ComponentDefinition + 'static,
Create a new component, which runs on its own dedicated thread.
Uses f to create an instance of a ComponentDefinition, which is then initialised to form a Component. Since components are shared between threads, the created component is wrapped into an Arc.
A dedicated thread is assigned to this component, which sleeps when the component has no work.
Newly created components are not started automatically. Use start or start_notify to start a newly created component once it is connected properly.
If you need to address this component via the network, see the register function.
pub fn create_dedicated_unsupervised<C, F>(&self, f: F) -> Arc<Component<C>> where
F: FnOnce() -> C,
C: ComponentDefinition + 'static,
Create a new system component, which runs on its own dedicated thread.
A dedicated thread is assigned to this component, which sleeps when the component has no work.
You must use this instead of create_dedicated to create a new system component on its own thread. During system initialisation the supervisor is not yet available, so normal create calls will panic!
pub fn register<C>(&self, c: &Arc<Component<C>>) -> Future<RegistrationResult> where
C: ComponentDefinition + 'static,
Attempts to register c with the dispatcher using its unique id.
The returned future will contain the unique id ActorPath for the given component, once it is completed by the dispatcher.
Once the future completes, the component can be addressed via the network, even if it has not been started yet (in which case messages will simply be queued up).
Example
    use kompact::prelude::*;
    use std::time::Duration;

    let mut cfg = KompactConfig::new();
    cfg.system_components(DeadletterBox::new, {
        let net_config = NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
        net_config.build()
    });
    let system = cfg.build().expect("KompactSystem");
    let c = system.create(TestComponent1::new);
    system.register(&c).wait_expect(Duration::from_millis(1000), "Failed to register TestComponent1");
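The statement that a registered but not yet started component simply has its messages queued up can be pictured with a small toy model. This is only a sketch using the standard library; `ToyComponent`, `deliver` and `drain` are hypothetical names and do not reflect how Kompact implements mailboxes or scheduling.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a registered component; not a Kompact type.
struct ToyComponent {
    started: bool,
    mailbox: VecDeque<String>,
    handled: Vec<String>,
}

impl ToyComponent {
    fn new() -> Self {
        ToyComponent {
            started: false,
            mailbox: VecDeque::new(),
            handled: Vec::new(),
        }
    }

    /// Accept a message: it is always enqueued, and only processed
    /// right away if the component has already been started.
    fn deliver(&mut self, msg: &str) {
        self.mailbox.push_back(msg.to_string());
        if self.started {
            self.drain();
        }
    }

    /// Start the component and work through everything queued so far.
    fn start(&mut self) {
        self.started = true;
        self.drain();
    }

    /// Move all queued messages, in arrival order, into the handled list.
    fn drain(&mut self) {
        while let Some(msg) = self.mailbox.pop_front() {
            self.handled.push(msg);
        }
    }
}

fn main() {
    let mut c = ToyComponent::new();
    c.deliver("a");
    c.deliver("b");
    // Nothing is handled before start; both messages wait in the mailbox.
    assert!(c.handled.is_empty());
    assert_eq!(c.mailbox.len(), 2);
    c.start();
    // After start, the queued messages are handled in order.
    assert_eq!(c.handled, vec!["a", "b"]);
}
```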
pub fn create_and_register<C, F>(
&self,
f: F
) -> (Arc<Component<C>>, Future<RegistrationResult>) where
F: FnOnce() -> C,
C: ComponentDefinition + 'static,
Creates a new component and registers it with the dispatcher
This function is simply a convenience shortcut for create followed by register, as this combination is very common in networked Kompact systems.
Example
    use kompact::prelude::*;
    use std::time::Duration;

    let mut cfg = KompactConfig::new();
    cfg.system_components(DeadletterBox::new, {
        let net_config = NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
        net_config.build()
    });
    let system = cfg.build().expect("KompactSystem");
    let (c, registration_future) = system.create_and_register(TestComponent1::new);
    registration_future.wait_expect(Duration::from_millis(1000), "Failed to register TestComponent1");
pub fn register_by_alias<C, A>(
&self,
c: &Arc<Component<C>>,
alias: A
) -> Future<RegistrationResult> where
C: ComponentDefinition + 'static,
A: Into<String>,
Attempts to register the provided component with a human-readable alias.
The returned future will contain the named ActorPath for the given alias, once it is completed by the dispatcher.
Alias registration will fail if a previous registration already exists. Use update_alias_registration to override an existing registration.
Note
While aliases are easier to read, lookup by unique ids is significantly more efficient. However, named aliases allow services to be taken over by another component when the original registrant failed, something that is not possible with unique paths. Thus, this kind of addressing lends itself to lookup-service style components, for example.
Example
    use kompact::prelude::*;
    use std::time::Duration;

    let mut cfg = KompactConfig::new();
    cfg.system_components(DeadletterBox::new, {
        let net_config = NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
        net_config.build()
    });
    let system = cfg.build().expect("KompactSystem");
    let (c, unique_registration_future) = system.create_and_register(TestComponent1::new);
    unique_registration_future.wait_expect(Duration::from_millis(1000), "Failed to register TestComponent1");
    let alias_registration_future = system.register_by_alias(&c, "test");
    alias_registration_future.wait_expect(Duration::from_millis(1000), "Failed to register TestComponent1 by alias");
pub fn update_alias_registration<C, A>(
&self,
c: &Arc<Component<C>>,
alias: A
) -> Future<RegistrationResult> where
C: ComponentDefinition + 'static,
A: Into<String>,
Attempts to register the provided component with a human-readable alias.
The returned future will contain the named ActorPath for the given alias, once it is completed by the dispatcher.
This registration will replace any previous registration, if it exists.
Note
While aliases are easier to read, lookup by unique ids is significantly more efficient. However, named aliases allow services to be taken over by another component when the original registrant failed, something that is not possible with unique paths. Thus, this kind of addressing lends itself to lookup-service style components, for example.
Example
    use kompact::prelude::*;
    use std::time::Duration;

    let mut cfg = KompactConfig::new();
    cfg.system_components(DeadletterBox::new, {
        let net_config = NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
        net_config.build()
    });
    let system = cfg.build().expect("KompactSystem");
    let (c, unique_registration_future) = system.create_and_register(TestComponent1::new);
    unique_registration_future.wait_expect(Duration::from_millis(1000), "Failed to register TestComponent1");
    let alias_registration_future = system.update_alias_registration(&c, "test");
    alias_registration_future.wait_expect(Duration::from_millis(1000), "Failed to register TestComponent1 by alias");
    let alias_reregistration_future = system.update_alias_registration(&c, "test");
    alias_reregistration_future.wait_expect(Duration::from_millis(1000), "Failed to override TestComponent1 registration by alias");
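The difference between register_by_alias (fails if the alias is taken) and update_alias_registration (replaces any previous registration) can be illustrated with a toy alias table. This is a sketch of the documented semantics only, not of the dispatcher's implementation; `ToyAliasTable`, `lookup` and the use of a plain `u64` as a component id are assumptions made for the illustration.

```rust
use std::collections::HashMap;

/// Toy alias table; a `u64` stands in for a component's unique id.
#[derive(Default)]
struct ToyAliasTable {
    aliases: HashMap<String, u64>,
}

impl ToyAliasTable {
    /// Models the documented `register_by_alias` behaviour:
    /// registration fails if the alias already exists.
    fn register_by_alias(&mut self, id: u64, alias: &str) -> Result<(), String> {
        if self.aliases.contains_key(alias) {
            return Err(format!("alias '{}' is already registered", alias));
        }
        self.aliases.insert(alias.to_string(), id);
        Ok(())
    }

    /// Models the documented `update_alias_registration` behaviour:
    /// any previous registration is replaced. Returns the previous id, if any.
    fn update_alias_registration(&mut self, id: u64, alias: &str) -> Option<u64> {
        self.aliases.insert(alias.to_string(), id)
    }

    /// Resolve an alias to the id currently registered under it.
    fn lookup(&self, alias: &str) -> Option<u64> {
        self.aliases.get(alias).copied()
    }
}

fn main() {
    let mut table = ToyAliasTable::default();
    assert!(table.register_by_alias(1, "test").is_ok());
    // A second plain registration under the same alias is rejected.
    assert!(table.register_by_alias(2, "test").is_err());
    assert_eq!(table.lookup("test"), Some(1));
    // An update lets component 2 take over the alias from component 1.
    assert_eq!(table.update_alias_registration(2, "test"), Some(1));
    assert_eq!(table.lookup("test"), Some(2));
}
```

The takeover in the last two lines is the property the note describes: the alias stays stable while the component behind it changes, which a unique id path cannot offer.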
pub fn start<C>(&self, c: &Arc<Component<C>>) where
C: ComponentDefinition + 'static,
Start a component
A component only handles events/messages once it is started. In particular, a component that isn't started shouldn't be scheduled and thus access to its definition should always succeed, for example via on_definition.
Example
    let c = system.create(TestComponent1::new);
    system.start(&c);
pub fn start_notify<C>(&self, c: &Arc<Component<C>>) -> Future<()> where
C: ComponentDefinition + 'static,
Start a component and complete a future once it has started
When the returned future completes, the component is guaranteed to have started. However, it is not guaranteed to be in an active state, as it could already have been stopped or could have failed since.
A component only handles events/messages once it is started. In particular, a component that isn't started shouldn't be scheduled and thus access to its definition should always succeed, for example via on_definition.
Example
    use std::time::Duration;

    let c = system.create(TestComponent1::new);
    system.start_notify(&c)
        .wait_timeout(Duration::from_millis(1000))
        .expect("TestComponent1 never started!");
pub fn stop<C>(&self, c: &Arc<Component<C>>) where
C: ComponentDefinition + 'static,
Stop a component
A component does not handle any events/messages while it is stopped, but it does not get deallocated either. It can be started again later with start or start_notify.
A component that is stopped shouldn't be scheduled and thus access to its definition should always succeed, for example via on_definition.
Example
    use std::time::Duration;

    let c = system.create(TestComponent1::new);
    system.start_notify(&c)
        .wait_timeout(Duration::from_millis(1000))
        .expect("TestComponent1 never started!");
    system.stop(&c);
pub fn stop_notify<C>(&self, c: &Arc<Component<C>>) -> Future<()> where
C: ComponentDefinition + 'static,
Stop a component and complete a future once it has stopped
When the returned future completes, the component is guaranteed to have stopped. However, it is not guaranteed to be in a passive state, as it could already have been started again since.
A component does not handle any events/messages while it is stopped, but it does not get deallocated either. It can be started again later with start or start_notify.
A component that is stopped shouldn't be scheduled and thus access to its definition should always succeed, for example via on_definition.
Example
    use std::time::Duration;

    let c = system.create(TestComponent1::new);
    system.start_notify(&c)
        .wait_timeout(Duration::from_millis(1000))
        .expect("TestComponent1 never started!");
    system.stop_notify(&c)
        .wait_timeout(Duration::from_millis(1000))
        .expect("TestComponent1 never stopped!");
    system.start_notify(&c)
        .wait_timeout(Duration::from_millis(1000))
        .expect("TestComponent1 never re-started!");
pub fn kill<C>(&self, c: Arc<Component<C>>) where
C: ComponentDefinition + 'static,
Stop and deallocate a component
The supervisor will attempt to deallocate c once it is stopped. However, if there are still outstanding references somewhere else in the system, this will fail. In that case the supervisor leaves a debug message in the logging output, so that this circumstance can be discovered if necessary.
Example
    use std::time::Duration;

    let c = system.create(TestComponent1::new);
    system.start_notify(&c)
        .wait_timeout(Duration::from_millis(1000))
        .expect("TestComponent1 never started!");
    system.kill(c);
pub fn kill_notify<C>(&self, c: Arc<Component<C>>) -> Future<()> where
C: ComponentDefinition + 'static,
Stop and deallocate a component, and complete a future once it has stopped
The supervisor will attempt to deallocate c once it is stopped. However, if there are still outstanding references somewhere else in the system, this will fail. In that case the supervisor leaves a debug message in the logging output, so that this circumstance can be discovered if necessary.
Note
The completion of the future indicates that the component has been stopped, not that it has been deallocated. If, for some reason, you really need to know when it has been deallocated, you need to hold on to a copy of the component, use try_unwrap and then call drop once you are successful.
Example
    use std::time::Duration;

    let c = system.create(TestComponent1::new);
    system.start_notify(&c)
        .wait_timeout(Duration::from_millis(1000))
        .expect("TestComponent1 never started!");
    system.kill_notify(c)
        .wait_timeout(Duration::from_millis(1000))
        .expect("TestComponent1 never stopped!");
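The note above about observing deallocation relies on `Arc::try_unwrap` from the standard library, which only succeeds once the caller holds the last strong reference. The following is a minimal sketch of that retry pattern using a plain `Arc<String>` in place of a component; the helper name `drop_when_unique`, the `max_attempts` parameter and the 10 ms back-off are assumptions for illustration and are not part of Kompact.

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Retry `Arc::try_unwrap` until `handle` is the last strong reference,
/// then drop the inner value. Returns the number of failed attempts,
/// or hands the `Arc` back if it is still shared after `max_attempts`.
fn drop_when_unique<T>(mut handle: Arc<T>, max_attempts: usize) -> Result<usize, Arc<T>> {
    for attempt in 0..max_attempts {
        match Arc::try_unwrap(handle) {
            Ok(value) => {
                // We own the value now; dropping it here is the deallocation.
                drop(value);
                return Ok(attempt);
            }
            Err(still_shared) => {
                // Someone else still holds a reference; back off and retry.
                handle = still_shared;
                thread::sleep(Duration::from_millis(10));
            }
        }
    }
    Err(handle)
}

fn main() {
    // A String stands in for the component we kept a copy of.
    let component = Arc::new(String::from("stand-in for a component"));
    let other_ref = Arc::clone(&component);

    // Another holder releases its reference a little later.
    let holder = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        drop(other_ref);
    });

    let failed = drop_when_unique(component, 100).expect("still shared");
    holder.join().unwrap();
    println!("deallocated after {} failed attempts", failed);
}
```

Note that holding your own copy also means the supervisor's own deallocation attempt cannot succeed, so the final drop happens in your code rather than in the supervisor.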
pub fn trigger_i<P>(&self, event: P::Indication, port: &RequiredRef<P>) where
P: Port + 'static,
Trigger an indication event on a shared port
This can be used to send events to a component without connecting a channel. You only need to acquire a port reference via share.
pub fn trigger_r<P>(&self, msg: P::Request, port: &ProvidedRef<P>) where
P: Port + 'static,
Trigger a request event on a shared port
This can be used to send events to a component without connecting a channel. You only need to acquire a port reference via share.
pub fn throughput(&self) -> usize
Return the configured throughput value
See also throughput.
pub fn max_messages(&self) -> usize
Return the configured maximum number of messages per scheduling
This value is based on throughput and msg_priority.
pub fn await_termination(self)
Wait for the Kompact system to be terminated
Suspends this thread until the system is terminated from some other thread, for example one from its own thread pool.
Note
Don't use this method for any measurements, as its implementation currently does not include any notification logic. It simply checks its internal state every second or so, so there might be quite some delay until a shutdown is detected.
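To see why a polling wait is unsuitable for measurements, consider the following sketch of a poll-and-sleep loop. It is not Kompact's implementation; the function `poll_until_terminated`, the `AtomicBool` flag and the chosen intervals are assumptions used only to show where the delay comes from.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Block until `terminated` is set, checking once per `interval`.
/// Returns how long the caller ended up waiting.
fn poll_until_terminated(terminated: &AtomicBool, interval: Duration) -> Duration {
    let begin = Instant::now();
    while !terminated.load(Ordering::Acquire) {
        // No notification: the waiter sleeps a full interval between checks.
        thread::sleep(interval);
    }
    begin.elapsed()
}

fn main() {
    let terminated = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&terminated);

    // Some other thread "shuts the system down" after roughly 20 ms.
    let worker = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        flag.store(true, Ordering::Release);
    });

    // The waiter checks only every 100 ms, so it notices the change late.
    let waited = poll_until_terminated(&terminated, Duration::from_millis(100));
    worker.join().unwrap();
    println!("flag set after ~20 ms, waiter returned after {:?}", waited);
}
```

With a check interval of a second or so, as described above, the observed wait can exceed the actual shutdown time by up to that interval.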
pub fn shutdown(self) -> Result<(), String>
Shutdown the Kompact system
Stops all components and then stops the scheduler.
This function may fail to stop in time (or at all), if components hang on to scheduler threads indefinitely.
Example
    use kompact::prelude::*;

    let system = KompactConfig::default().build().expect("system");
    system.shutdown().expect("shutdown");
pub fn shutdown_async(&self)
Shutdown the Kompact system from within a component
Stops all components and then stops the scheduler.
This function may fail to stop in time (or at all), if components hang on to scheduler threads indefinitely.
Example
    use kompact::prelude::*;

    #[derive(ComponentDefinition, Actor)]
    struct Stopper {
        ctx: ComponentContext<Self>,
    }
    impl Stopper {
        fn new() -> Stopper {
            Stopper {
                ctx: ComponentContext::new(),
            }
        }
    }
    impl Provide<ControlPort> for Stopper {
        fn handle(&mut self, event: ControlEvent) -> () {
            match event {
                ControlEvent::Start => {
                    self.ctx().system().shutdown_async();
                }
                _ => (), // ignore
            }
        }
    }
    let system = KompactConfig::default().build().expect("system");
    let c = system.create(Stopper::new);
    system.start(&c);
    system.await_termination();
pub fn system_path(&self) -> SystemPath
Return the system path of this Kompact system
The system path forms a prefix for every ActorPath.
pub fn actor_path_for<C>(&self, component: &Arc<Component<C>>) -> ActorPath where
C: ComponentDefinition + 'static,
Generate a unique path for the given component
Produces a unique id ActorPath for component using the system path of this system.
The returned ActorPath is only useful if the component was first registered, otherwise all messages sent to it will land in the system's deadletter box.
Note
If you pass in a component from a different system, you will get a perfectly valid ActorPath instance, but one which cannot receive any messages, unless you also registered the component with this system's dispatcher. Suffice it to say, crossing system boundaries in this manner is not recommended.
Trait Implementations
impl ActorRefFactory for KompactSystem
type Message = Never
The type of messages carried by references produced by this factory
fn actor_ref(&self) -> ActorRef<Never>
Returns a reference to the deadletter box
impl ActorSource for KompactSystem
fn path_resolvable(&self) -> PathResolvable
impl Clone for KompactSystem
fn clone(&self) -> KompactSystem
fn clone_from(&mut self, source: &Self)
impl Dispatching for KompactSystem
fn dispatcher_ref(&self) -> DispatcherRef
impl TimerRefFactory for KompactSystem
Auto Trait Implementations
impl !RefUnwindSafe for KompactSystem
impl Send for KompactSystem
impl Sync for KompactSystem
impl Unpin for KompactSystem
impl !UnwindSafe for KompactSystem
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> From<T> for T
impl<T, U> Into<U> for T where
U: From<T>,
impl<T> ToOwned for T where
T: Clone,
type Owned = T
The resulting type after obtaining ownership.
fn to_owned(&self) -> T
fn clone_into(&self, target: &mut T)
impl<T, U> TryFrom<U> for T where
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,
type Error = <U as TryFrom<T>>::Error
The type returned in the event of a conversion error.
fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>
impl<V, T> VZip<V> for T where
V: MultiLane<T>,