Skip to main content

multi_sub_node/
multi_sub_node.rs

1use std::sync::Arc;
2use std::thread;
3use std::time::Duration;
4
5use serde::Deserialize;
6
7use rs_ctrl_os::{
8    init_logging, load_config_typed, start_discovery, PubSubManager, Result, TimeSynchronizer,
9};
10
11/// 本示例不需要热重载,使用 load_config_typed 一次性加载。
12#[derive(Clone, Deserialize)]
13struct DynamicCfg {}
14
15fn main() -> Result<()> {
16    init_logging();
17
18    let config_path = std::env::args()
19        .nth(1)
20        .unwrap_or_else(|| "multi_sub_config.toml".to_string());
21
22    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
23
24    let time_sync = Arc::new(TimeSynchronizer::new());
25
26    // Participate in discovery to learn where `multi_pub` lives.
27    let registry = start_discovery(
28        &static_cfg.my_id,
29        &static_cfg.host,
30        static_cfg.port,
31        static_cfg.is_master,
32        Some(time_sync.clone()),
33    )?;
34
35    let mut bus = PubSubManager::new(&static_cfg, registry)?;
36
37    // 只关心各远端节点下“一个” sub_topic:
38    // - from_multi_pub: 只订阅 demo_status
39    // - from_pub:       只订阅 demo
40    bus.set_sub_topics("from_multi_pub", &["demo_status"])?;
41    bus.set_sub_topics("from_pub", &["demo"])?;
42
43    loop {
44        // try_recv_raw 内部自动 tick(),无需手动调用
45        // 多端口(多远端节点)+ 多子话题:
46        // - "from_multi_pub" 订阅 multi_pub 节点
47        // - "from_pub"       订阅 pub_node 节点
48        for local_name in &["from_multi_pub", "from_pub"] {
49            if let Some((sender, topic, bytes)) = bus.try_recv_raw(local_name)? {
50                if let Ok(text) = bincode::deserialize::<String>(&bytes) {
51                    println!(
52                        "[multi_sub][dec]  from={} local='{}' sub_topic='{}' text=\"{}\"",
53                        sender, local_name, topic, text
54                    );
55                } else {
56                    println!(
57                        "[multi_sub][dec]  from={} local='{}' sub_topic='{}' <failed to deserialize as String>",
58                        sender, local_name, topic
59                    );
60                }
61            }
62        }
63
64        // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
65        thread::sleep(Duration::from_millis(1));
66    }
67}