use std::collections::{HashMap, HashSet};
use super::super::JsonError;
use super::AppState;
pub(super) fn is_virtual_delegation_tool(tool_name: &str) -> bool {
roboticus_core::delegation_tools::is_virtual_delegation_tool(tool_name)
}
/// Builds the task id of the `idx`-th subtask as `{turn_id}-sub-{idx}`.
fn subtask_task_id(turn_id: &str, idx: usize) -> String {
    let ordinal = idx.to_string();
    [turn_id, "-sub-", ordinal.as_str()].concat()
}
/// Persists one lifecycle event for a delegated task, best-effort.
///
/// A fresh UUID is generated for the event row and the current UTC time is
/// stored as `created_at` (second resolution, no timezone suffix). A failed
/// insert is only logged at `warn` level; it never propagates to the caller.
#[allow(clippy::too_many_arguments)]
fn record_task_event(
    state: &AppState,
    task_id: &str,
    parent_task_id: &str,
    assigned_to: Option<&str>,
    event_type: roboticus_db::task_events::TaskLifecycleState,
    summary: Option<String>,
    detail_json: Option<String>,
    percentage: Option<f64>,
    retry_count: i32,
) {
    use roboticus_db::task_events::{TaskEventRow, insert_task_event};

    let event = TaskEventRow {
        id: uuid::Uuid::new_v4().to_string(),
        task_id: task_id.to_owned(),
        parent_task_id: Some(parent_task_id.to_owned()),
        assigned_to: assigned_to.map(str::to_owned),
        event_type,
        summary,
        detail_json,
        percentage,
        retry_count,
        created_at: chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
    };

    // Event persistence must never abort the delegation itself.
    let Err(e) = insert_task_event(&state.db, &event) else {
        return;
    };
    tracing::warn!(
        error = %e,
        task_id = %task_id,
        event_type = %event.event_type,
        "failed to persist delegation task event"
    );
}
/// Selection policy used by `pick_running_subagent` when no specialist hint
/// matched a running subagent directly.
#[derive(Clone, Copy)]
enum PickRunningSubagentMode {
/// Only return a subagent with a non-zero capability-token overlap with the
/// task; yields `None` when the task produces no capability tokens and no
/// non-empty hint was given, or when the best overlap is zero.
Strict,
/// Return the best-scoring running subagent even when its overlap is zero,
/// falling back to the first running subagent.
Lenient,
}
/// Resolves the concrete model a subagent should run `task` on.
///
/// The subagent's configured model (trimmed, compared case-insensitively)
/// is interpreted as follows:
/// * `"auto"` or empty — ask `select_routed_model` to route by task;
/// * `"orchestrator"` — use whatever the LLM router currently selects;
/// * anything else — use the configured value verbatim.
async fn resolve_subagent_runtime_model(
    state: &AppState,
    subagent: &roboticus_db::agents::SubAgentRow,
    task: &str,
) -> String {
    let configured = subagent.model.trim();

    let wants_task_routing = configured.is_empty() || configured.eq_ignore_ascii_case("auto");
    if wants_task_routing {
        return super::select_routed_model(state, task).await;
    }

    if configured.eq_ignore_ascii_case("orchestrator") {
        let llm = state.llm.read().await;
        return llm.router.select_model().to_string();
    }

    configured.to_owned()
}
/// Chooses which running subagent should receive `task`.
///
/// Only subagents whose runtime instance (looked up by lower-cased name in
/// `runtime_by_name`) is in the `Running` state are considered. Selection
/// then proceeds in this order:
///
/// 1. A non-empty `specialist_hint` wins outright when it equals a
///    candidate's name (ASCII case-insensitive) or is contained in its
///    lower-cased display name.
/// 2. In `Strict` mode, a task with no capability tokens and no non-empty
///    hint yields `None`.
/// 3. Otherwise each candidate is scored by how many of the task's
///    capability tokens appear among the tokens of its skills, name,
///    display name and description. The highest score wins; ties go to the
///    candidate listed first.
///
/// `Strict` mode rejects a best score of zero; `Lenient` mode accepts it.
/// Returns `None` when nothing is running.
fn pick_running_subagent<'a>(
    task: &str,
    specialist_hint: Option<&str>,
    taskable_subagents: &'a [roboticus_db::agents::SubAgentRow],
    runtime_by_name: &HashMap<String, roboticus_agent::subagents::AgentInstance>,
    mode: PickRunningSubagentMode,
) -> Option<&'a roboticus_db::agents::SubAgentRow> {
    use roboticus_agent::subagents::AgentRunState;
    use roboticus_db::agents::SubAgentRow;

    let running: Vec<&'a SubAgentRow> = taskable_subagents
        .iter()
        .filter(|row| {
            let key = row.name.to_ascii_lowercase();
            runtime_by_name
                .get(&key)
                .is_some_and(|instance| instance.state == AgentRunState::Running)
        })
        .collect();
    if running.is_empty() {
        return None;
    }

    // A blank hint is treated the same as no hint at all.
    let normalized_hint = specialist_hint
        .map(|raw| raw.trim().to_ascii_lowercase())
        .filter(|hint| !hint.is_empty());
    if let Some(hint) = normalized_hint.as_deref() {
        let hinted = running.iter().copied().find(|row| {
            row.name.eq_ignore_ascii_case(hint)
                || row
                    .display_name
                    .as_deref()
                    .is_some_and(|shown| shown.to_ascii_lowercase().contains(hint))
        });
        if hinted.is_some() {
            return hinted;
        }
    }

    let required = super::capability_tokens(task);
    let strict = matches!(mode, PickRunningSubagentMode::Strict);
    if strict && required.is_empty() && normalized_hint.is_none() {
        return None;
    }

    // `min_by_key` over `Reverse(overlap)` returns the first candidate with
    // the highest overlap, i.e. the same element a stable descending sort
    // would put at the front.
    let best = running
        .iter()
        .copied()
        .map(|candidate| {
            let skills = super::parse_skills_json(candidate.skills_json.as_deref());
            let mut vocabulary: HashSet<String> = skills
                .iter()
                .flat_map(|skill| super::capability_tokens(skill))
                .collect();
            let identity = [
                Some(candidate.name.as_str()),
                candidate.display_name.as_deref(),
                candidate.description.as_deref(),
            ];
            vocabulary.extend(
                identity
                    .into_iter()
                    .flatten()
                    .flat_map(super::capability_tokens),
            );
            let overlap = required
                .iter()
                .filter(|tok| vocabulary.contains(*tok))
                .count();
            (candidate, overlap)
        })
        .min_by_key(|(_, overlap)| std::cmp::Reverse(*overlap));

    if strict {
        best.filter(|(_, overlap)| *overlap != 0)
            .map(|(candidate, _)| candidate)
    } else {
        best.map(|(candidate, _)| candidate)
            .or_else(|| running.first().copied())
    }
}
/// Returns `true` when an error message looks like a timeout or an
/// unreachable-endpoint failure.
///
/// Matching is a case-insensitive (ASCII) substring test against a fixed
/// set of markers.
fn timeout_like_error(err: &str) -> bool {
    const MARKERS: [&str; 5] = [
        "timeout",
        "timed out",
        "no route to host",
        "error sending request",
        "connection refused",
    ];

    let lowered = err.to_ascii_lowercase();
    MARKERS.iter().any(|marker| lowered.contains(*marker))
}
/// Reports whether the breaker for `model`'s provider prefix is currently
/// blocking requests.
///
/// The provider prefix is derived from `model` before the LLM state read
/// lock is taken.
async fn provider_breaker_blocked(state: &AppState, model: &str) -> bool {
    let prefix = roboticus_core::model::provider_prefix(model);
    let blocked = state.llm.read().await.breakers.is_blocked(prefix);
    blocked
}
/// Picks a non-local replacement model to fall back on.
///
/// Candidates are gathered, de-duplicated (exact match) and skipping blank
/// entries, in priority order: the caller's `preferred_fallbacks`, then the
/// configured `models.fallbacks`, then `models.primary`. The first candidate
/// that
/// * differs from `current_model` (ASCII case-insensitive),
/// * resolves to a known provider that is not local, and
/// * is not blocked by that provider's breaker
/// is returned. `None` means no candidate qualified.
async fn select_cloud_rescue_model(
    state: &AppState,
    current_model: &str,
    preferred_fallbacks: &[String],
) -> Option<String> {
    // The config guard is scoped so it is released before the LLM state
    // lock is acquired.
    let candidates: Vec<String> = {
        let config = state.config.read().await;
        let sources = preferred_fallbacks
            .iter()
            .chain(config.models.fallbacks.iter())
            .chain(std::iter::once(&config.models.primary));

        let mut ordered: Vec<String> = Vec::new();
        for name in sources {
            let blank = name.trim().is_empty();
            if !blank && !ordered.contains(name) {
                ordered.push(name.clone());
            }
        }
        ordered
    };

    let llm = state.llm.read().await;
    let rescue = candidates.into_iter().find(|candidate| {
        if candidate.eq_ignore_ascii_case(current_model) {
            return false;
        }
        let Some(provider) = llm.providers.get_by_model(candidate) else {
            return false;
        };
        if provider.is_local {
            return false;
        }
        !llm.breakers
            .is_blocked(roboticus_core::model::provider_prefix(candidate))
    });
    rescue
}
pub(super) async fn execute_virtual_subagent_tool_call(
state: &AppState,
tool_name: &str,
params: &serde_json::Value,
turn_id: &str,
authority: roboticus_core::InputAuthority,
tier: roboticus_core::SurvivalTier,
) -> Result<String, String> {
let policy_result = super::check_tool_policy(
&state.policy_engine,
tool_name,
params,
authority,
tier,
roboticus_core::RiskLevel::Caution,
);
let (decision_str, rule_name, reason) = match &policy_result {
Ok(()) => ("allow".to_string(), None, None),
Err(JsonError(_status, msg)) => (
"deny".to_string(),
Some("policy_engine"),
Some(msg.as_str()),
),
};
roboticus_db::policy::record_policy_decision(
&state.db,
Some(turn_id),
tool_name,
&decision_str,
rule_name,
reason,
)
.inspect_err(|e| tracing::warn!(error = %e, "failed to record policy decision"))
.ok();
if let Err(JsonError(_status, msg)) = policy_result {
return Err(format!("Policy denied: {msg}"));
}
match state.approvals.check_tool(tool_name) {
Ok(roboticus_agent::approvals::ToolClassification::Gated) => {
let request = state
.approvals
.request_approval(
tool_name,
¶ms.to_string(),
None,
Some(turn_id),
authority,
)
.map_err(|e| format!("Approval error: {e}"))?;
roboticus_db::approvals::record_approval_request(
&state.db,
&request.id,
&request.tool_name,
&request.tool_input,
request.session_id.as_deref(),
request.turn_id.as_deref(),
"pending",
&request.timeout_at.to_rfc3339(),
)
.inspect_err(|e| tracing::warn!(error = %e, "failed to persist approval request"))
.ok();
return Err(format!(
"Tool '{tool_name}' requires approval (request: {})",
request.id
));
}
Err(e) => return Err(format!("Tool blocked: {e}")),
Ok(_) => {}
}
let action = tool_name.trim().to_ascii_lowercase();
let mut task = params
.get("task")
.and_then(|v| v.as_str())
.or_else(|| params.get("query").and_then(|v| v.as_str()))
.or_else(|| params.get("prompt").and_then(|v| v.as_str()))
.unwrap_or("")
.trim()
.to_string();
let specialist_hint = params
.get("specialist")
.and_then(|v| v.as_str())
.or_else(|| params.get("subagent").and_then(|v| v.as_str()));
let explicit_task_id = params
.get("task_id")
.or_else(|| params.get("id"))
.and_then(|v| v.as_str())
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty());
let parent_task_id = params
.get("parent_task_id")
.or_else(|| params.get("turn_id"))
.and_then(|v| v.as_str())
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| turn_id.to_string());
let subtasks: Vec<String> = match params.get("subtasks") {
Some(v) => match v.as_array() {
Some(arr) => arr
.iter()
.filter_map(|v| v.as_str().map(|s| s.trim().to_string()))
.filter(|s| !s.is_empty())
.take(6)
.collect(),
None => {
tracing::warn!("delegation 'subtasks' param is not an array, ignoring");
vec![]
}
},
None => vec![],
};
if task.is_empty() && !subtasks.is_empty() {
task = subtasks.join("; ");
}
if task.is_empty() {
return Err("delegation tool requires `task` (or `subtasks`)".to_string());
}
let all_subagents = roboticus_db::agents::list_sub_agents(&state.db)
.map_err(|e| format!("failed to query sub-agents: {e}"))?;
let taskable_subagents: Vec<roboticus_db::agents::SubAgentRow> = all_subagents
.into_iter()
.filter(|sa| !super::is_model_proxy_role(&sa.role) && sa.enabled)
.collect();
if taskable_subagents.is_empty() {
return Err("no enabled taskable subagents are configured".to_string());
}
let mut ready_subagents = Vec::new();
let mut repair_failures = Vec::new();
for sa in &taskable_subagents {
match crate::api::routes::subagent_integrity::ensure_taskable_subagent_ready(state, sa)
.await
{
Ok(ready) => ready_subagents.push(ready),
Err(err) => repair_failures.push(format!("{}: {}", sa.name, err)),
}
}
if ready_subagents.is_empty() {
return Err(format!(
"no viable taskable subagents are available after integrity repair ({}).",
repair_failures.join(" | ")
));
}
let runtime_by_name: HashMap<String, roboticus_agent::subagents::AgentInstance> = state
.registry
.list_agents()
.await
.into_iter()
.map(|a| (a.id.to_ascii_lowercase(), a))
.collect();
let booting_count = runtime_by_name
.values()
.filter(|a| {
matches!(
a.state,
roboticus_agent::subagents::AgentRunState::Starting
| roboticus_agent::subagents::AgentRunState::Idle
)
})
.count();
let running_count = runtime_by_name
.values()
.filter(|a| a.state == roboticus_agent::subagents::AgentRunState::Running)
.count();
if action == "select-subagent-model" || action == "select_subagent_model" {
let chosen = pick_running_subagent(
&task,
specialist_hint,
&ready_subagents,
&runtime_by_name,
PickRunningSubagentMode::Lenient,
)
.or_else(|| ready_subagents.first())
.ok_or_else(|| "no candidate subagent found for model selection".to_string())?;
let model = resolve_subagent_runtime_model(state, chosen, &task).await;
return Ok(format!(
"selected_subagent={} resolved_model={} running={} booting={}",
chosen.name, model, running_count, booting_count
));
}
let chosen = pick_running_subagent(
&task,
specialist_hint,
&ready_subagents,
&runtime_by_name,
PickRunningSubagentMode::Strict,
)
.ok_or_else(|| {
format!(
"no running taskable subagents are available after integrity repair (running={}, booting={}, repair_failures={})",
running_count, booting_count, repair_failures.join(" | ")
)
})?;
let model = resolve_subagent_runtime_model(state, chosen, &task).await;
roboticus_db::agents::record_subagent_usage(&state.db, &chosen.name)
.inspect_err(|e| tracing::warn!(error = %e, subagent = %chosen.name, "failed to record subagent usage"))
.ok();
let preferred_fallbacks = crate::api::routes::subagents::parse_fallback_models_json(
chosen.fallback_models_json.as_deref(),
);
let configured_is_fixed = {
let raw = chosen.model.trim().to_ascii_lowercase();
!raw.is_empty() && raw != "auto" && raw != "orchestrator"
};
let mut effective_model = model.clone();
let mut delegation_notes: Vec<String> = Vec::new();
if configured_is_fixed
&& provider_breaker_blocked(state, &effective_model).await
&& let Some(rescue) =
select_cloud_rescue_model(state, &effective_model, &preferred_fallbacks).await
{
delegation_notes.push(format!(
"breaker-open guardrail rerouted fixed model from {} to {}",
effective_model, rescue
));
effective_model = rescue;
}
let task_list = if subtasks.is_empty() {
vec![task.clone()]
} else {
subtasks
};
let mut outputs = Vec::new();
for (idx, subtask) in task_list.iter().enumerate() {
let task_id = if idx == 0 {
explicit_task_id
.clone()
.unwrap_or_else(|| subtask_task_id(&parent_task_id, idx))
} else {
subtask_task_id(&parent_task_id, idx)
};
let skills = super::parse_skills_json(chosen.skills_json.as_deref());
let skills_line = if skills.is_empty() {
"(none)".to_string()
} else {
skills.join(", ")
};
let charter_note = chosen
.description
.as_deref()
.map(str::trim)
.filter(|d| !d.is_empty())
.map(|d| {
const MAX: usize = 400;
if d.len() <= MAX {
d.to_string()
} else {
let b = d.floor_char_boundary(MAX);
format!("{}…", &d[..b])
}
})
.map(|d| format!("\nOperator-configured charter: {d}"))
.unwrap_or_default();
let system_prompt = format!(
"You are specialist subagent `{}` answering an **internal orchestrator delegation**, \
not as the primary agent the user talks to day-to-day. Skills from roster: {skills_line}.{charter_note}\n\
\nYou report to the orchestrator. Answer the assigned subtask directly with concrete results. \
Do **not** open with a long self-introduction, do not restate this entire system message, and do not add \
generic training-data or scope disclaimers unless the subtask itself requires specific caveats.",
chosen.name,
);
let model_for_api = roboticus_core::model::model_name(&effective_model).to_string();
let req = roboticus_llm::format::UnifiedRequest {
model: model_for_api,
messages: vec![
roboticus_llm::format::UnifiedMessage {
role: "system".into(),
content: system_prompt,
parts: None,
},
roboticus_llm::format::UnifiedMessage {
role: "user".into(),
content: subtask.clone(),
parts: None,
},
],
max_tokens: Some(1200),
temperature: None,
system: None,
quality_target: None,
tools: vec![],
};
record_task_event(
state,
&task_id,
&parent_task_id,
None,
roboticus_db::task_events::TaskLifecycleState::Pending,
Some(subtask.chars().take(200).collect()),
Some(serde_json::json!({ "subtask": subtask }).to_string()),
None,
0,
);
record_task_event(
state,
&task_id,
&parent_task_id,
Some(&chosen.name),
roboticus_db::task_events::TaskLifecycleState::Assigned,
Some(format!("Assigned to {}", chosen.name)),
Some(
serde_json::json!({
"subtask": subtask,
"subagent": chosen.name,
})
.to_string(),
),
Some(0.0),
0,
);
record_task_event(
state,
&task_id,
&parent_task_id,
Some(&chosen.name),
roboticus_db::task_events::TaskLifecycleState::Running,
Some(format!(
"Subagent execution started with model {}",
effective_model
)),
Some(
serde_json::json!({
"subtask": subtask,
"subagent": chosen.name,
"model": effective_model,
})
.to_string(),
),
Some(10.0),
0,
);
let budget = {
let config = state.config.read().await;
super::delegated_inference_budget(&config.models.routing)
};
let subtask_start = std::time::Instant::now();
let result = match super::infer_with_fallback_with_budget_and_preferred(
state,
&req,
&effective_model,
budget,
&preferred_fallbacks,
)
.await
{
Ok(r) => r,
Err(err) => {
let retry_target = if timeout_like_error(&err) {
select_cloud_rescue_model(state, &effective_model, &preferred_fallbacks).await
} else {
None
};
if let Some(rescue_model) = retry_target {
tracing::warn!(
subagent = %chosen.name,
from_model = %effective_model,
to_model = %rescue_model,
error = %err,
"delegation timeout guardrail rerouting to cloud model"
);
delegation_notes.push(format!(
"timeout guardrail rerouted delegated subtask from {} to {}",
effective_model, rescue_model
));
record_task_event(
state,
&task_id,
&parent_task_id,
Some(&chosen.name),
roboticus_db::task_events::TaskLifecycleState::Progress,
Some("Retrying delegated subtask with fallback model".into()),
Some(
serde_json::json!({
"subtask": subtask,
"subagent": chosen.name,
"from_model": effective_model,
"to_model": rescue_model,
"error": err,
})
.to_string(),
),
Some(50.0),
0,
);
record_task_event(
state,
&task_id,
&parent_task_id,
Some(&chosen.name),
roboticus_db::task_events::TaskLifecycleState::Retry,
Some("Timeout guardrail initiated retry".into()),
Some(
serde_json::json!({
"subtask": subtask,
"subagent": chosen.name,
"from_model": effective_model,
"to_model": rescue_model,
})
.to_string(),
),
None,
1,
);
effective_model = rescue_model.clone();
let retry_model_for_api =
roboticus_core::model::model_name(&rescue_model).to_string();
let retry_req = roboticus_llm::format::UnifiedRequest {
model: retry_model_for_api,
messages: req.messages.clone(),
max_tokens: req.max_tokens,
temperature: req.temperature,
system: req.system.clone(),
quality_target: req.quality_target,
tools: req.tools.clone(),
};
let retry_budget = {
let config = state.config.read().await;
super::delegated_inference_budget(&config.models.routing)
};
super::infer_with_fallback_with_budget_and_preferred(
state,
&retry_req,
&rescue_model,
retry_budget,
&preferred_fallbacks,
)
.await?
} else {
let typed_err = super::pipeline::delegation::classify_delegation_error(&err);
record_task_event(
state,
&task_id,
&parent_task_id,
Some(&chosen.name),
roboticus_db::task_events::TaskLifecycleState::Failed,
Some(typed_err.to_string()),
Some(
serde_json::json!({
"subtask": subtask,
"subagent": chosen.name,
"error": serde_json::from_str::<serde_json::Value>(&typed_err.to_detail_json())
.unwrap_or_else(|_| serde_json::json!({ "message": typed_err.to_string() })),
})
.to_string(),
),
None,
0,
);
return Err(err);
}
}
};
record_task_event(
state,
&task_id,
&parent_task_id,
Some(&chosen.name),
roboticus_db::task_events::TaskLifecycleState::Completed,
Some(result.content.trim().chars().take(200).collect()),
Some(
serde_json::json!({
"subtask": subtask,
"subagent": chosen.name,
"model": effective_model,
"duration_ms": subtask_start.elapsed().as_millis() as i64,
})
.to_string(),
),
Some(100.0),
0,
);
outputs.push(format!(
"subtask {} -> {}\n{}",
idx + 1,
chosen.name,
result.content.trim()
));
if action == "assign-tasks" || action == "assign_tasks" {
break;
}
}
Ok(format!(
"delegated_subagent={} model={} fallback_models={}{}\n{}",
chosen.name,
effective_model,
serde_json::to_string(&preferred_fallbacks).unwrap_or_else(|_| "[]".to_string()),
if delegation_notes.is_empty() {
String::new()
} else {
format!("\nnotes={}", delegation_notes.join(" | "))
},
outputs.join("\n\n")
))
}
// Unit tests for `pick_running_subagent`, covering Strict and Lenient
// selection against small hand-built rosters.
#[cfg(test)]
mod pick_running_subagent_tests {
use super::{PickRunningSubagentMode, pick_running_subagent};
use roboticus_agent::subagents::{AgentInstance, AgentRunState};
use roboticus_db::agents::SubAgentRow;
use std::collections::HashMap;
// Builds an enabled subagent row named `name` whose `skills_json` is the
// JSON encoding of `skills`; model is "auto" and there is no display name
// or description.
fn row(name: &str, skills: &[&str]) -> SubAgentRow {
let skills_json = serde_json::to_string(&skills.to_vec()).unwrap();
SubAgentRow {
id: uuid::Uuid::new_v4().to_string(),
name: name.to_string(),
display_name: None,
model: "auto".to_string(),
fallback_models_json: Some("[]".to_string()),
role: "subagent".into(),
description: None,
skills_json: Some(skills_json),
enabled: true,
session_count: 0,
last_used_at: None,
}
}
// Same as `row`, but with the display name and description overridden.
fn row_with_identity(
name: &str,
display_name: Option<&str>,
description: Option<&str>,
skills: &[&str],
) -> SubAgentRow {
let mut row = row(name, skills);
row.display_name = display_name.map(str::to_string);
row.description = description.map(str::to_string);
row
}
// Builds a runtime map, keyed by lower-cased name, in which every listed
// agent is in the `Running` state.
fn running(names: &[&str]) -> HashMap<String, AgentInstance> {
names
.iter()
.map(|n| {
(
n.to_ascii_lowercase(),
AgentInstance {
id: (*n).to_string(),
name: (*n).to_string(),
model: "m".into(),
state: AgentRunState::Running,
session_count: 0,
started_at: None,
last_error: None,
},
)
})
.collect()
}
// With two running subagents, Strict mode is expected to select the Rust
// specialist for a cargo-related task.
// NOTE(review): the test name talks about rejecting zero overlap, but the
// assertion checks a positive selection — consider renaming.
#[test]
fn strict_mode_rejects_zero_overlap_when_multiple_running() {
let defi = row("defi-specialist", &["defi", "solidity", "audit"]);
let rust = row("rust-specialist", &["rust", "cargo"]);
let taskable = [defi, rust];
let rt = running(&["defi-specialist", "rust-specialist"]);
let picked = pick_running_subagent(
"run cargo test and fix failures",
None,
&taskable,
&rt,
PickRunningSubagentMode::Strict,
);
assert!(
picked.is_some_and(|sa| sa.name == "rust-specialist"),
"expected rust specialist for cargo-related task"
);
}
// Strict mode must return `None` rather than bind an unrelated task to the
// only running subagent.
#[test]
fn strict_mode_returns_none_when_no_skill_overlap() {
let defi = row("defi-specialist", &["defi", "ssv", "xion"]);
let taskable = [defi];
let rt = running(&["defi-specialist"]);
let picked = pick_running_subagent(
"refactor the dashboard help modal",
None,
&taskable,
&rt,
PickRunningSubagentMode::Strict,
);
assert!(
picked.is_none(),
"unrelated task must not bind to arbitrary running subagent"
);
}
// Lenient mode returns the running subagent even when the task is unrelated
// to its skills.
#[test]
fn lenient_mode_still_falls_back_to_first_running() {
let defi = row("defi-specialist", &["defi"]);
let taskable = [defi];
let rt = running(&["defi-specialist"]);
let picked = pick_running_subagent(
"refactor help modal",
None,
&taskable,
&rt,
PickRunningSubagentMode::Lenient,
);
assert!(picked.is_some_and(|sa| sa.name == "defi-specialist"));
}
// Strict mode also scores name, display name and description, so a subagent
// whose listed skills are unrelated to the task can still be selected.
#[test]
fn strict_mode_uses_name_and_description_overlap_for_fit() {
let saas = row_with_identity(
"saas-ideator",
Some("SaaS Ideator"),
Some("Generates SaaS ideas for freelancers"),
&["python", "typescript"],
);
let taskable = [saas];
let rt = running(&["saas-ideator"]);
let picked = pick_running_subagent(
"produce one SaaS idea for freelancers",
None,
&taskable,
&rt,
PickRunningSubagentMode::Strict,
);
assert!(picked.is_some_and(|sa| sa.name == "saas-ideator"));
}
}