use super::super::AppState;
use super::super::pipeline_trace::{PipelineTrace, SpanOutcome, ns};
use roboticus_agent::task_state::{DecompositionProposal, TaskStateInput};
pub(super) fn composition_skill_description(skill: &str, user_content: &str) -> String {
let scope = user_content.trim().chars().take(180).collect::<String>();
format!(
"Auto-drafted capability for '{skill}'. Use this skill when the task requires that \
capability in work like: {scope}"
)
}
async fn explicit_specialist_workflow_requested(
user_content: &str,
classifier: &roboticus_llm::semantic_classifier::SemanticClassifier,
) -> bool {
use roboticus_llm::intent_exemplars::{
CAT_SPECIALIST_WORKFLOW, SPECIALIST_WORKFLOW_GUARD_EXEMPLARS,
};
match classifier
.matches_category(
user_content,
SPECIALIST_WORKFLOW_GUARD_EXEMPLARS,
CAT_SPECIALIST_WORKFLOW,
0.80,
)
.await
{
Ok(Some((score, trust))) => {
tracing::debug!(score, ?trust, "specialist workflow detected semantically");
true
}
Ok(None) => false,
Err(e) => {
tracing::debug!(error = %e, "specialist workflow semantic check failed");
false
}
}
}
pub(super) fn derive_specialist_proposal_from_task(
user_content: &str,
) -> super::super::decomposition::SpecialistProposal {
let capabilities = super::super::decomposition::capability_tokens(user_content);
let skills = capabilities.iter().take(6).cloned().collect::<Vec<_>>();
let first = capabilities
.first()
.cloned()
.unwrap_or_else(|| "task".to_string());
let slug = first.replace('_', "-");
let display = first
.split(['-', '_'])
.filter(|s| !s.is_empty())
.map(|part| {
let mut chars = part.chars();
match chars.next() {
Some(c) => format!("{}{}", c.to_ascii_uppercase(), chars.as_str()),
None => String::new(),
}
})
.collect::<Vec<_>>()
.join(" ");
super::super::decomposition::SpecialistProposal {
name: format!("{slug}-specialist"),
display_name: if display.is_empty() {
"Task Specialist".to_string()
} else {
format!("{display} Specialist")
},
description: format!(
"Auto-proposed specialist for task work in: {}",
user_content.chars().take(180).collect::<String>()
),
skills,
model: "auto".to_string(),
}
}
pub(super) fn derive_delegation_plan_from_task(
user_content: &str,
) -> super::super::decomposition::DelegationPlan {
let subtasks = {
let derived = super::super::decomposition::split_subtasks(user_content);
if derived.is_empty() {
vec![user_content.trim().to_string()]
} else {
derived
}
};
super::super::decomposition::DelegationPlan {
subtasks,
rationale:
"explicit specialist workflow requested and existing roster has matching capability fit"
.to_string(),
expected_utility_margin: 0.5,
}
}
pub(super) async fn build_task_state_input(
state: &AppState,
session_id: &str,
user_content: &str,
intents: &[super::super::intent_registry::Intent],
authority: roboticus_core::InputAuthority,
gate_decision: Option<&super::super::decomposition::DecompositionDecision>,
inference_mode: &str,
) -> TaskStateInput {
let explicit_specialist_workflow =
explicit_specialist_workflow_requested(user_content, &state.semantic_classifier).await;
let roster_taskable_agents = roboticus_db::agents::list_sub_agents(&state.db)
.unwrap_or_default()
.into_iter()
.filter(|a| !super::super::is_model_proxy_role(&a.role) && a.enabled)
.collect::<Vec<_>>();
let taskable_agent_count = roster_taskable_agents.len();
let required = super::super::decomposition::capability_tokens(user_content);
let (fit_agent_count, fit_agent_names) = {
let mut names = Vec::new();
let mut count = 0usize;
for agent in &roster_taskable_agents {
let skills = super::super::parse_skills_json(agent.skills_json.as_deref());
let skill_tokens = skills
.iter()
.flat_map(|skill| super::super::decomposition::capability_tokens(skill))
.collect::<std::collections::HashSet<_>>();
let identity_tokens = [
Some(agent.name.as_str()),
agent.display_name.as_deref(),
agent.description.as_deref(),
]
.into_iter()
.flatten()
.flat_map(super::super::decomposition::capability_tokens)
.collect::<std::collections::HashSet<_>>();
if required
.iter()
.any(|token| skill_tokens.contains(token) || identity_tokens.contains(token))
{
count += 1;
names.push(agent.name.clone());
}
}
(count, names)
};
let all_skills = roboticus_db::skills::list_skills(&state.db).unwrap_or_default();
let enabled_skill_count = all_skills.iter().filter(|s| s.enabled).count();
let (matching_skill_count, missing_skills) = {
let skill_registry_names =
crate::api::routes::subagent_integrity::skill_registry_names(state);
let mut matched = 0usize;
let mut matched_tokens = std::collections::HashSet::new();
for skill in all_skills.iter().filter(|s| s.enabled) {
let triggers: Vec<String> = skill
.triggers_json
.as_deref()
.and_then(|t| serde_json::from_str::<Vec<String>>(t).ok())
.unwrap_or_default();
let trigger_tokens: std::collections::HashSet<String> = triggers
.iter()
.flat_map(|t| super::super::decomposition::capability_tokens(t))
.collect();
if required.iter().any(|token| trigger_tokens.contains(token)) {
matched += 1;
for token in &trigger_tokens {
matched_tokens.insert(token.clone());
}
}
}
let missing: Vec<String> = required
.iter()
.filter(|token| {
!matched_tokens.contains(*token)
&& !skill_registry_names.contains(&token.to_ascii_lowercase())
})
.cloned()
.collect();
(matched, missing)
};
let decomposition_proposal = gate_decision.map(|decision| match decision {
super::super::decomposition::DecompositionDecision::Centralized {
rationale,
expected_utility_margin,
} => DecompositionProposal {
should_delegate: false,
rationale: rationale.clone(),
utility_margin: *expected_utility_margin,
},
super::super::decomposition::DecompositionDecision::Delegated(plan) => {
DecompositionProposal {
should_delegate: true,
rationale: plan.rationale.clone(),
utility_margin: plan.expected_utility_margin,
}
}
super::super::decomposition::DecompositionDecision::RequiresSpecialistCreation {
rationale,
..
} => DecompositionProposal {
should_delegate: true,
rationale: rationale.clone(),
utility_margin: 0.5,
},
});
let intent_strings: Vec<String> = intents.iter().map(|i| format!("{i:?}")).collect();
let (recent_response_skeletons, recent_user_message_lengths, self_echo_fragments) = {
let history = roboticus_db::sessions::list_recent_messages(&state.db, session_id, 10)
.unwrap_or_default();
let mut skeletons: Vec<String> = Vec::new();
let mut user_lengths: Vec<usize> = Vec::new();
let mut echo_fragments: Vec<String> = Vec::new();
for msg in &history {
if msg.role == "assistant" {
skeletons.push(roboticus_agent::task_state::response_skeleton(&msg.content));
let mut frags = roboticus_agent::task_state::extract_echo_fragments(&msg.content);
echo_fragments.append(&mut frags);
} else if msg.role == "user" {
user_lengths.push(msg.content.split_whitespace().count());
}
}
let skeletons = skeletons
.into_iter()
.rev()
.take(5)
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect();
let user_lengths = user_lengths
.into_iter()
.rev()
.take(5)
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect();
echo_fragments.dedup();
let echo_fragments = echo_fragments.into_iter().take(10).collect();
(skeletons, user_lengths, echo_fragments)
};
let mcp_tools_available = state.mcp_server.read().await.tool_count() > 0;
let provider_breaker_open = {
let llm = state.llm.read().await;
let cfg = state.config.read().await;
let primary = &cfg.models.primary;
let primary_provider = primary.split('/').next().unwrap_or(primary.as_str());
let primary_blocked = llm.breakers.is_blocked(primary_provider);
if !primary_blocked {
false
} else {
let fallback_available = cfg.models.fallbacks.iter().any(|m| {
let p = m.split('/').next().unwrap_or(m.as_str());
!llm.breakers.is_blocked(p)
});
!fallback_available }
};
let remaining_budget_tokens = {
let cfg = state.config.read().await;
cfg.context_budget.l0
};
let named_tool_match = {
let content_lower = user_content.to_ascii_lowercase();
let content_normalized = content_lower.replace(['-', '_'], " ");
state.tools.list().iter().any(|tool| {
if tool.plugin_owner().is_none() {
return false;
}
let name = tool.name().to_ascii_lowercase();
let name_normalized = name.replace(['-', '_'], " ");
if name_normalized.split_whitespace().count() < 2 {
return false;
}
content_normalized.contains(&name_normalized) || content_lower.contains(&name)
})
};
TaskStateInput {
user_content: user_content.to_string(),
intents: intent_strings,
authority: format!("{authority:?}"),
retrieval_metrics: None, tool_search_stats: None, mcp_tools_available,
taskable_agent_count,
fit_agent_count,
fit_agent_names,
enabled_skill_count,
matching_skill_count,
missing_skills,
remaining_budget_tokens,
provider_breaker_open,
inference_mode: inference_mode.to_string(),
decomposition_proposal,
explicit_specialist_workflow,
named_tool_match,
recent_response_skeletons,
recent_user_message_lengths,
self_echo_fragments,
declared_action: None, previous_turn_had_protocol_issues: {
let msgs = roboticus_db::sessions::list_recent_messages(&state.db, session_id, 2)
.unwrap_or_default();
msgs.iter()
.find(|m| m.role == "assistant")
.map(|m| {
super::super::guard_registry::contains_internal_protocol_marker(&m.content)
})
.unwrap_or(false)
},
normalization_retry_streak: {
let msgs = roboticus_db::sessions::list_recent_messages(&state.db, session_id, 10)
.unwrap_or_default();
msgs.iter()
.filter(|m| m.role == "assistant")
.take_while(|m| {
super::super::guard_registry::contains_internal_protocol_marker(&m.content)
})
.count() as u8
},
}
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn resolve_specialist_creation_for_task(
state: &AppState,
turn_id: &str,
channel_label: &str,
authority: roboticus_core::InputAuthority,
session_id: &str,
user_content: &str,
gate_decision: super::super::decomposition::DecompositionDecision,
pipeline_trace: &mut PipelineTrace,
) -> (
super::super::decomposition::DecompositionDecision,
Option<String>,
) {
use super::super::decomposition::{DecompositionDecision, apply_decomposition_decision};
let DecompositionDecision::RequiresSpecialistCreation {
proposal,
rationale,
} = gate_decision
else {
return (gate_decision, None);
};
if authority < roboticus_core::InputAuthority::Creator {
return (
DecompositionDecision::RequiresSpecialistCreation {
proposal,
rationale,
},
None,
);
}
let composition_policy = {
let cfg = state.config.read().await;
cfg.agent.composition_policy
};
if !matches!(
composition_policy,
roboticus_core::config::CompositionPolicy::Autonomous
) {
return (
DecompositionDecision::RequiresSpecialistCreation {
proposal,
rationale,
},
None,
);
}
pipeline_trace.begin_stage("specialist_composition");
pipeline_trace.annotate_ns(
ns::DELEGATION,
"auto_compose_attempted",
serde_json::json!(true),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"specialist_name",
serde_json::json!(proposal.name.clone()),
);
let registry = crate::api::routes::subagent_integrity::skill_registry_names(state);
let missing_skills = proposal
.skills
.iter()
.filter(|skill| !registry.contains(&skill.to_ascii_lowercase()))
.cloned()
.collect::<Vec<_>>();
pipeline_trace.annotate_ns(
ns::DELEGATION,
"missing_skill_count",
serde_json::json!(missing_skills.len()),
);
for skill in &missing_skills {
let params = serde_json::json!({
"name": skill,
"description": composition_skill_description(skill, user_content),
"kind": "instruction",
"triggers": [skill],
});
if let Err(err) = super::super::execute_tool_call(
state,
"compose-skill",
¶ms,
turn_id,
authority,
Some(channel_label),
)
.await
{
pipeline_trace.annotate_ns(
ns::DELEGATION,
"auto_compose_error",
serde_json::json!(format!("compose-skill:{skill}:{err}")),
);
pipeline_trace.end_stage(SpanOutcome::Error("compose-skill".into()));
return (
DecompositionDecision::RequiresSpecialistCreation {
proposal,
rationale,
},
None,
);
}
}
let compose_params = serde_json::json!({
"name": proposal.name,
"display_name": proposal.display_name,
"description": proposal.description,
"skills": proposal.skills,
"model": proposal.model,
});
if let Err(err) = super::super::execute_tool_call(
state,
"compose-subagent",
&compose_params,
turn_id,
authority,
Some(channel_label),
)
.await
{
pipeline_trace.annotate_ns(
ns::DELEGATION,
"auto_compose_error",
serde_json::json!(format!("compose-subagent:{err}")),
);
pipeline_trace.end_stage(SpanOutcome::Error("compose-subagent".into()));
return (
DecompositionDecision::RequiresSpecialistCreation {
proposal,
rationale,
},
None,
);
}
let features = roboticus_llm::extract_features(user_content, 0, 1);
let complexity = roboticus_llm::classify_complexity(&features);
let refreshed =
super::super::decomposition::evaluate_decomposition_gate(state, user_content, complexity)
.await;
let workflow_note =
match apply_decomposition_decision(state, &refreshed, session_id, channel_label).await {
super::super::decomposition::DecompositionOutcome::SpecialistProposalPending {
..
} => None,
super::super::decomposition::DecompositionOutcome::Centralized => None,
super::super::decomposition::DecompositionOutcome::Delegated { workflow_note } => {
Some(workflow_note)
}
};
pipeline_trace.annotate_ns(
ns::DELEGATION,
"auto_compose_completed",
serde_json::json!(true),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"refreshed_decision",
serde_json::json!(match &refreshed {
DecompositionDecision::Centralized { .. } => "centralized",
DecompositionDecision::Delegated(_) => "delegated",
DecompositionDecision::RequiresSpecialistCreation { .. } =>
"requires_specialist_creation",
}),
);
pipeline_trace.end_stage(SpanOutcome::Ok);
(refreshed, workflow_note)
}
#[cfg(test)]
pub(in super::super) async fn build_task_state_input_for_test(
state: &AppState,
session_id: &str,
user_content: &str,
intents: &[super::super::intent_registry::Intent],
authority: roboticus_core::InputAuthority,
gate_decision: Option<&super::super::decomposition::DecompositionDecision>,
inference_mode: &str,
) -> TaskStateInput {
build_task_state_input(
state,
session_id,
user_content,
intents,
authority,
gate_decision,
inference_mode,
)
.await
}