Skip to main content

koda_core/tools/
bg_process.rs

//! Background process registry.
//!
//! Tracks processes spawned by `Bash { background: true }` so they can be
//! listed, waited on, killed, and cleaned up when the session ends or the
//! spawning sub-agent exits (Model E, see #996). Termination goes through
//! tokio's `Child::start_kill`, which sends SIGKILL (not SIGTERM) on Unix.
//!
//! ## Usage
//!
//! ```text
//! Model calls: Bash { command: "npm run dev", background: true }
//!   → Process spawned, PID + spawner recorded in BgRegistry
//!   → Tool returns immediately: "Started PID 12345"
//!   → Model continues with other work
//!   → On session end: every still-running tracked PID is killed
//!   → On spawning sub-agent exit: kill_for_spawner kills that
//!     sub-agent's processes (Model E cleanup-on-exit)
//! ```
//!
//! ## Status lifecycle
//!
//! Each tracked process has a [`BgProcessStatus`] that transitions
//! `Running` → `Exited { code }` (natural exit) or `Running` → `Killed`
//! (we requested termination). [`BgRegistry::reap`] is the sole writer of
//! the `Exited` transition; it calls `try_wait` on every still-running child
//! and updates status without blocking. The TUI / LLM tool layers call
//! `reap()` before producing a snapshot so the displayed status is fresh.
//!
//! Entries persist past terminal status so the LLM can observe the exit
//! code via `ListBackgroundTasks` / `WaitTask`. Nothing in this module
//! removes entries yet (sessions are short; auto-purge can come later).
//!
//! ## Design
//!
//! Each `ToolRegistry` owns one `BgRegistry`. All background processes for
//! the session are keyed by PID. The registry is `Mutex`-protected since
//! the spawning thread, the reaper, and the cleanup path all touch it.
37
38use crate::bg_agent::CancelOutcome;
39use std::collections::HashMap;
40use std::sync::Mutex;
41use std::time::{Duration, Instant};
42use tokio::process::Child;
43
/// Outcome of [`BgRegistry::wait_for_exit_as_caller`].
///
/// Mirrors [`crate::bg_agent::WaitOutcome`] but carries process-specific
/// terminal info (exit code) instead of an agent result. The two enums
/// stay separate because they really are different things — forcing one
/// to wear the other's shape would mean optionalizing fields that aren't
/// optional in their natural domain.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessWaitOutcome {
    /// The process is no longer running: it exited naturally, or it was
    /// killed earlier through the registry. `code` is the OS exit code if
    /// one was reported; an entry still marked `Killed` (exit not yet
    /// observed) is reported with `code: None`.
    Exited {
        /// Same semantics as [`BgProcessStatus::Exited::code`].
        code: Option<i32>,
    },
    /// Wait deadline elapsed while the process was still running. The
    /// carried snapshot reflects the entry's state at the moment the
    /// timeout fired (e.g. `age` has advanced).
    TimedOut(BgProcessSnapshot),
    /// PID is not in the registry (never tracked, or already removed).
    NotFound,
    /// PID exists but the caller's `spawner` does not match the entry's
    /// (Model E permission rule: strict equality, `None == None`).
    Forbidden,
}
69
/// Lifecycle of a single tracked background process.
///
/// `Copy`, so snapshots copy it straight out of a borrowed registry entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BgProcessStatus {
    /// Child process is still alive as far as we know: the last
    /// `try_wait` returned `Ok(None)`, or it has not been polled yet.
    Running,
    /// Child has exited. `code` is the OS exit code if the platform
    /// reported one: a normal exit on POSIX always does, while a process
    /// terminated by a signal reports `None`.
    Exited {
        /// Process exit code as reported by the OS, or `None` when the
        /// process was terminated by a signal or when `try_wait` failed
        /// and the entry was marked terminal anyway.
        code: Option<i32>,
    },
    /// We requested termination via [`BgRegistry::kill`],
    /// [`BgRegistry::kill_as_caller`], or [`BgRegistry::kill_for_spawner`],
    /// all of which call tokio's `Child::start_kill` (SIGKILL on Unix, not
    /// SIGTERM). The kill is asynchronous, so the child may still be alive
    /// briefly; the reaper transitions it to `Exited` once it's actually
    /// gone.
    Killed,
}
88
/// Snapshot of a tracked background process — what `/agents` (combined
/// view, see #996 Layer 2) and the `ListBackgroundTasks` LLM tool render.
///
/// Cloned out of the registry under the lock so callers can format /
/// display without holding it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BgProcessSnapshot {
    /// OS process id. Stable for the process's lifetime; the kernel may
    /// reuse it after the process exits (so don't compare snapshots across
    /// long pauses).
    pub pid: u32,
    /// The original shell command string, surfaced verbatim by the TUI;
    /// truncation is the renderer's job.
    pub command: String,
    /// Elapsed time since the entry was inserted, measured with the
    /// monotonic `Instant` clock. Computed at snapshot time, so successive
    /// snapshots of the same process report different ages.
    pub age: Duration,
    /// Latest known status (written by [`BgRegistry::reap`] and the kill
    /// paths). Only as fresh as the most recent `reap()` call.
    pub status: BgProcessStatus,
    /// Sub-agent invocation id of the spawner, or `None` for the
    /// top-level inference loop. Drives [`BgRegistry::kill_for_spawner`]
    /// (Model E cleanup-on-exit) and the LLM scope-filter.
    pub spawner: Option<u32>,
}
114
/// Per-process bookkeeping stored inside [`BgRegistry`], keyed by PID.
struct BgEntry {
    /// The original shell command string.
    command: String,
    /// The spawned child process handle (used for `start_kill` on the
    /// kill paths and `try_wait` in [`BgRegistry::reap`]).
    child: Child,
    /// When the entry was inserted. Drives `age` in snapshots.
    started_at: Instant,
    /// Current status. Only [`BgRegistry::reap`] and the kill paths
    /// transition this away from `Running`.
    status: BgProcessStatus,
    /// Sub-agent that spawned this process. `None` = top-level.
    spawner: Option<u32>,
}
130
131/// Registry of running background processes, scoped to one session.
132///
133/// Drop kills all remaining processes (SIGTERM).
134pub struct BgRegistry {
135    inner: Mutex<HashMap<u32, BgEntry>>,
136}
137
138impl BgRegistry {
139    /// Create an empty registry.
140    pub fn new() -> Self {
141        Self {
142            inner: Mutex::new(HashMap::new()),
143        }
144    }
145
146    /// Register a spawned child. `spawner` is the sub-agent invocation
147    /// id (`None` for top-level). Returns the PID.
148    pub fn insert(&self, pid: u32, command: String, child: Child, spawner: Option<u32>) -> u32 {
149        self.inner.lock().unwrap().insert(
150            pid,
151            BgEntry {
152                command,
153                child,
154                started_at: Instant::now(),
155                status: BgProcessStatus::Running,
156                spawner,
157            },
158        );
159        pid
160    }
161
162    /// Return a snapshot of running PIDs + commands for display.
163    ///
164    /// **Legacy path** kept for the TUI's existing `/agents` rendering
165    /// (#1042). New code should prefer [`Self::snapshot`] which carries
166    /// status, age, and spawner.
167    pub fn list(&self) -> Vec<(u32, String)> {
168        self.inner
169            .lock()
170            .unwrap()
171            .iter()
172            .map(|(pid, e)| (*pid, e.command.clone()))
173            .collect()
174    }
175
176    /// Snapshot every tracked process for `/agents` and the
177    /// `ListBackgroundTasks` LLM tool. Sorted by ascending PID.
178    ///
179    /// **Unscoped**: returns every entry regardless of spawner. Used by
180    /// the TUI (humans get the global view) and as the engine of
181    /// [`Self::snapshot_for_caller`].
182    pub fn snapshot(&self) -> Vec<BgProcessSnapshot> {
183        let guard = self.inner.lock().unwrap();
184        let now = Instant::now();
185        let mut out: Vec<_> = guard
186            .iter()
187            .map(|(pid, e)| BgProcessSnapshot {
188                pid: *pid,
189                command: e.command.clone(),
190                age: now.saturating_duration_since(e.started_at),
191                status: e.status,
192                spawner: e.spawner,
193            })
194            .collect();
195        out.sort_by_key(|s| s.pid);
196        out
197    }
198
199    /// Scoped snapshot for the `ListBackgroundTasks` LLM tool. Same
200    /// Model E rule as [`crate::bg_agent::BgAgentRegistry::snapshot_for_caller`]:
201    /// strict spawner equality, `None == None`.
202    pub fn snapshot_for_caller(&self, caller_spawner: Option<u32>) -> Vec<BgProcessSnapshot> {
203        self.snapshot()
204            .into_iter()
205            .filter(|s| s.spawner == caller_spawner)
206            .collect()
207    }
208
209    /// How many processes are currently tracked (any status).
210    pub fn len(&self) -> usize {
211        self.inner.lock().unwrap().len()
212    }
213
214    /// Returns `true` if no background processes are tracked.
215    pub fn is_empty(&self) -> bool {
216        self.inner.lock().unwrap().is_empty()
217    }
218
219    /// Non-blocking poll on every still-`Running` child. Transitions
220    /// any that have exited to `Exited { code }` so subsequent
221    /// snapshots see the fresh status.
222    ///
223    /// Cheap: zero syscalls if the registry is empty; one `waitpid`
224    /// per running child otherwise. Safe to call before every
225    /// `snapshot()` (the TUI does so via the slash command path).
226    pub fn reap(&self) {
227        let mut guard = self.inner.lock().unwrap();
228        for entry in guard.values_mut() {
229            if entry.status != BgProcessStatus::Running {
230                continue;
231            }
232            match entry.child.try_wait() {
233                Ok(Some(exit)) => {
234                    entry.status = BgProcessStatus::Exited { code: exit.code() };
235                }
236                Ok(None) => { /* still running */ }
237                Err(e) => {
238                    // try_wait can fail if the OS lost track (rare).
239                    // Log + treat as terminal so we don't spin.
240                    tracing::warn!(
241                        "BgRegistry reap try_wait failed for PID {}: {e}",
242                        entry.child.id().unwrap_or(0)
243                    );
244                    entry.status = BgProcessStatus::Exited { code: None };
245                }
246            }
247        }
248    }
249
250    /// Send SIGTERM to a tracked PID. Returns `true` if the PID was
251    /// known (whether or not the kill succeeded — the underlying error
252    /// is logged).
253    ///
254    /// Status flips to `Killed` immediately; the reaper will surface
255    /// the eventual `Exited { code }` once the child is reaped by the
256    /// kernel. **Unscoped** — TUI `/cancel` contract; LLM goes through
257    /// [`Self::kill_as_caller`].
258    pub fn kill(&self, pid: u32) -> bool {
259        let mut guard = self.inner.lock().unwrap();
260        let Some(entry) = guard.get_mut(&pid) else {
261            return false;
262        };
263        if entry.status == BgProcessStatus::Running {
264            if let Err(e) = entry.child.start_kill() {
265                tracing::warn!("BgRegistry::kill: failed to SIGTERM PID {pid}: {e}");
266            }
267            entry.status = BgProcessStatus::Killed;
268        }
269        true
270    }
271
272    /// Scoped kill for the `CancelTask` LLM tool. Same Model E rule
273    /// as [`crate::bg_agent::BgAgentRegistry::cancel_as_caller`].
274    pub fn kill_as_caller(&self, pid: u32, caller_spawner: Option<u32>) -> CancelOutcome {
275        let mut guard = self.inner.lock().unwrap();
276        let Some(entry) = guard.get_mut(&pid) else {
277            return CancelOutcome::NotFound;
278        };
279        if entry.spawner != caller_spawner {
280            return CancelOutcome::Forbidden;
281        }
282        if entry.status == BgProcessStatus::Running {
283            if let Err(e) = entry.child.start_kill() {
284                tracing::warn!("BgRegistry::kill_as_caller: SIGTERM PID {pid}: {e}");
285            }
286            entry.status = BgProcessStatus::Killed;
287        }
288        CancelOutcome::Cancelled
289    }
290
291    /// Block until a tracked process exits, with a timeout. Same
292    /// Model E permission rule as [`Self::kill_as_caller`].
293    ///
294    /// Implementation: poll-based. Calls [`Self::reap`] every
295    /// `POLL_INTERVAL`; cheap because reap is a `try_wait` per
296    /// running child. Once status leaves `Running`, returns
297    /// [`ProcessWaitOutcome::Exited`] (mapping `Killed` → `Exited`
298    /// with `code: None` if the OS hasn't reported the exit yet —
299    /// the model only cares that the process is gone).
300    ///
301    /// On timeout, leaves the entry in the registry so it can still
302    /// be queried via [`Self::snapshot`] / re-waited.
303    pub async fn wait_for_exit_as_caller(
304        &self,
305        pid: u32,
306        caller_spawner: Option<u32>,
307        timeout: Duration,
308    ) -> ProcessWaitOutcome {
309        const POLL_INTERVAL: Duration = Duration::from_millis(100);
310
311        // Sanity-check: known + owned. Done up-front so we can fail
312        // fast on the common error cases without spinning.
313        {
314            let guard = self.inner.lock().unwrap();
315            match guard.get(&pid) {
316                None => return ProcessWaitOutcome::NotFound,
317                Some(e) if e.spawner != caller_spawner => return ProcessWaitOutcome::Forbidden,
318                Some(_) => {}
319            }
320        }
321
322        let deadline = Instant::now() + timeout;
323        loop {
324            self.reap();
325            {
326                let guard = self.inner.lock().unwrap();
327                let Some(entry) = guard.get(&pid) else {
328                    return ProcessWaitOutcome::NotFound;
329                };
330                match entry.status {
331                    BgProcessStatus::Running => {}
332                    BgProcessStatus::Exited { code } => {
333                        return ProcessWaitOutcome::Exited { code };
334                    }
335                    BgProcessStatus::Killed => {
336                        return ProcessWaitOutcome::Exited { code: None };
337                    }
338                }
339            }
340
341            if Instant::now() >= deadline {
342                let guard = self.inner.lock().unwrap();
343                let Some(entry) = guard.get(&pid) else {
344                    return ProcessWaitOutcome::NotFound;
345                };
346                let now = Instant::now();
347                return ProcessWaitOutcome::TimedOut(BgProcessSnapshot {
348                    pid,
349                    command: entry.command.clone(),
350                    age: now.saturating_duration_since(entry.started_at),
351                    status: entry.status,
352                    spawner: entry.spawner,
353                });
354            }
355
356            let remaining = deadline.saturating_duration_since(Instant::now());
357            tokio::time::sleep(POLL_INTERVAL.min(remaining)).await;
358        }
359    }
360
361    /// SIGTERM every still-running child whose `spawner` matches.
362    /// Cleanup-on-exit hook for sub-agent exit (Model E). Returns
363    /// the count of processes signalled. Idempotent.
364    pub fn kill_for_spawner(&self, spawner: u32) -> usize {
365        let mut guard = self.inner.lock().unwrap();
366        let mut count = 0;
367        for entry in guard.values_mut() {
368            if entry.spawner != Some(spawner) {
369                continue;
370            }
371            if entry.status == BgProcessStatus::Running {
372                if let Err(e) = entry.child.start_kill() {
373                    tracing::warn!(
374                        "BgRegistry::kill_for_spawner: SIGTERM PID {}: {e}",
375                        entry.child.id().unwrap_or(0)
376                    );
377                }
378                entry.status = BgProcessStatus::Killed;
379                count += 1;
380            }
381        }
382        count
383    }
384}
385
386impl Default for BgRegistry {
387    fn default() -> Self {
388        Self::new()
389    }
390}
391
392impl Drop for BgRegistry {
393    /// Best-effort SIGTERM all still-running tracked processes when
394    /// the session ends.
395    fn drop(&mut self) {
396        let mut guard = self.inner.lock().unwrap();
397        for (pid, entry) in guard.iter_mut() {
398            if entry.status != BgProcessStatus::Running {
399                continue;
400            }
401            if let Err(e) = entry.child.start_kill() {
402                tracing::warn!("BgRegistry drop: failed to kill PID {pid}: {e}");
403            } else {
404                tracing::debug!("BgRegistry drop: sent SIGTERM to PID {pid}");
405            }
406        }
407    }
408}
409
#[cfg(test)]
mod tests {
    use super::*;

    /// Spawn a long-lived `sleep 60` child and return `(pid, child)`.
    fn spawn_sleep_child() -> (u32, Child) {
        // 60s sleep gives every test plenty of headroom; we either kill
        // it explicitly or let the registry's Drop kill it.
        let child = tokio::process::Command::new("sleep")
            .arg("60")
            .spawn()
            .expect("spawn sleep");
        let pid = child.id().expect("pid");
        (pid, child)
    }

    /// Spawn a `true` child, which exits with code 0 almost immediately.
    fn spawn_true_child() -> (u32, Child) {
        let child = tokio::process::Command::new("true").spawn().expect("spawn");
        // Fallback PID keeps the helper infallible if `id()` is `None`.
        // NOTE(review): presumably only possible once tokio has observed
        // the exit — confirm.
        let pid = child.id().unwrap_or(99999);
        (pid, child)
    }

    #[test]
    fn registry_starts_empty() {
        let reg = BgRegistry::new();
        assert_eq!(reg.len(), 0);
        assert!(reg.list().is_empty());
        assert!(reg.snapshot().is_empty());
    }

    #[tokio::test]
    async fn insert_records_spawner_and_appears_in_snapshot() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, Some(7));

        let snap = reg.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].pid, pid);
        assert_eq!(snap[0].command, "sleep 60");
        assert_eq!(snap[0].status, BgProcessStatus::Running);
        assert_eq!(snap[0].spawner, Some(7));
    }

    #[tokio::test]
    async fn snapshot_for_caller_filters_by_spawner() {
        let reg = BgRegistry::new();
        let (p1, c1) = spawn_sleep_child();
        let (p2, c2) = spawn_sleep_child();
        let (p3, c3) = spawn_sleep_child();
        reg.insert(p1, "a".into(), c1, None);
        reg.insert(p2, "b".into(), c2, Some(7));
        reg.insert(p3, "c".into(), c3, Some(9));

        // Top-level caller (`None`) sees only the `None`-spawned entry.
        let top = reg.snapshot_for_caller(None);
        assert_eq!(top.len(), 1);
        assert_eq!(top[0].pid, p1);

        let sub_7 = reg.snapshot_for_caller(Some(7));
        assert_eq!(sub_7.len(), 1);
        assert_eq!(sub_7[0].pid, p2);

        // A spawner with no processes of its own sees nothing — in
        // particular, none of its peers' entries.
        assert!(reg.snapshot_for_caller(Some(42)).is_empty());
    }

    #[tokio::test]
    async fn reap_transitions_finished_children_to_exited() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_true_child();
        reg.insert(pid, "true".into(), child, None);

        // `true` exits almost immediately, but not synchronously with
        // spawn, so poll `reap()` for up to 1s (50 × 20ms).
        let mut observed = None;
        for _ in 0..50 {
            tokio::time::sleep(Duration::from_millis(20)).await;
            reg.reap();
            let snap = reg.snapshot();
            if let BgProcessStatus::Exited { code } = snap[0].status {
                observed = Some(code);
                break;
            }
        }
        assert_eq!(
            observed,
            Some(Some(0)),
            "reap should observe `true` exiting with code 0 within 1s"
        );
    }

    #[tokio::test]
    async fn kill_transitions_to_killed_and_returns_true() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, None);

        // No `reap()` in between, so the status is still the one `kill` set.
        assert!(reg.kill(pid));
        assert_eq!(reg.snapshot()[0].status, BgProcessStatus::Killed);

        // Unknown PID → false.
        assert!(!reg.kill(987654));
    }

    #[tokio::test]
    async fn kill_as_caller_enforces_spawner_scope() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, Some(5));

        // Wrong caller(s): rejected, and the process is left running.
        assert_eq!(reg.kill_as_caller(pid, None), CancelOutcome::Forbidden);
        assert_eq!(reg.kill_as_caller(pid, Some(99)), CancelOutcome::Forbidden);
        assert_eq!(reg.snapshot()[0].status, BgProcessStatus::Running);

        // Correct caller.
        assert_eq!(reg.kill_as_caller(pid, Some(5)), CancelOutcome::Cancelled);
        assert_eq!(reg.snapshot()[0].status, BgProcessStatus::Killed);

        // Unknown PID for any caller → NotFound.
        assert_eq!(reg.kill_as_caller(987654, None), CancelOutcome::NotFound);
    }

    #[tokio::test]
    async fn wait_for_exit_returns_exited_when_child_finishes() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_true_child();
        reg.insert(pid, "true".into(), child, None);

        let outcome = reg
            .wait_for_exit_as_caller(pid, None, Duration::from_secs(2))
            .await;
        assert_eq!(outcome, ProcessWaitOutcome::Exited { code: Some(0) });
    }

    #[tokio::test]
    async fn wait_for_exit_returns_exited_when_already_killed() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, Some(7));
        reg.kill(pid); // status → Killed

        // Expect `code: None`: a still-`Killed` entry maps to it, and a
        // signal-terminated process reports no exit code either.
        let outcome = reg
            .wait_for_exit_as_caller(pid, Some(7), Duration::from_secs(1))
            .await;
        assert_eq!(outcome, ProcessWaitOutcome::Exited { code: None });
    }

    #[tokio::test]
    async fn wait_for_exit_returns_timed_out_with_snapshot() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, None);

        let outcome = reg
            .wait_for_exit_as_caller(pid, None, Duration::from_millis(150))
            .await;
        match outcome {
            ProcessWaitOutcome::TimedOut(snap) => {
                assert_eq!(snap.pid, pid);
                assert_eq!(snap.status, BgProcessStatus::Running);
                assert_eq!(snap.spawner, None);
            }
            other => panic!("expected TimedOut, got {other:?}"),
        }
        assert_eq!(
            reg.snapshot().len(),
            1,
            "entry must be preserved on timeout"
        );
    }

    #[tokio::test]
    async fn wait_for_exit_enforces_spawner_scope() {
        let reg = BgRegistry::new();
        let (pid, child) = spawn_sleep_child();
        reg.insert(pid, "sleep 60".into(), child, Some(5));

        // Both a top-level caller and an unrelated sub-agent are rejected.
        assert_eq!(
            reg.wait_for_exit_as_caller(pid, None, Duration::from_millis(20))
                .await,
            ProcessWaitOutcome::Forbidden
        );
        assert_eq!(
            reg.wait_for_exit_as_caller(pid, Some(99), Duration::from_millis(20))
                .await,
            ProcessWaitOutcome::Forbidden
        );
    }

    #[tokio::test]
    async fn wait_for_exit_returns_not_found_for_unknown_pid() {
        let reg = BgRegistry::new();
        assert_eq!(
            reg.wait_for_exit_as_caller(987654, None, Duration::from_millis(10))
                .await,
            ProcessWaitOutcome::NotFound
        );
    }

    #[tokio::test]
    async fn kill_for_spawner_kills_only_matching_running_children() {
        let reg = BgRegistry::new();
        let (p_top, c_top) = spawn_sleep_child();
        let (p_a, c_a) = spawn_sleep_child();
        let (p_b, c_b) = spawn_sleep_child();
        reg.insert(p_top, "top".into(), c_top, None);
        reg.insert(p_a, "a".into(), c_a, Some(7));
        reg.insert(p_b, "b".into(), c_b, Some(9));

        let count = reg.kill_for_spawner(7);
        assert_eq!(count, 1);

        let by_pid: HashMap<u32, BgProcessStatus> = reg
            .snapshot()
            .into_iter()
            .map(|s| (s.pid, s.status))
            .collect();
        assert_eq!(by_pid[&p_top], BgProcessStatus::Running);
        assert_eq!(by_pid[&p_a], BgProcessStatus::Killed);
        assert_eq!(by_pid[&p_b], BgProcessStatus::Running);

        // Idempotent — second call won't re-kill the now-Killed child.
        assert_eq!(reg.kill_for_spawner(7), 0);
        // Unknown spawner → 0.
        assert_eq!(reg.kill_for_spawner(99), 0);
    }
}