use crate::approval_flow::request_approval;
use crate::config::KodaConfig;
use crate::db::{Database, Role};
use crate::engine::{ApprovalDecision, EngineCommand, EngineEvent};
use crate::loop_guard;
use crate::memory;
use crate::persistence::Persistence;
use crate::preview;
use crate::prompt::build_system_prompt;
use crate::providers::{ChatMessage, ToolCall};
use crate::sub_agent_cache::SubAgentCache;
use crate::tool_dispatch::execute_one_tool;
use crate::tools::{self, ToolRegistry};
use crate::trust::{self, ToolApproval, TrustMode, derive_child_trust};
use anyhow::{Context, Result};
use koda_sandbox::{CwdProvider, GitWorktreeProvider, WorkspaceProvider};
#[cfg(target_os = "macos")]
use koda_sandbox::ClonefileProvider;
use std::path::Path;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
// Process-wide counter handing out unique ids for sub-agent invocations.
// Starts at 1 so 0 can never collide with a live invocation.
static NEXT_INVOCATION_ID: AtomicU32 = AtomicU32::new(1);

/// Returns a fresh, process-unique invocation id.
///
/// `Relaxed` ordering is sufficient: only uniqueness is required, not any
/// happens-before relationship with other memory operations.
pub(crate) fn next_invocation_id() -> u32 {
    NEXT_INVOCATION_ID.fetch_add(1, Ordering::Relaxed)
}
/// Drop guard that cancels background agents spawned by one invocation.
///
/// Created after the background-launch branch in [`execute_sub_agent`]; when
/// the invocation's future completes (or is dropped), `Drop` cancels any
/// registry entries still tagged with this `invocation_id` as their spawner.
struct InvocationCleanup<'a> {
    // Registry shared with the engine; lookups are keyed by spawner id.
    bg: &'a std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
    // Id of the invocation whose orphaned children should be cancelled.
    invocation_id: u32,
}
impl Drop for InvocationCleanup<'_> {
    /// Cancels every background agent whose spawner matches this invocation,
    /// logging only when something was actually cancelled.
    fn drop(&mut self) {
        let cancelled = self.bg.cancel_for_spawner(self.invocation_id);
        if cancelled > 0 {
            tracing::debug!(
                spawner = self.invocation_id,
                cancelled,
                "execute_sub_agent exit: cancelled orphaned bg agents",
            );
        }
    }
}
/// Drives one background sub-agent invocation to completion.
///
/// Re-invokes [`execute_sub_agent`] inline (with `background` forced to
/// `false` so the nested call cannot re-spawn itself), buffers all engine
/// events in a [`BufferingSink`], publishes progress on `status_tx`, and
/// finally delivers `(summary, events)` over the oneshot `tx` — `Ok` on
/// success, `Err` on failure.
#[allow(clippy::too_many_arguments)]
async fn run_bg_agent(
    project_root: std::path::PathBuf,
    parent_config: KodaConfig,
    db: Database,
    arguments: String,
    sub_agent_cache: SubAgentCache,
    parent_session: String,
    // Receives Ok((summary, events)) or Err((error message, events)).
    tx: tokio::sync::oneshot::Sender<
        Result<crate::bg_agent::BgPayload, crate::bg_agent::BgPayload>,
    >,
    cancel: CancellationToken,
    parent_trust: TrustMode,
    parent_sandbox_policy: koda_sandbox::SandboxPolicy,
    status_tx: tokio::sync::watch::Sender<crate::bg_agent::AgentStatus>,
) {
    let _ = status_tx.send(crate::bg_agent::AgentStatus::Running { iter: 0 });
    // The sender half is dropped immediately, so the channel is closed:
    // any approval request inside the sub-agent resolves to `None`
    // (no user channel) rather than blocking forever.
    let (_, mut cmd_rx) = mpsc::channel(1);
    let buffering_sink = crate::engine::sink::BufferingSink::new();
    let nested_bg = crate::bg_agent::new_shared();
    // Rewrite the arguments with `background: false` so the nested call runs
    // synchronously here. serde_json's IndexMut converts a non-object value
    // (e.g. Null from a parse failure) into an object before inserting, and
    // serializing a `Value` cannot fail, so the unwrap below is safe.
    let mut sync_args: serde_json::Value = serde_json::from_str(&arguments).unwrap_or_default();
    sync_args["background"] = serde_json::Value::Bool(false);
    let sync_arguments = serde_json::to_string(&sync_args).unwrap();
    // Keep a clone to distinguish "cancelled" from "errored" afterwards.
    let cancel_for_status = cancel.clone();
    let result = execute_sub_agent(
        &project_root,
        &parent_config,
        &db,
        &sync_arguments,
        parent_trust,
        &buffering_sink,
        cancel,
        &mut cmd_rx,
        None,
        &sub_agent_cache,
        &parent_session,
        &nested_bg,
        &parent_sandbox_policy,
        None,
        Some(status_tx.clone()),
    )
    .await;
    // Everything the sub-agent emitted, for replay/injection by the parent.
    let events = buffering_sink.take_lines();
    // Publish the terminal status before handing over the payload.
    match &result {
        Ok(output) => {
            let _ = status_tx.send(crate::bg_agent::AgentStatus::Completed {
                summary: output.clone(),
            });
        }
        Err(e) => {
            let status = if cancel_for_status.is_cancelled() {
                crate::bg_agent::AgentStatus::Cancelled
            } else {
                crate::bg_agent::AgentStatus::Errored {
                    error: e.to_string(),
                }
            };
            let _ = status_tx.send(status);
        }
    }
    // Best-effort delivery: the receiver may already be gone.
    let _ = match result {
        Ok(output) => tx.send(Ok((output, events))),
        Err(e) => tx.send(Err((format!("Error: {e}"), events))),
    };
}
/// Runs one sub-agent invocation to completion and returns its final answer.
///
/// `arguments` is the raw JSON of the invoking tool call: it must contain
/// `prompt`, and may contain `agent_name` (default `"task"`) and
/// `background` (default `false`).
///
/// Visible behavior:
/// - `background: true` reserves a registry slot, spawns [`run_bg_agent`]
///   on a detached task, and returns a launch notice immediately.
/// - `agent_name == "fork"` clones the parent config and conversation
///   history; any other name loads that agent's own config, inherits the
///   parent's provider/model when the agent does not declare its own, and
///   clamps the child's trust via `derive_child_trust`.
/// - Results are memoized in `sub_agent_cache` keyed by (agent, prompt).
/// - Agents with write tools run in an isolated workspace; if provisioning
///   fails they are NOT dispatched (a failure marker is returned instead).
/// - `InvokeAgent` and `AskUser` are refused inside sub-agents.
///
/// Written as a manual `impl Future` rather than an `async fn` so the
/// `Send + 'a` bound can be stated explicitly for callers that spawn it.
#[tracing::instrument(skip_all, fields(agent_name, cached = false))]
#[allow(clippy::too_many_arguments)]
pub(crate) fn execute_sub_agent<'a>(
    project_root: &'a Path,
    parent_config: &'a KodaConfig,
    db: &'a Database,
    arguments: &'a str,
    mode: TrustMode,
    sink: &'a dyn crate::engine::EngineSink,
    cancel: CancellationToken,
    cmd_rx: &'a mut mpsc::Receiver<EngineCommand>,
    parent_cache: Option<crate::tools::FileReadCache>,
    sub_agent_cache: &'a SubAgentCache,
    parent_session_id: &'a str,
    bg_agents: &'a std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
    parent_sandbox_policy: &'a koda_sandbox::SandboxPolicy,
    parent_spawner: Option<u32>,
    status_tx: Option<tokio::sync::watch::Sender<crate::bg_agent::AgentStatus>>,
) -> impl std::future::Future<Output = Result<String>> + Send + 'a {
    async move {
        let my_invocation_id = next_invocation_id();
        let args: serde_json::Value = serde_json::from_str(arguments)?;
        let agent_name = args["agent_name"].as_str().unwrap_or("task");
        tracing::Span::current().record("agent_name", agent_name);
        let prompt = args["prompt"]
            .as_str()
            .ok_or_else(|| anyhow::anyhow!("Missing 'prompt'"))?;
        let is_fork = agent_name == "fork";
        let background = args["background"].as_bool().unwrap_or(false);
        // --- Background launch: reserve, spawn, attach, return. -----------
        if background {
            let reservation = bg_agents.reserve(&cancel, parent_spawner);
            let task_id = reservation.task_id;
            let bg_cancel = reservation.cancel.clone();
            let bg_tx = reservation.tx;
            let bg_rx = reservation.rx;
            let entry_cancel = reservation.cancel;
            let bg_status_tx = reservation.status_tx;
            let entry_status_rx = reservation.status_rx;
            // Clone everything the detached task needs to own.
            let project_root_owned = project_root.to_path_buf();
            let parent_config_owned = parent_config.clone();
            let agent_name_owned = agent_name.to_string();
            let prompt_owned = prompt.to_string();
            let arguments_owned = arguments.to_string();
            let sub_agent_cache_owned = sub_agent_cache.clone();
            let parent_session_owned = parent_session_id.to_string();
            let bg_db = db.clone();
            let bg_policy = parent_sandbox_policy.clone();
            let bg_trust = mode;
            sink.emit(EngineEvent::Info {
                message: format!(
                    " \u{1f680} {agent_name} launched in background (task {task_id})"
                ),
            });
            let handle = tokio::spawn(run_bg_agent(
                project_root_owned,
                parent_config_owned,
                bg_db,
                arguments_owned,
                sub_agent_cache_owned,
                parent_session_owned,
                bg_tx,
                bg_cancel,
                bg_trust,
                bg_policy,
                bg_status_tx,
            ));
            bg_agents.attach(
                task_id,
                &agent_name_owned,
                &prompt_owned,
                bg_rx,
                entry_cancel,
                entry_status_rx,
                parent_spawner,
                handle,
            );
            return Ok(format!(
                "Background agent '{agent_name_owned}' started (task {task_id}). \
                Results will be injected when complete."
            ));
        }
        // From here on we run inline; the guard cancels any bg agents this
        // invocation spawns if we exit early (error, cancellation, return).
        let _cleanup = InvocationCleanup {
            bg: bg_agents,
            invocation_id: my_invocation_id,
        };
        // Memoized result for the same (agent, prompt) pair skips the LLM.
        if let Some(cached) = sub_agent_cache.get(agent_name, prompt) {
            sink.emit(EngineEvent::Info {
                message: format!(" \u{26a1} {agent_name}: cache hit, skipping LLM call"),
            });
            tracing::Span::current().record("cached", true);
            return Ok(cached);
        }
        sink.emit(EngineEvent::SubAgentStart {
            agent_name: agent_name.to_string(),
        });
        // --- Resolve the child's config (fork vs named agent). ------------
        let sub_config = if is_fork {
            let mut cfg = parent_config.clone();
            cfg.trust = derive_child_trust(mode, mode);
            // derive_child_trust(x, x) must be the identity for a fork.
            assert!(
                cfg.trust == mode,
                "fork must inherit parent's runtime trust mode exactly"
            );
            cfg
        } else {
            // `raw` is the agent's own JSON, used only to detect which
            // fields the agent explicitly declared.
            let raw = crate::config::KodaConfig::load_agent_json(project_root, agent_name)
                .with_context(|| format!("Failed to load sub-agent: {agent_name}"))?;
            let mut cfg = crate::config::KodaConfig::load(project_root, agent_name)
                .with_context(|| format!("Failed to load sub-agent: {agent_name}"))?;
            let agent_has_own_provider = raw.provider.is_some() || raw.base_url.is_some();
            if !agent_has_own_provider {
                // Inherit the parent's provider; inherit its model too only
                // when the agent did not pin one of its own.
                let model_override = raw.model.is_none().then(|| parent_config.model.clone());
                cfg = cfg.with_overrides(
                    Some(parent_config.base_url.clone()),
                    model_override,
                    Some(parent_config.provider_type.to_string()),
                );
            }
            // A child can never be more trusted than its parent.
            let child_trust = cfg.trust;
            cfg.trust = derive_child_trust(mode, cfg.trust);
            if cfg.trust != child_trust {
                tracing::info!(
                    agent = agent_name,
                    parent = %mode,
                    child = %child_trust,
                    effective = %cfg.trust,
                    "sub-agent trust clamped to match parent",
                );
            }
            cfg
        };
        // Fresh DB session; a fork starts with a copy of the parent history.
        let sub_session = {
            let sid = db
                .create_session(&sub_config.agent_name, project_root)
                .await?;
            if is_fork {
                let parent_history = db.load_context(parent_session_id).await?;
                db.copy_messages_into_session(&sid, &parent_history).await?
            }
            sid
        };
        db.insert_message(&sub_session, &Role::User, Some(prompt), None, None, None)
            .await?;
        let provider = crate::providers::create_provider(&sub_config);
        // --- Workspace: isolate agents that can write to disk. ------------
        let has_write_tools = !sub_config
            .disallowed_tools
            .iter()
            .any(|t| t == "Write" || t == "Edit");
        let workspace: Box<dyn WorkspaceProvider + Send + Sync> = if has_write_tools {
            pick_write_provider(project_root, agent_name)
        } else {
            Box::new(CwdProvider::new(project_root))
        };
        let effective_root = match workspace.provision(&sub_session).await {
            Ok(path) => {
                if path != project_root {
                    sink.emit(EngineEvent::Info {
                        message: format!(" \u{1f333} {agent_name}: isolated in worktree"),
                    });
                }
                path
            }
            Err(e) if has_write_tools => {
                // A write-capable agent without isolation could corrupt the
                // parent tree: refuse to dispatch, cache the failure marker.
                let reason = e.to_string();
                tracing::warn!("Workspace provision failed for sub-agent '{agent_name}': {reason}");
                sink.emit(EngineEvent::Info {
                    message: format!(
                        " \u{26a0}\u{fe0f} {agent_name}: workspace isolation failed, not dispatching ({reason})"
                    ),
                });
                let marker = workspace_provision_failure_marker(agent_name, &reason);
                sub_agent_cache.put(agent_name, prompt, &marker);
                return Ok(marker);
            }
            Err(e) => {
                // Read-only agents can safely fall back to the real root.
                tracing::warn!(
                    "Workspace provision failed (read-only sub-agent '{agent_name}'): {e}"
                );
                project_root.to_path_buf()
            }
        };
        let effective_root_ref = effective_root.as_path();
        // Tool registry rooted in the (possibly isolated) workspace, with a
        // sandbox policy composed from the parent's and the child's trust.
        let tools = {
            let registry = ToolRegistry::with_trust(
                effective_root.clone(),
                sub_config.max_context_tokens,
                sub_config.trust,
            );
            let composed_policy = crate::sandbox::compose_child_policy(
                parent_sandbox_policy,
                sub_config.trust,
                effective_root_ref,
            );
            let registry = registry.with_sandbox_policy(composed_policy);
            match parent_cache {
                Some(cache) => registry.with_shared_cache(cache),
                None => registry,
            }
        };
        // Sub-agents never see InvokeAgent (no recursive spawning) or
        // AskUser (no channel to the user), on top of their own denylist.
        let tool_defs = {
            let mut denied = sub_config.disallowed_tools.clone();
            if !denied.contains(&"InvokeAgent".to_string()) {
                denied.push("InvokeAgent".to_string());
            }
            if !denied.contains(&"AskUser".to_string()) {
                denied.push("AskUser".to_string());
            }
            tools.get_definitions(&sub_config.allowed_tools, &denied)
        };
        let semantic_memory = if sub_config.skip_memory {
            String::new()
        } else {
            memory::load(project_root)?
        };
        let env = crate::prompt::EnvironmentInfo {
            project_root: effective_root_ref,
            model: &sub_config.model,
            platform: std::env::consts::OS,
        };
        let system_prompt = build_system_prompt(
            &sub_config.system_prompt,
            &semantic_memory,
            &sub_config.agents_dir,
            &env,
            &[],
            &tools.skill_registry,
        );
        // --- Main agent loop, bounded by MAX_SUB_AGENT_ITERATIONS. --------
        // NOTE(review): the `as u8` cast silently truncates if the cap ever
        // exceeds 255 — confirm the constant stays small.
        // NOTE(review): `?` error paths inside this loop (load_context,
        // provider.chat, insert_message) return without calling
        // `workspace.release`, so a worktree may be left behind on error —
        // verify whether release-on-error is handled elsewhere.
        for iter in 1u8..=loop_guard::MAX_SUB_AGENT_ITERATIONS as u8 {
            if let Some(ref tx) = status_tx {
                let _ = tx.send(crate::bg_agent::AgentStatus::Running { iter });
            }
            if cancel.is_cancelled() {
                let _ = workspace.release(&sub_session, &effective_root).await;
                return Ok("[cancelled by parent]".to_string());
            }
            // Rebuild the full message list from persisted history each turn.
            let history = db.load_context(&sub_session).await?;
            let mut messages = vec![ChatMessage::text("system", &system_prompt)];
            for msg in &history {
                let tool_calls: Option<Vec<ToolCall>> = msg
                    .tool_calls
                    .as_deref()
                    .and_then(|tc| serde_json::from_str(tc).ok());
                messages.push(ChatMessage {
                    role: msg.role.as_str().to_string(),
                    content: msg.content.clone(),
                    tool_calls,
                    tool_call_id: msg.tool_call_id.clone(),
                    images: None,
                });
            }
            sink.emit(EngineEvent::SpinnerStart {
                message: format!(" 🦥 {agent_name} thinking..."),
            });
            let response = provider
                .chat(&messages, &tool_defs, &sub_config.model_settings)
                .await?;
            sink.emit(EngineEvent::SpinnerStop);
            let tool_calls_json = if response.tool_calls.is_empty() {
                None
            } else {
                Some(serde_json::to_string(&response.tool_calls)?)
            };
            db.insert_message(
                &sub_session,
                &Role::Assistant,
                response.content.as_deref(),
                tool_calls_json.as_deref(),
                None,
                Some(&response.usage),
            )
            .await?;
            // No tool calls means the model produced its final answer.
            if response.tool_calls.is_empty() {
                let result = response
                    .content
                    .unwrap_or_else(|| "(no output)".to_string());
                sub_agent_cache.put(agent_name, prompt, &result);
                if let Ok(Some(hint)) = workspace.release(&sub_session, &effective_root).await {
                    sink.emit(EngineEvent::Info {
                        message: format!(" \u{1f335} {agent_name}: {hint}"),
                    });
                }
                return Ok(result);
            }
            // --- Execute each requested tool call. ------------------------
            for tc in &response.tool_calls {
                sink.emit(EngineEvent::ToolCallStart {
                    id: tc.id.clone(),
                    name: tc.function_name.clone(),
                    args: serde_json::from_str(&tc.arguments).unwrap_or_default(),
                    is_sub_agent: true,
                });
                let parsed_args: serde_json::Value =
                    serde_json::from_str(&tc.arguments).unwrap_or_default();
                // Refuse recursive delegation with a tool-result explaining why.
                if tc.function_name == "InvokeAgent" {
                    let refusal = "InvokeAgent is not available inside a sub-agent. \
                    Sub-agents are autonomous workers and cannot spawn \
                    further sub-agents. Complete the task directly with \
                    the tools you have, or report back what additional \
                    dispatch the parent agent should perform.";
                    db.insert_message(
                        &sub_session,
                        &Role::Tool,
                        Some(refusal),
                        None,
                        Some(&tc.id),
                        None,
                    )
                    .await?;
                    continue;
                }
                // Refuse user interaction: sub-agents have no user channel.
                if tc.function_name == "AskUser" {
                    let refusal = "AskUser is not available inside a sub-agent. \
                    Sub-agents have no channel to the user; the parent \
                    agent gathers any required input before delegating. \
                    Proceed with the information you already have or \
                    report what's missing.";
                    db.insert_message(
                        &sub_session,
                        &Role::Tool,
                        Some(refusal),
                        None,
                        Some(&tc.id),
                        None,
                    )
                    .await?;
                    continue;
                }
                let validation_error = tools::validate::validate_with_registry(
                    &tools,
                    &tc.function_name,
                    &parsed_args,
                    effective_root_ref,
                )
                .await;
                let output = if let Some(error) = validation_error {
                    format!("Validation error: {error}")
                } else {
                    // Trust gate: auto-approve, block, or ask for approval.
                    let approval = trust::check_tool(
                        &tc.function_name,
                        &parsed_args,
                        mode,
                        Some(effective_root_ref),
                    );
                    match approval {
                        ToolApproval::AutoApprove => {
                            let (_id, result, _success, _full) = execute_one_tool(
                                tc,
                                project_root,
                                &sub_config,
                                db,
                                &sub_session,
                                &tools,
                                mode,
                                sink,
                                cancel.clone(),
                                sub_agent_cache,
                                bg_agents,
                                Some(my_invocation_id),
                            )
                            .await;
                            result
                        }
                        ToolApproval::Blocked => {
                            let detail = tools::describe_action(&tc.function_name, &parsed_args);
                            let diff_preview = preview::compute(
                                &tc.function_name,
                                &parsed_args,
                                effective_root_ref,
                            )
                            .await;
                            sink.emit(EngineEvent::ActionBlocked {
                                tool_name: tc.function_name.clone(),
                                detail,
                                preview: diff_preview,
                            });
                            "[safe mode] Action blocked.".to_string()
                        }
                        ToolApproval::NeedsConfirmation => {
                            let detail = tools::describe_action(&tc.function_name, &parsed_args);
                            let diff_preview = preview::compute(
                                &tc.function_name,
                                &parsed_args,
                                effective_root_ref,
                            )
                            .await;
                            let effect = crate::trust::resolve_tool_effect_with_registry(
                                &tc.function_name,
                                &parsed_args,
                                &tools,
                            );
                            match request_approval(
                                sink,
                                cmd_rx,
                                &cancel,
                                &tc.function_name,
                                &detail,
                                diff_preview,
                                effect,
                            )
                            .await
                            {
                                Some(ApprovalDecision::Approve) => {
                                    let (_id, result, _success, _full) = execute_one_tool(
                                        tc,
                                        project_root,
                                        &sub_config,
                                        db,
                                        &sub_session,
                                        &tools,
                                        mode,
                                        sink,
                                        cancel.clone(),
                                        sub_agent_cache,
                                        bg_agents,
                                        Some(my_invocation_id),
                                    )
                                    .await;
                                    result
                                }
                                Some(ApprovalDecision::Reject) => "[rejected by user]".to_string(),
                                Some(ApprovalDecision::RejectWithFeedback { feedback }) => {
                                    format!("[rejected: {feedback}]")
                                }
                                Some(ApprovalDecision::RejectAuto { reason }) => {
                                    format!("[auto-rejected: {reason}]")
                                }
                                // `None` means the approval channel closed:
                                // either we were cancelled, or (background
                                // runs) there is no user to ask at all.
                                None => {
                                    if cancel.is_cancelled() {
                                        "[cancelled]".to_string()
                                    } else {
                                        format!(
                                            "[auto-rejected: '{tool}' requires user \
                                            confirmation but this sub-agent has no \
                                            channel to the user. The parent agent \
                                            must pre-approve destructive operations \
                                            or run the tool itself.]",
                                            tool = tc.function_name,
                                        )
                                    }
                                }
                            }
                        }
                    }
                };
                db.insert_message(
                    &sub_session,
                    &Role::Tool,
                    Some(&output),
                    None,
                    Some(&tc.id),
                    None,
                )
                .await?;
            }
        }
        // --- Iteration cap reached: return a partial-result marker. -------
        sink.emit(EngineEvent::Warn {
            message: format!(
                "Sub-agent '{agent_name}' hit its iteration limit ({}). Returning partial result.",
                loop_guard::MAX_SUB_AGENT_ITERATIONS
            ),
        });
        if let Ok(Some(hint)) = workspace.release(&sub_session, &effective_root).await {
            sink.emit(EngineEvent::Info {
                message: format!(" \u{1f335} {agent_name}: {hint}"),
            });
        }
        let result = iteration_cap_marker(agent_name, loop_guard::MAX_SUB_AGENT_ITERATIONS);
        sub_agent_cache.put(agent_name, prompt, &result);
        Ok(result)
    }
}
/// Picks the workspace provider for a write-capable sub-agent on macOS.
///
/// Tries an APFS clonefile-based provider first and falls back to a git
/// worktree when clonefile is unavailable (logging the reason).
#[cfg(target_os = "macos")]
fn pick_write_provider(
    project_root: &std::path::Path,
    agent_name: &str,
) -> Box<dyn WorkspaceProvider + Send + Sync> {
    ClonefileProvider::new(project_root)
        .map(|provider| Box::new(provider) as Box<dyn WorkspaceProvider + Send + Sync>)
        .unwrap_or_else(|e| {
            tracing::warn!("ClonefileProvider unavailable, falling back to git worktree: {e}");
            Box::new(GitWorktreeProvider::new(project_root, agent_name))
        })
}
/// Picks the workspace provider for a write-capable sub-agent on non-macOS
/// hosts: always a git worktree (clonefile is macOS-only).
#[cfg(not(target_os = "macos"))]
fn pick_write_provider(
    project_root: &std::path::Path,
    agent_name: &str,
) -> Box<dyn WorkspaceProvider + Send + Sync> {
    let provider = GitWorktreeProvider::new(project_root, agent_name);
    Box::new(provider)
}
/// Builds the single-line `[ERROR: ...]` marker returned when a sub-agent
/// exhausts its iteration cap without a final answer. The `[ERROR:` prefix
/// signals structural failure (not an answer) and the text nudges the model
/// toward decomposing the task or doing the work directly.
fn iteration_cap_marker(agent_name: &str, cap: usize) -> String {
    let mut marker = String::with_capacity(192);
    marker.push_str("[ERROR: sub-agent '");
    marker.push_str(agent_name);
    marker.push_str("' exceeded its iteration cap of ");
    marker.push_str(&cap.to_string());
    marker.push_str(
        " without producing a final answer. Decompose the task into smaller pieces, \
         or attempt the work directly without delegating.]",
    );
    marker
}
/// Builds the single-line `[ERROR: ...]` marker returned when an isolated
/// workspace could not be provisioned for a write-capable sub-agent. It
/// states explicitly that the agent was NOT dispatched (so the parent never
/// assumes the work happened), includes the verbatim failure reason, and
/// suggests re-strategizing options.
fn workspace_provision_failure_marker(agent_name: &str, reason: &str) -> String {
    let mut marker = String::with_capacity(256);
    marker.push_str("[ERROR: sub-agent '");
    marker.push_str(agent_name);
    marker.push_str(
        "' could not provision an isolated workspace and was not dispatched, \
         to avoid corrupting the parent project tree (reason: ",
    );
    marker.push_str(reason);
    marker.push_str(
        "). Either resolve the workspace setup issue, retry without write tools, \
         or attempt the work directly without delegating.]",
    );
    marker
}
/// Pins the contract of [`iteration_cap_marker`]: error-prefixed, names the
/// agent, includes the numeric cap, suggests a next action, single line.
#[cfg(test)]
mod b18_tests {
    use super::iteration_cap_marker;
    use crate::loop_guard::MAX_SUB_AGENT_ITERATIONS;

    /// Canonical marker instance shared by most assertions below.
    fn scout_marker() -> String {
        iteration_cap_marker("scout", 20)
    }

    #[test]
    fn marker_has_error_prefix() {
        let m = scout_marker();
        assert!(
            m.starts_with("[ERROR:"),
            "marker must start with `[ERROR:` so the model treats it \
            as structural failure metadata, not a sub-agent answer; \
            got: {m}"
        );
    }

    #[test]
    fn marker_includes_agent_name() {
        let m = scout_marker();
        assert!(
            m.contains("'scout'"),
            "marker must name the capped sub-agent; got: {m}"
        );
    }

    #[test]
    fn marker_includes_cap_as_number() {
        let m = scout_marker();
        assert!(
            m.contains("20"),
            "marker must include the cap as a number so the model can \
            reason about decomposability; got: {m}"
        );
    }

    #[test]
    fn marker_includes_restrategize_hint() {
        let m = scout_marker();
        let lower = m.to_lowercase();
        assert!(
            lower.contains("decompose") || lower.contains("attempt the work directly"),
            "marker must give the model a concrete next action; got: {m}"
        );
    }

    #[test]
    fn marker_uses_real_cap_constant() {
        let cap_str = MAX_SUB_AGENT_ITERATIONS.to_string();
        let m = iteration_cap_marker("agent", MAX_SUB_AGENT_ITERATIONS);
        assert!(m.contains(&cap_str));
    }

    #[test]
    fn marker_is_single_line_for_tool_result_clarity() {
        let m = scout_marker();
        assert!(
            !m.contains('\n'),
            "marker must be single-line for clean tool-result formatting; got:\n{m}"
        );
    }
}
/// Pins the contract of [`workspace_provision_failure_marker`]:
/// error-prefixed, names the agent, carries the verbatim reason, states the
/// agent was not dispatched, hints at re-strategizing, single line.
#[cfg(test)]
mod b21_tests {
    use super::workspace_provision_failure_marker;

    /// Canonical marker instance shared by most assertions below.
    fn writer_marker() -> String {
        workspace_provision_failure_marker("writer", "clonefile: ENOTSUP")
    }

    #[test]
    fn marker_has_error_prefix() {
        let m = writer_marker();
        assert!(
            m.starts_with("[ERROR:"),
            "marker must start with `[ERROR:` so the model treats it as \
            structural failure, not a sub-agent answer; got: {m}"
        );
    }

    #[test]
    fn marker_includes_agent_name() {
        let m = writer_marker();
        assert!(
            m.contains("'writer'"),
            "marker must name the sub-agent that failed so multi-agent \
            flows can disambiguate; got: {m}"
        );
    }

    #[test]
    fn marker_includes_failure_reason() {
        let m = writer_marker();
        assert!(
            m.contains("clonefile: ENOTSUP"),
            "marker must include the failure reason verbatim so it's \
            diagnosable; got: {m}"
        );
    }

    #[test]
    fn marker_does_not_silently_dispatch() {
        let m = workspace_provision_failure_marker("writer", "x");
        let lower = m.to_lowercase();
        assert!(
            lower.contains("not dispatched") || lower.contains("was not dispatched"),
            "marker must state the sub-agent was not dispatched, so the \
            parent doesn't assume the work happened; got: {m}"
        );
    }

    #[test]
    fn marker_includes_restrategize_hint() {
        let m = workspace_provision_failure_marker("writer", "x");
        let lower = m.to_lowercase();
        assert!(
            lower.contains("directly") || lower.contains("resolve") || lower.contains("retry"),
            "marker must hint at re-strategizing (e.g. resolve setup, \
            retry without write tools, do directly); got: {m}"
        );
    }

    #[test]
    fn marker_is_single_line() {
        let m = writer_marker();
        assert!(
            !m.contains('\n'),
            "marker must be single-line for clean tool-result formatting; got:\n{m}"
        );
    }
}
/// Verifies the [`InvocationCleanup`] drop guard: it cancels exactly the
/// background-agent entries tagged with its spawner id — not siblings, not
/// top-level entries, and nothing when no entry matches.
#[cfg(test)]
mod invocation_cleanup_tests {
    use super::InvocationCleanup;
    use crate::bg_agent::BgAgentRegistry;
    use std::sync::Arc;

    /// Builds the guard for `invocation_id` and drops it immediately,
    /// exercising the `Drop` cleanup path under test.
    fn drop_guard(registry: &Arc<BgAgentRegistry>, invocation_id: u32) {
        let guard = InvocationCleanup {
            bg: registry,
            invocation_id,
        };
        drop(guard);
    }

    #[tokio::test]
    async fn drop_cancels_entries_tagged_with_matching_spawner() {
        let registry = Arc::new(BgAgentRegistry::new());
        let (_id_a, _tx_a, _status_tx_a, cancel_a) =
            registry.register_test_with_status("scout", "a", Some(7));
        let (_id_b, _tx_b, _status_tx_b, cancel_b) =
            registry.register_test_with_status("scout", "b", Some(7));
        assert!(
            !cancel_a.is_cancelled() && !cancel_b.is_cancelled(),
            "setup"
        );
        drop_guard(&registry, 7);
        assert!(
            cancel_a.is_cancelled(),
            "entry A's cancel token must fire when guard drops"
        );
        assert!(
            cancel_b.is_cancelled(),
            "entry B's cancel token must fire when guard drops"
        );
    }

    #[tokio::test]
    async fn drop_leaves_entries_with_different_spawner_alone() {
        let registry = Arc::new(BgAgentRegistry::new());
        let (_id_mine, _tx_m, _status_tx_m, cancel_mine) =
            registry.register_test_with_status("a", "mine", Some(7));
        let (_id_sib, _tx_s, _status_tx_s, cancel_sibling) =
            registry.register_test_with_status("a", "sibling", Some(8));
        let (_id_top, _tx_t, _status_tx_t, cancel_toplevel) =
            registry.register_test_with_status("a", "toplevel", None);
        drop_guard(&registry, 7);
        assert!(
            cancel_mine.is_cancelled(),
            "my own (spawner=7) entry must be cancelled"
        );
        assert!(
            !cancel_sibling.is_cancelled(),
            "sibling sub-agent's (spawner=8) entry must NOT be cancelled"
        );
        assert!(
            !cancel_toplevel.is_cancelled(),
            "top-level (spawner=None) entry must NOT be cancelled"
        );
    }

    #[tokio::test]
    async fn drop_with_no_matching_entries_is_noop() {
        let registry = Arc::new(BgAgentRegistry::new());
        let (_id, _tx, _status_tx, cancel) =
            registry.register_test_with_status("a", "x", Some(99));
        drop_guard(&registry, 7);
        assert!(
            !cancel.is_cancelled(),
            "unrelated entry's cancel token must not fire"
        );
    }
}