kompact/runtime/
config.rs

1use super::*;
2
3use crate::{
4    config::{ConfigEntry, ConfigError, ConfigValueType, HoconExt},
5    messaging::DispatchEnvelope,
6};
7use executors::*;
8use hocon::Hocon;
9use std::{fmt, path::PathBuf, rc::Rc};
10
11/// Configuration keys for Kompact systems.
12pub mod keys {
13    use super::*;
14    use crate::config::*;
15
16    kompact_config! {
17        LABEL,
18        key = "kompact.runtime.label",
19        type = StringValue,
20        default = default_runtime_label(),
21        doc = r#"The system label.
22
23# Default
24
25The default value is `kompact-runtime-` followed by a unique number (for this process).
26    "#,
27        version = "0.11"
28    }
29
30    kompact_config! {
31        THROUGHPUT,
32        key = "kompact.runtime.throughput",
33        type = UsizeValue,
34        default = 50,
35        validate = |value| *value > 0,
36        doc = r#"The scheduling granularity of a component.
37
38This settings determines the maximum number of messages and events a component will handle before re-scheduling itself to cede compute resources to other components.
39Thus, this settings allows users to tune fairness vs. throughput. 
40Smaller values will give better fairness, while larger values will increase potential throughput.
41The exact value depends on your workload per event. 
42If each component does a lot compute for every event, then a low value will be sufficient to counter scheduling overhead.
43If, however, the work per event is very low, performance will generally increase with higher values.
44
45# Legal Values
46
47Values must be `> 0`.
48
49# Default
50
51The default value is 50, which is tuned for relatively low-compute workloads (e.g., a few arithmetic operations per events and mayb a collection update/lookup).
52    "#,
53        version = "0.11"
54    }
55
56    kompact_config! {
57        MESSAGE_PRIORITY,
58        key = "kompact.runtime.message-priority",
59        type = F32Value,
60        default = 0.5,
61        validate = |value| 0.0 < *value && *value < 1.0,
62        doc = r#"The ratio between handling messages and events.
63
64A component will handle up to *throughput × r* messages and *throughput × (1 - r)* events before re-scheduling.
65
66If there are fewer events or messages than alloted queued up the remaining allotment will be redistributed to the other type until all throughput is used up or no messages or events remain.
67
68# Legal Values
69
70Values must be `> 0.0` and `< 1.0`.
71
72Also note that this setting will not allow either type to be completely starved out.
73Once multiplied with throughput a minimum of 1 message or event will be enforced.
74This means you might not get the ratio you expect with low [throughput](crate::config_keys::system::THROUGHPUT) values.
75
76# Default
77
78The default value is 0.5, i.e. an even split.
79    "#,
80        version = "0.11"
81    }
82
83    kompact_config! {
84        THREADS,
85        key = "kompact.runtime.threads",
86        type = UsizeValue,
87        default = std::cmp::max(1, num_cpus::get()),
88        validate = |value| *value > 0,
89        doc = r#"The ratio between handling messages and events.
90
91A component will handle up to *throughput × r* messages and *throughput × (1 - r)* events before re-scheduling.
92
93If there are fewer events or messages than alloted queued up the remaining allotment will be redistributed to the other type until all throughput is used up or no messages or events remain.
94
95# Legal Values
96
97Values must be `> 0`.
98
99# Default
100
101The default value is 1 per cpu core or a minimum of 1.
102    "#,
103        version = "0.11"
104    }
105
106    kompact_config! {
107        SCHEDULER,
108        key = "kompact.runtime.scheduler",
109        type = StringValue,
110        default = "auto".to_string(),
111        validate = |value| ["auto", "small", "large", "dynamic", "custom"].contains(&value.as_ref()),
112        doc = r#"The scheduler implementation to use for the system.
113
114A component will handle up to *throughput × r* messages and *throughput × (1 - r)* events before re-scheduling.
115
116If there are fewer events or messages than alloted queued up the remaining allotment will be redistributed to the other type until all throughput is used up or no messages or events remain.
117
118# Legal Values
119
120- `auto`: Automatically pick the right implementation of [crossbeam_workstealing_pool](crossbeam_workstealing_pool)
121- `small`: Use [small_pool](crossbeam_workstealing_pool::small_pool)
122- `large`: Use [large_pool](crossbeam_workstealing_pool::large_pool)
123- `dynamic`: Use [dyn_pool](crossbeam_workstealing_pool::dyn_pool)
124- `custom`: Use the scheduler provided via [KompactConfig::scheduler](KompactConfig::scheduler) or [KompactConfig::executor](KompactConfig::executor) (this will be set automatically when you use those functions).
125
126# Default
127
128The default value is `auto`.
129    "#,
130        version = "0.11"
131    }
132}
133
/// A single piece of HOCON configuration registered on a [KompactConfig](KompactConfig).
///
/// Sources are collected in the order they are added to the config builder.
#[derive(Debug, Clone)]
pub(crate) enum ConfigSource {
    /// Path of a HOCON file, as registered via `KompactConfig::load_config_file`.
    File(PathBuf),
    /// HOCON text, as registered via `KompactConfig::load_config_str`
    /// or generated by `KompactConfig::set_config_value`.
    Str(String),
}
139
/// A configuration builder for Kompact systems
///
/// # Example
///
/// Set a custom label, and run up to 50 events or messages per scheduling of a component
/// on a threadpool with 2 threads.
///
/// ```
/// use kompact::prelude::*;
///
/// let mut conf = KompactConfig::default();
/// conf.set_config_value(
///     &kompact::config_keys::system::LABEL,
///     "My special system".to_string(),
/// );
/// conf.set_config_value(&kompact::config_keys::system::THROUGHPUT, 50);
/// conf.set_config_value(&kompact::config_keys::system::THREADS, 2);
/// let system = conf.build().expect("system");
/// # system.shutdown().expect("shutdown");
/// ```
#[derive(Clone)]
pub struct KompactConfig {
    /// System label; overwritten from the HOCON config in `override_from_hocon`.
    pub(crate) label: String,
    /// Maximum number of events/messages handled per scheduling; overwritten in `override_from_hocon`.
    pub(crate) throughput: usize,
    /// Share of the throughput reserved for messages; overwritten in `override_from_hocon`.
    pub(crate) msg_priority: f32,
    /// Number of threads in the thread pool; overwritten in `override_from_hocon`.
    pub(crate) threads: usize,
    /// Factory for the timer component (see `KompactConfig::timer`).
    pub(crate) timer_builder: Rc<TimerBuilder>,
    /// Factory producing the scheduler from a thread count
    /// (see `KompactConfig::scheduler` and `KompactConfig::executor`).
    pub(crate) scheduler_builder: Rc<SchedulerBuilder>,
    /// Factory for the system components, i.e. deadletter box and dispatcher
    /// (see `KompactConfig::system_components`).
    pub(crate) sc_builder: Rc<ScBuilder>,
    /// Custom root logger, if one was set via `KompactConfig::logger`.
    pub(crate) root_logger: Option<KompactLogger>,
    /// All registered HOCON sources, in the order they were added.
    pub(crate) config_sources: Vec<ConfigSource>,
}
169
/// Manual `Debug` implementation for [KompactConfig](KompactConfig).
///
/// The three builder fields are printed as the fixed placeholder `<function>`;
/// all other fields are printed with their current values.
impl fmt::Debug for KompactConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The line breaks and indentation inside the literal are part of the output.
        write!(
            f,
            "KompactConfig{{
            label={},
            throughput={},
            msg_priority={},
            threads={},
            timer_builder=<function>,
            scheduler_builder=<function>,
            sc_builder=<function>,
            root_logger={:?},
            config_sources={:?}
        }}",
            self.label,
            self.throughput,
            self.msg_priority,
            self.threads,
            self.root_logger,
            self.config_sources,
        )
    }
}
194
/// A minimal Kompact configuration
///
/// - It uses the default label, i.e. `"kompact-runtime-{}"` for some sequence number.
/// - It sets the event/message throughput to 2, split evenly between events and messages.
/// - It runs with a single thread on a [small pool](crossbeam_workstealing_pool::small_pool).
///
/// The settings must live below `kompact.runtime`, since that is the path
/// all runtime configuration keys are read from.
pub const MINIMAL_CONFIG: &str = r#"
    kompact.runtime {
        throughput = 2
        message-priority = 0.5
        threads = 1
    }
"#;
207
208impl KompactConfig {
209    /// Create a minimal Kompact config
210    ///
211    /// - The minimal config uses the default label, i.e. `"kompact-runtime-{}"` for some sequence number.
212    /// - It sets the event/message throughput to 2, split evenly between events and messages.
213    /// - It runs with a single thread on a [small pool](crossbeam_workstealing_pool::small_pool).
214    /// - It uses all default components, without networking, with the default timer and default logger.
215    #[deprecated(
216        since = "0.11.0",
217        note = "If you really want these exact parameters, use `KompactConfig::load_config_str(kompact::runtime::MINIMAL_CONFIG)` instead. Otherwise prefer `KompactConfig::default()`."
218    )]
219    pub fn new() -> KompactConfig {
220        KompactConfig {
221            label: "<unset>".to_string(),
222            throughput: 0,
223            msg_priority: 0.0,
224            threads: 1,
225            timer_builder: Rc::new(DefaultTimer::new_timer_component),
226            scheduler_builder: Rc::new(|t| {
227                ExecutorScheduler::from(crossbeam_workstealing_pool::small_pool(t))
228            }),
229            sc_builder: Rc::new(|sys, dead_prom, disp_prom| {
230                Box::new(DefaultComponents::new(sys, dead_prom, disp_prom))
231            }),
232            root_logger: None,
233            config_sources: vec![ConfigSource::Str(MINIMAL_CONFIG.to_string())],
234        }
235    }
236
237    /// Set the name of the system
238    ///
239    /// The label is used as metadata for logging output.
240    #[deprecated(
241        since = "0.11.0",
242        note = "Use `KompactConfig::set_config_value(&kompact::config_keys::system::LABEL, ...)` instead."
243    )]
244    pub fn label<I>(&mut self, s: I) -> &mut Self
245    where
246        I: Into<String>,
247    {
248        let v: String = s.into();
249        self.set_config_value(&keys::LABEL, v);
250        self
251    }
252
253    /// Set the maximum number of events/messages to handle before rescheduling a component
254    ///
255    /// Larger values can increase throughput on highly loaded components,
256    /// but at the cost of fairness between components.
257    #[deprecated(
258        since = "0.11.0",
259        note = "Use `KompactConfig::set_config_value(&kompact::config_keys::system::THROUGHPUT, ...)` instead."
260    )]
261    pub fn throughput(&mut self, n: usize) -> &mut Self {
262        self.set_config_value(&keys::THROUGHPUT, n);
263        self
264    }
265
266    /// Set the ratio between handling messages and events.
267    ///
268    /// A component will handle up to throughput * r messages
269    /// and throughput * (1-r) events before rescheduling.
270    ///
271    /// If there are less events or messages than alloted queued up
272    /// the remaining allotment will be redistributed to the other type
273    /// until all throughput is used up or no messages or events remain.
274    #[deprecated(
275        since = "0.11.0",
276        note = "Use `KompactConfig::set_config_value(&kompact::config_keys::system::MESSAGE_PRIORITY, ...)` instead."
277    )]
278    pub fn msg_priority(&mut self, r: f32) -> &mut Self {
279        self.set_config_value(&keys::MESSAGE_PRIORITY, r);
280        self
281    }
282
283    /// The number of threads in the Kompact thread pool
284    ///
285    /// # Note
286    ///
287    /// You *must* ensure that the selected [scheduler](KompactConfig::scheduler) implementation
288    /// can manage the given number of threads, if you customise this value!
289    #[deprecated(
290        since = "0.11.0",
291        note = "Use `KompactConfig::set_config_value(&kompact::config_keys::system::THREADS, ...)` instead."
292    )]
293    pub fn threads(&mut self, n: usize) -> &mut Self {
294        assert!(
295            n > 0,
296            "The number of threads must be more than 0, or no components will be run"
297        );
298        self.set_config_value(&keys::THREADS, n);
299        self
300    }
301
302    /// Set a particular scheduler implementation
303    ///
304    /// Takes a function `f` from the number of threads to a concrete scheduler implementation as argument.
305    pub fn scheduler<F>(&mut self, f: F) -> &mut Self
306    where
307        F: Fn(usize) -> Box<dyn Scheduler> + 'static,
308    {
309        self.set_config_value(&keys::SCHEDULER, "custom".to_string());
310        self.scheduler_builder = Rc::new(f);
311        self
312    }
313
314    /// Set a particular scheduler implementation based on a [FuturesExecutor](executors::FuturesExecutor)
315    ///
316    /// Takes a function `f` from the number of threads to a concrete executor as argument.
317    pub fn executor<E, F>(&mut self, f: F) -> &mut Self
318    where
319        E: FuturesExecutor + Sync + 'static,
320        F: Fn(usize) -> E + 'static,
321    {
322        self.set_config_value(&keys::SCHEDULER, "custom".to_string());
323        let sb = move |t: usize| ExecutorScheduler::from(f(t));
324        self.scheduler_builder = Rc::new(sb);
325        self
326    }
327
328    /// Set a particular timer implementation
329    pub fn timer<T, F>(&mut self, f: F) -> &mut Self
330    where
331        T: TimerComponent + 'static,
332        F: Fn() -> Box<dyn TimerComponent> + 'static,
333    {
334        self.timer_builder = Rc::new(f);
335        self
336    }
337
338    /// Set a particular set of system components
339    ///
340    /// In particular, this allows exchanging the default dispatcher for the
341    /// [NetworkDispatcher](prelude::NetworkDispatcher), which enables the created Kompact system
342    /// to perform network communication.
343    ///
344    /// # Example
345    ///
346    /// For using the network dispatcher, with the default deadletter box:
347    /// ```
348    /// use kompact::prelude::*;
349    ///
350    /// let mut cfg = KompactConfig::new();
351    /// cfg.system_components(DeadletterBox::new, {
352    ///     let net_config = NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
353    ///     net_config.build()
354    /// });
355    /// let system = cfg.build().expect("KompactSystem");
356    /// # system.shutdown().expect("shutdown");
357    /// ```
358    pub fn system_components<B, C, FB, FC>(
359        &mut self,
360        deadletter_fn: FB,
361        dispatcher_fn: FC,
362    ) -> &mut Self
363    where
364        B: ComponentDefinition + ActorRaw<Message = Never> + Sized + 'static,
365        C: ComponentDefinition
366            + ActorRaw<Message = DispatchEnvelope>
367            + Sized
368            + 'static
369            + Dispatcher,
370        FB: Fn(KPromise<()>) -> B + 'static,
371        FC: Fn(KPromise<()>) -> C + 'static,
372    {
373        let sb = move |system: &KompactSystem, dead_prom: KPromise<()>, disp_prom: KPromise<()>| {
374            let deadletter_box = system.create_unsupervised(|| deadletter_fn(dead_prom));
375            let dispatcher = system.create_unsupervised(|| dispatcher_fn(disp_prom));
376
377            let cc = CustomComponents {
378                deadletter_box,
379                dispatcher,
380            };
381            Box::new(cc) as Box<dyn SystemComponents>
382        };
383        self.sc_builder = Rc::new(sb);
384        self
385    }
386
387    /// Set a particular set of system components
388    ///
389    /// This function works just like [system_components](KompactConfig::system_components),
390    /// except that it assigns the dispatcher to its own thread using
391    /// [create_dedicated_unsupervised](KompactSystem::create_dedicated_unsupervised).
392    pub fn system_components_with_dedicated_dispatcher<B, C, FB, FC>(
393        &mut self,
394        deadletter_fn: FB,
395        dispatcher_fn: FC,
396    ) -> &mut Self
397    where
398        B: ComponentDefinition + ActorRaw<Message = Never> + Sized + 'static,
399        C: ComponentDefinition
400            + ActorRaw<Message = DispatchEnvelope>
401            + Sized
402            + 'static
403            + Dispatcher,
404        FB: Fn(KPromise<()>) -> B + 'static,
405        FC: Fn(KPromise<()>) -> C + 'static,
406    {
407        let sb = move |system: &KompactSystem, dead_prom: KPromise<()>, disp_prom: KPromise<()>| {
408            let deadletter_box = system.create_unsupervised(|| deadletter_fn(dead_prom));
409            let dispatcher = system.create_dedicated_unsupervised(|| dispatcher_fn(disp_prom));
410
411            let cc = CustomComponents {
412                deadletter_box,
413                dispatcher,
414            };
415            Box::new(cc) as Box<dyn SystemComponents>
416        };
417        self.sc_builder = Rc::new(sb);
418        self
419    }
420
421    /// Set a particular set of system components
422    ///
423    /// This function works just like [system_components](KompactConfig::system_components),
424    /// except that it assigns the dispatcher is pinned to its own thread using
425    /// [create_dedicated_pinned_unsupervised](KompactSystem::create_dedicated_pinned_unsupervised).
426    #[cfg(feature = "thread_pinning")]
427    pub fn system_components_with_dedicated_dispatcher_pinned<B, C, FB, FC>(
428        &mut self,
429        deadletter_fn: FB,
430        dispatcher_fn: FC,
431        dispatcher_core: CoreId,
432    ) -> &mut Self
433    where
434        B: ComponentDefinition + ActorRaw<Message = Never> + Sized + 'static,
435        C: ComponentDefinition
436            + ActorRaw<Message = DispatchEnvelope>
437            + Sized
438            + 'static
439            + Dispatcher,
440        FB: Fn(KPromise<()>) -> B + 'static,
441        FC: Fn(KPromise<()>) -> C + 'static,
442    {
443        let sb = move |system: &KompactSystem, dead_prom: KPromise<()>, disp_prom: KPromise<()>| {
444            let deadletter_box = system.create_unsupervised(|| deadletter_fn(dead_prom));
445            let dispatcher = system
446                .create_dedicated_pinned_unsupervised(|| dispatcher_fn(disp_prom), dispatcher_core);
447
448            let cc = CustomComponents {
449                deadletter_box,
450                dispatcher,
451            };
452            Box::new(cc) as Box<dyn SystemComponents>
453        };
454        self.sc_builder = Rc::new(sb);
455        self
456    }
457
458    /// Set the logger implementation to use
459    pub fn logger(&mut self, logger: KompactLogger) -> &mut Self {
460        self.root_logger = Some(logger);
461        self
462    }
463
464    /// Load a HOCON config from a file at `path`
465    ///
466    /// This method can be called multiple times, and the resulting configurations will be merged.
467    ///
468    /// It matters in which order configs are loaded and values are set.
469    /// See [HoconLoader](hocon::HoconLoader) for more information.
470    ///
471    /// The loaded config can be accessed via [`system.config()`](KompactSystem::config
472    /// or from within a component via [`self.ctx().config()`](ComponentContext::config.
473    pub fn load_config_file<P>(&mut self, path: P) -> &mut Self
474    where
475        P: Into<PathBuf>,
476    {
477        let p: PathBuf = path.into();
478        self.config_sources.push(ConfigSource::File(p));
479        self
480    }
481
482    /// Load a HOCON config from a string
483    ///
484    /// This method can be called multiple times, and the resulting configurations will be merged.
485    ///
486    /// It matters in which order configs are loaded and values are set.
487    /// See [HoconLoader](hocon::HoconLoader) for more information.
488    ///
489    /// The loaded config can be accessed via [`system.config()`](KompactSystem::config
490    /// or from within a component via [`self.ctx().config()`](ComponentContext::config.
491    pub fn load_config_str<S>(&mut self, config: S) -> &mut Self
492    where
493        S: Into<String>,
494    {
495        let s: String = config.into();
496        self.config_sources.push(ConfigSource::Str(s));
497        self
498    }
499
500    /// Override a single value in the HOCON config
501    ///
502    /// This method can be called multiple times and the resulting configurations will be merged.
503    ///
504    /// It matters in which order configs are loaded and values are set.
505    /// See [HoconLoader](hocon::HoconLoader) for more information.
506    pub fn set_config_value<T>(
507        &mut self,
508        config: &ConfigEntry<T>,
509        value: <T as ConfigValueType>::Value,
510    ) -> &mut Self
511    where
512        T: ConfigValueType,
513    {
514        let value_string = <T as ConfigValueType>::config_string(value);
515        self.config_sources.push(ConfigSource::Str(format!(
516            "{} = {}",
517            config.key, value_string
518        )));
519        self
520    }
521
522    /// Finalise the config and use it create a [KompactSystem](KompactSystem)
523    ///
524    /// This function can fail, if the configuration sets up invalid schedulers
525    /// or dispatchers, for example.
526    ///
527    /// # Example
528    ///
529    /// Build a system with default settings with:
530    ///
531    /// ```
532    /// use kompact::prelude::*;
533    ///
534    /// let system = KompactConfig::default().build().expect("system");
535    /// # system.shutdown().expect("shutdown");
536    /// ```
537    pub fn build(self) -> Result<KompactSystem, KompactError> {
538        KompactSystem::try_new(self)
539    }
540
541    pub(crate) fn max_messages(&self) -> usize {
542        let tpf = self.throughput as f32;
543        let mmf = tpf * self.msg_priority;
544        assert!(mmf >= 0.0, "msg_priority can not be negative!");
545        mmf as usize
546    }
547
548    pub(crate) fn override_from_hocon(&mut self, conf: &Hocon) -> Result<(), ConfigError> {
549        self.label = conf.get_or_default(&keys::LABEL)?;
550        self.throughput = conf.get_or_default(&keys::THROUGHPUT)?;
551        self.msg_priority = conf.get_or_default(&keys::MESSAGE_PRIORITY)?;
552        self.threads = conf.get_or_default(&keys::THREADS)?;
553        let scheduler_option = conf.get_or_default(&keys::SCHEDULER)?;
554        match scheduler_option.as_ref() {
555            "auto" => {
556                self.scheduler_builder = if self.threads <= 32 {
557                    Rc::new(|t| ExecutorScheduler::from(crossbeam_workstealing_pool::small_pool(t)))
558                } else if self.threads <= 64 {
559                    Rc::new(|t| ExecutorScheduler::from(crossbeam_workstealing_pool::large_pool(t)))
560                } else {
561                    Rc::new(|t| ExecutorScheduler::from(crossbeam_workstealing_pool::dyn_pool(t)))
562                };
563            }
564            "small" => {
565                self.scheduler_builder = Rc::new(|t| {
566                    ExecutorScheduler::from(crossbeam_workstealing_pool::small_pool(t))
567                });
568            }
569            "large" => {
570                self.scheduler_builder = Rc::new(|t| {
571                    ExecutorScheduler::from(crossbeam_workstealing_pool::large_pool(t))
572                });
573            }
574            "dynamic" => {
575                self.scheduler_builder =
576                    Rc::new(|t| ExecutorScheduler::from(crossbeam_workstealing_pool::dyn_pool(t)))
577            }
578            "custom" => (), // ignore, since the value is already set
579            _ => unreachable!(
580                "Options should be checked by the validator! {} is illegal.",
581                scheduler_option
582            ),
583        }
584
585        Ok(())
586    }
587}
588
589impl Default for KompactConfig {
590    /// Create a default Kompact config
591    ///
592    /// # Defaults
593    ///
594    /// See the following configuration keys for the default values:
595    ///
596    /// - [LABEL](crate::config_keys::system::LABEL)
597    /// - [THROUGHPUT](crate::config_keys::system::THROUGHPUT)
598    /// - [MESSAGE_PRIORITY](crate::config_keys::system::MESSAGE_PRIORITY)
599    /// - [THREADS](crate::config_keys::system::THREADS)
600    ///
601    /// It uses all default components, without networking, with the default timer and default logger.
602    fn default() -> Self {
603        // NOTE: Most of the values we are setting in here don't actually matter
604        // They will be overwritten by the default values of the config keys in `KompactConfig::override_from_hocon`
605        let scheduler_builder: Rc<SchedulerBuilder> =
606            Rc::new(|t| ExecutorScheduler::from(crossbeam_workstealing_pool::small_pool(t)));
607        KompactConfig {
608            label: "<unset>".to_string(),
609            throughput: 0,
610            msg_priority: 0.0,
611            threads: 1,
612            timer_builder: Rc::new(DefaultTimer::new_timer_component),
613            scheduler_builder,
614            sc_builder: Rc::new(|sys, dead_prom, disp_prom| {
615                Box::new(DefaultComponents::new(sys, dead_prom, disp_prom))
616            }),
617            root_logger: None,
618            config_sources: Vec::new(),
619        }
620    }
621}