#[allow(unused_imports)]
use crate::sync_util::LockExt;
use rig::completion::ToolDefinition;
use rig::tool::Tool;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Mutex;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::agent::agent_loop::tool::AbortSignal;
use crate::agent::tools::background::{BackgroundStore, TaskState};
use crate::agent::tools::{AskSender, PermCheck, ToolError, check_perm};
use crate::provider::AnyModel;
/// Event describing the lifecycle or output of one subagent, carried over the
/// subagent chat channel. Every variant is keyed by the subagent's `id`.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum SubagentChatEvent {
/// Emitted by `TaskTool` right before a subagent query starts, with its prompt.
Spawn { id: String, prompt: String },
/// The subagent finished; `result` is its full answer text.
Complete { id: String, result: String },
/// The subagent query returned an error, or a background run timed out.
Failed { id: String, error: String },
/// Answer text. `TaskTool` sends the whole answer as one `Token` just before
/// the matching `Complete`.
Token { id: String, text: String },
/// Reasoning text. Not emitted by the code visible in this file.
Reasoning { id: String, text: String },
/// A tool invocation by the subagent. Not emitted by the code visible in this file.
ToolCall {
id: String,
tool_name: String,
args_summary: String,
},
/// The result of a tool invocation. Not emitted by the code visible in this file.
ToolResult {
id: String,
tool_name: String,
output_summary: String,
},
/// The subagent stopped because its abort signal was cancelled.
Aborted { id: String },
}
/// Capacity for the bounded subagent chat channel. Producers in this file use
/// `try_send` and ignore the result, so events beyond this backlog are dropped
/// instead of blocking the subagent.
pub const SUBAGENT_CHAT_CAP: usize = 1024;
/// Sending half of the subagent chat channel.
pub type SubagentChatSender = mpsc::Sender<SubagentChatEvent>;
/// Receiving half of the subagent chat channel.
#[allow(dead_code)]
pub type SubagentChatReceiver = mpsc::Receiver<SubagentChatEvent>;
/// Process-wide chat sink, used when a `TaskTool` has no sink of its own.
/// Being a `OnceLock`, it can be set at most once.
static SUBAGENT_CHAT_SINK: std::sync::OnceLock<SubagentChatSender> = std::sync::OnceLock::new();
/// Installs the process-wide subagent chat sink.
///
/// Only the first call takes effect; later calls keep the existing sink and
/// log at debug level.
pub fn set_subagent_chat_sink(sink: SubagentChatSender) {
    let Err(_rejected) = SUBAGENT_CHAT_SINK.set(sink) else {
        return;
    };
    tracing::debug!("subagent chat sink already set; ignoring re-set");
}
/// Returns a clone of the process-wide chat sink, or `None` if none was installed.
pub fn subagent_chat_sink() -> Option<SubagentChatSender> {
    let installed = SUBAGENT_CHAT_SINK.get();
    installed.map(Clone::clone)
}
/// Abort signals of the subagents currently registered, keyed by subagent id.
static SUBAGENT_ABORT_REGISTRY: std::sync::OnceLock<Mutex<HashMap<String, AbortSignal>>> =
    std::sync::OnceLock::new();

/// Returns the abort registry, creating the empty map on first use.
fn abort_registry() -> &'static Mutex<HashMap<String, AbortSignal>> {
    SUBAGENT_ABORT_REGISTRY.get_or_init(Mutex::default)
}
/// Stores `signal` under `id` so `kill_subagent` can cancel that subagent.
///
/// An entry already stored under the same id is replaced.
pub fn register_subagent_abort(id: &str, signal: AbortSignal) {
    abort_registry()
        .lock_ignore_poison()
        .insert(id.to_owned(), signal);
}
/// Removes the abort signal stored under `id`; does nothing if there is none.
pub fn unregister_subagent_abort(id: &str) {
    abort_registry().lock_ignore_poison().remove(id);
}
/// Overrides applied when a subagent is run under a named agent profile.
#[derive(Clone)]
pub struct SubagentRoute {
/// Model to run the subagent on; when `None`, `TaskTool` uses its own model.
pub model: Option<AnyModel>,
/// Preamble passed to `btw_query_with` for this profile, if any.
pub preamble: Option<String>,
}
/// Agent profiles available to subagents, keyed by profile name. Set at most once.
static SUBAGENT_ROUTES: std::sync::OnceLock<HashMap<String, SubagentRoute>> =
    std::sync::OnceLock::new();

/// Installs the table of agent profiles.
///
/// Only the first call takes effect; later calls keep the existing table and
/// log at debug level.
pub fn set_subagent_routes(routes: HashMap<String, SubagentRoute>) {
    let Err(_rejected) = SUBAGENT_ROUTES.set(routes) else {
        return;
    };
    tracing::debug!("subagent routes already set; ignoring re-set");
}
/// Looks up the profile called `name`; `None` if no table is installed or the
/// name is unknown.
fn subagent_route(name: &str) -> Option<SubagentRoute> {
    SUBAGENT_ROUTES
        .get()
        .and_then(|routes| routes.get(name))
        .cloned()
}
/// Reports whether a profile table is installed and contains at least one profile.
fn subagent_routes_available() -> bool {
    matches!(SUBAGENT_ROUTES.get(), Some(routes) if !routes.is_empty())
}
/// Returns the installed profile names in ascending order (empty when no table
/// is installed).
fn subagent_route_names() -> Vec<String> {
    let mut names: Vec<String> = SUBAGENT_ROUTES
        .get()
        .into_iter()
        .flat_map(|routes| routes.keys().cloned())
        .collect();
    names.sort();
    names
}
/// Result of a `kill_subagent` request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KillOutcome {
/// The input was blank, or no registered id equals or starts with it.
NotFound,
/// Several registered ids start with the input; carries those ids. Nothing was cancelled.
Ambiguous(Vec<String>),
/// Exactly one subagent was selected and its abort signal cancelled; carries its full id.
Killed(String),
}
pub fn kill_subagent(id_prefix: &str) -> KillOutcome {
let trimmed = id_prefix.trim();
if trimmed.is_empty() {
return KillOutcome::NotFound;
}
let map = abort_registry().lock_ignore_poison();
if let Some(sig) = map.get(trimmed) {
sig.cancel();
return KillOutcome::Killed(trimmed.to_string());
}
let matches: Vec<String> = map
.keys()
.filter(|k| k.starts_with(trimmed))
.cloned()
.collect();
match matches.len() {
0 => KillOutcome::NotFound,
1 => {
let id = matches.into_iter().next().unwrap();
if let Some(sig) = map.get(&id) {
sig.cancel();
}
KillOutcome::Killed(id)
}
_ => KillOutcome::Ambiguous(matches),
}
}
/// Returns the ids of all subagents currently in the abort registry, in no
/// particular order.
#[allow(dead_code)]
pub fn registered_subagent_ids() -> Vec<String> {
    let registry = abort_registry().lock_ignore_poison();
    let mut ids = Vec::with_capacity(registry.len());
    ids.extend(registry.keys().cloned());
    ids
}
/// Test-only helper: empties the abort registry.
#[cfg(test)]
pub fn clear_abort_registry_for_test() {
    abort_registry().lock_ignore_poison().clear();
}
/// Tool that runs a one-shot subagent query, either inline or as a background task.
pub struct TaskTool {
/// Permission check passed to `check_perm` before every call.
pub permission: Option<PermCheck>,
/// Ask channel passed to `check_perm` together with `permission`.
pub ask_tx: Option<AskSender>,
/// Default model, used when the requested agent profile does not supply one.
model: AnyModel,
/// Store that tracks background subagent tasks and receives their final state.
bg_store: BackgroundStore,
/// Per-tool chat sink; when `None`, events go to the process-wide sink if one is set.
chat_sink: Option<SubagentChatSender>,
}
impl TaskTool {
pub fn new(
permission: Option<PermCheck>,
ask_tx: Option<AskSender>,
model: AnyModel,
bg_store: BackgroundStore,
) -> Self {
Self {
permission,
ask_tx,
model,
bg_store,
chat_sink: None,
}
}
#[allow(dead_code)]
pub fn with_chat_sink(mut self, sink: SubagentChatSender) -> Self {
self.chat_sink = Some(sink);
self
}
fn emit_chat(&self, event: SubagentChatEvent) {
if let Some(sink) = &self.chat_sink {
let _ = sink.try_send(event);
return;
}
if let Some(sink) = subagent_chat_sink() {
let _ = sink.try_send(event);
}
}
}
/// Arguments accepted by the `task` tool.
#[derive(Deserialize)]
pub struct Args {
/// Task description handed to the subagent.
pub prompt: String,
/// Run as a background task when `Some(true)`; missing or `false` runs inline.
#[serde(default)]
pub background: Option<bool>,
/// Name of the agent profile to run under; `None` uses the tool's default model
/// and no preamble.
#[serde(default)]
pub agent: Option<String>,
}
impl Tool for TaskTool {
const NAME: &'static str = "task";
type Error = ToolError;
type Args = Args;
type Output = String;
async fn definition(&self, _prompt: String) -> ToolDefinition {
let mut description = "Spawn a subagent to handle a specific subtask. The subagent runs as a one-shot query (no tools) and returns its result inline. Use for research, analysis, or planning subtasks that don't require file access. Set background=true to run asynchronously — completion is delivered to you automatically as a <system-reminder> at the start of your next turn. Do NOT poll task_status in a loop or sleep waiting; continue with other work."
.to_string();
let mut properties = serde_json::json!({
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Task description for the subagent"
},
"background": {
"type": "boolean",
"description": "Run asynchronously (default: false). When true, returns a task_id immediately. The result is delivered automatically as a <system-reminder> on your next turn — do NOT poll task_status."
}
},
"required": ["prompt"]
});
let names = subagent_route_names();
if !names.is_empty() {
description.push_str(&format!(
" Optionally set agent=<name> to run the subagent under a defined profile (its own model + system prompt). Available profiles: {}.",
names.join(", ")
));
if let Some(props) = properties
.get_mut("properties")
.and_then(|p| p.as_object_mut())
{
props.insert(
"agent".to_string(),
serde_json::json!({
"type": "string",
"enum": names,
"description": "Agent profile to run this subagent under (model + system prompt). Omit for the default subagent."
}),
);
}
}
ToolDefinition {
name: "task".to_string(),
description,
parameters: properties,
}
}
async fn call(&self, args: Args) -> Result<String, ToolError> {
check_perm(&self.permission, &self.ask_tx, "task", &args.prompt).await?;
let (route_model, route_preamble) = match args.agent.as_deref() {
None => (None, None),
Some(name) => {
if !subagent_routes_available() {
return Err(ToolError::Msg(format!(
"agent profile '{}' requested but no profiles are defined. Add .dirge/agents/<name>.md or a config.json \"agents\" entry, or omit `agent`.",
name
)));
}
match subagent_route(name) {
Some(r) => (r.model, r.preamble),
None => {
return Err(ToolError::Msg(format!(
"unknown agent profile '{}'. Available: {}.",
name,
subagent_route_names().join(", ")
)));
}
}
}
};
let model = route_model.unwrap_or_else(|| self.model.clone());
let background = args.background.unwrap_or(false);
if background {
let running = self.bg_store.running_count();
let cap = BackgroundStore::max_concurrent();
if running >= cap {
return Err(ToolError::Msg(format!(
"background subagent cap reached ({}/{} in flight). Wait for one to finish (use task_status) or run inline (background=false). Capping prevents fan-out from burning the API budget.",
running, cap,
)));
}
let task_id = Uuid::new_v4().to_string();
self.bg_store.insert(task_id.clone());
self.bg_store.notify_started(&task_id);
self.emit_chat(SubagentChatEvent::Spawn {
id: task_id.clone(),
prompt: args.prompt.clone(),
});
let abort = AbortSignal::new();
register_subagent_abort(&task_id, abort.clone());
let prompt = args.prompt;
let store = self.bg_store.clone();
let tid = task_id.clone();
let chat_sink = self.chat_sink.clone();
let abort_for_task = abort.clone();
let preamble_for_task = route_preamble.clone();
const SUBAGENT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(600);
let store_for_task = store.clone();
let tid_for_task = tid.clone();
let handle = tokio::spawn(async move {
let fut = model.btw_query_with(
format!(
"You are a subagent working on a specific subtask. Complete it thoroughly.\n\nTask: {}",
prompt
),
preamble_for_task.as_deref(),
);
let abort_check = abort_for_task.clone();
let raced = async {
tokio::pin!(fut);
loop {
tokio::select! {
r = &mut fut => break Ok::<_, ()>(r),
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
if abort_check.is_cancelled() {
break Err(());
}
}
}
}
};
let outer = tokio::time::timeout(SUBAGENT_TIMEOUT, raced).await;
let (state, chat_event) = match outer {
Ok(Ok(Ok(text))) => (
TaskState::Completed(text.clone()),
SubagentChatEvent::Token {
id: tid_for_task.clone(),
text: text.clone(),
},
),
Ok(Ok(Err(e))) => {
let msg = e.to_string();
(
TaskState::Failed(msg.clone()),
SubagentChatEvent::Failed {
id: tid_for_task.clone(),
error: msg,
},
)
}
Ok(Err(())) => {
let msg = "aborted by user".to_string();
(
TaskState::Failed(msg.clone()),
SubagentChatEvent::Aborted {
id: tid_for_task.clone(),
},
)
}
Err(_) => {
let msg =
format!("subagent timed out after {}s", SUBAGENT_TIMEOUT.as_secs(),);
(
TaskState::Failed(msg.clone()),
SubagentChatEvent::Failed {
id: tid_for_task.clone(),
error: msg,
},
)
}
};
let final_event = match &chat_event {
SubagentChatEvent::Token { id, text } => {
if let Some(sink) = &chat_sink {
let _ = sink.try_send(chat_event.clone());
} else if let Some(sink) = subagent_chat_sink() {
let _ = sink.try_send(chat_event.clone());
}
SubagentChatEvent::Complete {
id: id.clone(),
result: text.clone(),
}
}
_ => chat_event.clone(),
};
if let Some(sink) = chat_sink {
let _ = sink.try_send(final_event);
} else if let Some(sink) = subagent_chat_sink() {
let _ = sink.try_send(final_event);
}
unregister_subagent_abort(&tid_for_task);
store_for_task.notify(&tid_for_task, state);
});
store.attach_handle(&tid, handle);
Ok(format!(
"background task started — task_id: {}\n\nThe subagent runs in the background. Completion will be delivered automatically as a <system-reminder> at the start of your next turn. Do NOT poll task_status or sleep waiting — continue with other work.",
task_id
))
} else {
let task_id = Uuid::new_v4().to_string();
self.emit_chat(SubagentChatEvent::Spawn {
id: task_id.clone(),
prompt: args.prompt.clone(),
});
let abort = AbortSignal::new();
register_subagent_abort(&task_id, abort.clone());
let fut = model.btw_query_with(
format!(
"You are a subagent working on a specific subtask. Complete it thoroughly.\n\nTask: {}",
args.prompt
),
route_preamble.as_deref(),
);
let abort_check = abort.clone();
let raced = async {
tokio::pin!(fut);
loop {
tokio::select! {
r = &mut fut => break Ok::<_, ()>(r),
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
if abort_check.is_cancelled() {
break Err(());
}
}
}
}
};
let result = raced.await;
unregister_subagent_abort(&task_id);
match result {
Ok(Ok(text)) => {
self.emit_chat(SubagentChatEvent::Token {
id: task_id.clone(),
text: text.clone(),
});
self.emit_chat(SubagentChatEvent::Complete {
id: task_id,
result: text.clone(),
});
let outcome =
crate::agent::tools::output_relay::relay_if_large("task", text, "");
Ok(outcome.text)
}
Ok(Err(e)) => {
let msg = e.to_string();
self.emit_chat(SubagentChatEvent::Failed {
id: task_id,
error: msg.clone(),
});
Err(ToolError::Msg(format!("Subagent error: {}", msg)))
}
Err(()) => {
self.emit_chat(SubagentChatEvent::Aborted { id: task_id });
Err(ToolError::Msg("Subagent aborted by user".to_string()))
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::agent::tools::background::BackgroundStore;
use crate::provider::AnyModel;
use rig::client::CompletionClient;
use rig::providers::openrouter;
/// Builds a `TaskTool` around an OpenRouter model created with a dummy key, no
/// permission check, no ask channel and a fresh background store.
fn mock_tool() -> TaskTool {
    let model = openrouter::Client::new("test-key")
        .unwrap()
        .completion_model("anthropic/claude-sonnet-4.5");
    let store = BackgroundStore::new();
    TaskTool::new(None, None, AnyModel::OpenRouter(model), store)
}
/// The tool description must mention automatic delivery and discourage polling.
#[tokio::test]
async fn definition_steers_agent_away_from_polling() {
    let def = mock_tool().definition(String::new()).await;
    let lowered = def.description.to_lowercase();

    let mentions_auto_delivery = ["system-reminder", "automatically"]
        .iter()
        .any(|needle| lowered.contains(*needle));
    assert!(
        mentions_auto_delivery,
        "task description must reference automatic notification: {}",
        def.description
    );

    let discourages_polling = ["do not poll", "not poll"]
        .iter()
        .any(|needle| lowered.contains(*needle));
    assert!(
        discourages_polling,
        "task description must explicitly discourage polling: {}",
        def.description
    );
}
/// Source-level guard: the text of `btw_query_with` in provider/dispatch.rs
/// must not contain `.tool(` or `.tools(`.
#[test]
fn subagent_path_is_stateless_no_session_search_leakage() {
    let _expected_fields = ["permission", "ask_tx", "model", "bg_store", "chat_sink"];
    let _tool: TaskTool = mock_tool();

    let dispatch_src = include_str!("../../provider/dispatch.rs");
    let start = dispatch_src
        .find("pub async fn btw_query_with")
        .expect("btw_query_with must exist in provider/dispatch.rs");
    // The function text runs up to the first closing-brace line after its
    // header, or to the end of the file if none is found.
    let end = dispatch_src[start..]
        .find("\n }\n")
        .map_or(dispatch_src.len(), |offset| start + offset);
    let btw_body = &dispatch_src[start..end];

    let attaches_single_tool = btw_body.contains(".tool(");
    assert!(
        !attaches_single_tool,
        "btw_query_with must not attach tools to the subagent — that would \
         require auditing session_id propagation per dirge-mifq. \
         Source snippet:\n{btw_body}"
    );
    let attaches_tool_list = btw_body.contains(".tools(");
    assert!(
        !attaches_tool_list,
        "btw_query_with must not attach tools to the subagent — that would \
         require auditing session_id propagation per dirge-mifq."
    );
}
/// Installing a profile makes it resolvable and advertised in the tool definition.
#[tokio::test]
async fn subagent_routing_advertises_and_validates() {
    let reviewer = SubagentRoute {
        model: None,
        preamble: Some("You are a reviewer.".to_string()),
    };
    set_subagent_routes(HashMap::from([("reviewer".to_string(), reviewer)]));

    assert!(subagent_routes_available());
    assert_eq!(subagent_route_names(), vec!["reviewer".to_string()]);
    assert!(subagent_route("reviewer").is_some());
    assert!(subagent_route("ghost").is_none());

    let def = mock_tool().definition(String::new()).await;
    assert!(
        def.description.contains("Available profiles: reviewer"),
        "definition must list installed profiles: {}",
        def.description
    );

    let schema_props = def
        .parameters
        .get("properties")
        .and_then(|value| value.as_object())
        .expect("properties present");
    let agent_prop = schema_props
        .get("agent")
        .expect("agent property advertised");
    assert_eq!(agent_prop["enum"][0], "reviewer");
}
/// The schema must expose `background`, and its description must mention
/// automatic delivery and discourage polling.
#[tokio::test]
async fn definition_advertises_background_field() {
    let def = mock_tool().definition(String::new()).await;
    let schema_props = def
        .parameters
        .get("properties")
        .and_then(|value| value.as_object())
        .expect("properties present");
    assert!(schema_props.contains_key("background"));

    let background_text = schema_props["background"]["description"]
        .as_str()
        .unwrap()
        .to_lowercase();
    let mentions_auto_delivery =
        background_text.contains("automatically") || background_text.contains("system-reminder");
    assert!(mentions_auto_delivery);
    let discourages_polling =
        background_text.contains("do not poll") || background_text.contains("not poll");
    assert!(discourages_polling);
}
/// A short subagent answer passes through `relay_if_large` unchanged.
#[test]
fn task_short_output_returned_verbatim() {
    use crate::agent::tools::output_relay::relay_if_large;

    let short = String::from("subagent: 42 is the answer.\n");
    let outcome = relay_if_large("task", short.clone(), "");
    assert!(
        outcome.relayed_to.is_none(),
        "short output must not trigger the disk relay",
    );
    assert_eq!(
        outcome.text, short,
        "short subagent output must round-trip unchanged to the parent",
    );
}
/// A large subagent answer is written to disk in full and replaced by a
/// shorter summary that points back to it.
#[test]
fn task_large_output_relayed_to_disk_with_summary() {
    use crate::agent::tools::output_relay::relay_if_large;

    let huge: String = "subagent line\n".repeat(5_000);
    let original_len = huge.len();
    let outcome = relay_if_large("task", huge, "");

    let path = outcome
        .relayed_to
        .as_ref()
        .expect("large output must trigger the disk relay");
    assert!(path.exists(), "relayed file must exist at {path:?}");

    let on_disk = std::fs::read_to_string(path).expect("read relayed file");
    assert_eq!(
        on_disk.len(),
        original_len,
        "the FULL original payload must be on disk (not the truncated head)",
    );

    let summary = &outcome.text;
    assert!(
        summary.len() < original_len,
        "summary should be much smaller than the original payload",
    );
    assert!(
        summary.contains("`read`"),
        "summary must mention the `read` tool so the agent can recover the full payload: {summary}",
    );
    let references_path = summary.contains("transient") || summary.contains(".dirge");
    assert!(
        references_path,
        "summary must reference the transient path: {summary}",
    );

    // Best-effort cleanup of the relayed file.
    let _ = std::fs::remove_file(path);
}
/// Serializes the tests that touch the shared abort registry; hold the
/// returned guard for the duration of the test.
fn registry_test_lock() -> std::sync::MutexGuard<'static, ()> {
    static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
    let mutex = LOCK.get_or_init(Default::default);
    mutex.lock_ignore_poison()
}
/// Unknown or blank ids resolve to `NotFound` and leave other subagents alone.
#[test]
fn kill_unknown_id_no_op() {
    let _guard = registry_test_lock();
    clear_abort_registry_for_test();

    // Empty registry: nothing can match.
    assert_eq!(kill_subagent("abc"), KillOutcome::NotFound);
    assert_eq!(kill_subagent(""), KillOutcome::NotFound);

    // One unrelated subagent registered: it must survive a non-matching kill.
    let survivor = AbortSignal::new();
    register_subagent_abort("aaa-1111", survivor.clone());
    assert_eq!(kill_subagent("zzz"), KillOutcome::NotFound);
    assert!(
        !survivor.is_cancelled(),
        "unmatched kill must NOT cancel the surviving subagent",
    );

    unregister_subagent_abort("aaa-1111");
}
/// Covers the three prefix outcomes: unique match, ambiguous match, and an
/// exact id that is also a prefix of another id.
#[test]
fn kill_resolves_by_prefix_unique_match() {
    let _guard = registry_test_lock();
    clear_abort_registry_for_test();

    // Unique prefix: only the matching subagent is cancelled.
    let first = AbortSignal::new();
    let second = AbortSignal::new();
    register_subagent_abort("aa11-deadbeef", first.clone());
    register_subagent_abort("bb22-cafef00d", second.clone());
    match kill_subagent("aa") {
        KillOutcome::Killed(id) => assert_eq!(id, "aa11-deadbeef"),
        other => panic!("expected Killed; got {:?}", other),
    }
    assert!(first.is_cancelled(), "matched signal must be cancelled");
    assert!(!second.is_cancelled(), "unmatched signal must survive");

    // A second id with the same prefix makes the request ambiguous.
    let third = AbortSignal::new();
    register_subagent_abort("aa99-othertask", third.clone());
    match kill_subagent("aa") {
        KillOutcome::Ambiguous(ids) => {
            assert_eq!(ids.len(), 2);
            for expected in ["aa11-deadbeef", "aa99-othertask"] {
                assert!(ids.iter().any(|candidate| candidate == expected));
            }
        }
        other => panic!("expected Ambiguous; got {:?}", other),
    }
    assert!(
        !third.is_cancelled(),
        "ambiguous kill must NOT cancel any signal",
    );

    // An exact id wins even when it is a prefix of another id.
    clear_abort_registry_for_test();
    let exact = AbortSignal::new();
    let longer = AbortSignal::new();
    register_subagent_abort("abc", exact.clone());
    register_subagent_abort("abcdef", longer.clone());
    match kill_subagent("abc") {
        KillOutcome::Killed(id) => assert_eq!(id, "abc"),
        other => panic!("expected exact-match Killed; got {:?}", other),
    }
    assert!(exact.is_cancelled());
    assert!(!longer.is_cancelled());

    clear_abort_registry_for_test();
}
/// A racer built like the one in `call` must exit with `Err(())` shortly after
/// `kill_subagent` cancels the registered signal.
#[tokio::test]
async fn subagent_complete_after_kill_returns_aborted_result() {
    use std::time::Duration;

    let _guard = registry_test_lock();
    clear_abort_registry_for_test();

    let tid = "t-abort-1";
    let signal = AbortSignal::new();
    register_subagent_abort(tid, signal.clone());
    let watched = signal.clone();

    // Stand-in for a subagent query that would take far longer than the test.
    let slow_query = async {
        tokio::time::sleep(Duration::from_secs(60)).await;
        Ok::<String, anyhow::Error>("never-arrives".to_string())
    };
    let racer = async {
        tokio::pin!(slow_query);
        loop {
            tokio::select! {
                finished = &mut slow_query => break Ok::<_, ()>(finished),
                _ = tokio::time::sleep(Duration::from_millis(50)) => {
                    if watched.is_cancelled() {
                        break Err(());
                    }
                }
            }
        }
    };

    // Cancel by prefix while the racer is running.
    let killer = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(75)).await;
        assert!(matches!(kill_subagent("t-abort"), KillOutcome::Killed(_)));
    });

    let result = tokio::time::timeout(Duration::from_secs(2), racer)
        .await
        .expect("racer must exit before the 2s test timeout");
    killer.await.unwrap();

    if let Ok(other) = result {
        panic!("expected abort; got Ok({:?})", other);
    }

    unregister_subagent_abort(tid);
    clear_abort_registry_for_test();
}
/// A channel of `SUBAGENT_CHAT_CAP` accepts exactly that many undelivered
/// events; the next `try_send` fails.
#[test]
fn subagent_chat_channel_is_bounded_and_drops_on_overflow() {
    let (tx, _rx) = mpsc::channel::<SubagentChatEvent>(SUBAGENT_CHAT_CAP);

    (0..SUBAGENT_CHAT_CAP).for_each(|i| {
        let event = SubagentChatEvent::Token {
            id: "x".into(),
            text: format!("{i}"),
        };
        tx.try_send(event).expect("sends within capacity succeed");
    });

    let one_too_many = SubagentChatEvent::Token {
        id: "x".into(),
        text: "overflow".into(),
    };
    let overflow = tx.try_send(one_too_many);
    assert!(
        overflow.is_err(),
        "channel must be bounded — an over-capacity try_send drops"
    );
}
/// Token, Reasoning and Aborted events arrive in order with their payloads intact.
#[test]
fn subagent_token_event_routes_to_chat_slot() {
    let (tx, mut rx) = mpsc::channel::<SubagentChatEvent>(SUBAGENT_CHAT_CAP);

    let outgoing = [
        SubagentChatEvent::Token {
            id: "a1".into(),
            text: "hello world".into(),
        },
        SubagentChatEvent::Reasoning {
            id: "a1".into(),
            text: "thinking".into(),
        },
        SubagentChatEvent::Aborted { id: "a1".into() },
    ];
    for event in outgoing {
        tx.try_send(event).unwrap();
    }

    match rx.try_recv().unwrap() {
        SubagentChatEvent::Token { id, text } => {
            assert_eq!(id, "a1");
            assert_eq!(text, "hello world");
        }
        other => panic!("expected Token; got {:?}", other),
    }
    match rx.try_recv().unwrap() {
        SubagentChatEvent::Reasoning { id, text } => {
            assert_eq!(id, "a1");
            assert_eq!(text, "thinking");
        }
        other => panic!("expected Reasoning; got {:?}", other),
    }
    match rx.try_recv().unwrap() {
        SubagentChatEvent::Aborted { id } => assert_eq!(id, "a1"),
        other => panic!("expected Aborted; got {:?}", other),
    }
}
/// ToolCall and ToolResult events arrive in order with their payloads intact.
#[test]
fn subagent_tool_call_event_routes_to_chat_slot() {
    let (tx, mut rx) = mpsc::channel::<SubagentChatEvent>(SUBAGENT_CHAT_CAP);

    let call_event = SubagentChatEvent::ToolCall {
        id: "a1".into(),
        tool_name: "read".into(),
        args_summary: "path=/tmp/x".into(),
    };
    let result_event = SubagentChatEvent::ToolResult {
        id: "a1".into(),
        tool_name: "read".into(),
        output_summary: "12 lines".into(),
    };
    tx.try_send(call_event).unwrap();
    tx.try_send(result_event).unwrap();

    match rx.try_recv().unwrap() {
        SubagentChatEvent::ToolCall {
            id,
            tool_name,
            args_summary,
        } => {
            assert_eq!(id, "a1");
            assert_eq!(tool_name, "read");
            assert_eq!(args_summary, "path=/tmp/x");
        }
        other => panic!("expected ToolCall; got {:?}", other),
    }
    match rx.try_recv().unwrap() {
        SubagentChatEvent::ToolResult {
            id,
            tool_name,
            output_summary,
        } => {
            assert_eq!(id, "a1");
            assert_eq!(tool_name, "read");
            assert_eq!(output_summary, "12 lines");
        }
        other => panic!("expected ToolResult; got {:?}", other),
    }
}
}