use roboticus_agent::agent_loop::{AgentLoop, ReactAction, ReactState};
use roboticus_core::InputAuthority;
use super::super::AppState;
use super::super::decomposition::DelegationProvenance;
use super::super::flight_recorder::{ReactStep, ReactTrace, ToolSource};
use super::super::guard_registry::{GuardContext, contains_internal_protocol_marker};
use super::super::routing::infer_with_fallback;
use super::super::tools::{execute_tool_call_detailed, parse_tool_call, parse_tool_calls};
use super::guard_retry::{apply_guards_with_retry, is_task_like_turn, rescue_task_protocol_leak};
use super::types::{InferenceOutput, PreparedInference};
/// Cleans raw model output before it is reused or shown to the user.
///
/// Two checks run, in this order:
/// 1. If the text contains a `<<<TRUST_BOUNDARY:` marker and
///    `roboticus_agent::prompt::verify_hmac_boundary` rejects it for `hmac_secret`, the
///    boundaries are removed with `roboticus_agent::prompt::strip_hmac_boundaries` and a
///    warning is logged. Text without the marker is not HMAC-checked at all.
/// 2. The resulting text is passed to `roboticus_agent::injection::scan_output`; if that
///    flags it, the whole response is replaced by a fixed placeholder string.
///
/// Returns the text that survives both checks.
pub(crate) fn sanitize_model_output(content: String, hmac_secret: &[u8]) -> String {
    // `&&` short-circuits, so verification only runs when a boundary marker is present.
    let boundary_tampered = content.contains("<<<TRUST_BOUNDARY:")
        && !roboticus_agent::prompt::verify_hmac_boundary(&content, hmac_secret);

    let checked = if boundary_tampered {
        tracing::warn!("HMAC boundary tampered in model output, stripping");
        roboticus_agent::prompt::strip_hmac_boundaries(&content)
    } else {
        content
    };

    if !roboticus_agent::injection::scan_output(&checked) {
        return checked;
    }
    tracing::warn!("L4 output scan flagged model response, blocking");
    "[Response blocked by output safety filter]".to_string()
}
/// Runs the initial inference for a prepared turn and, when the model asks for tools, drives
/// the ReAct (think → act → observe) loop until no tool calls remain, the loop is aborted,
/// or the autonomy limits read from `config.agent` (`autonomy_max_react_turns`,
/// `autonomy_max_turn_duration_seconds`) are hit.
///
/// The resulting content is then passed through `apply_guards_with_retry`. For task-like
/// turns whose guarded content still contains an internal protocol marker, it is also passed
/// through `rescue_task_protocol_leak`.
///
/// * `state` – shared application state (config, DB, HMAC secret, semantic classifier).
/// * `prepared` – request, model, intents and optional delegated execution result.
/// * `session_id` – used to fetch up to 10 session messages (assistant ones feed the guards).
/// * `turn_id` – identifies this turn for the ReAct trace and tool execution.
/// * `authority`, `channel_label` – forwarded to tool execution and guard/rescue passes.
/// * `delegation_provenance` – flags are set when a subagent/delegate tool starts or succeeds.
///
/// Returns the final content along with the resolved model, token/cost totals accumulated
/// across every inference call, the number of ReAct turns, the accumulated tool results, and
/// the ReAct trace.
pub(crate) async fn run_inference_and_react(
    state: &AppState,
    prepared: &PreparedInference,
    session_id: &str,
    turn_id: &str,
    authority: InputAuthority,
    channel_label: Option<&str>,
    delegation_provenance: &mut DelegationProvenance,
) -> InferenceOutput {
    let (max_react_turns, max_turn_duration_seconds) = {
        let cfg = state.config.read().await;
        (
            cfg.agent.autonomy_max_react_turns,
            cfg.agent.autonomy_max_turn_duration_seconds,
        )
    };
    let user_prompt = prepared
        .request
        .messages
        .last()
        .map(|m| m.content.as_str())
        .unwrap_or_default();
    let mut resolved_model = prepared.model.clone();
    let (
        initial_content,
        mut total_in,
        mut total_out,
        mut total_cost,
        latency_ms,
        quality_score,
        escalated,
    ) = match infer_with_fallback(state, &prepared.request, &prepared.model).await {
        Ok(result) => {
            resolved_model = result.model.clone();
            (
                result.content,
                result.tokens_in,
                result.tokens_out,
                result.cost,
                result.latency_ms,
                result.quality_score,
                result.escalated,
            )
        }
        // Inference failed: surface a user-facing failure message and record zero usage.
        Err(last_error) => (
            super::super::tools::provider_failure_user_message(&last_error.to_string(), true),
            0,
            0,
            0.0,
            0,
            0.0,
            false,
        ),
    };
    let initial_content = sanitize_model_output(initial_content, state.hmac_secret.as_ref());
    let mut react_loop = AgentLoop::new(max_react_turns);
    let mut react_trace = ReactTrace::new(turn_id);
    let mut final_content = initial_content.clone();
    // Seed the tool results with a non-blank delegated execution result, if any.
    let mut tool_results_acc: Vec<(String, String)> = prepared
        .delegated_execution_result
        .clone()
        .filter(|s| !s.trim().is_empty())
        .map(|result| vec![("orchestrate-subagents".to_string(), result)])
        .unwrap_or_default();
    let react_deadline =
        std::time::Instant::now() + std::time::Duration::from_secs(max_turn_duration_seconds);
    let mut pending_calls = parse_tool_calls(&initial_content);
    if pending_calls.is_empty()
        && let Some(single) = parse_tool_call(&initial_content)
    {
        pending_calls.push(single);
    }
    if !pending_calls.is_empty() {
        react_loop.transition(ReactAction::Think);
        let mut react_messages = prepared.request.messages.clone();
        react_messages.push(roboticus_llm::format::UnifiedMessage {
            role: "assistant".into(),
            content: initial_content,
            parts: None,
        });
        while !pending_calls.is_empty() {
            if std::time::Instant::now() >= react_deadline {
                final_content = format!(
                    "I stopped this turn after reaching the autonomy duration limit ({}s). \
                     Please continue with a narrower or next-step command.",
                    max_turn_duration_seconds
                );
                pending_calls.clear();
                break;
            }
            let mut observations = Vec::new();
            let mut batch_aborted = false;
            for (tn, tp) in &pending_calls {
                let params = tp.to_string();
                if react_loop.is_looping(tn, &params) {
                    tracing::warn!(
                        tool = tn.as_str(),
                        "ReAct loop detected — same tool+params repeated"
                    );
                    batch_aborted = true;
                    break;
                }
                if react_loop.should_suppress_duplicate(tn, &params) {
                    react_loop.increment_suppressed();
                    if react_loop.should_abort_error_loop() {
                        tracing::warn!(
                            tool = tn.as_str(),
                            "ReAct error loop — model repeating failed tool call. Aborting."
                        );
                        batch_aborted = true;
                        break;
                    }
                    tracing::info!(tool = tn.as_str(), "suppressing duplicate failed tool call");
                    let prev_error = react_loop.last_error().unwrap_or("previous call failed");
                    observations.push(format!(
                        "[Tool {tn} suppressed]: This call was already attempted and failed: {prev_error}"
                    ));
                    continue;
                }
                let is_delegation = is_delegation_tool(tn);
                if is_delegation {
                    delegation_provenance.subagent_task_started = true;
                }
                react_loop.transition(ReactAction::Act {
                    tool_name: tn.clone(),
                    params: params.clone(),
                });
                // The Act transition itself can end the loop (presumably the turn cap).
                if react_loop.state == ReactState::Done {
                    batch_aborted = true;
                    break;
                }
                let tool_start = std::time::Instant::now();
                let tool_result =
                    execute_tool_call_detailed(state, tn, tp, turn_id, authority, channel_label)
                        .await;
                let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
                let observation = match tool_result {
                    Ok(ref details) => {
                        tracing::info!(
                            tool = tn.as_str(),
                            output_len = details.output.len(),
                            "tool call succeeded"
                        );
                        react_loop.clear_error_state();
                        if is_delegation {
                            delegation_provenance.subagent_task_completed = true;
                            delegation_provenance.subagent_result_attached =
                                !details.output.trim().is_empty();
                        }
                        format!("[Tool {tn} succeeded]: {}", details.output)
                    }
                    Err(ref err) => {
                        tracing::warn!(tool = tn.as_str(), error = %err, "tool call failed");
                        let error_message = err.to_string();
                        react_loop.record_tool_error(tn, &params, &error_message);
                        format!("[Tool {tn} failed]: {err}")
                    }
                };
                // Failures are stored with an "error: " prefix; the abort message and the
                // unresolved-failure summary below detect failures by that prefix.
                let result_text = match &tool_result {
                    Ok(details) => details.output.clone(),
                    Err(err) => format!("error: {err}"),
                };
                let tool_success = tool_result.is_ok();
                let result_summary = result_text.chars().take(120).collect::<String>();
                react_trace.record(ReactStep::ToolCall {
                    tool_name: tn.clone(),
                    parameters_redacted: false,
                    result_summary,
                    duration_ms: tool_duration_ms,
                    success: tool_success,
                    source: tool_result
                        .as_ref()
                        .map(|details| details.source.clone())
                        .unwrap_or(ToolSource::BuiltIn),
                });
                tool_results_acc.push((tn.clone(), result_text));
                // Only the observation sent back to the model is scanned here; the raw
                // result above is kept in `tool_results_acc`.
                let observation = if roboticus_agent::injection::scan_output(&observation) {
                    tracing::warn!(
                        tool = tn.as_str(),
                        "tool result flagged by output scan, sanitizing"
                    );
                    format!("[Tool {tn} result blocked by safety filter]")
                } else {
                    observation
                };
                observations.push(observation);
            }
            if batch_aborted {
                // With no observations at all, explain the loop to the user; otherwise the
                // post-loop fallback below decides the final content.
                if observations.is_empty() {
                    let last_error = tool_results_acc
                        .iter()
                        .rev()
                        .find(|(_, output)| {
                            output.starts_with("error:") || output.starts_with("Error:")
                        })
                        .map(|(tool, output)| {
                            let snippet: String = output.chars().take(200).collect();
                            format!("The tool `{tool}` failed with: {snippet}")
                        })
                        .unwrap_or_default();
                    final_content = if last_error.is_empty() {
                        "I attempted this task multiple times but the same tool call kept \
                         repeating without making progress. This usually means the approach \
                         needs to change — could you rephrase or suggest a different strategy?"
                            .to_string()
                    } else {
                        format!(
                            "I attempted this task multiple times but got stuck in a loop. \
                             {last_error}. Could you help me take a different approach?"
                        )
                    };
                }
                break;
            }
            react_loop.transition(ReactAction::Observe);
            let combined_observation = observations.join("\n\n");
            react_messages.push(roboticus_llm::format::UnifiedMessage {
                role: "user".into(),
                content: combined_observation,
                parts: None,
            });
            if react_loop.state == ReactState::Done {
                break;
            }
            let follow_req = roboticus_llm::format::UnifiedRequest {
                model: prepared.request.model.clone(),
                messages: react_messages.clone(),
                max_tokens: Some(2048),
                temperature: None,
                system: None,
                quality_target: None,
                tools: prepared.request.tools.clone(),
            };
            let follow_content =
                match infer_with_fallback(state, &follow_req, &prepared.model).await {
                    Ok(result) => {
                        resolved_model = result.model.clone();
                        total_in += result.tokens_in;
                        total_out += result.tokens_out;
                        total_cost += result.cost;
                        result.content
                    }
                    Err(e) => format!("LLM follow-up error: {e}"),
                };
            // Sanitize BEFORE feeding the reply back into the conversation, exactly like the
            // initial response: otherwise tampered trust-boundary markers or scanner-flagged
            // text would be replayed to the model on the next iteration.
            let follow_content = sanitize_model_output(follow_content, state.hmac_secret.as_ref());
            react_messages.push(roboticus_llm::format::UnifiedMessage {
                role: "assistant".into(),
                content: follow_content.clone(),
                parts: None,
            });
            pending_calls = parse_tool_calls(&follow_content);
            if pending_calls.is_empty()
                && let Some(single) = parse_tool_call(&follow_content)
            {
                pending_calls.push(single);
            }
            let has_marker = follow_content.contains("\"tool_call\"");
            tracing::debug!(
                follow_content_len = follow_content.len(),
                has_tool_call_marker = has_marker,
                parsed_tool_calls = pending_calls.len(),
                tool_names = ?pending_calls.iter().map(|(n, _)| n.as_str()).collect::<Vec<_>>(),
                "react loop: follow-up tool call parsing result"
            );
            if pending_calls.is_empty() && has_marker {
                tracing::warn!(
                    content_len = follow_content.len(),
                    // Byte-slicing at an arbitrary offset panics inside a multi-byte char.
                    content_tail = utf8_safe_tail(&follow_content, 200),
                    "react loop: tool_call marker present but parser returned empty — model's tool call was lost"
                );
            }
            if pending_calls.is_empty() {
                react_loop.transition(ReactAction::Finish);
                final_content = if follow_content.trim().is_empty() && !tool_results_acc.is_empty()
                {
                    // NOTE(review): these raw tool results were not passed through
                    // `scan_output`; confirm the guards downstream cover this path.
                    let summaries: Vec<String> = tool_results_acc
                        .iter()
                        .map(|(name, result)| format!("[{name}]: {result}"))
                        .collect();
                    format!("Completed. {}", summaries.join(" | "))
                } else {
                    follow_content
                };
            }
        }
        // The loop ended with tool calls still pending (deadline, abort, or Done): never
        // hand raw tool-call JSON or an empty reply to the user.
        if !pending_calls.is_empty()
            && (final_content.trim().is_empty() || final_content.contains("\"tool_call\""))
        {
            final_content = "I could not complete the requested tool workflow this turn. Please retry with a narrower command.".to_string();
        }
        // Keep only the latest result per tool; a tool whose latest result is an error is
        // considered unresolved.
        let mut last_results: std::collections::HashMap<&str, &str> =
            std::collections::HashMap::new();
        for (name, output) in &tool_results_acc {
            last_results.insert(name.as_str(), output.as_str());
        }
        let mut unresolved_failures: Vec<(&str, &str)> = last_results
            .into_iter()
            .filter(|(_, output)| output.starts_with("error:") || output.starts_with("Error:"))
            .collect();
        // HashMap iteration order is unspecified; sort so the appended summary is stable.
        unresolved_failures.sort_unstable_by_key(|(name, _)| *name);
        if !unresolved_failures.is_empty() && !final_content.contains("[Tool") {
            let failure_summary: String = unresolved_failures
                .iter()
                .map(|(name, output)| {
                    let snippet: String = output.chars().take(150).collect();
                    format!("- {name}: {snippet}")
                })
                .collect::<Vec<_>>()
                .join("\n");
            final_content.push_str(&format!("\n\n[Tool issues encountered]\n{failure_summary}"));
        }
    }
    let agent_name = {
        let cfg = state.config.read().await;
        cfg.agent.name.clone()
    };
    let prior_assistant_msgs: Vec<String> =
        roboticus_db::sessions::list_messages(&state.db, session_id, Some(10))
            .unwrap_or_default()
            .into_iter()
            .filter(|m| m.role == "assistant")
            .map(|m| m.content)
            .collect();
    // Snapshots so the guard context can borrow them while the originals are mutated below.
    let model_snapshot = resolved_model.clone();
    let guard_tool_results = tool_results_acc.clone();
    let guard_delegation_provenance = delegation_provenance.clone();
    let semantic_guard_scores = super::super::guard_registry::precompute_guard_scores(
        &state.semantic_classifier,
        &final_content,
    )
    .await;
    let subagent_names: Vec<String> = roboticus_db::agents::list_sub_agents(&state.db)
        .unwrap_or_default()
        .iter()
        .map(|a| a.name.to_ascii_lowercase())
        .collect();
    let guard_ctx = GuardContext {
        user_prompt,
        intents: &prepared.intents,
        tool_results: &guard_tool_results,
        agent_name: &agent_name,
        resolved_model: &model_snapshot,
        delegation_provenance: &guard_delegation_provenance,
        previous_assistant: prepared.previous_assistant.as_deref(),
        prior_assistant_messages: &prior_assistant_msgs,
        semantic_guard_scores,
        subagent_names,
    };
    let mut final_content = apply_guards_with_retry(
        final_content,
        &guard_ctx,
        state,
        prepared,
        turn_id,
        authority,
        channel_label,
        delegation_provenance,
        &mut react_loop,
        &mut react_trace,
        &mut tool_results_acc,
        &mut resolved_model,
        &mut total_in,
        &mut total_out,
        &mut total_cost,
    )
    .await;
    if is_task_like_turn(&prepared.intents) && contains_internal_protocol_marker(&final_content) {
        tracing::warn!(
            "task turn leaked internal protocol after guard application; attempting final rescue"
        );
        final_content = rescue_task_protocol_leak(
            final_content,
            state,
            prepared,
            user_prompt,
            &agent_name,
            turn_id,
            authority,
            channel_label,
            delegation_provenance,
            &mut react_loop,
            &mut react_trace,
            &mut tool_results_acc,
            &mut resolved_model,
            &mut total_in,
            &mut total_out,
            &mut total_cost,
        )
        .await;
    }
    InferenceOutput {
        content: final_content,
        model: resolved_model,
        tokens_in: total_in,
        tokens_out: total_out,
        cost: total_cost,
        react_turns: react_loop.turn_count,
        latency_ms,
        quality_score,
        escalated,
        tool_results: tool_results_acc,
        react_trace: Box::new(react_trace),
    }
}

/// Returns `true` when `tool_name` contains "subagent" or "delegate" (ASCII
/// case-insensitive). Such tools update the delegation provenance flags.
fn is_delegation_tool(tool_name: &str) -> bool {
    let lowered = tool_name.to_ascii_lowercase();
    lowered.contains("subagent") || lowered.contains("delegate")
}

/// Returns the suffix of `s` that is at most `max_bytes` bytes long, shifted forward as needed
/// so it starts on a UTF-8 character boundary. Plain byte slicing at
/// `len - max_bytes` panics when that offset lands inside a multi-byte character.
fn utf8_safe_tail(s: &str, max_bytes: usize) -> &str {
    let mut start = s.len().saturating_sub(max_bytes);
    // `s.len()` is always a char boundary, so this loop terminates.
    while !s.is_char_boundary(start) {
        start += 1;
    }
    &s[start..]
}