use std::collections::{HashMap, HashSet, VecDeque};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex as StdMutex, OnceLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, Semaphore};
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use futures_util::stream::{FuturesUnordered, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use crate::client::DeepSeekClient;
use crate::config::MAX_SUBAGENTS;
use crate::core::events::Event;
use crate::llm_client::LlmClient;
use crate::models::{ContentBlock, Message, MessageRequest, SystemPrompt, Tool};
use crate::tools::plan::{PlanState, SharedPlanState};
use crate::tools::registry::{ToolRegistry, ToolRegistryBuilder};
use crate::tools::spec::{
ApprovalRequirement, ToolCapability, ToolContext, ToolError, ToolResult, ToolSpec,
optional_bool, optional_u64, required_str,
};
use crate::tools::todo::{SharedTodoList, TodoList};
pub mod mailbox;
#[allow(unused_imports)]
pub use mailbox::{Mailbox, MailboxEnvelope, MailboxMessage, MailboxReceiver};
const DEFAULT_MAX_STEPS: u32 = 100;
const TOOL_TIMEOUT: Duration = Duration::from_secs(30);
const STEP_API_TIMEOUT: Duration = Duration::from_secs(120);
const RESULT_POLL_INTERVAL: Duration = Duration::from_millis(250);
const DEFAULT_RESULT_TIMEOUT_MS: u64 = 30_000;
const MIN_WAIT_TIMEOUT_MS: u64 = 10_000;
const MAX_RESULT_TIMEOUT_MS: u64 = 3_600_000;
const COMPLETED_AGENT_RETENTION: Duration = Duration::from_secs(60 * 60);
const SUBAGENT_STATE_SCHEMA_VERSION: u32 = 1;
const SUBAGENT_STATE_FILE: &str = "subagents.v1.json";
const SUBAGENT_RESTART_REASON: &str = "Interrupted by process restart";
const DEFAULT_CSV_MAX_CONCURRENCY: u64 = 16;
const DEFAULT_CSV_MAX_RUNTIME_SECONDS: u64 = 1800;
const MAX_CSV_MAX_RUNTIME_SECONDS: u64 = 86_400;
const VALID_SUBAGENT_TYPES: &str =
"general, explore, plan, review, custom, worker, explorer, awaiter, default";
const DEPRECATION_REMOVAL_VERSION: &str = "0.8.0";
static AGENT_JOB_REPORTS: OnceLock<StdMutex<HashMap<String, HashMap<String, AgentJobReport>>>> =
OnceLock::new();
static AGENT_JOB_ASSIGNMENTS: OnceLock<StdMutex<HashMap<String, HashMap<String, String>>>> =
OnceLock::new();
fn wrap_with_deprecation_notice(
mut result: ToolResult,
this_tool: &str,
use_instead: &str,
) -> ToolResult {
tracing::warn!(
"Deprecated tool '{}' invoked — use '{}' instead (removal: v{})",
this_tool,
use_instead,
DEPRECATION_REMOVAL_VERSION,
);
let notice = json!({
"_deprecation": {
"this_tool": this_tool,
"use_instead": use_instead,
"removed_in": DEPRECATION_REMOVAL_VERSION,
"message": format!(
"Tool '{}' is deprecated; switch to '{}' before v{}.",
this_tool, use_instead, DEPRECATION_REMOVAL_VERSION
)
}
});
result.metadata = Some(match result.metadata.take() {
Some(Value::Object(mut map)) => {
if let Value::Object(notice_map) = notice {
map.extend(notice_map);
}
Value::Object(map)
}
Some(other) => {
json!({ "_deprecation": notice["_deprecation"].clone(), "_original_metadata": other })
}
None => notice,
});
result
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubAgentAssignment {
pub objective: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub role: Option<String>,
}
impl SubAgentAssignment {
fn new(objective: String, role: Option<String>) -> Self {
Self { objective, role }
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SubAgentType {
#[default]
General,
Explore,
Plan,
Review,
Custom,
}
impl SubAgentType {
#[must_use]
pub fn from_str(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"general" | "general-purpose" | "general_purpose" | "worker" | "default" => {
Some(Self::General)
}
"explore" | "exploration" | "explorer" => Some(Self::Explore),
"plan" | "planning" | "awaiter" => Some(Self::Plan),
"review" | "code-review" | "code_review" | "reviewer" => Some(Self::Review),
"custom" => Some(Self::Custom),
_ => None,
}
}
#[must_use]
pub fn as_str(&self) -> &'static str {
match self {
Self::General => "general",
Self::Explore => "explore",
Self::Plan => "plan",
Self::Review => "review",
Self::Custom => "custom",
}
}
#[must_use]
pub fn system_prompt(&self) -> String {
match self {
Self::General => GENERAL_AGENT_PROMPT.to_string(),
Self::Explore => EXPLORE_AGENT_PROMPT.to_string(),
Self::Plan => PLAN_AGENT_PROMPT.to_string(),
Self::Review => REVIEW_AGENT_PROMPT.to_string(),
Self::Custom => CUSTOM_AGENT_PROMPT.to_string(),
}
}
#[must_use]
#[deprecated(
since = "0.6.6",
note = "Default sub-agents inherit the full parent registry; pass an explicit allowed_tools list only for narrow Custom roles."
)]
pub fn allowed_tools(&self) -> Vec<&'static str> {
match self {
Self::General => vec![
"list_dir",
"read_file",
"write_file",
"edit_file",
"apply_patch",
"grep_files",
"file_search",
"web.run",
"web_search",
"exec_shell",
"exec_shell_wait",
"exec_shell_interact",
"exec_wait",
"exec_interact",
"note",
"checklist_write",
"checklist_add",
"checklist_update",
"checklist_list",
"todo_write",
"todo_add",
"todo_update",
"todo_list",
"update_plan",
"report_agent_job_result",
],
Self::Explore => vec![
"list_dir",
"read_file",
"grep_files",
"file_search",
"web.run",
"web_search",
"exec_shell",
"exec_shell_wait",
"exec_shell_interact",
"exec_wait",
"exec_interact",
],
Self::Plan => vec![
"list_dir",
"read_file",
"grep_files",
"file_search",
"web.run",
"note",
"update_plan",
"checklist_write",
"checklist_add",
"checklist_update",
"checklist_list",
"todo_write",
"todo_add",
"todo_update",
"todo_list",
],
Self::Review => vec!["list_dir", "read_file", "grep_files", "file_search", "note"],
Self::Custom => vec![], }
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SubAgentStatus {
Running,
Completed,
Interrupted(String),
Failed(String),
Cancelled,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubAgentResult {
pub agent_id: String,
pub agent_type: SubAgentType,
pub assignment: SubAgentAssignment,
pub status: SubAgentStatus,
pub result: Option<String>,
pub steps_taken: u32,
pub duration_ms: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WaitMode {
Any,
All,
}
impl WaitMode {
fn from_str(value: &str) -> Option<Self> {
match value.to_ascii_lowercase().as_str() {
"any" | "first" => Some(Self::Any),
"all" => Some(Self::All),
_ => None,
}
}
fn as_str(self) -> &'static str {
match self {
Self::Any => "any",
Self::All => "all",
}
}
fn condition_met(self, snapshots: &[SubAgentResult]) -> bool {
match self {
Self::Any => snapshots
.iter()
.any(|snapshot| snapshot.status != SubAgentStatus::Running),
Self::All => snapshots
.iter()
.all(|snapshot| snapshot.status != SubAgentStatus::Running),
}
}
}
#[derive(Debug, Clone)]
struct SubAgentInput {
text: String,
interrupt: bool,
}
#[derive(Debug, Clone)]
struct SpawnRequest {
prompt: String,
agent_type: SubAgentType,
assignment: SubAgentAssignment,
allowed_tools: Option<Vec<String>>,
cwd: Option<PathBuf>,
}
#[derive(Debug, Clone)]
struct AssignRequest {
agent_id: String,
objective: Option<String>,
role: Option<String>,
message: Option<String>,
interrupt: bool,
}
#[derive(Debug, Clone)]
struct CsvRowTask {
row_index: usize,
item_id: String,
values: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize)]
struct CsvWorkerOutcome {
#[serde(skip_serializing)]
row_index: usize,
item_id: String,
status: String,
#[serde(skip_serializing_if = "Option::is_none")]
agent_id: Option<String>,
duration_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
result_json: Option<Value>,
}
#[derive(Debug, Clone)]
struct AgentJobReport {
result: Value,
stop: bool,
}
#[derive(Debug, Clone, Serialize)]
struct SpawnAgentsOnCsvSummary {
job_id: String,
total: usize,
completed: usize,
failed: usize,
timed_out: usize,
skipped: usize,
output_csv_path: String,
results: Vec<CsvWorkerOutcome>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedSubAgent {
id: String,
agent_type: SubAgentType,
prompt: String,
assignment: SubAgentAssignment,
status: SubAgentStatus,
result: Option<String>,
steps_taken: u32,
duration_ms: u64,
allowed_tools: Vec<String>,
updated_at_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedSubAgentState {
schema_version: u32,
agents: Vec<PersistedSubAgent>,
}
impl Default for PersistedSubAgentState {
fn default() -> Self {
Self {
schema_version: SUBAGENT_STATE_SCHEMA_VERSION,
agents: Vec::new(),
}
}
}
pub const DEFAULT_MAX_SPAWN_DEPTH: u32 = 3;
#[derive(Clone)]
pub struct SubAgentRuntime {
pub client: DeepSeekClient,
pub model: String,
pub context: ToolContext,
pub allow_shell: bool,
pub event_tx: Option<mpsc::Sender<Event>>,
pub manager: SharedSubAgentManager,
pub spawn_depth: u32,
pub max_spawn_depth: u32,
pub cancel_token: CancellationToken,
pub mailbox: Option<Mailbox>,
}
impl SubAgentRuntime {
#[must_use]
pub fn new(
client: DeepSeekClient,
model: String,
context: ToolContext,
allow_shell: bool,
event_tx: Option<mpsc::Sender<Event>>,
manager: SharedSubAgentManager,
) -> Self {
Self {
client,
model,
context,
allow_shell,
event_tx,
manager,
spawn_depth: 0,
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
cancel_token: CancellationToken::new(),
mailbox: None,
}
}
#[must_use]
#[allow(dead_code)] pub fn with_mailbox(mut self, mailbox: Mailbox) -> Self {
self.mailbox = Some(mailbox);
self
}
#[must_use]
#[allow(dead_code)] pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
self.cancel_token = token;
self
}
#[must_use]
#[allow(dead_code)]
pub fn with_max_spawn_depth(mut self, max: u32) -> Self {
self.max_spawn_depth = max;
self
}
#[must_use]
pub fn child_runtime(&self) -> Self {
let mut child_context = self.context.clone();
child_context.auto_approve = true;
Self {
client: self.client.clone(),
model: self.model.clone(),
context: child_context,
allow_shell: self.allow_shell,
event_tx: self.event_tx.clone(),
manager: self.manager.clone(),
spawn_depth: self.spawn_depth + 1,
max_spawn_depth: self.max_spawn_depth,
cancel_token: self.cancel_token.child_token(),
mailbox: self.mailbox.clone(),
}
}
#[must_use]
pub fn would_exceed_depth(&self) -> bool {
self.spawn_depth + 1 > self.max_spawn_depth
}
}
pub struct SubAgent {
pub id: String,
pub agent_type: SubAgentType,
pub prompt: String,
pub assignment: SubAgentAssignment,
pub status: SubAgentStatus,
pub result: Option<String>,
pub steps_taken: u32,
pub started_at: Instant,
pub allowed_tools: Option<Vec<String>>,
input_tx: Option<mpsc::UnboundedSender<SubAgentInput>>,
task_handle: Option<JoinHandle<()>>,
}
impl SubAgent {
fn new(
agent_type: SubAgentType,
prompt: String,
assignment: SubAgentAssignment,
allowed_tools: Option<Vec<String>>,
input_tx: mpsc::UnboundedSender<SubAgentInput>,
) -> Self {
let id = format!("agent_{}", &Uuid::new_v4().to_string()[..8]);
Self {
id,
agent_type,
prompt,
assignment,
status: SubAgentStatus::Running,
result: None,
steps_taken: 0,
started_at: Instant::now(),
allowed_tools,
input_tx: Some(input_tx),
task_handle: None,
}
}
#[must_use]
pub fn snapshot(&self) -> SubAgentResult {
SubAgentResult {
agent_id: self.id.clone(),
agent_type: self.agent_type.clone(),
assignment: self.assignment.clone(),
status: self.status.clone(),
result: self.result.clone(),
steps_taken: self.steps_taken,
duration_ms: u64::try_from(self.started_at.elapsed().as_millis()).unwrap_or(u64::MAX),
}
}
}
pub struct SubAgentManager {
agents: HashMap<String, SubAgent>,
#[allow(dead_code)] workspace: PathBuf,
state_path: Option<PathBuf>,
max_steps: u32,
max_agents: usize,
}
impl SubAgentManager {
#[must_use]
pub fn new(workspace: PathBuf, max_agents: usize) -> Self {
Self {
agents: HashMap::new(),
workspace,
state_path: None,
max_steps: DEFAULT_MAX_STEPS,
max_agents,
}
}
#[must_use]
fn with_state_path(mut self, path: PathBuf) -> Self {
self.state_path = Some(path);
self
}
fn persist_state(&self) -> Result<()> {
let Some(path) = self.state_path.as_ref() else {
return Ok(());
};
let now_ms = epoch_millis_now();
let mut agents = Vec::with_capacity(self.agents.len());
for agent in self.agents.values() {
agents.push(PersistedSubAgent {
id: agent.id.clone(),
agent_type: agent.agent_type.clone(),
prompt: agent.prompt.clone(),
assignment: agent.assignment.clone(),
status: agent.status.clone(),
result: agent.result.clone(),
steps_taken: agent.steps_taken,
duration_ms: u64::try_from(agent.started_at.elapsed().as_millis())
.unwrap_or(u64::MAX),
allowed_tools: agent.allowed_tools.clone().unwrap_or_default(),
updated_at_ms: now_ms,
});
}
agents.sort_by(|a, b| a.id.cmp(&b.id));
let payload = PersistedSubAgentState {
schema_version: SUBAGENT_STATE_SCHEMA_VERSION,
agents,
};
write_json_atomic(path, &payload)
}
fn persist_state_best_effort(&self) {
if let Err(err) = self.persist_state() {
eprintln!("Failed to persist sub-agent state: {err}");
}
}
fn load_state(&mut self) -> Result<()> {
let Some(path) = self.state_path.as_ref() else {
return Ok(());
};
if !path.exists() {
return Ok(());
}
let raw = fs::read_to_string(path)?;
let state = serde_json::from_str::<PersistedSubAgentState>(&raw)?;
if state.schema_version != SUBAGENT_STATE_SCHEMA_VERSION {
return Err(anyhow!(
"Unsupported sub-agent state schema {}",
state.schema_version
));
}
self.agents.clear();
for persisted in state.agents {
let mut status = persisted.status;
if matches!(status, SubAgentStatus::Running) {
status = SubAgentStatus::Interrupted(SUBAGENT_RESTART_REASON.to_string());
}
let started_at = instant_from_duration(Duration::from_millis(persisted.duration_ms));
let allowed_tools = if persisted.allowed_tools.is_empty() {
None
} else {
Some(persisted.allowed_tools)
};
let agent = SubAgent {
id: persisted.id.clone(),
agent_type: persisted.agent_type,
prompt: persisted.prompt,
assignment: persisted.assignment,
status,
result: persisted.result,
steps_taken: persisted.steps_taken,
started_at,
allowed_tools,
input_tx: None,
task_handle: None,
};
self.agents.insert(persisted.id, agent);
}
Ok(())
}
pub fn running_count(&self) -> usize {
self.agents
.values()
.filter(|agent| {
if agent.status != SubAgentStatus::Running {
return false;
}
!agent
.task_handle
.as_ref()
.is_some_and(tokio::task::JoinHandle::is_finished)
})
.count()
}
#[must_use]
pub fn max_agents(&self) -> usize {
self.max_agents
}
#[must_use]
pub fn available_slots(&self) -> usize {
self.max_agents.saturating_sub(self.running_count())
}
pub fn spawn_background(
&mut self,
manager_handle: SharedSubAgentManager,
runtime: SubAgentRuntime,
agent_type: SubAgentType,
prompt: String,
allowed_tools: Option<Vec<String>>,
) -> Result<SubAgentResult> {
self.spawn_background_with_assignment(
manager_handle,
runtime,
agent_type,
prompt.clone(),
SubAgentAssignment::new(prompt, None),
allowed_tools,
)
}
pub fn spawn_background_with_assignment(
&mut self,
manager_handle: SharedSubAgentManager,
runtime: SubAgentRuntime,
agent_type: SubAgentType,
prompt: String,
assignment: SubAgentAssignment,
allowed_tools: Option<Vec<String>>,
) -> Result<SubAgentResult> {
self.cleanup(COMPLETED_AGENT_RETENTION);
if self.running_count() >= self.max_agents {
return Err(anyhow!(
"Sub-agent limit reached (max {}, running {}). Cancel, close, or wait for an existing agent to finish. Consider issuing multiple tool calls in one turn (the dispatcher runs them in parallel) for parallel one-shot work.",
self.max_agents,
self.running_count()
));
}
let tools = build_allowed_tools(&agent_type, allowed_tools, runtime.allow_shell)?;
let (input_tx, input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
agent_type.clone(),
prompt.clone(),
assignment.clone(),
tools.clone(),
input_tx,
);
let agent_id = agent.id.clone();
let started_at = agent.started_at;
let max_steps = self.max_steps;
if let Some(event_tx) = runtime.event_tx.clone() {
let _ = event_tx.try_send(Event::AgentSpawned {
id: agent_id.clone(),
prompt: prompt.clone(),
});
}
let task = SubAgentTask {
manager_handle,
runtime,
agent_id: agent_id.clone(),
agent_type,
prompt,
assignment,
allowed_tools: tools,
started_at,
max_steps,
input_rx,
};
let handle = tokio::spawn(run_subagent_task(task));
agent.task_handle = Some(handle);
self.agents.insert(agent_id.clone(), agent);
self.persist_state_best_effort();
Ok(self
.agents
.get(&agent_id)
.expect("agent should exist after spawn")
.snapshot())
}
pub fn get_result(&self, agent_id: &str) -> Result<SubAgentResult> {
let agent = self
.agents
.get(agent_id)
.ok_or_else(|| anyhow!("Agent {agent_id} not found"))?;
Ok(agent.snapshot())
}
pub fn cancel(&mut self, agent_id: &str) -> Result<SubAgentResult> {
let (snapshot, changed) = {
let agent = self
.agents
.get_mut(agent_id)
.ok_or_else(|| anyhow!("Agent {agent_id} not found"))?;
let mut changed = false;
if agent.status == SubAgentStatus::Running {
agent.status = SubAgentStatus::Cancelled;
if let Some(handle) = agent.task_handle.take() {
handle.abort();
}
changed = true;
}
(agent.snapshot(), changed)
};
if changed {
self.persist_state_best_effort();
}
Ok(snapshot)
}
pub fn resume(
&mut self,
manager_handle: SharedSubAgentManager,
runtime: SubAgentRuntime,
agent_id: &str,
) -> Result<SubAgentResult> {
let status = self
.agents
.get(agent_id)
.ok_or_else(|| anyhow!("Agent {agent_id} not found"))?
.status
.clone();
if status == SubAgentStatus::Running {
let agent = self
.agents
.get(agent_id)
.ok_or_else(|| anyhow!("Agent {agent_id} not found"))?;
return Ok(agent.snapshot());
}
if self.running_count() >= self.max_agents {
return Err(anyhow!(
"Sub-agent limit reached (max {}, running {}). Close or wait for an existing agent before resuming. Consider issuing multiple tool calls in one turn (the dispatcher runs them in parallel) for parallel one-shot work.",
self.max_agents,
self.running_count()
));
}
let snapshot = {
let agent = self
.agents
.get_mut(agent_id)
.ok_or_else(|| anyhow!("Agent {agent_id} not found"))?;
let (input_tx, input_rx) = mpsc::unbounded_channel();
let restarted_at = Instant::now();
let task = SubAgentTask {
manager_handle,
runtime: runtime.clone(),
agent_id: agent.id.clone(),
agent_type: agent.agent_type.clone(),
prompt: agent.prompt.clone(),
assignment: agent.assignment.clone(),
allowed_tools: agent.allowed_tools.clone(),
started_at: restarted_at,
max_steps: self.max_steps,
input_rx,
};
let handle = tokio::spawn(run_subagent_task(task));
agent.status = SubAgentStatus::Running;
agent.result = None;
agent.steps_taken = 0;
agent.started_at = restarted_at;
agent.input_tx = Some(input_tx);
agent.task_handle = Some(handle);
if let Some(event_tx) = runtime.event_tx {
let _ = event_tx.try_send(Event::AgentSpawned {
id: agent.id.clone(),
prompt: format!("(resumed) {}", agent.prompt),
});
}
agent.snapshot()
};
self.persist_state_best_effort();
Ok(snapshot)
}
pub fn send_input(&mut self, agent_id: &str, text: String, interrupt: bool) -> Result<()> {
let agent = self
.agents
.get_mut(agent_id)
.ok_or_else(|| anyhow!("Agent {agent_id} not found"))?;
if agent.status != SubAgentStatus::Running {
return Err(anyhow!("Agent {agent_id} is not running"));
}
let tx = agent
.input_tx
.as_ref()
.ok_or_else(|| anyhow!("Agent {agent_id} cannot accept input"))?;
tx.send(SubAgentInput { text, interrupt })
.map_err(|_| anyhow!("Failed to send input to agent {agent_id}"))?;
Ok(())
}
pub fn assign(
&mut self,
agent_id: &str,
objective: Option<String>,
role: Option<String>,
message: Option<String>,
interrupt: bool,
) -> Result<SubAgentResult> {
if objective.is_none() && role.is_none() && message.is_none() {
return Err(anyhow!(
"Provide at least one of objective, role, or message"
));
}
if message.is_some() {
let status = self
.agents
.get(agent_id)
.ok_or_else(|| anyhow!("Agent {agent_id} not found"))?
.status
.clone();
if status != SubAgentStatus::Running {
return Err(anyhow!(
"Agent {agent_id} is not running; cannot deliver assignment message"
));
}
}
let mut changed = false;
let (input_tx, payload) = {
let agent = self
.agents
.get_mut(agent_id)
.ok_or_else(|| anyhow!("Agent {agent_id} not found"))?;
let mut assignment_lines = Vec::new();
if let Some(objective) = objective {
let objective = objective.trim();
if objective.is_empty() {
return Err(anyhow!("objective cannot be empty"));
}
if agent.assignment.objective != objective {
agent.assignment.objective = objective.to_string();
changed = true;
}
assignment_lines.push(format!("- objective: {}", agent.assignment.objective));
}
if let Some(role) = role {
let normalized = normalize_role_alias(&role)
.ok_or_else(|| {
anyhow!(
"Invalid role alias '{role}'. Use: worker, explorer, awaiter, default"
)
})?
.to_string();
if agent.assignment.role.as_deref() != Some(normalized.as_str()) {
agent.assignment.role = Some(normalized.clone());
changed = true;
}
assignment_lines.push(format!("- role: {normalized}"));
}
let mut payload_parts = Vec::new();
if !assignment_lines.is_empty() && agent.status == SubAgentStatus::Running {
payload_parts.push(format!(
"Assignment updated:\n{}",
assignment_lines.join("\n")
));
}
if let Some(message) = message {
let message = message.trim();
if message.is_empty() {
return Err(anyhow!("message cannot be empty"));
}
payload_parts.push(format!("Coordinator note:\n{message}"));
}
let payload = if payload_parts.is_empty() {
None
} else {
Some(payload_parts.join("\n\n"))
};
(agent.input_tx.clone(), payload)
};
if let Some(payload) = payload {
let tx = input_tx
.ok_or_else(|| anyhow!("Agent {agent_id} cannot accept assignment input"))?;
tx.send(SubAgentInput {
text: payload,
interrupt,
})
.map_err(|_| anyhow!("Failed to send assignment to agent {agent_id}"))?;
}
if changed {
self.persist_state_best_effort();
}
self.get_result(agent_id)
}
#[must_use]
pub fn list(&self) -> Vec<SubAgentResult> {
self.agents.values().map(SubAgent::snapshot).collect()
}
pub fn cleanup(&mut self, max_age: Duration) {
let before = self.agents.len();
self.agents.retain(|_, agent| {
if agent.status == SubAgentStatus::Running {
true
} else {
agent.started_at.elapsed() < max_age
}
});
if self.agents.len() != before {
self.persist_state_best_effort();
}
}
fn update_from_result(&mut self, agent_id: &str, result: SubAgentResult) {
let mut changed = false;
if let Some(agent) = self.agents.get_mut(agent_id) {
agent.status = result.status;
agent.assignment = result.assignment;
agent.result = result.result;
agent.steps_taken = result.steps_taken;
agent.task_handle = None;
changed = true;
}
if changed {
self.persist_state_best_effort();
}
}
fn update_failed(&mut self, agent_id: &str, error: String) {
let mut changed = false;
if let Some(agent) = self.agents.get_mut(agent_id) {
agent.status = SubAgentStatus::Failed(error);
agent.task_handle = None;
changed = true;
}
if changed {
self.persist_state_best_effort();
}
}
}
pub type SharedSubAgentManager = Arc<Mutex<SubAgentManager>>;
fn default_state_path(workspace: &Path) -> PathBuf {
workspace
.join(".deepseek")
.join("state")
.join(SUBAGENT_STATE_FILE)
}
fn epoch_millis_now() -> u64 {
match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(duration) => u64::try_from(duration.as_millis()).unwrap_or(u64::MAX),
Err(_) => 0,
}
}
fn instant_from_duration(duration: Duration) -> Instant {
Instant::now()
.checked_sub(duration)
.unwrap_or_else(Instant::now)
}
fn write_json_atomic<T: Serialize>(path: &Path, value: &T) -> Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let payload = serde_json::to_string_pretty(value)?;
let tmp_path = path.with_extension("tmp");
fs::write(&tmp_path, payload)?;
fs::rename(tmp_path, path)?;
Ok(())
}
#[must_use]
pub fn new_shared_subagent_manager(workspace: PathBuf, max_agents: usize) -> SharedSubAgentManager {
let max_agents = max_agents.clamp(1, MAX_SUBAGENTS);
let state_path = default_state_path(&workspace);
let mut manager = SubAgentManager::new(workspace, max_agents).with_state_path(state_path);
if let Err(err) = manager.load_state() {
eprintln!("Failed to load sub-agent state: {err}");
}
Arc::new(Mutex::new(manager))
}
pub struct AgentSpawnTool {
manager: SharedSubAgentManager,
runtime: SubAgentRuntime,
name: &'static str,
}
impl AgentSpawnTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager, runtime: SubAgentRuntime) -> Self {
Self::with_name(manager, runtime, "agent_spawn")
}
#[must_use]
pub fn with_name(
manager: SharedSubAgentManager,
runtime: SubAgentRuntime,
name: &'static str,
) -> Self {
Self {
manager,
runtime,
name,
}
}
}
#[async_trait]
impl ToolSpec for AgentSpawnTool {
fn name(&self) -> &'static str {
self.name
}
fn description(&self) -> &'static str {
"Spawn a background sub-agent for a focused task. Returns an agent_id immediately; \
follow with agent_result to retrieve the final result. Max 5 in flight (each is a \
full sub-agent loop; cancel or wait if you hit the cap). For parallel one-shot LLM \
queries, just emit multiple tool calls in one turn — the dispatcher runs them in parallel."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Task description for the sub-agent"
},
"message": {
"type": "string",
"description": "Alias for prompt"
},
"objective": {
"type": "string",
"description": "Alias for prompt"
},
"items": {
"type": "array",
"description": "Structured input items (text, mention, skill, local_image, image)",
"items": {
"type": "object"
}
},
"type": {
"type": "string",
"description": "Sub-agent type: general, explore, plan, review, custom"
},
"agent_type": {
"type": "string",
"description": "Alias for type"
},
"agent_name": {
"type": "string",
"description": "Alias for type"
},
"role": {
"type": "string",
"description": "Role alias: worker, explorer, awaiter, default"
},
"agent_role": {
"type": "string",
"description": "Alias for role"
},
"allowed_tools": {
"type": "array",
"items": { "type": "string" },
"description": "Explicit tool allowlist (required for custom type). Default behavior is full registry inheritance from the parent."
},
"cwd": {
"type": "string",
"description": "Optional working directory for the child. Must be inside the parent's workspace (use a relative path or an absolute path under the workspace root). Used for the parallel-worktree pattern: parent runs `git worktree add .worktrees/feature-x ...` then spawns the child with `cwd: \".worktrees/feature-x\"`."
}
}
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![
ToolCapability::ExecutesCode,
ToolCapability::RequiresApproval,
]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Required
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<ToolResult, ToolError> {
let spawn_request = parse_spawn_request(&input)?;
if self.runtime.would_exceed_depth() {
return Err(ToolError::execution_failed(format!(
"Sub-agent depth limit reached (current depth {}, max {}). \
Increase via [runtime] max_spawn_depth in config.toml.",
self.runtime.spawn_depth, self.runtime.max_spawn_depth
)));
}
let validated_cwd = if let Some(requested_cwd) = spawn_request.cwd.as_ref() {
let parent_workspace = &self.runtime.context.workspace;
let resolved = if requested_cwd.is_absolute() {
requested_cwd.clone()
} else {
parent_workspace.join(requested_cwd)
};
let canonical = resolved.canonicalize().map_err(|e| {
ToolError::invalid_input(format!(
"Invalid cwd '{}': {e} (path may not exist yet — create the worktree first)",
requested_cwd.display()
))
})?;
let workspace_canonical = parent_workspace
.canonicalize()
.unwrap_or_else(|_| parent_workspace.clone());
if !canonical.starts_with(&workspace_canonical) {
return Err(ToolError::invalid_input(format!(
"cwd must be inside the parent workspace: {} is not under {}",
canonical.display(),
workspace_canonical.display()
)));
}
Some(canonical)
} else {
None
};
let mut child_runtime = self.runtime.child_runtime();
if let Some(cwd) = validated_cwd {
child_runtime.context.workspace = cwd;
}
let mut manager = self.manager.lock().await;
let result = manager
.spawn_background_with_assignment(
Arc::clone(&self.manager),
child_runtime,
spawn_request.agent_type,
spawn_request.prompt,
spawn_request.assignment,
spawn_request.allowed_tools,
)
.map_err(|e| ToolError::execution_failed(format!("Failed to spawn sub-agent: {e}")))?;
let mut tool_result = if self.name == "spawn_agent" {
let payload = json!({
"agent_id": result.agent_id.clone(),
"nickname": Value::Null
});
ToolResult::json(&payload).map_err(|e| ToolError::execution_failed(e.to_string()))?
} else {
ToolResult::json(&result).map_err(|e| ToolError::execution_failed(e.to_string()))?
};
if result.status == SubAgentStatus::Running {
if self.name == "spawn_agent" {
tool_result.metadata = Some(json!({
"status": "Running",
"snapshot": result
}));
} else {
tool_result.metadata = Some(json!({ "status": "Running" }));
}
}
if self.name == "spawn_agent" {
tool_result = wrap_with_deprecation_notice(tool_result, "spawn_agent", "agent_spawn");
}
Ok(tool_result)
}
}
pub struct AgentResultTool {
manager: SharedSubAgentManager,
}
impl AgentResultTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager) -> Self {
Self { manager }
}
}
#[async_trait]
impl ToolSpec for AgentResultTool {
fn name(&self) -> &'static str {
"agent_result"
}
fn description(&self) -> &'static str {
"Get the latest status or final result for a sub-agent. Set `block: true` to wait until the \
agent reaches a terminal state (respects `timeout_ms`)."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"agent_id": {
"type": "string",
"description": "ID returned by agent_spawn"
},
"id": {
"type": "string",
"description": "Alias for agent_id"
},
"block": {
"type": "boolean",
"description": "Wait for completion (default: false)"
},
"timeout_ms": {
"type": "integer",
"description": "Max wait time in milliseconds (default: 30000, clamped to 1000-3600000)"
}
}
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![ToolCapability::ReadOnly]
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<ToolResult, ToolError> {
let agent_id = input
.get("agent_id")
.or_else(|| input.get("id"))
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::missing_field("agent_id"))?;
let block = optional_bool(&input, "block", false);
let timeout_ms = optional_u64(&input, "timeout_ms", DEFAULT_RESULT_TIMEOUT_MS)
.clamp(1000, MAX_RESULT_TIMEOUT_MS);
let (result, timed_out) = if block {
wait_for_result(&self.manager, agent_id, Duration::from_millis(timeout_ms)).await?
} else {
let manager = self.manager.lock().await;
(
manager
.get_result(agent_id)
.map_err(|e| ToolError::execution_failed(e.to_string()))?,
false,
)
};
let mut tool_result =
ToolResult::json(&result).map_err(|e| ToolError::execution_failed(e.to_string()))?;
if timed_out {
tool_result.metadata = Some(json!({
"status": "TimedOut",
"timed_out": true,
"timeout_ms": timeout_ms
}));
} else if result.status == SubAgentStatus::Running {
tool_result.metadata = Some(json!({ "status": "Running" }));
}
Ok(tool_result)
}
}
pub struct AgentCancelTool {
manager: SharedSubAgentManager,
}
impl AgentCancelTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager) -> Self {
Self { manager }
}
}
#[async_trait]
impl ToolSpec for AgentCancelTool {
fn name(&self) -> &'static str {
"agent_cancel"
}
fn description(&self) -> &'static str {
"Cancel a running sub-agent. Returns the final snapshot with the cancelled status."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"agent_id": {
"type": "string",
"description": "ID returned by agent_spawn"
}
},
"required": ["agent_id"]
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![
ToolCapability::ExecutesCode,
ToolCapability::RequiresApproval,
]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Required
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<ToolResult, ToolError> {
let agent_id = required_str(&input, "agent_id")?;
let mut manager = self.manager.lock().await;
let result = manager
.cancel(agent_id)
.map_err(|e| ToolError::execution_failed(format!("Failed to cancel sub-agent: {e}")))?;
ToolResult::json(&result).map_err(|e| ToolError::execution_failed(e.to_string()))
}
}
pub struct AgentListTool {
manager: SharedSubAgentManager,
}
pub struct AgentCloseTool {
manager: SharedSubAgentManager,
}
impl AgentCloseTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager) -> Self {
Self { manager }
}
}
#[async_trait]
impl ToolSpec for AgentCloseTool {
fn name(&self) -> &'static str {
"close_agent"
}
fn description(&self) -> &'static str {
"Close a running sub-agent. Alias for agent_cancel."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "Agent id returned by agent_spawn"
},
"agent_id": {
"type": "string",
"description": "Alias for id"
}
}
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![
ToolCapability::ExecutesCode,
ToolCapability::RequiresApproval,
]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Required
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<ToolResult, ToolError> {
let agent_id = input
.get("id")
.or_else(|| input.get("agent_id"))
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::missing_field("id"))?;
let mut manager = self.manager.lock().await;
let result = manager
.cancel(agent_id)
.map_err(|e| ToolError::execution_failed(format!("Failed to close sub-agent: {e}")))?;
let tool_result =
ToolResult::json(&result).map_err(|e| ToolError::execution_failed(e.to_string()))?;
Ok(wrap_with_deprecation_notice(
tool_result,
"close_agent",
"agent_cancel",
))
}
}
pub struct AgentResumeTool {
manager: SharedSubAgentManager,
runtime: SubAgentRuntime,
}
impl AgentResumeTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager, runtime: SubAgentRuntime) -> Self {
Self { manager, runtime }
}
}
#[async_trait]
impl ToolSpec for AgentResumeTool {
fn name(&self) -> &'static str {
"resume_agent"
}
fn description(&self) -> &'static str {
"Resume a previously closed or completed sub-agent by restarting its assignment."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "Agent id to resume"
},
"agent_id": {
"type": "string",
"description": "Alias for id"
}
}
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![
ToolCapability::ExecutesCode,
ToolCapability::RequiresApproval,
]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Required
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<ToolResult, ToolError> {
let agent_id = input
.get("id")
.or_else(|| input.get("agent_id"))
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::missing_field("id"))?;
let mut manager = self.manager.lock().await;
let result = manager
.resume(Arc::clone(&self.manager), self.runtime.clone(), agent_id)
.map_err(|e| ToolError::execution_failed(format!("Failed to resume sub-agent: {e}")))?;
ToolResult::json(&result).map_err(|e| ToolError::execution_failed(e.to_string()))
}
}
impl AgentListTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager) -> Self {
Self { manager }
}
}
#[async_trait]
impl ToolSpec for AgentListTool {
fn name(&self) -> &'static str {
"agent_list"
}
fn description(&self) -> &'static str {
"List all active and recently completed sub-agents with their status, type, assignment, \
steps taken, and duration."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {}
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![ToolCapability::ReadOnly]
}
async fn execute(
&self,
_input: Value,
_context: &ToolContext,
) -> Result<ToolResult, ToolError> {
let mut manager = self.manager.lock().await;
manager.cleanup(COMPLETED_AGENT_RETENTION);
let results = manager.list();
ToolResult::json(&results).map_err(|e| ToolError::execution_failed(e.to_string()))
}
}
pub struct AgentSendInputTool {
manager: SharedSubAgentManager,
name: &'static str,
}
impl AgentSendInputTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager, name: &'static str) -> Self {
Self { manager, name }
}
}
#[async_trait]
impl ToolSpec for AgentSendInputTool {
fn name(&self) -> &'static str {
self.name
}
fn description(&self) -> &'static str {
"Send input to a running sub-agent. Returns the agent's current snapshot after delivery."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"agent_id": {
"type": "string",
"description": "ID returned by agent_spawn"
},
"id": {
"type": "string",
"description": "Alias for agent_id"
},
"message": {
"type": "string",
"description": "Message to deliver to the agent"
},
"input": {
"type": "string",
"description": "Alias for message"
},
"items": {
"type": "array",
"description": "Structured input items (text, mention, skill, local_image, image)",
"items": {
"type": "object"
}
},
"interrupt": {
"type": "boolean",
"description": "Prioritize this message over pending inputs"
}
}
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![]
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<ToolResult, ToolError> {
let agent_id = input
.get("agent_id")
.or_else(|| input.get("id"))
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::missing_field("agent_id"))?;
let message = parse_text_or_items(&input, &["message", "input"], "items", "message")?;
let interrupt = optional_bool(&input, "interrupt", false);
let mut manager = self.manager.lock().await;
manager
.send_input(agent_id, message, interrupt)
.map_err(|e| ToolError::execution_failed(e.to_string()))?;
let snapshot = manager
.get_result(agent_id)
.map_err(|e| ToolError::execution_failed(e.to_string()))?;
let tool_result =
ToolResult::json(&snapshot).map_err(|e| ToolError::execution_failed(e.to_string()))?;
if self.name == "send_input" {
Ok(wrap_with_deprecation_notice(
tool_result,
"send_input",
"agent_send_input",
))
} else {
Ok(tool_result)
}
}
}
pub struct AgentAssignTool {
manager: SharedSubAgentManager,
name: &'static str,
}
impl AgentAssignTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager, name: &'static str) -> Self {
Self { manager, name }
}
}
#[async_trait]
impl ToolSpec for AgentAssignTool {
fn name(&self) -> &'static str {
self.name
}
fn description(&self) -> &'static str {
"Update a sub-agent's assignment (objective, role) and optionally deliver an immediate \
coordinator note. The update is delivered as a high-priority message when `interrupt` is \
true (the default). Returns the agent's current snapshot."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"agent_id": {
"type": "string",
"description": "Agent id returned by agent_spawn"
},
"id": {
"type": "string",
"description": "Alias for agent_id"
},
"objective": {
"type": "string",
"description": "Updated assignment objective"
},
"role": {
"type": "string",
"description": "Updated role alias: worker, explorer, awaiter, default"
},
"agent_role": {
"type": "string",
"description": "Alias for role"
},
"message": {
"type": "string",
"description": "Optional coordinator note to send to the agent"
},
"input": {
"type": "string",
"description": "Alias for message"
},
"items": {
"type": "array",
"description": "Structured input items (text, mention, skill, local_image, image)",
"items": {
"type": "object"
}
},
"interrupt": {
"type": "boolean",
"description": "Prioritize this assignment update in the agent inbox (default: true)"
}
}
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![]
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<ToolResult, ToolError> {
let request = parse_assign_request(&input)?;
let mut manager = self.manager.lock().await;
let result = manager
.assign(
&request.agent_id,
request.objective,
request.role,
request.message,
request.interrupt,
)
.map_err(|e| ToolError::execution_failed(format!("Failed to assign sub-agent: {e}")))?;
ToolResult::json(&result).map_err(|e| ToolError::execution_failed(e.to_string()))
}
}
pub struct AgentWaitTool {
manager: SharedSubAgentManager,
name: &'static str,
}
impl AgentWaitTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager, name: &'static str) -> Self {
Self { manager, name }
}
}
#[async_trait]
impl ToolSpec for AgentWaitTool {
fn name(&self) -> &'static str {
self.name
}
fn description(&self) -> &'static str {
"Wait for one or more sub-agents to reach a terminal status. Use `wait_mode: \"all\"` to block \
until every listed agent finishes, or `wait_mode: \"any\"` (default) to return as soon as \
one finishes. When no ids are given, waits on all currently running sub-agents."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"ids": {
"type": "array",
"items": { "type": "string" },
"description": "Agent IDs to wait on. When omitted, waits on all currently running sub-agents."
},
"agent_ids": {
"type": "array",
"items": { "type": "string" },
"description": "Alias for ids"
},
"agent_id": {
"type": "string",
"description": "Single agent ID"
},
"id": {
"type": "string",
"description": "Alias for agent_id"
},
"wait_mode": {
"type": "string",
"description": "Wait behavior: any (default) or all"
},
"timeout_ms": {
"type": "integer",
"description": "Max wait time in milliseconds (default: 30000, clamped to 10000-3600000)"
}
}
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![ToolCapability::ReadOnly]
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<ToolResult, ToolError> {
let timeout_ms = optional_u64(&input, "timeout_ms", DEFAULT_RESULT_TIMEOUT_MS)
.clamp(MIN_WAIT_TIMEOUT_MS, MAX_RESULT_TIMEOUT_MS);
let mut ids = parse_wait_ids(&input);
if ids.is_empty() {
let manager = self.manager.lock().await;
ids = manager
.list()
.into_iter()
.filter(|snapshot| snapshot.status == SubAgentStatus::Running)
.map(|snapshot| snapshot.agent_id)
.collect();
}
let wait_mode = parse_wait_mode(&input)?;
if ids.is_empty() {
let empty: Vec<SubAgentResult> = Vec::new();
let mut result =
ToolResult::json(&empty).map_err(|e| ToolError::execution_failed(e.to_string()))?;
result.metadata = Some(json!({
"wait_mode": wait_mode.as_str(),
"timed_out": false,
"status": "Completed",
"timeout_ms": timeout_ms,
"waited_ids": [],
"completed_ids": [],
"running_ids": [],
"status_by_id": {}
}));
return Ok(result);
}
let waited_ids = ids.clone();
let (snapshots, timed_out) = wait_for_agents(
&self.manager,
&ids,
wait_mode,
Duration::from_millis(timeout_ms),
)
.await?;
let all_done = snapshots
.iter()
.all(|snapshot| snapshot.status != SubAgentStatus::Running);
let completed_ids = snapshots
.iter()
.filter(|snapshot| snapshot.status != SubAgentStatus::Running)
.map(|snapshot| snapshot.agent_id.clone())
.collect::<Vec<_>>();
let running_ids = snapshots
.iter()
.filter(|snapshot| snapshot.status == SubAgentStatus::Running)
.map(|snapshot| snapshot.agent_id.clone())
.collect::<Vec<_>>();
let status_by_id = snapshots
.iter()
.map(|snapshot| {
(
snapshot.agent_id.clone(),
subagent_status_name(&snapshot.status).to_string(),
)
})
.collect::<HashMap<_, _>>();
let mut result =
ToolResult::json(&snapshots).map_err(|e| ToolError::execution_failed(e.to_string()))?;
result.metadata = Some(json!({
"wait_mode": wait_mode.as_str(),
"timed_out": timed_out,
"status": if timed_out { "TimedOut" } else if all_done { "Completed" } else { "Partial" },
"timeout_ms": timeout_ms,
"waited_ids": waited_ids,
"completed_ids": completed_ids,
"running_ids": running_ids,
"status_by_id": status_by_id
}));
Ok(result)
}
}
pub struct DelegateToAgentTool {
manager: SharedSubAgentManager,
runtime: SubAgentRuntime,
}
impl DelegateToAgentTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager, runtime: SubAgentRuntime) -> Self {
Self { manager, runtime }
}
}
#[async_trait]
impl ToolSpec for DelegateToAgentTool {
fn name(&self) -> &'static str {
"delegate_to_agent"
}
fn description(&self) -> &'static str {
"Delegate a task to a specialized sub-agent. This is an alias for agent_spawn — same schema, \
same behavior. Use `type` (or `agent_name`, `agent_type`) to pick the agent flavor."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"agent_name": {
"type": "string",
"description": "Name/type alias for the agent (general, explore, plan, review, worker, explorer, awaiter)"
},
"type": {
"type": "string",
"description": "Alias for agent_name"
},
"agent_type": {
"type": "string",
"description": "Alias for agent_name"
},
"role": {
"type": "string",
"description": "Role alias: worker, explorer, awaiter, default"
},
"agent_role": {
"type": "string",
"description": "Alias for role"
},
"objective": {
"type": "string",
"description": "The goal or task description for the agent"
},
"prompt": {
"type": "string",
"description": "Alias for objective"
},
"message": {
"type": "string",
"description": "Alias for objective"
},
"items": {
"type": "array",
"description": "Structured input items (text, mention, skill, local_image, image)",
"items": {
"type": "object"
}
},
"allowed_tools": {
"type": "array",
"items": { "type": "string" },
"description": "Explicit tool allowlist (required for custom type)"
}
}
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![
ToolCapability::ExecutesCode,
ToolCapability::RequiresApproval,
]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Required
}
async fn execute(&self, input: Value, context: &ToolContext) -> Result<ToolResult, ToolError> {
let spawn_tool = AgentSpawnTool::new(self.manager.clone(), self.runtime.clone());
let result = spawn_tool.execute(input, context).await?;
Ok(wrap_with_deprecation_notice(
result,
"delegate_to_agent",
"agent_spawn",
))
}
}
pub struct SpawnAgentsOnCsvTool {
manager: SharedSubAgentManager,
runtime: SubAgentRuntime,
}
struct AgentJobReportCleanup {
job_id: String,
}
impl AgentJobReportCleanup {
fn new(job_id: String) -> Self {
clear_agent_job_results(&job_id);
Self { job_id }
}
}
impl Drop for AgentJobReportCleanup {
fn drop(&mut self) {
clear_agent_job_results(&self.job_id);
}
}
impl SpawnAgentsOnCsvTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager, runtime: SubAgentRuntime) -> Self {
Self { manager, runtime }
}
}
#[async_trait]
impl ToolSpec for SpawnAgentsOnCsvTool {
fn name(&self) -> &'static str {
"spawn_agents_on_csv"
}
fn description(&self) -> &'static str {
"Process a CSV by spawning one worker sub-agent per row. The instruction string is a template where `{column}` placeholders are replaced with row values. Each worker must call `report_agent_job_result` with a JSON object (matching `output_schema` when provided); missing reports are treated as failures. This call blocks until all rows finish and automatically exports results to `output_csv_path` (or a default path)."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"csv_path": {
"type": "string",
"description": "Path to the input CSV file"
},
"instruction": {
"type": "string",
"description": "Instruction template. Use {column_name} placeholders for row values."
},
"id_column": {
"type": "string",
"description": "Optional CSV column name used as stable item id"
},
"max_concurrency": {
"type": "integer",
"description": "Maximum concurrent workers (default: 16)"
},
"max_workers": {
"type": "integer",
"description": "Alias for max_concurrency"
},
"max_runtime_seconds": {
"type": "integer",
"description": "Per-worker timeout in seconds (default: 1800)"
},
"output_csv_path": {
"type": "string",
"description": "Optional output CSV path for worker results"
},
"output_schema": {
"type": "object",
"description": "Optional JSON schema-like object used to validate worker JSON output"
}
},
"required": ["csv_path", "instruction"]
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![
ToolCapability::ExecutesCode,
ToolCapability::RequiresApproval,
]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Required
}
async fn execute(&self, input: Value, context: &ToolContext) -> Result<ToolResult, ToolError> {
let csv_path_raw = required_str(&input, "csv_path")?;
let csv_path = context.resolve_path(csv_path_raw)?;
let instruction_template = required_str(&input, "instruction")?;
if instruction_template.trim().is_empty() {
return Err(ToolError::invalid_input(
"instruction cannot be empty".to_string(),
));
}
let id_column = optional_input_str(&input, &["id_column"]).map(str::to_string);
let rows = load_csv_rows(&csv_path, id_column.as_deref())?;
if rows.is_empty() {
return Err(ToolError::invalid_input(format!(
"CSV '{}' has no data rows",
csv_path.display()
)));
}
let output_schema = input.get("output_schema").cloned();
let output_csv_path = resolve_results_csv_path(context, &input, &csv_path)?;
let max_runtime_seconds = optional_u64(
&input,
"max_runtime_seconds",
DEFAULT_CSV_MAX_RUNTIME_SECONDS,
)
.clamp(1, MAX_CSV_MAX_RUNTIME_SECONDS);
let requested_concurrency = parse_csv_concurrency(&input);
let max_agents = {
let manager = self.manager.lock().await;
manager.max_agents().max(1)
};
let max_concurrency = requested_concurrency.clamp(1, max_agents as u64) as usize;
let semaphore = Arc::new(Semaphore::new(max_concurrency));
let timeout = Duration::from_secs(max_runtime_seconds);
let job_id = format!("job_{}", &Uuid::new_v4().to_string()[..8]);
let _cleanup = AgentJobReportCleanup::new(job_id.clone());
let stop_requested = Arc::new(AtomicBool::new(false));
let mut workers = FuturesUnordered::new();
for row in rows {
let permit = semaphore
.clone()
.acquire_owned()
.await
.map_err(|_| ToolError::execution_failed("Worker semaphore closed"))?;
let manager = self.manager.clone();
let runtime = self.runtime.clone();
let template = instruction_template.to_string();
let schema = output_schema.clone();
let job_id = job_id.clone();
let stop_requested = stop_requested.clone();
workers.push(tokio::spawn(async move {
let _permit = permit;
run_csv_row_agent(
manager,
runtime,
&job_id,
row,
&template,
timeout,
schema,
stop_requested,
)
.await
}));
}
let mut outcomes = Vec::new();
while let Some(joined) = workers.next().await {
match joined {
Ok(outcome) => outcomes.push(outcome),
Err(err) => outcomes.push(CsvWorkerOutcome {
row_index: usize::MAX,
item_id: "worker_join".to_string(),
status: "failed".to_string(),
agent_id: None,
duration_ms: 0,
error: Some(format!("Worker task failed to join: {err}")),
result: None,
result_json: None,
}),
}
}
outcomes.sort_by_key(|outcome| outcome.row_index);
write_csv_worker_outcomes(&output_csv_path, &outcomes).map_err(|err| {
ToolError::execution_failed(format!("Failed to write output CSV: {err}"))
})?;
let completed = outcomes
.iter()
.filter(|outcome| outcome.status == "completed")
.count();
let skipped = outcomes
.iter()
.filter(|outcome| outcome.status == "skipped")
.count();
let timed_out = outcomes
.iter()
.filter(|outcome| outcome.status == "timed_out")
.count();
let failed = outcomes
.iter()
.filter(|outcome| outcome.status == "failed")
.count()
+ timed_out;
let summary = SpawnAgentsOnCsvSummary {
job_id,
total: outcomes.len(),
completed,
failed,
timed_out,
skipped,
output_csv_path: output_csv_path.display().to_string(),
results: outcomes,
};
let status = if summary.failed > 0 {
if summary.completed == 0 && summary.skipped == 0 {
"Failed"
} else {
"Partial"
}
} else if stop_requested.load(Ordering::Relaxed) || summary.skipped > 0 {
"Cancelled"
} else {
"Completed"
};
let mut result =
ToolResult::json(&summary).map_err(|e| ToolError::execution_failed(e.to_string()))?;
result.metadata = Some(json!({
"status": status,
"job_id": summary.job_id,
"completed": summary.completed,
"failed": summary.failed,
"timed_out": summary.timed_out,
"skipped": summary.skipped,
"stop_requested": stop_requested.load(Ordering::Relaxed),
"output_csv_path": summary.output_csv_path,
}));
Ok(result)
}
}
pub struct ReportAgentJobResultTool;
#[async_trait]
impl ToolSpec for ReportAgentJobResultTool {
fn name(&self) -> &'static str {
"report_agent_job_result"
}
fn description(&self) -> &'static str {
"Worker-only tool to report a structured result for a spawn_agents_on_csv row."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "Identifier of the CSV job"
},
"item_id": {
"type": "string",
"description": "Identifier of the CSV row item"
},
"result": {
"type": "object",
"description": "Structured JSON result to record for the row"
},
"stop": {
"type": "boolean",
"description": "Optional. When true, cancels remaining unstarted CSV rows for this job."
}
},
"required": ["job_id", "item_id", "result"]
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![]
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<ToolResult, ToolError> {
let job_id = required_str(&input, "job_id")?.trim();
let item_id = required_str(&input, "item_id")?.trim();
if job_id.is_empty() {
return Err(ToolError::invalid_input("job_id cannot be empty"));
}
if item_id.is_empty() {
return Err(ToolError::invalid_input("item_id cannot be empty"));
}
let result = input
.get("result")
.cloned()
.ok_or_else(|| ToolError::missing_field("result"))?;
if !result.is_object() {
return Err(ToolError::invalid_input("result must be a JSON object"));
}
let reporting_agent_id = input
.get("__reporting_agent_id")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty());
let stop = optional_bool(&input, "stop", false);
let accepted =
record_agent_job_result(job_id, item_id, result.clone(), stop, reporting_agent_id);
let payload = json!({
"job_id": job_id,
"item_id": item_id,
"accepted": accepted,
"stop": stop,
"result": result
});
ToolResult::json(&payload).map_err(|e| ToolError::execution_failed(e.to_string()))
}
}
fn build_subagent_system_prompt(
agent_type: &SubAgentType,
assignment: &SubAgentAssignment,
) -> String {
let base = agent_type.system_prompt();
match assignment.role.as_deref() {
Some(role) if !role.trim().is_empty() => {
format!(
"{base}\n\nYou are operating in the role of `{}`.",
role.trim()
)
}
_ => base,
}
}
struct SubAgentTask {
manager_handle: SharedSubAgentManager,
runtime: SubAgentRuntime,
agent_id: String,
agent_type: SubAgentType,
prompt: String,
assignment: SubAgentAssignment,
allowed_tools: Option<Vec<String>>,
started_at: Instant,
max_steps: u32,
input_rx: mpsc::UnboundedReceiver<SubAgentInput>,
}
#[allow(clippy::too_many_lines)]
async fn run_subagent_task(task: SubAgentTask) {
let result = run_subagent(
&task.runtime,
task.agent_id.clone(),
task.agent_type,
task.prompt,
task.assignment,
task.allowed_tools,
task.started_at,
task.max_steps,
task.input_rx,
)
.await;
let mut manager = task.manager_handle.lock().await;
match &result {
Ok(res) => manager.update_from_result(&task.agent_id, res.clone()),
Err(err) => manager.update_failed(&task.agent_id, err.to_string()),
}
let (summary, sentinel) = match &result {
Ok(res) => (
summarize_subagent_result(res),
subagent_done_sentinel(&task.agent_id, res),
),
Err(err) => (
format!("Failed: {err}"),
subagent_failed_sentinel(&task.agent_id, &err.to_string()),
),
};
if let Some(mb) = task.runtime.mailbox.as_ref() {
let envelope = match &result {
Ok(_) => MailboxMessage::Completed {
agent_id: task.agent_id.clone(),
summary: summary.clone(),
},
Err(err) => MailboxMessage::Failed {
agent_id: task.agent_id.clone(),
error: err.to_string(),
},
};
let _ = mb.send(envelope);
}
if let Some(event_tx) = task.runtime.event_tx {
let payload = format!("{summary}\n{sentinel}");
let _ = event_tx.try_send(Event::AgentComplete {
id: task.agent_id,
result: payload,
});
}
}
fn subagent_done_sentinel(agent_id: &str, res: &SubAgentResult) -> String {
let payload = json!({
"agent_id": agent_id,
"agent_type": res.agent_type.as_str(),
"status": subagent_status_name(&res.status),
"duration_ms": res.duration_ms,
"steps": res.steps_taken,
"summary": summarize_subagent_result(res),
});
format!("<deepseek:subagent.done>{payload}</deepseek:subagent.done>")
}
fn subagent_failed_sentinel(agent_id: &str, err: &str) -> String {
let payload = json!({
"agent_id": agent_id,
"status": "failed",
"error": err,
});
format!("<deepseek:subagent.done>{payload}</deepseek:subagent.done>")
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_subagent(
runtime: &SubAgentRuntime,
agent_id: String,
agent_type: SubAgentType,
prompt: String,
assignment: SubAgentAssignment,
allowed_tools: Option<Vec<String>>,
started_at: Instant,
max_steps: u32,
mut input_rx: mpsc::UnboundedReceiver<SubAgentInput>,
) -> Result<SubAgentResult> {
let system_prompt = build_subagent_system_prompt(&agent_type, &assignment);
let tool_registry = SubAgentToolRegistry::new(
runtime.clone(),
allowed_tools.clone(),
Arc::new(Mutex::new(TodoList::new())),
Arc::new(Mutex::new(PlanState::default())),
);
let unavailable_tools = tool_registry.unavailable_allowed_tools();
if !unavailable_tools.is_empty() {
return Err(anyhow!(
"Sub-agent requested unavailable tools: {}",
unavailable_tools.join(", ")
));
}
let tools = tool_registry.tools_for_model();
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::started(&agent_id, agent_type.clone()));
}
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("started ({})", agent_type.as_str()),
);
let mut messages = vec![Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: build_assignment_prompt(&prompt, &assignment, &agent_type),
cache_control: None,
}],
}];
let mut steps = 0;
let mut final_result: Option<String> = None;
let mut pending_inputs: VecDeque<SubAgentInput> = VecDeque::new();
for _step in 0..max_steps {
if runtime.cancel_token.is_cancelled() {
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: cancelled"),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Cancelled {
agent_id: agent_id.clone(),
});
}
return Ok(SubAgentResult {
agent_id: agent_id.clone(),
agent_type: agent_type.clone(),
assignment: assignment.clone(),
status: SubAgentStatus::Cancelled,
result: None,
steps_taken: steps,
duration_ms: u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX),
});
}
steps += 1;
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: requesting model response"),
);
while let Ok(input) = input_rx.try_recv() {
if input.interrupt {
pending_inputs.clear();
}
pending_inputs.push_back(input);
}
while let Some(input) = pending_inputs.pop_front() {
if !input.text.trim().is_empty() {
messages.push(Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: input.text,
cache_control: None,
}],
});
}
}
let request = MessageRequest {
model: runtime.model.clone(),
messages: messages.clone(),
max_tokens: 4096,
system: Some(SystemPrompt::Text(system_prompt.clone())),
tools: Some(tools.clone()),
tool_choice: Some(json!({ "type": "auto" })),
metadata: None,
thinking: None,
reasoning_effort: None,
stream: Some(false),
temperature: None,
top_p: None,
};
let response = tokio::select! {
biased;
() = runtime.cancel_token.cancelled() => {
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: cancelled mid-request"),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Cancelled {
agent_id: agent_id.clone(),
});
}
return Ok(SubAgentResult {
agent_id: agent_id.clone(),
agent_type: agent_type.clone(),
assignment: assignment.clone(),
status: SubAgentStatus::Cancelled,
result: None,
steps_taken: steps,
duration_ms: u64::try_from(started_at.elapsed().as_millis())
.unwrap_or(u64::MAX),
});
}
api = tokio::time::timeout(STEP_API_TIMEOUT, runtime.client.create_message(request)) => {
api.map_err(|_| anyhow!("API call timed out after {}s", STEP_API_TIMEOUT.as_secs()))??
}
};
let mut tool_uses = Vec::new();
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::token_usage(
&agent_id,
response.model.clone(),
response.usage.clone(),
));
}
for block in &response.content {
match block {
ContentBlock::Text { text, .. } if !text.trim().is_empty() => {
final_result = Some(text.clone());
}
ContentBlock::ToolUse {
id, name, input, ..
} => {
tool_uses.push((id.clone(), name.clone(), input.clone()));
}
_ => {}
}
}
messages.push(Message {
role: "assistant".to_string(),
content: response.content.clone(),
});
if tool_uses.is_empty() {
while let Ok(input) = input_rx.try_recv() {
if input.interrupt {
pending_inputs.clear();
}
pending_inputs.push_back(input);
}
if pending_inputs.is_empty() {
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: complete"),
);
break;
}
continue;
}
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!(
"step {steps}/{max_steps}: executing {} tool call(s)",
tool_uses.len()
),
);
let mut tool_results: Vec<ContentBlock> = Vec::new();
for (tool_id, tool_name, tool_input) in tool_uses {
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: running tool '{tool_name}'"),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::ToolCallStarted {
agent_id: agent_id.clone(),
tool_name: tool_name.clone(),
step: steps,
});
}
let result = match tokio::time::timeout(TOOL_TIMEOUT, async {
tool_registry
.execute(&agent_id, &tool_name, tool_input)
.await
})
.await
{
Ok(Ok(output)) => output,
Ok(Err(e)) => format!("Error: {e}"),
Err(_) => format!("Error: Tool {tool_name} timed out"),
};
let tool_ok = !result.starts_with("Error:");
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: finished tool '{tool_name}'"),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::ToolCallCompleted {
agent_id: agent_id.clone(),
tool_name: tool_name.clone(),
step: steps,
ok: tool_ok,
});
}
tool_results.push(ContentBlock::ToolResult {
tool_use_id: tool_id,
content: result,
is_error: None,
content_blocks: None,
});
}
if !tool_results.is_empty() {
messages.push(Message {
role: "user".to_string(),
content: tool_results,
});
}
}
Ok(SubAgentResult {
agent_id,
agent_type,
assignment,
status: SubAgentStatus::Completed,
result: final_result,
steps_taken: steps,
duration_ms: u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX),
})
}
async fn wait_for_result(
manager: &SharedSubAgentManager,
agent_id: &str,
timeout: Duration,
) -> Result<(SubAgentResult, bool), ToolError> {
let deadline = Instant::now() + timeout;
loop {
let snapshot = {
let manager = manager.lock().await;
manager
.get_result(agent_id)
.map_err(|e| ToolError::execution_failed(e.to_string()))?
};
if snapshot.status != SubAgentStatus::Running {
return Ok((snapshot, false));
}
if Instant::now() >= deadline {
return Ok((snapshot, true));
}
tokio::time::sleep(RESULT_POLL_INTERVAL).await;
}
}
async fn wait_for_agents(
manager: &SharedSubAgentManager,
ids: &[String],
wait_mode: WaitMode,
timeout: Duration,
) -> Result<(Vec<SubAgentResult>, bool), ToolError> {
let deadline = Instant::now() + timeout;
loop {
let snapshots = {
let manager = manager.lock().await;
ids.iter()
.map(|id| {
manager
.get_result(id)
.map_err(|e| ToolError::execution_failed(e.to_string()))
})
.collect::<Result<Vec<_>, _>>()?
};
if wait_mode.condition_met(&snapshots) {
return Ok((snapshots, false));
}
if Instant::now() >= deadline {
return Ok((snapshots, true));
}
tokio::time::sleep(RESULT_POLL_INTERVAL).await;
}
}
fn parse_wait_mode(input: &Value) -> Result<WaitMode, ToolError> {
let raw_mode = input
.get("wait_mode")
.and_then(|v| v.as_str())
.unwrap_or("any");
WaitMode::from_str(raw_mode).ok_or_else(|| {
ToolError::invalid_input(format!("Invalid wait_mode '{raw_mode}'. Use: any or all"))
})
}
fn parse_wait_ids(input: &Value) -> Vec<String> {
let mut ids = Vec::new();
for key in ["ids", "agent_ids"] {
if let Some(list) = input.get(key).and_then(|v| v.as_array()) {
for value in list {
if let Some(id) = value.as_str() {
let id = id.trim();
if !id.is_empty() && !ids.iter().any(|existing| existing == id) {
ids.push(id.to_string());
}
}
}
}
}
for key in ["agent_id", "id"] {
if let Some(id) = input.get(key).and_then(|v| v.as_str()) {
let id = id.trim();
if !id.is_empty() && !ids.iter().any(|existing| existing == id) {
ids.push(id.to_string());
}
}
}
ids
}
fn optional_input_str<'a>(input: &'a Value, keys: &[&str]) -> Option<&'a str> {
keys.iter()
.filter_map(|key| input.get(*key).and_then(Value::as_str))
.map(str::trim)
.find(|value| !value.is_empty())
}
fn parse_text_or_items(
input: &Value,
text_keys: &[&str],
items_key: &str,
required_field: &str,
) -> Result<String, ToolError> {
let text = optional_input_str(input, text_keys).map(str::to_string);
let items = parse_items_text(input, items_key)?;
match (text, items) {
(Some(_), Some(_)) => Err(ToolError::invalid_input(format!(
"Provide either {required_field} text or {items_key}, but not both"
))),
(Some(text), None) => Ok(text),
(None, Some(items)) => Ok(items),
(None, None) => Err(ToolError::missing_field(required_field)),
}
}
fn parse_optional_text_or_items(
input: &Value,
text_keys: &[&str],
items_key: &str,
) -> Result<Option<String>, ToolError> {
let text = optional_input_str(input, text_keys).map(str::to_string);
let items = parse_items_text(input, items_key)?;
match (text, items) {
(Some(_), Some(_)) => Err(ToolError::invalid_input(format!(
"Provide either {} text or {}, but not both",
text_keys[0], items_key
))),
(Some(text), None) => Ok(Some(text)),
(None, Some(items)) => Ok(Some(items)),
(None, None) => Ok(None),
}
}
fn parse_items_text(input: &Value, key: &str) -> Result<Option<String>, ToolError> {
let Some(items) = input.get(key) else {
return Ok(None);
};
let array = items
.as_array()
.ok_or_else(|| ToolError::invalid_input(format!("'{key}' must be an array")))?;
if array.is_empty() {
return Err(ToolError::invalid_input(format!("'{key}' cannot be empty")));
}
let mut lines = Vec::new();
for item in array {
let object = item
.as_object()
.ok_or_else(|| ToolError::invalid_input("each item must be an object"))?;
let item_type = object
.get("type")
.and_then(Value::as_str)
.unwrap_or("text")
.trim();
let rendered = match item_type {
"text" => object
.get("text")
.and_then(Value::as_str)
.map(str::trim)
.filter(|text| !text.is_empty())
.map(str::to_string)
.ok_or_else(|| ToolError::invalid_input("text item requires non-empty text"))?,
"mention" => {
let name = object
.get("name")
.and_then(Value::as_str)
.map(str::trim)
.filter(|text| !text.is_empty())
.ok_or_else(|| ToolError::invalid_input("mention item requires name"))?;
let path = object
.get("path")
.and_then(Value::as_str)
.map(str::trim)
.filter(|text| !text.is_empty())
.ok_or_else(|| ToolError::invalid_input("mention item requires path"))?;
format!("[mention:${name}]({path})")
}
"skill" => {
let name = object
.get("name")
.and_then(Value::as_str)
.map(str::trim)
.filter(|text| !text.is_empty())
.ok_or_else(|| ToolError::invalid_input("skill item requires name"))?;
let path = object
.get("path")
.and_then(Value::as_str)
.map(str::trim)
.filter(|text| !text.is_empty())
.ok_or_else(|| ToolError::invalid_input("skill item requires path"))?;
format!("[skill:${name}]({path})")
}
"local_image" => {
let path = object
.get("path")
.and_then(Value::as_str)
.map(str::trim)
.filter(|text| !text.is_empty())
.ok_or_else(|| ToolError::invalid_input("local_image item requires path"))?;
format!("[local_image:{path}]")
}
"image" => {
let url = object
.get("image_url")
.and_then(Value::as_str)
.map(str::trim)
.filter(|text| !text.is_empty())
.ok_or_else(|| ToolError::invalid_input("image item requires image_url"))?;
format!("[image:{url}]")
}
_ => object
.get("text")
.and_then(Value::as_str)
.map(str::trim)
.filter(|text| !text.is_empty())
.map(str::to_string)
.unwrap_or_else(|| "[input]".to_string()),
};
lines.push(rendered);
}
Ok(Some(lines.join("\n")))
}
fn parse_spawn_request(input: &Value) -> Result<SpawnRequest, ToolError> {
let prompt = parse_text_or_items(
input,
&["prompt", "message", "objective"],
"items",
"prompt",
)?;
let type_input = optional_input_str(input, &["type", "agent_type", "agent_name"]);
let role_input = optional_input_str(input, &["role", "agent_role"]);
let parsed_type = type_input
.map(|kind| {
SubAgentType::from_str(kind).ok_or_else(|| {
ToolError::invalid_input(format!(
"Invalid sub-agent type '{kind}'. Use: {VALID_SUBAGENT_TYPES}"
))
})
})
.transpose()?;
let parsed_role_type = role_input
.map(|role| {
SubAgentType::from_str(role).ok_or_else(|| {
ToolError::invalid_input(format!(
"Invalid role alias '{role}'. Use: worker, explorer, awaiter, default"
))
})
})
.transpose()?;
if let (Some(type_kind), Some(role_kind)) = (&parsed_type, &parsed_role_type)
&& type_kind != role_kind
{
return Err(ToolError::invalid_input(
"Conflicting type/agent_type and role/agent_role values".to_string(),
));
}
let agent_type = parsed_type
.or(parsed_role_type)
.unwrap_or(SubAgentType::General);
if let Some(role) = role_input
&& normalize_role_alias(role).is_none()
{
return Err(ToolError::invalid_input(format!(
"Invalid role alias '{role}'. Use: worker, explorer, awaiter, default"
)));
}
let role = role_input
.and_then(normalize_role_alias)
.or_else(|| type_input.and_then(normalize_role_alias))
.map(str::to_string);
let allowed_tools = input
.get("allowed_tools")
.and_then(|v| v.as_array())
.map(|items| {
let mut tools = Vec::new();
for item in items {
if let Some(tool) = item.as_str() {
let trimmed = tool.trim();
if !trimmed.is_empty() && !tools.iter().any(|existing| existing == trimmed) {
tools.push(trimmed.to_string());
}
}
}
tools
});
let cwd = parse_optional_cwd(input)?;
Ok(SpawnRequest {
prompt: prompt.clone(),
agent_type,
assignment: SubAgentAssignment::new(prompt, role),
allowed_tools,
cwd,
})
}
fn parse_optional_cwd(input: &Value) -> Result<Option<PathBuf>, ToolError> {
let raw = input.get("cwd").and_then(|v| v.as_str()).map(str::trim);
match raw {
None | Some("") => Ok(None),
Some(s) => Ok(Some(PathBuf::from(s))),
}
}
fn parse_assign_request(input: &Value) -> Result<AssignRequest, ToolError> {
let agent_id = input
.get("agent_id")
.or_else(|| input.get("id"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|id| !id.is_empty())
.ok_or_else(|| ToolError::missing_field("agent_id"))?
.to_string();
let objective = optional_input_str(input, &["objective"]).map(str::to_string);
let role = optional_input_str(input, &["role", "agent_role"])
.map(|role| {
normalize_role_alias(role).ok_or_else(|| {
ToolError::invalid_input(format!(
"Invalid role alias '{role}'. Use: worker, explorer, awaiter, default"
))
})
})
.transpose()?
.map(str::to_string);
let message = parse_optional_text_or_items(input, &["message", "input"], "items")?;
let interrupt = optional_bool(input, "interrupt", true);
if objective.is_none() && role.is_none() && message.is_none() {
return Err(ToolError::invalid_input(
"Provide at least one of objective, role/agent_role, message/input, or items"
.to_string(),
));
}
Ok(AssignRequest {
agent_id,
objective,
role,
message,
interrupt,
})
}
fn parse_csv_concurrency(input: &Value) -> u64 {
if input.get("max_concurrency").is_some() {
return optional_u64(input, "max_concurrency", DEFAULT_CSV_MAX_CONCURRENCY).max(1);
}
if input.get("max_workers").is_some() {
return optional_u64(input, "max_workers", DEFAULT_CSV_MAX_CONCURRENCY).max(1);
}
DEFAULT_CSV_MAX_CONCURRENCY
}
fn agent_job_reports_store() -> &'static StdMutex<HashMap<String, HashMap<String, AgentJobReport>>>
{
AGENT_JOB_REPORTS.get_or_init(|| StdMutex::new(HashMap::new()))
}
fn agent_job_assignments_store() -> &'static StdMutex<HashMap<String, HashMap<String, String>>> {
AGENT_JOB_ASSIGNMENTS.get_or_init(|| StdMutex::new(HashMap::new()))
}
fn record_agent_job_assignment(job_id: &str, item_id: &str, agent_id: &str) {
let mut store = agent_job_assignments_store()
.lock()
.expect("agent job assignments lock poisoned");
let job = store.entry(job_id.to_string()).or_default();
job.insert(item_id.to_string(), agent_id.to_string());
}
fn remove_agent_job_assignment(job_id: &str, item_id: &str) {
let mut store = agent_job_assignments_store()
.lock()
.expect("agent job assignments lock poisoned");
if let Some(job) = store.get_mut(job_id) {
job.remove(item_id);
if job.is_empty() {
store.remove(job_id);
}
}
}
fn clear_agent_job_assignments(job_id: &str) {
let mut store = agent_job_assignments_store()
.lock()
.expect("agent job assignments lock poisoned");
store.remove(job_id);
}
fn report_matches_assignment(
job_id: &str,
item_id: &str,
reporting_agent_id: Option<&str>,
) -> bool {
let Some(reporting_agent_id) = reporting_agent_id else {
return false;
};
let store = agent_job_assignments_store()
.lock()
.expect("agent job assignments lock poisoned");
store
.get(job_id)
.and_then(|job| job.get(item_id))
.is_some_and(|expected| expected == reporting_agent_id)
}
fn record_agent_job_result(
job_id: &str,
item_id: &str,
result: Value,
stop: bool,
reporting_agent_id: Option<&str>,
) -> bool {
if !report_matches_assignment(job_id, item_id, reporting_agent_id) {
return false;
}
let mut store = agent_job_reports_store()
.lock()
.expect("agent job reports lock poisoned");
let job = store.entry(job_id.to_string()).or_default();
if job.contains_key(item_id) {
return false;
}
job.insert(item_id.to_string(), AgentJobReport { result, stop });
true
}
fn take_agent_job_result(job_id: &str, item_id: &str) -> Option<AgentJobReport> {
let mut store = agent_job_reports_store()
.lock()
.expect("agent job reports lock poisoned");
let result = store.get_mut(job_id).and_then(|job| job.remove(item_id));
if store
.get(job_id)
.is_some_and(|job_results| job_results.is_empty())
{
store.remove(job_id);
}
remove_agent_job_assignment(job_id, item_id);
result
}
fn clear_agent_job_results(job_id: &str) {
let mut store = agent_job_reports_store()
.lock()
.expect("agent job reports lock poisoned");
store.remove(job_id);
clear_agent_job_assignments(job_id);
}
fn resolve_results_csv_path(
context: &ToolContext,
input: &Value,
csv_path: &Path,
) -> Result<PathBuf, ToolError> {
if let Some(path) = optional_input_str(input, &["output_csv_path"]) {
context.resolve_path(path)
} else {
Ok(default_results_csv_path(csv_path))
}
}
fn default_results_csv_path(csv_path: &Path) -> PathBuf {
let stem = csv_path
.file_stem()
.and_then(|stem| stem.to_str())
.filter(|stem| !stem.is_empty())
.unwrap_or("results");
csv_path.with_file_name(format!("{stem}.results.csv"))
}
fn load_csv_rows(csv_path: &Path, id_column: Option<&str>) -> Result<Vec<CsvRowTask>, ToolError> {
let mut reader = csv::ReaderBuilder::new()
.from_path(csv_path)
.map_err(|err| {
ToolError::execution_failed(format!(
"Failed to read CSV '{}': {err}",
csv_path.display()
))
})?;
let headers = reader
.headers()
.map_err(|err| {
ToolError::execution_failed(format!(
"Failed to read CSV headers '{}': {err}",
csv_path.display()
))
})?
.clone();
if headers.is_empty() {
return Err(ToolError::invalid_input(format!(
"CSV '{}' has no headers",
csv_path.display()
)));
}
let mut seen_headers = HashSet::new();
for header in &headers {
if !seen_headers.insert(header.to_string()) {
return Err(ToolError::invalid_input(format!(
"CSV '{}' has duplicate header '{}'",
csv_path.display(),
header
)));
}
}
let id_index = if let Some(column_name) = id_column {
let trimmed = column_name.trim();
if trimmed.is_empty() {
None
} else {
let index = headers
.iter()
.position(|header| header == trimmed)
.ok_or_else(|| {
ToolError::invalid_input(format!(
"CSV '{}' is missing id_column '{trimmed}'",
csv_path.display()
))
})?;
Some(index)
}
} else {
None
};
let mut rows = Vec::new();
let mut seen_item_ids = HashSet::new();
for (row_index, row) in reader.records().enumerate() {
let record = row.map_err(|err| {
ToolError::execution_failed(format!(
"Failed to parse CSV row {} in '{}': {err}",
row_index + 1,
csv_path.display()
))
})?;
let mut values = HashMap::new();
for (idx, header) in headers.iter().enumerate() {
values.insert(
header.to_string(),
record.get(idx).unwrap_or_default().to_string(),
);
}
let base_item_id = id_index
.and_then(|idx| record.get(idx))
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
.unwrap_or_else(|| format!("row-{}", row_index + 1));
let mut item_id = base_item_id.clone();
let mut suffix = 2usize;
while !seen_item_ids.insert(item_id.clone()) {
item_id = format!("{base_item_id}-{suffix}");
suffix = suffix.saturating_add(1);
}
rows.push(CsvRowTask {
row_index,
item_id,
values,
});
}
Ok(rows)
}
fn render_instruction_template(template: &str, values: &HashMap<String, String>) -> String {
const OPEN_BRACE_SENTINEL: &str = "__DEEPSEEK_OPEN_BRACE__";
const CLOSE_BRACE_SENTINEL: &str = "__DEEPSEEK_CLOSE_BRACE__";
let mut rendered = template
.replace("{{", OPEN_BRACE_SENTINEL)
.replace("}}", CLOSE_BRACE_SENTINEL);
for (key, value) in values {
rendered = rendered.replace(&format!("{{{key}}}"), value);
}
rendered
.replace(OPEN_BRACE_SENTINEL, "{")
.replace(CLOSE_BRACE_SENTINEL, "}")
}
fn validate_output_schema(schema: &Value, payload: &Value) -> Result<(), String> {
let object = payload
.as_object()
.ok_or_else(|| "Expected JSON object output".to_string())?;
if let Some(expected_type) = schema.get("type").and_then(Value::as_str)
&& expected_type != "object"
{
return Err("output_schema.type must be 'object' when provided".to_string());
}
if let Some(required_fields) = schema.get("required").and_then(Value::as_array) {
for field in required_fields {
let Some(field_name) = field.as_str() else {
continue;
};
if !object.contains_key(field_name) {
return Err(format!(
"Worker output missing required field '{field_name}'"
));
}
}
}
Ok(())
}
fn write_csv_worker_outcomes(csv_path: &Path, outcomes: &[CsvWorkerOutcome]) -> Result<()> {
if let Some(parent) = csv_path.parent() {
fs::create_dir_all(parent)?;
}
let mut writer = csv::WriterBuilder::new().from_path(csv_path)?;
writer.write_record([
"item_id",
"status",
"agent_id",
"duration_ms",
"error",
"result",
"result_json",
])?;
for outcome in outcomes {
let result_json = outcome
.result_json
.as_ref()
.map(serde_json::to_string)
.transpose()?
.unwrap_or_default();
writer.write_record([
outcome.item_id.clone(),
outcome.status.clone(),
outcome.agent_id.clone().unwrap_or_default(),
outcome.duration_ms.to_string(),
outcome.error.clone().unwrap_or_default(),
outcome.result.clone().unwrap_or_default(),
result_json,
])?;
}
writer.flush()?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn run_csv_row_agent(
manager: SharedSubAgentManager,
runtime: SubAgentRuntime,
job_id: &str,
row: CsvRowTask,
instruction_template: &str,
timeout: Duration,
output_schema: Option<Value>,
stop_requested: Arc<AtomicBool>,
) -> CsvWorkerOutcome {
let CsvRowTask {
row_index,
item_id,
values,
} = row;
if stop_requested.load(Ordering::Relaxed) {
return CsvWorkerOutcome {
row_index,
item_id,
status: "skipped".to_string(),
agent_id: None,
duration_ms: 0,
error: Some("Skipped because stop=true was reported by another worker".to_string()),
result: None,
result_json: None,
};
}
let schema_text = output_schema
.as_ref()
.map(serde_json::to_string_pretty)
.transpose()
.unwrap_or(None)
.unwrap_or_else(|| "{}".to_string());
let rendered_instruction = render_instruction_template(instruction_template, &values);
let row_json = serde_json::to_string_pretty(&values).unwrap_or_else(|_| "{}".to_string());
let prompt = format!(
"You are processing one item for a spawn_agents_on_csv job.\n\
Job ID: {job_id}\n\
Item ID: {item_id}\n\n\
Task instruction:\n\
{rendered_instruction}\n\n\
Input row (JSON):\n\
{row_json}\n\n\
Expected result schema (JSON Schema or {{}}):\n\
{schema_text}\n\n\
You MUST call the `report_agent_job_result` tool exactly once with:\n\
1. `job_id` = \"{job_id}\"\n\
2. `item_id` = \"{item_id}\"\n\
3. `result` = a JSON object for this row.\n\n\
If you need to stop the job early, include `stop` = true in the same tool call.\n\n\
After the tool call succeeds, stop.",
item_id = item_id.as_str()
);
let assignment = SubAgentAssignment::new(
format!("Process CSV item '{item_id}' for job '{job_id}'"),
Some("worker".to_string()),
);
let spawn_deadline = Instant::now() + timeout.min(Duration::from_secs(60));
let spawned = loop {
if stop_requested.load(Ordering::Relaxed) {
return CsvWorkerOutcome {
row_index,
item_id,
status: "skipped".to_string(),
agent_id: None,
duration_ms: 0,
error: Some("Skipped because stop=true was reported by another worker".to_string()),
result: None,
result_json: None,
};
}
let attempt = {
let mut manager_guard = manager.lock().await;
manager_guard.spawn_background_with_assignment(
manager.clone(),
runtime.clone(),
SubAgentType::General,
prompt.clone(),
assignment.clone(),
None,
)
};
match attempt {
Ok(snapshot) => break Ok(snapshot),
Err(err) => {
let message = err.to_string();
if message.contains("Sub-agent limit reached") && Instant::now() < spawn_deadline {
tokio::time::sleep(RESULT_POLL_INTERVAL).await;
continue;
}
break Err(message);
}
}
};
let spawn_snapshot = match spawned {
Ok(snapshot) => snapshot,
Err(error) => {
return CsvWorkerOutcome {
row_index,
item_id,
status: "failed".to_string(),
agent_id: None,
duration_ms: 0,
error: Some(error),
result: None,
result_json: None,
};
}
};
let agent_id = spawn_snapshot.agent_id.clone();
record_agent_job_assignment(job_id, item_id.as_str(), &agent_id);
let deadline = Instant::now() + timeout;
let final_snapshot = loop {
let snapshot = {
let manager = manager.lock().await;
manager.get_result(&agent_id)
};
match snapshot {
Ok(snapshot) if snapshot.status != SubAgentStatus::Running => break Ok(snapshot),
Ok(snapshot) => {
if Instant::now() >= deadline {
let cancelled = {
let mut manager = manager.lock().await;
manager.cancel(&agent_id)
};
let mut outcome = CsvWorkerOutcome {
row_index,
item_id,
status: "timed_out".to_string(),
agent_id: Some(agent_id.clone()),
duration_ms: snapshot.duration_ms,
error: Some("Worker timed out and was cancelled".to_string()),
result: snapshot.result,
result_json: None,
};
if let Ok(cancelled_snapshot) = cancelled {
outcome.duration_ms = cancelled_snapshot.duration_ms;
}
return outcome;
}
tokio::time::sleep(RESULT_POLL_INTERVAL).await;
}
Err(err) => break Err(err.to_string()),
}
};
let snapshot = match final_snapshot {
Ok(snapshot) => snapshot,
Err(error) => {
return CsvWorkerOutcome {
row_index,
item_id,
status: "failed".to_string(),
agent_id: Some(agent_id),
duration_ms: 0,
error: Some(error),
result: None,
result_json: None,
};
}
};
match snapshot.status {
SubAgentStatus::Completed => {
let Some(report) = take_agent_job_result(job_id, item_id.as_str()) else {
return CsvWorkerOutcome {
row_index,
item_id,
status: "failed".to_string(),
agent_id: Some(snapshot.agent_id),
duration_ms: snapshot.duration_ms,
error: Some(
"Worker finished without calling report_agent_job_result".to_string(),
),
result: snapshot.result,
result_json: None,
};
};
if let Some(schema) = output_schema.as_ref()
&& let Err(error) = validate_output_schema(schema, &report.result)
{
return CsvWorkerOutcome {
row_index,
item_id,
status: "failed".to_string(),
agent_id: Some(snapshot.agent_id),
duration_ms: snapshot.duration_ms,
error: Some(error),
result: snapshot.result,
result_json: Some(report.result),
};
}
if report.stop {
stop_requested.store(true, Ordering::Relaxed);
}
CsvWorkerOutcome {
row_index,
item_id,
status: "completed".to_string(),
agent_id: Some(snapshot.agent_id),
duration_ms: snapshot.duration_ms,
error: None,
result: snapshot.result,
result_json: Some(report.result),
}
}
SubAgentStatus::Interrupted(error) => CsvWorkerOutcome {
row_index,
item_id,
status: "interrupted".to_string(),
agent_id: Some(snapshot.agent_id),
duration_ms: snapshot.duration_ms,
error: Some(error),
result: snapshot.result,
result_json: None,
},
SubAgentStatus::Failed(error) => CsvWorkerOutcome {
row_index,
item_id,
status: "failed".to_string(),
agent_id: Some(snapshot.agent_id),
duration_ms: snapshot.duration_ms,
error: Some(error),
result: snapshot.result,
result_json: None,
},
SubAgentStatus::Cancelled => CsvWorkerOutcome {
row_index,
item_id,
status: "failed".to_string(),
agent_id: Some(snapshot.agent_id),
duration_ms: snapshot.duration_ms,
error: Some("Worker cancelled".to_string()),
result: snapshot.result,
result_json: None,
},
SubAgentStatus::Running => CsvWorkerOutcome {
row_index,
item_id,
status: "failed".to_string(),
agent_id: Some(snapshot.agent_id),
duration_ms: snapshot.duration_ms,
error: Some("Worker did not reach terminal status".to_string()),
result: snapshot.result,
result_json: None,
},
}
}
fn normalize_role_alias(input: &str) -> Option<&'static str> {
match input.to_ascii_lowercase().as_str() {
"default" => Some("default"),
"worker" | "general" => Some("worker"),
"explorer" | "explore" => Some("explorer"),
"awaiter" | "plan" | "planner" => Some("awaiter"),
_ => None,
}
}
fn build_assignment_prompt(
prompt: &str,
assignment: &SubAgentAssignment,
agent_type: &SubAgentType,
) -> String {
let role = assignment.role.as_deref().unwrap_or("default");
format!(
"Assignment metadata:\n- objective: {}\n- role: {}\n- resolved_type: {}\n\nTask:\n{}",
assignment.objective,
role,
agent_type.as_str(),
prompt
)
}
fn emit_agent_progress(
event_tx: Option<&mpsc::Sender<Event>>,
mailbox: Option<&Mailbox>,
agent_id: &str,
status: String,
) {
if let Some(mb) = mailbox {
let _ = mb.send(MailboxMessage::progress(agent_id, status.clone()));
}
if let Some(event_tx) = event_tx {
let _ = event_tx.try_send(Event::AgentProgress {
id: agent_id.to_string(),
status,
});
}
}
struct SubAgentToolRegistry {
allowed_tools: Option<Vec<String>>,
registry: ToolRegistry,
}
impl SubAgentToolRegistry {
fn new(
runtime: SubAgentRuntime,
explicit_allowed_tools: Option<Vec<String>>,
todo_list: SharedTodoList,
plan_state: SharedPlanState,
) -> Self {
let context = runtime.context.clone();
let registry = ToolRegistryBuilder::new()
.with_full_agent_surface(
Some(runtime.client.clone()),
runtime.model.clone(),
runtime.manager.clone(),
runtime.clone(),
runtime.allow_shell,
todo_list,
plan_state,
)
.with_tool(Arc::new(ReportAgentJobResultTool))
.build(context);
Self {
allowed_tools: explicit_allowed_tools,
registry,
}
}
fn is_tool_allowed(&self, name: &str) -> bool {
match &self.allowed_tools {
None => true,
Some(list) => list.iter().any(|t| t == name),
}
}
fn tools_for_model(&self) -> Vec<Tool> {
let api_tools = self.registry.to_api_tools();
match &self.allowed_tools {
None => api_tools,
Some(list) => api_tools
.into_iter()
.filter(|tool| list.contains(&tool.name))
.collect(),
}
}
fn unavailable_allowed_tools(&self) -> Vec<String> {
match &self.allowed_tools {
None => Vec::new(),
Some(list) => list
.iter()
.filter(|name| !self.registry.contains(name))
.cloned()
.collect(),
}
}
async fn execute(&self, agent_id: &str, name: &str, mut input: Value) -> Result<String> {
if !self.is_tool_allowed(name) {
return Err(anyhow!("Tool {name} not allowed for this sub-agent"));
}
if name == "report_agent_job_result"
&& let Some(object) = input.as_object_mut()
{
object.insert(
"__reporting_agent_id".to_string(),
Value::String(agent_id.to_string()),
);
}
self.registry
.execute(name, input)
.await
.map_err(|e| anyhow!(e))
}
}
fn build_allowed_tools(
agent_type: &SubAgentType,
explicit_tools: Option<Vec<String>>,
_allow_shell: bool,
) -> Result<Option<Vec<String>>> {
if let Some(tools) = explicit_tools {
let mut deduped = Vec::new();
for tool in tools {
let name = tool.trim();
if !name.is_empty() && !deduped.iter().any(|existing: &String| existing == name) {
deduped.push(name.to_string());
}
}
if matches!(agent_type, SubAgentType::Custom) && deduped.is_empty() {
return Err(anyhow!(
"Custom sub-agent requires a non-empty allowed_tools list"
));
}
return Ok(Some(deduped));
}
if matches!(agent_type, SubAgentType::Custom) {
return Err(anyhow!(
"Custom sub-agent requires a non-empty allowed_tools list"
));
}
Ok(None)
}
fn summarize_subagent_result(result: &SubAgentResult) -> String {
match (&result.status, result.result.as_ref()) {
(SubAgentStatus::Completed, Some(text)) => truncate_preview(text),
(SubAgentStatus::Completed, None) => "Completed (no output)".to_string(),
(SubAgentStatus::Interrupted(error), _) => format!("Interrupted: {error}"),
(SubAgentStatus::Cancelled, _) => "Cancelled".to_string(),
(SubAgentStatus::Failed(error), _) => format!("Failed: {error}"),
(SubAgentStatus::Running, _) => "Running".to_string(),
}
}
fn subagent_status_name(status: &SubAgentStatus) -> &'static str {
match status {
SubAgentStatus::Running => "running",
SubAgentStatus::Completed => "completed",
SubAgentStatus::Interrupted(_) => "interrupted",
SubAgentStatus::Failed(_) => "failed",
SubAgentStatus::Cancelled => "cancelled",
}
}
fn truncate_preview(text: &str) -> String {
const MAX_LEN: usize = 240;
if text.len() <= MAX_LEN {
text.to_string()
} else {
format!("{}...", text.chars().take(MAX_LEN).collect::<String>())
}
}
const GENERAL_AGENT_PROMPT: &str = concat!(
"You are a general-purpose sub-agent spawned to handle a specific task autonomously.\n",
"\n",
"Your scope is exactly what the parent assigned to you. Do not expand the\n",
"objective — if you discover related work that needs doing, surface it under\n",
"RISKS or BLOCKERS rather than starting it. Work autonomously: the parent is\n",
"not available to answer questions mid-run.\n",
"\n",
"Plan before you act. Use `checklist_write` for any multi-step task so your work\n",
"is visible in the parent's sidebar. For complex initiatives, layer\n",
"`update_plan` (strategy) above `checklist_write` (tactics).\n",
"\n",
include_str!("../../prompts/subagent_output_format.md"),
);
const EXPLORE_AGENT_PROMPT: &str = concat!(
"You are an exploration sub-agent. Your job is to map the relevant region\n",
"of the codebase fast and report what is there. You are read-only by\n",
"convention — do not write, patch, or run side-effectful commands. If the\n",
"task seems to require a write, stop and put it under BLOCKERS.\n",
"\n",
"Method:\n",
"- Start with `list_dir` and `file_search` to orient.\n",
"- Use `grep_files` (NOT `exec_shell rg`) to find call sites, type defs,\n",
" and string literals. Prefer narrow, structured queries over broad scans.\n",
"- Read each candidate file with `read_file`. Skim, then quote line ranges.\n",
"- Stop reading once you have enough evidence — exhaustive sweeps are not\n",
" the goal. The parent will spawn a follow-up explorer if needed.\n",
"\n",
"EVIDENCE is the load-bearing section for explorers. Cite every file you\n",
"read with `path:line-range` and one line per finding. The parent uses your\n",
"EVIDENCE list as a working set for the next turn, so be precise.\n",
"\n",
"CHANGES will almost always be \"None.\" for an explorer.\n",
"\n",
include_str!("../../prompts/subagent_output_format.md"),
);
const PLAN_AGENT_PROMPT: &str = concat!(
"You are a planning sub-agent. Your job is to take an objective and\n",
"produce a prioritized, executable plan — not to execute it. Keep writes\n",
"to a minimum (notes and plan artifacts only); avoid patches and shell\n",
"side effects.\n",
"\n",
"Method:\n",
"- Read enough of the codebase to ground the plan in reality. A plan\n",
" written without `read_file` evidence is a guess.\n",
"- Decompose the objective into ordered, verifiable steps. Each step names\n",
" the artifact it produces and the check that proves it works.\n",
"- Surface trade-offs explicitly. If two approaches are viable, name both\n",
" and pick one with a reason — don't leave the parent with a fork.\n",
"- Use `update_plan` to record the high-level strategy and `checklist_write` to\n",
" emit the granular backlog. The parent (and the user) reads these from\n",
" the sidebar after you finish.\n",
"\n",
"Prioritization: order todos by the dependency graph first, then by the\n",
"ratio of risk reduced to effort spent. Tag each item with `[P0]` / `[P1]`\n",
"/ `[P2]` so the parent can pick a slice without re-reading the whole plan.\n",
"\n",
"CHANGES should list the plan artifacts you wrote (e.g. `update_plan` rows,\n",
"`checklist_write` ids, any notes). Do not include speculative future edits.\n",
"\n",
include_str!("../../prompts/subagent_output_format.md"),
);
const REVIEW_AGENT_PROMPT: &str = concat!(
"You are a code review sub-agent. Your job is to read the code under\n",
"review and emit a severity-scored list of findings. You are read-only by\n",
"convention — do not patch the code under review even if a fix is obvious;\n",
"describe the fix in the finding so the parent can apply it.\n",
"\n",
"Method:\n",
"- Read the diff or files end-to-end with `read_file` before scoring.\n",
"- Use `grep_files` to check for sibling call sites, similar patterns\n",
" elsewhere, and existing tests covering the same surface.\n",
"- For each finding, score severity as one of:\n",
" BLOCKER — correctness, security, data loss, or contract break.\n",
" MAJOR — likely bug, missing error path, perf regression at scale.\n",
" MINOR — style, naming, redundancy, suboptimal but correct code.\n",
" NIT — taste; reasonable people may disagree.\n",
"- Order EVIDENCE bullets by severity, BLOCKER first. Each bullet:\n",
" `[SEVERITY] path:line-range — one-line description; suggested fix`.\n",
"- Be constructive. Cite the failure mode, not the author.\n",
"\n",
"If you find no issues at MAJOR or above, say so plainly in SUMMARY — a\n",
"clean review is a valid result and the parent benefits from knowing it.\n",
"\n",
"CHANGES will almost always be \"None.\" for a reviewer.\n",
"\n",
include_str!("../../prompts/subagent_output_format.md"),
);
const CUSTOM_AGENT_PROMPT: &str = concat!(
"You are a custom sub-agent. The parent has given you a narrowed tool\n",
"registry — only the tools you see at runtime are available. Do not try\n",
"to reach for a tool that is not registered; if the task needs one, put\n",
"the gap under BLOCKERS and stop.\n",
"\n",
"Stay tightly scoped to the assigned objective. The parent chose Custom\n",
"specifically to constrain you — do not expand into adjacent work.\n",
"\n",
include_str!("../../prompts/subagent_output_format.md"),
);
#[cfg(test)]
mod tests;