use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{info, info_span};
use crate::error::Error;
use crate::llm::types::{TokenUsage, ToolDefinition};
use crate::llm::{BoxedProvider, LlmProvider};
use crate::tool::{Tool, ToolOutput};
use crate::types::DispatchMode;
use crate::memory::Memory;
use crate::knowledge::KnowledgeBase;
use crate::tool::builtins::OnQuestion;
use super::blackboard::{Blackboard, InMemoryBlackboard};
use super::blackboard_tools::blackboard_tools;
use super::context::ContextStrategy;
use super::events::{AgentEvent, OnEvent};
use super::guardrail::Guardrail;
use super::{AgentOutput, AgentRunner};
/// Internal, fully-resolved definition of a pre-configured sub-agent.
///
/// The delegation tools look a definition up by `name`, clone it, and feed its
/// fields into an `AgentRunner` builder. Unless noted otherwise, an `Option`
/// field is forwarded to the builder setter of the same name only when `Some`;
/// `None` leaves the builder's own default in place.
#[derive(Clone)]
pub(crate) struct SubAgentDef {
/// Identifier matched against `DelegatedTask::agent`; also used as the runner
/// name, the memory namespace suffix and the `agent:<name>` blackboard key.
pub(crate) name: String,
/// Human-readable description (included in the `Debug` output).
pub(crate) description: String,
/// System prompt handed to the runner builder.
pub(crate) system_prompt: String,
/// Tools handed to the runner builder.
pub(crate) tools: Vec<Arc<dyn Tool>>,
pub(crate) context_strategy: Option<ContextStrategy>,
pub(crate) summarize_threshold: Option<u32>,
pub(crate) tool_timeout: Option<Duration>,
pub(crate) max_tool_output_bytes: Option<usize>,
/// Per-agent turn limit; the dispatching tool's default is used when `None`.
pub(crate) max_turns: Option<usize>,
/// Per-agent token limit; the dispatching tool's default is used when `None`.
pub(crate) max_tokens: Option<u32>,
/// Forwarded to the builder's `structured_schema` setter when set.
pub(crate) response_schema: Option<serde_json::Value>,
/// Agent-specific guardrails. `DelegateTaskTool` appends these after its own
/// guardrails; the squad tool passes them on their own.
pub(crate) guardrails: Vec<Arc<dyn Guardrail>>,
pub(crate) run_timeout: Option<Duration>,
/// Provider used instead of the tool's shared provider when set.
pub(crate) provider_override: Option<Arc<BoxedProvider>>,
pub(crate) reasoning_effort: Option<crate::llm::types::ReasoningEffort>,
/// Only `Some(true)` is forwarded to the builder.
pub(crate) enable_reflection: Option<bool>,
pub(crate) tool_output_compression_threshold: Option<usize>,
pub(crate) max_tools_per_turn: Option<usize>,
pub(crate) tool_profile: Option<super::tool_filter::ToolProfile>,
pub(crate) max_identical_tool_calls: Option<u32>,
pub(crate) max_fuzzy_identical_tool_calls: Option<u32>,
pub(crate) max_tool_calls_per_turn: Option<u32>,
pub(crate) session_prune_config: Option<crate::agent::pruner::SessionPruneConfig>,
/// Only `Some(true)` is forwarded to the builder.
pub(crate) enable_recursive_summarization: Option<bool>,
pub(crate) reflection_threshold: Option<u32>,
/// Only `Some(true)` is forwarded to the builder.
pub(crate) consolidate_on_exit: Option<bool>,
pub(crate) workspace: Option<std::path::PathBuf>,
pub(crate) max_total_tokens: Option<u64>,
pub(crate) audit_trail: Option<Arc<dyn super::audit::AuditTrail>>,
/// Forwarded via `audit_user_context` only when `audit_tenant_id` is also set;
/// also used to derive the tenant scope of the shared-memory tools.
pub(crate) audit_user_id: Option<String>,
/// See `audit_user_id`.
pub(crate) audit_tenant_id: Option<String>,
/// Forwarded to the builder only when non-empty.
pub(crate) audit_delegation_chain: Vec<String>,
}
/// Compact `Debug` representation: only the name, the description and the
/// number of tools are printed; all other fields are left out.
impl std::fmt::Debug for SubAgentDef {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tool_count = self.tools.len();
        let mut repr = f.debug_struct("SubAgentDef");
        repr.field("name", &self.name);
        repr.field("description", &self.description);
        repr.field("tools_count", &tool_count);
        repr.finish()
    }
}
impl SubAgentDef {
    /// Creates a definition that carries only its identity.
    ///
    /// `name`, `description` and `system_prompt` are converted into owned
    /// strings; every optional setting starts as `None` and every list starts
    /// empty.
    pub(crate) fn new(
        name: impl Into<String>,
        description: impl Into<String>,
        system_prompt: impl Into<String>,
    ) -> Self {
        let name = name.into();
        let description = description.into();
        let system_prompt = system_prompt.into();

        Self {
            // Identity.
            name,
            description,
            system_prompt,
            // Everything below is "unset"; listed in declaration order.
            tools: Vec::new(),
            context_strategy: None,
            summarize_threshold: None,
            tool_timeout: None,
            max_tool_output_bytes: None,
            max_turns: None,
            max_tokens: None,
            response_schema: None,
            guardrails: Vec::new(),
            run_timeout: None,
            provider_override: None,
            reasoning_effort: None,
            enable_reflection: None,
            tool_output_compression_threshold: None,
            max_tools_per_turn: None,
            tool_profile: None,
            max_identical_tool_calls: None,
            max_fuzzy_identical_tool_calls: None,
            max_tool_calls_per_turn: None,
            session_prune_config: None,
            enable_recursive_summarization: None,
            reflection_threshold: None,
            consolidate_on_exit: None,
            workspace: None,
            max_total_tokens: None,
            audit_trail: None,
            audit_user_id: None,
            audit_tenant_id: None,
            audit_delegation_chain: Vec::new(),
        }
    }
}
impl From<SubAgentConfig> for SubAgentDef {
fn from(def: SubAgentConfig) -> Self {
Self {
name: def.name,
description: def.description,
system_prompt: def.system_prompt,
tools: def.tools,
context_strategy: def.context_strategy,
summarize_threshold: def.summarize_threshold,
tool_timeout: def.tool_timeout,
max_tool_output_bytes: def.max_tool_output_bytes,
max_turns: def.max_turns,
max_tokens: def.max_tokens,
response_schema: def.response_schema,
run_timeout: def.run_timeout,
guardrails: def.guardrails,
provider_override: def.provider,
reasoning_effort: def.reasoning_effort,
enable_reflection: def.enable_reflection,
tool_output_compression_threshold: def.tool_output_compression_threshold,
max_tools_per_turn: def.max_tools_per_turn,
tool_profile: def.tool_profile,
max_identical_tool_calls: def.max_identical_tool_calls,
max_fuzzy_identical_tool_calls: def.max_fuzzy_identical_tool_calls,
max_tool_calls_per_turn: def.max_tool_calls_per_turn,
session_prune_config: def.session_prune_config,
enable_recursive_summarization: def.enable_recursive_summarization,
reflection_threshold: def.reflection_threshold,
consolidate_on_exit: def.consolidate_on_exit,
workspace: def.workspace,
max_total_tokens: def.max_total_tokens,
audit_trail: def.audit_trail,
audit_user_id: def.audit_user_id,
audit_tenant_id: def.audit_tenant_id,
audit_delegation_chain: def.audit_delegation_chain,
}
}
}
/// One unit of delegated work, as it appears in the `tasks` array of a
/// delegation tool's input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct DelegatedTask {
/// Name of the sub-agent that should run the task.
pub(crate) agent: String,
/// Task text passed to that agent's runner.
pub(crate) task: String,
}
/// Outcome of running a single delegated task.
#[derive(Debug, Clone)]
pub(crate) struct SubAgentResult {
/// Name of the agent the task was addressed to.
pub(crate) agent: String,
/// The agent's final output on success, or an `Error: ...` message on failure.
pub(crate) result: String,
/// Tokens consumed by the run (partial usage when the run failed).
pub(crate) tokens_used: TokenUsage,
/// `true` only when the agent ran to completion without error.
pub(crate) success: bool,
}
/// Top-level agent that runs a task through its own runner and adds the token
/// usage reported by sub-agents to the final result.
pub struct Orchestrator<P: LlmProvider> {
/// Runner that executes the orchestrator's own loop.
runner: AgentRunner<P>,
/// Token usage accumulated by sub-agents; reset at the start of every `run`.
sub_agent_tokens: Arc<Mutex<TokenUsage>>,
}
impl<P> Orchestrator<P>
where
    P: LlmProvider + 'static,
{
    /// Starts an [`OrchestratorBuilder`] for `provider` with the default settings.
    ///
    /// Defaults set here: 10 turns, 4096 max tokens, parallel dispatch, shared
    /// writes allowed, multi-agent prompt enabled, reflection disabled. All
    /// optional hooks and limits start unset and all lists start empty.
    pub fn builder(provider: Arc<P>) -> OrchestratorBuilder<P> {
        OrchestratorBuilder {
            provider,
            sub_agents: Vec::new(),
            // Loop limits.
            max_turns: 10,
            max_tokens: 4096,
            context_strategy: None,
            summarize_threshold: None,
            tool_timeout: None,
            max_tool_output_bytes: None,
            // Shared state handed to sub-agents.
            shared_memory: None,
            memory_namespace_prefix: None,
            blackboard: None,
            knowledge_base: None,
            // Callbacks.
            on_text: None,
            on_approval: None,
            on_event: None,
            guardrails: Vec::new(),
            on_question: None,
            run_timeout: None,
            enable_squads: None,
            reasoning_effort: None,
            enable_reflection: false,
            // Tool-call limits.
            tool_output_compression_threshold: None,
            max_tools_per_turn: None,
            max_identical_tool_calls: None,
            max_fuzzy_identical_tool_calls: None,
            max_tool_calls_per_turn: None,
            permission_rules: super::permission::PermissionRuleset::default(),
            instruction_text: None,
            learned_permissions: None,
            lsp_manager: None,
            observability_mode: None,
            dispatch_mode: DispatchMode::Parallel,
            workspace: None,
            // Audit context.
            audit_trail: None,
            audit_user_id: None,
            audit_tenant_id: None,
            audit_delegation_chain: Vec::new(),
            allow_shared_write: true,
            multi_agent_prompt: true,
            // Dynamic agent spawning.
            spawn_config: None,
            spawn_builtin_tools: Vec::new(),
            tenant_tracker: None,
        }
    }

    /// Runs `task` and reports token usage including sub-agent usage.
    ///
    /// The sub-agent accumulator is zeroed before the run. Afterwards its value
    /// is added to the output's `tokens_used` on success, or to the error's
    /// partial usage on failure.
    ///
    /// # Panics
    ///
    /// Panics if the token accumulator mutex is poisoned.
    pub async fn run(&mut self, task: &str) -> Result<AgentOutput, Error> {
        // The guard is a temporary, so the lock is released before the await below.
        *self.sub_agent_tokens.lock().expect("token lock poisoned") = TokenUsage::default();

        let outcome = self.runner.execute(task).await;
        let delegated = *self.sub_agent_tokens.lock().expect("token lock poisoned");

        match outcome {
            Ok(mut output) => {
                output.tokens_used += delegated;
                Ok(output)
            }
            Err(err) => {
                let mut usage = err.partial_usage();
                usage += delegated;
                Err(err.with_partial_usage(usage))
            }
        }
    }
}
/// Tool that runs one or more pre-configured sub-agents concurrently and
/// returns their combined output.
struct DelegateTaskTool {
/// Provider used by sub-agents that do not define `provider_override`.
shared_provider: Arc<BoxedProvider>,
/// Pool of known sub-agents, looked up by name for each task.
sub_agents: Vec<SubAgentDef>,
/// Turn limit for sub-agents that do not set their own.
max_turns: usize,
/// Token limit for sub-agents that do not set their own.
max_tokens: u32,
/// Forwarded to each sub-agent's builder when non-empty.
permission_rules: super::permission::PermissionRuleset,
/// Running total of sub-agent token usage; every result's usage is added here.
accumulated_tokens: Arc<Mutex<TokenUsage>>,
/// When set, each sub-agent gets a namespaced view plus shared-memory tools.
shared_memory: Option<Arc<dyn Memory>>,
/// Optional prefix for the per-agent memory namespace (`<prefix>:<agent>`).
memory_namespace_prefix: Option<String>,
/// When set, sub-agents get blackboard tools and successful results are
/// written under `agent:<name>`.
blackboard: Option<Arc<dyn Blackboard>>,
/// Forwarded to each sub-agent's builder when set.
knowledge_base: Option<Arc<dyn KnowledgeBase>>,
/// Definition returned by `Tool::definition`.
cached_definition: ToolDefinition,
/// Event callback; receives dispatch/completion events and is also handed
/// to each sub-agent.
on_event: Option<Arc<OnEvent>>,
/// Text callback handed to each sub-agent.
on_text: Option<Arc<crate::llm::OnText>>,
lsp_manager: Option<Arc<crate::lsp::LspManager>>,
observability_mode: super::observability::ObservabilityMode,
/// Passed to the shared-memory tool factory.
allow_shared_write: bool,
tenant_tracker: Option<Arc<crate::agent::tenant_tracker::TenantTokenTracker>>,
/// Orchestrator-level guardrails; placed before each sub-agent's own guardrails.
guardrails: Vec<Arc<dyn Guardrail>>,
}
impl DelegateTaskTool {
async fn delegate(&self, tasks: Vec<DelegatedTask>) -> Result<String, Error> {
if tasks.is_empty() {
return Err(Error::Agent(
"delegate_task requires at least one task".into(),
));
}
let task_count = tasks.len();
let agent_names: Vec<String> = tasks.iter().map(|t| t.agent.clone()).collect();
let _delegate_span = info_span!(
"heartbit.orchestrator.delegate",
agent_count = task_count,
agents = ?agent_names,
);
if let Some(ref cb) = self.on_event {
cb(AgentEvent::SubAgentsDispatched {
agent: "orchestrator".into(),
agents: agent_names.clone(),
});
}
let mut join_set = tokio::task::JoinSet::new();
for (idx, task) in tasks.into_iter().enumerate() {
let agent_def = match self.sub_agents.iter().find(|a| a.name == task.agent) {
Some(def) => def.clone(),
None => {
let agent_name = task.agent.clone();
join_set.spawn(async move {
(
idx,
SubAgentResult {
agent: agent_name.clone(),
result: format!("Error: unknown agent '{agent_name}'"),
tokens_used: TokenUsage::default(),
success: false,
},
)
});
continue;
}
};
let provider = agent_def
.provider_override
.clone()
.unwrap_or_else(|| self.shared_provider.clone());
let max_turns = agent_def.max_turns.unwrap_or(self.max_turns);
let max_tokens = agent_def.max_tokens.unwrap_or(self.max_tokens);
let shared_memory = self.shared_memory.clone();
let ns_prefix = self.memory_namespace_prefix.clone();
let blackboard = self.blackboard.clone();
let knowledge_base = self.knowledge_base.clone();
let on_event = self.on_event.clone();
let on_text = self.on_text.clone();
let lsp_manager = self.lsp_manager.clone();
let permission_rules = self.permission_rules.clone();
let observability_mode = self.observability_mode;
let allow_shared_write = self.allow_shared_write;
let tenant_tracker = self.tenant_tracker.clone();
let orchestrator_guardrails = self.guardrails.clone();
info!(agent = %agent_def.name, task = %task.task, "spawning sub-agent");
join_set.spawn(async move {
let mut builder = AgentRunner::builder(provider)
.name(&agent_def.name)
.system_prompt(&agent_def.system_prompt)
.tools(agent_def.tools)
.max_turns(max_turns)
.max_tokens(max_tokens);
if let Some(strategy) = agent_def.context_strategy {
builder = builder.context_strategy(strategy);
}
if let Some(threshold) = agent_def.summarize_threshold {
builder = builder.summarize_threshold(threshold);
}
if let Some(timeout) = agent_def.tool_timeout {
builder = builder.tool_timeout(timeout);
}
if let Some(max) = agent_def.max_tool_output_bytes {
builder = builder.max_tool_output_bytes(max);
}
if let Some(schema) = agent_def.response_schema {
builder = builder.structured_schema(schema);
}
let mut combined_guardrails = orchestrator_guardrails;
combined_guardrails.extend(agent_def.guardrails);
if !combined_guardrails.is_empty() {
builder = builder.guardrails(combined_guardrails);
}
if let Some(timeout) = agent_def.run_timeout {
builder = builder.run_timeout(timeout);
}
if let Some(effort) = agent_def.reasoning_effort {
builder = builder.reasoning_effort(effort);
}
if let Some(true) = agent_def.enable_reflection {
builder = builder.enable_reflection(true);
}
if let Some(threshold) = agent_def.tool_output_compression_threshold {
builder = builder.tool_output_compression_threshold(threshold);
}
if let Some(max) = agent_def.max_tools_per_turn {
builder = builder.max_tools_per_turn(max);
}
if let Some(profile) = agent_def.tool_profile {
builder = builder.tool_profile(profile);
}
if let Some(max) = agent_def.max_identical_tool_calls {
builder = builder.max_identical_tool_calls(max);
}
if let Some(max) = agent_def.max_fuzzy_identical_tool_calls {
builder = builder.max_fuzzy_identical_tool_calls(max);
}
if let Some(cap) = agent_def.max_tool_calls_per_turn {
builder = builder.max_tool_calls_per_turn(cap);
}
if let Some(ref config) = agent_def.session_prune_config {
builder = builder.session_prune_config(config.clone());
}
if let Some(true) = agent_def.enable_recursive_summarization {
builder = builder.enable_recursive_summarization(true);
}
if let Some(threshold) = agent_def.reflection_threshold {
builder = builder.reflection_threshold(threshold);
}
if let Some(true) = agent_def.consolidate_on_exit {
builder = builder.consolidate_on_exit(true);
}
if let Some(ref ws) = agent_def.workspace {
builder = builder.workspace(ws.clone());
}
if let Some(max) = agent_def.max_total_tokens {
builder = builder.max_total_tokens(max);
}
if let Some(trail) = agent_def.audit_trail {
builder = builder.audit_trail(trail);
}
if let Some(uid) = &agent_def.audit_user_id
&& let Some(tid) = &agent_def.audit_tenant_id
{
builder = builder.audit_user_context(uid.clone(), tid.clone());
}
if !agent_def.audit_delegation_chain.is_empty() {
builder =
builder.audit_delegation_chain(agent_def.audit_delegation_chain.clone());
}
if !permission_rules.is_empty() {
builder = builder.permission_rules(permission_rules);
}
builder = builder.observability_mode(observability_mode);
if let Some(ref tracker) = tenant_tracker {
builder = builder.tenant_tracker(tracker.clone());
}
if let Some(ref lsp) = lsp_manager {
builder = builder.lsp_manager(lsp.clone());
}
if let Some(ref on_event) = on_event {
builder = builder.on_event(on_event.clone());
}
if let Some(ref on_text) = on_text {
builder = builder.on_text(on_text.clone());
}
if let Some(ref memory) = shared_memory {
let agent_ns = match &ns_prefix {
Some(prefix) => format!("{prefix}:{}", agent_def.name),
None => agent_def.name.clone(),
};
let ns = Arc::new(crate::memory::namespaced::NamespacedMemory::new(
memory.clone(),
&agent_ns,
));
builder = builder.memory(ns);
let mem_scope = crate::auth::TenantScope::from_audit_fields(
agent_def.audit_tenant_id.as_deref(),
agent_def.audit_user_id.as_deref(),
);
builder = builder.tools(crate::memory::shared_tools::shared_memory_tools(
memory.clone(),
&agent_ns,
mem_scope,
allow_shared_write,
));
}
if let Some(ref bb) = blackboard {
builder = builder.tools(blackboard_tools(bb.clone(), &agent_def.name));
}
if let Some(ref kb) = knowledge_base {
builder = builder.knowledge(kb.clone());
}
let runner = match builder.build() {
Ok(r) => r,
Err(e) => {
return (
idx,
SubAgentResult {
agent: agent_def.name,
result: format!("Error building agent: {e}"),
tokens_used: TokenUsage::default(),
success: false,
},
);
}
};
let result = match runner.execute(&task.task).await {
Ok(output) => {
if let Some(ref bb) = blackboard {
let key = format!("agent:{}", agent_def.name);
if let Err(e) = bb
.write(&key, serde_json::Value::String(output.result.clone()))
.await
{
tracing::warn!(
agent = %agent_def.name,
error = %e,
"failed to write result to blackboard"
);
}
}
SubAgentResult {
agent: agent_def.name,
result: output.result,
tokens_used: output.tokens_used,
success: true,
}
}
Err(e) => SubAgentResult {
agent: agent_def.name,
result: format!("Error: {e}"),
tokens_used: e.partial_usage(),
success: false,
},
};
(idx, result)
});
}
let mut results: Vec<Option<(usize, SubAgentResult)>> = vec![None; task_count];
while let Some(result) = join_set.join_next().await {
match result {
Ok((idx, sub_result)) => {
results[idx] = Some((idx, sub_result));
}
Err(e) => {
tracing::error!(error = %e, "sub-agent task panicked");
}
}
}
let mut results: Vec<(usize, SubAgentResult)> = results
.into_iter()
.enumerate()
.map(|(idx, r)| {
r.unwrap_or_else(|| {
(
idx,
SubAgentResult {
agent: agent_names[idx].clone(),
result: "Error: sub-agent task panicked".into(),
tokens_used: TokenUsage::default(),
success: false,
},
)
})
})
.collect();
results.sort_by_key(|(idx, _)| *idx);
{
let mut acc = self.accumulated_tokens.lock().expect("token lock poisoned");
for (_, r) in &results {
*acc += r.tokens_used;
}
}
if let Some(ref cb) = self.on_event {
for (_, r) in &results {
cb(AgentEvent::SubAgentCompleted {
agent: r.agent.clone(),
success: r.success,
usage: r.tokens_used,
});
}
}
let formatted = results
.iter()
.map(|(_, r)| format!("=== Agent: {} ===\n{}", r.agent, r.result))
.collect::<Vec<_>>()
.join("\n\n");
Ok(formatted)
}
}
impl Tool for DelegateTaskTool {
    /// Returns a copy of the tool definition stored in `cached_definition`.
    fn definition(&self) -> ToolDefinition {
        self.cached_definition.clone()
    }

    /// Parses `input` as a `DelegateInput` and delegates its tasks.
    ///
    /// Malformed input is reported as `Error::Agent`; errors from `delegate`
    /// are propagated unchanged.
    fn execute(
        &self,
        input: serde_json::Value,
    ) -> Pin<Box<dyn Future<Output = Result<ToolOutput, Error>> + Send + '_>> {
        let work = async move {
            let parsed = serde_json::from_value::<DelegateInput>(input)
                .map_err(|e| Error::Agent(format!("Invalid delegate_task input: {e}")))?;
            let report = self.delegate(parsed.tasks).await?;
            Ok(ToolOutput::success(report))
        };
        Box::pin(work)
    }
}
/// Input payload shared by the delegation tools: a list of agent/task pairs.
#[derive(Deserialize)]
struct DelegateInput {
/// Tasks to run, in the order results should be reported.
tasks: Vec<DelegatedTask>,
}
/// Tool that runs two or more distinct sub-agents concurrently as a "squad"
/// sharing a private blackboard, and returns their combined output.
struct FormSquadTool {
/// Provider used by members that do not define `provider_override`.
shared_provider: Arc<BoxedProvider>,
/// Agents that may be selected as squad members, looked up by name.
agent_pool: Vec<SubAgentDef>,
/// Turn limit for members that do not set their own.
default_max_turns: usize,
/// Token limit for members that do not set their own.
default_max_tokens: u32,
/// Forwarded to each member's builder when non-empty.
permission_rules: super::permission::PermissionRuleset,
/// Running total of sub-agent token usage; every member's usage is added here.
accumulated_tokens: Arc<Mutex<TokenUsage>>,
/// When set, each member gets a namespaced view plus shared-memory tools.
shared_memory: Option<Arc<dyn Memory>>,
/// Optional prefix for the per-agent memory namespace (`<prefix>:<agent>`).
memory_namespace_prefix: Option<String>,
/// Outer blackboard: receives the combined squad result under
/// `squad:<names joined by '+'>`. Members themselves use a private blackboard.
blackboard: Option<Arc<dyn Blackboard>>,
/// Forwarded to each member's builder when set.
knowledge_base: Option<Arc<dyn KnowledgeBase>>,
/// Event callback; receives dispatch/completion events and is also handed
/// to each member.
on_event: Option<Arc<OnEvent>>,
/// Text callback handed to each member.
on_text: Option<Arc<crate::llm::OnText>>,
lsp_manager: Option<Arc<crate::lsp::LspManager>>,
/// Definition returned by `Tool::definition`.
cached_definition: ToolDefinition,
observability_mode: super::observability::ObservabilityMode,
/// Passed to the shared-memory tool factory.
allow_shared_write: bool,
tenant_tracker: Option<Arc<crate::agent::tenant_tracker::TenantTokenTracker>>,
}
// `Tool` implementation for squads: validates the request, runs every member
// concurrently against a private blackboard, and reports the combined output.
impl Tool for FormSquadTool {
/// Returns a copy of the tool definition stored in `cached_definition`.
fn definition(&self) -> ToolDefinition {
self.cached_definition.clone()
}
/// Runs the requested squad.
///
/// Malformed input is returned as `Error::Agent`. Every other validation
/// failure (fewer than 2 tasks, duplicate agent name, unknown agent) is
/// returned as `Ok(ToolOutput::error(..))`. On a valid request the output is
/// one `=== Agent: <name> ===` section per task, in task order, and it is
/// reported as a success output even when individual members failed.
///
/// Side effects: emits `SubAgentsDispatched`, one `SubAgentCompleted` per
/// member and one for the whole squad; adds token usage to the shared
/// accumulator; writes the combined result to the outer blackboard when set.
fn execute(
&self,
input: serde_json::Value,
) -> Pin<Box<dyn Future<Output = Result<ToolOutput, Error>> + Send + '_>> {
Box::pin(async move {
let delegate_input: DelegateInput = serde_json::from_value(input)
.map_err(|e| Error::Agent(format!("Invalid form_squad input: {e}")))?;
let tasks = delegate_input.tasks;
// Validation 1: a squad needs at least two members.
if tasks.len() < 2 {
return Ok(ToolOutput::error(
"form_squad requires at least 2 tasks. Use delegate_task for single-agent tasks."
.to_string(),
));
}
// Validation 2: each agent may appear only once in a squad.
{
let mut seen = std::collections::HashSet::new();
for t in &tasks {
if !seen.insert(&t.agent) {
return Ok(ToolOutput::error(format!(
"Duplicate agent name in squad: '{}'",
t.agent
)));
}
}
}
// Validation 3: every requested agent must exist in the pool.
for t in &tasks {
if !self.agent_pool.iter().any(|a| a.name == t.agent) {
return Ok(ToolOutput::error(format!(
"Unknown agent '{}'. Available agents: {}",
t.agent,
self.agent_pool
.iter()
.map(|a| a.name.as_str())
.collect::<Vec<_>>()
.join(", ")
)));
}
}
let task_count = tasks.len();
let agent_names: Vec<String> = tasks.iter().map(|t| t.agent.clone()).collect();
// Fresh in-memory blackboard shared only by the members of this squad.
let private_bb: Arc<dyn Blackboard> = Arc::new(InMemoryBlackboard::new());
// Span handle kept alive until this future completes (it is not entered here).
let _squad_span = info_span!(
"heartbit.orchestrator.squad",
agent_count = task_count,
agents = ?agent_names,
);
if let Some(ref cb) = self.on_event {
cb(AgentEvent::SubAgentsDispatched {
agent: "squad-leader".into(),
agents: agent_names.clone(),
});
}
let mut join_set = tokio::task::JoinSet::new();
for (idx, task) in tasks.into_iter().enumerate() {
// Defensive lookup: validation 3 above already checked every name.
let agent_def = match self.agent_pool.iter().find(|a| a.name == task.agent) {
Some(def) => def.clone(),
None => {
return Ok(ToolOutput::error(format!(
"Internal error: agent '{}' not found after validation",
task.agent
)));
}
};
// Clone everything the spawned task needs; it must not borrow `self`.
let provider = agent_def
.provider_override
.clone()
.unwrap_or_else(|| self.shared_provider.clone());
let max_turns = agent_def.max_turns.unwrap_or(self.default_max_turns);
let max_tokens = agent_def.max_tokens.unwrap_or(self.default_max_tokens);
let shared_memory = self.shared_memory.clone();
let ns_prefix = self.memory_namespace_prefix.clone();
let bb = private_bb.clone();
let knowledge_base = self.knowledge_base.clone();
let on_event = self.on_event.clone();
let on_text = self.on_text.clone();
let lsp_manager = self.lsp_manager.clone();
let permission_rules = self.permission_rules.clone();
let observability_mode = self.observability_mode;
let allow_shared_write = self.allow_shared_write;
let tenant_tracker = self.tenant_tracker.clone();
info!(agent = %agent_def.name, task = %task.task, "spawning squad member");
join_set.spawn(async move {
let mut builder = AgentRunner::builder(provider)
.name(&agent_def.name)
.system_prompt(&agent_def.system_prompt)
.tools(agent_def.tools)
.max_turns(max_turns)
.max_tokens(max_tokens);
// Per-agent overrides: each is applied only when the definition sets it.
if let Some(strategy) = agent_def.context_strategy {
builder = builder.context_strategy(strategy);
}
if let Some(threshold) = agent_def.summarize_threshold {
builder = builder.summarize_threshold(threshold);
}
if let Some(timeout) = agent_def.tool_timeout {
builder = builder.tool_timeout(timeout);
}
if let Some(max) = agent_def.max_tool_output_bytes {
builder = builder.max_tool_output_bytes(max);
}
if let Some(schema) = agent_def.response_schema {
builder = builder.structured_schema(schema);
}
// NOTE(review): only the agent's own guardrails are applied here. Unlike
// `DelegateTaskTool::delegate`, no orchestrator-level guardrails are merged
// in (this tool has no `guardrails` field) — confirm this is intended.
if !agent_def.guardrails.is_empty() {
builder = builder.guardrails(agent_def.guardrails);
}
if let Some(timeout) = agent_def.run_timeout {
builder = builder.run_timeout(timeout);
}
if let Some(effort) = agent_def.reasoning_effort {
builder = builder.reasoning_effort(effort);
}
// Boolean switches are forwarded only for `Some(true)`.
if let Some(true) = agent_def.enable_reflection {
builder = builder.enable_reflection(true);
}
if let Some(threshold) = agent_def.tool_output_compression_threshold {
builder = builder.tool_output_compression_threshold(threshold);
}
if let Some(max) = agent_def.max_tools_per_turn {
builder = builder.max_tools_per_turn(max);
}
if let Some(profile) = agent_def.tool_profile {
builder = builder.tool_profile(profile);
}
if let Some(max) = agent_def.max_identical_tool_calls {
builder = builder.max_identical_tool_calls(max);
}
if let Some(max) = agent_def.max_fuzzy_identical_tool_calls {
builder = builder.max_fuzzy_identical_tool_calls(max);
}
if let Some(cap) = agent_def.max_tool_calls_per_turn {
builder = builder.max_tool_calls_per_turn(cap);
}
if let Some(ref config) = agent_def.session_prune_config {
builder = builder.session_prune_config(config.clone());
}
if let Some(true) = agent_def.enable_recursive_summarization {
builder = builder.enable_recursive_summarization(true);
}
if let Some(threshold) = agent_def.reflection_threshold {
builder = builder.reflection_threshold(threshold);
}
if let Some(true) = agent_def.consolidate_on_exit {
builder = builder.consolidate_on_exit(true);
}
if let Some(ref ws) = agent_def.workspace {
builder = builder.workspace(ws.clone());
}
if let Some(max) = agent_def.max_total_tokens {
builder = builder.max_total_tokens(max);
}
// Audit context: the user context is set only when both ids are present.
if let Some(trail) = agent_def.audit_trail {
builder = builder.audit_trail(trail);
}
if let Some(uid) = &agent_def.audit_user_id
&& let Some(tid) = &agent_def.audit_tenant_id
{
builder = builder.audit_user_context(uid.clone(), tid.clone());
}
if !agent_def.audit_delegation_chain.is_empty() {
builder = builder
.audit_delegation_chain(agent_def.audit_delegation_chain.clone());
}
// Settings inherited from the tool itself.
if !permission_rules.is_empty() {
builder = builder.permission_rules(permission_rules);
}
builder = builder.observability_mode(observability_mode);
if let Some(ref tracker) = tenant_tracker {
builder = builder.tenant_tracker(tracker.clone());
}
if let Some(ref lsp) = lsp_manager {
builder = builder.lsp_manager(lsp.clone());
}
if let Some(ref on_event) = on_event {
builder = builder.on_event(on_event.clone());
}
if let Some(ref on_text) = on_text {
builder = builder.on_text(on_text.clone());
}
// Namespaced memory view plus the shared-memory tools for this member.
if let Some(ref memory) = shared_memory {
let agent_ns = match &ns_prefix {
Some(prefix) => format!("{prefix}:{}", agent_def.name),
None => agent_def.name.clone(),
};
let ns = Arc::new(crate::memory::namespaced::NamespacedMemory::new(
memory.clone(),
&agent_ns,
));
builder = builder.memory(ns);
let mem_scope = crate::auth::TenantScope::from_audit_fields(
agent_def.audit_tenant_id.as_deref(),
agent_def.audit_user_id.as_deref(),
);
builder = builder.tools(crate::memory::shared_tools::shared_memory_tools(
memory.clone(),
&agent_ns,
mem_scope,
allow_shared_write,
));
}
// Squad members always get tools bound to the private blackboard.
builder = builder.tools(blackboard_tools(bb.clone(), &agent_def.name));
if let Some(ref kb) = knowledge_base {
builder = builder.knowledge(kb.clone());
}
// A build failure becomes a failed result for this member only.
let runner = match builder.build() {
Ok(r) => r,
Err(e) => {
return (
idx,
SubAgentResult {
agent: agent_def.name,
result: format!("Error building agent: {e}"),
tokens_used: TokenUsage::default(),
success: false,
},
);
}
};
let result = match runner.execute(&task.task).await {
Ok(output) => {
// Publish the member's result on the private blackboard; a write
// failure is only logged.
let key = format!("agent:{}", agent_def.name);
if let Err(e) = bb
.write(&key, serde_json::Value::String(output.result.clone()))
.await
{
tracing::warn!(
agent = %agent_def.name,
error = %e,
"failed to write result to private blackboard"
);
}
SubAgentResult {
agent: agent_def.name,
result: output.result,
tokens_used: output.tokens_used,
success: true,
}
}
// A failed run still reports whatever usage the error carries.
Err(e) => SubAgentResult {
agent: agent_def.name,
result: format!("Error: {e}"),
tokens_used: e.partial_usage(),
success: false,
},
};
(idx, result)
});
}
// Members finish in any order; each result is stored at its task index.
let mut results: Vec<Option<(usize, SubAgentResult)>> = vec![None; task_count];
while let Some(result) = join_set.join_next().await {
match result {
Ok((idx, sub_result)) => {
results[idx] = Some((idx, sub_result));
}
Err(e) => {
tracing::error!(error = %e, "squad member task panicked");
}
}
}
// Slots left empty by a failed join get a placeholder failure entry.
let mut results: Vec<(usize, SubAgentResult)> = results
.into_iter()
.enumerate()
.map(|(idx, r)| {
r.unwrap_or_else(|| {
(
idx,
SubAgentResult {
agent: agent_names[idx].clone(),
result: "Error: squad member task panicked".into(),
tokens_used: TokenUsage::default(),
success: false,
},
)
})
})
.collect();
// The vector was built by index, so it is already in task order.
results.sort_by_key(|(idx, _)| *idx);
let squad_label = format!("squad[{}]", agent_names.join(","));
let bb_key = format!("squad:{}", agent_names.join("+"));
// Add each member's usage to the shared accumulator and to the squad total.
let mut total_tokens = TokenUsage::default();
{
let mut acc = self.accumulated_tokens.lock().expect("token lock poisoned");
for (_, r) in &results {
*acc += r.tokens_used;
total_tokens += r.tokens_used;
}
}
// One completion event per member.
if let Some(ref cb) = self.on_event {
for (_, r) in &results {
cb(AgentEvent::SubAgentCompleted {
agent: r.agent.clone(),
success: r.success,
usage: r.tokens_used,
});
}
}
let all_success = results.iter().all(|(_, r)| r.success);
let formatted = results
.iter()
.map(|(_, r)| format!("=== Agent: {} ===\n{}", r.agent, r.result))
.collect::<Vec<_>>()
.join("\n\n");
// Then one aggregate completion event for the squad as a whole.
if let Some(ref cb) = self.on_event {
cb(AgentEvent::SubAgentCompleted {
agent: squad_label,
success: all_success,
usage: total_tokens,
});
}
// Publish the combined result on the outer blackboard; a write failure is
// only logged.
if let Some(ref bb) = self.blackboard
&& let Err(e) = bb
.write(&bb_key, serde_json::Value::String(formatted.clone()))
.await
{
tracing::warn!(
key = %bb_key,
error = %e,
"failed to write squad result to outer blackboard"
);
}
Ok(ToolOutput::success(formatted))
})
}
}
/// Tool that lets the orchestrator create and run an ad-hoc agent at runtime,
/// subject to the limits in `spawn_config`.
struct SpawnAgentTool {
/// Provider used by every spawned agent.
shared_provider: Arc<BoxedProvider>,
/// Limits for spawned agents (agent count, token budget, turns, tokens) and
/// the tool allowlist advertised in the tool definition.
spawn_config: crate::types::SpawnConfig,
/// Tools a spawned agent may request, keyed by tool name.
tool_pool: std::collections::HashMap<String, Arc<dyn Tool>>,
/// Number of agents spawned so far; compared against
/// `spawn_config.max_spawned_agents`.
spawn_count: Arc<std::sync::atomic::AtomicU32>,
/// Agent names already used; a name cannot be reused.
spawned_names: Arc<Mutex<std::collections::HashSet<String>>>,
/// Running token total; checked against `spawn_config.max_total_tokens`
/// before spawning and increased by each spawned agent's usage.
accumulated_tokens: Arc<Mutex<TokenUsage>>,
/// Forwarded to the spawned agent's builder when non-empty.
permission_rules: super::permission::PermissionRuleset,
/// When set, the spawned agent gets a namespaced view plus shared-memory tools.
shared_memory: Option<Arc<dyn Memory>>,
/// Optional prefix for the memory namespace (`<prefix>:spawn:<name>`).
memory_namespace_prefix: Option<String>,
/// Event callback; receives spawn/completion events and is also handed to
/// the spawned agent.
on_event: Option<Arc<OnEvent>>,
/// Text callback handed to the spawned agent.
on_text: Option<Arc<crate::llm::OnText>>,
lsp_manager: Option<Arc<crate::lsp::LspManager>>,
observability_mode: super::observability::ObservabilityMode,
workspace: Option<std::path::PathBuf>,
/// Guardrails applied to every spawned agent.
guardrails: Vec<Arc<dyn Guardrail>>,
audit_trail: Option<Arc<dyn super::audit::AuditTrail>>,
/// Forwarded via `audit_user_context` only when `audit_tenant_id` is also set.
audit_user_id: Option<String>,
/// See `audit_user_id`.
audit_tenant_id: Option<String>,
/// When non-empty, extended with the spawned agent's name and forwarded.
audit_delegation_chain: Vec<String>,
/// Definition returned by `Tool::definition`.
cached_definition: ToolDefinition,
tenant_tracker: Option<Arc<crate::agent::tenant_tracker::TenantTokenTracker>>,
}
/// Input payload of the `spawn_agent` tool.
#[derive(Deserialize)]
struct SpawnAgentInput {
/// Identifier for the new agent; must match `^[a-z][a-z0-9_]{0,63}$`.
name: String,
/// System prompt for the new agent.
system_prompt: String,
/// Names of tools from the pool to give the agent; defaults to empty when
/// the field is omitted.
#[serde(default)]
tools: Vec<String>,
/// Task the new agent should carry out.
task: String,
}
/// Maximum accepted size, in bytes, of a spawned agent's system prompt (32 KiB).
const SPAWN_MAX_PROMPT_BYTES: usize = 32 * 1024;
impl SpawnAgentTool {
fn build_definition(config: &crate::types::SpawnConfig) -> ToolDefinition {
let allowlist = if config.tool_allowlist.is_empty() {
"(none — reasoning-only agents)".to_string()
} else {
config.tool_allowlist.join(", ")
};
ToolDefinition {
name: "spawn_agent".into(),
description: format!(
"Create a new specialist agent at runtime when no pre-configured agent fits the task. \
The spawned agent runs with the given system prompt and tool subset, then returns its result.\n\n\
Available tools for spawned agents: [{allowlist}]. Budget: {} agents max per run.",
config.max_spawned_agents
),
input_schema: json!({
"type": "object",
"required": ["name", "system_prompt", "task"],
"properties": {
"name": {
"type": "string",
"description": "Lowercase identifier for the agent (a-z, 0-9, underscores). Must start with a letter. E.g. 'tax_specialist', 'csv_analyzer'."
},
"system_prompt": {
"type": "string",
"description": "The agent's role and behavior instructions. Be specific about expertise and constraints."
},
"tools": {
"type": "array",
"items": { "type": "string" },
"description": format!("Subset of available tools: [{allowlist}]. Empty array creates a reasoning-only agent.")
},
"task": {
"type": "string",
"description": "The specific task for this agent to accomplish."
}
},
"additionalProperties": false
}),
}
}
async fn spawn(&self, input: SpawnAgentInput) -> Result<ToolOutput, Error> {
let current = self.spawn_count.load(std::sync::atomic::Ordering::Relaxed);
if current >= self.spawn_config.max_spawned_agents {
return Ok(ToolOutput::error(format!(
"Spawn limit reached: {current}/{} agents already spawned this run.",
self.spawn_config.max_spawned_agents
)));
}
let name_re =
regex::Regex::new(r"^[a-z][a-z0-9_]{0,63}$").expect("spawn agent name regex is valid");
if !name_re.is_match(&input.name) {
return Ok(ToolOutput::error(format!(
"Invalid agent name '{}'. Must match ^[a-z][a-z0-9_]{{0,63}}$ \
(lowercase, starts with letter, alphanumeric + underscores, max 64 chars).",
input.name
)));
}
{
let mut names = self.spawned_names.lock().expect("spawned names lock");
if !names.insert(input.name.clone()) {
return Ok(ToolOutput::error(format!(
"Agent name '{}' already used in this run. Choose a different name.",
input.name
)));
}
}
for tool_name in &input.tools {
if !self.tool_pool.contains_key(tool_name) {
let available: Vec<&str> = self.tool_pool.keys().map(|k| k.as_str()).collect();
return Ok(ToolOutput::error(format!(
"Tool '{}' not in allowlist. Available: [{}]",
tool_name,
available.join(", ")
)));
}
}
{
let acc = self.accumulated_tokens.lock().expect("token lock");
let used = acc.total();
if used >= self.spawn_config.max_total_tokens {
return Ok(ToolOutput::error(format!(
"Spawn token budget exhausted: {used}/{} tokens used across spawned agents.",
self.spawn_config.max_total_tokens
)));
}
}
if input.system_prompt.len() > SPAWN_MAX_PROMPT_BYTES {
return Ok(ToolOutput::error(format!(
"System prompt too long: {} bytes (max {SPAWN_MAX_PROMPT_BYTES}).",
input.system_prompt.len()
)));
}
let spawned_name = format!("spawn:{}", input.name);
if let Some(ref cb) = self.on_event {
cb(AgentEvent::AgentSpawned {
agent: "orchestrator".into(),
spawned_name: spawned_name.clone(),
tools: input.tools.clone(),
task: input.task.clone(),
});
}
let selected_tools: Vec<Arc<dyn Tool>> = input
.tools
.iter()
.filter_map(|name| self.tool_pool.get(name).cloned())
.collect();
let mut builder = AgentRunner::builder(self.shared_provider.clone())
.name(&spawned_name)
.system_prompt(&input.system_prompt)
.tools(selected_tools)
.max_turns(self.spawn_config.max_turns)
.max_tokens(self.spawn_config.max_tokens)
.observability_mode(self.observability_mode);
if !self.permission_rules.is_empty() {
builder = builder.permission_rules(self.permission_rules.clone());
}
if !self.guardrails.is_empty() {
builder = builder.guardrails(self.guardrails.clone());
}
if let Some(ref ws) = self.workspace {
builder = builder.workspace(ws.clone());
}
if let Some(ref lsp) = self.lsp_manager {
builder = builder.lsp_manager(lsp.clone());
}
if let Some(ref cb) = self.on_event {
builder = builder.on_event(cb.clone());
}
if let Some(ref cb) = self.on_text {
builder = builder.on_text(cb.clone());
}
if let Some(ref trail) = self.audit_trail {
builder = builder.audit_trail(trail.clone());
}
if let (Some(uid), Some(tid)) = (&self.audit_user_id, &self.audit_tenant_id) {
builder = builder.audit_user_context(uid.clone(), tid.clone());
}
if !self.audit_delegation_chain.is_empty() {
let mut chain = self.audit_delegation_chain.clone();
chain.push(spawned_name.clone());
builder = builder.audit_delegation_chain(chain);
}
if let Some(ref tracker) = self.tenant_tracker {
builder = builder.tenant_tracker(tracker.clone());
}
if let Some(ref memory) = self.shared_memory {
let agent_ns = match &self.memory_namespace_prefix {
Some(prefix) => format!("{prefix}:{spawned_name}"),
None => spawned_name.clone(),
};
let ns = Arc::new(crate::memory::namespaced::NamespacedMemory::new(
memory.clone(),
&agent_ns,
));
builder = builder.memory(ns);
let mem_scope = crate::auth::TenantScope::from_audit_fields(
self.audit_tenant_id.as_deref(),
self.audit_user_id.as_deref(),
);
builder = builder.tools(crate::memory::shared_tools::shared_memory_tools(
memory.clone(),
&agent_ns,
mem_scope,
false, ));
}
let runner = builder.build()?;
info!(
agent = %spawned_name,
tools = ?input.tools,
"spawning dynamic agent"
);
match runner.execute(&input.task).await {
Ok(output) => {
self.spawn_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
{
let mut acc = self.accumulated_tokens.lock().expect("token lock");
*acc += output.tokens_used;
}
if let Some(ref cb) = self.on_event {
cb(AgentEvent::SubAgentCompleted {
agent: spawned_name.clone(),
success: true,
usage: output.tokens_used,
});
}
Ok(ToolOutput::success(format!(
"=== Spawned Agent: {} ===\n{}",
spawned_name, output.result
)))
}
Err(e) => {
self.spawn_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let partial = e.partial_usage();
{
let mut acc = self.accumulated_tokens.lock().expect("token lock");
*acc += partial;
}
if let Some(ref cb) = self.on_event {
cb(AgentEvent::SubAgentCompleted {
agent: spawned_name.clone(),
success: false,
usage: partial,
});
}
Ok(ToolOutput::error(format!(
"Spawned agent '{spawned_name}' failed: {e}"
)))
}
}
}
}
impl Tool for SpawnAgentTool {
fn definition(&self) -> ToolDefinition {
self.cached_definition.clone()
}
fn execute(
&self,
input: serde_json::Value,
) -> Pin<Box<dyn Future<Output = Result<ToolOutput, Error>> + Send + '_>> {
Box::pin(async move {
let spawn_input: SpawnAgentInput = serde_json::from_value(input)
.map_err(|e| Error::Agent(format!("Invalid spawn_agent input: {e}")))?;
self.spawn(spawn_input).await
})
}
}
/// Renders the orchestrator's system prompt.
///
/// `agents` holds one `(name, description, tool names)` triple per sub-agent;
/// each becomes a bullet in the "Available Sub-Agents" section, with
/// `(none)` shown when the tool list is empty.
///
/// The delegation wording depends on the mode:
/// - `DispatchMode::Sequential`: one agent at a time (`squads_enabled` is
///   ignored).
/// - `DispatchMode::Parallel` with squads: describes both `delegate_task`
///   and `form_squad`.
/// - `DispatchMode::Parallel` without squads: describes `delegate_task` only.
pub fn build_system_prompt(
    agents: &[(&str, &str, &[String])],
    squads_enabled: bool,
    dispatch_mode: DispatchMode,
) -> String {
    // One bullet per agent, separated (not terminated) by newlines.
    let mut agent_list = String::new();
    for (idx, (name, desc, tools)) in agents.iter().enumerate() {
        if idx > 0 {
            agent_list.push('\n');
        }
        let tool_summary = if tools.is_empty() {
            String::from("(none)")
        } else {
            tools.join(", ")
        };
        agent_list.push_str(&format!("- **{name}**: {desc}\n Tools: {tool_summary}"));
    }

    // Step 3 of the decision process and the trailing delegation section are
    // always selected together, so pick both in a single match.
    let (choose_tool_step, delegation_instructions) = match dispatch_mode {
        DispatchMode::Sequential => (
            "3. DELEGATE: Use delegate_task with ONE agent at a time. Wait for results before \
             delegating to the next agent.",
            "## Delegation Tool\n\
             Delegate to ONE agent at a time using **delegate_task**. Wait for the result \
             before deciding the next agent. Do NOT batch multiple agents in a single call.",
        ),
        DispatchMode::Parallel if squads_enabled => (
            "3. CHOOSE TOOL: Select delegate_task for independent parallel work, or form_squad \
             when agents benefit from shared state via a blackboard.",
            "## Delegation Tools\n\
             You have two delegation tools:\n\n\
             1. **delegate_task** — Run independent subtasks in parallel. Each agent works in \
             isolation and cannot see other agents' output. Use when subtasks are independent.\n\n\
             2. **form_squad** — Run subtasks in parallel with a shared blackboard. \
             Unlike delegate_task, agents can read each other's results via the blackboard. \
             Agents run concurrently — use when they benefit from shared state, not when \
             strict ordering is needed.\n\n\
             After receiving results, synthesize them into a coherent response.",
        ),
        DispatchMode::Parallel => (
            "3. DELEGATE: Use delegate_task to assign subtasks to the best-fit agents.",
            "## Delegation Tool\n\
             Use the **delegate_task** tool to assign work to sub-agents. You can assign \
             multiple tasks at once for parallel execution. Each agent works in isolation. \
             After receiving results, synthesize them into a coherent response.",
        ),
    };

    format!(
        "You are an orchestrator agent. Analyze incoming tasks and delegate work to \
         specialized sub-agents.\n\n\
         ## Decision Process\n\
         1. DECOMPOSE: Break the task into distinct subtasks. Identify which require different expertise.\n\
         2. MATCH: For each subtask, pick the best-fit agent based on their description and tools.\n\
         {choose_tool_step}\n\n\
         ## Important\n\
         - ALWAYS delegate to a sub-agent using your delegation tools. You do NOT have any \
         direct capabilities — sub-agents have the tools. Never respond to the user directly \
         without delegating first.\n\n\
         ## Effort Scaling\n\
         - If only ONE agent is relevant, delegate a single task. Do NOT force-split across agents.\n\
         - If the task is simple enough for one agent, use one agent.\n\
         - Only use multiple agents when the task genuinely has multiple distinct parts \
         needing different expertise.\n\n\
         ## Task Quality\n\
         - Each delegated task must be self-contained: include all context the agent needs.\n\
         - Be specific: \"Read /path/to/file and extract X\" not \"look at the project\".\n\
         - Avoid overlapping tasks — no two agents should do the same work.\n\n\
         ## Available Sub-Agents\n\
         Choose agents based on their description and available tools:\n\
         {agent_list}\n\n\
         {delegation_instructions}"
    )
}
/// Builds the `delegate_task` tool definition.
///
/// The description embeds a JSON list of the available agents (name,
/// description, tool names). The input schema is an object with a required
/// `tasks` array of `{agent, task}` items holding at least one entry; in
/// `DispatchMode::Sequential` the array is additionally capped at one item.
///
/// # Panics
/// Panics if serializing the agent list fails, which the code treats as
/// impossible.
pub fn build_delegate_tool_schema(
    agents: &[(&str, &str, &[String])],
    dispatch_mode: DispatchMode,
) -> ToolDefinition {
    let roster: Vec<serde_json::Value> = agents
        .iter()
        .map(|(name, desc, tools)| json!({"name": name, "description": desc, "tools": tools}))
        .collect();
    let roster_json =
        serde_json::to_string(&roster).expect("agent list serialization is infallible");

    // Both modes share the same array schema; only sequential adds a cap.
    let mut tasks_schema = json!({
        "type": "array",
        "items": {
            "type": "object",
            "properties": {
                "agent": {
                    "type": "string",
                    "description": "Name of the sub-agent"
                },
                "task": {
                    "type": "string",
                    "description": "Task instruction for the sub-agent"
                }
            },
            "required": ["agent", "task"]
        },
        "minItems": 1
    });

    let description = match dispatch_mode {
        DispatchMode::Sequential => {
            tasks_schema["maxItems"] = json!(1);
            format!(
                "Delegate a task to ONE sub-agent at a time. Wait for the result before \
                 delegating to the next agent. Each task runs in isolation. \
                 Write clear, self-contained task descriptions with all necessary context. \
                 Available agents: {}",
                roster_json
            )
        }
        DispatchMode::Parallel => format!(
            "Delegate independent tasks to sub-agents for parallel execution. \
             Each task runs in isolation — agents cannot see each other's work. \
             Write clear, self-contained task descriptions with all necessary context. \
             Available agents: {}",
            roster_json
        ),
    };

    let input_schema = json!({
        "type": "object",
        "properties": {
            "tasks": tasks_schema
        },
        "required": ["tasks"]
    });

    ToolDefinition {
        name: "delegate_task".into(),
        description,
        input_schema,
    }
}
/// Builds the `form_squad` tool definition.
///
/// The description embeds a JSON list of the available agents (name,
/// description, tool names). The input schema is an object with a required
/// `tasks` array of `{agent, task}` items holding at least two entries.
///
/// # Panics
/// Panics if serializing the agent list fails, which the code treats as
/// impossible.
pub(crate) fn build_form_squad_tool_schema(agents: &[(&str, &str, &[String])]) -> ToolDefinition {
    let mut roster: Vec<serde_json::Value> = Vec::with_capacity(agents.len());
    for (name, desc, tools) in agents {
        roster.push(json!({"name": name, "description": desc, "tools": tools}));
    }
    let roster_json =
        serde_json::to_string(&roster).expect("agent list serialization is infallible");

    let description = format!(
        "Dispatch per-agent tasks in parallel with a shared blackboard for intra-squad coordination. \
         Unlike delegate_task, squad agents can read each other's results via the blackboard. \
         Use this when agents benefit from shared state (e.g., building on each other's work, \
         coordinating on a shared artifact). Agents run concurrently. \
         Requires at least 2 tasks (one per agent). \
         Available agents: {}",
        roster_json
    );

    let task_item = json!({
        "type": "object",
        "properties": {
            "agent": {
                "type": "string",
                "description": "Name of the sub-agent"
            },
            "task": {
                "type": "string",
                "description": "Task instruction for the sub-agent"
            }
        },
        "required": ["agent", "task"]
    });

    let input_schema = json!({
        "type": "object",
        "properties": {
            "tasks": {
                "type": "array",
                "items": task_item,
                "minItems": 2,
                "description": "Per-agent tasks for the squad (minimum 2)"
            }
        },
        "required": ["tasks"]
    });

    ToolDefinition {
        name: "form_squad".into(),
        description,
        input_schema,
    }
}
/// Public, fully-specified description of a sub-agent, consumed by
/// `OrchestratorBuilder::sub_agent_full`, which converts it into the internal
/// `SubAgentDef` via `SubAgentDef::from`.
///
/// The struct derives `Default`, so callers can set only the fields they care
/// about and fill the rest with `..Default::default()`.
#[derive(Default)]
pub struct SubAgentConfig {
/// Sub-agent name. NOTE(review): presumably carried over to `SubAgentDef::name`,
/// which `OrchestratorBuilder::build` requires to be non-empty and unique.
pub name: String,
/// Human-readable description of the sub-agent.
pub description: String,
/// System prompt for the sub-agent.
pub system_prompt: String,
/// Tools available to the sub-agent.
pub tools: Vec<Arc<dyn Tool>>,
// Optional per-agent settings. `None` leaves the choice to whatever default the
// delegation tools apply — NOTE(review): the fallback logic is outside this struct.
pub context_strategy: Option<ContextStrategy>,
pub summarize_threshold: Option<u32>,
pub tool_timeout: Option<Duration>,
pub max_tool_output_bytes: Option<usize>,
/// NOTE(review): presumably carried over to `SubAgentDef::max_turns`, for which
/// `OrchestratorBuilder::build` rejects `Some(0)`.
pub max_turns: Option<usize>,
/// NOTE(review): presumably carried over to `SubAgentDef::max_tokens`, for which
/// `OrchestratorBuilder::build` rejects `Some(0)`.
pub max_tokens: Option<u32>,
pub response_schema: Option<serde_json::Value>,
pub run_timeout: Option<Duration>,
pub guardrails: Vec<Arc<dyn Guardrail>>,
/// Optional provider for this sub-agent.
/// NOTE(review): presumably maps to `SubAgentDef::provider_override` — confirm in the `From` impl.
pub provider: Option<Arc<BoxedProvider>>,
pub reasoning_effort: Option<crate::llm::types::ReasoningEffort>,
pub enable_reflection: Option<bool>,
pub tool_output_compression_threshold: Option<usize>,
pub max_tools_per_turn: Option<usize>,
pub tool_profile: Option<super::tool_filter::ToolProfile>,
pub max_identical_tool_calls: Option<u32>,
pub max_fuzzy_identical_tool_calls: Option<u32>,
pub max_tool_calls_per_turn: Option<u32>,
pub session_prune_config: Option<crate::agent::pruner::SessionPruneConfig>,
pub enable_recursive_summarization: Option<bool>,
pub reflection_threshold: Option<u32>,
pub consolidate_on_exit: Option<bool>,
// Workspace and audit settings. `OrchestratorBuilder::sub_agent_full` fills the
// resulting definition's workspace / audit fields from the builder's current
// values when they are `None` (or, for the delegation chain, empty).
pub workspace: Option<std::path::PathBuf>,
pub max_total_tokens: Option<u64>,
pub audit_trail: Option<Arc<dyn super::audit::AuditTrail>>,
pub audit_user_id: Option<String>,
pub audit_tenant_id: Option<String>,
// NOTE(review): `dead_code` is allowed on this public field; the reason is not
// visible here — confirm whether the allowance is still needed.
#[allow(dead_code)]
pub audit_delegation_chain: Vec<String>,
}
/// Builder for an `Orchestrator`: collects sub-agent definitions and settings
/// through chained setter methods, then assembles the orchestrator's runner and
/// its delegation tools in `build`.
pub struct OrchestratorBuilder<P: LlmProvider> {
/// LLM provider used by the orchestrator's runner; `build` also wraps it in a
/// `BoxedProvider` shared with the delegation tools.
provider: Arc<P>,
/// Sub-agents registered so far via `sub_agent`, `sub_agent_with_tools` or `sub_agent_full`.
sub_agents: Vec<SubAgentDef>,
/// Turn limit passed to the orchestrator's runner and to the delegation tools.
max_turns: usize,
/// Token limit passed to the orchestrator's runner and to the delegation tools.
max_tokens: u32,
// Optional runner settings; `build` forwards each to the runner only when set.
context_strategy: Option<ContextStrategy>,
summarize_threshold: Option<u32>,
tool_timeout: Option<Duration>,
max_tool_output_bytes: Option<usize>,
/// Shared memory store handed to the delegation tools; when set, `build` also
/// adds shared-memory tools to the orchestrator itself.
shared_memory: Option<Arc<dyn Memory>>,
/// Namespace prefix for memory; the orchestrator's own namespace falls back to
/// `"orchestrator"` when this is unset.
memory_namespace_prefix: Option<String>,
blackboard: Option<Arc<dyn Blackboard>>,
knowledge_base: Option<Arc<dyn KnowledgeBase>>,
on_text: Option<Arc<crate::llm::OnText>>,
on_approval: Option<Arc<crate::llm::OnApproval>>,
on_event: Option<Arc<OnEvent>>,
guardrails: Vec<Arc<dyn Guardrail>>,
on_question: Option<Arc<OnQuestion>>,
run_timeout: Option<Duration>,
/// Explicit squad toggle. When `None`, `build` enables squads if at least two
/// sub-agents are registered; sequential dispatch always disables them.
enable_squads: Option<bool>,
reasoning_effort: Option<crate::llm::types::ReasoningEffort>,
/// Forwarded to the runner only when `true`.
enable_reflection: bool,
tool_output_compression_threshold: Option<usize>,
max_tools_per_turn: Option<usize>,
max_identical_tool_calls: Option<u32>,
max_fuzzy_identical_tool_calls: Option<u32>,
max_tool_calls_per_turn: Option<u32>,
permission_rules: super::permission::PermissionRuleset,
instruction_text: Option<String>,
learned_permissions: Option<Arc<std::sync::Mutex<super::permission::LearnedPermissions>>>,
lsp_manager: Option<Arc<crate::lsp::LspManager>>,
/// Observability mode; the delegation tools fall back to `Production` when unset.
observability_mode: Option<super::observability::ObservabilityMode>,
/// Sequential vs. parallel delegation; selects the prompt wording and tool schema.
dispatch_mode: DispatchMode,
// Workspace and audit context. The `sub_agent*` methods copy the values that are
// current at registration time into each new sub-agent definition.
workspace: Option<std::path::PathBuf>,
audit_trail: Option<Arc<dyn super::audit::AuditTrail>>,
audit_user_id: Option<String>,
audit_tenant_id: Option<String>,
audit_delegation_chain: Vec<String>,
allow_shared_write: bool,
/// When `true`, `build` appends a collaboration prompt to every sub-agent's system prompt.
multi_agent_prompt: bool,
/// When set, `build` adds the `spawn_agent` tool configured from this value.
spawn_config: Option<crate::types::SpawnConfig>,
/// Candidate tools for spawned agents; `build` keeps those named in the spawn allowlist.
spawn_builtin_tools: Vec<Arc<dyn Tool>>,
tenant_tracker: Option<Arc<crate::agent::tenant_tracker::TenantTokenTracker>>,
}
impl<P: LlmProvider + 'static> OrchestratorBuilder<P> {
pub fn sub_agent(
mut self,
name: impl Into<String>,
description: impl Into<String>,
system_prompt: impl Into<String>,
) -> Self {
let mut def = SubAgentDef::new(name, description, system_prompt);
def.workspace = self.workspace.clone();
def.audit_trail = self.audit_trail.clone();
def.audit_user_id = self.audit_user_id.clone();
def.audit_tenant_id = self.audit_tenant_id.clone();
def.audit_delegation_chain = self.audit_delegation_chain.clone();
self.sub_agents.push(def);
self
}
pub fn sub_agent_with_tools(
mut self,
name: impl Into<String>,
description: impl Into<String>,
system_prompt: impl Into<String>,
tools: Vec<Arc<dyn Tool>>,
) -> Self {
let mut def = SubAgentDef::new(name, description, system_prompt);
def.tools = tools;
def.workspace = self.workspace.clone();
def.audit_trail = self.audit_trail.clone();
def.audit_user_id = self.audit_user_id.clone();
def.audit_tenant_id = self.audit_tenant_id.clone();
def.audit_delegation_chain = self.audit_delegation_chain.clone();
self.sub_agents.push(def);
self
}
/// Registers a sub-agent from a full `SubAgentConfig`.
///
/// Workspace and audit values already present on the converted definition
/// are kept. Anything left unset (`None`, or an empty delegation chain) is
/// filled from the builder's values at the time of this call.
pub fn sub_agent_full(mut self, config: SubAgentConfig) -> Self {
    let mut def = SubAgentDef::from(config);
    def.workspace = def.workspace.take().or_else(|| self.workspace.clone());
    def.audit_trail = def.audit_trail.take().or_else(|| self.audit_trail.clone());
    def.audit_user_id = def
        .audit_user_id
        .take()
        .or_else(|| self.audit_user_id.clone());
    def.audit_tenant_id = def
        .audit_tenant_id
        .take()
        .or_else(|| self.audit_tenant_id.clone());
    if def.audit_delegation_chain.is_empty() {
        def.audit_delegation_chain = self.audit_delegation_chain.clone();
    }
    self.sub_agents.push(def);
    self
}
pub fn max_turns(mut self, max_turns: usize) -> Self {
self.max_turns = max_turns;
self
}
pub fn max_tokens(mut self, max_tokens: u32) -> Self {
self.max_tokens = max_tokens;
self
}
pub fn context_strategy(mut self, strategy: ContextStrategy) -> Self {
self.context_strategy = Some(strategy);
self
}
pub fn summarize_threshold(mut self, threshold: u32) -> Self {
self.summarize_threshold = Some(threshold);
self
}
pub fn tool_timeout(mut self, timeout: Duration) -> Self {
self.tool_timeout = Some(timeout);
self
}
pub fn max_tool_output_bytes(mut self, max: usize) -> Self {
self.max_tool_output_bytes = Some(max);
self
}
pub fn shared_memory(mut self, memory: Arc<dyn Memory>) -> Self {
self.shared_memory = Some(memory);
self
}
pub fn memory_namespace_prefix(mut self, prefix: impl Into<String>) -> Self {
self.memory_namespace_prefix = Some(prefix.into());
self
}
pub fn allow_shared_write(mut self, allow: bool) -> Self {
self.allow_shared_write = allow;
self
}
pub fn multi_agent_prompt(mut self, enabled: bool) -> Self {
self.multi_agent_prompt = enabled;
self
}
pub fn spawn_config(
mut self,
config: crate::types::SpawnConfig,
builtin_tools: Vec<Arc<dyn Tool>>,
) -> Self {
self.spawn_config = Some(config);
self.spawn_builtin_tools = builtin_tools;
self
}
pub fn tenant_tracker(
mut self,
tracker: Arc<crate::agent::tenant_tracker::TenantTokenTracker>,
) -> Self {
self.tenant_tracker = Some(tracker);
self
}
pub fn blackboard(mut self, blackboard: Arc<dyn Blackboard>) -> Self {
self.blackboard = Some(blackboard);
self
}
pub fn knowledge(mut self, kb: Arc<dyn KnowledgeBase>) -> Self {
self.knowledge_base = Some(kb);
self
}
pub fn on_text(mut self, callback: Arc<crate::llm::OnText>) -> Self {
self.on_text = Some(callback);
self
}
pub fn on_approval(mut self, callback: Arc<crate::llm::OnApproval>) -> Self {
self.on_approval = Some(callback);
self
}
pub fn learned_permissions(
mut self,
learned: Arc<std::sync::Mutex<super::permission::LearnedPermissions>>,
) -> Self {
self.learned_permissions = Some(learned);
self
}
pub fn lsp_manager(mut self, manager: Arc<crate::lsp::LspManager>) -> Self {
self.lsp_manager = Some(manager);
self
}
pub fn on_event(mut self, callback: Arc<OnEvent>) -> Self {
self.on_event = Some(callback);
self
}
/// Appends a single guardrail to the builder's list; earlier guardrails are
/// kept.
pub fn guardrail(self, guardrail: Arc<dyn Guardrail>) -> Self {
    let mut builder = self;
    builder.guardrails.extend(std::iter::once(guardrail));
    builder
}
/// Appends several guardrails, in order, to the builder's list; earlier
/// guardrails are kept (this does not replace the list).
pub fn guardrails(self, guardrails: Vec<Arc<dyn Guardrail>>) -> Self {
    let mut builder = self;
    for guardrail in guardrails {
        builder.guardrails.push(guardrail);
    }
    builder
}
pub fn on_question(mut self, callback: Arc<OnQuestion>) -> Self {
self.on_question = Some(callback);
self
}
pub fn run_timeout(mut self, timeout: Duration) -> Self {
self.run_timeout = Some(timeout);
self
}
pub fn enable_squads(mut self, enable: bool) -> Self {
self.enable_squads = Some(enable);
self
}
pub fn reasoning_effort(mut self, effort: crate::llm::types::ReasoningEffort) -> Self {
self.reasoning_effort = Some(effort);
self
}
pub fn enable_reflection(mut self, enabled: bool) -> Self {
self.enable_reflection = enabled;
self
}
pub fn tool_output_compression_threshold(mut self, threshold: usize) -> Self {
self.tool_output_compression_threshold = Some(threshold);
self
}
pub fn max_tools_per_turn(mut self, max: usize) -> Self {
self.max_tools_per_turn = Some(max);
self
}
pub fn max_identical_tool_calls(mut self, max: u32) -> Self {
self.max_identical_tool_calls = Some(max);
self
}
pub fn max_fuzzy_identical_tool_calls(mut self, max: u32) -> Self {
self.max_fuzzy_identical_tool_calls = Some(max);
self
}
pub fn max_tool_calls_per_turn(mut self, cap: u32) -> Self {
self.max_tool_calls_per_turn = Some(cap);
self
}
pub fn permission_rules(mut self, rules: super::permission::PermissionRuleset) -> Self {
self.permission_rules = rules;
self
}
pub fn instruction_text(mut self, text: impl Into<String>) -> Self {
let text = text.into();
if !text.is_empty() {
self.instruction_text = Some(text);
}
self
}
pub fn observability_mode(mut self, mode: super::observability::ObservabilityMode) -> Self {
self.observability_mode = Some(mode);
self
}
pub fn dispatch_mode(mut self, mode: DispatchMode) -> Self {
self.dispatch_mode = mode;
self
}
pub fn workspace(mut self, path: impl Into<std::path::PathBuf>) -> Self {
self.workspace = Some(path.into());
self
}
pub fn audit_trail(mut self, trail: Arc<dyn super::audit::AuditTrail>) -> Self {
self.audit_trail = Some(trail);
self
}
pub fn audit_user_context(
mut self,
user_id: impl Into<String>,
tenant_id: impl Into<String>,
) -> Self {
self.audit_user_id = Some(user_id.into());
self.audit_tenant_id = Some(tenant_id.into());
self
}
pub fn audit_delegation_chain(mut self, chain: Vec<String>) -> Self {
self.audit_delegation_chain = chain;
self
}
/// Consumes the builder and assembles the `Orchestrator`.
///
/// Steps, in order: optionally append the collaboration prompt to each
/// sub-agent, validate the sub-agent definitions, decide whether squads are
/// enabled, render the system prompt and tool schemas, construct the
/// `delegate_task` tool (plus `form_squad` and `spawn_agent` when enabled),
/// and configure the orchestrator's own `AgentRunner`.
///
/// # Errors
/// Returns `Error::Config` when a sub-agent has an empty or duplicate name,
/// when a sub-agent sets `max_turns` or `max_tokens` to `Some(0)`, or when the
/// spawn allowlist names a tool that is not among the builtin tools supplied
/// to `spawn_config`. Errors from the underlying runner builder's `build()`
/// are propagated unchanged.
///
/// # Panics
/// The schema helpers called here panic if serializing the agent list fails.
pub fn build(mut self) -> Result<Orchestrator<P>, Error> {
// Collaboration prompt is appended before validation and before the agent
// definitions are cloned/moved into the tools below.
if self.multi_agent_prompt {
for agent in &mut self.sub_agents {
agent
.system_prompt
.push_str(&crate::agent::prompts::render_collab_prompt(
&agent.name,
&agent.description,
));
}
}
// Validate sub-agent definitions; the first violation aborts the build.
{
let mut seen = std::collections::HashSet::new();
for agent in &self.sub_agents {
if agent.name.is_empty() {
return Err(Error::Config("sub-agent name must not be empty".into()));
}
if !seen.insert(&agent.name) {
return Err(Error::Config(format!(
"duplicate sub-agent name: '{}'",
agent.name
)));
}
if agent.max_turns == Some(0) {
return Err(Error::Config(format!(
"sub-agent '{}': max_turns must be > 0",
agent.name
)));
}
if agent.max_tokens == Some(0) {
return Err(Error::Config(format!(
"sub-agent '{}': max_tokens must be > 0",
agent.name
)));
}
}
}
// An orchestrator without sub-agents is allowed but only warned about.
if self.sub_agents.is_empty() {
tracing::warn!(
"orchestrator built with no sub-agents — delegate_task tool will list no agents"
);
}
// Sequential dispatch always disables squads; otherwise an explicit
// `enable_squads` wins, defaulting to "on" when two or more agents exist.
let squads_enabled = if self.dispatch_mode == DispatchMode::Sequential {
false
} else {
self.enable_squads.unwrap_or(self.sub_agents.len() >= 2)
};
if squads_enabled && self.sub_agents.len() < 2 {
tracing::warn!(
"enable_squads is true but fewer than 2 agents are registered — \
form_squad requires at least 2 agents to be useful"
);
}
// Collect each agent's tool names, then build borrowed
// (name, description, tool names) triples for the prompt/schema helpers.
let tool_names: Vec<Vec<String>> = self
.sub_agents
.iter()
.map(|a| a.tools.iter().map(|t| t.definition().name).collect())
.collect();
let triples: Vec<(&str, &str, &[String])> = self
.sub_agents
.iter()
.zip(tool_names.iter())
.map(|(a, names)| (a.name.as_str(), a.description.as_str(), names.as_slice()))
.collect();
let mut system = build_system_prompt(&triples, squads_enabled, self.dispatch_mode);
// With a full audit user context, tell the model whose behalf it acts on.
if let (Some(uid), Some(tid)) = (&self.audit_user_id, &self.audit_tenant_id) {
system.push_str(&format!(
"\n---\nYou are operating on behalf of **{uid}** in organization **{tid}**.\nKeep this user's information private. Do not share their data with other users."
));
}
let cached_definition = build_delegate_tool_schema(&triples, self.dispatch_mode);
let form_squad_definition = if squads_enabled {
Some(build_form_squad_tool_schema(&triples))
} else {
None
};
// Release the borrows of `self.sub_agents` before it is cloned/moved below.
drop(triples);
drop(tool_names);
// Token accumulator shared by all delegation tools and the returned Orchestrator.
let sub_agent_tokens = Arc::new(Mutex::new(TokenUsage::default()));
let shared_provider = Arc::new(BoxedProvider::from_arc(self.provider.clone()));
// The squad tool needs its own copy of the agent definitions because the
// originals are moved into the delegate tool.
let agent_pool = if squads_enabled {
Some(self.sub_agents.clone())
} else {
None
};
// Tools always receive a concrete mode; `Production` is the fallback.
let resolved_mode = self
.observability_mode
.unwrap_or(super::observability::ObservabilityMode::Production);
// Prepare the spawn tool's inputs (config, allowed tool pool, definition)
// when spawning is configured.
let spawn_tool_data = if let Some(spawn_cfg) = self.spawn_config.take() {
let mut tool_pool = std::collections::HashMap::new();
// Keep only builtin tools that the allowlist names.
for tool in &self.spawn_builtin_tools {
let name = tool.definition().name;
if spawn_cfg.tool_allowlist.contains(&name) {
tool_pool.insert(name, tool.clone());
}
}
// Every allowlisted name must have matched a builtin tool.
for allowed in &spawn_cfg.tool_allowlist {
if !tool_pool.contains_key(allowed) {
return Err(Error::Config(format!(
"orchestrator.spawn.tool_allowlist: unknown tool '{}'. \
Available builtin tools: [{}]",
allowed,
self.spawn_builtin_tools
.iter()
.map(|t| t.definition().name)
.collect::<Vec<_>>()
.join(", ")
)));
}
}
// Spawned agents never get the orchestration tools themselves, even if
// they were allowlisted.
tool_pool.remove("delegate_task");
tool_pool.remove("form_squad");
tool_pool.remove("spawn_agent");
let cached_definition = SpawnAgentTool::build_definition(&spawn_cfg);
system.push_str(
"\n\n## Dynamic Agent Spawning\n\
You also have the **spawn_agent** tool to create specialist agents at runtime \
when no pre-configured agent fits the task. Use this as a secondary option — \
prefer delegating to existing agents when they match the need.",
);
Some((spawn_cfg, tool_pool, cached_definition))
} else {
None
};
// `delegate_task` is always registered; it takes ownership of the sub-agents.
let delegate_tool: Arc<dyn Tool> = Arc::new(DelegateTaskTool {
shared_provider: shared_provider.clone(),
sub_agents: self.sub_agents,
max_turns: self.max_turns,
max_tokens: self.max_tokens,
permission_rules: self.permission_rules.clone(),
accumulated_tokens: sub_agent_tokens.clone(),
shared_memory: self.shared_memory.clone(),
memory_namespace_prefix: self.memory_namespace_prefix.clone(),
blackboard: self.blackboard.clone(),
knowledge_base: self.knowledge_base.clone(),
cached_definition,
on_event: self.on_event.clone(),
on_text: self.on_text.clone(),
lsp_manager: self.lsp_manager.clone(),
observability_mode: resolved_mode,
allow_shared_write: self.allow_shared_write,
tenant_tracker: self.tenant_tracker.clone(),
guardrails: self.guardrails.clone(),
});
// The orchestrator's own runner; optional settings are applied further down.
let mut runner_builder = AgentRunner::builder(self.provider)
.name("orchestrator")
.system_prompt(system)
.tool(delegate_tool)
.max_turns(self.max_turns)
.max_tokens(self.max_tokens);
// `form_squad` is registered only when squads are enabled (agent_pool is Some).
if let Some(agent_pool) = agent_pool {
let squad_def = form_squad_definition.expect("squad definition computed when enabled");
let form_squad_tool: Arc<dyn Tool> = Arc::new(FormSquadTool {
shared_provider: shared_provider.clone(),
agent_pool,
default_max_turns: self.max_turns,
default_max_tokens: self.max_tokens,
permission_rules: self.permission_rules.clone(),
accumulated_tokens: sub_agent_tokens.clone(),
shared_memory: self.shared_memory.clone(),
memory_namespace_prefix: self.memory_namespace_prefix.clone(),
blackboard: self.blackboard.clone(),
knowledge_base: self.knowledge_base.clone(),
on_event: self.on_event.clone(),
on_text: self.on_text.clone(),
lsp_manager: self.lsp_manager.clone(),
cached_definition: squad_def,
observability_mode: resolved_mode,
allow_shared_write: self.allow_shared_write,
tenant_tracker: self.tenant_tracker.clone(),
});
runner_builder = runner_builder.tool(form_squad_tool);
}
// `spawn_agent` is registered only when a spawn config was supplied.
if let Some((spawn_cfg, tool_pool, spawn_def)) = spawn_tool_data {
let spawn_tool: Arc<dyn Tool> = Arc::new(SpawnAgentTool {
shared_provider: shared_provider.clone(),
spawn_config: spawn_cfg,
tool_pool,
spawn_count: Arc::new(std::sync::atomic::AtomicU32::new(0)),
spawned_names: Arc::new(Mutex::new(std::collections::HashSet::new())),
accumulated_tokens: sub_agent_tokens.clone(),
permission_rules: self.permission_rules.clone(),
shared_memory: self.shared_memory.clone(),
memory_namespace_prefix: self.memory_namespace_prefix.clone(),
on_event: self.on_event.clone(),
on_text: self.on_text.clone(),
lsp_manager: self.lsp_manager.clone(),
observability_mode: resolved_mode,
workspace: self.workspace.clone(),
guardrails: self.guardrails.clone(),
audit_trail: self.audit_trail.clone(),
audit_user_id: self.audit_user_id.clone(),
audit_tenant_id: self.audit_tenant_id.clone(),
audit_delegation_chain: self.audit_delegation_chain.clone(),
cached_definition: spawn_def,
tenant_tracker: self.tenant_tracker.clone(),
});
runner_builder = runner_builder.tool(spawn_tool);
}
// Give the orchestrator itself shared-memory tools, namespaced by the
// configured prefix or "orchestrator" when none is set.
if let Some(ref memory) = self.shared_memory {
let orch_ns = self
.memory_namespace_prefix
.as_deref()
.unwrap_or("orchestrator");
let mem_scope = crate::auth::TenantScope::from_audit_fields(
self.audit_tenant_id.as_deref(),
self.audit_user_id.as_deref(),
);
let mem_tools = crate::memory::shared_tools::shared_memory_tools(
memory.clone(),
orch_ns,
mem_scope,
self.allow_shared_write,
);
runner_builder = runner_builder.tools(mem_tools);
}
// From here on, each optional setting is forwarded to the runner only when
// it was set; the remaining builder fields are moved out of `self`.
if let Some(strategy) = self.context_strategy {
runner_builder = runner_builder.context_strategy(strategy);
}
if let Some(threshold) = self.summarize_threshold {
runner_builder = runner_builder.summarize_threshold(threshold);
}
if let Some(timeout) = self.tool_timeout {
runner_builder = runner_builder.tool_timeout(timeout);
}
if let Some(max) = self.max_tool_output_bytes {
runner_builder = runner_builder.max_tool_output_bytes(max);
}
if let Some(on_text) = self.on_text {
runner_builder = runner_builder.on_text(on_text);
}
if let Some(on_approval) = self.on_approval {
runner_builder = runner_builder.on_approval(on_approval);
}
if let Some(learned) = self.learned_permissions {
runner_builder = runner_builder.learned_permissions(learned);
}
if let Some(lsp) = self.lsp_manager {
runner_builder = runner_builder.lsp_manager(lsp);
}
if let Some(on_event) = self.on_event {
runner_builder = runner_builder.on_event(on_event);
}
if !self.guardrails.is_empty() {
runner_builder = runner_builder.guardrails(self.guardrails);
}
if let Some(on_question) = self.on_question {
runner_builder = runner_builder.on_question(on_question);
}
if let Some(timeout) = self.run_timeout {
runner_builder = runner_builder.run_timeout(timeout);
}
if let Some(effort) = self.reasoning_effort {
runner_builder = runner_builder.reasoning_effort(effort);
}
// Reflection is only ever switched on here, never explicitly off.
if self.enable_reflection {
runner_builder = runner_builder.enable_reflection(true);
}
if let Some(threshold) = self.tool_output_compression_threshold {
runner_builder = runner_builder.tool_output_compression_threshold(threshold);
}
if let Some(max) = self.max_tools_per_turn {
runner_builder = runner_builder.max_tools_per_turn(max);
}
if let Some(max) = self.max_identical_tool_calls {
runner_builder = runner_builder.max_identical_tool_calls(max);
}
if let Some(max) = self.max_fuzzy_identical_tool_calls {
runner_builder = runner_builder.max_fuzzy_identical_tool_calls(max);
}
if let Some(cap) = self.max_tool_calls_per_turn {
runner_builder = runner_builder.max_tool_calls_per_turn(cap);
}
if !self.permission_rules.is_empty() {
runner_builder = runner_builder.permission_rules(self.permission_rules);
}
if let Some(text) = self.instruction_text {
runner_builder = runner_builder.instruction_text(text);
}
// Unlike the tools, the runner gets an observability mode only when one
// was set explicitly.
if let Some(mode) = self.observability_mode {
runner_builder = runner_builder.observability_mode(mode);
}
if let Some(trail) = self.audit_trail {
runner_builder = runner_builder.audit_trail(trail);
}
// The audit user context is forwarded only when both ids are present.
if let Some(uid) = self.audit_user_id
&& let Some(tid) = self.audit_tenant_id
{
runner_builder = runner_builder.audit_user_context(uid, tid);
}
if !self.audit_delegation_chain.is_empty() {
runner_builder =
runner_builder.audit_delegation_chain(self.audit_delegation_chain.clone());
}
if let Some(tracker) = self.tenant_tracker {
runner_builder = runner_builder.tenant_tracker(tracker);
}
let runner = runner_builder.build()?;
Ok(Orchestrator {
runner,
sub_agent_tokens,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::llm::types::{
CompletionRequest, CompletionResponse, ContentBlock, StopReason, TokenUsage,
};
use crate::tool::ToolOutput;
use std::sync::Mutex;
struct MockProvider {
responses: Mutex<Vec<CompletionResponse>>,
}
impl MockProvider {
fn new(responses: Vec<CompletionResponse>) -> Self {
Self {
responses: Mutex::new(responses),
}
}
}
impl LlmProvider for MockProvider {
async fn complete(&self, _request: CompletionRequest) -> Result<CompletionResponse, Error> {
let mut responses = self.responses.lock().expect("mock lock poisoned");
if responses.is_empty() {
return Err(Error::Agent("no more mock responses".into()));
}
Ok(responses.remove(0))
}
fn model_name(&self) -> Option<&str> {
Some("mock-model-v1")
}
}
struct MockTool {
def: crate::llm::types::ToolDefinition,
response: String,
}
impl MockTool {
fn new(name: &str, response: &str) -> Self {
Self {
def: crate::llm::types::ToolDefinition {
name: name.into(),
description: format!("Mock {name}"),
input_schema: json!({"type": "object"}),
},
response: response.into(),
}
}
}
impl crate::tool::Tool for MockTool {
fn definition(&self) -> crate::llm::types::ToolDefinition {
self.def.clone()
}
fn execute(
&self,
_input: serde_json::Value,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<ToolOutput, Error>> + Send + '_>,
> {
let response = self.response.clone();
Box::pin(async move { Ok(ToolOutput::success(response)) })
}
}
#[test]
fn system_prompt_includes_agents() {
let tools_a = vec!["web_search".to_string(), "read_file".to_string()];
let tools_b: Vec<String> = vec![];
let agents: Vec<(&str, &str, &[String])> = vec![
("researcher", "Research specialist", tools_a.as_slice()),
("coder", "Coding expert", tools_b.as_slice()),
];
let prompt = build_system_prompt(&agents, false, DispatchMode::Parallel);
assert!(prompt.contains("researcher"));
assert!(prompt.contains("Research specialist"));
assert!(prompt.contains("coder"));
assert!(prompt.contains("Tools: web_search, read_file"));
assert!(prompt.contains("Tools: (none)"));
assert!(
prompt.contains("Decision Process"),
"prompt should contain Decision Process section: {prompt}"
);
assert!(
prompt.contains("Effort Scaling"),
"prompt should contain Effort Scaling section: {prompt}"
);
assert!(
prompt.contains("Task Quality"),
"prompt should contain Task Quality section: {prompt}"
);
assert!(
prompt.contains("DECOMPOSE"),
"prompt should contain decomposition guidance: {prompt}"
);
}
#[test]
fn system_prompt_shows_tool_names() {
let tools = vec![
"web_search".to_string(),
"read_file".to_string(),
"knowledge_search".to_string(),
];
let no_tools: Vec<String> = vec![];
let agents: Vec<(&str, &str, &[String])> = vec![
("researcher", "Research specialist", tools.as_slice()),
("analyst", "Analytical thinker", no_tools.as_slice()),
];
let prompt = build_system_prompt(&agents, false, DispatchMode::Parallel);
assert!(
prompt.contains("Tools: web_search, read_file, knowledge_search"),
"prompt should list tool names: {prompt}"
);
assert!(
prompt.contains("Tools: (none)"),
"prompt should show (none) for agents without tools: {prompt}"
);
}
#[test]
fn system_prompt_sequential_says_one_at_a_time() {
let tools: Vec<String> = vec![];
let agents: Vec<(&str, &str, &[String])> =
vec![("builder", "Builds stuff", tools.as_slice())];
let prompt = build_system_prompt(&agents, false, DispatchMode::Sequential);
assert!(
prompt.contains("ONE agent at a time"),
"sequential prompt should say one at a time: {prompt}"
);
assert!(
!prompt.contains("parallel execution"),
"sequential prompt should not mention parallel: {prompt}"
);
}
#[test]
fn system_prompt_parallel_says_parallel() {
let tools: Vec<String> = vec![];
let agents: Vec<(&str, &str, &[String])> =
vec![("builder", "Builds stuff", tools.as_slice())];
let prompt = build_system_prompt(&agents, false, DispatchMode::Parallel);
assert!(
prompt.contains("parallel execution"),
"parallel prompt should mention parallel: {prompt}"
);
}
/// The sequential delegate-tool schema caps `tasks` at one item and its
/// description says "ONE sub-agent".
#[test]
fn delegate_schema_sequential_max_items_1() {
    let no_tools: Vec<String> = Vec::new();
    let roster: Vec<(&str, &str, &[String])> =
        vec![("builder", "Builds stuff", no_tools.as_slice())];

    let def = build_delegate_tool_schema(&roster, DispatchMode::Sequential);

    // JSON schema node describing the `tasks` array.
    let tasks = &def.input_schema["properties"]["tasks"];
    assert_eq!(
        tasks["maxItems"], 1,
        "sequential schema should have maxItems=1: {tasks}"
    );

    let says_one = def.description.contains("ONE sub-agent");
    assert!(
        says_one,
        "sequential description should say ONE: {}",
        def.description
    );
}
/// The parallel delegate-tool schema must not set `maxItems` on `tasks`.
#[test]
fn delegate_schema_parallel_no_max_items() {
    let no_tools: Vec<String> = Vec::new();
    let roster: Vec<(&str, &str, &[String])> =
        vec![("builder", "Builds stuff", no_tools.as_slice())];

    let def = build_delegate_tool_schema(&roster, DispatchMode::Parallel);

    // JSON schema node describing the `tasks` array.
    let tasks = &def.input_schema["properties"]["tasks"];
    let has_cap = tasks.get("maxItems").is_some();
    assert!(!has_cap, "parallel schema should not have maxItems: {tasks}");
}
/// Builds a two-agent orchestrator in sequential dispatch mode and runs it
/// against a single plain-text mock response.
///
/// NOTE(review): the only assertion is that the run returns "done"; nothing
/// here inspects squad tooling directly, despite the test's name.
#[tokio::test]
async fn sequential_dispatch_disables_squads() {
    let final_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "done".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![final_reply]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("a", "Agent A", "prompt a")
        .sub_agent("b", "Agent B", "prompt b")
        .dispatch_mode(DispatchMode::Sequential)
        .build()
        .unwrap();

    let output = orch.run("test").await.unwrap();
    assert_eq!(output.result, "done");
}
/// With two agents in sequential dispatch mode, the system prompt must not
/// contain the string `form_squad`.
#[test]
fn sequential_dispatch_disables_squads_in_prompt() {
    let no_tools: Vec<String> = Vec::new();
    let roster: Vec<(&str, &str, &[String])> = vec![
        ("a", "Agent A", no_tools.as_slice()),
        ("b", "Agent B", no_tools.as_slice()),
    ];

    let prompt = build_system_prompt(&roster, false, DispatchMode::Sequential);

    let mentions_squads = prompt.contains("form_squad");
    assert!(
        !mentions_squads,
        "sequential prompt should not mention form_squad: {prompt}"
    );
}
/// The delegate-tool definition is named `delegate_task` and its description
/// mentions the agent name, the agent's tool names, "isolation", and
/// "self-contained".
#[test]
fn delegate_tool_schema_includes_agents() {
    let researcher_tools = vec!["web_search".to_string()];
    let roster: Vec<(&str, &str, &[String])> =
        vec![("researcher", "Research", researcher_tools.as_slice())];

    let def = build_delegate_tool_schema(&roster, DispatchMode::Parallel);

    assert_eq!(def.name, "delegate_task");
    assert!(def.description.contains("researcher"));

    let names_tool = def.description.contains("web_search");
    assert!(
        names_tool,
        "delegate tool description should contain tool names: {}",
        def.description
    );

    let mentions_isolation = def.description.contains("isolation");
    assert!(
        mentions_isolation,
        "delegate tool description should mention isolation: {}",
        def.description
    );

    let mentions_self_contained = def.description.contains("self-contained");
    assert!(
        mentions_self_contained,
        "delegate tool description should mention self-contained tasks: {}",
        def.description
    );
}
/// Constructs a `DelegateTaskTool` by hand around a pre-built schema and
/// checks that `definition()` reports the `delegate_task` name and a
/// description that mentions the agent and the word "tools".
#[test]
fn delegate_tool_definition_includes_agents() {
    let roster: Vec<(&str, &str, &[String])> = vec![("researcher", "Research", &[])];
    let schema = build_delegate_tool_schema(&roster, DispatchMode::Parallel);

    // A sub-agent with every optional knob left unset.
    let researcher = SubAgentDef {
        name: "researcher".into(),
        description: "Research".into(),
        system_prompt: "prompt".into(),
        tools: vec![],
        context_strategy: None,
        summarize_threshold: None,
        tool_timeout: None,
        max_tool_output_bytes: None,
        max_turns: None,
        max_tokens: None,
        response_schema: None,
        run_timeout: None,
        guardrails: vec![],
        provider_override: None,
        reasoning_effort: None,
        enable_reflection: None,
        tool_output_compression_threshold: None,
        max_tools_per_turn: None,
        tool_profile: None,
        max_identical_tool_calls: None,
        max_fuzzy_identical_tool_calls: None,
        max_tool_calls_per_turn: None,
        session_prune_config: None,
        enable_recursive_summarization: None,
        reflection_threshold: None,
        consolidate_on_exit: None,
        workspace: None,
        max_total_tokens: None,
        audit_trail: None,
        audit_user_id: None,
        audit_tenant_id: None,
        audit_delegation_chain: Vec::new(),
    };

    // The provider gets no canned responses; this test never runs the tool.
    let shared_provider = Arc::new(BoxedProvider::new(MockProvider::new(vec![])));

    let tool = DelegateTaskTool {
        shared_provider,
        sub_agents: vec![researcher],
        shared_memory: None,
        memory_namespace_prefix: None,
        blackboard: None,
        knowledge_base: None,
        max_turns: 10,
        max_tokens: 4096,
        permission_rules: crate::agent::permission::PermissionRuleset::default(),
        accumulated_tokens: Arc::new(Mutex::new(TokenUsage::default())),
        cached_definition: schema,
        on_event: None,
        on_text: None,
        lsp_manager: None,
        observability_mode: crate::ObservabilityMode::Production,
        allow_shared_write: true,
        tenant_tracker: None,
        guardrails: vec![],
    };

    let def = tool.definition();
    assert_eq!(def.name, "delegate_task");
    assert!(def.description.contains("researcher"));

    let mentions_tools = def.description.contains("tools");
    assert!(
        mentions_tools,
        "delegate tool description should contain 'tools' key: {}",
        def.description
    );
}
/// Registering two sub-agents under the same name makes `build()` fail with
/// an error whose text names the duplicate.
#[test]
fn build_errors_on_duplicate_sub_agent_names() {
    let provider = Arc::new(MockProvider::new(vec![]));

    let result = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research 1", "prompt1")
        .sub_agent("researcher", "Research 2", "prompt2")
        .build();

    assert!(result.is_err());
    let err = result.err().unwrap();

    let rendered = err.to_string();
    let names_duplicate = rendered.contains("duplicate sub-agent name: 'researcher'");
    assert!(names_duplicate, "error: {err}");
}
/// When the only mock response is plain text, the run returns that text and
/// reports zero tool calls.
#[tokio::test]
async fn orchestrator_direct_response_no_delegation() {
    let direct_answer = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Simple answer.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage {
            input_tokens: 10,
            output_tokens: 5,
            ..Default::default()
        },
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![direct_answer]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research", "prompt")
        .build()
        .unwrap();

    let output = orch.run("simple question").await.unwrap();
    assert_eq!(output.result, "Simple answer.");
    assert_eq!(output.tool_calls_made, 0);
}
/// Full delegation round trip: one `delegate_task` call naming two agents,
/// two plain-text replies, then a final plain-text reply. The run must
/// return the final text, count one tool call, and report token totals equal
/// to the sum over all four mock responses.
#[tokio::test]
async fn orchestrator_delegates_and_synthesizes() {
    // First queued response: the delegation tool call.
    let delegation = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "delegate_task".into(),
            input: json!({
                "tasks": [
                    {"agent": "researcher", "task": "Research Rust"},
                    {"agent": "analyst", "task": "Analyze findings"}
                ]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage {
            input_tokens: 50,
            output_tokens: 20,
            ..Default::default()
        },
        model: None,
    };
    // Second and third queued responses: plain-text replies.
    let research_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Rust is fast and safe.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage {
            input_tokens: 10,
            output_tokens: 8,
            ..Default::default()
        },
        model: None,
    };
    let analysis_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Strengths: memory safety, performance.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage {
            input_tokens: 12,
            output_tokens: 10,
            ..Default::default()
        },
        model: None,
    };
    // Last queued response: the text the run is expected to return.
    let synthesis = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Based on research: Rust is excellent.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage {
            input_tokens: 80,
            output_tokens: 30,
            ..Default::default()
        },
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![
        delegation,
        research_reply,
        analysis_reply,
        synthesis,
    ]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research specialist", "You research.")
        .sub_agent("analyst", "Analysis expert", "You analyze.")
        .build()
        .unwrap();

    let output = orch.run("Analyze Rust").await.unwrap();
    assert_eq!(output.result, "Based on research: Rust is excellent.");
    assert_eq!(output.tool_calls_made, 1);
    assert_eq!(output.tokens_used.input_tokens, 50 + 80 + 10 + 12);
    assert_eq!(output.tokens_used.output_tokens, 20 + 30 + 8 + 10);
}
/// A `delegate_task` call naming an agent that was never registered must not
/// abort the run; the follow-up text response is returned as the result.
#[tokio::test]
async fn orchestrator_handles_unknown_agent_gracefully() {
    let bad_delegation = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "delegate_task".into(),
            input: json!({
                "tasks": [{"agent": "nonexistent", "task": "do stuff"}]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };
    let recovery = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "No such agent available.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![bad_delegation, recovery]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research", "prompt")
        .build()
        .unwrap();

    let output = orch.run("delegate to unknown").await.unwrap();
    assert_eq!(output.result, "No such agent available.");
}
/// A tool call for a tool named `wrong_tool` must not abort the run; the
/// follow-up text response is returned as the result.
#[tokio::test]
async fn orchestrator_handles_invalid_tool_name() {
    let bogus_call = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "wrong_tool".into(),
            input: json!({}),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };
    let recovery = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Sorry, let me respond directly.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![bogus_call, recovery]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research", "prompt")
        .build()
        .unwrap();

    let output = orch.run("do something").await.unwrap();
    assert_eq!(output.result, "Sorry, let me respond directly.");
}
/// A `delegate_task` call with an empty `tasks` array must not abort the
/// run; the follow-up text response is returned as the result.
#[tokio::test]
async fn orchestrator_handles_empty_delegate_tasks() {
    let empty_delegation = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "delegate_task".into(),
            input: json!({"tasks": []}),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };
    let recovery = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Let me try again properly.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![empty_delegation, recovery]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research", "prompt")
        .build()
        .unwrap();

    let output = orch.run("do something").await.unwrap();
    assert_eq!(output.result, "Let me try again properly.");
}
/// A `delegate_task` call whose input object has no `tasks` key must not
/// abort the run; the follow-up text response is returned as the result.
#[tokio::test]
async fn orchestrator_handles_missing_tasks_field() {
    let malformed_delegation = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "delegate_task".into(),
            input: json!({}),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };
    let recovery = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "I need to format correctly.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![malformed_delegation, recovery]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research", "prompt")
        .build()
        .unwrap();

    let output = orch.run("do something").await.unwrap();
    assert_eq!(output.result, "I need to format correctly.");
}
/// After a delegation to `researcher`, the blackboard passed to the builder
/// must hold the key `agent:researcher` with the second mock response's text
/// as a JSON string.
#[tokio::test]
async fn blackboard_populated_after_delegation() {
    use crate::agent::blackboard::InMemoryBlackboard;

    let bb = Arc::new(InMemoryBlackboard::new());

    let delegation = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "delegate_task".into(),
            input: json!({
                "tasks": [{"agent": "researcher", "task": "Find info"}]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };
    let research_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Research result here.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };
    let wrap_up = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Done.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![delegation, research_reply, wrap_up]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research specialist", "You research.")
        .blackboard(bb.clone())
        .build()
        .unwrap();
    orch.run("research something").await.unwrap();

    // Read back through the handle this test kept.
    let stored: Option<serde_json::Value> = bb.read("agent:researcher").await.unwrap();
    assert!(
        stored.is_some(),
        "blackboard should have agent:researcher key"
    );
    let expected = serde_json::Value::String("Research result here.".into());
    assert_eq!(stored.unwrap(), expected);
}
#[tokio::test]
async fn sub_agents_receive_blackboard_tools() {
use crate::agent::blackboard::InMemoryBlackboard;
use crate::llm::types::CompletionRequest;
struct ToolTrackingProvider {
responses: Mutex<Vec<CompletionResponse>>,
tool_names_seen: Mutex<Vec<Vec<String>>>,
}
impl LlmProvider for ToolTrackingProvider {
async fn complete(
&self,
request: CompletionRequest,
) -> Result<CompletionResponse, Error> {
let names: Vec<String> = request.tools.iter().map(|t| t.name.clone()).collect();
self.tool_names_seen.lock().expect("lock").push(names);
let mut responses = self.responses.lock().expect("lock");
if responses.is_empty() {
return Err(Error::Agent("no more mock responses".into()));
}
Ok(responses.remove(0))
}
}
let bb = Arc::new(InMemoryBlackboard::new());
let provider = Arc::new(ToolTrackingProvider {
responses: Mutex::new(vec![
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "call-1".into(),
name: "delegate_task".into(),
input: json!({
"tasks": [{"agent": "worker", "task": "do work"}]
}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Work done.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "All done.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
]),
tool_names_seen: Mutex::new(vec![]),
});
let mut orch = Orchestrator::builder(provider.clone())
.sub_agent("worker", "Worker agent", "You work.")
.blackboard(bb)
.build()
.unwrap();
orch.run("do work").await.unwrap();
let all_tool_names = provider.tool_names_seen.lock().expect("lock");
assert!(
all_tool_names.len() >= 2,
"expected at least 2 LLM calls, got {}",
all_tool_names.len()
);
let sub_agent_tools = &all_tool_names[1];
assert!(
sub_agent_tools.contains(&"blackboard_read".to_string()),
"sub-agent should have blackboard_read tool, got: {sub_agent_tools:?}"
);
assert!(
sub_agent_tools.contains(&"blackboard_write".to_string()),
"sub-agent should have blackboard_write tool, got: {sub_agent_tools:?}"
);
assert!(
sub_agent_tools.contains(&"blackboard_list".to_string()),
"sub-agent should have blackboard_list tool, got: {sub_agent_tools:?}"
);
}
/// Smoke test: a builder configured with one sub-agent and an in-memory
/// blackboard builds successfully.
#[test]
fn blackboard_builder_method_works() {
    use crate::agent::blackboard::InMemoryBlackboard;

    let provider = Arc::new(MockProvider::new(vec![]));
    let board = Arc::new(InMemoryBlackboard::new());

    let result = Orchestrator::builder(provider)
        .sub_agent("agent1", "Agent one", "You are agent 1.")
        .blackboard(board)
        .build();

    assert!(result.is_ok());
}
/// Smoke test: a builder configured with one sub-agent and an in-memory
/// knowledge base builds successfully.
#[test]
fn knowledge_builder_method_works() {
    use crate::knowledge::in_memory::InMemoryKnowledgeBase;

    let provider = Arc::new(MockProvider::new(vec![]));
    let knowledge: Arc<dyn KnowledgeBase> = Arc::new(InMemoryKnowledgeBase::new());

    let result = Orchestrator::builder(provider)
        .sub_agent("agent1", "Agent one", "You are agent 1.")
        .knowledge(knowledge)
        .build();

    assert!(result.is_ok());
}
#[tokio::test]
async fn sub_agents_receive_knowledge_tools() {
use crate::knowledge::in_memory::InMemoryKnowledgeBase;
use crate::llm::types::CompletionRequest;
struct ToolTrackingProvider {
responses: Mutex<Vec<CompletionResponse>>,
tool_names_seen: Mutex<Vec<Vec<String>>>,
}
impl LlmProvider for ToolTrackingProvider {
async fn complete(
&self,
request: CompletionRequest,
) -> Result<CompletionResponse, Error> {
let names: Vec<String> = request.tools.iter().map(|t| t.name.clone()).collect();
self.tool_names_seen.lock().expect("lock").push(names);
let mut responses = self.responses.lock().expect("lock");
if responses.is_empty() {
return Err(Error::Agent("no more mock responses".into()));
}
Ok(responses.remove(0))
}
}
let kb: Arc<dyn KnowledgeBase> = Arc::new(InMemoryKnowledgeBase::new());
let provider = Arc::new(ToolTrackingProvider {
responses: Mutex::new(vec![
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "call-1".into(),
name: "delegate_task".into(),
input: json!({
"tasks": [{"agent": "worker", "task": "do work"}]
}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Work done.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "All done.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
]),
tool_names_seen: Mutex::new(vec![]),
});
let mut orch = Orchestrator::builder(provider.clone())
.sub_agent("worker", "Worker agent", "You work.")
.knowledge(kb)
.build()
.unwrap();
orch.run("do work").await.unwrap();
let all_tool_names = provider.tool_names_seen.lock().expect("lock");
assert!(
all_tool_names.len() >= 2,
"expected at least 2 LLM calls, got {}",
all_tool_names.len()
);
let sub_agent_tools = &all_tool_names[1];
assert!(
sub_agent_tools.contains(&"knowledge_search".to_string()),
"sub-agent should have knowledge_search tool, got: {sub_agent_tools:?}"
);
}
/// The run's reported token usage, including the cache-creation and
/// cache-read counters, must equal the sum across all three mock responses
/// (one delegation call and two text replies).
#[tokio::test]
async fn orchestrator_accumulates_cache_tokens_through_delegation() {
    let delegation = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "delegate_task".into(),
            input: json!({
                "tasks": [{"agent": "researcher", "task": "Research Rust"}]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage {
            input_tokens: 50,
            output_tokens: 20,
            cache_creation_input_tokens: 100,
            cache_read_input_tokens: 0,
            reasoning_tokens: 0,
        },
        model: None,
    };
    let research_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Rust is fast.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage {
            input_tokens: 10,
            output_tokens: 8,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 30,
            reasoning_tokens: 0,
        },
        model: None,
    };
    let synthesis = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Rust is excellent.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage {
            input_tokens: 80,
            output_tokens: 30,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 90,
            reasoning_tokens: 0,
        },
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![delegation, research_reply, synthesis]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research specialist", "You research.")
        .build()
        .unwrap();

    let output = orch.run("Analyze Rust").await.unwrap();
    let totals = &output.tokens_used;
    assert_eq!(totals.input_tokens, 50 + 80 + 10);
    assert_eq!(totals.output_tokens, 20 + 30 + 8);
    assert_eq!(totals.cache_creation_input_tokens, 100);
    assert_eq!(totals.cache_read_input_tokens, 90 + 30);
}
#[tokio::test]
async fn orchestrator_error_includes_sub_agent_tokens() {
let provider = Arc::new(MockProvider::new(vec![
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "call-1".into(),
name: "delegate_task".into(),
input: json!({
"tasks": [{"agent": "researcher", "task": "Research Rust"}]
}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage {
input_tokens: 50,
output_tokens: 20,
..Default::default()
},
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Rust is fast.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage {
input_tokens: 15,
output_tokens: 10,
..Default::default()
},
model: None,
},
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "call-2".into(),
name: "delegate_task".into(),
input: json!({
"tasks": [{"agent": "researcher", "task": "More research"}]
}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage {
input_tokens: 80,
output_tokens: 25,
..Default::default()
},
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "More info.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage {
input_tokens: 12,
output_tokens: 8,
..Default::default()
},
model: None,
},
]));
let mut orch = Orchestrator::builder(provider)
.sub_agent("researcher", "Research", "prompt")
.max_turns(2)
.build()
.unwrap();
let err = orch.run("research deeply").await.unwrap_err();
match &err {
Error::WithPartialUsage { source, .. } => {
assert!(
matches!(**source, Error::MaxTurnsExceeded(2)),
"inner error should be MaxTurnsExceeded(2), got: {source}"
);
}
other => panic!("expected WithPartialUsage, got: {other}"),
}
let usage = err.partial_usage();
assert_eq!(
usage.input_tokens,
50 + 80 + 15 + 12,
"input tokens: orchestrator(50+80) + sub-agent(15+12)"
);
assert_eq!(
usage.output_tokens,
20 + 25 + 10 + 8,
"output tokens: orchestrator(20+25) + sub-agent(10+8)"
);
}
/// A single delegation must emit exactly one `SubAgentsDispatched` event
/// (from "orchestrator", listing "researcher") and exactly one
/// `SubAgentCompleted` event (for "researcher", successful, 10 input tokens).
#[tokio::test]
async fn on_event_emits_sub_agent_events() {
    use crate::agent::events::AgentEvent;

    // Shared log the event callback appends to.
    let captured: Arc<std::sync::Mutex<Vec<AgentEvent>>> =
        Arc::new(std::sync::Mutex::new(vec![]));
    let sink = captured.clone();

    let delegation = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "delegate_task".into(),
            input: json!({
                "tasks": [{"agent": "researcher", "task": "Research Rust"}]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };
    let research_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Rust is fast.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage {
            input_tokens: 10,
            output_tokens: 5,
            ..Default::default()
        },
        model: None,
    };
    let summary = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Summary: Rust is fast.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![delegation, research_reply, summary]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research", "prompt")
        .on_event(Arc::new(move |event| {
            sink.lock().unwrap().push(event);
        }))
        .build()
        .unwrap();
    orch.run("research task").await.unwrap();

    let log = captured.lock().unwrap();

    let dispatched: Vec<_> = log
        .iter()
        .filter(|event| matches!(event, AgentEvent::SubAgentsDispatched { .. }))
        .collect();
    assert_eq!(dispatched.len(), 1, "expected 1 SubAgentsDispatched");
    // The filter above guarantees the variant, so the else arm cannot run.
    let AgentEvent::SubAgentsDispatched { agent, agents } = dispatched[0] else {
        unreachable!()
    };
    assert_eq!(agent, "orchestrator");
    assert_eq!(agents, &["researcher"]);

    let completed: Vec<_> = log
        .iter()
        .filter(|event| matches!(event, AgentEvent::SubAgentCompleted { .. }))
        .collect();
    assert_eq!(completed.len(), 1, "expected 1 SubAgentCompleted");
    // Same reasoning: only `SubAgentCompleted` events survive the filter.
    let AgentEvent::SubAgentCompleted {
        agent,
        success,
        usage,
    } = completed[0]
    else {
        unreachable!()
    };
    assert_eq!(agent, "researcher");
    assert!(success);
    assert_eq!(usage.input_tokens, 10);
}
#[tokio::test]
async fn sub_agent_receives_guardrails() {
use crate::agent::guardrail::Guardrail;
use crate::llm::types::CompletionRequest;
struct MarkerGuardrail;
impl Guardrail for MarkerGuardrail {
fn pre_llm(
&self,
request: &mut CompletionRequest,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<(), crate::error::Error>> + Send + '_>,
> {
request.system = format!("{} [GUARDRAIL_ACTIVE]", request.system);
Box::pin(async { Ok(()) })
}
}
struct CapturingProvider {
responses: Mutex<Vec<CompletionResponse>>,
systems_seen: Mutex<Vec<String>>,
}
impl LlmProvider for CapturingProvider {
async fn complete(
&self,
request: CompletionRequest,
) -> Result<CompletionResponse, crate::error::Error> {
self.systems_seen
.lock()
.unwrap()
.push(request.system.clone());
let mut responses = self.responses.lock().unwrap();
if responses.is_empty() {
return Err(crate::error::Error::Agent("no more responses".into()));
}
Ok(responses.remove(0))
}
}
let guardrail: Arc<dyn Guardrail> = Arc::new(MarkerGuardrail);
let provider = Arc::new(CapturingProvider {
responses: Mutex::new(vec![
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "call-1".into(),
name: "delegate_task".into(),
input: json!({
"tasks": [{"agent": "worker", "task": "do work"}]
}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Work done.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "All done.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
]),
systems_seen: Mutex::new(vec![]),
});
let mut orch = Orchestrator::builder(provider.clone())
.sub_agent_full(SubAgentConfig {
name: "worker".into(),
description: "Worker agent".into(),
system_prompt: "You work.".into(),
tools: vec![],
context_strategy: None,
summarize_threshold: None,
tool_timeout: None,
max_tool_output_bytes: None,
max_turns: None,
max_tokens: None,
response_schema: None,
run_timeout: None,
guardrails: vec![guardrail],
provider: None,
reasoning_effort: None,
enable_reflection: None,
tool_output_compression_threshold: None,
max_tools_per_turn: None,
tool_profile: None,
max_identical_tool_calls: None,
max_fuzzy_identical_tool_calls: None,
max_tool_calls_per_turn: None,
session_prune_config: None,
enable_recursive_summarization: None,
reflection_threshold: None,
consolidate_on_exit: None,
workspace: None,
max_total_tokens: None,
audit_trail: None,
audit_user_id: None,
audit_tenant_id: None,
audit_delegation_chain: Vec::new(),
})
.build()
.unwrap();
orch.run("do work").await.unwrap();
let systems = provider.systems_seen.lock().unwrap();
assert!(
systems.len() >= 2,
"expected at least 2 LLM calls, got {}",
systems.len()
);
assert!(
systems[1].contains("[GUARDRAIL_ACTIVE]"),
"sub-agent system prompt should contain guardrail marker: {}",
systems[1]
);
assert!(
!systems[0].contains("[GUARDRAIL_ACTIVE]"),
"orchestrator system prompt should NOT contain guardrail marker: {}",
systems[0]
);
}
#[tokio::test]
async fn orchestrator_guardrails_propagate_to_delegated_sub_agents() {
use crate::agent::guardrail::Guardrail;
use crate::llm::types::CompletionRequest;
struct MarkerGuardrail;
impl Guardrail for MarkerGuardrail {
fn pre_llm(
&self,
request: &mut CompletionRequest,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<(), crate::error::Error>> + Send + '_>,
> {
request.system = format!("{} [ORCH_GUARD_ACTIVE]", request.system);
Box::pin(async { Ok(()) })
}
}
struct CapturingProvider {
responses: Mutex<Vec<CompletionResponse>>,
systems_seen: Mutex<Vec<String>>,
}
impl LlmProvider for CapturingProvider {
async fn complete(
&self,
request: CompletionRequest,
) -> Result<CompletionResponse, crate::error::Error> {
self.systems_seen
.lock()
.unwrap()
.push(request.system.clone());
let mut responses = self.responses.lock().unwrap();
if responses.is_empty() {
return Err(crate::error::Error::Agent("no more responses".into()));
}
Ok(responses.remove(0))
}
}
let guardrail: Arc<dyn Guardrail> = Arc::new(MarkerGuardrail);
let provider = Arc::new(CapturingProvider {
responses: Mutex::new(vec![
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "call-1".into(),
name: "delegate_task".into(),
input: json!({
"tasks": [{"agent": "worker", "task": "do work"}]
}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Work done.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Synthesized.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
]),
systems_seen: Mutex::new(vec![]),
});
let mut orch = Orchestrator::builder(provider.clone())
.guardrail(guardrail)
.sub_agent_full(SubAgentConfig {
name: "worker".into(),
description: "Worker agent".into(),
system_prompt: "You work.".into(),
tools: vec![],
context_strategy: None,
summarize_threshold: None,
tool_timeout: None,
max_tool_output_bytes: None,
max_turns: None,
max_tokens: None,
response_schema: None,
run_timeout: None,
guardrails: vec![],
provider: None,
reasoning_effort: None,
enable_reflection: None,
tool_output_compression_threshold: None,
max_tools_per_turn: None,
tool_profile: None,
max_identical_tool_calls: None,
max_fuzzy_identical_tool_calls: None,
max_tool_calls_per_turn: None,
session_prune_config: None,
enable_recursive_summarization: None,
reflection_threshold: None,
consolidate_on_exit: None,
workspace: None,
max_total_tokens: None,
audit_trail: None,
audit_user_id: None,
audit_tenant_id: None,
audit_delegation_chain: Vec::new(),
})
.build()
.unwrap();
orch.run("do work").await.unwrap();
let systems = provider.systems_seen.lock().unwrap();
assert!(systems.len() >= 2, "expected at least 2 LLM calls");
assert!(
systems[1].contains("[ORCH_GUARD_ACTIVE]"),
"sub-agent system prompt should contain orchestrator guardrail marker; got: {}",
systems[1]
);
}
/// `build()` must fail with a "max_turns must be > 0" error when a sub-agent
/// is configured with `max_turns: Some(0)`.
#[test]
fn build_rejects_sub_agent_with_zero_max_turns() {
    let provider = Arc::new(MockProvider::new(vec![]));

    // Everything is unset except the invalid `max_turns`.
    let invalid = SubAgentConfig {
        name: "agent1".into(),
        description: "Test agent".into(),
        system_prompt: "prompt".into(),
        tools: vec![],
        context_strategy: None,
        summarize_threshold: None,
        tool_timeout: None,
        max_tool_output_bytes: None,
        max_turns: Some(0),
        max_tokens: None,
        response_schema: None,
        run_timeout: None,
        guardrails: vec![],
        provider: None,
        reasoning_effort: None,
        enable_reflection: None,
        tool_output_compression_threshold: None,
        max_tools_per_turn: None,
        tool_profile: None,
        max_identical_tool_calls: None,
        max_fuzzy_identical_tool_calls: None,
        max_tool_calls_per_turn: None,
        session_prune_config: None,
        enable_recursive_summarization: None,
        reflection_threshold: None,
        consolidate_on_exit: None,
        workspace: None,
        max_total_tokens: None,
        audit_trail: None,
        audit_user_id: None,
        audit_tenant_id: None,
        audit_delegation_chain: Vec::new(),
    };

    let result = Orchestrator::builder(provider)
        .sub_agent_full(invalid)
        .build();

    match result {
        Ok(_) => panic!("expected build to fail with zero max_turns"),
        Err(e) => assert!(
            e.to_string().contains("max_turns must be > 0"),
            "expected max_turns error, got: {e}"
        ),
    }
}
/// `build()` must fail with a "max_tokens must be > 0" error when a
/// sub-agent is configured with `max_tokens: Some(0)`.
#[test]
fn build_rejects_sub_agent_with_zero_max_tokens() {
    let provider = Arc::new(MockProvider::new(vec![]));

    // Everything is unset except the invalid `max_tokens`.
    let invalid = SubAgentConfig {
        name: "agent1".into(),
        description: "Test agent".into(),
        system_prompt: "prompt".into(),
        tools: vec![],
        context_strategy: None,
        summarize_threshold: None,
        tool_timeout: None,
        max_tool_output_bytes: None,
        max_turns: None,
        max_tokens: Some(0),
        response_schema: None,
        run_timeout: None,
        guardrails: vec![],
        provider: None,
        reasoning_effort: None,
        enable_reflection: None,
        tool_output_compression_threshold: None,
        max_tools_per_turn: None,
        tool_profile: None,
        max_identical_tool_calls: None,
        max_fuzzy_identical_tool_calls: None,
        max_tool_calls_per_turn: None,
        session_prune_config: None,
        enable_recursive_summarization: None,
        reflection_threshold: None,
        consolidate_on_exit: None,
        workspace: None,
        max_total_tokens: None,
        audit_trail: None,
        audit_user_id: None,
        audit_tenant_id: None,
        audit_delegation_chain: Vec::new(),
    };

    let result = Orchestrator::builder(provider)
        .sub_agent_full(invalid)
        .build();

    match result {
        Ok(_) => panic!("expected build to fail with zero max_tokens"),
        Err(e) => assert!(
            e.to_string().contains("max_tokens must be > 0"),
            "expected max_tokens error, got: {e}"
        ),
    }
}
#[tokio::test]
async fn sub_agent_uses_override_provider() {
use crate::llm::types::CompletionRequest;
struct IdentifiedProvider {
id: String,
responses: Mutex<Vec<CompletionResponse>>,
}
impl LlmProvider for IdentifiedProvider {
async fn complete(
&self,
_request: CompletionRequest,
) -> Result<CompletionResponse, Error> {
let mut responses = self.responses.lock().expect("lock");
if responses.is_empty() {
return Err(Error::Agent(format!("no more responses for {}", self.id)));
}
Ok(responses.remove(0))
}
}
let opus_provider = Arc::new(IdentifiedProvider {
id: "opus".into(),
responses: Mutex::new(vec![
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "call-1".into(),
name: "delegate_task".into(),
input: json!({
"tasks": [{"agent": "cheap", "task": "do cheap work"}]
}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Done.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
]),
});
let haiku_provider: Arc<BoxedProvider> = Arc::new(BoxedProvider::new(IdentifiedProvider {
id: "haiku".into(),
responses: Mutex::new(vec![
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Cheap work done.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage {
input_tokens: 5,
output_tokens: 3,
..Default::default()
},
model: None,
},
]),
}));
let mut orch = Orchestrator::builder(opus_provider)
.sub_agent_full(SubAgentConfig {
name: "cheap".into(),
description: "Cheap agent".into(),
system_prompt: "You do cheap work.".into(),
tools: vec![],
context_strategy: None,
summarize_threshold: None,
tool_timeout: None,
max_tool_output_bytes: None,
max_turns: None,
max_tokens: None,
response_schema: None,
run_timeout: None,
guardrails: vec![],
provider: Some(haiku_provider),
reasoning_effort: None,
enable_reflection: None,
tool_output_compression_threshold: None,
max_tools_per_turn: None,
tool_profile: None,
max_identical_tool_calls: None,
max_fuzzy_identical_tool_calls: None,
max_tool_calls_per_turn: None,
session_prune_config: None,
enable_recursive_summarization: None,
reflection_threshold: None,
consolidate_on_exit: None,
workspace: None,
max_total_tokens: None,
audit_trail: None,
audit_user_id: None,
audit_tenant_id: None,
audit_delegation_chain: Vec::new(),
})
.build()
.unwrap();
let output = orch.run("do work cheaply").await.unwrap();
assert_eq!(output.result, "Done.");
assert_eq!(output.tokens_used.input_tokens, 5);
}
/// With `provider: None`, the sub-agent has no provider of its own; the single
/// mock provider scripts all three responses of the run (delegation call,
/// worker reply, final answer).
#[tokio::test]
async fn sub_agent_inherits_default_provider() {
    // Builds a plain end-of-turn text reply with default token usage.
    let text_reply = |text: &'static str| CompletionResponse {
        content: vec![ContentBlock::Text { text: text.into() }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };

    let delegate_to_worker = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "delegate_task".into(),
            input: json!({
                "tasks": [{"agent": "worker", "task": "do work"}]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };

    let provider = Arc::new(MockProvider::new(vec![
        delegate_to_worker,
        text_reply("Work done."),
        text_reply("All done."),
    ]));

    let worker = SubAgentConfig {
        name: "worker".into(),
        description: "Worker".into(),
        system_prompt: "Work.".into(),
        tools: vec![],
        context_strategy: None,
        summarize_threshold: None,
        tool_timeout: None,
        max_tool_output_bytes: None,
        max_turns: None,
        max_tokens: None,
        response_schema: None,
        run_timeout: None,
        guardrails: vec![],
        provider: None,
        reasoning_effort: None,
        enable_reflection: None,
        tool_output_compression_threshold: None,
        max_tools_per_turn: None,
        tool_profile: None,
        max_identical_tool_calls: None,
        max_fuzzy_identical_tool_calls: None,
        max_tool_calls_per_turn: None,
        session_prune_config: None,
        enable_recursive_summarization: None,
        reflection_threshold: None,
        consolidate_on_exit: None,
        workspace: None,
        max_total_tokens: None,
        audit_trail: None,
        audit_user_id: None,
        audit_tenant_id: None,
        audit_delegation_chain: Vec::new(),
    };

    let mut orch = Orchestrator::builder(provider)
        .sub_agent_full(worker)
        .build()
        .unwrap();

    let output = orch.run("do work").await.unwrap();
    assert_eq!(output.result, "All done.");
}
/// Checks the tool definition returned by `build_form_squad_tool_schema`:
/// its name, the phrases its description must contain, and the shape of its
/// JSON input schema.
#[test]
fn form_squad_tool_definition_schema() {
    let researcher_tools = vec!["web_search".to_string()];
    let agents: Vec<(&str, &str, &[String])> = vec![
        ("researcher", "Research specialist", researcher_tools.as_slice()),
        ("analyst", "Analysis expert", &[]),
    ];

    let def = build_form_squad_tool_schema(&agents);
    assert_eq!(def.name, "form_squad");

    // (substring the description must contain, failure-message prefix)
    let description_checks = [
        ("researcher", "description should list agents"),
        ("analyst", "description should list agents"),
        ("blackboard", "description should mention shared blackboard"),
        (
            "Unlike delegate_task",
            "description should contrast with delegate_task",
        ),
    ];
    for (needle, why) in description_checks {
        assert!(
            def.description.contains(needle),
            "{why}: {}",
            def.description
        );
    }

    // The schema takes a `tasks` array whose items carry `agent` and `task` strings.
    let tasks_schema = &def.input_schema["properties"]["tasks"];
    assert_eq!(
        tasks_schema["type"], "array",
        "schema should have tasks array"
    );
    let item_props = &tasks_schema["items"]["properties"];
    assert_eq!(
        item_props["agent"]["type"], "string",
        "tasks items should have agent field"
    );
    assert_eq!(
        item_props["task"]["type"], "string",
        "tasks items should have task field"
    );

    let required_fields = def.input_schema["required"]
        .as_array()
        .expect("required should be array");
    assert!(
        required_fields.contains(&json!("tasks")),
        "tasks should be required"
    );
}
/// A `form_squad` call naming two of the three registered agents should run
/// to completion, with the orchestrator's last scripted reply as the result.
#[tokio::test]
async fn form_squad_dispatches_directly() {
    // Builds an end-of-turn text reply with the given input/output token usage.
    let text_reply = |text: &'static str, tokens_in, tokens_out| CompletionResponse {
        content: vec![ContentBlock::Text { text: text.into() }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage {
            input_tokens: tokens_in,
            output_tokens: tokens_out,
            ..Default::default()
        },
        model: None,
    };

    let squad_call = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "form_squad".into(),
            input: json!({
                "tasks": [
                    {"agent": "researcher", "task": "Research Rust"},
                    {"agent": "analyst", "task": "Analyze findings"}
                ]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage {
            input_tokens: 50,
            output_tokens: 20,
            ..Default::default()
        },
        model: None,
    };

    // Script order: squad call, two squad-member replies, final answer.
    let provider = Arc::new(MockProvider::new(vec![
        squad_call,
        text_reply("Rust is fast and safe.", 10, 8),
        text_reply("Strengths: memory safety.", 12, 10),
        text_reply("Final: Rust is excellent.", 60, 25),
    ]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research specialist", "You research.")
        .sub_agent("analyst", "Analysis expert", "You analyze.")
        .sub_agent("coder", "Coding expert", "You code.")
        .build()
        .unwrap();

    let output = orch.run("Analyze Rust deeply").await.unwrap();
    assert_eq!(output.result, "Final: Rust is excellent.");
}
/// Token usage reported by the run must be the sum of every scripted
/// response: both orchestrator turns plus both squad-member replies.
#[tokio::test]
async fn form_squad_tokens_roll_up() {
    // Builds an end-of-turn text reply with the given input/output token usage.
    let text_reply = |text: &'static str, tokens_in, tokens_out| CompletionResponse {
        content: vec![ContentBlock::Text { text: text.into() }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage {
            input_tokens: tokens_in,
            output_tokens: tokens_out,
            ..Default::default()
        },
        model: None,
    };

    let squad_call = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "form_squad".into(),
            input: json!({
                "tasks": [
                    {"agent": "agent_a", "task": "Task A"},
                    {"agent": "agent_b", "task": "Task B"}
                ]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage {
            input_tokens: 50,
            output_tokens: 20,
            ..Default::default()
        },
        model: None,
    };

    let provider = Arc::new(MockProvider::new(vec![
        squad_call,
        text_reply("Done A.", 10, 5),
        text_reply("Done B.", 12, 6),
        text_reply("All done.", 60, 25),
    ]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("agent_a", "Agent A", "You are A.")
        .sub_agent("agent_b", "Agent B", "You are B.")
        .build()
        .unwrap();

    let output = orch.run("Collaborate").await.unwrap();
    let usage = &output.tokens_used;

    // Orchestrator turns (50 + 60 / 20 + 25) plus the two member replies.
    assert_eq!(
        usage.input_tokens,
        50 + 60 + 10 + 12,
        "all token levels should roll up"
    );
    assert_eq!(
        usage.output_tokens,
        20 + 25 + 5 + 6,
        "all token levels should roll up"
    );
}
/// A `form_squad` call that names an unregistered agent must not abort the
/// run: only two responses are scripted, and the run still ends with the
/// orchestrator's follow-up reply.
#[tokio::test]
async fn form_squad_returns_error_for_unknown_agent() {
    // "nonexistent" is not among the agents registered below.
    let squad_with_unknown_member = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "form_squad".into(),
            input: json!({
                "tasks": [
                    {"agent": "researcher", "task": "Do research"},
                    {"agent": "nonexistent", "task": "Do stuff"}
                ]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };
    let follow_up = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "No such agent available.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };

    let provider = Arc::new(MockProvider::new(vec![squad_with_unknown_member, follow_up]));
    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research", "prompt")
        .sub_agent("analyst", "Analysis", "prompt")
        .build()
        .unwrap();

    let output = orch.run("delegate to unknown squad").await.unwrap();
    assert_eq!(output.result, "No such agent available.");
}
/// A `form_squad` call with a single task must not abort the run: only two
/// responses are scripted, and the run still ends with the orchestrator's
/// follow-up reply.
#[tokio::test]
async fn form_squad_requires_at_least_two_agents() {
    // A "squad" of one.
    let solo_squad_call = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "form_squad".into(),
            input: json!({
                "tasks": [
                    {"agent": "researcher", "task": "Solo task"}
                ]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };
    let follow_up = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Using delegate_task instead.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };

    let provider = Arc::new(MockProvider::new(vec![solo_squad_call, follow_up]));
    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research", "prompt")
        .sub_agent("analyst", "Analysis", "prompt")
        .build()
        .unwrap();

    let output = orch.run("form solo squad").await.unwrap();
    assert_eq!(output.result, "Using delegate_task instead.");
}
/// A `form_squad` call listing the same agent twice must not abort the run:
/// only two responses are scripted, and the run still ends with the
/// orchestrator's follow-up reply.
#[tokio::test]
async fn form_squad_rejects_duplicate_agents() {
    // Both tasks target "researcher".
    let duplicate_member_call = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "form_squad".into(),
            input: json!({
                "tasks": [
                    {"agent": "researcher", "task": "Task 1"},
                    {"agent": "researcher", "task": "Task 2"}
                ]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };
    let follow_up = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Fixed duplicate issue.".into(),
        }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };

    let provider = Arc::new(MockProvider::new(vec![duplicate_member_call, follow_up]));
    let mut orch = Orchestrator::builder(provider)
        .sub_agent("researcher", "Research", "prompt")
        .sub_agent("analyst", "Analysis", "prompt")
        .build()
        .unwrap();

    let output = orch.run("form squad with dupes").await.unwrap();
    assert_eq!(output.result, "Fixed duplicate issue.");
}
/// After a squad run, the blackboard handed to the orchestrator must hold an
/// entry under the `squad:writer_a+writer_b` key and must not hold an
/// `agent:writer_a` entry.
#[tokio::test]
async fn form_squad_private_blackboard() {
    use crate::agent::blackboard::InMemoryBlackboard;

    // Builds a plain end-of-turn text reply with default token usage.
    let text_reply = |text: &'static str| CompletionResponse {
        content: vec![ContentBlock::Text { text: text.into() }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };

    let squad_call = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "form_squad".into(),
            input: json!({
                "tasks": [
                    {"agent": "writer_a", "task": "Write something"},
                    {"agent": "writer_b", "task": "Write something else"}
                ]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };

    let outer_bb = Arc::new(InMemoryBlackboard::new());
    let provider = Arc::new(MockProvider::new(vec![
        squad_call,
        text_reply("Written to squad blackboard."),
        text_reply("Also written."),
        text_reply("Done."),
    ]));

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("writer_a", "Writer A", "You write.")
        .sub_agent("writer_b", "Writer B", "You write.")
        .blackboard(outer_bb.clone())
        .build()
        .unwrap();
    orch.run("write to blackboard").await.unwrap();

    // The squad's combined result is expected on the outer blackboard...
    let squad_key = "squad:writer_a+writer_b";
    let squad_entry = outer_bb.read(squad_key).await.unwrap();
    assert!(
        squad_entry.is_some(),
        "outer blackboard should have squad result under '{squad_key}'"
    );

    // ...while the per-agent key is expected to be absent from it.
    let agent_key = "agent:writer_a";
    let agent_entry = outer_bb.read(agent_key).await.unwrap();
    assert!(
        agent_entry.is_none(),
        "outer blackboard should NOT have '{agent_key}' — that's on the private blackboard"
    );
}
/// When one squad member's provider has no responses to give, the run must
/// still succeed with the orchestrator's follow-up reply, and the reported
/// input tokens must exceed what the orchestrator's own two turns account for.
#[tokio::test]
async fn form_squad_error_returns_tool_error_not_hard_error() {
    // Builds an end-of-turn text reply with the given input/output token usage.
    let text_reply = |text: &'static str, tokens_in, tokens_out| CompletionResponse {
        content: vec![ContentBlock::Text { text: text.into() }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage {
            input_tokens: tokens_in,
            output_tokens: tokens_out,
            ..Default::default()
        },
        model: None,
    };

    let squad_call = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "form_squad".into(),
            input: json!({
                "tasks": [
                    {"agent": "agent_a", "task": "Do A"},
                    {"agent": "agent_b", "task": "Do B"}
                ]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage {
            input_tokens: 50,
            output_tokens: 20,
            ..Default::default()
        },
        model: None,
    };

    // agent_b's provider is scripted with no responses at all.
    let failing_provider = Arc::new(BoxedProvider::new(MockProvider::new(vec![])));

    let provider = Arc::new(MockProvider::new(vec![
        squad_call,
        text_reply("Done A.", 10, 5),
        text_reply("Squad failed, falling back.", 60, 25),
    ]));

    let agent_b = SubAgentConfig {
        name: "agent_b".into(),
        description: "Agent B".into(),
        system_prompt: "You are B.".into(),
        tools: vec![],
        context_strategy: None,
        summarize_threshold: None,
        tool_timeout: None,
        max_tool_output_bytes: None,
        max_turns: None,
        max_tokens: None,
        response_schema: None,
        run_timeout: None,
        guardrails: vec![],
        provider: Some(failing_provider),
        reasoning_effort: None,
        enable_reflection: None,
        tool_output_compression_threshold: None,
        max_tools_per_turn: None,
        tool_profile: None,
        max_identical_tool_calls: None,
        max_fuzzy_identical_tool_calls: None,
        max_tool_calls_per_turn: None,
        session_prune_config: None,
        enable_recursive_summarization: None,
        reflection_threshold: None,
        consolidate_on_exit: None,
        workspace: None,
        max_total_tokens: None,
        audit_trail: None,
        audit_user_id: None,
        audit_tenant_id: None,
        audit_delegation_chain: Vec::new(),
    };

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("agent_a", "Agent A", "You are A.")
        .sub_agent_full(agent_b)
        .build()
        .unwrap();

    let output = orch.run("complex task").await.unwrap();
    assert_eq!(output.result, "Squad failed, falling back.");

    // 50 + 60 is the orchestrator alone; anything above that came from the squad.
    assert!(
        output.tokens_used.input_tokens > 50 + 60,
        "should include partial squad tokens: {}",
        output.tokens_used.input_tokens
    );
}
#[tokio::test]
async fn orchestrator_registers_both_tools() {
use crate::llm::types::CompletionRequest;
struct ToolCapturingProvider {
responses: Mutex<Vec<CompletionResponse>>,
tool_names_seen: Mutex<Vec<Vec<String>>>,
}
impl LlmProvider for ToolCapturingProvider {
async fn complete(
&self,
request: CompletionRequest,
) -> Result<CompletionResponse, Error> {
let names: Vec<String> = request.tools.iter().map(|t| t.name.clone()).collect();
self.tool_names_seen.lock().expect("lock").push(names);
let mut responses = self.responses.lock().expect("lock");
if responses.is_empty() {
return Err(Error::Agent("no more responses".into()));
}
Ok(responses.remove(0))
}
}
let provider = Arc::new(ToolCapturingProvider {
responses: Mutex::new(vec![CompletionResponse {
content: vec![ContentBlock::Text {
text: "Direct answer.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
}]),
tool_names_seen: Mutex::new(vec![]),
});
let mut orch = Orchestrator::builder(provider.clone())
.sub_agent("researcher", "Research", "prompt")
.sub_agent("analyst", "Analysis", "prompt")
.build()
.unwrap();
orch.run("test").await.unwrap();
let tool_names = provider.tool_names_seen.lock().unwrap();
assert!(
tool_names[0].contains(&"delegate_task".to_string()),
"should have delegate_task: {:?}",
tool_names[0]
);
assert!(
tool_names[0].contains(&"form_squad".to_string()),
"should have form_squad: {:?}",
tool_names[0]
);
}
/// Building an orchestrator with a single sub-agent must succeed.
#[test]
fn orchestrator_single_agent_no_squads() {
    let built = Orchestrator::builder(Arc::new(MockProvider::new(vec![])))
        .sub_agent("researcher", "Research", "prompt")
        .build();
    assert!(built.is_ok());
}
#[tokio::test]
async fn orchestrator_squads_disabled_explicitly() {
use crate::llm::types::CompletionRequest;
struct ToolCapturingProvider {
responses: Mutex<Vec<CompletionResponse>>,
tool_names_seen: Mutex<Vec<Vec<String>>>,
}
impl LlmProvider for ToolCapturingProvider {
async fn complete(
&self,
request: CompletionRequest,
) -> Result<CompletionResponse, Error> {
let names: Vec<String> = request.tools.iter().map(|t| t.name.clone()).collect();
self.tool_names_seen.lock().expect("lock").push(names);
let mut responses = self.responses.lock().expect("lock");
if responses.is_empty() {
return Err(Error::Agent("no more responses".into()));
}
Ok(responses.remove(0))
}
}
let provider = Arc::new(ToolCapturingProvider {
responses: Mutex::new(vec![CompletionResponse {
content: vec![ContentBlock::Text {
text: "Direct answer.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
}]),
tool_names_seen: Mutex::new(vec![]),
});
let mut orch = Orchestrator::builder(provider.clone())
.sub_agent("researcher", "Research", "prompt")
.sub_agent("analyst", "Analysis", "prompt")
.enable_squads(false)
.build()
.unwrap();
orch.run("test").await.unwrap();
let tool_names = provider.tool_names_seen.lock().unwrap();
assert!(
tool_names[0].contains(&"delegate_task".to_string()),
"should have delegate_task: {:?}",
tool_names[0]
);
assert!(
!tool_names[0].contains(&"form_squad".to_string()),
"should NOT have form_squad when disabled: {:?}",
tool_names[0]
);
}
/// With squads enabled, the system prompt must mention both delegation tools
/// and the phrases that distinguish them.
#[test]
fn system_prompt_mentions_both_tools_when_squads_enabled() {
    let researcher_tools = vec!["web_search".to_string()];
    let agents: Vec<(&str, &str, &[String])> = vec![
        ("researcher", "Research specialist", researcher_tools.as_slice()),
        ("analyst", "Analysis expert", &[]),
    ];
    let prompt = build_system_prompt(&agents, true, DispatchMode::Parallel);

    // (substring the prompt must contain, failure-message prefix)
    let expectations = [
        ("delegate_task", "prompt should mention delegate_task"),
        ("form_squad", "prompt should mention form_squad"),
        ("two delegation tools", "prompt should explain both tools"),
        (
            "isolation",
            "prompt should mention isolation for delegate_task",
        ),
        (
            "blackboard",
            "prompt should mention shared blackboard for form_squad",
        ),
    ];
    for (needle, why) in expectations {
        assert!(prompt.contains(needle), "{why}: {prompt}");
    }
}
/// With squads disabled, the system prompt must mention `delegate_task`, must
/// not mention `form_squad`, and must keep its "Decision Process" and
/// "Effort Scaling" sections.
#[test]
fn system_prompt_only_delegate_when_squads_disabled() {
    let agents: Vec<(&str, &str, &[String])> = vec![("researcher", "Research specialist", &[])];
    let prompt = build_system_prompt(&agents, false, DispatchMode::Parallel);

    assert!(
        prompt.contains("delegate_task"),
        "prompt should mention delegate_task: {prompt}"
    );

    let mentions_squads = prompt.contains("form_squad");
    assert!(
        !mentions_squads,
        "prompt should NOT mention form_squad: {prompt}"
    );

    // Sections that must be present regardless of the squad setting.
    for section in ["Decision Process", "Effort Scaling"] {
        assert!(
            prompt.contains(section),
            "prompt should contain {section} even without squads: {prompt}"
        );
    }
}
/// The `on_event` callback registered on the orchestrator must also receive
/// events whose `agent` is the delegated sub-agent ("worker"), including its
/// `RunStarted`.
#[tokio::test]
async fn delegate_forwards_on_event_to_sub_agents() {
    /// True when `event` is one of the four lifecycle events checked by this
    /// test and was emitted by the agent named `who`.
    fn lifecycle_event_from(event: &AgentEvent, who: &str) -> bool {
        match event {
            AgentEvent::RunStarted { agent, .. }
            | AgentEvent::TurnStarted { agent, .. }
            | AgentEvent::LlmResponse { agent, .. }
            | AgentEvent::RunCompleted { agent, .. } => agent == who,
            _ => false,
        }
    }

    // Builds a plain end-of-turn text reply with default token usage.
    let text_reply = |text: &'static str| CompletionResponse {
        content: vec![ContentBlock::Text { text: text.into() }],
        stop_reason: StopReason::EndTurn,
        usage: TokenUsage::default(),
        model: None,
    };

    let delegate_to_worker = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "delegate_task".into(),
            input: json!({
                "tasks": [
                    {"agent": "worker", "task": "do work"}
                ]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };

    let provider = Arc::new(MockProvider::new(vec![
        delegate_to_worker,
        text_reply("done"),
        text_reply("All done."),
    ]));

    // Collect every event the callback receives.
    let events = Arc::new(Mutex::new(Vec::<AgentEvent>::new()));
    let sink = events.clone();
    let on_event: Arc<OnEvent> = Arc::new(move |event: AgentEvent| {
        sink.lock().expect("test lock").push(event);
    });

    let mut orch = Orchestrator::builder(provider)
        .sub_agent("worker", "Worker agent", "You do work.")
        .on_event(on_event)
        .build()
        .unwrap();
    let _output = orch.run("delegate some work").await.unwrap();

    let recorded = events.lock().expect("test lock");
    let orchestrator_events: Vec<_> = recorded
        .iter()
        .filter(|e| lifecycle_event_from(e, "orchestrator"))
        .collect();
    let worker_events: Vec<_> = recorded
        .iter()
        .filter(|e| lifecycle_event_from(e, "worker"))
        .collect();

    assert!(
        !orchestrator_events.is_empty(),
        "should have orchestrator events"
    );
    assert!(
        !worker_events.is_empty(),
        "should have sub-agent worker events (forwarded via on_event)"
    );

    let worker_run_started = recorded
        .iter()
        .any(|e| matches!(e, AgentEvent::RunStarted { agent, .. } if agent == "worker"));
    assert!(
        worker_run_started,
        "sub-agent should emit RunStarted via forwarded on_event"
    );
}
/// End-to-end check of the event stream produced by one orchestrator run
/// that delegates to two tool-using sub-agents.
///
/// The test records every event delivered to `on_event` and then verifies:
/// which agents appear, orchestrator run boundaries, `LlmResponse` contents,
/// tool start/completion events (including truncation of a 70 000-byte tool
/// output), sub-agent dispatch/completion events, per-agent ordering, and
/// the rolled-up token totals.
#[tokio::test]
async fn full_audit_trail_end_to_end() {
    // Large enough that the web_search completion event is expected to be truncated.
    let long_output = "x".repeat(70_000);

    // Scripted responses, consumed in order by whichever agent calls next.
    // NOTE(review): the per-response labels below follow the response texts;
    // which agent actually receives which entry depends on dispatch order.
    let provider = Arc::new(MockProvider::new(vec![
        // Orchestrator: delegate to researcher and coder.
        CompletionResponse {
            content: vec![
                ContentBlock::Text {
                    text: "I'll delegate to the researcher and coder.".into(),
                },
                ContentBlock::ToolUse {
                    id: "orch-call-1".into(),
                    name: "delegate_task".into(),
                    input: json!({
                        "tasks": [
                            {"agent": "researcher", "task": "Search for Rust concurrency patterns"},
                            {"agent": "coder", "task": "Read the main.rs file"}
                        ]
                    }),
                },
            ],
            stop_reason: StopReason::ToolUse,
            usage: TokenUsage {
                input_tokens: 100,
                output_tokens: 40,
                ..Default::default()
            },
            model: None,
        },
        // Researcher: call web_search.
        CompletionResponse {
            content: vec![
                ContentBlock::Text {
                    text: "Let me search for Rust concurrency info.".into(),
                },
                ContentBlock::ToolUse {
                    id: "res-call-1".into(),
                    name: "web_search".into(),
                    input: json!({"query": "rust async concurrency"}),
                },
            ],
            stop_reason: StopReason::ToolUse,
            usage: TokenUsage {
                input_tokens: 20,
                output_tokens: 10,
                ..Default::default()
            },
            model: None,
        },
        // Researcher: final reply.
        CompletionResponse {
            content: vec![ContentBlock::Text {
                text: "Rust uses async/await with tokio for concurrency.".into(),
            }],
            stop_reason: StopReason::EndTurn,
            usage: TokenUsage {
                input_tokens: 30,
                output_tokens: 15,
                ..Default::default()
            },
            model: None,
        },
        // Coder: call read_file.
        CompletionResponse {
            content: vec![
                ContentBlock::Text {
                    text: "I'll read the main.rs file.".into(),
                },
                ContentBlock::ToolUse {
                    id: "cod-call-1".into(),
                    name: "read_file".into(),
                    input: json!({"path": "/src/main.rs"}),
                },
            ],
            stop_reason: StopReason::ToolUse,
            usage: TokenUsage {
                input_tokens: 15,
                output_tokens: 8,
                ..Default::default()
            },
            model: None,
        },
        // Coder: final reply.
        CompletionResponse {
            content: vec![ContentBlock::Text {
                text: "The main.rs contains the entry point.".into(),
            }],
            stop_reason: StopReason::EndTurn,
            usage: TokenUsage {
                input_tokens: 25,
                output_tokens: 12,
                ..Default::default()
            },
            model: None,
        },
        // Orchestrator: final answer.
        CompletionResponse {
            content: vec![ContentBlock::Text {
                text: "Combined analysis: Rust async is great for concurrency.".into(),
            }],
            stop_reason: StopReason::EndTurn,
            usage: TokenUsage {
                input_tokens: 200,
                output_tokens: 50,
                ..Default::default()
            },
            model: None,
        },
    ]));

    // Collect every event the callback receives.
    let events = Arc::new(Mutex::new(Vec::<AgentEvent>::new()));
    let events_clone = events.clone();
    let on_event: Arc<OnEvent> = Arc::new(move |event: AgentEvent| {
        events_clone.lock().expect("test lock").push(event);
    });

    // Borrow the long output directly; cloning the 70 KB string is unnecessary
    // because it is not used again after the tool is constructed.
    let shared_tools: Vec<Arc<dyn Tool>> = vec![
        Arc::new(MockTool::new("web_search", &long_output)),
        Arc::new(MockTool::new(
            "read_file",
            "fn main() { println!(\"hello\"); }",
        )),
    ];

    let mut orch = Orchestrator::builder(provider)
        .sub_agent_with_tools(
            "researcher",
            "Research specialist",
            "You research topics.",
            shared_tools.clone(),
        )
        .sub_agent_with_tools(
            "coder",
            "Code expert",
            "You read and analyze code.",
            shared_tools.clone(),
        )
        .on_event(on_event)
        .build()
        .unwrap();

    let output = orch
        .run("Analyze Rust concurrency and the main.rs file")
        .await
        .unwrap();
    assert_eq!(
        output.result,
        "Combined analysis: Rust async is great for concurrency."
    );
    assert_eq!(output.tool_calls_made, 1);

    let events = events.lock().expect("test lock");

    /// Returns the identifying name carried by an event: the agent name for
    /// agent events, otherwise the sensor name, story id, routing decision or
    /// workflow node. The match is exhaustive on purpose, so adding a variant
    /// to `AgentEvent` forces this test to be revisited.
    fn agent_of(e: &AgentEvent) -> &str {
        match e {
            AgentEvent::RunStarted { agent, .. }
            | AgentEvent::TurnStarted { agent, .. }
            | AgentEvent::LlmResponse { agent, .. }
            | AgentEvent::ToolCallStarted { agent, .. }
            | AgentEvent::ToolCallCompleted { agent, .. }
            | AgentEvent::RunCompleted { agent, .. }
            | AgentEvent::RunFailed { agent, .. }
            | AgentEvent::SubAgentsDispatched { agent, .. }
            | AgentEvent::SubAgentCompleted { agent, .. }
            | AgentEvent::ApprovalRequested { agent, .. }
            | AgentEvent::ApprovalDecision { agent, .. }
            | AgentEvent::ContextSummarized { agent, .. }
            | AgentEvent::GuardrailDenied { agent, .. }
            | AgentEvent::GuardrailWarned { agent, .. }
            | AgentEvent::RetryAttempt { agent, .. }
            | AgentEvent::DoomLoopDetected { agent, .. }
            | AgentEvent::FuzzyDoomLoopDetected { agent, .. }
            | AgentEvent::AutoCompactionTriggered { agent, .. }
            | AgentEvent::SessionPruned { agent, .. }
            | AgentEvent::ModelEscalated { agent, .. }
            | AgentEvent::BudgetExceeded { agent, .. }
            | AgentEvent::AgentSpawned { agent, .. }
            | AgentEvent::KillSwitchActivated { agent, .. }
            | AgentEvent::ToolNameRepaired { agent, .. } => agent,
            AgentEvent::SensorEventProcessed { sensor_name, .. } => sensor_name,
            AgentEvent::StoryUpdated { story_id, .. } => story_id,
            AgentEvent::TaskRouted { decision, .. } => decision,
            AgentEvent::WorkflowNodeStarted { node, .. }
            | AgentEvent::WorkflowNodeCompleted { node, .. }
            | AgentEvent::WorkflowNodeFailed { node, .. } => node,
        }
    }

    // One line per event, used only to make assertion failures readable.
    let event_summary: Vec<String> = events
        .iter()
        .enumerate()
        .map(|(i, e)| format!("{i}: [{:>12}] {:?}", agent_of(e), std::mem::discriminant(e)))
        .collect();

    // --- Every participant must appear in the stream -----------------------
    let agents_seen: std::collections::HashSet<&str> = events.iter().map(agent_of).collect();
    assert!(
        agents_seen.contains("orchestrator"),
        "missing orchestrator events.\nEvent stream:\n{}",
        event_summary.join("\n")
    );
    assert!(
        agents_seen.contains("researcher"),
        "missing researcher events (should be forwarded).\nEvent stream:\n{}",
        event_summary.join("\n")
    );
    assert!(
        agents_seen.contains("coder"),
        "missing coder events (should be forwarded).\nEvent stream:\n{}",
        event_summary.join("\n")
    );

    // --- Orchestrator run boundaries ----------------------------------------
    let orch_events: Vec<&AgentEvent> = events
        .iter()
        .filter(|e| agent_of(e) == "orchestrator")
        .collect();
    assert!(
        matches!(orch_events[0], AgentEvent::RunStarted { task, .. } if task.contains("Analyze Rust")),
        "first orch event should be RunStarted, got: {:?}",
        orch_events[0]
    );
    assert!(
        matches!(orch_events.last().unwrap(), AgentEvent::RunCompleted { .. }),
        "last orch event should be RunCompleted, got: {:?}",
        orch_events.last().unwrap()
    );

    // --- LLM responses ------------------------------------------------------
    let llm_responses: Vec<&AgentEvent> = events
        .iter()
        .filter(|e| matches!(e, AgentEvent::LlmResponse { .. }))
        .collect();
    assert!(
        llm_responses.len() >= 3,
        "expected >= 3 LlmResponse events (1 orch + at least 1 per sub-agent), got {}.\nEvents:\n{}",
        llm_responses.len(),
        event_summary.join("\n")
    );
    for llm_event in &llm_responses {
        match llm_event {
            AgentEvent::LlmResponse {
                agent, text, model, ..
            } => {
                // The scripted responses carry `model: None`; the expected name
                // presumably comes from the mock provider itself.
                assert_eq!(
                    model.as_deref(),
                    Some("mock-model-v1"),
                    "LlmResponse for '{agent}' should have model name"
                );
                assert!(
                    !text.is_empty(),
                    "LlmResponse for '{agent}' should have non-empty text"
                );
            }
            _ => unreachable!(),
        }
    }

    // --- Tool call starts ---------------------------------------------------
    let tool_started: Vec<&AgentEvent> = events
        .iter()
        .filter(|e| matches!(e, AgentEvent::ToolCallStarted { .. }))
        .collect();
    assert!(
        tool_started.len() >= 3,
        "expected >= 3 ToolCallStarted events, got {}.\nEvents:\n{}",
        tool_started.len(),
        event_summary.join("\n")
    );

    let web_search_started = tool_started.iter().find(|e| {
        matches!(e, AgentEvent::ToolCallStarted { tool_name, .. } if tool_name == "web_search")
    });
    assert!(
        web_search_started.is_some(),
        "should have a web_search ToolCallStarted"
    );
    match web_search_started.unwrap() {
        AgentEvent::ToolCallStarted { input, .. } => {
            assert!(
                input.contains("rust async concurrency"),
                "web_search input should contain query, got: {input}"
            );
        }
        _ => unreachable!(),
    }

    let read_file_started = tool_started.iter().find(|e| {
        matches!(e, AgentEvent::ToolCallStarted { tool_name, .. } if tool_name == "read_file")
    });
    assert!(
        read_file_started.is_some(),
        "should have a read_file ToolCallStarted"
    );
    match read_file_started.unwrap() {
        AgentEvent::ToolCallStarted { input, .. } => {
            assert!(
                input.contains("/src/main.rs"),
                "read_file input should contain path, got: {input}"
            );
        }
        _ => unreachable!(),
    }

    let delegate_started = tool_started.iter().find(|e| {
        matches!(e, AgentEvent::ToolCallStarted { tool_name, .. } if tool_name == "delegate_task")
    });
    assert!(
        delegate_started.is_some(),
        "should have a delegate_task ToolCallStarted"
    );
    match delegate_started.unwrap() {
        AgentEvent::ToolCallStarted { agent, input, .. } => {
            assert_eq!(agent, "orchestrator");
            assert!(
                input.contains("researcher"),
                "delegate_task input should contain agent names, got: {input}"
            );
        }
        _ => unreachable!(),
    }

    // --- Tool call completions ----------------------------------------------
    let tool_completed: Vec<&AgentEvent> = events
        .iter()
        .filter(|e| matches!(e, AgentEvent::ToolCallCompleted { .. }))
        .collect();
    assert!(
        tool_completed.len() >= 3,
        "expected >= 3 ToolCallCompleted events, got {}",
        tool_completed.len()
    );

    let web_search_completed = tool_completed.iter().find(|e| {
        matches!(e, AgentEvent::ToolCallCompleted { tool_name, .. } if tool_name == "web_search")
    });
    assert!(
        web_search_completed.is_some(),
        "should have a web_search ToolCallCompleted"
    );
    match web_search_completed.unwrap() {
        AgentEvent::ToolCallCompleted {
            output, is_error, ..
        } => {
            assert!(!is_error);
            // Preview by characters, not bytes: a byte-offset slice panics if
            // it lands inside a multi-byte character, which would mask the
            // real assertion failure.
            assert!(
                output.contains("[truncated:"),
                "web_search output (70000 bytes) should be truncated in event, got {} bytes: {}",
                output.len(),
                output.chars().take(100).collect::<String>()
            );
        }
        _ => unreachable!(),
    }

    let read_file_completed = tool_completed.iter().find(|e| {
        matches!(e, AgentEvent::ToolCallCompleted { tool_name, .. } if tool_name == "read_file")
    });
    assert!(
        read_file_completed.is_some(),
        "should have a read_file ToolCallCompleted"
    );
    match read_file_completed.unwrap() {
        AgentEvent::ToolCallCompleted {
            output, is_error, ..
        } => {
            assert!(!is_error);
            assert!(
                output.contains("fn main()"),
                "read_file output should contain file content, got: {output}"
            );
            assert!(
                !output.contains("[truncated:"),
                "read_file output should NOT be truncated"
            );
        }
        _ => unreachable!(),
    }

    // --- Sub-agent dispatch / completion ------------------------------------
    let dispatched: Vec<&AgentEvent> = events
        .iter()
        .filter(|e| matches!(e, AgentEvent::SubAgentsDispatched { .. }))
        .collect();
    assert_eq!(dispatched.len(), 1, "expected 1 SubAgentsDispatched event");
    match dispatched[0] {
        AgentEvent::SubAgentsDispatched { agents, .. } => {
            assert!(
                agents.contains(&"researcher".to_string()),
                "dispatched agents should include researcher"
            );
            assert!(
                agents.contains(&"coder".to_string()),
                "dispatched agents should include coder"
            );
        }
        _ => unreachable!(),
    }

    let completed: Vec<&AgentEvent> = events
        .iter()
        .filter(|e| matches!(e, AgentEvent::SubAgentCompleted { .. }))
        .collect();
    assert_eq!(
        completed.len(),
        2,
        "expected 2 SubAgentCompleted events (one per sub-agent)"
    );
    for c in &completed {
        match c {
            AgentEvent::SubAgentCompleted { success, agent, .. } => {
                assert!(success, "sub-agent '{agent}' should succeed");
            }
            _ => unreachable!(),
        }
    }

    // --- Per-agent ordering: each agent's first event is RunStarted ----------
    for agent_name in &["orchestrator", "researcher", "coder"] {
        let agent_events: Vec<&AgentEvent> = events
            .iter()
            .filter(|e| agent_of(e) == *agent_name)
            .collect();
        if !agent_events.is_empty() {
            assert!(
                matches!(agent_events[0], AgentEvent::RunStarted { .. }),
                "first event for '{agent_name}' should be RunStarted, got: {:?}",
                agent_events[0]
            );
        }
    }

    // --- Token roll-up: orchestrator turns + all four sub-agent turns --------
    assert_eq!(
        output.tokens_used.input_tokens,
        100 + 200 + 20 + 30 + 15 + 25,
        "total input tokens should include orchestrator + sub-agents"
    );
    assert_eq!(
        output.tokens_used.output_tokens,
        40 + 50 + 10 + 15 + 8 + 12,
        "total output tokens should include orchestrator + sub-agents"
    );

    // --- Sub-agent LLM text is visible in the stream ------------------------
    let sub_agent_llm = llm_responses.iter().find(|e| {
        matches!(e, AgentEvent::LlmResponse { text, .. }
            if text.contains("async/await"))
    });
    assert!(
        sub_agent_llm.is_some(),
        "should have a sub-agent LlmResponse with text about async/await"
    );

    // --- Overall volume sanity check ----------------------------------------
    assert!(
        events.len() >= 20,
        "expected at least 20 events for full audit trail, got {}.\nEvents:\n{}",
        events.len(),
        event_summary.join("\n")
    );
}
/// A sub-agent with a 100 ms `run_timeout` and a provider that sleeps for an
/// hour must not stall the run: the run has to finish with either a result
/// or an error that mentions a timeout (or the orchestrator's provider
/// running out of scripted responses afterwards).
#[tokio::test]
async fn sub_agent_run_timeout_fires_when_configured() {
    /// Provider whose `complete` sleeps for an hour and never produces a response.
    struct SlowProvider;

    impl LlmProvider for SlowProvider {
        async fn complete(
            &self,
            _request: CompletionRequest,
        ) -> Result<CompletionResponse, Error> {
            tokio::time::sleep(Duration::from_secs(3600)).await;
            unreachable!()
        }
    }

    // The orchestrator only has the delegation call scripted.
    let delegate_to_slow_agent = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "delegate_task".into(),
            input: json!({
                "tasks": [{"agent": "slow-agent", "task": "do something"}]
            }),
        }],
        stop_reason: StopReason::ToolUse,
        usage: TokenUsage::default(),
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![delegate_to_slow_agent]));
    let slow_provider = Arc::new(BoxedProvider::new(SlowProvider));

    let slow_agent = SubAgentConfig {
        name: "slow-agent".into(),
        description: "A slow agent".into(),
        system_prompt: "sys".into(),
        tools: vec![],
        context_strategy: None,
        summarize_threshold: None,
        tool_timeout: None,
        max_tool_output_bytes: None,
        max_turns: None,
        max_tokens: None,
        response_schema: None,
        run_timeout: Some(Duration::from_millis(100)),
        guardrails: vec![],
        provider: Some(slow_provider),
        reasoning_effort: None,
        enable_reflection: None,
        tool_output_compression_threshold: None,
        max_tools_per_turn: None,
        tool_profile: None,
        max_identical_tool_calls: None,
        max_fuzzy_identical_tool_calls: None,
        max_tool_calls_per_turn: None,
        session_prune_config: None,
        enable_recursive_summarization: None,
        reflection_threshold: None,
        consolidate_on_exit: None,
        workspace: None,
        max_total_tokens: None,
        audit_trail: None,
        audit_user_id: None,
        audit_tenant_id: None,
        audit_delegation_chain: Vec::new(),
    };

    let mut orch = Orchestrator::builder(provider)
        .sub_agent_full(slow_agent)
        .build()
        .unwrap();

    match orch.run("go").await {
        Ok(output) => {
            let mentions_timeout =
                output.result.contains("timeout") || output.result.contains("Timeout");
            assert!(
                mentions_timeout,
                "expected timeout in result, got: {}",
                output.result
            );
        }
        Err(e) => {
            let msg = e.to_string();
            // "no more mock responses" is presumably what the orchestrator's
            // exhausted mock provider reports on its next turn.
            let timeout_related = msg.contains("no more mock responses")
                || msg.contains("timeout")
                || msg.contains("Timeout");
            assert!(
                timeout_related,
                "expected timeout-related error, got: {msg}"
            );
        }
    }
}
/// End-to-end `form_squad` scenario with three squad members, a tool call,
/// event capture, and one member that the test expects to fail.
///
/// The test checks, in order: the orchestrator's final text, the aggregated
/// token usage, what is (and is not) written to the outer blackboard, and the
/// shape of the emitted event stream.
#[tokio::test]
async fn form_squad_complex_with_tools_events_and_failure() {
use crate::agent::blackboard::InMemoryBlackboard;
let outer_bb = Arc::new(InMemoryBlackboard::new());
// Provider with no scripted responses; it is handed to the `reviewer` member,
// which the assertions below expect to fail.
let failing_provider = Arc::new(BoxedProvider::new(MockProvider::new(vec![])));
// Scripted responses for the shared provider. The token assertions further
// down attribute them as: orchestrator `form_squad` call, planner answer,
// worker tool call, worker answer, orchestrator final answer.
let provider = Arc::new(MockProvider::new(vec![
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "orch-call-1".into(),
name: "form_squad".into(),
input: json!({
"tasks": [
{"agent": "planner", "task": "Create a plan for the analysis"},
{"agent": "worker", "task": "Compute the metrics"},
{"agent": "reviewer", "task": "Review all findings"}
]
}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage {
input_tokens: 100,
output_tokens: 40,
cache_creation_input_tokens: 5,
cache_read_input_tokens: 3,
reasoning_tokens: 0,
},
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Plan: Step 1 gather data, Step 2 compute, Step 3 review.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage {
input_tokens: 20,
output_tokens: 15,
reasoning_tokens: 8,
..Default::default()
},
model: None,
},
CompletionResponse {
content: vec![
ContentBlock::Text {
text: "I'll compute the metrics now.".into(),
},
ContentBlock::ToolUse {
id: "worker-call-1".into(),
name: "compute".into(),
input: json!({"expression": "42 * 17"}),
},
],
stop_reason: StopReason::ToolUse,
usage: TokenUsage {
input_tokens: 25,
output_tokens: 12,
..Default::default()
},
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Computation result: 714. Analysis complete.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage {
input_tokens: 35,
output_tokens: 18,
..Default::default()
},
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Squad partial success: plan and computation done, review failed.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage {
input_tokens: 200,
output_tokens: 60,
..Default::default()
},
model: None,
},
]));
// Collect every emitted event so the stream can be inspected after the run.
let events = Arc::new(Mutex::new(Vec::<AgentEvent>::new()));
let events_clone = events.clone();
let on_event: Arc<OnEvent> = Arc::new(move |event: AgentEvent| {
events_clone.lock().expect("test lock").push(event);
});
// Mock `compute` tool constructed with the canned output "714".
let compute_tool: Arc<dyn Tool> = Arc::new(MockTool::new("compute", "714"));
let mut orch = Orchestrator::builder(provider)
.sub_agent_with_tools(
"planner",
"Planning specialist",
"You create plans.",
vec![compute_tool.clone()],
)
.sub_agent_with_tools(
"worker",
"Computation worker",
"You compute metrics.",
vec![compute_tool.clone()],
)
// The reviewer differs from the other members only by its provider override.
.sub_agent_full(SubAgentConfig {
name: "reviewer".into(),
description: "Review specialist".into(),
system_prompt: "You review findings.".into(),
tools: vec![],
context_strategy: None,
summarize_threshold: None,
tool_timeout: None,
max_tool_output_bytes: None,
max_turns: None,
max_tokens: None,
response_schema: None,
run_timeout: None,
guardrails: vec![],
provider: Some(failing_provider),
reasoning_effort: None,
enable_reflection: None,
tool_output_compression_threshold: None,
max_tools_per_turn: None,
tool_profile: None,
max_identical_tool_calls: None,
max_fuzzy_identical_tool_calls: None,
max_tool_calls_per_turn: None,
session_prune_config: None,
enable_recursive_summarization: None,
reflection_threshold: None,
consolidate_on_exit: None,
workspace: None,
max_total_tokens: None,
audit_trail: None,
audit_user_id: None,
audit_tenant_id: None,
audit_delegation_chain: Vec::new(),
})
.blackboard(outer_bb.clone())
.on_event(on_event)
.build()
.unwrap();
let output = orch.run("Analyze the system performance").await.unwrap();
assert_eq!(
output.result,
"Squad partial success: plan and computation done, review failed."
);
// --- Token accounting: both orchestrator turns plus planner and worker. ---
let expected_input = 100 + 200 + 20 + 25 + 35;
let expected_output = 40 + 60 + 15 + 12 + 18;
assert_eq!(
output.tokens_used.input_tokens, expected_input,
"input tokens should sum orchestrator + planner + worker (reviewer failed)"
);
assert_eq!(
output.tokens_used.output_tokens, expected_output,
"output tokens should sum orchestrator + planner + worker"
);
assert_eq!(
output.tokens_used.reasoning_tokens, 8,
"reasoning tokens should come from planner"
);
assert_eq!(
output.tokens_used.cache_creation_input_tokens, 5,
"cache creation tokens from orchestrator"
);
assert_eq!(
output.tokens_used.cache_read_input_tokens, 3,
"cache read tokens from orchestrator"
);
// --- Blackboard: only the combined squad entry is expected on the outer
// board; per-agent keys for planner and worker must be absent. ---
let squad_key = "squad:planner+worker+reviewer";
let squad_val = outer_bb.read(squad_key).await.unwrap();
assert!(
squad_val.is_some(),
"outer blackboard should have squad result under '{squad_key}'"
);
let squad_text = squad_val.unwrap().to_string();
assert!(
squad_text.contains("Plan: Step 1"),
"squad result should include planner's output"
);
assert!(
squad_text.contains("Computation result: 714"),
"squad result should include worker's output"
);
assert!(
squad_text.contains("Error"),
"squad result should include reviewer's error"
);
assert!(
outer_bb.read("agent:planner").await.unwrap().is_none(),
"outer blackboard should NOT have agent:planner"
);
assert!(
outer_bb.read("agent:worker").await.unwrap().is_none(),
"outer blackboard should NOT have agent:worker"
);
// --- Event stream inspection. ---
let events = events.lock().expect("test lock");
// Returns the identifying label carried by an event: the `agent` field where
// the variant has one, otherwise the variant's own name-like field.
// The match has no wildcard arm, so a new `AgentEvent` variant fails to compile here.
fn agent_of(e: &AgentEvent) -> &str {
match e {
AgentEvent::RunStarted { agent, .. }
| AgentEvent::TurnStarted { agent, .. }
| AgentEvent::LlmResponse { agent, .. }
| AgentEvent::ToolCallStarted { agent, .. }
| AgentEvent::ToolCallCompleted { agent, .. }
| AgentEvent::RunCompleted { agent, .. }
| AgentEvent::RunFailed { agent, .. }
| AgentEvent::SubAgentsDispatched { agent, .. }
| AgentEvent::SubAgentCompleted { agent, .. }
| AgentEvent::ApprovalRequested { agent, .. }
| AgentEvent::ApprovalDecision { agent, .. }
| AgentEvent::ContextSummarized { agent, .. }
| AgentEvent::GuardrailDenied { agent, .. }
| AgentEvent::GuardrailWarned { agent, .. }
| AgentEvent::RetryAttempt { agent, .. }
| AgentEvent::DoomLoopDetected { agent, .. }
| AgentEvent::FuzzyDoomLoopDetected { agent, .. }
| AgentEvent::AutoCompactionTriggered { agent, .. }
| AgentEvent::SessionPruned { agent, .. }
| AgentEvent::ModelEscalated { agent, .. }
| AgentEvent::BudgetExceeded { agent, .. }
| AgentEvent::AgentSpawned { agent, .. }
| AgentEvent::KillSwitchActivated { agent, .. }
| AgentEvent::ToolNameRepaired { agent, .. } => agent,
AgentEvent::SensorEventProcessed { sensor_name, .. } => sensor_name,
AgentEvent::StoryUpdated { story_id, .. } => story_id,
AgentEvent::TaskRouted { decision, .. } => decision,
AgentEvent::WorkflowNodeStarted { node, .. }
| AgentEvent::WorkflowNodeCompleted { node, .. }
| AgentEvent::WorkflowNodeFailed { node, .. } => node,
}
}
// Maps an event to its variant name; used only to build the readable log below.
fn event_type(e: &AgentEvent) -> &'static str {
match e {
AgentEvent::RunStarted { .. } => "RunStarted",
AgentEvent::TurnStarted { .. } => "TurnStarted",
AgentEvent::LlmResponse { .. } => "LlmResponse",
AgentEvent::ToolCallStarted { .. } => "ToolCallStarted",
AgentEvent::ToolCallCompleted { .. } => "ToolCallCompleted",
AgentEvent::RunCompleted { .. } => "RunCompleted",
AgentEvent::RunFailed { .. } => "RunFailed",
AgentEvent::SubAgentsDispatched { .. } => "SubAgentsDispatched",
AgentEvent::SubAgentCompleted { .. } => "SubAgentCompleted",
AgentEvent::ApprovalRequested { .. } => "ApprovalRequested",
AgentEvent::ApprovalDecision { .. } => "ApprovalDecision",
AgentEvent::ContextSummarized { .. } => "ContextSummarized",
AgentEvent::GuardrailDenied { .. } => "GuardrailDenied",
AgentEvent::GuardrailWarned { .. } => "GuardrailWarned",
AgentEvent::RetryAttempt { .. } => "RetryAttempt",
AgentEvent::DoomLoopDetected { .. } => "DoomLoopDetected",
AgentEvent::FuzzyDoomLoopDetected { .. } => "FuzzyDoomLoopDetected",
AgentEvent::AutoCompactionTriggered { .. } => "AutoCompactionTriggered",
AgentEvent::SessionPruned { .. } => "SessionPruned",
AgentEvent::SensorEventProcessed { .. } => "SensorEventProcessed",
AgentEvent::StoryUpdated { .. } => "StoryUpdated",
AgentEvent::TaskRouted { .. } => "TaskRouted",
AgentEvent::ModelEscalated { .. } => "ModelEscalated",
AgentEvent::BudgetExceeded { .. } => "BudgetExceeded",
AgentEvent::AgentSpawned { .. } => "AgentSpawned",
AgentEvent::KillSwitchActivated { .. } => "KillSwitchActivated",
AgentEvent::WorkflowNodeStarted { .. } => "WorkflowNodeStarted",
AgentEvent::WorkflowNodeCompleted { .. } => "WorkflowNodeCompleted",
AgentEvent::WorkflowNodeFailed { .. } => "WorkflowNodeFailed",
AgentEvent::ToolNameRepaired { .. } => "ToolNameRepaired",
}
}
// Human-readable "index: [agent] EventType" log appended to failure messages.
let event_summary: Vec<String> = events
.iter()
.enumerate()
.map(|(i, e)| format!("{i}: [{:>12}] {}", agent_of(e), event_type(e)))
.collect();
let event_log = event_summary.join("\n");
let agents_seen: std::collections::HashSet<&str> = events.iter().map(agent_of).collect();
assert!(
agents_seen.contains("orchestrator"),
"missing orchestrator events.\n{event_log}"
);
let has_planner = agents_seen.contains("planner");
let has_worker = agents_seen.contains("worker");
assert!(
has_planner || has_worker,
"should have events from at least one successful squad member.\n{event_log}"
);
// Exactly one dispatch event, labelled "squad-leader", naming all three members.
let dispatched: Vec<&AgentEvent> = events
.iter()
.filter(|e| matches!(e, AgentEvent::SubAgentsDispatched { .. }))
.collect();
assert_eq!(
dispatched.len(),
1,
"expected exactly 1 SubAgentsDispatched event.\n{event_log}"
);
match dispatched[0] {
AgentEvent::SubAgentsDispatched { agents, agent } => {
assert_eq!(
agent, "squad-leader",
"form_squad uses 'squad-leader' label"
);
assert_eq!(agents.len(), 3, "should dispatch 3 squad members");
assert!(agents.contains(&"planner".to_string()));
assert!(agents.contains(&"worker".to_string()));
assert!(agents.contains(&"reviewer".to_string()));
}
_ => unreachable!(),
}
// Completion events: one per member plus one aggregate whose agent label
// starts with "squad[".
let completed: Vec<&AgentEvent> = events
.iter()
.filter(|e| matches!(e, AgentEvent::SubAgentCompleted { .. }))
.collect();
assert_eq!(
completed.len(),
4,
"expected 4 SubAgentCompleted events (3 per-agent + 1 aggregate).\n{event_log}"
);
let per_agent: Vec<&AgentEvent> = completed
.iter()
.filter(|e| {
matches!(e, AgentEvent::SubAgentCompleted { agent, .. }
if !agent.starts_with("squad["))
})
.copied()
.collect();
assert_eq!(per_agent.len(), 3, "3 per-agent completion events");
let reviewer_completed = per_agent.iter().find(
|e| matches!(e, AgentEvent::SubAgentCompleted { agent, .. } if agent == "reviewer"),
);
assert!(
reviewer_completed.is_some(),
"should have reviewer SubAgentCompleted"
);
match reviewer_completed.unwrap() {
AgentEvent::SubAgentCompleted { success, .. } => {
assert!(!success, "reviewer should have failed");
}
_ => unreachable!(),
}
let planner_completed = per_agent.iter().find(
|e| matches!(e, AgentEvent::SubAgentCompleted { agent, .. } if agent == "planner"),
);
assert!(
planner_completed.is_some(),
"should have planner SubAgentCompleted"
);
match planner_completed.unwrap() {
AgentEvent::SubAgentCompleted { success, usage, .. } => {
assert!(success, "planner should have succeeded");
assert_eq!(usage.input_tokens, 20);
assert_eq!(usage.output_tokens, 15);
assert_eq!(usage.reasoning_tokens, 8);
}
_ => unreachable!(),
}
let squad_completed = completed.iter().find(|e| {
matches!(e, AgentEvent::SubAgentCompleted { agent, .. }
if agent.starts_with("squad["))
});
assert!(
squad_completed.is_some(),
"should have aggregate squad completion event.\n{event_log}"
);
match squad_completed.unwrap() {
AgentEvent::SubAgentCompleted {
agent,
success,
usage,
} => {
assert!(
agent.contains("planner")
&& agent.contains("worker")
&& agent.contains("reviewer"),
"aggregate label should list all agents: {agent}"
);
assert!(
!success,
"aggregate should be false because reviewer failed"
);
// Aggregate usage covers the planner turn and both worker turns.
assert_eq!(usage.input_tokens, 20 + 25 + 35, "aggregate input tokens");
assert_eq!(usage.output_tokens, 15 + 12 + 18, "aggregate output tokens");
}
_ => unreachable!(),
}
// Tool events for the worker's `compute` call.
let tool_started: Vec<&AgentEvent> = events
.iter()
.filter(|e| {
matches!(e, AgentEvent::ToolCallStarted { tool_name, .. } if tool_name == "compute")
})
.collect();
assert!(
!tool_started.is_empty(),
"should have at least one compute ToolCallStarted.\n{event_log}"
);
let tool_completed: Vec<&AgentEvent> = events
.iter()
.filter(|e| {
matches!(e, AgentEvent::ToolCallCompleted { tool_name, .. } if tool_name == "compute")
})
.collect();
assert!(
!tool_completed.is_empty(),
"should have at least one compute ToolCallCompleted.\n{event_log}"
);
match tool_completed[0] {
AgentEvent::ToolCallCompleted {
output, is_error, ..
} => {
assert!(!is_error, "compute tool should succeed");
assert!(
output.contains("714"),
"compute output should be '714', got: {output}"
);
}
_ => unreachable!(),
}
// Per-agent ordering: when an agent emitted anything, its first event must be
// RunStarted. Agents with no events are skipped rather than failing the test.
for agent_name in &["orchestrator", "planner", "worker"] {
let agent_events: Vec<&AgentEvent> = events
.iter()
.filter(|e| agent_of(e) == *agent_name)
.collect();
if !agent_events.is_empty() {
assert!(
matches!(agent_events[0], AgentEvent::RunStarted { .. }),
"first event for '{agent_name}' should be RunStarted, got: {:?}\n{event_log}",
agent_events[0]
);
}
}
// The reviewer, if it emitted events at all, must start with RunStarted and
// include a RunFailed.
let reviewer_events: Vec<&AgentEvent> = events
.iter()
.filter(|e| agent_of(e) == "reviewer")
.collect();
if !reviewer_events.is_empty() {
assert!(
matches!(reviewer_events[0], AgentEvent::RunStarted { .. }),
"reviewer first event should be RunStarted"
);
let has_failed = reviewer_events
.iter()
.any(|e| matches!(e, AgentEvent::RunFailed { .. }));
assert!(
has_failed,
"reviewer should have a RunFailed event.\n{event_log}"
);
}
// Dispatch must precede the first completion event.
let dispatch_idx = events
.iter()
.position(|e| matches!(e, AgentEvent::SubAgentsDispatched { .. }));
let first_completed_idx = events
.iter()
.position(|e| matches!(e, AgentEvent::SubAgentCompleted { .. }));
if let (Some(d), Some(c)) = (dispatch_idx, first_completed_idx) {
assert!(
d < c,
"SubAgentsDispatched (idx {d}) should precede SubAgentCompleted (idx {c})\n{event_log}"
);
}
let llm_responses: Vec<&AgentEvent> = events
.iter()
.filter(|e| matches!(e, AgentEvent::LlmResponse { .. }))
.collect();
assert!(
!llm_responses.is_empty(),
"should have LlmResponse events.\n{event_log}"
);
// NOTE(review): every scripted response above sets `model: None`, so the
// expected "mock-model-v1" presumably comes from the mock provider itself —
// confirm against `MockProvider`.
for lr in &llm_responses {
match lr {
AgentEvent::LlmResponse { model, .. } => {
assert_eq!(
model.as_deref(),
Some("mock-model-v1"),
"LlmResponse should carry provider model name"
);
}
_ => unreachable!(),
}
}
// Coarse lower bound on the total number of events.
assert!(
events.len() >= 15,
"expected at least 15 events for complex squad test, got {}.\n{event_log}",
events.len(),
);
}
/// Building an orchestrator that has a sub-agent with an empty name must fail
/// with `Error::Config` whose message contains "must not be empty".
#[test]
fn build_rejects_empty_sub_agent_name() {
    let outcome = Orchestrator::builder(Arc::new(MockProvider::new(vec![])))
        .sub_agent("", "Empty name agent", "prompt")
        .build();

    match outcome {
        Ok(_) => panic!("expected error for empty sub-agent name"),
        Err(Error::Config(msg)) => {
            assert!(
                msg.contains("must not be empty"),
                "expected empty name error, got: {msg}"
            );
        }
        Err(other) => panic!("expected Config error, got: {other:?}"),
    }
}
/// Text passed to `instruction_text` must show up in the system prompt of the
/// orchestrator's first LLM request, together with the
/// "# Project Instructions" header.
#[tokio::test]
async fn instruction_text_wired_to_orchestrator_system_prompt() {
    /// Provider that records each request's system prompt and then ends the
    /// turn with a fixed text reply.
    struct CapturingProvider {
        captured_systems: Mutex<Vec<String>>,
    }
    impl LlmProvider for CapturingProvider {
        async fn complete(
            &self,
            request: CompletionRequest,
        ) -> Result<CompletionResponse, Error> {
            let mut seen = self.captured_systems.lock().expect("lock");
            seen.push(request.system.clone());
            Ok(CompletionResponse {
                content: vec![ContentBlock::Text {
                    text: "Task complete.".into(),
                }],
                stop_reason: StopReason::EndTurn,
                usage: TokenUsage::default(),
                model: None,
            })
        }
    }

    let recorder = Arc::new(CapturingProvider {
        captured_systems: Mutex::new(Vec::new()),
    });
    let mut orchestrator = Orchestrator::builder(recorder.clone())
        .sub_agent("agent-a", "Does things", "You are agent A.")
        .instruction_text("Always verify your work.")
        .build()
        .unwrap();
    let _run_output = orchestrator.run("test task").await.unwrap();

    // Snapshot the captured prompts so the lock is not held during assertions.
    let systems = recorder.captured_systems.lock().expect("lock").clone();
    assert!(!systems.is_empty(), "should have at least one LLM call");

    let orchestrator_system = &systems[0];
    let expectations = [
        (
            "# Project Instructions",
            "orchestrator system prompt should contain instruction header",
        ),
        (
            "Always verify your work.",
            "orchestrator system prompt should contain instruction text",
        ),
    ];
    for (fragment, why) in expectations {
        assert!(orchestrator_system.contains(fragment), "{why}");
    }
}
#[tokio::test]
async fn permission_rules_propagate_to_sub_agents() {
let provider = Arc::new(MockProvider::new(vec![
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "orch-1".into(),
name: "delegate_task".into(),
input: json!({
"tasks": [{"agent": "worker", "task": "run a bash command"}]
}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "worker-1".into(),
name: "bash".into(),
input: json!({"command": "echo hello"}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Bash was denied.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Worker reported bash was denied.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
]));
let events = Arc::new(Mutex::new(Vec::<AgentEvent>::new()));
let events_clone = events.clone();
let on_event: Arc<OnEvent> = Arc::new(move |event: AgentEvent| {
events_clone.lock().expect("test lock").push(event);
});
let deny_bash = crate::agent::permission::PermissionRuleset::new(vec![
crate::agent::permission::PermissionRule {
tool: "bash".into(),
pattern: "*".into(),
action: crate::agent::permission::PermissionAction::Deny,
},
]);
let bash_tool: Arc<dyn Tool> = Arc::new(MockTool::new("bash", "executed"));
let mut orch = Orchestrator::builder(provider)
.sub_agent_with_tools("worker", "Bash worker", "You run bash.", vec![bash_tool])
.permission_rules(deny_bash)
.on_event(on_event)
.build()
.unwrap();
let output = orch.run("run bash via worker").await.unwrap();
assert_eq!(output.result, "Worker reported bash was denied.");
let events = events.lock().expect("test lock");
let worker_tool_events: Vec<_> = events
.iter()
.filter(|e| {
matches!(
e,
AgentEvent::ToolCallStarted { agent, tool_name, .. }
| AgentEvent::ToolCallCompleted { agent, tool_name, .. }
if agent == "worker" && tool_name == "bash"
)
})
.collect();
assert!(
worker_tool_events.is_empty(),
"bash tool calls in worker should be denied (no events emitted), got: {worker_tool_events:?}"
);
}
#[tokio::test]
async fn permission_rules_propagate_to_squad_members() {
let provider = Arc::new(MockProvider::new(vec![
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "orch-1".into(),
name: "form_squad".into(),
input: json!({
"tasks": [
{"agent": "alpha", "task": "run bash"},
{"agent": "beta", "task": "say hello"}
]
}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::ToolUse {
id: "alpha-1".into(),
name: "bash".into(),
input: json!({"command": "ls"}),
}],
stop_reason: StopReason::ToolUse,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Bash denied.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Hello!".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
CompletionResponse {
content: vec![ContentBlock::Text {
text: "Squad done.".into(),
}],
stop_reason: StopReason::EndTurn,
usage: TokenUsage::default(),
model: None,
},
]));
let deny_bash = crate::agent::permission::PermissionRuleset::new(vec![
crate::agent::permission::PermissionRule {
tool: "bash".into(),
pattern: "*".into(),
action: crate::agent::permission::PermissionAction::Deny,
},
]);
let bash_tool: Arc<dyn Tool> = Arc::new(MockTool::new("bash", "executed"));
let events = Arc::new(Mutex::new(Vec::<AgentEvent>::new()));
let events_clone = events.clone();
let on_event: Arc<OnEvent> = Arc::new(move |event: AgentEvent| {
events_clone.lock().expect("test lock").push(event);
});
let mut orch = Orchestrator::builder(provider)
.sub_agent_with_tools(
"alpha",
"Alpha agent",
"You run bash.",
vec![bash_tool.clone()],
)
.sub_agent("beta", "Beta agent", "You say hello.")
.permission_rules(deny_bash)
.on_event(on_event)
.build()
.unwrap();
let output = orch.run("form a squad").await.unwrap();
assert_eq!(output.result, "Squad done.");
let events = events.lock().expect("test lock");
let bash_events: Vec<_> = events
.iter()
.filter(|e| {
matches!(
e,
AgentEvent::ToolCallStarted { tool_name, .. }
| AgentEvent::ToolCallCompleted { tool_name, .. }
if tool_name == "bash"
)
})
.collect();
assert!(
bash_events.is_empty(),
"bash tool calls in squad should be denied (no events), got: {bash_events:?}"
);
}
/// A sub-agent registered without its own workspace takes the workspace set on
/// the builder.
#[test]
fn workspace_propagates_from_builder_to_sub_agents() {
    let inherited = std::path::Path::new("/shared/workspace");
    let without_workspace = SubAgentConfig {
        name: "agent1".into(),
        description: "test".into(),
        workspace: None,
        ..Default::default()
    };

    let builder = Orchestrator::builder(Arc::new(MockProvider::new(vec![])))
        .workspace("/shared/workspace")
        .sub_agent_full(without_workspace);

    let registered = &builder.sub_agents[0];
    assert_eq!(
        registered.workspace.as_deref(),
        Some(inherited),
        "sub-agent should inherit workspace from builder"
    );
}
/// A workspace set on the sub-agent config wins over the builder's workspace.
#[test]
fn sub_agent_workspace_overrides_builder() {
    let own_workspace = std::path::Path::new("/custom/workspace");
    let with_workspace = SubAgentConfig {
        name: "agent1".into(),
        description: "test".into(),
        workspace: Some("/custom/workspace".into()),
        ..Default::default()
    };

    let builder = Orchestrator::builder(Arc::new(MockProvider::new(vec![])))
        .workspace("/shared/workspace")
        .sub_agent_full(with_workspace);

    let registered = &builder.sub_agents[0];
    assert_eq!(
        registered.workspace.as_deref(),
        Some(own_workspace),
        "sub-agent should use its own workspace over builder's"
    );
}
/// With no workspace on either the builder or the sub-agent config, the
/// registered sub-agent has none.
#[test]
fn no_workspace_when_builder_has_none() {
    let without_workspace = SubAgentConfig {
        name: "agent1".into(),
        description: "test".into(),
        workspace: None,
        ..Default::default()
    };

    let builder = Orchestrator::builder(Arc::new(MockProvider::new(vec![])))
        .sub_agent_full(without_workspace);

    let registered = &builder.sub_agents[0];
    assert!(
        registered.workspace.is_none(),
        "sub-agent should have no workspace when builder has none"
    );
}
/// A fresh builder has `multi_agent_prompt` switched on.
#[test]
fn multi_agent_prompt_enabled_by_default() {
    let fresh = Orchestrator::builder(Arc::new(MockProvider::new(vec![])));
    assert!(fresh.multi_agent_prompt);
}
/// Calling `multi_agent_prompt(false)` clears the flag on the builder.
#[test]
fn multi_agent_prompt_can_be_disabled() {
    let mock = Arc::new(MockProvider::new(vec![]));
    let disabled = Orchestrator::builder(mock).multi_agent_prompt(false);
    assert!(!disabled.multi_agent_prompt);
}
/// With `multi_agent_prompt` left at its default, appending the rendered
/// collaboration prompt adds the protocol header, the agent name in backticks,
/// and the agent description to the sub-agent's system prompt.
///
/// The append step is carried out by hand inside this test on the builder's
/// `sub_agents`; `build()` is not called.
#[test]
fn build_injects_collab_prompt_when_enabled() {
    const PROTOCOL_HEADER: &str = "MULTI-AGENT COLLABORATION PROTOCOL";

    let mock = Arc::new(MockProvider::new(vec![]));
    let mut builder =
        Orchestrator::builder(mock).sub_agent("writer", "Writes content", "You are a writer.");

    // Before the append, the prompt is free of the protocol text.
    assert!(!builder.sub_agents[0].system_prompt.contains(PROTOCOL_HEADER));

    if builder.multi_agent_prompt {
        for member in &mut builder.sub_agents {
            let addendum =
                crate::agent::prompts::render_collab_prompt(&member.name, &member.description);
            member.system_prompt.push_str(&addendum);
        }
    }

    let prompt = &builder.sub_agents[0].system_prompt;
    assert!(prompt.contains(PROTOCOL_HEADER));
    assert!(prompt.contains("`writer`"));
    assert!(prompt.contains("Writes content"));
}
/// With `multi_agent_prompt(false)`, the guarded append step is skipped and the
/// sub-agent's system prompt stays exactly as supplied.
///
/// The guarded append is carried out by hand inside this test on the builder's
/// `sub_agents`; `build()` is not called.
#[test]
fn build_omits_collab_prompt_when_disabled() {
    const PROTOCOL_HEADER: &str = "MULTI-AGENT COLLABORATION PROTOCOL";

    let mock = Arc::new(MockProvider::new(vec![]));
    let mut builder = Orchestrator::builder(mock)
        .multi_agent_prompt(false)
        .sub_agent("writer", "Writes content", "You are a writer.");

    if builder.multi_agent_prompt {
        for member in &mut builder.sub_agents {
            let addendum =
                crate::agent::prompts::render_collab_prompt(&member.name, &member.description);
            member.system_prompt.push_str(&addendum);
        }
    }

    assert!(!builder.sub_agents[0].system_prompt.contains(PROTOCOL_HEADER));
    assert_eq!(builder.sub_agents[0].system_prompt, "You are a writer.");
}
fn make_spawn_config() -> crate::types::SpawnConfig {
crate::types::SpawnConfig {
max_spawned_agents: 3,
tool_allowlist: vec![],
max_turns: 5,
max_tokens: 1024,
max_total_tokens: 10_000,
}
}
/// Assembles a `SpawnAgentTool` field by field for the spawn-agent tests.
///
/// Only tools whose definition name appears in `config.tool_allowlist` go into
/// the tool pool. Counters and accumulators start empty, and every optional
/// hook (memory, events, text, LSP, workspace, audit, tenant tracking) is left
/// unset.
fn build_spawn_tool(
    provider: Arc<MockProvider>,
    config: crate::types::SpawnConfig,
    tools: Vec<Arc<dyn Tool>>,
) -> SpawnAgentTool {
    use std::collections::{HashMap, HashSet};
    use std::sync::atomic::AtomicU32;

    // Keep just the allowlisted tools, keyed by their definition name.
    let tool_pool: HashMap<_, _> = tools
        .iter()
        .filter_map(|candidate| {
            let name = candidate.definition().name;
            config
                .tool_allowlist
                .contains(&name)
                .then(|| (name, candidate.clone()))
        })
        .collect();

    // Must be computed before `config` is moved into the struct below.
    let cached_definition = SpawnAgentTool::build_definition(&config);

    SpawnAgentTool {
        shared_provider: Arc::new(BoxedProvider::from_arc(provider)),
        spawn_config: config,
        tool_pool,
        spawn_count: Arc::new(AtomicU32::new(0)),
        spawned_names: Arc::new(Mutex::new(HashSet::new())),
        accumulated_tokens: Arc::new(Mutex::new(TokenUsage::default())),
        permission_rules: crate::agent::permission::PermissionRuleset::default(),
        shared_memory: None,
        memory_namespace_prefix: None,
        on_event: None,
        on_text: None,
        lsp_manager: None,
        observability_mode: crate::agent::observability::ObservabilityMode::Production,
        workspace: None,
        guardrails: vec![],
        audit_trail: None,
        audit_user_id: None,
        audit_tenant_id: None,
        audit_delegation_chain: vec![],
        cached_definition,
        tenant_tracker: None,
    }
}
/// A single successful spawn returns the agent's text, mentions the
/// `spawn:<name>` label in its content, and bumps the spawn counter to 1.
#[tokio::test]
async fn spawn_agent_basic_execution() {
    use std::sync::atomic::Ordering;

    let scripted_answer = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Tax analysis complete.".into(),
        }],
        usage: TokenUsage {
            input_tokens: 50,
            output_tokens: 20,
            ..Default::default()
        },
        stop_reason: StopReason::EndTurn,
        model: None,
    };
    let mock = Arc::new(MockProvider::new(vec![scripted_answer]));
    let tool = build_spawn_tool(mock, make_spawn_config(), vec![]);

    let request = SpawnAgentInput {
        name: "tax_specialist".into(),
        system_prompt: "You are a tax law expert.".into(),
        tools: vec![],
        task: "Analyze tax implications.".into(),
    };
    let outcome = tool.spawn(request).await.unwrap();

    assert!(!outcome.is_error);
    assert!(outcome.content.contains("Tax analysis complete."));
    assert!(outcome.content.contains("spawn:tax_specialist"));

    let spawned_so_far = tool.spawn_count.load(Ordering::Relaxed);
    assert_eq!(spawned_so_far, 1);
}
/// Each name in the list below must be refused by `spawn` with an error result
/// whose content contains "Invalid agent name".
#[tokio::test]
async fn spawn_agent_rejects_invalid_name() {
    // Names this test expects to be rejected.
    const REJECTED_NAMES: [&str; 7] = [
        "Tax-Specialist",
        "123abc",
        "",
        "has spaces",
        "../path",
        "a/b",
        "UPPER",
    ];

    let mock = Arc::new(MockProvider::new(vec![]));
    let tool = build_spawn_tool(mock, make_spawn_config(), vec![]);

    for name in REJECTED_NAMES {
        let request = SpawnAgentInput {
            name: name.into(),
            system_prompt: "test".into(),
            tools: vec![],
            task: "test".into(),
        };
        let outcome = tool.spawn(request).await.unwrap();
        assert!(
            outcome.is_error,
            "expected error for name '{name}', got success: {}",
            outcome.content
        );
        assert!(
            outcome.content.contains("Invalid agent name"),
            "expected 'Invalid agent name' in error for name '{name}', got: {}",
            outcome.content
        );
    }
}
/// Spawning twice under the same name: the first call succeeds, the second
/// returns an error result whose content contains "already used".
#[tokio::test]
async fn spawn_agent_rejects_duplicate_name() {
    let single_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "done".into(),
        }],
        usage: TokenUsage::default(),
        stop_reason: StopReason::EndTurn,
        model: None,
    };
    let mock = Arc::new(MockProvider::new(vec![single_reply]));
    let tool = build_spawn_tool(mock, make_spawn_config(), vec![]);

    // Both attempts use an identical request.
    let helper_request = || SpawnAgentInput {
        name: "helper".into(),
        system_prompt: "test".into(),
        tools: vec![],
        task: "test".into(),
    };

    let first = tool.spawn(helper_request()).await.unwrap();
    assert!(!first.is_error);

    let second = tool.spawn(helper_request()).await.unwrap();
    assert!(second.is_error);
    assert!(second.content.contains("already used"));
}
/// Requesting a tool (`bash`) that is not in the configured allowlist
/// (`mock_read` only) yields an error result containing "not in allowlist".
#[tokio::test]
async fn spawn_agent_enforces_tool_allowlist() {
    let mut config = make_spawn_config();
    config.tool_allowlist = vec!["mock_read".into()];

    let mock = Arc::new(MockProvider::new(vec![]));
    let read_tool = MockTool::new("mock_read", "file content");
    let tool = build_spawn_tool(mock, config, vec![Arc::new(read_tool)]);

    let request = SpawnAgentInput {
        name: "reader".into(),
        system_prompt: "test".into(),
        tools: vec!["bash".into()],
        task: "test".into(),
    };
    let outcome = tool.spawn(request).await.unwrap();

    assert!(outcome.is_error);
    assert!(outcome.content.contains("not in allowlist"));
}
/// With `max_spawned_agents` set to 1, the first spawn succeeds and the second
/// returns an error result containing "Spawn limit reached".
#[tokio::test]
async fn spawn_agent_count_cap() {
    let single_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "done".into(),
        }],
        usage: TokenUsage::default(),
        stop_reason: StopReason::EndTurn,
        model: None,
    };
    let mock = Arc::new(MockProvider::new(vec![single_reply]));

    let mut config = make_spawn_config();
    config.max_spawned_agents = 1;
    let tool = build_spawn_tool(mock, config, vec![]);

    // The two requests differ only in the agent name.
    let request_named = |name: &'static str| SpawnAgentInput {
        name: name.into(),
        system_prompt: "test".into(),
        tools: vec![],
        task: "test".into(),
    };

    let within_cap = tool.spawn(request_named("first")).await.unwrap();
    assert!(!within_cap.is_error);

    let over_cap = tool.spawn(request_named("second")).await.unwrap();
    assert!(over_cap.is_error);
    assert!(over_cap.content.contains("Spawn limit reached"));
}
/// When the accumulated usage (60 input + 50 output) is pre-seeded above a
/// `max_total_tokens` of 100, the next spawn returns an error result
/// containing "budget exhausted".
#[tokio::test]
async fn spawn_agent_token_budget_enforcement() {
    let mut config = make_spawn_config();
    config.max_total_tokens = 100;
    let tool = build_spawn_tool(Arc::new(MockProvider::new(vec![])), config, vec![]);

    // Seed the accumulator; the lock guard is released at the end of this statement.
    *tool.accumulated_tokens.lock().unwrap() = TokenUsage {
        input_tokens: 60,
        output_tokens: 50,
        ..Default::default()
    };

    let request = SpawnAgentInput {
        name: "spender".into(),
        system_prompt: "test".into(),
        tools: vec![],
        task: "test".into(),
    };
    let outcome = tool.spawn(request).await.unwrap();

    assert!(outcome.is_error);
    assert!(outcome.content.contains("budget exhausted"));
}
/// Builds an allowlisted tool pool by hand, injects a `delegate_task` entry,
/// removes the three delegation tool names, and checks that only `mock_read`
/// remains.
///
/// NOTE(review): this test works on a local `HashMap` only and never calls
/// `SpawnAgentTool` — confirm that it covers the behaviour it is named after.
#[tokio::test]
async fn spawn_agent_no_delegation_tools() {
    let provider = Arc::new(MockProvider::new(vec![]));
    let mut config = make_spawn_config();
    config.tool_allowlist = vec!["mock_read".into()];

    let available: Vec<Arc<dyn Tool>> = vec![Arc::new(MockTool::new("mock_read", "content"))];

    // Same allowlist filtering as in `build_spawn_tool`, done inline.
    let mut tool_pool = std::collections::HashMap::new();
    for candidate in &available {
        let name = candidate.definition().name;
        if config.tool_allowlist.contains(&name) {
            tool_pool.insert(name, candidate.clone());
        }
    }

    // Inject a delegation tool, then strip every delegation tool name.
    tool_pool.insert(
        "delegate_task".into(),
        Arc::new(MockTool::new("delegate_task", "bad")),
    );
    for forbidden in ["delegate_task", "form_squad", "spawn_agent"] {
        tool_pool.remove(forbidden);
    }

    assert!(tool_pool.contains_key("mock_read"));
    for forbidden in ["delegate_task", "form_squad", "spawn_agent"] {
        assert!(!tool_pool.contains_key(forbidden));
    }

    // The provider is created but otherwise unused in this test.
    drop(provider);
}
/// A successful spawn emits an `AgentSpawned` event and a successful
/// `SubAgentCompleted` event, both labelled `spawn:emitter`.
#[tokio::test]
async fn spawn_agent_emits_events() {
    let recorded: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(vec![]));
    let sink = recorded.clone();

    let single_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "result".into(),
        }],
        usage: TokenUsage::default(),
        stop_reason: StopReason::EndTurn,
        model: None,
    };
    let mock = Arc::new(MockProvider::new(vec![single_reply]));
    let mut tool = build_spawn_tool(mock, make_spawn_config(), vec![]);
    tool.on_event = Some(Arc::new(move |event: AgentEvent| {
        sink.lock().unwrap().push(event);
    }));

    // The spawn result itself is not inspected; only the events matter here.
    let request = SpawnAgentInput {
        name: "emitter".into(),
        system_prompt: "test".into(),
        tools: vec![],
        task: "test".into(),
    };
    let _ = tool.spawn(request).await;

    let recorded = recorded.lock().unwrap();

    let saw_spawn = recorded.iter().any(|event| {
        matches!(
            event,
            AgentEvent::AgentSpawned { spawned_name, .. } if spawned_name == "spawn:emitter"
        )
    });
    assert!(saw_spawn, "expected AgentSpawned event");

    let saw_completion = recorded.iter().any(|event| {
        matches!(
            event,
            AgentEvent::SubAgentCompleted { agent, success, .. }
                if agent == "spawn:emitter" && *success
        )
    });
    assert!(saw_completion, "expected SubAgentCompleted event");
}
/// Spawning an agent with an empty tool list must succeed and surface the
/// text returned by the mocked provider.
#[tokio::test]
async fn spawn_agent_empty_tools() {
    let reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Pure reasoning response".into(),
        }],
        usage: TokenUsage::default(),
        stop_reason: StopReason::EndTurn,
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![reply]));
    let tool = build_spawn_tool(provider, make_spawn_config(), vec![]);

    let request = SpawnAgentInput {
        name: "thinker".into(),
        system_prompt: "You are a reasoning agent.".into(),
        tools: vec![],
        task: "Think about this.".into(),
    };
    let result = tool.spawn(request).await.unwrap();

    assert!(!result.is_error);
    assert!(result.content.contains("Pure reasoning response"));
}
/// A system prompt one byte longer than `SPAWN_MAX_PROMPT_BYTES` must be
/// rejected with an error result mentioning "System prompt too long".
#[tokio::test]
async fn spawn_agent_prompt_too_long() {
    let oversized_prompt = "x".repeat(SPAWN_MAX_PROMPT_BYTES + 1);

    let provider = Arc::new(MockProvider::new(vec![]));
    let tool = build_spawn_tool(provider, make_spawn_config(), vec![]);

    let request = SpawnAgentInput {
        name: "verbose".into(),
        system_prompt: oversized_prompt,
        tools: vec![],
        task: "test".into(),
    };
    let result = tool.spawn(request).await.unwrap();

    assert!(result.is_error);
    assert!(result.content.contains("System prompt too long"));
}
/// `HeartbitConfig::from_toml` must reject a spawn section whose
/// `max_spawned_agents` is zero.
#[test]
fn spawn_config_validation_rejects_zero_agents() {
    // Fixture lines are flush-left so the TOML text stays byte-identical.
    let raw = r#"
[provider]
name = "anthropic"
model = "claude-sonnet-4-20250514"
[orchestrator.spawn]
max_spawned_agents = 0
"#;
    let err = crate::config::HeartbitConfig::from_toml(raw)
        .unwrap_err()
        .to_string();
    assert!(
        err.contains("max_spawned_agents must be at least 1"),
        "err: {err}"
    );
}
/// `HeartbitConfig::from_toml` must reject a spawn section whose
/// `max_turns` is zero.
#[test]
fn spawn_config_validation_rejects_zero_turns() {
    // Fixture lines are flush-left so the TOML text stays byte-identical.
    let raw = r#"
[provider]
name = "anthropic"
model = "claude-sonnet-4-20250514"
[orchestrator.spawn]
max_turns = 0
"#;
    let err = crate::config::HeartbitConfig::from_toml(raw)
        .unwrap_err()
        .to_string();
    assert!(err.contains("max_turns must be at least 1"), "err: {err}");
}
/// Every field of `[orchestrator.spawn]` must deserialize from TOML into the
/// corresponding spawn-config value.
#[test]
fn spawn_config_from_toml() {
    // Fixture lines are flush-left so the TOML text stays byte-identical.
    let raw = r#"
[provider]
name = "anthropic"
model = "claude-sonnet-4-20250514"
[orchestrator.spawn]
max_spawned_agents = 5
tool_allowlist = ["read", "grep", "bash"]
max_turns = 20
max_tokens = 8192
max_total_tokens = 100000
"#;
    let parsed = toml::from_str::<crate::config::HeartbitConfig>(raw).unwrap();
    let spawn_section = parsed.orchestrator.spawn.as_ref().unwrap();

    assert_eq!(spawn_section.max_spawned_agents, 5);
    assert_eq!(spawn_section.tool_allowlist, vec!["read", "grep", "bash"]);
    assert_eq!(spawn_section.max_turns, 20);
    assert_eq!(spawn_section.max_tokens, 8192);
    assert_eq!(spawn_section.max_total_tokens, 100_000);
}
/// With no `[orchestrator.spawn]` section present, the parsed config must
/// carry no spawn configuration.
#[test]
fn spawn_disabled_by_default() {
    // Fixture lines are flush-left so the TOML text stays byte-identical.
    let raw = r#"
[provider]
name = "anthropic"
model = "claude-sonnet-4-20250514"
"#;
    let parsed = toml::from_str::<crate::config::HeartbitConfig>(raw).unwrap();
    assert!(parsed.orchestrator.spawn.is_none());
}
/// Building an orchestrator whose spawn allowlist names a tool that is not
/// supplied must fail, and the error must mention the offending tool name.
#[test]
fn spawn_config_invalid_tool_rejected_at_build() {
    let mut config = make_spawn_config();
    config.tool_allowlist = vec!["nonexistent_tool".into()];

    let provider = Arc::new(MockProvider::new(vec![]));
    let built = Orchestrator::builder(provider)
        .sub_agent("worker", "does work", "You work.")
        .spawn_config(config, vec![])
        .build();

    // Extract the error text, failing loudly if the build unexpectedly succeeds.
    let msg = match built {
        Ok(_) => panic!("expected build error for invalid tool in allowlist"),
        Err(e) => e.to_string(),
    };
    assert!(
        msg.contains("nonexistent_tool"),
        "expected error mentioning the bad tool, got: {msg}"
    );
}
/// The generated `spawn_agent` tool definition must list the allowlisted
/// tools and the agent cap in its description.
///
/// NOTE(review): the "3 agents max" expectation presumably reflects the
/// `max_spawned_agents` value produced by `make_spawn_config` — confirm.
#[test]
fn spawn_tool_definition_includes_allowlist() {
    let config = {
        let mut base = make_spawn_config();
        base.tool_allowlist = vec!["read".into(), "grep".into()];
        base
    };

    let definition = SpawnAgentTool::build_definition(&config);

    assert_eq!(definition.name, "spawn_agent");
    assert!(definition.description.contains("read, grep"));
    assert!(definition.description.contains("3 agents max"));
}
#[tokio::test]
async fn spawn_system_prompt_added_when_configured() {
use std::sync::Arc;
let provider = Arc::new(MockProvider::new(vec![
CompletionResponse {
content: vec![ContentBlock::Text {
text: "No delegation needed.".into(),
}],
usage: TokenUsage::default(),
stop_reason: StopReason::EndTurn,
model: None,
},
]));
let config = make_spawn_config();
let mut orchestrator = Orchestrator::builder(provider)
.sub_agent("worker", "does work", "You work.")
.spawn_config(config, vec![])
.build()
.unwrap();
let output = orchestrator.run("test task").await.unwrap();
assert!(output.result.contains("No delegation needed."));
}
/// A tenant token tracker attached to the orchestrator must also observe the
/// token usage of a delegated sub-agent run.
///
/// Three mock completions are queued: a `delegate_task` tool call, a
/// "Work done." reply reporting 400 + 100 tokens, and a closing "All done."
/// reply. After the run, `high_water` for `tenant-abc` must be at least 500;
/// after the orchestrator is dropped, `in_flight` must be back at 0.
#[tokio::test(flavor = "multi_thread")]
async fn orchestrator_propagates_tenant_tracker_to_sub_agents() {
    use crate::agent::tenant_tracker::TenantTokenTracker;
    use crate::auth::TenantScope;

    let tracker = Arc::new(TenantTokenTracker::new(1_000_000));
    let scope = TenantScope::new("tenant-abc");

    // Reserve and immediately release; the assertions below require that one
    // tracker entry exists afterwards with nothing in flight.
    drop(tracker.reserve(&scope, 100_000).unwrap());
    let initial_snap = tracker.snapshot();
    assert_eq!(initial_snap.len(), 1, "entry must exist after reserve+drop");
    assert_eq!(initial_snap[0].1.in_flight, 0);

    // First response: a tool call delegating one task to "worker".
    let delegate_call = CompletionResponse {
        content: vec![ContentBlock::ToolUse {
            id: "tt-call-1".into(),
            name: "delegate_task".into(),
            input: json!({
                "tasks": [{"agent": "worker", "task": "do work"}]
            }),
        }],
        usage: TokenUsage {
            input_tokens: 5,
            output_tokens: 5,
            ..Default::default()
        },
        stop_reason: StopReason::ToolUse,
        model: None,
    };
    // Second response: the 500-token reply the high-water assertion refers to.
    let worker_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "Work done.".into(),
        }],
        usage: TokenUsage {
            input_tokens: 400,
            output_tokens: 100,
            ..Default::default()
        },
        stop_reason: StopReason::EndTurn,
        model: None,
    };
    // Third response: the closing answer.
    let final_reply = CompletionResponse {
        content: vec![ContentBlock::Text {
            text: "All done.".into(),
        }],
        usage: TokenUsage {
            input_tokens: 2,
            output_tokens: 2,
            ..Default::default()
        },
        stop_reason: StopReason::EndTurn,
        model: None,
    };
    let provider = Arc::new(MockProvider::new(vec![
        delegate_call,
        worker_reply,
        final_reply,
    ]));

    let mut orch = Orchestrator::builder(provider)
        .audit_user_context("user-1", "tenant-abc")
        .sub_agent("worker", "Does work", "You work.")
        .tenant_tracker(tracker.clone())
        .build()
        .unwrap();
    orch.run("do work").await.unwrap();

    // Takes a fresh snapshot and returns a clone of the state recorded for
    // "tenant-abc", panicking with `missing_msg` when no entry is present.
    let tenant_state = |missing_msg: &str| {
        let snapshot = tracker.snapshot();
        let state = snapshot
            .iter()
            .find(|(tid, _)| tid == "tenant-abc")
            .map(|(_, s)| s.clone())
            .expect(missing_msg);
        state
    };

    let state_after_run =
        tenant_state("entry for 'tenant-abc' should still exist after the run");
    assert!(
        state_after_run.high_water >= 500,
        "high_water ({}) must be >= 500 (sub-agent's 500 tokens); \
         if it is < 500, the sub-agent runner is not propagating the tracker",
        state_after_run.high_water
    );

    drop(orch);

    let state_after_drop = tenant_state("entry should still exist after drop");
    assert_eq!(
        state_after_drop.in_flight, 0,
        "in_flight should return to 0 after all runners are dropped"
    );
}
}