pub struct PubSubManager { /* private fields */ }Expand description
Pub/Sub 管理器
- 发布频率控制:通过
publish_hz限制publish_topic的最大发送速率。 - 订阅频率控制:通过
subscribe_hz限制try_recv_*的轮询频率。 频率配置建议从各节点的[dynamic]中传入(如publish_hz/subscribe_hz)。
Implementations§
Source§impl PubSubManager
impl PubSubManager
Sourcepub fn new(static_cfg: &StaticBase, registry: ServiceRegistry) -> Result<Self>
pub fn new(static_cfg: &StaticBase, registry: ServiceRegistry) -> Result<Self>
Examples found in repository?
examples/pub_node.rs (line 42)
18fn main() -> Result<()> {
19 init_logging();
20
21 let config_path = std::env::args()
22 .nth(1)
23 .unwrap_or_else(|| "pub_config.toml".to_string());
24
25 let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
26 let static_cfg = manager.static_cfg().clone();
27
28 // Time synchronizer for this node (acts as master in this simple setup)
29 let time_sync = Arc::new(TimeSynchronizer::new());
30
31 // Start discovery so other nodes can find us; we also get a live registry back.
32 let registry = start_discovery(
33 &static_cfg.my_id,
34 &static_cfg.host,
35 static_cfg.port,
36 static_cfg.is_master,
37 Some(time_sync.clone()),
38 )?;
39
40 // Publisher only: one PUB socket for topic "control"
41 // 频率由 static_config 的 publish_hz/subscribe_hz 提供,new() 时已注入
42 let mut bus = PubSubManager::new(&static_cfg, registry)?;
43
44 loop {
45 let dyn_cfg = manager.get_dynamic_clone();
46 let ts_ms = time_sync.now_corrected_ms();
47
48 let payload = format!(
49 "{} from {} at {} ms",
50 dyn_cfg.message_prefix, static_cfg.my_id, ts_ms
51 );
52
53 // Single pub: topic key "control", sub-topic "demo"
54 bus.publish_topic("control", "demo", &payload)?;
55
56 thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
57 }
58}More examples
examples/sub_node.rs (line 48)
27fn main() -> Result<()> {
28 init_logging();
29
30 let config_path = std::env::args()
31 .nth(1)
32 .unwrap_or_else(|| "sub_config.toml".to_string());
33
34 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
35
36 let time_sync = Arc::new(TimeSynchronizer::new());
37
38 // Subscriber participates in discovery so it can learn where `pub_node` is.
39 let registry = start_discovery(
40 &static_cfg.my_id,
41 &static_cfg.host,
42 static_cfg.port,
43 static_cfg.is_master,
44 Some(time_sync.clone()),
45 )?;
46
47 // Subscriber only: one SUB socket named "local_sub"
48 let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50 loop {
51 // try_recv_raw 内部自动 tick(),无需手动调用
52 // 返回值 (sender_id, sub_topic, payload) 告知消息来自哪个节点
53 while let Some((sender, topic, raw)) = bus.try_recv_raw("local_sub")? {
54 // rs_ctrl_os publish_topic() uses bincode; can_bridge currently publishes a String(JSON) under sub_topic="data".
55 if let Ok(s) = bincode::deserialize::<String>(&raw) {
56 println!("Sub from={sender} sub_topic={topic} string={s}");
57 } else {
58 let as_utf8 = std::str::from_utf8(&raw).ok();
59 println!(
60 "Sub from={sender} sub_topic={topic} len={} utf8={} hex={}",
61 raw.len(),
62 as_utf8.unwrap_or("<non-utf8>"),
63 fmt_hex(&raw)
64 );
65 }
66 }
67
68 // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
69 thread::sleep(Duration::from_millis(1));
70 }
71}examples/multi_pub_node.rs (line 60)
38fn main() -> Result<()> {
39 init_logging();
40
41 let config_path = std::env::args()
42 .nth(1)
43 .unwrap_or_else(|| "multi_pub_config.toml".to_string());
44
45 let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
46 let static_cfg = manager.static_cfg().clone();
47
48 let time_sync = Arc::new(TimeSynchronizer::new());
49
50 // This node also participates in discovery (acts as master here).
51 let registry = start_discovery(
52 &static_cfg.my_id,
53 &static_cfg.host,
54 static_cfg.port,
55 static_cfg.is_master,
56 Some(time_sync.clone()),
57 )?;
58
59 // One process, single PUB socket (topic key "control"), multiple sub-topics
60 let mut bus = PubSubManager::new(&static_cfg, registry)?;
61
62 let mut counter: u64 = 0;
63
64 loop {
65 let dyn_cfg = manager.get_dynamic_clone();
66 let ts_ms = time_sync.now_corrected_ms();
67 counter = counter.wrapping_add(1);
68
69 // control sub-topic
70 let control_msg = format!(
71 "[control] {} #{counter} at {} ms",
72 dyn_cfg.control_sub_topic, ts_ms
73 );
74 bus.publish_topic("control", "demo_control", &control_msg)?;
75
76 // status sub-topic
77 let status_msg = format!(
78 "[status] {} #{counter} at {} ms",
79 dyn_cfg.status_sub_topic, ts_ms
80 );
81 bus.publish_topic("control", "demo_status", &status_msg)?;
82
83 // alerts sub-topic
84 let alerts_msg = format!(
85 "[alerts] {} #{counter} at {} ms",
86 dyn_cfg.alerts_sub_topic, ts_ms
87 );
88 bus.publish_topic("control", "demo_alerts", &alerts_msg)?;
89
90 thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
91 }
92}examples/rpc_arm_node.rs (line 85)
66fn main() -> Result<()> {
67 init_logging();
68
69 let config_path = std::env::args()
70 .nth(1)
71 .unwrap_or_else(|| "rpc_arm_config.toml".to_string());
72
73 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
74
75 let time_sync = Arc::new(TimeSynchronizer::new());
76
77 let registry = start_discovery(
78 &static_cfg.my_id,
79 &static_cfg.host,
80 static_cfg.port,
81 static_cfg.is_master,
82 Some(time_sync.clone()),
83 )?;
84
85 let mut bus = PubSubManager::new(&static_cfg, registry)?;
86
87 // Only accept RPC request messages with sub_topic "rpc/cmd".
88 bus.set_sub_topics("arm_cmd", &["rpc/cmd"])?;
89
90 println!("[arm] Listening for RPC requests... (Ctrl+C to stop)");
91
92 // Receive up to 10 requests then exit (demonstration limit).
93 let mut count = 0;
94 while count < 10 {
95 // try_recv_request returns only messages with a valid RPC request envelope.
96 if let Some((sender, rid, topic, payload)) = bus.try_recv_request("arm_cmd")? {
97 if let Ok(cmd) = bincode::deserialize::<ArmCommand>(&payload) {
98 println!(
99 "[arm] RPC request #{} rid={} from={} topic={} cmd={:?}",
100 count, rid, sender, topic, cmd
101 );
102
103 let status = handle_command(&cmd);
104 let response = bincode::serialize(&status)?;
105
106 // Send response back to the control node.
107 bus.publish_response("arm_resp", "rpc/status", rid, &response)?;
108 println!("[arm] -> reply: {:?}", status);
109 count += 1;
110 } else {
111 println!("[arm] RPC request rid={} from={} <deserialize failed>", rid, sender);
112 }
113 } else {
114 thread::sleep(Duration::from_millis(10));
115 }
116 }
117
118 println!("[arm] Processed {} requests, shutting down.", count);
119 Ok(())
120}examples/multi_sub_node.rs (line 35)
15fn main() -> Result<()> {
16 init_logging();
17
18 let config_path = std::env::args()
19 .nth(1)
20 .unwrap_or_else(|| "multi_sub_config.toml".to_string());
21
22 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
23
24 let time_sync = Arc::new(TimeSynchronizer::new());
25
26 // Participate in discovery to learn where `multi_pub` lives.
27 let registry = start_discovery(
28 &static_cfg.my_id,
29 &static_cfg.host,
30 static_cfg.port,
31 static_cfg.is_master,
32 Some(time_sync.clone()),
33 )?;
34
35 let mut bus = PubSubManager::new(&static_cfg, registry)?;
36
37 // 只关心各远端节点下“一个” sub_topic:
38 // - from_multi_pub: 只订阅 demo_status
39 // - from_pub: 只订阅 demo
40 bus.set_sub_topics("from_multi_pub", &["demo_status"])?;
41 bus.set_sub_topics("from_pub", &["demo"])?;
42
43 loop {
44 // try_recv_raw 内部自动 tick(),无需手动调用
45 // 多端口(多远端节点)+ 多子话题:
46 // - "from_multi_pub" 订阅 multi_pub 节点
47 // - "from_pub" 订阅 pub_node 节点
48 for local_name in &["from_multi_pub", "from_pub"] {
49 if let Some((sender, topic, bytes)) = bus.try_recv_raw(local_name)? {
50 if let Ok(text) = bincode::deserialize::<String>(&bytes) {
51 println!(
52 "[multi_sub][dec] from={} local='{}' sub_topic='{}' text=\"{}\"",
53 sender, local_name, topic, text
54 );
55 } else {
56 println!(
57 "[multi_sub][dec] from={} local='{}' sub_topic='{}' <failed to deserialize as String>",
58 sender, local_name, topic
59 );
60 }
61 }
62 }
63
64 // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
65 thread::sleep(Duration::from_millis(1));
66 }
67}examples/rpc_control_node.rs (line 48)
28fn main() -> Result<()> {
29 init_logging();
30
31 let config_path = std::env::args()
32 .nth(1)
33 .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37 let time_sync = Arc::new(TimeSynchronizer::new());
38
39 // Participates in discovery to find the arm node.
40 let registry = start_discovery(
41 &static_cfg.my_id,
42 &static_cfg.host,
43 static_cfg.port,
44 static_cfg.is_master,
45 Some(time_sync.clone()),
46 )?;
47
48 let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50 // Demo: send 5 distinct RPC commands, each with its own request_id.
51 let commands = vec![
52 ArmCommand {
53 command: "move_to".into(),
54 args: vec!["x:100".into(), "y:200".into()],
55 },
56 ArmCommand {
57 command: "set_speed".into(),
58 args: vec!["50".into()],
59 },
60 ArmCommand {
61 command: "grab".into(),
62 args: vec!["width:30".into()],
63 },
64 ArmCommand {
65 command: "move_to".into(),
66 args: vec!["x:300".into(), "y:400".into()],
67 },
68 ArmCommand {
69 command: "release".into(),
70 args: vec![],
71 },
72 ];
73
74 let mut pending: Vec<u64> = Vec::new();
75
76 for (i, cmd) in commands.into_iter().enumerate() {
77 let rid = i as u64;
78 let payload = bincode::serialize(&cmd)?;
79
80 println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81 bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82 pending.push(rid);
83
84 thread::sleep(Duration::from_millis(200)); // stagger sends
85 }
86
87 // Poll for responses with a timeout.
88 println!(
89 "[control] Waiting for {} response(s) from arm...",
90 pending.len()
91 );
92
93 let mut matched = 0u64;
94 let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96 // Filter: only accept RPC responses on this local_name.
97 // The sub_topic "rpc/status" is used by the arm node for replies.
98 bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100 while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101 // try_recv_response only returns messages with a valid RPC response envelope.
102 if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103 if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104 println!(
105 "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106 status
107 );
108 matched += 1;
109 } else {
110 println!(
111 "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112 );
113 }
114 } else {
115 // No message yet — brief sleep to avoid busy spin.
116 thread::sleep(Duration::from_millis(10));
117 }
118 }
119
120 println!(
121 "[control] Done. {}/{} RPC responses received.",
122 matched,
123 pending.len()
124 );
125 Ok(())
126}Sourcepub fn set_publish_hz(&mut self, hz: i64)
pub fn set_publish_hz(&mut self, hz: i64)
设置节点级发布频率(Hz)。
hz > 0:对所有publish_topic生效,按最小时间间隔限频;hz = 0:动态频率(有多少发多快,仍受 ZMQ HWM 影响)。hz < 0:不发布(publish_topic 直接返回 Ok(()))。
Sourcepub fn set_subscribe_hz(&mut self, hz: i64)
pub fn set_subscribe_hz(&mut self, hz: i64)
设置节点级订阅/处理频率(Hz)。
hz > 0:对所有try_recv_*生效,按最小时间间隔限频;hz = 0:动态频率(按调用频率尝试收取)。hz < 0:不订阅/不消费(直接返回 Ok(None))。
Sourcepub fn set_sub_topics<S: AsRef<str>>(
&mut self,
local_name: &str,
topics: &[S],
) -> Result<()>
pub fn set_sub_topics<S: AsRef<str>>( &mut self, local_name: &str, topics: &[S], ) -> Result<()>
为指定本地订阅名配置需要保留的 sub_topic 列表。
topics为空:不过滤任何 sub_topic(保留所有)。- 非空:仅当收到的 sub_topic 在此列表中时才返回;其他 sub_topic 会被静默丢弃。
Examples found in repository?
examples/rpc_arm_node.rs (line 88)
66fn main() -> Result<()> {
67 init_logging();
68
69 let config_path = std::env::args()
70 .nth(1)
71 .unwrap_or_else(|| "rpc_arm_config.toml".to_string());
72
73 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
74
75 let time_sync = Arc::new(TimeSynchronizer::new());
76
77 let registry = start_discovery(
78 &static_cfg.my_id,
79 &static_cfg.host,
80 static_cfg.port,
81 static_cfg.is_master,
82 Some(time_sync.clone()),
83 )?;
84
85 let mut bus = PubSubManager::new(&static_cfg, registry)?;
86
87 // Only accept RPC request messages with sub_topic "rpc/cmd".
88 bus.set_sub_topics("arm_cmd", &["rpc/cmd"])?;
89
90 println!("[arm] Listening for RPC requests... (Ctrl+C to stop)");
91
92 // Receive up to 10 requests then exit (demonstration limit).
93 let mut count = 0;
94 while count < 10 {
95 // try_recv_request returns only messages with a valid RPC request envelope.
96 if let Some((sender, rid, topic, payload)) = bus.try_recv_request("arm_cmd")? {
97 if let Ok(cmd) = bincode::deserialize::<ArmCommand>(&payload) {
98 println!(
99 "[arm] RPC request #{} rid={} from={} topic={} cmd={:?}",
100 count, rid, sender, topic, cmd
101 );
102
103 let status = handle_command(&cmd);
104 let response = bincode::serialize(&status)?;
105
106 // Send response back to the control node.
107 bus.publish_response("arm_resp", "rpc/status", rid, &response)?;
108 println!("[arm] -> reply: {:?}", status);
109 count += 1;
110 } else {
111 println!("[arm] RPC request rid={} from={} <deserialize failed>", rid, sender);
112 }
113 } else {
114 thread::sleep(Duration::from_millis(10));
115 }
116 }
117
118 println!("[arm] Processed {} requests, shutting down.", count);
119 Ok(())
120}More examples
examples/multi_sub_node.rs (line 40)
15fn main() -> Result<()> {
16 init_logging();
17
18 let config_path = std::env::args()
19 .nth(1)
20 .unwrap_or_else(|| "multi_sub_config.toml".to_string());
21
22 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
23
24 let time_sync = Arc::new(TimeSynchronizer::new());
25
26 // Participate in discovery to learn where `multi_pub` lives.
27 let registry = start_discovery(
28 &static_cfg.my_id,
29 &static_cfg.host,
30 static_cfg.port,
31 static_cfg.is_master,
32 Some(time_sync.clone()),
33 )?;
34
35 let mut bus = PubSubManager::new(&static_cfg, registry)?;
36
37 // 只关心各远端节点下“一个” sub_topic:
38 // - from_multi_pub: 只订阅 demo_status
39 // - from_pub: 只订阅 demo
40 bus.set_sub_topics("from_multi_pub", &["demo_status"])?;
41 bus.set_sub_topics("from_pub", &["demo"])?;
42
43 loop {
44 // try_recv_raw 内部自动 tick(),无需手动调用
45 // 多端口(多远端节点)+ 多子话题:
46 // - "from_multi_pub" 订阅 multi_pub 节点
47 // - "from_pub" 订阅 pub_node 节点
48 for local_name in &["from_multi_pub", "from_pub"] {
49 if let Some((sender, topic, bytes)) = bus.try_recv_raw(local_name)? {
50 if let Ok(text) = bincode::deserialize::<String>(&bytes) {
51 println!(
52 "[multi_sub][dec] from={} local='{}' sub_topic='{}' text=\"{}\"",
53 sender, local_name, topic, text
54 );
55 } else {
56 println!(
57 "[multi_sub][dec] from={} local='{}' sub_topic='{}' <failed to deserialize as String>",
58 sender, local_name, topic
59 );
60 }
61 }
62 }
63
64 // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
65 thread::sleep(Duration::from_millis(1));
66 }
67}examples/rpc_control_node.rs (line 98)
28fn main() -> Result<()> {
29 init_logging();
30
31 let config_path = std::env::args()
32 .nth(1)
33 .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37 let time_sync = Arc::new(TimeSynchronizer::new());
38
39 // Participates in discovery to find the arm node.
40 let registry = start_discovery(
41 &static_cfg.my_id,
42 &static_cfg.host,
43 static_cfg.port,
44 static_cfg.is_master,
45 Some(time_sync.clone()),
46 )?;
47
48 let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50 // Demo: send 5 distinct RPC commands, each with its own request_id.
51 let commands = vec![
52 ArmCommand {
53 command: "move_to".into(),
54 args: vec!["x:100".into(), "y:200".into()],
55 },
56 ArmCommand {
57 command: "set_speed".into(),
58 args: vec!["50".into()],
59 },
60 ArmCommand {
61 command: "grab".into(),
62 args: vec!["width:30".into()],
63 },
64 ArmCommand {
65 command: "move_to".into(),
66 args: vec!["x:300".into(), "y:400".into()],
67 },
68 ArmCommand {
69 command: "release".into(),
70 args: vec![],
71 },
72 ];
73
74 let mut pending: Vec<u64> = Vec::new();
75
76 for (i, cmd) in commands.into_iter().enumerate() {
77 let rid = i as u64;
78 let payload = bincode::serialize(&cmd)?;
79
80 println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81 bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82 pending.push(rid);
83
84 thread::sleep(Duration::from_millis(200)); // stagger sends
85 }
86
87 // Poll for responses with a timeout.
88 println!(
89 "[control] Waiting for {} response(s) from arm...",
90 pending.len()
91 );
92
93 let mut matched = 0u64;
94 let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96 // Filter: only accept RPC responses on this local_name.
97 // The sub_topic "rpc/status" is used by the arm node for replies.
98 bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100 while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101 // try_recv_response only returns messages with a valid RPC response envelope.
102 if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103 if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104 println!(
105 "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106 status
107 );
108 matched += 1;
109 } else {
110 println!(
111 "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112 );
113 }
114 } else {
115 // No message yet — brief sleep to avoid busy spin.
116 thread::sleep(Duration::from_millis(10));
117 }
118 }
119
120 println!(
121 "[control] Done. {}/{} RPC responses received.",
122 matched,
123 pending.len()
124 );
125 Ok(())
126}pub fn tick(&mut self) -> Result<()>
Sourcepub fn publish_raw(
&mut self,
topic_key: &str,
sub_topic: &str,
payload: &[u8],
) -> Result<()>
pub fn publish_raw( &mut self, topic_key: &str, sub_topic: &str, payload: &[u8], ) -> Result<()>
发布原始字节(不经过 serde/bincode,直接透传)。
适用于图像、点云等已编码的二进制数据(JPEG、压缩点云等)。
频率控制与 publish_topic 共享。
Sourcepub fn publish_topic<T: Serialize>(
&mut self,
topic_key: &str,
sub_topic: &str,
data: &T,
) -> Result<()>
pub fn publish_topic<T: Serialize>( &mut self, topic_key: &str, sub_topic: &str, data: &T, ) -> Result<()>
发布特定子话题 (Bincode 序列化)
Examples found in repository?
examples/pub_node.rs (line 54)
18fn main() -> Result<()> {
19 init_logging();
20
21 let config_path = std::env::args()
22 .nth(1)
23 .unwrap_or_else(|| "pub_config.toml".to_string());
24
25 let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
26 let static_cfg = manager.static_cfg().clone();
27
28 // Time synchronizer for this node (acts as master in this simple setup)
29 let time_sync = Arc::new(TimeSynchronizer::new());
30
31 // Start discovery so other nodes can find us; we also get a live registry back.
32 let registry = start_discovery(
33 &static_cfg.my_id,
34 &static_cfg.host,
35 static_cfg.port,
36 static_cfg.is_master,
37 Some(time_sync.clone()),
38 )?;
39
40 // Publisher only: one PUB socket for topic "control"
41 // 频率由 static_config 的 publish_hz/subscribe_hz 提供,new() 时已注入
42 let mut bus = PubSubManager::new(&static_cfg, registry)?;
43
44 loop {
45 let dyn_cfg = manager.get_dynamic_clone();
46 let ts_ms = time_sync.now_corrected_ms();
47
48 let payload = format!(
49 "{} from {} at {} ms",
50 dyn_cfg.message_prefix, static_cfg.my_id, ts_ms
51 );
52
53 // Single pub: topic key "control", sub-topic "demo"
54 bus.publish_topic("control", "demo", &payload)?;
55
56 thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
57 }
58}More examples
examples/multi_pub_node.rs (line 74)
38fn main() -> Result<()> {
39 init_logging();
40
41 let config_path = std::env::args()
42 .nth(1)
43 .unwrap_or_else(|| "multi_pub_config.toml".to_string());
44
45 let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
46 let static_cfg = manager.static_cfg().clone();
47
48 let time_sync = Arc::new(TimeSynchronizer::new());
49
50 // This node also participates in discovery (acts as master here).
51 let registry = start_discovery(
52 &static_cfg.my_id,
53 &static_cfg.host,
54 static_cfg.port,
55 static_cfg.is_master,
56 Some(time_sync.clone()),
57 )?;
58
59 // One process, single PUB socket (topic key "control"), multiple sub-topics
60 let mut bus = PubSubManager::new(&static_cfg, registry)?;
61
62 let mut counter: u64 = 0;
63
64 loop {
65 let dyn_cfg = manager.get_dynamic_clone();
66 let ts_ms = time_sync.now_corrected_ms();
67 counter = counter.wrapping_add(1);
68
69 // control sub-topic
70 let control_msg = format!(
71 "[control] {} #{counter} at {} ms",
72 dyn_cfg.control_sub_topic, ts_ms
73 );
74 bus.publish_topic("control", "demo_control", &control_msg)?;
75
76 // status sub-topic
77 let status_msg = format!(
78 "[status] {} #{counter} at {} ms",
79 dyn_cfg.status_sub_topic, ts_ms
80 );
81 bus.publish_topic("control", "demo_status", &status_msg)?;
82
83 // alerts sub-topic
84 let alerts_msg = format!(
85 "[alerts] {} #{counter} at {} ms",
86 dyn_cfg.alerts_sub_topic, ts_ms
87 );
88 bus.publish_topic("control", "demo_alerts", &alerts_msg)?;
89
90 thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
91 }
92}Sourcepub fn try_recv_raw(
&mut self,
local_name: &str,
) -> Result<Option<(String, String, Vec<u8>)>>
pub fn try_recv_raw( &mut self, local_name: &str, ) -> Result<Option<(String, String, Vec<u8>)>>
接收原始字节 (由用户反序列化) 返回 (sender_id, sub_topic, payload),其中 sender_id 是发布者的 node_id。 内部自动调用 tick(),无需在主循环手动调用。
Examples found in repository?
examples/sub_node.rs (line 53)
27fn main() -> Result<()> {
28 init_logging();
29
30 let config_path = std::env::args()
31 .nth(1)
32 .unwrap_or_else(|| "sub_config.toml".to_string());
33
34 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
35
36 let time_sync = Arc::new(TimeSynchronizer::new());
37
38 // Subscriber participates in discovery so it can learn where `pub_node` is.
39 let registry = start_discovery(
40 &static_cfg.my_id,
41 &static_cfg.host,
42 static_cfg.port,
43 static_cfg.is_master,
44 Some(time_sync.clone()),
45 )?;
46
47 // Subscriber only: one SUB socket named "local_sub"
48 let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50 loop {
51 // try_recv_raw 内部自动 tick(),无需手动调用
52 // 返回值 (sender_id, sub_topic, payload) 告知消息来自哪个节点
53 while let Some((sender, topic, raw)) = bus.try_recv_raw("local_sub")? {
54 // rs_ctrl_os publish_topic() uses bincode; can_bridge currently publishes a String(JSON) under sub_topic="data".
55 if let Ok(s) = bincode::deserialize::<String>(&raw) {
56 println!("Sub from={sender} sub_topic={topic} string={s}");
57 } else {
58 let as_utf8 = std::str::from_utf8(&raw).ok();
59 println!(
60 "Sub from={sender} sub_topic={topic} len={} utf8={} hex={}",
61 raw.len(),
62 as_utf8.unwrap_or("<non-utf8>"),
63 fmt_hex(&raw)
64 );
65 }
66 }
67
68 // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
69 thread::sleep(Duration::from_millis(1));
70 }
71}More examples
examples/multi_sub_node.rs (line 49)
15fn main() -> Result<()> {
16 init_logging();
17
18 let config_path = std::env::args()
19 .nth(1)
20 .unwrap_or_else(|| "multi_sub_config.toml".to_string());
21
22 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
23
24 let time_sync = Arc::new(TimeSynchronizer::new());
25
26 // Participate in discovery to learn where `multi_pub` lives.
27 let registry = start_discovery(
28 &static_cfg.my_id,
29 &static_cfg.host,
30 static_cfg.port,
31 static_cfg.is_master,
32 Some(time_sync.clone()),
33 )?;
34
35 let mut bus = PubSubManager::new(&static_cfg, registry)?;
36
37 // 只关心各远端节点下“一个” sub_topic:
38 // - from_multi_pub: 只订阅 demo_status
39 // - from_pub: 只订阅 demo
40 bus.set_sub_topics("from_multi_pub", &["demo_status"])?;
41 bus.set_sub_topics("from_pub", &["demo"])?;
42
43 loop {
44 // try_recv_raw 内部自动 tick(),无需手动调用
45 // 多端口(多远端节点)+ 多子话题:
46 // - "from_multi_pub" 订阅 multi_pub 节点
47 // - "from_pub" 订阅 pub_node 节点
48 for local_name in &["from_multi_pub", "from_pub"] {
49 if let Some((sender, topic, bytes)) = bus.try_recv_raw(local_name)? {
50 if let Ok(text) = bincode::deserialize::<String>(&bytes) {
51 println!(
52 "[multi_sub][dec] from={} local='{}' sub_topic='{}' text=\"{}\"",
53 sender, local_name, topic, text
54 );
55 } else {
56 println!(
57 "[multi_sub][dec] from={} local='{}' sub_topic='{}' <failed to deserialize as String>",
58 sender, local_name, topic
59 );
60 }
61 }
62 }
63
64 // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
65 thread::sleep(Duration::from_millis(1));
66 }
67}Sourcepub fn try_recv_specific<T: for<'de> Deserialize<'de>>(
&mut self,
local_name: &str,
target_sub: &str,
) -> Result<Option<T>>
pub fn try_recv_specific<T: for<'de> Deserialize<'de>>( &mut self, local_name: &str, target_sub: &str, ) -> Result<Option<T>>
辅助:直接接收并反序列化为特定类型 (如果知道具体话题)
Sourcepub fn publish_request(
&mut self,
topic_key: &str,
sub_topic: &str,
request_id: u64,
payload: &[u8],
) -> Result<()>
pub fn publish_request( &mut self, topic_key: &str, sub_topic: &str, request_id: u64, payload: &[u8], ) -> Result<()>
发布 RPC 请求。自动绕过发布频率限制。
Examples found in repository?
examples/rpc_control_node.rs (line 81)
28fn main() -> Result<()> {
29 init_logging();
30
31 let config_path = std::env::args()
32 .nth(1)
33 .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37 let time_sync = Arc::new(TimeSynchronizer::new());
38
39 // Participates in discovery to find the arm node.
40 let registry = start_discovery(
41 &static_cfg.my_id,
42 &static_cfg.host,
43 static_cfg.port,
44 static_cfg.is_master,
45 Some(time_sync.clone()),
46 )?;
47
48 let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50 // Demo: send 5 distinct RPC commands, each with its own request_id.
51 let commands = vec![
52 ArmCommand {
53 command: "move_to".into(),
54 args: vec!["x:100".into(), "y:200".into()],
55 },
56 ArmCommand {
57 command: "set_speed".into(),
58 args: vec!["50".into()],
59 },
60 ArmCommand {
61 command: "grab".into(),
62 args: vec!["width:30".into()],
63 },
64 ArmCommand {
65 command: "move_to".into(),
66 args: vec!["x:300".into(), "y:400".into()],
67 },
68 ArmCommand {
69 command: "release".into(),
70 args: vec![],
71 },
72 ];
73
74 let mut pending: Vec<u64> = Vec::new();
75
76 for (i, cmd) in commands.into_iter().enumerate() {
77 let rid = i as u64;
78 let payload = bincode::serialize(&cmd)?;
79
80 println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81 bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82 pending.push(rid);
83
84 thread::sleep(Duration::from_millis(200)); // stagger sends
85 }
86
87 // Poll for responses with a timeout.
88 println!(
89 "[control] Waiting for {} response(s) from arm...",
90 pending.len()
91 );
92
93 let mut matched = 0u64;
94 let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96 // Filter: only accept RPC responses on this local_name.
97 // The sub_topic "rpc/status" is used by the arm node for replies.
98 bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100 while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101 // try_recv_response only returns messages with a valid RPC response envelope.
102 if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103 if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104 println!(
105 "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106 status
107 );
108 matched += 1;
109 } else {
110 println!(
111 "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112 );
113 }
114 } else {
115 // No message yet — brief sleep to avoid busy spin.
116 thread::sleep(Duration::from_millis(10));
117 }
118 }
119
120 println!(
121 "[control] Done. {}/{} RPC responses received.",
122 matched,
123 pending.len()
124 );
125 Ok(())
126}Sourcepub fn publish_response(
&mut self,
topic_key: &str,
sub_topic: &str,
request_id: u64,
payload: &[u8],
) -> Result<()>
pub fn publish_response( &mut self, topic_key: &str, sub_topic: &str, request_id: u64, payload: &[u8], ) -> Result<()>
发布 RPC 响应。自动绕过发布频率限制。
Examples found in repository?
examples/rpc_arm_node.rs (line 107)
66fn main() -> Result<()> {
67 init_logging();
68
69 let config_path = std::env::args()
70 .nth(1)
71 .unwrap_or_else(|| "rpc_arm_config.toml".to_string());
72
73 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
74
75 let time_sync = Arc::new(TimeSynchronizer::new());
76
77 let registry = start_discovery(
78 &static_cfg.my_id,
79 &static_cfg.host,
80 static_cfg.port,
81 static_cfg.is_master,
82 Some(time_sync.clone()),
83 )?;
84
85 let mut bus = PubSubManager::new(&static_cfg, registry)?;
86
87 // Only accept RPC request messages with sub_topic "rpc/cmd".
88 bus.set_sub_topics("arm_cmd", &["rpc/cmd"])?;
89
90 println!("[arm] Listening for RPC requests... (Ctrl+C to stop)");
91
92 // Receive up to 10 requests then exit (demonstration limit).
93 let mut count = 0;
94 while count < 10 {
95 // try_recv_request returns only messages with a valid RPC request envelope.
96 if let Some((sender, rid, topic, payload)) = bus.try_recv_request("arm_cmd")? {
97 if let Ok(cmd) = bincode::deserialize::<ArmCommand>(&payload) {
98 println!(
99 "[arm] RPC request #{} rid={} from={} topic={} cmd={:?}",
100 count, rid, sender, topic, cmd
101 );
102
103 let status = handle_command(&cmd);
104 let response = bincode::serialize(&status)?;
105
106 // Send response back to the control node.
107 bus.publish_response("arm_resp", "rpc/status", rid, &response)?;
108 println!("[arm] -> reply: {:?}", status);
109 count += 1;
110 } else {
111 println!("[arm] RPC request rid={} from={} <deserialize failed>", rid, sender);
112 }
113 } else {
114 thread::sleep(Duration::from_millis(10));
115 }
116 }
117
118 println!("[arm] Processed {} requests, shutting down.", count);
119 Ok(())
120}Sourcepub fn try_recv_request(
&mut self,
local_name: &str,
) -> Result<Option<(String, u64, String, Vec<u8>)>>
pub fn try_recv_request( &mut self, local_name: &str, ) -> Result<Option<(String, u64, String, Vec<u8>)>>
接收 RPC 请求。
返回 (sender_id, request_id, sub_topic, payload)。
非 RPC 请求的消息会被静默丢弃,请通过 sub_topic 分离普通流量和 RPC 流量。
Examples found in repository?
examples/rpc_arm_node.rs (line 96)
66fn main() -> Result<()> {
67 init_logging();
68
69 let config_path = std::env::args()
70 .nth(1)
71 .unwrap_or_else(|| "rpc_arm_config.toml".to_string());
72
73 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
74
75 let time_sync = Arc::new(TimeSynchronizer::new());
76
77 let registry = start_discovery(
78 &static_cfg.my_id,
79 &static_cfg.host,
80 static_cfg.port,
81 static_cfg.is_master,
82 Some(time_sync.clone()),
83 )?;
84
85 let mut bus = PubSubManager::new(&static_cfg, registry)?;
86
87 // Only accept RPC request messages with sub_topic "rpc/cmd".
88 bus.set_sub_topics("arm_cmd", &["rpc/cmd"])?;
89
90 println!("[arm] Listening for RPC requests... (Ctrl+C to stop)");
91
92 // Receive up to 10 requests then exit (demonstration limit).
93 let mut count = 0;
94 while count < 10 {
95 // try_recv_request returns only messages with a valid RPC request envelope.
96 if let Some((sender, rid, topic, payload)) = bus.try_recv_request("arm_cmd")? {
97 if let Ok(cmd) = bincode::deserialize::<ArmCommand>(&payload) {
98 println!(
99 "[arm] RPC request #{} rid={} from={} topic={} cmd={:?}",
100 count, rid, sender, topic, cmd
101 );
102
103 let status = handle_command(&cmd);
104 let response = bincode::serialize(&status)?;
105
106 // Send response back to the control node.
107 bus.publish_response("arm_resp", "rpc/status", rid, &response)?;
108 println!("[arm] -> reply: {:?}", status);
109 count += 1;
110 } else {
111 println!("[arm] RPC request rid={} from={} <deserialize failed>", rid, sender);
112 }
113 } else {
114 thread::sleep(Duration::from_millis(10));
115 }
116 }
117
118 println!("[arm] Processed {} requests, shutting down.", count);
119 Ok(())
120}Sourcepub fn try_recv_response(
&mut self,
local_name: &str,
) -> Result<Option<(String, u64, String, Vec<u8>)>>
pub fn try_recv_response( &mut self, local_name: &str, ) -> Result<Option<(String, u64, String, Vec<u8>)>>
接收 RPC 响应。
返回 (sender_id, request_id, sub_topic, payload)。
非 RPC 响应的消息会被静默丢弃。
Examples found in repository?
examples/rpc_control_node.rs (line 102)
28fn main() -> Result<()> {
29 init_logging();
30
31 let config_path = std::env::args()
32 .nth(1)
33 .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37 let time_sync = Arc::new(TimeSynchronizer::new());
38
39 // Participates in discovery to find the arm node.
40 let registry = start_discovery(
41 &static_cfg.my_id,
42 &static_cfg.host,
43 static_cfg.port,
44 static_cfg.is_master,
45 Some(time_sync.clone()),
46 )?;
47
48 let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50 // Demo: send 5 distinct RPC commands, each with its own request_id.
51 let commands = vec![
52 ArmCommand {
53 command: "move_to".into(),
54 args: vec!["x:100".into(), "y:200".into()],
55 },
56 ArmCommand {
57 command: "set_speed".into(),
58 args: vec!["50".into()],
59 },
60 ArmCommand {
61 command: "grab".into(),
62 args: vec!["width:30".into()],
63 },
64 ArmCommand {
65 command: "move_to".into(),
66 args: vec!["x:300".into(), "y:400".into()],
67 },
68 ArmCommand {
69 command: "release".into(),
70 args: vec![],
71 },
72 ];
73
74 let mut pending: Vec<u64> = Vec::new();
75
76 for (i, cmd) in commands.into_iter().enumerate() {
77 let rid = i as u64;
78 let payload = bincode::serialize(&cmd)?;
79
80 println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81 bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82 pending.push(rid);
83
84 thread::sleep(Duration::from_millis(200)); // stagger sends
85 }
86
87 // Poll for responses with a timeout.
88 println!(
89 "[control] Waiting for {} response(s) from arm...",
90 pending.len()
91 );
92
93 let mut matched = 0u64;
94 let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96 // Filter: only accept RPC responses on this local_name.
97 // The sub_topic "rpc/status" is used by the arm node for replies.
98 bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100 while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101 // try_recv_response only returns messages with a valid RPC response envelope.
102 if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103 if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104 println!(
105 "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106 status
107 );
108 matched += 1;
109 } else {
110 println!(
111 "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112 );
113 }
114 } else {
115 // No message yet — brief sleep to avoid busy spin.
116 thread::sleep(Duration::from_millis(10));
117 }
118 }
119
120 println!(
121 "[control] Done. {}/{} RPC responses received.",
122 matched,
123 pending.len()
124 );
125 Ok(())
126}Auto Trait Implementations§
impl Freeze for PubSubManager
impl RefUnwindSafe for PubSubManager
impl Send for PubSubManager
impl !Sync for PubSubManager
impl Unpin for PubSubManager
impl UnsafeUnpin for PubSubManager
impl UnwindSafe for PubSubManager
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more