use super::super::AppState;
use super::super::core;
use super::super::decomposition::DelegationProvenance;
use super::super::flight_recorder::{ReactStep, ToolSource};
use super::super::guards::DedupGuard;
use super::super::pipeline_trace::{PipelineTrace, SpanOutcome, ns};
use super::config::{AuthorityMode, InferenceMode, SessionResolutionMode};
use super::delegation::classify_delegation_error;
use super::task_state::{
build_task_state_input, derive_delegation_plan_from_task, derive_specialist_proposal_from_task,
resolve_specialist_creation_for_task,
};
use super::trace_helpers::{annotate_mcp_call, annotate_retrieval_metrics, annotate_tool_search};
use super::types::{
MAX_MESSAGE_BYTES, PipelineError, PipelineOutcome, PipelineRequest, StreamContext,
};
use roboticus_agent::action_planner::PlannedAction;
use super::heuristics::contextualize_short_followup;
/// Adds one MCP-call annotation to `pipeline_trace` for each ReAct step that is a
/// tool call whose source is an MCP server.
///
/// Steps are visited in the order they appear in `react_trace.steps`. Non-tool steps
/// and tool calls from any other `ToolSource` are ignored.
fn annotate_mcp_calls_from_react_trace(
    pipeline_trace: &mut PipelineTrace,
    react_trace: &super::super::flight_recorder::ReactTrace,
) {
    // Keep only MCP-backed tool calls, projecting out the fields the annotation needs.
    let mcp_calls = react_trace.steps.iter().filter_map(|step| match step {
        ReactStep::ToolCall {
            tool_name,
            duration_ms,
            success,
            source: ToolSource::Mcp { server },
            ..
        } => Some((server, tool_name, *duration_ms, *success)),
        _ => None,
    });
    for (server, tool_name, elapsed_ms, succeeded) in mcp_calls {
        annotate_mcp_call(pipeline_trace, server, tool_name, elapsed_ms, succeeded);
    }
}
/// Persists one turn's pipeline trace, and then its ReAct trace, to `state.db`.
///
/// Both rows share a freshly generated UUID: the pipeline-trace row is saved under it
/// first, and the ReAct trace is saved against that same id only if the first save
/// succeeded.
///
/// Best-effort: every failure (serialization or database) is logged at `warn` level
/// and not propagated, since the function returns `()`.
fn persist_pipeline_trace(
    state: &AppState,
    session_id: &str,
    pipeline_trace: &PipelineTrace,
    react_trace: &super::super::flight_recorder::ReactTrace,
) {
    let trace_id = uuid::Uuid::new_v4().to_string();
    // Total time is the sum of the recorded stage durations; a stage with no recorded
    // duration contributes 0.
    let total_ms: i64 = pipeline_trace
        .stages
        .iter()
        .map(|stage| stage.duration_ms.unwrap_or(0) as i64)
        .sum();
    // Keep the historical "[]" fallback so the row is still written, but surface the
    // failure instead of silently recording a turn that appears to have no stages.
    let stages_json = match serde_json::to_string(&pipeline_trace.stages) {
        Ok(json) => json,
        Err(e) => {
            tracing::warn!(
                turn_id = %pipeline_trace.turn_id,
                error = %e,
                "failed to serialize pipeline trace stages; storing empty stage list"
            );
            String::from("[]")
        }
    };
    let trace_row = roboticus_db::traces::PipelineTraceRow {
        id: trace_id.clone(),
        turn_id: pipeline_trace.turn_id.clone(),
        session_id: session_id.to_string(),
        channel: pipeline_trace.channel.clone(),
        total_ms,
        stages_json,
        created_at: chrono::Utc::now().to_rfc3339(),
    };
    if let Err(e) = roboticus_db::traces::save_pipeline_trace(&state.db, &trace_row) {
        tracing::warn!(
            turn_id = %pipeline_trace.turn_id,
            error = %e,
            "failed to save pipeline trace"
        );
        // The ReAct trace is keyed by `trace_id`; without the parent row there is
        // nothing to attach it to.
        return;
    }
    // Previously a serialization failure fell back to an empty string, which is not
    // valid JSON. Log the failure and skip the write instead of persisting it.
    let react_json = match serde_json::to_string(react_trace) {
        Ok(json) => json,
        Err(e) => {
            tracing::warn!(
                turn_id = %pipeline_trace.turn_id,
                error = %e,
                "failed to serialize react trace; skipping save"
            );
            return;
        }
    };
    if let Err(e) = roboticus_db::traces::save_react_trace(&state.db, &trace_id, &react_json) {
        tracing::warn!(
            turn_id = %pipeline_trace.turn_id,
            error = %e,
            "failed to save react trace"
        );
    }
}
#[tracing::instrument(skip_all, fields(channel = %request.config.channel_label))]
pub(in super::super) async fn run_pipeline(
request: PipelineRequest<'_>,
) -> Result<PipelineOutcome, PipelineError> {
let state = request.state;
let config = &request.config;
let channel_label = &config.channel_label;
let turn_id = uuid::Uuid::new_v4().to_string();
let mut pipeline_trace = PipelineTrace::new(&turn_id, channel_label.as_str());
pipeline_trace.begin_stage("input_validation");
if request.raw_content.trim().is_empty() {
return Err(PipelineError::BadRequest(
"message content cannot be empty".into(),
));
}
if request.raw_content.len() > MAX_MESSAGE_BYTES {
return Err(PipelineError::PayloadTooLarge(format!(
"message content exceeds maximum length ({MAX_MESSAGE_BYTES} bytes)"
)));
}
pipeline_trace.end_stage(SpanOutcome::Ok);
pipeline_trace.begin_stage("injection_defense");
let threat = if config.injection_defense {
roboticus_agent::injection::check_injection(request.raw_content)
} else {
roboticus_agent::injection::ThreatScore::new(0.0)
};
if threat.is_blocked() {
pipeline_trace.end_stage(SpanOutcome::Error("blocked".into()));
return Err(PipelineError::InjectionBlocked {
threat_score: threat.value(),
});
}
let threat_is_caution = threat.is_caution();
let user_content = if threat_is_caution {
tracing::info!(score = threat.value(), "Sanitizing caution-level input");
roboticus_agent::injection::sanitize(request.raw_content)
} else {
request.raw_content.to_string()
};
pipeline_trace.end_stage(if threat_is_caution {
SpanOutcome::Error("caution".into())
} else {
SpanOutcome::Ok
});
pipeline_trace.begin_stage("dedup_tracking");
let dedup_guard = if config.dedup_tracking {
let dedup_fp = roboticus_llm::DedupTracker::fingerprint(
"",
&[roboticus_llm::format::UnifiedMessage {
role: "user".into(),
content: user_content.clone(),
parts: None,
}],
);
{
let mut llm = state.llm.write().await;
if !llm.dedup.check_and_track(&dedup_fp) {
return Err(PipelineError::DuplicateRequest);
}
}
DedupGuard {
llm: std::sync::Arc::clone(&state.llm),
fingerprint: dedup_fp,
}
} else {
DedupGuard {
llm: std::sync::Arc::clone(&state.llm),
fingerprint: String::new(), }
};
pipeline_trace.end_stage(SpanOutcome::Ok);
pipeline_trace.begin_stage("session_resolution");
let cfg = state.config.read().await;
let agent_id = cfg.agent.id.clone();
let session_id = match &config.session_resolution {
SessionResolutionMode::FromBody => {
if let Some(sid) = request.session_id_hint {
match roboticus_db::sessions::get_session(&state.db, sid) {
Ok(Some(session)) if session.agent_id.eq_ignore_ascii_case(&agent_id) => {
if roboticus_db::sessions::is_non_interactive(&state.db, sid)
.unwrap_or(false)
{
return Err(PipelineError::SessionError(
"this session is non-interactive (cron/subagent) and read-only"
.into(),
axum::http::StatusCode::FORBIDDEN,
));
}
if let Some(ref scope_key) = session.scope_key
&& let Some(sc) = extract_channel_from_scope_key(scope_key)
&& sc != channel_label
&& !session.cross_channel_consent
{
return Err(PipelineError::SessionError(
"cross-channel session access requires consent".into(),
axum::http::StatusCode::FORBIDDEN,
));
}
sid.to_string()
}
Ok(Some(_)) => {
return Err(PipelineError::SessionError(
"session does not belong to this agent".into(),
axum::http::StatusCode::FORBIDDEN,
));
}
Ok(None) => {
return Err(PipelineError::SessionError(
"session not found".into(),
axum::http::StatusCode::NOT_FOUND,
));
}
Err(e) => {
tracing::error!(error = %e, "failed to retrieve session");
return Err(PipelineError::Internal("session lookup failed".into()));
}
}
} else if let Some(ref scope) = request.scope_hint {
match roboticus_db::sessions::find_or_create(&state.db, &agent_id, Some(scope)) {
Ok(sid) => sid,
Err(e) => {
tracing::error!(error = %e, "failed to create session");
return Err(PipelineError::Internal("session creation failed".into()));
}
}
} else {
match roboticus_db::sessions::find_or_create(&state.db, &agent_id, None) {
Ok(sid) => sid,
Err(e) => {
tracing::error!(error = %e, "failed to create session");
return Err(PipelineError::Internal("session creation failed".into()));
}
}
}
}
SessionResolutionMode::FromChannel { .. } => {
let scope = request.scope_hint.as_ref();
match roboticus_db::sessions::find_or_create(&state.db, &agent_id, scope) {
Ok(sid) => sid,
Err(e) => {
tracing::error!(error = %e, "failed to create channel session");
return Err(PipelineError::Internal("session creation failed".into()));
}
}
}
SessionResolutionMode::Dedicated => {
let content_hash = {
use std::hash::{DefaultHasher, Hash, Hasher};
let mut h = DefaultHasher::new();
request.raw_content.hash(&mut h);
format!("{:016x}", h.finish())
};
let content_hash = &content_hash[..12];
let stable_scope = roboticus_db::sessions::SessionScope::Peer {
peer_id: format!("{agent_id}-task-{content_hash}"),
channel: "dedicated".into(),
};
match roboticus_db::sessions::find_or_create(&state.db, &agent_id, Some(&stable_scope))
{
Ok(sid) => {
let task_nick = config.session_nickname_override.clone().unwrap_or_else(|| {
roboticus_db::sessions::derive_nickname(request.raw_content)
});
let _ = roboticus_db::sessions::update_nickname(&state.db, &sid, &task_nick);
let _ = roboticus_db::sessions::set_non_interactive(&state.db, &sid, true);
sid
}
Err(e) => {
tracing::error!(error = %e, "failed to create dedicated session");
return Err(PipelineError::Internal("session creation failed".into()));
}
}
}
#[cfg(test)]
SessionResolutionMode::Provided { session_id } => session_id.clone(),
};
let (user_content, is_correction_turn) = if config.short_followup_expansion {
let (expanded, correction) =
contextualize_short_followup(&state.db, &session_id, &user_content);
(expanded, request.is_correction_turn || correction)
} else {
(user_content, request.is_correction_turn)
};
pipeline_trace.end_stage(SpanOutcome::Ok);
let topic_tag = {
let recent = roboticus_db::sessions::list_recent_messages(&state.db, &session_id, 3)
.unwrap_or_default();
if recent.is_empty() {
Some("topic-1".to_string())
} else {
let current_tag = recent
.iter()
.find_map(|m| m.topic_tag.as_deref())
.unwrap_or("topic-1");
let max_topic_num: u32 = recent
.iter()
.filter_map(|m| m.topic_tag.as_deref())
.filter_map(|t| t.strip_prefix("topic-").and_then(|n| n.parse().ok()))
.max()
.unwrap_or(1);
let recent_content: Vec<&str> = recent
.iter()
.filter(|m| m.role == "assistant")
.map(|m| m.content.as_str())
.collect();
let is_continuation = recent_content.iter().any(|prev| {
roboticus_agent::compaction::text_overlap_score(request.raw_content, prev) > 0.3 });
if is_continuation {
Some(current_tag.to_string())
} else {
Some(format!("topic-{}", max_topic_num + 1))
}
}
};
let user_message_id = match roboticus_db::sessions::append_message_with_topic(
&state.db,
&session_id,
"user",
request.raw_content,
topic_tag.as_deref(),
) {
Ok(id) => id,
Err(e) => {
tracing::error!(error = %e, "failed to store user message");
return Err(PipelineError::Internal("message storage failed".into()));
}
};
if config.nickname_refinement
&& let Ok(Some(s)) = roboticus_db::sessions::get_session(&state.db, &session_id)
&& s.nickname.is_none()
{
let nick = roboticus_db::sessions::derive_nickname(request.raw_content);
let _ = roboticus_db::sessions::update_nickname(&state.db, &session_id, &nick);
}
if let Err(e) = roboticus_db::sessions::create_turn_with_id(
&state.db,
&turn_id,
&session_id,
None,
None,
None,
None,
) {
tracing::warn!(error = %e, "failed to pre-create turn record");
}
pipeline_trace.begin_stage("decomposition_gate");
let mut gate_decision = if config.decomposition_gate {
let features = roboticus_llm::extract_features(&user_content, 0, 1);
let complexity = roboticus_llm::classify_complexity(&features);
let decision = super::super::decomposition::evaluate_decomposition_gate(
state,
&user_content,
complexity,
)
.await;
Some(decision)
} else {
None
};
pipeline_trace.end_stage(if gate_decision.is_some() {
SpanOutcome::Ok
} else {
SpanOutcome::Skipped
});
let authority = match config.authority_mode {
AuthorityMode::ApiClaim => {
let claim = roboticus_core::security::resolve_api_claim(
threat_is_caution,
channel_label,
&cfg.security,
);
tracing::debug!(
authority = ?claim.authority,
sources = ?claim.sources,
"Pipeline authority resolved (ApiClaim)"
);
claim.authority
}
AuthorityMode::ChannelClaim => {
if let Some(ref ctx) = request.channel_context {
let claim = roboticus_core::security::resolve_channel_claim(
&roboticus_core::security::ChannelContext {
sender_id: &ctx.sender_id,
chat_id: &ctx.chat_id,
channel: &ctx.platform,
sender_in_allowlist: ctx.sender_in_allowlist,
allowlist_configured: ctx.allowlist_configured,
threat_is_caution,
trusted_sender_ids: &ctx.trusted_sender_ids,
},
&cfg.security,
);
if claim.threat_downgraded {
tracing::info!(
effective_authority = ?claim.authority,
"Threat-score ceiling applied"
);
}
claim.authority
} else {
tracing::warn!(
"ChannelClaim mode but no channel_context provided — falling back to SelfGenerated"
);
roboticus_core::InputAuthority::SelfGenerated
}
}
AuthorityMode::SelfGenerated => roboticus_core::InputAuthority::SelfGenerated,
};
drop(cfg);
let intents = super::super::intent_registry::IntentRegistry::default_registry()
.classify_semantic(
&user_content,
&state.semantic_classifier,
super::super::intent_registry::INTENT_THRESHOLD,
)
.await;
let mut delegation_workflow_note = None;
let inference_mode_label = match config.inference_mode {
InferenceMode::Standard => "standard",
InferenceMode::Streaming => "streaming",
};
state.event_bus.publish(
serde_json::json!({
"type": "agent_working",
"workstation": "pipeline",
"activity": "classifying turn",
"session_id": session_id,
})
.to_string(),
);
let task_input = build_task_state_input(
state,
&session_id,
&user_content,
&intents,
authority,
gate_decision.as_ref(),
inference_mode_label,
)
.await;
let task_state = roboticus_agent::task_state::synthesize(&task_input);
let is_first_turn = task_input.recent_response_skeletons.is_empty()
&& task_input.recent_user_message_lengths.is_empty();
let retrieval_decision = roboticus_agent::retrieval_strategy::decide_retrieval_strategy(
&task_input.intents,
&task_state,
&user_content,
is_first_turn,
);
tracing::info!(
retrieval_strategy = retrieval_decision.strategy.as_str(),
confidence = retrieval_decision.confidence,
signals = %retrieval_decision.signals.join(","),
"retrieval strategy selected"
);
state.event_bus.publish(
serde_json::json!({
"type": "agent_working",
"workstation": "pipeline",
"activity": format!(
"checking memory ({} retrieved, {:.2} avg)",
task_state.memory_confidence.retrieval_count,
task_state.memory_confidence.avg_similarity,
),
"session_id": session_id,
})
.to_string(),
);
if task_state.classification == roboticus_agent::task_state::TaskClassification::Task {
state.event_bus.publish(
serde_json::json!({
"type": "agent_working",
"workstation": "pipeline",
"activity": format!(
"reviewing roster ({} specialists)",
task_state.roster_fit.taskable_count,
),
"session_id": session_id,
})
.to_string(),
);
}
let execution_plan = roboticus_agent::action_planner::plan(&task_state, &task_input);
state.event_bus.publish(
serde_json::json!({
"type": "agent_working",
"workstation": "pipeline",
"activity": format!(
"planning: {:?} (confidence {:.1})",
execution_plan.selected,
execution_plan.candidates.first().map_or(0.0, |c| c.confidence),
),
"session_id": session_id,
})
.to_string(),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"task_mode",
serde_json::json!(match task_state.classification {
roboticus_agent::task_state::TaskClassification::Conversation => "conversation",
roboticus_agent::task_state::TaskClassification::Task => "task",
}),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"planner_next_action",
serde_json::json!(match execution_plan.selected {
PlannedAction::AnswerDirectly => "answer_directly",
PlannedAction::ContinueCentralized => "continue_centralized",
PlannedAction::ComposeSubagent => "compose_specialist",
PlannedAction::DelegateToSpecialist => "delegate_to_specialist",
PlannedAction::InspectMemory => "inspect_memory",
PlannedAction::ComposeSkill => "compose_skill",
PlannedAction::ReturnBlocker => "return_blocker",
PlannedAction::NormalizationRetry => "normalization_retry",
}),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"planner_rationale",
serde_json::json!(&execution_plan.selected_rationale),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"roster_taskable_count",
serde_json::json!(task_state.roster_fit.taskable_count),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"specialist_fit_count",
serde_json::json!(task_state.roster_fit.fit_count),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"enabled_skill_count",
serde_json::json!(task_state.skill_fit.enabled_count),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"explicit_specialist_workflow",
serde_json::json!(task_input.explicit_specialist_workflow),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"task_authority",
serde_json::json!(&task_input.authority),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"inputs.classification",
serde_json::json!(format!("{:?}", task_state.classification)),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"inputs.memory_avg_similarity",
serde_json::json!(task_state.memory_confidence.avg_similarity),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"inputs.memory_retrieval_count",
serde_json::json!(task_state.memory_confidence.retrieval_count),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"inputs.memory_recall_gap",
serde_json::json!(task_state.memory_confidence.recall_gap),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"inputs.tool_available_count",
serde_json::json!(task_state.tool_fit.available_count),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"inputs.roster_taskable_count",
serde_json::json!(task_state.roster_fit.taskable_count),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"inputs.roster_fit_count",
serde_json::json!(task_state.roster_fit.fit_count),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"candidates.count",
serde_json::json!(execution_plan.candidates.len()),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"selected.action",
serde_json::json!(format!("{:?}", execution_plan.selected)),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"selected.rationale",
serde_json::json!(&execution_plan.selected_rationale),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"behavior.structural_repetition",
serde_json::json!(task_state.behavioral_history.structural_repetition),
);
if task_state.behavioral_history.structural_repetition {
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"behavior.repetition_streak",
serde_json::json!(task_state.behavioral_history.repetition_streak),
);
if let Some(ref pattern) = task_state.behavioral_history.repeated_pattern {
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"behavior.repeated_pattern",
serde_json::json!(pattern),
);
}
}
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"behavior.engagement_declining",
serde_json::json!(task_state.behavioral_history.engagement_declining),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"behavior.self_echo_risk",
serde_json::json!(task_state.behavioral_history.self_echo_risk),
);
if let Some(ref hint) = task_state.behavioral_history.variation_hint {
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"behavior.variation_hint",
serde_json::json!(hint),
);
}
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"declared_action.detected",
serde_json::json!(task_state.declared_action.action_declared),
);
if let Some(ref action) = task_state.declared_action.action {
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"declared_action.verb",
serde_json::json!(&action.verb),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"declared_action.target",
serde_json::json!(&action.target),
);
pipeline_trace.annotate_ns(
ns::TASK_STATE,
"declared_action.high_consequence",
serde_json::json!(task_state.declared_action.high_consequence),
);
}
pipeline_trace.annotate_ns(
ns::RETRIEVAL_STRATEGY,
"selected",
serde_json::json!(retrieval_decision.strategy.as_str()),
);
pipeline_trace.annotate_ns(
ns::RETRIEVAL_STRATEGY,
"confidence",
serde_json::json!(retrieval_decision.confidence),
);
pipeline_trace.annotate_ns(
ns::RETRIEVAL_STRATEGY,
"signals",
serde_json::json!(retrieval_decision.signals.join(",")),
);
let mapped_gate_decision = match execution_plan.selected {
PlannedAction::DelegateToSpecialist => {
match gate_decision.take() {
Some(super::super::decomposition::DecompositionDecision::Delegated(plan)) => {
Some(super::super::decomposition::DecompositionDecision::Delegated(plan))
}
_ => Some(
super::super::decomposition::DecompositionDecision::Delegated(
derive_delegation_plan_from_task(&user_content),
),
),
}
}
PlannedAction::ComposeSubagent => {
match gate_decision.take() {
Some(
super::super::decomposition::DecompositionDecision::RequiresSpecialistCreation {
proposal,
rationale,
},
) => Some(
super::super::decomposition::DecompositionDecision::RequiresSpecialistCreation {
proposal,
rationale,
},
),
_ => Some(
super::super::decomposition::DecompositionDecision::RequiresSpecialistCreation {
proposal: derive_specialist_proposal_from_task(&user_content),
rationale: execution_plan.selected_rationale.clone(),
},
),
}
}
_ => None,
};
gate_decision = mapped_gate_decision;
let auto_compose = matches!(execution_plan.selected, PlannedAction::ComposeSubagent)
&& !config.specialist_controls
&& matches!(
gate_decision,
Some(
super::super::decomposition::DecompositionDecision::RequiresSpecialistCreation { .. }
)
);
if let Some(decision_for_resolution) = gate_decision.take() {
let resolved_decision = if auto_compose {
let (resolved, workflow_note) = resolve_specialist_creation_for_task(
state,
&turn_id,
channel_label,
authority,
&session_id,
&user_content,
decision_for_resolution,
&mut pipeline_trace,
)
.await;
delegation_workflow_note = workflow_note;
resolved
} else {
decision_for_resolution
};
if delegation_workflow_note.is_none() {
let outcome = super::super::decomposition::apply_decomposition_decision(
state,
&resolved_decision,
&session_id,
channel_label,
)
.await;
delegation_workflow_note = match outcome {
super::super::decomposition::DecompositionOutcome::SpecialistProposalPending {
prompt,
} => {
if config.specialist_controls {
return Ok(PipelineOutcome::SpecialistProposal { session_id, prompt });
}
tracing::debug!("specialist proposal suppressed (specialist_controls=false)");
None
}
super::super::decomposition::DecompositionOutcome::Centralized => None,
super::super::decomposition::DecompositionOutcome::Delegated { workflow_note } => {
Some(workflow_note)
}
};
}
gate_decision = Some(resolved_decision);
let resolved_input = build_task_state_input(
state,
&session_id,
&user_content,
&intents,
authority,
gate_decision.as_ref(),
inference_mode_label,
)
.await;
let resolved_state = roboticus_agent::task_state::synthesize(&resolved_input);
let resolved_plan = roboticus_agent::action_planner::plan(&resolved_state, &resolved_input);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"resolved_task_next_action",
serde_json::json!(match resolved_plan.selected {
PlannedAction::AnswerDirectly => "answer_directly",
PlannedAction::ContinueCentralized => "continue_centralized",
PlannedAction::ComposeSubagent => "compose_specialist",
PlannedAction::DelegateToSpecialist => "delegate_to_specialist",
PlannedAction::InspectMemory => "inspect_memory",
PlannedAction::ComposeSkill => "compose_skill",
PlannedAction::ReturnBlocker => "return_blocker",
PlannedAction::NormalizationRetry => "normalization_retry",
}),
);
}
let normalization_note = if execution_plan.selected == PlannedAction::NormalizationRetry {
Some(
"[Protocol Correction] Your previous response contained malformed tool protocol. \
When you need to use a tool, emit a proper tool_use block — do NOT narrate the \
tool call as text, do NOT write JSON tool payloads inline, and do NOT describe \
what you would do if you had access to a tool. Either call the tool correctly \
or respond without tools."
.to_string(),
)
} else {
None
};
let gate_system_note = gate_decision
.as_ref()
.map(|d| {
super::super::decomposition::build_gate_system_note(
d,
delegation_workflow_note.as_deref(),
)
})
.or(normalization_note);
let mut delegation_provenance = DelegationProvenance::default();
let (delegated_execution_note, delegated_execution_result) = if config.delegated_execution {
if let Some(super::super::decomposition::DecompositionDecision::Delegated(plan)) =
&gate_decision
{
pipeline_trace.begin_stage("delegated_execution");
pipeline_trace.annotate_ns(
ns::DELEGATION,
"subtask_count",
serde_json::json!(plan.subtasks.len()),
);
pipeline_trace.annotate_ns(ns::DELEGATION, "pattern", serde_json::json!("fan-out"));
let delegated_params = serde_json::json!({
"task": user_content,
"subtasks": plan.subtasks,
});
let deleg_start = std::time::Instant::now();
match super::super::execute_tool_call(
state,
"orchestrate-subagents",
&delegated_params,
&turn_id,
authority,
Some(channel_label),
)
.await
{
Ok(output) => {
let output = super::super::guards::strip_internal_delegation_metadata(&output);
delegation_provenance.subagent_task_started = true;
let config_snapshot = state.config.read().await;
let should_evaluate = match config_snapshot.agent.output_validation_policy {
roboticus_core::config::OutputValidationPolicy::Strict => true,
roboticus_core::config::OutputValidationPolicy::Sample => {
let sample_rate = config_snapshot.agent.output_validation_sample_rate;
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| (d.as_nanos() % 100) < (sample_rate * 100.0) as u128)
.unwrap_or(true)
}
roboticus_core::config::OutputValidationPolicy::Off => false,
};
drop(config_snapshot);
let mut final_output = output.clone();
let mut quality_score_val: Option<f64> = None;
let mut retry_count_val: i32 = 0;
if should_evaluate && !output.trim().is_empty() {
let task_desc: String = user_content.chars().take(500).collect();
let verdict =
super::quality_gate::evaluate_output_heuristic(&task_desc, &output);
match verdict {
super::quality_gate::OutputVerdict::Pass { score } => {
quality_score_val = Some(f64::from(score) / 100.0);
}
super::quality_gate::OutputVerdict::Retry { feedback, score } => {
tracing::info!(
score,
feedback = %feedback,
"delegation output quality gate: retrying delegated execution"
);
quality_score_val = Some(f64::from(score) / 100.0);
retry_count_val = 1;
let retry_params = serde_json::json!({
"task": format!(
"{user_content}\n\nQuality gate feedback: {feedback}\nReturn a more complete, concrete final result."
),
"subtasks": plan.subtasks,
});
match super::super::execute_tool_call(
state,
"orchestrate-subagents",
&retry_params,
&turn_id,
authority,
Some(channel_label),
)
.await
{
Ok(retry_output) => {
let retry_output = super::super::guards::strip_internal_delegation_metadata(
&retry_output,
);
let retry_verdict =
super::quality_gate::evaluate_output_heuristic(
&task_desc,
&retry_output,
);
match retry_verdict {
super::quality_gate::OutputVerdict::Pass { score } => {
quality_score_val = Some(f64::from(score) / 100.0);
final_output = retry_output;
}
super::quality_gate::OutputVerdict::Retry {
feedback,
score,
} => {
quality_score_val = Some(f64::from(score) / 100.0);
final_output = format!(
"[ESCALATED AFTER RETRY: Subagent output still did not meet quality threshold. Score: {}/100. Feedback: {}]\n\n{}",
score, feedback, retry_output
);
}
super::quality_gate::OutputVerdict::Escalate {
reason,
score,
} => {
quality_score_val = Some(f64::from(score) / 100.0);
final_output = format!(
"[ESCALATED AFTER RETRY: Subagent output did not meet quality threshold. Score: {}/100. Reason: {}]\n\n{}",
score, reason, retry_output
);
}
}
}
Err(err) => {
tracing::warn!(
error = %err,
"delegation quality-gate retry failed"
);
final_output = format!(
"[ESCALATED AFTER RETRY FAILURE: Delegated retry failed: {}]\n\n{}",
err, output
);
}
}
}
super::quality_gate::OutputVerdict::Escalate { reason, score } => {
quality_score_val = Some(f64::from(score) / 100.0);
final_output = format!(
"[ESCALATED: Subagent output did not meet quality threshold. \
Score: {}/100. Reason: {}]\n\n{}",
score, reason, output
);
}
}
}
delegation_provenance.subagent_task_completed = true;
delegation_provenance.subagent_result_attached =
!final_output.trim().is_empty();
{
let delegation_duration_ms = deleg_start.elapsed().as_millis() as i64;
let subtask_events = roboticus_db::task_events::subtask_events_for_parent(
&state.db, &turn_id,
)
.unwrap_or_default();
let mut assigned_agents = Vec::new();
for event in &subtask_events {
if let Some(agent) = event.assigned_to.as_ref()
&& !assigned_agents
.iter()
.any(|existing: &String| existing == agent)
{
assigned_agents.push(agent.clone());
}
}
let subagents =
roboticus_db::agents::list_sub_agents(&state.db).unwrap_or_default();
let mut utilized_skills = Vec::new();
for agent in &assigned_agents {
if let Some(row) = subagents.iter().find(|sa| sa.name == *agent) {
for skill in
super::super::parse_skills_json(row.skills_json.as_deref())
{
if !utilized_skills
.iter()
.any(|existing: &String| existing == &skill)
{
utilized_skills.push(skill);
}
}
}
}
let outcome_row = roboticus_db::delegation::DelegationOutcomeRow {
id: uuid::Uuid::new_v4().to_string(),
turn_id: turn_id.clone(),
session_id: session_id.clone(),
task_description: user_content.chars().take(500).collect(),
subtask_count: plan.subtasks.len() as i64,
pattern: "fan-out".into(),
assigned_agents_json: serde_json::to_string(&assigned_agents)
.unwrap_or_else(|_| "[]".into()),
total_duration_ms: Some(delegation_duration_ms),
success: true,
quality_score: quality_score_val,
created_at: String::new(),
skill_utilization_json: Some(
serde_json::to_string(&utilized_skills)
.unwrap_or_else(|_| "[]".into()),
),
retry_count: retry_count_val,
};
let db_clone = state.db.clone();
tokio::spawn(async move {
if let Err(e) = roboticus_db::delegation::insert_delegation_outcome(
&db_clone,
&outcome_row,
) {
tracing::warn!(error = %e, "failed to persist delegation outcome");
}
});
pipeline_trace.annotate_ns(
ns::DELEGATION,
"duration_ms",
serde_json::json!(delegation_duration_ms),
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"success",
serde_json::json!(true),
);
if let Some(score) = quality_score_val {
pipeline_trace.annotate_ns(
ns::DELEGATION,
"quality_score",
serde_json::json!(score),
);
}
}
pipeline_trace.end_stage(SpanOutcome::Ok);
(
Some(format!(
"Delegated subagent execution completed this turn. Verified output:\n{final_output}"
)),
Some(final_output),
)
}
Err(err) => {
let typed_err = classify_delegation_error(&err);
tracing::warn!(
error = %err,
error_type = typed_err.error_type(),
"delegated tool execution failed"
);
pipeline_trace.annotate_ns(
ns::DELEGATION,
"duration_ms",
serde_json::json!(deleg_start.elapsed().as_millis() as i64),
);
pipeline_trace.annotate_ns(ns::DELEGATION, "success", serde_json::json!(false));
pipeline_trace.annotate_ns(
ns::DELEGATION,
"error_type",
serde_json::json!(typed_err.error_type()),
);
pipeline_trace
.end_stage(SpanOutcome::Error(typed_err.error_type().to_string()));
delegation_provenance.subagent_task_started = true;
(Some(format!("Delegation failed: {typed_err}")), None)
}
}
} else {
(None, None)
}
} else {
(None, None)
};
if config.skill_first_enabled
&& authority == roboticus_core::InputAuthority::Creator
&& let Some(skill_reply) =
try_skill_first_fulfillment(state, &user_content, &turn_id, authority, channel_label)
.await
{
let asst_id = roboticus_db::sessions::append_message_with_topic(
&state.db,
&session_id,
"assistant",
&skill_reply,
topic_tag.as_deref(),
)
.map(|id| id.to_string())
.unwrap_or_default();
return Ok(PipelineOutcome::Complete {
session_id,
user_message_id,
result: Box::new(core::PipelineResult::synthetic(
skill_reply,
"skill-first".into(),
asst_id,
)),
});
}
let cfg = state.config.read().await;
let agent_name = cfg.agent.name.clone();
let agent_id_for_prompt = cfg.agent.id.clone();
let primary_model = config
.model_override
.clone()
.unwrap_or_else(|| cfg.models.primary.clone());
let tier_adapt = cfg.tier_adapt.clone();
drop(cfg);
let personality = state.personality.read().await;
let os_text = personality.os_text.clone();
let firmware_text = personality.firmware_text.clone();
drop(personality);
let inference_input = core::InferenceInput {
state,
session_id: &session_id,
user_content: &user_content,
turn_id: &turn_id,
channel_label,
agent_name,
agent_id: agent_id_for_prompt,
os_text,
firmware_text,
primary_model,
tier_adapt,
delegation_workflow_note,
inject_diagnostics: config.inject_diagnostics,
gate_system_note,
delegated_execution_note,
delegated_execution_result,
behavioral_note: task_state.behavioral_history.variation_hint.clone(),
content_parts: request.content_parts.clone(),
topic_tag: topic_tag.clone(),
};
pipeline_trace.begin_stage(ns::RETRIEVAL);
let mut prepared = core::prepare_inference(&inference_input)
.await
.map_err(PipelineError::Internal)?;
annotate_retrieval_metrics(&mut pipeline_trace, &prepared.retrieval_metrics);
pipeline_trace.end_stage(SpanOutcome::Ok);
{
let metrics = &prepared.retrieval_metrics;
let mut post_retrieval_input = task_input.clone();
post_retrieval_input.retrieval_metrics = Some(metrics.clone());
post_retrieval_input.tool_search_stats = prepared.tool_search_stats.clone();
let post_state = roboticus_agent::task_state::synthesize(&post_retrieval_input);
let post_plan = roboticus_agent::action_planner::plan(&post_state, &post_retrieval_input);
if post_plan.selected != execution_plan.selected {
tracing::info!(
pre = ?execution_plan.selected,
post = ?post_plan.selected,
"post-retrieval planner re-evaluation changed decision"
);
pipeline_trace.annotate_ns(
ns::RETRIEVAL,
"post_retrieval_replan",
serde_json::json!({
"pre_decision": format!("{:?}", execution_plan.selected),
"post_decision": format!("{:?}", post_plan.selected),
"reason": post_plan.selected_rationale,
}),
);
if post_plan.selected == PlannedAction::InspectMemory {
tracing::info!(
"post-retrieval: InspectMemory activated — injecting memory inspection instruction"
);
prepared.request.inject_system_note(
"[Memory Inspection Required] Retrieval confidence is low for this query. \
Before answering, call `get_memory_stats` to check what memory tiers are \
available and whether relevant context exists. If recall is uncertain, \
say so explicitly rather than guessing.",
);
}
if post_plan.selected == PlannedAction::ReturnBlocker {
tracing::warn!("post-retrieval: ReturnBlocker — all providers blocked");
return Err(PipelineError::Internal(
"All configured model providers are currently unavailable. \
Please try again shortly or check provider status."
.into(),
));
}
}
}
if !matches!(
execution_plan.selected,
PlannedAction::DelegateToSpecialist | PlannedAction::ComposeSubagent
) {
let sub_names: Vec<String> = roboticus_db::agents::list_sub_agents(&state.db)
.unwrap_or_default()
.iter()
.map(|a| a.name.clone())
.collect();
if !sub_names.is_empty() {
prepared.request.inject_system_note(&format!(
"IMPORTANT: You are responding directly to the user. Do NOT mention, reference, \
or delegate to any internal specialist or subagent ({}). Do NOT describe your \
internal architecture, tool inventory, or delegation capabilities. Respond as \
your persona — address the user's request directly.",
sub_names.join(", ")
));
}
}
if let Some(tool_search_stats) = &prepared.tool_search_stats {
pipeline_trace.begin_stage(ns::TOOL_SEARCH);
annotate_tool_search(&mut pipeline_trace, tool_search_stats);
pipeline_trace.end_stage(SpanOutcome::Ok);
}
if let Err(e) = roboticus_db::sessions::upsert_context_snapshot(
&state.db,
roboticus_db::sessions::UpsertContextSnapshotInput {
turn_id: &turn_id,
complexity_level: &prepared.context_snapshot.complexity_level,
token_budget: prepared.context_snapshot.token_budget,
system_prompt_tokens: Some(prepared.context_snapshot.system_prompt_tokens),
memory_tokens: Some(prepared.context_snapshot.memory_tokens),
history_tokens: Some(prepared.context_snapshot.history_tokens),
history_depth: Some(prepared.context_snapshot.history_depth),
memory_tiers_json: prepared.context_snapshot.memory_tiers_json.as_deref(),
retrieved_memories_json: prepared.context_snapshot.retrieved_memories_json.as_deref(),
model: Some(&prepared.model),
},
) {
tracing::warn!(turn_id = %turn_id, error = %e, "failed to persist context snapshot");
}
let shortcut_result = if config.shortcuts_enabled && !is_correction_turn {
let registry = super::super::intent_registry::IntentRegistry::default_registry();
let bypass_cache = registry.should_bypass_cache(&prepared.intents);
let agent_name = {
let cfg = state.config.read().await;
cfg.agent.name.clone()
};
let mut shortcut_ctx = super::super::shortcuts::ShortcutContext {
state,
user_content: &user_content,
turn_id: &turn_id,
intents: &prepared.intents,
agent_name: &agent_name,
channel_label,
prepared_model: &prepared.model,
authority,
delegation_provenance: &mut delegation_provenance,
is_correction_turn,
has_conversation_context: prepared.previous_assistant.is_some(),
};
match super::super::shortcuts::ShortcutDispatcher::default_dispatcher()
.try_dispatch(&mut shortcut_ctx, bypass_cache)
.await
{
Ok(Some(output)) => {
tracing::info!(
model = %output.model,
quality = output.quality_score,
"Shortcut matched"
);
Some(output)
}
Err(e) => {
tracing::warn!(error = %e, "shortcut dispatch failed; falling through");
None
}
Ok(None) => None,
}
} else {
None
};
match config.inference_mode {
InferenceMode::Standard => {
if let Some(sc) = shortcut_result {
let asst_id = roboticus_db::sessions::append_message_with_topic(
&state.db,
&session_id,
"assistant",
&sc.content,
topic_tag.as_deref(),
)
.map_err(|e| {
PipelineError::Internal(format!("failed to store shortcut response: {e}"))
})?;
core::record_cost(
state,
&sc.model,
roboticus_core::model::provider_prefix(&sc.model),
sc.tokens_in,
sc.tokens_out,
sc.cost,
None,
false,
Some(sc.latency_ms as i64),
Some(sc.quality_score),
sc.escalated,
Some(&turn_id),
);
pipeline_trace.begin_stage("inference");
annotate_mcp_calls_from_react_trace(&mut pipeline_trace, &sc.react_trace);
pipeline_trace.end_stage(SpanOutcome::Skipped);
persist_pipeline_trace(state, &session_id, &pipeline_trace, &sc.react_trace);
return Ok(PipelineOutcome::Complete {
session_id: session_id.clone(),
user_message_id: user_message_id.clone(),
result: Box::new(core::PipelineResult::from_inference(
&sc,
sc.model.clone(),
asst_id,
)),
});
}
pipeline_trace.begin_stage("inference");
let result = core::execute_inference_pipeline(
state,
&prepared,
&session_id,
&user_content,
&turn_id,
authority,
Some(channel_label),
&mut delegation_provenance,
topic_tag.as_deref(),
)
.await;
match &result {
Ok(result) => {
annotate_mcp_calls_from_react_trace(&mut pipeline_trace, &result.react_trace);
pipeline_trace.end_stage(SpanOutcome::Ok);
let guard_steps: Vec<_> = result
.react_trace
.steps
.iter()
.filter_map(|step| {
if let super::super::flight_recorder::ReactStep::Guard {
guard_name,
fired,
action,
detail,
..
} = step
{
Some((guard_name.clone(), *fired, action.clone(), detail.clone()))
} else {
None
}
})
.collect();
if !guard_steps.is_empty() {
pipeline_trace.begin_stage("guard_chain");
for (name, fired, action, detail) in &guard_steps {
pipeline_trace.annotate(
format!("guard.{name}"),
serde_json::json!({
"fired": fired,
"action": action,
"detail": detail,
}),
);
}
let any_fired = guard_steps.iter().any(|(_, fired, _, _)| *fired);
pipeline_trace.end_stage(if any_fired {
SpanOutcome::Error("guard_fired".into())
} else {
SpanOutcome::Ok
});
}
persist_pipeline_trace(
state,
&session_id,
&pipeline_trace,
&result.react_trace,
);
}
Err(err) => {
pipeline_trace.end_stage(SpanOutcome::Error(err.clone()));
}
}
let result = result.map_err(PipelineError::Internal)?;
if config.nickname_refinement {
let db = state.db.clone();
let llm = std::sync::Arc::clone(&state.llm);
let sid = session_id.clone();
let oauth = state.oauth.clone();
let keystore = state.keystore.clone();
tokio::spawn(async move {
if let Ok(count) = roboticus_db::sessions::message_count(&db, &sid)
&& count >= 4
{
let _ =
core::refine_session_nickname(&db, &llm, &sid, &oauth, &keystore).await;
}
});
}
Ok(PipelineOutcome::Complete {
session_id,
user_message_id,
result: Box::new(result),
})
}
InferenceMode::Streaming => {
let agent_id_stream = {
let cfg = state.config.read().await;
cfg.agent.id.clone()
};
let has_tools = !prepared.request.tools.is_empty();
if has_tools && shortcut_result.is_none() {
pipeline_trace.begin_stage("inference");
let result = core::execute_inference_pipeline(
state,
&prepared,
&session_id,
&user_content,
&turn_id,
authority,
Some(channel_label),
&mut delegation_provenance,
topic_tag.as_deref(),
)
.await;
match &result {
Ok(result) => {
annotate_mcp_calls_from_react_trace(
&mut pipeline_trace,
&result.react_trace,
);
pipeline_trace.end_stage(SpanOutcome::Ok);
persist_pipeline_trace(
state,
&session_id,
&pipeline_trace,
&result.react_trace,
);
}
Err(err) => {
pipeline_trace.end_stage(SpanOutcome::Error(err.clone()));
}
}
let result = result.map_err(PipelineError::Internal)?;
let synthetic = core::InferenceOutput {
content: result.content,
model: result.model,
tokens_in: result.tokens_in,
tokens_out: result.tokens_out,
cost: result.cost,
react_turns: result.react_turns,
latency_ms: 0,
quality_score: 0.0,
escalated: false,
tool_results: vec![],
react_trace: result.react_trace,
};
Ok(PipelineOutcome::StreamReady(Box::new(StreamContext {
session_id,
turn_id,
user_content,
prepared,
shortcut_result: Some(synthetic),
dedup_guard,
agent_id: agent_id_stream,
resolved_stream: None,
guard_set: config.guard_set,
})))
} else {
let _ = authority;
let resolved_stream = if shortcut_result.is_some() {
None
} else {
match super::super::routing::infer_stream_with_fallback(
state,
&prepared.request,
&prepared.model,
)
.await
{
Ok(ctx) => Some(ctx),
Err(e) => {
tracing::error!(error = %e, "all streaming fallback candidates failed");
return Err(PipelineError::SessionError(
"upstream provider error".into(),
axum::http::StatusCode::BAD_GATEWAY,
));
}
}
};
Ok(PipelineOutcome::StreamReady(Box::new(StreamContext {
session_id,
turn_id,
user_content,
prepared,
shortcut_result,
dedup_guard,
agent_id: agent_id_stream,
resolved_stream,
guard_set: config.guard_set,
})))
}
}
}
}
/// Extracts the channel/platform segment from a session scope key.
///
/// Scope keys of the form `peer:<channel>:...` or `group:<channel>:...` yield
/// `Some(<channel>)`; a key with no further `:` (e.g. `peer:email`) yields the
/// whole remainder. Returns `None` for the agent-wide scope (`"agent"`), for any
/// key without a `peer:`/`group:` prefix, and for keys whose channel segment is
/// empty (e.g. `"peer:"` or `"group::42"`), which are malformed rather than a
/// channel literally named `""`.
fn extract_channel_from_scope_key(scope_key: &str) -> Option<&str> {
    // "agent" carries neither prefix and would fall through to `None` anyway;
    // the explicit check documents that the agent-wide scope has no channel.
    if scope_key == "agent" {
        return None;
    }
    let rest = scope_key
        .strip_prefix("peer:")
        .or_else(|| scope_key.strip_prefix("group:"))?;
    let channel = rest.split_once(':').map_or(rest, |(head, _)| head);
    (!channel.is_empty()).then_some(channel)
}
/// Breaks free-form user text into the set of keyword tokens used for skill
/// trigger matching.
///
/// A token is a maximal run of alphanumeric characters, `-`, and `_`; every
/// other character separates tokens. Tokens are ASCII-lowercased, and any token
/// whose byte length (`str::len`) is below three is discarded, so short filler
/// such as "a", "is", or "ok" never takes part in matching. Repeated words
/// collapse into a single set entry.
fn user_keyword_tokens(input: &str) -> std::collections::HashSet<String> {
    let is_separator = |ch: char| !(ch.is_alphanumeric() || ch == '-' || ch == '_');
    let mut keywords = std::collections::HashSet::new();
    for piece in input.split(is_separator) {
        let word = piece.trim().to_ascii_lowercase();
        if word.len() < 3 {
            continue;
        }
        keywords.insert(word);
    }
    keywords
}
/// Attempts to satisfy a turn by running a matching skill script instead of
/// the LLM pipeline.
///
/// Every enabled skill that has both a `script_path` and a `triggers_json` that
/// parses as a `SkillTrigger` is scored by the number of *distinct* trigger
/// keywords (trimmed, ASCII-lowercased) found among the user's keyword tokens
/// (see `user_keyword_tokens`). The highest-scoring skill wins; on a tie the
/// skill returned first by `list_skills` is kept. The winner's script is run
/// via the `run_script` tool with the raw user content as its only argument.
///
/// Returns the tool output on success. Returns `None` — meaning "fall back to
/// the LLM pipeline" — when the skill lookup fails, no enabled skill exists,
/// the message yields no keyword tokens, nothing matches, or the script fails.
/// Failures are logged, not propagated.
async fn try_skill_first_fulfillment(
    state: &AppState,
    user_content: &str,
    turn_id: &str,
    authority: roboticus_core::InputAuthority,
    channel: &str,
) -> Option<String> {
    let skills = match roboticus_db::skills::list_skills(&state.db) {
        Ok(rows) => rows.into_iter().filter(|s| s.enabled).collect::<Vec<_>>(),
        Err(e) => {
            tracing::warn!(error = %e, "skill-first lookup failed");
            return None;
        }
    };
    if skills.is_empty() {
        return None;
    }
    let tokens = user_keyword_tokens(user_content);
    if tokens.is_empty() {
        return None;
    }
    // (score, skill name, script path) of the best candidate so far.
    let mut best: Option<(usize, String, String)> = None;
    for skill in skills {
        let Some(script_path) = skill.script_path.clone() else {
            continue;
        };
        let Some(triggers_raw) = skill.triggers_json.as_deref() else {
            continue;
        };
        let Ok(triggers) = serde_json::from_str::<roboticus_core::SkillTrigger>(triggers_raw)
        else {
            continue;
        };
        // Count each distinct keyword once: a trigger list that repeats a
        // keyword (possibly with different casing or padding) must not
        // out-score a skill that genuinely matches more of the request.
        // Trimming matters because user tokens never contain whitespace.
        let matched: std::collections::HashSet<String> = triggers
            .keywords
            .iter()
            .map(|k| k.trim().to_ascii_lowercase())
            .filter(|k| tokens.contains(k))
            .collect();
        let score = matched.len();
        if score == 0 {
            continue;
        }
        match best {
            // Strictly better scores replace the incumbent; ties keep the first.
            Some((best_score, _, _)) if best_score >= score => {}
            _ => best = Some((score, skill.name.clone(), script_path)),
        }
    }
    let (_score, skill_name, script_path) = best?;
    let params = serde_json::json!({
        "path": script_path,
        "args": [user_content],
    });
    match super::super::execute_tool_call(
        state,
        "run_script",
        &params,
        turn_id,
        authority,
        Some(channel),
    )
    .await
    {
        Ok(output) => {
            tracing::info!(skill = %skill_name, "skill-first execution succeeded");
            Some(output)
        }
        Err(e) => {
            tracing::warn!(
                skill = %skill_name,
                error = %e,
                "skill-first execution failed; falling back to LLM pipeline"
            );
            None
        }
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the pipeline module: preset flag matrices, planner
    // decisions on synthesized task state, auto-composition side effects,
    // pipeline-trace annotation, and the scope-key / keyword helpers.
    use super::super::config::*;
    use super::*;
    use crate::api::routes::agent::decomposition::{
        DecompositionDecision, DelegationPlan, SpecialistProposal,
    };
    use crate::api::routes::agent::flight_recorder::ReactTrace;

    /// Baseline planner input shared by the pure planner tests: a Creator-authored,
    /// standard-inference turn with empty content, no intents, no roster, no
    /// skills, no retrieval/tool-search signals, no decomposition proposal, and no
    /// conversation history. Each test overrides only the fields its scenario
    /// depends on via struct-update syntax, so the distinguishing inputs stay
    /// visible at a glance.
    fn base_task_state_input() -> roboticus_agent::task_state::TaskStateInput {
        roboticus_agent::task_state::TaskStateInput {
            user_content: "".into(),
            intents: vec![],
            authority: "Creator".into(),
            retrieval_metrics: None,
            tool_search_stats: None,
            mcp_tools_available: false,
            taskable_agent_count: 0,
            fit_agent_count: 0,
            fit_agent_names: vec![],
            enabled_skill_count: 0,
            matching_skill_count: 0,
            missing_skills: vec![],
            remaining_budget_tokens: 8000,
            provider_breaker_open: false,
            inference_mode: "standard".into(),
            decomposition_proposal: None,
            explicit_specialist_workflow: false,
            named_tool_match: false,
            recent_response_skeletons: vec![],
            recent_user_message_lengths: vec![],
            self_echo_fragments: vec![],
            declared_action: None,
            previous_turn_had_protocol_issues: false,
            normalization_retry_streak: 0,
        }
    }

    #[test]
    fn api_preset_enables_all_core_features() {
        let cfg = PipelineConfig::api();
        assert!(cfg.injection_defense);
        assert!(cfg.dedup_tracking);
        assert!(cfg.decomposition_gate);
        assert!(cfg.delegated_execution);
        assert!(cfg.shortcuts_enabled);
        assert!(cfg.cache_enabled);
        assert!(cfg.post_turn_ingest);
        assert!(cfg.nickname_refinement);
        assert!(cfg.inject_diagnostics);
        assert!(!cfg.specialist_controls);
        assert_eq!(cfg.inference_mode, InferenceMode::Standard);
        assert_eq!(cfg.guard_set, GuardSetPreset::Full);
        assert_eq!(cfg.cache_guard_set, GuardSetPreset::Cached);
        assert_eq!(cfg.authority_mode, AuthorityMode::ApiClaim);
        assert_eq!(cfg.channel_label, "api");
        assert_eq!(cfg.session_resolution, SessionResolutionMode::FromBody);
    }

    #[tokio::test]
    async fn auto_composition_creates_missing_skills_and_subagent_for_task_turn() {
        let state = crate::api::routes::tests::test_state();
        {
            let mut cfg = state.config.write().await;
            cfg.agent.composition_policy = roboticus_core::config::CompositionPolicy::Autonomous;
        }
        let proposal = SpecialistProposal {
            name: "finance-specialist".into(),
            display_name: "Finance Specialist".into(),
            description: "Handles finance planning tasks".into(),
            skills: vec!["forecasting".into(), "pricing".into()],
            model: "auto".into(),
        };
        let decision = DecompositionDecision::RequiresSpecialistCreation {
            proposal,
            rationale: "no specialist fit".into(),
        };
        let mut trace = PipelineTrace::new("turn-auto-compose", "api");
        let (resolved, workflow_note) = resolve_specialist_creation_for_task(
            &state,
            "turn-auto-compose",
            "api",
            roboticus_core::InputAuthority::Creator,
            "session-auto-compose",
            "Research pricing and forecast revenue and draft a rollout plan",
            decision,
            &mut trace,
        )
        .await;
        // Under the Autonomous policy the creation request must be resolved into
        // a concrete decision rather than handed back unchanged.
        assert!(!matches!(
            resolved,
            DecompositionDecision::RequiresSpecialistCreation { .. }
        ));
        assert!(
            workflow_note.is_some()
                || matches!(resolved, DecompositionDecision::Centralized { .. })
        );
        let skills = roboticus_db::skills::list_skills(&state.db).expect("list skills");
        assert!(skills.iter().any(|s| s.name == "forecasting"));
        assert!(skills.iter().any(|s| s.name == "pricing"));
        let agents = roboticus_db::agents::list_sub_agents(&state.db).expect("list subagents");
        let subagent = agents
            .iter()
            .find(|a| a.name == "finance-specialist")
            .expect("finance specialist created");
        let skills_json = subagent.skills_json.as_deref().unwrap_or("[]");
        assert!(skills_json.contains("forecasting"));
        assert!(skills_json.contains("pricing"));
    }

    #[test]
    fn planner_selects_compose_subagent_for_explicit_empty_roster_workflow() {
        use roboticus_agent::action_planner::PlannedAction;
        use roboticus_agent::task_state::{TaskClassification, TaskStateInput};
        let input = TaskStateInput {
            user_content:
                "Introspect what you need, compose a specialist, then delegate this task.".into(),
            intents: vec!["Delegation".into()],
            explicit_specialist_workflow: true,
            ..base_task_state_input()
        };
        let state = roboticus_agent::task_state::synthesize(&input);
        let plan = roboticus_agent::action_planner::plan(&state, &input);
        assert_eq!(state.classification, TaskClassification::Task);
        assert_eq!(state.roster_fit.taskable_count, 0);
        assert_eq!(plan.selected, PlannedAction::ComposeSubagent);
    }

    #[test]
    fn planner_selects_answer_directly_for_conversation() {
        use roboticus_agent::action_planner::PlannedAction;
        use roboticus_agent::task_state::{TaskClassification, TaskStateInput};
        let input = TaskStateInput {
            user_content: "Thanks, that makes sense.".into(),
            ..base_task_state_input()
        };
        let state = roboticus_agent::task_state::synthesize(&input);
        let plan = roboticus_agent::action_planner::plan(&state, &input);
        assert_eq!(state.classification, TaskClassification::Conversation);
        assert_eq!(plan.selected, PlannedAction::AnswerDirectly);
    }

    #[test]
    fn build_task_state_input_compose_subagent_with_centralized_gate() {
        use roboticus_agent::action_planner::PlannedAction;
        use roboticus_agent::task_state::TaskStateInput;
        // The decomposition gate says "don't delegate", but the explicit
        // specialist workflow on an empty roster should still win.
        let input = TaskStateInput {
            user_content: "Compose a specialist and delegate this task".into(),
            intents: vec!["Delegation".into()],
            enabled_skill_count: 10,
            decomposition_proposal: Some(roboticus_agent::task_state::DecompositionProposal {
                should_delegate: false,
                rationale: "single-step".into(),
                utility_margin: -0.1,
            }),
            explicit_specialist_workflow: true,
            ..base_task_state_input()
        };
        let state = roboticus_agent::task_state::synthesize(&input);
        let plan = roboticus_agent::action_planner::plan(&state, &input);
        assert_eq!(plan.selected, PlannedAction::ComposeSubagent);
    }

    #[test]
    fn build_task_state_input_conversation_is_non_compositional() {
        use roboticus_agent::action_planner::PlannedAction;
        use roboticus_agent::task_state::TaskStateInput;
        let input = TaskStateInput {
            user_content: "Thanks".into(),
            enabled_skill_count: 10,
            ..base_task_state_input()
        };
        let state = roboticus_agent::task_state::synthesize(&input);
        let plan = roboticus_agent::action_planner::plan(&state, &input);
        assert_eq!(plan.selected, PlannedAction::AnswerDirectly);
    }

    #[tokio::test]
    async fn build_task_state_input_prefers_delegation_for_explicit_matching_specialists() {
        use roboticus_agent::action_planner::PlannedAction;
        use roboticus_agent::task_state::TaskClassification;
        let state = crate::api::routes::tests::test_state();
        let agent = roboticus_db::agents::SubAgentRow {
            id: "test-revenue-strategist".into(),
            name: "revenue-strategist".into(),
            display_name: Some("Revenue Strategist".into()),
            model: "auto".into(),
            fallback_models_json: Some("[]".into()),
            role: "Subagent".into(),
            description: Some("Handles freelancer revenue strategy".into()),
            skills_json: Some(r#"["revenue","pricing","freelancers"]"#.into()),
            enabled: true,
            session_count: 0,
            last_used_at: None,
        };
        roboticus_db::agents::upsert_sub_agent(&state.db, &agent).expect("seed subagent");
        let intents = vec![crate::api::routes::agent::intent_registry::Intent::Delegation];
        let gate_decision = DecompositionDecision::Delegated(DelegationPlan {
            subtasks: vec!["test task".into()],
            rationale: "test delegation".into(),
            expected_utility_margin: 0.5,
        });
        let task_input = build_task_state_input(
            &state,
            "test-session",
            "revenue pricing",
            &intents,
            roboticus_core::InputAuthority::Creator,
            Some(&gate_decision),
            "standard",
        )
        .await;
        let task_state = roboticus_agent::task_state::synthesize(&task_input);
        let plan = roboticus_agent::action_planner::plan(&task_state, &task_input);
        assert_eq!(task_state.classification, TaskClassification::Task);
        assert_eq!(task_state.roster_fit.taskable_count, 1);
        assert_eq!(task_state.roster_fit.fit_count, 1);
        assert!(
            matches!(
                plan.selected,
                PlannedAction::DelegateToSpecialist | PlannedAction::ComposeSkill
            ),
            "planner should act on roster fit: got {:?}",
            plan.selected
        );
    }

    #[tokio::test]
    async fn build_task_state_input_counts_name_and_description_tokens_for_specialist_fit() {
        use roboticus_agent::action_planner::PlannedAction;
        use roboticus_agent::task_state::TaskClassification;
        let state = crate::api::routes::tests::test_state();
        // Skills deliberately do not overlap the request; fit must come from the
        // agent's name and description tokens.
        let agent = roboticus_db::agents::SubAgentRow {
            id: "test-saas-ideator".into(),
            name: "saas-ideator".into(),
            display_name: Some("SaaS Ideator".into()),
            model: "auto".into(),
            fallback_models_json: Some("[]".into()),
            role: "Subagent".into(),
            description: Some("Generates SaaS ideas for freelancers".into()),
            skills_json: Some(r#"["typescript","python"]"#.into()),
            enabled: true,
            session_count: 0,
            last_used_at: None,
        };
        roboticus_db::agents::upsert_sub_agent(&state.db, &agent).expect("seed subagent");
        let intents = vec![crate::api::routes::agent::intent_registry::Intent::Delegation];
        let gate_decision = DecompositionDecision::Delegated(DelegationPlan {
            subtasks: vec!["test task".into()],
            rationale: "test delegation".into(),
            expected_utility_margin: 0.5,
        });
        let task_input = build_task_state_input(
            &state,
            "test-session",
            "saas ideas freelancers",
            &intents,
            roboticus_core::InputAuthority::Creator,
            Some(&gate_decision),
            "standard",
        )
        .await;
        let task_state = roboticus_agent::task_state::synthesize(&task_input);
        let plan = roboticus_agent::action_planner::plan(&task_state, &task_input);
        assert_eq!(task_state.classification, TaskClassification::Task);
        assert_eq!(task_state.roster_fit.taskable_count, 1);
        assert_eq!(task_state.roster_fit.fit_count, 1);
        assert!(
            matches!(
                plan.selected,
                PlannedAction::DelegateToSpecialist | PlannedAction::ComposeSkill
            ),
            "planner should act on roster fit: got {:?}",
            plan.selected
        );
    }

    #[tokio::test]
    async fn build_task_state_input_treats_explicit_specialist_workflow_as_task() {
        use roboticus_agent::task_state::TaskClassification;
        let state = crate::api::routes::tests::test_state();
        let intents = vec![crate::api::routes::agent::intent_registry::Intent::Delegation];
        let task_input = build_task_state_input(
            &state,
            "test-session",
            "delegate this to a specialist",
            &intents,
            roboticus_core::InputAuthority::Creator,
            None,
            "standard",
        )
        .await;
        let task_state = roboticus_agent::task_state::synthesize(&task_input);
        assert_eq!(task_state.classification, TaskClassification::Task);
    }

    #[test]
    fn build_task_state_input_delegates_when_explicit_workflow_and_fit_agents() {
        use roboticus_agent::action_planner::PlannedAction;
        use roboticus_agent::task_state::{DecompositionProposal, TaskStateInput};
        let input = TaskStateInput {
            user_content:
                "Use the best existing specialists if they fit, otherwise compose what is missing."
                    .into(),
            intents: vec!["Delegation".into()],
            taskable_agent_count: 2,
            fit_agent_count: 1,
            fit_agent_names: vec!["revenue-strategist".into()],
            enabled_skill_count: 10,
            decomposition_proposal: Some(DecompositionProposal {
                should_delegate: false,
                rationale: "single-step".into(),
                utility_margin: -0.1,
            }),
            explicit_specialist_workflow: true,
            ..base_task_state_input()
        };
        let state = roboticus_agent::task_state::synthesize(&input);
        let plan = roboticus_agent::action_planner::plan(&state, &input);
        assert!(
            matches!(
                plan.selected,
                PlannedAction::DelegateToSpecialist | PlannedAction::ComposeSkill
            ),
            "planner should act on roster fit: got {:?}",
            plan.selected
        );
    }

    #[test]
    fn streaming_preset_has_full_feature_parity() {
        let cfg = PipelineConfig::streaming();
        assert!(cfg.injection_defense);
        assert!(cfg.dedup_tracking);
        assert!(cfg.decomposition_gate);
        assert!(cfg.delegated_execution);
        assert!(cfg.shortcuts_enabled);
        assert!(!cfg.specialist_controls);
        assert_eq!(cfg.inference_mode, InferenceMode::Streaming);
        assert_eq!(cfg.guard_set, GuardSetPreset::Streaming);
        assert_eq!(cfg.cache_guard_set, GuardSetPreset::None);
        assert!(!cfg.nickname_refinement);
        assert!(cfg.post_turn_ingest);
        assert!(cfg.cache_enabled);
        assert_eq!(cfg.authority_mode, AuthorityMode::ApiClaim);
        assert_eq!(cfg.channel_label, "api-stream");
    }

    #[test]
    fn channel_preset_enables_specialist_controls() {
        let cfg = PipelineConfig::channel("telegram");
        assert!(cfg.injection_defense);
        assert!(cfg.dedup_tracking);
        assert!(cfg.decomposition_gate);
        assert!(cfg.delegated_execution);
        assert!(cfg.shortcuts_enabled);
        assert!(cfg.specialist_controls);
        assert!(cfg.cache_enabled);
        assert!(cfg.post_turn_ingest);
        assert!(!cfg.nickname_refinement);
        assert!(!cfg.inject_diagnostics);
        assert_eq!(cfg.inference_mode, InferenceMode::Standard);
        assert_eq!(cfg.guard_set, GuardSetPreset::Full);
        assert_eq!(cfg.cache_guard_set, GuardSetPreset::Cached);
        assert_eq!(cfg.authority_mode, AuthorityMode::ChannelClaim);
        assert_eq!(cfg.channel_label, "telegram");
        assert_eq!(
            cfg.session_resolution,
            SessionResolutionMode::FromChannel {
                platform: "telegram".into()
            }
        );
    }

    #[test]
    fn channel_preset_uses_platform_as_label() {
        let telegram = PipelineConfig::channel("telegram");
        assert_eq!(telegram.channel_label, "telegram");
        let discord = PipelineConfig::channel("discord");
        assert_eq!(discord.channel_label, "discord");
        let email = PipelineConfig::channel("email");
        assert_eq!(email.channel_label, "email");
    }

    #[test]
    fn cron_preset_has_injection_defense() {
        let cfg = PipelineConfig::cron();
        assert!(cfg.injection_defense);
        assert!(!cfg.dedup_tracking);
        assert!(cfg.decomposition_gate);
        assert!(!cfg.delegated_execution);
        assert!(!cfg.shortcuts_enabled);
        assert!(!cfg.specialist_controls);
        assert!(cfg.cache_enabled);
        assert!(cfg.post_turn_ingest);
        assert!(!cfg.nickname_refinement);
        assert!(!cfg.inject_diagnostics);
        assert_eq!(cfg.inference_mode, InferenceMode::Standard);
        assert_eq!(cfg.guard_set, GuardSetPreset::Full);
        assert_eq!(cfg.cache_guard_set, GuardSetPreset::Cached);
        assert_eq!(cfg.authority_mode, AuthorityMode::SelfGenerated);
        assert_eq!(cfg.channel_label, "cron");
        assert_eq!(cfg.session_resolution, SessionResolutionMode::Dedicated);
    }

    #[test]
    fn mcp_react_steps_annotate_pipeline_trace() {
        let mut pipeline_trace = PipelineTrace::new("turn-mcp", "api");
        pipeline_trace.begin_stage("inference");
        let mut react_trace = ReactTrace::new("turn-mcp");
        react_trace.record(ReactStep::ToolCall {
            tool_name: "github::create_issue".into(),
            parameters_redacted: false,
            result_summary: "created".into(),
            duration_ms: 275,
            success: true,
            source: ToolSource::Mcp {
                server: "github".into(),
            },
        });
        annotate_mcp_calls_from_react_trace(&mut pipeline_trace, &react_trace);
        pipeline_trace.end_stage(SpanOutcome::Ok);
        let span = &pipeline_trace.stages[0];
        assert_eq!(
            span.annotations.get("mcp.server"),
            Some(&serde_json::json!("github"))
        );
        assert_eq!(
            span.annotations.get("mcp.tool"),
            Some(&serde_json::json!("github::create_issue"))
        );
        assert_eq!(
            span.annotations.get("mcp.duration_ms"),
            Some(&serde_json::json!(275))
        );
        assert_eq!(
            span.annotations.get("mcp.success"),
            Some(&serde_json::json!(true))
        );
    }

    #[test]
    fn guard_set_presets_resolve_to_non_empty_chains() {
        let full = GuardSetPreset::Full.resolve();
        assert!(!full.is_empty());
        let cached = GuardSetPreset::Cached.resolve();
        assert!(!cached.is_empty());
        let streaming = GuardSetPreset::Streaming.resolve();
        assert!(!streaming.is_empty());
    }

    #[test]
    fn guard_set_none_resolves_to_empty_chain() {
        let none = GuardSetPreset::None.resolve();
        assert!(none.is_empty());
    }

    #[test]
    fn api_predicates() {
        let cfg = PipelineConfig::api();
        assert!(cfg.is_standard_inference());
        assert!(!cfg.is_streaming_inference());
        assert!(cfg.enforces_authority());
        assert!(cfg.can_execute_tools());
        assert!(cfg.resolves_session_from_body());
        assert!(!cfg.is_channel());
        assert!(!cfg.is_cron());
    }

    #[test]
    fn streaming_predicates() {
        let cfg = PipelineConfig::streaming();
        assert!(!cfg.is_standard_inference());
        assert!(cfg.is_streaming_inference());
        assert!(cfg.enforces_authority());
        assert!(!cfg.can_execute_tools());
        assert!(cfg.resolves_session_from_body());
        assert!(!cfg.is_channel());
        assert!(!cfg.is_cron());
    }

    #[test]
    fn channel_predicates() {
        let cfg = PipelineConfig::channel("telegram");
        assert!(cfg.is_standard_inference());
        assert!(!cfg.is_streaming_inference());
        assert!(cfg.enforces_authority());
        assert!(cfg.can_execute_tools());
        assert!(!cfg.resolves_session_from_body());
        assert!(cfg.is_channel());
        assert!(!cfg.is_cron());
    }

    #[test]
    fn cron_predicates() {
        let cfg = PipelineConfig::cron();
        assert!(cfg.is_standard_inference());
        assert!(!cfg.is_streaming_inference());
        assert!(!cfg.enforces_authority());
        assert!(cfg.can_execute_tools());
        assert!(!cfg.resolves_session_from_body());
        assert!(!cfg.is_channel());
        assert!(cfg.is_cron());
    }

    #[test]
    fn all_presets_have_injection_defense() {
        assert!(PipelineConfig::api().injection_defense);
        assert!(PipelineConfig::streaming().injection_defense);
        assert!(PipelineConfig::channel("test").injection_defense);
        assert!(PipelineConfig::cron().injection_defense);
    }

    #[test]
    fn api_and_streaming_share_pre_inference_stage_flags() {
        let api = PipelineConfig::api();
        let stream = PipelineConfig::streaming();
        assert_eq!(api.decomposition_gate, stream.decomposition_gate);
        assert_eq!(api.delegated_execution, stream.delegated_execution);
        assert_eq!(api.shortcuts_enabled, stream.shortcuts_enabled);
        assert_eq!(api.injection_defense, stream.injection_defense);
        assert_eq!(api.skill_first_enabled, stream.skill_first_enabled);
        assert_eq!(
            api.short_followup_expansion,
            stream.short_followup_expansion
        );
    }

    #[test]
    fn all_presets_have_post_turn_ingest() {
        assert!(PipelineConfig::api().post_turn_ingest);
        assert!(PipelineConfig::streaming().post_turn_ingest);
        assert!(PipelineConfig::channel("test").post_turn_ingest);
        assert!(PipelineConfig::cron().post_turn_ingest);
    }

    #[test]
    fn standard_inference_paths_have_full_guards() {
        let api = PipelineConfig::api();
        let channel = PipelineConfig::channel("telegram");
        let cron = PipelineConfig::cron();
        for cfg in [&api, &channel, &cron] {
            assert_eq!(cfg.inference_mode, InferenceMode::Standard);
            assert_eq!(cfg.guard_set, GuardSetPreset::Full);
            assert_eq!(cfg.cache_guard_set, GuardSetPreset::Cached);
        }
    }

    #[test]
    fn only_api_has_nickname_refinement() {
        assert!(PipelineConfig::api().nickname_refinement);
        assert!(!PipelineConfig::streaming().nickname_refinement);
        assert!(!PipelineConfig::channel("test").nickname_refinement);
        assert!(!PipelineConfig::cron().nickname_refinement);
    }

    #[test]
    fn only_channel_has_specialist_controls() {
        assert!(!PipelineConfig::api().specialist_controls);
        assert!(!PipelineConfig::streaming().specialist_controls);
        assert!(PipelineConfig::channel("test").specialist_controls);
        assert!(!PipelineConfig::cron().specialist_controls);
    }

    #[test]
    fn streaming_uses_streaming_inference_mode() {
        let cfg = PipelineConfig::streaming();
        assert_eq!(cfg.inference_mode, InferenceMode::Streaming);
        assert!(cfg.shortcuts_enabled);
        assert!(cfg.decomposition_gate);
        assert!(cfg.delegated_execution);
    }

    #[test]
    fn session_resolution_modes_are_correct() {
        assert_eq!(
            PipelineConfig::api().session_resolution,
            SessionResolutionMode::FromBody
        );
        assert_eq!(
            PipelineConfig::streaming().session_resolution,
            SessionResolutionMode::FromBody
        );
        assert_eq!(
            PipelineConfig::channel("discord").session_resolution,
            SessionResolutionMode::FromChannel {
                platform: "discord".into()
            }
        );
        assert_eq!(
            PipelineConfig::cron().session_resolution,
            SessionResolutionMode::Dedicated
        );
    }

    #[test]
    fn provided_session_resolution_stores_id() {
        let mode = SessionResolutionMode::Provided {
            session_id: "test-session-123".into(),
        };
        match mode {
            SessionResolutionMode::Provided { session_id } => {
                assert_eq!(session_id, "test-session-123");
            }
            _ => panic!("expected Provided variant"),
        }
    }

    #[test]
    fn extract_channel_from_scope_key_reads_peer_and_group_prefixes() {
        assert_eq!(extract_channel_from_scope_key("agent"), None);
        assert_eq!(
            extract_channel_from_scope_key("peer:telegram:12345"),
            Some("telegram")
        );
        assert_eq!(
            extract_channel_from_scope_key("group:discord:room-1"),
            Some("discord")
        );
        assert_eq!(extract_channel_from_scope_key("peer:email"), Some("email"));
        assert_eq!(extract_channel_from_scope_key("dm:telegram:1"), None);
    }

    #[test]
    fn user_keyword_tokens_lowercases_splits_and_drops_short_words() {
        let tokens = user_keyword_tokens("Run the Deploy-Script, ok? my_tool!");
        let mut sorted: Vec<&str> = tokens.iter().map(String::as_str).collect();
        sorted.sort_unstable();
        assert_eq!(sorted, ["deploy-script", "my_tool", "run", "the"]);
        assert!(user_keyword_tokens("").is_empty());
        assert!(user_keyword_tokens("a b ok").is_empty());
    }
}