Skip to main content

multi_pub_node/
multi_pub_node.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::thread;
4use std::time::Duration;
5
6use serde::Deserialize;
7
8use rs_ctrl_os::{
9    config::ConfigManager, init_logging, start_discovery, PubSubManager, Result, TimeSynchronizer,
10};
11
12/// Dynamic section of the TOML config used for the multi-pub node.
13///
14/// ```toml
15/// [static_config]
16/// my_id = "multi_pub"
17/// host = "127.0.0.1"
18/// port = 5560
19/// is_master = true
20///
21/// [static_config.publishers]
22/// control = "self"
23/// status  = "self"
24/// alerts  = "self"
25///
26/// [dynamic]
27/// message_prefix = "multi"
28/// interval_ms = 500
29/// ```
30#[derive(Clone, Deserialize)]
31struct DynamicCfg {
32    control_sub_topic: String,
33    status_sub_topic: String,
34    alerts_sub_topic: String,
35    interval_ms: u64,
36}
37
38fn main() -> Result<()> {
39    init_logging();
40
41    let config_path = std::env::args()
42        .nth(1)
43        .unwrap_or_else(|| "multi_pub_config.toml".to_string());
44
45    let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
46    let static_cfg = manager.static_cfg().clone();
47
48    let time_sync = Arc::new(TimeSynchronizer::new());
49
50    // This node also participates in discovery (acts as master here).
51    let registry = start_discovery(
52        &static_cfg.my_id,
53        &static_cfg.host,
54        static_cfg.port,
55        static_cfg.is_master,
56        Some(time_sync.clone()),
57    )?;
58
59    // One process, single PUB socket (topic key "control"), multiple sub-topics
60    let mut bus = PubSubManager::new(&static_cfg, registry)?;
61
62    let mut counter: u64 = 0;
63
64    loop {
65        let dyn_cfg = manager.get_dynamic_clone();
66        let ts_ms = time_sync.now_corrected_ms();
67        counter = counter.wrapping_add(1);
68
69        // control sub-topic
70        let control_msg = format!(
71            "[control] {} #{counter} at {} ms",
72            dyn_cfg.control_sub_topic, ts_ms
73        );
74        bus.publish_topic("control", "demo_control", &control_msg)?;
75
76        // status sub-topic
77        let status_msg = format!(
78            "[status] {} #{counter} at {} ms",
79            dyn_cfg.status_sub_topic, ts_ms
80        );
81        bus.publish_topic("control", "demo_status", &status_msg)?;
82
83        // alerts sub-topic
84        let alerts_msg = format!(
85            "[alerts] {} #{counter} at {} ms",
86            dyn_cfg.alerts_sub_topic, ts_ms
87        );
88        bus.publish_topic("control", "demo_alerts", &alerts_msg)?;
89
90        thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
91    }
92}