Skip to main content

sub_node/
sub_node.rs

1use std::sync::Arc;
2use std::thread;
3use std::time::Duration;
4
5use serde::Deserialize;
6
7use rs_ctrl_os::{
8    init_logging, load_config_typed, start_discovery, PubSubManager, Result, TimeSynchronizer,
9};
10
11/// 本示例不需要热重载,使用 load_config_typed 一次性加载。
12#[derive(Clone, Deserialize)]
13struct DynamicCfg {}
14
15fn fmt_hex(bytes: &[u8]) -> String {
16    let mut s = String::with_capacity(bytes.len() * 3);
17    for (i, b) in bytes.iter().enumerate() {
18        if i > 0 {
19            s.push(' ');
20        }
21        use std::fmt::Write as _;
22        let _ = write!(&mut s, "{:02X}", b);
23    }
24    s
25}
26
27fn main() -> Result<()> {
28    init_logging();
29
30    let config_path = std::env::args()
31        .nth(1)
32        .unwrap_or_else(|| "sub_config.toml".to_string());
33
34    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
35
36    let time_sync = Arc::new(TimeSynchronizer::new());
37
38    // Subscriber participates in discovery so it can learn where `pub_node` is.
39    let registry = start_discovery(
40        &static_cfg.my_id,
41        &static_cfg.host,
42        static_cfg.port,
43        static_cfg.is_master,
44        Some(time_sync.clone()),
45    )?;
46
47    // Subscriber only: one SUB socket named "local_sub"
48    let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50    loop {
51        // try_recv_raw 内部自动 tick(),无需手动调用
52        // 返回值 (sender_id, sub_topic, payload) 告知消息来自哪个节点
53        while let Some((sender, topic, raw)) = bus.try_recv_raw("local_sub")? {
54            // rs_ctrl_os publish_topic() uses bincode; can_bridge currently publishes a String(JSON) under sub_topic="data".
55            if let Ok(s) = bincode::deserialize::<String>(&raw) {
56                println!("Sub from={sender} sub_topic={topic} string={s}");
57            } else {
58                let as_utf8 = std::str::from_utf8(&raw).ok();
59                println!(
60                    "Sub from={sender} sub_topic={topic} len={} utf8={} hex={}",
61                    raw.len(),
62                    as_utf8.unwrap_or("<non-utf8>"),
63                    fmt_hex(&raw)
64                );
65            }
66        }
67
68        // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
69        thread::sleep(Duration::from_millis(1));
70    }
71}