use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::Value;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use crate::error::{CoreError, Result as CoreResult};
use crate::event::EventSink;
use crate::message::{AgentMessage, ContentBlock, Role};
use crate::model::{Model, ModelProvider};
use crate::policy::ToolPolicy;
use crate::runner::{run_agent, RunConfig, RunOutcome};
use crate::tool::{InvokeContext, Tool, ToolDefinition, ToolResult};
/// Default for [`SubagentOptions::max_depth`]: the nesting depth at which the
/// `task` tool starts rejecting delegations.
pub const DEFAULT_MAX_DEPTH: usize = 5;
/// Default for [`SubagentOptions::max_delegations`]: the initial value of the
/// delegation counter shared by a top-level `task` tool and its nested tools.
pub const DEFAULT_MAX_DELEGATIONS: usize = 64;
/// Declarative description of a subagent that a [`TaskTool`] can delegate to.
#[derive(Clone)]
pub struct SubagentProfile {
/// Name matched against the `agent` argument of the `task` tool.
pub name: String,
/// Guidance listed next to the name in the `task` tool description; may be empty.
pub description: String,
/// Text sent as the system message of the subagent's run.
pub instructions: String,
/// Model override; when `None` the delegating tool's model is used.
pub model: Option<Model>,
/// Run-config override; when `None` the delegating tool's config is used.
pub config: Option<RunConfig>,
/// Tools handed to the subagent's run.
pub tools: Vec<Arc<dyn Tool>>,
/// Subagents this subagent may itself delegate to through a nested `task` tool.
pub subagents: Vec<SubagentProfile>,
}
impl SubagentProfile {
    /// Creates a profile from a name and the instructions for the subagent.
    ///
    /// The description starts empty, no model or config override is set, and
    /// both the tool list and the nested subagent list start empty.
    #[must_use]
    pub fn new(name: impl Into<String>, instructions: impl Into<String>) -> Self {
        let name = name.into();
        let instructions = instructions.into();
        Self {
            name,
            description: String::new(),
            instructions,
            model: None,
            config: None,
            tools: Vec::new(),
            subagents: Vec::new(),
        }
    }

    /// Replaces the description listed in the `task` tool definition.
    #[must_use]
    pub fn with_description(self, description: impl Into<String>) -> Self {
        Self {
            description: description.into(),
            ..self
        }
    }

    /// Sets the model override for this subagent.
    #[must_use]
    pub fn with_model(self, model: Model) -> Self {
        Self {
            model: Some(model),
            ..self
        }
    }

    /// Sets the run-config override for this subagent.
    #[must_use]
    pub fn with_config(self, config: RunConfig) -> Self {
        Self {
            config: Some(config),
            ..self
        }
    }

    /// Appends one tool to the subagent's tool list.
    #[must_use]
    pub fn with_tool(self, tool: Arc<dyn Tool>) -> Self {
        let mut profile = self;
        profile.tools.push(tool);
        profile
    }

    /// Appends one nested subagent this subagent may delegate to.
    #[must_use]
    pub fn with_subagent(self, subagent: SubagentProfile) -> Self {
        let mut profile = self;
        profile.subagents.push(subagent);
        profile
    }
}
/// Limits applied to the tree of delegations started from one top-level [`TaskTool`].
#[derive(Clone, Copy, Debug)]
pub struct SubagentOptions {
/// Depth at which the `task` tool rejects delegations; the top-level tool is at depth 0.
pub max_depth: usize,
/// Initial value of the delegation counter shared by the top-level tool and every nested tool.
pub max_delegations: usize,
}
impl Default for SubagentOptions {
    /// Returns options built from [`DEFAULT_MAX_DEPTH`] and
    /// [`DEFAULT_MAX_DELEGATIONS`].
    fn default() -> Self {
        let max_depth = DEFAULT_MAX_DEPTH;
        let max_delegations = DEFAULT_MAX_DELEGATIONS;
        Self {
            max_depth,
            max_delegations,
        }
    }
}
pub struct TaskTool {
provider: Arc<dyn ModelProvider>,
parent_model: Model,
parent_config: RunConfig,
subagents: Vec<SubagentProfile>,
max_depth: usize,
depth: usize,
cancel: CancellationToken,
event_sink: Option<Arc<dyn EventSink>>,
policy: Option<Arc<dyn ToolPolicy>>,
remaining_delegations: Arc<AtomicUsize>,
}
impl TaskTool {
#[allow(clippy::too_many_arguments)]
#[must_use]
pub fn new(
provider: Arc<dyn ModelProvider>,
parent_model: Model,
parent_config: RunConfig,
subagents: Vec<SubagentProfile>,
options: SubagentOptions,
cancel: CancellationToken,
event_sink: Option<Arc<dyn EventSink>>,
policy: Option<Arc<dyn ToolPolicy>>,
) -> Self {
Self {
provider,
parent_model,
parent_config,
subagents,
max_depth: options.max_depth,
depth: 0,
cancel,
event_sink,
policy,
remaining_delegations: Arc::new(AtomicUsize::new(options.max_delegations)),
}
}
fn child(
&self,
subagents: Vec<SubagentProfile>,
parent_model: Model,
parent_config: RunConfig,
) -> Self {
Self {
provider: Arc::clone(&self.provider),
parent_model,
parent_config,
subagents,
max_depth: self.max_depth,
depth: self.depth + 1,
cancel: self.cancel.clone(),
event_sink: self.event_sink.as_ref().map(Arc::clone),
policy: self.policy.as_ref().map(Arc::clone),
remaining_delegations: Arc::clone(&self.remaining_delegations),
}
}
fn resolve(&self, name: &str) -> Option<&SubagentProfile> {
self.subagents.iter().find(|s| s.name == name)
}
fn max_delegations_hint(&self) -> usize {
self.remaining_delegations.load(Ordering::Relaxed) + 1
}
async fn delegate(&self, profile: &SubagentProfile, prompt: String) -> CoreResult<RunOutcome> {
let child_model = profile
.model
.clone()
.unwrap_or_else(|| self.parent_model.clone());
let child_config = profile
.config
.clone()
.unwrap_or_else(|| self.parent_config.clone());
let mut child_tools: Vec<Arc<dyn Tool>> = Vec::new();
if !profile.subagents.is_empty() {
let child_task = self.child(
profile.subagents.clone(),
child_model.clone(),
child_config.clone(),
);
child_tools.push(Arc::new(child_task));
}
child_tools.extend(profile.tools.clone());
let child_session = Uuid::new_v4();
let mut child_messages = vec![
AgentMessage {
role: Role::System,
content: vec![ContentBlock::Text {
text: profile.instructions.clone(),
}],
},
AgentMessage {
role: Role::User,
content: vec![ContentBlock::Text { text: prompt }],
},
];
let child_hooks = crate::event::RunHooks {
session_id: Some(child_session),
turn_sink: None,
event_sink: self.event_sink.as_deref(),
policy: self.policy.as_deref(),
};
run_agent(
self.provider.as_ref(),
&child_tools,
&mut child_messages,
&child_model,
&child_config,
&self.cancel,
&child_hooks,
)
.await
}
}
#[async_trait]
impl Tool for TaskTool {
fn definition(&self) -> ToolDefinition {
let mut desc = String::from(
"Delegate a focused subtask to a named subagent. The subagent runs \
in a fresh context and its answer is returned to you. Call this \
only when a declared subagent is well-suited to the work. \
Available subagents:",
);
if self.subagents.is_empty() {
desc.push_str(" (none declared)");
} else {
for s in &self.subagents {
let guidance = if s.description.trim().is_empty() {
"(no description provided)"
} else {
s.description.trim()
};
desc.push_str(&format!("\n - \"{}\": {}", s.name, guidance));
}
}
let mut fields = serde_json::Map::new();
fields.insert("type".into(), Value::String("object".into()));
fields.insert(
"properties".into(),
serde_json::json!({
"agent": {
"type": "string",
"description": "The name of the declared subagent to delegate to."
},
"prompt": {
"type": "string",
"description": "The task to give the subagent (it sees this, not your conversation history)."
}
}),
);
fields.insert(
"required".into(),
Value::Array(vec![
Value::String("agent".into()),
Value::String("prompt".into()),
]),
);
ToolDefinition {
name: "task".into(),
label: "Task".into(),
description: desc,
parameters: crate::tool::ParameterSchema {
fields: fields.into_iter().collect(),
},
}
}
async fn execute(&self, ctx: InvokeContext, input: Value) -> CoreResult<ToolResult> {
let obj = input.as_object().ok_or_else(|| {
CoreError::ToolInputValidation("task tool expects an object input".into())
})?;
let agent = obj.get("agent").and_then(Value::as_str).ok_or_else(|| {
CoreError::ToolInputValidation("task tool requires a string `agent`".into())
})?;
let prompt = obj.get("prompt").and_then(Value::as_str).ok_or_else(|| {
CoreError::ToolInputValidation("task tool requires a string `prompt`".into())
})?;
let profile = match self.resolve(agent) {
Some(p) => p,
None => {
let known: Vec<&str> = self.subagents.iter().map(|s| s.name.as_str()).collect();
return Err(CoreError::ToolInputValidation(format!(
"subagent not declared: \"{agent}\" (known: {})",
known.join(", ")
)));
}
};
let prev = self.remaining_delegations.fetch_sub(1, Ordering::Relaxed);
if prev == 0 {
self.remaining_delegations.fetch_add(1, Ordering::Relaxed);
return Err(CoreError::ToolInputValidation(format!(
"delegation budget exhausted (max {} total delegations across the tree)",
self.max_delegations_hint()
)));
}
if self.depth >= self.max_depth {
self.remaining_delegations.fetch_add(1, Ordering::Relaxed);
return Err(CoreError::ToolInputValidation(format!(
"delegation depth exceeded (depth {} >= max_depth {})",
self.depth, self.max_depth
)));
}
if ctx.cancel.is_cancelled() {
return Err(CoreError::Cancelled("task delegation cancelled".into()));
}
let outcome = self.delegate(profile, prompt.to_string()).await?;
Ok(ToolResult {
content: vec![serde_json::json!({
"type": "text",
"text": if outcome.final_text.trim().is_empty() {
"(subagent returned no text)".to_string()
} else {
outcome.final_text
},
})],
details: None,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::{ModelRequest, ModelResponse};
/// Builds a profile with the given name and fixed placeholder instructions.
fn dummy_profile(name: &str) -> SubagentProfile {
    let instructions = "you are a helper";
    SubagentProfile::new(name, instructions)
}
/// Builds a depth-0 task tool over `profiles` with the default delegation
/// budget, the canned test provider, and no event sink or policy.
fn top_level_tool(profiles: Vec<SubagentProfile>, max_depth: usize) -> TaskTool {
    let model = Model {
        id: "test/model".into(),
    };
    let options = SubagentOptions {
        max_depth,
        max_delegations: DEFAULT_MAX_DELEGATIONS,
    };
    TaskTool::new(
        test_provider(),
        model,
        RunConfig::default(),
        profiles,
        options,
        CancellationToken::new(),
        None,
        None,
    )
}
fn test_provider() -> Arc<dyn ModelProvider> {
use async_trait::async_trait;
struct TestProvider;
#[async_trait]
impl ModelProvider for TestProvider {
async fn invoke(
&self,
_request: crate::model::ModelRequest,
) -> CoreResult<crate::model::ModelResponse> {
Ok(crate::model::ModelResponse {
messages: vec![crate::message::AgentMessage {
role: crate::message::Role::Assistant,
content: vec![crate::message::ContentBlock::Text {
text: "child done".into(),
}],
}],
})
}
}
Arc::new(TestProvider)
}
/// The tool description names every declared subagent with its description.
#[test]
fn definition_lists_declared_subagents() {
    let reviewer = dummy_profile("reviewer").with_description("Review changes.");
    let classifier = dummy_profile("classifier").with_description("Classify issues.");
    let def = top_level_tool(vec![reviewer, classifier], DEFAULT_MAX_DEPTH).definition();

    assert_eq!(def.name, "task");
    assert_eq!(def.label, "Task");

    let description = def.description.as_str();
    assert!(description.contains("\"reviewer\""), "missing reviewer");
    assert!(description.contains("Review changes."));
    assert!(description.contains("\"classifier\""));
    assert!(description.contains("Classify issues."));
}
/// With no subagents declared, the description says so explicitly.
#[test]
fn definition_handles_no_subagents() {
    let description = top_level_tool(Vec::new(), DEFAULT_MAX_DEPTH)
        .definition()
        .description;
    assert!(description.contains("(none declared)"));
}
/// The parameter schema marks both `agent` and `prompt` as required.
#[test]
fn definition_schema_requires_agent_and_prompt() {
    let def = top_level_tool(vec![dummy_profile("a")], DEFAULT_MAX_DEPTH).definition();
    let required = def
        .parameters
        .fields
        .get("required")
        .and_then(Value::as_array)
        .expect("required array");

    let is_required = |field: &str| required.iter().any(|v| v.as_str() == Some(field));
    assert!(is_required("agent"));
    assert!(is_required("prompt"));
}
/// Asking for an undeclared subagent fails and the error lists the known names.
#[tokio::test]
async fn unknown_agent_returns_error() {
    let tool = top_level_tool(vec![dummy_profile("reviewer")], DEFAULT_MAX_DEPTH);
    let input = serde_json::json!({ "agent": "ghost", "prompt": "hi" });
    let ctx = InvokeContext {
        tool_call_id: "c1".into(),
        cancel: CancellationToken::new(),
    };

    let msg = tool
        .execute(ctx, input)
        .await
        .expect_err("unknown agent should error")
        .to_string();

    assert!(msg.contains("not declared"), "msg: {msg}");
    assert!(msg.contains("ghost"));
    assert!(msg.contains("reviewer"));
}
/// With `max_depth` 0 even the top-level tool refuses to delegate.
#[tokio::test]
async fn depth_exceeded_at_max_zero() {
    let tool = top_level_tool(vec![dummy_profile("a")], 0);
    let input = serde_json::json!({ "agent": "a", "prompt": "hi" });
    let ctx = InvokeContext {
        tool_call_id: "c2".into(),
        cancel: CancellationToken::new(),
    };

    let msg = tool
        .execute(ctx, input)
        .await
        .expect_err("depth should exceed")
        .to_string();

    assert!(msg.contains("depth exceeded"), "msg: {msg}");
    assert!(msg.contains("max_depth 0"));
}
/// With a budget of one, the first delegation succeeds and the second is refused.
#[tokio::test]
async fn budget_exhaustion_blocks_delegation() {
    let model = Model {
        id: "test/model".into(),
    };
    let options = SubagentOptions {
        max_depth: DEFAULT_MAX_DEPTH,
        max_delegations: 1,
    };
    let tool = TaskTool::new(
        test_provider(),
        model,
        RunConfig::default(),
        vec![dummy_profile("worker")],
        options,
        CancellationToken::new(),
        None,
        None,
    );

    // First call consumes the only available delegation.
    let first_ctx = InvokeContext {
        tool_call_id: "b1".into(),
        cancel: CancellationToken::new(),
    };
    let first_input = serde_json::json!({ "agent": "worker", "prompt": "go" });
    let first = tool
        .execute(first_ctx, first_input)
        .await
        .expect("first delegation succeeds");
    assert_eq!(first.content.len(), 1);

    // Second call must be rejected by the budget check.
    let second_ctx = InvokeContext {
        tool_call_id: "b2".into(),
        cancel: CancellationToken::new(),
    };
    let second_input = serde_json::json!({ "agent": "worker", "prompt": "again" });
    let msg = tool
        .execute(second_ctx, second_input)
        .await
        .expect_err("budget should be exhausted")
        .to_string();
    assert!(msg.contains("budget exhausted"), "msg: {msg}");
}
/// A successful delegation returns the child's final text as one text block.
#[tokio::test]
async fn delegate_runs_child_and_returns_text() {
    let tool = top_level_tool(vec![dummy_profile("worker")], DEFAULT_MAX_DEPTH);
    let input = serde_json::json!({ "agent": "worker", "prompt": "do it" });
    let ctx = InvokeContext {
        tool_call_id: "c3".into(),
        cancel: CancellationToken::new(),
    };

    let result = tool
        .execute(ctx, input)
        .await
        .expect("delegation should succeed");

    assert_eq!(result.content.len(), 1);
    let text = result
        .content
        .first()
        .and_then(|block| block.get("text"))
        .and_then(Value::as_str)
        .expect("text");
    assert_eq!(text, "child done");
}
/// A call whose context token is already cancelled fails with `Cancelled`.
#[tokio::test]
async fn cancellation_aborts_before_child_spawn() {
    let tool = top_level_tool(vec![dummy_profile("a")], DEFAULT_MAX_DEPTH);

    let token = CancellationToken::new();
    let ctx = InvokeContext {
        tool_call_id: "c4".into(),
        cancel: token.clone(),
    };
    // Cancel through the original handle; the clone held by `ctx` observes it.
    token.cancel();

    let input = serde_json::json!({ "agent": "a", "prompt": "hi" });
    let err = tool
        .execute(ctx, input)
        .await
        .expect_err("should be cancelled");
    assert!(matches!(err, CoreError::Cancelled(_)), "err: {err}");
}
/// Provider that replays a fixed queue of responses, one per invocation.
struct ScriptedProvider {
    /// Responses not yet handed out, in the order they will be returned.
    responses: std::sync::Mutex<std::collections::VecDeque<ModelResponse>>,
}

impl ScriptedProvider {
    /// Wraps each message list in a `ModelResponse` and queues them in order.
    fn new(responses: Vec<Vec<AgentMessage>>) -> Self {
        let mut queue = std::collections::VecDeque::with_capacity(responses.len());
        for messages in responses {
            queue.push_back(ModelResponse { messages });
        }
        Self {
            responses: std::sync::Mutex::new(queue),
        }
    }
}

#[async_trait]
impl ModelProvider for ScriptedProvider {
    /// Returns the next scripted response, or an empty response once the
    /// script has run out. The request is ignored.
    async fn invoke(&self, _request: ModelRequest) -> CoreResult<ModelResponse> {
        // The lock is released at the end of this block, before returning.
        let scripted = {
            let mut queue = self.responses.lock().unwrap();
            queue.pop_front()
        };
        Ok(scripted.unwrap_or_else(|| ModelResponse {
            messages: Vec::new(),
        }))
    }
}
/// Builds an assistant message holding a single text block.
fn assistant_text(t: &str) -> AgentMessage {
    let block = ContentBlock::Text { text: t.into() };
    AgentMessage {
        role: Role::Assistant,
        content: vec![block],
    }
}
/// Builds an assistant message that calls the `task` tool (call id `call_1`)
/// with the given agent name and prompt.
fn parent_task_call(agent: &str, prompt: &str) -> AgentMessage {
    let call = crate::tool::ToolCall {
        name: "task".into(),
        input: serde_json::json!({ "agent": agent, "prompt": prompt }),
    };
    let block = ContentBlock::ToolUse {
        id: "call_1".into(),
        call,
    };
    AgentMessage {
        role: Role::Assistant,
        content: vec![block],
    }
}
/// End to end: the parent calls `task`, the child answers, and the parent
/// finishes with a reply built on the child's answer.
#[tokio::test]
async fn integration_parent_delegates_and_child_answers() {
    // The scripted provider hands these out one per model invocation.
    let script = vec![
        vec![parent_task_call("worker", "do the work")],
        vec![assistant_text("child done")],
        vec![assistant_text("got: child done")],
    ];
    let provider: Arc<dyn ModelProvider> = Arc::new(ScriptedProvider::new(script));
    let cancel = CancellationToken::new();
    let model = Model {
        id: "test/m".into(),
    };
    let config = RunConfig::default();

    let task = Arc::new(TaskTool::new(
        Arc::clone(&provider),
        model.clone(),
        config.clone(),
        vec![dummy_profile("worker")],
        SubagentOptions::default(),
        cancel.clone(),
        None,
        None,
    ));
    let tools: Vec<Arc<dyn Tool>> = vec![task];

    let text_message = |role, text: &str| AgentMessage {
        role,
        content: vec![ContentBlock::Text { text: text.into() }],
    };
    let mut messages = vec![
        text_message(Role::System, "be brief"),
        text_message(Role::User, "delegate the work"),
    ];

    let hooks = crate::event::RunHooks::default();
    let outcome = run_agent(
        provider.as_ref(),
        &tools,
        &mut messages,
        &model,
        &config,
        &cancel,
        &hooks,
    )
    .await
    .expect("parent run");

    assert_eq!(outcome.turns, 2);
    assert_eq!(outcome.final_text, "got: child done");
}
/// With `max_depth` 1 the parent may delegate to `child`, but `child`'s own
/// attempt to delegate to `grandchild` must be rejected.
#[tokio::test]
async fn integration_nested_delegation_stops_at_max_depth() {
    let grandchild = dummy_profile("grandchild");
    let child = SubagentProfile::new("child", "you delegate").with_subagent(grandchild);

    // Scripted responses are handed out one per model invocation.
    let provider: Arc<dyn ModelProvider> = Arc::new(ScriptedProvider::new(vec![
        vec![parent_task_call("child", "sub-delegate")],
        vec![AgentMessage {
            role: Role::Assistant,
            content: vec![ContentBlock::ToolUse {
                id: "cchild".into(),
                call: crate::tool::ToolCall {
                    name: "task".into(),
                    input: serde_json::json!({
                        "agent": "grandchild",
                        "prompt": "too deep"
                    }),
                },
            }],
        }],
        vec![assistant_text("grandchild was unreachable")],
        vec![assistant_text("done")],
    ]));
    let cancel = CancellationToken::new();
    let task = Arc::new(TaskTool::new(
        Arc::clone(&provider),
        Model {
            id: "test/m".into(),
        },
        RunConfig::default(),
        vec![child],
        SubagentOptions {
            max_depth: 1,
            max_delegations: DEFAULT_MAX_DELEGATIONS,
        },
        cancel.clone(),
        None,
        None,
    ));
    let tools: Vec<Arc<dyn Tool>> = vec![task];
    let mut messages = vec![AgentMessage {
        role: Role::User,
        content: vec![ContentBlock::Text { text: "go".into() }],
    }];
    let outcome = run_agent(
        provider.as_ref(),
        &tools,
        &mut messages,
        &Model {
            id: "test/m".into(),
        },
        &RunConfig::default(),
        &cancel,
        &crate::event::RunHooks::default(),
    )
    .await
    .expect("parent run");

    assert_eq!(outcome.turns, 2);
    // Pins how the script was consumed: had the grandchild actually run, it
    // would have taken one of the scripted responses and the parent would
    // not have finished on the last one.
    assert_eq!(outcome.final_text, "done");
}
/// Test tool that records whether it was ever executed.
struct FlagTool {
    /// Name reported in the tool definition (also used as its label).
    name: String,
    /// Set to `true` the first time `execute` runs.
    ran: Arc<std::sync::atomic::AtomicBool>,
}

#[async_trait]
impl Tool for FlagTool {
    /// Describes the tool with an empty parameter schema.
    fn definition(&self) -> ToolDefinition {
        let name = self.name.clone();
        let label = self.name.clone();
        ToolDefinition {
            name,
            label,
            description: "records execution".into(),
            parameters: crate::tool::ParameterSchema {
                fields: std::collections::BTreeMap::new(),
            },
        }
    }

    /// Raises the `ran` flag and returns a single "ran" text block.
    async fn execute(&self, _ctx: InvokeContext, _input: Value) -> CoreResult<ToolResult> {
        self.ran.store(true, Ordering::SeqCst);
        let block = serde_json::json!({ "type": "text", "text": "ran" });
        Ok(ToolResult {
            content: vec![block],
            details: None,
        })
    }
}
struct DenyByName(String);
#[async_trait]
impl ToolPolicy for DenyByName {
async fn check(
&self,
tool: &str,
_input: &Value,
_ctx: &InvokeContext,
) -> crate::policy::PolicyVerdict {
if tool == self.0 {
crate::policy::PolicyVerdict::Deny("blocked by test policy".into())
} else {
crate::policy::PolicyVerdict::Allow
}
}
}
/// The policy given to the task tool also governs tools run by a subagent:
/// the subagent's `danger` tool must never execute.
#[tokio::test]
async fn policy_applies_to_delegated_subagent() {
    let ran = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let danger = Arc::new(FlagTool {
        name: "danger".into(),
        ran: Arc::clone(&ran),
    });
    let worker = SubagentProfile::new("worker", "you do work").with_tool(danger);

    // The subagent's attempt to call the denied tool.
    let danger_call = AgentMessage {
        role: Role::Assistant,
        content: vec![ContentBlock::ToolUse {
            id: "cdanger".into(),
            call: crate::tool::ToolCall {
                name: "danger".into(),
                input: serde_json::json!({}),
            },
        }],
    };
    let script = vec![
        vec![parent_task_call("worker", "use the danger tool")],
        vec![danger_call],
        vec![assistant_text("could not run danger")],
        vec![assistant_text("done")],
    ];
    let provider: Arc<dyn ModelProvider> = Arc::new(ScriptedProvider::new(script));

    let cancel = CancellationToken::new();
    let policy: Arc<dyn ToolPolicy> = Arc::new(DenyByName("danger".into()));
    let model = Model {
        id: "test/m".into(),
    };
    let config = RunConfig::default();

    let task = Arc::new(TaskTool::new(
        Arc::clone(&provider),
        model.clone(),
        config.clone(),
        vec![worker],
        SubagentOptions::default(),
        cancel.clone(),
        None,
        Some(Arc::clone(&policy)),
    ));
    let tools: Vec<Arc<dyn Tool>> = vec![task];

    let mut messages = vec![AgentMessage {
        role: Role::User,
        content: vec![ContentBlock::Text {
            text: "delegate".into(),
        }],
    }];
    let hooks = crate::event::RunHooks {
        policy: Some(policy.as_ref()),
        ..crate::event::RunHooks::default()
    };

    let outcome = run_agent(
        provider.as_ref(),
        &tools,
        &mut messages,
        &model,
        &config,
        &cancel,
        &hooks,
    )
    .await
    .expect("parent run completes");

    assert_eq!(outcome.final_text, "done");
    let danger_ran = ran.load(Ordering::SeqCst);
    assert!(
        !danger_ran,
        "policy must block the subagent's tool — it was inherited, not bypassed"
    );
}
}