mecha10_runtime/
process.rs

1//! External process management for node lifecycle
2//!
3//! This module provides process management for external node processes (spawned as child processes),
4//! complementing the Supervisor which manages in-process NodeRunner implementations.
5
6use crate::shutdown::ShutdownHandle;
7use anyhow::Result;
8use std::collections::HashMap;
9use std::process::Child;
10use std::time::Duration;
11use tracing::{debug, error, info, warn};
12
13/// Information about a tracked process
14#[derive(Debug, Clone)]
15pub struct ProcessInfo {
16    /// Process name
17    pub name: String,
18    /// Process ID
19    pub pid: u32,
20    /// Current status
21    pub status: ProcessStatus,
22}
23
/// Lifecycle state of a tracked process, as observed by a non-blocking poll.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProcessStatus {
    /// The process has not exited yet.
    Running,
    /// The process exited with the given code. `-1` is used when the OS
    /// reports no exit code (on Unix: the process was terminated by a signal).
    Exited(i32),
    /// The process status could not be queried.
    Error,
}
34
35/// Manages external child processes with graceful shutdown and dependency ordering
36///
37/// ProcessManager tracks and manages child processes spawned as external binaries.
38/// It provides:
39/// - Process tracking with PIDs
40/// - Dependency-based shutdown ordering
41/// - Graceful shutdown with configurable timeout
42/// - Force kill fallback
43/// - Integration with runtime's ShutdownHandle
44pub struct ProcessManager {
45    /// Tracked child processes
46    processes: HashMap<String, Child>,
47    /// Dependency graph: node -> dependencies
48    dependency_graph: HashMap<String, Vec<String>>,
49    /// Shutdown handle for coordinating graceful shutdown
50    shutdown_handle: Option<ShutdownHandle>,
51    /// Default shutdown timeout
52    shutdown_timeout: Duration,
53}
54
55impl ProcessManager {
56    /// Create a new process manager
57    pub fn new() -> Self {
58        Self {
59            processes: HashMap::new(),
60            dependency_graph: HashMap::new(),
61            shutdown_handle: None,
62            shutdown_timeout: Duration::from_secs(10),
63        }
64    }
65
66    /// Create a new process manager with shutdown handle
67    pub fn with_shutdown_handle(shutdown_handle: ShutdownHandle) -> Self {
68        Self {
69            processes: HashMap::new(),
70            dependency_graph: HashMap::new(),
71            shutdown_handle: Some(shutdown_handle),
72            shutdown_timeout: Duration::from_secs(10),
73        }
74    }
75
76    /// Set the default shutdown timeout
77    pub fn set_shutdown_timeout(&mut self, timeout: Duration) {
78        self.shutdown_timeout = timeout;
79    }
80
81    /// Add a process to track
82    ///
83    /// # Arguments
84    ///
85    /// * `name` - Process name/identifier
86    /// * `child` - Child process handle
87    pub fn track(&mut self, name: String, child: Child) {
88        let pid = child.id();
89        debug!("Tracking process: {} (PID: {})", name, pid);
90        self.processes.insert(name, child);
91    }
92
93    /// Remove and return a process by name
94    pub fn remove(&mut self, name: &str) -> Option<Child> {
95        self.processes.remove(name)
96    }
97
98    /// Add a dependency relationship
99    ///
100    /// Specifies that `node` depends on `dependency` and should be stopped before it.
101    ///
102    /// # Arguments
103    ///
104    /// * `node` - The node that depends on another
105    /// * `dependency` - The node that is depended upon
106    pub fn add_dependency(&mut self, node: String, dependency: String) {
107        self.dependency_graph.entry(node).or_default().push(dependency);
108    }
109
110    /// Get list of all tracked process names
111    pub fn list(&self) -> Vec<String> {
112        self.processes.keys().cloned().collect()
113    }
114
115    /// Get the number of tracked processes
116    pub fn len(&self) -> usize {
117        self.processes.len()
118    }
119
120    /// Check if any processes are being tracked
121    pub fn is_empty(&self) -> bool {
122        self.processes.is_empty()
123    }
124
125    /// Get status of all processes
126    pub fn status_all(&mut self) -> HashMap<String, ProcessStatus> {
127        let mut status_map = HashMap::new();
128        let mut to_remove = Vec::new();
129
130        for (name, child) in self.processes.iter_mut() {
131            let status = match child.try_wait() {
132                Ok(Some(exit_status)) => {
133                    to_remove.push(name.clone());
134                    ProcessStatus::Exited(exit_status.code().unwrap_or(-1))
135                }
136                Ok(None) => ProcessStatus::Running,
137                Err(_) => {
138                    to_remove.push(name.clone());
139                    ProcessStatus::Error
140                }
141            };
142            status_map.insert(name.clone(), status);
143        }
144
145        // Remove exited/errored processes
146        for name in to_remove {
147            self.processes.remove(&name);
148        }
149
150        status_map
151    }
152
153    /// Get information about all tracked processes
154    pub fn processes(&mut self) -> Vec<ProcessInfo> {
155        let mut processes = Vec::new();
156        let mut to_remove = Vec::new();
157
158        for (name, child) in self.processes.iter_mut() {
159            let pid = child.id();
160            let status = match child.try_wait() {
161                Ok(Some(exit_status)) => {
162                    to_remove.push(name.clone());
163                    ProcessStatus::Exited(exit_status.code().unwrap_or(-1))
164                }
165                Ok(None) => ProcessStatus::Running,
166                Err(_) => {
167                    to_remove.push(name.clone());
168                    ProcessStatus::Error
169                }
170            };
171
172            processes.push(ProcessInfo {
173                name: name.clone(),
174                pid,
175                status,
176            });
177        }
178
179        // Remove exited/errored processes
180        for name in to_remove {
181            self.processes.remove(&name);
182        }
183
184        processes
185    }
186
187    /// Calculate shutdown order based on dependency graph
188    ///
189    /// Returns nodes in the order they should be stopped (dependents before dependencies).
190    pub fn shutdown_order(&self) -> Vec<String> {
191        let mut order = Vec::new();
192        let mut visited = std::collections::HashSet::new();
193
194        // Build reverse graph (dependents)
195        let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
196        for (node, deps) in &self.dependency_graph {
197            for dep in deps {
198                dependents.entry(dep.clone()).or_default().push(node.clone());
199            }
200        }
201
202        // Get all tracked nodes
203        let all_nodes: Vec<String> = self.list();
204
205        // Visit each node (prioritize nodes with dependents)
206        for node in &all_nodes {
207            if !visited.contains(node) {
208                visit_for_shutdown(node, &dependents, &mut visited, &mut order);
209            }
210        }
211
212        order
213    }
214
215    /// Stop a specific process gracefully with timeout
216    ///
217    /// # Arguments
218    ///
219    /// * `name` - Name of the process to stop
220    /// * `timeout` - How long to wait for graceful shutdown before force killing
221    ///
222    /// # Returns
223    ///
224    /// Ok(()) if stopped successfully (graceful or force), Err if not found or error
225    pub fn stop_graceful(&mut self, name: &str, timeout: Duration) -> Result<()> {
226        use std::thread;
227        use std::time::Instant;
228
229        if let Some(mut child) = self.remove(name) {
230            let pid = child.id();
231            debug!("Stopping process: {} (PID: {})", name, pid);
232
233            // Try graceful shutdown (SIGTERM on Unix)
234            #[cfg(unix)]
235            {
236                use std::process::Command;
237                // Send SIGTERM to the process group (negative PID kills the group)
238                // This ensures we kill the node-runner AND all its child nodes
239                let _ = Command::new("kill").arg("--").arg(format!("-{}", pid)).output();
240
241                // Also send to the process itself as fallback
242                let _ = Command::new("kill").arg(pid.to_string()).output();
243            }
244
245            #[cfg(not(unix))]
246            {
247                // Windows: terminate immediately
248                child.kill()?;
249            }
250
251            // Wait with timeout
252            let start = Instant::now();
253            loop {
254                match child.try_wait() {
255                    Ok(Some(_)) => {
256                        // Process exited gracefully
257                        info!("Process {} stopped gracefully", name);
258                        return Ok(());
259                    }
260                    Ok(None) => {
261                        // Still running
262                        if start.elapsed() >= timeout {
263                            // Timeout - force kill the process group
264                            warn!("Process {} did not stop within {:?}, force killing", name, timeout);
265
266                            #[cfg(unix)]
267                            {
268                                use std::process::Command;
269                                // Force kill the entire process group
270                                let _ = Command::new("kill")
271                                    .arg("-9")
272                                    .arg("--")
273                                    .arg(format!("-{}", pid))
274                                    .output();
275                            }
276
277                            child.kill()?;
278                            let _ = child.wait();
279                            return Ok(());
280                        }
281                        thread::sleep(Duration::from_millis(100));
282                    }
283                    Err(e) => {
284                        // Error checking status - try to kill anyway
285                        error!("Error checking process {} status: {}", name, e);
286
287                        #[cfg(unix)]
288                        {
289                            use std::process::Command;
290                            let _ = Command::new("kill")
291                                .arg("-9")
292                                .arg("--")
293                                .arg(format!("-{}", pid))
294                                .output();
295                        }
296
297                        child.kill()?;
298                        let _ = child.wait();
299                        return Ok(());
300                    }
301                }
302            }
303        } else {
304            Err(anyhow::anyhow!("Process '{}' not found", name))
305        }
306    }
307
308    /// Force kill a process immediately
309    pub fn force_kill(&mut self, name: &str) -> Result<()> {
310        if let Some(mut child) = self.remove(name) {
311            let pid = child.id();
312
313            #[cfg(unix)]
314            {
315                use std::process::Command;
316                // Force kill the entire process group
317                let _ = Command::new("kill")
318                    .arg("-9")
319                    .arg("--")
320                    .arg(format!("-{}", pid))
321                    .output();
322            }
323
324            child.kill()?;
325            let _ = child.wait();
326            info!("Process {} force killed", name);
327            Ok(())
328        } else {
329            Err(anyhow::anyhow!("Process '{}' not found", name))
330        }
331    }
332
333    /// Stop all processes in dependency order with graceful shutdown
334    pub fn shutdown_all(&mut self) {
335        if self.is_empty() {
336            debug!("No processes to shut down");
337            return;
338        }
339
340        info!("Shutting down all processes in dependency order");
341
342        // Trigger shutdown signal if available
343        if let Some(ref shutdown_handle) = self.shutdown_handle {
344            shutdown_handle.shutdown();
345        }
346
347        let shutdown_order = self.shutdown_order();
348
349        for node in shutdown_order {
350            info!("Stopping {} with {:?} timeout", node, self.shutdown_timeout);
351
352            if let Err(e) = self.stop_graceful(&node, self.shutdown_timeout) {
353                warn!("Failed to stop {}: {}, attempting force kill", node, e);
354                if let Err(kill_err) = self.force_kill(&node) {
355                    error!("Failed to force kill {}: {}", node, kill_err);
356                }
357            }
358        }
359
360        info!("All processes stopped");
361    }
362
363    /// Check if a specific process is running
364    pub fn is_running(&mut self, name: &str) -> bool {
365        if let Some(child) = self.processes.get_mut(name) {
366            matches!(child.try_wait(), Ok(None))
367        } else {
368            false
369        }
370    }
371}
372
373impl Default for ProcessManager {
374    fn default() -> Self {
375        Self::new()
376    }
377}
378
379impl Drop for ProcessManager {
380    fn drop(&mut self) {
381        // Ensure all processes are cleaned up
382        self.shutdown_all();
383    }
384}
385
/// Depth-first post-order walk used to build a shutdown order.
///
/// Starting at `node`, every not-yet-visited node reachable through
/// `dependents` (dependency -> nodes depending on it) is appended to `order`
/// before the node it depends on, so `order` lists dependents ahead of their
/// dependencies. `visited` is shared across calls, so each node is emitted at
/// most once. On a cycle, the edge that closes the cycle is simply skipped.
/// An explicit stack is used instead of recursion; the emission order is the
/// same as the equivalent recursive DFS.
fn visit_for_shutdown(
    node: &str,
    dependents: &HashMap<String, Vec<String>>,
    visited: &mut std::collections::HashSet<String>,
    order: &mut Vec<String>,
) {
    visited.insert(node.to_string());

    // Each frame: (node being expanded, index of its next dependent to examine).
    let mut stack: Vec<(&str, usize)> = vec![(node, 0)];

    while let Some(frame) = stack.last_mut() {
        let current = frame.0;
        let pending = dependents
            .get(current)
            .map(Vec::as_slice)
            .unwrap_or_default();

        match pending.get(frame.1) {
            Some(next) => {
                frame.1 += 1;
                if !visited.contains(next) {
                    visited.insert(next.clone());
                    stack.push((next.as_str(), 0));
                }
            }
            None => {
                // All dependents are already ordered; this node comes after them.
                order.push(current.to_string());
                stack.pop();
            }
        }
    }
}
409
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_dependency_ordering() {
        let mut manager = ProcessManager::new();

        // planner depends on motor; camera depends on planner.
        manager.add_dependency("planner".to_string(), "motor".to_string());
        manager.add_dependency("camera".to_string(), "planner".to_string());

        // Register real, idle children directly, bypassing `track`.
        for name in &["motor", "planner", "camera"] {
            manager.processes.insert(name.to_string(), mock_child());
        }

        let order = manager.shutdown_order();
        let index_of = |wanted: &str| order.iter().position(|n| n == wanted).unwrap();

        // Expected stop sequence: camera, then planner, then motor.
        assert!(index_of("camera") < index_of("planner"));
        assert!(index_of("planner") < index_of("motor"));
    }

    /// Spawn a silent, long-lived `sleep` so the manager has a real child to hold.
    #[cfg(unix)]
    fn mock_child() -> Child {
        use std::process::{Command, Stdio};
        let mut command = Command::new("sleep");
        command
            .arg("1000")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        command.spawn().unwrap()
    }

    /// Spawn a silent `timeout /t 1000` as a stand-in child process.
    #[cfg(not(unix))]
    fn mock_child() -> Child {
        use std::process::{Command, Stdio};
        let mut command = Command::new("timeout");
        command
            .arg("/t")
            .arg("1000")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        command.spawn().unwrap()
    }
}