mecha10_nodes_teleop/
lib.rs1mod config;
20
21pub use config::TeleopConfig;
22use mecha10_core::actuator::Twist;
23use mecha10_core::prelude::*;
24use mecha10_core::teleop::{TeleopInput, TeleopStatus};
25use mecha10_core::topics::Topic;
26use std::time::Duration;
27
28pub mod topics {
30 use super::*;
31
32 pub const TELEOP_CMD: Topic<TeleopInput> = Topic::new("/teleop/cmd");
34
35 pub const CMD_VEL: Topic<Twist> = Topic::new("/motor/cmd_vel");
37 pub const TELEOP_STATUS: Topic<TeleopStatus> = Topic::new("/teleop/status");
38}
39
40pub struct TeleopNode {
42 config: TeleopConfig,
43 status: TeleopStatus,
44 current_linear: f32,
45 current_angular: f32,
46 target_linear: f32,
47 target_angular: f32,
48 last_input: Option<TeleopInput>,
49}
50
51impl std::fmt::Debug for TeleopNode {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.debug_struct("TeleopNode")
54 .field("config", &self.config)
55 .field("status", &self.status)
56 .finish()
57 }
58}
59
60#[async_trait]
61impl NodeImpl for TeleopNode {
62 type Config = TeleopConfig;
63
64 async fn init(config: Self::Config) -> Result<Self> {
65 info!("Initializing teleop coordination node");
66 info!(
67 "Max linear: {} m/s, Max angular: {} rad/s, Timeout: {} ms",
68 config.max_linear_vel, config.max_angular_vel, config.command_timeout_ms
69 );
70
71 Ok(Self {
72 config,
73 status: TeleopStatus::new(),
74 current_linear: 0.0,
75 current_angular: 0.0,
76 target_linear: 0.0,
77 target_angular: 0.0,
78 last_input: None,
79 })
80 }
81
82 async fn run(&mut self, ctx: &Context) -> Result<()> {
83 info!("Teleop coordination node running");
84 info!("Subscribing to: /teleop/cmd");
85 info!("Publishing to: /motor/cmd_vel, /teleop/status");
86 info!(
87 "Output rate: {} Hz, Smoothing: {} (factor: {})",
88 self.config.output_rate_hz, self.config.enable_smoothing, self.config.smoothing_factor
89 );
90
91 let mut commands = ctx.subscribe(topics::TELEOP_CMD).await?;
93
94 let status_interval = Duration::from_secs_f32(1.0 / self.config.status_publish_rate);
96 let mut status_ticker = tokio::time::interval(status_interval);
97 status_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
98
99 let timeout_check_interval = Duration::from_millis(self.config.command_timeout_ms / 2);
101 let mut timeout_ticker = tokio::time::interval(timeout_check_interval);
102 timeout_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
103
104 let output_interval = Duration::from_secs_f32(1.0 / self.config.output_rate_hz);
107 let mut output_ticker = tokio::time::interval(output_interval);
108 output_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
109
110 loop {
111 tokio::select! {
112 Some(input) = commands.recv() => {
114 self.handle_input(input).await?;
115 }
116
117 _ = output_ticker.tick() => {
119 self.update_and_publish(ctx).await?;
120 }
121
122 _ = status_ticker.tick() => {
124 self.publish_status(ctx).await?;
125 }
126
127 _ = timeout_ticker.tick() => {
129 self.check_timeout(ctx).await?;
130 }
131 }
132 }
133 }
134}
135
136impl TeleopNode {
137 async fn handle_input(&mut self, input: TeleopInput) -> Result<()> {
140 let current_time = now_micros();
141
142 if input.is_expired(current_time, self.config.command_timeout_ms) {
144 debug!(
145 "Ignoring expired command from {} (age: {} ms)",
146 input.source,
147 input.age_ms(current_time)
148 );
149 return Ok(());
150 }
151
152 let should_accept = if let Some(last) = &self.last_input {
154 input.priority >= last.priority
155 } else {
156 true
157 };
158
159 if !should_accept {
160 debug!(
161 "Rejecting lower priority command from {} (priority: {}, current: {})",
162 input.source,
163 input.priority,
164 self.last_input.as_ref().unwrap().priority
165 );
166 return Ok(());
167 }
168
169 if input.emergency_stop {
171 warn!("Emergency stop from {}", input.source);
172 self.target_linear = 0.0;
173 self.target_angular = 0.0;
174 self.current_linear = 0.0;
175 self.current_angular = 0.0;
176 self.status.safety_engaged = true;
177 self.last_input = Some(input.clone());
178 self.update_status(&input);
179 return Ok(());
180 }
181
182 let target_linear = input
184 .linear
185 .clamp(-self.config.max_linear_vel, self.config.max_linear_vel);
186 let target_angular = input
187 .angular
188 .clamp(-self.config.max_angular_vel, self.config.max_angular_vel);
189
190 if self.config.enable_smoothing {
192 let alpha = self.config.smoothing_factor;
193 self.current_linear += alpha * (target_linear - self.current_linear);
194 self.current_angular += alpha * (target_angular - self.current_angular);
195 } else {
196 self.current_linear = target_linear;
197 self.current_angular = target_angular;
198 }
199
200 self.target_linear = target_linear;
202 self.target_angular = target_angular;
203
204 if self.status.safety_engaged {
206 debug!("Releasing safety stop, resuming normal operation");
207 self.status.safety_engaged = false;
208 }
209
210 self.last_input = Some(input.clone());
212 self.update_status(&input);
213
214 debug!(
215 "Command from {} (priority: {}): target=({:.2}, {:.2}), current=({:.2}, {:.2})",
216 input.source, input.priority, target_linear, target_angular, self.current_linear, self.current_angular
217 );
218
219 Ok(())
220 }
221
222 async fn update_and_publish(&mut self, ctx: &Context) -> Result<()> {
225 if self.last_input.is_none() {
227 return Ok(());
228 }
229
230 if self.status.safety_engaged {
232 return Ok(());
233 }
234
235 self.publish_velocity(ctx).await?;
238
239 Ok(())
240 }
241
242 async fn check_timeout(&mut self, ctx: &Context) -> Result<()> {
244 if let Some(last) = &self.last_input {
245 let current_time = now_micros();
246 let age_ms = last.age_ms(current_time);
247
248 if age_ms > self.config.command_timeout_ms && !self.status.safety_engaged {
249 warn!(
250 "Command timeout ({}ms > {}ms), engaging safety stop",
251 age_ms, self.config.command_timeout_ms
252 );
253 self.safety_stop(ctx).await?;
254 }
255 }
256
257 Ok(())
258 }
259
260 async fn safety_stop(&mut self, ctx: &Context) -> Result<()> {
262 self.target_linear = 0.0;
264 self.target_angular = 0.0;
265 self.current_linear = 0.0;
266 self.current_angular = 0.0;
267 self.status.safety_engaged = true;
268 self.status.active_source = "none".to_string();
269
270 self.publish_velocity(ctx).await?;
272
273 Ok(())
274 }
275
276 async fn publish_velocity(&self, ctx: &Context) -> Result<()> {
278 let twist = Twist {
279 linear: self.current_linear,
280 angular: self.current_angular,
281 };
282
283 ctx.publish_to(topics::CMD_VEL, &twist).await?;
284
285 Ok(())
286 }
287
288 async fn publish_status(&self, ctx: &Context) -> Result<()> {
290 ctx.publish_to(topics::TELEOP_STATUS, &self.status).await?;
291 Ok(())
292 }
293
294 fn update_status(&mut self, input: &TeleopInput) {
296 self.status.active_source = input.source.clone();
297 self.status.active_priority = input.priority;
298 self.status.last_command_age_ms = 0;
299 self.status.safety_engaged = input.emergency_stop;
300 self.status.current_linear = self.current_linear;
301 self.status.current_angular = self.current_angular;
302 self.status.total_commands += 1;
303 }
304}
305
306pub async fn run() -> Result<()> {
308 let ctx = Context::new("teleop").await?;
310
311 let config: TeleopConfig = ctx.load_node_config("teleop").await?;
313
314 run_node::<TeleopNode>(config, ctx, HealthReportingConfig::normal()).await
316}