use std::sync::Arc;
use std::time::Duration;
use anyhow::{Result, anyhow, bail};
use serde_json::{Value, json};
use tracing::{info, warn};
use super::prompt_builder::build_base_system_prompt;
use super::registry::{AgentMessage, AgentReply};
use super::runtime::{persist_agent_to_config, update_agent_in_config, AgentRuntime, RunContext, DEFAULT_TIMEOUT_SECONDS};
impl AgentRuntime {
/// Compose the full system prompt for a sub-agent: the shared base prompt
/// sections, the caller-supplied role description, and a fixed block of
/// sub-agent guidelines.
fn build_subagent_system_prompt(&self, role_desc: &str) -> String {
    let base = build_base_system_prompt(&self.config.raw).join("\n\n");
    format!(
        "{base}\n\n## Your Role\n{role_desc}\n\n## Sub-Agent Guidelines\n\
         - You are a sub-agent working on a delegated task. Focus on the task and return results.\n\
         - Use the tools available to you. If a tool is not in your toolset, find an alternative.\n\
         - Be concise in your reply — the main agent will relay your output to the user.\n\
         - If the task is unclear or impossible, explain why instead of looping."
    )
}
/// `agent: spawn` — create a new agent. Persistent agents (the default) get a
/// dedicated workspace directory, a SOUL.md system prompt, and a persisted
/// config entry; non-persistent sub-agents live only in the in-memory
/// registry.
///
/// Args: `id` (required), `system` (required role description), `model`
/// (optional — empty or "default" selects by persistence), `toolset`
/// (default "standard"), `persistent` (default true), `channels` (optional).
async fn tool_agent_spawn(&self, args: Value) -> Result<Value> {
    let spawner = self
        .spawner
        .as_ref()
        .ok_or_else(|| anyhow!("agent_spawn: spawner not available"))?;
    let id = args["id"]
        .as_str()
        .ok_or_else(|| anyhow!("agent_spawn: `id` required"))?
        .to_owned();
    let primary_model = self.resolve_model_name();
    let flash_model = self.resolve_flash_model_name();
    let persistent = args["persistent"].as_bool().unwrap_or(true);
    // Persistent agents default to the primary model; throwaway sub-agents
    // default to the cheaper flash model.
    let default_model = if persistent { &primary_model } else { &flash_model };
    let model = args["model"].as_str()
        .filter(|s| !s.is_empty() && *s != "default")
        .unwrap_or(default_model)
        .to_owned();
    let system = args["system"]
        .as_str()
        .ok_or_else(|| anyhow!("agent_spawn: `system` required"))?
        .to_owned();
    let toolset_str = args["toolset"]
        .as_str()
        .unwrap_or("standard")
        .to_owned();
    // Optional channel bindings; non-string entries are silently skipped.
    let channels: Option<Vec<String>> = args["channels"]
        .as_array()
        .map(|arr| arr.iter().filter_map(|v| v.as_str().map(|s| s.to_owned())).collect());
    use crate::config::schema::{AgentEntry, ModelConfig};
    let entry = AgentEntry {
        id: id.clone(),
        default: Some(false),
        workspace: Some(crate::config::loader::path_to_forward_slash(
            &crate::config::loader::base_dir().join(format!("workspace-{id}")),
        )),
        model: Some(ModelConfig {
            // Clone so the resolved model can also be reported in the reply.
            primary: Some(model.clone()),
            fallbacks: None,
            image: None,
            image_fallbacks: None,
            video: None,
            thinking: None,
            tools_enabled: None,
            toolset: Some(toolset_str.clone()),
            tools: None,
            context_tokens: None,
            max_tokens: None,
            flash: None,
        }),
        flash_model: None,
        lane: None,
        lane_concurrency: None,
        group_chat: None,
        channels: channels.clone(),
        name: None,
        agent_dir: None,
        system: None,
        commands: None,
        allowed_commands: None,
        opencode: None,
        claudecode: None,
    };
    let kind = if persistent {
        crate::agent::registry::AgentKind::Named
    } else {
        crate::agent::registry::AgentKind::Sub
    };
    spawner.spawn_agent_with_kind(entry.clone(), kind)?;
    if persistent {
        // Best-effort setup: workspace, SOUL.md, and config persistence each
        // log a warning on failure but do not abort the spawn.
        let ws_path = crate::config::loader::base_dir().join(format!("workspace-{id}"));
        if let Err(e) = tokio::fs::create_dir_all(&ws_path).await {
            warn!("agent_spawn: failed to create workspace for {id}: {e:#}");
        }
        let soul_path = ws_path.join("SOUL.md");
        let full_prompt = self.build_subagent_system_prompt(&system);
        if let Err(e) = tokio::fs::write(&soul_path, format!("# Agent: {id}\n\n{full_prompt}\n")).await {
            warn!("agent_spawn: failed to write SOUL.md for {id}: {e:#}");
        }
        if let Err(e) = persist_agent_to_config(&entry).await {
            warn!("agent_spawn: failed to persist to config: {e:#}");
        }
    }
    // Channel bindings only take effect after a gateway restart.
    let needs_restart = persistent && channels.is_some();
    Ok(json!({
        "spawned": id,
        // BUGFIX: report the resolved model actually assigned to the agent,
        // not the raw `args["model"]` (which may be null or "default").
        "model": model,
        "persistent": persistent,
        "channels": channels,
        "needs_restart": needs_restart,
        "status": if needs_restart { "saved — restart gateway to bind channels" } else { "ready" }
    }))
}
/// `agent: task` — spawn a throwaway task agent with its own workspace,
/// dispatch one message to it, and return immediately. The task runs in the
/// background: its result is queued in `pending_task_results` and this agent
/// is woken with a synthetic "[async task ... completed]" message. The task
/// agent and its workspace are removed when it finishes or times out.
async fn tool_agent_task(&self, ctx: &RunContext, args: Value) -> Result<Value> {
    let spawner = self
        .spawner
        .as_ref()
        .ok_or_else(|| anyhow!("agent_task: spawner not available"))?;
    // Tasks default to the cheaper flash model unless a model is given.
    let default_task_model = self.resolve_flash_model_name();
    let model = args["model"]
        .as_str()
        .filter(|s| !s.is_empty())
        .unwrap_or(&default_task_model)
        .to_owned();
    let system = args["system"]
        .as_str()
        .ok_or_else(|| anyhow!("agent_task: `system` required"))?
        .to_owned();
    let message = args["message"]
        .as_str()
        .ok_or_else(|| anyhow!("agent_task: `message` required"))?
        .to_owned();
    let toolset_str = args["toolset"]
        .as_str()
        .unwrap_or("standard")
        .to_owned();
    // Short random suffix makes the task id, session key, and workspace unique.
    let short_id = &uuid::Uuid::new_v4().to_string()[..8];
    let id = format!("task-{short_id}");
    let base = crate::config::loader::base_dir();
    // The task workspace nests under the parent agent's workspace.
    let parent_ws = self.handle.config.workspace
        .as_deref()
        .map(std::path::PathBuf::from)
        .unwrap_or_else(|| base.join("workspace"));
    let ws_path = parent_ws.join(format!("task-{short_id}"));
    use crate::config::schema::{AgentEntry, ModelConfig};
    let entry = AgentEntry {
        id: id.clone(),
        default: Some(false),
        workspace: Some(crate::config::loader::path_to_forward_slash(&ws_path)),
        model: Some(ModelConfig {
            primary: Some(model),
            fallbacks: None,
            image: None,
            image_fallbacks: None,
            video: None,
            thinking: None,
            tools_enabled: None,
            toolset: Some(toolset_str.clone()),
            tools: None,
            context_tokens: None,
            max_tokens: None,
            flash: None,
        }),
        flash_model: None,
        lane: None,
        lane_concurrency: None,
        group_chat: None,
        channels: None,
        name: None,
        agent_dir: None,
        // BUGFIX: the required `system` argument was previously dropped
        // (`system: None`), so task agents never received the role prompt
        // the caller supplied.
        system: Some(system),
        commands: None,
        allowed_commands: None,
        opencode: None,
        claudecode: None,
    };
    spawner.spawn_agent_with_kind(entry, crate::agent::registry::AgentKind::Task)?;
    let registry = self
        .agents
        .as_ref()
        .ok_or_else(|| anyhow!("agent_task: agent registry not available"))?;
    let target = registry.get(&id)?;
    // Derived session key isolates the task's conversation from the parent's.
    let task_session = format!("{}:task:{short_id}", ctx.session_key);
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<AgentReply>();
    let msg = AgentMessage {
        session_key: task_session,
        text: message.clone(),
        channel: format!("task:{}", ctx.agent_id),
        peer_id: ctx.agent_id.clone(),
        chat_id: String::new(),
        reply_tx,
        extra_tools: vec![],
        images: vec![],
        files: vec![],
    };
    target.tx.send(msg).await.map_err(|_| anyhow!("agent_task: agent inbox closed"))?;
    // Clone everything the detached waiter needs; `self` is not captured.
    let pending = Arc::clone(&self.pending_task_results);
    let self_handle = Arc::clone(&self.handle);
    let notification_tx = self.notification_tx.clone();
    let session_key = ctx.session_key.clone();
    let channel = ctx.channel.clone();
    let peer_id = ctx.peer_id.clone();
    let chat_id = ctx.chat_id.clone();
    let task_id = id.clone();
    let agents = self.agents.as_ref().map(Arc::clone);
    let timeout_secs = self
        .config
        .agents
        .defaults
        .timeout_seconds
        .unwrap_or(DEFAULT_TIMEOUT_SECONDS as u32) as u64;
    let task_timeout = timeout_secs;
    tokio::spawn(async move {
        // Wait for the task's reply; on timeout, kill the task agent and
        // substitute a descriptive placeholder result.
        let result_text = match tokio::time::timeout(
            Duration::from_secs(task_timeout),
            reply_rx,
        ).await {
            Ok(Ok(reply)) => reply.text,
            Ok(Err(_)) => "[task agent channel closed unexpectedly]".to_owned(),
            Err(_) => {
                if let Some(ref reg) = agents {
                    reg.remove_handle(&task_id);
                }
                info!(task = %task_id, timeout = task_timeout, "task agent timed out and killed");
                format!("[task {task_id} timed out after {task_timeout}s and was terminated]")
            }
        };
        // Stash the result so the parent agent can pick it up next turn.
        if let Ok(mut guard) = pending.lock() {
            guard.push((task_id.clone(), session_key.clone(), result_text));
        }
        // Wake the parent agent so it processes the pending result now.
        let (wake_tx, wake_rx) = tokio::sync::oneshot::channel::<AgentReply>();
        let wake_msg = AgentMessage {
            session_key: session_key.clone(),
            text: format!("[async task {task_id} completed]"),
            channel: channel.clone(),
            peer_id: peer_id.clone(),
            chat_id: chat_id.clone(),
            reply_tx: wake_tx,
            extra_tools: vec![],
            images: vec![],
            files: vec![],
        };
        if let Err(e) = self_handle.tx.send(wake_msg).await {
            warn!(task_id, "failed to wake parent agent: {e}");
        } else {
            // If the parent produced user-visible text, forward it out — but
            // only on real user channels (not system/cron internals).
            if let Ok(reply) = wake_rx.await {
                if !reply.text.is_empty() {
                    if let Some(ref ntx) = notification_tx {
                        // Prefer the group chat id; fall back to the direct peer.
                        let target = if !chat_id.is_empty() { chat_id } else { peer_id };
                        if !target.is_empty() && !channel.is_empty() && channel != "system" && channel != "cron" {
                            let _ = ntx.send(crate::channel::OutboundMessage {
                                target_id: target,
                                is_group: false,
                                text: reply.text,
                                reply_to: None,
                                images: reply.images.clone(),
                                files: reply.files.clone(),
                                channel: Some(channel),
                            });
                        }
                    }
                }
                info!(task = %task_id, "async task: agent replied to user");
            }
        }
        // Tear down: unregister the task agent and delete its workspace.
        if let Some(reg) = agents {
            reg.remove_handle(&task_id);
        }
        let _ = tokio::fs::remove_dir_all(&ws_path).await;
        info!(task = %task_id, "async task agent completed and cleaned up");
    });
    Ok(json!({
        "task_id": id,
        "status": "dispatched",
        "toolset": toolset_str,
        "message": message,
        "note": "Task is running in the background. Results will be available on your next turn. You can continue with other work."
    }))
}
/// `agent: send` — deliver a message to another registered agent and return
/// immediately; the reply arrives asynchronously on a later turn.
///
/// Flow: depth-check the session key to stop agent-to-agent loops, push the
/// message into the target agent's inbox, then spawn a background waiter that
/// (with timeout) collects the reply, stashes it in `pending_task_results`,
/// and wakes this agent with a synthetic "[async send ... completed]" message.
async fn tool_agent_send(&self, ctx: &RunContext, args: Value) -> Result<Value> {
    let target_id = args["id"]
        .as_str()
        .ok_or_else(|| anyhow!("agent_send: `id` required"))?
        .to_owned();
    let message = args["message"]
        .as_str()
        .ok_or_else(|| anyhow!("agent_send: `message` required"))?
        .to_owned();
    // Each hop appends ":send:<id>" to the session key, so counting ":send:"
    // occurrences gives the forwarding depth; cap at 5 to break loops.
    let send_depth = ctx.session_key.matches(":send:").count();
    if send_depth >= 5 {
        return Ok(json!({
            "error": "agent_send: max communication depth reached (5). Possible loop detected.",
            "depth": send_depth,
            "from": ctx.agent_id,
            "to": target_id,
        }));
    }
    let registry = self
        .agents
        .as_ref()
        .ok_or_else(|| anyhow!("agent_send: agent registry not available"))?;
    let target = registry.get(&target_id)?;
    // Short random suffix keeps the derived session key unique per send.
    let short_id = &uuid::Uuid::new_v4().to_string()[..8];
    let send_session = format!("{}:send:{short_id}", ctx.session_key);
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<AgentReply>();
    let msg = AgentMessage {
        session_key: send_session,
        text: message.clone(),
        channel: format!("send:{}", ctx.agent_id),
        peer_id: ctx.agent_id.clone(),
        chat_id: String::new(),
        reply_tx,
        extra_tools: vec![],
        images: vec![],
        files: vec![],
    };
    target.tx.send(msg).await.map_err(|_| anyhow!("agent_send: agent '{target_id}' inbox closed"))?;
    // Clone everything the detached waiter needs; `self` is not captured.
    let pending = Arc::clone(&self.pending_task_results);
    let self_handle = Arc::clone(&self.handle);
    let notification_tx = self.notification_tx.clone();
    let session_key = ctx.session_key.clone();
    let channel = ctx.channel.clone();
    let peer_id = ctx.peer_id.clone();
    let chat_id = ctx.chat_id.clone();
    let timeout_secs = self
        .config
        .agents
        .defaults
        .timeout_seconds
        .unwrap_or(DEFAULT_TIMEOUT_SECONDS as u32) as u64;
    let send_timeout = timeout_secs;
    let send_id = format!("send-{target_id}-{short_id}");
    let send_id_bg = send_id.clone();
    let target_id_bg = target_id.clone();
    tokio::spawn(async move {
        // Wait for the target agent's reply, degrading to a descriptive
        // placeholder on channel close or timeout (the target agent itself
        // is NOT killed on timeout here, unlike tool_agent_task).
        let result_text = match tokio::time::timeout(
            Duration::from_secs(send_timeout),
            reply_rx,
        ).await {
            Ok(Ok(reply)) => reply.text,
            Ok(Err(_)) => format!("[agent {target_id_bg} channel closed]"),
            Err(_) => format!("[agent {target_id_bg} timed out after {send_timeout}s]"),
        };
        // Stash the result so the parent agent can pick it up next turn.
        if let Ok(mut guard) = pending.lock() {
            guard.push((send_id_bg.clone(), session_key.clone(), result_text));
        }
        // Wake the parent agent so it processes the pending result now.
        let (wake_tx, wake_rx) = tokio::sync::oneshot::channel::<AgentReply>();
        let wake_msg = AgentMessage {
            session_key: session_key.clone(),
            text: format!("[async send {send_id_bg} completed]"),
            channel: channel.clone(),
            peer_id: peer_id.clone(),
            chat_id: chat_id.clone(),
            reply_tx: wake_tx,
            extra_tools: vec![],
            images: vec![],
            files: vec![],
        };
        if let Err(e) = self_handle.tx.send(wake_msg).await {
            warn!(send_id = %send_id_bg, "failed to wake parent agent: {e}");
        } else if let Ok(reply) = wake_rx.await {
            // If the parent produced user-visible text, forward it out — but
            // only on real user channels (not system/cron internals).
            if !reply.text.is_empty() {
                if let Some(ref ntx) = notification_tx {
                    // Prefer the group chat id; fall back to the direct peer.
                    let target = if !chat_id.is_empty() { chat_id } else { peer_id };
                    if !target.is_empty() && !channel.is_empty() && channel != "system" && channel != "cron" {
                        // NOTE(review): `is_group` is hard-coded false even when a
                        // chat_id is present — confirm group replies route correctly.
                        let _ = ntx.send(crate::channel::OutboundMessage {
                            target_id: target,
                            is_group: false,
                            text: reply.text,
                            reply_to: None,
                            images: reply.images.clone(),
                            files: reply.files.clone(),
                            channel: Some(channel),
                        });
                    }
                }
                info!(send_id = %send_id_bg, "async send: agent replied to user");
            }
        }
    });
    Ok(json!({
        "send_id": send_id,
        "target": target_id,
        "status": "sent",
        "note": "Message sent to agent. Reply will be available on your next turn."
    }))
}
async fn tool_agent_update(&self, args: Value) -> Result<Value> {
let id = args["id"]
.as_str()
.ok_or_else(|| anyhow!("agent_update: `id` required"))?;
let model = args["model"].as_str();
let name = args["name"].as_str();
update_agent_in_config(id, model, name).await
}
/// `agent: list` — enumerate every registered agent with its kind and
/// configured primary model ("unknown" when no model is configured).
async fn tool_agent_list(&self) -> Result<Value> {
    let mut agents: Vec<Value> = Vec::new();
    if let Some(reg) = &self.agents {
        for h in reg.all().iter() {
            let model = h
                .config
                .model
                .as_ref()
                .and_then(|m| m.primary.as_deref())
                .unwrap_or("unknown");
            agents.push(json!({
                "id": h.id,
                "kind": h.kind,
                "model": model,
            }));
        }
    }
    Ok(json!({"agents": agents}))
}
/// Dispatcher for the consolidated `agent` tool. Routes `action` (default
/// "list") to the specific handler after enforcing per-kind permissions:
/// task agents may only `list`; sub agents may only `task` and `list`.
pub(crate) async fn tool_agent_consolidated(&self, ctx: &RunContext, args: Value) -> Result<Value> {
    let action = args["action"].as_str().unwrap_or("list");
    let kind = self.handle.kind;
    match (kind, action) {
        (crate::agent::registry::AgentKind::Task, a) if a != "list" => {
            return Ok(json!({"error": "task agents cannot manage other agents"}));
        }
        // BUGFIX: "update" was previously missing from this gate, so sub
        // agents could rewrite persisted agent config even though the policy
        // (and this error message) say they may only use task and list.
        (crate::agent::registry::AgentKind::Sub, "spawn" | "send" | "kill" | "update") => {
            return Ok(json!({"error": format!("sub agents can only use task and list, not {action}")}));
        }
        _ => {}
    }
    match action {
        "spawn" => self.tool_agent_spawn(args).await,
        "task" => self.tool_agent_task(ctx, args).await,
        "send" => self.tool_agent_send(ctx, args).await,
        "list" => self.tool_agent_list().await,
        "update" => self.tool_agent_update(args).await,
        "kill" => {
            let id = args["id"]
                .as_str()
                .ok_or_else(|| anyhow!("agent kill: `id` required"))?;
            // Killing the main agent would take down the whole runtime.
            if let Some(reg) = &self.agents {
                if let Ok(handle) = reg.get(id) {
                    if handle.kind == crate::agent::registry::AgentKind::Main {
                        return Ok(json!({"error": "cannot kill the main agent"}));
                    }
                }
            }
            Ok(json!({
                "action": "kill",
                "id": id,
                "note": "agent termination not yet implemented; agent will stop on next idle timeout"
            }))
        }
        _ => bail!("agent: unknown action '{action}' (spawn, task, send, list, update, kill)"),
    }
}
}