use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::Arc;
use agent_client_protocol_schema::{
Content, ContentBlock, SessionId, TextContent, ToolCallContent, ToolCallUpdateFields, ToolKind,
};
use futures::StreamExt;
use futures::future::BoxFuture;
use serde::Deserialize;
use serde_json::json;
use crate::error::BoxError;
use crate::event::AgentEvent;
use crate::hooks::{HookEngine, NoopHookEngine};
use crate::llm::{HostedCapabilities, MessageContent, ProviderRegistry, Role, SamplingParams};
use crate::policy::{NonInteractivePolicy, SandboxPolicy};
use crate::session::{
EventEmitter, History, PermissionGate, RequestAuditTracker, StaticToolRegistry, ToolRegistry,
TurnConfig, TurnRequestLimit, TurnRunner, VecHistory,
};
use crate::tool::{
SafetyClass, Tool, ToolCallDescription, ToolContext, ToolError, ToolEvent, ToolSchema,
ToolStream,
};
pub(crate) const SPAWN_AGENT_TOOL_NAME: &str = "spawn_agent";
#[derive(Clone)]
pub struct SubagentProfile {
pub description: String,
pub model: Option<String>,
pub system_prompt: String,
pub tool_allow: Vec<String>,
pub sampling: Option<SamplingParams>,
pub hooks: Option<Arc<dyn HookEngine>>,
}
impl std::fmt::Debug for SubagentProfile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SubagentProfile")
.field("description", &self.description)
.field("model", &self.model)
.field("system_prompt", &self.system_prompt)
.field("tool_allow", &self.tool_allow)
.field("sampling", &self.sampling)
.field("hooks", &self.hooks.as_ref().map(|_| "<engine>"))
.finish()
}
}
pub struct SpawnAgentTool {
schema: ToolSchema,
profiles: Arc<BTreeMap<String, SubagentProfile>>,
registry: Arc<ProviderRegistry>,
policy: Arc<dyn SandboxPolicy>,
process_tools: Arc<dyn ToolRegistry>,
base_prompt: Option<String>,
}
impl SpawnAgentTool {
pub fn new(
profiles: Arc<BTreeMap<String, SubagentProfile>>,
registry: Arc<ProviderRegistry>,
policy: Arc<dyn SandboxPolicy>,
process_tools: Arc<dyn ToolRegistry>,
base_prompt: Option<String>,
) -> Self {
let schema = build_schema(&profiles);
Self {
schema,
profiles,
registry,
policy,
process_tools,
base_prompt,
}
}
pub fn has_profiles(profiles: &BTreeMap<String, SubagentProfile>) -> bool {
!profiles.is_empty()
}
}
fn build_schema(profiles: &BTreeMap<String, SubagentProfile>) -> ToolSchema {
let names: Vec<&str> = profiles.keys().map(String::as_str).collect();
let catalog = profiles
.iter()
.map(|(name, p)| format!("- {name}: {}", p.description))
.collect::<Vec<_>>()
.join("\n");
let description = format!(
"Delegate a task to a specialized subagent that runs in a fresh, isolated context. \
The subagent returns only its final summary, not its intermediate work. \
Pick the profile whose description best matches the task.\n\n\
When you have multiple independent pieces of work, emit several `spawn_agent` \
calls in a single message: they run concurrently (fanout), so the total wait is \
the slowest subagent rather than their sum. Only spawn one at a time when a later \
task genuinely depends on an earlier subagent's result.\n\n\
Available profiles:\n{catalog}"
);
ToolSchema {
name: SPAWN_AGENT_TOOL_NAME.to_string(),
description,
input_schema: json!({
"type": "object",
"properties": {
"profile": {
"type": "string",
"enum": names,
"description": "Which subagent to spawn. See the tool description for what each profile does."
},
"task": {
"type": "string",
"description": "The complete task for the subagent, as a self-contained \
natural-language instruction. The subagent has none of this \
conversation's context — include everything it needs."
},
"model": {
"type": "string",
"description": "Optional model override for this subagent. When omitted, \
the profile's configured model is used, falling back to the \
parent session's current model. Only set this when a task \
needs a specifically more or less capable model than the default."
},
"run_in_background": {
"type": "boolean",
"description": "When true, spawn the subagent asynchronously and return \
immediately with a task id, without waiting for it to finish. \
The subagent's result is delivered back to you later, on a \
subsequent turn, so you can keep working in the meantime. \
Leave false (the default) when the next step depends on this \
subagent's result — then the call blocks until it completes."
}
},
"required": ["profile", "task"]
}),
}
}
#[derive(Debug, Deserialize)]
struct SpawnArgs {
profile: String,
task: String,
#[serde(default)]
model: Option<String>,
#[serde(default)]
run_in_background: bool,
}
impl Tool for SpawnAgentTool {
fn schema(&self) -> &ToolSchema {
&self.schema
}
fn safety_hint(&self, _args: &serde_json::Value) -> SafetyClass {
SafetyClass::Mutating
}
fn describe<'a>(
&'a self,
args: &'a serde_json::Value,
_ctx: ToolContext<'a>,
) -> BoxFuture<'a, ToolCallDescription> {
Box::pin(async move {
let profile = args.get("profile").and_then(|v| v.as_str()).unwrap_or("?");
let mut fields = ToolCallUpdateFields::default();
fields.title = Some(format!("Spawn subagent `{profile}`"));
fields.kind = Some(ToolKind::Think);
ToolCallDescription { fields }
})
}
fn execute(&self, args: serde_json::Value, ctx: ToolContext<'_>) -> ToolStream {
let profiles = self.profiles.clone();
let registry = self.registry.clone();
let policy = ctx.policy.clone().unwrap_or_else(|| self.policy.clone());
let process_tools = self.process_tools.clone();
let base_prompt = self.base_prompt.clone();
let cwd = ctx.cwd.to_path_buf();
let fs = ctx.fs.clone();
let shell = ctx.shell.clone();
let http = ctx.http.clone();
let parent_model = ctx.current_model.to_string();
let parent_provider = ctx.current_provider.to_string();
let background = ctx.background.clone();
let bridge = ctx.subagent_bridge.clone();
let subagent_depth = ctx.subagent_depth;
let turn_cancel = ctx.cancel.child_token();
let parsed: Result<SpawnArgs, _> = serde_json::from_value(args.clone());
let fut = async move {
let parsed = match parsed {
Ok(p) => p,
Err(err) => return ToolEvent::Failed(ToolError::InvalidArgs(BoxError::new(err))),
};
if subagent_depth == 0 {
return ToolEvent::Failed(ToolError::InvalidArgs(BoxError::new(io_err(
"subagent recursion depth exhausted: this agent is not allowed to spawn \
further subagents"
.to_string(),
))));
}
if parsed.run_in_background {
let Some(bg) = background else {
return ToolEvent::Failed(ToolError::InvalidArgs(BoxError::new(io_err(
"run_in_background is not available in this context (nested subagents \
cannot spawn background tasks)"
.to_string(),
))));
};
let label = parsed.profile.clone();
let deps = SubagentDeps {
profiles,
registry,
policy,
process_tools,
base_prompt,
cwd,
fs,
shell,
http,
parent_model,
parent_provider,
subagent_depth,
bridge,
task_handle: None,
};
let label_for_log = parsed.profile.clone();
let task_id = bg.spawn(label, move |task_cancel, task_handle| async move {
let mut deps = deps;
deps.task_handle = Some(task_handle);
match run_subagent_core(parsed, deps, task_cancel).await {
Ok(answer) => crate::session::BackgroundResult::Completed(answer),
Err(err) => {
tracing::warn!(
profile = %label_for_log,
error = %err,
"background subagent failed"
);
crate::session::BackgroundResult::Failed(err.to_string())
}
}
});
let msg = format!(
"Started background subagent `{}`, task id `{}`. Its result will arrive on a \
later turn.",
parsed_profile_for_msg(&args),
task_id
);
let mut fields = ToolCallUpdateFields::default();
fields.content = Some(vec![ToolCallContent::Content(Content::new(
ContentBlock::Text(TextContent::new(msg.clone())),
))]);
fields.raw_output = Some(serde_json::Value::String(msg));
return ToolEvent::Completed(fields);
}
let deps = SubagentDeps {
profiles,
registry,
policy,
process_tools,
base_prompt,
cwd,
fs,
shell,
http,
parent_model,
parent_provider,
subagent_depth,
bridge,
task_handle: None,
};
match run_subagent_core(parsed, deps, turn_cancel).await {
Ok(answer) => {
let mut fields = ToolCallUpdateFields::default();
fields.content = Some(vec![ToolCallContent::Content(Content::new(
ContentBlock::Text(TextContent::new(answer.clone())),
))]);
fields.raw_output = Some(serde_json::Value::String(answer));
ToolEvent::Completed(fields)
}
Err(err) => ToolEvent::Failed(err),
}
};
let s: Pin<Box<dyn futures::Stream<Item = ToolEvent> + Send>> =
Box::pin(futures::stream::once(fut));
s
}
}
struct SubagentDeps {
profiles: Arc<BTreeMap<String, SubagentProfile>>,
registry: Arc<ProviderRegistry>,
policy: Arc<dyn SandboxPolicy>,
process_tools: Arc<dyn ToolRegistry>,
base_prompt: Option<String>,
cwd: std::path::PathBuf,
fs: Arc<dyn crate::fs::FsBackend>,
shell: Arc<dyn crate::shell::ShellBackend>,
http: Arc<dyn crate::http::HttpClient>,
parent_model: String,
parent_provider: String,
subagent_depth: u32,
bridge: Option<crate::tool::SubagentBridge>,
task_handle: Option<crate::session::TaskHandle>,
}
fn parsed_profile_for_msg(args: &serde_json::Value) -> String {
args.get("profile")
.and_then(|v| v.as_str())
.unwrap_or("?")
.to_string()
}
async fn run_subagent_core(
parsed: SpawnArgs,
deps: SubagentDeps,
cancel: tokio_util::sync::CancellationToken,
) -> Result<String, ToolError> {
let SubagentDeps {
profiles,
registry,
policy,
process_tools,
base_prompt,
cwd,
fs,
shell,
http,
parent_model,
parent_provider,
subagent_depth,
bridge,
task_handle,
} = deps;
let Some(profile) = profiles.get(&parsed.profile) else {
return Err(ToolError::InvalidArgs(BoxError::new(io_err(format!(
"unknown profile `{}`; available: {}",
parsed.profile,
profiles.keys().cloned().collect::<Vec<_>>().join(", ")
)))));
};
let model_override = parsed.model.clone().or_else(|| profile.model.clone());
let inherits_parent = model_override.is_none();
let model = model_override.unwrap_or(parent_model);
let entry = if inherits_parent && !parent_provider.is_empty() {
registry.entry_for(&parent_provider, &model)
} else {
registry.first_entry_for_model(&model)
};
let Some(entry) = entry else {
return Err(ToolError::Execution(BoxError::new(io_err(format!(
"subagent model `{model}` is not declared by any provider entry"
)))));
};
let provider = entry.provider().clone();
let child_depth = subagent_depth - 1;
let mut builder = StaticToolRegistry::builder();
for name in &profile.tool_allow {
if name == SPAWN_AGENT_TOOL_NAME {
if child_depth > 0 {
let child_spawn = SpawnAgentTool::new(
profiles.clone(),
registry.clone(),
policy.clone(),
process_tools.clone(),
base_prompt.clone(),
);
builder = builder.insert(Arc::new(child_spawn));
}
continue;
}
match process_tools.get(name) {
Some(tool) => builder = builder.insert(tool),
None => {
return Err(ToolError::InvalidArgs(BoxError::new(io_err(format!(
"profile `{}` allows unknown tool `{name}`",
parsed.profile
)))));
}
}
}
let sub_tools = builder.build();
let mut sections = Vec::new();
if let Some(bp) = base_prompt.as_deref()
&& !bp.is_empty()
{
sections.push(bp.to_string());
}
if !profile.system_prompt.is_empty() {
sections.push(profile.system_prompt.clone());
}
let system_prompt: Option<Arc<str>> =
(!sections.is_empty()).then(|| Arc::from(sections.join("\n\n").as_str()));
let history: Arc<dyn History> = Arc::new(VecHistory::new());
if let Some(handle) = &task_handle {
handle.attach_history(history.clone());
}
let events = Arc::new(EventEmitter::new());
let bridge_task = bridge.map(|b| {
let mut sub_events = events.subscribe();
let agent_type = parsed.profile.clone();
tokio::spawn(async move {
while let Some(ev) = sub_events.next().await {
let forwarded = match ev {
AgentEvent::Subagent {
mut ancestor_path,
agent_type: deeper,
inner,
} => {
ancestor_path.insert(0, b.parent_tool_call_id.clone());
AgentEvent::Subagent {
ancestor_path,
agent_type: deeper,
inner,
}
}
leaf => AgentEvent::Subagent {
ancestor_path: vec![b.parent_tool_call_id.clone()],
agent_type: agent_type.clone(),
inner: Box::new(leaf),
},
};
b.parent_events.emit(forwarded).await;
}
})
});
let permissions = PermissionGate::new();
let sub_policy: Arc<dyn SandboxPolicy> = Arc::new(NonInteractivePolicy::new(policy));
let noop = NoopHookEngine;
let hooks: &dyn HookEngine = match &profile.hooks {
Some(engine) => engine.as_ref(),
None => &noop,
};
let session_id = SessionId::new(format!("subagent-{}", parsed.profile));
let audit = RequestAuditTracker::new();
let config = TurnConfig {
model: model.clone(),
sampling: profile.sampling.clone().unwrap_or_default(),
request_limit: TurnRequestLimit::Fixed(32),
subagent_max_depth: child_depth,
..TurnConfig::default()
};
let runner = TurnRunner {
history: history.as_ref(),
tools: &sub_tools,
provider: provider.as_ref(),
policy: sub_policy,
events: events.clone(),
permissions: &permissions,
cancel: cancel.clone(),
config: &config,
system_prompt,
cwd: &cwd,
fs,
shell,
http,
hosted_capabilities: HostedCapabilities::default(),
hooks,
session_id: &session_id,
request_audit: &audit,
background: None,
goal: None,
compaction_slot: None,
history_arc: None,
provider_arc: Some(provider.clone()),
session_cancel: None,
ingest_source: crate::hooks::step::IngestSource::User,
};
let prompt = vec![ContentBlock::Text(TextContent::new(parsed.task))];
let run_result = runner.run(prompt).await;
drop(runner);
drop(events);
if let Some(task) = bridge_task {
let _ = task.await;
}
if let Err(err) = run_result {
return Err(ToolError::Execution(BoxError::new(io_err(format!(
"subagent turn failed: {err}"
)))));
}
Ok(last_assistant_text(&history.snapshot()))
}
fn last_assistant_text(history: &[crate::llm::Message]) -> String {
history
.iter()
.rev()
.find(|m| m.role == Role::Assistant)
.map(|m| {
m.content
.iter()
.filter_map(|c| match c {
MessageContent::Text { text } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("")
})
.unwrap_or_default()
}
fn io_err(msg: String) -> std::io::Error {
std::io::Error::other(msg)
}
#[cfg(test)]
mod tests;