use std::{sync::Arc, time::Duration};
use anyhow::{Result, anyhow, bail};
use serde_json::{Value, json};
use tracing::{info, warn};
use super::{
registry::{AgentMessage, AgentReply},
runtime::{
AgentRuntime, DEFAULT_TIMEOUT_SECONDS, RunContext, persist_agent_to_config,
update_agent_in_config,
},
};
impl AgentRuntime {
fn build_subagent_system_prompt(&self, role_desc: &str) -> String {
let mut prompt = String::from("## Your Role\n");
prompt.push_str(role_desc);
prompt.push_str(
"\n\n## Sub-Agent Guidelines\n\
- You are a sub-agent working on a delegated task. Focus on the task and return results.\n\
- Use the tools available to you. If a tool is not in your toolset, find an alternative.\n\
- Be concise in your reply — the main agent will relay your output to the user.\n\
- If the task is unclear or impossible, explain why instead of looping.",
);
prompt
}
async fn tool_agent_spawn(&self, args: Value) -> Result<Value> {
let spawner = self
.spawner
.as_ref()
.ok_or_else(|| {
anyhow!("agent_spawn: spawner not initialized in this gateway instance; agent spawning is disabled here — do not retry, inform the user")
})?;
let id = args["id"]
.as_str()
.ok_or_else(|| anyhow!("agent_spawn: `id` required"))?
.to_owned();
let primary_model = self.resolve_model_name();
let flash_model = self.resolve_flash_model_name();
let persistent = args["persistent"].as_bool().unwrap_or(true);
let default_model = if persistent {
&primary_model
} else {
&flash_model
};
let model = args["model"]
.as_str()
.filter(|s| !s.is_empty() && *s != "default")
.unwrap_or(default_model)
.to_owned();
let system = args["system"]
.as_str()
.ok_or_else(|| anyhow!("agent_spawn: `system` required"))?
.to_owned();
let toolset_str = args["toolset"].as_str().unwrap_or("standard").to_owned();
let channels: Option<Vec<String>> = args["channels"].as_array().map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_owned()))
.collect()
});
use rsclaw_config::schema::{AgentEntry, ModelConfig};
let entry = AgentEntry {
id: id.clone(),
description: None,
default: Some(false),
workspace: Some(rsclaw_config::loader::path_to_forward_slash(
&rsclaw_config::loader::base_dir().join(format!("workspace-{id}")),
)),
daemon: false,
model: Some(ModelConfig {
primary: Some(model.into()),
fallbacks: None,
image: None,
video: None,
thinking: None,
tools_enabled: None,
toolset: Some(toolset_str.clone()),
tools: None,
plugin_tools: None,
plugin_tools_unpin: None,
user_tools_cap: None,
user_tools_budget: None,
plugin_groups: None,
context_tokens: None,
max_tokens: None,
flash: None,
vision: None,
}),
flash_model: None,
lane: None,
lane_concurrency: None,
group_chat: None,
channels: channels.clone(),
name: None,
agent_dir: None,
system: None,
commands: None,
allowed_commands: None,
opencode: None,
claudecode: None,
codex: None,
temperature: None,
};
let kind = if persistent {
crate::registry::AgentKind::Named
} else {
crate::registry::AgentKind::Sub
};
spawner.spawn_agent_with_kind(entry.clone(), kind)?;
let ws_path = rsclaw_config::loader::base_dir().join(format!("workspace-{id}"));
if let Err(e) = tokio::fs::create_dir_all(&ws_path).await {
warn!("agent_spawn: failed to create workspace for {id}: {e:#}");
}
let soul_path = ws_path.join("SOUL.md");
let full_prompt = self.build_subagent_system_prompt(&system);
if let Err(e) =
tokio::fs::write(&soul_path, format!("# Agent: {id}\n\n{full_prompt}\n")).await
{
warn!("agent_spawn: failed to write SOUL.md for {id}: {e:#}");
}
if persistent {
if let Err(e) = persist_agent_to_config(&entry).await {
warn!("agent_spawn: failed to persist to config: {e:#}");
}
}
let needs_restart = persistent && channels.is_some();
Ok(json!({
"spawned": id,
"model": args["model"],
"persistent": persistent,
"channels": channels,
"needs_restart": needs_restart,
"status": if needs_restart { "saved — restart gateway to bind channels" } else { "ready" }
}))
}
async fn tool_agent_task(&self, ctx: &RunContext, args: Value) -> Result<Value> {
let spawner = self
.spawner
.as_ref()
.ok_or_else(|| {
anyhow!("agent_task: spawner not initialized in this gateway instance; agent tasks are disabled here — do not retry, inform the user")
})?;
let default_task_model = self.resolve_flash_model_name();
let model = args["model"]
.as_str()
.filter(|s| !s.is_empty())
.unwrap_or(&default_task_model)
.to_owned();
let _system = args["system"]
.as_str()
.ok_or_else(|| anyhow!("agent_task: `system` required"))?
.to_owned();
let message = args["message"]
.as_str()
.ok_or_else(|| anyhow!("agent_task: `message` required"))?
.to_owned();
let toolset_str = args["toolset"].as_str().unwrap_or("standard").to_owned();
let short_id = &uuid::Uuid::new_v4().to_string()[..8];
let id = format!("task-{short_id}");
let base = rsclaw_config::loader::base_dir();
let parent_ws = self
.handle
.config
.workspace
.as_deref()
.map(std::path::PathBuf::from)
.unwrap_or_else(|| base.join("workspace"));
let ws_path = parent_ws.join(format!("task-{short_id}"));
use rsclaw_config::schema::{AgentEntry, ModelConfig};
let entry = AgentEntry {
id: id.clone(),
description: None,
default: Some(false),
workspace: Some(rsclaw_config::loader::path_to_forward_slash(&ws_path)),
daemon: false,
model: Some(ModelConfig {
primary: Some(model.into()),
fallbacks: None,
image: None,
video: None,
thinking: None,
tools_enabled: None,
toolset: Some(toolset_str.clone()),
tools: None,
plugin_tools: None,
plugin_tools_unpin: None,
user_tools_cap: None,
user_tools_budget: None,
plugin_groups: None,
context_tokens: None,
max_tokens: None,
flash: None,
vision: None,
}),
flash_model: None,
lane: None,
lane_concurrency: None,
group_chat: None,
channels: None,
name: None,
agent_dir: None,
system: None,
commands: None,
allowed_commands: None,
opencode: None,
claudecode: None,
codex: None,
temperature: None,
};
spawner.spawn_agent(entry)?;
let registry = self
.agents
.as_ref()
.ok_or_else(|| {
anyhow!("agent_task: agent registry not initialized in this gateway instance; agent tasks are disabled here — do not retry, inform the user")
})?;
let target = registry.get(&id)?;
let task_session = format!("{}:task:{short_id}", ctx.session_key);
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<AgentReply>();
let msg = AgentMessage {
session_key: task_session,
text: message.clone(),
channel: format!("task:{}", ctx.agent_id),
peer_id: ctx.agent_id.clone(),
chat_id: String::new(),
reply_tx,
task_id: None,
context_id: None,
event_tx: None,
cancel_token: None,
input_request_tx: None,
extra_tools: vec![],
images: vec![],
files: vec![],
account: None,
};
target
.tx
.send(msg)
.await
.map_err(|_| {
anyhow!("agent_task: task agent '{id}' exited before accepting the message (likely crashed on startup); retry agent action=temp once — if it recurs, report to the user and check gateway logs")
})?;
let pending = Arc::clone(&self.pending_task_results);
let self_handle = Arc::clone(&self.handle);
let notification_tx = self.notification_tx.clone();
let session_key = ctx.session_key.clone();
let channel = ctx.channel.clone();
let peer_id = ctx.peer_id.clone();
let chat_id = ctx.chat_id.clone();
let task_id = id.clone();
let agents = self.agents.as_ref().map(Arc::clone);
let timeout_secs = self
.config
.agents
.defaults
.timeout_seconds
.unwrap_or(DEFAULT_TIMEOUT_SECONDS as u32) as u64;
let task_timeout = timeout_secs.min(300);
tokio::spawn(async move {
let result_text =
match tokio::time::timeout(Duration::from_secs(task_timeout), reply_rx).await {
Ok(Ok(reply)) => reply.text,
Ok(Err(_)) => format!(
"[task {task_id} failed: sub-agent exited without replying (likely crashed mid-run); no result was produced. Retry with agent action=temp, or do the work yourself and tell the user the sub-task failed.]"
),
Err(_) => format!(
"[task {task_id} timed out after {task_timeout}s — no result. Split it into smaller tasks, or ask the user to raise agents.defaults.timeout_seconds.]"
),
};
if let Ok(mut guard) = pending.lock() {
guard.push((task_id.clone(), session_key.clone(), result_text));
}
let (wake_tx, wake_rx) = tokio::sync::oneshot::channel::<AgentReply>();
let wake_msg = AgentMessage {
session_key: session_key.clone(),
text: format!("[async task {task_id} completed]"),
channel: channel.clone(),
peer_id: peer_id.clone(),
chat_id: chat_id.clone(),
reply_tx: wake_tx,
task_id: None,
context_id: None,
event_tx: None,
cancel_token: None,
input_request_tx: None,
extra_tools: vec![],
images: vec![],
files: vec![],
account: None,
};
if let Err(e) = self_handle.tx.send(wake_msg).await {
warn!(task_id, "failed to wake parent agent: {e}");
} else {
if let Ok(reply) = wake_rx.await {
if !reply.text.is_empty() {
if let Some(ref ntx) = notification_tx {
let target = if !chat_id.is_empty() {
chat_id
} else {
peer_id
};
if !target.is_empty()
&& !channel.is_empty()
&& channel != "system"
&& channel != "cron"
{
let body = if channel == "ws" || channel == "desktop" {
rsclaw_channel::outbound_with_kind(
rsclaw_channel::outbound_kind::TASK_COMPLETE,
reply.text,
)
} else {
reply.text
};
let _ = ntx.send(rsclaw_channel::OutboundMessage {
target_id: target,
is_group: false,
text: body,
reply_to: None,
images: reply.images.clone(),
files: reply.files.clone(),
channel: Some(channel),
account: None,
});
}
}
}
info!(task = %task_id, "async task: agent replied to user");
}
}
if let Some(reg) = agents {
reg.remove_handle(&task_id);
}
let _ = tokio::fs::remove_dir_all(&ws_path).await;
info!(task = %task_id, "async task agent completed and cleaned up");
});
Ok(json!({
"task_id": id,
"status": "dispatched",
"toolset": toolset_str,
"message": message,
"note": "Task is running in the background. Results will be available on your next turn. You can continue with other work."
}))
}
async fn tool_agent_send(&self, ctx: &RunContext, args: Value) -> Result<Value> {
let target_id = args["id"]
.as_str()
.ok_or_else(|| anyhow!("agent_send: `id` required"))?
.to_owned();
let message = args["message"]
.as_str()
.ok_or_else(|| anyhow!("agent_send: `message` required"))?
.to_owned();
let registry = self
.agents
.as_ref()
.ok_or_else(|| {
anyhow!("agent_send: agent registry not initialized in this gateway instance; agent messaging is disabled here — do not retry, inform the user")
})?;
let target = registry.get(&target_id)?;
let short_id = &uuid::Uuid::new_v4().to_string()[..8];
let send_session = format!("{}:send:{short_id}", ctx.session_key);
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<AgentReply>();
let msg = AgentMessage {
session_key: send_session,
text: message.clone(),
channel: format!("send:{}", ctx.agent_id),
peer_id: ctx.agent_id.clone(),
chat_id: String::new(),
reply_tx,
task_id: None,
context_id: None,
event_tx: None,
cancel_token: None,
input_request_tx: None,
extra_tools: vec![],
images: vec![],
files: vec![],
account: None,
};
target
.tx
.send(msg)
.await
.map_err(|_| {
anyhow!("agent_send: agent '{target_id}' is no longer accepting messages (it has exited); run agent action=list to see live agents, or recreate it with agent action=create before resending")
})?;
let pending = Arc::clone(&self.pending_task_results);
let self_handle = Arc::clone(&self.handle);
let notification_tx = self.notification_tx.clone();
let session_key = ctx.session_key.clone();
let channel = ctx.channel.clone();
let peer_id = ctx.peer_id.clone();
let chat_id = ctx.chat_id.clone();
let timeout_secs = self
.config
.agents
.defaults
.timeout_seconds
.unwrap_or(DEFAULT_TIMEOUT_SECONDS as u32) as u64;
let send_timeout = timeout_secs.min(300);
let send_id = format!("send-{target_id}-{short_id}");
let send_id_bg = send_id.clone();
let target_id_bg = target_id.clone();
tokio::spawn(async move {
let result_text =
match tokio::time::timeout(Duration::from_secs(send_timeout), reply_rx).await {
Ok(Ok(reply)) => reply.text,
Ok(Err(_)) => format!(
"[agent {target_id_bg} exited before replying — the message may not have been processed. Run agent action=list to check if it is still alive; respawn and resend if needed.]"
),
Err(_) => format!(
"[agent {target_id_bg} timed out after {send_timeout}s — no result. Split the request into smaller messages, or ask the user to raise agents.defaults.timeout_seconds.]"
),
};
if let Ok(mut guard) = pending.lock() {
guard.push((send_id_bg.clone(), session_key.clone(), result_text));
}
let (wake_tx, wake_rx) = tokio::sync::oneshot::channel::<AgentReply>();
let wake_msg = AgentMessage {
session_key: session_key.clone(),
text: format!("[async send {send_id_bg} completed]"),
channel: channel.clone(),
peer_id: peer_id.clone(),
chat_id: chat_id.clone(),
reply_tx: wake_tx,
task_id: None,
context_id: None,
event_tx: None,
cancel_token: None,
input_request_tx: None,
extra_tools: vec![],
images: vec![],
files: vec![],
account: None,
};
if let Err(e) = self_handle.tx.send(wake_msg).await {
warn!(send_id = %send_id_bg, "failed to wake parent agent: {e}");
} else if let Ok(reply) = wake_rx.await {
if !reply.text.is_empty() {
if let Some(ref ntx) = notification_tx {
let target = if !chat_id.is_empty() {
chat_id
} else {
peer_id
};
if !target.is_empty()
&& !channel.is_empty()
&& channel != "system"
&& channel != "cron"
{
let body = if channel == "ws" || channel == "desktop" {
rsclaw_channel::outbound_with_kind(
rsclaw_channel::outbound_kind::ASYNC_SEND,
reply.text,
)
} else {
reply.text
};
let _ = ntx.send(rsclaw_channel::OutboundMessage {
target_id: target,
is_group: false,
text: body,
reply_to: None,
images: reply.images.clone(),
files: reply.files.clone(),
channel: Some(channel),
account: None,
});
}
}
info!(send_id = %send_id_bg, "async send: agent replied to user");
}
}
});
Ok(json!({
"send_id": send_id,
"target": target_id,
"status": "sent",
"note": "Message sent to agent. Reply will be available on your next turn."
}))
}
async fn tool_agent_update(&self, args: Value) -> Result<Value> {
let id = args["id"]
.as_str()
.ok_or_else(|| anyhow!("agent_update: `id` required"))?;
let model = args["model"].as_str();
let name = args["name"].as_str();
update_agent_in_config(id, model, name).await
}
async fn tool_agent_list(&self) -> Result<Value> {
let agents = match &self.agents {
Some(reg) => reg
.all()
.iter()
.map(|h| {
json!({
"id": h.id,
"model": h.config.model.as_ref()
.and_then(|m| m.primary_head())
.unwrap_or("unknown"),
})
})
.collect::<Vec<_>>(),
None => vec![],
};
Ok(json!({"agents": agents}))
}
pub(crate) async fn tool_agent_consolidated(
&self,
ctx: &RunContext,
args: Value,
) -> Result<Value> {
let action = args["action"].as_str().unwrap_or("list").trim();
match action {
"create" => self.tool_agent_spawn(args).await,
"temp" => self.tool_agent_task(ctx, args).await,
"send" => self.tool_agent_send(ctx, args).await,
"list" => self.tool_agent_list().await,
"update" => self.tool_agent_update(args).await,
"kill" => {
let id = args["id"]
.as_str()
.ok_or_else(|| anyhow!("agent kill: `id` required"))?;
Ok(json!({
"action": "kill",
"id": id,
"note": "agent termination not yet implemented; agent will stop on next idle timeout"
}))
}
_ => bail!("agent: unknown action '{action}' (spawn, dispatch, send, list, update, kill)"),
}
}
}