Skip to main content

rpc_arm_node/
rpc_arm_node.rs

1use std::sync::Arc;
2use std::thread;
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
7use rs_ctrl_os::{
8    init_logging, load_config_typed, start_discovery, PubSubManager, Result, TimeSynchronizer,
9};
10
11/// Command received via RPC request from the control node.
12#[derive(Serialize, Deserialize, Debug)]
13struct ArmCommand {
14    command: String,
15    args: Vec<String>,
16}
17
18/// Status sent back via RPC response.
19#[derive(Serialize, Deserialize, Debug)]
20struct ArmStatus {
21    status: String,
22    result: String,
23}
24
25/// Process one command and produce a status reply.
26fn handle_command(cmd: &ArmCommand) -> ArmStatus {
27    match cmd.command.as_str() {
28        "move_to" => {
29            thread::sleep(Duration::from_millis(100));
30            ArmStatus {
31                status: "ok".into(),
32                result: format!("moved to {}", cmd.args.join(", ")),
33            }
34        }
35        "set_speed" => {
36            thread::sleep(Duration::from_millis(50));
37            ArmStatus {
38                status: "ok".into(),
39                result: format!("speed set to {}", cmd.args.first().unwrap_or(&"?".into())),
40            }
41        }
42        "grab" => {
43            thread::sleep(Duration::from_millis(200));
44            ArmStatus {
45                status: "ok".into(),
46                result: "grab complete".into(),
47            }
48        }
49        "release" => {
50            thread::sleep(Duration::from_millis(80));
51            ArmStatus {
52                status: "ok".into(),
53                result: "released".into(),
54            }
55        }
56        _ => ArmStatus {
57            status: "error".into(),
58            result: format!("unknown command: {}", cmd.command),
59        },
60    }
61}
62
63#[derive(Clone, Deserialize)]
64struct DynamicCfg {}
65
66fn main() -> Result<()> {
67    init_logging();
68
69    let config_path = std::env::args()
70        .nth(1)
71        .unwrap_or_else(|| "rpc_arm_config.toml".to_string());
72
73    let (static_cfg, _dynamic) = load_config_typed::<DynamicCfg>(&config_path)?;
74
75    let time_sync = Arc::new(TimeSynchronizer::new());
76
77    let registry = start_discovery(
78        &static_cfg.my_id,
79        &static_cfg.host,
80        static_cfg.port,
81        static_cfg.is_master,
82        Some(time_sync.clone()),
83    )?;
84
85    let mut bus = PubSubManager::new(&static_cfg, registry)?;
86
87    // Only accept RPC request messages with sub_topic "rpc/cmd".
88    bus.set_sub_topics("arm_cmd", &["rpc/cmd"])?;
89
90    println!("[arm] Listening for RPC requests... (Ctrl+C to stop)");
91
92    // Receive up to 10 requests then exit (demonstration limit).
93    let mut count = 0;
94    while count < 10 {
95        // try_recv_request returns only messages with a valid RPC request envelope.
96        if let Some((sender, rid, topic, payload)) = bus.try_recv_request("arm_cmd")? {
97            if let Ok(cmd) = bincode::deserialize::<ArmCommand>(&payload) {
98                println!(
99                    "[arm]  RPC request #{} rid={} from={} topic={} cmd={:?}",
100                    count, rid, sender, topic, cmd
101                );
102
103                let status = handle_command(&cmd);
104                let response = bincode::serialize(&status)?;
105
106                // Send response back to the control node.
107                bus.publish_response("arm_resp", "rpc/status", rid, &response)?;
108                println!("[arm]  -> reply: {:?}", status);
109                count += 1;
110            } else {
111                println!("[arm]  RPC request rid={} from={} <deserialize failed>", rid, sender);
112            }
113        } else {
114            thread::sleep(Duration::from_millis(10));
115        }
116    }
117
118    println!("[arm] Processed {} requests, shutting down.", count);
119    Ok(())
120}