#![allow(dead_code)]
use std::collections::{HashMap, VecDeque};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, RwLock, Semaphore};
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use crate::client::DeepSeekClient;
use crate::config::MAX_SUBAGENTS;
use crate::core::events::Event;
use crate::dependencies::{ExternalTool, Git};
use crate::llm_client::LlmClient;
use crate::models::{
ContentBlock, Message, MessageRequest, MessageResponse, SystemPrompt, Tool, Usage,
};
use crate::request_tuning::RequestTuning;
use crate::tools::handle::VarHandle;
use crate::tools::plan::{PlanState, SharedPlanState};
use crate::tools::registry::{ToolRegistry, ToolRegistryBuilder};
use crate::tools::spec::{
ApprovalRequirement, ToolCapability, ToolContext, ToolError, ToolResult, ToolSpec,
};
use crate::tools::todo::SharedTodoList;
#[cfg(test)]
use crate::tools::todo::TodoList;
use crate::tui::app::ReasoningEffort;
use crate::utils::spawn_supervised;
use crate::worker_profile::{ModelRoute, ToolScope, WorkerRuntimeProfile};
pub mod mailbox;
#[allow(unused_imports)]
pub use mailbox::{Mailbox, MailboxEnvelope, MailboxMessage, MailboxReceiver};
/// Process-wide table of resident-file leases: maps a lease key to the id of the owning agent.
// NOTE(review): keys presumably identify resident files (cf. `SpawnRequest::resident_file`) — confirm.
static RESIDENT_LEASES: std::sync::OnceLock<
    std::sync::Mutex<std::collections::HashMap<String, String>>,
> = std::sync::OnceLock::new();

/// Drops every resident lease currently owned by `agent_id`.
///
/// Does nothing when the lease table has never been initialised. If a previous holder of the
/// lock panicked (poisoning the mutex), the guard is recovered instead of skipping the release:
/// a `HashMap<String, String>` stays valid after such a panic, whereas skipping would leak this
/// agent's leases for the rest of the process.
fn release_resident_leases_for(agent_id: &str) {
    let Some(lock) = RESIDENT_LEASES.get() else {
        return;
    };
    let mut guard = lock
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    guard.retain(|_, owner| owner != agent_id);
}
/// Default step limit for a sub-agent. `u32::MAX` is the "no limit" sentinel, which
/// `format_step_counter` renders without a `/max` suffix.
const DEFAULT_MAX_STEPS: u32 = u32::MAX;
/// Timeout applied to tool execution (30 seconds).
// NOTE(review): whether this is per tool call or per step is inferred from the name — confirm at the use site.
const TOOL_TIMEOUT: Duration = Duration::from_secs(30);
/// Minimum token reserve required before another sub-agent may be spawned.
// NOTE(review): semantics inferred from the name — confirm where it is checked.
const MIN_SUBAGENT_SPAWN_TOKEN_RESERVE: u64 = 1;
/// Renders a progress label for a sub-agent step.
///
/// `max_steps == u32::MAX` is the "no limit" sentinel. In that case only the current step is
/// shown (`"step 3"`); otherwise the limit is appended (`"step 3/10"`).
fn format_step_counter(steps: u32, max_steps: u32) -> String {
    match max_steps {
        u32::MAX => format!("step {steps}"),
        limit => format!("step {steps}/{limit}"),
    }
}
/// Maximum output tokens requested for a single sub-agent model response.
// NOTE(review): presumably sent as the request's `max_tokens` — confirm at the call site.
const SUBAGENT_RESPONSE_MAX_TOKENS: u32 = 16_384;
/// How many truncated responses in a row a sub-agent tolerates.
// NOTE(review): what happens once the limit is hit is not visible here — confirm in the step loop.
const MAX_CONSECUTIVE_TRUNCATED_SUBAGENT_RESPONSES: u32 = 5;
/// Retry count and initial back-off delay for transient provider failures.
const SUBAGENT_TRANSIENT_PROVIDER_MAX_RETRIES: u32 = 2;
const SUBAGENT_TRANSIENT_PROVIDER_INITIAL_BACKOFF: Duration = Duration::from_millis(250);
/// Default per-step API timeout, sourced from `crate::config::DEFAULT_SUBAGENT_API_TIMEOUT_SECS`.
const DEFAULT_STEP_API_TIMEOUT: Duration =
Duration::from_secs(crate::config::DEFAULT_SUBAGENT_API_TIMEOUT_SECS);
/// Default and upper-bound result wait times, in milliseconds (30 s and 1 h).
const DEFAULT_RESULT_TIMEOUT_MS: u64 = 30_000;
const MAX_RESULT_TIMEOUT_MS: u64 = 3_600_000;
/// How long completed agents are retained (one hour).
const COMPLETED_AGENT_RETENTION: Duration = Duration::from_secs(60 * 60);
/// Caps on retained worker records and on lifecycle events kept per record.
const MAX_AGENT_WORKER_RECORDS: usize = 256;
const MAX_AGENT_WORKER_EVENTS_PER_RECORD: usize = 128;
/// Persistence format: schema version and file name of the sub-agent state snapshot.
const SUBAGENT_STATE_SCHEMA_VERSION: u32 = 1;
const SUBAGENT_STATE_FILE: &str = "subagents.v1.json";
/// Directory name used for per-agent git worktrees.
// NOTE(review): presumably relative to the workspace root — confirm.
const SUBAGENT_WORKTREE_ROOT_DIR: &str = ".codewhale-worktrees";
/// Human-readable reasons reported for restart interruption, launch queueing and model waits.
const SUBAGENT_RESTART_REASON: &str = "Interrupted by process restart";
const SUBAGENT_QUEUED_LAUNCH_REASON: &str = "queued: waiting for a sub-agent launch slot";
const SUBAGENT_MODEL_WAIT_REASON: &str = "waiting for model response";
/// Minimum spacing between persisted state writes.
const SUBAGENT_PERSIST_DEBOUNCE: Duration = Duration::from_millis(1500);
/// Counters of state writes performed and skipped; useful alongside `subagent_perf_enabled`.
// NOTE(review): "skipped" presumably means suppressed by the debounce — confirm at the increment site.
static SUBAGENT_PERSIST_WRITES: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
static SUBAGENT_PERSIST_SKIPPED: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
/// Reports whether sub-agent performance tracing is switched on.
///
/// `CODEWHALE_SUBAGENT_PERF_TRACE` is read once per process and the answer is cached. The
/// values `1` and `true` (any ASCII case) enable tracing. Anything else, including an unset or
/// non-Unicode variable, leaves it off.
fn subagent_perf_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| match std::env::var("CODEWHALE_SUBAGENT_PERF_TRACE") {
        Ok(raw) => raw == "1" || raw.eq_ignore_ascii_case("true"),
        Err(_) => false,
    })
}
/// Human-readable list of accepted sub-agent type names and aliases.
/// Mirrors the match arms in `SubAgentType::from_str`; keep the two in sync by hand.
const VALID_SUBAGENT_TYPES: &str = "general (aliases: general-purpose, general_purpose, worker, default), \
explore (aliases: exploration, explorer), plan (aliases: planning, planner, awaiter), \
review (aliases: code-review, code_review, reviewer), implementer (aliases: implement, implementation, builder), \
verifier (aliases: verify, verification, validator, tester), custom";
/// Same vocabulary as `VALID_SUBAGENT_TYPES`, phrased role-first (e.g. `worker` for `general`).
const VALID_ROLE_ALIASES: &str = "default; worker (aliases: general, general-purpose, general_purpose); \
explorer (aliases: explore, exploration); awaiter (aliases: plan, planning, planner); \
reviewer (aliases: review, code-review, code_review); implementer (aliases: implement, implementation, builder); \
verifier (aliases: verify, verification, validator, tester); custom";
/// Schema description for the `type` argument. The text duplicates `VALID_SUBAGENT_TYPES`
/// because `concat!` only accepts literals, not other consts.
const SUBAGENT_TYPE_DESCRIPTION: &str = "Sub-agent type. Accepted vocabulary: general (aliases: general-purpose, general_purpose, worker, default), \
explore (aliases: exploration, explorer), plan (aliases: planning, planner, awaiter), \
review (aliases: code-review, code_review, reviewer), implementer (aliases: implement, implementation, builder), \
verifier (aliases: verify, verification, validator, tester), custom.";
/// Schema description for the `role` argument. It duplicates `VALID_ROLE_ALIASES` and adds
/// the constraint that `role` must agree with `type`.
const SUBAGENT_ROLE_DESCRIPTION: &str = "Role alias. Accepted vocabulary: default; worker (aliases: general, general-purpose, general_purpose); \
explorer (aliases: explore, exploration); awaiter (aliases: plan, planning, planner); \
reviewer (aliases: review, code-review, code_review); implementer (aliases: implement, implementation, builder); \
verifier (aliases: verify, verification, validator, tester); custom. \
Must match `type` if both are given.";
/// Pool of display nicknames for sub-agents: cetacean names, alternating English and Chinese.
///
/// `whale_name_for_id` and `whale_nickname_for_index` index into this table by position.
/// Reordering, inserting or removing entries therefore changes which nickname an existing id or
/// index maps to. Append new names at the end if stability matters.
pub const WHALE_NICKNAMES: &[&str] = &[
"Blue",
"蓝鲸",
"Humpback",
"座头鲸",
"Sperm",
"抹香鲸",
"Fin",
"长须鲸",
"Sei",
"塞鲸",
"Bryde's",
"布氏鲸",
"Minke",
"小须鲸",
"Antarctic Minke",
"南极小须鲸",
"Pygmy Right",
"小露脊鲸",
"Omura's",
"大村鲸",
"Eden's",
"艾氏鲸",
"Rice's",
"赖斯鲸",
"Gray",
"灰鲸",
"Bowhead",
"弓头鲸",
"North Atlantic Right",
"北大西洋露脊鲸",
"North Pacific Right",
"北太平洋露脊鲸",
"Southern Right",
"南露脊鲸",
"Beluga",
"白鲸",
"Narwhal",
"独角鲸",
"Orca",
"虎鲸",
"Pilot",
"领航鲸",
"False Killer",
"伪虎鲸",
"Pygmy Killer",
"小虎鲸",
"Melon-headed",
"瓜头鲸",
"Beaked",
"喙鲸",
"Cuvier's Beaked",
"柯氏喙鲸",
"Baird's Beaked",
"贝氏喙鲸",
"Blainville's Beaked",
"柏氏喙鲸",
"Ginkgo-toothed Beaked",
"银杏齿喙鲸",
"Strap-toothed",
"带齿喙鲸",
"Stejneger's Beaked",
"斯氏喙鲸",
"Dwarf Sperm",
"小抹香鲸",
"Pygmy Sperm",
"侏儒抹香鲸",
"Rough-toothed",
"糙齿海豚",
"Atlantic Spotted",
"大西洋斑海豚",
"Pantropical Spotted",
"热带斑海豚",
"Spinner",
"长吻飞旋海豚",
"Clymene",
"短吻飞旋海豚",
"Striped",
"条纹海豚",
"Common Bottlenose",
"宽吻海豚",
"Indo-Pacific Bottlenose",
"印太瓶鼻海豚",
"Risso's",
"灰海豚",
"Commerson's",
"花斑海豚",
"Chilean",
"智利海豚",
"Heaviside's",
"海氏矮海豚",
"Hector's",
"赫氏矮海豚",
"Amazon River",
"亚马逊河豚",
"Ganges River",
"恒河豚",
"Indus River",
"印度河豚",
"La Plata",
"拉普拉塔河豚",
"Franciscana",
"拉河豚",
];
/// Picks a deterministic whale nickname for an agent id.
///
/// The id is hashed with the standard library's `DefaultHasher` (constructed with fixed keys)
/// and the digest is reduced modulo the length of `WHALE_NICKNAMES`.
#[must_use]
pub fn whale_name_for_id(id: &str) -> String {
    use std::hash::{Hash, Hasher};
    // NOTE(review): `DefaultHasher`'s algorithm is not guaranteed stable across Rust releases,
    // so the id -> nickname mapping may change after a toolchain upgrade.
    let digest = {
        let mut state = std::collections::hash_map::DefaultHasher::new();
        id.hash(&mut state);
        state.finish()
    };
    let slot = (digest as usize) % WHALE_NICKNAMES.len();
    String::from(WHALE_NICKNAMES[slot])
}
/// Returns a nickname for `id` that is not already present in `active_names`.
///
/// Starts from `whale_name_for_id(id)`. If that base name is taken, candidates are tried in
/// rounds `n = 2, 3, …`. Each round first tries the sequential `"{base} ({n})"`, then a
/// hash-derived `"{base} ({probe})"` with `probe = (hash(id) + n) % 100`. The first free
/// candidate wins, so the search always terminates for any finite `active_names`.
#[must_use]
pub fn assign_unique_whale_name(
    id: &str,
    active_names: &std::collections::HashSet<String>,
) -> String {
    let base = whale_name_for_id(id);
    if !active_names.contains(&base) {
        return base;
    }
    use std::hash::{Hash, Hasher};
    let seed = {
        let mut state = std::collections::hash_map::DefaultHasher::new();
        id.hash(&mut state);
        state.finish()
    };
    let is_free = |name: &String| !active_names.contains(name);
    let mut round = 2;
    loop {
        let sequential = format!("{base} ({round})");
        if is_free(&sequential) {
            return sequential;
        }
        let probe = seed.wrapping_add(round as u64) % 100;
        let hashed = format!("{base} ({probe})");
        if is_free(&hashed) {
            return hashed;
        }
        round += 1;
    }
}
/// Version in which deprecated tools are scheduled for removal. `wrap_with_deprecation_notice`
/// reports it both in its warning log and in the `_deprecation` metadata it attaches.
const DEPRECATION_REMOVAL_VERSION: &str = "0.8.0";
/// Returns the nickname for the `index`-th agent.
///
/// The first `WHALE_NICKNAMES.len()` indices get the plain names. After the table wraps
/// around, a generation number starting at 2 is appended (e.g. `"Blue 2"`).
#[must_use]
pub fn whale_nickname_for_index(index: usize) -> String {
    let table_len = WHALE_NICKNAMES.len();
    let (generation, slot) = (index / table_len, index % table_len);
    let name = WHALE_NICKNAMES[slot];
    if generation == 0 {
        name.to_string()
    } else {
        format!("{name} {}", generation + 1)
    }
}
/// Logs a deprecation warning and attaches a `_deprecation` notice to `result.metadata`.
///
/// Merge rules for existing metadata:
/// - absent: metadata becomes `{"_deprecation": …}`;
/// - a JSON object: the `_deprecation` key is inserted into it, replacing any existing one;
/// - any other JSON value: it is preserved under `_original_metadata` beside `_deprecation`.
fn wrap_with_deprecation_notice(
    mut result: ToolResult,
    this_tool: &str,
    use_instead: &str,
) -> ToolResult {
    tracing::warn!(
        "Deprecated tool '{}' invoked — use '{}' instead (removal: v{})",
        this_tool,
        use_instead,
        DEPRECATION_REMOVAL_VERSION,
    );
    let deprecation = json!({
        "this_tool": this_tool,
        "use_instead": use_instead,
        "removed_in": DEPRECATION_REMOVAL_VERSION,
        "message": format!(
            "Tool '{}' is deprecated; switch to '{}' before v{}.",
            this_tool, use_instead, DEPRECATION_REMOVAL_VERSION
        )
    });
    let merged = match result.metadata.take() {
        None => json!({ "_deprecation": deprecation }),
        Some(Value::Object(mut existing)) => {
            existing.insert("_deprecation".to_string(), deprecation);
            Value::Object(existing)
        }
        Some(other) => json!({ "_deprecation": deprecation, "_original_metadata": other }),
    };
    result.metadata = Some(merged);
    result
}
/// The task handed to a sub-agent: a free-text objective plus an optional role label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubAgentAssignment {
/// What the sub-agent is asked to accomplish.
pub objective: String,
/// Optional role label; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub role: Option<String>,
}
impl SubAgentAssignment {
/// Field-wise constructor.
fn new(objective: String, role: Option<String>) -> Self {
Self { objective, role }
}
}
/// Built-in sub-agent role.
///
/// Serialized in snake_case (e.g. `"implementer"`), matching `SubAgentType::as_str`; defaults
/// to `General`. `SubAgentType::from_str` also accepts the aliases listed in
/// `VALID_SUBAGENT_TYPES`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SubAgentType {
#[default]
General,
Explore,
Plan,
Review,
Implementer,
Verifier,
/// Caller-defined role; gets no built-in tool list from the deprecated `allowed_tools`.
Custom,
}
impl SubAgentType {
    /// Parses a sub-agent type name or one of its aliases.
    ///
    /// Surrounding whitespace is ignored and matching is case-insensitive (full Unicode
    /// lowercasing). Trimming keeps this consistent with the `model_strength` and `thinking`
    /// argument parsers in this module. The accepted vocabulary mirrors
    /// `VALID_SUBAGENT_TYPES`; anything else yields `None`.
    #[must_use]
    pub fn from_str(s: &str) -> Option<Self> {
        match s.trim().to_lowercase().as_str() {
            "general" | "general-purpose" | "general_purpose" | "worker" | "default" => {
                Some(Self::General)
            }
            "explore" | "exploration" | "explorer" => Some(Self::Explore),
            "plan" | "planning" | "planner" | "awaiter" => Some(Self::Plan),
            "review" | "code-review" | "code_review" | "reviewer" => Some(Self::Review),
            "implementer" | "implement" | "implementation" | "builder" => Some(Self::Implementer),
            "verifier" | "verify" | "verification" | "validator" | "tester" => Some(Self::Verifier),
            "custom" => Some(Self::Custom),
            _ => None,
        }
    }

    /// Canonical name of this type. It equals the serde snake_case form and is accepted back by
    /// `from_str`.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::General => "general",
            Self::Explore => "explore",
            Self::Plan => "plan",
            Self::Review => "review",
            Self::Implementer => "implementer",
            Self::Verifier => "verifier",
            Self::Custom => "custom",
        }
    }

    /// Builds the system prompt: the role-specific intro followed by the shared
    /// `SUBAGENT_OUTPUT_FORMAT` section.
    #[must_use]
    pub fn system_prompt(&self) -> String {
        let role_intro = match self {
            Self::General => GENERAL_AGENT_INTRO,
            Self::Explore => EXPLORE_AGENT_INTRO,
            Self::Plan => PLAN_AGENT_INTRO,
            Self::Review => REVIEW_AGENT_INTRO,
            Self::Implementer => IMPLEMENTER_AGENT_INTRO,
            Self::Verifier => VERIFIER_AGENT_INTRO,
            Self::Custom => CUSTOM_AGENT_INTRO,
        };
        format!("{role_intro}{SUBAGENT_OUTPUT_FORMAT}")
    }

    /// Legacy per-role tool allow-list.
    ///
    /// Deprecated: default sub-agents now inherit the parent's full tool registry. `Custom`
    /// returns an empty list because callers are expected to pass an explicit list for it.
    #[must_use]
    #[deprecated(
        since = "0.6.6",
        note = "Default sub-agents inherit the full parent registry; pass an explicit allowed_tools list only for narrow Custom roles."
    )]
    pub fn allowed_tools(&self) -> Vec<&'static str> {
        match self {
            Self::General => vec![
                "list_dir",
                "read_file",
                "write_file",
                "edit_file",
                "apply_patch",
                "grep_files",
                "file_search",
                "web.run",
                "web_search",
                "exec_shell",
                "exec_shell_wait",
                "exec_shell_interact",
                "exec_wait",
                "exec_interact",
                "note",
                "checklist_write",
                "checklist_add",
                "checklist_update",
                "checklist_list",
                "todo_write",
                "todo_add",
                "todo_update",
                "todo_list",
                "update_plan",
            ],
            Self::Explore => vec![
                "list_dir",
                "read_file",
                "grep_files",
                "file_search",
                "web.run",
                "web_search",
                "exec_shell",
                "exec_shell_wait",
                "exec_shell_interact",
                "exec_wait",
                "exec_interact",
            ],
            Self::Plan => vec![
                "list_dir",
                "read_file",
                "grep_files",
                "file_search",
                "web.run",
                "note",
                "update_plan",
                "checklist_write",
                "checklist_add",
                "checklist_update",
                "checklist_list",
                "todo_write",
                "todo_add",
                "todo_update",
                "todo_list",
            ],
            Self::Review => vec!["list_dir", "read_file", "grep_files", "file_search", "note"],
            Self::Implementer => vec![
                "list_dir",
                "read_file",
                "write_file",
                "edit_file",
                "apply_patch",
                "grep_files",
                "file_search",
                "exec_shell",
                "exec_shell_wait",
                "exec_shell_interact",
                "exec_wait",
                "exec_interact",
                "note",
                "checklist_write",
                "checklist_add",
                "checklist_update",
                "checklist_list",
                "todo_write",
                "todo_add",
                "todo_update",
                "todo_list",
                "update_plan",
            ],
            Self::Verifier => vec![
                "list_dir",
                "read_file",
                "grep_files",
                "file_search",
                "exec_shell",
                "exec_shell_wait",
                "exec_shell_interact",
                "exec_wait",
                "exec_interact",
                "run_tests",
                "run_verifiers",
                "diagnostics",
                "note",
            ],
            // Custom roles have no built-in list; callers supply tools explicitly.
            Self::Custom => vec![],
        }
    }
}
/// Outcome (or in-flight state) of a sub-agent run as reported in `SubAgentResult`.
// NOTE(review): `Interrupted`/`Failed` presumably carry a human-readable reason such as
// `SUBAGENT_RESTART_REASON` — confirm against the code that constructs them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SubAgentStatus {
Running,
Completed,
Interrupted(String),
Failed(String),
Cancelled,
BudgetExhausted,
}
/// A question a sub-agent is waiting on; surfaced via `SubAgentResult::needs_input`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubAgentNeedsInput {
pub question: String,
}
/// Snapshot of a sub-agent run: identity, assignment, status and (if finished) its result.
///
/// Optional fields are omitted from the serialized form when empty. Fields marked
/// `#[serde(default)]` may be absent in stored data and then deserialize to their default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubAgentResult {
pub name: String,
pub agent_id: String,
pub context_mode: String,
pub fork_context: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workspace: Option<PathBuf>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub git_branch: Option<String>,
pub agent_type: SubAgentType,
pub assignment: SubAgentAssignment,
/// Model name; empty string when missing from stored data.
#[serde(default)]
pub model: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nickname: Option<String>,
pub status: SubAgentStatus,
/// Finer-grained worker lifecycle state, when known.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub worker_status: Option<AgentWorkerStatus>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_run_id: Option<String>,
/// Nesting depth of this agent; 0 when missing from stored data.
#[serde(default)]
pub spawn_depth: u32,
pub result: Option<String>,
pub steps_taken: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub checkpoint: Option<SubAgentCheckpoint>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub needs_input: Option<SubAgentNeedsInput>,
pub duration_ms: u64,
/// True for entries restored from an earlier session; only serialized when true.
#[serde(default, skip_serializing_if = "is_false")]
pub from_prior_session: bool,
}
/// Fine-grained lifecycle state of a worker record (serialized in snake_case).
/// `recommended_action_for_worker_status` maps each state to the parent's suggested next step.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AgentWorkerStatus {
Queued,
Starting,
Running,
WaitingForUser,
ModelWait,
RunningTool,
Completed,
Failed,
Cancelled,
Interrupted,
}
/// Tool access for a worker: inherit the parent's tools, or use an explicit list of tool names.
/// Converted to `ToolScope` by `worker_tool_scope`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AgentWorkerToolProfile {
Inherited,
Explicit(Vec<String>),
}
/// Immutable launch description of a worker (sub-agent).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentWorkerSpec {
pub worker_id: String,
/// Run identifier; an empty value means "same as `worker_id`" and is filled in by
/// `normalize_worker_spec`.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub run_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_run_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_name: Option<String>,
pub objective: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub role: Option<String>,
pub agent_type: SubAgentType,
pub model: String,
pub workspace: PathBuf,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub git_branch: Option<String>,
pub context_mode: String,
pub fork_context: bool,
pub tool_profile: AgentWorkerToolProfile,
/// Effective runtime profile. A default-valued profile (e.g. from older stored data) is
/// rebuilt from the other spec fields by `normalize_worker_record`.
#[serde(default)]
pub runtime_profile: WorkerRuntimeProfile,
pub max_steps: u32,
pub spawn_depth: u32,
pub max_spawn_depth: u32,
}
/// Record of the most recent follow-up message delivery attempt to a worker.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunFollowUpDelivery {
pub delivered: bool,
pub timestamp_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message_preview: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
#[serde(default, skip_serializing_if = "is_false")]
pub interrupt: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub continued_from_checkpoint: bool,
}
/// Where and how the parent can follow up with a worker.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunFollowUpTarget {
/// Tool to use for follow-up; defaults to `default_agent_inspect_tool()` (`"handle_read"`).
#[serde(default = "default_agent_inspect_tool")]
pub tool: String,
pub agent_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_name: Option<String>,
/// Status labels for which follow-up is accepted.
#[serde(default)]
pub accepted_statuses: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub latest_delivery: Option<AgentRunFollowUpDelivery>,
}
/// Describes whether and how the parent can take over a worker's session.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunTakeoverTarget {
#[serde(default = "default_subagent_takeover_kind")]
pub kind: String,
#[serde(default)]
pub supported: bool,
pub agent_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_name: Option<String>,
/// Human-readable instructions for the parent.
pub instructions: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub unsupported_reason: Option<String>,
}
/// Pointer to an artifact produced by a worker run (see `default_subagent_artifacts`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunArtifactRef {
pub kind: String,
pub name: String,
pub target: String,
pub description: String,
}
/// Token usage and budget ledger for a worker run.
///
/// `status` and `note` are derived by `refresh_usage_note`. The counters are optional and are
/// omitted from the serialized form when unknown.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunUsage {
pub status: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub input_tokens: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output_tokens: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub total_tokens: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub token_budget: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub budget_spent_tokens: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub budget_remaining_tokens: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub budget_scope: Option<String>,
pub note: String,
}
/// How far a worker's reported result has been verified (default: `"self_report_only"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunVerificationSummary {
pub status: String,
pub summary: String,
}
/// Suggested next step for the parent, optionally naming the tool to use.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunRecommendedAction {
pub action: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool: Option<String>,
pub reason: String,
}
/// One lifecycle event in a worker's history, ordered by `seq`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentWorkerEvent {
pub seq: u64,
pub worker_id: String,
pub status: AgentWorkerStatus,
pub timestamp_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub step: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_name: Option<String>,
}
/// Mutable ledger entry for a worker: its spec plus current status, metadata and event history.
///
/// Most auxiliary fields have serde defaults so that older stored records still deserialize;
/// `normalize_worker_record` then backfills anything left empty.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentWorkerRecord {
pub spec: AgentWorkerSpec,
#[serde(default = "default_subagent_actor_kind")]
pub actor_kind: String,
/// Copied from `spec.parent_run_id` when absent (see `normalize_worker_record`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_run_id: Option<String>,
#[serde(default = "default_agent_run_follow_up")]
pub follow_up: AgentRunFollowUpTarget,
#[serde(default = "default_agent_run_takeover")]
pub takeover: AgentRunTakeoverTarget,
#[serde(default)]
pub artifacts: Vec<AgentRunArtifactRef>,
#[serde(default = "default_agent_run_usage")]
pub usage: AgentRunUsage,
#[serde(default = "default_agent_run_verification")]
pub verification: AgentRunVerificationSummary,
#[serde(default = "default_agent_run_recommended_action")]
pub recommended_action: AgentRunRecommendedAction,
pub status: AgentWorkerStatus,
pub created_at_ms: u64,
pub updated_at_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub started_at_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub completed_at_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub latest_message: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub result_summary: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(default)]
pub steps_taken: u32,
/// Lifecycle event history.
// NOTE(review): presumably capped at MAX_AGENT_WORKER_EVENTS_PER_RECORD — confirm at the push site.
#[serde(default)]
pub events: VecDeque<AgentWorkerEvent>,
}
impl AgentWorkerRecord {
    /// Creates a fresh record for `spec` in the `Starting` state, timestamped with `now_ms`.
    ///
    /// Follow-up, takeover, artifact and recommended-action metadata are derived from the spec.
    /// Usage and verification start at their defaults, and the event history starts empty.
    fn new(spec: AgentWorkerSpec, now_ms: u64) -> Self {
        let initial_status = AgentWorkerStatus::Starting;
        let artifacts = default_subagent_artifacts(&agent_worker_run_id(&spec));
        let follow_up = follow_up_target_for_spec(&spec);
        let takeover = takeover_target_for_spec(&spec);
        let recommended_action = recommended_action_for_worker_status(initial_status, &spec);
        let parent_run_id = spec.parent_run_id.clone();
        Self {
            spec,
            actor_kind: default_subagent_actor_kind(),
            parent_run_id,
            follow_up,
            takeover,
            artifacts,
            usage: default_agent_run_usage(),
            verification: default_agent_run_verification(),
            recommended_action,
            status: initial_status,
            created_at_ms: now_ms,
            updated_at_ms: now_ms,
            started_at_ms: None,
            completed_at_ms: None,
            latest_message: None,
            result_summary: None,
            error: None,
            steps_taken: 0,
            events: VecDeque::new(),
        }
    }
}
/// Actor kind stamped on worker records; also the serde default when the field is missing.
fn default_subagent_actor_kind() -> String {
    String::from("subagent")
}
/// Name of the tool used to inspect a worker's transcript handle.
fn default_agent_inspect_tool() -> String {
    String::from("handle_read")
}
/// Takeover kind reported for sub-agent workers; also the serde default for `kind`.
fn default_subagent_takeover_kind() -> String {
    String::from("local_subagent_session")
}
/// Serde default for `AgentWorkerRecord::follow_up` when a stored record lacks the field.
///
/// The agent id is left empty; `normalize_worker_record` treats that as a signal to rebuild
/// the target from the record's spec.
fn default_agent_run_follow_up() -> AgentRunFollowUpTarget {
    let accepted_statuses = ["running", "interrupted_continuable"]
        .iter()
        .map(|status| status.to_string())
        .collect();
    AgentRunFollowUpTarget {
        tool: default_agent_inspect_tool(),
        agent_id: String::new(),
        session_name: None,
        accepted_statuses,
        latest_delivery: None,
    }
}
/// Serde default for `AgentWorkerRecord::takeover`: an unsupported placeholder for legacy
/// records. Its empty agent id causes `normalize_worker_record` to rebuild it from the spec.
fn default_agent_run_takeover() -> AgentRunTakeoverTarget {
    let instructions = "No takeover target is available for this older record.".to_string();
    let unsupported_reason = Some("legacy_record_missing_agent_id".to_string());
    AgentRunTakeoverTarget {
        kind: default_subagent_takeover_kind(),
        supported: false,
        agent_id: String::new(),
        session_name: None,
        instructions,
        unsupported_reason,
    }
}
/// Usage ledger with every counter unknown and status `"unknown"`.
fn default_agent_run_usage() -> AgentRunUsage {
    let note = "Token usage is not yet reported by the sub-agent worker ledger.".to_string();
    AgentRunUsage {
        status: "unknown".to_string(),
        note,
        input_tokens: None,
        output_tokens: None,
        total_tokens: None,
        token_budget: None,
        budget_spent_tokens: None,
        budget_remaining_tokens: None,
        budget_scope: None,
    }
}
/// Normalises a token budget: a budget of zero is treated the same as no budget.
fn positive_token_budget(budget: Option<u64>) -> Option<u64> {
    match budget {
        Some(limit) if limit > 0 => Some(limit),
        _ => None,
    }
}
/// Input plus output tokens from a provider `Usage`, widened to `u64` and saturating on overflow.
fn usage_total_tokens(usage: &Usage) -> u64 {
    let (input, output) = (u64::from(usage.input_tokens), u64::from(usage.output_tokens));
    input.saturating_add(output)
}
/// Recomputes `usage.status` and `usage.note` from the token counters.
///
/// With a budget, status is `budget_exhausted` when nothing remains, otherwise `reported` or
/// `tracking` depending on whether this worker reported any tokens. Missing spent/remaining
/// figures fall back to the worker total and `limit - spent` (saturating). Without a budget,
/// a non-zero worker total yields `reported`. A record with an empty status is reset to
/// `default_agent_run_usage()`; otherwise it is left untouched.
fn refresh_usage_note(usage: &mut AgentRunUsage) {
    let worker_total = usage.total_tokens.unwrap_or(0);
    let has_worker_tokens = worker_total > 0;
    match usage.token_budget {
        Some(limit) => {
            let spent = usage.budget_spent_tokens.unwrap_or(worker_total);
            let remaining = match usage.budget_remaining_tokens {
                Some(known) => known,
                None => limit.saturating_sub(spent),
            };
            let status = if remaining == 0 {
                "budget_exhausted"
            } else if has_worker_tokens {
                "reported"
            } else {
                "tracking"
            };
            usage.status = status.to_string();
            usage.note = if has_worker_tokens {
                format!(
                    "Token budget: {spent}/{limit} spent, {remaining} remaining. This worker reported {worker_total} tokens."
                )
            } else {
                format!("Token budget: {spent}/{limit} spent, {remaining} remaining.")
            };
        }
        None if has_worker_tokens => {
            usage.status = "reported".to_string();
            usage.note = format!("Provider reported {worker_total} tokens for this worker.");
        }
        None if usage.status.is_empty() => *usage = default_agent_run_usage(),
        None => {}
    }
}
/// Verification summary used until a verified receipt exists: the result is only a self-report.
fn default_agent_run_verification() -> AgentRunVerificationSummary {
    let summary = String::from(
        "No verified command or test receipt is attached; treat the result summary as a child self-report.",
    );
    AgentRunVerificationSummary {
        status: String::from("self_report_only"),
        summary,
    }
}
/// Serde default for `recommended_action`: suggest inspecting the transcript handle.
fn default_agent_run_recommended_action() -> AgentRunRecommendedAction {
    let reason =
        String::from("Inspect the returned transcript handle if the child result needs audit detail.");
    AgentRunRecommendedAction {
        action: String::from("inspect_transcript"),
        tool: Some(default_agent_inspect_tool()),
        reason,
    }
}
/// Maps a worker's status to the action the parent should take next.
///
/// Messages refer to the worker by its non-empty session name, falling back to the worker id.
/// Every status that points the parent at the transcript uses `default_agent_inspect_tool()`.
fn recommended_action_for_worker_status(
    status: AgentWorkerStatus,
    spec: &AgentWorkerSpec,
) -> AgentRunRecommendedAction {
    let agent_ref = match spec.session_name.as_deref() {
        Some(name) if !name.is_empty() => name,
        _ => spec.worker_id.as_str(),
    };
    let inspect = || Some(default_agent_inspect_tool());
    let (action, tool, reason) = match status {
        AgentWorkerStatus::Queued => (
            "continue_parent_work",
            None,
            format!(
                "Worker {agent_ref} is queued in the background; continue coordinating and consume its completion event when it arrives."
            ),
        ),
        AgentWorkerStatus::Starting
        | AgentWorkerStatus::Running
        | AgentWorkerStatus::ModelWait
        | AgentWorkerStatus::RunningTool => (
            "continue_parent_work",
            None,
            format!(
                "Worker {agent_ref} is active in the background; continue parent work until its completion event arrives."
            ),
        ),
        AgentWorkerStatus::WaitingForUser => (
            "inspect_or_replace",
            inspect(),
            format!(
                "Worker {agent_ref} needs parent action; inspect the transcript handle and open a replacement with agent if the task still matters."
            ),
        ),
        AgentWorkerStatus::Completed => (
            "verify_self_report",
            inspect(),
            format!(
                "Worker {agent_ref} completed; verify its self-report before treating side effects as fact."
            ),
        ),
        AgentWorkerStatus::Failed => (
            "inspect_failure",
            inspect(),
            format!(
                "Worker {agent_ref} failed; inspect the transcript handle and decide whether to open a replacement."
            ),
        ),
        AgentWorkerStatus::Cancelled => (
            "open_replacement_if_needed",
            Some("agent".to_string()),
            format!(
                "Worker {agent_ref} was cancelled; open a replacement with agent only if the assignment still matters."
            ),
        ),
        AgentWorkerStatus::Interrupted => (
            "inspect_or_replace",
            inspect(),
            format!(
                "Worker {agent_ref} was interrupted; inspect the transcript handle before deciding whether to re-dispatch."
            ),
        ),
    };
    AgentRunRecommendedAction {
        action: action.to_string(),
        tool,
        reason,
    }
}
/// Effective run id of a spec: `run_id` when set, otherwise the worker id.
fn agent_worker_run_id(spec: &AgentWorkerSpec) -> String {
    let id = if spec.run_id.is_empty() {
        &spec.worker_id
    } else {
        &spec.run_id
    };
    id.clone()
}
/// Follow-up target pointing at the worker described by `spec`, inspected via
/// `default_agent_inspect_tool()`.
fn follow_up_target_for_spec(spec: &AgentWorkerSpec) -> AgentRunFollowUpTarget {
    let accepted_statuses = ["running", "interrupted_continuable"]
        .iter()
        .map(|status| status.to_string())
        .collect();
    AgentRunFollowUpTarget {
        tool: default_agent_inspect_tool(),
        agent_id: spec.worker_id.clone(),
        session_name: spec.session_name.clone(),
        accepted_statuses,
        latest_delivery: None,
    }
}
/// Supported takeover target for `spec`. The instructions name the worker by its non-empty
/// session name, falling back to the worker id.
fn takeover_target_for_spec(spec: &AgentWorkerSpec) -> AgentRunTakeoverTarget {
    let agent_ref = match spec.session_name.as_deref() {
        Some(name) if !name.is_empty() => name,
        _ => spec.worker_id.as_str(),
    };
    let instructions = format!(
        "Inspect agent '{agent_ref}' through the returned transcript_handle with handle_read; open a replacement with agent if the lane no longer fits."
    );
    AgentRunTakeoverTarget {
        kind: default_subagent_takeover_kind(),
        supported: true,
        agent_id: spec.worker_id.clone(),
        session_name: spec.session_name.clone(),
        instructions,
        unsupported_reason: None,
    }
}
/// Standard artifact references for a run: its event log, its transcript handle
/// (`agent:{run_id}`), and its result-summary receipt.
fn default_subagent_artifacts(run_id: &str) -> Vec<AgentRunArtifactRef> {
    let artifact = |kind: &str, name: &str, target: String, description: &str| {
        AgentRunArtifactRef {
            kind: kind.to_string(),
            name: name.to_string(),
            target,
            description: description.to_string(),
        }
    };
    vec![
        artifact(
            "worker_events",
            "worker_record.events",
            run_id.to_string(),
            "Bounded structured lifecycle events retained on the worker record.",
        ),
        artifact(
            "transcript",
            "transcript_handle",
            format!("agent:{run_id}"),
            "Use the projection transcript_handle with handle_read for the child transcript.",
        ),
        artifact(
            "receipt",
            "result_summary",
            run_id.to_string(),
            "Child final summary when present; verify before treating as fact.",
        ),
    ]
}
/// Returns at most the first 120 characters of `text`, followed by `"..."` if anything was cut.
///
/// Lengths are counted in `char`s (Unicode scalar values), so multi-byte text is never split
/// mid-character. The input is scanned only far enough to decide truncation (one character past
/// the limit), instead of counting every character of a potentially very long message.
fn message_preview(text: &str) -> String {
    const MAX_PREVIEW_CHARS: usize = 120;
    let mut chars = text.chars();
    let mut preview: String = chars.by_ref().take(MAX_PREVIEW_CHARS).collect();
    // Any character left after the first MAX_PREVIEW_CHARS means the preview is truncated.
    if chars.next().is_some() {
        preview.push_str("...");
    }
    preview
}
fn normalize_worker_spec(mut spec: AgentWorkerSpec) -> AgentWorkerSpec {
if spec.run_id.is_empty() {
spec.run_id = spec.worker_id.clone();
}
spec
}
/// Converts a stored worker tool profile into the runtime `ToolScope`.
fn worker_tool_scope(tool_profile: &AgentWorkerToolProfile) -> ToolScope {
    match tool_profile {
        AgentWorkerToolProfile::Explicit(names) => ToolScope::Explicit(names.to_vec()),
        AgentWorkerToolProfile::Inherited => ToolScope::Inherit,
    }
}
/// Reconstructs a runtime profile from a stored spec.
///
/// Starts from the role defaults, then pins the spec's model, tool scope and remaining spawn
/// depth (`max_spawn_depth - spawn_depth`, saturating), and marks the profile as background.
fn worker_profile_from_spec(spec: &AgentWorkerSpec) -> WorkerRuntimeProfile {
    let remaining_depth = spec.max_spawn_depth.saturating_sub(spec.spawn_depth);
    let mut profile = WorkerRuntimeProfile::for_role(spec.agent_type.clone());
    profile.background = true;
    profile.max_spawn_depth = remaining_depth;
    profile.model = ModelRoute::Fixed(spec.model.clone());
    profile.tools = worker_tool_scope(&spec.tool_profile);
    profile
}
/// Builds the runtime profile for a child about to be spawned from `runtime`.
///
/// The requested profile uses the role defaults plus the given tool scope, the explicit model
/// route (or a fixed route to `effective_model`), the runtime's provider, and the remaining
/// spawn depth. It is then passed through the parent's `derive_child`.
fn worker_profile_for_spawn(
    runtime: &SubAgentRuntime,
    agent_type: &SubAgentType,
    tool_profile: &AgentWorkerToolProfile,
    effective_model: &str,
    model_route: Option<ModelRoute>,
) -> WorkerRuntimeProfile {
    let route = match model_route {
        Some(route) => route,
        None => ModelRoute::Fixed(effective_model.to_string()),
    };
    let provider = runtime.client.api_provider().as_str().to_string();
    let remaining_depth = runtime.max_spawn_depth.saturating_sub(runtime.spawn_depth);

    let mut requested = WorkerRuntimeProfile::for_role(agent_type.clone());
    requested.tools = worker_tool_scope(tool_profile);
    requested.model = route;
    requested.provider = Some(provider);
    requested.max_spawn_depth = remaining_depth;
    requested.background = true;
    runtime.worker_profile.derive_child(&requested)
}
/// Upgrades a loaded worker record so every derived field is populated and current.
///
/// Normalises the spec, rebuilds a default runtime profile from the spec, and backfills an
/// empty actor kind, a missing parent run id, and follow-up/takeover targets. A takeover target
/// is rebuilt if its agent id is empty or its instructions do not mention the inspect tool. The
/// follow-up tool is always reset to the inspect tool. The recommended action is recomputed
/// from the current status. Empty artifacts, usage and verification entries get their
/// defaults, and a non-empty usage entry has its status/note refreshed.
fn normalize_worker_record(record: AgentWorkerRecord) -> AgentWorkerRecord {
    let mut rec = record;
    rec.spec = normalize_worker_spec(rec.spec);
    if rec.spec.runtime_profile == WorkerRuntimeProfile::default() {
        rec.spec.runtime_profile = worker_profile_from_spec(&rec.spec);
    }
    let run_id = agent_worker_run_id(&rec.spec);
    let inspect_tool = default_agent_inspect_tool();

    if rec.actor_kind.is_empty() {
        rec.actor_kind = default_subagent_actor_kind();
    }
    if rec.parent_run_id.is_none() {
        rec.parent_run_id = rec.spec.parent_run_id.clone();
    }

    if rec.follow_up.agent_id.is_empty() {
        rec.follow_up = follow_up_target_for_spec(&rec.spec);
    } else {
        rec.follow_up.tool = inspect_tool.clone();
    }

    let takeover_is_current = !rec.takeover.agent_id.is_empty()
        && rec.takeover.instructions.contains(inspect_tool.as_str());
    if !takeover_is_current {
        rec.takeover = takeover_target_for_spec(&rec.spec);
    }

    rec.recommended_action = recommended_action_for_worker_status(rec.status, &rec.spec);

    if rec.artifacts.is_empty() {
        rec.artifacts = default_subagent_artifacts(&run_id);
    }
    match rec.usage.status.is_empty() {
        true => rec.usage = default_agent_run_usage(),
        false => refresh_usage_note(&mut rec.usage),
    }
    if rec.verification.status.is_empty() {
        rec.verification = default_agent_run_verification();
    }
    rec
}
/// `skip_serializing_if` predicate: true when the flag is unset. Takes `&bool` because serde
/// passes the field by reference.
fn is_false(b: &bool) -> bool {
    !b
}
/// Returns the checked-out branch name of `workspace`.
///
/// On a detached HEAD, returns `"detached:<short-hash>"`. Returns `None` when git fails or
/// prints nothing.
fn current_git_branch(workspace: &Path) -> Option<String> {
    let abbrev = run_git(workspace, &["rev-parse", "--abbrev-ref", "HEAD"])?;
    match abbrev.trim() {
        "" => None,
        "HEAD" => {
            let short = run_git(workspace, &["rev-parse", "--short", "HEAD"])?;
            let short = short.trim();
            if short.is_empty() {
                None
            } else {
                Some(format!("detached:{short}"))
            }
        }
        name => Some(name.to_owned()),
    }
}
/// Runs `git <args>` in `workspace` and returns its stdout (lossily decoded as UTF-8).
/// Returns `None` when git cannot be run or exits unsuccessfully.
fn run_git(workspace: &Path, args: &[&str]) -> Option<String> {
    match Git::output(args, workspace) {
        Ok(out) if out.status.success() => Some(String::from_utf8_lossy(&out.stdout).into_owned()),
        _ => None,
    }
}
/// Optional knobs for spawning a sub-agent; every field defaults to "unset".
#[derive(Debug, Clone, Default)]
pub(crate) struct SubAgentSpawnOptions {
pub name: Option<String>,
pub model: Option<String>,
pub model_route: Option<ModelRoute>,
pub nickname: Option<String>,
pub fork_context: bool,
// NOTE(review): `positive_token_budget` suggests a zero budget is treated as "no budget" — confirm for this field.
pub token_budget: Option<u64>,
}
/// Requested model strength for a child: the parent's model (`Same`) or a faster one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SubAgentModelStrength {
Same,
Faster,
}
impl SubAgentModelStrength {
    /// Parses a `model_strength` argument (ASCII case-insensitive, surrounding whitespace ignored).
    ///
    /// # Errors
    /// Returns an invalid-input `ToolError` for anything outside the accepted aliases.
    fn parse(value: &str) -> Result<Self, ToolError> {
        const SAME_ALIASES: &[&str] = &["same", "inherit", "parent", "current"];
        const FASTER_ALIASES: &[&str] =
            &["faster", "fast", "smaller", "small", "lower", "cheap", "flash"];
        let key = value.trim().to_ascii_lowercase();
        if SAME_ALIASES.contains(&key.as_str()) {
            Ok(Self::Same)
        } else if FASTER_ALIASES.contains(&key.as_str()) {
            Ok(Self::Faster)
        } else {
            Err(ToolError::invalid_input(
                "model_strength must be one of: same, faster".to_string(),
            ))
        }
    }

    /// Model route corresponding to this strength.
    fn model_route(self) -> ModelRoute {
        if self == Self::Faster {
            ModelRoute::Faster
        } else {
            ModelRoute::Inherit
        }
    }
}
/// Reasoning setting for a child: inherit the parent's, let it be chosen automatically, or
/// force a specific `ReasoningEffort`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SubAgentThinking {
Inherit,
Auto,
Effort(ReasoningEffort),
}
impl SubAgentThinking {
    /// Parses a `thinking` argument (ASCII case-insensitive, surrounding whitespace ignored).
    ///
    /// # Errors
    /// Returns an invalid-input `ToolError` for values outside the accepted vocabulary.
    fn parse(value: &str) -> Result<Self, ToolError> {
        let key = value.trim().to_ascii_lowercase();
        let effort = match key.as_str() {
            "inherit" | "parent" | "same" | "current" => return Ok(Self::Inherit),
            "auto" | "automatic" => return Ok(Self::Auto),
            "off" | "disabled" | "none" | "false" => ReasoningEffort::Off,
            "low" | "minimal" => ReasoningEffort::Low,
            "medium" | "mid" => ReasoningEffort::Medium,
            "high" => ReasoningEffort::High,
            "max" | "maximum" | "xhigh" | "ultracode" => ReasoningEffort::Max,
            _ => {
                return Err(ToolError::invalid_input(
                    "thinking must be one of: inherit, auto, off, low, medium, high, max"
                        .to_string(),
                ));
            }
        };
        Ok(Self::Effort(effort))
    }
}
/// A follow-up message sent to a running sub-agent over its input channel
/// (`SubAgent::input_tx`).
#[derive(Debug, Clone)]
struct SubAgentInput {
/// Message text to deliver.
text: String,
/// Interrupt flag for this input.
/// NOTE(review): how it is honoured lives in the task loop outside this chunk — confirm.
interrupt: bool,
}
/// Parameters describing one sub-agent spawn request.
/// NOTE(review): not constructed in this chunk; presumably built from tool-call
/// arguments — confirm against the caller.
#[derive(Debug, Clone)]
struct SpawnRequest {
session_name: Option<String>,
prompt: String,
agent_type: SubAgentType,
assignment: SubAgentAssignment,
/// Explicit tool allow-list, if any.
allowed_tools: Option<Vec<String>>,
/// Explicit model override, if any.
model: Option<String>,
/// Requested model strength (see `SubAgentModelStrength::parse`).
model_strength: SubAgentModelStrength,
/// Requested reasoning setting (see `SubAgentThinking::parse`).
thinking: SubAgentThinking,
cwd: Option<PathBuf>,
/// Optional git-worktree placement for the agent.
worktree: Option<SubAgentWorktreeRequest>,
resident_file: Option<String>,
fork_context: bool,
max_depth: Option<u32>,
token_budget: Option<u64>,
}
/// Requested git worktree settings; each field is optional.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SubAgentWorktreeRequest {
branch: Option<String>,
path: Option<PathBuf>,
base_ref: Option<String>,
}
/// Snapshot of a token-budget scope, taken when a spawn is admitted
/// (built by `SubAgentManager::resolve_spawn_budget_scope`).
#[derive(Debug, Clone, PartialEq, Eq)]
struct AgentUsageBudgetScope {
/// Id shared by all worker records drawing from the same budget.
scope_id: String,
/// Total token limit for the scope.
limit: u64,
/// Sum of `total_tokens` over worker records already in the scope.
spent: u64,
/// `limit - spent`, saturating at zero.
remaining: u64,
}
/// Serializable checkpoint of a sub-agent's progress, stored on `SubAgent` and
/// persisted with it.
/// NOTE(review): the continuation semantics of `continuation_handle`/`continuable`
/// are defined outside this chunk — confirm before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SubAgentCheckpoint {
pub checkpoint_id: String,
pub agent_id: String,
pub continuation_handle: String,
pub reason: String,
pub continuable: bool,
pub steps_taken: u32,
pub message_count: usize,
/// Creation time in epoch milliseconds (assumed; verify against the producer).
pub created_at_ms: u64,
/// Conversation messages. Omitted from JSON when empty, and defaulted when absent.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub messages: Vec<Message>,
}
/// On-disk form of a `SubAgent`. Written by `SubAgentManager::persist_state` and
/// read back by `SubAgentManager::load_state`.
///
/// Fields marked `#[serde(default)]` may be missing from older state files.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedSubAgent {
id: String,
/// Session name. Blank or missing values fall back to `id` on load.
#[serde(default, skip_serializing_if = "Option::is_none")]
session_name: Option<String>,
#[serde(default)]
fork_context: bool,
/// Agent workspace. Missing values fall back to the manager workspace on load.
#[serde(default, skip_serializing_if = "Option::is_none")]
workspace: Option<PathBuf>,
agent_type: SubAgentType,
prompt: String,
assignment: SubAgentAssignment,
/// Model name. Empty values load as "unknown".
#[serde(default)]
model: String,
#[serde(default)]
nickname: Option<String>,
/// Status at save time. `Running` is rewritten to `Interrupted` on load.
status: SubAgentStatus,
result: Option<String>,
steps_taken: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
checkpoint: Option<SubAgentCheckpoint>,
#[serde(default, skip_serializing_if = "Option::is_none")]
needs_input: Option<SubAgentNeedsInput>,
/// Elapsed run time at save, in milliseconds.
duration_ms: u64,
/// Tool allow-list. An empty list means "no explicit list" (`None`) on load.
allowed_tools: Vec<String>,
/// Epoch milliseconds when this snapshot was written.
updated_at_ms: u64,
/// Boot id of the session that owned the agent. An empty value counts as a prior session.
#[serde(default)]
session_boot_id: String,
}
/// Top-level persisted state file. `load_state` rejects files whose
/// `schema_version` differs from `SUBAGENT_STATE_SCHEMA_VERSION`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedSubAgentState {
schema_version: u32,
agents: Vec<PersistedSubAgent>,
/// Worker records. Omitted when empty, and defaulted when absent.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
workers: Vec<AgentWorkerRecord>,
}
impl Default for PersistedSubAgentState {
    /// An empty state stamped with the current schema version.
    fn default() -> Self {
        let schema_version = SUBAGENT_STATE_SCHEMA_VERSION;
        Self {
            schema_version,
            agents: Vec::default(),
            workers: Vec::default(),
        }
    }
}
/// Default nesting limit for sub-agent spawning, taken from `codewhale_config`.
pub const DEFAULT_MAX_SPAWN_DEPTH: u32 = codewhale_config::DEFAULT_SPAWN_DEPTH;
/// Completion notice delivered to a parent via
/// `SubAgentRuntime::parent_completion_tx`.
#[derive(Debug, Clone)]
pub struct SubAgentCompletion {
#[allow(dead_code)]
pub agent_id: String,
/// Completion payload text (format defined by the sender — confirm).
pub payload: String,
}
/// Parent conversation context copied into a forked sub-agent.
#[derive(Clone, Debug)]
pub struct SubAgentForkContext {
pub system: Option<SystemPrompt>,
pub messages: Vec<Message>,
pub structured_state_block: Option<String>,
}
/// Everything a sub-agent task needs to run: API client, model selection, tool
/// context, event/mailbox plumbing, and spawn-depth accounting.
#[derive(Clone)]
pub struct SubAgentRuntime {
pub client: DeepSeekClient,
/// Model used by the agent. Overridden per spawn by `SubAgentSpawnOptions::model`.
pub model: String,
pub auto_model: bool,
pub reasoning_effort: Option<String>,
pub reasoning_effort_auto: bool,
/// Per-role model overrides (keys presumably role names — confirm).
pub role_models: HashMap<String, String>,
pub context: ToolContext,
/// Forwarded to `build_allowed_tools` when a spawn resolves its tool list.
pub allow_shell: bool,
/// Worker profile. Replaced at spawn time by `worker_profile_for_spawn`.
pub worker_profile: WorkerRuntimeProfile,
/// Optional UI/event sink. `AgentSpawned` is sent here with `try_send`.
pub event_tx: Option<mpsc::Sender<Event>>,
pub manager: SharedSubAgentManager,
/// Nesting depth: 0 for a top-level runtime, +1 per `child_runtime`.
pub spawn_depth: u32,
/// Parent agent id. Used as `parent_run_id` for spawned workers.
pub parent_agent_id: Option<String>,
/// Depth limit checked by `would_exceed_depth`.
pub max_spawn_depth: u32,
pub cancel_token: CancellationToken,
pub mailbox: Option<Mailbox>,
pub parent_completion_tx: Option<mpsc::UnboundedSender<SubAgentCompletion>>,
pub fork_context: Option<SubAgentForkContext>,
pub mcp_pool: Option<std::sync::Arc<tokio::sync::Mutex<crate::mcp::McpPool>>>,
pub step_api_timeout: Duration,
pub speech_output_dir: Option<PathBuf>,
/// Todo list shared with children (cloned handle in `child_runtime`).
pub todos: SharedTodoList,
}
impl SubAgentRuntime {
    /// Builds a top-level runtime (spawn depth 0) with defaults:
    /// - general-purpose worker profile;
    /// - `DEFAULT_MAX_SPAWN_DEPTH` and `DEFAULT_STEP_API_TIMEOUT`;
    /// - a fresh cancellation token;
    /// - a new empty todo list.
    #[must_use]
    pub fn new(
        client: DeepSeekClient,
        model: String,
        context: ToolContext,
        allow_shell: bool,
        event_tx: Option<mpsc::Sender<Event>>,
        manager: SharedSubAgentManager,
    ) -> Self {
        Self {
            client,
            model,
            auto_model: false,
            reasoning_effort: None,
            reasoning_effort_auto: false,
            role_models: HashMap::new(),
            context,
            allow_shell,
            worker_profile: WorkerRuntimeProfile::for_role(SubAgentType::General),
            event_tx,
            manager,
            spawn_depth: 0,
            parent_agent_id: None,
            max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
            cancel_token: CancellationToken::new(),
            mailbox: None,
            parent_completion_tx: None,
            fork_context: None,
            mcp_pool: None,
            step_api_timeout: DEFAULT_STEP_API_TIMEOUT,
            speech_output_dir: None,
            todos: crate::tools::todo::new_shared_todo_list(),
        }
    }

    /// Replaces the shared todo list.
    #[must_use]
    pub fn with_todos(mut self, todos: SharedTodoList) -> Self {
        self.todos = todos;
        self
    }

    /// Sets (or clears) the MCP connection pool.
    #[must_use]
    pub fn with_mcp_pool(
        mut self,
        pool: Option<std::sync::Arc<tokio::sync::Mutex<crate::mcp::McpPool>>>,
    ) -> Self {
        self.mcp_pool = pool;
        self
    }

    /// Sets the per-step API timeout.
    #[must_use]
    pub fn with_step_api_timeout(mut self, timeout: Duration) -> Self {
        self.step_api_timeout = timeout;
        self
    }

    /// Sets (or clears) the speech output directory.
    #[must_use]
    pub fn with_speech_output_dir(mut self, output_dir: Option<PathBuf>) -> Self {
        self.speech_output_dir = output_dir;
        self
    }

    /// Installs the channel used to notify the parent of completions.
    #[must_use]
    pub fn with_parent_completion_tx(
        mut self,
        tx: mpsc::UnboundedSender<SubAgentCompletion>,
    ) -> Self {
        self.parent_completion_tx = Some(tx);
        self
    }

    /// Installs the parent context that a forked agent starts from.
    #[must_use]
    pub fn with_fork_context(mut self, context: SubAgentForkContext) -> Self {
        self.fork_context = Some(context);
        self
    }

    /// Installs a mailbox.
    #[must_use]
    #[allow(dead_code)]
    pub fn with_mailbox(mut self, mailbox: Mailbox) -> Self {
        self.mailbox = Some(mailbox);
        self
    }

    /// Replaces the runtime's cancellation token.
    #[must_use]
    #[allow(dead_code)]
    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
        self.cancel_token = token;
        self
    }

    /// Overrides the maximum spawn depth.
    #[must_use]
    #[allow(dead_code)]
    pub fn with_max_spawn_depth(mut self, max: u32) -> Self {
        self.max_spawn_depth = max;
        self
    }

    /// Replaces the per-role model overrides.
    #[must_use]
    pub fn with_role_models(mut self, role_models: HashMap<String, String>) -> Self {
        self.role_models = role_models;
        self
    }

    /// Enables or disables automatic model selection.
    #[must_use]
    pub fn with_auto_model(mut self, auto_model: bool) -> Self {
        self.auto_model = auto_model;
        self
    }

    /// Sets the reasoning-effort string and whether it is chosen automatically.
    #[must_use]
    pub fn with_reasoning_effort(
        mut self,
        reasoning_effort: Option<String>,
        reasoning_effort_auto: bool,
    ) -> Self {
        self.reasoning_effort = reasoning_effort;
        self.reasoning_effort_auto = reasoning_effort_auto;
        self
    }

    /// Derives a runtime for a background agent.
    ///
    /// Works like [`Self::child_runtime`], but installs a brand-new root
    /// cancellation token, not linked to this runtime's token, on both the
    /// runtime and its tool context.
    #[must_use]
    pub fn background_runtime(&self) -> Self {
        let mut runtime = self.child_runtime();
        let token = CancellationToken::new();
        runtime.cancel_token = token.clone();
        runtime.context.cancel_token = Some(token);
        runtime
    }

    /// Derives a runtime one spawn level deeper.
    ///
    /// Shared handles (client, manager, event sender, mailbox, todos, …) are cloned.
    /// The cancellation token becomes a child token of this runtime's token.
    #[must_use]
    pub fn child_runtime(&self) -> Self {
        let mut child_context = self.context.clone();
        // NOTE(review): redundant if `ToolContext::clone` is derived; kept in case a
        // custom `Clone` resets approval state.
        child_context.auto_approve = self.context.auto_approve;
        Self {
            client: self.client.clone(),
            model: self.model.clone(),
            auto_model: self.auto_model,
            reasoning_effort: self.reasoning_effort.clone(),
            reasoning_effort_auto: self.reasoning_effort_auto,
            role_models: self.role_models.clone(),
            context: child_context,
            allow_shell: self.allow_shell,
            worker_profile: self.worker_profile.clone(),
            event_tx: self.event_tx.clone(),
            manager: self.manager.clone(),
            // Saturate instead of overflowing at the u32 ceiling.
            spawn_depth: self.spawn_depth.saturating_add(1),
            parent_agent_id: self.parent_agent_id.clone(),
            max_spawn_depth: self.max_spawn_depth,
            cancel_token: self.cancel_token.child_token(),
            mailbox: self.mailbox.clone(),
            parent_completion_tx: self.parent_completion_tx.clone(),
            fork_context: self.fork_context.clone(),
            mcp_pool: self.mcp_pool.clone(),
            step_api_timeout: self.step_api_timeout,
            speech_output_dir: self.speech_output_dir.clone(),
            todos: self.todos.clone(),
        }
    }

    /// Returns `true` when a child of this runtime would be nested deeper than
    /// `max_spawn_depth`.
    ///
    /// Equivalent to `spawn_depth + 1 > max_spawn_depth`. It is written as `>=` so
    /// that `spawn_depth == u32::MAX` cannot overflow (which would panic in debug
    /// builds and wrap to "not exceeded" in release builds).
    #[must_use]
    pub fn would_exceed_depth(&self) -> bool {
        self.spawn_depth >= self.max_spawn_depth
    }
}
/// In-memory record of one sub-agent tracked by `SubAgentManager`.
pub struct SubAgent {
pub id: String,
/// Human-facing session name. Defaults to `id`; must be unique among tracked agents.
pub session_name: String,
pub fork_context: bool,
pub agent_type: SubAgentType,
pub prompt: String,
pub assignment: SubAgentAssignment,
pub model: String,
pub nickname: Option<String>,
pub status: SubAgentStatus,
pub result: Option<String>,
pub steps_taken: u32,
pub checkpoint: Option<SubAgentCheckpoint>,
pub needs_input: Option<SubAgentNeedsInput>,
pub started_at: Instant,
/// Refreshed by `SubAgentManager::touch`. Used for the running-heartbeat timeout.
pub last_activity_at: Instant,
pub allowed_tools: Option<Vec<String>>,
/// Boot id of the manager session that created the agent.
pub session_boot_id: String,
pub workspace: PathBuf,
/// Sender for follow-up inputs. `None` after cancellation or when restored from disk.
input_tx: Option<mpsc::UnboundedSender<SubAgentInput>>,
/// Handle of the spawned task. Aborted on cancel; `None` for agents restored from disk.
task_handle: Option<JoinHandle<()>>,
}
impl SubAgent {
    /// Creates a freshly started, `Running` agent.
    ///
    /// The session name defaults to `id`, and start/last-activity times are set
    /// to now. No task handle is attached yet.
    #[allow(clippy::too_many_arguments)]
    fn new(
        id: String,
        agent_type: SubAgentType,
        prompt: String,
        assignment: SubAgentAssignment,
        model: String,
        nickname: Option<String>,
        allowed_tools: Option<Vec<String>>,
        input_tx: mpsc::UnboundedSender<SubAgentInput>,
        workspace: PathBuf,
        session_boot_id: String,
    ) -> Self {
        let now = Instant::now();
        Self {
            session_name: id.clone(),
            id,
            fork_context: false,
            agent_type,
            prompt,
            assignment,
            model,
            nickname,
            status: SubAgentStatus::Running,
            result: None,
            steps_taken: 0,
            checkpoint: None,
            needs_input: None,
            started_at: now,
            last_activity_at: now,
            allowed_tools,
            session_boot_id,
            workspace,
            input_tx: Some(input_tx),
            task_handle: None,
        }
    }

    /// Builds a `SubAgentResult` describing the agent's current state.
    ///
    /// Runs `git` in the workspace to report the current branch.
    #[must_use]
    pub fn snapshot(&self) -> SubAgentResult {
        let mode = if self.fork_context { "forked" } else { "fresh" };
        SubAgentResult {
            name: self.session_name.clone(),
            agent_id: self.id.clone(),
            context_mode: String::from(mode),
            fork_context: self.fork_context,
            workspace: Some(self.workspace.clone()),
            git_branch: current_git_branch(&self.workspace),
            agent_type: self.agent_type.clone(),
            assignment: self.assignment.clone(),
            model: self.model.clone(),
            nickname: self.nickname.clone(),
            status: self.status.clone(),
            worker_status: None,
            parent_run_id: None,
            spawn_depth: 0,
            result: self.result.clone(),
            steps_taken: self.steps_taken,
            checkpoint: self.checkpoint.clone(),
            needs_input: self.needs_input.clone(),
            // Saturate rather than truncate if elapsed millis exceed u64.
            duration_ms: u64::try_from(self.started_at.elapsed().as_millis())
                .unwrap_or(u64::MAX),
            from_prior_session: false,
        }
    }
}
/// Tracks the sub-agents spawned in this session and their worker records.
/// Enforces admission, launch-concurrency, heartbeat, and token-budget limits.
pub struct SubAgentManager {
/// Agents keyed by agent id.
agents: HashMap<String, SubAgent>,
/// Worker records keyed by worker id. Pruned to `MAX_AGENT_WORKER_RECORDS`.
worker_records: HashMap<String, AgentWorkerRecord>,
/// Last sequence number handed to a worker event. On load, restored as the max seen.
worker_event_seq: u64,
/// Fallback workspace for persisted agents that recorded none.
/// NOTE(review): `load_state` reads this field, so the `dead_code` allow looks stale.
#[allow(dead_code)] workspace: PathBuf,
/// Location of the persisted state file. `None` disables persistence.
state_path: Option<PathBuf>,
/// Step limit passed to every spawned task.
max_steps: u32,
/// Upper bound for launch concurrency and lower bound for `max_admitted_agents`.
max_agents: usize,
/// Maximum number of agents that may hold an admission slot at once.
max_admitted_agents: usize,
/// Budget used when neither the request nor a parent scope supplies one.
default_token_budget: Option<u64>,
/// Running agents idle longer than this stop counting as admitted.
running_heartbeat_timeout: Duration,
/// Random id for this manager instance. Agents with another id belong to a prior session.
current_session_boot_id: String,
/// Semaphore handed to depth-1 tasks (presumably to bound concurrent launches).
launch_gate: Arc<Semaphore>,
/// Time of the last debounced persist.
last_persist_at: Option<Instant>,
/// Set when a debounced persist was skipped and still needs flushing.
persist_pending: bool,
}
impl SubAgentManager {
/// Creates an empty manager for `workspace` with no persistence path.
///
/// Admission and launch concurrency both start at `max_agents`; the launch
/// gate is floored at one permit.
#[must_use]
pub fn new(workspace: PathBuf, max_agents: usize) -> Self {
    let heartbeat = Duration::from_secs(crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS);
    let boot_uuid = Uuid::new_v4().to_string();
    let current_session_boot_id = format!("boot_{}", &boot_uuid[..12]);
    let launch_gate = Arc::new(Semaphore::new(max_agents.max(1)));
    Self {
        agents: HashMap::new(),
        worker_records: HashMap::new(),
        worker_event_seq: 0,
        workspace,
        state_path: None,
        max_steps: DEFAULT_MAX_STEPS,
        max_agents,
        max_admitted_agents: max_agents,
        default_token_budget: None,
        running_heartbeat_timeout: heartbeat,
        current_session_boot_id,
        launch_gate,
        last_persist_at: None,
        persist_pending: false,
    }
}
/// Rebuilds the launch gate with `limit` permits, clamped to `1..=max(max_agents, 1)`.
///
/// `usize::clamp` panics when `min > max`. Because `new` accepts
/// `max_agents == 0`, the upper bound is floored at 1, mirroring the
/// `max_agents.max(1)` that `new` uses for the initial gate.
#[must_use]
pub fn with_launch_concurrency(mut self, limit: usize) -> Self {
    let ceiling = self.max_agents.max(1);
    self.launch_gate = Arc::new(Semaphore::new(limit.clamp(1, ceiling)));
    self
}
/// Sets how many agents may hold an admission slot at once.
///
/// The value is clamped to `[max_agents, MAX_SUBAGENT_ADMISSION]`. The floor is
/// capped at the ceiling because `new` does not bound `max_agents`, and
/// `usize::clamp` panics when `min > max`. In that case the result is the hard cap.
#[must_use]
pub fn with_admission_limit(mut self, max_admitted: usize) -> Self {
    let ceiling = crate::config::MAX_SUBAGENT_ADMISSION;
    let floor = self.max_agents.min(ceiling);
    self.max_admitted_agents = max_admitted.clamp(floor, ceiling);
    self
}
/// Sets the fallback token budget, normalised through `positive_token_budget`.
#[must_use]
pub fn with_default_token_budget(mut self, budget: Option<u64>) -> Self {
    let normalized = positive_token_budget(budget);
    self.default_token_budget = normalized;
    self
}
/// Test-only accessor for this manager's boot id.
#[cfg(test)]
pub fn session_boot_id(&self) -> &str {
    self.current_session_boot_id.as_str()
}
/// Returns `true` if `agent` was not created by this manager instance: its boot
/// id is empty (legacy state) or differs from the current one.
fn is_from_prior_session(&self, agent: &SubAgent) -> bool {
    match agent.session_boot_id.as_str() {
        "" => true,
        boot_id => boot_id != self.current_session_boot_id,
    }
}
/// Enables persistence to the state file at `path`.
#[must_use]
fn with_state_path(mut self, path: PathBuf) -> Self {
    let state_path = Some(path);
    self.state_path = state_path;
    self
}
/// Sets the idle timeout after which a running agent stops counting as
/// admitted. A zero duration selects the configured default.
#[must_use]
pub fn with_running_heartbeat_timeout(mut self, timeout: Duration) -> Self {
    self.running_heartbeat_timeout = Some(timeout)
        .filter(|candidate| !candidate.is_zero())
        .unwrap_or_else(|| {
            Duration::from_secs(crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS)
        });
    self
}
/// Applies new runtime limits (e.g. after a config reload).
///
/// - `max_agents` is clamped to `1..=MAX_SUBAGENTS`.
/// - `max_admitted_agents` is clamped to `[max_agents, MAX_SUBAGENT_ADMISSION]`.
///   The floor is capped at the ceiling so the clamp cannot panic.
/// - `default_token_budget` is normalised through `positive_token_budget`.
/// - A zero heartbeat timeout selects the configured default.
/// - The launch gate is rebuilt only while no agent holds an admission slot
///   (presumably so in-flight tasks keep using the semaphore they acquired from).
///
/// Returns `true` if the launch gate was rebuilt with `launch_concurrency`.
pub fn update_runtime_limits(
    &mut self,
    max_agents: usize,
    max_admitted_agents: usize,
    running_heartbeat_timeout: Duration,
    launch_concurrency: usize,
    default_token_budget: Option<u64>,
) -> bool {
    self.max_agents = max_agents.clamp(1, crate::config::MAX_SUBAGENTS);
    let admission_ceiling = crate::config::MAX_SUBAGENT_ADMISSION;
    self.max_admitted_agents =
        max_admitted_agents.clamp(self.max_agents.min(admission_ceiling), admission_ceiling);
    self.default_token_budget = positive_token_budget(default_token_budget);
    self.running_heartbeat_timeout = if running_heartbeat_timeout.is_zero() {
        Duration::from_secs(crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS)
    } else {
        running_heartbeat_timeout
    };
    if self.running_count() == 0 {
        self.launch_gate =
            Arc::new(Semaphore::new(launch_concurrency.clamp(1, self.max_agents)));
        true
    } else {
        false
    }
}
fn persist_state(&self) -> Result<()> {
let Some(path) = self.state_path.as_ref() else {
return Ok(());
};
let now_ms = epoch_millis_now();
let mut agents = Vec::with_capacity(self.agents.len());
for agent in self.agents.values() {
agents.push(PersistedSubAgent {
id: agent.id.clone(),
session_name: Some(agent.session_name.clone()),
fork_context: agent.fork_context,
workspace: Some(agent.workspace.clone()),
agent_type: agent.agent_type.clone(),
prompt: agent.prompt.clone(),
assignment: agent.assignment.clone(),
model: agent.model.clone(),
nickname: agent.nickname.clone(),
status: agent.status.clone(),
result: agent.result.clone(),
steps_taken: agent.steps_taken,
checkpoint: agent.checkpoint.clone(),
needs_input: agent.needs_input.clone(),
duration_ms: u64::try_from(agent.started_at.elapsed().as_millis())
.unwrap_or(u64::MAX),
allowed_tools: agent.allowed_tools.clone().unwrap_or_default(),
updated_at_ms: now_ms,
session_boot_id: agent.session_boot_id.clone(),
});
}
agents.sort_by(|a, b| a.id.cmp(&b.id));
let payload = PersistedSubAgentState {
schema_version: SUBAGENT_STATE_SCHEMA_VERSION,
agents,
workers: self.sorted_worker_records(),
};
write_json_atomic(path, &payload)
}
/// Persists state, logging any failure instead of returning it.
fn persist_state_best_effort(&self) {
    let Err(err) = self.persist_state() else {
        return;
    };
    tracing::warn!(target: "subagent", ?err, "failed to persist sub-agent state");
}
/// Persists state at most once per `SUBAGENT_PERSIST_DEBOUNCE`.
///
/// A call inside the debounce window is not written. It marks
/// `persist_pending` (flushed later by `flush_pending_persist`) and bumps the
/// skipped counter. Writes bump the write counter and, when perf logging is
/// enabled, log both counters.
fn persist_state_debounced(&mut self) {
    use std::sync::atomic::Ordering;
    let now = Instant::now();
    let due = self
        .last_persist_at
        .is_none_or(|last| now.duration_since(last) >= SUBAGENT_PERSIST_DEBOUNCE);
    if !due {
        self.persist_pending = true;
        SUBAGENT_PERSIST_SKIPPED.fetch_add(1, Ordering::Relaxed);
        return;
    }
    self.last_persist_at = Some(now);
    self.persist_pending = false;
    self.persist_state_best_effort();
    let writes = SUBAGENT_PERSIST_WRITES.fetch_add(1, Ordering::Relaxed) + 1;
    if !subagent_perf_enabled() {
        return;
    }
    let skipped = SUBAGENT_PERSIST_SKIPPED.load(Ordering::Relaxed);
    tracing::info!(
        target: "subagent_perf",
        writes,
        skipped,
        agents = self.agents.len(),
        "checkpoint persist (debounced write)"
    );
}
/// Writes state now if a debounced persist was skipped earlier, and resets the
/// debounce clock.
pub fn flush_pending_persist(&mut self) {
    if !self.persist_pending {
        return;
    }
    self.persist_pending = false;
    self.last_persist_at = Some(Instant::now());
    self.persist_state_best_effort();
}
/// Replaces in-memory agents and worker records with those in the state file.
///
/// - Missing path or file: no-op.
/// - Agents saved as `Running` come back `Interrupted` (their tasks no longer
///   exist) and have no input channel or task handle.
/// - The worker event sequence continues from the highest persisted `seq`.
/// - Budget scopes are recomputed and worker records pruned afterwards.
///
/// # Errors
/// Fails on I/O or JSON errors, or when the schema version does not match.
/// In those cases the current state is left untouched.
fn load_state(&mut self) -> Result<()> {
    let raw = match self.state_path.as_ref() {
        Some(path) if path.exists() => fs::read_to_string(path)?,
        _ => return Ok(()),
    };
    let state: PersistedSubAgentState = serde_json::from_str(&raw)?;
    if state.schema_version != SUBAGENT_STATE_SCHEMA_VERSION {
        return Err(anyhow!(
            "Unsupported sub-agent state schema {}",
            state.schema_version
        ));
    }
    self.agents.clear();
    self.worker_records.clear();
    for persisted in state.agents {
        let id = persisted.id;
        let status = match persisted.status {
            SubAgentStatus::Running => {
                SubAgentStatus::Interrupted(SUBAGENT_RESTART_REASON.to_string())
            }
            other => other,
        };
        let started_at = instant_from_duration(Duration::from_millis(persisted.duration_ms));
        let session_name = match persisted.session_name {
            Some(name) if !name.trim().is_empty() => name,
            _ => id.clone(),
        };
        let workspace = persisted
            .workspace
            .unwrap_or_else(|| self.workspace.clone());
        let model = if persisted.model.is_empty() {
            String::from("unknown")
        } else {
            persisted.model
        };
        let allowed_tools =
            (!persisted.allowed_tools.is_empty()).then_some(persisted.allowed_tools);
        let restored = SubAgent {
            id: id.clone(),
            session_name,
            fork_context: persisted.fork_context,
            workspace,
            agent_type: persisted.agent_type,
            prompt: persisted.prompt,
            assignment: persisted.assignment,
            model,
            nickname: persisted.nickname,
            status,
            result: persisted.result,
            steps_taken: persisted.steps_taken,
            checkpoint: persisted.checkpoint,
            needs_input: persisted.needs_input,
            started_at,
            last_activity_at: started_at,
            allowed_tools,
            session_boot_id: persisted.session_boot_id,
            input_tx: None,
            task_handle: None,
        };
        self.agents.insert(id, restored);
    }
    for worker in state.workers {
        let worker = normalize_worker_record(worker);
        let newest_seq = worker
            .events
            .iter()
            .map(|event| event.seq)
            .max()
            .unwrap_or(0);
        self.worker_event_seq = self.worker_event_seq.max(newest_seq);
        let key = worker.spec.worker_id.clone();
        self.worker_records.insert(key, worker);
    }
    self.refresh_all_budget_scopes();
    self.prune_worker_records();
    Ok(())
}
/// Clones all worker records, newest `updated_at_ms` first, ties broken by
/// ascending worker id.
fn sorted_worker_records(&self) -> Vec<AgentWorkerRecord> {
    use std::cmp::Reverse;
    let mut records = self.worker_records.values().cloned().collect::<Vec<_>>();
    records.sort_by(|a, b| {
        (Reverse(a.updated_at_ms), &a.spec.worker_id)
            .cmp(&(Reverse(b.updated_at_ms), &b.spec.worker_id))
    });
    records
}
/// Keeps only the `MAX_AGENT_WORKER_RECORDS` most recently updated worker
/// records, in `sorted_worker_records` order.
fn prune_worker_records(&mut self) {
    let over_limit = self.worker_records.len() > MAX_AGENT_WORKER_RECORDS;
    if !over_limit {
        return;
    }
    let mut survivors = std::collections::HashSet::with_capacity(MAX_AGENT_WORKER_RECORDS);
    for record in self
        .sorted_worker_records()
        .into_iter()
        .take(MAX_AGENT_WORKER_RECORDS)
    {
        survivors.insert(record.spec.worker_id);
    }
    self.worker_records
        .retain(|worker_id, _| survivors.contains(worker_id));
}
/// Registers a new worker from `spec` (after `normalize_worker_spec`).
///
/// Records an initial `Starting` event, stores the record under the
/// caller-supplied worker id, then prunes old records.
pub fn register_worker(&mut self, spec: AgentWorkerSpec) {
    let key = spec.worker_id.clone();
    let created_ms = epoch_millis_now();
    let normalized = normalize_worker_spec(spec);
    let mut record = AgentWorkerRecord::new(normalized, created_ms);
    self.push_worker_event(
        &mut record,
        AgentWorkerStatus::Starting,
        Some(String::from("starting")),
        None,
        None,
        created_ms,
    );
    self.worker_records.insert(key, record);
    self.prune_worker_records();
}
pub fn list_worker_records(&self) -> Vec<AgentWorkerRecord> {
self.sorted_worker_records()
}
pub fn get_worker_record(&self, worker_id: &str) -> Option<AgentWorkerRecord> {
self.worker_records.get(worker_id).cloned()
}
fn aggregate_budget_spent(&self, scope_id: &str) -> u64 {
self.worker_records
.values()
.filter(|record| record.usage.budget_scope.as_deref() == Some(scope_id))
.fold(0_u64, |total, record| {
total.saturating_add(record.usage.total_tokens.unwrap_or(0))
})
}
/// Returns `(scope_id, limit)` of the parent worker's budget.
///
/// Returns `None` when there is no parent id, no such record, or no budget.
/// A parent without an explicit scope contributes its own worker id as the scope.
fn inherited_budget_scope(&self, parent_run_id: Option<&str>) -> Option<(String, u64)> {
    let parent_id = parent_run_id?;
    let parent = self.worker_records.get(parent_id)?;
    let limit = parent.usage.token_budget?;
    let scope_id = parent
        .usage
        .budget_scope
        .as_ref()
        .unwrap_or(&parent.spec.worker_id)
        .clone();
    Some((scope_id, limit))
}
/// Picks the token-budget scope for a new worker and checks it has headroom.
///
/// Precedence:
/// 1. an explicit positive `requested_budget` (new scope keyed by `worker_id`);
/// 2. the parent's scope;
/// 3. the manager default (new scope keyed by `worker_id`).
///
/// Returns `Ok(None)` when no budget applies.
///
/// # Errors
/// Fails when the scope has fewer than `MIN_SUBAGENT_SPAWN_TOKEN_RESERVE`
/// tokens remaining.
fn resolve_spawn_budget_scope(
    &self,
    worker_id: &str,
    parent_run_id: Option<&str>,
    requested_budget: Option<u64>,
) -> Result<Option<AgentUsageBudgetScope>> {
    let chosen = positive_token_budget(requested_budget)
        .map(|limit| (worker_id.to_string(), limit))
        .or_else(|| self.inherited_budget_scope(parent_run_id))
        .or_else(|| {
            self.default_token_budget
                .map(|limit| (worker_id.to_string(), limit))
        });
    let Some((scope_id, limit)) = chosen else {
        return Ok(None);
    };
    let spent = self.aggregate_budget_spent(&scope_id);
    let remaining = limit.saturating_sub(spent);
    if remaining >= MIN_SUBAGENT_SPAWN_TOKEN_RESERVE {
        Ok(Some(AgentUsageBudgetScope {
            scope_id,
            limit,
            spent,
            remaining,
        }))
    } else {
        Err(anyhow!(
            "Sub-agent token budget exhausted for scope {scope_id}: {spent}/{limit} tokens spent, {remaining} remaining. Wait for the parent/Workflow to summarize results or start a new agent run with an explicit token_budget override."
        ))
    }
}
fn attach_budget_scope(&mut self, worker_id: &str, scope: AgentUsageBudgetScope) {
let Some(record) = self.worker_records.get_mut(worker_id) else {
return;
};
record.usage.token_budget = Some(scope.limit);
record.usage.budget_scope = Some(scope.scope_id.clone());
record.usage.budget_spent_tokens = Some(scope.spent);
record.usage.budget_remaining_tokens = Some(scope.remaining);
refresh_usage_note(&mut record.usage);
self.refresh_budget_scope(&scope.scope_id);
}
fn refresh_budget_scope(&mut self, scope_id: &str) {
let Some(limit) = self
.worker_records
.values()
.find(|record| record.usage.budget_scope.as_deref() == Some(scope_id))
.and_then(|record| record.usage.token_budget)
else {
return;
};
let spent = self.aggregate_budget_spent(scope_id);
let remaining = limit.saturating_sub(spent);
for record in self.worker_records.values_mut() {
if record.usage.budget_scope.as_deref() == Some(scope_id) {
record.usage.token_budget = Some(limit);
record.usage.budget_spent_tokens = Some(spent);
record.usage.budget_remaining_tokens = Some(remaining);
refresh_usage_note(&mut record.usage);
}
}
}
/// Recomputes every distinct budget scope referenced by a worker record.
fn refresh_all_budget_scopes(&mut self) {
    let mut scope_ids = std::collections::HashSet::new();
    for record in self.worker_records.values() {
        if let Some(scope_id) = &record.usage.budget_scope {
            scope_ids.insert(scope_id.clone());
        }
    }
    for scope_id in &scope_ids {
        self.refresh_budget_scope(scope_id);
    }
}
/// Adds one response's token usage to the worker's running totals.
///
/// Totals saturate rather than overflow. Afterwards the worker's budget scope
/// is refreshed and a debounced persist is requested. Unknown workers are ignored.
fn record_worker_usage(&mut self, worker_id: &str, usage: &Usage) {
    let now_ms = epoch_millis_now();
    let total_delta = usage_total_tokens(usage);
    let accumulate =
        |current: Option<u64>, delta: u64| Some(current.unwrap_or(0).saturating_add(delta));
    let scope_id = {
        let Some(record) = self.worker_records.get_mut(worker_id) else {
            return;
        };
        record.updated_at_ms = now_ms;
        record.usage.input_tokens =
            accumulate(record.usage.input_tokens, u64::from(usage.input_tokens));
        record.usage.output_tokens =
            accumulate(record.usage.output_tokens, u64::from(usage.output_tokens));
        record.usage.total_tokens = accumulate(record.usage.total_tokens, total_delta);
        let scope_id = record.usage.budget_scope.clone();
        refresh_usage_note(&mut record.usage);
        scope_id
    };
    if let Some(scope_id) = scope_id {
        self.refresh_budget_scope(&scope_id);
    }
    self.persist_state_debounced();
}
/// Appends an event with the next global sequence number to `record`.
///
/// Events are trimmed from the front so at most
/// `MAX_AGENT_WORKER_EVENTS_PER_RECORD` remain.
fn push_worker_event(
    &mut self,
    record: &mut AgentWorkerRecord,
    status: AgentWorkerStatus,
    message: Option<String>,
    step: Option<u32>,
    tool_name: Option<String>,
    now_ms: u64,
) {
    let seq = self.worker_event_seq.saturating_add(1);
    self.worker_event_seq = seq;
    let event = AgentWorkerEvent {
        seq,
        worker_id: record.spec.worker_id.clone(),
        status,
        timestamp_ms: now_ms,
        message,
        step,
        tool_name,
    };
    record.events.push_back(event);
    for _ in MAX_AGENT_WORKER_EVENTS_PER_RECORD..record.events.len() {
        record.events.pop_front();
    }
}
/// Applies a status transition to a worker record and logs it as an event.
///
/// Updates the status, recommended action, timestamp, and latest message.
/// `started_at_ms` is set on the first `Starting`/`Running` transition.
/// `completed_at_ms` is set on every terminal status. `steps_taken` is set
/// when `step` is given. Unknown workers are ignored.
fn record_worker_event(
    &mut self,
    worker_id: &str,
    status: AgentWorkerStatus,
    message: Option<String>,
    step: Option<u32>,
    tool_name: Option<String>,
) {
    let now_ms = epoch_millis_now();
    // Take the record out so `push_worker_event` can borrow `self` mutably.
    let Some(mut record) = self.worker_records.remove(worker_id) else {
        return;
    };
    let is_live = matches!(
        status,
        AgentWorkerStatus::Starting | AgentWorkerStatus::Running
    );
    let is_terminal = matches!(
        status,
        AgentWorkerStatus::Completed
            | AgentWorkerStatus::Failed
            | AgentWorkerStatus::Cancelled
            | AgentWorkerStatus::Interrupted
    );
    record.status = status;
    record.recommended_action = recommended_action_for_worker_status(status, &record.spec);
    record.updated_at_ms = now_ms;
    record.latest_message = message.clone();
    if is_live {
        record.started_at_ms.get_or_insert(now_ms);
    }
    if is_terminal {
        record.completed_at_ms = Some(now_ms);
    }
    if let Some(step) = step {
        record.steps_taken = step;
    }
    self.push_worker_event(&mut record, status, message, step, tool_name, now_ms);
    self.worker_records.insert(worker_id.to_string(), record);
}
/// Records a progress line, deriving status/step/tool via `worker_progress_event_parts`.
fn record_worker_progress(&mut self, worker_id: &str, message: String) {
    let (status, step, tool_name) = worker_progress_event_parts(&message);
    let text = Some(message);
    self.record_worker_event(worker_id, status, text, step, tool_name);
}
/// Records a sub-agent's final result on its worker record.
///
/// Logs a status event and copies the result summary and step count. On
/// failure, also stores the error text.
fn complete_worker_from_result(&mut self, worker_id: &str, result: &SubAgentResult) {
    let worker_status = worker_status_from_subagent_result(result);
    let summary = match &result.status {
        SubAgentStatus::Completed => "completed".to_string(),
        SubAgentStatus::Failed(err) => err.clone(),
        SubAgentStatus::Interrupted(reason) => reason.clone(),
        SubAgentStatus::Cancelled => "cancelled".to_string(),
        SubAgentStatus::BudgetExhausted => "token budget exhausted".to_string(),
        SubAgentStatus::Running => "running".to_string(),
    };
    self.record_worker_event(
        worker_id,
        worker_status,
        Some(summary),
        Some(result.steps_taken),
        None,
    );
    let Some(record) = self.worker_records.get_mut(worker_id) else {
        return;
    };
    record.result_summary.clone_from(&result.result);
    record.steps_taken = result.steps_taken;
    if let SubAgentStatus::Failed(err) = &result.status {
        record.error = Some(err.clone());
    }
}
/// Records the outcome of delivering a follow-up message to a worker.
///
/// The message is stored as a preview. When the delivery succeeded, the
/// latest message is set to "follow-up delivered". Unknown workers are ignored.
fn record_follow_up_delivery(
    &mut self,
    worker_id: &str,
    delivered: bool,
    message: Option<&str>,
    reason: Option<&str>,
    interrupt: bool,
    continued_from_checkpoint: bool,
) {
    if let Some(record) = self.worker_records.get_mut(worker_id) {
        let now_ms = epoch_millis_now();
        let delivery = AgentRunFollowUpDelivery {
            delivered,
            timestamp_ms: now_ms,
            message_preview: message.map(message_preview),
            reason: reason.map(ToOwned::to_owned),
            interrupt,
            continued_from_checkpoint,
        };
        record.updated_at_ms = now_ms;
        record.follow_up.latest_delivery = Some(delivery);
        if delivered {
            record.latest_message = Some("follow-up delivered".to_string());
        }
    }
}
/// Marks a worker as failed, logging `error` as the event message and storing
/// it on the record.
fn fail_worker(&mut self, worker_id: &str, error: String) {
    let event_message = Some(error.clone());
    self.record_worker_event(
        worker_id,
        AgentWorkerStatus::Failed,
        event_message,
        None,
        None,
    );
    let Some(record) = self.worker_records.get_mut(worker_id) else {
        return;
    };
    record.error = Some(error);
}
/// Cancels a running agent identified by id or name (via `resolve_agent_ref`).
///
/// A running agent is marked `Cancelled`, its resident leases are released,
/// its task is aborted, and its input channel is dropped. The worker record is
/// updated and state is persisted. Non-running agents are returned unchanged.
///
/// # Errors
/// Fails if the reference cannot be resolved or the agent does not exist.
pub fn cancel_agent(&mut self, agent_ref: &str) -> Result<SubAgentResult> {
    let agent_id = self.resolve_agent_ref(agent_ref)?;
    let Some(agent) = self.agents.get_mut(&agent_id) else {
        return Err(anyhow!("Agent {agent_id} not found"));
    };
    if agent.status != SubAgentStatus::Running {
        return Ok(agent.snapshot());
    }
    agent.status = SubAgentStatus::Cancelled;
    agent.result = Some("Cancelled by parent request.".to_string());
    release_resident_leases_for(&agent.id);
    if let Some(handle) = agent.task_handle.take() {
        handle.abort();
    }
    drop(agent.input_tx.take());
    let snapshot = agent.snapshot();
    self.record_worker_event(
        &agent_id,
        AgentWorkerStatus::Cancelled,
        snapshot.result.clone(),
        Some(snapshot.steps_taken),
        None,
    );
    self.persist_state_best_effort();
    Ok(snapshot)
}
pub fn running_count(&self) -> usize {
self.admitted_count()
}
/// Counts agents that are `Running`, have a live task handle, and have not
/// exceeded the heartbeat timeout.
pub fn admitted_count(&self) -> usize {
    let mut admitted = 0;
    for agent in self.agents.values() {
        let live = agent.status == SubAgentStatus::Running && agent.task_handle.is_some();
        if live && !self.running_heartbeat_timed_out(agent) {
            admitted += 1;
        }
    }
    admitted
}
/// Counts admitted agents whose worker record is still `Queued`.
pub fn queued_count(&self) -> usize {
    let is_queued = |agent_id: &str| {
        self.worker_records
            .get(agent_id)
            .is_some_and(|record| record.status == AgentWorkerStatus::Queued)
    };
    self.agents
        .values()
        .filter(|agent| agent.status == SubAgentStatus::Running && agent.task_handle.is_some())
        .filter(|agent| !self.running_heartbeat_timed_out(agent))
        .filter(|agent| is_queued(&agent.id))
        .count()
}
/// Admitted agents that are actually executing (admitted minus queued).
pub fn active_count(&self) -> usize {
    let admitted = self.admitted_count();
    let queued = self.queued_count();
    admitted.saturating_sub(queued)
}
/// Succeeds while fewer than `max_admitted_agents` agents are admitted.
///
/// # Errors
/// Returns a descriptive error (with admitted/running/queued counts) when the
/// admission limit has been reached.
fn check_admission_capacity(&self) -> Result<()> {
    let admitted = self.admitted_count();
    if admitted < self.max_admitted_agents {
        return Ok(());
    }
    Err(anyhow!(
        "Sub-agent admission limit reached (max_admitted {}, admitted {}, running {}, queued {}). Wait for queued/running agents to finish, cancel unneeded agents, or raise [subagents] max_admitted for this Workflow.",
        self.max_admitted_agents,
        admitted,
        self.active_count(),
        self.queued_count()
    ))
}
/// Returns `true` for a running agent with a task handle whose last activity
/// is at least `running_heartbeat_timeout` old.
fn running_heartbeat_timed_out(&self, agent: &SubAgent) -> bool {
    if agent.status != SubAgentStatus::Running || agent.task_handle.is_none() {
        return false;
    }
    agent.last_activity_at.elapsed() >= self.running_heartbeat_timeout
}
/// Refreshes the heartbeat of a running agent.
///
/// Returns `false` if the agent is unknown or not running.
pub fn touch(&mut self, agent_id: &str) -> bool {
    match self.agents.get_mut(agent_id) {
        Some(agent) if agent.status == SubAgentStatus::Running => {
            agent.last_activity_at = Instant::now();
            true
        }
        _ => false,
    }
}
/// Spawns a background agent whose assignment is built from `prompt` alone.
pub fn spawn_background(
    &mut self,
    manager_handle: SharedSubAgentManager,
    runtime: SubAgentRuntime,
    agent_type: SubAgentType,
    prompt: String,
    allowed_tools: Option<Vec<String>>,
) -> Result<SubAgentResult> {
    let assignment = SubAgentAssignment::new(prompt.clone(), None);
    self.spawn_background_with_assignment(
        manager_handle,
        runtime,
        agent_type,
        prompt,
        assignment,
        allowed_tools,
    )
}
/// Spawns a background agent with an explicit assignment and default options.
pub fn spawn_background_with_assignment(
    &mut self,
    manager_handle: SharedSubAgentManager,
    runtime: SubAgentRuntime,
    agent_type: SubAgentType,
    prompt: String,
    assignment: SubAgentAssignment,
    allowed_tools: Option<Vec<String>>,
) -> Result<SubAgentResult> {
    let options = SubAgentSpawnOptions::default();
    self.spawn_background_with_assignment_options(
        manager_handle,
        runtime,
        agent_type,
        prompt,
        assignment,
        allowed_tools,
        options,
    )
}
/// Spawns a background sub-agent task and registers it with this manager.
///
/// Steps:
/// 1. Clean up retained agents (`cleanup`) and check admission capacity.
/// 2. Apply the model override and resolve the token-budget scope.
/// 3. Pick a nickname and build the tool list.
/// 4. Reject the spawn if the requested session name is already used by a
///    tracked agent. No worker is registered in that case.
/// 5. Register the worker record, emit `AgentSpawned`, and launch the task via
///    `spawn_supervised`. Depth-1 tasks also receive the launch gate.
///
/// Returns a snapshot of the newly registered agent.
///
/// # Errors
/// Fails when:
/// - the admission limit is reached;
/// - the budget scope is exhausted;
/// - `build_allowed_tools` rejects the tool list;
/// - the session name is already in use.
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn_background_with_assignment_options(
    &mut self,
    manager_handle: SharedSubAgentManager,
    mut runtime: SubAgentRuntime,
    agent_type: SubAgentType,
    prompt: String,
    assignment: SubAgentAssignment,
    allowed_tools: Option<Vec<String>>,
    options: SubAgentSpawnOptions,
) -> Result<SubAgentResult> {
    self.cleanup(COMPLETED_AGENT_RETENTION);
    self.check_admission_capacity()?;
    if let Some(model) = options.model.as_deref() {
        runtime.model = model.to_string();
    }
    let effective_model = runtime.model.clone();
    let agent_id = format!("agent_{}", &Uuid::new_v4().to_string()[..8]);
    let budget_scope = self.resolve_spawn_budget_scope(
        &agent_id,
        runtime.parent_agent_id.as_deref(),
        options.token_budget,
    )?;
    let active_names: std::collections::HashSet<String> = self
        .agents
        .values()
        .filter_map(|a| a.nickname.clone())
        .collect();
    let nickname = options
        .nickname
        .or_else(|| Some(assign_unique_whale_name(&agent_id, &active_names)));
    let tools = build_allowed_tools(&agent_type, allowed_tools, runtime.allow_shell)?;
    let (input_tx, input_rx) = mpsc::unbounded_channel();
    let mut agent = SubAgent::new(
        agent_id.clone(),
        agent_type.clone(),
        prompt.clone(),
        assignment.clone(),
        effective_model,
        nickname,
        tools.clone(),
        input_tx,
        runtime.context.workspace.clone(),
        self.current_session_boot_id.clone(),
    );
    if let Some(name) = options
        .name
        .as_deref()
        .map(str::trim)
        .filter(|name| !name.is_empty())
    {
        if let Some(existing) = self
            .agents
            .values()
            .find(|existing| existing.session_name == name)
        {
            let elapsed = existing.started_at.elapsed();
            let since = if elapsed.as_secs() < 120 {
                format!("{}s ago", elapsed.as_secs())
            } else {
                let mins = elapsed.as_secs() / 60;
                let secs = elapsed.as_secs() % 60;
                format!("{mins}m{secs}s ago")
            };
            return Err(anyhow!(
                "Sub-agent session name '{name}' is already in use by agent_id '{}' \
                 (status: {}, started {since}). \
                 Wait for its completion event, or open a new agent with a different name.",
                existing.id,
                subagent_status_name(&existing.status)
            ));
        }
        agent.session_name = name.to_string();
    }
    agent.fork_context = options.fork_context;
    let started_at = agent.started_at;
    let max_steps = self.max_steps;
    let tool_profile = match tools.clone() {
        Some(tools) => AgentWorkerToolProfile::Explicit(tools),
        None => AgentWorkerToolProfile::Inherited,
    };
    let runtime_profile = worker_profile_for_spawn(
        &runtime,
        &agent_type,
        &tool_profile,
        &agent.model,
        options.model_route.clone(),
    );
    runtime.worker_profile = runtime_profile.clone();
    let worker_spec = AgentWorkerSpec {
        worker_id: agent_id.clone(),
        run_id: agent_id.clone(),
        parent_run_id: runtime.parent_agent_id.clone(),
        session_name: Some(agent.session_name.clone()),
        objective: assignment.objective.clone(),
        role: assignment.role.clone(),
        agent_type: agent_type.clone(),
        model: agent.model.clone(),
        workspace: agent.workspace.clone(),
        git_branch: current_git_branch(&agent.workspace),
        context_mode: if options.fork_context {
            "forked"
        } else {
            "fresh"
        }
        .to_string(),
        fork_context: options.fork_context,
        tool_profile,
        runtime_profile,
        max_steps,
        spawn_depth: runtime.spawn_depth,
        max_spawn_depth: runtime.max_spawn_depth,
    };
    self.register_worker(worker_spec);
    if let Some(scope) = budget_scope {
        self.attach_budget_scope(&agent_id, scope);
    }
    if let Some(event_tx) = runtime.event_tx.clone() {
        // Best-effort notification: a full or closed channel must not block spawning.
        let _ = event_tx.try_send(Event::AgentSpawned {
            id: agent_id.clone(),
            prompt: prompt.clone(),
            parent_run_id: runtime.parent_agent_id.clone(),
            spawn_depth: runtime.spawn_depth,
        });
    }
    // Only depth-1 tasks receive the launch gate.
    let launch_gate = (runtime.spawn_depth == 1).then(|| self.launch_gate.clone());
    let task = SubAgentTask {
        manager_handle,
        runtime,
        agent_id: agent_id.clone(),
        agent_type,
        prompt,
        assignment,
        allowed_tools: tools,
        fork_context: options.fork_context,
        started_at,
        max_steps,
        token_budget: options.token_budget,
        input_rx,
        launch_gate,
    };
    let handle = spawn_supervised(
        "subagent-task",
        std::panic::Location::caller(),
        run_subagent_task(task),
    );
    agent.task_handle = Some(handle);
    // Take the snapshot before moving the agent into the map, so no
    // post-insert lookup (and `expect`) is needed.
    let snapshot = agent.snapshot();
    self.agents.insert(agent_id, agent);
    self.persist_state_best_effort();
    Ok(snapshot)
}
pub fn get_result(&self, agent_id: &str) -> Result<SubAgentResult> {
let agent = self
.agents
.get(agent_id)
.ok_or_else(|| anyhow!("Agent {agent_id} not found"))?;
Ok(agent.snapshot())
}
/// Resolves `agent_ref` (an agent id or a unique session name) and returns that
/// agent's snapshot.
///
/// # Errors
/// Fails when the reference is unknown or ambiguous.
pub fn get_result_by_ref(&self, agent_ref: &str) -> Result<SubAgentResult> {
    self.resolve_agent_ref(agent_ref)
        .and_then(|resolved_id| self.get_result(&resolved_id))
}
pub fn terminal_results_excluding(
&self,
delivered_ids: &std::collections::HashSet<String>,
) -> Vec<SubAgentResult> {
let mut results = self
.agents
.values()
.filter(|agent| agent.status != SubAgentStatus::Running)
.filter(|agent| agent.session_boot_id == self.current_session_boot_id)
.filter(|agent| {
self.worker_records
.get(&agent.id)
.is_none_or(|record| record.spec.parent_run_id.is_none())
})
.filter(|agent| !delivered_ids.contains(&agent.id))
.map(SubAgent::snapshot)
.collect::<Vec<_>>();
results.sort_by(|a, b| a.agent_id.cmp(&b.agent_id));
results
}
/// Maps a user-supplied reference to an agent id.
///
/// The reference is trimmed first. An exact agent id wins. Otherwise the reference must
/// match exactly one agent's session name.
///
/// # Errors
/// Fails when nothing matches, or when several agents share the session name.
fn resolve_agent_ref(&self, agent_ref: &str) -> Result<String> {
    let agent_ref = agent_ref.trim();
    if self.agents.contains_key(agent_ref) {
        return Ok(agent_ref.to_string());
    }
    let mut named = self
        .agents
        .values()
        .filter(|candidate| candidate.session_name == agent_ref)
        .map(|candidate| candidate.id.clone());
    match (named.next(), named.next()) {
        (Some(only), None) => Ok(only),
        (None, _) => Err(anyhow!("Agent session {agent_ref} not found")),
        (Some(_), Some(_)) => Err(anyhow!(
            "Agent session name '{agent_ref}' is ambiguous; use an agent_id"
        )),
    }
}
/// Snapshot enriched for listings: prior-session flag plus, when a worker record
/// exists, its status, parent run id and spawn depth.
#[must_use]
fn snapshot_for_listing(&self, agent: &SubAgent) -> SubAgentResult {
    let mut listing = agent.snapshot();
    listing.from_prior_session = self.is_from_prior_session(agent);
    let Some(record) = self.worker_records.get(&agent.id) else {
        return listing;
    };
    listing.worker_status = Some(record.status);
    // Prefer the parent recorded on the worker itself, then fall back to the spawn spec.
    listing.parent_run_id = record
        .parent_run_id
        .as_ref()
        .or(record.spec.parent_run_id.as_ref())
        .cloned();
    listing.spawn_depth = record.spec.spawn_depth;
    listing
}
/// Returns listing snapshots for every tracked agent, regardless of session or status.
pub fn list(&self) -> Vec<SubAgentResult> {
    let mut listings = Vec::with_capacity(self.agents.len());
    listings.extend(
        self.agents
            .values()
            .map(|tracked| self.snapshot_for_listing(tracked)),
    );
    listings
}
/// Returns listing snapshots, hiding non-running agents from prior sessions unless
/// `include_archived` is set.
pub fn list_filtered(&self, include_archived: bool) -> Vec<SubAgentResult> {
    let mut visible = Vec::new();
    for tracked in self.agents.values() {
        let keep = include_archived
            || tracked.status == SubAgentStatus::Running
            || !self.is_from_prior_session(tracked);
        if keep {
            visible.push(self.snapshot_for_listing(tracked));
        }
    }
    visible
}
/// Auto-cancels stale running agents, then drops old non-running agents.
///
/// An agent is "stale" when it is `Running`, still owns a task handle, and its
/// `last_activity_at` is at least `running_heartbeat_timeout` old. Stale agents are marked
/// `Cancelled` with an explanatory result. Their resident leases are released, their task
/// is aborted, and their input channel is dropped. Afterwards every non-running agent
/// whose `started_at` is `max_age` or older is removed. State is persisted when anything
/// changed.
///
/// Returns the number of agents auto-cancelled by this call, not the number removed.
///
/// NOTE(review): an agent auto-cancelled here can also be removed in the same call if its
/// `started_at` is already older than `max_age`.
pub fn cleanup(&mut self, max_age: Duration) -> usize {
let before = self.agents.len();
let mut auto_cancelled = 0;
let timeout = self.running_heartbeat_timeout;
// Worker events are recorded after the loop because the loop mutably borrows `self.agents`.
let mut worker_cancellations = Vec::new();
for agent in self.agents.values_mut() {
if agent.status == SubAgentStatus::Running
&& agent.task_handle.is_some()
&& agent.last_activity_at.elapsed() >= timeout
{
tracing::warn!(
target: "subagent",
agent_id = %agent.id,
timeout_secs = timeout.as_secs(),
"auto-cancelling stale sub-agent with no manager-visible progress"
);
agent.status = SubAgentStatus::Cancelled;
agent.result = Some(format!(
"Auto-cancelled after {}s without sub-agent progress.",
timeout.as_secs()
));
release_resident_leases_for(&agent.id);
// Taking the handle also leaves `task_handle` as None for later passes.
if let Some(handle) = agent.task_handle.take() {
handle.abort();
}
agent.input_tx = None;
worker_cancellations.push((
agent.id.clone(),
agent.result.clone(),
agent.steps_taken,
));
auto_cancelled += 1;
}
}
for (agent_id, message, steps_taken) in worker_cancellations {
self.record_worker_event(
&agent_id,
AgentWorkerStatus::Cancelled,
message,
Some(steps_taken),
None,
);
}
// Running agents are always kept; everything else expires by age since start.
self.agents.retain(|_, agent| {
if agent.status == SubAgentStatus::Running {
true
} else {
agent.started_at.elapsed() < max_age
}
});
if self.agents.len() != before || auto_cancelled > 0 {
self.persist_state_best_effort();
}
auto_cancelled
}
fn update_from_result(&mut self, agent_id: &str, result: SubAgentResult) {
let mut changed = false;
if let Some(agent) = self.agents.get_mut(agent_id) {
agent.status = result.status.clone();
agent.assignment = result.assignment.clone();
agent.result = result.result.clone();
agent.steps_taken = result.steps_taken;
agent.checkpoint = result.checkpoint.clone();
agent.needs_input = result.needs_input.clone();
if result.status != SubAgentStatus::Running {
agent.input_tx = None;
}
agent.task_handle = None;
changed = true;
}
self.complete_worker_from_result(agent_id, &result);
if changed {
self.persist_state_best_effort();
}
}
/// Marks the agent as failed with `error`, releases its resident leases, drops its
/// input channel and task handle, and records the failure on the worker record.
/// State is persisted only when the agent was known.
fn update_failed(&mut self, agent_id: &str, error: String) {
    let found = match self.agents.get_mut(agent_id) {
        Some(agent) => {
            agent.status = SubAgentStatus::Failed(error.clone());
            release_resident_leases_for(agent_id);
            agent.input_tx = None;
            agent.task_handle = None;
            true
        }
        None => false,
    };
    self.fail_worker(agent_id, error);
    if found {
        self.persist_state_best_effort();
    }
}
/// Stores a new checkpoint for the agent and refreshes its activity timestamp.
///
/// Returns `false` when the agent is unknown. Persistence is debounced.
fn update_checkpoint(&mut self, agent_id: &str, checkpoint: SubAgentCheckpoint) -> bool {
    if let Some(agent) = self.agents.get_mut(agent_id) {
        agent.steps_taken = checkpoint.steps_taken;
        agent.checkpoint = Some(checkpoint);
        agent.last_activity_at = Instant::now();
    } else {
        return false;
    }
    self.persist_state_debounced();
    true
}
/// Moves the agent to `Interrupted(reason)`, storing the checkpoint and any pending
/// input request, releasing resident leases, and recording an `Interrupted` worker
/// event.
///
/// # Errors
/// Fails when the agent is unknown.
fn interrupt_with_checkpoint(
    &mut self,
    agent_id: &str,
    reason: String,
    checkpoint: SubAgentCheckpoint,
    needs_input: Option<SubAgentNeedsInput>,
) -> Result<SubAgentResult> {
    let Some(agent) = self.agents.get_mut(agent_id) else {
        return Err(anyhow!("Agent {agent_id} not found"));
    };
    let steps_at_checkpoint = checkpoint.steps_taken;
    agent.status = SubAgentStatus::Interrupted(reason.clone());
    agent.result = Some(reason);
    agent.steps_taken = steps_at_checkpoint;
    agent.checkpoint = Some(checkpoint);
    agent.needs_input = needs_input;
    agent.last_activity_at = Instant::now();
    release_resident_leases_for(agent_id);
    let interrupted = agent.snapshot();
    self.record_worker_event(
        agent_id,
        AgentWorkerStatus::Interrupted,
        interrupted.result.clone(),
        Some(interrupted.steps_taken),
        None,
    );
    self.persist_state_best_effort();
    Ok(interrupted)
}
/// Resumes an interrupted agent from its checkpoint.
///
/// `message` (empty when `None`) and `interrupt` are sent to the still-live child task.
/// The agent is then marked `Running` again and a worker event is recorded.
///
/// # Errors
/// Fails when any of these hold:
/// - the agent is unknown or not interrupted;
/// - it has no checkpoint, or the checkpoint is not continuable or has no messages;
/// - it has no live input channel, or sending on that channel fails.
fn continue_checkpointed(
    &mut self,
    agent_id: &str,
    message: Option<String>,
    interrupt: bool,
) -> Result<SubAgentResult> {
    let Some(agent) = self.agents.get_mut(agent_id) else {
        return Err(anyhow!("Agent {agent_id} not found"));
    };
    let SubAgentStatus::Interrupted(_) = agent.status else {
        return Err(anyhow!(
            "Agent {agent_id} is not interrupted; checkpoint continuation is only available for interrupted sessions"
        ));
    };
    let Some(saved) = agent.checkpoint.as_ref() else {
        return Err(anyhow!("Agent {agent_id} has no checkpoint to continue"));
    };
    let resumable = saved.continuable && !saved.messages.is_empty();
    if !resumable {
        return Err(anyhow!("Agent {agent_id} checkpoint is not continuable"));
    }
    let Some(sender) = agent.input_tx.as_ref() else {
        return Err(anyhow!(
            "Agent {agent_id} checkpoint is persisted, but no live child task is available to continue"
        ));
    };
    let follow_up = SubAgentInput {
        text: message.unwrap_or_default(),
        interrupt,
    };
    if sender.send(follow_up).is_err() {
        return Err(anyhow!("Failed to continue checkpointed agent {agent_id}"));
    }
    agent.status = SubAgentStatus::Running;
    agent.result = None;
    agent.last_activity_at = Instant::now();
    let resumed = agent.snapshot();
    self.record_worker_event(
        agent_id,
        AgentWorkerStatus::Running,
        Some("continued from checkpoint".to_string()),
        Some(resumed.steps_taken),
        None,
    );
    self.persist_state_best_effort();
    Ok(resumed)
}
}
/// Sub-agent manager shared between the `agent` tool and spawned child tasks, guarded by
/// an async `RwLock`.
pub type SharedSubAgentManager = Arc<RwLock<SubAgentManager>>;
/// Loads the persisted sub-agent state for `workspace` into a throwaway manager and
/// returns its worker records.
///
/// # Errors
/// Propagates any error raised while loading the state file.
pub fn load_persisted_agent_worker_records(workspace: &Path) -> Result<Vec<AgentWorkerRecord>> {
    let scratch = SubAgentManager::new(workspace.to_path_buf(), 1);
    let mut scratch = scratch.with_state_path(default_state_path(workspace));
    scratch.load_state()?;
    let records = scratch.list_worker_records();
    Ok(records)
}
/// Serializable view of one sub-agent session.
///
/// Built by `subagent_session_projection` and returned as JSON by the `agent` tool's
/// start, status/peek and cancel actions.
/// NOTE: serde's derive serializes fields in declaration order, so reordering fields
/// changes the emitted JSON layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubAgentSessionProjection {
/// Session name copied from the snapshot.
pub name: String,
/// Agent id copied from the snapshot.
pub agent_id: String,
/// Worker run id from the worker record, or the agent id when no record exists.
#[serde(default)]
pub run_id: String,
/// Worker status name, taken from the worker record or derived from the snapshot.
pub status: String,
/// True whenever the snapshot status is anything other than `Running`.
pub terminal: bool,
pub context_mode: String,
pub fork_context: bool,
/// Descriptive prefix-cache labels for forked vs. fresh children.
pub prefix_cache: SubAgentPrefixCacheProjection,
/// Handle into the runtime handle store for this session's transcript.
pub transcript_handle: VarHandle,
#[serde(default = "default_agent_run_follow_up")]
pub follow_up: AgentRunFollowUpTarget,
#[serde(default = "default_agent_run_takeover")]
pub takeover: AgentRunTakeoverTarget,
#[serde(default)]
pub artifacts: Vec<AgentRunArtifactRef>,
#[serde(default = "default_agent_run_usage")]
pub usage: AgentRunUsage,
#[serde(default = "default_agent_run_verification")]
pub verification: AgentRunVerificationSummary,
/// Full snapshot the projection was built from.
pub snapshot: SubAgentResult,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub checkpoint: Option<SubAgentCheckpoint>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub needs_input: Option<SubAgentNeedsInput>,
/// Interrupted with a continuable checkpoint that has messages.
#[serde(default, skip_serializing_if = "is_false")]
pub continuable: bool,
/// Same value as `continuable`.
#[serde(default, skip_serializing_if = "is_false")]
pub needs_continuation: bool,
/// Supplied by the caller that built the projection.
#[serde(default, skip_serializing_if = "is_false")]
pub timed_out: bool,
/// `timed_out && continuable`.
#[serde(default, skip_serializing_if = "is_false")]
pub timed_out_with_checkpoint: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub worker_record: Option<AgentWorkerRecord>,
}
/// Descriptive labels for how a child's context relates to the parent prefix cache.
/// As set by `subagent_prefix_cache_projection`, the values depend only on whether the
/// child was forked.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubAgentPrefixCacheProjection {
/// "forked" or "fresh".
pub mode: String,
/// "preserved_byte_identical_when_available" (forked) or "not_inherited" (fresh).
pub parent_prefix: String,
/// "optimized_for_existing_parent_prefill" (forked) or "independent_child_prefill" (fresh).
pub deepseek_prefix_cache_reuse: String,
}
/// Chooses the prefix-cache labels for a snapshot based solely on `fork_context`.
fn subagent_prefix_cache_projection(snapshot: &SubAgentResult) -> SubAgentPrefixCacheProjection {
    let (mode, parent_prefix, reuse) = if snapshot.fork_context {
        (
            "forked",
            "preserved_byte_identical_when_available",
            "optimized_for_existing_parent_prefill",
        )
    } else {
        ("fresh", "not_inherited", "independent_child_prefill")
    };
    SubAgentPrefixCacheProjection {
        mode: mode.to_string(),
        parent_prefix: parent_prefix.to_string(),
        deepseek_prefix_cache_reuse: reuse.to_string(),
    }
}
/// True when the snapshot is interrupted and carries a continuable checkpoint with at
/// least one message.
fn subagent_checkpoint_is_continuable(snapshot: &SubAgentResult) -> bool {
    let SubAgentStatus::Interrupted(_) = snapshot.status else {
        return false;
    };
    match snapshot.checkpoint.as_ref() {
        Some(saved) => saved.continuable && !saved.messages.is_empty(),
        None => false,
    }
}
async fn subagent_session_projection(
snapshot: SubAgentResult,
timed_out: bool,
context: &ToolContext,
worker_record: Option<AgentWorkerRecord>,
) -> SubAgentSessionProjection {
let transcript_session_id = format!("agent:{}", snapshot.agent_id);
let continuable = subagent_checkpoint_is_continuable(&snapshot);
let transcript_payload = json!({
"kind": "subagent_session_snapshot",
"agent_id": snapshot.agent_id.clone(),
"name": snapshot.name.clone(),
"status": subagent_status_name(&snapshot.status),
"context_mode": snapshot.context_mode.clone(),
"fork_context": snapshot.fork_context,
"result": snapshot.result.clone(),
"steps_taken": snapshot.steps_taken,
"duration_ms": snapshot.duration_ms,
"assignment": snapshot.assignment.clone(),
"checkpoint": snapshot.checkpoint.clone(),
"needs_input": snapshot.needs_input.clone(),
"needs_continuation": continuable,
"timed_out_with_checkpoint": timed_out && continuable,
"snapshot": snapshot.clone(),
});
let transcript_handle = {
let mut store = context.runtime.handle_store.lock().await;
let full_transcript_lookup = VarHandle {
kind: "var_handle".to_string(),
session_id: transcript_session_id.clone(),
name: "full_transcript".to_string(),
type_name: String::new(),
length: 0,
repr_preview: String::new(),
sha256: String::new(),
};
if snapshot.status != SubAgentStatus::Running
&& let Some(record) = store.get(&full_transcript_lookup)
{
record.handle.clone()
} else {
store.insert_json(transcript_session_id, "transcript", transcript_payload)
}
};
let run_id = worker_record
.as_ref()
.map(|record| agent_worker_run_id(&record.spec))
.unwrap_or_else(|| snapshot.agent_id.clone());
let follow_up = worker_record
.as_ref()
.map(|record| record.follow_up.clone())
.unwrap_or_else(|| AgentRunFollowUpTarget {
tool: default_agent_inspect_tool(),
agent_id: snapshot.agent_id.clone(),
session_name: Some(snapshot.name.clone()),
accepted_statuses: vec!["running".to_string(), "interrupted_continuable".to_string()],
latest_delivery: None,
});
let takeover = worker_record
.as_ref()
.map(|record| record.takeover.clone())
.unwrap_or_else(|| AgentRunTakeoverTarget {
kind: default_subagent_takeover_kind(),
supported: true,
agent_id: snapshot.agent_id.clone(),
session_name: Some(snapshot.name.clone()),
instructions: format!(
"Inspect agent '{}' through the returned transcript_handle with handle_read; open a replacement with agent if the lane no longer fits.",
snapshot.agent_id
),
unsupported_reason: None,
});
let artifacts = worker_record
.as_ref()
.map(|record| record.artifacts.clone())
.unwrap_or_else(|| default_subagent_artifacts(&run_id));
let usage = worker_record
.as_ref()
.map(|record| record.usage.clone())
.unwrap_or_else(default_agent_run_usage);
let verification = worker_record
.as_ref()
.map(|record| record.verification.clone())
.unwrap_or_else(default_agent_run_verification);
let status = worker_record
.as_ref()
.map(|record| agent_worker_status_name(record.status))
.unwrap_or_else(|| agent_worker_status_name(worker_status_from_subagent_result(&snapshot)))
.to_string();
SubAgentSessionProjection {
name: snapshot.name.clone(),
agent_id: snapshot.agent_id.clone(),
run_id,
status,
terminal: snapshot.status != SubAgentStatus::Running,
context_mode: snapshot.context_mode.clone(),
fork_context: snapshot.fork_context,
prefix_cache: subagent_prefix_cache_projection(&snapshot),
transcript_handle,
follow_up,
takeover,
artifacts,
usage,
verification,
checkpoint: snapshot.checkpoint.clone(),
needs_input: snapshot.needs_input.clone(),
continuable: subagent_checkpoint_is_continuable(&snapshot),
needs_continuation: continuable,
snapshot,
timed_out,
timed_out_with_checkpoint: timed_out && continuable,
worker_record,
}
}
fn default_state_path(workspace: &Path) -> PathBuf {
let primary = workspace.join(".codewhale").join("state");
if primary.exists() {
return primary.join(SUBAGENT_STATE_FILE);
}
workspace
.join(".deepseek")
.join("state")
.join(SUBAGENT_STATE_FILE)
}
/// Milliseconds since the Unix epoch.
///
/// Saturates at `u64::MAX` and returns 0 when the system clock reads earlier than the
/// epoch.
fn epoch_millis_now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
        })
}
/// The `Instant` that lies `duration` in the past.
///
/// Falls back to "now" when that instant cannot be represented.
fn instant_from_duration(duration: Duration) -> Instant {
    match Instant::now().checked_sub(duration) {
        Some(earlier) => earlier,
        None => Instant::now(),
    }
}
/// Serializes `value` as pretty JSON and atomically replaces `path` with it.
///
/// The payload is first written to a sibling temp file named `<file name>.tmp`, which is
/// then renamed over `path`. Appending `.tmp` to the full file name, rather than replacing
/// the extension, keeps files that share a stem (e.g. `x.json` and `x.toml`) from sharing
/// one temp file. If the write or the rename fails, the temp file is removed on a
/// best-effort basis.
///
/// # Errors
/// Propagates directory creation, serialization, write and rename failures.
fn write_json_atomic<T: Serialize>(path: &Path, value: &T) -> Result<()> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    let payload = serde_json::to_string_pretty(value)?;
    let tmp_path = match path.file_name() {
        Some(file_name) => {
            let mut tmp_name = file_name.to_os_string();
            tmp_name.push(".tmp");
            path.with_file_name(tmp_name)
        }
        None => path.with_extension("tmp"),
    };
    if let Err(err) = fs::write(&tmp_path, payload) {
        let _ = fs::remove_file(&tmp_path);
        return Err(err.into());
    }
    if let Err(err) = fs::rename(&tmp_path, path) {
        // Don't leave a stale temp file next to the real state file.
        let _ = fs::remove_file(&tmp_path);
        return Err(err.into());
    }
    Ok(())
}
/// Creates a shared manager using `max_agents` as the agent cap, the admission limit and
/// the launch concurrency. It uses the default heartbeat timeout and no token budget.
#[must_use]
pub fn new_shared_subagent_manager(workspace: PathBuf, max_agents: usize) -> SharedSubAgentManager {
    let heartbeat_timeout =
        Duration::from_secs(crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS);
    let admission_limit = max_agents;
    let launch_concurrency = max_agents;
    new_shared_subagent_manager_with_timeout(
        workspace,
        max_agents,
        admission_limit,
        heartbeat_timeout,
        launch_concurrency,
        None,
    )
}
/// Creates a fully configured shared manager and loads any persisted state.
///
/// `max_agents` is clamped to `1..=MAX_SUBAGENTS`. A state-load failure is logged and
/// otherwise ignored, so the manager still starts, just without prior state.
#[must_use]
pub fn new_shared_subagent_manager_with_timeout(
    workspace: PathBuf,
    max_agents: usize,
    max_admitted_agents: usize,
    running_heartbeat_timeout: Duration,
    launch_concurrency: usize,
    default_token_budget: Option<u64>,
) -> SharedSubAgentManager {
    let agent_cap = max_agents.clamp(1, MAX_SUBAGENTS);
    let state_path = default_state_path(&workspace);
    let mut manager = SubAgentManager::new(workspace, agent_cap)
        .with_admission_limit(max_admitted_agents)
        .with_running_heartbeat_timeout(running_heartbeat_timeout)
        .with_launch_concurrency(launch_concurrency)
        .with_default_token_budget(default_token_budget)
        .with_state_path(state_path);
    match manager.load_state() {
        Ok(_) => {}
        Err(err) => {
            tracing::warn!(target: "subagent", ?err, "failed to load sub-agent state");
        }
    }
    Arc::new(RwLock::new(manager))
}
/// The `agent` tool: starts, inspects, peeks at and cancels sub-agents.
pub struct AgentTool {
/// Shared manager that tracks every sub-agent.
manager: SharedSubAgentManager,
/// Parent runtime; each start derives the child runtime from it via `background_runtime()`.
runtime: SubAgentRuntime,
}
impl AgentTool {
#[must_use]
pub fn new(manager: SharedSubAgentManager, runtime: SubAgentRuntime) -> Self {
Self { manager, runtime }
}
}
/// Operation requested through the `agent` tool's `action` (or `op`) field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AgentToolAction {
/// Launch a new child. This is the default, also selected by "spawn", "run" or an empty action.
Start,
/// Inspect one child or list children (also "list", "inspect").
Status,
/// Same inspection as `Status`, reported under the "peek" label (also "progress").
Peek,
/// Stop a child by reference (also "stop", "abort").
Cancel,
}
/// Reads `action` (or its alias `op`) and maps it case-insensitively to an
/// [`AgentToolAction`]. A missing or empty action means `Start`.
///
/// # Errors
/// `invalid_input` for any unrecognized action name.
fn parse_agent_tool_action(input: &Value) -> Result<AgentToolAction, ToolError> {
    let normalized = match optional_input_str(input, &["action", "op"]) {
        None => return Ok(AgentToolAction::Start),
        Some(raw) => raw.trim().to_ascii_lowercase(),
    };
    let parsed = match normalized.as_str() {
        "" | "start" | "spawn" | "run" => AgentToolAction::Start,
        "status" | "list" | "inspect" => AgentToolAction::Status,
        "peek" | "progress" => AgentToolAction::Peek,
        "cancel" | "stop" | "abort" => AgentToolAction::Cancel,
        other => {
            return Err(ToolError::invalid_input(format!(
                "Invalid agent action '{other}'. Use start, status, peek, or cancel."
            )));
        }
    };
    Ok(parsed)
}
/// Returns the trimmed agent reference from `agent_id`, `id`, `session_name` or `name`
/// (first present wins), or `None` when it is absent or blank.
fn parse_agent_ref(input: &Value) -> Option<String> {
    let raw = optional_input_str(input, &["agent_id", "id", "session_name", "name"])?;
    let trimmed = raw.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_string())
}
/// `ToolSpec` implementation exposing sub-agent management as the single `agent` tool.
#[async_trait]
impl ToolSpec for AgentTool {
/// Tool name as exposed to the model.
fn name(&self) -> &'static str {
"agent"
}
/// Model-facing description of the tool's start/status/peek/cancel surface.
fn description(&self) -> &'static str {
concat!(
"Start, inspect, peek at, or cancel focused child agent tasks through one surface. Use start only for independent work that benefits from a clean context. ",
"For several independent targets, call agent separately for each target; CodeWhale runs or queues them under runtime capacity and provider rate-limit backpressure. ",
"The child runs in the background and reports back automatically when finished; keep tiny reads/searches local. ",
"Use action=status or action=peek with agent_id to inspect progress, and action=cancel with agent_id to stop a running child. Returns session projections with transcript_handle for UI/debug inspection."
)
}
/// JSON schema for the tool input. No field is required; an absent `action` means start.
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["start", "status", "peek", "cancel"],
"description": "start (default) launches a child. status lists current children or inspects agent_id. peek is status for one child. cancel stops a running child by agent_id."
},
"agent_id": {
"type": "string",
"description": "Agent id or session name for action=status, action=peek, or action=cancel."
},
"include_archived": {
"type": "boolean",
"description": "For action=status without agent_id, include prior-session completed agents."
},
"name": {
"type": "string",
"description": "For action=start, optional stable session name. For status/peek/cancel, accepted as an alias for agent_id."
},
"prompt": {
"type": "string",
"description": "Focused task for the child agent. Prefer a compact Subagent Brief with QUESTION, SCOPE, ALREADY_KNOWN, EFFORT, STOP_CONDITION, and OUTPUT."
},
"type": {
"type": "string",
"description": SUBAGENT_TYPE_DESCRIPTION
},
"model_strength": {
"type": "string",
"enum": ["same", "faster"],
"description": "Optional child model strength. Use same when the child should be as capable as the current model. Use faster for type=explore, read-only lookup/search, status, or other low-risk tasks that can run on a smaller/faster same-family sibling; CodeWhale maps known families such as DeepSeek V4 Pro to Flash and GLM-5.2 to GLM-5-Turbo. type=explore defaults to faster unless you pass model_strength or model explicitly. No hidden auto-downgrade happens."
},
"model": {
"type": "string",
"description": "Optional exact provider model id for the child. Overrides model_strength. Prefer model_strength unless you know the provider-specific id."
},
"thinking": {
"type": "string",
"enum": ["inherit", "auto", "off", "low", "medium", "high", "max"],
"description": "Optional child thinking budget. inherit (default) follows the parent thinking mode. auto chooses from the child prompt. off is best for faster explore/lookups. high is for normal reasoning. max is for hard design/debug/release/security work. Explicit thinking overrides the default off used by model_strength=faster."
},
"cwd": {
"type": "string",
"description": "Optional pre-existing working directory for the child; must be inside the parent workspace. Prefer worktree=true for isolated parallel edit tasks."
},
"worktree": {
"type": "boolean",
"description": "When true, create a fresh git worktree and branch for this child before it starts. Use for parallel edit tasks that must not collide with the parent checkout."
},
"worktree_branch": {
"type": "string",
"description": "Optional branch name for worktree=true. Defaults to codex/agent-<name>-<id>."
},
"worktree_base": {
"type": "string",
"description": "Optional git ref to branch the worktree from. Defaults to HEAD in the parent checkout."
},
"worktree_path": {
"type": "string",
"description": "Optional worktree checkout path. Relative paths are created under the default sibling .codewhale-worktrees directory, not inside the parent checkout."
},
"fork_context": {
"type": "boolean",
"description": "false (default): fresh child context. true: include the current parent context prefix when the child needs it."
},
"max_depth": {
"type": "integer",
"minimum": 0,
"maximum": 3,
"description": "Optional remaining nested-agent depth budget for this child. Defaults to the configured runtime budget."
},
"token_budget": {
"type": "integer",
"minimum": 1,
"description": "Optional aggregate token budget for this child and descendants. When unset, the child inherits the parent budget pool or the configured root default."
}
},
"required": []
})
}
/// Spawning a child runs code on the user's behalf, hence both capabilities.
fn capabilities(&self) -> Vec<ToolCapability> {
vec![
ToolCapability::ExecutesCode,
ToolCapability::RequiresApproval,
]
}
/// Every invocation, including status/peek/cancel, requires approval.
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Required
}
/// Dispatches on `action`. Status/peek and cancel return early. Start spawns a child,
/// then returns its session projection with summary metadata.
async fn execute(&self, input: Value, context: &ToolContext) -> Result<ToolResult, ToolError> {
let action = parse_agent_tool_action(&input)?;
match action {
AgentToolAction::Start => {}
AgentToolAction::Status | AgentToolAction::Peek => {
return inspect_agent_from_input(
&input,
self.manager.clone(),
context,
matches!(action, AgentToolAction::Peek),
)
.await;
}
AgentToolAction::Cancel => {
return cancel_agent_from_input(&input, self.manager.clone(), context).await;
}
}
let snapshot =
spawn_subagent_from_input(input, self.manager.clone(), self.runtime.clone()).await?;
// Read lock scoped to this block so it is released before the projection awaits.
let worker_record = {
let manager = self.manager.read().await;
manager.get_worker_record(&snapshot.agent_id)
};
// A freshly started child cannot have timed out yet.
let projection = subagent_session_projection(snapshot, false, context, worker_record).await;
let mut tool_result = ToolResult::json(&projection)
.map_err(|e| ToolError::execution_failed(e.to_string()))?;
tool_result.metadata = Some(json!({
"status": projection.status,
"terminal": projection.terminal,
"context_mode": projection.context_mode,
"prefix_cache": projection.prefix_cache,
}));
Ok(tool_result)
}
}
/// Handles `action=status` and `action=peek` for the `agent` tool.
///
/// With an agent reference (`agent_id`, `id`, `session_name` or `name`), this returns
/// the session projection for that one agent. Without one, it returns every agent
/// visible under `include_archived`, shaped as `{ "action", "count", "agents" }`.
/// `peek` only changes the reported action label.
///
/// Inspection is a point-in-time read that never waits on a child, so every projection
/// is built with `timed_out = false`. `include_archived` only selects which agents are
/// listed; it is not a timeout signal.
///
/// # Errors
/// Returns `invalid_input` when the reference does not resolve, and `execution_failed`
/// when a projection cannot be serialized.
async fn inspect_agent_from_input(
    input: &Value,
    manager: SharedSubAgentManager,
    context: &ToolContext,
    peek: bool,
) -> Result<ToolResult, ToolError> {
    let include_archived =
        parse_optional_bool(input, &["include_archived", "includeArchived"]).unwrap_or(false);
    let action_label = if peek { "peek" } else { "status" };
    if let Some(agent_ref) = parse_agent_ref(input) {
        let (snapshot, worker_record) = {
            let manager = manager.read().await;
            let snapshot = manager
                .get_result_by_ref(&agent_ref)
                .map_err(|err| ToolError::invalid_input(err.to_string()))?;
            let worker_record = manager.get_worker_record(&snapshot.agent_id);
            (snapshot, worker_record)
        };
        let projection =
            subagent_session_projection(snapshot, false, context, worker_record).await;
        let mut tool_result = ToolResult::json(&projection)
            .map_err(|err| ToolError::execution_failed(err.to_string()))?;
        tool_result.metadata = Some(json!({
            "action": action_label,
            "status": projection.status,
            "terminal": projection.terminal,
            "agent_id": projection.agent_id,
        }));
        return Ok(tool_result);
    }
    // Collect snapshots under the read lock, then release it before awaiting projections.
    let snapshots = {
        let manager = manager.read().await;
        manager
            .list_filtered(include_archived)
            .into_iter()
            .map(|snapshot| {
                let worker_record = manager.get_worker_record(&snapshot.agent_id);
                (snapshot, worker_record)
            })
            .collect::<Vec<_>>()
    };
    let mut projections = Vec::with_capacity(snapshots.len());
    for (snapshot, worker_record) in snapshots {
        projections
            .push(subagent_session_projection(snapshot, false, context, worker_record).await);
    }
    let payload = json!({
        "action": action_label,
        "count": projections.len(),
        "agents": projections,
    });
    let mut tool_result =
        ToolResult::json(&payload).map_err(|err| ToolError::execution_failed(err.to_string()))?;
    tool_result.metadata = Some(json!({
        "action": action_label,
        "count": payload["count"],
    }));
    Ok(tool_result)
}
/// Handles `action=cancel`: cancels the referenced agent and returns its session
/// projection.
///
/// # Errors
/// - `missing_field("agent_id")` when no agent reference is supplied.
/// - `invalid_input` when the manager rejects the cancel.
/// - `execution_failed` when serialization fails.
async fn cancel_agent_from_input(
    input: &Value,
    manager: SharedSubAgentManager,
    context: &ToolContext,
) -> Result<ToolResult, ToolError> {
    let Some(agent_ref) = parse_agent_ref(input) else {
        return Err(ToolError::missing_field("agent_id"));
    };
    let (snapshot, worker_record) = {
        let mut guard = manager.write().await;
        let cancelled = match guard.cancel_agent(&agent_ref) {
            Ok(cancelled) => cancelled,
            Err(err) => return Err(ToolError::invalid_input(err.to_string())),
        };
        let record = guard.get_worker_record(&cancelled.agent_id);
        (cancelled, record)
    };
    let projection = subagent_session_projection(snapshot, false, context, worker_record).await;
    let mut tool_result = match ToolResult::json(&projection) {
        Ok(serialized) => serialized,
        Err(err) => return Err(ToolError::execution_failed(err.to_string())),
    };
    tool_result.metadata = Some(json!({
        "action": "cancel",
        "status": projection.status,
        "terminal": projection.terminal,
        "agent_id": projection.agent_id,
    }));
    Ok(tool_result)
}
async fn spawn_subagent_from_input(
input: Value,
manager: SharedSubAgentManager,
runtime: SubAgentRuntime,
) -> Result<SubAgentResult, ToolError> {
let spawn_request = parse_spawn_request(&input)?;
if runtime.would_exceed_depth() {
return Err(ToolError::execution_failed(format!(
"Sub-agent depth limit reached (current depth {}, max {}). \
Increase via [subagents] max_depth in config.toml.",
runtime.spawn_depth, runtime.max_spawn_depth
)));
}
if let Some(remaining) = crate::retry_status::rate_limit_remaining() {
let seconds = remaining.as_secs() + u64::from(remaining.subsec_nanos() > 0);
return Err(ToolError::execution_failed(format!(
"Provider is rate-limiting; sub-agent spawning is paused for {seconds}s. \
Wait for the current backoff window before starting new agent work."
)));
}
if spawn_request.worktree.is_some() {
let manager_guard = manager.read().await;
manager_guard
.check_admission_capacity()
.map_err(|err| ToolError::execution_failed(err.to_string()))?;
}
let child_workspace = prepare_child_workspace(&runtime.context.workspace, &spawn_request)?;
let mut child_runtime = runtime.background_runtime();
if let Some(max_depth) = spawn_request.max_depth {
child_runtime.max_spawn_depth = child_runtime.spawn_depth.saturating_add(max_depth);
}
if let Some(workspace) = child_workspace {
child_runtime.context.workspace = workspace;
}
let configured_model = match spawn_request.model.clone() {
Some(model) => Some(normalize_requested_subagent_model(
&model,
"model",
runtime.client.api_provider(),
)?),
None => configured_model_for_role_or_type(
&runtime,
spawn_request.assignment.role.as_deref(),
&spawn_request.agent_type,
)?,
};
let (effective_prompt, _resident_conflict) =
if let Some(ref file_path) = spawn_request.resident_file {
let abs_path = if std::path::Path::new(file_path).is_absolute() {
std::path::PathBuf::from(file_path)
} else {
runtime.context.workspace.join(file_path)
};
let file_contents = std::fs::read_to_string(&abs_path)
.unwrap_or_else(|e| format!("<!-- resident_file read error: {e} -->"));
let prefixed = format!(
"<!-- resident_file: {file_path} -->\n```\n{file_contents}\n```\n\n{}",
spawn_request.prompt
);
let conflict = {
let leases = RESIDENT_LEASES.get_or_init(|| std::sync::Mutex::new(HashMap::new()));
let mut guard = leases.lock().unwrap_or_else(|p| p.into_inner());
if let Some(owner) = guard.get(file_path) {
Some(format!(
"Warning: agent {owner} already holds a resident lease on {file_path}"
))
} else {
guard.insert(file_path.clone(), "pending".to_string());
None
}
};
(prefixed, conflict)
} else {
(spawn_request.prompt, None)
};
let route = resolve_subagent_assignment_route(
&runtime,
configured_model,
&effective_prompt,
&spawn_request.agent_type,
spawn_request.model_strength.model_route(),
spawn_request.thinking,
)
.await;
child_runtime.model = route.model.clone();
child_runtime.reasoning_effort = route.reasoning_effort.clone();
child_runtime.reasoning_effort_auto = false;
let effective_model = route.model;
let model_route = route.model_route;
let mut manager_guard = manager.write().await;
let result = manager_guard
.spawn_background_with_assignment_options(
Arc::clone(&manager),
child_runtime,
spawn_request.agent_type,
effective_prompt,
spawn_request.assignment,
spawn_request.allowed_tools,
SubAgentSpawnOptions {
name: spawn_request.session_name.clone(),
model: Some(effective_model),
model_route: Some(model_route),
nickname: None,
fork_context: spawn_request.fork_context,
token_budget: spawn_request.token_budget,
},
)
.map_err(|e| ToolError::execution_failed(format!("Failed to spawn sub-agent: {e}")))?;
if let Some(ref file_path) = spawn_request.resident_file
&& let Some(lock) = RESIDENT_LEASES.get()
&& let Ok(mut guard) = lock.lock()
&& let Some(owner) = guard.get_mut(file_path)
&& owner == "pending"
{
*owner = result.agent_id.clone();
}
Ok(result)
}
/// Builds the child's system prompt.
///
/// The prompt is the agent type's base prompt, plus a role line when the assignment has
/// a non-blank role, plus a fixed reminder that the child reports to the orchestrator.
fn build_subagent_system_prompt(
    agent_type: &SubAgentType,
    assignment: &SubAgentAssignment,
) -> String {
    const ORCHESTRATOR_NOTE: &str = "\n\nYou are a background sub-agent: every instruction comes from the orchestrating agent, not a human. Never address the end user or ask them questions — do the assigned work and report results back to the orchestrator.";
    let mut system_prompt = agent_type.system_prompt();
    let role = assignment
        .role
        .as_deref()
        .map(str::trim)
        .filter(|trimmed| !trimmed.is_empty());
    if let Some(role) = role {
        system_prompt = format!("{system_prompt}\n\nYou are operating in the role of `{role}`.");
    }
    system_prompt.push_str(ORCHESTRATOR_NOTE);
    system_prompt
}
/// Picks the request's system prompt: the forked parent's system prompt when one was
/// captured, otherwise the child's own prompt text.
fn subagent_request_system_prompt(
    subagent_system_prompt: &str,
    fork_context: Option<&SubAgentForkContext>,
) -> SystemPrompt {
    match fork_context.and_then(|parent| parent.system.clone()) {
        Some(inherited) => inherited,
        None => SystemPrompt::Text(subagent_system_prompt.to_string()),
    }
}
/// Builds the child's opening conversation.
///
/// Fresh children start with just the assignment as a user message. Forked children
/// first get the following, in order:
/// 1. the parent's messages;
/// 2. a `<codewhale:fork_state>` system message, when the parent had non-blank
///    structured state;
/// 3. a `<codewhale:subagent_context>` system message carrying the child's own system
///    prompt.
fn build_initial_subagent_messages(
    prompt: &str,
    assignment: &SubAgentAssignment,
    agent_type: &SubAgentType,
    fork_context: Option<&SubAgentForkContext>,
) -> Vec<Message> {
    let mut conversation = match fork_context {
        None => Vec::new(),
        Some(parent) => {
            let mut seeded = parent.messages.clone();
            let fork_state = parent
                .structured_state_block
                .as_deref()
                .map(str::trim)
                .filter(|state| !state.is_empty());
            if let Some(state) = fork_state {
                seeded.push(system_text_message(format!(
                    "<codewhale:fork_state>\n{state}\n</codewhale:fork_state>"
                )));
            }
            let child_context = build_subagent_system_prompt(agent_type, assignment);
            seeded.push(system_text_message(format!(
                "<codewhale:subagent_context>\n{child_context}\n</codewhale:subagent_context>"
            )));
            seeded
        }
    };
    let task_block = ContentBlock::Text {
        text: build_assignment_prompt(prompt, assignment, agent_type),
        cache_control: None,
    };
    conversation.push(Message {
        role: "user".to_string(),
        content: vec![task_block],
    });
    conversation
}
/// Wraps `text` in a single-block message with the `system` role and no cache control.
fn system_text_message(text: String) -> Message {
    let block = ContentBlock::Text {
        text,
        cache_control: None,
    };
    Message {
        role: String::from("system"),
        content: vec![block],
    }
}
/// Everything one spawned sub-agent task needs; consumed by `run_subagent_task`.
struct SubAgentTask {
/// Manager that receives the final result (or failure) once the run ends.
manager_handle: SharedSubAgentManager,
/// Runtime passed to `run_subagent`; also provides the mailbox, event channel and cancel token.
runtime: SubAgentRuntime,
agent_id: String,
agent_type: SubAgentType,
/// Assignment prompt for the sub-agent.
prompt: String,
assignment: SubAgentAssignment,
/// Optional tool allow-list forwarded to the sub-agent's tool registry.
allowed_tools: Option<Vec<String>>,
/// When true, `run_subagent` starts from `runtime.fork_context` if one is set.
fork_context: bool,
/// Start time used to compute the run's `duration_ms`.
started_at: Instant,
/// Upper bound on model request steps for this run.
max_steps: u32,
/// Optional cap on total tokens; the run stops with `BudgetExhausted` once it is exceeded.
token_budget: Option<u64>,
/// Follow-up inputs for the sub-agent, drained before each model request.
input_rx: mpsc::UnboundedReceiver<SubAgentInput>,
/// Optional semaphore bounding concurrent launches; the task waits here before running.
launch_gate: Option<Arc<Semaphore>>,
}
#[allow(clippy::too_many_lines)]
async fn run_subagent_task(task: SubAgentTask) {
let mut _launch_permit = None;
if let Some(gate) = task.launch_gate.as_ref() {
match Arc::clone(gate).try_acquire_owned() {
Ok(permit) => _launch_permit = Some(permit),
Err(tokio::sync::TryAcquireError::NoPermits) => {
_launch_permit = acquire_queued_launch_permit(&task, Arc::clone(gate)).await;
}
Err(tokio::sync::TryAcquireError::Closed) => {
crate::logging::warn(format!(
"sub-agent launch gate closed for {}; proceeding without backpressure",
task.agent_id
));
}
}
}
let result = run_subagent(
&task.runtime,
task.agent_id.clone(),
task.agent_type,
task.prompt,
task.assignment,
task.allowed_tools,
task.fork_context,
task.started_at,
task.max_steps,
task.token_budget,
task.input_rx,
)
.await;
let model_id = task.runtime.model.clone();
let (summary, sentinel) = match &result {
Ok(res) => {
let raw = summarize_subagent_result(res);
let (summary, truncated) = stamp_subagent_summary(&raw);
let sentinel = subagent_done_sentinel(&task.agent_id, res, truncated);
(summary, sentinel)
}
Err(err) => {
let annotated = annotate_child_model_error(&err.to_string(), &model_id);
(
format!("Failed: {annotated}"),
subagent_failed_sentinel(&task.agent_id, &annotated),
)
}
};
if let Some(mb) = task.runtime.mailbox.as_ref() {
let envelope = match &result {
Ok(_) => MailboxMessage::Completed {
agent_id: task.agent_id.clone(),
summary: summary.clone(),
},
Err(err) => MailboxMessage::Failed {
agent_id: task.agent_id.clone(),
error: annotate_child_model_error(&err.to_string(), &model_id),
},
};
let _ = mb.send(envelope);
}
let payload = format!("{summary}\n{sentinel}");
let agent_id = task.agent_id.clone();
emit_parent_completion(&task.runtime, &agent_id, &payload);
let mut manager = task.manager_handle.write().await;
match &result {
Ok(res) => manager.update_from_result(&agent_id, res.clone()),
Err(err) => {
manager.update_failed(
&agent_id,
annotate_child_model_error(&err.to_string(), &model_id),
);
}
}
if let Some(event_tx) = task.runtime.event_tx {
let _ = event_tx.try_send(Event::AgentComplete {
id: agent_id.clone(),
result: payload,
});
}
}
/// Waits for a launch slot on `gate` after recording that `task` is queued.
///
/// Cancellation is checked first (`biased`). Returns `None` when the run is
/// cancelled while waiting, and also when the semaphore has been closed.
async fn acquire_queued_launch_permit(
    task: &SubAgentTask,
    gate: Arc<Semaphore>,
) -> Option<tokio::sync::OwnedSemaphorePermit> {
    record_queued_launch_progress(task).await;
    tokio::select! {
        biased;
        () = task.runtime.cancel_token.cancelled() => {
            record_agent_progress(
                &task.runtime,
                &task.agent_id,
                "cancelled while queued for a sub-agent launch slot",
            );
            None
        }
        acquired = gate.acquire_owned() => acquired.ok(),
    }
}
/// Marks `task` as queued for a launch slot.
///
/// Updates the manager (touch plus a `Queued` worker event), then emits a
/// progress event. The manager lock is released before emitting.
async fn record_queued_launch_progress(task: &SubAgentTask) {
    let runtime = &task.runtime;
    let mut manager = runtime.manager.write().await;
    manager.touch(&task.agent_id);
    manager.record_worker_event(
        &task.agent_id,
        AgentWorkerStatus::Queued,
        Some(SUBAGENT_QUEUED_LAUNCH_REASON.to_string()),
        None,
        None,
    );
    drop(manager);
    emit_agent_progress(
        runtime.event_tx.as_ref(),
        runtime.mailbox.as_ref(),
        &task.agent_id,
        SUBAGENT_QUEUED_LAUNCH_REASON.to_string(),
        runtime.parent_agent_id.clone(),
        runtime.spawn_depth,
    );
}
/// Forwards a finished child's payload to its parent agent's completion channel.
///
/// Returns `false`, sending nothing, when the agent is at spawn depth 0 or has no
/// parent channel. Otherwise it sends (ignoring a closed receiver) and returns `true`.
pub(crate) fn emit_parent_completion(
    runtime: &SubAgentRuntime,
    agent_id: &str,
    payload: &str,
) -> bool {
    match (runtime.spawn_depth, runtime.parent_completion_tx.as_ref()) {
        (0, _) | (_, None) => false,
        (_, Some(parent_tx)) => {
            let completion = SubAgentCompletion {
                agent_id: agent_id.to_string(),
                payload: payload.to_string(),
            };
            let _ = parent_tx.send(completion);
            true
        }
    }
}
pub(crate) fn subagent_completion_from_result(result: &SubAgentResult) -> SubAgentCompletion {
let raw = summarize_subagent_result(result);
let (summary, truncated) = stamp_subagent_summary(&raw);
let sentinel = match &result.status {
SubAgentStatus::Failed(error) => subagent_failed_sentinel(&result.agent_id, error),
_ => subagent_done_sentinel(&result.agent_id, result, truncated),
};
SubAgentCompletion {
agent_id: result.agent_id.clone(),
payload: format!("{summary}\n{sentinel}"),
}
}
/// Renders the machine-readable `<codewhale:subagent.done>` tag for a result.
///
/// The JSON body points at the summary on the previous line and says whether that
/// summary was truncated. It includes `needs_input` only when the result has one.
fn subagent_done_sentinel(agent_id: &str, res: &SubAgentResult, truncated: bool) -> String {
    let summary_kind = if truncated { "truncated" } else { "complete" };
    let mut payload = json!({
        "agent_id": agent_id,
        "name": res.nickname,
        "agent_type": res.agent_type.as_str(),
        "status": subagent_status_name(&res.status),
        "summary_location": "previous_line",
        "summary_kind": summary_kind,
    });
    if let Some(needs_input) = &res.needs_input {
        payload["needs_input"] = json!(needs_input);
    }
    format!("<codewhale:subagent.done>{payload}</codewhale:subagent.done>")
}
/// Renders the `<codewhale:subagent.done>` tag for a failed run.
///
/// The error text itself is not embedded: the body only says it is on the
/// previous line. `_err` is accepted for signature symmetry and ignored.
fn subagent_failed_sentinel(agent_id: &str, _err: &str) -> String {
    const OPEN_TAG: &str = "<codewhale:subagent.done>";
    const CLOSE_TAG: &str = "</codewhale:subagent.done>";
    let body = json!({
        "agent_id": agent_id,
        "status": "failed",
        "error_location": "previous_line",
    });
    format!("{OPEN_TAG}{body}{CLOSE_TAG}")
}
/// Returns `true` when the model stopped because it hit the output-token limit.
///
/// Accepts both the OpenAI-style `"length"` finish reason and the Anthropic-style
/// `"max_tokens"` stop reason. The message types here follow the Anthropic schema,
/// so the client may report either.
/// NOTE(review): confirm which value the DeepSeek client actually emits.
fn response_was_truncated(response: &MessageResponse) -> bool {
    matches!(
        response.stop_reason.as_deref(),
        Some("length" | "max_tokens")
    )
}
/// Builds one error `ToolResult` per tool call cut off by a `max_tokens` truncation.
///
/// Each result tells the model that the arguments for that tool were incomplete
/// and asks it to retry with smaller writes.
fn truncated_response_tool_results(tool_uses: &[(String, String, Value)]) -> Vec<ContentBlock> {
    let mut results = Vec::with_capacity(tool_uses.len());
    for (tool_use_id, tool_name, _arguments) in tool_uses {
        results.push(ContentBlock::ToolResult {
            tool_use_id: tool_use_id.clone(),
            content: format!(
                "Error: the model response was truncated by max_tokens before the tool call arguments for '{tool_name}' could be fully generated. Split large content into smaller writes and retry."
            ),
            is_error: Some(true),
            content_blocks: None,
        });
    }
    results
}
/// Builds the retry instruction sent when a truncated reply contained no tool calls.
fn truncated_response_text_retry_message() -> Vec<ContentBlock> {
    const RETRY_TEXT: &str = "Error: the model response was truncated by max_tokens. No complete tool call was available, so the partial response was not accepted as the sub-agent result. Retry with a shorter response or split the work into smaller steps.";
    let block = ContentBlock::Text {
        text: RETRY_TEXT.to_owned(),
        cache_control: None,
    };
    vec![block]
}
/// Counts one more consecutive truncated response.
///
/// # Errors
/// Fails once the streak exceeds `MAX_CONSECUTIVE_TRUNCATED_SUBAGENT_RESPONSES`,
/// which bounds the truncate-and-retry loop.
fn record_truncated_subagent_response(consecutive: &mut u32) -> Result<()> {
    let count = consecutive.saturating_add(1);
    *consecutive = count;
    if count <= MAX_CONSECUTIVE_TRUNCATED_SUBAGENT_RESPONSES {
        return Ok(());
    }
    Err(anyhow!(
        "Sub-agent response was truncated by max_tokens {count} consecutive times; stopping to avoid an unbounded retry loop."
    ))
}
/// Clears the consecutive-truncation streak after a non-truncated response.
fn reset_truncated_subagent_responses(consecutive: &mut u32) {
    *consecutive = u32::MIN;
}
/// Stores the sub-agent's full transcript as JSON in the runtime handle store.
///
/// The transcript is stored under the namespace `agent:<agent_id>` with name
/// `full_transcript`. Returns the handle produced by `insert_json`.
#[allow(clippy::too_many_arguments)]
async fn insert_subagent_full_transcript_handle(
    runtime: &SubAgentRuntime,
    agent_id: &str,
    agent_type: &SubAgentType,
    assignment: &SubAgentAssignment,
    status: &SubAgentStatus,
    result: Option<&String>,
    checkpoint: Option<&SubAgentCheckpoint>,
    messages: &[Message],
    steps_taken: u32,
    duration_ms: u64,
    fork_context: bool,
) -> VarHandle {
    let context_mode = if fork_context { "forked" } else { "fresh" };
    let transcript = json!({
        "kind": "subagent_full_transcript",
        "agent_id": agent_id,
        "agent_type": agent_type.as_str(),
        "status": subagent_status_name(status),
        "context_mode": context_mode,
        "fork_context": fork_context,
        "result": result,
        "steps_taken": steps_taken,
        "duration_ms": duration_ms,
        "assignment": assignment,
        "checkpoint": checkpoint,
        "messages": messages,
    });
    let namespace = format!("agent:{agent_id}");
    let mut handles = runtime.context.runtime.handle_store.lock().await;
    handles.insert_json(namespace, "full_transcript", transcript)
}
/// Snapshots `messages` into a checkpoint.
///
/// The checkpoint id is derived from the agent id, the step count, and the
/// current epoch milliseconds. The continuation handle is
/// `agent:<agent_id>:checkpoint:<checkpoint_id>`.
fn build_subagent_checkpoint(
    agent_id: &str,
    reason: impl Into<String>,
    messages: &[Message],
    steps_taken: u32,
    continuable: bool,
) -> SubAgentCheckpoint {
    let created_at_ms = epoch_millis_now();
    let checkpoint_id = format!("{agent_id}:step:{steps_taken}:ts:{created_at_ms}");
    let continuation_handle = format!("agent:{agent_id}:checkpoint:{checkpoint_id}");
    SubAgentCheckpoint {
        agent_id: agent_id.to_owned(),
        reason: reason.into(),
        continuable,
        steps_taken,
        message_count: messages.len(),
        created_at_ms,
        messages: messages.to_vec(),
        continuation_handle,
        checkpoint_id,
    }
}
/// Builds a checkpoint, records it with the sub-agent manager, and returns it.
async fn checkpoint_subagent_progress(
    runtime: &SubAgentRuntime,
    agent_id: &str,
    reason: impl Into<String>,
    messages: &[Message],
    steps_taken: u32,
    continuable: bool,
) -> SubAgentCheckpoint {
    let snapshot =
        build_subagent_checkpoint(agent_id, reason, messages, steps_taken, continuable);
    runtime
        .manager
        .write()
        .await
        .update_checkpoint(agent_id, snapshot.clone());
    snapshot
}
/// Builds the follow-up question shown when a run is interrupted.
///
/// The question names the interruption `reason` and the checkpoint's
/// continuation handle.
fn needs_input_for_interrupted_checkpoint(
    reason: &str,
    checkpoint: &SubAgentCheckpoint,
) -> SubAgentNeedsInput {
    let handle = &checkpoint.continuation_handle;
    let question = format!(
        "Sub-agent interrupted before completion ({reason}). Re-dispatch this worker or provide explicit follow-up using checkpoint {handle}."
    );
    SubAgentNeedsInput { question }
}
/// Why `request_subagent_model_response_with_retries` gave up on a model request.
#[derive(Debug)]
enum SubAgentApiRequestFailure {
/// Non-transient provider error; `run_subagent` propagates it as its own `Err`.
Fatal(anyhow::Error),
/// Request timed out, or transient failures exhausted the retry budget.
/// The run is checkpointed and reported as interrupted rather than failed.
Interrupted {
/// Human-readable explanation, reused in status and mailbox messages.
reason: String,
/// Short tag used as the checkpoint reason (e.g. `api_timeout`).
checkpoint_reason: &'static str,
},
}
/// Backoff before transient-failure retry number `retry_number` (1-based).
///
/// The delay is the initial backoff times a multiplier: ×1 for retries 0 and 1,
/// ×2 for retry 2, and ×4 (the cap) for every later retry.
fn subagent_transient_provider_retry_delay(retry_number: u32) -> Duration {
    let exponent = retry_number.saturating_sub(1);
    let multiplier: u32 = if exponent >= 2 { 4 } else { 1 << exponent };
    SUBAGENT_TRANSIENT_PROVIDER_INITIAL_BACKOFF.saturating_mul(multiplier)
}
/// Heuristically decides whether `error` is a transient provider or network
/// failure worth retrying.
///
/// Checks the lowercased full error chain (`{:#}`) for known timeout, connection
/// and gateway phrases. Gateway status codes (502/503/504) only count as
/// standalone numbers. Unrelated digits such as a token count of `15030` or a port
/// like `5020` must not turn a fatal error into a retried "transient" one.
fn is_transient_subagent_provider_error(error: &anyhow::Error) -> bool {
    const TRANSIENT_PHRASES: &[&str] = &[
        "did not receive response headers",
        "response headers",
        "stream request",
        "request timed out",
        "operation timed out",
        "deadline has elapsed",
        "connection reset",
        "connection closed",
        "connection aborted",
        "temporarily unavailable",
        "bad gateway",
        "gateway timeout",
        "service unavailable",
    ];
    const TRANSIENT_STATUS_CODES: &[&str] = &["502", "503", "504"];
    let message = format!("{error:#}").to_ascii_lowercase();
    TRANSIENT_PHRASES
        .iter()
        .any(|phrase| message.contains(phrase))
        || TRANSIENT_STATUS_CODES
            .iter()
            .any(|code| contains_standalone_status_code(&message, code))
}

/// Returns `true` if the ASCII-digit string `code` occurs in `haystack` with no
/// ASCII digit directly before or after it.
fn contains_standalone_status_code(haystack: &str, code: &str) -> bool {
    let bytes = haystack.as_bytes();
    // ASCII digits are single bytes and never appear inside a multi-byte UTF-8
    // sequence, so byte-level neighbour checks are exact.
    let is_digit_at = |index: Option<usize>| {
        index
            .and_then(|i| bytes.get(i))
            .is_some_and(u8::is_ascii_digit)
    };
    haystack.match_indices(code).any(|(start, matched)| {
        !is_digit_at(start.checked_sub(1)) && !is_digit_at(Some(start + matched.len()))
    })
}
/// Sends `request` to the model, retrying transient provider failures with backoff.
///
/// Behaviour by outcome:
/// - Each attempt is bounded by `runtime.step_api_timeout`. A timeout is not
///   retried: it becomes `Interrupted { checkpoint_reason: "api_timeout" }`.
/// - A transient error (see `is_transient_subagent_provider_error`) is retried up
///   to `SUBAGENT_TRANSIENT_PROVIDER_MAX_RETRIES` times. After that it becomes
///   `Interrupted { checkpoint_reason: "api_transient_provider_failure" }`.
/// - Any other error is `Fatal`.
async fn request_subagent_model_response_with_retries(
    runtime: &SubAgentRuntime,
    agent_id: &str,
    steps: u32,
    max_steps: u32,
    request: MessageRequest,
) -> std::result::Result<MessageResponse, SubAgentApiRequestFailure> {
    let mut retries_used = 0u32;
    loop {
        let attempt = tokio::time::timeout(
            runtime.step_api_timeout,
            runtime.client.create_message(request.clone()),
        )
        .await;
        let Ok(outcome) = attempt else {
            return Err(SubAgentApiRequestFailure::Interrupted {
                reason: format!(
                    "API call timed out after {}ms; checkpoint preserved for continuation",
                    runtime.step_api_timeout.as_millis()
                ),
                checkpoint_reason: "api_timeout",
            });
        };
        let err = match outcome {
            Ok(response) => return Ok(response),
            Err(err) => err,
        };
        if !is_transient_subagent_provider_error(&err) {
            return Err(SubAgentApiRequestFailure::Fatal(err));
        }
        if retries_used >= SUBAGENT_TRANSIENT_PROVIDER_MAX_RETRIES {
            let attempts = retries_used.saturating_add(1);
            return Err(SubAgentApiRequestFailure::Interrupted {
                reason: format!(
                    "Transient provider failure after {attempts} API attempt(s): {err}; checkpoint preserved for continuation"
                ),
                checkpoint_reason: "api_transient_provider_failure",
            });
        }
        retries_used = retries_used.saturating_add(1);
        let delay = subagent_transient_provider_retry_delay(retries_used);
        record_agent_progress(
            runtime,
            agent_id,
            format!(
                "{}: transient provider failure; retrying API request {}/{} in {}ms ({err})",
                format_step_counter(steps, max_steps),
                retries_used,
                SUBAGENT_TRANSIENT_PROVIDER_MAX_RETRIES,
                delay.as_millis(),
            ),
        );
        tokio::time::sleep(delay).await;
    }
}
/// Records a progress line for `agent_id` and emits it as a progress event.
///
/// Manager bookkeeping is best effort: it uses `try_write` and is skipped when the
/// lock is busy. The event is emitted either way.
fn record_agent_progress(runtime: &SubAgentRuntime, agent_id: &str, message: impl Into<String>) {
    let text: String = message.into();
    if let Ok(mut guard) = runtime.manager.try_write() {
        guard.touch(agent_id);
        guard.record_worker_progress(agent_id, text.clone());
    }
    let parent = runtime.parent_agent_id.clone();
    emit_agent_progress(
        runtime.event_tx.as_ref(),
        runtime.mailbox.as_ref(),
        agent_id,
        text,
        parent,
        runtime.spawn_depth,
    );
}
/// Derives the runtime handed to a sub-agent's own agent-spawning tools.
///
/// The derived runtime has a fresh completion channel (its receiver is returned),
/// the given fork context, and `parent_agent_id` set to the spawning agent.
fn runtime_for_nested_agent_tools(
    runtime: &SubAgentRuntime,
    parent_agent_id: &str,
    fork_context: SubAgentForkContext,
) -> (SubAgentRuntime, mpsc::UnboundedReceiver<SubAgentCompletion>) {
    let (completion_tx, completion_rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
    let configured = runtime
        .clone()
        .with_parent_completion_tx(completion_tx)
        .with_fork_context(fork_context);
    let nested = SubAgentRuntime {
        parent_agent_id: Some(parent_agent_id.to_string()),
        ..configured
    };
    (nested, completion_rx)
}
/// Collects every child completion already queued, without waiting.
///
/// Stops at the first empty or disconnected `try_recv`.
fn drain_child_completion_events(
    child_completion_rx: &mut mpsc::UnboundedReceiver<SubAgentCompletion>,
) -> Vec<SubAgentCompletion> {
    std::iter::from_fn(|| child_completion_rx.try_recv().ok()).collect()
}
/// Wraps child sub-agent completions in an internal `runtime_event` user message.
///
/// The header tells the model to treat each child summary as an unverified
/// self-report. Each completion is then listed with its `agent_id` and payload.
fn child_completion_runtime_message(completions: &[SubAgentCompletion]) -> Message {
    const HEADER: &str = "<codewhale:runtime_event kind=\"child_subagent_completion\" visibility=\"internal\">\n\
        This is an internal runtime event, not user input. One or more child sub-agents \
        you spawned have finished. Treat each child summary as an unverified self-report: \
        if you rely on it, cite the child agent_id and the EVIDENCE lines it provided, \
        and distinguish that from evidence you personally verified.\n";
    const FOOTER: &str = "</codewhale:runtime_event>";
    let entries: String = completions
        .iter()
        .map(|completion| {
            format!(
                "\n--- child sub-agent completion ---\nagent_id: {}\n{}\n",
                completion.agent_id, completion.payload
            )
        })
        .collect();
    Message {
        role: "user".to_string(),
        content: vec![ContentBlock::Text {
            text: format!("{HEADER}{entries}{FOOTER}"),
            cache_control: None,
        }],
    }
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_subagent(
runtime: &SubAgentRuntime,
agent_id: String,
agent_type: SubAgentType,
prompt: String,
assignment: SubAgentAssignment,
allowed_tools: Option<Vec<String>>,
fork_context: bool,
started_at: Instant,
max_steps: u32,
token_budget: Option<u64>,
mut input_rx: mpsc::UnboundedReceiver<SubAgentInput>,
) -> Result<SubAgentResult> {
let system_prompt = build_subagent_system_prompt(&agent_type, &assignment);
let fork_context_enabled = fork_context;
let fork_context = fork_context_enabled
.then_some(runtime.fork_context.as_ref())
.flatten();
let request_system = subagent_request_system_prompt(&system_prompt, fork_context);
let mut messages =
build_initial_subagent_messages(&prompt, &assignment, &agent_type, fork_context);
let (runtime_for_tools, mut child_completion_rx) = runtime_for_nested_agent_tools(
runtime,
&agent_id,
SubAgentForkContext {
system: Some(request_system.clone()),
messages: messages.clone(),
structured_state_block: None,
},
);
let tool_registry = SubAgentToolRegistry::new_with_owner(
runtime_for_tools,
agent_type.clone(),
agent_id.clone(),
assignment
.role
.as_deref()
.filter(|role| !role.trim().is_empty())
.unwrap_or(agent_type.as_str())
.to_string(),
allowed_tools.clone(),
runtime.todos.clone(),
Arc::new(Mutex::new(PlanState::default())),
);
let unavailable_tools = tool_registry.unavailable_allowed_tools();
if !unavailable_tools.is_empty() {
return Err(anyhow!(
"Sub-agent requested unavailable tools: {}",
unavailable_tools.join(", ")
));
}
let tools = tool_registry.tools_for_model(&agent_type);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::started(&agent_id, agent_type.clone()));
}
record_agent_progress(
runtime,
&agent_id,
format!("started ({})", agent_type.as_str()),
);
let mut steps = 0;
let mut final_result: Option<String> = None;
let mut pending_inputs: VecDeque<SubAgentInput> = VecDeque::new();
let mut consecutive_truncated_responses = 0;
let mut latest_checkpoint: Option<SubAgentCheckpoint> = None;
let mut tokens_used: u64 = 0;
for _step in 0..max_steps {
if runtime.cancel_token.is_cancelled() {
record_agent_progress(
runtime,
&agent_id,
format!("{}: cancelled", format_step_counter(steps, max_steps)),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Cancelled {
agent_id: agent_id.clone(),
});
}
let status = SubAgentStatus::Cancelled;
let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
insert_subagent_full_transcript_handle(
runtime,
&agent_id,
&agent_type,
&assignment,
&status,
None,
latest_checkpoint.as_ref(),
&messages,
steps,
duration_ms,
fork_context_enabled,
)
.await;
return Ok(SubAgentResult {
name: agent_id.clone(),
agent_id: agent_id.clone(),
context_mode: if fork_context_enabled {
"forked"
} else {
"fresh"
}
.to_string(),
fork_context: fork_context_enabled,
workspace: Some(runtime.context.workspace.clone()),
git_branch: current_git_branch(&runtime.context.workspace),
agent_type: agent_type.clone(),
assignment: assignment.clone(),
model: runtime.model.clone(),
nickname: None,
status,
worker_status: None,
parent_run_id: runtime.parent_agent_id.clone(),
spawn_depth: runtime.spawn_depth,
result: None,
steps_taken: steps,
checkpoint: latest_checkpoint.clone(),
needs_input: None,
duration_ms,
from_prior_session: false,
});
}
steps += 1;
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: requesting model response",
format_step_counter(steps, max_steps)
),
);
while let Ok(input) = input_rx.try_recv() {
if input.interrupt {
pending_inputs.clear();
}
pending_inputs.push_back(input);
}
while let Some(input) = pending_inputs.pop_front() {
if !input.text.trim().is_empty() {
messages.push(Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: input.text,
cache_control: None,
}],
});
}
}
let child_completions = drain_child_completion_events(&mut child_completion_rx);
if !child_completions.is_empty() {
let count = child_completions.len();
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: received {count} child sub-agent completion(s)",
format_step_counter(steps, max_steps)
),
);
messages.push(child_completion_runtime_message(&child_completions));
}
let request = MessageRequest {
model: runtime.model.clone(),
messages: messages.clone(),
max_tokens: SUBAGENT_RESPONSE_MAX_TOKENS,
system: Some(request_system.clone()),
tools: Some(tools.clone()),
tool_choice: Some(json!({ "type": "auto" })),
metadata: None,
thinking: None,
reasoning_effort: runtime.reasoning_effort.clone(),
stream: Some(false),
temperature: None,
top_p: None,
};
latest_checkpoint = Some(
checkpoint_subagent_progress(
runtime,
&agent_id,
"before_api_request",
&messages,
steps,
true,
)
.await,
);
let response = tokio::select! {
biased;
() = runtime.cancel_token.cancelled() => {
record_agent_progress(
runtime,
&agent_id,
format!("{}: cancelled mid-request", format_step_counter(steps, max_steps)),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Cancelled {
agent_id: agent_id.clone(),
});
}
let status = SubAgentStatus::Cancelled;
let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
insert_subagent_full_transcript_handle(
runtime,
&agent_id,
&agent_type,
&assignment,
&status,
None,
latest_checkpoint.as_ref(),
&messages,
steps,
duration_ms,
fork_context_enabled,
)
.await;
return Ok(SubAgentResult {
name: agent_id.clone(),
agent_id: agent_id.clone(),
context_mode: if fork_context_enabled { "forked" } else { "fresh" }.to_string(),
fork_context: fork_context_enabled,
workspace: Some(runtime.context.workspace.clone()),
git_branch: current_git_branch(&runtime.context.workspace),
agent_type: agent_type.clone(),
assignment: assignment.clone(),
model: runtime.model.clone(),
nickname: None,
status,
worker_status: None,
parent_run_id: runtime.parent_agent_id.clone(),
spawn_depth: runtime.spawn_depth,
result: None,
steps_taken: steps,
checkpoint: latest_checkpoint.clone(),
needs_input: None,
duration_ms,
from_prior_session: false,
});
}
api = request_subagent_model_response_with_retries(
runtime,
&agent_id,
steps,
max_steps,
request,
) => {
match api {
Ok(response) => response,
Err(SubAgentApiRequestFailure::Fatal(err)) => return Err(err),
Err(SubAgentApiRequestFailure::Interrupted { reason, checkpoint_reason }) => {
let checkpoint = checkpoint_subagent_progress(
runtime,
&agent_id,
checkpoint_reason,
&messages,
steps,
true,
)
.await;
record_agent_progress(
runtime,
&agent_id,
format!("{}: interrupted; {reason}", format_step_counter(steps, max_steps)),
);
let status = SubAgentStatus::Interrupted(reason.clone());
let duration_ms =
u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
insert_subagent_full_transcript_handle(
runtime,
&agent_id,
&agent_type,
&assignment,
&status,
Some(&reason),
Some(&checkpoint),
&messages,
steps,
duration_ms,
fork_context_enabled,
)
.await;
let needs_input =
needs_input_for_interrupted_checkpoint(&reason, &checkpoint);
let interrupted_snapshot = {
let mut manager = runtime.manager.write().await;
manager.interrupt_with_checkpoint(
&agent_id,
reason.clone(),
checkpoint.clone(),
Some(needs_input.clone()),
)?
};
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: waiting for user; {}",
format_step_counter(steps, max_steps),
needs_input.question
),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Interrupted {
agent_id: agent_id.clone(),
reason: reason.clone(),
});
}
return Ok(interrupted_snapshot);
}
}
}
};
let mut tool_uses = Vec::new();
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::token_usage(
&agent_id,
response.model.clone(),
response.usage.clone(),
));
}
{
let mut manager = runtime.manager.write().await;
manager.record_worker_usage(&agent_id, &response.usage);
}
tokens_used = tokens_used.saturating_add(usage_total_tokens(&response.usage));
if let Some(budget) = token_budget {
if tokens_used > budget {
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: token budget exhausted ({tokens_used}/{budget})",
format_step_counter(steps, max_steps)
),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Cancelled {
agent_id: agent_id.clone(),
});
}
let status = SubAgentStatus::BudgetExhausted;
let duration_ms =
u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
latest_checkpoint = Some(
checkpoint_subagent_progress(
runtime,
&agent_id,
"token_budget_exhausted",
&messages,
steps,
true,
)
.await,
);
insert_subagent_full_transcript_handle(
runtime,
&agent_id,
&agent_type,
&assignment,
&status,
final_result.as_ref(),
latest_checkpoint.as_ref(),
&messages,
steps,
duration_ms,
fork_context_enabled,
)
.await;
return Ok(SubAgentResult {
name: agent_id.clone(),
agent_id: agent_id.clone(),
context_mode: if fork_context_enabled {
"forked"
} else {
"fresh"
}
.to_string(),
fork_context: fork_context_enabled,
workspace: Some(runtime.context.workspace.clone()),
git_branch: current_git_branch(&runtime.context.workspace),
agent_type: agent_type.clone(),
assignment: assignment.clone(),
model: runtime.model.clone(),
nickname: None,
status,
worker_status: None,
parent_run_id: runtime.parent_agent_id.clone(),
spawn_depth: runtime.spawn_depth,
result: final_result.clone(),
steps_taken: steps,
checkpoint: latest_checkpoint.clone(),
needs_input: None,
duration_ms,
from_prior_session: false,
});
}
}
for block in &response.content {
match block {
ContentBlock::Text { text, .. } if !text.trim().is_empty() => {
final_result = Some(text.clone());
}
ContentBlock::ToolUse {
id, name, input, ..
} => {
tool_uses.push((id.clone(), name.clone(), input.clone()));
}
_ => {}
}
}
messages.push(Message {
role: "assistant".to_string(),
content: response.content.clone(),
});
latest_checkpoint = Some(
checkpoint_subagent_progress(
runtime,
&agent_id,
"after_model_response",
&messages,
steps,
true,
)
.await,
);
if response_was_truncated(&response) {
final_result = None;
record_truncated_subagent_response(&mut consecutive_truncated_responses)?;
let progress = if tool_uses.is_empty() {
"response truncated, returning retry instruction".to_string()
} else {
format!(
"response truncated, returning {} tool error(s)",
tool_uses.len()
)
};
record_agent_progress(
runtime,
&agent_id,
format!("{}: {progress}", format_step_counter(steps, max_steps)),
);
messages.push(Message {
role: "user".to_string(),
content: if tool_uses.is_empty() {
truncated_response_text_retry_message()
} else {
truncated_response_tool_results(&tool_uses)
},
});
latest_checkpoint = Some(
checkpoint_subagent_progress(
runtime,
&agent_id,
"after_truncated_response_retry_message",
&messages,
steps,
true,
)
.await,
);
continue;
}
reset_truncated_subagent_responses(&mut consecutive_truncated_responses);
if tool_uses.is_empty() {
let child_completions = drain_child_completion_events(&mut child_completion_rx);
if !child_completions.is_empty() {
let count = child_completions.len();
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: resuming with {count} child sub-agent completion(s)",
format_step_counter(steps, max_steps)
),
);
messages.push(child_completion_runtime_message(&child_completions));
latest_checkpoint = Some(
checkpoint_subagent_progress(
runtime,
&agent_id,
"after_tail_child_subagent_completion",
&messages,
steps,
true,
)
.await,
);
continue;
}
while let Ok(input) = input_rx.try_recv() {
if input.interrupt {
pending_inputs.clear();
}
pending_inputs.push_back(input);
}
if pending_inputs.is_empty() {
record_agent_progress(
runtime,
&agent_id,
format!("{}: complete", format_step_counter(steps, max_steps)),
);
break;
}
continue;
}
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: executing {} tool call(s)",
format_step_counter(steps, max_steps),
tool_uses.len()
),
);
let mut tool_results: Vec<ContentBlock> = Vec::new();
for (tool_id, tool_name, tool_input) in tool_uses {
let tool_display_name = subagent_progress_tool_display_name(&tool_name);
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: running tool '{tool_display_name}'",
format_step_counter(steps, max_steps)
),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::ToolCallStarted {
agent_id: agent_id.clone(),
tool_name: tool_name.clone(),
step: steps,
});
}
let result = match tokio::time::timeout(TOOL_TIMEOUT, async {
tool_registry
.execute(&agent_id, &tool_name, tool_input)
.await
})
.await
{
Ok(Ok(output)) => output,
Ok(Err(e)) => format!("Error: {e}"),
Err(_) => format!("Error: Tool {tool_name} timed out"),
};
let tool_ok = !result.starts_with("Error:");
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: finished tool '{tool_display_name}'",
format_step_counter(steps, max_steps)
),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::ToolCallCompleted {
agent_id: agent_id.clone(),
tool_name: tool_name.clone(),
step: steps,
ok: tool_ok,
});
}
tool_results.push(ContentBlock::ToolResult {
tool_use_id: tool_id,
content: result,
is_error: None,
content_blocks: None,
});
}
if !tool_results.is_empty() {
messages.push(Message {
role: "user".to_string(),
content: tool_results,
});
latest_checkpoint = Some(
checkpoint_subagent_progress(
runtime,
&agent_id,
"after_tool_results",
&messages,
steps,
true,
)
.await,
);
}
}
release_resident_leases_for(&agent_id);
let status = SubAgentStatus::Completed;
let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
latest_checkpoint = Some(build_subagent_checkpoint(
&agent_id,
"completed",
&messages,
steps,
false,
));
insert_subagent_full_transcript_handle(
runtime,
&agent_id,
&agent_type,
&assignment,
&status,
final_result.as_ref(),
latest_checkpoint.as_ref(),
&messages,
steps,
duration_ms,
fork_context_enabled,
)
.await;
Ok(SubAgentResult {
name: agent_id.clone(),
agent_id,
context_mode: if fork_context_enabled {
"forked"
} else {
"fresh"
}
.to_string(),
fork_context: fork_context_enabled,
workspace: Some(runtime.context.workspace.clone()),
git_branch: current_git_branch(&runtime.context.workspace),
agent_type,
assignment,
model: runtime.model.clone(),
nickname: None,
status,
worker_status: None,
parent_run_id: runtime.parent_agent_id.clone(),
spawn_depth: runtime.spawn_depth,
result: final_result,
steps_taken: steps,
checkpoint: latest_checkpoint,
needs_input: None,
duration_ms,
from_prior_session: false,
})
}
/// Returns the first of `keys` whose value is a string that is non-empty after
/// trimming. The trimmed slice is returned.
fn optional_input_str<'a>(input: &'a Value, keys: &[&str]) -> Option<&'a str> {
    for key in keys {
        let Some(raw) = input.get(*key).and_then(Value::as_str) else {
            continue;
        };
        let trimmed = raw.trim();
        if !trimmed.is_empty() {
            return Some(trimmed);
        }
    }
    None
}
/// Reads a field that may be given either as text (any of `text_keys`) or as an
/// `items_key` array. Exactly one form must be present.
///
/// # Errors
/// - Both forms present.
/// - Neither form present (reported as missing `required_field`).
/// - A malformed items array.
fn parse_text_or_items(
    input: &Value,
    text_keys: &[&str],
    items_key: &str,
    required_field: &str,
) -> Result<String, ToolError> {
    let text = optional_input_str(input, text_keys);
    let items = parse_items_text(input, items_key)?;
    match (text, items) {
        (Some(_), Some(_)) => Err(ToolError::invalid_input(format!(
            "Provide either {required_field} text or {items_key}, but not both"
        ))),
        (Some(text), None) => Ok(text.to_string()),
        (None, Some(items)) => Ok(items),
        (None, None) => Err(ToolError::missing_field(required_field)),
    }
}
/// Renders the `key` items array into newline-joined text.
///
/// Returns `Ok(None)` when `key` is absent. Each item is an object with a `type`;
/// a missing type defaults to `text`. Items render as:
/// - `text` → its text
/// - `mention` → `[mention:$name](path)`
/// - `skill` → `[skill:$name](path)`
/// - `local_image` → `[local_image:path]`
/// - `image` → `[image:url]`
/// - anything else → its text, or `[input]`
///
/// # Errors
/// - `key` is not an array, or is empty.
/// - An item is not an object.
/// - An item lacks a field its type requires.
fn parse_items_text(input: &Value, key: &str) -> Result<Option<String>, ToolError> {
    /// Renders one item object to its text form.
    fn render_item(item: &Value) -> Result<String, ToolError> {
        let object = item
            .as_object()
            .ok_or_else(|| ToolError::invalid_input("each item must be an object"))?;
        let field = |name: &str| {
            object
                .get(name)
                .and_then(Value::as_str)
                .map(str::trim)
                .filter(|text| !text.is_empty())
        };
        let require = |name: &str, message: &'static str| {
            field(name).ok_or_else(|| ToolError::invalid_input(message))
        };
        let item_type = object
            .get("type")
            .and_then(Value::as_str)
            .unwrap_or("text")
            .trim();
        let rendered = match item_type {
            "text" => require("text", "text item requires non-empty text")?.to_string(),
            "mention" => {
                let name = require("name", "mention item requires name")?;
                let path = require("path", "mention item requires path")?;
                format!("[mention:${name}]({path})")
            }
            "skill" => {
                let name = require("name", "skill item requires name")?;
                let path = require("path", "skill item requires path")?;
                format!("[skill:${name}]({path})")
            }
            "local_image" => {
                let path = require("path", "local_image item requires path")?;
                format!("[local_image:{path}]")
            }
            "image" => {
                let url = require("image_url", "image item requires image_url")?;
                format!("[image:{url}]")
            }
            _ => field("text").map_or_else(|| "[input]".to_string(), str::to_string),
        };
        Ok(rendered)
    }

    let Some(raw_items) = input.get(key) else {
        return Ok(None);
    };
    let Some(entries) = raw_items.as_array() else {
        return Err(ToolError::invalid_input(format!("'{key}' must be an array")));
    };
    if entries.is_empty() {
        return Err(ToolError::invalid_input(format!("'{key}' cannot be empty")));
    }
    let lines = entries
        .iter()
        .map(render_item)
        .collect::<Result<Vec<_>, _>>()?;
    Ok(Some(lines.join("\n")))
}
/// Parses the JSON input of a sub-agent spawn call into a [`SpawnRequest`].
///
/// Several aliases are accepted per field (e.g. `prompt`/`message`/`objective`,
/// `type`/`agent_type`/`agent_name`, `role`/`agent_role`, and camelCase variants).
/// Validation performed here:
/// - `name` must pass `validate_session_name`;
/// - `type` and `role` must parse as sub-agent types and, when both are given, agree;
/// - `role` must also be a known role alias;
/// - `cwd` and worktree isolation are mutually exclusive;
/// - `max_depth` must not exceed `codewhale_config::MAX_SPAWN_DEPTH_CEILING`;
/// - `token_budget` must be a positive integer.
///
/// # Errors
/// Returns `ToolError::invalid_input` for any malformed or conflicting field.
fn parse_spawn_request(input: &Value) -> Result<SpawnRequest, ToolError> {
    let prompt = parse_text_or_items(
        input,
        &["prompt", "message", "objective"],
        "items",
        "prompt",
    )?;
    let session_name = optional_input_str(input, &["name", "session_name"])
        .map(validate_session_name)
        .transpose()?;
    let type_input = optional_input_str(input, &["type", "agent_type", "agent_name"]);
    let role_input = optional_input_str(input, &["role", "agent_role"]);
    let parsed_type = type_input
        .map(|kind| {
            SubAgentType::from_str(kind).ok_or_else(|| {
                ToolError::invalid_input(format!(
                    "Invalid sub-agent type '{kind}'. Use: {VALID_SUBAGENT_TYPES}"
                ))
            })
        })
        .transpose()?;
    // A role may also name an agent type; it is parsed the same way so the two can be compared.
    let parsed_role_type = role_input
        .map(|role| {
            SubAgentType::from_str(role).ok_or_else(|| {
                ToolError::invalid_input(format!(
                    "Invalid role alias '{role}'. Use: {VALID_ROLE_ALIASES}"
                ))
            })
        })
        .transpose()?;
    if let (Some(type_kind), Some(role_kind)) = (&parsed_type, &parsed_role_type)
        && type_kind != role_kind
    {
        return Err(ToolError::invalid_input(
            "Conflicting type/agent_type and role/agent_role values".to_string(),
        ));
    }
    // Explicit type wins, then the role-derived type, then General.
    let agent_type = parsed_type
        .or(parsed_role_type)
        .unwrap_or(SubAgentType::General);
    if let Some(role) = role_input
        && normalize_role_alias(role).is_none()
    {
        return Err(ToolError::invalid_input(format!(
            "Invalid role alias '{role}'. Use: {VALID_ROLE_ALIASES}"
        )));
    }
    // Canonical role label: from the explicit role, else from the type string if it is also a role alias.
    let role = role_input
        .and_then(normalize_role_alias)
        .or_else(|| type_input.and_then(normalize_role_alias))
        .map(str::to_string);
    // Non-string entries are ignored; blank entries and duplicates are dropped (order kept).
    let allowed_tools = input
        .get("allowed_tools")
        .and_then(|v| v.as_array())
        .map(|items| {
            let mut tools = Vec::new();
            for item in items {
                if let Some(tool) = item.as_str() {
                    let trimmed = tool.trim();
                    if !trimmed.is_empty() && !tools.iter().any(|existing| existing == trimmed) {
                        tools.push(trimmed.to_string());
                    }
                }
            }
            tools
        });
    let cwd = parse_optional_cwd(input)?;
    let worktree = parse_optional_worktree_request(input)?;
    if cwd.is_some() && worktree.is_some() {
        return Err(ToolError::invalid_input(
            "Use either cwd or worktree isolation, not both".to_string(),
        ));
    }
    let model = parse_optional_subagent_model(input, "model")?;
    // Explore agents default to the faster model lane unless a model was pinned explicitly.
    let model_strength = optional_input_str(input, &["model_strength", "modelStrength"])
        .map(SubAgentModelStrength::parse)
        .transpose()?
        .unwrap_or_else(|| {
            if agent_type == SubAgentType::Explore && model.is_none() {
                SubAgentModelStrength::Faster
            } else {
                SubAgentModelStrength::Same
            }
        });
    let thinking = optional_input_str(input, &["thinking", "reasoning_effort", "reasoningEffort"])
        .map(SubAgentThinking::parse)
        .transpose()?
        .unwrap_or(SubAgentThinking::Inherit);
    // Blank values are dropped; non-blank values are stored untrimmed.
    let resident_file = input
        .get("resident_file")
        .and_then(|v| v.as_str())
        .map(str::to_string)
        .filter(|s| !s.trim().is_empty());
    let fork_context =
        parse_optional_bool(input, &["fork_context", "forkContext", "inherit_context"])
            .unwrap_or(false);
    // Only non-negative integers are considered; other JSON types are silently ignored.
    let max_depth = input
        .get("max_depth")
        .or_else(|| input.get("maxDepth"))
        .or_else(|| input.get("max_spawn_depth"))
        .and_then(Value::as_u64)
        .map(|depth| {
            let ceiling = codewhale_config::MAX_SPAWN_DEPTH_CEILING;
            u32::try_from(depth)
                .map_err(|_| {
                    ToolError::invalid_input(format!("max_depth must be between 0 and {ceiling}"))
                })
                .and_then(|depth| {
                    if depth <= ceiling {
                        Ok(depth)
                    } else {
                        Err(ToolError::invalid_input(format!(
                            "max_depth must be between 0 and {ceiling}"
                        )))
                    }
                })
        })
        .transpose()?;
    let token_budget =
        parse_optional_positive_u64(input, &["token_budget", "tokenBudget", "max_tokens"])?;
    Ok(SpawnRequest {
        session_name,
        prompt: prompt.clone(),
        agent_type,
        assignment: SubAgentAssignment::new(prompt, role),
        allowed_tools,
        model,
        model_strength,
        thinking,
        cwd,
        worktree,
        resident_file,
        fork_context,
        max_depth,
        token_budget,
    })
}
/// Checks a user-supplied session name and returns it trimmed.
///
/// The trimmed name must be non-empty, contain no whitespace, and consist only of
/// ASCII letters, digits, `-`, `_`, or `.`.
///
/// # Errors
/// `ToolError::invalid_input` describing the first rule that failed.
fn validate_session_name(name: &str) -> Result<String, ToolError> {
    let candidate = name.trim();
    let is_allowed = |ch: char| ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.');
    let failure = if candidate.is_empty() {
        Some("name cannot be blank")
    } else if candidate.chars().any(char::is_whitespace) {
        // Reported separately from the charset rule to give a more specific hint.
        Some("name must not contain whitespace; use letters, numbers, '-', '_', or '.'")
    } else if candidate.chars().any(|ch| !is_allowed(ch)) {
        Some("name may only contain ASCII letters, numbers, '-', '_', or '.'")
    } else {
        None
    };
    match failure {
        Some(message) => Err(ToolError::invalid_input(message)),
        None => Ok(candidate.to_string()),
    }
}
fn parse_optional_bool(input: &Value, names: &[&str]) -> Option<bool> {
names
.iter()
.find_map(|name| input.get(*name))
.and_then(Value::as_bool)
}
/// Reads the first present key among `names` as a strictly positive integer.
///
/// Returns `Ok(None)` when no alias is present.
///
/// # Errors
/// `invalid_input` if the first present value is not a non-negative integer, or is zero.
fn parse_optional_positive_u64(input: &Value, names: &[&str]) -> Result<Option<u64>, ToolError> {
    let Some((name, value)) = names
        .iter()
        .find_map(|name| input.get(*name).map(|value| (*name, value)))
    else {
        return Ok(None);
    };
    match value.as_u64() {
        None => Err(ToolError::invalid_input(format!(
            "{name} must be a positive integer token count"
        ))),
        Some(0) => Err(ToolError::invalid_input(format!(
            "{name} must be greater than zero; omit it to inherit or disable the budget"
        ))),
        Some(count) => Ok(Some(count)),
    }
}
/// Inserts `fork_context: default` into a JSON object unless any fork-context alias
/// is already set. Non-object inputs are returned unchanged.
fn with_default_fork_context(mut input: Value, default: bool) -> Value {
    const FORK_KEYS: [&str; 3] = ["fork_context", "forkContext", "inherit_context"];
    if let Some(object) = input.as_object_mut() {
        let already_specified = FORK_KEYS.iter().any(|key| object.contains_key(*key));
        if !already_specified {
            object.insert(FORK_KEYS[0].to_string(), Value::Bool(default));
        }
    }
    input
}
/// Maps a requested child-model name to the provider's canonical model id.
///
/// `field` names the input field in error messages.
///
/// # Errors
/// `invalid_input` when the value is blank or not recognised for `provider`. The
/// unrecognised-model error lists the accepted names when the provider exposes any.
pub(crate) fn normalize_requested_subagent_model(
    value: &str,
    field: &str,
    provider: crate::config::ApiProvider,
) -> Result<String, ToolError> {
    let requested = value.trim();
    if requested.is_empty() {
        return Err(ToolError::invalid_input(format!("{field} cannot be blank")));
    }
    if let Some(model) = crate::config::requested_model_for_provider(provider, requested) {
        return Ok(model);
    }
    let accepted = crate::config::model_completion_names_for_provider(provider);
    let accepted_hint = if accepted.is_empty() {
        String::new()
    } else {
        format!(" (accepted: {})", accepted.join(", "))
    };
    Err(ToolError::invalid_input(format!(
        "Invalid {field} '{requested}' for provider {}{accepted_hint}",
        provider_name_for_error(provider)
    )))
}
fn provider_name_for_error(provider: crate::config::ApiProvider) -> &'static str {
match provider {
crate::config::ApiProvider::Deepseek | crate::config::ApiProvider::DeepseekCN => "DeepSeek",
crate::config::ApiProvider::Openai | crate::config::ApiProvider::OpenaiCodex => "OpenAI",
crate::config::ApiProvider::Moonshot => "Moonshot",
crate::config::ApiProvider::Ollama => "Ollama",
_ => "this provider",
}
}
/// Looks up a configured child model in `runtime.role_models`.
///
/// Keys are tried in order: the lower-cased `role` (when non-blank), the agent
/// type's name, then `"default"`. The first hit is normalised for the runtime's provider.
///
/// # Errors
/// Propagates `normalize_requested_subagent_model` failures for the matched entry.
pub(crate) fn configured_model_for_role_or_type(
    runtime: &SubAgentRuntime,
    role: Option<&str>,
    agent_type: &SubAgentType,
) -> Result<Option<String>, ToolError> {
    let role_key = role
        .map(str::trim)
        .filter(|role| !role.is_empty())
        .map(str::to_ascii_lowercase);
    let mut lookup_order = role_key
        .into_iter()
        .chain([agent_type.as_str().to_string(), "default".to_string()]);
    let Some((key, model)) = lookup_order.find_map(|key| {
        let model = runtime.role_models.get(&key)?;
        Some((key, model))
    }) else {
        return Ok(None);
    };
    normalize_requested_subagent_model(
        model,
        &format!("subagents.{key}.model"),
        runtime.client.api_provider(),
    )
    .map(Some)
}
/// Model and reasoning settings resolved for one sub-agent assignment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SubAgentResolvedRoute {
    /// The routing mode the model was selected under.
    pub(crate) model_route: ModelRoute,
    /// Concrete model name resolved for the route.
    pub(crate) model: String,
    /// Reasoning-effort setting string, if one was chosen.
    pub(crate) reasoning_effort: Option<String>,
    /// Request tuning derived from `reasoning_effort`; recompute via `refresh_tuning`
    /// after changing that field.
    pub(crate) tuning: RequestTuning,
}
impl SubAgentResolvedRoute {
    /// Creates a route and derives its request tuning from `reasoning_effort`.
    fn new(
        model_route: ModelRoute,
        model: String,
        reasoning_effort: Option<String>,
    ) -> Self {
        Self {
            tuning: subagent_request_tuning(reasoning_effort.as_deref()),
            model_route,
            model,
            reasoning_effort,
        }
    }

    /// Re-derives `tuning` after `reasoning_effort` has been modified.
    fn refresh_tuning(&mut self) {
        let effort = self.reasoning_effort.as_deref();
        self.tuning = subagent_request_tuning(effort);
    }
}
/// Resolves the model and reasoning route for an assignment.
///
/// A non-blank `configured_model` pins the route to that model; otherwise the
/// caller's `requested_model_route` is used.
pub(crate) async fn resolve_subagent_assignment_route(
    runtime: &SubAgentRuntime,
    configured_model: Option<String>,
    prompt: &str,
    agent_type: &SubAgentType,
    requested_model_route: ModelRoute,
    requested_thinking: SubAgentThinking,
) -> SubAgentResolvedRoute {
    worker_profile_subagent_assignment_route(
        runtime,
        &assignment_model_route(configured_model.as_deref(), requested_model_route),
        requested_thinking,
        prompt,
        agent_type,
    )
}
/// Returns `ModelRoute::Fixed` for a non-blank configured model (trimmed),
/// otherwise the requested route unchanged.
fn assignment_model_route(
    configured_model: Option<&str>,
    requested_model_route: ModelRoute,
) -> ModelRoute {
    match configured_model.map(str::trim) {
        Some(pinned) if !pinned.is_empty() => ModelRoute::Fixed(pinned.to_string()),
        _ => requested_model_route,
    }
}
/// Request tuning for sub-agent calls: the given reasoning effort (if any) and the
/// sub-agent response token cap.
fn subagent_request_tuning(reasoning_effort: Option<&str>) -> RequestTuning {
    let effort = reasoning_effort.map(ReasoningEffort::from_setting);
    RequestTuning {
        max_output_tokens: Some(SUBAGENT_RESPONSE_MAX_TOKENS),
        reasoning_effort: effort,
    }
}
/// Router candidates for the runtime's provider, anchored on the runtime's model.
fn subagent_router_candidates(runtime: &SubAgentRuntime) -> crate::model_routing::RouterCandidates {
    let provider = runtime.client.api_provider();
    crate::model_routing::provider_router_candidates(provider, &runtime.model)
}
/// Same as `resolve_subagent_assignment_route`, but resolves as a General agent and
/// runs synchronously.
fn fallback_subagent_assignment_route(
    runtime: &SubAgentRuntime,
    configured_model: Option<String>,
    requested_model_route: ModelRoute,
    requested_thinking: SubAgentThinking,
    prompt: &str,
) -> SubAgentResolvedRoute {
    let general = SubAgentType::General;
    let route = assignment_model_route(configured_model.as_deref(), requested_model_route);
    worker_profile_subagent_assignment_route(runtime, &route, requested_thinking, prompt, &general)
}
/// Turns a model route into a concrete model and reasoning effort.
///
/// `Faster` and `Auto` use the provider's cheap candidate (falling back to the
/// runtime model) and count as the fast lane when choosing reasoning effort.
/// `_agent_type` is currently unused.
fn worker_profile_subagent_assignment_route(
    runtime: &SubAgentRuntime,
    model_route: &ModelRoute,
    requested_thinking: SubAgentThinking,
    prompt: &str,
    _agent_type: &SubAgentType,
) -> SubAgentResolvedRoute {
    let candidates = subagent_router_candidates(runtime);
    let (model, requested_fast_lane) = match model_route {
        ModelRoute::Fixed(pinned) => (pinned.clone(), false),
        ModelRoute::Faster | ModelRoute::Auto => {
            let cheap = candidates
                .cheap
                .clone()
                .unwrap_or_else(|| runtime.model.clone());
            (cheap, true)
        }
        ModelRoute::Inherit => (runtime.model.clone(), false),
    };
    let reasoning_effort = subagent_reasoning_effort_for_request(
        runtime,
        prompt,
        requested_fast_lane,
        requested_thinking,
    );
    SubAgentResolvedRoute::new(model_route.clone(), model, reasoning_effort)
}
/// Chooses the reasoning-effort setting string for a sub-agent request.
///
/// Explicit efforts win. `Auto` derives one from the prompt. `Inherit` on the fast
/// lane uses Low for the OpenAI Codex provider and Off otherwise. Plain `Inherit`
/// defers to the runtime's own setting.
fn subagent_reasoning_effort_for_request(
    runtime: &SubAgentRuntime,
    prompt: &str,
    requested_fast_lane: bool,
    requested_thinking: SubAgentThinking,
) -> Option<String> {
    match requested_thinking {
        SubAgentThinking::Effort(explicit) => Some(explicit.as_setting().to_string()),
        SubAgentThinking::Auto => {
            let derived = auto_subagent_reasoning_effort(prompt);
            Some(derived.as_setting().to_string())
        }
        SubAgentThinking::Inherit => {
            if !requested_fast_lane {
                return fallback_subagent_reasoning_effort(runtime, prompt);
            }
            let fast_effort = match runtime.client.api_provider() {
                crate::config::ApiProvider::OpenaiCodex => ReasoningEffort::Low,
                _ => ReasoningEffort::Off,
            };
            Some(fast_effort.as_setting().to_string())
        }
    }
}
/// Reasoning effort inherited from the runtime: prompt-derived when the runtime is
/// in auto mode, otherwise the runtime's configured value.
fn fallback_subagent_reasoning_effort(runtime: &SubAgentRuntime, prompt: &str) -> Option<String> {
    if !runtime.reasoning_effort_auto {
        return runtime.reasoning_effort.clone();
    }
    let derived = auto_subagent_reasoning_effort(prompt);
    Some(derived.as_setting().to_string())
}
/// Prompt-derived reasoning effort for sub-agents; Low and Medium selections are
/// raised to High.
fn auto_subagent_reasoning_effort(prompt: &str) -> ReasoningEffort {
    let selected = crate::auto_reasoning::select(false, prompt);
    if matches!(selected, ReasoningEffort::Low | ReasoningEffort::Medium) {
        ReasoningEffort::High
    } else {
        selected
    }
}
/// Reads an optional model-name string at `key`. Absent or `null` yields `Ok(None)`.
///
/// # Errors
/// `invalid_input` if the value is not a string or is blank after trimming.
fn parse_optional_subagent_model(input: &Value, key: &str) -> Result<Option<String>, ToolError> {
    let raw = match input.get(key) {
        None | Some(Value::Null) => return Ok(None),
        Some(Value::String(raw)) => raw.trim(),
        Some(_) => return Err(ToolError::invalid_input(format!("{key} must be a string"))),
    };
    if raw.is_empty() {
        Err(ToolError::invalid_input(format!("{key} cannot be blank")))
    } else {
        Ok(Some(raw.to_string()))
    }
}
fn parse_optional_cwd(input: &Value) -> Result<Option<PathBuf>, ToolError> {
let raw = input.get("cwd").and_then(|v| v.as_str()).map(str::trim);
match raw {
None | Some("") => Ok(None),
Some(s) => Ok(Some(PathBuf::from(s))),
}
}
/// Parses an optional git-worktree isolation request from spawn input.
///
/// Isolation is requested when any of these hold:
/// - a `worktree` / `isolate_worktree` / `isolateWorktree` flag is `true`;
/// - `isolation` normalises to `worktree` or `gitworktree` (case-insensitive, `_`/`-` ignored);
/// - a branch, path, or base-ref detail is supplied (as reported by `optional_input_str`).
///
/// # Errors
/// `invalid_input` when the flag is not a boolean, `isolation` has an unrecognised
/// value, or `worktree=false` is combined with any worktree option.
fn parse_optional_worktree_request(
    input: &Value,
) -> Result<Option<SubAgentWorktreeRequest>, ToolError> {
    // Strict: a non-boolean flag value is an error, not silently ignored.
    let worktree_flag =
        parse_optional_bool_strict(input, &["worktree", "isolate_worktree", "isolateWorktree"])?;
    // Normalise so e.g. "git_worktree", "Git-Worktree", and "gitworktree" compare equal.
    let isolation = optional_input_str(input, &["isolation"])
        .map(|value| value.trim().to_ascii_lowercase().replace(['_', '-'], ""));
    let isolation_wants_worktree = match isolation.as_deref() {
        None | Some("") | Some("none") | Some("shared") => false,
        Some("worktree") | Some("gitworktree") => true,
        Some(other) => {
            return Err(ToolError::invalid_input(format!(
                "isolation must be 'worktree' or 'none' (got '{other}')"
            )));
        }
    };
    let branch = optional_input_str(
        input,
        &[
            "worktree_branch",
            "worktreeBranch",
            "branch_name",
            "branchName",
            "branch",
        ],
    )
    .map(str::to_string);
    let path = optional_input_str(
        input,
        &[
            "worktree_path",
            "worktreePath",
            "worktree_dir",
            "worktreeDir",
        ],
    )
    .map(PathBuf::from);
    let base_ref = optional_input_str(
        input,
        &["worktree_base", "worktreeBase", "base_ref", "baseRef"],
    )
    .map(str::to_string);
    // Supplying any worktree detail implies isolation even without an explicit flag.
    let has_worktree_details = branch.is_some() || path.is_some() || base_ref.is_some();
    if worktree_flag == Some(false) && (isolation_wants_worktree || has_worktree_details) {
        return Err(ToolError::invalid_input(
            "worktree=false conflicts with worktree isolation options".to_string(),
        ));
    }
    if worktree_flag.unwrap_or(false) || isolation_wants_worktree || has_worktree_details {
        Ok(Some(SubAgentWorktreeRequest {
            branch,
            path,
            base_ref,
        }))
    } else {
        Ok(None)
    }
}
/// Reads the first present key among `names` as a boolean. Absent means `Ok(None)`.
///
/// # Errors
/// `invalid_input` when the first present value is not a JSON boolean.
fn parse_optional_bool_strict(input: &Value, names: &[&str]) -> Result<Option<bool>, ToolError> {
    let Some((name, value)) = names
        .iter()
        .find_map(|name| input.get(*name).map(|value| (name, value)))
    else {
        return Ok(None);
    };
    match value.as_bool() {
        Some(flag) => Ok(Some(flag)),
        None => Err(ToolError::invalid_input(format!(
            "{name} must be a boolean when provided"
        ))),
    }
}
/// Determines the child agent's working directory.
///
/// An explicit `cwd` is validated and returned. Otherwise a requested worktree is
/// created. With neither, returns `Ok(None)`.
fn prepare_child_workspace(
    parent_workspace: &Path,
    request: &SpawnRequest,
) -> Result<Option<PathBuf>, ToolError> {
    match (request.cwd.as_deref(), request.worktree.as_ref()) {
        (Some(requested_cwd), _) => {
            validate_existing_child_cwd(parent_workspace, requested_cwd).map(Some)
        }
        (None, Some(worktree)) => create_isolated_worktree(
            parent_workspace,
            worktree,
            request.session_name.as_deref(),
            &request.agent_type,
        )
        .map(Some),
        (None, None) => Ok(None),
    }
}
/// Resolves a caller-supplied `cwd` for a child agent.
///
/// The result must be an existing directory inside the parent workspace. Relative
/// paths are joined onto `parent_workspace`. The result is canonicalised (resolving
/// symlinks and `..`) before the containment check.
///
/// # Errors
/// `invalid_input` when any of these hold:
/// - the path cannot be canonicalised (e.g. it does not exist);
/// - it lies outside the workspace;
/// - it is not a directory.
fn validate_existing_child_cwd(
    parent_workspace: &Path,
    requested_cwd: &Path,
) -> Result<PathBuf, ToolError> {
    let resolved = if requested_cwd.is_absolute() {
        requested_cwd.to_path_buf()
    } else {
        parent_workspace.join(requested_cwd)
    };
    let canonical = resolved.canonicalize().map_err(|e| {
        ToolError::invalid_input(format!(
            "Invalid cwd '{}': {e} (path may not exist yet — use worktree=true to let CodeWhale create an isolated checkout)",
            requested_cwd.display()
        ))
    })?;
    // Fall back to the raw workspace path if it cannot be canonicalised.
    let workspace_canonical = parent_workspace
        .canonicalize()
        .unwrap_or_else(|_| parent_workspace.to_path_buf());
    if !canonical.starts_with(&workspace_canonical) {
        return Err(ToolError::invalid_input(format!(
            "cwd must be inside the parent workspace: {} is not under {}",
            canonical.display(),
            workspace_canonical.display()
        )));
    }
    // A working directory must be a directory; reject regular files up front.
    if !canonical.is_dir() {
        return Err(ToolError::invalid_input(format!(
            "Invalid cwd '{}': {} is not a directory",
            requested_cwd.display(),
            canonical.display()
        )));
    }
    Ok(canonical)
}
/// Creates a new git worktree on a fresh branch for a child agent and returns its
/// canonical path.
///
/// The branch defaults to a generated `codex/agent-…` name, and the base ref
/// defaults to `HEAD`. The location comes from `resolve_worktree_path`, and missing
/// parent directories are created first.
///
/// # Errors
/// - `invalid_input` when the workspace is not a git repository or the branch name
///   is invalid.
/// - `execution_failed` when directory creation, `git worktree add`, or resolution of
///   the new path fails.
fn create_isolated_worktree(
    parent_workspace: &Path,
    request: &SubAgentWorktreeRequest,
    session_name: Option<&str>,
    agent_type: &SubAgentType,
) -> Result<PathBuf, ToolError> {
    let repo_root = git_repo_root(parent_workspace)?;
    // Trim so the branch handed to `git worktree add` is exactly the string validated
    // below (validate_git_branch_name checks the trimmed form).
    let branch = request
        .branch
        .as_deref()
        .map(str::trim)
        .map(str::to_string)
        .unwrap_or_else(|| default_worktree_branch(session_name, agent_type));
    validate_git_branch_name(&repo_root, &branch)?;
    let base_ref = request
        .base_ref
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .unwrap_or("HEAD")
        .to_string();
    let worktree_path = resolve_worktree_path(&repo_root, &branch, request.path.as_ref())?;
    if let Some(parent) = worktree_path.parent() {
        fs::create_dir_all(parent).map_err(|err| {
            ToolError::execution_failed(format!(
                "Failed to create worktree parent '{}': {err}",
                parent.display()
            ))
        })?;
    }
    let path_arg = worktree_path.to_string_lossy().to_string();
    let args = vec![
        "worktree".to_string(),
        "add".to_string(),
        "-b".to_string(),
        branch,
        // End option parsing: base_ref comes from model input, and git would otherwise
        // accept a value such as `--force` as a flag rather than a commit-ish.
        "--".to_string(),
        path_arg,
        base_ref,
    ];
    run_git_checked(&repo_root, &args, "create sub-agent worktree")?;
    worktree_path.canonicalize().map_err(|err| {
        ToolError::execution_failed(format!(
            "Created worktree path '{}' could not be resolved: {err}",
            worktree_path.display()
        ))
    })
}
/// Returns the top-level directory of the git repository containing `workspace`.
///
/// # Errors
/// `execution_failed` if git fails, or `invalid_input` if it prints an empty root.
fn git_repo_root(workspace: &Path) -> Result<PathBuf, ToolError> {
    let args = ["rev-parse".to_string(), "--show-toplevel".to_string()];
    let stdout = run_git_checked(workspace, &args, "resolve git repository root")?;
    match stdout.trim() {
        "" => Err(ToolError::invalid_input(
            "worktree=true requires a git repository workspace".to_string(),
        )),
        root => Ok(PathBuf::from(root)),
    }
}
/// Validates a (trimmed) branch name with `git check-ref-format --branch`.
///
/// # Errors
/// `invalid_input` for blank names or names git rejects. Git's diagnostic is included.
fn validate_git_branch_name(repo_root: &Path, branch: &str) -> Result<(), ToolError> {
    let candidate = branch.trim();
    if candidate.is_empty() {
        return Err(ToolError::invalid_input(
            "worktree_branch cannot be blank".to_string(),
        ));
    }
    let args = [
        "check-ref-format".to_string(),
        "--branch".to_string(),
        candidate.to_string(),
    ];
    match run_git_checked(repo_root, &args, "validate sub-agent worktree branch") {
        Ok(_) => Ok(()),
        Err(err) => Err(ToolError::invalid_input(format!(
            "Invalid worktree_branch '{candidate}': {err}"
        ))),
    }
}
/// Generates a default branch name `codex/agent-<slug>-<8 hex chars>`.
///
/// The slug is taken from the session name when it is non-blank, otherwise from the
/// agent type.
fn default_worktree_branch(session_name: Option<&str>, agent_type: &SubAgentType) -> String {
    let seed = match session_name.map(str::trim) {
        Some(name) if !name.is_empty() => name,
        _ => agent_type.as_str(),
    };
    let slug = sanitize_worktree_slug(seed);
    let uuid = Uuid::new_v4().to_string();
    let suffix = &uuid[..8];
    format!("codex/agent-{slug}-{suffix}")
}
/// Chooses the on-disk location for a new sub-agent worktree.
///
/// - An absolute `requested_path` is used as given (after lexical normalisation).
/// - A relative `requested_path` is resolved against the default worktree root and
///   must stay under it.
/// - With no request, the path is `<default root>/<slug of branch>`.
///
/// In every case the result must not lie inside the parent checkout.
/// NOTE(review): both containment checks compare lexically normalised paths and do
/// not resolve symlinks in the candidate path. Confirm that is acceptable.
///
/// # Errors
/// `invalid_input` when a relative path escapes the default root or the result lies
/// inside the repository.
fn resolve_worktree_path(
    repo_root: &Path,
    branch: &str,
    requested_path: Option<&PathBuf>,
) -> Result<PathBuf, ToolError> {
    let default_root = default_worktree_root(repo_root);
    let path = match requested_path {
        Some(path) if path.is_absolute() => path.to_path_buf(),
        Some(path) => {
            // Normalise before the prefix check so `..` segments cannot escape the root.
            let resolved = normalize_path_lexically(&default_root.join(path));
            if !resolved.starts_with(&default_root) {
                return Err(ToolError::invalid_input(format!(
                    "relative worktree_path '{}' must stay under {}",
                    path.display(),
                    default_root.display()
                )));
            }
            resolved
        }
        None => default_root.join(sanitize_worktree_slug(branch)),
    };
    let normalized = normalize_path_lexically(&path);
    let repo_canonical = repo_root
        .canonicalize()
        .unwrap_or_else(|_| repo_root.to_path_buf());
    // A worktree nested inside the parent checkout would appear in its working tree.
    if normalized.starts_with(&repo_canonical) {
        return Err(ToolError::invalid_input(format!(
            "worktree_path must not be inside the parent checkout: {} is under {}",
            normalized.display(),
            repo_canonical.display()
        )));
    }
    Ok(normalized)
}
fn default_worktree_root(repo_root: &Path) -> PathBuf {
let repo_name = repo_root
.file_name()
.and_then(|name| name.to_str())
.map(sanitize_worktree_slug)
.filter(|name| !name.is_empty())
.unwrap_or_else(|| "repo".to_string());
let parent = repo_root.parent().unwrap_or(repo_root);
normalize_path_lexically(&parent.join(SUBAGENT_WORKTREE_ROOT_DIR).join(repo_name))
}
/// Converts arbitrary text into a short slug for branch and directory names.
///
/// Rules:
/// - ASCII letters are lower-cased; digits and `-`, `_`, `.` are kept.
/// - Every other char becomes `-`.
/// - Runs of `-` and runs of `.` collapse to one.
/// - Output stops at 48 bytes, then leading/trailing `-`, `.`, `_` are trimmed.
///
/// Returns `"task"` if nothing remains.
fn sanitize_worktree_slug(input: &str) -> String {
    const MAX_SLUG_LEN: usize = 48;
    let mut slug = String::with_capacity(MAX_SLUG_LEN);
    for ch in input.chars() {
        let normalized = if ch.is_ascii_alphanumeric() {
            ch.to_ascii_lowercase()
        } else if matches!(ch, '-' | '_' | '.') {
            ch
        } else {
            '-'
        };
        // Collapse repeated separators. Repeated dots also matter for correctness:
        // git rejects ref names containing "..", and this slug is embedded in branches.
        if matches!(normalized, '-' | '.') && slug.ends_with(normalized) {
            continue;
        }
        slug.push(normalized);
        // Every pushed char is ASCII, so byte length equals char count.
        if slug.len() >= MAX_SLUG_LEN {
            break;
        }
    }
    let trimmed = slug.trim_matches(['-', '.', '_']);
    if trimmed.is_empty() {
        "task".to_string()
    } else {
        trimmed.to_string()
    }
}
/// Resolves `.` and `..` components purely lexically, without touching the filesystem.
///
/// `..` removes the preceding normal component. At a root or prefix it is dropped,
/// since `/..` is `/`. For a relative path with nothing left to remove, the `..` is
/// kept: `a/../../b` becomes `../b`, not `b`. Symlinks are not resolved.
fn normalize_path_lexically(path: &Path) -> PathBuf {
    use std::path::Component;
    let mut normalized = PathBuf::new();
    for component in path.components() {
        match component {
            Component::CurDir => {}
            Component::ParentDir => {
                let ends_in_normal =
                    matches!(normalized.components().next_back(), Some(Component::Normal(_)));
                if ends_in_normal {
                    normalized.pop();
                } else if !normalized.has_root() {
                    // Nothing left to cancel in a relative path: keep the `..`.
                    normalized.push(Component::ParentDir.as_os_str());
                }
                // Otherwise we are at the root, where `..` is a no-op.
            }
            other => normalized.push(other.as_os_str()),
        }
    }
    normalized
}
/// Runs git in `workspace` and returns its stdout (lossily decoded) on success.
///
/// # Errors
/// `execution_failed` prefixed with `Failed to {action}`. The message carries one of:
/// the spawn error, trimmed stderr, trimmed stdout, or the exit status — the first
/// one that applies.
fn run_git_checked(workspace: &Path, args: &[String], action: &str) -> Result<String, ToolError> {
    let argv: Vec<&str> = args.iter().map(|arg| arg.as_str()).collect();
    let output = match Git::output(&argv, workspace) {
        Ok(output) => output,
        Err(err) => {
            return Err(ToolError::execution_failed(format!(
                "Failed to {action}: could not run git: {err}"
            )));
        }
    };
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        let stdout = String::from_utf8_lossy(&output.stdout);
        let detail = [stderr.trim(), stdout.trim()]
            .into_iter()
            .find(|text| !text.is_empty())
            .map(str::to_string)
            .unwrap_or_else(|| format!("git exited with status {}", output.status));
        return Err(ToolError::execution_failed(format!(
            "Failed to {action}: {detail}"
        )));
    }
    Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
/// Maps a role spelling (ASCII case-insensitive, not trimmed) to its canonical
/// role name, or `None` if unrecognised.
fn normalize_role_alias(input: &str) -> Option<&'static str> {
    const ALIASES: &[(&str, &[&str])] = &[
        ("default", &["default"]),
        ("worker", &["worker", "general", "general-purpose", "general_purpose"]),
        ("explorer", &["explorer", "explore", "exploration"]),
        ("awaiter", &["awaiter", "plan", "planner", "planning"]),
        ("reviewer", &["reviewer", "review", "code-review", "code_review"]),
        ("implementer", &["implementer", "implement", "implementation", "builder"]),
        ("verifier", &["verifier", "verify", "verification", "validator", "tester"]),
        ("custom", &["custom"]),
    ];
    ALIASES
        .iter()
        .find(|(_, spellings)| {
            spellings
                .iter()
                .any(|spelling| spelling.eq_ignore_ascii_case(input))
        })
        .map(|(canonical, _)| *canonical)
}
/// Prefixes the task prompt with an assignment-metadata header.
///
/// The header lists the objective, the role (`default` when unset), and the
/// resolved agent type.
fn build_assignment_prompt(
    prompt: &str,
    assignment: &SubAgentAssignment,
    agent_type: &SubAgentType,
) -> String {
    let objective = &assignment.objective;
    let role = assignment.role.as_deref().unwrap_or("default");
    let resolved_type = agent_type.as_str();
    format!(
        "Assignment metadata:\n- objective: {objective}\n- role: {role}\n- resolved_type: {resolved_type}\n\nTask:\n{prompt}"
    )
}
/// Maps a sub-agent lifecycle status onto the coarser worker status.
fn worker_status_from_subagent_status(status: &SubAgentStatus) -> AgentWorkerStatus {
    match status {
        SubAgentStatus::Running => AgentWorkerStatus::Running,
        SubAgentStatus::Completed => AgentWorkerStatus::Completed,
        SubAgentStatus::Cancelled => AgentWorkerStatus::Cancelled,
        SubAgentStatus::Interrupted(_) => AgentWorkerStatus::Interrupted,
        // Budget exhaustion has no dedicated worker state, so it is reported as a failure.
        SubAgentStatus::Failed(_) | SubAgentStatus::BudgetExhausted => AgentWorkerStatus::Failed,
    }
}
/// Stable snake_case name of a worker status.
pub fn agent_worker_status_name(status: AgentWorkerStatus) -> &'static str {
    match status {
        // Terminal states.
        AgentWorkerStatus::Completed => "completed",
        AgentWorkerStatus::Failed => "failed",
        AgentWorkerStatus::Cancelled => "cancelled",
        AgentWorkerStatus::Interrupted => "interrupted",
        // In-flight states.
        AgentWorkerStatus::Queued => "queued",
        AgentWorkerStatus::Starting => "starting",
        AgentWorkerStatus::Running => "running",
        AgentWorkerStatus::WaitingForUser => "waiting_for_user",
        AgentWorkerStatus::ModelWait => "model_wait",
        AgentWorkerStatus::RunningTool => "running_tool",
    }
}
/// Worker status for a finished result.
///
/// Returns `WaitingForUser` when the result is a continuable checkpoint; otherwise
/// the mapped lifecycle status.
fn worker_status_from_subagent_result(result: &SubAgentResult) -> AgentWorkerStatus {
    if !subagent_checkpoint_is_continuable(result) {
        return worker_status_from_subagent_status(&result.status);
    }
    AgentWorkerStatus::WaitingForUser
}
/// Classifies a free-form progress message.
///
/// Returns a worker status together with the step number and tool name the message
/// mentions, if any.
///
/// Keywords are matched as case-insensitive substrings in a fixed order, and the
/// first match wins. For example, a message containing both "queued" and "complete"
/// maps to `Queued`.
/// NOTE(review): "complete" also matches words such as "incomplete". Also,
/// `SUBAGENT_MODEL_WAIT_REASON` is compared against the lower-cased message, so it
/// presumably must itself be lower-case — confirm.
fn worker_progress_event_parts(message: &str) -> (AgentWorkerStatus, Option<u32>, Option<String>) {
    let step = parse_progress_step(message);
    let lower = message.to_ascii_lowercase();
    // Order matters: earlier checks take precedence over later ones.
    let status = if lower.contains("queued") {
        AgentWorkerStatus::Queued
    } else if lower.contains("waiting for user") || lower.contains("waiting for follow-up") {
        AgentWorkerStatus::WaitingForUser
    } else if lower.contains("requesting model response")
        || lower.contains(SUBAGENT_MODEL_WAIT_REASON)
    {
        AgentWorkerStatus::ModelWait
    } else if lower.contains("running tool") || lower.contains("executing") {
        AgentWorkerStatus::RunningTool
    } else if lower.contains("cancelled") {
        AgentWorkerStatus::Cancelled
    } else if lower.contains("interrupted") || lower.contains("timed out") {
        AgentWorkerStatus::Interrupted
    } else if lower.contains("complete") {
        AgentWorkerStatus::Completed
    } else if lower.contains("started") {
        AgentWorkerStatus::Starting
    } else {
        AgentWorkerStatus::Running
    };
    (status, step, parse_progress_tool_name(message))
}
/// Extracts `N` from messages that begin with `step N`.
///
/// Returns `None` when the prefix is absent, no digits follow it, or the number
/// overflows `u32`.
fn parse_progress_step(message: &str) -> Option<u32> {
    let after_prefix = message.strip_prefix("step ")?;
    // ASCII digits are one byte each, so the count is also a valid slice boundary.
    let digit_count = after_prefix
        .bytes()
        .take_while(u8::is_ascii_digit)
        .count();
    after_prefix[..digit_count].parse().ok()
}
/// Extracts the trimmed name inside the first `tool '<name>'` occurrence.
///
/// Returns `None` when there is no such occurrence or the quoted name is blank.
fn parse_progress_tool_name(message: &str) -> Option<String> {
    let (_, after_marker) = message.split_once("tool '")?;
    let (quoted, _) = after_marker.split_once('\'')?;
    let tool = quoted.trim();
    if tool.is_empty() {
        None
    } else {
        Some(tool.to_owned())
    }
}
/// Display label for a tool in progress output: shell-family tools show as `Bash`,
/// and every other name passes through unchanged (case-sensitive match).
fn subagent_progress_tool_display_name(name: &str) -> &str {
    const SHELL_TOOLS: [&str; 7] = [
        "exec_shell",
        "exec_shell_wait",
        "exec_shell_interact",
        "exec_wait",
        "exec_interact",
        "task_shell_start",
        "task_shell_wait",
    ];
    if SHELL_TOOLS.contains(&name) {
        "Bash"
    } else {
        name
    }
}
/// Publishes a progress update to the mailbox and/or the event channel, when present.
///
/// Delivery is best-effort: send failures are ignored, and the event channel uses
/// the non-blocking `try_send`.
fn emit_agent_progress(
    event_tx: Option<&mpsc::Sender<Event>>,
    mailbox: Option<&Mailbox>,
    agent_id: &str,
    status: String,
    parent_run_id: Option<String>,
    spawn_depth: u32,
) {
    if let Some(mailbox) = mailbox {
        let _ = mailbox.send(MailboxMessage::progress(agent_id, status.clone()));
    }
    let Some(event_tx) = event_tx else {
        return;
    };
    let event = Event::AgentProgress {
        id: agent_id.to_string(),
        status,
        parent_run_id,
        spawn_depth,
    };
    let _ = event_tx.try_send(event);
}
/// Whether a role's runtime profile permits a tool with the given approval level.
///
/// - Custom roles are always permitted here.
/// - `Auto` tools are always permitted.
/// - `Suggest` tools need write permission.
/// - `Required` tools need the full shell policy.
fn role_posture_permits(agent_type: &SubAgentType, approval: ApprovalRequirement) -> bool {
    if let SubAgentType::Custom = agent_type {
        return true;
    }
    let profile = WorkerRuntimeProfile::for_role(agent_type.clone());
    match approval {
        ApprovalRequirement::Auto => true,
        ApprovalRequirement::Suggest => profile.permissions.write,
        ApprovalRequirement::Required => {
            matches!(profile.shell, crate::worker_profile::ShellPolicy::Full)
        }
    }
}
/// Tool registry wrapper that enforces a sub-agent's tool policy.
///
/// The policy covers the explicit allow-list, role posture, approval mode, and the
/// spawn-depth limit.
struct SubAgentToolRegistry {
    /// Explicit allow-list; `None` means every registered tool is eligible.
    allowed_tools: Option<Vec<String>>,
    /// Copied from the runtime context's `auto_approve` flag at construction.
    auto_approve: bool,
    /// Role whose runtime profile gates write- and shell-level tools.
    agent_type: SubAgentType,
    /// Whether the `agent` tool may spawn a further child.
    can_spawn_child: bool,
    /// Owner id attached to the tool context on each execution.
    owner_agent_id: String,
    /// Owner display name attached to the tool context on each execution.
    owner_agent_name: String,
    /// Underlying registry built from the full agent tool surface.
    registry: ToolRegistry,
}
impl SubAgentToolRegistry {
    /// Builds a registry with a placeholder owner identity
    /// (`agent_unknown` / `sub-agent`).
    fn new(
        runtime: SubAgentRuntime,
        agent_type: SubAgentType,
        explicit_allowed_tools: Option<Vec<String>>,
        todo_list: SharedTodoList,
        plan_state: SharedPlanState,
    ) -> Self {
        Self::new_with_owner(
            runtime,
            agent_type,
            "agent_unknown".to_string(),
            "sub-agent".to_string(),
            explicit_allowed_tools,
            todo_list,
            plan_state,
        )
    }
    /// Builds a registry exposing the full agent tool surface for a sub-agent owned
    /// by `owner_agent_id`. MCP tools are added when the runtime has an MCP pool.
    fn new_with_owner(
        runtime: SubAgentRuntime,
        agent_type: SubAgentType,
        owner_agent_id: String,
        owner_agent_name: String,
        explicit_allowed_tools: Option<Vec<String>>,
        todo_list: SharedTodoList,
        plan_state: SharedPlanState,
    ) -> Self {
        // Decided once: the `agent` tool is hidden and blocked when another spawn
        // would exceed the runtime's depth limit.
        let can_spawn_child = !runtime.would_exceed_depth();
        let context = runtime.context.clone();
        let mut registry = ToolRegistryBuilder::new().with_full_agent_surface(
            Some(runtime.client.clone()),
            runtime.model.clone(),
            runtime.manager.clone(),
            runtime.clone(),
            runtime.allow_shell,
            todo_list,
            plan_state,
        );
        if let Some(pool) = runtime.mcp_pool.as_ref() {
            registry = registry.with_mcp_tools(std::sync::Arc::clone(pool));
        }
        let registry = registry.build(context);
        Self {
            allowed_tools: explicit_allowed_tools,
            auto_approve: runtime.context.auto_approve,
            agent_type,
            can_spawn_child,
            owner_agent_id,
            owner_agent_name,
            registry,
        }
    }
    /// Roles that may run `Suggest`-level tools without parent auto-approval.
    fn role_can_delegate_writes(agent_type: &SubAgentType) -> bool {
        matches!(agent_type, SubAgentType::Implementer | SubAgentType::Custom)
    }
    /// Whether the role posture permits `name`.
    ///
    /// The `agent` tool and unregistered names are not gated here; the depth check
    /// and the approval path handle them.
    fn posture_permits_tool(&self, name: &str) -> bool {
        if name == "agent" {
            return true;
        }
        match self.registry.get(name) {
            Some(spec) => role_posture_permits(&self.agent_type, spec.approval_requirement()),
            None => true,
        }
    }
    /// Applies the spawn-depth gate for `agent` and the explicit allow-list, if any.
    fn is_tool_allowed(&self, name: &str) -> bool {
        if name == "agent" && !self.can_spawn_child {
            return false;
        }
        match &self.allowed_tools {
            None => true,
            Some(list) => list.iter().any(|t| t == name),
        }
    }
    /// Tool definitions advertised to the child model.
    ///
    /// Tools are filtered by the allow-list, the spawn-depth gate, and role posture.
    /// `agent_type` is unused; the stored role is applied instead.
    fn tools_for_model(&self, agent_type: &SubAgentType) -> Vec<Tool> {
        let _ = agent_type;
        let api_tools = self.registry.to_api_tools();
        let filtered = match &self.allowed_tools {
            None => api_tools,
            Some(list) => api_tools
                .into_iter()
                .filter(|tool| list.contains(&tool.name))
                .collect::<Vec<_>>(),
        };
        filtered
            .into_iter()
            .filter(|tool| tool.name != "agent" || self.can_spawn_child)
            .filter(|tool| self.posture_permits_tool(&tool.name))
            .collect()
    }
    /// Allow-listed tool names that the registry does not contain.
    fn unavailable_allowed_tools(&self) -> Vec<String> {
        match &self.allowed_tools {
            None => Vec::new(),
            Some(list) => list
                .iter()
                .filter(|name| !self.registry.contains(name))
                .cloned()
                .collect(),
        }
    }
    /// Executes a tool call and returns the tool's text content.
    ///
    /// Checks are applied in this order:
    /// 1. the allow-list and spawn-depth gate;
    /// 2. role posture;
    /// 3. the approval requirement, unless the parent is auto-approved;
    /// 4. the interactive-terminal guard.
    ///
    /// `_agent_id` is unused; the stored owner identity is attached to the context.
    async fn execute(&self, _agent_id: &str, name: &str, input: Value) -> Result<String> {
        if !self.is_tool_allowed(name) {
            return Err(anyhow!("Tool {name} not allowed for this sub-agent"));
        }
        if !self.posture_permits_tool(name) {
            return Err(anyhow!(
                "Tool {name} is not permitted for the read-only `{role}` sub-agent role. Use an `implementer` or `general` role (or a `custom` role with an explicit allowed_tools list) to mutate the workspace or run shell commands.",
                role = self.agent_type.as_str()
            ));
        }
        if !self.auto_approve {
            // Without parent auto-approval, only Auto tools run freely. Suggest tools
            // are delegated to write-capable roles; Required tools are always refused.
            let Some(spec) = self.registry.get(name) else {
                return Err(anyhow!("Tool {name} is not registered"));
            };
            match spec.approval_requirement() {
                ApprovalRequirement::Auto => {}
                ApprovalRequirement::Suggest => {
                    if !Self::role_can_delegate_writes(&self.agent_type) {
                        return Err(anyhow!(
                            "Tool {name} requires approval and is not delegated to {role} sub-agents; rerun the parent with auto approval or pick a write-capable role",
                            role = self.agent_type.as_str()
                        ));
                    }
                }
                ApprovalRequirement::Required => {
                    return Err(anyhow!(
                        "Tool {name} requires approval and cannot run inside this sub-agent unless the parent session is auto-approved"
                    ));
                }
            }
        }
        reject_subagent_terminal_takeover(name, &input)?;
        let context = self
            .registry
            .context()
            .clone()
            .with_owner_agent(self.owner_agent_id.clone(), self.owner_agent_name.clone());
        self.registry
            .execute_full_with_context(name, input, Some(&context))
            .await
            .map(|result| result.content)
            .map_err(|e| anyhow!(e))
    }
}
/// Refuses `exec_shell` calls with `interactive=true`, which would take over the
/// parent's terminal. All other calls pass.
fn reject_subagent_terminal_takeover(name: &str, input: &Value) -> Result<()> {
    if name != "exec_shell" {
        return Ok(());
    }
    let interactive = input.get("interactive").and_then(Value::as_bool) == Some(true);
    if !interactive {
        return Ok(());
    }
    Err(anyhow!(
        "Sub-agents run in the background and cannot use exec_shell with interactive=true \
         because that would take over the parent TUI terminal. Use non-interactive \
         exec_shell, background=true, tty=true, or task_shell_start instead."
    ))
}
/// Normalises an explicit tool allow-list.
///
/// Entries are trimmed, blanks dropped, and duplicates removed, keeping first-seen
/// order. `None` means no explicit restriction. `_allow_shell` is currently unused.
///
/// # Errors
/// Custom sub-agents must supply a non-empty list.
fn build_allowed_tools(
    agent_type: &SubAgentType,
    explicit_tools: Option<Vec<String>>,
    _allow_shell: bool,
) -> Result<Option<Vec<String>>> {
    let deduped = explicit_tools.map(|tools| {
        let mut unique: Vec<String> = Vec::with_capacity(tools.len());
        for name in tools.iter().map(|tool| tool.trim()).filter(|name| !name.is_empty()) {
            if !unique.iter().any(|seen| seen == name) {
                unique.push(name.to_string());
            }
        }
        unique
    });
    // Custom roles have no built-in posture, so they must name their tools explicitly.
    let custom_without_tools = matches!(agent_type, SubAgentType::Custom)
        && deduped.as_ref().is_none_or(|list| list.is_empty());
    if custom_without_tools {
        return Err(anyhow!(
            "Custom sub-agent requires a non-empty allowed_tools list"
        ));
    }
    Ok(deduped)
}
fn annotate_child_model_error(err: &str, model: &str) -> String {
match crate::error_taxonomy::classify_error_message(err) {
crate::error_taxonomy::ErrorCategory::Authorization
| crate::error_taxonomy::ErrorCategory::State => format!(
"{err}\n(child model `{model}` may be unavailable under the current access profile — \
remove the explicit child model override or adjust child-agent model config before retrying)"
),
_ => {
let lower = err.to_ascii_lowercase();
if lower.contains("model not exist")
|| lower.contains("model_not_found")
|| lower.contains("does not exist")
|| lower.contains("no such model")
|| lower.contains("invalid model")
{
format!(
"{err}\n(child model `{model}` may be unavailable under the current access profile — \
remove the explicit child model override or adjust child-agent model config before retrying)"
)
} else {
err.to_string()
}
}
}
}
/// Sub-agent summaries longer than this many chars (Unicode scalar values) are
/// truncated to a head and a tail.
const SUBAGENT_SUMMARY_CHAR_BUDGET: usize = 12_000;
/// Chars kept from the start of an over-budget summary.
const SUBAGENT_SUMMARY_HEAD_CHARS: usize = 4_000;
/// Chars kept from the end of an over-budget summary.
const SUBAGENT_SUMMARY_TAIL_CHARS: usize = 4_000;
/// Reminder appended to within-budget summaries that they are unverified self-reports.
const SUBAGENT_SELF_REPORT_NOTE: &str = "\n[Sub-agent self-report — re-verify material claims (read changed files, \
    run the relevant tests) before relying on it.]";
/// Prepares a child's summary for the parent. Returns the text and whether it was
/// truncated.
///
/// Within budget, the self-report note is appended. Over budget, the head and tail
/// are kept, and a notice explains the elided middle and that it is not retrievable.
fn stamp_subagent_summary(raw: &str) -> (String, bool) {
    let total = raw.chars().count();
    if total <= SUBAGENT_SUMMARY_CHAR_BUDGET {
        return (format!("{raw}{SUBAGENT_SELF_REPORT_NOTE}"), false);
    }
    let head_chars = SUBAGENT_SUMMARY_HEAD_CHARS;
    let tail_chars = SUBAGENT_SUMMARY_TAIL_CHARS;
    // Iterate chars directly rather than materialising a Vec<char>.
    let head: String = raw.chars().take(head_chars).collect();
    let tail: String = raw.chars().skip(total.saturating_sub(tail_chars)).collect();
    let omitted = total.saturating_sub(head_chars).saturating_sub(tail_chars);
    let stamped = format!(
        "{head}\n\n[Sub-agent summary truncated: {head_chars} + {tail_chars} of {total} \
         chars shown. This is the child's self-report; the elided middle ({omitted} chars) is not in \
         the spillover store and cannot be retrieved via retrieve_tool_result. Re-open the child or \
         read changed files directly to verify material claims.]\n\n{tail}"
    );
    (stamped, true)
}
fn summarize_subagent_result(result: &SubAgentResult) -> String {
if let Some(needs_input) = result.needs_input.as_ref() {
return format!("Needs input: {}", needs_input.question);
}
match (&result.status, result.result.as_ref()) {
(SubAgentStatus::Completed, Some(text)) => text.clone(),
(SubAgentStatus::Completed, None) => "Completed (no output)".to_string(),
(SubAgentStatus::Interrupted(error), _) => format!("Interrupted: {error}"),
(SubAgentStatus::Cancelled, _) => "Cancelled".to_string(),
(SubAgentStatus::BudgetExhausted, _) => "Token budget exhausted".to_string(),
(SubAgentStatus::Failed(error), _) => format!("Failed: {error}"),
(SubAgentStatus::Running, _) => "Running".to_string(),
}
}
/// Returns the stable snake_case label for a sub-agent status.
///
/// Variant payloads (such as error text) are ignored; only the variant selects the label.
fn subagent_status_name(status: &SubAgentStatus) -> &'static str {
    match *status {
        SubAgentStatus::Completed => "completed",
        SubAgentStatus::Running => "running",
        SubAgentStatus::Failed(_) => "failed",
        SubAgentStatus::Interrupted(_) => "interrupted",
        SubAgentStatus::BudgetExhausted => "budget_exhausted",
        SubAgentStatus::Cancelled => "cancelled",
    }
}
/// Shared output-format instructions for sub-agents, embedded into the binary at
/// compile time via `include_str!` (the path is resolved relative to this source file).
const SUBAGENT_OUTPUT_FORMAT: &str = include_str!("../../prompts/subagent_output_format.md");
/// Intro prompt for the general-purpose sub-agent.
///
/// It asks the agent to stay in scope, to track multi-step work with
/// `checklist_write`/`update_plan`, and to stop after the same tool call fails
/// twice in a row.
///
/// The text ends with `\n\n`. NOTE(review): it is presumably followed by further
/// prompt text (e.g. the shared output format); confirm at the call site.
const GENERAL_AGENT_INTRO: &str = concat!(
"You are a trusted general-purpose sub-agent. Your job is to complete the one task you were given, end-to-end, and report back concisely.\n",
"Stay inside the assigned scope; put adjacent work under RISKS/BLOCKERS.\n",
"For genuinely multi-step work, track progress with `checklist_write` (and `update_plan` for complex strategy); skip it for short, focused tasks.\n",
"**Stop quickly on failure**: if the same tool call fails 2 times in a row, stop retrying and return what you have so far with a one-line note explaining what's missing. Do not loop on impossible queries (e.g. external API unreachable, rate-limited, or returning empty).\n",
"For implementer or repair-style work, keep going within the assigned scope; checkpoint before broadening the task or after repeated failures instead of forcing a tiny tool-call cap.\n\n"
);
/// Intro prompt for the read-only `explore` sub-agent role.
///
/// It defaults the agent to quick effort (about 3-5 tool calls) and asks it to
/// honor the QUESTION/SCOPE/ALREADY_KNOWN/STOP_CONDITION fields of its brief.
/// Findings must cite `path:line-range`.
const EXPLORE_AGENT_INTRO: &str = concat!(
"You are a trusted exploration sub-agent (role: `explore`). Your job is to map the relevant code quickly and stay strictly read-only.\n",
"Default to `EFFORT: quick`: aim for about 3-5 tool calls unless the brief explicitly asks for more.\n",
"Orient first: confirm the workspace/project root, read relevant AGENTS.md/README guidance when the tree is unfamiliar, then search only the likely scope.\n",
"Use list_dir/file_search, grep_files, and read_file; use RLM only for long inputs or many semantic slices, not basic path discovery.\n",
"Honor QUESTION, SCOPE, ALREADY_KNOWN, and STOP_CONDITION. Do not repeat ALREADY_KNOWN work unless evidence contradicts it; do not broaden once QUESTION is answered.\n",
"DeepSeek V4 can hold broad evidence, but your value is compressed reconnaissance: cite `path:line-range` for each finding and stop once evidence is sufficient. Return partial findings if the next step would be speculative or duplicative.\n",
"CHANGES will almost always be \"None.\" for an explorer.\n\n"
);
/// Intro prompt for the `plan` sub-agent role.
///
/// It asks for a grounded, prioritized plan rather than patches. Each step must
/// name its artifact and its verification.
const PLAN_AGENT_INTRO: &str = concat!(
"You are a trusted planning sub-agent (role: `plan`). Your job is to produce a grounded, prioritized plan, not patches.\n",
"Read enough code to avoid guessing; each step names its artifact and verification.\n",
"Use update_plan/checklist_write for plan artifacts and explain key trade-offs.\n",
"CHANGES should list plan artifacts only, not future speculative edits.\n\n"
);
/// Intro prompt for the read-only `review` sub-agent role.
///
/// Findings are severity-ordered (BLOCKER/MAJOR/MINOR/NIT), each with a
/// `path:line-range` and a suggested fix.
const REVIEW_AGENT_INTRO: &str = concat!(
"You are a trusted code review sub-agent (role: `review`). Your job is to find and report severity-scored issues, and stay strictly read-only.\n",
"Read the diff/files, grep sibling patterns/tests, then order EVIDENCE by severity.\n",
"Use BLOCKER/MAJOR/MINOR/NIT and include path:line-range plus suggested fix.\n",
"You may use more tool calls than quick exploration, but stop after decisive evidence instead of widening the review forever.\n",
"If no MAJOR+ issues exist, say so plainly in SUMMARY.\n",
"CHANGES will almost always be \"None.\" for a reviewer.\n\n"
);
/// Intro prompt for the `custom` sub-agent role, which runs with a narrowed tool registry.
///
/// Missing capabilities are reported under BLOCKERS instead of being worked around.
const CUSTOM_AGENT_INTRO: &str = concat!(
"You are a trusted custom sub-agent (role: `custom`) with a narrowed tool registry. Your job is to stay tightly scoped to the assigned objective.\n",
"Use only tools available at runtime; put missing capabilities under BLOCKERS and stop.\n\n"
);
/// Intro prompt for the `implementer` sub-agent role.
///
/// Unlike `explore`, this role is not held to a 3-5 tool-call cap. Its CHANGES
/// section must list every modified file with a reason.
const IMPLEMENTER_AGENT_INTRO: &str = concat!(
"You are a trusted implementation sub-agent (role: `implementer`). Your job is to land the assigned change with minimal surrounding edits.\n",
"Read target files before editing; prefer edit_file for narrow changes and apply_patch for hunks.\n",
"Run relevant verification after edit batches; write needed tests with the implementation.\n",
"You are not limited to an explorer-style 3-5 tool-call cap. Checkpoint before expanding scope or after repeated failures, then continue only inside the assigned brief.\n",
"CHANGES is load-bearing: list every modified file with a one-line why.\n\n"
);
/// Intro prompt for the read-only `verifier` sub-agent role.
///
/// SUMMARY must open with PASS/FAIL/FLAKY backed by exact command evidence.
/// Failing assertions are reported with `file:line`.
const VERIFIER_AGENT_INTRO: &str = concat!(
"You are a trusted verification sub-agent (role: `verifier`). Your job is to run the requested gates and report results, and stay read-only.\n",
"Report PASS/FAIL/FLAKY at the top of SUMMARY with exact command evidence.\n",
"Capture failing assertion and file:line; put obvious fixes under RISKS.\n",
"You may use more tool calls than quick exploration, but stop after decisive pass/fail evidence.\n",
"CHANGES will almost always be \"None.\" for a verifier.\n\n"
);
#[cfg(test)]
mod tests;