Skip to main content

car_multi/patterns/
spawn_subtask.rs

1//! SpawnSubtask — an agent dynamically spawns isolated, tool-constrained
2//! sub-agents mid-run via a `spawn_subtask` tool.
3//!
4//! Unlike [`Delegator`](crate::Delegator), which routes to a fixed set of named
5//! specialists, `spawn_subtask` lets the main agent invent an ephemeral
6//! sub-agent on the fly and hand it a *subset of its own tools*:
7//!
8//! ```text
9//! main agent calls spawn_subtask(task="scrape the page", tools=["fetch"])
10//!   → SpawnSubtaskExecutor intercepts
11//!   → validates {fetch} ⊆ parent tools  (privilege de-escalation)
12//!   → spawns an ephemeral agent limited to {fetch}
13//!   → returns the sub-agent's answer as the tool result
14//! ```
15//!
16//! ## Tool-subset is enforced twice — once provably
17//!
18//! 1. **Schema (provable):** the `spawn_subtask` tool is registered with a JSON
19//!    Schema whose `tools` items are an `enum` of the parent's tool names. The
20//!    runtime's validator (`car-validator`, via `jsonschema`) rejects any
21//!    proposed call that lists a tool outside that set *before* execution. The
22//!    subset constraint is therefore a verified precondition, not a convention.
23//! 2. **Executor (defense in depth):** the executor re-checks the subset at call
24//!    time, so a caller that dispatches the tool without schema validation still
25//!    cannot escalate.
26//!
27//! ## Parallelism
28//!
//! When a model emits several `spawn_subtask` calls in one proposal, they run
//! concurrently through the engine's DAG executor (`futures::join_all` over
//! independent actions) — no manual batching needed. Each spawn gets its own
//! runtime over the shared infra, so they don't contend on a single mutable tool.
33
34use crate::error::MultiError;
35use crate::mailbox::Mailbox;
36use crate::runner::AgentRunner;
37use crate::shared::SharedInfra;
38use crate::types::AgentSpec;
39use car_engine::ToolExecutor;
40use serde::{Deserialize, Serialize};
41use serde_json::Value;
42use std::collections::HashSet;
43use std::sync::Arc;
44use tokio::sync::Mutex;
45use tracing::instrument;
46
/// Name under which the meta-tool is registered and dispatched. The name is
/// reserved: it is never granted to a sub-agent, so delegation is structurally
/// bounded to one level.
pub const SPAWN_SUBTASK_TOOL: &str = "spawn_subtask";

/// System prompt that `SpawnSubtask::new` installs for ephemeral sub-agents.
const DEFAULT_SUBAGENT_PROMPT: &str = concat!(
    "You are a focused sub-agent. ",
    "Complete the single task you are given using only the tools provided, ",
    "then return a concise result.",
);

/// Turn cap that `SpawnSubtask::new` installs for ephemeral sub-agents.
const DEFAULT_SUBAGENT_MAX_TURNS: u32 = 10;
58
59/// Record of one `spawn_subtask` invocation.
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct SubtaskRecord {
62    pub name: String,
63    pub task: String,
64    pub tools: Vec<String>,
65    pub result: String,
66    pub success: bool,
67}
68
69/// Result of running an agent that can spawn sub-agents.
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct SpawnSubtaskResult {
72    pub task: String,
73    pub final_answer: String,
74    pub subtasks: Vec<SubtaskRecord>,
75}
76
77/// Coordinator: runs a main agent with the `spawn_subtask` tool enabled.
78pub struct SpawnSubtask {
79    pub main: AgentSpec,
80    subagent_prompt: String,
81    subagent_max_turns: u32,
82}
83
84impl SpawnSubtask {
85    pub fn new(main: AgentSpec) -> Self {
86        Self {
87            main,
88            subagent_prompt: DEFAULT_SUBAGENT_PROMPT.to_string(),
89            subagent_max_turns: DEFAULT_SUBAGENT_MAX_TURNS,
90        }
91    }
92
93    #[instrument(name = "multi.spawn_subtask", skip_all)]
94    pub async fn run(
95        &self,
96        task: &str,
97        runner: &Arc<dyn AgentRunner>,
98        infra: &SharedInfra,
99    ) -> Result<SpawnSubtaskResult, MultiError> {
100        let records = Arc::new(Mutex::new(Vec::<SubtaskRecord>::new()));
101
102        // `spawn_subtask` is a reserved meta-tool: it is never part of the granted
103        // set, so (a) a sub-agent can never receive it — delegation is bounded to
104        // one level, no unbounded recursion — and (b) the parent-tool registration
105        // can't clobber the constrained schema by re-registering the same name.
106        let granted: Vec<String> = self
107            .main
108            .tools
109            .iter()
110            .filter(|t| t.as_str() != SPAWN_SUBTASK_TOOL)
111            .cloned()
112            .collect();
113
114        let rt = infra.make_runtime();
115        for tool in &granted {
116            rt.register_tool(tool).await;
117        }
118        // Register the constrained, subset-enforcing schema last so it is what
119        // ends up registered for the meta-tool (not a bare `{}`).
120        rt.register_tool_schema(spawn_subtask_schema(&granted)).await;
121
122        let executor = Arc::new(SpawnSubtaskExecutor {
123            parent_tools: granted.into_iter().collect(),
124            subagent_prompt: self.subagent_prompt.clone(),
125            subagent_max_turns: self.subagent_max_turns,
126            runner: Arc::clone(runner),
127            infra_state: Arc::clone(&infra.state),
128            infra_log: Arc::clone(&infra.log),
129            infra_policies: Arc::clone(&infra.policies),
130            budget: Arc::clone(&infra.budget),
131            records: Arc::clone(&records),
132        });
133        rt.set_executor(executor).await;
134
135        // The main agent is the entry agent the caller explicitly invoked; it is
136        // intentionally NOT budget-gated. The budget caps the sub-agents it
137        // spawns, gated in SpawnSubtaskExecutor below.
138        let mailbox = Mailbox::default();
139        let output = runner
140            .run(&self.main, task, &rt, &mailbox)
141            .await
142            .map_err(|e| MultiError::AgentFailed(self.main.name.clone(), e.to_string()))?;
143
144        let subtasks = records.lock().await.clone();
145        Ok(SpawnSubtaskResult {
146            task: task.to_string(),
147            final_answer: output.answer,
148            subtasks,
149        })
150    }
151}
152
153/// Build the `spawn_subtask` tool schema whose `tools` parameter is constrained
154/// to an `enum` of `parent_tools` — the verifiable subset precondition.
155///
156/// If `parent_tools` is empty the `enum` is empty, so the validator rejects any
157/// non-empty `tools` request: an agent with no tools can only spawn a (useless)
158/// tool-less sub-agent. That's a degenerate but safe configuration.
159pub fn spawn_subtask_schema(parent_tools: &[String]) -> car_ir::ToolSchema {
160    car_ir::ToolSchema {
161        name: "spawn_subtask".to_string(),
162        description: "Spawn an isolated sub-agent to handle one focused subtask. \
163            The sub-agent may only use a subset of the tools you yourself have."
164            .to_string(),
165        parameters: serde_json::json!({
166            "type": "object",
167            "properties": {
168                "task": {
169                    "type": "string",
170                    "description": "The single, self-contained task for the sub-agent."
171                },
172                "tools": {
173                    "type": "array",
174                    "items": { "type": "string", "enum": parent_tools },
175                    "description": "Tools to grant the sub-agent. Must be a subset of your own tools."
176                },
177                "name": {
178                    "type": "string",
179                    "description": "Short label for the sub-agent (for logs)."
180                }
181            },
182            "required": ["task", "tools"]
183        }),
184        returns: None,
185        idempotent: false,
186        cache_ttl_secs: None,
187        rate_limit: None,
188    }
189}
190
191/// ToolExecutor that intercepts `spawn_subtask` and spawns a constrained
192/// ephemeral sub-agent.
193struct SpawnSubtaskExecutor {
194    parent_tools: HashSet<String>,
195    subagent_prompt: String,
196    subagent_max_turns: u32,
197    runner: Arc<dyn AgentRunner>,
198    infra_state: Arc<car_state::StateStore>,
199    infra_log: Arc<tokio::sync::Mutex<car_eventlog::EventLog>>,
200    infra_policies: Arc<tokio::sync::RwLock<car_policy::PolicyEngine>>,
201    budget: Arc<crate::budget::CoordinationBudget>,
202    records: Arc<Mutex<Vec<SubtaskRecord>>>,
203}
204
205#[async_trait::async_trait]
206impl ToolExecutor for SpawnSubtaskExecutor {
207    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
208        if tool != "spawn_subtask" {
209            return Err(format!("unknown tool: {}", tool));
210        }
211
212        let task = params
213            .get("task")
214            .and_then(|v| v.as_str())
215            .ok_or("spawn_subtask requires 'task' parameter")?;
216        // Collect requested tools, de-duplicated (preserving first occurrence) so
217        // a repeated name doesn't propagate into the spawned spec or the record.
218        let mut seen = HashSet::new();
219        let requested: Vec<String> = params
220            .get("tools")
221            .and_then(|v| v.as_array())
222            .map(|arr| {
223                arr.iter()
224                    .filter_map(|v| v.as_str().map(String::from))
225                    .filter(|t| seen.insert(t.clone()))
226                    .collect()
227            })
228            .unwrap_or_default();
229        let name = params
230            .get("name")
231            .and_then(|v| v.as_str())
232            .unwrap_or("subtask")
233            .to_string();
234
235        // Defense-in-depth: reject privilege escalation even if the call reached
236        // us without schema validation. The schema enum is the primary guard.
237        let escalations: Vec<String> = requested
238            .iter()
239            .filter(|t| !self.parent_tools.contains(*t))
240            .cloned()
241            .collect();
242        if !escalations.is_empty() {
243            return Err(format!(
244                "privilege escalation rejected: sub-agent tools {:?} are not a subset of the parent's tools",
245                escalations
246            ));
247        }
248
249        let spec = AgentSpec {
250            name: name.clone(),
251            system_prompt: self.subagent_prompt.clone(),
252            tools: requested.clone(),
253            max_turns: self.subagent_max_turns,
254            metadata: std::collections::HashMap::new(),
255            cache_control: false,
256        };
257
258        // Budget gate before spawning the sub-agent. On denial, surface the
259        // reason as the tool result so the parent agent can stop fanning out.
260        if let Err(e) = self.budget.try_begin_agent() {
261            let msg = e.to_string();
262            self.records.lock().await.push(SubtaskRecord {
263                name,
264                task: task.to_string(),
265                tools: requested,
266                result: msg.clone(),
267                success: false,
268            });
269            return Ok(Value::String(msg));
270        }
271
272        let infra = SharedInfra {
273            state: Arc::clone(&self.infra_state),
274            log: Arc::clone(&self.infra_log),
275            policies: Arc::clone(&self.infra_policies),
276            budget: Arc::clone(&self.budget),
277        };
278        let rt = infra.make_runtime();
279        for tool_name in &requested {
280            rt.register_tool(tool_name).await;
281        }
282
283        let mailbox = Mailbox::default();
284        match self.runner.run(&spec, task, &rt, &mailbox).await {
285            Ok(output) => {
286                self.budget.record_output(&output);
287                self.records.lock().await.push(SubtaskRecord {
288                    name,
289                    task: task.to_string(),
290                    tools: requested,
291                    result: output.answer.clone(),
292                    success: true,
293                });
294                Ok(Value::String(output.answer))
295            }
296            Err(e) => {
297                let msg = format!("sub-agent '{}' failed: {}", name, e);
298                self.records.lock().await.push(SubtaskRecord {
299                    name,
300                    task: task.to_string(),
301                    tools: requested,
302                    result: msg.clone(),
303                    success: false,
304                });
305                Ok(Value::String(msg))
306            }
307        }
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{AgentOutput, AgentSpec};
    use car_engine::Runtime;

    /// Longest prefix of `text` holding at most `max_chars` characters.
    ///
    /// Cuts on a character boundary, so unlike a raw byte-index slice it cannot
    /// panic when the limit falls inside a multi-byte character.
    fn prefix(text: &str, max_chars: usize) -> &str {
        text.char_indices()
            .nth(max_chars)
            .map_or(text, |(end, _)| &text[..end])
    }

    /// Owned copies of a list of tool names.
    fn names(list: &[&str]) -> Vec<String> {
        list.iter().map(|s| s.to_string()).collect()
    }

    /// Minimal successful output attributed to `spec`.
    fn stub_output(spec: &AgentSpec, answer: String) -> AgentOutput {
        AgentOutput {
            name: spec.name.clone(),
            answer,
            turns: 1,
            tool_calls: 0,
            duration_ms: 1.0,
            error: None,
            outcome: None,
            tokens: None,
        }
    }

    /// Echoes the spec name + task; records nothing about tools.
    struct SimpleRunner;

    #[async_trait::async_trait]
    impl AgentRunner for SimpleRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            let answer = format!("{} handled: {}", spec.name, prefix(task, 40));
            Ok(stub_output(spec, answer))
        }
    }

    /// Executor backed by `SimpleRunner`, sharing `infra` and writing to `records`.
    fn executor_over(
        infra: &SharedInfra,
        parent_tools: Vec<String>,
        records: Arc<Mutex<Vec<SubtaskRecord>>>,
    ) -> SpawnSubtaskExecutor {
        SpawnSubtaskExecutor {
            parent_tools: parent_tools.into_iter().collect(),
            subagent_prompt: DEFAULT_SUBAGENT_PROMPT.to_string(),
            subagent_max_turns: 5,
            runner: Arc::new(SimpleRunner),
            infra_state: Arc::clone(&infra.state),
            infra_log: Arc::clone(&infra.log),
            infra_policies: Arc::clone(&infra.policies),
            budget: Arc::clone(&infra.budget),
            records,
        }
    }

    /// Stand-alone executor whose parent holds `fetch` and `search`.
    fn test_executor() -> SpawnSubtaskExecutor {
        executor_over(
            &SharedInfra::new(),
            names(&["fetch", "search"]),
            Arc::new(Mutex::new(Vec::new())),
        )
    }

    #[test]
    fn prefix_respects_char_boundaries() {
        assert_eq!(prefix("abcdef", 3), "abc");
        assert_eq!(prefix("abc", 10), "abc");
        assert_eq!(prefix("", 5), "");
        // A byte-index cut at 2 would land inside the two-byte 'é' and panic.
        assert_eq!(prefix("héllo", 2), "hé");
    }

    #[test]
    fn schema_enum_lists_parent_tools_only() {
        let schema = spawn_subtask_schema(&names(&["fetch", "search"]));
        let enum_vals = schema.parameters["properties"]["tools"]["items"]["enum"]
            .as_array()
            .unwrap();
        assert_eq!(enum_vals.len(), 2);
        assert!(enum_vals.iter().any(|v| v == "fetch"));
        assert!(enum_vals.iter().any(|v| v == "search"));
    }

    #[tokio::test]
    async fn subset_call_spawns_subagent() {
        let exec = test_executor();
        let out = exec
            .execute(
                "spawn_subtask",
                &serde_json::json!({ "task": "grab the page", "tools": ["fetch"], "name": "scraper" }),
            )
            .await
            .unwrap();
        assert!(out.as_str().unwrap().contains("scraper handled"));
        let records = exec.records.lock().await;
        assert_eq!(records.len(), 1);
        assert!(records[0].success);
        assert_eq!(records[0].tools, vec!["fetch".to_string()]);
    }

    #[tokio::test]
    async fn escalation_is_rejected() {
        let exec = test_executor();
        let err = exec
            .execute(
                "spawn_subtask",
                &serde_json::json!({ "task": "do admin", "tools": ["fetch", "delete_everything"] }),
            )
            .await
            .unwrap_err();
        assert!(err.contains("privilege escalation"));
        assert!(err.contains("delete_everything"));
        // Nothing was spawned.
        assert!(exec.records.lock().await.is_empty());
    }

    #[tokio::test]
    async fn unknown_tool_is_rejected() {
        let exec = test_executor();
        let err = exec
            .execute("not_spawn", &serde_json::json!({}))
            .await
            .unwrap_err();
        assert!(err.contains("unknown tool"));
    }

    /// Build a single-`spawn_subtask` tool-call proposal.
    fn spawn_proposal(params: Value) -> car_ir::ActionProposal {
        let parameters = params
            .as_object()
            .unwrap()
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        car_ir::ActionProposal {
            id: "p1".into(),
            source: "test".into(),
            actions: vec![car_ir::Action {
                id: "a1".into(),
                action_type: car_ir::ActionType::ToolCall,
                tool: Some("spawn_subtask".into()),
                parameters,
                preconditions: vec![],
                expected_effects: std::collections::HashMap::new(),
                state_dependencies: vec![],
                idempotent: false,
                max_retries: 0,
                failure_behavior: car_ir::FailureBehavior::Skip,
                timeout_ms: None,
                metadata: std::collections::HashMap::new(),
            }],
            timestamp: chrono::Utc::now(),
            context: std::collections::HashMap::new(),
        }
    }

    /// A runner whose `lead` agent spawns one sub-agent through `runtime.execute`
    /// (exercising the registered schema + validator), then answers.
    struct SpawningRunner;

    #[async_trait::async_trait]
    impl AgentRunner for SpawningRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            if spec.name == "lead" {
                let proposal = spawn_proposal(serde_json::json!({
                    "task": "subtask work", "tools": ["fetch"], "name": "helper"
                }));
                // The outcome is checked through the recorded subtasks instead.
                let _ = runtime.execute(&proposal).await;
            }
            let answer = format!("{} done: {}", spec.name, prefix(task, 20));
            Ok(stub_output(spec, answer))
        }
    }

    #[tokio::test]
    async fn end_to_end_run_records_subtasks() {
        let main =
            AgentSpec::new("lead", "lead").with_tools(vec!["fetch".into(), "search".into()]);
        let runner: Arc<dyn AgentRunner> = Arc::new(SpawningRunner);
        let infra = SharedInfra::new();
        let result = SpawnSubtask::new(main)
            .run("build it", &runner, &infra)
            .await
            .unwrap();

        assert!(result.final_answer.contains("lead done"));
        assert_eq!(result.subtasks.len(), 1);
        assert_eq!(result.subtasks[0].name, "helper");
        assert!(result.subtasks[0].success);
    }

    #[tokio::test]
    async fn reserved_meta_tool_is_not_granted_to_subagents() {
        // A parent that (mistakenly) lists "spawn_subtask" among its own tools
        // must NOT have it leak into the granted set / schema enum — otherwise a
        // sub-agent could spawn (unbounded recursion) and the schema could be
        // clobbered by the bare-name registration.
        let main = AgentSpec::new("lead", "lead")
            .with_tools(vec!["fetch".into(), SPAWN_SUBTASK_TOOL.into()]);
        let infra = SharedInfra::new();
        let rt = infra.make_runtime();

        // Re-run the granted-set derivation the coordinator uses.
        let granted: Vec<String> = main
            .tools
            .iter()
            .filter(|t| t.as_str() != SPAWN_SUBTASK_TOOL)
            .cloned()
            .collect();
        assert_eq!(granted, vec!["fetch".to_string()]);

        // The schema enum must not contain the meta-tool.
        let schema = spawn_subtask_schema(&granted);
        let enum_vals = schema.parameters["properties"]["tools"]["items"]["enum"]
            .as_array()
            .unwrap();
        assert!(!enum_vals.iter().any(|v| v == SPAWN_SUBTASK_TOOL));

        // The runtime accepts the schema; validator wiring is covered elsewhere.
        rt.register_tool_schema(spawn_subtask_schema(&granted)).await;

        // And the executor guard rejects a request for it.
        let exec = executor_over(&infra, granted, Arc::new(Mutex::new(Vec::new())));
        let err = exec
            .execute(
                "spawn_subtask",
                &serde_json::json!({ "task": "recurse", "tools": [SPAWN_SUBTASK_TOOL] }),
            )
            .await
            .unwrap_err();
        assert!(err.contains("privilege escalation"));
    }

    #[tokio::test]
    async fn validator_rejects_out_of_subset_tool_via_schema_enum() {
        // The schema enum is the provable subset guard: a proposal listing a tool
        // outside the parent's set must be rejected by the validator before the
        // executor spawns anything.
        let records: Arc<Mutex<Vec<SubtaskRecord>>> = Arc::new(Mutex::new(Vec::new()));
        let infra = SharedInfra::new();
        let rt = infra.make_runtime();
        rt.register_tool_schema(spawn_subtask_schema(&names(&["fetch", "search"])))
            .await;
        let exec = Arc::new(executor_over(
            &infra,
            names(&["fetch", "search"]),
            Arc::clone(&records),
        ));
        rt.set_executor(exec).await;

        let proposal = spawn_proposal(serde_json::json!({
            "task": "escalate", "tools": ["delete_everything"]
        }));
        let result = rt.execute(&proposal).await;

        assert!(!result.all_succeeded(), "out-of-subset tool must not succeed");
        assert!(
            records.lock().await.is_empty(),
            "executor must not spawn when the validator rejects the call"
        );
    }
}