1use super::*;
2
3use crate::{
4 config::{ConfigEntry, ConfigError, ConfigValueType, HoconExt},
5 messaging::DispatchEnvelope,
6};
7use executors::*;
8use hocon::Hocon;
9use std::{fmt, path::PathBuf, rc::Rc};
10
11pub mod keys {
13 use super::*;
14 use crate::config::*;
15
16 kompact_config! {
17 LABEL,
18 key = "kompact.runtime.label",
19 type = StringValue,
20 default = default_runtime_label(),
21 doc = r#"The system label.
22
23# Default
24
25The default value is `kompact-runtime-` followed by a unique number (for this process).
26 "#,
27 version = "0.11"
28 }
29
30 kompact_config! {
31 THROUGHPUT,
32 key = "kompact.runtime.throughput",
33 type = UsizeValue,
34 default = 50,
35 validate = |value| *value > 0,
36 doc = r#"The scheduling granularity of a component.
37
38This settings determines the maximum number of messages and events a component will handle before re-scheduling itself to cede compute resources to other components.
39Thus, this settings allows users to tune fairness vs. throughput.
40Smaller values will give better fairness, while larger values will increase potential throughput.
41The exact value depends on your workload per event.
42If each component does a lot compute for every event, then a low value will be sufficient to counter scheduling overhead.
43If, however, the work per event is very low, performance will generally increase with higher values.
44
45# Legal Values
46
47Values must be `> 0`.
48
49# Default
50
51The default value is 50, which is tuned for relatively low-compute workloads (e.g., a few arithmetic operations per events and mayb a collection update/lookup).
52 "#,
53 version = "0.11"
54 }
55
56 kompact_config! {
57 MESSAGE_PRIORITY,
58 key = "kompact.runtime.message-priority",
59 type = F32Value,
60 default = 0.5,
61 validate = |value| 0.0 < *value && *value < 1.0,
62 doc = r#"The ratio between handling messages and events.
63
64A component will handle up to *throughput × r* messages and *throughput × (1 - r)* events before re-scheduling.
65
66If there are fewer events or messages than alloted queued up the remaining allotment will be redistributed to the other type until all throughput is used up or no messages or events remain.
67
68# Legal Values
69
70Values must be `> 0.0` and `< 1.0`.
71
72Also note that this setting will not allow either type to be completely starved out.
73Once multiplied with throughput a minimum of 1 message or event will be enforced.
74This means you might not get the ratio you expect with low [throughput](crate::config_keys::system::THROUGHPUT) values.
75
76# Default
77
78The default value is 0.5, i.e. an even split.
79 "#,
80 version = "0.11"
81 }
82
83 kompact_config! {
84 THREADS,
85 key = "kompact.runtime.threads",
86 type = UsizeValue,
87 default = std::cmp::max(1, num_cpus::get()),
88 validate = |value| *value > 0,
89 doc = r#"The ratio between handling messages and events.
90
91A component will handle up to *throughput × r* messages and *throughput × (1 - r)* events before re-scheduling.
92
93If there are fewer events or messages than alloted queued up the remaining allotment will be redistributed to the other type until all throughput is used up or no messages or events remain.
94
95# Legal Values
96
97Values must be `> 0`.
98
99# Default
100
101The default value is 1 per cpu core or a minimum of 1.
102 "#,
103 version = "0.11"
104 }
105
106 kompact_config! {
107 SCHEDULER,
108 key = "kompact.runtime.scheduler",
109 type = StringValue,
110 default = "auto".to_string(),
111 validate = |value| ["auto", "small", "large", "dynamic", "custom"].contains(&value.as_ref()),
112 doc = r#"The scheduler implementation to use for the system.
113
114A component will handle up to *throughput × r* messages and *throughput × (1 - r)* events before re-scheduling.
115
116If there are fewer events or messages than alloted queued up the remaining allotment will be redistributed to the other type until all throughput is used up or no messages or events remain.
117
118# Legal Values
119
120- `auto`: Automatically pick the right implementation of [crossbeam_workstealing_pool](crossbeam_workstealing_pool)
121- `small`: Use [small_pool](crossbeam_workstealing_pool::small_pool)
122- `large`: Use [large_pool](crossbeam_workstealing_pool::large_pool)
123- `dynamic`: Use [dyn_pool](crossbeam_workstealing_pool::dyn_pool)
124- `custom`: Use the scheduler provided via [KompactConfig::scheduler](KompactConfig::scheduler) or [KompactConfig::executor](KompactConfig::executor) (this will be set automatically when you use those functions).
125
126# Default
127
128The default value is `auto`.
129 "#,
130 version = "0.11"
131 }
132}
133
134#[derive(Debug, Clone)]
135pub(crate) enum ConfigSource {
136 File(PathBuf),
137 Str(String),
138}
139
140#[derive(Clone)]
158pub struct KompactConfig {
159 pub(crate) label: String,
160 pub(crate) throughput: usize,
161 pub(crate) msg_priority: f32,
162 pub(crate) threads: usize,
163 pub(crate) timer_builder: Rc<TimerBuilder>,
164 pub(crate) scheduler_builder: Rc<SchedulerBuilder>,
165 pub(crate) sc_builder: Rc<ScBuilder>,
166 pub(crate) root_logger: Option<KompactLogger>,
167 pub(crate) config_sources: Vec<ConfigSource>,
168}
169
170impl fmt::Debug for KompactConfig {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 write!(
173 f,
174 "KompactConfig{{
175 label={},
176 throughput={},
177 msg_priority={},
178 threads={},
179 timer_builder=<function>,
180 scheduler_builder=<function>,
181 sc_builder=<function>,
182 root_logger={:?},
183 config_sources={:?}
184 }}",
185 self.label,
186 self.throughput,
187 self.msg_priority,
188 self.threads,
189 self.root_logger,
190 self.config_sources,
191 )
192 }
193}
194
195pub const MINIMAL_CONFIG: &str = r#"
201 kompact.system {
202 throughput = 2
203 message-priority = 0.5
204 threads = 1
205 }
206"#;
207
208impl KompactConfig {
209 #[deprecated(
216 since = "0.11.0",
217 note = "If you really want these exact parameters, use `KompactConfig::load_config_str(kompact::runtime::MINIMAL_CONFIG)` instead. Otherwise prefer `KompactConfig::default()`."
218 )]
219 pub fn new() -> KompactConfig {
220 KompactConfig {
221 label: "<unset>".to_string(),
222 throughput: 0,
223 msg_priority: 0.0,
224 threads: 1,
225 timer_builder: Rc::new(DefaultTimer::new_timer_component),
226 scheduler_builder: Rc::new(|t| {
227 ExecutorScheduler::from(crossbeam_workstealing_pool::small_pool(t))
228 }),
229 sc_builder: Rc::new(|sys, dead_prom, disp_prom| {
230 Box::new(DefaultComponents::new(sys, dead_prom, disp_prom))
231 }),
232 root_logger: None,
233 config_sources: vec![ConfigSource::Str(MINIMAL_CONFIG.to_string())],
234 }
235 }
236
237 #[deprecated(
241 since = "0.11.0",
242 note = "Use `KompactConfig::set_config_value(&kompact::config_keys::system::LABEL, ...)` instead."
243 )]
244 pub fn label<I>(&mut self, s: I) -> &mut Self
245 where
246 I: Into<String>,
247 {
248 let v: String = s.into();
249 self.set_config_value(&keys::LABEL, v);
250 self
251 }
252
253 #[deprecated(
258 since = "0.11.0",
259 note = "Use `KompactConfig::set_config_value(&kompact::config_keys::system::THROUGHPUT, ...)` instead."
260 )]
261 pub fn throughput(&mut self, n: usize) -> &mut Self {
262 self.set_config_value(&keys::THROUGHPUT, n);
263 self
264 }
265
266 #[deprecated(
275 since = "0.11.0",
276 note = "Use `KompactConfig::set_config_value(&kompact::config_keys::system::MESSAGE_PRIORITY, ...)` instead."
277 )]
278 pub fn msg_priority(&mut self, r: f32) -> &mut Self {
279 self.set_config_value(&keys::MESSAGE_PRIORITY, r);
280 self
281 }
282
283 #[deprecated(
290 since = "0.11.0",
291 note = "Use `KompactConfig::set_config_value(&kompact::config_keys::system::THREADS, ...)` instead."
292 )]
293 pub fn threads(&mut self, n: usize) -> &mut Self {
294 assert!(
295 n > 0,
296 "The number of threads must be more than 0, or no components will be run"
297 );
298 self.set_config_value(&keys::THREADS, n);
299 self
300 }
301
302 pub fn scheduler<F>(&mut self, f: F) -> &mut Self
306 where
307 F: Fn(usize) -> Box<dyn Scheduler> + 'static,
308 {
309 self.set_config_value(&keys::SCHEDULER, "custom".to_string());
310 self.scheduler_builder = Rc::new(f);
311 self
312 }
313
314 pub fn executor<E, F>(&mut self, f: F) -> &mut Self
318 where
319 E: FuturesExecutor + Sync + 'static,
320 F: Fn(usize) -> E + 'static,
321 {
322 self.set_config_value(&keys::SCHEDULER, "custom".to_string());
323 let sb = move |t: usize| ExecutorScheduler::from(f(t));
324 self.scheduler_builder = Rc::new(sb);
325 self
326 }
327
328 pub fn timer<T, F>(&mut self, f: F) -> &mut Self
330 where
331 T: TimerComponent + 'static,
332 F: Fn() -> Box<dyn TimerComponent> + 'static,
333 {
334 self.timer_builder = Rc::new(f);
335 self
336 }
337
338 pub fn system_components<B, C, FB, FC>(
359 &mut self,
360 deadletter_fn: FB,
361 dispatcher_fn: FC,
362 ) -> &mut Self
363 where
364 B: ComponentDefinition + ActorRaw<Message = Never> + Sized + 'static,
365 C: ComponentDefinition
366 + ActorRaw<Message = DispatchEnvelope>
367 + Sized
368 + 'static
369 + Dispatcher,
370 FB: Fn(KPromise<()>) -> B + 'static,
371 FC: Fn(KPromise<()>) -> C + 'static,
372 {
373 let sb = move |system: &KompactSystem, dead_prom: KPromise<()>, disp_prom: KPromise<()>| {
374 let deadletter_box = system.create_unsupervised(|| deadletter_fn(dead_prom));
375 let dispatcher = system.create_unsupervised(|| dispatcher_fn(disp_prom));
376
377 let cc = CustomComponents {
378 deadletter_box,
379 dispatcher,
380 };
381 Box::new(cc) as Box<dyn SystemComponents>
382 };
383 self.sc_builder = Rc::new(sb);
384 self
385 }
386
387 pub fn system_components_with_dedicated_dispatcher<B, C, FB, FC>(
393 &mut self,
394 deadletter_fn: FB,
395 dispatcher_fn: FC,
396 ) -> &mut Self
397 where
398 B: ComponentDefinition + ActorRaw<Message = Never> + Sized + 'static,
399 C: ComponentDefinition
400 + ActorRaw<Message = DispatchEnvelope>
401 + Sized
402 + 'static
403 + Dispatcher,
404 FB: Fn(KPromise<()>) -> B + 'static,
405 FC: Fn(KPromise<()>) -> C + 'static,
406 {
407 let sb = move |system: &KompactSystem, dead_prom: KPromise<()>, disp_prom: KPromise<()>| {
408 let deadletter_box = system.create_unsupervised(|| deadletter_fn(dead_prom));
409 let dispatcher = system.create_dedicated_unsupervised(|| dispatcher_fn(disp_prom));
410
411 let cc = CustomComponents {
412 deadletter_box,
413 dispatcher,
414 };
415 Box::new(cc) as Box<dyn SystemComponents>
416 };
417 self.sc_builder = Rc::new(sb);
418 self
419 }
420
421 #[cfg(feature = "thread_pinning")]
427 pub fn system_components_with_dedicated_dispatcher_pinned<B, C, FB, FC>(
428 &mut self,
429 deadletter_fn: FB,
430 dispatcher_fn: FC,
431 dispatcher_core: CoreId,
432 ) -> &mut Self
433 where
434 B: ComponentDefinition + ActorRaw<Message = Never> + Sized + 'static,
435 C: ComponentDefinition
436 + ActorRaw<Message = DispatchEnvelope>
437 + Sized
438 + 'static
439 + Dispatcher,
440 FB: Fn(KPromise<()>) -> B + 'static,
441 FC: Fn(KPromise<()>) -> C + 'static,
442 {
443 let sb = move |system: &KompactSystem, dead_prom: KPromise<()>, disp_prom: KPromise<()>| {
444 let deadletter_box = system.create_unsupervised(|| deadletter_fn(dead_prom));
445 let dispatcher = system
446 .create_dedicated_pinned_unsupervised(|| dispatcher_fn(disp_prom), dispatcher_core);
447
448 let cc = CustomComponents {
449 deadletter_box,
450 dispatcher,
451 };
452 Box::new(cc) as Box<dyn SystemComponents>
453 };
454 self.sc_builder = Rc::new(sb);
455 self
456 }
457
458 pub fn logger(&mut self, logger: KompactLogger) -> &mut Self {
460 self.root_logger = Some(logger);
461 self
462 }
463
464 pub fn load_config_file<P>(&mut self, path: P) -> &mut Self
474 where
475 P: Into<PathBuf>,
476 {
477 let p: PathBuf = path.into();
478 self.config_sources.push(ConfigSource::File(p));
479 self
480 }
481
482 pub fn load_config_str<S>(&mut self, config: S) -> &mut Self
492 where
493 S: Into<String>,
494 {
495 let s: String = config.into();
496 self.config_sources.push(ConfigSource::Str(s));
497 self
498 }
499
500 pub fn set_config_value<T>(
507 &mut self,
508 config: &ConfigEntry<T>,
509 value: <T as ConfigValueType>::Value,
510 ) -> &mut Self
511 where
512 T: ConfigValueType,
513 {
514 let value_string = <T as ConfigValueType>::config_string(value);
515 self.config_sources.push(ConfigSource::Str(format!(
516 "{} = {}",
517 config.key, value_string
518 )));
519 self
520 }
521
522 pub fn build(self) -> Result<KompactSystem, KompactError> {
538 KompactSystem::try_new(self)
539 }
540
541 pub(crate) fn max_messages(&self) -> usize {
542 let tpf = self.throughput as f32;
543 let mmf = tpf * self.msg_priority;
544 assert!(mmf >= 0.0, "msg_priority can not be negative!");
545 mmf as usize
546 }
547
548 pub(crate) fn override_from_hocon(&mut self, conf: &Hocon) -> Result<(), ConfigError> {
549 self.label = conf.get_or_default(&keys::LABEL)?;
550 self.throughput = conf.get_or_default(&keys::THROUGHPUT)?;
551 self.msg_priority = conf.get_or_default(&keys::MESSAGE_PRIORITY)?;
552 self.threads = conf.get_or_default(&keys::THREADS)?;
553 let scheduler_option = conf.get_or_default(&keys::SCHEDULER)?;
554 match scheduler_option.as_ref() {
555 "auto" => {
556 self.scheduler_builder = if self.threads <= 32 {
557 Rc::new(|t| ExecutorScheduler::from(crossbeam_workstealing_pool::small_pool(t)))
558 } else if self.threads <= 64 {
559 Rc::new(|t| ExecutorScheduler::from(crossbeam_workstealing_pool::large_pool(t)))
560 } else {
561 Rc::new(|t| ExecutorScheduler::from(crossbeam_workstealing_pool::dyn_pool(t)))
562 };
563 }
564 "small" => {
565 self.scheduler_builder = Rc::new(|t| {
566 ExecutorScheduler::from(crossbeam_workstealing_pool::small_pool(t))
567 });
568 }
569 "large" => {
570 self.scheduler_builder = Rc::new(|t| {
571 ExecutorScheduler::from(crossbeam_workstealing_pool::large_pool(t))
572 });
573 }
574 "dynamic" => {
575 self.scheduler_builder =
576 Rc::new(|t| ExecutorScheduler::from(crossbeam_workstealing_pool::dyn_pool(t)))
577 }
578 "custom" => (), _ => unreachable!(
580 "Options should be checked by the validator! {} is illegal.",
581 scheduler_option
582 ),
583 }
584
585 Ok(())
586 }
587}
588
589impl Default for KompactConfig {
590 fn default() -> Self {
603 let scheduler_builder: Rc<SchedulerBuilder> =
606 Rc::new(|t| ExecutorScheduler::from(crossbeam_workstealing_pool::small_pool(t)));
607 KompactConfig {
608 label: "<unset>".to_string(),
609 throughput: 0,
610 msg_priority: 0.0,
611 threads: 1,
612 timer_builder: Rc::new(DefaultTimer::new_timer_component),
613 scheduler_builder,
614 sc_builder: Rc::new(|sys, dead_prom, disp_prom| {
615 Box::new(DefaultComponents::new(sys, dead_prom, disp_prom))
616 }),
617 root_logger: None,
618 config_sources: Vec::new(),
619 }
620 }
621}