Skip to main content

koda_core/tools/
bg_task_tools.rs

1//! Background task management tools (Layer 2 of #996).
2//!
3//! Three LLM tools that let the model see, cancel, and wait for any
4//! background work it has spawned — both background sub-agents
5//! (`InvokeAgent { background: true }`) and background shell processes
6//! (`Bash { background: true }`):
7//!
8//! | Tool | Purpose |
9//! |------|---------|
//! | `ListBackgroundTasks` | Snapshot every task the caller owns, in any status (no args). |
11//! | `CancelTask`          | Send cancel/SIGTERM by `task_id`. |
12//! | `WaitTask`            | Block until a task finishes, with timeout. |
13//!
14//! ## ID format
15//!
16//! Task IDs are prefixed strings so the model can tell agent tasks and
17//! shell processes apart at a glance:
18//!
19//! - `agent:N`   — bg-agent task (the `task_id` from
20//!   [`crate::bg_agent::BgAgentRegistry::reserve`]).
21//! - `process:N` — bg shell process (the OS PID from
22//!   [`crate::tools::bg_process::BgRegistry::insert`]).
23//!
24//! Bare numeric IDs (`5`) are accepted by the TUI's `/cancel` and
25//! resolve to `agent:5` for back-compat with #1042; the LLM tools
26//! always require the prefix.
27//!
28//! ## Scope (Model E, see #996 discussion)
29//!
30//! Each tool is filtered to the caller's own tasks: top-level sees
31//! only top-level-spawned, sub-agent N sees only its own. Cross-spawner
32//! cancel/wait returns a `Forbidden` error. This is enforced at the
33//! [`BgAgentRegistry`] / [`BgRegistry`] layer; the tool layer just
34//! produces a useful message when it sees `CancelOutcome::Forbidden`
35//! / `WaitOutcome::Forbidden`.
36//!
37//! [`BgAgentRegistry`]: crate::bg_agent::BgAgentRegistry
38//! [`BgRegistry`]: crate::tools::bg_process::BgRegistry
39
40use crate::providers::ToolDefinition;
41use serde_json::{Value, json};
42use std::sync::Arc;
43use std::time::Duration;
44
45use crate::bg_agent::{
46    AgentStatus, BgAgentRegistry, BgAgentResult, BgTaskSnapshot, CancelOutcome, WaitOutcome,
47};
48use crate::tools::ToolResult;
49use crate::tools::bg_process::{
50    BgProcessSnapshot, BgProcessStatus, BgRegistry, ProcessWaitOutcome,
51};
52
/// Maximum `timeout_secs` a `WaitTask` call may request. Higher values
/// are clamped down by the dispatch layer before reaching the registry.
///
/// Bounds the worst-case time the inference loop can be parked on a
/// single tool call. 300 s = 5 min is generous for "wait for a build
/// to finish" while still preventing a confused model from asking for
/// `timeout_secs: 86400`.
pub const WAIT_TASK_MAX_TIMEOUT_SECS: u32 = 300;

/// Default `timeout_secs` when the model omits the parameter.
///
/// Must lie within `1..=WAIT_TASK_MAX_TIMEOUT_SECS`. Otherwise
/// `clamp_wait_timeout_secs` would silently swap it for a different value
/// than the one advertised in the `WaitTask` description.
pub const WAIT_TASK_DEFAULT_TIMEOUT_SECS: u32 = 30;

// Fail the build, rather than silently re-clamp at runtime, if the default is
// ever edited out of the `[1, max]` range. The tool schema and description
// both tell the model the default is a legal value.
const _: () = assert!(
    WAIT_TASK_DEFAULT_TIMEOUT_SECS >= 1
        && WAIT_TASK_DEFAULT_TIMEOUT_SECS <= WAIT_TASK_MAX_TIMEOUT_SECS,
    "WAIT_TASK_DEFAULT_TIMEOUT_SECS must lie within 1..=WAIT_TASK_MAX_TIMEOUT_SECS"
);
64
65/// Return tool definitions for the LLM.
66pub fn definitions() -> Vec<ToolDefinition> {
67    vec![
68        ToolDefinition {
69            name: "ListBackgroundTasks".to_string(),
70            description:
71                "List every background task you have running — both background sub-agents (spawned via \
72                InvokeAgent { background: true }) and background shell processes (spawned via Bash \
73                { background: true }).\n\n\
74                Returns a JSON array of objects, each with:\n\
75                - task_id: prefixed string. \"agent:N\" for sub-agent tasks, \"process:N\" for shell processes.\n\
76                - task_type: \"agent\" or \"process\".\n\
77                - description: agent name + prompt for agents; the original command for processes.\n\
78                - status: \"pending\" | \"running\" | \"completed\" | \"errored\" | \"cancelled\" \
79                (agents) or \"running\" | \"exited\" | \"killed\" (processes).\n\
80                - age_secs: wall-clock seconds since the task was spawned.\n\
81                - exit_code: present only for exited processes.\n\n\
82                Use this when:\n\
83                - You launched background work and want to check progress before doing more.\n\
84                - You need a task_id to feed CancelTask or WaitTask.\n\n\
85                Do NOT use this when:\n\
86                - You're not sure whether you launched anything (you'd see an empty array — \
87                cheap, but pointless if you didn't intend to background work).\n\n\
88                Scope: returns only YOUR tasks. You will never see another agent's tasks or \
89                the user's top-level tasks here."
90                    .to_string(),
91            parameters: json!({
92                "type": "object",
93                "properties": {},
94                "additionalProperties": false
95            }),
96        },
97        ToolDefinition {
98            name: "CancelTask".to_string(),
99            description:
100                "Cancel a single background task by its task_id (from ListBackgroundTasks).\n\n\
101                For sub-agent tasks (\"agent:N\"): fires the per-task cancel token. The agent \
102                observes it on its next inference iteration and shuts down cleanly. The \
103                cancellation result will appear in your conversation as a normal sub-agent \
104                completion with a cancelled marker.\n\n\
105                For shell processes (\"process:N\"): sends SIGTERM. The process status \
106                transitions to \"killed\" immediately; the OS exit code surfaces on the next \
107                ListBackgroundTasks / WaitTask call once the process is fully reaped.\n\n\
108                Idempotent — calling on an already-cancelled / already-exited task is a \
109                successful no-op. Returns an error if the task_id is unknown OR if you don't \
110                own the task (Model E scope: you can only cancel tasks you spawned)."
111                    .to_string(),
112            parameters: json!({
113                "type": "object",
114                "properties": {
115                    "task_id": {
116                        "type": "string",
117                        "description": "Prefixed task id from ListBackgroundTasks: \
118                                        \"agent:N\" or \"process:N\"."
119                    }
120                },
121                "required": ["task_id"],
122                "additionalProperties": false
123            }),
124        },
125        ToolDefinition {
126            name: "WaitTask".to_string(),
127            description: format!(
128                "Block until a background task finishes (or timeout fires).\n\n\
129                Returns the task's terminal state and result so you don't have to keep \
130                polling ListBackgroundTasks. Prefer WaitTask over a polling loop — one \
131                tool call instead of many.\n\n\
132                For sub-agent tasks (\"agent:N\"): on completion, returns the agent's full \
133                output. The result will NOT also appear in the auto-drain on the next \
134                iteration — WaitTask consumes it.\n\n\
135                For shell processes (\"process:N\"): on exit, returns the OS exit code. \
136                Process stdout/stderr is NOT captured — if you need the output, redirect \
137                inside the command (e.g. `Bash {{ command: \"cmd > /tmp/out.log 2>&1\", \
138                background: true }}`) and Read the file separately.\n\n\
139                If the task hasn't finished by `timeout_secs`, returns the current status \
140                without consuming the task — you can call again to keep waiting. Default \
141                {default}s, max {max}s. Returns an error if the task_id is unknown or \
142                doesn't belong to you.",
143                default = WAIT_TASK_DEFAULT_TIMEOUT_SECS,
144                max = WAIT_TASK_MAX_TIMEOUT_SECS,
145            ),
146            parameters: json!({
147                "type": "object",
148                "properties": {
149                    "task_id": {
150                        "type": "string",
151                        "description": "Prefixed task id: \"agent:N\" or \"process:N\"."
152                    },
153                    "timeout_secs": {
154                        "type": "integer",
155                        "minimum": 1,
156                        "maximum": WAIT_TASK_MAX_TIMEOUT_SECS,
157                        "description": format!(
158                            "Maximum seconds to wait. Default {default}, capped at {max} to \
159                             prevent runaway parks of the inference loop.",
160                            default = WAIT_TASK_DEFAULT_TIMEOUT_SECS,
161                            max = WAIT_TASK_MAX_TIMEOUT_SECS,
162                        )
163                    }
164                },
165                "required": ["task_id"],
166                "additionalProperties": false
167            }),
168        },
169    ]
170}
171
/// Parsed task id (prefix + numeric).
///
/// `parse_task_id` produces this from the model-supplied string so the
/// dispatch layer can route to the right registry without re-parsing.
///
/// The `Display` impl renders the canonical prefixed form (`agent:N` /
/// `process:N`), the same shape `ListBackgroundTasks` entries use. A parsed
/// id can therefore be echoed back in canonical form, e.g. when the input
/// was a bare `"5"` or carried stray whitespace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskId {
    /// Bg-agent task — `agent:N`.
    Agent(u32),
    /// Bg shell process — `process:N`.
    Process(u32),
}

impl std::fmt::Display for TaskId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskId::Agent(n) => write!(f, "agent:{n}"),
            TaskId::Process(n) => write!(f, "process:{n}"),
        }
    }
}
183
184/// Parse a model-supplied `task_id` string.
185///
186/// Accepts:
187/// - `"agent:N"` → [`TaskId::Agent`]
188/// - `"process:N"` → [`TaskId::Process`]
189/// - `"N"` (bare numeric) → [`TaskId::Agent`] for back-compat with the
190///   `/cancel <id>` UX shipped in #1042. The LLM tool descriptions
191///   *require* the prefix; this lookup tolerates the bare form so the
192///   TUI can share the same parser without diverging.
193///
194/// Returns an `Err(message)` the dispatch layer can hand back to the
195/// model verbatim when the input is malformed.
196pub fn parse_task_id(input: &str) -> Result<TaskId, String> {
197    let trimmed = input.trim();
198    if trimmed.is_empty() {
199        return Err("task_id is empty".to_string());
200    }
201    if let Some(rest) = trimmed.strip_prefix("agent:") {
202        return rest
203            .parse::<u32>()
204            .map(TaskId::Agent)
205            .map_err(|_| format!("invalid agent id: '{rest}' (expected non-negative integer)"));
206    }
207    if let Some(rest) = trimmed.strip_prefix("process:") {
208        return rest
209            .parse::<u32>()
210            .map(TaskId::Process)
211            .map_err(|_| format!("invalid process id: '{rest}' (expected non-negative integer)"));
212    }
213    // Bare numeric → agent (TUI back-compat, see doc above).
214    if let Ok(n) = trimmed.parse::<u32>() {
215        return Ok(TaskId::Agent(n));
216    }
217    Err(format!(
218        "unrecognized task_id '{input}'; expected \"agent:N\" or \"process:N\""
219    ))
220}
221
222/// Clamp a model-supplied `timeout_secs` into the allowed range.
223///
224/// `None` → [`WAIT_TASK_DEFAULT_TIMEOUT_SECS`].
225/// `Some(0)` → 1 (degenerate but harmless; we don't error out on 0).
226/// `Some(> WAIT_TASK_MAX_TIMEOUT_SECS)` → cap.
227pub fn clamp_wait_timeout_secs(requested: Option<u32>) -> u32 {
228    let raw = requested.unwrap_or(WAIT_TASK_DEFAULT_TIMEOUT_SECS);
229    raw.clamp(1, WAIT_TASK_MAX_TIMEOUT_SECS)
230}
231
232// ══ Execution ══════════════════════════════════════════════════════════════════════════════════════
233//
234// Dispatch entry point for the three Layer-2 tools. Called from
235// `tool_dispatch::execute_one_tool` when `tool_name` matches; never
236// goes through `ToolRegistry::execute()` because we need the
237// `Arc<BgAgentRegistry>` (not stored on the registry) and the caller's
238// spawner identity (only known at the dispatch layer).
239
240/// Render an [`AgentStatus`] as the lower-case status string we
241/// surface to the model. Stable strings — they're documented in the
242/// `ListBackgroundTasks` tool description and become part of the
243/// tool API surface.
244fn agent_status_str(s: &AgentStatus) -> &'static str {
245    match s {
246        AgentStatus::Pending => "pending",
247        AgentStatus::Running { .. } => "running",
248        AgentStatus::Completed { .. } => "completed",
249        AgentStatus::Errored { .. } => "errored",
250        AgentStatus::Cancelled => "cancelled",
251    }
252}
253
254/// Render a [`BgProcessStatus`] as the lower-case status string.
255fn process_status_str(s: &BgProcessStatus) -> &'static str {
256    match s {
257        BgProcessStatus::Running => "running",
258        BgProcessStatus::Exited { .. } => "exited",
259        BgProcessStatus::Killed => "killed",
260    }
261}
262
263fn agent_snapshot_to_json(s: &BgTaskSnapshot) -> Value {
264    json!({
265        "task_id": format!("agent:{}", s.task_id),
266        "task_type": "agent",
267        "description": format!("{}: {}", s.agent_name, s.prompt),
268        "status": agent_status_str(&s.status),
269        "age_secs": s.age.as_secs(),
270    })
271}
272
273fn process_snapshot_to_json(s: &BgProcessSnapshot) -> Value {
274    let mut obj = json!({
275        "task_id": format!("process:{}", s.pid),
276        "task_type": "process",
277        "description": s.command.clone(),
278        "status": process_status_str(&s.status),
279        "age_secs": s.age.as_secs(),
280    });
281    if let BgProcessStatus::Exited { code } = s.status {
282        obj.as_object_mut()
283            .unwrap()
284            .insert("exit_code".into(), json!(code));
285    }
286    obj
287}
288
289/// Helper for emitting an Err-shaped [`ToolResult`] with a model-readable
290/// message. The dispatch layer surfaces this back to the model as a
291/// failed tool call — same convention as every other tool.
292fn err(msg: impl Into<String>) -> ToolResult {
293    ToolResult {
294        output: msg.into(),
295        success: false,
296        full_output: None,
297    }
298}
299
300fn ok(value: Value) -> ToolResult {
301    ToolResult {
302        output: value.to_string(),
303        success: true,
304        full_output: None,
305    }
306}
307
308/// Dispatch a Layer-2 tool call. Returns a [`ToolResult`] in the same
309/// shape as `ToolRegistry::execute()` so the dispatch layer can plug
310/// it in without special-casing further.
311///
312/// `tool_name` must be one of `"ListBackgroundTasks"`, `"CancelTask"`,
313/// `"WaitTask"`. Any other name is a programmer error in the dispatch
314/// router — we return an `err` so it's loud-but-safe in production.
315pub async fn execute(
316    tool_name: &str,
317    arguments: &str,
318    bg_agents: &Arc<BgAgentRegistry>,
319    bg_processes: &BgRegistry,
320    caller_spawner: Option<u32>,
321) -> ToolResult {
322    match tool_name {
323        "ListBackgroundTasks" => execute_list(bg_agents, bg_processes, caller_spawner),
324        "CancelTask" => execute_cancel(arguments, bg_agents, bg_processes, caller_spawner),
325        "WaitTask" => execute_wait(arguments, bg_agents, bg_processes, caller_spawner).await,
326        other => err(format!(
327            "bg_task_tools::execute called with unknown tool '{other}' \
328             (router bug — should have matched in tool_dispatch)"
329        )),
330    }
331}
332
333fn execute_list(
334    bg_agents: &BgAgentRegistry,
335    bg_processes: &BgRegistry,
336    caller_spawner: Option<u32>,
337) -> ToolResult {
338    // Refresh process statuses so the model sees the latest exit codes.
339    bg_processes.reap();
340
341    let mut entries: Vec<Value> = bg_agents
342        .snapshot_for_caller(caller_spawner)
343        .iter()
344        .map(agent_snapshot_to_json)
345        .collect();
346    entries.extend(
347        bg_processes
348            .snapshot_for_caller(caller_spawner)
349            .iter()
350            .map(process_snapshot_to_json),
351    );
352    ok(Value::Array(entries))
353}
354
355fn execute_cancel(
356    arguments: &str,
357    bg_agents: &BgAgentRegistry,
358    bg_processes: &BgRegistry,
359    caller_spawner: Option<u32>,
360) -> ToolResult {
361    let args: Value = match serde_json::from_str(arguments) {
362        Ok(v) => v,
363        Err(e) => return err(format!("CancelTask: invalid JSON arguments: {e}")),
364    };
365    let task_id_str = match args.get("task_id").and_then(|v| v.as_str()) {
366        Some(s) => s,
367        None => return err("CancelTask: missing required 'task_id' (string)"),
368    };
369    let task_id = match parse_task_id(task_id_str) {
370        Ok(t) => t,
371        Err(e) => return err(format!("CancelTask: {e}")),
372    };
373
374    let outcome = match task_id {
375        TaskId::Agent(n) => bg_agents.cancel_as_caller(n, caller_spawner),
376        TaskId::Process(n) => bg_processes.kill_as_caller(n, caller_spawner),
377    };
378
379    match outcome {
380        CancelOutcome::Cancelled => ok(json!({
381            "task_id": task_id_str,
382            "cancelled": true,
383        })),
384        CancelOutcome::NotFound => err(format!(
385            "CancelTask: no background task with id '{task_id_str}' \
386             (already finished, never existed, or already drained)"
387        )),
388        CancelOutcome::Forbidden => err(format!(
389            "CancelTask: task '{task_id_str}' is not owned by this caller"
390        )),
391    }
392}
393
394async fn execute_wait(
395    arguments: &str,
396    bg_agents: &BgAgentRegistry,
397    bg_processes: &BgRegistry,
398    caller_spawner: Option<u32>,
399) -> ToolResult {
400    let args: Value = match serde_json::from_str(arguments) {
401        Ok(v) => v,
402        Err(e) => return err(format!("WaitTask: invalid JSON arguments: {e}")),
403    };
404    let task_id_str = match args.get("task_id").and_then(|v| v.as_str()) {
405        Some(s) => s,
406        None => return err("WaitTask: missing required 'task_id' (string)"),
407    };
408    let task_id = match parse_task_id(task_id_str) {
409        Ok(t) => t,
410        Err(e) => return err(format!("WaitTask: {e}")),
411    };
412    let timeout_secs = clamp_wait_timeout_secs(
413        args.get("timeout_secs")
414            .and_then(|v| v.as_u64())
415            .map(|n| n as u32),
416    );
417    let timeout = Duration::from_secs(timeout_secs as u64);
418
419    match task_id {
420        TaskId::Agent(n) => {
421            let outcome = bg_agents
422                .wait_for_completion(n, caller_spawner, timeout)
423                .await;
424            agent_wait_to_tool_result(task_id_str, outcome)
425        }
426        TaskId::Process(n) => {
427            let outcome = bg_processes
428                .wait_for_exit_as_caller(n, caller_spawner, timeout)
429                .await;
430            process_wait_to_tool_result(task_id_str, outcome)
431        }
432    }
433}
434
435fn agent_wait_to_tool_result(task_id_str: &str, outcome: WaitOutcome) -> ToolResult {
436    match outcome {
437        WaitOutcome::Completed(BgAgentResult {
438            agent_name,
439            prompt,
440            output,
441            success,
442            events,
443        }) => ok(json!({
444            "task_id": task_id_str,
445            "status": if success { "completed" } else { "errored" },
446            "agent_name": agent_name,
447            "prompt": prompt,
448            "output": output,
449            "events": events,
450        })),
451        WaitOutcome::Cancelled => ok(json!({
452            "task_id": task_id_str,
453            "status": "cancelled",
454        })),
455        WaitOutcome::TimedOut(snap) => ok(json!({
456            "task_id": task_id_str,
457            "status": "timed_out",
458            "current": agent_snapshot_to_json(&snap),
459        })),
460        WaitOutcome::NotFound => err(format!(
461            "WaitTask: no background task with id '{task_id_str}'"
462        )),
463        WaitOutcome::Forbidden => err(format!(
464            "WaitTask: task '{task_id_str}' is not owned by this caller"
465        )),
466    }
467}
468
469fn process_wait_to_tool_result(task_id_str: &str, outcome: ProcessWaitOutcome) -> ToolResult {
470    match outcome {
471        ProcessWaitOutcome::Exited { code } => ok(json!({
472            "task_id": task_id_str,
473            "status": "exited",
474            "exit_code": code,
475        })),
476        ProcessWaitOutcome::TimedOut(snap) => ok(json!({
477            "task_id": task_id_str,
478            "status": "timed_out",
479            "current": process_snapshot_to_json(&snap),
480        })),
481        ProcessWaitOutcome::NotFound => err(format!(
482            "WaitTask: no background task with id '{task_id_str}'"
483        )),
484        ProcessWaitOutcome::Forbidden => err(format!(
485            "WaitTask: task '{task_id_str}' is not owned by this caller"
486        )),
487    }
488}
489
// Unit tests for the Layer-2 background-task tools.
//
// The first group checks the static tool schemas and the pure helpers
// (`parse_task_id`, `clamp_wait_timeout_secs`). The second group drives
// `execute()` end-to-end against fresh in-memory registries.
#[cfg(test)]
mod tests {
    use super::*;

    // ── definitions() schema tests ──────────────────────────────────────────────────

    #[test]
    fn definitions_returns_three_tools_with_expected_names() {
        let defs = definitions();
        let names: Vec<&str> = defs.iter().map(|d| d.name.as_str()).collect();
        assert_eq!(names, vec!["ListBackgroundTasks", "CancelTask", "WaitTask"]);
    }

    #[test]
    fn list_background_tasks_takes_no_arguments() {
        let defs = definitions();
        let list = defs
            .iter()
            .find(|d| d.name == "ListBackgroundTasks")
            .unwrap();
        // Required must be empty (or absent).
        let required = list.parameters.get("required");
        assert!(
            required.is_none() || required.unwrap().as_array().unwrap().is_empty(),
            "ListBackgroundTasks must take no required args"
        );
    }

    #[test]
    fn cancel_and_wait_require_task_id() {
        let defs = definitions();
        for name in ["CancelTask", "WaitTask"] {
            let def = defs.iter().find(|d| d.name == name).unwrap();
            let required = def.parameters["required"].as_array().unwrap();
            assert!(
                required.iter().any(|v| v == "task_id"),
                "{name} must require task_id"
            );
        }
    }

    // ── parse_task_id ───────────────────────────────────────────────────────────────

    #[test]
    fn parse_task_id_accepts_prefixed_forms() {
        assert_eq!(parse_task_id("agent:7").unwrap(), TaskId::Agent(7));
        assert_eq!(
            parse_task_id("process:12345").unwrap(),
            TaskId::Process(12345)
        );
        // Whitespace tolerance — models sometimes add it.
        assert_eq!(parse_task_id("  agent:1  ").unwrap(), TaskId::Agent(1));
    }

    #[test]
    fn parse_task_id_accepts_bare_numeric_as_agent() {
        // TUI back-compat: `/cancel 5` → agent:5.
        assert_eq!(parse_task_id("5").unwrap(), TaskId::Agent(5));
    }

    #[test]
    fn parse_task_id_rejects_bad_input() {
        assert!(parse_task_id("").is_err());
        assert!(parse_task_id("   ").is_err());
        assert!(parse_task_id("agent:").is_err());
        assert!(parse_task_id("agent:abc").is_err());
        // Negative ids fail because the numeric part is parsed as `u32`.
        assert!(parse_task_id("process:-1").is_err());
        assert!(parse_task_id("foobar").is_err());
        assert!(parse_task_id("mcp:1").is_err()); // future prefix not yet wired
    }

    // ── clamp_wait_timeout_secs ─────────────────────────────────────────────────────

    #[test]
    fn clamp_wait_timeout_handles_none_default() {
        assert_eq!(
            clamp_wait_timeout_secs(None),
            WAIT_TASK_DEFAULT_TIMEOUT_SECS
        );
    }

    #[test]
    fn clamp_wait_timeout_caps_at_max() {
        assert_eq!(
            clamp_wait_timeout_secs(Some(86400)),
            WAIT_TASK_MAX_TIMEOUT_SECS
        );
    }

    #[test]
    fn clamp_wait_timeout_floors_at_one() {
        assert_eq!(clamp_wait_timeout_secs(Some(0)), 1);
    }

    #[test]
    fn clamp_wait_timeout_passes_through_in_range() {
        assert_eq!(clamp_wait_timeout_secs(Some(45)), 45);
        assert_eq!(
            clamp_wait_timeout_secs(Some(WAIT_TASK_MAX_TIMEOUT_SECS)),
            WAIT_TASK_MAX_TIMEOUT_SECS
        );
    }

    // ── execute() dispatch tests ─────────────────────────────────────────────────────

    /// Fresh, empty agent and process registries for one test.
    fn fresh_registries() -> (Arc<BgAgentRegistry>, BgRegistry) {
        (Arc::new(BgAgentRegistry::new()), BgRegistry::new())
    }

    // NOTE(review): judging by how the tests below use it,
    // `register_test_with_status` returns (task id, result sender, status
    // sender, cancel observer). Dropping a sender can change what the
    // registry reports, so several tests deliberately bind the tuple
    // elements to keep them alive. Confirm against `BgAgentRegistry`.

    /// `ListBackgroundTasks` on empty registries returns `[]`, success.
    #[tokio::test]
    async fn execute_list_returns_empty_array_when_no_tasks() {
        let (agents, processes) = fresh_registries();
        let r = execute("ListBackgroundTasks", "{}", &agents, &processes, None).await;
        assert!(r.success);
        assert_eq!(r.output, "[]");
    }

    /// `ListBackgroundTasks` shows the caller's agent tasks with the
    /// agreed-upon shape (prefixed task_id, lower-case status).
    #[tokio::test]
    async fn execute_list_includes_caller_agent_tasks() {
        let (agents, processes) = fresh_registries();
        let (id, _tx, _, _) = agents.register_test_with_status("explore", "map repo", None);

        let r = execute("ListBackgroundTasks", "{}", &agents, &processes, None).await;
        assert!(r.success);
        let arr: Value = serde_json::from_str(&r.output).unwrap();
        let arr = arr.as_array().unwrap();
        assert_eq!(arr.len(), 1);
        assert_eq!(arr[0]["task_id"], format!("agent:{id}"));
        assert_eq!(arr[0]["task_type"], "agent");
        assert_eq!(arr[0]["status"], "pending");
        assert_eq!(arr[0]["description"], "explore: map repo");
    }

    /// Caller scoping: a sub-agent caller (Some(7)) must not see the
    /// top-level (None) task. Defence-in-depth on top of the
    /// sub_agent_dispatch denylist.
    #[tokio::test]
    async fn execute_list_filters_out_other_callers_tasks() {
        let (agents, processes) = fresh_registries();
        agents.register_test_with_status("a", "top", None);
        agents.register_test_with_status("b", "sub", Some(7));

        let top = execute("ListBackgroundTasks", "{}", &agents, &processes, None).await;
        let arr: Value = serde_json::from_str(&top.output).unwrap();
        assert_eq!(arr.as_array().unwrap().len(), 1, "top sees only its own");

        let sub = execute("ListBackgroundTasks", "{}", &agents, &processes, Some(7)).await;
        let arr: Value = serde_json::from_str(&sub.output).unwrap();
        assert_eq!(arr.as_array().unwrap().len(), 1, "sub sees only its own");
    }

    /// `CancelTask` routes `agent:N` to BgAgentRegistry and reports
    /// success in the structured payload.
    #[tokio::test]
    async fn execute_cancel_succeeds_for_owned_agent_task() {
        let (agents, processes) = fresh_registries();
        let (id, _tx, _, observer) = agents.register_test_with_status("x", "y", None);

        let r = execute(
            "CancelTask",
            &json!({ "task_id": format!("agent:{id}") }).to_string(),
            &agents,
            &processes,
            None,
        )
        .await;
        assert!(r.success, "got: {}", r.output);
        assert!(observer.is_cancelled(), "cancel token must fire");
        let payload: Value = serde_json::from_str(&r.output).unwrap();
        assert_eq!(payload["cancelled"], true);
        assert_eq!(payload["task_id"], format!("agent:{id}"));
    }

    /// An id that was never registered maps to the NotFound error message.
    #[tokio::test]
    async fn execute_cancel_returns_not_found_for_unknown_id() {
        let (agents, processes) = fresh_registries();
        let r = execute(
            "CancelTask",
            &json!({ "task_id": "agent:9999" }).to_string(),
            &agents,
            &processes,
            None,
        )
        .await;
        assert!(!r.success);
        assert!(r.output.contains("no background task"), "got: {}", r.output);
    }

    /// Cross-spawner cancel is refused AND leaves the target's token untouched.
    #[tokio::test]
    async fn execute_cancel_returns_forbidden_for_cross_caller() {
        let (agents, processes) = fresh_registries();
        let (id, _tx, _, observer) = agents.register_test_with_status("x", "y", Some(5));

        // Top-level (None) tries to cancel sub-agent 5's task.
        let r = execute(
            "CancelTask",
            &json!({ "task_id": format!("agent:{id}") }).to_string(),
            &agents,
            &processes,
            None,
        )
        .await;
        assert!(!r.success);
        assert!(
            r.output.contains("not owned by this caller"),
            "got: {}",
            r.output
        );
        assert!(!observer.is_cancelled(), "forbidden must NOT fire token");
    }

    /// Argument-parsing failures surface as failed tool results, not panics.
    #[tokio::test]
    async fn execute_cancel_rejects_malformed_json() {
        let (agents, processes) = fresh_registries();
        let r = execute("CancelTask", "not-json", &agents, &processes, None).await;
        assert!(!r.success);
        assert!(r.output.contains("invalid JSON"), "got: {}", r.output);
    }

    #[tokio::test]
    async fn execute_cancel_rejects_missing_task_id() {
        let (agents, processes) = fresh_registries();
        let r = execute("CancelTask", "{}", &agents, &processes, None).await;
        assert!(!r.success);
        assert!(r.output.contains("missing required"), "got: {}", r.output);
    }

    /// `WaitTask` on a completed agent task returns `status:completed`
    /// + the agent's output, and consumes the entry (drain sees nothing).
    #[tokio::test]
    async fn execute_wait_returns_completed_for_finished_agent() {
        let (agents, processes) = fresh_registries();
        let (id, tx, status_tx, _) = agents.register_test_with_status("explore", "map", None);
        // Deliver the result (output + one event) and the terminal status
        // before waiting, so WaitTask sees an already-finished task.
        tx.send(Ok(("final answer".into(), vec!["e1".into()])))
            .unwrap();
        status_tx
            .send(AgentStatus::Completed {
                summary: "final".into(),
            })
            .unwrap();

        let r = execute(
            "WaitTask",
            &json!({ "task_id": format!("agent:{id}"), "timeout_secs": 1 }).to_string(),
            &agents,
            &processes,
            None,
        )
        .await;
        assert!(r.success, "got: {}", r.output);
        let payload: Value = serde_json::from_str(&r.output).unwrap();
        assert_eq!(payload["status"], "completed");
        assert_eq!(payload["output"], "final answer");
        assert_eq!(payload["events"].as_array().unwrap().len(), 1);
        // Consumed — not in registry anymore.
        assert_eq!(agents.snapshot().len(), 0);
    }

    /// `WaitTask` timeout returns `status:timed_out` + a snapshot of
    /// the still-running task.
    #[tokio::test]
    async fn execute_wait_returns_timed_out_with_snapshot() {
        let (agents, processes) = fresh_registries();
        // Bind ALL four to keep the channels alive — if status_tx
        // drops, the watch sender is gone and `wait_for_terminal_status`
        // early-returns, surfacing as Cancelled instead of TimedOut.
        let (id, _tx, _status_tx, _observer) = agents.register_test_with_status("slow", "x", None);

        let r = execute(
            "WaitTask",
            // `timeout_secs: 1` is the smallest value the tool surface
            // allows (0 is clamped up to 1 by clamp_wait_timeout_secs), so
            // this test spends about one second waiting for the timeout.
            &json!({ "task_id": format!("agent:{id}"), "timeout_secs": 1 }).to_string(),
            &agents,
            &processes,
            None,
        )
        .await;
        assert!(r.success);
        let payload: Value = serde_json::from_str(&r.output).unwrap();
        assert_eq!(payload["status"], "timed_out");
        assert_eq!(payload["current"]["task_id"], format!("agent:{id}"));
        // Entry preserved.
        assert_eq!(agents.snapshot().len(), 1);
    }

    /// `WaitTask` returns `status:cancelled` when the cancellation token
    /// fires and the status channel reflects `AgentStatus::Cancelled`.
    ///
    /// Together with the two tests above, this covers the three
    /// success-shaped agent `WaitTask` outcomes: `completed`, `timed_out`,
    /// and `cancelled`. This one was the missing path (#1048).
    #[tokio::test]
    async fn execute_wait_returns_cancelled_when_token_fires() {
        let (agents, processes) = fresh_registries();
        let (id, tx, status_tx, observer) = agents.register_test_with_status("slow", "x", None);

        // Fire the cancellation token and push the terminal status so
        // `wait_for_terminal_status` unblocks immediately.
        observer.cancel();
        status_tx.send(AgentStatus::Cancelled).unwrap();
        // Drop the result sender so `entry.rx.await` inside
        // `wait_for_completion` resolves immediately as `Err` rather
        // than waiting out the 50 ms inner timeout.
        drop(tx);

        let r = execute(
            "WaitTask",
            &json!({ "task_id": format!("agent:{id}"), "timeout_secs": 5 }).to_string(),
            &agents,
            &processes,
            None,
        )
        .await;
        assert!(
            r.success,
            "WaitTask on a cancelled task must still succeed: {}",
            r.output
        );
        let payload: Value = serde_json::from_str(&r.output).unwrap();
        assert_eq!(payload["status"], "cancelled");
        assert_eq!(payload["task_id"], format!("agent:{id}"));
        // Consumed — entry removed from registry.
        assert_eq!(agents.snapshot().len(), 0);
    }

    /// An unroutable tool name yields an error result (router-bug guard).
    #[tokio::test]
    async fn execute_unknown_tool_name_returns_error() {
        let (agents, processes) = fresh_registries();
        let r = execute("NotAToolWeKnow", "{}", &agents, &processes, None).await;
        assert!(!r.success);
        assert!(r.output.contains("unknown tool"));
    }
}