use crate::error::MultiError;
use crate::mailbox::Mailbox;
use crate::runner::AgentRunner;
use crate::shared::SharedInfra;
use crate::types::AgentSpec;
use car_engine::ToolExecutor;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::instrument;
pub const SPAWN_SUBTASK_TOOL: &str = "spawn_subtask";
const DEFAULT_SUBAGENT_PROMPT: &str =
"You are a focused sub-agent. Complete the single task you are given using \
only the tools provided, then return a concise result.";
const DEFAULT_SUBAGENT_MAX_TURNS: u32 = 10;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubtaskRecord {
pub name: String,
pub task: String,
pub tools: Vec<String>,
pub result: String,
pub success: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnSubtaskResult {
pub task: String,
pub final_answer: String,
pub subtasks: Vec<SubtaskRecord>,
}
pub struct SpawnSubtask {
pub main: AgentSpec,
subagent_prompt: String,
subagent_max_turns: u32,
}
impl SpawnSubtask {
pub fn new(main: AgentSpec) -> Self {
Self {
main,
subagent_prompt: DEFAULT_SUBAGENT_PROMPT.to_string(),
subagent_max_turns: DEFAULT_SUBAGENT_MAX_TURNS,
}
}
#[instrument(name = "multi.spawn_subtask", skip_all)]
pub async fn run(
&self,
task: &str,
runner: &Arc<dyn AgentRunner>,
infra: &SharedInfra,
) -> Result<SpawnSubtaskResult, MultiError> {
let records = Arc::new(Mutex::new(Vec::<SubtaskRecord>::new()));
let granted: Vec<String> = self
.main
.tools
.iter()
.filter(|t| t.as_str() != SPAWN_SUBTASK_TOOL)
.cloned()
.collect();
let rt = infra.make_runtime();
for tool in &granted {
rt.register_tool(tool).await;
}
rt.register_tool_schema(spawn_subtask_schema(&granted)).await;
let executor = Arc::new(SpawnSubtaskExecutor {
parent_tools: granted.into_iter().collect(),
subagent_prompt: self.subagent_prompt.clone(),
subagent_max_turns: self.subagent_max_turns,
runner: Arc::clone(runner),
infra_state: Arc::clone(&infra.state),
infra_log: Arc::clone(&infra.log),
infra_policies: Arc::clone(&infra.policies),
budget: Arc::clone(&infra.budget),
records: Arc::clone(&records),
});
rt.set_executor(executor).await;
let mailbox = Mailbox::default();
let output = runner
.run(&self.main, task, &rt, &mailbox)
.await
.map_err(|e| MultiError::AgentFailed(self.main.name.clone(), e.to_string()))?;
let subtasks = records.lock().await.clone();
Ok(SpawnSubtaskResult {
task: task.to_string(),
final_answer: output.answer,
subtasks,
})
}
}
pub fn spawn_subtask_schema(parent_tools: &[String]) -> car_ir::ToolSchema {
car_ir::ToolSchema {
name: "spawn_subtask".to_string(),
description: "Spawn an isolated sub-agent to handle one focused subtask. \
The sub-agent may only use a subset of the tools you yourself have."
.to_string(),
parameters: serde_json::json!({
"type": "object",
"properties": {
"task": {
"type": "string",
"description": "The single, self-contained task for the sub-agent."
},
"tools": {
"type": "array",
"items": { "type": "string", "enum": parent_tools },
"description": "Tools to grant the sub-agent. Must be a subset of your own tools."
},
"name": {
"type": "string",
"description": "Short label for the sub-agent (for logs)."
}
},
"required": ["task", "tools"]
}),
returns: None,
idempotent: false,
cache_ttl_secs: None,
rate_limit: None,
}
}
struct SpawnSubtaskExecutor {
parent_tools: HashSet<String>,
subagent_prompt: String,
subagent_max_turns: u32,
runner: Arc<dyn AgentRunner>,
infra_state: Arc<car_state::StateStore>,
infra_log: Arc<tokio::sync::Mutex<car_eventlog::EventLog>>,
infra_policies: Arc<tokio::sync::RwLock<car_policy::PolicyEngine>>,
budget: Arc<crate::budget::CoordinationBudget>,
records: Arc<Mutex<Vec<SubtaskRecord>>>,
}
#[async_trait::async_trait]
impl ToolExecutor for SpawnSubtaskExecutor {
async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
if tool != "spawn_subtask" {
return Err(format!("unknown tool: {}", tool));
}
let task = params
.get("task")
.and_then(|v| v.as_str())
.ok_or("spawn_subtask requires 'task' parameter")?;
let mut seen = HashSet::new();
let requested: Vec<String> = params
.get("tools")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.filter(|t| seen.insert(t.clone()))
.collect()
})
.unwrap_or_default();
let name = params
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("subtask")
.to_string();
let escalations: Vec<String> = requested
.iter()
.filter(|t| !self.parent_tools.contains(*t))
.cloned()
.collect();
if !escalations.is_empty() {
return Err(format!(
"privilege escalation rejected: sub-agent tools {:?} are not a subset of the parent's tools",
escalations
));
}
let spec = AgentSpec {
name: name.clone(),
system_prompt: self.subagent_prompt.clone(),
tools: requested.clone(),
max_turns: self.subagent_max_turns,
metadata: std::collections::HashMap::new(),
cache_control: false,
};
if let Err(e) = self.budget.try_begin_agent() {
let msg = e.to_string();
self.records.lock().await.push(SubtaskRecord {
name,
task: task.to_string(),
tools: requested,
result: msg.clone(),
success: false,
});
return Ok(Value::String(msg));
}
let infra = SharedInfra {
state: Arc::clone(&self.infra_state),
log: Arc::clone(&self.infra_log),
policies: Arc::clone(&self.infra_policies),
budget: Arc::clone(&self.budget),
};
let rt = infra.make_runtime();
for tool_name in &requested {
rt.register_tool(tool_name).await;
}
let mailbox = Mailbox::default();
match self.runner.run(&spec, task, &rt, &mailbox).await {
Ok(output) => {
self.budget.record_output(&output);
self.records.lock().await.push(SubtaskRecord {
name,
task: task.to_string(),
tools: requested,
result: output.answer.clone(),
success: true,
});
Ok(Value::String(output.answer))
}
Err(e) => {
let msg = format!("sub-agent '{}' failed: {}", name, e);
self.records.lock().await.push(SubtaskRecord {
name,
task: task.to_string(),
tools: requested,
result: msg.clone(),
success: false,
});
Ok(Value::String(msg))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{AgentOutput, AgentSpec};
use car_engine::Runtime;
struct SimpleRunner;
#[async_trait::async_trait]
impl AgentRunner for SimpleRunner {
async fn run(
&self,
spec: &AgentSpec,
task: &str,
_runtime: &Runtime,
_mailbox: &Mailbox,
) -> Result<AgentOutput, MultiError> {
Ok(AgentOutput {
name: spec.name.clone(),
answer: format!("{} handled: {}", spec.name, &task[..task.len().min(40)]),
turns: 1,
tool_calls: 0,
duration_ms: 1.0,
error: None,
outcome: None,
tokens: None,
tools_used: Vec::new(),
})
}
}
fn test_executor() -> SpawnSubtaskExecutor {
let infra = SharedInfra::new();
SpawnSubtaskExecutor {
parent_tools: ["fetch", "search"].iter().map(|s| s.to_string()).collect(),
subagent_prompt: DEFAULT_SUBAGENT_PROMPT.to_string(),
subagent_max_turns: 5,
runner: Arc::new(SimpleRunner),
infra_state: infra.state,
infra_log: infra.log,
infra_policies: infra.policies,
budget: infra.budget,
records: Arc::new(Mutex::new(Vec::new())),
}
}
#[test]
fn schema_enum_lists_parent_tools_only() {
let schema = spawn_subtask_schema(&["fetch".into(), "search".into()]);
let enum_vals = schema.parameters["properties"]["tools"]["items"]["enum"]
.as_array()
.unwrap();
assert_eq!(enum_vals.len(), 2);
assert!(enum_vals.iter().any(|v| v == "fetch"));
assert!(enum_vals.iter().any(|v| v == "search"));
}
#[tokio::test]
async fn subset_call_spawns_subagent() {
let exec = test_executor();
let out = exec
.execute(
"spawn_subtask",
&serde_json::json!({ "task": "grab the page", "tools": ["fetch"], "name": "scraper" }),
)
.await
.unwrap();
assert!(out.as_str().unwrap().contains("scraper handled"));
let records = exec.records.lock().await;
assert_eq!(records.len(), 1);
assert!(records[0].success);
assert_eq!(records[0].tools, vec!["fetch".to_string()]);
}
#[tokio::test]
async fn escalation_is_rejected() {
let exec = test_executor();
let err = exec
.execute(
"spawn_subtask",
&serde_json::json!({ "task": "do admin", "tools": ["fetch", "delete_everything"] }),
)
.await
.unwrap_err();
assert!(err.contains("privilege escalation"));
assert!(err.contains("delete_everything"));
assert!(exec.records.lock().await.is_empty());
}
#[tokio::test]
async fn unknown_tool_is_rejected() {
let exec = test_executor();
let err = exec
.execute("not_spawn", &serde_json::json!({}))
.await
.unwrap_err();
assert!(err.contains("unknown tool"));
}
fn spawn_proposal(params: Value) -> car_ir::ActionProposal {
let parameters = params
.as_object()
.unwrap()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
car_ir::ActionProposal {
id: "p1".into(),
source: "test".into(),
actions: vec![car_ir::Action {
id: "a1".into(),
action_type: car_ir::ActionType::ToolCall,
tool: Some("spawn_subtask".into()),
parameters,
preconditions: vec![],
expected_effects: std::collections::HashMap::new(),
state_dependencies: vec![],
read_set: vec![],
write_set: vec![],
assumptions: vec![],
idempotent: false,
max_retries: 0,
failure_behavior: car_ir::FailureBehavior::Skip,
timeout_ms: None,
metadata: std::collections::HashMap::new(),
}],
timestamp: chrono::Utc::now(),
context: std::collections::HashMap::new(),
}
}
struct SpawningRunner;
#[async_trait::async_trait]
impl AgentRunner for SpawningRunner {
async fn run(
&self,
spec: &AgentSpec,
task: &str,
runtime: &Runtime,
_mailbox: &Mailbox,
) -> Result<AgentOutput, MultiError> {
if spec.name == "lead" {
let proposal = spawn_proposal(serde_json::json!({
"task": "subtask work", "tools": ["fetch"], "name": "helper"
}));
let _ = runtime.execute(&proposal).await;
}
Ok(AgentOutput {
name: spec.name.clone(),
answer: format!("{} done: {}", spec.name, &task[..task.len().min(20)]),
turns: 1,
tool_calls: 0,
duration_ms: 1.0,
error: None,
outcome: None,
tokens: None,
tools_used: Vec::new(),
})
}
}
#[tokio::test]
async fn end_to_end_run_records_subtasks() {
let main =
AgentSpec::new("lead", "lead").with_tools(vec!["fetch".into(), "search".into()]);
let runner: Arc<dyn AgentRunner> = Arc::new(SpawningRunner);
let infra = SharedInfra::new();
let result = SpawnSubtask::new(main)
.run("build it", &runner, &infra)
.await
.unwrap();
assert!(result.final_answer.contains("lead done"));
assert_eq!(result.subtasks.len(), 1);
assert_eq!(result.subtasks[0].name, "helper");
assert!(result.subtasks[0].success);
}
#[tokio::test]
async fn reserved_meta_tool_is_not_granted_to_subagents() {
let main = AgentSpec::new("lead", "lead")
.with_tools(vec!["fetch".into(), SPAWN_SUBTASK_TOOL.into()]);
let infra = SharedInfra::new();
let rt = infra.make_runtime();
let granted: Vec<String> = main
.tools
.iter()
.filter(|t| t.as_str() != SPAWN_SUBTASK_TOOL)
.cloned()
.collect();
assert_eq!(granted, vec!["fetch".to_string()]);
let schema = spawn_subtask_schema(&granted);
let enum_vals = schema.parameters["properties"]["tools"]["items"]["enum"]
.as_array()
.unwrap();
assert!(!enum_vals.iter().any(|v| v == SPAWN_SUBTASK_TOOL));
rt.register_tool_schema(spawn_subtask_schema(&granted)).await;
let _ = rt; let exec = SpawnSubtaskExecutor {
parent_tools: granted.into_iter().collect(),
subagent_prompt: DEFAULT_SUBAGENT_PROMPT.to_string(),
subagent_max_turns: 5,
runner: Arc::new(SimpleRunner),
infra_state: infra.state,
infra_log: infra.log,
infra_policies: infra.policies,
budget: infra.budget,
records: Arc::new(Mutex::new(Vec::new())),
};
let err = exec
.execute(
"spawn_subtask",
&serde_json::json!({ "task": "recurse", "tools": [SPAWN_SUBTASK_TOOL] }),
)
.await
.unwrap_err();
assert!(err.contains("privilege escalation"));
}
#[tokio::test]
async fn validator_rejects_out_of_subset_tool_via_schema_enum() {
let records: Arc<Mutex<Vec<SubtaskRecord>>> = Arc::new(Mutex::new(Vec::new()));
let infra = SharedInfra::new();
let rt = infra.make_runtime();
rt.register_tool_schema(spawn_subtask_schema(&["fetch".into(), "search".into()]))
.await;
let exec = Arc::new(SpawnSubtaskExecutor {
parent_tools: ["fetch", "search"].iter().map(|s| s.to_string()).collect(),
subagent_prompt: DEFAULT_SUBAGENT_PROMPT.to_string(),
subagent_max_turns: 5,
runner: Arc::new(SimpleRunner),
infra_state: Arc::clone(&infra.state),
infra_log: Arc::clone(&infra.log),
infra_policies: Arc::clone(&infra.policies),
budget: Arc::clone(&infra.budget),
records: Arc::clone(&records),
});
rt.set_executor(exec).await;
let proposal = spawn_proposal(serde_json::json!({
"task": "escalate", "tools": ["delete_everything"]
}));
let result = rt.execute(&proposal).await;
assert!(!result.all_succeeded(), "out-of-subset tool must not succeed");
assert!(
records.lock().await.is_empty(),
"executor must not spawn when the validator rejects the call"
);
}
}