Skip to main content

PubSubManager

Struct PubSubManager 

Source
pub struct PubSubManager { /* private fields */ }
Expand description

Pub/Sub 管理器

  • 发布频率控制:通过 publish_hz 限制 publish_topic 的最大发送速率。
  • 订阅频率控制:通过 subscribe_hz 限制 try_recv_* 的轮询频率。 频率配置建议从各节点的 [dynamic] 中传入(如 publish_hz / subscribe_hz)。

Implementations§

Source§

impl PubSubManager

Source

pub fn new(static_cfg: &StaticBase, registry: ServiceRegistry) -> Result<Self>

Examples found in repository?
examples/pub_node.rs (line 42)
18fn main() -> Result<()> {
19    init_logging();
20
21    let config_path = std::env::args()
22        .nth(1)
23        .unwrap_or_else(|| "pub_config.toml".to_string());
24
25    let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
26    let static_cfg = manager.static_cfg().clone();
27
28    // Time synchronizer for this node (acts as master in this simple setup)
29    let time_sync = Arc::new(TimeSynchronizer::new());
30
31    // Start discovery so other nodes can find us; we also get a live registry back.
32    let registry = start_discovery(
33        &static_cfg.my_id,
34        &static_cfg.host,
35        static_cfg.port,
36        static_cfg.is_master,
37        Some(time_sync.clone()),
38    )?;
39
40    // Publisher only: one PUB socket for topic "control"
41    // 频率由 static_config 的 publish_hz/subscribe_hz 提供,new() 时已注入
42    let mut bus = PubSubManager::new(&static_cfg, registry)?;
43
44    loop {
45        let dyn_cfg = manager.get_dynamic_clone();
46        let ts_ms = time_sync.now_corrected_ms();
47
48        let payload = format!(
49            "{} from {} at {} ms",
50            dyn_cfg.message_prefix, static_cfg.my_id, ts_ms
51        );
52
53        // Single pub: topic key "control", sub-topic "demo"
54        bus.publish_topic("control", "demo", &payload)?;
55
56        thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
57    }
58}
More examples
Hide additional examples
examples/sub_node.rs (line 48)
27fn main() -> Result<()> {
28    init_logging();
29
30    let config_path = std::env::args()
31        .nth(1)
32        .unwrap_or_else(|| "sub_config.toml".to_string());
33
34    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
35
36    let time_sync = Arc::new(TimeSynchronizer::new());
37
38    // Subscriber participates in discovery so it can learn where `pub_node` is.
39    let registry = start_discovery(
40        &static_cfg.my_id,
41        &static_cfg.host,
42        static_cfg.port,
43        static_cfg.is_master,
44        Some(time_sync.clone()),
45    )?;
46
47    // Subscriber only: one SUB socket named "local_sub"
48    let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50    loop {
51        // try_recv_raw 内部自动 tick(),无需手动调用
52        // 返回值 (sender_id, sub_topic, payload) 告知消息来自哪个节点
53        while let Some((sender, topic, raw)) = bus.try_recv_raw("local_sub")? {
54            // rs_ctrl_os publish_topic() uses bincode; can_bridge currently publishes a String(JSON) under sub_topic="data".
55            if let Ok(s) = bincode::deserialize::<String>(&raw) {
56                println!("Sub from={sender} sub_topic={topic} string={s}");
57            } else {
58                let as_utf8 = std::str::from_utf8(&raw).ok();
59                println!(
60                    "Sub from={sender} sub_topic={topic} len={} utf8={} hex={}",
61                    raw.len(),
62                    as_utf8.unwrap_or("<non-utf8>"),
63                    fmt_hex(&raw)
64                );
65            }
66        }
67
68        // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
69        thread::sleep(Duration::from_millis(1));
70    }
71}
examples/multi_pub_node.rs (line 60)
38fn main() -> Result<()> {
39    init_logging();
40
41    let config_path = std::env::args()
42        .nth(1)
43        .unwrap_or_else(|| "multi_pub_config.toml".to_string());
44
45    let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
46    let static_cfg = manager.static_cfg().clone();
47
48    let time_sync = Arc::new(TimeSynchronizer::new());
49
50    // This node also participates in discovery (acts as master here).
51    let registry = start_discovery(
52        &static_cfg.my_id,
53        &static_cfg.host,
54        static_cfg.port,
55        static_cfg.is_master,
56        Some(time_sync.clone()),
57    )?;
58
59    // One process, single PUB socket (topic key "control"), multiple sub-topics
60    let mut bus = PubSubManager::new(&static_cfg, registry)?;
61
62    let mut counter: u64 = 0;
63
64    loop {
65        let dyn_cfg = manager.get_dynamic_clone();
66        let ts_ms = time_sync.now_corrected_ms();
67        counter = counter.wrapping_add(1);
68
69        // control sub-topic
70        let control_msg = format!(
71            "[control] {} #{counter} at {} ms",
72            dyn_cfg.control_sub_topic, ts_ms
73        );
74        bus.publish_topic("control", "demo_control", &control_msg)?;
75
76        // status sub-topic
77        let status_msg = format!(
78            "[status] {} #{counter} at {} ms",
79            dyn_cfg.status_sub_topic, ts_ms
80        );
81        bus.publish_topic("control", "demo_status", &status_msg)?;
82
83        // alerts sub-topic
84        let alerts_msg = format!(
85            "[alerts] {} #{counter} at {} ms",
86            dyn_cfg.alerts_sub_topic, ts_ms
87        );
88        bus.publish_topic("control", "demo_alerts", &alerts_msg)?;
89
90        thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
91    }
92}
examples/rpc_arm_node.rs (line 85)
66fn main() -> Result<()> {
67    init_logging();
68
69    let config_path = std::env::args()
70        .nth(1)
71        .unwrap_or_else(|| "rpc_arm_config.toml".to_string());
72
73    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
74
75    let time_sync = Arc::new(TimeSynchronizer::new());
76
77    let registry = start_discovery(
78        &static_cfg.my_id,
79        &static_cfg.host,
80        static_cfg.port,
81        static_cfg.is_master,
82        Some(time_sync.clone()),
83    )?;
84
85    let mut bus = PubSubManager::new(&static_cfg, registry)?;
86
87    // Only accept RPC request messages with sub_topic "rpc/cmd".
88    bus.set_sub_topics("arm_cmd", &["rpc/cmd"])?;
89
90    println!("[arm] Listening for RPC requests... (Ctrl+C to stop)");
91
92    // Receive up to 10 requests then exit (demonstration limit).
93    let mut count = 0;
94    while count < 10 {
95        // try_recv_request returns only messages with a valid RPC request envelope.
96        if let Some((sender, rid, topic, payload)) = bus.try_recv_request("arm_cmd")? {
97            if let Ok(cmd) = bincode::deserialize::<ArmCommand>(&payload) {
98                println!(
99                    "[arm]  RPC request #{} rid={} from={} topic={} cmd={:?}",
100                    count, rid, sender, topic, cmd
101                );
102
103                let status = handle_command(&cmd);
104                let response = bincode::serialize(&status)?;
105
106                // Send response back to the control node.
107                bus.publish_response("arm_resp", "rpc/status", rid, &response)?;
108                println!("[arm]  -> reply: {:?}", status);
109                count += 1;
110            } else {
111                println!("[arm]  RPC request rid={} from={} <deserialize failed>", rid, sender);
112            }
113        } else {
114            thread::sleep(Duration::from_millis(10));
115        }
116    }
117
118    println!("[arm] Processed {} requests, shutting down.", count);
119    Ok(())
120}
examples/multi_sub_node.rs (line 35)
15fn main() -> Result<()> {
16    init_logging();
17
18    let config_path = std::env::args()
19        .nth(1)
20        .unwrap_or_else(|| "multi_sub_config.toml".to_string());
21
22    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
23
24    let time_sync = Arc::new(TimeSynchronizer::new());
25
26    // Participate in discovery to learn where `multi_pub` lives.
27    let registry = start_discovery(
28        &static_cfg.my_id,
29        &static_cfg.host,
30        static_cfg.port,
31        static_cfg.is_master,
32        Some(time_sync.clone()),
33    )?;
34
35    let mut bus = PubSubManager::new(&static_cfg, registry)?;
36
37    // 只关心各远端节点下“一个” sub_topic:
38    // - from_multi_pub: 只订阅 demo_status
39    // - from_pub:       只订阅 demo
40    bus.set_sub_topics("from_multi_pub", &["demo_status"])?;
41    bus.set_sub_topics("from_pub", &["demo"])?;
42
43    loop {
44        // try_recv_raw 内部自动 tick(),无需手动调用
45        // 多端口(多远端节点)+ 多子话题:
46        // - "from_multi_pub" 订阅 multi_pub 节点
47        // - "from_pub"       订阅 pub_node 节点
48        for local_name in &["from_multi_pub", "from_pub"] {
49            if let Some((sender, topic, bytes)) = bus.try_recv_raw(local_name)? {
50                if let Ok(text) = bincode::deserialize::<String>(&bytes) {
51                    println!(
52                        "[multi_sub][dec]  from={} local='{}' sub_topic='{}' text=\"{}\"",
53                        sender, local_name, topic, text
54                    );
55                } else {
56                    println!(
57                        "[multi_sub][dec]  from={} local='{}' sub_topic='{}' <failed to deserialize as String>",
58                        sender, local_name, topic
59                    );
60                }
61            }
62        }
63
64        // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
65        thread::sleep(Duration::from_millis(1));
66    }
67}
examples/rpc_control_node.rs (line 48)
28fn main() -> Result<()> {
29    init_logging();
30
31    let config_path = std::env::args()
32        .nth(1)
33        .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37    let time_sync = Arc::new(TimeSynchronizer::new());
38
39    // Participates in discovery to find the arm node.
40    let registry = start_discovery(
41        &static_cfg.my_id,
42        &static_cfg.host,
43        static_cfg.port,
44        static_cfg.is_master,
45        Some(time_sync.clone()),
46    )?;
47
48    let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50    // Demo: send 5 distinct RPC commands, each with its own request_id.
51    let commands = vec![
52        ArmCommand {
53            command: "move_to".into(),
54            args: vec!["x:100".into(), "y:200".into()],
55        },
56        ArmCommand {
57            command: "set_speed".into(),
58            args: vec!["50".into()],
59        },
60        ArmCommand {
61            command: "grab".into(),
62            args: vec!["width:30".into()],
63        },
64        ArmCommand {
65            command: "move_to".into(),
66            args: vec!["x:300".into(), "y:400".into()],
67        },
68        ArmCommand {
69            command: "release".into(),
70            args: vec![],
71        },
72    ];
73
74    let mut pending: Vec<u64> = Vec::new();
75
76    for (i, cmd) in commands.into_iter().enumerate() {
77        let rid = i as u64;
78        let payload = bincode::serialize(&cmd)?;
79
80        println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81        bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82        pending.push(rid);
83
84        thread::sleep(Duration::from_millis(200)); // stagger sends
85    }
86
87    // Poll for responses with a timeout.
88    println!(
89        "[control] Waiting for {} response(s) from arm...",
90        pending.len()
91    );
92
93    let mut matched = 0u64;
94    let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96    // Filter: only accept RPC responses on this local_name.
97    // The sub_topic "rpc/status" is used by the arm node for replies.
98    bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100    while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101        // try_recv_response only returns messages with a valid RPC response envelope.
102        if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103            if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104                println!(
105                    "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106                    status
107                );
108                matched += 1;
109            } else {
110                println!(
111                    "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112                );
113            }
114        } else {
115            // No message yet — brief sleep to avoid busy spin.
116            thread::sleep(Duration::from_millis(10));
117        }
118    }
119
120    println!(
121        "[control] Done. {}/{} RPC responses received.",
122        matched,
123        pending.len()
124    );
125    Ok(())
126}
Source

pub fn set_publish_hz(&mut self, hz: i64)

设置节点级发布频率(Hz)。

  • hz > 0:对所有 publish_topic 生效,按最小时间间隔限频;
  • hz = 0:动态频率(有多少发多快,仍受 ZMQ HWM 影响)。
  • hz < 0:不发布(publish_topic 直接返回 Ok(()))。
Source

pub fn set_subscribe_hz(&mut self, hz: i64)

设置节点级订阅/处理频率(Hz)。

  • hz > 0:对所有 try_recv_* 生效,按最小时间间隔限频;
  • hz = 0:动态频率(按调用频率尝试收取)。
  • hz < 0:不订阅/不消费(直接返回 Ok(None))。
Source

pub fn set_sub_topics<S: AsRef<str>>( &mut self, local_name: &str, topics: &[S], ) -> Result<()>

为指定本地订阅名配置需要保留的 sub_topic 列表。

  • topics 为空:不过滤任何 sub_topic(保留所有)。
  • 非空:仅当收到的 sub_topic 在此列表中时才返回;其他 sub_topic 会被静默丢弃。
Examples found in repository?
examples/rpc_arm_node.rs (line 88)
66fn main() -> Result<()> {
67    init_logging();
68
69    let config_path = std::env::args()
70        .nth(1)
71        .unwrap_or_else(|| "rpc_arm_config.toml".to_string());
72
73    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
74
75    let time_sync = Arc::new(TimeSynchronizer::new());
76
77    let registry = start_discovery(
78        &static_cfg.my_id,
79        &static_cfg.host,
80        static_cfg.port,
81        static_cfg.is_master,
82        Some(time_sync.clone()),
83    )?;
84
85    let mut bus = PubSubManager::new(&static_cfg, registry)?;
86
87    // Only accept RPC request messages with sub_topic "rpc/cmd".
88    bus.set_sub_topics("arm_cmd", &["rpc/cmd"])?;
89
90    println!("[arm] Listening for RPC requests... (Ctrl+C to stop)");
91
92    // Receive up to 10 requests then exit (demonstration limit).
93    let mut count = 0;
94    while count < 10 {
95        // try_recv_request returns only messages with a valid RPC request envelope.
96        if let Some((sender, rid, topic, payload)) = bus.try_recv_request("arm_cmd")? {
97            if let Ok(cmd) = bincode::deserialize::<ArmCommand>(&payload) {
98                println!(
99                    "[arm]  RPC request #{} rid={} from={} topic={} cmd={:?}",
100                    count, rid, sender, topic, cmd
101                );
102
103                let status = handle_command(&cmd);
104                let response = bincode::serialize(&status)?;
105
106                // Send response back to the control node.
107                bus.publish_response("arm_resp", "rpc/status", rid, &response)?;
108                println!("[arm]  -> reply: {:?}", status);
109                count += 1;
110            } else {
111                println!("[arm]  RPC request rid={} from={} <deserialize failed>", rid, sender);
112            }
113        } else {
114            thread::sleep(Duration::from_millis(10));
115        }
116    }
117
118    println!("[arm] Processed {} requests, shutting down.", count);
119    Ok(())
120}
More examples
Hide additional examples
examples/multi_sub_node.rs (line 40)
15fn main() -> Result<()> {
16    init_logging();
17
18    let config_path = std::env::args()
19        .nth(1)
20        .unwrap_or_else(|| "multi_sub_config.toml".to_string());
21
22    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
23
24    let time_sync = Arc::new(TimeSynchronizer::new());
25
26    // Participate in discovery to learn where `multi_pub` lives.
27    let registry = start_discovery(
28        &static_cfg.my_id,
29        &static_cfg.host,
30        static_cfg.port,
31        static_cfg.is_master,
32        Some(time_sync.clone()),
33    )?;
34
35    let mut bus = PubSubManager::new(&static_cfg, registry)?;
36
37    // 只关心各远端节点下“一个” sub_topic:
38    // - from_multi_pub: 只订阅 demo_status
39    // - from_pub:       只订阅 demo
40    bus.set_sub_topics("from_multi_pub", &["demo_status"])?;
41    bus.set_sub_topics("from_pub", &["demo"])?;
42
43    loop {
44        // try_recv_raw 内部自动 tick(),无需手动调用
45        // 多端口(多远端节点)+ 多子话题:
46        // - "from_multi_pub" 订阅 multi_pub 节点
47        // - "from_pub"       订阅 pub_node 节点
48        for local_name in &["from_multi_pub", "from_pub"] {
49            if let Some((sender, topic, bytes)) = bus.try_recv_raw(local_name)? {
50                if let Ok(text) = bincode::deserialize::<String>(&bytes) {
51                    println!(
52                        "[multi_sub][dec]  from={} local='{}' sub_topic='{}' text=\"{}\"",
53                        sender, local_name, topic, text
54                    );
55                } else {
56                    println!(
57                        "[multi_sub][dec]  from={} local='{}' sub_topic='{}' <failed to deserialize as String>",
58                        sender, local_name, topic
59                    );
60                }
61            }
62        }
63
64        // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
65        thread::sleep(Duration::from_millis(1));
66    }
67}
examples/rpc_control_node.rs (line 98)
28fn main() -> Result<()> {
29    init_logging();
30
31    let config_path = std::env::args()
32        .nth(1)
33        .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37    let time_sync = Arc::new(TimeSynchronizer::new());
38
39    // Participates in discovery to find the arm node.
40    let registry = start_discovery(
41        &static_cfg.my_id,
42        &static_cfg.host,
43        static_cfg.port,
44        static_cfg.is_master,
45        Some(time_sync.clone()),
46    )?;
47
48    let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50    // Demo: send 5 distinct RPC commands, each with its own request_id.
51    let commands = vec![
52        ArmCommand {
53            command: "move_to".into(),
54            args: vec!["x:100".into(), "y:200".into()],
55        },
56        ArmCommand {
57            command: "set_speed".into(),
58            args: vec!["50".into()],
59        },
60        ArmCommand {
61            command: "grab".into(),
62            args: vec!["width:30".into()],
63        },
64        ArmCommand {
65            command: "move_to".into(),
66            args: vec!["x:300".into(), "y:400".into()],
67        },
68        ArmCommand {
69            command: "release".into(),
70            args: vec![],
71        },
72    ];
73
74    let mut pending: Vec<u64> = Vec::new();
75
76    for (i, cmd) in commands.into_iter().enumerate() {
77        let rid = i as u64;
78        let payload = bincode::serialize(&cmd)?;
79
80        println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81        bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82        pending.push(rid);
83
84        thread::sleep(Duration::from_millis(200)); // stagger sends
85    }
86
87    // Poll for responses with a timeout.
88    println!(
89        "[control] Waiting for {} response(s) from arm...",
90        pending.len()
91    );
92
93    let mut matched = 0u64;
94    let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96    // Filter: only accept RPC responses on this local_name.
97    // The sub_topic "rpc/status" is used by the arm node for replies.
98    bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100    while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101        // try_recv_response only returns messages with a valid RPC response envelope.
102        if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103            if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104                println!(
105                    "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106                    status
107                );
108                matched += 1;
109            } else {
110                println!(
111                    "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112                );
113            }
114        } else {
115            // No message yet — brief sleep to avoid busy spin.
116            thread::sleep(Duration::from_millis(10));
117        }
118    }
119
120    println!(
121        "[control] Done. {}/{} RPC responses received.",
122        matched,
123        pending.len()
124    );
125    Ok(())
126}
Source

pub fn tick(&mut self) -> Result<()>

Source

pub fn publish_raw( &mut self, topic_key: &str, sub_topic: &str, payload: &[u8], ) -> Result<()>

发布原始字节(不经过 serde/bincode,直接透传)。 适用于图像、点云等已编码的二进制数据(JPEG、压缩点云等)。 频率控制与 publish_topic 共享。

Source

pub fn publish_topic<T: Serialize>( &mut self, topic_key: &str, sub_topic: &str, data: &T, ) -> Result<()>

发布特定子话题 (Bincode 序列化)

Examples found in repository?
examples/pub_node.rs (line 54)
18fn main() -> Result<()> {
19    init_logging();
20
21    let config_path = std::env::args()
22        .nth(1)
23        .unwrap_or_else(|| "pub_config.toml".to_string());
24
25    let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
26    let static_cfg = manager.static_cfg().clone();
27
28    // Time synchronizer for this node (acts as master in this simple setup)
29    let time_sync = Arc::new(TimeSynchronizer::new());
30
31    // Start discovery so other nodes can find us; we also get a live registry back.
32    let registry = start_discovery(
33        &static_cfg.my_id,
34        &static_cfg.host,
35        static_cfg.port,
36        static_cfg.is_master,
37        Some(time_sync.clone()),
38    )?;
39
40    // Publisher only: one PUB socket for topic "control"
41    // 频率由 static_config 的 publish_hz/subscribe_hz 提供,new() 时已注入
42    let mut bus = PubSubManager::new(&static_cfg, registry)?;
43
44    loop {
45        let dyn_cfg = manager.get_dynamic_clone();
46        let ts_ms = time_sync.now_corrected_ms();
47
48        let payload = format!(
49            "{} from {} at {} ms",
50            dyn_cfg.message_prefix, static_cfg.my_id, ts_ms
51        );
52
53        // Single pub: topic key "control", sub-topic "demo"
54        bus.publish_topic("control", "demo", &payload)?;
55
56        thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
57    }
58}
More examples
Hide additional examples
examples/multi_pub_node.rs (line 74)
38fn main() -> Result<()> {
39    init_logging();
40
41    let config_path = std::env::args()
42        .nth(1)
43        .unwrap_or_else(|| "multi_pub_config.toml".to_string());
44
45    let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
46    let static_cfg = manager.static_cfg().clone();
47
48    let time_sync = Arc::new(TimeSynchronizer::new());
49
50    // This node also participates in discovery (acts as master here).
51    let registry = start_discovery(
52        &static_cfg.my_id,
53        &static_cfg.host,
54        static_cfg.port,
55        static_cfg.is_master,
56        Some(time_sync.clone()),
57    )?;
58
59    // One process, single PUB socket (topic key "control"), multiple sub-topics
60    let mut bus = PubSubManager::new(&static_cfg, registry)?;
61
62    let mut counter: u64 = 0;
63
64    loop {
65        let dyn_cfg = manager.get_dynamic_clone();
66        let ts_ms = time_sync.now_corrected_ms();
67        counter = counter.wrapping_add(1);
68
69        // control sub-topic
70        let control_msg = format!(
71            "[control] {} #{counter} at {} ms",
72            dyn_cfg.control_sub_topic, ts_ms
73        );
74        bus.publish_topic("control", "demo_control", &control_msg)?;
75
76        // status sub-topic
77        let status_msg = format!(
78            "[status] {} #{counter} at {} ms",
79            dyn_cfg.status_sub_topic, ts_ms
80        );
81        bus.publish_topic("control", "demo_status", &status_msg)?;
82
83        // alerts sub-topic
84        let alerts_msg = format!(
85            "[alerts] {} #{counter} at {} ms",
86            dyn_cfg.alerts_sub_topic, ts_ms
87        );
88        bus.publish_topic("control", "demo_alerts", &alerts_msg)?;
89
90        thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
91    }
92}
Source

pub fn try_recv_raw( &mut self, local_name: &str, ) -> Result<Option<(String, String, Vec<u8>)>>

接收原始字节 (由用户反序列化) 返回 (sender_id, sub_topic, payload),其中 sender_id 是发布者的 node_id。 内部自动调用 tick(),无需在主循环手动调用。

Examples found in repository?
examples/sub_node.rs (line 53)
27fn main() -> Result<()> {
28    init_logging();
29
30    let config_path = std::env::args()
31        .nth(1)
32        .unwrap_or_else(|| "sub_config.toml".to_string());
33
34    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
35
36    let time_sync = Arc::new(TimeSynchronizer::new());
37
38    // Subscriber participates in discovery so it can learn where `pub_node` is.
39    let registry = start_discovery(
40        &static_cfg.my_id,
41        &static_cfg.host,
42        static_cfg.port,
43        static_cfg.is_master,
44        Some(time_sync.clone()),
45    )?;
46
47    // Subscriber only: one SUB socket named "local_sub"
48    let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50    loop {
51        // try_recv_raw 内部自动 tick(),无需手动调用
52        // 返回值 (sender_id, sub_topic, payload) 告知消息来自哪个节点
53        while let Some((sender, topic, raw)) = bus.try_recv_raw("local_sub")? {
54            // rs_ctrl_os publish_topic() uses bincode; can_bridge currently publishes a String(JSON) under sub_topic="data".
55            if let Ok(s) = bincode::deserialize::<String>(&raw) {
56                println!("Sub from={sender} sub_topic={topic} string={s}");
57            } else {
58                let as_utf8 = std::str::from_utf8(&raw).ok();
59                println!(
60                    "Sub from={sender} sub_topic={topic} len={} utf8={} hex={}",
61                    raw.len(),
62                    as_utf8.unwrap_or("<non-utf8>"),
63                    fmt_hex(&raw)
64                );
65            }
66        }
67
68        // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
69        thread::sleep(Duration::from_millis(1));
70    }
71}
More examples
Hide additional examples
examples/multi_sub_node.rs (line 49)
15fn main() -> Result<()> {
16    init_logging();
17
18    let config_path = std::env::args()
19        .nth(1)
20        .unwrap_or_else(|| "multi_sub_config.toml".to_string());
21
22    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
23
24    let time_sync = Arc::new(TimeSynchronizer::new());
25
26    // Participate in discovery to learn where `multi_pub` lives.
27    let registry = start_discovery(
28        &static_cfg.my_id,
29        &static_cfg.host,
30        static_cfg.port,
31        static_cfg.is_master,
32        Some(time_sync.clone()),
33    )?;
34
35    let mut bus = PubSubManager::new(&static_cfg, registry)?;
36
37    // 只关心各远端节点下“一个” sub_topic:
38    // - from_multi_pub: 只订阅 demo_status
39    // - from_pub:       只订阅 demo
40    bus.set_sub_topics("from_multi_pub", &["demo_status"])?;
41    bus.set_sub_topics("from_pub", &["demo"])?;
42
43    loop {
44        // try_recv_raw 内部自动 tick(),无需手动调用
45        // 多端口(多远端节点)+ 多子话题:
46        // - "from_multi_pub" 订阅 multi_pub 节点
47        // - "from_pub"       订阅 pub_node 节点
48        for local_name in &["from_multi_pub", "from_pub"] {
49            if let Some((sender, topic, bytes)) = bus.try_recv_raw(local_name)? {
50                if let Ok(text) = bincode::deserialize::<String>(&bytes) {
51                    println!(
52                        "[multi_sub][dec]  from={} local='{}' sub_topic='{}' text=\"{}\"",
53                        sender, local_name, topic, text
54                    );
55                } else {
56                    println!(
57                        "[multi_sub][dec]  from={} local='{}' sub_topic='{}' <failed to deserialize as String>",
58                        sender, local_name, topic
59                    );
60                }
61            }
62        }
63
64        // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
65        thread::sleep(Duration::from_millis(1));
66    }
67}
Source

pub fn try_recv_specific<T: for<'de> Deserialize<'de>>( &mut self, local_name: &str, target_sub: &str, ) -> Result<Option<T>>

辅助:直接接收并反序列化为特定类型 (如果知道具体话题)

Source

pub fn publish_request( &mut self, topic_key: &str, sub_topic: &str, request_id: u64, payload: &[u8], ) -> Result<()>

发布 RPC 请求。自动绕过发布频率限制。

Examples found in repository?
examples/rpc_control_node.rs (line 81)
28fn main() -> Result<()> {
29    init_logging();
30
31    let config_path = std::env::args()
32        .nth(1)
33        .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37    let time_sync = Arc::new(TimeSynchronizer::new());
38
39    // Participates in discovery to find the arm node.
40    let registry = start_discovery(
41        &static_cfg.my_id,
42        &static_cfg.host,
43        static_cfg.port,
44        static_cfg.is_master,
45        Some(time_sync.clone()),
46    )?;
47
48    let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50    // Demo: send 5 distinct RPC commands, each with its own request_id.
51    let commands = vec![
52        ArmCommand {
53            command: "move_to".into(),
54            args: vec!["x:100".into(), "y:200".into()],
55        },
56        ArmCommand {
57            command: "set_speed".into(),
58            args: vec!["50".into()],
59        },
60        ArmCommand {
61            command: "grab".into(),
62            args: vec!["width:30".into()],
63        },
64        ArmCommand {
65            command: "move_to".into(),
66            args: vec!["x:300".into(), "y:400".into()],
67        },
68        ArmCommand {
69            command: "release".into(),
70            args: vec![],
71        },
72    ];
73
74    let mut pending: Vec<u64> = Vec::new();
75
76    for (i, cmd) in commands.into_iter().enumerate() {
77        let rid = i as u64;
78        let payload = bincode::serialize(&cmd)?;
79
80        println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81        bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82        pending.push(rid);
83
84        thread::sleep(Duration::from_millis(200)); // stagger sends
85    }
86
87    // Poll for responses with a timeout.
88    println!(
89        "[control] Waiting for {} response(s) from arm...",
90        pending.len()
91    );
92
93    let mut matched = 0u64;
94    let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96    // Filter: only accept RPC responses on this local_name.
97    // The sub_topic "rpc/status" is used by the arm node for replies.
98    bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100    while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101        // try_recv_response only returns messages with a valid RPC response envelope.
102        if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103            if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104                println!(
105                    "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106                    status
107                );
108                matched += 1;
109            } else {
110                println!(
111                    "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112                );
113            }
114        } else {
115            // No message yet — brief sleep to avoid busy spin.
116            thread::sleep(Duration::from_millis(10));
117        }
118    }
119
120    println!(
121        "[control] Done. {}/{} RPC responses received.",
122        matched,
123        pending.len()
124    );
125    Ok(())
126}
Source

pub fn publish_response( &mut self, topic_key: &str, sub_topic: &str, request_id: u64, payload: &[u8], ) -> Result<()>

发布 RPC 响应。自动绕过发布频率限制。

Examples found in repository?
examples/rpc_arm_node.rs (line 107)
66fn main() -> Result<()> {
67    init_logging();
68
69    let config_path = std::env::args()
70        .nth(1)
71        .unwrap_or_else(|| "rpc_arm_config.toml".to_string());
72
73    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
74
75    let time_sync = Arc::new(TimeSynchronizer::new());
76
77    let registry = start_discovery(
78        &static_cfg.my_id,
79        &static_cfg.host,
80        static_cfg.port,
81        static_cfg.is_master,
82        Some(time_sync.clone()),
83    )?;
84
85    let mut bus = PubSubManager::new(&static_cfg, registry)?;
86
87    // Only accept RPC request messages with sub_topic "rpc/cmd".
88    bus.set_sub_topics("arm_cmd", &["rpc/cmd"])?;
89
90    println!("[arm] Listening for RPC requests... (Ctrl+C to stop)");
91
92    // Receive up to 10 requests then exit (demonstration limit).
93    let mut count = 0;
94    while count < 10 {
95        // try_recv_request returns only messages with a valid RPC request envelope.
96        if let Some((sender, rid, topic, payload)) = bus.try_recv_request("arm_cmd")? {
97            if let Ok(cmd) = bincode::deserialize::<ArmCommand>(&payload) {
98                println!(
99                    "[arm]  RPC request #{} rid={} from={} topic={} cmd={:?}",
100                    count, rid, sender, topic, cmd
101                );
102
103                let status = handle_command(&cmd);
104                let response = bincode::serialize(&status)?;
105
106                // Send response back to the control node.
107                bus.publish_response("arm_resp", "rpc/status", rid, &response)?;
108                println!("[arm]  -> reply: {:?}", status);
109                count += 1;
110            } else {
111                println!("[arm]  RPC request rid={} from={} <deserialize failed>", rid, sender);
112            }
113        } else {
114            thread::sleep(Duration::from_millis(10));
115        }
116    }
117
118    println!("[arm] Processed {} requests, shutting down.", count);
119    Ok(())
120}
Source

pub fn try_recv_request( &mut self, local_name: &str, ) -> Result<Option<(String, u64, String, Vec<u8>)>>

接收 RPC 请求。 返回 (sender_id, request_id, sub_topic, payload)。 非 RPC 请求的消息会被静默丢弃,请通过 sub_topic 分离普通流量和 RPC 流量。

Examples found in repository?
examples/rpc_arm_node.rs (line 96)
66fn main() -> Result<()> {
67    init_logging();
68
69    let config_path = std::env::args()
70        .nth(1)
71        .unwrap_or_else(|| "rpc_arm_config.toml".to_string());
72
73    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
74
75    let time_sync = Arc::new(TimeSynchronizer::new());
76
77    let registry = start_discovery(
78        &static_cfg.my_id,
79        &static_cfg.host,
80        static_cfg.port,
81        static_cfg.is_master,
82        Some(time_sync.clone()),
83    )?;
84
85    let mut bus = PubSubManager::new(&static_cfg, registry)?;
86
87    // Only accept RPC request messages with sub_topic "rpc/cmd".
88    bus.set_sub_topics("arm_cmd", &["rpc/cmd"])?;
89
90    println!("[arm] Listening for RPC requests... (Ctrl+C to stop)");
91
92    // Receive up to 10 requests then exit (demonstration limit).
93    let mut count = 0;
94    while count < 10 {
95        // try_recv_request returns only messages with a valid RPC request envelope.
96        if let Some((sender, rid, topic, payload)) = bus.try_recv_request("arm_cmd")? {
97            if let Ok(cmd) = bincode::deserialize::<ArmCommand>(&payload) {
98                println!(
99                    "[arm]  RPC request #{} rid={} from={} topic={} cmd={:?}",
100                    count, rid, sender, topic, cmd
101                );
102
103                let status = handle_command(&cmd);
104                let response = bincode::serialize(&status)?;
105
106                // Send response back to the control node.
107                bus.publish_response("arm_resp", "rpc/status", rid, &response)?;
108                println!("[arm]  -> reply: {:?}", status);
109                count += 1;
110            } else {
111                println!("[arm]  RPC request rid={} from={} <deserialize failed>", rid, sender);
112            }
113        } else {
114            thread::sleep(Duration::from_millis(10));
115        }
116    }
117
118    println!("[arm] Processed {} requests, shutting down.", count);
119    Ok(())
120}
Source

pub fn try_recv_response( &mut self, local_name: &str, ) -> Result<Option<(String, u64, String, Vec<u8>)>>

接收 RPC 响应。 返回 (sender_id, request_id, sub_topic, payload)。 非 RPC 响应的消息会被静默丢弃。

Examples found in repository?
examples/rpc_control_node.rs (line 102)
28fn main() -> Result<()> {
29    init_logging();
30
31    let config_path = std::env::args()
32        .nth(1)
33        .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37    let time_sync = Arc::new(TimeSynchronizer::new());
38
39    // Participates in discovery to find the arm node.
40    let registry = start_discovery(
41        &static_cfg.my_id,
42        &static_cfg.host,
43        static_cfg.port,
44        static_cfg.is_master,
45        Some(time_sync.clone()),
46    )?;
47
48    let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50    // Demo: send 5 distinct RPC commands, each with its own request_id.
51    let commands = vec![
52        ArmCommand {
53            command: "move_to".into(),
54            args: vec!["x:100".into(), "y:200".into()],
55        },
56        ArmCommand {
57            command: "set_speed".into(),
58            args: vec!["50".into()],
59        },
60        ArmCommand {
61            command: "grab".into(),
62            args: vec!["width:30".into()],
63        },
64        ArmCommand {
65            command: "move_to".into(),
66            args: vec!["x:300".into(), "y:400".into()],
67        },
68        ArmCommand {
69            command: "release".into(),
70            args: vec![],
71        },
72    ];
73
74    let mut pending: Vec<u64> = Vec::new();
75
76    for (i, cmd) in commands.into_iter().enumerate() {
77        let rid = i as u64;
78        let payload = bincode::serialize(&cmd)?;
79
80        println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81        bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82        pending.push(rid);
83
84        thread::sleep(Duration::from_millis(200)); // stagger sends
85    }
86
87    // Poll for responses with a timeout.
88    println!(
89        "[control] Waiting for {} response(s) from arm...",
90        pending.len()
91    );
92
93    let mut matched = 0u64;
94    let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96    // Filter: only accept RPC responses on this local_name.
97    // The sub_topic "rpc/status" is used by the arm node for replies.
98    bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100    while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101        // try_recv_response only returns messages with a valid RPC response envelope.
102        if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103            if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104                println!(
105                    "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106                    status
107                );
108                matched += 1;
109            } else {
110                println!(
111                    "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112                );
113            }
114        } else {
115            // No message yet — brief sleep to avoid busy spin.
116            thread::sleep(Duration::from_millis(10));
117        }
118    }
119
120    println!(
121        "[control] Done. {}/{} RPC responses received.",
122        matched,
123        pending.len()
124    );
125    Ok(())
126}

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more