use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::agent::context::ConversationContext;
use crate::agent::r#loop::AgentEvent;
use crate::api::provider::OpenAiCompatibleProvider;
use crate::config::Config;
#[derive(Debug)]
pub struct SubagentTask {
pub id: String,
pub prompt: String,
pub agent_name: Option<String>,
pub available_agents: Vec<crate::config::AgentDef>,
pub model_override: Option<String>,
pub working_dir_override: Option<String>,
pub outer_event_tx:
Option<tokio::sync::mpsc::UnboundedSender<crate::agent::r#loop::AgentEvent>>,
pub cancel_token: Option<CancellationToken>,
pub iteration_budget: Option<crate::agent::guard::IterationBudget>,
pub instruction_rx: Option<
tokio::sync::mpsc::UnboundedReceiver<crate::agent::swarm::knowledge::WorkerInstruction>,
>,
}
#[derive(Debug)]
pub struct SubagentResult {
pub id: String,
pub success: bool,
pub response: String,
pub modified_files: Vec<String>,
pub tool_calls: u32,
pub input_tokens: u64,
pub output_tokens: u64,
pub continuation_hint: Option<String>,
}
#[derive(Default, Clone)]
pub struct SharedResources {
pub mcp_manager: Option<std::sync::Arc<crate::mcp::manager::McpManager>>,
pub tool_index: Option<std::sync::Arc<crate::tools::tool_index::ToolIndex>>,
pub skill_registry: Option<std::sync::Arc<crate::skills::SkillRegistry>>,
pub shared_knowledge: Option<crate::agent::swarm::knowledge::SharedKnowledge>,
pub hook_runtime: Option<std::sync::Arc<crate::plugin::hooks::HookRuntime>>,
}
pub fn spawn(
task: SubagentTask,
client: OpenAiCompatibleProvider,
config: Config,
system_prompt: String,
working_dir: String,
lsp_manager: crate::lsp::manager::LspManager,
mcp_manager: Option<std::sync::Arc<crate::mcp::manager::McpManager>>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = SubagentResult> + Send>> {
let shared = SharedResources {
mcp_manager,
..Default::default()
};
Box::pin(spawn_inner(
task,
client,
config,
system_prompt,
working_dir,
lsp_manager,
shared,
))
}
pub fn spawn_with_resources(
task: SubagentTask,
client: OpenAiCompatibleProvider,
config: Config,
system_prompt: String,
working_dir: String,
lsp_manager: crate::lsp::manager::LspManager,
shared: SharedResources,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = SubagentResult> + Send>> {
Box::pin(spawn_inner(
task,
client,
config,
system_prompt,
working_dir,
lsp_manager,
shared,
))
}
async fn spawn_inner(
mut task: SubagentTask,
client: OpenAiCompatibleProvider,
config: Config,
system_prompt: String,
working_dir: String,
lsp_manager: crate::lsp::manager::LspManager,
shared: SharedResources,
) -> SubagentResult {
let mcp_manager = shared.mcp_manager;
let shared_knowledge = shared.shared_knowledge;
let shared_tool_index = shared.tool_index;
let shared_skill_registry = shared.skill_registry;
let hook_runtime = shared.hook_runtime;
let instruction_rx = task.instruction_rx.take();
let mut config = config;
if let Some(budget) = task.iteration_budget.clone() {
config.iteration_budget = Some(budget);
}
let resolved_agent_name = task.agent_name.as_deref().unwrap_or("agent").to_string();
let agent_id_for_hooks = task.id.clone();
let agent_name_for_hooks = resolved_agent_name.clone();
let wd_for_hooks = working_dir.clone();
if let Some(ref hr) = hook_runtime
&& hr.has_hooks(crate::plugin::hooks::HookEvent::SubagentStart)
{
let ctx = crate::plugin::hooks::HookContext::subagent_start(
&agent_id_for_hooks,
&agent_name_for_hooks,
std::path::Path::new(&wd_for_hooks),
);
let results = hr
.fire(crate::plugin::hooks::HookEvent::SubagentStart, &ctx)
.await;
for action in &results {
if let crate::plugin::hooks::HookAction::Error(e) = action {
tracing::warn!(error = %e, "Plugin SubagentStart hook error");
}
}
}
let agent_def = config
.agents
.iter()
.find(|a| a.name.eq_ignore_ascii_case(&resolved_agent_name))
.cloned();
let effective_system_prompt =
resolve_effective_system_prompt(&task, &config, agent_def.as_ref(), system_prompt);
let context = ConversationContext::new(effective_system_prompt);
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<AgentEvent>();
let cancel = task.cancel_token.clone().unwrap_or_default();
let mut actual_client = client;
apply_model_override(&mut actual_client, task.model_override.as_deref());
let actual_client_for_reflect = actual_client.clone();
let soul_enabled = super::soul::is_enabled(&config, agent_def.as_ref());
let soul_home = config.collet_home.clone();
let soul_model = config.model.clone();
let soul_agent = resolved_agent_name.clone();
let prompt = task.prompt.clone();
let effective_working_dir = task.working_dir_override.clone().unwrap_or(working_dir);
let knowledge_for_claim = shared_knowledge.clone();
tokio::spawn(async move {
let agent_params = crate::agent::r#loop::AgentParams {
client: actual_client,
config,
context,
user_msg: prompt,
working_dir: effective_working_dir,
event_tx,
cancel,
lsp_manager,
trust_level: crate::trust::TrustLevel::Full,
approval_gate: crate::agent::approval::ApprovalGate::yolo(),
images: Vec::new(),
};
let mcp = mcp_manager
.unwrap_or_else(|| std::sync::Arc::new(crate::mcp::manager::McpManager::empty()));
crate::agent::r#loop::run_with_shared_mcp(
agent_params,
crate::agent::r#loop::SwarmParams {
mcp_manager: mcp,
shared_knowledge,
shared_tool_index,
shared_skill_registry,
instruction_rx,
},
)
.await;
});
let mut response = String::new();
let mut modified_files = Vec::new();
let mut tool_calls = 0u32;
let mut success = true;
let mut input_tokens = 0u64;
let mut output_tokens = 0u64;
let mut continuation_hint: Option<String> = None;
while let Some(event) = event_rx.recv().await {
match event {
AgentEvent::Token(token) => {
response.push_str(&token);
if let Some(ref tx) = task.outer_event_tx {
let _ = tx.send(AgentEvent::SwarmAgentToken {
agent_id: task.id.clone(),
text: token,
});
}
}
AgentEvent::Response(text) => {
if response.is_empty() {
response = text.clone();
}
if let Some(ref tx) = task.outer_event_tx {
let _ = tx.send(AgentEvent::SwarmAgentResponse {
agent_id: task.id.clone(),
text,
});
}
}
AgentEvent::ToolCall {
ref name, ref args, ..
} => {
tool_calls += 1;
if let Some(ref tx) = task.outer_event_tx {
let _ = tx.send(AgentEvent::SwarmAgentToolCall {
agent_id: task.id.clone(),
name: name.clone(),
args: args.clone(),
});
}
}
AgentEvent::ToolResult {
ref name,
ref result,
success: tool_success,
..
} => {
if let Some(ref tx) = task.outer_event_tx {
let _ = tx.send(AgentEvent::SwarmAgentToolResult {
agent_id: task.id.clone(),
name: name.clone(),
result: truncate_result(result),
success: tool_success,
});
}
}
AgentEvent::FileModified { path } => {
modified_files.push(path);
}
AgentEvent::Status {
prompt_tokens,
completion_tokens,
iteration,
..
} => {
input_tokens += prompt_tokens as u64;
output_tokens += completion_tokens as u64;
if let Some(ref tx) = task.outer_event_tx {
let _ = tx.send(AgentEvent::SwarmAgentProgress {
agent_id: task.id.clone(),
agent_name: task.agent_name.clone().unwrap_or_else(|| task.id.clone()),
iteration,
status: format!(
"iter {iteration} ({prompt_tokens}t in, {completion_tokens}t out)"
),
});
}
}
AgentEvent::Error(_) => {
success = false;
}
AgentEvent::SwarmWorkerApproaching { remaining, .. } => {
if let Some(ref tx) = task.outer_event_tx {
let _ = tx.send(AgentEvent::SwarmWorkerApproaching {
agent_id: task.id.clone(),
task_preview: crate::util::truncate_bytes(&task.prompt, 80).to_string(),
remaining,
});
}
}
AgentEvent::SwarmWorkerPaused { .. } => {
if let Some(ref tx) = task.outer_event_tx {
let _ = tx.send(AgentEvent::SwarmWorkerPaused {
agent_id: task.id.clone(),
});
}
}
AgentEvent::SwarmWorkerResumed { .. } => {
if let Some(ref tx) = task.outer_event_tx {
let _ = tx.send(AgentEvent::SwarmWorkerResumed {
agent_id: task.id.clone(),
});
}
}
AgentEvent::Done {
context: done_ctx,
stop_reason,
..
} => {
if stop_reason
.as_ref()
.map(|r| r.is_continuable())
.unwrap_or(false)
{
continuation_hint = Some(build_continuation_hint(&task.prompt, &response));
}
if soul_enabled {
let effective_soul_agent = if !task.available_agents.is_empty() {
response
.lines()
.find_map(|line| {
let l = line.trim();
l.to_uppercase()
.starts_with("ROLE:")
.then(|| l[5..].trim().to_lowercase())
.filter(|r| !r.is_empty())
})
.unwrap_or_else(|| soul_agent.clone())
} else {
soul_agent.clone()
};
let messages = done_ctx.messages().to_vec();
let home = soul_home.clone();
let model = soul_model.clone();
let reflect_client = actual_client_for_reflect.clone();
tokio::spawn(async move {
if let Err(e) = super::soul::reflect_simple(
&reflect_client,
&model,
&home,
&effective_soul_agent,
&messages,
None,
)
.await
{
tracing::warn!(agent = %effective_soul_agent, "Subagent soul reflection failed: {e}");
}
});
}
break;
}
_ => {}
}
}
if !task.available_agents.is_empty()
&& let Some(ref kb) = knowledge_for_claim
{
claim_role_on_blackboard(kb, &task.id, &response).await;
}
if let Some(ref hr) = hook_runtime
&& hr.has_hooks(crate::plugin::hooks::HookEvent::SubagentStop)
{
let ctx = crate::plugin::hooks::HookContext::subagent_stop(
&agent_id_for_hooks,
&agent_name_for_hooks,
success,
std::path::Path::new(&wd_for_hooks),
);
let results = hr
.fire(crate::plugin::hooks::HookEvent::SubagentStop, &ctx)
.await;
for action in &results {
if let crate::plugin::hooks::HookAction::Error(e) = action {
tracing::warn!(error = %e, "Plugin SubagentStop hook error");
}
}
}
SubagentResult {
id: task.id,
success,
response,
modified_files,
tool_calls,
input_tokens,
output_tokens,
continuation_hint,
}
}
fn resolve_effective_system_prompt(
task: &SubagentTask,
config: &Config,
agent_def: Option<&crate::config::AgentDef>,
system_prompt: String,
) -> String {
let base = if let Some(name) = task.agent_name.as_deref() {
config
.agents
.iter()
.find(|a| a.name.eq_ignore_ascii_case(name))
.filter(|a| !a.system_prompt.is_empty())
.map(|a| a.system_prompt.clone())
.unwrap_or(system_prompt)
} else if !task.available_agents.is_empty() {
let roster = task
.available_agents
.iter()
.filter(|a| !a.system_prompt.is_empty())
.map(|a| {
let desc = a.description.as_deref().unwrap_or("general-purpose agent");
format!("- {}: {}", a.name, desc)
})
.collect::<Vec<_>>()
.join("\n");
format!(
"## Agent Role Selection\n\
You are one agent in a collaborative swarm. Based on your assigned task, \
select the most appropriate role from the available agents below and fully \
adopt that agent's behavior, expertise, and constraints for this task.\n\n\
Available agents:\n{roster}\n\n\
**REQUIRED**: Begin your very first response with exactly:\n\
`ROLE: <agent_name>`\n\
where <agent_name> is one of the names listed above. Then proceed with your work.\n\
Example: `ROLE: security`\n\n\
---\n\n\
{system_prompt}"
)
} else {
system_prompt
};
let resolved_agent_name = task.agent_name.as_deref().unwrap_or("agent");
if super::soul::is_enabled(config, agent_def)
&& let Some(soul) = super::soul::load(&config.collet_home, resolved_agent_name)
{
format!(
"{base}\n\n\
## Soul (Your Persistent Memory & Personality)\n\n\
The following is YOUR soul — your evolving identity, thoughts, and growth.\n\
Let it influence your tone, opinions, and approach naturally.\n\n{soul}"
)
} else {
base
}
}
fn apply_model_override(client: &mut OpenAiCompatibleProvider, spec: Option<&str>) {
let Some(spec) = spec else {
return;
};
let Some(slash) = spec.find('/') else {
client.model = spec.to_string();
return;
};
let provider_name = &spec[..slash];
let model = &spec[slash + 1..];
if let Some((entry, api_key)) = crate::config::resolve_provider(provider_name) {
if !entry.base_url.is_empty() {
let profile = crate::api::model_profile::profile_for(model);
client.switch_provider(
entry.base_url,
api_key,
model.to_string(),
profile.max_output_tokens,
);
} else {
client.model = model.to_string();
}
} else {
client.model = model.to_string();
}
}
async fn claim_role_on_blackboard(
kb: &crate::agent::swarm::knowledge::SharedKnowledge,
task_id: &str,
response: &str,
) {
let claimed_role = response.lines().find_map(|line| {
let l = line.trim();
l.to_uppercase()
.starts_with("ROLE:")
.then(|| l[5..].trim().to_lowercase())
.filter(|r| !r.is_empty())
});
if let Some(role) = claimed_role {
tracing::info!(%task_id, %role, "Hive agent claimed role");
kb.post_to_blackboard(
&format!("role_claim:{role}"),
&role,
task_id,
crate::agent::swarm::knowledge::BlackboardKind::Claim,
)
.await;
} else {
tracing::warn!(%task_id, "Hive agent did not declare ROLE: in response");
}
}
fn truncate_result(s: &str) -> String {
const MAX_LEN: usize = 2000;
if s.len() <= MAX_LEN {
s.to_string()
} else {
let truncated: String = s.chars().take(MAX_LEN).collect();
format!("{truncated}\n... (truncated)")
}
}
fn build_continuation_hint(original_prompt: &str, last_response: &str) -> String {
let progress = if last_response.len() > 800 {
let start = last_response
.char_indices()
.rev()
.nth(800)
.map(|(i, _)| i)
.unwrap_or(0);
&last_response[start..]
} else {
last_response
};
format!(
"## Continuation Task\n\
The previous agent was stopped mid-task due to iteration limits.\n\
It made partial progress — continue from where it left off.\n\n\
### Original task\n\
{original_prompt}\n\n\
### Progress so far (last agent output)\n\
{progress}\n\n\
### Your job\n\
Review the progress above, identify what remains unfinished, and complete it.\n\
Do not redo work that was already completed successfully."
)
}
pub async fn spawn_parallel(
tasks: Vec<SubagentTask>,
client: OpenAiCompatibleProvider,
config: Config,
system_prompt: String,
working_dir: String,
lsp_manager: crate::lsp::manager::LspManager,
mcp_manager: Option<std::sync::Arc<crate::mcp::manager::McpManager>>,
) -> Vec<SubagentResult> {
use crate::agent::rate_limit::{
AcquireResult, ModelKey, ModelRateLimiter, parse_providers_chain,
};
use std::sync::Arc;
let limiter = Arc::new(ModelRateLimiter::new(&config));
let mut handles = Vec::new();
for task in tasks {
let client = client.clone();
let config = config.clone();
let sp = system_prompt.clone();
let wd = working_dir.clone();
let lsp = lsp_manager.clone();
let mcp = mcp_manager.clone();
let limiter = limiter.clone();
handles.push(tokio::spawn(async move {
let agent_def = task
.agent_name
.as_deref()
.and_then(|name| {
config
.agents
.iter()
.find(|a| a.name.eq_ignore_ascii_case(name))
})
.or_else(|| {
let m = task.model_override.as_deref().unwrap_or(&config.model);
config.agents.iter().find(|a| a.model == m)
});
let model = agent_def
.map(|a| a.model.as_str())
.filter(|m| *m != "default")
.or(task.model_override.as_deref())
.unwrap_or(&config.model);
let primary_provider = agent_def
.and_then(|a| a.provider.as_deref())
.unwrap_or("default");
let primary = ModelKey::new(primary_provider, model);
let fallback_chain: Vec<ModelKey> = agent_def
.map(|a| parse_providers_chain(&a.providers.join(",")))
.unwrap_or_default();
let result = limiter
.acquire_with_fallback(&primary, &fallback_chain, &config)
.await;
let (effective_key, _permit) = match result {
AcquireResult::Acquired { key, permit } => (key, permit),
AcquireResult::QueueWait { key } => {
let permit = limiter.acquire_wait(&key).await;
(key, permit)
}
};
let mut actual_client = client;
if effective_key.model != model || effective_key.provider != primary_provider {
tracing::info!(
task_id = %task.id,
original = %primary.as_str(),
actual = %effective_key.as_str(),
"Subagent using fallback model"
);
if let Some((entry, api_key)) =
crate::config::resolve_provider(&effective_key.provider)
{
if entry.is_cli() {
actual_client.model = effective_key.model.clone();
} else {
let profile = crate::api::model_profile::profile_for(&effective_key.model);
actual_client.switch_provider(
entry.base_url.clone(),
api_key,
effective_key.model.clone(),
profile.max_output_tokens,
);
}
} else {
actual_client.model = effective_key.model.clone();
}
}
let mut task = task;
task.model_override = Some(effective_key.model.clone());
spawn(task, actual_client, config, sp, wd, lsp, mcp).await
}));
}
let mut results = Vec::new();
for handle in handles {
match handle.await {
Ok(result) => results.push(result),
Err(e) => {
tracing::error!("Subagent task panicked: {e}");
}
}
}
results
}