1use crate::error::MultiError;
35use crate::mailbox::Mailbox;
36use crate::runner::AgentRunner;
37use crate::shared::SharedInfra;
38use crate::types::AgentSpec;
39use car_engine::ToolExecutor;
40use serde::{Deserialize, Serialize};
41use serde_json::Value;
42use std::collections::HashSet;
43use std::sync::Arc;
44use tokio::sync::Mutex;
45use tracing::instrument;
46
/// Name of the reserved meta-tool an agent calls to delegate work to a sub-agent.
///
/// `SpawnSubtask::run` strips this name from the main agent's tool list before
/// computing what may be granted to sub-agents.
pub const SPAWN_SUBTASK_TOOL: &str = "spawn_subtask";

/// System prompt that `SpawnSubtask::new` assigns to spawned sub-agents.
const DEFAULT_SUBAGENT_PROMPT: &str = "You are a focused sub-agent. Complete the single task you are given using \
    only the tools provided, then return a concise result.";

/// Turn cap that `SpawnSubtask::new` assigns to spawned sub-agents.
const DEFAULT_SUBAGENT_MAX_TURNS: u32 = 10;
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct SubtaskRecord {
62 pub name: String,
63 pub task: String,
64 pub tools: Vec<String>,
65 pub result: String,
66 pub success: bool,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct SpawnSubtaskResult {
72 pub task: String,
73 pub final_answer: String,
74 pub subtasks: Vec<SubtaskRecord>,
75}
76
77pub struct SpawnSubtask {
79 pub main: AgentSpec,
80 subagent_prompt: String,
81 subagent_max_turns: u32,
82}
83
84impl SpawnSubtask {
85 pub fn new(main: AgentSpec) -> Self {
86 Self {
87 main,
88 subagent_prompt: DEFAULT_SUBAGENT_PROMPT.to_string(),
89 subagent_max_turns: DEFAULT_SUBAGENT_MAX_TURNS,
90 }
91 }
92
93 #[instrument(name = "multi.spawn_subtask", skip_all)]
94 pub async fn run(
95 &self,
96 task: &str,
97 runner: &Arc<dyn AgentRunner>,
98 infra: &SharedInfra,
99 ) -> Result<SpawnSubtaskResult, MultiError> {
100 let records = Arc::new(Mutex::new(Vec::<SubtaskRecord>::new()));
101
102 let granted: Vec<String> = self
107 .main
108 .tools
109 .iter()
110 .filter(|t| t.as_str() != SPAWN_SUBTASK_TOOL)
111 .cloned()
112 .collect();
113
114 let rt = infra.make_runtime();
115 for tool in &granted {
116 rt.register_tool(tool).await;
117 }
118 rt.register_tool_schema(spawn_subtask_schema(&granted)).await;
121
122 let executor = Arc::new(SpawnSubtaskExecutor {
123 parent_tools: granted.into_iter().collect(),
124 subagent_prompt: self.subagent_prompt.clone(),
125 subagent_max_turns: self.subagent_max_turns,
126 runner: Arc::clone(runner),
127 infra_state: Arc::clone(&infra.state),
128 infra_log: Arc::clone(&infra.log),
129 infra_policies: Arc::clone(&infra.policies),
130 budget: Arc::clone(&infra.budget),
131 records: Arc::clone(&records),
132 });
133 rt.set_executor(executor).await;
134
135 let mailbox = Mailbox::default();
139 let output = runner
140 .run(&self.main, task, &rt, &mailbox)
141 .await
142 .map_err(|e| MultiError::AgentFailed(self.main.name.clone(), e.to_string()))?;
143
144 let subtasks = records.lock().await.clone();
145 Ok(SpawnSubtaskResult {
146 task: task.to_string(),
147 final_answer: output.answer,
148 subtasks,
149 })
150 }
151}
152
153pub fn spawn_subtask_schema(parent_tools: &[String]) -> car_ir::ToolSchema {
160 car_ir::ToolSchema {
161 name: "spawn_subtask".to_string(),
162 description: "Spawn an isolated sub-agent to handle one focused subtask. \
163 The sub-agent may only use a subset of the tools you yourself have."
164 .to_string(),
165 parameters: serde_json::json!({
166 "type": "object",
167 "properties": {
168 "task": {
169 "type": "string",
170 "description": "The single, self-contained task for the sub-agent."
171 },
172 "tools": {
173 "type": "array",
174 "items": { "type": "string", "enum": parent_tools },
175 "description": "Tools to grant the sub-agent. Must be a subset of your own tools."
176 },
177 "name": {
178 "type": "string",
179 "description": "Short label for the sub-agent (for logs)."
180 }
181 },
182 "required": ["task", "tools"]
183 }),
184 returns: None,
185 idempotent: false,
186 cache_ttl_secs: None,
187 rate_limit: None,
188 }
189}
190
191struct SpawnSubtaskExecutor {
194 parent_tools: HashSet<String>,
195 subagent_prompt: String,
196 subagent_max_turns: u32,
197 runner: Arc<dyn AgentRunner>,
198 infra_state: Arc<car_state::StateStore>,
199 infra_log: Arc<tokio::sync::Mutex<car_eventlog::EventLog>>,
200 infra_policies: Arc<tokio::sync::RwLock<car_policy::PolicyEngine>>,
201 budget: Arc<crate::budget::CoordinationBudget>,
202 records: Arc<Mutex<Vec<SubtaskRecord>>>,
203}
204
205#[async_trait::async_trait]
206impl ToolExecutor for SpawnSubtaskExecutor {
207 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
208 if tool != "spawn_subtask" {
209 return Err(format!("unknown tool: {}", tool));
210 }
211
212 let task = params
213 .get("task")
214 .and_then(|v| v.as_str())
215 .ok_or("spawn_subtask requires 'task' parameter")?;
216 let mut seen = HashSet::new();
219 let requested: Vec<String> = params
220 .get("tools")
221 .and_then(|v| v.as_array())
222 .map(|arr| {
223 arr.iter()
224 .filter_map(|v| v.as_str().map(String::from))
225 .filter(|t| seen.insert(t.clone()))
226 .collect()
227 })
228 .unwrap_or_default();
229 let name = params
230 .get("name")
231 .and_then(|v| v.as_str())
232 .unwrap_or("subtask")
233 .to_string();
234
235 let escalations: Vec<String> = requested
238 .iter()
239 .filter(|t| !self.parent_tools.contains(*t))
240 .cloned()
241 .collect();
242 if !escalations.is_empty() {
243 return Err(format!(
244 "privilege escalation rejected: sub-agent tools {:?} are not a subset of the parent's tools",
245 escalations
246 ));
247 }
248
249 let spec = AgentSpec {
250 name: name.clone(),
251 system_prompt: self.subagent_prompt.clone(),
252 tools: requested.clone(),
253 max_turns: self.subagent_max_turns,
254 metadata: std::collections::HashMap::new(),
255 cache_control: false,
256 };
257
258 if let Err(e) = self.budget.try_begin_agent() {
261 let msg = e.to_string();
262 self.records.lock().await.push(SubtaskRecord {
263 name,
264 task: task.to_string(),
265 tools: requested,
266 result: msg.clone(),
267 success: false,
268 });
269 return Ok(Value::String(msg));
270 }
271
272 let infra = SharedInfra {
273 state: Arc::clone(&self.infra_state),
274 log: Arc::clone(&self.infra_log),
275 policies: Arc::clone(&self.infra_policies),
276 budget: Arc::clone(&self.budget),
277 };
278 let rt = infra.make_runtime();
279 for tool_name in &requested {
280 rt.register_tool(tool_name).await;
281 }
282
283 let mailbox = Mailbox::default();
284 match self.runner.run(&spec, task, &rt, &mailbox).await {
285 Ok(output) => {
286 self.budget.record_output(&output);
287 self.records.lock().await.push(SubtaskRecord {
288 name,
289 task: task.to_string(),
290 tools: requested,
291 result: output.answer.clone(),
292 success: true,
293 });
294 Ok(Value::String(output.answer))
295 }
296 Err(e) => {
297 let msg = format!("sub-agent '{}' failed: {}", name, e);
298 self.records.lock().await.push(SubtaskRecord {
299 name,
300 task: task.to_string(),
301 tools: requested,
302 result: msg.clone(),
303 success: false,
304 });
305 Ok(Value::String(msg))
306 }
307 }
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{AgentOutput, AgentSpec};
    use car_engine::Runtime;

    /// Returns at most `max_chars` leading characters of `text`.
    ///
    /// Counts characters rather than bytes, so it cannot panic by cutting a
    /// multi-byte UTF-8 sequence in half the way a byte-range slice can.
    fn preview(text: &str, max_chars: usize) -> String {
        text.chars().take(max_chars).collect()
    }

    /// Canned successful output returned by both stub runners.
    fn stub_output(spec: &AgentSpec, answer: String) -> AgentOutput {
        AgentOutput {
            name: spec.name.clone(),
            answer,
            turns: 1,
            tool_calls: 0,
            duration_ms: 1.0,
            error: None,
            outcome: None,
            tokens: None,
        }
    }

    /// Runner that never touches the runtime and simply echoes the task.
    struct SimpleRunner;

    #[async_trait::async_trait]
    impl AgentRunner for SimpleRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            let answer = format!("{} handled: {}", spec.name, preview(task, 40));
            Ok(stub_output(spec, answer))
        }
    }

    /// Builds an executor over `infra` that may grant `parent_tools`, runs
    /// sub-agents with `SimpleRunner`, and logs into `records`.
    fn executor_for(
        infra: &SharedInfra,
        parent_tools: impl IntoIterator<Item = impl Into<String>>,
        records: Arc<Mutex<Vec<SubtaskRecord>>>,
    ) -> SpawnSubtaskExecutor {
        SpawnSubtaskExecutor {
            parent_tools: parent_tools.into_iter().map(|t| t.into()).collect(),
            subagent_prompt: DEFAULT_SUBAGENT_PROMPT.to_string(),
            subagent_max_turns: 5,
            runner: Arc::new(SimpleRunner),
            infra_state: Arc::clone(&infra.state),
            infra_log: Arc::clone(&infra.log),
            infra_policies: Arc::clone(&infra.policies),
            budget: Arc::clone(&infra.budget),
            records,
        }
    }

    /// Executor with fresh infrastructure whose parent owns `fetch` and `search`.
    fn test_executor() -> SpawnSubtaskExecutor {
        executor_for(
            &SharedInfra::new(),
            ["fetch", "search"],
            Arc::new(Mutex::new(Vec::new())),
        )
    }

    #[test]
    fn schema_enum_lists_parent_tools_only() {
        let schema = spawn_subtask_schema(&["fetch".into(), "search".into()]);
        let enum_vals = schema.parameters["properties"]["tools"]["items"]["enum"]
            .as_array()
            .unwrap();
        assert_eq!(enum_vals.len(), 2);
        assert!(enum_vals.iter().any(|v| v == "fetch"));
        assert!(enum_vals.iter().any(|v| v == "search"));
    }

    #[tokio::test]
    async fn subset_call_spawns_subagent() {
        let exec = test_executor();
        let out = exec
            .execute(
                "spawn_subtask",
                &serde_json::json!({ "task": "grab the page", "tools": ["fetch"], "name": "scraper" }),
            )
            .await
            .unwrap();
        assert!(out.as_str().unwrap().contains("scraper handled"));
        let records = exec.records.lock().await;
        assert_eq!(records.len(), 1);
        assert!(records[0].success);
        assert_eq!(records[0].tools, vec!["fetch".to_string()]);
    }

    #[tokio::test]
    async fn escalation_is_rejected() {
        let exec = test_executor();
        let err = exec
            .execute(
                "spawn_subtask",
                &serde_json::json!({ "task": "do admin", "tools": ["fetch", "delete_everything"] }),
            )
            .await
            .unwrap_err();
        assert!(err.contains("privilege escalation"));
        assert!(err.contains("delete_everything"));
        // A rejected request must leave no trace in the delegation log.
        assert!(exec.records.lock().await.is_empty());
    }

    #[tokio::test]
    async fn unknown_tool_is_rejected() {
        let exec = test_executor();
        let err = exec
            .execute("not_spawn", &serde_json::json!({}))
            .await
            .unwrap_err();
        assert!(err.contains("unknown tool"));
    }

    /// Wraps `params` (a JSON object) in a single-action `spawn_subtask`
    /// tool-call proposal.
    fn spawn_proposal(params: Value) -> car_ir::ActionProposal {
        let parameters = params
            .as_object()
            .unwrap()
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        car_ir::ActionProposal {
            id: "p1".into(),
            source: "test".into(),
            actions: vec![car_ir::Action {
                id: "a1".into(),
                action_type: car_ir::ActionType::ToolCall,
                tool: Some("spawn_subtask".into()),
                parameters,
                preconditions: vec![],
                expected_effects: std::collections::HashMap::new(),
                state_dependencies: vec![],
                idempotent: false,
                max_retries: 0,
                failure_behavior: car_ir::FailureBehavior::Skip,
                timeout_ms: None,
                metadata: std::collections::HashMap::new(),
            }],
            timestamp: chrono::Utc::now(),
            context: std::collections::HashMap::new(),
        }
    }

    /// Runner whose `lead` agent issues one `spawn_subtask` call through the
    /// runtime before answering; every other agent just answers.
    struct SpawningRunner;

    #[async_trait::async_trait]
    impl AgentRunner for SpawningRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            if spec.name == "lead" {
                let proposal = spawn_proposal(serde_json::json!({
                    "task": "subtask work", "tools": ["fetch"], "name": "helper"
                }));
                let _ = runtime.execute(&proposal).await;
            }
            let answer = format!("{} done: {}", spec.name, preview(task, 20));
            Ok(stub_output(spec, answer))
        }
    }

    #[tokio::test]
    async fn end_to_end_run_records_subtasks() {
        let main =
            AgentSpec::new("lead", "lead").with_tools(vec!["fetch".into(), "search".into()]);
        let runner: Arc<dyn AgentRunner> = Arc::new(SpawningRunner);
        let infra = SharedInfra::new();
        let result = SpawnSubtask::new(main)
            .run("build it", &runner, &infra)
            .await
            .unwrap();

        assert!(result.final_answer.contains("lead done"));
        assert_eq!(result.subtasks.len(), 1);
        assert_eq!(result.subtasks[0].name, "helper");
        assert!(result.subtasks[0].success);
    }

    #[tokio::test]
    async fn reserved_meta_tool_is_not_granted_to_subagents() {
        let main = AgentSpec::new("lead", "lead")
            .with_tools(vec!["fetch".into(), SPAWN_SUBTASK_TOOL.into()]);
        let infra = SharedInfra::new();
        let rt = infra.make_runtime();

        // Same filter `SpawnSubtask::run` applies to the main agent's tools.
        let granted: Vec<String> = main
            .tools
            .iter()
            .filter(|t| t.as_str() != SPAWN_SUBTASK_TOOL)
            .cloned()
            .collect();
        assert_eq!(granted, vec!["fetch".to_string()]);

        let schema = spawn_subtask_schema(&granted);
        let enum_vals = schema.parameters["properties"]["tools"]["items"]["enum"]
            .as_array()
            .unwrap();
        assert!(!enum_vals.iter().any(|v| v == SPAWN_SUBTASK_TOOL));

        rt.register_tool_schema(spawn_subtask_schema(&granted)).await;

        // Calling the executor directly bypasses schema validation, so this
        // exercises the executor's own subset check.
        let exec = executor_for(&infra, granted, Arc::new(Mutex::new(Vec::new())));
        let err = exec
            .execute(
                "spawn_subtask",
                &serde_json::json!({ "task": "recurse", "tools": [SPAWN_SUBTASK_TOOL] }),
            )
            .await
            .unwrap_err();
        assert!(err.contains("privilege escalation"));
    }

    #[tokio::test]
    async fn validator_rejects_out_of_subset_tool_via_schema_enum() {
        let records: Arc<Mutex<Vec<SubtaskRecord>>> = Arc::new(Mutex::new(Vec::new()));
        let infra = SharedInfra::new();
        let rt = infra.make_runtime();
        rt.register_tool_schema(spawn_subtask_schema(&["fetch".into(), "search".into()]))
            .await;
        let exec = Arc::new(executor_for(
            &infra,
            ["fetch", "search"],
            Arc::clone(&records),
        ));
        rt.set_executor(exec).await;

        let proposal = spawn_proposal(serde_json::json!({
            "task": "escalate", "tools": ["delete_everything"]
        }));
        let result = rt.execute(&proposal).await;

        assert!(!result.all_succeeded(), "out-of-subset tool must not succeed");
        assert!(
            records.lock().await.is_empty(),
            "executor must not spawn when the validator rejects the call"
        );
    }
}