pub struct TimeSynchronizer { /* private fields */ }Implementations§
Source§impl TimeSynchronizer
impl TimeSynchronizer
Sourcepub fn new() -> Self
pub fn new() -> Self
Examples found in repository?
examples/pub_node.rs (line 29)
18fn main() -> Result<()> {
19 init_logging();
20
21 let config_path = std::env::args()
22 .nth(1)
23 .unwrap_or_else(|| "pub_config.toml".to_string());
24
25 let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
26 let static_cfg = manager.static_cfg().clone();
27
28 // Time synchronizer for this node (acts as master in this simple setup)
29 let time_sync = Arc::new(TimeSynchronizer::new());
30
31 // Start discovery so other nodes can find us; we also get a live registry back.
32 let registry = start_discovery(
33 &static_cfg.my_id,
34 &static_cfg.host,
35 static_cfg.port,
36 static_cfg.is_master,
37 Some(time_sync.clone()),
38 )?;
39
40 // Publisher only: one PUB socket for topic "control"
41 // 频率由 static_config 的 publish_hz/subscribe_hz 提供,new() 时已注入
42 let mut bus = PubSubManager::new(&static_cfg, registry)?;
43
44 loop {
45 let dyn_cfg = manager.get_dynamic_clone();
46 let ts_ms = time_sync.now_corrected_ms();
47
48 let payload = format!(
49 "{} from {} at {} ms",
50 dyn_cfg.message_prefix, static_cfg.my_id, ts_ms
51 );
52
53 // Single pub: topic key "control", sub-topic "demo"
54 bus.publish_topic("control", "demo", &payload)?;
55
56 thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
57 }
58}More examples
examples/sub_node.rs (line 36)
27fn main() -> Result<()> {
28 init_logging();
29
30 let config_path = std::env::args()
31 .nth(1)
32 .unwrap_or_else(|| "sub_config.toml".to_string());
33
34 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
35
36 let time_sync = Arc::new(TimeSynchronizer::new());
37
38 // Subscriber participates in discovery so it can learn where `pub_node` is.
39 let registry = start_discovery(
40 &static_cfg.my_id,
41 &static_cfg.host,
42 static_cfg.port,
43 static_cfg.is_master,
44 Some(time_sync.clone()),
45 )?;
46
47 // Subscriber only: one SUB socket named "local_sub"
48 let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50 loop {
51 // try_recv_raw 内部自动 tick(),无需手动调用
52 // 返回值 (sender_id, sub_topic, payload) 告知消息来自哪个节点
53 while let Some((sender, topic, raw)) = bus.try_recv_raw("local_sub")? {
54 // rs_ctrl_os publish_topic() uses bincode; can_bridge currently publishes a String(JSON) under sub_topic="data".
55 if let Ok(s) = bincode::deserialize::<String>(&raw) {
56 println!("Sub from={sender} sub_topic={topic} string={s}");
57 } else {
58 let as_utf8 = std::str::from_utf8(&raw).ok();
59 println!(
60 "Sub from={sender} sub_topic={topic} len={} utf8={} hex={}",
61 raw.len(),
62 as_utf8.unwrap_or("<non-utf8>"),
63 fmt_hex(&raw)
64 );
65 }
66 }
67
68 // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
69 thread::sleep(Duration::from_millis(1));
70 }
71}examples/multi_pub_node.rs (line 48)
38fn main() -> Result<()> {
39 init_logging();
40
41 let config_path = std::env::args()
42 .nth(1)
43 .unwrap_or_else(|| "multi_pub_config.toml".to_string());
44
45 let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
46 let static_cfg = manager.static_cfg().clone();
47
48 let time_sync = Arc::new(TimeSynchronizer::new());
49
50 // This node also participates in discovery (acts as master here).
51 let registry = start_discovery(
52 &static_cfg.my_id,
53 &static_cfg.host,
54 static_cfg.port,
55 static_cfg.is_master,
56 Some(time_sync.clone()),
57 )?;
58
59 // One process, single PUB socket (topic key "control"), multiple sub-topics
60 let mut bus = PubSubManager::new(&static_cfg, registry)?;
61
62 let mut counter: u64 = 0;
63
64 loop {
65 let dyn_cfg = manager.get_dynamic_clone();
66 let ts_ms = time_sync.now_corrected_ms();
67 counter = counter.wrapping_add(1);
68
69 // control sub-topic
70 let control_msg = format!(
71 "[control] {} #{counter} at {} ms",
72 dyn_cfg.control_sub_topic, ts_ms
73 );
74 bus.publish_topic("control", "demo_control", &control_msg)?;
75
76 // status sub-topic
77 let status_msg = format!(
78 "[status] {} #{counter} at {} ms",
79 dyn_cfg.status_sub_topic, ts_ms
80 );
81 bus.publish_topic("control", "demo_status", &status_msg)?;
82
83 // alerts sub-topic
84 let alerts_msg = format!(
85 "[alerts] {} #{counter} at {} ms",
86 dyn_cfg.alerts_sub_topic, ts_ms
87 );
88 bus.publish_topic("control", "demo_alerts", &alerts_msg)?;
89
90 thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
91 }
92}examples/rpc_arm_node.rs (line 75)
66fn main() -> Result<()> {
67 init_logging();
68
69 let config_path = std::env::args()
70 .nth(1)
71 .unwrap_or_else(|| "rpc_arm_config.toml".to_string());
72
73 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
74
75 let time_sync = Arc::new(TimeSynchronizer::new());
76
77 let registry = start_discovery(
78 &static_cfg.my_id,
79 &static_cfg.host,
80 static_cfg.port,
81 static_cfg.is_master,
82 Some(time_sync.clone()),
83 )?;
84
85 let mut bus = PubSubManager::new(&static_cfg, registry)?;
86
87 // Only accept RPC request messages with sub_topic "rpc/cmd".
88 bus.set_sub_topics("arm_cmd", &["rpc/cmd"])?;
89
90 println!("[arm] Listening for RPC requests... (Ctrl+C to stop)");
91
92 // Receive up to 10 requests then exit (demonstration limit).
93 let mut count = 0;
94 while count < 10 {
95 // try_recv_request returns only messages with a valid RPC request envelope.
96 if let Some((sender, rid, topic, payload)) = bus.try_recv_request("arm_cmd")? {
97 if let Ok(cmd) = bincode::deserialize::<ArmCommand>(&payload) {
98 println!(
99 "[arm] RPC request #{} rid={} from={} topic={} cmd={:?}",
100 count, rid, sender, topic, cmd
101 );
102
103 let status = handle_command(&cmd);
104 let response = bincode::serialize(&status)?;
105
106 // Send response back to the control node.
107 bus.publish_response("arm_resp", "rpc/status", rid, &response)?;
108 println!("[arm] -> reply: {:?}", status);
109 count += 1;
110 } else {
111 println!("[arm] RPC request rid={} from={} <deserialize failed>", rid, sender);
112 }
113 } else {
114 thread::sleep(Duration::from_millis(10));
115 }
116 }
117
118 println!("[arm] Processed {} requests, shutting down.", count);
119 Ok(())
120}examples/multi_sub_node.rs (line 24)
15fn main() -> Result<()> {
16 init_logging();
17
18 let config_path = std::env::args()
19 .nth(1)
20 .unwrap_or_else(|| "multi_sub_config.toml".to_string());
21
22 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
23
24 let time_sync = Arc::new(TimeSynchronizer::new());
25
26 // Participate in discovery to learn where `multi_pub` lives.
27 let registry = start_discovery(
28 &static_cfg.my_id,
29 &static_cfg.host,
30 static_cfg.port,
31 static_cfg.is_master,
32 Some(time_sync.clone()),
33 )?;
34
35 let mut bus = PubSubManager::new(&static_cfg, registry)?;
36
37 // 只关心各远端节点下“一个” sub_topic:
38 // - from_multi_pub: 只订阅 demo_status
39 // - from_pub: 只订阅 demo
40 bus.set_sub_topics("from_multi_pub", &["demo_status"])?;
41 bus.set_sub_topics("from_pub", &["demo"])?;
42
43 loop {
44 // try_recv_raw 内部自动 tick(),无需手动调用
45 // 多端口(多远端节点)+ 多子话题:
46 // - "from_multi_pub" 订阅 multi_pub 节点
47 // - "from_pub" 订阅 pub_node 节点
48 for local_name in &["from_multi_pub", "from_pub"] {
49 if let Some((sender, topic, bytes)) = bus.try_recv_raw(local_name)? {
50 if let Ok(text) = bincode::deserialize::<String>(&bytes) {
51 println!(
52 "[multi_sub][dec] from={} local='{}' sub_topic='{}' text=\"{}\"",
53 sender, local_name, topic, text
54 );
55 } else {
56 println!(
57 "[multi_sub][dec] from={} local='{}' sub_topic='{}' <failed to deserialize as String>",
58 sender, local_name, topic
59 );
60 }
61 }
62 }
63
64 // 简单 sleep,订阅限频由 static_config.subscribe_hz 控制(new 时已注入)
65 thread::sleep(Duration::from_millis(1));
66 }
67}examples/rpc_control_node.rs (line 37)
28fn main() -> Result<()> {
29 init_logging();
30
31 let config_path = std::env::args()
32 .nth(1)
33 .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37 let time_sync = Arc::new(TimeSynchronizer::new());
38
39 // Participates in discovery to find the arm node.
40 let registry = start_discovery(
41 &static_cfg.my_id,
42 &static_cfg.host,
43 static_cfg.port,
44 static_cfg.is_master,
45 Some(time_sync.clone()),
46 )?;
47
48 let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50 // Demo: send 5 distinct RPC commands, each with its own request_id.
51 let commands = vec![
52 ArmCommand {
53 command: "move_to".into(),
54 args: vec!["x:100".into(), "y:200".into()],
55 },
56 ArmCommand {
57 command: "set_speed".into(),
58 args: vec!["50".into()],
59 },
60 ArmCommand {
61 command: "grab".into(),
62 args: vec!["width:30".into()],
63 },
64 ArmCommand {
65 command: "move_to".into(),
66 args: vec!["x:300".into(), "y:400".into()],
67 },
68 ArmCommand {
69 command: "release".into(),
70 args: vec![],
71 },
72 ];
73
74 let mut pending: Vec<u64> = Vec::new();
75
76 for (i, cmd) in commands.into_iter().enumerate() {
77 let rid = i as u64;
78 let payload = bincode::serialize(&cmd)?;
79
80 println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81 bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82 pending.push(rid);
83
84 thread::sleep(Duration::from_millis(200)); // stagger sends
85 }
86
87 // Poll for responses with a timeout.
88 println!(
89 "[control] Waiting for {} response(s) from arm...",
90 pending.len()
91 );
92
93 let mut matched = 0u64;
94 let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96 // Filter: only accept RPC responses on this local_name.
97 // The sub_topic "rpc/status" is used by the arm node for replies.
98 bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100 while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101 // try_recv_response only returns messages with a valid RPC response envelope.
102 if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103 if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104 println!(
105 "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106 status
107 );
108 matched += 1;
109 } else {
110 println!(
111 "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112 );
113 }
114 } else {
115 // No message yet — brief sleep to avoid busy spin.
116 thread::sleep(Duration::from_millis(10));
117 }
118 }
119
120 println!(
121 "[control] Done. {}/{} RPC responses received.",
122 matched,
123 pending.len()
124 );
125 Ok(())
126}pub fn update_from_master(&self, master_id: &str, master_ts_ms: u64)
Sourcepub fn now_corrected_ms(&self) -> u64
pub fn now_corrected_ms(&self) -> u64
Examples found in repository?
examples/pub_node.rs (line 46)
18fn main() -> Result<()> {
19 init_logging();
20
21 let config_path = std::env::args()
22 .nth(1)
23 .unwrap_or_else(|| "pub_config.toml".to_string());
24
25 let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
26 let static_cfg = manager.static_cfg().clone();
27
28 // Time synchronizer for this node (acts as master in this simple setup)
29 let time_sync = Arc::new(TimeSynchronizer::new());
30
31 // Start discovery so other nodes can find us; we also get a live registry back.
32 let registry = start_discovery(
33 &static_cfg.my_id,
34 &static_cfg.host,
35 static_cfg.port,
36 static_cfg.is_master,
37 Some(time_sync.clone()),
38 )?;
39
40 // Publisher only: one PUB socket for topic "control"
41 // 频率由 static_config 的 publish_hz/subscribe_hz 提供,new() 时已注入
42 let mut bus = PubSubManager::new(&static_cfg, registry)?;
43
44 loop {
45 let dyn_cfg = manager.get_dynamic_clone();
46 let ts_ms = time_sync.now_corrected_ms();
47
48 let payload = format!(
49 "{} from {} at {} ms",
50 dyn_cfg.message_prefix, static_cfg.my_id, ts_ms
51 );
52
53 // Single pub: topic key "control", sub-topic "demo"
54 bus.publish_topic("control", "demo", &payload)?;
55
56 thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
57 }
58}More examples
examples/multi_pub_node.rs (line 66)
38fn main() -> Result<()> {
39 init_logging();
40
41 let config_path = std::env::args()
42 .nth(1)
43 .unwrap_or_else(|| "multi_pub_config.toml".to_string());
44
45 let manager: ConfigManager<DynamicCfg> = ConfigManager::new(Path::new(&config_path))?;
46 let static_cfg = manager.static_cfg().clone();
47
48 let time_sync = Arc::new(TimeSynchronizer::new());
49
50 // This node also participates in discovery (acts as master here).
51 let registry = start_discovery(
52 &static_cfg.my_id,
53 &static_cfg.host,
54 static_cfg.port,
55 static_cfg.is_master,
56 Some(time_sync.clone()),
57 )?;
58
59 // One process, single PUB socket (topic key "control"), multiple sub-topics
60 let mut bus = PubSubManager::new(&static_cfg, registry)?;
61
62 let mut counter: u64 = 0;
63
64 loop {
65 let dyn_cfg = manager.get_dynamic_clone();
66 let ts_ms = time_sync.now_corrected_ms();
67 counter = counter.wrapping_add(1);
68
69 // control sub-topic
70 let control_msg = format!(
71 "[control] {} #{counter} at {} ms",
72 dyn_cfg.control_sub_topic, ts_ms
73 );
74 bus.publish_topic("control", "demo_control", &control_msg)?;
75
76 // status sub-topic
77 let status_msg = format!(
78 "[status] {} #{counter} at {} ms",
79 dyn_cfg.status_sub_topic, ts_ms
80 );
81 bus.publish_topic("control", "demo_status", &status_msg)?;
82
83 // alerts sub-topic
84 let alerts_msg = format!(
85 "[alerts] {} #{counter} at {} ms",
86 dyn_cfg.alerts_sub_topic, ts_ms
87 );
88 bus.publish_topic("control", "demo_alerts", &alerts_msg)?;
89
90 thread::sleep(Duration::from_millis(dyn_cfg.interval_ms));
91 }
92}pub fn is_synced(&self) -> bool
Auto Trait Implementations§
impl Freeze for TimeSynchronizer
impl RefUnwindSafe for TimeSynchronizer
impl Send for TimeSynchronizer
impl Sync for TimeSynchronizer
impl Unpin for TimeSynchronizer
impl UnsafeUnpin for TimeSynchronizer
impl UnwindSafe for TimeSynchronizer
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more