#![allow(dead_code)]
use std::collections::{HashMap, VecDeque};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, RwLock, Semaphore};
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use crate::client::DeepSeekClient;
use crate::config::MAX_SUBAGENTS;
use crate::core::events::Event;
use crate::dependencies::{ExternalTool, Git};
use crate::llm_client::LlmClient;
use crate::models::{ContentBlock, Message, MessageRequest, MessageResponse, SystemPrompt, Tool};
use crate::request_tuning::RequestTuning;
use crate::tools::handle::VarHandle;
use crate::tools::plan::{PlanState, SharedPlanState};
use crate::tools::registry::{ToolRegistry, ToolRegistryBuilder};
use crate::tools::spec::{
ApprovalRequirement, ToolCapability, ToolContext, ToolError, ToolResult, ToolSpec,
};
use crate::tools::todo::{SharedTodoList, TodoList};
use crate::tui::app::ReasoningEffort;
use crate::utils::spawn_supervised;
use crate::worker_profile::{ModelRoute, ToolScope, WorkerRuntimeProfile};
pub mod mailbox;
#[allow(unused_imports)]
pub use mailbox::{Mailbox, MailboxEnvelope, MailboxMessage, MailboxReceiver};
/// Process-wide table of resident-file leases, mapping a lease key to the id
/// of the agent that currently owns it. Lazily created on first use.
static RESIDENT_LEASES: std::sync::OnceLock<
    std::sync::Mutex<std::collections::HashMap<String, String>>,
> = std::sync::OnceLock::new();

/// Drops every lease owned by `agent_id`.
///
/// Does nothing if the lease table was never initialised. A poisoned mutex
/// (some other holder panicked) is recovered rather than skipped. Retaining
/// entries in a `HashMap` is safe even after a panic elsewhere. Silently
/// skipping the release would leave the dead agent's leases held forever.
fn release_resident_leases_for(agent_id: &str) {
    let Some(lock) = RESIDENT_LEASES.get() else {
        return;
    };
    let mut guard = lock
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    guard.retain(|_, owner| owner != agent_id);
}
/// Default step budget. `u32::MAX` acts as an "unbounded" sentinel:
/// `format_step_counter` prints no `/max` denominator for this value.
const DEFAULT_MAX_STEPS: u32 = u32::MAX;
/// Fixed 30-second timeout. By its name it presumably bounds individual tool
/// calls; its uses are outside this view, so confirm.
const TOOL_TIMEOUT: Duration = Duration::from_secs(30);
/// Renders a progress label such as `step 3/10`.
///
/// A `max_steps` of `u32::MAX` means "no limit", so only the current step is
/// shown (`step 3`).
fn format_step_counter(steps: u32, max_steps: u32) -> String {
    match max_steps {
        u32::MAX => format!("step {steps}"),
        limit => format!("step {steps}/{limit}"),
    }
}
/// Token cap for one sub-agent model response. Presumably sent as the
/// request's max-tokens setting; confirm at the call site.
const SUBAGENT_RESPONSE_MAX_TOKENS: u32 = 16_384;
/// Number of back-to-back truncated responses a sub-agent tolerates. The
/// handling once it is exceeded is outside this view.
const MAX_CONSECUTIVE_TRUNCATED_SUBAGENT_RESPONSES: u32 = 5;
/// Default timeout for a model API call made during a sub-agent step, taken
/// from `crate::config::DEFAULT_SUBAGENT_API_TIMEOUT_SECS`.
const DEFAULT_STEP_API_TIMEOUT: Duration =
Duration::from_secs(crate::config::DEFAULT_SUBAGENT_API_TIMEOUT_SECS);
/// Default wait for a sub-agent result, in milliseconds (30 s).
const DEFAULT_RESULT_TIMEOUT_MS: u64 = 30_000;
/// Largest result wait, in milliseconds (1 h). Presumably the clamp applied
/// to caller-supplied timeouts; confirm.
const MAX_RESULT_TIMEOUT_MS: u64 = 3_600_000;
/// How long finished agents are kept around (1 h).
const COMPLETED_AGENT_RETENTION: Duration = Duration::from_secs(60 * 60);
/// Upper bound on the number of worker records kept.
const MAX_AGENT_WORKER_RECORDS: usize = 256;
/// Upper bound on lifecycle events kept per worker record.
const MAX_AGENT_WORKER_EVENTS_PER_RECORD: usize = 128;
/// Schema version written into `PersistedSubAgentState`.
const SUBAGENT_STATE_SCHEMA_VERSION: u32 = 1;
/// File name of the persisted sub-agent state; the `v1` matches the schema version.
const SUBAGENT_STATE_FILE: &str = "subagents.v1.json";
/// Status reason attached to agents that did not survive a process restart.
const SUBAGENT_RESTART_REASON: &str = "Interrupted by process restart";
/// Status text shown while a spawn waits for a free launch slot.
const SUBAGENT_QUEUED_LAUNCH_REASON: &str = "queued: waiting for a sub-agent launch slot";
/// Status text shown while a sub-agent waits on the model.
const SUBAGENT_MODEL_WAIT_REASON: &str = "waiting for model response";
/// Minimum spacing between state persists (1.5 s).
const SUBAGENT_PERSIST_DEBOUNCE: Duration = Duration::from_millis(1500);
/// Counter of persisted state writes. Presumably reported when perf tracing
/// is on; confirm.
static SUBAGENT_PERSIST_WRITES: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
/// Counter of persist attempts that were skipped, presumably due to the debounce.
static SUBAGENT_PERSIST_SKIPPED: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
/// Returns whether sub-agent perf tracing is on.
///
/// It is enabled by setting `CODEWHALE_SUBAGENT_PERF_TRACE` to `1` or to `true`
/// (any ASCII case). The variable is read once; later changes to the
/// environment are ignored for the rest of the process.
fn subagent_perf_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| match std::env::var("CODEWHALE_SUBAGENT_PERF_TRACE") {
        Ok(raw) => raw == "1" || raw.eq_ignore_ascii_case("true"),
        Err(_) => false,
    })
}
/// Human-readable list of accepted sub-agent `type` values and their aliases.
/// Keep it in sync with the alias table in `SubAgentType::from_str`.
const VALID_SUBAGENT_TYPES: &str = "general (aliases: general-purpose, general_purpose, worker, default), \
explore (aliases: exploration, explorer), plan (aliases: planning, planner, awaiter), \
review (aliases: code-review, code_review, reviewer), implementer (aliases: implement, implementation, builder), \
verifier (aliases: verify, verification, validator, tester), custom";
/// Human-readable list of accepted role names. It covers the same kinds as
/// `VALID_SUBAGENT_TYPES`, keyed by the role spelling (`worker`, `explorer`,
/// `awaiter`, ...).
const VALID_ROLE_ALIASES: &str = "default; worker (aliases: general, general-purpose, general_purpose); \
explorer (aliases: explore, exploration); awaiter (aliases: plan, planning, planner); \
reviewer (aliases: review, code-review, code_review); implementer (aliases: implement, implementation, builder); \
verifier (aliases: verify, verification, validator, tester); custom";
/// Description text for the `type` parameter, presumably used in the tool's
/// input schema. It repeats the `VALID_SUBAGENT_TYPES` vocabulary verbatim,
/// so update both together.
const SUBAGENT_TYPE_DESCRIPTION: &str = "Sub-agent type. Accepted vocabulary: general (aliases: general-purpose, general_purpose, worker, default), \
explore (aliases: exploration, explorer), plan (aliases: planning, planner, awaiter), \
review (aliases: code-review, code_review, reviewer), implementer (aliases: implement, implementation, builder), \
verifier (aliases: verify, verification, validator, tester), custom.";
/// Description text for the `role` parameter. It repeats the
/// `VALID_ROLE_ALIASES` vocabulary and states that `role` must agree with `type`.
const SUBAGENT_ROLE_DESCRIPTION: &str = "Role alias. Accepted vocabulary: default; worker (aliases: general, general-purpose, general_purpose); \
explorer (aliases: explore, exploration); awaiter (aliases: plan, planning, planner); \
reviewer (aliases: review, code-review, code_review); implementer (aliases: implement, implementation, builder); \
verifier (aliases: verify, verification, validator, tester); custom. \
Must match `type` if both are given.";
/// Bilingual pool of cetacean nicknames for sub-agents.
///
/// Each species takes two consecutive slots: the English name, then the
/// Chinese name. Entries are picked by position (`whale_name_for_id` hashes an
/// id into an index; `whale_nickname_for_index` uses a counter). Reordering or
/// inserting entries therefore changes the name produced for a given id or
/// index.
///
/// NOTE(review): "La Plata" and "Franciscana" appear to name the same dolphin
/// species. Confirm whether the duplicate is intentional.
pub const WHALE_NICKNAMES: &[&str] = &[
    "Blue", "蓝鲸",
    "Humpback", "座头鲸",
    "Sperm", "抹香鲸",
    "Fin", "长须鲸",
    "Sei", "塞鲸",
    "Bryde's", "布氏鲸",
    "Minke", "小须鲸",
    "Antarctic Minke", "南极小须鲸",
    "Pygmy Right", "小露脊鲸",
    "Omura's", "大村鲸",
    "Eden's", "艾氏鲸",
    "Rice's", "赖斯鲸",
    "Gray", "灰鲸",
    "Bowhead", "弓头鲸",
    "North Atlantic Right", "北大西洋露脊鲸",
    "North Pacific Right", "北太平洋露脊鲸",
    "Southern Right", "南露脊鲸",
    "Beluga", "白鲸",
    "Narwhal", "独角鲸",
    "Orca", "虎鲸",
    "Pilot", "领航鲸",
    "False Killer", "伪虎鲸",
    "Pygmy Killer", "小虎鲸",
    "Melon-headed", "瓜头鲸",
    "Beaked", "喙鲸",
    "Cuvier's Beaked", "柯氏喙鲸",
    "Baird's Beaked", "贝氏喙鲸",
    "Blainville's Beaked", "柏氏喙鲸",
    "Ginkgo-toothed Beaked", "银杏齿喙鲸",
    "Strap-toothed", "带齿喙鲸",
    "Stejneger's Beaked", "斯氏喙鲸",
    "Dwarf Sperm", "小抹香鲸",
    "Pygmy Sperm", "侏儒抹香鲸",
    "Rough-toothed", "糙齿海豚",
    "Atlantic Spotted", "大西洋斑海豚",
    "Pantropical Spotted", "热带斑海豚",
    "Spinner", "长吻飞旋海豚",
    "Clymene", "短吻飞旋海豚",
    "Striped", "条纹海豚",
    "Common Bottlenose", "宽吻海豚",
    "Indo-Pacific Bottlenose", "印太瓶鼻海豚",
    "Risso's", "灰海豚",
    "Commerson's", "花斑海豚",
    "Chilean", "智利海豚",
    "Heaviside's", "海氏矮海豚",
    "Hector's", "赫氏矮海豚",
    "Amazon River", "亚马逊河豚",
    "Ganges River", "恒河豚",
    "Indus River", "印度河豚",
    "La Plata", "拉普拉塔河豚",
    "Franciscana", "拉河豚",
];
/// Picks a nickname from `WHALE_NICKNAMES` by hashing `id`.
///
/// The same id always maps to the same name within one build. std documents
/// `DefaultHasher`'s algorithm as unspecified across Rust releases, so the
/// mapping may change when the toolchain is upgraded.
#[must_use]
pub fn whale_name_for_id(id: &str) -> String {
    use std::hash::{Hash, Hasher};
    let digest = {
        let mut state = std::collections::hash_map::DefaultHasher::new();
        id.hash(&mut state);
        state.finish()
    };
    let slot = digest as usize % WHALE_NICKNAMES.len();
    String::from(WHALE_NICKNAMES[slot])
}
/// Returns a nickname for `id` that is not already in `active_names`.
///
/// Starts from `whale_name_for_id(id)`. On a clash it tries, in turn,
/// `"<base> (2)"`, then a hash-derived `"<base> (<0..99>)"`, then
/// `"<base> (3)"`, and so on. `active_names` is finite, so the loop always
/// returns. The trailing fallback only satisfies the type checker.
#[must_use]
pub fn assign_unique_whale_name(
    id: &str,
    active_names: &std::collections::HashSet<String>,
) -> String {
    use std::hash::{Hash, Hasher};
    let base = whale_name_for_id(id);
    if !active_names.contains(&base) {
        return base;
    }
    let seed = {
        let mut state = std::collections::hash_map::DefaultHasher::new();
        id.hash(&mut state);
        state.finish()
    };
    let is_free = |name: &String| !active_names.contains(name);
    for attempt in 2.. {
        let sequential = format!("{base} ({attempt})");
        if is_free(&sequential) {
            return sequential;
        }
        let probed = format!("{base} ({})", seed.wrapping_add(attempt as u64) % 100);
        if is_free(&probed) {
            return probed;
        }
    }
    format!("{base} ({})", id.get(..4).unwrap_or("?"))
}
/// Release in which deprecated tools are slated for removal. It is quoted in
/// the warnings and metadata built by `wrap_with_deprecation_notice`.
const DEPRECATION_REMOVAL_VERSION: &str = "0.8.0";
/// Maps a sequential index to a nickname.
///
/// The first pass through `WHALE_NICKNAMES` uses bare names. Later passes
/// append the 1-based pass number, e.g. `"Blue 2"` on the second lap.
#[must_use]
pub fn whale_nickname_for_index(index: usize) -> String {
    let pool = WHALE_NICKNAMES.len();
    let lap = index / pool;
    let base = WHALE_NICKNAMES[index % pool];
    match lap {
        0 => base.to_string(),
        completed => format!("{base} {}", completed + 1),
    }
}
/// Tags `result` as coming from a deprecated tool and logs a warning.
///
/// A `_deprecation` object (tool name, replacement, removal version and a
/// message) is placed in the result metadata:
/// - object metadata: the key is inserted into it, overwriting any existing
///   `_deprecation` entry;
/// - other metadata: the original value moves under `_original_metadata`;
/// - no metadata: the metadata becomes just the `_deprecation` object.
fn wrap_with_deprecation_notice(
    mut result: ToolResult,
    this_tool: &str,
    use_instead: &str,
) -> ToolResult {
    tracing::warn!(
        "Deprecated tool '{}' invoked — use '{}' instead (removal: v{})",
        this_tool,
        use_instead,
        DEPRECATION_REMOVAL_VERSION,
    );
    let deprecation = json!({
        "this_tool": this_tool,
        "use_instead": use_instead,
        "removed_in": DEPRECATION_REMOVAL_VERSION,
        "message": format!(
            "Tool '{}' is deprecated; switch to '{}' before v{}.",
            this_tool, use_instead, DEPRECATION_REMOVAL_VERSION
        )
    });
    let merged = match result.metadata.take() {
        Some(Value::Object(mut existing)) => {
            existing.insert("_deprecation".to_string(), deprecation);
            Value::Object(existing)
        }
        Some(other) => json!({ "_deprecation": deprecation, "_original_metadata": other }),
        None => json!({ "_deprecation": deprecation }),
    };
    result.metadata = Some(merged);
    result
}
/// The task handed to a sub-agent: its objective and, optionally, the role
/// label it was launched under. Field order and serde attributes define the
/// serialized shape.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubAgentAssignment {
/// What the sub-agent is asked to accomplish.
pub objective: String,
/// Role label, if one was given; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub role: Option<String>,
}
impl SubAgentAssignment {
fn new(objective: String, role: Option<String>) -> Self {
Self { objective, role }
}
}
/// Kind of sub-agent to launch. It selects the system prompt and, for the
/// deprecated per-type allowlist, the tool set. Serialized in snake_case
/// (e.g. `"implementer"`); `General` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SubAgentType {
#[default]
General,
Explore,
Plan,
Review,
Implementer,
Verifier,
/// Caller-defined role; its deprecated default allowlist is empty.
Custom,
}
impl SubAgentType {
    /// Parses a sub-agent type name or one of its aliases.
    ///
    /// Matching is case-insensitive and ignores surrounding whitespace. This
    /// matches the `model_strength` and `thinking` parsers in this module,
    /// which both trim their input. The accepted vocabulary mirrors
    /// `VALID_SUBAGENT_TYPES`; keep the two in sync.
    ///
    /// Returns `None` for unrecognised input.
    #[must_use]
    pub fn from_str(s: &str) -> Option<Self> {
        match s.trim().to_lowercase().as_str() {
            "general" | "general-purpose" | "general_purpose" | "worker" | "default" => {
                Some(Self::General)
            }
            "explore" | "exploration" | "explorer" => Some(Self::Explore),
            "plan" | "planning" | "planner" | "awaiter" => Some(Self::Plan),
            "review" | "code-review" | "code_review" | "reviewer" => Some(Self::Review),
            "implementer" | "implement" | "implementation" | "builder" => Some(Self::Implementer),
            "verifier" | "verify" | "verification" | "validator" | "tester" => Some(Self::Verifier),
            "custom" => Some(Self::Custom),
            _ => None,
        }
    }

    /// Canonical lowercase name; the same spelling serde uses for the variant.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::General => "general",
            Self::Explore => "explore",
            Self::Plan => "plan",
            Self::Review => "review",
            Self::Implementer => "implementer",
            Self::Verifier => "verifier",
            Self::Custom => "custom",
        }
    }

    /// System prompt for this type: the role-specific intro followed by the
    /// shared `SUBAGENT_OUTPUT_FORMAT` section.
    #[must_use]
    pub fn system_prompt(&self) -> String {
        let role_intro = match self {
            Self::General => GENERAL_AGENT_INTRO,
            Self::Explore => EXPLORE_AGENT_INTRO,
            Self::Plan => PLAN_AGENT_INTRO,
            Self::Review => REVIEW_AGENT_INTRO,
            Self::Implementer => IMPLEMENTER_AGENT_INTRO,
            Self::Verifier => VERIFIER_AGENT_INTRO,
            Self::Custom => CUSTOM_AGENT_INTRO,
        };
        format!("{role_intro}{SUBAGENT_OUTPUT_FORMAT}")
    }

    /// Legacy per-type tool allowlist.
    ///
    /// Deprecated: default sub-agents now inherit the parent's full registry.
    /// `Custom` returns an empty list, so callers must supply their own.
    #[must_use]
    #[deprecated(
        since = "0.6.6",
        note = "Default sub-agents inherit the full parent registry; pass an explicit allowed_tools list only for narrow Custom roles."
    )]
    pub fn allowed_tools(&self) -> Vec<&'static str> {
        match self {
            Self::General => vec![
                "list_dir",
                "read_file",
                "write_file",
                "edit_file",
                "apply_patch",
                "grep_files",
                "file_search",
                "web.run",
                "web_search",
                "exec_shell",
                "exec_shell_wait",
                "exec_shell_interact",
                "exec_wait",
                "exec_interact",
                "note",
                "checklist_write",
                "checklist_add",
                "checklist_update",
                "checklist_list",
                "todo_write",
                "todo_add",
                "todo_update",
                "todo_list",
                "update_plan",
            ],
            Self::Explore => vec![
                "list_dir",
                "read_file",
                "grep_files",
                "file_search",
                "web.run",
                "web_search",
                "exec_shell",
                "exec_shell_wait",
                "exec_shell_interact",
                "exec_wait",
                "exec_interact",
            ],
            Self::Plan => vec![
                "list_dir",
                "read_file",
                "grep_files",
                "file_search",
                "web.run",
                "note",
                "update_plan",
                "checklist_write",
                "checklist_add",
                "checklist_update",
                "checklist_list",
                "todo_write",
                "todo_add",
                "todo_update",
                "todo_list",
            ],
            Self::Review => vec!["list_dir", "read_file", "grep_files", "file_search", "note"],
            Self::Implementer => vec![
                "list_dir",
                "read_file",
                "write_file",
                "edit_file",
                "apply_patch",
                "grep_files",
                "file_search",
                "exec_shell",
                "exec_shell_wait",
                "exec_shell_interact",
                "exec_wait",
                "exec_interact",
                "note",
                "checklist_write",
                "checklist_add",
                "checklist_update",
                "checklist_list",
                "todo_write",
                "todo_add",
                "todo_update",
                "todo_list",
                "update_plan",
            ],
            Self::Verifier => vec![
                "list_dir",
                "read_file",
                "grep_files",
                "file_search",
                "exec_shell",
                "exec_shell_wait",
                "exec_shell_interact",
                "exec_wait",
                "exec_interact",
                "run_tests",
                "run_verifiers",
                "diagnostics",
                "note",
            ],
            Self::Custom => vec![],
        }
    }
}
/// Terminal or in-flight state of a sub-agent. There is no `rename_all`, so
/// serde uses the variant names as-is (externally tagged for the variants
/// that carry a reason string). Do not rename variants without migrating
/// persisted state.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SubAgentStatus {
Running,
Completed,
/// Stopped early; the string carries the reason (e.g. a process restart).
Interrupted(String),
/// Ended with an error; the string carries the error text.
Failed(String),
Cancelled,
}
/// Question a sub-agent has raised for the parent to answer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubAgentNeedsInput {
pub question: String,
}
/// Snapshot of a sub-agent reported back to callers. Optional fields are
/// omitted from JSON when absent; `model` and `from_prior_session` fall back
/// to defaults when missing on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubAgentResult {
pub name: String,
pub agent_id: String,
pub context_mode: String,
pub fork_context: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workspace: Option<PathBuf>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub git_branch: Option<String>,
pub agent_type: SubAgentType,
pub assignment: SubAgentAssignment,
#[serde(default)]
pub model: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nickname: Option<String>,
pub status: SubAgentStatus,
/// Finer-grained worker lifecycle state, when a worker record exists.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub worker_status: Option<AgentWorkerStatus>,
/// Final output text, if the agent produced one.
pub result: Option<String>,
pub steps_taken: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub checkpoint: Option<SubAgentCheckpoint>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub needs_input: Option<SubAgentNeedsInput>,
pub duration_ms: u64,
/// True when the agent was restored from an earlier session; omitted when false.
#[serde(default, skip_serializing_if = "is_false")]
pub from_prior_session: bool,
}
/// Fine-grained worker lifecycle state, serialized in snake_case. Mapped to a
/// suggested parent action by `recommended_action_for_worker_status`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AgentWorkerStatus {
Queued,
Starting,
Running,
WaitingForUser,
ModelWait,
RunningTool,
Completed,
Failed,
Cancelled,
Interrupted,
}
/// Tool set for a worker: inherit the parent's tools, or use an explicit list
/// of tool names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AgentWorkerToolProfile {
Inherited,
Explicit(Vec<String>),
}
/// Immutable launch parameters of a worker. It is persisted as part of
/// `AgentWorkerRecord`, so field order and serde attributes are part of the
/// stored format.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentWorkerSpec {
pub worker_id: String,
/// Run identifier; may be empty in older records. `normalize_worker_spec`
/// backfills it from `worker_id`.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub run_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_run_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_name: Option<String>,
pub objective: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub role: Option<String>,
pub agent_type: SubAgentType,
pub model: String,
pub workspace: PathBuf,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub git_branch: Option<String>,
pub context_mode: String,
pub fork_context: bool,
pub tool_profile: AgentWorkerToolProfile,
/// Missing in older records; `normalize_worker_record` rebuilds it when it
/// equals the default.
#[serde(default)]
pub runtime_profile: WorkerRuntimeProfile,
pub max_steps: u32,
/// Nesting depth of this worker; `max_spawn_depth` is the absolute cap.
pub spawn_depth: u32,
pub max_spawn_depth: u32,
}
/// Outcome of the most recent follow-up message sent to a worker.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunFollowUpDelivery {
pub delivered: bool,
pub timestamp_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message_preview: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
#[serde(default, skip_serializing_if = "is_false")]
pub interrupt: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub continued_from_checkpoint: bool,
}
/// Where and how a parent can follow up with a worker. `tool` defaults to
/// `handle_read` for records that lack it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunFollowUpTarget {
#[serde(default = "default_agent_inspect_tool")]
pub tool: String,
pub agent_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_name: Option<String>,
#[serde(default)]
pub accepted_statuses: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub latest_delivery: Option<AgentRunFollowUpDelivery>,
}
/// Describes whether, and how, a worker's session can be taken over.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunTakeoverTarget {
#[serde(default = "default_subagent_takeover_kind")]
pub kind: String,
#[serde(default)]
pub supported: bool,
pub agent_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_name: Option<String>,
pub instructions: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub unsupported_reason: Option<String>,
}
/// Pointer to an artifact produced by, or attached to, a worker run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunArtifactRef {
pub kind: String,
pub name: String,
pub target: String,
pub description: String,
}
/// Token-usage summary; the default marks it as not yet reported.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunUsage {
pub status: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub total_tokens: Option<u64>,
pub note: String,
}
/// How far a worker's result has been verified; the default is
/// `self_report_only`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunVerificationSummary {
pub status: String,
pub summary: String,
}
/// Next step suggested to the parent, optionally naming the tool to use.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentRunRecommendedAction {
pub action: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool: Option<String>,
pub reason: String,
}
/// One lifecycle event in a worker record's bounded event log.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentWorkerEvent {
/// Sequence number; presumably increasing per worker (confirm at the producer).
pub seq: u64,
pub worker_id: String,
pub status: AgentWorkerStatus,
pub timestamp_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub step: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_name: Option<String>,
}
/// Persisted ledger entry for one worker: its spec, current status,
/// timestamps, parent-facing guidance and recent events. Fields added after
/// the first release carry serde defaults so older records still load;
/// `normalize_worker_record` then fills in derived values.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentWorkerRecord {
pub spec: AgentWorkerSpec,
#[serde(default = "default_subagent_actor_kind")]
pub actor_kind: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_run_id: Option<String>,
#[serde(default = "default_agent_run_follow_up")]
pub follow_up: AgentRunFollowUpTarget,
#[serde(default = "default_agent_run_takeover")]
pub takeover: AgentRunTakeoverTarget,
#[serde(default)]
pub artifacts: Vec<AgentRunArtifactRef>,
#[serde(default = "default_agent_run_usage")]
pub usage: AgentRunUsage,
#[serde(default = "default_agent_run_verification")]
pub verification: AgentRunVerificationSummary,
#[serde(default = "default_agent_run_recommended_action")]
pub recommended_action: AgentRunRecommendedAction,
pub status: AgentWorkerStatus,
pub created_at_ms: u64,
pub updated_at_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub started_at_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub completed_at_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub latest_message: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub result_summary: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(default)]
pub steps_taken: u32,
/// Recent lifecycle events; presumably capped at
/// `MAX_AGENT_WORKER_EVENTS_PER_RECORD` by the writer (not visible here).
#[serde(default)]
pub events: VecDeque<AgentWorkerEvent>,
}
impl AgentWorkerRecord {
    /// Creates a fresh record in the `Starting` state, stamped at `now_ms`.
    ///
    /// Follow-up, takeover, artifacts and the recommended action are all
    /// derived from `spec`. Usage and verification start at their "unknown"
    /// defaults, and the event log starts empty.
    fn new(spec: AgentWorkerSpec, now_ms: u64) -> Self {
        let initial_status = AgentWorkerStatus::Starting;
        let artifacts = default_subagent_artifacts(&agent_worker_run_id(&spec));
        Self {
            parent_run_id: spec.parent_run_id.clone(),
            actor_kind: default_subagent_actor_kind(),
            follow_up: follow_up_target_for_spec(&spec),
            takeover: takeover_target_for_spec(&spec),
            artifacts,
            usage: default_agent_run_usage(),
            verification: default_agent_run_verification(),
            recommended_action: recommended_action_for_worker_status(initial_status, &spec),
            status: initial_status,
            created_at_ms: now_ms,
            updated_at_ms: now_ms,
            started_at_ms: None,
            completed_at_ms: None,
            latest_message: None,
            result_summary: None,
            error: None,
            steps_taken: 0,
            events: VecDeque::new(),
            // Moved last so the borrows above can read it first.
            spec,
        }
    }
}
/// Actor kind stamped on every sub-agent worker record.
fn default_subagent_actor_kind() -> String {
    String::from("subagent")
}
/// Tool the parent is pointed at for inspecting a worker's transcript.
fn default_agent_inspect_tool() -> String {
    String::from("handle_read")
}
/// Takeover kind for locally hosted sub-agent sessions.
fn default_subagent_takeover_kind() -> String {
    String::from("local_subagent_session")
}
/// Serde fallback for `AgentRunFollowUpTarget` on records that predate the
/// field. It has no agent id; `normalize_worker_record` replaces it.
fn default_agent_run_follow_up() -> AgentRunFollowUpTarget {
    let accepted_statuses = ["running", "interrupted_continuable"]
        .iter()
        .map(|status| status.to_string())
        .collect();
    AgentRunFollowUpTarget {
        tool: default_agent_inspect_tool(),
        agent_id: String::new(),
        session_name: None,
        accepted_statuses,
        latest_delivery: None,
    }
}
/// Serde fallback for `AgentRunTakeoverTarget` on older records: marked
/// unsupported because no agent id is known.
fn default_agent_run_takeover() -> AgentRunTakeoverTarget {
    let instructions = String::from("No takeover target is available for this older record.");
    AgentRunTakeoverTarget {
        kind: default_subagent_takeover_kind(),
        supported: false,
        agent_id: String::new(),
        session_name: None,
        instructions,
        unsupported_reason: Some(String::from("legacy_record_missing_agent_id")),
    }
}
/// Usage placeholder: token accounting is not reported yet.
fn default_agent_run_usage() -> AgentRunUsage {
    let note = String::from("Token usage is not yet reported by the sub-agent worker ledger.");
    AgentRunUsage {
        status: String::from("unknown"),
        total_tokens: None,
        note,
    }
}
/// Verification placeholder: the child's result is an unverified self-report.
fn default_agent_run_verification() -> AgentRunVerificationSummary {
    let summary = String::from(
        "No verified command or test receipt is attached; treat the result summary as a child self-report.",
    );
    AgentRunVerificationSummary {
        status: String::from("self_report_only"),
        summary,
    }
}
/// Serde fallback for the recommended action: inspect the transcript.
fn default_agent_run_recommended_action() -> AgentRunRecommendedAction {
    let reason =
        String::from("Inspect the returned transcript handle if the child result needs audit detail.");
    AgentRunRecommendedAction {
        action: String::from("inspect_transcript"),
        tool: Some(default_agent_inspect_tool()),
        reason,
    }
}
/// Maps a worker status to the next step suggested to the parent.
///
/// The worker is named by its session name when one is set and non-empty,
/// otherwise by its worker id. Active and queued workers suggest continuing
/// parent work; terminal or blocked states point at the transcript or at
/// opening a replacement.
fn recommended_action_for_worker_status(
    status: AgentWorkerStatus,
    spec: &AgentWorkerSpec,
) -> AgentRunRecommendedAction {
    let agent_ref = match spec.session_name.as_deref() {
        Some(name) if !name.is_empty() => name,
        _ => spec.worker_id.as_str(),
    };
    let (action, tool, reason) = match status {
        AgentWorkerStatus::Queued => (
            "continue_parent_work",
            None,
            format!(
                "Worker {agent_ref} is queued in the background; continue coordinating and consume its completion event when it arrives."
            ),
        ),
        AgentWorkerStatus::Starting
        | AgentWorkerStatus::Running
        | AgentWorkerStatus::ModelWait
        | AgentWorkerStatus::RunningTool => (
            "continue_parent_work",
            None,
            format!(
                "Worker {agent_ref} is active in the background; continue parent work until its completion event arrives."
            ),
        ),
        AgentWorkerStatus::WaitingForUser => (
            "inspect_or_replace",
            Some(default_agent_inspect_tool()),
            format!(
                "Worker {agent_ref} needs parent action; inspect the transcript handle and open a replacement with agent if the task still matters."
            ),
        ),
        AgentWorkerStatus::Completed => (
            "verify_self_report",
            Some(default_agent_inspect_tool()),
            format!(
                "Worker {agent_ref} completed; verify its self-report before treating side effects as fact."
            ),
        ),
        AgentWorkerStatus::Failed => (
            "inspect_failure",
            Some(default_agent_inspect_tool()),
            format!(
                "Worker {agent_ref} failed; inspect the transcript handle and decide whether to open a replacement."
            ),
        ),
        AgentWorkerStatus::Cancelled => (
            "open_replacement_if_needed",
            Some("agent".to_string()),
            format!(
                "Worker {agent_ref} was cancelled; open a replacement with agent only if the assignment still matters."
            ),
        ),
        AgentWorkerStatus::Interrupted => (
            "inspect_or_replace",
            Some(default_agent_inspect_tool()),
            format!(
                "Worker {agent_ref} was interrupted; inspect the transcript handle before deciding whether to re-dispatch."
            ),
        ),
    };
    AgentRunRecommendedAction {
        action: action.to_string(),
        tool,
        reason,
    }
}
/// Effective run id: the spec's `run_id`, or its `worker_id` when `run_id`
/// is empty (older records).
fn agent_worker_run_id(spec: &AgentWorkerSpec) -> String {
    let chosen = match spec.run_id.as_str() {
        "" => &spec.worker_id,
        _ => &spec.run_id,
    };
    chosen.clone()
}
/// Follow-up target aimed at this worker: inspect via the default inspect
/// tool while the worker is running or continuably interrupted.
fn follow_up_target_for_spec(spec: &AgentWorkerSpec) -> AgentRunFollowUpTarget {
    let accepted_statuses = ["running", "interrupted_continuable"]
        .iter()
        .map(|status| status.to_string())
        .collect();
    AgentRunFollowUpTarget {
        agent_id: spec.worker_id.clone(),
        session_name: spec.session_name.clone(),
        tool: default_agent_inspect_tool(),
        accepted_statuses,
        latest_delivery: None,
    }
}
/// Supported takeover target for this worker. The instructions name it by
/// its session name when set and non-empty, otherwise by its worker id.
fn takeover_target_for_spec(spec: &AgentWorkerSpec) -> AgentRunTakeoverTarget {
    let agent_ref = match spec.session_name.as_deref() {
        Some(name) if !name.is_empty() => name,
        _ => spec.worker_id.as_str(),
    };
    let instructions = format!(
        "Inspect agent '{agent_ref}' through the returned transcript_handle with handle_read; open a replacement with agent if the lane no longer fits."
    );
    AgentRunTakeoverTarget {
        kind: default_subagent_takeover_kind(),
        supported: true,
        agent_id: spec.worker_id.clone(),
        session_name: spec.session_name.clone(),
        instructions,
        unsupported_reason: None,
    }
}
/// Standard artifact pointers for a run: its event log, transcript handle
/// (`agent:<run_id>`) and result-summary receipt.
fn default_subagent_artifacts(run_id: &str) -> Vec<AgentRunArtifactRef> {
    let artifact = |kind: &str, name: &str, target: String, description: &str| {
        AgentRunArtifactRef {
            kind: kind.to_string(),
            name: name.to_string(),
            target,
            description: description.to_string(),
        }
    };
    vec![
        artifact(
            "worker_events",
            "worker_record.events",
            run_id.to_string(),
            "Bounded structured lifecycle events retained on the worker record.",
        ),
        artifact(
            "transcript",
            "transcript_handle",
            format!("agent:{run_id}"),
            "Use the projection transcript_handle with handle_read for the child transcript.",
        ),
        artifact(
            "receipt",
            "result_summary",
            run_id.to_string(),
            "Child final summary when present; verify before treating as fact.",
        ),
    ]
}
/// Shortens `text` to at most 120 characters (Unicode scalar values).
///
/// When anything was cut, `...` is appended. Text of exactly 120 characters
/// is returned unchanged. The cut point is found in a single pass that stops
/// at the 121st character.
fn message_preview(text: &str) -> String {
    const MAX_PREVIEW_CHARS: usize = 120;
    match text.char_indices().nth(MAX_PREVIEW_CHARS) {
        Some((cut, _)) => format!("{}...", &text[..cut]),
        None => text.to_string(),
    }
}
fn normalize_worker_spec(mut spec: AgentWorkerSpec) -> AgentWorkerSpec {
if spec.run_id.is_empty() {
spec.run_id = spec.worker_id.clone();
}
spec
}
fn worker_tool_scope(tool_profile: &AgentWorkerToolProfile) -> ToolScope {
match tool_profile {
AgentWorkerToolProfile::Inherited => ToolScope::Inherit,
AgentWorkerToolProfile::Explicit(tools) => ToolScope::Explicit(tools.clone()),
}
}
/// Rebuilds a runtime profile from a stored spec. Used for records whose
/// persisted profile is missing (equal to the default).
///
/// The model is pinned to the spec's model. The spawn-depth budget is what
/// remains below `max_spawn_depth`, and the worker always runs in the
/// background.
fn worker_profile_from_spec(spec: &AgentWorkerSpec) -> WorkerRuntimeProfile {
    let mut rebuilt = WorkerRuntimeProfile::for_role(spec.agent_type.clone());
    rebuilt.tools = worker_tool_scope(&spec.tool_profile);
    rebuilt.model = ModelRoute::Fixed(spec.model.clone());
    let remaining_depth = spec.max_spawn_depth.saturating_sub(spec.spawn_depth);
    rebuilt.max_spawn_depth = remaining_depth;
    rebuilt.background = true;
    rebuilt
}
/// Builds the runtime profile for a child about to be spawned from `runtime`.
///
/// The requested profile starts from the role defaults. It takes the given
/// tool scope and the explicit `model_route`, falling back to pinning
/// `effective_model`. It also records the parent's API provider and the
/// remaining spawn depth, and always runs in the background. The result is
/// then passed through the parent's `derive_child`.
fn worker_profile_for_spawn(
    runtime: &SubAgentRuntime,
    agent_type: &SubAgentType,
    tool_profile: &AgentWorkerToolProfile,
    effective_model: &str,
    model_route: Option<ModelRoute>,
) -> WorkerRuntimeProfile {
    let mut child = WorkerRuntimeProfile::for_role(agent_type.clone());
    child.tools = worker_tool_scope(tool_profile);
    child.model = match model_route {
        Some(route) => route,
        None => ModelRoute::Fixed(effective_model.to_string()),
    };
    let provider_name = runtime.client.api_provider().as_str().to_string();
    child.provider = Some(provider_name);
    child.max_spawn_depth = runtime.max_spawn_depth.saturating_sub(runtime.spawn_depth);
    child.background = true;
    runtime.worker_profile.derive_child(&child)
}
/// Upgrades a loaded worker record to the current shape.
///
/// - backfills `spec.run_id` and, if still the default, the runtime profile;
/// - fills an empty actor kind and a missing parent run id;
/// - rebuilds follow-up/takeover targets that lack an agent id, forces the
///   follow-up tool to the inspect tool, and rebuilds takeover instructions
///   that do not mention it;
/// - always recomputes the recommended action from the current status;
/// - restores default artifacts, usage and verification when empty.
fn normalize_worker_record(mut record: AgentWorkerRecord) -> AgentWorkerRecord {
    record.spec = normalize_worker_spec(record.spec);
    if record.spec.runtime_profile == WorkerRuntimeProfile::default() {
        record.spec.runtime_profile = worker_profile_from_spec(&record.spec);
    }
    let inspect_tool = default_agent_inspect_tool();

    if record.actor_kind.is_empty() {
        record.actor_kind = default_subagent_actor_kind();
    }
    if record.parent_run_id.is_none() {
        record.parent_run_id = record.spec.parent_run_id.clone();
    }

    if record.follow_up.agent_id.is_empty() {
        record.follow_up = follow_up_target_for_spec(&record.spec);
    } else if record.follow_up.tool != inspect_tool {
        record.follow_up.tool = inspect_tool.clone();
    }

    let takeover_is_stale = record.takeover.agent_id.is_empty()
        || !record.takeover.instructions.contains(inspect_tool.as_str());
    if takeover_is_stale {
        record.takeover = takeover_target_for_spec(&record.spec);
    }

    record.recommended_action = recommended_action_for_worker_status(record.status, &record.spec);

    if record.artifacts.is_empty() {
        record.artifacts = default_subagent_artifacts(&agent_worker_run_id(&record.spec));
    }
    if record.usage.status.is_empty() {
        record.usage = default_agent_run_usage();
    }
    if record.verification.status.is_empty() {
        record.verification = default_agent_run_verification();
    }
    record
}
/// serde `skip_serializing_if` predicate: omit a field while it is `false`.
fn is_false(b: &bool) -> bool {
    matches!(*b, false)
}
/// Names the checked-out branch in `workspace`.
///
/// On a detached HEAD it returns `detached:<short-hash>` instead. Returns
/// `None` if git fails or prints nothing.
fn current_git_branch(workspace: &Path) -> Option<String> {
    let head = run_git(workspace, &["rev-parse", "--abbrev-ref", "HEAD"])?;
    match head.trim() {
        "" => None,
        "HEAD" => {
            let hash = run_git(workspace, &["rev-parse", "--short", "HEAD"])?;
            let hash = hash.trim();
            if hash.is_empty() {
                None
            } else {
                Some(format!("detached:{hash}"))
            }
        }
        branch => Some(branch.to_owned()),
    }
}
/// Runs `git <args>` in `workspace` and returns its stdout (lossily decoded
/// as UTF-8). Returns `None` if the command cannot run or exits unsuccessfully.
fn run_git(workspace: &Path, args: &[&str]) -> Option<String> {
    match Git::output(args, workspace) {
        Ok(out) if out.status.success() => {
            Some(String::from_utf8_lossy(&out.stdout).into_owned())
        }
        _ => None,
    }
}
/// Optional overrides accepted when spawning a sub-agent; every field
/// defaults to "not set".
#[derive(Debug, Clone, Default)]
pub(crate) struct SubAgentSpawnOptions {
pub name: Option<String>,
pub model: Option<String>,
pub model_route: Option<ModelRoute>,
pub nickname: Option<String>,
pub fork_context: bool,
}
/// Requested model tier for a sub-agent relative to its parent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SubAgentModelStrength {
    /// Use the parent's model.
    Same,
    /// Use a faster or cheaper model.
    Faster,
}
impl SubAgentModelStrength {
    /// Parses a tier name or alias, ignoring ASCII case and surrounding whitespace.
    ///
    /// # Errors
    /// Returns an invalid-input `ToolError` for unrecognised values.
    fn parse(value: &str) -> Result<Self, ToolError> {
        let key = value.trim().to_ascii_lowercase();
        if matches!(key.as_str(), "same" | "inherit" | "parent" | "current") {
            Ok(Self::Same)
        } else if matches!(
            key.as_str(),
            "faster" | "fast" | "smaller" | "small" | "lower" | "cheap" | "flash"
        ) {
            Ok(Self::Faster)
        } else {
            Err(ToolError::invalid_input(
                "model_strength must be one of: same, faster".to_string(),
            ))
        }
    }
    /// Model routing policy for this tier.
    fn model_route(self) -> ModelRoute {
        match self {
            Self::Faster => ModelRoute::Faster,
            Self::Same => ModelRoute::Inherit,
        }
    }
}
/// Reasoning-effort setting requested for a sub-agent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SubAgentThinking {
    /// Use the parent's setting.
    Inherit,
    /// Let the runtime pick automatically.
    Auto,
    /// Use a specific effort level.
    Effort(ReasoningEffort),
}
impl SubAgentThinking {
    /// Parses a thinking level or alias, ignoring ASCII case and surrounding
    /// whitespace.
    ///
    /// # Errors
    /// Returns an invalid-input `ToolError` for unrecognised values.
    fn parse(value: &str) -> Result<Self, ToolError> {
        let key = value.trim().to_ascii_lowercase();
        let effort = match key.as_str() {
            "inherit" | "parent" | "same" | "current" => return Ok(Self::Inherit),
            "auto" | "automatic" => return Ok(Self::Auto),
            "off" | "disabled" | "none" | "false" => ReasoningEffort::Off,
            "low" | "minimal" => ReasoningEffort::Low,
            "medium" | "mid" => ReasoningEffort::Medium,
            "high" => ReasoningEffort::High,
            "max" | "maximum" | "xhigh" | "ultracode" => ReasoningEffort::Max,
            _ => {
                return Err(ToolError::invalid_input(
                    "thinking must be one of: inherit, auto, off, low, medium, high, max"
                        .to_string(),
                ));
            }
        };
        Ok(Self::Effort(effort))
    }
}
/// Message delivered to a running sub-agent. `interrupt` presumably asks it
/// to stop its current step first; confirm at the consumer.
#[derive(Debug, Clone)]
struct SubAgentInput {
text: String,
interrupt: bool,
}
/// Fully parsed spawn request, produced from tool input before launch.
#[derive(Debug, Clone)]
struct SpawnRequest {
session_name: Option<String>,
prompt: String,
agent_type: SubAgentType,
assignment: SubAgentAssignment,
/// Explicit tool allowlist; `None` presumably means "inherit the parent's tools".
allowed_tools: Option<Vec<String>>,
model: Option<String>,
model_strength: SubAgentModelStrength,
thinking: SubAgentThinking,
cwd: Option<PathBuf>,
resident_file: Option<String>,
fork_context: bool,
max_depth: Option<u32>,
}
/// Saved state from which an interrupted sub-agent may be continued. The
/// message list is omitted from JSON when empty.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SubAgentCheckpoint {
pub checkpoint_id: String,
pub agent_id: String,
pub continuation_handle: String,
pub reason: String,
pub continuable: bool,
pub steps_taken: u32,
pub message_count: usize,
pub created_at_ms: u64,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub messages: Vec<Message>,
}
/// On-disk form of one sub-agent, stored in `SUBAGENT_STATE_FILE`. Field
/// names and serde attributes are part of the persisted format.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedSubAgent {
id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
session_name: Option<String>,
#[serde(default)]
fork_context: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
workspace: Option<PathBuf>,
agent_type: SubAgentType,
prompt: String,
assignment: SubAgentAssignment,
#[serde(default)]
model: String,
// Unlike most optional fields here, `nickname` is serialized as `null` when
// absent (there is no skip_serializing_if).
#[serde(default)]
nickname: Option<String>,
status: SubAgentStatus,
result: Option<String>,
steps_taken: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
checkpoint: Option<SubAgentCheckpoint>,
#[serde(default, skip_serializing_if = "Option::is_none")]
needs_input: Option<SubAgentNeedsInput>,
duration_ms: u64,
allowed_tools: Vec<String>,
updated_at_ms: u64,
/// Identifies the process session that wrote this entry; empty in older
/// files. Presumably compared on load to detect restarts — confirm.
#[serde(default)]
session_boot_id: String,
}
/// Root document of the persisted sub-agent state file.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedSubAgentState {
schema_version: u32,
agents: Vec<PersistedSubAgent>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
workers: Vec<AgentWorkerRecord>,
}
impl Default for PersistedSubAgentState {
    /// An empty state document stamped with the current schema version.
    fn default() -> Self {
        let (agents, workers) = (vec![], vec![]);
        Self {
            schema_version: SUBAGENT_STATE_SCHEMA_VERSION,
            agents,
            workers,
        }
    }
}
/// Default `max_spawn_depth` for a new `SubAgentRuntime`, taken from
/// `codewhale_config::DEFAULT_SPAWN_DEPTH`.
pub const DEFAULT_MAX_SPAWN_DEPTH: u32 = codewhale_config::DEFAULT_SPAWN_DEPTH;
/// Notification sent to a parent over `SubAgentRuntime::parent_completion_tx`.
#[derive(Debug, Clone)]
pub struct SubAgentCompletion {
/// Id of the sub-agent this notification is about.
// NOTE(review): explicitly allowed as dead code; presumably it is only read via `Debug`
// today — confirm before removing the allowance.
#[allow(dead_code)]
pub agent_id: String,
/// Text delivered to the parent (its format is produced outside this chunk).
pub payload: String,
}
/// Parent conversation state handed to a sub-agent that forks its context.
///
/// Carried on `SubAgentRuntime::fork_context` and cloned into nested child runtimes.
#[derive(Clone, Debug)]
pub struct SubAgentForkContext {
/// Parent system prompt, if any.
pub system: Option<SystemPrompt>,
/// Parent message history to seed the child with.
pub messages: Vec<Message>,
/// Optional pre-rendered structured-state text.
// NOTE(review): how this block is injected into the child is decided elsewhere — confirm.
pub structured_state_block: Option<String>,
}
/// Everything a sub-agent task needs to run: model client and routing, tool context,
/// event/completion channels, spawn-depth limits and cancellation.
///
/// Nested spawns derive their runtime via `child_runtime` / `background_runtime`.
#[derive(Clone)]
pub struct SubAgentRuntime {
pub client: DeepSeekClient,
/// Model name; a spawn replaces it when `SubAgentSpawnOptions::model` is set.
pub model: String,
pub auto_model: bool,
pub reasoning_effort: Option<String>,
pub reasoning_effort_auto: bool,
/// Role name -> model overrides (consumed outside this chunk).
pub role_models: HashMap<String, String>,
/// Tool execution context; its `workspace` becomes a spawned agent's workspace.
pub context: ToolContext,
/// Forwarded to `build_allowed_tools` when computing a spawned agent's tools.
pub allow_shell: bool,
pub worker_profile: WorkerRuntimeProfile,
/// Event sink; a spawn sends `Event::AgentSpawned` with a non-blocking `try_send`.
pub event_tx: Option<mpsc::Sender<Event>>,
pub manager: SharedSubAgentManager,
/// Nesting level: 0 for a root runtime, incremented by `child_runtime`.
pub spawn_depth: u32,
pub max_spawn_depth: u32,
pub cancel_token: CancellationToken,
pub mailbox: Option<Mailbox>,
pub parent_completion_tx: Option<mpsc::UnboundedSender<SubAgentCompletion>>,
/// Parent conversation snapshot for forked agents (presumably `None` for fresh ones).
pub fork_context: Option<SubAgentForkContext>,
pub mcp_pool: Option<std::sync::Arc<tokio::sync::Mutex<crate::mcp::McpPool>>>,
pub step_api_timeout: Duration,
pub speech_output_dir: Option<PathBuf>,
}
impl SubAgentRuntime {
    /// Creates a root runtime (spawn depth 0) for launching sub-agents.
    ///
    /// Everything not passed in starts at its default:
    /// - no auto-model routing and no reasoning-effort override;
    /// - an empty role->model map and the `General` worker profile;
    /// - `DEFAULT_MAX_SPAWN_DEPTH` and a fresh cancellation token;
    /// - no mailbox, parent completion channel, fork context, MCP pool or speech output
    ///   directory;
    /// - `DEFAULT_STEP_API_TIMEOUT` as the step timeout.
    #[must_use]
    pub fn new(
        client: DeepSeekClient,
        model: String,
        context: ToolContext,
        allow_shell: bool,
        event_tx: Option<mpsc::Sender<Event>>,
        manager: SharedSubAgentManager,
    ) -> Self {
        Self {
            client,
            model,
            auto_model: false,
            reasoning_effort: None,
            reasoning_effort_auto: false,
            role_models: HashMap::new(),
            context,
            allow_shell,
            worker_profile: WorkerRuntimeProfile::for_role(SubAgentType::General),
            event_tx,
            manager,
            spawn_depth: 0,
            max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
            cancel_token: CancellationToken::new(),
            mailbox: None,
            parent_completion_tx: None,
            fork_context: None,
            mcp_pool: None,
            step_api_timeout: DEFAULT_STEP_API_TIMEOUT,
            speech_output_dir: None,
        }
    }

    /// Sets the shared MCP pool, or clears it with `None`.
    #[must_use]
    pub fn with_mcp_pool(
        mut self,
        pool: Option<std::sync::Arc<tokio::sync::Mutex<crate::mcp::McpPool>>>,
    ) -> Self {
        self.mcp_pool = pool;
        self
    }

    /// Overrides the per-step API timeout (default `DEFAULT_STEP_API_TIMEOUT`).
    #[must_use]
    pub fn with_step_api_timeout(mut self, timeout: Duration) -> Self {
        self.step_api_timeout = timeout;
        self
    }

    /// Sets the speech output directory, or clears it with `None`.
    #[must_use]
    pub fn with_speech_output_dir(mut self, output_dir: Option<PathBuf>) -> Self {
        self.speech_output_dir = output_dir;
        self
    }

    /// Attaches the channel on which completion notifications are sent to the parent.
    #[must_use]
    pub fn with_parent_completion_tx(
        mut self,
        tx: mpsc::UnboundedSender<SubAgentCompletion>,
    ) -> Self {
        self.parent_completion_tx = Some(tx);
        self
    }

    /// Attaches parent conversation context for forked sub-agents.
    #[must_use]
    pub fn with_fork_context(mut self, context: SubAgentForkContext) -> Self {
        self.fork_context = Some(context);
        self
    }

    /// Attaches a mailbox.
    #[must_use]
    #[allow(dead_code)]
    pub fn with_mailbox(mut self, mailbox: Mailbox) -> Self {
        self.mailbox = Some(mailbox);
        self
    }

    /// Replaces the runtime's cancellation token.
    #[must_use]
    #[allow(dead_code)]
    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
        self.cancel_token = token;
        self
    }

    /// Overrides the maximum spawn depth (default `DEFAULT_MAX_SPAWN_DEPTH`).
    #[must_use]
    #[allow(dead_code)]
    pub fn with_max_spawn_depth(mut self, max: u32) -> Self {
        self.max_spawn_depth = max;
        self
    }

    /// Replaces the role->model override map.
    #[must_use]
    pub fn with_role_models(mut self, role_models: HashMap<String, String>) -> Self {
        self.role_models = role_models;
        self
    }

    /// Enables or disables automatic model selection.
    #[must_use]
    pub fn with_auto_model(mut self, auto_model: bool) -> Self {
        self.auto_model = auto_model;
        self
    }

    /// Sets the reasoning-effort override together with its `reasoning_effort_auto` flag.
    #[must_use]
    pub fn with_reasoning_effort(
        mut self,
        reasoning_effort: Option<String>,
        reasoning_effort_auto: bool,
    ) -> Self {
        self.reasoning_effort = reasoning_effort;
        self.reasoning_effort_auto = reasoning_effort_auto;
        self
    }

    /// Like [`Self::child_runtime`], but with a brand-new cancellation token.
    ///
    /// The token is not derived from this runtime's token, so cancelling the parent does not
    /// cancel the background child. It is installed on both the runtime and its tool context.
    #[must_use]
    pub fn background_runtime(&self) -> Self {
        let mut runtime = self.child_runtime();
        let token = CancellationToken::new();
        runtime.cancel_token = token.clone();
        runtime.context.cancel_token = Some(token);
        runtime
    }

    /// Derives a runtime for a child one spawn level deeper.
    ///
    /// All shared handles (client, manager, channels, mailbox, fork context, MCP pool) are
    /// cloned. The child's cancellation token is a child of this runtime's token, so
    /// cancelling the parent also cancels the child.
    #[must_use]
    pub fn child_runtime(&self) -> Self {
        let mut child_context = self.context.clone();
        // NOTE(review): `clone()` presumably already copies this; kept so approval
        // inheritance stays explicit.
        child_context.auto_approve = self.context.auto_approve;
        Self {
            client: self.client.clone(),
            model: self.model.clone(),
            auto_model: self.auto_model,
            reasoning_effort: self.reasoning_effort.clone(),
            reasoning_effort_auto: self.reasoning_effort_auto,
            role_models: self.role_models.clone(),
            context: child_context,
            allow_shell: self.allow_shell,
            worker_profile: self.worker_profile.clone(),
            event_tx: self.event_tx.clone(),
            manager: self.manager.clone(),
            // Saturate so a pathological depth can never wrap back to 0.
            spawn_depth: self.spawn_depth.saturating_add(1),
            max_spawn_depth: self.max_spawn_depth,
            cancel_token: self.cancel_token.child_token(),
            mailbox: self.mailbox.clone(),
            parent_completion_tx: self.parent_completion_tx.clone(),
            fork_context: self.fork_context.clone(),
            mcp_pool: self.mcp_pool.clone(),
            step_api_timeout: self.step_api_timeout,
            speech_output_dir: self.speech_output_dir.clone(),
        }
    }

    /// Returns `true` when spawning one more level below this runtime would exceed
    /// `max_spawn_depth`.
    ///
    /// This is the same test as `spawn_depth + 1 > max_spawn_depth`, written without the
    /// addition. The addition overflows at `u32::MAX`: it panics in debug builds, and in
    /// release it wraps to 0 and would wrongly permit the spawn.
    #[must_use]
    pub fn would_exceed_depth(&self) -> bool {
        self.spawn_depth >= self.max_spawn_depth
    }
}
/// In-memory record of one sub-agent tracked by `SubAgentManager`.
pub struct SubAgent {
pub id: String,
/// Human-facing session name. It defaults to `id` and may be overridden at spawn time, where
/// names already used by another tracked agent are rejected.
pub session_name: String,
pub fork_context: bool,
pub agent_type: SubAgentType,
pub prompt: String,
pub assignment: SubAgentAssignment,
pub model: String,
pub nickname: Option<String>,
pub status: SubAgentStatus,
pub result: Option<String>,
pub steps_taken: u32,
pub checkpoint: Option<SubAgentCheckpoint>,
pub needs_input: Option<SubAgentNeedsInput>,
pub started_at: Instant,
/// Last manager-visible progress; compared against the heartbeat timeout.
pub last_activity_at: Instant,
/// Explicit tool allow-list; `None` means the tool profile is inherited.
pub allowed_tools: Option<Vec<String>>,
/// Boot id of the manager instance that spawned the agent; used to flag prior-session agents.
pub session_boot_id: String,
pub workspace: PathBuf,
/// Sender for follow-up input to the live task; `None` once finished or after reload.
input_tx: Option<mpsc::UnboundedSender<SubAgentInput>>,
/// Handle of the running task; `None` when no live task is attached.
task_handle: Option<JoinHandle<()>>,
}
impl SubAgent {
    /// Builds the record for a freshly launched sub-agent in `Running` status.
    ///
    /// The session name starts out equal to `id`, `fork_context` is `false`, and both
    /// `started_at` and `last_activity_at` are the same instant. The spawner attaches the task
    /// handle afterwards.
    #[allow(clippy::too_many_arguments)]
    fn new(
        id: String,
        agent_type: SubAgentType,
        prompt: String,
        assignment: SubAgentAssignment,
        model: String,
        nickname: Option<String>,
        allowed_tools: Option<Vec<String>>,
        input_tx: mpsc::UnboundedSender<SubAgentInput>,
        workspace: PathBuf,
        session_boot_id: String,
    ) -> Self {
        let launched = Instant::now();
        Self {
            session_name: id.clone(),
            id,
            fork_context: false,
            agent_type,
            prompt,
            assignment,
            model,
            nickname,
            status: SubAgentStatus::Running,
            result: None,
            steps_taken: 0,
            checkpoint: None,
            needs_input: None,
            started_at: launched,
            last_activity_at: launched,
            allowed_tools,
            session_boot_id,
            workspace,
            input_tx: Some(input_tx),
            task_handle: None,
        }
    }

    /// Produces a caller-facing `SubAgentResult` describing this agent right now.
    ///
    /// The git branch is looked up from the workspace on every call. `duration_ms` is measured
    /// from `started_at` and saturates at `u64::MAX`. Listing-only fields
    /// (`worker_status`, `from_prior_session`) are left at their neutral values.
    #[must_use]
    pub fn snapshot(&self) -> SubAgentResult {
        let context_mode = match self.fork_context {
            true => "forked",
            false => "fresh",
        };
        SubAgentResult {
            name: self.session_name.clone(),
            agent_id: self.id.clone(),
            context_mode: context_mode.to_string(),
            fork_context: self.fork_context,
            workspace: Some(self.workspace.clone()),
            git_branch: current_git_branch(&self.workspace),
            agent_type: self.agent_type.clone(),
            assignment: self.assignment.clone(),
            model: self.model.clone(),
            nickname: self.nickname.clone(),
            status: self.status.clone(),
            worker_status: None,
            result: self.result.clone(),
            steps_taken: self.steps_taken,
            checkpoint: self.checkpoint.clone(),
            needs_input: self.needs_input.clone(),
            duration_ms: u64::try_from(self.started_at.elapsed().as_millis()).unwrap_or(u64::MAX),
            from_prior_session: false,
        }
    }
}
/// Registry of sub-agents and their worker records, with optional JSON persistence.
pub struct SubAgentManager {
/// Tracked agents keyed by agent id.
agents: HashMap<String, SubAgent>,
/// Worker records keyed by worker id; capped at `MAX_AGENT_WORKER_RECORDS` by pruning.
worker_records: HashMap<String, AgentWorkerRecord>,
/// Last sequence number assigned to a worker event, shared by all workers.
worker_event_seq: u64,
/// Fallback workspace for persisted agents that did not record one (see `load_state`).
#[allow(dead_code)] workspace: PathBuf,
/// State file; `None` disables persistence entirely.
state_path: Option<PathBuf>,
/// Step budget given to each spawned task (`DEFAULT_MAX_STEPS` is treated as unbounded).
max_steps: u32,
/// Maximum number of concurrently running agents.
max_agents: usize,
/// Inactivity window after which a running agent counts as stale.
running_heartbeat_timeout: Duration,
/// Identifies this manager instance; stamped on every agent it spawns.
current_session_boot_id: String,
/// Concurrency gate handed to depth-1 spawned tasks.
launch_gate: Arc<Semaphore>,
/// Time of the last debounced write.
last_persist_at: Option<Instant>,
/// A debounced write was skipped and still needs to be flushed.
persist_pending: bool,
}
impl SubAgentManager {
/// Creates an empty, non-persistent manager for `workspace`.
///
/// A fresh boot id (`boot_` plus the first 12 characters of a v4 UUID) identifies this
/// instance. The launch gate always has at least one permit, even when `max_agents` is 0.
#[must_use]
pub fn new(workspace: PathBuf, max_agents: usize) -> Self {
    let boot_uuid = Uuid::new_v4().to_string();
    let heartbeat =
        Duration::from_secs(crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS);
    let gate_permits = max_agents.max(1);
    Self {
        agents: HashMap::new(),
        worker_records: HashMap::new(),
        worker_event_seq: 0,
        workspace,
        state_path: None,
        max_steps: DEFAULT_MAX_STEPS,
        max_agents,
        running_heartbeat_timeout: heartbeat,
        current_session_boot_id: format!("boot_{}", &boot_uuid[..12]),
        launch_gate: Arc::new(Semaphore::new(gate_permits)),
        last_persist_at: None,
        persist_pending: false,
    }
}
/// Caps how many depth-1 sub-agent tasks may launch at the same time.
///
/// `limit` is clamped into `1..=max(max_agents, 1)`. The upper bound is floored at 1 because
/// `new` accepts `max_agents == 0`, and `Ord::clamp` panics when `min > max`.
#[must_use]
pub fn with_launch_concurrency(mut self, limit: usize) -> Self {
    let upper = self.max_agents.max(1);
    self.launch_gate = Arc::new(Semaphore::new(limit.clamp(1, upper)));
    self
}
/// Test-only accessor for the boot id stamped on agents spawned by this manager.
#[cfg(test)]
pub fn session_boot_id(&self) -> &str {
    self.current_session_boot_id.as_str()
}
/// Whether `agent` belongs to an earlier manager boot.
///
/// True when the agent carries no boot id (e.g. it was loaded from an older state file) or
/// carries a different one from this instance.
fn is_from_prior_session(&self, agent: &SubAgent) -> bool {
    match agent.session_boot_id.as_str() {
        "" => true,
        boot_id => boot_id != self.current_session_boot_id,
    }
}
/// Enables persistence to the JSON file at `path` (used by `persist_state` / `load_state`).
#[must_use]
fn with_state_path(self, path: PathBuf) -> Self {
    let mut manager = self;
    manager.state_path = Some(path);
    manager
}
/// Sets how long a running agent may go without activity before it is treated as stale.
///
/// A zero `timeout` is not accepted. It is replaced by
/// `crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS`.
#[must_use]
pub fn with_running_heartbeat_timeout(mut self, timeout: Duration) -> Self {
    let fallback =
        Duration::from_secs(crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS);
    self.running_heartbeat_timeout = Some(timeout)
        .filter(|candidate| !candidate.is_zero())
        .unwrap_or(fallback);
    self
}
fn persist_state(&self) -> Result<()> {
let Some(path) = self.state_path.as_ref() else {
return Ok(());
};
let now_ms = epoch_millis_now();
let mut agents = Vec::with_capacity(self.agents.len());
for agent in self.agents.values() {
agents.push(PersistedSubAgent {
id: agent.id.clone(),
session_name: Some(agent.session_name.clone()),
fork_context: agent.fork_context,
workspace: Some(agent.workspace.clone()),
agent_type: agent.agent_type.clone(),
prompt: agent.prompt.clone(),
assignment: agent.assignment.clone(),
model: agent.model.clone(),
nickname: agent.nickname.clone(),
status: agent.status.clone(),
result: agent.result.clone(),
steps_taken: agent.steps_taken,
checkpoint: agent.checkpoint.clone(),
needs_input: agent.needs_input.clone(),
duration_ms: u64::try_from(agent.started_at.elapsed().as_millis())
.unwrap_or(u64::MAX),
allowed_tools: agent.allowed_tools.clone().unwrap_or_default(),
updated_at_ms: now_ms,
session_boot_id: agent.session_boot_id.clone(),
});
}
agents.sort_by(|a, b| a.id.cmp(&b.id));
let payload = PersistedSubAgentState {
schema_version: SUBAGENT_STATE_SCHEMA_VERSION,
agents,
workers: self.sorted_worker_records(),
};
write_json_atomic(path, &payload)
}
/// Persists state, downgrading any failure to a `warn!` on the `subagent` target.
fn persist_state_best_effort(&self) {
    let Err(err) = self.persist_state() else {
        return;
    };
    tracing::warn!(target: "subagent", ?err, "failed to persist sub-agent state");
}
/// Persists at most once per `SUBAGENT_PERSIST_DEBOUNCE` window.
///
/// Inside the window, the write is skipped: `persist_pending` is set (so
/// `flush_pending_persist` can catch up) and `SUBAGENT_PERSIST_SKIPPED` is bumped. Otherwise
/// the state is written immediately and `SUBAGENT_PERSIST_WRITES` is bumped. When
/// `subagent_perf_enabled()`, both counters are logged.
fn persist_state_debounced(&mut self) {
    use std::sync::atomic::Ordering;

    let now = Instant::now();
    let due = self
        .last_persist_at
        .is_none_or(|last| now.duration_since(last) >= SUBAGENT_PERSIST_DEBOUNCE);
    if !due {
        self.persist_pending = true;
        SUBAGENT_PERSIST_SKIPPED.fetch_add(1, Ordering::Relaxed);
        return;
    }

    self.last_persist_at = Some(now);
    self.persist_pending = false;
    self.persist_state_best_effort();
    let writes = SUBAGENT_PERSIST_WRITES.fetch_add(1, Ordering::Relaxed) + 1;
    if subagent_perf_enabled() {
        let skipped = SUBAGENT_PERSIST_SKIPPED.load(Ordering::Relaxed);
        tracing::info!(
            target: "subagent_perf",
            writes,
            skipped,
            agents = self.agents.len(),
            "checkpoint persist (debounced write)"
        );
    }
}
/// Forces out a write that `persist_state_debounced` deferred; no-op when nothing is pending.
pub fn flush_pending_persist(&mut self) {
    if !self.persist_pending {
        return;
    }
    self.persist_pending = false;
    self.last_persist_at = Some(Instant::now());
    self.persist_state_best_effort();
}
/// Replaces in-memory agents and worker records with the contents of the state file.
///
/// A missing state path or missing file is not an error: nothing is loaded.
///
/// # Errors
/// Read, JSON-parse and schema-version errors are returned before anything is cleared, so
/// the in-memory state is left untouched on failure.
fn load_state(&mut self) -> Result<()> {
let Some(path) = self.state_path.as_ref() else {
return Ok(());
};
if !path.exists() {
return Ok(());
}
let raw = fs::read_to_string(path)?;
let state = serde_json::from_str::<PersistedSubAgentState>(&raw)?;
if state.schema_version != SUBAGENT_STATE_SCHEMA_VERSION {
return Err(anyhow!(
"Unsupported sub-agent state schema {}",
state.schema_version
));
}
self.agents.clear();
self.worker_records.clear();
for persisted in state.agents {
let mut status = persisted.status;
// No task survives a restart, so a persisted `Running` agent is marked interrupted.
if matches!(status, SubAgentStatus::Running) {
status = SubAgentStatus::Interrupted(SUBAGENT_RESTART_REASON.to_string());
}
// NOTE(review): presumably yields an Instant `duration_ms` in the past so elapsed time
// keeps counting from the persisted value — confirm `instant_from_duration`.
let started_at = instant_from_duration(Duration::from_millis(persisted.duration_ms));
// `persist_state` writes `None` as an empty list, so empty maps back to `None`.
let allowed_tools = if persisted.allowed_tools.is_empty() {
None
} else {
Some(persisted.allowed_tools)
};
let agent = SubAgent {
id: persisted.id.clone(),
// Blank or missing names fall back to the agent id.
session_name: persisted
.session_name
.filter(|name| !name.trim().is_empty())
.unwrap_or_else(|| persisted.id.clone()),
fork_context: persisted.fork_context,
// Older files may lack a workspace; use the manager's own.
workspace: persisted
.workspace
.unwrap_or_else(|| self.workspace.clone()),
agent_type: persisted.agent_type,
prompt: persisted.prompt,
assignment: persisted.assignment,
model: if persisted.model.is_empty() {
"unknown".to_string()
} else {
persisted.model
},
nickname: persisted.nickname,
status,
result: persisted.result,
steps_taken: persisted.steps_taken,
checkpoint: persisted.checkpoint,
needs_input: persisted.needs_input,
started_at,
last_activity_at: started_at,
allowed_tools,
session_boot_id: persisted.session_boot_id,
// Reloaded agents have no live task or input channel.
input_tx: None,
task_handle: None,
};
self.agents.insert(persisted.id, agent);
}
for worker in state.workers {
let worker = normalize_worker_record(worker);
// Keep the global event sequence ahead of every loaded event so new events sort after them.
self.worker_event_seq = self.worker_event_seq.max(
worker
.events
.iter()
.map(|event| event.seq)
.max()
.unwrap_or(0),
);
// Keyed by the normalized spec's worker id.
self.worker_records
.insert(worker.spec.worker_id.clone(), worker);
}
self.prune_worker_records();
Ok(())
}
/// Clones all worker records, newest `updated_at_ms` first, with ties broken by ascending
/// worker id.
fn sorted_worker_records(&self) -> Vec<AgentWorkerRecord> {
    let mut records = self
        .worker_records
        .values()
        .cloned()
        .collect::<Vec<AgentWorkerRecord>>();
    records.sort_by(|left, right| {
        left.updated_at_ms
            .cmp(&right.updated_at_ms)
            .reverse()
            .then_with(|| left.spec.worker_id.cmp(&right.spec.worker_id))
    });
    records
}
/// Keeps only the `MAX_AGENT_WORKER_RECORDS` most recently updated worker records.
fn prune_worker_records(&mut self) {
    if self.worker_records.len() > MAX_AGENT_WORKER_RECORDS {
        let survivors = self
            .sorted_worker_records()
            .into_iter()
            .take(MAX_AGENT_WORKER_RECORDS)
            .map(|record| record.spec.worker_id)
            .collect::<std::collections::HashSet<String>>();
        self.worker_records
            .retain(|worker_id, _| survivors.contains(worker_id));
    }
}
/// Starts tracking a new worker and records an initial `Starting` event.
///
/// The record is keyed by the *normalized* spec's `worker_id`. `load_state` and
/// `prune_worker_records` both key by `record.spec.worker_id`, and keying by the raw id would
/// let pruning silently drop the new record if normalization ever rewrote the id.
/// Registering may evict the oldest records once `MAX_AGENT_WORKER_RECORDS` is exceeded.
pub fn register_worker(&mut self, spec: AgentWorkerSpec) {
    let now_ms = epoch_millis_now();
    let mut record = AgentWorkerRecord::new(normalize_worker_spec(spec), now_ms);
    self.push_worker_event(
        &mut record,
        AgentWorkerStatus::Starting,
        Some("starting".to_string()),
        None,
        None,
        now_ms,
    );
    let worker_id = record.spec.worker_id.clone();
    self.worker_records.insert(worker_id, record);
    self.prune_worker_records();
}
pub fn list_worker_records(&self) -> Vec<AgentWorkerRecord> {
self.sorted_worker_records()
}
/// Returns a copy of the worker record stored under `worker_id`, if there is one.
pub fn get_worker_record(&self, worker_id: &str) -> Option<AgentWorkerRecord> {
    let record = self.worker_records.get(worker_id)?;
    Some(record.clone())
}
/// Appends an event to `record`, stamped with the next manager-wide sequence number.
///
/// The sequence counter saturates at `u64::MAX`. Only the newest
/// `MAX_AGENT_WORKER_EVENTS_PER_RECORD` events are kept; older ones are popped from the front.
/// `record` is passed separately from `self` so callers can update a record that is
/// temporarily detached from `worker_records`.
fn push_worker_event(
    &mut self,
    record: &mut AgentWorkerRecord,
    status: AgentWorkerStatus,
    message: Option<String>,
    step: Option<u32>,
    tool_name: Option<String>,
    now_ms: u64,
) {
    self.worker_event_seq = self.worker_event_seq.saturating_add(1);
    let event = AgentWorkerEvent {
        seq: self.worker_event_seq,
        worker_id: record.spec.worker_id.clone(),
        status,
        timestamp_ms: now_ms,
        message,
        step,
        tool_name,
    };
    record.events.push_back(event);
    // Evaluated once: one pop per event beyond the cap (zero iterations when under it).
    for _ in MAX_AGENT_WORKER_EVENTS_PER_RECORD..record.events.len() {
        record.events.pop_front();
    }
}
/// Applies a status transition to a tracked worker and logs it as an event.
///
/// Unknown `worker_id`s are ignored. On a known worker this:
/// - updates status, recommended action and `updated_at_ms`;
/// - overwrites `latest_message`, even with `None`;
/// - sets `started_at_ms` the first time the worker is `Starting`/`Running`;
/// - stamps `completed_at_ms` on every terminal status;
/// - copies `step` into `steps_taken` when provided.
fn record_worker_event(
    &mut self,
    worker_id: &str,
    status: AgentWorkerStatus,
    message: Option<String>,
    step: Option<u32>,
    tool_name: Option<String>,
) {
    let now_ms = epoch_millis_now();
    // Detach the record so `push_worker_event` can borrow `self` mutably alongside it.
    let Some(mut record) = self.worker_records.remove(worker_id) else {
        return;
    };
    let is_active = matches!(
        status,
        AgentWorkerStatus::Starting | AgentWorkerStatus::Running
    );
    let is_terminal = matches!(
        status,
        AgentWorkerStatus::Completed
            | AgentWorkerStatus::Failed
            | AgentWorkerStatus::Cancelled
            | AgentWorkerStatus::Interrupted
    );

    record.status = status;
    record.recommended_action = recommended_action_for_worker_status(status, &record.spec);
    record.updated_at_ms = now_ms;
    record.latest_message.clone_from(&message);
    if is_active && record.started_at_ms.is_none() {
        record.started_at_ms = Some(now_ms);
    }
    if is_terminal {
        record.completed_at_ms = Some(now_ms);
    }
    if let Some(step) = step {
        record.steps_taken = step;
    }

    self.push_worker_event(&mut record, status, message, step, tool_name, now_ms);
    self.worker_records.insert(worker_id.to_string(), record);
}
/// Records a free-form progress line for a worker.
///
/// The status, step number and tool name are derived from the message text by
/// `worker_progress_event_parts`.
fn record_worker_progress(&mut self, worker_id: &str, message: String) {
    let parts = worker_progress_event_parts(&message);
    let (status, step, tool_name) = parts;
    self.record_worker_event(worker_id, status, Some(message), step, tool_name);
}
/// Mirrors a sub-agent result onto its worker record.
///
/// Emits a status event whose message is the failure/interrupt reason or the status word.
/// Then copies the result summary and step count, and stores the error text for failures.
fn complete_worker_from_result(&mut self, worker_id: &str, result: &SubAgentResult) {
    let message = match &result.status {
        SubAgentStatus::Completed => "completed".to_string(),
        SubAgentStatus::Failed(reason) | SubAgentStatus::Interrupted(reason) => reason.clone(),
        SubAgentStatus::Cancelled => "cancelled".to_string(),
        SubAgentStatus::Running => "running".to_string(),
    };
    self.record_worker_event(
        worker_id,
        worker_status_from_subagent_result(result),
        Some(message),
        Some(result.steps_taken),
        None,
    );
    let Some(record) = self.worker_records.get_mut(worker_id) else {
        return;
    };
    record.result_summary.clone_from(&result.result);
    record.steps_taken = result.steps_taken;
    if let SubAgentStatus::Failed(err) = &result.status {
        record.error = Some(err.clone());
    }
}
/// Stores the outcome of the latest follow-up delivery attempt on a worker record.
///
/// Unknown workers are ignored. The message is stored only as a preview
/// (`message_preview`). A successful delivery also sets `latest_message` to
/// "follow-up delivered".
fn record_follow_up_delivery(
    &mut self,
    worker_id: &str,
    delivered: bool,
    message: Option<&str>,
    reason: Option<&str>,
    interrupt: bool,
    continued_from_checkpoint: bool,
) {
    let Some(record) = self.worker_records.get_mut(worker_id) else {
        return;
    };
    let now_ms = epoch_millis_now();
    let delivery = AgentRunFollowUpDelivery {
        delivered,
        timestamp_ms: now_ms,
        message_preview: message.map(message_preview),
        reason: reason.map(ToOwned::to_owned),
        interrupt,
        continued_from_checkpoint,
    };
    record.updated_at_ms = now_ms;
    record.follow_up.latest_delivery = Some(delivery);
    if delivered {
        record.latest_message = Some("follow-up delivered".to_string());
    }
}
/// Marks a worker `Failed`, using `error` both as the event message and the stored error.
fn fail_worker(&mut self, worker_id: &str, error: String) {
    let event_message = Some(error.clone());
    self.record_worker_event(
        worker_id,
        AgentWorkerStatus::Failed,
        event_message,
        None,
        None,
    );
    let Some(record) = self.worker_records.get_mut(worker_id) else {
        return;
    };
    record.error = Some(error);
}
/// Counts agents that are genuinely running.
///
/// An agent counts only if it is `Running`, owns a live task handle, and is still within
/// the heartbeat timeout.
pub fn running_count(&self) -> usize {
    self.agents
        .values()
        .filter(|agent| agent.status == SubAgentStatus::Running)
        .filter(|agent| agent.task_handle.is_some())
        .filter(|agent| !self.running_heartbeat_timed_out(agent))
        .count()
}
/// True when a running, task-backed agent has shown no activity for at least
/// `running_heartbeat_timeout`.
fn running_heartbeat_timed_out(&self, agent: &SubAgent) -> bool {
    let live = agent.status == SubAgentStatus::Running && agent.task_handle.is_some();
    live && agent.last_activity_at.elapsed() >= self.running_heartbeat_timeout
}
/// Refreshes a running agent's heartbeat.
///
/// Returns `false`, and changes nothing, when the agent is unknown or not `Running`.
pub fn touch(&mut self, agent_id: &str) -> bool {
    match self.agents.get_mut(agent_id) {
        Some(agent) if agent.status == SubAgentStatus::Running => {
            agent.last_activity_at = Instant::now();
            true
        }
        _ => false,
    }
}
/// Spawns a background sub-agent with default spawn options.
///
/// The assignment is built from the prompt alone via `SubAgentAssignment::new(prompt, None)`.
pub fn spawn_background(
    &mut self,
    manager_handle: SharedSubAgentManager,
    runtime: SubAgentRuntime,
    agent_type: SubAgentType,
    prompt: String,
    allowed_tools: Option<Vec<String>>,
) -> Result<SubAgentResult> {
    let assignment = SubAgentAssignment::new(prompt.clone(), None);
    self.spawn_background_with_assignment(
        manager_handle,
        runtime,
        agent_type,
        prompt,
        assignment,
        allowed_tools,
    )
}
/// Spawns a background sub-agent for an explicit assignment using default
/// `SubAgentSpawnOptions`.
pub fn spawn_background_with_assignment(
    &mut self,
    manager_handle: SharedSubAgentManager,
    runtime: SubAgentRuntime,
    agent_type: SubAgentType,
    prompt: String,
    assignment: SubAgentAssignment,
    allowed_tools: Option<Vec<String>>,
) -> Result<SubAgentResult> {
    let options = SubAgentSpawnOptions::default();
    self.spawn_background_with_assignment_options(
        manager_handle,
        runtime,
        agent_type,
        prompt,
        assignment,
        allowed_tools,
        options,
    )
}
/// Launches a sub-agent as a supervised background task and returns its first snapshot.
///
/// Steps, in order:
/// 1. prune expired agents via `cleanup(COMPLETED_AGENT_RETENTION)`;
/// 2. check the live running count against `max_agents`;
/// 3. validate the optional session name;
/// 4. let `options.model` (if any) override `runtime.model`;
/// 5. assign a unique whale nickname unless `options.nickname` is given;
/// 6. build the tool allow-list and register a worker record;
/// 7. send `Event::AgentSpawned` best-effort;
/// 8. spawn `run_subagent_task`.
///
/// Only runtimes at spawn depth 1 go through the shared launch gate. State is persisted
/// immediately.
///
/// # Errors
/// - the running-agent limit is reached;
/// - `options.name` (trimmed, non-empty) is already used by another tracked agent, whatever
///   its status;
/// - `build_allowed_tools` rejects the requested tools.
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn_background_with_assignment_options(
    &mut self,
    manager_handle: SharedSubAgentManager,
    mut runtime: SubAgentRuntime,
    agent_type: SubAgentType,
    prompt: String,
    assignment: SubAgentAssignment,
    allowed_tools: Option<Vec<String>>,
    options: SubAgentSpawnOptions,
) -> Result<SubAgentResult> {
    self.cleanup(COMPLETED_AGENT_RETENTION);
    // Count once so the limit check and the reported number cannot disagree.
    let running = self.running_count();
    if running >= self.max_agents {
        return Err(anyhow!(
            "Sub-agent limit reached (max {}, running {}). Cancel, close, or wait for an existing agent to finish. Consider issuing multiple tool calls in one turn (the dispatcher runs them in parallel) for parallel one-shot work.",
            self.max_agents,
            running
        ));
    }
    // Reject a duplicate session name before any ids, channels or tool lists are built.
    let session_name = self.spawn_session_name(options.name.as_deref())?;

    if let Some(model) = options.model.as_deref() {
        runtime.model = model.to_string();
    }
    let effective_model = runtime.model.clone();
    let agent_id = format!("agent_{}", &Uuid::new_v4().to_string()[..8]);
    // Nicknames already used by any tracked agent, running or not.
    let taken_nicknames: std::collections::HashSet<String> = self
        .agents
        .values()
        .filter_map(|a| a.nickname.clone())
        .collect();
    let nickname = options
        .nickname
        .or_else(|| Some(assign_unique_whale_name(&agent_id, &taken_nicknames)));
    let tools = build_allowed_tools(&agent_type, allowed_tools, runtime.allow_shell)?;
    let (input_tx, input_rx) = mpsc::unbounded_channel();
    let mut agent = SubAgent::new(
        agent_id.clone(),
        agent_type.clone(),
        prompt.clone(),
        assignment.clone(),
        effective_model,
        nickname,
        tools.clone(),
        input_tx,
        runtime.context.workspace.clone(),
        self.current_session_boot_id.clone(),
    );
    if let Some(name) = session_name {
        agent.session_name = name;
    }
    agent.fork_context = options.fork_context;
    let started_at = agent.started_at;
    let max_steps = self.max_steps;
    let tool_profile = match tools.clone() {
        Some(tools) => AgentWorkerToolProfile::Explicit(tools),
        None => AgentWorkerToolProfile::Inherited,
    };
    let runtime_profile = worker_profile_for_spawn(
        &runtime,
        &agent_type,
        &tool_profile,
        &agent.model,
        options.model_route.clone(),
    );
    runtime.worker_profile = runtime_profile.clone();
    let context_mode = if options.fork_context {
        "forked"
    } else {
        "fresh"
    };
    let worker_spec = AgentWorkerSpec {
        worker_id: agent_id.clone(),
        run_id: agent_id.clone(),
        parent_run_id: None,
        session_name: Some(agent.session_name.clone()),
        objective: assignment.objective.clone(),
        role: assignment.role.clone(),
        agent_type: agent_type.clone(),
        model: agent.model.clone(),
        workspace: agent.workspace.clone(),
        git_branch: current_git_branch(&agent.workspace),
        context_mode: context_mode.to_string(),
        fork_context: options.fork_context,
        tool_profile,
        runtime_profile,
        max_steps,
        spawn_depth: runtime.spawn_depth,
        max_spawn_depth: runtime.max_spawn_depth,
    };
    self.register_worker(worker_spec);
    if let Some(event_tx) = runtime.event_tx.as_ref() {
        // Non-blocking: a full or closed event channel must not fail the spawn.
        let _ = event_tx.try_send(Event::AgentSpawned {
            id: agent_id.clone(),
            prompt: prompt.clone(),
        });
    }
    // Only top-level children (depth 1) compete for the shared launch permits.
    let launch_gate = (runtime.spawn_depth == 1).then(|| self.launch_gate.clone());
    let task = SubAgentTask {
        manager_handle,
        runtime,
        agent_id: agent_id.clone(),
        agent_type,
        prompt,
        assignment,
        allowed_tools: tools,
        fork_context: options.fork_context,
        started_at,
        max_steps,
        input_rx,
        launch_gate,
    };
    let handle = spawn_supervised(
        "subagent-task",
        std::panic::Location::caller(),
        run_subagent_task(task),
    );
    agent.task_handle = Some(handle);
    // Snapshot before the map takes ownership; avoids a re-lookup and its `expect`.
    let snapshot = agent.snapshot();
    self.agents.insert(agent_id, agent);
    self.persist_state_best_effort();
    Ok(snapshot)
}

/// Returns the trimmed session name requested for a new agent, or `None` when it is
/// absent or blank.
///
/// # Errors
/// Fails when any tracked agent, whatever its status, already uses that session name.
fn spawn_session_name(&self, requested: Option<&str>) -> Result<Option<String>> {
    let Some(name) = requested.map(str::trim).filter(|name| !name.is_empty()) else {
        return Ok(None);
    };
    if let Some(existing) = self
        .agents
        .values()
        .find(|existing| existing.session_name == name)
    {
        let elapsed = existing.started_at.elapsed();
        let since = if elapsed.as_secs() < 120 {
            format!("{}s ago", elapsed.as_secs())
        } else {
            let mins = elapsed.as_secs() / 60;
            let secs = elapsed.as_secs() % 60;
            format!("{mins}m{secs}s ago")
        };
        return Err(anyhow!(
            "Sub-agent session name '{name}' is already in use by agent_id '{}' \
            (status: {}, started {since}). \
            Wait for its completion event, or open a new agent with a different name.",
            existing.id,
            subagent_status_name(&existing.status)
        ));
    }
    Ok(Some(name.to_string()))
}
/// Current snapshot of the agent with id `agent_id`.
///
/// # Errors
/// Fails when no agent with that exact id is tracked (session names are not resolved here).
pub fn get_result(&self, agent_id: &str) -> Result<SubAgentResult> {
    match self.agents.get(agent_id) {
        Some(agent) => Ok(agent.snapshot()),
        None => Err(anyhow!("Agent {agent_id} not found")),
    }
}
/// Resolves a user-supplied reference to an agent id.
///
/// The reference is trimmed and tried first as an exact id, then as a session name.
///
/// # Errors
/// Fails when nothing matches, or when more than one agent shares the session name.
fn resolve_agent_ref(&self, agent_ref: &str) -> Result<String> {
    let agent_ref = agent_ref.trim();
    if self.agents.contains_key(agent_ref) {
        return Ok(agent_ref.to_string());
    }
    let mut by_name = self
        .agents
        .values()
        .filter(|agent| agent.session_name == agent_ref)
        .map(|agent| &agent.id);
    match (by_name.next(), by_name.next()) {
        (Some(id), None) => Ok(id.clone()),
        (None, _) => Err(anyhow!("Agent session {agent_ref} not found")),
        (Some(_), Some(_)) => Err(anyhow!(
            "Agent session name '{agent_ref}' is ambiguous; use an agent_id"
        )),
    }
}
#[must_use]
fn snapshot_for_listing(&self, agent: &SubAgent) -> SubAgentResult {
let mut snap = agent.snapshot();
snap.from_prior_session = self.is_from_prior_session(agent);
snap.worker_status = self
.worker_records
.get(&agent.id)
.map(|record| record.status);
snap
}
/// Listing snapshots for every tracked agent, in the map's (unspecified) iteration order.
pub fn list(&self) -> Vec<SubAgentResult> {
    let mut snapshots = Vec::with_capacity(self.agents.len());
    for agent in self.agents.values() {
        snapshots.push(self.snapshot_for_listing(agent));
    }
    snapshots
}
pub fn list_filtered(&self, include_archived: bool) -> Vec<SubAgentResult> {
self.agents
.values()
.filter(|agent| {
if include_archived {
return true;
}
if agent.status == SubAgentStatus::Running {
return true;
}
!self.is_from_prior_session(agent)
})
.map(|agent| self.snapshot_for_listing(agent))
.collect()
}
/// Cancels stale running agents and forgets old finished ones.
///
/// An agent is auto-cancelled when it is `Running` with a live task but has had no activity
/// for `running_heartbeat_timeout`. Auto-cancelling releases its resident leases, aborts its
/// task, drops its input channel, and emits a `Cancelled` worker event.
///
/// Afterwards, every non-`Running` agent whose `started_at` is at least `max_age` old is
/// removed. The age is measured from start, not completion. This includes agents cancelled a
/// moment earlier in the same call. Worker records are not removed here; they are bounded
/// separately by `prune_worker_records`.
///
/// Returns the number of agents auto-cancelled, not the number removed.
pub fn cleanup(&mut self, max_age: Duration) -> usize {
let before = self.agents.len();
let mut auto_cancelled = 0;
let timeout = self.running_heartbeat_timeout;
// Worker events are recorded after the loop: `record_worker_event` needs `&mut self`,
// which is unavailable while `values_mut()` is borrowed.
let mut worker_cancellations = Vec::new();
for agent in self.agents.values_mut() {
if agent.status == SubAgentStatus::Running
&& agent.task_handle.is_some()
&& agent.last_activity_at.elapsed() >= timeout
{
tracing::warn!(
target: "subagent",
agent_id = %agent.id,
timeout_secs = timeout.as_secs(),
"auto-cancelling stale sub-agent with no manager-visible progress"
);
agent.status = SubAgentStatus::Cancelled;
agent.result = Some(format!(
"Auto-cancelled after {}s without sub-agent progress.",
timeout.as_secs()
));
release_resident_leases_for(&agent.id);
if let Some(handle) = agent.task_handle.take() {
handle.abort();
}
agent.input_tx = None;
worker_cancellations.push((
agent.id.clone(),
agent.result.clone(),
agent.steps_taken,
));
auto_cancelled += 1;
}
}
for (agent_id, message, steps_taken) in worker_cancellations {
self.record_worker_event(
&agent_id,
AgentWorkerStatus::Cancelled,
message,
Some(steps_taken),
None,
);
}
self.agents.retain(|_, agent| {
if agent.status == SubAgentStatus::Running {
true
} else {
agent.started_at.elapsed() < max_age
}
});
// Persist only when something actually changed.
if self.agents.len() != before || auto_cancelled > 0 {
self.persist_state_best_effort();
}
auto_cancelled
}
fn update_from_result(&mut self, agent_id: &str, result: SubAgentResult) {
let mut changed = false;
if let Some(agent) = self.agents.get_mut(agent_id) {
agent.status = result.status.clone();
agent.assignment = result.assignment.clone();
agent.result = result.result.clone();
agent.steps_taken = result.steps_taken;
agent.checkpoint = result.checkpoint.clone();
agent.needs_input = result.needs_input.clone();
if result.status != SubAgentStatus::Running {
agent.input_tx = None;
}
agent.task_handle = None;
changed = true;
}
self.complete_worker_from_result(agent_id, &result);
if changed {
self.persist_state_best_effort();
}
}
/// Marks an agent `Failed` with `error`.
///
/// Releases the agent's resident leases and detaches its channel and task. The worker
/// record is always failed; state is persisted only if the agent was tracked.
fn update_failed(&mut self, agent_id: &str, error: String) {
    let known = match self.agents.get_mut(agent_id) {
        Some(agent) => {
            agent.status = SubAgentStatus::Failed(error.clone());
            release_resident_leases_for(agent_id);
            agent.input_tx = None;
            agent.task_handle = None;
            true
        }
        None => false,
    };
    self.fail_worker(agent_id, error);
    if known {
        self.persist_state_best_effort();
    }
}
/// Stores the latest checkpoint for a running agent and counts it as activity.
///
/// Persistence is debounced. Returns `false` when the agent is unknown.
fn update_checkpoint(&mut self, agent_id: &str, checkpoint: SubAgentCheckpoint) -> bool {
    match self.agents.get_mut(agent_id) {
        None => false,
        Some(agent) => {
            agent.steps_taken = checkpoint.steps_taken;
            agent.checkpoint = Some(checkpoint);
            agent.last_activity_at = Instant::now();
            self.persist_state_debounced();
            true
        }
    }
}
/// Moves an agent to `Interrupted(reason)` and saves its checkpoint and optional input
/// request.
///
/// The agent's resident leases are released. Its input channel is kept so the session can
/// later be continued. Emits an `Interrupted` worker event and persists immediately.
///
/// # Errors
/// Fails when the agent is unknown.
fn interrupt_with_checkpoint(
    &mut self,
    agent_id: &str,
    reason: String,
    checkpoint: SubAgentCheckpoint,
    needs_input: Option<SubAgentNeedsInput>,
) -> Result<SubAgentResult> {
    let Some(agent) = self.agents.get_mut(agent_id) else {
        return Err(anyhow!("Agent {agent_id} not found"));
    };
    agent.status = SubAgentStatus::Interrupted(reason.clone());
    agent.result = Some(reason);
    agent.steps_taken = checkpoint.steps_taken;
    agent.checkpoint = Some(checkpoint);
    agent.needs_input = needs_input;
    agent.last_activity_at = Instant::now();
    release_resident_leases_for(agent_id);
    let snapshot = agent.snapshot();

    let event_message = snapshot.result.clone();
    let steps = snapshot.steps_taken;
    self.record_worker_event(
        agent_id,
        AgentWorkerStatus::Interrupted,
        event_message,
        Some(steps),
        None,
    );
    self.persist_state_best_effort();
    Ok(snapshot)
}
/// Resumes an interrupted agent from its checkpoint by sending `message` (or an empty
/// string) to its live task.
///
/// On success the agent returns to `Running` and its result is cleared. Any pending
/// `needs_input` request is also cleared: the continuation answers it, and leaving it set
/// would report a running agent as still waiting for input. A `Running` worker event is
/// recorded and state is persisted.
///
/// # Errors
/// - the agent is unknown or not `Interrupted`;
/// - it has no checkpoint, or the checkpoint is not continuable or has no messages;
/// - no live task is attached (e.g. after a restart), or the send fails.
fn continue_checkpointed(
    &mut self,
    agent_id: &str,
    message: Option<String>,
    interrupt: bool,
) -> Result<SubAgentResult> {
    let snapshot = {
        let agent = self
            .agents
            .get_mut(agent_id)
            .ok_or_else(|| anyhow!("Agent {agent_id} not found"))?;
        if !matches!(agent.status, SubAgentStatus::Interrupted(_)) {
            return Err(anyhow!(
                "Agent {agent_id} is not interrupted; checkpoint continuation is only available for interrupted sessions"
            ));
        }
        let checkpoint = agent
            .checkpoint
            .as_ref()
            .ok_or_else(|| anyhow!("Agent {agent_id} has no checkpoint to continue"))?;
        if !checkpoint.continuable || checkpoint.messages.is_empty() {
            return Err(anyhow!("Agent {agent_id} checkpoint is not continuable"));
        }
        let tx = agent.input_tx.as_ref().ok_or_else(|| {
            anyhow!(
                "Agent {agent_id} checkpoint is persisted, but no live child task is available to continue"
            )
        })?;
        tx.send(SubAgentInput {
            text: message.unwrap_or_default(),
            interrupt,
        })
        .map_err(|_| anyhow!("Failed to continue checkpointed agent {agent_id}"))?;
        agent.status = SubAgentStatus::Running;
        agent.result = None;
        agent.needs_input = None;
        agent.last_activity_at = Instant::now();
        agent.snapshot()
    };
    self.record_worker_event(
        agent_id,
        AgentWorkerStatus::Running,
        Some("continued from checkpoint".to_string()),
        Some(snapshot.steps_taken),
        None,
    );
    self.persist_state_best_effort();
    Ok(snapshot)
}
}
/// Manager handle shared between the tool layer and spawned tasks (async `RwLock`).
pub type SharedSubAgentManager = Arc<RwLock<SubAgentManager>>;
/// Reads the worker records persisted for `workspace` without spawning anything.
///
/// Builds a throwaway manager pointed at `default_state_path(workspace)`, loads it, and
/// returns the records newest-first. A missing state file yields an empty list.
///
/// # Errors
/// Propagates read, JSON-parse and schema-version errors from `load_state`.
pub fn load_persisted_agent_worker_records(workspace: &Path) -> Result<Vec<AgentWorkerRecord>> {
    let mut loader = SubAgentManager::new(workspace.to_path_buf(), 1);
    loader = loader.with_state_path(default_state_path(workspace));
    loader.load_state()?;
    Ok(loader.list_worker_records())
}
/// Serializable view of a sub-agent session, as built by `subagent_session_projection`.
///
/// Fields marked `#[serde(default...)]` take their default value when absent from the input.
/// Boolean flags are omitted from output when `false`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubAgentSessionProjection {
pub name: String,
pub agent_id: String,
/// Run id from the worker record when available, otherwise presumably the agent id.
#[serde(default)]
pub run_id: String,
pub status: String,
pub terminal: bool,
/// `"forked"` or `"fresh"`.
pub context_mode: String,
pub fork_context: bool,
pub prefix_cache: SubAgentPrefixCacheProjection,
/// Handle to the stored transcript for `agent:<id>` in the runtime handle store.
pub transcript_handle: VarHandle,
#[serde(default = "default_agent_run_follow_up")]
pub follow_up: AgentRunFollowUpTarget,
#[serde(default = "default_agent_run_takeover")]
pub takeover: AgentRunTakeoverTarget,
#[serde(default)]
pub artifacts: Vec<AgentRunArtifactRef>,
#[serde(default = "default_agent_run_usage")]
pub usage: AgentRunUsage,
#[serde(default = "default_agent_run_verification")]
pub verification: AgentRunVerificationSummary,
pub snapshot: SubAgentResult,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub checkpoint: Option<SubAgentCheckpoint>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub needs_input: Option<SubAgentNeedsInput>,
// NOTE(review): presumably mirrors `subagent_checkpoint_is_continuable(&snapshot)` — confirm.
#[serde(default, skip_serializing_if = "is_false")]
pub continuable: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub needs_continuation: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub timed_out: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub timed_out_with_checkpoint: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub worker_record: Option<AgentWorkerRecord>,
}
/// Describes how a session's prompt prefix relates to its parent's for prefix-cache reuse.
///
/// `subagent_prefix_cache_projection` fills it with fixed labels for forked vs fresh sessions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubAgentPrefixCacheProjection {
/// `"forked"` or `"fresh"`.
pub mode: String,
/// How the parent prefix is treated (e.g. `"not_inherited"` for fresh sessions).
pub parent_prefix: String,
/// Expected DeepSeek prefix-cache behavior label.
pub deepseek_prefix_cache_reuse: String,
}
/// Chooses the fixed prefix-cache labels for a session based on `snapshot.fork_context`.
fn subagent_prefix_cache_projection(snapshot: &SubAgentResult) -> SubAgentPrefixCacheProjection {
    let (mode, parent_prefix, reuse) = if snapshot.fork_context {
        (
            "forked",
            "preserved_byte_identical_when_available",
            "optimized_for_existing_parent_prefill",
        )
    } else {
        ("fresh", "not_inherited", "independent_child_prefill")
    };
    SubAgentPrefixCacheProjection {
        mode: mode.to_owned(),
        parent_prefix: parent_prefix.to_owned(),
        deepseek_prefix_cache_reuse: reuse.to_owned(),
    }
}
/// Returns `true` only for an interrupted sub-agent whose checkpoint is both
/// flagged continuable and actually carries messages to resume from.
fn subagent_checkpoint_is_continuable(snapshot: &SubAgentResult) -> bool {
    if !matches!(snapshot.status, SubAgentStatus::Interrupted(_)) {
        return false;
    }
    match snapshot.checkpoint.as_ref() {
        Some(checkpoint) => checkpoint.continuable && !checkpoint.messages.is_empty(),
        None => false,
    }
}
/// Builds the session projection returned to the model/UI for a sub-agent.
///
/// The projection combines the raw `snapshot` with the optional worker-run
/// record. When no record is available, follow-up, takeover, artifact, usage,
/// verification and status fields fall back to defaults derived from the
/// snapshot.
///
/// `transcript_handle` comes from the shared handle store under the
/// `agent:<agent_id>` session:
/// - for a non-running agent whose `full_transcript` handle already exists,
///   that handle is reused;
/// - otherwise a `transcript` JSON snapshot is inserted and its handle returned.
///
/// `timed_out` marks projections produced after a wait timed out. Combined
/// with a continuable checkpoint, it also sets `timed_out_with_checkpoint`.
async fn subagent_session_projection(
    snapshot: SubAgentResult,
    timed_out: bool,
    context: &ToolContext,
    worker_record: Option<AgentWorkerRecord>,
) -> SubAgentSessionProjection {
    let transcript_session_id = format!("agent:{}", snapshot.agent_id);
    // Computed once and reused by every continuation-related field below.
    let continuable = subagent_checkpoint_is_continuable(&snapshot);
    let transcript_handle = {
        let mut store = context.runtime.handle_store.lock().await;
        let full_transcript_lookup = VarHandle {
            kind: "var_handle".to_string(),
            session_id: transcript_session_id.clone(),
            name: "full_transcript".to_string(),
            type_name: String::new(),
            length: 0,
            repr_preview: String::new(),
            sha256: String::new(),
        };
        if snapshot.status != SubAgentStatus::Running
            && let Some(record) = store.get(&full_transcript_lookup)
        {
            record.handle.clone()
        } else {
            // Only serialize (and deep-clone) the snapshot when a new handle is
            // actually stored; the reuse path above never needs it.
            let transcript_payload = json!({
                "kind": "subagent_session_snapshot",
                "agent_id": snapshot.agent_id.clone(),
                "name": snapshot.name.clone(),
                "status": subagent_status_name(&snapshot.status),
                "context_mode": snapshot.context_mode.clone(),
                "fork_context": snapshot.fork_context,
                "result": snapshot.result.clone(),
                "steps_taken": snapshot.steps_taken,
                "duration_ms": snapshot.duration_ms,
                "assignment": snapshot.assignment.clone(),
                "checkpoint": snapshot.checkpoint.clone(),
                "needs_input": snapshot.needs_input.clone(),
                "needs_continuation": continuable,
                "timed_out_with_checkpoint": timed_out && continuable,
                "snapshot": snapshot.clone(),
            });
            store.insert_json(transcript_session_id, "transcript", transcript_payload)
        }
    };
    let run_id = worker_record
        .as_ref()
        .map(|record| agent_worker_run_id(&record.spec))
        .unwrap_or_else(|| snapshot.agent_id.clone());
    let follow_up = worker_record
        .as_ref()
        .map(|record| record.follow_up.clone())
        .unwrap_or_else(|| AgentRunFollowUpTarget {
            tool: default_agent_inspect_tool(),
            agent_id: snapshot.agent_id.clone(),
            session_name: Some(snapshot.name.clone()),
            accepted_statuses: vec!["running".to_string(), "interrupted_continuable".to_string()],
            latest_delivery: None,
        });
    let takeover = worker_record
        .as_ref()
        .map(|record| record.takeover.clone())
        .unwrap_or_else(|| AgentRunTakeoverTarget {
            kind: default_subagent_takeover_kind(),
            supported: true,
            agent_id: snapshot.agent_id.clone(),
            session_name: Some(snapshot.name.clone()),
            instructions: format!(
                "Inspect agent '{}' through the returned transcript_handle with handle_read; open a replacement with agent if the lane no longer fits.",
                snapshot.agent_id
            ),
            unsupported_reason: None,
        });
    let artifacts = worker_record
        .as_ref()
        .map(|record| record.artifacts.clone())
        .unwrap_or_else(|| default_subagent_artifacts(&run_id));
    let usage = worker_record
        .as_ref()
        .map(|record| record.usage.clone())
        .unwrap_or_else(default_agent_run_usage);
    let verification = worker_record
        .as_ref()
        .map(|record| record.verification.clone())
        .unwrap_or_else(default_agent_run_verification);
    let status = worker_record
        .as_ref()
        .map(|record| agent_worker_status_name(record.status))
        .unwrap_or_else(|| agent_worker_status_name(worker_status_from_subagent_result(&snapshot)))
        .to_string();
    SubAgentSessionProjection {
        name: snapshot.name.clone(),
        agent_id: snapshot.agent_id.clone(),
        run_id,
        status,
        terminal: snapshot.status != SubAgentStatus::Running,
        context_mode: snapshot.context_mode.clone(),
        fork_context: snapshot.fork_context,
        prefix_cache: subagent_prefix_cache_projection(&snapshot),
        transcript_handle,
        follow_up,
        takeover,
        artifacts,
        usage,
        verification,
        checkpoint: snapshot.checkpoint.clone(),
        needs_input: snapshot.needs_input.clone(),
        continuable,
        needs_continuation: continuable,
        snapshot,
        timed_out,
        timed_out_with_checkpoint: timed_out && continuable,
        worker_record,
    }
}
fn default_state_path(workspace: &Path) -> PathBuf {
let primary = workspace.join(".codewhale").join("state");
if primary.exists() {
return primary.join(SUBAGENT_STATE_FILE);
}
workspace
.join(".deepseek")
.join("state")
.join(SUBAGENT_STATE_FILE)
}
/// Milliseconds since the Unix epoch.
///
/// Saturates at `u64::MAX` and reports `0` if the system clock reads earlier
/// than 1970.
fn epoch_millis_now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
        })
}
/// Returns the `Instant` that lies `duration` before now.
///
/// Falls back to a fresh `Instant::now()` when that point cannot be
/// represented by the monotonic clock.
fn instant_from_duration(duration: Duration) -> Instant {
    let reference = Instant::now();
    match reference.checked_sub(duration) {
        Some(earlier) => earlier,
        None => Instant::now(),
    }
}
fn write_json_atomic<T: Serialize>(path: &Path, value: &T) -> Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let payload = serde_json::to_string_pretty(value)?;
let tmp_path = path.with_extension("tmp");
fs::write(&tmp_path, payload)?;
fs::rename(tmp_path, path)?;
Ok(())
}
/// Creates a shared sub-agent manager for `workspace` with the configured
/// default heartbeat timeout. Launch concurrency defaults to `max_agents`.
#[must_use]
pub fn new_shared_subagent_manager(workspace: PathBuf, max_agents: usize) -> SharedSubAgentManager {
    let heartbeat_timeout =
        Duration::from_secs(crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS);
    let launch_concurrency = max_agents;
    new_shared_subagent_manager_with_timeout(
        workspace,
        max_agents,
        heartbeat_timeout,
        launch_concurrency,
    )
}
/// Creates a shared sub-agent manager with an explicit heartbeat timeout and
/// launch concurrency.
///
/// `max_agents` is clamped to `1..=MAX_SUBAGENTS`. State is persisted to the
/// path chosen by `default_state_path`. Previously saved state is loaded
/// eagerly; a load failure is logged and does not abort construction.
#[must_use]
pub fn new_shared_subagent_manager_with_timeout(
    workspace: PathBuf,
    max_agents: usize,
    running_heartbeat_timeout: Duration,
    launch_concurrency: usize,
) -> SharedSubAgentManager {
    let state_path = default_state_path(&workspace);
    let bounded_max_agents = max_agents.clamp(1, MAX_SUBAGENTS);
    let mut manager = SubAgentManager::new(workspace, bounded_max_agents)
        .with_running_heartbeat_timeout(running_heartbeat_timeout)
        .with_launch_concurrency(launch_concurrency)
        .with_state_path(state_path);
    let load_outcome = manager.load_state();
    if let Err(err) = load_outcome {
        tracing::warn!(target: "subagent", ?err, "failed to load sub-agent state");
    }
    Arc::new(RwLock::new(manager))
}
/// The `agent` tool: launches focused background child agents on a shared manager.
pub struct AgentTool {
    /// Manager that registers and tracks every spawned sub-agent.
    manager: SharedSubAgentManager,
    /// Parent runtime from which each child runtime is derived.
    runtime: SubAgentRuntime,
}
impl AgentTool {
    /// Builds the tool from the shared manager and the parent's runtime.
    #[must_use]
    pub fn new(manager: SharedSubAgentManager, runtime: SubAgentRuntime) -> Self {
        Self { runtime, manager }
    }
}
#[async_trait]
impl ToolSpec for AgentTool {
fn name(&self) -> &'static str {
"agent"
}
fn description(&self) -> &'static str {
concat!(
"Start one focused child agent task. Use this only for independent work that benefits from a clean context. ",
"The child runs in the background and reports back automatically when finished; keep tiny reads/searches local. ",
"Returns a session projection with the generated agent_id and transcript_handle for UI/debug inspection."
)
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Optional stable session name. Defaults to the generated agent_id."
},
"prompt": {
"type": "string",
"description": "Focused task for the child agent"
},
"type": {
"type": "string",
"description": SUBAGENT_TYPE_DESCRIPTION
},
"model_strength": {
"type": "string",
"enum": ["same", "faster"],
"description": "Optional child model strength. Use same (default) when the child should be as capable as the current model. Use faster for type=explore, read-only lookup/search, status, or other low-risk tasks that can run on a smaller/faster same-family sibling; CodeWhale maps known families such as DeepSeek V4 Pro to Flash and GLM-5.2 to GLM-5.1. No hidden auto-downgrade happens."
},
"model": {
"type": "string",
"description": "Optional exact provider model id for the child. Overrides model_strength. Prefer model_strength unless you know the provider-specific id."
},
"thinking": {
"type": "string",
"enum": ["inherit", "auto", "off", "low", "medium", "high", "max"],
"description": "Optional child thinking budget. inherit (default) follows the parent thinking mode. auto chooses from the child prompt. off is best for faster explore/lookups. high is for normal reasoning. max is for hard design/debug/release/security work. Explicit thinking overrides the default off used by model_strength=faster."
},
"cwd": {
"type": "string",
"description": "Optional working directory for the child; must be inside the parent workspace"
},
"fork_context": {
"type": "boolean",
"description": "false (default): fresh child context. true: include the current parent context prefix when the child needs it."
},
"max_depth": {
"type": "integer",
"minimum": 0,
"maximum": 3,
"description": "Optional remaining nested-agent depth budget for this child. Defaults to the configured runtime budget."
}
},
"required": ["prompt"]
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![
ToolCapability::ExecutesCode,
ToolCapability::RequiresApproval,
]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Required
}
async fn execute(&self, input: Value, context: &ToolContext) -> Result<ToolResult, ToolError> {
let snapshot =
spawn_subagent_from_input(input, self.manager.clone(), self.runtime.clone()).await?;
let worker_record = {
let manager = self.manager.read().await;
manager.get_worker_record(&snapshot.agent_id)
};
let projection = subagent_session_projection(snapshot, false, context, worker_record).await;
let mut tool_result = ToolResult::json(&projection)
.map_err(|e| ToolError::execution_failed(e.to_string()))?;
tool_result.metadata = Some(json!({
"status": projection.status,
"terminal": projection.terminal,
"context_mode": projection.context_mode,
"prefix_cache": projection.prefix_cache,
}));
Ok(tool_result)
}
}
async fn spawn_subagent_from_input(
input: Value,
manager: SharedSubAgentManager,
runtime: SubAgentRuntime,
) -> Result<SubAgentResult, ToolError> {
let spawn_request = parse_spawn_request(&input)?;
if runtime.would_exceed_depth() {
return Err(ToolError::execution_failed(format!(
"Sub-agent depth limit reached (current depth {}, max {}). \
Increase via [runtime] max_spawn_depth in config.toml.",
runtime.spawn_depth, runtime.max_spawn_depth
)));
}
let validated_cwd = if let Some(requested_cwd) = spawn_request.cwd.as_ref() {
let parent_workspace = &runtime.context.workspace;
let resolved = if requested_cwd.is_absolute() {
requested_cwd.clone()
} else {
parent_workspace.join(requested_cwd)
};
let canonical = resolved.canonicalize().map_err(|e| {
ToolError::invalid_input(format!(
"Invalid cwd '{}': {e} (path may not exist yet — create the worktree first)",
requested_cwd.display()
))
})?;
let workspace_canonical = parent_workspace
.canonicalize()
.unwrap_or_else(|_| parent_workspace.clone());
if !canonical.starts_with(&workspace_canonical) {
return Err(ToolError::invalid_input(format!(
"cwd must be inside the parent workspace: {} is not under {}",
canonical.display(),
workspace_canonical.display()
)));
}
Some(canonical)
} else {
None
};
let mut child_runtime = runtime.background_runtime();
if let Some(max_depth) = spawn_request.max_depth {
child_runtime.max_spawn_depth = child_runtime.spawn_depth.saturating_add(max_depth);
}
if let Some(cwd) = validated_cwd {
child_runtime.context.workspace = cwd;
}
let configured_model = match spawn_request.model.clone() {
Some(model) => Some(normalize_requested_subagent_model(
&model,
"model",
runtime.client.api_provider(),
)?),
None => configured_model_for_role_or_type(
&runtime,
spawn_request.assignment.role.as_deref(),
&spawn_request.agent_type,
)?,
};
let (effective_prompt, _resident_conflict) =
if let Some(ref file_path) = spawn_request.resident_file {
let abs_path = if std::path::Path::new(file_path).is_absolute() {
std::path::PathBuf::from(file_path)
} else {
runtime.context.workspace.join(file_path)
};
let file_contents = std::fs::read_to_string(&abs_path)
.unwrap_or_else(|e| format!("<!-- resident_file read error: {e} -->"));
let prefixed = format!(
"<!-- resident_file: {file_path} -->\n```\n{file_contents}\n```\n\n{}",
spawn_request.prompt
);
let conflict = {
let leases = RESIDENT_LEASES.get_or_init(|| std::sync::Mutex::new(HashMap::new()));
let mut guard = leases.lock().unwrap_or_else(|p| p.into_inner());
if let Some(owner) = guard.get(file_path) {
Some(format!(
"Warning: agent {owner} already holds a resident lease on {file_path}"
))
} else {
guard.insert(file_path.clone(), "pending".to_string());
None
}
};
(prefixed, conflict)
} else {
(spawn_request.prompt, None)
};
let route = resolve_subagent_assignment_route(
&runtime,
configured_model,
&effective_prompt,
&spawn_request.agent_type,
spawn_request.model_strength.model_route(),
spawn_request.thinking,
)
.await;
child_runtime.model = route.model.clone();
child_runtime.reasoning_effort = route.reasoning_effort.clone();
child_runtime.reasoning_effort_auto = false;
let effective_model = route.model;
let model_route = route.model_route;
let mut manager_guard = manager.write().await;
let result = manager_guard
.spawn_background_with_assignment_options(
Arc::clone(&manager),
child_runtime,
spawn_request.agent_type,
effective_prompt,
spawn_request.assignment,
spawn_request.allowed_tools,
SubAgentSpawnOptions {
name: spawn_request.session_name.clone(),
model: Some(effective_model),
model_route: Some(model_route),
nickname: None,
fork_context: spawn_request.fork_context,
},
)
.map_err(|e| ToolError::execution_failed(format!("Failed to spawn sub-agent: {e}")))?;
if let Some(ref file_path) = spawn_request.resident_file
&& let Some(lock) = RESIDENT_LEASES.get()
&& let Ok(mut guard) = lock.lock()
&& let Some(owner) = guard.get_mut(file_path)
&& owner == "pending"
{
*owner = result.agent_id.clone();
}
Ok(result)
}
/// Composes a sub-agent's system prompt from three parts:
/// - the type's base prompt;
/// - a role line, when the assignment names a non-blank role;
/// - a fixed reminder that the sub-agent reports to the orchestrator rather
///   than a human.
fn build_subagent_system_prompt(
    agent_type: &SubAgentType,
    assignment: &SubAgentAssignment,
) -> String {
    let mut prompt = agent_type.system_prompt();
    let trimmed_role = assignment
        .role
        .as_deref()
        .map(str::trim)
        .filter(|role| !role.is_empty());
    if let Some(role) = trimmed_role {
        prompt = format!("{prompt}\n\nYou are operating in the role of `{role}`.");
    }
    prompt.push_str(
        "\n\nYou are a background sub-agent: every instruction comes from the orchestrating agent, not a human. Never address the end user or ask them questions — do the assigned work and report results back to the orchestrator.",
    );
    prompt
}
/// Picks the system prompt for sub-agent requests.
///
/// A forked context's own system prompt is used when it has one, so the child
/// reuses the parent's system prompt. Otherwise the sub-agent's text prompt
/// is used.
fn subagent_request_system_prompt(
    subagent_system_prompt: &str,
    fork_context: Option<&SubAgentForkContext>,
) -> SystemPrompt {
    match fork_context.and_then(|context| context.system.as_ref()) {
        Some(inherited) => inherited.clone(),
        None => SystemPrompt::Text(subagent_system_prompt.to_string()),
    }
}
/// Builds the opening transcript for a sub-agent.
///
/// Fresh children get only the assignment as a user message. Forked children
/// receive, in order:
/// 1. the parent's messages;
/// 2. an optional `<codewhale:fork_state>` system message, when the
///    structured state is non-blank;
/// 3. a `<codewhale:subagent_context>` system message carrying the sub-agent
///    system prompt;
/// 4. the assignment user message.
fn build_initial_subagent_messages(
    prompt: &str,
    assignment: &SubAgentAssignment,
    agent_type: &SubAgentType,
    fork_context: Option<&SubAgentForkContext>,
) -> Vec<Message> {
    let mut transcript: Vec<Message> = Vec::new();
    if let Some(parent) = fork_context {
        transcript.extend(parent.messages.iter().cloned());
        let fork_state = parent
            .structured_state_block
            .as_deref()
            .map(str::trim)
            .filter(|block| !block.is_empty());
        if let Some(state) = fork_state {
            transcript.push(system_text_message(format!(
                "<codewhale:fork_state>\n{state}\n</codewhale:fork_state>"
            )));
        }
        let subagent_prompt = build_subagent_system_prompt(agent_type, assignment);
        transcript.push(system_text_message(format!(
            "<codewhale:subagent_context>\n{subagent_prompt}\n</codewhale:subagent_context>"
        )));
    }
    let assignment_text = build_assignment_prompt(prompt, assignment, agent_type);
    transcript.push(Message {
        role: "user".to_string(),
        content: vec![ContentBlock::Text {
            text: assignment_text,
            cache_control: None,
        }],
    });
    transcript
}
/// Wraps `text` in a single-block, uncached `system` role message.
fn system_text_message(text: String) -> Message {
    let block = ContentBlock::Text {
        text,
        cache_control: None,
    };
    Message {
        role: String::from("system"),
        content: vec![block],
    }
}
/// Everything one background sub-agent run needs; consumed by `run_subagent_task`.
struct SubAgentTask {
    /// Shared manager that receives the final result or failure.
    manager_handle: SharedSubAgentManager,
    /// Child runtime (model, cancellation token, mailbox/event channels) for this run.
    runtime: SubAgentRuntime,
    agent_id: String,
    agent_type: SubAgentType,
    /// Prompt text handed to `run_subagent`.
    prompt: String,
    assignment: SubAgentAssignment,
    /// Optional tool allow-list. NOTE(review): `None` presumably means the
    /// type's default tool set — confirm against `SubAgentToolRegistry`.
    allowed_tools: Option<Vec<String>>,
    /// Whether the child starts from the parent's forked context.
    fork_context: bool,
    /// Start time; `run_subagent` derives `duration_ms` from it.
    started_at: Instant,
    /// Cap on model round-trips (`u32::MAX` is treated as unbounded).
    max_steps: u32,
    /// Follow-up inputs delivered to the running sub-agent.
    input_rx: mpsc::UnboundedReceiver<SubAgentInput>,
    /// Optional semaphore bounding how many sub-agents run at once.
    launch_gate: Option<Arc<Semaphore>>,
}
/// Drives one background sub-agent from launch to final report.
///
/// Launch gating, when a gate is configured:
/// - a free permit is taken immediately;
/// - otherwise the task records itself as queued and waits, and cancellation
///   ends the wait early;
/// - a closed gate is logged and the run proceeds without backpressure.
///
/// Any permit is held until this function returns. After `run_subagent`
/// finishes, the outcome is published, in order, to:
/// 1. the mailbox (if any);
/// 2. the parent completion channel;
/// 3. the shared manager;
/// 4. the UI event channel.
#[allow(clippy::too_many_lines)]
async fn run_subagent_task(task: SubAgentTask) {
    // Bound to a named local so the permit lives for the whole run instead of
    // being dropped immediately.
    let mut _launch_permit = None;
    if let Some(gate) = task.launch_gate.as_ref() {
        match Arc::clone(gate).try_acquire_owned() {
            Ok(permit) => _launch_permit = Some(permit),
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                _launch_permit = acquire_queued_launch_permit(&task, Arc::clone(gate)).await;
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                crate::logging::warn(format!(
                    "sub-agent launch gate closed for {}; proceeding without backpressure",
                    task.agent_id
                ));
            }
        }
    }
    let result = run_subagent(
        &task.runtime,
        task.agent_id.clone(),
        task.agent_type,
        task.prompt,
        task.assignment,
        task.allowed_tools,
        task.fork_context,
        task.started_at,
        task.max_steps,
        task.input_rx,
    )
    .await;
    let model_id = task.runtime.model.clone();
    // Annotate a failure exactly once. The same text feeds the summary,
    // sentinel, mailbox envelope, and manager record.
    let outcome = result
        .as_ref()
        .map_err(|err| annotate_child_model_error(&err.to_string(), &model_id));
    let (summary, sentinel) = match &outcome {
        Ok(res) => (
            summarize_subagent_result(res),
            subagent_done_sentinel(&task.agent_id, res),
        ),
        Err(annotated) => (
            format!("Failed: {annotated}"),
            subagent_failed_sentinel(&task.agent_id, annotated),
        ),
    };
    if let Some(mb) = task.runtime.mailbox.as_ref() {
        let envelope = match &outcome {
            Ok(_) => MailboxMessage::Completed {
                agent_id: task.agent_id.clone(),
                summary: summary.clone(),
            },
            Err(annotated) => MailboxMessage::Failed {
                agent_id: task.agent_id.clone(),
                error: annotated.clone(),
            },
        };
        let _ = mb.send(envelope);
    }
    let payload = format!("{summary}\n{sentinel}");
    let agent_id = task.agent_id.clone();
    emit_parent_completion(&task.runtime, &agent_id, &payload);
    let mut manager = task.manager_handle.write().await;
    match outcome {
        Ok(res) => manager.update_from_result(&agent_id, res.clone()),
        Err(annotated) => {
            manager.update_failed(&agent_id, annotated);
        }
    }
    if let Some(event_tx) = task.runtime.event_tx {
        let _ = event_tx.try_send(Event::AgentComplete {
            id: agent_id.clone(),
            result: payload,
        });
    }
}
/// Records that `task` is queued, then waits for a launch-gate permit.
///
/// Cancellation is checked first on every poll. Returns `None` when the
/// runtime is cancelled before a permit frees up, or when the semaphore has
/// been closed.
async fn acquire_queued_launch_permit(
    task: &SubAgentTask,
    gate: Arc<Semaphore>,
) -> Option<tokio::sync::OwnedSemaphorePermit> {
    record_queued_launch_progress(task).await;
    let cancelled = task.runtime.cancel_token.cancelled();
    let acquire = gate.acquire_owned();
    tokio::select! {
        biased;
        () = cancelled => {
            record_agent_progress(
                &task.runtime,
                &task.agent_id,
                "cancelled while queued for a sub-agent launch slot".to_string(),
            );
            None
        }
        granted = acquire => granted.ok(),
    }
}
/// Marks `task` as waiting for a launch slot.
///
/// Refreshes its heartbeat, logs a `Queued` worker event in the manager, then
/// broadcasts the same reason as a progress update. The broadcast happens
/// after the manager lock is released.
async fn record_queued_launch_progress(task: &SubAgentTask) {
    let mut manager = task.runtime.manager.write().await;
    manager.touch(&task.agent_id);
    manager.record_worker_event(
        &task.agent_id,
        AgentWorkerStatus::Queued,
        Some(SUBAGENT_QUEUED_LAUNCH_REASON.to_string()),
        None,
        None,
    );
    drop(manager);
    let event_tx = task.runtime.event_tx.as_ref();
    let mailbox = task.runtime.mailbox.as_ref();
    emit_agent_progress(
        event_tx,
        mailbox,
        &task.agent_id,
        SUBAGENT_QUEUED_LAUNCH_REASON.to_string(),
    );
}
/// Forwards a sub-agent's completion payload to the parent session.
///
/// Only direct children (`spawn_depth == 1`) that have a
/// `parent_completion_tx` forward anything. Returns `true` when the
/// completion was handed to that channel. The send result itself is ignored,
/// so `true` does not guarantee a live receiver.
pub(crate) fn emit_parent_completion(
    runtime: &SubAgentRuntime,
    agent_id: &str,
    payload: &str,
) -> bool {
    let tx = match (runtime.spawn_depth, runtime.parent_completion_tx.as_ref()) {
        (1, Some(tx)) => tx,
        _ => return false,
    };
    let completion = SubAgentCompletion {
        agent_id: agent_id.to_owned(),
        payload: payload.to_owned(),
    };
    let _ = tx.send(completion);
    true
}
fn subagent_done_sentinel(agent_id: &str, res: &SubAgentResult) -> String {
let mut payload = json!({
"agent_id": agent_id,
"name": res.nickname,
"agent_type": res.agent_type.as_str(),
"status": subagent_status_name(&res.status),
"summary_location": "previous_line",
});
if let Some(needs_input) = res.needs_input.clone() {
payload["needs_input"] = json!(needs_input);
}
format!("<codewhale:subagent.done>{payload}</codewhale:subagent.done>")
}
/// Builds the `<codewhale:subagent.done>` trailer for a failed run.
///
/// The error text travels on the preceding payload line, which is why `_err`
/// is not embedded here.
fn subagent_failed_sentinel(agent_id: &str, _err: &str) -> String {
    let trailer = json!({
        "agent_id": agent_id,
        "status": "failed",
        "error_location": "previous_line",
    });
    let mut sentinel = String::from("<codewhale:subagent.done>");
    sentinel.push_str(&trailer.to_string());
    sentinel.push_str("</codewhale:subagent.done>");
    sentinel
}
/// Returns `true` when the provider stopped generating because of the token
/// limit (`stop_reason == "length"`).
fn response_was_truncated(response: &MessageResponse) -> bool {
    matches!(response.stop_reason.as_deref(), Some("length"))
}
/// Produces one error `ToolResult` per tool call in a truncated response, so
/// every dangling tool-use id is answered and the model is told to retry
/// with smaller writes.
fn truncated_response_tool_results(tool_uses: &[(String, String, Value)]) -> Vec<ContentBlock> {
    let mut results = Vec::with_capacity(tool_uses.len());
    for (tool_id, tool_name, _arguments) in tool_uses {
        let content = format!(
            "Error: the model response was truncated by max_tokens before the tool call arguments for '{tool_name}' could be fully generated. Split large content into smaller writes and retry."
        );
        results.push(ContentBlock::ToolResult {
            tool_use_id: tool_id.clone(),
            content,
            is_error: Some(true),
            content_blocks: None,
        });
    }
    results
}
/// Retry instruction sent back when a truncated response contained no
/// complete tool call, so its partial text is not accepted as the result.
fn truncated_response_text_retry_message() -> Vec<ContentBlock> {
    const RETRY_TEXT: &str = "Error: the model response was truncated by max_tokens. No complete tool call was available, so the partial response was not accepted as the sub-agent result. Retry with a shorter response or split the work into smaller steps.";
    let block = ContentBlock::Text {
        text: RETRY_TEXT.to_owned(),
        cache_control: None,
    };
    vec![block]
}
/// Increments the consecutive-truncation counter (saturating).
///
/// # Errors
/// Fails once the count exceeds `MAX_CONSECUTIVE_TRUNCATED_SUBAGENT_RESPONSES`,
/// so a model that keeps hitting the token limit cannot loop forever.
fn record_truncated_subagent_response(consecutive: &mut u32) -> Result<()> {
    let count = consecutive.saturating_add(1);
    *consecutive = count;
    if count <= MAX_CONSECUTIVE_TRUNCATED_SUBAGENT_RESPONSES {
        return Ok(());
    }
    Err(anyhow!(
        "Sub-agent response was truncated by max_tokens {count} consecutive times; stopping to avoid an unbounded retry loop."
    ))
}
/// Clears the consecutive-truncation counter after a complete response.
fn reset_truncated_subagent_responses(consecutive: &mut u32) {
    *consecutive = u32::MIN;
}
/// Stores a sub-agent's complete transcript in the shared handle store.
///
/// The transcript is stored under session `agent:<agent_id>` with name
/// `full_transcript`, and the resulting handle is returned.
#[allow(clippy::too_many_arguments)]
async fn insert_subagent_full_transcript_handle(
    runtime: &SubAgentRuntime,
    agent_id: &str,
    agent_type: &SubAgentType,
    assignment: &SubAgentAssignment,
    status: &SubAgentStatus,
    result: Option<&String>,
    checkpoint: Option<&SubAgentCheckpoint>,
    messages: &[Message],
    steps_taken: u32,
    duration_ms: u64,
    fork_context: bool,
) -> VarHandle {
    let context_mode = if fork_context { "forked" } else { "fresh" };
    let session_id = format!("agent:{agent_id}");
    let transcript = json!({
        "kind": "subagent_full_transcript",
        "agent_id": agent_id,
        "agent_type": agent_type.as_str(),
        "status": subagent_status_name(status),
        "context_mode": context_mode,
        "fork_context": fork_context,
        "result": result,
        "steps_taken": steps_taken,
        "duration_ms": duration_ms,
        "assignment": assignment,
        "checkpoint": checkpoint,
        "messages": messages,
    });
    let mut handle_store = runtime.context.runtime.handle_store.lock().await;
    handle_store.insert_json(session_id, "full_transcript", transcript)
}
/// Snapshots the current transcript as a `SubAgentCheckpoint`.
///
/// The checkpoint id encodes agent, step and creation time (epoch millis).
/// The continuation handle wraps that id under `agent:<agent_id>:checkpoint:`.
fn build_subagent_checkpoint(
    agent_id: &str,
    reason: impl Into<String>,
    messages: &[Message],
    steps_taken: u32,
    continuable: bool,
) -> SubAgentCheckpoint {
    let created_at_ms = epoch_millis_now();
    let checkpoint_id = format!("{agent_id}:step:{steps_taken}:ts:{created_at_ms}");
    let continuation_handle = format!("agent:{agent_id}:checkpoint:{checkpoint_id}");
    SubAgentCheckpoint {
        agent_id: agent_id.to_owned(),
        reason: reason.into(),
        continuable,
        steps_taken,
        message_count: messages.len(),
        created_at_ms,
        messages: messages.to_vec(),
        continuation_handle,
        checkpoint_id,
    }
}
/// Builds a checkpoint for the current transcript, records it in the shared
/// manager, and returns it to the caller.
async fn checkpoint_subagent_progress(
    runtime: &SubAgentRuntime,
    agent_id: &str,
    reason: impl Into<String>,
    messages: &[Message],
    steps_taken: u32,
    continuable: bool,
) -> SubAgentCheckpoint {
    let snapshot =
        build_subagent_checkpoint(agent_id, reason, messages, steps_taken, continuable);
    let mut manager = runtime.manager.write().await;
    manager.update_checkpoint(agent_id, snapshot.clone());
    drop(manager);
    snapshot
}
/// Builds the follow-up question shown when a sub-agent is interrupted.
///
/// The question points the orchestrator at the checkpoint's continuation
/// handle.
fn needs_input_for_interrupted_checkpoint(
    reason: &str,
    checkpoint: &SubAgentCheckpoint,
) -> SubAgentNeedsInput {
    let handle = &checkpoint.continuation_handle;
    let question = format!(
        "Sub-agent interrupted before completion ({reason}). Re-dispatch this worker or provide explicit follow-up using checkpoint {handle}."
    );
    SubAgentNeedsInput { question }
}
/// Records a progress line for `agent_id` and broadcasts it.
///
/// This is a synchronous helper, so the manager update goes through
/// `try_write`. When the lock is contended, the heartbeat/progress record is
/// skipped, but the progress event is always emitted.
fn record_agent_progress(runtime: &SubAgentRuntime, agent_id: &str, message: impl Into<String>) {
    let text: String = message.into();
    let event_tx = runtime.event_tx.as_ref();
    let mailbox = runtime.mailbox.as_ref();
    if let Ok(mut manager) = runtime.manager.try_write() {
        manager.touch(agent_id);
        manager.record_worker_progress(agent_id, text.clone());
    }
    emit_agent_progress(event_tx, mailbox, agent_id, text);
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_subagent(
runtime: &SubAgentRuntime,
agent_id: String,
agent_type: SubAgentType,
prompt: String,
assignment: SubAgentAssignment,
allowed_tools: Option<Vec<String>>,
fork_context: bool,
started_at: Instant,
max_steps: u32,
mut input_rx: mpsc::UnboundedReceiver<SubAgentInput>,
) -> Result<SubAgentResult> {
let system_prompt = build_subagent_system_prompt(&agent_type, &assignment);
let fork_context_enabled = fork_context;
let fork_context = fork_context_enabled
.then_some(runtime.fork_context.as_ref())
.flatten();
let request_system = subagent_request_system_prompt(&system_prompt, fork_context);
let mut messages =
build_initial_subagent_messages(&prompt, &assignment, &agent_type, fork_context);
let runtime_for_tools = runtime.clone().with_fork_context(SubAgentForkContext {
system: Some(request_system.clone()),
messages: messages.clone(),
structured_state_block: None,
});
let tool_registry = SubAgentToolRegistry::new(
runtime_for_tools,
agent_type.clone(),
allowed_tools.clone(),
Arc::new(Mutex::new(TodoList::new())),
Arc::new(Mutex::new(PlanState::default())),
);
let unavailable_tools = tool_registry.unavailable_allowed_tools();
if !unavailable_tools.is_empty() {
return Err(anyhow!(
"Sub-agent requested unavailable tools: {}",
unavailable_tools.join(", ")
));
}
let tools = tool_registry.tools_for_model(&agent_type);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::started(&agent_id, agent_type.clone()));
}
record_agent_progress(
runtime,
&agent_id,
format!("started ({})", agent_type.as_str()),
);
let mut steps = 0;
let mut final_result: Option<String> = None;
let mut pending_inputs: VecDeque<SubAgentInput> = VecDeque::new();
let mut consecutive_truncated_responses = 0;
let mut latest_checkpoint: Option<SubAgentCheckpoint> = None;
for _step in 0..max_steps {
if runtime.cancel_token.is_cancelled() {
record_agent_progress(
runtime,
&agent_id,
format!("{}: cancelled", format_step_counter(steps, max_steps)),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Cancelled {
agent_id: agent_id.clone(),
});
}
let status = SubAgentStatus::Cancelled;
let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
insert_subagent_full_transcript_handle(
runtime,
&agent_id,
&agent_type,
&assignment,
&status,
None,
latest_checkpoint.as_ref(),
&messages,
steps,
duration_ms,
fork_context_enabled,
)
.await;
return Ok(SubAgentResult {
name: agent_id.clone(),
agent_id: agent_id.clone(),
context_mode: if fork_context_enabled {
"forked"
} else {
"fresh"
}
.to_string(),
fork_context: fork_context_enabled,
workspace: Some(runtime.context.workspace.clone()),
git_branch: current_git_branch(&runtime.context.workspace),
agent_type: agent_type.clone(),
assignment: assignment.clone(),
model: runtime.model.clone(),
nickname: None,
status,
worker_status: None,
result: None,
steps_taken: steps,
checkpoint: latest_checkpoint.clone(),
needs_input: None,
duration_ms,
from_prior_session: false,
});
}
steps += 1;
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: requesting model response",
format_step_counter(steps, max_steps)
),
);
while let Ok(input) = input_rx.try_recv() {
if input.interrupt {
pending_inputs.clear();
}
pending_inputs.push_back(input);
}
while let Some(input) = pending_inputs.pop_front() {
if !input.text.trim().is_empty() {
messages.push(Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: input.text,
cache_control: None,
}],
});
}
}
let request = MessageRequest {
model: runtime.model.clone(),
messages: messages.clone(),
max_tokens: SUBAGENT_RESPONSE_MAX_TOKENS,
system: Some(request_system.clone()),
tools: Some(tools.clone()),
tool_choice: Some(json!({ "type": "auto" })),
metadata: None,
thinking: None,
reasoning_effort: runtime.reasoning_effort.clone(),
stream: Some(false),
temperature: None,
top_p: None,
};
latest_checkpoint = Some(
checkpoint_subagent_progress(
runtime,
&agent_id,
"before_api_request",
&messages,
steps,
true,
)
.await,
);
let response = tokio::select! {
biased;
() = runtime.cancel_token.cancelled() => {
record_agent_progress(
runtime,
&agent_id,
format!("{}: cancelled mid-request", format_step_counter(steps, max_steps)),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Cancelled {
agent_id: agent_id.clone(),
});
}
let status = SubAgentStatus::Cancelled;
let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
insert_subagent_full_transcript_handle(
runtime,
&agent_id,
&agent_type,
&assignment,
&status,
None,
latest_checkpoint.as_ref(),
&messages,
steps,
duration_ms,
fork_context_enabled,
)
.await;
return Ok(SubAgentResult {
name: agent_id.clone(),
agent_id: agent_id.clone(),
context_mode: if fork_context_enabled { "forked" } else { "fresh" }.to_string(),
fork_context: fork_context_enabled,
workspace: Some(runtime.context.workspace.clone()),
git_branch: current_git_branch(&runtime.context.workspace),
agent_type: agent_type.clone(),
assignment: assignment.clone(),
model: runtime.model.clone(),
nickname: None,
status,
worker_status: None,
result: None,
steps_taken: steps,
checkpoint: latest_checkpoint.clone(),
needs_input: None,
duration_ms,
from_prior_session: false,
});
}
api = tokio::time::timeout(runtime.step_api_timeout, runtime.client.create_message(request)) => {
match api {
Ok(response) => response?,
Err(_) => {
let reason = format!(
"API call timed out after {}ms; checkpoint preserved for continuation",
runtime.step_api_timeout.as_millis()
);
let checkpoint = checkpoint_subagent_progress(
runtime,
&agent_id,
"api_timeout",
&messages,
steps,
true,
)
.await;
record_agent_progress(
runtime,
&agent_id,
format!("{}: interrupted; {reason}", format_step_counter(steps, max_steps)),
);
let status = SubAgentStatus::Interrupted(reason.clone());
let duration_ms =
u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
insert_subagent_full_transcript_handle(
runtime,
&agent_id,
&agent_type,
&assignment,
&status,
Some(&reason),
Some(&checkpoint),
&messages,
steps,
duration_ms,
fork_context_enabled,
)
.await;
let needs_input =
needs_input_for_interrupted_checkpoint(&reason, &checkpoint);
let interrupted_snapshot = {
let mut manager = runtime.manager.write().await;
manager.interrupt_with_checkpoint(
&agent_id,
reason.clone(),
checkpoint.clone(),
Some(needs_input.clone()),
)?
};
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: waiting for user; {}",
format_step_counter(steps, max_steps),
needs_input.question
),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Interrupted {
agent_id: agent_id.clone(),
reason: reason.clone(),
});
}
return Ok(interrupted_snapshot);
}
}
}
};
let mut tool_uses = Vec::new();
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::token_usage(
&agent_id,
response.model.clone(),
response.usage.clone(),
));
}
for block in &response.content {
match block {
ContentBlock::Text { text, .. } if !text.trim().is_empty() => {
final_result = Some(text.clone());
}
ContentBlock::ToolUse {
id, name, input, ..
} => {
tool_uses.push((id.clone(), name.clone(), input.clone()));
}
_ => {}
}
}
messages.push(Message {
role: "assistant".to_string(),
content: response.content.clone(),
});
latest_checkpoint = Some(
checkpoint_subagent_progress(
runtime,
&agent_id,
"after_model_response",
&messages,
steps,
true,
)
.await,
);
if response_was_truncated(&response) {
final_result = None;
record_truncated_subagent_response(&mut consecutive_truncated_responses)?;
let progress = if tool_uses.is_empty() {
"response truncated, returning retry instruction".to_string()
} else {
format!(
"response truncated, returning {} tool error(s)",
tool_uses.len()
)
};
record_agent_progress(
runtime,
&agent_id,
format!("{}: {progress}", format_step_counter(steps, max_steps)),
);
messages.push(Message {
role: "user".to_string(),
content: if tool_uses.is_empty() {
truncated_response_text_retry_message()
} else {
truncated_response_tool_results(&tool_uses)
},
});
latest_checkpoint = Some(
checkpoint_subagent_progress(
runtime,
&agent_id,
"after_truncated_response_retry_message",
&messages,
steps,
true,
)
.await,
);
continue;
}
reset_truncated_subagent_responses(&mut consecutive_truncated_responses);
if tool_uses.is_empty() {
while let Ok(input) = input_rx.try_recv() {
if input.interrupt {
pending_inputs.clear();
}
pending_inputs.push_back(input);
}
if pending_inputs.is_empty() {
record_agent_progress(
runtime,
&agent_id,
format!("{}: complete", format_step_counter(steps, max_steps)),
);
break;
}
continue;
}
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: executing {} tool call(s)",
format_step_counter(steps, max_steps),
tool_uses.len()
),
);
let mut tool_results: Vec<ContentBlock> = Vec::new();
for (tool_id, tool_name, tool_input) in tool_uses {
let tool_display_name = subagent_progress_tool_display_name(&tool_name);
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: running tool '{tool_display_name}'",
format_step_counter(steps, max_steps)
),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::ToolCallStarted {
agent_id: agent_id.clone(),
tool_name: tool_name.clone(),
step: steps,
});
}
let result = match tokio::time::timeout(TOOL_TIMEOUT, async {
tool_registry
.execute(&agent_id, &tool_name, tool_input)
.await
})
.await
{
Ok(Ok(output)) => output,
Ok(Err(e)) => format!("Error: {e}"),
Err(_) => format!("Error: Tool {tool_name} timed out"),
};
let tool_ok = !result.starts_with("Error:");
record_agent_progress(
runtime,
&agent_id,
format!(
"{}: finished tool '{tool_display_name}'",
format_step_counter(steps, max_steps)
),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::ToolCallCompleted {
agent_id: agent_id.clone(),
tool_name: tool_name.clone(),
step: steps,
ok: tool_ok,
});
}
tool_results.push(ContentBlock::ToolResult {
tool_use_id: tool_id,
content: result,
is_error: None,
content_blocks: None,
});
}
if !tool_results.is_empty() {
messages.push(Message {
role: "user".to_string(),
content: tool_results,
});
latest_checkpoint = Some(
checkpoint_subagent_progress(
runtime,
&agent_id,
"after_tool_results",
&messages,
steps,
true,
)
.await,
);
}
}
release_resident_leases_for(&agent_id);
let status = SubAgentStatus::Completed;
let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
latest_checkpoint = Some(build_subagent_checkpoint(
&agent_id,
"completed",
&messages,
steps,
false,
));
insert_subagent_full_transcript_handle(
runtime,
&agent_id,
&agent_type,
&assignment,
&status,
final_result.as_ref(),
latest_checkpoint.as_ref(),
&messages,
steps,
duration_ms,
fork_context_enabled,
)
.await;
Ok(SubAgentResult {
name: agent_id.clone(),
agent_id,
context_mode: if fork_context_enabled {
"forked"
} else {
"fresh"
}
.to_string(),
fork_context: fork_context_enabled,
workspace: Some(runtime.context.workspace.clone()),
git_branch: current_git_branch(&runtime.context.workspace),
agent_type,
assignment,
model: runtime.model.clone(),
nickname: None,
status,
worker_status: None,
result: final_result,
steps_taken: steps,
checkpoint: latest_checkpoint,
needs_input: None,
duration_ms,
from_prior_session: false,
})
}
/// Returns the first non-blank string value stored under any of `keys`.
///
/// Keys are tried in order. Missing keys, non-string values, and strings that
/// are empty after trimming are skipped. The returned slice is trimmed and
/// borrows from `input`.
fn optional_input_str<'a>(input: &'a Value, keys: &[&str]) -> Option<&'a str> {
    for key in keys {
        let Some(raw) = input.get(*key).and_then(Value::as_str) else {
            continue;
        };
        let candidate = raw.trim();
        if !candidate.is_empty() {
            return Some(candidate);
        }
    }
    None
}
/// Reads free text either from one of `text_keys` or from the structured
/// `items_key` array, and requires exactly one of the two forms.
///
/// The items array is parsed (and validated) even when text is present, so a
/// malformed `items_key` value is reported before the "not both" conflict.
///
/// # Errors
/// - both forms supplied → `ToolError::invalid_input`;
/// - neither supplied → `ToolError::missing_field(required_field)`;
/// - any validation error from `parse_items_text`.
fn parse_text_or_items(
    input: &Value,
    text_keys: &[&str],
    items_key: &str,
    required_field: &str,
) -> Result<String, ToolError> {
    let plain = optional_input_str(input, text_keys).map(str::to_string);
    let structured = parse_items_text(input, items_key)?;
    if plain.is_some() && structured.is_some() {
        return Err(ToolError::invalid_input(format!(
            "Provide either {required_field} text or {items_key}, but not both"
        )));
    }
    plain
        .or(structured)
        .ok_or_else(|| ToolError::missing_field(required_field))
}
/// Renders the optional structured input array stored under `key` as text.
///
/// Returns `Ok(None)` when `key` is absent. Otherwise the value must be a
/// non-empty array of objects. Each object is rendered to one line according
/// to its `type` (default `"text"`), and the lines are joined with `\n`:
///
/// - `text` → the trimmed `text` field (required, non-empty)
/// - `mention` → `[mention:$<name>](<path>)` (both fields required)
/// - `skill` → `[skill:$<name>](<path>)` (both fields required)
/// - `local_image` → `[local_image:<path>]` (`path` required)
/// - `image` → `[image:<image_url>]` (`image_url` required)
/// - any other type → its trimmed `text` if non-empty, else `[input]`
///
/// # Errors
/// `ToolError::invalid_input` when the value is not an array, is empty, or an
/// item is not an object or lacks a required field. The first bad item wins.
fn parse_items_text(input: &Value, key: &str) -> Result<Option<String>, ToolError> {
    /// Renders a single item object to its one-line textual form.
    fn render_item(item: &Value) -> Result<String, ToolError> {
        let Some(fields) = item.as_object() else {
            return Err(ToolError::invalid_input("each item must be an object"));
        };
        // Trimmed, non-empty string field lookup shared by every item type.
        let field = move |name: &str| {
            fields
                .get(name)
                .and_then(Value::as_str)
                .map(str::trim)
                .filter(|value| !value.is_empty())
        };
        let kind = fields
            .get("type")
            .and_then(Value::as_str)
            .unwrap_or("text")
            .trim();
        match kind {
            "text" => field("text")
                .map(str::to_string)
                .ok_or_else(|| ToolError::invalid_input("text item requires non-empty text")),
            "mention" => {
                let name = field("name")
                    .ok_or_else(|| ToolError::invalid_input("mention item requires name"))?;
                let path = field("path")
                    .ok_or_else(|| ToolError::invalid_input("mention item requires path"))?;
                Ok(format!("[mention:${name}]({path})"))
            }
            "skill" => {
                let name = field("name")
                    .ok_or_else(|| ToolError::invalid_input("skill item requires name"))?;
                let path = field("path")
                    .ok_or_else(|| ToolError::invalid_input("skill item requires path"))?;
                Ok(format!("[skill:${name}]({path})"))
            }
            "local_image" => {
                let path = field("path")
                    .ok_or_else(|| ToolError::invalid_input("local_image item requires path"))?;
                Ok(format!("[local_image:{path}]"))
            }
            "image" => {
                let url = field("image_url")
                    .ok_or_else(|| ToolError::invalid_input("image item requires image_url"))?;
                Ok(format!("[image:{url}]"))
            }
            _ => Ok(field("text").map_or_else(|| "[input]".to_string(), str::to_string)),
        }
    }

    let Some(raw) = input.get(key) else {
        return Ok(None);
    };
    let Some(entries) = raw.as_array() else {
        return Err(ToolError::invalid_input(format!("'{key}' must be an array")));
    };
    if entries.is_empty() {
        return Err(ToolError::invalid_input(format!("'{key}' cannot be empty")));
    }
    // `collect` into `Result` stops at the first failing item, like a loop with `?`.
    let lines = entries
        .iter()
        .map(render_item)
        .collect::<Result<Vec<_>, _>>()?;
    Ok(Some(lines.join("\n")))
}
/// Parses the JSON input of a spawn call into a [`SpawnRequest`].
///
/// Several aliases are accepted per field, for example `prompt`/`message`/
/// `objective`, `type`/`agent_type`/`agent_name`, `role`/`agent_role`, and
/// `max_depth`/`maxDepth`/`max_spawn_depth`.
///
/// # Errors
/// Returns a `ToolError` when:
/// - the prompt is missing or given both as text and `items`;
/// - the session name is invalid;
/// - a type or role alias is unknown, or the two conflict;
/// - a model, model strength, or thinking value is invalid;
/// - a depth value is present but is not an integer in
///   `0..=MAX_SPAWN_DEPTH_CEILING`.
fn parse_spawn_request(input: &Value) -> Result<SpawnRequest, ToolError> {
    let prompt = parse_text_or_items(
        input,
        &["prompt", "message", "objective"],
        "items",
        "prompt",
    )?;
    let session_name = optional_input_str(input, &["name", "session_name"])
        .map(validate_session_name)
        .transpose()?;
    let type_input = optional_input_str(input, &["type", "agent_type", "agent_name"]);
    let role_input = optional_input_str(input, &["role", "agent_role"]);
    let parsed_type = type_input
        .map(|kind| {
            SubAgentType::from_str(kind).ok_or_else(|| {
                ToolError::invalid_input(format!(
                    "Invalid sub-agent type '{kind}'. Use: {VALID_SUBAGENT_TYPES}"
                ))
            })
        })
        .transpose()?;
    let parsed_role_type = role_input
        .map(|role| {
            SubAgentType::from_str(role).ok_or_else(|| {
                ToolError::invalid_input(format!(
                    "Invalid role alias '{role}'. Use: {VALID_ROLE_ALIASES}"
                ))
            })
        })
        .transpose()?;
    if let (Some(type_kind), Some(role_kind)) = (&parsed_type, &parsed_role_type)
        && type_kind != role_kind
    {
        return Err(ToolError::invalid_input(
            "Conflicting type/agent_type and role/agent_role values".to_string(),
        ));
    }
    let agent_type = parsed_type
        .or(parsed_role_type)
        .unwrap_or(SubAgentType::General);
    // A role must also be a recognised role alias, not just a valid type name.
    if let Some(role) = role_input
        && normalize_role_alias(role).is_none()
    {
        return Err(ToolError::invalid_input(format!(
            "Invalid role alias '{role}'. Use: {VALID_ROLE_ALIASES}"
        )));
    }
    let role = role_input
        .and_then(normalize_role_alias)
        .or_else(|| type_input.and_then(normalize_role_alias))
        .map(str::to_string);
    // Trimmed, de-duplicated, order-preserving allow-list; non-strings are skipped.
    let allowed_tools = input
        .get("allowed_tools")
        .and_then(|v| v.as_array())
        .map(|items| {
            let mut tools = Vec::new();
            for item in items {
                if let Some(tool) = item.as_str() {
                    let trimmed = tool.trim();
                    if !trimmed.is_empty() && !tools.iter().any(|existing| existing == trimmed) {
                        tools.push(trimmed.to_string());
                    }
                }
            }
            tools
        });
    let cwd = parse_optional_cwd(input)?;
    let model = parse_optional_subagent_model(input, "model")?;
    let model_strength = optional_input_str(input, &["model_strength", "modelStrength"])
        .map(SubAgentModelStrength::parse)
        .transpose()?
        .unwrap_or(SubAgentModelStrength::Same);
    let thinking = optional_input_str(input, &["thinking", "reasoning_effort", "reasoningEffort"])
        .map(SubAgentThinking::parse)
        .transpose()?
        .unwrap_or(SubAgentThinking::Inherit);
    let resident_file = input
        .get("resident_file")
        .and_then(|v| v.as_str())
        .map(str::to_string)
        .filter(|s| !s.trim().is_empty());
    let fork_context =
        parse_optional_bool(input, &["fork_context", "forkContext", "inherit_context"])
            .unwrap_or(false);
    // The first non-null depth key wins. A present value that is not an
    // in-range integer (negative, fractional, string, too large) is rejected
    // rather than silently ignored, so the caller learns the depth was not applied.
    let max_depth = ["max_depth", "maxDepth", "max_spawn_depth"]
        .iter()
        .find_map(|key| input.get(*key).filter(|value| !value.is_null()))
        .map(|value| {
            let ceiling = codewhale_config::MAX_SPAWN_DEPTH_CEILING;
            value
                .as_u64()
                .and_then(|depth| u32::try_from(depth).ok())
                .filter(|depth| *depth <= ceiling)
                .ok_or_else(|| {
                    ToolError::invalid_input(format!("max_depth must be between 0 and {ceiling}"))
                })
        })
        .transpose()?;
    Ok(SpawnRequest {
        session_name,
        prompt: prompt.clone(),
        agent_type,
        assignment: SubAgentAssignment::new(prompt, role),
        allowed_tools,
        model,
        model_strength,
        thinking,
        cwd,
        resident_file,
        fork_context,
        max_depth,
    })
}
/// Validates a user-supplied session name and returns it trimmed.
///
/// After trimming, the name must be non-empty, contain no whitespace, and use
/// only ASCII letters, digits, `-`, `_`, or `.`. The whitespace check runs
/// before the charset check so that it gets its own, more specific message.
///
/// # Errors
/// `ToolError::invalid_input` describing the first rule that fails.
fn validate_session_name(name: &str) -> Result<String, ToolError> {
    let candidate = name.trim();
    let is_allowed_char = |ch: char| ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.');
    let violation = if candidate.is_empty() {
        Some("name cannot be blank")
    } else if candidate.contains(char::is_whitespace) {
        Some("name must not contain whitespace; use letters, numbers, '-', '_', or '.'")
    } else if !candidate.chars().all(is_allowed_char) {
        Some("name may only contain ASCII letters, numbers, '-', '_', or '.'")
    } else {
        None
    };
    match violation {
        Some(message) => Err(ToolError::invalid_input(message)),
        None => Ok(candidate.to_string()),
    }
}
fn parse_optional_bool(input: &Value, names: &[&str]) -> Option<bool> {
names
.iter()
.find_map(|name| input.get(*name))
.and_then(Value::as_bool)
}
/// Returns `input` with `fork_context` set to `default`, unless the object
/// already carries any of the fork-context aliases.
///
/// Non-object inputs are returned unchanged.
fn with_default_fork_context(mut input: Value, default: bool) -> Value {
    if let Some(object) = input.as_object_mut() {
        let already_specified = ["fork_context", "forkContext", "inherit_context"]
            .iter()
            .any(|alias| object.contains_key(*alias));
        if !already_specified {
            object.insert("fork_context".to_string(), Value::Bool(default));
        }
    }
    input
}
/// Resolves a requested child-agent model name for `provider`.
///
/// The value is trimmed and then mapped through
/// `crate::config::requested_model_for_provider`.
///
/// # Errors
/// `ToolError::invalid_input` when the value is blank, or when the provider
/// does not recognise it. In the second case the message lists the accepted
/// names if the provider exposes any. `field` names the offending setting in
/// both messages.
pub(crate) fn normalize_requested_subagent_model(
    value: &str,
    field: &str,
    provider: crate::config::ApiProvider,
) -> Result<String, ToolError> {
    let requested = value.trim();
    if requested.is_empty() {
        return Err(ToolError::invalid_input(format!("{field} cannot be blank")));
    }
    if let Some(resolved) = crate::config::requested_model_for_provider(provider, requested) {
        return Ok(resolved);
    }
    let accepted = crate::config::model_completion_names_for_provider(provider);
    let hint = if accepted.is_empty() {
        String::new()
    } else {
        format!(" (accepted: {})", accepted.join(", "))
    };
    Err(ToolError::invalid_input(format!(
        "Invalid {field} '{requested}' for provider {}{hint}",
        provider_name_for_error(provider)
    )))
}
fn provider_name_for_error(provider: crate::config::ApiProvider) -> &'static str {
match provider {
crate::config::ApiProvider::Deepseek | crate::config::ApiProvider::DeepseekCN => "DeepSeek",
crate::config::ApiProvider::Openai | crate::config::ApiProvider::OpenaiCodex => "OpenAI",
crate::config::ApiProvider::Moonshot => "Moonshot",
crate::config::ApiProvider::Ollama => "Ollama",
_ => "this provider",
}
}
/// Looks up a configured child model in `runtime.role_models`.
///
/// Keys are tried in priority order:
/// 1. the lowercased, trimmed `role` (skipped if blank);
/// 2. the agent type's `as_str()` name;
/// 3. `"default"`.
///
/// The first hit is validated with `normalize_requested_subagent_model`
/// against the runtime's provider. `Ok(None)` means no key is configured.
///
/// # Errors
/// Propagates the validation error for the first configured entry found.
pub(crate) fn configured_model_for_role_or_type(
    runtime: &SubAgentRuntime,
    role: Option<&str>,
    agent_type: &SubAgentType,
) -> Result<Option<String>, ToolError> {
    let role_key = role
        .map(str::trim)
        .filter(|trimmed| !trimmed.is_empty())
        .map(str::to_ascii_lowercase);
    let lookup_order = role_key
        .into_iter()
        .chain([agent_type.as_str().to_string(), "default".to_string()]);
    for key in lookup_order {
        let Some(model) = runtime.role_models.get(&key) else {
            continue;
        };
        let field = format!("subagents.{key}.model");
        return normalize_requested_subagent_model(model, &field, runtime.client.api_provider())
            .map(Some);
    }
    Ok(None)
}
/// Final model and request-tuning decision for one sub-agent assignment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SubAgentResolvedRoute {
    /// Route that produced `model` (fixed, faster/auto, or inherit).
    pub(crate) model_route: ModelRoute,
    /// Concrete model name to request.
    pub(crate) model: String,
    /// Reasoning-effort setting string, if any.
    pub(crate) reasoning_effort: Option<String>,
    /// Request tuning derived from `reasoning_effort`; see `refresh_tuning`.
    pub(crate) tuning: RequestTuning,
}
impl SubAgentResolvedRoute {
    /// Builds a route whose `tuning` is derived from `reasoning_effort`.
    fn new(
        model_route: ModelRoute,
        model: String,
        reasoning_effort: Option<String>,
    ) -> SubAgentResolvedRoute {
        Self {
            tuning: subagent_request_tuning(reasoning_effort.as_deref()),
            model_route,
            model,
            reasoning_effort,
        }
    }
    /// Recomputes `tuning` after `reasoning_effort` has been changed in place.
    fn refresh_tuning(&mut self) {
        let effort = self.reasoning_effort.as_deref();
        self.tuning = subagent_request_tuning(effort);
    }
}
/// Resolves the model route and reasoning effort for a sub-agent assignment.
///
/// A non-blank `configured_model` pins the route to that model. Otherwise
/// `requested_model_route` is used. The function is `async`, but it currently
/// performs no asynchronous work.
pub(crate) async fn resolve_subagent_assignment_route(
    runtime: &SubAgentRuntime,
    configured_model: Option<String>,
    prompt: &str,
    agent_type: &SubAgentType,
    requested_model_route: ModelRoute,
    requested_thinking: SubAgentThinking,
) -> SubAgentResolvedRoute {
    let route = assignment_model_route(configured_model.as_deref(), requested_model_route);
    worker_profile_subagent_assignment_route(
        runtime,
        &route,
        requested_thinking,
        prompt,
        agent_type,
    )
}
/// Chooses the model route for an assignment.
///
/// A configured model that is non-blank after trimming always wins, as
/// `ModelRoute::Fixed`. Otherwise the requested route passes through unchanged.
fn assignment_model_route(
    configured_model: Option<&str>,
    requested_model_route: ModelRoute,
) -> ModelRoute {
    match configured_model.map(str::trim) {
        Some(pinned) if !pinned.is_empty() => ModelRoute::Fixed(pinned.to_string()),
        _ => requested_model_route,
    }
}
/// Builds the per-request tuning used for sub-agent model calls.
///
/// The reasoning effort is parsed from its setting string when present.
/// Output is always capped at `SUBAGENT_RESPONSE_MAX_TOKENS`.
fn subagent_request_tuning(reasoning_effort: Option<&str>) -> RequestTuning {
    let effort = reasoning_effort.map(ReasoningEffort::from_setting);
    RequestTuning {
        max_output_tokens: Some(SUBAGENT_RESPONSE_MAX_TOKENS),
        reasoning_effort: effort,
    }
}
/// Router candidate models for the runtime's provider, relative to its current model.
fn subagent_router_candidates(runtime: &SubAgentRuntime) -> crate::model_routing::RouterCandidates {
    let provider = runtime.client.api_provider();
    crate::model_routing::provider_router_candidates(provider, &runtime.model)
}
/// Synchronous route resolution that always treats the agent as
/// `SubAgentType::General`.
///
/// It mirrors `resolve_subagent_assignment_route` without the role-specific
/// agent type.
fn fallback_subagent_assignment_route(
    runtime: &SubAgentRuntime,
    configured_model: Option<String>,
    requested_model_route: ModelRoute,
    requested_thinking: SubAgentThinking,
    prompt: &str,
) -> SubAgentResolvedRoute {
    let general = SubAgentType::General;
    let route = assignment_model_route(configured_model.as_deref(), requested_model_route);
    worker_profile_subagent_assignment_route(runtime, &route, requested_thinking, prompt, &general)
}
/// Turns a model route into a concrete model and reasoning effort.
///
/// - `Fixed(m)` uses `m`.
/// - `Faster` / `Auto` use the provider's cheap candidate, falling back to the
///   runtime model, and mark the request as fast-lane.
/// - `Inherit` reuses the runtime model.
///
/// The fast-lane flag then feeds `subagent_reasoning_effort_for_request`.
/// `_agent_type` is accepted but not used.
fn worker_profile_subagent_assignment_route(
    runtime: &SubAgentRuntime,
    model_route: &ModelRoute,
    requested_thinking: SubAgentThinking,
    prompt: &str,
    _agent_type: &SubAgentType,
) -> SubAgentResolvedRoute {
    let candidates = subagent_router_candidates(runtime);
    let (model, requested_fast_lane) = match model_route {
        ModelRoute::Fixed(pinned) => (pinned.clone(), false),
        ModelRoute::Faster | ModelRoute::Auto => {
            let cheap = candidates
                .cheap
                .clone()
                .unwrap_or_else(|| runtime.model.clone());
            (cheap, true)
        }
        ModelRoute::Inherit => (runtime.model.clone(), false),
    };
    let reasoning_effort = subagent_reasoning_effort_for_request(
        runtime,
        prompt,
        requested_fast_lane,
        requested_thinking,
    );
    SubAgentResolvedRoute::new(model_route.clone(), model, reasoning_effort)
}
/// Picks the reasoning-effort setting string for a sub-agent request.
///
/// - An explicit effort is used as-is.
/// - `Auto` derives the effort from the prompt.
/// - `Inherit` on the fast lane forces `Off`.
/// - `Inherit` otherwise defers to `fallback_subagent_reasoning_effort`.
fn subagent_reasoning_effort_for_request(
    runtime: &SubAgentRuntime,
    prompt: &str,
    requested_fast_lane: bool,
    requested_thinking: SubAgentThinking,
) -> Option<String> {
    match (requested_thinking, requested_fast_lane) {
        (SubAgentThinking::Effort(effort), _) => Some(effort.as_setting().to_string()),
        (SubAgentThinking::Auto, _) => {
            let derived = auto_subagent_reasoning_effort(prompt);
            Some(derived.as_setting().to_string())
        }
        (SubAgentThinking::Inherit, true) => Some(ReasoningEffort::Off.as_setting().to_string()),
        (SubAgentThinking::Inherit, false) => fallback_subagent_reasoning_effort(runtime, prompt),
    }
}
/// Effort used when the request inherits the parent's setting.
///
/// If the runtime is in auto mode, the effort is derived from the prompt.
/// Otherwise the runtime's configured effort is cloned.
fn fallback_subagent_reasoning_effort(runtime: &SubAgentRuntime, prompt: &str) -> Option<String> {
    if !runtime.reasoning_effort_auto {
        return runtime.reasoning_effort.clone();
    }
    let derived = auto_subagent_reasoning_effort(prompt);
    Some(derived.as_setting().to_string())
}
/// Auto-selected reasoning effort for a sub-agent prompt.
///
/// Uses `crate::auto_reasoning::select(false, prompt)` and raises `Low` and
/// `Medium` results to `High`. Every other result is returned unchanged.
fn auto_subagent_reasoning_effort(prompt: &str) -> ReasoningEffort {
    let selected = crate::auto_reasoning::select(false, prompt);
    if matches!(selected, ReasoningEffort::Low | ReasoningEffort::Medium) {
        ReasoningEffort::High
    } else {
        selected
    }
}
/// Reads an optional model override stored under `key`.
///
/// A missing key or a JSON `null` yields `Ok(None)`. A string is trimmed and
/// returned.
///
/// # Errors
/// `ToolError::invalid_input` for a blank string or a non-string value.
fn parse_optional_subagent_model(input: &Value, key: &str) -> Result<Option<String>, ToolError> {
    let Some(raw) = input.get(key) else {
        return Ok(None);
    };
    if raw.is_null() {
        return Ok(None);
    }
    let Some(text) = raw.as_str() else {
        return Err(ToolError::invalid_input(format!("{key} must be a string")));
    };
    let text = text.trim();
    if text.is_empty() {
        return Err(ToolError::invalid_input(format!("{key} cannot be blank")));
    }
    Ok(Some(text.to_string()))
}
fn parse_optional_cwd(input: &Value) -> Result<Option<PathBuf>, ToolError> {
let raw = input.get("cwd").and_then(|v| v.as_str()).map(str::trim);
match raw {
None | Some("") => Ok(None),
Some(s) => Ok(Some(PathBuf::from(s))),
}
}
/// Maps a role alias, matched ASCII case-insensitively, to its canonical role name.
///
/// The input is not trimmed. Unknown aliases yield `None`.
fn normalize_role_alias(input: &str) -> Option<&'static str> {
    // (canonical name, accepted lowercase aliases)
    const ROLE_ALIASES: &[(&str, &[&str])] = &[
        ("default", &["default"]),
        ("worker", &["worker", "general", "general-purpose", "general_purpose"]),
        ("explorer", &["explorer", "explore", "exploration"]),
        ("awaiter", &["awaiter", "plan", "planner", "planning"]),
        ("reviewer", &["reviewer", "review", "code-review", "code_review"]),
        ("implementer", &["implementer", "implement", "implementation", "builder"]),
        ("verifier", &["verifier", "verify", "verification", "validator", "tester"]),
        ("custom", &["custom"]),
    ];
    let wanted = input.to_ascii_lowercase();
    ROLE_ALIASES
        .iter()
        .find(|(_, aliases)| aliases.contains(&wanted.as_str()))
        .map(|(canonical, _)| *canonical)
}
/// Prefixes the task prompt with an assignment-metadata header.
///
/// The header lists the objective, the role (`"default"` when unset), and the
/// resolved agent type, followed by a blank line, `Task:`, and the prompt.
fn build_assignment_prompt(
    prompt: &str,
    assignment: &SubAgentAssignment,
    agent_type: &SubAgentType,
) -> String {
    let objective = &assignment.objective;
    let role = assignment.role.as_deref().unwrap_or("default");
    let resolved_type = agent_type.as_str();
    format!(
        "Assignment metadata:\n\
         - objective: {objective}\n\
         - role: {role}\n\
         - resolved_type: {resolved_type}\n\
         \n\
         Task:\n\
         {prompt}"
    )
}
/// Maps a terminal or running `SubAgentStatus` onto the worker-status enum.
///
/// Error payloads carried by `Failed` and `Interrupted` are dropped.
fn worker_status_from_subagent_status(status: &SubAgentStatus) -> AgentWorkerStatus {
    match status {
        SubAgentStatus::Completed => AgentWorkerStatus::Completed,
        SubAgentStatus::Cancelled => AgentWorkerStatus::Cancelled,
        SubAgentStatus::Interrupted(_) => AgentWorkerStatus::Interrupted,
        SubAgentStatus::Failed(_) => AgentWorkerStatus::Failed,
        SubAgentStatus::Running => AgentWorkerStatus::Running,
    }
}
/// Stable snake_case name for an `AgentWorkerStatus`, for display and serialization.
pub fn agent_worker_status_name(status: AgentWorkerStatus) -> &'static str {
    match status {
        // Lifecycle before the worker is actively running.
        AgentWorkerStatus::Queued => "queued",
        AgentWorkerStatus::Starting => "starting",
        // Active states.
        AgentWorkerStatus::Running => "running",
        AgentWorkerStatus::ModelWait => "model_wait",
        AgentWorkerStatus::RunningTool => "running_tool",
        AgentWorkerStatus::WaitingForUser => "waiting_for_user",
        // Terminal states.
        AgentWorkerStatus::Completed => "completed",
        AgentWorkerStatus::Failed => "failed",
        AgentWorkerStatus::Cancelled => "cancelled",
        AgentWorkerStatus::Interrupted => "interrupted",
    }
}
/// Worker status for a finished run.
///
/// A result whose checkpoint is continuable is reported as `WaitingForUser`.
/// Otherwise the status is mapped from `result.status`.
fn worker_status_from_subagent_result(result: &SubAgentResult) -> AgentWorkerStatus {
    if subagent_checkpoint_is_continuable(result) {
        return AgentWorkerStatus::WaitingForUser;
    }
    worker_status_from_subagent_status(&result.status)
}
/// Derives a worker status, step number, and tool name from a free-form
/// progress message.
///
/// Status keywords are matched case-insensitively, in a fixed priority order
/// (queued, waiting, model wait, tool, cancelled, interrupted, complete,
/// started, otherwise running).
///
/// Any tool name quoted as `tool '<name>'` is blanked out before the keywords
/// are matched. Tool names come from the registry or MCP servers, so a name
/// that happens to contain a keyword (e.g. `x_complete`, `list_queued`) must
/// not flip the reported worker status. The tool name itself is still returned
/// in the third tuple element.
fn worker_progress_event_parts(message: &str) -> (AgentWorkerStatus, Option<u32>, Option<String>) {
    let step = parse_progress_step(message);
    let tool_name = parse_progress_tool_name(message);
    // Keep the `tool ''` marker itself so phrases like "running tool" still match.
    let classified = match message.split_once("tool '") {
        Some((head, tail)) => match tail.split_once('\'') {
            Some((_, rest)) => format!("{head}tool ''{rest}"),
            None => message.to_string(),
        },
        None => message.to_string(),
    };
    let lower = classified.to_ascii_lowercase();
    let status = if lower.contains("queued") {
        AgentWorkerStatus::Queued
    } else if lower.contains("waiting for user") || lower.contains("waiting for follow-up") {
        AgentWorkerStatus::WaitingForUser
    } else if lower.contains("requesting model response")
        || lower.contains(SUBAGENT_MODEL_WAIT_REASON)
    {
        AgentWorkerStatus::ModelWait
    } else if lower.contains("running tool") || lower.contains("executing") {
        AgentWorkerStatus::RunningTool
    } else if lower.contains("cancelled") {
        AgentWorkerStatus::Cancelled
    } else if lower.contains("interrupted") || lower.contains("timed out") {
        AgentWorkerStatus::Interrupted
    } else if lower.contains("complete") {
        AgentWorkerStatus::Completed
    } else if lower.contains("started") {
        AgentWorkerStatus::Starting
    } else {
        AgentWorkerStatus::Running
    };
    (status, step, tool_name)
}
/// Extracts the step number from a message that starts with `"step <digits>"`.
///
/// Returns `None` when the prefix is missing, no digits follow it, or the
/// number does not fit in a `u32`.
fn parse_progress_step(message: &str) -> Option<u32> {
    let rest = message.strip_prefix("step ")?;
    let digits_end = rest
        .find(|ch: char| !ch.is_ascii_digit())
        .unwrap_or(rest.len());
    // An empty digit run fails to parse, which yields `None`.
    rest[..digits_end].parse().ok()
}
/// Extracts the tool name quoted as `tool '<name>'` in a progress message.
///
/// Only the first occurrence is used. The name is trimmed, and an empty or
/// unterminated quote yields `None`.
fn parse_progress_tool_name(message: &str) -> Option<String> {
    let (_, after_marker) = message.split_once("tool '")?;
    let (quoted, _) = after_marker.split_once('\'')?;
    let name = quoted.trim();
    if name.is_empty() {
        None
    } else {
        Some(name.to_owned())
    }
}
/// Display name shown in progress messages.
///
/// All shell-execution tool variants collapse to `"Bash"`. Any other name is
/// returned unchanged.
fn subagent_progress_tool_display_name(name: &str) -> &str {
    const SHELL_TOOLS: [&str; 7] = [
        "exec_shell",
        "exec_shell_wait",
        "exec_shell_interact",
        "exec_wait",
        "exec_interact",
        "task_shell_start",
        "task_shell_wait",
    ];
    if SHELL_TOOLS.contains(&name) {
        "Bash"
    } else {
        name
    }
}
/// Publishes a progress status to the mailbox and/or the UI event channel.
///
/// The mailbox is notified first. The event uses `try_send`, so this never
/// waits. Delivery failures on either path are deliberately ignored, because
/// progress is best-effort.
fn emit_agent_progress(
    event_tx: Option<&mpsc::Sender<Event>>,
    mailbox: Option<&Mailbox>,
    agent_id: &str,
    status: String,
) {
    if let Some(mailbox) = mailbox {
        let _ = mailbox.send(MailboxMessage::progress(agent_id, status.clone()));
    }
    let Some(sender) = event_tx else {
        return;
    };
    let _ = sender.try_send(Event::AgentProgress {
        id: agent_id.to_string(),
        status,
    });
}
/// Whether a role's runtime posture allows a tool with the given approval requirement.
///
/// - `Custom` roles are always permitted here; their explicit tool list is the gate.
/// - `Auto` tools are always permitted.
/// - `Suggest` tools need write permission in the role's profile.
/// - `Required` tools need the role's shell policy to be `Full`.
fn role_posture_permits(agent_type: &SubAgentType, approval: ApprovalRequirement) -> bool {
    if let SubAgentType::Custom = agent_type {
        return true;
    }
    let posture = WorkerRuntimeProfile::for_role(agent_type.clone());
    let can_write = posture.permissions.write;
    let has_full_shell = matches!(posture.shell, crate::worker_profile::ShellPolicy::Full);
    match approval {
        ApprovalRequirement::Auto => true,
        ApprovalRequirement::Suggest => can_write,
        ApprovalRequirement::Required => has_full_shell,
    }
}
/// Per-sub-agent view over a full [`ToolRegistry`].
///
/// Adds three gates on top of the underlying registry:
/// - an optional explicit allow-list;
/// - a role posture check (`role_posture_permits`);
/// - an approval-requirement check, applied only when the parent session is
///   not auto-approved.
///
/// The `agent` tool is also hidden and refused once spawning another child
/// would exceed the runtime's depth limit.
struct SubAgentToolRegistry {
    /// Explicit tool allow-list; `None` means every registered tool.
    allowed_tools: Option<Vec<String>>,
    /// Copied from the parent context; approval checks are skipped when `true`.
    auto_approve: bool,
    /// Role used for posture and write-delegation checks.
    agent_type: SubAgentType,
    /// `false` when spawning another child would exceed the depth limit.
    can_spawn_child: bool,
    /// Underlying registry that actually executes tools.
    registry: ToolRegistry,
}
impl SubAgentToolRegistry {
    /// Builds the full agent tool surface for `runtime`, plus MCP tools when a
    /// pool is configured, and wraps it with the role and optional allow-list.
    fn new(
        runtime: SubAgentRuntime,
        agent_type: SubAgentType,
        explicit_allowed_tools: Option<Vec<String>>,
        todo_list: SharedTodoList,
        plan_state: SharedPlanState,
    ) -> Self {
        let can_spawn_child = !runtime.would_exceed_depth();
        let tool_context = runtime.context.clone();
        let mut builder = ToolRegistryBuilder::new().with_full_agent_surface(
            Some(runtime.client.clone()),
            runtime.model.clone(),
            runtime.manager.clone(),
            runtime.clone(),
            runtime.allow_shell,
            todo_list,
            plan_state,
        );
        if let Some(pool) = &runtime.mcp_pool {
            builder = builder.with_mcp_tools(std::sync::Arc::clone(pool));
        }
        Self {
            registry: builder.build(tool_context),
            allowed_tools: explicit_allowed_tools,
            auto_approve: runtime.context.auto_approve,
            agent_type,
            can_spawn_child,
        }
    }
    /// Roles allowed to run `Suggest`-approval tools without parent auto-approval.
    fn role_can_delegate_writes(agent_type: &SubAgentType) -> bool {
        matches!(agent_type, SubAgentType::Implementer | SubAgentType::Custom)
    }
    /// Posture gate.
    ///
    /// `agent` and unregistered names pass this check; unregistered names
    /// fail later, at execution time.
    fn posture_permits_tool(&self, name: &str) -> bool {
        name == "agent"
            || self.registry.get(name).map_or(true, |spec| {
                role_posture_permits(&self.agent_type, spec.approval_requirement())
            })
    }
    /// Allow-list gate, including the spawn-depth limit for `agent`.
    fn is_tool_allowed(&self, name: &str) -> bool {
        if name == "agent" && !self.can_spawn_child {
            return false;
        }
        self.allowed_tools
            .as_ref()
            .map_or(true, |list| list.iter().any(|allowed| allowed == name))
    }
    /// Tool definitions advertised to the model after the allow-list, depth
    /// and posture filters. `agent_type` is unused; `self.agent_type` is used instead.
    fn tools_for_model(&self, agent_type: &SubAgentType) -> Vec<Tool> {
        let _ = agent_type;
        self.registry
            .to_api_tools()
            .into_iter()
            .filter(|tool| {
                self.allowed_tools
                    .as_ref()
                    .map_or(true, |list| list.contains(&tool.name))
            })
            .filter(|tool| tool.name != "agent" || self.can_spawn_child)
            .filter(|tool| self.posture_permits_tool(&tool.name))
            .collect()
    }
    /// Names in the explicit allow-list that the registry does not provide.
    fn unavailable_allowed_tools(&self) -> Vec<String> {
        self.allowed_tools
            .iter()
            .flatten()
            .filter(|name| !self.registry.contains(name))
            .cloned()
            .collect()
    }
    /// Executes `name` after every gate passes.
    ///
    /// Gates run in this order: allow-list, posture, approval (skipped when
    /// auto-approved), then the interactive-terminal guard. Registry errors are
    /// wrapped in `anyhow`.
    async fn execute(&self, _agent_id: &str, name: &str, input: Value) -> Result<String> {
        if !self.is_tool_allowed(name) {
            return Err(anyhow!("Tool {name} not allowed for this sub-agent"));
        }
        if !self.posture_permits_tool(name) {
            return Err(anyhow!(
                "Tool {name} is not permitted for the read-only `{role}` sub-agent role. Use an `implementer` or `general` role (or a `custom` role with an explicit allowed_tools list) to mutate the workspace or run shell commands.",
                role = self.agent_type.as_str()
            ));
        }
        if !self.auto_approve {
            let spec = self
                .registry
                .get(name)
                .ok_or_else(|| anyhow!("Tool {name} is not registered"))?;
            match spec.approval_requirement() {
                ApprovalRequirement::Auto => {}
                ApprovalRequirement::Suggest if Self::role_can_delegate_writes(&self.agent_type) => {}
                ApprovalRequirement::Suggest => {
                    return Err(anyhow!(
                        "Tool {name} requires approval and is not delegated to {role} sub-agents; rerun the parent with auto approval or pick a write-capable role",
                        role = self.agent_type.as_str()
                    ));
                }
                ApprovalRequirement::Required => {
                    return Err(anyhow!(
                        "Tool {name} requires approval and cannot run inside this sub-agent unless the parent session is auto-approved"
                    ));
                }
            }
        }
        reject_subagent_terminal_takeover(name, &input)?;
        self.registry
            .execute(name, input)
            .await
            .map_err(|e| anyhow!(e))
    }
}
/// Refuses `exec_shell` calls that set `interactive: true`.
///
/// Such a call would take over the parent TUI terminal. Any other tool, and
/// any input whose `interactive` is missing or not exactly `true`, passes.
fn reject_subagent_terminal_takeover(name: &str, input: &Value) -> Result<()> {
    if name != "exec_shell" {
        return Ok(());
    }
    let interactive = matches!(input.get("interactive"), Some(Value::Bool(true)));
    if !interactive {
        return Ok(());
    }
    Err(anyhow!(
        "Sub-agents run in the background and cannot use exec_shell with interactive=true \
         because that would take over the parent TUI terminal. Use non-interactive \
         exec_shell, background=true, tty=true, or task_shell_start instead."
    ))
}
/// Normalises an explicit tool allow-list for a sub-agent.
///
/// Names are trimmed; blanks and duplicates are dropped, and first-seen order
/// is kept. `None` means "no explicit list". `_allow_shell` is currently unused.
///
/// # Errors
/// A `Custom` role requires a non-empty list after normalisation.
fn build_allowed_tools(
    agent_type: &SubAgentType,
    explicit_tools: Option<Vec<String>>,
    _allow_shell: bool,
) -> Result<Option<Vec<String>>> {
    let normalized = explicit_tools.map(|tools| {
        let mut unique: Vec<String> = Vec::new();
        for raw in &tools {
            let trimmed = raw.trim();
            if !trimmed.is_empty() && !unique.iter().any(|seen| seen == trimmed) {
                unique.push(trimmed.to_string());
            }
        }
        unique
    });
    let custom_without_tools = matches!(agent_type, SubAgentType::Custom)
        && normalized.as_ref().map_or(true, Vec::is_empty);
    if custom_without_tools {
        return Err(anyhow!(
            "Custom sub-agent requires a non-empty allowed_tools list"
        ));
    }
    Ok(normalized)
}
fn annotate_child_model_error(err: &str, model: &str) -> String {
match crate::error_taxonomy::classify_error_message(err) {
crate::error_taxonomy::ErrorCategory::Authorization
| crate::error_taxonomy::ErrorCategory::State => format!(
"{err}\n(child model `{model}` may be unavailable under the current access profile — \
remove the explicit child model override or adjust child-agent model config before retrying)"
),
_ => {
let lower = err.to_ascii_lowercase();
if lower.contains("model not exist")
|| lower.contains("model_not_found")
|| lower.contains("does not exist")
|| lower.contains("no such model")
|| lower.contains("invalid model")
{
format!(
"{err}\n(child model `{model}` may be unavailable under the current access profile — \
remove the explicit child model override or adjust child-agent model config before retrying)"
)
} else {
err.to_string()
}
}
}
}
/// One-line human summary of a sub-agent result.
///
/// A pending input question takes precedence over the status. A completed run
/// returns its output text, or a placeholder when there is none.
fn summarize_subagent_result(result: &SubAgentResult) -> String {
    if let Some(pending) = &result.needs_input {
        return format!("Needs input: {}", pending.question);
    }
    match &result.status {
        SubAgentStatus::Completed => result
            .result
            .clone()
            .unwrap_or_else(|| "Completed (no output)".to_string()),
        SubAgentStatus::Interrupted(reason) => format!("Interrupted: {reason}"),
        SubAgentStatus::Cancelled => "Cancelled".to_string(),
        SubAgentStatus::Failed(reason) => format!("Failed: {reason}"),
        SubAgentStatus::Running => "Running".to_string(),
    }
}
/// Lowercase wire name for a `SubAgentStatus`.
///
/// Error payloads carried by `Interrupted` and `Failed` are ignored.
fn subagent_status_name(status: &SubAgentStatus) -> &'static str {
    match status {
        SubAgentStatus::Completed => "completed",
        SubAgentStatus::Cancelled => "cancelled",
        SubAgentStatus::Failed(_) => "failed",
        SubAgentStatus::Interrupted(_) => "interrupted",
        SubAgentStatus::Running => "running",
    }
}
/// Shared output-format instructions for sub-agents, embedded at compile time
/// from `prompts/subagent_output_format.md` (path relative to this source file).
const SUBAGENT_OUTPUT_FORMAT: &str = include_str!("../../prompts/subagent_output_format.md");
/// Intro for general-purpose sub-agents: stay in scope, use `checklist_write` /
/// `update_plan` only for multi-step work, and stop early after repeated
/// failures or bounded effort.
// NOTE(review): presumably chosen per `SubAgentType` when building the system
// prompt — confirm at the call site.
const GENERAL_AGENT_INTRO: &str = concat!(
    "You are a trusted general-purpose sub-agent. Your job is to complete the one task you were given, end-to-end, and report back concisely.\n",
    "Stay inside the assigned scope; put adjacent work under RISKS/BLOCKERS.\n",
    "For genuinely multi-step work, track progress with `checklist_write` (and `update_plan` for complex strategy); skip it for short, focused tasks.\n",
    "**Stop quickly on failure**: if the same tool call fails 2 times in a row, stop retrying and return what you have so far with a one-line note explaining what's missing. Do not loop on impossible queries (e.g. external API unreachable, rate-limited, or returning empty).\n",
    "**Bounded effort**: prefer one focused attempt over many speculative retries. If you cannot complete the task with available data within 3-5 tool calls, return your current partial findings — the parent agent can compensate with its own knowledge.\n\n"
);
/// Intro for the read-only `explore` role: orient, search narrowly, and cite
/// `path:line-range` for each finding.
const EXPLORE_AGENT_INTRO: &str = concat!(
    "You are a trusted exploration sub-agent (role: `explore`). Your job is to map the relevant code quickly and stay strictly read-only.\n",
    "Orient first: confirm the workspace/project root, read relevant AGENTS.md/README guidance when the tree is unfamiliar, then search only the likely scope.\n",
    "Use list_dir/file_search, grep_files, and read_file; use RLM only for long inputs or many semantic slices, not basic path discovery.\n",
    "DeepSeek V4 can hold broad evidence, but your value is compressed reconnaissance: cite `path:line-range` for each finding and stop once evidence is sufficient.\n",
    "CHANGES will almost always be \"None.\" for an explorer.\n\n"
);
/// Intro for the `plan` role: produce a grounded, prioritized plan rather
/// than patches.
const PLAN_AGENT_INTRO: &str = concat!(
    "You are a trusted planning sub-agent (role: `plan`). Your job is to produce a grounded, prioritized plan, not patches.\n",
    "Read enough code to avoid guessing; each step names its artifact and verification.\n",
    "Use update_plan/checklist_write for plan artifacts and explain key trade-offs.\n",
    "CHANGES should list plan artifacts only, not future speculative edits.\n\n"
);
/// Intro for the read-only `review` role: findings ordered by severity
/// (BLOCKER/MAJOR/MINOR/NIT), with locations and suggested fixes.
const REVIEW_AGENT_INTRO: &str = concat!(
    "You are a trusted code review sub-agent (role: `review`). Your job is to find and report severity-scored issues, and stay strictly read-only.\n",
    "Read the diff/files, grep sibling patterns/tests, then order EVIDENCE by severity.\n",
    "Use BLOCKER/MAJOR/MINOR/NIT and include path:line-range plus suggested fix.\n",
    "If no MAJOR+ issues exist, say so plainly in SUMMARY.\n",
    "CHANGES will almost always be \"None.\" for a reviewer.\n\n"
);
/// Intro for the `custom` role, which runs with a narrowed tool registry;
/// missing capabilities are reported as blockers.
const CUSTOM_AGENT_INTRO: &str = concat!(
    "You are a trusted custom sub-agent (role: `custom`) with a narrowed tool registry. Your job is to stay tightly scoped to the assigned objective.\n",
    "Use only tools available at runtime; put missing capabilities under BLOCKERS and stop.\n\n"
);
/// Intro for the `implementer` role: minimal edits, verification after
/// changes, and a complete CHANGES list.
const IMPLEMENTER_AGENT_INTRO: &str = concat!(
    "You are a trusted implementation sub-agent (role: `implementer`). Your job is to land the assigned change with minimal surrounding edits.\n",
    "Read target files before editing; prefer edit_file for narrow changes and apply_patch for hunks.\n",
    "Run relevant verification after edit batches; write needed tests with the implementation.\n",
    "CHANGES is load-bearing: list every modified file with a one-line why.\n\n"
);
/// Intro for the read-only `verifier` role: run the requested gates and report
/// PASS/FAIL/FLAKY with command evidence.
const VERIFIER_AGENT_INTRO: &str = concat!(
    "You are a trusted verification sub-agent (role: `verifier`). Your job is to run the requested gates and report results, and stay read-only.\n",
    "Report PASS/FAIL/FLAKY at the top of SUMMARY with exact command evidence.\n",
    "Capture failing assertion and file:line; put obvious fixes under RISKS.\n",
    "CHANGES will almost always be \"None.\" for a verifier.\n\n"
);
// Unit tests for this module, declared as a separate `tests` module file and
// compiled only under `cfg(test)`.
#[cfg(test)]
mod tests;