1use crate::shutdown::ShutdownHandle;
7use anyhow::Result;
8use std::collections::HashMap;
9use std::process::Child;
10use std::time::Duration;
11use tracing::{debug, error, info, warn};
12
13#[derive(Debug, Clone)]
15pub struct ProcessInfo {
16 pub name: String,
18 pub pid: u32,
20 pub status: ProcessStatus,
22}
23
/// Observed state of a tracked child process.
///
/// The type is a small plain value, so it is `Copy` and can be compared,
/// hashed, and used as a key in sets and maps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProcessStatus {
    /// The process had not exited when it was last polled.
    Running,
    /// The process exited. Carries the exit code, or `-1` when the platform
    /// reported none (on Unix, `ExitStatus::code` is `None` for a process
    /// terminated by a signal).
    Exited(i32),
    /// Polling the process for its exit status failed.
    Error,
}
34
35pub struct ProcessManager {
45 processes: HashMap<String, Child>,
47 dependency_graph: HashMap<String, Vec<String>>,
49 shutdown_handle: Option<ShutdownHandle>,
51 shutdown_timeout: Duration,
53}
54
55impl ProcessManager {
56 pub fn new() -> Self {
58 Self {
59 processes: HashMap::new(),
60 dependency_graph: HashMap::new(),
61 shutdown_handle: None,
62 shutdown_timeout: Duration::from_secs(10),
63 }
64 }
65
66 pub fn with_shutdown_handle(shutdown_handle: ShutdownHandle) -> Self {
68 Self {
69 processes: HashMap::new(),
70 dependency_graph: HashMap::new(),
71 shutdown_handle: Some(shutdown_handle),
72 shutdown_timeout: Duration::from_secs(10),
73 }
74 }
75
76 pub fn set_shutdown_timeout(&mut self, timeout: Duration) {
78 self.shutdown_timeout = timeout;
79 }
80
81 pub fn track(&mut self, name: String, child: Child) {
88 let pid = child.id();
89 debug!("Tracking process: {} (PID: {})", name, pid);
90 self.processes.insert(name, child);
91 }
92
93 pub fn remove(&mut self, name: &str) -> Option<Child> {
95 self.processes.remove(name)
96 }
97
98 pub fn add_dependency(&mut self, node: String, dependency: String) {
107 self.dependency_graph.entry(node).or_default().push(dependency);
108 }
109
110 pub fn list(&self) -> Vec<String> {
112 self.processes.keys().cloned().collect()
113 }
114
115 pub fn len(&self) -> usize {
117 self.processes.len()
118 }
119
120 pub fn is_empty(&self) -> bool {
122 self.processes.is_empty()
123 }
124
125 pub fn status_all(&mut self) -> HashMap<String, ProcessStatus> {
127 let mut status_map = HashMap::new();
128 let mut to_remove = Vec::new();
129
130 for (name, child) in self.processes.iter_mut() {
131 let status = match child.try_wait() {
132 Ok(Some(exit_status)) => {
133 to_remove.push(name.clone());
134 ProcessStatus::Exited(exit_status.code().unwrap_or(-1))
135 }
136 Ok(None) => ProcessStatus::Running,
137 Err(_) => {
138 to_remove.push(name.clone());
139 ProcessStatus::Error
140 }
141 };
142 status_map.insert(name.clone(), status);
143 }
144
145 for name in to_remove {
147 self.processes.remove(&name);
148 }
149
150 status_map
151 }
152
153 pub fn processes(&mut self) -> Vec<ProcessInfo> {
155 let mut processes = Vec::new();
156 let mut to_remove = Vec::new();
157
158 for (name, child) in self.processes.iter_mut() {
159 let pid = child.id();
160 let status = match child.try_wait() {
161 Ok(Some(exit_status)) => {
162 to_remove.push(name.clone());
163 ProcessStatus::Exited(exit_status.code().unwrap_or(-1))
164 }
165 Ok(None) => ProcessStatus::Running,
166 Err(_) => {
167 to_remove.push(name.clone());
168 ProcessStatus::Error
169 }
170 };
171
172 processes.push(ProcessInfo {
173 name: name.clone(),
174 pid,
175 status,
176 });
177 }
178
179 for name in to_remove {
181 self.processes.remove(&name);
182 }
183
184 processes
185 }
186
187 pub fn shutdown_order(&self) -> Vec<String> {
191 let mut order = Vec::new();
192 let mut visited = std::collections::HashSet::new();
193
194 let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
196 for (node, deps) in &self.dependency_graph {
197 for dep in deps {
198 dependents.entry(dep.clone()).or_default().push(node.clone());
199 }
200 }
201
202 let all_nodes: Vec<String> = self.list();
204
205 for node in &all_nodes {
207 if !visited.contains(node) {
208 visit_for_shutdown(node, &dependents, &mut visited, &mut order);
209 }
210 }
211
212 order
213 }
214
215 pub fn stop_graceful(&mut self, name: &str, timeout: Duration) -> Result<()> {
226 use std::thread;
227 use std::time::Instant;
228
229 if let Some(mut child) = self.remove(name) {
230 let pid = child.id();
231 debug!("Stopping process: {} (PID: {})", name, pid);
232
233 #[cfg(unix)]
235 {
236 use std::process::Command;
237 let _ = Command::new("kill").arg("--").arg(format!("-{}", pid)).output();
240
241 let _ = Command::new("kill").arg(pid.to_string()).output();
243 }
244
245 #[cfg(not(unix))]
246 {
247 child.kill()?;
249 }
250
251 let start = Instant::now();
253 loop {
254 match child.try_wait() {
255 Ok(Some(_)) => {
256 info!("Process {} stopped gracefully", name);
258 return Ok(());
259 }
260 Ok(None) => {
261 if start.elapsed() >= timeout {
263 warn!("Process {} did not stop within {:?}, force killing", name, timeout);
265
266 #[cfg(unix)]
267 {
268 use std::process::Command;
269 let _ = Command::new("kill")
271 .arg("-9")
272 .arg("--")
273 .arg(format!("-{}", pid))
274 .output();
275 }
276
277 child.kill()?;
278 let _ = child.wait();
279 return Ok(());
280 }
281 thread::sleep(Duration::from_millis(100));
282 }
283 Err(e) => {
284 error!("Error checking process {} status: {}", name, e);
286
287 #[cfg(unix)]
288 {
289 use std::process::Command;
290 let _ = Command::new("kill")
291 .arg("-9")
292 .arg("--")
293 .arg(format!("-{}", pid))
294 .output();
295 }
296
297 child.kill()?;
298 let _ = child.wait();
299 return Ok(());
300 }
301 }
302 }
303 } else {
304 Err(anyhow::anyhow!("Process '{}' not found", name))
305 }
306 }
307
308 pub fn force_kill(&mut self, name: &str) -> Result<()> {
310 if let Some(mut child) = self.remove(name) {
311 let pid = child.id();
312
313 #[cfg(unix)]
314 {
315 use std::process::Command;
316 let _ = Command::new("kill")
318 .arg("-9")
319 .arg("--")
320 .arg(format!("-{}", pid))
321 .output();
322 }
323
324 child.kill()?;
325 let _ = child.wait();
326 info!("Process {} force killed", name);
327 Ok(())
328 } else {
329 Err(anyhow::anyhow!("Process '{}' not found", name))
330 }
331 }
332
333 pub fn shutdown_all(&mut self) {
335 if self.is_empty() {
336 debug!("No processes to shut down");
337 return;
338 }
339
340 info!("Shutting down all processes in dependency order");
341
342 if let Some(ref shutdown_handle) = self.shutdown_handle {
344 shutdown_handle.shutdown();
345 }
346
347 let shutdown_order = self.shutdown_order();
348
349 for node in shutdown_order {
350 info!("Stopping {} with {:?} timeout", node, self.shutdown_timeout);
351
352 if let Err(e) = self.stop_graceful(&node, self.shutdown_timeout) {
353 warn!("Failed to stop {}: {}, attempting force kill", node, e);
354 if let Err(kill_err) = self.force_kill(&node) {
355 error!("Failed to force kill {}: {}", node, kill_err);
356 }
357 }
358 }
359
360 info!("All processes stopped");
361 }
362
363 pub fn is_running(&mut self, name: &str) -> bool {
365 if let Some(child) = self.processes.get_mut(name) {
366 matches!(child.try_wait(), Ok(None))
367 } else {
368 false
369 }
370 }
371}
372
373impl Default for ProcessManager {
374 fn default() -> Self {
375 Self::new()
376 }
377}
378
379impl Drop for ProcessManager {
380 fn drop(&mut self) {
381 self.shutdown_all();
383 }
384}
385
/// Depth-first post-order walk used to build the shutdown order.
///
/// Starting at `node`, every not-yet-visited entry reachable through
/// `dependents` is appended to `order` before the entry that led to it, so
/// `node` itself is appended last. `visited` is updated as nodes are entered,
/// which also makes the walk terminate on cyclic graphs.
///
/// * `node` - name to start from; it is marked visited unconditionally.
/// * `dependents` - for each name, the names that depend on it.
/// * `visited` - names already handled; shared across calls.
/// * `order` - output list the walk appends to.
fn visit_for_shutdown(
    node: &str,
    dependents: &HashMap<String, Vec<String>>,
    visited: &mut std::collections::HashSet<String>,
    order: &mut Vec<String>,
) {
    visited.insert(node.to_string());

    // Explicit DFS stack: (name, index of the next dependent to look at).
    let mut stack: Vec<(&str, usize)> = vec![(node, 0)];

    while let Some(frame) = stack.last_mut() {
        let current = frame.0;
        let pending = dependents.get(current).map(Vec::as_slice).unwrap_or(&[]);

        match pending.get(frame.1) {
            Some(next) => {
                frame.1 += 1;
                if !visited.contains(next) {
                    visited.insert(next.clone());
                    stack.push((next.as_str(), 0));
                }
            }
            None => {
                // All dependents handled: emit this node and unwind one level.
                order.push(current.to_string());
                stack.pop();
            }
        }
    }
}
409
#[cfg(test)]
mod tests {
    use super::*;

    /// Spawns a long-running child with all standard streams detached, to
    /// stand in for a managed process.
    fn mock_child() -> Child {
        use std::process::{Command, Stdio};

        #[cfg(unix)]
        let mut command = {
            let mut cmd = Command::new("sleep");
            cmd.arg("1000");
            cmd
        };

        #[cfg(not(unix))]
        let mut command = {
            let mut cmd = Command::new("timeout");
            cmd.arg("/t").arg("1000");
            cmd
        };

        command
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .unwrap()
    }

    /// camera depends on planner, planner depends on motor: the shutdown
    /// order must be camera, then planner, then motor.
    #[test]
    fn test_dependency_ordering() {
        let mut pm = ProcessManager::new();

        pm.add_dependency("planner".to_string(), "motor".to_string());
        pm.add_dependency("camera".to_string(), "planner".to_string());

        for name in ["motor", "planner", "camera"].iter() {
            pm.processes.insert(name.to_string(), mock_child());
        }

        let order = pm.shutdown_order();
        let index_of = |wanted: &str| order.iter().position(|n| n == wanted).unwrap();

        let camera_idx = index_of("camera");
        let planner_idx = index_of("planner");
        let motor_idx = index_of("motor");

        assert!(camera_idx < planner_idx);
        assert!(planner_idx < motor_idx);
    }
}