rpc_control_node/
rpc_control_node.rs1use std::sync::Arc;
2use std::thread;
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
7use rs_ctrl_os::{
8 init_logging, load_config_typed, start_discovery, PubSubManager, Result, TimeSynchronizer,
9};
10
11#[derive(Serialize, Deserialize, Debug)]
13struct ArmCommand {
14 command: String,
15 args: Vec<String>,
16}
17
18#[derive(Serialize, Deserialize, Debug)]
20struct ArmStatus {
21 status: String, result: String,
23}
24
25#[derive(Clone, Deserialize)]
26struct DynamicCfg {}
27
28fn main() -> Result<()> {
29 init_logging();
30
31 let config_path = std::env::args()
32 .nth(1)
33 .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35 let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37 let time_sync = Arc::new(TimeSynchronizer::new());
38
39 let registry = start_discovery(
41 &static_cfg.my_id,
42 &static_cfg.host,
43 static_cfg.port,
44 static_cfg.is_master,
45 Some(time_sync.clone()),
46 )?;
47
48 let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50 let commands = vec![
52 ArmCommand {
53 command: "move_to".into(),
54 args: vec!["x:100".into(), "y:200".into()],
55 },
56 ArmCommand {
57 command: "set_speed".into(),
58 args: vec!["50".into()],
59 },
60 ArmCommand {
61 command: "grab".into(),
62 args: vec!["width:30".into()],
63 },
64 ArmCommand {
65 command: "move_to".into(),
66 args: vec!["x:300".into(), "y:400".into()],
67 },
68 ArmCommand {
69 command: "release".into(),
70 args: vec![],
71 },
72 ];
73
74 let mut pending: Vec<u64> = Vec::new();
75
76 for (i, cmd) in commands.into_iter().enumerate() {
77 let rid = i as u64;
78 let payload = bincode::serialize(&cmd)?;
79
80 println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81 bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82 pending.push(rid);
83
84 thread::sleep(Duration::from_millis(200)); }
86
87 println!(
89 "[control] Waiting for {} response(s) from arm...",
90 pending.len()
91 );
92
93 let mut matched = 0u64;
94 let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96 bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100 while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101 if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103 if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104 println!(
105 "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106 status
107 );
108 matched += 1;
109 } else {
110 println!(
111 "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112 );
113 }
114 } else {
115 thread::sleep(Duration::from_millis(10));
117 }
118 }
119
120 println!(
121 "[control] Done. {}/{} RPC responses received.",
122 matched,
123 pending.len()
124 );
125 Ok(())
126}