Skip to main content

koda_core/
bg_agent.rs

1//! Background sub-agent registry.
2//!
3//! Tracks sub-agents spawned with `background: true` in `InvokeAgent`.
4//! The inference loop drains completed results and injects them as
5//! user-role messages so the model sees them on the next iteration.
6//!
7//! ## Lifecycle
8//!
9//! 1. **Spawn**: `InvokeAgent { background: true }` creates a tokio task
10//! 2. **Track**: the task handle + metadata are stored in `BgAgentRegistry`
11//! 3. **Poll**: before each inference call, the loop calls `drain_completed()`
12//! 4. **Inject**: completed results are appended as user messages
13//! 5. **Cleanup**: on registry drop, all pending task handles are aborted —
14//!    no orphan futures, no leaked worktrees. (Phase 1 of #1022, B3.)
15//!
16//! ## Cancellation cascade
17//!
18//! Bg-agent tasks receive a `CancellationToken` derived from the parent's
19//! token via `child_token()` (wired in `crate::sub_agent_dispatch`). When
20//! the parent is cancelled, every bg child sees it; when the registry
21//! drops without cancellation, [`tokio_util::task::AbortOnDropHandle`]
22//! still aborts the futures so we never leak. Both paths are covered.
23//! (Phase 1 of #1022, B2+B3.)
24//!
25//! ## Thread safety
26//!
//! The registry is shared as an `Arc<BgAgentRegistry>` between the main
//! inference loop and the background task spawner. Its mutable state lives
//! behind internal `parking_lot::Mutex` fields, so every method takes `&self`.
29
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33// **#1022 B16**: was `std::sync::Mutex`. Switched to `parking_lot::Mutex`
34// for three reasons:
35//   1. **No poisoning** — if a thread panics while holding the lock,
36//      subsequent calls don't get a `PoisonError`. The bg registry
37//      is shared between the main inference loop and N spawned tasks;
38//      a panic in one critical section bricking every subsequent
39//      drain would be a particularly bad failure mode.
40//   2. **Faster on contention** — no atomic check for poison flag,
41//      no `Result` allocation. The contention is real: `drain_completed`
42//      runs on every loop iteration.
43//   3. **Cleaner API** — `.lock()` returns a guard directly, no
44//      `.unwrap()` boilerplate at every call site.
45// We deliberately keep this *sync* (not `tokio::sync::Mutex`) because
46// the critical sections are short HashMap ops with no awaits inside.
47use parking_lot::Mutex;
48use serde::{Deserialize, Serialize};
49use tokio::sync::{oneshot, watch};
50use tokio_util::sync::CancellationToken;
51use tokio_util::task::AbortOnDropHandle;
52
53use crate::engine::EngineEvent;
54
55// ── Layer 0 of #996 ──────────────────────────────────────────────────────
56//
57// Status enum + watch-channel plumbing + per-task cancel + snapshot API.
58// Pure infrastructure: no slash commands, no LLM tools, no UI changes.
59// Layers 1+ (slash commands, tools, status-bar pill) consume this surface.
60//
61// Modeled on Codex's `tokio::sync::watch::Receiver<AgentStatus>` pattern
62// (codex-rs/core/src/session/mod.rs). The bg-agent task drives the
63// `watch::Sender`; the registry stores the matching `Receiver` and exposes
64// snapshots to whoever asks (slash command, LLM tool, status-bar pill).
65
66/// Lifecycle of a single background sub-agent task.
67///
68/// The bg-agent future drives transitions through `watch::Sender<AgentStatus>`.
69/// Initial value is [`AgentStatus::Pending`]; the future flips to `Running`
70/// when execution actually starts and to one of the terminal variants
71/// (`Completed`, `Errored`, `Cancelled`) when it finishes.
72///
73/// `Running.iter` reflects the current inference iteration (1..=20).
74/// Background agents emit live updates via Layer 4 (#1058); `0` is
75/// the entry-point placeholder before the first iteration fires.
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(tag = "kind", rename_all = "snake_case")]
78pub enum AgentStatus {
79    /// Reserved but the spawned future hasn't started yet.
80    Pending,
81    /// Actively executing. `iter` is the current inference iteration
82    /// (1..=20); `0` means "started, no iter info yet" (Layer 0 default).
83    Running {
84        /// Current inference iteration (1..=20). `0` is the
85        /// entry-point placeholder emitted before the first iteration
86        /// in `run_bg_agent`; background agents update this live
87        /// (Layer 4, #1058).
88        iter: u8,
89    },
90    /// User or parent fired the cancel token. Terminal.
91    Cancelled,
92    /// Sub-agent returned a final answer. Terminal.
93    Completed {
94        /// The agent's final output. Truncation for display is the
95        /// renderer's job (see Codex's `COLLAB_AGENT_RESPONSE_PREVIEW_GRAPHEMES`).
96        summary: String,
97    },
98    /// Sub-agent returned an error. Terminal.
99    Errored {
100        /// Error message as produced by `execute_sub_agent`. Same
101        /// truncation note as `Completed.summary`.
102        error: String,
103    },
104}
105
106/// Snapshot of a pending bg-agent task — what `/agents` and the
107/// `ListBackgroundTasks` LLM tool will render.
108///
109/// Cloned out of the registry under the lock so callers can format/display
110/// without holding it. `age` is computed from `started_at` at snapshot time.
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct BgTaskSnapshot {
113    /// Monotonic id assigned at `reserve()` time. Stable for the
114    /// lifetime of the task; reused across snapshots.
115    pub task_id: u32,
116    /// Configured agent name (`explore`, `verify`, ...).
117    pub agent_name: String,
118    /// The prompt the parent delegated. Surfaced verbatim by
119    /// `/agents -v`; truncation is the renderer's job.
120    pub prompt: String,
121    /// Wall-clock duration since the task was attached. Computed at
122    /// snapshot time, so successive snapshots of the same task
123    /// report different ages.
124    pub age: Duration,
125    /// Latest value from the task's `watch::Receiver<AgentStatus>`.
126    pub status: AgentStatus,
127    /// Sub-agent task that spawned this bg-agent. `None` = top-level
128    /// (the user's main conversation).
129    ///
130    /// **#996 Layer 2 / Model D**: tracked so that when a sub-agent
131    /// exits, [`BgAgentRegistry::cancel_for_spawner`] can fire the
132    /// cancel token on every bg-agent it left behind. Mirrors
133    /// Claude Code's `agentId` field on `LocalShellTaskState`
134    /// (`prevents 10-day fake-logs.sh zombies`).
135    ///
136    /// We don't use this for permission scoping — any caller with a
137    /// `task_id` can manage any task. Spawner is *only* a cleanup
138    /// hook (matching Claude Code's flat-permissions design).
139    pub spawner: Option<u32>,
140}
141
/// What a bg agent delivers through its result oneshot: the text
/// (final output or error message) paired with the collected trace
/// lines.
///
/// Before #1022 B9 the payload was a bare `String` holding only the
/// model's final output. The second element now carries the lines
/// gathered by [`crate::engine::sink::BufferingSink`], which lets the
/// inference loop show them to the user when it injects the result.
///
/// The channel transports `Result<BgPayload, BgPayload>`, keeping the
/// earlier success/failure split: `Ok` when `execute_sub_agent`
/// returned text, `Err` when it returned an error. The trace matters
/// on *both* sides, since an agent may complete several steps before
/// it fails.
pub type BgPayload = (String, Vec<String>);
155
/// Outcome of one finished background agent, as handed to the
/// inference loop.
#[derive(Debug)]
pub struct BgAgentResult {
    /// Name of the agent that produced this result.
    pub agent_name: String,
    /// The prompt originally delegated to that agent.
    pub prompt: String,
    /// Output text on success, or the error message on failure.
    pub output: String,
    /// `true` when the agent succeeded, `false` otherwise.
    pub success: bool,
    /// **#1022 B9**: narrative trace lines recorded by
    /// [`crate::engine::sink::BufferingSink`] while the bg agent ran.
    /// Before that fix bg agents ran with `NullSink`, so this was
    /// implicitly always empty. It now holds one line per significant
    /// event (tool start, info, auto-rejected approval), letting the
    /// user see what the agent did when the result is injected.
    /// Stays empty for the cancelled / panicked case, where `output`
    /// carries the failure detail instead.
    pub events: Vec<String>,
}
177
178/// Handle returned when a background agent is spawned.
179///
180/// Holds the task's [`tokio_util::task::AbortOnDropHandle`] so the
181/// future is aborted if the registry is dropped before the task
182/// completes (B3 of #1022). Also holds the per-task
183/// [`CancellationToken`] so future per-task cancel commands
184/// (`/cancel <id>` — see #996) have a hook to fire.
185struct BgAgentEntry {
186    agent_name: String,
187    prompt: String,
188    rx: oneshot::Receiver<Result<BgPayload, BgPayload>>,
189    /// Per-task cancel — derived as a `child_token()` of the parent
190    /// session's token at spawn time. Firing this token (via
191    /// [`BgAgentRegistry::cancel`] for #996, or via the registry-drop
192    /// path) causes the in-flight bg agent to observe `is_cancelled()`
193    /// on its next loop iteration.
194    cancel: CancellationToken,
195    /// Live status channel — the spawned future writes; the registry
196    /// reads at snapshot time. See [`AgentStatus`] for the lifecycle.
197    status_rx: watch::Receiver<AgentStatus>,
198    /// When the task was attached. Used to compute `age` in snapshots.
199    started_at: Instant,
200    /// Sub-agent task that spawned this bg-agent. `None` = top-level.
201    /// **#996 Layer 2 / Model D** — see [`BgTaskSnapshot::spawner`].
202    spawner: Option<u32>,
203    /// Aborts the spawned task on drop. The bg path uses
204    /// `tokio::spawn` on the multi-thread runtime (#1022 B5):
205    /// `execute_sub_agent` returns an explicitly `Send`-bounded
206    /// future, so abort works promptly at any await point. The
207    /// cancel-token cascade is still the primary stop signal
208    /// (so the bg task can run any cleanup it owns).
209    _handle: AbortOnDropHandle<()>,
210}
211
212/// Registry of running background sub-agents.
213///
214/// Shared via `Arc` between the inference loop (which drains results)
215/// and the tool dispatch (which spawns agents).
216pub struct BgAgentRegistry {
217    pending: Mutex<HashMap<u32, BgAgentEntry>>,
218    next_id: Mutex<u32>,
219    /// Queue of `EngineEvent::BgTaskUpdate` events produced by
220    /// [`BgStatusEmitter::send`]. Drained by the inference loop
221    /// alongside [`Self::drain_completed`] and forwarded to the
222    /// active `EngineSink`.
223    ///
224    /// **#1076**: closes the engine/UI boundary leak — prior to this
225    /// queue, bg-task status only reached the TUI by the TUI grabbing
226    /// `Arc<BgAgentRegistry>` directly out of `KodaSession` and
227    /// polling `snapshot()`. ACP / headless clients saw nothing.
228    /// Routing through `EngineEvent` puts every client surface on
229    /// the same channel.
230    ///
231    /// `VecDeque` not `Vec` because we drain FIFO (transition order
232    /// matters: `Pending` → `Running` → terminal must arrive in that
233    /// order even if the inference loop drains in batches).
234    events: Mutex<std::collections::VecDeque<EngineEvent>>,
235}
236
237/// Fan-out helper for bg-agent status transitions.
238///
239/// Bundles the per-task `watch::Sender<AgentStatus>` (read by
240/// `/agents` and the status-bar pill via the registry's snapshot
241/// API) with a back-reference to the registry's event queue (drained
242/// by the inference loop and forwarded to the engine sink — which is
243/// what closes the #1076 boundary leak).
244///
245/// `Clone` is intentional so Layer 4 (`#1058`, live `iter` heartbeat)
246/// can hold its own copy inside `execute_sub_agent` while
247/// `run_bg_agent` keeps another for the entry / terminal transitions.
248/// Both clones share the same `watch::Sender` and `Arc<registry>`,
249/// so every `.send()` reaches both fan-out targets.
250#[derive(Clone)]
251pub struct BgStatusEmitter {
252    task_id: u32,
253    spawner: Option<u32>,
254    status_tx: watch::Sender<AgentStatus>,
255    registry: Arc<BgAgentRegistry>,
256}
257
258impl BgStatusEmitter {
259    /// Construct from the parts handed back by [`BgAgentRegistry::reserve`].
260    ///
261    /// The registry `Arc` is held for the lifetime of the bg agent,
262    /// which is fine: the inference loop already keeps an `Arc` on
263    /// the same registry, and registry drop is what aborts every
264    /// in-flight bg task (B3 of #1022) — so an emitter outliving its
265    /// registry is impossible by construction.
266    pub fn new(
267        task_id: u32,
268        spawner: Option<u32>,
269        status_tx: watch::Sender<AgentStatus>,
270        registry: Arc<BgAgentRegistry>,
271    ) -> Self {
272        Self {
273            task_id,
274            spawner,
275            status_tx,
276            registry,
277        }
278    }
279
280    /// Drive a status transition.
281    ///
282    /// Fans out to:
283    /// 1. The per-task `watch::Sender` (so `snapshot()` / `/agents`
284    ///    see the new state on the next read — no behavior change).
285    /// 2. The registry's event queue, drained by the inference loop
286    ///    and forwarded to the active `EngineSink` (so the TUI / ACP
287    ///    / headless clients all see the same `BgTaskUpdate` event).
288    ///
289    /// `watch::Sender::send` only fails if every receiver was dropped,
290    /// which means the registry entry is gone — in that case the queue
291    /// push is harmless (it'll be drained and ignored by clients that
292    /// don't recognize the task id). We deliberately don't gate the
293    /// queue push on the watch send result so a racing reap doesn't
294    /// swallow the terminal `BgTaskUpdate`.
295    pub fn send(&self, status: AgentStatus) {
296        let _ = self.status_tx.send(status.clone());
297        self.registry.push_status_event(EngineEvent::BgTaskUpdate {
298            task_id: self.task_id,
299            spawner: self.spawner,
300            status,
301        });
302    }
303
304    /// Current status (read from the watch channel). Useful for
305    /// terminal-disambiguation logic (e.g. "was this a cancel or a
306    /// real error?") without taking the registry lock.
307    pub fn current(&self) -> AgentStatus {
308        self.status_tx.borrow().clone()
309    }
310
311    /// Test helper: peek the underlying watch sender. Production
312    /// code should always go through [`Self::send`].
313    #[cfg(test)]
314    pub fn status_sender(&self) -> watch::Sender<AgentStatus> {
315        self.status_tx.clone()
316    }
317}
318
319/// Reservation slot returned by [`BgAgentRegistry::reserve`].
320///
321/// The two-phase pattern (`reserve` → spawn → `attach`) lets the
322/// dispatcher hand the oneshot sender into the spawned future
323/// *before* the future exists, so the spawned closure can `move` it
324/// without referencing the registry. The `cancel` token is a
325/// `child_token()` of the parent's cancel — fires either when the
326/// parent fires (cascade) or when this slot is individually cancelled
327/// (future per-task `/cancel <id>` UX, #996).
328pub struct BgAgentReservation {
329    /// Monotonically-assigned task ID. Surfaces in user-facing
330    /// messages (`Background agent 'foo' started (task 7)`) and
331    /// keys the per-task `/cancel <id>` UX (#996).
332    pub task_id: u32,
333    /// Sender half of the result oneshot. Move into the spawned
334    /// future so it can deliver `Ok(output)` / `Err(message)`.
335    pub tx: oneshot::Sender<Result<BgPayload, BgPayload>>,
336    /// Receiver half. Move back into the registry via [`BgAgentRegistry::attach`]
337    /// so `drain_completed()` can poll it.
338    pub rx: oneshot::Receiver<Result<BgPayload, BgPayload>>,
339    /// Per-task cancel token. Cloned for the spawned future
340    /// (`bg_cancel`) and re-stored on the registry entry
341    /// (`entry_cancel`); both halves observe parent cancellation
342    /// because this is a `child_token()` of the parent.
343    pub cancel: CancellationToken,
344    /// Status sender — move into the spawned future. The future is
345    /// the sole writer; it transitions through
346    /// [`AgentStatus::Pending`] → `Running` → terminal.
347    pub status_tx: watch::Sender<AgentStatus>,
348    /// Status receiver — hand back to the registry via [`BgAgentRegistry::attach`]
349    /// so `snapshot()` and `/agents` can read the current state without
350    /// touching the spawn site.
351    pub status_rx: watch::Receiver<AgentStatus>,
352    /// Sub-agent task id of the spawner, or `None` for the top-level
353    /// loop. Carried verbatim to [`BgAgentRegistry::attach`] so the
354    /// entry knows who spawned it (Model D cleanup-on-exit).
355    pub spawner: Option<u32>,
356}
357
358impl BgAgentRegistry {
359    /// Create an empty registry.
360    pub fn new() -> Self {
361        Self {
362            pending: Mutex::new(HashMap::new()),
363            next_id: Mutex::new(1),
364            events: Mutex::new(std::collections::VecDeque::new()),
365        }
366    }
367
368    /// Push an event onto the status queue. Called by
369    /// [`BgStatusEmitter::send`]; not part of the public API.
370    pub(crate) fn push_status_event(&self, event: EngineEvent) {
371        self.events.lock().push_back(event);
372    }
373
374    /// Drain queued status events for forwarding to the active
375    /// `EngineSink`. Called by the inference loop alongside
376    /// [`Self::drain_completed`].
377    ///
378    /// Returns events in FIFO order (transition order); empty if
379    /// nothing changed since the last drain. Cheap: a single mutex
380    /// acquisition + `VecDeque::drain`. The vast majority of turns
381    /// will see 0–1 events.
382    pub fn drain_status_events(&self) -> Vec<EngineEvent> {
383        let mut q = self.events.lock();
384        q.drain(..).collect()
385    }
386
387    /// Reserve a task ID and produce a oneshot sender + child cancel
388    /// token for the spawn site to consume. Call [`Self::attach`] with
389    /// the resulting `JoinHandle` to complete registration.
390    ///
391    /// `spawner` records who is reserving this slot. `None` means
392    /// the top-level inference loop; `Some(invocation_id)` means a
393    /// sub-agent invocation. Used by [`Self::cancel_for_spawner`] to
394    /// reap children when a sub-agent exits (Model E cleanup-on-exit)
395    /// and by [`Self::snapshot_for_caller`] to scope LLM-tool views.
396    ///
397    /// The two-phase shape (`reserve` → spawn → `attach`) exists
398    /// because `tokio::spawn` produces the `JoinHandle` *after* the
399    /// future is built, but the future needs to own the `tx` to deliver
400    /// its result. Reservation gives us `tx` early; attach binds the
401    /// handle once it exists.
402    pub fn reserve(
403        &self,
404        parent_cancel: &CancellationToken,
405        spawner: Option<u32>,
406    ) -> BgAgentReservation {
407        let (tx, rx) = oneshot::channel();
408        let (status_tx, status_rx) = watch::channel(AgentStatus::Pending);
409        let mut id = self.next_id.lock();
410        let task_id = *id;
411        *id += 1;
412        BgAgentReservation {
413            task_id,
414            tx,
415            rx,
416            cancel: parent_cancel.child_token(),
417            status_tx,
418            status_rx,
419            spawner,
420        }
421    }
422
423    /// Bind a spawned task's metadata to a previously [`reserve`]d slot.
424    ///
425    /// `rx` must be the receiver paired with the `tx` handed out by
426    /// `reserve`. Holding `handle` as `AbortOnDropHandle` ensures the
427    /// task is aborted on registry drop (B3 of #1022). `status_rx`
428    /// is the read half of the watch channel whose write half
429    /// (`status_tx`) was moved into the spawned future.
430    ///
431    /// [`reserve`]: Self::reserve
432    //
433    // 9 args trips `clippy::too_many_arguments` (limit 7). Each one
434    // is load-bearing: id + name + prompt are display metadata;
435    // rx/cancel/status_rx are the three channels we own; spawner is
436    // the cleanup-routing key (Model E); handle is the
437    // AbortOnDropHandle. Bundling into a struct just to satisfy
438    // a heuristic would add a one-use type for no readability win
439    // — "practicality beats purity".
440    #[allow(clippy::too_many_arguments)]
441    pub fn attach(
442        &self,
443        reservation_id: u32,
444        agent_name: &str,
445        prompt: &str,
446        rx: oneshot::Receiver<Result<BgPayload, BgPayload>>,
447        cancel: CancellationToken,
448        status_rx: watch::Receiver<AgentStatus>,
449        spawner: Option<u32>,
450        handle: tokio::task::JoinHandle<()>,
451    ) {
452        self.pending.lock().insert(
453            reservation_id,
454            BgAgentEntry {
455                agent_name: agent_name.to_string(),
456                prompt: prompt.to_string(),
457                rx,
458                cancel,
459                status_rx,
460                started_at: Instant::now(),
461                spawner,
462                _handle: AbortOnDropHandle::new(handle),
463            },
464        );
465    }
466
467    /// Convenience for tests: register a synthetic entry without a
468    /// real spawned task. The provided `tx` can be used to fire the
469    /// result manually. The handle is a noop spawned task that
470    /// returns immediately, so `_handle` has something to abort.
471    #[cfg(test)]
472    pub fn register_test(
473        &self,
474        agent_name: &str,
475        prompt: &str,
476    ) -> (u32, oneshot::Sender<Result<BgPayload, BgPayload>>) {
477        let (id, tx, _status_tx, _cancel) =
478            self.register_test_with_status(agent_name, prompt, None);
479        (id, tx)
480    }
481
482    /// Test-only sibling of [`register_test`] that returns the status
483    /// sender so a test can manually drive transitions without
484    /// needing a real spawned `run_bg_agent`. The cancel token also
485    /// comes back so cancel-cascade tests can verify the channel.
486    ///
487    /// `spawner` is recorded on the entry so scope-filtering /
488    /// kill-on-exit tests can exercise both the top-level (`None`)
489    /// and sub-agent (`Some(id)`) paths.
490    #[cfg(test)]
491    pub fn register_test_with_status(
492        &self,
493        agent_name: &str,
494        prompt: &str,
495        spawner: Option<u32>,
496    ) -> (
497        u32,
498        oneshot::Sender<Result<BgPayload, BgPayload>>,
499        watch::Sender<AgentStatus>,
500        CancellationToken,
501    ) {
502        let (tx, rx) = oneshot::channel();
503        let (status_tx, status_rx) = watch::channel(AgentStatus::Pending);
504        let mut id = self.next_id.lock();
505        let task_id = *id;
506        *id += 1;
507        drop(id);
508        let cancel = CancellationToken::new();
509        let cancel_observer = cancel.clone();
510        let noop = tokio::spawn(async {});
511        self.pending.lock().insert(
512            task_id,
513            BgAgentEntry {
514                agent_name: agent_name.to_string(),
515                prompt: prompt.to_string(),
516                rx,
517                cancel,
518                status_rx,
519                started_at: Instant::now(),
520                spawner,
521                _handle: AbortOnDropHandle::new(noop),
522            },
523        );
524        (task_id, tx, status_tx, cancel_observer)
525    }
526
527    /// Drain all completed background agents. Non-blocking — only takes
528    /// entries whose oneshot has already resolved.
529    pub fn drain_completed(&self) -> Vec<BgAgentResult> {
530        let mut guard = self.pending.lock();
531        let mut completed = Vec::new();
532        let mut done_ids = Vec::new();
533
534        for (id, entry) in guard.iter_mut() {
535            match entry.rx.try_recv() {
536                Ok(Ok((output, events))) => {
537                    done_ids.push(*id);
538                    completed.push(BgAgentResult {
539                        agent_name: entry.agent_name.clone(),
540                        prompt: entry.prompt.clone(),
541                        output,
542                        success: true,
543                        events,
544                    });
545                }
546                Ok(Err((err, events))) => {
547                    done_ids.push(*id);
548                    completed.push(BgAgentResult {
549                        agent_name: entry.agent_name.clone(),
550                        prompt: entry.prompt.clone(),
551                        output: err,
552                        success: false,
553                        events,
554                    });
555                }
556                Err(oneshot::error::TryRecvError::Empty) => {
557                    // Still running
558                }
559                Err(oneshot::error::TryRecvError::Closed) => {
560                    // Sender dropped without sending — task panicked or was cancelled.
561                    // No events available (the buffering sink died with the task).
562                    done_ids.push(*id);
563                    completed.push(BgAgentResult {
564                        agent_name: entry.agent_name.clone(),
565                        prompt: entry.prompt.clone(),
566                        output: "[background agent task was cancelled]".to_string(),
567                        success: false,
568                        events: Vec::new(),
569                    });
570                }
571            }
572        }
573
574        for id in done_ids {
575            guard.remove(&id);
576        }
577
578        completed
579    }
580
581    /// How many background agents are still running.
582    pub fn pending_count(&self) -> usize {
583        self.pending.lock().len()
584    }
585
586    // ── Layer 0 of #996: per-task cancel + snapshot ───────────────────────────────
587
588    /// Fire the cancel token for a single task.
589    ///
590    /// Returns `true` if a pending task with that id existed and was
591    /// signalled, `false` if the id is unknown (already drained,
592    /// completed, or never registered). Idempotent: calling twice on
593    /// the same id is safe — [`CancellationToken::cancel`] is itself
594    /// idempotent.
595    ///
596    /// The entry stays in `pending` until the spawned future actually
597    /// observes the token and finishes. `drain_completed()` then
598    /// reaps it via the closed-sender path (or the future's terminal
599    /// `tx.send` if it noticed and shut down cleanly).
600    pub fn cancel(&self, task_id: u32) -> bool {
601        let guard = self.pending.lock();
602        match guard.get(&task_id) {
603            Some(entry) => {
604                entry.cancel.cancel();
605                true
606            }
607            None => false,
608        }
609    }
610
611    /// Snapshot every pending task's metadata for `/agents` and the
612    /// `ListBackgroundTasks` LLM tool.
613    ///
614    /// `age` is computed against `Instant::now()` at call time, so two
615    /// snapshots of the same task report different ages. Status is read
616    /// from each entry's `watch::Receiver` (no blocking, no waiting).
617    /// Sorted by ascending `task_id` so the output is stable across calls.
618    ///
619    /// **Unscoped**: returns every task regardless of spawner. Used by
620    /// the TUI `/agents` command (humans want the global view) and as
621    /// the engine of [`Self::snapshot_for_caller`] (which filters).
622    pub fn snapshot(&self) -> Vec<BgTaskSnapshot> {
623        let guard = self.pending.lock();
624        let now = Instant::now();
625        let mut out: Vec<_> = guard
626            .iter()
627            .map(|(id, entry)| BgTaskSnapshot {
628                task_id: *id,
629                agent_name: entry.agent_name.clone(),
630                prompt: entry.prompt.clone(),
631                age: now.saturating_duration_since(entry.started_at),
632                status: entry.status_rx.borrow().clone(),
633                spawner: entry.spawner,
634            })
635            .collect();
636        out.sort_by_key(|s| s.task_id);
637        out
638    }
639
640    /// Scoped snapshot for the `ListBackgroundTasks` LLM tool.
641    ///
642    /// **Model E scoping**: a caller only sees tasks whose `spawner`
643    /// matches its own `caller_spawner`. Top-level callers pass `None`
644    /// and see only top-level-spawned tasks; sub-agent callers pass
645    /// `Some(invocation_id)` and see only their own.
646    ///
647    /// Strict equality — a sub-agent does NOT see sibling sub-agents'
648    /// tasks, and the top-level does NOT see sub-agents' tasks via the
649    /// LLM (the TUI's `/agents` command remains the global view).
650    pub fn snapshot_for_caller(&self, caller_spawner: Option<u32>) -> Vec<BgTaskSnapshot> {
651        self.snapshot()
652            .into_iter()
653            .filter(|s| s.spawner == caller_spawner)
654            .collect()
655    }
656
657    /// Clone the status receiver for a task so callers can await
658    /// [`watch::Receiver::changed`] without holding the lock.
659    ///
660    /// Returns `None` if the task has already been drained from the registry.
661    /// Primarily used by tests to observe live iteration-counter updates
662    /// without polling `snapshot()` in a tight loop.
663    pub fn subscribe(&self, task_id: u32) -> Option<watch::Receiver<AgentStatus>> {
664        let guard = self.pending.lock();
665        guard.get(&task_id).map(|e| e.status_rx.clone())
666    }
667
668    // ── Layer 2 of #996 ───────────────────────────────────────────────
669    //
670    // Scoped cancel + cleanup-on-exit + WaitTask machinery.
671    // The unscoped [`Self::cancel`] above stays for the TUI (humans get
672    // the global view); LLM tools route through [`Self::cancel_as_caller`]
673    // so a sub-agent can't reach across into a sibling's task.
674
675    /// Scoped per-task cancel for the `CancelTask` LLM tool.
676    ///
677    /// **Model E permission rule** — `caller_spawner` must equal the
678    /// task's `spawner`, with `None == None` (top-level can cancel
679    /// top-level tasks; sub-agent invocation N can cancel only its
680    /// own tasks). Returns [`CancelOutcome::Forbidden`] otherwise.
681    ///
682    /// The unscoped [`Self::cancel`] is the TUI's contract — humans
683    /// at the keyboard implicitly have full authority. This method is
684    /// the LLM's contract.
685    pub fn cancel_as_caller(&self, task_id: u32, caller_spawner: Option<u32>) -> CancelOutcome {
686        let guard = self.pending.lock();
687        match guard.get(&task_id) {
688            None => CancelOutcome::NotFound,
689            Some(entry) if entry.spawner != caller_spawner => CancelOutcome::Forbidden,
690            Some(entry) => {
691                entry.cancel.cancel();
692                CancelOutcome::Cancelled
693            }
694        }
695    }
696
697    /// Fire the cancel token on every task whose `spawner` matches.
698    ///
699    /// Called from the sub-agent dispatch path when an invocation
700    /// exits (Model E cleanup-on-exit). Returns the number of tasks
701    /// signalled, purely for tracing — the actual reaping happens
702    /// later via [`Self::drain_completed`] once the futures observe
703    /// their cancel tokens and finish.
704    ///
705    /// Idempotent: re-calling on the same `spawner` after all tasks
706    /// have been reaped returns `0`.
707    pub fn cancel_for_spawner(&self, spawner: u32) -> usize {
708        let guard = self.pending.lock();
709        let mut count = 0;
710        for entry in guard.values() {
711            if entry.spawner == Some(spawner) {
712                entry.cancel.cancel();
713                count += 1;
714            }
715        }
716        count
717    }
718}
719
/// Outcome of [`BgAgentRegistry::cancel_as_caller`].
///
/// Mirrors HTTP-ish status codes (ok / not found / forbidden) so the
/// LLM-tool layer can produce useful error messages without
/// inspecting registry internals.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CancelOutcome {
    /// Task existed, caller owned it, cancel token fired. The entry
    /// itself stays in the registry; only the token is signalled.
    Cancelled,
    /// No task with that id is currently tracked (it was never
    /// registered, or it has already been removed from the registry).
    NotFound,
    /// Task exists but caller's `spawner` doesn't match. The cancel
    /// token is NOT fired in this case. The LLM surface translates
    /// this into a permission-style error.
    Forbidden,
}
735
/// Outcome of [`BgAgentRegistry::wait_for_completion`].
///
/// Encodes the five resolutions of a `WaitTask` call. The LLM-tool
/// layer translates each into a serialised payload the model receives.
#[derive(Debug)]
pub enum WaitOutcome {
    /// Task reached a terminal status before the timeout fired and
    /// its result payload was received. Carries the [`BgAgentResult`]
    /// built from that payload; `success` is `true` for an `Ok`
    /// payload and `false` for an `Err` payload. (Drain semantics: a
    /// task consumed via `wait_for_completion` is removed from
    /// `pending` so the next `drain_completed()` won't double-inject
    /// it.)
    Completed(BgAgentResult),
    /// A terminal status was observed but no result payload arrived:
    /// the result sender was dropped, or the payload did not land
    /// within the short grace window. Typical causes are a cancelled
    /// task or a panicked future. The task has been removed from
    /// `pending`.
    Cancelled,
    /// Timeout fired before the task reached a terminal state. The
    /// task is still in `pending` and may still complete on its own.
    /// Carries the most-recent status snapshot so the model can
    /// decide whether to wait again or move on.
    TimedOut(BgTaskSnapshot),
    /// No task with that id. Also returned when the entry disappeared
    /// from `pending` while the wait was in progress (another path
    /// removed it first). Ownership failures are reported separately
    /// as [`WaitOutcome::Forbidden`], mirroring [`CancelOutcome`].
    NotFound,
    /// Task exists but the caller's `spawner` doesn't match the
    /// task's `spawner`; the caller doesn't own this task.
    Forbidden,
}
763
764impl BgAgentRegistry {
765    /// Block until a single task reaches a terminal state, or until
766    /// `timeout` elapses. The tool layer is the sole caller; humans
767    /// use `/cancel` (synchronous) and the auto-drain path.
768    ///
769    /// **Drain semantics**: on `Completed`/`Cancelled`, the task is
770    /// removed from `pending` here so `drain_completed()` won't
771    /// surface it again on the next inference iteration. (This is
772    /// the resolution to the result-routing race we settled in
773    /// design Decision 3 — `WaitTask` consumes; auto-drain becomes a
774    /// no-op for that id.)
775    ///
776    /// **Scoping**: same Model E rule as [`Self::cancel_as_caller`].
777    /// `caller_spawner` must equal the task's `spawner` exactly.
778    ///
779    /// **Timeout**: bounded by the caller. The tool layer caps this
780    /// at 300 s before reaching here; we trust the bound but will
781    /// happily wait whatever value is passed (handy for tests with
782    /// `Duration::from_millis(50)`).
783    pub async fn wait_for_completion(
784        &self,
785        task_id: u32,
786        caller_spawner: Option<u32>,
787        timeout: Duration,
788    ) -> WaitOutcome {
789        // Phase 1 — ownership check + grab a status receiver to await on.
790        // We do NOT remove the entry yet: if we time out, it must remain
791        // visible to the next `drain_completed()` and to other callers.
792        let status_rx = {
793            let guard = self.pending.lock();
794            match guard.get(&task_id) {
795                None => return WaitOutcome::NotFound,
796                Some(entry) if entry.spawner != caller_spawner => {
797                    return WaitOutcome::Forbidden;
798                }
799                Some(entry) => entry.status_rx.clone(),
800            }
801        };
802
803        // Phase 2 — wait for terminal status or timeout.
804        // We watch the status channel rather than the oneshot so we
805        // can distinguish Cancelled (terminal, no payload) from
806        // Completed (terminal, payload pending on the oneshot). The
807        // spawned future writes status BEFORE sending the oneshot,
808        // so by the time we observe a terminal status the oneshot is
809        // either ready or about to be (sub-microsecond skew).
810        let wait_fut = wait_for_terminal_status(status_rx);
811        let result = tokio::time::timeout(timeout, wait_fut).await;
812
813        match result {
814            Err(_elapsed) => {
815                // Timeout: the task is still pending. Re-snapshot so
816                // the caller sees the latest status (it may have
817                // transitioned `Pending` → `Running` while we waited).
818                let snap = self.snapshot().into_iter().find(|s| s.task_id == task_id);
819                match snap {
820                    Some(s) => WaitOutcome::TimedOut(s),
821                    // Task vanished mid-wait — drain reaped it. The
822                    // result already injected into the conversation;
823                    // tell the caller it's gone.
824                    None => WaitOutcome::NotFound,
825                }
826            }
827            Ok(()) => {
828                // Terminal status observed. Pull the entry out so the
829                // auto-drain path won't see it again, then await the
830                // oneshot for the payload.
831                //
832                // PR #1043 review fix: previously this used
833                // `try_recv` + a back-to-back retry. The retry was a
834                // placebo — both calls run in the same scheduler tick
835                // with no `await` between them, so when the bg future
836                // sets `status_tx` *before* `tx.send` (the standard
837                // ordering in `sub_agent_dispatch::run_bg_agent`),
838                // a multi-thread runtime can wake the waiter on a
839                // *different* worker, observe `Empty` twice, and
840                // falsely report `Cancelled` for a successful task.
841                //
842                // `oneshot::Receiver` IS a `Future` — just `.await`
843                // it. The bound is set by `wait_for_terminal_status`
844                // having already observed the terminal status, so the
845                // sender is either landing or already dropped (the
846                // future writes status before sending the result, and
847                // panics drop both). A short inner timeout caps the
848                // "in-flight" window; the outer timeout is already
849                // exhausted at this point.
850                let entry = {
851                    let mut guard = self.pending.lock();
852                    let Some(entry) = guard.remove(&task_id) else {
853                        // Drain raced us and reaped first. Rare but
854                        // possible. The model already saw the result
855                        // in the prior turn's auto-drain.
856                        return WaitOutcome::NotFound;
857                    };
858                    entry
859                    // `guard` drops here — explicit scope guarantees
860                    // we don't hold the (non-Send) `parking_lot`
861                    // guard across the upcoming `.await`.
862                };
863                let agent_name = entry.agent_name;
864                let prompt = entry.prompt;
865                match tokio::time::timeout(Duration::from_millis(50), entry.rx).await {
866                    Ok(Ok(Ok((output, events)))) => WaitOutcome::Completed(BgAgentResult {
867                        agent_name,
868                        prompt,
869                        output,
870                        success: true,
871                        events,
872                    }),
873                    Ok(Ok(Err((err, events)))) => WaitOutcome::Completed(BgAgentResult {
874                        agent_name,
875                        prompt,
876                        output: err,
877                        success: false,
878                        events,
879                    }),
880                    // Sender dropped (panic) or 50ms elapsed without
881                    // a value landing — surface as Cancelled. Both
882                    // are degenerate cases: status said terminal,
883                    // payload never arrived.
884                    Ok(Err(_)) | Err(_) => WaitOutcome::Cancelled,
885                }
886            }
887        }
888    }
889}
890
891/// Wait for a `watch::Receiver<AgentStatus>` to report a terminal
892/// variant (`Completed`, `Errored`, `Cancelled`).
893///
894/// Returns when the current value is already terminal OR after a
895/// `changed()` event lands a terminal value. Yields control on
896/// every iteration so the timeout future in
897/// [`BgAgentRegistry::wait_for_completion`] gets a chance to fire.
898async fn wait_for_terminal_status(mut rx: watch::Receiver<AgentStatus>) {
899    loop {
900        let is_terminal = matches!(
901            *rx.borrow(),
902            AgentStatus::Completed { .. } | AgentStatus::Errored { .. } | AgentStatus::Cancelled
903        );
904        if is_terminal {
905            return;
906        }
907        // `changed()` resolves on the next write to the channel. If
908        // the sender was dropped (task panicked) it returns Err —
909        // treat as terminal so the caller can pull `Cancelled` from
910        // the closed oneshot.
911        if rx.changed().await.is_err() {
912            return;
913        }
914    }
915}
916
impl Default for BgAgentRegistry {
    /// Equivalent to [`BgAgentRegistry::new`]; provided so the
    /// registry can be used wherever a `Default` bound is required.
    fn default() -> Self {
        Self::new()
    }
}
922
923impl Drop for BgAgentRegistry {
924    /// Abort every still-pending bg task on registry drop.
925    ///
926    /// `AbortOnDropHandle::drop` does the work — this impl exists
927    /// only to make the lifecycle explicit and to give a single
928    /// place to add telemetry later.
929    fn drop(&mut self) {
930        // **#1022 B16**: simplified post-parking_lot. The pre-fix
931        // version had to handle `PoisonError` (via
932        // `match get_mut() { Ok | Err(into_inner()) }`) because a
933        // panic-while-held would poison `std::sync::Mutex`.
934        // `parking_lot::Mutex` doesn't poison, so the cleanup path
935        // is now the obvious one: take the map, log if non-empty,
936        // let `AbortOnDropHandle::drop` do the actual abort work.
937        let map = std::mem::take(&mut *self.pending.lock());
938        if !map.is_empty() {
939            tracing::debug!(
940                count = map.len(),
941                "BgAgentRegistry dropped with pending tasks; aborting"
942            );
943        }
944        // Map drops here → each entry's `AbortOnDropHandle` aborts
945        // its task. No orphans. No leaked worktrees.
946    }
947}
948
/// Create a fresh, empty [`BgAgentRegistry`] already wrapped in an
/// [`Arc`], ready to be cloned and shared between the inference loop
/// and tool dispatch.
pub fn new_shared() -> Arc<BgAgentRegistry> {
    Arc::new(BgAgentRegistry::new())
}
953
954#[cfg(test)]
955mod tests {
956    use super::*;
957    use std::sync::atomic::{AtomicBool, Ordering};
958    use std::time::Duration;
959
960    #[tokio::test]
961    async fn register_and_complete() {
962        let reg = BgAgentRegistry::new();
963        let (task_id, tx) = reg.register_test("explore", "find all tests");
964        assert_eq!(task_id, 1);
965        assert_eq!(reg.pending_count(), 1);
966
967        // Not yet complete
968        assert!(reg.drain_completed().is_empty());
969
970        // Complete it
971        tx.send(Ok(("found 42 tests".to_string(), Vec::new())))
972            .unwrap();
973        let results = reg.drain_completed();
974        assert_eq!(results.len(), 1);
975        assert_eq!(results[0].agent_name, "explore");
976        assert_eq!(results[0].output, "found 42 tests");
977        assert!(results[0].success);
978        assert_eq!(reg.pending_count(), 0);
979    }
980
981    #[tokio::test]
982    async fn drain_only_completed() {
983        let reg = BgAgentRegistry::new();
984        let (_id1, tx1) = reg.register_test("task", "build");
985        let (_id2, _tx2) = reg.register_test("explore", "search");
986
987        tx1.send(Ok(("done".to_string(), Vec::new()))).unwrap();
988
989        let results = reg.drain_completed();
990        assert_eq!(results.len(), 1);
991        assert_eq!(results[0].agent_name, "task");
992        assert_eq!(reg.pending_count(), 1); // explore still pending
993    }
994
995    #[tokio::test]
996    async fn dropped_sender_reports_cancelled() {
997        let reg = BgAgentRegistry::new();
998        let (_id, tx) = reg.register_test("task", "build");
999        drop(tx); // simulate task panic/cancel
1000
1001        let results = reg.drain_completed();
1002        assert_eq!(results.len(), 1);
1003        assert!(!results[0].success);
1004        assert!(results[0].output.contains("cancelled"));
1005    }
1006
1007    #[tokio::test]
1008    async fn error_result() {
1009        let reg = BgAgentRegistry::new();
1010        let (_id, tx) = reg.register_test("verify", "check");
1011        tx.send(Err(("test failures".to_string(), Vec::new())))
1012            .unwrap();
1013
1014        let results = reg.drain_completed();
1015        assert_eq!(results.len(), 1);
1016        assert!(!results[0].success);
1017        assert_eq!(results[0].output, "test failures");
1018    }
1019
1020    /// #1022 B9 regression: the narrative trace captured by
1021    /// `BufferingSink` inside the bg agent must propagate through
1022    /// the oneshot → registry → `BgAgentResult.events`. Pre-fix
1023    /// this field didn't exist; bg agents ran with `NullSink` and
1024    /// the user only saw spawn + completion lines. The fix is
1025    /// useless if the trace gets dropped at any of the three hops,
1026    /// so this test pins the round-trip end-to-end.
1027    #[tokio::test]
1028    async fn events_propagate_through_drain_for_success() {
1029        let reg = BgAgentRegistry::new();
1030        let (_id, tx) = reg.register_test("explore", "map repo");
1031        let trace = vec![
1032            "  \u{1f527} Read".to_string(),
1033            "  \u{1f527} Grep".to_string(),
1034            "  \u{26a1} cache hit".to_string(),
1035        ];
1036        tx.send(Ok(("map result".to_string(), trace.clone())))
1037            .unwrap();
1038
1039        let results = reg.drain_completed();
1040        assert_eq!(results.len(), 1);
1041        assert!(results[0].success);
1042        assert_eq!(
1043            results[0].events, trace,
1044            "trace lost between sender and BgAgentResult"
1045        );
1046    }
1047
1048    /// #1022 B9 regression: trace must propagate even when the bg
1049    /// agent failed. The trace is *most* useful in the failure case
1050    /// — "the agent tried Read, Bash, Edit, then errored" is the
1051    /// kind of breadcrumb that turns a black-box failure into a
1052    /// debuggable one.
1053    #[tokio::test]
1054    async fn events_propagate_through_drain_for_failure() {
1055        let reg = BgAgentRegistry::new();
1056        let (_id, tx) = reg.register_test("build", "compile");
1057        let trace = vec![
1058            "  \u{1f527} Bash".to_string(),
1059            "  \u{2398} approval auto-rejected for Delete (no user channel)".to_string(),
1060        ];
1061        tx.send(Err(("compile failed".to_string(), trace.clone())))
1062            .unwrap();
1063
1064        let results = reg.drain_completed();
1065        assert_eq!(results.len(), 1);
1066        assert!(!results[0].success);
1067        assert_eq!(results[0].events, trace);
1068    }
1069
1070    /// #1022 B9 corollary: cancelled / panicked tasks have *no*
1071    /// trace available (the buffering sink died with the task), and
1072    /// that's an explicitly-empty Vec rather than uninitialized.
1073    #[tokio::test]
1074    async fn cancelled_task_has_empty_event_trace() {
1075        let reg = BgAgentRegistry::new();
1076        let (_id, tx) = reg.register_test("flaky", "x");
1077        drop(tx); // simulate panic / abort
1078        let results = reg.drain_completed();
1079        assert_eq!(results.len(), 1);
1080        assert!(!results[0].success);
1081        assert!(
1082            results[0].events.is_empty(),
1083            "cancel path must yield empty trace"
1084        );
1085    }
1086
1087    /// Phase 1 of #1022, B3 regression test: dropping the registry
1088    /// must abort still-running spawned tasks. Without
1089    /// `AbortOnDropHandle` (or an explicit `JoinHandle::abort` in
1090    /// `Drop`), the spawned future would keep running after the
1091    /// registry — and any worktrees / API tokens / writes it owns —
1092    /// were dropped. That's the leak we're fixing.
1093    #[tokio::test]
1094    async fn registry_drop_aborts_pending_tasks() {
1095        let reg = BgAgentRegistry::new();
1096        let parent = CancellationToken::new();
1097        let reservation = reg.reserve(&parent, None);
1098        let task_id = reservation.task_id;
1099        let cancel_for_task = reservation.cancel.clone();
1100        let tx = reservation.tx;
1101        let rx = reservation.rx;
1102        let cancel_for_entry = reservation.cancel;
1103        let status_rx = reservation.status_rx;
1104
1105        // Use a flag the task sets only if it ever finishes a full
1106        // sleep. If abort works, the flag stays false even though
1107        // we wait long enough for a non-aborted task to finish.
1108        let ran_to_completion = Arc::new(AtomicBool::new(false));
1109        let flag = ran_to_completion.clone();
1110        let handle = tokio::spawn(async move {
1111            // Either the cancel token fires (parent cascade) or we
1112            // get aborted (drop cascade). The slow sleep just gives
1113            // the test time to drop the registry before we'd
1114            // naturally finish.
1115            tokio::select! {
1116                _ = cancel_for_task.cancelled() => {}
1117                _ = tokio::time::sleep(Duration::from_secs(60)) => {
1118                    flag.store(true, Ordering::SeqCst);
1119                }
1120            }
1121            let _ = tx.send(Ok(("done".to_string(), Vec::new())));
1122        });
1123        reg.attach(
1124            task_id,
1125            "explore",
1126            "long task",
1127            rx,
1128            cancel_for_entry,
1129            status_rx,
1130            None,
1131            handle,
1132        );
1133
1134        // Give the task a tick to start.
1135        tokio::time::sleep(Duration::from_millis(20)).await;
1136        assert_eq!(reg.pending_count(), 1);
1137
1138        // Drop the registry — this must abort the spawned task.
1139        drop(reg);
1140
1141        // Yield long enough for the abort to land; well under the
1142        // 60 s sleep the task would have completed otherwise.
1143        tokio::time::sleep(Duration::from_millis(100)).await;
1144        assert!(
1145            !ran_to_completion.load(Ordering::SeqCst),
1146            "task slept to completion — AbortOnDropHandle did not abort it"
1147        );
1148    }
1149
1150    /// Phase 1 of #1022, B2 regression test: cancelling the parent
1151    /// token must cascade to bg-agent child tokens handed out by
1152    /// `reserve`.
1153    #[tokio::test]
1154    async fn parent_cancel_cascades_to_reserved_child() {
1155        let reg = BgAgentRegistry::new();
1156        let parent = CancellationToken::new();
1157        let r1 = reg.reserve(&parent, None);
1158        let r2 = reg.reserve(&parent, None);
1159
1160        assert!(!r1.cancel.is_cancelled());
1161        assert!(!r2.cancel.is_cancelled());
1162
1163        parent.cancel();
1164
1165        assert!(
1166            r1.cancel.is_cancelled(),
1167            "child 1 token should observe parent cancel"
1168        );
1169        assert!(
1170            r2.cancel.is_cancelled(),
1171            "child 2 token should observe parent cancel"
1172        );
1173    }
1174
1175    // ── Layer 0 of #996 ──────────────────────────────────────────────────────
1176    //
1177    // Status channel + per-task cancel + snapshot.
1178
1179    /// `cancel(task_id)` must fire that task's cancel token.
1180    /// This is the hook the future `/cancel <id>` slash command and
1181    /// `CancelAgent` LLM tool will call. Verifies a known id returns
1182    /// true *and* the underlying token actually fires.
1183    #[tokio::test]
1184    async fn cancel_known_task_fires_token() {
1185        let reg = BgAgentRegistry::new();
1186        let (task_id, _tx, _status_tx, observer) =
1187            reg.register_test_with_status("explore", "map repo", None);
1188
1189        assert!(!observer.is_cancelled(), "precondition");
1190        let fired = reg.cancel(task_id);
1191        assert!(fired, "cancel(known_id) should report success");
1192        assert!(
1193            observer.is_cancelled(),
1194            "the task's cancel token should observe the cancellation"
1195        );
1196    }
1197
1198    /// `cancel` on an unknown / already-drained id must return false
1199    /// instead of panicking. The slash command and LLM tool will
1200    /// surface this to the user as "no such task".
1201    #[tokio::test]
1202    async fn cancel_unknown_task_returns_false() {
1203        let reg = BgAgentRegistry::new();
1204        assert!(
1205            !reg.cancel(999),
1206            "cancel of an unknown id should be a no-op returning false"
1207        );
1208    }
1209
1210    /// `cancel` is idempotent — calling twice on the same id is safe
1211    /// (the underlying [`CancellationToken::cancel`] is itself
1212    /// idempotent). Both calls return true while the entry is still
1213    /// in `pending`; a third call after drain returns false.
1214    #[tokio::test]
1215    async fn cancel_is_idempotent_while_pending() {
1216        let reg = BgAgentRegistry::new();
1217        let (task_id, _tx, _status_tx, _observer) =
1218            reg.register_test_with_status("explore", "x", None);
1219
1220        assert!(reg.cancel(task_id));
1221        assert!(
1222            reg.cancel(task_id),
1223            "second cancel should still find the entry and report success"
1224        );
1225    }
1226
1227    /// `snapshot()` must return one entry per pending task with
1228    /// stable ordering by `task_id`. Status defaults to `Pending`
1229    /// because no spawned future has flipped it yet.
1230    #[tokio::test]
1231    async fn snapshot_lists_pending_tasks_in_id_order() {
1232        let reg = BgAgentRegistry::new();
1233        let (id_a, _tx_a) = reg.register_test("explore", "map");
1234        let (id_b, _tx_b) = reg.register_test("verify", "check");
1235
1236        let snap = reg.snapshot();
1237        assert_eq!(snap.len(), 2);
1238        // Ordering is by ascending task_id, regardless of HashMap
1239        // iteration order — this is the contract `/agents` relies on.
1240        assert_eq!(snap[0].task_id, id_a);
1241        assert_eq!(snap[0].agent_name, "explore");
1242        assert_eq!(snap[0].prompt, "map");
1243        assert_eq!(snap[0].status, AgentStatus::Pending);
1244        assert_eq!(snap[1].task_id, id_b);
1245        assert_eq!(snap[1].agent_name, "verify");
1246        assert_eq!(snap[1].status, AgentStatus::Pending);
1247    }
1248
1249    /// `snapshot()` reads the live status channel — a `status_tx.send`
1250    /// must be observable on the very next snapshot, with no polling
1251    /// or yielding required (`watch::Receiver::borrow` is sync).
1252    /// This is the contract that lets the status-bar pill (Layer 3)
1253    /// and live `/agents -v` (Layer 1) reflect transitions immediately.
1254    #[tokio::test]
1255    async fn snapshot_reflects_status_writes() {
1256        let reg = BgAgentRegistry::new();
1257        let (task_id, _tx, status_tx, _cancel) =
1258            reg.register_test_with_status("explore", "map", None);
1259
1260        // Default is Pending.
1261        assert_eq!(reg.snapshot()[0].status, AgentStatus::Pending);
1262
1263        // Flip to Running and observe.
1264        status_tx.send(AgentStatus::Running { iter: 3 }).unwrap();
1265        let snap = reg.snapshot();
1266        assert_eq!(snap.len(), 1);
1267        assert_eq!(snap[0].task_id, task_id);
1268        assert_eq!(snap[0].status, AgentStatus::Running { iter: 3 });
1269
1270        // Flip to Completed and observe.
1271        status_tx
1272            .send(AgentStatus::Completed {
1273                summary: "42 files".to_string(),
1274            })
1275            .unwrap();
1276        assert_eq!(
1277            reg.snapshot()[0].status,
1278            AgentStatus::Completed {
1279                summary: "42 files".to_string()
1280            }
1281        );
1282    }
1283
1284    /// `snapshot()` reports a sane `age` that grows monotonically.
1285    /// We don't assert exact values (CI clocks are jittery) — just
1286    /// that two successive snapshots show a non-decreasing age and
1287    /// that the value is non-negative (saturating subtraction
1288    /// prevents underflow if the system clock jumps backwards).
1289    #[tokio::test]
1290    async fn snapshot_age_is_monotonic() {
1291        let reg = BgAgentRegistry::new();
1292        let (_id, _tx) = reg.register_test("explore", "x");
1293
1294        let age1 = reg.snapshot()[0].age;
1295        tokio::time::sleep(Duration::from_millis(15)).await;
1296        let age2 = reg.snapshot()[0].age;
1297        assert!(
1298            age2 >= age1,
1299            "age should be monotonic non-decreasing across snapshots"
1300        );
1301    }
1302
1303    /// `snapshot()` on an empty registry returns an empty Vec, not a
1304    /// panic and not None. `/agents` will use this to render "No
1305    /// background agents."
1306    #[tokio::test]
1307    async fn snapshot_empty_registry_is_empty_vec() {
1308        let reg = BgAgentRegistry::new();
1309        assert!(reg.snapshot().is_empty());
1310    }
1311
1312    /// Once a task is drained (completed and removed from `pending`),
1313    /// it disappears from `snapshot()` immediately. This pins the
1314    /// contract that `/agents` reflects the *currently-pending* set,
1315    /// not historical tasks. The Layer 1 "recently-completed lingers
1316    /// 30 s" UX is implemented at the *display* layer, not here.
1317    #[tokio::test]
1318    async fn snapshot_drops_drained_tasks() {
1319        let reg = BgAgentRegistry::new();
1320        let (_id, tx) = reg.register_test("explore", "x");
1321        assert_eq!(reg.snapshot().len(), 1);
1322
1323        tx.send(Ok(("done".to_string(), Vec::new()))).unwrap();
1324        let _ = reg.drain_completed();
1325
1326        assert!(
1327            reg.snapshot().is_empty(),
1328            "drained tasks must not appear in snapshots"
1329        );
1330    }
1331
1332    // ── Layer 2 of #996: scoped APIs + WaitOutcome ────────────────────────
1333
1334    /// `snapshot_for_caller(None)` returns only top-level tasks;
1335    /// `snapshot_for_caller(Some(N))` returns only N's tasks. Cross-spawner
1336    /// visibility is exactly zero — the Model E isolation guarantee.
1337    #[tokio::test]
1338    async fn snapshot_for_caller_filters_by_spawner() {
1339        let reg = BgAgentRegistry::new();
1340        let (top_id, _tx, _, _) = reg.register_test_with_status("a", "top", None);
1341        let (sub_a_id, _tx, _, _) = reg.register_test_with_status("b", "sub-a", Some(7));
1342        let (_sub_b_id, _tx, _, _) = reg.register_test_with_status("c", "sub-b", Some(9));
1343
1344        let top = reg.snapshot_for_caller(None);
1345        assert_eq!(top.len(), 1);
1346        assert_eq!(top[0].task_id, top_id);
1347
1348        let sub_a = reg.snapshot_for_caller(Some(7));
1349        assert_eq!(sub_a.len(), 1);
1350        assert_eq!(sub_a[0].task_id, sub_a_id);
1351
1352        // Sub-agent 7 sees nothing of sub-agent 9's, of top-level's, etc.
1353        assert!(reg.snapshot_for_caller(Some(42)).is_empty());
1354    }
1355
1356    /// `cancel_as_caller` enforces the Model E permission rule.
1357    #[tokio::test]
1358    async fn cancel_as_caller_returns_forbidden_for_other_spawner() {
1359        let reg = BgAgentRegistry::new();
1360        let (id, _tx, _, observer) = reg.register_test_with_status("x", "y", Some(7));
1361
1362        // Wrong caller — not the top-level (None != Some(7)) and not a peer.
1363        assert_eq!(
1364            reg.cancel_as_caller(id, None),
1365            CancelOutcome::Forbidden,
1366            "top-level must not be able to cancel sub-agent's task"
1367        );
1368        assert_eq!(
1369            reg.cancel_as_caller(id, Some(99)),
1370            CancelOutcome::Forbidden,
1371            "sibling sub-agent must not be able to cancel"
1372        );
1373        assert!(
1374            !observer.is_cancelled(),
1375            "forbidden calls must NOT fire the cancel token"
1376        );
1377
1378        // Correct caller — the original spawner.
1379        assert_eq!(reg.cancel_as_caller(id, Some(7)), CancelOutcome::Cancelled);
1380        assert!(observer.is_cancelled());
1381    }
1382
1383    #[tokio::test]
1384    async fn cancel_as_caller_returns_not_found_for_unknown_id() {
1385        let reg = BgAgentRegistry::new();
1386        assert_eq!(reg.cancel_as_caller(999, None), CancelOutcome::NotFound);
1387    }
1388
1389    /// `cancel_for_spawner` cleans up exactly one sub-agent's children
1390    /// and leaves siblings + top-level alone. The cleanup-on-exit hook.
1391    #[tokio::test]
1392    async fn cancel_for_spawner_kills_only_matching_children() {
1393        let reg = BgAgentRegistry::new();
1394        let (_top, _, _, top_obs) = reg.register_test_with_status("top", "t", None);
1395        let (_a1, _, _, a1_obs) = reg.register_test_with_status("a1", "x", Some(7));
1396        let (_a2, _, _, a2_obs) = reg.register_test_with_status("a2", "y", Some(7));
1397        let (_b, _, _, b_obs) = reg.register_test_with_status("b", "z", Some(9));
1398
1399        let count = reg.cancel_for_spawner(7);
1400        assert_eq!(count, 2, "both of spawner 7's children must be signalled");
1401
1402        assert!(a1_obs.is_cancelled());
1403        assert!(a2_obs.is_cancelled());
1404        assert!(!top_obs.is_cancelled(), "top-level must be untouched");
1405        assert!(!b_obs.is_cancelled(), "sibling spawner's task untouched");
1406
1407        // Idempotent — calling again after entries are still alive
1408        // re-fires the (already-cancelled, no-op) tokens.
1409        assert_eq!(reg.cancel_for_spawner(7), 2);
1410        // Calling for an unknown spawner returns 0.
1411        assert_eq!(reg.cancel_for_spawner(99), 0);
1412    }
1413
1414    /// `wait_for_completion` returns `Completed` and consumes the entry
1415    /// (so a subsequent `drain_completed` can't double-inject).
1416    #[tokio::test]
1417    async fn wait_for_completion_consumes_completed_task() {
1418        let reg = BgAgentRegistry::new();
1419        let (id, tx, status_tx, _) = reg.register_test_with_status("explore", "map", Some(3));
1420
1421        // Fire the result, then transition status to terminal so the
1422        // wait future wakes up.
1423        tx.send(Ok(("final answer".to_string(), vec!["step 1".to_string()])))
1424            .unwrap();
1425        status_tx
1426            .send(AgentStatus::Completed {
1427                summary: "final answer".to_string(),
1428            })
1429            .unwrap();
1430
1431        let outcome = reg
1432            .wait_for_completion(id, Some(3), Duration::from_secs(1))
1433            .await;
1434        match outcome {
1435            WaitOutcome::Completed(result) => {
1436                assert!(result.success);
1437                assert_eq!(result.output, "final answer");
1438                assert_eq!(result.events, vec!["step 1".to_string()]);
1439            }
1440            other => panic!("expected Completed, got {other:?}"),
1441        }
1442
1443        // Entry must be gone — drain sees nothing.
1444        assert_eq!(reg.drain_completed().len(), 0);
1445        assert!(reg.snapshot().is_empty());
1446    }
1447
1448    /// `wait_for_completion` returns `TimedOut` with a fresh snapshot
1449    /// when the task hasn't finished yet — and crucially leaves the
1450    /// entry in the registry so a later drain still works.
1451    #[tokio::test]
1452    async fn wait_for_completion_timeout_preserves_entry() {
1453        let reg = BgAgentRegistry::new();
1454        let (id, _tx, status_tx, _) = reg.register_test_with_status("slow", "x", None);
1455
1456        // Move to Running so the snapshot test below can verify the
1457        // current status got carried through.
1458        status_tx.send(AgentStatus::Running { iter: 2 }).unwrap();
1459
1460        let outcome = reg
1461            .wait_for_completion(id, None, Duration::from_millis(40))
1462            .await;
1463        match outcome {
1464            WaitOutcome::TimedOut(snap) => {
1465                assert_eq!(snap.task_id, id);
1466                assert_eq!(snap.status, AgentStatus::Running { iter: 2 });
1467            }
1468            other => panic!("expected TimedOut, got {other:?}"),
1469        }
1470        // Still in pending after a timeout.
1471        assert_eq!(reg.snapshot().len(), 1);
1472    }
1473
1474    /// `wait_for_completion` enforces the same Model E permission rule
1475    /// as `cancel_as_caller`.
1476    #[tokio::test]
1477    async fn wait_for_completion_returns_forbidden_for_other_spawner() {
1478        let reg = BgAgentRegistry::new();
1479        let (id, _tx, _, _) = reg.register_test_with_status("x", "y", Some(5));
1480
1481        let outcome = reg
1482            .wait_for_completion(id, None, Duration::from_millis(20))
1483            .await;
1484        assert!(
1485            matches!(outcome, WaitOutcome::Forbidden),
1486            "top-level must not be able to wait on sub-agent task; got {outcome:?}"
1487        );
1488
1489        let outcome = reg
1490            .wait_for_completion(id, Some(99), Duration::from_millis(20))
1491            .await;
1492        assert!(
1493            matches!(outcome, WaitOutcome::Forbidden),
1494            "sibling sub-agent must not be able to wait; got {outcome:?}"
1495        );
1496    }
1497
1498    /// Cancellation between status going terminal and `WaitTask` waking
1499    /// up surfaces as `Cancelled` (oneshot closed without sending).
1500    #[tokio::test]
1501    async fn wait_for_completion_returns_cancelled_when_sender_dropped() {
1502        let reg = BgAgentRegistry::new();
1503        let (id, tx, status_tx, _) = reg.register_test_with_status("x", "y", None);
1504
1505        // Drop the sender (simulates task panic / abort), then push
1506        // the status to terminal so wait wakes up.
1507        drop(tx);
1508        status_tx.send(AgentStatus::Cancelled).unwrap();
1509
1510        let outcome = reg
1511            .wait_for_completion(id, None, Duration::from_secs(1))
1512            .await;
1513        assert!(matches!(outcome, WaitOutcome::Cancelled), "got {outcome:?}");
1514        assert!(reg.snapshot().is_empty(), "entry must be reaped");
1515    }
1516
1517    #[tokio::test]
1518    async fn wait_for_completion_returns_not_found_for_unknown_id() {
1519        let reg = BgAgentRegistry::new();
1520        let outcome = reg
1521            .wait_for_completion(999, None, Duration::from_millis(10))
1522            .await;
1523        assert!(matches!(outcome, WaitOutcome::NotFound), "got {outcome:?}");
1524    }
1525
1526    /// Regression test for the PR #1043 race fix.
1527    ///
1528    /// Scenario: the bg future writes terminal `AgentStatus` to the
1529    /// watch channel, then — after a yield-induced gap — sends the
1530    /// payload on the oneshot. The waiter is woken on the watch
1531    /// notify and races to read the oneshot.
1532    ///
1533    /// Before the fix, `wait_for_completion` did `try_recv()` twice
1534    /// back-to-back with no `await` between them; on a multi-thread
1535    /// runtime the second `try_recv` could observe `Empty` again and
1536    /// return `Cancelled` for a successful task. The fix awaits the
1537    /// oneshot future directly with a short inner timeout, which
1538    /// gives the sender's task a chance to run.
1539    ///
1540    /// Multi-thread runtime + explicit `yield_now` between the two
1541    /// sends reliably triggers the old race.
1542    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1543    async fn wait_for_completion_handles_status_then_yield_then_payload() {
1544        let reg = Arc::new(BgAgentRegistry::new());
1545        let (id, tx, status_tx, _observer) =
1546            reg.register_test_with_status("explore", "map repo", None);
1547
1548        // Spawn a task that mimics `run_bg_agent`'s send ordering:
1549        // status first, yield, payload. The yield is what exposed the
1550        // race — it forces tokio to potentially schedule the waiter
1551        // between the two sends.
1552        let send_task = tokio::spawn(async move {
1553            status_tx
1554                .send(AgentStatus::Completed {
1555                    summary: "done".into(),
1556                })
1557                .unwrap();
1558            tokio::task::yield_now().await;
1559            tokio::task::yield_now().await;
1560            let _ = tx.send(Ok(("final".into(), vec!["e1".into()])));
1561        });
1562
1563        let outcome = reg
1564            .wait_for_completion(id, None, Duration::from_secs(2))
1565            .await;
1566        send_task.await.unwrap();
1567
1568        match outcome {
1569            WaitOutcome::Completed(result) => {
1570                assert_eq!(result.output, "final");
1571                assert!(result.success);
1572                assert_eq!(result.events, vec!["e1".to_string()]);
1573            }
1574            other => panic!("expected Completed, got {other:?}"),
1575        }
1576    }
1577
1578    // ── #1076: BgStatusEmitter → sink fan-out ───────────────────────────────
1579
1580    /// Helper: build an emitter wired to the given registry, mirroring
1581    /// the production construction in `sub_agent_dispatch::execute_sub_agent`.
1582    fn emitter_for(
1583        reg: &Arc<BgAgentRegistry>,
1584        task_id: u32,
1585        spawner: Option<u32>,
1586    ) -> BgStatusEmitter {
1587        let (tx, _rx) = watch::channel(AgentStatus::Pending);
1588        BgStatusEmitter::new(task_id, spawner, tx, reg.clone())
1589    }
1590
1591    fn extract(event: &EngineEvent) -> (u32, Option<u32>, &AgentStatus) {
1592        match event {
1593            EngineEvent::BgTaskUpdate {
1594                task_id,
1595                spawner,
1596                status,
1597            } => (*task_id, *spawner, status),
1598            other => panic!("expected BgTaskUpdate, got {other:?}"),
1599        }
1600    }
1601
1602    #[test]
1603    fn emitter_send_queues_engine_event_on_registry() {
1604        let reg = Arc::new(BgAgentRegistry::new());
1605        let emitter = emitter_for(&reg, 7, Some(42));
1606
1607        // Initial: queue is empty.
1608        assert!(
1609            reg.drain_status_events().is_empty(),
1610            "fresh registry must have an empty event queue"
1611        );
1612
1613        emitter.send(AgentStatus::Running { iter: 0 });
1614        let drained = reg.drain_status_events();
1615        assert_eq!(drained.len(), 1, "single send must produce one event");
1616        let (id, spawner, status) = extract(&drained[0]);
1617        assert_eq!(id, 7);
1618        assert_eq!(spawner, Some(42));
1619        assert!(matches!(status, AgentStatus::Running { iter: 0 }));
1620    }
1621
1622    #[test]
1623    fn emitter_drain_is_fifo_and_clears_queue() {
1624        let reg = Arc::new(BgAgentRegistry::new());
1625        let emitter = emitter_for(&reg, 1, None);
1626
1627        emitter.send(AgentStatus::Running { iter: 0 });
1628        emitter.send(AgentStatus::Running { iter: 1 });
1629        emitter.send(AgentStatus::Running { iter: 2 });
1630        emitter.send(AgentStatus::Completed {
1631            summary: "done".into(),
1632        });
1633
1634        let drained = reg.drain_status_events();
1635        assert_eq!(drained.len(), 4, "all four sends must surface");
1636
1637        // FIFO: transition order is preserved across batches.  This
1638        // matters for clients that render "iter N" progress — a
1639        // reorder would show the count moving backwards.
1640        let iters: Vec<_> = drained
1641            .iter()
1642            .filter_map(|e| match e {
1643                EngineEvent::BgTaskUpdate {
1644                    status: AgentStatus::Running { iter },
1645                    ..
1646                } => Some(*iter),
1647                _ => None,
1648            })
1649            .collect();
1650        assert_eq!(iters, vec![0, 1, 2]);
1651
1652        // Last event is the terminal Completed.
1653        assert!(matches!(
1654            extract(&drained[3]).2,
1655            AgentStatus::Completed { .. }
1656        ));
1657
1658        // Drain consumes — second drain is empty.
1659        assert!(
1660            reg.drain_status_events().is_empty(),
1661            "drain must clear the queue"
1662        );
1663    }
1664
1665    #[test]
1666    fn emitter_send_also_updates_watch_channel() {
1667        // The watch fan-out is what `/agents` and `snapshot()` read.
1668        // Sink fan-out (queue) is for the inference-loop → EngineSink
1669        // path.  Both targets must see every transition or `/agents`
1670        // and the TUI/ACP/headless clients will disagree on state.
1671        let reg = Arc::new(BgAgentRegistry::new());
1672        let (tx, mut rx) = watch::channel(AgentStatus::Pending);
1673        let emitter = BgStatusEmitter::new(3, None, tx, reg.clone());
1674
1675        emitter.send(AgentStatus::Running { iter: 5 });
1676
1677        // Watch channel observed.
1678        assert!(matches!(
1679            *rx.borrow_and_update(),
1680            AgentStatus::Running { iter: 5 }
1681        ));
1682        // Queue observed.
1683        let drained = reg.drain_status_events();
1684        assert_eq!(drained.len(), 1);
1685        assert!(matches!(
1686            extract(&drained[0]).2,
1687            AgentStatus::Running { iter: 5 }
1688        ));
1689    }
1690
1691    #[test]
1692    fn emitter_clones_share_queue_and_watch() {
1693        // Layer 4 holds one clone for live `iter` heartbeats;
1694        // `run_bg_agent` keeps another for entry / terminal sends.
1695        // Both clones must funnel into the same registry queue and
1696        // the same per-task watch channel — otherwise terminal
1697        // states could land on a different queue than the heartbeats
1698        // and clients would see Running forever.
1699        let reg = Arc::new(BgAgentRegistry::new());
1700        let (tx, _rx) = watch::channel(AgentStatus::Pending);
1701        let a = BgStatusEmitter::new(11, Some(2), tx, reg.clone());
1702        let b = a.clone();
1703
1704        a.send(AgentStatus::Running { iter: 1 });
1705        b.send(AgentStatus::Completed {
1706            summary: "ok".into(),
1707        });
1708
1709        let drained = reg.drain_status_events();
1710        assert_eq!(drained.len(), 2, "clones must share the registry queue");
1711        // Watch channel reflects the LATEST send, regardless of which
1712        // clone made it (watch is overwriting by design).
1713        assert!(matches!(a.current(), AgentStatus::Completed { .. }));
1714    }
1715
1716    #[test]
1717    fn agent_status_round_trips_through_serde() {
1718        // `EngineEvent::BgTaskUpdate` is the wire format for ACP /
1719        // headless / future transports.  All `AgentStatus` variants
1720        // must survive a serde round-trip or the boundary leak fix
1721        // creates a new boundary leak (engine emits, transport drops
1722        // it on the floor).
1723        for status in [
1724            AgentStatus::Pending,
1725            AgentStatus::Running { iter: 0 },
1726            AgentStatus::Running { iter: 17 },
1727            AgentStatus::Cancelled,
1728            AgentStatus::Completed {
1729                summary: "hello".into(),
1730            },
1731            AgentStatus::Errored {
1732                error: "boom".into(),
1733            },
1734        ] {
1735            let event = EngineEvent::BgTaskUpdate {
1736                task_id: 1,
1737                spawner: Some(2),
1738                status: status.clone(),
1739            };
1740            let json = serde_json::to_string(&event).expect("serialize");
1741            let back: EngineEvent = serde_json::from_str(&json).expect("deserialize");
1742            match back {
1743                EngineEvent::BgTaskUpdate {
1744                    task_id,
1745                    spawner,
1746                    status: round_tripped,
1747                } => {
1748                    assert_eq!(task_id, 1);
1749                    assert_eq!(spawner, Some(2));
1750                    assert_eq!(round_tripped, status, "json round-trip lost data: {json}");
1751                }
1752                other => panic!("round-trip changed variant: {other:?}"),
1753            }
1754        }
1755    }
1756
1757    #[test]
1758    fn drain_status_events_is_empty_on_fresh_registry() {
1759        // The inference loop calls `drain_status_events` every
1760        // iteration; the no-bg-task case must be cheap and yield
1761        // an empty Vec without any allocations forced by mistakes
1762        // in the queue type (e.g. `Some(VecDeque::new())`).
1763        let reg = BgAgentRegistry::new();
1764        let drained = reg.drain_status_events();
1765        assert!(drained.is_empty());
1766    }
1767}