koda_core/bg_agent.rs
1//! Background sub-agent registry.
2//!
3//! Tracks sub-agents spawned with `background: true` in `InvokeAgent`.
4//! The inference loop drains completed results and injects them as
5//! user-role messages so the model sees them on the next iteration.
6//!
7//! ## Lifecycle
8//!
9//! 1. **Spawn**: `InvokeAgent { background: true }` creates a tokio task
10//! 2. **Track**: the task handle + metadata are stored in `BgAgentRegistry`
11//! 3. **Poll**: before each inference call, the loop calls `drain_completed()`
12//! 4. **Inject**: completed results are appended as user messages
13//! 5. **Cleanup**: on registry drop, all pending task handles are aborted —
14//! no orphan futures, no leaked worktrees. (Phase 1 of #1022, B3.)
15//!
16//! ## Cancellation cascade
17//!
18//! Bg-agent tasks receive a `CancellationToken` derived from the parent's
19//! token via `child_token()` (wired in `crate::sub_agent_dispatch`). When
20//! the parent is cancelled, every bg child sees it; when the registry
21//! drops without cancellation, [`tokio_util::task::AbortOnDropHandle`]
22//! still aborts the futures so we never leak. Both paths are covered.
23//! (Phase 1 of #1022, B2+B3.)
24//!
25//! ## Thread safety
26//!
27//! The registry is wrapped in `Arc<Mutex<>>` and shared between the main
28//! inference loop and the background task spawner.
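//!
//! A rough sketch of the loop-side integration (identifiers such as
//! `sink`, `messages`, and `format_bg_result` are illustrative, not the
//! real call sites):
//!
//! ```ignore
//! // Once per inference iteration, before calling the model:
//! for event in registry.drain_status_events() {
//!     sink.emit(event); // status pill / ACP / headless clients
//! }
//! for result in registry.drain_completed() {
//!     // Surface the finished bg agent to the model as a user message.
//!     messages.push(Message::user(format_bg_result(&result)));
//! }
//! ```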
29
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33// **#1022 B16**: was `std::sync::Mutex`. Switched to `parking_lot::Mutex`
34// for three reasons:
35// 1. **No poisoning** — if a thread panics while holding the lock,
36// subsequent calls don't get a `PoisonError`. The bg registry
37// is shared between the main inference loop and N spawned tasks;
38// a panic in one critical section bricking every subsequent
39// drain would be a particularly bad failure mode.
40// 2. **Faster on contention** — no atomic check for poison flag,
//    no `Result` to unwrap at every call. The contention is real: `drain_completed`
42// runs on every loop iteration.
43// 3. **Cleaner API** — `.lock()` returns a guard directly, no
44// `.unwrap()` boilerplate at every call site.
45// We deliberately keep this *sync* (not `tokio::sync::Mutex`) because
46// the critical sections are short HashMap ops with no awaits inside.
47use parking_lot::Mutex;
48use serde::{Deserialize, Serialize};
49use tokio::sync::{oneshot, watch};
50use tokio_util::sync::CancellationToken;
51use tokio_util::task::AbortOnDropHandle;
52
53use crate::engine::EngineEvent;
54
55// ── Layer 0 of #996 ──────────────────────────────────────────────────────
56//
57// Status enum + watch-channel plumbing + per-task cancel + snapshot API.
58// Pure infrastructure: no slash commands, no LLM tools, no UI changes.
59// Layers 1+ (slash commands, tools, status-bar pill) consume this surface.
60//
61// Modeled on Codex's `tokio::sync::watch::Receiver<AgentStatus>` pattern
62// (codex-rs/core/src/session/mod.rs). The bg-agent task drives the
63// `watch::Sender`; the registry stores the matching `Receiver` and exposes
64// snapshots to whoever asks (slash command, LLM tool, status-bar pill).
65
66/// Lifecycle of a single background sub-agent task.
67///
68/// The bg-agent future drives transitions through `watch::Sender<AgentStatus>`.
69/// Initial value is [`AgentStatus::Pending`]; the future flips to `Running`
70/// when execution actually starts and to one of the terminal variants
71/// (`Completed`, `Errored`, `Cancelled`) when it finishes.
72///
73/// `Running.iter` reflects the current inference iteration (1..=20).
74/// Background agents emit live updates via Layer 4 (#1058); `0` is
75/// the entry-point placeholder before the first iteration fires.
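///
/// With the `serde` attributes below the wire shape is internally
/// tagged — for example (a sketch, assuming `serde_json` is available):
///
/// ```ignore
/// let s = AgentStatus::Running { iter: 3 };
/// assert_eq!(serde_json::to_string(&s).unwrap(), r#"{"kind":"running","iter":3}"#);
/// ```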
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(tag = "kind", rename_all = "snake_case")]
78pub enum AgentStatus {
79 /// Reserved but the spawned future hasn't started yet.
80 Pending,
81 /// Actively executing. `iter` is the current inference iteration
82 /// (1..=20); `0` means "started, no iter info yet" (Layer 0 default).
83 Running {
84 /// Current inference iteration (1..=20). `0` is the
85 /// entry-point placeholder emitted before the first iteration
86 /// in `run_bg_agent`; background agents update this live
87 /// (Layer 4, #1058).
88 iter: u8,
89 },
90 /// User or parent fired the cancel token. Terminal.
91 Cancelled,
92 /// Sub-agent returned a final answer. Terminal.
93 Completed {
94 /// The agent's final output. Truncation for display is the
95 /// renderer's job (see Codex's `COLLAB_AGENT_RESPONSE_PREVIEW_GRAPHEMES`).
96 summary: String,
97 },
98 /// Sub-agent returned an error. Terminal.
99 Errored {
100 /// Error message as produced by `execute_sub_agent`. Same
101 /// truncation note as `Completed.summary`.
102 error: String,
103 },
104}
105
106/// Snapshot of a pending bg-agent task — what `/agents` and the
107/// `ListBackgroundTasks` LLM tool will render.
108///
109/// Cloned out of the registry under the lock so callers can format/display
110/// without holding it. `age` is computed from `started_at` at snapshot time.
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct BgTaskSnapshot {
113 /// Monotonic id assigned at `reserve()` time. Stable for the
114 /// lifetime of the task; reused across snapshots.
115 pub task_id: u32,
116 /// Configured agent name (`explore`, `verify`, ...).
117 pub agent_name: String,
118 /// The prompt the parent delegated. Surfaced verbatim by
119 /// `/agents -v`; truncation is the renderer's job.
120 pub prompt: String,
121 /// Wall-clock duration since the task was attached. Computed at
122 /// snapshot time, so successive snapshots of the same task
123 /// report different ages.
124 pub age: Duration,
125 /// Latest value from the task's `watch::Receiver<AgentStatus>`.
126 pub status: AgentStatus,
127 /// Sub-agent task that spawned this bg-agent. `None` = top-level
128 /// (the user's main conversation).
129 ///
    /// **#996 Layer 2 / Model E**: tracked so that when a sub-agent
    /// exits, [`BgAgentRegistry::cancel_for_spawner`] can fire the
    /// cancel token on every bg-agent it left behind. Mirrors
    /// Claude Code's `agentId` field on `LocalShellTaskState`
    /// (prevents the 10-day `fake-logs.sh` zombie scenario).
    ///
    /// The same field keys the scoped LLM-tool surface:
    /// [`BgAgentRegistry::snapshot_for_caller`] and
    /// [`BgAgentRegistry::cancel_as_caller`] only expose a task to
    /// the caller whose `spawner` matches. The TUI (`/agents` and the
    /// unscoped [`BgAgentRegistry::cancel`]) keeps the global view.
139 pub spawner: Option<u32>,
140}
141
142/// Payload sent over the bg-agent oneshot.
143///
144/// Pre-#1022-B9 this was just `String` (the model's final output).
145/// Now also carries the trace lines collected by
146/// [`crate::engine::sink::BufferingSink`] so the inference loop
147/// can surface them to the user when injecting the result.
148///
149/// The `Result<BgPayload, BgPayload>` shape preserves the prior
150/// success/failure discrimination: `Ok` means `execute_sub_agent`
151/// returned text, `Err` means it returned an error (the trace is
152/// useful in *both* cases — the bg agent may have done several
153/// steps before erroring).
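///
/// Typical send from the bg-agent future (a sketch; `final_text` and
/// `trace_lines` are placeholders for the agent's answer and the
/// buffered trace):
///
/// ```ignore
/// let payload: BgPayload = (final_text, trace_lines);
/// let _ = tx.send(Ok(payload)); // or Err(payload) on failure
/// ```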
154pub type BgPayload = (String, Vec<String>);
155
156/// A completed background agent result.
157#[derive(Debug)]
158pub struct BgAgentResult {
159 /// The agent name that produced this result.
160 pub agent_name: String,
161 /// The original prompt that was delegated.
162 pub prompt: String,
163 /// The agent's output (or error message).
164 pub output: String,
165 /// Whether the agent succeeded.
166 pub success: bool,
167 /// **#1022 B9**: narrative trace lines captured by
168 /// [`crate::engine::sink::BufferingSink`] inside the bg agent.
169 /// Pre-fix this was implicitly always empty (bg agents ran with
170 /// `NullSink`). Now populated with one line per significant
171 /// event (tool start, info, auto-rejected approval) so the user
172 /// can see what the bg agent did at result-injection time.
173 /// Empty for the cancelled / panicked case (`output` carries the
174 /// failure detail in those paths).
175 pub events: Vec<String>,
176}
177
/// Registry entry for a spawned background agent.
179///
180/// Holds the task's [`tokio_util::task::AbortOnDropHandle`] so the
181/// future is aborted if the registry is dropped before the task
182/// completes (B3 of #1022). Also holds the per-task
183/// [`CancellationToken`] so future per-task cancel commands
184/// (`/cancel <id>` — see #996) have a hook to fire.
185struct BgAgentEntry {
186 agent_name: String,
187 prompt: String,
188 rx: oneshot::Receiver<Result<BgPayload, BgPayload>>,
189 /// Per-task cancel — derived as a `child_token()` of the parent
190 /// session's token at spawn time. Firing this token (via
191 /// [`BgAgentRegistry::cancel`] for #996, or via the registry-drop
192 /// path) causes the in-flight bg agent to observe `is_cancelled()`
193 /// on its next loop iteration.
194 cancel: CancellationToken,
195 /// Live status channel — the spawned future writes; the registry
196 /// reads at snapshot time. See [`AgentStatus`] for the lifecycle.
197 status_rx: watch::Receiver<AgentStatus>,
198 /// When the task was attached. Used to compute `age` in snapshots.
199 started_at: Instant,
200 /// Sub-agent task that spawned this bg-agent. `None` = top-level.
    /// **#996 Layer 2 / Model E** — see [`BgTaskSnapshot::spawner`].
202 spawner: Option<u32>,
203 /// Aborts the spawned task on drop. The bg path uses
204 /// `tokio::spawn` on the multi-thread runtime (#1022 B5):
205 /// `execute_sub_agent` returns an explicitly `Send`-bounded
206 /// future, so abort works promptly at any await point. The
207 /// cancel-token cascade is still the primary stop signal
208 /// (so the bg task can run any cleanup it owns).
209 _handle: AbortOnDropHandle<()>,
210}
211
212/// Registry of running background sub-agents.
213///
214/// Shared via `Arc` between the inference loop (which drains results)
215/// and the tool dispatch (which spawns agents).
216pub struct BgAgentRegistry {
217 pending: Mutex<HashMap<u32, BgAgentEntry>>,
218 next_id: Mutex<u32>,
219 /// Queue of `EngineEvent::BgTaskUpdate` events produced by
220 /// [`BgStatusEmitter::send`]. Drained by the inference loop
221 /// alongside [`Self::drain_completed`] and forwarded to the
222 /// active `EngineSink`.
223 ///
224 /// **#1076**: closes the engine/UI boundary leak — prior to this
225 /// queue, bg-task status only reached the TUI by the TUI grabbing
226 /// `Arc<BgAgentRegistry>` directly out of `KodaSession` and
227 /// polling `snapshot()`. ACP / headless clients saw nothing.
228 /// Routing through `EngineEvent` puts every client surface on
229 /// the same channel.
230 ///
231 /// `VecDeque` not `Vec` because we drain FIFO (transition order
232 /// matters: `Pending` → `Running` → terminal must arrive in that
233 /// order even if the inference loop drains in batches).
234 events: Mutex<std::collections::VecDeque<EngineEvent>>,
235}
236
237/// Fan-out helper for bg-agent status transitions.
238///
239/// Bundles the per-task `watch::Sender<AgentStatus>` (read by
240/// `/agents` and the status-bar pill via the registry's snapshot
241/// API) with a back-reference to the registry's event queue (drained
242/// by the inference loop and forwarded to the engine sink — which is
243/// what closes the #1076 boundary leak).
244///
245/// `Clone` is intentional so Layer 4 (`#1058`, live `iter` heartbeat)
246/// can hold its own copy inside `execute_sub_agent` while
247/// `run_bg_agent` keeps another for the entry / terminal transitions.
248/// Both clones share the same `watch::Sender` and `Arc<registry>`,
249/// so every `.send()` reaches both fan-out targets.
250#[derive(Clone)]
251pub struct BgStatusEmitter {
252 task_id: u32,
253 spawner: Option<u32>,
254 status_tx: watch::Sender<AgentStatus>,
255 registry: Arc<BgAgentRegistry>,
256}
257
258impl BgStatusEmitter {
259 /// Construct from the parts handed back by [`BgAgentRegistry::reserve`].
260 ///
261 /// The registry `Arc` is held for the lifetime of the bg agent,
262 /// which is fine: the inference loop already keeps an `Arc` on
263 /// the same registry, and registry drop is what aborts every
264 /// in-flight bg task (B3 of #1022) — so an emitter outliving its
265 /// registry is impossible by construction.
266 pub fn new(
267 task_id: u32,
268 spawner: Option<u32>,
269 status_tx: watch::Sender<AgentStatus>,
270 registry: Arc<BgAgentRegistry>,
271 ) -> Self {
272 Self {
273 task_id,
274 spawner,
275 status_tx,
276 registry,
277 }
278 }
279
280 /// Drive a status transition.
281 ///
282 /// Fans out to:
283 /// 1. The per-task `watch::Sender` (so `snapshot()` / `/agents`
284 /// see the new state on the next read — no behavior change).
285 /// 2. The registry's event queue, drained by the inference loop
286 /// and forwarded to the active `EngineSink` (so the TUI / ACP
287 /// / headless clients all see the same `BgTaskUpdate` event).
288 ///
289 /// `watch::Sender::send` only fails if every receiver was dropped,
290 /// which means the registry entry is gone — in that case the queue
291 /// push is harmless (it'll be drained and ignored by clients that
292 /// don't recognize the task id). We deliberately don't gate the
293 /// queue push on the watch send result so a racing reap doesn't
294 /// swallow the terminal `BgTaskUpdate`.
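    ///
    /// A sketch of the sequence a bg-agent future typically drives
    /// (`final_text` is a placeholder for the agent's answer):
    ///
    /// ```ignore
    /// emitter.send(AgentStatus::Running { iter: 1 });
    /// // ... one Running update per iteration (Layer 4, #1058) ...
    /// emitter.send(AgentStatus::Completed { summary: final_text });
    /// ```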
295 pub fn send(&self, status: AgentStatus) {
296 let _ = self.status_tx.send(status.clone());
297 self.registry.push_status_event(EngineEvent::BgTaskUpdate {
298 task_id: self.task_id,
299 spawner: self.spawner,
300 status,
301 });
302 }
303
304 /// Current status (read from the watch channel). Useful for
305 /// terminal-disambiguation logic (e.g. "was this a cancel or a
306 /// real error?") without taking the registry lock.
307 pub fn current(&self) -> AgentStatus {
308 self.status_tx.borrow().clone()
309 }
310
311 /// Test helper: peek the underlying watch sender. Production
312 /// code should always go through [`Self::send`].
313 #[cfg(test)]
314 pub fn status_sender(&self) -> watch::Sender<AgentStatus> {
315 self.status_tx.clone()
316 }
317}
318
319/// Reservation slot returned by [`BgAgentRegistry::reserve`].
320///
321/// The two-phase pattern (`reserve` → spawn → `attach`) lets the
322/// dispatcher hand the oneshot sender into the spawned future
323/// *before* the future exists, so the spawned closure can `move` it
324/// without referencing the registry. The `cancel` token is a
325/// `child_token()` of the parent's cancel — fires either when the
326/// parent fires (cascade) or when this slot is individually cancelled
327/// (future per-task `/cancel <id>` UX, #996).
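///
/// Spawn-site wiring, roughly (a sketch — the real version lives in
/// `crate::sub_agent_dispatch`, and `run_bg_agent`'s signature here is
/// a stand-in):
///
/// ```ignore
/// let res = registry.reserve(&parent_cancel, None);
/// let emitter = BgStatusEmitter::new(res.task_id, res.spawner, res.status_tx, registry.clone());
/// let handle = tokio::spawn(run_bg_agent(prompt.clone(), res.tx, res.cancel.clone(), emitter));
/// registry.attach(res.task_id, "explore", &prompt, res.rx, res.cancel, res.status_rx, res.spawner, handle);
/// ```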
328pub struct BgAgentReservation {
329 /// Monotonically-assigned task ID. Surfaces in user-facing
330 /// messages (`Background agent 'foo' started (task 7)`) and
331 /// keys the per-task `/cancel <id>` UX (#996).
332 pub task_id: u32,
333 /// Sender half of the result oneshot. Move into the spawned
334 /// future so it can deliver `Ok(output)` / `Err(message)`.
335 pub tx: oneshot::Sender<Result<BgPayload, BgPayload>>,
336 /// Receiver half. Move back into the registry via [`BgAgentRegistry::attach`]
337 /// so `drain_completed()` can poll it.
338 pub rx: oneshot::Receiver<Result<BgPayload, BgPayload>>,
339 /// Per-task cancel token. Cloned for the spawned future
340 /// (`bg_cancel`) and re-stored on the registry entry
341 /// (`entry_cancel`); both halves observe parent cancellation
342 /// because this is a `child_token()` of the parent.
343 pub cancel: CancellationToken,
344 /// Status sender — move into the spawned future. The future is
345 /// the sole writer; it transitions through
346 /// [`AgentStatus::Pending`] → `Running` → terminal.
347 pub status_tx: watch::Sender<AgentStatus>,
348 /// Status receiver — hand back to the registry via [`BgAgentRegistry::attach`]
349 /// so `snapshot()` and `/agents` can read the current state without
350 /// touching the spawn site.
351 pub status_rx: watch::Receiver<AgentStatus>,
352 /// Sub-agent task id of the spawner, or `None` for the top-level
353 /// loop. Carried verbatim to [`BgAgentRegistry::attach`] so the
354 /// entry knows who spawned it (Model D cleanup-on-exit).
    /// entry knows who spawned it (Model E cleanup-on-exit).
356}
357
358impl BgAgentRegistry {
359 /// Create an empty registry.
360 pub fn new() -> Self {
361 Self {
362 pending: Mutex::new(HashMap::new()),
363 next_id: Mutex::new(1),
364 events: Mutex::new(std::collections::VecDeque::new()),
365 }
366 }
367
368 /// Push an event onto the status queue. Called by
369 /// [`BgStatusEmitter::send`]; not part of the public API.
370 pub(crate) fn push_status_event(&self, event: EngineEvent) {
371 self.events.lock().push_back(event);
372 }
373
374 /// Drain queued status events for forwarding to the active
375 /// `EngineSink`. Called by the inference loop alongside
376 /// [`Self::drain_completed`].
377 ///
378 /// Returns events in FIFO order (transition order); empty if
379 /// nothing changed since the last drain. Cheap: a single mutex
380 /// acquisition + `VecDeque::drain`. The vast majority of turns
381 /// will see 0–1 events.
382 pub fn drain_status_events(&self) -> Vec<EngineEvent> {
383 let mut q = self.events.lock();
384 q.drain(..).collect()
385 }
386
387 /// Reserve a task ID and produce a oneshot sender + child cancel
388 /// token for the spawn site to consume. Call [`Self::attach`] with
389 /// the resulting `JoinHandle` to complete registration.
390 ///
391 /// `spawner` records who is reserving this slot. `None` means
392 /// the top-level inference loop; `Some(invocation_id)` means a
393 /// sub-agent invocation. Used by [`Self::cancel_for_spawner`] to
394 /// reap children when a sub-agent exits (Model E cleanup-on-exit)
395 /// and by [`Self::snapshot_for_caller`] to scope LLM-tool views.
396 ///
397 /// The two-phase shape (`reserve` → spawn → `attach`) exists
398 /// because `tokio::spawn` produces the `JoinHandle` *after* the
399 /// future is built, but the future needs to own the `tx` to deliver
400 /// its result. Reservation gives us `tx` early; attach binds the
401 /// handle once it exists.
402 pub fn reserve(
403 &self,
404 parent_cancel: &CancellationToken,
405 spawner: Option<u32>,
406 ) -> BgAgentReservation {
407 let (tx, rx) = oneshot::channel();
408 let (status_tx, status_rx) = watch::channel(AgentStatus::Pending);
409 let mut id = self.next_id.lock();
410 let task_id = *id;
411 *id += 1;
412 BgAgentReservation {
413 task_id,
414 tx,
415 rx,
416 cancel: parent_cancel.child_token(),
417 status_tx,
418 status_rx,
419 spawner,
420 }
421 }
422
423 /// Bind a spawned task's metadata to a previously [`reserve`]d slot.
424 ///
425 /// `rx` must be the receiver paired with the `tx` handed out by
426 /// `reserve`. Holding `handle` as `AbortOnDropHandle` ensures the
427 /// task is aborted on registry drop (B3 of #1022). `status_rx`
428 /// is the read half of the watch channel whose write half
429 /// (`status_tx`) was moved into the spawned future.
430 ///
431 /// [`reserve`]: Self::reserve
432 //
433 // 9 args trips `clippy::too_many_arguments` (limit 7). Each one
434 // is load-bearing: id + name + prompt are display metadata;
    // rx and status_rx are the two channels we own, cancel is the
    // per-task token, spawner is the cleanup-routing key (Model E),
    // and handle is the JoinHandle we wrap. Bundling into a struct just to satisfy
438 // a heuristic would add a one-use type for no readability win
439 // — "practicality beats purity".
440 #[allow(clippy::too_many_arguments)]
441 pub fn attach(
442 &self,
443 reservation_id: u32,
444 agent_name: &str,
445 prompt: &str,
446 rx: oneshot::Receiver<Result<BgPayload, BgPayload>>,
447 cancel: CancellationToken,
448 status_rx: watch::Receiver<AgentStatus>,
449 spawner: Option<u32>,
450 handle: tokio::task::JoinHandle<()>,
451 ) {
452 self.pending.lock().insert(
453 reservation_id,
454 BgAgentEntry {
455 agent_name: agent_name.to_string(),
456 prompt: prompt.to_string(),
457 rx,
458 cancel,
459 status_rx,
460 started_at: Instant::now(),
461 spawner,
462 _handle: AbortOnDropHandle::new(handle),
463 },
464 );
465 }
466
467 /// Convenience for tests: register a synthetic entry without a
468 /// real spawned task. The provided `tx` can be used to fire the
469 /// result manually. The handle is a noop spawned task that
470 /// returns immediately, so `_handle` has something to abort.
471 #[cfg(test)]
472 pub fn register_test(
473 &self,
474 agent_name: &str,
475 prompt: &str,
476 ) -> (u32, oneshot::Sender<Result<BgPayload, BgPayload>>) {
477 let (id, tx, _status_tx, _cancel) =
478 self.register_test_with_status(agent_name, prompt, None);
479 (id, tx)
480 }
481
482 /// Test-only sibling of [`register_test`] that returns the status
483 /// sender so a test can manually drive transitions without
484 /// needing a real spawned `run_bg_agent`. The cancel token also
485 /// comes back so cancel-cascade tests can verify the channel.
486 ///
487 /// `spawner` is recorded on the entry so scope-filtering /
488 /// kill-on-exit tests can exercise both the top-level (`None`)
489 /// and sub-agent (`Some(id)`) paths.
490 #[cfg(test)]
491 pub fn register_test_with_status(
492 &self,
493 agent_name: &str,
494 prompt: &str,
495 spawner: Option<u32>,
496 ) -> (
497 u32,
498 oneshot::Sender<Result<BgPayload, BgPayload>>,
499 watch::Sender<AgentStatus>,
500 CancellationToken,
501 ) {
502 let (tx, rx) = oneshot::channel();
503 let (status_tx, status_rx) = watch::channel(AgentStatus::Pending);
504 let mut id = self.next_id.lock();
505 let task_id = *id;
506 *id += 1;
507 drop(id);
508 let cancel = CancellationToken::new();
509 let cancel_observer = cancel.clone();
510 let noop = tokio::spawn(async {});
511 self.pending.lock().insert(
512 task_id,
513 BgAgentEntry {
514 agent_name: agent_name.to_string(),
515 prompt: prompt.to_string(),
516 rx,
517 cancel,
518 status_rx,
519 started_at: Instant::now(),
520 spawner,
521 _handle: AbortOnDropHandle::new(noop),
522 },
523 );
524 (task_id, tx, status_tx, cancel_observer)
525 }
526
527 /// Drain all completed background agents. Non-blocking — only takes
528 /// entries whose oneshot has already resolved.
529 pub fn drain_completed(&self) -> Vec<BgAgentResult> {
530 let mut guard = self.pending.lock();
531 let mut completed = Vec::new();
532 let mut done_ids = Vec::new();
533
534 for (id, entry) in guard.iter_mut() {
535 match entry.rx.try_recv() {
536 Ok(Ok((output, events))) => {
537 done_ids.push(*id);
538 completed.push(BgAgentResult {
539 agent_name: entry.agent_name.clone(),
540 prompt: entry.prompt.clone(),
541 output,
542 success: true,
543 events,
544 });
545 }
546 Ok(Err((err, events))) => {
547 done_ids.push(*id);
548 completed.push(BgAgentResult {
549 agent_name: entry.agent_name.clone(),
550 prompt: entry.prompt.clone(),
551 output: err,
552 success: false,
553 events,
554 });
555 }
556 Err(oneshot::error::TryRecvError::Empty) => {
557 // Still running
558 }
559 Err(oneshot::error::TryRecvError::Closed) => {
560 // Sender dropped without sending — task panicked or was cancelled.
561 // No events available (the buffering sink died with the task).
562 done_ids.push(*id);
563 completed.push(BgAgentResult {
564 agent_name: entry.agent_name.clone(),
565 prompt: entry.prompt.clone(),
566 output: "[background agent task was cancelled]".to_string(),
567 success: false,
568 events: Vec::new(),
569 });
570 }
571 }
572 }
573
574 for id in done_ids {
575 guard.remove(&id);
576 }
577
578 completed
579 }
580
    /// How many background agents are still tracked (not yet drained).
582 pub fn pending_count(&self) -> usize {
583 self.pending.lock().len()
584 }
585
586 // ── Layer 0 of #996: per-task cancel + snapshot ───────────────────────────────
587
588 /// Fire the cancel token for a single task.
589 ///
590 /// Returns `true` if a pending task with that id existed and was
591 /// signalled, `false` if the id is unknown (already drained,
592 /// completed, or never registered). Idempotent: calling twice on
593 /// the same id is safe — [`CancellationToken::cancel`] is itself
594 /// idempotent.
595 ///
596 /// The entry stays in `pending` until the spawned future actually
597 /// observes the token and finishes. `drain_completed()` then
598 /// reaps it via the closed-sender path (or the future's terminal
599 /// `tx.send` if it noticed and shut down cleanly).
600 pub fn cancel(&self, task_id: u32) -> bool {
601 let guard = self.pending.lock();
602 match guard.get(&task_id) {
603 Some(entry) => {
604 entry.cancel.cancel();
605 true
606 }
607 None => false,
608 }
609 }
610
611 /// Snapshot every pending task's metadata for `/agents` and the
612 /// `ListBackgroundTasks` LLM tool.
613 ///
614 /// `age` is computed against `Instant::now()` at call time, so two
615 /// snapshots of the same task report different ages. Status is read
616 /// from each entry's `watch::Receiver` (no blocking, no waiting).
617 /// Sorted by ascending `task_id` so the output is stable across calls.
618 ///
619 /// **Unscoped**: returns every task regardless of spawner. Used by
620 /// the TUI `/agents` command (humans want the global view) and as
621 /// the engine of [`Self::snapshot_for_caller`] (which filters).
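    ///
    /// How a display layer might format the result (illustrative only;
    /// the real `/agents` renderer lives elsewhere):
    ///
    /// ```ignore
    /// for s in registry.snapshot() {
    ///     println!("#{} {} ({}s): {:?}", s.task_id, s.agent_name, s.age.as_secs(), s.status);
    /// }
    /// ```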
622 pub fn snapshot(&self) -> Vec<BgTaskSnapshot> {
623 let guard = self.pending.lock();
624 let now = Instant::now();
625 let mut out: Vec<_> = guard
626 .iter()
627 .map(|(id, entry)| BgTaskSnapshot {
628 task_id: *id,
629 agent_name: entry.agent_name.clone(),
630 prompt: entry.prompt.clone(),
631 age: now.saturating_duration_since(entry.started_at),
632 status: entry.status_rx.borrow().clone(),
633 spawner: entry.spawner,
634 })
635 .collect();
636 out.sort_by_key(|s| s.task_id);
637 out
638 }
639
640 /// Scoped snapshot for the `ListBackgroundTasks` LLM tool.
641 ///
642 /// **Model E scoping**: a caller only sees tasks whose `spawner`
643 /// matches its own `caller_spawner`. Top-level callers pass `None`
644 /// and see only top-level-spawned tasks; sub-agent callers pass
645 /// `Some(invocation_id)` and see only their own.
646 ///
647 /// Strict equality — a sub-agent does NOT see sibling sub-agents'
648 /// tasks, and the top-level does NOT see sub-agents' tasks via the
649 /// LLM (the TUI's `/agents` command remains the global view).
650 pub fn snapshot_for_caller(&self, caller_spawner: Option<u32>) -> Vec<BgTaskSnapshot> {
651 self.snapshot()
652 .into_iter()
653 .filter(|s| s.spawner == caller_spawner)
654 .collect()
655 }
656
657 /// Clone the status receiver for a task so callers can await
658 /// [`watch::Receiver::changed`] without holding the lock.
659 ///
660 /// Returns `None` if the task has already been drained from the registry.
661 /// Primarily used by tests to observe live iteration-counter updates
662 /// without polling `snapshot()` in a tight loop.
663 pub fn subscribe(&self, task_id: u32) -> Option<watch::Receiver<AgentStatus>> {
664 let guard = self.pending.lock();
665 guard.get(&task_id).map(|e| e.status_rx.clone())
666 }
667
668 // ── Layer 2 of #996 ───────────────────────────────────────────────
669 //
670 // Scoped cancel + cleanup-on-exit + WaitTask machinery.
671 // The unscoped [`Self::cancel`] above stays for the TUI (humans get
672 // the global view); LLM tools route through [`Self::cancel_as_caller`]
673 // so a sub-agent can't reach across into a sibling's task.
674
675 /// Scoped per-task cancel for the `CancelTask` LLM tool.
676 ///
677 /// **Model E permission rule** — `caller_spawner` must equal the
678 /// task's `spawner`, with `None == None` (top-level can cancel
679 /// top-level tasks; sub-agent invocation N can cancel only its
680 /// own tasks). Returns [`CancelOutcome::Forbidden`] otherwise.
681 ///
682 /// The unscoped [`Self::cancel`] is the TUI's contract — humans
683 /// at the keyboard implicitly have full authority. This method is
684 /// the LLM's contract.
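    ///
    /// For a task registered by sub-agent invocation 7, for example:
    ///
    /// ```ignore
    /// assert_eq!(reg.cancel_as_caller(id, None),    CancelOutcome::Forbidden); // top-level
    /// assert_eq!(reg.cancel_as_caller(id, Some(9)), CancelOutcome::Forbidden); // sibling
    /// assert_eq!(reg.cancel_as_caller(id, Some(7)), CancelOutcome::Cancelled); // owner
    /// ```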
685 pub fn cancel_as_caller(&self, task_id: u32, caller_spawner: Option<u32>) -> CancelOutcome {
686 let guard = self.pending.lock();
687 match guard.get(&task_id) {
688 None => CancelOutcome::NotFound,
689 Some(entry) if entry.spawner != caller_spawner => CancelOutcome::Forbidden,
690 Some(entry) => {
691 entry.cancel.cancel();
692 CancelOutcome::Cancelled
693 }
694 }
695 }
696
697 /// Fire the cancel token on every task whose `spawner` matches.
698 ///
699 /// Called from the sub-agent dispatch path when an invocation
700 /// exits (Model E cleanup-on-exit). Returns the number of tasks
701 /// signalled, purely for tracing — the actual reaping happens
702 /// later via [`Self::drain_completed`] once the futures observe
703 /// their cancel tokens and finish.
704 ///
705 /// Idempotent: re-calling on the same `spawner` after all tasks
706 /// have been reaped returns `0`.
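    ///
    /// Dispatch-path usage, sketched (`inv_id` is a placeholder for the
    /// exiting invocation's id):
    ///
    /// ```ignore
    /// // When sub-agent invocation `inv_id` returns:
    /// let orphans = registry.cancel_for_spawner(inv_id);
    /// tracing::debug!(orphans, "signalled leftover bg tasks");
    /// ```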
707 pub fn cancel_for_spawner(&self, spawner: u32) -> usize {
708 let guard = self.pending.lock();
709 let mut count = 0;
710 for entry in guard.values() {
711 if entry.spawner == Some(spawner) {
712 entry.cancel.cancel();
713 count += 1;
714 }
715 }
716 count
717 }
718}
719
720/// Outcome of [`BgAgentRegistry::cancel_as_caller`].
721///
722/// Mirrors HTTP-ish status codes so the LLM-tool layer can produce
723/// useful error messages without inspecting registry internals.
724#[derive(Debug, Clone, Copy, PartialEq, Eq)]
725pub enum CancelOutcome {
726 /// Task existed, caller owned it, cancel token fired.
727 Cancelled,
728 /// No task with that id (already drained, never registered, or
729 /// completed and reaped).
730 NotFound,
731 /// Task exists but caller's `spawner` doesn't match. The LLM
732 /// surface translates this into a permission-style error.
733 Forbidden,
734}
735
736/// Outcome of [`BgAgentRegistry::wait_for_completion`].
737///
/// Encodes the five resolutions of a `WaitTask` call. The LLM-tool
739/// layer translates each into a serialised payload the model receives.
740#[derive(Debug)]
741pub enum WaitOutcome {
742 /// Task reached a terminal `Completed`/`Errored` status before
743 /// the timeout fired. Carries the drained [`BgAgentResult`] so
744 /// the same payload `drain_completed()` would have produced is
745 /// surfaced directly to the caller. (Drain semantics: a task
746 /// consumed via `wait_for_completion` is removed from `pending`
747 /// so the next `drain_completed()` won't double-inject it.)
748 Completed(BgAgentResult),
749 /// Task was cancelled (parent token fired, peer cancelled, or
750 /// the spawned future panicked). The task has been reaped.
751 Cancelled,
752 /// Timeout fired before the task reached a terminal state. The
753 /// task is still in `pending` and may still complete on its own.
754 /// Carries the most-recent status snapshot so the model can
755 /// decide whether to wait again or move on.
756 TimedOut(BgTaskSnapshot),
    /// No task with that id (already drained, never registered, or
    /// completed and reaped). Counterpart of [`CancelOutcome::NotFound`].
759 NotFound,
760 /// Caller doesn't own this task.
761 Forbidden,
762}
763
764impl BgAgentRegistry {
    /// Wait until a single task reaches a terminal state, or until
766 /// `timeout` elapses. The tool layer is the sole caller; humans
767 /// use `/cancel` (synchronous) and the auto-drain path.
768 ///
769 /// **Drain semantics**: on `Completed`/`Cancelled`, the task is
770 /// removed from `pending` here so `drain_completed()` won't
771 /// surface it again on the next inference iteration. (This is
772 /// the resolution to the result-routing race we settled in
773 /// design Decision 3 — `WaitTask` consumes; auto-drain becomes a
774 /// no-op for that id.)
775 ///
776 /// **Scoping**: same Model E rule as [`Self::cancel_as_caller`].
777 /// `caller_spawner` must equal the task's `spawner` exactly.
778 ///
779 /// **Timeout**: bounded by the caller. The tool layer caps this
780 /// at 300 s before reaching here; we trust the bound but will
781 /// happily wait whatever value is passed (handy for tests with
782 /// `Duration::from_millis(50)`).
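    ///
    /// Tool-layer usage, sketched (the `report_*` helpers and `caller`
    /// are placeholders):
    ///
    /// ```ignore
    /// match registry.wait_for_completion(task_id, caller, Duration::from_secs(30)).await {
    ///     WaitOutcome::Completed(result) => report_result(result), // consumed; drain skips it
    ///     WaitOutcome::TimedOut(snapshot) => report_still_running(snapshot),
    ///     WaitOutcome::Cancelled => report_cancelled(),
    ///     WaitOutcome::NotFound | WaitOutcome::Forbidden => report_unknown_task(),
    /// }
    /// ```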
783 pub async fn wait_for_completion(
784 &self,
785 task_id: u32,
786 caller_spawner: Option<u32>,
787 timeout: Duration,
788 ) -> WaitOutcome {
789 // Phase 1 — ownership check + grab a status receiver to await on.
790 // We do NOT remove the entry yet: if we time out, it must remain
791 // visible to the next `drain_completed()` and to other callers.
792 let status_rx = {
793 let guard = self.pending.lock();
794 match guard.get(&task_id) {
795 None => return WaitOutcome::NotFound,
796 Some(entry) if entry.spawner != caller_spawner => {
797 return WaitOutcome::Forbidden;
798 }
799 Some(entry) => entry.status_rx.clone(),
800 }
801 };
802
803 // Phase 2 — wait for terminal status or timeout.
804 // We watch the status channel rather than the oneshot so we
805 // can distinguish Cancelled (terminal, no payload) from
806 // Completed (terminal, payload pending on the oneshot). The
807 // spawned future writes status BEFORE sending the oneshot,
808 // so by the time we observe a terminal status the oneshot is
809 // either ready or about to be (sub-microsecond skew).
810 let wait_fut = wait_for_terminal_status(status_rx);
811 let result = tokio::time::timeout(timeout, wait_fut).await;
812
813 match result {
814 Err(_elapsed) => {
815 // Timeout: the task is still pending. Re-snapshot so
816 // the caller sees the latest status (it may have
817 // transitioned `Pending` → `Running` while we waited).
818 let snap = self.snapshot().into_iter().find(|s| s.task_id == task_id);
819 match snap {
820 Some(s) => WaitOutcome::TimedOut(s),
821 // Task vanished mid-wait — drain reaped it. The
                    // result was already injected into the conversation;
823 // tell the caller it's gone.
824 None => WaitOutcome::NotFound,
825 }
826 }
827 Ok(()) => {
828 // Terminal status observed. Pull the entry out so the
829 // auto-drain path won't see it again, then await the
830 // oneshot for the payload.
831 //
832 // PR #1043 review fix: previously this used
833 // `try_recv` + a back-to-back retry. The retry was a
834 // placebo — both calls run in the same scheduler tick
835 // with no `await` between them, so when the bg future
836 // sets `status_tx` *before* `tx.send` (the standard
837 // ordering in `sub_agent_dispatch::run_bg_agent`),
838 // a multi-thread runtime can wake the waiter on a
839 // *different* worker, observe `Empty` twice, and
840 // falsely report `Cancelled` for a successful task.
841 //
842 // `oneshot::Receiver` IS a `Future` — just `.await`
843 // it. The bound is set by `wait_for_terminal_status`
844 // having already observed the terminal status, so the
845 // sender is either landing or already dropped (the
846 // future writes status before sending the result, and
847 // panics drop both). A short inner timeout caps the
                // "in-flight" window; the outer timeout future has already
                // resolved by this point, so it no longer covers this await.
850 let entry = {
851 let mut guard = self.pending.lock();
852 let Some(entry) = guard.remove(&task_id) else {
853 // Drain raced us and reaped first. Rare but
854 // possible. The model already saw the result
855 // in the prior turn's auto-drain.
856 return WaitOutcome::NotFound;
857 };
858 entry
859 // `guard` drops here — explicit scope guarantees
860 // we don't hold the (non-Send) `parking_lot`
861 // guard across the upcoming `.await`.
862 };
863 let agent_name = entry.agent_name;
864 let prompt = entry.prompt;
865 match tokio::time::timeout(Duration::from_millis(50), entry.rx).await {
866 Ok(Ok(Ok((output, events)))) => WaitOutcome::Completed(BgAgentResult {
867 agent_name,
868 prompt,
869 output,
870 success: true,
871 events,
872 }),
873 Ok(Ok(Err((err, events)))) => WaitOutcome::Completed(BgAgentResult {
874 agent_name,
875 prompt,
876 output: err,
877 success: false,
878 events,
879 }),
880 // Sender dropped (panic) or 50ms elapsed without
881 // a value landing — surface as Cancelled. Both
882 // are degenerate cases: status said terminal,
883 // payload never arrived.
884 Ok(Err(_)) | Err(_) => WaitOutcome::Cancelled,
885 }
886 }
887 }
888 }
889}
890
891/// Wait for a `watch::Receiver<AgentStatus>` to report a terminal
892/// variant (`Completed`, `Errored`, `Cancelled`).
893///
894/// Returns when the current value is already terminal OR after a
/// `changed()` event lands a terminal value. Each check awaits
/// `changed()`, so the future yields and the timeout wrapping it in
/// [`BgAgentRegistry::wait_for_completion`] gets a chance to fire.
898async fn wait_for_terminal_status(mut rx: watch::Receiver<AgentStatus>) {
899 loop {
900 let is_terminal = matches!(
901 *rx.borrow(),
902 AgentStatus::Completed { .. } | AgentStatus::Errored { .. } | AgentStatus::Cancelled
903 );
904 if is_terminal {
905 return;
906 }
907 // `changed()` resolves on the next write to the channel. If
908 // the sender was dropped (task panicked) it returns Err —
909 // treat as terminal so the caller can pull `Cancelled` from
910 // the closed oneshot.
911 if rx.changed().await.is_err() {
912 return;
913 }
914 }
915}
916
917impl Default for BgAgentRegistry {
918 fn default() -> Self {
919 Self::new()
920 }
921}
922
923impl Drop for BgAgentRegistry {
924 /// Abort every still-pending bg task on registry drop.
925 ///
926 /// `AbortOnDropHandle::drop` does the work — this impl exists
927 /// only to make the lifecycle explicit and to give a single
928 /// place to add telemetry later.
929 fn drop(&mut self) {
930 // **#1022 B16**: simplified post-parking_lot. The pre-fix
931 // version had to handle `PoisonError` (via
932 // `match get_mut() { Ok | Err(into_inner()) }`) because a
933 // panic-while-held would poison `std::sync::Mutex`.
934 // `parking_lot::Mutex` doesn't poison, so the cleanup path
935 // is now the obvious one: take the map, log if non-empty,
936 // let `AbortOnDropHandle::drop` do the actual abort work.
937 let map = std::mem::take(&mut *self.pending.lock());
938 if !map.is_empty() {
939 tracing::debug!(
940 count = map.len(),
941 "BgAgentRegistry dropped with pending tasks; aborting"
942 );
943 }
944 // Map drops here → each entry's `AbortOnDropHandle` aborts
945 // its task. No orphans. No leaked worktrees.
946 }
947}
948
/// Create a new registry wrapped in `Arc` for sharing between the
/// inference loop and tool dispatch.
950pub fn new_shared() -> Arc<BgAgentRegistry> {
951 Arc::new(BgAgentRegistry::new())
952}
953
954#[cfg(test)]
955mod tests {
956 use super::*;
957 use std::sync::atomic::{AtomicBool, Ordering};
958 use std::time::Duration;
959
960 #[tokio::test]
961 async fn register_and_complete() {
962 let reg = BgAgentRegistry::new();
963 let (task_id, tx) = reg.register_test("explore", "find all tests");
964 assert_eq!(task_id, 1);
965 assert_eq!(reg.pending_count(), 1);
966
967 // Not yet complete
968 assert!(reg.drain_completed().is_empty());
969
970 // Complete it
971 tx.send(Ok(("found 42 tests".to_string(), Vec::new())))
972 .unwrap();
973 let results = reg.drain_completed();
974 assert_eq!(results.len(), 1);
975 assert_eq!(results[0].agent_name, "explore");
976 assert_eq!(results[0].output, "found 42 tests");
977 assert!(results[0].success);
978 assert_eq!(reg.pending_count(), 0);
979 }
980
981 #[tokio::test]
982 async fn drain_only_completed() {
983 let reg = BgAgentRegistry::new();
984 let (_id1, tx1) = reg.register_test("task", "build");
985 let (_id2, _tx2) = reg.register_test("explore", "search");
986
987 tx1.send(Ok(("done".to_string(), Vec::new()))).unwrap();
988
989 let results = reg.drain_completed();
990 assert_eq!(results.len(), 1);
991 assert_eq!(results[0].agent_name, "task");
992 assert_eq!(reg.pending_count(), 1); // explore still pending
993 }
994
995 #[tokio::test]
996 async fn dropped_sender_reports_cancelled() {
997 let reg = BgAgentRegistry::new();
998 let (_id, tx) = reg.register_test("task", "build");
999 drop(tx); // simulate task panic/cancel
1000
1001 let results = reg.drain_completed();
1002 assert_eq!(results.len(), 1);
1003 assert!(!results[0].success);
1004 assert!(results[0].output.contains("cancelled"));
1005 }
1006
1007 #[tokio::test]
1008 async fn error_result() {
1009 let reg = BgAgentRegistry::new();
1010 let (_id, tx) = reg.register_test("verify", "check");
1011 tx.send(Err(("test failures".to_string(), Vec::new())))
1012 .unwrap();
1013
1014 let results = reg.drain_completed();
1015 assert_eq!(results.len(), 1);
1016 assert!(!results[0].success);
1017 assert_eq!(results[0].output, "test failures");
1018 }
1019
1020 /// #1022 B9 regression: the narrative trace captured by
1021 /// `BufferingSink` inside the bg agent must propagate through
1022 /// the oneshot → registry → `BgAgentResult.events`. Pre-fix
1023 /// this field didn't exist; bg agents ran with `NullSink` and
1024 /// the user only saw spawn + completion lines. The fix is
1025 /// useless if the trace gets dropped at any of the three hops,
1026 /// so this test pins the round-trip end-to-end.
1027 #[tokio::test]
1028 async fn events_propagate_through_drain_for_success() {
1029 let reg = BgAgentRegistry::new();
1030 let (_id, tx) = reg.register_test("explore", "map repo");
1031 let trace = vec![
1032 " \u{1f527} Read".to_string(),
1033 " \u{1f527} Grep".to_string(),
1034 " \u{26a1} cache hit".to_string(),
1035 ];
1036 tx.send(Ok(("map result".to_string(), trace.clone())))
1037 .unwrap();
1038
1039 let results = reg.drain_completed();
1040 assert_eq!(results.len(), 1);
1041 assert!(results[0].success);
1042 assert_eq!(
1043 results[0].events, trace,
1044 "trace lost between sender and BgAgentResult"
1045 );
1046 }
1047
1048 /// #1022 B9 regression: trace must propagate even when the bg
1049 /// agent failed. The trace is *most* useful in the failure case
1050 /// — "the agent tried Read, Bash, Edit, then errored" is the
1051 /// kind of breadcrumb that turns a black-box failure into a
1052 /// debuggable one.
1053 #[tokio::test]
1054 async fn events_propagate_through_drain_for_failure() {
1055 let reg = BgAgentRegistry::new();
1056 let (_id, tx) = reg.register_test("build", "compile");
1057 let trace = vec![
1058 " \u{1f527} Bash".to_string(),
1059 " \u{2398} approval auto-rejected for Delete (no user channel)".to_string(),
1060 ];
1061 tx.send(Err(("compile failed".to_string(), trace.clone())))
1062 .unwrap();
1063
1064 let results = reg.drain_completed();
1065 assert_eq!(results.len(), 1);
1066 assert!(!results[0].success);
1067 assert_eq!(results[0].events, trace);
1068 }
1069
1070 /// #1022 B9 corollary: cancelled / panicked tasks have *no*
1071 /// trace available (the buffering sink died with the task), and
1072 /// that's an explicitly-empty Vec rather than uninitialized.
1073 #[tokio::test]
1074 async fn cancelled_task_has_empty_event_trace() {
1075 let reg = BgAgentRegistry::new();
1076 let (_id, tx) = reg.register_test("flaky", "x");
1077 drop(tx); // simulate panic / abort
1078 let results = reg.drain_completed();
1079 assert_eq!(results.len(), 1);
1080 assert!(!results[0].success);
1081 assert!(
1082 results[0].events.is_empty(),
1083 "cancel path must yield empty trace"
1084 );
1085 }
1086
1087 /// Phase 1 of #1022, B3 regression test: dropping the registry
1088 /// must abort still-running spawned tasks. Without
1089 /// `AbortOnDropHandle` (or an explicit `JoinHandle::abort` in
1090 /// `Drop`), the spawned future would keep running after the
1091 /// registry — and any worktrees / API tokens / writes it owns —
1092 /// were dropped. That's the leak we're fixing.
1093 #[tokio::test]
1094 async fn registry_drop_aborts_pending_tasks() {
1095 let reg = BgAgentRegistry::new();
1096 let parent = CancellationToken::new();
1097 let reservation = reg.reserve(&parent, None);
1098 let task_id = reservation.task_id;
1099 let cancel_for_task = reservation.cancel.clone();
1100 let tx = reservation.tx;
1101 let rx = reservation.rx;
1102 let cancel_for_entry = reservation.cancel;
1103 let status_rx = reservation.status_rx;
1104
1105 // Use a flag the task sets only if it ever finishes a full
1106 // sleep. If abort works, the flag stays false even though
1107 // we wait long enough for a non-aborted task to finish.
1108 let ran_to_completion = Arc::new(AtomicBool::new(false));
1109 let flag = ran_to_completion.clone();
1110 let handle = tokio::spawn(async move {
1111 // Either the cancel token fires (parent cascade) or we
1112 // get aborted (drop cascade). The slow sleep just gives
1113 // the test time to drop the registry before we'd
1114 // naturally finish.
1115 tokio::select! {
1116 _ = cancel_for_task.cancelled() => {}
1117 _ = tokio::time::sleep(Duration::from_secs(60)) => {
1118 flag.store(true, Ordering::SeqCst);
1119 }
1120 }
1121 let _ = tx.send(Ok(("done".to_string(), Vec::new())));
1122 });
1123 reg.attach(
1124 task_id,
1125 "explore",
1126 "long task",
1127 rx,
1128 cancel_for_entry,
1129 status_rx,
1130 None,
1131 handle,
1132 );
1133
1134 // Give the task a tick to start.
1135 tokio::time::sleep(Duration::from_millis(20)).await;
1136 assert_eq!(reg.pending_count(), 1);
1137
1138 // Drop the registry — this must abort the spawned task.
1139 drop(reg);
1140
1141 // Yield long enough for the abort to land; well under the
1142 // 60 s sleep the task would have completed otherwise.
1143 tokio::time::sleep(Duration::from_millis(100)).await;
1144 assert!(
1145 !ran_to_completion.load(Ordering::SeqCst),
1146 "task slept to completion — AbortOnDropHandle did not abort it"
1147 );
1148 }
1149
1150 /// Phase 1 of #1022, B2 regression test: cancelling the parent
1151 /// token must cascade to bg-agent child tokens handed out by
1152 /// `reserve`.
1153 #[tokio::test]
1154 async fn parent_cancel_cascades_to_reserved_child() {
1155 let reg = BgAgentRegistry::new();
1156 let parent = CancellationToken::new();
1157 let r1 = reg.reserve(&parent, None);
1158 let r2 = reg.reserve(&parent, None);
1159
1160 assert!(!r1.cancel.is_cancelled());
1161 assert!(!r2.cancel.is_cancelled());
1162
1163 parent.cancel();
1164
1165 assert!(
1166 r1.cancel.is_cancelled(),
1167 "child 1 token should observe parent cancel"
1168 );
1169 assert!(
1170 r2.cancel.is_cancelled(),
1171 "child 2 token should observe parent cancel"
1172 );
1173 }
1174
1175 // ── Layer 0 of #996 ──────────────────────────────────────────────────────
1176 //
1177 // Status channel + per-task cancel + snapshot.
1178
1179 /// `cancel(task_id)` must fire that task's cancel token.
1180 /// This is the hook the future `/cancel <id>` slash command and
    /// `CancelTask` LLM tool will call. Verifies a known id returns
1182 /// true *and* the underlying token actually fires.
1183 #[tokio::test]
1184 async fn cancel_known_task_fires_token() {
1185 let reg = BgAgentRegistry::new();
1186 let (task_id, _tx, _status_tx, observer) =
1187 reg.register_test_with_status("explore", "map repo", None);
1188
1189 assert!(!observer.is_cancelled(), "precondition");
1190 let fired = reg.cancel(task_id);
1191 assert!(fired, "cancel(known_id) should report success");
1192 assert!(
1193 observer.is_cancelled(),
1194 "the task's cancel token should observe the cancellation"
1195 );
1196 }
1197
1198 /// `cancel` on an unknown / already-drained id must return false
1199 /// instead of panicking. The slash command and LLM tool will
1200 /// surface this to the user as "no such task".
1201 #[tokio::test]
1202 async fn cancel_unknown_task_returns_false() {
1203 let reg = BgAgentRegistry::new();
1204 assert!(
1205 !reg.cancel(999),
1206 "cancel of an unknown id should be a no-op returning false"
1207 );
1208 }
1209
1210 /// `cancel` is idempotent — calling twice on the same id is safe
1211 /// (the underlying [`CancellationToken::cancel`] is itself
1212 /// idempotent). Both calls return true while the entry is still
1213 /// in `pending`; a third call after drain returns false.
1214 #[tokio::test]
1215 async fn cancel_is_idempotent_while_pending() {
1216 let reg = BgAgentRegistry::new();
1217 let (task_id, _tx, _status_tx, _observer) =
1218 reg.register_test_with_status("explore", "x", None);
1219
1220 assert!(reg.cancel(task_id));
1221 assert!(
1222 reg.cancel(task_id),
1223 "second cancel should still find the entry and report success"
1224 );
1225 }
1226
1227 /// `snapshot()` must return one entry per pending task with
1228 /// stable ordering by `task_id`. Status defaults to `Pending`
1229 /// because no spawned future has flipped it yet.
1230 #[tokio::test]
1231 async fn snapshot_lists_pending_tasks_in_id_order() {
1232 let reg = BgAgentRegistry::new();
1233 let (id_a, _tx_a) = reg.register_test("explore", "map");
1234 let (id_b, _tx_b) = reg.register_test("verify", "check");
1235
1236 let snap = reg.snapshot();
1237 assert_eq!(snap.len(), 2);
1238 // Ordering is by ascending task_id, regardless of HashMap
1239 // iteration order — this is the contract `/agents` relies on.
1240 assert_eq!(snap[0].task_id, id_a);
1241 assert_eq!(snap[0].agent_name, "explore");
1242 assert_eq!(snap[0].prompt, "map");
1243 assert_eq!(snap[0].status, AgentStatus::Pending);
1244 assert_eq!(snap[1].task_id, id_b);
1245 assert_eq!(snap[1].agent_name, "verify");
1246 assert_eq!(snap[1].status, AgentStatus::Pending);
1247 }
1248
1249 /// `snapshot()` reads the live status channel — a `status_tx.send`
1250 /// must be observable on the very next snapshot, with no polling
1251 /// or yielding required (`watch::Receiver::borrow` is sync).
1252 /// This is the contract that lets the status-bar pill (Layer 3)
1253 /// and live `/agents -v` (Layer 1) reflect transitions immediately.
1254 #[tokio::test]
1255 async fn snapshot_reflects_status_writes() {
1256 let reg = BgAgentRegistry::new();
1257 let (task_id, _tx, status_tx, _cancel) =
1258 reg.register_test_with_status("explore", "map", None);
1259
1260 // Default is Pending.
1261 assert_eq!(reg.snapshot()[0].status, AgentStatus::Pending);
1262
1263 // Flip to Running and observe.
1264 status_tx.send(AgentStatus::Running { iter: 3 }).unwrap();
1265 let snap = reg.snapshot();
1266 assert_eq!(snap.len(), 1);
1267 assert_eq!(snap[0].task_id, task_id);
1268 assert_eq!(snap[0].status, AgentStatus::Running { iter: 3 });
1269
1270 // Flip to Completed and observe.
1271 status_tx
1272 .send(AgentStatus::Completed {
1273 summary: "42 files".to_string(),
1274 })
1275 .unwrap();
1276 assert_eq!(
1277 reg.snapshot()[0].status,
1278 AgentStatus::Completed {
1279 summary: "42 files".to_string()
1280 }
1281 );
1282 }
1283
1284 /// `snapshot()` reports a sane `age` that grows monotonically.
1285 /// We don't assert exact values (CI clocks are jittery) — just
1286 /// that two successive snapshots show a non-decreasing age and
1287 /// that the value is non-negative (saturating subtraction
    /// guards against platform monotonic-clock quirks in `Instant`).
1289 #[tokio::test]
1290 async fn snapshot_age_is_monotonic() {
1291 let reg = BgAgentRegistry::new();
1292 let (_id, _tx) = reg.register_test("explore", "x");
1293
1294 let age1 = reg.snapshot()[0].age;
1295 tokio::time::sleep(Duration::from_millis(15)).await;
1296 let age2 = reg.snapshot()[0].age;
1297 assert!(
1298 age2 >= age1,
1299 "age should be monotonic non-decreasing across snapshots"
1300 );
1301 }
1302
1303 /// `snapshot()` on an empty registry returns an empty Vec, not a
1304 /// panic and not None. `/agents` will use this to render "No
1305 /// background agents."
1306 #[tokio::test]
1307 async fn snapshot_empty_registry_is_empty_vec() {
1308 let reg = BgAgentRegistry::new();
1309 assert!(reg.snapshot().is_empty());
1310 }
1311
1312 /// Once a task is drained (completed and removed from `pending`),
1313 /// it disappears from `snapshot()` immediately. This pins the
1314 /// contract that `/agents` reflects the *currently-pending* set,
1315 /// not historical tasks. The Layer 1 "recently-completed lingers
1316 /// 30 s" UX is implemented at the *display* layer, not here.
1317 #[tokio::test]
1318 async fn snapshot_drops_drained_tasks() {
1319 let reg = BgAgentRegistry::new();
1320 let (_id, tx) = reg.register_test("explore", "x");
1321 assert_eq!(reg.snapshot().len(), 1);
1322
1323 tx.send(Ok(("done".to_string(), Vec::new()))).unwrap();
1324 let _ = reg.drain_completed();
1325
1326 assert!(
1327 reg.snapshot().is_empty(),
1328 "drained tasks must not appear in snapshots"
1329 );
1330 }
1331
1332 // ── Layer 2 of #996: scoped APIs + WaitOutcome ────────────────────────
1333
1334 /// `snapshot_for_caller(None)` returns only top-level tasks;
1335 /// `snapshot_for_caller(Some(N))` returns only N's tasks. Cross-spawner
1336 /// visibility is exactly zero — the Model E isolation guarantee.
1337 #[tokio::test]
1338 async fn snapshot_for_caller_filters_by_spawner() {
1339 let reg = BgAgentRegistry::new();
1340 let (top_id, _tx, _, _) = reg.register_test_with_status("a", "top", None);
1341 let (sub_a_id, _tx, _, _) = reg.register_test_with_status("b", "sub-a", Some(7));
1342 let (_sub_b_id, _tx, _, _) = reg.register_test_with_status("c", "sub-b", Some(9));
1343
1344 let top = reg.snapshot_for_caller(None);
1345 assert_eq!(top.len(), 1);
1346 assert_eq!(top[0].task_id, top_id);
1347
1348 let sub_a = reg.snapshot_for_caller(Some(7));
1349 assert_eq!(sub_a.len(), 1);
1350 assert_eq!(sub_a[0].task_id, sub_a_id);
1351
        // An unrelated spawner (42) sees nothing — not 7's, not 9's, not top-level's.
1353 assert!(reg.snapshot_for_caller(Some(42)).is_empty());
1354 }
1355
1356 /// `cancel_as_caller` enforces the Model E permission rule.
1357 #[tokio::test]
1358 async fn cancel_as_caller_returns_forbidden_for_other_spawner() {
1359 let reg = BgAgentRegistry::new();
1360 let (id, _tx, _, observer) = reg.register_test_with_status("x", "y", Some(7));
1361
1362 // Wrong caller — not the top-level (None != Some(7)) and not a peer.
1363 assert_eq!(
1364 reg.cancel_as_caller(id, None),
1365 CancelOutcome::Forbidden,
1366 "top-level must not be able to cancel sub-agent's task"
1367 );
1368 assert_eq!(
1369 reg.cancel_as_caller(id, Some(99)),
1370 CancelOutcome::Forbidden,
1371 "sibling sub-agent must not be able to cancel"
1372 );
1373 assert!(
1374 !observer.is_cancelled(),
1375 "forbidden calls must NOT fire the cancel token"
1376 );
1377
1378 // Correct caller — the original spawner.
1379 assert_eq!(reg.cancel_as_caller(id, Some(7)), CancelOutcome::Cancelled);
1380 assert!(observer.is_cancelled());
1381 }
1382
1383 #[tokio::test]
1384 async fn cancel_as_caller_returns_not_found_for_unknown_id() {
1385 let reg = BgAgentRegistry::new();
1386 assert_eq!(reg.cancel_as_caller(999, None), CancelOutcome::NotFound);
1387 }
1388
1389 /// `cancel_for_spawner` cleans up exactly one sub-agent's children
1390 /// and leaves siblings + top-level alone. The cleanup-on-exit hook.
1391 #[tokio::test]
1392 async fn cancel_for_spawner_kills_only_matching_children() {
1393 let reg = BgAgentRegistry::new();
1394 let (_top, _, _, top_obs) = reg.register_test_with_status("top", "t", None);
1395 let (_a1, _, _, a1_obs) = reg.register_test_with_status("a1", "x", Some(7));
1396 let (_a2, _, _, a2_obs) = reg.register_test_with_status("a2", "y", Some(7));
1397 let (_b, _, _, b_obs) = reg.register_test_with_status("b", "z", Some(9));
1398
1399 let count = reg.cancel_for_spawner(7);
1400 assert_eq!(count, 2, "both of spawner 7's children must be signalled");
1401
1402 assert!(a1_obs.is_cancelled());
1403 assert!(a2_obs.is_cancelled());
1404 assert!(!top_obs.is_cancelled(), "top-level must be untouched");
1405 assert!(!b_obs.is_cancelled(), "sibling spawner's task untouched");
1406
        // Idempotent — calling again while the entries are still alive
1408 // re-fires the (already-cancelled, no-op) tokens.
1409 assert_eq!(reg.cancel_for_spawner(7), 2);
1410 // Calling for an unknown spawner returns 0.
1411 assert_eq!(reg.cancel_for_spawner(99), 0);
1412 }
1413
1414 /// `wait_for_completion` returns `Completed` and consumes the entry
1415 /// (so a subsequent `drain_completed` can't double-inject).
1416 #[tokio::test]
1417 async fn wait_for_completion_consumes_completed_task() {
1418 let reg = BgAgentRegistry::new();
1419 let (id, tx, status_tx, _) = reg.register_test_with_status("explore", "map", Some(3));
1420
1421 // Fire the result, then transition status to terminal so the
1422 // wait future wakes up.
1423 tx.send(Ok(("final answer".to_string(), vec!["step 1".to_string()])))
1424 .unwrap();
1425 status_tx
1426 .send(AgentStatus::Completed {
1427 summary: "final answer".to_string(),
1428 })
1429 .unwrap();
1430
1431 let outcome = reg
1432 .wait_for_completion(id, Some(3), Duration::from_secs(1))
1433 .await;
1434 match outcome {
1435 WaitOutcome::Completed(result) => {
1436 assert!(result.success);
1437 assert_eq!(result.output, "final answer");
1438 assert_eq!(result.events, vec!["step 1".to_string()]);
1439 }
1440 other => panic!("expected Completed, got {other:?}"),
1441 }
1442
1443 // Entry must be gone — drain sees nothing.
1444 assert_eq!(reg.drain_completed().len(), 0);
1445 assert!(reg.snapshot().is_empty());
1446 }
1447
    /// `wait_for_completion` returns `TimedOut` with a fresh snapshot
    /// when the task hasn't finished yet — and crucially leaves the
    /// entry in the registry so a later drain still works.
    #[tokio::test]
    async fn wait_for_completion_timeout_preserves_entry() {
        let reg = BgAgentRegistry::new();
        let (id, _tx, status_tx, _) = reg.register_test_with_status("slow", "x", None);

        // Move to Running so the snapshot assertion below can verify
        // that the current status is carried through.
        status_tx.send(AgentStatus::Running { iter: 2 }).unwrap();

        let outcome = reg
            .wait_for_completion(id, None, Duration::from_millis(40))
            .await;
        match outcome {
            WaitOutcome::TimedOut(snap) => {
                assert_eq!(snap.task_id, id);
                assert_eq!(snap.status, AgentStatus::Running { iter: 2 });
            }
            other => panic!("expected TimedOut, got {other:?}"),
        }
        // The entry is still registered after a timeout.
        assert_eq!(reg.snapshot().len(), 1);
    }

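    // Because the entry survives a timeout, the ordinary per-iteration
    // poll still picks the task up once it finishes; nothing extra is
    // required on the caller side (sketch, loop shape assumed):
    //
    //     // next inference iteration, same as for never-awaited tasks:
    //     for result in reg.drain_completed() {
    //         // injected as a user-role message
    //     }
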
    /// `wait_for_completion` enforces the same Model E permission rule
    /// as `cancel_as_caller`.
    #[tokio::test]
    async fn wait_for_completion_returns_forbidden_for_other_spawner() {
        let reg = BgAgentRegistry::new();
        let (id, _tx, _, _) = reg.register_test_with_status("x", "y", Some(5));

        let outcome = reg
            .wait_for_completion(id, None, Duration::from_millis(20))
            .await;
        assert!(
            matches!(outcome, WaitOutcome::Forbidden),
            "top-level must not be able to wait on sub-agent task; got {outcome:?}"
        );

        let outcome = reg
            .wait_for_completion(id, Some(99), Duration::from_millis(20))
            .await;
        assert!(
            matches!(outcome, WaitOutcome::Forbidden),
            "sibling sub-agent must not be able to wait; got {outcome:?}"
        );
    }

    /// Cancellation between the status going terminal and `WaitTask` waking
    /// up surfaces as `Cancelled` (oneshot closed without sending).
    #[tokio::test]
    async fn wait_for_completion_returns_cancelled_when_sender_dropped() {
        let reg = BgAgentRegistry::new();
        let (id, tx, status_tx, _) = reg.register_test_with_status("x", "y", None);

        // Drop the sender (simulates task panic / abort), then push
        // the status to terminal so wait wakes up.
        drop(tx);
        status_tx.send(AgentStatus::Cancelled).unwrap();

        let outcome = reg
            .wait_for_completion(id, None, Duration::from_secs(1))
            .await;
        assert!(matches!(outcome, WaitOutcome::Cancelled), "got {outcome:?}");
        assert!(reg.snapshot().is_empty(), "entry must be reaped");
    }

    #[tokio::test]
    async fn wait_for_completion_returns_not_found_for_unknown_id() {
        let reg = BgAgentRegistry::new();
        let outcome = reg
            .wait_for_completion(999, None, Duration::from_millis(10))
            .await;
        assert!(matches!(outcome, WaitOutcome::NotFound), "got {outcome:?}");
    }

    /// Regression test for the PR #1043 race fix.
    ///
    /// Scenario: the bg future writes terminal `AgentStatus` to the
    /// watch channel, then — after a yield-induced gap — sends the
    /// payload on the oneshot. The waiter is woken on the watch
    /// notify and races to read the oneshot.
    ///
    /// Before the fix, `wait_for_completion` did `try_recv()` twice
    /// back-to-back with no `await` between them; on a multi-thread
    /// runtime the second `try_recv` could observe `Empty` again and
    /// return `Cancelled` for a successful task. The fix awaits the
    /// oneshot future directly with a short inner timeout, which
    /// gives the sender's task a chance to run.
    ///
    /// Multi-thread runtime + explicit `yield_now` between the two
    /// sends reliably triggers the old race.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn wait_for_completion_handles_status_then_yield_then_payload() {
        let reg = Arc::new(BgAgentRegistry::new());
        let (id, tx, status_tx, _observer) =
            reg.register_test_with_status("explore", "map repo", None);

        // Spawn a task that mimics `run_bg_agent`'s send ordering:
        // status first, yield, payload. The yield is what exposed the
        // race — it forces tokio to potentially schedule the waiter
        // between the two sends.
        let send_task = tokio::spawn(async move {
            status_tx
                .send(AgentStatus::Completed {
                    summary: "done".into(),
                })
                .unwrap();
            tokio::task::yield_now().await;
            tokio::task::yield_now().await;
            let _ = tx.send(Ok(("final".into(), vec!["e1".into()])));
        });

        let outcome = reg
            .wait_for_completion(id, None, Duration::from_secs(2))
            .await;
        send_task.await.unwrap();

        match outcome {
            WaitOutcome::Completed(result) => {
                assert_eq!(result.output, "final");
                assert!(result.success);
                assert_eq!(result.events, vec!["e1".to_string()]);
            }
            other => panic!("expected Completed, got {other:?}"),
        }
    }

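    // The fixed wait shape described above, reduced to its core. Helper
    // names (`is_terminal`) and the inner grace period are illustrative;
    // only the "await the oneshot instead of double try_recv" part is
    // what the fix actually changed:
    //
    //     status_rx.wait_for(|s| s.is_terminal()).await?;
    //     match tokio::time::timeout(SHORT_GRACE, result_rx).await {
    //         Ok(Ok(payload)) => /* Completed */,
    //         Ok(Err(_)) | Err(_) => /* sender dropped or never sent: Cancelled */,
    //     }
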
    // ── #1076: BgStatusEmitter → sink fan-out ───────────────────────────────

    /// Helper: build an emitter wired to the given registry, mirroring
    /// the production construction in `sub_agent_dispatch::execute_sub_agent`.
    fn emitter_for(
        reg: &Arc<BgAgentRegistry>,
        task_id: u32,
        spawner: Option<u32>,
    ) -> BgStatusEmitter {
        let (tx, _rx) = watch::channel(AgentStatus::Pending);
        BgStatusEmitter::new(task_id, spawner, tx, reg.clone())
    }

    fn extract(event: &EngineEvent) -> (u32, Option<u32>, &AgentStatus) {
        match event {
            EngineEvent::BgTaskUpdate {
                task_id,
                spawner,
                status,
            } => (*task_id, *spawner, status),
            other => panic!("expected BgTaskUpdate, got {other:?}"),
        }
    }

    #[test]
    fn emitter_send_queues_engine_event_on_registry() {
        let reg = Arc::new(BgAgentRegistry::new());
        let emitter = emitter_for(&reg, 7, Some(42));

        // Initial: queue is empty.
        assert!(
            reg.drain_status_events().is_empty(),
            "fresh registry must have an empty event queue"
        );

        emitter.send(AgentStatus::Running { iter: 0 });
        let drained = reg.drain_status_events();
        assert_eq!(drained.len(), 1, "single send must produce one event");
        let (id, spawner, status) = extract(&drained[0]);
        assert_eq!(id, 7);
        assert_eq!(spawner, Some(42));
        assert!(matches!(status, AgentStatus::Running { iter: 0 }));
    }

    #[test]
    fn emitter_drain_is_fifo_and_clears_queue() {
        let reg = Arc::new(BgAgentRegistry::new());
        let emitter = emitter_for(&reg, 1, None);

        emitter.send(AgentStatus::Running { iter: 0 });
        emitter.send(AgentStatus::Running { iter: 1 });
        emitter.send(AgentStatus::Running { iter: 2 });
        emitter.send(AgentStatus::Completed {
            summary: "done".into(),
        });

        let drained = reg.drain_status_events();
        assert_eq!(drained.len(), 4, "all four sends must surface");

        // FIFO: transition order is preserved across batches. This
        // matters for clients that render "iter N" progress — a
        // reorder would show the count moving backwards.
        let iters: Vec<_> = drained
            .iter()
            .filter_map(|e| match e {
                EngineEvent::BgTaskUpdate {
                    status: AgentStatus::Running { iter },
                    ..
                } => Some(*iter),
                _ => None,
            })
            .collect();
        assert_eq!(iters, vec![0, 1, 2]);

        // Last event is the terminal Completed.
        assert!(matches!(
            extract(&drained[3]).2,
            AgentStatus::Completed { .. }
        ));

        // Drain consumes — second drain is empty.
        assert!(
            reg.drain_status_events().is_empty(),
            "drain must clear the queue"
        );
    }

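    // The consumer this ordering guarantee exists for, sketched with an
    // assumed sink handle (the real forwarding lives in the inference
    // loop, not in this module):
    //
    //     for event in reg.drain_status_events() {
    //         sink.emit(event); // FIFO, so a rendered "iter N" never moves backwards
    //     }
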
    #[test]
    fn emitter_send_also_updates_watch_channel() {
        // The watch fan-out is what `/agents` and `snapshot()` read.
        // Sink fan-out (queue) is for the inference-loop → EngineSink
        // path. Both targets must see every transition or `/agents`
        // and the TUI/ACP/headless clients will disagree on state.
        let reg = Arc::new(BgAgentRegistry::new());
        let (tx, mut rx) = watch::channel(AgentStatus::Pending);
        let emitter = BgStatusEmitter::new(3, None, tx, reg.clone());

        emitter.send(AgentStatus::Running { iter: 5 });

        // Watch channel observed.
        assert!(matches!(
            *rx.borrow_and_update(),
            AgentStatus::Running { iter: 5 }
        ));
        // Queue observed.
        let drained = reg.drain_status_events();
        assert_eq!(drained.len(), 1);
        assert!(matches!(
            extract(&drained[0]).2,
            AgentStatus::Running { iter: 5 }
        ));
    }

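    // What a single `send` is expected to do, per the invariant above.
    // Field and method names here are assumptions about the emitter's
    // internals; the two fan-out targets are the point:
    //
    //     fn send(&self, status: AgentStatus) {
    //         let _ = self.watch_tx.send(status.clone()); // `/agents`, snapshot()
    //         self.registry.queue_status_event(EngineEvent::BgTaskUpdate {
    //             task_id: self.task_id,
    //             spawner: self.spawner,
    //             status, // inference-loop → EngineSink path
    //         });
    //     }
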
    #[test]
    fn emitter_clones_share_queue_and_watch() {
        // Layer 4 holds one clone for live `iter` heartbeats;
        // `run_bg_agent` keeps another for entry / terminal sends.
        // Both clones must funnel into the same registry queue and
        // the same per-task watch channel — otherwise terminal
        // states could land on a different queue than the heartbeats
        // and clients would see Running forever.
        let reg = Arc::new(BgAgentRegistry::new());
        let (tx, _rx) = watch::channel(AgentStatus::Pending);
        let a = BgStatusEmitter::new(11, Some(2), tx, reg.clone());
        let b = a.clone();

        a.send(AgentStatus::Running { iter: 1 });
        b.send(AgentStatus::Completed {
            summary: "ok".into(),
        });

        let drained = reg.drain_status_events();
        assert_eq!(drained.len(), 2, "clones must share the registry queue");
        // Watch channel reflects the LATEST send, regardless of which
        // clone made it (watch is overwriting by design).
        assert!(matches!(a.current(), AgentStatus::Completed { .. }));
    }

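    // The heartbeat usage that motivates `Clone`, sketched; the loop
    // shape and iteration bound are illustrative, not the Layer 4 code:
    //
    //     let heartbeat = emitter.clone();
    //     for iter in 1..=max_iters {
    //         heartbeat.send(AgentStatus::Running { iter });
    //         // ... one inference iteration ...
    //     }
    //     emitter.send(AgentStatus::Completed { summary });
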
    #[test]
    fn agent_status_round_trips_through_serde() {
        // `EngineEvent::BgTaskUpdate` is the wire format for ACP /
        // headless / future transports. All `AgentStatus` variants
        // must survive a serde round-trip or the boundary leak fix
        // creates a new boundary leak (engine emits, transport drops
        // it on the floor).
        for status in [
            AgentStatus::Pending,
            AgentStatus::Running { iter: 0 },
            AgentStatus::Running { iter: 17 },
            AgentStatus::Cancelled,
            AgentStatus::Completed {
                summary: "hello".into(),
            },
            AgentStatus::Errored {
                error: "boom".into(),
            },
        ] {
            let event = EngineEvent::BgTaskUpdate {
                task_id: 1,
                spawner: Some(2),
                status: status.clone(),
            };
            let json = serde_json::to_string(&event).expect("serialize");
            let back: EngineEvent = serde_json::from_str(&json).expect("deserialize");
            match back {
                EngineEvent::BgTaskUpdate {
                    task_id,
                    spawner,
                    status: round_tripped,
                } => {
                    assert_eq!(task_id, 1);
                    assert_eq!(spawner, Some(2));
                    assert_eq!(round_tripped, status, "json round-trip lost data: {json}");
                }
                other => panic!("round-trip changed variant: {other:?}"),
            }
        }
    }

    #[test]
    fn drain_status_events_is_empty_on_fresh_registry() {
        // The inference loop calls `drain_status_events` every
        // iteration; the no-bg-task case must stay cheap and yield
        // an empty `Vec`, with no accidental allocation from the
        // queue type (e.g. eagerly constructing `Some(VecDeque::new())`).
        let reg = BgAgentRegistry::new();
        let drained = reg.drain_status_events();
        assert!(drained.is_empty());
    }
}