use roboticus_agent::agent_loop::{AgentLoop, ReactAction, ReactState};
use roboticus_core::InputAuthority;
use super::super::AppState;
use super::super::decomposition::DelegationProvenance;
use super::super::flight_recorder::{ReactStep, ReactTrace, ToolSource};
use super::super::guard_registry::{
GuardContext, GuardId, contains_internal_protocol_marker, guard_sets,
};
use super::super::routing::infer_with_fallback;
use super::super::tools::{execute_tool_call_detailed, parse_tool_call, parse_tool_calls};
use super::guard_fallback::{
deterministic_guard_fallback, deterministic_quality_fallback, is_generic_degraded_fallback,
};
use super::react_loop::sanitize_model_output;
use super::types::PreparedInference;
pub(crate) use super::guard_fallback::is_task_like_turn;
/// Build the follow-up inference request issued after a guard rejects a model response.
///
/// The request clones `prepared.request`'s model and message history and appends one `user`
/// message. That message states why the previous answer was rejected (a guard-specific operator
/// directive) and repeats `ctx.user_prompt`. Each guard also picks a `max_tokens` budget and
/// decides whether the original tool list is kept:
///
/// * tools are preserved only for `TaskDeferral`, `InternalProtocol`, and `ExecutionTruth`;
/// * `temperature` and `quality_target` are carried over only for `LowValueParroting`;
///   every other guard sends `None` for both;
/// * `system` is always `None` (NOTE(review): confirm the system prompt travels in `messages`
///   rather than in `prepared.request.system`).
///
/// `TaskDeferral` and `OutputContract` also embed a short summary of every entry in
/// `ctx.tool_results`, so the model can continue from the state it already observed.
/// Guards without a dedicated arm get a generic "filtered" directive with 512 tokens.
pub(crate) fn build_retry_request(
    guard_id: GuardId,
    prepared: &PreparedInference,
    ctx: &GuardContext<'_>,
) -> roboticus_llm::format::UnifiedRequest {
    let (directive, max_tokens, preserve_tools): (String, u32, bool) = match guard_id {
        GuardId::EmptyResponse => (
            "Operator directive: your previous response was completely empty. \
             You MUST provide a substantive response to the user's request. \
             Acknowledge what was done, summarize results, or ask a clarifying question."
                .to_string(),
            1024,
            false,
        ),
        GuardId::LiteraryQuoteRetry => (
            "Operator directive: Provide a brief literary quote/paraphrase response only. \
             Do not provide tactical guidance; keep it contextual and non-operational."
                .to_string(),
            256,
            false,
        ),
        GuardId::LowValueParroting => (
            "Operator directive: your previous response was placeholder/status-only. \
             Provide a concrete, complete answer to the original user request now. \
             Do not output placeholder lines such as 'ready' or status-only acknowledgements."
                .to_string(),
            768,
            false,
        ),
        GuardId::TaskDeferral => {
            let introspection_state = summarize_prior_tool_state(
                ctx.tool_results,
                "Prior tool state:",
                "No prior introspection results were preserved.",
            );
            (
                format!(
                    "Operator directive: do not stop at narrated next steps. You already inspected the \
                     relevant state for this task. Continue from that state by taking the next concrete \
                     action now (for example: compose the missing specialist, call the next tool, or \
                     return the actual completed result). Do not say what you will do next — do it.\n\n\
                     If the roster is empty or there is no clean specialist fit, compose what is missing \
                     before delegating.\n\n{}",
                    introspection_state
                ),
                896,
                true,
            )
        }
        GuardId::OutputContract => {
            let execution_state = summarize_prior_tool_state(
                ctx.tool_results,
                "Prior execution state:",
                "No prior execution state was preserved.",
            );
            (
                format!(
                    "Operator directive: preserve the task result, but rewrite it so it matches the \
                     user's exact requested output contract. If they asked for exactly a specific number \
                     of bullet points, return exactly that many bullet points and no headings, preamble, \
                     labels, or trailing explanation.\n\n{}",
                    execution_state
                ),
                768,
                false,
            )
        }
        GuardId::InternalProtocol => (
            "Operator directive: your previous response still contained internal execution metadata \
             or non-executable tool protocol. Continue the task now using only an executable tool call \
             shape or a final user-facing result. If you need to act, emit a valid tool call object. \
             Do not narrate internal protocol, and do not stop at 'I will' or 'let me'."
                .to_string(),
            896,
            true,
        ),
        GuardId::Perspective => (
            "Operator directive: your previous response narrated the user's actions or thoughts \
             in first person ('I glance', 'I feel', 'my sword'). You must NEVER speak AS the user. \
             Rewrite using second person ('you see', 'your blade') to describe observable facts, \
             or describe the world's response to the user's actions. For NPC dialogue, use quoted \
             speech attributed to the NPC. Do not assert the user's internal states as facts."
                .to_string(),
            768,
            false,
        ),
        GuardId::UserEcho => (
            "Operator directive: your previous response echoed the user's own words back verbatim. \
             React to the sentiment and meaning of what the user said using your own original phrasing. \
             Do not quote or parrot the user's memorable phrases."
                .to_string(),
            768,
            false,
        ),
        GuardId::ExecutionTruth => (
            "Operator directive: your previous response either claimed to execute a tool without \
             actually calling one, or denied having tool access when tools are available. \
             Actually invoke the appropriate tool (e.g., call `bash` with the command) and report \
             the real output. Do not narrate what a tool would do — call it."
                .to_string(),
            896,
            true,
        ),
        GuardId::DeclaredAction => (
            "Operator directive: the user declared an action but your response did not resolve it. \
             Acknowledge the declared action, determine if a roll or check is needed, and describe \
             the outcome. Do not redirect to a different action or ignore what the user said they do."
                .to_string(),
            768,
            false,
        ),
        _ => (
            "Operator directive: the previous response was filtered. Provide a concrete, \
             complete answer now."
                .to_string(),
            512,
            false,
        ),
    };
    let retry_note = format!(
        "RETRY: Your previous response was rejected: {directive}\n\
         The user's original request was: \"{user_prompt}\"\n\
         Generate a new response that addresses the user's request while avoiding the issue described above.",
        user_prompt = ctx.user_prompt,
    );
    let mut retry_messages = prepared.request.messages.clone();
    retry_messages.push(roboticus_llm::format::UnifiedMessage {
        role: "user".into(),
        content: retry_note,
        parts: None,
    });
    // Only the placeholder-parroting retry keeps the caller's sampling settings; every other
    // guard retries with `None` for temperature and quality target.
    let keep_sampling = matches!(guard_id, GuardId::LowValueParroting);
    roboticus_llm::format::UnifiedRequest {
        model: prepared.request.model.clone(),
        messages: retry_messages,
        max_tokens: Some(max_tokens),
        temperature: if keep_sampling {
            prepared.request.temperature
        } else {
            None
        },
        system: None,
        quality_target: if keep_sampling {
            prepared.request.quality_target
        } else {
            None
        },
        tools: if preserve_tools {
            prepared.request.tools.clone()
        } else {
            vec![]
        },
    }
}

/// Render accumulated `(tool_name, output)` pairs as a compact bullet list for a retry directive.
///
/// Each output has its whitespace runs collapsed to single spaces and is cut to its first 180
/// characters. Returns `empty_note` when `tool_results` is empty. Otherwise returns `header`, a
/// newline, and one `- name: summary` line per result, joined by newlines.
fn summarize_prior_tool_state(
    tool_results: &[(String, String)],
    header: &str,
    empty_note: &str,
) -> String {
    if tool_results.is_empty() {
        return empty_note.to_string();
    }
    let summaries = tool_results
        .iter()
        .map(|(name, output)| {
            let collapsed = output.split_whitespace().collect::<Vec<_>>().join(" ");
            let snippet: String = collapsed.chars().take(180).collect();
            format!("- {name}: {snippet}")
        })
        .collect::<Vec<_>>()
        .join("\n");
    format!("{header}\n{summaries}")
}
/// Execute tool calls emitted by a guard-retry response and keep the ReAct exchange going
/// until the model answers without requesting any more tools.
///
/// `initial_content` is the (already sanitized) model output. If no tool call can be parsed
/// from it, it is returned unchanged and nothing else happens. Otherwise it is appended to
/// `retry_req.messages` as an assistant turn and the loop below runs:
///
/// 1. Each pending call is checked with `react_loop.is_looping`, transitioned to `Act`, and
///    executed. The call is recorded in `react_trace` and pushed onto `tool_results_acc`.
///    Tool names containing `subagent`/`delegate` update `delegation_provenance`.
/// 2. Observations (possibly replaced by a safety-filter notice) are sent back as a `user`
///    message. A follow-up request with `max_tokens = 2048` goes through
///    `infer_with_fallback`, and its model/usage are written into `resolved_model` and the
///    `total_*` counters.
/// 3. The sanitized follow-up is parsed for new tool calls. When there are none, the loop
///    transitions to `Finish` and that text is returned.
///
/// If a batch is aborted (loop detected, or `react_loop` reaches `Done`) before any tool in it
/// ran, a canned "stuck in a loop" message is returned. When one of the accumulated tool
/// results starts with `error:`/`Error:`, that message names the tool and quotes the error.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn execute_retry_react_actions(
    state: &AppState,
    retry_req: &roboticus_llm::format::UnifiedRequest,
    preferred_model: &str,
    initial_content: String,
    turn_id: &str,
    authority: InputAuthority,
    channel_label: Option<&str>,
    delegation_provenance: &mut DelegationProvenance,
    react_loop: &mut AgentLoop,
    react_trace: &mut ReactTrace,
    tool_results_acc: &mut Vec<(String, String)>,
    resolved_model: &mut String,
    total_in: &mut i64,
    total_out: &mut i64,
    total_cost: &mut f64,
) -> String {
    // Prefer the multi-call parser; fall back to the single-call parser when it finds nothing.
    let parse_pending_calls = |text: &str| {
        let mut calls = parse_tool_calls(text);
        if calls.is_empty()
            && let Some(single) = parse_tool_call(text)
        {
            calls.push(single);
        }
        calls
    };

    let mut pending_calls = parse_pending_calls(&initial_content);
    if pending_calls.is_empty() {
        return initial_content;
    }
    let mut react_messages = retry_req.messages.clone();
    react_messages.push(roboticus_llm::format::UnifiedMessage {
        role: "assistant".into(),
        content: initial_content,
        parts: None,
    });
    // Every iteration starts with a non-empty `pending_calls`; the loop only exits by returning.
    loop {
        let mut observations = Vec::new();
        let mut batch_aborted = false;
        for (tn, tp) in &pending_calls {
            let params_text = tp.to_string();
            if react_loop.is_looping(tn, &params_text) {
                tracing::warn!(
                    tool = tn.as_str(),
                    "ReAct loop detected during retry — same tool+params repeated"
                );
                batch_aborted = true;
                break;
            }
            let is_delegation = is_delegation_tool_name(tn);
            if is_delegation {
                delegation_provenance.subagent_task_started = true;
            }
            react_loop.transition(ReactAction::Act {
                tool_name: tn.clone(),
                params: params_text,
            });
            if react_loop.state == ReactState::Done {
                batch_aborted = true;
                break;
            }
            let tool_start = std::time::Instant::now();
            let tool_result =
                execute_tool_call_detailed(state, tn, tp, turn_id, authority, channel_label).await;
            let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
            let result_text = match &tool_result {
                Ok(details) => {
                    if is_delegation {
                        delegation_provenance.subagent_task_completed = true;
                        delegation_provenance.subagent_result_attached =
                            !details.output.trim().is_empty();
                    }
                    details.output.clone()
                }
                Err(err) => format!("error: {err}"),
            };
            react_trace.record(ReactStep::ToolCall {
                tool_name: tn.clone(),
                parameters_redacted: false,
                result_summary: result_text.chars().take(120).collect(),
                duration_ms: tool_duration_ms,
                success: tool_result.is_ok(),
                source: tool_result
                    .as_ref()
                    .map(|details| details.source.clone())
                    .unwrap_or(ToolSource::BuiltIn),
            });
            tool_results_acc.push((tn.clone(), result_text.clone()));
            let observation = match tool_result {
                Ok(_) => format!("[Tool {tn} succeeded]: {result_text}"),
                Err(err) => format!("[Tool {tn} failed]: {err}"),
            };
            observations.push(if roboticus_agent::injection::scan_output(&observation) {
                format!("[Tool {tn} result blocked by safety filter]")
            } else {
                observation
            });
        }
        if batch_aborted && observations.is_empty() {
            // Nothing ran in this batch, so re-prompting would make no progress: give up with
            // a user-facing explanation, quoting the most recent tool error if there is one.
            let last_error = tool_results_acc
                .iter()
                .rev()
                .find(|(_, output)| output.starts_with("error:") || output.starts_with("Error:"))
                .map(|(tool, output)| {
                    let snippet: String = output.chars().take(200).collect();
                    format!("The tool `{tool}` failed with: {snippet}")
                })
                .unwrap_or_default();
            return if last_error.is_empty() {
                "I attempted this task multiple times but the same tool call kept \
                 repeating without making progress. This usually means the approach \
                 needs to change — could you rephrase or suggest a different strategy?"
                    .to_string()
            } else {
                format!(
                    "I attempted this task multiple times but got stuck in a loop. \
                     {last_error}. Could you help me take a different approach?"
                )
            };
        }
        react_loop.transition(ReactAction::Observe);
        react_messages.push(roboticus_llm::format::UnifiedMessage {
            role: "user".into(),
            content: observations.join("\n\n"),
            parts: None,
        });
        let follow_req = roboticus_llm::format::UnifiedRequest {
            model: retry_req.model.clone(),
            messages: react_messages.clone(),
            max_tokens: Some(2048),
            temperature: None,
            system: None,
            quality_target: None,
            tools: retry_req.tools.clone(),
        };
        let follow_content = match infer_with_fallback(state, &follow_req, preferred_model).await {
            Ok(result) => {
                *resolved_model = result.model.clone();
                *total_in += result.tokens_in;
                *total_out += result.tokens_out;
                *total_cost += result.cost;
                result.content
            }
            // NOTE(review): this error text contains no tool call, so it ends the loop and is
            // returned as the response content. Confirm that exposing it downstream is intended.
            Err(e) => format!("LLM follow-up error: {e}"),
        };
        // History keeps the raw model turn; only the text we parse/return is sanitized.
        react_messages.push(roboticus_llm::format::UnifiedMessage {
            role: "assistant".into(),
            content: follow_content.clone(),
            parts: None,
        });
        let follow_content = sanitize_model_output(follow_content, state.hmac_secret.as_ref());
        pending_calls = parse_pending_calls(&follow_content);
        if pending_calls.is_empty() {
            react_loop.transition(ReactAction::Finish);
            return follow_content;
        }
    }
}

/// Returns `true` when `tool_name` contains `subagent` or `delegate` (ASCII case-insensitive).
/// This is how the retry loop decides whether a tool call counts as delegation.
fn is_delegation_tool_name(tool_name: &str) -> bool {
    let lowered = tool_name.to_ascii_lowercase();
    lowered.contains("subagent") || lowered.contains("delegate")
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn apply_guards_with_retry(
content: String,
ctx: &GuardContext<'_>,
state: &AppState,
prepared: &PreparedInference,
turn_id: &str,
authority: InputAuthority,
channel_label: Option<&str>,
delegation_provenance: &mut DelegationProvenance,
react_loop: &mut AgentLoop,
react_trace: &mut ReactTrace,
tool_results_acc: &mut Vec<(String, String)>,
resolved_model: &mut String,
total_in: &mut i64,
total_out: &mut i64,
total_cost: &mut f64,
) -> String {
let chain = guard_sets::full();
let original_content = content.clone();
let result = chain.apply(content, ctx);
match result.retry {
None => result.content,
Some(signal) => {
tracing::warn!(
guard = ?signal.guard_id,
reason = %signal.reason,
"guard chain requested inference retry"
);
react_trace.record(super::super::flight_recorder::ReactStep::Guard {
guard_name: format!("{:?}", signal.guard_id),
fired: true,
action: "retry_requested".into(),
detail: Some(signal.reason.clone()),
rejected_content: Some(truncate_for_trace(&original_content)),
replacement_content: None,
});
let retry_req = build_retry_request(signal.guard_id, prepared, ctx);
match infer_with_fallback(state, &retry_req, &prepared.model).await {
Ok(retried_result) => {
*resolved_model = retried_result.model.clone();
*total_in += retried_result.tokens_in;
*total_out += retried_result.tokens_out;
*total_cost += retried_result.cost;
let retried_initial =
sanitize_model_output(retried_result.content, state.hmac_secret.as_ref());
let retried = if retry_req.tools.is_empty() {
retried_initial
} else {
execute_retry_react_actions(
state,
&retry_req,
&prepared.model,
retried_initial,
turn_id,
authority,
channel_label,
delegation_provenance,
react_loop,
react_trace,
tool_results_acc,
resolved_model,
total_in,
total_out,
total_cost,
)
.await
};
let fresh_scores = super::super::guard_registry::precompute_guard_scores(
&state.semantic_classifier,
&retried,
)
.await;
let retry_ctx = GuardContext {
user_prompt: ctx.user_prompt,
intents: ctx.intents,
tool_results: tool_results_acc,
agent_name: ctx.agent_name,
resolved_model: ctx.resolved_model,
delegation_provenance: ctx.delegation_provenance,
previous_assistant: ctx.previous_assistant,
prior_assistant_messages: ctx.prior_assistant_messages,
semantic_guard_scores: fresh_scores,
subagent_names: ctx.subagent_names.clone(),
};
let resumed = chain.apply(retried, &retry_ctx);
match resumed.retry {
None => {
if matches!(signal.guard_id, GuardId::TaskDeferral)
&& is_generic_degraded_fallback(&resumed.content)
{
tracing::warn!(
"task-deferral retry degraded into generic fallback; using deterministic blocker/result fallback"
);
deterministic_guard_fallback(
signal.guard_id,
&resumed.content,
tool_results_acc,
ctx.user_prompt,
ctx.agent_name,
)
} else {
resumed.content
}
}
Some(signal2) => {
tracing::warn!(
"guard chain retry still failing; using deterministic fallback"
);
let fallback = deterministic_guard_fallback(
signal.guard_id,
&resumed.content,
tool_results_acc,
ctx.user_prompt,
ctx.agent_name,
);
react_trace.record(super::super::flight_recorder::ReactStep::Guard {
guard_name: format!("{:?}", signal2.guard_id),
fired: true,
action: "fallback_after_retry_exhausted".into(),
detail: Some(signal2.reason.clone()),
rejected_content: Some(truncate_for_trace(&resumed.content)),
replacement_content: Some(truncate_for_trace(&fallback)),
});
fallback
}
}
}
Err(e) => {
tracing::warn!(error = %e, "guard retry inference failed; using deterministic fallback");
deterministic_guard_fallback(
signal.guard_id,
&result.content,
tool_results_acc,
ctx.user_prompt,
ctx.agent_name,
)
}
}
}
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn rescue_task_protocol_leak(
content: String,
state: &AppState,
prepared: &PreparedInference,
user_prompt: &str,
agent_name: &str,
turn_id: &str,
authority: InputAuthority,
channel_label: Option<&str>,
delegation_provenance: &mut DelegationProvenance,
react_loop: &mut AgentLoop,
react_trace: &mut ReactTrace,
tool_results_acc: &mut Vec<(String, String)>,
resolved_model: &mut String,
total_in: &mut i64,
total_out: &mut i64,
total_cost: &mut f64,
) -> String {
let retry_req = build_retry_request(
GuardId::InternalProtocol,
prepared,
&GuardContext {
user_prompt,
intents: &prepared.intents,
tool_results: tool_results_acc,
agent_name,
resolved_model,
delegation_provenance,
previous_assistant: prepared.previous_assistant.as_deref(),
prior_assistant_messages: &[],
semantic_guard_scores: std::collections::HashMap::new(),
subagent_names: vec![],
},
);
let rescued = execute_retry_react_actions(
state,
&retry_req,
&prepared.model,
content.clone(),
turn_id,
authority,
channel_label,
delegation_provenance,
react_loop,
react_trace,
tool_results_acc,
resolved_model,
total_in,
total_out,
total_cost,
)
.await;
if rescued.trim() != content.trim() && !contains_internal_protocol_marker(&rescued) {
rescued
} else {
let fallback = deterministic_quality_fallback(user_prompt, agent_name);
react_trace.record(super::super::flight_recorder::ReactStep::Guard {
guard_name: "streaming_buffer_guard".into(),
fired: true,
action: "fallback_after_retry_exhausted".into(),
detail: Some("streaming buffer retry did not produce improved output".into()),
rejected_content: Some(truncate_for_trace(&content)),
replacement_content: Some(truncate_for_trace(&fallback)),
});
fallback
}
}
/// Bound `content` for storage in a flight-recorder trace step.
///
/// Strings of at most 500 bytes are returned unchanged. Longer strings are cut at the last
/// UTF-8 character boundary at or before byte 500, so slicing never panics on multi-byte text.
/// The cut text is followed by a marker giving the original length in characters.
fn truncate_for_trace(content: &str) -> String {
    const MAX: usize = 500;
    if content.len() <= MAX {
        return content.to_string();
    }
    // Every index in 0..=MAX is < len here, and 0 is always a boundary, so `find` succeeds.
    let boundary = (0..=MAX)
        .rev()
        .find(|&i| content.is_char_boundary(i))
        .unwrap_or(0);
    format!(
        "{}... [truncated, {} total chars]",
        &content[..boundary],
        content.chars().count()
    )
}