1use crate::bg_agent::CancelOutcome;
39use std::collections::HashMap;
40use std::sync::Mutex;
41use std::time::{Duration, Instant};
42use tokio::process::Child;
43
44#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum ProcessWaitOutcome {
53 Exited {
56 code: Option<i32>,
58 },
59 TimedOut(BgProcessSnapshot),
63 NotFound,
65 Forbidden,
68}
69
/// Lifecycle state the registry records for a tracked background process.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BgProcessStatus {
    /// Initial state assigned when the process is registered.
    Running,
    /// The registry observed the process as finished.
    Exited {
        /// Exit code reported by the OS, or `None` when none is available
        /// (including the case where polling the child failed).
        code: Option<i32>,
    },
    /// A kill was requested through the registry.
    Killed,
}
88
89#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct BgProcessSnapshot {
96 pub pid: u32,
100 pub command: String,
103 pub age: Duration,
107 pub status: BgProcessStatus,
109 pub spawner: Option<u32>,
113}
114
115struct BgEntry {
117 command: String,
119 child: Child,
122 started_at: Instant,
124 status: BgProcessStatus,
127 spawner: Option<u32>,
129}
130
131pub struct BgRegistry {
135 inner: Mutex<HashMap<u32, BgEntry>>,
136}
137
138impl BgRegistry {
139 pub fn new() -> Self {
141 Self {
142 inner: Mutex::new(HashMap::new()),
143 }
144 }
145
146 pub fn insert(&self, pid: u32, command: String, child: Child, spawner: Option<u32>) -> u32 {
149 self.inner.lock().unwrap().insert(
150 pid,
151 BgEntry {
152 command,
153 child,
154 started_at: Instant::now(),
155 status: BgProcessStatus::Running,
156 spawner,
157 },
158 );
159 pid
160 }
161
162 pub fn list(&self) -> Vec<(u32, String)> {
168 self.inner
169 .lock()
170 .unwrap()
171 .iter()
172 .map(|(pid, e)| (*pid, e.command.clone()))
173 .collect()
174 }
175
176 pub fn snapshot(&self) -> Vec<BgProcessSnapshot> {
183 let guard = self.inner.lock().unwrap();
184 let now = Instant::now();
185 let mut out: Vec<_> = guard
186 .iter()
187 .map(|(pid, e)| BgProcessSnapshot {
188 pid: *pid,
189 command: e.command.clone(),
190 age: now.saturating_duration_since(e.started_at),
191 status: e.status,
192 spawner: e.spawner,
193 })
194 .collect();
195 out.sort_by_key(|s| s.pid);
196 out
197 }
198
199 pub fn snapshot_for_caller(&self, caller_spawner: Option<u32>) -> Vec<BgProcessSnapshot> {
203 self.snapshot()
204 .into_iter()
205 .filter(|s| s.spawner == caller_spawner)
206 .collect()
207 }
208
209 pub fn len(&self) -> usize {
211 self.inner.lock().unwrap().len()
212 }
213
214 pub fn is_empty(&self) -> bool {
216 self.inner.lock().unwrap().is_empty()
217 }
218
219 pub fn reap(&self) {
227 let mut guard = self.inner.lock().unwrap();
228 for entry in guard.values_mut() {
229 if entry.status != BgProcessStatus::Running {
230 continue;
231 }
232 match entry.child.try_wait() {
233 Ok(Some(exit)) => {
234 entry.status = BgProcessStatus::Exited { code: exit.code() };
235 }
236 Ok(None) => { }
237 Err(e) => {
238 tracing::warn!(
241 "BgRegistry reap try_wait failed for PID {}: {e}",
242 entry.child.id().unwrap_or(0)
243 );
244 entry.status = BgProcessStatus::Exited { code: None };
245 }
246 }
247 }
248 }
249
250 pub fn kill(&self, pid: u32) -> bool {
259 let mut guard = self.inner.lock().unwrap();
260 let Some(entry) = guard.get_mut(&pid) else {
261 return false;
262 };
263 if entry.status == BgProcessStatus::Running {
264 if let Err(e) = entry.child.start_kill() {
265 tracing::warn!("BgRegistry::kill: failed to SIGTERM PID {pid}: {e}");
266 }
267 entry.status = BgProcessStatus::Killed;
268 }
269 true
270 }
271
272 pub fn kill_as_caller(&self, pid: u32, caller_spawner: Option<u32>) -> CancelOutcome {
275 let mut guard = self.inner.lock().unwrap();
276 let Some(entry) = guard.get_mut(&pid) else {
277 return CancelOutcome::NotFound;
278 };
279 if entry.spawner != caller_spawner {
280 return CancelOutcome::Forbidden;
281 }
282 if entry.status == BgProcessStatus::Running {
283 if let Err(e) = entry.child.start_kill() {
284 tracing::warn!("BgRegistry::kill_as_caller: SIGTERM PID {pid}: {e}");
285 }
286 entry.status = BgProcessStatus::Killed;
287 }
288 CancelOutcome::Cancelled
289 }
290
291 pub async fn wait_for_exit_as_caller(
304 &self,
305 pid: u32,
306 caller_spawner: Option<u32>,
307 timeout: Duration,
308 ) -> ProcessWaitOutcome {
309 const POLL_INTERVAL: Duration = Duration::from_millis(100);
310
311 {
314 let guard = self.inner.lock().unwrap();
315 match guard.get(&pid) {
316 None => return ProcessWaitOutcome::NotFound,
317 Some(e) if e.spawner != caller_spawner => return ProcessWaitOutcome::Forbidden,
318 Some(_) => {}
319 }
320 }
321
322 let deadline = Instant::now() + timeout;
323 loop {
324 self.reap();
325 {
326 let guard = self.inner.lock().unwrap();
327 let Some(entry) = guard.get(&pid) else {
328 return ProcessWaitOutcome::NotFound;
329 };
330 match entry.status {
331 BgProcessStatus::Running => {}
332 BgProcessStatus::Exited { code } => {
333 return ProcessWaitOutcome::Exited { code };
334 }
335 BgProcessStatus::Killed => {
336 return ProcessWaitOutcome::Exited { code: None };
337 }
338 }
339 }
340
341 if Instant::now() >= deadline {
342 let guard = self.inner.lock().unwrap();
343 let Some(entry) = guard.get(&pid) else {
344 return ProcessWaitOutcome::NotFound;
345 };
346 let now = Instant::now();
347 return ProcessWaitOutcome::TimedOut(BgProcessSnapshot {
348 pid,
349 command: entry.command.clone(),
350 age: now.saturating_duration_since(entry.started_at),
351 status: entry.status,
352 spawner: entry.spawner,
353 });
354 }
355
356 let remaining = deadline.saturating_duration_since(Instant::now());
357 tokio::time::sleep(POLL_INTERVAL.min(remaining)).await;
358 }
359 }
360
361 pub fn kill_for_spawner(&self, spawner: u32) -> usize {
365 let mut guard = self.inner.lock().unwrap();
366 let mut count = 0;
367 for entry in guard.values_mut() {
368 if entry.spawner != Some(spawner) {
369 continue;
370 }
371 if entry.status == BgProcessStatus::Running {
372 if let Err(e) = entry.child.start_kill() {
373 tracing::warn!(
374 "BgRegistry::kill_for_spawner: SIGTERM PID {}: {e}",
375 entry.child.id().unwrap_or(0)
376 );
377 }
378 entry.status = BgProcessStatus::Killed;
379 count += 1;
380 }
381 }
382 count
383 }
384}
385
386impl Default for BgRegistry {
387 fn default() -> Self {
388 Self::new()
389 }
390}
391
392impl Drop for BgRegistry {
393 fn drop(&mut self) {
396 let mut guard = self.inner.lock().unwrap();
397 for (pid, entry) in guard.iter_mut() {
398 if entry.status != BgProcessStatus::Running {
399 continue;
400 }
401 if let Err(e) = entry.child.start_kill() {
402 tracing::warn!("BgRegistry drop: failed to kill PID {pid}: {e}");
403 } else {
404 tracing::debug!("BgRegistry drop: sent SIGTERM to PID {pid}");
405 }
406 }
407 }
408}
409
#[cfg(test)]
mod tests {
    use super::*;

    /// Spawns a long-running `sleep 60` child and returns it with its PID.
    fn spawn_sleep_child() -> (u32, Child) {
        let child = tokio::process::Command::new("sleep")
            .arg("60")
            .spawn()
            .expect("spawn sleep");
        let pid = child.id().expect("pid");
        (pid, child)
    }

    /// Spawns a `true` child (exits immediately with code 0) and returns it
    /// with its PID.
    fn spawn_true_child() -> (u32, Child) {
        let child = tokio::process::Command::new("true").spawn().expect("spawn");
        // A freshly spawned child always has a PID; fail loudly instead of
        // registering it under a made-up key.
        let pid = child.id().expect("pid");
        (pid, child)
    }

    #[test]
    fn registry_starts_empty() {
        let reg = BgRegistry::new();
        assert_eq!(reg.len(), 0);
        assert!(reg.list().is_empty());
        assert!(reg.snapshot().is_empty());
    }

    #[tokio::test]
    async fn insert_records_spawner_and_appears_in_snapshot() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, Some(7));

        let snap = reg.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].pid, pid);
        assert_eq!(snap[0].command, "sleep 60");
        assert_eq!(snap[0].status, BgProcessStatus::Running);
        assert_eq!(snap[0].spawner, Some(7));
    }

    #[tokio::test]
    async fn snapshot_for_caller_filters_by_spawner() {
        let reg = BgRegistry::new();
        let (p1, c1) = spawn_sleep_child();
        let (p2, c2) = spawn_sleep_child();
        let (p3, c3) = spawn_sleep_child();
        reg.insert(p1, "a".into(), c1, None);
        reg.insert(p2, "b".into(), c2, Some(7));
        reg.insert(p3, "c".into(), c3, Some(9));

        // A caller with no spawner id sees only entries recorded with `None`.
        let top = reg.snapshot_for_caller(None);
        assert_eq!(top.len(), 1);
        assert_eq!(top[0].pid, p1);

        let sub_7 = reg.snapshot_for_caller(Some(7));
        assert_eq!(sub_7.len(), 1);
        assert_eq!(sub_7[0].pid, p2);

        // An unknown spawner sees nothing.
        assert!(reg.snapshot_for_caller(Some(42)).is_empty());
    }

    #[tokio::test]
    async fn reap_transitions_finished_children_to_exited() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_true_child();
        reg.insert(pid, "true".into(), child, None);

        let mut observed = None;
        // Poll for up to ~1s (50 x 20ms) for the exit to be observed.
        for _ in 0..50 {
            tokio::time::sleep(Duration::from_millis(20)).await;
            reg.reap();
            let snap = reg.snapshot();
            if let BgProcessStatus::Exited { code } = snap[0].status {
                observed = Some(code);
                break;
            }
        }
        assert_eq!(
            observed,
            Some(Some(0)),
            "reap should observe `true` exiting with code 0 within 1s"
        );
    }

    #[tokio::test]
    async fn kill_transitions_to_killed_and_returns_true() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, None);

        assert!(reg.kill(pid));
        assert_eq!(reg.snapshot()[0].status, BgProcessStatus::Killed);

        // Unknown PID.
        assert!(!reg.kill(987654));
    }

    #[tokio::test]
    async fn kill_as_caller_enforces_spawner_scope() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, Some(5));

        // Wrong owners are rejected and the entry is left running.
        assert_eq!(reg.kill_as_caller(pid, None), CancelOutcome::Forbidden);
        assert_eq!(reg.kill_as_caller(pid, Some(99)), CancelOutcome::Forbidden);
        assert_eq!(reg.snapshot()[0].status, BgProcessStatus::Running);

        // The owning spawner may cancel.
        assert_eq!(reg.kill_as_caller(pid, Some(5)), CancelOutcome::Cancelled);
        assert_eq!(reg.snapshot()[0].status, BgProcessStatus::Killed);

        // Unknown PID.
        assert_eq!(reg.kill_as_caller(987654, None), CancelOutcome::NotFound);
    }

    #[tokio::test]
    async fn wait_for_exit_returns_exited_when_child_finishes() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_true_child();
        reg.insert(pid, "true".into(), child, None);

        let outcome = reg
            .wait_for_exit_as_caller(pid, None, Duration::from_secs(2))
            .await;
        assert_eq!(outcome, ProcessWaitOutcome::Exited { code: Some(0) });
    }

    #[tokio::test]
    async fn wait_for_exit_returns_exited_when_already_killed() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, Some(7));
        reg.kill(pid);

        let outcome = reg
            .wait_for_exit_as_caller(pid, Some(7), Duration::from_secs(1))
            .await;
        assert_eq!(outcome, ProcessWaitOutcome::Exited { code: None });
    }

    #[tokio::test]
    async fn wait_for_exit_returns_timed_out_with_snapshot() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, None);

        let outcome = reg
            .wait_for_exit_as_caller(pid, None, Duration::from_millis(150))
            .await;
        match outcome {
            ProcessWaitOutcome::TimedOut(snap) => {
                assert_eq!(snap.pid, pid);
                assert_eq!(snap.status, BgProcessStatus::Running);
                assert_eq!(snap.spawner, None);
            }
            other => panic!("expected TimedOut, got {other:?}"),
        }
        assert_eq!(
            reg.snapshot().len(),
            1,
            "entry must be preserved on timeout"
        );
    }

    #[tokio::test]
    async fn wait_for_exit_enforces_spawner_scope() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, Some(5));

        assert_eq!(
            reg.wait_for_exit_as_caller(pid, None, Duration::from_millis(20))
                .await,
            ProcessWaitOutcome::Forbidden
        );
        assert_eq!(
            reg.wait_for_exit_as_caller(pid, Some(99), Duration::from_millis(20))
                .await,
            ProcessWaitOutcome::Forbidden
        );
    }

    #[tokio::test]
    async fn wait_for_exit_returns_not_found_for_unknown_pid() {
        let reg = BgRegistry::new();
        assert_eq!(
            reg.wait_for_exit_as_caller(987654, None, Duration::from_millis(10))
                .await,
            ProcessWaitOutcome::NotFound
        );
    }

    #[tokio::test]
    async fn kill_for_spawner_kills_only_matching_running_children() {
        let reg = BgRegistry::new();
        let (p_top, c_top) = spawn_sleep_child();
        let (p_a, c_a) = spawn_sleep_child();
        let (p_b, c_b) = spawn_sleep_child();
        reg.insert(p_top, "top".into(), c_top, None);
        reg.insert(p_a, "a".into(), c_a, Some(7));
        reg.insert(p_b, "b".into(), c_b, Some(9));

        let count = reg.kill_for_spawner(7);
        assert_eq!(count, 1);

        let by_pid: HashMap<u32, BgProcessStatus> = reg
            .snapshot()
            .into_iter()
            .map(|s| (s.pid, s.status))
            .collect();
        assert_eq!(by_pid[&p_top], BgProcessStatus::Running);
        assert_eq!(by_pid[&p_a], BgProcessStatus::Killed);
        assert_eq!(by_pid[&p_b], BgProcessStatus::Running);

        // Already-killed entries are not counted again; unknown spawners
        // match nothing.
        assert_eq!(reg.kill_for_spawner(7), 0);
        assert_eq!(reg.kill_for_spawner(99), 0);
    }
}