Skip to main content

mecha10_nodes_teleop/
lib.rs

1//! Teleoperation Coordination Node
2//!
3//! Arbitrates between multiple teleoperation input sources and publishes
4//! velocity commands to the motor controller. Provides safety features like
5//! command timeout, priority arbitration, and emergency stop.
6//!
7//! # Architecture
8//!
9//! This node is a **pure coordination layer** with no UI. Multiple control
10//! interfaces (CLI, dashboard, game controller) publish to `/teleop/cmd`,
11//! and this node decides which commands to forward to `/motor/cmd_vel`.
12//!
13//! # Topics
14//!
15//! - **Subscribes**: `/teleop/cmd` (TeleopInput) - Commands from control sources
16//! - **Publishes**: `/motor/cmd_vel` (Twist) - Velocity commands to motor
17//! - **Publishes**: `/teleop/status` (TeleopStatus) - Current state for UIs
18
19mod config;
20
21pub use config::TeleopConfig;
22use mecha10_core::actuator::Twist;
23use mecha10_core::prelude::*;
24use mecha10_core::teleop::{TeleopInput, TeleopStatus};
25use mecha10_core::topics::Topic;
26use std::time::Duration;
27
28/// Teleop node topics
29pub mod topics {
30    use super::*;
31
32    // Input topics
33    pub const TELEOP_CMD: Topic<TeleopInput> = Topic::new("/teleop/cmd");
34
35    // Output topics
36    pub const CMD_VEL: Topic<Twist> = Topic::new("/motor/cmd_vel");
37    pub const TELEOP_STATUS: Topic<TeleopStatus> = Topic::new("/teleop/status");
38}
39
40/// Teleoperation coordination node state
41pub struct TeleopNode {
42    config: TeleopConfig,
43    status: TeleopStatus,
44    current_linear: f32,
45    current_angular: f32,
46    target_linear: f32,
47    target_angular: f32,
48    last_input: Option<TeleopInput>,
49}
50
51impl std::fmt::Debug for TeleopNode {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.debug_struct("TeleopNode")
54            .field("config", &self.config)
55            .field("status", &self.status)
56            .finish()
57    }
58}
59
60#[async_trait]
61impl NodeImpl for TeleopNode {
62    type Config = TeleopConfig;
63
64    async fn init(config: Self::Config) -> Result<Self> {
65        info!("Initializing teleop coordination node");
66        info!(
67            "Max linear: {} m/s, Max angular: {} rad/s, Timeout: {} ms",
68            config.max_linear_vel, config.max_angular_vel, config.command_timeout_ms
69        );
70
71        Ok(Self {
72            config,
73            status: TeleopStatus::new(),
74            current_linear: 0.0,
75            current_angular: 0.0,
76            target_linear: 0.0,
77            target_angular: 0.0,
78            last_input: None,
79        })
80    }
81
82    async fn run(&mut self, ctx: &Context) -> Result<()> {
83        info!("Teleop coordination node running");
84        info!("Subscribing to: /teleop/cmd");
85        info!("Publishing to: /motor/cmd_vel, /teleop/status");
86        info!(
87            "Output rate: {} Hz, Smoothing: {} (factor: {})",
88            self.config.output_rate_hz, self.config.enable_smoothing, self.config.smoothing_factor
89        );
90
91        // Subscribe to teleop commands
92        let mut commands = ctx.subscribe(topics::TELEOP_CMD).await?;
93
94        // Status publish interval
95        let status_interval = Duration::from_secs_f32(1.0 / self.config.status_publish_rate);
96        let mut status_ticker = tokio::time::interval(status_interval);
97        status_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
98
99        // Timeout check interval (check more frequently than timeout)
100        let timeout_check_interval = Duration::from_millis(self.config.command_timeout_ms / 2);
101        let mut timeout_ticker = tokio::time::interval(timeout_check_interval);
102        timeout_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
103
104        // Continuous output loop - publishes at fixed rate regardless of input timing
105        // This decouples network jitter from motor output for smooth control
106        let output_interval = Duration::from_secs_f32(1.0 / self.config.output_rate_hz);
107        let mut output_ticker = tokio::time::interval(output_interval);
108        output_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
109
110        loop {
111            tokio::select! {
112                // New teleop command received - updates target velocity
113                Some(input) = commands.recv() => {
114                    self.handle_input(input).await?;
115                }
116
117                // Continuous output - applies smoothing and publishes at fixed rate
118                _ = output_ticker.tick() => {
119                    self.update_and_publish(ctx).await?;
120                }
121
122                // Publish status periodically
123                _ = status_ticker.tick() => {
124                    self.publish_status(ctx).await?;
125                }
126
127                // Check for command timeout
128                _ = timeout_ticker.tick() => {
129                    self.check_timeout(ctx).await?;
130                }
131            }
132        }
133    }
134}
135
136impl TeleopNode {
137    /// Handle incoming teleop input - updates target velocities only
138    /// Actual publishing happens in the continuous output loop
139    async fn handle_input(&mut self, input: TeleopInput) -> Result<()> {
140        let current_time = now_micros();
141
142        // Check if command is expired
143        if input.is_expired(current_time, self.config.command_timeout_ms) {
144            debug!(
145                "Ignoring expired command from {} (age: {} ms)",
146                input.source,
147                input.age_ms(current_time)
148            );
149            return Ok(());
150        }
151
152        // Check if this source has priority over current source
153        let should_accept = if let Some(last) = &self.last_input {
154            input.priority >= last.priority
155        } else {
156            true
157        };
158
159        if !should_accept {
160            debug!(
161                "Rejecting lower priority command from {} (priority: {}, current: {})",
162                input.source,
163                input.priority,
164                self.last_input.as_ref().unwrap().priority
165            );
166            return Ok(());
167        }
168
169        // Handle emergency stop - set targets to zero immediately
170        if input.emergency_stop {
171            warn!("Emergency stop from {}", input.source);
172            self.target_linear = 0.0;
173            self.target_angular = 0.0;
174            self.current_linear = 0.0;
175            self.current_angular = 0.0;
176            self.status.safety_engaged = true;
177            self.last_input = Some(input.clone());
178            self.update_status(&input);
179            return Ok(());
180        }
181
182        // Clamp velocities to configured limits
183        let target_linear = input
184            .linear
185            .clamp(-self.config.max_linear_vel, self.config.max_linear_vel);
186        let target_angular = input
187            .angular
188            .clamp(-self.config.max_angular_vel, self.config.max_angular_vel);
189
190        // Apply smoothing once if enabled, otherwise use target directly
191        if self.config.enable_smoothing {
192            let alpha = self.config.smoothing_factor;
193            self.current_linear += alpha * (target_linear - self.current_linear);
194            self.current_angular += alpha * (target_angular - self.current_angular);
195        } else {
196            self.current_linear = target_linear;
197            self.current_angular = target_angular;
198        }
199
200        // Store targets for reference
201        self.target_linear = target_linear;
202        self.target_angular = target_angular;
203
204        // Reset safety engaged when new valid command arrives
205        if self.status.safety_engaged {
206            debug!("Releasing safety stop, resuming normal operation");
207            self.status.safety_engaged = false;
208        }
209
210        // Update state
211        self.last_input = Some(input.clone());
212        self.update_status(&input);
213
214        debug!(
215            "Command from {} (priority: {}): target=({:.2}, {:.2}), current=({:.2}, {:.2})",
216            input.source, input.priority, target_linear, target_angular, self.current_linear, self.current_angular
217        );
218
219        Ok(())
220    }
221
222    /// Continuous output loop - republishes current velocity at fixed rate
223    /// This ensures consistent output to simulation-bridge regardless of input timing
224    async fn update_and_publish(&mut self, ctx: &Context) -> Result<()> {
225        // Don't publish if no input received yet
226        if self.last_input.is_none() {
227            return Ok(());
228        }
229
230        // Don't publish if safety is engaged
231        if self.status.safety_engaged {
232            return Ok(());
233        }
234
235        // Just republish the current velocity - no additional smoothing here
236        // Smoothing is applied once when input arrives in handle_input
237        self.publish_velocity(ctx).await?;
238
239        Ok(())
240    }
241
242    /// Check for command timeout and stop if needed
243    async fn check_timeout(&mut self, ctx: &Context) -> Result<()> {
244        if let Some(last) = &self.last_input {
245            let current_time = now_micros();
246            let age_ms = last.age_ms(current_time);
247
248            if age_ms > self.config.command_timeout_ms && !self.status.safety_engaged {
249                warn!(
250                    "Command timeout ({}ms > {}ms), engaging safety stop",
251                    age_ms, self.config.command_timeout_ms
252                );
253                self.safety_stop(ctx).await?;
254            }
255        }
256
257        Ok(())
258    }
259
260    /// Safety stop - timeout or fault
261    async fn safety_stop(&mut self, ctx: &Context) -> Result<()> {
262        // Reset all velocities to zero immediately
263        self.target_linear = 0.0;
264        self.target_angular = 0.0;
265        self.current_linear = 0.0;
266        self.current_angular = 0.0;
267        self.status.safety_engaged = true;
268        self.status.active_source = "none".to_string();
269
270        // Publish zero velocity
271        self.publish_velocity(ctx).await?;
272
273        Ok(())
274    }
275
276    /// Publish velocity command to motor
277    async fn publish_velocity(&self, ctx: &Context) -> Result<()> {
278        let twist = Twist {
279            linear: self.current_linear,
280            angular: self.current_angular,
281        };
282
283        ctx.publish_to(topics::CMD_VEL, &twist).await?;
284
285        Ok(())
286    }
287
288    /// Publish status for UIs
289    async fn publish_status(&self, ctx: &Context) -> Result<()> {
290        ctx.publish_to(topics::TELEOP_STATUS, &self.status).await?;
291        Ok(())
292    }
293
294    /// Update status from input
295    fn update_status(&mut self, input: &TeleopInput) {
296        self.status.active_source = input.source.clone();
297        self.status.active_priority = input.priority;
298        self.status.last_command_age_ms = 0;
299        self.status.safety_engaged = input.emergency_stop;
300        self.status.current_linear = self.current_linear;
301        self.status.current_angular = self.current_angular;
302        self.status.total_commands += 1;
303    }
304}
305
306/// Run the teleop node with V2 configuration system
307pub async fn run() -> Result<()> {
308    // Create context first (needed for config loading)
309    let ctx = Context::new("teleop").await?;
310
311    // Load config using V2 system with environment-aware file-level fallback
312    let config: TeleopConfig = ctx.load_node_config("teleop").await?;
313
314    // Run the node with Normal priority (teleop coordination)
315    run_node::<TeleopNode>(config, ctx, HealthReportingConfig::normal()).await
316}