Skip to main content

rpc_control_node/
rpc_control_node.rs

1use std::sync::Arc;
2use std::thread;
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
7use rs_ctrl_os::{
8    init_logging, load_config_typed, start_discovery, PubSubManager, Result, TimeSynchronizer,
9};
10
11/// Command sent via RPC request to the arm node.
12#[derive(Serialize, Deserialize, Debug)]
13struct ArmCommand {
14    command: String,
15    args: Vec<String>,
16}
17
18/// Status received via RPC response from the arm node.
19#[derive(Serialize, Deserialize, Debug)]
20struct ArmStatus {
21    status: String, // "ok", "error", "busy"
22    result: String,
23}
24
25#[derive(Clone, Deserialize)]
26struct DynamicCfg {}
27
28fn main() -> Result<()> {
29    init_logging();
30
31    let config_path = std::env::args()
32        .nth(1)
33        .unwrap_or_else(|| "rpc_control_config.toml".to_string());
34
35    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
36
37    let time_sync = Arc::new(TimeSynchronizer::new());
38
39    // Participates in discovery to find the arm node.
40    let registry = start_discovery(
41        &static_cfg.my_id,
42        &static_cfg.host,
43        static_cfg.port,
44        static_cfg.is_master,
45        Some(time_sync.clone()),
46    )?;
47
48    let mut bus = PubSubManager::new(&static_cfg, registry)?;
49
50    // Demo: send 5 distinct RPC commands, each with its own request_id.
51    let commands = vec![
52        ArmCommand {
53            command: "move_to".into(),
54            args: vec!["x:100".into(), "y:200".into()],
55        },
56        ArmCommand {
57            command: "set_speed".into(),
58            args: vec!["50".into()],
59        },
60        ArmCommand {
61            command: "grab".into(),
62            args: vec!["width:30".into()],
63        },
64        ArmCommand {
65            command: "move_to".into(),
66            args: vec!["x:300".into(), "y:400".into()],
67        },
68        ArmCommand {
69            command: "release".into(),
70            args: vec![],
71        },
72    ];
73
74    let mut pending: Vec<u64> = Vec::new();
75
76    for (i, cmd) in commands.into_iter().enumerate() {
77        let rid = i as u64;
78        let payload = bincode::serialize(&cmd)?;
79
80        println!("[control] Sending RPC request rid={rid} cmd={:?}", cmd);
81        bus.publish_request("cmd", "rpc/cmd", rid, &payload)?;
82        pending.push(rid);
83
84        thread::sleep(Duration::from_millis(200)); // stagger sends
85    }
86
87    // Poll for responses with a timeout.
88    println!(
89        "[control] Waiting for {} response(s) from arm...",
90        pending.len()
91    );
92
93    let mut matched = 0u64;
94    let deadline = std::time::Instant::now() + Duration::from_secs(5);
95
96    // Filter: only accept RPC responses on this local_name.
97    // The sub_topic "rpc/status" is used by the arm node for replies.
98    bus.set_sub_topics("ctrl_resp", &["rpc/status"])?;
99
100    while matched < pending.len() as u64 && std::time::Instant::now() < deadline {
101        // try_recv_response only returns messages with a valid RPC response envelope.
102        if let Some((sender, rid, topic, payload)) = bus.try_recv_response("ctrl_resp")? {
103            if let Ok(status) = bincode::deserialize::<ArmStatus>(&payload) {
104                println!(
105                    "[control] RPC response rid={rid} from={sender} topic={topic} status={:?}",
106                    status
107                );
108                matched += 1;
109            } else {
110                println!(
111                    "[control] RPC response rid={rid} from={sender} <deserialize failed>"
112                );
113            }
114        } else {
115            // No message yet — brief sleep to avoid busy spin.
116            thread::sleep(Duration::from_millis(10));
117        }
118    }
119
120    println!(
121        "[control] Done. {}/{} RPC responses received.",
122        matched,
123        pending.len()
124    );
125    Ok(())
126}