use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use serde_json::Value;
use tokio::sync::{Semaphore, mpsc};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use crate::domain::{
Msg, State, ToolDefinition, ToolMetadata, ToolOutcome, ToolRunMetadata, TurnState, update,
};
use crate::effect::{EffectRunner, MSG_CHANNEL_CAPACITY};
use crate::models::MessageRole;
use crate::providers::ProviderFactory;
use crate::providers::ctx::{ExecContext, ProgressEvent, SubagentPhase};
use super::ToolExecutor;
use super::ToolRegistry;
/// Maximum subagent nesting depth: `execute` refuses to spawn a child once the
/// current task-local depth has reached this value.
pub const MAX_DEPTH: usize = 3;
/// Number of permits in the spawner's semaphore, i.e. how many subagents may
/// hold a run slot at the same time.
pub const MAX_INFLIGHT: usize = 10;
/// Wall-clock budget, in seconds, for a single subagent run (20 minutes).
pub const DEFAULT_TIMEOUT_SECS: u64 = 20 * 60;
// Task-local nesting depth of the agent currently executing. It is read with
// `try_with`, falling back to 0 when no scope is active, and each child run is
// wrapped in a scope carrying the parent's depth plus one.
tokio::task_local! {
static SUBAGENT_DEPTH: usize;
}
/// Shared state used to launch child agents.
///
/// Carries the provider factory handed to every child runner together with
/// the semaphore that bounds how many children may run concurrently.
pub struct SubagentSpawner {
    providers: Arc<ProviderFactory>,
    inflight: Arc<Semaphore>,
}

impl SubagentSpawner {
    /// Builds a spawner whose semaphore starts with [`MAX_INFLIGHT`] permits.
    pub fn new(providers: Arc<ProviderFactory>) -> Self {
        let inflight = Arc::new(Semaphore::new(MAX_INFLIGHT));
        Self {
            providers,
            inflight,
        }
    }
}
/// The `agent` tool: a thin handle over a shared [`SubagentSpawner`].
pub struct SubagentTool {
    spawner: Arc<SubagentSpawner>,
}

impl SubagentTool {
    /// Wraps an existing spawner so its concurrency limit is shared by every
    /// tool instance created from the same `Arc`.
    pub fn new(spawner: Arc<SubagentSpawner>) -> Self {
        SubagentTool { spawner }
    }
}
#[async_trait]
impl ToolExecutor for SubagentTool {
    /// Name under which this tool is registered and invoked.
    fn name(&self) -> &'static str {
        "agent"
    }

    /// Tool definition advertised to the model: a required `prompt` and an
    /// optional short `description` used as a status label.
    fn schema(&self) -> ToolDefinition {
        ToolDefinition {
            name: "agent".to_string(),
            description: format!(
                "Spawn a child agent with its own context and tool access to work on an \
                 independent sub-task. Useful for parallel fan-out (emit multiple `agent` \
                 calls in the same turn to run them concurrently) or for scoping a noisy \
                 sub-task (the child's tool output doesn't clutter the parent's turn). \
                 Depth-capped at {max_depth}; breadth-capped at {max_breadth} concurrent. \
                 Subagents don't get GUI (screenshot/click/…) access because coordinate \
                 metadata can't be shared cleanly.",
                max_depth = MAX_DEPTH,
                max_breadth = MAX_INFLIGHT,
            ),
            input_schema: serde_json::json!({
                "type": "object",
                "properties": {
                    "prompt": {
                        "type": "string",
                        "description": "The task for the subagent. Self-contained; the subagent has no access to the parent's conversation."
                    },
                    "description": {
                        "type": "string",
                        "description": "Short label shown in the parent's status line (e.g. 'list domain files')."
                    }
                },
                "required": ["prompt"]
            }),
        }
    }

    /// Runs one child agent and returns its final assistant message.
    ///
    /// Steps, in order:
    /// 1. Reject the call if the task-local depth has reached [`MAX_DEPTH`].
    /// 2. Require a non-blank `prompt`; a missing or blank `description`
    ///    falls back to the label `subagent`.
    /// 3. Wait for a concurrency permit, bailing out if the parent token is
    ///    cancelled first.
    /// 4. Build a fresh child state, tool registry and effect runner, then
    ///    drive the child under a [`DEFAULT_TIMEOUT_SECS`] deadline with the
    ///    depth incremented by one.
    ///
    /// When the deadline expires the child token is cancelled and the child
    /// is given a short grace period to run its own shutdown path before the
    /// future is dropped; the outcome is still reported as a timeout error.
    async fn execute(&self, args: Value, ctx: ExecContext) -> ToolOutcome {
        // Upper bound on how long a timed-out child may spend shutting down.
        const SHUTDOWN_GRACE: Duration = Duration::from_secs(5);

        let started = Instant::now();
        let current_depth = SUBAGENT_DEPTH.try_with(|d| *d).unwrap_or(0);
        if current_depth >= MAX_DEPTH {
            return ToolOutcome::error(format!("subagent depth limit {} reached", MAX_DEPTH), 0.0);
        }
        let prompt = match args.get("prompt").and_then(|v| v.as_str()) {
            Some(s) if !s.trim().is_empty() => s.to_string(),
            _ => {
                return ToolOutcome::error("agent requires non-empty `prompt`", 0.0);
            },
        };
        // A blank label would render as "subagent ()" in messages, so treat it
        // the same as an absent one.
        let description = args
            .get("description")
            .and_then(|v| v.as_str())
            .map(str::trim)
            .filter(|label| !label.is_empty())
            .unwrap_or("subagent")
            .to_string();

        // `biased` makes cancellation win over a simultaneously ready permit.
        let permit = tokio::select! {
            biased;
            _ = ctx.token.cancelled() => return ToolOutcome::cancelled(),
            p = self.spawner.inflight.clone().acquire_owned() => match p {
                Ok(permit) => permit,
                Err(_) => return ToolOutcome::error(
                    "subagent semaphore closed",
                    started.elapsed().as_secs_f64(),
                ),
            },
        };

        let config = (*ctx.config).clone();
        let cwd = ctx.workdir.clone();
        let model_id = if ctx.model_id.is_empty() {
            default_model_id(&config)
        } else {
            ctx.model_id.clone()
        };
        let child_model_id = model_id.clone();
        let child_state = State::new(config.clone(), cwd.clone(), model_id);
        let child_tools = build_child_registry(self.spawner.providers.clone());
        let child_token = ctx.token.child_token();
        // Kept so the deadline handler below can ask the child to stop.
        let deadline_token = child_token.clone();
        let (child_tx, child_rx) = mpsc::channel(MSG_CHANNEL_CAPACITY);
        let child_runner =
            EffectRunner::new_child(child_tx, cwd, self.spawner.providers.clone(), child_tools);
        let drive = drive_child(
            child_state,
            child_runner,
            child_rx,
            ctx.progress.clone(),
            prompt,
            description.clone(),
            child_token,
        );
        let depth_scoped = SUBAGENT_DEPTH.scope(current_depth + 1, drive);

        let result = {
            // Pinned so the same future can be polled again after the deadline.
            tokio::pin!(depth_scoped);
            let first_pass =
                timeout(Duration::from_secs(DEFAULT_TIMEOUT_SECS), &mut depth_scoped).await;
            if first_pass.is_err() {
                // Dropping the future outright would skip the runner shutdown
                // that every other exit path of the child performs. Cancel the
                // child and let it wind down, but never wait unboundedly.
                deadline_token.cancel();
                let _ = timeout(SHUTDOWN_GRACE, &mut depth_scoped).await;
            }
            first_pass
        };
        drop(permit);

        let elapsed = started.elapsed().as_secs_f64();
        match result {
            Ok(Ok(summary)) => ToolOutcome::success(summary, "subagent completed", elapsed)
                .with_metadata(subagent_metadata(child_model_id)),
            Ok(Err(DriveError::Cancelled)) => ToolOutcome::cancelled(),
            Ok(Err(DriveError::Errored(e))) => {
                ToolOutcome::error(format!("subagent ({}): {}", description, e), elapsed)
                    .with_metadata(subagent_metadata(child_model_id))
            },
            Err(_) => ToolOutcome::error(
                format!(
                    "subagent ({}) exceeded {}s timeout",
                    description, DEFAULT_TIMEOUT_SECS
                ),
                elapsed,
            )
            .with_metadata(subagent_metadata(child_model_id)),
        }
    }
}
/// Builds run metadata that records which model the child agent used; every
/// other field keeps its default value.
fn subagent_metadata(model_id: String) -> ToolRunMetadata {
    let detail = ToolMetadata::Subagent { model_id };
    ToolRunMetadata {
        detail,
        ..Default::default()
    }
}
/// Reasons a child run ends without a usable summary.
enum DriveError {
/// The child's cancellation token fired before the run finished.
Cancelled,
/// The run ended but failed; the string is a human-readable reason
/// (for example, the child produced no assistant output).
Errored(String),
}
/// Drives a child agent's update loop until it goes idle, asks to exit, loses
/// its message channel, or is cancelled.
///
/// The child is seeded with `prompt`, and every incoming message is mirrored
/// to `parent_progress` before being applied to the state. The runner is shut
/// down on every exit path.
///
/// Returns the content of the most recent assistant message.
///
/// # Errors
/// * [`DriveError::Cancelled`] when `token` is cancelled.
/// * [`DriveError::Errored`] when no non-blank assistant message exists.
async fn drive_child(
    mut state: State,
    mut runner: EffectRunner,
    mut msg_rx: mpsc::Receiver<Msg>,
    parent_progress: mpsc::Sender<ProgressEvent>,
    prompt: String,
    description: String,
    token: CancellationToken,
) -> Result<String, DriveError> {
    // Announce the child to the parent; a closed progress channel is ignored.
    let preview: String = prompt.chars().take(80).collect();
    let banner = format!("▶ {} — {}", description, preview);
    let _ = parent_progress
        .send(ProgressEvent::SubagentText(banner))
        .await;

    runner.dispatch(crate::domain::Cmd::RefreshInstructions);
    let (seeded, initial_cmds) = update(
        state,
        Msg::SubmitPrompt {
            text: prompt,
            attachment_ids: vec![],
        },
    );
    state = seeded;
    for cmd in initial_cmds {
        runner.dispatch(cmd);
    }

    // The loop evaluates to `true` only when it stopped because of cancellation.
    let cancelled = loop {
        if token.is_cancelled() {
            break true;
        }
        let turn_idle = matches!(state.turn, TurnState::Idle);
        if turn_idle && state.ui.queued_messages.is_empty() {
            break false;
        }

        let incoming = tokio::select! {
            biased;
            _ = token.cancelled() => break true,
            received = msg_rx.recv() => received,
        };
        let msg = match incoming {
            Some(m) => m,
            // All senders are gone; nothing more can arrive.
            None => break false,
        };

        // Forward before `update` consumes the state the lookup relies on.
        forward_child_event(&msg, &parent_progress, &state).await;
        let (next_state, cmds) = update(state, msg);
        state = next_state;
        for cmd in cmds {
            runner.dispatch(cmd);
        }
        if state.should_exit {
            break false;
        }
    };

    runner.shutdown().await;
    if cancelled {
        return Err(DriveError::Cancelled);
    }

    let summary = state
        .session
        .messages()
        .iter()
        .rev()
        .find_map(|m| (m.role == MessageRole::Assistant).then(|| m.content.clone()))
        .unwrap_or_default();
    if summary.trim().is_empty() {
        Err(DriveError::Errored(
            "subagent produced no assistant output".to_string(),
        ))
    } else {
        Ok(summary)
    }
}
async fn forward_child_event(msg: &Msg, progress: &mpsc::Sender<ProgressEvent>, state: &State) {
match msg {
Msg::ToolStarted {
turn: _, call_id, ..
} => {
let tool_name = lookup_tool_name(state, *call_id).unwrap_or_else(|| "tool".to_string());
let _ = progress
.send(ProgressEvent::SubagentToolCall {
child_call_id: *call_id,
tool_name,
phase: SubagentPhase::Started,
})
.await;
},
Msg::ToolFinished {
turn: _,
call_id,
outcome,
} => {
let tool_name = lookup_tool_name(state, *call_id).unwrap_or_else(|| "tool".to_string());
let phase = if outcome.is_success() {
SubagentPhase::Finished
} else {
SubagentPhase::Errored
};
let _ = progress
.send(ProgressEvent::SubagentToolCall {
child_call_id: *call_id,
tool_name,
phase,
})
.await;
},
Msg::StreamText { chunk, .. } => {
if !chunk.trim().is_empty() {
let snippet: String = chunk.chars().take(120).collect();
let _ = progress.send(ProgressEvent::SubagentText(snippet)).await;
}
},
_ => {},
}
}
/// Resolves the function name of the tool call `call_id`.
///
/// Only succeeds while the turn is executing tools and the call is among the
/// listed calls; otherwise returns `None`.
fn lookup_tool_name(state: &State, call_id: crate::domain::ToolCallId) -> Option<String> {
    if let TurnState::ExecutingTools { calls, .. } = &state.turn {
        for call in calls.iter() {
            if call.call_id == call_id {
                return Some(call.source.function.name.clone());
            }
        }
    }
    None
}
/// Assembles the tool registry given to a child agent.
///
/// Registers the filesystem, command-execution and MCP proxy tools, plus the
/// web search/fetch tools when an `OLLAMA_API_KEY` can be resolved. Neither
/// the GUI tools nor the `agent` tool itself are registered here.
fn build_child_registry(providers: Arc<ProviderFactory>) -> Arc<ToolRegistry> {
    use super::computer_use;
    use super::exec::ExecuteCommandTool;
    use super::filesystem::{
        CreateDirectoryTool, DeleteFileTool, EditFileTool, ReadFileTool, WriteFileTool,
    };
    use super::mcp::McpToolProxy;
    use super::web::{WebFetchTool, WebSearchTool};

    let mut registry = ToolRegistry::new();

    // File and directory manipulation.
    registry.register(Arc::new(ReadFileTool));
    registry.register(Arc::new(WriteFileTool));
    registry.register(Arc::new(EditFileTool));
    registry.register(Arc::new(DeleteFileTool));
    registry.register(Arc::new(CreateDirectoryTool));

    // Shell access and MCP passthrough.
    registry.register(Arc::new(ExecuteCommandTool));
    registry.register(Arc::new(McpToolProxy));

    // Web tools are only available when an API key is configured.
    let web_key = crate::utils::resolve_api_key("OLLAMA_API_KEY", None);
    if let Some(key) = web_key {
        registry.register(Arc::new(WebSearchTool::new(key.clone())));
        registry.register(Arc::new(WebFetchTool::new(key)));
    }

    // Deliberate no-op references: they keep `computer_use::probe` and the
    // `providers` argument counted as used without registering anything.
    let _ = computer_use::probe;
    let _ = providers;

    Arc::new(registry)
}
/// Derives the model identifier from the configured default model.
///
/// Yields `provider/name` when both parts are non-empty; otherwise yields the
/// bare name (which may itself be empty).
fn default_model_id(config: &crate::app::Config) -> String {
    let model = &config.default_model;
    if model.provider.is_empty() || model.name.is_empty() {
        return model.name.clone();
    }
    format!("{}/{}", model.provider, model.name)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::{ToolCallId, TurnId};
    use crate::providers::ctx::test_exec_context;
    use std::path::PathBuf;

    /// Provider factory built from the default configuration.
    fn default_providers() -> Arc<ProviderFactory> {
        Arc::new(ProviderFactory::new(crate::app::Config::default()))
    }

    /// A subagent tool backed by a fresh spawner.
    fn new_tool() -> SubagentTool {
        SubagentTool::new(Arc::new(SubagentSpawner::new(default_providers())))
    }

    #[tokio::test]
    async fn depth_cap_rejects_when_at_max() {
        let tool = new_tool();
        let (ctx, _rx) = test_exec_context(TurnId(1), ToolCallId(1), PathBuf::from("/tmp"));
        // Pretend we are already nested as deep as allowed.
        let run = tool.execute(serde_json::json!({"prompt": "hi"}), ctx);
        let outcome = SUBAGENT_DEPTH.scope(MAX_DEPTH, run).await;
        let error = outcome.error_message().expect("expected error");
        assert!(
            error.contains("depth limit"),
            "expected depth-limit error, got: {}",
            error
        );
    }

    #[tokio::test]
    async fn empty_prompt_is_rejected() {
        let tool = new_tool();
        let (ctx, _rx) = test_exec_context(TurnId(1), ToolCallId(1), PathBuf::from("/tmp"));
        let args = serde_json::json!({"prompt": "   "});
        let outcome = tool.execute(args, ctx).await;
        assert_eq!(outcome.status, crate::domain::ToolStatus::Error);
    }

    #[test]
    fn default_model_id_reads_config_provider_and_name() {
        let mut config = crate::app::Config::default();
        config.default_model.provider = "ollama".to_string();
        config.default_model.name = "qwen3-coder:30b".to_string();
        assert_eq!(default_model_id(&config), "ollama/qwen3-coder:30b");
    }

    #[test]
    fn default_model_id_returns_bare_name_when_provider_empty() {
        let mut config = crate::app::Config::default();
        config.default_model.name = "just-a-name".to_string();
        assert_eq!(default_model_id(&config), "just-a-name");
    }

    #[test]
    fn build_child_registry_excludes_gui_and_self() {
        let registry = build_child_registry(default_providers());

        // GUI tools and the agent tool itself must never reach a child.
        let excluded = [
            "screenshot",
            "click",
            "type_text",
            "press_key",
            "scroll",
            "mouse_move",
            "list_windows",
            "agent",
        ];
        for name in excluded.iter().copied() {
            assert!(registry.get(name).is_none());
        }

        assert!(registry.get("read_file").is_some());
        assert!(registry.get("execute_command").is_some());
    }
}